#!/usr/bin/env python3
|
|
"""
|
|
Merge NRW Archives with German Unified Dataset
|
|
|
|
Integrates the 441 NRW archives from archive.nrw.de with the existing
|
|
German unified dataset (20,761 institutions from ISIL + DDB).
|
|
|
|
Features:
|
|
- Deduplication by name fuzzy matching (>90% similarity)
|
|
- Geocoding for cities using Nominatim API
|
|
- Preserves existing data quality (ISIL codes, coordinates)
|
|
- Adds NRW-specific metadata
|
|
|
|
Input:
|
|
- data/isil/germany/german_institutions_unified_20251119_181857.json (20,761)
|
|
- data/isil/germany/nrw_archives_fast_20251119_203700.json (441)
|
|
|
|
Output:
|
|
- data/isil/germany/german_institutions_unified_v2_{timestamp}.json
|
|
- Merge statistics report
|
|
|
|
Author: OpenCode AI Agent
|
|
Date: 2025-11-19
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
from rapidfuzz import fuzz
|
|
import requests
|
|
|
|
# Constants
# Nominatim's usage policy requires a descriptive User-Agent header and at
# most one request per second; the values below encode both requirements.
NOMINATIM_API = "https://nominatim.openstreetmap.org/search"
NOMINATIM_HEADERS = {
    # Identifies this project to the Nominatim operators (required by policy).
    "User-Agent": "GLAM-Data-Project/1.0 (heritage-institutions-research)"
}
FUZZY_MATCH_THRESHOLD = 90.0  # minimum fuzz.ratio score (0-100) to treat two names as duplicates
GEOCODING_DELAY = 1.0  # seconds between Nominatim requests (policy: max 1 request/second)
|
|
|
|
class NominatimGeocoder:
    """Geocode German cities using the Nominatim API.

    Successful lookups and definitive "not found" answers are cached in
    memory; transient failures (HTTP errors, network problems) are NOT
    cached, so a later call for the same city can retry.  Requests are
    throttled to respect Nominatim's 1-request/second usage policy.
    """

    def __init__(self, delay: float = GEOCODING_DELAY):
        # Minimum seconds between consecutive API requests.
        self.delay = delay
        # "city|country|region" -> (lat, lon), or None for a definitive miss.
        self.cache: Dict[str, Optional[Tuple[float, float]]] = {}
        # time.time() of the most recent request (0.0 = no request yet).
        self.last_request = 0.0

    def geocode(self, city: str, country: str = "DE", region: str = "Nordrhein-Westfalen") -> Optional[Tuple[float, float]]:
        """
        Geocode a city to (latitude, longitude).

        Args:
            city: City name
            country: ISO country code (default: DE)
            region: State/region name for disambiguation

        Returns:
            (lat, lon) tuple or None if not found (or on a transient error)
        """
        # Check cache (hits include cached "not found" answers).
        cache_key = f"{city}|{country}|{region}"
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Rate limiting: sleep off the remainder of the per-request delay.
        elapsed = time.time() - self.last_request
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)

        # Build query
        query = f"{city}, {region}, {country}"
        params = {
            "q": query,
            "format": "json",
            "limit": 1,
            "addressdetails": 1
        }

        try:
            response = requests.get(
                NOMINATIM_API,
                params=params,
                headers=NOMINATIM_HEADERS,
                timeout=10
            )
            self.last_request = time.time()

            if response.status_code == 200:
                results = response.json()
                if results:
                    lat = float(results[0]["lat"])
                    lon = float(results[0]["lon"])
                    coords = (lat, lon)
                    self.cache[cache_key] = coords
                    return coords

                # Definitive empty answer from the API: cache the miss so
                # the same city is not re-queried this run.
                self.cache[cache_key] = None
                return None

            # Non-200 status (rate limiting, server error) is transient:
            # do NOT cache, so a later call may retry and succeed.
            return None

        except (requests.RequestException, ValueError, KeyError) as e:
            # Covers network failures, malformed JSON, and missing fields.
            # Transient by nature — deliberately not cached (the original
            # cached these, permanently poisoning the city for the run).
            print(f" ⚠️ Geocoding error for {city}: {e}")
            return None
|
|
|
|
def normalize_name(name: str) -> str:
    """Return *name* trimmed of surrounding whitespace and lowercased."""
    trimmed = name.strip()
    return trimmed.lower()
|
|
|
|
def find_duplicate(nrw_inst: Dict, unified_institutions: List[Dict]) -> Optional[Dict]:
    """
    Find if NRW institution already exists in unified dataset.

    Uses fuzzy name matching (>= FUZZY_MATCH_THRESHOLD % similarity) against
    each institution's primary name and all of its alternative names.

    Args:
        nrw_inst: NRW institution record (must contain a "name" key)
        unified_institutions: List of existing unified records

    Returns:
        First matching institution, or None if nothing scores above threshold
    """
    nrw_name = normalize_name(nrw_inst["name"])

    for inst in unified_institutions:
        # Compare against the primary name first, then any alternative names
        # (same precedence as the original record order).
        candidates = [inst["name"], *inst.get("alternative_names", [])]
        for candidate in candidates:
            # score_cutoff lets rapidfuzz abandon clearly-different strings
            # early (returns 0.0 below the cutoff), which matters when this
            # scan runs against ~20k records for every NRW archive.
            score = fuzz.ratio(
                nrw_name,
                normalize_name(candidate),
                score_cutoff=FUZZY_MATCH_THRESHOLD,
            )
            if score >= FUZZY_MATCH_THRESHOLD:
                return inst

    return None
|
|
|
|
def _build_unified_record(nrw_inst: Dict, city: Optional[str],
                          lat: Optional[float], lon: Optional[float]) -> Dict:
    """Convert a raw NRW archive record into the unified dataset schema."""
    return {
        "_data_sources": ["NRW"],
        "_match_type": "NEW",
        "isil": nrw_inst.get("isil_code"),
        "name": nrw_inst["name"],
        "alternative_names": [],
        "address": {
            "street": None,
            "city": city,
            "postal_code": None,
            "country": "DE",
            "region": "Nordrhein-Westfalen",
            # `is not None` (not plain truthiness) so a legitimate 0.0
            # coordinate is preserved rather than silently dropped.
            "latitude": str(lat) if lat is not None else None,
            "longitude": str(lon) if lon is not None else None
        },
        "contact": {
            "phone": None,
            "fax": None,
            "email": None
        },
        "urls": [
            {
                "url": nrw_inst["url"],
                "type": "SOURCE",
                "label": "archive.nrw.de"
            }
        ],
        "institution_type": nrw_inst["institution_type"],
        "institution_category": None,
        "description": None,
        "isil_assigned_date": None,
        "wikidata_id": None,
        "viaf_id": None,
        "gnd_id": None,
        "harvest_metadata": {
            "source": nrw_inst["source"],
            "harvest_date": nrw_inst["harvest_date"],
            "notes": nrw_inst["notes"]
        }
    }


def merge_nrw_archives():
    """Merge NRW archive records into the unified German institutions dataset.

    Loads both JSON inputs, skips NRW records that fuzzy-match an existing
    institution, geocodes the remainder by city via Nominatim, writes the
    merged dataset and a statistics report (timestamped filenames), and
    prints a progress/summary log.
    """

    print("=" * 80)
    print("NRW Archives Merge with German Unified Dataset")
    print("=" * 80)
    print()

    # Load unified dataset (ISIL + DDB)
    unified_path = Path("data/isil/germany/german_institutions_unified_20251119_181857.json")
    print(f"📂 Loading unified dataset: {unified_path.name}")

    with open(unified_path, 'r', encoding='utf-8') as f:
        unified_data = json.load(f)

    unified_institutions = unified_data["institutions"]
    print(f" ✅ Loaded {len(unified_institutions):,} institutions")
    print()

    # Load NRW archives (a flat JSON list)
    nrw_path = Path("data/isil/germany/nrw_archives_fast_20251119_203700.json")
    print(f"📂 Loading NRW archives: {nrw_path.name}")

    with open(nrw_path, 'r', encoding='utf-8') as f:
        nrw_archives = json.load(f)

    print(f" ✅ Loaded {len(nrw_archives):,} NRW archives")
    print()

    # Geocoder is shared across the loop so its cache and rate limiter apply
    # to the whole run.
    geocoder = NominatimGeocoder()

    # Statistics accumulated while processing
    stats = {
        "duplicates_found": 0,
        "new_institutions": 0,
        "geocoded": 0,
        "geocoding_failed": 0,
        "no_city": 0
    }

    print("🔄 Processing NRW archives...")
    print()

    new_institutions = []

    for i, nrw_inst in enumerate(nrw_archives, 1):
        # Progress report every 50 records and on the final one
        if i % 50 == 0 or i == len(nrw_archives):
            print(f" Progress: {i}/{len(nrw_archives)} ({i/len(nrw_archives)*100:.1f}%)")

        # Skip records that already exist in the unified dataset.
        duplicate = find_duplicate(nrw_inst, unified_institutions)
        if duplicate:
            stats["duplicates_found"] += 1
            continue

        # New institution — geocode by city when one is present.
        city = nrw_inst.get("city")
        lat, lon = None, None

        if city and city.strip():
            coords = geocoder.geocode(city)
            if coords:
                lat, lon = coords
                stats["geocoded"] += 1
            else:
                stats["geocoding_failed"] += 1
        else:
            stats["no_city"] += 1

        new_institutions.append(_build_unified_record(nrw_inst, city, lat, lon))
        stats["new_institutions"] += 1

    print()
    print("✅ Processing complete")
    print()

    # Merge datasets (existing records keep their position; new ones append)
    print("🔗 Merging datasets...")
    merged_institutions = unified_institutions + new_institutions
    total_count = len(merged_institutions)
    print(f" Total institutions: {total_count:,}")
    print()

    # Create output payload with provenance metadata
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_data = {
        "metadata": {
            "source": "Unified German Heritage Institutions (ISIL + DDB + NRW)",
            "generation_date": datetime.now(timezone.utc).isoformat(),
            "source_files": {
                "unified": unified_path.name,
                "nrw": nrw_path.name
            },
            "total_institutions": total_count,
            "license": "CC0 1.0 Universal (Public Domain)",
            "merge_stats": stats
        },
        "institutions": merged_institutions
    }

    # Write merged dataset
    output_path = Path(f"data/isil/germany/german_institutions_unified_v2_{timestamp}.json")
    print(f"💾 Writing merged dataset: {output_path.name}")

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)

    print(f" ✅ Written {total_count:,} institutions")
    print()

    # Print statistics
    print("=" * 80)
    print("MERGE STATISTICS")
    print("=" * 80)
    print()
    print(f"Input Datasets:")
    print(f" Unified (ISIL + DDB): {len(unified_institutions):>6,} institutions")
    print(f" NRW Archives: {len(nrw_archives):>6,} archives")
    print()
    print(f"Processing Results:")
    print(f" Duplicates found: {stats['duplicates_found']:>6,} (skipped)")
    print(f" New institutions: {stats['new_institutions']:>6,} (added)")
    print()
    print(f"Geocoding Results:")
    print(f" Successfully geocoded: {stats['geocoded']:>6,}")
    print(f" Geocoding failed: {stats['geocoding_failed']:>6,}")
    print(f" No city data: {stats['no_city']:>6,}")
    print()
    print(f"Output Dataset:")
    print(f" Total institutions: {total_count:>6,}")
    print()

    # Geocoding coverage before vs after the merge
    geocoded_before = sum(1 for inst in unified_institutions
                          if inst.get("address", {}).get("latitude"))
    geocoded_after = geocoded_before + stats["geocoded"]
    geocode_pct_before = (geocoded_before / len(unified_institutions)) * 100 if len(unified_institutions) > 0 else 0
    geocode_pct_after = (geocoded_after / total_count) * 100 if total_count > 0 else 0

    print(f"Geocoding Coverage:")
    print(f" Before: {geocoded_before:,}/{len(unified_institutions):,} ({geocode_pct_before:.1f}%)")
    print(f" After: {geocoded_after:,}/{total_count:,} ({geocode_pct_after:.1f}%)")
    # NOTE: the :+.1f format already emits the sign; the original prefixed a
    # literal '+' as well, producing output like "+-0.5" for decreases.
    print(f" Change: {geocode_pct_after - geocode_pct_before:+.1f} percentage points")
    print()

    print("=" * 80)
    print("✅ MERGE COMPLETE")
    print("=" * 80)
    print()
    print(f"Output: {output_path}")
    print()

    # Write stats report alongside the dataset
    stats_path = Path(f"data/isil/germany/german_unification_v2_stats_{timestamp}.json")
    with open(stats_path, 'w', encoding='utf-8') as f:
        json.dump({
            "merge_date": datetime.now(timezone.utc).isoformat(),
            "input_files": {
                "unified": str(unified_path),
                "nrw": str(nrw_path)
            },
            "statistics": stats,
            "totals": {
                "before": len(unified_institutions),
                "nrw_archives": len(nrw_archives),
                "after": total_count,
                "new_added": stats["new_institutions"]
            },
            "geocoding": {
                "geocoded_before": geocoded_before,
                "geocoded_after": geocoded_after,
                "coverage_before_pct": round(geocode_pct_before, 2),
                "coverage_after_pct": round(geocode_pct_after, 2),
                "improvement_pct": round(geocode_pct_after - geocode_pct_before, 2)
            }
        }, f, ensure_ascii=False, indent=2)

    print(f"📊 Statistics report: {stats_path.name}")
    print()
|
|
|
|
if __name__ == "__main__":
    # Run the merge only when executed as a script, not on import.
    merge_nrw_archives()
|