542 lines
20 KiB
Python
Executable file
542 lines
20 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Fast Wikidata enrichment for NDE Register NL entries.
|
|
|
|
This script:
|
|
- Stores each enriched entry as a separate YAML file
|
|
- Allows resuming from last enriched entry
|
|
- Uses faster rate limiting (with exponential backoff on errors)
|
|
- Adds comprehensive API metadata to each entry
|
|
|
|
Usage:
|
|
python scripts/enrich_nde_fast.py
|
|
|
|
Output:
|
|
data/nde/enriched/entries/{index}_{wikidata_id}.yaml
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import yaml
|
|
import httpx
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass, field
|
|
import logging
|
|
import re
|
|
|
|
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration: Wikidata endpoints
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1"
# NOTE(review): SPARQL_URL is defined but not referenced in this file — confirm before removing
SPARQL_URL = "https://query.wikidata.org/sparql"

# Rate limiting - FASTER but with backoff
# Wikimedia guidelines: be polite, use User-Agent, back off on errors
# Anonymous: 500 req/hr = ~8.3 req/min, we'll try 5 req/min (12 sec delay)
# If we get 429s, we back off exponentially
BASE_DELAY = 2.0  # Start with 2 seconds between requests
MAX_DELAY = 60.0  # Max backoff delay
BACKOFF_FACTOR = 2.0  # Double delay on each error

# Token and contact address come from the environment so they stay out of the repo
WIKIDATA_API_TOKEN = os.getenv("WIKIDATA_API_TOKEN", "")
WIKIMEDIA_CONTACT_EMAIL = os.getenv("WIKIMEDIA_CONTACT_EMAIL", "glam-data@example.com")
# Descriptive User-Agent with contact details, per the Wikimedia guidelines above
USER_AGENT = f"GLAMDataExtractor/1.0 ({WIKIMEDIA_CONTACT_EMAIL}) Python/httpx"

# Headers sent on every request
HEADERS = {
    "Accept": "application/json",
    "User-Agent": USER_AGENT,
}
if WIKIDATA_API_TOKEN:
    # Attach the bearer token only when one was provided
    HEADERS["Authorization"] = f"Bearer {WIKIDATA_API_TOKEN}"
    logger.info("Using authenticated mode")
else:
    logger.info("Using anonymous mode")
|
@dataclass
class APIMetadata:
    """Metadata about the API call, recorded for provenance in the output files."""
    api_endpoint: str              # Base REST API URL used for the request
    entity_id: str                 # Q-number that was requested
    request_url: str               # Full URL of the GET request
    response_status: int           # HTTP status code of the response
    response_time_ms: float        # Wall-clock round-trip time in milliseconds
    fetch_timestamp: str           # ISO-8601 UTC timestamp taken just before the request
    user_agent: str                # User-Agent header that was sent
    authenticated: bool            # True when a bearer token was attached
    rate_limit_delay_used: float   # Inter-request delay (seconds) in effect at fetch time
|
|
|
|
|
|
@dataclass
class WikidataEnrichment:
    """Container for all Wikidata data extracted for an entity."""
    entity_id: str  # Q-number this enrichment belongs to
    # Language-code keyed name data, straight from the API payload
    labels: Dict[str, str] = field(default_factory=dict)
    descriptions: Dict[str, str] = field(default_factory=dict)
    aliases: Dict[str, List[str]] = field(default_factory=dict)
    # Site id -> page title (e.g. "nlwiki" -> article title)
    sitelinks: Dict[str, str] = field(default_factory=dict)
    # Remaining statements keyed by friendly property name (see PROPERTY_LABELS)
    claims: Dict[str, Any] = field(default_factory=dict)
    # External identifier systems (viaf, gnd, isil, ...) -> identifier value
    identifiers: Dict[str, str] = field(default_factory=dict)
    # P31 values, each normalized to a {"id": "Qxx"} dict
    instance_of: List[Dict[str, str]] = field(default_factory=list)
    # Dedicated slots for well-known single-valued properties (None when absent)
    country: Optional[Dict[str, str]] = None
    location: Optional[Dict[str, str]] = None
    coordinates: Optional[Dict[str, float]] = None  # latitude/longitude/precision
    inception: Optional[str] = None     # P571 time string
    dissolution: Optional[str] = None   # P576 time string
    official_website: Optional[str] = None
    image: Optional[str] = None
    logo: Optional[str] = None
    # Provenance record built from APIMetadata (plain dict for YAML output)
    api_metadata: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
# Wikidata property IDs relevant to heritage institutions, mapped to the
# friendly key names used in the enriched output.
PROPERTY_LABELS = {
    # Core facts
    "P31": "instance_of",
    "P17": "country",
    "P131": "located_in",
    "P625": "coordinates",
    "P571": "inception",
    "P576": "dissolution",
    "P856": "official_website",
    "P18": "image",
    "P154": "logo",
    # External identifier systems
    "P791": "isil",
    "P214": "viaf",
    "P227": "gnd",
    "P244": "lcnaf",
    "P268": "bnf",
    "P269": "idref",
    "P213": "isni",
    "P1566": "geonames",
    "P2427": "grid",
    "P3500": "ringgold",
    "P5785": "museofile",
    "P8168": "factgrid",
    "P8402": "kvk_number",
    # Organizational structure
    "P361": "part_of",
    "P355": "subsidiaries",
    "P749": "parent_org",
    "P127": "owned_by",
    "P1037": "director",
    "P159": "headquarters",
    "P463": "member_of",
    "P1435": "heritage_status",
    # Categories, contact and address details
    "P910": "topic_category",
    "P373": "commons_category",
    "P2044": "elevation",
    "P6375": "street_address",
    "P281": "postal_code",
    "P1329": "phone",
    "P968": "email",
    "P973": "described_at_url",
}
|
|
|
|
|
|
def fetch_entity_with_metadata(entity_id: str, client: httpx.Client, delay_used: float) -> tuple[Optional[Dict], Optional[APIMetadata]]:
    """
    Fetch entity data from the Wikibase REST API with metadata tracking.

    Args:
        entity_id: Wikidata Q-number (e.g. "Q12345").
        client: Shared httpx client (reused across requests).
        delay_used: Current inter-request delay, recorded for diagnostics.

    Returns:
        Tuple of (entity_data, api_metadata). entity_data is None on any
        failure; api_metadata is None only for transport-level errors
        where no HTTP status code exists.
    """
    url = f"{WIKIDATA_REST_API}/entities/items/{entity_id}"
    fetch_timestamp = datetime.now(timezone.utc).isoformat()
    start_time = time.perf_counter()

    def _metadata(status: int) -> APIMetadata:
        # Single builder shared by the success, 429 and HTTP-error paths
        # (previously duplicated verbatim in two places).
        return APIMetadata(
            api_endpoint=WIKIDATA_REST_API,
            entity_id=entity_id,
            request_url=url,
            response_status=status,
            response_time_ms=round((time.perf_counter() - start_time) * 1000, 2),
            fetch_timestamp=fetch_timestamp,
            user_agent=USER_AGENT,
            authenticated=bool(WIKIDATA_API_TOKEN),
            rate_limit_delay_used=delay_used,
        )

    try:
        response = client.get(url, headers=HEADERS)
        metadata = _metadata(response.status_code)

        if response.status_code == 429:
            # Rate limited: the caller inspects metadata.response_status to back off
            logger.warning(f"Rate limited on {entity_id}")
            return None, metadata

        response.raise_for_status()
        return response.json(), metadata

    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error fetching {entity_id}: {e}")
        return None, _metadata(e.response.status_code)

    except Exception as e:
        # Transport failures (timeouts, DNS, resets): no HTTP status to record
        logger.error(f"Error fetching {entity_id}: {e}")
        return None, None
|
|
|
|
|
|
def extract_value_from_statement(statement: Dict) -> Any:
    """Pull the usable value out of a Wikidata statement structure.

    Fully-specified values (type "value") pass through unchanged. For
    other value envelopes with dict content: entity references collapse
    to their id, timestamps to the time string, and globe coordinates to
    a small latitude/longitude/precision dict. Returns None when nothing
    extractable is present or the statement is malformed.
    """
    try:
        envelope = statement.get("value", {})
        content = envelope.get("content")

        # Normal case: the REST API marks concrete values with type "value"
        if envelope.get("type") == "value":
            return content

        if not isinstance(content, dict):
            return content

        # Entity reference -> bare Q-id (fall back to the whole dict)
        if "entity-type" in content or "id" in content:
            return content.get("id", content)
        # Timestamp -> the time string only
        if "time" in content:
            return content.get("time")
        # Globe coordinate -> compact dict
        if "latitude" in content and "longitude" in content:
            return {
                "latitude": content.get("latitude"),
                "longitude": content.get("longitude"),
                "precision": content.get("precision"),
            }
        return content
    except Exception:
        # A malformed statement is skipped rather than aborting the entity
        return None
|
|
|
|
|
|
def parse_entity_data(entity_id: str, data: Dict, api_metadata: APIMetadata) -> WikidataEnrichment:
    """Parse the full entity data into a WikidataEnrichment object.

    Maps the raw Wikibase REST API item payload (labels, descriptions,
    aliases, sitelinks, statements) onto the flat WikidataEnrichment
    fields. Well-known properties (P31, P17, coordinates, dates, media,
    external identifiers) land in dedicated fields; everything else goes
    into the generic ``claims`` dict keyed by its PROPERTY_LABELS name.

    Args:
        entity_id: The Q-number this data belongs to.
        data: JSON payload from the /entities/items/{id} endpoint.
        api_metadata: Request/response metadata recorded alongside the data.

    Returns:
        A populated WikidataEnrichment.
    """
    enrichment = WikidataEnrichment(entity_id=entity_id)

    # Store API metadata as a plain dict so it serializes cleanly to YAML
    enrichment.api_metadata = {
        "api_endpoint": api_metadata.api_endpoint,
        "request_url": api_metadata.request_url,
        "response_status": api_metadata.response_status,
        "response_time_ms": api_metadata.response_time_ms,
        "fetch_timestamp": api_metadata.fetch_timestamp,
        "user_agent": api_metadata.user_agent,
        "authenticated": api_metadata.authenticated,
        "rate_limit_delay_used": api_metadata.rate_limit_delay_used,
    }

    # Extract labels, descriptions, aliases (language-code keyed dicts)
    enrichment.labels = data.get("labels", {})
    enrichment.descriptions = data.get("descriptions", {})
    enrichment.aliases = data.get("aliases", {})

    # Extract sitelinks; each entry may be a dict carrying a "title" key
    sitelinks = data.get("sitelinks", {})
    for site, link_data in sitelinks.items():
        if isinstance(link_data, dict):
            enrichment.sitelinks[site] = link_data.get("title", str(link_data))
        else:
            enrichment.sitelinks[site] = str(link_data)

    # Extract statements/claims
    statements = data.get("statements", {})

    for prop_id, prop_statements in statements.items():
        # Fall back to the raw property id when we have no friendly name
        prop_name = PROPERTY_LABELS.get(prop_id, prop_id)

        if not prop_statements:
            continue

        # Collect the extractable value of every statement for this property
        values = []
        for stmt in prop_statements:
            value = extract_value_from_statement(stmt)
            if value is not None:
                values.append(value)

        if not values:
            continue

        # Handle specific properties.
        # NOTE: `values` is guaranteed non-empty past the guard above, so the
        # trailing `if values else None` expressions below are defensive only.
        if prop_id == "P31":
            # instance-of: keep every value, normalized to {"id": Qxx} dicts
            enrichment.instance_of = [{"id": v} if isinstance(v, str) else v for v in values]
        elif prop_id == "P17":
            enrichment.country = {"id": values[0]} if values else None
        elif prop_id == "P131":
            enrichment.location = {"id": values[0]} if values else None
        elif prop_id == "P625":
            # coordinates arrive as a lat/lon/precision dict from the extractor
            if values and isinstance(values[0], dict):
                enrichment.coordinates = values[0]
        elif prop_id == "P571":
            enrichment.inception = values[0] if values else None
        elif prop_id == "P576":
            enrichment.dissolution = values[0] if values else None
        elif prop_id == "P856":
            enrichment.official_website = values[0] if values else None
        elif prop_id == "P18":
            enrichment.image = values[0] if values else None
        elif prop_id == "P154":
            enrichment.logo = values[0] if values else None
        elif prop_id in ["P791", "P214", "P227", "P244", "P268", "P269",
                         "P213", "P1566", "P2427", "P3500", "P5785", "P8168", "P8402"]:
            # External identifier properties: keep only the first value
            enrichment.identifiers[prop_name] = values[0] if values else None
        else:
            # Everything else: single value, or the full list when repeated
            enrichment.claims[prop_name] = values[0] if len(values) == 1 else values

    return enrichment
|
|
|
|
|
|
def enrichment_to_dict(enrichment: WikidataEnrichment) -> Dict[str, Any]:
    """Flatten a WikidataEnrichment into an ordered dict for YAML output.

    Keys are emitted in a fixed, human-friendly order: entity id and API
    metadata first, then names (with Dutch/English convenience keys),
    identifiers, instance types, location, dates, web/media links,
    sitelinks and finally the remaining claims. Empty fields are omitted.
    """
    out: Dict[str, Any] = {"wikidata_entity_id": enrichment.entity_id}

    def put(key: str, value: Any) -> None:
        # Emit only populated values; None / empty containers are dropped
        if value:
            out[key] = value

    # API metadata first
    put("api_metadata", enrichment.api_metadata)

    # Labels, with nl/en pulled out as convenience keys
    if enrichment.labels:
        out["wikidata_labels"] = enrichment.labels
        for lang in ("nl", "en"):
            if lang in enrichment.labels:
                out[f"wikidata_label_{lang}"] = enrichment.labels[lang]

    # Descriptions, same nl/en treatment
    if enrichment.descriptions:
        out["wikidata_descriptions"] = enrichment.descriptions
        for lang in ("nl", "en"):
            if lang in enrichment.descriptions:
                out[f"wikidata_description_{lang}"] = enrichment.descriptions[lang]

    put("wikidata_aliases", enrichment.aliases)

    if enrichment.identifiers:
        # Drop identifier slots whose value resolved to None/empty
        out["wikidata_identifiers"] = {
            name: value for name, value in enrichment.identifiers.items() if value
        }

    put("wikidata_instance_of", enrichment.instance_of)

    # Location data
    put("wikidata_country", enrichment.country)
    put("wikidata_located_in", enrichment.location)
    put("wikidata_coordinates", enrichment.coordinates)

    # Temporal data
    put("wikidata_inception", enrichment.inception)
    put("wikidata_dissolution", enrichment.dissolution)

    # Web presence and media
    put("wikidata_official_website", enrichment.official_website)
    put("wikidata_image", enrichment.image)
    put("wikidata_logo", enrichment.logo)

    # Wikipedia / sister-project links and everything else
    put("wikidata_sitelinks", enrichment.sitelinks)
    put("wikidata_claims", enrichment.claims)

    return out
|
|
|
|
|
|
def get_processed_entries(entries_dir: Path) -> set:
    """Return the set of entry indices that already have an output file.

    Output filenames look like "0001_Q12345.yaml"; the digits before the
    underscore are the entry index. YAML files that don't match that
    pattern are ignored.
    """
    index_prefix = re.compile(r"(\d+)_")
    return {
        int(match.group(1))
        for path in entries_dir.glob("*.yaml")
        if (match := index_prefix.match(path.name))
    }
|
|
|
|
|
|
def save_entry(entries_dir: Path, index: int, entry: Dict, enrichment_data: Optional[Dict], error: Optional[str] = None):
    """Save a single entry (plus its enrichment result or error) to a YAML file.

    The filename encodes the entry index and Wikidata id
    ("0001_Q12345.yaml") so runs can be resumed by scanning the directory.

    Args:
        entries_dir: Target directory (must already exist).
        index: Position of the entry in the input file.
        entry: The original register entry.
        enrichment_data: Parsed Wikidata data, or None on skip/error.
        error: Error description; when set (and enrichment_data is None)
            the entry is marked status "error" instead of "skipped".

    Returns:
        Path of the file that was written.
    """
    # Use `or`, not a .get() default: a wikidata_id explicitly set to
    # None/"" must also fall back to "unknown" (the .get default only
    # applies when the key is missing, which produced "..._None.yaml").
    wikidata_id = entry.get("wikidata_id") or "unknown"
    filename = f"{index:04d}_{wikidata_id}.yaml"
    filepath = entries_dir / filename

    output = {
        "original_entry": entry,
        "entry_index": index,
        "processing_timestamp": datetime.now(timezone.utc).isoformat(),
    }

    if enrichment_data:
        output["wikidata_enrichment"] = enrichment_data
        output["enrichment_status"] = "success"
    elif error:
        output["enrichment_error"] = error
        output["enrichment_status"] = "error"
    else:
        # No wikidata_id on the source entry: recorded, but nothing to enrich
        output["enrichment_status"] = "skipped"
        output["skip_reason"] = "no_wikidata_id"

    with open(filepath, 'w', encoding='utf-8') as f:
        # sort_keys=False preserves the deliberate key order above
        yaml.dump(output, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    return filepath
|
|
|
|
|
|
def main():
    """Main entry point with per-entry storage and resume capability.

    Loads the NDE register YAML, enriches each entry that has a
    wikidata_id via the Wikibase REST API, writes one YAML file per
    entry, and finally writes a JSON summary log. Entries whose output
    file already exists are skipped, so interrupted runs can resume.

    Returns:
        0 on completion (used as the process exit code).
    """
    # Paths are resolved relative to this script's location
    script_dir = Path(__file__).parent
    data_dir = script_dir.parent / "data" / "nde"
    input_file = data_dir / "nde_register_nl.yaml"
    entries_dir = data_dir / "enriched" / "entries"

    # Ensure entries directory exists
    entries_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Input file: {input_file}")
    logger.info(f"Entries directory: {entries_dir}")

    # Load input YAML (expected to be a list of entry dicts)
    logger.info("Loading input YAML file...")
    with open(input_file, 'r', encoding='utf-8') as f:
        entries = yaml.safe_load(f)

    total_entries = len(entries)
    logger.info(f"Loaded {total_entries} entries")

    # Resume support: indices with an existing output file are skipped.
    # NOTE(review): entries saved with status "error" (incl. 429s) also have
    # an output file, so they are NOT retried on resume — confirm intended.
    processed_indices = get_processed_entries(entries_dir)
    logger.info(f"Found {len(processed_indices)} already processed entries")

    # Count entries with wikidata_id (for progress reporting only)
    entries_with_wikidata = [i for i, e in enumerate(entries) if e.get("wikidata_id")]
    remaining_to_process = [i for i in entries_with_wikidata if i not in processed_indices]

    logger.info(f"Entries with wikidata_id: {len(entries_with_wikidata)}")
    logger.info(f"Remaining to process: {len(remaining_to_process)}")

    # Per-run statistics
    success_count = 0
    skip_count = 0
    error_count = 0
    rate_limit_count = 0

    # Adaptive delay: grows on 429s, shrinks back after sustained success
    current_delay = BASE_DELAY
    consecutive_successes = 0

    with httpx.Client(timeout=30.0) as client:
        for i, entry in enumerate(entries):
            # Skip already processed
            if i in processed_indices:
                continue

            wikidata_id = entry.get("wikidata_id")
            org_name = entry.get("organisatie", "Unknown")

            if not wikidata_id:
                # Save as skipped so resume logic won't revisit it
                save_entry(entries_dir, i, entry, None)
                skip_count += 1
                continue

            # Ensure proper Q-number format (input may store a bare number)
            wikidata_id_str = str(wikidata_id)
            if not wikidata_id_str.startswith("Q"):
                wikidata_id_str = f"Q{wikidata_id_str}"

            # Log progress
            progress_pct = (i + 1) / total_entries * 100
            logger.info(f"[{i+1}/{total_entries}] ({progress_pct:.1f}%) Enriching: {org_name} ({wikidata_id_str}) [delay={current_delay:.1f}s]")

            # Fetch and enrich
            try:
                data, metadata = fetch_entity_with_metadata(wikidata_id_str, client, current_delay)

                if data and metadata:
                    enrichment = parse_entity_data(wikidata_id_str, data, metadata)
                    enrichment_dict = enrichment_to_dict(enrichment)
                    save_entry(entries_dir, i, entry, enrichment_dict)
                    success_count += 1
                    consecutive_successes += 1

                    # Reduce delay after 5 consecutive successes (never below BASE_DELAY)
                    if consecutive_successes >= 5 and current_delay > BASE_DELAY:
                        current_delay = max(BASE_DELAY, current_delay / 1.5)
                        consecutive_successes = 0
                        logger.info(f"Reducing delay to {current_delay:.1f}s after successful requests")

                elif metadata and metadata.response_status == 429:
                    # Rate limited - back off exponentially (capped at MAX_DELAY)
                    rate_limit_count += 1
                    consecutive_successes = 0
                    current_delay = min(MAX_DELAY, current_delay * BACKOFF_FACTOR)
                    logger.warning(f"Rate limited! Increasing delay to {current_delay:.1f}s")

                    # Save as error but allow retry later
                    save_entry(entries_dir, i, entry, None, f"Rate limited (429)")
                    error_count += 1

                    # Extra wait on rate limit, on top of the regular per-entry sleep
                    time.sleep(current_delay * 2)
                else:
                    # Fetch failed for some other reason (HTTP/transport error)
                    save_entry(entries_dir, i, entry, None, "Failed to fetch from Wikidata")
                    error_count += 1
                    consecutive_successes = 0

            except Exception as e:
                # Record the failure and keep going; one bad entry must not stop the run
                logger.error(f"Error processing {org_name}: {e}")
                save_entry(entries_dir, i, entry, None, str(e))
                error_count += 1
                consecutive_successes = 0

            # Rate limiting delay between every request
            time.sleep(current_delay)

    # Final summary
    logger.info("=" * 60)
    logger.info("ENRICHMENT COMPLETE")
    logger.info("=" * 60)
    logger.info(f"Total entries: {total_entries}")
    logger.info(f"Entries with wikidata_id: {len(entries_with_wikidata)}")
    logger.info(f"Already processed (skipped): {len(processed_indices)}")
    logger.info(f"Successfully enriched this run: {success_count}")
    logger.info(f"Skipped (no wikidata_id): {skip_count}")
    logger.info(f"Errors: {error_count}")
    logger.info(f"Rate limit hits: {rate_limit_count}")
    logger.info(f"Entries directory: {entries_dir}")

    # Create a machine-readable summary log for this run
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    log_file = data_dir / "enriched" / f"enrichment_log_{timestamp}.json"
    log_data = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_file": str(input_file),
        "entries_directory": str(entries_dir),
        "total_entries": total_entries,
        "entries_with_wikidata_id": len(entries_with_wikidata),
        "previously_processed": len(processed_indices),
        "successfully_enriched_this_run": success_count,
        "skipped_no_wikidata_id": skip_count,
        "errors": error_count,
        "rate_limit_hits": rate_limit_count,
        "authenticated": bool(WIKIDATA_API_TOKEN),
        "base_delay_seconds": BASE_DELAY,
        "final_delay_seconds": current_delay,
    }
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, indent=2)
    logger.info(f"Log file: {log_file}")

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status
    sys.exit(main())
|