glam/scripts/enrich_nde_with_wikidata.py
2025-11-19 23:25:22 +01:00

349 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Enrich NDE Dutch Heritage Organizations with Wikidata Q-numbers.
This script uses the Wikidata MCP service to find matching Wikidata entities
for Dutch heritage organizations, with comprehensive logging of all SPARQL queries.
"""
import yaml
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple
import time
class WikidataEnricher:
    """Enriches NDE organizations with Wikidata Q-numbers.

    For each organization record the enricher builds a SPARQL query
    (an exact ISIL lookup when available, otherwise a type/country/name
    search), logs every query as an individual JSON file, and keeps run
    statistics plus an in-memory master log of all queries.
    """

    def __init__(self, data_path: Path, sparql_log_dir: Path):
        """
        Args:
            data_path: Path to the YAML file with organization records.
            sparql_log_dir: Directory for SPARQL query logs; created if absent.
        """
        self.data_path = data_path
        self.sparql_log_dir = sparql_log_dir
        self.sparql_log_dir.mkdir(parents=True, exist_ok=True)
        # Statistics reported at the end of run_enrichment()
        self.stats = {
            'total_records': 0,
            'already_enriched': 0,
            'newly_enriched': 0,
            'no_match_found': 0,
            'multiple_matches': 0,
            'queries_executed': 0,
            'errors': 0
        }
        # Query log: one entry per prepared/executed query,
        # flushed to disk by save_master_query_log()
        self.query_log = []

    def log_sparql_query(self, query: str, org_name: str, results,
                         match_status: str, timestamp: str):
        """
        Log a SPARQL query with results to a file.

        Args:
            query: SPARQL query string
            org_name: Organization name being searched
            results: Query results (any JSON-serializable value)
            match_status: 'matched', 'no_match', 'multiple', 'error' or 'prepared'
            timestamp: Filesystem-safe timestamp string (used in the filename)

        Returns:
            Path to log file
        """
        # Create sanitized filename: keep alphanumerics, '-' and '_';
        # cap the name at 50 characters to keep paths short
        safe_org_name = "".join(c if c.isalnum() or c in ('-', '_') else '_'
                                for c in org_name[:50])
        filename = f"{timestamp}_{safe_org_name}_{match_status}.json"
        log_path = self.sparql_log_dir / filename
        log_entry = {
            'timestamp': timestamp,
            'organization_name': org_name,
            'query': query,
            'results': results,
            'match_status': match_status,
            'query_number': len(self.query_log) + 1
        }
        # Write individual log file
        with open(log_path, 'w', encoding='utf-8') as f:
            json.dump(log_entry, f, indent=2, ensure_ascii=False)
        # Add to master log
        self.query_log.append(log_entry)
        return log_path

    def build_wikidata_query(self, org_name: str, org_type: Optional[str] = None,
                             city: Optional[str] = None, isil: Optional[str] = None) -> str:
        """
        Build a SPARQL query to search Wikidata for a heritage organization.

        An ISIL code (P791) gives an exact lookup; otherwise the query
        searches Dutch institutions of the mapped type whose rdfs:label
        contains the organization name (case-insensitive).

        Args:
            org_name: Organization name
            org_type: Type (museum, archief, bibliotheek, etc.)
            city: City name (informational only for now, see TODO below)
            isil: ISIL code

        Returns:
            SPARQL query string
        """
        # Map organization types to Wikidata classes
        type_mapping = {
            'museum': 'wd:Q33506',                    # museum
            'archief': 'wd:Q166118',                  # archive
            'bibliotheek': 'wd:Q7075',                # library
            'historische vereniging': 'wd:Q1964266',  # historical society
            'kenniscentrum': 'wd:Q7604693'            # knowledge center
        }
        # Start building query
        query_parts = [
            "SELECT ?item ?itemLabel ?isil ?viaf ?website ?coords WHERE {",
        ]
        # Search by ISIL code if available (most precise)
        if isil:
            query_parts.append(f' ?item wdt:P791 "{isil}" .')
        else:
            # Search by name and type
            if org_type and org_type.lower() in type_mapping:
                type_class = type_mapping[org_type.lower()]
                query_parts.append(f' ?item wdt:P31/wdt:P279* {type_class} .')
            # Add country constraint (Netherlands)
            query_parts.append(' ?item wdt:P17 wd:Q55 .')  # country: Netherlands
            # NOTE(review): the previous version emitted an unconstrained
            # '?item wdt:P131* ?location .' triple here. The unbound object
            # constrained nothing and multiplied result rows (one per admin
            # area), so it has been removed until a real city filter exists.
            if city:
                query_parts.append(f' # TODO: constrain results to city {city}')
        # Optional properties
        query_parts.extend([
            ' OPTIONAL { ?item wdt:P791 ?isil }',
            ' OPTIONAL { ?item wdt:P214 ?viaf }',
            ' OPTIONAL { ?item wdt:P856 ?website }',
            ' OPTIONAL { ?item wdt:P625 ?coords }',
            ' SERVICE wikibase:label { bd:serviceParam wikibase:language "nl,en" }',
        ])
        # Add label filter if not using ISIL. Filter on rdfs:label rather
        # than ?itemLabel: the WDQS label service runs AFTER the WHERE
        # clause, so FILTERs on its output variables never match anything.
        if not isil:
            # Escape quotes in org_name
            escaped_name = org_name.replace('"', '\\"')
            query_parts.append(' ?item rdfs:label ?label .')
            query_parts.append(' FILTER(LANG(?label) IN ("nl", "en"))')
            query_parts.append(f' FILTER(CONTAINS(LCASE(STR(?label)), "{escaped_name.lower()}"))')
        query_parts.append('}')
        query_parts.append('LIMIT 10')
        return '\n'.join(query_parts)

    def parse_wikidata_results(self, results) -> List[Dict]:
        """
        Parse Wikidata SPARQL results (standard SPARQL 1.1 JSON format).

        Args:
            results: JSON string from a Wikidata query, or an
                already-decoded results dict.

        Returns:
            List of result dictionaries with keys 'q_number', 'label',
            'isil', 'viaf', 'website' and 'coords' (missing optional
            fields are None). Empty list on malformed JSON.
        """
        try:
            # Accept a pre-decoded dict so callers need not re-serialize
            data = results if isinstance(results, dict) else json.loads(results)
        except json.JSONDecodeError:
            return []
        bindings = data.get('results', {}).get('bindings', [])
        parsed = []
        for binding in bindings:
            item_uri = binding.get('item', {}).get('value', '')
            # Entity URIs look like http://www.wikidata.org/entity/Q42;
            # the Q-number is the last path segment
            q_number = item_uri.split('/')[-1] if item_uri else None
            parsed.append({
                'q_number': q_number,
                'label': binding.get('itemLabel', {}).get('value', ''),
                'isil': binding.get('isil', {}).get('value'),
                'viaf': binding.get('viaf', {}).get('value'),
                'website': binding.get('website', {}).get('value'),
                'coords': binding.get('coords', {}).get('value')
            })
        return parsed

    def find_best_match(self, org_name: str, candidates: List[Dict]) -> Optional[str]:
        """
        Find the best matching Q-number from candidates.

        Strategy, in order: a single candidate wins outright; then an
        exact (case-insensitive) label match; then a containment match
        with at least 70% length overlap. Anything else is ambiguous.

        Args:
            org_name: Organization name to match
            candidates: List of candidate results

        Returns:
            Q-number of best match, or None
        """
        if not candidates:
            return None
        # If only one result, return it
        if len(candidates) == 1:
            return candidates[0]['q_number']
        # Look for exact name match
        org_lower = org_name.lower()
        for candidate in candidates:
            label_lower = candidate['label'].lower()
            if label_lower == org_lower:
                return candidate['q_number']
        # Look for close match (name contained in label or vice versa)
        for candidate in candidates:
            label_lower = candidate['label'].lower()
            if org_lower in label_lower or label_lower in org_lower:
                # Check confidence: at least 70% overlap in length
                shorter = min(len(org_lower), len(label_lower))
                longer = max(len(org_lower), len(label_lower))
                if shorter / longer >= 0.7:
                    return candidate['q_number']
        # Multiple matches, cannot determine automatically
        return None

    def enrich_record(self, record: Dict, record_idx: int) -> Tuple[Dict, str]:
        """
        Enrich a single record with Wikidata Q-number.

        Args:
            record: Organization record
            record_idx: Zero-based record index for progress logging

        Returns:
            Tuple of (enriched_record, status_message)
        """
        org_name = record.get('organisatie', 'Unknown')
        # Skip if already has wikidata_id
        if record.get('wikidata_id'):
            self.stats['already_enriched'] += 1
            return record, f"Already enriched: {record['wikidata_id']}"
        # Build query
        query = self.build_wikidata_query(
            org_name=org_name,
            org_type=record.get('type_organisatie'),
            city=record.get('plaatsnaam_bezoekadres'),
            isil=record.get('isil-code_na')
        )
        # Filesystem-safe timestamp (':' and '.' are not portable in filenames)
        timestamp = datetime.now(timezone.utc).isoformat().replace(':', '-').replace('.', '-')
        print(f"\n[{record_idx + 1}/{self.stats['total_records']}] {org_name[:60]}")
        print(f" Type: {record.get('type_organisatie', 'N/A')}")
        print(f" City: {record.get('plaatsnaam_bezoekadres', 'N/A')}")
        print(f" ISIL: {record.get('isil-code_na', 'N/A')}")
        # NOTE: This is where we would call the Wikidata MCP service.
        # Placeholder for MCP service call:
        #   results_json = wikidata_mcp_service.execute_sparql(query)
        # For now, we create the query structure and log it.
        self.stats['queries_executed'] += 1
        log_path = self.log_sparql_query(
            query=query,
            org_name=org_name,
            results={'note': 'Query prepared for Wikidata MCP service execution'},
            match_status='prepared',
            timestamp=timestamp
        )
        print(f" Query logged: {log_path.name}")
        return record, "Query prepared (MCP service integration pending)"

    def save_master_query_log(self):
        """Save master log of all queries.

        Returns:
            Path to the master log file that was written.
        """
        timestamp = datetime.now(timezone.utc).isoformat().replace(':', '-').replace('.', '-')
        master_log_path = self.sparql_log_dir / f"master_query_log_{timestamp}.json"
        master_log = {
            'enrichment_run': timestamp,
            'statistics': self.stats,
            'total_queries': len(self.query_log),
            'queries': self.query_log
        }
        with open(master_log_path, 'w', encoding='utf-8') as f:
            json.dump(master_log, f, indent=2, ensure_ascii=False)
        print(f"\nMaster query log saved: {master_log_path}")
        return master_log_path

    def run_enrichment(self, limit: Optional[int] = None):
        """
        Run the enrichment process.

        Args:
            limit: Optional limit on number of records to process (for testing)

        Returns:
            List of (possibly enriched) records, in input order.
        """
        print("=" * 80)
        print("NDE WIKIDATA ENRICHMENT")
        print("=" * 80)
        print()
        # Load data
        print(f"Loading data from {self.data_path}...")
        with open(self.data_path, 'r', encoding='utf-8') as f:
            # An empty YAML document parses as None; treat it as no records
            records = yaml.safe_load(f) or []
        self.stats['total_records'] = len(records) if not limit else min(limit, len(records))
        print(f"Processing {self.stats['total_records']} records")
        print()
        # Process records
        enriched_records = []
        for idx, record in enumerate(records[:self.stats['total_records']]):
            enriched_record, status = self.enrich_record(record, idx)
            enriched_records.append(enriched_record)
            # Small delay to be respectful to Wikidata
            time.sleep(0.1)
        # Save master log
        self.save_master_query_log()
        # Print statistics
        print("\n" + "=" * 80)
        print("ENRICHMENT STATISTICS")
        print("=" * 80)
        print(f"Total records processed: {self.stats['total_records']}")
        print(f"Already enriched: {self.stats['already_enriched']}")
        print(f"Newly enriched: {self.stats['newly_enriched']}")
        print(f"No match found: {self.stats['no_match_found']}")
        print(f"Multiple matches: {self.stats['multiple_matches']}")
        print(f"Errors: {self.stats['errors']}")
        print(f"Queries executed: {self.stats['queries_executed']}")
        print()
        return enriched_records
def main():
    """Main entry point: prepare Wikidata enrichment queries for the NDE list."""
    data_path = Path("data/nde/voorbeeld_lijst_organisaties_en_diensten-totaallijst_nederland.yaml")
    sparql_log_dir = Path("data/nde/sparql")
    enricher = WikidataEnricher(data_path, sparql_log_dir)
    # Start with first 10 records for testing
    enricher.run_enrichment(limit=10)
    print("Enrichment preparation complete!")
    print(f"SPARQL queries logged in: {sparql_log_dir}")


if __name__ == "__main__":
    main()