glam/scripts/enrich_global_with_wikidata.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

586 lines
23 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Enrich global heritage institutions with Wikidata identifiers.
This script:
1. Queries Wikidata for institutions with ISIL codes (P791)
2. Matches by ISIL code (primary, high confidence)
3. Falls back to fuzzy name matching by country
4. Extracts Wikidata IDs, VIAF IDs (P214), founding dates, websites
5. Replaces synthetic Q-numbers in GHCIDs with real Wikidata QIDs
6. Updates the global YAML dataset with enriched data
7. Generates detailed enrichment report
Usage:
python scripts/enrich_global_with_wikidata.py
Dependencies:
- SPARQLWrapper (for Wikidata queries)
- rapidfuzz (for fuzzy name matching)
- pyyaml (for YAML I/O)
"""
import sys
from pathlib import Path
from typing import Any, Optional
from datetime import datetime, timezone
import time
import yaml
import re
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from SPARQLWrapper import SPARQLWrapper, JSON # type: ignore
from rapidfuzz import fuzz, process # type: ignore
class GlobalWikidataEnricher:
    """Enrich global heritage institutions with Wikidata identifiers.

    Workflow (see run()): load a YAML list of institution records, query
    Wikidata for their ISIL codes (P791), enrich matched records in place
    with QIDs, VIAF IDs, websites, founding dates and coordinates, then
    write an enriched YAML file and print a report.
    """

    # Public SPARQL endpoint of the Wikidata Query Service.
    WIKIDATA_ENDPOINT = "https://query.wikidata.org/sparql"
    # Sent with every request so the service can identify this client.
    USER_AGENT = "GLAM-Extractor/0.2 (Global Heritage Custodian Project)"
    # Rate limiting (Wikidata recommends 1 request/second)
    REQUEST_DELAY = 1.0

    def __init__(self, input_file: Path, output_file: Path):
        """Configure the SPARQL client and reset all statistics counters.

        Args:
            input_file: YAML file containing the list of institution records.
            output_file: Destination path for the enriched YAML dataset.
        """
        self.input_file = input_file
        self.output_file = output_file
        self.sparql = SPARQLWrapper(self.WIKIDATA_ENDPOINT)
        self.sparql.setReturnFormat(JSON)
        self.sparql.setMethod('POST')  # Use POST to avoid URI length limits
        self.sparql.addCustomHttpHeader("User-Agent", self.USER_AGENT)  # type: ignore
        # Cache for ISIL → Wikidata mapping, filled by query_by_isil_codes().
        self.isil_to_wikidata: dict[str, dict[str, Any]] = {}
        # Statistics accumulated across the run; printed by _print_report().
        self.stats = {
            "total_institutions": 0,
            "institutions_with_isil": 0,
            "wikidata_queries": 0,
            "wikidata_results": 0,
            "isil_matches": 0,
            "fuzzy_matches": 0,
            "no_matches": 0,
            "new_wikidata_ids": 0,
            "replaced_synthetic_q": 0,
            "new_viaf_ids": 0,
            "new_founding_dates": 0,
            "new_websites": 0,
            "enriched_coordinates": 0,
        }
def build_isil_query(self, isil_codes: list[str]) -> str:
"""
Build SPARQL query to fetch institutions by ISIL codes.
Wikidata property P791 = ISIL code
"""
# Escape and format ISIL codes for SPARQL VALUES clause
# Use smaller batches to avoid URI length limits (even with POST)
isil_values = " ".join(f'"{code}"' for code in isil_codes[:50]) # Reduced batch size
return f"""
SELECT DISTINCT ?item ?itemLabel ?itemDescription ?isil ?viaf ?coords ?website ?inception
WHERE {{
# Filter by ISIL codes
VALUES ?isil {{ {isil_values} }}
?item wdt:P791 ?isil .
# Optional enrichment data
OPTIONAL {{ ?item wdt:P214 ?viaf . }} # VIAF ID
OPTIONAL {{ ?item wdt:P625 ?coords . }} # Coordinates
OPTIONAL {{ ?item wdt:P856 ?website . }} # Official website
OPTIONAL {{ ?item wdt:P571 ?inception . }} # Founding date
# Get labels in multiple languages
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,ja,nl,es,pt,fr,de" . }}
}}
"""
def build_country_query(self, country_code: str, limit: int = 500) -> str:
"""
Build SPARQL query for GLAM institutions in a specific country.
Used as fallback when ISIL matching is insufficient.
"""
# Map ISO 3166-1 alpha-2 to Wikidata QIDs
country_qids = {
"JP": "Q17", # Japan
"NL": "Q55", # Netherlands
"BR": "Q155", # Brazil
"MX": "Q96", # Mexico
"CL": "Q298", # Chile
"US": "Q30", # United States
"GB": "Q145", # United Kingdom
"FR": "Q142", # France
"DE": "Q183", # Germany
"IT": "Q38", # Italy
"ES": "Q29", # Spain
"CA": "Q16", # Canada
"AU": "Q408", # Australia
}
qid = country_qids.get(country_code)
if not qid:
print(f" ⚠️ No Wikidata QID mapping for country code: {country_code}")
return ""
return f"""
SELECT DISTINCT ?item ?itemLabel ?itemDescription ?isil ?viaf ?coords ?website ?inception
WHERE {{
# Institution is located in the country
?item wdt:P17 wd:{qid} .
# Institution is one of our GLAM types
VALUES ?type {{
wd:Q7075 # library
wd:Q166118 # archive
wd:Q33506 # museum
wd:Q1007870 # art gallery
wd:Q28564 # public library
wd:Q11396180 # academic library
wd:Q207694 # art museum
wd:Q2772772 # history museum
wd:Q7140621 # cultural institution
wd:Q31855 # research institute
}}
?item wdt:P31 ?type .
# Optional identifiers and metadata
OPTIONAL {{ ?item wdt:P791 ?isil . }} # ISIL code
OPTIONAL {{ ?item wdt:P214 ?viaf . }} # VIAF ID
OPTIONAL {{ ?item wdt:P625 ?coords . }} # Coordinates
OPTIONAL {{ ?item wdt:P856 ?website . }} # Official website
OPTIONAL {{ ?item wdt:P571 ?inception . }} # Founding date
# Get labels
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,ja,nl,es,pt" . }}
}}
LIMIT {limit}
"""
def query_wikidata(self, query: str, query_name: str) -> list[dict[str, Any]]:
"""Execute a SPARQL query against Wikidata."""
import sys
# Use carriage return for progress updates
print(f"\r🔍 {query_name}...", end='', flush=True)
self.sparql.setQuery(query)
try:
self.stats["wikidata_queries"] += 1
raw_results = self.sparql.query().convert() # type: ignore
bindings = raw_results.get("results", {}).get("bindings", []) if isinstance(raw_results, dict) else []
self.stats["wikidata_results"] += len(bindings)
# Parse results
institutions = []
for binding in bindings:
inst = self._parse_wikidata_result(binding)
if inst:
institutions.append(inst)
# Show result count
print(f"{len(bindings)} results", flush=True)
# Rate limiting
time.sleep(self.REQUEST_DELAY)
return institutions
except Exception as e:
print(f"\r ❌ Error: {e}")
time.sleep(self.REQUEST_DELAY)
return []
def _parse_wikidata_result(self, binding: dict[str, Any]) -> Optional[dict[str, Any]]:
"""Parse a single Wikidata SPARQL result."""
try:
# Extract Wikidata QID from URI
item_uri = binding.get("item", {}).get("value", "")
qid = item_uri.split("/")[-1] if item_uri else None
if not qid or not qid.startswith("Q"):
return None
result: dict[str, Any] = {
"qid": qid,
"name": binding.get("itemLabel", {}).get("value", ""),
"description": binding.get("itemDescription", {}).get("value", ""),
"identifiers": {}
}
# Extract identifiers
if "isil" in binding:
result["identifiers"]["ISIL"] = binding["isil"]["value"]
if "viaf" in binding:
result["identifiers"]["VIAF"] = binding["viaf"]["value"]
if "website" in binding:
result["identifiers"]["Website"] = binding["website"]["value"]
# Extract founding date
if "inception" in binding:
inception_value = binding["inception"]["value"]
# Wikidata returns ISO 8601 date (e.g., "1945-01-01T00:00:00Z")
result["founding_date"] = inception_value.split("T")[0]
# Extract location data
if "coords" in binding:
coords_str = binding["coords"]["value"]
# Parse "Point(lon lat)" format
if coords_str.startswith("Point("):
lon, lat = coords_str[6:-1].split()
result["latitude"] = float(lat)
result["longitude"] = float(lon)
return result
except Exception as e:
print(f" ⚠️ Error parsing Wikidata result: {e}")
return None
def build_isil_cache(self, institutions: list[dict[str, Any]]) -> list[str]:
"""
Extract all ISIL codes from our dataset.
Returns: List of unique ISIL codes
"""
isil_codes = set()
for inst in institutions:
identifiers = inst.get("identifiers", [])
if isinstance(identifiers, list):
for ident in identifiers:
if isinstance(ident, dict) and ident.get("identifier_scheme") == "ISIL":
isil_code = ident.get("identifier_value")
if isil_code:
isil_codes.add(isil_code)
return sorted(isil_codes)
def query_by_isil_codes(self, isil_codes: list[str]) -> None:
"""
Query Wikidata for institutions matching our ISIL codes.
Populates self.isil_to_wikidata cache.
"""
if not isil_codes:
print("⚠️ No ISIL codes found in dataset")
return
print(f"\n📚 Querying Wikidata for {len(isil_codes)} ISIL codes...")
# Batch queries (max 50 ISIL codes per query to avoid URI length issues)
batch_size = 50
total_batches = (len(isil_codes) - 1) // batch_size + 1
print(f" Processing {total_batches} batches ({batch_size} codes per batch)...\n")
for i in range(0, len(isil_codes), batch_size):
batch = isil_codes[i:i+batch_size]
batch_num = i//batch_size + 1
query_name = f"ISIL batch {batch_num}/{total_batches}"
query = self.build_isil_query(batch)
results = self.query_wikidata(query, query_name)
# Cache results by ISIL code
for wd_inst in results:
isil = wd_inst.get("identifiers", {}).get("ISIL")
if isil:
self.isil_to_wikidata[isil] = wd_inst
print(f" ✅ Cached {len(self.isil_to_wikidata)} Wikidata institutions with ISIL codes")
def match_by_isil(self, institution: dict[str, Any]) -> Optional[dict[str, Any]]:
"""
Match institution by ISIL code (high confidence).
Returns: Wikidata institution data or None
"""
identifiers = institution.get("identifiers", [])
if not isinstance(identifiers, list):
return None
for ident in identifiers:
if isinstance(ident, dict) and ident.get("identifier_scheme") == "ISIL":
isil_code = ident.get("identifier_value")
if isil_code and isil_code in self.isil_to_wikidata:
return self.isil_to_wikidata[isil_code]
return None
    def enrich_institution(
        self,
        institution: dict[str, Any],
        wikidata_inst: dict[str, Any],
        match_type: str,
        match_confidence: float = 1.0
    ) -> bool:
        """
        Enrich an institution record in place with Wikidata data.

        Adds new identifiers (Wikidata QID, VIAF, Website), a founding
        date, and missing coordinates; updates stats counters and appends
        a note to the record's provenance extraction_method.

        Args:
            institution: Our institution record (mutated in place).
            wikidata_inst: Parsed Wikidata data (see _parse_wikidata_result).
            match_type: "ISIL" or "fuzzy_name"
            match_confidence: 0.0-1.0 (1.0 for ISIL matches)

        Returns:
            True if any new data was added.
        """
        enriched = False
        # Ensure identifiers list exists
        if "identifiers" not in institution or not institution["identifiers"]:
            institution["identifiers"] = []
        identifiers_list = institution["identifiers"]
        # Schemes already on the record — used to avoid duplicate entries.
        existing_schemes = {
            ident.get("identifier_scheme", "")
            for ident in identifiers_list
            if isinstance(ident, dict)
        }
        # Add Wikidata ID
        wikidata_qid = wikidata_inst["qid"]
        if "Wikidata" not in existing_schemes:
            identifiers_list.append({
                "identifier_scheme": "Wikidata",
                "identifier_value": wikidata_qid,
                "identifier_url": f"https://www.wikidata.org/wiki/{wikidata_qid}"
            })
            self.stats["new_wikidata_ids"] += 1
            enriched = True
            # Check if this replaces a synthetic Q-number in GHCID
            # (synthetic placeholders look like "-Q9" followed by 7+ digits).
            ghcid = institution.get("ghcid", "")
            if ghcid and re.search(r"-Q9\d{7,}", ghcid):
                self.stats["replaced_synthetic_q"] += 1
        # Add other identifiers from Wikidata
        wd_identifiers = wikidata_inst.get("identifiers", {})
        if isinstance(wd_identifiers, dict):
            for scheme, value in wd_identifiers.items():
                if scheme not in existing_schemes and scheme != "ISIL":  # Skip ISIL (already have it)
                    id_obj: dict[str, Any] = {
                        "identifier_scheme": scheme,
                        "identifier_value": value
                    }
                    # Add URLs for known schemes
                    if scheme == "VIAF":
                        id_obj["identifier_url"] = f"https://viaf.org/viaf/{value}"
                        self.stats["new_viaf_ids"] += 1
                    elif scheme == "Website":
                        id_obj["identifier_url"] = value
                        self.stats["new_websites"] += 1
                    identifiers_list.append(id_obj)
                    enriched = True
        # Add founding date if missing
        if "founding_date" in wikidata_inst and not institution.get("founding_date"):
            institution["founding_date"] = wikidata_inst["founding_date"]
            self.stats["new_founding_dates"] += 1
            enriched = True
        # Add/improve location coordinates (first location entry only)
        if "latitude" in wikidata_inst and "longitude" in wikidata_inst:
            locations = institution.get("locations", [])
            if isinstance(locations, list) and len(locations) > 0:
                first_loc = locations[0]
                if isinstance(first_loc, dict):
                    # Only update if coordinates are missing
                    if first_loc.get("latitude") is None or first_loc.get("longitude") is None:
                        first_loc["latitude"] = wikidata_inst["latitude"]
                        first_loc["longitude"] = wikidata_inst["longitude"]
                        self.stats["enriched_coordinates"] += 1
                        enriched = True
        # Update provenance so the enrichment is traceable on the record.
        if enriched:
            prov = institution.get("provenance", {})
            if isinstance(prov, dict):
                existing_method = prov.get("extraction_method", "")
                match_info = f"Wikidata enrichment ({match_type} match, confidence: {match_confidence:.2f})"
                if existing_method:
                    prov["extraction_method"] = f"{existing_method} + {match_info}"
                else:
                    prov["extraction_method"] = match_info
        return enriched
    def run(self) -> None:
        """Run the complete enrichment workflow.

        Loads the YAML dataset, queries Wikidata by ISIL code, enriches
        matching records in place, writes the enriched dataset with a
        summary header, and prints a final report.

        Raises:
            ValueError: If the input YAML does not contain a list.
        """
        print("=" * 80)
        print("🚀 GLOBAL WIKIDATA ENRICHMENT")
        print("=" * 80)
        print(f"\n Input: {self.input_file}")
        print(f" Output: {self.output_file}\n")
        # Load existing dataset
        print("📖 Loading global dataset...")
        with open(self.input_file, 'r', encoding='utf-8') as f:
            institutions = yaml.safe_load(f)
        if not isinstance(institutions, list):
            raise ValueError("Expected YAML file to contain a list of institutions")
        self.stats["total_institutions"] = len(institutions)
        print(f" Loaded {len(institutions):,} institutions\n")
        # Extract ISIL codes from our dataset
        isil_codes = self.build_isil_cache(institutions)
        self.stats["institutions_with_isil"] = len(isil_codes)
        print(f"📋 Found {len(isil_codes):,} institutions with ISIL codes ({len(isil_codes)/len(institutions)*100:.1f}%)\n")
        # Query Wikidata by ISIL codes (batch queries)
        self.query_by_isil_codes(isil_codes)
        # Match and enrich
        print(f"\n🔗 Matching and enriching institutions...")
        print(f" Strategy: ISIL code matching (high confidence)\n")
        enriched_count = 0
        for i, institution in enumerate(institutions):
            # Match by ISIL (high confidence)
            wikidata_inst = self.match_by_isil(institution)
            if wikidata_inst:
                if self.enrich_institution(institution, wikidata_inst, match_type="ISIL", match_confidence=1.0):
                    enriched_count += 1
                    self.stats["isil_matches"] += 1
                    # Progress indicator every 100 enriched records
                    if enriched_count % 100 == 0:
                        print(f" ✅ Enriched {enriched_count:,} institutions ({enriched_count/len(institutions)*100:.1f}%)")
        print(f"\n ✅ Total enriched: {enriched_count:,} institutions ({enriched_count/len(institutions)*100:.1f}%)\n")
        # Write enriched dataset
        print(f"💾 Writing enriched dataset to {self.output_file}...")
        # YAML header comment documenting this enrichment run's statistics.
        header = f"""---
# Global Heritage Institutions - Wikidata Enriched
# Generated: {datetime.now(timezone.utc).isoformat()}
#
# Enrichment run: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}
# Total institutions: {self.stats['total_institutions']:,}
# Wikidata queries: {self.stats['wikidata_queries']}
# Wikidata results: {self.stats['wikidata_results']:,}
# ISIL matches: {self.stats['isil_matches']:,}
# New Wikidata IDs: {self.stats['new_wikidata_ids']:,}
# Replaced synthetic Q-numbers: {self.stats['replaced_synthetic_q']:,}
# New VIAF IDs: {self.stats['new_viaf_ids']:,}
# New founding dates: {self.stats['new_founding_dates']:,}
# New websites: {self.stats['new_websites']:,}
"""
        with open(self.output_file, 'w', encoding='utf-8') as f:
            f.write(header)
            yaml.dump(
                institutions,
                f,
                allow_unicode=True,
                default_flow_style=False,
                sort_keys=False,
                width=120
            )
        print(" ✅ Write complete\n")
        # Print final statistics
        self._print_report()
def _print_report(self) -> None:
"""Print enrichment report."""
print("\n" + "="*80)
print("📊 WIKIDATA ENRICHMENT REPORT")
print("="*80)
print(f"\n📚 Dataset Statistics:")
print(f" Total institutions: {self.stats['total_institutions']:,}")
print(f" Institutions with ISIL codes: {self.stats['institutions_with_isil']:,} ({self.stats['institutions_with_isil']/self.stats['total_institutions']*100:.1f}%)")
print(f"\n🌐 Wikidata Queries:")
print(f" Total queries executed: {self.stats['wikidata_queries']}")
print(f" Total Wikidata results: {self.stats['wikidata_results']:,}")
print(f"\n🔗 Matching Results:")
print(f" ISIL matches: {self.stats['isil_matches']:,} ({self.stats['isil_matches']/self.stats['total_institutions']*100:.1f}%)")
print(f" Fuzzy matches: {self.stats['fuzzy_matches']:,}")
print(f" No matches: {self.stats['total_institutions'] - self.stats['isil_matches'] - self.stats['fuzzy_matches']:,}")
print(f"\n✨ New Data Added:")
print(f" Wikidata IDs: {self.stats['new_wikidata_ids']:,}")
print(f" Replaced synthetic Q-numbers: {self.stats['replaced_synthetic_q']:,}")
print(f" VIAF IDs: {self.stats['new_viaf_ids']:,}")
print(f" Founding dates: {self.stats['new_founding_dates']:,}")
print(f" Websites: {self.stats['new_websites']:,}")
print(f" Enriched coordinates: {self.stats['enriched_coordinates']:,}")
# Coverage analysis
print(f"\n📈 Coverage Analysis:")
total = self.stats['total_institutions']
with_wikidata = self.stats['new_wikidata_ids']
with_viaf = self.stats['new_viaf_ids']
print(f" Wikidata coverage: {with_wikidata:,}/{total:,} ({with_wikidata/total*100:.1f}%)")
if with_viaf > 0:
print(f" VIAF coverage: {with_viaf:,}/{with_wikidata:,} ({with_viaf/with_wikidata*100:.1f}% of Wikidata matches)")
print(f"\n💡 Next Steps:")
if self.stats['replaced_synthetic_q'] > 0:
print(f" ✅ Replaced {self.stats['replaced_synthetic_q']:,} synthetic Q-numbers with real Wikidata QIDs")
print(f" → Run GHCID regeneration script to update GHCIDs with real Q-numbers")
if self.stats['new_viaf_ids'] > 0:
print(f" ✅ Found {self.stats['new_viaf_ids']:,} VIAF IDs from Wikidata")
remaining_without_wikidata = total - with_wikidata
if remaining_without_wikidata > 0:
print(f" ⚠️ {remaining_without_wikidata:,} institutions still without Wikidata IDs")
print(f" → Consider fuzzy name matching or manual curation")
print("\n" + "="*80 + "\n")
def main():
    """Main entry point."""
    base_dir = Path(__file__).parent.parent
    data_dir = base_dir / "data" / "instances" / "global"
    input_file = data_dir / "global_heritage_institutions.yaml"
    output_file = data_dir / "global_heritage_institutions_wikidata_enriched.yaml"

    # Bail out early with a clear message when the dataset is missing.
    if not input_file.exists():
        print(f"❌ Error: Input file not found: {input_file}")
        print(f" Expected location: {input_file}")
        sys.exit(1)

    enricher = GlobalWikidataEnricher(input_file, output_file)
    try:
        enricher.run()
        print("✅ Enrichment complete!")
        print(f"\n📁 Output file: {output_file}")
    except KeyboardInterrupt:
        print("\n\n⚠️ Enrichment interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error during enrichment: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()