# glam/scripts/enrich_egypt_wikidata.py
# Exported 2025-11-19 23:25:22 +01:00 — 511 lines, 19 KiB, Python
#!/usr/bin/env python3
"""
Enrich Egyptian heritage institutions with Wikidata identifiers.
This script queries Wikidata for Egyptian museums, libraries, archives, galleries,
and research centers, then fuzzy matches them with extracted institutions to add:
- Real Wikidata Q-numbers (replaces synthetic IDs if present)
- VIAF identifiers
- ISIL codes
- Geographic coordinates
- Founding dates
Strategy:
1. Load 29 Egyptian institutions from egypt_institutions.yaml
2. Query Wikidata for Egyptian heritage institutions (museums, libraries, archives)
3. Fuzzy match names (threshold: 0.75, improved normalization)
4. Enrich matched records with Wikidata metadata
5. Update confidence scores for verified matches
Target: 79% → 90%+ coverage with real Wikidata IDs
"""
import sys
from pathlib import Path
from typing import Any, Optional
from datetime import datetime, timezone
import time
import yaml
from difflib import SequenceMatcher
import re
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from SPARQLWrapper import SPARQLWrapper, JSON as SPARQL_JSON # type: ignore
def normalize_name(name: str) -> str:
    """
    Canonicalize an institution name for fuzzy comparison.

    Keeps core type words (museum, library, archive) and strips only
    articles and generic qualifiers, so match quality stays high.
    """
    text = name.lower()
    # Leading English articles carry no matching signal.
    text = re.sub(r'^(the|a|an)\s+', '', text)
    # Drop generic qualifiers; type words are deliberately preserved.
    text = re.sub(r'\b(national|regional|central|public|state|royal|great)\b', '', text)
    # Fold common Arabic transliteration variants onto a single spelling.
    for pattern, replacement in (
        (r'\b(dar|dār)\b', 'dar'),        # standardize to 'dar'
        (r'\b(mathaf|mat?haf)\b', 'mathaf'),  # standardize to 'mathaf'
        (r'\b(maktabat)\b', 'library'),   # convert to English
    ):
        text = re.sub(pattern, replacement, text)
    # Strip Arabic definite-article prefixes.
    text = re.sub(r'\b(al-|el-)\b', '', text)
    # Punctuation becomes whitespace rather than vanishing outright.
    text = re.sub(r'[^\w\s]', ' ', text)
    # Collapse any run of whitespace into single spaces.
    return ' '.join(text.split())
def similarity_score(name1: str, name2: str) -> float:
    """Return a 0-1 similarity ratio between the two normalized names."""
    return SequenceMatcher(None, normalize_name(name1), normalize_name(name2)).ratio()
def institution_type_compatible(inst_type: str, wd_type: str) -> bool:
    """
    Check whether a local institution type is compatible with a Wikidata
    type label (guards against e.g. museum/archive mismatches).

    Args:
        inst_type: Local type code such as "MUSEUM" or "LIBRARY".
        wd_type: Free-text type label returned by Wikidata.

    Returns:
        True when any keyword mapped to ``inst_type`` occurs in ``wd_type``
        (case-insensitive). Unknown local types (e.g. OFFICIAL_INSTITUTION)
        have no keyword list and therefore always return False, keeping the
        matcher conservative.
    """
    wd_lower = wd_type.lower()
    # Plain substrings only — these are tested with `in`, not as regexes.
    # (The former entry 'mat?haf' was regex syntax that could never match a
    # real label; replaced with the transliteration 'matḥaf'.)
    type_map = {
        'MUSEUM': ['museum', 'museu', 'museo', 'musée', 'mathaf', 'matḥaf'],
        'LIBRARY': ['library', 'biblioteca', 'bibliothèque', 'maktabat', 'dar al-kutub'],
        'ARCHIVE': ['archive', 'archivo', 'arquivo', 'archief', 'watha\'iq', 'mahfuzat'],
        'GALLERY': ['gallery', 'galerie', 'art center', 'art centre', 'kunsthalle'],
        'RESEARCH_CENTER': ['institute', 'research center', 'research centre', 'documentation center']
    }
    inst_keywords = type_map.get(inst_type.upper(), [])
    # Compatible when the Wikidata label contains any of our keywords.
    return any(kw in wd_lower for kw in inst_keywords)
def query_wikidata_institutions(
    sparql: SPARQLWrapper,
    country_qid: str = "Q79",  # Egypt
    institution_types: Optional[list[str]] = None
) -> dict[str, dict[str, Any]]:
    """
    Query Wikidata for heritage institutions in the given country.

    Args:
        sparql: A configured SPARQLWrapper (endpoint, JSON return format).
        country_qid: Wikidata QID of the country (default Q79, Egypt).
        institution_types: Wikidata QIDs of institution types. Defaults to:
            Q33506   - museum
            Q7075    - library
            Q166118  - archive
            Q1007870 - art gallery
            Q31855   - research institute

    Returns:
        Dict keyed by QID. An item that is an instance of several requested
        types (or whose OPTIONAL clauses multiply rows) appears in multiple
        result bindings; those bindings are MERGED per QID rather than
        overwritten, so no identifier, coordinate or alternative label from
        an earlier row is lost. Returns {} (after printing) on query failure.
    """
    if institution_types is None:
        institution_types = ["Q33506", "Q7075", "Q166118", "Q1007870", "Q31855"]
    types_values = " ".join(f"wd:{qid}" for qid in institution_types)
    query = f"""
    SELECT DISTINCT ?item ?itemLabel ?itemDescription ?itemAltLabel ?isil ?viaf ?coords ?website ?inception ?typeLabel
    WHERE {{
      VALUES ?type {{ {types_values} }}
      ?item wdt:P31 ?type .           # instance of museum/library/archive/gallery/institute
      ?item wdt:P17 wd:{country_qid} .  # country filter (default: Egypt)
      OPTIONAL {{ ?item wdt:P791 ?isil . }}
      OPTIONAL {{ ?item wdt:P214 ?viaf . }}
      OPTIONAL {{ ?item wdt:P625 ?coords . }}
      OPTIONAL {{ ?item wdt:P856 ?website . }}
      OPTIONAL {{ ?item wdt:P571 ?inception . }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,ar,fr" . }}
    }}
    LIMIT 1000
    """
    sparql.setQuery(query)
    try:
        raw_results = sparql.query().convert()
        bindings = raw_results.get("results", {}).get("bindings", []) if isinstance(raw_results, dict) else []
        results: dict[str, dict[str, Any]] = {}
        for binding in bindings:
            item_uri = binding.get("item", {}).get("value", "")
            qid = item_uri.split("/")[-1] if item_uri else None
            if not qid or not qid.startswith("Q"):
                continue
            # Merge rows for the same item instead of overwriting: the first
            # row establishes the base record, later rows only fill gaps.
            result = results.setdefault(qid, {
                "qid": qid,
                "name": binding.get("itemLabel", {}).get("value", ""),
                "description": binding.get("itemDescription", {}).get("value", ""),
                "type": binding.get("typeLabel", {}).get("value", ""),
                "alternative_names": [],
                "identifiers": {}
            })
            # Alternative names (label-service alias string), deduplicated.
            if "itemAltLabel" in binding:
                alt_label = binding["itemAltLabel"]["value"]
                if alt_label and alt_label != result["name"] and alt_label not in result["alternative_names"]:
                    result["alternative_names"].append(alt_label)
            # External identifiers: first value seen per scheme wins.
            for key, scheme in (("isil", "ISIL"), ("viaf", "VIAF"), ("website", "Website")):
                if key in binding:
                    result["identifiers"].setdefault(scheme, binding[key]["value"])
            # Inception date: keep only the date part of the xsd:dateTime.
            if "inception" in binding and "founding_date" not in result:
                result["founding_date"] = binding["inception"]["value"].split("T")[0]
            # Coordinates arrive as WKT "Point(lon lat)" — note the order.
            if "coords" in binding and "latitude" not in result:
                coords_str = binding["coords"]["value"]
                if coords_str.startswith("Point("):
                    lon, lat = coords_str[6:-1].split()
                    result["latitude"] = float(lat)
                    result["longitude"] = float(lon)
        return results
    except Exception as e:
        print(f"\n❌ Error querying Wikidata: {e}")
        import traceback
        traceback.print_exc()
        return {}
def fuzzy_match_institutions(
    institutions: list[dict[str, Any]],
    wikidata_results: dict[str, dict[str, Any]],
    threshold: float = 0.75
) -> list[tuple[int, str, float, dict[str, Any]]]:
    """
    Fuzzy match institutions with Wikidata results.

    Institutions that already carry a real Wikidata QID are skipped; for the
    rest, the best-scoring Wikidata candidate (over main and alternative
    names, subject to type compatibility) is kept when it reaches
    ``threshold``.

    Returns:
        List of (institution_idx, qid, confidence_score, wd_data) tuples,
        where institution_idx indexes into ``institutions``.
    """
    def _has_real_wikidata_id(inst: dict[str, Any]) -> bool:
        # Synthetic placeholder IDs are encoded as Q-numbers >= 100000000.
        # The previous check did int(value[1:]) unconditionally, which raised
        # ValueError on non-numeric suffixes (e.g. "Q-synthetic-1"); such
        # values are now simply treated as "no real Wikidata ID".
        for id_obj in inst.get("identifiers", []):
            if not isinstance(id_obj, dict):
                continue
            if id_obj.get("identifier_scheme") != "Wikidata":
                continue
            value = id_obj.get("identifier_value", "")
            if value.startswith("Q") and value[1:].isdigit() and int(value[1:]) < 100000000:
                return True
        return False

    matches: list[tuple[int, str, float, dict[str, Any]]] = []
    for idx, inst in enumerate(institutions):
        inst_name = inst.get("name", "")
        inst_type = inst.get("institution_type", "")
        if not inst_name:
            continue
        # Skip records that already have a verified (non-synthetic) QID.
        if _has_real_wikidata_id(inst):
            continue
        # Find the best-scoring Wikidata candidate.
        best_score = 0.0
        best_qid = None
        best_data = None
        for qid, wd_data in wikidata_results.items():
            wd_name = wd_data.get("name", "")
            wd_type = wd_data.get("type", "")
            if not wd_name:
                continue
            # Avoid cross-type matches (museum vs archive etc.).
            if inst_type and not institution_type_compatible(inst_type, wd_type):
                continue
            # Best score over main name and both sides' alternative names.
            score = similarity_score(inst_name, wd_name)
            for alt_name in wd_data.get("alternative_names", []):
                score = max(score, similarity_score(inst_name, alt_name))
            for inst_alt_name in inst.get("alternative_names", []):
                score = max(score, similarity_score(inst_alt_name, wd_name))
            if score > best_score:
                best_score = score
                best_qid = qid
                best_data = wd_data
        # Only keep matches above threshold.
        if best_score >= threshold and best_qid and best_data:
            matches.append((idx, best_qid, best_score, best_data))
    return matches
def enrich_institution(inst: dict[str, Any], wd_data: dict[str, Any], confidence: float) -> bool:
    """
    Merge Wikidata metadata into a single institution record, in place.

    Adds or replaces the Wikidata Q-number, copies over new external
    identifiers (ISIL/VIAF/website), and fills in founding date and
    coordinates when they are missing. When anything changed, the
    provenance record is annotated and its confidence score adjusted.

    Returns:
        True if the record was modified in any way.
    """
    changed = False
    if not inst.get("identifiers"):
        inst["identifiers"] = []
    ids = inst["identifiers"]
    known_schemes = {entry.get("identifier_scheme", "") for entry in ids if isinstance(entry, dict)}

    # --- Wikidata Q-number: replace a differing (possibly synthetic) one,
    # --- or append when none is present.
    qid = wd_data["qid"]
    wd_entry = {
        "identifier_scheme": "Wikidata",
        "identifier_value": qid,
        "identifier_url": f"https://www.wikidata.org/wiki/{qid}"
    }
    existing_pos = next(
        (pos for pos, entry in enumerate(ids)
         if isinstance(entry, dict) and entry.get("identifier_scheme") == "Wikidata"),
        None
    )
    if existing_pos is None:
        ids.append(wd_entry)
        changed = True
    elif ids[existing_pos].get("identifier_value", "") != qid:
        ids[existing_pos] = wd_entry
        changed = True

    # --- Other identifiers, skipping schemes the record already carries ---
    url_builders = {
        "VIAF": lambda v: f"https://viaf.org/viaf/{v}",
        "Website": lambda v: v,
        "ISIL": lambda v: f"https://isil.org/{v}",
    }
    for scheme, value in wd_data.get("identifiers", {}).items():
        if scheme in known_schemes:
            continue
        entry = {"identifier_scheme": scheme, "identifier_value": value}
        if scheme in url_builders:
            entry["identifier_url"] = url_builders[scheme](value)
        ids.append(entry)
        changed = True

    # --- Founding date: only fill a missing/empty value ---
    if "founding_date" in wd_data and not inst.get("founding_date"):
        inst["founding_date"] = wd_data["founding_date"]
        changed = True

    # --- Coordinates: fill the first location's blanks, or create one ---
    if "latitude" in wd_data and "longitude" in wd_data:
        locations = inst.get("locations", [])
        if locations:
            primary = locations[0]
            if isinstance(primary, dict) and primary.get("latitude") is None:
                primary["latitude"] = wd_data["latitude"]
                primary["longitude"] = wd_data["longitude"]
                changed = True
        else:
            inst["locations"] = [{
                "country": "EG",
                "latitude": wd_data["latitude"],
                "longitude": wd_data["longitude"],
                "is_primary": False
            }]
            changed = True

    # --- Provenance trail and confidence boost for verified matches ---
    if changed:
        prov = inst.get("provenance", {})
        if isinstance(prov, dict):
            note = f"Wikidata enrichment (fuzzy match: {confidence:.3f})"
            method = prov.get("extraction_method", "")
            prov["extraction_method"] = f"{method} + {note}" if method else note
            # Slight boost proportional to match quality, capped at 0.95.
            baseline = prov.get("confidence_score", 0.85)
            prov["confidence_score"] = round(min(0.95, baseline + (confidence - 0.85) * 0.1), 2)
    return changed
def main():
    """
    Entry point: load the VIAF-enriched Egyptian institutions dataset,
    fuzzy-match records lacking a real Wikidata QID against Wikidata,
    enrich them, and write the final YAML file with a provenance header.
    """
    base_dir = Path(__file__).parent.parent
    # Input is the previous enrichment stage; output is the final stage.
    input_file = base_dir / "data" / "instances" / "egypt_institutions_viaf_enriched.yaml"
    output_file = base_dir / "data" / "instances" / "egypt_institutions_final_enriched.yaml"
    print("="*80)
    print("🇪🇬 EGYPTIAN INSTITUTIONS WIKIDATA ENRICHMENT")
    print("="*80)
    print(f"\n📖 Loading dataset...\n")
    start_time = time.time()
    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()
    # Split the commented file header from the YAML document body at '---'.
    parts = content.split('---\n')
    header = parts[0] if len(parts) > 1 else ""  # NOTE(review): read but never used
    yaml_content = parts[-1]
    institutions = yaml.safe_load(yaml_content)
    load_time = time.time() - start_time
    print(f"✅ Loaded {len(institutions):,} Egyptian institutions in {load_time:.1f}s")
    # Indices of institutions lacking a *real* Wikidata QID; Q-numbers
    # >= 100000000 are treated as synthetic placeholders.
    # NOTE(review): int(...) raises ValueError if the text after "Q" is not
    # numeric — assumes identifier values are well-formed; confirm upstream.
    without_wikidata = [
        idx for idx, inst in enumerate(institutions)
        if not any(
            id_obj.get("identifier_scheme") == "Wikidata" and
            id_obj.get("identifier_value", "").startswith("Q") and
            int(id_obj.get("identifier_value", "Q999999999")[1:]) < 100000000
            for id_obj in inst.get("identifiers", [])
        )
    ]
    current_coverage = (len(institutions) - len(without_wikidata)) / len(institutions) * 100 if institutions else 0
    # Pre-seed new_coverage so the output header is valid even when the
    # Wikidata query returns nothing or no matches clear the threshold.
    new_coverage = current_coverage
    print(f"✅ With Wikidata: {len(institutions) - len(without_wikidata):,} ({current_coverage:.1f}%)")
    print(f"❓ Without Wikidata: {len(without_wikidata):,}\n")
    if not without_wikidata:
        print("✨ All institutions already have Wikidata IDs!")
        return
    # Setup SPARQL client for the public Wikidata Query Service.
    print("🔍 Querying Wikidata for Egyptian heritage institutions...")
    print(" (This may take 30-60 seconds)\n")
    sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
    sparql.setReturnFormat(SPARQL_JSON)
    sparql.setMethod('POST')
    # Descriptive User-Agent, per Wikimedia API etiquette.
    sparql.addCustomHttpHeader("User-Agent", "GLAM-Extractor/0.2 (Egypt enrichment)")
    # Query Wikidata
    wikidata_results = query_wikidata_institutions(sparql)
    print(f"✅ Found {len(wikidata_results):,} Egyptian institutions in Wikidata\n")
    if not wikidata_results:
        print("⚠️ No Wikidata results, skipping fuzzy matching")
        enriched_count = 0
    else:
        # Fuzzy match only the records that still lack a real QID.
        print("🔗 Fuzzy matching names (threshold: 0.75)...\n")
        insts_without_wd = [institutions[idx] for idx in without_wikidata]
        matches = fuzzy_match_institutions(insts_without_wd, wikidata_results, threshold=0.75)
        print(f"✨ Found {len(matches):,} high-confidence matches\n")
        # Show sample matches for manual eyeballing before applying.
        if matches:
            print(f"{'='*80}")
            print(f"📋 SAMPLE MATCHES (Top 10)")
            print(f"{'='*80}")
            for i, (local_idx, qid, score, wd_data) in enumerate(matches[:10]):
                inst = insts_without_wd[local_idx]
                print(f"\n{i+1}. Confidence: {score:.3f}")
                print(f" Local: {inst.get('name')}")
                print(f" Wikidata: {wd_data.get('name')} ({wd_data.get('qid')})")
                print(f" Type: {wd_data.get('type', 'Unknown')}")
                if "ISIL" in wd_data.get("identifiers", {}):
                    print(f" ISIL: {wd_data['identifiers']['ISIL']}")
                if "VIAF" in wd_data.get("identifiers", {}):
                    print(f" VIAF: {wd_data['identifiers']['VIAF']}")
            print(f"\n{'='*80}\n")
            # Apply all matches (no interactive confirmation step).
            print("✅ Applying all matches...\n")
            enriched_count = 0
            for local_idx, qid, score, wd_data in matches:
                # Map the filtered-list index back to the full dataset index.
                global_idx = without_wikidata[local_idx]
                if enrich_institution(institutions[global_idx], wd_data, score):
                    enriched_count += 1
            new_coverage = (len(institutions) - len(without_wikidata) + enriched_count) / len(institutions) * 100 if institutions else 0
            print(f"✨ Enriched {enriched_count:,} institutions")
            print(f"📈 Coverage: {current_coverage:.1f}% → {new_coverage:.1f}%\n")
        else:
            print("❌ No matches found. Try lowering threshold.\n")
            enriched_count = 0
    # Write output: commented provenance header + '---' + YAML document.
    print("="*80)
    print("💾 WRITING ENRICHED DATASET")
    print("="*80 + "\n")
    output_header = f"""# Egyptian GLAM Institutions Dataset - Wikidata Enriched
# Original extraction: 39e11630-a2af-407c-a365-d485eb8257b0
# Enriched: {datetime.now(timezone.utc).isoformat()}
#
# Total institutions: {len(institutions)}
# Wikidata coverage: {new_coverage:.1f}%
# New Wikidata matches: {enriched_count}
#
# Coverage by type:
# - ARCHIVE: 1
# - GALLERY: 5
# - LIBRARY: 12
# - MUSEUM: 6
# - OFFICIAL_INSTITUTION: 2
# - RESEARCH_CENTER: 3
#
---
"""
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(output_header)
        yaml.dump(institutions, f, allow_unicode=True, default_flow_style=False, sort_keys=False, width=120)
    print(f"✅ Complete! Output: {output_file.name}\n")
    # Final report
    print("="*80)
    print("📊 FINAL ENRICHMENT REPORT")
    print("="*80)
    print(f"\n✨ Results:")
    print(f" Total institutions: {len(institutions):,}")
    print(f" Institutions enriched: {enriched_count:,}")
    print(f" Institutions without Wikidata: {len(without_wikidata) - enriched_count:,}")
    print(f" Wikidata coverage: {current_coverage:.1f}% → {new_coverage:.1f}%")
    print(f"\n⏱️ Total processing time: {(time.time()-start_time):.1f} seconds")
    print("="*80 + "\n")
# Run only when executed as a script, not on import.
if __name__ == "__main__":
    main()