#!/usr/bin/env python3
"""
Load all custodian YAML files into DuckLake as raw enriched data.

This script reads all YAML files from data/custodian/ and uploads them to
DuckLake as a single table for the bronze/raw data lake layer.

Usage:
    python scripts/load_custodians_to_ducklake.py [--dry-run] [--limit N]

API Endpoint: http://localhost:8765/upload
"""

import argparse
import json
import os
import sys
import tempfile
from datetime import datetime
from pathlib import Path

import requests
import yaml

# Configuration
DUCKLAKE_URL = os.getenv("DUCKLAKE_URL", "http://localhost:8765")
DEFAULT_CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
TABLE_NAME = "custodians_raw"

# Single-letter GHCID type codes -> full institution type names.
# Shared by institution-type normalization and the GHCID-string fallback
# (previously duplicated as two identical literal dicts, which risked drift).
_TYPE_CODE_MAP = {
    "G": "GALLERY",
    "L": "LIBRARY",
    "A": "ARCHIVE",
    "M": "MUSEUM",
    "O": "OFFICIAL",
    "R": "RESEARCH",
    "C": "CORPORATION",
    "U": "UNKNOWN",
    "B": "BOTANICAL",
    "E": "EDUCATION",
    "S": "SOCIETY",
    "F": "FEATURES",
    "I": "INTANGIBLE",
    "X": "MIXED",
    "P": "PERSONAL",
    "H": "HOLY_SITES",
    "D": "DIGITAL",
    "N": "NGO",
    "T": "TASTE_SMELL",
}

# Normalize institution type names to standard codes: long-form names and
# legacy GRP.HER codes to short standard codes, single letters to full names.
_TYPE_NORMALIZE = {
    # Full names to shorter standard codes
    "OFFICIAL_INSTITUTION": "OFFICIAL",
    "RESEARCH_CENTER": "RESEARCH",
    "BOTANICAL_ZOO": "BOTANICAL",
    "EDUCATION_PROVIDER": "EDUCATION",
    "COLLECTING_SOCIETY": "SOCIETY",
    "INTANGIBLE_HERITAGE_GROUP": "INTANGIBLE",
    # Single-letter codes to full names
    **_TYPE_CODE_MAP,
    # Legacy GRP.HER format
    "GRP.HER.GAL": "GALLERY",
    "GRP.HER.LIB": "LIBRARY",
    "GRP.HER.ARC": "ARCHIVE",
    "GRP.HER.MUS": "MUSEUM",
    "GRP.HER.DIG": "DIGITAL",
    "GRP.HER.MIX": "MIXED",
    "GRP.HER": "UNKNOWN",
}


def _is_valid_coord(lat, lon) -> bool:
    """Return True when lat/lon are both present and within WGS84 bounds."""
    if lat is None or lon is None:
        return False
    try:
        return -90 <= float(lat) <= 90 and -180 <= float(lon) <= 180
    except (ValueError, TypeError):
        return False


def _coord_pair(mapping: dict) -> tuple:
    """Extract (lat, lon) trying 'latitude'/'longitude' then 'lat'/'lon' keys.

    Uses explicit None checks so a legitimate 0.0 coordinate is not dropped
    as falsy (the previous ``a or b`` pattern discarded zeros).
    """
    lat = mapping.get("latitude")
    if lat is None:
        lat = mapping.get("lat")
    lon = mapping.get("longitude")
    if lon is None:
        lon = mapping.get("lon")
    return lat, lon


def _normalize_type(t: str) -> str:
    """Normalize a (possibly compound) institution type string.

    Handles compound types, e.g. "M,F" -> "MUSEUM,FEATURES".
    """
    if not t:
        return ""
    # Split compound types and normalize each component independently.
    parts = [p.strip() for p in t.upper().split(",")]
    return ",".join(_TYPE_NORMALIZE.get(p, p) for p in parts)


def flatten_for_json(obj, parent_key='', sep='_'):
    """
    Flatten nested dict/list structure for better DuckDB compatibility.
    Keeps complex nested objects as JSON strings.

    Note: only dict inputs are flattened; any other input yields an empty
    dict (this helper is currently unused by the pipeline below).
    """
    items = []
    if isinstance(obj, dict):
        for k, v in obj.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, (dict, list)):
                # Keep complex nested structures as JSON strings
                items.append((new_key, json.dumps(v, ensure_ascii=False, default=str)))
            else:
                items.append((new_key, v))
    return dict(items)


def extract_top_level_fields(data: dict) -> dict:
    """
    Extract and flatten top-level fields from YAML data.
    Complex nested objects are stored as JSON strings.
    """
    record = {
        # File identification
        "file_name": data.get("_file_name", ""),
        "ghcid_current": "",
        "record_id": "",
        # Original entry (flattened)
        "org_name": "",
        "org_type": "",
        "wikidata_id": "",
        # Enrichment status
        "enrichment_status": data.get("enrichment_status", ""),
        "processing_timestamp": data.get("processing_timestamp", ""),
        # Location (from Google Maps)
        "latitude": None,
        "longitude": None,
        "formatted_address": "",
        "city": "",
        "country": "",
        "postal_code": "",
        # Custodian name consensus
        "custodian_name": "",
        "custodian_name_confidence": None,
        "emic_name": "",  # Official name in native/local language
        "name_language": "",  # ISO 639-1 language code for emic_name
        # Ratings
        "google_rating": None,
        "google_total_ratings": None,
        # TimeSpan (CIDOC-CRM E52_Time-Span)
        "timespan_begin": None,  # begin_of_the_begin (P82a)
        "timespan_end": None,  # begin_of_the_end (P81b) - destruction/closure
        "timespan_notes": "",
        "timespan_json": "",
        # Conflict-related temporal data (Palestinian heritage, etc.)
        "time_of_destruction_json": "",
        "conflict_status_json": "",
        "destruction_date": None,  # From time_of_destruction.date or conflict_status.date
        # Temporal extent (founding/dissolution dates)
        "founding_date": None,
        "dissolution_date": None,
        "temporal_extent_json": "",
        # Wikidata inception (P571)
        "wikidata_inception": None,
        # YouTube enrichment fields (extracted for querying)
        "youtube_channel_id": "",
        "youtube_channel_title": "",
        "youtube_channel_url": "",
        "youtube_subscriber_count": None,
        "youtube_video_count": None,
        "youtube_view_count": None,
        "youtube_published_at": None,
        "youtube_description": "",
        # Google Maps extended fields (in addition to rating/total_ratings)
        "google_place_id": "",
        "google_business_status": "",
        "google_website": "",
        "google_phone_international": "",
        "google_primary_type": "",
        "google_opening_hours_json": "",
        "google_reviews_json": "",
        "google_photo_count": None,
        # Complex nested objects as JSON strings
        "original_entry_json": "",
        "wikidata_enrichment_json": "",
        "google_maps_enrichment_json": "",
        "youtube_enrichment_json": "",
        "web_enrichment_json": "",
        "web_claims_json": "",
        "ghcid_json": "",
        "identifiers_json": "",
        "provenance_json": "",
        "genealogiewerkbalk_json": "",
        "digital_platforms_json": "",
        "service_area_json": "",
        "unesco_mow_enrichment_json": "",  # UNESCO Memory of the World inscriptions
    }

    # Extract GHCID
    ghcid = data.get("ghcid", {})
    if ghcid:
        record["ghcid_current"] = ghcid.get("ghcid_current", "")
        record["record_id"] = ghcid.get("record_id", "")
        record["ghcid_json"] = json.dumps(ghcid, ensure_ascii=False, default=str)

    # Extract original entry
    original = data.get("original_entry", {})
    if original:
        # Try multiple field names for organization name
        record["org_name"] = original.get("name", "") or original.get("organisatie", "")

        # Try multiple field names for institution type
        # First try 'institution_type' (CH-Annotator format), then 'type' (older format)
        inst_type = original.get("institution_type", "") or original.get("type", "")
        if isinstance(inst_type, list):
            inst_type = ",".join(inst_type)
        else:
            inst_type = str(inst_type) if inst_type else ""

        record["org_type"] = _normalize_type(inst_type)
        record["wikidata_id"] = original.get("wikidata_id", "")
        record["original_entry_json"] = json.dumps(original, ensure_ascii=False, default=str)

    # Fallback: Extract org_type from GHCID code (4th component)
    # GHCID format: CC-RR-CCC-T-ABBREV where T is the type code
    if not record["org_type"] and record.get("ghcid_current"):
        ghcid_parts = record["ghcid_current"].split("-")
        if len(ghcid_parts) >= 4:
            type_code = ghcid_parts[3]
            record["org_type"] = _TYPE_CODE_MAP.get(type_code, type_code)

    # ==========================================================================
    # COORDINATE EXTRACTION - Priority order (first valid wins)
    # ==========================================================================
    # 1a. google_maps_enrichment.coordinates.latitude/longitude (nested)
    # 1b. google_maps_enrichment.latitude/longitude (flat - Argentine files)
    # 2.  ghcid.location_resolution.source_coordinates.latitude/longitude
    # 3.  wikidata_enrichment.wikidata_coordinates.latitude/longitude
    # 4.  locations[0].latitude/longitude OR locations[0].lat/lon
    # 5.  original_entry.locations[0].latitude/longitude
    # 6.  root-level latitude/longitude
    # ==========================================================================

    # 1. Extract Google Maps data (highest priority for coordinates)
    gm = data.get("google_maps_enrichment", {})
    if gm:
        # 1a. Try nested structure first: google_maps_enrichment.coordinates.latitude
        coords = gm.get("coordinates", {})
        lat = coords.get("latitude")
        lon = coords.get("longitude")
        if _is_valid_coord(lat, lon):
            record["latitude"] = lat
            record["longitude"] = lon

        # 1b. Fallback to flat structure: google_maps_enrichment.latitude
        # (used by Argentine and other recent enrichments)
        if record["latitude"] is None:
            lat = gm.get("latitude")
            lon = gm.get("longitude")
            if _is_valid_coord(lat, lon):
                record["latitude"] = lat
                record["longitude"] = lon

        record["formatted_address"] = gm.get("formatted_address", "")
        record["google_rating"] = gm.get("rating")
        record["google_total_ratings"] = gm.get("total_ratings")

        # Extract city and country from address components
        for comp in gm.get("address_components", []):
            types = comp.get("types", [])
            if "locality" in types:
                record["city"] = comp.get("long_name", "")
            if "country" in types:
                record["country"] = comp.get("short_name", "")
            if "postal_code" in types:
                record["postal_code"] = comp.get("long_name", "")

        record["google_maps_enrichment_json"] = json.dumps(gm, ensure_ascii=False, default=str)

        # Extract extended Google Maps fields
        record["google_place_id"] = gm.get("place_id", "")
        record["google_business_status"] = gm.get("business_status", "")
        record["google_website"] = gm.get("website", "")
        record["google_phone_international"] = gm.get("phone_international", "")
        record["google_primary_type"] = gm.get("primary_type", "")
        record["google_photo_count"] = gm.get("photo_count")

        # Opening hours as JSON (complex nested structure)
        if gm.get("opening_hours"):
            record["google_opening_hours_json"] = json.dumps(
                gm["opening_hours"], ensure_ascii=False, default=str
            )

        # Reviews as JSON array
        if gm.get("reviews"):
            record["google_reviews_json"] = json.dumps(
                gm["reviews"], ensure_ascii=False, default=str
            )

    # ==========================================================================
    # YOUTUBE ENRICHMENT EXTRACTION
    # ==========================================================================
    yt = data.get("youtube_enrichment", {})
    if yt:
        record["youtube_enrichment_json"] = json.dumps(yt, ensure_ascii=False, default=str)

        # Extract channel data
        channel = yt.get("channel", {})
        if channel:
            record["youtube_channel_id"] = channel.get("channel_id", "")
            record["youtube_channel_title"] = channel.get("title", "")
            record["youtube_channel_url"] = channel.get("channel_url", "")
            record["youtube_subscriber_count"] = channel.get("subscriber_count")
            record["youtube_video_count"] = channel.get("video_count")
            record["youtube_view_count"] = channel.get("view_count")
            record["youtube_published_at"] = channel.get("published_at")
            record["youtube_description"] = channel.get("description", "")

    # 2. Fallback: GHCID location_resolution.source_coordinates
    if ghcid and record["latitude"] is None:
        loc_res = ghcid.get("location_resolution", {})
        src_coords = loc_res.get("source_coordinates", {})
        lat = src_coords.get("latitude")
        lon = src_coords.get("longitude")
        if _is_valid_coord(lat, lon):
            record["latitude"] = lat
            record["longitude"] = lon

    # 3. Fallback: Wikidata coordinates
    wd = data.get("wikidata_enrichment", {})
    if wd and record["latitude"] is None:
        wd_coords = wd.get("wikidata_coordinates", {})
        lat = wd_coords.get("latitude")
        lon = wd_coords.get("longitude")
        if _is_valid_coord(lat, lon):
            record["latitude"] = lat
            record["longitude"] = lon

    # 4. Fallback: locations array
    locations = data.get("locations", [])
    if locations and isinstance(locations, list) and len(locations) > 0:
        loc = locations[0]  # Use first location
        if not record["city"] and loc.get("city"):
            record["city"] = loc.get("city", "")
        if not record["country"] and loc.get("country"):
            record["country"] = loc.get("country", "")
        if record["latitude"] is None:
            # Try latitude/longitude first, then lat/lon
            lat, lon = _coord_pair(loc)
            if _is_valid_coord(lat, lon):
                record["latitude"] = lat
                record["longitude"] = lon

    # 5. Fallback: original_entry.locations array (Japanese files, etc.)
    orig_locations = original.get("locations", []) if original else []
    if orig_locations and isinstance(orig_locations, list) and len(orig_locations) > 0:
        orig_loc = orig_locations[0]
        if record["latitude"] is None:
            lat, lon = _coord_pair(orig_loc)
            if _is_valid_coord(lat, lon):
                record["latitude"] = lat
                record["longitude"] = lon
        # Also try to get city/country from original_entry.locations if not set
        if not record["city"] and orig_loc.get("city"):
            record["city"] = orig_loc.get("city", "")
        if not record["country"] and orig_loc.get("country"):
            record["country"] = orig_loc.get("country", "")

    # 6. Fallback: Root-level coordinates
    if record["latitude"] is None:
        lat, lon = _coord_pair(data)
        if _is_valid_coord(lat, lon):
            record["latitude"] = lat
            record["longitude"] = lon

    # ==========================================================================
    # COUNTRY/CITY EXTRACTION - Fallbacks from GHCID
    # ==========================================================================

    # Fallback: Extract country from GHCID location_resolution
    if ghcid and not record["country"]:
        loc_res = ghcid.get("location_resolution", {})
        if loc_res.get("country_code"):
            record["country"] = loc_res.get("country_code", "")
        # Also try to get city from geonames_name if not set
        if not record["city"] and loc_res.get("geonames_name"):
            record["city"] = loc_res.get("geonames_name", "")

    # Ultimate fallback: Extract country from GHCID string (first 2 chars)
    ghcid_current = record.get("ghcid_current", "")
    if not record["country"] and ghcid_current and len(ghcid_current) >= 2:
        record["country"] = ghcid_current[:2]

    # Extract custodian name consensus
    cn = data.get("custodian_name", {})
    if cn:
        record["custodian_name"] = cn.get("claim_value", "")
        record["custodian_name_confidence"] = cn.get("confidence")
        record["emic_name"] = cn.get("emic_name", "")
        record["name_language"] = cn.get("name_language", "")

    # Store complex objects as JSON
    if data.get("wikidata_enrichment"):
        record["wikidata_enrichment_json"] = json.dumps(
            data["wikidata_enrichment"], ensure_ascii=False, default=str
        )
    if data.get("web_enrichment"):
        record["web_enrichment_json"] = json.dumps(
            data["web_enrichment"], ensure_ascii=False, default=str
        )
    if data.get("web_claims"):
        record["web_claims_json"] = json.dumps(
            data["web_claims"], ensure_ascii=False, default=str
        )
    if data.get("identifiers"):
        record["identifiers_json"] = json.dumps(
            data["identifiers"], ensure_ascii=False, default=str
        )
    if data.get("provenance"):
        record["provenance_json"] = json.dumps(
            data["provenance"], ensure_ascii=False, default=str
        )
    if data.get("genealogiewerkbalk_enrichment"):
        record["genealogiewerkbalk_json"] = json.dumps(
            data["genealogiewerkbalk_enrichment"], ensure_ascii=False, default=str
        )
    if data.get("digital_platforms"):
        record["digital_platforms_json"] = json.dumps(
            data["digital_platforms"], ensure_ascii=False, default=str
        )
    if data.get("service_area"):
        record["service_area_json"] = json.dumps(
            data["service_area"], ensure_ascii=False, default=str
        )

    # Extract UNESCO Memory of the World enrichment data
    if data.get("unesco_mow_enrichment"):
        record["unesco_mow_enrichment_json"] = json.dumps(
            data["unesco_mow_enrichment"], ensure_ascii=False, default=str
        )

    # Extract TimeSpan (CIDOC-CRM E52_Time-Span)
    timespan = data.get("timespan", {})
    if timespan:
        # Primary dates for timeline features
        record["timespan_begin"] = timespan.get("begin_of_the_begin")  # Founding date (P82a)
        record["timespan_end"] = timespan.get("begin_of_the_end")  # Destruction/closure date (P81b)
        record["timespan_notes"] = timespan.get("notes", "")
        record["timespan_json"] = json.dumps(timespan, ensure_ascii=False, default=str)

    # ==========================================================================
    # TEMPORAL DATA EXTRACTION - Multiple paths
    # ==========================================================================

    # Extract time_of_destruction (conflict-related: PS-GZ-*, PS-GZA-* files)
    time_of_destruction = data.get("time_of_destruction", {})
    if time_of_destruction:
        record["time_of_destruction_json"] = json.dumps(time_of_destruction, ensure_ascii=False, default=str)
        # Extract destruction date
        if time_of_destruction.get("date"):
            record["destruction_date"] = time_of_destruction.get("date")

    # Extract conflict_status (current operational status)
    conflict_status = data.get("conflict_status", {})
    if conflict_status:
        record["conflict_status_json"] = json.dumps(conflict_status, ensure_ascii=False, default=str)
        # If status is 'destroyed' and we don't have destruction_date yet, use this
        if conflict_status.get("status") == "destroyed" and not record.get("destruction_date"):
            record["destruction_date"] = conflict_status.get("date")

    # Extract temporal_extent (founding/dissolution dates)
    temporal_extent = data.get("temporal_extent", {})
    if temporal_extent:
        record["temporal_extent_json"] = json.dumps(temporal_extent, ensure_ascii=False, default=str)
        record["founding_date"] = temporal_extent.get("founding_date")
        record["dissolution_date"] = temporal_extent.get("dissolution_date") or temporal_extent.get("end_date")

    # Fallback: Check identifiers for temporal_extent
    identifiers = data.get("identifiers", {})
    if identifiers and isinstance(identifiers, dict):
        id_temporal = identifiers.get("temporal_extent", {})
        if id_temporal and not record.get("founding_date"):
            record["founding_date"] = id_temporal.get("founding_date")
        if id_temporal and not record.get("dissolution_date"):
            record["dissolution_date"] = id_temporal.get("dissolution_date") or id_temporal.get("end_date")
        # Also check for founding_year in identifiers
        if identifiers.get("founding_year") and not record.get("founding_date"):
            # Convert year to date format
            record["founding_date"] = f"{identifiers['founding_year']}-01-01"

    # Extract wikidata_inception from wikidata_enrichment
    if wd:
        # Direct wikidata_inception field
        if wd.get("wikidata_inception"):
            record["wikidata_inception"] = wd.get("wikidata_inception")
        # Or from wikidata_claims.inception
        elif wd.get("wikidata_claims", {}).get("inception"):
            record["wikidata_inception"] = wd.get("wikidata_claims", {}).get("inception")

    # Fallback: Check web_enrichment claims for inception or founding_date
    web_enrichment = data.get("web_enrichment", {})
    if web_enrichment and web_enrichment.get("claims"):
        for claim in web_enrichment.get("claims", []):
            claim_type = claim.get("claim_type", "")
            if claim_type in ("inception", "founding_date") and not record.get("founding_date"):
                record["founding_date"] = claim.get("claim_value")
                break

    # Final consolidation: If we have timespan_begin but no founding_date, use it
    if record.get("timespan_begin") and not record.get("founding_date"):
        record["founding_date"] = record["timespan_begin"]

    # If we have timespan_end but no dissolution_date, use it
    if record.get("timespan_end") and not record.get("dissolution_date"):
        record["dissolution_date"] = record["timespan_end"]

    # If we have destruction_date but no dissolution_date, use it
    if record.get("destruction_date") and not record.get("dissolution_date"):
        record["dissolution_date"] = record["destruction_date"]

    return record


def load_yaml_files(directory: Path, limit: int | None = None) -> list[dict]:
    """Load all YAML files from directory and convert to records."""
    records = []
    yaml_files = sorted(directory.glob("*.yaml"))
    if limit:
        yaml_files = yaml_files[:limit]

    print(f"Loading {len(yaml_files)} YAML files from {directory}...")

    for i, yaml_file in enumerate(yaml_files):
        try:
            with open(yaml_file, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
            if data:
                data["_file_name"] = yaml_file.name
                record = extract_top_level_fields(data)
                records.append(record)
            if (i + 1) % 100 == 0:
                print(f"  Processed {i + 1}/{len(yaml_files)} files...")
        except Exception as e:
            # Best-effort loader: report and keep going on malformed files.
            print(f"  Warning: Failed to load {yaml_file.name}: {e}")
            continue

    print(f"Successfully loaded {len(records)} records")
    return records


def upload_to_ducklake(records: list[dict], table_name: str, mode: str = "replace") -> dict:
    """Upload records to DuckLake via API."""
    print(f"\nUploading {len(records)} records to DuckLake table '{table_name}'...")

    # Write records to temporary JSON file.
    # encoding="utf-8" is required because ensure_ascii=False emits raw
    # non-ASCII characters; the platform default encoding may not accept them.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False, encoding="utf-8"
    ) as f:
        json.dump(records, f, ensure_ascii=False, default=str)
        temp_path = f.name

    try:
        # Upload via multipart form
        with open(temp_path, "rb") as f:
            files = {"file": ("custodians.json", f, "application/json")}
            data = {"table_name": table_name, "mode": mode}
            response = requests.post(
                f"{DUCKLAKE_URL}/upload",
                files=files,
                data=data,
                timeout=120
            )
        response.raise_for_status()
        result = response.json()
        print(f"Upload successful: {result}")
        return result
    finally:
        # Always remove the temp file, even if the upload failed.
        os.unlink(temp_path)


def check_ducklake_status() -> bool:
    """Check if DuckLake server is running and healthy."""
    try:
        response = requests.get(f"{DUCKLAKE_URL}/", timeout=5)
        status = response.json()
        print(f"DuckLake status: {status.get('status', 'unknown')}")
        print(f"  Tables: {status.get('tables', 0)}")
        print(f"  Total rows: {status.get('total_rows', 0)}")
        print(f"  Snapshots: {status.get('snapshots', 0)}")
        return status.get("status") == "healthy"
    except Exception as e:
        print(f"Error connecting to DuckLake at {DUCKLAKE_URL}: {e}")
        return False


def main():
    parser = argparse.ArgumentParser(description="Load custodian YAML files to DuckLake")
    parser.add_argument("--dry-run", action="store_true",
                        help="Only load and validate, don't upload")
    parser.add_argument("--limit", type=int,
                        help="Limit number of files to process")
    parser.add_argument("--mode", choices=["replace", "append", "create"], default="replace",
                        help="Upload mode (default: replace)")
    parser.add_argument("--directory", "-d", type=Path, default=None,
                        help="Directory containing custodian YAML files (default: auto-detect)")
    args = parser.parse_args()

    # Determine custodian directory
    if args.directory:
        custodian_dir = args.directory
    else:
        custodian_dir = DEFAULT_CUSTODIAN_DIR

    print("=" * 60)
    print("DuckLake Custodian Data Loader")
    print("=" * 60)
    print(f"Source directory: {custodian_dir}")
    print(f"Target table: {TABLE_NAME}")
    print(f"DuckLake URL: {DUCKLAKE_URL}")
    print(f"Mode: {'DRY RUN' if args.dry_run else args.mode.upper()}")
    if args.limit:
        print(f"Limit: {args.limit} files")
    print("=" * 60)

    # Check if directory exists
    if not custodian_dir.exists():
        print(f"Error: Directory not found: {custodian_dir}")
        sys.exit(1)

    # Count YAML files
    yaml_count = len(list(custodian_dir.glob("*.yaml")))
    print(f"Found {yaml_count} YAML files")
    if yaml_count == 0:
        print("No YAML files found. Exiting.")
        sys.exit(0)

    # Load YAML files
    records = load_yaml_files(custodian_dir, limit=args.limit)
    if not records:
        print("No records loaded. Exiting.")
        sys.exit(1)

    # Show sample record
    print("\nSample record (first):")
    sample = records[0]
    for key in ["file_name", "ghcid_current", "custodian_name", "emic_name",
                "name_language", "city", "country", "google_rating", "youtube_channel_id"]:
        value = sample.get(key, 'N/A')
        if value == "" or value is None:
            value = "(empty)"
        print(f"  {key}: {value}")

    # Count non-empty enrichment fields
    yt_count = sum(1 for r in records if r.get("youtube_channel_id"))
    gm_count = sum(1 for r in records if r.get("google_place_id"))
    coord_count = sum(1 for r in records if r.get("latitude") is not None)
    emic_count = sum(1 for r in records if r.get("emic_name"))
    print("\nEnrichment summary:")
    print(f"  With coordinates: {coord_count}/{len(records)}")
    print(f"  With Google Maps: {gm_count}/{len(records)}")
    print(f"  With YouTube: {yt_count}/{len(records)}")
    print(f"  With emic_name: {emic_count}/{len(records)}")

    if args.dry_run:
        print("\n[DRY RUN] Would upload to DuckLake. Exiting without upload.")
        return

    # Check DuckLake status
    print("\nChecking DuckLake connection...")
    if not check_ducklake_status():
        print("Error: DuckLake is not available. Please start the server.")
        print("  cd backend/ducklake && python main.py")
        sys.exit(1)

    # Upload to DuckLake
    result = upload_to_ducklake(records, TABLE_NAME, mode=args.mode)

    print("\n" + "=" * 60)
    print("Load complete!")
    print(f"  Table: {TABLE_NAME}")
    print(f"  Rows: {result.get('rows_inserted', len(records))}")
    print(f"  Snapshot ID: {result.get('snapshot_id', 'N/A')}")
    print("=" * 60)


if __name__ == "__main__":
    main()