#!/usr/bin/env python3
"""
Resolve XXX city codes using coordinates already in the file
(locations[].latitude/longitude).

This script handles files that already have coordinates but haven't been
geocoded yet.

Following AGENTS.md Rules:
- Rule 5: Additive only - never delete existing data
- GHCID settlement standardization: GeoNames is authoritative
"""

import math
import os
import re
import sqlite3
import sys
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List

import yaml

# GeoNames database
GEONAMES_DB = Path(__file__).parent.parent / "data/reference/geonames.db"
CUSTODIAN_DIR = Path(__file__).parent.parent / "data/custodian"

# Feature codes for proper settlements (EXCLUDE PPLX neighborhoods)
SETTLEMENT_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')

# Netherlands admin1 code mapping
NL_ADMIN1_MAP = {
    '01': 'DR', '02': 'FR', '03': 'GE', '04': 'GR', '05': 'LI', '06': 'NB',
    '07': 'NH', '09': 'UT', '10': 'ZE', '11': 'ZH', '15': 'OV', '16': 'FL',
}

# Belgian admin2 to ISO mapping
BE_ADMIN2_MAP = {
    'VAN': 'VAN', 'VBR': 'VBR', 'VLI': 'VLI', 'VOV': 'VOV', 'VWV': 'VWV',
    'WBR': 'WBR', 'WHT': 'WHT', 'WLG': 'WLG', 'WLX': 'WLX', 'WNA': 'WNA',
    'BRU': 'BRU',
}

# Placeholder used in GHCIDs and filenames for a city that is not yet resolved.
UNRESOLVED_CITY = 'XXX'
# Placeholder used for an unknown region or country.
UNKNOWN_REGION = 'XX'


def generate_city_code(name: str) -> str:
    """Generate a short (at most three letter) upper-case city code from a name.

    Rules, applied after stripping diacritics and non-letter characters:
    - single word: its first three letters ("Amsterdam" -> "AMS");
    - Dutch article + word: article initial + first two letters of the next
      word ("Den Haag" -> "DHA");
    - otherwise: the initials of the first three words.

    Hyphens are treated as part of a word but never emitted, because the
    GHCID uses '-' as its field separator ("'s-Hertogenbosch" -> "SHE").

    Returns 'XXX' when the name is empty or contains no usable letters.
    """
    if not name:
        return UNRESOLVED_CITY

    # Decompose accented characters and drop the combining marks.
    normalized = unicodedata.normalize('NFD', name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')

    # Keep letters, whitespace and hyphens; hyphens are removed per word below
    # so that hyphenated names stay a single word but yield letters only.
    clean = re.sub(r'[^a-zA-Z\s-]', '', ascii_name)
    words = [word for word in (raw.replace('-', '') for raw in clean.split()) if word]

    if not words:
        return UNRESOLVED_CITY

    # Dutch articles
    dutch_articles = {'de', 'het', 'den', "'s", 's'}

    if len(words) == 1:
        # Single word: take first 3 letters
        return words[0][:3].upper()
    if words[0].lower() in dutch_articles:
        # Article + word: D + first 2 letters of main word
        return (words[0][0] + words[1][:2]).upper()
    # Multi-word: initials
    return ''.join(word[0] for word in words[:3]).upper()


def reverse_geocode(lat: float, lon: float, country: str, conn: sqlite3.Connection) -> Optional[Dict]:
    """Reverse geocode coordinates to the nearest settlement in GeoNames.

    Only rows of the ``cities`` table matching ``country`` and one of
    SETTLEMENT_FEATURE_CODES are considered.  Distance is the squared
    equirectangular approximation: longitude differences are scaled by
    cos(latitude) so that a degree of longitude is not overweighted
    relative to a degree of latitude away from the equator.

    Returns a dict describing the closest row, or None when the country
    has no matching settlement.
    """
    placeholders = ', '.join('?' for _ in SETTLEMENT_FEATURE_CODES)
    lon_scale = math.cos(math.radians(lat))

    cursor = conn.cursor()
    # Only the '?' placeholder list is interpolated; all values are bound.
    cursor.execute(f'''
        SELECT geonames_id, name, ascii_name, admin1_code, admin2_code,
               latitude, longitude, feature_code, population
        FROM cities
        WHERE country_code = ?
          AND feature_code IN ({placeholders})
        ORDER BY ((latitude - ?) * (latitude - ?)
                  + ((longitude - ?) * ?) * ((longitude - ?) * ?))
        LIMIT 1
    ''', (country, *SETTLEMENT_FEATURE_CODES, lat, lat, lon, lon_scale, lon, lon_scale))

    row = cursor.fetchone()
    if not row:
        return None

    return {
        'geonames_id': row[0],
        'name': row[1],
        'ascii_name': row[2],
        'admin1_code': row[3],
        'admin2_code': row[4],
        'latitude': row[5],
        'longitude': row[6],
        'feature_code': row[7],
        'population': row[8],
    }


def get_region_code(country: str, admin1_code: str, admin2_code: str) -> str:
    """Get the region code for a GeoNames admin1/admin2 pair.

    NL maps admin1 through NL_ADMIN1_MAP, BE maps admin2 through
    BE_ADMIN2_MAP (falling back to admin1), every other country uses the
    admin1 code as-is.  'XX' is returned when nothing usable is available.
    """
    if country == 'NL':
        return NL_ADMIN1_MAP.get(admin1_code, UNKNOWN_REGION)
    if country == 'BE':
        return BE_ADMIN2_MAP.get(admin2_code, admin1_code if admin1_code else UNKNOWN_REGION)
    return admin1_code if admin1_code else UNKNOWN_REGION


def _default_country(data: Dict[str, Any]) -> str:
    """Return ghcid.location_resolution.country_code, or 'XX' when absent."""
    ghcid = data.get('ghcid')
    loc_res = ghcid.get('location_resolution') if isinstance(ghcid, dict) else None
    code = loc_res.get('country_code') if isinstance(loc_res, dict) else None
    return code or UNKNOWN_REGION


def _first_valid_coords(locations: Any, fallback_country: str) -> Optional[tuple]:
    """Return (lat, lon, country) for the first entry with usable coordinates."""
    if not isinstance(locations, list):
        return None

    for loc in locations:
        if not isinstance(loc, dict):
            continue
        try:
            lat = float(loc.get('latitude'))
            lon = float(loc.get('longitude'))
        except (TypeError, ValueError):
            # Missing, null or non-numeric coordinate: try the next entry.
            continue
        if not (math.isfinite(lat) and math.isfinite(lon)):
            continue
        if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
            continue
        return (lat, lon, loc.get('country') or fallback_country)

    return None


def find_coords_in_file(data: Dict) -> Optional[tuple]:
    """Find latitude/longitude in file data.

    ``original_entry.locations`` is searched first, then the top-level
    ``locations`` list.  Entries whose coordinates are missing, null,
    non-numeric or out of range are skipped.

    Returns a ``(latitude, longitude, country)`` tuple with float
    coordinates, or None when no usable entry exists.  The country comes
    from the location itself, else from
    ``ghcid.location_resolution.country_code``, else 'XX'.
    """
    if not isinstance(data, dict):
        return None

    fallback_country = _default_country(data)

    candidates = []
    original_entry = data.get('original_entry')
    if isinstance(original_entry, dict):
        candidates.append(original_entry.get('locations'))
    candidates.append(data.get('locations'))

    for locations in candidates:
        found = _first_valid_coords(locations, fallback_country)
        if found is not None:
            return found

    return None


def process_file(filepath: Path, conn: sqlite3.Connection, apply: bool) -> bool:
    """Process a single file with XXX city code and coordinates.

    Reads the YAML file, reverse geocodes its coordinates and, when
    ``apply`` is true, updates the GHCID (city code, and region code if it
    was still 'XX'), records the resolution and a history entry, rewrites
    the file and renames it to match the new GHCID.

    Every validation runs in dry-run mode as well, so a dry run reports
    exactly the files an ``apply`` run would change.

    Returns True when the file was (or, in dry-run mode, would be)
    resolved; False when it was skipped for any reason.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
        print(f"  Error reading {filepath}: {e}")
        return False

    if not data or not isinstance(data, dict):
        return False

    # Get coordinates from file
    coords = find_coords_in_file(data)
    if not coords:
        return False

    lat, lon, country = coords
    print(f"  Coords: {lat:.4f}, {lon:.4f} ({country})")

    # Reverse geocode
    city_data = reverse_geocode(lat, lon, country, conn)
    if not city_data:
        print(f"  No GeoNames match for {country}")
        return False

    # ascii_name may be NULL in the database; fall back to the display name.
    city_code = generate_city_code(city_data['ascii_name'] or city_data['name'])
    region_code = get_region_code(country, city_data['admin1_code'], city_data.get('admin2_code', ''))

    print(f"  City: {city_data['name']} ({city_code}), Region: {region_code}")

    if city_code == UNRESOLVED_CITY:
        # Nothing would change; do not record a resolution that resolved nothing.
        print(f"  Could not derive a city code from: {city_data['name']}")
        return False

    # Parse current GHCID
    ghcid = data.get('ghcid')
    if not isinstance(ghcid, dict):
        ghcid = {}
    current = ghcid.get('ghcid_current') or ''
    parts = str(current).split('-')
    if len(parts) < 5:
        print(f"  Invalid GHCID format: {current}")
        return False

    # Update city code (and region if still XX)
    old_region = parts[1]
    old_city = parts[2]

    if old_city != UNRESOLVED_CITY:
        print(f"  City already resolved: {old_city}")
        return False

    region_resolved = old_region == UNKNOWN_REGION and region_code != UNKNOWN_REGION
    if region_resolved:
        parts[1] = region_code
    parts[2] = city_code
    new_ghcid = '-'.join(parts)

    # Calculate new filename; only the first occurrence is the GHCID field.
    old_name = filepath.name
    if region_resolved:
        new_name = old_name.replace(f'{old_region}-XXX', f'{parts[1]}-{city_code}', 1)
    else:
        new_name = old_name.replace('-XXX-', f'-{city_code}-', 1)
    new_path = filepath.parent / new_name

    # Path.rename() silently replaces an existing file on POSIX, which would
    # destroy another record (Rule 5: additive only), so refuse up front.
    if new_path != filepath and new_path.exists():
        print(f"  Target already exists, skipping: {new_name}")
        return False

    if not apply:
        return True

    # Update data; one timestamp so the resolution and history entry agree.
    timestamp = datetime.now(timezone.utc).isoformat()
    ghcid['ghcid_current'] = new_ghcid

    loc_res = ghcid.get('location_resolution')
    if not isinstance(loc_res, dict):
        loc_res = {}
    loc_res['city_code'] = city_code
    loc_res['city_name'] = city_data['name']
    loc_res['geonames_id'] = city_data['geonames_id']
    loc_res['feature_code'] = city_data['feature_code']
    if region_resolved:
        loc_res['region_code'] = region_code
    loc_res['method'] = 'REVERSE_GEOCODE_FROM_FILE_COORDS'
    loc_res['resolution_timestamp'] = timestamp
    ghcid['location_resolution'] = loc_res

    # Add to history
    history = ghcid.get('ghcid_history')
    if not isinstance(history, list):
        history = []
    history.append({
        'ghcid': new_ghcid,
        'valid_from': timestamp,
        'reason': f'City resolved via reverse geocoding: XXX->{city_code} ({city_data["name"]})'
    })
    ghcid['ghcid_history'] = history

    data['ghcid'] = ghcid

    # Serialise before opening the file for writing: opening with 'w'
    # truncates it, so a dump error mid-stream would lose the record.
    serialized = yaml.dump(data, allow_unicode=True, default_flow_style=False, sort_keys=False)

    # Write and rename
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(serialized)

    if new_path != filepath:
        filepath.rename(new_path)
        print(f"  Renamed: {old_name} -> {new_name}")

    return True


def main():
    """Command-line entry point: find XXX files with coordinates and resolve them."""
    import argparse

    parser = argparse.ArgumentParser(description='Resolve XXX city codes using coordinates in files')
    parser.add_argument('--limit', type=int, default=100, help='Max files to process')
    parser.add_argument('--apply', action='store_true', help='Apply changes (default: dry run)')
    parser.add_argument('--country', help='Filter by country code')
    args = parser.parse_args()

    print("=" * 70)
    print("CITY RESOLUTION FROM FILE COORDINATES")
    print("=" * 70)
    print(f"Mode: {'APPLYING CHANGES' if args.apply else 'DRY RUN'}")
    print()

    # Connect to GeoNames
    if not GEONAMES_DB.exists():
        print(f"ERROR: GeoNames database not found: {GEONAMES_DB}")
        sys.exit(1)

    conn = sqlite3.connect(str(GEONAMES_DB))
    try:
        # Find XXX files with coordinates.  Sorted so that --limit selects the
        # same files on every run (a dry run then previews the apply run).
        xxx_files = []
        for f in sorted(CUSTODIAN_DIR.glob('*.yaml')):
            if '-XXX-' not in f.name:
                continue
            if args.country and not f.name.startswith(f'{args.country}-'):
                continue
            xxx_files.append(f)

        print(f"Found {len(xxx_files)} files with XXX codes")

        # Cheap textual pre-filter to files that mention coordinates at all.
        files_with_coords = []
        for f in xxx_files:
            try:
                with open(f, 'r', encoding='utf-8') as fp:
                    content = fp.read()
            except (OSError, UnicodeDecodeError) as e:
                print(f"  Skipping unreadable file {f.name}: {e}")
                continue
            if 'latitude:' in content and 'longitude:' in content:
                files_with_coords.append(f)

        print(f"Processing {min(len(files_with_coords), args.limit)} files with coordinates")
        print()

        resolved = 0
        renamed = 0

        for f in files_with_coords[:args.limit]:
            print(f"Processing {f.name}...")
            if process_file(f, conn, args.apply):
                resolved += 1
                # After a real rename the original path no longer exists.
                if args.apply and not f.exists():
                    renamed += 1
    finally:
        conn.close()

    print()
    print("=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Files processed: {min(len(files_with_coords), args.limit)}")
    print(f"Resolved: {resolved}")
    print(f"Renamed: {renamed}")


if __name__ == '__main__':
    main()