#!/usr/bin/env python3
|
|
"""
|
|
DuckLake Sync Module - Sync custodian YAML files to DuckLake.
|
|
|
|
This module syncs all custodian YAML files to the DuckLake lakehouse database.
|
|
DuckLake provides time travel, ACID transactions, and schema evolution.
|
|
|
|
Usage:
|
|
python -m scripts.sync.ducklake_sync [--dry-run] [--limit N]
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import requests
|
|
import yaml
|
|
|
|
# Add project root to path
|
|
PROJECT_ROOT = Path(__file__).parent.parent.parent
|
|
sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from scripts.sync import BaseSyncer, SyncResult, SyncStatus, DEFAULT_CUSTODIAN_DIR
|
|
|
|
# Configuration
DUCKLAKE_URL = os.getenv("DUCKLAKE_URL", "http://localhost:8765")  # DuckLake HTTP API base URL; override via env var
TABLE_NAME = "custodians_raw"  # target table; the sync replaces it wholesale (mode="replace")
|
|
|
|
|
|
def extract_coordinates(data: dict) -> tuple[float | None, float | None, str]:
|
|
"""
|
|
Extract coordinates using cascading priority fallback.
|
|
|
|
Returns:
|
|
Tuple of (latitude, longitude, source) where source indicates
|
|
which pattern was used for provenance tracking.
|
|
"""
|
|
|
|
# Priority 1: manual_location_override (user-verified corrections)
|
|
override = data.get("manual_location_override", {})
|
|
if override:
|
|
coords = override.get("coordinates", {})
|
|
lat = coords.get("latitude")
|
|
lon = coords.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "manual_location_override"
|
|
|
|
# Priority 2: google_maps_enrichment (most accurate for operational institutions)
|
|
gm = data.get("google_maps_enrichment", {})
|
|
if gm:
|
|
coords = gm.get("coordinates", {})
|
|
lat = coords.get("latitude")
|
|
lon = coords.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "google_maps_enrichment"
|
|
|
|
# Priority 3: wikidata_enrichment.wikidata_coordinates (with precision)
|
|
wd = data.get("wikidata_enrichment", {})
|
|
if wd:
|
|
wd_coords = wd.get("wikidata_coordinates", {})
|
|
if wd_coords:
|
|
lat = wd_coords.get("latitude")
|
|
lon = wd_coords.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "wikidata_enrichment.wikidata_coordinates"
|
|
|
|
coords = wd.get("coordinates", {})
|
|
if coords:
|
|
lat = coords.get("latitude")
|
|
lon = coords.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "wikidata_enrichment.coordinates"
|
|
|
|
# Priority 4: ghcid.location_resolution.source_coordinates
|
|
ghcid = data.get("ghcid", {})
|
|
if ghcid:
|
|
loc_res = ghcid.get("location_resolution", {})
|
|
if loc_res:
|
|
src_coords = loc_res.get("source_coordinates", {})
|
|
if src_coords:
|
|
lat = src_coords.get("latitude")
|
|
lon = src_coords.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "ghcid.location_resolution.source_coordinates"
|
|
|
|
lat = loc_res.get("latitude")
|
|
lon = loc_res.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "ghcid.location_resolution"
|
|
|
|
# Priority 5: original_entry.locations[0]
|
|
original = data.get("original_entry", {})
|
|
if original:
|
|
locations = original.get("locations", [])
|
|
if locations and isinstance(locations, list) and len(locations) > 0:
|
|
loc = locations[0]
|
|
lat = loc.get("latitude")
|
|
lon = loc.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "original_entry.locations[0]"
|
|
|
|
# Priority 6: locations[0]
|
|
locations = data.get("locations", [])
|
|
if locations and isinstance(locations, list) and len(locations) > 0:
|
|
loc = locations[0]
|
|
lat = loc.get("latitude")
|
|
lon = loc.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "locations[0]"
|
|
|
|
coords = loc.get("coordinates", {})
|
|
if coords:
|
|
lat = coords.get("latitude")
|
|
lon = coords.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "locations[0].coordinates"
|
|
|
|
# Priority 7: location (root level)
|
|
location = data.get("location", {})
|
|
if location and isinstance(location, dict):
|
|
lat = location.get("latitude")
|
|
lon = location.get("longitude")
|
|
if lat is not None and lon is not None:
|
|
return lat, lon, "location"
|
|
|
|
return None, None, ""
|
|
|
|
|
|
def extract_city_country(data: dict) -> tuple[str, str]:
    """Extract city and country using cascading fallback.

    Sources, in order: Google Maps address components, GHCID location
    resolution, the root-level "location" object, the first entry of the
    root-level "locations" list, and finally the two-letter country prefix
    of the current GHCID identifier.
    """
    city, country = "", ""

    # Priority 1: Google Maps address components
    gm = data.get("google_maps_enrichment", {})
    for component in (gm.get("address_components", []) if gm else []):
        component_types = component.get("types", [])
        if not city and "locality" in component_types:
            city = component.get("long_name", "")
        if not country and "country" in component_types:
            country = component.get("short_name", "")

    # Priority 2: ghcid.location_resolution
    if not (city and country):
        ghcid = data.get("ghcid", {})
        loc_res = ghcid.get("location_resolution", {}) if ghcid else {}
        if loc_res:
            city = city or loc_res.get("geonames_name", "") or loc_res.get("city_name", "")
            country = country or loc_res.get("country_code", "")

    # Priority 3: root-level location object
    if not (city and country):
        location = data.get("location", {})
        if location:
            city = city or location.get("city", "")
            country = country or location.get("country", "")

    # Priority 4: first entry of the root-level locations list
    if not (city and country):
        locations = data.get("locations", [])
        if locations:
            first = locations[0]
            city = city or first.get("city", "")
            country = country or first.get("country", "")

    # Fallback: derive the country from the GHCID prefix (e.g. "NL-...").
    if not country:
        ghcid_current = data.get("ghcid", {}).get("ghcid_current", "")
        if len(ghcid_current) >= 2:
            country = ghcid_current[:2]

    return city, country
|
|
|
|
|
|
def extract_top_level_fields(data: dict) -> dict:
    """Extract and flatten top-level fields from YAML data.

    Produces one flat record suitable for a tabular store: scalar columns
    for the commonly-queried fields, plus ``*_json`` columns carrying the
    raw sub-objects serialized as JSON strings.

    Fix: the ``time_of_destruction_json``, ``conflict_status_json`` and
    ``temporal_extent_json`` columns were declared in the schema but never
    populated; their source fields are now serialized like the others.
    """
    # Full column schema with neutral defaults; every key below is always
    # present in the returned record.
    record = {
        "file_name": data.get("_file_name", ""),
        "ghcid_current": "",
        "record_id": "",
        "org_name": "",
        "org_type": "",
        "institution_type": "",
        "wikidata_id": "",
        "enrichment_status": data.get("enrichment_status", ""),
        "processing_timestamp": data.get("processing_timestamp", ""),
        "latitude": None,
        "longitude": None,
        "coordinate_source": "",
        "formatted_address": "",
        "city": "",
        "country": "",
        "postal_code": "",
        "region": "",
        "custodian_name": "",
        "custodian_name_confidence": None,
        "emic_name": "",
        "name_language": "",
        "google_rating": None,
        "google_total_ratings": None,
        "has_annual_report": False,
        "has_financial_statement": False,
        "has_anbi_publication": False,
        "has_policy_document": False,
        "annual_report_count": 0,
        "financial_statement_count": 0,
        "anbi_publication_count": 0,
        "policy_document_count": 0,
        "latest_annual_report_url": "",
        "latest_annual_report_year": None,
        "latest_financial_statement_url": "",
        "latest_policy_document_url": "",
        "latest_anbi_publication_url": "",
        "timespan_begin": None,
        "timespan_end": None,
        "destruction_date": None,
        "founding_date": None,
        "dissolution_date": None,
        "wikidata_inception": None,
        "original_entry_json": "",
        "wikidata_enrichment_json": "",
        "google_maps_enrichment_json": "",
        "web_enrichment_json": "",
        "web_claims_json": "",
        "ghcid_json": "",
        "identifiers_json": "",
        "provenance_json": "",
        "genealogiewerkbalk_json": "",
        "digital_platforms_json": "",
        "service_area_json": "",
        "timespan_json": "",
        "time_of_destruction_json": "",
        "conflict_status_json": "",
        "temporal_extent_json": "",
        "youtube_enrichment_json": "",
        "unesco_mow_enrichment_json": "",
        "ch_annotator_json": "",
        "location_json": "",
    }

    # Extract GHCID identifiers
    ghcid = data.get("ghcid", {})
    if ghcid:
        record["ghcid_current"] = ghcid.get("ghcid_current", "")
        record["record_id"] = ghcid.get("record_id", "")
        record["ghcid_json"] = json.dumps(ghcid, ensure_ascii=False, default=str)

    # Extract original entry (Dutch "organisatie" key preferred over "name")
    original = data.get("original_entry", {})
    if original:
        record["org_name"] = original.get("organisatie", "") or original.get("name", "") or ""
        org_types = original.get("type", [])
        record["org_type"] = ",".join(org_types) if isinstance(org_types, list) else str(org_types)
        record["institution_type"] = original.get("institution_type", "")
        record["wikidata_id"] = original.get("wikidata_id", "")
        record["original_entry_json"] = json.dumps(original, ensure_ascii=False, default=str)

    # Extract coordinates with provenance
    lat, lon, coord_source = extract_coordinates(data)
    record["latitude"] = lat
    record["longitude"] = lon
    record["coordinate_source"] = coord_source

    # Extract city and country
    city, country = extract_city_country(data)
    record["city"] = city
    record["country"] = country

    # Extract Google Maps data
    gm = data.get("google_maps_enrichment", {})
    if gm:
        record["formatted_address"] = gm.get("formatted_address", "")
        record["google_rating"] = gm.get("rating")
        record["google_total_ratings"] = gm.get("total_ratings")
        for comp in gm.get("address_components", []):
            types = comp.get("types", [])
            if "postal_code" in types:
                record["postal_code"] = comp.get("long_name", "")
            if "administrative_area_level_1" in types:
                record["region"] = comp.get("short_name", "")
        record["google_maps_enrichment_json"] = json.dumps(gm, ensure_ascii=False, default=str)

    # Fallback region from GHCID
    if not record["region"] and ghcid:
        loc_res = ghcid.get("location_resolution", {})
        if loc_res:
            record["region"] = loc_res.get("region_code", "")

    # Extract custodian name (with confidence and emic/native variant)
    cn = data.get("custodian_name", {})
    if cn:
        record["custodian_name"] = cn.get("claim_value", "")
        record["custodian_name_confidence"] = cn.get("confidence")
        record["emic_name"] = cn.get("emic_name", "") or cn.get("claim_value_arabic", "") or ""

    # Language inference from the country code
    language_map = {
        "NL": "nl", "BE": "nl", "DE": "de", "AT": "de", "CH": "de",
        "FR": "fr", "ES": "es", "IT": "it", "PT": "pt", "BR": "pt",
        "GB": "en", "US": "en", "AU": "en", "CA": "en", "IE": "en",
    }
    record["name_language"] = language_map.get(record["country"], "")

    # Extract Wikidata enrichment; inception doubles as founding date when absent
    wd = data.get("wikidata_enrichment", {})
    if wd:
        wikidata_inception = wd.get("wikidata_inception", "") or wd.get("wikidata_founded", "")
        record["wikidata_inception"] = wikidata_inception
        if wikidata_inception and not record["founding_date"]:
            record["founding_date"] = wikidata_inception
        record["wikidata_enrichment_json"] = json.dumps(wd, ensure_ascii=False, default=str)

    # Store remaining complex objects as JSON strings
    for field, key in [
        ("web_enrichment", "web_enrichment_json"),
        ("web_claims", "web_claims_json"),
        ("identifiers", "identifiers_json"),
        ("provenance", "provenance_json"),
        ("genealogiewerkbalk_enrichment", "genealogiewerkbalk_json"),
        ("digital_platforms", "digital_platforms_json"),
        ("service_area", "service_area_json"),
        ("timespan", "timespan_json"),
        # Previously missing: these three columns existed in the schema but
        # were never filled, silently dropping the source data.
        ("time_of_destruction", "time_of_destruction_json"),
        ("conflict_status", "conflict_status_json"),
        ("temporal_extent", "temporal_extent_json"),
        ("youtube_enrichment", "youtube_enrichment_json"),
        ("unesco_mow_enrichment", "unesco_mow_enrichment_json"),
        ("ch_annotator", "ch_annotator_json"),
        ("location", "location_json"),
    ]:
        if data.get(field):
            record[key] = json.dumps(data[field], ensure_ascii=False, default=str)

    return record
|
|
|
|
|
|
class DuckLakeSyncer(BaseSyncer):
    """Sync custodian YAML files to DuckLake.

    Parses every custodian YAML file into a flat record via
    ``extract_top_level_fields`` and uploads the batch to the DuckLake
    HTTP API, replacing the target table.
    """

    database_name = "ducklake"

    def __init__(self, url: str = DUCKLAKE_URL, **kwargs):
        """
        Args:
            url: Base URL of the DuckLake HTTP server.
            **kwargs: Forwarded to BaseSyncer.
        """
        super().__init__(**kwargs)
        self.url = url
        self.table_name = TABLE_NAME

    def check_connection(self) -> bool:
        """Return True if the DuckLake server responds and reports "healthy"."""
        try:
            response = requests.get(f"{self.url}/", timeout=5)
            status = response.json()
            return status.get("status") == "healthy"
        except Exception as e:
            # Connection refused, timeout, or non-JSON body all count as "down".
            self.logger.error(f"DuckLake connection failed: {e}")
            return False

    def get_status(self) -> dict:
        """Return the server's status payload, or {"status": "unavailable"} on any error."""
        try:
            response = requests.get(f"{self.url}/", timeout=5)
            return response.json()
        except Exception:
            return {"status": "unavailable"}

    def sync(self, limit: Optional[int] = None, dry_run: bool = False) -> SyncResult:
        """Sync all YAML files to DuckLake.

        Args:
            limit: Process at most this many files (0 means none); None
                means all files.
            dry_run: Parse and count files but skip the upload.

        Returns:
            SyncResult with per-file success/failure counts and timing.
        """
        result = SyncResult(
            database="ducklake",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Check connection up front so a dead server fails fast.
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = f"Cannot connect to DuckLake at {self.url}"
            result.end_time = datetime.now(timezone.utc)
            return result

        # Load YAML files. Note: "is not None" so an explicit limit of 0
        # is honored instead of being treated as "no limit".
        yaml_files = self._list_yaml_files()
        if limit is not None:
            yaml_files = yaml_files[:limit]

        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "ducklake"

        self.logger.info(f"Loading {len(yaml_files)} YAML files...")

        records = []
        for i, yaml_file in enumerate(yaml_files):
            self.progress.processed_files = i + 1
            self.progress.current_file = yaml_file.name
            self._report_progress()

            try:
                with open(yaml_file, "r", encoding="utf-8") as f:
                    data = yaml.safe_load(f)

                if data:
                    data["_file_name"] = yaml_file.name
                    record = extract_top_level_fields(data)
                    records.append(record)
                    result.records_succeeded += 1
            except Exception as e:
                # One bad file must not abort the whole sync; record and continue.
                self.logger.warning(f"Failed to load {yaml_file.name}: {e}")
                result.records_failed += 1
                self.progress.errors.append(f"{yaml_file.name}: {str(e)}")

        result.records_processed = len(yaml_files)

        if dry_run:
            self.logger.info(f"[DRY RUN] Would upload {len(records)} records to DuckLake")
            result.status = SyncStatus.SUCCESS
            result.end_time = datetime.now(timezone.utc)
            result.details["dry_run"] = True
            return result

        # Upload to DuckLake
        if records:
            try:
                upload_result = self._upload_records(records)
                result.details.update(upload_result)
                result.status = SyncStatus.SUCCESS if result.records_failed == 0 else SyncStatus.PARTIAL
            except Exception as e:
                self.logger.error(f"Upload failed: {e}")
                result.status = SyncStatus.FAILED
                result.error_message = str(e)
        else:
            result.status = SyncStatus.FAILED
            result.error_message = "No records to upload"

        result.end_time = datetime.now(timezone.utc)
        return result

    def _upload_records(self, records: list[dict]) -> dict:
        """Upload records to DuckLake via API.

        Writes the batch to a temporary JSON file and posts it as a
        multipart upload with mode=replace; the temp file is always
        removed afterwards.

        Raises:
            requests.HTTPError: if the server rejects the upload.
        """
        # delete=False so the file can be reopened for reading (required on
        # Windows); encoding pinned to UTF-8 because the JSON is dumped with
        # ensure_ascii=False and the platform default may not handle it.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") as f:
            json.dump(records, f, ensure_ascii=False, default=str)
            temp_path = f.name

        try:
            with open(temp_path, "rb") as f:
                files = {"file": ("custodians.json", f, "application/json")}
                data = {"table_name": self.table_name, "mode": "replace"}

                response = requests.post(
                    f"{self.url}/upload",
                    files=files,
                    data=data,
                    timeout=600
                )

            response.raise_for_status()
            return response.json()
        finally:
            os.unlink(temp_path)
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the sync, and print a summary."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to DuckLake")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--url", default=DUCKLAKE_URL, help="DuckLake server URL")
    args = parser.parse_args()

    import logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    syncer = DuckLakeSyncer(url=args.url)

    banner = "=" * 60
    print(banner)
    print("DuckLake Sync")
    print(banner)

    # Probe the server first so connectivity problems surface before parsing.
    if not args.dry_run:
        print(f"Checking connection to {args.url}...")
        status = syncer.get_status()
        print(f" Status: {status.get('status', 'unknown')}")
        print(f" Tables: {status.get('tables', 0)}")
        print(f" Rows: {status.get('total_rows', 0)}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run)

    # Summary footer.
    print("\n" + banner)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f" Processed: {result.records_processed}")
    print(f" Succeeded: {result.records_succeeded}")
    print(f" Failed: {result.records_failed}")
    print(f" Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f" Error: {result.error_message}")
    print(banner)


if __name__ == "__main__":
    main()
|