#!/usr/bin/env python3
"""
DuckLake Sync Module - Sync custodian YAML files to DuckLake.

This module syncs all custodian YAML files to the DuckLake lakehouse database.
DuckLake provides time travel, ACID transactions, and schema evolution.

Usage:
    python -m scripts.sync.ducklake_sync [--dry-run] [--limit N]
"""

import argparse
import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import requests
import yaml

# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from scripts.sync import BaseSyncer, SyncResult, SyncStatus, DEFAULT_CUSTODIAN_DIR

# Configuration
DUCKLAKE_URL = os.getenv("DUCKLAKE_URL", "http://localhost:8765")
TABLE_NAME = "custodians_raw"


def _lat_lon(mapping: dict) -> tuple[float | None, float | None]:
    """Return (latitude, longitude) from *mapping*, or (None, None) if absent."""
    return mapping.get("latitude"), mapping.get("longitude")


def extract_coordinates(data: dict) -> tuple[float | None, float | None, str]:
    """
    Extract coordinates using cascading priority fallback.

    Returns:
        Tuple of (latitude, longitude, source) where source indicates
        which pattern was used for provenance tracking.
    """
    # Priority 1: manual_location_override (user-verified corrections)
    override = data.get("manual_location_override", {})
    if override:
        lat, lon = _lat_lon(override.get("coordinates", {}))
        if lat is not None and lon is not None:
            return lat, lon, "manual_location_override"

    # Priority 2: google_maps_enrichment (most accurate for operational institutions)
    gm = data.get("google_maps_enrichment", {})
    if gm:
        lat, lon = _lat_lon(gm.get("coordinates", {}))
        if lat is not None and lon is not None:
            return lat, lon, "google_maps_enrichment"

    # Priority 3: wikidata_enrichment.wikidata_coordinates (with precision)
    wd = data.get("wikidata_enrichment", {})
    if wd:
        wd_coords = wd.get("wikidata_coordinates", {})
        if wd_coords:
            lat, lon = _lat_lon(wd_coords)
            if lat is not None and lon is not None:
                return lat, lon, "wikidata_enrichment.wikidata_coordinates"
        coords = wd.get("coordinates", {})
        if coords:
            lat, lon = _lat_lon(coords)
            if lat is not None and lon is not None:
                return lat, lon, "wikidata_enrichment.coordinates"

    # Priority 4: ghcid.location_resolution.source_coordinates
    ghcid = data.get("ghcid", {})
    if ghcid:
        loc_res = ghcid.get("location_resolution", {})
        if loc_res:
            src_coords = loc_res.get("source_coordinates", {})
            if src_coords:
                lat, lon = _lat_lon(src_coords)
                if lat is not None and lon is not None:
                    return lat, lon, "ghcid.location_resolution.source_coordinates"
            # Some records store lat/lon directly on location_resolution.
            lat, lon = _lat_lon(loc_res)
            if lat is not None and lon is not None:
                return lat, lon, "ghcid.location_resolution"

    # Priority 5: original_entry.locations[0]
    original = data.get("original_entry", {})
    if original:
        locations = original.get("locations", [])
        if locations and isinstance(locations, list) and len(locations) > 0:
            lat, lon = _lat_lon(locations[0])
            if lat is not None and lon is not None:
                return lat, lon, "original_entry.locations[0]"

    # Priority 6: locations[0]
    locations = data.get("locations", [])
    if locations and isinstance(locations, list) and len(locations) > 0:
        loc = locations[0]
        lat, lon = _lat_lon(loc)
        if lat is not None and lon is not None:
            return lat, lon, "locations[0]"
        coords = loc.get("coordinates", {})
        if coords:
            lat, lon = _lat_lon(coords)
            if lat is not None and lon is not None:
                return lat, lon, "locations[0].coordinates"

    # Priority 7: location (root level)
    location = data.get("location", {})
    if location and isinstance(location, dict):
        lat, lon = _lat_lon(location)
        if lat is not None and lon is not None:
            return lat, lon, "location"

    return None, None, ""


def extract_city_country(data: dict) -> tuple[str, str]:
    """Extract city and country using cascading fallback."""
    city = ""
    country = ""

    # Priority 1: Google Maps address components
    gm = data.get("google_maps_enrichment", {})
    if gm:
        for comp in gm.get("address_components", []):
            types = comp.get("types", [])
            if "locality" in types and not city:
                city = comp.get("long_name", "")
            if "country" in types and not country:
                country = comp.get("short_name", "")

    # Priority 2: ghcid.location_resolution
    if not city or not country:
        ghcid = data.get("ghcid", {})
        loc_res = ghcid.get("location_resolution", {}) if ghcid else {}
        if loc_res:
            if not city:
                city = loc_res.get("geonames_name", "") or loc_res.get("city_name", "")
            if not country:
                country = loc_res.get("country_code", "")

    # Priority 3: location (root level)
    if not city or not country:
        location = data.get("location", {})
        if location:
            if not city:
                city = location.get("city", "")
            if not country:
                country = location.get("country", "")

    # Priority 4: locations[0]
    if not city or not country:
        locations = data.get("locations", [])
        if locations and len(locations) > 0:
            loc = locations[0]
            if not city:
                city = loc.get("city", "")
            if not country:
                country = loc.get("country", "")

    # Fallback: derive country from the GHCID prefix (first two characters)
    if not country:
        ghcid_current = data.get("ghcid", {}).get("ghcid_current", "")
        if ghcid_current and len(ghcid_current) >= 2:
            country = ghcid_current[:2]

    return city, country


def extract_top_level_fields(data: dict) -> dict:
    """Extract and flatten top-level fields from YAML data."""
    # Flat record schema; complex sub-objects are preserved as *_json strings.
    record = {
        "file_name": data.get("_file_name", ""),
        "ghcid_current": "",
        "record_id": "",
        "org_name": "",
        "org_type": "",
        "institution_type": "",
        "wikidata_id": "",
        "enrichment_status": data.get("enrichment_status", ""),
        "processing_timestamp": data.get("processing_timestamp", ""),
        "latitude": None,
        "longitude": None,
        "coordinate_source": "",
        "formatted_address": "",
        "city": "",
        "country": "",
        "postal_code": "",
        "region": "",
        "custodian_name": "",
        "custodian_name_confidence": None,
        "emic_name": "",
        "name_language": "",
        "google_rating": None,
        "google_total_ratings": None,
        "has_annual_report": False,
        "has_financial_statement": False,
        "has_anbi_publication": False,
        "has_policy_document": False,
        "annual_report_count": 0,
        "financial_statement_count": 0,
        "anbi_publication_count": 0,
        "policy_document_count": 0,
        "latest_annual_report_url": "",
        "latest_annual_report_year": None,
        "latest_financial_statement_url": "",
        "latest_policy_document_url": "",
        "latest_anbi_publication_url": "",
        "timespan_begin": None,
        "timespan_end": None,
        "destruction_date": None,
        "founding_date": None,
        "dissolution_date": None,
        "wikidata_inception": None,
        "original_entry_json": "",
        "wikidata_enrichment_json": "",
        "google_maps_enrichment_json": "",
        "web_enrichment_json": "",
        "web_claims_json": "",
        "ghcid_json": "",
        "identifiers_json": "",
        "provenance_json": "",
        "genealogiewerkbalk_json": "",
        "digital_platforms_json": "",
        "service_area_json": "",
        "timespan_json": "",
        "time_of_destruction_json": "",
        "conflict_status_json": "",
        "temporal_extent_json": "",
        "youtube_enrichment_json": "",
        "unesco_mow_enrichment_json": "",
        "ch_annotator_json": "",
        "location_json": "",
    }

    # Extract GHCID
    ghcid = data.get("ghcid", {})
    if ghcid:
        record["ghcid_current"] = ghcid.get("ghcid_current", "")
        record["record_id"] = ghcid.get("record_id", "")
        record["ghcid_json"] = json.dumps(ghcid, ensure_ascii=False, default=str)

    # Extract original entry
    original = data.get("original_entry", {})
    if original:
        record["org_name"] = original.get("organisatie", "") or original.get("name", "") or ""
        org_types = original.get("type", [])
        record["org_type"] = ",".join(org_types) if isinstance(org_types, list) else str(org_types)
        record["institution_type"] = original.get("institution_type", "")
        record["wikidata_id"] = original.get("wikidata_id", "")
        record["original_entry_json"] = json.dumps(original, ensure_ascii=False, default=str)

    # Extract coordinates
    lat, lon, coord_source = extract_coordinates(data)
    record["latitude"] = lat
    record["longitude"] = lon
    record["coordinate_source"] = coord_source

    # Extract city and country
    city, country = extract_city_country(data)
    record["city"] = city
    record["country"] = country

    # Extract Google Maps data
    gm = data.get("google_maps_enrichment", {})
    if gm:
        record["formatted_address"] = gm.get("formatted_address", "")
        record["google_rating"] = gm.get("rating")
        record["google_total_ratings"] = gm.get("total_ratings")
        for comp in gm.get("address_components", []):
            types = comp.get("types", [])
            if "postal_code" in types:
                record["postal_code"] = comp.get("long_name", "")
            if "administrative_area_level_1" in types:
                record["region"] = comp.get("short_name", "")
        record["google_maps_enrichment_json"] = json.dumps(gm, ensure_ascii=False, default=str)

    # Fallback region from GHCID
    if not record["region"] and ghcid:
        loc_res = ghcid.get("location_resolution", {})
        if loc_res:
            record["region"] = loc_res.get("region_code", "")

    # Extract custodian name
    cn = data.get("custodian_name", {})
    if cn:
        record["custodian_name"] = cn.get("claim_value", "")
        record["custodian_name_confidence"] = cn.get("confidence")
        record["emic_name"] = cn.get("emic_name", "") or cn.get("claim_value_arabic", "") or ""

    # Language inference: map ISO country code to a likely name language.
    language_map = {
        "NL": "nl", "BE": "nl",
        "DE": "de", "AT": "de", "CH": "de",
        "FR": "fr",
        "ES": "es",
        "IT": "it",
        "PT": "pt", "BR": "pt",
        "GB": "en", "US": "en", "AU": "en", "CA": "en", "IE": "en",
    }
    record["name_language"] = language_map.get(record["country"], "")

    # Extract Wikidata enrichment
    wd = data.get("wikidata_enrichment", {})
    if wd:
        wikidata_inception = wd.get("wikidata_inception", "") or wd.get("wikidata_founded", "")
        record["wikidata_inception"] = wikidata_inception
        if wikidata_inception and not record["founding_date"]:
            record["founding_date"] = wikidata_inception
        record["wikidata_enrichment_json"] = json.dumps(wd, ensure_ascii=False, default=str)

    # Store complex objects as JSON
    for field, key in [
        ("web_enrichment", "web_enrichment_json"),
        ("web_claims", "web_claims_json"),
        ("identifiers", "identifiers_json"),
        ("provenance", "provenance_json"),
        ("genealogiewerkbalk_enrichment", "genealogiewerkbalk_json"),
        ("digital_platforms", "digital_platforms_json"),
        ("service_area", "service_area_json"),
        ("timespan", "timespan_json"),
        ("youtube_enrichment", "youtube_enrichment_json"),
        ("unesco_mow_enrichment", "unesco_mow_enrichment_json"),
        ("ch_annotator", "ch_annotator_json"),
        ("location", "location_json"),
    ]:
        if data.get(field):
            record[key] = json.dumps(data[field], ensure_ascii=False, default=str)

    return record


class DuckLakeSyncer(BaseSyncer):
    """Sync custodian YAML files to DuckLake."""

    database_name = "ducklake"

    def __init__(self, url: str = DUCKLAKE_URL, **kwargs):
        super().__init__(**kwargs)
        self.url = url
        self.table_name = TABLE_NAME

    def check_connection(self) -> bool:
        """Check if DuckLake server is running."""
        try:
            response = requests.get(f"{self.url}/", timeout=5)
            status = response.json()
            return status.get("status") == "healthy"
        except Exception as e:
            self.logger.error(f"DuckLake connection failed: {e}")
            return False

    def get_status(self) -> dict:
        """Get DuckLake server status."""
        try:
            response = requests.get(f"{self.url}/", timeout=5)
            return response.json()
        except Exception:
            # Best-effort probe: callers treat "unavailable" as a soft failure.
            return {"status": "unavailable"}

    def sync(self, limit: Optional[int] = None, dry_run: bool = False) -> SyncResult:
        """Sync all YAML files to DuckLake."""
        result = SyncResult(
            database="ducklake",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Check connection
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = f"Cannot connect to DuckLake at {self.url}"
            result.end_time = datetime.now(timezone.utc)
            return result

        # Load YAML files
        yaml_files = self._list_yaml_files()
        if limit:
            yaml_files = yaml_files[:limit]

        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "ducklake"
        self.logger.info(f"Loading {len(yaml_files)} YAML files...")

        records = []
        for i, yaml_file in enumerate(yaml_files):
            self.progress.processed_files = i + 1
            self.progress.current_file = yaml_file.name
            self._report_progress()
            try:
                with open(yaml_file, "r", encoding="utf-8") as f:
                    data = yaml.safe_load(f)
                if data:
                    data["_file_name"] = yaml_file.name
                    record = extract_top_level_fields(data)
                    records.append(record)
                    result.records_succeeded += 1
            except Exception as e:
                # Best-effort: a single bad file must not abort the whole sync.
                self.logger.warning(f"Failed to load {yaml_file.name}: {e}")
                result.records_failed += 1
                self.progress.errors.append(f"{yaml_file.name}: {str(e)}")

        result.records_processed = len(yaml_files)

        if dry_run:
            self.logger.info(f"[DRY RUN] Would upload {len(records)} records to DuckLake")
            result.status = SyncStatus.SUCCESS
            result.end_time = datetime.now(timezone.utc)
            result.details["dry_run"] = True
            return result

        # Upload to DuckLake
        if records:
            try:
                upload_result = self._upload_records(records)
                result.details.update(upload_result)
                result.status = SyncStatus.SUCCESS if result.records_failed == 0 else SyncStatus.PARTIAL
            except Exception as e:
                self.logger.error(f"Upload failed: {e}")
                result.status = SyncStatus.FAILED
                result.error_message = str(e)
        else:
            result.status = SyncStatus.FAILED
            result.error_message = "No records to upload"

        result.end_time = datetime.now(timezone.utc)
        return result

    def _upload_records(self, records: list[dict]) -> dict:
        """Upload records to DuckLake via API."""
        # ensure_ascii=False writes raw Unicode, so the temp file must be
        # UTF-8 regardless of the platform's default locale encoding.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", delete=False, encoding="utf-8"
        ) as f:
            json.dump(records, f, ensure_ascii=False, default=str)
            temp_path = f.name

        try:
            with open(temp_path, "rb") as f:
                files = {"file": ("custodians.json", f, "application/json")}
                data = {"table_name": self.table_name, "mode": "replace"}
                response = requests.post(
                    f"{self.url}/upload", files=files, data=data, timeout=600
                )
            response.raise_for_status()
            return response.json()
        finally:
            os.unlink(temp_path)


def main():
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to DuckLake")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--url", default=DUCKLAKE_URL, help="DuckLake server URL")
    args = parser.parse_args()

    import logging

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    syncer = DuckLakeSyncer(url=args.url)

    print("=" * 60)
    print("DuckLake Sync")
    print("=" * 60)

    if not args.dry_run:
        print(f"Checking connection to {args.url}...")
        status = syncer.get_status()
        print(f"  Status: {status.get('status', 'unknown')}")
        print(f"  Tables: {status.get('tables', 0)}")
        print(f"  Rows: {status.get('total_rows', 0)}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run)

    print("\n" + "=" * 60)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f"  Processed: {result.records_processed}")
    print(f"  Succeeded: {result.records_succeeded}")
    print(f"  Failed: {result.records_failed}")
    print(f"  Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f"  Error: {result.error_message}")
    print("=" * 60)


if __name__ == "__main__":
    main()