glam/scripts/fix_belgian_cities.py
kempersc e45c1a3c85 feat(scripts): add city enrichment and location resolution utilities
Enrichment scripts for country-specific city data:
- enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py
- enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py
- enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py

Location resolution utilities:
- resolve_cities_from_file_coords.py - Resolve cities using coordinates in filenames
- resolve_cities_wikidata.py - Use Wikidata P131 for city resolution
- resolve_country_codes.py - Standardize country codes
- resolve_cz_xx_regions.py - Fix Czech XX region codes
- resolve_locations_by_name.py - Name-based location lookup
- resolve_regions_from_city.py - Derive regions from city data
- update_ghcid_with_geonames.py - Update GHCIDs with GeoNames data

CH-Annotator integration:
- create_custodian_from_ch_annotator.py - Create custodians from annotations
- add_ch_annotator_location_claims.py - Add location claims
- extract_locations_ch_annotator.py - Extract locations from annotations

Migration and fixes:
- migrate_egyptian_from_ch.py - Migrate Egyptian data
- migrate_web_archives.py - Migrate web archive data
- fix_belgian_cities.py - Fix Belgian city data
2025-12-07 14:26:59 +01:00

226 lines
7.2 KiB
Python

#!/usr/bin/env python3
"""
Fix remaining Belgian XXX files by re-scraping ISIL website with correct city extraction.
"""
import re
import sqlite3
import time
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import urlopen, Request
# GeoNames admin1 (region) name → region code used in Belgian GHCIDs.
# Unmapped regions fall back to the 'XX' placeholder at the lookup site.
BELGIAN_ADMIN1_MAP = {
'Brussels Capital': 'BRU',
'Brussels': 'BRU',
'Flanders': 'VLG',
'Wallonia': 'WAL',
}
# Dutch/local city spellings → canonical GeoNames names.
# Keys are lower-cased; lookups normalize input before consulting this map.
CITY_ALIASES = {
'sint-lambrechts-woluwe': 'Woluwe-Saint-Lambert',
'sint-pieters-woluwe': 'Woluwe-Saint-Pierre',
'oostende': 'Ostend',
'brussel': 'Brussels',
'bruxelles': 'Brussels',
}
def scrape_isil_city(isil_code):
    """Fetch the Belgian ISIL registry page and extract (city, postal_code).

    Tries a table-cell address pattern first ("..., 1000 Brussels</td>"),
    then a looser "1000 Brussels" fallback anywhere in the page.
    Returns (None, None) on fetch failure or when no pattern matches.
    """
    request = Request(
        f"https://isil.kbr.be/{isil_code}",
        headers={'User-Agent': 'Mozilla/5.0 GLAM-Scraper/1.0'},
    )
    try:
        with urlopen(request, timeout=10) as response:
            page = response.read().decode('utf-8')
    except Exception as e:
        # Best effort: network/HTTP/decoding failures are logged, not fatal.
        print(f" Error scraping {isil_code}: {e}")
        return None, None
    address_patterns = (
        r',\s*(\d{4})\s+([A-Za-zÀ-ÿ\s\-\']+)</td>',  # preferred: end of address cell
        r'(\d{4})\s+([A-Za-zÀ-ÿ\s\-\']+)',           # fallback: any "postcode city"
    )
    for pattern in address_patterns:
        hit = re.search(pattern, page)
        if hit:
            return hit.group(2).strip(), hit.group(1)
    return None, None
def lookup_city(city_name, conn):
    """Resolve a Belgian city name against the GeoNames `cities` table.

    Dutch spellings are first mapped through CITY_ALIASES (case-insensitive).
    Returns a dict with name, ascii_name, admin1_name, latitude, longitude,
    geonames_id and population for the most populous match, or None.
    """
    if not city_name:
        return None
    # Prefer the canonical GeoNames spelling when a known alias is used.
    query_name = CITY_ALIASES.get(city_name.lower().strip(), city_name)
    row = conn.execute(
        """
        SELECT name, ascii_name, admin1_name, latitude, longitude, geonames_id, population, feature_code
        FROM cities
        WHERE country_code='BE'
        AND (LOWER(name)=LOWER(?) OR LOWER(ascii_name)=LOWER(?))
        AND feature_code NOT IN ('PPLX')
        ORDER BY population DESC LIMIT 1
        """,
        (query_name, query_name),
    ).fetchone()
    if row is None:
        return None
    fields = ('name', 'ascii_name', 'admin1_name', 'latitude',
              'longitude', 'geonames_id', 'population')
    # feature_code (row[7]) is selected for the filter but not returned.
    return dict(zip(fields, row))
def generate_city_code(city_name):
    """Derive a 3-letter uppercase city code from a city name.

    Diacritics are stripped (NFD + combining-mark removal), non-letters
    other than spaces/hyphens are dropped, then:
    - single word: its first three characters ("Brussels" -> "BRU")
    - leading Dutch/French article: article initial plus first two letters
      of the next word ("De Panne" -> "DPA")
    - otherwise: initials of the first three words ("Sint Pieters Woluwe" -> "SPW")

    Returns the 'XXX' placeholder when no letters survive cleaning
    (previously this raised IndexError on empty input).
    """
    normalized = unicodedata.normalize('NFD', city_name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
    clean = re.sub(r'[^a-zA-Z\s-]', '', ascii_name)
    words = clean.split()
    if not words:
        # Empty or all-punctuation name: keep the unresolved placeholder.
        return 'XXX'
    articles = {'de', 'het', 'le', 'la', 'les', 'den', 'der', 'des'}
    if len(words) == 1:
        # Slice the word itself, not `clean`, so stray leading whitespace
        # in the input cannot leak into the code.
        return words[0][:3].upper()
    if words[0].lower() in articles:
        return (words[0][0] + words[1][:2]).upper()
    return ''.join(w[0] for w in words[:3]).upper()
def update_file(file_path, geo_data, method='ISIL_SCRAPE'):
    """Rewrite a custodian YAML file with resolved region/city codes.

    Replaces the BE-XX-XXX- placeholder in the GHCID and related fields,
    fills in region_code/city_code, prepends a ghcid_history entry, writes
    the file back and renames it to match the new GHCID.
    Returns True when the file changed, False otherwise.
    """
    with open(file_path, 'r', encoding='utf-8') as fh:
        text = fh.read()

    city_code = generate_city_code(geo_data['name'])
    region_code = BELGIAN_ADMIN1_MAP.get(geo_data['admin1_name'], 'XX')

    # The current GHCID must exist and still carry the unresolved placeholder.
    ghcid_match = re.search(r'ghcid_current:\s*([^\n]+)', text)
    if not ghcid_match:
        return False
    old_ghcid = ghcid_match.group(1).strip()
    new_ghcid = re.sub(r'^BE-XX-XXX-', f'BE-{region_code}-{city_code}-', old_ghcid)
    if new_ghcid == old_ghcid:
        return False

    # Swap the GHCID everywhere it appears as a field value.
    for field in ('ghcid_current', 'ghcid_original', 'identifier_value', 'ghcid'):
        text = text.replace(f'{field}: {old_ghcid}', f'{field}: {new_ghcid}')

    # Fill in the placeholder location codes.
    text = re.sub(r'region_code:\s*XX', f'region_code: {region_code}', text)
    text = re.sub(r'city_code:\s*XXX', f'city_code: {city_code}', text)

    # Prepend a history entry documenting how the city was resolved.
    # NOTE(review): the YAML indentation inside this literal may have been
    # lost upstream — confirm the list items indent under ghcid_history.
    timestamp = datetime.now(timezone.utc).isoformat()
    history_entry = f"""
- ghcid: {new_ghcid}
valid_from: '{timestamp}'
reason: City resolved via {method} - {geo_data['name']} (GeoNames ID {geo_data['geonames_id']})"""
    anchor = re.search(r'(ghcid_history:\s*\n)', text)
    if anchor:
        insert_at = anchor.end()
        text = text[:insert_at] + history_entry + text[insert_at:]

    with open(file_path, 'w', encoding='utf-8') as fh:
        fh.write(text)

    # Keep the filename in sync with the new GHCID.
    new_name = file_path.name.replace('BE-XX-XXX-', f'BE-{region_code}-{city_code}-')
    if new_name != file_path.name:
        file_path.rename(file_path.parent / new_name)
    return True
def main():
    """Resolve Belgian XXX custodian files by scraping the ISIL registry.

    Scans data/custodian for BE-*-XXX-*.yaml files, scrapes each custodian's
    city from isil.kbr.be, validates it against the GeoNames database and
    rewrites/renames the file. Pass --dry-run to only report what would change.
    """
    import sys
    dry_run = '--dry-run' in sys.argv
    base_dir = Path(__file__).parent.parent
    custodian_dir = base_dir / 'data' / 'custodian'
    geonames_db = base_dir / 'data' / 'reference' / 'geonames.db'

    print("Belgian City Fix Script")
    print("=" * 50)
    if dry_run:
        print("DRY RUN MODE\n")

    conn = sqlite3.connect(str(geonames_db))
    try:
        xxx_files = list(custodian_dir.glob('BE-*-XXX-*.yaml'))
        print(f"Found {len(xxx_files)} Belgian XXX files\n")
        updated = 0
        not_found = []
        for file_path in xxx_files:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # The recorded identifier doubles as the ISIL registry key.
            isil_match = re.search(r'identifier_value:\s*(BE-\w+)', content)
            if not isil_match:
                continue
            isil_code = isil_match.group(1)
            # Scrape the city from the ISIL website.
            city, postal = scrape_isil_city(isil_code)
            if not city:
                print(f"{file_path.name}: No city found for {isil_code}")
                not_found.append((file_path.name, isil_code, 'scrape failed'))
                time.sleep(1)
                continue
            # Confirm the scraped name against GeoNames before touching files.
            geo_data = lookup_city(city, conn)
            if not geo_data:
                print(f"? {file_path.name}: {city} not in GeoNames")
                not_found.append((file_path.name, isil_code, city))
                time.sleep(1)
                continue
            if dry_run:
                print(f"{file_path.name}: {isil_code}{city} ({geo_data['name']})")
            else:
                if update_file(file_path, geo_data):
                    print(f"✓ Updated: {file_path.name}{geo_data['name']}")
                    updated += 1
            time.sleep(1)  # Rate limit against the ISIL website
        print(f"\n{'=' * 50}")
        print(f"Updated: {updated}")
        print(f"Not found: {len(not_found)}")
        if not_found:
            print("\nNot resolved:")
            for fname, isil, city in not_found:
                print(f" {fname}: {isil}{city}")
    finally:
        # Always release the GeoNames connection, even when scraping or a
        # file update raises mid-loop (previously leaked on any exception).
        conn.close()


if __name__ == '__main__':
    main()