#!/usr/bin/env python3
"""Enrich KIEN entries with Wikidata data.

This script searches Wikidata for matching entities for each KIEN
organization and adds wikidata_enrichment data to the entry files.

Usage:
    python scripts/enrich_kien_wikidata.py [--dry-run] [--limit N]
"""

import argparse
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import requests
import yaml

PROJECT_ROOT = Path(__file__).parent.parent

# Wikidata API endpoints
WIKIDATA_API = "https://www.wikidata.org/w/api.php"
WIKIDATA_SPARQL = "https://query.wikidata.org/sparql"

# Rate limiting: pause between consecutive Wikidata API requests.
REQUEST_DELAY = 0.5  # seconds between requests

# KIEN entries live in files named "<index>_*.yaml" within this inclusive
# index range (see process_kien_entries).
KIEN_INDEX_MIN = 1674
KIEN_INDEX_MAX = 1860

# Matches the numeric index prefix of an entry filename, e.g. "1674_foo.yaml".
ENTRY_INDEX_RE = re.compile(r'^(\d+)_')


def search_wikidata(query: str, language: str = "nl") -> Optional[str]:
    """Search Wikidata for an entity by name.

    Args:
        query: Free-text name to search for.
        language: Search language code (default Dutch).

    Returns:
        The Q-id of the first search hit, or None when nothing matched
        or the request failed.
    """
    params = {
        "action": "wbsearchentities",
        "search": query,
        "language": language,
        "format": "json",
        "limit": 5,
    }
    try:
        response = requests.get(WIKIDATA_API, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        if data.get("search"):
            # Return the first result
            return data["search"][0]["id"]
    except Exception as e:
        # Best-effort lookup: log and continue with the next entry.
        print(f"  Search error for '{query}': {e}")
    return None


def get_entity_data(entity_id: str) -> Optional[dict]:
    """Fetch labels, descriptions, claims and sitelinks for a Wikidata entity.

    Returns the raw entity dict from the wbgetentities response, or None
    when the entity is missing or the request failed.
    """
    params = {
        "action": "wbgetentities",
        "ids": entity_id,
        "languages": "nl|en|de|fr",
        "props": "labels|descriptions|claims|sitelinks",
        "format": "json",
    }
    try:
        response = requests.get(WIKIDATA_API, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        if "entities" in data and entity_id in data["entities"]:
            return data["entities"][entity_id]
    except Exception as e:
        print(f"  Entity fetch error for '{entity_id}': {e}")
    return None


def extract_claim_value(claims: dict, property_id: str) -> Any:
    """Extract a simple value from the first claim of a Wikidata property.

    Supported datavalue types: entity id, string, time, and globe
    coordinates (returned as a {"latitude", "longitude"} dict). Returns
    None for absent properties or unsupported/malformed claims.
    """
    if property_id not in claims:
        return None
    try:
        claim = claims[property_id][0]
        mainsnak = claim.get("mainsnak", {})
        datavalue = mainsnak.get("datavalue", {})
        if datavalue.get("type") == "wikibase-entityid":
            return datavalue["value"]["id"]
        elif datavalue.get("type") == "string":
            return datavalue["value"]
        elif datavalue.get("type") == "time":
            return datavalue["value"]["time"]
        elif datavalue.get("type") == "globecoordinate":
            return {
                "latitude": datavalue["value"]["latitude"],
                "longitude": datavalue["value"]["longitude"],
            }
    except (KeyError, IndexError):
        # Malformed claim structure: treat as "no value".
        pass
    return None


def get_entity_label(entity_id: str, language: str = "nl") -> Optional[str]:
    """Return the label of a Wikidata entity, falling back to English.

    Note: performs a full wbgetentities request per call.
    """
    data = get_entity_data(entity_id)
    if data and "labels" in data:
        if language in data["labels"]:
            return data["labels"][language]["value"]
        elif "en" in data["labels"]:
            return data["labels"]["en"]["value"]
    return None


def enrich_with_wikidata(entity_id: str) -> dict[str, Any]:
    """Build a wikidata_enrichment block for an entity.

    Collects NL/EN labels and descriptions, selected claims (P31 instance
    of, P17 country, P131 located in, P625 coordinates, P856 website,
    P571 inception), Wikipedia sitelinks, and a UTC timestamp.

    Returns an empty dict when the entity could not be fetched.
    """
    data = get_entity_data(entity_id)
    if not data:
        return {}

    enrichment = {
        "wikidata_entity_id": entity_id,
        "wikidata_url": f"https://www.wikidata.org/wiki/{entity_id}",
    }

    # Labels
    labels = data.get("labels", {})
    if "nl" in labels:
        enrichment["wikidata_label_nl"] = labels["nl"]["value"]
    if "en" in labels:
        enrichment["wikidata_label_en"] = labels["en"]["value"]

    # Descriptions
    descriptions = data.get("descriptions", {})
    if "nl" in descriptions:
        enrichment["wikidata_description_nl"] = descriptions["nl"]["value"]
    if "en" in descriptions:
        enrichment["wikidata_description_en"] = descriptions["en"]["value"]

    # Claims
    claims = data.get("claims", {})

    # P31 - instance of
    instance_of = extract_claim_value(claims, "P31")
    if instance_of:
        label = get_entity_label(instance_of)
        enrichment["instance_of"] = {"id": instance_of, "label": label}

    # P17 - country
    country = extract_claim_value(claims, "P17")
    if country:
        label = get_entity_label(country)
        enrichment["country"] = {"id": country, "label": label}

    # P131 - located in administrative territorial entity
    located_in = extract_claim_value(claims, "P131")
    if located_in:
        label = get_entity_label(located_in)
        enrichment["located_in"] = {"id": located_in, "label": label}

    # P625 - coordinate location
    coords = extract_claim_value(claims, "P625")
    if coords:
        enrichment["wikidata_coordinates"] = coords

    # P856 - official website
    website = extract_claim_value(claims, "P856")
    if website:
        enrichment["official_website"] = website

    # P571 - inception date
    inception = extract_claim_value(claims, "P571")
    if inception:
        enrichment["inception"] = inception

    # Sitelinks (Wikipedia articles)
    sitelinks = data.get("sitelinks", {})
    if sitelinks:
        enrichment["wikipedia_articles"] = list(sitelinks.keys())
        if "nlwiki" in sitelinks:
            enrichment["wikipedia_nl"] = f"https://nl.wikipedia.org/wiki/{sitelinks['nlwiki']['title'].replace(' ', '_')}"

    enrichment["enrichment_timestamp"] = datetime.now(timezone.utc).isoformat()
    return enrichment


def process_kien_entries(entries_dir: Path, dry_run: bool = False,
                         limit: int = 0) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Search Wikidata for each KIEN entry and write enrichment back to YAML.

    Args:
        entries_dir: Directory of "<index>_*.yaml" entry files.
        dry_run: When True, search and report but do not modify files.
        limit: Process at most this many entries (0 means no limit).

    Returns:
        (stats, matches): counters dict and a list of per-match summaries.
    """
    stats = {
        "total": 0,
        "searched": 0,
        "found": 0,
        "already_has_wikidata": 0,
        "not_found": 0,
        "errors": [],
    }

    def get_entry_index(filepath: Path) -> int:
        """Numeric filename prefix, or 0 when the name has none."""
        match = ENTRY_INDEX_RE.match(filepath.name)
        return int(match.group(1)) if match else 0

    # Find KIEN entries by their numeric filename index range.
    kien_files = []
    for f in entries_dir.glob("*.yaml"):
        idx = get_entry_index(f)
        if KIEN_INDEX_MIN <= idx <= KIEN_INDEX_MAX:
            kien_files.append(f)

    kien_files.sort(key=get_entry_index)
    # "total" reflects all KIEN entries, even when --limit trims the run.
    stats["total"] = len(kien_files)

    if limit > 0:
        kien_files = kien_files[:limit]

    print(f"Processing {len(kien_files)} KIEN entries...")

    matches = []
    for i, filepath in enumerate(kien_files):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                entry = yaml.safe_load(f)

            if not entry:
                continue

            # Skip if already has Wikidata. Guard against a null
            # wikidata_enrichment value in the YAML (treated as absent).
            existing = entry.get('wikidata_enrichment') or {}
            if existing.get('wikidata_entity_id'):
                stats["already_has_wikidata"] += 1
                continue

            # Get organization name
            name = None
            if 'kien_enrichment' in entry:
                name = entry['kien_enrichment'].get('kien_name')
            if not name and 'original_entry' in entry:
                name = entry['original_entry'].get('organisatie')
            if not name:
                continue

            stats["searched"] += 1

            # Search Wikidata
            print(f"  [{i+1}/{len(kien_files)}] Searching: {name[:50]}...")
            entity_id = search_wikidata(name)
            time.sleep(REQUEST_DELAY)

            if entity_id:
                print(f"    Found: {entity_id}")
                stats["found"] += 1

                # Get enrichment data
                time.sleep(REQUEST_DELAY)
                enrichment = enrich_with_wikidata(entity_id)

                matches.append({
                    "file": filepath.name,
                    "name": name,
                    "wikidata_id": entity_id,
                    "wikidata_label": enrichment.get("wikidata_label_nl"),
                })

                if not dry_run:
                    entry["wikidata_enrichment"] = enrichment

                    # Add to identifiers
                    if "identifiers" not in entry:
                        entry["identifiers"] = []
                    # Remove existing Wikidata identifier before re-adding.
                    entry["identifiers"] = [
                        ident for ident in entry["identifiers"]
                        if ident.get("identifier_scheme") != "Wikidata"
                    ]
                    entry["identifiers"].append({
                        "identifier_scheme": "Wikidata",
                        "identifier_value": entity_id,
                        "identifier_url": f"https://www.wikidata.org/wiki/{entity_id}",
                    })

                    with open(filepath, 'w', encoding='utf-8') as f:
                        yaml.dump(entry, f, default_flow_style=False,
                                  allow_unicode=True, sort_keys=False)
            else:
                stats["not_found"] += 1

        except Exception as e:
            # Record and continue: one bad entry must not abort the run.
            stats["errors"].append(f"{filepath.name}: {str(e)}")

    return stats, matches


def main():
    """Parse CLI arguments, run the enrichment pass, and print a summary."""
    parser = argparse.ArgumentParser(description="Enrich KIEN entries with Wikidata")
    parser.add_argument('--dry-run', action='store_true',
                        help="Preview without writing")
    parser.add_argument('--limit', type=int, default=0,
                        help="Limit number of entries to process")
    args = parser.parse_args()

    entries_dir = PROJECT_ROOT / "data" / "nde" / "enriched" / "entries"

    print("="*70)
    print("KIEN WIKIDATA ENRICHMENT")
    print("="*70)
    print(f"Entries directory: {entries_dir}")
    print(f"Dry run: {args.dry_run}")
    print(f"Limit: {args.limit if args.limit > 0 else 'none'}")
    print()

    stats, matches = process_kien_entries(entries_dir, dry_run=args.dry_run,
                                          limit=args.limit)

    print()
    print("="*70)
    print("SUMMARY")
    print("="*70)
    print(f"Total KIEN entries: {stats['total']}")
    print(f"Already have Wikidata: {stats['already_has_wikidata']}")
    print(f"Searched: {stats['searched']}")
    print(f"Found in Wikidata: {stats['found']}")
    print(f"Not found: {stats['not_found']}")

    if matches:
        print(f"\nMatches found ({len(matches)}):")
        for m in matches[:20]:
            print(f"  {m['wikidata_id']}: {m['name'][:40]}")
        if len(matches) > 20:
            print(f"  ... and {len(matches) - 20} more")

    if stats['errors']:
        print(f"\nErrors ({len(stats['errors'])}):")
        for err in stats['errors'][:5]:
            print(f"  - {err}")

    print()
    if args.dry_run:
        print("DRY RUN COMPLETE - No files modified")
    else:
        print("WIKIDATA ENRICHMENT COMPLETE")


if __name__ == "__main__":
    main()