glam/scripts/retry_japanese_geocoding.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

446 lines
16 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Retry Failed Japanese Geocoding with Enhanced Strategies
This script specifically targets the 1,481 failed Japanese institution geocoding
attempts with improved query strategies:
1. Hierarchical fallback: Try progressively broader queries
- Full address → City + Prefecture → Prefecture only
2. Alternative formats: Try different romanization/formatting
3. Prefecture-level geocoding: For rural/small towns not in database
4. Postal code lookup: Use postal codes as additional signal
Japanese Administrative Divisions:
- 都 (To) = Metropolis (Tokyo)
- 道 (Do) = Circuit (Hokkaido)
- 府 (Fu) = Urban prefecture (Osaka, Kyoto)
- 県 (Ken) = Prefecture
- 市 (Shi) = City
- 区 (Ku) = Ward (within cities)
- 郡 (Gun) = County/District
- 町 (Cho/Machi) = Town
- 村 (Mura/Son) = Village
Address Format Issues:
- ISIL registry uses all-caps romanization
- Nominatim works better with proper case
- "GUN" + "CHO" indicates county-level town (often not in Nominatim)
- Prefecture-level fallback is more reliable for rural areas
Usage:
python scripts/retry_japanese_geocoding.py [--dry-run] [--limit N]
"""
import argparse
import sqlite3
import time
import yaml
import requests
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple
import re
class JapaneseGeocodingRetry:
    """Enhanced geocoding for failed Japanese institutions.

    Wraps a SQLite query cache and a Nominatim HTTP session, and retries
    previously failed Japanese locations with a sequence of progressively
    broader query strategies (see build_query_strategies).
    """

    # Trailing administrative suffixes: 県 KEN, 都 TO, 府 FU, 道 DO.
    # Anchored at end-of-string so only a final suffix word is stripped
    # (a plain str.replace would also clobber matching substrings in the
    # middle of multi-word region names).
    _PREFECTURE_SUFFIX_RE = re.compile(r'\s+(?:KEN|TO|FU|DO)$')

    def __init__(self, cache_file: Path, data_file: Path, dry_run: bool = False):
        """
        Args:
            cache_file: SQLite database with a `geocoding_cache` table.
            data_file: YAML dataset of institutions to update in place.
            dry_run: When True, no API calls are made and the dataset is
                not written back.
        """
        self.cache_file = cache_file
        self.data_file = data_file
        self.dry_run = dry_run
        self.cache_conn = sqlite3.connect(cache_file)
        self.session = requests.Session()
        # Nominatim's usage policy requires an identifying User-Agent.
        self.session.headers.update({'User-Agent': 'GLAM-Data-Extractor/1.0'})
        # Statistics
        self.stats = {
            'total_failed': 0,
            'retry_attempted': 0,
            'newly_geocoded': 0,
            'still_failed': 0,
            'cache_hits': 0,
            'api_calls': 0,
            # Per-strategy success counters; keys match the strategy names
            # produced by build_query_strategies. Only non-zero counters
            # are printed, so unused entries (e.g. 'full_address') are
            # harmless placeholders.
            'by_strategy': {
                'full_address': 0,
                'city_prefecture': 0,
                'prefecture_only': 0,
                'postal_code': 0,
                'proper_case': 0
            }
        }

    def normalize_japanese_city(self, city: str) -> str:
        """
        Normalize Japanese city names for better Nominatim matching.

        Examples:
            - "SAPPORO SHI KITA KU" → "Sapporo"
            - "SHIRAOI GUN SHIRAOI CHO" → "Shiraoi"
            - "KAMIKITA GUN ROKKASHO MURA" → "Rokkasho"
        """
        # Extract main city/town name (before SHI, GUN, KU)
        parts = city.split()
        # Pattern 1: "CITY SHI WARD KU" → "City"
        if 'SHI' in parts and 'KU' in parts:
            shi_idx = parts.index('SHI')
            return ' '.join(parts[:shi_idx]).title()
        # Pattern 2: "COUNTY GUN TOWN CHO" → "Town"
        if 'GUN' in parts and 'CHO' in parts:
            gun_idx = parts.index('GUN')
            cho_idx = parts.index('CHO')
            # Town name is between GUN and CHO
            return ' '.join(parts[gun_idx+1:cho_idx]).title()
        # Pattern 3: "COUNTY GUN VILLAGE MURA" → "Village"
        if 'GUN' in parts and 'MURA' in parts:
            gun_idx = parts.index('GUN')
            mura_idx = parts.index('MURA')
            return ' '.join(parts[gun_idx+1:mura_idx]).title()
        # Pattern 4: Just city name
        if 'SHI' in parts:
            shi_idx = parts.index('SHI')
            return ' '.join(parts[:shi_idx]).title()
        # Default: return as-is in title case
        return city.title()

    def normalize_japanese_prefecture(self, region: str) -> str:
        """
        Normalize Japanese prefecture names.

        Examples:
            - "HOKKAIDO" → "Hokkaido"
            - "TOKYO TO" → "Tokyo"
            - "AOMORI KEN" → "Aomori"
        """
        # Strip the administrative suffix only when it is the trailing word.
        region = self._PREFECTURE_SUFFIX_RE.sub('', region)
        return region.title()

    def build_query_strategies(self, location: Dict) -> List[Tuple[str, str]]:
        """
        Build multiple query strategies for a failed location.

        Returns list of (query_string, strategy_name) tuples in order of
        preference; retry_institution tries them until one succeeds.
        """
        strategies = []
        city = location.get('city', '')
        region = location.get('region', '')
        postal = location.get('postal_code', '')
        # Normalize names
        city_normalized = self.normalize_japanese_city(city)
        prefecture_normalized = self.normalize_japanese_prefecture(region)
        # Strategy 1: City + Prefecture (proper case)
        if city_normalized and prefecture_normalized:
            query = f"{city_normalized}, {prefecture_normalized}, Japan"
            strategies.append((query, 'proper_case'))
        # Strategy 2: Prefecture only (most reliable for rural areas)
        if prefecture_normalized:
            query = f"{prefecture_normalized}, Japan"
            strategies.append((query, 'prefecture_only'))
        # Strategy 3: Postal code + Prefecture (if available)
        if postal and prefecture_normalized:
            query = f"{postal}, {prefecture_normalized}, Japan"
            strategies.append((query, 'postal_code'))
        # Strategy 4: Original city + prefecture (all caps, last resort)
        if city and region:
            query = f"{city}, {region}, Japan"
            strategies.append((query, 'city_prefecture'))
        return strategies

    def geocode_with_nominatim(self, query: str) -> Optional[Dict]:
        """Query Nominatim API with rate limiting.

        Returns a geo dict on success, {} for a cached failure, or None
        when nothing was found (or in dry-run mode).
        """
        # Check cache first
        cached = self.get_from_cache(query)
        if cached is not None:
            self.stats['cache_hits'] += 1
            return cached
        if self.dry_run:
            print(f" [DRY RUN] Would query: {query}")
            return None
        # Rate limiting: 1 request per second (Nominatim usage policy)
        time.sleep(1.0)
        try:
            response = self.session.get(
                'https://nominatim.openstreetmap.org/search',
                params={
                    'q': query,
                    'format': 'json',
                    'limit': 1,
                    'addressdetails': 1,
                    'extratags': 1
                },
                timeout=10
            )
            response.raise_for_status()
            self.stats['api_calls'] += 1
            results = response.json()
            if results:
                result = results[0]
                geo_data = {
                    'latitude': float(result['lat']),
                    'longitude': float(result['lon']),
                    'display_name': result.get('display_name'),
                    'geonames_id': None
                }
                # Try to extract GeoNames ID from extratags
                if 'extratags' in result and isinstance(result['extratags'], dict):
                    geonames_id = result['extratags'].get('geonames_id')
                    if geonames_id:
                        geo_data['geonames_id'] = int(geonames_id)
                # Cache success
                self.cache_result(query, geo_data)
                return geo_data
            else:
                # Cache failure
                self.cache_result(query, None)
                return None
        except Exception as e:
            print(f" ❌ API error: {e}")
            # Cache failure to avoid retrying
            self.cache_result(query, None)
            return None

    def get_from_cache(self, query: str) -> Optional[Dict]:
        """Retrieve from cache.

        Tri-state return: dict with coordinates for a cached success,
        {} for a cached failure ("tried and failed"), None when the
        query has never been cached.
        """
        cursor = self.cache_conn.execute(
            "SELECT latitude, longitude, geonames_id, display_name, success FROM geocoding_cache WHERE query = ?",
            (query,)
        )
        row = cursor.fetchone()
        if row:
            if row[4]:  # success = 1
                return {
                    'latitude': row[0],
                    'longitude': row[1],
                    'geonames_id': row[2],
                    'display_name': row[3]
                }
            else:
                # Cached failure (return empty dict to signal "tried and failed")
                return {}
        return None  # Not in cache at all

    def cache_result(self, query: str, result: Optional[Dict]):
        """Store result in cache (success row when result is truthy,
        failure row otherwise)."""
        if result:
            self.cache_conn.execute("""
                INSERT OR REPLACE INTO geocoding_cache
                (query, latitude, longitude, geonames_id, display_name, timestamp, success)
                VALUES (?, ?, ?, ?, ?, ?, 1)
            """, (
                query,
                result.get('latitude'),
                result.get('longitude'),
                result.get('geonames_id'),
                result.get('display_name'),
                datetime.now(timezone.utc).isoformat()
            ))
        else:
            self.cache_conn.execute("""
                INSERT OR REPLACE INTO geocoding_cache
                (query, latitude, longitude, geonames_id, display_name, timestamp, success)
                VALUES (?, NULL, NULL, NULL, NULL, ?, 0)
            """, (query, datetime.now(timezone.utc).isoformat()))
        self.cache_conn.commit()

    def retry_institution(self, institution: Dict) -> bool:
        """
        Retry geocoding for a single institution.

        Mutates the institution's first location in place on success.
        Returns True if newly geocoded, False otherwise.
        """
        if not institution.get('locations'):
            return False
        location = institution['locations'][0]
        # Skip if already geocoded
        if location.get('latitude') is not None:
            return False
        # Skip non-Japanese
        if location.get('country') != 'JP':
            return False
        self.stats['retry_attempted'] += 1
        name = institution.get('name', 'Unknown')
        print(f"\n[{self.stats['retry_attempted']}/{self.stats['total_failed']}] {name}")
        print(f" Original: {location.get('city')}, {location.get('region')}")
        # Try multiple strategies
        strategies = self.build_query_strategies(location)
        for query, strategy_name in strategies:
            print(f" Trying ({strategy_name}): {query}")
            result = self.geocode_with_nominatim(query)
            if result and result.get('latitude'):
                # Success!
                location['latitude'] = result['latitude']
                location['longitude'] = result['longitude']
                if result.get('geonames_id'):
                    location['geonames_id'] = result['geonames_id']
                self.stats['newly_geocoded'] += 1
                self.stats['by_strategy'][strategy_name] += 1
                print(f" ✅ Geocoded via {strategy_name}: {result['latitude']:.4f}, {result['longitude']:.4f}")
                return True
        # All strategies failed
        print(f" ❌ All strategies failed")
        self.stats['still_failed'] += 1
        return False

    def run(self, limit: Optional[int] = None):
        """Run retry process on all failed Japanese institutions.

        Args:
            limit: If given, only the first `limit` failed institutions
                are retried (useful for testing).
        """
        print("=" * 80)
        print("JAPANESE GEOCODING RETRY")
        print("=" * 80)
        print()
        # Load dataset
        print(f"Loading dataset from {self.data_file}...")
        with open(self.data_file, 'r') as f:
            institutions = yaml.safe_load(f)
        print(f"Loaded {len(institutions)} institutions")
        print()
        # Find failed Japanese geocoding
        failed_japanese = []
        for inst in institutions:
            if inst.get('locations'):
                loc = inst['locations'][0]
                if loc.get('country') == 'JP' and loc.get('latitude') is None:
                    failed_japanese.append(inst)
        self.stats['total_failed'] = len(failed_japanese)
        print(f"Found {self.stats['total_failed']} failed Japanese geocoding attempts")
        print()
        if self.dry_run:
            print("🧪 DRY RUN MODE - No changes will be made")
            print()
        # Apply limit if specified (`is not None` so an explicit 0 limits
        # to zero instead of being silently ignored)
        if limit is not None:
            failed_japanese = failed_japanese[:limit]
            print(f"Limiting to first {limit} institutions for testing")
            print()
        # Retry each failed institution
        start_time = time.time()
        for inst in failed_japanese:
            self.retry_institution(inst)
            # Progress indicator every 50 institutions (non-zero guard
            # avoids a division by zero when nothing was attempted yet)
            if self.stats['retry_attempted'] and self.stats['retry_attempted'] % 50 == 0:
                success_rate = (self.stats['newly_geocoded'] / self.stats['retry_attempted'] * 100)
                print(f"\n📊 Progress: {self.stats['retry_attempted']}/{self.stats['total_failed']} | "
                      f"Newly geocoded: {self.stats['newly_geocoded']} ({success_rate:.1f}%)")
        # Save updated dataset
        if not self.dry_run and self.stats['newly_geocoded'] > 0:
            print(f"\n💾 Saving {self.stats['newly_geocoded']} newly geocoded institutions...")
            with open(self.data_file, 'w') as f:
                yaml.dump(institutions, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
            print(f"✅ Saved to {self.data_file}")
        # Print final statistics
        elapsed = time.time() - start_time
        print()
        print("=" * 80)
        print("RETRY STATISTICS")
        print("=" * 80)
        print(f"Total failed institutions: {self.stats['total_failed']}")
        print(f"Retry attempted: {self.stats['retry_attempted']}")
        print(f"Newly geocoded: {self.stats['newly_geocoded']}")
        print(f"Still failed: {self.stats['still_failed']}")
        print()
        print(f"Cache hits: {self.stats['cache_hits']}")
        print(f"API calls: {self.stats['api_calls']}")
        print()
        print("Success by strategy:")
        for strategy, count in self.stats['by_strategy'].items():
            if count > 0:
                pct = (count / self.stats['newly_geocoded'] * 100) if self.stats['newly_geocoded'] > 0 else 0
                print(f" {strategy:20s} {count:4d} ({pct:.1f}%)")
        print()
        print(f"Total execution time: {elapsed / 60:.1f} minutes")
        if self.stats['newly_geocoded'] > 0:
            avg_rate = self.stats['api_calls'] / elapsed if elapsed > 0 else 0
            print(f"Average API call rate: {avg_rate:.2f} requests/second")
        print("=" * 80)
        # Calculate new overall coverage
        if not self.dry_run and self.stats['newly_geocoded'] > 0:
            total_jp = sum(1 for inst in institutions if inst.get('locations') and inst['locations'][0].get('country') == 'JP')
            geocoded_jp = sum(1 for inst in institutions
                              if inst.get('locations')
                              and inst['locations'][0].get('country') == 'JP'
                              and inst['locations'][0].get('latitude') is not None)
            print()
            print("UPDATED JAPANESE COVERAGE:")
            print(f" Total Japanese institutions: {total_jp}")
            print(f" Successfully geocoded: {geocoded_jp} ({geocoded_jp/total_jp*100:.1f}%)")
            print(f" Still failed: {total_jp - geocoded_jp} ({(total_jp - geocoded_jp)/total_jp*100:.1f}%)")
            print("=" * 80)
def main():
    """CLI entry point: parse arguments, resolve data paths, run retry."""
    arg_parser = argparse.ArgumentParser(
        description='Retry failed Japanese geocoding with enhanced strategies'
    )
    arg_parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes'
    )
    arg_parser.add_argument(
        '--limit',
        type=int,
        help='Limit retry to first N failed institutions (for testing)'
    )
    options = arg_parser.parse_args()

    # Resolve dataset and cache locations relative to the repository root
    # (this script lives in scripts/, so the root is two levels up).
    repo_root = Path(__file__).parent.parent
    dataset_path = repo_root / 'data' / 'instances' / 'global' / 'global_heritage_institutions.yaml'
    cache_path = repo_root / 'data' / 'cache' / 'geocoding_cache.db'

    # Kick off the retry process with the parsed CLI options.
    retrier = JapaneseGeocodingRetry(cache_path, dataset_path, dry_run=options.dry_run)
    retrier.run(limit=options.limit)
# Entry-point guard: run only when executed as a script, not on import.
if __name__ == '__main__':
    main()