glam/scripts/update_ghcid_with_geonames.py
kempersc e45c1a3c85 feat(scripts): add city enrichment and location resolution utilities
Enrichment scripts for country-specific city data:
- enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py
- enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py
- enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py

Location resolution utilities:
- resolve_cities_from_file_coords.py - Resolve cities using coordinates in filenames
- resolve_cities_wikidata.py - Use Wikidata P131 for city resolution
- resolve_country_codes.py - Standardize country codes
- resolve_cz_xx_regions.py - Fix Czech XX region codes
- resolve_locations_by_name.py - Name-based location lookup
- resolve_regions_from_city.py - Derive regions from city data
- update_ghcid_with_geonames.py - Update GHCIDs with GeoNames data

CH-Annotator integration:
- create_custodian_from_ch_annotator.py - Create custodians from annotations
- add_ch_annotator_location_claims.py - Add location claims
- extract_locations_ch_annotator.py - Extract locations from annotations

Migration and fixes:
- migrate_egyptian_from_ch.py - Migrate Egyptian data
- migrate_web_archives.py - Migrate web archive data
- fix_belgian_cities.py - Fix Belgian city data
2025-12-07 14:26:59 +01:00

619 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Update GHCID region and city codes using GeoNames reverse geocoding.
For custodian files that have coordinates, this script:
1. Reverse geocodes coordinates to find the nearest GeoNames city
2. Extracts proper admin1_code (region) and city code
3. Updates the GHCID with correct codes
4. Renames the file if GHCID changes
Usage:
python scripts/update_ghcid_with_geonames.py [--dry-run] [--limit N] [--country CODE]
"""
import argparse
import hashlib
import os
import re
import shutil
import sqlite3
import uuid
import yaml
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Paths (resolved relative to this script: scripts/<file> -> repo root)
PROJECT_ROOT = Path(__file__).parent.parent
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"
GEONAMES_DB = PROJECT_ROOT / "data" / "reference" / "geonames.db"
REPORTS_DIR = PROJECT_ROOT / "reports"
# GHCID namespace for UUID generation
# NOTE: this is the same value as uuid.NAMESPACE_DNS (RFC 4122).
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
# Country-specific region code mappings (GeoNames admin1_code -> ISO 3166-2)
# This handles cases where GeoNames codes differ from ISO codes
REGION_CODE_MAPPINGS = {
    # Netherlands: GeoNames assigns numeric admin1 codes; map to ISO letters.
    'NL': {
        '01': 'DR',  # Drenthe
        '02': 'FR',  # Friesland
        '03': 'GE',  # Gelderland
        '04': 'GR',  # Groningen
        '05': 'LI',  # Limburg
        '06': 'NB',  # Noord-Brabant
        '07': 'NH',  # Noord-Holland
        '09': 'UT',  # Utrecht
        '10': 'ZE',  # Zeeland
        '11': 'ZH',  # Zuid-Holland
        '15': 'OV',  # Overijssel
        '16': 'FL',  # Flevoland
    },
    # Japan uses prefecture numbers which are fine as-is (2-digit)
    # Most countries can use admin1_code directly
}
# Type code mapping: institution_type -> single-letter GHCID type code
# (used as the 4th dash-separated GHCID component; 'U' is the fallback).
TYPE_TO_CODE = {
    'GALLERY': 'G', 'LIBRARY': 'L', 'ARCHIVE': 'A', 'MUSEUM': 'M',
    'OFFICIAL_INSTITUTION': 'O', 'RESEARCH_CENTER': 'R', 'CORPORATION': 'C',
    'UNKNOWN': 'U', 'BOTANICAL_ZOO': 'B', 'EDUCATION_PROVIDER': 'E',
    'COLLECTING_SOCIETY': 'S', 'FEATURES': 'F', 'INTANGIBLE_HERITAGE_GROUP': 'I',
    'MIXED': 'X', 'PERSONAL_COLLECTION': 'P', 'HOLY_SITES': 'H',
    'DIGITAL_PLATFORM': 'D', 'NGO': 'N', 'TASTE_SMELL': 'T',
}
def get_geonames_connection() -> sqlite3.Connection:
    """Open and return a SQLite connection to the GeoNames database."""
    connection = sqlite3.connect(GEONAMES_DB)
    return connection
def reverse_geocode(lat: float, lon: float, country_code: str, conn: sqlite3.Connection) -> Optional[Dict]:
    """
    Look up the closest GeoNames populated place for a coordinate pair.

    Candidates are ranked by squared Euclidean distance in degree space
    (cheap, and adequate for nearest-city matching). Neighborhood records
    are excluded by whitelisting populated-place feature codes (no PPLX).
    Returns a dict of the matched row's fields, or None when the country
    has no candidate rows.
    """
    query = """
        SELECT
            geonames_id, name, ascii_name, admin1_code, admin1_name,
            latitude, longitude, feature_code, population,
            ((latitude - ?) * (latitude - ?) + (longitude - ?) * (longitude - ?)) as distance_sq
        FROM cities
        WHERE country_code = ?
        AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
        ORDER BY distance_sq
        LIMIT 1
    """
    row = conn.execute(query, (lat, lat, lon, lon, country_code)).fetchone()
    if row is None:
        return None
    field_names = (
        'geonames_id', 'city_name', 'ascii_name', 'admin1_code',
        'admin1_name', 'latitude', 'longitude', 'feature_code',
        'population', 'distance_sq',
    )
    return dict(zip(field_names, row))
def generate_city_code(name: str) -> str:
    """Derive a 3-letter uppercase city code from a city name.

    Diacritics are stripped via NFD decomposition (dropping combining
    marks), remaining non-alphanumeric characters are removed, and the
    first three characters are uppercased. Falls back to "XXX" when the
    name is empty or nothing usable remains.
    """
    import unicodedata
    if not name:
        return "XXX"
    # Decompose accented characters, then drop the combining marks.
    decomposed = unicodedata.normalize('NFD', name)
    without_marks = ''.join(
        ch for ch in decomposed if unicodedata.category(ch) != 'Mn'
    )
    # Strip everything that is not a plain letter or digit.
    alnum = re.sub(r'[^a-zA-Z0-9]', '', without_marks)
    if not alnum:
        return "XXX"
    return alnum[:3].upper()
def get_region_code(country_code: str, admin1_code: str) -> str:
    """Translate a GeoNames admin1 code into a 2-letter region code.

    Country-specific overrides in REGION_CODE_MAPPINGS take precedence
    (e.g. Dutch numeric codes -> ISO 3166-2 letters). Otherwise the
    admin1 code itself is used, truncated to two characters and
    uppercased. Returns "XX" when no admin1 code is available.
    """
    if not admin1_code:
        return "XX"
    override = REGION_CODE_MAPPINGS.get(country_code, {}).get(admin1_code)
    if override:
        return override
    return admin1_code[:2].upper()
def generate_ghcid(country_code: str, region_code: str, city_code: str,
                   institution_type: str, abbreviation: str,
                   name_suffix: Optional[str] = None) -> str:
    """Assemble a GHCID: COUNTRY-REGION-CITY-TYPE-ABBR[-SUFFIX].

    The institution type is mapped to its single-letter code via
    TYPE_TO_CODE, defaulting to 'U' (UNKNOWN) for unmapped types.
    """
    components = [
        country_code,
        region_code,
        city_code,
        TYPE_TO_CODE.get(institution_type, 'U'),
        abbreviation,
    ]
    if name_suffix:
        components.append(name_suffix)
    return '-'.join(components)
def generate_ghcid_uuid(ghcid: str) -> str:
    """Return the deterministic name-based UUID v5 for a GHCID, as a string."""
    derived = uuid.uuid5(GHCID_NAMESPACE, ghcid)
    return str(derived)
def generate_ghcid_uuid_sha256(ghcid: str) -> str:
    """Format the SHA-256 of a GHCID as a UUID-shaped (8-4-4-4-12) string.

    The version nibble is hard-coded to '8' (hex digit 12 of the digest
    is dropped and replaced), giving a UUIDv8-style layout. NOTE(review):
    the variant nibble is taken from the hash as-is, so the result is not
    guaranteed to be a strictly RFC-valid UUID — kept as-is for ID
    stability with previously generated values.
    """
    digest = hashlib.sha256(ghcid.encode()).hexdigest()
    groups = (
        digest[:8],
        digest[8:12],
        '8' + digest[13:16],  # digest[12] is dropped for the version nibble
        digest[16:20],
        digest[20:32],
    )
    return '-'.join(groups)
def generate_ghcid_numeric(ghcid: str) -> int:
    """Derive a stable unsigned 64-bit integer ID from the GHCID's SHA-256."""
    leading_bytes = hashlib.sha256(ghcid.encode()).digest()[:8]
    return int.from_bytes(leading_bytes, byteorder='big')
def extract_coordinates(data: Dict) -> Optional[Tuple[float, float]]:
    """Extract a (latitude, longitude) pair from custodian data.

    Sources are checked in order: original_entry.locations[0],
    top-level locations[0], then google_maps_enrichment. Only the first
    entry of each location list is consulted (matching prior behavior).
    YAML nulls (keys present but None) and malformed location entries
    are tolerated and skipped instead of raising.

    Args:
        data: Parsed custodian YAML mapping.

    Returns:
        (lat, lon) as floats, or None when no usable pair exists.
    """
    def _coords_from_locations(locations) -> Optional[Tuple[float, float]]:
        # Pull lat/lon from the first entry of a location list, if well-formed.
        if locations and isinstance(locations, list) and isinstance(locations[0], dict):
            lat = locations[0].get('latitude')
            lon = locations[0].get('longitude')
            if lat is not None and lon is not None:
                return (float(lat), float(lon))
        return None

    # `or {}` / direct .get() guards against YAML nulls at every level.
    original_entry = data.get('original_entry') or {}
    for candidate in (original_entry.get('locations'), data.get('locations')):
        coords = _coords_from_locations(candidate)
        if coords is not None:
            return coords

    # Fall back to Google Maps enrichment data.
    gm = data.get('google_maps_enrichment') or {}
    lat = gm.get('latitude')
    lon = gm.get('longitude')
    if lat is not None and lon is not None:
        return (float(lat), float(lon))
    return None
def extract_country_code(data: Dict) -> str:
    """Return the ISO country code for a custodian record, or 'XX'.

    Precedence: ghcid.location_resolution.country_code (unless it is the
    'XX' placeholder), then the first entry of original_entry.locations,
    then the first entry of top-level locations. YAML nulls and
    malformed location entries are tolerated and skipped.

    Args:
        data: Parsed custodian YAML mapping.

    Returns:
        A country code string, or 'XX' when none can be determined.
    """
    # `or {}` guards against keys present with a YAML null value.
    ghcid_section = data.get('ghcid') or {}
    resolution = ghcid_section.get('location_resolution') or {}
    country = resolution.get('country_code')
    if country and country != 'XX':
        return country

    original_entry = data.get('original_entry') or {}
    for locations in (original_entry.get('locations'), data.get('locations')):
        if locations and isinstance(locations, list) and isinstance(locations[0], dict):
            country = locations[0].get('country')
            if country:
                return country
    return 'XX'
def extract_abbreviation_from_ghcid(ghcid: str) -> str:
    """Return the abbreviation (5th dash-separated component) of a GHCID.

    Falls back to "UNK" for GHCIDs with fewer than five components.
    """
    components = ghcid.split('-')
    return components[4] if len(components) >= 5 else "UNK"
def extract_name_suffix_from_ghcid(ghcid: str) -> Optional[str]:
    """Return the GHCID's name suffix (everything after the 5th component).

    Components beyond the fifth are re-joined with dashes; returns None
    when the GHCID has no suffix.
    """
    components = ghcid.split('-')
    if len(components) <= 5:
        return None
    return '-'.join(components[5:])
def validate_ch_annotator_entity(data: Dict) -> Tuple[bool, str]:
    """
    Check whether the record carries a usable entity profile.

    Accepted, in order of preference:
    1. A CH-Annotator subtype under GRP.HER.* (heritage institutions).
    2. Hypernym 'GRP' combined with a known institution_type, reported
       as a synthesized GRP.HER.<first 3 chars of type> subtype.
    3. Any non-UNKNOWN institution_type, reported as INFERRED.<type>.

    Returns:
        (is_valid, entity_subtype) — entity_subtype is '' when invalid.
    """
    heritage_prefixes = (
        'GRP.HER',      # Generic heritage institution
        'GRP.HER.GAL',  # Gallery
        'GRP.HER.LIB',  # Library
        'GRP.HER.ARC',  # Archive
        'GRP.HER.MUS',  # Museum
        'GRP.HER.RES',  # Research center
        'GRP.HER.EDU',  # Education provider
        'GRP.HER.REL',  # Religious heritage site
        'GRP.HER.BOT',  # Botanical/zoo
        'GRP.HER.MIX',  # Mixed type
    )
    classification = data.get('ch_annotator', {}).get('entity_classification', {})
    hypernym = classification.get('hypernym', '')
    subtype = classification.get('subtype', '')

    # 1. Explicit heritage-institution subtype.
    if subtype and any(subtype.startswith(prefix) for prefix in heritage_prefixes):
        return (True, subtype)

    inst_type = data.get('original_entry', {}).get('institution_type', '')

    # 2. GROUP hypernym plus a recognized institution type.
    if hypernym == 'GRP' and inst_type in TYPE_TO_CODE:
        return (True, f'GRP.HER.{inst_type[:3]}')

    # 3. No CH-Annotator profile — still allow any concrete institution type.
    if inst_type and inst_type != 'UNKNOWN':
        return (True, f'INFERRED.{inst_type}')

    return (False, '')
def process_file(filepath: Path, conn: sqlite3.Connection, dry_run: bool = False,
                 require_ch_annotator: bool = False) -> Dict:
    """
    Process a single custodian file.

    Loads the YAML, reverse-geocodes the record's coordinates,
    regenerates the GHCID and its derived identifiers (UUID v5,
    SHA-256 UUID, 64-bit numeric), rewrites the YAML under the new
    GHCID filename, and removes the old file when the name changed.

    Args:
        filepath: Path to custodian YAML file
        conn: GeoNames database connection
        dry_run: If True, don't write changes
        require_ch_annotator: If True, skip files without valid CH-Annotator entity profile

    Returns dict with processing results: 'status' (e.g. 'updated',
    'unchanged', 'no_coordinates', 'error'), old/new GHCID, the
    GeoNames match summary, the entity profile, and any error message.
    """
    result = {
        'file': filepath.name,
        'status': 'skipped',
        'old_ghcid': None,
        'new_ghcid': None,
        'geonames_match': None,
        'entity_profile': None,
        'error': None,
    }
    try:
        with open(filepath, 'r') as f:
            data = yaml.safe_load(f)
        if not data:
            result['status'] = 'error'
            result['error'] = 'Empty file'
            return result
        # Validate CH-Annotator entity profile (recorded even when not required)
        is_valid_entity, entity_subtype = validate_ch_annotator_entity(data)
        result['entity_profile'] = entity_subtype
        if require_ch_annotator and not is_valid_entity:
            result['status'] = 'invalid_entity_profile'
            result['error'] = 'No valid CH-Annotator GRP.HER.* entity profile'
            return result
        # Get current GHCID
        current_ghcid = data.get('ghcid', {}).get('ghcid_current')
        if not current_ghcid:
            result['status'] = 'error'
            result['error'] = 'No GHCID found'
            return result
        result['old_ghcid'] = current_ghcid
        # Check if already has proper GeoNames resolution — avoid re-geocoding
        resolution = data.get('ghcid', {}).get('location_resolution', {})
        if resolution.get('method') == 'REVERSE_GEOCODE' and resolution.get('geonames_id'):
            result['status'] = 'already_geocoded'
            return result
        # Extract coordinates
        coords = extract_coordinates(data)
        if not coords:
            result['status'] = 'no_coordinates'
            return result
        lat, lon = coords
        country_code = extract_country_code(data)
        if country_code == 'XX':
            result['status'] = 'no_country'
            return result
        # Reverse geocode against the GeoNames cities table
        geo_result = reverse_geocode(lat, lon, country_code, conn)
        if not geo_result:
            result['status'] = 'geocode_failed'
            return result
        result['geonames_match'] = {
            'city': geo_result['city_name'],
            'admin1': geo_result['admin1_name'],
            'geonames_id': geo_result['geonames_id'],
        }
        # Generate new region/city codes from the GeoNames match
        new_region_code = get_region_code(country_code, geo_result['admin1_code'])
        new_city_code = generate_city_code(geo_result['ascii_name'])
        # Extract existing abbreviation and name suffix (kept from old GHCID)
        abbreviation = extract_abbreviation_from_ghcid(current_ghcid)
        name_suffix = extract_name_suffix_from_ghcid(current_ghcid)
        # Get institution type
        inst_type = data.get('original_entry', {}).get('institution_type', 'UNKNOWN')
        # Generate new GHCID
        new_ghcid = generate_ghcid(country_code, new_region_code, new_city_code,
                                   inst_type, abbreviation, name_suffix)
        result['new_ghcid'] = new_ghcid
        # Check if GHCID changed
        if new_ghcid == current_ghcid:
            result['status'] = 'unchanged'
            return result
        if dry_run:
            result['status'] = 'would_update'
            return result
        # Update the data
        timestamp = datetime.now(timezone.utc).isoformat()
        # Update GHCID section: current ID plus all derived identifier forms
        data['ghcid']['ghcid_current'] = new_ghcid
        data['ghcid']['ghcid_uuid'] = generate_ghcid_uuid(new_ghcid)
        data['ghcid']['ghcid_uuid_sha256'] = generate_ghcid_uuid_sha256(new_ghcid)
        data['ghcid']['ghcid_numeric'] = generate_ghcid_numeric(new_ghcid)
        # Update location_resolution (marks the record as geocoded, see check above)
        data['ghcid']['location_resolution'] = {
            'method': 'REVERSE_GEOCODE',
            'country_code': country_code,
            'region_code': new_region_code,
            'region_name': geo_result['admin1_name'],
            'city_code': new_city_code,
            'city_name': geo_result['city_name'],
            'geonames_id': geo_result['geonames_id'],
            'feature_code': geo_result['feature_code'],
            'resolution_date': timestamp,
        }
        # Add to GHCID history (newest-first list; index 0 is the current entry)
        history = data['ghcid'].get('ghcid_history', [])
        # Mark old GHCID as superseded
        # NOTE(review): assumes history[0] is the most recent entry — consistent
        # with the insert-at-0 below, but unverified for externally created files.
        if history:
            history[0]['valid_to'] = timestamp
            history[0]['superseded_by'] = new_ghcid
        # Add new GHCID entry
        history.insert(0, {
            'ghcid': new_ghcid,
            'ghcid_numeric': generate_ghcid_numeric(new_ghcid),
            'valid_from': timestamp,
            'reason': f'Updated via GeoNames reverse geocoding (matched {geo_result["city_name"]}, geonames:{geo_result["geonames_id"]})',
        })
        data['ghcid']['ghcid_history'] = history
        # Update identifiers list so it stays in sync with the ghcid section
        for ident in data.get('identifiers', []):
            if ident.get('identifier_scheme') == 'GHCID':
                ident['identifier_value'] = new_ghcid
            elif ident.get('identifier_scheme') == 'GHCID_UUID':
                ident['identifier_value'] = generate_ghcid_uuid(new_ghcid)
            elif ident.get('identifier_scheme') == 'GHCID_UUID_SHA256':
                ident['identifier_value'] = generate_ghcid_uuid_sha256(new_ghcid)
            elif ident.get('identifier_scheme') == 'GHCID_NUMERIC':
                ident['identifier_value'] = str(generate_ghcid_numeric(new_ghcid))
        # Write updated data under the new GHCID-derived filename
        new_filename = f"{new_ghcid}.yaml"
        new_filepath = CUSTODIAN_DIR / new_filename
        with open(new_filepath, 'w') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        # Remove old file if different
        if filepath != new_filepath:
            os.remove(filepath)
        result['status'] = 'updated'
        return result
    except Exception as e:
        # Deliberately broad: one bad file must not abort the whole batch;
        # the failure is surfaced in the per-file result instead.
        result['status'] = 'error'
        result['error'] = str(e)
        return result
def main():
    """CLI entry point: update custodian GHCIDs via GeoNames reverse geocoding.

    Parses CLI flags, iterates custodian YAML files (optionally filtered
    by country prefix and limited in count), processes each file, prints
    a console summary, and writes a markdown report to REPORTS_DIR.
    """
    parser = argparse.ArgumentParser(description='Update GHCID with GeoNames data')
    parser.add_argument('--dry-run', action='store_true', help='Show changes without applying')
    parser.add_argument('--limit', type=int, help='Limit number of files to process')
    parser.add_argument('--country', type=str, help='Only process files for specific country')
    parser.add_argument('--verbose', action='store_true', help='Show detailed output')
    parser.add_argument('--require-ch-annotator', action='store_true',
                        help='Only process files with valid CH-Annotator GRP.HER.* entity profile')
    args = parser.parse_args()

    print("=" * 60)
    print("Update GHCID with GeoNames Reverse Geocoding")
    print("=" * 60)
    print()
    if args.dry_run:
        print("*** DRY RUN - No changes will be made ***")
        print()
    if args.require_ch_annotator:
        print("*** Requiring CH-Annotator entity profile (GRP.HER.*) ***")
        print()

    # Connect to GeoNames
    if not GEONAMES_DB.exists():
        print(f"Error: GeoNames database not found at {GEONAMES_DB}")
        return
    conn = get_geonames_connection()
    print("Connected to GeoNames database")

    # Get list of files
    files = list(CUSTODIAN_DIR.glob("*.yaml"))
    print(f"Found {len(files)} custodian files")

    # Filter by country if specified (GHCID filenames start with the country code)
    if args.country:
        files = [f for f in files if f.name.startswith(f"{args.country}-")]
        print(f"Filtered to {len(files)} files for country {args.country}")

    # Apply limit
    if args.limit:
        files = files[:args.limit]
        print(f"Limited to {args.limit} files")
    print()

    # Process files
    stats = {
        'updated': 0,
        'unchanged': 0,
        'already_geocoded': 0,
        'no_coordinates': 0,
        'no_country': 0,
        'geocode_failed': 0,
        'would_update': 0,
        'invalid_entity_profile': 0,
        'error': 0,
    }
    updates = []
    entity_profiles_seen = {}
    try:
        for i, filepath in enumerate(files):
            if (i + 1) % 500 == 0:
                print(f"Progress: {i + 1}/{len(files)}")
            result = process_file(filepath, conn, args.dry_run, args.require_ch_annotator)
            stats[result['status']] = stats.get(result['status'], 0) + 1
            # Track entity profiles
            profile = result.get('entity_profile', 'NONE')
            entity_profiles_seen[profile] = entity_profiles_seen.get(profile, 0) + 1
            if result['status'] in ('updated', 'would_update'):
                updates.append(result)
                if args.verbose:
                    print(f" {result['old_ghcid']} -> {result['new_ghcid']}")
                    print(f" Matched: {result['geonames_match']}")
                    print(f" Entity: {result.get('entity_profile', 'N/A')}")
    finally:
        # Always release the database connection, even if processing aborts.
        conn.close()

    # Print summary
    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total files processed: {len(files)}")
    print()
    print("Results:")
    print(f" Updated: {stats.get('updated', 0)}")
    print(f" Would update (dry-run): {stats.get('would_update', 0)}")
    print(f" Unchanged: {stats.get('unchanged', 0)}")
    print(f" Already geocoded: {stats.get('already_geocoded', 0)}")
    print(f" No coordinates: {stats.get('no_coordinates', 0)}")
    print(f" No country code: {stats.get('no_country', 0)}")
    print(f" Geocode failed: {stats.get('geocode_failed', 0)}")
    print(f" Invalid entity profile: {stats.get('invalid_entity_profile', 0)}")
    print(f" Errors: {stats.get('error', 0)}")

    # Print entity profile breakdown (top 10 by frequency)
    if entity_profiles_seen:
        print()
        print("CH-Annotator Entity Profiles:")
        for profile, count in sorted(entity_profiles_seen.items(), key=lambda x: -x[1])[:10]:
            print(f" {profile}: {count}")

    # Save report — ensure the reports directory exists before writing
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    report_file = REPORTS_DIR / f"GEONAMES_UPDATE_REPORT_{timestamp}.md"
    with open(report_file, 'w') as f:
        f.write("# GeoNames GHCID Update Report\n\n")
        f.write(f"Generated: {datetime.now(timezone.utc).isoformat()}\n\n")
        f.write("## Summary\n\n")
        f.write("| Metric | Count |\n")
        f.write("|--------|-------|\n")
        f.write(f"| Files processed | {len(files)} |\n")
        f.write(f"| Updated | {stats.get('updated', 0)} |\n")
        f.write(f"| Would update | {stats.get('would_update', 0)} |\n")
        f.write(f"| Unchanged | {stats.get('unchanged', 0)} |\n")
        f.write(f"| Already geocoded | {stats.get('already_geocoded', 0)} |\n")
        f.write(f"| No coordinates | {stats.get('no_coordinates', 0)} |\n")
        # Added for consistency with the console summary, which reports this stat.
        f.write(f"| No country code | {stats.get('no_country', 0)} |\n")
        f.write(f"| Geocode failed | {stats.get('geocode_failed', 0)} |\n")
        f.write(f"| Invalid entity profile | {stats.get('invalid_entity_profile', 0)} |\n")
        f.write(f"| Errors | {stats.get('error', 0)} |\n")
        # Entity profile breakdown
        if entity_profiles_seen:
            f.write("\n## CH-Annotator Entity Profiles\n\n")
            f.write("| Entity Profile | Count |\n")
            f.write("|---------------|-------|\n")
            for profile, count in sorted(entity_profiles_seen.items(), key=lambda x: -x[1]):
                f.write(f"| {profile} | {count} |\n")
        if updates:
            f.write("\n## Updates\n\n")
            f.write("| Old GHCID | New GHCID | Matched City | Entity Profile |\n")
            f.write("|-----------|-----------|-------------|----------------|\n")
            for u in updates[:100]:  # Limit to first 100
                city = u.get('geonames_match', {}).get('city', 'N/A')
                profile = u.get('entity_profile', 'N/A')
                f.write(f"| {u['old_ghcid']} | {u['new_ghcid']} | {city} | {profile} |\n")
            if len(updates) > 100:
                f.write(f"\n*... and {len(updates) - 100} more updates*\n")
    print()
    print(f"Report saved to: {report_file}")
# Script entry point: run only when executed directly, not on import.
if __name__ == '__main__':
    main()