- JP: Handle Gun/Cho/Machi/Mura compound city names (2615 files)
- CZ: Map city codes to GeoNames entries (667 files)
- Overall coverage: 84.5% → 96.4%
(505 lines, 18 KiB, Python)
#!/usr/bin/env python3
"""
Geocode Japanese Compound City Names from GeoNames Database

This script handles Japanese compound city names that weren't matched by the
standard geocoding approach. Japanese locations often use compound names like:

- "Aichi Gun Togo Cho" = Aichi District, Togo Town → search for "Togo"
- "Nagoya Shi Chikusa Ku" = Nagoya City, Chikusa Ward → search for "Nagoya"
- "Kamikita Gun Rokkasho Mura" = Kamikita District, Rokkasho Village → search for "Rokkasho"

Japanese Administrative Divisions:
- 県 (Ken) = Prefecture
- 市 (Shi) = City
- 区 (Ku) = Ward (within cities)
- 郡 (Gun) = County/District
- 町 (Cho/Machi) = Town
- 村 (Mura/Son) = Village

Strategy:
1. Parse compound city name to extract settlement name
2. For "X Gun Y Cho/Mura" → search for Y
3. For "X Shi Y Ku" → search for X (the main city)
4. Use GeoNames local database for fast lookups

Usage:
    python scripts/geocode_jp_compound_cities.py --dry-run
    python scripts/geocode_jp_compound_cities.py --limit 100
    python scripts/geocode_jp_compound_cities.py --all
"""

import argparse
import re  # NOTE(review): `re` appears unused in this file — confirm before removing
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from ruamel.yaml import YAML

# Setup ruamel.yaml for round-trip preservation: files are rewritten in place,
# so comments, key order, and quoting must survive the load/dump cycle.
yaml = YAML()
yaml.preserve_quotes = True  # keep the original quoting style on rewrite
yaml.width = 120  # avoid re-wrapping long scalar lines

# Configuration — absolute paths to the custodian YAML corpus and the local
# GeoNames SQLite extract.
# NOTE(review): machine-specific paths; consider making these configurable.
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
GEONAMES_DB = Path("/Users/kempersc/apps/glam/data/reference/geonames.db")
|
|
def _gun_settlement_candidates(parts: list[str], marker: str, suffixes: list[str]) -> list[str]:
    """Extract the settlement from an "X Gun Y <marker>" compound name.

    For a token list containing 'Gun' followed (later) by *marker*, the tokens
    between them are the settlement name Y. Returns candidate spellings in
    preference order: Y, Y joined with each suffix, then lowercase Y.
    Returns [] when the pattern does not apply.
    """
    if 'Gun' not in parts or marker not in parts:
        return []
    gun_idx = parts.index('Gun')
    marker_idx = parts.index(marker)
    # The district token must precede the settlement-type token.
    if gun_idx >= marker_idx:
        return []
    name = ' '.join(parts[gun_idx + 1:marker_idx])
    if not name:
        return []
    return [name, *[f"{name}{suffix}" for suffix in suffixes], name.lower()]


def parse_japanese_compound_city(city_name: str) -> list[str]:
    """
    Parse Japanese compound city name and return list of candidate search terms.

    Examples:
        "Aichi Gun Togo Cho" → ["Togo", "Togo-cho", "Togocho"]
        "Nagoya Shi Chikusa Ku" → ["Nagoya", "Chikusa", "Chikusa-ku"]
        "Kamikita Gun Rokkasho Mura" → ["Rokkasho", "Rokkasho-mura", "Rokkashomura"]
        "Kitanagoyashi" → ["Kitanagoya", "Kitanagoyashi"]

    Returns list of candidate names in order of preference, deduplicated
    case-insensitively (first spelling wins).
    """
    if not city_name:
        return []

    candidates: list[str] = []

    # Normalize: trim and split into whitespace-separated tokens.
    city = city_name.strip()
    parts = city.split()

    # Pattern 1: "X Gun Y Cho" (District + Town) → search for Y.
    candidates += _gun_settlement_candidates(parts, 'Cho', ['-cho', 'cho'])

    # Pattern 2: "X Gun Y Mura" (District + Village) → search for Y.
    candidates += _gun_settlement_candidates(parts, 'Mura', ['-mura', 'mura'])

    # Pattern 2b: "X Gun Y Machi" (Town, alternate romanization). Also try
    # the Cho spellings, since GeoNames uses both readings of 町.
    candidates += _gun_settlement_candidates(parts, 'Machi', ['-machi', 'machi', '-cho', 'cho'])

    # Pattern 3: "X Shi Y Ku" (City + Ward) → prefer the main city X,
    # with the ward Y as a fallback.
    if 'Shi' in parts and 'Ku' in parts:
        shi_idx = parts.index('Shi')
        city_part = ' '.join(parts[:shi_idx])
        if city_part:
            candidates += [city_part, city_part.lower()]
        ku_idx = parts.index('Ku')
        ward = ' '.join(parts[shi_idx + 1:ku_idx])
        if ward:
            candidates += [ward, f"{ward}-ku"]

    # Pattern 4: "Xshi" — concatenated city name (e.g. "Kitanagoyashi").
    if city.lower().endswith('shi') and ' ' not in city:
        base = city[:-3]  # strip the trailing "shi"
        if base:
            candidates += [base, base.lower(), city]  # also try the full name

    # Pattern 5: "X Shi" without a ward.
    if 'Shi' in parts and 'Ku' not in parts:
        shi_idx = parts.index('Shi')
        city_part = ' '.join(parts[:shi_idx])
        if city_part:
            candidates += [city_part, city_part.lower()]

    # Pattern 6: nothing matched — fall back to the raw name as-is.
    if not candidates:
        candidates.append(city)

    # Deduplicate case-insensitively while preserving order.
    seen: set[str] = set()
    unique: list[str] = []
    for candidate in candidates:
        key = candidate.lower()
        if candidate and key not in seen:
            seen.add(key)
            unique.append(candidate)

    return unique
|
|
|
|
|
|
class GeoNamesLookup:
    """Fast city coordinate lookup from a local GeoNames SQLite database."""

    # Feature codes for proper settlements — tried first.
    PREFERRED_FEATURES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
    # Fallback additionally allows PPLX (sections of populated places), which
    # GeoNames uses for many Japanese administrative units.
    FALLBACK_FEATURES = PREFERRED_FEATURES + ('PPLX',)

    def __init__(self, db_path: Path):
        """Open the database; sqlite3.Row gives column access by name."""
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row

    def _find(self, country_code: str, name: str, features: tuple, exact: bool):
        """Run one lookup query and return the best row, or None.

        Args:
            country_code: ISO country code filter.
            name: Candidate name; when exact is False it is used as a
                LIKE pattern (caller supplies the trailing '%').
            features: Allowed GeoNames feature codes.
            exact: Compare with '=' instead of LIKE.

        Returns:
            The most populous matching sqlite3.Row, or None.
        """
        op = '=' if exact else 'LIKE'
        # Bind feature codes as parameters instead of interpolating a
        # tuple literal into the SQL text.
        placeholders = ', '.join('?' * len(features))
        cursor = self.conn.execute(f"""
            SELECT geonames_id, name, ascii_name, latitude, longitude,
                   admin1_code, admin1_name, feature_code, population
            FROM cities
            WHERE country_code = ?
              AND (LOWER(name) {op} LOWER(?) OR LOWER(ascii_name) {op} LOWER(?))
              AND feature_code IN ({placeholders})
            ORDER BY population DESC
            LIMIT 1
        """, (country_code, name, name, *features))
        return cursor.fetchone()

    def lookup_city(self, candidates: list[str], country_code: str = "JP") -> Optional[dict]:
        """
        Look up city coordinates trying multiple candidate names.

        Search order: per candidate, exact match on preferred settlement
        features, then prefix (LIKE 'name%') match; if no candidate hits,
        a second exact-match pass also allows PPLX.

        Args:
            candidates: Potential city names, in order of preference
            country_code: Country code (default JP)

        Returns:
            Dict with coordinates and match metadata, or None if not found
        """
        for candidate in candidates:
            if not candidate:
                continue

            # Exact match against proper settlements.
            row = self._find(country_code, candidate, self.PREFERRED_FEATURES, exact=True)
            if row:
                return self._row_to_dict(row, candidate)

            # Prefix match (handles suffixed variants in the database).
            row = self._find(country_code, f"{candidate}%", self.PREFERRED_FEATURES, exact=False)
            if row:
                return self._row_to_dict(row, candidate)

        # Fallback: try again with PPLX included (Japanese administrative units).
        for candidate in candidates:
            if not candidate:
                continue

            row = self._find(country_code, candidate, self.FALLBACK_FEATURES, exact=True)
            if row:
                return self._row_to_dict(row, candidate)

        return None

    def _row_to_dict(self, row, matched_candidate: str) -> dict:
        """Convert a database row to a dict, recording which candidate hit."""
        return {
            'geonames_id': row['geonames_id'],
            'geonames_name': row['name'],
            'ascii_name': row['ascii_name'],
            'latitude': row['latitude'],
            'longitude': row['longitude'],
            'admin1_code': row['admin1_code'],
            'admin1_name': row['admin1_name'],
            'feature_code': row['feature_code'],
            'population': row['population'],
            'matched_candidate': matched_candidate
        }

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()
|
|
|
|
|
|
def extract_city(data: dict) -> Optional[str]:
    """Extract a city name from custodian data.

    Prefers ``location.city``; falls back to the first entry of
    ``original_entry.locations``. Tolerates explicit nulls for the
    intermediate mappings (YAML files may contain ``location:`` with no
    value, which loads as None and would otherwise raise AttributeError).

    Returns:
        The city string, or None when no city is recorded.
    """
    # Try location block (``or {}`` guards against an explicit null).
    loc = data.get('location') or {}
    city = loc.get('city')
    if city:
        return city

    # Try original_entry.locations
    orig = data.get('original_entry') or {}
    orig_locs = orig.get('locations') or []
    if orig_locs:
        return (orig_locs[0] or {}).get('city')

    return None
|
|
|
|
|
|
def has_coordinates(data: dict) -> bool:
    """Check if the file already has both a latitude and a longitude.

    An explicit null ``location:`` block (loads as None) counts as having
    no coordinates rather than raising AttributeError.
    """
    loc = data.get('location') or {}
    return loc.get('latitude') is not None and loc.get('longitude') is not None
|
|
|
|
|
|
def geocode_file(filepath: Path, geonames: GeoNamesLookup, dry_run: bool = False, verbose: bool = False) -> dict:
    """
    Geocode a single Japanese custodian YAML file, rewriting it in place.

    Loads the YAML, skips files that already carry coordinates, parses the
    compound city name into candidate search terms, looks them up in
    GeoNames, and (unless dry_run) writes the coordinates plus a provenance
    block back to the file.

    Args:
        filepath: Custodian YAML file to process.
        geonames: Open GeoNamesLookup helper.
        dry_run: When True, compute everything but do not write the file.
        verbose: When True, print the city and its candidate terms.

    Returns:
        Result dict with keys: 'success' (the file was handled without an
        unexpected exception — True even when no match was found; check
        'error' to distinguish), 'geocoded', 'already_has_coords', 'city',
        'candidates', 'matched_candidate', and 'error' (message or None).
    """
    result = {
        'success': False,
        'geocoded': False,
        'already_has_coords': False,
        'city': None,
        'candidates': [],
        'matched_candidate': None,
        'error': None
    }

    try:
        # Round-trip load so a later dump preserves formatting/comments.
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.load(f)

        if not isinstance(data, dict):
            result['error'] = "Invalid YAML structure"
            return result

        # Check if already has coordinates — nothing to do.
        if has_coordinates(data):
            result['success'] = True
            result['already_has_coords'] = True
            return result

        # Extract city name
        city = extract_city(data)
        result['city'] = city

        if not city:
            result['error'] = "No city found"
            result['success'] = True
            return result

        # Parse compound city name into candidate search terms.
        candidates = parse_japanese_compound_city(city)
        result['candidates'] = candidates

        if verbose:
            print(f"  City: {city}")
            print(f"  Candidates: {candidates}")

        if not candidates:
            result['error'] = f"Could not parse city name: {city}"
            result['success'] = True
            return result

        # Look up in GeoNames
        geo_result = geonames.lookup_city(candidates, "JP")

        if not geo_result:
            result['error'] = f"No match for: {candidates}"
            result['success'] = True
            return result

        result['matched_candidate'] = geo_result['matched_candidate']

        # Update location block with coordinates.
        if 'location' not in data:
            data['location'] = {}

        data['location']['latitude'] = geo_result['latitude']
        data['location']['longitude'] = geo_result['longitude']
        # Provenance records how the coordinates were derived so they can
        # be audited or re-derived later.
        data['location']['coordinate_provenance'] = {
            'source_type': 'GEONAMES_JP_COMPOUND',
            'source_path': 'data/reference/geonames.db',
            'entity_id': geo_result['geonames_id'],
            'original_query': city,
            'matched_candidate': geo_result['matched_candidate'],
            'original_timestamp': datetime.now(timezone.utc).isoformat()
        }

        # Add geonames reference fields only where absent, so any existing
        # references in the file win over this lookup.
        if not data['location'].get('geonames_id'):
            data['location']['geonames_id'] = geo_result['geonames_id']
        if not data['location'].get('geonames_name'):
            data['location']['geonames_name'] = geo_result['geonames_name']
        if not data['location'].get('feature_code'):
            data['location']['feature_code'] = geo_result['feature_code']

        data['location']['normalization_timestamp'] = datetime.now(timezone.utc).isoformat()

        if not dry_run:
            with open(filepath, 'w', encoding='utf-8') as f:
                yaml.dump(data, f)

        result['success'] = True
        result['geocoded'] = True
        return result

    except Exception as e:
        # Deliberate catch-all per file: one bad file must not abort the
        # batch; the error is surfaced in the result dict.
        result['error'] = str(e)
        return result
|
|
|
|
|
|
def main():
    """Drive the batch geocoding run: parse CLI args, collect candidate
    files, geocode each, and print a summary. Returns a process exit code
    (0 on success, 1 when the GeoNames database is missing)."""
    parser = argparse.ArgumentParser(
        description="Geocode Japanese compound city names using GeoNames database"
    )
    parser.add_argument('--dry-run', action='store_true', help="Preview without writing")
    parser.add_argument('--limit', type=int, default=0, help="Limit number of files to process")
    parser.add_argument('--all', action='store_true', help="Process all files (no limit)")
    parser.add_argument('--verbose', action='store_true', help="Show detailed output")
    parser.add_argument('--file-list', type=str, help="File containing list of files to process (one per line)")

    args = parser.parse_args()

    if args.dry_run:
        print("DRY RUN - No files will be modified\n")

    # Initialize GeoNames lookup.
    if not GEONAMES_DB.exists():
        print(f"Error: GeoNames database not found at {GEONAMES_DB}")
        return 1

    geonames = GeoNamesLookup(GEONAMES_DB)

    # Get list of files to process.
    if args.file_list:
        # Read from provided file list (paths relative to CUSTODIAN_DIR).
        with open(args.file_list, 'r') as f:
            files_to_process = [CUSTODIAN_DIR / line.strip() for line in f if line.strip()]
        print(f"Loaded {len(files_to_process)} files from {args.file_list}")
    else:
        # Scan directory (slow): load every JP file to see whether it
        # already has coordinates.
        print("Scanning for JP files missing coordinates...")
        all_jp_files = sorted(CUSTODIAN_DIR.glob("JP-*.yaml"))
        print(f"Total JP files: {len(all_jp_files)}")

        # Filter to only files missing coordinates.
        files_to_process = []
        for fp in all_jp_files:
            try:
                with open(fp, 'r', encoding='utf-8') as f:
                    data = yaml.load(f)
                if isinstance(data, dict) and not has_coordinates(data):
                    files_to_process.append(fp)
            except Exception:
                pass  # Skip files that can't be read

        print(f"Files to process: {len(files_to_process)}")

    if args.limit and not args.all:
        files_to_process = files_to_process[:args.limit]
        print(f"Limited to first {args.limit} files")

    # Statistics
    stats = {
        'total': len(files_to_process),
        'geocoded': 0,
        'not_found': 0,
        'no_city': 0,
        'errors': 0,
        'by_pattern': {}
    }

    not_found_samples = []

    for i, filepath in enumerate(files_to_process):
        result = geocode_file(filepath, geonames, dry_run=args.dry_run, verbose=args.verbose)

        if result['geocoded']:
            stats['geocoded'] += 1
            # Track which candidate matched.
            matched = result.get('matched_candidate', 'unknown')
            stats['by_pattern'][matched] = stats['by_pattern'].get(matched, 0) + 1
            if args.verbose:
                print(f"✅ {filepath.name}: {result['city']} → {matched}")
        elif result.get('error') and 'No match' in result['error']:
            stats['not_found'] += 1
            if len(not_found_samples) < 50:
                not_found_samples.append((filepath.name, result['city'], result['candidates']))
        elif result.get('error') and 'No city' in result['error']:
            stats['no_city'] += 1
        elif result.get('error'):
            stats['errors'] += 1
            if args.verbose:
                print(f"❌ {filepath.name}: {result['error']}")

        # Progress heartbeat for long non-verbose runs.
        if not args.verbose and (i + 1) % 500 == 0:
            print(f"Processed {i+1}/{len(files_to_process)} files... (geocoded: {stats['geocoded']})")

    # Print summary
    print("\n" + "=" * 70)
    print("JAPANESE COMPOUND CITY GEOCODING SUMMARY")
    print("=" * 70)
    print(f"Total files processed: {stats['total']}")
    print(f"Successfully geocoded: {stats['geocoded']}")
    print(f"City not found: {stats['not_found']}")
    print(f"No city in file: {stats['no_city']}")
    print(f"Errors: {stats['errors']}")

    if stats['geocoded'] > 0:
        print(f"\nSuccess rate: {stats['geocoded']/stats['total']*100:.1f}%")

    if not_found_samples:
        print(f"\nSample cities not found ({len(not_found_samples)} shown):")
        # Fix: print the actual filename that was unpacked; the original
        # printed the literal "(unknown)" and never used `filename`.
        for filename, city, candidates in not_found_samples[:20]:
            print(f"  {filename}: {city} → tried {candidates[:3]}")

    if args.dry_run:
        print("\n(DRY RUN - No files were modified)")

    geonames.close()
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Raise SystemExit rather than calling the interactive `exit()` helper
    # (which the site module provides and may be absent under `python -S`);
    # main()'s return value becomes the process exit status.
    raise SystemExit(main())