glam/scripts/enrich_belgium_isil.py
2025-12-21 00:01:54 +01:00

250 lines
7.9 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Enrich Belgian (BE) custodian files with Wikidata data using ISIL identifiers.
ISIL codes are stored in Wikidata as property P791.
This script queries Wikidata for entities with matching ISIL codes.
"""
import yaml
import glob
import time
import httpx
from datetime import datetime, timezone
from pathlib import Path
import logging
import sys
# Logging setup: every record is written both to a log file in the current
# working directory and to standard output.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[
        logging.FileHandler("be_isil_enrichment.log"),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger(__name__)

# Wikidata endpoints and the User-Agent header value used for requests.
WIKIDATA_SPARQL = "https://query.wikidata.org/sparql"
WIKIDATA_API = "https://www.wikidata.org/w/api.php"
USER_AGENT = "GLAMBot/1.0 (Heritage Custodian Enrichment; contact@example.org)"
def query_wikidata_by_isil(isil_code: str) -> dict | None:
    """Query Wikidata for an entity with the given ISIL code (P791).

    Args:
        isil_code: ISIL identifier matched exactly against ``wdt:P791``.

    Returns:
        A dict with the keys ``wikidata_id``, ``wikidata_url``,
        ``wikidata_label``, ``wikidata_description``, ``official_website``,
        ``image`` and ``inception`` built from the first result binding
        (a field is ``None`` when its binding is absent), or ``None`` when
        the query returns no bindings or the request fails. Failures are
        logged and never raised to the caller.
    """
    # The code ends up inside a double-quoted SPARQL string literal. Escape
    # backslashes first, then quotes and line breaks, so that an unusual value
    # read from a custodian file cannot terminate the literal or alter the
    # query. str() keeps non-string values working as they did with plain
    # f-string interpolation.
    escaped_isil = (
        str(isil_code)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    sparql_query = f"""
    SELECT ?item ?itemLabel ?itemDescription ?website ?image ?inception WHERE {{
      ?item wdt:P791 "{escaped_isil}" .
      OPTIONAL {{ ?item wdt:P856 ?website . }}
      OPTIONAL {{ ?item wdt:P18 ?image . }}
      OPTIONAL {{ ?item wdt:P571 ?inception . }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "nl,fr,de,en". }}
    }}
    LIMIT 1
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/sparql-results+json",
    }
    try:
        response = httpx.get(
            WIKIDATA_SPARQL,
            params={"query": sparql_query, "format": "json"},
            headers=headers,
            timeout=30.0,
        )
        response.raise_for_status()
        data = response.json()
        bindings = data.get("results", {}).get("bindings", [])
        if bindings:
            result = bindings[0]
            item_uri = result.get("item", {}).get("value", "")
            # The entity ID is the last path segment of the item URI.
            wikidata_id = item_uri.split("/")[-1] if item_uri else None
            return {
                "wikidata_id": wikidata_id,
                "wikidata_url": item_uri,
                "wikidata_label": result.get("itemLabel", {}).get("value"),
                "wikidata_description": result.get("itemDescription", {}).get("value"),
                "official_website": result.get("website", {}).get("value"),
                "image": result.get("image", {}).get("value"),
                "inception": result.get("inception", {}).get("value"),
            }
    except Exception as e:
        # Deliberate best-effort: a failed lookup must not abort the batch run.
        logger.error(f"Error querying Wikidata for ISIL {isil_code}: {e}")
    return None
def get_instance_of(wikidata_id: str) -> list[str]:
    """Return the IDs of the instance-of (P31) values of a Wikidata entity.

    Each ID is the last path segment of the ``?type`` URI in a result
    binding. Any error during the request or while reading the response is
    logged and yields an empty list.
    """
    sparql_query = (
        "\n"
        "SELECT ?type ?typeLabel WHERE {\n"
        f"wd:{wikidata_id} wdt:P31 ?type .\n"
        'SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }\n'
        "}\n"
    )
    try:
        response = httpx.get(
            WIKIDATA_SPARQL,
            params={"query": sparql_query, "format": "json"},
            headers={
                "User-Agent": USER_AGENT,
                "Accept": "application/sparql-results+json",
            },
            timeout=30.0,
        )
        response.raise_for_status()
        rows = response.json().get("results", {}).get("bindings", [])
        type_uris = [row.get("type", {}).get("value", "") for row in rows]
        # Keep only the trailing path segment of each non-empty URI, and drop
        # segments that come out empty (e.g. a URI ending in a slash).
        return [
            type_id
            for uri in type_uris
            if uri and (type_id := uri.rsplit("/", 1)[-1])
        ]
    except Exception as e:
        logger.error(f"Error getting instance_of for {wikidata_id}: {e}")
        return []
def extract_isil(data: dict) -> str | None:
"""Extract ISIL code from custodian data."""
# Check original_entry.identifiers
for i in data.get('original_entry', {}).get('identifiers', []):
if i.get('identifier_scheme') == 'ISIL':
return i.get('identifier_value')
# Check top-level identifiers
for i in data.get('identifiers', []):
if i.get('identifier_scheme') == 'ISIL':
return i.get('identifier_value')
return None
def enrich_file(filepath: Path) -> bool:
    """Add a ``wikidata_enrichment`` block to one custodian YAML file.

    The file is loaded, its ISIL code is looked up on Wikidata, and on a
    match the file is rewritten in place with the enrichment block added.

    Returns:
        ``True`` when the file was enriched and written back. ``False`` when
        the document is empty, already carries ``wikidata_enrichment``, has
        no ISIL code, has no Wikidata match, or any error occurred (errors
        are logged, not raised).
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as src:
            record = yaml.safe_load(src)

        # Empty documents and previously enriched records are left alone.
        if not record or 'wikidata_enrichment' in record:
            return False

        isil = extract_isil(record)
        if not isil:
            return False

        match = query_wikidata_by_isil(isil)
        if not (match and match.get('wikidata_id')):
            logger.info(f"No Wikidata match for ISIL {isil}")
            return False

        qid = match['wikidata_id']
        types = get_instance_of(qid)
        time.sleep(0.3)  # Rate limiting

        block = {
            'wikidata_id': qid,
            'wikidata_url': match['wikidata_url'],
            'matched_by': 'isil_identifier',
            'matched_isil': isil,
            'enrichment_date': datetime.now(timezone.utc).isoformat(),
            'enrichment_version': '2.1.0',
        }
        # Optional fields are copied only when Wikidata supplied a value;
        # the tuple order fixes the key order in the written YAML.
        optional_fields = (
            'wikidata_label',
            'wikidata_description',
            'official_website',
            'image',
            'inception',
        )
        for field in optional_fields:
            if match.get(field):
                block[field] = match[field]
        if types:
            block['instance_of'] = types

        record['wikidata_enrichment'] = block

        with open(filepath, 'w', encoding='utf-8') as dst:
            yaml.dump(
                record,
                dst,
                allow_unicode=True,
                default_flow_style=False,
                sort_keys=False,
            )
        logger.info(f"Enriched {filepath.name} with {qid} (ISIL: {isil})")
        return True
    except Exception as e:
        logger.error(f"Error processing {filepath}: {e}")
        return False
def main():
    """Run the enrichment over every ``BE-*.yaml`` file in ``data/custodian``.

    Each file is pre-checked here so it can be counted as skipped (empty,
    already enriched, or without an ISIL code) before any network request is
    made. Remaining files are handed to ``enrich_file``, which re-reads the
    file itself. A summary of the counters is logged at the end.
    """
    data_dir = Path("data/custodian")
    be_files = sorted(data_dir.glob("BE-*.yaml"))
    logger.info(f"Found {len(be_files)} Belgian custodian files")

    enriched_count = 0
    skipped_count = 0
    failed_count = 0

    for position, filepath in enumerate(be_files, start=1):
        if position % 50 == 0:
            logger.info(f"Progress: {position}/{len(be_files)} files processed")

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            # An empty YAML file loads as None; treat it like a record with
            # nothing to enrich instead of letting the membership test raise
            # and reporting it as a read error (enrich_file guards the same
            # case).
            if not data or 'wikidata_enrichment' in data:
                skipped_count += 1
                continue
            isil = extract_isil(data)
            if not isil:
                skipped_count += 1
                continue
        except Exception as e:
            logger.error(f"Error reading {filepath}: {e}")
            failed_count += 1
            continue

        if enrich_file(filepath):
            enriched_count += 1
        else:
            failed_count += 1
        time.sleep(0.5)  # Rate limiting between files

    logger.info("=" * 60)
    logger.info("ENRICHMENT COMPLETE")
    logger.info(f"Total files: {len(be_files)}")
    logger.info(f"Enriched: {enriched_count}")
    logger.info(f"Skipped (already enriched or no ISIL): {skipped_count}")
    logger.info(f"Failed/No match: {failed_count}")
# Run the enrichment loop only when this file is executed as a script.
if __name__ == "__main__":
    main()