# glam/scripts/enrich_nde_genealogiewerkbalk.py
# Snapshot retrieved 2025-12-03 17:38:46 +01:00 (589 lines, 21 KiB, Python)
#!/usr/bin/env python3
"""
Enrich NDE entries with Genealogiewerkbalk municipality archive data.
This script enriches NDE entries with data from the Genealogiewerkbalk.nl
municipality archives registry, which maps Dutch municipalities to their
responsible archives with ISIL codes, websites, and provincial archive info.
Data source:
https://docs.google.com/spreadsheets/d/1rS_Z5L6L2vvfGLS6eHI8wfyiwB-KUfHEr7W1VNY3rpg/export?format=csv
Matching strategy:
1. Match by municipality name from original_entry.plaatsnaam_bezoekadres
2. Match by Google Maps administrative_area_level_2 (gemeente)
3. Match by Google Maps locality that maps to a municipality
Usage:
python scripts/enrich_nde_genealogiewerkbalk.py
python scripts/enrich_nde_genealogiewerkbalk.py --dry-run
python scripts/enrich_nde_genealogiewerkbalk.py --entry 0016
python scripts/enrich_nde_genealogiewerkbalk.py --refresh-csv
Environment:
No special environment variables required.
"""
import os
import sys
import csv
import yaml
import argparse
import logging
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Tuple
from difflib import SequenceMatcher
import urllib.request
import unicodedata
# Set up logging: timestamped INFO-level output on the root handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Paths, resolved relative to the repository root (parent of scripts/).
PROJECT_ROOT = Path(__file__).parent.parent
ENTRIES_DIR = PROJECT_ROOT / "data" / "nde" / "enriched" / "entries"
SOURCES_DIR = PROJECT_ROOT / "data" / "nde" / "enriched" / "sources"
# Local cache of the Genealogiewerkbalk registry CSV.
CSV_FILE = SOURCES_DIR / "genealogiewerkbalk_municipality_archives.csv"
# Google Sheets CSV export of the registry (see module docstring).
CSV_URL = "https://docs.google.com/spreadsheets/d/1rS_Z5L6L2vvfGLS6eHI8wfyiwB-KUfHEr7W1VNY3rpg/export?format=csv"
# Known municipality name aliases (normalized form -> canonical normalized form)
# The canonical form must match what's in the Genealogiewerkbalk CSV after normalization
MUNICIPALITY_ALIASES = {
    # Den Haag / 's-Gravenhage
    "den haag": "gravenhage",
    "the hague": "gravenhage",
    # Scheveningen is part of Den Haag
    "scheveningen": "gravenhage",
    "scheveingen": "gravenhage",
    "loosduinen": "gravenhage",
    # Voorburg is now part of Leidschendam-Voorburg
    "voorburg": "leidschendam voorburg",
    # Villages that are parts of municipalities - Bergen (NH)
    "egmond aan zee": "bergen (nh.)",
    "egmond binnen": "bergen (nh.)",
    "egmond aan den hoef": "bergen (nh.)",
    "bergen": "bergen (nh.)",  # Default Bergen to NH (most heritage institutions are there)
    # Schagen area
    "callantsoog": "schagen",
    "sint maarten": "schagen",
    # Frisian name variants
    "haren": "groningen",  # Haren merged with Groningen in 2019
    "zuidwolde": "de wolden",
    "de knipe": "heerenveen",
    # Other common variants
    "krommenie": "zaanstad",
    "spaarndam": "haarlem",
    "midwoud": "medemblik",
    "hoogblokland": "vijfheerenlanden",
    "hoogblokland hoornaar noordeloos": "vijfheerenlanden",
    "ouddorp": "goeree overflakkee",
    # Noord-Brabant villages
    "berlicum": "sint michielsgestel",
    "berlicum middelrode": "sint michielsgestel",
    "oeffelt": "berg en dal",
    # Limburg villages
    "helden": "peel en maas",
    # Zeeland villages
    "wissekerke": "noord beveland",
}


def normalize_municipality_name(name: str) -> str:
    """Normalize a municipality name into a lookup key.

    Handles:
    - Case insensitivity
    - Dutch articles and prefixes ('s-, gemeente, gem.)
    - Unicode normalization (NFKC)
    - Hyphens and runs of whitespace
    - Trailing periods
    - Known aliases (Den Haag -> 's-Gravenhage, etc.)

    Args:
        name: Raw municipality or place name; may be empty.

    Returns:
        Normalized lowercase key, or "" for empty input.
    """
    if not name:
        return ""
    # Unicode normalize so compatibility characters compare equal.
    name = unicodedata.normalize('NFKC', name)
    name = name.lower().strip()
    # Handle 's- prefix (e.g., 's-Gravenhage -> gravenhage).
    if name.startswith("'s-") or name.startswith("'s "):
        name = name[3:]
    # Remove common prefixes that might vary.
    for prefix in ('gemeente ', 'gem. ', 'gem '):
        if name.startswith(prefix):
            name = name[len(prefix):]
    # Treat hyphens as spaces, then collapse any run of whitespace to a
    # single space.  (The original `.replace(' ', ' ')` was a no-op bug:
    # it replaced a space with a space, so double spaces survived and
    # broke alias/exact lookups.)
    name = ' '.join(name.replace('-', ' ').split())
    # Remove trailing periods.
    name = name.rstrip('.').strip()
    # Apply known aliases to map villages/variants to their municipality.
    return MUNICIPALITY_ALIASES.get(name, name)
def load_genealogiewerkbalk_data(csv_path: Path) -> Dict[str, Dict[str, Any]]:
    """Load the Genealogiewerkbalk CSV into a lookup dictionary.

    Args:
        csv_path: Path to the locally cached registry CSV.

    Returns:
        Dict mapping normalized municipality names to their data;
        empty when the file is missing.
    """
    lookup: Dict[str, Dict[str, Any]] = {}
    if not csv_path.exists():
        logger.warning(f"CSV file not found: {csv_path}")
        return lookup

    def cell(row: Dict[str, str], column: str) -> str:
        # Missing columns behave like empty cells.
        return row.get(column, '').strip()

    with open(csv_path, 'r', encoding='utf-8') as handle:
        for row in csv.DictReader(handle):
            gemeente = cell(row, 'gemeentenaam')
            if not gemeente:
                continue
            # "geen*" codes mean the archive has no real ISIL assigned.
            isil = cell(row, 'isil')
            usable_isil = bool(isil) and not isil.startswith('geen')
            lookup[normalize_municipality_name(gemeente)] = {
                'gemeentenaam': gemeente,
                'gemeentecode': cell(row, 'gemeentecode'),
                'archief_gemeente': cell(row, 'archief_gemeente'),
                'isil': isil if usable_isil else None,
                'isil_raw': isil,  # Keep original for reference
                'extra_info': cell(row, 'extra_info'),
                'website_gemeentearchief': cell(row, 'website_gemeentearchief'),
                'provincienaam': cell(row, 'provincienaam'),
                'provinciecode': cell(row, 'provinciecode'),
                'archief_provincie': cell(row, 'archief_provincie'),
                'website_provinciaal_archief': cell(row, 'website_provinciaal_archief'),
            }
    logger.info(f"Loaded {len(lookup)} municipalities from Genealogiewerkbalk CSV")
    return lookup
def find_municipality_match(
    entry: Dict[str, Any],
    municipalities: Dict[str, Dict[str, Any]]
) -> Tuple[Optional[Dict[str, Any]], str, float]:
    """Find matching municipality for an entry.

    Candidate names are tried in decreasing order of reliability; each
    candidate is looked up exactly first, then fuzzily (see _try_match).

    Args:
        entry: The NDE entry data.
        municipalities: Lookup of normalized municipality name -> data,
            as built by load_genealogiewerkbalk_data().

    Returns:
        Tuple of (matched_data, match_method, confidence_score), or
        (None, 'no_match', 0.0) when nothing matched.
    """
    # Strategy 1: place name from the original NDE entry (most direct).
    plaatsnaam = entry.get('original_entry', {}).get('plaatsnaam_bezoekadres', '')
    result = _try_match(plaatsnaam, municipalities,
                        'plaatsnaam_bezoekadres', 'plaatsnaam_fuzzy', 1.0)
    if result:
        return result

    # Strategy 2: Google Maps administrative_area_level_2 (= gemeente).
    google_data = entry.get('google_maps_enrichment', {})
    address_components = google_data.get('address_components', [])
    for component in address_components:
        if 'administrative_area_level_2' in component.get('types', []):
            result = _try_match(component.get('long_name', ''), municipalities,
                                'google_maps_admin2', 'google_maps_admin2_fuzzy', 0.95)
            if result:
                return result

    # Strategy 3: Google Maps locality (some localities are also
    # municipalities).  Higher fuzzy threshold: localities are noisier.
    for component in address_components:
        if 'locality' in component.get('types', []):
            result = _try_match(component.get('long_name', ''), municipalities,
                                'google_maps_locality', 'google_maps_locality_fuzzy',
                                0.85, fuzzy_threshold=0.90)
            if result:
                return result

    # Strategy 4: municipality claims extracted by web enrichment.
    for claim in entry.get('web_enrichment', {}).get('claims', []):
        if claim.get('claim_type') == 'municipality':
            result = _try_match(claim.get('claim_value', ''), municipalities,
                                'web_claim_municipality',
                                'web_claim_municipality_fuzzy', 0.90)
            if result:
                return result

    # Strategies 5-7: 'municipality' fields from other entry sections.
    for section, exact_method, confidence in (
        ('location', 'location_municipality', 0.90),
        ('manual_location_override', 'manual_override_municipality', 0.95),
        ('zcbs_enrichment', 'zcbs_municipality', 0.90),
    ):
        name = entry.get(section, {}).get('municipality', '')
        result = _try_match(name, municipalities,
                            exact_method, exact_method + '_fuzzy', confidence)
        if result:
            return result

    return None, 'no_match', 0.0


def _try_match(
    raw_name: str,
    municipalities: Dict[str, Dict[str, Any]],
    exact_method: str,
    fuzzy_method: str,
    confidence: float,
    fuzzy_threshold: float = 0.85,
) -> Optional[Tuple[Dict[str, Any], str, float]]:
    """Try one candidate name against the municipality lookup.

    Shared by every matching strategy in find_municipality_match (the
    original repeated this exact/fuzzy pattern seven times inline).

    Args:
        raw_name: Candidate name (not yet normalized); empty never matches.
        municipalities: Normalized name -> data lookup.
        exact_method: Method label reported for an exact match.
        fuzzy_method: Method label reported for a fuzzy match.
        confidence: Confidence for an exact match; a fuzzy match reports
            its similarity score scaled by this value.
        fuzzy_threshold: Minimum similarity score for a fuzzy match.

    Returns:
        (matched_data, method, confidence) on success, else None.
    """
    if not raw_name:
        return None
    normalized = normalize_municipality_name(raw_name)
    if normalized in municipalities:
        return municipalities[normalized], exact_method, confidence
    best_key, score = fuzzy_match_municipality(normalized, municipalities)
    if best_key and score >= fuzzy_threshold:
        return municipalities[best_key], fuzzy_method, score * confidence
    return None
def fuzzy_match_municipality(
    search_term: str,
    municipalities: Dict[str, Dict[str, Any]],
    threshold: float = 0.80
) -> Tuple[Optional[str], float]:
    """Find the best fuzzy match for a municipality name.

    Args:
        search_term: Normalized name to search for.
        municipalities: Lookup whose keys are normalized names.
        threshold: Minimum SequenceMatcher ratio to accept.

    Returns:
        Tuple of (matched_key, similarity_score), or (None, 0.0) when
        the search term is empty or nothing reaches the threshold.
    """
    if not search_term or not municipalities:
        return None, 0.0
    scored = (
        (key, SequenceMatcher(None, search_term, key).ratio())
        for key in municipalities
    )
    # max() keeps the first key among ties, matching dict iteration order.
    winner, top_score = max(scored, key=lambda pair: pair[1])
    if top_score >= threshold:
        return winner, top_score
    return None, 0.0
def create_enrichment_section(
    match_data: Dict[str, Any],
    match_method: str,
    confidence: float
) -> Dict[str, Any]:
    """Build the genealogiewerkbalk_enrichment section for an entry.

    Args:
        match_data: Municipality record from the Genealogiewerkbalk lookup.
        match_method: Strategy name that produced the match.
        confidence: Match confidence score.

    Returns:
        Dict ready to store under the entry's
        'genealogiewerkbalk_enrichment' key.
    """
    municipal_archive = {
        'name': match_data['archief_gemeente'],
        'website': match_data['website_gemeentearchief'] or None,
        'isil': match_data['isil'],
    }
    section = {
        'source': 'Genealogiewerkbalk.nl Municipality Archives Registry',
        'source_url': 'https://www.genealogiewerkbalk.nl/archieven.html',
        'data_url': CSV_URL,
        'data_tier': 'TIER_2_VERIFIED',
        'enrichment_timestamp': datetime.now(timezone.utc).isoformat(),
        'match_method': match_method,
        'match_confidence': round(confidence, 4),
        # Municipality info
        'municipality': {
            'name': match_data['gemeentenaam'],
            'code': match_data['gemeentecode'],
        },
        # Municipal archive info
        'municipal_archive': municipal_archive,
        # Province info
        'province': {
            'name': match_data['provincienaam'],
            'code': match_data['provinciecode'],
        },
        # Provincial archive info
        'provincial_archive': {
            'name': match_data['archief_provincie'],
            'website': match_data['website_provinciaal_archief'] or None,
        },
    }
    # Add extra info if present
    if match_data.get('extra_info'):
        section['extra_info'] = match_data['extra_info']
    # Keep the raw "geen*" code as a note when it differs from the parsed ISIL
    if match_data.get('isil_raw') and match_data['isil_raw'] != match_data['isil']:
        municipal_archive['isil_note'] = match_data['isil_raw']
    return section
def update_provenance(entry: Dict[str, Any], match_method: str) -> None:
    """Record the Genealogiewerkbalk source in the entry's provenance.

    Creates the provenance scaffold when absent, registers (or replaces)
    the genealogiewerkbalk source record with its extracted claims, and
    tags the source under TIER_2_VERIFIED in the data tier summary.

    Args:
        entry: Entry dict, mutated in place.
        match_method: Strategy name that produced the match.
    """
    if 'provenance' not in entry:
        entry['provenance'] = {
            'schema_version': '1.0.0',
            'generated_at': datetime.now(timezone.utc).isoformat(),
            'sources': {}
        }
    provenance = entry['provenance']

    # Register the genealogiewerkbalk source (single-element history list).
    provenance.setdefault('sources', {})['genealogiewerkbalk'] = [{
        'source_type': 'genealogiewerkbalk_registry',
        'fetch_timestamp': datetime.now(timezone.utc).isoformat(),
        'data_url': CSV_URL,
        'match_method': match_method,
        'claims_extracted': [
            'municipality_name',
            'municipality_code',
            'municipal_archive_name',
            'municipal_archive_website',
            'municipal_archive_isil',
            'province_name',
            'province_code',
            'provincial_archive_name',
            'provincial_archive_website',
        ]
    }]

    # Tag the source in the tier summary exactly once.
    tier_summary = provenance.setdefault('data_tier_summary', {})
    tier_2 = tier_summary.setdefault('TIER_2_VERIFIED', [])
    if 'genealogiewerkbalk_registry' not in tier_2:
        tier_2.append('genealogiewerkbalk_registry')
def refresh_csv() -> bool:
    """Download a fresh registry CSV from Google Sheets into CSV_FILE.

    Returns:
        True when the download and a sanity read-back succeeded,
        False otherwise (the error is logged, not raised).
    """
    logger.info(f"Downloading fresh CSV from: {CSV_URL}")
    try:
        # Make sure the target directory exists before writing.
        SOURCES_DIR.mkdir(parents=True, exist_ok=True)
        urllib.request.urlretrieve(CSV_URL, CSV_FILE)
        # Sanity check: the downloaded file must parse as CSV rows.
        with open(CSV_FILE, 'r', encoding='utf-8') as handle:
            row_count = sum(1 for _ in csv.DictReader(handle))
        logger.info(f"Downloaded CSV with {row_count} municipalities")
        return True
    except Exception as e:
        logger.error(f"Failed to download CSV: {e}")
        return False
def process_entry(
    entry_path: Path,
    municipalities: Dict[str, Dict[str, Any]],
    dry_run: bool = False,
    force: bool = False
) -> Tuple[str, Optional[str]]:
    """Process a single entry YAML file.

    Args:
        entry_path: Path to the entry file.
        municipalities: Normalized name -> municipality data lookup.
        dry_run: When True, report the match without writing.
        force: When True, re-enrich even if already enriched.

    Returns:
        Tuple of (status, match_info); status is one of 'enriched',
        'would_enrich' (dry run), 'already_enriched', 'no_match', 'error'.
    """
    try:
        with open(entry_path, 'r', encoding='utf-8') as handle:
            entry = yaml.safe_load(handle)
        if not entry:
            return 'error', 'Empty file'

        # Skip entries that were enriched before, unless forced.
        if 'genealogiewerkbalk_enrichment' in entry and not force:
            return 'already_enriched', None

        match_data, match_method, confidence = find_municipality_match(entry, municipalities)
        if not match_data:
            return 'no_match', None

        gemeente = match_data['gemeentenaam']
        archive = match_data['archief_gemeente']
        if dry_run:
            return 'would_enrich', f"{gemeente} -> {archive} ({match_method}, {confidence:.2f})"

        # Attach the enrichment, record provenance, and persist the entry.
        entry['genealogiewerkbalk_enrichment'] = create_enrichment_section(
            match_data, match_method, confidence)
        update_provenance(entry, match_method)
        with open(entry_path, 'w', encoding='utf-8') as handle:
            yaml.dump(entry, handle, allow_unicode=True,
                      default_flow_style=False, sort_keys=False)
        return 'enriched', f"{gemeente} -> {archive} ({match_method})"
    except Exception as e:
        logger.error(f"Error processing {entry_path.name}: {e}")
        return 'error', str(e)
def main():
    """CLI entry point: parse args, load registry data, enrich entries."""
    parser = argparse.ArgumentParser(
        description='Enrich NDE entries with Genealogiewerkbalk municipality archive data'
    )
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without making changes')
    parser.add_argument('--entry', type=str,
                        help='Process only a specific entry (e.g., "0016" or "0016_Q81181377")')
    parser.add_argument('--force', action='store_true',
                        help='Re-enrich even if already enriched')
    parser.add_argument('--refresh-csv', action='store_true',
                        help='Download fresh CSV before processing')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Show detailed output')
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # Refresh the cached CSV when requested, or when no cache exists yet.
    if args.refresh_csv or not CSV_FILE.exists():
        if not refresh_csv():
            logger.error("Failed to get CSV data")
            sys.exit(1)

    municipalities = load_genealogiewerkbalk_data(CSV_FILE)
    if not municipalities:
        logger.error("No municipality data loaded")
        sys.exit(1)

    # Select the entry files to process.
    if args.entry:
        pattern = f"{args.entry}*.yaml"
        entry_files = list(ENTRIES_DIR.glob(pattern))
        if not entry_files:
            logger.error(f"No entry files found matching: {pattern}")
            sys.exit(1)
    else:
        entry_files = sorted(ENTRIES_DIR.glob("*.yaml"))

    logger.info(f"Processing {len(entry_files)} entry files...")

    stats = {
        'total': len(entry_files),
        'enriched': 0,
        'already_enriched': 0,
        'no_match': 0,
        'error': 0,
    }
    for entry_path in entry_files:
        status, info = process_entry(
            entry_path, municipalities,
            dry_run=args.dry_run, force=args.force
        )
        if status in ('enriched', 'would_enrich'):
            stats['enriched'] += 1
            prefix = '[DRY-RUN] Would enrich' if args.dry_run else 'Enriched'
            logger.info(f"{prefix}: {entry_path.name} - {info}")
        elif status == 'already_enriched':
            stats['already_enriched'] += 1
            if args.verbose:
                logger.debug(f"Already enriched: {entry_path.name}")
        elif status == 'no_match':
            stats['no_match'] += 1
            if args.verbose:
                logger.debug(f"No match: {entry_path.name}")
        elif status == 'error':
            stats['error'] += 1
            logger.warning(f"Error: {entry_path.name} - {info}")

    # Summary
    logger.info("\n=== Enrichment Summary ===")
    logger.info(f"Total files: {stats['total']}")
    logger.info(f"Enriched: {stats['enriched']}")
    logger.info(f"Already enriched: {stats['already_enriched']}")
    logger.info(f"No match: {stats['no_match']}")
    logger.info(f"Errors: {stats['error']}")
    if args.dry_run:
        logger.info("\n[DRY-RUN] No changes were made.")


if __name__ == '__main__':
    main()