#!/usr/bin/env python3
"""
Enrich Czech custodian files with city data from the CH-Annotator source file.

For Czech custodian files with XXX city placeholder, this script:
1. Loads the source CH-Annotator file (czech_unified_ch_annotator.yaml)
2. Matches by name, ARON UUID, or Wikidata ID to get city/coordinates
3. Falls back to Wikidata P131 lookup via SPARQL for missing data
4. Updates the GHCID with correct city code
5. Renames the file if GHCID changes

Usage:
    python scripts/enrich_czech_cities.py [--dry-run] [--limit N]
"""

import argparse
import hashlib
import os
import re
import shutil
import sqlite3
import time
import unicodedata
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
import yaml

# Paths
PROJECT_ROOT = Path(__file__).parent.parent
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"
GEONAMES_DB = PROJECT_ROOT / "data" / "reference" / "geonames.db"
REPORTS_DIR = PROJECT_ROOT / "reports"
CZECH_CH_ANNOTATOR_FILE = PROJECT_ROOT / "data" / "instances" / "czech_unified_ch_annotator.yaml"

# GHCID namespace for UUID generation
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Rate limiting for Wikidata
REQUEST_DELAY = 1.0

# Czech region mapping (GeoNames admin1 to ISO 3166-2:CZ)
CZECH_ADMIN1_MAP = {
    '52': 'JC',  # Jihočeský (South Bohemian)
    '78': 'JM',  # Jihomoravský (South Moravian)
    '81': 'KA',  # Karlovarský (Karlovy Vary)
    '82': 'VY',  # Vysočina (Vysočina)
    '51': 'KR',  # Královéhradecký (Hradec Králové)
    '53': 'LI',  # Liberecký (Liberec)
    '84': 'MO',  # Moravskoslezský (Moravian-Silesian)
    '85': 'OL',  # Olomoucký (Olomouc)
    '86': 'PA',  # Pardubický (Pardubice)
    '54': 'PL',  # Plzeňský (Plzeň)
    '10': 'PR',  # Praha (Prague)
    '55': 'ST',  # Středočeský (Central Bohemian)
    '56': 'US',  # Ústecký (Ústí nad Labem)
    '87': 'ZL',  # Zlínský (Zlín)
}

# Region name to code mapping (from source data)
CZECH_REGION_NAMES = {
    'Jihočeský': 'JC',
    'Jihomoravský': 'JM',
    'Karlovarský': 'KA',
    'Vysočina': 'VY',
    'Královéhradecký': 'KR',
    'Liberecký': 'LI',
    'Moravskoslezský': 'MO',
    'Olomoucký': 'OL',
    'Pardubický': 'PA',
    'Plzeňský': 'PL',
    'Hlavní město Praha': 'PR',
    'Praha': 'PR',
    'Středočeský': 'ST',
    'Ústecký': 'US',
    'Zlínský': 'ZL',
}


def extract_city_from_name(name: str) -> Optional[str]:
    """Try to extract a city name from Czech institution name patterns.

    Looks for locative-case phrases such as "v Praze", "ve Šlapanicích",
    or "nad Labem" and returns a best-effort nominative form, or None.
    """
    if not name:
        return None

    # Common patterns in Czech: "v Praze", "v Brně", "v Kladně", "ve Šlapanicích"
    # Also: "nad Metují", "nad Labem"
    patterns = [
        # "v CityName" - most common
        r'\bv\s+([A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+(?:\s+[A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+)*)',
        # "ve CityName" (before consonant clusters)
        r'\bve\s+([A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+(?:\s+[A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+)*)',
        # "nad CityName" or "pod CityName"
        r'\b(?:nad|pod)\s+([A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+)',
    ]

    for pattern in patterns:
        match = re.search(pattern, name)
        if match:
            city = match.group(1)
            # Convert locative case to nominative (approximation)
            city = convert_locative_to_nominative(city)
            return city

    return None


def convert_locative_to_nominative(city: str) -> str:
    """Convert Czech locative case to nominative (best effort).

    Only a fixed table of known cities is handled; anything else is
    returned unchanged, because Czech declension is too ambiguous to
    reverse generically (e.g. -ě can come from either -a or -o nouns).
    """
    replacements = [
        ('Praze', 'Praha'),
        ('Brně', 'Brno'),
        ('Hradci Králové', 'Hradec Králové'),
        ('Havlíčkově Brodě', 'Havlíčkův Brod'),
        ('Liberci', 'Liberec'),
        ('Olomouci', 'Olomouc'),
        ('Plzni', 'Plzeň'),
        ('Ostravě', 'Ostrava'),
        ('Ústí nad Labem', 'Ústí nad Labem'),  # no change
        ('Opavě', 'Opava'),
    ]
    for locative, nominative in replacements:
        if city == locative:
            return nominative

    # No specific mapping found - return as-is rather than guess.
    return city


def normalize_czech_name(name: str) -> str:
    """Normalize a Czech institution name for matching.

    Strips common legal-form suffixes (o.p.s., p.o., s.r.o., ...) and
    collapses whitespace so that source and custodian spellings line up.
    """
    if not name:
        return ''

    # Remove common suffixes and legal forms
    suffixes = [
        'o. p. s.', 'o.p.s.',
        'p. o.', 'p.o.',
        's. r. o.', 's.r.o.',
        'příspěvková organizace',
        ', příspěvková organizace',
        ', p. o.',
    ]
    result = name
    for suffix in suffixes:
        result = result.replace(suffix, '')

    # Clean up extra whitespace
    result = ' '.join(result.split())
    result = result.strip(' -,')
    return result


def load_czech_source_data() -> Dict[str, Dict]:
    """Load the Czech CH-Annotator source file and build lookup tables.

    Returns a dict with three indexes over the entries' first location:
    'by_name' (exact, lowercased, and normalized names plus alternative
    names), 'by_aron_uuid', and 'by_wikidata'.
    """
    by_name = {}
    by_aron_uuid = {}
    by_wikidata = {}

    if not CZECH_CH_ANNOTATOR_FILE.exists():
        print(f"Warning: Czech CH-Annotator file not found: {CZECH_CH_ANNOTATOR_FILE}")
        return {'by_name': by_name, 'by_aron_uuid': by_aron_uuid, 'by_wikidata': by_wikidata}

    print(f"Loading Czech CH-Annotator source file...")
    with open(CZECH_CH_ANNOTATOR_FILE, 'r', encoding='utf-8') as f:
        entries = yaml.safe_load(f)

    if not entries:
        return {'by_name': by_name, 'by_aron_uuid': by_aron_uuid, 'by_wikidata': by_wikidata}

    for entry in entries:
        if not isinstance(entry, dict):
            continue

        # Extract location data; only entries with a city are useful here.
        locations = entry.get('locations', [])
        if not locations:
            continue
        loc = locations[0] if locations else {}
        if not loc.get('city'):
            continue

        location_data = {
            'city': loc.get('city'),
            'region': loc.get('region'),
            'region_code': CZECH_REGION_NAMES.get(loc.get('region', ''), None),
            'postal_code': loc.get('postal_code'),
            'street_address': loc.get('street_address'),
            'latitude': loc.get('latitude'),
            'longitude': loc.get('longitude'),
            'name': entry.get('name', '')
        }

        # Index by name (exact and normalized)
        name = entry.get('name', '')
        if name:
            by_name[name] = location_data
            by_name[name.lower()] = location_data
            # Also normalized version
            normalized = normalize_czech_name(name)
            if normalized and normalized != name:
                by_name[normalized] = location_data
                by_name[normalized.lower()] = location_data

        # Index by alternative names
        for alt_name in entry.get('alternative_names', []):
            if alt_name:
                by_name[alt_name] = location_data
                by_name[alt_name.lower()] = location_data
                normalized = normalize_czech_name(alt_name)
                if normalized and normalized != alt_name:
                    by_name[normalized] = location_data
                    by_name[normalized.lower()] = location_data

        # Index by ARON UUID and Wikidata
        for ident in entry.get('identifiers', []):
            if not isinstance(ident, dict):
                continue
            scheme = ident.get('identifier_scheme', '')
            value = ident.get('identifier_value', '')
            if scheme == 'ARON_UUID' and value:
                by_aron_uuid[value] = location_data
            elif scheme == 'Wikidata' and value:
                by_wikidata[value] = location_data

    print(f"  Loaded {len(by_name)} by name, {len(by_aron_uuid)} by ARON UUID, {len(by_wikidata)} by Wikidata")
    return {'by_name': by_name, 'by_aron_uuid': by_aron_uuid, 'by_wikidata': by_wikidata}


def generate_city_code(city_name: str) -> str:
    """Generate a 3-letter city code from a city name.

    Diacritics are stripped, Czech prepositions are skipped, and the code
    is either the first 3 letters of a single-word name or the initials
    of up to 3 significant words. Returns 'XXX' for empty input.
    """
    if not city_name:
        return 'XXX'

    # Remove diacritics: NFD-decompose, then drop combining marks.
    normalized = unicodedata.normalize('NFD', city_name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')

    # Czech articles/prepositions to skip
    skip_words = {'nad', 'pod', 'u', 'v', 've', 'na', 'do', 'z', 'ze', 'k', 'ke', 'o', 's', 'se'}
    words = ascii_name.split()
    significant_words = [w for w in words if w.lower() not in skip_words]
    if not significant_words:
        significant_words = words

    if len(significant_words) == 1:
        # Single word: first 3 letters
        return significant_words[0][:3].upper()
    else:
        # Multiple words: initials (up to 3)
        return ''.join(w[0] for w in significant_words[:3]).upper()


def generate_ghcid_uuid(ghcid_string: str) -> str:
    """Generate deterministic UUID v5 from GHCID string."""
    return str(uuid.uuid5(GHCID_NAMESPACE, ghcid_string))


def generate_ghcid_uuid_sha256(ghcid_string: str) -> str:
    """Generate UUID v8 style from SHA-256 hash."""
    hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()[:16]
    hash_bytes = bytearray(hash_bytes)
    hash_bytes[6] = (hash_bytes[6] & 0x0F) | 0x80  # version 8
    hash_bytes[8] = (hash_bytes[8] & 0x3F) | 0x80  # variant
    return str(uuid.UUID(bytes=bytes(hash_bytes)))


def generate_ghcid_numeric(ghcid_string: str) -> int:
    """Generate 64-bit numeric ID from SHA-256 hash."""
    hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    return int.from_bytes(hash_bytes[:8], 'big')


def fetch_wikidata_location(wikidata_id: str, session: requests.Session) -> Optional[Dict]:
    """Fetch location via Wikidata SPARQL (P131 located in administrative entity).

    Returns a dict with city/region labels, region code, and coordinates
    parsed from the WKT Point, or None on error or no match.
    """
    if not wikidata_id or not wikidata_id.startswith('Q'):
        return None

    query = f"""
    SELECT ?cityLabel ?regionLabel ?coords WHERE {{
      wd:{wikidata_id} wdt:P131* ?city .
      ?city wdt:P31/wdt:P279* wd:Q515 .  # city
      OPTIONAL {{ ?city wdt:P625 ?coords }}
      OPTIONAL {{
        wd:{wikidata_id} wdt:P131+ ?region .
        ?region wdt:P31 wd:Q20916591 .  # Czech region
      }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "cs,en" }}
    }}
    LIMIT 1
    """

    try:
        response = session.get(
            'https://query.wikidata.org/sparql',
            params={'query': query, 'format': 'json'},
            headers={'User-Agent': 'GLAMDataExtractor/1.0'},
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        results = data.get('results', {}).get('bindings', [])
        if results:
            result = results[0]
            city = result.get('cityLabel', {}).get('value', '')
            region = result.get('regionLabel', {}).get('value', '')
            coords = result.get('coords', {}).get('value', '')
            lat, lon = None, None
            if coords and coords.startswith('Point('):
                # Parse WKT "Point(lon lat)" format - longitude comes first.
                match = re.match(r'Point\(([^ ]+) ([^)]+)\)', coords)
                if match:
                    lon, lat = float(match.group(1)), float(match.group(2))
            return {
                'city': city,
                'region': region,
                'region_code': CZECH_REGION_NAMES.get(region, None),
                'latitude': lat,
                'longitude': lon,
                'source': 'wikidata_sparql'
            }
    except Exception as e:
        print(f"  Wikidata SPARQL error: {e}")

    return None


def reverse_geocode_city(city_name: str, country_code: str, db_path: Path) -> Optional[Dict]:
    """Look up a city in the GeoNames database for coordinates and admin1.

    Tries an exact (or case-insensitive) name match first, then a prefix
    LIKE match, preferring the most populous hit. Returns None on error
    or no match.
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            # Try exact match first
            cursor.execute("""
                SELECT geonames_id, name, ascii_name, latitude, longitude,
                       population, feature_code, admin1_code, admin1_name
                FROM cities
                WHERE country_code = ?
                  AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
                  AND (name = ? OR ascii_name = ? OR LOWER(name) = LOWER(?))
                ORDER BY population DESC
                LIMIT 1
            """, (country_code, city_name, city_name, city_name))
            row = cursor.fetchone()

            if not row:
                # Try fuzzy (prefix) match
                cursor.execute("""
                    SELECT geonames_id, name, ascii_name, latitude, longitude,
                           population, feature_code, admin1_code, admin1_name
                    FROM cities
                    WHERE country_code = ?
                      AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
                      AND (name LIKE ? OR ascii_name LIKE ?)
                    ORDER BY population DESC
                    LIMIT 1
                """, (country_code, f"{city_name}%", f"{city_name}%"))
                row = cursor.fetchone()
        finally:
            # Close even if a query raises, so the connection never leaks.
            conn.close()

        if row:
            admin1_code = row[7]
            region_code = CZECH_ADMIN1_MAP.get(admin1_code, None)
            return {
                'geonames_id': row[0],
                'geonames_name': row[1],
                'ascii_name': row[2],
                'latitude': row[3],
                'longitude': row[4],
                'population': row[5],
                'feature_code': row[6],
                'admin1_code': admin1_code,
                'admin1_name': row[8],
                'region_code': region_code
            }
        return None
    except Exception as e:
        print(f"  GeoNames lookup error: {e}")
        return None


def process_file(file_path: Path, lookup: Dict, session: requests.Session,
                 dry_run: bool = True) -> Dict:
    """Process a single custodian file.

    Resolves the city for a CZ file with an XXX placeholder, rewrites the
    GHCID (and derived UUIDs, history, locations, identifiers, provenance),
    writes the file, and renames it to match the new GHCID. Returns a
    result dict with 'status' and details; never raises.
    """
    result = {
        'status': 'unchanged',
        'old_ghcid': None,
        'new_ghcid': None,
        'city': None,
        'error': None
    }

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not data:
            result['status'] = 'error'
            result['error'] = 'Empty file'
            return result

        # Check if this is a Czech file with XXX city placeholder
        ghcid_current = data.get('ghcid', {}).get('ghcid_current', '')
        if not ghcid_current.startswith('CZ-') or '-XXX-' not in ghcid_current:
            result['status'] = 'skipped'
            return result

        result['old_ghcid'] = ghcid_current

        # Get institution name for lookup
        inst_name = data.get('original_entry', {}).get('name', '')
        if not inst_name:
            inst_name = data.get('custodian_name', {}).get('claim_value', '')

        # Get identifiers for lookup
        aron_uuid = None
        wikidata_id = None
        for ident in data.get('identifiers', []):
            if isinstance(ident, dict):
                scheme = ident.get('identifier_scheme', '')
                value = ident.get('identifier_value', '')
                if scheme == 'ARON_UUID':
                    aron_uuid = value
                elif scheme == 'Wikidata':
                    wikidata_id = value

        # Also check original_entry.identifiers (top-level values win)
        for ident in data.get('original_entry', {}).get('identifiers', []):
            if isinstance(ident, dict):
                scheme = ident.get('identifier_scheme', '')
                value = ident.get('identifier_value', '')
                if scheme == 'ARON_UUID' and not aron_uuid:
                    aron_uuid = value
                elif scheme == 'Wikidata' and not wikidata_id:
                    wikidata_id = value

        # Try to find location data from source
        location_data = None
        location_source = None

        # Try by name first
        if inst_name:
            location_data = lookup['by_name'].get(inst_name)
            if location_data:
                location_source = 'source_by_name'
            else:
                # Try lowercase
                location_data = lookup['by_name'].get(inst_name.lower())
                if location_data:
                    location_source = 'source_by_name_lower'
                else:
                    # Try normalized
                    normalized = normalize_czech_name(inst_name)
                    if normalized:
                        location_data = lookup['by_name'].get(normalized)
                        if location_data:
                            location_source = 'source_by_normalized_name'
                        else:
                            location_data = lookup['by_name'].get(normalized.lower())
                            if location_data:
                                location_source = 'source_by_normalized_name_lower'

        # Try by ARON UUID
        if not location_data and aron_uuid:
            location_data = lookup['by_aron_uuid'].get(aron_uuid)
            if location_data:
                location_source = 'source_by_aron_uuid'

        # Try by Wikidata
        if not location_data and wikidata_id:
            location_data = lookup['by_wikidata'].get(wikidata_id)
            if location_data:
                location_source = 'source_by_wikidata'

        # Fallback to Wikidata SPARQL (skip for now - too slow)
        # if not location_data and wikidata_id:
        #     time.sleep(REQUEST_DELAY)
        #     location_data = fetch_wikidata_location(wikidata_id, session)
        #     if location_data:
        #         location_source = 'wikidata_sparql'

        # Fallback: extract city from institution name
        if not location_data or not location_data.get('city'):
            extracted_city = extract_city_from_name(inst_name)
            if extracted_city:
                # Validate against GeoNames before trusting the extraction
                geonames_data = reverse_geocode_city(extracted_city, 'CZ', GEONAMES_DB)
                if geonames_data:
                    location_data = {
                        'city': geonames_data.get('geonames_name', extracted_city),
                        'region_code': geonames_data.get('region_code'),
                        'geonames_id': geonames_data.get('geonames_id'),
                        'geonames_name': geonames_data.get('geonames_name'),
                        'latitude': geonames_data.get('latitude'),
                        'longitude': geonames_data.get('longitude'),
                    }
                    location_source = 'extracted_from_name'

        if not location_data or not location_data.get('city'):
            result['status'] = 'no_city_found'
            result['error'] = f'No location data for: {inst_name}'
            return result

        city_name = location_data['city']
        result['city'] = city_name

        # Generate city code
        city_code = generate_city_code(city_name)

        # Get region code; fall back to GeoNames, which also fills in
        # coordinates and GeoNames identifiers when missing.
        region_code = location_data.get('region_code')
        if not region_code:
            geonames_data = reverse_geocode_city(city_name, 'CZ', GEONAMES_DB)
            if geonames_data:
                region_code = geonames_data.get('region_code')
                location_data['geonames_id'] = geonames_data.get('geonames_id')
                location_data['geonames_name'] = geonames_data.get('geonames_name')
                if not location_data.get('latitude'):
                    location_data['latitude'] = geonames_data.get('latitude')
                    location_data['longitude'] = geonames_data.get('longitude')

        # Build new GHCID
        parts = ghcid_current.split('-')
        if len(parts) >= 5:
            # Replace XXX with city code, and update region if we have it
            parts[2] = city_code
            if region_code:
                parts[1] = region_code
            new_ghcid = '-'.join(parts)
        else:
            new_ghcid = ghcid_current.replace('-XXX-', f'-{city_code}-')

        result['new_ghcid'] = new_ghcid

        if new_ghcid == ghcid_current:
            result['status'] = 'unchanged'
            return result

        if dry_run:
            result['status'] = 'would_update'
            return result

        # Update the data
        now = datetime.now(timezone.utc).isoformat()

        # Update GHCID and all derived identifiers
        data['ghcid']['ghcid_current'] = new_ghcid
        data['ghcid']['ghcid_uuid'] = generate_ghcid_uuid(new_ghcid)
        data['ghcid']['ghcid_uuid_sha256'] = generate_ghcid_uuid_sha256(new_ghcid)
        data['ghcid']['ghcid_numeric'] = generate_ghcid_numeric(new_ghcid)

        # Update location_resolution
        location_resolution = {
            'method': 'CZECH_CH_ANNOTATOR_ENRICHMENT',
            'city_name': city_name,
            'city_code': city_code,
            'country_code': 'CZ',
            'enrichment_date': now,
            'source': location_source
        }
        if region_code:
            location_resolution['region_code'] = region_code
            location_resolution['region_name'] = location_data.get('region', f'CZ-{region_code}')
        if location_data.get('geonames_id'):
            location_resolution['geonames_id'] = location_data['geonames_id']
            location_resolution['geonames_name'] = location_data['geonames_name']
        if location_data.get('latitude'):
            location_resolution['latitude'] = location_data['latitude']
            location_resolution['longitude'] = location_data['longitude']
        data['ghcid']['location_resolution'] = location_resolution

        # Add GHCID history entry (newest first)
        history = data['ghcid'].get('ghcid_history', [])
        if history and isinstance(history, list) and len(history) > 0:
            # Close previous entry
            if isinstance(history[0], dict):
                history[0]['valid_to'] = now
        history.insert(0, {
            'ghcid': new_ghcid,
            'ghcid_numeric': data['ghcid']['ghcid_numeric'],
            'valid_from': now,
            'valid_to': None,
            'reason': f'City code updated from Czech CH-Annotator enrichment: {city_name} -> {city_code}'
        })
        data['ghcid']['ghcid_history'] = history

        # Update location in original_entry if exists
        if 'original_entry' in data:
            if 'locations' not in data['original_entry'] or not data['original_entry']['locations']:
                data['original_entry']['locations'] = [{}]
            for loc in data['original_entry']['locations']:
                if isinstance(loc, dict):
                    loc['city'] = city_name
                    if location_data.get('postal_code'):
                        loc['postal_code'] = location_data['postal_code']
                    if location_data.get('street_address'):
                        loc['street_address'] = location_data['street_address']
                    if location_data.get('latitude'):
                        loc['latitude'] = location_data['latitude']
                        loc['longitude'] = location_data['longitude']
                    if region_code:
                        loc['region'] = location_data.get('region', f'CZ-{region_code}')

        # Update identifiers
        for ident in data.get('identifiers', []):
            if isinstance(ident, dict) and ident.get('identifier_scheme') == 'GHCID':
                ident['identifier_value'] = new_ghcid

        # Add provenance note (coerce scalar/odd notes values to a list first)
        notes = data.get('provenance', {}).get('notes', [])
        if isinstance(notes, str):
            notes = [notes]
        if not isinstance(notes, list):
            notes = []
        notes.append(f'City resolved {now[:19]}Z: {city_name} -> {city_code} via {location_source}')
        data['provenance'] = data.get('provenance', {})
        data['provenance']['notes'] = notes

        # Write updated file
        with open(file_path, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

        # Rename file if GHCID changed. NOTE(review): if the target name
        # already exists, the file keeps its old name but carries the new
        # GHCID - flagged by the absence of 'renamed_to' in the result.
        new_filename = f"{new_ghcid}.yaml"
        new_path = file_path.parent / new_filename
        if new_path != file_path and not new_path.exists():
            shutil.move(file_path, new_path)
            result['renamed_to'] = str(new_path.name)

        result['status'] = 'updated'
        return result

    except Exception as e:
        result['status'] = 'error'
        result['error'] = str(e)
        import traceback
        traceback.print_exc()
        return result


def main():
    parser = argparse.ArgumentParser(description='Enrich Czech custodian files with city data')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without making changes')
    parser.add_argument('--limit', type=int, help='Limit number of files to process')
    parser.add_argument('--verbose', '-v', action='store_true', help='Show verbose output')
    args = parser.parse_args()

    print("=" * 60)
    print("CZECH CITY ENRICHMENT")
    print("=" * 60)

    if args.dry_run:
        print("DRY RUN MODE - No files will be modified")

    # Find Czech files with XXX city placeholder
    czech_xxx_files = list(CUSTODIAN_DIR.glob("CZ-*-XXX-*.yaml"))
    if args.limit:
        czech_xxx_files = czech_xxx_files[:args.limit]
        print(f"Limited to {args.limit} files")

    print(f"Found {len(czech_xxx_files)} Czech files with XXX city placeholder")
    print()

    # Load Czech source data
    lookup = load_czech_source_data()

    # Process files
    session = requests.Session()
    session.headers['User-Agent'] = 'GLAMDataExtractor/1.0 (heritage-data-enrichment)'

    stats = {
        'updated': 0,
        'would_update': 0,
        'unchanged': 0,
        'skipped': 0,
        'no_city_found': 0,
        'error': 0
    }
    cities_found = {}
    errors = []

    for i, file_path in enumerate(czech_xxx_files, 1):
        if i % 100 == 0 or args.verbose:
            print(f"Progress: {i}/{len(czech_xxx_files)}")

        result = process_file(file_path, lookup, session, dry_run=args.dry_run)
        stats[result['status']] = stats.get(result['status'], 0) + 1
        if result.get('city'):
            cities_found[result['city']] = cities_found.get(result['city'], 0) + 1
        if result.get('error'):
            errors.append(f"{file_path.name}: {result['error']}")

        if args.verbose and result['status'] in ('updated', 'would_update'):
            print(f"  {file_path.name}")
            print(f"    City: {result.get('city')}")
            print(f"    {result['old_ghcid']} -> {result['new_ghcid']}")

    # Print summary
    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total files processed: {len(czech_xxx_files)}")
    print()
    print("Results:")
    for status, count in sorted(stats.items()):
        if count > 0:
            print(f"  {status}: {count}")

    if cities_found:
        print()
        print(f"Cities found: {len(cities_found)} unique")
        print("Top 10 cities:")
        for city, count in sorted(cities_found.items(), key=lambda x: -x[1])[:10]:
            print(f"  {city}: {count}")

    if errors:
        print()
        print(f"Errors ({len(errors)}):")
        for err in errors[:10]:
            print(f"  {err}")
        if len(errors) > 10:
            print(f"  ... and {len(errors) - 10} more")

    # Save report
    REPORTS_DIR.mkdir(exist_ok=True)
    report_file = REPORTS_DIR / f"CZECH_CITY_ENRICHMENT_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
    with open(report_file, 'w') as f:
        f.write("# Czech City Enrichment Report\n\n")
        f.write(f"**Date**: {datetime.now().isoformat()}\n")
        f.write(f"**Mode**: {'Dry Run' if args.dry_run else 'Live'}\n\n")
        f.write("## Summary\n\n")
        f.write(f"- Total files processed: {len(czech_xxx_files)}\n")
        for status, count in sorted(stats.items()):
            if count > 0:
                f.write(f"- {status}: {count}\n")
        if cities_found:
            f.write(f"\n## Cities Found ({len(cities_found)} unique)\n\n")
            for city, count in sorted(cities_found.items(), key=lambda x: -x[1]):
                f.write(f"- {city}: {count}\n")

    print()
    print(f"Report saved to: {report_file}")


if __name__ == '__main__':
    main()