glam/scripts/resolve_cities_from_file_coords.py
kempersc e45c1a3c85 feat(scripts): add city enrichment and location resolution utilities
Enrichment scripts for country-specific city data:
- enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py
- enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py
- enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py

Location resolution utilities:
- resolve_cities_from_file_coords.py - Resolve cities using coordinates in filenames
- resolve_cities_wikidata.py - Use Wikidata P131 for city resolution
- resolve_country_codes.py - Standardize country codes
- resolve_cz_xx_regions.py - Fix Czech XX region codes
- resolve_locations_by_name.py - Name-based location lookup
- resolve_regions_from_city.py - Derive regions from city data
- update_ghcid_with_geonames.py - Update GHCIDs with GeoNames data

CH-Annotator integration:
- create_custodian_from_ch_annotator.py - Create custodians from annotations
- add_ch_annotator_location_claims.py - Add location claims
- extract_locations_ch_annotator.py - Extract locations from annotations

Migration and fixes:
- migrate_egyptian_from_ch.py - Migrate Egyptian data
- migrate_web_archives.py - Migrate web archive data
- fix_belgian_cities.py - Fix Belgian city data
2025-12-07 14:26:59 +01:00

301 lines
9.5 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Resolve XXX city codes using coordinates already in the file (locations[].latitude/longitude).
This script handles files that already have coordinates but haven't been geocoded yet.
Following AGENTS.md Rules:
- Rule 5: Additive only - never delete existing data
- GHCID settlement standardization: GeoNames is authoritative
"""
import os
import sys
import yaml
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List
# Filesystem layout: both locations are resolved relative to the directory
# one level above the directory that contains this script.
_BASE_DIR = Path(__file__).parent.parent
GEONAMES_DB = _BASE_DIR / "data" / "reference" / "geonames.db"
CUSTODIAN_DIR = _BASE_DIR / "data" / "custodian"

# GeoNames feature codes accepted as proper settlements. PPLX (neighborhoods)
# is deliberately left out. Kept as a tuple on purpose.
SETTLEMENT_FEATURE_CODES = (
    'PPL',
    'PPLA',
    'PPLA2',
    'PPLA3',
    'PPLA4',
    'PPLC',
    'PPLS',
    'PPLG',
)

# Netherlands: GeoNames admin1 code -> two-letter region code.
NL_ADMIN1_MAP = {
    '01': 'DR',
    '02': 'FR',
    '03': 'GE',
    '04': 'GR',
    '05': 'LI',
    '06': 'NB',
    '07': 'NH',
    '09': 'UT',
    '10': 'ZE',
    '11': 'ZH',
    '15': 'OV',
    '16': 'FL',
}

# Belgium: admin2 code -> ISO region code. Every known code maps to itself,
# so the table effectively acts as a whitelist of accepted admin2 codes.
BE_ADMIN2_MAP = {
    code: code
    for code in (
        'VAN', 'VBR', 'VLI', 'VOV', 'VWV',
        'WBR', 'WHT', 'WLG', 'WLX', 'WNA', 'BRU',
    )
}
def generate_city_code(name: str) -> str:
    """Generate a short, upper-case, hyphen-free city code from a city name.

    Rules, applied to the name after accents and non-letter characters
    have been stripped:

    * one word                  -> its first three letters ("Amsterdam" -> "AMS")
    * Dutch article + word(s)   -> first letter of the article plus the first
                                   two letters of the next word ("Den Haag" -> "DHA")
    * any other multi-word name -> initials of the first three words
                                   ("New York City" -> "NYC")

    Args:
        name: City name, may contain accents, apostrophes and hyphens.

    Returns:
        A code of at most three ASCII capital letters, or 'XXX' when the
        name contains no usable letters.
    """
    import re
    import unicodedata

    # Decompose accented characters and drop the combining marks (category Mn)
    normalized = unicodedata.normalize('NFD', name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
    # Keep only ASCII letters, whitespace and hyphens
    clean = re.sub(r'[^a-zA-Z\s-]', '', ascii_name)
    # Hyphens must never reach the code: the caller splices the code into a
    # '-'-delimited GHCID, so "Al-Hoceima" -> "AL-" would corrupt it. Remove
    # them inside words and discard tokens that consisted only of hyphens.
    words = [w for w in (token.replace('-', '') for token in clean.split()) if w]
    if not words:
        return 'XXX'

    # Dutch articles. Apostrophes were stripped above, so "'s" arrives as "s".
    dutch_articles = {'de', 'het', 'den', 's'}

    if len(words) == 1:
        # Single word: take first 3 letters
        return words[0][:3].upper()
    if words[0].lower() in dutch_articles:
        # Article + word: article initial + first 2 letters of the main word
        return (words[0][0] + words[1][:2]).upper()
    # Multi-word: initials of the first three words
    return ''.join(w[0] for w in words[:3]).upper()
def reverse_geocode(lat: float, lon: float, country: str, conn: sqlite3.Connection) -> Optional[Dict]:
    """Reverse geocode coordinates to the nearest settlement in GeoNames.

    Only rows of the ``cities`` table whose ``country_code`` equals
    *country* and whose ``feature_code`` is listed in
    SETTLEMENT_FEATURE_CODES are considered.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        country: Country code to restrict the search to.
        conn: Open connection to the GeoNames SQLite database.

    Returns:
        A dict with the keys geonames_id, name, ascii_name, admin1_code,
        admin2_code, latitude, longitude, feature_code and population for
        the closest settlement, or None when no row matches.
    """
    import math

    lat = float(lat)
    lon = float(lon)
    # A degree of longitude shrinks with cos(latitude). Without this weight
    # the ranking overestimates east-west distances away from the equator
    # and can pick a settlement that is actually farther away.
    lon_weight = math.cos(math.radians(lat)) ** 2

    # Bind the feature codes as parameters instead of interpolating the
    # tuple's repr, which would produce invalid SQL for a 1-tuple.
    placeholders = ', '.join('?' for _ in SETTLEMENT_FEATURE_CODES)
    query = f'''
        SELECT geonames_id, name, ascii_name, admin1_code, admin2_code,
               latitude, longitude, feature_code, population
        FROM cities
        WHERE country_code = ?
          AND feature_code IN ({placeholders})
        ORDER BY ((latitude - ?) * (latitude - ?)
                  + (longitude - ?) * (longitude - ?) * ?)
        LIMIT 1
    '''
    params = (country, *SETTLEMENT_FEATURE_CODES, lat, lat, lon, lon, lon_weight)
    row = conn.execute(query, params).fetchone()
    if row is None:
        return None

    columns = (
        'geonames_id', 'name', 'ascii_name', 'admin1_code', 'admin2_code',
        'latitude', 'longitude', 'feature_code', 'population',
    )
    return dict(zip(columns, row))
def get_region_code(country: str, admin1_code: str, admin2_code: str) -> str:
    """Translate GeoNames admin codes into a region code for *country*.

    * NL: admin1_code is looked up in NL_ADMIN1_MAP; unknown codes give 'XX'.
    * BE: admin2_code is looked up in BE_ADMIN2_MAP; unknown codes fall back
      to admin1_code, or 'XX' when that is empty.
    * any other country: admin1_code as-is, or 'XX' when it is empty.
    """
    # Shared default for every branch except the Dutch lookup
    admin1_or_unknown = admin1_code or 'XX'

    if country == 'NL':
        return NL_ADMIN1_MAP.get(admin1_code, 'XX')
    if country == 'BE':
        return BE_ADMIN2_MAP.get(admin2_code, admin1_or_unknown)
    return admin1_or_unknown
def find_coords_in_file(data: Dict) -> Optional[tuple]:
    """Find the first usable latitude/longitude pair in parsed file data.

    ``original_entry.locations`` is searched first, then the top-level
    ``locations`` list. Entries whose latitude or longitude is missing,
    null or not convertible to a number are skipped.

    Args:
        data: Parsed YAML content of a custodian file.

    Returns:
        ``(latitude, longitude, country)`` with both coordinates as floats,
        or None when no usable pair exists. *country* is the location's own
        ``country`` value, falling back to
        ``ghcid.location_resolution.country_code`` and finally to 'XX'.
    """
    def _as_float(value):
        # YAML may deliver null, strings or other junk instead of numbers
        if value is None or isinstance(value, bool):
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    if not isinstance(data, dict):
        return None

    # Country fallback shared by both location lists. Every level may be
    # absent or explicitly null in the YAML.
    ghcid = data.get('ghcid')
    loc_res = ghcid.get('location_resolution') if isinstance(ghcid, dict) else None
    fallback_country = (loc_res.get('country_code') if isinstance(loc_res, dict) else None) or 'XX'

    candidate_lists = []
    original_entry = data.get('original_entry')
    if isinstance(original_entry, dict):
        # Check original_entry.locations first
        candidate_lists.append(original_entry.get('locations'))
    # Then the top-level locations
    candidate_lists.append(data.get('locations'))

    for locations in candidate_lists:
        if not isinstance(locations, list):
            continue
        for loc in locations:
            if not isinstance(loc, dict):
                continue
            lat = _as_float(loc.get('latitude'))
            lon = _as_float(loc.get('longitude'))
            if lat is None or lon is None:
                continue
            return (lat, lon, loc.get('country') or fallback_country)
    return None
def process_file(filepath: Path, conn: sqlite3.Connection, apply: bool) -> bool:
    """Resolve the XXX city code of one custodian file from its own coordinates.

    The file is parsed, its coordinates are reverse geocoded against
    GeoNames, and the GHCID (``ghcid.ghcid_current``) is rebuilt with the
    derived city code, plus the region code when the region was still 'XX'.
    All validation runs in dry-run mode too, so a dry run reports the
    same outcome an apply run would have.

    Args:
        filepath: Custodian YAML file to process.
        conn: Open connection to the GeoNames SQLite database.
        apply: When False, only report what would be done.

    Returns:
        True when the city was resolved (and, with *apply*, the file was
        rewritten and renamed if its name changed). False when the file is
        unreadable, has no usable coordinates, has no GeoNames match, has an
        unexpected GHCID, is already resolved, or the rename target exists.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except (OSError, ValueError, yaml.YAMLError) as e:
        # ValueError covers UnicodeDecodeError for files that are not UTF-8
        print(f" Error reading {filepath}: {e}")
        return False
    if not data or not isinstance(data, dict):
        return False

    # Get coordinates from file
    coords = find_coords_in_file(data)
    if not coords:
        return False
    lat, lon, country = coords
    try:
        lat, lon = float(lat), float(lon)
    except (TypeError, ValueError):
        print(f" Invalid coordinates: {lat!r}, {lon!r}")
        return False
    print(f" Coords: {lat:.4f}, {lon:.4f} ({country})")

    # Reverse geocode
    city_data = reverse_geocode(lat, lon, country, conn)
    if not city_data:
        print(f" No GeoNames match for {country}")
        return False
    city_code = generate_city_code(city_data['ascii_name'] or city_data['name'] or '')
    region_code = get_region_code(country, city_data['admin1_code'], city_data.get('admin2_code', ''))
    print(f" City: {city_data['name']} ({city_code}), Region: {region_code}")
    if city_code == 'XXX':
        # 'XXX' is the "unresolved" marker; writing it back resolves nothing
        print(" Could not derive a city code from the GeoNames name")
        return False

    # Parse current GHCID; parts[1] is the region and parts[2] the city
    ghcid = data.get('ghcid')
    if not isinstance(ghcid, dict):
        ghcid = {}
    current = str(ghcid.get('ghcid_current') or '')
    parts = current.split('-')
    if len(parts) < 5:
        print(f" Invalid GHCID format: {current}")
        return False
    old_region = parts[1]
    old_city = parts[2]
    if old_city != 'XXX':
        print(f" City already resolved: {old_city}")
        return False

    # Update city code (and region if still XX)
    region_updated = old_region == 'XX' and region_code != 'XX'
    if region_updated:
        parts[1] = region_code
    parts[2] = city_code
    new_ghcid = '-'.join(parts)

    # Calculate new filename. Anchor on the surrounding hyphens and replace
    # only the first occurrence so later parts of the name stay untouched.
    old_name = filepath.name
    if region_updated:
        new_name = old_name.replace('-XX-XXX-', f'-{region_code}-{city_code}-', 1)
    else:
        new_name = old_name.replace('-XXX-', f'-{city_code}-', 1)
    new_path = filepath.parent / new_name
    if new_path != filepath and new_path.exists():
        # Path.rename() can silently overwrite an existing target, which
        # would destroy another record (additive-only rule).
        print(f" Target file already exists, skipping: {new_name}")
        return False

    if not apply:
        return True

    # Update data; one timestamp so resolution and history entry agree
    now = datetime.now(timezone.utc).isoformat()
    ghcid['ghcid_current'] = new_ghcid
    loc_res = ghcid.get('location_resolution') or {}
    loc_res['city_code'] = city_code
    loc_res['city_name'] = city_data['name']
    loc_res['geonames_id'] = city_data['geonames_id']
    loc_res['feature_code'] = city_data['feature_code']
    if region_updated:
        loc_res['region_code'] = region_code
    loc_res['method'] = 'REVERSE_GEOCODE_FROM_FILE_COORDS'
    loc_res['resolution_timestamp'] = now
    ghcid['location_resolution'] = loc_res

    # Add to history
    history = ghcid.get('ghcid_history') or []
    history.append({
        'ghcid': new_ghcid,
        'valid_from': now,
        'reason': f'City resolved via reverse geocoding: XXX->{city_code} ({city_data["name"]})'
    })
    ghcid['ghcid_history'] = history
    data['ghcid'] = ghcid

    # Serialize first: if dumping fails, the file on disk is left untouched
    # instead of being truncated by a half-finished write.
    text = yaml.dump(data, allow_unicode=True, default_flow_style=False, sort_keys=False)
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(text)
    if new_path != filepath:
        filepath.rename(new_path)
        print(f" Renamed: {old_name} -> {new_name}")
    return True
def main():
    """Command-line entry point: resolve XXX city codes for custodian files.

    Scans CUSTODIAN_DIR for ``*.yaml`` files whose name contains ``-XXX-``
    (optionally restricted to one country prefix), keeps those whose text
    mentions both ``latitude:`` and ``longitude:``, and runs process_file()
    on at most ``--limit`` of them. Without ``--apply`` nothing is written.
    Exits with status 1 when the GeoNames database is missing.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Resolve XXX city codes using coordinates in files')
    parser.add_argument('--limit', type=int, default=100, help='Max files to process')
    parser.add_argument('--apply', action='store_true', help='Apply changes (default: dry run)')
    parser.add_argument('--country', help='Filter by country code')
    args = parser.parse_args()

    print("=" * 70)
    print("CITY RESOLUTION FROM FILE COORDINATES")
    print("=" * 70)
    print(f"Mode: {'APPLYING CHANGES' if args.apply else 'DRY RUN'}")
    print()

    # Connect to GeoNames
    if not GEONAMES_DB.exists():
        print(f"ERROR: GeoNames database not found: {GEONAMES_DB}")
        sys.exit(1)

    # Find XXX files. Sorted, so --limit selects the same files on every
    # run instead of depending on directory listing order.
    xxx_files = sorted(
        f for f in CUSTODIAN_DIR.glob('*.yaml')
        if '-XXX-' in f.name
        and (not args.country or f.name.startswith(f'{args.country}-'))
    )
    print(f"Found {len(xxx_files)} files with XXX codes")

    # Cheap text pre-filter: keep files that mention coordinates at all
    files_with_coords = []
    for f in xxx_files:
        try:
            content = f.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            # Report unreadable files; still best-effort, the run continues
            print(f" Skipping unreadable file {f.name}: {e}")
            continue
        if 'latitude:' in content and 'longitude:' in content:
            files_with_coords.append(f)

    batch = files_with_coords[:args.limit]
    print(f"Processing {len(batch)} files with coordinates")
    print()

    resolved = 0
    renamed = 0
    conn = sqlite3.connect(str(GEONAMES_DB))
    try:
        for f in batch:
            print(f"Processing {f.name}...")
            if process_file(f, conn, args.apply):
                resolved += 1
                # process_file() only reports success/failure; the file was
                # renamed exactly when its original path is gone.
                if args.apply and not f.exists():
                    renamed += 1
    finally:
        # Release the database even when a file raises unexpectedly
        conn.close()

    print()
    print("=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Files processed: {len(batch)}")
    print(f"Resolved: {resolved}")
    print(f"Renamed: {renamed}")


if __name__ == '__main__':
    main()