#!/usr/bin/env python3
"""
Enrich Egyptian heritage institutions with Wikidata identifiers.

This script queries Wikidata for Egyptian museums, libraries, archives,
galleries, and research centers, then fuzzy matches them with extracted
institutions to add:
- Real Wikidata Q-numbers (replaces synthetic IDs if present)
- VIAF identifiers
- ISIL codes
- Geographic coordinates
- Founding dates

Strategy:
1. Load 29 Egyptian institutions from egypt_institutions.yaml
2. Query Wikidata for Egyptian heritage institutions (museums, libraries, archives)
3. Fuzzy match names (threshold: 0.75, improved normalization)
4. Enrich matched records with Wikidata metadata
5. Update confidence scores for verified matches

Target: 79% -> 90%+ coverage with real Wikidata IDs
"""

import sys
from pathlib import Path
from typing import Any, Optional
from datetime import datetime, timezone
import time
import yaml
from difflib import SequenceMatcher
import re

sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from SPARQLWrapper import SPARQLWrapper, JSON as SPARQL_JSON  # type: ignore

# Synthetic placeholder Q-numbers are deliberately huge; anything below this
# threshold is treated as a real Wikidata QID.
_SYNTHETIC_QID_THRESHOLD = 100000000


def normalize_name(name: str) -> str:
    """
    Normalize institution name for fuzzy matching.

    IMPROVED VERSION: Preserves core type words (museum, library, archive)
    and only removes articles/qualifiers to improve match quality.
    """
    # Lowercase
    name = name.lower()

    # Remove articles (the, a, an)
    name = re.sub(r'^(the|a|an)\s+', '', name)

    # Remove qualifiers but KEEP type words (museum, library, archive, etc.)
    # Only remove redundant qualifiers like "national", "regional", etc.
    name = re.sub(r'\b(national|regional|central|public|state|royal|great)\b', '', name)

    # Normalize Arabic transliterations (convert to common forms)
    name = re.sub(r'\b(dar|dār)\b', 'dar', name)  # Standardize to 'dar'
    name = re.sub(r'\b(mathaf|mat?haf)\b', 'mathaf', name)  # Standardize to 'mathaf'
    name = re.sub(r'\b(maktabat)\b', 'library', name)  # Convert to English

    # Remove definite articles in Arabic
    # NOTE: trailing \b after the hyphen requires a following word character,
    # so this only strips "al-"/"el-" when directly prefixed to a word.
    name = re.sub(r'\b(al-|el-)\b', '', name)

    # Normalize punctuation to spaces (but don't remove entirely)
    name = re.sub(r'[^\w\s]', ' ', name)

    # Normalize whitespace
    name = ' '.join(name.split())

    return name


def similarity_score(name1: str, name2: str) -> float:
    """Calculate similarity between two names (0-1)."""
    norm1 = normalize_name(name1)
    norm2 = normalize_name(name2)
    return SequenceMatcher(None, norm1, norm2).ratio()


def institution_type_compatible(inst_type: str, wd_type: str) -> bool:
    """Check if institution types are compatible (avoid museum/archive mismatches).

    Unknown institution types (not present in the mapping, e.g.
    OFFICIAL_INSTITUTION) are treated as compatible with anything, since
    there is no basis to veto a candidate.
    """
    wd_lower = wd_type.lower()

    # Define type mappings: keywords expected in the Wikidata type label.
    # FIX: 'mat?haf' was a regex fragment pasted into a plain substring list
    # and could never match; replaced with the literal transliteration.
    type_map = {
        'MUSEUM': ['museum', 'museu', 'museo', 'musée', 'mathaf'],
        'LIBRARY': ['library', 'biblioteca', 'bibliothèque', 'maktabat', 'dar al-kutub'],
        'ARCHIVE': ['archive', 'archivo', 'arquivo', 'archief', 'watha\'iq', 'mahfuzat'],
        'GALLERY': ['gallery', 'galerie', 'art center', 'art centre', 'kunsthalle'],
        'RESEARCH_CENTER': ['institute', 'research center', 'research centre', 'documentation center'],
    }

    inst_keywords = type_map.get(inst_type.upper(), [])

    # FIX: an unmapped type used to yield an empty keyword list and therefore
    # reject every candidate. Be permissive instead of silently vetoing.
    if not inst_keywords:
        return True

    # If Wikidata type contains any of our keywords, it's compatible
    return any(kw in wd_lower for kw in inst_keywords)


def _has_real_wikidata_id(inst: dict[str, Any]) -> bool:
    """Return True if the institution already has a non-synthetic Wikidata QID.

    A QID is considered real when it is of the form Q<digits> with a number
    below _SYNTHETIC_QID_THRESHOLD. The isdigit() guard prevents the
    ValueError the previous inline check raised on malformed values.
    """
    for id_obj in inst.get("identifiers", []) or []:
        if not isinstance(id_obj, dict):
            continue
        if id_obj.get("identifier_scheme") != "Wikidata":
            continue
        value = id_obj.get("identifier_value", "")
        if (value.startswith("Q") and value[1:].isdigit()
                and int(value[1:]) < _SYNTHETIC_QID_THRESHOLD):
            return True
    return False


def query_wikidata_institutions(
    sparql: SPARQLWrapper,
    country_qid: str = "Q79",  # Egypt
    institution_types: Optional[list[str]] = None
) -> dict[str, dict[str, Any]]:
    """
    Query Wikidata for heritage institutions in Egypt.

    institution_types: List of Wikidata QIDs for institution types
        Q33506 - museum
        Q7075 - library
        Q166118 - archive
        Q1007870 - art gallery
        Q31855 - research institute

    Returns a dict keyed by QID with name, description, type, alt names,
    identifiers (ISIL/VIAF/Website), founding date and coordinates.
    Returns {} on query failure (error is printed, not raised).
    """
    if institution_types is None:
        institution_types = ["Q33506", "Q7075", "Q166118", "Q1007870", "Q31855"]

    types_values = " ".join(f"wd:{qid}" for qid in institution_types)

    query = f"""
    SELECT DISTINCT ?item ?itemLabel ?itemDescription ?itemAltLabel
                    ?isil ?viaf ?coords ?website ?inception ?typeLabel
    WHERE {{
      VALUES ?type {{ {types_values} }}
      ?item wdt:P31 ?type .  # instance of museum/library/archive/gallery/institute
      ?item wdt:P17 wd:{country_qid} .  # country = Egypt
      OPTIONAL {{ ?item wdt:P791 ?isil . }}
      OPTIONAL {{ ?item wdt:P214 ?viaf . }}
      OPTIONAL {{ ?item wdt:P625 ?coords . }}
      OPTIONAL {{ ?item wdt:P856 ?website . }}
      OPTIONAL {{ ?item wdt:P571 ?inception . }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,ar,fr" . }}
    }}
    LIMIT 1000
    """

    sparql.setQuery(query)

    try:
        raw_results = sparql.query().convert()
        bindings = (raw_results.get("results", {}).get("bindings", [])
                    if isinstance(raw_results, dict) else [])

        # Parse results into dict keyed by QID
        results = {}
        for binding in bindings:
            item_uri = binding.get("item", {}).get("value", "")
            qid = item_uri.split("/")[-1] if item_uri else None
            if not qid or not qid.startswith("Q"):
                continue

            result = {
                "qid": qid,
                "name": binding.get("itemLabel", {}).get("value", ""),
                "description": binding.get("itemDescription", {}).get("value", ""),
                "type": binding.get("typeLabel", {}).get("value", ""),
                "alternative_names": [],
                "identifiers": {}
            }

            # Alternative names
            if "itemAltLabel" in binding:
                alt_label = binding["itemAltLabel"]["value"]
                if alt_label and alt_label != result["name"]:
                    result["alternative_names"].append(alt_label)

            if "isil" in binding:
                result["identifiers"]["ISIL"] = binding["isil"]["value"]
            if "viaf" in binding:
                result["identifiers"]["VIAF"] = binding["viaf"]["value"]
            if "website" in binding:
                result["identifiers"]["Website"] = binding["website"]["value"]
            if "inception" in binding:
                # Keep only the date part of the xsd:dateTime value
                result["founding_date"] = binding["inception"]["value"].split("T")[0]
            if "coords" in binding:
                coords_str = binding["coords"]["value"]
                # WKT literal: "Point(lon lat)"
                if coords_str.startswith("Point("):
                    lon, lat = coords_str[6:-1].split()
                    result["latitude"] = float(lat)
                    result["longitude"] = float(lon)

            results[qid] = result

        return results

    except Exception as e:
        # Network/endpoint failures are reported but not fatal; caller
        # handles the empty dict.
        print(f"\n❌ Error querying Wikidata: {e}")
        import traceback
        traceback.print_exc()
        return {}


def fuzzy_match_institutions(
    institutions: list[dict[str, Any]],
    wikidata_results: dict[str, dict[str, Any]],
    threshold: float = 0.75
) -> list[tuple[int, str, float, dict[str, Any]]]:
    """
    Fuzzy match institutions with Wikidata results.

    Institutions that already carry a real Wikidata QID are skipped.
    Both sides' alternative names are compared; the best score wins.

    Returns:
        List of (institution_idx, qid, confidence_score, wd_data)
    """
    matches = []

    for idx, inst in enumerate(institutions):
        inst_name = inst.get("name", "")
        inst_type = inst.get("institution_type", "")

        if not inst_name:
            continue

        # Skip if already has real Wikidata ID
        if _has_real_wikidata_id(inst):
            continue

        # Find best match
        best_score = 0.0
        best_qid = None
        best_data = None

        for qid, wd_data in wikidata_results.items():
            wd_name = wd_data.get("name", "")
            wd_type = wd_data.get("type", "")

            if not wd_name:
                continue

            # Check type compatibility
            if inst_type and not institution_type_compatible(inst_type, wd_type):
                continue

            # Calculate similarity with main name
            score = similarity_score(inst_name, wd_name)

            # Also check alternative names in Wikidata
            for alt_name in wd_data.get("alternative_names", []):
                alt_score = similarity_score(inst_name, alt_name)
                score = max(score, alt_score)

            # Also check our alternative names against Wikidata
            for inst_alt_name in inst.get("alternative_names", []):
                alt_score = similarity_score(inst_alt_name, wd_name)
                score = max(score, alt_score)

            if score > best_score:
                best_score = score
                best_qid = qid
                best_data = wd_data

        # Only include matches above threshold
        if best_score >= threshold and best_qid and best_data:
            matches.append((idx, best_qid, best_score, best_data))

    return matches


def enrich_institution(inst: dict[str, Any], wd_data: dict[str, Any], confidence: float) -> bool:
    """Enrich an institution with Wikidata data. Returns True if enriched.

    Mutates `inst` in place: adds/replaces the Wikidata identifier, adds
    VIAF/ISIL/Website identifiers not already present, fills in founding
    date and coordinates when missing, and updates provenance.
    """
    enriched = False

    if "identifiers" not in inst or not inst["identifiers"]:
        inst["identifiers"] = []

    identifiers_list = inst["identifiers"]
    # Snapshot schemes before we append, so new additions don't suppress
    # the other Wikidata-sourced identifiers below.
    existing_schemes = {i.get("identifier_scheme", "")
                       for i in identifiers_list if isinstance(i, dict)}

    # Add Wikidata ID (or replace synthetic Q-number)
    wikidata_idx = None
    for i, id_obj in enumerate(identifiers_list):
        if isinstance(id_obj, dict) and id_obj.get("identifier_scheme") == "Wikidata":
            wikidata_idx = i
            break

    if wikidata_idx is not None:
        # Replace existing (possibly synthetic) Wikidata ID
        old_value = identifiers_list[wikidata_idx].get("identifier_value", "")
        if old_value != wd_data["qid"]:
            identifiers_list[wikidata_idx] = {
                "identifier_scheme": "Wikidata",
                "identifier_value": wd_data["qid"],
                "identifier_url": f"https://www.wikidata.org/wiki/{wd_data['qid']}"
            }
            enriched = True
    else:
        # Add new Wikidata ID
        identifiers_list.append({
            "identifier_scheme": "Wikidata",
            "identifier_value": wd_data["qid"],
            "identifier_url": f"https://www.wikidata.org/wiki/{wd_data['qid']}"
        })
        enriched = True

    # Add other identifiers
    wd_identifiers = wd_data.get("identifiers", {})
    for scheme, value in wd_identifiers.items():
        if scheme not in existing_schemes:
            id_obj = {
                "identifier_scheme": scheme,
                "identifier_value": value
            }
            if scheme == "VIAF":
                id_obj["identifier_url"] = f"https://viaf.org/viaf/{value}"
            elif scheme == "Website":
                id_obj["identifier_url"] = value
            elif scheme == "ISIL":
                # ISIL codes don't have a universal URL - identifier_value only
                pass
            identifiers_list.append(id_obj)
            enriched = True

    # Add founding date
    if "founding_date" in wd_data and not inst.get("founding_date"):
        inst["founding_date"] = wd_data["founding_date"]
        enriched = True

    # Add coordinates if missing
    if "latitude" in wd_data and "longitude" in wd_data:
        locations = inst.get("locations", [])
        if locations and len(locations) > 0:
            first_loc = locations[0]
            if isinstance(first_loc, dict) and first_loc.get("latitude") is None:
                first_loc["latitude"] = wd_data["latitude"]
                first_loc["longitude"] = wd_data["longitude"]
                enriched = True
        elif not locations:
            # Add new location with coordinates
            inst["locations"] = [{
                "country": "EG",
                "latitude": wd_data["latitude"],
                "longitude": wd_data["longitude"],
                "is_primary": False
            }]
            enriched = True

    # Update provenance
    if enriched:
        prov = inst.get("provenance", {})
        if isinstance(prov, dict):
            existing_method = prov.get("extraction_method", "")
            if existing_method:
                prov["extraction_method"] = f"{existing_method} + Wikidata enrichment (fuzzy match: {confidence:.3f})"
            else:
                prov["extraction_method"] = f"Wikidata enrichment (fuzzy match: {confidence:.3f})"

            # Increase confidence score for verified matches
            old_confidence = prov.get("confidence_score", 0.85)
            new_confidence = min(0.95, old_confidence + (confidence - 0.85) * 0.1)  # Slight boost based on match quality
            prov["confidence_score"] = round(new_confidence, 2)

    return enriched


def main():
    """Load the dataset, query Wikidata, fuzzy-match, enrich, and write output."""
    base_dir = Path(__file__).parent.parent
    input_file = base_dir / "data" / "instances" / "egypt_institutions_viaf_enriched.yaml"
    output_file = base_dir / "data" / "instances" / "egypt_institutions_final_enriched.yaml"

    print("="*80)
    print("🇪🇬 EGYPTIAN INSTITUTIONS WIKIDATA ENRICHMENT")
    print("="*80)
    print(f"\n📖 Loading dataset...\n")

    start_time = time.time()

    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Split commented header from the YAML document body; only the last
    # section is parsed as YAML.
    parts = content.split('---\n')
    yaml_content = parts[-1]

    # FIX: guard against an empty YAML body (safe_load returns None,
    # which previously crashed len()).
    institutions = yaml.safe_load(yaml_content) or []

    load_time = time.time() - start_time
    print(f"✅ Loaded {len(institutions):,} Egyptian institutions in {load_time:.1f}s")

    # Count those without real Wikidata
    without_wikidata = [
        idx for idx, inst in enumerate(institutions)
        if not _has_real_wikidata_id(inst)
    ]

    current_coverage = (len(institutions) - len(without_wikidata)) / len(institutions) * 100 if institutions else 0
    new_coverage = current_coverage  # Initialize new_coverage

    print(f"✅ With Wikidata: {len(institutions) - len(without_wikidata):,} ({current_coverage:.1f}%)")
    print(f"❓ Without Wikidata: {len(without_wikidata):,}\n")

    if not without_wikidata:
        print("✨ All institutions already have Wikidata IDs!")
        return

    # Setup SPARQL
    print("🔍 Querying Wikidata for Egyptian heritage institutions...")
    print("   (This may take 30-60 seconds)\n")

    sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
    sparql.setReturnFormat(SPARQL_JSON)
    sparql.setMethod('POST')
    sparql.addCustomHttpHeader("User-Agent", "GLAM-Extractor/0.2 (Egypt enrichment)")

    # Query Wikidata
    wikidata_results = query_wikidata_institutions(sparql)
    print(f"✅ Found {len(wikidata_results):,} Egyptian institutions in Wikidata\n")

    if not wikidata_results:
        print("⚠️  No Wikidata results, skipping fuzzy matching")
        enriched_count = 0
    else:
        # Fuzzy match
        print("🔗 Fuzzy matching names (threshold: 0.75)...\n")
        insts_without_wd = [institutions[idx] for idx in without_wikidata]
        matches = fuzzy_match_institutions(insts_without_wd, wikidata_results, threshold=0.75)
        print(f"✨ Found {len(matches):,} high-confidence matches\n")

        # Show sample matches
        if matches:
            print(f"{'='*80}")
            print(f"📋 SAMPLE MATCHES (Top 10)")
            print(f"{'='*80}")
            for i, (local_idx, qid, score, wd_data) in enumerate(matches[:10]):
                inst = insts_without_wd[local_idx]
                print(f"\n{i+1}. Confidence: {score:.3f}")
                print(f"   Local:    {inst.get('name')}")
                print(f"   Wikidata: {wd_data.get('name')} ({wd_data.get('qid')})")
                print(f"   Type: {wd_data.get('type', 'Unknown')}")
                if "ISIL" in wd_data.get("identifiers", {}):
                    print(f"   ISIL: {wd_data['identifiers']['ISIL']}")
                if "VIAF" in wd_data.get("identifiers", {}):
                    print(f"   VIAF: {wd_data['identifiers']['VIAF']}")
            print(f"\n{'='*80}\n")

            # Apply all matches
            print("✅ Applying all matches...\n")
            enriched_count = 0
            for local_idx, qid, score, wd_data in matches:
                # local_idx indexes insts_without_wd; map back to the
                # position in the full institutions list.
                global_idx = without_wikidata[local_idx]
                if enrich_institution(institutions[global_idx], wd_data, score):
                    enriched_count += 1

            new_coverage = (len(institutions) - len(without_wikidata) + enriched_count) / len(institutions) * 100 if institutions else 0
            print(f"✨ Enriched {enriched_count:,} institutions")
            print(f"📈 Coverage: {current_coverage:.1f}% → {new_coverage:.1f}%\n")
        else:
            print("❌ No matches found. Try lowering threshold.\n")
            enriched_count = 0

    # Write output
    print("="*80)
    print("💾 WRITING ENRICHED DATASET")
    print("="*80 + "\n")

    output_header = f"""# Egyptian GLAM Institutions Dataset - Wikidata Enriched
# Original extraction: 39e11630-a2af-407c-a365-d485eb8257b0
# Enriched: {datetime.now(timezone.utc).isoformat()}
#
# Total institutions: {len(institutions)}
# Wikidata coverage: {new_coverage:.1f}%
# New Wikidata matches: {enriched_count}
#
# Coverage by type:
#   - ARCHIVE: 1
#   - GALLERY: 5
#   - LIBRARY: 12
#   - MUSEUM: 6
#   - OFFICIAL_INSTITUTION: 2
#   - RESEARCH_CENTER: 3
#
---
"""

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(output_header)
        yaml.dump(institutions, f, allow_unicode=True, default_flow_style=False,
                  sort_keys=False, width=120)

    print(f"✅ Complete! Output: {output_file.name}\n")

    # Final report
    print("="*80)
    print("📊 FINAL ENRICHMENT REPORT")
    print("="*80)
    print(f"\n✨ Results:")
    print(f"   Total institutions: {len(institutions):,}")
    print(f"   Institutions enriched: {enriched_count:,}")
    print(f"   Institutions without Wikidata: {len(without_wikidata) - enriched_count:,}")
    print(f"   Wikidata coverage: {current_coverage:.1f}% → {new_coverage:.1f}%")
    print(f"\n⏱️  Total processing time: {(time.time()-start_time):.1f} seconds")
    print("="*80 + "\n")


if __name__ == "__main__":
    main()