glam/scripts/quick_ducklake_load.py
2025-12-10 23:51:51 +01:00

111 lines
3.9 KiB
Python

#!/usr/bin/env python3
"""Quick DuckLake loader: upload custodian YAML files, resuming from the table's current row count."""
import json
import sys
from pathlib import Path

import httpx
import yaml
# Base URL of the DuckLake HTTP API (the /upload and /query endpoints are used).
DUCKLAKE_URL: str = "http://localhost:8765"
# Target table; queries address it as heritage.<TABLE_NAME>.
TABLE_NAME: str = "custodians_raw"
# Records per upload request; kept smaller for reliability.
BATCH_SIZE: int = 2_000
def _as_dict(value):
    """Return *value* if it is a dict, otherwise an empty dict.

    A YAML key that is present but empty (``key:``) loads as ``None``. A
    chained ``.get`` on it would raise ``AttributeError``, and the whole file
    would be dropped by the caller.
    """
    return value if isinstance(value, dict) else {}


def _first_not_none(*values):
    """Return the first of *values* that is not ``None``, or ``None``.

    Unlike an ``or`` chain, this keeps ``0`` / ``0.0``, which are valid
    coordinates.
    """
    return next((value for value in values if value is not None), None)


def extract_record(entry, file_name):
    """Extract the minimal record needed for map display from one custodian entry.

    Args:
        entry: Parsed YAML mapping for a single custodian file.
        file_name: Base name of the source YAML file, stored as ``file_name``.

    Returns:
        A flat dict with the columns uploaded to DuckLake. ``org_name``,
        ``custodian_name``, ``org_type`` and ``institution_type`` default to
        ``''``. Coordinates, city and country default to ``None``.
    """
    original = _as_dict(entry.get('original_entry'))
    ghcid = _as_dict(entry.get('ghcid'))
    loc = _as_dict(ghcid.get('location_resolution'))
    gm = _as_dict(entry.get('google_maps_enrichment'))
    custodian_name = _as_dict(entry.get('custodian_name'))
    location = _as_dict(entry.get('location'))
    ch = _as_dict(entry.get('ch_annotator'))
    ec = _as_dict(ch.get('entity_classification'))

    # Name: claimed custodian name first, then the original source's name fields.
    name = (
        custodian_name.get('claim_value')
        or original.get('name')
        or original.get('organisatie')
        or ''
    )
    # Coordinates: entry location, then Google Maps, then GHCID location resolution.
    lat = _first_not_none(location.get('latitude'), gm.get('latitude'), loc.get('latitude'))
    lon = _first_not_none(location.get('longitude'), gm.get('longitude'), loc.get('longitude'))

    # Institution type: classifier output, then the GHCID, then the original type list.
    inst_type = ec.get('glamorcubesfixphdnt_primary') or ''
    ghcid_current = ghcid.get('ghcid_current')
    if not inst_type and isinstance(ghcid_current, str) and ghcid_current:
        # A single-character fourth dash-separated GHCID segment is taken as the type code.
        parts = ghcid_current.split('-')
        if len(parts) >= 4 and len(parts[3]) == 1:
            inst_type = parts[3]
    if not inst_type:
        types = original.get('type', [])
        if isinstance(types, list) and types:
            inst_type = types[0]

    return {
        'file_name': file_name,
        'ghcid_current': ghcid.get('ghcid_current', ''),
        'org_name': name,
        'org_type': inst_type,
        'institution_type': inst_type,
        'latitude': lat,
        'longitude': lon,
        'city': location.get('city') or loc.get('city_name'),
        'country': location.get('country') or loc.get('country_code'),
        'custodian_name': name,
    }
def upload_batch(records, mode="append", *, timeout=120.0):
    """POST *records* to the DuckLake ``/upload`` endpoint as a JSON file.

    Args:
        records: List of flat record dicts to insert into ``TABLE_NAME``.
        mode: Value sent as the ``mode`` form field (this script only uses
            the default ``"append"``).
        timeout: Request timeout in seconds.

    Returns:
        The decoded JSON response body on HTTP 200, otherwise ``None``. On
        failure a short diagnostic is written to stderr, so a rejected batch
        is not silently lost.
    """
    with httpx.Client(timeout=timeout) as client:
        response = client.post(
            f"{DUCKLAKE_URL}/upload",
            files={"file": ("data.json", json.dumps(records), "application/json")},
            data={"table_name": TABLE_NAME, "mode": mode},
        )
    if response.status_code != 200:
        print(
            f"  Upload failed: HTTP {response.status_code}: {response.text[:500]}",
            file=sys.stderr,
        )
        return None
    try:
        return response.json()
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        print("  Upload returned HTTP 200 but the body is not valid JSON", file=sys.stderr)
        return None
def get_current_count():
    """Return the number of rows currently stored in ``heritage.<TABLE_NAME>``.

    Runs ``SELECT COUNT(*)`` through the DuckLake ``/query`` endpoint and
    reads the first cell of the response's ``rows`` array.

    Returns:
        The row count, or 0 when the query does not return HTTP 200. This is
        presumably the case when the table does not exist yet (TODO: confirm
        against the server). A warning is written to stderr in that case,
        because a 0 makes the loader start again from the first file.
    """
    with httpx.Client() as client:
        response = client.post(
            f"{DUCKLAKE_URL}/query",
            json={"query": f"SELECT COUNT(*) FROM heritage.{TABLE_NAME}"},
        )
    if response.status_code == 200:
        return response.json()['rows'][0][0]
    print(
        f"Warning: count query returned HTTP {response.status_code}; "
        "assuming the table is empty.",
        file=sys.stderr,
    )
    return 0
def _upload_and_report(records, label, loaded_before):
    """Upload one batch, print a progress line, and return the rows inserted.

    Returns ``None`` when ``upload_batch`` reports a failure.
    """
    result = upload_batch(records)
    if result is None:
        return None
    inserted = result.get('rows_inserted', 0)
    print(f" {label}: +{inserted} (total: {loaded_before + inserted})")
    return inserted


def main():
    """Upload every custodian YAML file not yet loaded into DuckLake.

    The custodian directory may be passed as the first command-line
    argument; otherwise the original hard-coded path is used.

    Resume strategy: the sorted file list is sliced at the table's current
    ``COUNT(*)``, so each existing row is assumed to correspond to one of the
    first files. Loading stops at the first failed batch so that this offset
    stays valid for the next run.

    NOTE(review): files that fail to load, or that parse to an empty
    document, add no row. After such a file the row count lags the file
    position, and a re-run re-uploads some files. Resuming on the stored
    ``file_name`` column would avoid those duplicates.
    """
    default_dir = Path('/Users/kempersc/apps/glam/data/custodian')
    custodian_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else default_dir
    yaml_files = sorted(custodian_dir.glob('*.yaml'))
    if not yaml_files:
        # Without this, an empty or wrong directory would report "Already fully loaded!".
        sys.exit(f"No *.yaml files found in {custodian_dir}")

    current = get_current_count()
    print(f"Current count in DuckLake: {current}")
    print(f"Total YAML files: {len(yaml_files)}")
    if current >= len(yaml_files):
        print("Already fully loaded!")
        return

    # Start from current position
    remaining = yaml_files[current:]
    print(f"Loading remaining {len(remaining)} files...")

    records = []
    total = 0
    batch_no = 0
    skipped = 0
    for file_path in remaining:
        try:
            with open(file_path, encoding='utf-8') as f:
                entry = yaml.safe_load(f)
            if entry:
                records.append(extract_record(entry, file_path.name))
        except Exception as exc:  # best effort: one bad file must not stop the load
            skipped += 1
            print(f" Skipped {file_path.name}: {exc}", file=sys.stderr)

        if len(records) >= BATCH_SIZE:
            batch_no += 1
            inserted = _upload_and_report(records, f"Batch {batch_no}", current + total)
            if inserted is None:
                # Continuing would leave a gap in the table, so the next run's
                # count-based resume would re-upload later batches.
                sys.exit(f"Batch {batch_no} failed; stopping so the row count stays a valid resume point.")
            total += inserted
            records = []

    if records:
        inserted = _upload_and_report(records, "Final", current + total)
        if inserted is None:
            sys.exit("Final batch failed; re-run to resume.")
        total += inserted

    if skipped:
        print(f"Skipped {skipped} file(s) that failed to load; see messages above.", file=sys.stderr)
    print(f"\nComplete! Added {total} rows. Total now: {current + total}")


if __name__ == "__main__":
    main()