#!/usr/bin/env python3
"""
Enrich NDE Register NL entries with Wikidata data.

This script reads the NDE Register YAML file, fetches comprehensive data
from Wikidata for entries that have a 'wikidata_id' field, and creates
an enriched YAML file with all available Wikidata properties.

The script uses the Wikibase REST API and SPARQL endpoints to maximize
data retrieval while respecting rate limits.

Usage:
    python scripts/enrich_nde_from_wikidata.py

Environment Variables:
    WIKIDATA_API_TOKEN - Optional OAuth2 token for increased rate limits (5,000 req/hr)
    WIKIMEDIA_CONTACT_EMAIL - Contact email for User-Agent (required by Wikimedia policy)

Output:
    data/nde/nde_register_nl_enriched_{timestamp}.yaml
"""

import os
import sys
import time
import json
import yaml
import httpx
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field, asdict
import logging

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration: Wikidata endpoints (ACTION API / SPARQL kept for future use)
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1"
WIKIDATA_ACTION_API = "https://www.wikidata.org/w/api.php"
SPARQL_URL = "https://query.wikidata.org/sparql"

# Rate limiting: 500 req/hr for anonymous, 5000 req/hr with token
WIKIDATA_API_TOKEN = os.getenv("WIKIDATA_API_TOKEN", "")
WIKIMEDIA_CONTACT_EMAIL = os.getenv("WIKIMEDIA_CONTACT_EMAIL", "glam-data@example.com")
USER_AGENT = f"GLAMDataExtractor/1.0 ({WIKIMEDIA_CONTACT_EMAIL})"

# Request delay based on authentication status — chosen so the effective
# request rate stays safely below the applicable Wikimedia limit.
if WIKIDATA_API_TOKEN:
    REQUEST_DELAY = 0.75  # ~4800 requests per hour (below 5000 limit)
    logger.info("Using authenticated mode: 5,000 req/hr limit")
else:
    REQUEST_DELAY = 7.5  # ~480 requests per hour (below 500 limit)
    logger.info("Using anonymous mode: 500 req/hr limit")

# Headers sent on every request; Authorization only when a token is present.
HEADERS = {
    "Accept": "application/json",
    "User-Agent": USER_AGENT,
}
if WIKIDATA_API_TOKEN:
    HEADERS["Authorization"] = f"Bearer {WIKIDATA_API_TOKEN}"


@dataclass
class WikidataEnrichment:
    """Container for all Wikidata data extracted for an entity."""
    entity_id: str
    # Language-keyed label/description strings and alias lists
    labels: Dict[str, str] = field(default_factory=dict)
    descriptions: Dict[str, str] = field(default_factory=dict)
    aliases: Dict[str, List[str]] = field(default_factory=dict)
    # site-key -> page title (Wikipedia and sister-project links)
    sitelinks: Dict[str, str] = field(default_factory=dict)
    # Claims not handled by a dedicated field below, keyed by friendly name
    claims: Dict[str, Any] = field(default_factory=dict)
    # External identifiers (ISIL, VIAF, GND, ...), keyed by friendly name
    identifiers: Dict[str, str] = field(default_factory=dict)
    instance_of: List[Dict[str, str]] = field(default_factory=list)
    country: Optional[Dict[str, str]] = None
    location: Optional[Dict[str, str]] = None
    coordinates: Optional[Dict[str, float]] = None
    inception: Optional[str] = None
    dissolution: Optional[str] = None
    official_website: Optional[str] = None
    image: Optional[str] = None
    logo: Optional[str] = None
    fetch_timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat())


# Property IDs for heritage institutions, mapped to friendly key names
PROPERTY_LABELS = {
    # Core properties
    "P31": "instance_of",        # Instance of (type)
    "P17": "country",            # Country
    "P131": "located_in",        # Located in administrative territory
    "P625": "coordinates",       # Coordinate location
    "P571": "inception",         # Date founded
    "P576": "dissolution",       # Date dissolved
    "P856": "official_website",  # Official website
    "P18": "image",              # Image
    "P154": "logo",              # Logo
    # Identifiers
    "P791": "isil",              # ISIL code
    "P214": "viaf",              # VIAF ID
    "P227": "gnd",               # GND ID
    "P244": "lcnaf",             # Library of Congress ID
    "P268": "bnf",               # BnF ID
    "P269": "idref",             # IdRef ID
    "P213": "isni",              # ISNI
    "P1566": "geonames",         # GeoNames ID
    "P2427": "grid",             # GRID ID
    "P3500": "ringgold",         # Ringgold ID
    "P5785": "museofile",        # Museofile ID (France)
    "P8168": "factgrid",         # FactGrid ID
    # Cultural heritage specific
    "P361": "part_of",           # Part of
    "P355": "subsidiaries",      # Subsidiaries
    "P749": "parent_org",        # Parent organization
    "P127": "owned_by",          # Owned by
    "P1037": "director",         # Director/manager
    "P159": "headquarters",      # Headquarters location
    "P463": "member_of",         # Member of
    "P1435": "heritage_status",  # Heritage designation
    "P910": "topic_category",    # Topic's main category
    "P373": "commons_category",  # Commons category
    # Additional metadata
    "P2044": "elevation",        # Elevation
    "P6375": "street_address",   # Street address
    "P281": "postal_code",       # Postal code
    "P1329": "phone",            # Phone number
    "P968": "email",             # Email
    "P973": "described_at_url",  # Described at URL
    "P8402": "kvk_number",       # KvK number (Dutch Chamber of Commerce)
}


def fetch_entity_data(entity_id: str, client: httpx.Client) -> Optional[Dict]:
    """
    Fetch full entity data from Wikibase REST API.

    Args:
        entity_id: Wikidata Q-number (e.g., "Q22246632")
        client: HTTP client for making requests

    Returns:
        Full entity data as dictionary, or None on error
    """
    url = f"{WIKIDATA_REST_API}/entities/items/{entity_id}"
    try:
        response = client.get(url, headers=HEADERS)
        # Handle OAuth errors: a 403 likely means the token was rejected,
        # so retry once anonymously before giving up.
        if response.status_code == 403:
            headers_no_auth = {k: v for k, v in HEADERS.items()
                               if k != "Authorization"}
            response = client.get(url, headers=headers_no_auth)
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            logger.warning(f"Entity {entity_id} not found")
        else:
            logger.error(f"HTTP error fetching {entity_id}: {e}")
        return None
    except Exception as e:
        logger.error(f"Error fetching {entity_id}: {e}")
        return None


def extract_value_from_statement(statement: Dict) -> Any:
    """
    Extract the plain value from a Wikidata REST API statement structure.

    Statements look like {"value": {"type": "value", "content": ...}} where
    content may be a scalar (strings, external IDs, item IDs) or a dict
    (time values, coordinates, entity references).

    FIX: the original returned `content` unconditionally whenever
    type == "value", which made the entity/time/coordinate unwrapping
    branches below unreachable — time values and coordinates leaked
    through as raw dicts. Unwrapping now happens for "value" statements.

    Returns:
        The unwrapped value, or None for somevalue/novalue statements
        or on any parse error.
    """
    try:
        value_data = statement.get("value", {})
        if value_data.get("type") != "value":
            # "somevalue" / "novalue" statements carry no usable content
            return None
        content = value_data.get("content")
        if isinstance(content, dict):
            if "entity-type" in content or "id" in content:
                # Entity reference -> return the Q-number
                return content.get("id", content)
            elif "time" in content:
                # Time value -> return the ISO-ish time string
                return content.get("time")
            elif "latitude" in content and "longitude" in content:
                # Coordinates -> keep only the fields we care about
                return {
                    "latitude": content.get("latitude"),
                    "longitude": content.get("longitude"),
                    "precision": content.get("precision"),
                }
        # Scalar values (strings, numbers) and unrecognized dict shapes
        return content
    except Exception:
        return None


def parse_entity_data(entity_id: str, data: Dict) -> WikidataEnrichment:
    """
    Parse the full entity data into a WikidataEnrichment object.

    Args:
        entity_id: The Wikidata entity ID
        data: Raw API response data

    Returns:
        WikidataEnrichment object with all extracted data
    """
    enrichment = WikidataEnrichment(entity_id=entity_id)

    # Labels / descriptions / aliases are already language-keyed dicts
    enrichment.labels = data.get("labels", {})
    enrichment.descriptions = data.get("descriptions", {})
    enrichment.aliases = data.get("aliases", {})

    # Sitelinks: REST API may return {"title": ...} dicts or plain strings
    sitelinks = data.get("sitelinks", {})
    for site, link_data in sitelinks.items():
        if isinstance(link_data, dict):
            enrichment.sitelinks[site] = link_data.get("title", link_data)
        else:
            enrichment.sitelinks[site] = link_data

    # Statements/claims: route known properties to dedicated fields,
    # everything else to `claims` under its friendly name.
    statements = data.get("statements", {})
    for prop_id, prop_statements in statements.items():
        prop_name = PROPERTY_LABELS.get(prop_id, prop_id)
        if not prop_statements:
            continue

        # Collect all non-null values for this property
        values = []
        for stmt in prop_statements:
            value = extract_value_from_statement(stmt)
            if value is not None:
                values.append(value)
        if not values:
            continue

        if prop_id == "P31":  # Instance of (multi-valued)
            enrichment.instance_of = [{"id": v} if isinstance(v, str) else v
                                      for v in values]
        elif prop_id == "P17":  # Country
            enrichment.country = {"id": values[0]} if values else None
        elif prop_id == "P131":  # Located in
            enrichment.location = {"id": values[0]} if values else None
        elif prop_id == "P625":  # Coordinates
            if values and isinstance(values[0], dict):
                enrichment.coordinates = values[0]
        elif prop_id == "P571":  # Inception
            enrichment.inception = values[0] if values else None
        elif prop_id == "P576":  # Dissolution
            enrichment.dissolution = values[0] if values else None
        elif prop_id == "P856":  # Official website
            enrichment.official_website = values[0] if values else None
        elif prop_id == "P18":  # Image
            enrichment.image = values[0] if values else None
        elif prop_id == "P154":  # Logo
            enrichment.logo = values[0] if values else None
        elif prop_id in ["P791", "P214", "P227", "P244", "P268", "P269",
                         "P213", "P1566", "P2427", "P3500", "P5785",
                         "P8168", "P8402"]:  # Identifiers
            enrichment.identifiers[prop_name] = values[0] if values else None
        else:
            # Store other claims; collapse single-value lists to a scalar
            enrichment.claims[prop_name] = (values[0] if len(values) == 1
                                            else values)

    return enrichment


def enrich_entity(entity_id: str, client: httpx.Client) -> Optional[WikidataEnrichment]:
    """
    Fetch and enrich a single entity from Wikidata.

    Args:
        entity_id: Wikidata Q-number (e.g., "Q22246632")
        client: HTTP client for requests

    Returns:
        WikidataEnrichment object or None on error
    """
    # Ensure proper Q-number format (some source rows store bare numbers)
    if not entity_id.startswith("Q"):
        entity_id = f"Q{entity_id}"

    data = fetch_entity_data(entity_id, client)
    if data is None:
        return None
    return parse_entity_data(entity_id, data)


def enrichment_to_dict(enrichment: WikidataEnrichment) -> Dict:
    """Convert WikidataEnrichment to a clean dictionary for YAML output.

    Only non-empty fields are emitted so the output YAML stays compact.
    Convenience keys (e.g. wikidata_label_nl) duplicate the nl/en values
    for easy downstream access.
    """
    result = {
        "wikidata_entity_id": enrichment.entity_id,
        "wikidata_fetch_timestamp": enrichment.fetch_timestamp,
    }

    # Add labels (prioritize nl, en)
    if enrichment.labels:
        result["wikidata_labels"] = enrichment.labels
        if "nl" in enrichment.labels:
            result["wikidata_label_nl"] = enrichment.labels["nl"]
        if "en" in enrichment.labels:
            result["wikidata_label_en"] = enrichment.labels["en"]

    # Add descriptions
    if enrichment.descriptions:
        result["wikidata_descriptions"] = enrichment.descriptions
        if "nl" in enrichment.descriptions:
            result["wikidata_description_nl"] = enrichment.descriptions["nl"]
        if "en" in enrichment.descriptions:
            result["wikidata_description_en"] = enrichment.descriptions["en"]

    # Add aliases
    if enrichment.aliases:
        result["wikidata_aliases"] = enrichment.aliases

    # Add identifiers (drop empty/None values)
    if enrichment.identifiers:
        result["wikidata_identifiers"] = {k: v for k, v
                                          in enrichment.identifiers.items()
                                          if v}

    # Add instance types
    if enrichment.instance_of:
        result["wikidata_instance_of"] = enrichment.instance_of

    # Add location data
    if enrichment.country:
        result["wikidata_country"] = enrichment.country
    if enrichment.location:
        result["wikidata_located_in"] = enrichment.location
    if enrichment.coordinates:
        result["wikidata_coordinates"] = enrichment.coordinates

    # Add temporal data
    if enrichment.inception:
        result["wikidata_inception"] = enrichment.inception
    if enrichment.dissolution:
        result["wikidata_dissolution"] = enrichment.dissolution

    # Add web presence
    if enrichment.official_website:
        result["wikidata_official_website"] = enrichment.official_website

    # Add media
    if enrichment.image:
        result["wikidata_image"] = enrichment.image
    if enrichment.logo:
        result["wikidata_logo"] = enrichment.logo

    # Add sitelinks (Wikipedia links)
    if enrichment.sitelinks:
        result["wikidata_sitelinks"] = enrichment.sitelinks

    # Add other claims
    if enrichment.claims:
        result["wikidata_claims"] = enrichment.claims

    return result


def main():
    """Main entry point with incremental saving.

    Reads the register YAML, enriches every entry carrying a wikidata_id,
    and writes a timestamped enriched YAML plus a JSON run log. Progress
    is checkpointed every SAVE_INTERVAL entries so an interrupted run can
    resume from enrichment_progress.json.
    """
    # Paths
    script_dir = Path(__file__).parent
    data_dir = script_dir.parent / "data" / "nde"
    input_file = data_dir / "nde_register_nl.yaml"

    # Generate timestamp for output file
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_file = data_dir / f"nde_register_nl_enriched_{timestamp}.yaml"
    progress_file = data_dir / "enrichment_progress.json"

    logger.info(f"Input file: {input_file}")
    logger.info(f"Output file: {output_file}")

    # Load input YAML (guard against an empty file parsing to None)
    logger.info("Loading input YAML file...")
    with open(input_file, 'r', encoding='utf-8') as f:
        entries = yaml.safe_load(f) or []

    total_entries = len(entries)
    logger.info(f"Loaded {total_entries} entries")

    # Count entries with wikidata_id
    entries_with_wikidata = [e for e in entries if e.get("wikidata_id")]
    logger.info(f"Found {len(entries_with_wikidata)} entries with wikidata_id")

    # Check for existing progress (resume support)
    start_index = 0
    enriched_entries = []
    if progress_file.exists():
        try:
            with open(progress_file, 'r') as f:
                progress = json.load(f)
            start_index = progress.get("last_processed_index", 0) + 1
            enriched_entries = progress.get("enriched_entries", [])
            logger.info(f"Resuming from index {start_index} (already processed {len(enriched_entries)} entries)")
        except Exception as e:
            logger.warning(f"Could not load progress file: {e}")

    # Rebuild counters from any previously processed entries
    success_count = len([e for e in enriched_entries
                         if e.get("wikidata_enrichment")])
    skip_count = len([e for e in enriched_entries
                      if not e.get("wikidata_id")
                      and not e.get("wikidata_enrichment_error")])
    error_count = len([e for e in enriched_entries
                       if e.get("wikidata_enrichment_error")])

    # Save interval (save progress every N entries)
    SAVE_INTERVAL = 10

    with httpx.Client(timeout=30.0) as client:
        for i, entry in enumerate(entries):
            # Skip already processed entries
            if i < start_index:
                continue

            wikidata_id = entry.get("wikidata_id")
            org_name = entry.get("organisatie", "Unknown")

            if not wikidata_id:
                # Keep entry as-is, skip enrichment
                enriched_entries.append(entry)
                skip_count += 1
            else:
                # Log progress
                logger.info(f"[{i+1}/{total_entries}] Enriching: {org_name} ({wikidata_id})")

                # Fetch and enrich
                try:
                    enrichment = enrich_entity(str(wikidata_id), client)
                    if enrichment:
                        # Merge enrichment data with original entry
                        enriched_entry = dict(entry)
                        enriched_entry["wikidata_enrichment"] = enrichment_to_dict(enrichment)
                        enriched_entries.append(enriched_entry)
                        success_count += 1
                    else:
                        # Keep original entry on error
                        entry_copy = dict(entry)
                        entry_copy["wikidata_enrichment_error"] = "Failed to fetch from Wikidata"
                        enriched_entries.append(entry_copy)
                        error_count += 1
                except Exception as e:
                    logger.error(f"Error processing {org_name}: {e}")
                    entry_copy = dict(entry)
                    entry_copy["wikidata_enrichment_error"] = str(e)
                    enriched_entries.append(entry_copy)
                    error_count += 1

                # Rate limiting: only sleep after an actual API request
                time.sleep(REQUEST_DELAY)

            # Save progress periodically
            if (i + 1) % SAVE_INTERVAL == 0:
                progress_data = {
                    "last_processed_index": i,
                    "enriched_entries": enriched_entries,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                }
                with open(progress_file, 'w', encoding='utf-8') as f:
                    json.dump(progress_data, f)
                logger.info(f"Progress saved at index {i}")

    # Write final output
    logger.info(f"Writing enriched data to {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        yaml.dump(enriched_entries, f, allow_unicode=True,
                  default_flow_style=False, sort_keys=False)

    # Remove progress file on successful completion
    if progress_file.exists():
        progress_file.unlink()
        logger.info("Removed progress file (enrichment complete)")

    # Summary
    logger.info("=" * 60)
    logger.info("ENRICHMENT COMPLETE")
    logger.info("=" * 60)
    logger.info(f"Total entries: {total_entries}")
    logger.info(f"Entries with wikidata_id: {len(entries_with_wikidata)}")
    logger.info(f"Successfully enriched: {success_count}")
    logger.info(f"Skipped (no wikidata_id): {skip_count}")
    logger.info(f"Errors: {error_count}")
    logger.info(f"Output file: {output_file}")

    # Create log file
    log_file = data_dir / f"enrichment_log_{timestamp}.json"
    log_data = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_file": str(input_file),
        "output_file": str(output_file),
        "total_entries": total_entries,
        "entries_with_wikidata_id": len(entries_with_wikidata),
        "successfully_enriched": success_count,
        "skipped_no_wikidata_id": skip_count,
        "errors": error_count,
        "authenticated": bool(WIKIDATA_API_TOKEN),
        "rate_limit_delay_seconds": REQUEST_DELAY,
    }
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, indent=2)
    logger.info(f"Log file: {log_file}")

    return 0


if __name__ == "__main__":
    sys.exit(main())