# glam/scripts/enrich_nde_fast.py
# (file-listing metadata: 567 lines, 21 KiB, Python, executable file)
#!/usr/bin/env python3
"""
Fast Wikidata enrichment for NDE Register NL entries.
This script:
- Stores each enriched entry as a separate YAML file
- Allows resuming from last enriched entry
- Uses faster rate limiting (with exponential backoff on errors)
- Adds comprehensive API metadata to each entry
Usage:
python scripts/enrich_nde_fast.py
Output:
data/nde/enriched/entries/{index}_{wikidata_id}.yaml
"""
import os
import sys
import time
import json
import yaml
import httpx
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
import logging
import re
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1"  # Wikibase REST API base URL
SPARQL_URL = "https://query.wikidata.org/sparql"  # SPARQL endpoint (appears unused in this file)

# Rate limiting - FASTER but with backoff
# Wikimedia guidelines: be polite, use User-Agent, back off on errors
# Anonymous: 500 req/hr = ~8.3 req/min, we'll try 5 req/min (12 sec delay)
# If we get 429s, we back off exponentially
BASE_DELAY = 2.0  # Start with 2 seconds between requests
MAX_DELAY = 60.0  # Max backoff delay
BACKOFF_FACTOR = 2.0  # Double delay on each error

# Credentials and contact info come from the environment so they never live in code.
WIKIDATA_API_TOKEN = os.getenv("WIKIDATA_API_TOKEN", "")
WIKIMEDIA_CONTACT_EMAIL = os.getenv("WIKIMEDIA_CONTACT_EMAIL", "glam-data@example.com")
# Wikimedia etiquette: identify the client and include a contact address.
USER_AGENT = f"GLAMDataExtractor/1.0 ({WIKIMEDIA_CONTACT_EMAIL}) Python/httpx"

# Headers sent with every request; Authorization is added only when a token is set.
HEADERS = {
    "Accept": "application/json",
    "User-Agent": USER_AGENT,
}
if WIKIDATA_API_TOKEN:
    HEADERS["Authorization"] = f"Bearer {WIKIDATA_API_TOKEN}"
    logger.info("Using authenticated mode")
else:
    logger.info("Using anonymous mode")
@dataclass
class APIMetadata:
    """Metadata about a single API call, recorded for provenance/auditing."""
    api_endpoint: str      # base URL of the API used
    entity_id: str         # Wikidata Q-number that was requested
    request_url: str       # full URL of the GET request
    response_status: int   # HTTP status code of the response
    response_time_ms: float   # wall-clock latency in milliseconds
    fetch_timestamp: str   # ISO-8601 UTC timestamp taken just before the request
    user_agent: str        # User-Agent header that was sent
    authenticated: bool    # whether a bearer token was used
    rate_limit_delay_used: float  # inter-request delay (seconds) in effect at fetch time
@dataclass
class WikidataEnrichment:
    """Container for all Wikidata data extracted for an entity."""
    entity_id: str  # Q-number this enrichment describes
    labels: Dict[str, str] = field(default_factory=dict)         # language code -> label
    descriptions: Dict[str, str] = field(default_factory=dict)   # language code -> description
    aliases: Dict[str, List[str]] = field(default_factory=dict)  # language code -> alternative names
    sitelinks: Dict[str, str] = field(default_factory=dict)      # wiki site key -> page title
    claims: Dict[str, Any] = field(default_factory=dict)         # catch-all for properties without a dedicated field
    identifiers: Dict[str, str] = field(default_factory=dict)    # external IDs (VIAF, GND, ISIL, ...)
    instance_of: List[Dict[str, str]] = field(default_factory=list)  # P31 values, normalized to {"id": ...}
    country: Optional[Dict[str, str]] = None        # P17 as {"id": ...}
    location: Optional[Dict[str, str]] = None       # P131 as {"id": ...}
    coordinates: Optional[Dict[str, float]] = None  # P625 latitude/longitude/precision
    inception: Optional[str] = None     # P571 value (time string)
    dissolution: Optional[str] = None   # P576 value (time string)
    official_website: Optional[str] = None  # P856 value
    image: Optional[str] = None  # P18 value
    logo: Optional[str] = None   # P154 value
    api_metadata: Optional[Dict[str, Any]] = None  # provenance of the fetch (see APIMetadata)
# Property IDs for heritage institutions.
# Maps Wikidata P-numbers to the readable names used as keys in the output;
# properties absent from this map keep their raw P-number.
PROPERTY_LABELS = {
    "P31": "instance_of", "P17": "country", "P131": "located_in",
    "P625": "coordinates", "P571": "inception", "P576": "dissolution",
    "P856": "official_website", "P18": "image", "P154": "logo",
    "P791": "isil", "P214": "viaf", "P227": "gnd", "P244": "lcnaf",
    "P268": "bnf", "P269": "idref", "P213": "isni", "P1566": "geonames",
    "P2427": "grid", "P3500": "ringgold", "P5785": "museofile",
    "P8168": "factgrid", "P361": "part_of", "P355": "subsidiaries",
    "P749": "parent_org", "P127": "owned_by", "P1037": "director",
    "P159": "headquarters", "P463": "member_of", "P1435": "heritage_status",
    "P910": "topic_category", "P373": "commons_category", "P2044": "elevation",
    "P6375": "street_address", "P281": "postal_code", "P1329": "phone",
    "P968": "email", "P973": "described_at_url", "P8402": "kvk_number",
}
def fetch_entity_with_metadata(entity_id: str, client: httpx.Client, delay_used: float) -> tuple[Optional[Dict], Optional[APIMetadata]]:
    """
    Fetch entity data from the Wikibase REST API with comprehensive metadata tracking.

    Args:
        entity_id: Wikidata Q-number (e.g. "Q42").
        client: shared httpx client (connection pooling / timeout).
        delay_used: inter-request delay currently in effect, recorded for auditing.

    Returns:
        Tuple of (entity_data, api_metadata). entity_data is None on any failure;
        api_metadata is None only for unexpected non-HTTP errors (timeouts, DNS, ...).
    """
    url = f"{WIKIDATA_REST_API}/entities/items/{entity_id}"
    fetch_timestamp = datetime.now(timezone.utc).isoformat()
    start_time = time.perf_counter()

    def _metadata(status: int) -> APIMetadata:
        # Single place to assemble APIMetadata (was duplicated verbatim in the
        # success and HTTPStatusError paths).
        return APIMetadata(
            api_endpoint=WIKIDATA_REST_API,
            entity_id=entity_id,
            request_url=url,
            response_status=status,
            response_time_ms=round((time.perf_counter() - start_time) * 1000, 2),
            fetch_timestamp=fetch_timestamp,
            user_agent=USER_AGENT,
            authenticated=bool(WIKIDATA_API_TOKEN),
            rate_limit_delay_used=delay_used,
        )

    try:
        response = client.get(url, headers=HEADERS)
        metadata = _metadata(response.status_code)
        if response.status_code == 429:
            # Rate limited: report it via metadata so the caller can back off.
            logger.warning(f"Rate limited on {entity_id}")
            return None, metadata
        response.raise_for_status()
        return response.json(), metadata
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error fetching {entity_id}: {e}")
        return None, _metadata(e.response.status_code)
    except Exception as e:
        # Network-level failure: no HTTP status exists, so no metadata either.
        logger.error(f"Error fetching {entity_id}: {e}")
        return None, None
def extract_value_from_statement(statement: Dict) -> Any:
    """Extract the plain value from a Wikidata statement structure.

    Expects the REST-API shape ``{"value": {"type": ..., "content": ...}}``.
    For type "value" the content is returned verbatim; otherwise dict content
    is unwrapped: entity references collapse to their Q-id, timestamps to their
    time string, and coordinates to a lat/lon/precision dict. Returns None
    when the statement cannot be interpreted.
    """
    try:
        payload = statement.get("value", {})
        content = payload.get("content")

        # Regular "value" statements carry their content directly.
        if payload.get("type") == "value":
            return content
        # Non-dict content (scalars, None) passes through unchanged.
        if not isinstance(content, dict):
            return content

        # Fallback unwrapping for dict-shaped content on other statement types.
        if "entity-type" in content or "id" in content:
            return content.get("id", content)
        if "time" in content:
            return content.get("time")
        if "latitude" in content and "longitude" in content:
            return {
                "latitude": content.get("latitude"),
                "longitude": content.get("longitude"),
                "precision": content.get("precision"),
            }
        return content
    except Exception:
        # A malformed statement should not abort the whole entity.
        return None
def parse_entity_data(entity_id: str, data: Dict, api_metadata: APIMetadata) -> WikidataEnrichment:
    """Parse the full entity payload into a WikidataEnrichment object."""
    enrichment = WikidataEnrichment(entity_id=entity_id)

    # Record provenance of the API call alongside the data itself.
    enrichment.api_metadata = {
        "api_endpoint": api_metadata.api_endpoint,
        "request_url": api_metadata.request_url,
        "response_status": api_metadata.response_status,
        "response_time_ms": api_metadata.response_time_ms,
        "fetch_timestamp": api_metadata.fetch_timestamp,
        "user_agent": api_metadata.user_agent,
        "authenticated": api_metadata.authenticated,
        "rate_limit_delay_used": api_metadata.rate_limit_delay_used,
    }

    # Multilingual terms pass through unchanged.
    enrichment.labels = data.get("labels", {})
    enrichment.descriptions = data.get("descriptions", {})
    enrichment.aliases = data.get("aliases", {})

    # Sitelinks: keep only the page title.
    for site, link in data.get("sitelinks", {}).items():
        enrichment.sitelinks[site] = link.get("title", str(link)) if isinstance(link, dict) else str(link)

    # Dispatch tables for properties that map onto dedicated attributes.
    scalar_attrs = {
        "P571": "inception",
        "P576": "dissolution",
        "P856": "official_website",
        "P18": "image",
        "P154": "logo",
    }
    entity_ref_attrs = {"P17": "country", "P131": "location"}
    identifier_props = {
        "P791", "P214", "P227", "P244", "P268", "P269", "P213",
        "P1566", "P2427", "P3500", "P5785", "P8168", "P8402",
    }

    for prop_id, prop_statements in data.get("statements", {}).items():
        prop_name = PROPERTY_LABELS.get(prop_id, prop_id)
        values = [
            v
            for v in (extract_value_from_statement(s) for s in (prop_statements or []))
            if v is not None
        ]
        if not values:
            continue
        if prop_id == "P31":
            # Normalize instance-of values to {"id": ...} dicts.
            enrichment.instance_of = [{"id": v} if isinstance(v, str) else v for v in values]
        elif prop_id in entity_ref_attrs:
            setattr(enrichment, entity_ref_attrs[prop_id], {"id": values[0]})
        elif prop_id == "P625":
            # Coordinates are kept only when already in dict form.
            if isinstance(values[0], dict):
                enrichment.coordinates = values[0]
        elif prop_id in scalar_attrs:
            setattr(enrichment, scalar_attrs[prop_id], values[0])
        elif prop_id in identifier_props:
            enrichment.identifiers[prop_name] = values[0]
        else:
            # Everything else lands in the generic claims bucket;
            # single values are unwrapped from their list.
            enrichment.claims[prop_name] = values[0] if len(values) == 1 else values

    return enrichment
def enrichment_to_dict(enrichment: WikidataEnrichment) -> Dict[str, Any]:
    """Convert WikidataEnrichment to a clean dictionary for YAML output.

    Key insertion order is deliberate (identity, provenance, terms,
    identifiers, location, dates, web/media, sitelinks, misc claims)
    because the YAML writer preserves it.
    """
    result: Dict[str, Any] = {"wikidata_entity_id": enrichment.entity_id}

    # Provenance of the API call comes first.
    if enrichment.api_metadata:
        result["api_metadata"] = enrichment.api_metadata

    # Terms: full language maps plus convenient nl/en shortcuts.
    if enrichment.labels:
        result["wikidata_labels"] = enrichment.labels
        for lang in ("nl", "en"):
            if lang in enrichment.labels:
                result[f"wikidata_label_{lang}"] = enrichment.labels[lang]
    if enrichment.descriptions:
        result["wikidata_descriptions"] = enrichment.descriptions
        for lang in ("nl", "en"):
            if lang in enrichment.descriptions:
                result[f"wikidata_description_{lang}"] = enrichment.descriptions[lang]
    if enrichment.aliases:
        result["wikidata_aliases"] = enrichment.aliases

    # External identifiers, dropping empty values.
    if enrichment.identifiers:
        result["wikidata_identifiers"] = {
            name: value for name, value in enrichment.identifiers.items() if value
        }

    # Remaining attributes map 1:1 onto output keys (order matters, see above).
    simple_fields = (
        ("instance_of", "wikidata_instance_of"),
        ("country", "wikidata_country"),
        ("location", "wikidata_located_in"),
        ("coordinates", "wikidata_coordinates"),
        ("inception", "wikidata_inception"),
        ("dissolution", "wikidata_dissolution"),
        ("official_website", "wikidata_official_website"),
        ("image", "wikidata_image"),
        ("logo", "wikidata_logo"),
        ("sitelinks", "wikidata_sitelinks"),
        ("claims", "wikidata_claims"),
    )
    for attr, key in simple_fields:
        value = getattr(enrichment, attr)
        if value:
            result[key] = value

    return result
def get_processed_entries(entries_dir: Path) -> set:
    """Return the set of entry indices that already have a YAML file on disk.

    Filenames look like "0001_Q12345.yaml"; the leading digits are the index.
    Files whose names don't start with digits-underscore are ignored.
    """
    index_pattern = re.compile(r"(\d+)_")
    return {
        int(match.group(1))
        for match in (index_pattern.match(path.name) for path in entries_dir.glob("*.yaml"))
        if match
    }
def save_entry(entries_dir: Path, index: int, entry: Dict, enrichment_data: Optional[Dict], error: Optional[str] = None):
    """Save a single entry to its own YAML file and return the written path.

    The filename embeds the zero-padded entry index plus (when well-formed)
    the Q-number, so finished work can be detected on restart. The file
    records the original entry, the enrichment result or error, and a status.
    """
    raw_id = entry.get("wikidata_id", "unknown")
    # Only a plain Q-number is safe to embed in the filename; URLs, free text
    # and non-strings get a placeholder instead.
    if isinstance(raw_id, str) and raw_id:
        safe_id = raw_id if re.match(r'^Q\d+$', raw_id) else "invalid_id"
    else:
        safe_id = "unknown"

    output = {
        "original_entry": entry,
        "entry_index": index,
        "processing_timestamp": datetime.now(timezone.utc).isoformat(),
    }
    if enrichment_data:
        output["wikidata_enrichment"] = enrichment_data
        output["enrichment_status"] = "success"
    elif error:
        output["enrichment_error"] = error
        output["enrichment_status"] = "error"
    else:
        output["enrichment_status"] = "skipped"
        output["skip_reason"] = "no_wikidata_id"

    filepath = entries_dir / f"{index:04d}_{safe_id}.yaml"
    with open(filepath, 'w', encoding='utf-8') as handle:
        yaml.dump(output, handle, allow_unicode=True, default_flow_style=False, sort_keys=False)
    return filepath
def main():
    """Main entry point with per-entry storage and resume capability.

    Loads the NDE register YAML, fetches Wikidata data for every entry that
    has a wikidata_id, writes one YAML file per entry, and adapts the
    inter-request delay based on rate-limit responses. Returns 0.
    """
    # Paths (data lives at <repo>/data/nde relative to this script)
    script_dir = Path(__file__).parent
    data_dir = script_dir.parent / "data" / "nde"
    input_file = data_dir / "nde_register_nl.yaml"
    entries_dir = data_dir / "enriched" / "entries"
    # Ensure entries directory exists
    entries_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Input file: {input_file}")
    logger.info(f"Entries directory: {entries_dir}")
    # Load input YAML
    logger.info("Loading input YAML file...")
    with open(input_file, 'r', encoding='utf-8') as f:
        entries = yaml.safe_load(f)
    # NOTE(review): assumes the YAML root is a list of dicts — confirm against
    # the register file (an empty file would make entries None and len() fail).
    total_entries = len(entries)
    logger.info(f"Loaded {total_entries} entries")
    # Check for already processed entries (this is what enables resuming)
    processed_indices = get_processed_entries(entries_dir)
    logger.info(f"Found {len(processed_indices)} already processed entries")
    # Count entries with wikidata_id
    entries_with_wikidata = [i for i, e in enumerate(entries) if e.get("wikidata_id")]
    remaining_to_process = [i for i in entries_with_wikidata if i not in processed_indices]
    logger.info(f"Entries with wikidata_id: {len(entries_with_wikidata)}")
    logger.info(f"Remaining to process: {len(remaining_to_process)}")
    # Stats
    success_count = 0
    skip_count = 0
    error_count = 0
    rate_limit_count = 0
    # Adaptive delay: starts at BASE_DELAY, grows on 429s, shrinks after successes
    current_delay = BASE_DELAY
    consecutive_successes = 0
    with httpx.Client(timeout=30.0) as client:
        for i, entry in enumerate(entries):
            # Skip already processed
            if i in processed_indices:
                continue
            wikidata_id = entry.get("wikidata_id")
            org_name = entry.get("organisatie", "Unknown")
            if not wikidata_id:
                # Save as skipped
                save_entry(entries_dir, i, entry, None)
                skip_count += 1
                continue
            # Validate and normalize wikidata_id
            wikidata_id_str = str(wikidata_id).strip()
            # Check if it's a valid Q-number or can be converted to one
            if re.match(r'^Q\d+$', wikidata_id_str):
                # Already valid Q-number
                pass
            elif re.match(r'^\d+$', wikidata_id_str):
                # Just a number, add Q prefix
                wikidata_id_str = f"Q{wikidata_id_str}"
            else:
                # Invalid format (URL, text, etc.) - skip with error
                logger.warning(f"[{i+1}/{total_entries}] Invalid wikidata_id format: {wikidata_id_str} for {org_name}")
                save_entry(entries_dir, i, entry, None, f"Invalid wikidata_id format: {wikidata_id_str}")
                error_count += 1
                continue
            # Log progress
            progress_pct = (i + 1) / total_entries * 100
            logger.info(f"[{i+1}/{total_entries}] ({progress_pct:.1f}%) Enriching: {org_name} ({wikidata_id_str}) [delay={current_delay:.1f}s]")
            # Fetch and enrich
            try:
                data, metadata = fetch_entity_with_metadata(wikidata_id_str, client, current_delay)
                if data and metadata:
                    enrichment = parse_entity_data(wikidata_id_str, data, metadata)
                    enrichment_dict = enrichment_to_dict(enrichment)
                    save_entry(entries_dir, i, entry, enrichment_dict)
                    success_count += 1
                    consecutive_successes += 1
                    # Reduce delay after consecutive successes (min BASE_DELAY)
                    if consecutive_successes >= 5 and current_delay > BASE_DELAY:
                        current_delay = max(BASE_DELAY, current_delay / 1.5)
                        consecutive_successes = 0
                        logger.info(f"Reducing delay to {current_delay:.1f}s after successful requests")
                elif metadata and metadata.response_status == 429:
                    # Rate limited - back off
                    rate_limit_count += 1
                    consecutive_successes = 0
                    current_delay = min(MAX_DELAY, current_delay * BACKOFF_FACTOR)
                    logger.warning(f"Rate limited! Increasing delay to {current_delay:.1f}s")
                    # Save as error but allow retry later
                    # NOTE(review): writing a file here makes get_processed_entries()
                    # count this index as done, so a resumed run will NOT retry it
                    # unless the error file is deleted first — confirm this is intended.
                    save_entry(entries_dir, i, entry, None, f"Rate limited (429)")
                    error_count += 1
                    # Extra wait on rate limit
                    time.sleep(current_delay * 2)
                else:
                    save_entry(entries_dir, i, entry, None, "Failed to fetch from Wikidata")
                    error_count += 1
                    consecutive_successes = 0
            except Exception as e:
                logger.error(f"Error processing {org_name}: {e}")
                save_entry(entries_dir, i, entry, None, str(e))
                error_count += 1
                consecutive_successes = 0
            # Rate limiting delay between requests
            time.sleep(current_delay)
    # Final summary
    logger.info("=" * 60)
    logger.info("ENRICHMENT COMPLETE")
    logger.info("=" * 60)
    logger.info(f"Total entries: {total_entries}")
    logger.info(f"Entries with wikidata_id: {len(entries_with_wikidata)}")
    logger.info(f"Already processed (skipped): {len(processed_indices)}")
    logger.info(f"Successfully enriched this run: {success_count}")
    logger.info(f"Skipped (no wikidata_id): {skip_count}")
    logger.info(f"Errors: {error_count}")
    logger.info(f"Rate limit hits: {rate_limit_count}")
    logger.info(f"Entries directory: {entries_dir}")
    # Create summary log (one JSON file per run, timestamped)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    log_file = data_dir / "enriched" / f"enrichment_log_{timestamp}.json"
    log_data = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_file": str(input_file),
        "entries_directory": str(entries_dir),
        "total_entries": total_entries,
        "entries_with_wikidata_id": len(entries_with_wikidata),
        "previously_processed": len(processed_indices),
        "successfully_enriched_this_run": success_count,
        "skipped_no_wikidata_id": skip_count,
        "errors": error_count,
        "rate_limit_hits": rate_limit_count,
        "authenticated": bool(WIKIDATA_API_TOKEN),
        "base_delay_seconds": BASE_DELAY,
        "final_delay_seconds": current_delay,
    }
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, indent=2)
    logger.info(f"Log file: {log_file}")
    return 0
# Script entry point: main()'s return value becomes the process exit code.
if __name__ == "__main__":
    sys.exit(main())