#!/usr/bin/env python3
"""
Fast Czech city enrichment - extracts cities from institution names.

This is a simplified script that:
1. Extracts city names from Czech institution name patterns (v/ve + City)
2. Converts from Czech locative case to nominative
3. Validates against GeoNames
4. Updates custodian files with city codes

Usage:
    python scripts/enrich_czech_cities_fast.py [--dry-run] [--limit N]
"""

import argparse
import hashlib
import os
import re
import shutil
import sqlite3
import unicodedata
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional

import yaml

# Paths
PROJECT_ROOT = Path(__file__).parent.parent
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"
GEONAMES_DB = PROJECT_ROOT / "data" / "reference" / "geonames.db"
REPORTS_DIR = PROJECT_ROOT / "reports"

# GHCID namespace for UUID generation (this is the RFC 4122 DNS namespace UUID)
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Czech region mapping (GeoNames admin1 to ISO 3166-2:CZ)
CZECH_ADMIN1_MAP = {
    '52': 'JC', '78': 'JM', '81': 'KA', '82': 'VY', '51': 'KR',
    '53': 'LI', '84': 'MO', '85': 'OL', '86': 'PA', '54': 'PL',
    '10': 'PR', '55': 'ST', '56': 'US', '87': 'ZL',
}

# Czech locative to nominative mappings.
# NOTE: duplicate keys for 'Prostějově' and 'Klatovech' were removed
# (they mapped to identical values, so behavior is unchanged).
LOCATIVE_TO_NOMINATIVE = {
    # Major cities
    'Praze': 'Praha',
    'Brně': 'Brno',
    'Ostravě': 'Ostrava',
    'Plzni': 'Plzeň',
    'Olomouci': 'Olomouc',
    'Liberci': 'Liberec',
    'Opavě': 'Opava',
    'Hradci Králové': 'Hradec Králové',
    'Českých Budějovicích': 'České Budějovice',
    'Pardubicích': 'Pardubice',
    'Zlíně': 'Zlín',
    'Kladně': 'Kladno',
    'Havlíčkově Brodě': 'Havlíčkův Brod',
    # Medium cities
    'Prostějově': 'Prostějov',
    'Domažlicích': 'Domažlice',
    'Litoměřicích': 'Litoměřice',
    'Klatovech': 'Klatovy',
    'Kopřivnici': 'Kopřivnice',
    'Pacově': 'Pacov',
    'Táboře': 'Tábor',
    'Písku': 'Písek',
    'Trutnově': 'Trutnov',
    'Chebu': 'Cheb',
    'Karviné': 'Karviná',
    'Havířově': 'Havířov',
    'Mostě': 'Most',
    'Chomutově': 'Chomutov',
    'Teplicích': 'Teplice',
    'Děčíně': 'Děčín',
    'Jablonci nad Nisou': 'Jablonec nad Nisou',
    'Mladé Boleslavi': 'Mladá Boleslav',
    'Příbrami': 'Příbram',
    'Kolíně': 'Kolín',
    'Jihlavě': 'Jihlava',
    'Třebíči': 'Třebíč',
    'Znojmě': 'Znojmo',
    'Břeclavi': 'Břeclav',
    'Hodoníně': 'Hodonín',
    'Vyškově': 'Vyškov',
    'Kroměříži': 'Kroměříž',
    'Vsetíně': 'Vsetín',
    'Frýdku-Místku': 'Frýdek-Místek',
    'Novém Jičíně': 'Nový Jičín',
    'Šumperku': 'Šumperk',
    'Přerově': 'Přerov',
    'Uherském Hradišti': 'Uherské Hradiště',
    'Svitavách': 'Svitavy',
    'Chrudimi': 'Chrudim',
    'Ústí nad Orlicí': 'Ústí nad Orlicí',
    'Náchodě': 'Náchod',
    'Rychnově nad Kněžnou': 'Rychnov nad Kněžnou',
    'Semilech': 'Semily',
    'Jičíně': 'Jičín',
    'České Lípě': 'Česká Lípa',
    'Lounech': 'Louny',
    'Rakovníku': 'Rakovník',
    'Berouně': 'Beroun',
    'Benešově': 'Benešov',
    'Kutné Hoře': 'Kutná Hora',
    'Nymburce': 'Nymburk',
    'Mělníku': 'Mělník',
    'Sokolově': 'Sokolov',
    'Rokycanech': 'Rokycany',
    'Strakonicích': 'Strakonice',
    'Českém Krumlově': 'Český Krumlov',
    'Jindřichově Hradci': 'Jindřichův Hradec',
    'Pelhřimově': 'Pelhřimov',
    'Žďáru nad Sázavou': 'Žďár nad Sázavou',
    # Compound patterns with "nad" (river names in locative)
    'Metují': 'Metuje',  # Nové Město nad Metují
    'Nisou': 'Nisa',
    'Labem': 'Labe',
    'Sázavou': 'Sázava',
    'Kněžnou': 'Kněžná',
    'Orlicí': 'Orlice',
}


def convert_locative_to_nominative(city: str) -> str:
    """Convert a Czech city name from locative case to nominative.

    Tries an exact dictionary match first, then a case-insensitive
    match; returns the input unchanged if no mapping is known.
    """
    # Try exact match first
    if city in LOCATIVE_TO_NOMINATIVE:
        return LOCATIVE_TO_NOMINATIVE[city]

    # Try case-insensitive match (first hit wins, in dict insertion order)
    for locative, nominative in LOCATIVE_TO_NOMINATIVE.items():
        if city.lower() == locative.lower():
            return nominative

    # Return as-is if no mapping
    return city


def extract_city_from_name(name: str) -> Optional[str]:
    """Extract a city name from Czech institution name patterns.

    Looks for the prepositions "v"/"ve" followed by a capitalized
    (possibly multi-word) city name, and converts the match from
    locative to nominative. Returns None if no pattern matches.
    """
    if not name:
        return None

    # Pattern: "v/ve + City" (locative case). The two patterns are kept
    # separate and tried in order so that "v ..." matches take priority.
    patterns = [
        r'\bv\s+([A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+(?:\s+(?:nad|pod)?\s*[A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+)*)',
        r'\bve\s+([A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+(?:\s+(?:nad|pod)?\s*[A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+)*)',
    ]

    for pattern in patterns:
        match = re.search(pattern, name)
        if match:
            city = match.group(1)
            return convert_locative_to_nominative(city)

    return None


def generate_city_code(city_name: str) -> str:
    """Generate a short city code from a city name.

    Diacritics are stripped (NFD decomposition, combining marks removed),
    filler words such as "nad"/"pod" are skipped, then either the first
    three letters of a single-word name or the initials of up to three
    significant words are used.

    NOTE(review): multi-word names with only two significant words yield
    a 2-letter code, not 3 — confirm whether that is intended.
    """
    if not city_name:
        return 'XXX'

    # Strip diacritics: decompose, then drop combining marks ('Mn').
    normalized = unicodedata.normalize('NFD', city_name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')

    # Czech prepositions/connectives that carry no identifying information.
    skip_words = {'nad', 'pod', 'u', 'v', 've', 'na', 'do', 'z', 'ze', 'k', 'ke'}
    words = ascii_name.split()
    significant_words = [w for w in words if w.lower() not in skip_words]
    if not significant_words:
        significant_words = words

    if len(significant_words) == 1:
        return significant_words[0][:3].upper()
    else:
        return ''.join(w[0] for w in significant_words[:3]).upper()


def generate_ghcid_uuid(ghcid_string: str) -> str:
    """Return the deterministic UUIDv5 of a GHCID string in the project namespace."""
    return str(uuid.uuid5(GHCID_NAMESPACE, ghcid_string))


def generate_ghcid_uuid_sha256(ghcid_string: str) -> str:
    """Return a deterministic SHA-256-based UUID string for a GHCID.

    Takes the first 16 bytes of the SHA-256 digest and forces the
    version/variant nibbles.

    NOTE(review): the version nibble is set to 0x8 (UUID version 8),
    not the conventional 0x5/0x4 — presumably intentional (custom
    version per RFC 9562); left unchanged to keep existing IDs stable.
    """
    hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()[:16]
    hash_bytes = bytearray(hash_bytes)
    hash_bytes[6] = (hash_bytes[6] & 0x0F) | 0x80  # version nibble
    hash_bytes[8] = (hash_bytes[8] & 0x3F) | 0x80  # RFC 4122 variant bits
    return str(uuid.UUID(bytes=bytes(hash_bytes)))


def generate_ghcid_numeric(ghcid_string: str) -> int:
    """Return a deterministic 64-bit integer ID derived from a GHCID string."""
    hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    return int.from_bytes(hash_bytes[:8], 'big')


def lookup_city_geonames(city_name: str, db_path: Path) -> Optional[Dict]:
    """Look up a Czech city in the GeoNames SQLite database.

    Tries an exact (then case-insensitive) name match first, falling back
    to a prefix match; the most populous candidate wins. Returns a dict of
    GeoNames attributes plus the mapped ISO region code, or None when the
    city is not found or the query fails (errors are printed, not raised).
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()

            # Try exact match
            cursor.execute("""
                SELECT geonames_id, name, ascii_name, latitude, longitude,
                       population, feature_code, admin1_code
                FROM cities
                WHERE country_code = 'CZ'
                  AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC')
                  AND (name = ? OR ascii_name = ? OR LOWER(name) = LOWER(?))
                ORDER BY population DESC LIMIT 1
            """, (city_name, city_name, city_name))
            row = cursor.fetchone()

            if not row:
                # Try prefix match
                cursor.execute("""
                    SELECT geonames_id, name, ascii_name, latitude, longitude,
                           population, feature_code, admin1_code
                    FROM cities
                    WHERE country_code = 'CZ'
                      AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC')
                      AND (name LIKE ? OR ascii_name LIKE ?)
                    ORDER BY population DESC LIMIT 1
                """, (f"{city_name}%", f"{city_name}%"))
                row = cursor.fetchone()
        finally:
            # Close even if a query raises (previously leaked on error).
            conn.close()

        if row:
            admin1_code = row[7]
            return {
                'geonames_id': row[0],
                'geonames_name': row[1],
                'ascii_name': row[2],
                'latitude': row[3],
                'longitude': row[4],
                'population': row[5],
                'feature_code': row[6],
                'admin1_code': admin1_code,
                'region_code': CZECH_ADMIN1_MAP.get(admin1_code),
            }
        return None
    except Exception as e:
        # Best-effort lookup: report and treat as "not found".
        print(f"  GeoNames error: {e}")
        return None


def process_file(file_path: Path, dry_run: bool = True) -> Dict:
    """Process a single custodian YAML file.

    Extracts a city from the institution name, resolves it against
    GeoNames, rewrites the GHCID (city code, optionally region code),
    regenerates derived identifiers, appends a history entry, and renames
    the file to match the new GHCID. In dry-run mode nothing is written.

    Returns a result dict with keys: status, old_ghcid, new_ghcid, city,
    error (and renamed_to when the file was moved).
    """
    result = {'status': 'unchanged', 'old_ghcid': None, 'new_ghcid': None,
              'city': None, 'error': None}

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not data:
            result['status'] = 'error'
            result['error'] = 'Empty file'
            return result

        # Only Czech records still carrying the XXX city placeholder qualify.
        ghcid_current = (data.get('ghcid') or {}).get('ghcid_current', '')
        if not ghcid_current.startswith('CZ-') or '-XXX-' not in ghcid_current:
            result['status'] = 'skipped'
            return result

        result['old_ghcid'] = ghcid_current

        # Get institution name ("or {}" guards against explicit YAML nulls).
        inst_name = (data.get('original_entry') or {}).get('name', '')
        if not inst_name:
            inst_name = (data.get('custodian_name') or {}).get('claim_value', '')

        # Try to extract city from name
        extracted_city = extract_city_from_name(inst_name)
        if not extracted_city:
            result['status'] = 'no_city_in_name'
            return result

        # Validate against GeoNames
        geonames_data = lookup_city_geonames(extracted_city, GEONAMES_DB)
        if not geonames_data:
            result['status'] = 'city_not_in_geonames'
            result['error'] = f'City not found in GeoNames: {extracted_city}'
            return result

        city_name = geonames_data['geonames_name']
        city_code = generate_city_code(city_name)
        region_code = geonames_data.get('region_code')
        result['city'] = city_name

        # Build new GHCID: replace the city segment (index 2) and, when
        # known, the region segment (index 1); fall back to a plain string
        # substitution for non-standard GHCID shapes.
        parts = ghcid_current.split('-')
        if len(parts) >= 5:
            parts[2] = city_code
            if region_code:
                parts[1] = region_code
            new_ghcid = '-'.join(parts)
        else:
            new_ghcid = ghcid_current.replace('-XXX-', f'-{city_code}-')

        result['new_ghcid'] = new_ghcid

        if new_ghcid == ghcid_current:
            result['status'] = 'unchanged'
            return result

        if dry_run:
            result['status'] = 'would_update'
            return result

        # Update the data
        now = datetime.now(timezone.utc).isoformat()
        data['ghcid']['ghcid_current'] = new_ghcid
        data['ghcid']['ghcid_uuid'] = generate_ghcid_uuid(new_ghcid)
        data['ghcid']['ghcid_uuid_sha256'] = generate_ghcid_uuid_sha256(new_ghcid)
        data['ghcid']['ghcid_numeric'] = generate_ghcid_numeric(new_ghcid)
        data['ghcid']['location_resolution'] = {
            'method': 'EXTRACTED_FROM_NAME',
            'city_name': city_name,
            'city_code': city_code,
            'region_code': region_code,
            'country_code': 'CZ',
            'enrichment_date': now,
            'geonames_id': geonames_data['geonames_id'],
            'geonames_name': geonames_data['geonames_name'],
            'latitude': geonames_data['latitude'],
            'longitude': geonames_data['longitude'],
        }

        # Add history entry (newest first); close out the previous entry.
        history = data['ghcid'].get('ghcid_history', [])
        if history and isinstance(history[0], dict):
            history[0]['valid_to'] = now
        history.insert(0, {
            'ghcid': new_ghcid,
            'ghcid_numeric': data['ghcid']['ghcid_numeric'],
            'valid_from': now,
            'reason': f'City extracted from name: {city_name} -> {city_code}'
        })
        data['ghcid']['ghcid_history'] = history

        # Update identifiers
        for ident in (data.get('identifiers') or []):
            if isinstance(ident, dict) and ident.get('identifier_scheme') == 'GHCID':
                ident['identifier_value'] = new_ghcid

        # Write updated file
        with open(file_path, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True,
                      sort_keys=False)

        # Rename file to match the new GHCID (skip if the target exists).
        new_filename = f"{new_ghcid}.yaml"
        new_path = file_path.parent / new_filename
        if new_path != file_path and not new_path.exists():
            shutil.move(file_path, new_path)
            result['renamed_to'] = str(new_path.name)

        result['status'] = 'updated'
        return result

    except Exception as e:
        result['status'] = 'error'
        result['error'] = str(e)
        return result


def main():
    """CLI entry point: enrich all CZ-*-XXX-* custodian files and write a report."""
    parser = argparse.ArgumentParser(description='Fast Czech city enrichment from names')
    parser.add_argument('--dry-run', action='store_true')
    parser.add_argument('--limit', type=int)
    parser.add_argument('--verbose', '-v', action='store_true')
    args = parser.parse_args()

    print("=" * 60)
    print("CZECH CITY ENRICHMENT (Fast Mode)")
    print("=" * 60)
    if args.dry_run:
        print("DRY RUN MODE")

    # Sorted so --limit selects a deterministic subset across runs.
    czech_xxx_files = sorted(CUSTODIAN_DIR.glob("CZ-*-XXX-*.yaml"))
    if args.limit:
        czech_xxx_files = czech_xxx_files[:args.limit]

    print(f"Found {len(czech_xxx_files)} Czech files with XXX placeholder")

    stats = {}
    cities_found = {}

    for i, file_path in enumerate(czech_xxx_files, 1):
        if i % 50 == 0:
            print(f"Progress: {i}/{len(czech_xxx_files)}")

        result = process_file(file_path, dry_run=args.dry_run)
        stats[result['status']] = stats.get(result['status'], 0) + 1

        if result.get('city'):
            cities_found[result['city']] = cities_found.get(result['city'], 0) + 1

        if args.verbose and result['status'] in ('updated', 'would_update'):
            print(f"  {result['old_ghcid']} -> {result['new_ghcid']} ({result['city']})")

    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total processed: {len(czech_xxx_files)}")
    for status, count in sorted(stats.items()):
        if count > 0:
            print(f"  {status}: {count}")

    if cities_found:
        print(f"\nCities found: {len(cities_found)} unique")
        print("Top 10:")
        for city, count in sorted(cities_found.items(), key=lambda x: -x[1])[:10]:
            print(f"  {city}: {count}")

    # Save report
    REPORTS_DIR.mkdir(exist_ok=True)
    report_file = REPORTS_DIR / f"CZECH_CITY_FAST_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write(f"# Czech City Enrichment (Fast Mode)\n\n")
        f.write(f"**Date**: {datetime.now().isoformat()}\n")
        f.write(f"**Mode**: {'Dry Run' if args.dry_run else 'Live'}\n\n")
        f.write(f"## Results\n")
        for status, count in sorted(stats.items()):
            f.write(f"- {status}: {count}\n")

    print(f"\nReport: {report_file}")


if __name__ == '__main__':
    main()