glam/scripts/geocode_missing_from_geonames.py
kempersc 3a6ead8fde feat: Add legal form filtering rule for CustodianName
- Introduced LEGAL-FORM-FILTER rule to standardize CustodianName by removing legal form designations.
- Documented rationale, examples, and implementation guidelines for the filtering process.

docs: Create README for value standardization rules

- Established a comprehensive README outlining various value standardization rules applicable to Heritage Custodian classes.
- Categorized rules into Name Standardization, Geographic Standardization, Web Observation, and Schema Evolution.

feat: Implement transliteration standards for non-Latin scripts

- Added TRANSLIT-ISO rule to ensure GHCID abbreviations are generated from emic names using ISO standards for transliteration.
- Included detailed guidelines for various scripts and languages, along with implementation examples.

feat: Define XPath provenance rules for web observations

- Created XPATH-PROVENANCE rule mandating XPath pointers for claims extracted from web sources.
- Established a workflow for archiving websites and verifying claims against archived HTML.

chore: Update records lifecycle diagram

- Generated a new Mermaid diagram illustrating the records lifecycle for heritage custodians.
- Included phases for active records, inactive archives, and processed heritage collections with key relationships and classifications.
2025-12-09 16:58:41 +01:00

388 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Geocode Missing Coordinates from GeoNames Database
This script geocodes custodian files that are missing coordinates using the local
GeoNames database. It's much faster than API-based geocoding (no rate limits).
Features:
- Uses local GeoNames SQLite database for instant lookups
- Fuzzy matching for city names
- Updates files in-place preserving YAML structure
- Batch processing with progress tracking
- Safe updates (additive only, preserves existing data)
Usage:
python scripts/geocode_missing_from_geonames.py --dry-run
python scripts/geocode_missing_from_geonames.py --country JP --limit 100
python scripts/geocode_missing_from_geonames.py --all
"""
import argparse
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import unicodedata
from ruamel.yaml import YAML
# Setup ruamel.yaml for round-trip preservation:
# comments, key order, and quoting in custodian files survive load/dump.
yaml = YAML()
yaml.preserve_quotes = True
yaml.width = 120  # wrap long scalars at 120 columns on dump
# Configuration
# NOTE(review): absolute, machine-specific paths — presumably intentional for a
# local one-off migration script; confirm before running on another machine.
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
GEONAMES_DB = Path("/Users/kempersc/apps/glam/data/reference/geonames.db")
def normalize_city_name(name: Optional[str]) -> str:
    """Lower-case, accent-strip, and de-suffix a city name for DB matching."""
    if not name:
        return ""
    # Strip diacritics: decompose to NFD, then drop combining marks (Mn).
    decomposed = unicodedata.normalize('NFD', name)
    plain = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    result = plain.lower().strip()
    # Drop romanized Japanese administrative suffixes (市 shi, 区 ku, 町 machi,
    # 県 ken, 郡 gun, 村 son/mura, ...) so e.g. "Osaka-shi" matches "Osaka".
    # Only the first matching suffix is removed.
    for suffix in (' shi', '-shi', ' ku', '-ku', ' machi', '-machi', ' cho', '-cho',
                   ' ken', '-ken', ' gun', '-gun', ' son', '-son', ' mura', '-mura'):
        if result.endswith(suffix):
            result = result[:-len(suffix)]
            break
    return result
class GeoNamesLookup:
    """Fast city coordinate lookup from a local GeoNames SQLite database."""

    def __init__(self, db_path: Path):
        """Open a connection to the GeoNames SQLite database at *db_path*."""
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row

    def lookup_city(self, city: str, country_code: str,
                    region: Optional[str] = None) -> Optional[dict]:
        """
        Look up city coordinates in the GeoNames database.

        Tries three strategies, most to least precise, always preferring the
        most populous match within *country_code*:
          1. exact (case-insensitive) match on the normalized name
          2. exact match on the original, possibly non-ASCII name
          3. substring (LIKE) match on the normalized name

        Args:
            city: City name as found in the custodian record.
            country_code: ISO 3166-1 alpha-2 code (any case).
            region: Accepted for API compatibility; currently unused.

        Returns:
            Dict with latitude, longitude, geonames_id, etc., or None.
        """
        if not city or not country_code:
            return None
        city_norm = normalize_city_name(city)
        country_code = country_code.upper()
        # Shared query shell; only the name predicate varies per strategy.
        query = """
            SELECT geonames_id, name, ascii_name, latitude, longitude,
                   admin1_code, admin1_name, feature_code, population
            FROM cities
            WHERE country_code = ?
              AND {name_clause}
            ORDER BY population DESC
            LIMIT 1
        """
        # 1. Exact match on the normalized (lowercased, accent-stripped) name.
        if city_norm:
            row = self.conn.execute(
                query.format(name_clause="(LOWER(name) = ? OR LOWER(ascii_name) = ?)"),
                (country_code, city_norm, city_norm)).fetchone()
            if row:
                return self._row_to_dict(row)
        # 2. Exact match on the original name (covers non-ASCII scripts that
        #    accent-stripping reduces poorly).
        row = self.conn.execute(
            query.format(name_clause="(name = ? OR ascii_name = ?)"),
            (country_code, city, city)).fetchone()
        if row:
            return self._row_to_dict(row)
        # 3. Substring match. Bug fix: guarded against an empty normalized
        #    name, which would make the pattern "%%" — matching every city in
        #    the country and silently returning its most populous one.
        if city_norm:
            pattern = f"%{city_norm}%"
            row = self.conn.execute(
                query.format(name_clause="(LOWER(name) LIKE ? OR LOWER(ascii_name) LIKE ?)"),
                (country_code, pattern, pattern)).fetchone()
            if row:
                return self._row_to_dict(row)
        return None

    def _row_to_dict(self, row) -> dict:
        """Convert a sqlite3.Row from the cities table to a plain dict."""
        return {
            'geonames_id': row['geonames_id'],
            'geonames_name': row['name'],
            'latitude': row['latitude'],
            'longitude': row['longitude'],
            'admin1_code': row['admin1_code'],
            'admin1_name': row['admin1_name'],
            'feature_code': row['feature_code'],
            'population': row['population']
        }

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()
def extract_city_country(data: dict) -> tuple[Optional[str], Optional[str]]:
    """
    Extract a (city, country) pair from a custodian record.

    Sources are consulted in order, each only filling fields the previous
    ones left empty:
      1. the top-level ``location`` block
      2. ``ghcid.location_resolution`` (several alternative city keys)
      3. the first entry of ``original_entry.locations``
      4. the first two characters of the current GHCID (country only)

    Returns:
        (city, country); either element may be None when not found.
    """
    city = None
    country = None
    # 1. Top-level location block.
    loc = data.get('location', {})
    if loc:
        city = loc.get('city')
        country = loc.get('country')
    # 2. GHCID location resolution.
    if not city:
        ghcid_loc = data.get('ghcid', {}).get('location_resolution', {})
        if ghcid_loc:
            city = (ghcid_loc.get('city_name') or
                    ghcid_loc.get('city_label') or
                    ghcid_loc.get('geonames_name') or
                    ghcid_loc.get('google_maps_locality'))
            if not country:
                country = ghcid_loc.get('country_code')
    # 3. Original entry locations.
    if not city:
        orig_locs = data.get('original_entry', {}).get('locations', [])
        if orig_locs:
            city = orig_locs[0].get('city')
            # Bug fix: the original overwrote a country found in steps 1-2
            # with a possibly missing value here; only fill if still empty.
            if not country:
                country = orig_locs[0].get('country')
    # 4. Infer country from the GHCID prefix (e.g. "JP-..." -> "JP").
    if not country:
        ghcid = data.get('ghcid', {}).get('ghcid_current', '')
        if ghcid and len(ghcid) >= 2:
            country = ghcid[:2]
    return city, country
def geocode_file(filepath: Path, geonames: GeoNamesLookup, dry_run: bool = False) -> dict:
    """
    Geocode a single custodian YAML file using the local GeoNames database.

    Updates are additive: runs only when the file has no coordinates yet and
    never overwrites existing GeoNames reference fields.

    Args:
        filepath: Path to the custodian YAML file.
        geonames: Open GeoNamesLookup used for the city query.
        dry_run: When True, compute everything but do not write the file.

    Returns:
        Result dict with keys:
          - success: True unless the YAML was invalid or an exception occurred
          - geocoded: True if coordinates were found (written unless dry_run)
          - already_has_coords: True if the file was skipped as complete
          - city / country: values used for the lookup (may be None)
          - error: message string, or None
    """
    result = {
        'success': False,
        'geocoded': False,
        'already_has_coords': False,
        'city': None,
        'country': None,
        'error': None
    }
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.load(f)
        if not isinstance(data, dict):
            result['error'] = "Invalid YAML structure"
            return result
        # Skip files that already carry both coordinates.
        loc = data.get('location', {})
        if loc.get('latitude') is not None and loc.get('longitude') is not None:
            result['success'] = True
            result['already_has_coords'] = True
            return result
        city, country = extract_city_country(data)
        result['city'] = city
        result['country'] = country
        if not city or not country:
            result['error'] = f"Missing city ({city}) or country ({country})"
            result['success'] = True  # Not an error, just no data to geocode
            return result
        geo_result = geonames.lookup_city(city, country)
        if not geo_result:
            result['error'] = f"City not found in GeoNames: {city}, {country}"
            result['success'] = True  # Not a fatal error
            return result
        if 'location' not in data:
            data['location'] = {}
        # Single timestamp so provenance and normalization agree exactly
        # (the original called datetime.now() twice, yielding two values).
        now_iso = datetime.now(timezone.utc).isoformat()
        data['location']['latitude'] = geo_result['latitude']
        data['location']['longitude'] = geo_result['longitude']
        data['location']['coordinate_provenance'] = {
            'source_type': 'GEONAMES_LOCAL',
            'source_path': 'data/reference/geonames.db',
            'entity_id': geo_result['geonames_id'],
            'original_timestamp': now_iso
        }
        # Add GeoNames reference fields only where absent (additive updates).
        for key in ('geonames_id', 'geonames_name', 'feature_code'):
            if not data['location'].get(key):
                data['location'][key] = geo_result[key]
        data['location']['normalization_timestamp'] = now_iso
        if not dry_run:
            with open(filepath, 'w', encoding='utf-8') as f:
                yaml.dump(data, f)
        result['success'] = True
        result['geocoded'] = True
        return result
    except Exception as e:
        # Batch boundary: report the failure in the result rather than abort
        # the whole run.
        result['error'] = str(e)
        return result
def main():
    """CLI entry point: geocode custodian YAML files that lack coordinates.

    Returns:
        Process exit code: 0 on success, 1 if the GeoNames DB is missing.
    """
    parser = argparse.ArgumentParser(
        description="Geocode missing coordinates using GeoNames database"
    )
    parser.add_argument('--dry-run', action='store_true', help="Preview without writing")
    parser.add_argument('--country', type=str, help="Only process specific country code (e.g., JP)")
    parser.add_argument('--limit', type=int, default=0, help="Limit number of files to process")
    parser.add_argument('--all', action='store_true', help="Process all files (no limit)")
    parser.add_argument('--verbose', action='store_true', help="Show detailed output")
    args = parser.parse_args()
    if args.dry_run:
        print("DRY RUN - No files will be modified\n")
    # Initialize GeoNames lookup
    if not GEONAMES_DB.exists():
        print(f"Error: GeoNames database not found at {GEONAMES_DB}")
        return 1
    geonames = GeoNamesLookup(GEONAMES_DB)
    # Custodian filenames start with the ISO country code, so a country
    # filter is just a glob prefix.
    if args.country:
        pattern = f"{args.country.upper()}-*.yaml"
        files = sorted(CUSTODIAN_DIR.glob(pattern))
        print(f"Processing {args.country.upper()} files: {len(files)} found")
    else:
        files = sorted(CUSTODIAN_DIR.glob("*.yaml"))
        print(f"Processing all files: {len(files)} found")
    if args.limit and not args.all:
        files = files[:args.limit]
        print(f"Limited to first {args.limit} files")
    # Statistics
    stats = {
        'total': len(files),
        'geocoded': 0,
        'already_has_coords': 0,
        'no_city_data': 0,
        'not_found': 0,
        'errors': 0,
        'by_country': {}
    }
    errors = []      # first 20 hard errors, as (filename, message)
    not_found = []   # first 100 lookup misses, as (filename, city, country)
    for i, filepath in enumerate(files):
        result = geocode_file(filepath, geonames, dry_run=args.dry_run)
        # Per-country tallies keyed on the 2-letter filename prefix.
        country = filepath.name[:2]
        if country not in stats['by_country']:
            stats['by_country'][country] = {'geocoded': 0, 'not_found': 0}
        if result['geocoded']:
            stats['geocoded'] += 1
            stats['by_country'][country]['geocoded'] += 1
        elif result['already_has_coords']:
            stats['already_has_coords'] += 1
        elif result['error'] and 'Missing city' in result['error']:
            stats['no_city_data'] += 1
        elif result['error'] and 'not found in GeoNames' in result['error']:
            stats['not_found'] += 1
            stats['by_country'][country]['not_found'] += 1
            if len(not_found) < 100:
                not_found.append((filepath.name, result['city'], result['country']))
        elif result['error']:
            stats['errors'] += 1
            if len(errors) < 20:
                errors.append((filepath.name, result['error']))
        if args.verbose:
            status = "GEOCODED" if result['geocoded'] else "SKIP" if result['already_has_coords'] else "FAIL"
            print(f"[{i+1}/{len(files)}] {filepath.name}: {status}")
        elif (i + 1) % 1000 == 0:
            print(f"Processed {i+1}/{len(files)} files... (geocoded: {stats['geocoded']})")
    # Print summary
    print("\n" + "=" * 60)
    print("GEOCODING SUMMARY")
    print("=" * 60)
    print(f"Total files processed: {stats['total']}")
    print(f"Already had coordinates: {stats['already_has_coords']}")
    print(f"Successfully geocoded: {stats['geocoded']}")
    print(f"No city data available: {stats['no_city_data']}")
    print(f"City not found in GeoNames: {stats['not_found']}")
    print(f"Errors: {stats['errors']}")
    if stats['by_country']:
        print("\nResults by country:")
        for country, data in sorted(stats['by_country'].items(), key=lambda x: -x[1]['geocoded']):
            if data['geocoded'] > 0 or data['not_found'] > 0:
                print(f"  {country}: geocoded={data['geocoded']}, not_found={data['not_found']}")
    if not_found:
        # Bug fix: the original printed the literal "(unknown)" instead of the
        # unpacked filename, and announced len(not_found) while showing 20.
        shown = not_found[:20]
        print(f"\nFirst {len(shown)} cities not found:")
        for filename, city, country in shown:
            print(f"  {filename}: {city}, {country}")
    if errors:
        # Bug fix: same "(unknown)" placeholder replaced with the filename.
        print(f"\nFirst {len(errors)} errors:")
        for filename, error in errors:
            print(f"  {filename}: {error}")
    if args.dry_run:
        print("\n(DRY RUN - No files were modified)")
    geonames.close()
    return 0
if __name__ == "__main__":
    # raise SystemExit rather than the site-module exit() helper, which is
    # not guaranteed to exist (e.g. under `python -S` or frozen builds).
    raise SystemExit(main())