#!/usr/bin/env python3
"""
Resolve location data for NL-XX-XXX-PENDING files that have city names in their filename.

This script:
1. Scans PENDING files for Dutch city names in their filename
2. Looks up the city in GeoNames database
3. Updates the YAML with location data
4. Generates proper GHCID
5. Renames files to match new GHCID

Usage:
    python scripts/resolve_pending_locations.py --dry-run  # Preview changes
    python scripts/resolve_pending_locations.py            # Apply changes
"""

import argparse
import hashlib
import os
import re
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Tuple

import yaml

# GHCID namespace UUID (RFC 4122 DNS namespace)
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Netherlands admin1 code to ISO 3166-2 province code mapping
ADMIN1_TO_PROVINCE = {
    '01': 'DR',  # Drenthe
    '02': 'FR',  # Friesland
    '03': 'GE',  # Gelderland
    '04': 'GR',  # Groningen
    '05': 'LI',  # Limburg
    '06': 'NB',  # Noord-Brabant
    '07': 'NH',  # Noord-Holland
    '09': 'UT',  # Utrecht
    '10': 'ZE',  # Zeeland
    '11': 'ZH',  # Zuid-Holland
    '15': 'OV',  # Overijssel
    '16': 'FL',  # Flevoland
}

# Dutch cities to search for in filenames (lowercase for matching)
DUTCH_CITIES = [
    'amsterdam', 'rotterdam', 'den-haag', 'the-hague', 'utrecht', 'eindhoven',
    'groningen', 'tilburg', 'almere', 'breda', 'nijmegen', 'apeldoorn',
    'haarlem', 'arnhem', 'enschede', 'amersfoort', 'zaanstad', 'haarlemmermeer',
    's-hertogenbosch', 'hertogenbosch', 'den-bosch', 'zwolle', 'zoetermeer',
    'leiden', 'maastricht', 'dordrecht', 'ede', 'delft', 'alkmaar', 'venlo',
    'deventer', 'hilversum', 'heerlen', 'leeuwarden', 'lelystad', 'roosendaal',
    'middelburg', 'oss', 'helmond', 'almelo', 'gouda', 'vlissingen', 'hoorn'
]

# Map filename city patterns to GeoNames search names
CITY_FILENAME_MAP = {
    'den-haag': 'The Hague',
    'the-hague': 'The Hague',
    's-hertogenbosch': "'s-Hertogenbosch",
    'hertogenbosch': "'s-Hertogenbosch",
    'den-bosch': "'s-Hertogenbosch",
}

# Institution type mapping from institution_type field
INST_TYPE_MAP = {
    'ARCHIVE': 'A',
    'BOTANICAL_ZOO': 'B',
    'CORPORATION': 'C',
    'DIGITAL_PLATFORM': 'D',
    'EDUCATION_PROVIDER': 'E',
    'FEATURES': 'F',
    'GALLERY': 'G',
    'HOLY_SITES': 'H',
    'INTANGIBLE_HERITAGE_GROUP': 'I',
    'LIBRARY': 'L',
    'MUSEUM': 'M',
    'NGO': 'N',
    'OFFICIAL_INSTITUTION': 'O',
    'PERSONAL_COLLECTION': 'P',
    'RESEARCH_CENTER': 'R',
    'COLLECTING_SOCIETY': 'S',
    'TASTE_SMELL': 'T',
    'UNKNOWN': 'U',
    'MIXED': 'X',
}

# Valid feature codes for settlements (not neighborhoods)
VALID_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')


def extract_city_from_filename(filename: str) -> Optional[str]:
    """Extract a Dutch city name from a PENDING filename.

    Returns the GeoNames-searchable city name (e.g. 'The Hague') when a known
    city appears as a hyphen-delimited token in the filename, else None.
    """
    # Strip extension and the fixed PENDING prefix before matching.
    name = filename.replace('.yaml', '').replace('NL-XX-XXX-PENDING-', '')
    name_lower = name.lower()

    for city in DUTCH_CITIES:
        # Require the city to sit on hyphen/word boundaries so e.g. 'ede'
        # does not match inside 'deventer'.
        pattern = rf'(^|-)({re.escape(city)})(-|$)'
        if re.search(pattern, name_lower):
            # Prefer an explicit mapping to the proper GeoNames name.
            if city in CITY_FILENAME_MAP:
                return CITY_FILENAME_MAP[city]
            # Otherwise title-case the hyphenated token.
            return city.replace('-', ' ').title()

    return None


def lookup_city_geonames(db_path: str, city_name: str, country_code: str = 'NL') -> Optional[dict]:
    """Look up a city in the GeoNames SQLite database.

    Tries three name predicates in order of strictness: exact match, exact
    match on the apostrophe-normalized name (GeoNames stores e.g.
    "s-Hertogenbosch"), and finally a substring (LIKE) match.  Every attempt
    restricts to settlement feature codes and prefers the most populous hit,
    so 'Utrecht' resolves to the city rather than a hamlet.

    Returns a dict of GeoNames attributes plus the derived ISO province code,
    or None when nothing matches.
    """
    # Shared query skeleton; only the name predicate differs per attempt.
    base_query = """
        SELECT geonames_id, name, ascii_name, admin1_code, admin1_name,
               latitude, longitude, population, feature_code
        FROM cities
        WHERE country_code = ?
          AND ({name_predicate})
          AND feature_code IN (?, ?, ?, ?, ?, ?, ?, ?)
        ORDER BY population DESC
        LIMIT 1
    """
    exact_sql = base_query.format(
        name_predicate="LOWER(name) = LOWER(?) OR LOWER(ascii_name) = LOWER(?)")
    fuzzy_sql = base_query.format(
        name_predicate="LOWER(name) LIKE LOWER(?) OR LOWER(ascii_name) LIKE LOWER(?)")

    # Normalize away apostrophes for the second attempt.
    search_name = city_name.replace("'s-", "s-").replace("'", "")

    attempts = [
        (exact_sql, city_name),
        (exact_sql, search_name),
        (fuzzy_sql, f"%{city_name}%"),
    ]

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        row = None
        for sql, name in attempts:
            cursor.execute(sql, (country_code, name, name) + VALID_FEATURE_CODES)
            row = cursor.fetchone()
            if row:
                break
    finally:
        # Close even when a query raises, so the connection never leaks.
        conn.close()

    if not row:
        return None

    admin1_code = row[3] or ''
    province_code = ADMIN1_TO_PROVINCE.get(admin1_code, 'XX')

    return {
        'geonames_id': row[0],
        'name': row[1],
        'ascii_name': row[2],
        'admin1_code': admin1_code,
        'admin1_name': row[4],
        'province_code': province_code,
        'latitude': row[5],
        'longitude': row[6],
        'population': row[7],
        'feature_code': row[8],
    }


def generate_city_code(city_name: str) -> str:
    """Generate a 3-letter city code from a city name.

    Hard-coded codes take precedence; otherwise single-word names use their
    first three letters, article-prefixed names combine article initial with
    the next word, and multi-word names use word initials.
    """
    # Special mappings for names whose mechanical code would be ambiguous.
    special_codes = {
        "'s-Hertogenbosch": "SHE",
        "The Hague": "DHA",
        "'s-Gravenhage": "SGR",
    }

    if city_name in special_codes:
        return special_codes[city_name]

    # Handle Dutch articles and prefixes.
    name = city_name.replace("'", "").replace("-", " ")
    words = name.split()

    if len(words) == 1:
        return words[0][:3].upper()
    elif len(words) >= 2:
        # NOTE(review): apostrophes were stripped above, so the "'s" entry
        # can never match here — confirm whether it was meant to be 's'.
        dutch_articles = ['de', 'het', 'den', "'s"]
        if words[0].lower() in dutch_articles:
            return (words[0][0] + words[1][:2]).upper()
        else:
            initials = ''.join(w[0] for w in words[:3])
            return initials.upper()

    # Reached only for an empty/whitespace name.
    return city_name[:3].upper()


def generate_abbreviation(emic_name: str) -> str:
    """Generate an abbreviation (max 10 chars) from an emic name.

    Takes the first letter of each significant word, skipping Dutch and
    English articles/prepositions.  Falls back to the first three characters
    of the name when nothing significant remains.
    """
    # Skip words (articles, prepositions)
    skip_words = {'de', 'het', 'een', 'van', 'voor', 'in', 'op', 'te', 'den', 'der',
                  'des', "'s", 'aan', 'bij', 'met', 'naar', 'om', 'tot', 'uit',
                  'over', 'onder', 'door', 'en', 'of', 'the', 'a', 'an',
                  'and', 'or', 'for', 'to', 'at', 'by', 'with', 'from'}

    # Strip punctuation before splitting into words.
    name = re.sub(r'[^\w\s]', '', emic_name)
    words = name.split()

    # Take the first letter of each significant word.
    initials = []
    for word in words:
        if word.lower() not in skip_words and word:
            initials.append(word[0].upper())

    abbrev = ''.join(initials[:10])  # Max 10 chars
    return abbrev if abbrev else emic_name[:3].upper()


def generate_ghcid_identifiers(ghcid_string: str) -> dict:
    """Generate all GHCID identifier formats for a GHCID string.

    Returns a dict with:
      - 'ghcid_uuid':        UUID v5 (SHA-1, GHCID namespace) — primary
      - 'ghcid_uuid_sha256': UUID with v8 version/variant bits over SHA-256
      - 'ghcid_numeric':     64-bit integer from the SHA-256 digest, as str
    """
    # UUID v5 (SHA-1) - PRIMARY
    uuid_v5 = uuid.uuid5(GHCID_NAMESPACE, ghcid_string)

    # UUID v8 (SHA-256) - Secondary: stamp version/variant bits per RFC 9562.
    sha256_hash = hashlib.sha256(ghcid_string.encode()).digest()[:16]
    sha256_hash = bytearray(sha256_hash)
    sha256_hash[6] = (sha256_hash[6] & 0x0F) | 0x80  # Version 8
    sha256_hash[8] = (sha256_hash[8] & 0x3F) | 0x80  # Variant
    uuid_sha256 = uuid.UUID(bytes=bytes(sha256_hash))

    # Numeric (64-bit from SHA-256)
    full_hash = hashlib.sha256(ghcid_string.encode()).digest()
    numeric = int.from_bytes(full_hash[:8], 'big')

    return {
        'ghcid_uuid': str(uuid_v5),
        'ghcid_uuid_sha256': str(uuid_sha256),
        'ghcid_numeric': str(numeric),
    }


def update_yaml_with_location(data: dict, geonames_data: dict, new_ghcid: str,
                              old_ghcid: str, timestamp: str) -> dict:
    """Update YAML data in place with location and GHCID information.

    Mutates and returns *data*: prepends a location entry (if the city is not
    already present), rewrites the ghcid section with fresh identifiers and a
    location_resolution record, appends to ghcid_history, and adds a
    provenance note.
    """
    identifiers = generate_ghcid_identifiers(new_ghcid)

    # Ensure a locations array exists.
    if 'locations' not in data or not data['locations']:
        data['locations'] = []

    # Build the location entry for the resolved city.
    location_entry = {
        'city': geonames_data['name'],
        'region_code': geonames_data['province_code'],
        'country': 'NL',
        'geonames_id': geonames_data['geonames_id'],
        'latitude': geonames_data['latitude'],
        'longitude': geonames_data['longitude'],
    }

    # Only add if not already present; new entry goes first (primary location).
    existing_cities = [loc.get('city') for loc in data['locations']]
    if geonames_data['name'] not in existing_cities:
        data['locations'].insert(0, location_entry)

    # Update ghcid section
    if 'ghcid' not in data:
        data['ghcid'] = {}

    ghcid_section = data['ghcid']
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = identifiers['ghcid_uuid']
    ghcid_section['ghcid_uuid_sha256'] = identifiers['ghcid_uuid_sha256']
    ghcid_section['ghcid_numeric'] = int(identifiers['ghcid_numeric'])
    ghcid_section['generation_timestamp'] = timestamp

    # Record how the location was resolved.
    ghcid_section['location_resolution'] = {
        'method': 'FILENAME_CITY_EXTRACTION',
        'geonames_id': geonames_data['geonames_id'],
        'geonames_name': geonames_data['name'],
        'feature_code': geonames_data['feature_code'],
        'population': geonames_data['population'],
        'admin1_code': geonames_data['admin1_code'],
        'region_code': geonames_data['province_code'],
        'country_code': 'NL',
    }

    # Append the rename to ghcid_history.
    if 'ghcid_history' not in ghcid_section:
        ghcid_section['ghcid_history'] = []

    ghcid_section['ghcid_history'].append({
        'ghcid': new_ghcid,
        'ghcid_numeric': int(identifiers['ghcid_numeric']),
        'valid_from': timestamp,
        'valid_to': None,
        'reason': f"Location resolved from filename: {old_ghcid} -> {new_ghcid}",
    })

    # Update top-level ghcid_current
    data['ghcid_current'] = new_ghcid

    # Add provenance note (only when a provenance section already exists).
    if 'provenance' in data:
        if 'notes' not in data['provenance']:
            data['provenance']['notes'] = ''
        notes = data['provenance'].get('notes', '')
        if isinstance(notes, str):
            data['provenance']['notes'] = notes + f"\nLocation resolved from filename on {timestamp}."

    return data


def check_collision(custodian_dir: Path, new_ghcid: str, old_filepath: Path) -> bool:
    """Return True if the new GHCID would collide with a different existing file."""
    new_filepath = custodian_dir / f"{new_ghcid}.yaml"
    return new_filepath.exists() and new_filepath != old_filepath


def find_resolvable_pending_files(custodian_dir: Path, db_path: str) -> List[dict]:
    """Find PENDING files that can be resolved via filename city extraction.

    Returns a list of dicts, each carrying the file path, old and new GHCIDs,
    the extracted city, its GeoNames record, the loaded YAML data, and the
    emic name used for the abbreviation.  Files that cannot be resolved are
    skipped with a warning/error printed.
    """
    resolvable = []

    for filepath in sorted(custodian_dir.glob('NL-XX-XXX-PENDING-*.yaml')):
        filename = filepath.name

        # Try to extract city from filename
        city = extract_city_from_filename(filename)
        if not city:
            continue

        # Look up city in GeoNames
        geonames_data = lookup_city_geonames(db_path, city)
        if not geonames_data:
            print(f"WARNING: Could not find GeoNames data for '{city}' extracted from {filename}")
            continue

        # Load YAML to get institution type and emic name
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except Exception as e:
            print(f"Error loading {filepath}: {e}")
            continue

        # Guard against empty files / non-mapping YAML payloads.
        if not isinstance(data, dict):
            print(f"Error loading {filepath}: YAML payload is not a mapping")
            continue

        # Get institution type code
        inst_type_str = data.get('institution_type', 'UNKNOWN')
        inst_type_code = INST_TYPE_MAP.get(inst_type_str, 'U')

        # Get emic name for abbreviation ('or {}' also covers a null
        # custodian_name key in the YAML).
        emic_name = (data.get('custodian_name') or {}).get('emic_name', '')
        if not emic_name:
            emic_name = filename.replace('NL-XX-XXX-PENDING-', '').replace('.yaml', '').replace('-', ' ')

        # Generate abbreviation
        abbrev = generate_abbreviation(emic_name)

        # Build new GHCID: country-province-city-type-abbreviation
        city_code = generate_city_code(geonames_data['name'])
        new_ghcid = f"NL-{geonames_data['province_code']}-{city_code}-{inst_type_code}-{abbrev}"

        resolvable.append({
            'filepath': filepath,
            'old_ghcid': filename.replace('.yaml', ''),
            'new_ghcid': new_ghcid,
            'city': city,
            'geonames_data': geonames_data,
            'data': data,
            'emic_name': emic_name,
        })

    return resolvable


def main():
    """CLI entry point: resolve, update, and rename PENDING files.

    Returns 0 on success (including dry runs), 1 on missing inputs or when
    any per-file error occurred.
    """
    parser = argparse.ArgumentParser(description='Resolve location data for PENDING custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without applying')
    parser.add_argument('--custodian-dir', default='data/custodian', help='Path to custodian directory')
    parser.add_argument('--geonames-db', default='data/reference/geonames.db', help='Path to GeoNames database')
    parser.add_argument('--limit', type=int, default=0, help='Limit number of files to process (0 = no limit)')
    args = parser.parse_args()

    # Resolve paths relative to the repository root (scripts/..).
    repo_root = Path(__file__).parent.parent
    custodian_dir = repo_root / args.custodian_dir
    db_path = repo_root / args.geonames_db

    if not custodian_dir.exists():
        print(f"ERROR: Custodian directory not found: {custodian_dir}")
        return 1

    if not db_path.exists():
        print(f"ERROR: GeoNames database not found: {db_path}")
        return 1

    print("=" * 80)
    print("PENDING File Location Resolver")
    print("=" * 80)
    print(f"Custodian directory: {custodian_dir}")
    print(f"GeoNames database: {db_path}")
    print(f"Mode: {'DRY RUN (preview only)' if args.dry_run else 'LIVE (applying changes)'}")
    if args.limit:
        print(f"Limit: {args.limit} files")
    print()

    # Find resolvable files
    print("Scanning for PENDING files with city names in filename...")
    resolvable = find_resolvable_pending_files(custodian_dir, str(db_path))

    if args.limit:
        resolvable = resolvable[:args.limit]

    print(f"Found {len(resolvable)} files that can be resolved")
    print()

    if not resolvable:
        print("No resolvable files found. Exiting.")
        return 0

    # One timestamp shared by all updates in this run.
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    # Process each file
    resolved_count = 0
    skipped_count = 0
    errors = []

    for item in resolvable:
        old_ghcid = item['old_ghcid']
        new_ghcid = item['new_ghcid']
        city = item['city']
        filepath = item['filepath']
        geonames_data = item['geonames_data']
        data = item['data']
        emic_name = item['emic_name']

        print(f"{'[DRY RUN] ' if args.dry_run else ''}Processing: {old_ghcid}")
        print(f"  Emic name: {emic_name}")
        print(f"  City extracted: {city}")
        print(f"  Province: {geonames_data['province_code']} ({geonames_data.get('admin1_name', 'Unknown')})")
        print(f"  New GHCID: {new_ghcid}")

        # Check for collision
        if check_collision(custodian_dir, new_ghcid, filepath):
            print(f"  SKIPPED: Collision - {new_ghcid}.yaml already exists")
            skipped_count += 1
            print()
            continue

        # Generate identifiers for display
        identifiers = generate_ghcid_identifiers(new_ghcid)
        print(f"  UUID v5: {identifiers['ghcid_uuid']}")

        if args.dry_run:
            print(f"  Would rename: {filepath.name} -> {new_ghcid}.yaml")
            print()
            resolved_count += 1
            continue

        try:
            # Update YAML data
            updated_data = update_yaml_with_location(
                data, geonames_data, new_ghcid, old_ghcid, timestamp
            )

            # Write updated YAML to new file
            new_filepath = filepath.parent / f"{new_ghcid}.yaml"

            with open(new_filepath, 'w', encoding='utf-8') as f:
                yaml.dump(updated_data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

            # Remove old file only after the new one was written successfully.
            filepath.unlink()
            print(f"  Renamed: {filepath.name} -> {new_filepath.name}")

            resolved_count += 1

        except Exception as e:
            error_msg = f"Error processing {filepath}: {e}"
            print(f"  ERROR: {e}")
            errors.append(error_msg)

        print()

    # Summary
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total resolvable files: {len(resolvable)}")
    print(f"Successfully {'would resolve' if args.dry_run else 'resolved'}: {resolved_count}")
    print(f"Skipped (collisions): {skipped_count}")
    print(f"Errors: {len(errors)}")

    if errors:
        print("\nErrors:")
        for error in errors:
            print(f"  - {error}")

    if args.dry_run:
        print("\nThis was a dry run. Run without --dry-run to apply changes.")

    return 0 if not errors else 1


if __name__ == '__main__':
    exit(main())