#!/usr/bin/env python3
"""
Resolve XXX city codes using Wikidata P159 (headquarters) or P625 (coordinates).

This script handles files with XXX city codes by:
1. Getting Wikidata ID from the file
2. Querying P625 (coordinates) or P159 (headquarters location)
3. Reverse geocoding to GeoNames to find the nearest city

Following AGENTS.md Rules:
- Rule 5: Additive only - never delete existing data
- GHCID settlement standardization: GeoNames is authoritative
"""

import argparse
import json
import math
import os
import sqlite3
import sys
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, Tuple

import yaml

# GeoNames database
GEONAMES_DB = Path(__file__).parent.parent / "data/reference/geonames.db"

# Feature codes for proper settlements (EXCLUDE PPLX neighborhoods)
SETTLEMENT_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')

# Wikidata API access
WIKIDATA_API_URL = 'https://www.wikidata.org/w/api.php'
WIKIDATA_HEADERS = {'User-Agent': 'GLAM-Extractor/1.0 (heritage research project)'}
REQUEST_DELAY_SECONDS = 0.5  # Pause between Wikidata requests (rate limiting)

# Placeholder city code that marks an unresolved settlement
UNRESOLVED_CITY_CODE = 'XXX'


def _as_mapping(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict.

    YAML sections can be missing, null or of an unexpected type; this lets
    callers chain ``.get()`` lookups without crashing on such files.
    """
    return value if isinstance(value, dict) else {}


def _extract_wikidata_id(data: Dict[str, Any]) -> Optional[str]:
    """Return the Wikidata ID stored in a custodian record, or None.

    ``original_entry.wikidata_id`` is tried first, then
    ``wikidata_enrichment.wikidata_entity_id``. Empty values are skipped so
    that a blank first field does not hide a usable second one.
    """
    lookups = (
        ('original_entry', 'wikidata_id'),
        ('wikidata_enrichment', 'wikidata_entity_id'),
    )
    for section, key in lookups:
        value = _as_mapping(data.get(section)).get(key)
        if value:
            return str(value).strip()
    return None


def _fetch_wikidata_claims(entity_id: str) -> Dict[str, Any]:
    """Fetch the ``claims`` mapping of one Wikidata entity.

    Returns an empty dict when the entity is missing or carries no claims.
    Network and JSON decoding errors propagate to the caller.
    """
    query = urllib.parse.urlencode({
        'action': 'wbgetentities',
        'ids': entity_id,
        'props': 'claims',
        'format': 'json',
    })
    req = urllib.request.Request(f'{WIKIDATA_API_URL}?{query}', headers=WIKIDATA_HEADERS)
    with urllib.request.urlopen(req, timeout=30) as response:
        payload = json.loads(response.read().decode('utf-8'))

    entities = _as_mapping(payload.get('entities'))
    entity = entities.get(entity_id)
    if entity is None and len(entities) == 1:
        # The response may be keyed by a different ID than requested
        # (e.g. a redirect target); a lone entity is the one we asked for.
        entity = next(iter(entities.values()))
    return _as_mapping(_as_mapping(entity).get('claims'))


def _first_claim_value(claims: Dict[str, Any], prop: str) -> Any:
    """Return the value of the first *prop* statement that actually has one.

    Statements flagged "no value" / "unknown value" have no ``datavalue``
    and are skipped instead of raising ``KeyError``.
    """
    for statement in claims.get(prop) or []:
        mainsnak = _as_mapping(_as_mapping(statement).get('mainsnak'))
        value = _as_mapping(mainsnak.get('datavalue')).get('value')
        if value is not None:
            return value
    return None


def _claim_coordinates(claims: Dict[str, Any]) -> Optional[Tuple[float, float]]:
    """Return ``(latitude, longitude)`` from the P625 claim, or None."""
    value = _first_claim_value(claims, 'P625')
    if not isinstance(value, dict):
        return None
    lat = value.get('latitude')
    lon = value.get('longitude')
    if lat is None or lon is None:
        return None
    return (float(lat), float(lon))


def get_wikidata_location(wikidata_id: str) -> Optional[Tuple[float, float]]:
    """Get coordinates from Wikidata entity using P625 or P159.

    The entity's own P625 (coordinate location) is preferred. If it has no
    usable P625, the entity referenced by P159 (headquarters location) is
    fetched and its P625 is used instead.

    Args:
        wikidata_id: Wikidata entity ID (e.g. ``Q42``).

    Returns:
        ``(latitude, longitude)``, or None when no coordinates are found or
        the lookup fails. Failures are printed, never raised.
    """
    try:
        claims = _fetch_wikidata_claims(wikidata_id)

        # Try P625 (coordinates) first
        coords = _claim_coordinates(claims)
        if coords is not None:
            return coords

        # Try P159 (headquarters location)
        headquarters = _first_claim_value(claims, 'P159')
        if isinstance(headquarters, dict) and headquarters.get('id'):
            time.sleep(REQUEST_DELAY_SECONDS)  # Rate limiting
            return _claim_coordinates(_fetch_wikidata_claims(headquarters['id']))

        return None
    except Exception as e:
        # Best-effort lookup: network, HTTP and decoding failures are all
        # reported and treated as "no coordinates" so a batch run continues.
        print(f" Error fetching Wikidata {wikidata_id}: {e}")
        return None


def reverse_geocode(lat: float, lon: float, country: str, conn: sqlite3.Connection) -> Optional[Dict]:
    """Reverse geocode coordinates to nearest city in GeoNames.

    Only rows of the ``cities`` table whose ``country_code`` equals *country*
    and whose ``feature_code`` is in ``SETTLEMENT_FEATURE_CODES`` are
    considered.

    Distance is an equirectangular approximation: the longitude difference is
    wrapped at the +/-180 degree meridian and scaled by cos(latitude), so one
    degree east-west is not over-weighted away from the equator.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        country: Country code to restrict the search to.
        conn: Open connection to the GeoNames SQLite database.

    Returns:
        Dict describing the nearest settlement, or None when the country has
        no matching row. ``distance_sq`` is the squared distance in
        latitude-equivalent degrees.
    """
    placeholders = ', '.join(f':fc{i}' for i in range(len(SETTLEMENT_FEATURE_CODES)))
    params: Dict[str, Any] = {
        'lat': lat,
        'lon': lon,
        'lon_scale': math.cos(math.radians(lat)),
        'country': country,
    }
    for i, feature_code in enumerate(SETTLEMENT_FEATURE_CODES):
        params[f'fc{i}'] = feature_code

    # Only placeholder names are interpolated; every value is bound.
    query = f'''
        SELECT geonames_id, name, ascii_name, admin1_code, admin2_code,
               latitude, longitude, feature_code, population,
               (dlat * dlat + dlon * dlon) AS distance_sq
        FROM (
            SELECT geonames_id, name, ascii_name, admin1_code, admin2_code,
                   latitude, longitude, feature_code, population,
                   (latitude - :lat) AS dlat,
                   (MIN(ABS(longitude - :lon), 360.0 - ABS(longitude - :lon))
                    * :lon_scale) AS dlon
            FROM cities
            WHERE country_code = :country
              AND feature_code IN ({placeholders})
        )
        ORDER BY distance_sq
        LIMIT 1
    '''

    cursor = conn.cursor()
    cursor.execute(query, params)
    row = cursor.fetchone()
    if not row:
        return None

    return {
        'geonames_id': row[0],
        'name': row[1],
        'ascii_name': row[2],
        'admin1_code': row[3],
        'admin2_code': row[4],
        'latitude': row[5],
        'longitude': row[6],
        'feature_code': row[7],
        'population': row[8],
        'distance_sq': row[9],
    }


def generate_city_code(city_name: str) -> str:
    """Generate a city code of up to 3 letters from a city name.

    A single-word name yields its first three characters; a multi-word name
    yields the initials of its first three words. The result is upper-cased
    and is empty when the name contains no words.
    """
    words = city_name.split()
    if len(words) == 1:
        # Slice the stripped word so surrounding whitespace never leaks in.
        return words[0][:3].upper()
    return ''.join(word[0] for word in words)[:3].upper()


def process_file(filepath: Path, conn: sqlite3.Connection, dry_run: bool = True) -> Tuple[bool, Optional[Path]]:
    """Process a single file to resolve XXX city code.

    Reads the YAML record, looks up coordinates for its Wikidata ID, reverse
    geocodes them against GeoNames and updates the location resolution, the
    current GHCID, the GHCID history and the provenance notes. Existing data
    is only added to, never removed.

    Args:
        filepath: Custodian YAML file to process.
        conn: Open connection to the GeoNames SQLite database.
        dry_run: When True, nothing is written or renamed.

    Returns:
        ``(resolved, new_path)``. ``resolved`` is True when a city was found;
        ``new_path`` is the renamed (or, in a dry run, would-be) file path,
        or None when the filename does not change.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        print(f" Error reading {filepath}: {e}")
        return False, None

    # An empty or non-mapping YAML document cannot hold a GHCID.
    if not isinstance(data, dict):
        return False, None

    # Check if has XXX city code
    ghcid = _as_mapping(data.get('ghcid'))
    loc_res = _as_mapping(ghcid.get('location_resolution'))
    if loc_res.get('city_code', '') != UNRESOLVED_CITY_CODE:
        return False, None

    country = loc_res.get('country_code', '')
    if not country:
        return False, None

    # Get Wikidata ID
    wikidata_id = _extract_wikidata_id(data)
    if not wikidata_id:
        return False, None

    # Get coordinates from Wikidata
    coords = get_wikidata_location(wikidata_id)
    if coords is None:
        print(f" No coordinates for {wikidata_id}")
        return False, None

    lat, lon = coords
    print(f" Coords: {lat:.4f}, {lon:.4f}")

    # Reverse geocode
    city_data = reverse_geocode(lat, lon, country, conn)
    if not city_data:
        print(f" No GeoNames match in {country}")
        return False, None

    city_name = city_data['ascii_name'] or city_data['name']
    city_code = generate_city_code(city_name) if city_name else ''
    if not city_code:
        print(f" No usable city name for GeoNames:{city_data['geonames_id']}")
        return False, None
    print(f" City: {city_name} ({city_code})")

    # Determine new filename
    placeholder = f'-{UNRESOLVED_CITY_CODE}-'
    resolved_segment = f'-{city_code}-'
    new_filepath = filepath.parent / filepath.name.replace(placeholder, resolved_segment)
    will_rename = new_filepath != filepath

    # Check for a collision BEFORE touching anything: rewriting the record
    # without being able to rename it would leave a resolved GHCID inside a
    # file still named -XXX-.
    if will_rename and new_filepath.exists():
        print(f" Target {new_filepath.name} already exists; leaving {filepath.name} unchanged")
        return False, None

    # One timestamp so every field written by this run agrees.
    now = datetime.now(timezone.utc)

    # Update file
    loc_res['city_code'] = city_code
    loc_res['city_label'] = city_name
    loc_res['geonames_id'] = city_data['geonames_id']
    loc_res['method'] = 'WIKIDATA_COORDS_REVERSE_GEOCODE'
    loc_res['resolution_timestamp'] = now.isoformat()

    # Update GHCID string
    old_ghcid = ghcid.get('ghcid_current') or ''
    new_ghcid = old_ghcid.replace(placeholder, resolved_segment)
    ghcid['ghcid_current'] = new_ghcid

    # Add to history
    if ghcid.get('ghcid_history') is None:
        ghcid['ghcid_history'] = []
    ghcid['ghcid_history'].append({
        'ghcid': new_ghcid,
        'valid_from': now.isoformat(),
        'reason': f"City resolved via Wikidata {wikidata_id} coordinates: XXX->{city_code} ({city_name})"
    })

    # Add provenance note (a single string note is promoted to a list)
    if data.get('provenance') is None:
        data['provenance'] = {}
    provenance = data['provenance']
    existing_notes = provenance.get('notes')
    if existing_notes is None:
        provenance['notes'] = []
    elif isinstance(existing_notes, str):
        provenance['notes'] = [existing_notes]
    provenance['notes'].append(
        f"City resolved {now.strftime('%Y-%m-%dT%H:%M:%SZ')}: "
        f"XXX->{city_code} via Wikidata {wikidata_id} coords ({lat:.4f},{lon:.4f}) -> {city_name} (GeoNames:{city_data['geonames_id']})"
    )

    if not dry_run:
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        if will_rename:
            filepath.rename(new_filepath)

    return True, new_filepath if will_rename else None


def _collect_candidates(files, country_filter: Optional[str], limit: int) -> list:
    """Return up to *limit* files that carry a Wikidata ID.

    Each entry is a dict with ``filepath``, ``wikidata_id`` and ``country``.
    Files that cannot be read or parsed are reported and skipped.
    """
    candidates = []
    for filepath in files:
        if len(candidates) >= limit:
            break

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
            print(f" Skipping unreadable file {filepath.name}: {e}")
            continue

        if not isinstance(data, dict):
            continue

        loc_res = _as_mapping(_as_mapping(data.get('ghcid')).get('location_resolution'))
        country = loc_res.get('country_code', '')
        if country_filter and country != country_filter:
            continue

        # Check for Wikidata ID
        wikidata_id = _extract_wikidata_id(data)
        if not wikidata_id:
            continue

        candidates.append({
            'filepath': filepath,
            'wikidata_id': wikidata_id,
            'country': country,
        })
    return candidates


def main():
    """Command-line entry point: resolve XXX city codes in a directory."""
    parser = argparse.ArgumentParser(description='Resolve XXX city codes using Wikidata coordinates')
    parser.add_argument('--apply', action='store_true', help='Actually apply the fixes')
    parser.add_argument('--path', type=str, default='data/custodian', help='Path to custodian files')
    parser.add_argument('--limit', type=int, default=50, help='Limit number of files to process')
    parser.add_argument('--country', type=str, help='Only process files for a specific country')
    args = parser.parse_args()

    custodian_dir = Path(args.path)
    if not custodian_dir.exists():
        print(f"Error: Directory {custodian_dir} does not exist")
        sys.exit(1)

    # Connect to GeoNames
    if not GEONAMES_DB.exists():
        print(f"Error: GeoNames database not found at {GEONAMES_DB}")
        sys.exit(1)

    conn = sqlite3.connect(GEONAMES_DB)
    dry_run = not args.apply
    # A dry run renames nothing, so it must not claim that it did.
    rename_label = 'Would rename' if dry_run else 'Renamed'

    try:
        print("=" * 70)
        print("WIKIDATA COORDINATES CITY RESOLUTION")
        print("=" * 70)
        print(f"Mode: {'DRY RUN' if dry_run else 'APPLYING CHANGES'}")
        print()

        # Find files with XXX city codes (sorted so --limit is reproducible)
        files_to_process = sorted(custodian_dir.glob('*-XXX-*.yaml'))
        print(f"Found {len(files_to_process)} files with XXX codes")

        # Filter and collect files with Wikidata IDs
        file_data = _collect_candidates(files_to_process, args.country, args.limit)
        print(f"Processing {len(file_data)} files with Wikidata IDs")
        print()

        resolved = 0
        renamed = 0

        for entry in file_data:
            filepath = entry['filepath']
            print(f"Processing {filepath.name}...")
            print(f" Wikidata: {entry['wikidata_id']}")

            success, new_path = process_file(filepath, conn, dry_run=dry_run)
            if success:
                resolved += 1
                if new_path:
                    renamed += 1
                    print(f" {rename_label}: {filepath.name} -> {new_path.name}")

            time.sleep(REQUEST_DELAY_SECONDS)  # Rate limiting
    finally:
        conn.close()

    print()
    print("=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Files processed: {len(file_data)}")
    print(f"Resolved: {resolved}")
    print(f"{rename_label}: {renamed}")

    if dry_run:
        print()
        print("This was a DRY RUN. Use --apply to make changes.")


if __name__ == '__main__':
    main()