""" LinkML Schema Loader for DSPy Heritage RAG Loads and parses LinkML schema files to provide schema-aware context for DSPy signatures and RAG pipeline components. The loader extracts: - Class definitions with descriptions and ontology mappings - Slot definitions with URIs and ranges - Enum values for controlled vocabularies - Prefix mappings for SPARQL generation This enables: 1. Dynamic schema context injection into DSPy signatures 2. Schema-validated entity extraction 3. Ontology-aligned SPARQL generation 4. Rich answer synthesis with correct ontology terms """ from __future__ import annotations import logging from dataclasses import dataclass, field from functools import lru_cache from pathlib import Path from typing import Any, Optional import yaml logger = logging.getLogger(__name__) # Default schema directory SCHEMA_BASE_DIR = Path(__file__).parent.parent.parent / "schemas" / "20251121" / "linkml" @dataclass class OntologyPrefix: """An ontology prefix mapping.""" prefix: str uri: str description: Optional[str] = None @dataclass class SlotDefinition: """A slot (property) definition from LinkML schema.""" name: str slot_uri: Optional[str] = None range: Optional[str] = None description: Optional[str] = None required: bool = False multivalued: bool = False exact_mappings: list[str] = field(default_factory=list) close_mappings: list[str] = field(default_factory=list) examples: list[dict] = field(default_factory=list) @dataclass class EnumValue: """A permissible value in an enum.""" name: str description: Optional[str] = None meaning: Optional[str] = None # Wikidata mapping comments: list[str] = field(default_factory=list) @dataclass class EnumDefinition: """An enum definition from LinkML schema.""" name: str description: Optional[str] = None values: list[EnumValue] = field(default_factory=list) @dataclass class ClassDefinition: """A class definition from LinkML schema.""" name: str class_uri: Optional[str] = None description: Optional[str] = None is_a: Optional[str] = None slots: list[str] = field(default_factory=list) exact_mappings: list[str] = field(default_factory=list) close_mappings: list[str] = field(default_factory=list) narrow_mappings: list[str] = field(default_factory=list) @dataclass class StaffRoleDefinition: """A staff role class definition from LinkML schema. Represents an official job title/appellation in heritage institutions, categorized by role family (CURATORIAL, ARCHIVAL, DIGITAL, etc.). """ name: str category: str # CURATORIAL, ARCHIVAL, DIGITAL, etc. description: Optional[str] = None class_uri: Optional[str] = None common_variants: list[str] = field(default_factory=list) wikidata_mapping: Optional[str] = None # e.g., wikidata:Q674426 @dataclass class HeritageSchema: """Complete parsed heritage custodian schema.""" # Core schema metadata name: str version: str description: str # Ontology prefixes prefixes: dict[str, OntologyPrefix] = field(default_factory=dict) # Classes classes: dict[str, ClassDefinition] = field(default_factory=dict) # Slots (properties) slots: dict[str, SlotDefinition] = field(default_factory=dict) # Enums enums: dict[str, EnumDefinition] = field(default_factory=dict) # Custodian types (from CustodianPrimaryTypeEnum) custodian_types: list[EnumValue] = field(default_factory=list) # Staff roles organized by category (from StaffRoles.yaml) staff_roles: dict[str, list[StaffRoleDefinition]] = field(default_factory=dict) # Role categories (from RoleCategoryEnum in StaffRole.yaml) role_categories: list[EnumValue] = field(default_factory=list) def get_sparql_prefixes(self) -> str: """Generate SPARQL prefix declarations from schema prefixes.""" lines = [] for prefix, info in self.prefixes.items(): lines.append(f"PREFIX {prefix}: <{info.uri}>") return "\n".join(lines) def get_custodian_type_names(self) -> list[str]: """Get list of custodian type enum values.""" return [v.name for v in self.custodian_types] def get_staff_role_names(self) -> list[str]: """Get flat list of all staff role class names.""" roles = [] for category_roles in self.staff_roles.values(): roles.extend([r.name for r in category_roles]) return sorted(roles) def get_staff_role_category_names(self) -> list[str]: """Get list of staff role category names.""" return [v.name for v in self.role_categories] def get_staff_roles_by_category(self) -> dict[str, list[str]]: """Get staff role names organized by category.""" return { category: [r.name for r in roles] for category, roles in self.staff_roles.items() } def get_class_description(self, class_name: str) -> Optional[str]: """Get description for a class.""" cls = self.classes.get(class_name) return cls.description if cls else None def get_slot_uri(self, slot_name: str) -> Optional[str]: """Get the slot URI for a slot name.""" slot = self.slots.get(slot_name) return slot.slot_uri if slot else None def format_entity_types_for_prompt(self) -> str: """Format custodian types for DSPy prompt injection.""" lines = ["Heritage Custodian Types (GLAMORCUBESFIXPHDNT taxonomy):"] for ct in self.custodian_types: desc = ct.description.split("(")[0].strip() if ct.description else ct.name lines.append(f" - {ct.name}: {desc}") return "\n".join(lines) def format_key_properties_for_prompt(self) -> str: """Format key properties for DSPy prompt injection.""" key_slots = [ "hc_id", "preferred_label", "custodian_type", "legal_status", "place_designation", "has_collection", "identifiers", "organizational_structure", "encompassing_body" ] lines = ["Key Properties:"] for slot_name in key_slots: slot = self.slots.get(slot_name) if slot: uri = slot.slot_uri or f"hc:{slot_name}" desc = (slot.description or "").split("\n")[0][:80] lines.append(f" - {uri}: {desc}") return "\n".join(lines) def format_staff_role_categories_for_prompt(self) -> str: """Format staff role categories for DSPy prompt injection.""" lines = ["Staff Role Categories (13 categories):"] for rc in self.role_categories: desc = rc.description[:60] if rc.description else rc.name lines.append(f" - {rc.name}: {desc}") return "\n".join(lines) def format_staff_roles_for_prompt(self, max_per_category: int = 5) -> str: """Format staff roles for DSPy prompt injection. Args: max_per_category: Maximum roles to show per category (for brevity) """ lines = ["Staff Roles by Category:"] for category, roles in sorted(self.staff_roles.items()): role_names = [r.name for r in roles[:max_per_category]] if len(roles) > max_per_category: role_names.append(f"... +{len(roles) - max_per_category} more") lines.append(f" - {category}: {', '.join(role_names)}") return "\n".join(lines) def format_ontology_context_for_prompt(self) -> str: """Format complete ontology context for DSPy prompts.""" sections = [ "=" * 60, "HERITAGE CUSTODIAN ONTOLOGY CONTEXT", "=" * 60, "", "Hub Architecture:", " - Custodian (crm:E39_Actor): Central hub entity", " - CustodianObservation: Evidence from sources", " - CustodianName: Standardized emic names", " - CustodianLegalStatus: Formal legal entity", " - CustodianPlace: Geographic location", " - CustodianCollection: Heritage collections", "", self.format_entity_types_for_prompt(), "", self.format_key_properties_for_prompt(), "", ] # Add staff roles if loaded if self.role_categories: sections.extend([ self.format_staff_role_categories_for_prompt(), "", self.format_staff_roles_for_prompt(), "", ]) sections.append("Key Ontology Prefixes:") for prefix, info in list(self.prefixes.items())[:12]: # Top 12 prefixes sections.append(f" PREFIX {prefix}: <{info.uri}>") sections.extend([ "", "=" * 60, ]) return "\n".join(sections) class SchemaLoader: """ Loads and parses LinkML schema files for the Heritage Custodian Ontology. Usage: loader = SchemaLoader() schema = loader.load() # Get SPARQL prefixes prefixes = schema.get_sparql_prefixes() # Get custodian types for entity extraction types = schema.get_custodian_type_names() # Get prompt context context = schema.format_ontology_context_for_prompt() """ def __init__(self, schema_dir: Optional[Path] = None): """Initialize schema loader. Args: schema_dir: Path to LinkML schema directory. Defaults to schemas/20251121/linkml/ """ self.schema_dir = schema_dir or SCHEMA_BASE_DIR self._schema: Optional[HeritageSchema] = None def load(self, force_reload: bool = False) -> HeritageSchema: """Load and parse the complete schema. Args: force_reload: Force reload even if cached Returns: Parsed HeritageSchema object """ if self._schema is not None and not force_reload: return self._schema logger.info(f"Loading LinkML schema from {self.schema_dir}") # Load main schema file main_schema_path = self.schema_dir / "01_custodian_name_modular.yaml" if not main_schema_path.exists(): raise FileNotFoundError(f"Main schema not found: {main_schema_path}") with open(main_schema_path, "r", encoding="utf-8") as f: main_schema = yaml.safe_load(f) # Initialize schema object schema = HeritageSchema( name=main_schema.get("name", "heritage_custodian_ontology"), version=main_schema.get("version", "0.9.9"), description=main_schema.get("description", ""), ) # Load prefixes from Custodian class (has the most complete set) schema.prefixes = self._load_prefixes() # Load custodian types enum schema.custodian_types = self._load_custodian_types() schema.enums["CustodianPrimaryTypeEnum"] = EnumDefinition( name="CustodianPrimaryTypeEnum", description="GLAMORCUBESFIXPHDNT Primary Type Categories", values=schema.custodian_types, ) # Load key classes schema.classes = self._load_key_classes() # Load key slots schema.slots = self._load_key_slots() # Load staff role categories (RoleCategoryEnum) schema.role_categories = self._load_role_categories() schema.enums["RoleCategoryEnum"] = EnumDefinition( name="RoleCategoryEnum", description="Staff Role Categories", values=schema.role_categories, ) # Load staff roles organized by category schema.staff_roles = self._load_staff_roles() self._schema = schema logger.info(f"Loaded schema with {len(schema.classes)} classes, " f"{len(schema.slots)} slots, {len(schema.custodian_types)} custodian types, " f"{len(schema.role_categories)} role categories, " f"{sum(len(r) for r in schema.staff_roles.values())} staff roles") return schema def _load_prefixes(self) -> dict[str, OntologyPrefix]: """Load ontology prefixes from Custodian class file.""" prefixes = {} # Default prefixes from main schema and Custodian class default_prefixes = { "linkml": "https://w3id.org/linkml/", "hc": "https://nde.nl/ontology/hc/", "crm": "http://www.cidoc-crm.org/cidoc-crm/", "prov": "http://www.w3.org/ns/prov#", "schema": "http://schema.org/", "cpov": "http://data.europa.eu/m8g/", "rico": "https://www.ica.org/standards/RiC/ontology#", "foaf": "http://xmlns.com/foaf/0.1/", "tooi": "https://identifier.overheid.nl/tooi/def/ont/", "org": "http://www.w3.org/ns/org#", "skos": "http://www.w3.org/2004/02/skos/core#", "dcterms": "http://purl.org/dc/terms/", "dct": "http://purl.org/dc/terms/", "wdt": "http://www.wikidata.org/prop/direct/", "wikidata": "http://www.wikidata.org/entity/", "geo": "http://www.opengis.net/ont/geosparql#", "geof": "http://www.opengis.net/def/function/geosparql/", "ghcid": "https://w3id.org/heritage/custodian/", "sosa": "http://www.w3.org/ns/sosa/", } # Try to load from Custodian.yaml for additional prefixes custodian_path = self.schema_dir / "modules" / "classes" / "Custodian.yaml" if custodian_path.exists(): try: with open(custodian_path, "r", encoding="utf-8") as f: custodian_yaml = yaml.safe_load(f) if "prefixes" in custodian_yaml: default_prefixes.update(custodian_yaml["prefixes"]) except Exception as e: logger.warning(f"Could not load prefixes from Custodian.yaml: {e}") for prefix, uri in default_prefixes.items(): prefixes[prefix] = OntologyPrefix(prefix=prefix, uri=uri) return prefixes def _load_custodian_types(self) -> list[EnumValue]: """Load CustodianPrimaryTypeEnum values.""" enum_path = self.schema_dir / "modules" / "enums" / "CustodianPrimaryTypeEnum.yaml" if not enum_path.exists(): logger.warning(f"CustodianPrimaryTypeEnum not found: {enum_path}") return [] with open(enum_path, "r", encoding="utf-8") as f: enum_yaml = yaml.safe_load(f) values = [] enum_def = enum_yaml.get("enums", {}).get("CustodianPrimaryTypeEnum", {}) permissible_values = enum_def.get("permissible_values", {}) for name, info in permissible_values.items(): values.append(EnumValue( name=name, description=info.get("description"), meaning=info.get("meaning"), comments=info.get("comments", []), )) return values def _load_key_classes(self) -> dict[str, ClassDefinition]: """Load key class definitions.""" classes = {} # Key classes to load key_class_files = [ "Custodian.yaml", "CustodianName.yaml", "CustodianObservation.yaml", "CustodianLegalStatus.yaml", "CustodianPlace.yaml", "CustodianCollection.yaml", "Identifier.yaml", "TimeSpan.yaml", "OrganizationalStructure.yaml", "EncompassingBody.yaml", ] classes_dir = self.schema_dir / "modules" / "classes" for filename in key_class_files: filepath = classes_dir / filename if not filepath.exists(): continue try: with open(filepath, "r", encoding="utf-8") as f: class_yaml = yaml.safe_load(f) # Find class definition in the YAML class_defs = class_yaml.get("classes", {}) for class_name, class_info in class_defs.items(): classes[class_name] = ClassDefinition( name=class_name, class_uri=class_info.get("class_uri"), description=class_info.get("description"), is_a=class_info.get("is_a"), slots=class_info.get("slots", []), exact_mappings=class_info.get("exact_mappings", []), close_mappings=class_info.get("close_mappings", []), narrow_mappings=class_info.get("narrow_mappings", []), ) except Exception as e: logger.warning(f"Could not load class from {filepath}: {e}") return classes def _load_key_slots(self) -> dict[str, SlotDefinition]: """Load key slot definitions.""" slots = {} # Key slots to load key_slot_files = [ "hc_id.yaml", "preferred_label.yaml", "custodian_type.yaml", "legal_status.yaml", "place_designation.yaml", "has_collection.yaml", "identifiers.yaml", "organizational_structure.yaml", "encompassing_body.yaml", "identifier_scheme.yaml", "identifier_value.yaml", "observed_name.yaml", "emic_name.yaml", "valid_from.yaml", "valid_to.yaml", ] slots_dir = self.schema_dir / "modules" / "slots" for filename in key_slot_files: filepath = slots_dir / filename if not filepath.exists(): continue try: with open(filepath, "r", encoding="utf-8") as f: slot_yaml = yaml.safe_load(f) # Find slot definition in the YAML slot_defs = slot_yaml.get("slots", {}) for slot_name, slot_info in slot_defs.items(): slots[slot_name] = SlotDefinition( name=slot_name, slot_uri=slot_info.get("slot_uri"), range=slot_info.get("range"), description=slot_info.get("description"), required=slot_info.get("required", False), multivalued=slot_info.get("multivalued", False), exact_mappings=slot_info.get("exact_mappings", []), close_mappings=slot_info.get("close_mappings", []), examples=slot_info.get("examples", []), ) except Exception as e: logger.warning(f"Could not load slot from {filepath}: {e}") return slots def _load_role_categories(self) -> list[EnumValue]: """Load RoleCategoryEnum values from StaffRole.yaml.""" enum_path = self.schema_dir / "modules" / "classes" / "StaffRole.yaml" if not enum_path.exists(): logger.warning(f"StaffRole.yaml not found: {enum_path}") return [] try: with open(enum_path, "r", encoding="utf-8") as f: staff_role_yaml = yaml.safe_load(f) values = [] enum_def = staff_role_yaml.get("enums", {}).get("RoleCategoryEnum", {}) permissible_values = enum_def.get("permissible_values", {}) for name, info in permissible_values.items(): values.append(EnumValue( name=name, description=info.get("description") if info else None, )) logger.debug(f"Loaded {len(values)} role categories") return values except Exception as e: logger.warning(f"Could not load role categories: {e}") return [] def _load_staff_roles(self) -> dict[str, list[StaffRoleDefinition]]: """Load staff role classes organized by category from StaffRoles.yaml. Parses the slot_usage.role_category.ifabsent pattern to determine category. Example: ifabsent: "string(CURATORIAL)" -> category = "CURATORIAL" Returns: Dictionary mapping category name to list of StaffRoleDefinition """ import re roles_path = self.schema_dir / "modules" / "classes" / "StaffRoles.yaml" if not roles_path.exists(): logger.warning(f"StaffRoles.yaml not found: {roles_path}") return {} try: with open(roles_path, "r", encoding="utf-8") as f: roles_yaml = yaml.safe_load(f) roles_by_category: dict[str, list[StaffRoleDefinition]] = {} class_defs = roles_yaml.get("classes", {}) # Regex to extract category from ifabsent: "string(CURATORIAL)" ifabsent_pattern = re.compile(r'string\((\w+)\)') for class_name, class_info in class_defs.items(): if not class_info: continue # Extract category from slot_usage.role_category.ifabsent category = "UNKNOWN" slot_usage = class_info.get("slot_usage", {}) role_category = slot_usage.get("role_category", {}) ifabsent = role_category.get("ifabsent", "") match = ifabsent_pattern.search(ifabsent) if match: category = match.group(1) # Extract wikidata mapping from exact_mappings wikidata_mapping = None exact_mappings = class_info.get("exact_mappings", []) for mapping in exact_mappings: if mapping.startswith("wikidata:"): wikidata_mapping = mapping break # Create role definition role_def = StaffRoleDefinition( name=class_name, category=category, description=class_info.get("description"), class_uri=class_info.get("class_uri"), wikidata_mapping=wikidata_mapping, ) # Add to category if category not in roles_by_category: roles_by_category[category] = [] roles_by_category[category].append(role_def) total_roles = sum(len(r) for r in roles_by_category.values()) logger.debug(f"Loaded {total_roles} staff roles across {len(roles_by_category)} categories") return roles_by_category except Exception as e: logger.warning(f"Could not load staff roles: {e}") return {} # Singleton instance for easy access _schema_loader: Optional[SchemaLoader] = None def get_schema_loader() -> SchemaLoader: """Get singleton schema loader instance.""" global _schema_loader if _schema_loader is None: _schema_loader = SchemaLoader() return _schema_loader @lru_cache(maxsize=1) def get_heritage_schema() -> HeritageSchema: """Get cached heritage schema (loaded once).""" loader = get_schema_loader() return loader.load() # Convenience functions for common operations def get_sparql_prefixes() -> str: """Get SPARQL prefix declarations from schema.""" return get_heritage_schema().get_sparql_prefixes() def get_custodian_types() -> list[str]: """Get list of valid custodian type names.""" return get_heritage_schema().get_custodian_type_names() def get_ontology_context() -> str: """Get formatted ontology context for DSPy prompts.""" return get_heritage_schema().format_ontology_context_for_prompt() def get_entity_types_prompt() -> str: """Get formatted entity types for DSPy entity extraction.""" return get_heritage_schema().format_entity_types_for_prompt() def get_key_properties_prompt() -> str: """Get formatted key properties for DSPy prompts.""" return get_heritage_schema().format_key_properties_for_prompt() # Staff Role Convenience Functions def get_staff_role_categories() -> list[str]: """Get list of staff role category names (13 categories). Returns: List of role category names like ['CURATORIAL', 'ARCHIVAL', 'DIGITAL', ...] """ return get_heritage_schema().get_staff_role_category_names() def get_all_staff_roles() -> list[str]: """Get flat list of all staff role class names (64 roles). Returns: List of role names like ['Curator', 'Archivist', 'DataEngineer', ...] """ return get_heritage_schema().get_staff_role_names() def get_staff_role_classes() -> dict[str, list[str]]: """Get staff role names organized by category. Returns: Dictionary mapping category to list of role names. Example: {'CURATORIAL': ['Curator', 'CollectionsManager'], ...} """ return get_heritage_schema().get_staff_roles_by_category() def get_staff_roles_prompt() -> str: """Get formatted staff roles for DSPy prompts.""" return get_heritage_schema().format_staff_roles_for_prompt() def get_staff_role_categories_prompt() -> str: """Get formatted staff role categories for DSPy prompts.""" return get_heritage_schema().format_staff_role_categories_for_prompt() # ============================================================================= # Schema-Aware Signature Helpers # ============================================================================= def create_schema_aware_sparql_docstring() -> str: """Create docstring for SPARQL generator with schema-derived prefixes.""" schema = get_heritage_schema() # Build prefix section prefix_lines = [] for prefix, info in list(schema.prefixes.items())[:15]: # Top 15 prefix_lines.append(f" - PREFIX {prefix}: <{info.uri}>") # Build class section class_lines = [] for cls_name, cls_def in schema.classes.items(): uri = cls_def.class_uri or f"hc:{cls_name}" desc = (cls_def.description or "").split("\n")[0][:60] class_lines.append(f" - {uri} ({cls_name}): {desc}") # Build property section prop_lines = [] for slot_name, slot_def in list(schema.slots.items())[:10]: uri = slot_def.slot_uri or f"hc:{slot_name}" desc = (slot_def.description or "").split("\n")[0][:60] prop_lines.append(f" - {uri}: {desc}") docstring = f"""Generate SPARQL queries for heritage custodian knowledge graph. You are an expert in SPARQL and the Heritage Custodian Ontology (v{schema.version}). Generate valid SPARQL queries that work with our Oxigraph endpoint. Ontology Prefixes (MUST USE THESE EXACT URIs): {chr(10).join(prefix_lines)} Key Classes: {chr(10).join(class_lines[:8])} Key Properties: {chr(10).join(prop_lines)} Hub Architecture: - Custodian (crm:E39_Actor) is the central hub entity - CustodianObservation contains evidence from sources - CustodianName holds standardized emic names - CustodianLegalStatus holds formal legal entity info - CustodianPlace holds geographic location - CustodianCollection holds heritage collections """ return docstring def create_schema_aware_entity_docstring() -> str: """Create docstring for entity extractor with schema-derived types. Includes multilingual synonyms with language tags when ontology_mapping module is available, enabling better entity recognition across languages. """ schema = get_heritage_schema() type_lines = [] for ct in schema.custodian_types: # Extract first part of description desc = ct.description.split("(")[0].strip() if ct.description else ct.name type_lines.append(f" - {ct.name}: {desc}") # Build multilingual synonym section with language tags synonym_lines = [] try: # Import dynamically to avoid circular imports from backend.rag.ontology_mapping import get_ontology_mapper mapper = get_ontology_mapper() # Key types to include synonyms for key_types = [ "MUSEUM", "LIBRARY", "ARCHIVE", "GALLERY", "RESEARCH_CENTER", "EDUCATION_PROVIDER", "HOLY_SACRED_SITE", "BIO_CUSTODIAN", ] for custodian_type in key_types: by_lang = mapper.get_all_synonyms_by_language( custodian_type, "CustodianPrimaryTypeEnum" ) tagged_syns: list[str] = [] # Sort languages for consistent output for lang in sorted(by_lang.keys()): if lang == "all": # Skip the aggregate 'all' key continue syns = by_lang[lang] # Take up to 2 synonyms per language for syn in sorted(syns)[:2]: tagged_syns.append(f"{syn} ({lang})") if tagged_syns: # Limit to 6 total synonyms per type for brevity synonym_lines.append(f" - {custodian_type}: {', '.join(tagged_syns[:6])}") logger.debug(f"Built multilingual synonyms for {len(synonym_lines)} types") except ImportError: logger.warning("ontology_mapping not available, using static synonyms") # Fallback to static synonyms without language tags synonym_lines = [ ' - MUSEUM: "museum", "musea", "museo", "musée"', ' - LIBRARY: "library", "bibliotheek", "bibliothèque"', ' - ARCHIVE: "archive", "archief", "archiv"', ' - GALLERY: "gallery", "galerie"', ] except Exception as e: logger.warning(f"Could not build multilingual synonyms: {e}") synonym_lines = [] # Format synonym section if synonym_lines: synonym_section = f""" MULTILINGUAL SYNONYMS (term + language code): {chr(10).join(synonym_lines)} """ else: synonym_section = "" docstring = f"""Extract heritage-specific entities from text. Identify institutions, places, dates, identifiers, and relationships following the Heritage Custodian Ontology (v{schema.version}). Institution Type Classification (GLAMORCUBESFIXPHDNT taxonomy): {chr(10).join(type_lines)} Entity Types to Extract: - INSTITUTIONS: Heritage custodians with type classification - PLACES: Geographic locations (cities, regions, countries) - TEMPORAL: Dates and time periods (founding, closure, events) - IDENTIFIERS: ISIL codes (NL-XXXX), Wikidata IDs (Q12345), GHCIDs {synonym_section} When extracting institution types, recognize synonyms in ANY language and map them to the canonical GLAMORCUBESFIXPHDNT type. """ return docstring if __name__ == "__main__": # Test the schema loader logging.basicConfig(level=logging.INFO) schema = get_heritage_schema() print("\n=== SCHEMA LOADED ===") print(f"Name: {schema.name}") print(f"Version: {schema.version}") print(f"Classes: {len(schema.classes)}") print(f"Slots: {len(schema.slots)}") print(f"Custodian Types: {len(schema.custodian_types)}") print("\n=== SPARQL PREFIXES ===") print(schema.get_sparql_prefixes()) print("\n=== CUSTODIAN TYPES ===") for ct in schema.custodian_types[:5]: desc = ct.description[:60] if ct.description else "(no description)" print(f" - {ct.name}: {desc}...") print("\n=== ONTOLOGY CONTEXT (for DSPy) ===") print(schema.format_ontology_context_for_prompt()[:1000]) print("\n=== SCHEMA-AWARE SPARQL DOCSTRING ===") print(create_schema_aware_sparql_docstring()[:1500])