""" LinkML Schema Loader for DSPy Heritage RAG Loads and parses LinkML schema files to provide schema-aware context for DSPy signatures and RAG pipeline components. The loader extracts: - Class definitions with descriptions and ontology mappings - Slot definitions with URIs and ranges - Enum values for controlled vocabularies - Prefix mappings for SPARQL generation This enables: 1. Dynamic schema context injection into DSPy signatures 2. Schema-validated entity extraction 3. Ontology-aligned SPARQL generation 4. Rich answer synthesis with correct ontology terms """ from __future__ import annotations import logging from dataclasses import dataclass, field from functools import lru_cache from pathlib import Path from typing import Any, Optional import yaml logger = logging.getLogger(__name__) # Default schema directory SCHEMA_BASE_DIR = Path(__file__).parent.parent.parent / "schemas" / "20251121" / "linkml" @dataclass class OntologyPrefix: """An ontology prefix mapping.""" prefix: str uri: str description: Optional[str] = None @dataclass class SlotDefinition: """A slot (property) definition from LinkML schema.""" name: str slot_uri: Optional[str] = None range: Optional[str] = None description: Optional[str] = None required: bool = False multivalued: bool = False exact_mappings: list[str] = field(default_factory=list) close_mappings: list[str] = field(default_factory=list) examples: list[dict] = field(default_factory=list) @dataclass class EnumValue: """A permissible value in an enum.""" name: str description: Optional[str] = None meaning: Optional[str] = None # Wikidata mapping comments: list[str] = field(default_factory=list) @dataclass class EnumDefinition: """An enum definition from LinkML schema.""" name: str description: Optional[str] = None values: list[EnumValue] = field(default_factory=list) @dataclass class ClassDefinition: """A class definition from LinkML schema.""" name: str class_uri: Optional[str] = None description: Optional[str] = None is_a: Optional[str] = None slots: list[str] = field(default_factory=list) exact_mappings: list[str] = field(default_factory=list) close_mappings: list[str] = field(default_factory=list) narrow_mappings: list[str] = field(default_factory=list) @dataclass class HeritageSchema: """Complete parsed heritage custodian schema.""" # Core schema metadata name: str version: str description: str # Ontology prefixes prefixes: dict[str, OntologyPrefix] = field(default_factory=dict) # Classes classes: dict[str, ClassDefinition] = field(default_factory=dict) # Slots (properties) slots: dict[str, SlotDefinition] = field(default_factory=dict) # Enums enums: dict[str, EnumDefinition] = field(default_factory=dict) # Custodian types (from CustodianPrimaryTypeEnum) custodian_types: list[EnumValue] = field(default_factory=list) def get_sparql_prefixes(self) -> str: """Generate SPARQL prefix declarations from schema prefixes.""" lines = [] for prefix, info in self.prefixes.items(): lines.append(f"PREFIX {prefix}: <{info.uri}>") return "\n".join(lines) def get_custodian_type_names(self) -> list[str]: """Get list of custodian type enum values.""" return [v.name for v in self.custodian_types] def get_class_description(self, class_name: str) -> Optional[str]: """Get description for a class.""" cls = self.classes.get(class_name) return cls.description if cls else None def get_slot_uri(self, slot_name: str) -> Optional[str]: """Get the slot URI for a slot name.""" slot = self.slots.get(slot_name) return slot.slot_uri if slot else None def format_entity_types_for_prompt(self) -> str: """Format custodian types for DSPy prompt injection.""" lines = ["Heritage Custodian Types (GLAMORCUBESFIXPHDNT taxonomy):"] for ct in self.custodian_types: desc = ct.description.split("(")[0].strip() if ct.description else ct.name lines.append(f" - {ct.name}: {desc}") return "\n".join(lines) def format_key_properties_for_prompt(self) -> str: """Format key properties for DSPy prompt injection.""" key_slots = [ "hc_id", "preferred_label", "custodian_type", "legal_status", "place_designation", "has_collection", "identifiers", "organizational_structure", "encompassing_body" ] lines = ["Key Properties:"] for slot_name in key_slots: slot = self.slots.get(slot_name) if slot: uri = slot.slot_uri or f"hc:{slot_name}" desc = (slot.description or "").split("\n")[0][:80] lines.append(f" - {uri}: {desc}") return "\n".join(lines) def format_ontology_context_for_prompt(self) -> str: """Format complete ontology context for DSPy prompts.""" sections = [ "=" * 60, "HERITAGE CUSTODIAN ONTOLOGY CONTEXT", "=" * 60, "", "Hub Architecture:", " - Custodian (crm:E39_Actor): Central hub entity", " - CustodianObservation: Evidence from sources", " - CustodianName: Standardized emic names", " - CustodianLegalStatus: Formal legal entity", " - CustodianPlace: Geographic location", " - CustodianCollection: Heritage collections", "", self.format_entity_types_for_prompt(), "", self.format_key_properties_for_prompt(), "", "Key Ontology Prefixes:", ] for prefix, info in list(self.prefixes.items())[:12]: # Top 12 prefixes sections.append(f" PREFIX {prefix}: <{info.uri}>") sections.extend([ "", "=" * 60, ]) return "\n".join(sections) class SchemaLoader: """ Loads and parses LinkML schema files for the Heritage Custodian Ontology. Usage: loader = SchemaLoader() schema = loader.load() # Get SPARQL prefixes prefixes = schema.get_sparql_prefixes() # Get custodian types for entity extraction types = schema.get_custodian_type_names() # Get prompt context context = schema.format_ontology_context_for_prompt() """ def __init__(self, schema_dir: Optional[Path] = None): """Initialize schema loader. Args: schema_dir: Path to LinkML schema directory. Defaults to schemas/20251121/linkml/ """ self.schema_dir = schema_dir or SCHEMA_BASE_DIR self._schema: Optional[HeritageSchema] = None def load(self, force_reload: bool = False) -> HeritageSchema: """Load and parse the complete schema. Args: force_reload: Force reload even if cached Returns: Parsed HeritageSchema object """ if self._schema is not None and not force_reload: return self._schema logger.info(f"Loading LinkML schema from {self.schema_dir}") # Load main schema file main_schema_path = self.schema_dir / "01_custodian_name_modular.yaml" if not main_schema_path.exists(): raise FileNotFoundError(f"Main schema not found: {main_schema_path}") with open(main_schema_path, "r", encoding="utf-8") as f: main_schema = yaml.safe_load(f) # Initialize schema object schema = HeritageSchema( name=main_schema.get("name", "heritage_custodian_ontology"), version=main_schema.get("version", "0.9.9"), description=main_schema.get("description", ""), ) # Load prefixes from Custodian class (has the most complete set) schema.prefixes = self._load_prefixes() # Load custodian types enum schema.custodian_types = self._load_custodian_types() schema.enums["CustodianPrimaryTypeEnum"] = EnumDefinition( name="CustodianPrimaryTypeEnum", description="GLAMORCUBESFIXPHDNT Primary Type Categories", values=schema.custodian_types, ) # Load key classes schema.classes = self._load_key_classes() # Load key slots schema.slots = self._load_key_slots() self._schema = schema logger.info(f"Loaded schema with {len(schema.classes)} classes, " f"{len(schema.slots)} slots, {len(schema.custodian_types)} custodian types") return schema def _load_prefixes(self) -> dict[str, OntologyPrefix]: """Load ontology prefixes from Custodian class file.""" prefixes = {} # Default prefixes from main schema and Custodian class default_prefixes = { "linkml": "https://w3id.org/linkml/", "hc": "https://nde.nl/ontology/hc/", "crm": "http://www.cidoc-crm.org/cidoc-crm/", "prov": "http://www.w3.org/ns/prov#", "schema": "http://schema.org/", "cpov": "http://data.europa.eu/m8g/", "rico": "https://www.ica.org/standards/RiC/ontology#", "foaf": "http://xmlns.com/foaf/0.1/", "tooi": "https://identifier.overheid.nl/tooi/def/ont/", "org": "http://www.w3.org/ns/org#", "skos": "http://www.w3.org/2004/02/skos/core#", "dcterms": "http://purl.org/dc/terms/", "dct": "http://purl.org/dc/terms/", "wdt": "http://www.wikidata.org/prop/direct/", "wikidata": "http://www.wikidata.org/entity/", "geo": "http://www.opengis.net/ont/geosparql#", "geof": "http://www.opengis.net/def/function/geosparql/", "ghcid": "https://w3id.org/heritage/custodian/", "sosa": "http://www.w3.org/ns/sosa/", } # Try to load from Custodian.yaml for additional prefixes custodian_path = self.schema_dir / "modules" / "classes" / "Custodian.yaml" if custodian_path.exists(): try: with open(custodian_path, "r", encoding="utf-8") as f: custodian_yaml = yaml.safe_load(f) if "prefixes" in custodian_yaml: default_prefixes.update(custodian_yaml["prefixes"]) except Exception as e: logger.warning(f"Could not load prefixes from Custodian.yaml: {e}") for prefix, uri in default_prefixes.items(): prefixes[prefix] = OntologyPrefix(prefix=prefix, uri=uri) return prefixes def _load_custodian_types(self) -> list[EnumValue]: """Load CustodianPrimaryTypeEnum values.""" enum_path = self.schema_dir / "modules" / "enums" / "CustodianPrimaryTypeEnum.yaml" if not enum_path.exists(): logger.warning(f"CustodianPrimaryTypeEnum not found: {enum_path}") return [] with open(enum_path, "r", encoding="utf-8") as f: enum_yaml = yaml.safe_load(f) values = [] enum_def = enum_yaml.get("enums", {}).get("CustodianPrimaryTypeEnum", {}) permissible_values = enum_def.get("permissible_values", {}) for name, info in permissible_values.items(): values.append(EnumValue( name=name, description=info.get("description"), meaning=info.get("meaning"), comments=info.get("comments", []), )) return values def _load_key_classes(self) -> dict[str, ClassDefinition]: """Load key class definitions.""" classes = {} # Key classes to load key_class_files = [ "Custodian.yaml", "CustodianName.yaml", "CustodianObservation.yaml", "CustodianLegalStatus.yaml", "CustodianPlace.yaml", "CustodianCollection.yaml", "Identifier.yaml", "TimeSpan.yaml", "OrganizationalStructure.yaml", "EncompassingBody.yaml", ] classes_dir = self.schema_dir / "modules" / "classes" for filename in key_class_files: filepath = classes_dir / filename if not filepath.exists(): continue try: with open(filepath, "r", encoding="utf-8") as f: class_yaml = yaml.safe_load(f) # Find class definition in the YAML class_defs = class_yaml.get("classes", {}) for class_name, class_info in class_defs.items(): classes[class_name] = ClassDefinition( name=class_name, class_uri=class_info.get("class_uri"), description=class_info.get("description"), is_a=class_info.get("is_a"), slots=class_info.get("slots", []), exact_mappings=class_info.get("exact_mappings", []), close_mappings=class_info.get("close_mappings", []), narrow_mappings=class_info.get("narrow_mappings", []), ) except Exception as e: logger.warning(f"Could not load class from {filepath}: {e}") return classes def _load_key_slots(self) -> dict[str, SlotDefinition]: """Load key slot definitions.""" slots = {} # Key slots to load key_slot_files = [ "hc_id.yaml", "preferred_label.yaml", "custodian_type.yaml", "legal_status.yaml", "place_designation.yaml", "has_collection.yaml", "identifiers.yaml", "organizational_structure.yaml", "encompassing_body.yaml", "identifier_scheme.yaml", "identifier_value.yaml", "observed_name.yaml", "emic_name.yaml", "valid_from.yaml", "valid_to.yaml", ] slots_dir = self.schema_dir / "modules" / "slots" for filename in key_slot_files: filepath = slots_dir / filename if not filepath.exists(): continue try: with open(filepath, "r", encoding="utf-8") as f: slot_yaml = yaml.safe_load(f) # Find slot definition in the YAML slot_defs = slot_yaml.get("slots", {}) for slot_name, slot_info in slot_defs.items(): slots[slot_name] = SlotDefinition( name=slot_name, slot_uri=slot_info.get("slot_uri"), range=slot_info.get("range"), description=slot_info.get("description"), required=slot_info.get("required", False), multivalued=slot_info.get("multivalued", False), exact_mappings=slot_info.get("exact_mappings", []), close_mappings=slot_info.get("close_mappings", []), examples=slot_info.get("examples", []), ) except Exception as e: logger.warning(f"Could not load slot from {filepath}: {e}") return slots # Singleton instance for easy access _schema_loader: Optional[SchemaLoader] = None def get_schema_loader() -> SchemaLoader: """Get singleton schema loader instance.""" global _schema_loader if _schema_loader is None: _schema_loader = SchemaLoader() return _schema_loader @lru_cache(maxsize=1) def get_heritage_schema() -> HeritageSchema: """Get cached heritage schema (loaded once).""" loader = get_schema_loader() return loader.load() # Convenience functions for common operations def get_sparql_prefixes() -> str: """Get SPARQL prefix declarations from schema.""" return get_heritage_schema().get_sparql_prefixes() def get_custodian_types() -> list[str]: """Get list of valid custodian type names.""" return get_heritage_schema().get_custodian_type_names() def get_ontology_context() -> str: """Get formatted ontology context for DSPy prompts.""" return get_heritage_schema().format_ontology_context_for_prompt() def get_entity_types_prompt() -> str: """Get formatted entity types for DSPy entity extraction.""" return get_heritage_schema().format_entity_types_for_prompt() def get_key_properties_prompt() -> str: """Get formatted key properties for DSPy prompts.""" return get_heritage_schema().format_key_properties_for_prompt() # ============================================================================= # Schema-Aware Signature Helpers # ============================================================================= def create_schema_aware_sparql_docstring() -> str: """Create docstring for SPARQL generator with schema-derived prefixes.""" schema = get_heritage_schema() # Build prefix section prefix_lines = [] for prefix, info in list(schema.prefixes.items())[:15]: # Top 15 prefix_lines.append(f" - PREFIX {prefix}: <{info.uri}>") # Build class section class_lines = [] for cls_name, cls_def in schema.classes.items(): uri = cls_def.class_uri or f"hc:{cls_name}" desc = (cls_def.description or "").split("\n")[0][:60] class_lines.append(f" - {uri} ({cls_name}): {desc}") # Build property section prop_lines = [] for slot_name, slot_def in list(schema.slots.items())[:10]: uri = slot_def.slot_uri or f"hc:{slot_name}" desc = (slot_def.description or "").split("\n")[0][:60] prop_lines.append(f" - {uri}: {desc}") docstring = f"""Generate SPARQL queries for heritage custodian knowledge graph. You are an expert in SPARQL and the Heritage Custodian Ontology (v{schema.version}). Generate valid SPARQL queries that work with our Oxigraph endpoint. Ontology Prefixes (MUST USE THESE EXACT URIs): {chr(10).join(prefix_lines)} Key Classes: {chr(10).join(class_lines[:8])} Key Properties: {chr(10).join(prop_lines)} Hub Architecture: - Custodian (crm:E39_Actor) is the central hub entity - CustodianObservation contains evidence from sources - CustodianName holds standardized emic names - CustodianLegalStatus holds formal legal entity info - CustodianPlace holds geographic location - CustodianCollection holds heritage collections """ return docstring def create_schema_aware_entity_docstring() -> str: """Create docstring for entity extractor with schema-derived types.""" schema = get_heritage_schema() type_lines = [] for ct in schema.custodian_types: # Extract first part of description desc = ct.description.split("(")[0].strip() if ct.description else ct.name type_lines.append(f" - {ct.name}: {desc}") docstring = f"""Extract heritage-specific entities from text. Identify institutions, places, dates, identifiers, and relationships following the Heritage Custodian Ontology (v{schema.version}). Institution Type Classification (GLAMORCUBESFIXPHDNT taxonomy): {chr(10).join(type_lines)} Entity Types to Extract: - INSTITUTIONS: Heritage custodians with type classification - PLACES: Geographic locations (cities, regions, countries) - TEMPORAL: Dates and time periods (founding, closure, events) - IDENTIFIERS: ISIL codes (NL-XXXX), Wikidata IDs (Q12345), GHCIDs Map institution mentions to appropriate GLAMORCUBESFIXPHDNT type: - "museum", "musea", "museo" → MUSEUM - "library", "bibliotheek", "bibliothek" → LIBRARY - "archive", "archief", "archiv" → ARCHIVE - "gallery", "galerie" → GALLERY - "university", "universiteit" → EDUCATION_PROVIDER - "botanical garden", "zoo" → BIO_CUSTODIAN - "church", "monastery", "temple" → HOLY_SACRED_SITE """ return docstring if __name__ == "__main__": # Test the schema loader logging.basicConfig(level=logging.INFO) schema = get_heritage_schema() print("\n=== SCHEMA LOADED ===") print(f"Name: {schema.name}") print(f"Version: {schema.version}") print(f"Classes: {len(schema.classes)}") print(f"Slots: {len(schema.slots)}") print(f"Custodian Types: {len(schema.custodian_types)}") print("\n=== SPARQL PREFIXES ===") print(schema.get_sparql_prefixes()) print("\n=== CUSTODIAN TYPES ===") for ct in schema.custodian_types[:5]: desc = ct.description[:60] if ct.description else "(no description)" print(f" - {ct.name}: {desc}...") print("\n=== ONTOLOGY CONTEXT (for DSPy) ===") print(schema.format_ontology_context_for_prompt()[:1000]) print("\n=== SCHEMA-AWARE SPARQL DOCSTRING ===") print(create_schema_aware_sparql_docstring()[:1500])