glam/scripts/enrich_czech_cities_fast.py
kempersc e45c1a3c85 feat(scripts): add city enrichment and location resolution utilities
Enrichment scripts for country-specific city data:
- enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py
- enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py
- enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py

Location resolution utilities:
- resolve_cities_from_file_coords.py - Resolve cities using coordinates in filenames
- resolve_cities_wikidata.py - Use Wikidata P131 for city resolution
- resolve_country_codes.py - Standardize country codes
- resolve_cz_xx_regions.py - Fix Czech XX region codes
- resolve_locations_by_name.py - Name-based location lookup
- resolve_regions_from_city.py - Derive regions from city data
- update_ghcid_with_geonames.py - Update GHCIDs with GeoNames data

CH-Annotator integration:
- create_custodian_from_ch_annotator.py - Create custodians from annotations
- add_ch_annotator_location_claims.py - Add location claims
- extract_locations_ch_annotator.py - Extract locations from annotations

Migration and fixes:
- migrate_egyptian_from_ch.py - Migrate Egyptian data
- migrate_web_archives.py - Migrate web archive data
- fix_belgian_cities.py - Fix Belgian city data
2025-12-07 14:26:59 +01:00

449 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Fast Czech city enrichment - extracts cities from institution names.
This is a simplified script that:
1. Extracts city names from Czech institution name patterns (v/ve + City)
2. Converts from Czech locative case to nominative
3. Validates against GeoNames
4. Updates custodian files with city codes
Usage:
python scripts/enrich_czech_cities_fast.py [--dry-run] [--limit N]
"""
import argparse
import hashlib
import os
import re
import shutil
import sqlite3
import uuid
import yaml
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional
# Paths
PROJECT_ROOT = Path(__file__).parent.parent
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"
GEONAMES_DB = PROJECT_ROOT / "data" / "reference" / "geonames.db"
REPORTS_DIR = PROJECT_ROOT / "reports"

# GHCID namespace for UUID generation (this value is uuid.NAMESPACE_DNS).
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Czech region mapping (GeoNames admin1 to ISO 3166-2:CZ)
CZECH_ADMIN1_MAP = {
    '52': 'JC', '78': 'JM', '81': 'KA', '82': 'VY', '51': 'KR',
    '53': 'LI', '84': 'MO', '85': 'OL', '86': 'PA', '54': 'PL',
    '10': 'PR', '55': 'ST', '56': 'US', '87': 'ZL',
}

# Czech locative to nominative mappings.
# FIX: the original literal listed 'Prostějově' and 'Klatovech' twice with
# identical values; the duplicate entries have been removed.
LOCATIVE_TO_NOMINATIVE = {
    # Major cities
    'Praze': 'Praha',
    'Brně': 'Brno',
    'Ostravě': 'Ostrava',
    'Plzni': 'Plzeň',
    'Olomouci': 'Olomouc',
    'Liberci': 'Liberec',
    'Opavě': 'Opava',
    'Hradci Králové': 'Hradec Králové',
    'Českých Budějovicích': 'České Budějovice',
    'Pardubicích': 'Pardubice',
    'Zlíně': 'Zlín',
    'Kladně': 'Kladno',
    'Havlíčkově Brodě': 'Havlíčkův Brod',
    # Medium cities
    'Prostějově': 'Prostějov',
    'Domažlicích': 'Domažlice',
    'Litoměřicích': 'Litoměřice',
    'Klatovech': 'Klatovy',
    'Kopřivnici': 'Kopřivnice',
    'Pacově': 'Pacov',
    'Táboře': 'Tábor',
    'Písku': 'Písek',
    'Trutnově': 'Trutnov',
    'Chebu': 'Cheb',
    'Karviné': 'Karviná',
    'Havířově': 'Havířov',
    'Mostě': 'Most',
    'Chomutově': 'Chomutov',
    'Teplicích': 'Teplice',
    'Děčíně': 'Děčín',
    'Jablonci nad Nisou': 'Jablonec nad Nisou',
    'Mladé Boleslavi': 'Mladá Boleslav',
    'Příbrami': 'Příbram',
    'Kolíně': 'Kolín',
    'Jihlavě': 'Jihlava',
    'Třebíči': 'Třebíč',
    'Znojmě': 'Znojmo',
    'Břeclavi': 'Břeclav',
    'Hodoníně': 'Hodonín',
    'Vyškově': 'Vyškov',
    'Kroměříži': 'Kroměříž',
    'Vsetíně': 'Vsetín',
    'Frýdku-Místku': 'Frýdek-Místek',
    'Novém Jičíně': 'Nový Jičín',
    'Šumperku': 'Šumperk',
    'Přerově': 'Přerov',
    'Uherském Hradišti': 'Uherské Hradiště',
    'Svitavách': 'Svitavy',
    'Chrudimi': 'Chrudim',
    'Ústí nad Orlicí': 'Ústí nad Orlicí',
    'Náchodě': 'Náchod',
    'Rychnově nad Kněžnou': 'Rychnov nad Kněžnou',
    'Semilech': 'Semily',
    'Jičíně': 'Jičín',
    'České Lípě': 'Česká Lípa',
    'Lounech': 'Louny',
    'Rakovníku': 'Rakovník',
    'Berouně': 'Beroun',
    'Benešově': 'Benešov',
    'Kutné Hoře': 'Kutná Hora',
    'Nymburce': 'Nymburk',
    'Mělníku': 'Mělník',
    'Sokolově': 'Sokolov',
    'Rokycanech': 'Rokycany',
    'Strakonicích': 'Strakonice',
    'Českém Krumlově': 'Český Krumlov',
    'Jindřichově Hradci': 'Jindřichův Hradec',
    'Pelhřimově': 'Pelhřimov',
    'Žďáru nad Sázavou': 'Žďár nad Sázavou',
    # Compound patterns with "nad" (river names in locative -> nominative)
    'Metují': 'Metuje',  # Nové Město nad Metují
    'Nisou': 'Nisa',
    'Labem': 'Labe',
    'Sázavou': 'Sázava',
    'Kněžnou': 'Kněžná',
    'Orlicí': 'Orlice',
}
def convert_locative_to_nominative(city: str) -> str:
    """Map a Czech city name from locative case to nominative.

    Falls back to a case-insensitive scan of the mapping, and returns the
    input unchanged when no mapping is known.
    """
    # Exact-case lookup takes priority.
    nominative = LOCATIVE_TO_NOMINATIVE.get(city)
    if nominative is not None:
        return nominative
    # Case-insensitive fallback over the whole table.
    lowered = city.lower()
    for locative_form, nominative_form in LOCATIVE_TO_NOMINATIVE.items():
        if lowered == locative_form.lower():
            return nominative_form
    # Unknown form: leave as-is.
    return city
def extract_city_from_name(name: str) -> Optional[str]:
    """Extract a city name from a Czech institution name ("v/ve <City>").

    Matches the locative phrase after the preposition and converts the
    captured city to nominative case.  Returns None when nothing matches.
    """
    if not name:
        return None
    # A capitalized Czech word, optionally followed by more capitalized
    # words joined with "nad"/"pod" (e.g. "Ústí nad Orlicí").
    word = r'[A-ZÁČĎÉĚÍŇÓŘŠŤÚŮÝŽ][a-záčďéěíňóřšťúůýž]+'
    tail = rf'(?:\s+(?:nad|pod)?\s*{word})*'
    # "v <City>" is tried across the whole name before "ve <City>",
    # preserving the original pattern order.
    for preposition in ('v', 've'):
        found = re.search(rf'\b{preposition}\s+({word}{tail})', name)
        if found:
            return convert_locative_to_nominative(found.group(1))
    return None
def generate_city_code(city_name: str) -> str:
    """Derive an uppercase (up to 3-letter) city code from a city name.

    Single significant word -> its first three letters; several words ->
    initials of the first three significant words.  Empty input -> 'XXX'.
    """
    if not city_name:
        return 'XXX'
    import unicodedata
    # Strip diacritics: decompose to NFD, then drop combining marks ('Mn').
    decomposed = unicodedata.normalize('NFD', city_name)
    plain = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    # Czech prepositions that should not contribute to the code.
    minor = {'nad', 'pod', 'u', 'v', 've', 'na', 'do', 'z', 'ze', 'k', 'ke'}
    tokens = plain.split()
    major = [t for t in tokens if t.lower() not in minor] or tokens
    if len(major) == 1:
        return major[0][:3].upper()
    return ''.join(t[0] for t in major[:3]).upper()
def generate_ghcid_uuid(ghcid_string: str) -> str:
    """Return the deterministic UUIDv5 of a GHCID string (project namespace)."""
    derived = uuid.uuid5(GHCID_NAMESPACE, ghcid_string)
    return str(derived)
def generate_ghcid_uuid_sha256(ghcid_string: str) -> str:
hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()[:16]
hash_bytes = bytearray(hash_bytes)
hash_bytes[6] = (hash_bytes[6] & 0x0F) | 0x80
hash_bytes[8] = (hash_bytes[8] & 0x3F) | 0x80
return str(uuid.UUID(bytes=bytes(hash_bytes)))
def generate_ghcid_numeric(ghcid_string: str) -> int:
    """Return a 64-bit integer id: big-endian value of the first 8 SHA-256 bytes."""
    digest = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    return int.from_bytes(digest[:8], byteorder='big')
def lookup_city_geonames(city_name: str, db_path: Path) -> Optional[Dict]:
    """Look up a Czech populated place in the local GeoNames SQLite database.

    Tries an exact (and case-insensitive) name match first, then a prefix
    match, preferring the most populous hit.  Returns a dict of GeoNames
    attributes plus the mapped ISO region code, or None when the city is
    unknown or the database cannot be read.

    FIX: the original closed the connection only on the success path and
    leaked it when a query raised; the close now happens in a finally block.
    """
    # Shared query template; only the name predicate differs between passes.
    query = """
        SELECT geonames_id, name, ascii_name, latitude, longitude,
               population, feature_code, admin1_code
        FROM cities
        WHERE country_code = 'CZ'
          AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC')
          AND {where}
        ORDER BY population DESC
        LIMIT 1
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            # Pass 1: exact match (also case-insensitive on `name`).
            cursor.execute(
                query.format(where="(name = ? OR ascii_name = ? OR LOWER(name) = LOWER(?))"),
                (city_name, city_name, city_name))
            row = cursor.fetchone()
            if not row:
                # Pass 2: prefix match.
                cursor.execute(
                    query.format(where="(name LIKE ? OR ascii_name LIKE ?)"),
                    (f"{city_name}%", f"{city_name}%"))
                row = cursor.fetchone()
        finally:
            conn.close()
        if row:
            admin1_code = row[7]
            return {
                'geonames_id': row[0],
                'geonames_name': row[1],
                'ascii_name': row[2],
                'latitude': row[3],
                'longitude': row[4],
                'population': row[5],
                'feature_code': row[6],
                'admin1_code': admin1_code,
                # Mapped ISO 3166-2:CZ region, None for unknown admin1 codes.
                'region_code': CZECH_ADMIN1_MAP.get(admin1_code),
            }
        return None
    except Exception as e:
        # Best-effort lookup: report and degrade to "not found".
        print(f" GeoNames error: {e}")
        return None
def process_file(file_path: Path, dry_run: bool = True) -> Dict:
    """Process a single custodian file.

    Enriches one custodian YAML whose GHCID still carries the 'XXX' city
    placeholder: extracts a city from the institution name, validates it
    against GeoNames, rewrites the GHCID (city code and, when known, region
    code), regenerates the derived identifiers, records history, saves the
    file and renames it to match the new GHCID.

    Returns a result dict whose 'status' is one of: 'unchanged', 'skipped',
    'no_city_in_name', 'city_not_in_geonames', 'would_update' (dry run),
    'updated', or 'error' (with 'error' holding the message).
    """
    result = {'status': 'unchanged', 'old_ghcid': None, 'new_ghcid': None, 'city': None, 'error': None}
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        if not data:
            result['status'] = 'error'
            result['error'] = 'Empty file'
            return result
        # Only touch Czech records that still have the XXX city placeholder.
        ghcid_current = data.get('ghcid', {}).get('ghcid_current', '')
        if not ghcid_current.startswith('CZ-') or '-XXX-' not in ghcid_current:
            result['status'] = 'skipped'
            return result
        result['old_ghcid'] = ghcid_current
        # Get institution name: original entry first, then the name claim.
        inst_name = data.get('original_entry', {}).get('name', '')
        if not inst_name:
            inst_name = data.get('custodian_name', {}).get('claim_value', '')
        # Try to extract city from name
        extracted_city = extract_city_from_name(inst_name)
        if not extracted_city:
            result['status'] = 'no_city_in_name'
            return result
        # Validate against GeoNames
        geonames_data = lookup_city_geonames(extracted_city, GEONAMES_DB)
        if not geonames_data:
            result['status'] = 'city_not_in_geonames'
            result['error'] = f'City not found in GeoNames: {extracted_city}'
            return result
        city_name = geonames_data['geonames_name']
        city_code = generate_city_code(city_name)
        region_code = geonames_data.get('region_code')
        result['city'] = city_name
        # Build new GHCID.  Expected shape: CC-REGION-CITY-...; when the id
        # has fewer segments, fall back to a plain placeholder substitution.
        parts = ghcid_current.split('-')
        if len(parts) >= 5:
            parts[2] = city_code
            if region_code:
                # Also replace the region segment when GeoNames resolved one.
                parts[1] = region_code
            new_ghcid = '-'.join(parts)
        else:
            new_ghcid = ghcid_current.replace('-XXX-', f'-{city_code}-')
        result['new_ghcid'] = new_ghcid
        if new_ghcid == ghcid_current:
            result['status'] = 'unchanged'
            return result
        if dry_run:
            # Report what would happen without touching the file.
            result['status'] = 'would_update'
            return result
        # Update the data: new id plus all identifiers derived from it.
        now = datetime.now(timezone.utc).isoformat()
        data['ghcid']['ghcid_current'] = new_ghcid
        data['ghcid']['ghcid_uuid'] = generate_ghcid_uuid(new_ghcid)
        data['ghcid']['ghcid_uuid_sha256'] = generate_ghcid_uuid_sha256(new_ghcid)
        data['ghcid']['ghcid_numeric'] = generate_ghcid_numeric(new_ghcid)
        # Record how the location was resolved for provenance/auditing.
        data['ghcid']['location_resolution'] = {
            'method': 'EXTRACTED_FROM_NAME',
            'city_name': city_name,
            'city_code': city_code,
            'region_code': region_code,
            'country_code': 'CZ',
            'enrichment_date': now,
            'geonames_id': geonames_data['geonames_id'],
            'geonames_name': geonames_data['geonames_name'],
            'latitude': geonames_data['latitude'],
            'longitude': geonames_data['longitude'],
        }
        # Add history entry: close the previous entry (if any) and prepend
        # the new one, so history stays newest-first.
        history = data['ghcid'].get('ghcid_history', [])
        if history and isinstance(history[0], dict):
            history[0]['valid_to'] = now
        history.insert(0, {
            'ghcid': new_ghcid,
            'ghcid_numeric': data['ghcid']['ghcid_numeric'],
            'valid_from': now,
            'reason': f'City extracted from name: {city_name} -> {city_code}'
        })
        data['ghcid']['ghcid_history'] = history
        # Update identifiers: keep any GHCID-scheme identifier in sync.
        for ident in data.get('identifiers', []):
            if isinstance(ident, dict) and ident.get('identifier_scheme') == 'GHCID':
                ident['identifier_value'] = new_ghcid
        # Write updated file
        with open(file_path, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        # Rename file to match the new GHCID.  NOTE(review): when the target
        # name already exists the file is left under its old name, but the
        # status below is still 'updated' — confirm this is intended.
        new_filename = f"{new_ghcid}.yaml"
        new_path = file_path.parent / new_filename
        if new_path != file_path and not new_path.exists():
            shutil.move(file_path, new_path)
            result['renamed_to'] = str(new_path.name)
        result['status'] = 'updated'
        return result
    except Exception as e:
        # Best-effort batch processing: record the failure, never raise.
        result['status'] = 'error'
        result['error'] = str(e)
        return result
def main():
    """CLI entry point: enrich CZ-*-XXX-* custodian files and write a report.

    Flags: --dry-run (report only, no writes), --limit N (cap files
    processed), --verbose/-v (print each updated GHCID).
    """
    parser = argparse.ArgumentParser(description='Fast Czech city enrichment from names')
    parser.add_argument('--dry-run', action='store_true')
    parser.add_argument('--limit', type=int)
    parser.add_argument('--verbose', '-v', action='store_true')
    args = parser.parse_args()

    print("=" * 60)
    print("CZECH CITY ENRICHMENT (Fast Mode)")
    print("=" * 60)
    if args.dry_run:
        print("DRY RUN MODE")

    # Only files whose GHCID still carries the XXX city placeholder.
    czech_xxx_files = list(CUSTODIAN_DIR.glob("CZ-*-XXX-*.yaml"))
    if args.limit:
        czech_xxx_files = czech_xxx_files[:args.limit]
    print(f"Found {len(czech_xxx_files)} Czech files with XXX placeholder")

    stats = {}          # status -> count
    cities_found = {}   # resolved city name -> count
    for i, file_path in enumerate(czech_xxx_files, 1):
        if i % 50 == 0:
            print(f"Progress: {i}/{len(czech_xxx_files)}")
        result = process_file(file_path, dry_run=args.dry_run)
        stats[result['status']] = stats.get(result['status'], 0) + 1
        if result.get('city'):
            cities_found[result['city']] = cities_found.get(result['city'], 0) + 1
        if args.verbose and result['status'] in ('updated', 'would_update'):
            print(f" {result['old_ghcid']} -> {result['new_ghcid']} ({result['city']})")

    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total processed: {len(czech_xxx_files)}")
    # Counts are only ever incremented, so every entry is > 0 — the
    # original's redundant zero filter was dropped.
    for status, count in sorted(stats.items()):
        print(f" {status}: {count}")
    if cities_found:
        print(f"\nCities found: {len(cities_found)} unique")
        print("Top 10:")
        for city, count in sorted(cities_found.items(), key=lambda x: -x[1])[:10]:
            print(f" {city}: {count}")

    # Save report.  FIX: write with explicit UTF-8 — the original used the
    # platform default codec, unlike every other file operation in this
    # script.
    REPORTS_DIR.mkdir(exist_ok=True)
    report_file = REPORTS_DIR / f"CZECH_CITY_FAST_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("# Czech City Enrichment (Fast Mode)\n\n")
        f.write(f"**Date**: {datetime.now().isoformat()}\n")
        f.write(f"**Mode**: {'Dry Run' if args.dry_run else 'Live'}\n\n")
        f.write("## Results\n")
        for status, count in sorted(stats.items()):
            f.write(f"- {status}: {count}\n")
    print(f"\nReport: {report_file}")


if __name__ == '__main__':
    main()