#!/usr/bin/env python3 """ Oxigraph Person Sync Module - Sync person JSON files to Oxigraph triplestore. This module syncs all person data (entity profiles and staff lists) to Oxigraph as RDF triples for SPARQL querying and graph-based retrieval. Usage: python -m scripts.sync.oxigraph_person_sync [--dry-run] [--limit N] [--endpoint URL] Data Sources: - data/custodian/person/entity/*.json - Individual person profile files - data/custodian/person/affiliated/parsed/*_staff_*.json - Staff list files """ import argparse import json import os import re import sys from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional import httpx import yaml from rdflib import Graph, Namespace, Literal, URIRef, BNode from rdflib.namespace import RDF, RDFS, XSD, SKOS, DCTERMS, FOAF, OWL # Add project root to path PROJECT_ROOT = Path(__file__).parent.parent.parent sys.path.insert(0, str(PROJECT_ROOT)) from scripts.sync import BaseSyncer, SyncResult, SyncStatus, DEFAULT_CUSTODIAN_DIR # Configuration OXIGRAPH_URL = os.getenv("OXIGRAPH_URL", "http://localhost:7878") BATCH_SIZE = 500 # Data directories PERSON_ENTITY_DIR = PROJECT_ROOT / "data" / "custodian" / "person" / "entity" PERSON_AFFILIATED_DIR = PROJECT_ROOT / "data" / "custodian" / "person" / "affiliated" / "parsed" # Namespaces - Using nde.nl as canonical namespace (per LinkML schema) HC = Namespace("https://nde.nl/ontology/hc/") HP = Namespace("https://nde.nl/ontology/hc/person/") GHCID = Namespace("https://nde.nl/ontology/hc/") NDE = Namespace("https://nde.nl/ontology/hc/class/") ORG = Namespace("http://www.w3.org/ns/org#") RICO = Namespace("https://www.ica.org/standards/RiC/ontology#") SCHEMA = Namespace("http://schema.org/") CIDOC = Namespace("http://www.cidoc-crm.org/cidoc-crm/") PROV = Namespace("http://www.w3.org/ns/prov#") WD = Namespace("http://www.wikidata.org/entity/") WDT = Namespace("http://www.wikidata.org/prop/direct/") GEO = Namespace("http://www.w3.org/2003/01/geo/wgs84_pos#") PICO = Namespace("https://personsincontext.org/model#") PNV = Namespace("https://w3id.org/pnv#") def sanitize_for_uri(value: str) -> str: """Sanitize a string to be valid as a URI local name.""" if not value: return "unknown" sanitized = re.sub(r'[`\'\"<>{}|\\^~\[\]@#$%&*()+=,;!? ]', '_', value) sanitized = re.sub(r'_+', '_', sanitized) sanitized = sanitized.strip('_') return sanitized.lower() if sanitized else "unknown" def normalize_location(location) -> str: """Normalize location to string format. Handles both string format: "Amsterdam, North Holland, Netherlands" and dict format: {"city": "Amsterdam", "region": "North Holland", "country": "Netherlands"} """ if not location: return "" if isinstance(location, str): return location if isinstance(location, dict): parts = [] for key in ['city', 'region', 'country']: if location.get(key): parts.append(str(location[key])) return ", ".join(parts) return str(location) def extract_country_code(location) -> Optional[str]: """Extract country code from location string or dict. Handles: - String: 'Amsterdam, North Holland, Netherlands (NL)' - Dict: {"city": "Amsterdam", "region": "North Holland", "country": "Netherlands"} """ if not location: return None # Handle dict format if isinstance(location, dict): country = location.get('country', '') if country: location = country else: location = normalize_location(location) if not isinstance(location, str): location = str(location) # Try to find country code in parentheses match = re.search(r'\(([A-Z]{2})\)', location) if match: return match.group(1) # Common country mappings country_map = { 'netherlands': 'NL', 'nederland': 'NL', 'united states': 'US', 'usa': 'US', 'united kingdom': 'GB', 'uk': 'GB', 'england': 'GB', 'germany': 'DE', 'deutschland': 'DE', 'france': 'FR', 'belgium': 'BE', 'belgiƫ': 'BE', 'spain': 'ES', 'espaƱa': 'ES', 'italy': 'IT', 'italia': 'IT', 'australia': 'AU', 'canada': 'CA', 'japan': 'JP', 'china': 'CN', } lower_loc = location.lower() for country, code in country_map.items(): if country in lower_loc: return code return None class PersonRDFConverter: """Convert person JSON data to RDF triples.""" def __init__(self, custodian_lookup: dict[str, str] | None = None): self.graph = Graph() self._bind_namespaces() self.custodian_lookup = custodian_lookup or {} self._persons_added = 0 def _bind_namespaces(self) -> None: """Bind all ontology namespaces to the graph.""" self.graph.bind("hc", HC) self.graph.bind("hp", HP) self.graph.bind("ghcid", GHCID) self.graph.bind("nde", NDE) self.graph.bind("org", ORG) self.graph.bind("rico", RICO) self.graph.bind("schema", SCHEMA) self.graph.bind("cidoc", CIDOC) self.graph.bind("prov", PROV) self.graph.bind("wd", WD) self.graph.bind("wdt", WDT) self.graph.bind("geo", GEO) self.graph.bind("dcterms", DCTERMS) self.graph.bind("foaf", FOAF) self.graph.bind("skos", SKOS) self.graph.bind("owl", OWL) self.graph.bind("pico", PICO) self.graph.bind("pnv", PNV) def reset(self) -> None: """Reset the graph for new batch.""" self.graph = Graph() self._bind_namespaces() self._persons_added = 0 def _get_person_uri(self, data: dict, filepath: Path) -> URIRef: """Generate a URI for a person based on LinkedIn slug or staff_id.""" extraction_meta = data.get('extraction_metadata', {}) profile_data = data.get('profile_data', {}) # Try LinkedIn URL to get slug linkedin_url = ( extraction_meta.get('linkedin_url') or profile_data.get('linkedin_url') or '' ) if linkedin_url: # Extract slug from URL like https://www.linkedin.com/in/alex-brandsen match = re.search(r'linkedin\.com/in/([^/?]+)', linkedin_url) if match: slug = sanitize_for_uri(match.group(1)) return HP[slug] # Fallback to staff_id staff_id = extraction_meta.get('staff_id', '') if staff_id: return HP[sanitize_for_uri(staff_id)] # Last resort: use filename return HP[sanitize_for_uri(filepath.stem)] def _resolve_custodian_uri(self, custodian_name: str) -> URIRef | None: """Resolve custodian name to GHCID URI.""" if not custodian_name: return None # Try exact match first name_lower = custodian_name.lower() if name_lower in self.custodian_lookup: ghcid = self.custodian_lookup[name_lower] return GHCID[sanitize_for_uri(ghcid)] # Try slug form slug = name_lower.replace(' ', '-') if slug in self.custodian_lookup: ghcid = self.custodian_lookup[slug] return GHCID[sanitize_for_uri(ghcid)] # Try partial match for key, ghcid in self.custodian_lookup.items(): if name_lower in key or key in name_lower: return GHCID[sanitize_for_uri(ghcid)] return None def add_person_from_entity(self, data: dict, filepath: Path) -> URIRef | None: """Convert person entity JSON to RDF triples.""" extraction_meta = data.get('extraction_metadata', {}) source_staff = data.get('source_staff_info', {}) profile_data = data.get('profile_data', {}) heritage_rel = data.get('heritage_relevance', {}) person_data = data.get('person', {}) # Alternative schema variant heritage_profile = data.get('heritage_profile', {}) # Alternative schema # Get name (required) - handle multiple schema variants: # 1. profile_data.name / profile_data.full_name (standard schema) # 2. source_staff_info.name (from staff list extraction) # 3. person.full_name (alternative schema - exa_crawling_linkedin_profile) name = ( profile_data.get('name') or profile_data.get('full_name') or source_staff.get('name') or person_data.get('full_name') or person_data.get('name') ) if not name: return None # Create person URI person_uri = self._get_person_uri(data, filepath) # Type declarations self.graph.add((person_uri, RDF.type, SCHEMA.Person)) self.graph.add((person_uri, RDF.type, FOAF.Person)) self.graph.add((person_uri, RDF.type, PICO.PersonObservation)) self.graph.add((person_uri, RDF.type, CIDOC.E21_Person)) # Name self.graph.add((person_uri, SCHEMA.name, Literal(name))) self.graph.add((person_uri, FOAF.name, Literal(name))) self.graph.add((person_uri, RDFS.label, Literal(name))) self.graph.add((person_uri, SKOS.prefLabel, Literal(name))) # LinkedIn URL as sameAs - check multiple schema locations linkedin_url = ( extraction_meta.get('linkedin_url') or profile_data.get('linkedin_url') or person_data.get('linkedin_url') ) if linkedin_url: try: self.graph.add((person_uri, SCHEMA.sameAs, URIRef(linkedin_url))) self.graph.add((person_uri, OWL.sameAs, URIRef(linkedin_url))) except Exception: pass # Headline / job title - check multiple schema locations headline = ( profile_data.get('headline') or source_staff.get('headline') or person_data.get('headline') ) if headline: self.graph.add((person_uri, SCHEMA.jobTitle, Literal(headline))) self.graph.add((person_uri, SCHEMA.description, Literal(headline))) # Location (handle both string and dict formats) - check multiple schema locations location_raw = profile_data.get('location') or person_data.get('location') location = normalize_location(location_raw) if location: self.graph.add((person_uri, SCHEMA.workLocation, Literal(location))) country_code = extract_country_code(location_raw) if country_code: self.graph.add((person_uri, HC.countryCode, Literal(country_code))) # About / description - check multiple schema locations about = profile_data.get('about') or person_data.get('about') if about: self.graph.add((person_uri, SCHEMA.disambiguatingDescription, Literal(about))) # Profile image - check multiple schema locations profile_image = ( profile_data.get('profile_image_url') or person_data.get('photo_url') ) if profile_image: try: self.graph.add((person_uri, SCHEMA.image, URIRef(profile_image))) self.graph.add((person_uri, FOAF.img, URIRef(profile_image))) except Exception: pass # Heritage relevance - check multiple schema locations is_heritage_relevant = ( heritage_rel.get('is_heritage_relevant', True) or heritage_profile.get('is_heritage_professional', True) ) self.graph.add((person_uri, HC.heritageRelevant, Literal(is_heritage_relevant, datatype=XSD.boolean))) heritage_types = ( heritage_rel.get('heritage_types', []) or heritage_profile.get('heritage_types', []) ) if not heritage_types and source_staff.get('heritage_type'): heritage_types = [source_staff.get('heritage_type')] for ht in heritage_types: if ht: self.graph.add((person_uri, HC.heritageType, Literal(ht))) # Link to custodian (employer) - check multiple schema locations custodian_name = ( source_staff.get('custodian') or data.get('network_context', {}).get('source_custodian') ) if custodian_name: custodian_uri = self._resolve_custodian_uri(custodian_name) if custodian_uri: self.graph.add((person_uri, SCHEMA.worksFor, custodian_uri)) self.graph.add((person_uri, ORG.memberOf, custodian_uri)) # Also store as literal for cases where lookup fails self.graph.add((person_uri, HC.custodianName, Literal(custodian_name))) # Skills (handle null values) - check multiple schema locations skills = profile_data.get('skills') or data.get('skills') or [] for skill in skills: if skill: self.graph.add((person_uri, SCHEMA.knowsAbout, Literal(skill))) # Languages (handle null values and dict format) for lang in (profile_data.get('languages') or []): if isinstance(lang, dict): lang_name = lang.get('language', '') else: lang_name = str(lang) if lang_name: self.graph.add((person_uri, SCHEMA.knowsLanguage, Literal(lang_name))) # Experience (as structured data) - handle multiple schema variants: # 1. profile_data.experience / profile_data.career_history (standard) # 2. professional_experience (alternative schema) experience = ( profile_data.get('experience') or profile_data.get('career_history') or data.get('professional_experience') or [] ) if experience: for i, exp in enumerate(experience): if not isinstance(exp, dict): continue exp_node = BNode() self.graph.add((exp_node, RDF.type, SCHEMA.WorkExperience)) if exp.get('title'): self.graph.add((exp_node, SCHEMA.roleName, Literal(exp['title']))) # Handle both 'company' (Schema A) and 'organization' (Schema B) company = exp.get('company') or exp.get('organization') if company: self.graph.add((exp_node, SCHEMA.name, Literal(company))) # Handle both date_range and duration fields, also construct from start_date/end_date duration = exp.get('date_range') or exp.get('duration') if not duration and exp.get('start_date'): end_date = exp.get('end_date', 'Present') duration = f"{exp['start_date']} - {end_date}" if duration: self.graph.add((exp_node, SCHEMA.temporal, Literal(duration))) # Handle location (normalize if dict) exp_location = exp.get('location') if exp_location: exp_location_str = normalize_location(exp_location) if isinstance(exp_location, dict) else str(exp_location) self.graph.add((exp_node, SCHEMA.workLocation, Literal(exp_location_str))) if exp.get('description'): self.graph.add((exp_node, SCHEMA.description, Literal(exp['description']))) self.graph.add((person_uri, SCHEMA.hasOccupation, exp_node)) # Education (handle null and both schema variants) # Schema A: profile_data.education, Schema B: root education education = profile_data.get('education') or data.get('education') or [] if education: for edu in education: if not isinstance(edu, dict): continue edu_node = BNode() self.graph.add((edu_node, RDF.type, SCHEMA.EducationalOccupationalCredential)) if edu.get('degree'): self.graph.add((edu_node, SCHEMA.name, Literal(edu['degree']))) if edu.get('institution'): self.graph.add((edu_node, SCHEMA.educationalCredentialAwarded, Literal(edu['institution']))) # Handle field_of_study (variant B) or field (variant A) field = edu.get('field_of_study') or edu.get('field') if field: self.graph.add((edu_node, SCHEMA.educationalProgramName, Literal(field))) # Handle date_range, years, or start_year/end_year date_range = edu.get('date_range') or edu.get('years') if not date_range and edu.get('start_year'): end_year = edu.get('end_year', '') date_range = f"{edu['start_year']} - {end_year}" if end_year else str(edu['start_year']) if date_range: self.graph.add((edu_node, SCHEMA.temporal, Literal(date_range))) self.graph.add((person_uri, SCHEMA.alumniOf, edu_node)) # Provenance extraction_date = extraction_meta.get('extraction_date') if extraction_date: self.graph.add((person_uri, PROV.generatedAtTime, Literal(extraction_date, datatype=XSD.dateTime))) extraction_method = extraction_meta.get('extraction_method') if extraction_method: self.graph.add((person_uri, PROV.wasGeneratedBy, Literal(extraction_method))) staff_id = extraction_meta.get('staff_id') if staff_id: self.graph.add((person_uri, DCTERMS.identifier, Literal(staff_id))) self.graph.add((person_uri, HC.staffId, Literal(staff_id))) self._persons_added += 1 return person_uri def add_persons_from_staff_list(self, data: dict, filepath: Path) -> list[URIRef]: """Convert staff list JSON to RDF triples for each person.""" uris = [] custodian_meta = data.get('custodian_metadata', {}) custodian_name = custodian_meta.get('custodian_name', '') custodian_slug = custodian_meta.get('custodian_slug', '') # Resolve custodian URI once for all staff custodian_uri = self._resolve_custodian_uri(custodian_name) or self._resolve_custodian_uri(custodian_slug) staff = data.get('staff', []) for person in staff: if not isinstance(person, dict): continue name = person.get('name') if not name: continue # Create person URI from linkedin_slug or staff_id linkedin_slug = person.get('linkedin_slug') staff_id = person.get('staff_id') if linkedin_slug: person_uri = HP[sanitize_for_uri(linkedin_slug)] elif staff_id: person_uri = HP[sanitize_for_uri(staff_id)] else: # Generate from name + custodian slug = sanitize_for_uri(f"{custodian_slug}_{name}") person_uri = HP[slug] # Type declarations self.graph.add((person_uri, RDF.type, SCHEMA.Person)) self.graph.add((person_uri, RDF.type, FOAF.Person)) self.graph.add((person_uri, RDF.type, PICO.PersonObservation)) # Name self.graph.add((person_uri, SCHEMA.name, Literal(name))) self.graph.add((person_uri, FOAF.name, Literal(name))) self.graph.add((person_uri, RDFS.label, Literal(name))) # LinkedIn URL linkedin_url = person.get('linkedin_profile_url') if linkedin_url: try: self.graph.add((person_uri, SCHEMA.sameAs, URIRef(linkedin_url))) except Exception: pass # Headline headline = person.get('headline') if headline: self.graph.add((person_uri, SCHEMA.jobTitle, Literal(headline))) # Heritage relevance heritage_relevant = person.get('heritage_relevant', True) self.graph.add((person_uri, HC.heritageRelevant, Literal(heritage_relevant, datatype=XSD.boolean))) heritage_type = person.get('heritage_type') if heritage_type: self.graph.add((person_uri, HC.heritageType, Literal(heritage_type))) # Link to custodian if custodian_uri: self.graph.add((person_uri, SCHEMA.worksFor, custodian_uri)) self.graph.add((person_uri, ORG.memberOf, custodian_uri)) if custodian_name: self.graph.add((person_uri, HC.custodianName, Literal(custodian_name))) # Staff ID if staff_id: self.graph.add((person_uri, HC.staffId, Literal(staff_id))) self.graph.add((person_uri, DCTERMS.identifier, Literal(staff_id))) uris.append(person_uri) self._persons_added += 1 return uris def serialize(self, format: str = "turtle") -> bytes: """Serialize graph to bytes.""" return self.graph.serialize(format=format).encode("utf-8") def triple_count(self) -> int: """Get number of triples in graph.""" return len(self.graph) def persons_added(self) -> int: """Get number of persons added.""" return self._persons_added CACHE_FILE = PROJECT_ROOT / "data" / ".custodian_lookup_cache.json" CACHE_MAX_AGE_HOURS = 24 def build_custodian_lookup(custodian_dir: Path = DEFAULT_CUSTODIAN_DIR, use_cache: bool = True) -> dict[str, str]: """ Build custodian lookup table using filename-based extraction. This is MUCH faster than YAML parsing (27k files in <1s vs 2+ minutes). Uses GHCID from filename directly - custodian YAMLs are named by GHCID. """ lookup = {} # Try cache first if use_cache and CACHE_FILE.exists(): try: cache_stat = CACHE_FILE.stat() cache_age_hours = (datetime.now().timestamp() - cache_stat.st_mtime) / 3600 if cache_age_hours < CACHE_MAX_AGE_HOURS: with open(CACHE_FILE, 'r') as f: cached = json.load(f) return cached.get('lookup', {}) except Exception: pass if not custodian_dir.exists(): return lookup # Fast extraction: GHCID from filename # Files are named like: NL-NH-AMS-M-RM.yaml or NL-NH-AMS-M-RM-rijksmuseum.yaml yaml_files = list(custodian_dir.glob("*.yaml")) for yaml_file in yaml_files: stem = yaml_file.stem # e.g., "NL-NH-AMS-M-RM" or "NL-NH-AMS-M-RM-rijksmuseum" # Extract base GHCID (before any name suffix after 5th hyphen) parts = stem.split('-') if len(parts) >= 5: # Standard GHCID: CC-RR-CCC-T-ABB base_ghcid = '-'.join(parts[:5]) # Add both full stem and base GHCID to lookup lookup[stem.lower()] = stem lookup[base_ghcid.lower()] = stem # If there's a name suffix, also index by that if len(parts) > 5: name_suffix = '-'.join(parts[5:]) lookup[name_suffix.lower()] = stem # Also with underscores replaced by spaces lookup[name_suffix.replace('_', ' ').lower()] = stem # Save cache if use_cache: try: CACHE_FILE.parent.mkdir(parents=True, exist_ok=True) with open(CACHE_FILE, 'w') as f: json.dump({ 'lookup': lookup, 'generated_at': datetime.now(timezone.utc).isoformat(), 'file_count': len(yaml_files) }, f) except Exception: pass return lookup class OxigraphPersonSyncer(BaseSyncer): """Sync person JSON files to Oxigraph triplestore.""" database_name = "oxigraph_persons" def __init__( self, endpoint: str = OXIGRAPH_URL, batch_size: int = BATCH_SIZE, entity_dir: Path = PERSON_ENTITY_DIR, affiliated_dir: Path = PERSON_AFFILIATED_DIR, **kwargs ): super().__init__(**kwargs) self.endpoint = endpoint.rstrip("/") self.batch_size = batch_size self.entity_dir = entity_dir self.affiliated_dir = affiliated_dir self.client = httpx.Client(timeout=120.0) # Build custodian lookup self.logger.info("Building custodian lookup table...") self.custodian_lookup = build_custodian_lookup() self.logger.info(f"Loaded {len(self.custodian_lookup)} custodian name mappings") self.converter = PersonRDFConverter(custodian_lookup=self.custodian_lookup) def check_connection(self) -> bool: """Check if Oxigraph is available.""" try: resp = self.client.post( f"{self.endpoint}/query", headers={"Content-Type": "application/sparql-query"}, content="ASK { ?s ?p ?o }" ) return resp.status_code in (200, 204) except Exception as e: self.logger.error(f"Oxigraph connection failed: {e}") return False def get_status(self) -> dict: """Get Oxigraph status for person data.""" try: # Count person triples specifically resp = self.client.post( f"{self.endpoint}/query", headers={ "Content-Type": "application/sparql-query", "Accept": "application/sparql-results+json" }, content=""" PREFIX schema: PREFIX hp: SELECT (COUNT(DISTINCT ?person) AS ?count) WHERE { ?person a schema:Person . FILTER(STRSTARTS(STR(?person), STR(hp:))) } """ ) if resp.status_code == 200: data = resp.json() count = int(data["results"]["bindings"][0]["count"]["value"]) return {"status": "healthy", "person_count": count} except Exception as e: return {"status": "unavailable", "error": str(e)} return {"status": "unknown"} def _upload_turtle(self, ttl_data: bytes) -> bool: """Upload Turtle data to Oxigraph.""" try: resp = self.client.post( f"{self.endpoint}/store?default", headers={"Content-Type": "text/turtle"}, content=ttl_data ) return resp.status_code in (200, 201, 204) except Exception as e: self.logger.error(f"Failed to upload data: {e}") return False def _list_entity_files(self) -> list[Path]: """List all person entity JSON files.""" if not self.entity_dir.exists(): self.logger.warning(f"Entity directory not found: {self.entity_dir}") return [] return sorted(self.entity_dir.glob("*.json")) def _list_staff_files(self) -> list[Path]: """List all staff list JSON files.""" if not self.affiliated_dir.exists(): self.logger.warning(f"Affiliated directory not found: {self.affiliated_dir}") return [] return sorted(self.affiliated_dir.glob("*_staff_*.json")) def sync( self, limit: Optional[int] = None, dry_run: bool = False, clear: bool = False, entity_only: bool = False, staff_only: bool = False, ) -> SyncResult: """Sync all person JSON files to Oxigraph.""" result = SyncResult( database="oxigraph_persons", status=SyncStatus.IN_PROGRESS, start_time=datetime.now(timezone.utc), ) # Check connection if not dry_run and not self.check_connection(): result.status = SyncStatus.FAILED result.error_message = f"Cannot connect to Oxigraph at {self.endpoint}" result.end_time = datetime.now(timezone.utc) return result # Gather files to process files_to_process = [] if not staff_only: entity_files = self._list_entity_files() files_to_process.extend([('entity', f) for f in entity_files]) self.logger.info(f"Found {len(entity_files)} entity files") if not entity_only: staff_files = self._list_staff_files() files_to_process.extend([('staff', f) for f in staff_files]) self.logger.info(f"Found {len(staff_files)} staff files") if limit: files_to_process = files_to_process[:limit] self.progress.total_files = len(files_to_process) self.progress.current_database = "oxigraph_persons" self.logger.info(f"Processing {len(files_to_process)} files...") # Clear existing person data (optional) if not dry_run and clear: self.logger.info("Note: Use SPARQL DELETE to clear person data if needed") # Process in batches total_triples = 0 total_persons = 0 processed = 0 failed = 0 for i in range(0, len(files_to_process), self.batch_size): batch = files_to_process[i:i + self.batch_size] batch_num = i // self.batch_size + 1 self.converter.reset() for file_type, filepath in batch: self.progress.processed_files = processed + 1 self.progress.current_file = filepath.name self._report_progress() try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) if not data: failed += 1 continue if file_type == 'entity': uri = self.converter.add_person_from_entity(data, filepath) if uri: processed += 1 else: self.logger.debug(f"No person extracted from entity: {filepath.name}") failed += 1 else: # staff uris = self.converter.add_persons_from_staff_list(data, filepath) # Empty staff list is not a failure - it's just a file with no persons # (e.g., LinkedIn company page with 0 visible employees) if uris is not None: # Explicitly check for None (error) vs [] (empty) processed += 1 else: self.logger.debug(f"Failed to process staff file: {filepath.name}") failed += 1 except json.JSONDecodeError as e: self.logger.warning(f"JSON decode error in {filepath.name}: {e}") failed += 1 self.progress.errors.append(f"{filepath.name}: JSON decode error") except Exception as e: self.logger.warning(f"Error processing {filepath.name}: {e}") failed += 1 self.progress.errors.append(f"{filepath.name}: {str(e)}") batch_triples = self.converter.triple_count() batch_persons = self.converter.persons_added() total_triples += batch_triples total_persons += batch_persons if not dry_run: try: ttl_data = self.converter.serialize("turtle") if self._upload_turtle(ttl_data): self.logger.info( f"Batch {batch_num}: loaded {len(batch)} files, " f"{batch_persons} persons, {batch_triples:,} triples" ) else: self.logger.error(f"Batch {batch_num}: failed to upload") failed += len(batch) except Exception as e: self.logger.error(f"Batch {batch_num}: serialization failed: {e}") failed += len(batch) else: self.logger.info( f"Batch {batch_num}: parsed {len(batch)} files, " f"{batch_persons} persons, {batch_triples:,} triples (dry-run)" ) result.records_processed = len(files_to_process) result.records_succeeded = processed result.records_failed = failed result.details["total_triples"] = total_triples result.details["total_persons"] = total_persons result.details["batch_size"] = self.batch_size result.details["dry_run"] = dry_run result.details["custodian_mappings"] = len(self.custodian_lookup) if failed == 0: result.status = SyncStatus.SUCCESS elif processed > 0: result.status = SyncStatus.PARTIAL else: result.status = SyncStatus.FAILED result.end_time = datetime.now(timezone.utc) return result def main(): parser = argparse.ArgumentParser(description="Sync person JSON files to Oxigraph") parser.add_argument("--dry-run", action="store_true", help="Parse files but don't upload") parser.add_argument("--limit", type=int, help="Limit number of files to process") parser.add_argument("--endpoint", default=OXIGRAPH_URL, help="Oxigraph SPARQL endpoint URL") parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Files per batch") parser.add_argument("--entity-only", action="store_true", help="Only process entity files") parser.add_argument("--staff-only", action="store_true", help="Only process staff files") parser.add_argument("--clear", action="store_true", help="Clear existing person data first") args = parser.parse_args() import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) syncer = OxigraphPersonSyncer(endpoint=args.endpoint, batch_size=args.batch_size) print("=" * 60) print("Oxigraph Person Sync") print("=" * 60) if not args.dry_run: print(f"Checking connection to {args.endpoint}...") status = syncer.get_status() print(f" Status: {status.get('status', 'unknown')}") if status.get('person_count'): print(f" Current persons: {status['person_count']:,}") print(f" Custodian mappings: {len(syncer.custodian_lookup):,}") result = syncer.sync( limit=args.limit, dry_run=args.dry_run, clear=args.clear, entity_only=args.entity_only, staff_only=args.staff_only, ) print("\n" + "=" * 60) print(f"Sync Result: {result.status.value.upper()}") print(f" Files processed: {result.records_processed}") print(f" Files succeeded: {result.records_succeeded}") print(f" Files failed: {result.records_failed}") print(f" Persons synced: {result.details.get('total_persons', 0):,}") print(f" Triples: {result.details.get('total_triples', 0):,}") print(f" Duration: {result.duration_seconds:.2f}s") if result.error_message: print(f" Error: {result.error_message}") print("=" * 60) if __name__ == "__main__": main()