Enrichment scripts for country-specific city data:
- enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py
- enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py
- enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py

Location resolution utilities:
- resolve_cities_from_file_coords.py - Resolve cities using coordinates in filenames
- resolve_cities_wikidata.py - Use Wikidata P131 for city resolution
- resolve_country_codes.py - Standardize country codes
- resolve_cz_xx_regions.py - Fix Czech XX region codes
- resolve_locations_by_name.py - Name-based location lookup
- resolve_regions_from_city.py - Derive regions from city data
- update_ghcid_with_geonames.py - Update GHCIDs with GeoNames data

CH-Annotator integration:
- create_custodian_from_ch_annotator.py - Create custodians from annotations
- add_ch_annotator_location_claims.py - Add location claims
- extract_locations_ch_annotator.py - Extract locations from annotations

Migration and fixes:
- migrate_egyptian_from_ch.py - Migrate Egyptian data
- migrate_web_archives.py - Migrate web archive data
- fix_belgian_cities.py - Fix Belgian city data
226 lines
7.2 KiB
Python
226 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Fix remaining Belgian XXX files by re-scraping ISIL website with correct city extraction.
|
|
"""
|
|
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
import unicodedata
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.request import urlopen, Request
|
|
|
|
# Belgian admin1 mapping
|
|
BELGIAN_ADMIN1_MAP = {
|
|
'Brussels Capital': 'BRU',
|
|
'Brussels': 'BRU',
|
|
'Flanders': 'VLG',
|
|
'Wallonia': 'WAL',
|
|
}
|
|
|
|
# City name aliases (Dutch → GeoNames)
|
|
CITY_ALIASES = {
|
|
'sint-lambrechts-woluwe': 'Woluwe-Saint-Lambert',
|
|
'sint-pieters-woluwe': 'Woluwe-Saint-Pierre',
|
|
'oostende': 'Ostend',
|
|
'brussel': 'Brussels',
|
|
'bruxelles': 'Brussels',
|
|
}
|
|
|
|
def scrape_isil_city(isil_code):
|
|
"""Scrape city from Belgian ISIL website."""
|
|
url = f"https://isil.kbr.be/{isil_code}"
|
|
try:
|
|
req = Request(url, headers={'User-Agent': 'Mozilla/5.0 GLAM-Scraper/1.0'})
|
|
with urlopen(req, timeout=10) as response:
|
|
html = response.read().decode('utf-8')
|
|
|
|
# Look for address pattern: "Street 123, POSTCODE City"
|
|
match = re.search(r',\s*(\d{4})\s+([A-Za-zÀ-ÿ\s\-\']+)</td>', html)
|
|
if match:
|
|
postal_code = match.group(1)
|
|
city = match.group(2).strip()
|
|
return city, postal_code
|
|
|
|
# Alternative pattern
|
|
match = re.search(r'(\d{4})\s+([A-Za-zÀ-ÿ\s\-\']+)', html)
|
|
if match:
|
|
return match.group(2).strip(), match.group(1)
|
|
|
|
except Exception as e:
|
|
print(f" Error scraping {isil_code}: {e}")
|
|
|
|
return None, None
|
|
|
|
def lookup_city(city_name, conn):
|
|
"""Look up city in GeoNames."""
|
|
if not city_name:
|
|
return None
|
|
|
|
# Check alias
|
|
normalized = city_name.lower().strip()
|
|
lookup_name = CITY_ALIASES.get(normalized, city_name)
|
|
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT name, ascii_name, admin1_name, latitude, longitude, geonames_id, population, feature_code
|
|
FROM cities
|
|
WHERE country_code='BE'
|
|
AND (LOWER(name)=LOWER(?) OR LOWER(ascii_name)=LOWER(?))
|
|
AND feature_code NOT IN ('PPLX')
|
|
ORDER BY population DESC LIMIT 1
|
|
""", (lookup_name, lookup_name))
|
|
|
|
result = cursor.fetchone()
|
|
if result:
|
|
return {
|
|
'name': result[0],
|
|
'ascii_name': result[1],
|
|
'admin1_name': result[2],
|
|
'latitude': result[3],
|
|
'longitude': result[4],
|
|
'geonames_id': result[5],
|
|
'population': result[6],
|
|
}
|
|
return None
|
|
|
|
def generate_city_code(city_name):
    """Generate a 3-letter uppercase city code from a city name.

    Strategy:
      - single word: first three letters ("Brussels" -> "BRU")
      - leading Dutch/French article: article initial plus the first two
        letters of the next word ("De Haan" -> "DHA")
      - otherwise: initials of up to the first three words
        ("Sint Pieters Woluwe" -> "SPW")

    Diacritics are stripped via NFD decomposition (e.g. "è" -> "e")
    before building the code. Returns 'XXX' (the unresolved placeholder
    used elsewhere in the GHCIDs) for empty or non-alphabetic input
    instead of raising IndexError.
    """
    # Decompose accented characters, then drop the combining marks.
    decomposed = unicodedata.normalize('NFD', city_name)
    ascii_name = ''.join(c for c in decomposed if unicodedata.category(c) != 'Mn')
    # Keep only letters, whitespace and hyphens.
    clean = re.sub(r'[^a-zA-Z\s-]', '', ascii_name)
    words = clean.split()

    articles = {'de', 'het', 'le', 'la', 'les', 'den', 'der', 'des'}

    if not words:
        # Robustness fix: empty/blank input previously raised
        # IndexError on words[0]; fall back to the placeholder code.
        return 'XXX'
    if len(words) == 1:
        # Use words[0] (whitespace-trimmed) rather than clean[:3], which
        # could include stray leading whitespace left by the regex.
        return words[0][:3].upper()
    if words[0].lower() in articles:
        return (words[0][0] + words[1][:2]).upper()
    return ''.join(w[0] for w in words[:3]).upper()
|
|
|
|
def update_file(file_path, geo_data, method='ISIL_SCRAPE'):
    """Update a custodian YAML file with resolved city data.

    Rewrites the BE-XX-XXX- GHCID prefix to BE-<region>-<city>-,
    updates region_code/city_code fields, appends a ghcid_history
    entry, and renames the file to match the new GHCID.

    Args:
        file_path: pathlib.Path to the custodian YAML file.
        geo_data: dict from lookup_city() (needs 'name', 'admin1_name',
            'geonames_id').
        method: label recorded in the history entry's reason line.

    Returns:
        True if the file was changed (and possibly renamed),
        False if no ghcid_current was found or the GHCID did not
        start with the BE-XX-XXX- placeholder prefix.

    NOTE(review): edits are plain-text regex/replace operations, not a
    YAML parse — statement order matters (GHCID replacements must run
    before the region_code/city_code substitutions).
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    city_code = generate_city_code(geo_data['name'])
    # Unknown admin1 names fall back to 'XX' (unresolved region).
    region_code = BELGIAN_ADMIN1_MAP.get(geo_data['admin1_name'], 'XX')

    # Update GHCID
    old_ghcid_match = re.search(r'ghcid_current:\s*([^\n]+)', content)
    if not old_ghcid_match:
        return False

    old_ghcid = old_ghcid_match.group(1).strip()
    # Only the placeholder prefix is rewritten; anchored at the start
    # of the GHCID string.
    new_ghcid = re.sub(r'^BE-XX-XXX-', f'BE-{region_code}-{city_code}-', old_ghcid)

    # No placeholder prefix -> nothing to fix.
    if new_ghcid == old_ghcid:
        return False

    # Update content: every field that carries the old GHCID verbatim.
    content = content.replace(f'ghcid_current: {old_ghcid}', f'ghcid_current: {new_ghcid}')
    content = content.replace(f'ghcid_original: {old_ghcid}', f'ghcid_original: {new_ghcid}')
    content = content.replace(f'identifier_value: {old_ghcid}', f'identifier_value: {new_ghcid}')
    content = content.replace(f"ghcid: {old_ghcid}", f"ghcid: {new_ghcid}")

    # Update location_resolution placeholders.
    content = re.sub(r'region_code:\s*XX', f'region_code: {region_code}', content)
    content = re.sub(r'city_code:\s*XXX', f'city_code: {city_code}', content)

    # Add resolution details as a new ghcid_history list entry.
    timestamp = datetime.now(timezone.utc).isoformat()
    # NOTE(review): indentation of this YAML fragment must match the
    # file's existing ghcid_history list style — confirm against a
    # sample custodian file.
    history_entry = f"""
  - ghcid: {new_ghcid}
    valid_from: '{timestamp}'
    reason: City resolved via {method} - {geo_data['name']} (GeoNames ID {geo_data['geonames_id']})"""

    # Insert immediately after the 'ghcid_history:' key line, if present.
    history_match = re.search(r'(ghcid_history:\s*\n)', content)
    if history_match:
        insert_pos = history_match.end()
        content = content[:insert_pos] + history_entry + content[insert_pos:]

    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(content)

    # Rename file so the filename tracks the new GHCID prefix.
    old_filename = file_path.name
    new_filename = old_filename.replace('BE-XX-XXX-', f'BE-{region_code}-{city_code}-')
    if new_filename != old_filename:
        new_path = file_path.parent / new_filename
        file_path.rename(new_path)

    return True
|
|
|
|
def main():
    """Resolve city codes for Belgian custodian files still marked XXX.

    Per file: extract the ISIL identifier from the YAML, scrape the
    city from isil.kbr.be, resolve it against the local GeoNames
    SQLite database, then rewrite GHCIDs and rename the file
    (update_file). Pass --dry-run to only report the planned changes.
    """
    import sys
    dry_run = '--dry-run' in sys.argv

    # Repo layout: this script lives one level below the repo root;
    # data/ is a sibling of the scripts directory.
    base_dir = Path(__file__).parent.parent
    custodian_dir = base_dir / 'data' / 'custodian'
    geonames_db = base_dir / 'data' / 'reference' / 'geonames.db'

    print("Belgian City Fix Script")
    print("=" * 50)
    if dry_run:
        print("DRY RUN MODE\n")

    conn = sqlite3.connect(str(geonames_db))

    # Files whose GHCID still carries the XXX city placeholder.
    xxx_files = list(custodian_dir.glob('BE-*-XXX-*.yaml'))
    print(f"Found {len(xxx_files)} Belgian XXX files\n")

    updated = 0
    not_found = []  # (filename, isil_code, reason/city) tuples

    for file_path in xxx_files:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Get ISIL code.
        # NOTE(review): \w+ stops at '-', so this captures only the
        # first segment after 'BE-' — presumably ISIL codes here have
        # no internal hyphens; verify against the data.
        isil_match = re.search(r'identifier_value:\s*(BE-\w+)', content)
        if not isil_match:
            continue

        isil_code = isil_match.group(1)

        # Scrape city from website.
        city, postal = scrape_isil_city(isil_code)
        if not city:
            print(f"✗ {file_path.name}: No city found for {isil_code}")
            not_found.append((file_path.name, isil_code, 'scrape failed'))
            time.sleep(1)
            continue

        # Lookup in GeoNames.
        geo_data = lookup_city(city, conn)
        if not geo_data:
            print(f"? {file_path.name}: {city} not in GeoNames")
            not_found.append((file_path.name, isil_code, city))
            time.sleep(1)
            continue

        if dry_run:
            print(f"✓ {file_path.name}: {isil_code} → {city} ({geo_data['name']})")
        else:
            if update_file(file_path, geo_data):
                print(f"✓ Updated: {file_path.name} → {geo_data['name']}")
                updated += 1

        time.sleep(1)  # Rate limit

    print(f"\n{'=' * 50}")
    print(f"Updated: {updated}")
    print(f"Not found: {len(not_found)}")

    if not_found:
        print("\nNot resolved:")
        for fname, isil, city in not_found:
            print(f"  {fname}: {isil} → {city}")

    conn.close()
|