# Enrichment scripts for country-specific city data:
#   - enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py
#   - enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py
#   - enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py
#
# Location resolution utilities:
#   - resolve_cities_from_file_coords.py - Resolve cities using coordinates stored in the files
#   - resolve_cities_wikidata.py - Use Wikidata P131 for city resolution
#   - resolve_country_codes.py - Standardize country codes
#   - resolve_cz_xx_regions.py - Fix Czech XX region codes
#   - resolve_locations_by_name.py - Name-based location lookup
#   - resolve_regions_from_city.py - Derive regions from city data
#   - update_ghcid_with_geonames.py - Update GHCIDs with GeoNames data
#
# CH-Annotator integration:
#   - create_custodian_from_ch_annotator.py - Create custodians from annotations
#   - add_ch_annotator_location_claims.py - Add location claims
#   - extract_locations_ch_annotator.py - Extract locations from annotations
#
# Migration and fixes:
#   - migrate_egyptian_from_ch.py - Migrate Egyptian data
#   - migrate_web_archives.py - Migrate web archive data
#   - fix_belgian_cities.py - Fix Belgian city data
301 lines
9.5 KiB
Python
Executable file
301 lines
9.5 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Resolve XXX city codes using coordinates already in the file (locations[].latitude/longitude).
|
|
|
|
This script handles files that already have coordinates but haven't been geocoded yet.
|
|
|
|
Following AGENTS.md Rules:
|
|
- Rule 5: Additive only - never delete existing data
|
|
- GHCID settlement standardization: GeoNames is authoritative
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import yaml
|
|
import sqlite3
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
# Paths are resolved relative to the directory two levels above this script.
_REPO_ROOT = Path(__file__).parent.parent

# SQLite database holding the GeoNames `cities` table used for lookups.
GEONAMES_DB = _REPO_ROOT / "data/reference/geonames.db"

# Directory containing the custodian YAML files that this script updates.
CUSTODIAN_DIR = _REPO_ROOT / "data/custodian"

# GeoNames feature codes accepted as proper settlements.
# PPLX (neighborhoods / sections of a populated place) is deliberately absent.
SETTLEMENT_FEATURE_CODES = (
    'PPL',
    'PPLA',
    'PPLA2',
    'PPLA3',
    'PPLA4',
    'PPLC',
    'PPLS',
    'PPLG',
)

# Netherlands: GeoNames admin1 code -> two-letter region code used in GHCIDs.
NL_ADMIN1_MAP = dict([
    ('01', 'DR'),
    ('02', 'FR'),
    ('03', 'GE'),
    ('04', 'GR'),
    ('05', 'LI'),
    ('06', 'NB'),
    ('07', 'NH'),
    ('09', 'UT'),
    ('10', 'ZE'),
    ('11', 'ZH'),
    ('15', 'OV'),
    ('16', 'FL'),
])

# Belgium: GeoNames admin2 code -> region code. The mapping is an identity
# mapping; it effectively acts as a whitelist of recognised admin2 codes.
BE_ADMIN2_MAP = {
    code: code
    for code in (
        'VAN', 'VBR', 'VLI', 'VOV', 'VWV',
        'WBR', 'WHT', 'WLG', 'WLX', 'WNA', 'BRU',
    )
}
|
|
|
|
|
|
def generate_city_code(name: str) -> str:
    """Generate a short upper-case city code from a settlement name.

    Rules, applied after stripping diacritics and non-letter characters:

    * one word: its first three letters (``Amsterdam`` -> ``AMS``);
    * leading Dutch article: first letter of the article plus the first two
      letters of the next word (``Den Haag`` -> ``DHA``);
    * otherwise: the initials of the first three words
      (``New York City`` -> ``NYC``).

    Hyphens are never emitted: the code is embedded in a ``-``-delimited
    GHCID, so ``'s-Hertogenbosch`` yields ``SHE`` rather than ``S-H``.

    Args:
        name: Settlement name; may contain accents or be empty/None.

    Returns:
        The upper-case code, or ``'XXX'`` when no ASCII letters remain.
    """
    import re
    import unicodedata

    if not name:
        return 'XXX'

    # Decompose accented characters and drop the combining marks so that
    # e.g. "São" becomes "Sao".
    decomposed = unicodedata.normalize('NFD', name)
    ascii_name = ''.join(c for c in decomposed if unicodedata.category(c) != 'Mn')

    # Keep only ASCII letters, whitespace and hyphens.
    clean = re.sub(r'[^a-zA-Z\s-]', '', ascii_name)

    # Words are whitespace-delimited; hyphens inside a word are removed so a
    # hyphenated name still counts as a single word but cannot leak a '-'
    # into the generated code. Tokens consisting solely of hyphens vanish.
    words = [w for w in (token.replace('-', '') for token in clean.split()) if w]
    if not words:
        return 'XXX'

    # Apostrophes were stripped above, so "'s" is matched through plain 's'.
    dutch_articles = {'de', 'het', 'den', 's'}

    if len(words) == 1:
        return words[0][:3].upper()
    if words[0].lower() in dutch_articles:
        return (words[0][0] + words[1][:2]).upper()
    return ''.join(w[0] for w in words[:3]).upper()
|
|
|
|
|
|
def reverse_geocode(lat: float, lon: float, country: str, conn: sqlite3.Connection) -> Optional[Dict]:
    """Reverse geocode coordinates to the nearest settlement in GeoNames.

    Searches the ``cities`` table for rows of the given country whose
    feature code is listed in ``SETTLEMENT_FEATURE_CODES`` and returns the
    one closest to ``(lat, lon)``.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        country: Value matched against the ``country_code`` column.
        conn: Open SQLite connection to the GeoNames database.

    Returns:
        A dict with the keys ``geonames_id``, ``name``, ``ascii_name``,
        ``admin1_code``, ``admin2_code``, ``latitude``, ``longitude``,
        ``feature_code`` and ``population``; or ``None`` if no row matches.
    """
    import math

    columns = (
        'geonames_id', 'name', 'ascii_name', 'admin1_code', 'admin2_code',
        'latitude', 'longitude', 'feature_code', 'population',
    )

    # Bind the feature codes as parameters instead of interpolating the
    # Python tuple's repr into the SQL text (which breaks for a one-element
    # tuple and relies on Python/SQL quoting happening to coincide).
    placeholders = ', '.join('?' * len(SETTLEMENT_FEATURE_CODES))

    # A degree of longitude shrinks with cos(latitude). Scaling the longitude
    # difference keeps the squared-distance ordering meaningful away from the
    # equator. Computed here because SQLite's math functions are optional.
    # NOTE(review): longitude wrap-around at +/-180 degrees is not handled.
    lon_scale = math.cos(math.radians(lat))

    cursor = conn.cursor()
    cursor.execute(f'''
        SELECT {', '.join(columns)}
        FROM cities
        WHERE country_code = ?
          AND feature_code IN ({placeholders})
        ORDER BY ((latitude - ?) * (latitude - ?)
                  + ((longitude - ?) * ?) * ((longitude - ?) * ?))
        LIMIT 1
    ''', (country, *SETTLEMENT_FEATURE_CODES, lat, lat, lon, lon_scale, lon, lon_scale))

    row = cursor.fetchone()
    if not row:
        return None

    return dict(zip(columns, row))
|
|
|
|
|
|
def get_region_code(country: str, admin1_code: str, admin2_code: str) -> str:
    """Translate GeoNames admin codes into the region code used in a GHCID.

    * ``NL``: ``admin1_code`` is looked up in ``NL_ADMIN1_MAP``; unknown
      codes give ``'XX'``.
    * ``BE``: ``admin2_code`` is looked up in ``BE_ADMIN2_MAP``; unknown
      codes fall back to ``admin1_code`` (or ``'XX'`` when that is empty).
    * any other country: ``admin1_code`` is returned unchanged, or ``'XX'``
      when it is empty.
    """
    # Shared fallback for every branch except the Dutch lookup.
    admin1_or_unknown = admin1_code or 'XX'

    if country == 'NL':
        return NL_ADMIN1_MAP.get(admin1_code, 'XX')
    if country == 'BE':
        return BE_ADMIN2_MAP.get(admin2_code, admin1_or_unknown)
    return admin1_or_unknown
|
|
|
|
|
|
def _coords_from_locations(locations: Any, default_country: str) -> Optional[tuple]:
    """Return ``(lat, lon, country)`` for the first entry with usable coordinates."""
    if not isinstance(locations, list):
        # Covers a missing key as well as an explicit YAML `locations: null`.
        return None

    for loc in locations:
        if not isinstance(loc, dict):
            continue
        try:
            lat = float(loc['latitude'])
            lon = float(loc['longitude'])
        except (KeyError, TypeError, ValueError):
            # Key absent, value null, or value not numeric: try the next entry.
            continue
        # Range test also rejects NaN, for which every comparison is False.
        if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
            continue
        return (lat, lon, loc.get('country') or default_country)

    return None


def find_coords_in_file(data: Dict) -> Optional[tuple]:
    """Find latitude/longitude in parsed custodian file data.

    ``original_entry.locations`` is searched first, then the top-level
    ``locations`` list. The first entry whose ``latitude`` and ``longitude``
    are numeric (or numeric strings) and within valid ranges wins.

    The country comes from the location's ``country`` key; when that is
    missing or empty it falls back to
    ``ghcid.location_resolution.country_code`` and finally to ``'XX'``.

    Args:
        data: Parsed YAML content of a custodian file.

    Returns:
        ``(latitude, longitude, country)`` with float coordinates, or
        ``None`` when no usable coordinates are present.
    """
    if not isinstance(data, dict):
        return None

    # Any level of ghcid.location_resolution may be absent or null.
    ghcid = data.get('ghcid')
    loc_res = ghcid.get('location_resolution') if isinstance(ghcid, dict) else None
    country_code = loc_res.get('country_code') if isinstance(loc_res, dict) else None
    default_country = country_code or 'XX'

    original_entry = data.get('original_entry')
    if isinstance(original_entry, dict):
        found = _coords_from_locations(original_entry.get('locations'), default_country)
        if found:
            return found

    return _coords_from_locations(data.get('locations'), default_country)
|
|
|
|
|
|
def process_file(filepath: Path, conn: sqlite3.Connection, apply: bool) -> bool:
    """Resolve the XXX city code of one custodian file from its coordinates.

    The file's coordinates are reverse geocoded against GeoNames. The
    resulting city code (and the region code, if it is still ``XX``) is put
    into the GHCID, ``ghcid.location_resolution`` and ``ghcid.ghcid_history``
    are updated, and the file is renamed to match the new GHCID.

    All checks run before anything is written, so a dry run
    (``apply=False``) returns the same verdict an apply run would.

    Args:
        filepath: Path of the custodian YAML file.
        conn: Open SQLite connection to the GeoNames database.
        apply: When False nothing is written or renamed.

    Returns:
        True if the file was (or, in a dry run, could be) resolved;
        False if it was skipped for any reason.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except (OSError, ValueError, yaml.YAMLError) as e:
        # ValueError covers UnicodeDecodeError for files that are not UTF-8.
        print(f"  Error reading {filepath}: {e}")
        return False

    # Empty documents and non-mapping roots cannot hold a GHCID.
    if not data or not isinstance(data, dict):
        return False

    # Get coordinates from file
    coords = find_coords_in_file(data)
    if not coords:
        return False

    lat, lon, country = coords
    try:
        lat, lon = float(lat), float(lon)
    except (TypeError, ValueError):
        # Null or non-numeric coordinates would otherwise crash the
        # formatted print below.
        print(f"  Invalid coordinates: {lat!r}, {lon!r}")
        return False
    print(f"  Coords: {lat:.4f}, {lon:.4f} ({country})")

    # Reverse geocode
    city_data = reverse_geocode(lat, lon, country, conn)
    if not city_data:
        print(f"  No GeoNames match for {country}")
        return False

    city_code = generate_city_code(city_data['ascii_name'] or city_data['name'] or '')
    if city_code == 'XXX':
        # Writing XXX back would add a history entry without resolving anything.
        print(f"  Could not derive a city code from {city_data['name']!r}")
        return False
    region_code = get_region_code(country, city_data['admin1_code'], city_data.get('admin2_code', ''))

    print(f"  City: {city_data['name']} ({city_code}), Region: {region_code}")

    # Parse current GHCID; parts[1] is the region code, parts[2] the city code.
    ghcid = data.get('ghcid')
    if not isinstance(ghcid, dict):
        ghcid = {}
    current = str(ghcid.get('ghcid_current') or '')
    parts = current.split('-')
    if len(parts) < 5:
        print(f"  Invalid GHCID format: {current}")
        return False

    old_region, old_city = parts[1], parts[2]
    if old_city != 'XXX':
        print(f"  City already resolved: {old_city}")
        return False

    # The region is only replaced when it is still unknown and we found one.
    region_resolved = old_region == 'XX' and region_code != 'XX'
    if region_resolved:
        parts[1] = region_code
    parts[2] = city_code
    new_ghcid = '-'.join(parts)

    # Calculate new filename; only the first occurrence is replaced so later
    # filename segments that happen to contain the same text stay untouched.
    old_name = filepath.name
    if region_resolved:
        new_name = old_name.replace(f'{old_region}-XXX', f'{region_code}-{city_code}', 1)
    else:
        new_name = old_name.replace('-XXX-', f'-{city_code}-', 1)
    new_path = filepath.parent / new_name

    # Additive only: never let the rename replace another custodian file.
    if new_path != filepath and new_path.exists():
        print(f"  Target already exists, skipping: {new_name}")
        return False

    if not apply:
        return True

    # One timestamp so location_resolution and history agree exactly.
    now = datetime.now(timezone.utc).isoformat()

    # Update data
    ghcid['ghcid_current'] = new_ghcid
    loc_res = ghcid.get('location_resolution') or {}
    loc_res['city_code'] = city_code
    loc_res['city_name'] = city_data['name']
    loc_res['geonames_id'] = city_data['geonames_id']
    loc_res['feature_code'] = city_data['feature_code']
    if region_resolved:
        loc_res['region_code'] = region_code
    loc_res['method'] = 'REVERSE_GEOCODE_FROM_FILE_COORDS'
    loc_res['resolution_timestamp'] = now
    ghcid['location_resolution'] = loc_res

    # Add to history
    history = ghcid.get('ghcid_history') or []
    history.append({
        'ghcid': new_ghcid,
        'valid_from': now,
        'reason': f'City resolved via reverse geocoding: XXX->{city_code} ({city_data["name"]})'
    })
    ghcid['ghcid_history'] = history
    data['ghcid'] = ghcid

    # Serialize fully before opening the file for writing, so a failure in
    # yaml.dump cannot leave a truncated file behind.
    serialized = yaml.dump(data, allow_unicode=True, default_flow_style=False, sort_keys=False)
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(serialized)

    if new_path != filepath:
        filepath.rename(new_path)
        print(f"  Renamed: {old_name} -> {new_name}")

    return True
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Scans ``CUSTODIAN_DIR`` for ``*.yaml`` files whose name contains
    ``-XXX-``, keeps those that mention both ``latitude:`` and
    ``longitude:``, and runs ``process_file`` on at most ``--limit`` of them.
    Without ``--apply`` this is a dry run. Exits with status 1 when the
    GeoNames database is missing.
    """
    import argparse
    from contextlib import closing

    parser = argparse.ArgumentParser(description='Resolve XXX city codes using coordinates in files')
    parser.add_argument('--limit', type=int, default=100, help='Max files to process')
    parser.add_argument('--apply', action='store_true', help='Apply changes (default: dry run)')
    parser.add_argument('--country', help='Filter by country code')
    args = parser.parse_args()

    # A negative limit would silently slice from the end of the file list.
    if args.limit < 0:
        parser.error('--limit must be non-negative')

    print("=" * 70)
    print("CITY RESOLUTION FROM FILE COORDINATES")
    print("=" * 70)
    print(f"Mode: {'APPLYING CHANGES' if args.apply else 'DRY RUN'}")
    print()

    if not GEONAMES_DB.exists():
        print(f"ERROR: GeoNames database not found: {GEONAMES_DB}")
        sys.exit(1)

    # Find XXX files; sorted so that repeated runs with --limit work through
    # the files in a reproducible order (glob order is arbitrary).
    xxx_files = sorted(
        f for f in CUSTODIAN_DIR.glob('*.yaml')
        if '-XXX-' in f.name
        and (not args.country or f.name.startswith(f'{args.country}-'))
    )

    print(f"Found {len(xxx_files)} files with XXX codes")

    # Cheap textual pre-filter; process_file does the real YAML parsing.
    files_with_coords = []
    for f in xxx_files:
        try:
            content = f.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            print(f"  Skipping unreadable file {f.name}: {e}")
            continue
        if 'latitude:' in content and 'longitude:' in content:
            files_with_coords.append(f)

    batch = files_with_coords[:args.limit]
    print(f"Processing {len(batch)} files with coordinates")
    print()

    resolved = 0
    renamed = 0

    # closing() guarantees the connection is released even if a file raises.
    with closing(sqlite3.connect(str(GEONAMES_DB))) as conn:
        for f in batch:
            print(f"Processing {f.name}...")
            if process_file(f, conn, args.apply):
                resolved += 1
                # The original path disappearing means the file was renamed.
                if args.apply and not f.exists():
                    renamed += 1

    print()
    print("=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Files processed: {len(batch)}")
    print(f"Resolved: {resolved}")
    print(f"Renamed: {renamed}")


if __name__ == '__main__':
    main()
|