#!/usr/bin/env python3
"""
Oxigraph Sync Module - Sync custodian YAML files to Oxigraph triplestore.

This module syncs all custodian YAML files to Oxigraph as RDF triples for
SPARQL querying and graph-based retrieval.

Usage:
    python -m scripts.sync.oxigraph_sync [--dry-run] [--limit N] [--endpoint URL]
"""

import argparse
import logging
import math
import os
import re
import sys
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Any, Optional

import httpx
import yaml
from rdflib import Graph, Namespace, Literal, URIRef, BNode
from rdflib.namespace import RDF, RDFS, XSD, SKOS, DCTERMS, FOAF, OWL

# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from scripts.sync import BaseSyncer, SyncResult, SyncStatus, DEFAULT_CUSTODIAN_DIR

logger = logging.getLogger(__name__)

# Configuration
OXIGRAPH_URL = os.getenv("OXIGRAPH_URL", "http://localhost:7878")
BATCH_SIZE = 500

# Namespaces - Using nde.nl as canonical namespace (per LinkML schema)
HC = Namespace("https://nde.nl/ontology/hc/")
GHCID = Namespace("https://nde.nl/ontology/hc/")
NDE = Namespace("https://nde.nl/ontology/hc/class/")
ORG = Namespace("http://www.w3.org/ns/org#")
RICO = Namespace("https://www.ica.org/standards/RiC/ontology#")
SCHEMA = Namespace("http://schema.org/")
CIDOC = Namespace("http://www.cidoc-crm.org/cidoc-crm/")
PROV = Namespace("http://www.w3.org/ns/prov#")
WD = Namespace("http://www.wikidata.org/entity/")
WDT = Namespace("http://www.wikidata.org/prop/direct/")
GEO = Namespace("http://www.w3.org/2003/01/geo/wgs84_pos#")

# Full institution-type name -> single-letter type code.
INSTITUTION_TYPE_CODES: dict[str, str] = {
    "GALLERY": "G",
    "LIBRARY": "L",
    "ARCHIVE": "A",
    "MUSEUM": "M",
    "OFFICIAL_INSTITUTION": "O",
    "RESEARCH_CENTER": "R",
    "CORPORATION": "C",
    "UNKNOWN": "U",
    "BOTANICAL_ZOO": "B",
    "EDUCATION_PROVIDER": "E",
    "COLLECTING_SOCIETY": "S",
    "FEATURES": "F",
    "INTANGIBLE_HERITAGE_GROUP": "I",
    "MIXED": "X",
    "PERSONAL_COLLECTION": "P",
    "HOLY_SITES": "H",
    "DIGITAL_PLATFORM": "D",
    "NGO": "N",
    "TASTE_SMELL": "T",
}

# The single-letter codes accepted as the 4th GHCID component.
VALID_TYPE_CODES = frozenset(INSTITUTION_TYPE_CODES.values())

# Extra RDF classes asserted per institution type code.
TYPE_CLASSES: dict[str, tuple[URIRef, ...]] = {
    "M": (SCHEMA.Museum, CIDOC["E74_Group"]),
    "A": (SCHEMA.ArchiveOrganization, RICO.CorporateBody),
    "L": (SCHEMA.Library,),
    "G": (SCHEMA.Museum,),
    "R": (SCHEMA.ResearchOrganization,),
    "H": (SCHEMA.PlaceOfWorship,),
    "E": (SCHEMA.EducationalOrganization,),
    "S": (ORG.Organization,),
    "O": (ORG.Organization,),
}

# Identifier schemes that additionally get a direct hc: shortcut property.
IDENTIFIER_SHORTCUTS: dict[str, URIRef] = {
    "ISIL": HC.isil,
    "VIAF": HC.viaf,
    "GND": HC.gnd,
}

# Characters replaced by "_" when a GHCID is used as a URI local name.
_UNSAFE_LOCAL_NAME_RE = re.compile(r"[`'\"<>{}|\\^~\[\]@#$%&*()+=,;!?\s]")
_UNDERSCORE_RUN_RE = re.compile(r"_+")

# Characters that must not appear in an IRI; rdflib warns about such IRIs and
# a single one can make serialization of a whole batch fail.
_IRI_FORBIDDEN_RE = re.compile(r'[\x00-\x20<>"{}|\\^`]')

# Leading "[sign]YYYY-MM-DD" of a date or timestamp string.
_DATE_PREFIX_RE = re.compile(r"^([+-]?)(\d{4,})-(\d{2})-(\d{2})")


def sanitize_ghcid_for_uri(ghcid: str) -> str:
    """Sanitize GHCID to be valid as a URI local name.

    Unsafe punctuation and any whitespace are replaced by ``_``, runs of
    underscores are collapsed, and leading/trailing underscores are removed.
    """
    sanitized = _UNSAFE_LOCAL_NAME_RE.sub("_", ghcid)
    sanitized = _UNDERSCORE_RUN_RE.sub("_", sanitized)
    return sanitized.strip("_")


def _as_dict(value: Any) -> dict:
    """Return *value* if it is a dict, else an empty dict.

    YAML sections that are present but null (``key:``) load as ``None``, which
    ``dict.get(key, {})`` does not replace with the default.
    """
    return value if isinstance(value, dict) else {}


def _is_safe_iri(value: Any) -> bool:
    """Return True if *value* is a non-empty string without IRI-illegal characters."""
    return isinstance(value, str) and bool(value) and not _IRI_FORBIDDEN_RE.search(value)


def _is_coordinate(value: Any) -> bool:
    """Return True if *value* is a finite number (``0`` is a valid coordinate)."""
    if value is None or isinstance(value, bool):
        return False
    try:
        return math.isfinite(float(value))
    except (TypeError, ValueError):
        return False


def _normalize_founding_date(raw: Any) -> tuple[Optional[str], Optional[int]]:
    """Normalize a raw founding date to an ``xsd:date`` lexical form and a year.

    Accepts values such as ``"1638-01-01T00:00:00Z"``, ``"+1800-01-01T00:00:00Z"``,
    ``"1638-00-00"`` (unknown month/day become ``01``) and BCE dates with a
    leading ``-``. ``date``/``datetime`` objects (which ``yaml.safe_load`` can
    produce for unquoted timestamps) are handled through ``str()``.

    Returns:
        ``(lexical_date, year)``; the year is negative for BCE dates. Returns
        ``(None, None)`` when the value is missing, malformed, has year 0, or
        has an impossible month/day.
    """
    if raw is None:
        return None, None
    match = _DATE_PREFIX_RE.match(str(raw).strip())
    if not match:
        return None, None

    sign, year_text, month_text, day_text = match.groups()
    year = int(year_text)
    if year == 0:
        return None, None
    # "00" means "unknown" in Wikidata-style dates; fall back to January / 1st.
    month = int(month_text) or 1
    day = int(day_text) or 1
    is_bce = sign == "-"

    if is_bce or year > 9999:
        # Outside the range datetime.date can represent: plain range check.
        if not (1 <= month <= 12 and 1 <= day <= 31):
            return None, None
    else:
        try:
            date(year, month, day)
        except ValueError:
            return None, None

    lexical = f"{'-' if is_bce else ''}{year_text}-{month:02d}-{day:02d}"
    return lexical, (-year if is_bce else year)


def _wikidata_uri(entity_id: Any) -> Optional[URIRef]:
    """Return the Wikidata entity URI for *entity_id*, or None if it is unusable."""
    return WD[entity_id] if _is_safe_iri(entity_id) else None


class CustodianRDFConverter:
    """Convert custodian YAML data to RDF triples."""

    def __init__(self):
        self.graph = Graph()
        self._bind_namespaces()

    def _bind_namespaces(self) -> None:
        """Bind all ontology namespaces to the graph."""
        prefixes = (
            ("hc", HC),
            ("ghcid", GHCID),
            ("nde", NDE),
            ("org", ORG),
            ("rico", RICO),
            ("schema", SCHEMA),
            ("cidoc", CIDOC),
            ("prov", PROV),
            ("wd", WD),
            ("wdt", WDT),
            ("geo", GEO),
            ("dcterms", DCTERMS),
            ("foaf", FOAF),
            ("skos", SKOS),
            ("owl", OWL),
        )
        for prefix, namespace in prefixes:
            self.graph.bind(prefix, namespace)

    def reset(self) -> None:
        """Reset the graph for new batch."""
        self.graph = Graph()
        self._bind_namespaces()

    def add_custodian(self, data: dict[str, Any], filepath: Path) -> URIRef | None:
        """Convert custodian YAML data to RDF triples.

        Args:
            data: Parsed YAML mapping for one custodian.
            filepath: Source file; its stem is the GHCID fallback when the
                data carries no ``ghcid.ghcid_current``.

        Returns:
            The URI of the custodian that was added to the graph.

        Raises:
            TypeError: If *data* is not a mapping.
        """
        if not isinstance(data, dict):
            raise TypeError(
                f"Expected a mapping at the top level of {filepath.name}, "
                f"got {type(data).__name__}"
            )

        ghcid_data = _as_dict(data.get("ghcid"))
        ghcid = ghcid_data.get("ghcid_current") or filepath.stem
        custodian_uri = GHCID[sanitize_ghcid_for_uri(ghcid)]

        original = _as_dict(data.get("original_entry"))
        wikidata = _as_dict(data.get("wikidata_enrichment"))
        custodian_name = _as_dict(data.get("custodian_name"))

        # Type declarations
        for rdf_class in (
            NDE.Custodian,
            CIDOC.E39_Actor,
            ORG.Organization,
            SCHEMA.Organization,
            PROV.Entity,
        ):
            self.graph.add((custodian_uri, RDF.type, rdf_class))

        for inst_type in self._resolve_institution_types(original, ghcid):
            self._add_type_class(custodian_uri, inst_type)

        self._add_names(custodian_uri, custodian_name, wikidata, original)
        self._add_description(custodian_uri, wikidata)

        # GHCID identifier
        if ghcid:
            self.graph.add((custodian_uri, HC.ghcid, Literal(ghcid)))
            self.graph.add((custodian_uri, DCTERMS.identifier, Literal(ghcid)))

        # GHCID UUID
        ghcid_uuid = ghcid_data.get("ghcid_uuid")
        if ghcid_uuid:
            self.graph.add((custodian_uri, HC.ghcidUUID, Literal(ghcid_uuid)))

        self._add_external_identifiers(custodian_uri, wikidata, original)

        # Location - includes settlement from GHCID location_resolution
        coords = _as_dict(wikidata.get("wikidata_coordinates"))
        location_resolution = _as_dict(ghcid_data.get("location_resolution"))
        # Test for presence, not truthiness: 0 is a valid latitude/longitude.
        if _is_coordinate(coords.get("latitude")) and _is_coordinate(coords.get("longitude")):
            self._add_location(custodian_uri, coords, wikidata, location_resolution)
        elif location_resolution:
            # Add settlement data even without coordinates
            self._add_settlement_only(custodian_uri, location_resolution)

        # ISO country and subregion codes from GHCID
        self._add_geospatial_codes(custodian_uri, ghcid, location_resolution)

        self._add_websites(custodian_uri, wikidata.get("wikidata_official_website"))
        self._add_wikidata_relations(custodian_uri, wikidata)
        self._add_founding(custodian_uri, data, wikidata)

        # Processing provenance
        timestamp = data.get("processing_timestamp")
        if timestamp:
            self.graph.add(
                (custodian_uri, PROV.generatedAtTime, Literal(timestamp, datatype=XSD.dateTime))
            )

        return custodian_uri

    @staticmethod
    def _resolve_institution_types(original: dict, ghcid: str) -> list[str]:
        """Determine institution type codes, trying three sources in order.

        1. ``original_entry.type`` (a string or a list of codes),
        2. ``original_entry.institution_type`` (a full name mapped to its code),
        3. the 4th component of the GHCID
           (``{country}-{region}-{city}-{type}-{abbrev}``).
        """
        declared = original.get("type") or []
        if not isinstance(declared, (list, tuple)):
            declared = [declared]
        declared = [str(item) for item in declared if item]
        if declared:
            return declared

        # institution_type field (common in CH-Annotator files)
        full_name = original.get("institution_type", "")
        if full_name:
            full_name = str(full_name)
            return [INSTITUTION_TYPE_CODES.get(full_name.upper(), full_name)]

        parts = ghcid.split("-") if ghcid else []
        if len(parts) >= 4 and parts[3].upper() in VALID_TYPE_CODES:
            return [parts[3].upper()]
        return []

    def _add_names(
        self, custodian_uri: URIRef, custodian_name: dict, wikidata: dict, original: dict
    ) -> None:
        """Add the preferred name and the alternative Wikidata labels."""
        name = (
            custodian_name.get("emic_name")
            or custodian_name.get("claim_value")
            or wikidata.get("wikidata_label_nl")
            or wikidata.get("wikidata_label_en")
            or original.get("name")
            or original.get("organisatie", "")
        )
        if name:
            # NOTE(review): prefLabel is always tagged "nl", even when the name
            # came from a non-Dutch source - confirm whether that is intended.
            self.graph.add((custodian_uri, SKOS.prefLabel, Literal(name, lang="nl")))
            self.graph.add((custodian_uri, SCHEMA.name, Literal(name)))
            self.graph.add((custodian_uri, FOAF.name, Literal(name)))
            self.graph.add((custodian_uri, RDFS.label, Literal(name)))

        # Alternative names
        labels = _as_dict(wikidata.get("wikidata_labels"))
        for lang, label in labels.items():
            if not label or label == name:
                continue
            # YAML 1.1 loads some unquoted keys (e.g. "no") as booleans.
            if not isinstance(lang, str):
                continue
            # Keep only the primary subtag: "zh-hans" / "pt_BR" -> "zh" / "pt".
            lang_tag = lang.replace("-", "_").split("_")[0]
            if not (lang_tag.isascii() and lang_tag.isalpha()):
                continue
            self.graph.add((custodian_uri, SKOS.altLabel, Literal(label, lang=lang_tag)))

    def _add_description(self, custodian_uri: URIRef, wikidata: dict) -> None:
        """Add the Wikidata description, tagged with the language actually used."""
        description = wikidata.get("wikidata_description_en")
        lang = "en"
        if not description:
            description = wikidata.get("wikidata_description_nl", "")
            lang = "nl"
        if description:
            self.graph.add((custodian_uri, DCTERMS.description, Literal(description, lang=lang)))
            self.graph.add((custodian_uri, SCHEMA.description, Literal(description)))

    def _add_external_identifiers(
        self, custodian_uri: URIRef, wikidata: dict, original: dict
    ) -> None:
        """Add the Wikidata sameAs link, the ISIL code and other Wikidata identifiers."""
        # Wikidata sameAs link
        wikidata_id = wikidata.get("wikidata_entity_id") or original.get("wikidata_id", "")
        if wikidata_id:
            wd_uri = _wikidata_uri(wikidata_id)
            if wd_uri is not None:
                self.graph.add((custodian_uri, OWL.sameAs, wd_uri))
                self.graph.add((custodian_uri, SCHEMA.sameAs, wd_uri))
            else:
                logger.warning("Skipping malformed Wikidata id %r", wikidata_id)
            self.graph.add((custodian_uri, HC.wikidataId, Literal(wikidata_id)))

        wd_ids = _as_dict(wikidata.get("wikidata_identifiers"))

        # ISIL code
        isil = wd_ids.get("isil") or original.get("isil-code_na", "")
        if isil:
            self._add_identifier(custodian_uri, "ISIL", isil)

        # Other identifiers
        for id_type, id_value in wd_ids.items():
            if id_type != "isil" and id_value:
                self._add_identifier(custodian_uri, str(id_type).upper(), str(id_value))

    def _add_websites(self, custodian_uri: URIRef, website: Any) -> None:
        """Add official website URL(s), skipping values that are not valid IRIs."""
        if not website:
            return
        websites = website if isinstance(website, list) else [website]
        for url in websites:
            if not url or not isinstance(url, str):
                continue
            url = url.strip()
            if not _is_safe_iri(url):
                # One illegal IRI would otherwise break serialization of the
                # whole batch, so drop just this value.
                logger.warning("Skipping malformed website URL %r", url)
                continue
            url_ref = URIRef(url)
            self.graph.add((custodian_uri, SCHEMA.url, url_ref))
            self.graph.add((custodian_uri, FOAF.homepage, url_ref))

    def _add_wikidata_relations(self, custodian_uri: URIRef, wikidata: dict) -> None:
        """Add located-in (P131), country (P17) and instance-of (P31) links."""
        # Located in (administrative)
        loc_uri = _wikidata_uri(_as_dict(wikidata.get("wikidata_located_in")).get("id"))
        if loc_uri is not None:
            self.graph.add((custodian_uri, SCHEMA.containedInPlace, loc_uri))
            self.graph.add((custodian_uri, WDT.P131, loc_uri))

        # Country
        country_uri = _wikidata_uri(_as_dict(wikidata.get("wikidata_country")).get("id"))
        if country_uri is not None:
            self.graph.add((custodian_uri, SCHEMA.addressCountry, country_uri))
            self.graph.add((custodian_uri, WDT.P17, country_uri))

        # Instance of
        for instance in wikidata.get("wikidata_instance_of") or []:
            inst_uri = _wikidata_uri(_as_dict(instance).get("id"))
            if inst_uri is not None:
                self.graph.add((custodian_uri, WDT.P31, inst_uri))

    def _add_founding(self, custodian_uri: URIRef, data: dict, wikidata: dict) -> None:
        """Add founding date and year from the first usable source.

        Sources in order of preference: ``timespan.begin_of_the_begin``,
        ``wikidata_inception`` (P571), ``wikidata_founded``.
        """
        timespan = _as_dict(data.get("timespan"))
        candidates = (
            timespan.get("begin_of_the_begin"),
            wikidata.get("wikidata_inception"),
            wikidata.get("wikidata_founded"),
        )
        founding_date: Optional[str] = None
        founding_year: Optional[int] = None
        for candidate in candidates:
            founding_date, founding_year = _normalize_founding_date(candidate)
            if founding_date is not None:
                break
        if founding_date is None:
            return

        date_literal = Literal(founding_date, datatype=XSD.date)
        self.graph.add((custodian_uri, SCHEMA.foundingDate, date_literal))
        # Also add as Wikidata inception property for compatibility
        self.graph.add((custodian_uri, WDT.P571, date_literal))

        # Founding year (for easier range queries); negative for BCE dates.
        if founding_year is not None:
            self.graph.add(
                (custodian_uri, HC.foundingYear, Literal(founding_year, datatype=XSD.integer))
            )

    def _add_type_class(self, uri: URIRef, inst_type: str) -> None:
        """Add institution-type-specific RDF classes.

        Unknown type codes get no extra class, but every non-empty code is
        still recorded as an ``hc:institutionType`` literal.
        """
        for rdf_class in TYPE_CLASSES.get(inst_type, ()):
            self.graph.add((uri, RDF.type, rdf_class))

        if inst_type:
            self.graph.add((uri, HC.institutionType, Literal(inst_type)))

    def _add_identifier(self, uri: URIRef, scheme: str, value: str) -> None:
        """Add an identifier as a blank node.

        ISIL, VIAF and GND identifiers additionally get a direct ``hc:``
        shortcut property on the custodian.
        """
        id_node = BNode()
        self.graph.add((id_node, RDF.type, CIDOC.E42_Identifier))
        self.graph.add((id_node, RICO.identifierType, Literal(scheme)))
        self.graph.add((id_node, RICO.textualValue, Literal(value)))
        self.graph.add((uri, CIDOC.P1_is_identified_by, id_node))

        shortcut = IDENTIFIER_SHORTCUTS.get(scheme)
        if shortcut is not None:
            self.graph.add((uri, shortcut, Literal(value)))

    def _add_location(
        self,
        uri: URIRef,
        coords: dict,
        wikidata: dict,
        location_resolution: Optional[dict] = None,
    ) -> None:
        """Add location with coordinates and settlement data.

        Args:
            uri: The custodian URI
            coords: Coordinate dict with latitude/longitude
            wikidata: Wikidata enrichment data
            location_resolution: GHCID location resolution with GeoNames data
        """
        loc_node = BNode()
        self.graph.add((loc_node, RDF.type, SCHEMA.Place))
        self.graph.add((loc_node, RDF.type, GEO.SpatialThing))

        lat = coords.get("latitude")
        lon = coords.get("longitude")
        # Presence check rather than truthiness so that 0.0 is kept.
        if _is_coordinate(lat) and _is_coordinate(lon):
            lat_literal = Literal(lat, datatype=XSD.decimal)
            lon_literal = Literal(lon, datatype=XSD.decimal)
            self.graph.add((loc_node, GEO.lat, lat_literal))
            self.graph.add((loc_node, GEO.long, lon_literal))
            self.graph.add((loc_node, SCHEMA.latitude, lat_literal))
            self.graph.add((loc_node, SCHEMA.longitude, lon_literal))

        if location_resolution:
            # Settlement from location_resolution (preferred - has GeoNames ID)
            self._add_settlement_only(uri, location_resolution)
            geonames_id = location_resolution.get("geonames_id")
            if geonames_id:
                settlement_uri = URIRef(f"https://sws.geonames.org/{geonames_id}/")
                self.graph.add((loc_node, SCHEMA.containedInPlace, settlement_uri))
            settlement_name = (
                location_resolution.get("geonames_name")
                or location_resolution.get("google_maps_locality")
            )
            if settlement_name:
                self.graph.add((loc_node, SCHEMA.addressLocality, Literal(settlement_name)))
        else:
            # Fallback to wikidata_located_in
            located_in = _as_dict(wikidata.get("wikidata_located_in"))
            city_label = located_in.get("label_en") or located_in.get("label_nl")
            if city_label:
                self.graph.add((loc_node, SCHEMA.addressLocality, Literal(city_label)))
                self.graph.add((uri, HC.settlementName, Literal(city_label)))

        self.graph.add((uri, SCHEMA.location, loc_node))

    def _add_settlement_only(self, uri: URIRef, location_resolution: dict) -> None:
        """Add settlement data without coordinates.

        Used when we have GHCID location_resolution but no precise coordinates.
        Adds the GeoNames settlement URI, the GeoNames id as an integer literal
        and the settlement name, each only when present.
        """
        geonames_id = location_resolution.get("geonames_id")
        settlement_name = (
            location_resolution.get("geonames_name")
            or location_resolution.get("google_maps_locality")
        )

        if geonames_id:
            settlement_uri = URIRef(f"https://sws.geonames.org/{geonames_id}/")
            self.graph.add((uri, HC.settlement, settlement_uri))
            self.graph.add((uri, HC.geonamesId, Literal(geonames_id, datatype=XSD.integer)))

        if settlement_name:
            self.graph.add((uri, HC.settlementName, Literal(settlement_name)))

    def _add_geospatial_codes(
        self, uri: URIRef, ghcid: str, location_resolution: Optional[dict] = None
    ) -> None:
        """Add ISO country and subregion codes from GHCID.

        GHCID format: {country}-{region}-{settlement}-{type}-{abbrev}
        Example: NL-NH-AMS-M-RM -> countryCode: NL, subregionCode: NL-NH

        Args:
            uri: The custodian URI
            ghcid: The GHCID string
            location_resolution: Optional location resolution dict with region info
        """
        if not ghcid:
            return

        parts = ghcid.split("-")
        if len(parts) < 2:
            return

        # Extract ISO 3166-1 alpha-2 country code (first part)
        country_code = parts[0]
        if len(country_code) == 2 and country_code.isalpha():
            self.graph.add((uri, HC.countryCode, Literal(country_code.upper())))

        # Extract ISO 3166-2 subregion code (country-region, e.g. NL-NH, DE-BY)
        region_code = parts[1]
        if not region_code or region_code == "XX":  # XX = unknown region
            return
        subregion_code = f"{country_code.upper()}-{region_code.upper()}"
        self.graph.add((uri, HC.subregionCode, Literal(subregion_code)))

        # Also add region name if available from location_resolution
        if location_resolution:
            region_name = (
                location_resolution.get("region_name")
                or location_resolution.get("admin1_name")
            )
            if region_name:
                self.graph.add((uri, HC.subregionName, Literal(region_name)))

    def serialize(self, format: str = "turtle") -> bytes:
        """Serialize graph to bytes.

        Accepts both a ``str`` and a ``bytes`` result from ``Graph.serialize``.
        """
        payload = self.graph.serialize(format=format)
        return payload if isinstance(payload, bytes) else payload.encode("utf-8")

    def triple_count(self) -> int:
        """Get number of triples in graph."""
        return len(self.graph)


class OxigraphSyncer(BaseSyncer):
    """Sync custodian YAML files to Oxigraph triplestore."""

    database_name = "oxigraph"

    def __init__(self, endpoint: str = OXIGRAPH_URL, batch_size: int = BATCH_SIZE, **kwargs):
        """Create a syncer.

        Args:
            endpoint: Base URL of the Oxigraph server (trailing ``/`` is stripped).
            batch_size: Number of YAML files converted and uploaded per request.
            **kwargs: Passed through to ``BaseSyncer``.

        Raises:
            ValueError: If *batch_size* is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        super().__init__(**kwargs)
        self.endpoint = endpoint.rstrip("/")
        self.batch_size = batch_size
        self.client = httpx.Client(timeout=120.0)
        self.converter = CustodianRDFConverter()

    def check_connection(self) -> bool:
        """Check if Oxigraph is available."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={"Content-Type": "application/sparql-query"},
                content="ASK { ?s ?p ?o }",
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            self.logger.error(f"Oxigraph connection failed: {e}")
            return False

    def get_status(self) -> dict:
        """Get Oxigraph status.

        Returns:
            ``{"status": "healthy", "triple_count": n}`` on success,
            ``{"status": "unavailable", "error": ...}`` if the request or the
            response parsing raised, ``{"status": "unknown"}`` for any non-200
            response.
        """
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={
                    "Content-Type": "application/sparql-query",
                    "Accept": "application/sparql-results+json",
                },
                content="SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }",
            )
            if resp.status_code == 200:
                data = resp.json()
                count = int(data["results"]["bindings"][0]["count"]["value"])
                return {"status": "healthy", "triple_count": count}
        except Exception as e:
            return {"status": "unavailable", "error": str(e)}
        return {"status": "unknown"}

    def clear_store(self) -> bool:
        """Clear all data from the store."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/update",
                headers={"Content-Type": "application/sparql-update"},
                content="CLEAR DEFAULT",
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            self.logger.error(f"Failed to clear store: {e}")
            return False

    def _upload_turtle(self, ttl_data: bytes) -> bool:
        """Upload Turtle data to Oxigraph."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/store?default",
                headers={"Content-Type": "text/turtle"},
                content=ttl_data,
            )
            return resp.status_code in (200, 201, 204)
        except Exception as e:
            self.logger.error(f"Failed to upload data: {e}")
            return False

    def sync(
        self, limit: Optional[int] = None, dry_run: bool = False, clear: bool = True
    ) -> SyncResult:
        """Sync all YAML files to Oxigraph.

        Args:
            limit: Process at most this many files (falsy = no limit).
            dry_run: Parse and convert only; nothing is sent to Oxigraph.
            clear: Clear the default graph before uploading.

        Returns:
            A ``SyncResult``. ``records_succeeded`` counts only files whose
            batch was uploaded (or parsed, in dry-run mode); files in a batch
            that failed to upload are counted as failed. Empty YAML files are
            reported under ``details["skipped"]``.
        """
        result = SyncResult(
            database="oxigraph",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Check connection
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = f"Cannot connect to Oxigraph at {self.endpoint}"
            result.end_time = datetime.now(timezone.utc)
            return result

        # Load YAML files
        yaml_files = self._list_yaml_files()
        if limit:
            yaml_files = yaml_files[:limit]

        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "oxigraph"
        self.logger.info(f"Processing {len(yaml_files)} YAML files...")

        # Clear existing data
        if not dry_run and clear:
            self.logger.info("Clearing existing data...")
            if not self.clear_store():
                # Uploading on top of a store that was not cleared would mix
                # stale and fresh triples, so stop here.
                result.status = SyncStatus.FAILED
                result.error_message = f"Failed to clear Oxigraph store at {self.endpoint}"
                result.end_time = datetime.now(timezone.utc)
                return result

        # Process in batches
        total_triples = 0
        processed = 0
        failed = 0
        skipped = 0
        visited = 0

        for start in range(0, len(yaml_files), self.batch_size):
            batch = yaml_files[start:start + self.batch_size]
            batch_num = start // self.batch_size + 1

            self.converter.reset()
            batch_converted = 0

            for filepath in batch:
                visited += 1
                self.progress.processed_files = visited
                self.progress.current_file = filepath.name
                self._report_progress()

                try:
                    with open(filepath, "r", encoding="utf-8") as f:
                        data = yaml.safe_load(f)
                    if data:
                        self.converter.add_custodian(data, filepath)
                        batch_converted += 1
                    else:
                        skipped += 1
                except Exception as e:
                    self.logger.warning(f"Error processing {filepath.name}: {e}")
                    failed += 1
                    self.progress.errors.append(f"{filepath.name}: {str(e)}")

            batch_triples = self.converter.triple_count()

            if dry_run:
                batch_ok = True
                self.logger.info(
                    f"Batch {batch_num}: parsed {len(batch)} files, "
                    f"{batch_triples:,} triples (dry-run)"
                )
            else:
                try:
                    ttl_data = self.converter.serialize("turtle")
                except Exception as e:
                    self.logger.error(f"Batch {batch_num}: serialization failed: {e}")
                    batch_ok = False
                else:
                    batch_ok = self._upload_turtle(ttl_data)
                    if batch_ok:
                        self.logger.info(
                            f"Batch {batch_num}: loaded {len(batch)} files, "
                            f"{batch_triples:,} triples"
                        )
                    else:
                        self.logger.error(f"Batch {batch_num}: failed to upload")

            if batch_ok:
                processed += batch_converted
                total_triples += batch_triples
            else:
                # Only the files that converted cleanly move to "failed" here;
                # parse failures in this batch were already counted above.
                failed += batch_converted

        result.records_processed = len(yaml_files)
        result.records_succeeded = processed
        result.records_failed = failed
        result.details["total_triples"] = total_triples
        result.details["batch_size"] = self.batch_size
        result.details["dry_run"] = dry_run
        result.details["skipped"] = skipped

        if failed == 0:
            result.status = SyncStatus.SUCCESS
        elif processed > 0:
            result.status = SyncStatus.PARTIAL
        else:
            result.status = SyncStatus.FAILED

        result.end_time = datetime.now(timezone.utc)
        return result


def main():
    """Command-line entry point: parse arguments, run the sync, print a summary."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to Oxigraph")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--endpoint", default=OXIGRAPH_URL, help="Oxigraph SPARQL endpoint URL")
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Files per batch")
    parser.add_argument("--no-clear", action="store_true", help="Don't clear existing data")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    syncer = OxigraphSyncer(endpoint=args.endpoint, batch_size=args.batch_size)

    try:
        print("=" * 60)
        print("Oxigraph Sync")
        print("=" * 60)

        if not args.dry_run:
            print(f"Checking connection to {args.endpoint}...")
            status = syncer.get_status()
            print(f"  Status: {status.get('status', 'unknown')}")
            if status.get('triple_count'):
                print(f"  Triple count: {status['triple_count']:,}")

        result = syncer.sync(limit=args.limit, dry_run=args.dry_run, clear=not args.no_clear)
    finally:
        # Release the HTTP connection pool even if the sync raised.
        syncer.client.close()

    print("\n" + "=" * 60)
    print(f"Sync Result: {result.status.value.upper()}")
    print(f"  Processed: {result.records_processed}")
    print(f"  Succeeded: {result.records_succeeded}")
    print(f"  Failed: {result.records_failed}")
    print(f"  Triples: {result.details.get('total_triples', 0):,}")
    print(f"  Duration: {result.duration_seconds:.2f}s")
    if result.error_message:
        print(f"  Error: {result.error_message}")
    print("=" * 60)


if __name__ == "__main__":
    main()