glam/scripts/scrapers/crossreference_german_data.py
2025-11-19 23:25:22 +01:00

433 lines
15 KiB
Python

#!/usr/bin/env python3
"""
German Heritage Institution Data Cross-Reference
Merges DDB institutions with ISIL registry data
This script cross-references two German datasets:
1. ISIL Registry (16,979 institutions) - DNB/Staatsbibliothek zu Berlin
2. DDB Institutions (4,937 institutions) - Deutsche Digitale Bibliothek
Matching strategy:
- Primary: ISIL code (if available in DDB data)
- Secondary: Fuzzy name matching
- Tertiary: Location matching (city + postal code)
Outputs:
- Unified German dataset
- Statistics report
- Match quality analysis
Author: OpenCode + MCP Tools
Date: 2025-11-19
"""
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import List, Dict, Tuple, Optional
from collections import defaultdict
from rapidfuzz import fuzz
# Configuration
# NOTE(review): absolute, machine-specific path — consider an env var or CLI arg.
DATA_DIR = Path("/Users/kempersc/apps/glam/data/isil/germany")
# Timestamped input snapshots produced by the two scrapers.
ISIL_FILE = DATA_DIR / "german_isil_complete_20251119_134939.json"
DDB_FILE = DATA_DIR / "ddb_institutions_all_sectors_20251119_191121.json"
# Output paths are stamped at import time. NOTE(review): each line calls
# datetime.now() separately, so the two filenames can carry different
# timestamps if the second rolls over between the two evaluations.
OUTPUT_FILE = DATA_DIR / f"german_institutions_unified_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
STATS_FILE = DATA_DIR / f"german_unification_stats_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
# Fuzzy matching thresholds
# Minimum rapidfuzz ratio (0-100) for a name match to be considered.
NAME_MATCH_THRESHOLD = 85
# NOTE(review): defined but never referenced anywhere in this file.
LOCATION_MATCH_THRESHOLD = 80
def load_isil_data() -> List[Dict]:
    """Read the ISIL registry JSON export and return its list of records.

    Returns:
        The ``records`` list from the export; the surrounding metadata is
        only echoed to stdout.
    """
    print(f"📄 Loading ISIL registry data from {ISIL_FILE.name}...")
    data = json.loads(ISIL_FILE.read_text(encoding='utf-8'))
    institutions = data['records']
    print(f"✅ Loaded {len(institutions)} institutions from ISIL registry")
    print(f" Metadata: {data['metadata']['source']}")
    return institutions
def load_ddb_data() -> List[Dict]:
    """Read the DDB institutions JSON export and return its institution list.

    Returns:
        The ``institutions`` list from the export; metadata is only echoed
        to stdout.
    """
    print(f"📄 Loading DDB institutions from {DDB_FILE.name}...")
    data = json.loads(DDB_FILE.read_text(encoding='utf-8'))
    institutions = data['institutions']
    print(f"✅ Loaded {len(institutions)} institutions from DDB")
    print(f" Metadata: {data['metadata']['source']}")
    return institutions
def normalize_name(name: str) -> str:
    """Lowercase *name* and strip common institution-type words for fuzzy matching.

    Generic tokens such as "stadtarchiv" or the "e.V." legal suffix carry no
    discriminating power, so removing them lets the distinctive part of the
    name drive the fuzzy comparison. Returns "" for empty/None input.
    """
    if not name:
        return ""
    normalized = name.lower()
    # Tokens are removed anywhere in the string, in this fixed order.
    for token in (
        'stadtarchiv ',
        'landesarchiv ',
        'staatsarchiv ',
        'universitätsbibliothek ',
        'museum ',
        ' e.v.',
        ' e. v.',
    ):
        normalized = normalized.replace(token, '')
    return normalized.strip()
def match_by_location(ddb_inst: Dict, isil_inst: Dict) -> int:
    """Score (0, 50, or 100) how well the DDB display location matches the ISIL address.

    Awards 50 points each when the ISIL city and postal code appear as
    substrings of the DDB ``locationDisplayName``. Returns 0 when the DDB
    record has no display location.
    """
    location = ddb_inst.get('locationDisplayName', '').lower()
    if not location:
        return 0
    address = isil_inst.get('address', {})
    city = address.get('city', '').lower()
    postal = address.get('postal_code', '').lower()
    points = 0
    if city and city in location:
        points += 50
    if postal and postal in location:
        points += 50
    return points
def cross_reference_institutions(isil_data: List[Dict], ddb_data: List[Dict]) -> Tuple[List[Dict], Dict]:
    """
    Cross-reference DDB and ISIL institutions.

    Matching order per DDB record:
    1. Exact ISIL-code lookup (perfect match).
    2. Fuzzy name match via rapidfuzz on normalized names; the location
       score is used only to rank candidates that already pass the name
       threshold, never to accept or reject on its own.

    Records with no match are kept as DDB-only; ISIL records never matched
    are appended as ISIL-only. Unmatched source records are annotated
    in place with ``_data_sources`` / ``_match_type``.

    Returns:
        Tuple of (unified institution list, match statistics dict).
    """
    print(f"\n🔍 Cross-referencing {len(ddb_data)} DDB institutions with {len(isil_data)} ISIL records...")
    # Index ISIL data by ISIL code for O(1) exact-match lookup.
    isil_by_code: Dict[str, Dict] = {}
    for inst in isil_data:
        isil_code = inst.get('isil')
        if isil_code:
            isil_by_code[isil_code] = inst
    print(f" - Indexed {len(isil_by_code)} ISIL codes for matching")
    # Track matches
    matched_ddb = []
    unmatched_ddb = []
    match_stats = {
        'isil_code_matches': 0,
        'name_matches': 0,
        'location_matches': 0,  # kept for output compatibility; not incremented
        'no_match': 0
    }
    for ddb_inst in ddb_data:
        ddb_name = ddb_inst.get('name', '')
        # Try ISIL code match first (if DDB has ISIL field)
        ddb_isil = ddb_inst.get('isil')  # Some DDB records may have ISIL
        if ddb_isil and ddb_isil in isil_by_code:
            # Perfect match via ISIL code
            merged = merge_institutions(ddb_inst, isil_by_code[ddb_isil], 'ISIL_CODE')
            matched_ddb.append(merged)
            match_stats['isil_code_matches'] += 1
            continue
        # Fuzzy name matching.
        # BUG FIX: the previous version compared the combined score
        # (0.7*name + 0.3*location, which maxes out at 70 when the location
        # score is 0) against NAME_MATCH_THRESHOLD (85), so a pure
        # 'NAME_ONLY' match could never be accepted even for identical
        # names. Acceptance is now gated on the name score alone; the
        # combined score only ranks the candidates that pass.
        best_match: Optional[Dict] = None
        best_combined = -1.0
        match_type = None
        # Hoisted out of the inner loop: the DDB name never changes here.
        normalized_ddb_name = normalize_name(ddb_name)
        for isil_inst in isil_data:
            name_score = fuzz.ratio(normalized_ddb_name, normalize_name(isil_inst.get('name', '')))
            if name_score < NAME_MATCH_THRESHOLD:
                continue
            # Location agreement is secondary validation / tie-breaking only.
            loc_score = match_by_location(ddb_inst, isil_inst)
            combined_score = (name_score * 0.7) + (loc_score * 0.3)
            if combined_score > best_combined:
                best_combined = combined_score
                best_match = isil_inst
                match_type = 'NAME_FUZZY' if loc_score > 0 else 'NAME_ONLY'
        if best_match is not None:
            merged = merge_institutions(ddb_inst, best_match, match_type)
            merged['_match_score'] = round(best_combined, 2)
            matched_ddb.append(merged)
            match_stats['name_matches'] += 1
        else:
            # No match found - keep DDB-only record (annotated in place).
            ddb_inst['_data_sources'] = ['DDB']
            ddb_inst['_match_type'] = 'DDB_ONLY'
            unmatched_ddb.append(ddb_inst)
            match_stats['no_match'] += 1
    # Add ISIL-only records (not matched with DDB)
    matched_isil_codes = {inst['isil'] for inst in matched_ddb if inst.get('isil')}
    isil_only = []
    for inst in isil_data:
        if inst.get('isil') not in matched_isil_codes:
            inst['_data_sources'] = ['ISIL']
            inst['_match_type'] = 'ISIL_ONLY'
            isil_only.append(inst)
    # Combine all institutions
    unified = matched_ddb + unmatched_ddb + isil_only
    match_stats['total_unified'] = len(unified)
    match_stats['matched_records'] = len(matched_ddb)
    match_stats['ddb_only'] = len(unmatched_ddb)
    match_stats['isil_only'] = len(isil_only)
    print(f"\n✅ Cross-reference complete!")
    print(f" - ISIL code matches: {match_stats['isil_code_matches']}")
    print(f" - Name fuzzy matches: {match_stats['name_matches']}")
    print(f" - Total matched: {match_stats['matched_records']}")
    print(f" - DDB only: {match_stats['ddb_only']}")
    print(f" - ISIL only: {match_stats['isil_only']}")
    print(f" - Total unified records: {match_stats['total_unified']}")
    return unified, match_stats
def merge_institutions(ddb_inst: Dict, isil_inst: Dict, match_type: str) -> Dict:
    """
    Merge DDB and ISIL institution records into one unified dict.

    Priority:
    - ISIL for authoritative metadata (ISIL code, name, address, contact)
    - DDB for sector classification, geocoding fallback, item counts

    BUG FIX: the previous version aliased the ISIL record's
    ``alternative_names`` list and ``address`` dict and then mutated them
    (appending the DDB alias, injecting DDB coordinates), silently
    corrupting the source ISIL dataset. Both are shallow-copied now.

    Args:
        ddb_inst: Raw DDB institution record.
        isil_inst: Raw ISIL registry record.
        match_type: Label describing how the two were matched
            (e.g. 'ISIL_CODE', 'NAME_FUZZY', 'NAME_ONLY').

    Returns:
        A new dict; the input records are not modified.
    """
    merged = {
        '_data_sources': ['DDB', 'ISIL'],
        '_match_type': match_type
    }
    # ISIL fields (authoritative)
    merged['isil'] = isil_inst.get('isil')
    merged['name'] = isil_inst.get('name')  # ISIL name is authoritative
    # Copy so the append below cannot mutate the source ISIL record.
    merged['alternative_names'] = list(isil_inst.get('alternative_names', []) or [])
    # Add DDB name as alternative if different
    ddb_name = ddb_inst.get('name')
    if ddb_name and ddb_name != merged['name']:
        if ddb_name not in merged['alternative_names']:
            merged['alternative_names'].append(ddb_name)
    # Address (ISIL preferred, DDB as fallback for geocoding).
    # Shallow copy so DDB coordinates do not leak into the ISIL source dict.
    merged['address'] = dict(isil_inst.get('address', {}) or {})
    # If ISIL lacks geocoding but DDB has it, add DDB coordinates
    if not merged['address'].get('latitude') and ddb_inst.get('latitude'):
        merged['address']['latitude'] = ddb_inst.get('latitude')
        merged['address']['longitude'] = ddb_inst.get('longitude')
        merged['address']['_geocoding_source'] = 'DDB'
    # Contact (ISIL only)
    merged['contact'] = isil_inst.get('contact', {})
    # URLs (ISIL)
    merged['urls'] = isil_inst.get('urls', [])
    # DDB-specific fields
    merged['ddb_id'] = ddb_inst.get('id')
    merged['sector'] = ddb_inst.get('sector_name')
    merged['sector_code'] = ddb_inst.get('sector_code')
    merged['has_digital_items'] = ddb_inst.get('hasItems', False)
    merged['digital_item_count'] = ddb_inst.get('numberOfItems', 0)
    merged['ddb_location_display'] = ddb_inst.get('locationDisplayName')
    # ISIL-specific fields
    merged['institution_type'] = isil_inst.get('institution_type')
    merged['parent_org'] = isil_inst.get('parent_org')
    merged['interloan_region'] = isil_inst.get('interloan_region')
    merged['notes'] = isil_inst.get('notes')
    return merged
def generate_statistics(unified: List[Dict], match_stats: Dict) -> Dict:
    """Compile coverage counts and sector/region/city breakdowns for the unified list.

    Args:
        unified: Merged institution records (output of cross-referencing).
        match_stats: Match-count dict, embedded verbatim in the report.

    Returns:
        Report dict with generation date, source files, coverage counters,
        and frequency tables sorted by descending count (cities capped at 20).
    """
    coverage = {
        'total_institutions': len(unified),
        'with_isil_code': 0,
        'with_ddb_id': 0,
        'with_geocoding': 0,
        'with_contact_info': 0,
        'with_website': 0,
        'with_digital_items': 0,
        'matched_both_sources': 0,
        'isil_only': 0,
        'ddb_only': 0
    }
    by_sector = defaultdict(int)
    by_region = defaultdict(int)
    by_city = defaultdict(int)
    for record in unified:
        # Classify by data source: two sources means a cross-matched record.
        sources = record.get('_data_sources', [])
        if len(sources) == 2:
            coverage['matched_both_sources'] += 1
        elif 'ISIL' in sources:
            coverage['isil_only'] += 1
        elif 'DDB' in sources:
            coverage['ddb_only'] += 1
        # Feature coverage counters.
        if record.get('isil'):
            coverage['with_isil_code'] += 1
        if record.get('ddb_id'):
            coverage['with_ddb_id'] += 1
        address = record.get('address', {})
        if address.get('latitude'):
            coverage['with_geocoding'] += 1
        contact = record.get('contact', {})
        if contact.get('email') or contact.get('phone'):
            coverage['with_contact_info'] += 1
        if record.get('urls'):
            coverage['with_website'] += 1
        if record.get('has_digital_items'):
            coverage['with_digital_items'] += 1
        # Frequency tables.
        by_sector[record.get('sector', 'unknown')] += 1
        by_region[address.get('region', 'unknown')] += 1
        by_city[address.get('city', 'unknown')] += 1

    def _ranked(counts: Dict, limit: Optional[int] = None) -> Dict:
        # Plain dict sorted by descending count, optionally truncated.
        ordered = sorted(counts.items(), key=lambda item: item[1], reverse=True)
        return dict(ordered if limit is None else ordered[:limit])

    return {
        'generation_date': datetime.now(timezone.utc).isoformat(),
        'source_files': {
            'isil': str(ISIL_FILE.name),
            'ddb': str(DDB_FILE.name)
        },
        'match_statistics': match_stats,
        'coverage': coverage,
        'by_sector': _ranked(by_sector),
        'by_region': _ranked(by_region),
        'top_cities': _ranked(by_city, limit=20)
    }
def main():
    """Main unification workflow.

    Loads both datasets, cross-references them, writes the unified JSON and
    statistics files, and prints a console summary. All paths come from the
    module-level configuration constants.
    """
    print("🇩🇪 German Heritage Institution Data Unification")
    print("=" * 60)
    # Load data
    isil_data = load_isil_data()
    ddb_data = load_ddb_data()
    # Cross-reference the two sources into one record list plus match counters.
    unified, match_stats = cross_reference_institutions(isil_data, ddb_data)
    # Generate statistics
    print(f"\n📈 Generating statistics...")
    stats = generate_statistics(unified, match_stats)
    # Export unified data
    print(f"\n💾 Exporting unified dataset...")
    output_data = {
        'metadata': {
            'source': 'Unified German Heritage Institutions (ISIL + DDB)',
            'generation_date': stats['generation_date'],
            'source_files': stats['source_files'],
            'total_institutions': len(unified),
            'license': 'CC0 1.0 Universal (Public Domain)'
        },
        'institutions': unified
    }
    # ensure_ascii=False keeps German umlauts readable in the output file.
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2, ensure_ascii=False)
    print(f"✅ Saved to: {OUTPUT_FILE}")
    print(f" Size: {OUTPUT_FILE.stat().st_size / 1024 / 1024:.1f} MB")
    # Export statistics
    with open(STATS_FILE, 'w', encoding='utf-8') as f:
        json.dump(stats, f, indent=2, ensure_ascii=False)
    print(f"✅ Statistics saved to: {STATS_FILE}")
    # Print summary.
    # NOTE(review): the percentage expressions below divide by
    # total_institutions and would raise ZeroDivisionError if both source
    # files were empty — acceptable for this one-off script, but worth
    # guarding if inputs ever vary.
    print(f"\n" + "=" * 60)
    print(f"📊 UNIFICATION SUMMARY")
    print(f"=" * 60)
    print(f"Total unified institutions: {stats['coverage']['total_institutions']}")
    print(f"")
    print(f"Match quality:")
    print(f" - Both ISIL + DDB: {stats['coverage']['matched_both_sources']} ({stats['coverage']['matched_both_sources']/stats['coverage']['total_institutions']*100:.1f}%)")
    print(f" - ISIL only: {stats['coverage']['isil_only']} ({stats['coverage']['isil_only']/stats['coverage']['total_institutions']*100:.1f}%)")
    print(f" - DDB only: {stats['coverage']['ddb_only']} ({stats['coverage']['ddb_only']/stats['coverage']['total_institutions']*100:.1f}%)")
    print(f"")
    print(f"Coverage:")
    print(f" - With ISIL codes: {stats['coverage']['with_isil_code']} ({stats['coverage']['with_isil_code']/stats['coverage']['total_institutions']*100:.1f}%)")
    print(f" - With geocoding: {stats['coverage']['with_geocoding']} ({stats['coverage']['with_geocoding']/stats['coverage']['total_institutions']*100:.1f}%)")
    print(f" - With contact info: {stats['coverage']['with_contact_info']} ({stats['coverage']['with_contact_info']/stats['coverage']['total_institutions']*100:.1f}%)")
    print(f" - With digital items: {stats['coverage']['with_digital_items']} ({stats['coverage']['with_digital_items']/stats['coverage']['total_institutions']*100:.1f}%)")
    print(f"")
    print(f"Top 5 sectors:")
    for i, (sector, count) in enumerate(list(stats['by_sector'].items())[:5], 1):
        print(f" {i}. {sector}: {count}")
    print(f"")
    print(f"Top 5 regions:")
    for i, (region, count) in enumerate(list(stats['by_region'].items())[:5], 1):
        print(f" {i}. {region}: {count}")
    print(f"")
    print(f"Top 5 cities:")
    for i, (city, count) in enumerate(list(stats['top_cities'].items())[:5], 1):
        print(f" {i}. {city}: {count}")
    print(f"=" * 60)


if __name__ == "__main__":
    main()