#!/usr/bin/env python3
"""
Oxigraph Sync Module - Sync custodian YAML files to Oxigraph triplestore.

This module syncs all custodian YAML files to Oxigraph as RDF triples for
SPARQL querying and graph-based retrieval.

Usage:
    python -m scripts.sync.oxigraph_sync [--dry-run] [--limit N] [--endpoint URL]
"""

import argparse
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import httpx
import yaml
from rdflib import Graph, Namespace, Literal, URIRef, BNode
from rdflib.namespace import RDF, RDFS, XSD, SKOS, DCTERMS, FOAF, OWL

# Add project root to path so `scripts.sync` resolves when run as a script.
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from scripts.sync import BaseSyncer, SyncResult, SyncStatus, DEFAULT_CUSTODIAN_DIR

# Configuration
OXIGRAPH_URL = os.getenv("OXIGRAPH_URL", "http://localhost:7878")
BATCH_SIZE = 500  # YAML files converted and uploaded per HTTP request

# Ontology namespaces
HC = Namespace("https://w3id.org/heritage/custodian/")
GHCID = Namespace("https://w3id.org/heritage/ghcid/")
NDE = Namespace("https://nde.nl/ontology/hc/class/")
ORG = Namespace("http://www.w3.org/ns/org#")
RICO = Namespace("https://www.ica.org/standards/RiC/ontology#")
SCHEMA = Namespace("http://schema.org/")
CIDOC = Namespace("http://www.cidoc-crm.org/cidoc-crm/")
PROV = Namespace("http://www.w3.org/ns/prov#")
WD = Namespace("http://www.wikidata.org/entity/")
WDT = Namespace("http://www.wikidata.org/prop/direct/")
GEO = Namespace("http://www.w3.org/2003/01/geo/wgs84_pos#")

# Characters not safe in a URI local name (quotes, brackets, reserved
# punctuation, space, newline). Compiled once — sanitize runs per file.
_URI_UNSAFE_RE = re.compile(r'[`\'"<>{}|\\^~\[\]@#$%&*()+=,;!? \n]')
_UNDERSCORE_RUN_RE = re.compile(r'_+')


def sanitize_ghcid_for_uri(ghcid: str) -> str:
    """Sanitize GHCID to be valid as a URI local name.

    Unsafe characters become underscores, runs of underscores collapse to
    one, and leading/trailing underscores are stripped.
    """
    sanitized = _URI_UNSAFE_RE.sub('_', ghcid)
    sanitized = _UNDERSCORE_RUN_RE.sub('_', sanitized)
    return sanitized.strip('_')


class CustodianRDFConverter:
    """Convert custodian YAML data to RDF triples."""

    def __init__(self) -> None:
        self.graph = Graph()
        self._bind_namespaces()

    def _bind_namespaces(self) -> None:
        """Bind all ontology namespaces to the graph (readable prefixes in output)."""
        bindings = {
            "hc": HC, "ghcid": GHCID, "nde": NDE, "org": ORG,
            "rico": RICO, "schema": SCHEMA, "cidoc": CIDOC, "prov": PROV,
            "wd": WD, "wdt": WDT, "geo": GEO, "dcterms": DCTERMS,
            "foaf": FOAF, "skos": SKOS, "owl": OWL,
        }
        for prefix, namespace in bindings.items():
            self.graph.bind(prefix, namespace)

    def reset(self) -> None:
        """Discard accumulated triples and start a fresh graph for a new batch."""
        self.graph = Graph()
        self._bind_namespaces()

    def add_custodian(self, data: dict[str, Any], filepath: Path) -> URIRef | None:
        """Convert custodian YAML data to RDF triples.

        Args:
            data: Parsed YAML document for one custodian.
            filepath: Source file; its stem is the fallback GHCID.

        Returns:
            The URI minted for this custodian.
        """
        ghcid_data = data.get("ghcid", {})
        ghcid = ghcid_data.get("ghcid_current", "")
        if not ghcid:
            ghcid = filepath.stem  # fall back to the filename

        custodian_uri = GHCID[sanitize_ghcid_for_uri(ghcid)]

        original = data.get("original_entry", {})
        wikidata = data.get("wikidata_enrichment", {})
        custodian_name = data.get("custodian_name", {})

        # Type declarations (multi-ontology typing)
        for rdf_class in (NDE.Custodian, CIDOC.E39_Actor, ORG.Organization,
                          SCHEMA.Organization, PROV.Entity):
            self.graph.add((custodian_uri, RDF.type, rdf_class))

        # Institution types (may be a single code or a list of codes)
        inst_types = original.get("type", [])
        if isinstance(inst_types, str):
            inst_types = [inst_types]
        for inst_type in inst_types:
            self._add_type_class(custodian_uri, inst_type)

        # Name: curated names first, then Wikidata labels, then the raw entry
        name = (
            custodian_name.get("emic_name")
            or custodian_name.get("claim_value")
            or wikidata.get("wikidata_label_nl")
            or wikidata.get("wikidata_label_en")
            or original.get("name")
            or original.get("organisatie", "")
        )
        if name:
            self.graph.add((custodian_uri, SKOS.prefLabel, Literal(name, lang="nl")))
            self.graph.add((custodian_uri, SCHEMA.name, Literal(name)))
            self.graph.add((custodian_uri, FOAF.name, Literal(name)))
            self.graph.add((custodian_uri, RDFS.label, Literal(name)))

        # Alternative names (all Wikidata labels except the preferred name)
        labels = wikidata.get("wikidata_labels", {})
        for lang, label in labels.items():
            if label and label != name:
                # Reduce e.g. "pt-br" / "zh_Hant" to the bare primary subtag
                lang_tag = lang.replace("-", "_").split("_")[0]
                self.graph.add((custodian_uri, SKOS.altLabel, Literal(label, lang=lang_tag)))

        # Description (English preferred, Dutch fallback)
        description = (
            wikidata.get("wikidata_description_en")
            or wikidata.get("wikidata_description_nl", "")
        )
        if description:
            # NOTE(review): tagged lang="en" even when the Dutch description
            # was the fallback — confirm this is intended.
            self.graph.add((custodian_uri, DCTERMS.description, Literal(description, lang="en")))
            self.graph.add((custodian_uri, SCHEMA.description, Literal(description)))

        # GHCID identifier
        if ghcid:
            self.graph.add((custodian_uri, HC.ghcid, Literal(ghcid)))
            self.graph.add((custodian_uri, DCTERMS.identifier, Literal(ghcid)))

        # GHCID UUID
        ghcid_uuid = ghcid_data.get("ghcid_uuid")
        if ghcid_uuid:
            self.graph.add((custodian_uri, HC.ghcidUUID, Literal(ghcid_uuid)))

        # Wikidata sameAs link
        wikidata_id = (
            wikidata.get("wikidata_entity_id")
            or original.get("wikidata_id", "")
        )
        if wikidata_id:
            wd_uri = WD[wikidata_id]
            self.graph.add((custodian_uri, OWL.sameAs, wd_uri))
            self.graph.add((custodian_uri, SCHEMA.sameAs, wd_uri))
            self.graph.add((custodian_uri, HC.wikidataId, Literal(wikidata_id)))

        # ISIL code
        isil = (
            wikidata.get("wikidata_identifiers", {}).get("isil")
            or original.get("isil-code_na", "")
        )
        if isil:
            self._add_identifier(custodian_uri, "ISIL", isil)

        # Other external identifiers (VIAF, GND, ...)
        wd_ids = wikidata.get("wikidata_identifiers", {})
        for id_type, id_value in wd_ids.items():
            if id_type != "isil" and id_value:
                self._add_identifier(custodian_uri, id_type.upper(), str(id_value))

        # Location — compare against None so 0.0 coordinates (equator /
        # prime meridian) are not silently dropped by falsy-value checks.
        coords = wikidata.get("wikidata_coordinates", {})
        if coords.get("latitude") is not None and coords.get("longitude") is not None:
            self._add_location(custodian_uri, coords, wikidata)

        # Official website(s)
        website = wikidata.get("wikidata_official_website")
        if website:
            websites = website if isinstance(website, list) else [website]
            for url in websites:
                if url and isinstance(url, str):
                    try:
                        self.graph.add((custodian_uri, SCHEMA.url, URIRef(url)))
                        self.graph.add((custodian_uri, FOAF.homepage, URIRef(url)))
                    except Exception:
                        pass  # best-effort: skip malformed URLs

        # Located in (administrative entity, Wikidata P131)
        located_in = wikidata.get("wikidata_located_in", {})
        if located_in.get("id"):
            loc_uri = WD[located_in["id"]]
            self.graph.add((custodian_uri, SCHEMA.containedInPlace, loc_uri))
            self.graph.add((custodian_uri, WDT.P131, loc_uri))

        # Country (Wikidata P17)
        country = wikidata.get("wikidata_country", {})
        if country.get("id"):
            country_uri = WD[country["id"]]
            self.graph.add((custodian_uri, SCHEMA.addressCountry, country_uri))
            self.graph.add((custodian_uri, WDT.P17, country_uri))

        # Instance of (Wikidata P31)
        for instance in wikidata.get("wikidata_instance_of", []):
            if instance.get("id"):
                self.graph.add((custodian_uri, WDT.P31, WD[instance["id"]]))

        # Processing provenance
        timestamp = data.get("processing_timestamp")
        if timestamp:
            self.graph.add((custodian_uri, PROV.generatedAtTime,
                            Literal(timestamp, datatype=XSD.dateTime)))

        return custodian_uri

    def _add_type_class(self, uri: URIRef, inst_type: str) -> None:
        """Add institution-type-specific RDF classes.

        Single-letter codes map to schema.org / domain-ontology classes;
        unknown codes still get the literal `hc:institutionType`.
        """
        type_mappings: dict[str, list[URIRef]] = {
            "M": [SCHEMA.Museum, CIDOC["E74_Group"]],
            "A": [SCHEMA.ArchiveOrganization, RICO.CorporateBody],
            "L": [SCHEMA.Library],
            "G": [SCHEMA.Museum],
            "R": [SCHEMA.ResearchOrganization],
            "H": [SCHEMA.PlaceOfWorship],
            "E": [SCHEMA.EducationalOrganization],
            "S": [ORG.Organization],
            "O": [ORG.Organization],
        }
        for rdf_class in type_mappings.get(inst_type, []):
            self.graph.add((uri, RDF.type, rdf_class))
        if inst_type:
            self.graph.add((uri, HC.institutionType, Literal(inst_type)))

    def _add_identifier(self, uri: URIRef, scheme: str, value: str) -> None:
        """Add an identifier as a blank node, plus a direct shortcut
        property for well-known schemes (ISIL / VIAF / GND)."""
        id_node = BNode()
        self.graph.add((id_node, RDF.type, CIDOC.E42_Identifier))
        self.graph.add((id_node, RICO.identifierType, Literal(scheme)))
        self.graph.add((id_node, RICO.textualValue, Literal(value)))
        self.graph.add((uri, CIDOC.P1_is_identified_by, id_node))

        shortcut = {"ISIL": HC.isil, "VIAF": HC.viaf, "GND": HC.gnd}.get(scheme)
        if shortcut is not None:
            self.graph.add((uri, shortcut, Literal(value)))

    def _add_location(self, uri: URIRef, coords: dict, wikidata: dict) -> None:
        """Add a location blank node with WGS84 / schema.org coordinates."""
        loc_node = BNode()
        self.graph.add((loc_node, RDF.type, SCHEMA.Place))
        self.graph.add((loc_node, RDF.type, GEO.SpatialThing))

        lat = coords.get("latitude")
        lon = coords.get("longitude")
        # None-check (not truthiness) so 0.0 is treated as a valid coordinate.
        if lat is not None and lon is not None:
            self.graph.add((loc_node, GEO.lat, Literal(lat, datatype=XSD.decimal)))
            self.graph.add((loc_node, GEO.long, Literal(lon, datatype=XSD.decimal)))
            self.graph.add((loc_node, SCHEMA.latitude, Literal(lat, datatype=XSD.decimal)))
            self.graph.add((loc_node, SCHEMA.longitude, Literal(lon, datatype=XSD.decimal)))

        located_in = wikidata.get("wikidata_located_in", {})
        city_label = located_in.get("label_en") or located_in.get("label_nl")
        if city_label:
            self.graph.add((loc_node, SCHEMA.addressLocality, Literal(city_label)))

        self.graph.add((uri, SCHEMA.location, loc_node))

    def serialize(self, format: str = "turtle") -> bytes:
        """Serialize the accumulated graph to bytes in the given RDF format."""
        return self.graph.serialize(format=format).encode("utf-8")

    def triple_count(self) -> int:
        """Get number of triples currently in the graph."""
        return len(self.graph)


class OxigraphSyncer(BaseSyncer):
    """Sync custodian YAML files to an Oxigraph triplestore over HTTP."""

    database_name = "oxigraph"

    def __init__(self, endpoint: str = OXIGRAPH_URL,
                 batch_size: int = BATCH_SIZE, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint.rstrip("/")
        self.batch_size = batch_size
        # Generous timeout: large Turtle uploads can take a while.
        self.client = httpx.Client(timeout=120.0)
        self.converter = CustodianRDFConverter()

    def close(self) -> None:
        """Release the HTTP connection pool (and base-class resources, if any)."""
        self.client.close()
        # Chain to a base-class close() only if one exists — BaseSyncer's
        # interface is not visible here.
        parent_close = getattr(super(), "close", None)
        if callable(parent_close):
            parent_close()

    def check_connection(self) -> bool:
        """Check if Oxigraph is available (answers a trivial ASK query)."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={"Content-Type": "application/sparql-query"},
                content="ASK { ?s ?p ?o }"
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            self.logger.error(f"Oxigraph connection failed: {e}")
            return False

    def get_status(self) -> dict:
        """Get Oxigraph status and total triple count."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={
                    "Content-Type": "application/sparql-query",
                    "Accept": "application/sparql-results+json"
                },
                content="SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }"
            )
            if resp.status_code == 200:
                data = resp.json()
                count = int(data["results"]["bindings"][0]["count"]["value"])
                return {"status": "healthy", "triple_count": count}
        except Exception as e:
            return {"status": "unavailable", "error": str(e)}
        return {"status": "unknown"}

    def clear_store(self) -> bool:
        """Clear all data from the store (SPARQL `CLEAR DEFAULT`)."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/update",
                headers={"Content-Type": "application/sparql-update"},
                content="CLEAR DEFAULT"
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            self.logger.error(f"Failed to clear store: {e}")
            return False

    def _upload_turtle(self, ttl_data: bytes) -> bool:
        """Upload Turtle data to Oxigraph's default graph (Graph Store protocol)."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/store?default",
                headers={"Content-Type": "text/turtle"},
                content=ttl_data
            )
            return resp.status_code in (200, 201, 204)
        except Exception as e:
            self.logger.error(f"Failed to upload data: {e}")
            return False

    def sync(self, limit: Optional[int] = None, dry_run: bool = False,
             clear: bool = True) -> SyncResult:
        """Sync all YAML files to Oxigraph.

        Args:
            limit: Process only the first N files (None = all).
            dry_run: Parse and convert but do not upload.
            clear: Clear the store before loading (default True).

        Returns:
            A SyncResult summarizing counts, status, and timing.
        """
        result = SyncResult(
            database="oxigraph",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Check connection before doing any work
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = f"Cannot connect to Oxigraph at {self.endpoint}"
            result.end_time = datetime.now(timezone.utc)
            return result

        # Load YAML file list
        yaml_files = self._list_yaml_files()
        if limit:
            yaml_files = yaml_files[:limit]

        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "oxigraph"
        self.logger.info(f"Processing {len(yaml_files)} YAML files...")

        # Clear existing data
        if not dry_run and clear:
            self.logger.info("Clearing existing data...")
            self.clear_store()

        # Process in batches: convert a batch to one graph, upload as Turtle
        total_triples = 0
        processed = 0
        failed = 0
        attempted = 0  # files seen so far, success or failure

        for i in range(0, len(yaml_files), self.batch_size):
            batch = yaml_files[i:i + self.batch_size]
            batch_num = i // self.batch_size + 1
            self.converter.reset()

            for filepath in batch:
                # Count every attempt so progress keeps advancing even
                # when files fail (tracking only successes would stall it).
                attempted += 1
                self.progress.processed_files = attempted
                self.progress.current_file = filepath.name
                self._report_progress()
                try:
                    with open(filepath, "r", encoding="utf-8") as f:
                        data = yaml.safe_load(f)
                    if data:
                        self.converter.add_custodian(data, filepath)
                    processed += 1
                except Exception as e:
                    self.logger.warning(f"Error processing {filepath.name}: {e}")
                    failed += 1
                    self.progress.errors.append(f"{filepath.name}: {str(e)}")

            batch_triples = self.converter.triple_count()
            total_triples += batch_triples

            if not dry_run:
                try:
                    ttl_data = self.converter.serialize("turtle")
                    if self._upload_turtle(ttl_data):
                        self.logger.info(f"Batch {batch_num}: loaded {len(batch)} files, {batch_triples:,} triples")
                    else:
                        self.logger.error(f"Batch {batch_num}: failed to upload")
                        failed += len(batch)
                except Exception as e:
                    self.logger.error(f"Batch {batch_num}: serialization failed: {e}")
                    failed += len(batch)
            else:
                self.logger.info(f"Batch {batch_num}: parsed {len(batch)} files, {batch_triples:,} triples (dry-run)")

        result.records_processed = len(yaml_files)
        result.records_succeeded = processed
        result.records_failed = failed
        result.details["total_triples"] = total_triples
        result.details["batch_size"] = self.batch_size
        result.details["dry_run"] = dry_run

        if failed == 0:
            result.status = SyncStatus.SUCCESS
        elif processed > 0:
            result.status = SyncStatus.PARTIAL
        else:
            result.status = SyncStatus.FAILED

        result.end_time = datetime.now(timezone.utc)
        return result


def main():
    """CLI entry point: parse args, run the sync, print a summary."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to Oxigraph")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--endpoint", default=OXIGRAPH_URL, help="Oxigraph SPARQL endpoint URL")
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Files per batch")
    parser.add_argument("--no-clear", action="store_true", help="Don't clear existing data")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    syncer = OxigraphSyncer(endpoint=args.endpoint, batch_size=args.batch_size)

    print("=" * 60)
    print("Oxigraph Sync")
    print("=" * 60)

    try:
        if not args.dry_run:
            print(f"Checking connection to {args.endpoint}...")
            status = syncer.get_status()
            print(f" Status: {status.get('status', 'unknown')}")
            # None-check so a legitimate count of 0 is still printed.
            if status.get('triple_count') is not None:
                print(f" Triple count: {status['triple_count']:,}")

        result = syncer.sync(limit=args.limit, dry_run=args.dry_run, clear=not args.no_clear)
    finally:
        # Always release the HTTP connection pool, even on error.
        syncer.close()

    print("\n" + "=" * 60)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f" Processed: {result.records_processed}")
    print(f" Succeeded: {result.records_succeeded}")
    print(f" Failed: {result.records_failed}")
    print(f" Triples: {result.details.get('total_triples', 0):,}")
    print(f" Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f" Error: {result.error_message}")
    print("=" * 60)

    # Non-zero exit code lets shell scripts / CI detect a failed sync.
    if result.status == SyncStatus.FAILED:
        sys.exit(1)


if __name__ == "__main__":
    main()