#!/usr/bin/env python3 """ Extract and resolve locations from custodian files using CH-Annotator convention. This script follows CH-Annotator v1.7.0 TOPONYM (TOP) hypernym for: - TOP.SET: Settlements (cities, towns, villages) - TOP.REG: Regions (provinces, states) - TOP.CTY: Countries Following AGENTS.md Rules: - Rule 5: Additive only - never delete existing data - Rule 10: CH-Annotator is the entity annotation convention - GHCID settlement standardization: GeoNames is authoritative """ import os import sys import yaml import sqlite3 import re from datetime import datetime, timezone from pathlib import Path from typing import Optional, Dict, Any, List, Tuple # GeoNames database path GEONAMES_DB = Path(__file__).parent.parent / "data/reference/geonames.db" # Feature codes for proper settlements (EXCLUDE PPLX neighborhoods) SETTLEMENT_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG') # Admin1 to ISO 3166-2 mappings by country ADMIN1_TO_ISO = { 'BE': { 'BRU': 'BRU', # Brussels-Capital 'VLG': 'VLG', # Flanders 'WAL': 'WAL', # Wallonia 'VAN': 'VAN', # Antwerp 'VBR': 'VBR', # Flemish Brabant 'VLI': 'VLI', # Limburg 'VOV': 'VOV', # East Flanders 'VWV': 'VWV', # West Flanders 'WBR': 'WBR', # Walloon Brabant 'WHT': 'WHT', # Hainaut 'WLG': 'WLG', # Liège 'WLX': 'WLX', # Luxembourg 'WNA': 'WNA', # Namur }, 'AT': { '01': '1', # Burgenland '02': '2', # Kärnten '03': '3', # Niederösterreich '04': '4', # Oberösterreich '05': '5', # Salzburg '06': '6', # Steiermark '07': '7', # Tirol '08': '8', # Vorarlberg '09': '9', # Wien }, 'BG': { '42': '22', # Sofia City '41': '23', # Sofia Province '01': '01', # Blagoevgrad '02': '02', # Burgas '03': '03', # Varna '04': '04', # Veliko Tarnovo '05': '05', # Vidin '06': '06', # Vratsa '07': '07', # Gabrovo '08': '08', # Dobrich '09': '09', # Kardzhali '10': '10', # Kyustendil '11': '11', # Lovech '12': '12', # Montana '13': '13', # Pazardzhik '14': '14', # Pernik '15': '15', # Pleven '16': '16', # Plovdiv '17': '17', # Razgrad '18': '18', # Ruse '19': '19', # Silistra '20': '20', # Sliven '21': '21', # Smolyan '24': '24', # Stara Zagora '25': '25', # Targovishte '26': '26', # Haskovo '27': '27', # Shumen '28': '28', # Yambol }, 'CH': { 'AG': 'AG', 'AI': 'AI', 'AR': 'AR', 'BE': 'BE', 'BL': 'BL', 'BS': 'BS', 'FR': 'FR', 'GE': 'GE', 'GL': 'GL', 'GR': 'GR', 'JU': 'JU', 'LU': 'LU', 'NE': 'NE', 'NW': 'NW', 'OW': 'OW', 'SG': 'SG', 'SH': 'SH', 'SO': 'SO', 'SZ': 'SZ', 'TG': 'TG', 'TI': 'TI', 'UR': 'UR', 'VD': 'VD', 'VS': 'VS', 'ZG': 'ZG', 'ZH': 'ZH', }, 'CZ': { '52': '10', # Prague '78': '20', # Central Bohemia '79': '31', # South Bohemia '80': '32', # Plzeň '81': '41', # Karlovy Vary '82': '42', # Ústí nad Labem '83': '51', # Liberec '84': '52', # Hradec Králové '85': '53', # Pardubice '86': '63', # Vysočina '78': '64', # South Moravia '87': '71', # Olomouc '88': '72', # Zlín '89': '80', # Moravia-Silesia }, } def connect_geonames() -> Optional[sqlite3.Connection]: """Connect to GeoNames database.""" if not GEONAMES_DB.exists(): print(f"Error: GeoNames database not found at {GEONAMES_DB}") return None return sqlite3.connect(str(GEONAMES_DB)) def extract_toponym_from_name(name: str, country: str) -> Optional[str]: """ Extract TOPONYM (TOP.SET) from institution name using CH-Annotator patterns. CH-Annotator TOP.SET pattern: - City/town names embedded in institution names - Often after prepositions: "in", "van", "de", "of", etc. - Or as suffix/prefix in compound names Returns extracted city name or None. """ if not name: return None # Normalize name_lower = name.lower() # Pattern 1: Explicit city indicators # "bibliotheek [CityName]", "museum [CityName]", etc. city_patterns = [ r'bibliotheek\s+(\w+)', r'bibliothek\s+(\w+)', r'museum\s+(\w+)', r'archief\s+(\w+)', r'archiv\s+(\w+)', r'archive\s+(\w+)', r'openbare\s+bibliotheek\s+(\w+)', r'gemeentelijke.*bibliotheek\s+(\w+)', r'stedelijke.*bibliotheek\s+(\w+)', r'stadsarchief\s+(\w+)', ] for pattern in city_patterns: match = re.search(pattern, name_lower) if match: city = match.group(1) # Filter out generic words if city not in ('van', 'de', 'het', 'der', 'voor', 'en', 'vzw', 'bv', 'nv'): return city.title() # Pattern 2: Parenthetical city names # "Institution Name (City)" or "City Name (Alias)" paren_match = re.search(r'\(([^)]+)\)', name) if paren_match: paren_content = paren_match.group(1).strip() # Check for "(Bib CityName)" pattern - extract last word bib_match = re.match(r'(?:Bib|OB|POB|Bibliotheek)\s+(\w+)', paren_content, re.IGNORECASE) if bib_match: return bib_match.group(1).title() # Check if it looks like a city name (capitalized, not too long) words = paren_content.split() if len(words) <= 3 and words[0][0].isupper(): return paren_content # Pattern 3: Hyphenated city names (Belgian pattern) # "Brussel-Stad", "Sint-Niklaas" hyphen_match = re.search(r'(\w+-\w+)', name) if hyphen_match: compound = hyphen_match.group(1) # Check against known Belgian compound cities known_compounds = ['sint-niklaas', 'sint-truiden', 'brussel-stad', 'la-louvière', 'molenbeek-saint-jean'] if compound.lower() in known_compounds: return compound.title() # Pattern 4: Last word as city (common pattern) # "Historisch Museum [CityName]" words = name.split() if len(words) >= 2: last_word = words[-1].strip('()') # Check if last word is capitalized and not a common suffix if (last_word[0].isupper() and last_word.lower() not in ('vzw', 'bv', 'nv', 'asbl', 'bibliotheek', 'museum', 'archief', 'archiv')): return last_word return None def lookup_city_in_geonames(city_name: str, country: str, conn: sqlite3.Connection) -> Optional[Dict]: """ Look up a city name in GeoNames database. Returns dict with: - geonames_id - name (ascii_name) - admin1_code - region_code (ISO 3166-2) - latitude, longitude """ cursor = conn.cursor() # Try exact match first - include admin2_code for countries that use it (Belgium) cursor.execute(""" SELECT geonames_id, name, ascii_name, admin1_code, admin2_code, latitude, longitude, feature_code, population FROM cities WHERE country_code = ? AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG') AND (LOWER(name) = LOWER(?) OR LOWER(ascii_name) = LOWER(?)) ORDER BY population DESC LIMIT 1 """, (country, city_name, city_name)) row = cursor.fetchone() if not row: # Try partial match - but require minimum 4 chars to avoid false positives if len(city_name) >= 4: cursor.execute(""" SELECT geonames_id, name, ascii_name, admin1_code, admin2_code, latitude, longitude, feature_code, population FROM cities WHERE country_code = ? AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG') AND (LOWER(name) LIKE LOWER(?) OR LOWER(ascii_name) LIKE LOWER(?)) ORDER BY population DESC LIMIT 1 """, (country, f"{city_name}%", f"{city_name}%")) row = cursor.fetchone() if not row: return None geonames_id, name, ascii_name, admin1_code, admin2_code, lat, lon, feature_code, population = row # Convert to ISO region code # Belgium uses admin2 for provinces, most countries use admin1 region_code = 'XX' if country == 'BE': # Belgium: use admin2 (province) instead of admin1 (region) if admin2_code: region_code = admin2_code elif admin1_code: region_code = admin1_code elif country in ADMIN1_TO_ISO and admin1_code in ADMIN1_TO_ISO[country]: region_code = ADMIN1_TO_ISO[country][admin1_code] elif admin1_code: region_code = admin1_code return { 'geonames_id': geonames_id, 'geonames_name': ascii_name or name, 'admin1_code': admin1_code, 'region_code': region_code, 'latitude': lat, 'longitude': lon, 'feature_code': feature_code, 'population': population, } def generate_city_code(city_name: str) -> str: """Generate 3-letter city code from name.""" words = city_name.split() if len(words) == 1: return city_name[:3].upper() else: # Use initials for multi-word names initials = ''.join(w[0] for w in words if w)[:3] return initials.upper() def update_file_with_location(filepath: Path, location_data: Dict, city_name: str, dry_run: bool = True) -> Tuple[bool, Optional[Path]]: """Update custodian file with resolved location following CH-Annotator convention.""" try: with open(filepath, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) except Exception as e: print(f" Error reading {filepath}: {e}") return False, None if 'ghcid' not in data: return False, None ghcid = data['ghcid'] if 'location_resolution' not in ghcid: ghcid['location_resolution'] = {} loc_res = ghcid['location_resolution'] country_code = loc_res.get('country_code', '') old_region = loc_res.get('region_code', 'XX') old_city = loc_res.get('city_code', 'XXX') if not country_code: return False, None # Only update if we have XX or XXX to resolve if old_region != 'XX' and old_city != 'XXX': return False, None region_code = location_data['region_code'] city_code = generate_city_code(location_data['geonames_name']) # Update location resolution with CH-Annotator provenance if old_region == 'XX': loc_res['region_code'] = region_code if old_city == 'XXX': loc_res['city_code'] = city_code loc_res['city_name'] = location_data['geonames_name'] loc_res['geonames_id'] = location_data['geonames_id'] loc_res['feature_code'] = location_data['feature_code'] loc_res['method'] = 'CH_ANNOTATOR_TOP_SET' loc_res['resolution_timestamp'] = datetime.now(timezone.utc).isoformat() loc_res['extracted_toponym'] = city_name if location_data.get('latitude'): loc_res['latitude'] = location_data['latitude'] loc_res['longitude'] = location_data['longitude'] # Update GHCID string old_ghcid = ghcid.get('ghcid_current', '') new_ghcid = old_ghcid if old_region == 'XX': new_ghcid = new_ghcid.replace(f'{country_code}-XX-', f'{country_code}-{region_code}-') if old_city == 'XXX': new_ghcid = new_ghcid.replace(f'-XXX-', f'-{city_code}-') if new_ghcid != old_ghcid: ghcid['ghcid_current'] = new_ghcid if 'ghcid_history' not in ghcid: ghcid['ghcid_history'] = [] ghcid['ghcid_history'].append({ 'ghcid': new_ghcid, 'valid_from': datetime.now(timezone.utc).isoformat(), 'reason': f"Location resolved via CH-Annotator TOP.SET extraction: {city_name} -> {location_data['geonames_name']} (GeoNames:{location_data['geonames_id']})" }) # Add CH-Annotator entity claim for location if 'ch_annotator' not in data: data['ch_annotator'] = {} if 'entity_claims' not in data['ch_annotator']: data['ch_annotator']['entity_claims'] = [] # Add TOP.SET claim data['ch_annotator']['entity_claims'].append({ 'claim_type': 'location_settlement', 'claim_value': location_data['geonames_name'], 'property_uri': 'schema:location', 'hypernym_code': 'TOP.SET', 'hypernym_label': 'SETTLEMENT', 'provenance': { 'namespace': 'geonames', 'path': f"/geonames/{location_data['geonames_id']}", 'timestamp': datetime.now(timezone.utc).isoformat(), 'agent': 'extract_locations_ch_annotator.py', 'context_convention': 'ch_annotator-v1_7_0', }, 'confidence': 0.85, 'extraction_source': { 'field': 'institution_name', 'extracted_text': city_name, 'method': 'pattern_matching', }, }) # Add provenance note if 'provenance' not in data: data['provenance'] = {} if 'notes' not in data['provenance']: data['provenance']['notes'] = [] elif isinstance(data['provenance']['notes'], str): data['provenance']['notes'] = [data['provenance']['notes']] data['provenance']['notes'].append( f"Location resolved {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}: " f"CH-Annotator TOP.SET extraction '{city_name}' -> {location_data['geonames_name']} " f"(GeoNames:{location_data['geonames_id']}, Region:{region_code})" ) # Determine new filename new_filename = filepath.name if old_region == 'XX': new_filename = new_filename.replace(f'{country_code}-XX-', f'{country_code}-{region_code}-') if old_city == 'XXX': new_filename = new_filename.replace(f'-XXX-', f'-{city_code}-') new_filepath = filepath.parent / new_filename if not dry_run: with open(filepath, 'w', encoding='utf-8') as f: yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False) if new_filepath != filepath and not new_filepath.exists(): filepath.rename(new_filepath) return True, new_filepath if new_filepath != filepath else None def main(): """Main entry point.""" import argparse parser = argparse.ArgumentParser( description='Extract locations using CH-Annotator TOPONYM convention' ) parser.add_argument('--apply', action='store_true', help='Actually apply the fixes (default: dry run)') parser.add_argument('--path', type=str, default='data/custodian', help='Path to custodian files directory') parser.add_argument('--limit', type=int, default=100, help='Limit number of files to process') parser.add_argument('--country', type=str, help='Only process files for a specific country') args = parser.parse_args() custodian_dir = Path(args.path) if not custodian_dir.exists(): print(f"Error: Directory {custodian_dir} does not exist") sys.exit(1) # Connect to GeoNames conn = connect_geonames() if not conn: sys.exit(1) dry_run = not args.apply print("=" * 70) print("CH-ANNOTATOR TOPONYM (TOP.SET) LOCATION EXTRACTION") print("=" * 70) print(f"Mode: {'DRY RUN' if dry_run else 'APPLYING CHANGES'}") print(f"Convention: ch_annotator-v1_7_0") print() # Find files with XX region codes or XXX city codes files_to_process = [] for filepath in custodian_dir.glob('*-XX-*.yaml'): files_to_process.append(filepath) for filepath in custodian_dir.glob('*-XXX-*.yaml'): if filepath not in files_to_process: files_to_process.append(filepath) print(f"Found {len(files_to_process)} files with XX/XXX codes") # Process files file_data = [] files_processed = 0 for filepath in files_to_process: # Apply limit AFTER country filtering if len(file_data) >= args.limit: break try: with open(filepath, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) # Get country code country = None if 'ghcid' in data and 'location_resolution' in data['ghcid']: country = data['ghcid']['location_resolution'].get('country_code') if not country: continue if args.country and country != args.country: continue # Get institution name name = None if 'custodian_name' in data: name = data['custodian_name'].get('claim_value') if not name and 'original_entry' in data: name = data['original_entry'].get('name') if not name: continue file_data.append({ 'filepath': filepath, 'data': data, 'country': country, 'name': name, }) except Exception as e: print(f"Error loading {filepath}: {e}") print(f"Processing {len(file_data)} files") print() # Process each file resolved = 0 renamed = 0 no_toponym = 0 no_geonames = 0 for f in file_data: filepath = f['filepath'] name = f['name'] country = f['country'] # Extract toponym using CH-Annotator patterns toponym = extract_toponym_from_name(name, country) if not toponym: no_toponym += 1 continue # Look up in GeoNames location = lookup_city_in_geonames(toponym, country, conn) if not location: no_geonames += 1 print(f" No GeoNames match for '{toponym}' in {country}") continue print(f"Processing {filepath.name}...") print(f" Name: {name}") print(f" TOP.SET: {toponym} -> {location['geonames_name']} (Region: {location['region_code']})") # Update file success, new_path = update_file_with_location(filepath, location, toponym, dry_run=dry_run) if success: resolved += 1 if new_path: renamed += 1 print(f" Renamed: {filepath.name} -> {new_path.name}") conn.close() print() print("=" * 70) print("SUMMARY") print("=" * 70) print(f"Files processed: {len(file_data)}") print(f"Resolved: {resolved}") print(f"Renamed: {renamed}") print(f"No toponym extracted: {no_toponym}") print(f"No GeoNames match: {no_geonames}") if dry_run: print() print("This was a DRY RUN. Use --apply to make changes.") if __name__ == '__main__': main()