#!/usr/bin/env python3
"""
Enrich Belgian (BE) custodian files with Wikidata data using ISIL identifiers.

ISIL codes are stored in Wikidata as property P791.
This script queries Wikidata for entities with matching ISIL codes.
"""
import glob
import logging
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import httpx
import yaml
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler('be_isil_enrichment.log'),
|
|
logging.StreamHandler(sys.stdout)
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
WIKIDATA_SPARQL = "https://query.wikidata.org/sparql"
|
|
WIKIDATA_API = "https://www.wikidata.org/w/api.php"
|
|
|
|
USER_AGENT = "GLAMBot/1.0 (Heritage Custodian Enrichment; contact@example.org)"
|
|
|
|
def query_wikidata_by_isil(isil_code: str) -> dict | None:
|
|
"""Query Wikidata for an entity with the given ISIL code (P791)."""
|
|
sparql_query = f"""
|
|
SELECT ?item ?itemLabel ?itemDescription ?website ?image ?inception WHERE {{
|
|
?item wdt:P791 "{isil_code}" .
|
|
OPTIONAL {{ ?item wdt:P856 ?website . }}
|
|
OPTIONAL {{ ?item wdt:P18 ?image . }}
|
|
OPTIONAL {{ ?item wdt:P571 ?inception . }}
|
|
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "nl,fr,de,en". }}
|
|
}}
|
|
LIMIT 1
|
|
"""
|
|
|
|
headers = {
|
|
"User-Agent": USER_AGENT,
|
|
"Accept": "application/sparql-results+json"
|
|
}
|
|
|
|
try:
|
|
response = httpx.get(
|
|
WIKIDATA_SPARQL,
|
|
params={"query": sparql_query, "format": "json"},
|
|
headers=headers,
|
|
timeout=30.0
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
bindings = data.get("results", {}).get("bindings", [])
|
|
if bindings:
|
|
result = bindings[0]
|
|
item_uri = result.get("item", {}).get("value", "")
|
|
wikidata_id = item_uri.split("/")[-1] if item_uri else None
|
|
|
|
return {
|
|
"wikidata_id": wikidata_id,
|
|
"wikidata_url": item_uri,
|
|
"wikidata_label": result.get("itemLabel", {}).get("value"),
|
|
"wikidata_description": result.get("itemDescription", {}).get("value"),
|
|
"official_website": result.get("website", {}).get("value"),
|
|
"image": result.get("image", {}).get("value"),
|
|
"inception": result.get("inception", {}).get("value"),
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error querying Wikidata for ISIL {isil_code}: {e}")
|
|
|
|
return None
|
|
|
|
|
|
def get_instance_of(wikidata_id: str) -> list[str]:
|
|
"""Get instance_of (P31) values for a Wikidata entity."""
|
|
sparql_query = f"""
|
|
SELECT ?type ?typeLabel WHERE {{
|
|
wd:{wikidata_id} wdt:P31 ?type .
|
|
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
|
|
}}
|
|
"""
|
|
|
|
headers = {
|
|
"User-Agent": USER_AGENT,
|
|
"Accept": "application/sparql-results+json"
|
|
}
|
|
|
|
try:
|
|
response = httpx.get(
|
|
WIKIDATA_SPARQL,
|
|
params={"query": sparql_query, "format": "json"},
|
|
headers=headers,
|
|
timeout=30.0
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
types = []
|
|
for binding in data.get("results", {}).get("bindings", []):
|
|
type_uri = binding.get("type", {}).get("value", "")
|
|
type_id = type_uri.split("/")[-1] if type_uri else None
|
|
if type_id:
|
|
types.append(type_id)
|
|
return types
|
|
except Exception as e:
|
|
logger.error(f"Error getting instance_of for {wikidata_id}: {e}")
|
|
|
|
return []
|
|
|
|
|
|
def extract_isil(data: dict) -> str | None:
|
|
"""Extract ISIL code from custodian data."""
|
|
# Check original_entry.identifiers
|
|
for i in data.get('original_entry', {}).get('identifiers', []):
|
|
if i.get('identifier_scheme') == 'ISIL':
|
|
return i.get('identifier_value')
|
|
|
|
# Check top-level identifiers
|
|
for i in data.get('identifiers', []):
|
|
if i.get('identifier_scheme') == 'ISIL':
|
|
return i.get('identifier_value')
|
|
|
|
return None
|
|
|
|
|
|
def enrich_file(filepath: Path) -> bool:
|
|
"""Enrich a single custodian file with Wikidata data."""
|
|
try:
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
data = yaml.safe_load(f)
|
|
|
|
if not data:
|
|
return False
|
|
|
|
# Skip if already enriched
|
|
if 'wikidata_enrichment' in data:
|
|
return False
|
|
|
|
# Get ISIL code
|
|
isil = extract_isil(data)
|
|
if not isil:
|
|
return False
|
|
|
|
# Query Wikidata
|
|
result = query_wikidata_by_isil(isil)
|
|
if not result or not result.get('wikidata_id'):
|
|
logger.info(f"No Wikidata match for ISIL {isil}")
|
|
return False
|
|
|
|
# Get instance_of types
|
|
instance_of = get_instance_of(result['wikidata_id'])
|
|
time.sleep(0.3) # Rate limiting
|
|
|
|
# Build enrichment block
|
|
enrichment = {
|
|
'wikidata_id': result['wikidata_id'],
|
|
'wikidata_url': result['wikidata_url'],
|
|
'matched_by': 'isil_identifier',
|
|
'matched_isil': isil,
|
|
'enrichment_date': datetime.now(timezone.utc).isoformat(),
|
|
'enrichment_version': '2.1.0',
|
|
}
|
|
|
|
if result.get('wikidata_label'):
|
|
enrichment['wikidata_label'] = result['wikidata_label']
|
|
if result.get('wikidata_description'):
|
|
enrichment['wikidata_description'] = result['wikidata_description']
|
|
if result.get('official_website'):
|
|
enrichment['official_website'] = result['official_website']
|
|
if result.get('image'):
|
|
enrichment['image'] = result['image']
|
|
if result.get('inception'):
|
|
enrichment['inception'] = result['inception']
|
|
if instance_of:
|
|
enrichment['instance_of'] = instance_of
|
|
|
|
# Add to data
|
|
data['wikidata_enrichment'] = enrichment
|
|
|
|
# Write back
|
|
with open(filepath, 'w', encoding='utf-8') as f:
|
|
yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
|
|
|
|
logger.info(f"Enriched {filepath.name} with {result['wikidata_id']} (ISIL: {isil})")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing {filepath}: {e}")
|
|
return False
|
|
|
|
|
|
def main():
|
|
"""Main enrichment loop."""
|
|
data_dir = Path("data/custodian")
|
|
be_files = sorted(data_dir.glob("BE-*.yaml"))
|
|
|
|
logger.info(f"Found {len(be_files)} Belgian custodian files")
|
|
|
|
enriched_count = 0
|
|
skipped_count = 0
|
|
failed_count = 0
|
|
|
|
for i, filepath in enumerate(be_files):
|
|
if (i + 1) % 50 == 0:
|
|
logger.info(f"Progress: {i+1}/{len(be_files)} files processed")
|
|
|
|
try:
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
data = yaml.safe_load(f)
|
|
|
|
if 'wikidata_enrichment' in data:
|
|
skipped_count += 1
|
|
continue
|
|
|
|
isil = extract_isil(data)
|
|
if not isil:
|
|
skipped_count += 1
|
|
continue
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading {filepath}: {e}")
|
|
failed_count += 1
|
|
continue
|
|
|
|
if enrich_file(filepath):
|
|
enriched_count += 1
|
|
else:
|
|
failed_count += 1
|
|
|
|
time.sleep(0.5) # Rate limiting between files
|
|
|
|
logger.info("=" * 60)
|
|
logger.info("ENRICHMENT COMPLETE")
|
|
logger.info(f"Total files: {len(be_files)}")
|
|
logger.info(f"Enriched: {enriched_count}")
|
|
logger.info(f"Skipped (already enriched or no ISIL): {skipped_count}")
|
|
logger.info(f"Failed/No match: {failed_count}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|