# Source: glam/scripts/sync/oxigraph_sync.py (snapshot 2025-12-14)
#!/usr/bin/env python3
"""
Oxigraph Sync Module - Sync custodian YAML files to Oxigraph triplestore.
This module syncs all custodian YAML files to Oxigraph as RDF triples for
SPARQL querying and graph-based retrieval.
Usage:
python -m scripts.sync.oxigraph_sync [--dry-run] [--limit N] [--endpoint URL]
"""
import argparse
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import httpx
import yaml
from rdflib import Graph, Namespace, Literal, URIRef, BNode
from rdflib.namespace import RDF, RDFS, XSD, SKOS, DCTERMS, FOAF, OWL
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from scripts.sync import BaseSyncer, SyncResult, SyncStatus, DEFAULT_CUSTODIAN_DIR
# Configuration
OXIGRAPH_URL = os.getenv("OXIGRAPH_URL", "http://localhost:7878")
BATCH_SIZE = 500
# Namespaces
HC = Namespace("https://w3id.org/heritage/custodian/")
GHCID = Namespace("https://w3id.org/heritage/ghcid/")
NDE = Namespace("https://nde.nl/ontology/hc/class/")
ORG = Namespace("http://www.w3.org/ns/org#")
RICO = Namespace("https://www.ica.org/standards/RiC/ontology#")
SCHEMA = Namespace("http://schema.org/")
CIDOC = Namespace("http://www.cidoc-crm.org/cidoc-crm/")
PROV = Namespace("http://www.w3.org/ns/prov#")
WD = Namespace("http://www.wikidata.org/entity/")
WDT = Namespace("http://www.wikidata.org/prop/direct/")
GEO = Namespace("http://www.w3.org/2003/01/geo/wgs84_pos#")
def sanitize_ghcid_for_uri(ghcid: str) -> str:
    """Sanitize GHCID to be valid as a URI local name.

    Replaces every URI-hostile character with an underscore, collapses
    runs of underscores, and trims underscores from both ends.
    """
    cleaned = re.sub(r'[`\'\"<>{}|\\^~\[\]@#$%&*()+=,;!? ]', '_', ghcid)
    collapsed = re.sub(r'_+', '_', cleaned)
    return collapsed.strip('_')
class CustodianRDFConverter:
    """Convert custodian YAML records into RDF triples on an rdflib Graph.

    One converter instance accumulates triples for a batch of custodians;
    call reset() between batches and serialize() to obtain Turtle bytes.
    """

    def __init__(self):
        self.graph = Graph()
        self._bind_namespaces()

    def _bind_namespaces(self) -> None:
        """Bind all ontology prefixes so serialized output uses short names."""
        bindings = (
            ("hc", HC), ("ghcid", GHCID), ("nde", NDE), ("org", ORG),
            ("rico", RICO), ("schema", SCHEMA), ("cidoc", CIDOC),
            ("prov", PROV), ("wd", WD), ("wdt", WDT), ("geo", GEO),
            ("dcterms", DCTERMS), ("foaf", FOAF), ("skos", SKOS), ("owl", OWL),
        )
        for prefix, namespace in bindings:
            self.graph.bind(prefix, namespace)

    def reset(self) -> None:
        """Discard accumulated triples and start a fresh graph for a new batch."""
        self.graph = Graph()
        self._bind_namespaces()

    def add_custodian(self, data: dict[str, Any], filepath: Path) -> URIRef | None:
        """Convert one custodian YAML record to triples in the graph.

        Args:
            data: Parsed YAML document for a single custodian.
            filepath: Source file; its stem is the fallback GHCID when the
                record carries none.

        Returns:
            The URIRef minted for the custodian.
        """
        ghcid_data = data.get("ghcid", {})
        ghcid = ghcid_data.get("ghcid_current", "") or filepath.stem
        custodian_uri = GHCID[sanitize_ghcid_for_uri(ghcid)]

        original = data.get("original_entry", {})
        wikidata = data.get("wikidata_enrichment", {})
        custodian_name = data.get("custodian_name", {})

        # Type declarations across all supported ontologies.
        for rdf_class in (NDE.Custodian, CIDOC.E39_Actor, ORG.Organization,
                          SCHEMA.Organization, PROV.Entity):
            self.graph.add((custodian_uri, RDF.type, rdf_class))

        # Institution type codes (e.g. "M" museum, "A" archive); the YAML
        # may carry a single string or a list.
        inst_types = original.get("type", [])
        if isinstance(inst_types, str):
            inst_types = [inst_types]
        for inst_type in inst_types:
            self._add_type_class(custodian_uri, inst_type)

        # Preferred name: first non-empty source in priority order.
        name = (
            custodian_name.get("emic_name") or
            custodian_name.get("claim_value") or
            wikidata.get("wikidata_label_nl") or
            wikidata.get("wikidata_label_en") or
            original.get("name") or
            original.get("organisatie", "")
        )
        if name:
            # NOTE(review): prefLabel is always tagged "nl" even when the
            # name fell back to an English Wikidata label — confirm intended.
            self.graph.add((custodian_uri, SKOS.prefLabel, Literal(name, lang="nl")))
            self.graph.add((custodian_uri, SCHEMA.name, Literal(name)))
            self.graph.add((custodian_uri, FOAF.name, Literal(name)))
            self.graph.add((custodian_uri, RDFS.label, Literal(name)))

        # Alternative labels in other languages (skip the preferred name).
        for lang, label in wikidata.get("wikidata_labels", {}).items():
            if label and label != name:
                # Normalize regional tags like "en-gb" to a bare language code.
                lang_tag = lang.replace("-", "_").split("_")[0]
                self.graph.add((custodian_uri, SKOS.altLabel, Literal(label, lang=lang_tag)))

        # Description: prefer English, fall back to Dutch.  Tag the literal
        # with the language actually used (it was previously always "en",
        # even for the Dutch fallback).
        description = wikidata.get("wikidata_description_en")
        desc_lang = "en"
        if not description:
            description = wikidata.get("wikidata_description_nl", "")
            desc_lang = "nl"
        if description:
            self.graph.add((custodian_uri, DCTERMS.description, Literal(description, lang=desc_lang)))
            self.graph.add((custodian_uri, SCHEMA.description, Literal(description)))

        # GHCID identifier (always truthy here thanks to the stem fallback).
        if ghcid:
            self.graph.add((custodian_uri, HC.ghcid, Literal(ghcid)))
            self.graph.add((custodian_uri, DCTERMS.identifier, Literal(ghcid)))

        ghcid_uuid = ghcid_data.get("ghcid_uuid")
        if ghcid_uuid:
            self.graph.add((custodian_uri, HC.ghcidUUID, Literal(ghcid_uuid)))

        # Wikidata sameAs link.
        wikidata_id = (
            wikidata.get("wikidata_entity_id") or
            original.get("wikidata_id", "")
        )
        if wikidata_id:
            wd_uri = WD[wikidata_id]
            self.graph.add((custodian_uri, OWL.sameAs, wd_uri))
            self.graph.add((custodian_uri, SCHEMA.sameAs, wd_uri))
            self.graph.add((custodian_uri, HC.wikidataId, Literal(wikidata_id)))

        # ISIL code, from Wikidata or the original register entry.
        isil = (
            wikidata.get("wikidata_identifiers", {}).get("isil") or
            original.get("isil-code_na", "")
        )
        if isil:
            self._add_identifier(custodian_uri, "ISIL", isil)

        # Remaining Wikidata identifiers (VIAF, GND, ...).
        for id_type, id_value in wikidata.get("wikidata_identifiers", {}).items():
            if id_type != "isil" and id_value:
                self._add_identifier(custodian_uri, id_type.upper(), str(id_value))

        # Location. "is not None" (not truthiness) so a coordinate of
        # exactly 0.0 — equator / prime meridian — is not silently dropped.
        coords = wikidata.get("wikidata_coordinates", {})
        if coords.get("latitude") is not None and coords.get("longitude") is not None:
            self._add_location(custodian_uri, coords, wikidata)

        # Official website(s); best-effort, malformed URLs are skipped.
        website = wikidata.get("wikidata_official_website")
        if website:
            websites = website if isinstance(website, list) else [website]
            for url in websites:
                if url and isinstance(url, str):
                    try:
                        self.graph.add((custodian_uri, SCHEMA.url, URIRef(url)))
                        self.graph.add((custodian_uri, FOAF.homepage, URIRef(url)))
                    except Exception:
                        pass  # deliberately best-effort

        # Administrative containment (Wikidata P131).
        located_in = wikidata.get("wikidata_located_in", {})
        if located_in.get("id"):
            loc_uri = WD[located_in["id"]]
            self.graph.add((custodian_uri, SCHEMA.containedInPlace, loc_uri))
            self.graph.add((custodian_uri, WDT.P131, loc_uri))

        # Country (Wikidata P17).
        country = wikidata.get("wikidata_country", {})
        if country.get("id"):
            country_uri = WD[country["id"]]
            self.graph.add((custodian_uri, SCHEMA.addressCountry, country_uri))
            self.graph.add((custodian_uri, WDT.P17, country_uri))

        # Instance-of classes (Wikidata P31).
        for instance in wikidata.get("wikidata_instance_of", []):
            if instance.get("id"):
                self.graph.add((custodian_uri, WDT.P31, WD[instance["id"]]))

        # Processing provenance.
        timestamp = data.get("processing_timestamp")
        if timestamp:
            self.graph.add((custodian_uri, PROV.generatedAtTime,
                            Literal(timestamp, datatype=XSD.dateTime)))
        return custodian_uri

    def _add_type_class(self, uri: URIRef, inst_type: str) -> None:
        """Add institution-type-specific RDF classes plus the raw type code."""
        type_mappings: dict[str, list] = {
            "M": [SCHEMA.Museum, CIDOC["E74_Group"]],
            "A": [SCHEMA.ArchiveOrganization, RICO.CorporateBody],
            "L": [SCHEMA.Library],
            "G": [SCHEMA.Museum],
            "R": [SCHEMA.ResearchOrganization],
            "H": [SCHEMA.PlaceOfWorship],
            "E": [SCHEMA.EducationalOrganization],
            "S": [ORG.Organization],
            "O": [ORG.Organization],
        }
        for rdf_class in type_mappings.get(inst_type, []):
            self.graph.add((uri, RDF.type, rdf_class))
        if inst_type:
            self.graph.add((uri, HC.institutionType, Literal(inst_type)))

    def _add_identifier(self, uri: URIRef, scheme: str, value: str) -> None:
        """Attach an identifier (ISIL, VIAF, GND, ...) via a CIDOC E42 blank node."""
        id_node = BNode()
        self.graph.add((id_node, RDF.type, CIDOC.E42_Identifier))
        self.graph.add((id_node, RICO.identifierType, Literal(scheme)))
        self.graph.add((id_node, RICO.textualValue, Literal(value)))
        self.graph.add((uri, CIDOC.P1_is_identified_by, id_node))
        # Well-known schemes also get a direct, easily-queried property.
        direct_props = {"ISIL": HC.isil, "VIAF": HC.viaf, "GND": HC.gnd}
        prop = direct_props.get(scheme)
        if prop is not None:
            self.graph.add((uri, prop, Literal(value)))

    def _add_location(self, uri: URIRef, coords: dict, wikidata: dict) -> None:
        """Attach a Place blank node with WGS84 coordinates and locality label."""
        loc_node = BNode()
        self.graph.add((loc_node, RDF.type, SCHEMA.Place))
        self.graph.add((loc_node, RDF.type, GEO.SpatialThing))
        lat = coords.get("latitude")
        lon = coords.get("longitude")
        # "is not None" so a 0.0 coordinate is not dropped as falsy.
        if lat is not None and lon is not None:
            self.graph.add((loc_node, GEO.lat, Literal(lat, datatype=XSD.decimal)))
            self.graph.add((loc_node, GEO.long, Literal(lon, datatype=XSD.decimal)))
            self.graph.add((loc_node, SCHEMA.latitude, Literal(lat, datatype=XSD.decimal)))
            self.graph.add((loc_node, SCHEMA.longitude, Literal(lon, datatype=XSD.decimal)))
        located_in = wikidata.get("wikidata_located_in", {})
        city_label = located_in.get("label_en") or located_in.get("label_nl")
        if city_label:
            self.graph.add((loc_node, SCHEMA.addressLocality, Literal(city_label)))
        self.graph.add((uri, SCHEMA.location, loc_node))

    def serialize(self, format: str = "turtle") -> bytes:
        """Serialize the accumulated graph to bytes in the given rdflib format."""
        return self.graph.serialize(format=format).encode("utf-8")

    def triple_count(self) -> int:
        """Return the number of triples currently in the graph."""
        return len(self.graph)
class OxigraphSyncer(BaseSyncer):
    """Sync custodian YAML files to an Oxigraph triplestore.

    Files are parsed in batches, converted to Turtle via
    CustodianRDFConverter, and POSTed to Oxigraph's Graph Store endpoint.
    """

    database_name = "oxigraph"

    def __init__(self, endpoint: str = OXIGRAPH_URL, batch_size: int = BATCH_SIZE, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint.rstrip("/")
        self.batch_size = batch_size
        # Long timeout: bulk Turtle uploads of large batches can be slow.
        self.client = httpx.Client(timeout=120.0)
        self.converter = CustodianRDFConverter()

    def close(self) -> None:
        """Release the underlying HTTP connection pool.

        The httpx.Client was previously never closed; callers that create
        many syncers should call this when done.
        """
        self.client.close()

    def check_connection(self) -> bool:
        """Return True if Oxigraph answers a trivial ASK query."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={"Content-Type": "application/sparql-query"},
                content="ASK { ?s ?p ?o }"
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            self.logger.error(f"Oxigraph connection failed: {e}")
            return False

    def get_status(self) -> dict:
        """Return store health, plus the total triple count when reachable."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={
                    "Content-Type": "application/sparql-query",
                    "Accept": "application/sparql-results+json"
                },
                content="SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }"
            )
            if resp.status_code == 200:
                data = resp.json()
                count = int(data["results"]["bindings"][0]["count"]["value"])
                return {"status": "healthy", "triple_count": count}
        except Exception as e:
            return {"status": "unavailable", "error": str(e)}
        return {"status": "unknown"}

    def clear_store(self) -> bool:
        """Delete every triple in the default graph. Returns True on success."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/update",
                headers={"Content-Type": "application/sparql-update"},
                content="CLEAR DEFAULT"
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            self.logger.error(f"Failed to clear store: {e}")
            return False

    def _upload_turtle(self, ttl_data: bytes) -> bool:
        """POST Turtle bytes to the default graph. Returns True on success."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/store?default",
                headers={"Content-Type": "text/turtle"},
                content=ttl_data
            )
            return resp.status_code in (200, 201, 204)
        except Exception as e:
            self.logger.error(f"Failed to upload data: {e}")
            return False

    def sync(self, limit: Optional[int] = None, dry_run: bool = False, clear: bool = True) -> SyncResult:
        """Sync all YAML files to Oxigraph.

        Args:
            limit: Process at most this many files (None = all).
            dry_run: Parse and convert but do not clear or upload.
            clear: Clear existing store contents first (ignored on dry runs).

        Returns:
            SyncResult with per-run counts and details.
        """
        result = SyncResult(
            database="oxigraph",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Fail fast if the store is unreachable (skipped on dry runs).
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = f"Cannot connect to Oxigraph at {self.endpoint}"
            result.end_time = datetime.now(timezone.utc)
            return result

        yaml_files = self._list_yaml_files()
        if limit:
            yaml_files = yaml_files[:limit]
        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "oxigraph"
        self.logger.info(f"Processing {len(yaml_files)} YAML files...")

        if not dry_run and clear:
            self.logger.info("Clearing existing data...")
            self.clear_store()

        total_triples = 0
        processed = 0
        failed = 0
        for i in range(0, len(yaml_files), self.batch_size):
            batch = yaml_files[i:i + self.batch_size]
            batch_num = i // self.batch_size + 1
            self.converter.reset()
            batch_parsed = 0  # files successfully converted in this batch

            for filepath in batch:
                # Count every attempted file so the progress display no
                # longer stalls when a file fails to parse.
                self.progress.processed_files = processed + failed + 1
                self.progress.current_file = filepath.name
                self._report_progress()
                try:
                    with open(filepath, "r", encoding="utf-8") as f:
                        data = yaml.safe_load(f)
                    if data:
                        self.converter.add_custodian(data, filepath)
                    processed += 1
                    batch_parsed += 1
                except Exception as e:
                    self.logger.warning(f"Error processing {filepath.name}: {e}")
                    failed += 1
                    self.progress.errors.append(f"{filepath.name}: {str(e)}")

            batch_triples = self.converter.triple_count()
            total_triples += batch_triples

            if dry_run:
                self.logger.info(f"Batch {batch_num}: parsed {len(batch)} files, {batch_triples:,} triples (dry-run)")
                continue

            try:
                ttl_data = self.converter.serialize("turtle")
            except Exception as e:
                self.logger.error(f"Batch {batch_num}: serialization failed: {e}")
                # Move this batch's files from "succeeded" to "failed".
                # (Previously the whole batch was added to `failed` while
                # staying in `processed`, so the totals double-counted.)
                processed -= batch_parsed
                failed += batch_parsed
                continue

            if self._upload_turtle(ttl_data):
                self.logger.info(f"Batch {batch_num}: loaded {len(batch)} files, {batch_triples:,} triples")
            else:
                self.logger.error(f"Batch {batch_num}: failed to upload")
                processed -= batch_parsed
                failed += batch_parsed

        result.records_processed = len(yaml_files)
        result.records_succeeded = processed
        result.records_failed = failed
        result.details["total_triples"] = total_triples
        result.details["batch_size"] = self.batch_size
        result.details["dry_run"] = dry_run
        if failed == 0:
            result.status = SyncStatus.SUCCESS
        elif processed > 0:
            result.status = SyncStatus.PARTIAL
        else:
            result.status = SyncStatus.FAILED
        result.end_time = datetime.now(timezone.utc)
        return result
def main():
    """CLI entry point: parse arguments, run the sync, print a summary."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to Oxigraph")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--endpoint", default=OXIGRAPH_URL, help="Oxigraph SPARQL endpoint URL")
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Files per batch")
    parser.add_argument("--no-clear", action="store_true", help="Don't clear existing data")
    args = parser.parse_args()

    import logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    syncer = OxigraphSyncer(endpoint=args.endpoint, batch_size=args.batch_size)
    print("=" * 60)
    print("Oxigraph Sync")
    print("=" * 60)

    if not args.dry_run:
        print(f"Checking connection to {args.endpoint}...")
        status = syncer.get_status()
        print(f" Status: {status.get('status', 'unknown')}")
        # Membership test (not truthiness) so an empty store's legitimate
        # count of 0 is still reported.
        if 'triple_count' in status:
            print(f" Triple count: {status['triple_count']:,}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run, clear=not args.no_clear)

    print("\n" + "=" * 60)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f" Processed: {result.records_processed}")
    print(f" Succeeded: {result.records_succeeded}")
    print(f" Failed: {result.records_failed}")
    print(f" Triples: {result.details.get('total_triples', 0):,}")
    print(f" Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f" Error: {result.error_message}")
    print("=" * 60)


if __name__ == "__main__":
    main()