# glam/scripts/enrich_custodians_wikidata_inception.py
# Snapshot metadata: retrieved 2025-12-09 07:56:35 +01:00 — 498 lines, 18 KiB, Python.
#!/usr/bin/env python3
"""
Enrich custodian YAML files with full Wikidata data, specifically targeting inception dates.
This script:
1. Scans all YAML files in data/custodian/
2. Finds records with wikidata_entity_id but missing wikidata_inception
3. Fetches full Wikidata data from REST API (including P571 inception)
4. Updates the YAML files with enriched wikidata_enrichment section
5. Generates a report of enriched records
The script respects Wikidata rate limits and supports resumable processing.
Usage:
python scripts/enrich_custodians_wikidata_inception.py [--dry-run] [--limit N] [--country XX]
Options:
--dry-run Show what would be enriched without modifying files
--limit N Process only first N files (for testing)
--country XX Only process files for country code XX (e.g., JP, CZ, NL)
--skip-existing Skip files that already have wikidata_inception
Environment Variables:
WIKIDATA_API_TOKEN - Optional OAuth2 token for increased rate limits (5,000 req/hr)
WIKIMEDIA_CONTACT_EMAIL - Contact email for User-Agent (required by Wikimedia policy)
"""
import argparse
import json
import logging
import os
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import httpx
import yaml
# Set up logging: timestamped INFO-level messages to stderr by default.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration: REST API base URL, input directory, and the resumable-run
# checkpoint file (hidden alongside the data it tracks).
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1"
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
PROGRESS_FILE = Path(__file__).parent.parent / "data" / "custodian" / ".inception_enrichment_progress.json"

# Rate limiting: 500 req/hr for anonymous, 5000 req/hr with token
WIKIDATA_API_TOKEN = os.getenv("WIKIDATA_API_TOKEN", "")
WIKIMEDIA_CONTACT_EMAIL = os.getenv("WIKIMEDIA_CONTACT_EMAIL", "glam-data@example.com")
# Wikimedia's User-Agent policy requires a descriptive agent string with contact info.
USER_AGENT = f"GLAMDataExtractor/1.0 ({WIKIMEDIA_CONTACT_EMAIL}) Python/httpx"

# Request delay based on authentication status (logged at import time so the
# operator sees which mode the run uses).
if WIKIDATA_API_TOKEN:
    REQUEST_DELAY = 0.75  # ~4800 requests per hour (below 5000 limit)
    logger.info("Using authenticated mode: 5,000 req/hr limit")
else:
    REQUEST_DELAY = 7.5  # ~480 requests per hour (below 500 limit)
    logger.info("Using anonymous mode: 500 req/hr limit (use WIKIDATA_API_TOKEN for faster processing)")

# Headers sent with every API request; Authorization is added only when a token is set.
HEADERS = {
    "Accept": "application/json",
    "User-Agent": USER_AGENT,
}
if WIKIDATA_API_TOKEN:
    HEADERS["Authorization"] = f"Bearer {WIKIDATA_API_TOKEN}"

# Property IDs for heritage institutions, mapped to the snake_case names used
# in the YAML output. P571 "inception" is the field this script targets; the
# trailing entries (P791 onward) are external identifier schemes.
PROPERTY_LABELS = {
    "P31": "instance_of",
    "P17": "country",
    "P131": "located_in",
    "P625": "coordinates",
    "P571": "inception",
    "P576": "dissolution",
    "P856": "official_website",
    "P18": "image",
    "P154": "logo",
    "P791": "isil",
    "P214": "viaf",
    "P227": "gnd",
    "P244": "lcnaf",
    "P268": "bnf",
    "P269": "idref",
    "P213": "isni",
    "P1566": "geonames",
}
@dataclass
class WikidataEnrichment:
    """Container for Wikidata enrichment data.

    Holds the subset of an entity's labels, descriptions, statements and
    external identifiers that this script serializes back into the
    custodian YAML files (see enrichment_to_dict).
    """
    entity_id: str  # Wikidata item ID, e.g. "Q123"
    labels: Dict[str, str] = field(default_factory=dict)  # language code -> label
    descriptions: Dict[str, str] = field(default_factory=dict)  # language code -> description
    instance_of: List[Dict[str, str]] = field(default_factory=list)  # P31 values, each {"id": "Q..."}
    country: Optional[Dict[str, str]] = None  # P17, {"id": "Q..."}
    location: Optional[Dict[str, str]] = None  # P131, {"id": "Q..."}
    coordinates: Optional[Dict[str, float]] = None  # P625: latitude/longitude/precision
    inception: Optional[str] = None  # P571 date string — the key field being enriched
    dissolution: Optional[str] = None  # P576 date string
    official_website: Optional[str] = None  # P856 URL
    image: Optional[str] = None  # P18 Commons file name
    logo: Optional[str] = None  # P154 Commons file name
    identifiers: Dict[str, str] = field(default_factory=dict)  # scheme name -> identifier value
    # UTC timestamp recorded at object creation; stored in api_metadata on output.
    fetch_timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
def extract_value_from_statement(statement: Dict) -> Any:
    """Extract the plain value from a Wikibase REST API statement.

    Handles the common content shapes:
    - entity references  -> the referenced entity ID (e.g. "Q123"), or the
                            raw dict when no "id" key is present
    - time values        -> bare ISO date string (e.g. "1854-11-28"); a
                            leading "-" (BCE year) is preserved
    - globe coordinates  -> {"latitude", "longitude", "precision"} dict
    - anything else      -> the raw content (string, number, or dict)

    Returns None when the statement is malformed or the value is missing.
    """
    try:
        content = statement.get("value", {}).get("content")
        if isinstance(content, dict):
            if "entity-type" in content or "id" in content:
                # Entity reference: prefer the bare ID, fall back to the dict.
                return content.get("id", content)
            if "time" in content:
                # Format: "+1854-11-28T00:00:00Z" -> "1854-11-28"
                time_val = content.get("time", "")
                if "T" in time_val:
                    time_val = time_val.split("T", 1)[0]
                # Strip only the explicit "+" sign; a leading "-" marks a
                # BCE year and must survive (previous code dropped it,
                # turning e.g. "-0500-01-01" into "0500-01-01").
                if time_val.startswith("+"):
                    time_val = time_val[1:]
                return time_val
            if "latitude" in content and "longitude" in content:
                return {
                    "latitude": content.get("latitude"),
                    "longitude": content.get("longitude"),
                    "precision": content.get("precision"),
                }
            return content
        return content
    except Exception:
        # Defensive: one malformed statement must never abort entity parsing.
        return None
def fetch_entity_data(entity_id: str, client: httpx.Client) -> Optional[Dict]:
    """Fetch the full entity document for *entity_id* from the Wikibase REST API.

    Returns the parsed JSON body, or None when the entity is missing or any
    request error occurs (errors are logged, never raised to the caller).
    """
    url = f"{WIKIDATA_REST_API}/entities/items/{entity_id}"
    try:
        response = client.get(url, headers=HEADERS)
        if response.status_code == 403:
            # OAuth token may be rejected; retry the same request anonymously.
            anon_headers = dict(HEADERS)
            anon_headers.pop("Authorization", None)
            response = client.get(url, headers=anon_headers)
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            logger.warning(f"Entity {entity_id} not found")
        else:
            logger.error(f"HTTP error fetching {entity_id}: {e}")
        return None
    except Exception as e:
        logger.error(f"Error fetching {entity_id}: {e}")
        return None
def parse_entity_data(entity_id: str, data: Dict) -> WikidataEnrichment:
    """Build a WikidataEnrichment from a raw REST API entity document."""
    result = WikidataEnrichment(entity_id=entity_id)
    result.labels = data.get("labels", {})
    result.descriptions = data.get("descriptions", {})

    for prop_id, prop_statements in data.get("statements", {}).items():
        if not prop_statements:
            continue
        extracted = (extract_value_from_statement(s) for s in prop_statements)
        values = [v for v in extracted if v is not None]
        if not values:
            continue
        first = values[0]
        if prop_id == "P31":
            # Instance of: keep every value, normalizing bare Q-IDs to dicts.
            result.instance_of = [{"id": v} if isinstance(v, str) else v for v in values]
        elif prop_id == "P17":
            result.country = {"id": first}
        elif prop_id == "P131":
            result.location = {"id": first}
        elif prop_id == "P625":
            # Coordinates arrive pre-shaped as a dict by the extractor.
            if isinstance(first, dict):
                result.coordinates = first
        elif prop_id == "P571":
            result.inception = first
        elif prop_id == "P576":
            result.dissolution = first
        elif prop_id == "P856":
            result.official_website = first
        elif prop_id == "P18":
            result.image = first
        elif prop_id == "P154":
            result.logo = first
        elif prop_id in PROPERTY_LABELS:
            # Every remaining mapped property is an external identifier
            # scheme (ISIL, VIAF, GND, ...); store the first truthy value.
            if first:
                result.identifiers[PROPERTY_LABELS[prop_id]] = str(first)
    return result
def enrichment_to_dict(enrichment: WikidataEnrichment) -> Dict:
    """Serialize a WikidataEnrichment into the YAML wikidata_enrichment mapping.

    Key insertion order is deliberate: the YAML writer uses sort_keys=False,
    so the order built here is the order that appears in the output files.
    """
    out = {
        "wikidata_entity_id": enrichment.entity_id,
        "api_metadata": {
            "api_endpoint": WIKIDATA_REST_API,
            "fetch_timestamp": enrichment.fetch_timestamp,
            "user_agent": USER_AGENT,
        }
    }
    # Labels: full mapping plus convenience per-language shortcuts.
    labels = enrichment.labels
    if labels:
        out["wikidata_labels"] = labels
        for lang in ("en", "nl", "ja"):
            if lang in labels:
                out[f"wikidata_label_{lang}"] = labels[lang]
    # Descriptions: full mapping plus an English shortcut.
    descriptions = enrichment.descriptions
    if descriptions:
        out["wikidata_descriptions"] = descriptions
        if "en" in descriptions:
            out["wikidata_description_en"] = descriptions["en"]
    # External identifiers, dropping any empty values.
    if enrichment.identifiers:
        out["wikidata_identifiers"] = {k: v for k, v in enrichment.identifiers.items() if v}
    # Remaining fields are emitted only when present; inception is the key
    # field this script exists to add.
    optional_fields = (
        ("wikidata_instance_of", enrichment.instance_of),
        ("wikidata_country", enrichment.country),
        ("wikidata_located_in", enrichment.location),
        ("wikidata_coordinates", enrichment.coordinates),
        ("wikidata_inception", enrichment.inception),
        ("wikidata_dissolution", enrichment.dissolution),
        ("wikidata_official_website", enrichment.official_website),
        ("wikidata_image", enrichment.image),
        ("wikidata_logo", enrichment.logo),
    )
    for key, value in optional_fields:
        if value:
            out[key] = value
    return out
def get_wikidata_entity_id(data: Dict) -> Optional[str]:
    """Extract a Wikidata entity ID (e.g. "Q123") from a custodian record.

    Looks, in order, at the wikidata_enrichment section, the top-level
    identifiers list, and the original_entry identifiers list, returning
    the first usable ID found, or None.

    YAML null-tolerant: keys that are present but empty (wikidata_enrichment,
    identifiers, original_entry, identifier_scheme all commonly null in hand-
    edited YAML) no longer raise AttributeError/TypeError as before.
    """
    wd = data.get("wikidata_enrichment") or {}
    entity_id = wd.get("wikidata_entity_id")
    if entity_id:
        return entity_id

    def _scan(identifiers: Any) -> Optional[str]:
        # Find the first wikidata-scheme entry with a truthy value.
        for ident in identifiers or []:
            if isinstance(ident, dict):
                scheme = ident.get("identifier_scheme") or ""
                if scheme.lower() == "wikidata":
                    value = ident.get("identifier_value")
                    if value:
                        return value
        return None

    found = _scan(data.get("identifiers"))
    if found is not None:
        return found
    original = data.get("original_entry") or {}
    return _scan(original.get("identifiers"))
def has_wikidata_inception(data: Dict) -> bool:
    """Return True when the record's wikidata_enrichment carries an inception date."""
    enrichment = data.get("wikidata_enrichment", {})
    return bool(enrichment and enrichment.get("wikidata_inception"))
def load_progress() -> Dict:
    """Load the resumable-processing checkpoint.

    Returns the saved progress mapping, or a fresh empty one when the
    checkpoint file is absent or unreadable.  A corrupt/unreadable file is
    logged (instead of silently swallowed, as before) and then ignored, so
    a bad checkpoint never blocks a run.
    """
    if PROGRESS_FILE.exists():
        try:
            with open(PROGRESS_FILE, 'r') as f:
                return json.load(f)
        except Exception as e:
            # Best-effort: restart progress, but tell the operator why.
            logger.warning(f"Could not read progress file {PROGRESS_FILE}: {e}")
    return {"processed_files": [], "stats": {}}
def save_progress(progress: Dict):
    """Persist the checkpoint mapping to PROGRESS_FILE as indented JSON.

    Failures are logged rather than raised so that checkpointing can never
    abort the enrichment run itself.
    """
    try:
        PROGRESS_FILE.write_text(json.dumps(progress, indent=2))
    except Exception as e:
        logger.error(f"Failed to save progress: {e}")
def main():
    """CLI entry point: enrich custodian YAML files with Wikidata data.

    Flow: parse CLI options -> optionally resume from the checkpoint file ->
    scan and filter the YAML files -> either report candidates (dry run) or
    fetch each entity from the Wikidata REST API, rewrite its YAML file,
    checkpoint progress periodically -> print a summary of the run.
    """
    parser = argparse.ArgumentParser(description="Enrich custodian files with Wikidata inception dates")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be enriched without modifying files")
    parser.add_argument("--limit", type=int, default=0, help="Process only first N files (0 = no limit)")
    parser.add_argument("--country", type=str, help="Only process files for country code XX (e.g., JP, CZ)")
    parser.add_argument("--skip-existing", action="store_true", help="Skip files that already have wikidata_inception")
    parser.add_argument("--resume", action="store_true", help="Resume from last checkpoint")
    args = parser.parse_args()
    # Load progress if resuming; otherwise start a fresh checkpoint (this
    # overwrites any previous one when the run finishes).
    progress = load_progress() if args.resume else {"processed_files": [], "stats": {}}
    processed_files = set(progress.get("processed_files", []))
    # Statistics reported in the final summary.
    stats = {
        "total_scanned": 0,
        "needs_enrichment": 0,
        "already_has_inception": 0,
        "no_wikidata_id": 0,
        "enriched_with_inception": 0,
        "enriched_no_inception": 0,
        "errors": 0,
        "skipped_already_processed": 0,
    }
    # Find all YAML files (optionally restricted to one country-code prefix).
    pattern = f"{args.country}-*.yaml" if args.country else "*.yaml"
    yaml_files = sorted(CUSTODIAN_DIR.glob(pattern))
    logger.info(f"Found {len(yaml_files)} YAML files in {CUSTODIAN_DIR}")
    # Filter and prepare files to process
    files_to_process = []
    for yaml_file in yaml_files:
        stats["total_scanned"] += 1
        # Skip if already processed in previous run
        if args.resume and yaml_file.name in processed_files:
            stats["skipped_already_processed"] += 1
            continue
        try:
            with open(yaml_file, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if not data:
                continue
            # Check if has wikidata_id
            entity_id = get_wikidata_entity_id(data)
            if not entity_id:
                stats["no_wikidata_id"] += 1
                continue
            # Check if already has inception.  NOTE: without --skip-existing,
            # such files are still re-enriched (their data is refreshed).
            if has_wikidata_inception(data):
                stats["already_has_inception"] += 1
                if args.skip_existing:
                    continue
            stats["needs_enrichment"] += 1
            files_to_process.append((yaml_file, data, entity_id))
        except Exception as e:
            logger.error(f"Error reading {yaml_file}: {e}")
            stats["errors"] += 1
    logger.info(f"Files needing enrichment: {len(files_to_process)}")
    logger.info(f"Files already with inception: {stats['already_has_inception']}")
    logger.info(f"Files without Wikidata ID: {stats['no_wikidata_id']}")
    # --limit is applied after filtering, before the dry-run report.
    if args.limit > 0:
        files_to_process = files_to_process[:args.limit]
        logger.info(f"Limited to first {args.limit} files")
    if args.dry_run:
        logger.info("DRY RUN - No files will be modified")
        # Show at most the first 20 candidates to keep output readable.
        for yaml_file, _, entity_id in files_to_process[:20]:
            logger.info(f" Would enrich: {yaml_file.name} ({entity_id})")
        if len(files_to_process) > 20:
            logger.info(f" ... and {len(files_to_process) - 20} more")
        return
    # Process files: one API fetch + one YAML rewrite per file.
    with httpx.Client(timeout=30.0) as client:
        for i, (yaml_file, data, entity_id) in enumerate(files_to_process):
            try:
                logger.info(f"[{i+1}/{len(files_to_process)}] Enriching {yaml_file.name} ({entity_id})")
                # Fetch Wikidata data
                entity_data = fetch_entity_data(entity_id, client)
                if entity_data is None:
                    logger.warning(f" Could not fetch data for {entity_id}")
                    stats["errors"] += 1
                    continue
                # Parse enrichment
                enrichment = parse_entity_data(entity_id, entity_data)
                enrichment_dict = enrichment_to_dict(enrichment)
                # Update the YAML data (replaces any previous enrichment section).
                data["wikidata_enrichment"] = enrichment_dict
                if enrichment.inception:
                    stats["enriched_with_inception"] += 1
                    logger.info(f" Found inception: {enrichment.inception}")
                else:
                    stats["enriched_no_inception"] += 1
                    logger.info(f" No inception found in Wikidata")
                # Write back to file (sort_keys=False preserves key order).
                with open(yaml_file, 'w', encoding='utf-8') as f:
                    yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
                # Update progress
                processed_files.add(yaml_file.name)
                progress["processed_files"] = list(processed_files)
                progress["stats"] = stats
                # Save progress periodically (every 10 files) so a crash loses little work.
                if (i + 1) % 10 == 0:
                    save_progress(progress)
                # Rate limiting: REQUEST_DELAY depends on whether a token is set.
                time.sleep(REQUEST_DELAY)
            except Exception as e:
                logger.error(f"Error processing {yaml_file.name}: {e}")
                stats["errors"] += 1
    # Save final progress
    save_progress(progress)
    # Print summary
    logger.info("\n" + "=" * 60)
    logger.info("ENRICHMENT COMPLETE")
    logger.info("=" * 60)
    logger.info(f"Total files scanned: {stats['total_scanned']}")
    logger.info(f"Files needing enrichment: {stats['needs_enrichment']}")
    logger.info(f"Files already with inception: {stats['already_has_inception']}")
    logger.info(f"Files without Wikidata ID: {stats['no_wikidata_id']}")
    logger.info(f"Successfully enriched with inception: {stats['enriched_with_inception']}")
    logger.info(f"Enriched but no inception in Wikidata: {stats['enriched_no_inception']}")
    logger.info(f"Errors: {stats['errors']}")
    logger.info("=" * 60)
# Script entry point.
if __name__ == "__main__":
    main()