#!/usr/bin/env python3
"""
Update GHCID region and city codes using GeoNames reverse geocoding.

For custodian files that have coordinates, this script:
1. Reverse geocodes coordinates to find the nearest GeoNames city
2. Extracts proper admin1_code (region) and city code
3. Updates the GHCID with correct codes
4. Renames the file if GHCID changes

Usage:
    python scripts/update_ghcid_with_geonames.py [--dry-run] [--limit N] [--country CODE]
"""
import argparse
import hashlib
import os
import re
import shutil
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import yaml

# --- Filesystem layout ---------------------------------------------------
PROJECT_ROOT = Path(__file__).parent.parent
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"
GEONAMES_DB = PROJECT_ROOT / "data" / "reference" / "geonames.db"
REPORTS_DIR = PROJECT_ROOT / "reports"

# Namespace used for deterministic UUIDv5 derivation from GHCID strings.
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Country-specific region code mappings (GeoNames admin1_code -> ISO 3166-2).
# Handles cases where GeoNames admin1 codes differ from the ISO codes we
# want embedded in the GHCID.
REGION_CODE_MAPPINGS = {
    'NL': {
        '01': 'DR',  # Drenthe
        '02': 'FR',  # Friesland
        '03': 'GE',  # Gelderland
        '04': 'GR',  # Groningen
        '05': 'LI',  # Limburg
        '06': 'NB',  # Noord-Brabant
        '07': 'NH',  # Noord-Holland
        '09': 'UT',  # Utrecht
        '10': 'ZE',  # Zeeland
        '11': 'ZH',  # Zuid-Holland
        '15': 'OV',  # Overijssel
        '16': 'FL',  # Flevoland
    },
    # Japan uses prefecture numbers which are fine as-is (2-digit)
    # Most countries can use admin1_code directly
}

# Institution type -> single-letter GHCID type code.
TYPE_TO_CODE = {
    'GALLERY': 'G',
    'LIBRARY': 'L',
    'ARCHIVE': 'A',
    'MUSEUM': 'M',
    'OFFICIAL_INSTITUTION': 'O',
    'RESEARCH_CENTER': 'R',
    'CORPORATION': 'C',
    'UNKNOWN': 'U',
    'BOTANICAL_ZOO': 'B',
    'EDUCATION_PROVIDER': 'E',
    'COLLECTING_SOCIETY': 'S',
    'FEATURES': 'F',
    'INTANGIBLE_HERITAGE_GROUP': 'I',
    'MIXED': 'X',
    'PERSONAL_COLLECTION': 'P',
    'HOLY_SITES': 'H',
    'DIGITAL_PLATFORM': 'D',
    'NGO': 'N',
    'TASTE_SMELL': 'T',
}
def get_geonames_connection() -> sqlite3.Connection:
    """Open and return a connection to the local GeoNames SQLite database."""
    return sqlite3.connect(GEONAMES_DB)


def reverse_geocode(lat: float, lon: float, country_code: str,
                    conn: sqlite3.Connection) -> Optional[Dict]:
    """
    Find the nearest GeoNames city for the given coordinates.

    Ranks candidates by squared Euclidean distance in degree space, which
    is adequate for nearest-city matching over short ranges.
    NOTE(review): longitude is not cos(lat)-scaled, so ranking can skew at
    high latitudes — confirm this is acceptable for the covered countries.

    Only populated-place feature codes are considered, which excludes
    neighborhoods (PPLX).

    Returns a dict describing the match, or None if no city was found.
    """
    cursor = conn.execute("""
        SELECT geonames_id, name, ascii_name, admin1_code, admin1_name,
               latitude, longitude, feature_code, population,
               ((latitude - ?) * (latitude - ?) +
                (longitude - ?) * (longitude - ?)) as distance_sq
        FROM cities
        WHERE country_code = ?
          AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4',
                               'PPLC', 'PPLS', 'PPLG')
        ORDER BY distance_sq
        LIMIT 1
    """, (lat, lat, lon, lon, country_code))

    hit = cursor.fetchone()
    if hit is None:
        return None

    fields = ('geonames_id', 'city_name', 'ascii_name', 'admin1_code',
              'admin1_name', 'latitude', 'longitude', 'feature_code',
              'population', 'distance_sq')
    return dict(zip(fields, hit))


def generate_city_code(name: str) -> str:
    """Derive a 3-letter uppercase city code from a city name ("XXX" if empty)."""
    import unicodedata

    if not name:
        return "XXX"

    # Strip diacritics: decompose to NFD, then drop combining marks.
    decomposed = unicodedata.normalize('NFD', name)
    no_marks = ''.join(ch for ch in decomposed
                       if unicodedata.category(ch) != 'Mn')

    # Keep ASCII letters and digits only.
    cleaned = re.sub(r'[^a-zA-Z0-9]', '', no_marks)
    return cleaned[:3].upper() if cleaned else "XXX"


def get_region_code(country_code: str, admin1_code: str) -> str:
    """
    Map a GeoNames admin1_code to a 2-letter region code.

    Country-specific overrides in REGION_CODE_MAPPINGS win; otherwise the
    admin1_code is used directly, truncated to two uppercase characters.
    Returns "XX" when admin1_code is empty.
    """
    if not admin1_code:
        return "XX"

    override = REGION_CODE_MAPPINGS.get(country_code, {}).get(admin1_code)
    if override:
        return override

    return admin1_code[:2].upper()
def generate_ghcid(country_code: str, region_code: str, city_code: str,
                   institution_type: str, abbreviation: str,
                   name_suffix: Optional[str] = None) -> str:
    """Assemble a GHCID string from its component codes."""
    components = [country_code, region_code, city_code,
                  TYPE_TO_CODE.get(institution_type, 'U'), abbreviation]
    if name_suffix:
        components.append(name_suffix)
    return '-'.join(components)


def generate_ghcid_uuid(ghcid: str) -> str:
    """Derive a deterministic UUIDv5 (namespace + name) from a GHCID."""
    return str(uuid.uuid5(GHCID_NAMESPACE, ghcid))


def generate_ghcid_uuid_sha256(ghcid: str) -> str:
    """
    Derive a UUID-shaped identifier from the SHA-256 of the GHCID.

    The 13th hex digit is forced to '8' as a "version 8" marker.
    NOTE(review): the RFC 4122 variant bits are not set, so this is not a
    fully spec-compliant UUIDv8 — do not change the layout, existing
    identifiers depend on it.
    """
    digest_hex = hashlib.sha256(ghcid.encode()).hexdigest()
    return f"{digest_hex[:8]}-{digest_hex[8:12]}-8{digest_hex[13:16]}-{digest_hex[16:20]}-{digest_hex[20:32]}"


def generate_ghcid_numeric(ghcid: str) -> int:
    """Derive a 64-bit integer ID from the first 8 bytes of SHA-256(ghcid)."""
    raw = hashlib.sha256(ghcid.encode()).digest()
    return int.from_bytes(raw[:8], 'big')


def extract_coordinates(data: Dict) -> Optional[Tuple[float, float]]:
    """
    Pull (latitude, longitude) out of a custodian record.

    Sources are tried in order: original_entry.locations[0], top-level
    locations[0], then google_maps_enrichment. Returns None when no source
    has both coordinates.
    """
    for holder in (data.get('original_entry', {}), data):
        locations = holder.get('locations', [])
        if locations and isinstance(locations, list):
            first = locations[0]
            lat = first.get('latitude')
            lon = first.get('longitude')
            if lat is not None and lon is not None:
                return (float(lat), float(lon))

    enrichment = data.get('google_maps_enrichment', {})
    lat = enrichment.get('latitude')
    lon = enrichment.get('longitude')
    if lat is not None and lon is not None:
        return (float(lat), float(lon))

    return None


def extract_country_code(data: Dict) -> str:
    """
    Pull an ISO country code out of a custodian record.

    Prefers a previously resolved code (unless it is the 'XX' placeholder),
    then falls back to the first location's country. Returns 'XX' when
    nothing is found.
    """
    resolved = data.get('ghcid', {}).get('location_resolution', {}).get('country_code')
    if resolved and resolved != 'XX':
        return resolved

    for holder in (data.get('original_entry', {}), data):
        locations = holder.get('locations', [])
        if locations:
            country = locations[0].get('country')
            if country:
                return country

    return 'XX'


def extract_abbreviation_from_ghcid(ghcid: str) -> str:
    """Return the abbreviation (5th dash-separated component) of a GHCID, or "UNK"."""
    parts = ghcid.split('-')
    return parts[4] if len(parts) >= 5 else "UNK"


def extract_name_suffix_from_ghcid(ghcid: str) -> Optional[str]:
    """Return the name suffix (everything after the 5th component), if present."""
    parts = ghcid.split('-')
    return '-'.join(parts[5:]) if len(parts) > 5 else None


def validate_ch_annotator_entity(data: Dict) -> Tuple[bool, str]:
    """
    Check whether the record carries a heritage-institution entity profile.

    Returns (is_valid, entity_subtype). Accepts CH-Annotator GRP.HER.*
    subtypes first, then falls back to hypernym GRP combined with a known
    institution_type, and finally infers a profile from institution_type
    alone (prefixed 'INFERRED.').
    """
    classification = data.get('ch_annotator', {}).get('entity_classification', {})
    hypernym = classification.get('hypernym', '')
    subtype = classification.get('subtype', '')

    valid_subtypes = [
        'GRP.HER',      # Generic heritage institution
        'GRP.HER.GAL',  # Gallery
        'GRP.HER.LIB',  # Library
        'GRP.HER.ARC',  # Archive
        'GRP.HER.MUS',  # Museum
        'GRP.HER.RES',  # Research center
        'GRP.HER.EDU',  # Education provider
        'GRP.HER.REL',  # Religious heritage site
        'GRP.HER.BOT',  # Botanical/zoo
        'GRP.HER.MIX',  # Mixed type
    ]

    # Direct match on a heritage subtype.
    if subtype and any(subtype.startswith(valid) for valid in valid_subtypes):
        return (True, subtype)

    # Fallback: GROUP hypernym plus a recognized institution type.
    if hypernym == 'GRP':
        inst_type = data.get('original_entry', {}).get('institution_type', '')
        if inst_type in TYPE_TO_CODE:
            return (True, f'GRP.HER.{inst_type[:3]}')

    # No CH-Annotator profile; still allow processing when a concrete
    # institution_type is present.
    inst_type = data.get('original_entry', {}).get('institution_type', '')
    if inst_type and inst_type != 'UNKNOWN':
        return (True, f'INFERRED.{inst_type}')

    return (False, '')
def process_file(filepath: Path, conn: sqlite3.Connection, dry_run: bool = False,
                 require_ch_annotator: bool = False) -> Dict:
    """
    Process a single custodian file: reverse-geocode its coordinates and,
    when the derived GHCID differs from the current one, rewrite the record
    under the new GHCID (updating identifiers and history) and rename the
    file accordingly.

    Args:
        filepath: Path to custodian YAML file
        conn: GeoNames database connection
        dry_run: If True, don't write changes
        require_ch_annotator: If True, skip files without valid CH-Annotator
            entity profile

    Returns:
        Dict describing the outcome; 'status' is one of: skipped, error,
        invalid_entity_profile, already_geocoded, no_coordinates, no_country,
        geocode_failed, unchanged, would_update, collision, updated.
    """
    result = {
        'file': filepath.name,
        'status': 'skipped',
        'old_ghcid': None,
        'new_ghcid': None,
        'geonames_match': None,
        'entity_profile': None,
        'error': None,
    }

    try:
        with open(filepath, 'r') as f:
            data = yaml.safe_load(f)

        if not data:
            result['status'] = 'error'
            result['error'] = 'Empty file'
            return result

        # Validate CH-Annotator entity profile
        is_valid_entity, entity_subtype = validate_ch_annotator_entity(data)
        result['entity_profile'] = entity_subtype

        if require_ch_annotator and not is_valid_entity:
            result['status'] = 'invalid_entity_profile'
            result['error'] = 'No valid CH-Annotator GRP.HER.* entity profile'
            return result

        # Get current GHCID
        current_ghcid = data.get('ghcid', {}).get('ghcid_current')
        if not current_ghcid:
            result['status'] = 'error'
            result['error'] = 'No GHCID found'
            return result
        result['old_ghcid'] = current_ghcid

        # Skip files that already carry a proper GeoNames resolution.
        resolution = data.get('ghcid', {}).get('location_resolution', {})
        if resolution.get('method') == 'REVERSE_GEOCODE' and resolution.get('geonames_id'):
            result['status'] = 'already_geocoded'
            return result

        # Extract coordinates
        coords = extract_coordinates(data)
        if not coords:
            result['status'] = 'no_coordinates'
            return result
        lat, lon = coords

        country_code = extract_country_code(data)
        if country_code == 'XX':
            result['status'] = 'no_country'
            return result

        # Reverse geocode
        geo_result = reverse_geocode(lat, lon, country_code, conn)
        if not geo_result:
            result['status'] = 'geocode_failed'
            return result

        result['geonames_match'] = {
            'city': geo_result['city_name'],
            'admin1': geo_result['admin1_name'],
            'geonames_id': geo_result['geonames_id'],
        }

        # Build the new GHCID, preserving the existing abbreviation/suffix.
        new_region_code = get_region_code(country_code, geo_result['admin1_code'])
        new_city_code = generate_city_code(geo_result['ascii_name'])
        abbreviation = extract_abbreviation_from_ghcid(current_ghcid)
        name_suffix = extract_name_suffix_from_ghcid(current_ghcid)
        inst_type = data.get('original_entry', {}).get('institution_type', 'UNKNOWN')

        new_ghcid = generate_ghcid(country_code, new_region_code, new_city_code,
                                   inst_type, abbreviation, name_suffix)
        result['new_ghcid'] = new_ghcid

        if new_ghcid == current_ghcid:
            result['status'] = 'unchanged'
            return result

        if dry_run:
            result['status'] = 'would_update'
            return result

        timestamp = datetime.now(timezone.utc).isoformat()

        # Update GHCID section
        data['ghcid']['ghcid_current'] = new_ghcid
        data['ghcid']['ghcid_uuid'] = generate_ghcid_uuid(new_ghcid)
        data['ghcid']['ghcid_uuid_sha256'] = generate_ghcid_uuid_sha256(new_ghcid)
        data['ghcid']['ghcid_numeric'] = generate_ghcid_numeric(new_ghcid)

        # Update location_resolution
        data['ghcid']['location_resolution'] = {
            'method': 'REVERSE_GEOCODE',
            'country_code': country_code,
            'region_code': new_region_code,
            'region_name': geo_result['admin1_name'],
            'city_code': new_city_code,
            'city_name': geo_result['city_name'],
            'geonames_id': geo_result['geonames_id'],
            'feature_code': geo_result['feature_code'],
            'resolution_date': timestamp,
        }

        # Maintain GHCID history: close out the previous entry, prepend new.
        history = data['ghcid'].get('ghcid_history', [])
        if history:
            history[0]['valid_to'] = timestamp
            history[0]['superseded_by'] = new_ghcid
        history.insert(0, {
            'ghcid': new_ghcid,
            'ghcid_numeric': generate_ghcid_numeric(new_ghcid),
            'valid_from': timestamp,
            'reason': f'Updated via GeoNames reverse geocoding (matched {geo_result["city_name"]}, geonames:{geo_result["geonames_id"]})',
        })
        data['ghcid']['ghcid_history'] = history

        # Keep the identifiers list in sync with the new GHCID.
        for ident in data.get('identifiers', []):
            scheme = ident.get('identifier_scheme')
            if scheme == 'GHCID':
                ident['identifier_value'] = new_ghcid
            elif scheme == 'GHCID_UUID':
                ident['identifier_value'] = generate_ghcid_uuid(new_ghcid)
            elif scheme == 'GHCID_UUID_SHA256':
                ident['identifier_value'] = generate_ghcid_uuid_sha256(new_ghcid)
            elif scheme == 'GHCID_NUMERIC':
                ident['identifier_value'] = str(generate_ghcid_numeric(new_ghcid))

        new_filename = f"{new_ghcid}.yaml"
        new_filepath = CUSTODIAN_DIR / new_filename

        # FIX: never silently overwrite a different entity's file. Two records
        # can resolve to the same GHCID (same city/type/abbreviation);
        # previously the second one clobbered the first.
        if new_filepath != filepath and new_filepath.exists():
            result['status'] = 'collision'
            result['error'] = f'Target file already exists: {new_filename}'
            return result

        # Write updated data under the new GHCID filename.
        with open(new_filepath, 'w') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True,
                      sort_keys=False)

        # Remove old file if different
        if filepath != new_filepath:
            os.remove(filepath)

        result['status'] = 'updated'
        return result

    except Exception as e:
        # Per-file boundary: report the failure in the result rather than
        # aborting the whole batch.
        result['status'] = 'error'
        result['error'] = str(e)
        return result
def main():
    """CLI entry point: batch-update custodian GHCIDs via GeoNames and write a report."""
    parser = argparse.ArgumentParser(description='Update GHCID with GeoNames data')
    parser.add_argument('--dry-run', action='store_true', help='Show changes without applying')
    parser.add_argument('--limit', type=int, help='Limit number of files to process')
    parser.add_argument('--country', type=str, help='Only process files for specific country')
    parser.add_argument('--verbose', action='store_true', help='Show detailed output')
    parser.add_argument('--require-ch-annotator', action='store_true',
                        help='Only process files with valid CH-Annotator GRP.HER.* entity profile')
    args = parser.parse_args()

    print("=" * 60)
    print("Update GHCID with GeoNames Reverse Geocoding")
    print("=" * 60)
    print()

    if args.dry_run:
        print("*** DRY RUN - No changes will be made ***")
        print()
    if args.require_ch_annotator:
        print("*** Requiring CH-Annotator entity profile (GRP.HER.*) ***")
        print()

    # Connect to GeoNames
    if not GEONAMES_DB.exists():
        print(f"Error: GeoNames database not found at {GEONAMES_DB}")
        return
    conn = get_geonames_connection()
    print("Connected to GeoNames database")

    # Collect candidate files; GHCID filenames start with the country code,
    # so a prefix match implements the country filter.
    files = list(CUSTODIAN_DIR.glob("*.yaml"))
    print(f"Found {len(files)} custodian files")

    if args.country:
        files = [f for f in files if f.name.startswith(f"{args.country}-")]
        print(f"Filtered to {len(files)} files for country {args.country}")

    if args.limit:
        files = files[:args.limit]
        print(f"Limited to {args.limit} files")

    print()

    # Preset the well-known statuses; unknown ones (e.g. 'collision') are
    # still counted via the .get() fallback below.
    stats = {
        'updated': 0,
        'unchanged': 0,
        'already_geocoded': 0,
        'no_coordinates': 0,
        'no_country': 0,
        'geocode_failed': 0,
        'would_update': 0,
        'invalid_entity_profile': 0,
        'error': 0,
    }
    updates = []
    entity_profiles_seen = {}

    for i, filepath in enumerate(files):
        if (i + 1) % 500 == 0:
            print(f"Progress: {i + 1}/{len(files)}")

        result = process_file(filepath, conn, args.dry_run, args.require_ch_annotator)
        stats[result['status']] = stats.get(result['status'], 0) + 1

        # FIX: result always contains the 'entity_profile' key (possibly
        # None or ''), so a plain .get(..., 'NONE') default never fired and
        # None leaked in as a bucket key. Normalize falsy values explicitly.
        profile = result.get('entity_profile') or 'NONE'
        entity_profiles_seen[profile] = entity_profiles_seen.get(profile, 0) + 1

        if result['status'] in ('updated', 'would_update'):
            updates.append(result)
            if args.verbose:
                print(f" {result['old_ghcid']} -> {result['new_ghcid']}")
                print(f" Matched: {result['geonames_match']}")
                print(f" Entity: {result.get('entity_profile', 'N/A')}")

    conn.close()

    # Print summary
    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total files processed: {len(files)}")
    print()
    print("Results:")
    print(f" Updated: {stats.get('updated', 0)}")
    print(f" Would update (dry-run): {stats.get('would_update', 0)}")
    print(f" Unchanged: {stats.get('unchanged', 0)}")
    print(f" Already geocoded: {stats.get('already_geocoded', 0)}")
    print(f" No coordinates: {stats.get('no_coordinates', 0)}")
    print(f" No country code: {stats.get('no_country', 0)}")
    print(f" Geocode failed: {stats.get('geocode_failed', 0)}")
    print(f" Invalid entity profile: {stats.get('invalid_entity_profile', 0)}")
    print(f" Errors: {stats.get('error', 0)}")

    # Entity profile breakdown (top 10 by count)
    if entity_profiles_seen:
        print()
        print("CH-Annotator Entity Profiles:")
        for profile, count in sorted(entity_profiles_seen.items(), key=lambda x: -x[1])[:10]:
            print(f" {profile}: {count}")

    # Save report
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    # FIX: ensure the reports directory exists before writing; previously
    # this crashed with FileNotFoundError on a fresh checkout.
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    report_file = REPORTS_DIR / f"GEONAMES_UPDATE_REPORT_{timestamp}.md"
    with open(report_file, 'w') as f:
        f.write("# GeoNames GHCID Update Report\n\n")
        f.write(f"Generated: {datetime.now(timezone.utc).isoformat()}\n\n")
        f.write("## Summary\n\n")
        f.write("| Metric | Count |\n")
        f.write("|--------|-------|\n")
        f.write(f"| Files processed | {len(files)} |\n")
        f.write(f"| Updated | {stats.get('updated', 0)} |\n")
        f.write(f"| Would update | {stats.get('would_update', 0)} |\n")
        f.write(f"| Unchanged | {stats.get('unchanged', 0)} |\n")
        f.write(f"| Already geocoded | {stats.get('already_geocoded', 0)} |\n")
        f.write(f"| No coordinates | {stats.get('no_coordinates', 0)} |\n")
        # FIX: this row was missing from the report even though the console
        # summary prints the same counter.
        f.write(f"| No country code | {stats.get('no_country', 0)} |\n")
        f.write(f"| Geocode failed | {stats.get('geocode_failed', 0)} |\n")
        f.write(f"| Invalid entity profile | {stats.get('invalid_entity_profile', 0)} |\n")
        f.write(f"| Errors | {stats.get('error', 0)} |\n")

        # Entity profile breakdown
        if entity_profiles_seen:
            f.write("\n## CH-Annotator Entity Profiles\n\n")
            f.write("| Entity Profile | Count |\n")
            f.write("|---------------|-------|\n")
            for profile, count in sorted(entity_profiles_seen.items(), key=lambda x: -x[1]):
                f.write(f"| {profile} | {count} |\n")

        if updates:
            f.write("\n## Updates\n\n")
            f.write("| Old GHCID | New GHCID | Matched City | Entity Profile |\n")
            f.write("|-----------|-----------|-------------|----------------|\n")
            for u in updates[:100]:  # Limit to first 100
                city = u.get('geonames_match', {}).get('city', 'N/A')
                profile = u.get('entity_profile', 'N/A')
                f.write(f"| {u['old_ghcid']} | {u['new_ghcid']} | {city} | {profile} |\n")
            if len(updates) > 100:
                f.write(f"\n*... and {len(updates) - 100} more updates*\n")

    print()
    print(f"Report saved to: {report_file}")


if __name__ == '__main__':
    main()