glam/scripts/enrich_cities_google.py
2025-12-30 23:07:03 +01:00

608 lines
20 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Enrich custodian files with city/region data using Google Places API.
This is a generic script that works for any country's XXX files.
Usage:
python scripts/enrich_cities_google.py --country KR [--dry-run] [--limit N]
python scripts/enrich_cities_google.py --country AR [--dry-run] [--limit N]
python scripts/enrich_cities_google.py --all [--dry-run] [--limit N]
Environment Variables:
GOOGLE_PLACES_TOKEN - Required: Google Cloud API key with Places API enabled
"""
import argparse
import math
import os
import re
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import httpx
import yaml
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configuration
GOOGLE_PLACES_TOKEN = os.getenv("GOOGLE_PLACES_TOKEN", "")
# Paths keep their original hard-coded defaults but can now be overridden via
# environment variables, so the script is portable across machines/CI.
GEONAMES_DB = Path(os.getenv(
    "GLAM_GEONAMES_DB", "/Users/kempersc/apps/glam/data/reference/geonames.db"))
CUSTODIAN_DIR = Path(os.getenv(
    "GLAM_CUSTODIAN_DIR", "/Users/kempersc/apps/glam/data/custodian"))
# Google Places API (New) text-search endpoint
TEXT_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText"
REQUEST_DELAY = 0.3  # seconds to sleep between API calls (client-side rate limit)
# ISO 3166-1 alpha-2 code -> English country name, appended to Places queries
# to bias the text search toward the right country. Sorted by code.
COUNTRY_NAMES = {
    'AR': 'Argentina',
    'BR': 'Brazil',
    'CH': 'Switzerland',
    'CL': 'Chile',
    'CO': 'Colombia',
    'CU': 'Cuba',
    'CZ': 'Czech Republic',
    'DE': 'Germany',
    'DK': 'Denmark',
    'EC': 'Ecuador',
    'EE': 'Estonia',
    'FR': 'France',
    'GB': 'United Kingdom',
    'IN': 'India',
    'IR': 'Iran',
    'IT': 'Italy',
    'JM': 'Jamaica',
    'JP': 'Japan',
    'KR': 'South Korea',
    'LB': 'Lebanon',
    'LK': 'Sri Lanka',
    'ML': 'Mali',
    'MX': 'Mexico',
    'NA': 'Namibia',
    'NL': 'Netherlands',
    'OM': 'Oman',
    'PH': 'Philippines',
    'TJ': 'Tajikistan',
    'UA': 'Ukraine',
    'US': 'United States',
    'UZ': 'Uzbekistan',
}
def get_city_code(city_name: str) -> str:
    """Derive a short (up to 3-letter) uppercase city code from a city name.

    One word -> first three letters; two words -> initial of the first plus
    two letters of the second; three or more -> initials of the first three.
    """
    base = city_name.strip()
    # Drop well-known administrative suffixes in a single ordered pass.
    for tail in (' City', ' Town', '-shi', '-ku', '-gun', '-cho', ' District'):
        if base.endswith(tail):
            base = base[:-len(tail)]
    parts = base.split()
    if len(parts) == 1:
        return base[:3].upper()
    if len(parts) == 2:
        return (parts[0][0] + parts[1][:2]).upper()
    return ''.join(p[0] for p in parts[:3]).upper()
def search_google_places(query: str, api_key: str) -> Optional[dict]:
    """Run a Places API text search and return the top result, or None.

    Any transport/HTTP/parse failure is reported to stdout and swallowed so
    a single bad lookup never aborts a long batch run.
    """
    request_headers = {
        "Content-Type": "application/json",
        "X-Goog-Api-Key": api_key,
        "X-Goog-FieldMask": "places.displayName,places.formattedAddress,places.location,places.addressComponents,places.types,places.id,places.websiteUri"
    }
    body = {
        "textQuery": query,
        "languageCode": "en"
    }
    try:
        resp = httpx.post(TEXT_SEARCH_URL, json=body, headers=request_headers, timeout=30)
        resp.raise_for_status()
        places = resp.json().get("places") or []
        return places[0] if places else None
    except Exception as exc:
        print(f" Error searching Google Places: {exc}")
        return None
def extract_location_from_google(place: dict) -> dict:
    """Flatten a Google Places result into a simple location dict.

    Missing fields are left as None. 'city' prefers the locality component,
    falling back to sublocality_level_1; 'country_code' carries the short
    country code used by callers to validate the result's country.
    """
    info = dict.fromkeys(
        ('city', 'region', 'latitude', 'longitude',
         'formatted_address', 'place_id', 'website', 'country_code'))
    if not place:
        return info
    info['place_id'] = place.get('id')
    info['formatted_address'] = place.get('formattedAddress')
    info['website'] = place.get('websiteUri')
    coords = place.get('location', {})
    info['latitude'] = coords.get('latitude')
    info['longitude'] = coords.get('longitude')
    for component in place.get('addressComponents', []):
        kinds = component.get('types', [])
        long_text = component.get('longText', '')
        if 'locality' in kinds:
            info['city'] = long_text
        elif 'administrative_area_level_1' in kinds:
            info['region'] = long_text
        elif 'sublocality_level_1' in kinds and not info['city']:
            info['city'] = long_text
        elif 'country' in kinds:
            # Short code (e.g. 'EC') used for country validation downstream.
            info['country_code'] = component.get('shortText', '')
    return info
def lookup_city_geonames(conn: sqlite3.Connection, lat: float, lon: float, country_code: str) -> Optional[dict]:
    """Reverse geocode coordinates to find the nearest city in GeoNames.

    FIX: the previous metric summed raw squared degree deltas, which
    overweights east-west distance away from the equator (a degree of
    longitude shrinks with cos(latitude)). The longitude term is now scaled
    by cos(lat)^2 — an equirectangular approximation that is accurate enough
    for nearest-city selection.

    Returns a dict of city attributes, or None when the country has no
    matching populated places in the database.
    """
    # Squared scale factor applied to the longitude term of the metric.
    lon_scale_sq = math.cos(math.radians(lat)) ** 2
    cursor = conn.cursor()
    cursor.execute("""
        SELECT name, ascii_name, admin1_code, admin1_name, geonames_id,
               latitude, longitude, population, feature_code,
               ((latitude - ?) * (latitude - ?) +
                (longitude - ?) * (longitude - ?) * ?) as dist_sq
        FROM cities
        WHERE country_code = ?
          AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
        ORDER BY dist_sq
        LIMIT 1
    """, (lat, lat, lon, lon, lon_scale_sq, country_code))
    row = cursor.fetchone()
    if row is None:
        return None
    keys = ('name', 'ascii_name', 'admin1_code', 'admin1_name', 'geonames_id',
            'latitude', 'longitude', 'population', 'feature_code')
    return dict(zip(keys, row))
# Brazil: GeoNames admin1_code → ISO 3166-2:BR state code
BRAZIL_STATE_CODES = {
    '01': 'AC',  # Acre
    '02': 'AL',  # Alagoas
    '03': 'AP',  # Amapá
    '04': 'AM',  # Amazonas
    '05': 'BA',  # Bahia
    '06': 'CE',  # Ceará
    '07': 'DF',  # Federal District (Distrito Federal)
    '08': 'ES',  # Espírito Santo
    '11': 'MS',  # Mato Grosso do Sul
    '13': 'MA',  # Maranhão
    '14': 'MT',  # Mato Grosso
    '15': 'MG',  # Minas Gerais
    '16': 'PA',  # Pará
    '17': 'PB',  # Paraíba
    '18': 'PR',  # Paraná
    '20': 'PI',  # Piauí
    '21': 'RJ',  # Rio de Janeiro
    '22': 'RN',  # Rio Grande do Norte
    '23': 'RS',  # Rio Grande do Sul
    '24': 'RO',  # Rondônia
    '25': 'RR',  # Roraima
    '26': 'SC',  # Santa Catarina
    '27': 'SP',  # São Paulo
    '28': 'SE',  # Sergipe
    '29': 'GO',  # Goiás
    '30': 'PE',  # Pernambuco
    '31': 'TO',  # Tocantins
}
# Switzerland: GeoNames admin1_code → ISO 3166-2:CH canton code
# (GeoNames already uses the canton abbreviation, so this is an identity map.)
SWITZERLAND_CANTON_CODES = {
    'AG': 'AG',  # Aargau
    'AI': 'AI',  # Appenzell Innerrhoden
    'AR': 'AR',  # Appenzell Ausserrhoden
    'BE': 'BE',  # Bern
    'BL': 'BL',  # Basel-Landschaft
    'BS': 'BS',  # Basel-Stadt
    'FR': 'FR',  # Fribourg
    'GE': 'GE',  # Geneva
    'GL': 'GL',  # Glarus
    'GR': 'GR',  # Graubünden
    'JU': 'JU',  # Jura
    'LU': 'LU',  # Lucerne
    'NE': 'NE',  # Neuchâtel
    'NW': 'NW',  # Nidwalden
    'OW': 'OW',  # Obwalden
    'SG': 'SG',  # St. Gallen
    'SH': 'SH',  # Schaffhausen
    'SO': 'SO',  # Solothurn
    'SZ': 'SZ',  # Schwyz
    'TG': 'TG',  # Thurgau
    'TI': 'TI',  # Ticino
    'UR': 'UR',  # Uri
    'VD': 'VD',  # Vaud
    'VS': 'VS',  # Valais
    'ZG': 'ZG',  # Zug
    'ZH': 'ZH',  # Zürich
}
# Mexico: GeoNames admin1_code → ISO 3166-2:MX state code
MEXICO_STATE_CODES = {
    '01': 'AGU',  # Aguascalientes
    '02': 'BCN',  # Baja California
    '03': 'BCS',  # Baja California Sur
    '04': 'CAM',  # Campeche
    '05': 'COA',  # Coahuila
    '06': 'COL',  # Colima
    '07': 'CHP',  # Chiapas
    '08': 'CHH',  # Chihuahua
    '09': 'CMX',  # Ciudad de México (CDMX)
    '10': 'DUR',  # Durango
    '11': 'GUA',  # Guanajuato
    '12': 'GRO',  # Guerrero
    '13': 'HID',  # Hidalgo
    '14': 'JAL',  # Jalisco
    '15': 'MEX',  # México (State of Mexico)
    '16': 'MIC',  # Michoacán
    '17': 'MOR',  # Morelos
    '18': 'NAY',  # Nayarit
    '19': 'NLE',  # Nuevo León
    '20': 'OAX',  # Oaxaca
    '21': 'PUE',  # Puebla
    '22': 'QUE',  # Querétaro
    '23': 'ROO',  # Quintana Roo
    '24': 'SLP',  # San Luis Potosí
    '25': 'SIN',  # Sinaloa
    '26': 'SON',  # Sonora
    '27': 'TAB',  # Tabasco
    '28': 'TAM',  # Tamaulipas
    '29': 'TLA',  # Tlaxcala
    '30': 'VER',  # Veracruz
    '31': 'YUC',  # Yucatán
    '32': 'ZAC',  # Zacatecas
}
# Chile: GeoNames admin1_code → ISO 3166-2:CL region code
CHILE_REGION_CODES = {
    '01': 'TA',  # Tarapacá
    '02': 'AN',  # Antofagasta
    '03': 'AT',  # Atacama
    '04': 'CO',  # Coquimbo
    '05': 'VS',  # Valparaíso
    '06': 'LI',  # Libertador General Bernardo O'Higgins
    '07': 'ML',  # Maule
    '08': 'BI',  # Biobío
    '09': 'AR',  # La Araucanía
    '10': 'LL',  # Los Lagos
    '11': 'AI',  # Aisén del General Carlos Ibáñez del Campo
    '12': 'MA',  # Magallanes y de la Antártica Chilena
    '13': 'RM',  # Región Metropolitana de Santiago
    '14': 'LR',  # Los Ríos
    '15': 'AP',  # Arica y Parinacota
    '16': 'NB',  # Ñuble
}
def get_region_code(admin1_code: str, country_code: str, admin1_name: str) -> str:
    """Map a GeoNames admin1_code to an ISO-style region code.

    Resolution order: country-specific lookup table (BR/CH/MX/CL); the
    admin1_code itself when it is already short (<= 3 chars); otherwise an
    abbreviation built from the region name; 'XX' when no code is available.
    """
    if not admin1_code:
        return 'XX'
    # Countries with explicit GeoNames -> ISO translation tables.
    per_country = {
        'BR': BRAZIL_STATE_CODES,
        'CH': SWITZERLAND_CANTON_CODES,
        'MX': MEXICO_STATE_CODES,
        'CL': CHILE_REGION_CODES,
    }.get(country_code)
    if per_country and admin1_code in per_country:
        return per_country[admin1_code]
    # Short GeoNames codes are usable directly for most countries.
    if len(admin1_code) <= 3:
        return admin1_code.upper()
    # Fall back to abbreviating the region name.
    if admin1_name:
        name_words = admin1_name.split()
        if len(name_words) == 1:
            return admin1_name[:2].upper()
        return ''.join(w[0] for w in name_words[:2]).upper()
    return admin1_code[:2].upper()
def process_file(filepath: Path, conn: sqlite3.Connection, api_key: str,
                 country_code: str, country_name: str, dry_run: bool = False) -> dict:
    """Process a single custodian file: resolve its XXX city placeholder.

    Pipeline: load YAML -> confirm the GHCID still carries the XXX city
    code -> Google Places text search on the institution name -> validate
    the result's country -> reverse geocode via GeoNames -> build the new
    GHCID, update history/enrichment sections, write the file and rename
    it to match the new GHCID.

    Returns a result dict whose 'status' is one of 'updated',
    'would_update' (dry run), 'skipped', 'error', or 'collision'.
    """
    result = {
        'file': str(filepath),
        'status': 'skipped',
        'old_ghcid': None,
        'new_ghcid': None,
        'city': None,
        'region': None,
        'error': None,
    }
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        result['status'] = 'error'
        result['error'] = f'Failed to load YAML: {e}'
        return result
    if not data:
        result['status'] = 'error'
        result['error'] = 'Empty YAML file'
        return result
    # FIX: `or {}` also guards against an explicit `ghcid: null` in the YAML.
    ghcid_data = data.get('ghcid') or {}
    old_ghcid = ghcid_data.get('ghcid_current', '')
    result['old_ghcid'] = old_ghcid
    # Match patterns with XXX city code:
    # - {country}-XX-XXX-... (2-letter region like XX, BE, GE)
    # - {country}-10-XXX-... (2-digit region like 10, 52, 37)
    # - {country}-UKM-XXX-... (3-letter region like UKM, IDF, CMX)
    xxx_pattern = re.compile(rf'^{country_code}-[A-Z0-9]{{2,3}}-XXX-')
    if not xxx_pattern.match(old_ghcid):
        result['status'] = 'skipped'
        result['error'] = f'Not a {country_code}-*-XXX file'
        return result
    # Get institution name (claimed name preferred, original entry as fallback)
    name = data.get('custodian_name', {}).get('claim_value', '')
    if not name:
        name = data.get('original_entry', {}).get('name', '')
    if not name:
        result['status'] = 'error'
        result['error'] = 'No institution name found'
        return result
    # Search Google Places (rate-limited)
    search_query = f"{name} {country_name}"
    print(f" Searching: {name[:50]}...")
    place = search_google_places(search_query, api_key)
    time.sleep(REQUEST_DELAY)
    if not place:
        result['status'] = 'error'
        result['error'] = 'Not found in Google Places'
        return result
    location_info = extract_location_from_google(place)
    # FIX: compare against None explicitly -- 0.0 is a valid latitude
    # (equator) / longitude (Greenwich) but is falsy, so the old truthiness
    # test wrongly rejected such coordinates (Ecuador is in scope here).
    if location_info['latitude'] is None or location_info['longitude'] is None:
        result['status'] = 'error'
        result['error'] = 'No coordinates from Google'
        return result
    # CRITICAL: Validate that Google returned a result in the expected country
    # This prevents data contamination from similarly-named institutions in other countries
    google_country = location_info.get('country_code')
    if google_country and google_country != country_code:
        result['status'] = 'error'
        result['error'] = f'COUNTRY MISMATCH: Google returned {google_country}, expected {country_code}. Address: {location_info.get("formatted_address", "unknown")}'
        return result
    # Lookup in GeoNames
    city_info = lookup_city_geonames(conn, location_info['latitude'],
                                     location_info['longitude'], country_code)
    if not city_info:
        result['status'] = 'error'
        result['error'] = 'City not found in GeoNames'
        return result
    region_code = get_region_code(city_info['admin1_code'], country_code, city_info['admin1_name'])
    city_code = get_city_code(city_info['ascii_name'])
    result['city'] = city_info['ascii_name']
    result['region'] = city_info['admin1_name']
    # Build new GHCID, keeping institution type and abbreviation from the old one
    parts = old_ghcid.split('-')
    if len(parts) >= 5:
        inst_type = parts[3]
        abbreviation = '-'.join(parts[4:])
    else:
        result['status'] = 'error'
        result['error'] = f'Invalid GHCID format: {old_ghcid}'
        return result
    new_ghcid = f'{country_code}-{region_code}-{city_code}-{inst_type}-{abbreviation}'
    result['new_ghcid'] = new_ghcid
    if dry_run:
        result['status'] = 'would_update'
        return result
    # FIX: detect a filename collision BEFORE mutating the file. Previously
    # the YAML was rewritten with the new GHCID first, and only the rename
    # was skipped -- leaving the colliding file's content inconsistent with
    # its filename. Now a colliding file is left completely untouched.
    new_filename = f'{new_ghcid}.yaml'
    new_filepath = filepath.parent / new_filename
    if filepath != new_filepath and new_filepath.exists():
        result['status'] = 'collision'
        result['error'] = f'Target file exists: {new_filepath.name}'
        return result
    # Update the data
    timestamp = datetime.now(timezone.utc).isoformat()
    data['ghcid']['ghcid_current'] = new_ghcid
    data['ghcid']['location_resolution'] = {
        'method': 'GOOGLE_PLACES_GEONAMES',
        'country_code': country_code,
        'region_code': region_code,
        'region_name': city_info['admin1_name'],
        'city_code': city_code,
        'city_name': city_info['ascii_name'],
        'geonames_id': city_info['geonames_id'],
        'feature_code': city_info['feature_code'],
        'google_place_id': location_info.get('place_id'),
        'latitude': location_info['latitude'],
        'longitude': location_info['longitude'],
        'resolution_date': timestamp,
    }
    data['google_maps_enrichment'] = {
        'place_id': location_info.get('place_id'),
        'formatted_address': location_info.get('formatted_address'),
        'website': location_info.get('website'),
        'latitude': location_info['latitude'],
        'longitude': location_info['longitude'],
        'enriched_at': timestamp,
        'source': 'Google Places API (New)',
    }
    # Update GHCID history: close the still-open old entry, append the new one
    if 'ghcid_history' not in data['ghcid']:
        data['ghcid']['ghcid_history'] = []
    for entry in data['ghcid']['ghcid_history']:
        if entry.get('ghcid') == old_ghcid and not entry.get('valid_to'):
            entry['valid_to'] = timestamp
    data['ghcid']['ghcid_history'].append({
        'ghcid': new_ghcid,
        'ghcid_numeric': data['ghcid'].get('ghcid_numeric'),
        'valid_from': timestamp,
        'reason': f'Location resolved via Google Places + GeoNames: {city_info["ascii_name"]} ({region_code})',
    })
    # Keep any GHCID identifier entry in sync with the new value
    if 'identifiers' in data:
        for identifier in data['identifiers']:
            if identifier.get('identifier_scheme') == 'GHCID':
                identifier['identifier_value'] = new_ghcid
    # Write the updated YAML, then rename to match the new GHCID
    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
    if filepath != new_filepath:
        filepath.rename(new_filepath)
        result['new_file'] = str(new_filepath)
    result['status'] = 'updated'
    return result
def main():
    """CLI entry point: enrich XXX-city custodian files per country.

    Requires GOOGLE_PLACES_TOKEN and an existing GeoNames SQLite database;
    exits with status 1 when either precondition is missing.
    """
    parser = argparse.ArgumentParser(description='Enrich custodian files with Google Places data')
    parser.add_argument('--country', type=str, help='Country code (e.g., KR, AR, US)')
    parser.add_argument('--all', action='store_true', help='Process all countries with XXX files')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, help='Limit number of files per country')
    args = parser.parse_args()
    if not GOOGLE_PLACES_TOKEN:
        print("ERROR: GOOGLE_PLACES_TOKEN environment variable is required")
        sys.exit(1)
    if not GEONAMES_DB.exists():
        print(f"ERROR: GeoNames database not found: {GEONAMES_DB}")
        sys.exit(1)
    # Determine which countries to process
    if args.all:
        # Find all countries with XXX files (either XX-XXX or {region}-XXX)
        countries = sorted({
            f.name[:2] for f in CUSTODIAN_DIR.glob('*-*-XXX-*.yaml')
            if f.name[:2] in COUNTRY_NAMES
        })
    elif args.country:
        countries = [args.country.upper()]
    else:
        print("ERROR: Specify --country CODE or --all")
        sys.exit(1)
    conn = sqlite3.connect(str(GEONAMES_DB))
    total_stats = {'updated': 0, 'error': 0, 'skipped': 0, 'would_update': 0, 'collision': 0}
    # FIX: close the SQLite connection even if processing raises.
    try:
        for country_code in countries:
            country_name = COUNTRY_NAMES.get(country_code) or country_code
            files = sorted(CUSTODIAN_DIR.glob(f'{country_code}-*-XXX-*.yaml'))
            if args.limit:
                files = files[:args.limit]
            if not files:
                continue
            print(f"\n{'='*60}")
            print(f"Processing {country_code} ({country_name}): {len(files)} files")
            print('='*60)
            stats = {'updated': 0, 'error': 0, 'skipped': 0, 'would_update': 0, 'collision': 0}
            for filepath in files:
                print(f"Processing: {filepath.name}")
                result = process_file(filepath, conn, GOOGLE_PLACES_TOKEN,
                                      country_code, country_name, dry_run=args.dry_run)
                # .get() keeps this robust to any future status values.
                stats[result['status']] = stats.get(result['status'], 0) + 1
                if result['status'] in ('updated', 'would_update'):
                    print(f"{result['city']} ({result['region']}): {result['old_ghcid']}{result['new_ghcid']}")
                elif result['status'] == 'error':
                    print(f"{result['error']}")
                elif result['status'] == 'collision':
                    print(f"{result['error']}")
            print(f"\n{country_code} Summary: Updated={stats.get('updated', 0)}, "
                  f"Would update={stats.get('would_update', 0)}, "
                  f"Errors={stats.get('error', 0)}")
            for k, v in stats.items():
                total_stats[k] = total_stats.get(k, 0) + v
    finally:
        conn.close()
    print()
    print('='*60)
    print('TOTAL Summary:')
    print(f" Updated: {total_stats.get('updated', 0)}")
    print(f" Would update: {total_stats.get('would_update', 0)}")
    print(f" Errors: {total_stats.get('error', 0)}")
    print(f" Collisions: {total_stats.get('collision', 0)}")
    print(f" Skipped: {total_stats.get('skipped', 0)}")
if __name__ == '__main__':
main()