glam/scripts/sync/ducklake_sync.py
2025-12-14 17:09:55 +01:00

509 lines
18 KiB
Python

#!/usr/bin/env python3
"""
DuckLake Sync Module - Sync custodian YAML files to DuckLake.
This module syncs all custodian YAML files to the DuckLake lakehouse database.
DuckLake provides time travel, ACID transactions, and schema evolution.
Usage:
python -m scripts.sync.ducklake_sync [--dry-run] [--limit N]
"""
import argparse
import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import requests
import yaml
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from scripts.sync import BaseSyncer, SyncResult, SyncStatus, DEFAULT_CUSTODIAN_DIR
# Configuration
DUCKLAKE_URL = os.getenv("DUCKLAKE_URL", "http://localhost:8765")
TABLE_NAME = "custodians_raw"
def extract_coordinates(data: dict) -> tuple[float | None, float | None, str]:
"""
Extract coordinates using cascading priority fallback.
Returns:
Tuple of (latitude, longitude, source) where source indicates
which pattern was used for provenance tracking.
"""
# Priority 1: manual_location_override (user-verified corrections)
override = data.get("manual_location_override", {})
if override:
coords = override.get("coordinates", {})
lat = coords.get("latitude")
lon = coords.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "manual_location_override"
# Priority 2: google_maps_enrichment (most accurate for operational institutions)
gm = data.get("google_maps_enrichment", {})
if gm:
coords = gm.get("coordinates", {})
lat = coords.get("latitude")
lon = coords.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "google_maps_enrichment"
# Priority 3: wikidata_enrichment.wikidata_coordinates (with precision)
wd = data.get("wikidata_enrichment", {})
if wd:
wd_coords = wd.get("wikidata_coordinates", {})
if wd_coords:
lat = wd_coords.get("latitude")
lon = wd_coords.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "wikidata_enrichment.wikidata_coordinates"
coords = wd.get("coordinates", {})
if coords:
lat = coords.get("latitude")
lon = coords.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "wikidata_enrichment.coordinates"
# Priority 4: ghcid.location_resolution.source_coordinates
ghcid = data.get("ghcid", {})
if ghcid:
loc_res = ghcid.get("location_resolution", {})
if loc_res:
src_coords = loc_res.get("source_coordinates", {})
if src_coords:
lat = src_coords.get("latitude")
lon = src_coords.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "ghcid.location_resolution.source_coordinates"
lat = loc_res.get("latitude")
lon = loc_res.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "ghcid.location_resolution"
# Priority 5: original_entry.locations[0]
original = data.get("original_entry", {})
if original:
locations = original.get("locations", [])
if locations and isinstance(locations, list) and len(locations) > 0:
loc = locations[0]
lat = loc.get("latitude")
lon = loc.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "original_entry.locations[0]"
# Priority 6: locations[0]
locations = data.get("locations", [])
if locations and isinstance(locations, list) and len(locations) > 0:
loc = locations[0]
lat = loc.get("latitude")
lon = loc.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "locations[0]"
coords = loc.get("coordinates", {})
if coords:
lat = coords.get("latitude")
lon = coords.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "locations[0].coordinates"
# Priority 7: location (root level)
location = data.get("location", {})
if location and isinstance(location, dict):
lat = location.get("latitude")
lon = location.get("longitude")
if lat is not None and lon is not None:
return lat, lon, "location"
return None, None, ""
def extract_city_country(data: dict) -> tuple[str, str]:
    """Extract city and country using cascading fallback.

    Tries Google Maps address components first, then GHCID location
    resolution, then root-level location fields, then the first entry of
    the locations list; as a last resort the country is derived from the
    first two characters of the GHCID identifier.
    """
    city, country = "", ""

    # Priority 1: Google Maps address components
    enrichment = data.get("google_maps_enrichment", {})
    if enrichment:
        for component in enrichment.get("address_components", []):
            component_types = component.get("types", [])
            if not city and "locality" in component_types:
                city = component.get("long_name", "")
            if not country and "country" in component_types:
                country = component.get("short_name", "")

    # Priority 2: ghcid.location_resolution
    if not (city and country):
        ghcid = data.get("ghcid", {})
        resolution = ghcid.get("location_resolution", {}) if ghcid else {}
        if resolution:
            city = city or resolution.get("geonames_name", "") or resolution.get("city_name", "")
            country = country or resolution.get("country_code", "")

    # Priority 3: location (root level)
    if not (city and country):
        root_location = data.get("location", {})
        if root_location:
            city = city or root_location.get("city", "")
            country = country or root_location.get("country", "")

    # Priority 4: locations[0]
    if not (city and country):
        entries = data.get("locations", [])
        if entries and len(entries) > 0:
            first = entries[0]
            city = city or first.get("city", "")
            country = country or first.get("country", "")

    # Last resort: the GHCID prefix encodes the country code
    if not country:
        identifier = data.get("ghcid", {}).get("ghcid_current", "")
        if identifier and len(identifier) >= 2:
            country = identifier[:2]

    return city, country
def extract_top_level_fields(data: dict) -> dict:
    """Extract and flatten top-level fields from YAML data.

    Builds one flat row for the custodians_raw table: scalar columns for
    commonly queried fields, plus *_json columns that preserve nested
    sub-documents as JSON strings. Dict insertion order below determines
    downstream column order, so the template should not be reordered.

    Args:
        data: Parsed custodian YAML document ("_file_name" is injected by
            the caller before this is invoked).

    Returns:
        A dict with one entry per column, pre-filled with defaults so the
        schema is stable even for sparse YAML files.
    """
    # Column template with a default for every field.
    # NOTE(review): the has_*/*_count/latest_* document fields and the
    # timespan_*/destruction_date/dissolution_date fields are initialized
    # here but never populated by this function — presumably filled by a
    # later pipeline step or kept for schema compatibility; confirm.
    record = {
        "file_name": data.get("_file_name", ""),
        "ghcid_current": "",
        "record_id": "",
        "org_name": "",
        "org_type": "",
        "institution_type": "",
        "wikidata_id": "",
        "enrichment_status": data.get("enrichment_status", ""),
        "processing_timestamp": data.get("processing_timestamp", ""),
        "latitude": None,
        "longitude": None,
        "coordinate_source": "",
        "formatted_address": "",
        "city": "",
        "country": "",
        "postal_code": "",
        "region": "",
        "custodian_name": "",
        "custodian_name_confidence": None,
        "emic_name": "",
        "name_language": "",
        "google_rating": None,
        "google_total_ratings": None,
        "has_annual_report": False,
        "has_financial_statement": False,
        "has_anbi_publication": False,
        "has_policy_document": False,
        "annual_report_count": 0,
        "financial_statement_count": 0,
        "anbi_publication_count": 0,
        "policy_document_count": 0,
        "latest_annual_report_url": "",
        "latest_annual_report_year": None,
        "latest_financial_statement_url": "",
        "latest_policy_document_url": "",
        "latest_anbi_publication_url": "",
        "timespan_begin": None,
        "timespan_end": None,
        "destruction_date": None,
        "founding_date": None,
        "dissolution_date": None,
        "wikidata_inception": None,
        "original_entry_json": "",
        "wikidata_enrichment_json": "",
        "google_maps_enrichment_json": "",
        "web_enrichment_json": "",
        "web_claims_json": "",
        "ghcid_json": "",
        "identifiers_json": "",
        "provenance_json": "",
        "genealogiewerkbalk_json": "",
        "digital_platforms_json": "",
        "service_area_json": "",
        "timespan_json": "",
        "time_of_destruction_json": "",
        "conflict_status_json": "",
        "temporal_extent_json": "",
        "youtube_enrichment_json": "",
        "unesco_mow_enrichment_json": "",
        "ch_annotator_json": "",
        "location_json": "",
    }
    # Extract GHCID (stable identifier block; full sub-document kept as JSON)
    ghcid = data.get("ghcid", {})
    if ghcid:
        record["ghcid_current"] = ghcid.get("ghcid_current", "")
        record["record_id"] = ghcid.get("record_id", "")
        record["ghcid_json"] = json.dumps(ghcid, ensure_ascii=False, default=str)
    # Extract original entry — "organisatie" is the Dutch source field for
    # the organization name, with "name" as fallback
    original = data.get("original_entry", {})
    if original:
        record["org_name"] = original.get("organisatie", "") or original.get("name", "") or ""
        org_types = original.get("type", [])
        # type may be a list (joined with commas) or a scalar (stringified)
        record["org_type"] = ",".join(org_types) if isinstance(org_types, list) else str(org_types)
        record["institution_type"] = original.get("institution_type", "")
        record["wikidata_id"] = original.get("wikidata_id", "")
        record["original_entry_json"] = json.dumps(original, ensure_ascii=False, default=str)
    # Extract coordinates via the priority cascade, with provenance source
    lat, lon, coord_source = extract_coordinates(data)
    record["latitude"] = lat
    record["longitude"] = lon
    record["coordinate_source"] = coord_source
    # Extract city and country via the priority cascade
    city, country = extract_city_country(data)
    record["city"] = city
    record["country"] = country
    # Extract Google Maps data (address, rating, postal code, region)
    gm = data.get("google_maps_enrichment", {})
    if gm:
        record["formatted_address"] = gm.get("formatted_address", "")
        record["google_rating"] = gm.get("rating")
        record["google_total_ratings"] = gm.get("total_ratings")
        for comp in gm.get("address_components", []):
            types = comp.get("types", [])
            if "postal_code" in types:
                record["postal_code"] = comp.get("long_name", "")
            if "administrative_area_level_1" in types:
                record["region"] = comp.get("short_name", "")
        record["google_maps_enrichment_json"] = json.dumps(gm, ensure_ascii=False, default=str)
    # Fallback region from GHCID when Google Maps gave none
    if not record["region"] and ghcid:
        loc_res = ghcid.get("location_resolution", {})
        if loc_res:
            record["region"] = loc_res.get("region_code", "")
    # Extract custodian name claim (value, confidence, emic/native-script name)
    cn = data.get("custodian_name", {})
    if cn:
        record["custodian_name"] = cn.get("claim_value", "")
        record["custodian_name_confidence"] = cn.get("confidence")
        record["emic_name"] = cn.get("emic_name", "") or cn.get("claim_value_arabic", "") or ""
    # Language inference from the ISO country code.
    # NOTE(review): BE→nl is a simplification (Belgium is multilingual).
    language_map = {
        "NL": "nl", "BE": "nl", "DE": "de", "AT": "de", "CH": "de",
        "FR": "fr", "ES": "es", "IT": "it", "PT": "pt", "BR": "pt",
        "GB": "en", "US": "en", "AU": "en", "CA": "en", "IE": "en",
    }
    record["name_language"] = language_map.get(record["country"], "")
    # Extract Wikidata enrichment; inception doubles as founding_date
    wd = data.get("wikidata_enrichment", {})
    if wd:
        wikidata_inception = wd.get("wikidata_inception", "") or wd.get("wikidata_founded", "")
        record["wikidata_inception"] = wikidata_inception
        if wikidata_inception and not record["founding_date"]:
            record["founding_date"] = wikidata_inception
        record["wikidata_enrichment_json"] = json.dumps(wd, ensure_ascii=False, default=str)
    # Store remaining complex sub-documents as JSON strings.
    # NOTE(review): time_of_destruction_json, conflict_status_json, and
    # temporal_extent_json exist in the template but have no source key in
    # this list, so they stay "" — confirm whether that is intentional.
    for field, key in [
        ("web_enrichment", "web_enrichment_json"),
        ("web_claims", "web_claims_json"),
        ("identifiers", "identifiers_json"),
        ("provenance", "provenance_json"),
        ("genealogiewerkbalk_enrichment", "genealogiewerkbalk_json"),
        ("digital_platforms", "digital_platforms_json"),
        ("service_area", "service_area_json"),
        ("timespan", "timespan_json"),
        ("youtube_enrichment", "youtube_enrichment_json"),
        ("unesco_mow_enrichment", "unesco_mow_enrichment_json"),
        ("ch_annotator", "ch_annotator_json"),
        ("location", "location_json"),
    ]:
        if data.get(field):
            record[key] = json.dumps(data[field], ensure_ascii=False, default=str)
    return record
class DuckLakeSyncer(BaseSyncer):
    """Sync custodian YAML files to DuckLake.

    Loads each custodian YAML file, flattens it with
    extract_top_level_fields(), and uploads the resulting records to the
    DuckLake HTTP API in a single replace-mode bulk upload.
    """

    database_name = "ducklake"

    def __init__(self, url: str = DUCKLAKE_URL, **kwargs):
        """
        Args:
            url: Base URL of the DuckLake server.
            **kwargs: Forwarded to BaseSyncer unchanged.
        """
        super().__init__(**kwargs)
        self.url = url
        self.table_name = TABLE_NAME

    def check_connection(self) -> bool:
        """Check if DuckLake server is running.

        Returns:
            True only when the server responds and reports status "healthy";
            any connection/parse error is logged and reported as False.
        """
        try:
            response = requests.get(f"{self.url}/", timeout=5)
            status = response.json()
            return status.get("status") == "healthy"
        except Exception as e:
            self.logger.error(f"DuckLake connection failed: {e}")
            return False

    def get_status(self) -> dict:
        """Get DuckLake server status.

        Returns:
            The server's status JSON, or {"status": "unavailable"} when the
            server cannot be reached or does not return valid JSON.
        """
        try:
            response = requests.get(f"{self.url}/", timeout=5)
            return response.json()
        except Exception:
            return {"status": "unavailable"}

    def sync(self, limit: Optional[int] = None, dry_run: bool = False) -> SyncResult:
        """Sync all YAML files to DuckLake.

        Args:
            limit: Optional cap on the number of YAML files processed.
            dry_run: Parse files and report counts without uploading.

        Returns:
            A SyncResult with per-record success/failure counts and, on
            upload, the server response in details.
        """
        result = SyncResult(
            database="ducklake",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )
        # Fail fast when the server is unreachable (skipped for dry runs).
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = f"Cannot connect to DuckLake at {self.url}"
            result.end_time = datetime.now(timezone.utc)
            return result
        # Load YAML files
        yaml_files = self._list_yaml_files()
        if limit:
            yaml_files = yaml_files[:limit]
        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "ducklake"
        self.logger.info(f"Loading {len(yaml_files)} YAML files...")
        records = []
        for i, yaml_file in enumerate(yaml_files):
            self.progress.processed_files = i + 1
            self.progress.current_file = yaml_file.name
            self._report_progress()
            try:
                with open(yaml_file, "r", encoding="utf-8") as f:
                    data = yaml.safe_load(f)
                if data:
                    # Inject the source filename so it lands in the table.
                    data["_file_name"] = yaml_file.name
                    record = extract_top_level_fields(data)
                    records.append(record)
                    result.records_succeeded += 1
            except Exception as e:
                # Best-effort per file: a bad YAML must not abort the sync.
                self.logger.warning(f"Failed to load {yaml_file.name}: {e}")
                result.records_failed += 1
                self.progress.errors.append(f"{yaml_file.name}: {str(e)}")
        result.records_processed = len(yaml_files)
        if dry_run:
            self.logger.info(f"[DRY RUN] Would upload {len(records)} records to DuckLake")
            result.status = SyncStatus.SUCCESS
            result.end_time = datetime.now(timezone.utc)
            result.details["dry_run"] = True
            return result
        # Upload to DuckLake
        if records:
            try:
                upload_result = self._upload_records(records)
                result.details.update(upload_result)
                # PARTIAL when some files failed to parse but the rest uploaded.
                result.status = SyncStatus.SUCCESS if result.records_failed == 0 else SyncStatus.PARTIAL
            except Exception as e:
                self.logger.error(f"Upload failed: {e}")
                result.status = SyncStatus.FAILED
                result.error_message = str(e)
        else:
            result.status = SyncStatus.FAILED
            result.error_message = "No records to upload"
        result.end_time = datetime.now(timezone.utc)
        return result

    def _upload_records(self, records: list[dict]) -> dict:
        """Upload records to DuckLake via API.

        Serializes the records to a temporary JSON file and posts it to the
        /upload endpoint in replace mode.

        Returns:
            The server's JSON response.

        Raises:
            requests.HTTPError: when the upload endpoint returns an error status.
        """
        # BUGFIX: encoding="utf-8" is required here — json.dump is called with
        # ensure_ascii=False, so the payload may contain non-ASCII characters,
        # and without an explicit encoding NamedTemporaryFile uses the
        # locale's default, which can raise UnicodeEncodeError or produce a
        # mis-encoded file on non-UTF-8 platforms.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", encoding="utf-8", delete=False
        ) as f:
            json.dump(records, f, ensure_ascii=False, default=str)
            temp_path = f.name
        try:
            with open(temp_path, "rb") as f:
                files = {"file": ("custodians.json", f, "application/json")}
                data = {"table_name": self.table_name, "mode": "replace"}
                response = requests.post(
                    f"{self.url}/upload",
                    files=files,
                    data=data,
                    timeout=600
                )
                response.raise_for_status()
                return response.json()
        finally:
            # Always remove the temp file, even when the upload fails.
            os.unlink(temp_path)
def main():
    """CLI entry point: parse arguments, probe the server, run the sync."""
    # Configure logging before anything else emits messages.
    import logging

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    parser = argparse.ArgumentParser(description="Sync custodian YAML files to DuckLake")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--url", default=DUCKLAKE_URL, help="DuckLake server URL")
    args = parser.parse_args()

    syncer = DuckLakeSyncer(url=args.url)

    banner = "=" * 60
    print(banner)
    print("DuckLake Sync")
    print(banner)

    # Real runs get a server health/stat preview before syncing.
    if not args.dry_run:
        print(f"Checking connection to {args.url}...")
        status = syncer.get_status()
        print(f" Status: {status.get('status', 'unknown')}")
        print(f" Tables: {status.get('tables', 0)}")
        print(f" Rows: {status.get('total_rows', 0)}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run)

    # Summary block
    print("\n" + banner)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f" Processed: {result.records_processed}")
    print(f" Succeeded: {result.records_succeeded}")
    print(f" Failed: {result.records_failed}")
    print(f" Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f" Error: {result.error_message}")
    print(banner)


if __name__ == "__main__":
    main()