637 lines
26 KiB
Python
637 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Sync Heritage Custodian YAML files to Oxigraph triplestore.
|
|
|
|
This script reads custodian YAML files from data/custodian/ and loads them
|
|
as RDF triples into Oxigraph for SPARQL querying and graph-based retrieval.
|
|
|
|
Usage:
|
|
python scripts/sync_custodians_to_oxigraph.py [--endpoint ENDPOINT] [--batch-size N]
|
|
|
|
Examples:
|
|
# Sync to local Oxigraph
|
|
python scripts/sync_custodians_to_oxigraph.py
|
|
|
|
# Sync to production via SSH tunnel
|
|
ssh -L 7878:127.0.0.1:7878 root@91.98.224.44 &
|
|
python scripts/sync_custodians_to_oxigraph.py --endpoint http://localhost:7878
|
|
|
|
# Sync directly to production (if allowed)
|
|
python scripts/sync_custodians_to_oxigraph.py --endpoint https://bronhouder.nl
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.parse
|
|
from datetime import datetime, timezone
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Any, Generator
|
|
|
|
import httpx
|
|
import yaml
|
|
from rdflib import Graph, Namespace, Literal, URIRef, BNode
|
|
from rdflib.namespace import RDF, RDFS, XSD, SKOS, DCTERMS, FOAF, OWL
|
|
|
|
# Add project root to path so `src/` modules are importable when run as a script.
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT / "src"))

# Console logging used for batch progress and final statistics.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)

# Namespaces
# IMPORTANT: hc: namespace must match sparql_templates.yaml for queries to work
HC = Namespace("https://nde.nl/ontology/hc/")  # Properties like institutionType, subregionCode
HCC = Namespace("https://nde.nl/ontology/hc/class/")  # Classes like Custodian
GHCID = Namespace("https://w3id.org/heritage/ghcid/")  # GHCID URIs for instances
ORG = Namespace("http://www.w3.org/ns/org#")  # W3C Organization ontology
RICO = Namespace("https://www.ica.org/standards/RiC/ontology#")  # ICA Records in Contexts
SCHEMA = Namespace("http://schema.org/")  # schema.org (http variant, per common LOD usage)
CIDOC = Namespace("http://www.cidoc-crm.org/cidoc-crm/")  # CIDOC CRM cultural-heritage ontology
PROV = Namespace("http://www.w3.org/ns/prov#")  # W3C provenance ontology
WD = Namespace("http://www.wikidata.org/entity/")  # Wikidata entity URIs
WDT = Namespace("http://www.wikidata.org/prop/direct/")  # Wikidata direct-claim properties
GEO = Namespace("http://www.w3.org/2003/01/geo/wgs84_pos#")  # WGS84 lat/long vocabulary
|
|
|
|
|
|
def sanitize_ghcid_for_uri(ghcid: str) -> str:
    """Sanitize a GHCID so it is usable as a URI local name.

    Characters that are invalid in a URI local part (backticks, quotes,
    brackets, spaces, ...) become underscores; runs of underscores are
    collapsed and stripped from both ends.
    """
    # Characters that must not appear in the URI local name.
    forbidden = "`'\"<>{}|\\^~[]@#$%&*()+=,;!? "
    replaced = "".join("_" if ch in forbidden else ch for ch in ghcid)
    # Collapse runs of underscores, then trim any at the edges.
    collapsed = re.sub(r"_{2,}", "_", replaced)
    return collapsed.strip("_")
|
|
|
|
|
|
class CustodianRDFConverter:
    """Convert custodian YAML data to RDF triples on an internal rdflib Graph.

    Usage: call ``add_custodian()`` once per YAML document, then
    ``serialize()`` to obtain Turtle bytes for upload; ``reset()`` clears
    the graph between batches.
    """

    def __init__(self):
        self.graph = Graph()
        self._bind_namespaces()

    def _bind_namespaces(self) -> None:
        """Bind all ontology namespaces to the graph (readable prefixes in output)."""
        self.graph.bind("hc", HC)
        self.graph.bind("hcc", HCC)
        self.graph.bind("ghcid", GHCID)
        self.graph.bind("org", ORG)
        self.graph.bind("rico", RICO)
        self.graph.bind("schema", SCHEMA)
        self.graph.bind("cidoc", CIDOC)
        self.graph.bind("prov", PROV)
        self.graph.bind("wd", WD)
        self.graph.bind("wdt", WDT)
        self.graph.bind("geo", GEO)
        self.graph.bind("dcterms", DCTERMS)
        self.graph.bind("foaf", FOAF)
        self.graph.bind("skos", SKOS)
        self.graph.bind("owl", OWL)

    def reset(self) -> None:
        """Reset the graph for a new batch (drops all accumulated triples)."""
        self.graph = Graph()
        self._bind_namespaces()

    def add_custodian(self, data: dict[str, Any], filepath: Path) -> URIRef | None:
        """Convert one custodian YAML document to RDF triples.

        Args:
            data: Parsed custodian YAML. Expected (all optional) sections:
                ghcid, original_entry, wikidata_enrichment, custodian_name,
                location, google_maps_enrichment, processing_timestamp.
            filepath: Source file; its stem is the GHCID fallback.

        Returns:
            The URIRef minted for this custodian.
        """
        # Extract GHCID for URI; fall back to the filename stem.
        ghcid_data = data.get("ghcid", {})
        ghcid = ghcid_data.get("ghcid_current", "")
        if not ghcid:
            ghcid = filepath.stem

        # Sanitize GHCID for use in URI (remove invalid characters like backticks)
        ghcid_uri_safe = sanitize_ghcid_for_uri(ghcid)
        custodian_uri = GHCID[ghcid_uri_safe]

        # Get original entry and enrichment data.
        # NOTE: `location` was previously fetched twice; hoisted to one lookup.
        original = data.get("original_entry", {})
        wikidata = data.get("wikidata_enrichment", {})
        custodian_name = data.get("custodian_name", {})
        location = data.get("location", {})

        # Type declarations (multi-ontology typing for cross-vocabulary queries)
        self.graph.add((custodian_uri, RDF.type, HCC.Custodian))
        self.graph.add((custodian_uri, RDF.type, CIDOC.E39_Actor))
        self.graph.add((custodian_uri, RDF.type, ORG.Organization))
        self.graph.add((custodian_uri, RDF.type, SCHEMA.Organization))
        self.graph.add((custodian_uri, RDF.type, PROV.Entity))

        # Add institution-type-specific classes
        # Priority: 1) top-level institution_type, 2) original_entry.institution_type,
        # 3) original_entry.type (legacy), 4) extract from GHCID
        inst_type_raw = (
            data.get("institution_type") or          # Top-level (most common)
            original.get("institution_type") or      # Inside original_entry
            original.get("type") or                  # Legacy format
            None
        )

        # Map full type names to single-letter codes (GLAMORCUBESFIXPHDNT)
        TYPE_NAME_TO_CODE = {
            "GALLERY": "G", "LIBRARY": "L", "ARCHIVE": "A", "MUSEUM": "M",
            "OFFICIAL_INSTITUTION": "O", "RESEARCH_CENTER": "R", "CORPORATION": "C",
            "UNKNOWN": "U", "BOTANICAL_ZOO": "B", "EDUCATION_PROVIDER": "E",
            "COLLECTING_SOCIETY": "S", "FEATURES": "F", "INTANGIBLE_HERITAGE_GROUP": "I",
            "MIXED": "X", "PERSONAL_COLLECTION": "P", "HOLY_SITES": "H",
            "DIGITAL_PLATFORM": "D", "NGO": "N", "TASTE_SMELL": "T",
        }

        # If no explicit type found, extract from GHCID (4th component)
        # Format: COUNTRY-REGION-CITY-TYPE-ABBREV (e.g., NL-NH-AMS-M-RM)
        if not inst_type_raw and ghcid:
            ghcid_parts = ghcid.split("-")
            if len(ghcid_parts) >= 4:
                type_from_ghcid = ghcid_parts[3]
                # Only use single-letter codes from GHCID
                if len(type_from_ghcid) == 1 and type_from_ghcid.upper() in "GLAMORCUBESFIXPHDNT":
                    inst_type_raw = type_from_ghcid.upper()

        # Normalize to a list so single values and lists are handled uniformly.
        if isinstance(inst_type_raw, str):
            inst_type_raw = [inst_type_raw]
        elif not inst_type_raw:
            inst_type_raw = []

        for inst_type in inst_type_raw:
            # Convert full name to code if needed; pass codes through unchanged.
            type_code = TYPE_NAME_TO_CODE.get(inst_type.upper(), inst_type) if inst_type else None
            if type_code:
                self._add_type_class(custodian_uri, type_code)

        # Name - prefer emic (self-chosen) name from custodian_name
        name = (
            custodian_name.get("emic_name") or
            custodian_name.get("claim_value") or
            wikidata.get("wikidata_label_nl") or
            wikidata.get("wikidata_label_en") or
            original.get("name") or
            original.get("organisatie", "")
        )
        if name:
            self.graph.add((custodian_uri, SKOS.prefLabel, Literal(name, lang="nl")))
            self.graph.add((custodian_uri, SCHEMA.name, Literal(name)))
            self.graph.add((custodian_uri, FOAF.name, Literal(name)))
            self.graph.add((custodian_uri, RDFS.label, Literal(name)))

        # Alternative names from Wikidata labels
        labels = wikidata.get("wikidata_labels", {})
        for lang, label in labels.items():
            if label and label != name:
                # Normalize language tags: keep only the base language (en-GB -> en)
                lang_tag = lang.replace("-", "_").split("_")[0]
                self.graph.add((custodian_uri, SKOS.altLabel, Literal(label, lang=lang_tag)))

        # Description (English preferred, Dutch fallback; tagged @en either way)
        description = (
            wikidata.get("wikidata_description_en") or
            wikidata.get("wikidata_description_nl", "")
        )
        if description:
            self.graph.add((custodian_uri, DCTERMS.description, Literal(description, lang="en")))
            self.graph.add((custodian_uri, SCHEMA.description, Literal(description)))

        # GHCID identifier
        if ghcid:
            self.graph.add((custodian_uri, HC.ghcid, Literal(ghcid)))
            self.graph.add((custodian_uri, DCTERMS.identifier, Literal(ghcid)))

        # Extract subregionCode from GHCID (format: COUNTRY-REGION-CITY-TYPE-ABBREV)
        # e.g., NL-NH-AMS-M-RM → subregionCode = NL-NH
        ghcid_parts = ghcid.split("-")
        if len(ghcid_parts) >= 2:
            country = ghcid_parts[0]
            region = ghcid_parts[1]
            subregion_code = f"{country}-{region}"
            self.graph.add((custodian_uri, HC.subregionCode, Literal(subregion_code)))
            self.graph.add((custodian_uri, HC.countryCode, Literal(country)))

        # Settlement/city name from location data
        # Used by SPARQL templates for city-based queries
        city_name = location.get("city") or location.get("geonames_name")
        if city_name:
            self.graph.add((custodian_uri, HC.settlementName, Literal(city_name)))

        # GHCID UUID
        ghcid_uuid = ghcid_data.get("ghcid_uuid")
        if ghcid_uuid:
            self.graph.add((custodian_uri, HC.ghcidUUID, Literal(ghcid_uuid)))

        # Wikidata sameAs link
        wikidata_id = (
            wikidata.get("wikidata_entity_id") or
            original.get("wikidata_id", "")
        )
        if wikidata_id:
            wd_uri = WD[wikidata_id]
            self.graph.add((custodian_uri, OWL.sameAs, wd_uri))
            self.graph.add((custodian_uri, SCHEMA.sameAs, wd_uri))
            self.graph.add((custodian_uri, HC.wikidataId, Literal(wikidata_id)))

        # ISIL code
        isil = (
            wikidata.get("wikidata_identifiers", {}).get("isil") or
            original.get("isil-code_na", "")
        )
        if isil:
            self._add_identifier(custodian_uri, "ISIL", isil)

        # Other identifiers from Wikidata
        wd_ids = wikidata.get("wikidata_identifiers", {})
        for id_type, id_value in wd_ids.items():
            if id_type != "isil" and id_value:
                self._add_identifier(custodian_uri, id_type.upper(), str(id_value))

        # Location from coordinates - check multiple sources in priority order
        # Priority: 1) location.latitude/longitude (canonical), 2) location.coordinates,
        # 3) wikidata_coordinates, 4) google_maps_enrichment.coordinates
        # BUGFIX: use explicit None checks throughout - 0.0 is a valid
        # coordinate (equator / prime meridian) and truthiness discarded it.
        coords = None

        # 1. Check canonical location (top-level latitude/longitude)
        if location.get("latitude") is not None and location.get("longitude") is not None:
            coords = {
                "latitude": location["latitude"],
                "longitude": location["longitude"]
            }

        # 2. Check location.coordinates (GeoNames format)
        if not coords:
            loc_coords = location.get("coordinates", {})
            if loc_coords.get("latitude") is not None and loc_coords.get("longitude") is not None:
                coords = loc_coords

        # 3. Fallback to Wikidata coordinates
        if not coords:
            wd_coords = wikidata.get("wikidata_coordinates", {})
            if wd_coords.get("latitude") is not None and wd_coords.get("longitude") is not None:
                coords = wd_coords

        # 4. Fallback to Google Maps coordinates
        if not coords:
            gmaps = data.get("google_maps_enrichment", {})
            gmaps_coords = gmaps.get("coordinates", {})
            if gmaps_coords.get("latitude") is not None and gmaps_coords.get("longitude") is not None:
                coords = gmaps_coords

        if coords:
            self._add_location(custodian_uri, coords, wikidata)

        # Official website - handle both single string and list of URLs
        website = wikidata.get("wikidata_official_website")
        if website:
            websites = website if isinstance(website, list) else [website]
            for url in websites:
                if url and isinstance(url, str):
                    try:
                        self.graph.add((custodian_uri, SCHEMA.url, URIRef(url)))
                        self.graph.add((custodian_uri, FOAF.homepage, URIRef(url)))
                    except Exception:
                        # Skip invalid URLs
                        pass

        # Located in (administrative)
        located_in = wikidata.get("wikidata_located_in", {})
        if located_in.get("id"):
            loc_uri = WD[located_in["id"]]
            self.graph.add((custodian_uri, SCHEMA.containedInPlace, loc_uri))
            self.graph.add((custodian_uri, WDT.P131, loc_uri))  # P131 = located in admin entity

        # Country
        country = wikidata.get("wikidata_country", {})
        if country.get("id"):
            country_uri = WD[country["id"]]
            self.graph.add((custodian_uri, SCHEMA.addressCountry, country_uri))
            self.graph.add((custodian_uri, WDT.P17, country_uri))  # P17 = country

        # Instance of (from Wikidata)
        for instance in wikidata.get("wikidata_instance_of", []):
            if instance.get("id"):
                inst_uri = WD[instance["id"]]
                self.graph.add((custodian_uri, WDT.P31, inst_uri))  # P31 = instance of

        # Partnerships
        claims = wikidata.get("wikidata_claims", {})
        partners = claims.get("P2652_partnership_with", {})
        if isinstance(partners, dict) and partners.get("value"):
            partner_values = partners["value"]
            if isinstance(partner_values, list):
                for partner in partner_values:
                    if partner.get("id"):
                        partner_uri = WD[partner["id"]]
                        self.graph.add((custodian_uri, ORG.linkedTo, partner_uri))
                        self.graph.add((custodian_uri, HC.partnerOf, partner_uri))

        # Member of
        # BUGFIX: guard with isinstance like the P2652 claim above - a
        # list-valued claim would otherwise raise AttributeError on .get().
        member_of = claims.get("member_of", {})
        if isinstance(member_of, dict) and member_of.get("id"):
            member_uri = WD[member_of["id"]]
            self.graph.add((custodian_uri, ORG.memberOf, member_uri))

        # Replaces (predecessor organization)
        # BUGFIX: the nested "value" may also be list-valued; only follow it
        # when it is a dict (consistent with the P2652 handling above).
        replaces = claims.get("P1365_replaces", {})
        replaces_value = replaces.get("value") if isinstance(replaces, dict) else None
        if isinstance(replaces_value, dict) and replaces_value.get("id"):
            predecessor_uri = WD[replaces_value["id"]]
            self.graph.add((custodian_uri, DCTERMS.replaces, predecessor_uri))
            self.graph.add((custodian_uri, PROV.wasDerivedFrom, predecessor_uri))

        # Has parts (collections, sub-organizations) - same guard as above.
        has_parts = claims.get("P527_has_part_s_", {})
        has_parts_value = has_parts.get("value") if isinstance(has_parts, dict) else None
        if isinstance(has_parts_value, dict) and has_parts_value.get("id"):
            part_uri = WD[has_parts_value["id"]]
            self.graph.add((custodian_uri, DCTERMS.hasPart, part_uri))

        # Processing provenance
        timestamp = data.get("processing_timestamp")
        if timestamp:
            self.graph.add((custodian_uri, PROV.generatedAtTime,
                            Literal(timestamp, datatype=XSD.dateTime)))

        return custodian_uri

    def _add_type_class(self, uri: URIRef, inst_type: str) -> None:
        """Add institution-type-specific RDF classes for a single-letter code."""
        type_mappings = {
            "M": [(SCHEMA.Museum,), (CIDOC["E74_Group"],)],
            "A": [(SCHEMA.ArchiveOrganization,), (RICO.CorporateBody,)],
            "L": [(SCHEMA.Library,)],
            "G": [(SCHEMA.Museum,)],  # Gallery
            "R": [(SCHEMA.ResearchOrganization,)],
            "H": [(SCHEMA.PlaceOfWorship,)],  # Holy sites
            "E": [(SCHEMA.EducationalOrganization,)],
            "S": [(ORG.Organization,)],  # Societies
            "O": [(ORG.Organization,)],  # Official
        }

        # Codes without a mapping (e.g. U, X) get no extra classes,
        # but still receive the literal institutionType below.
        for classes in type_mappings.get(inst_type, []):
            for cls in classes:
                self.graph.add((uri, RDF.type, cls))

        # Also add the type code as literal
        if inst_type:
            self.graph.add((uri, HC.institutionType, Literal(inst_type)))

    def _add_identifier(self, uri: URIRef, scheme: str, value: str) -> None:
        """Add an identifier as a CIDOC E42 blank node, plus direct shortcuts."""
        id_node = BNode()
        self.graph.add((id_node, RDF.type, CIDOC.E42_Identifier))
        self.graph.add((id_node, RICO.identifierType, Literal(scheme)))
        self.graph.add((id_node, RICO.textualValue, Literal(value)))
        self.graph.add((uri, CIDOC.P1_is_identified_by, id_node))

        # Direct property for common identifiers
        if scheme == "ISIL":
            self.graph.add((uri, HC.isil, Literal(value)))
        elif scheme == "VIAF":
            self.graph.add((uri, HC.viaf, Literal(value)))
        elif scheme == "GND":
            self.graph.add((uri, HC.gnd, Literal(value)))

    def _add_location(self, uri: URIRef, coords: dict, wikidata: dict) -> None:
        """Add a schema:Place blank node with WGS84 coordinates and link it."""
        loc_node = BNode()
        self.graph.add((loc_node, RDF.type, SCHEMA.Place))
        self.graph.add((loc_node, RDF.type, GEO.SpatialThing))

        lat = coords.get("latitude")
        lon = coords.get("longitude")

        # BUGFIX: explicit None checks so 0.0 coordinates are not dropped.
        if lat is not None and lon is not None:
            self.graph.add((loc_node, GEO.lat, Literal(lat, datatype=XSD.decimal)))
            self.graph.add((loc_node, GEO.long, Literal(lon, datatype=XSD.decimal)))
            self.graph.add((loc_node, SCHEMA.latitude, Literal(lat, datatype=XSD.decimal)))
            self.graph.add((loc_node, SCHEMA.longitude, Literal(lon, datatype=XSD.decimal)))

        # Add city from located_in
        located_in = wikidata.get("wikidata_located_in", {})
        city_label = located_in.get("label_en") or located_in.get("label_nl")
        if city_label:
            self.graph.add((loc_node, SCHEMA.addressLocality, Literal(city_label)))

        # Link to custodian
        self.graph.add((uri, SCHEMA.location, loc_node))

    def serialize(self, format: str = "turtle") -> bytes:
        """Serialize the graph to bytes in the given format.

        NOTE(review): assumes rdflib >= 6 where serialize() returns str
        (hence the .encode) - confirm against the pinned rdflib version.
        """
        return self.graph.serialize(format=format).encode("utf-8")

    def triple_count(self) -> int:
        """Get number of triples currently in the graph."""
        return len(self.graph)
|
|
|
|
|
|
class OxigraphLoader:
    """HTTP client for loading RDF data into an Oxigraph store.

    Talks to the endpoints Oxigraph exposes: /query (SPARQL query),
    /update (SPARQL update) and /store (SPARQL Graph Store protocol).
    All methods are best-effort: network errors are logged and reported
    as False / 0 rather than raised.
    """

    def __init__(self, endpoint: str = "http://localhost:7878"):
        # Normalize so path concatenation below never yields double slashes.
        self.endpoint = endpoint.rstrip("/")
        # Generous timeout: bulk Turtle uploads of large batches can be slow.
        self.client = httpx.Client(timeout=120.0)

    def close(self) -> None:
        """Release the underlying HTTP connection pool."""
        self.client.close()

    def check_health(self) -> bool:
        """Check if Oxigraph is available (answers a trivial ASK query)."""
        try:
            # Try query endpoint
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={"Content-Type": "application/sparql-query"},
                content="ASK { ?s ?p ?o }"
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            logger.error(f"Oxigraph health check failed: {e}")
            return False

    def get_triple_count(self) -> int:
        """Get current triple count in the store; returns 0 on any failure."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={
                    "Content-Type": "application/sparql-query",
                    "Accept": "application/sparql-results+json"
                },
                content="SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }"
            )
            if resp.status_code == 200:
                data = resp.json()
                return int(data["results"]["bindings"][0]["count"]["value"])
        except Exception as e:
            logger.error(f"Failed to get triple count: {e}")
        return 0

    def load_turtle(self, ttl_data: bytes) -> bool:
        """POST Turtle bytes into the default graph; True on success."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/store?default",
                headers={"Content-Type": "text/turtle"},
                content=ttl_data
            )
            if resp.status_code in (200, 201, 204):
                return True
            # BUGFIX: surface server-side rejections (e.g. Turtle syntax
            # errors) instead of silently returning False with no context.
            logger.error(
                f"Oxigraph rejected upload: HTTP {resp.status_code}: {resp.text[:200]}"
            )
            return False
        except Exception as e:
            logger.error(f"Failed to load data: {e}")
            return False

    def clear_graph(self, graph_uri: str | None = None) -> bool:
        """Clear a named graph, or the default graph when graph_uri is None."""
        try:
            if graph_uri:
                query = f"CLEAR GRAPH <{graph_uri}>"
            else:
                query = "CLEAR DEFAULT"

            resp = self.client.post(
                f"{self.endpoint}/update",
                headers={"Content-Type": "application/sparql-update"},
                content=query
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            logger.error(f"Failed to clear graph: {e}")
            return False
|
|
|
|
|
|
def iter_custodian_files(data_dir: Path) -> Generator[Path, None, None]:
    """Yield custodian YAML files from data_dir in sorted (deterministic) order."""
    yield from sorted(data_dir.glob("*.yaml"))
|
|
|
|
|
|
def load_yaml(filepath: Path) -> dict[str, Any] | None:
    """Load a custodian YAML file, returning None (with a warning) on failure.

    Note: Large files (>1MB) are processed normally - they contain valuable
    YouTube transcripts and other enrichment data that should not be stripped.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)
    # BUGFIX: catch only expected I/O / parse failures instead of a broad
    # `except Exception`, so genuine programming errors are not swallowed.
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
        logger.warning(f"Failed to load {filepath.name}: {e}")
        return None
|
|
|
|
|
|
def main():
    """CLI entry point: parse args, convert custodian YAML in batches, upload."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to Oxigraph")
    parser.add_argument("--data-dir", type=Path,
                        default=PROJECT_ROOT / "data" / "custodian",
                        help="Directory containing custodian YAML files")
    parser.add_argument("--endpoint", type=str,
                        default="http://localhost:7878",
                        help="Oxigraph SPARQL endpoint URL")
    parser.add_argument("--batch-size", type=int, default=500,
                        help="Number of files to process per batch")
    parser.add_argument("--clear", action="store_true",
                        help="Clear existing data before loading")
    parser.add_argument("--limit", type=int, default=None,
                        help="Limit number of files to process (for testing)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Parse files but don't upload to Oxigraph")

    args = parser.parse_args()

    # Initialize
    converter = CustodianRDFConverter()
    loader = OxigraphLoader(args.endpoint)
    initial_count = 0  # Track initial triple count for statistics

    # Check Oxigraph availability (skipped entirely on dry runs)
    if not args.dry_run:
        logger.info(f"Connecting to Oxigraph at {args.endpoint}...")
        if not loader.check_health():
            logger.error("Cannot connect to Oxigraph. Ensure it's running and accessible.")
            sys.exit(1)

        initial_count = loader.get_triple_count()
        logger.info(f"Initial triple count: {initial_count:,}")

        if args.clear:
            logger.info("Clearing existing custodian data...")
            # Clear our namespace specifically
            loader.clear_graph()
            logger.info("Store cleared")

    # Count files
    files = list(iter_custodian_files(args.data_dir))
    total_files = len(files)

    # BUGFIX: compare against None so an explicit `--limit 0` is honored
    # (the previous truthiness check silently ignored a zero limit).
    if args.limit is not None:
        files = files[:args.limit]

    logger.info(f"Found {total_files:,} custodian files, processing {len(files):,}")

    # Process in batches
    processed = 0
    failed = 0
    total_triples = 0

    for i in range(0, len(files), args.batch_size):
        batch = files[i:i + args.batch_size]
        batch_num = i // args.batch_size + 1

        # Fresh graph per batch keeps memory bounded.
        converter.reset()

        for filepath in batch:
            data = load_yaml(filepath)
            if data:
                try:
                    converter.add_custodian(data, filepath)
                    processed += 1
                except Exception as e:
                    logger.warning(f"Error processing {filepath.name}: {e}")
                    failed += 1
            else:
                failed += 1

        batch_triples = converter.triple_count()
        total_triples += batch_triples

        if not args.dry_run:
            # Upload batch
            try:
                ttl_data = converter.serialize("turtle")
                if loader.load_turtle(ttl_data):
                    logger.info(f"Batch {batch_num}: loaded {len(batch)} files, {batch_triples:,} triples")
                else:
                    logger.error(f"Batch {batch_num}: failed to upload")
            except Exception as e:
                logger.error(f"Batch {batch_num}: serialization failed: {e}")
                # Continue with next batch
        else:
            logger.info(f"Batch {batch_num}: parsed {len(batch)} files, {batch_triples:,} triples (dry-run)")

    # Final statistics
    logger.info("=" * 60)
    logger.info("Processing complete:")
    logger.info(f"  Files processed: {processed:,}")
    logger.info(f"  Files failed: {failed:,}")
    logger.info(f"  Total triples generated: {total_triples:,}")

    if not args.dry_run:
        final_count = loader.get_triple_count()
        added = final_count - initial_count
        logger.info(f"  Triples added: {added:,}")
        logger.info(f"  Final triple count: {final_count:,}")
|
|
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|