561 lines
22 KiB
Python
561 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Oxigraph Sync Module - Sync custodian YAML files to Oxigraph triplestore.
|
|
|
|
This module syncs all custodian YAML files to Oxigraph as RDF triples for
|
|
SPARQL querying and graph-based retrieval.
|
|
|
|
Usage:
|
|
python -m scripts.sync.oxigraph_sync [--dry-run] [--limit N] [--endpoint URL]
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import httpx
|
|
import yaml
|
|
from rdflib import Graph, Namespace, Literal, URIRef, BNode
|
|
from rdflib.namespace import RDF, RDFS, XSD, SKOS, DCTERMS, FOAF, OWL
|
|
|
|
# Add project root to path
|
|
PROJECT_ROOT = Path(__file__).parent.parent.parent
|
|
sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from scripts.sync import BaseSyncer, SyncResult, SyncStatus, DEFAULT_CUSTODIAN_DIR
|
|
|
|
# Configuration
# Endpoint is overridable via the OXIGRAPH_URL environment variable.
OXIGRAPH_URL = os.getenv("OXIGRAPH_URL", "http://localhost:7878")
# Number of YAML files converted and uploaded per batch.
BATCH_SIZE = 500

# Namespaces - Using nde.nl as canonical namespace (per LinkML schema)
# NOTE(review): HC and GHCID deliberately share the same base URI, so
# custodian instance URIs are minted directly in the hc namespace —
# presumably per the LinkML schema; confirm before changing either.
HC = Namespace("https://nde.nl/ontology/hc/")         # project ontology properties
GHCID = Namespace("https://nde.nl/ontology/hc/")      # custodian instance URIs
NDE = Namespace("https://nde.nl/ontology/hc/class/")  # project ontology classes
ORG = Namespace("http://www.w3.org/ns/org#")          # W3C Organization ontology
RICO = Namespace("https://www.ica.org/standards/RiC/ontology#")  # Records in Contexts
SCHEMA = Namespace("http://schema.org/")              # schema.org vocabulary
CIDOC = Namespace("http://www.cidoc-crm.org/cidoc-crm/")  # CIDOC CRM
PROV = Namespace("http://www.w3.org/ns/prov#")        # W3C provenance ontology
WD = Namespace("http://www.wikidata.org/entity/")     # Wikidata entities
WDT = Namespace("http://www.wikidata.org/prop/direct/")  # Wikidata direct properties
GEO = Namespace("http://www.w3.org/2003/01/geo/wgs84_pos#")  # WGS84 geo vocabulary
|
|
|
|
|
def sanitize_ghcid_for_uri(ghcid: str) -> str:
    """Make a GHCID usable as a URI local name.

    Characters that are unsafe in a URI path segment are replaced with
    underscores, runs of underscores are collapsed to a single one, and
    leading/trailing underscores are trimmed.
    """
    replaced = re.sub(r'[`\'\"<>{}|\\^~\[\]@#$%&*()+=,;!? ]', '_', ghcid)
    collapsed = re.sub(r'_+', '_', replaced)
    return collapsed.strip('_')
|
|
|
|
|
|
class CustodianRDFConverter:
    """Convert custodian YAML data to RDF triples.

    Triples accumulate in an in-memory rdflib Graph; callers reset() the
    graph between batches and serialize() it for upload.
    """

    def __init__(self):
        self.graph = Graph()
        self._bind_namespaces()

    def _bind_namespaces(self) -> None:
        """Bind all ontology namespaces so serialized Turtle uses readable prefixes."""
        self.graph.bind("hc", HC)
        self.graph.bind("ghcid", GHCID)
        self.graph.bind("nde", NDE)
        self.graph.bind("org", ORG)
        self.graph.bind("rico", RICO)
        self.graph.bind("schema", SCHEMA)
        self.graph.bind("cidoc", CIDOC)
        self.graph.bind("prov", PROV)
        self.graph.bind("wd", WD)
        self.graph.bind("wdt", WDT)
        self.graph.bind("geo", GEO)
        self.graph.bind("dcterms", DCTERMS)
        self.graph.bind("foaf", FOAF)
        self.graph.bind("skos", SKOS)
        self.graph.bind("owl", OWL)

    def reset(self) -> None:
        """Discard all triples and start a fresh graph for the next batch."""
        self.graph = Graph()
        self._bind_namespaces()

    def add_custodian(self, data: dict[str, Any], filepath: Path) -> URIRef | None:
        """Convert one custodian YAML document into triples on the graph.

        Args:
            data: Parsed YAML document for a single custodian.
            filepath: Source file path; its stem is the GHCID fallback.

        Returns:
            The URIRef minted for this custodian.
        """
        # `or {}` also covers keys that are present but explicitly null in
        # the YAML: yaml.safe_load returns None for those, not a missing
        # key, which previously raised AttributeError on .get().
        ghcid_data = data.get("ghcid") or {}
        ghcid = ghcid_data.get("ghcid_current", "")

        if not ghcid:
            ghcid = filepath.stem

        ghcid_uri_safe = sanitize_ghcid_for_uri(ghcid)
        custodian_uri = GHCID[ghcid_uri_safe]

        original = data.get("original_entry") or {}
        wikidata = data.get("wikidata_enrichment") or {}
        custodian_name = data.get("custodian_name") or {}

        # Type declarations (multi-typed across the mapped ontologies)
        self.graph.add((custodian_uri, RDF.type, NDE.Custodian))
        self.graph.add((custodian_uri, RDF.type, CIDOC.E39_Actor))
        self.graph.add((custodian_uri, RDF.type, ORG.Organization))
        self.graph.add((custodian_uri, RDF.type, SCHEMA.Organization))
        self.graph.add((custodian_uri, RDF.type, PROV.Entity))

        # Institution types: a single code or a list of codes
        inst_types = original.get("type", [])
        if isinstance(inst_types, str):
            inst_types = [inst_types]
        for inst_type in inst_types:
            self._add_type_class(custodian_uri, inst_type)

        # Name: first non-empty source wins, in decreasing preference
        name = (
            custodian_name.get("emic_name") or
            custodian_name.get("claim_value") or
            wikidata.get("wikidata_label_nl") or
            wikidata.get("wikidata_label_en") or
            original.get("name") or
            original.get("organisatie", "")
        )
        if name:
            self.graph.add((custodian_uri, SKOS.prefLabel, Literal(name, lang="nl")))
            self.graph.add((custodian_uri, SCHEMA.name, Literal(name)))
            self.graph.add((custodian_uri, FOAF.name, Literal(name)))
            self.graph.add((custodian_uri, RDFS.label, Literal(name)))

        # Alternative names in other languages
        labels = wikidata.get("wikidata_labels") or {}
        for lang, label in labels.items():
            if label and label != name:
                # Reduce e.g. "en-GB"/"en_GB" to the bare primary subtag
                lang_tag = lang.replace("-", "_").split("_")[0]
                self.graph.add((custodian_uri, SKOS.altLabel, Literal(label, lang=lang_tag)))

        # Description (English preferred, Dutch fallback)
        description = (
            wikidata.get("wikidata_description_en") or
            wikidata.get("wikidata_description_nl", "")
        )
        if description:
            self.graph.add((custodian_uri, DCTERMS.description, Literal(description, lang="en")))
            self.graph.add((custodian_uri, SCHEMA.description, Literal(description)))

        # GHCID identifier
        if ghcid:
            self.graph.add((custodian_uri, HC.ghcid, Literal(ghcid)))
            self.graph.add((custodian_uri, DCTERMS.identifier, Literal(ghcid)))

        # GHCID UUID
        ghcid_uuid = ghcid_data.get("ghcid_uuid")
        if ghcid_uuid:
            self.graph.add((custodian_uri, HC.ghcidUUID, Literal(ghcid_uuid)))

        # Wikidata sameAs link
        wikidata_id = (
            wikidata.get("wikidata_entity_id") or
            original.get("wikidata_id", "")
        )
        if wikidata_id:
            wd_uri = WD[wikidata_id]
            self.graph.add((custodian_uri, OWL.sameAs, wd_uri))
            self.graph.add((custodian_uri, SCHEMA.sameAs, wd_uri))
            self.graph.add((custodian_uri, HC.wikidataId, Literal(wikidata_id)))

        # ISIL code
        isil = (
            (wikidata.get("wikidata_identifiers") or {}).get("isil") or
            original.get("isil-code_na", "")
        )
        if isil:
            self._add_identifier(custodian_uri, "ISIL", isil)

        # Other identifiers (VIAF, GND, ...)
        wd_ids = wikidata.get("wikidata_identifiers") or {}
        for id_type, id_value in wd_ids.items():
            if id_type != "isil" and id_value:
                self._add_identifier(custodian_uri, id_type.upper(), str(id_value))

        # Location — `is not None` so a legitimate 0.0 latitude/longitude
        # (equator / prime meridian) is not dropped by a truthiness test.
        coords = wikidata.get("wikidata_coordinates") or {}
        if coords.get("latitude") is not None and coords.get("longitude") is not None:
            self._add_location(custodian_uri, coords, wikidata)

        # Official website(s)
        website = wikidata.get("wikidata_official_website")
        if website:
            websites = website if isinstance(website, list) else [website]
            for url in websites:
                if url and isinstance(url, str):
                    try:
                        self.graph.add((custodian_uri, SCHEMA.url, URIRef(url)))
                        self.graph.add((custodian_uri, FOAF.homepage, URIRef(url)))
                    except Exception:
                        # Best-effort: skip malformed URLs rather than fail the record
                        pass

        # Located in (administrative territorial entity, P131)
        located_in = wikidata.get("wikidata_located_in") or {}
        if located_in.get("id"):
            loc_uri = WD[located_in["id"]]
            self.graph.add((custodian_uri, SCHEMA.containedInPlace, loc_uri))
            self.graph.add((custodian_uri, WDT.P131, loc_uri))

        # Country (P17)
        country = wikidata.get("wikidata_country") or {}
        if country.get("id"):
            country_uri = WD[country["id"]]
            self.graph.add((custodian_uri, SCHEMA.addressCountry, country_uri))
            self.graph.add((custodian_uri, WDT.P17, country_uri))

        # Instance of (P31)
        for instance in wikidata.get("wikidata_instance_of") or []:
            if instance.get("id"):
                inst_uri = WD[instance["id"]]
                self.graph.add((custodian_uri, WDT.P31, inst_uri))

        # Temporal data - founding date/year
        founding_date, founding_year = self._extract_founding(data, wikidata)

        if founding_date:
            try:
                self.graph.add((custodian_uri, SCHEMA.foundingDate,
                                Literal(founding_date, datatype=XSD.date)))
                # Also add as Wikidata inception property for compatibility
                self.graph.add((custodian_uri, WDT.P571,
                                Literal(founding_date, datatype=XSD.date)))
            except Exception:
                # Skip if date is still malformed
                pass

        # Founding year (for easier range queries)
        if founding_year is not None:
            self.graph.add((custodian_uri, HC.foundingYear,
                            Literal(founding_year, datatype=XSD.integer)))

        # Processing provenance
        timestamp = data.get("processing_timestamp")
        if timestamp:
            self.graph.add((custodian_uri, PROV.generatedAtTime,
                            Literal(timestamp, datatype=XSD.dateTime)))

        return custodian_uri

    def _extract_founding(self, data: dict[str, Any],
                          wikidata: dict[str, Any]) -> tuple[str | None, int | None]:
        """Derive (founding_date, founding_year) from timespan or Wikidata.

        Prefers timespan.begin_of_the_begin (proper ISO dates); falls back
        to Wikidata inception (P571), which may use 00 for month/day.
        """
        founding_date = None

        # Source 1: TimeSpan begin_of_the_begin (preferred - has proper dates)
        timespan = data.get("timespan") or {}
        if timespan:
            # str() coercion: YAML may parse unquoted timestamps into
            # datetime objects, which previously broke len()/slicing.
            begin = str(timespan.get("begin_of_the_begin", "") or "")
            if len(begin) >= 10:
                # Format: "1638-01-01T00:00:00Z" -> "1638-01-01"
                founding_date = begin[:10]

        # Source 2: Wikidata inception (P571) - fallback, may have 00 for month/day
        if not founding_date:
            inception = wikidata.get("wikidata_inception", "")
            if not inception:
                inception = wikidata.get("wikidata_founded", "")
            inception = str(inception) if inception else ""

            if inception:
                # Wikidata format: "+1800-01-01T00:00:00Z" or "1638-00-00"
                if inception.startswith("+"):
                    raw_date = inception[1:11]  # Extract YYYY-MM-DD
                elif inception.startswith("-"):
                    raw_date = inception[:11]  # Keep - for BCE dates
                else:
                    raw_date = inception[:10]

                # Fix year-only dates: "1638-00-00" -> "1638-01-01"
                if len(raw_date) >= 10:
                    if raw_date[5:7] == "00":
                        raw_date = raw_date[:5] + "01" + raw_date[7:]
                    if raw_date[8:10] == "00":
                        raw_date = raw_date[:8] + "01"
                founding_date = raw_date

        # Extract year for simpler range queries
        founding_year = None
        if founding_date and len(founding_date) >= 4:
            try:
                founding_year = int(founding_date[:4].lstrip('-+'))
            except ValueError:
                pass

        return founding_date, founding_year

    def _add_type_class(self, uri: URIRef, inst_type: str) -> None:
        """Add institution-type-specific RDF classes for a one-letter type code."""
        # One-letter codes from the source data; the mapping below is the
        # authoritative list in this module.
        type_mappings = {
            "M": [SCHEMA.Museum, CIDOC["E74_Group"]],
            "A": [SCHEMA.ArchiveOrganization, RICO.CorporateBody],
            "L": [SCHEMA.Library],
            "G": [SCHEMA.Museum],
            "R": [SCHEMA.ResearchOrganization],
            "H": [SCHEMA.PlaceOfWorship],
            "E": [SCHEMA.EducationalOrganization],
            "S": [ORG.Organization],
            "O": [ORG.Organization],
        }

        for cls in type_mappings.get(inst_type, []):
            self.graph.add((uri, RDF.type, cls))

        # Always keep the raw code for exact-match queries
        if inst_type:
            self.graph.add((uri, HC.institutionType, Literal(inst_type)))

    def _add_identifier(self, uri: URIRef, scheme: str, value: str) -> None:
        """Attach an identifier as a CIDOC E42 blank node, plus flat hc: shortcuts."""
        id_node = BNode()
        self.graph.add((id_node, RDF.type, CIDOC.E42_Identifier))
        self.graph.add((id_node, RICO.identifierType, Literal(scheme)))
        self.graph.add((id_node, RICO.textualValue, Literal(value)))
        self.graph.add((uri, CIDOC.P1_is_identified_by, id_node))

        # Flat properties for the common schemes simplify SPARQL lookups
        if scheme == "ISIL":
            self.graph.add((uri, HC.isil, Literal(value)))
        elif scheme == "VIAF":
            self.graph.add((uri, HC.viaf, Literal(value)))
        elif scheme == "GND":
            self.graph.add((uri, HC.gnd, Literal(value)))

    def _add_location(self, uri: URIRef, coords: dict, wikidata: dict) -> None:
        """Attach a schema:Place blank node with WGS84 coordinates."""
        loc_node = BNode()
        self.graph.add((loc_node, RDF.type, SCHEMA.Place))
        self.graph.add((loc_node, RDF.type, GEO.SpatialThing))

        lat = coords.get("latitude")
        lon = coords.get("longitude")

        # `is not None`: 0.0 is a valid coordinate and must not be dropped
        # by a truthiness test.
        if lat is not None and lon is not None:
            self.graph.add((loc_node, GEO.lat, Literal(lat, datatype=XSD.decimal)))
            self.graph.add((loc_node, GEO.long, Literal(lon, datatype=XSD.decimal)))
            self.graph.add((loc_node, SCHEMA.latitude, Literal(lat, datatype=XSD.decimal)))
            self.graph.add((loc_node, SCHEMA.longitude, Literal(lon, datatype=XSD.decimal)))

        located_in = wikidata.get("wikidata_located_in") or {}
        city_label = located_in.get("label_en") or located_in.get("label_nl")
        if city_label:
            self.graph.add((loc_node, SCHEMA.addressLocality, Literal(city_label)))

        self.graph.add((uri, SCHEMA.location, loc_node))

    def serialize(self, format: str = "turtle") -> bytes:
        """Serialize the graph to UTF-8 bytes in the given rdflib format."""
        return self.graph.serialize(format=format).encode("utf-8")

    def triple_count(self) -> int:
        """Return the number of triples currently in the graph."""
        return len(self.graph)
|
|
|
|
|
|
class OxigraphSyncer(BaseSyncer):
    """Sync custodian YAML files to Oxigraph triplestore.

    Converts YAML documents to RDF in batches and uploads each batch to
    Oxigraph's HTTP store endpoint as Turtle.
    """

    database_name = "oxigraph"

    def __init__(self, endpoint: str = OXIGRAPH_URL, batch_size: int = BATCH_SIZE, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint.rstrip("/")
        self.batch_size = batch_size
        # Long timeout: uploading a large Turtle batch can be slow.
        self.client = httpx.Client(timeout=120.0)
        self.converter = CustodianRDFConverter()

    def close(self) -> None:
        """Release the HTTP connection pool.

        The httpx.Client was previously never closed, leaking sockets in
        long-running processes. Callers that construct a syncer directly
        should call this when done.
        """
        self.client.close()

    def check_connection(self) -> bool:
        """Return True if the Oxigraph SPARQL query endpoint answers an ASK."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={"Content-Type": "application/sparql-query"},
                content="ASK { ?s ?p ?o }"
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            self.logger.error(f"Oxigraph connection failed: {e}")
            return False

    def get_status(self) -> dict:
        """Return a status dict with the store's total triple count, if reachable."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/query",
                headers={
                    "Content-Type": "application/sparql-query",
                    "Accept": "application/sparql-results+json"
                },
                content="SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }"
            )
            if resp.status_code == 200:
                data = resp.json()
                count = int(data["results"]["bindings"][0]["count"]["value"])
                return {"status": "healthy", "triple_count": count}
        except Exception as e:
            return {"status": "unavailable", "error": str(e)}
        # Reachable but non-200 response
        return {"status": "unknown"}

    def clear_store(self) -> bool:
        """Drop all triples in the default graph via SPARQL UPDATE."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/update",
                headers={"Content-Type": "application/sparql-update"},
                content="CLEAR DEFAULT"
            )
            return resp.status_code in (200, 204)
        except Exception as e:
            self.logger.error(f"Failed to clear store: {e}")
            return False

    def _upload_turtle(self, ttl_data: bytes) -> bool:
        """POST serialized Turtle into the default graph of the store."""
        try:
            resp = self.client.post(
                f"{self.endpoint}/store?default",
                headers={"Content-Type": "text/turtle"},
                content=ttl_data
            )
            return resp.status_code in (200, 201, 204)
        except Exception as e:
            self.logger.error(f"Failed to upload data: {e}")
            return False

    def sync(self, limit: Optional[int] = None, dry_run: bool = False, clear: bool = True) -> SyncResult:
        """Sync all YAML files to Oxigraph.

        Args:
            limit: Maximum number of files to process (None = all).
            dry_run: Parse and convert but skip clearing/uploading.
            clear: Clear existing store contents before loading.

        Returns:
            SyncResult with per-record counts and total_triples in details.
        """
        result = SyncResult(
            database="oxigraph",
            status=SyncStatus.IN_PROGRESS,
            start_time=datetime.now(timezone.utc),
        )

        # Check connection before doing any work
        if not dry_run and not self.check_connection():
            result.status = SyncStatus.FAILED
            result.error_message = f"Cannot connect to Oxigraph at {self.endpoint}"
            result.end_time = datetime.now(timezone.utc)
            return result

        # Load YAML files. `is not None` so an explicit limit of 0 means
        # "process zero files" rather than silently processing everything.
        yaml_files = self._list_yaml_files()
        if limit is not None:
            yaml_files = yaml_files[:limit]

        self.progress.total_files = len(yaml_files)
        self.progress.current_database = "oxigraph"

        self.logger.info(f"Processing {len(yaml_files)} YAML files...")

        # Clear existing data
        if not dry_run and clear:
            self.logger.info("Clearing existing data...")
            self.clear_store()

        # Process in batches
        total_triples = 0
        processed = 0
        failed = 0

        for i in range(0, len(yaml_files), self.batch_size):
            batch = yaml_files[i:i + self.batch_size]
            batch_num = i // self.batch_size + 1

            self.converter.reset()

            for filepath in batch:
                self.progress.processed_files = processed + 1
                self.progress.current_file = filepath.name
                self._report_progress()

                try:
                    with open(filepath, "r", encoding="utf-8") as f:
                        data = yaml.safe_load(f)

                    if data:
                        self.converter.add_custodian(data, filepath)
                        processed += 1
                    else:
                        # Previously empty/null YAML files fell out of both
                        # counters, so processed + failed != total. Count
                        # them as failures and record why.
                        self.logger.warning(f"Empty YAML file: {filepath.name}")
                        failed += 1
                        self.progress.errors.append(f"{filepath.name}: empty file")
                except Exception as e:
                    self.logger.warning(f"Error processing {filepath.name}: {e}")
                    failed += 1
                    self.progress.errors.append(f"{filepath.name}: {str(e)}")

            batch_triples = self.converter.triple_count()
            total_triples += batch_triples

            if not dry_run:
                try:
                    ttl_data = self.converter.serialize("turtle")
                    if self._upload_turtle(ttl_data):
                        self.logger.info(f"Batch {batch_num}: loaded {len(batch)} files, {batch_triples:,} triples")
                    else:
                        self.logger.error(f"Batch {batch_num}: failed to upload")
                        failed += len(batch)
                except Exception as e:
                    self.logger.error(f"Batch {batch_num}: serialization failed: {e}")
                    failed += len(batch)
            else:
                self.logger.info(f"Batch {batch_num}: parsed {len(batch)} files, {batch_triples:,} triples (dry-run)")

        result.records_processed = len(yaml_files)
        result.records_succeeded = processed
        result.records_failed = failed
        result.details["total_triples"] = total_triples
        result.details["batch_size"] = self.batch_size
        result.details["dry_run"] = dry_run

        if failed == 0:
            result.status = SyncStatus.SUCCESS
        elif processed > 0:
            result.status = SyncStatus.PARTIAL
        else:
            result.status = SyncStatus.FAILED

        result.end_time = datetime.now(timezone.utc)
        return result
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the sync, and print a summary."""
    parser = argparse.ArgumentParser(description="Sync custodian YAML files to Oxigraph")
    parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--endpoint", default=OXIGRAPH_URL, help="Oxigraph SPARQL endpoint URL")
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Files per batch")
    parser.add_argument("--no-clear", action="store_true", help="Don't clear existing data")
    args = parser.parse_args()

    import logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    syncer = OxigraphSyncer(endpoint=args.endpoint, batch_size=args.batch_size)

    banner = "=" * 60
    print(banner)
    print("Oxigraph Sync")
    print(banner)

    # Only probe the store when we are actually going to write to it
    if not args.dry_run:
        print(f"Checking connection to {args.endpoint}...")
        status = syncer.get_status()
        print(f" Status: {status.get('status', 'unknown')}")
        if status.get('triple_count'):
            print(f" Triple count: {status['triple_count']:,}")

    result = syncer.sync(limit=args.limit, dry_run=args.dry_run, clear=not args.no_clear)

    summary = [
        "\n" + banner,
        f"Sync Result: {result.status.value.upper()}",
        f" Processed: {result.records_processed}",
        f" Succeeded: {result.records_succeeded}",
        f" Failed: {result.records_failed}",
        f" Triples: {result.details.get('total_triples', 0):,}",
        f" Duration: {result.duration_seconds:.2f}s",
    ]
    if result.error_message:
        summary.append(f" Error: {result.error_message}")
    summary.append(banner)
    for line in summary:
        print(line)


if __name__ == "__main__":
    main()
|