glam/scripts/resolve_pending_locations.py
kempersc e313744cf6 feat(scripts): add resolve_pending_locations.py for GHCID resolution
Script to resolve NL-XX-XXX-PENDING files that have city names in filename:
- Looks up city in GeoNames database
- Updates YAML with location data (city, region, country)
- Generates proper GHCID with UUID v5/v8
- Renames files to match new GHCID
- Archives original PENDING files for reference
2026-01-09 12:18:46 +01:00

526 lines
18 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Resolve location data for NL-XX-XXX-PENDING files that have city names in their filename.
This script:
1. Scans PENDING files for Dutch city names in their filename
2. Looks up the city in GeoNames database
3. Updates the YAML with location data
4. Generates proper GHCID
5. Renames files to match new GHCID
Usage:
python scripts/resolve_pending_locations.py --dry-run # Preview changes
python scripts/resolve_pending_locations.py # Apply changes
"""
import argparse
import hashlib
import os
import re
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Tuple
import yaml
# GHCID namespace UUID used for uuid5/uuid8 derivation.
# NOTE(review): this value is identical to uuid.NAMESPACE_DNS (the RFC 4122
# DNS namespace); kept as a named constant so the GHCID scheme is explicit.
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
# Netherlands GeoNames admin1 code -> ISO 3166-2 province code.
# NOTE(review): codes '08' and '12'-'14' are absent — presumably unused by
# GeoNames for NL (legacy codes); confirm against the GeoNames database.
# Unmapped codes fall back to 'XX' in lookup_city_geonames().
ADMIN1_TO_PROVINCE = {
    '01': 'DR',  # Drenthe
    '02': 'FR',  # Friesland
    '03': 'GE',  # Gelderland
    '04': 'GR',  # Groningen
    '05': 'LI',  # Limburg
    '06': 'NB',  # Noord-Brabant
    '07': 'NH',  # Noord-Holland
    '09': 'UT',  # Utrecht
    '10': 'ZE',  # Zeeland
    '11': 'ZH',  # Zuid-Holland
    '15': 'OV',  # Overijssel
    '16': 'FL',  # Flevoland
}
# Dutch cities to search for in filenames (lowercase for matching)
DUTCH_CITIES = [
    'amsterdam', 'rotterdam', 'den-haag', 'the-hague', 'utrecht', 'eindhoven',
    'groningen', 'tilburg', 'almere', 'breda', 'nijmegen', 'apeldoorn',
    'haarlem', 'arnhem', 'enschede', 'amersfoort', 'zaanstad', 'haarlemmermeer',
    's-hertogenbosch', 'hertogenbosch', 'den-bosch', 'zwolle', 'zoetermeer',
    'leiden', 'maastricht', 'dordrecht', 'ede', 'delft', 'alkmaar', 'venlo',
    'deventer', 'hilversum', 'heerlen', 'leeuwarden', 'lelystad', 'roosendaal',
    'middelburg', 'oss', 'helmond', 'almelo', 'gouda', 'vlissingen', 'hoorn'
]
# Map filename city patterns to GeoNames search names
CITY_FILENAME_MAP = {
    'den-haag': 'The Hague',
    'the-hague': 'The Hague',
    's-hertogenbosch': "'s-Hertogenbosch",
    'hertogenbosch': "'s-Hertogenbosch",
    'den-bosch': "'s-Hertogenbosch",
}
# Institution type mapping from institution_type field
INST_TYPE_MAP = {
    'ARCHIVE': 'A',
    'BOTANICAL_ZOO': 'B',
    'CORPORATION': 'C',
    'DIGITAL_PLATFORM': 'D',
    'EDUCATION_PROVIDER': 'E',
    'FEATURES': 'F',
    'GALLERY': 'G',
    'HOLY_SITES': 'H',
    'INTANGIBLE_HERITAGE_GROUP': 'I',
    'LIBRARY': 'L',
    'MUSEUM': 'M',
    'NGO': 'N',
    'OFFICIAL_INSTITUTION': 'O',
    'PERSONAL_COLLECTION': 'P',
    'RESEARCH_CENTER': 'R',
    'COLLECTING_SOCIETY': 'S',
    'TASTE_SMELL': 'T',
    'UNKNOWN': 'U',
    'MIXED': 'X',
}
# Valid feature codes for settlements (not neighborhoods)
VALID_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')


def extract_city_from_filename(filename: str) -> Optional[str]:
    """Extract a known Dutch city name from a PENDING filename.

    Args:
        filename: File name such as ``NL-XX-XXX-PENDING-museum-amsterdam.yaml``.

    Returns:
        The GeoNames search name of the first matching city (e.g. 'The Hague'
        for 'den-haag'), or None when no known city appears in the filename.
    """
    # Strip the extension and the PENDING prefix only where they belong
    # (end / start of the name).  The previous str.replace-based stripping
    # would also remove accidental occurrences in the middle of the name.
    name = re.sub(r'\.yaml$', '', filename)
    name = re.sub(r'^NL-XX-XXX-PENDING-', '', name)
    name_lower = name.lower()
    for city in DUTCH_CITIES:
        # Require '-' or string edges around the city so that e.g. 'ede'
        # does not match inside an unrelated word.
        pattern = rf'(^|-)({re.escape(city)})(-|$)'
        if re.search(pattern, name_lower):
            # Some filename spellings map to a different GeoNames name.
            if city in CITY_FILENAME_MAP:
                return CITY_FILENAME_MAP[city]
            # Otherwise title-case the hyphenated form.
            return city.replace('-', ' ').title()
    return None
def lookup_city_geonames(db_path: str, city_name: str, country_code: str = 'NL') -> Optional[dict]:
    """Look up a city in the GeoNames SQLite database.

    Tries an exact name match first, then the same match with Dutch
    apostrophe prefixes normalized ("'s-Hertogenbosch" -> "s-Hertogenbosch"),
    and finally a substring (LIKE) match.  Each attempt prefers the most
    populous settlement among the allowed feature codes.

    Args:
        db_path: Path to the GeoNames SQLite database (table ``cities``).
        city_name: City name as extracted from the filename.
        country_code: ISO country-code filter (default 'NL').

    Returns:
        Dict with geonames id, names, admin1/province codes, coordinates,
        population and feature code — or None when nothing matched.
    """
    # Build the IN(...) clause from the constant so the SQL stays in sync
    # with VALID_FEATURE_CODES (previously hard-coded to 8 placeholders).
    placeholders = ', '.join('?' for _ in VALID_FEATURE_CODES)
    base_query = f"""
        SELECT geonames_id, name, ascii_name, admin1_code, admin1_name,
               latitude, longitude, population, feature_code
        FROM cities
        WHERE country_code = ?
          AND (LOWER(name) {{op}} LOWER(?) OR LOWER(ascii_name) {{op}} LOWER(?))
          AND feature_code IN ({placeholders})
        ORDER BY population DESC
        LIMIT 1
    """
    # Normalized fallback search term (apostrophes removed).
    search_name = city_name.replace("'s-", "s-").replace("'", "")
    attempts = [
        ('=', city_name),             # exact match
        ('=', search_name),           # normalized name
        ('LIKE', f"%{city_name}%"),   # fuzzy substring match
    ]
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        row = None
        for op, term in attempts:
            cursor.execute(base_query.format(op=op),
                           (country_code, term, term) + VALID_FEATURE_CODES)
            row = cursor.fetchone()
            if row:
                break
    finally:
        # Close the connection even when a query raises (the original
        # leaked the connection on any sqlite3 error).
        conn.close()
    if not row:
        return None
    admin1_code = row[3] or ''
    return {
        'geonames_id': row[0],
        'name': row[1],
        'ascii_name': row[2],
        'admin1_code': admin1_code,
        'admin1_name': row[4],
        # Unmapped admin1 codes fall back to the 'XX' placeholder province.
        'province_code': ADMIN1_TO_PROVINCE.get(admin1_code, 'XX'),
        'latitude': row[5],
        'longitude': row[6],
        'population': row[7],
        'feature_code': row[8],
    }
def generate_city_code(city_name: str) -> str:
    """Derive the 3-letter GHCID city code from a city name."""
    # Cities whose codes are fixed by convention rather than derived.
    overrides = {
        "'s-Hertogenbosch": "SHE",
        "The Hague": "DHA",
        "'s-Gravenhage": "SGR",
    }
    fixed = overrides.get(city_name)
    if fixed is not None:
        return fixed
    # Normalize: drop apostrophes and treat hyphens as word separators.
    cleaned = city_name.replace("'", "").replace("-", " ")
    parts = cleaned.split()
    if len(parts) == 1:
        return parts[0][:3].upper()
    if len(parts) >= 2:
        # Dutch articles get special treatment ("Den Helder" -> "DHE").
        articles = {'de', 'het', 'den', "'s"}
        if parts[0].lower() in articles:
            return (parts[0][0] + parts[1][:2]).upper()
        # Otherwise use the initials of up to the first three words.
        return ''.join(word[0] for word in parts[:3]).upper()
    # Name reduced to nothing after cleaning: fall back to the raw name.
    return city_name[:3].upper()
def generate_abbreviation(emic_name: str) -> str:
    """Build an initials-based abbreviation (max 10 chars) from an emic name."""
    # Dutch and English articles/prepositions that carry no meaning.
    skip_words = {'de', 'het', 'een', 'van', 'voor', 'in', 'op', 'te', 'den', 'der',
                  'des', "'s", 'aan', 'bij', 'met', 'naar', 'om', 'tot', 'uit',
                  'over', 'onder', 'door', 'en', 'of', 'the', 'a', 'an',
                  'and', 'or', 'for', 'to', 'at', 'by', 'with', 'from'}
    # Strip punctuation, keeping word characters and whitespace only.
    cleaned = re.sub(r'[^\w\s]', '', emic_name)
    # First letter of every significant word, capped at 10 characters.
    initials = [word[0].upper()
                for word in cleaned.split()
                if word.lower() not in skip_words]
    abbrev = ''.join(initials[:10])
    # If nothing significant remains, fall back to the raw name's prefix.
    return abbrev or emic_name[:3].upper()
def generate_ghcid_identifiers(ghcid_string: str) -> dict:
    """Generate all GHCID identifier formats for a GHCID string.

    Args:
        ghcid_string: Canonical GHCID, e.g. 'NL-NH-AMS-M-RIJ'.

    Returns:
        Dict of strings: 'ghcid_uuid' (UUID v5, primary),
        'ghcid_uuid_sha256' (UUID v8 derived from SHA-256) and
        'ghcid_numeric' (decimal value of the first 8 SHA-256 bytes).
    """
    # GHCID_NAMESPACE is defined as the RFC 4122 DNS namespace, which the
    # uuid module ships as NAMESPACE_DNS — use it directly (same value).
    uuid_v5 = uuid.uuid5(uuid.NAMESPACE_DNS, ghcid_string)
    # Compute the SHA-256 digest once and reuse it for both the UUID v8
    # and the 64-bit numeric id (the original hashed the string twice).
    digest = hashlib.sha256(ghcid_string.encode()).digest()
    v8_bytes = bytearray(digest[:16])
    v8_bytes[6] = (v8_bytes[6] & 0x0F) | 0x80  # set version nibble to 8
    v8_bytes[8] = (v8_bytes[8] & 0x3F) | 0x80  # set RFC 4122 variant bits
    uuid_sha256 = uuid.UUID(bytes=bytes(v8_bytes))
    # Numeric form: big-endian integer of the first 8 digest bytes.
    numeric = int.from_bytes(digest[:8], 'big')
    return {
        'ghcid_uuid': str(uuid_v5),
        'ghcid_uuid_sha256': str(uuid_sha256),
        'ghcid_numeric': str(numeric),
    }
def update_yaml_with_location(data: dict, geonames_data: dict, new_ghcid: str,
                              old_ghcid: str, timestamp: str) -> dict:
    """Update a custodian YAML document with resolved location and GHCID data.

    Mutates *data* in place: prepends a location entry, rewrites the 'ghcid'
    section (current id, UUID forms, resolution metadata, history) and
    appends an audit note to provenance.

    Args:
        data: Parsed custodian YAML document (mutated in place).
        geonames_data: Lookup result from lookup_city_geonames().
        new_ghcid: Newly generated GHCID string.
        old_ghcid: Previous PENDING GHCID, recorded in the history reason.
        timestamp: ISO-8601 UTC timestamp stamped on all updated fields.

    Returns:
        The same (mutated) *data* dict, for chaining.
    """
    identifiers = generate_ghcid_identifiers(new_ghcid)
    # Ensure a locations list exists; also replaces a YAML null/empty value.
    if 'locations' not in data or not data['locations']:
        data['locations'] = []
    # Location entry derived from the GeoNames lookup.
    location_entry = {
        'city': geonames_data['name'],
        'region_code': geonames_data['province_code'],
        'country': 'NL',
        'geonames_id': geonames_data['geonames_id'],
        'latitude': geonames_data['latitude'],
        'longitude': geonames_data['longitude'],
    }
    # Insert at the front only when this city is not already listed.
    existing_cities = [loc.get('city') for loc in data['locations']]
    if geonames_data['name'] not in existing_cities:
        data['locations'].insert(0, location_entry)
    # Rewrite the ghcid section with the new identifiers.
    if 'ghcid' not in data:
        data['ghcid'] = {}
    ghcid_section = data['ghcid']
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = identifiers['ghcid_uuid']
    ghcid_section['ghcid_uuid_sha256'] = identifiers['ghcid_uuid_sha256']
    # Numeric id is stored as an int even though the generator returns str.
    ghcid_section['ghcid_numeric'] = int(identifiers['ghcid_numeric'])
    ghcid_section['generation_timestamp'] = timestamp
    # Record how the location was resolved (filename extraction).
    ghcid_section['location_resolution'] = {
        'method': 'FILENAME_CITY_EXTRACTION',
        'geonames_id': geonames_data['geonames_id'],
        'geonames_name': geonames_data['name'],
        'feature_code': geonames_data['feature_code'],
        'population': geonames_data['population'],
        'admin1_code': geonames_data['admin1_code'],
        'region_code': geonames_data['province_code'],
        'country_code': 'NL',
    }
    # Append a new open-ended history entry.
    # NOTE(review): earlier history entries keep valid_to=None; if the schema
    # expects the previous entry to be closed with this timestamp, that is
    # not done here — confirm against the GHCID history convention.
    if 'ghcid_history' not in ghcid_section:
        ghcid_section['ghcid_history'] = []
    ghcid_section['ghcid_history'].append({
        'ghcid': new_ghcid,
        'ghcid_numeric': int(identifiers['ghcid_numeric']),
        'valid_from': timestamp,
        'valid_to': None,
        'reason': f"Location resolved from filename: {old_ghcid} -> {new_ghcid}",
    })
    # Mirror the new GHCID at the top level of the document.
    data['ghcid_current'] = new_ghcid
    # Append an audit note to provenance.notes when notes is a string.
    # NOTE(review): a YAML null notes value passes the 'in' check but fails
    # isinstance(str), so no note is appended in that case — confirm intent.
    if 'provenance' in data:
        if 'notes' not in data['provenance']:
            data['provenance']['notes'] = ''
        notes = data['provenance'].get('notes', '')
        if isinstance(notes, str):
            data['provenance']['notes'] = notes + f"\nLocation resolved from filename on {timestamp}."
    return data
def check_collision(custodian_dir: Path, new_ghcid: str, old_filepath: Path) -> bool:
    """Return True when renaming to the new GHCID would overwrite another file."""
    candidate = custodian_dir / f"{new_ghcid}.yaml"
    # The file's own current path is never a collision with itself.
    if candidate == old_filepath:
        return False
    return candidate.exists()
def find_resolvable_pending_files(custodian_dir: Path, db_path: str) -> List[dict]:
    """Scan *custodian_dir* for PENDING files whose filename names a known city.

    Args:
        custodian_dir: Directory containing custodian YAML files.
        db_path: Path to the GeoNames SQLite database.

    Returns:
        List of dicts per resolvable file: filepath, old/new GHCID, extracted
        city, GeoNames data, loaded YAML document and emic name.
    """
    resolvable = []
    for filepath in sorted(custodian_dir.glob('NL-XX-XXX-PENDING-*.yaml')):
        filename = filepath.name
        # Try to extract a city from the filename; skip files without one.
        city = extract_city_from_filename(filename)
        if not city:
            continue
        # Look up the city in GeoNames.
        geonames_data = lookup_city_geonames(db_path, city)
        if not geonames_data:
            # Bug fix: the original printed the literal "(unknown)" instead
            # of the filename the city was extracted from.
            print(f"WARNING: Could not find GeoNames data for '{city}' extracted from {filename}")
            continue
        # Load YAML to get institution type and emic name.
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except Exception as e:
            print(f"Error loading {filepath}: {e}")
            continue
        # An empty or scalar YAML document cannot be processed further.
        if not isinstance(data, dict):
            print(f"Error loading {filepath}: document is not a mapping")
            continue
        # Single-letter institution type code ('U' for unknown types).
        inst_type_str = data.get('institution_type', 'UNKNOWN')
        inst_type_code = INST_TYPE_MAP.get(inst_type_str, 'U')
        # Prefer the emic name; 'or {}' also guards a YAML null value.
        emic_name = (data.get('custodian_name') or {}).get('emic_name', '')
        if not emic_name:
            # Fall back to a name derived from the filename itself.
            emic_name = filename.replace('NL-XX-XXX-PENDING-', '').replace('.yaml', '').replace('-', ' ')
        abbrev = generate_abbreviation(emic_name)
        # Build the new GHCID: country-province-city-type-abbreviation.
        city_code = generate_city_code(geonames_data['name'])
        new_ghcid = f"NL-{geonames_data['province_code']}-{city_code}-{inst_type_code}-{abbrev}"
        resolvable.append({
            'filepath': filepath,
            'old_ghcid': filename.replace('.yaml', ''),
            'new_ghcid': new_ghcid,
            'city': city,
            'geonames_data': geonames_data,
            'data': data,
            'emic_name': emic_name,
        })
    return resolvable
def main():
    """CLI entry point: resolve PENDING custodian files; return exit status."""
    parser = argparse.ArgumentParser(description='Resolve location data for PENDING custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without applying')
    parser.add_argument('--custodian-dir', default='data/custodian', help='Path to custodian directory')
    parser.add_argument('--geonames-db', default='data/reference/geonames.db', help='Path to GeoNames database')
    parser.add_argument('--limit', type=int, default=0, help='Limit number of files to process (0 = no limit)')
    args = parser.parse_args()
    # Resolve paths relative to the repository root (parent of scripts/).
    script_dir = Path(__file__).parent.parent
    custodian_dir = script_dir / args.custodian_dir
    db_path = script_dir / args.geonames_db
    if not custodian_dir.exists():
        print(f"ERROR: Custodian directory not found: {custodian_dir}")
        return 1
    if not db_path.exists():
        print(f"ERROR: GeoNames database not found: {db_path}")
        return 1
    # Banner with the effective configuration.
    print("=" * 80)
    print("PENDING File Location Resolver")
    print("=" * 80)
    print(f"Custodian directory: {custodian_dir}")
    print(f"GeoNames database: {db_path}")
    print(f"Mode: {'DRY RUN (preview only)' if args.dry_run else 'LIVE (applying changes)'}")
    if args.limit:
        print(f"Limit: {args.limit} files")
    print()
    # Find files whose filename contains a recognizable city name.
    print("Scanning for PENDING files with city names in filename...")
    resolvable = find_resolvable_pending_files(custodian_dir, str(db_path))
    if args.limit:
        resolvable = resolvable[:args.limit]
    print(f"Found {len(resolvable)} files that can be resolved")
    print()
    if not resolvable:
        print("No resolvable files found. Exiting.")
        return 0
    # One shared UTC timestamp so every file updated in this run matches.
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    # Process each file, tracking outcomes for the summary.
    resolved_count = 0
    skipped_count = 0
    errors = []
    for item in resolvable:
        old_ghcid = item['old_ghcid']
        new_ghcid = item['new_ghcid']
        city = item['city']
        filepath = item['filepath']
        geonames_data = item['geonames_data']
        data = item['data']
        emic_name = item['emic_name']
        print(f"{'[DRY RUN] ' if args.dry_run else ''}Processing: {old_ghcid}")
        print(f" Emic name: {emic_name}")
        print(f" City extracted: {city}")
        print(f" Province: {geonames_data['province_code']} ({geonames_data.get('admin1_name', 'Unknown')})")
        print(f" New GHCID: {new_ghcid}")
        # Skip when the target GHCID filename already exists.
        if check_collision(custodian_dir, new_ghcid, filepath):
            print(f" SKIPPED: Collision - {new_ghcid}.yaml already exists")
            skipped_count += 1
            print()
            continue
        # Show the derived UUID even in dry-run mode.
        identifiers = generate_ghcid_identifiers(new_ghcid)
        print(f" UUID v5: {identifiers['ghcid_uuid']}")
        if args.dry_run:
            print(f" Would rename: {filepath.name} -> {new_ghcid}.yaml")
            print()
            resolved_count += 1
            continue
        try:
            # Update the YAML document in memory.
            updated_data = update_yaml_with_location(
                data, geonames_data, new_ghcid, old_ghcid, timestamp
            )
            # Write to the new GHCID-named file, then remove the old one
            # (write-then-unlink, so a failed write keeps the original).
            new_filepath = filepath.parent / f"{new_ghcid}.yaml"
            with open(new_filepath, 'w', encoding='utf-8') as f:
                yaml.dump(updated_data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
            filepath.unlink()
            print(f" Renamed: {filepath.name} -> {new_filepath.name}")
            resolved_count += 1
        except Exception as e:
            error_msg = f"Error processing {filepath}: {e}"
            print(f" ERROR: {e}")
            errors.append(error_msg)
        print()
    # Final summary; exit non-zero when any file failed.
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total resolvable files: {len(resolvable)}")
    print(f"Successfully {'would resolve' if args.dry_run else 'resolved'}: {resolved_count}")
    print(f"Skipped (collisions): {skipped_count}")
    print(f"Errors: {len(errors)}")
    if errors:
        print("\nErrors:")
        for error in errors:
            print(f" - {error}")
    if args.dry_run:
        print("\nThis was a dry run. Run without --dry-run to apply changes.")
    return 0 if not errors else 1
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status.
    # SystemExit is preferred over the site-module exit() helper, which is
    # intended for interactive use and may be absent (e.g. under python -S).
    raise SystemExit(main())