#!/usr/bin/env python3
"""
Enrich Swiss custodian files with Wikidata Q-numbers using fuzzy name matching.
Uses Wikidata SPARQL endpoint to find matching institutions by name + location.
Writes enrichment data directly to individual custodian YAML files.
Process:
1. Query Wikidata for ALL Swiss heritage institutions
2. Load CH-*.yaml files without wikidata_enrichment
3. Fuzzy match by name + city location
4. Add Wikidata identifiers to matched files
5. Mark with enrichment_version: 2.1_generic
Usage:
python scripts/enrich_swiss_wikidata_fuzzy.py [--limit N] [--dry-run] [--threshold N]
"""
import argparse
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
import yaml
from rapidfuzz import fuzz

# Wikidata SPARQL endpoint
WIKIDATA_SPARQL = "https://query.wikidata.org/sparql"
# Switzerland Wikidata ID
SWITZERLAND_QID = "Q39"
# Languages for Swiss institutions (German, French, Italian, Romansh, English)
SWISS_LANGUAGES = "de,fr,it,rm,en"
# Default similarity threshold
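# (rapidfuzz scores run 0-100; fuzzy_match_institution applies this cutoff to the
# combined name + location-boost score)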
DEFAULT_THRESHOLD = 85.0


def query_wikidata_swiss_institutions() -> List[Dict]:
"""
Query Wikidata for ALL Swiss heritage institutions.
Returns:
List of dicts with: qid, label, type, location, coordinates, isil, viaf
"""
# Simplified SPARQL query - direct instance of, no subclass traversal
# This is much faster and avoids timeouts
query = f"""
SELECT DISTINCT ?item ?itemLabel ?typeLabel ?locationLabel ?coords ?isil ?viaf
WHERE {{
# Direct instance of heritage institution types (faster than subclass traversal)
VALUES ?type {{
wd:Q33506 # museum
wd:Q7075 # library
wd:Q166118 # archive
wd:Q1007870 # art gallery
wd:Q28564 # public library
wd:Q207694 # art museum
wd:Q17431399 # natural history museum
wd:Q3329412 # cantonal archive
wd:Q2668072 # cantonal library
wd:Q856584 # research library
}}
# Direct instance of (no subclass traversal for speed)
?item wdt:P31 ?type .
# Located in Switzerland
?item wdt:P17 wd:{SWITZERLAND_QID} .
# Optional: specific location (city/town/canton)
OPTIONAL {{ ?item wdt:P131 ?location }}
# Optional: coordinates
OPTIONAL {{ ?item wdt:P625 ?coords }}
# Optional: ISIL code
OPTIONAL {{ ?item wdt:P791 ?isil }}
# Optional: VIAF ID
OPTIONAL {{ ?item wdt:P214 ?viaf }}
# Get labels in Swiss languages + English
SERVICE wikibase:label {{
bd:serviceParam wikibase:language "{SWISS_LANGUAGES}"
}}
}}
LIMIT 10000
"""
print("Querying Wikidata for Swiss heritage institutions...")
print(f" Endpoint: {WIKIDATA_SPARQL}")
print(f" Languages: {SWISS_LANGUAGES}")
headers = {
'User-Agent': 'GLAM-Data-Extraction/0.2.1 (Swiss heritage institution research)',
'Accept': 'application/sparql-results+json'
}
try:
response = requests.get(
WIKIDATA_SPARQL,
params={'query': query},
headers=headers,
timeout=120 # Generous timeout for large query
)
response.raise_for_status()
data = response.json()
# Parse results
institutions = []
seen_qids = set() # Deduplicate by QID
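        # NOTE: the query can return one row per (type, location, isil, viaf)
        # combination; keeping only the first row per QID means optional values
        # that appear only on later rows are dropped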
for binding in data['results']['bindings']:
qid = binding['item']['value'].split('/')[-1]
# Skip duplicates (same institution may have multiple types)
if qid in seen_qids:
continue
seen_qids.add(qid)
label = binding['itemLabel']['value']
inst_type = binding.get('typeLabel', {}).get('value', '')
location = binding.get('locationLabel', {}).get('value', '')
coords = binding.get('coords', {}).get('value', '')
isil = binding.get('isil', {}).get('value', '')
viaf = binding.get('viaf', {}).get('value', '')
institutions.append({
'qid': qid,
'label': label,
'type': inst_type,
'location': location,
'coordinates': coords,
'isil': isil,
'viaf': viaf
})
print(f" Found {len(institutions)} unique institutions in Wikidata")
return institutions
except requests.exceptions.Timeout:
print("ERROR: Wikidata query timed out. Try again later.")
return []
except requests.exceptions.RequestException as e:
print(f"ERROR: Failed to query Wikidata: {e}")
return []
except Exception as e:
print(f"ERROR: Unexpected error: {e}")
        return []


def fuzzy_match_institution(
inst_name: str,
inst_city: str,
wikidata_results: List[Dict],
threshold: float = DEFAULT_THRESHOLD
) -> Optional[Tuple[Dict, float]]:
"""
Fuzzy match institution to Wikidata results.
Uses a two-pass algorithm:
1. First try to find matches with BOTH name and location match (high confidence)
2. If no location match, fall back to name-only match with higher threshold
Args:
inst_name: Institution name from our dataset
inst_city: City location
wikidata_results: List of Wikidata query results
threshold: Minimum similarity threshold (0-100)
Returns:
Tuple of (matched_wikidata_record, confidence_score) or None
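
    Example:
        "Stadtarchiv Aarau" in city "Aarau" scores ~100 on name similarity;
        a confirmed location adds a small boost, and the combined score is
        capped at 100.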
"""
best_match = None
best_score = 0.0
best_has_location_match = False
# Normalize our institution name
inst_name_lower = inst_name.lower().strip()
inst_city_lower = inst_city.lower().strip() if inst_city else ''
for wd in wikidata_results:
wd_label_lower = wd['label'].lower().strip()
wd_location_lower = wd.get('location', '').lower()
# Name similarity using token sort ratio (handles word reordering)
name_score = fuzz.token_sort_ratio(inst_name_lower, wd_label_lower)
# Check for location match
location_match = False
location_boost = 0
if inst_city_lower and wd_location_lower:
# Exact city name match in location
if inst_city_lower in wd_location_lower:
location_match = True
location_boost = 10
# Also check if city name is IN the Wikidata label itself (e.g., "Stadtarchiv Aarau")
elif inst_city_lower in wd_label_lower:
location_match = True
location_boost = 8
# Fuzzy location match
elif fuzz.partial_ratio(inst_city_lower, wd_location_lower) > 90:
location_match = True
location_boost = 5
# If we have a city but Wikidata label contains a DIFFERENT city, penalize
if inst_city_lower and not location_match:
# Check if Wikidata label contains a different Swiss city
# Common Swiss cities that might cause false matches
swiss_cities = ['zürich', 'zurich', 'bern', 'basel', 'geneva', 'genf', 'lausanne',
'luzern', 'lucerne', 'aarau', 'aarburg', 'winterthur', 'st. gallen',
'lugano', 'biel', 'thun', 'köniz', 'chur', 'schaffhausen', 'fribourg']
for city in swiss_cities:
if city in wd_label_lower and city != inst_city_lower:
# Different city mentioned in Wikidata label - big penalty
name_score = max(0, name_score - 20)
break
# Combined score
total_score = min(name_score + location_boost, 100)
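        # e.g. a name score of 88 plus a city boost of 10 gives 98;
        # a perfect name match stays capped at 100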
# Prefer matches with location confirmation
is_better = False
if total_score >= threshold:
if location_match and not best_has_location_match:
# Location match beats non-location match
is_better = True
elif location_match == best_has_location_match and total_score > best_score:
# Same location status, higher score wins
is_better = True
if is_better:
best_score = total_score
best_match = wd
best_has_location_match = location_match
# For matches without location confirmation, require higher threshold
if best_match and not best_has_location_match:
# Require 95% name match if no location confirmation
if best_score < 95:
return None
if best_match:
return (best_match, best_score)
    return None


def load_unenriched_files(custodian_dir: Path, limit: Optional[int] = None) -> List[Tuple[Path, Dict]]:
"""
Load CH-*.yaml files that don't have wikidata_enrichment.
Args:
custodian_dir: Path to data/custodian directory
limit: Optional limit on number of files to load
Returns:
List of (file_path, data_dict) tuples
"""
files = []
ch_files = sorted(custodian_dir.glob("CH-*.yaml"))
print(f"Scanning {len(ch_files)} CH-*.yaml files...")
for filepath in ch_files:
if limit and len(files) >= limit:
break
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
# Skip if already has wikidata_enrichment
if data.get('wikidata_enrichment'):
continue
# Skip if already has Wikidata identifier
has_wikidata = False
for identifier in data.get('identifiers', []):
if identifier.get('identifier_scheme') == 'Wikidata':
has_wikidata = True
break
if has_wikidata:
continue
files.append((filepath, data))
except Exception as e:
print(f" Warning: Could not load {filepath.name}: {e}")
print(f" Found {len(files)} files needing Wikidata enrichment")
    return files


def save_enriched_file(filepath: Path, data: Dict) -> bool:
"""Save enriched data back to YAML file."""
try:
with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(
                data,
                f,
                allow_unicode=True,        # write umlauts etc. directly instead of \u escapes
                sort_keys=False,           # preserve the existing key order
                default_flow_style=False,  # block style throughout
                width=120
            )
return True
except Exception as e:
print(f" ERROR saving {filepath.name}: {e}")
        return False


def enrich_with_wikidata(
limit: Optional[int] = None,
dry_run: bool = False,
threshold: float = DEFAULT_THRESHOLD
):
"""Main enrichment workflow."""
print("=" * 80)
print("SWISS INSTITUTIONS - WIKIDATA FUZZY MATCHING ENRICHMENT")
print("=" * 80)
print()
# Setup paths
custodian_dir = Path(__file__).parent.parent / "data" / "custodian"
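    # __file__ sits in scripts/, so parent.parent resolves to the repository root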
if not custodian_dir.exists():
print(f"ERROR: Custodian directory not found: {custodian_dir}")
sys.exit(1)
# Query Wikidata for Swiss institutions
wikidata_results = query_wikidata_swiss_institutions()
if not wikidata_results:
print("No Wikidata results found. Exiting.")
sys.exit(1)
print()
# Load unenriched files
files_to_enrich = load_unenriched_files(custodian_dir, limit)
if not files_to_enrich:
print("No files need enrichment. Exiting.")
return
print()
print(f"Fuzzy matching {len(files_to_enrich)} institutions...")
print(f" Match threshold: {threshold}%")
print(f" Dry run: {dry_run}")
print()
# Statistics
matched = 0
high_confidence = 0
low_confidence = 0
saved = 0
errors = 0
timestamp = datetime.now(timezone.utc).isoformat()
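    # a single timestamp for the whole run, so all files enriched together share it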
for idx, (filepath, data) in enumerate(files_to_enrich, 1):
# Progress indicator
if idx % 50 == 0 or idx == len(files_to_enrich):
print(f" [{idx}/{len(files_to_enrich)}] Matched: {matched}, Saved: {saved}")
# Extract institution name
inst_name = None
if data.get('custodian_name', {}).get('claim_value'):
inst_name = data['custodian_name']['claim_value']
elif data.get('original_entry', {}).get('name'):
inst_name = data['original_entry']['name']
if not inst_name:
continue
# Extract city
inst_city = ''
if data.get('location', {}).get('city'):
inst_city = data['location']['city']
elif data.get('ghcid', {}).get('location_resolution', {}).get('city_name'):
inst_city = data['ghcid']['location_resolution']['city_name']
elif data.get('original_entry', {}).get('locations'):
locs = data['original_entry']['locations']
if locs and isinstance(locs, list) and locs[0].get('city'):
inst_city = locs[0]['city']
# Fuzzy match
match_result = fuzzy_match_institution(
inst_name,
inst_city,
wikidata_results,
threshold=threshold
)
if not match_result:
continue
matched_wd, confidence = match_result
matched += 1
if confidence >= 95:
high_confidence += 1
else:
low_confidence += 1
if dry_run:
print(f" [DRY RUN] Would match: {inst_name}")
print(f" -> {matched_wd['qid']} ({matched_wd['label']}) [{confidence:.1f}%]")
continue
# Add Wikidata enrichment
data['wikidata_enrichment'] = {
'wikidata_id': matched_wd['qid'],
'wikidata_label': matched_wd['label'],
'wikidata_url': f"https://www.wikidata.org/wiki/{matched_wd['qid']}",
'enrichment_date': timestamp,
'enrichment_version': '2.1_generic',
'enrichment_method': 'wikidata_fuzzy_match',
'match_confidence': round(confidence, 1),
'match_location': matched_wd.get('location', ''),
}
# Add ISIL if available from Wikidata
if matched_wd.get('isil'):
# Check if already has ISIL
has_isil = any(
i.get('identifier_scheme') == 'ISIL' and i.get('identifier_value') == matched_wd['isil']
for i in data.get('identifiers', [])
)
if not has_isil:
if 'identifiers' not in data:
data['identifiers'] = []
data['identifiers'].append({
'identifier_scheme': 'ISIL',
'identifier_value': matched_wd['isil'],
'identifier_source': 'wikidata'
})
# Add VIAF if available
if matched_wd.get('viaf'):
has_viaf = any(
i.get('identifier_scheme') == 'VIAF'
for i in data.get('identifiers', [])
)
if not has_viaf:
if 'identifiers' not in data:
data['identifiers'] = []
data['identifiers'].append({
'identifier_scheme': 'VIAF',
'identifier_value': matched_wd['viaf'],
'identifier_url': f"https://viaf.org/viaf/{matched_wd['viaf']}",
'identifier_source': 'wikidata'
})
# Add Wikidata identifier
has_wd_id = any(
i.get('identifier_scheme') == 'Wikidata'
for i in data.get('identifiers', [])
)
if not has_wd_id:
if 'identifiers' not in data:
data['identifiers'] = []
data['identifiers'].append({
'identifier_scheme': 'Wikidata',
'identifier_value': matched_wd['qid'],
'identifier_url': f"https://www.wikidata.org/wiki/{matched_wd['qid']}",
'identifier_source': 'wikidata_fuzzy_match'
})
# Update provenance notes
if 'provenance' not in data:
data['provenance'] = {}
if 'notes' not in data['provenance']:
data['provenance']['notes'] = []
data['provenance']['notes'].append(
f"Wikidata fuzzy match enrichment {timestamp}: "
f"Matched to {matched_wd['qid']} ({matched_wd['label']}) "
f"with {confidence:.1f}% confidence"
)
# Save file
if save_enriched_file(filepath, data):
saved += 1
else:
errors += 1
# Final summary
print()
print("=" * 80)
print("ENRICHMENT COMPLETE")
print("=" * 80)
print(f" Files scanned: {len(files_to_enrich)}")
print(f" Matched: {matched} ({matched/len(files_to_enrich)*100:.1f}%)")
print(f" High confidence (>=95%): {high_confidence}")
print(f" Low confidence (<95%): {low_confidence}")
if not dry_run:
print(f" Saved: {saved}")
print(f" Errors: {errors}")
else:
print(f" [DRY RUN - no files modified]")
    print()


def main():
parser = argparse.ArgumentParser(
description="Enrich Swiss custodian files with Wikidata via fuzzy matching"
)
parser.add_argument(
'--limit', '-l',
type=int,
default=None,
help='Limit number of files to process (for testing)'
)
parser.add_argument(
'--dry-run', '-n',
action='store_true',
help='Show what would be matched without saving'
)
parser.add_argument(
'--threshold', '-t',
type=float,
default=DEFAULT_THRESHOLD,
help=f'Minimum similarity threshold (default: {DEFAULT_THRESHOLD})'
)
args = parser.parse_args()
enrich_with_wikidata(
limit=args.limit,
dry_run=args.dry_run,
threshold=args.threshold
    )


if __name__ == '__main__':
main()