#!/usr/bin/env python3
"""
Enrich Belgian (BE) custodian files with Wikidata data using ISIL identifiers.

ISIL codes are stored in Wikidata as property P791. This script queries
Wikidata for entities with matching ISIL codes.

For every ``data/custodian/BE-*.yaml`` file that carries an ISIL identifier
and has no ``wikidata_enrichment`` key yet, the matching Wikidata entity is
looked up and a ``wikidata_enrichment`` mapping is written back to the file.
"""

import glob
import logging
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import httpx
import yaml

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('be_isil_enrichment.log'),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger(__name__)

WIKIDATA_SPARQL = "https://query.wikidata.org/sparql"
WIKIDATA_API = "https://www.wikidata.org/w/api.php"
USER_AGENT = "GLAMBot/1.0 (Heritage Custodian Enrichment; contact@example.org)"

# Pause between the two SPARQL requests issued for one file, and between files.
_QUERY_PAUSE_SECONDS = 0.3
_FILE_PAUSE_SECONDS = 0.5

# Shape of a Wikidata item identifier, e.g. "Q42".
_QID_PATTERN = re.compile(r"Q[1-9]\d*")

# Characters that must be escaped inside a double-quoted SPARQL string literal.
_SPARQL_ESCAPES = str.maketrans({
    "\\": "\\\\",
    '"': '\\"',
    "\n": "\\n",
    "\r": "\\r",
})

# Optional result fields copied into the enrichment block only when non-empty.
_OPTIONAL_FIELDS = (
    'wikidata_label',
    'wikidata_description',
    'official_website',
    'image',
    'inception',
)


def _sparql_string_literal(value: object) -> str:
    """Return *value* as a double-quoted, escaped SPARQL string literal."""
    return '"' + str(value).translate(_SPARQL_ESCAPES) + '"'


def _run_sparql(sparql_query: str) -> list[dict]:
    """Send *sparql_query* to the Wikidata endpoint and return its bindings.

    Exceptions raised by the request, by a non-success HTTP status, or by
    decoding the response body propagate to the caller.
    """
    response = httpx.get(
        WIKIDATA_SPARQL,
        params={"query": sparql_query, "format": "json"},
        headers={
            "User-Agent": USER_AGENT,
            "Accept": "application/sparql-results+json",
        },
        timeout=30.0,
    )
    response.raise_for_status()
    return response.json().get("results", {}).get("bindings", [])


def query_wikidata_by_isil(isil_code: str) -> dict | None:
    """Query Wikidata for an entity with the given ISIL code (P791).

    Returns a dict with the keys ``wikidata_id``, ``wikidata_url``,
    ``wikidata_label``, ``wikidata_description``, ``official_website``,
    ``image`` and ``inception`` (values are ``None`` when Wikidata has no
    data for them), or ``None`` when nothing matched or the request failed.
    Request failures are logged, not raised.
    """
    # The ISIL code comes from a data file, so it is escaped before being
    # embedded in the query text.
    sparql_query = f"""
    SELECT ?item ?itemLabel ?itemDescription ?website ?image ?inception WHERE {{
      ?item wdt:P791 {_sparql_string_literal(isil_code)} .
      OPTIONAL {{ ?item wdt:P856 ?website . }}
      OPTIONAL {{ ?item wdt:P18 ?image . }}
      OPTIONAL {{ ?item wdt:P571 ?inception . }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "nl,fr,de,en". }}
    }}
    LIMIT 1
    """

    try:
        bindings = _run_sparql(sparql_query)
        if not bindings:
            return None

        result = bindings[0]
        item_uri = result.get("item", {}).get("value", "")
        return {
            # The entity id is the last path segment of the entity URI.
            "wikidata_id": item_uri.split("/")[-1] if item_uri else None,
            "wikidata_url": item_uri,
            "wikidata_label": result.get("itemLabel", {}).get("value"),
            "wikidata_description": result.get("itemDescription", {}).get("value"),
            "official_website": result.get("website", {}).get("value"),
            "image": result.get("image", {}).get("value"),
            "inception": result.get("inception", {}).get("value"),
        }
    except Exception as e:
        logger.error("Error querying Wikidata for ISIL %s: %s", isil_code, e)
        return None


def get_instance_of(wikidata_id: str) -> list[str]:
    """Get instance_of (P31) values for a Wikidata entity.

    Returns the ids of the P31 values (last path segment of each URI). An
    empty list is returned when there are none, when *wikidata_id* is not a
    well-formed Q-identifier, or when the request failed (failures are
    logged, not raised).
    """
    # The id is interpolated into the query unquoted, so only a well-formed
    # Q-identifier is allowed through.
    if not isinstance(wikidata_id, str) or not _QID_PATTERN.fullmatch(wikidata_id):
        logger.error(
            "Error getting instance_of for %s: %s",
            wikidata_id,
            "not a valid Wikidata item id",
        )
        return []

    sparql_query = f"""
    SELECT ?type ?typeLabel WHERE {{
      wd:{wikidata_id} wdt:P31 ?type .
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
    }}
    """

    try:
        type_uris = (
            binding.get("type", {}).get("value", "")
            for binding in _run_sparql(sparql_query)
        )
        type_ids = (uri.split("/")[-1] for uri in type_uris if uri)
        return [type_id for type_id in type_ids if type_id]
    except Exception as e:
        logger.error("Error getting instance_of for %s: %s", wikidata_id, e)
        return []


def extract_isil(data: dict) -> str | None:
    """Extract ISIL code from custodian data.

    ``original_entry.identifiers`` is searched first, then the top-level
    ``identifiers`` list. The first entry whose ``identifier_scheme`` is
    ``'ISIL'`` and whose ``identifier_value`` is non-empty wins. Missing,
    ``null`` or wrongly typed containers are tolerated and treated as
    "no identifiers".
    """
    if not isinstance(data, dict):
        return None

    original_entry = data.get('original_entry')
    candidate_lists = (
        original_entry.get('identifiers') if isinstance(original_entry, dict) else None,
        data.get('identifiers'),
    )

    for identifiers in candidate_lists:
        if not isinstance(identifiers, (list, tuple)):
            continue
        for entry in identifiers:
            if not isinstance(entry, dict):
                continue
            if entry.get('identifier_scheme') != 'ISIL':
                continue
            value = entry.get('identifier_value')
            if value:
                return value
    return None


def enrich_file(filepath: Path) -> bool:
    """Enrich a single custodian file with Wikidata data.

    Returns ``True`` only when a ``wikidata_enrichment`` block was added and
    the file was rewritten. Returns ``False`` when the file is empty or not a
    mapping, is already enriched, has no ISIL code, has no Wikidata match, or
    when any error occurred (errors are logged, not raised).
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not data or not isinstance(data, dict):
            return False

        # Skip if already enriched
        if 'wikidata_enrichment' in data:
            return False

        # Get ISIL code
        isil = extract_isil(data)
        if not isil:
            return False

        # Query Wikidata
        result = query_wikidata_by_isil(isil)
        if not result or not result.get('wikidata_id'):
            logger.info("No Wikidata match for ISIL %s", isil)
            return False

        # Rate limiting: pause between the two requests made for this file,
        # so they are not sent back to back.
        time.sleep(_QUERY_PAUSE_SECONDS)
        instance_of = get_instance_of(result['wikidata_id'])

        # Build enrichment block
        enrichment = {
            'wikidata_id': result['wikidata_id'],
            'wikidata_url': result['wikidata_url'],
            'matched_by': 'isil_identifier',
            'matched_isil': isil,
            'enrichment_date': datetime.now(timezone.utc).isoformat(),
            'enrichment_version': '2.1.0',
        }
        for key in _OPTIONAL_FIELDS:
            if result.get(key):
                enrichment[key] = result[key]
        if instance_of:
            enrichment['instance_of'] = instance_of

        # Add to data
        data['wikidata_enrichment'] = enrichment

        # Serialize before opening the file for writing: opening with 'w'
        # truncates it, so a serialization error must happen first or the
        # original content would be lost.
        serialized = yaml.dump(
            data,
            allow_unicode=True,
            default_flow_style=False,
            sort_keys=False,
        )
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(serialized)

        logger.info(
            "Enriched %s with %s (ISIL: %s)",
            filepath.name,
            result['wikidata_id'],
            isil,
        )
        return True

    except Exception as e:
        logger.error("Error processing %s: %s", filepath, e)
        return False


def main():
    """Main enrichment loop.

    Processes every ``data/custodian/BE-*.yaml`` file in sorted order and
    logs a summary. Files that are empty, not a mapping, already enriched or
    without an ISIL code are counted as skipped; unreadable files and files
    for which ``enrich_file`` returns ``False`` are counted as failed.
    """
    data_dir = Path("data/custodian")
    be_files = sorted(data_dir.glob("BE-*.yaml"))

    logger.info("Found %d Belgian custodian files", len(be_files))

    enriched_count = 0
    skipped_count = 0
    failed_count = 0

    for position, filepath in enumerate(be_files, start=1):
        if position % 50 == 0:
            logger.info("Progress: %d/%d files processed", position, len(be_files))

        # Pre-check locally so that files needing no network request do not
        # incur the rate-limiting pause below.
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except Exception as e:
            logger.error("Error reading %s: %s", filepath, e)
            failed_count += 1
            continue

        # An empty or non-mapping document has no ISIL to match on.
        if (
            not isinstance(data, dict)
            or 'wikidata_enrichment' in data
            or not extract_isil(data)
        ):
            skipped_count += 1
            continue

        if enrich_file(filepath):
            enriched_count += 1
        else:
            failed_count += 1

        time.sleep(_FILE_PAUSE_SECONDS)  # Rate limiting between files

    logger.info("=" * 60)
    logger.info("ENRICHMENT COMPLETE")
    logger.info("Total files: %d", len(be_files))
    logger.info("Enriched: %d", enriched_count)
    logger.info("Skipped (already enriched or no ISIL): %d", skipped_count)
    logger.info("Failed/No match: %d", failed_count)


if __name__ == "__main__":
    main()