#!/usr/bin/env python3
"""
Resolve Q and P numbers to human-readable labels in enriched entries.

This script:
- Scans enriched YAML files for Q-numbers (entities) and P-numbers (properties)
- Fetches labels and basic info from Wikidata
- Updates entries IN-PLACE, keeping original Q/P numbers and adding labels
- Caches resolved entities to avoid duplicate API calls

Usage:
    python scripts/resolve_qp_labels.py

Output:
    Updates files in data/nde/enriched/entries/ with resolved labels
"""
import os
import sys
import time
import json
import yaml
import httpx
import re
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, field
import logging
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1"
WIKIDATA_API = "https://www.wikidata.org/w/api.php"
# Rate limiting
BASE_DELAY = 0.5 # Faster since we batch requests
MAX_BATCH_SIZE = 50 # Wikidata API limit for wbgetentities
WIKIMEDIA_CONTACT_EMAIL = os.getenv("WIKIMEDIA_CONTACT_EMAIL", "glam-data@example.com")
USER_AGENT = f"GLAMDataExtractor/1.0 ({WIKIMEDIA_CONTACT_EMAIL}) Python/httpx"
HEADERS = {
"Accept": "application/json",
"User-Agent": USER_AGENT,
}
@dataclass
class EntityInfo:
"""Basic info about a Wikidata entity."""
id: str
label_en: Optional[str] = None
label_nl: Optional[str] = None
description_en: Optional[str] = None
description_nl: Optional[str] = None
instance_of: Optional[List[str]] = None # For Q entities
property_type: Optional[str] = None # For P entities (e.g., "external-id", "wikibase-item")
# Cache for resolved entities
entity_cache: Dict[str, EntityInfo] = {}
def extract_qp_numbers(obj: Any, found: Set[str] = None) -> Set[str]:
"""Recursively extract all Q and P numbers from a nested structure."""
if found is None:
found = set()
if isinstance(obj, str):
# Match Q or P followed by digits
matches = re.findall(r'\b([QP]\d+)\b', obj)
found.update(matches)
elif isinstance(obj, dict):
for key, value in obj.items():
# Check keys too (like "P276")
if re.match(r'^[QP]\d+$', key):
found.add(key)
extract_qp_numbers(value, found)
elif isinstance(obj, list):
for item in obj:
extract_qp_numbers(item, found)
return found
def fetch_entities_batch(entity_ids: List[str], client: httpx.Client) -> Dict[str, EntityInfo]:
"""Fetch multiple entities in a single API call."""
if not entity_ids:
return {}
# Separate Q and P entities
results = {}
# Use wbgetentities API for batch fetching
ids_str = "|".join(entity_ids)
params = {
"action": "wbgetentities",
"ids": ids_str,
"props": "labels|descriptions|claims|datatype",
"languages": "en|nl",
"format": "json",
}
try:
response = client.get(WIKIDATA_API, params=params, headers=HEADERS)
response.raise_for_status()
data = response.json()
entities = data.get("entities", {})
for entity_id, entity_data in entities.items():
if "missing" in entity_data:
continue
info = EntityInfo(id=entity_id)
# Extract labels
labels = entity_data.get("labels", {})
if "en" in labels:
info.label_en = labels["en"].get("value")
if "nl" in labels:
info.label_nl = labels["nl"].get("value")
# Extract descriptions
descriptions = entity_data.get("descriptions", {})
if "en" in descriptions:
info.description_en = descriptions["en"].get("value")
if "nl" in descriptions:
info.description_nl = descriptions["nl"].get("value")
# For properties, get datatype
if entity_id.startswith("P"):
info.property_type = entity_data.get("datatype")
# For entities, get instance_of (P31)
if entity_id.startswith("Q"):
claims = entity_data.get("claims", {})
p31_claims = claims.get("P31", [])
instance_of_ids = []
for claim in p31_claims[:3]: # Limit to first 3
mainsnak = claim.get("mainsnak", {})
datavalue = mainsnak.get("datavalue", {})
if datavalue.get("type") == "wikibase-entityid":
qid = datavalue.get("value", {}).get("id")
if qid:
instance_of_ids.append(qid)
if instance_of_ids:
info.instance_of = instance_of_ids
results[entity_id] = info
entity_cache[entity_id] = info
return results
except Exception as e:
logger.error(f"Error fetching batch: {e}")
return {}
def entity_info_to_dict(info: EntityInfo) -> Dict[str, Any]:
"""Convert EntityInfo to a dictionary for YAML output."""
result = {"id": info.id}
if info.label_en:
result["label_en"] = info.label_en
if info.label_nl:
result["label_nl"] = info.label_nl
if info.description_en:
result["description_en"] = info.description_en
if info.description_nl:
result["description_nl"] = info.description_nl
if info.instance_of:
result["instance_of"] = info.instance_of
if info.property_type:
result["property_type"] = info.property_type
return result
def resolve_value(value: Any, resolved_entities: Dict[str, EntityInfo]) -> Any:
"""
Resolve Q/P numbers in a value, keeping originals and adding labels.
Transforms:
"Q33506" -> {"id": "Q33506", "label_en": "museum", ...}
{"id": "Q55"} -> {"id": "Q55", "label_en": "Netherlands", ...}
"""
if isinstance(value, str):
# Check if it's a bare Q/P number
if re.match(r'^[QP]\d+$', value):
if value in resolved_entities:
return entity_info_to_dict(resolved_entities[value])
return value
return value
elif isinstance(value, dict):
# Check if it's an entity reference like {"id": "Q55"}
if "id" in value and isinstance(value["id"], str):
entity_id = value["id"]
if re.match(r'^[QP]\d+$', entity_id) and entity_id in resolved_entities:
# Merge original dict with resolved info
resolved = entity_info_to_dict(resolved_entities[entity_id])
# Keep any additional fields from original
for k, v in value.items():
if k not in resolved:
resolved[k] = v
return resolved
# Recursively process dict values
return {k: resolve_value(v, resolved_entities) for k, v in value.items()}
elif isinstance(value, list):
return [resolve_value(item, resolved_entities) for item in value]
return value
def resolve_claims_keys(claims: Dict[str, Any], resolved_entities: Dict[str, EntityInfo]) -> Dict[str, Any]:
"""
Resolve P-number keys in claims dict to include labels.
Transforms:
{"P276": "Q3028083", ...}
To:
{"P276_location": {"property": {"id": "P276", "label_en": "location"}, "value": {...}}, ...}
"""
resolved_claims = {}
for key, value in claims.items():
if re.match(r'^P\d+$', key):
# It's a P-number key
if key in resolved_entities:
prop_info = resolved_entities[key]
# Create a more descriptive key
label = prop_info.label_en or prop_info.label_nl or key
# Sanitize label for use as key
safe_label = re.sub(r'[^a-zA-Z0-9_]', '_', label.lower())
new_key = f"{key}_{safe_label}"
resolved_claims[new_key] = {
"property": entity_info_to_dict(prop_info),
"value": resolve_value(value, resolved_entities)
}
else:
resolved_claims[key] = resolve_value(value, resolved_entities)
else:
# Keep as-is but still resolve values
resolved_claims[key] = resolve_value(value, resolved_entities)
return resolved_claims
def process_enrichment(enrichment: Dict[str, Any], resolved_entities: Dict[str, EntityInfo]) -> Dict[str, Any]:
"""Process the wikidata_enrichment section to add resolved labels."""
result = dict(enrichment)
# Add a resolved_entities section with all entity info
qp_in_enrichment = extract_qp_numbers(enrichment)
result["_resolved_entities"] = {
qp: entity_info_to_dict(resolved_entities[qp])
for qp in sorted(qp_in_enrichment)
if qp in resolved_entities
}
# Resolve specific sections
# instance_of
if "wikidata_instance_of" in result:
result["wikidata_instance_of"] = resolve_value(
result["wikidata_instance_of"], resolved_entities
)
# country
if "wikidata_country" in result:
result["wikidata_country"] = resolve_value(
result["wikidata_country"], resolved_entities
)
# located_in
if "wikidata_located_in" in result:
result["wikidata_located_in"] = resolve_value(
result["wikidata_located_in"], resolved_entities
)
# claims - resolve both keys and values
if "wikidata_claims" in result:
result["wikidata_claims"] = resolve_claims_keys(
result["wikidata_claims"], resolved_entities
)
return result
def process_file(filepath: Path, resolved_entities: Dict[str, EntityInfo]) -> bool:
"""Process a single enriched entry file."""
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not data:
return False
# Check if already processed
enrichment = data.get("wikidata_enrichment")
if not enrichment:
return False
if "_resolved_entities" in enrichment:
# Already processed
return False
# Process the enrichment
data["wikidata_enrichment"] = process_enrichment(enrichment, resolved_entities)
data["qp_resolution_timestamp"] = datetime.now(timezone.utc).isoformat()
# Write back
with open(filepath, 'w', encoding='utf-8') as f:
yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
return True
except Exception as e:
logger.error(f"Error processing {filepath}: {e}")
return False
def main():
"""Main entry point."""
script_dir = Path(__file__).parent
entries_dir = script_dir.parent / "data" / "nde" / "enriched" / "entries"
cache_file = entries_dir.parent / "entity_cache.json"
logger.info(f"Entries directory: {entries_dir}")
# Load existing cache
if cache_file.exists():
try:
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
for entity_id, info_dict in cache_data.items():
entity_cache[entity_id] = EntityInfo(
id=info_dict["id"],
label_en=info_dict.get("label_en"),
label_nl=info_dict.get("label_nl"),
description_en=info_dict.get("description_en"),
description_nl=info_dict.get("description_nl"),
instance_of=info_dict.get("instance_of"),
property_type=info_dict.get("property_type"),
)
logger.info(f"Loaded {len(entity_cache)} cached entities")
except Exception as e:
logger.warning(f"Could not load cache: {e}")
# Get all YAML files
yaml_files = sorted(entries_dir.glob("*.yaml"))
logger.info(f"Found {len(yaml_files)} entry files")
# First pass: collect all Q/P numbers
logger.info("Scanning files for Q/P numbers...")
all_qp_numbers: Set[str] = set()
files_to_process = []
for filepath in yaml_files:
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not data:
continue
enrichment = data.get("wikidata_enrichment")
if not enrichment:
continue
# Skip already processed
if "_resolved_entities" in enrichment:
continue
files_to_process.append(filepath)
qp_numbers = extract_qp_numbers(enrichment)
all_qp_numbers.update(qp_numbers)
except Exception as e:
logger.warning(f"Error scanning {filepath}: {e}")
logger.info(f"Found {len(all_qp_numbers)} unique Q/P numbers in {len(files_to_process)} files to process")
# Remove already cached
to_fetch = [qp for qp in all_qp_numbers if qp not in entity_cache]
logger.info(f"Need to fetch {len(to_fetch)} entities (have {len(entity_cache)} cached)")
# Fetch in batches
with httpx.Client(timeout=30.0) as client:
for i in range(0, len(to_fetch), MAX_BATCH_SIZE):
batch = to_fetch[i:i + MAX_BATCH_SIZE]
logger.info(f"Fetching batch {i // MAX_BATCH_SIZE + 1}/{(len(to_fetch) + MAX_BATCH_SIZE - 1) // MAX_BATCH_SIZE} ({len(batch)} entities)")
fetch_entities_batch(batch, client)
time.sleep(BASE_DELAY)
logger.info(f"Total cached entities: {len(entity_cache)}")
# Save cache
cache_data = {
entity_id: entity_info_to_dict(info)
for entity_id, info in entity_cache.items()
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
logger.info(f"Saved cache to {cache_file}")
# Second pass: process files
logger.info("Processing files...")
processed_count = 0
for i, filepath in enumerate(files_to_process):
if (i + 1) % 50 == 0:
logger.info(f"Processing file {i + 1}/{len(files_to_process)}")
if process_file(filepath, entity_cache):
processed_count += 1
logger.info("=" * 60)
logger.info("Q/P RESOLUTION COMPLETE")
logger.info("=" * 60)
logger.info(f"Files processed: {processed_count}")
logger.info(f"Entities resolved: {len(entity_cache)}")
logger.info(f"Cache file: {cache_file}")
return 0
if __name__ == "__main__":
sys.exit(main())