glam/scripts/enrich_czech_sigla.py
2025-12-21 00:01:54 +01:00

382 lines
13 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Enrich Czech custodian files using Sigla identifier matching against Wikidata.
Czech libraries have Sigla codes (e.g., "BEG501") which are stored in Wikidata
as property P9559. This script:
1. Fetches all Sigla→Wikidata mappings from Wikidata
2. Matches our CZ files by Sigla code
3. Enriches matched files with Wikidata metadata
Usage:
python scripts/enrich_czech_sigla.py [--dry-run] [--limit N] [--no-details]
"""
import argparse
import glob
import json
import logging
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import httpx
import yaml
# Configure logging: every record is written both to a log file (relative to
# the current working directory) and to the console stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: log messages contain non-ASCII text ("→" and Czech
        # labels) that a locale-dependent default encoding may fail to encode.
        logging.FileHandler('cz_sigla_enrichment.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Wikidata endpoints: SPARQL for the bulk Sigla lookup, REST for per-item details.
WIKIDATA_SPARQL_ENDPOINT = "https://query.wikidata.org/sparql"
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1/entities/items"

# Rate limiting
REQUEST_DELAY = 0.5  # seconds between requests
def fetch_all_sigla_mappings() -> dict[str, dict]:
    """
    Download every Czech Sigla → Wikidata item pairing with one SPARQL request.

    Returns:
        Dict keyed by Sigla code. Each value holds "qid", "label",
        "description" and "coordinates" (the raw P625 value, "" when the
        row has none). When a Sigla occurs in several result rows the last
        row wins. An empty dict is returned if the request or the JSON
        decoding fails.
    """
    logger.info("Fetching all Czech Sigla mappings from Wikidata...")

    query = """
SELECT ?item ?itemLabel ?sigla ?itemDescription ?coord WHERE {
?item wdt:P9559 ?sigla .
OPTIONAL { ?item wdt:P625 ?coord . }
SERVICE wikibase:label { bd:serviceParam wikibase:language "cs,en" }
}
"""
    request_headers = {
        "Accept": "application/sparql-results+json",
        "User-Agent": "GLAM-Heritage-Custodian-Project/1.0 (https://github.com/heritage-custodian; contact@example.org) Python/httpx"
    }

    try:
        with httpx.Client(timeout=60.0) as client:
            response = client.post(
                WIKIDATA_SPARQL_ENDPOINT,
                data={"query": query, "format": "json"},
                headers=request_headers,
            )
            response.raise_for_status()
            payload = response.json()
    except Exception as e:
        logger.error(f"Failed to fetch Sigla mappings: {e}")
        return {}

    def value_of(row: dict, name: str) -> str:
        # SPARQL JSON wraps each bound variable as {"value": ...}; unbound → "".
        return row.get(name, {}).get("value", "")

    mappings = {}
    rows = payload.get("results", {}).get("bindings", [])
    for row in rows:
        sigla = value_of(row, "sigla")
        # The item is returned as an entity URI; its last path segment is the QID.
        qid = value_of(row, "item").rsplit("/", 1)[-1]
        label = value_of(row, "itemLabel")
        description = value_of(row, "itemDescription")
        coord = value_of(row, "coord")
        if not (sigla and qid):
            continue
        mappings[sigla] = {
            "qid": qid,
            "label": label,
            "description": description,
            "coordinates": coord,
        }

    logger.info(f"Fetched {len(mappings)} Sigla→Wikidata mappings")
    return mappings
def fetch_wikidata_details(qid: str) -> dict | None:
    """
    Retrieve the item record for *qid* from the Wikidata REST API.

    Returns:
        The decoded JSON body, or None when the item does not exist
        (HTTP 404) or the request fails for any other reason (the failure
        is logged as a warning).
    """
    item_url = f"{WIKIDATA_REST_API}/{qid}"
    request_headers = {
        "Accept": "application/json",
        "User-Agent": "GLAM-Heritage-Custodian-Project/1.0 (https://github.com/heritage-custodian; contact@example.org) Python/httpx"
    }

    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.get(item_url, headers=request_headers)
            if response.status_code != 404:
                # Any other non-success status is turned into an exception.
                response.raise_for_status()
                return response.json()
            # Unknown item: not an error, there is simply nothing to add.
            return None
    except Exception as e:
        logger.warning(f"Failed to fetch details for {qid}: {e}")
        return None
def extract_sigla_from_file(filepath: Path) -> str | None:
    """
    Extract the Sigla identifier from a custodian YAML file.

    Looks first in ``original_entry.identifiers`` and then in the top-level
    ``identifiers`` list, and returns the ``identifier_value`` of the first
    entry whose ``identifier_scheme`` is "Sigla".

    Args:
        filepath: Path to the custodian YAML file.

    Returns:
        The Sigla value, or None when the file cannot be read or parsed
        (logged as a warning), is not a YAML mapping, or carries no Sigla
        identifier.
    """
    try:
        # Explicit UTF-8 so non-ASCII (Czech) text does not depend on the locale.
        with open(filepath, encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception as e:
        logger.warning(f"Error reading {filepath}: {e}")
        return None

    # An empty file loads as None; scalars/lists have no identifiers either.
    if not isinstance(data, dict):
        return None

    original_entry = data.get("original_entry")
    if not isinstance(original_entry, dict):
        # A missing or null original_entry must not block the top-level lookup.
        original_entry = {}

    for identifiers in (original_entry.get("identifiers"), data.get("identifiers")):
        if not isinstance(identifiers, list):
            continue
        for ident in identifiers:
            if isinstance(ident, dict) and ident.get("identifier_scheme") == "Sigla":
                return ident.get("identifier_value")
    return None
def is_already_enriched(filepath: Path) -> bool:
    """
    Check whether a custodian file already has Wikidata enrichment.

    A file counts as enriched when it has a non-empty ``wikidata_enrichment``
    section, or when its top-level ``identifiers`` list contains an entry
    whose ``identifier_scheme`` is "Wikidata".

    Args:
        filepath: Path to the custodian YAML file.

    Returns:
        True if enrichment is present. False otherwise, including when the
        file cannot be read or parsed or is not a YAML mapping.
    """
    try:
        # Explicit UTF-8 so non-ASCII (Czech) text does not depend on the locale.
        with open(filepath, encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Best-effort check: an unreadable file is treated as not enriched.
        # Unlike a bare ``except``, this lets KeyboardInterrupt/SystemExit through.
        logger.debug(f"Could not check enrichment status of {filepath}: {e}")
        return False

    if not isinstance(data, dict):
        return False

    if data.get("wikidata_enrichment"):
        return True

    identifiers = data.get("identifiers")
    if not isinstance(identifiers, list):
        return False
    return any(
        isinstance(ident, dict) and ident.get("identifier_scheme") == "Wikidata"
        for ident in identifiers
    )
def _parse_wkt_point(coord_str: str) -> dict | None:
    """Parse a "Point(lon lat)" literal into a longitude/latitude dict, or None."""
    if not coord_str.startswith("Point("):
        return None
    parts = coord_str.replace("Point(", "").replace(")", "").split()
    try:
        return {"longitude": float(parts[0]), "latitude": float(parts[1])}
    except (IndexError, ValueError):
        # Malformed literal: coordinates are optional, so just omit them.
        return None


def _statement_contents(statements: dict, prop: str) -> list:
    """Return the non-empty ``value.content`` entries of all statements for *prop*."""
    contents = []
    for stmt in statements.get(prop) or []:
        if not isinstance(stmt, dict):
            continue
        content = (stmt.get("value") or {}).get("content")
        if content:
            contents.append(content)
    return contents


def enrich_file(filepath: Path, wikidata_info: dict, fetch_details: bool = True) -> bool:
    """
    Enrich a custodian file with Wikidata data.

    Adds a ``wikidata_enrichment`` section to the YAML document and, if no
    entry with scheme "Wikidata" exists yet, appends a Wikidata entry to the
    top-level ``identifiers`` list. The file is rewritten in place.

    Args:
        filepath: Path to YAML file
        wikidata_info: Dict with qid, label, description (and optionally
            coordinates as a "Point(lon lat)" string) from SPARQL
        fetch_details: Whether to fetch additional details via REST API
            (website P856, image P18, instance-of P31, inception P571,
            located-in P131). Each lookup is preceded by a REQUEST_DELAY pause.

    Returns:
        True if file was enriched, False when it could not be read, is not a
        YAML mapping, or could not be written (each case is logged).

    Raises:
        KeyError: if ``wikidata_info`` has no "qid" key.
    """
    try:
        # Explicit UTF-8 so non-ASCII (Czech) text does not depend on the locale.
        with open(filepath, encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception as e:
        logger.error(f"Error reading {filepath}: {e}")
        return False

    # An empty or non-mapping document cannot take the enrichment keys; bail
    # out before spending a network request on it.
    if not isinstance(data, dict):
        logger.error(f"Error reading {filepath}: top-level YAML value is not a mapping")
        return False

    qid = wikidata_info["qid"]

    # Optionally fetch additional details
    details = None
    if fetch_details:
        time.sleep(REQUEST_DELAY)
        details = fetch_wikidata_details(qid)

    # Build enrichment data
    enrichment = {
        "wikidata_id": qid,
        "wikidata_url": f"https://www.wikidata.org/wiki/{qid}",
        "matched_by": "sigla_identifier",
        "matched_sigla": extract_sigla_from_file(filepath),
        "wikidata_label": wikidata_info.get("label", ""),
        "wikidata_description": wikidata_info.get("description", ""),
        "enrichment_date": datetime.now(timezone.utc).isoformat(),
        "enrichment_version": "2.1.0"
    }

    # Add coordinates if available and parseable
    coordinates = _parse_wkt_point(wikidata_info.get("coordinates") or "")
    if coordinates:
        enrichment["wikidata_coordinates"] = coordinates

    # Extract additional info from REST API response
    if details:
        statements = details.get("statements") or {}

        # P856 - official website (first non-empty value)
        websites = _statement_contents(statements, "P856")
        if websites:
            enrichment["official_website"] = websites[0]

        # P18 - image (first non-empty value, turned into a Commons file URL)
        images = _statement_contents(statements, "P18")
        if images:
            enrichment["image"] = f"https://commons.wikimedia.org/wiki/Special:FilePath/{images[0].replace(' ', '_')}"

        # P31 - instance of (to get institution type); keep every value
        instance_types = _statement_contents(statements, "P31")
        if instance_types:
            enrichment["instance_of"] = instance_types

        # P571 - inception date (first value that carries a "time" field)
        for content in _statement_contents(statements, "P571"):
            if isinstance(content, dict) and "time" in content:
                enrichment["inception"] = content["time"]
                break

        # P131 - located in administrative entity (first non-empty value)
        locations = _statement_contents(statements, "P131")
        if locations:
            enrichment["located_in"] = locations[0]

    # Update the file
    data["wikidata_enrichment"] = enrichment

    # Also add Wikidata identifier to identifiers list if not present
    identifiers = data.get("identifiers")
    if identifiers is None:
        identifiers = []
    if isinstance(identifiers, list):
        has_wikidata_id = any(
            isinstance(i, dict) and i.get("identifier_scheme") == "Wikidata"
            for i in identifiers
        )
        if not has_wikidata_id:
            identifiers.append({
                "identifier_scheme": "Wikidata",
                "identifier_value": qid,
                "identifier_url": f"https://www.wikidata.org/wiki/{qid}"
            })
        data["identifiers"] = identifiers
    else:
        # Leave an unexpected structure untouched rather than overwrite it.
        logger.warning(f"Unexpected identifiers structure in {filepath}; Wikidata identifier not added")

    # Write back. Serialize first so a failure while dumping cannot leave the
    # file truncated; UTF-8 is required because allow_unicode emits raw
    # non-ASCII characters.
    try:
        serialized = yaml.dump(data, allow_unicode=True, sort_keys=False, default_flow_style=False)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(serialized)
        return True
    except Exception as e:
        logger.error(f"Error writing {filepath}: {e}")
        return False
def main():
    """
    Command-line entry point: match Czech custodian files to Wikidata by Sigla.

    Options:
        --dry-run     Report matches without modifying any file.
        --limit N     Stop after N files that carry a Sigla code (0 = all).
        --no-details  Skip the per-item REST API lookup.
        --data-dir    Directory holding the CZ-*.yaml custodian files.

    Returns:
        Process exit code: 0 on completion, 1 when no Sigla mappings could
        be fetched from Wikidata.
    """
    parser = argparse.ArgumentParser(description="Enrich Czech custodian files using Sigla matching")
    parser.add_argument("--dry-run", action="store_true", help="Don't write changes, just report matches")
    parser.add_argument("--limit", type=int, default=0, help="Limit number of files to process (0=all)")
    parser.add_argument("--no-details", action="store_true", help="Skip fetching detailed entity data")
    # The default keeps the previously hard-coded location, so existing
    # invocations behave the same; other checkouts can point elsewhere.
    parser.add_argument(
        "--data-dir",
        default="/Users/kempersc/apps/glam/data/custodian",
        help="Directory containing the CZ-*.yaml custodian files",
    )
    args = parser.parse_args()

    # Find all CZ files
    cz_files = sorted(glob.glob(str(Path(args.data_dir) / "CZ-*.yaml")))
    logger.info(f"Found {len(cz_files)} Czech custodian files")

    # Fetch all Sigla mappings
    sigla_mappings = fetch_all_sigla_mappings()
    if not sigla_mappings:
        logger.error("Failed to fetch Sigla mappings, aborting")
        return 1

    # Track statistics
    stats = {
        "total_files": len(cz_files),
        "files_with_sigla": 0,
        "already_enriched": 0,
        "matches_found": 0,
        "files_enriched": 0,
        "no_match": 0,
        "errors": 0
    }

    # Counts only files that carry a Sigla code; this is what --limit bounds.
    processed = 0
    for file_name in cz_files:
        filepath = Path(file_name)

        # Check limit
        if args.limit > 0 and processed >= args.limit:
            logger.info(f"Reached limit of {args.limit} files")
            break

        # Extract Sigla from file
        sigla = extract_sigla_from_file(filepath)
        if not sigla:
            continue
        stats["files_with_sigla"] += 1
        processed += 1

        # Check if already enriched
        if is_already_enriched(filepath):
            stats["already_enriched"] += 1
            continue

        # Look up in Wikidata mappings
        if sigla not in sigla_mappings:
            stats["no_match"] += 1
            if processed % 500 == 0:
                logger.info(f"Processed {processed} files, {stats['matches_found']} matches so far")
            continue

        wikidata_info = sigla_mappings[sigla]
        stats["matches_found"] += 1
        logger.info(f"Match: {filepath.name} (Sigla: {sigla}) → {wikidata_info['qid']} ({wikidata_info['label']})")

        if args.dry_run:
            continue

        # Enrich the file
        if enrich_file(filepath, wikidata_info, fetch_details=not args.no_details):
            stats["files_enriched"] += 1
            # Progress update: only right after a successful enrichment, so a
            # failure does not re-log a stale count (or "0 files enriched").
            if stats["files_enriched"] % 50 == 0:
                logger.info(f"Progress: {stats['files_enriched']} files enriched")
        else:
            stats["errors"] += 1

    # Final report
    logger.info("=" * 60)
    logger.info("Czech Sigla Enrichment Complete")
    logger.info("=" * 60)
    logger.info(f"Total CZ files: {stats['total_files']}")
    logger.info(f"Files with Sigla: {stats['files_with_sigla']}")
    logger.info(f"Already enriched: {stats['already_enriched']}")
    logger.info(f"Sigla matches found: {stats['matches_found']}")
    logger.info(f"Files enriched: {stats['files_enriched']}")
    logger.info(f"No Wikidata match: {stats['no_match']}")
    logger.info(f"Errors: {stats['errors']}")
    match_rate = (stats['matches_found'] / stats['files_with_sigla'] * 100) if stats['files_with_sigla'] > 0 else 0
    logger.info(f"Match rate: {match_rate:.1f}%")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())