glam/scripts/scrapers/merge_nrw_to_german_dataset.py
#!/usr/bin/env python3
"""
Merge NRW Archives with German Unified Dataset
Integrates the 441 NRW archives from archive.nrw.de with the existing
German unified dataset (20,761 institutions from ISIL + DDB).
Features:
- Deduplication by name fuzzy matching (>90% similarity)
- Geocoding for cities using Nominatim API
- Preserves existing data quality (ISIL codes, coordinates)
- Adds NRW-specific metadata
Input:
- data/isil/germany/german_institutions_unified_20251119_181857.json (20,761)
- data/isil/germany/nrw_archives_fast_20251119_203700.json (441)
Output:
- data/isil/germany/german_institutions_unified_v2_{timestamp}.json
- Merge statistics report
Author: OpenCode AI Agent
Date: 2025-11-19
"""
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from rapidfuzz import fuzz
import requests

# Constants
NOMINATIM_API = "https://nominatim.openstreetmap.org/search"
NOMINATIM_HEADERS = {
    "User-Agent": "GLAM-Data-Project/1.0 (heritage-institutions-research)"
}
FUZZY_MATCH_THRESHOLD = 90.0  # >=90% similarity counts as a duplicate
GEOCODING_DELAY = 1.0  # Nominatim usage policy: max 1 request/second
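
# The public Nominatim instance requires a descriptive User-Agent and allows at
# most one request per second; NOMINATIM_HEADERS and GEOCODING_DELAY follow that
# policy. A heavier workload would call for a self-hosted Nominatim instance.
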
class NominatimGeocoder:
    """Geocode German cities using the Nominatim API."""

    def __init__(self, delay: float = GEOCODING_DELAY):
        self.delay = delay
        self.cache: Dict[str, Optional[Tuple[float, float]]] = {}
        self.last_request = 0.0

    def geocode(self, city: str, country: str = "DE", region: str = "Nordrhein-Westfalen") -> Optional[Tuple[float, float]]:
        """
        Geocode a city to (latitude, longitude).

        Args:
            city: City name
            country: ISO country code (default: DE)
            region: State/region name for disambiguation

        Returns:
            (lat, lon) tuple or None if not found
        """
        # Check cache (failed lookups are cached too, so they are not retried)
        cache_key = f"{city}|{country}|{region}"
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Rate limiting: wait until at least `delay` seconds since the last request
        elapsed = time.time() - self.last_request
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)

        # Build query
        query = f"{city}, {region}, {country}"
        params = {
            "q": query,
            "format": "json",
            "limit": 1,
            "addressdetails": 1
        }
        try:
            response = requests.get(
                NOMINATIM_API,
                params=params,
                headers=NOMINATIM_HEADERS,
                timeout=10
            )
            self.last_request = time.time()
            if response.status_code == 200:
                results = response.json()
                if results:
                    lat = float(results[0]["lat"])
                    lon = float(results[0]["lon"])
                    coords = (lat, lon)
                    self.cache[cache_key] = coords
                    return coords
            # Not found (empty result or non-200 response)
            self.cache[cache_key] = None
            return None
        except (requests.RequestException, ValueError, KeyError) as e:
            print(f" ⚠️ Geocoding error for {city}: {e}")
            self.cache[cache_key] = None
            return None
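
# Minimal usage sketch (coordinates approximate, for illustration only):
#   geocoder = NominatimGeocoder()
#   geocoder.geocode("Köln")  # -> roughly (50.94, 6.96); result is cached
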
def normalize_name(name: str) -> str:
    """Normalize an institution name for comparison: lowercase, collapse whitespace."""
    return " ".join(name.lower().split())
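# e.g. normalize_name("  Stadtarchiv   Köln ") -> "stadtarchiv köln"
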
def find_duplicate(nrw_inst: Dict, unified_institutions: List[Dict]) -> Optional[Dict]:
    """
    Find whether an NRW institution already exists in the unified dataset.
    Uses fuzzy name matching (>=90% similarity).

    Args:
        nrw_inst: NRW institution record
        unified_institutions: List of existing unified records

    Returns:
        Matching institution or None
    """
    nrw_name = normalize_name(nrw_inst["name"])
    for inst in unified_institutions:
        # Check primary name
        unified_name = normalize_name(inst["name"])
        score = fuzz.ratio(nrw_name, unified_name)
        if score >= FUZZY_MATCH_THRESHOLD:
            return inst
        # Check alternative names
        for alt in inst.get("alternative_names", []):
            alt_normalized = normalize_name(alt)
            score = fuzz.ratio(nrw_name, alt_normalized)
            if score >= FUZZY_MATCH_THRESHOLD:
                return inst
    return None
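
# Note on cost: find_duplicate() is a linear scan, so the full merge performs
# roughly 441 x 20,761 ≈ 9.2 million primary-name comparisons, plus alternative
# names. That is acceptable at this size; a larger merge would likely want
# blocking (e.g. by city) or rapidfuzz's process.extractOne() instead.
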
def merge_nrw_archives():
    """Main merge function."""
    print("=" * 80)
    print("NRW Archives Merge with German Unified Dataset")
    print("=" * 80)
    print()

    # Load unified dataset
    unified_path = Path("data/isil/germany/german_institutions_unified_20251119_181857.json")
    print(f"📂 Loading unified dataset: {unified_path.name}")
    with open(unified_path, 'r', encoding='utf-8') as f:
        unified_data = json.load(f)
    unified_institutions = unified_data["institutions"]
    print(f" ✅ Loaded {len(unified_institutions):,} institutions")
    print()

    # Load NRW archives
    nrw_path = Path("data/isil/germany/nrw_archives_fast_20251119_203700.json")
    print(f"📂 Loading NRW archives: {nrw_path.name}")
    with open(nrw_path, 'r', encoding='utf-8') as f:
        nrw_archives = json.load(f)
    print(f" ✅ Loaded {len(nrw_archives):,} NRW archives")
    print()

    # Initialize geocoder
    geocoder = NominatimGeocoder()

    # Statistics
    stats = {
        "duplicates_found": 0,
        "new_institutions": 0,
        "geocoded": 0,
        "geocoding_failed": 0,
        "no_city": 0
    }

    # Process NRW archives
    print("🔄 Processing NRW archives...")
    print()
    new_institutions = []
    for i, nrw_inst in enumerate(nrw_archives, 1):
        # Progress
        if i % 50 == 0 or i == len(nrw_archives):
            print(f" Progress: {i}/{len(nrw_archives)} ({i/len(nrw_archives)*100:.1f}%)")

        # Check for duplicate; skip records already in the dataset
        duplicate = find_duplicate(nrw_inst, unified_institutions)
        if duplicate:
            stats["duplicates_found"] += 1
            continue

        # New institution - geocode if it has a city
        city = nrw_inst.get("city")
        if city and city.strip():
            coords = geocoder.geocode(city)
            if coords:
                lat, lon = coords
                stats["geocoded"] += 1
            else:
                lat, lon = None, None
                stats["geocoding_failed"] += 1
        else:
            lat, lon = None, None
            stats["no_city"] += 1
        # Create a record in the unified format
        unified_record = {
            "_data_sources": ["NRW"],
            "_match_type": "NEW",
            "isil": nrw_inst.get("isil_code"),
            "name": nrw_inst["name"],
            "alternative_names": [],
            "address": {
                "street": None,
                "city": city,
                "postal_code": None,
                "country": "DE",
                "region": "Nordrhein-Westfalen",
                # Coordinates kept as strings for consistency with the existing
                # dataset; `is not None` avoids dropping a legitimate 0.0 value
                "latitude": str(lat) if lat is not None else None,
                "longitude": str(lon) if lon is not None else None
            },
            "contact": {
                "phone": None,
                "fax": None,
                "email": None
            },
            "urls": [
                {
                    "url": nrw_inst["url"],
                    "type": "SOURCE",
                    "label": "archive.nrw.de"
                }
            ],
            "institution_type": nrw_inst["institution_type"],
            "institution_category": None,
            "description": None,
            "isil_assigned_date": None,
            "wikidata_id": None,
            "viaf_id": None,
            "gnd_id": None,
            "harvest_metadata": {
                "source": nrw_inst["source"],
                "harvest_date": nrw_inst["harvest_date"],
                "notes": nrw_inst["notes"]
            }
        }
        new_institutions.append(unified_record)
        stats["new_institutions"] += 1

    print()
    print("✅ Processing complete")
    print()
    # Merge datasets
    print("🔗 Merging datasets...")
    merged_institutions = unified_institutions + new_institutions
    total_count = len(merged_institutions)
    print(f" Total institutions: {total_count:,}")
    print()

    # Create output
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_data = {
        "metadata": {
            "source": "Unified German Heritage Institutions (ISIL + DDB + NRW)",
            "generation_date": datetime.now(timezone.utc).isoformat(),
            "source_files": {
                "unified": unified_path.name,
                "nrw": nrw_path.name
            },
            "total_institutions": total_count,
            "license": "CC0 1.0 Universal (Public Domain)",
            "merge_stats": stats
        },
        "institutions": merged_institutions
    }

    # Write output
    output_path = Path(f"data/isil/germany/german_institutions_unified_v2_{timestamp}.json")
    print(f"💾 Writing merged dataset: {output_path.name}")
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)
    print(f" ✅ Written {total_count:,} institutions")
    print()
    # Print statistics
    print("=" * 80)
    print("MERGE STATISTICS")
    print("=" * 80)
    print()
    print("Input Datasets:")
    print(f" Unified (ISIL + DDB): {len(unified_institutions):>6,} institutions")
    print(f" NRW Archives: {len(nrw_archives):>6,} archives")
    print()
    print("Processing Results:")
    print(f" Duplicates found: {stats['duplicates_found']:>6,} (skipped)")
    print(f" New institutions: {stats['new_institutions']:>6,} (added)")
    print()
    print("Geocoding Results:")
    print(f" Successfully geocoded: {stats['geocoded']:>6,}")
    print(f" Geocoding failed: {stats['geocoding_failed']:>6,}")
    print(f" No city data: {stats['no_city']:>6,}")
    print()
    print("Output Dataset:")
    print(f" Total institutions: {total_count:>6,}")
    print()

    # Calculate coverage improvement
    geocoded_before = sum(1 for inst in unified_institutions
                          if inst.get("address", {}).get("latitude"))
    geocoded_after = geocoded_before + stats["geocoded"]
    geocode_pct_before = (geocoded_before / len(unified_institutions)) * 100 if len(unified_institutions) > 0 else 0
    geocode_pct_after = (geocoded_after / total_count) * 100 if total_count > 0 else 0
    print("Geocoding Coverage:")
    print(f" Before: {geocoded_before:,}/{len(unified_institutions):,} ({geocode_pct_before:.1f}%)")
    print(f" After: {geocoded_after:,}/{total_count:,} ({geocode_pct_after:.1f}%)")
    # The :+ format flag already emits the sign, so no literal '+' prefix
    print(f" Change: {geocode_pct_after - geocode_pct_before:+.1f} percentage points")
    print()
    print("=" * 80)
    print("✅ MERGE COMPLETE")
    print("=" * 80)
    print()
    print(f"Output: {output_path}")
    print()
    # Write stats report
    stats_path = Path(f"data/isil/germany/german_unification_v2_stats_{timestamp}.json")
    with open(stats_path, 'w', encoding='utf-8') as f:
        json.dump({
            "merge_date": datetime.now(timezone.utc).isoformat(),
            "input_files": {
                "unified": str(unified_path),
                "nrw": str(nrw_path)
            },
            "statistics": stats,
            "totals": {
                "before": len(unified_institutions),
                "nrw_archives": len(nrw_archives),
                "after": total_count,
                "new_added": stats["new_institutions"]
            },
            "geocoding": {
                "geocoded_before": geocoded_before,
                "geocoded_after": geocoded_after,
                "coverage_before_pct": round(geocode_pct_before, 2),
                "coverage_after_pct": round(geocode_pct_after, 2),
                "improvement_pct": round(geocode_pct_after - geocode_pct_before, 2)
            }
        }, f, ensure_ascii=False, indent=2)
    print(f"📊 Statistics report: {stats_path.name}")
    print()
if __name__ == "__main__":
merge_nrw_archives()