#!/usr/bin/env python3
"""
Load all custodian YAML files into DuckLake as raw enriched data.

This script reads all YAML files from data/custodian/ and uploads them
to DuckLake as a single table for the bronze/raw data lake layer.

Usage:
    python scripts/load_custodians_to_ducklake.py [--dry-run] [--limit N]
        [--mode {replace,append,create}] [--directory DIR]

API Endpoint: http://localhost:8765/upload
(the base URL can be overridden with the DUCKLAKE_URL environment variable)
"""

import argparse
import json
import os
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional

import requests
import yaml

# Configuration
DUCKLAKE_URL = os.getenv("DUCKLAKE_URL", "http://localhost:8765")
DEFAULT_CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
TABLE_NAME = "custodians_raw"

# Maps each "<name>_json" output column to the top-level YAML key whose value
# is stored verbatim (serialized as a JSON string) in that column.
_JSON_PASSTHROUGH_FIELDS = {
    "wikidata_enrichment_json": "wikidata_enrichment",
    "web_enrichment_json": "web_enrichment",
    "web_claims_json": "web_claims",
    "identifiers_json": "identifiers",
    "provenance_json": "provenance",
    "genealogiewerkbalk_json": "genealogiewerkbalk_enrichment",
    "digital_platforms_json": "digital_platforms",
}


def _to_json(value) -> str:
    """Serialize *value* to a JSON string, keeping non-ASCII characters as-is.

    ``default=str`` stringifies anything JSON cannot encode natively (for
    example the date/datetime objects the YAML loader can produce).
    """
    return json.dumps(value, ensure_ascii=False, default=str)


def flatten_for_json(obj, parent_key: str = '', sep: str = '_') -> dict:
    """
    Flatten one level of a dict for better DuckDB compatibility.

    Scalar values are kept as they are; dict and list values are kept as
    JSON strings rather than being expanded recursively.

    Args:
        obj: The mapping to flatten. Anything that is not a dict yields an
            empty result.
        parent_key: Optional prefix prepended to every key.
        sep: Separator placed between ``parent_key`` and the key.

    Returns:
        A new dict with one entry per key of ``obj``.
    """
    if not isinstance(obj, dict):
        return {}

    flat = {}
    for k, v in obj.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        # Keep complex nested structures as JSON strings
        flat[new_key] = _to_json(v) if isinstance(v, (dict, list)) else v
    return flat


def extract_top_level_fields(data: dict) -> dict:
    """
    Extract and flatten top-level fields from YAML data.

    Scalar columns (names, location, ratings) are pulled out of the nested
    sections; the nested sections themselves are stored as JSON strings.
    Location is resolved with a fallback chain: Google Maps enrichment, then
    the first entry of ``locations``, then the GHCID ``location_resolution``,
    and finally the first two characters of the current GHCID as country.

    Sections and sub-sections that are present but empty (``None``) are
    treated the same as missing ones.

    Args:
        data: Parsed YAML mapping for one custodian. ``_file_name`` is
            expected to have been added by the caller.

    Returns:
        A flat dict with a fixed set of keys, suitable as one table row.
    """
    record = {
        # File identification
        "file_name": data.get("_file_name", ""),
        "ghcid_current": "",
        "record_id": "",

        # Original entry (flattened)
        "org_name": "",
        "org_type": "",
        "wikidata_id": "",

        # Enrichment status
        "enrichment_status": data.get("enrichment_status", ""),
        "processing_timestamp": data.get("processing_timestamp", ""),

        # Location (from Google Maps)
        "latitude": None,
        "longitude": None,
        "formatted_address": "",
        "city": "",
        "country": "",
        "postal_code": "",

        # Custodian name consensus
        "custodian_name": "",
        "custodian_name_confidence": None,

        # Ratings
        "google_rating": None,
        "google_total_ratings": None,

        # Complex nested objects as JSON strings
        "original_entry_json": "",
        "wikidata_enrichment_json": "",
        "google_maps_enrichment_json": "",
        "web_enrichment_json": "",
        "web_claims_json": "",
        "ghcid_json": "",
        "identifiers_json": "",
        "provenance_json": "",
        "genealogiewerkbalk_json": "",
        "digital_platforms_json": "",
    }

    # Extract GHCID
    ghcid = data.get("ghcid") or {}
    if ghcid:
        record["ghcid_current"] = ghcid.get("ghcid_current", "")
        record["record_id"] = ghcid.get("record_id", "")
        record["ghcid_json"] = _to_json(ghcid)

    # Extract original entry
    original = data.get("original_entry") or {}
    if original:
        record["org_name"] = original.get("organisatie", "")
        org_types = original.get("type", [])
        if isinstance(org_types, list):
            # str() each item so a non-string entry cannot break the join.
            record["org_type"] = ",".join(str(t) for t in org_types)
        elif org_types is not None:
            # An explicit null must stay empty, not become the text "None".
            record["org_type"] = str(org_types)
        record["wikidata_id"] = original.get("wikidata_id", "")
        record["original_entry_json"] = _to_json(original)

    # Extract Google Maps data
    gm = data.get("google_maps_enrichment") or {}
    if gm:
        # `or {}` / `or []`: a key that is present but empty parses as None.
        coords = gm.get("coordinates") or {}
        record["latitude"] = coords.get("latitude")
        record["longitude"] = coords.get("longitude")
        record["formatted_address"] = gm.get("formatted_address", "")
        record["google_rating"] = gm.get("rating")
        record["google_total_ratings"] = gm.get("total_ratings")

        # Extract city and country from address components
        for comp in gm.get("address_components") or []:
            types = comp.get("types") or []
            if "locality" in types:
                record["city"] = comp.get("long_name", "")
            if "country" in types:
                record["country"] = comp.get("short_name", "")
            if "postal_code" in types:
                record["postal_code"] = comp.get("long_name", "")

        record["google_maps_enrichment_json"] = _to_json(gm)

    # Fallback: Extract location from locations array if not set from Google Maps
    locations = data.get("locations")
    if isinstance(locations, list) and locations and isinstance(locations[0], dict):
        loc = locations[0]  # Use first location
        if not record["city"] and loc.get("city"):
            record["city"] = loc["city"]
        if not record["country"] and loc.get("country"):
            record["country"] = loc["country"]
        # Compare against None: 0 / 0.0 is a valid coordinate and must be kept.
        if record["latitude"] is None and loc.get("latitude") is not None:
            record["latitude"] = loc["latitude"]
        if record["longitude"] is None and loc.get("longitude") is not None:
            record["longitude"] = loc["longitude"]

    # Fallback: Extract country from GHCID location_resolution
    if ghcid and not record["country"]:
        loc_res = ghcid.get("location_resolution") or {}
        if loc_res.get("country_code"):
            record["country"] = loc_res["country_code"]
        # Also try to get city from geonames_name if not set
        if not record["city"] and loc_res.get("geonames_name"):
            record["city"] = loc_res["geonames_name"]

    # Ultimate fallback: Extract country from GHCID string (first 2 chars)
    ghcid_current = record.get("ghcid_current", "")
    if not record["country"] and ghcid_current and len(ghcid_current) >= 2:
        record["country"] = ghcid_current[:2]

    # Extract custodian name consensus
    cn = data.get("custodian_name") or {}
    if cn:
        record["custodian_name"] = cn.get("claim_value", "")
        record["custodian_name_confidence"] = cn.get("confidence")

    # Store complex objects as JSON
    for column, source_key in _JSON_PASSTHROUGH_FIELDS.items():
        value = data.get(source_key)
        if value:
            record[column] = _to_json(value)

    return record


def load_yaml_files(directory: Path, limit: Optional[int] = None) -> list[dict]:
    """Load all YAML files from directory and convert to records.

    Files are processed in sorted name order. Loading is best-effort: a file
    that cannot be read, parsed or converted is reported with a warning and
    skipped, and empty files are skipped silently.

    Args:
        directory: Directory whose ``*.yaml`` files are loaded (not recursive).
        limit: If given and non-zero, only the first ``limit`` files are read.

    Returns:
        One flat record (see ``extract_top_level_fields``) per loaded file.
    """
    records = []
    yaml_files = sorted(directory.glob("*.yaml"))

    if limit:
        yaml_files = yaml_files[:limit]

    total = len(yaml_files)
    print(f"Loading {total} YAML files from {directory}...")

    for count, yaml_file in enumerate(yaml_files, start=1):
        try:
            with open(yaml_file, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)

            if isinstance(data, dict):
                if data:
                    data["_file_name"] = yaml_file.name
                    records.append(extract_top_level_fields(data))
            elif data:
                # A list or scalar document cannot be turned into a record.
                print(f" Warning: Skipping {yaml_file.name}: top-level YAML is not a mapping")

            if count % 100 == 0:
                print(f" Processed {count}/{total} files...")

        except Exception as e:
            # Deliberately broad: one bad file must not abort the whole load.
            print(f" Warning: Failed to load {yaml_file.name}: {e}")
            continue

    print(f"Successfully loaded {len(records)} records")
    return records


def upload_to_ducklake(records: list[dict], table_name: str, mode: str = "replace") -> dict:
    """Upload records to DuckLake via API.

    The records are written to a temporary UTF-8 JSON file which is then sent
    as a multipart form upload to ``{DUCKLAKE_URL}/upload``. The temporary
    file is removed afterwards, whether or not the upload succeeded.

    Args:
        records: Rows to upload.
        table_name: Target table name, sent as the ``table_name`` form field.
        mode: Upload mode, sent as the ``mode`` form field.

    Returns:
        The decoded JSON body of the server response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    print(f"\nUploading {len(records)} records to DuckLake table '{table_name}'...")

    # Write records to temporary JSON file. The encoding is explicit because
    # ensure_ascii=False emits raw non-ASCII characters, which a non-UTF-8
    # locale default encoding may be unable to write.
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False, encoding="utf-8"
    )
    temp_path = tmp.name

    try:
        # Writing happens inside the try so a serialization error cannot
        # leave the delete=False temp file behind.
        with tmp:
            json.dump(records, tmp, ensure_ascii=False, default=str)

        # Upload via multipart form
        with open(temp_path, "rb") as payload:
            files = {"file": ("custodians.json", payload, "application/json")}
            data = {"table_name": table_name, "mode": mode}

            response = requests.post(
                f"{DUCKLAKE_URL}/upload",
                files=files,
                data=data,
                timeout=120
            )

        response.raise_for_status()
        result = response.json()

        print(f"Upload successful: {result}")
        return result

    finally:
        os.unlink(temp_path)


def check_ducklake_status() -> bool:
    """Check if DuckLake server is running.

    Requests ``{DUCKLAKE_URL}/``, prints the reported status and counters,
    and returns True only when the reported status is ``"healthy"``. Any
    failure to connect or to decode the response is printed and yields False.
    """
    try:
        response = requests.get(f"{DUCKLAKE_URL}/", timeout=5)
        status = response.json()
        print(f"DuckLake status: {status.get('status', 'unknown')}")
        print(f" Tables: {status.get('tables', 0)}")
        print(f" Total rows: {status.get('total_rows', 0)}")
        print(f" Snapshots: {status.get('snapshots', 0)}")
        return status.get("status") == "healthy"
    except Exception as e:
        # Broad on purpose: whatever went wrong, the server is not usable.
        print(f"Error connecting to DuckLake at {DUCKLAKE_URL}: {e}")
        return False


def main():
    """Parse the command line, load the YAML files and upload them.

    Exits with status 1 when the source directory is missing, no records
    could be loaded, or DuckLake is not healthy; exits with status 0 when
    the directory contains no YAML files. With ``--dry-run`` the records
    are loaded and a sample is printed, but nothing is uploaded.
    """
    parser = argparse.ArgumentParser(description="Load custodian YAML files to DuckLake")
    parser.add_argument("--dry-run", action="store_true", help="Only load and validate, don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--mode", choices=["replace", "append", "create"], default="replace",
                        help="Upload mode (default: replace)")
    parser.add_argument("--directory", "-d", type=Path, default=None,
                        help="Directory containing custodian YAML files (default: auto-detect)")
    args = parser.parse_args()

    # Determine custodian directory
    custodian_dir = args.directory if args.directory else DEFAULT_CUSTODIAN_DIR

    banner = "=" * 60
    print(banner)
    print("DuckLake Custodian Data Loader")
    print(banner)
    print(f"Source directory: {custodian_dir}")
    print(f"Target table: {TABLE_NAME}")
    print(f"DuckLake URL: {DUCKLAKE_URL}")
    print(f"Mode: {'DRY RUN' if args.dry_run else args.mode.upper()}")
    if args.limit:
        print(f"Limit: {args.limit} files")
    print(banner)

    # Check if directory exists. is_dir() rather than exists(): a regular
    # file at this path is not a usable source directory either.
    if not custodian_dir.is_dir():
        print(f"Error: Directory not found: {custodian_dir}")
        sys.exit(1)

    # Count YAML files
    yaml_count = sum(1 for _ in custodian_dir.glob("*.yaml"))
    print(f"Found {yaml_count} YAML files")

    if yaml_count == 0:
        print("No YAML files found. Exiting.")
        sys.exit(0)

    # Load YAML files
    records = load_yaml_files(custodian_dir, limit=args.limit)

    if not records:
        print("No records loaded. Exiting.")
        sys.exit(1)

    # Show sample record
    print("\nSample record (first):")
    sample = records[0]
    for key in ["file_name", "ghcid_current", "custodian_name", "city", "country"]:
        print(f" {key}: {sample.get(key, 'N/A')}")

    if args.dry_run:
        print("\n[DRY RUN] Would upload to DuckLake. Exiting without upload.")
        return

    # Check DuckLake status
    print("\nChecking DuckLake connection...")
    if not check_ducklake_status():
        print("Error: DuckLake is not available. Please start the server.")
        print(" cd backend/ducklake && python main.py")
        sys.exit(1)

    # Upload to DuckLake
    result = upload_to_ducklake(records, TABLE_NAME, mode=args.mode)

    print("\n" + banner)
    print("Load complete!")
    print(f" Table: {TABLE_NAME}")
    print(f" Rows: {result.get('rows_inserted', len(records))}")
    print(f" Snapshot ID: {result.get('snapshot_id', 'N/A')}")
    print(banner)


if __name__ == "__main__":
    main()