glam/backend/rag/schema_loader.py
kempersc 11983014bb Enhance specificity scoring system integration with existing infrastructure
- Updated documentation to clarify integration points with existing components in the RAG pipeline and DSPy framework.
- Added detailed mapping of SPARQL templates to context templates for improved specificity filtering.
- Implemented wrapper patterns around existing classifiers to extend functionality without duplication.
- Introduced new tests for the SpecificityAwareClassifier and SPARQLToContextMapper to ensure proper integration and functionality.
- Enhanced the CustodianRDFConverter to include ISO country and subregion codes from GHCID for better geospatial data handling.
2026-01-05 17:37:49 +01:00

1604 lines
58 KiB
Python

"""
LinkML Schema Loader for DSPy Heritage RAG
Loads and parses LinkML schema files to provide schema-aware context
for DSPy signatures and RAG pipeline components.
The loader extracts:
- Class definitions with descriptions and ontology mappings
- Slot definitions with URIs and ranges
- Enum values for controlled vocabularies
- Prefix mappings for SPARQL generation
This enables:
1. Dynamic schema context injection into DSPy signatures
2. Schema-validated entity extraction
3. Ontology-aligned SPARQL generation
4. Rich answer synthesis with correct ontology terms
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from functools import lru_cache
from pathlib import Path
from typing import Any, Optional
import yaml
logger = logging.getLogger(__name__)
# Default schema directory
SCHEMA_BASE_DIR = Path(__file__).parent.parent.parent / "schemas" / "20251121" / "linkml"
@dataclass
class OntologyPrefix:
    """A single ontology prefix-to-URI mapping (e.g. "crm" -> CIDOC-CRM namespace)."""

    # Short prefix used in CURIEs and SPARQL PREFIX declarations (e.g. "hc").
    prefix: str
    # Full namespace URI the prefix expands to.
    uri: str
    # Optional human-readable note; the default loader leaves this None.
    description: Optional[str] = None
@dataclass
class SlotDefinition:
    """A slot (property) definition from LinkML schema."""

    # Slot name as it appears under the YAML `slots:` key.
    name: str
    # CURIE/URI the slot maps to in RDF output (LinkML `slot_uri`).
    slot_uri: Optional[str] = None
    # LinkML range (target class or datatype). Named after the LinkML key, so it
    # deliberately mirrors — and shadows — the `range` builtin inside this class.
    range: Optional[str] = None
    description: Optional[str] = None
    required: bool = False
    multivalued: bool = False
    # skos-style cross-ontology mapping lists, copied verbatim from the YAML.
    exact_mappings: list[str] = field(default_factory=list)
    close_mappings: list[str] = field(default_factory=list)
    # Raw `examples` entries from the YAML definition (list of dicts).
    examples: list[dict] = field(default_factory=list)
@dataclass
class EnumValue:
    """A permissible value in an enum."""

    name: str
    description: Optional[str] = None
    # LinkML `meaning` — an external-concept CURIE for this value.
    meaning: Optional[str] = None  # Wikidata mapping
    comments: list[str] = field(default_factory=list)
@dataclass
class EnumDefinition:
    """An enum definition from LinkML schema."""

    name: str
    description: Optional[str] = None
    # Parsed permissible values, in YAML declaration order.
    values: list[EnumValue] = field(default_factory=list)
@dataclass
class SlotUsageEntry:
    """A slot_usage entry from a class definition.

    slot_usage provides class-specific overrides for slot definitions,
    including context-specific slot_uri mappings.
    """

    # Name of the slot being overridden within the owning class.
    slot_name: str
    # Class-specific URI override; None means "fall back to the global slot".
    slot_uri: Optional[str] = None
    description: Optional[str] = None
    range: Optional[str] = None
    # Tri-state overrides: None means "not overridden here", not "False".
    required: Optional[bool] = None
    multivalued: Optional[bool] = None
@dataclass
class ClassDefinition:
    """A class definition from LinkML schema."""

    name: str
    class_uri: Optional[str] = None
    description: Optional[str] = None
    # Parent class name (LinkML `is_a` inheritance), if any.
    is_a: Optional[str] = None
    # Names of slots attached to this class; resolve via HeritageSchema.slots.
    slots: list[str] = field(default_factory=list)
    # skos-style cross-ontology mapping lists, copied verbatim from the YAML.
    exact_mappings: list[str] = field(default_factory=list)
    close_mappings: list[str] = field(default_factory=list)
    narrow_mappings: list[str] = field(default_factory=list)
    # slot_usage provides class-specific slot overrides (including slot_uri)
    slot_usage: dict[str, SlotUsageEntry] = field(default_factory=dict)
@dataclass
class StaffRoleDefinition:
    """A staff role class definition from LinkML schema.

    Represents an official job title/appellation in heritage institutions,
    categorized by role family (CURATORIAL, ARCHIVAL, DIGITAL, etc.).
    """

    name: str
    # Role family parsed from slot_usage.role_category.ifabsent in the YAML;
    # "UNKNOWN" when no category pattern could be extracted.
    category: str  # CURATORIAL, ARCHIVAL, DIGITAL, etc.
    description: Optional[str] = None
    class_uri: Optional[str] = None
    # NOTE(review): not populated by SchemaLoader._load_staff_roles in this
    # module — confirm whether variants are filled elsewhere before relying on it.
    common_variants: list[str] = field(default_factory=list)
    wikidata_mapping: Optional[str] = None  # e.g., wikidata:Q674426
@dataclass
class HeritageSchema:
    """Complete parsed heritage custodian schema.

    Aggregates everything SchemaLoader extracts from the LinkML YAML files and
    provides lookup helpers plus `format_*` methods that render schema content
    as prompt text for DSPy signatures. For a given loaded schema the format_*
    output is static, which the OpenAI prompt-caching helpers below rely on —
    avoid introducing nondeterminism (e.g. unordered iteration) here.
    """

    # Core schema metadata
    name: str
    version: str
    description: str
    # Ontology prefixes
    prefixes: dict[str, OntologyPrefix] = field(default_factory=dict)
    # Classes
    classes: dict[str, ClassDefinition] = field(default_factory=dict)
    # Slots (properties)
    slots: dict[str, SlotDefinition] = field(default_factory=dict)
    # Enums
    enums: dict[str, EnumDefinition] = field(default_factory=dict)
    # Custodian types (from CustodianPrimaryTypeEnum)
    custodian_types: list[EnumValue] = field(default_factory=list)
    # Staff roles organized by category (from StaffRoles.yaml)
    staff_roles: dict[str, list[StaffRoleDefinition]] = field(default_factory=dict)
    # Role categories (from RoleCategoryEnum in StaffRole.yaml)
    role_categories: list[EnumValue] = field(default_factory=list)

    def get_sparql_prefixes(self) -> str:
        """Generate SPARQL prefix declarations from schema prefixes."""
        lines = []
        for prefix, info in self.prefixes.items():
            lines.append(f"PREFIX {prefix}: <{info.uri}>")
        return "\n".join(lines)

    def get_custodian_type_names(self) -> list[str]:
        """Get list of custodian type enum values."""
        return [v.name for v in self.custodian_types]

    def get_staff_role_names(self) -> list[str]:
        """Get flat list of all staff role class names (sorted alphabetically)."""
        roles = []
        for category_roles in self.staff_roles.values():
            roles.extend([r.name for r in category_roles])
        return sorted(roles)

    def get_staff_role_category_names(self) -> list[str]:
        """Get list of staff role category names."""
        return [v.name for v in self.role_categories]

    def get_staff_roles_by_category(self) -> dict[str, list[str]]:
        """Get staff role names organized by category."""
        return {
            category: [r.name for r in roles]
            for category, roles in self.staff_roles.items()
        }

    def get_class_description(self, class_name: str) -> Optional[str]:
        """Get description for a class, or None if the class is unknown."""
        cls = self.classes.get(class_name)
        return cls.description if cls else None

    def get_slot_uri(self, slot_name: str) -> Optional[str]:
        """Get the slot URI for a slot name, or None if the slot is unknown."""
        slot = self.slots.get(slot_name)
        return slot.slot_uri if slot else None

    def get_slot_uri_for_class(self, class_name: str, slot_name: str) -> Optional[str]:
        """Get the slot_uri for a slot within a specific class context.

        Priority:
        1. class.slot_usage[slot_name].slot_uri (context-specific override)
        2. slots[slot_name].slot_uri (global default)
        3. None if not found

        Args:
            class_name: Name of the class (e.g., "PersonObservation")
            slot_name: Name of the slot (e.g., "role_title")

        Returns:
            The slot_uri string or None if not found
        """
        # First, check class-specific slot_usage
        cls = self.classes.get(class_name)
        if cls and slot_name in cls.slot_usage:
            slot_entry = cls.slot_usage[slot_name]
            if slot_entry.slot_uri:
                return slot_entry.slot_uri
        # Fall back to global slot definition
        return self.get_slot_uri(slot_name)

    def format_entity_types_for_prompt(self) -> str:
        """Format custodian types for DSPy prompt injection."""
        lines = ["Heritage Custodian Types (GLAMORCUBESFIXPHDNT taxonomy):"]
        for ct in self.custodian_types:
            # Keep only the text before the first "(" — descriptions carry
            # parenthesized detail that would bloat the prompt.
            desc = ct.description.split("(")[0].strip() if ct.description else ct.name
            lines.append(f" - {ct.name}: {desc}")
        return "\n".join(lines)

    def format_key_properties_for_prompt(self) -> str:
        """Format key properties for DSPy prompt injection."""
        # Curated subset of custodian slots worth surfacing to the LLM.
        key_slots = [
            "hc_id", "preferred_label", "custodian_type", "legal_status",
            "place_designation", "has_collection", "identifiers",
            "organizational_structure", "encompassing_body"
        ]
        lines = ["Key Properties:"]
        for slot_name in key_slots:
            slot = self.slots.get(slot_name)
            if slot:
                # Default to an hc: CURIE when the schema gives no slot_uri.
                uri = slot.slot_uri or f"hc:{slot_name}"
                # First description line only, clipped to 80 chars for brevity.
                desc = (slot.description or "").split("\n")[0][:80]
                lines.append(f" - {uri}: {desc}")
        return "\n".join(lines)

    def format_person_properties_for_prompt(self) -> str:
        """Format person properties for DSPy prompt injection.

        Uses slot_usage from PersonObservation class to get context-specific
        slot_uri mappings for person-related queries.
        """
        key_slots = [
            "person_name", "role_title", "staff_role", "unit_affiliation",
            "contact_email", "expertise_areas", "birth_date", "refers_to_person",
            "role_start_date", "role_end_date", "observation_source"
        ]
        lines = ["Person Properties (from PersonObservation):"]
        for slot_name in key_slots:
            uri = self.get_slot_uri_for_class("PersonObservation", slot_name)
            if uri:
                # Get description from slot_usage first, then the global slot.
                desc = ""
                cls = self.classes.get("PersonObservation")
                if cls and slot_name in cls.slot_usage:
                    desc = cls.slot_usage[slot_name].description or ""
                if not desc:
                    slot = self.slots.get(slot_name)
                    if slot:
                        desc = (slot.description or "").split("\n")[0][:60]
                lines.append(f" - {uri}: {slot_name} - {desc}")
            else:
                # Fallback for slots not yet in schema
                lines.append(f" - hc:{slot_name}: {slot_name}")
        return "\n".join(lines)

    def format_staff_role_categories_for_prompt(self) -> str:
        """Format staff role categories for DSPy prompt injection."""
        # NOTE(review): "13" is hard-coded; the actual count depends on what
        # the loader found in RoleCategoryEnum — confirm they stay in sync.
        lines = ["Staff Role Categories (13 categories):"]
        for rc in self.role_categories:
            desc = rc.description[:60] if rc.description else rc.name
            lines.append(f" - {rc.name}: {desc}")
        return "\n".join(lines)

    def format_staff_roles_for_prompt(self, max_per_category: int = 5) -> str:
        """Format staff roles for DSPy prompt injection.

        Args:
            max_per_category: Maximum roles to show per category (for brevity)
        """
        lines = ["Staff Roles by Category:"]
        # Sorted for deterministic prompt text (important for prompt caching).
        for category, roles in sorted(self.staff_roles.items()):
            role_names = [r.name for r in roles[:max_per_category]]
            if len(roles) > max_per_category:
                role_names.append(f"... +{len(roles) - max_per_category} more")
            lines.append(f" - {category}: {', '.join(role_names)}")
        return "\n".join(lines)

    def format_ontology_context_for_prompt(self) -> str:
        """Format complete ontology context for DSPy prompts."""
        sections = [
            "=" * 60,
            "HERITAGE CUSTODIAN ONTOLOGY CONTEXT",
            "=" * 60,
            "",
            "Hub Architecture:",
            " - Custodian (crm:E39_Actor): Central hub entity",
            " - CustodianObservation: Evidence from sources",
            " - CustodianName: Standardized emic names",
            " - CustodianLegalStatus: Formal legal entity",
            " - CustodianPlace: Geographic location",
            " - CustodianCollection: Heritage collections",
            "",
            self.format_entity_types_for_prompt(),
            "",
            self.format_key_properties_for_prompt(),
            "",
        ]
        # Add staff roles if loaded
        if self.role_categories:
            sections.extend([
                self.format_staff_role_categories_for_prompt(),
                "",
                self.format_staff_roles_for_prompt(),
                "",
            ])
        sections.append("Key Ontology Prefixes:")
        for prefix, info in list(self.prefixes.items())[:12]:  # Top 12 prefixes
            sections.append(f" PREFIX {prefix}: <{info.uri}>")
        sections.extend([
            "",
            "=" * 60,
        ])
        return "\n".join(sections)
class SchemaLoader:
    """
    Loads and parses LinkML schema files for the Heritage Custodian Ontology.
    Usage:
        loader = SchemaLoader()
        schema = loader.load()
        # Get SPARQL prefixes
        prefixes = schema.get_sparql_prefixes()
        # Get custodian types for entity extraction
        types = schema.get_custodian_type_names()
        # Get prompt context
        context = schema.format_ontology_context_for_prompt()
    """

    def __init__(self, schema_dir: Optional[Path] = None):
        """Initialize schema loader.

        Args:
            schema_dir: Path to LinkML schema directory. Defaults to
                schemas/20251121/linkml/
        """
        self.schema_dir = schema_dir or SCHEMA_BASE_DIR
        # Cached parse result; populated on first successful load().
        self._schema: Optional[HeritageSchema] = None

    def load(self, force_reload: bool = False) -> HeritageSchema:
        """Load and parse the complete schema.

        Args:
            force_reload: Force reload even if cached

        Returns:
            Parsed HeritageSchema object

        Raises:
            FileNotFoundError: If the main schema file is missing.
        """
        if self._schema is not None and not force_reload:
            return self._schema
        logger.info(f"Loading LinkML schema from {self.schema_dir}")
        # Load main schema file
        main_schema_path = self.schema_dir / "01_custodian_name_modular.yaml"
        if not main_schema_path.exists():
            raise FileNotFoundError(f"Main schema not found: {main_schema_path}")
        with open(main_schema_path, "r", encoding="utf-8") as f:
            # safe_load returns None for an empty document; normalize to {}.
            main_schema = yaml.safe_load(f) or {}
        # Initialize schema object with metadata (fallbacks for missing keys)
        schema = HeritageSchema(
            name=main_schema.get("name", "heritage_custodian_ontology"),
            version=main_schema.get("version", "0.9.9"),
            description=main_schema.get("description", ""),
        )
        # Load prefixes from Custodian class (has the most complete set)
        schema.prefixes = self._load_prefixes()
        # Load custodian types enum
        schema.custodian_types = self._load_custodian_types()
        schema.enums["CustodianPrimaryTypeEnum"] = EnumDefinition(
            name="CustodianPrimaryTypeEnum",
            description="GLAMORCUBESFIXPHDNT Primary Type Categories",
            values=schema.custodian_types,
        )
        # Load key classes
        schema.classes = self._load_key_classes()
        # Load key slots
        schema.slots = self._load_key_slots()
        # Load staff role categories (RoleCategoryEnum)
        schema.role_categories = self._load_role_categories()
        schema.enums["RoleCategoryEnum"] = EnumDefinition(
            name="RoleCategoryEnum",
            description="Staff Role Categories",
            values=schema.role_categories,
        )
        # Load staff roles organized by category
        schema.staff_roles = self._load_staff_roles()
        self._schema = schema
        logger.info(f"Loaded schema with {len(schema.classes)} classes, "
                    f"{len(schema.slots)} slots, {len(schema.custodian_types)} custodian types, "
                    f"{len(schema.role_categories)} role categories, "
                    f"{sum(len(r) for r in schema.staff_roles.values())} staff roles")
        return schema

    def _load_prefixes(self) -> dict[str, OntologyPrefix]:
        """Load ontology prefixes from Custodian class file.

        Starts from a built-in default set and overlays any `prefixes:` block
        found in Custodian.yaml, so the schema files win on conflict.
        """
        prefixes = {}
        # Default prefixes from main schema and Custodian class
        default_prefixes = {
            "linkml": "https://w3id.org/linkml/",
            "hc": "https://nde.nl/ontology/hc/",
            "crm": "http://www.cidoc-crm.org/cidoc-crm/",
            "prov": "http://www.w3.org/ns/prov#",
            "schema": "http://schema.org/",
            "cpov": "http://data.europa.eu/m8g/",
            "rico": "https://www.ica.org/standards/RiC/ontology#",
            "foaf": "http://xmlns.com/foaf/0.1/",
            "tooi": "https://identifier.overheid.nl/tooi/def/ont/",
            "org": "http://www.w3.org/ns/org#",
            "skos": "http://www.w3.org/2004/02/skos/core#",
            "dcterms": "http://purl.org/dc/terms/",
            "dct": "http://purl.org/dc/terms/",
            "wdt": "http://www.wikidata.org/prop/direct/",
            "wikidata": "http://www.wikidata.org/entity/",
            "geo": "http://www.opengis.net/ont/geosparql#",
            "geof": "http://www.opengis.net/def/function/geosparql/",
            "ghcid": "https://nde.nl/ontology/hc/",
            "sosa": "http://www.w3.org/ns/sosa/",
        }
        # Try to load from Custodian.yaml for additional prefixes
        custodian_path = self.schema_dir / "modules" / "classes" / "Custodian.yaml"
        if custodian_path.exists():
            try:
                with open(custodian_path, "r", encoding="utf-8") as f:
                    custodian_yaml = yaml.safe_load(f)
                # Guard: safe_load yields None for an empty file.
                if custodian_yaml and "prefixes" in custodian_yaml:
                    default_prefixes.update(custodian_yaml["prefixes"])
            except Exception as e:
                # Best-effort: defaults above are enough to keep working.
                logger.warning(f"Could not load prefixes from Custodian.yaml: {e}")
        for prefix, uri in default_prefixes.items():
            prefixes[prefix] = OntologyPrefix(prefix=prefix, uri=uri)
        return prefixes

    def _load_custodian_types(self) -> list[EnumValue]:
        """Load CustodianPrimaryTypeEnum values.

        Returns an empty list (with a warning) if the enum file is missing.
        """
        enum_path = self.schema_dir / "modules" / "enums" / "CustodianPrimaryTypeEnum.yaml"
        if not enum_path.exists():
            logger.warning(f"CustodianPrimaryTypeEnum not found: {enum_path}")
            return []
        with open(enum_path, "r", encoding="utf-8") as f:
            enum_yaml = yaml.safe_load(f) or {}
        values = []
        enum_def = enum_yaml.get("enums", {}).get("CustodianPrimaryTypeEnum", {})
        permissible_values = enum_def.get("permissible_values", {})
        for name, info in permissible_values.items():
            # A value declared as a bare key (`MUSEUM:`) parses to None; treat
            # it as an empty mapping instead of raising AttributeError
            # (mirrors the guard in _load_role_categories).
            info = info or {}
            values.append(EnumValue(
                name=name,
                description=info.get("description"),
                meaning=info.get("meaning"),
                comments=info.get("comments", []),
            ))
        return values

    def _load_key_classes(self) -> dict[str, ClassDefinition]:
        """Load key class definitions.

        Missing files are skipped silently; malformed files are skipped with a
        warning so one bad module does not abort the whole load.
        """
        classes = {}
        # Key classes to load
        key_class_files = [
            # Custodian classes (hub architecture)
            "Custodian.yaml",
            "CustodianName.yaml",
            "CustodianObservation.yaml",
            "CustodianLegalStatus.yaml",
            "CustodianPlace.yaml",
            "CustodianCollection.yaml",
            # Person classes (hub architecture for people)
            "Person.yaml",
            "PersonObservation.yaml",
            "PersonName.yaml",
            "PersonWebClaim.yaml",
            "PersonConnection.yaml",
            "StaffRole.yaml",
            "StaffRoles.yaml",
            # Supporting classes
            "Identifier.yaml",
            "TimeSpan.yaml",
            "OrganizationalStructure.yaml",
            "EncompassingBody.yaml",
            "Event.yaml",
            "WebClaim.yaml",
        ]
        classes_dir = self.schema_dir / "modules" / "classes"
        for filename in key_class_files:
            filepath = classes_dir / filename
            if not filepath.exists():
                continue
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    class_yaml = yaml.safe_load(f) or {}
                # Find class definition(s) in the YAML
                class_defs = class_yaml.get("classes") or {}
                for class_name, class_info in class_defs.items():
                    # Parse slot_usage section for context-specific slot overrides
                    slot_usage_raw = class_info.get("slot_usage") or {}
                    slot_usage = {}
                    for slot_name, slot_info in slot_usage_raw.items():
                        # Bare keys parse to None; normalize to empty mapping.
                        if slot_info is None:
                            slot_info = {}
                        slot_usage[slot_name] = SlotUsageEntry(
                            slot_name=slot_name,
                            slot_uri=slot_info.get("slot_uri"),
                            description=slot_info.get("description"),
                            range=slot_info.get("range"),
                            required=slot_info.get("required"),
                            multivalued=slot_info.get("multivalued"),
                        )
                    classes[class_name] = ClassDefinition(
                        name=class_name,
                        class_uri=class_info.get("class_uri"),
                        description=class_info.get("description"),
                        is_a=class_info.get("is_a"),
                        slots=class_info.get("slots", []),
                        exact_mappings=class_info.get("exact_mappings", []),
                        close_mappings=class_info.get("close_mappings", []),
                        narrow_mappings=class_info.get("narrow_mappings", []),
                        slot_usage=slot_usage,
                    )
            except Exception as e:
                logger.warning(f"Could not load class from {filepath}: {e}")
        return classes

    def _load_key_slots(self) -> dict[str, SlotDefinition]:
        """Load key slot definitions.

        Missing files are skipped silently; malformed files are skipped with a
        warning so one bad module does not abort the whole load.
        """
        slots = {}
        # Key slots to load
        key_slot_files = [
            # Custodian slots
            "hc_id.yaml",
            "preferred_label.yaml",
            "custodian_type.yaml",
            "legal_status.yaml",
            "place_designation.yaml",
            "has_collection.yaml",
            "identifiers.yaml",
            "organizational_structure.yaml",
            "encompassing_body.yaml",
            "identifier_scheme.yaml",
            "identifier_value.yaml",
            "observed_name.yaml",
            "emic_name.yaml",
            "valid_from.yaml",
            "valid_to.yaml",
            # Person-related slots
            "person_name.yaml",
            "has_person_name.yaml",
            "role_title.yaml",
            "staff_role.yaml",
            "unit_affiliation.yaml",
            "contact_email.yaml",
            "expertise_areas.yaml",
            "birth_date.yaml",
            "role_start_date.yaml",
            "role_end_date.yaml",
            "refers_to_person.yaml",
            "refers_to_custodian.yaml",
            "participated_in_events.yaml",
        ]
        slots_dir = self.schema_dir / "modules" / "slots"
        for filename in key_slot_files:
            filepath = slots_dir / filename
            if not filepath.exists():
                continue
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    slot_yaml = yaml.safe_load(f) or {}
                # Find slot definition(s) in the YAML
                slot_defs = slot_yaml.get("slots") or {}
                for slot_name, slot_info in slot_defs.items():
                    # Bare keys parse to None; normalize to empty mapping.
                    slot_info = slot_info or {}
                    slots[slot_name] = SlotDefinition(
                        name=slot_name,
                        slot_uri=slot_info.get("slot_uri"),
                        range=slot_info.get("range"),
                        description=slot_info.get("description"),
                        required=slot_info.get("required", False),
                        multivalued=slot_info.get("multivalued", False),
                        exact_mappings=slot_info.get("exact_mappings", []),
                        close_mappings=slot_info.get("close_mappings", []),
                        examples=slot_info.get("examples", []),
                    )
            except Exception as e:
                logger.warning(f"Could not load slot from {filepath}: {e}")
        return slots

    def _load_role_categories(self) -> list[EnumValue]:
        """Load RoleCategoryEnum values from StaffRole.yaml."""
        enum_path = self.schema_dir / "modules" / "classes" / "StaffRole.yaml"
        if not enum_path.exists():
            logger.warning(f"StaffRole.yaml not found: {enum_path}")
            return []
        try:
            with open(enum_path, "r", encoding="utf-8") as f:
                staff_role_yaml = yaml.safe_load(f) or {}
            values = []
            enum_def = staff_role_yaml.get("enums", {}).get("RoleCategoryEnum", {})
            permissible_values = enum_def.get("permissible_values", {})
            for name, info in permissible_values.items():
                values.append(EnumValue(
                    name=name,
                    description=info.get("description") if info else None,
                ))
            logger.debug(f"Loaded {len(values)} role categories")
            return values
        except Exception as e:
            logger.warning(f"Could not load role categories: {e}")
            return []

    def _load_staff_roles(self) -> dict[str, list[StaffRoleDefinition]]:
        """Load staff role classes organized by category from StaffRoles.yaml.

        Parses the slot_usage.role_category.ifabsent pattern to determine category.
        Example: ifabsent: "string(CURATORIAL)" -> category = "CURATORIAL"

        Returns:
            Dictionary mapping category name to list of StaffRoleDefinition
        """
        import re
        roles_path = self.schema_dir / "modules" / "classes" / "StaffRoles.yaml"
        if not roles_path.exists():
            logger.warning(f"StaffRoles.yaml not found: {roles_path}")
            return {}
        try:
            with open(roles_path, "r", encoding="utf-8") as f:
                roles_yaml = yaml.safe_load(f) or {}
            roles_by_category: dict[str, list[StaffRoleDefinition]] = {}
            class_defs = roles_yaml.get("classes") or {}
            # Regex to extract category from ifabsent: "string(CURATORIAL)"
            ifabsent_pattern = re.compile(r'string\((\w+)\)')
            for class_name, class_info in class_defs.items():
                if not class_info:
                    continue
                # Extract category from slot_usage.role_category.ifabsent.
                # Each lookup is guarded with `or {}` so a null-valued YAML key
                # (e.g. `slot_usage:` with no body) degrades to UNKNOWN instead
                # of raising and discarding ALL roles via the except below.
                category = "UNKNOWN"
                slot_usage = class_info.get("slot_usage") or {}
                role_category = slot_usage.get("role_category") or {}
                ifabsent = role_category.get("ifabsent") or ""
                match = ifabsent_pattern.search(ifabsent)
                if match:
                    category = match.group(1)
                # Extract wikidata mapping from exact_mappings
                wikidata_mapping = None
                for mapping in class_info.get("exact_mappings") or []:
                    if mapping.startswith("wikidata:"):
                        wikidata_mapping = mapping
                        break
                # Create role definition
                role_def = StaffRoleDefinition(
                    name=class_name,
                    category=category,
                    description=class_info.get("description"),
                    class_uri=class_info.get("class_uri"),
                    wikidata_mapping=wikidata_mapping,
                )
                # Add to category
                roles_by_category.setdefault(category, []).append(role_def)
            total_roles = sum(len(r) for r in roles_by_category.values())
            logger.debug(f"Loaded {total_roles} staff roles across {len(roles_by_category)} categories")
            return roles_by_category
        except Exception as e:
            logger.warning(f"Could not load staff roles: {e}")
            return {}
# Singleton instance for easy access
_schema_loader: Optional[SchemaLoader] = None


def get_schema_loader() -> SchemaLoader:
    """Return the process-wide SchemaLoader, creating it on first use."""
    global _schema_loader
    loader = _schema_loader
    if loader is None:
        loader = SchemaLoader()
        _schema_loader = loader
    return loader
@lru_cache(maxsize=1)
def get_heritage_schema() -> HeritageSchema:
    """Return the parsed heritage schema, loading it only on the first call."""
    return get_schema_loader().load()
# Convenience functions for common operations
def get_sparql_prefixes() -> str:
    """Return SPARQL PREFIX declarations derived from the loaded schema."""
    schema = get_heritage_schema()
    return schema.get_sparql_prefixes()
def get_custodian_types() -> list[str]:
    """Return the valid custodian type names from the loaded schema."""
    schema = get_heritage_schema()
    return schema.get_custodian_type_names()
def get_ontology_context() -> str:
    """Return the full ontology context block formatted for DSPy prompts."""
    schema = get_heritage_schema()
    return schema.format_ontology_context_for_prompt()
def get_entity_types_prompt() -> str:
    """Return the custodian-type listing formatted for DSPy entity extraction."""
    schema = get_heritage_schema()
    return schema.format_entity_types_for_prompt()
def get_key_properties_prompt() -> str:
    """Return the key-property listing formatted for DSPy prompts."""
    schema = get_heritage_schema()
    return schema.format_key_properties_for_prompt()
# Staff Role Convenience Functions
def get_staff_role_categories() -> list[str]:
    """Return the staff role category names from the loaded schema.

    Returns:
        Category names such as ['CURATORIAL', 'ARCHIVAL', 'DIGITAL', ...]
    """
    schema = get_heritage_schema()
    return schema.get_staff_role_category_names()
def get_all_staff_roles() -> list[str]:
    """Return every staff role class name as one flat, sorted list.

    Returns:
        Role names such as ['Archivist', 'Curator', 'DataEngineer', ...]
    """
    schema = get_heritage_schema()
    return schema.get_staff_role_names()
def get_staff_role_classes() -> dict[str, list[str]]:
    """Return staff role names grouped by their role category.

    Returns:
        Mapping of category to role names, e.g.
        {'CURATORIAL': ['Curator', 'CollectionsManager'], ...}
    """
    schema = get_heritage_schema()
    return schema.get_staff_roles_by_category()
def get_staff_roles_prompt() -> str:
    """Return the staff-role listing formatted for DSPy prompts."""
    schema = get_heritage_schema()
    return schema.format_staff_roles_for_prompt()
def get_staff_role_categories_prompt() -> str:
    """Return the staff-role category listing formatted for DSPy prompts."""
    schema = get_heritage_schema()
    return schema.format_staff_role_categories_for_prompt()
# =============================================================================
# Schema-Aware Signature Helpers
# =============================================================================
def create_schema_aware_sparql_docstring() -> str:
    """Compose the SPARQL-generator prompt docstring from the live schema.

    Prefixes, classes and properties are pulled from the parsed schema so the
    prompt always matches the ontology version actually loaded.
    """
    schema = get_heritage_schema()
    nl = chr(10)

    def head(text: Optional[str], width: int) -> str:
        # First line of a possibly-missing description, clipped to `width`.
        return (text or "").split("\n")[0][:width]

    # Prefix bullet lines (top 15 prefixes).
    prefix_lines = [
        f" - PREFIX {pfx}: <{entry.uri}>"
        for pfx, entry in list(schema.prefixes.items())[:15]
    ]
    # Class bullet lines; fall back to an hc: CURIE when class_uri is absent.
    class_lines = [
        f" - {cdef.class_uri or f'hc:{cname}'} ({cname}): {head(cdef.description, 60)}"
        for cname, cdef in schema.classes.items()
    ]
    # Property bullet lines (first 10 slots).
    prop_lines = [
        f" - {sdef.slot_uri or f'hc:{sname}'}: {head(sdef.description, 60)}"
        for sname, sdef in list(schema.slots.items())[:10]
    ]
    return f"""Generate SPARQL queries for heritage custodian knowledge graph.
You are an expert in SPARQL and the Heritage Custodian Ontology (v{schema.version}).
Generate valid SPARQL queries that work with our Oxigraph endpoint.
Ontology Prefixes (MUST USE THESE EXACT URIs):
{nl.join(prefix_lines)}
Key Classes:
{nl.join(class_lines[:8])}
Key Properties:
{nl.join(prop_lines)}
Hub Architecture:
- Custodian (crm:E39_Actor) is the central hub entity
- CustodianObservation contains evidence from sources
- CustodianName holds standardized emic names
- CustodianLegalStatus holds formal legal entity info
- CustodianPlace holds geographic location
- CustodianCollection holds heritage collections
"""
def create_schema_aware_entity_docstring() -> str:
    """Create docstring for entity extractor with schema-derived types.

    Includes multilingual synonyms with language tags when ontology_mapping
    module is available, enabling better entity recognition across languages.
    Falls back to a small static synonym list on ImportError, and to no
    synonym section at all on any other synonym-building failure.
    """
    schema = get_heritage_schema()
    type_lines = []
    for ct in schema.custodian_types:
        # Extract first part of description (text before the first "(")
        desc = ct.description.split("(")[0].strip() if ct.description else ct.name
        type_lines.append(f" - {ct.name}: {desc}")
    # Build multilingual synonym section with language tags
    synonym_lines = []
    try:
        # Import dynamically to avoid circular imports
        from backend.rag.ontology_mapping import get_ontology_mapper
        mapper = get_ontology_mapper()
        # Key types to include synonyms for
        key_types = [
            "MUSEUM", "LIBRARY", "ARCHIVE", "GALLERY", "RESEARCH_CENTER",
            "EDUCATION_PROVIDER", "HOLY_SACRED_SITE", "BIO_CUSTODIAN",
        ]
        for custodian_type in key_types:
            # Presumably maps language code -> list of synonym strings,
            # plus an aggregate "all" key — confirm against ontology_mapping.
            by_lang = mapper.get_all_synonyms_by_language(
                custodian_type, "CustodianPrimaryTypeEnum"
            )
            tagged_syns: list[str] = []
            # Sort languages for consistent output
            for lang in sorted(by_lang.keys()):
                if lang == "all":  # Skip the aggregate 'all' key
                    continue
                syns = by_lang[lang]
                # Take up to 2 synonyms per language
                for syn in sorted(syns)[:2]:
                    tagged_syns.append(f"{syn} ({lang})")
            if tagged_syns:
                # Limit to 6 total synonyms per type for brevity
                synonym_lines.append(f" - {custodian_type}: {', '.join(tagged_syns[:6])}")
        logger.debug(f"Built multilingual synonyms for {len(synonym_lines)} types")
    except ImportError:
        logger.warning("ontology_mapping not available, using static synonyms")
        # Fallback to static synonyms without language tags
        synonym_lines = [
            ' - MUSEUM: "museum", "musea", "museo", "musée"',
            ' - LIBRARY: "library", "bibliotheek", "bibliothèque"',
            ' - ARCHIVE: "archive", "archief", "archiv"',
            ' - GALLERY: "gallery", "galerie"',
        ]
    except Exception as e:
        # Any other failure: omit the synonym section rather than abort.
        logger.warning(f"Could not build multilingual synonyms: {e}")
        synonym_lines = []
    # Format synonym section (empty string suppresses the block entirely)
    if synonym_lines:
        synonym_section = f"""
MULTILINGUAL SYNONYMS (term + language code):
{chr(10).join(synonym_lines)}
"""
    else:
        synonym_section = ""
    docstring = f"""Extract heritage-specific entities from text.
Identify institutions, places, dates, identifiers, and relationships
following the Heritage Custodian Ontology (v{schema.version}).
Institution Type Classification (GLAMORCUBESFIXPHDNT taxonomy):
{chr(10).join(type_lines)}
Entity Types to Extract:
- INSTITUTIONS: Heritage custodians with type classification
- PLACES: Geographic locations (cities, regions, countries)
- TEMPORAL: Dates and time periods (founding, closure, events)
- IDENTIFIERS: ISIL codes (NL-XXXX), Wikidata IDs (Q12345), GHCIDs
{synonym_section}
When extracting institution types, recognize synonyms in ANY language
and map them to the canonical GLAMORCUBESFIXPHDNT type.
"""
    return docstring
# =============================================================================
# OpenAI Prompt Caching Helpers
# =============================================================================
def create_cacheable_docstring(signature_docstring: str) -> str:
    """Prepend the static ontology context to a signature docstring.

    OpenAI prompt caching only engages when 1024+ identical tokens open the
    prompt. The ontology context is static per schema version (~1,200 tokens),
    so placing it first — before the task-specific instructions and the user
    input — maximizes cache hits, cutting cached-input cost (~50%) and
    latency (up to ~80%) with no explicit cache management.

    Args:
        signature_docstring: The original DSPy signature docstring

    Returns:
        Merged docstring with ontology context prepended (1,200+ tokens base)

    Example:
        >>> original = "Classify query intent..."  # 50 tokens
        >>> cacheable = create_cacheable_docstring(original)  # 1,250+ tokens
    """
    ontology_context = get_ontology_context()
    # Separator banner keeps the static and task-specific halves visually distinct.
    return f"""{ontology_context}
============================================================
TASK-SPECIFIC INSTRUCTIONS
============================================================
{signature_docstring}"""
def get_cacheable_sparql_docstring() -> str:
    """SPARQL-generator docstring with the ontology context prepended.

    The combined text (1,500+ tokens) clears the OpenAI prompt-caching threshold.
    """
    base = create_schema_aware_sparql_docstring()
    return create_cacheable_docstring(base)
def create_schema_aware_person_sparql_docstring() -> str:
    """Create docstring for Person SPARQL generator with schema-derived properties.

    Dynamically loads person properties from PersonObservation slot_usage in LinkML
    schema, ensuring the SPARQL generator uses correct predicates like schema:jobTitle.

    Preserves domain-specific role terms (Dutch/English job titles) since this
    knowledge isn't captured in the LinkML schema.

    Returns:
        Prompt text (used as a DSPy signature docstring) combining
        schema-derived prefixes and person properties with hand-curated
        multilingual role vocabularies and few-shot SPARQL examples.
    """
    schema = get_heritage_schema()
    # Get prefixes for person queries (only the namespaces person queries need,
    # in a fixed, deterministic order).
    prefix_lines = []
    for prefix in ["schema", "foaf", "hc", "prov", "skos"]:
        if prefix in schema.prefixes:
            prefix_lines.append(f" PREFIX {prefix}: <{schema.prefixes[prefix].uri}>")
    # Get person properties from PersonObservation slot_usage (Phase 1 work)
    person_props = schema.format_person_properties_for_prompt()
    # NOTE: the {{ }} pairs below are f-string escapes that render as the
    # literal braces required by SPARQL WHERE blocks.
    docstring = f"""Generate SPARQL queries for heritage person/staff queries.
You are an expert in SPARQL and the Heritage Person data model (v{schema.version}).
Generate valid SPARQL queries for finding people in heritage institutions.
REQUIRED PREFIXES:
{chr(10).join(prefix_lines)}
MAIN CLASS:
- schema:Person - Person records
{person_props}
CRITICAL PATTERN:
Organization names are often embedded IN the jobTitle, not in a separate field.
Use FILTER(CONTAINS(LCASE(?jobTitle), "organization name")) to find people at specific organizations.
ROLE TERMS (use in FILTER patterns with OR combinations):
Leadership (English): director, executive director, CEO, deputy director, assistant director,
head, chief, manager, team lead, coordinator, supervisor
Leadership (Dutch): directeur, adjunct-directeur, hoofd, manager, teamleider, teammanager,
coördinator, leidinggevende, afdelingshoofd
Governance (English): chair, chairman, chairperson, president, vice president, secretary,
treasurer, board member, trustee
Governance (Dutch): voorzitter, vice-voorzitter, secretaris, penningmeester, bestuurslid,
bestuursvoorzitter
Curatorial (English): curator, senior curator, chief curator, collections manager,
registrar, conservator
Curatorial (Dutch): conservator, collectiebeheerder, registrar
Archival (English): archivist, senior archivist, digital archivist, records manager,
archival manager, processing archivist
Archival (Dutch): archivaris, archiefmedewerker, informatiespecialist
Library (English): librarian, chief librarian, reference librarian, cataloger
Library (Dutch): bibliothecaris, catalogiseur
Research (English): researcher, historian, genealogist, research fellow
Research (Dutch): onderzoeker, historicus, genealoog
Digital (English): digital preservation specialist, digitization specialist, data manager,
metadata specialist, developer, IT specialist
Digital (Dutch): digitaliseringsmedewerker, datamanager, ICT-medewerker
Education (English): educator, education officer, tour guide, docent
Education (Dutch): educatiemedewerker, gids, rondleider
ALWAYS EXCLUDE anonymous profiles:
FILTER(!CONTAINS(LCASE(?name), "linkedin member"))
EXAMPLE QUERY - Find managers at Nationaal Archief:
```sparql
PREFIX schema: <http://schema.org/>
SELECT DISTINCT ?name ?jobTitle WHERE {{
?person a schema:Person ;
schema:name ?name ;
schema:jobTitle ?jobTitle .
FILTER(CONTAINS(LCASE(?jobTitle), "nationaal archief"))
FILTER(CONTAINS(LCASE(?jobTitle), "manager") ||
CONTAINS(LCASE(?jobTitle), "hoofd") ||
CONTAINS(LCASE(?jobTitle), "directeur") ||
CONTAINS(LCASE(?jobTitle), "teamleider"))
FILTER(!CONTAINS(LCASE(?name), "linkedin member"))
}}
ORDER BY ?name
LIMIT 50
```
EXAMPLE QUERY - Find all archivists:
```sparql
PREFIX schema: <http://schema.org/>
SELECT DISTINCT ?name ?jobTitle WHERE {{
?person a schema:Person ;
schema:name ?name ;
schema:jobTitle ?jobTitle .
FILTER(CONTAINS(LCASE(?jobTitle), "archiv") ||
CONTAINS(LCASE(?jobTitle), "archivist"))
FILTER(!CONTAINS(LCASE(?name), "linkedin member"))
}}
ORDER BY ?name
LIMIT 100
```
EXAMPLE QUERY - Find curators at a specific museum:
```sparql
PREFIX schema: <http://schema.org/>
SELECT DISTINCT ?name ?jobTitle WHERE {{
?person a schema:Person ;
schema:name ?name ;
schema:jobTitle ?jobTitle .
FILTER(CONTAINS(LCASE(?jobTitle), "rijksmuseum"))
FILTER(CONTAINS(LCASE(?jobTitle), "curator") ||
CONTAINS(LCASE(?jobTitle), "conservator"))
FILTER(!CONTAINS(LCASE(?name), "linkedin member"))
}}
ORDER BY ?name
```
"""
    return docstring
def get_cacheable_person_sparql_docstring() -> str:
    """Return the Person SPARQL docstring wrapped for prompt caching.

    The combined text exceeds 1,500 tokens, ensuring OpenAI prompt
    caching. Predicate URIs come from the PersonObservation slot_usage
    in the LinkML schema.
    """
    base = create_schema_aware_person_sparql_docstring()
    return create_cacheable_docstring(base)
def get_cacheable_entity_docstring() -> str:
    """Return the entity-extractor docstring wrapped for prompt caching.

    The combined text exceeds 1,500 tokens, ensuring OpenAI prompt caching.
    """
    base = create_schema_aware_entity_docstring()
    return create_cacheable_docstring(base)
def get_cacheable_query_intent_docstring() -> str:
    """Build the query-intent classifier docstring with ontology context.

    Combines:
        - Full ontology context (1,200+ tokens)
        - Staff role categories and mappings
        - Custodian type definitions
        - Multilingual synonyms

    Returns:
        A docstring of 2,000+ tokens for maximum cache utilization.
    """
    schema = get_heritage_schema()

    # Staff-role context: flat category list plus a handful of
    # category -> role examples (first 5 categories, 3 roles each).
    categories = schema.get_staff_role_category_names()
    category_list = ", ".join(categories)
    by_category = schema.get_staff_roles_by_category()
    mapping_lines = [
        f" - {cat}: {', '.join(roles[:3])}"
        for cat, roles in list(by_category.items())[:5]
    ]
    role_mapping_context = "\n".join(mapping_lines)

    # Custodian-type context: the first 15 canonical type names.
    type_examples = ", ".join(ct.name for ct in schema.custodian_types[:15])

    task_docstring = f"""Classify the intent of a heritage institution query.
You are an expert in GLAM (Galleries, Libraries, Archives, Museums) heritage institutions.
Classify the user's query intent to route to appropriate data sources and retrieval strategies.
STAFF ROLE CATEGORIES ({len(categories)} categories):
{category_list}
STAFF ROLE CATEGORY → ROLE MAPPING (examples):
{role_mapping_context}
CUSTODIAN TYPES ({len(schema.custodian_types)} types):
{type_examples}
CLASSIFICATION GUIDELINES:
- When entity_type='person', classify the role category and specific role
- When entity_type='institution', classify the custodian type
- Use 'UNKNOWN' when classification is not determinable
- Infer institution type from names (e.g., 'Rijksmuseum' → MUSEUM)
"""
    return create_cacheable_docstring(task_docstring)
def get_cacheable_answer_docstring() -> str:
    """Build the answer-generator docstring with ontology context.

    Combines:
        - Full ontology context (1,200+ tokens)
        - Key ontology terms for answer synthesis
        - Heritage custodian terminology

    Returns:
        A docstring of 1,500+ tokens, ensuring OpenAI prompt caching.
    """
    schema = get_heritage_schema()
    # Entity-type taxonomy rendered for inclusion in the prompt.
    type_context = schema.format_entity_types_for_prompt()

    task_docstring = f"""Generate informative answers about heritage institutions.
You are an expert on heritage custodians following the Heritage Custodian Ontology (v{schema.version}).
Synthesize retrieved information into helpful, accurate responses that:
- Use correct ontology terminology
- Cite sources appropriately
- Include relevant heritage-specific details
Use conversation history to maintain context across multiple turns.
For follow-up questions, resolve pronouns and implicit references
using the previous conversation context.
{type_context}
KEY ONTOLOGY TERMS:
- Custodian: Central hub entity (crm:E39_Actor) representing heritage keepers
- CustodianObservation: Source-based evidence from documents/websites
- CustodianName: Standardized emic (native) names
- CustodianLegalStatus: Formal legal entity information
- CustodianPlace: Geographic location with coordinates
- CustodianCollection: Heritage collections managed
ANSWER GUIDELINES:
- Always prefer ontology-aligned terminology in answers
- When discussing institution types, use GLAMORCUBESFIXPHDNT taxonomy
- Include temporal context (founding dates, historical changes) when relevant
- Reference specific collections, holdings, or digital platforms when known
"""
    return create_cacheable_docstring(task_docstring)
# =============================================================================
# SPECIFICITY-AWARE CONTEXT GENERATION
# =============================================================================
#
# These functions integrate with the specificity scoring system to provide
# filtered class lists based on query context. This reduces noise in DSPy
# prompts by only including classes relevant to the query type.
#
# See: backend/rag/specificity/ for the core scoring implementation
# See: .opencode/rules/specificity-score-convention.md for annotation rules
# Lazy import to avoid circular dependencies (resolved on first use by
# _ensure_specificity_loaded below, rather than at module import time).
_SPECIFICITY_AVAILABLE = False  # flips to True once the specificity module loads
_specificity_lookup = None  # specificity lookup object (set on first successful load)
_ContextTemplate = None  # ContextTemplate enum class (set on first successful load)
def _ensure_specificity_loaded() -> bool:
    """Load the specificity module on first use (avoids circular imports).

    Populates the module-level cache (_specificity_lookup, _ContextTemplate,
    _SPECIFICITY_AVAILABLE) on first successful import.

    Returns:
        True when the specificity system is available, False when the
        import fails (a warning is logged on each failed attempt).
    """
    global _SPECIFICITY_AVAILABLE, _specificity_lookup, _ContextTemplate
    if _SPECIFICITY_AVAILABLE:
        # Already loaded on a previous call - nothing to do.
        return True
    try:
        from backend.rag.specificity import (
            ContextTemplate,
            get_specificity_lookup,
        )
        _specificity_lookup = get_specificity_lookup()
        _ContextTemplate = ContextTemplate
        _SPECIFICITY_AVAILABLE = True
    except ImportError as e:
        logger.warning(f"Specificity system not available: {e}")
        return False
    logger.info(f"Specificity system loaded: {len(_specificity_lookup.get_all_scores())} classes")
    return True
def get_filtered_classes_for_context(
    context_template: str,
    threshold: float = 0.6,
) -> list[str]:
    """Return class names relevant to a context template by specificity.

    Args:
        context_template: One of the 10 context template names:
            archive_search, museum_search, library_search,
            collection_discovery, person_research, location_browse,
            identifier_lookup, organizational_change, digital_platform,
            general_heritage.
        threshold: Maximum specificity score to include (default 0.6).
            Lower scores = more broadly relevant = more likely included.

    Returns:
        Class names passing the threshold for the template. Unknown
        template names fall back to general_heritage; an empty list is
        returned when the specificity system is unavailable.

    Example:
        >>> classes = get_filtered_classes_for_context("archive_search", 0.6)
        >>> len(classes)  # ~150 classes instead of 627
        152
        >>> "Archive" in classes
        True
    """
    if not _ensure_specificity_loaded():
        logger.warning("Returning empty list - specificity system not available")
        return []
    # Map the string name onto the enum, defaulting on unknown values.
    template = _ContextTemplate.GENERAL_HERITAGE
    try:
        template = _ContextTemplate(context_template)
    except ValueError:
        logger.warning(f"Unknown context template: {context_template}, using general_heritage")
    return _specificity_lookup.get_classes_for_template(template, threshold)
def get_filtered_class_scores_for_context(
    context_template: str,
    threshold: float = 0.6,
) -> list[tuple[str, float]]:
    """Return (class_name, score) pairs for a context template.

    Args:
        context_template: Context template name (see
            get_filtered_classes_for_context).
        threshold: Maximum specificity score to include.

    Returns:
        Pairs sorted by score ascending - lower scores indicate more
        broadly relevant classes. Empty list when the specificity
        system is unavailable.

    Example:
        >>> scores = get_filtered_class_scores_for_context("archive_search", 0.6)
        >>> scores[0]  # Most relevant class
        ('Custodian', 0.20)
    """
    if not _ensure_specificity_loaded():
        return []
    try:
        template = _ContextTemplate(context_template)
    except ValueError:
        template = _ContextTemplate.GENERAL_HERITAGE
    # get_filtered_scores returns dict[str, SpecificityScore]; flatten each
    # score object to its per-template float value.
    scored = _specificity_lookup.get_filtered_scores(template, threshold)
    pairs = [
        (name, score_obj.get_score(template))
        for name, score_obj in scored.items()
    ]
    # Ascending sort puts the most broadly relevant classes first.
    return sorted(pairs, key=lambda pair: pair[1])
def format_filtered_ontology_context(
    context_template: str,
    threshold: float = 0.6,
    max_classes: int = 50,
) -> str:
    """Format a specificity-filtered ontology context for DSPy prompts.

    Filtered counterpart of format_ontology_context_for_prompt() that
    only includes classes relevant to the query context.

    Args:
        context_template: Context template name.
        threshold: Maximum specificity score to include.
        max_classes: Maximum number of classes considered for the context.

    Returns:
        Formatted string suitable for DSPy signature docstrings; falls
        back to the unfiltered context when the specificity system is
        unavailable.

    Example:
        >>> context = format_filtered_ontology_context("archive_search", 0.6)
        >>> "Archive" in context
        True
    """
    schema = get_heritage_schema()
    if not _ensure_specificity_loaded():
        # No filtering possible - serve the full, unfiltered context.
        return schema.format_ontology_context_for_prompt()

    # Lowest-scored (most relevant) classes first, capped at max_classes.
    ranked = get_filtered_class_scores_for_context(context_template, threshold)
    ranked = ranked[:max_classes]
    class_names = {name for name, _ in ranked}

    rule = "=" * 60
    lines = [
        rule,
        f"HERITAGE CUSTODIAN ONTOLOGY CONTEXT ({context_template})",
        rule,
        "",
        f"Filtered to {len(class_names)} relevant classes (threshold: {threshold})",
        "",
        "Hub Architecture:",
        " - Custodian (crm:E39_Actor): Central hub entity",
        " - CustodianObservation: Evidence from sources",
        " - CustodianName: Standardized emic names",
        " - CustodianLegalStatus: Formal legal entity",
        " - CustodianPlace: Geographic location",
        " - CustodianCollection: Heritage collections",
        "",
        "Relevant Classes (by specificity):",
    ]

    # List at most the 20 most relevant classes, annotated with scores.
    for name, score in ranked[:20]:
        cls_def = schema.classes.get(name)
        if cls_def is not None:
            uri = cls_def.class_uri or f"hc:{name}"
            desc = (cls_def.description or "").split("\n")[0][:50]
            lines.append(f" - {name} ({uri}): {desc}... [score: {score:.2f}]")
        else:
            lines.append(f" - {name} [score: {score:.2f}]")
    if len(ranked) > 20:
        lines.append(f" ... and {len(ranked) - 20} more classes")

    lines += ["", "Key Ontology Prefixes:"]
    for prefix, info in list(schema.prefixes.items())[:10]:
        lines.append(f" PREFIX {prefix}: <{info.uri}>")
    lines += ["", rule]
    return "\n".join(lines)
def create_specificity_aware_sparql_docstring(
    context_template: str,
    threshold: float = 0.6,
) -> str:
    """Create a SPARQL generator docstring filtered by specificity.

    Template-specific version of create_schema_aware_sparql_docstring()
    that only includes classes relevant to the query type.

    Args:
        context_template: Context template name.
        threshold: Maximum specificity score to include.

    Returns:
        Docstring for the SPARQL generator with filtered ontology context.
    """
    schema = get_heritage_schema()
    # Classes relevant to this template (set for O(1) membership checks).
    relevant = set(get_filtered_classes_for_context(context_template, threshold))

    # Prefix section: same selection as the unfiltered docstring.
    prefix_lines = [
        f" - PREFIX {prefix}: <{info.uri}>"
        for prefix, info in list(schema.prefixes.items())[:15]
    ]

    # Class section: only template-relevant classes, capped at 12 lines
    # to keep the prompt short.
    class_lines = []
    for cls_name, cls_def in schema.classes.items():
        if cls_name not in relevant:
            continue
        uri = cls_def.class_uri or f"hc:{cls_name}"
        desc = (cls_def.description or "").split("\n")[0][:60]
        class_lines.append(f" - {uri} ({cls_name}): {desc}")
    class_lines = class_lines[:12]

    # Property section: same selection as the unfiltered docstring.
    prop_lines = []
    for slot_name, slot_def in list(schema.slots.items())[:10]:
        uri = slot_def.slot_uri or f"hc:{slot_name}"
        desc = (slot_def.description or "").split("\n")[0][:60]
        prop_lines.append(f" - {uri}: {desc}")

    return f"""Generate SPARQL queries for heritage custodian knowledge graph.
You are an expert in SPARQL and the Heritage Custodian Ontology (v{schema.version}).
Generate valid SPARQL queries that work with our Oxigraph endpoint.
Query context: {context_template} (filtered to relevant classes)
Ontology Prefixes (MUST USE THESE EXACT URIs):
{chr(10).join(prefix_lines)}
Relevant Classes ({len(relevant)} total, showing top {len(class_lines)}):
{chr(10).join(class_lines)}
Key Properties:
{chr(10).join(prop_lines)}
Hub Architecture:
- Custodian (crm:E39_Actor) is the central hub entity
- CustodianObservation contains evidence from sources
- CustodianName holds standardized emic names
- CustodianLegalStatus holds formal legal entity info
- CustodianPlace holds geographic location
- CustodianCollection holds heritage collections
"""
def get_available_context_templates() -> list[str]:
    """List the names of the available context templates.

    Returns:
        The 10 context template names usable with the specificity-aware
        functions. When the specificity system cannot be loaded, a
        hard-coded fallback list of the same names is returned.
    """
    if _ensure_specificity_loaded():
        return [template.value for template in _ContextTemplate]
    return [
        "archive_search", "museum_search", "library_search",
        "collection_discovery", "person_research", "location_browse",
        "identifier_lookup", "organizational_change", "digital_platform",
        "general_heritage",
    ]
def get_class_count_by_template(threshold: float = 0.6) -> dict[str, int]:
    """Count the classes that pass the threshold for each template.

    Useful for debugging and understanding filtering behavior.

    Args:
        threshold: Maximum specificity score.

    Returns:
        Mapping of template name to class count; empty dict when the
        specificity system is unavailable.

    Example:
        >>> counts = get_class_count_by_template(0.6)
        >>> counts
        {'archive_search': 152, 'museum_search': 148, ...}
    """
    if not _ensure_specificity_loaded():
        return {}
    counts: dict[str, int] = {}
    for template in _ContextTemplate:
        matched = _specificity_lookup.get_classes_for_template(template, threshold)
        counts[template.value] = len(matched)
    return counts
if __name__ == "__main__":
    # Command-line smoke test: load the schema and print a summary of
    # what the loader extracted plus samples of the generated prompts.
    logging.basicConfig(level=logging.INFO)
    schema = get_heritage_schema()

    print("\n=== SCHEMA LOADED ===")
    print(f"Name: {schema.name}")
    print(f"Version: {schema.version}")
    print(f"Classes: {len(schema.classes)}")
    print(f"Slots: {len(schema.slots)}")
    print(f"Custodian Types: {len(schema.custodian_types)}")

    print("\n=== SPARQL PREFIXES ===")
    print(schema.get_sparql_prefixes())

    print("\n=== CUSTODIAN TYPES ===")
    for ct in schema.custodian_types[:5]:
        summary = ct.description[:60] if ct.description else "(no description)"
        print(f" - {ct.name}: {summary}...")

    print("\n=== ONTOLOGY CONTEXT (for DSPy) ===")
    print(schema.format_ontology_context_for_prompt()[:1000])

    print("\n=== SCHEMA-AWARE SPARQL DOCSTRING ===")
    print(create_schema_aware_sparql_docstring()[:1500])