#!/usr/bin/env python3
|
|
"""
|
|
Enrich Czech custodian files using Sigla identifier matching against Wikidata.
|
|
|
|
Czech libraries have Sigla codes (e.g., "BEG501") which are stored in Wikidata
|
|
as property P9559. This script:
|
|
1. Fetches all Sigla→Wikidata mappings from Wikidata
|
|
2. Matches our CZ files by Sigla code
|
|
3. Enriches matched files with Wikidata metadata
|
|
|
|
Usage:
|
|
python scripts/enrich_czech_sigla.py [--dry-run] [--limit N]
|
|
"""
|
|
|
|
import argparse
|
|
import glob
|
|
import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import yaml
|
|
|
|
# Configure logging: mirror every message to a log file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('cz_sigla_enrichment.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Wikidata endpoints: SPARQL for the bulk Sigla query, REST API for
# per-entity detail lookups.
WIKIDATA_SPARQL_ENDPOINT = "https://query.wikidata.org/sparql"
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1/entities/items"

# Rate limiting
REQUEST_DELAY = 0.5  # seconds between requests
def fetch_all_sigla_mappings() -> dict[str, dict]:
    """
    Fetch every Czech Sigla → Wikidata QID mapping in a single SPARQL query.

    Returns:
        Dict keyed by Sigla code; each value carries qid, label,
        description, and (possibly empty) coordinates. Empty dict on
        any request failure.
    """
    logger.info("Fetching all Czech Sigla mappings from Wikidata...")

    # P9559 is the Sigla identifier property; P625 are coordinates.
    query = """
    SELECT ?item ?itemLabel ?sigla ?itemDescription ?coord WHERE {
      ?item wdt:P9559 ?sigla .
      OPTIONAL { ?item wdt:P625 ?coord . }
      SERVICE wikibase:label { bd:serviceParam wikibase:language "cs,en" }
    }
    """

    request_headers = {
        "Accept": "application/sparql-results+json",
        "User-Agent": "GLAM-Heritage-Custodian-Project/1.0 (https://github.com/heritage-custodian; contact@example.org) Python/httpx",
    }

    try:
        with httpx.Client(timeout=60.0) as client:
            resp = client.post(
                WIKIDATA_SPARQL_ENDPOINT,
                data={"query": query, "format": "json"},
                headers=request_headers,
            )
            resp.raise_for_status()
            payload = resp.json()
    except Exception as exc:
        logger.error(f"Failed to fetch Sigla mappings: {exc}")
        return {}

    sigla_index: dict[str, dict] = {}
    for row in payload.get("results", {}).get("bindings", []):
        code = row.get("sigla", {}).get("value", "")
        # The ?item binding is a full entity URI; the QID is its last segment.
        qid = row.get("item", {}).get("value", "").split("/")[-1]
        if not (code and qid):
            continue
        sigla_index[code] = {
            "qid": qid,
            "label": row.get("itemLabel", {}).get("value", ""),
            "description": row.get("itemDescription", {}).get("value", ""),
            "coordinates": row.get("coord", {}).get("value", ""),
        }

    logger.info(f"Fetched {len(sigla_index)} Sigla→Wikidata mappings")
    return sigla_index
def fetch_wikidata_details(qid: str) -> dict | None:
    """Fetch detailed entity data from the Wikidata REST API.

    Returns the parsed JSON document, or None when the entity is missing
    (HTTP 404) or the request fails for any other reason.
    """
    entity_url = f"{WIKIDATA_REST_API}/{qid}"
    request_headers = {
        "Accept": "application/json",
        "User-Agent": "GLAM-Heritage-Custodian-Project/1.0 (https://github.com/heritage-custodian; contact@example.org) Python/httpx",
    }

    try:
        with httpx.Client(timeout=30.0) as client:
            resp = client.get(entity_url, headers=request_headers)
            # A 404 simply means the entity does not exist — not an error.
            if resp.status_code == 404:
                return None
            resp.raise_for_status()
            return resp.json()
    except Exception as exc:
        logger.warning(f"Failed to fetch details for {qid}: {exc}")
        return None
def extract_sigla_from_file(filepath: Path) -> str | None:
    """Extract the Sigla identifier from a custodian YAML file.

    Searches ``original_entry.identifiers`` first, then the top-level
    ``identifiers`` list, returning the first value whose scheme is "Sigla".

    Args:
        filepath: Path to the custodian YAML file.

    Returns:
        The Sigla code, or None when absent or the file cannot be parsed.
    """
    try:
        with open(filepath) as f:
            data = yaml.safe_load(f)

        # Both locations a Sigla identifier may live, in priority order;
        # the duplicated scan loops are unified into one pass.
        candidate_lists = (
            data.get("original_entry", {}).get("identifiers", []),
            data.get("identifiers", []),
        )
        for identifiers in candidate_lists:
            for ident in identifiers:
                if ident.get("identifier_scheme") == "Sigla":
                    return ident.get("identifier_value")

    except Exception as e:
        # Best-effort: a malformed/unreadable file is logged, not fatal.
        logger.warning(f"Error reading {filepath}: {e}")

    return None
def is_already_enriched(filepath: Path) -> bool:
    """Check whether a custodian file already carries Wikidata enrichment.

    A file counts as enriched when it has a ``wikidata_enrichment`` section
    or a "Wikidata" entry in its top-level identifiers list.

    Args:
        filepath: Path to the custodian YAML file.

    Returns:
        True if already enriched; False otherwise (including on read
        errors, so unreadable files remain candidates and surface later).
    """
    try:
        with open(filepath) as f:
            data = yaml.safe_load(f)

        # An existing enrichment section is the primary marker.
        if data.get("wikidata_enrichment"):
            return True

        # A Wikidata identifier also counts as prior enrichment.
        for ident in data.get("identifiers", []):
            if ident.get("identifier_scheme") == "Wikidata":
                return True

    except Exception as e:
        # Was a bare `except: pass`, which silently swallowed everything
        # (even KeyboardInterrupt). Keep the best-effort semantics but
        # narrow the catch and log what happened.
        logger.warning(f"Error checking enrichment status of {filepath}: {e}")

    return False
def _parse_point(coord_str: str) -> dict | None:
    """Parse a WKT "Point(lon lat)" literal into {longitude, latitude}, or None."""
    if not coord_str.startswith("Point("):
        return None
    try:
        lon, lat = coord_str.replace("Point(", "").replace(")", "").split()
        return {"longitude": float(lon), "latitude": float(lat)}
    except ValueError:
        # Malformed literal — skip coordinates rather than fail enrichment.
        return None


def _first_statement_value(statements: dict, prop: str):
    """Return the first non-empty `value.content` for a Wikidata property, or None."""
    for stmt in statements.get(prop, []):
        val = stmt.get("value", {}).get("content")
        if val:
            return val
    return None


def enrich_file(filepath: Path, wikidata_info: dict, fetch_details: bool = True) -> bool:
    """
    Enrich a custodian file with Wikidata data.

    Args:
        filepath: Path to YAML file
        wikidata_info: Dict with qid, label, description from SPARQL
        fetch_details: Whether to fetch additional details via REST API

    Returns:
        True if file was enriched, False otherwise
    """
    try:
        with open(filepath) as f:
            data = yaml.safe_load(f)
    except Exception as e:
        logger.error(f"Error reading {filepath}: {e}")
        return False

    qid = wikidata_info["qid"]

    # Optionally fetch additional details (rate-limited to stay polite).
    details = None
    if fetch_details:
        time.sleep(REQUEST_DELAY)
        details = fetch_wikidata_details(qid)

    # Build the enrichment record from the SPARQL match.
    enrichment = {
        "wikidata_id": qid,
        "wikidata_url": f"https://www.wikidata.org/wiki/{qid}",
        "matched_by": "sigla_identifier",
        "matched_sigla": extract_sigla_from_file(filepath),
        "wikidata_label": wikidata_info.get("label", ""),
        "wikidata_description": wikidata_info.get("description", ""),
        "enrichment_date": datetime.now(timezone.utc).isoformat(),
        "enrichment_version": "2.1.0"
    }

    # Add coordinates if available (WKT "Point(lon lat)" format).
    coords = _parse_point(wikidata_info.get("coordinates", ""))
    if coords:
        enrichment["wikidata_coordinates"] = coords

    # Extract additional info from the REST API response.
    if details:
        statements = details.get("statements", {})

        # P856 - official website
        website = _first_statement_value(statements, "P856")
        if website:
            enrichment["official_website"] = website

        # P18 - image: build a Commons FilePath URL from the file name.
        image = _first_statement_value(statements, "P18")
        if image:
            enrichment["image"] = f"https://commons.wikimedia.org/wiki/Special:FilePath/{image.replace(' ', '_')}"

        # P31 - instance of (to get institution type); keep every value.
        instance_types = [
            stmt.get("value", {}).get("content")
            for stmt in statements.get("P31", [])
            if stmt.get("value", {}).get("content")
        ]
        if instance_types:
            enrichment["instance_of"] = instance_types

        # P571 - inception date (value content is a time dict, not a scalar).
        for stmt in statements.get("P571", []):
            val = stmt.get("value", {}).get("content", {})
            if isinstance(val, dict) and "time" in val:
                enrichment["inception"] = val["time"]
                break

        # P131 - located in administrative entity
        located_in = _first_statement_value(statements, "P131")
        if located_in:
            enrichment["located_in"] = located_in

    # Update the in-memory document.
    data["wikidata_enrichment"] = enrichment

    # Also add Wikidata identifier to identifiers list if not present.
    identifiers = data.get("identifiers", [])
    if not any(i.get("identifier_scheme") == "Wikidata" for i in identifiers):
        identifiers.append({
            "identifier_scheme": "Wikidata",
            "identifier_value": qid,
            "identifier_url": f"https://www.wikidata.org/wiki/{qid}"
        })
        data["identifiers"] = identifiers

    # Write back.
    try:
        with open(filepath, "w") as f:
            yaml.dump(data, f, allow_unicode=True, sort_keys=False, default_flow_style=False)
        return True
    except Exception as e:
        logger.error(f"Error writing {filepath}: {e}")
        return False
def main() -> int:
    """Run Sigla-based Wikidata enrichment over all Czech custodian files.

    Returns:
        Process exit code: 0 on success, 1 when the Sigla mappings could
        not be fetched from Wikidata.
    """
    parser = argparse.ArgumentParser(description="Enrich Czech custodian files using Sigla matching")
    parser.add_argument("--dry-run", action="store_true", help="Don't write changes, just report matches")
    parser.add_argument("--limit", type=int, default=0, help="Limit number of files to process (0=all)")
    parser.add_argument("--no-details", action="store_true", help="Skip fetching detailed entity data")
    # The data directory was previously hard-coded to one developer's
    # machine; expose it as an option with that path as the default so
    # existing invocations keep working.
    parser.add_argument(
        "--data-dir",
        default="/Users/kempersc/apps/glam/data/custodian",
        help="Directory containing CZ-*.yaml custodian files",
    )
    args = parser.parse_args()

    # Find all CZ files (sorted for a deterministic processing order).
    cz_files = sorted(glob.glob(str(Path(args.data_dir) / "CZ-*.yaml")))
    logger.info(f"Found {len(cz_files)} Czech custodian files")

    # Fetch all Sigla mappings up front; everything else is local matching.
    sigla_mappings = fetch_all_sigla_mappings()
    if not sigla_mappings:
        logger.error("Failed to fetch Sigla mappings, aborting")
        return 1

    # Track statistics
    stats = {
        "total_files": len(cz_files),
        "files_with_sigla": 0,
        "already_enriched": 0,
        "matches_found": 0,
        "files_enriched": 0,
        "no_match": 0,
        "errors": 0
    }

    processed = 0
    for filepath in cz_files:
        filepath = Path(filepath)

        # Check limit (counts only files that actually have a Sigla).
        if args.limit > 0 and processed >= args.limit:
            logger.info(f"Reached limit of {args.limit} files")
            break

        # Extract Sigla from file; files without one are skipped silently.
        sigla = extract_sigla_from_file(filepath)
        if not sigla:
            continue

        stats["files_with_sigla"] += 1
        processed += 1

        # Check if already enriched
        if is_already_enriched(filepath):
            stats["already_enriched"] += 1
            continue

        # Look up in Wikidata mappings
        if sigla not in sigla_mappings:
            stats["no_match"] += 1
            if processed % 500 == 0:
                logger.info(f"Processed {processed} files, {stats['matches_found']} matches so far")
            continue

        wikidata_info = sigla_mappings[sigla]
        stats["matches_found"] += 1

        logger.info(f"Match: {filepath.name} (Sigla: {sigla}) → {wikidata_info['qid']} ({wikidata_info['label']})")

        if args.dry_run:
            continue

        # Enrich the file
        if enrich_file(filepath, wikidata_info, fetch_details=not args.no_details):
            stats["files_enriched"] += 1
            # Progress update on every 50th successful enrichment only.
            # (Previously this check ran on every iteration, so a stalled
            # count — e.g. 0 with repeated errors — logged repeatedly.)
            if stats["files_enriched"] % 50 == 0:
                logger.info(f"Progress: {stats['files_enriched']} files enriched")
        else:
            stats["errors"] += 1

    # Final report
    logger.info("=" * 60)
    logger.info("Czech Sigla Enrichment Complete")
    logger.info("=" * 60)
    logger.info(f"Total CZ files: {stats['total_files']}")
    logger.info(f"Files with Sigla: {stats['files_with_sigla']}")
    logger.info(f"Already enriched: {stats['already_enriched']}")
    logger.info(f"Sigla matches found: {stats['matches_found']}")
    logger.info(f"Files enriched: {stats['files_enriched']}")
    logger.info(f"No Wikidata match: {stats['no_match']}")
    logger.info(f"Errors: {stats['errors']}")

    match_rate = (stats['matches_found'] / stats['files_with_sigla'] * 100) if stats['files_with_sigla'] > 0 else 0
    logger.info(f"Match rate: {match_rate:.1f}%")

    return 0
# Script entry point: propagate main()'s exit status to the shell.
if __name__ == "__main__":
    sys.exit(main())
|