glam/scripts/fix_ghcid_location_mismatches.py
kempersc 933deb337c refactor(scripts): generalize GHCID location fixer for all institution types
- Add --type/-t flag to specify institution type (A, G, H, I, L, M, N, O, R, S, T, U, X, ALL)
- Default still Type I (Intangible Heritage) for backward compatibility
- Skip PENDING files that have no location data
- Update help text with all supported types
2026-01-09 11:54:28 +01:00

591 lines
21 KiB
Python

#!/usr/bin/env python3
"""
Fix GHCID location mismatches for heritage custodian files.
This script:
1. Identifies files where GHCID location component doesn't match actual location in locations[] array
2. Looks up correct GeoNames data for the actual city
3. Generates proper GHCID with all identifier formats (UUID v5, UUID v8, numeric)
4. Updates all relevant fields in the YAML file
5. Renames files to match new GHCID
Usage:
python scripts/fix_ghcid_location_mismatches.py --dry-run # Preview Type I changes
python scripts/fix_ghcid_location_mismatches.py --type M --dry-run # Preview Museum changes
python scripts/fix_ghcid_location_mismatches.py --type A # Fix Archive files
python scripts/fix_ghcid_location_mismatches.py --type ALL --dry-run # Preview ALL types
Supported types: A (Archive), G (Gallery), H (Holy Sites), I (Intangible), L (Library),
M (Museum), N (NGO), O (Official), R (Research), S (Society),
T (Taste/Smell), U (Unknown), X (Mixed), ALL
"""
import argparse
import hashlib
import os
import re
import shutil
import sqlite3
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple

import yaml
# GHCID namespace UUID used for all UUID v5 derivations.
# This is the RFC 4122 DNS namespace (the same value as uuid.NAMESPACE_DNS).
# It must never change once GHCIDs have been issued, or every derived UUID
# would change with it.
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Netherlands GeoNames admin1 code -> ISO 3166-2:NL province code mapping.
# NOTE(review): the gaps ('08', '12'-'14') appear to be codes GeoNames does
# not assign to current NL provinces - verify against the GeoNames admin1 table.
ADMIN1_TO_PROVINCE = {
    '01': 'DR',  # Drenthe
    '02': 'FR',  # Friesland
    '03': 'GE',  # Gelderland
    '04': 'GR',  # Groningen
    '05': 'LI',  # Limburg
    '06': 'NB',  # Noord-Brabant
    '07': 'NH',  # Noord-Holland
    '09': 'UT',  # Utrecht
    '10': 'ZE',  # Zeeland
    '11': 'ZH',  # Zuid-Holland
    '15': 'OV',  # Overijssel
    '16': 'FL',  # Flevoland
}
# Hand-picked 3-letter codes for city names whose automatically derived code
# would be wrong or ambiguous (article prefixes, saint-names, multi-word names).
SPECIAL_CITY_CODES = {
    "'s-Hertogenbosch": "SHE",
    "s-Hertogenbosch": "SHE",
    "'s-Gravenhage": "SGR",
    "Den Haag": "DHA",
    "The Hague": "DHA",
    "Den Burg": "DBU",
    "Den Helder": "DHE",
    "De Kwakel": "DKW",
    "Sint Nicolaasga": "SNI",
    "Sint Jansklooster": "SJK",
    "Sint-Oedenrode": "SOR",
    "Wijk bij Duurstede": "WBD",
    "Alphen aan den Rijn": "AAR",
    "Bergen op Zoom": "BOZ",
    "Tweede Exloërmond": "TEX",
    "Budel-Schoot": "BUS",
    "Vierlingsbeek": "VIE",
    "Leenderstrijp": "LEE",
    "Sinoutskerke": "SIN",
    "Espelo": "ESP",
    "Denekamp": "DEN",
    "Haarzuilens": "HAA",
    "Nootdorp": "NOO",
    "Ameland": "AME",
    "Essen": "ESS",
    "Didam": "DID",
    "Venhuizen": "VEN",
    "Bleskensgraaf": "BLE",
    "Noordwijk": "NOO",
    "Ootmarsum": "OOT",
    "Zwaag": "ZWA",
    "Diepenheim": "DIE",
    "Wierden": "WIE",
    "Zierikzee": "ZIE",
    "Heemskerk": "HEE",
    "Zundert": "ZUN",
}

# GeoNames feature codes accepted as real settlements (excludes neighborhoods).
VALID_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')


def generate_city_code(city_name: str) -> str:
    """Derive the 3-letter GHCID city code for *city_name*.

    Resolution order:
      1. Explicit override in SPECIAL_CITY_CODES.
      2. Single word: its first three letters, uppercased.
      3. Dutch article ("de"/"het"/"den") + word: article initial plus the
         first two letters of the following word.
      4. Other multi-word names: initials of the first three words.

    NOTE(review): two-word names without an article yield only two letters
    (e.g. "Etten-Leur" -> "EL"); confirm whether that is intended.
    """
    override = SPECIAL_CITY_CODES.get(city_name)
    if override is not None:
        return override

    # Drop apostrophes and treat dashes as word separators.
    cleaned = city_name.replace("'", "").replace("-", " ")
    tokens = cleaned.split()

    if not tokens:
        # Empty / whitespace-only input degrades to an empty code.
        return cleaned[:3].upper()
    if len(tokens) == 1:
        return tokens[0][:3].upper()

    # Article + main word. ("'s" can never match here because apostrophes
    # were stripped above - kept for parity with the original list.)
    if tokens[0].lower() in ('de', 'het', 'den', "'s"):
        return (tokens[0][0] + tokens[1][:2]).upper()

    # General multi-word name: initials of up to the first three words.
    return ''.join(token[0] for token in tokens[:3]).upper()
def generate_ghcid_identifiers(ghcid_string: str) -> dict:
    """Generate all derived identifier formats for a GHCID string.

    Args:
        ghcid_string: Canonical GHCID, e.g. "NL-ZH-DEL-I-FOO".

    Returns:
        dict with string values:
            'ghcid_uuid':        UUID v5 (SHA-1 over GHCID_NAMESPACE) - PRIMARY.
            'ghcid_uuid_sha256': UUID built from SHA-256 with version nibble 8.
            'ghcid_numeric':     unsigned 64-bit int from the first 8 SHA-256 bytes.
    """
    # Compute the SHA-256 digest once; both the "v8" UUID and the numeric id
    # derive from it (the original computed it twice).
    digest = hashlib.sha256(ghcid_string.encode()).digest()

    # UUID v5 (SHA-1, RFC 4122) - PRIMARY identifier.
    uuid_v5 = uuid.uuid5(GHCID_NAMESPACE, ghcid_string)

    # UUID "v8": first 16 digest bytes with version/variant bits forced.
    v8_bytes = bytearray(digest[:16])
    v8_bytes[6] = (v8_bytes[6] & 0x0F) | 0x80  # version nibble -> 8
    v8_bytes[8] = (v8_bytes[8] & 0x3F) | 0x80  # RFC 4122 variant
    uuid_sha256 = uuid.UUID(bytes=bytes(v8_bytes))

    # Numeric identifier: big-endian 64-bit integer from the first 8 bytes.
    numeric = int.from_bytes(digest[:8], 'big')

    return {
        'ghcid_uuid': str(uuid_v5),
        'ghcid_uuid_sha256': str(uuid_sha256),
        'ghcid_numeric': str(numeric),
    }
# Additional candidate names tried by lookup_city_geonames() after the literal
# name and its normalized variant. Keys are names as they appear in the YAML
# locations[] array; values are fallback names tried in the listed order.
CITY_NAME_ALIASES = {
    "Den Haag": ["The Hague", "'s-Gravenhage", "s-Gravenhage"],
    "The Hague": ["Den Haag", "'s-Gravenhage", "s-Gravenhage"],
    "'s-Gravenhage": ["The Hague", "Den Haag", "s-Gravenhage"],
    "'s-Hertogenbosch": ["s-Hertogenbosch", "Hertogenbosch", "Den Bosch"],
    "Ameland": ["Hollum", "Nes"],  # Main villages on Ameland
}
def lookup_city_geonames(db_path: str, city_name: str, country_code: str = 'NL') -> Optional[dict]:
    """Look up a settlement in the GeoNames SQLite database.

    Tries exact (case-insensitive) matches for the literal name, a normalized
    variant, and any CITY_NAME_ALIASES entries; falls back to substring (LIKE)
    matching as a last resort. Only feature codes in VALID_FEATURE_CODES are
    considered, and ties are broken by population (largest wins).

    Args:
        db_path: Path to the GeoNames SQLite database (table ``cities``).
        city_name: City name as found in the YAML ``locations`` array.
        country_code: ISO country code filter (default 'NL').

    Returns:
        dict with geonames_id, name, ascii_name, admin1_code, admin1_name,
        province_code (ISO 3166-2, 'XX' when unmapped), latitude, longitude,
        population and feature_code; or None when nothing matched.
    """
    # Normalized variant: "'s-Gravenhage" -> "s-Gravenhage", drop other apostrophes.
    search_name = city_name.replace("'s-", "s-").replace("'", "")
    # Candidate names in priority order, de-duplicated (city_name often equals
    # search_name, which previously caused redundant identical queries).
    names_to_try = list(dict.fromkeys(
        [city_name, search_name] + CITY_NAME_ALIASES.get(city_name, [])
    ))

    placeholders = ', '.join(['?'] * len(VALID_FEATURE_CODES))

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        def run_query(operator: str, pattern: str):
            """Run one lookup using '=' (exact) or 'LIKE' (fuzzy) matching."""
            cursor.execute(f"""
                SELECT geonames_id, name, ascii_name, admin1_code, admin1_name,
                       latitude, longitude, population, feature_code
                FROM cities
                WHERE country_code = ?
                  AND (LOWER(name) {operator} LOWER(?) OR LOWER(ascii_name) {operator} LOWER(?))
                  AND feature_code IN ({placeholders})
                ORDER BY population DESC
                LIMIT 1
            """, (country_code, pattern, pattern) + VALID_FEATURE_CODES)
            return cursor.fetchone()

        row = None
        # Pass 1: exact matches, in candidate priority order.
        for name_variant in names_to_try:
            row = run_query('=', name_variant)
            if row:
                break
        if not row:
            # Pass 2: substring matches as a last resort.
            # NOTE(review): '%name%' can hit unrelated places that merely
            # contain the name - acceptable as a fallback, but verify results.
            for name_variant in names_to_try:
                row = run_query('LIKE', f"%{name_variant}%")
                if row:
                    break
    finally:
        # Always release the connection, even if a query raises
        # (the original leaked the handle on error).
        conn.close()

    if not row:
        return None
    admin1_code = row[3] or ''
    return {
        'geonames_id': row[0],
        'name': row[1],
        'ascii_name': row[2],
        'admin1_code': admin1_code,
        'admin1_name': row[4],
        'province_code': ADMIN1_TO_PROVINCE.get(admin1_code, 'XX'),
        'latitude': row[5],
        'longitude': row[6],
        'population': row[7],
        'feature_code': row[8],
    }
def extract_locations_city(data: dict) -> Optional[str]:
    """Return the city of the first ``locations[]`` entry, or None.

    None is returned when the key is missing, the value is not a non-empty
    list, or the first entry carries no ``city`` key.
    """
    entries = data.get('locations', [])
    if not isinstance(entries, list) or not entries:
        return None
    return entries[0].get('city')
def extract_abbreviation(ghcid: str) -> str:
    """Return the abbreviation part of a GHCID (everything after the 4th dash).

    The abbreviation may itself contain dashes; an empty string is returned
    when the GHCID has fewer than five dash-separated components.
    """
    components = ghcid.split('-')
    return '-'.join(components[4:]) if len(components) >= 5 else ''
def parse_ghcid(ghcid: str) -> Tuple[str, str, str, str, str]:
    """Split a GHCID into (country, region, city, institution type, abbreviation).

    The abbreviation keeps any internal dashes. Five empty strings are
    returned when the GHCID has fewer than five components.
    """
    pieces = ghcid.split('-')
    if len(pieces) < 5:
        return '', '', '', '', ''
    return pieces[0], pieces[1], pieces[2], pieces[3], '-'.join(pieces[4:])
def build_ghcid(country: str, region: str, city_code: str, inst_type: str, abbrev: str) -> str:
    """Assemble a GHCID string from its five components, dash-separated."""
    return '-'.join((country, region, city_code, inst_type, abbrev))
def update_yaml_ghcid(data: dict, new_ghcid: str, old_ghcid: str, geonames_data: dict,
                      timestamp: str) -> dict:
    """Update all GHCID-related fields in YAML data.

    Mutates ``data`` in place (and also returns it): rewrites the ``ghcid``
    section, closes out and appends ``ghcid_history``, updates GHCID entries
    in the ``identifiers`` array, syncs the ``location`` section with the
    GeoNames result, and appends a provenance note.

    Args:
        data: Parsed YAML document for one custodian file.
        new_ghcid: Corrected GHCID string.
        old_ghcid: GHCID being replaced (its open history entry is closed).
        geonames_data: Result dict from lookup_city_geonames().
        timestamp: UTC timestamp string applied to all updated fields.

    Returns:
        The same (mutated) ``data`` dict.
    """
    identifiers = generate_ghcid_identifiers(new_ghcid)
    # Update ghcid section
    if 'ghcid' not in data:
        data['ghcid'] = {}
    ghcid_section = data['ghcid']
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = identifiers['ghcid_uuid']
    ghcid_section['ghcid_uuid_sha256'] = identifiers['ghcid_uuid_sha256']
    # Stored as an int here, but as a string in the identifiers array below.
    ghcid_section['ghcid_numeric'] = int(identifiers['ghcid_numeric'])
    ghcid_section['generation_timestamp'] = timestamp
    # Preserve record_id if it exists
    # record_id should NOT change - it's the database primary key
    # Update location_resolution
    ghcid_section['location_resolution'] = {
        'method': 'GEONAMES_LOOKUP',
        'geonames_id': geonames_data['geonames_id'],
        'geonames_name': geonames_data['name'],
        'feature_code': geonames_data['feature_code'],
        'population': geonames_data['population'],
        'admin1_code': geonames_data['admin1_code'],
        'region_code': geonames_data['province_code'],
        # NOTE(review): hardcoded - the script currently targets NL files only.
        'country_code': 'NL',
    }
    ghcid_section['geonames_id'] = geonames_data['geonames_id']
    # Update ghcid_history
    if 'ghcid_history' not in ghcid_section:
        ghcid_section['ghcid_history'] = []
    # Close out the old GHCID: mark any still-open matching entry as ended now.
    for entry in ghcid_section['ghcid_history']:
        if entry.get('ghcid') == old_ghcid and entry.get('valid_to') is None:
            entry['valid_to'] = timestamp
    # Add new GHCID entry
    ghcid_section['ghcid_history'].append({
        'ghcid': new_ghcid,
        'ghcid_numeric': int(identifiers['ghcid_numeric']),
        'valid_from': timestamp,
        'valid_to': None,
        'reason': f"GHCID corrected: location mismatch fix from {old_ghcid} to {new_ghcid}",
    })
    # Update identifiers array (only entries whose scheme is GHCID-derived).
    if 'identifiers' in data:
        for identifier in data['identifiers']:
            if identifier.get('identifier_scheme') == 'GHCID':
                identifier['identifier_value'] = new_ghcid
            elif identifier.get('identifier_scheme') == 'GHCID_UUID':
                identifier['identifier_value'] = identifiers['ghcid_uuid']
                identifier['identifier_url'] = f"urn:uuid:{identifiers['ghcid_uuid']}"
            elif identifier.get('identifier_scheme') == 'GHCID_UUID_SHA256':
                identifier['identifier_value'] = identifiers['ghcid_uuid_sha256']
                identifier['identifier_url'] = f"urn:uuid:{identifiers['ghcid_uuid_sha256']}"
            elif identifier.get('identifier_scheme') == 'GHCID_NUMERIC':
                identifier['identifier_value'] = identifiers['ghcid_numeric']
    # Update location section to match locations array
    if 'location' in data:
        data['location']['city'] = geonames_data['name']
        data['location']['region_code'] = geonames_data['province_code']
        data['location']['geonames_id'] = geonames_data['geonames_id']
        data['location']['geonames_name'] = geonames_data['name']
        data['location']['feature_code'] = geonames_data['feature_code']
        # NOTE(review): a falsy check also skips latitude 0.0, not just None/missing.
        if geonames_data.get('latitude'):
            data['location']['latitude'] = geonames_data['latitude']
            data['location']['longitude'] = geonames_data['longitude']
        data['location']['normalization_timestamp'] = timestamp
        # Remove old coordinate provenance notes
        if 'note' in data['location']:
            del data['location']['note']
        if 'coordinate_provenance_removed' in data['location']:
            del data['location']['coordinate_provenance_removed']
    # Add provenance note
    if 'provenance' in data:
        if 'notes' not in data['provenance']:
            data['provenance']['notes'] = []
        if isinstance(data['provenance']['notes'], list):
            data['provenance']['notes'].append(
                f"GHCID location corrected via fix_ghcid_location_mismatches.py on {timestamp}: "
                f"{old_ghcid} -> {new_ghcid}"
            )
    return data
def find_mismatched_files(custodian_dir: Path, db_path: str, inst_type: str = 'I') -> list:
    """Find all files of given type with GHCID location mismatches.

    Args:
        custodian_dir: Path to custodian directory
        db_path: Path to GeoNames database
        inst_type: Institution type code (I, M, A, L, etc.) or 'ALL' for all types

    Returns:
        List of dicts with keys: filepath, old_ghcid, new_ghcid, actual_city,
        geonames_data (lookup_city_geonames result) and data (parsed YAML).
    """
    mismatches = []
    # Build glob pattern based on institution type
    if inst_type == 'ALL':
        pattern = 'NL-*-*.yaml'
    else:
        pattern = f'NL-*-{inst_type}-*.yaml'
    for filepath in sorted(custodian_dir.glob(pattern)):
        # The filename (without extension) IS the current GHCID.
        filename = filepath.stem
        current_ghcid = filename
        # Skip PENDING files (no location data)
        if 'PENDING' in current_ghcid:
            continue
        # Parse current GHCID
        country, region, city_code, file_inst_type, abbrev = parse_ghcid(current_ghcid)
        if not abbrev:
            # Malformed GHCID (fewer than five components) - nothing to fix.
            continue
        # Load YAML
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except Exception as e:
            print(f"Error loading {filepath}: {e}")
            continue
        # Get actual city from locations array
        actual_city = extract_locations_city(data)
        if not actual_city:
            continue
        # Skip "Nederland" as it's for national organizations
        if actual_city.lower() == 'nederland':
            continue
        # Generate expected city code
        expected_city_code = generate_city_code(actual_city)
        # NOTE(review): only the CITY code is compared; a wrong region with a
        # correct city code is NOT detected (the original comment claimed
        # "city code or region is wrong") - confirm whether that is intended.
        if city_code != expected_city_code:
            # Look up correct GeoNames data
            geonames_data = lookup_city_geonames(db_path, actual_city)
            if geonames_data:
                # Region comes from GeoNames; country and abbreviation are kept.
                new_ghcid = build_ghcid(
                    country,
                    geonames_data['province_code'],
                    expected_city_code,
                    file_inst_type,
                    abbrev
                )
                # Only add if the GHCID actually changes
                if new_ghcid != current_ghcid:
                    mismatches.append({
                        'filepath': filepath,
                        'old_ghcid': current_ghcid,
                        'new_ghcid': new_ghcid,
                        'actual_city': actual_city,
                        'geonames_data': geonames_data,
                        'data': data,
                    })
            else:
                print(f"WARNING: Could not find GeoNames data for '{actual_city}' in {filepath}")
    return mismatches
# Institution type codes accepted by --type (plus 'ALL' to scan every type).
# NOTE(review): B, C, D, E, F and P are accepted here but not described in the
# module docstring or --type help text - confirm they are intentional.
VALID_INST_TYPES = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'L', 'M', 'N', 'O', 'P', 'R', 'S', 'T', 'U', 'X', 'ALL']
def check_collision(custodian_dir: Path, new_ghcid: str, old_filepath: Path) -> bool:
    """Return True if renaming to ``new_ghcid`` would clobber a DIFFERENT file.

    Renaming a file onto itself (same path) is not a collision.
    """
    candidate = custodian_dir / f"{new_ghcid}.yaml"
    if not candidate.exists():
        return False
    return candidate != old_filepath
def main():
    """CLI entry point: scan for mismatches, preview/apply fixes, print a summary.

    Returns:
        Process exit code: 1 when required paths are missing or any file
        failed to update; 0 otherwise (including "nothing to do").
    """
    parser = argparse.ArgumentParser(description='Fix GHCID location mismatches for heritage custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without applying')
    parser.add_argument('--type', '-t', default='I', choices=VALID_INST_TYPES,
                        help='Institution type code: A (Archive), G (Gallery), H (Holy Sites), '
                             'I (Intangible, default), L (Library), M (Museum), N (NGO), O (Official), '
                             'R (Research), S (Society), T (Taste/Smell), U (Unknown), X (Mixed), '
                             'ALL (all types)')
    parser.add_argument('--custodian-dir', default='data/custodian', help='Path to custodian directory')
    parser.add_argument('--geonames-db', default='data/reference/geonames.db', help='Path to GeoNames database')
    args = parser.parse_args()
    inst_type = args.type
    # Resolve paths relative to the repository root (this script is expected
    # to live one directory below it, e.g. <root>/scripts/).
    script_dir = Path(__file__).parent.parent
    custodian_dir = script_dir / args.custodian_dir
    db_path = script_dir / args.geonames_db
    if not custodian_dir.exists():
        print(f"ERROR: Custodian directory not found: {custodian_dir}")
        return 1
    if not db_path.exists():
        print(f"ERROR: GeoNames database not found: {db_path}")
        return 1
    print("=" * 80)
    type_name = 'ALL types' if inst_type == 'ALL' else f'Type {inst_type}'
    print(f"GHCID Location Mismatch Fixer for {type_name} Heritage Custodians")
    print("=" * 80)
    print(f"Custodian directory: {custodian_dir}")
    print(f"GeoNames database: {db_path}")
    print(f"Institution type: {inst_type}")
    print(f"Mode: {'DRY RUN (preview only)' if args.dry_run else 'LIVE (applying changes)'}")
    print()
    # Find mismatches
    print("Scanning for location mismatches...")
    mismatches = find_mismatched_files(custodian_dir, str(db_path), inst_type)
    print(f"Found {len(mismatches)} files with GHCID location mismatches")
    print()
    if not mismatches:
        print("No mismatches found. Exiting.")
        return 0
    # One shared UTC timestamp so every file touched in this run agrees.
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    # Process each mismatch
    fixed_count = 0
    skipped_count = 0
    errors = []
    for mismatch in mismatches:
        old_ghcid = mismatch['old_ghcid']
        new_ghcid = mismatch['new_ghcid']
        actual_city = mismatch['actual_city']
        filepath = mismatch['filepath']
        geonames_data = mismatch['geonames_data']
        data = mismatch['data']
        print(f"{'[DRY RUN] ' if args.dry_run else ''}Processing: {old_ghcid}")
        print(f" Actual city: {actual_city}")
        print(f" Province: {geonames_data['province_code']} ({geonames_data.get('admin1_name', 'Unknown')})")
        print(f" New GHCID: {new_ghcid}")
        # Never overwrite a different existing file with the new name.
        if check_collision(custodian_dir, new_ghcid, filepath):
            print(f" SKIPPED: Collision - {new_ghcid}.yaml already exists")
            skipped_count += 1
            print()
            continue
        # Generate new identifiers for display
        identifiers = generate_ghcid_identifiers(new_ghcid)
        print(f" UUID v5: {identifiers['ghcid_uuid']}")
        print(f" UUID v8: {identifiers['ghcid_uuid_sha256']}")
        print(f" Numeric: {identifiers['ghcid_numeric']}")
        if args.dry_run:
            # Dry run: count the file as "would fix" and touch nothing.
            print(f" Would rename: {filepath.name} -> {new_ghcid}.yaml")
            print()
            fixed_count += 1
            continue
        try:
            # Update YAML data (mutates and returns the same dict).
            updated_data = update_yaml_ghcid(
                data, new_ghcid, old_ghcid, geonames_data, timestamp
            )
            # Write the new file first, then remove the old one, so a failure
            # between the two never loses data.
            new_filepath = filepath.parent / f"{new_ghcid}.yaml"
            with open(new_filepath, 'w', encoding='utf-8') as f:
                yaml.dump(updated_data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
            # Remove old file if different from new
            if filepath != new_filepath:
                filepath.unlink()
                print(f" Renamed: {filepath.name} -> {new_filepath.name}")
            else:
                print(f" Updated: {filepath.name}")
            fixed_count += 1
        except Exception as e:
            error_msg = f"Error processing {filepath}: {e}"
            print(f" ERROR: {e}")
            errors.append(error_msg)
        print()
    # Summary
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total mismatches found: {len(mismatches)}")
    print(f"Successfully {'would fix' if args.dry_run else 'fixed'}: {fixed_count}")
    print(f"Skipped (collisions): {skipped_count}")
    print(f"Errors: {len(errors)}")
    if errors:
        print("\nErrors:")
        for error in errors:
            print(f" - {error}")
    if args.dry_run:
        print("\nThis was a dry run. Run without --dry-run to apply changes.")
    return 0 if not errors else 1
if __name__ == '__main__':
    # Use sys.exit rather than the builtin exit(): the builtin is installed by
    # site.py as an interactive convenience and is absent under `python -S`
    # and in frozen builds; sys.exit is the supported API for exit codes.
    sys.exit(main())