glam/scripts/enrich_nde_from_wikidata.py

530 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Enrich NDE Register NL entries with Wikidata data.

This script reads the NDE Register YAML file, fetches comprehensive data from
Wikidata for entries that have a 'wikidata_id' field, and creates an enriched
YAML file with all available Wikidata properties.

The script uses the Wikibase REST API and SPARQL endpoints to maximize data
retrieval while respecting rate limits.

Usage:
    python scripts/enrich_nde_from_wikidata.py

Environment Variables:
    WIKIDATA_API_TOKEN - Optional OAuth2 token for increased rate limits (5,000 req/hr)
    WIKIMEDIA_CONTACT_EMAIL - Contact email for User-Agent (required by Wikimedia policy)

Output:
    data/nde/nde_register_nl_enriched_{timestamp}.yaml
"""
import os
import sys
import time
import json
import yaml
import httpx
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field, asdict
import logging
# Set up module-wide logging: timestamped INFO-level messages.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration: Wikidata API endpoints.
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1"
WIKIDATA_ACTION_API = "https://www.wikidata.org/w/api.php"
SPARQL_URL = "https://query.wikidata.org/sparql"

# Rate limiting: 500 req/hr for anonymous, 5000 req/hr with token
WIKIDATA_API_TOKEN = os.getenv("WIKIDATA_API_TOKEN", "")
# Wikimedia's User-Agent policy requires a contact address in the UA string.
WIKIMEDIA_CONTACT_EMAIL = os.getenv("WIKIMEDIA_CONTACT_EMAIL", "glam-data@example.com")
USER_AGENT = f"GLAMDataExtractor/1.0 ({WIKIMEDIA_CONTACT_EMAIL})"

# Request delay based on authentication status (keeps the request rate
# safely below the applicable hourly limit).
if WIKIDATA_API_TOKEN:
    REQUEST_DELAY = 0.75  # ~4800 requests per hour (below 5000 limit)
    logger.info("Using authenticated mode: 5,000 req/hr limit")
else:
    REQUEST_DELAY = 7.5  # ~480 requests per hour (below 500 limit)
    logger.info("Using anonymous mode: 500 req/hr limit")

# Default headers for every request; Authorization is added only when a token is set.
HEADERS = {
    "Accept": "application/json",
    "User-Agent": USER_AGENT,
}
if WIKIDATA_API_TOKEN:
    HEADERS["Authorization"] = f"Bearer {WIKIDATA_API_TOKEN}"
@dataclass
class WikidataEnrichment:
    """Container for all Wikidata data extracted for an entity."""
    entity_id: str  # Wikidata Q-number, e.g. "Q22246632"
    labels: Dict[str, str] = field(default_factory=dict)  # language code -> label text
    descriptions: Dict[str, str] = field(default_factory=dict)  # language code -> description text
    aliases: Dict[str, List[str]] = field(default_factory=dict)  # language code -> list of aliases
    sitelinks: Dict[str, str] = field(default_factory=dict)  # wiki site key -> page title
    claims: Dict[str, Any] = field(default_factory=dict)  # remaining property values, keyed by friendly name
    identifiers: Dict[str, str] = field(default_factory=dict)  # external identifiers (ISIL, VIAF, GND, ...)
    instance_of: List[Dict[str, str]] = field(default_factory=list)  # P31 entity references as {"id": ...}
    country: Optional[Dict[str, str]] = None  # P17 entity reference as {"id": ...}
    location: Optional[Dict[str, str]] = None  # P131 entity reference as {"id": ...}
    coordinates: Optional[Dict[str, float]] = None  # P625 {latitude, longitude, precision}
    inception: Optional[str] = None  # P571 time string
    dissolution: Optional[str] = None  # P576 time string
    official_website: Optional[str] = None  # P856 URL
    image: Optional[str] = None  # P18 value
    logo: Optional[str] = None  # P154 value
    # UTC ISO-8601 timestamp recorded when this object is created (fetch time).
    fetch_timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
# Property IDs for heritage institutions: maps a Wikidata P-number to the
# friendly key used in WikidataEnrichment.identifiers / .claims.
PROPERTY_LABELS = {
    # Core properties
    "P31": "instance_of",  # Instance of (type)
    "P17": "country",  # Country
    "P131": "located_in",  # Located in administrative territory
    "P625": "coordinates",  # Coordinate location
    "P571": "inception",  # Date founded
    "P576": "dissolution",  # Date dissolved
    "P856": "official_website",  # Official website
    "P18": "image",  # Image
    "P154": "logo",  # Logo
    # Identifiers
    "P791": "isil",  # ISIL code
    "P214": "viaf",  # VIAF ID
    "P227": "gnd",  # GND ID
    "P244": "lcnaf",  # Library of Congress ID
    "P268": "bnf",  # BnF ID
    "P269": "idref",  # IdRef ID
    "P213": "isni",  # ISNI
    "P1566": "geonames",  # GeoNames ID
    "P2427": "grid",  # GRID ID
    "P3500": "ringgold",  # Ringgold ID
    "P5785": "museofile",  # Museofile ID (France)
    "P8168": "factgrid",  # FactGrid ID
    # Cultural heritage specific
    "P361": "part_of",  # Part of
    "P355": "subsidiaries",  # Subsidiaries
    "P749": "parent_org",  # Parent organization
    "P127": "owned_by",  # Owned by
    "P1037": "director",  # Director/manager
    "P159": "headquarters",  # Headquarters location
    "P463": "member_of",  # Member of
    "P1435": "heritage_status",  # Heritage designation
    "P910": "topic_category",  # Topic's main category
    "P373": "commons_category",  # Commons category
    # Additional metadata
    "P2044": "elevation",  # Elevation
    "P6375": "street_address",  # Street address
    "P281": "postal_code",  # Postal code
    "P1329": "phone",  # Phone number
    "P968": "email",  # Email
    "P973": "described_at_url",  # Described at URL
    "P8402": "kvk_number",  # KvK number (Dutch Chamber of Commerce)
}
def fetch_entity_data(entity_id: str, client: httpx.Client) -> Optional[Dict]:
    """
    Fetch full entity data from the Wikibase REST API.

    Retries once without the Authorization header on 403 (rejected token),
    and once after the advertised delay on 429 (server-side throttle).

    Args:
        entity_id: Wikidata Q-number (e.g., "Q22246632")
        client: HTTP client for making requests

    Returns:
        Full entity data as dictionary, or None on error
    """
    url = f"{WIKIDATA_REST_API}/entities/items/{entity_id}"
    try:
        response = client.get(url, headers=HEADERS)

        # Handle OAuth errors (retry without auth)
        if response.status_code == 403:
            headers_no_auth = {k: v for k, v in HEADERS.items() if k != "Authorization"}
            response = client.get(url, headers=headers_no_auth)

        # Handle rate limiting: honor Retry-After (seconds) when parseable,
        # otherwise fall back to the configured delay, then retry once.
        if response.status_code == 429:
            try:
                retry_after = float(response.headers.get("Retry-After", ""))
            except ValueError:
                retry_after = REQUEST_DELAY
            logger.warning(f"Rate limited fetching {entity_id}; retrying in {retry_after}s")
            time.sleep(retry_after)
            response = client.get(url, headers=HEADERS)

        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            logger.warning(f"Entity {entity_id} not found")
        else:
            logger.error(f"HTTP error fetching {entity_id}: {e}")
        return None
    except Exception as e:
        # Network errors, JSON decode errors, etc. -- log and signal failure.
        logger.error(f"Error fetching {entity_id}: {e}")
        return None
def extract_value_from_statement(statement: Dict) -> Any:
    """Extract the plain value from a Wikibase REST API statement structure.

    Recognized content shapes:
      * entity reference  -> the bare Q-number string
      * time value        -> the "time" string
      * globe coordinate  -> a trimmed {latitude, longitude, precision} dict
    Any other content (plain strings, numbers, unrecognized dicts) is
    returned as-is.

    Note: the previous version returned the raw content whenever the value
    type was "value", which skipped the dict-shaped handlers entirely and
    leaked untrimmed time/coordinate dicts downstream; the handlers now run
    on the content itself.

    Returns:
        The extracted value, or None for statements without content
        (novalue/somevalue) or with an unexpected structure.
    """
    try:
        content = statement.get("value", {}).get("content")
    except AttributeError:
        # statement or its "value" entry is not a dict
        return None
    if isinstance(content, dict):
        if "entity-type" in content or "id" in content:
            # Entity reference -> bare Q-number
            return content.get("id", content)
        if "time" in content:
            # Time value -> the time string only
            return content.get("time")
        if "latitude" in content and "longitude" in content:
            # Coordinates, trimmed to the fields we keep
            return {
                "latitude": content.get("latitude"),
                "longitude": content.get("longitude"),
                "precision": content.get("precision")
            }
    return content
# Properties whose first value is recorded as an external identifier
# (friendly names come from PROPERTY_LABELS).
IDENTIFIER_PROPS = frozenset({
    "P791", "P214", "P227", "P244", "P268", "P269",
    "P213", "P1566", "P2427", "P3500", "P5785", "P8168", "P8402",
})


def parse_entity_data(entity_id: str, data: Dict) -> WikidataEnrichment:
    """
    Parse the full entity data into a WikidataEnrichment object.

    Args:
        entity_id: The Wikidata entity ID
        data: Raw API response data

    Returns:
        WikidataEnrichment object with all extracted data
    """
    enrichment = WikidataEnrichment(entity_id=entity_id)

    # Labels, descriptions and aliases are stored verbatim.
    enrichment.labels = data.get("labels", {})
    enrichment.descriptions = data.get("descriptions", {})
    enrichment.aliases = data.get("aliases", {})

    # Sitelinks arrive as {"title": ...} dicts; tolerate plain strings too.
    for site, link_data in data.get("sitelinks", {}).items():
        if isinstance(link_data, dict):
            enrichment.sitelinks[site] = link_data.get("title", link_data)
        else:
            enrichment.sitelinks[site] = link_data

    # Statements: extract every usable value per property, then route it to
    # the matching typed field (or the generic identifiers/claims maps).
    for prop_id, prop_statements in data.get("statements", {}).items():
        prop_name = PROPERTY_LABELS.get(prop_id, prop_id)
        values = [
            v for v in (extract_value_from_statement(s) for s in prop_statements or [])
            if v is not None
        ]
        if not values:
            continue
        # values is guaranteed non-empty past this point.
        first = values[0]
        if prop_id == "P31":  # Instance of (may carry several types)
            enrichment.instance_of = [{"id": v} if isinstance(v, str) else v for v in values]
        elif prop_id == "P17":  # Country
            enrichment.country = {"id": first}
        elif prop_id == "P131":  # Located in
            enrichment.location = {"id": first}
        elif prop_id == "P625":  # Coordinates (only accept the dict form)
            if isinstance(first, dict):
                enrichment.coordinates = first
        elif prop_id == "P571":  # Inception
            enrichment.inception = first
        elif prop_id == "P576":  # Dissolution
            enrichment.dissolution = first
        elif prop_id == "P856":  # Official website
            enrichment.official_website = first
        elif prop_id == "P18":  # Image
            enrichment.image = first
        elif prop_id == "P154":  # Logo
            enrichment.logo = first
        elif prop_id in IDENTIFIER_PROPS:
            # External identifiers keep only the first value.
            enrichment.identifiers[prop_name] = first
        else:
            # Everything else lands in the generic claims map; single values
            # are unwrapped, multi-values kept as a list.
            enrichment.claims[prop_name] = first if len(values) == 1 else values
    return enrichment
def enrich_entity(entity_id: str, client: httpx.Client) -> Optional[WikidataEnrichment]:
    """
    Fetch a single entity from Wikidata and parse it into an enrichment record.

    Args:
        entity_id: Wikidata Q-number, with or without the leading "Q"
        client: HTTP client for requests

    Returns:
        WikidataEnrichment object, or None if the fetch failed
    """
    # Normalize bare numeric IDs (e.g. "22246632") to Q-number form.
    qid = entity_id if entity_id.startswith("Q") else f"Q{entity_id}"
    raw = fetch_entity_data(qid, client)
    return parse_entity_data(qid, raw) if raw is not None else None
def enrichment_to_dict(enrichment: WikidataEnrichment) -> Dict:
    """Convert a WikidataEnrichment into a clean dictionary for YAML output.

    Only populated fields are emitted; empty/None attributes are omitted.
    Per-language convenience keys (nl, en) are added next to the full
    label/description maps.
    """
    result = {
        "wikidata_entity_id": enrichment.entity_id,
        "wikidata_fetch_timestamp": enrichment.fetch_timestamp,
    }

    # Full label map plus convenient per-language keys (prioritize nl, en).
    if enrichment.labels:
        result["wikidata_labels"] = enrichment.labels
        for lang in ("nl", "en"):
            if lang in enrichment.labels:
                result[f"wikidata_label_{lang}"] = enrichment.labels[lang]

    # Same treatment for descriptions.
    if enrichment.descriptions:
        result["wikidata_descriptions"] = enrichment.descriptions
        for lang in ("nl", "en"):
            if lang in enrichment.descriptions:
                result[f"wikidata_description_{lang}"] = enrichment.descriptions[lang]

    if enrichment.aliases:
        result["wikidata_aliases"] = enrichment.aliases

    # Identifiers are filtered to drop empty values.
    if enrichment.identifiers:
        result["wikidata_identifiers"] = {k: v for k, v in enrichment.identifiers.items() if v}

    # Remaining fields map 1:1 onto output keys; emit only truthy values.
    # Order matters here: yaml.dump is called with sort_keys=False downstream.
    optional_fields = (
        ("instance_of", "wikidata_instance_of"),
        ("country", "wikidata_country"),
        ("location", "wikidata_located_in"),
        ("coordinates", "wikidata_coordinates"),
        ("inception", "wikidata_inception"),
        ("dissolution", "wikidata_dissolution"),
        ("official_website", "wikidata_official_website"),
        ("image", "wikidata_image"),
        ("logo", "wikidata_logo"),
        ("sitelinks", "wikidata_sitelinks"),
        ("claims", "wikidata_claims"),
    )
    for attr, key in optional_fields:
        value = getattr(enrichment, attr)
        if value:
            result[key] = value
    return result
def _load_progress(progress_file: Path):
    """Load a resume checkpoint if one exists.

    Returns:
        (start_index, enriched_entries): index of the first unprocessed
        entry and the list of already-processed entries; (0, []) when there
        is no usable checkpoint.
    """
    if not progress_file.exists():
        return 0, []
    try:
        with open(progress_file, 'r') as f:
            progress = json.load(f)
        start_index = progress.get("last_processed_index", 0) + 1
        enriched_entries = progress.get("enriched_entries", [])
        logger.info(f"Resuming from index {start_index} (already processed {len(enriched_entries)} entries)")
        return start_index, enriched_entries
    except Exception as e:
        # A corrupt checkpoint is not fatal: start over from the beginning.
        logger.warning(f"Could not load progress file: {e}")
        return 0, []


def _save_progress(progress_file: Path, index: int, enriched_entries: list) -> None:
    """Checkpoint state so an interrupted run can resume at index + 1."""
    progress_data = {
        "last_processed_index": index,
        "enriched_entries": enriched_entries,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    with open(progress_file, 'w', encoding='utf-8') as f:
        json.dump(progress_data, f)
    logger.info(f"Progress saved at index {index}")


def main():
    """Main entry point with incremental saving.

    Reads data/nde/nde_register_nl.yaml, enriches every entry that has a
    'wikidata_id' via the Wikibase REST API, and writes a timestamped
    enriched YAML file plus a JSON run log. Progress is checkpointed every
    SAVE_INTERVAL entries so interrupted runs can resume.

    Returns:
        Process exit code (0 on success).
    """
    # Paths
    script_dir = Path(__file__).parent
    data_dir = script_dir.parent / "data" / "nde"
    input_file = data_dir / "nde_register_nl.yaml"

    # Timestamped output so repeated runs never clobber each other.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_file = data_dir / f"nde_register_nl_enriched_{timestamp}.yaml"
    progress_file = data_dir / "enrichment_progress.json"

    logger.info(f"Input file: {input_file}")
    logger.info(f"Output file: {output_file}")

    # Load input YAML (safe_load returns None for an empty file).
    logger.info("Loading input YAML file...")
    with open(input_file, 'r', encoding='utf-8') as f:
        entries = yaml.safe_load(f) or []
    total_entries = len(entries)
    logger.info(f"Loaded {total_entries} entries")

    entries_with_wikidata = [e for e in entries if e.get("wikidata_id")]
    logger.info(f"Found {len(entries_with_wikidata)} entries with wikidata_id")

    # Check for existing progress checkpoint.
    start_index, enriched_entries = _load_progress(progress_file)

    # Rebuild counters from any previously processed entries.
    success_count = sum(1 for e in enriched_entries if e.get("wikidata_enrichment"))
    skip_count = sum(1 for e in enriched_entries
                     if not e.get("wikidata_id") and not e.get("wikidata_enrichment_error"))
    error_count = sum(1 for e in enriched_entries if e.get("wikidata_enrichment_error"))

    # Save interval (save progress every N entries)
    SAVE_INTERVAL = 10

    with httpx.Client(timeout=30.0) as client:
        for i, entry in enumerate(entries):
            # Skip entries already handled in a previous run.
            if i < start_index:
                continue

            wikidata_id = entry.get("wikidata_id")
            org_name = entry.get("organisatie", "Unknown")

            if not wikidata_id:
                # Keep entry as-is, skip enrichment (no API call, no delay).
                enriched_entries.append(entry)
                skip_count += 1
            else:
                logger.info(f"[{i+1}/{total_entries}] Enriching: {org_name} ({wikidata_id})")
                try:
                    enrichment = enrich_entity(str(wikidata_id), client)
                    if enrichment:
                        # Merge enrichment data with the original entry.
                        enriched_entry = dict(entry)
                        enriched_entry["wikidata_enrichment"] = enrichment_to_dict(enrichment)
                        enriched_entries.append(enriched_entry)
                        success_count += 1
                    else:
                        # Keep the original entry, flagged with the failure.
                        entry_copy = dict(entry)
                        entry_copy["wikidata_enrichment_error"] = "Failed to fetch from Wikidata"
                        enriched_entries.append(entry_copy)
                        error_count += 1
                except Exception as e:
                    logger.error(f"Error processing {org_name}: {e}")
                    entry_copy = dict(entry)
                    entry_copy["wikidata_enrichment_error"] = str(e)
                    enriched_entries.append(entry_copy)
                    error_count += 1
                # Rate limiting: only delay after an actual API call;
                # sleeping on skipped entries would waste REQUEST_DELAY each.
                time.sleep(REQUEST_DELAY)

            # Save progress periodically.
            if (i + 1) % SAVE_INTERVAL == 0:
                _save_progress(progress_file, i, enriched_entries)

    # Write final output.
    logger.info(f"Writing enriched data to {output_file}...")
    with open(output_file, 'w', encoding='utf-8') as f:
        yaml.dump(enriched_entries, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    # Remove progress file on successful completion.
    if progress_file.exists():
        progress_file.unlink()
        logger.info("Removed progress file (enrichment complete)")

    # Summary
    logger.info("=" * 60)
    logger.info("ENRICHMENT COMPLETE")
    logger.info("=" * 60)
    logger.info(f"Total entries: {total_entries}")
    logger.info(f"Entries with wikidata_id: {len(entries_with_wikidata)}")
    logger.info(f"Successfully enriched: {success_count}")
    logger.info(f"Skipped (no wikidata_id): {skip_count}")
    logger.info(f"Errors: {error_count}")
    logger.info(f"Output file: {output_file}")

    # Machine-readable run log alongside the output file.
    log_file = data_dir / f"enrichment_log_{timestamp}.json"
    log_data = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_file": str(input_file),
        "output_file": str(output_file),
        "total_entries": total_entries,
        "entries_with_wikidata_id": len(entries_with_wikidata),
        "successfully_enriched": success_count,
        "skipped_no_wikidata_id": skip_count,
        "errors": error_count,
        "authenticated": bool(WIKIDATA_API_TOKEN),
        "rate_limit_delay_seconds": REQUEST_DELAY,
    }
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, indent=2)
    logger.info(f"Log file: {log_file}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    sys.exit(main())