glam/scripts/enrich_tunisia_wikidata_validated.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

486 lines
19 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Wikidata enrichment for Tunisian heritage institutions with entity type and geographic validation.
This version adds TWO layers of semantic validation to eliminate false positives:
- Verifies entity type matches institution type (museums should be museums, not banks)
- Verifies geographic location for location-specific institutions (universities, research centers)
- Uses fuzzy matching for multilingual name variations
- Prevents matches like "Banque de Tunisie", "lac de Tunis", or wrong-city universities
GLAM Data Extraction Project
Schema: LinkML v0.2.1
"""
import yaml
import time
import requests
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List, Set
from rapidfuzz import fuzz
# Public Wikidata SPARQL endpoint (WDQS); queried read-only below.
SPARQL_ENDPOINT = "https://query.wikidata.org/sparql"
# Identifies this script to the endpoint, per Wikimedia's User-Agent policy.
USER_AGENT = "GLAM-Tunisia-Wikidata-Enrichment/3.0"

# Institution type mapping: LinkML enum -> Wikidata entity types (P31 QIDs).
# A SPARQL result is accepted only when its instance-of QID appears in the
# set for the institution's declared type (see search_wikidata_with_validation).
# NOTE(review): several QIDs are reused across sets with *different* inline
# labels (e.g. Q2668072 appears under MUSEUM, LIBRARY, and ARCHIVE; Q7840289
# under ARCHIVE, GALLERY, and OFFICIAL_INSTITUTION; Q1030034 under MUSEUM,
# OFFICIAL_INSTITUTION, and MIXED). A QID has exactly one meaning on Wikidata,
# so at least some of these labels need verification against wikidata.org.
INSTITUTION_TYPE_MAPPING = {
    'MUSEUM': {
        'Q33506',      # Museum
        'Q1030034',    # Archaeological museum
        'Q3329412',    # Archaeological museum (variant - Tunisia specific)
        'Q473972',     # Art museum
        'Q2668072',    # National museum
        'Q207694',     # History museum
        'Q7328910',    # Science museum
        'Q15243387',   # Cultural heritage site
        'Q3152824',    # Archaeological site (for heritage museums)
        'Q1153562',    # Open-air museum
        'Q1496967',    # Folk museum
        'Q17431399',   # Heritage museum
        'Q28835878',   # Heritage site
    },
    'LIBRARY': {
        'Q7075',       # Library
        'Q2668072',    # National library -- NOTE(review): same QID listed as "National museum" above; verify
        'Q570116',     # Public library
        'Q5193377',    # University library
        'Q28564',      # Academic library
        'Q1479716',    # Regional library
        'Q1622062',    # Digital library
        'Q17297735',   # Diocesan library
        'Q105338594',  # Bibliothèque diocésaine (specific diocesan library subtype)
    },
    'ARCHIVE': {
        'Q166118',     # Archive
        'Q7840289',    # Art gallery (can have archival collections)
        'Q2668072',    # National archive -- NOTE(review): third distinct label for this QID; verify
        'Q1497375',    # Historical archive
        'Q64578911',   # Regional archive
    },
    'HOLY_SITES': {
        'Q22687',      # Synagogue
        'Q16970',      # Church
        'Q32815',      # Mosque
        'Q44539',      # Temple
        'Q44613',      # Monastery
        'Q34627',      # Synagogue (duplicate but included for safety)
        'Q697295',     # Cathedral
        'Q56242275',   # Pilgrimage site
    },
    'GALLERY': {
        'Q7840289',    # Art gallery
        'Q473972',     # Art museum
        'Q1007870',    # Art centre
    },
    'UNIVERSITY': {
        'Q3918',       # University
        'Q875538',     # Public university
        'Q2467461',    # Private university
        'Q15936437',   # Research university
        'Q38723',      # Higher education institution
        'Q3354859',    # Technical university
    },
    'RESEARCH_CENTER': {
        'Q31855',      # Research institute
        'Q7315155',    # Research center
        'Q2467461',    # Research institution -- NOTE(review): labelled "Private university" above; verify
        'Q483242',     # Laboratory
        'Q1664720',    # Institute
    },
    'EDUCATION_PROVIDER': {
        'Q2385804',    # Educational institution
        'Q5341295',    # Music school
        'Q1664720',    # Institute
        'Q180958',     # Faculty
        'Q38723',      # Higher education institution
    },
    'OFFICIAL_INSTITUTION': {
        'Q7210356',    # Cultural institution
        'Q7840289',    # Cultural center
        'Q1030034',    # Cultural heritage institution
        'Q1664720',    # Institute
        'Q7210356',    # Government cultural organization -- NOTE(review): literal duplicate of first entry; harmless in a set, but the two labels disagree
    },
    'PERSONAL_COLLECTION': {
        'Q7075',       # Library (personal libraries are still libraries)
        'Q166118',     # Archive (personal archives)
        'Q33506',      # Museum (personal museums)
    },
    'MIXED': {
        'Q33506',      # Museum
        'Q7075',       # Library
        'Q166118',     # Archive
        'Q7210356',    # Cultural institution
        'Q7840289',    # Cultural center
        'Q1030034',    # Cultural complex
    }
}
def get_valid_types_for_institution(inst_type: str) -> Set[str]:
    """Return the Wikidata entity-type QIDs accepted for `inst_type`.

    Unknown institution types yield an empty set, which callers treat as
    "no valid match possible".
    """
    if inst_type in INSTITUTION_TYPE_MAPPING:
        return INSTITUTION_TYPE_MAPPING[inst_type]
    return set()
def search_wikidata_with_validation(
    name: str,
    inst_type: str,
    city: Optional[str] = None,
    alternative_names: Optional[List[str]] = None,
    timeout: int = 60
) -> Optional[Dict[str, Any]]:
    """
    Search Wikidata for a Tunisian heritage institution with entity-type and
    geographic validation.

    Strategy: a single SPARQL query fetches up to 200 entities located in
    Tunisia (P17 = Q948) whose instance-of (P31) is in the accepted type set
    for `inst_type` (server-side VALUES filter). Fuzzy name matching and the
    optional city check then run client-side over the returned bindings,
    trying the primary name plus every alternative name.

    Args:
        name: Institution name to search.
        inst_type: Institution type (MUSEUM, LIBRARY, ARCHIVE, etc.).
        city: Optional city name; enforced only for location-specific types
            (UNIVERSITY, RESEARCH_CENTER, EDUCATION_PROVIDER).
        alternative_names: Alternative names to try in addition to `name`.
        timeout: HTTP request timeout in seconds.

    Returns:
        Dict with qid/name/description/entity_type/match_score/matched_name
        plus any optional fields found (viaf, isil, website, founded_date,
        latitude/longitude) when the best fuzzy score is >= 65; None otherwise
        (unknown type, no candidates, low score, or any network error).
    """
    # Get valid Wikidata entity types for this institution type
    valid_types = get_valid_types_for_institution(inst_type)
    if not valid_types:
        print(f" ⚠️ Unknown institution type: {inst_type}")
        return None

    # Build VALUES clause for SPARQL query - filter by institution type
    # server-side so relevant results fit within the 200-result LIMIT.
    type_values = " ".join([f"wd:{qid}" for qid in valid_types])

    # NOTE(review): ?itemAltLabel is fetched but never consulted in the fuzzy
    # matching below — multilingual alt labels currently only multiply result
    # rows. Consider scoring against them as well.
    query = f"""
    SELECT DISTINCT ?item ?itemLabel ?itemDescription ?type ?typeLabel
                    ?viaf ?isil ?website ?coords ?inception ?itemAltLabel
                    ?location ?locationLabel
    WHERE {{
      # Must be in Tunisia
      ?item wdt:P17 wd:Q948 .
      # Must have an instance-of type matching our institution type
      ?item wdt:P31 ?type .
      # Filter to relevant types for this institution (server-side filtering)
      VALUES ?type {{ {type_values} }}
      # Add location (P131: located in administrative territorial entity)
      OPTIONAL {{ ?item wdt:P131 ?location . }}
      OPTIONAL {{ ?item wdt:P214 ?viaf . }}
      OPTIONAL {{ ?item wdt:P791 ?isil . }}
      OPTIONAL {{ ?item wdt:P856 ?website . }}
      OPTIONAL {{ ?item wdt:P625 ?coords . }}
      OPTIONAL {{ ?item wdt:P571 ?inception . }}
      OPTIONAL {{ ?item skos:altLabel ?itemAltLabel . FILTER(LANG(?itemAltLabel) IN ("fr", "ar", "en")) }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "fr,ar,en" . }}
    }}
    LIMIT 200
    """

    headers = {'User-Agent': USER_AGENT}
    params = {
        'query': query,
        'format': 'json'
    }

    try:
        time.sleep(1.5)  # Rate limiting: be polite to the public WDQS endpoint
        response = requests.get(SPARQL_ENDPOINT, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        results = response.json()
        bindings = results.get("results", {}).get("bindings", [])
        if not bindings:
            return None

        # Fuzzy match against results WITH entity type AND geographic validation
        best_match = None
        best_score = 0
        matched_name = name  # Track which name produced the match

        # Prepare all names to try (primary + alternatives)
        names_to_try = [name]
        if alternative_names:
            names_to_try.extend(alternative_names)

        city_lower = city.lower() if city else None
        # Location-specific institution types require stricter geographic matching
        requires_city_match = inst_type in {'UNIVERSITY', 'RESEARCH_CENTER', 'EDUCATION_PROVIDER'}

        # Try each name variation against every candidate binding
        for name_variant in names_to_try:
            name_lower = name_variant.lower()
            for binding in bindings:
                # CRITICAL: Validate entity type FIRST
                entity_type_uri = binding.get("type", {}).get("value", "")
                entity_type_qid = entity_type_uri.split("/")[-1] if entity_type_uri else None
                # Skip if entity type doesn't match our institution type
                if entity_type_qid not in valid_types:
                    continue

                # GEOGRAPHIC VALIDATION: check location match for location-specific institutions
                if city_lower and requires_city_match:
                    location_label = binding.get("locationLabel", {}).get("value", "").lower() if binding.get("locationLabel") else ""
                    # Must have location data
                    if not location_label:
                        continue
                    # Location must match expected city (fuzzy match for spelling variations)
                    location_match = fuzz.ratio(city_lower, location_label)
                    if location_match < 70:  # Location mismatch - skip this result
                        continue

                # Now do fuzzy matching on validated entities only.
                # (fix: removed unused `item_desc` local — the description was
                # read into a variable but never used in scoring)
                item_label = binding.get("itemLabel", {}).get("value", "").lower()

                # Calculate match score using multiple strategies
                label_score = fuzz.ratio(name_lower, item_label)
                partial_score = fuzz.partial_ratio(name_lower, item_label)
                token_score = fuzz.token_set_ratio(name_lower, item_label)
                # Best of the three fuzzy match strategies
                score = max(label_score, partial_score, token_score)

                if score > best_score:
                    best_score = score
                    best_match = binding
                    matched_name = name_variant  # Record which name variation matched

        # Require minimum 65% match (lowered to capture multilingual variations).
        # NOTE(review): main() later applies a stricter 70% acceptance threshold.
        if best_score < 65:
            return None

        # Extract data from best match
        if not best_match:
            return None
        item_uri = best_match.get("item", {}).get("value", "")
        qid = item_uri.split("/")[-1] if item_uri else None
        if not qid or not qid.startswith("Q"):
            return None

        result = {
            "qid": qid,
            "name": best_match.get("itemLabel", {}).get("value", ""),
            "description": best_match.get("itemDescription", {}).get("value", ""),
            "entity_type": best_match.get("typeLabel", {}).get("value", ""),
            "match_score": best_score,
            "matched_name": matched_name  # Record which name variant matched
        }

        # Add optional fields if present
        viaf_data = best_match.get("viaf")
        if viaf_data and isinstance(viaf_data, dict):
            result["viaf"] = viaf_data.get("value", "")
        isil_data = best_match.get("isil")
        if isil_data and isinstance(isil_data, dict):
            result["isil"] = isil_data.get("value", "")
        website_data = best_match.get("website")
        if website_data and isinstance(website_data, dict):
            result["website"] = website_data.get("value", "")
        inception_data = best_match.get("inception")
        if inception_data and isinstance(inception_data, dict):
            # Wikidata returns xsd:dateTime ("YYYY-MM-DDT00:00:00Z"); keep the date part
            result["founded_date"] = inception_data.get("value", "").split("T")[0]
        coords_data = best_match.get("coords")
        if coords_data and isinstance(coords_data, dict):
            coords_str = coords_data.get("value", "")
            if coords_str and coords_str.startswith("Point("):
                # WKT literal is "Point(lon lat)" — longitude first
                lon, lat = coords_str[6:-1].split()
                result["latitude"] = float(lat)
                result["longitude"] = float(lon)
        return result

    except requests.exceptions.Timeout:
        print(f" ⏱️ Query timeout (>{timeout}s)")
        return None
    except requests.exceptions.RequestException as e:
        print(f" ❌ Network error: {e}")
        return None
    except Exception as e:
        # Broad boundary catch: enrichment is best-effort per institution
        print(f" ❌ Error: {e}")
        return None
def add_wikidata_to_institution(institution: dict, wikidata_result: dict):
    """Attach Wikidata-derived identifiers and a provenance note to a record.

    Mutates `institution` in place: appends Wikidata/VIAF/ISIL identifier
    entries (skipping schemes already present) and appends an enrichment
    note to `provenance.notes`. Returns None.
    """
    identifiers = institution.setdefault('identifiers', [])
    # Snapshot of schemes present BEFORE this enrichment pass.
    existing_schemes = {entry.get('identifier_scheme') for entry in identifiers}

    qid = wikidata_result['qid']
    if 'Wikidata' not in existing_schemes:
        identifiers.append({
            'identifier_scheme': 'Wikidata',
            'identifier_value': qid,
            'identifier_url': f"https://www.wikidata.org/wiki/{qid}",
        })

    viaf = wikidata_result.get('viaf')
    if viaf and 'VIAF' not in existing_schemes:
        identifiers.append({
            'identifier_scheme': 'VIAF',
            'identifier_value': viaf,
            'identifier_url': f"https://viaf.org/viaf/{viaf}",
        })

    isil = wikidata_result.get('isil')
    if isil and 'ISIL' not in existing_schemes:
        identifiers.append({
            'identifier_scheme': 'ISIL',
            'identifier_value': isil,
            'identifier_url': f"https://isil.org/{isil}",
        })

    # Append a dated, human-readable note describing this enrichment.
    provenance = institution.setdefault('provenance', {})
    previous_notes = provenance.get('notes', '')
    entity_type = wikidata_result.get('entity_type', 'unknown')
    stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    score = wikidata_result.get('match_score', 0)
    enrich_note = f" Wikidata enriched {stamp} ({qid} [{entity_type}], match: {score:.0f}%, validated)."
    provenance['notes'] = (previous_notes + enrich_note).strip()
def save_checkpoint(data: dict, input_file: Path, stats: dict):
    """Persist the (partially) enriched dataset back to `input_file` as YAML.

    Also refreshes `_metadata.generated` and records the enrichment step in
    `_metadata.enhancements`.

    Args:
        data: Full dataset dict; mutated in place (metadata fields updated).
        input_file: Path of the YAML file to overwrite.
        stats: Progress counters, used only for the console summary line.
    """
    print(f"\n💾 Saving checkpoint... (enriched: {stats['enriched']}, total coverage: {stats['already_enriched'] + stats['enriched']}/{stats['total']})")
    data['_metadata']['generated'] = datetime.now(timezone.utc).isoformat()
    # Fix: the original checked membership via .get('enhancements', []) but then
    # appended to data['_metadata']['enhancements'] directly, raising KeyError
    # whenever the key was absent. setdefault makes the append safe.
    enhancements = data['_metadata'].setdefault('enhancements', [])
    if 'Wikidata enrichment (validated)' not in enhancements:
        enhancements.append('Wikidata enrichment (validated)')
    with open(input_file, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
def main():
    """Enrich the Tunisian institutions YAML with validated Wikidata identifiers.

    Reads the dataset, queries Wikidata once per institution (skipping records
    that already carry a Wikidata identifier), accepts matches scoring >= 70%,
    and checkpoints the file every 10 institutions plus once at the end.
    """
    input_file = Path('data/instances/tunisia/tunisian_institutions_enhanced.yaml')
    print("Tunisia Wikidata Enrichment (Entity Type + Geographic Validated)")
    print("=" * 60)
    print("Features:")
    print(" ✅ Entity type validation (museums must be museums, not banks)")
    print(" ✅ Geographic validation (universities must be in correct city)")
    print(" ✅ Fuzzy name matching (70% threshold)")
    print(" ✅ Multilingual support (French, Arabic, English)")
    print(" ✅ Checkpoint saving every 10 institutions")
    print(" ✅ Prevents false positives (Banque, lac, wrong-city matches)")
    print("=" * 60)

    # Load data
    print(f"\nReading: {input_file}")
    with open(input_file, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    institutions = data['institutions']
    print(f"Total institutions: {len(institutions)}")

    # Statistics.
    # NOTE(review): 'type_mismatch' is printed in the summary but never
    # incremented here — type filtering happens inside the search function,
    # which only reports found/not-found.
    stats = {
        'total': len(institutions),
        'already_enriched': 0,
        'searched': 0,
        'found': 0,
        'enriched': 0,
        'failed': 0,
        'type_mismatch': 0,
        'low_confidence': 0
    }

    # Process each institution
    checkpoint_interval = 10
    for i, inst in enumerate(institutions, 1):
        name = inst.get('name', '')
        inst_type = inst.get('institution_type', 'MIXED')
        # City of the first listed location, if any (used for geographic validation)
        city = inst.get('locations', [{}])[0].get('city', '') if inst.get('locations') else ''

        # Skip records that already carry a Wikidata identifier.
        # (fix: loop variables renamed from `id`, which shadowed the builtin)
        identifiers = inst.get('identifiers', [])
        existing_schemes = {ident.get('identifier_scheme') for ident in identifiers}
        if 'Wikidata' in existing_schemes:
            stats['already_enriched'] += 1
            qid = next((ident['identifier_value'] for ident in identifiers if ident.get('identifier_scheme') == 'Wikidata'), 'unknown')
            print(f"[{i}/{len(institutions)}] ✓ {name} (already has {qid})")
            continue

        # Search Wikidata with type validation
        print(f"[{i}/{len(institutions)}] Searching: {name} [{inst_type}] ({city})")
        stats['searched'] += 1

        # Extract alternative names for multilingual matching
        alt_names = inst.get('alternative_names', [])
        result = search_wikidata_with_validation(name, inst_type, city, alternative_names=alt_names, timeout=60)

        if result:
            stats['found'] += 1
            match_score = result.get('match_score', 0)
            entity_type = result.get('entity_type', 'unknown')
            matched_name = result.get('matched_name', name)
            # Show which name variant was used for matching
            name_note = f" [matched: {matched_name}]" if matched_name != name else ""
            print(f" ✅ Found: {result['qid']} [{entity_type}] - {result.get('name', '')} (match: {match_score:.0f}%{name_note})")
            # Accept matches above 70% (already validated by entity type)
            if match_score >= 70:
                add_wikidata_to_institution(inst, result)
                stats['enriched'] += 1
                print(f" ✅ Enriched with validated match")
            else:
                stats['low_confidence'] += 1
                stats['failed'] += 1
                print(f" ⚠️ Match score too low, skipping")
        else:
            stats['failed'] += 1
            print(f" ❌ Not found or type mismatch")

        # Checkpoint every N institutions (and on the last one)
        if i % checkpoint_interval == 0 or i == len(institutions):
            save_checkpoint(data, input_file, stats)

    # Final save — redundant when the loop ran (the last iteration already
    # checkpoints), but guarantees metadata is written even for an empty list.
    save_checkpoint(data, input_file, stats)

    # Print statistics
    print("\n" + "=" * 60)
    print("WIKIDATA ENRICHMENT STATISTICS (VALIDATED)")
    print("=" * 60)
    print(f"Total institutions: {stats['total']}")
    print(f"Already enriched: {stats['already_enriched']}")
    print(f"Searched: {stats['searched']}")
    print(f"Found (validated): {stats['found']}")
    print(f"Enriched (new): {stats['enriched']}")
    print(f"Failed: {stats['failed']}")
    print(f" - Type mismatches filtered: {stats['type_mismatch']}")
    print(f" - Low confidence: {stats['low_confidence']}")
    print(f"\nFinal Wikidata coverage: {stats['already_enriched'] + stats['enriched']}/{stats['total']} ({100*(stats['already_enriched'] + stats['enriched'])/stats['total']:.1f}%)")
    if stats['enriched'] > 0:
        improvement = stats['enriched']
        print(f"✨ Added {improvement} new validated Wikidata identifiers!")
        print(f"✅ All matches validated against correct entity types")
    print("\n✅ Wikidata enrichment complete!")
# Standard script entry point: run the enrichment only when executed directly.
if __name__ == '__main__':
    main()