#!/usr/bin/env python3
"""
Enrich Swiss ISIL custodian files with city data from the Swiss ISIL website.

For Swiss custodian files with XXX city placeholder, this script:
1. Loads the source CH-Annotator file to get ISIL URLs by institution name
2. Fetches the institution page from isil.nb.admin.ch
3. Extracts city (Location) and address data
4. Reverse geocodes using GeoNames to get proper city code
5. Updates the GHCID with correct city code
6. Renames the file if GHCID changes

Usage: python scripts/enrich_swiss_isil_cities.py [--dry-run] [--limit N]
"""

import argparse
import hashlib
import os
import re
import shutil
import sqlite3
import time
import unicodedata
import uuid

import yaml
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Paths
PROJECT_ROOT = Path(__file__).parent.parent
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"
GEONAMES_DB = PROJECT_ROOT / "data" / "reference" / "geonames.db"
REPORTS_DIR = PROJECT_ROOT / "reports"
SWISS_CH_ANNOTATOR_FILE = PROJECT_ROOT / "data" / "instances" / "switzerland_isil_ch_annotator.yaml"

# GHCID namespace for UUID generation (the RFC 4122 DNS namespace UUID)
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Rate limiting
REQUEST_DELAY = 1.0  # seconds between requests

# Swiss canton codes (already ISO 3166-2).  Keys cover English plus the
# German/French/Italian names that differ from the English spelling;
# variants identical to the English key were deduplicated (dict literals
# silently drop duplicate keys anyway).
SWISS_CANTON_CODES = {
    'Aargau': 'AG', 'Appenzell Ausserrhoden': 'AR', 'Appenzell Innerrhoden': 'AI',
    'Basel-Landschaft': 'BL', 'Basel-Stadt': 'BS', 'Bern': 'BE',
    'Fribourg': 'FR', 'Geneva': 'GE', 'Glarus': 'GL', 'Graubünden': 'GR',
    'Jura': 'JU', 'Lucerne': 'LU', 'Neuchâtel': 'NE', 'Nidwalden': 'NW',
    'Obwalden': 'OW', 'Schaffhausen': 'SH', 'Schwyz': 'SZ', 'Solothurn': 'SO',
    'St. Gallen': 'SG', 'Thurgau': 'TG', 'Ticino': 'TI', 'Uri': 'UR',
    'Valais': 'VS', 'Vaud': 'VD', 'Zug': 'ZG', 'Zürich': 'ZH',
    # German names
    'Genf': 'GE', 'Luzern': 'LU', 'Neuenburg': 'NE', 'Wallis': 'VS', 'Waadt': 'VD',
    # French names
    'Genève': 'GE',
    # Italian names
    'Ginevra': 'GE', 'Grigioni': 'GR', 'Vallese': 'VS',
}


def load_swiss_isil_lookup() -> Dict[str, str]:
    """Load Swiss CH-Annotator source file and create name -> ISIL URL lookup.

    Returns:
        Mapping of institution name to its isil.nb.admin.ch URL.  Empty
        if the source file is missing or contains no entries.
    """
    lookup = {}
    if not SWISS_CH_ANNOTATOR_FILE.exists():
        print(f"Warning: Swiss CH-Annotator file not found: {SWISS_CH_ANNOTATOR_FILE}")
        return lookup

    print(f"Loading Swiss CH-Annotator source file...")
    with open(SWISS_CH_ANNOTATOR_FILE, 'r', encoding='utf-8') as f:
        entries = yaml.safe_load(f)

    if not entries:
        return lookup

    for entry in entries:
        if not isinstance(entry, dict):
            continue
        name = entry.get('name', '')
        if not name:
            continue
        # Look for ISIL URL in digital_platforms; first match wins.
        for platform in entry.get('digital_platforms', []):
            if isinstance(platform, dict):
                url = platform.get('platform_url', '')
                if 'isil.nb.admin.ch' in url:
                    lookup[name] = url
                    break

    print(f" Loaded {len(lookup)} institutions with ISIL URLs")
    return lookup


def generate_city_code(city_name: str) -> str:
    """Generate 3-letter city code from city name.

    Diacritics are stripped via NFD decomposition; common articles and
    prepositions are ignored.  Single-word names use their first three
    letters, multi-word names use up to three initials.

    Returns:
        Upper-case code, or 'XXX' when *city_name* is empty.
    """
    if not city_name:
        return 'XXX'

    # Remove diacritics: decompose, then drop combining marks ('Mn').
    normalized = unicodedata.normalize('NFD', city_name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')

    # Skip articles and prepositions (French/German fillers).
    skip_words = {'de', 'la', 'le', 'les', 'du', 'des', 'von', 'am', 'im', 'an', 'der', 'die', 'das'}
    words = ascii_name.split()
    significant_words = [w for w in words if w.lower() not in skip_words]
    if not significant_words:
        # Name consisted only of filler words; fall back to all of them.
        significant_words = words

    if len(significant_words) == 1:
        # Single word: first 3 letters
        return significant_words[0][:3].upper()
    else:
        # Multiple words: initials
        return ''.join(w[0] for w in significant_words[:3]).upper()


def generate_ghcid_uuid(ghcid_string: str) -> str:
    """Generate deterministic UUID v5 from GHCID string."""
    return str(uuid.uuid5(GHCID_NAMESPACE, ghcid_string))


def generate_ghcid_uuid_sha256(ghcid_string: str) -> str:
    """Generate UUID v8 style from SHA-256 hash.

    Takes the first 16 bytes of the SHA-256 digest and stamps the
    version (8) and variant bits per RFC 4122 layout.
    """
    hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()[:16]
    hash_bytes = bytearray(hash_bytes)
    hash_bytes[6] = (hash_bytes[6] & 0x0F) | 0x80  # version 8
    hash_bytes[8] = (hash_bytes[8] & 0x3F) | 0x80  # variant
    return str(uuid.UUID(bytes=bytes(hash_bytes)))


def generate_ghcid_numeric(ghcid_string: str) -> int:
    """Generate 64-bit numeric ID from SHA-256 hash (first 8 bytes, big-endian)."""
    hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    return int.from_bytes(hash_bytes[:8], 'big')


def fetch_isil_page(isil_url: str, session: requests.Session) -> Optional[Dict]:
    """Fetch and parse Swiss ISIL institution page.

    Scrapes the dt/dd definition-list pairs for Location, Zip code,
    Street and number, and Canton.

    Returns:
        Dict with 'city' and optionally 'postal_code', 'street_address',
        'canton', 'region'; None when no city was found or on error.
    """
    try:
        response = session.get(isil_url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        result = {}
        # Find all dt/dd pairs in the definition lists
        for dt in soup.find_all('dt'):
            label = dt.get_text(strip=True)
            dd = dt.find_next_sibling('dd')
            if dd:
                value = dd.get_text(strip=True)
                if label == 'Location':
                    result['city'] = value
                elif label == 'Zip code':
                    result['postal_code'] = value
                elif label == 'Street and number':
                    result['street_address'] = value
                elif label == 'Canton':
                    result['canton'] = value
                    # Fall back to the first two letters when the canton
                    # name is not in the lookup table.
                    result['region'] = SWISS_CANTON_CODES.get(
                        value, value[:2].upper() if len(value) >= 2 else None)

        return result if result.get('city') else None
    except Exception as e:
        print(f" Error fetching {isil_url}: {e}")
        return None


def _city_row_to_dict(row: Tuple) -> Dict:
    """Map a row from the cities query to the reverse-geocode result dict."""
    return {
        'geonames_id': row[0],
        'geonames_name': row[1],
        'ascii_name': row[2],
        'latitude': row[3],
        'longitude': row[4],
        'population': row[5],
        'feature_code': row[6],
        'admin1_code': row[7],
        'admin1_name': row[8],
    }


def reverse_geocode_city(city_name: str, region_code: str, country_code: str,
                         db_path: Path) -> Optional[Dict]:
    """Look up city in GeoNames database to get coordinates and proper data.

    Tries an exact name match first (restricted to the canton's GeoNames
    admin1 code when known), then a prefix LIKE match, both ordered by
    population descending.

    Returns:
        Dict of GeoNames attributes (see _city_row_to_dict) or None.
    """
    try:
        conn = sqlite3.connect(db_path)
        # Ensure the connection is always released: the original code
        # leaked it on the exact-match return path and on exceptions.
        try:
            cursor = conn.cursor()

            # Swiss admin1 codes in GeoNames (canton ISO code -> admin1).
            swiss_admin1_map = {
                'AG': '01', 'AR': '15', 'AI': '16', 'BL': '06', 'BS': '05',
                'BE': '02', 'FR': '04', 'GE': '07', 'GL': '08', 'GR': '03',
                'JU': '26', 'LU': '09', 'NE': '10', 'NW': '11', 'OW': '12',
                'SH': '14', 'SZ': '17', 'SO': '13', 'SG': '18', 'TG': '20',
                'TI': '21', 'UR': '19', 'VS': '22', 'VD': '23', 'ZG': '25',
                'ZH': '24'
            }
            admin1_code = swiss_admin1_map.get(region_code)

            # Try exact match first
            query = """
                SELECT geonames_id, name, ascii_name, latitude, longitude,
                       population, feature_code, admin1_code, admin1_name
                FROM cities
                WHERE country_code = ?
                  AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
                  AND (name = ? OR ascii_name = ? OR LOWER(name) = LOWER(?))
            """
            if admin1_code:
                query += " AND admin1_code = ?"
                cursor.execute(query + " ORDER BY population DESC LIMIT 1",
                               (country_code, city_name, city_name, city_name, admin1_code))
            else:
                cursor.execute(query + " ORDER BY population DESC LIMIT 1",
                               (country_code, city_name, city_name, city_name))

            row = cursor.fetchone()
            if row:
                return _city_row_to_dict(row)

            # Try fuzzy (prefix) match, without the canton restriction.
            cursor.execute("""
                SELECT geonames_id, name, ascii_name, latitude, longitude,
                       population, feature_code, admin1_code, admin1_name
                FROM cities
                WHERE country_code = ?
                  AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
                  AND (name LIKE ? OR ascii_name LIKE ?)
                ORDER BY population DESC LIMIT 1
            """, (country_code, f"{city_name}%", f"{city_name}%"))

            row = cursor.fetchone()
            if row:
                return _city_row_to_dict(row)
            return None
        finally:
            conn.close()
    except Exception as e:
        print(f" GeoNames lookup error: {e}")
        return None


def process_file(file_path: Path, session: requests.Session,
                 isil_lookup: Dict[str, str], dry_run: bool = True) -> Dict:
    """Process a single custodian file.

    Resolves the institution's ISIL URL (by name lookup, then file
    identifiers), fetches the city from the ISIL page, regenerates the
    GHCID with the city code, and (unless *dry_run*) rewrites and
    renames the file.

    Returns:
        Result dict with 'status', 'old_ghcid', 'new_ghcid', 'city',
        'error', and optionally 'renamed_to'.
    """
    result = {'status': 'unchanged', 'old_ghcid': None, 'new_ghcid': None,
              'city': None, 'error': None}
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not data:
            result['status'] = 'error'
            result['error'] = 'Empty file'
            return result

        # Check if this is a Swiss file with XXX city placeholder
        ghcid_current = data.get('ghcid', {}).get('ghcid_current', '')
        if not ghcid_current.startswith('CH-') or '-XXX-' not in ghcid_current:
            result['status'] = 'skipped'
            return result
        result['old_ghcid'] = ghcid_current

        # Get institution name for lookup
        inst_name = data.get('original_entry', {}).get('name', '')
        if not inst_name:
            inst_name = data.get('custodian_name', {}).get('claim_value', '')

        # Find ISIL URL - first try lookup by name
        isil_url = isil_lookup.get(inst_name)

        # Then check identifiers in the file
        if not isil_url:
            identifiers = data.get('identifiers', [])
            for ident in identifiers:
                if isinstance(ident, dict) and ident.get('identifier_scheme') == 'ISIL':
                    url = ident.get('identifier_url', '')
                    if 'isil.nb.admin.ch' in url:
                        isil_url = url
                        break

        # Also check original_entry.identifiers
        if not isil_url:
            original_identifiers = data.get('original_entry', {}).get('identifiers', [])
            for ident in original_identifiers:
                if isinstance(ident, dict) and ident.get('identifier_scheme') == 'ISIL':
                    url = ident.get('identifier_url', '')
                    if 'isil.nb.admin.ch' in url:
                        isil_url = url
                        break

        if not isil_url:
            result['status'] = 'no_isil_url'
            result['error'] = f'No ISIL URL found for: {inst_name}'
            return result

        # Convert to proper page URL format
        if '?isil=' in isil_url:
            isil_code = isil_url.split('?isil=')[-1]
            # Convert to institution page URL
            isil_url = f"https://www.isil.nb.admin.ch/en/?isil={isil_code}"

        # Fetch city data from ISIL website (rate-limited)
        time.sleep(REQUEST_DELAY)
        isil_data = fetch_isil_page(isil_url, session)
        if not isil_data or not isil_data.get('city'):
            result['status'] = 'no_city_found'
            return result

        city_name = isil_data['city']
        result['city'] = city_name

        # Get region from GHCID or ISIL data
        parts = ghcid_current.split('-')
        region_code = parts[1] if len(parts) > 1 else isil_data.get('region', 'XX')

        # Generate city code
        city_code = generate_city_code(city_name)

        # Try to get GeoNames data for coordinates
        geonames_data = reverse_geocode_city(city_name, region_code, 'CH', GEONAMES_DB)

        # Build new GHCID
        # Format: CH-{region}-{city}-{type}-{abbrev}[-{suffix}]
        new_ghcid = ghcid_current.replace('-XXX-', f'-{city_code}-')
        result['new_ghcid'] = new_ghcid

        if new_ghcid == ghcid_current:
            result['status'] = 'unchanged'
            return result

        if dry_run:
            result['status'] = 'would_update'
            return result

        # Update the data
        now = datetime.now(timezone.utc).isoformat()

        # Update GHCID (string plus the three derived identifiers)
        data['ghcid']['ghcid_current'] = new_ghcid
        data['ghcid']['ghcid_uuid'] = generate_ghcid_uuid(new_ghcid)
        data['ghcid']['ghcid_uuid_sha256'] = generate_ghcid_uuid_sha256(new_ghcid)
        data['ghcid']['ghcid_numeric'] = generate_ghcid_numeric(new_ghcid)

        # Update location_resolution
        location_resolution = {
            'method': 'SWISS_ISIL_ENRICHMENT',
            'city_name': city_name,
            'city_code': city_code,
            'region_code': region_code,
            'country_code': 'CH',
            'enrichment_date': now,
            'source_url': isil_url
        }
        if geonames_data:
            location_resolution.update({
                'geonames_id': geonames_data['geonames_id'],
                'geonames_name': geonames_data['geonames_name'],
                'feature_code': geonames_data['feature_code'],
                'population': geonames_data['population'],
                'latitude': geonames_data['latitude'],
                'longitude': geonames_data['longitude']
            })
        data['ghcid']['location_resolution'] = location_resolution

        # Add GHCID history entry (newest first)
        history = data['ghcid'].get('ghcid_history', [])
        if history:
            # Close previous entry
            history[0]['valid_to'] = now
        history.insert(0, {
            'ghcid': new_ghcid,
            'ghcid_numeric': data['ghcid']['ghcid_numeric'],
            'valid_from': now,
            'valid_to': None,
            'reason': f'City code updated from Swiss ISIL enrichment: {city_name} -> {city_code}'
        })
        data['ghcid']['ghcid_history'] = history

        # Update location in original_entry if exists (only fills blanks)
        if 'locations' in data.get('original_entry', {}):
            for loc in data['original_entry']['locations']:
                if isinstance(loc, dict) and not loc.get('city'):
                    loc['city'] = city_name
                    if isil_data.get('postal_code'):
                        loc['postal_code'] = isil_data['postal_code']
                    if isil_data.get('street_address'):
                        loc['street_address'] = isil_data['street_address']

        # Update identifiers
        for ident in data.get('identifiers', []):
            if isinstance(ident, dict) and ident.get('identifier_scheme') == 'GHCID':
                ident['identifier_value'] = new_ghcid

        # Write updated file
        with open(file_path, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

        # Rename file if GHCID changed; never overwrite an existing file.
        new_filename = f"{new_ghcid}.yaml"
        new_path = file_path.parent / new_filename
        if new_path != file_path and not new_path.exists():
            shutil.move(file_path, new_path)
            result['renamed_to'] = str(new_path.name)

        result['status'] = 'updated'
        return result

    except Exception as e:
        result['status'] = 'error'
        result['error'] = str(e)
        return result


def main():
    parser = argparse.ArgumentParser(description='Enrich Swiss ISIL custodian files with city data')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without making changes')
    parser.add_argument('--limit', type=int, help='Limit number of files to process')
    parser.add_argument('--verbose', '-v', action='store_true', help='Show verbose output')
    args = parser.parse_args()

    print("=" * 60)
    print("SWISS ISIL CITY ENRICHMENT")
    print("=" * 60)
    if args.dry_run:
        print("DRY RUN MODE - No files will be modified")

    # Find Swiss files with XXX city placeholder
    swiss_xxx_files = list(CUSTODIAN_DIR.glob("CH-*-XXX-*.yaml"))
    if args.limit:
        swiss_xxx_files = swiss_xxx_files[:args.limit]
        print(f"Limited to {args.limit} files")
    print(f"Found {len(swiss_xxx_files)} Swiss files with XXX city placeholder")
    print()

    # Load Swiss ISIL lookup from CH-Annotator source file
    isil_lookup = load_swiss_isil_lookup()

    # Process files with a shared session (connection reuse + custom UA)
    session = requests.Session()
    session.headers['User-Agent'] = 'GLAMDataExtractor/1.0 (heritage-data-enrichment)'

    stats = {'updated': 0, 'would_update': 0, 'unchanged': 0, 'skipped': 0,
             'no_isil_url': 0, 'no_city_found': 0, 'error': 0}
    cities_found = {}
    errors = []

    for i, file_path in enumerate(swiss_xxx_files, 1):
        if i % 100 == 0 or args.verbose:
            print(f"Progress: {i}/{len(swiss_xxx_files)}")

        result = process_file(file_path, session, isil_lookup, dry_run=args.dry_run)
        stats[result['status']] = stats.get(result['status'], 0) + 1

        if result.get('city'):
            cities_found[result['city']] = cities_found.get(result['city'], 0) + 1
        if result.get('error'):
            errors.append(f"{file_path.name}: {result['error']}")

        if args.verbose and result['status'] in ('updated', 'would_update'):
            print(f" {file_path.name}")
            print(f" City: {result.get('city')}")
            print(f" {result['old_ghcid']} -> {result['new_ghcid']}")

    # Print summary
    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total files processed: {len(swiss_xxx_files)}")
    print()
    print("Results:")
    for status, count in sorted(stats.items()):
        if count > 0:
            print(f" {status}: {count}")

    if cities_found:
        print()
        print(f"Cities found: {len(cities_found)} unique")
        print("Top 10 cities:")
        for city, count in sorted(cities_found.items(), key=lambda x: -x[1])[:10]:
            print(f" {city}: {count}")

    if errors:
        print()
        print(f"Errors ({len(errors)}):")
        for err in errors[:10]:
            print(f" {err}")
        if len(errors) > 10:
            print(f" ... and {len(errors) - 10} more")

    # Save report (parents=True so a missing reports/ tree is created too)
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    report_file = REPORTS_DIR / f"SWISS_ISIL_ENRICHMENT_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
    with open(report_file, 'w') as f:
        f.write("# Swiss ISIL City Enrichment Report\n\n")
        f.write(f"**Date**: {datetime.now().isoformat()}\n")
        f.write(f"**Mode**: {'Dry Run' if args.dry_run else 'Live'}\n\n")
        f.write("## Summary\n\n")
        f.write(f"- Total files processed: {len(swiss_xxx_files)}\n")
        for status, count in sorted(stats.items()):
            if count > 0:
                f.write(f"- {status}: {count}\n")
        if cities_found:
            f.write(f"\n## Cities Found ({len(cities_found)} unique)\n\n")
            for city, count in sorted(cities_found.items(), key=lambda x: -x[1]):
                f.write(f"- {city}: {count}\n")

    print()
    print(f"Report saved to: {report_file}")


if __name__ == '__main__':
    main()