- Add fix_yaml_history.py and fix_yaml_history_v2.py for cleaning up malformed ghcid_history entries with duplicate/redundant data - Update load_custodians_to_ducklake.py for DuckDB lakehouse loading - Update migrate_web_archives.py for web archive management - Update deploy.sh with improvements - Ignore entire data/ducklake/ directory (generated databases)
396 lines
14 KiB
Python
396 lines
14 KiB
Python
#!/usr/bin/env python3
"""
Load all custodian YAML files into DuckLake as raw enriched data.

This script reads all YAML files from data/custodian/ and uploads them
to DuckLake as a single table for the bronze/raw data lake layer.

Usage:
    python scripts/load_custodians_to_ducklake.py [--dry-run] [--limit N]
        [--mode {replace,append,create}] [--directory DIR]

API Endpoint: http://localhost:8765/upload
    (the base URL can be overridden with the DUCKLAKE_URL environment variable)
"""

import argparse
import json
import os
import sys
import tempfile
from datetime import datetime
from pathlib import Path

import requests
import yaml

# Configuration
# Base URL of the DuckLake API. A trailing slash is stripped so that
# f"{DUCKLAKE_URL}/upload" never contains a double slash.
DUCKLAKE_URL = os.getenv("DUCKLAKE_URL", "http://localhost:8765").rstrip("/")
# Default source directory: data/custodian under the parent of this script's directory.
DEFAULT_CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
# Name of the table the records are uploaded to.
TABLE_NAME = "custodians_raw"


def flatten_for_json(obj, parent_key='', sep='_'):
    """
    Flatten one level of a dict for better DuckDB compatibility.

    Scalar values are kept as-is; nested dicts and lists are serialized to
    JSON strings (values that are not JSON-native are stringified through
    ``default=str``).

    Args:
        obj: Mapping to flatten. Any non-dict input yields an empty dict.
        parent_key: Optional prefix prepended to every output key.
        sep: Separator placed between ``parent_key`` and the original key.

    Returns:
        A new dict whose keys are always strings.
    """
    if not isinstance(obj, dict):
        return {}

    flat = {}
    for key, value in obj.items():
        # YAML permits non-string keys (ints, dates, ...). Always produce a
        # string key so the result has one key type with or without a prefix.
        column = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, (dict, list)):
            # Keep complex nested structures as JSON strings.
            value = json.dumps(value, ensure_ascii=False, default=str)
        flat[column] = value
    return flat


# Long institution type names mapped to the standard short names.
_INSTITUTION_TYPE_ALIASES = {
    "OFFICIAL_INSTITUTION": "OFFICIAL",
    "RESEARCH_CENTER": "RESEARCH",
    "BOTANICAL_ZOO": "BOTANICAL",
    "EDUCATION_PROVIDER": "EDUCATION",
    "COLLECTING_SOCIETY": "SOCIETY",
    "INTANGIBLE_HERITAGE_GROUP": "INTANGIBLE",
}

# Single-letter GHCID type codes mapped to full type names.
_GHCID_TYPE_CODES = {
    "G": "GALLERY", "L": "LIBRARY", "A": "ARCHIVE", "M": "MUSEUM",
    "O": "OFFICIAL", "R": "RESEARCH", "C": "CORPORATION", "U": "UNKNOWN",
    "B": "BOTANICAL", "E": "EDUCATION", "S": "SOCIETY", "F": "FEATURES",
    "I": "INTANGIBLE", "X": "MIXED", "P": "PERSONAL", "H": "HOLY_SITES",
    "D": "DIGITAL", "N": "NGO", "T": "TASTE_SMELL",
}

# (key in the YAML data, output column) pairs stored verbatim as JSON strings.
_JSON_PASSTHROUGH_COLUMNS = (
    ("wikidata_enrichment", "wikidata_enrichment_json"),
    ("web_enrichment", "web_enrichment_json"),
    ("web_claims", "web_claims_json"),
    ("identifiers", "identifiers_json"),
    ("provenance", "provenance_json"),
    ("genealogiewerkbalk_enrichment", "genealogiewerkbalk_json"),
    ("digital_platforms", "digital_platforms_json"),
)


def _as_dict(value):
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _to_json(value):
    """Serialize *value* to a JSON string, stringifying non-JSON-native types."""
    return json.dumps(value, ensure_ascii=False, default=str)


def extract_top_level_fields(data: dict) -> dict:
    """
    Extract and flatten top-level fields from YAML data.

    Scalar columns are filled from several sections with this precedence:
    Google Maps enrichment, then the first entry of ``locations``, then the
    GHCID ``location_resolution``, and finally the GHCID string itself.
    Complex nested objects are stored as JSON strings.

    Sections that are present but null or not of the expected dict/list
    shape are skipped instead of raising, so one malformed section does not
    discard the whole record.

    Args:
        data: Parsed YAML document (a dict). ``_file_name`` is read if present.

    Returns:
        A flat dict with a fixed set of keys; missing values are ``""`` for
        text columns and ``None`` for numeric columns.
    """
    record = {
        # File identification
        "file_name": data.get("_file_name", ""),
        "ghcid_current": "",
        "record_id": "",

        # Original entry (flattened)
        "org_name": "",
        "org_type": "",
        "wikidata_id": "",

        # Enrichment status
        "enrichment_status": data.get("enrichment_status", ""),
        "processing_timestamp": data.get("processing_timestamp", ""),

        # Location (from Google Maps)
        "latitude": None,
        "longitude": None,
        "formatted_address": "",
        "city": "",
        "country": "",
        "postal_code": "",

        # Custodian name consensus
        "custodian_name": "",
        "custodian_name_confidence": None,

        # Ratings
        "google_rating": None,
        "google_total_ratings": None,

        # Complex nested objects as JSON strings
        "original_entry_json": "",
        "wikidata_enrichment_json": "",
        "google_maps_enrichment_json": "",
        "web_enrichment_json": "",
        "web_claims_json": "",
        "ghcid_json": "",
        "identifiers_json": "",
        "provenance_json": "",
        "genealogiewerkbalk_json": "",
        "digital_platforms_json": "",
    }

    # Extract GHCID
    ghcid = _as_dict(data.get("ghcid"))
    if ghcid:
        record["ghcid_current"] = ghcid.get("ghcid_current", "")
        record["record_id"] = ghcid.get("record_id", "")
        record["ghcid_json"] = _to_json(ghcid)

    # Only a string GHCID can be split/sliced by the fallbacks below.
    ghcid_current = record["ghcid_current"]
    if not isinstance(ghcid_current, str):
        ghcid_current = ""

    # Extract original entry
    original = _as_dict(data.get("original_entry"))
    if original:
        # Try multiple field names for organization name
        record["org_name"] = original.get("name", "") or original.get("organisatie", "")

        # Try multiple field names for institution type:
        # first 'institution_type' (CH-Annotator format), then 'type' (older format)
        inst_type = original.get("institution_type", "") or original.get("type", "")
        if isinstance(inst_type, list):
            # str() each item so a non-string list entry cannot break the join.
            inst_type = ",".join(str(item) for item in inst_type)
        else:
            inst_type = str(inst_type) if inst_type else ""

        # Normalize institution type names to standard codes
        type_key = inst_type.upper()
        record["org_type"] = _INSTITUTION_TYPE_ALIASES.get(type_key, type_key)

        record["wikidata_id"] = original.get("wikidata_id", "")
        record["original_entry_json"] = _to_json(original)

    # Fallback: Extract org_type from GHCID code (4th component)
    # GHCID format: CC-RR-CCC-T-ABBREV where T is the type code
    if not record["org_type"] and ghcid_current:
        ghcid_parts = ghcid_current.split("-")
        if len(ghcid_parts) >= 4:
            type_code = ghcid_parts[3]
            record["org_type"] = _GHCID_TYPE_CODES.get(type_code, type_code)

    # Extract Google Maps data
    gm = _as_dict(data.get("google_maps_enrichment"))
    if gm:
        # 'coordinates' may be present but null in the YAML.
        coords = _as_dict(gm.get("coordinates"))
        record["latitude"] = coords.get("latitude")
        record["longitude"] = coords.get("longitude")
        record["formatted_address"] = gm.get("formatted_address", "")
        record["google_rating"] = gm.get("rating")
        record["google_total_ratings"] = gm.get("total_ratings")

        # Extract city, country and postal code from address components
        for comp in gm.get("address_components") or []:
            if not isinstance(comp, dict):
                continue
            types = comp.get("types") or []
            if "locality" in types:
                record["city"] = comp.get("long_name", "")
            if "country" in types:
                record["country"] = comp.get("short_name", "")
            if "postal_code" in types:
                record["postal_code"] = comp.get("long_name", "")

        record["google_maps_enrichment_json"] = _to_json(gm)

    # Fallback: Extract location from locations array if not set from Google Maps
    locations = data.get("locations")
    if isinstance(locations, list) and locations:
        loc = _as_dict(locations[0])  # Use first location
        if not record["city"] and loc.get("city"):
            record["city"] = loc["city"]
        if not record["country"] and loc.get("country"):
            record["country"] = loc["country"]
        # Compare against None: 0 / 0.0 are valid coordinates.
        if record["latitude"] is None and loc.get("latitude") is not None:
            record["latitude"] = loc["latitude"]
        if record["longitude"] is None and loc.get("longitude") is not None:
            record["longitude"] = loc["longitude"]

    # Fallback: Extract country and city from GHCID location_resolution.
    # Each field is filled independently, so a known country does not
    # prevent the city fallback from running.
    loc_res = _as_dict(ghcid.get("location_resolution"))
    if not record["country"] and loc_res.get("country_code"):
        record["country"] = loc_res["country_code"]
    if not record["city"] and loc_res.get("geonames_name"):
        record["city"] = loc_res["geonames_name"]

    # Ultimate fallback: Extract country from GHCID string (first 2 chars)
    if not record["country"] and len(ghcid_current) >= 2:
        record["country"] = ghcid_current[:2]

    # Extract custodian name consensus
    cn = _as_dict(data.get("custodian_name"))
    if cn:
        record["custodian_name"] = cn.get("claim_value", "")
        record["custodian_name_confidence"] = cn.get("confidence")

    # Store complex objects as JSON
    for source_key, column in _JSON_PASSTHROUGH_COLUMNS:
        value = data.get(source_key)
        if value:
            record[column] = _to_json(value)

    return record


def load_yaml_files(directory: Path, limit: int = None) -> list[dict]:
    """Load all YAML files from directory and convert to records.

    Files matching ``*.yaml`` directly inside ``directory`` are processed in
    sorted order. Loading is best-effort: a file that cannot be read, parsed
    or converted is reported with a warning and skipped. Empty documents are
    skipped silently.

    Args:
        directory: Directory to scan.
        limit: If truthy, only the first ``limit`` files are processed.

    Returns:
        One flat record (see ``extract_top_level_fields``) per loaded file.
    """
    yaml_files = sorted(directory.glob("*.yaml"))
    if limit:
        yaml_files = yaml_files[:limit]

    total = len(yaml_files)
    print(f"Loading {total} YAML files from {directory}...")

    records = []
    for index, yaml_file in enumerate(yaml_files, start=1):
        try:
            with open(yaml_file, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)

            if data:
                if not isinstance(data, dict):
                    # A top-level list or scalar cannot be turned into a record.
                    raise TypeError(
                        f"expected a mapping at the top level, got {type(data).__name__}"
                    )
                data["_file_name"] = yaml_file.name
                records.append(extract_top_level_fields(data))
        except Exception as e:
            # Deliberately broad: one bad file must not abort the whole load.
            print(f"  Warning: Failed to load {yaml_file.name}: {e}")

        # Reported outside the try so a failing file does not skip the progress line.
        if index % 100 == 0:
            print(f"  Processed {index}/{total} files...")

    print(f"Successfully loaded {len(records)} records")
    return records


def upload_to_ducklake(records: list[dict], table_name: str, mode: str = "replace") -> dict:
    """Upload records to DuckLake via API.

    The records are written to a temporary JSON file, which is then sent as a
    multipart form upload to ``{DUCKLAKE_URL}/upload``. The temporary file is
    removed afterwards, whether or not the upload succeeded.

    Args:
        records: Records to upload; serialized as a single JSON array.
        table_name: Value sent as the ``table_name`` form field.
        mode: Value sent as the ``mode`` form field.

    Returns:
        The decoded JSON body of the server response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    print(f"\nUploading {len(records)} records to DuckLake table '{table_name}'...")

    temp_path = None
    try:
        # Write records to temporary JSON file. The encoding is explicit because
        # ensure_ascii=False emits raw non-ASCII characters, which would fail or
        # yield invalid JSON under a non-UTF-8 locale default.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", delete=False, encoding="utf-8"
        ) as tmp:
            # Record the path first so the finally clause can clean up even if
            # serialization fails part-way through.
            temp_path = tmp.name
            json.dump(records, tmp, ensure_ascii=False, default=str)

        # Upload via multipart form
        with open(temp_path, "rb") as payload:
            files = {"file": ("custodians.json", payload, "application/json")}
            data = {"table_name": table_name, "mode": mode}
            response = requests.post(
                f"{DUCKLAKE_URL}/upload",
                files=files,
                data=data,
                timeout=120,
            )

        response.raise_for_status()
        result = response.json()
        print(f"Upload successful: {result}")
        return result
    finally:
        if temp_path is not None:
            os.unlink(temp_path)


def check_ducklake_status() -> bool:
    """Check if DuckLake server is running.

    Requests ``{DUCKLAKE_URL}/`` and prints the reported status, table count,
    row count and snapshot count.

    Returns:
        True only if the response is a JSON object whose ``status`` field is
        ``"healthy"``; False on any connection error, non-JSON body or
        unexpected payload.
    """
    try:
        response = requests.get(f"{DUCKLAKE_URL}/", timeout=5)
        # Response.json() raises a ValueError subclass when the body is not JSON.
        status = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"Error connecting to DuckLake at {DUCKLAKE_URL}: {e}")
        return False

    if not isinstance(status, dict):
        # Valid JSON but not an object (e.g. a list): there are no fields to read.
        print(f"Error connecting to DuckLake at {DUCKLAKE_URL}: unexpected response {status!r}")
        return False

    print(f"DuckLake status: {status.get('status', 'unknown')}")
    print(f"  Tables: {status.get('tables', 0)}")
    print(f"  Total rows: {status.get('total_rows', 0)}")
    print(f"  Snapshots: {status.get('snapshots', 0)}")
    return status.get("status") == "healthy"


def main():
    """Command-line entry point: load custodian YAML files and upload them.

    Steps: parse arguments, verify the source directory, load the YAML files
    into records, print a sample, and (unless ``--dry-run``) check that the
    DuckLake server is healthy and upload the records.

    Exits with status 1 if the directory is missing, no records could be
    loaded, the server is unavailable, or the upload fails; exits with
    status 0 if the directory contains no YAML files.
    """
    parser = argparse.ArgumentParser(description="Load custodian YAML files to DuckLake")
    parser.add_argument("--dry-run", action="store_true", help="Only load and validate, don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--mode", choices=["replace", "append", "create"], default="replace",
                        help="Upload mode (default: replace)")
    parser.add_argument("--directory", "-d", type=Path, default=None,
                        help="Directory containing custodian YAML files (default: auto-detect)")
    args = parser.parse_args()

    # Determine custodian directory
    custodian_dir = args.directory if args.directory else DEFAULT_CUSTODIAN_DIR

    print("=" * 60)
    print("DuckLake Custodian Data Loader")
    print("=" * 60)
    print(f"Source directory: {custodian_dir}")
    print(f"Target table: {TABLE_NAME}")
    print(f"DuckLake URL: {DUCKLAKE_URL}")
    print(f"Mode: {'DRY RUN' if args.dry_run else args.mode.upper()}")
    if args.limit:
        print(f"Limit: {args.limit} files")
    print("=" * 60)

    # Check if directory exists. is_dir() also rejects a path that points at a
    # regular file, which exists() would have accepted.
    if not custodian_dir.is_dir():
        print(f"Error: Directory not found: {custodian_dir}")
        sys.exit(1)

    # Count YAML files
    yaml_count = sum(1 for _ in custodian_dir.glob("*.yaml"))
    print(f"Found {yaml_count} YAML files")

    if yaml_count == 0:
        print("No YAML files found. Exiting.")
        sys.exit(0)

    # Load YAML files
    records = load_yaml_files(custodian_dir, limit=args.limit)

    if not records:
        print("No records loaded. Exiting.")
        sys.exit(1)

    # Show sample record
    print("\nSample record (first):")
    sample = records[0]
    for key in ["file_name", "ghcid_current", "custodian_name", "city", "country"]:
        print(f"  {key}: {sample.get(key, 'N/A')}")

    if args.dry_run:
        print("\n[DRY RUN] Would upload to DuckLake. Exiting without upload.")
        return

    # Check DuckLake status
    print("\nChecking DuckLake connection...")
    if not check_ducklake_status():
        print("Error: DuckLake is not available. Please start the server.")
        print("  cd backend/ducklake && python main.py")
        sys.exit(1)

    # Upload to DuckLake. HTTP/connection errors and a non-JSON reply are
    # reported as a normal error exit rather than an unhandled traceback.
    try:
        result = upload_to_ducklake(records, TABLE_NAME, mode=args.mode)
    except (requests.RequestException, ValueError) as e:
        print(f"Error: Upload to DuckLake failed: {e}")
        sys.exit(1)

    print("\n" + "=" * 60)
    print("Load complete!")
    print(f"  Table: {TABLE_NAME}")
    print(f"  Rows: {result.get('rows_inserted', len(records))}")
    print(f"  Snapshot ID: {result.get('snapshot_id', 'N/A')}")
    print("=" * 60)


if __name__ == "__main__":
    main()