diff --git a/backend/rag/dspy_heritage_rag.py b/backend/rag/dspy_heritage_rag.py index 6b1b0d5c1d..c6705b06a4 100644 --- a/backend/rag/dspy_heritage_rag.py +++ b/backend/rag/dspy_heritage_rag.py @@ -23,11 +23,11 @@ import logging from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum -from typing import Any, AsyncIterator, Callable, Literal, Optional, TYPE_CHECKING +from typing import Any, AsyncIterator, Callable, Literal, Optional, TYPE_CHECKING, cast -import dspy # type: ignore[import-unresolved] -from dspy import Example, Prediction, History # type: ignore[import-unresolved] -from dspy.streaming import StatusMessage, StreamListener, StatusMessageProvider # type: ignore[import-unresolved] +import dspy +from dspy import Example, Prediction, History +from dspy.streaming import StatusMessage, StreamListener, StatusMessageProvider logger = logging.getLogger(__name__) @@ -37,8 +37,8 @@ get_cache: Optional[Callable[[], Any]] = None should_bypass_cache: Optional[Callable[[str], bool]] = None try: - from .semantic_cache import get_cache as _get_cache, HeritageSemanticCache # type: ignore[import-unresolved] - from .cache_config import should_bypass_cache as _should_bypass_cache # type: ignore[import-unresolved] + from .semantic_cache import get_cache as _get_cache, HeritageSemanticCache + from .cache_config import should_bypass_cache as _should_bypass_cache get_cache = _get_cache should_bypass_cache = _should_bypass_cache SEMANTIC_CACHE_AVAILABLE = True @@ -54,9 +54,13 @@ get_ontology_context: Optional[Callable[[], str]] = None get_entity_types_prompt: Optional[Callable[[], str]] = None create_schema_aware_sparql_docstring: Optional[Callable[[], str]] = None create_schema_aware_entity_docstring: Optional[Callable[[], str]] = None +# Staff role functions +get_staff_role_categories: Optional[Callable[[], list[str]]] = None +get_all_staff_roles: Optional[Callable[[], list[str]]] = None +get_staff_role_classes: 
Optional[Callable[[], dict[str, list[str]]]] = None try: - from .schema_loader import ( # type: ignore[import-unresolved] + from .schema_loader import ( get_heritage_schema as _get_heritage_schema, get_sparql_prefixes as _get_sparql_prefixes, get_custodian_types as _get_custodian_types, @@ -64,6 +68,9 @@ try: get_entity_types_prompt as _get_entity_types_prompt, create_schema_aware_sparql_docstring as _create_schema_aware_sparql_docstring, create_schema_aware_entity_docstring as _create_schema_aware_entity_docstring, + get_staff_role_categories as _get_staff_role_categories, + get_all_staff_roles as _get_all_staff_roles, + get_staff_role_classes as _get_staff_role_classes, ) get_heritage_schema = _get_heritage_schema get_sparql_prefixes = _get_sparql_prefixes @@ -72,10 +79,46 @@ try: get_entity_types_prompt = _get_entity_types_prompt create_schema_aware_sparql_docstring = _create_schema_aware_sparql_docstring create_schema_aware_entity_docstring = _create_schema_aware_entity_docstring + get_staff_role_categories = _get_staff_role_categories + get_all_staff_roles = _get_all_staff_roles + get_staff_role_classes = _get_staff_role_classes SCHEMA_LOADER_AVAILABLE = True except ImportError: logger.info("Schema loader not available - using static signatures") +# Ontology mapper imports (graceful degradation if not available) +# Provides multilingual matching and heritage code lookups from LinkML schema +ONTOLOGY_MAPPER_AVAILABLE = False +get_ontology_mapper: Optional[Callable[[], Any]] = None +match_custodian_type: Optional[Callable[[str], Optional[str]]] = None +match_museum_type: Optional[Callable[[str], Optional[str]]] = None +match_digital_platform_type: Optional[Callable[[str], Optional[str]]] = None +get_heritage_code: Optional[Callable[[str], Optional[str]]] = None +get_custodian_type_mapping: Optional[Callable[[], dict[str, str]]] = None +get_role_keywords: Optional[Callable[[], dict[str, list[str]]]] = None + +try: + from .ontology_mapping import ( + 
get_ontology_mapper as _get_ontology_mapper, + match_custodian_type as _match_custodian_type, + match_museum_type as _match_museum_type, + match_digital_platform_type as _match_digital_platform_type, + get_heritage_code as _get_heritage_code, + get_custodian_type_mapping as _get_custodian_type_mapping, + get_role_keywords as _get_role_keywords, + ) + get_ontology_mapper = _get_ontology_mapper + match_custodian_type = _match_custodian_type + match_museum_type = _match_museum_type + match_digital_platform_type = _match_digital_platform_type + get_heritage_code = _get_heritage_code + get_custodian_type_mapping = _get_custodian_type_mapping + get_role_keywords = _get_role_keywords + ONTOLOGY_MAPPER_AVAILABLE = True + logger.info("Ontology mapper loaded - multilingual matching enabled") +except ImportError: + logger.info("Ontology mapper not available - multilingual matching disabled") + # ============================================================================= # 1. HERITAGE-SPECIFIC SIGNATURES @@ -121,6 +164,16 @@ class HeritageQueryIntent(dspy.Signature): desc="Fully resolved question with pronouns/references replaced using conversation history" ) + entity_type: Literal["person", "institution", "both"] = dspy.OutputField( + desc="Whether the query seeks information about people/staff (person), " + "organizations/institutions (institution), or both. " + "Use 'person' for queries about employees, directors, curators, archivists, " + "librarians, staff, experts, professionals, or anyone working at an institution. " + "Use 'institution' for queries about museums, archives, libraries, galleries, " + "collections, buildings, or organizations themselves. " + "Use 'both' when the query relates to both people AND institutions." + ) + reasoning: str = dspy.OutputField(desc="Brief explanation of classification") @@ -246,7 +299,7 @@ class VisualizationSelector(dspy.Signature): # 1b. 
SCHEMA-AWARE SIGNATURES (Dynamic from LinkML) # ============================================================================= -def _create_schema_aware_sparql_signature(): +def _create_schema_aware_sparql_signature() -> type[dspy.Signature]: """Factory to create SPARQL signature with schema-derived docstring. Uses LinkML schema to inject correct prefixes, classes, and properties @@ -277,7 +330,7 @@ def _create_schema_aware_sparql_signature(): return HeritageSPARQLGenerator -def _create_schema_aware_entity_signature(): +def _create_schema_aware_entity_signature() -> type[dspy.Signature]: """Factory to create entity extractor signature with schema-derived types. Uses LinkML schema to inject GLAMORCUBESFIXPHDNT taxonomy and @@ -317,7 +370,7 @@ def _create_schema_aware_entity_signature(): return HeritageEntityExtractor -def _create_schema_aware_answer_signature(): +def _create_schema_aware_answer_signature() -> type[dspy.Signature]: """Factory to create answer generator with ontology context. 
Injects heritage custodian ontology terminology and structure @@ -388,7 +441,7 @@ _schema_aware_entity_signature = None _schema_aware_answer_signature = None -def get_schema_aware_sparql_signature(): +def get_schema_aware_sparql_signature() -> type[dspy.Signature]: """Get cached schema-aware SPARQL signature.""" global _schema_aware_sparql_signature if _schema_aware_sparql_signature is None: @@ -396,7 +449,7 @@ def get_schema_aware_sparql_signature(): return _schema_aware_sparql_signature -def get_schema_aware_entity_signature(): +def get_schema_aware_entity_signature() -> type[dspy.Signature]: """Get cached schema-aware entity extractor signature.""" global _schema_aware_entity_signature if _schema_aware_entity_signature is None: @@ -404,7 +457,7 @@ def get_schema_aware_entity_signature(): return _schema_aware_entity_signature -def get_schema_aware_answer_signature(): +def get_schema_aware_answer_signature() -> type[dspy.Signature]: """Get cached schema-aware answer generator signature.""" global _schema_aware_answer_signature if _schema_aware_answer_signature is None: @@ -412,6 +465,202 @@ def get_schema_aware_answer_signature(): return _schema_aware_answer_signature +def _create_schema_aware_query_intent_signature() -> type[dspy.Signature]: + """Factory to create HeritageQueryIntent with schema-derived valid values. + + Dynamically extends HeritageQueryIntent with ontology-aware classification fields: + - target_role_category: Valid role categories from RoleCategoryEnum (13 categories) + - target_staff_role: Valid staff roles from StaffRoles.yaml (64 roles) + - target_custodian_type: Valid custodian types from CustodianPrimaryTypeEnum (19 types) + + When ONTOLOGY_MAPPER_AVAILABLE, includes multilingual synonyms from schema YAML + comments to help the LLM recognize Dutch/German/French/Spanish terms. 
+ + These fields enable precise routing to the correct Qdrant collection: + - Person queries → heritage_persons collection with role filtering + - Institution queries → heritage_custodians collection with type filtering + + Returns: + A DSPy Signature class with schema-aware field descriptions, or + falls back to HeritageQueryIntent if schema loader is unavailable. + """ + if not SCHEMA_LOADER_AVAILABLE: + logger.warning("Schema loader unavailable, using static query intent signature") + return HeritageQueryIntent + + if get_staff_role_categories is None or get_all_staff_roles is None or get_custodian_types is None: + logger.warning("Staff role or custodian type functions unavailable") + return HeritageQueryIntent + + try: + # Get schema-derived values + role_categories = get_staff_role_categories() # 13 categories + staff_roles = get_all_staff_roles() # 64 roles + custodian_types = get_custodian_types() # 19 types + + # Format for prompt (truncate for brevity) + role_cat_list = ', '.join(role_categories) + role_examples = ', '.join(staff_roles[:20]) + f'... ({len(staff_roles)} total)' + type_examples = ', '.join(custodian_types[:12]) + f'... ({len(custodian_types)} total)' + + # Get role classes mapping for context + role_classes = {} + if get_staff_role_classes is not None: + role_classes = get_staff_role_classes() + role_mapping_context = '\n'.join([ + f" - {cat}: {', '.join(roles[:5])}{' ...' 
if len(roles) > 5 else ''}"
+                for cat, roles in list(role_classes.items())[:8]
+            ])
+
+        # Build multilingual synonym context if ontology mapper is available
+        multilingual_context = ""
+        if ONTOLOGY_MAPPER_AVAILABLE and get_ontology_mapper is not None:
+            try:
+                mapper = get_ontology_mapper()
+                # Get synonyms for key custodian types
+                type_synonyms = []
+                key_types = ["MUSEUM", "LIBRARY", "ARCHIVE", "GALLERY", "RESEARCH_CENTER"]
+                for ct in key_types:
+                    synonyms = mapper.get_synonyms_for_value(ct, "CustodianPrimaryTypeEnum")
+                    if synonyms:
+                        # Get a sample of synonyms (max 5 per type)
+                        sample = list(synonyms)[:5]
+                        type_synonyms.append(f"  - {ct}: {', '.join(sample)}")
+
+                if type_synonyms:
+                    multilingual_context = f"""
+
+    MULTILINGUAL SYNONYMS (recognize these terms):
+{chr(10).join(type_synonyms)}
+
+    Note: Users may query in Dutch (bibliotheek, archief, museum), German (Bibliothek, Archiv),
+    French (bibliothèque, archives), Spanish (biblioteca, archivo), etc.
+    Always map to the English enum value (e.g., 'bibliotheek' → LIBRARY)."""
+                    logger.debug(f"Added multilingual context with {len(type_synonyms)} type synonym sets")
+            except Exception as e:
+                logger.warning(f"Could not load multilingual synonyms: {e}")
+
+        class SchemaAwareQueryIntent(dspy.Signature):
+            __doc__ = f"""Classify heritage query intent with ontology-aware type prediction.
+
+            You are an expert in GLAM (Galleries, Libraries, Archives, Museums)
+            heritage institutions. Classify the user's query intent to route
+            to the appropriate data sources and retrieval strategies.
+
+            Use conversation history to understand context for follow-up questions.
+
+            STAFF ROLE CATEGORIES ({len(role_categories)} categories):
+            {role_cat_list}
+
+            STAFF ROLE CATEGORY → ROLE MAPPING (examples):
+            {role_mapping_context}
+
+            CUSTODIAN TYPES ({len(custodian_types)} types):
+            {type_examples}{multilingual_context}
+
+            When entity_type='person', classify the role category and specific role.
+ When entity_type='institution', classify the custodian type. + Use 'UNKNOWN' when classification is not determinable. + """ + + # Input fields (same as HeritageQueryIntent) + question: str = dspy.InputField( + desc="User's natural language question about heritage institutions" + ) + language: str = dspy.InputField( + desc="Language code (nl, en, de, fr)", + default="nl" + ) + history: History = dspy.InputField( + desc="Previous conversation turns for context resolution", + default=History(messages=[]) + ) + + # Original output fields + intent: Literal[ + "geographic", # Location, maps, coordinates, nearby + "statistical", # Counts, aggregates, distributions + "relational", # Relationships, networks, connections + "temporal", # History, timelines, changes over time + "entity_lookup", # Specific institution details + "comparative", # Compare institutions + "exploration", # Browse, discover, explore + ] = dspy.OutputField(desc="Primary query intent classification") + + entities: list[str] = dspy.OutputField( + desc="Named entities mentioned (institution names, places, etc.)" + ) + + sources: list[str] = dspy.OutputField( + desc="Recommended data sources: qdrant, sparql, typedb, postgis" + ) + + resolved_question: str = dspy.OutputField( + desc="Fully resolved question with pronouns/references replaced using conversation history" + ) + + entity_type: Literal["person", "institution", "both"] = dspy.OutputField( + desc="Whether the query seeks information about people/staff (person), " + "organizations/institutions (institution), or both. " + "Use 'person' for queries about employees, directors, curators, archivists, " + "librarians, staff, experts, professionals, or anyone working at an institution. " + "Use 'institution' for queries about museums, archives, libraries, galleries, " + "collections, buildings, or organizations themselves. " + "Use 'both' when the query relates to both people AND institutions." 
+ ) + + reasoning: str = dspy.OutputField(desc="Brief explanation of classification") + + # NEW: Schema-aware classification fields + target_role_category: str = dspy.OutputField( + desc=f"When entity_type='person', the staff role CATEGORY being sought. " + f"Valid values: {role_cat_list}. " + f"Use 'UNKNOWN' only if no specific role type is mentioned in the query." + ) + + target_staff_role: str = dspy.OutputField( + desc=f"When entity_type='person', the SPECIFIC staff role being sought. " + f"Examples: {role_examples}. " + f"Use 'UNKNOWN' only if no specific role is mentioned in the query." + ) + + target_custodian_type: str = dspy.OutputField( + desc=f"The type of heritage institution referenced in the query. " + f"Valid values: {type_examples}. " + f"IMPORTANT: Infer from institution names even for person queries. " + f"Examples: 'Rijksmuseum' → MUSEUM, 'Nationaal Archief' → ARCHIVE, " + f"'KB' or 'Koninklijke Bibliotheek' → LIBRARY, 'Stedelijk Museum' → MUSEUM. " + f"Use 'UNKNOWN' ONLY if no institution is mentioned or type cannot be inferred." + ) + + logger.info( + f"Created schema-aware query intent signature with " + f"{len(role_categories)} role categories, {len(staff_roles)} roles, " + f"{len(custodian_types)} custodian types" + ) + return SchemaAwareQueryIntent + + except Exception as e: + logger.warning(f"Failed to create schema-aware query intent signature: {e}") + return HeritageQueryIntent + + +# Lazy-loaded schema-aware query intent signature +_schema_aware_query_intent_signature = None + + +def get_schema_aware_query_intent_signature() -> type[dspy.Signature]: + """Get cached schema-aware query intent signature. 
+ + Returns: + SchemaAwareQueryIntent if schema loader available, else HeritageQueryIntent + """ + global _schema_aware_query_intent_signature + if _schema_aware_query_intent_signature is None: + _schema_aware_query_intent_signature = _create_schema_aware_query_intent_signature() + return _schema_aware_query_intent_signature + + def validate_custodian_type(type_value: str) -> bool: """Validate that a custodian type is valid according to LinkML schema. @@ -442,6 +691,111 @@ def validate_custodian_type(type_value: str) -> bool: return False +def preprocess_multilingual_query(question: str, language: str = "nl") -> dict[str, str | None]: + """Extract and match heritage terms from multilingual queries. + + Uses ontology_mapping to identify heritage-related terms in the question + and match them to schema enum values. This enables pre-classification + of custodian types, museum types, and digital platforms before LLM routing. + + Args: + question: User's natural language question + language: Language code (nl, en, de, fr, es) - currently unused but reserved + for future language-specific preprocessing + + Returns: + Dict with matched enum values: + - custodian_type: CustodianPrimaryTypeEnum value or None + - museum_type: MuseumTypeEnum value or None + - platform_type: DigitalPlatformTypeEnum value or None + - heritage_code: Single-letter GLAMORCUBESFIXPHDNT code or None + + Example: + >>> preprocess_multilingual_query("Welke bibliotheken zijn er in Utrecht?") + {'custodian_type': 'LIBRARY', 'museum_type': None, + 'platform_type': None, 'heritage_code': 'L'} + + >>> preprocess_multilingual_query("virtueel museum Amsterdam") + {'custodian_type': 'MUSEUM', 'museum_type': None, + 'platform_type': 'VIRTUAL_MUSEUM', 'heritage_code': 'M'} + """ + if not ONTOLOGY_MAPPER_AVAILABLE or get_ontology_mapper is None: + return { + "custodian_type": None, + "museum_type": None, + "platform_type": None, + "heritage_code": None, + } + + try: + mapper = get_ontology_mapper() + + # Match 
against different enum types + custodian_type = mapper.match_natural_language(question, "CustodianPrimaryTypeEnum") + museum_type = mapper.match_natural_language(question, "MuseumTypeEnum") + platform_type = mapper.match_natural_language(question, "DigitalPlatformTypeEnum") + + # Get heritage code if custodian type was matched + heritage_code = None + if custodian_type: + heritage_code = mapper.get_heritage_type_code(custodian_type) + + return { + "custodian_type": custodian_type, + "museum_type": museum_type, + "platform_type": platform_type, + "heritage_code": heritage_code, + } + except Exception as e: + logger.warning(f"Error in multilingual preprocessing: {e}") + return { + "custodian_type": None, + "museum_type": None, + "platform_type": None, + "heritage_code": None, + } + + +def normalize_custodian_type(type_value: str) -> str | None: + """Normalize and validate a custodian type value. + + Uses ontology_mapping for fuzzy matching if the value doesn't match + exactly. This handles cases like: + - LLM outputs "museum" instead of "MUSEUM" + - LLM outputs "bibliotheek" instead of "LIBRARY" + - Typos or partial matches + + Args: + type_value: The custodian type string to normalize + + Returns: + Normalized CustodianPrimaryTypeEnum value, or None if no match + + Example: + >>> normalize_custodian_type("MUSEUM") + "MUSEUM" + >>> normalize_custodian_type("bibliotheek") + "LIBRARY" + >>> normalize_custodian_type("archief") + "ARCHIVE" + >>> normalize_custodian_type("xyz123") + None + """ + # First try exact match (uppercase) + upper_value = type_value.upper() + if validate_custodian_type(upper_value): + return upper_value + + # If not exact match, try fuzzy matching via ontology mapper + if ONTOLOGY_MAPPER_AVAILABLE and match_custodian_type is not None: + matched = match_custodian_type(type_value) + if matched: + return matched + + # No match found + return None + + # ============================================================================= # 2. 
DSPy MODULES # ============================================================================= @@ -450,11 +804,33 @@ class HeritageQueryRouter(dspy.Module): """DSPy module for intelligent query routing. Replaces keyword-based routing with LLM-powered intent classification. + Uses schema-aware signatures when available to enable ontology-aware + classification of staff roles and custodian types. + + Args: + use_schema_aware: Whether to use LinkML schema-aware signature. + If True, uses SchemaAwareQueryIntent with role/type classification. + If False, uses static HeritageQueryIntent. + If None (default), auto-detects based on schema loader availability. """ - def __init__(self): + def __init__(self, use_schema_aware: Optional[bool] = None): super().__init__() - self.classifier = dspy.ChainOfThought(HeritageQueryIntent) + + # Determine whether to use schema-aware signature + if use_schema_aware is None: + use_schema_aware = SCHEMA_LOADER_AVAILABLE + self.use_schema_aware = use_schema_aware + + # Use schema-aware signature if available for better role/type classification + if use_schema_aware: + signature = get_schema_aware_query_intent_signature() + logger.info("HeritageQueryRouter using schema-aware query intent signature") + else: + signature = HeritageQueryIntent + logger.info("HeritageQueryRouter using static query intent signature") + + self.classifier = dspy.ChainOfThought(signature) # Source routing based on intent self.source_mapping = { @@ -474,6 +850,18 @@ class HeritageQueryRouter(dspy.Module): question: User's current question language: Language code (nl, en, etc.) 
history: Previous conversation turns for context resolution + + Returns: + Prediction with fields: + - intent: Query intent classification + - entities: Named entities mentioned + - sources: Recommended data sources + - reasoning: Explanation of classification + - resolved_question: Question with references resolved + - entity_type: 'person', 'institution', or 'both' + - target_role_category: Staff role category (when entity_type='person') + - target_staff_role: Specific staff role (when entity_type='person') + - target_custodian_type: Custodian type (when entity_type='institution') """ if history is None: history = History(messages=[]) @@ -485,13 +873,61 @@ class HeritageQueryRouter(dspy.Module): result.intent, ["qdrant", "sparql"] ) - return Prediction( + # Extract raw classification results + target_custodian_type = getattr(result, 'target_custodian_type', 'UNKNOWN') + target_role_category = getattr(result, 'target_role_category', 'UNKNOWN') + target_staff_role = getattr(result, 'target_staff_role', 'UNKNOWN') + entity_type = getattr(result, 'entity_type', 'institution') + + # Post-process with ontology mapper for validation/correction + # This helps when the LLM returns a natural language term instead of the enum value + if target_custodian_type and target_custodian_type != 'UNKNOWN': + normalized = normalize_custodian_type(target_custodian_type) + if normalized: + if normalized != target_custodian_type: + logger.debug( + f"Normalized custodian type: '{target_custodian_type}' -> '{normalized}'" + ) + target_custodian_type = normalized + else: + # Normalization failed - LLM returned something we can't map + logger.warning( + f"Could not normalize custodian type: '{target_custodian_type}', " + "trying query extraction" + ) + # Fall back to direct query extraction using multilingual matching + preprocessed = preprocess_multilingual_query(question, language) + if preprocessed.get('custodian_type'): + target_custodian_type = preprocessed['custodian_type'] + 
logger.debug( + f"Extracted custodian type from query: '{target_custodian_type}'" + ) + else: + target_custodian_type = 'UNKNOWN' + elif target_custodian_type == 'UNKNOWN' and entity_type == 'institution': + # LLM didn't classify - try extracting from question directly + preprocessed = preprocess_multilingual_query(question, language) + if preprocessed.get('custodian_type'): + target_custodian_type = preprocessed['custodian_type'] + logger.debug( + f"Fallback: extracted custodian type from query: '{target_custodian_type}'" + ) + + # Build prediction with all fields (including validated schema-aware fields) + prediction = Prediction( intent=result.intent, entities=result.entities, sources=recommended_sources, reasoning=result.reasoning, resolved_question=getattr(result, 'resolved_question', question), + entity_type=entity_type, + # Validated/corrected schema-aware classification fields + target_role_category=target_role_category, + target_staff_role=target_staff_role, + target_custodian_type=target_custodian_type, ) + + return prediction class MultiHopHeritageRetriever(dspy.Module): @@ -770,6 +1206,75 @@ def create_heritage_tools( }, )) + # Person/staff search tool + def search_heritage_staff( + query: str, + k: int = 10, + custodian_filter: Optional[str] = None, + only_heritage_relevant: bool = False, + ) -> str: + """Search for staff/persons working at heritage institutions. + + Use this tool when the user asks about who works at an institution, + staff members, employees, medewerkers, or specific roles like + archivist, curator, conservator, etc. 
+ + Args: + query: Natural language search query (e.g., "Wie werkt er in het Nationaal Archief?") + k: Number of results to return + custodian_filter: Optional custodian slug to filter by (e.g., "nationaal-archief") + only_heritage_relevant: Only return heritage-relevant staff + + Returns: + JSON string of matching staff with names, roles, headlines, custodian info + """ + if qdrant_retriever is None: + return json.dumps({"error": "Qdrant not available"}) + + try: + # Check if retriever has search_persons method + if hasattr(qdrant_retriever, 'search_persons'): + results = qdrant_retriever.search_persons( + query=query, + k=k, + filter_custodian=custodian_filter, + only_heritage_relevant=only_heritage_relevant, + ) + # Convert to dict format + result_dicts = [] + for r in results: + if hasattr(r, 'to_dict'): + result_dicts.append(r.to_dict()) + elif hasattr(r, '__dict__'): + result_dicts.append({ + "name": getattr(r, 'name', 'Unknown'), + "headline": getattr(r, 'headline', None), + "custodian_name": getattr(r, 'custodian_name', None), + "custodian_slug": getattr(r, 'custodian_slug', None), + "heritage_relevant": getattr(r, 'heritage_relevant', None), + "heritage_type": getattr(r, 'heritage_type', None), + "score": getattr(r, 'vector_score', None), + }) + else: + result_dicts.append({"name": str(r)}) + return json.dumps(result_dicts, ensure_ascii=False) + else: + return json.dumps({"error": "Person search not available on retriever"}) + except Exception as e: + return json.dumps({"error": str(e)}) + + tools.append(dspy.Tool( + func=search_heritage_staff, + name="search_heritage_staff", + desc="Search for staff/persons working at heritage institutions. 
Use for questions about employees, medewerkers, who works at, staff roles (archivist, curator, conservator, etc.)", + args={ + "query": "Natural language search query about staff (e.g., 'Wie werkt er in het Nationaal Archief?')", + "k": "Number of results (default 10)", + "custodian_filter": "Optional custodian slug to filter by (e.g., 'nationaal-archief', 'rijksmuseum')", + "only_heritage_relevant": "Only return heritage-relevant staff (default False)", + }, + )) + # SPARQL query tool import httpx @@ -962,7 +1467,7 @@ def heritage_gepa_metric( trace: Optional[Any] = None, pred_name: Optional[str] = None, pred_trace: Optional[Any] = None, -) -> dict[str, Any]: +) -> Prediction: """GEPA feedback metric for heritage RAG optimization. Provides rich textual feedback for GEPA's reflective optimization. @@ -1040,9 +1545,9 @@ def heritage_gepa_metric( if coverage == 1.0: feedback_parts.append("Answer contains all expected information") else: - missing = [k for k in expected_answer_contains if k.lower() not in answer_lower] + missing_keywords = [k for k in expected_answer_contains if k.lower() not in answer_lower] feedback_parts.append( - f"Answer missing: {missing}. " + f"Answer missing: {missing_keywords}. " f"Include specific facts from retrieved data." ) @@ -1061,6 +1566,48 @@ def heritage_gepa_metric( f"for this answer quality. Calibrate based on evidence strength." ) + # 6. Schema-aware classification scoring (bonus: up to 0.25 points) + # These are supplementary signals for ontology-aligned query classification + + # 6a. 
Role category match (0.10 points) + expected_role_cat = getattr(gold, "expected_role_category", None) + if expected_role_cat and expected_role_cat != "UNKNOWN": + pred_role_cat = getattr(pred, "target_role_category", "UNKNOWN") + if pred_role_cat.upper() == expected_role_cat.upper(): + score += 0.10 + feedback_parts.append(f"Role category correctly identified: {pred_role_cat}") + else: + feedback_parts.append( + f"Role category mismatch: predicted '{pred_role_cat}', expected '{expected_role_cat}'. " + f"Consider the professional domain (CURATORIAL, ARCHIVAL, DIGITAL, etc.)." + ) + + # 6b. Exact staff role match (0.05 bonus points) + expected_staff_role = getattr(gold, "expected_staff_role", None) + if expected_staff_role and expected_staff_role != "UNKNOWN": + pred_staff_role = getattr(pred, "target_staff_role", "UNKNOWN") + if pred_staff_role == expected_staff_role: + score += 0.05 + feedback_parts.append(f"Exact staff role match: {pred_staff_role}") + elif pred_staff_role != "UNKNOWN": + feedback_parts.append( + f"Staff role refinement needed: predicted '{pred_staff_role}', expected '{expected_staff_role}'. " + f"Match the specific role type within the category." + ) + + # 6c. Custodian type match (0.10 points) + expected_cust_type = getattr(gold, "expected_custodian_type", None) + if expected_cust_type and expected_cust_type != "UNKNOWN": + pred_cust_type = getattr(pred, "target_custodian_type", "UNKNOWN") + if pred_cust_type.upper() == expected_cust_type.upper(): + score += 0.10 + feedback_parts.append(f"Custodian type correctly identified: {pred_cust_type}") + else: + feedback_parts.append( + f"Custodian type mismatch: predicted '{pred_cust_type}', expected '{expected_cust_type}'. " + f"Consider institution context (MUSEUM, LIBRARY, ARCHIVE, etc.)." 
+ ) + # Compose feedback feedback = "\n".join([f"- {p}" for p in feedback_parts]) @@ -1119,13 +1666,20 @@ def create_heritage_training_data() -> tuple[list[Example], list[Example]]: Tuple of (trainset, valset) with Example objects """ # Training examples with expected outputs + # Note: expected_role_category, expected_staff_role, expected_custodian_type + # are used for schema-aware classification evaluation trainset = [ + # Institution queries - with custodian type expectations Example( question="Hoeveel musea zijn er in Amsterdam?", language="nl", expected_intent="statistical", + expected_entity_type="institution", expected_entities=["amsterdam", "musea"], expected_sources=["sparql", "qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["musea", "Amsterdam", "aantal"], ).with_inputs("question", "language"), @@ -1133,8 +1687,12 @@ def create_heritage_training_data() -> tuple[list[Example], list[Example]]: question="Where is the Rijksmuseum located?", language="en", expected_intent="entity_lookup", + expected_entity_type="institution", expected_entities=["rijksmuseum"], expected_sources=["sparql", "qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["Rijksmuseum", "Amsterdam", "Museumplein"], ).with_inputs("question", "language"), @@ -1142,8 +1700,12 @@ def create_heritage_training_data() -> tuple[list[Example], list[Example]]: question="Show me archives related to World War II", language="en", expected_intent="exploration", + expected_entity_type="institution", expected_entities=["world war ii", "archives"], expected_sources=["qdrant", "sparql"], + expected_custodian_type="ARCHIVE", + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["archive", "war", "collection"], ).with_inputs("question", "language"), @@ -1151,8 +1713,12 @@ def create_heritage_training_data() -> 
tuple[list[Example], list[Example]]: question="Welke bibliotheken zijn gefuseerd sinds 2000?", language="nl", expected_intent="temporal", + expected_entity_type="institution", expected_entities=["bibliotheken", "fusie", "2000"], expected_sources=["typedb", "sparql"], + expected_custodian_type="LIBRARY", + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["bibliotheek", "fusie", "merge"], ).with_inputs("question", "language"), @@ -1160,8 +1726,12 @@ def create_heritage_training_data() -> tuple[list[Example], list[Example]]: question="Find heritage institutions near Rotterdam Centraal", language="en", expected_intent="geographic", + expected_entity_type="institution", expected_entities=["rotterdam centraal"], expected_sources=["postgis", "qdrant", "sparql"], + expected_custodian_type="UNSPECIFIED", # General heritage, not specific type + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["Rotterdam", "museum", "km"], ).with_inputs("question", "language"), @@ -1169,8 +1739,12 @@ def create_heritage_training_data() -> tuple[list[Example], list[Example]]: question="Compare the collections of Rijksmuseum and Van Gogh Museum", language="en", expected_intent="comparative", + expected_entity_type="institution", expected_entities=["rijksmuseum", "van gogh museum"], expected_sources=["sparql", "qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["Rijksmuseum", "Van Gogh", "collection"], ).with_inputs("question", "language"), @@ -1178,20 +1752,340 @@ def create_heritage_training_data() -> tuple[list[Example], list[Example]]: question="What institutions are part of the Erfgoed Leiden network?", language="en", expected_intent="relational", + expected_entity_type="institution", expected_entities=["erfgoed leiden"], expected_sources=["typedb", "sparql"], + expected_custodian_type="UNSPECIFIED", # Network could be mixed types + 
expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["Erfgoed", "Leiden", "member"], ).with_inputs("question", "language"), + + # Person queries - Dutch (with role category and specific role expectations) + Example( + question="Wie werkt bij het Nationaal Archief?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["nationaal archief"], + expected_sources=["qdrant"], + expected_custodian_type="ARCHIVE", # Inferred from "Nationaal Archief" + expected_role_category="UNKNOWN", # General staff query, no specific role + expected_staff_role="UNKNOWN", + answer_contains=["medewerker", "archief"], + ).with_inputs("question", "language"), + + Example( + question="Noem de kaartenexperts van het Nationaal Archief", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["nationaal archief", "kaarten", "expert"], + expected_sources=["qdrant"], + expected_custodian_type="ARCHIVE", # Inferred from "Nationaal Archief" + expected_role_category="CURATORIAL", # Map experts are curatorial + expected_staff_role="CollectionsManager", # Closest match for map specialist + answer_contains=["kaart", "specialist"], + ).with_inputs("question", "language"), + + Example( + question="Welke curatoren werken bij het Rijksmuseum?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["rijksmuseum", "curator"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", # Inferred from "Rijksmuseum" + expected_role_category="CURATORIAL", + expected_staff_role="Curator", + answer_contains=["curator", "Rijksmuseum"], + ).with_inputs("question", "language"), + + # Person queries - English (with role expectations) + Example( + question="Who works at the Eye Filmmuseum?", + language="en", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["eye filmmuseum"], + 
expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", # Inferred from "Eye Filmmuseum" + expected_role_category="UNKNOWN", # General staff, no specific role + expected_staff_role="UNKNOWN", + answer_contains=["staff", "Eye", "film"], + ).with_inputs("question", "language"), + + Example( + question="Find archivists specializing in medieval manuscripts", + language="en", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["archivist", "medieval", "manuscripts"], + expected_sources=["qdrant"], + expected_custodian_type="ARCHIVE", # Archivists typically work at archives + expected_role_category="ARCHIVAL", + expected_staff_role="Archivist", + answer_contains=["archivist", "manuscript"], + ).with_inputs("question", "language"), + + Example( + question="Show me the director of the Amsterdam Museum", + language="en", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["amsterdam museum", "director"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", # Inferred from "Amsterdam Museum" + expected_role_category="LEADERSHIP", + expected_staff_role="Director", + answer_contains=["director", "Amsterdam"], + ).with_inputs("question", "language"), + + # Additional person queries for diverse role categories + Example( + question="Who are the data engineers at Dutch heritage institutions?", + language="en", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["data engineer", "heritage"], + expected_sources=["qdrant"], + expected_custodian_type="UNSPECIFIED", # General "Dutch heritage institutions" + expected_role_category="DIGITAL", + expected_staff_role="DataEngineer", + answer_contains=["data", "engineer"], + ).with_inputs("question", "language"), + + Example( + question="Wie zijn de conservatoren bij het Rijksmuseum?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["rijksmuseum", 
"conservator"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="CONSERVATION", + expected_staff_role="Conservator", + answer_contains=["conservator", "Rijksmuseum"], + ).with_inputs("question", "language"), + + Example( + question="Find education officers at Dutch museums", + language="en", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["education officer", "museum"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="EDUCATION", + expected_staff_role="EducationOfficer", + answer_contains=["education", "museum"], + ).with_inputs("question", "language"), + + Example( + question="Who manages the library collection at the KB?", + language="en", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["library", "collection", "KB"], + expected_sources=["qdrant"], + expected_custodian_type="LIBRARY", + expected_role_category="LIBRARY", + expected_staff_role="Librarian", + answer_contains=["library", "KB"], + ).with_inputs("question", "language"), + + # Both (person AND institution) + Example( + question="Which museums have staff with expertise in digital preservation?", + language="en", + expected_intent="exploration", + expected_entity_type="both", + expected_entities=["museum", "digital preservation", "staff"], + expected_sources=["qdrant", "sparql"], + expected_custodian_type="MUSEUM", + expected_role_category="DIGITAL", + expected_staff_role="DigitizationSpecialist", + answer_contains=["museum", "digital", "preservation"], + ).with_inputs("question", "language"), + + # Additional examples for underrepresented role categories + # TECHNICAL category + Example( + question="Who maintains the IT systems at Dutch archives?", + language="en", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["IT", "systems", "archives"], + expected_sources=["qdrant"], + 
expected_custodian_type="ARCHIVE", + expected_role_category="TECHNICAL", + expected_staff_role="ITManager", + answer_contains=["IT", "systems", "technical"], + ).with_inputs("question", "language"), + + Example( + question="Wie zijn de systeembeheerders bij het Rijksmuseum?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["rijksmuseum", "systeembeheerder"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="TECHNICAL", + expected_staff_role="SystemAdministrator", + answer_contains=["systeem", "beheerder", "Rijksmuseum"], + ).with_inputs("question", "language"), + + # GOVERNANCE category + Example( + question="Who sits on the board of the Van Gogh Museum?", + language="en", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["van gogh museum", "board"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="GOVERNANCE", + expected_staff_role="BoardMember", + answer_contains=["board", "Van Gogh", "governance"], + ).with_inputs("question", "language"), + + Example( + question="Wie zijn de bestuursleden van het Nationaal Archief?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["nationaal archief", "bestuurslid"], + expected_sources=["qdrant"], + expected_custodian_type="ARCHIVE", + expected_role_category="GOVERNANCE", + expected_staff_role="BoardMember", + answer_contains=["bestuur", "archief"], + ).with_inputs("question", "language"), + + # RESEARCH category + Example( + question="Find researchers studying provenance at Dutch museums", + language="en", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["researcher", "provenance", "museum"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="RESEARCH", + expected_staff_role="ProvenanceResearcher", + 
answer_contains=["research", "provenance"], + ).with_inputs("question", "language"), + + Example( + question="Wie doet onderzoek bij de KB?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["onderzoek", "KB"], + expected_sources=["qdrant"], + expected_custodian_type="LIBRARY", + expected_role_category="RESEARCH", + expected_staff_role="Researcher", + answer_contains=["onderzoeker", "KB"], + ).with_inputs("question", "language"), + + # SUPPORT category + Example( + question="Who handles visitor services at the Stedelijk Museum?", + language="en", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["visitor services", "stedelijk museum"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="SUPPORT", + expected_staff_role="VisitorServicesManager", + answer_contains=["visitor", "service", "Stedelijk"], + ).with_inputs("question", "language"), + + Example( + question="Wie werkt bij de receptie van het Mauritshuis?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["receptie", "mauritshuis"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="SUPPORT", + expected_staff_role="FrontDeskStaff", + answer_contains=["receptie", "Mauritshuis"], + ).with_inputs("question", "language"), + + # CREATIVE category + Example( + question="Who designs exhibitions at Dutch heritage institutions?", + language="en", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["exhibition", "design", "heritage"], + expected_sources=["qdrant"], + expected_custodian_type="UNSPECIFIED", # General "heritage institutions" + expected_role_category="CREATIVE", + expected_staff_role="ExhibitionDesigner", + answer_contains=["exhibition", "design"], + ).with_inputs("question", "language"), + + Example( + question="Vind fotografen werkzaam bij 
Nederlandse musea", + language="nl", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["fotograaf", "museum"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="CREATIVE", + expected_staff_role="Photographer", + answer_contains=["fotograaf", "museum"], + ).with_inputs("question", "language"), + + # EXTERNAL category + Example( + question="Who are the volunteer coordinators at Dutch archives?", + language="en", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["volunteer", "coordinator", "archives"], + expected_sources=["qdrant"], + expected_custodian_type="ARCHIVE", + expected_role_category="EXTERNAL", + expected_staff_role="VolunteerCoordinator", + answer_contains=["volunteer", "coordinator"], + ).with_inputs("question", "language"), + + Example( + question="Wie coördineert de vrijwilligers bij het Openluchtmuseum?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["vrijwilliger", "openluchtmuseum"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="EXTERNAL", + expected_staff_role="VolunteerCoordinator", + answer_contains=["vrijwilliger", "Openluchtmuseum"], + ).with_inputs("question", "language"), ] # Validation examples (held out for evaluation) + # Same structure as trainset with role/type expectations valset = [ + # Institution validation Example( question="Hoeveel archieven heeft Noord-Holland?", language="nl", expected_intent="statistical", + expected_entity_type="institution", expected_entities=["noord-holland", "archieven"], expected_sources=["sparql", "qdrant"], + expected_custodian_type="ARCHIVE", + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["Noord-Holland", "archief", "aantal"], ).with_inputs("question", "language"), @@ -1199,8 +2093,12 @@ def create_heritage_training_data() -> tuple[list[Example], 
list[Example]]: question="When was the Nationaal Archief founded?", language="en", expected_intent="temporal", + expected_entity_type="institution", expected_entities=["nationaal archief"], expected_sources=["sparql", "qdrant"], + expected_custodian_type="ARCHIVE", + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["Nationaal Archief", "founded", "year"], ).with_inputs("question", "language"), @@ -1208,10 +2106,41 @@ def create_heritage_training_data() -> tuple[list[Example], list[Example]]: question="Show galleries in the Randstad region", language="en", expected_intent="geographic", + expected_entity_type="institution", expected_entities=["galleries", "randstad"], expected_sources=["postgis", "qdrant", "sparql"], + expected_custodian_type="GALLERY", + expected_role_category="UNKNOWN", + expected_staff_role="UNKNOWN", answer_contains=["gallery", "Randstad"], ).with_inputs("question", "language"), + + # Person validation + Example( + question="Wie zijn de bibliothecarissen bij de Koninklijke Bibliotheek?", + language="nl", + expected_intent="entity_lookup", + expected_entity_type="person", + expected_entities=["koninklijke bibliotheek", "bibliothecaris"], + expected_sources=["qdrant"], + expected_custodian_type="LIBRARY", # KB = Koninklijke Bibliotheek + expected_role_category="LIBRARY", + expected_staff_role="Librarian", + answer_contains=["bibliothecaris", "KB"], + ).with_inputs("question", "language"), + + Example( + question="Find conservation experts in Dutch museums", + language="en", + expected_intent="exploration", + expected_entity_type="person", + expected_entities=["conservation", "expert", "dutch", "museum"], + expected_sources=["qdrant"], + expected_custodian_type="MUSEUM", + expected_role_category="CONSERVATION", + expected_staff_role="Conservator", + answer_contains=["conservation", "museum"], + ).with_inputs("question", "language"), ] return trainset, valset @@ -1235,7 +2164,7 @@ async def optimize_heritage_rag( # 
Choose training data source if use_extended_data: try: - from backend.rag.gepa_training_extended import get_extended_training_data # type: ignore[import-unresolved] + from backend.rag.gepa_training_extended import get_extended_training_data trainset, valset = get_extended_training_data() logger.info("Using extended GEPA training data") except ImportError: @@ -1273,9 +2202,9 @@ async def optimize_heritage_rag( class HeritageStreamListener(StreamListener): """Stream listener for heritage RAG responses.""" - def __init__(self): - self.tokens = [] - self.status_messages = [] + def __init__(self) -> None: + self.tokens: list[str] = [] + self.status_messages: list[dict[str, Any]] = [] def receive(self, token: str) -> None: """Receive a token from the stream.""" @@ -1294,10 +2223,10 @@ class HeritageStreamListener(StreamListener): class HeritageStatusProvider(StatusMessageProvider): """Status message provider for heritage RAG streaming.""" - def __init__(self, module_name: str): + def __init__(self, module_name: str) -> None: self.module_name = module_name - def get_status(self, **kwargs) -> StatusMessage: + def get_status(self, **kwargs: Any) -> StatusMessage: """Generate status message for current operation.""" return StatusMessage( message=f"Processing with {self.module_name}", @@ -1429,9 +2358,9 @@ class HeritageRAGPipeline(dspy.Module): Combines: - Intent classification - - Multi-hop retrieval + - Multi-hop retrieval with ACTUAL data retrieval - Entity extraction - - Answer generation + - Answer generation with retrieved context - Visualization selection Can be optimized with GEPA for improved performance. 
@@ -1441,6 +2370,7 @@ class HeritageRAGPipeline(dspy.Module): max_hops: Maximum retrieval hops (default 3) use_schema_aware: Use schema-aware signatures with LinkML-derived docstrings for improved LLM context (default True if schema loader available) + retriever: Optional HybridRetriever for actual data retrieval (Qdrant person/institution search) """ def __init__( @@ -1448,9 +2378,13 @@ class HeritageRAGPipeline(dspy.Module): tools: Optional[list[dspy.Tool]] = None, max_hops: int = 3, use_schema_aware: Optional[bool] = None, + retriever: Any = None, ): super().__init__() + # Store retriever for actual data retrieval + self.retriever = retriever + # Determine whether to use schema-aware signatures # Default to True if schema loader is available if use_schema_aware is None: @@ -1489,6 +2423,7 @@ class HeritageRAGPipeline(dspy.Module): self.viz_selector = dspy.Predict(VisualizationSelector) # Optional ReAct agent for complex queries + self.agent: HeritageReActAgent | None if tools: self.agent = HeritageReActAgent(tools=tools) else: @@ -1508,6 +2443,7 @@ class HeritageRAGPipeline(dspy.Module): include_viz: bool = True, use_agent: bool = False, skip_cache: bool = False, + embedding_model: str | None = None, ) -> Prediction: """Execute complete RAG pipeline. @@ -1518,6 +2454,7 @@ class HeritageRAGPipeline(dspy.Module): include_viz: Whether to include visualization config use_agent: Whether to use ReAct agent for complex queries skip_cache: Force bypass cache lookup + embedding_model: Optional embedding model (minilm_384, openai_1536, bge_768) Returns: Prediction with answer, sources, visualization, etc. 
@@ -1565,6 +2502,10 @@ class HeritageRAGPipeline(dspy.Module): # Standard RAG Pipeline (cache miss path) # ================================================================= + # Initialize retrieval tracking variables (before branching to ensure always defined) + detected_query_type = "institution" # Default query type + retrieved_results = [] # Store raw results for frontend visualization + # Step 1: Route query (with history for context resolution) routing = self.router(question=question, language=language, history=history) @@ -1598,9 +2539,174 @@ class HeritageRAGPipeline(dspy.Module): citations = getattr(result, "citations", []) follow_up = getattr(result, "follow_up", []) else: - # Use standard answer generation - # In production, would use actual retrieval results - context = f"Query intent: {routing.intent}\nEntities: {routing.entities}" + # Use standard answer generation WITH actual retrieval results + context_parts = [f"Query intent: {routing.intent}", f"Entities: {routing.entities}"] + + # =================================================================== + # ACTUAL DATA RETRIEVAL - Use Qdrant to fetch real person/institution data + # =================================================================== + if self.retriever: + try: + # Import query type detection (try multiple import paths for server/local compatibility) + try: + from glam_extractor.api.hybrid_retriever import detect_query_type + except ImportError: + try: + from src.glam_extractor.api.hybrid_retriever import detect_query_type + except ImportError: + try: + from hybrid_retriever import detect_query_type + except ImportError: + # Fallback: simple function that ignores entity_type + def _fallback_detect_query_type(query: str, dspy_entity_type: str | None = None) -> str: + return "institution" + detect_query_type = _fallback_detect_query_type + + # Use DSPy semantic classification if available, fallback to keyword heuristics + dspy_entity_type = getattr(routing, 'entity_type', None) + 
detected_query_type = detect_query_type(resolved_question, dspy_entity_type=dspy_entity_type) + + if detected_query_type == "person": + # Search for staff/persons in heritage_persons collection + # Use schema-aware filters from DSPy router when available + target_role_category = getattr(routing, 'target_role_category', None) + target_custodian_type = getattr(routing, 'target_custodian_type', None) + + # Only pass non-empty, non-unknown values + effective_role_category = target_role_category if target_role_category not in (None, "", "UNKNOWN", "UNSPECIFIED") else None + effective_custodian_type = target_custodian_type if target_custodian_type not in (None, "", "UNKNOWN", "UNSPECIFIED") else None + + logger.info(f"Performing PERSON retrieval for: {resolved_question[:50]}... (role_category={effective_role_category}, custodian_type={effective_custodian_type})") + person_results = self.retriever.search_persons( + query=resolved_question, + k=15, + using=embedding_model, + target_role_category=effective_role_category, + target_custodian_type=effective_custodian_type, + ) + + if person_results: + context_parts.append("\n[RETRIEVED STAFF DATA - Real data from heritage institutions]:") + for p in person_results: + name = p.name or "Unknown" + headline = p.headline or "" + custodian = p.custodian_name or "" + location = p.location or "" + + # Format each person entry for LLM context + entry = f"- {name}" + if headline: + entry += f": {headline}" + if custodian: + entry += f" @ {custodian}" + if location: + entry += f" ({location})" + context_parts.append(entry) + + # Store raw result for frontend visualization + # Use to_dict() to include all scores (vector, combined, richness) + if hasattr(p, 'to_dict'): + person_dict = p.to_dict() + person_dict['type'] = 'person' + retrieved_results.append(person_dict) + else: + retrieved_results.append({ + "type": "person", + "person_id": p.person_id, + "name": name, + "headline": headline, + "custodian_name": custodian, + 
"custodian_slug": p.custodian_slug, + "location": location, + "heritage_relevant": p.heritage_relevant, + "heritage_type": p.heritage_type, + "linkedin_url": p.linkedin_url, + "score": p.combined_score, + }) + + logger.info(f"Added {len(person_results)} person results to context") + else: + # Fallback to institution search if person search returns nothing + # (e.g., heritage_persons collection doesn't exist yet) + logger.info(f"No person results, falling back to INSTITUTION retrieval for: {resolved_question[:50]}...") + detected_query_type = "institution" # Update for response + inst_results = self.retriever.search(query=resolved_question, k=10, auto_route=False, using=embedding_model) + if inst_results: + context_parts.append("\n[RETRIEVED INSTITUTIONS - Related to your person query]:") + for inst in inst_results: + if hasattr(inst, 'to_dict'): + inst_dict = inst.to_dict() + name = inst_dict.get('name', 'Unknown') + inst_type = inst_dict.get('metadata', {}).get('type', '') + city = inst_dict.get('metadata', {}).get('city', '') + inst_dict['type'] = 'institution' + retrieved_results.append(inst_dict) + else: + name = getattr(inst, 'name', 'Unknown') + inst_type = getattr(inst, 'type', '') + city = getattr(inst, 'city', '') + retrieved_results.append({ + "type": "institution", + "name": name, + "institution_type": inst_type, + "city": city, + }) + + entry = f"- {name}" + if inst_type: + entry += f" ({inst_type})" + if city: + entry += f" in {city}" + context_parts.append(entry) + logger.info(f"Added {len(inst_results)} institution results as fallback") + else: + context_parts.append("\n[No staff or institution data found for this query]") + else: + # Search for institutions in heritage_custodians collection + logger.info(f"Performing INSTITUTION retrieval for: {resolved_question[:50]}...") + inst_results = self.retriever.search(query=resolved_question, k=10, auto_route=False, using=embedding_model) + + if inst_results: + context_parts.append("\n[RETRIEVED 
INSTITUTIONS - Real data from heritage database]:") + for inst in inst_results: + if hasattr(inst, 'to_dict'): + inst_dict = inst.to_dict() + name = inst_dict.get('name', 'Unknown') + inst_type = inst_dict.get('metadata', {}).get('type', '') + city = inst_dict.get('metadata', {}).get('city', '') + # Store full dict for frontend + inst_dict['type'] = 'institution' + retrieved_results.append(inst_dict) + else: + name = getattr(inst, 'name', 'Unknown') + inst_type = getattr(inst, 'type', '') + city = getattr(inst, 'city', '') + # Build dict for frontend + retrieved_results.append({ + "type": "institution", + "name": name, + "institution_type": inst_type, + "city": city, + }) + + entry = f"- {name}" + if inst_type: + entry += f" ({inst_type})" + if city: + entry += f" in {city}" + context_parts.append(entry) + + logger.info(f"Added {len(inst_results)} institution results to context") + else: + context_parts.append("\n[No institution data found for this query]") + + except Exception as e: + logger.warning(f"Retrieval failed, using basic context: {e}") + context_parts.append(f"\n[Retrieval error: {str(e)}]") + else: + logger.warning("No retriever available - using basic context without real data") + + context = "\n".join(context_parts) answer_result = self.answer_gen( question=resolved_question, @@ -1630,6 +2736,7 @@ class HeritageRAGPipeline(dspy.Module): } # Build prediction result + # Note: detected_query_type and retrieved_results are now initialized before branching prediction = Prediction( answer=answer, intent=routing.intent, @@ -1642,6 +2749,9 @@ class HeritageRAGPipeline(dspy.Module): visualization=viz_config, cache_hit=False, # Mark as cache miss resolved_question=getattr(routing, 'resolved_question', question), # Include resolved question from router + retrieved_results=retrieved_results, # Raw data for frontend visualization + query_type=detected_query_type, # "person" or "institution" + embedding_model_used=embedding_model, # Which embedding model was used 
for retrieval ) # ================================================================= @@ -1733,7 +2843,7 @@ async def create_optimized_pipeline( auto_budget=optimization_budget, ) - return optimized + return cast(HeritageRAGPipeline, optimized) # ============================================================================= @@ -1800,7 +2910,7 @@ if __name__ == "__main__": print("Testing streaming...") print("-" * 60) - async def test_stream(): + async def test_stream() -> None: async for chunk in stream_heritage_rag( question="Where is the Rijksmuseum?", language="en", diff --git a/backend/rag/ontology_mapping.py b/backend/rag/ontology_mapping.py new file mode 100644 index 0000000000..2914f1d641 --- /dev/null +++ b/backend/rag/ontology_mapping.py @@ -0,0 +1,1360 @@ +""" +Dynamic Ontology Mapping from LinkML Schema Files + +This module provides dynamic loading and matching of LinkML schema enumerations +for the Heritage RAG pipeline. The LinkML schema files are the SINGLE SOURCE OF TRUTH - +no hardcoded enum values. + +Key features: +1. Dynamically loads enum files from schemas/20251121/linkml/modules/enums/ +2. Extracts multilingual synonyms from the 'comments' field in YAML +3. Provides fuzzy matching for natural language queries +4. Supports cache invalidation based on file modification times +5. 
Generates filter mappings for Qdrant queries + +Usage: + from backend.rag.ontology_mapping import get_ontology_mapper, match_custodian_type + + mapper = get_ontology_mapper() + + # Match natural language to schema enum value + result = mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum") + # Returns: "VIRTUAL_MUSEUM" + + # Get heritage type code for Qdrant filtering + code = mapper.get_heritage_type_code("MUSEUM") + # Returns: "M" + + # Get custodian type to code mapping (replaces hardcoded dict) + type_to_code = mapper.get_custodian_type_to_code_mapping() + # Returns: {"GALLERY": "G", "LIBRARY": "L", "ARCHIVE": "A", ...} +""" + +from __future__ import annotations + +import logging +import os +import re +import unicodedata +from dataclasses import dataclass, field +from datetime import datetime, timezone +from functools import lru_cache +from pathlib import Path +from typing import Any + +import yaml + +logger = logging.getLogger(__name__) + +# Default schema directory - matches schema_loader.py +SCHEMA_BASE_DIR = Path(__file__).parent.parent.parent / "schemas" / "20251121" / "linkml" + +# Languages supported for synonym extraction (ISO 639-1 codes) +SUPPORTED_LANGUAGES = {"en", "nl", "de", "fr", "es", "it", "pt"} + +# Heritage-specific vocabulary for domain-specific language detection +# These terms are where general-purpose language detectors often fail on short heritage terms. +# fast-langdetect is used as primary detector; this vocabulary is used as fallback for: +# 1. Low-confidence detections (score < CONFIDENCE_THRESHOLD) +# 2. Known problematic terms that detectors consistently misclassify +# +# NOTE: This is a REDUCED vocabulary focused only on disambiguation cases. +# General-purpose language detection handles most terms correctly. 
+HERITAGE_VOCABULARY: dict[str, set[str]] = { + "nl": { + # Dutch terms that fast-langdetect often misclassifies + # (e.g., "musea" detected as Italian, "bibliotheken" as German) + "musea", "bibliotheek", "bibliotheken", "archief", "archieven", + "galerij", "galerijen", "collectie", "collecties", "verzameling", + "heemkundige", "kring", "vereniging", "genootschap", "erfgoed", + "rijks", "gemeentelijk", "provinciale", + }, + "de": { + # German terms - most are detected correctly, keep only ambiguous ones + "museen", "archiv", "sammlung", "sammlungen", + "landesarchiv", "stadtarchiv", "bundesarchiv", + }, + "fr": { + # French terms with diacritics are usually detected correctly + # Keep only terms without diacritics that could be confused + "musee", "musees", "bibliotheque", "bibliotheques", + }, + "es": { + # Spanish - biblioteca/museo overlap with Italian + "archivos", "bibliotecas", + }, + "it": { + # Italian terms + "musei", "archivi", "biblioteche", "galleria", "gallerie", + }, + "pt": { + # Portuguese - museu is distinctive + "museu", "museus", "arquivo", "arquivos", + }, + "en": { + # English heritage terms - these should match English + "library", "libraries", "museum", "museums", "archive", "archives", + "gallery", "galleries", "collection", "collections", + "society", "association", "foundation", "trust", "institute", + }, +} + +# Confidence threshold for fast-langdetect +# Below this, fall back to heritage vocabulary matching +LANGDETECT_CONFIDENCE_THRESHOLD = 0.6 + +# Flag to track if fast-langdetect is available +_FAST_LANGDETECT_AVAILABLE: bool | None = None + + +def _is_fast_langdetect_available() -> bool: + """Check if fast-langdetect is available.""" + global _FAST_LANGDETECT_AVAILABLE + if _FAST_LANGDETECT_AVAILABLE is None: + try: + from fast_langdetect import detect # noqa: F401 + _FAST_LANGDETECT_AVAILABLE = True + except ImportError: + _FAST_LANGDETECT_AVAILABLE = False + logger.warning( + "fast-langdetect not installed. 
Using heritage vocabulary fallback only. " + "Install with: pip install fast-langdetect" + ) + return _FAST_LANGDETECT_AVAILABLE + + +def _match_heritage_vocabulary(term: str) -> str | None: + """Match term against heritage-specific vocabulary. + + This is the fallback method when fast-langdetect is unavailable or + returns low confidence. Uses domain-specific heritage terms that + general-purpose language detectors often misclassify. + + Args: + term: The term to match + + Returns: + Language code or None if no match + """ + normalized = normalize_text(term) + original_lower = term.lower().strip() + + # Single-word exact match + for lang, vocab in HERITAGE_VOCABULARY.items(): + normalized_vocab = {normalize_text(v) for v in vocab} + if normalized in normalized_vocab: + return lang + # Also check with original (preserves diacritics) + if original_lower in {v.lower() for v in vocab}: + return lang + + # Prefix matching for morphological variations + # e.g., "bibliotheken" should match "bibliotheek" + for lang, vocab in HERITAGE_VOCABULARY.items(): + normalized_vocab = {normalize_text(v) for v in vocab} + for marker in normalized_vocab: + if len(marker) >= 5 and len(normalized) >= 5: + if normalized.startswith(marker[:5]) or marker.startswith(normalized[:5]): + return lang + + return None + + +def detect_term_language(term: str) -> str | None: + """Detect language of a term using hybrid approach. + + Uses a two-stage detection strategy: + 1. Primary: fast-langdetect library (FastText model, 176 languages) + 2. 
Fallback: Heritage-specific vocabulary for domain terms + + The fallback is used when: + - fast-langdetect is not installed + - Detection confidence is below threshold (0.6) + - The term matches known heritage vocabulary + + Args: + term: A single term to analyze (e.g., "bibliotheken", "museos") + + Returns: + ISO 639-1 language code or None if detection fails + + Examples: + >>> detect_term_language("bibliotheken") + "nl" + >>> detect_term_language("museos") + "es" + >>> detect_term_language("bibliothèques") + "fr" + >>> detect_term_language("Public libraries") + "en" + >>> detect_term_language("unknown term") + None + """ + if not term or not term.strip(): + return "en" # Default for empty strings + + normalized = normalize_text(term) + words = normalized.split() + + # Multi-word phrase detection + if len(words) > 1: + # English phrase indicators - these words strongly suggest English + english_indicators = { + "public", "national", "special", "digital", "academic", "local", + "art", "history", "science", "natural", "city", "state", + "corporate", "government", "religious", "university", + } + if any(word in english_indicators for word in words): + return "en" + + # Try heritage vocabulary first for known terms + # This catches terms that fast-langdetect misclassifies + heritage_match = _match_heritage_vocabulary(term) + if heritage_match: + return heritage_match + + # Use fast-langdetect if available + if _is_fast_langdetect_available(): + try: + from fast_langdetect import detect + result = detect(term) + + if isinstance(result, dict): + lang = result.get("lang") + score = result.get("score", 0) + elif isinstance(result, list) and result: + lang = result[0].get("lang") + score = result[0].get("score", 0) + else: + lang = None + score = 0 + + # Return if confidence is high enough + if lang and score >= LANGDETECT_CONFIDENCE_THRESHOLD: + # Map to supported languages (fast-langdetect returns ISO 639-1) + if lang in SUPPORTED_LANGUAGES: + return str(lang) + # Some 
language codes need mapping + lang_mapping: dict[str, str] = {"af": "nl"} # Afrikaans often confused with Dutch + mapped = lang_mapping.get(str(lang), str(lang)) + return mapped if mapped in SUPPORTED_LANGUAGES else None + + # Low confidence - fall through to return None + logger.debug(f"Low confidence ({score:.2f}) for term '{term}', returning None") + + except Exception as e: + logger.debug(f"fast-langdetect error for '{term}': {e}") + + # For multi-word terms without clear indicators, default to English + if len(words) > 1: + return "en" + + # Single word with no match - return None + return None + +# GLAMORCUBESFIXPHDNT taxonomy mapping - enum value name to single-letter code +# This mapping is STABLE (defined by taxonomy) but the enum VALUE NAMES may evolve +# So we still load dynamically and match to this fixed mapping +GLAMORCUBESFIXPHDNT_CODES: dict[str, str] = { + # Primary type enum values -> single letter codes + "GALLERY": "G", + "LIBRARY": "L", + "ARCHIVE": "A", + "MUSEUM": "M", + "OFFICIAL_INSTITUTION": "O", + "RESEARCH_CENTER": "R", + "COMMERCIAL": "C", + "UNSPECIFIED": "U", + "BIO_CUSTODIAN": "B", + "EDUCATION_PROVIDER": "E", + "HERITAGE_SOCIETY": "S", + "FEATURE_CUSTODIAN": "F", + "INTANGIBLE_HERITAGE_GROUP": "I", + "MIXED": "X", + "PERSONAL_COLLECTION": "P", + "HOLY_SACRED_SITE": "H", + "DIGITAL_PLATFORM": "D", + "NON_PROFIT": "N", + "TASTE_SCENT_HERITAGE": "T", +} + + +@dataclass +class EnumValueInfo: + """Detailed information for a single enum value. 
+ + Attributes: + name: The enum value name (e.g., "VIRTUAL_MUSEUM") + description: Human-readable description + wikidata_id: Wikidata entity ID from 'meaning' field (e.g., "Q1225034") + synonyms: Language-tagged synonyms extracted from comments + all_synonyms_normalized: Flattened list of normalized synonyms for matching + """ + name: str + description: str | None = None + wikidata_id: str | None = None + synonyms: dict[str, list[str]] = field(default_factory=dict) # lang_code -> synonyms + all_synonyms_normalized: list[str] = field(default_factory=list) + + +@dataclass +class EnumMapping: + """Complete mapping for an enum type. + + Attributes: + enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum") + source_file: Path to the source YAML file + values: Dictionary mapping value names to EnumValueInfo + last_loaded: When this enum was last loaded + file_mtime: File modification time for cache invalidation + description: Enum-level description + """ + enum_name: str + source_file: Path + values: dict[str, EnumValueInfo] = field(default_factory=dict) + last_loaded: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + file_mtime: float = 0.0 + description: str | None = None + + +def normalize_text(text: str) -> str: + """Normalize text for matching: lowercase, remove accents, strip whitespace. 
+ + Args: + text: Input text to normalize + + Returns: + Normalized text for comparison + + Examples: + >>> normalize_text("Digitales Museum") + "digitales museum" + >>> normalize_text("musée virtuel") + "musee virtuel" + >>> normalize_text("Bibliothèque") + "bibliotheque" + """ + # NFD decomposition separates base characters from combining marks + normalized = unicodedata.normalize('NFD', text) + # Remove combining marks (category 'Mn' = Mark, Nonspacing) + ascii_text = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn') + # Lowercase and strip + return ascii_text.lower().strip() + + +def parse_language_tag(comment: str) -> tuple[str | None, str]: + """Parse a language-tagged comment string. + + Format: "term (lang_code)" -> ("lang_code", "term") + + Args: + comment: Comment string, possibly with language tag + + Returns: + Tuple of (language_code, term) where language_code may be None + + Examples: + >>> parse_language_tag("Digitales Museum (de)") + ("de", "Digitales Museum") + >>> parse_language_tag("museo virtual (es)") + ("es", "museo virtual") + >>> parse_language_tag("Some plain comment") + (None, "Some plain comment") + """ + # Pattern: text (lang_code) at end of string + pattern = r'^(.+?)\s*\(([a-z]{2})\)\s*$' + match = re.match(pattern, comment, re.IGNORECASE) + + if match: + term = match.group(1).strip() + lang = match.group(2).lower() + if lang in SUPPORTED_LANGUAGES: + return (lang, term) + + return (None, comment) + + +def extract_comma_separated_terms(comment: str) -> list[str]: + """Extract comma-separated terms from comments like "Includes X, Y, Z". 
+ + Handles patterns commonly found in CustodianPrimaryTypeEnum.yaml: + - "Includes bibliotheken, bibliotecas, bibliothèques" + - "Public libraries, academic libraries, national libraries" + - "Kunsthallen, art galleries, visual arts centers" + + Args: + comment: A comment string that may contain comma-separated terms + + Returns: + List of individual terms extracted from the comment + + Examples: + >>> extract_comma_separated_terms("Includes musea, museos, musées") + ["musea", "museos", "musées"] + >>> extract_comma_separated_terms("Public libraries, academic libraries") + ["Public libraries", "academic libraries"] + >>> extract_comma_separated_terms("Some single term comment") + [] # Empty list - no commas + """ + terms: list[str] = [] + + # Skip if no commas (not a list) + if ',' not in comment: + return terms + + # Strip common prefixes like "Includes", "Examples:", etc. + cleaned = comment + prefixes_to_strip = [ + r'^Includes\s+', + r'^Examples?:?\s*', + r'^Types?:?\s*', + r'^Such as\s+', + r'^E\.g\.?,?\s*', + r'^I\.e\.?,?\s*', + ] + for prefix in prefixes_to_strip: + cleaned = re.sub(prefix, '', cleaned, flags=re.IGNORECASE) + + # Split by comma + parts = cleaned.split(',') + + for part in parts: + # Clean up each term + term = part.strip() + + # Skip empty terms + if not term: + continue + + # Skip terms that look like full sentences (long descriptions) + if len(term) > 50: + continue + + # Skip terms that are just references like "(Q123456)" + if re.match(r'^\(Q\d+\)$', term): + continue + + # Handle trailing parentheses like "botanical gardens (Q473972)" + # Extract just the term part + paren_match = re.match(r'^(.+?)\s*\([^)]+\)\s*$', term) + if paren_match: + term = paren_match.group(1).strip() + + # Add valid terms + if term and len(term) >= 2: + terms.append(term) + + return terms + + +def extract_wikidata_id(meaning: str | None) -> str | None: + """Extract Wikidata ID from meaning field. 
+ + Args: + meaning: The meaning field value (e.g., "wikidata:Q1225034") + + Returns: + The Wikidata ID (e.g., "Q1225034") or None + """ + if not meaning: + return None + + # Handle "wikidata:Q12345" format + if meaning.startswith("wikidata:"): + return meaning.replace("wikidata:", "") + + # Handle full URI format + if "wikidata.org" in meaning: + match = re.search(r'(Q\d+)', meaning) + if match: + return match.group(1) + + return None + + +class OntologyMapper: + """Dynamic ontology mapping from LinkML schema files. + + This class loads enum definitions from the LinkML schema directory and provides: + - Multilingual synonym extraction from YAML comments + - Natural language matching to schema enum values + - Cache invalidation based on file modification times + - Integration helpers for Qdrant filtering + + Usage: + mapper = OntologyMapper(schema_dir=Path("schemas/20251121/linkml")) + + # Load specific enum + digital_platforms = mapper.load_enum("DigitalPlatformTypeEnum") + print(len(digital_platforms.values)) # 53 + + # Match natural language (Dutch) + result = mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum") + # Returns: "VIRTUAL_MUSEUM" + + # Get heritage type code for Qdrant filtering + code = mapper.get_heritage_type_code("MUSEUM") + # Returns: "M" + """ + + def __init__(self, schema_dir: Path | None = None, watch_for_changes: bool = True): + """Initialize the OntologyMapper. + + Args: + schema_dir: Path to LinkML schema directory. 
Defaults to schemas/20251121/linkml/ + watch_for_changes: Whether to check file mtimes for cache invalidation + """ + self.schema_dir = schema_dir or SCHEMA_BASE_DIR + self.enums_dir = self.schema_dir / "modules" / "enums" + self.watch_for_changes = watch_for_changes + + # Cache of loaded enums + self._cache: dict[str, EnumMapping] = {} + + # File modification times for cache invalidation + self._file_mtimes: dict[str, float] = {} + + logger.info(f"OntologyMapper initialized with schema_dir: {self.schema_dir}") + + def _get_enum_file_path(self, enum_name: str) -> Path: + """Get the file path for an enum. + + Args: + enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum") + + Returns: + Path to the enum YAML file + """ + return self.enums_dir / f"{enum_name}.yaml" + + def _is_cache_stale(self, enum_name: str) -> bool: + """Check if cached enum is stale based on file mtime. + + Args: + enum_name: Name of the enum to check + + Returns: + True if cache is stale and needs reload + """ + if not self.watch_for_changes: + return False + + if enum_name not in self._cache: + return True + + filepath = self._get_enum_file_path(enum_name) + if not filepath.exists(): + return True + + current_mtime = filepath.stat().st_mtime + cached_mtime = self._file_mtimes.get(enum_name, 0.0) + + return current_mtime > cached_mtime + + def _parse_comments_to_synonyms( + self, + comments: list[str] | None + ) -> tuple[dict[str, list[str]], list[str]]: + """Parse comments field to extract multilingual synonyms. + + Handles three formats: + 1. Language-tagged: "Digitales Museum (de)" -> {"de": ["Digitales Museum"]} + 2. Comma-separated with auto-detection: "Includes musea, museos, musées" + -> {"nl": ["musea"], "es": ["museos"], "fr": ["musées"]} + 3. Plain terms: Added to all_normalized for fuzzy matching + + The auto-detection uses LANGUAGE_MARKERS to identify which language + each term belongs to based on known heritage vocabulary patterns. 
+ + Args: + comments: List of comment strings from YAML + + Returns: + Tuple of (synonyms_by_language, all_normalized_synonyms) + + Example: + Input: ["Digitales Museum (de)", "Includes musea, museos, musées"] + Output: ( + {"de": ["Digitales Museum"], "nl": ["musea"], "es": ["museos"], "fr": ["musées"]}, + ["digitales museum", "musea", "museos", "musees", ...] + ) + """ + synonyms_by_lang: dict[str, list[str]] = {} + all_normalized: list[str] = [] + + if not comments: + return synonyms_by_lang, all_normalized + + def add_to_lang_dict(lang: str, term: str) -> None: + """Helper to add term to language-specific dict.""" + if lang not in synonyms_by_lang: + synonyms_by_lang[lang] = [] + # Avoid duplicates + if term not in synonyms_by_lang[lang]: + synonyms_by_lang[lang].append(term) + + for comment in comments: + # Try to parse explicit language tag first + lang, term = parse_language_tag(comment) + + # Add to language-specific dict if explicitly tagged + if lang: + add_to_lang_dict(lang, term) + + # Add normalized version to flat list + normalized = normalize_text(term) + if normalized and normalized not in all_normalized: + all_normalized.append(normalized) + + # Extract comma-separated terms within the comment + # This handles patterns like "Includes bibliotheken, bibliotecas, bibliothèques" + comma_terms = extract_comma_separated_terms(comment) + for cterm in comma_terms: + cterm_normalized = normalize_text(cterm) + if cterm_normalized and cterm_normalized not in all_normalized: + all_normalized.append(cterm_normalized) + + # Try to detect language for this term + detected_lang = detect_term_language(cterm) + if detected_lang: + # Store the original (unnormalized) term with its language + add_to_lang_dict(detected_lang, cterm) + + return synonyms_by_lang, all_normalized + + def load_enum(self, enum_name: str, force_reload: bool = False) -> EnumMapping | None: + """Load a single enum with cache invalidation. 
+ + Args: + enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum") + force_reload: Force reload even if cached + + Returns: + EnumMapping object or None if file doesn't exist + """ + # Check cache + if not force_reload and not self._is_cache_stale(enum_name): + cached = self._cache.get(enum_name) + if cached: + return cached + + # Load from file + filepath = self._get_enum_file_path(enum_name) + if not filepath.exists(): + logger.warning(f"Enum file not found: {filepath}") + return None + + try: + with open(filepath, 'r', encoding='utf-8') as f: + yaml_content = yaml.safe_load(f) + except Exception as e: + logger.error(f"Failed to load enum {enum_name}: {e}") + return None + + # Parse the enum + file_mtime = filepath.stat().st_mtime + enums_section = yaml_content.get("enums", {}) + enum_def = enums_section.get(enum_name, {}) + + if not enum_def: + # Try to find any enum in the file + if enums_section: + enum_name = next(iter(enums_section.keys())) + enum_def = enums_section[enum_name] + + permissible_values = enum_def.get("permissible_values", {}) + + # Build EnumMapping + mapping = EnumMapping( + enum_name=enum_name, + source_file=filepath, + file_mtime=file_mtime, + description=yaml_content.get("description") or enum_def.get("description"), + ) + + for value_name, value_info in permissible_values.items(): + if value_info is None: + value_info = {} + + comments = value_info.get("comments", []) + synonyms, all_normalized = self._parse_comments_to_synonyms(comments) + + # Add description to normalized synonyms + description = value_info.get("description") + if description: + desc_normalized = normalize_text(description) + if desc_normalized and desc_normalized not in all_normalized: + all_normalized.append(desc_normalized) + + # Add the value name itself as a synonym + name_normalized = normalize_text(value_name.replace("_", " ")) + if name_normalized and name_normalized not in all_normalized: + all_normalized.insert(0, name_normalized) + + 
mapping.values[value_name] = EnumValueInfo( + name=value_name, + description=description, + wikidata_id=extract_wikidata_id(value_info.get("meaning")), + synonyms=synonyms, + all_synonyms_normalized=all_normalized, + ) + + # Update cache + self._cache[enum_name] = mapping + self._file_mtimes[enum_name] = file_mtime + + logger.debug(f"Loaded enum {enum_name} with {len(mapping.values)} values") + return mapping + + def load_all_enums(self) -> dict[str, EnumMapping]: + """Load all enum files from schema directory. + + Returns: + Dictionary mapping enum names to EnumMapping objects + """ + if not self.enums_dir.exists(): + logger.warning(f"Enums directory not found: {self.enums_dir}") + return {} + + loaded = {} + for filepath in self.enums_dir.glob("*.yaml"): + enum_name = filepath.stem + mapping = self.load_enum(enum_name) + if mapping: + loaded[enum_name] = mapping + + logger.info(f"Loaded {len(loaded)} enums from {self.enums_dir}") + return loaded + + def get_synonyms(self, enum_name: str, value: str) -> list[str]: + """Get all synonyms for an enum value. + + Args: + enum_name: Name of the enum + value: Enum value name + + Returns: + List of normalized synonyms + """ + mapping = self.load_enum(enum_name) + if not mapping: + return [] + + value_info = mapping.values.get(value) + if not value_info: + return [] + + return value_info.all_synonyms_normalized + + def get_enum_value_info(self, value_name: str, enum_name: str) -> EnumValueInfo | None: + """Get detailed EnumValueInfo for a specific enum value. + + This method provides access to the full EnumValueInfo dataclass, + including both language-tagged synonyms and all normalized synonyms. 
+ + Args: + value_name: The enum value name (e.g., "MUSEUM", "LIBRARY") + enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum") + + Returns: + EnumValueInfo object or None if not found + + Example: + >>> mapper = get_ontology_mapper() + >>> info = mapper.get_enum_value_info("LIBRARY", "CustodianPrimaryTypeEnum") + >>> print(info.synonyms) # Language-tagged synonyms + {"nl": ["bibliotheken"], "es": ["bibliotecas"], "fr": ["bibliothèques"]} + >>> print(info.all_synonyms_normalized[:5]) # All normalized + ["library", "bibliotheken", "bibliotecas", "bibliotheques", ...] + """ + mapping = self.load_enum(enum_name) + if not mapping: + logger.debug(f"Enum {enum_name} not found for get_enum_value_info") + return None + + return mapping.values.get(value_name) + + def match_natural_language( + self, + text: str, + enum_name: str, + threshold: float = 0.8 + ) -> str | None: + """Fuzzy match natural language text to schema enum value. + + Args: + text: Natural language text to match (e.g., "virtueel museum") + enum_name: Name of the enum to match against + threshold: Similarity threshold for fuzzy matching (0.0-1.0) + + Returns: + Matched enum value name or None + + Examples: + >>> mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum") + "VIRTUAL_MUSEUM" + >>> mapper.match_natural_language("Digitales Museum", "DigitalPlatformTypeEnum") + "VIRTUAL_MUSEUM" + """ + mapping = self.load_enum(enum_name) + if not mapping: + return None + + normalized_query = normalize_text(text) + if not normalized_query: + return None + + # 1. Exact match against normalized synonyms + for value_name, value_info in mapping.values.items(): + if normalized_query in value_info.all_synonyms_normalized: + return value_name + + # 2. 
Substring match (query is contained in synonym or vice versa) + for value_name, value_info in mapping.values.items(): + for synonym in value_info.all_synonyms_normalized: + if normalized_query in synonym or synonym in normalized_query: + return value_name + + # 3. Fuzzy match using basic similarity + best_match: str | None = None + best_score = 0.0 + + for value_name, value_info in mapping.values.items(): + for synonym in value_info.all_synonyms_normalized: + score = self._simple_similarity(normalized_query, synonym) + if score > best_score and score >= threshold: + best_score = score + best_match = value_name + + return best_match + + def _simple_similarity(self, s1: str, s2: str) -> float: + """Calculate simple similarity ratio between two strings. + + Uses multiple approaches: + 1. Exact match (1.0) + 2. Prefix match for singular/plural handling (0.9) + 3. Word-level Jaccard similarity + 4. Character bigram similarity + + Args: + s1: First string + s2: Second string + + Returns: + Similarity ratio (0.0-1.0) + """ + if not s1 or not s2: + return 0.0 + + # Exact match + if s1 == s2: + return 1.0 + + # Prefix match - handles singular/plural variations + # e.g., "bibliotheek" matches "bibliotheken" (Dutch) + # e.g., "archief" matches "archieven" (Dutch) + min_len = min(len(s1), len(s2)) + max_len = max(len(s1), len(s2)) + + # If one is a prefix of the other (with reasonable length overlap) + if min_len >= 5 and max_len - min_len <= 3: + shorter, longer = (s1, s2) if len(s1) < len(s2) else (s2, s1) + if longer.startswith(shorter): + return 0.95 # High score for prefix match + + # Common stem match - handle variations like archief/archieven, museum/musea + # Use shared prefix ratio + shared_prefix_len = 0 + for i in range(min_len): + if s1[i] == s2[i]: + shared_prefix_len += 1 + else: + break + + # If they share a significant prefix (>= 70% of shorter word) + if shared_prefix_len >= 4 and shared_prefix_len / min_len >= 0.7: + return 0.90 + + # Word-level comparison + 
words1 = set(s1.split()) + words2 = set(s2.split()) + + if words1 and words2: + intersection = len(words1 & words2) + union = len(words1 | words2) + word_similarity = intersection / union if union > 0 else 0.0 + + # Boost if high word overlap + if word_similarity > 0.5: + return word_similarity + + # Character-level bigram comparison + def get_bigrams(s: str) -> set[str]: + return {s[i:i+2] for i in range(len(s) - 1)} if len(s) > 1 else {s} + + bigrams1 = get_bigrams(s1) + bigrams2 = get_bigrams(s2) + + intersection = len(bigrams1 & bigrams2) + union = len(bigrams1 | bigrams2) + + return intersection / union if union > 0 else 0.0 + + def get_heritage_type_code(self, custodian_type: str) -> str | None: + """Map CustodianPrimaryTypeEnum value to single-letter heritage code. + + Args: + custodian_type: Enum value (e.g., "MUSEUM", "ARCHIVE") + + Returns: + Single-letter GLAMORCUBESFIXPHDNT code or None + + Example: + >>> mapper.get_heritage_type_code("MUSEUM") + "M" + >>> mapper.get_heritage_type_code("ARCHIVE") + "A" + """ + return GLAMORCUBESFIXPHDNT_CODES.get(custodian_type) + + def get_custodian_type_to_code_mapping(self) -> dict[str, str]: + """Generate CustodianPrimaryTypeEnum -> single-letter code mapping. + + This replaces the hardcoded CUSTODIAN_TYPE_TO_HERITAGE_CODE dict + in hybrid_retriever.py. + + Returns: + Dict mapping enum values to single-letter codes + """ + # Load the enum to get actual values + mapping = self.load_enum("CustodianPrimaryTypeEnum") + + result = {} + if mapping: + for value_name in mapping.values: + code = GLAMORCUBESFIXPHDNT_CODES.get(value_name) + if code: + result[value_name] = code + else: + # Fall back to static mapping if enum can't be loaded + result = GLAMORCUBESFIXPHDNT_CODES.copy() + + return result + + def get_synonyms_for_value(self, value_name: str, enum_name: str) -> set[str]: + """Get all synonyms for a specific enum value. 
+ + This method retrieves all synonyms associated with an enum value, + useful for building prompt context or understanding what natural language + terms map to a given enum value. + + Collects synonyms from: + 1. Language-tagged synonyms in comments (e.g., "bibliotheek [nl]") + 2. Normalized synonyms from comma-separated lists (e.g., "Includes bibliotheken, bibliotecas") + + Args: + value_name: The enum value name (e.g., "MUSEUM", "LIBRARY") + enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum") + + Returns: + Set of synonym strings. Returns empty set if enum or value not found. + + Example: + >>> mapper = get_ontology_mapper() + >>> synonyms = mapper.get_synonyms_for_value("LIBRARY", "CustodianPrimaryTypeEnum") + >>> print(synonyms) + {"bibliotheken", "bibliotecas", "bibliotheques", "library", ...} + """ + mapping = self.load_enum(enum_name) + if not mapping: + logger.debug(f"Enum {enum_name} not found for get_synonyms_for_value") + return set() + + value_info = mapping.values.get(value_name) + if not value_info: + logger.debug(f"Value {value_name} not found in enum {enum_name}") + return set() + + # Collect all synonyms from multiple sources + all_synonyms: set[str] = set() + + # 1. Add language-tagged synonyms (from patterns like "bibliotheek [nl]") + for lang_code, lang_synonyms in value_info.synonyms.items(): + all_synonyms.update(lang_synonyms) + + # 2. Add normalized synonyms (from comma-separated lists in comments) + # These are extracted during load_enum() from patterns like + # "Includes bibliotheken, bibliotecas, bibliothèques" + all_synonyms.update(value_info.all_synonyms_normalized) + + return all_synonyms + + def get_all_synonyms_by_language( + self, + value_name: str, + enum_name: str + ) -> dict[str, set[str]]: + """Get synonyms for a value organized by language. + + Returns language-tagged synonyms from comments, plus an "all" key + containing all normalized synonyms (not language-specific). 
+ + Args: + value_name: The enum value name (e.g., "MUSEUM", "LIBRARY") + enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum") + + Returns: + Dict mapping language codes to sets of synonyms. The special key "all" + contains all normalized synonyms regardless of language. + Returns empty dict if enum or value not found. + + Example: + >>> mapper = get_ontology_mapper() + >>> by_lang = mapper.get_all_synonyms_by_language("LIBRARY", "CustodianPrimaryTypeEnum") + >>> print(by_lang) + { + "nl": {"bibliotheek", "bibliotheken"}, + "de": {"Bibliothek"}, + "all": {"library", "bibliotheken", "bibliotecas", "bibliotheques", ...} + } + """ + mapping = self.load_enum(enum_name) + if not mapping: + return {} + + value_info = mapping.values.get(value_name) + if not value_info: + return {} + + # Start with language-tagged synonyms + result = {lang: set(syns) for lang, syns in value_info.synonyms.items()} + + # Add "all" key with all normalized synonyms + result["all"] = set(value_info.all_synonyms_normalized) + + return result + + def get_enum_values_for_prompt( + self, + enum_name: str, + max_values: int = 20, + include_descriptions: bool = True + ) -> str: + """Format enum values for DSPy prompt injection. + + Args: + enum_name: Name of the enum + max_values: Maximum number of values to include + include_descriptions: Whether to include value descriptions + + Returns: + Formatted string for prompt injection + """ + mapping = self.load_enum(enum_name) + if not mapping: + return f"[Enum {enum_name} not found]" + + lines = [f"Valid values for {enum_name}:"] + + for i, (value_name, value_info) in enumerate(mapping.values.items()): + if i >= max_values: + remaining = len(mapping.values) - max_values + lines.append(f" ... and {remaining} more values") + break + + if include_descriptions and value_info.description: + # Truncate long descriptions + desc = value_info.description[:60] + if len(value_info.description) > 60: + desc += "..." 
+ lines.append(f" - {value_name}: {desc}") + else: + lines.append(f" - {value_name}") + + return "\n".join(lines) + + def get_valid_filter_values(self, enum_name: str) -> list[str]: + """Get list of valid values for filtering (e.g., Qdrant). + + Args: + enum_name: Name of the enum + + Returns: + List of valid enum value names + """ + mapping = self.load_enum(enum_name) + if not mapping: + return [] + + return list(mapping.values.keys()) + + def invalidate_cache_if_changed(self) -> bool: + """Check all cached enums and invalidate stale entries. + + Returns: + True if any cache entries were invalidated + """ + if not self.watch_for_changes: + return False + + invalidated = False + for enum_name in list(self._cache.keys()): + if self._is_cache_stale(enum_name): + del self._cache[enum_name] + del self._file_mtimes[enum_name] + invalidated = True + logger.info(f"Invalidated stale cache for {enum_name}") + + return invalidated + + def clear_cache(self) -> None: + """Clear all cached enums.""" + self._cache.clear() + self._file_mtimes.clear() + logger.info("Cleared ontology mapper cache") + + # ========================================================================= + # Role Category Mapping (for person search) + # ========================================================================= + + def get_role_category_keywords(self) -> dict[str, list[str]]: + """Load role category keywords from RoleCategoryEnum. + + This replaces the hardcoded ROLE_CATEGORY_KEYWORDS dict. + Keywords are extracted from the 'comments' field of each enum value. 
+ + Returns: + Dict mapping role category to list of keywords + """ + # Try to load from StaffRole.yaml which contains RoleCategoryEnum + staff_role_path = self.schema_dir / "modules" / "classes" / "StaffRole.yaml" + + if not staff_role_path.exists(): + logger.warning(f"StaffRole.yaml not found: {staff_role_path}") + return {} + + try: + with open(staff_role_path, 'r', encoding='utf-8') as f: + yaml_content = yaml.safe_load(f) + except Exception as e: + logger.error(f"Failed to load StaffRole.yaml: {e}") + return {} + + enums = yaml_content.get("enums", {}) + role_category_enum = enums.get("RoleCategoryEnum", {}) + permissible_values = role_category_enum.get("permissible_values", {}) + + result = {} + for category_name, category_info in permissible_values.items(): + if category_info is None: + continue + + # Extract keywords from comments and description + keywords = [] + + # Get keywords from comments + comments = category_info.get("comments", []) + for comment in comments: + # Parse language tag if present + _, term = parse_language_tag(comment) + normalized = normalize_text(term) + if normalized: + keywords.append(normalized) + + # Add keywords from description + description = category_info.get("description") + if description: + # Split description into words and add significant ones + words = description.lower().split() + for word in words: + if len(word) > 3 and word not in {"with", "that", "from", "have", "this"}: + keywords.append(normalize_text(word)) + + # Add the category name itself + keywords.append(normalize_text(category_name)) + + # Remove duplicates while preserving order + seen = set() + unique_keywords = [] + for kw in keywords: + if kw and kw not in seen: + seen.add(kw) + unique_keywords.append(kw) + + result[category_name] = unique_keywords + + return result + + +# ============================================================================= +# Singleton Access Pattern +# 
# =============================================================================
# Singleton Access Pattern
# =============================================================================

_ontology_mapper: OntologyMapper | None = None


def get_ontology_mapper() -> OntologyMapper:
    """Return the process-wide OntologyMapper, creating it on first use."""
    global _ontology_mapper
    if _ontology_mapper is None:
        _ontology_mapper = OntologyMapper(SCHEMA_BASE_DIR)
    return _ontology_mapper


def reset_ontology_mapper() -> None:
    """Drop the singleton so the next access builds a fresh mapper (testing)."""
    global _ontology_mapper
    _ontology_mapper = None


# =============================================================================
# Convenience Functions
# =============================================================================

def match_custodian_type(text: str) -> str | None:
    """Match free text describing an institution type to CustodianPrimaryTypeEnum.

    Example:
        >>> match_custodian_type("bibliotheek")
        "LIBRARY"

    Returns:
        Matched enum value or None.
    """
    return get_ontology_mapper().match_natural_language(text, "CustodianPrimaryTypeEnum")


def match_museum_type(text: str) -> str | None:
    """Match free text describing a museum type to MuseumTypeEnum.

    Returns:
        Matched enum value or None.
    """
    return get_ontology_mapper().match_natural_language(text, "MuseumTypeEnum")


def match_digital_platform_type(text: str) -> str | None:
    """Match free text describing a digital platform to DigitalPlatformTypeEnum.

    Example:
        >>> match_digital_platform_type("virtueel museum")
        "VIRTUAL_MUSEUM"

    Returns:
        Matched enum value or None.
    """
    return get_ontology_mapper().match_natural_language(text, "DigitalPlatformTypeEnum")


def get_heritage_code(custodian_type: str) -> str | None:
    """Single-letter GLAMORCUBESFIXPHDNT code for a CustodianPrimaryTypeEnum value.

    Example:
        >>> get_heritage_code("MUSEUM")
        "M"
    """
    return get_ontology_mapper().get_heritage_type_code(custodian_type)


def get_custodian_type_mapping() -> dict[str, str]:
    """Custodian type -> heritage code map.

    Replaces the hardcoded CUSTODIAN_TYPE_TO_HERITAGE_CODE dict in
    hybrid_retriever.py.
    """
    return get_ontology_mapper().get_custodian_type_to_code_mapping()


def get_role_keywords() -> dict[str, list[str]]:
    """Role category -> keyword lists.

    Replaces the hardcoded ROLE_CATEGORY_KEYWORDS dict in hybrid_retriever.py.
    """
    return get_ontology_mapper().get_role_category_keywords()


# =============================================================================
# Main (for testing)
# =============================================================================

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    print("\n=== Testing OntologyMapper ===\n")

    mapper = get_ontology_mapper()

    # Exercise a single-enum load.
    print("1. Loading DigitalPlatformTypeEnum...")
    dp_enum = mapper.load_enum("DigitalPlatformTypeEnum")
    if dp_enum:
        print(f" Loaded {len(dp_enum.values)} values")
        print(f" Sample values: {list(dp_enum.values.keys())[:5]}")

    # Exercise multilingual matching.
    print("\n2. Testing natural language matching...")
    test_queries = [
        ("virtueel museum", "DigitalPlatformTypeEnum"),
        ("Digitales Museum", "DigitalPlatformTypeEnum"),
        ("museo virtual", "DigitalPlatformTypeEnum"),
        ("musée virtuel", "DigitalPlatformTypeEnum"),
        ("digital library", "DigitalPlatformTypeEnum"),
        ("museum", "CustodianPrimaryTypeEnum"),
        ("bibliotheek", "CustodianPrimaryTypeEnum"),
        ("archief", "CustodianPrimaryTypeEnum"),
    ]
    for query, enum_name in test_queries:
        result = mapper.match_natural_language(query, enum_name)
        print(f" '{query}' -> {result}")

    # Exercise the heritage-code mapping.
    print("\n3. Testing heritage code mapping...")
    type_to_code = mapper.get_custodian_type_to_code_mapping()
    print(f" Loaded {len(type_to_code)} mappings")
    for k, v in list(type_to_code.items())[:5]:
        print(f" {k} -> {v}")

    # Exercise the bulk loader.
    print("\n4. Loading all enums...")
    all_enums = mapper.load_all_enums()
    print(f" Loaded {len(all_enums)} enums")

    # Show the biggest enums first.
    print("\n5. Enum value counts:")
    for enum_name, enum_mapping in sorted(all_enums.items(), key=lambda x: len(x[1].values), reverse=True)[:10]:
        print(f" {enum_name}: {len(enum_mapping.values)} values")

    # Exercise prompt formatting.
    print("\n6. Testing prompt formatting...")
    prompt = mapper.get_enum_values_for_prompt("CustodianPrimaryTypeEnum", max_values=5)
    print(prompt)

    print("\n=== Tests Complete ===")
@dataclass
class StaffRoleDefinition:
    """A staff role class definition from LinkML schema.

    Represents an official job title/appellation in heritage institutions,
    categorized by role family (CURATORIAL, ARCHIVAL, DIGITAL, etc.).
    """
    name: str
    category: str  # CURATORIAL, ARCHIVAL, DIGITAL, etc.
    description: Optional[str] = None
    class_uri: Optional[str] = None
    common_variants: list[str] = field(default_factory=list)
    wikidata_mapping: Optional[str] = None  # e.g., wikidata:Q674426


# NOTE(review): the definitions below are HeritageSchema class members in the
# original patch (together with the new fields
# `staff_roles: dict[str, list[StaffRoleDefinition]] = field(default_factory=dict)`
# and `role_categories: list[EnumValue] = field(default_factory=list)`); the
# class header lies outside this view.

def get_staff_role_names(self) -> list[str]:
    """Get flat, sorted list of all staff role class names."""
    names: list[str] = []
    for bucket in self.staff_roles.values():
        names.extend(role.name for role in bucket)
    return sorted(names)


def get_staff_role_category_names(self) -> list[str]:
    """Get list of staff role category names."""
    return [rc.name for rc in self.role_categories]


def get_staff_roles_by_category(self) -> dict[str, list[str]]:
    """Get staff role names organized by category."""
    return {
        category: [role.name for role in bucket]
        for category, bucket in self.staff_roles.items()
    }


def format_staff_role_categories_for_prompt(self) -> str:
    """Format staff role categories for DSPy prompt injection."""
    # Derive the count from the loaded schema rather than hardcoding "13",
    # so the prompt label stays correct if categories are added or removed.
    lines = [f"Staff Role Categories ({len(self.role_categories)} categories):"]
    for rc in self.role_categories:
        # Truncate long descriptions to keep the injected prompt compact.
        desc = rc.description[:60] if rc.description else rc.name
        lines.append(f" - {rc.name}: {desc}")
    return "\n".join(lines)


def format_staff_roles_for_prompt(self, max_per_category: int = 5) -> str:
    """Format staff roles for DSPy prompt injection.

    Args:
        max_per_category: Maximum roles to show per category (for brevity)
    """
    lines = ["Staff Roles by Category:"]
    for category, roles in sorted(self.staff_roles.items()):
        role_names = [r.name for r in roles[:max_per_category]]
        if len(roles) > max_per_category:
            # Signal truncation so prompt readers know the list is partial.
            role_names.append(f"... +{len(roles) - max_per_category} more")
        lines.append(f" - {category}: {', '.join(role_names)}")
    return "\n".join(lines)
types") + f"{len(schema.slots)} slots, {len(schema.custodian_types)} custodian types, " + f"{len(schema.role_categories)} role categories, " + f"{sum(len(r) for r in schema.staff_roles.values())} staff roles") return schema @@ -433,6 +517,104 @@ class SchemaLoader: logger.warning(f"Could not load slot from {filepath}: {e}") return slots + + def _load_role_categories(self) -> list[EnumValue]: + """Load RoleCategoryEnum values from StaffRole.yaml.""" + enum_path = self.schema_dir / "modules" / "classes" / "StaffRole.yaml" + if not enum_path.exists(): + logger.warning(f"StaffRole.yaml not found: {enum_path}") + return [] + + try: + with open(enum_path, "r", encoding="utf-8") as f: + staff_role_yaml = yaml.safe_load(f) + + values = [] + enum_def = staff_role_yaml.get("enums", {}).get("RoleCategoryEnum", {}) + permissible_values = enum_def.get("permissible_values", {}) + + for name, info in permissible_values.items(): + values.append(EnumValue( + name=name, + description=info.get("description") if info else None, + )) + + logger.debug(f"Loaded {len(values)} role categories") + return values + + except Exception as e: + logger.warning(f"Could not load role categories: {e}") + return [] + + def _load_staff_roles(self) -> dict[str, list[StaffRoleDefinition]]: + """Load staff role classes organized by category from StaffRoles.yaml. + + Parses the slot_usage.role_category.ifabsent pattern to determine category. 
+ Example: ifabsent: "string(CURATORIAL)" -> category = "CURATORIAL" + + Returns: + Dictionary mapping category name to list of StaffRoleDefinition + """ + import re + + roles_path = self.schema_dir / "modules" / "classes" / "StaffRoles.yaml" + if not roles_path.exists(): + logger.warning(f"StaffRoles.yaml not found: {roles_path}") + return {} + + try: + with open(roles_path, "r", encoding="utf-8") as f: + roles_yaml = yaml.safe_load(f) + + roles_by_category: dict[str, list[StaffRoleDefinition]] = {} + class_defs = roles_yaml.get("classes", {}) + + # Regex to extract category from ifabsent: "string(CURATORIAL)" + ifabsent_pattern = re.compile(r'string\((\w+)\)') + + for class_name, class_info in class_defs.items(): + if not class_info: + continue + + # Extract category from slot_usage.role_category.ifabsent + category = "UNKNOWN" + slot_usage = class_info.get("slot_usage", {}) + role_category = slot_usage.get("role_category", {}) + ifabsent = role_category.get("ifabsent", "") + + match = ifabsent_pattern.search(ifabsent) + if match: + category = match.group(1) + + # Extract wikidata mapping from exact_mappings + wikidata_mapping = None + exact_mappings = class_info.get("exact_mappings", []) + for mapping in exact_mappings: + if mapping.startswith("wikidata:"): + wikidata_mapping = mapping + break + + # Create role definition + role_def = StaffRoleDefinition( + name=class_name, + category=category, + description=class_info.get("description"), + class_uri=class_info.get("class_uri"), + wikidata_mapping=wikidata_mapping, + ) + + # Add to category + if category not in roles_by_category: + roles_by_category[category] = [] + roles_by_category[category].append(role_def) + + total_roles = sum(len(r) for r in roles_by_category.values()) + logger.debug(f"Loaded {total_roles} staff roles across {len(roles_by_category)} categories") + return roles_by_category + + except Exception as e: + logger.warning(f"Could not load staff roles: {e}") + return {} # Singleton instance for 
# Staff Role Convenience Functions
def get_staff_role_categories() -> list[str]:
    """Return the staff role category names defined in the schema.

    Returns:
        Category names such as ['CURATORIAL', 'ARCHIVAL', 'DIGITAL', ...].
    """
    return get_heritage_schema().get_staff_role_category_names()


def get_all_staff_roles() -> list[str]:
    """Return a flat, sorted list of every staff role class name.

    Returns:
        Role names such as ['Archivist', 'Curator', 'DataEngineer', ...].
    """
    return get_heritage_schema().get_staff_role_names()


def get_staff_role_classes() -> dict[str, list[str]]:
    """Return staff role names grouped by role category.

    Returns:
        Mapping like {'CURATORIAL': ['Curator', 'CollectionsManager'], ...}.
    """
    return get_heritage_schema().get_staff_roles_by_category()


def get_staff_roles_prompt() -> str:
    """Return staff roles formatted for DSPy prompt injection."""
    return get_heritage_schema().format_staff_roles_for_prompt()


def get_staff_role_categories_prompt() -> str:
    """Return staff role categories formatted for DSPy prompt injection."""
    return get_heritage_schema().format_staff_role_categories_for_prompt()
+ """ schema = get_heritage_schema() type_lines = [] @@ -543,6 +768,62 @@ def create_schema_aware_entity_docstring() -> str: desc = ct.description.split("(")[0].strip() if ct.description else ct.name type_lines.append(f" - {ct.name}: {desc}") + # Build multilingual synonym section with language tags + synonym_lines = [] + try: + # Import dynamically to avoid circular imports + from backend.rag.ontology_mapping import get_ontology_mapper + mapper = get_ontology_mapper() + + # Key types to include synonyms for + key_types = [ + "MUSEUM", "LIBRARY", "ARCHIVE", "GALLERY", "RESEARCH_CENTER", + "EDUCATION_PROVIDER", "HOLY_SACRED_SITE", "BIO_CUSTODIAN", + ] + + for custodian_type in key_types: + by_lang = mapper.get_all_synonyms_by_language( + custodian_type, "CustodianPrimaryTypeEnum" + ) + + tagged_syns: list[str] = [] + # Sort languages for consistent output + for lang in sorted(by_lang.keys()): + if lang == "all": # Skip the aggregate 'all' key + continue + syns = by_lang[lang] + # Take up to 2 synonyms per language + for syn in sorted(syns)[:2]: + tagged_syns.append(f"{syn} ({lang})") + + if tagged_syns: + # Limit to 6 total synonyms per type for brevity + synonym_lines.append(f" - {custodian_type}: {', '.join(tagged_syns[:6])}") + + logger.debug(f"Built multilingual synonyms for {len(synonym_lines)} types") + + except ImportError: + logger.warning("ontology_mapping not available, using static synonyms") + # Fallback to static synonyms without language tags + synonym_lines = [ + ' - MUSEUM: "museum", "musea", "museo", "musée"', + ' - LIBRARY: "library", "bibliotheek", "bibliothèque"', + ' - ARCHIVE: "archive", "archief", "archiv"', + ' - GALLERY: "gallery", "galerie"', + ] + except Exception as e: + logger.warning(f"Could not build multilingual synonyms: {e}") + synonym_lines = [] + + # Format synonym section + if synonym_lines: + synonym_section = f""" + MULTILINGUAL SYNONYMS (term + language code): +{chr(10).join(synonym_lines)} +""" + else: + synonym_section = "" 
+ docstring = f"""Extract heritage-specific entities from text. Identify institutions, places, dates, identifiers, and relationships @@ -556,15 +837,9 @@ def create_schema_aware_entity_docstring() -> str: - PLACES: Geographic locations (cities, regions, countries) - TEMPORAL: Dates and time periods (founding, closure, events) - IDENTIFIERS: ISIL codes (NL-XXXX), Wikidata IDs (Q12345), GHCIDs - - Map institution mentions to appropriate GLAMORCUBESFIXPHDNT type: - - "museum", "musea", "museo" → MUSEUM - - "library", "bibliotheek", "bibliothek" → LIBRARY - - "archive", "archief", "archiv" → ARCHIVE - - "gallery", "galerie" → GALLERY - - "university", "universiteit" → EDUCATION_PROVIDER - - "botanical garden", "zoo" → BIO_CUSTODIAN - - "church", "monastery", "temple" → HOLY_SACRED_SITE +{synonym_section} + When extracting institution types, recognize synonyms in ANY language + and map them to the canonical GLAMORCUBESFIXPHDNT type. """ return docstring diff --git a/pyproject.toml b/pyproject.toml index ca1516aa4e..b4a284da94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,8 @@ numpy = ">=2.0.0" # NOTE: NLP extraction (NER) is handled by coding subagents via Task tool # spaCy, transformers, torch are NOT direct dependencies rapidfuzz = "^3.5.0" # Fuzzy string matching for deduplication -langdetect = "^1.0.9" # Language detection +langdetect = "^1.0.9" # Language detection (fallback) +fast-langdetect = "^1.0.0" # FastText-based language detection (primary, more accurate) unidecode = "^1.3.7" # Unicode transliteration # Web crawling and scraping @@ -98,6 +99,7 @@ jupyter = "^1.0.0" ipykernel = "^6.27.0" matplotlib = "^3.8.0" seaborn = "^0.13.0" +types-pyyaml = "^6.0.12.20250915" [tool.poetry.scripts] glam = "glam_extractor.cli:main" diff --git a/tests/rag/__init__.py b/tests/rag/__init__.py new file mode 100644 index 0000000000..7fe257220e --- /dev/null +++ b/tests/rag/__init__.py @@ -0,0 +1 @@ +# Tests for RAG pipeline components diff --git 
a/tests/rag/test_ontology_mapping.py b/tests/rag/test_ontology_mapping.py new file mode 100644 index 0000000000..77fdacc9b4 --- /dev/null +++ b/tests/rag/test_ontology_mapping.py @@ -0,0 +1,935 @@ +""" +Tests for backend.rag.ontology_mapping module. + +This module tests the dynamic ontology mapping system that loads LinkML schema +enumerations and provides multilingual matching for the Heritage RAG pipeline. + +Coverage: +- Enum loading and caching +- Multilingual synonym extraction from YAML comments +- Natural language fuzzy matching (Dutch, German, French, Spanish) +- Singular/plural handling (bibliotheek → bibliotheken) +- Heritage code mapping (GLAMORCUBESFIXPHDNT) +- Cache invalidation +- Role category keyword extraction +""" + +from __future__ import annotations + +import os +import tempfile +from pathlib import Path +from unittest.mock import patch + +import pytest +import yaml + +# Import module under test +from backend.rag.ontology_mapping import ( + GLAMORCUBESFIXPHDNT_CODES, + SCHEMA_BASE_DIR, + EnumMapping, + EnumValueInfo, + OntologyMapper, + detect_term_language, + extract_comma_separated_terms, + extract_wikidata_id, + get_custodian_type_mapping, + get_heritage_code, + get_ontology_mapper, + get_role_keywords, + match_custodian_type, + match_digital_platform_type, + match_museum_type, + normalize_text, + parse_language_tag, + reset_ontology_mapper, +) + + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture +def mapper() -> OntologyMapper: + """Create a fresh OntologyMapper instance.""" + return OntologyMapper(SCHEMA_BASE_DIR) + + +@pytest.fixture +def temp_enum_dir(tmp_path: Path) -> Path: + """Create a temporary directory with test enum files.""" + enums_dir = tmp_path / "modules" / "enums" + enums_dir.mkdir(parents=True) + return tmp_path + + +@pytest.fixture +def sample_enum_yaml() -> dict: + """Sample 
enum YAML content for testing.""" + return { + "enums": { + "TestEnum": { + "description": "Test enumeration", + "permissible_values": { + "VALUE_ONE": { + "description": "First test value", + "meaning": "wikidata:Q12345", + "comments": [ + "waarde een (nl)", + "Wert eins (de)", + "valeur un (fr)", + ], + }, + "VALUE_TWO": { + "description": "Second test value", + "meaning": "wikidata:Q67890", + "comments": [ + "Includes alpha, beta, gamma", + ], + }, + "VALUE_THREE": { + "description": "Third value with no comments", + }, + }, + } + } + } + + +@pytest.fixture +def temp_mapper(temp_enum_dir: Path, sample_enum_yaml: dict) -> OntologyMapper: + """Create mapper with temporary test enum file.""" + # Write sample enum file + enum_file = temp_enum_dir / "modules" / "enums" / "TestEnum.yaml" + with open(enum_file, "w") as f: + yaml.dump(sample_enum_yaml, f) + + return OntologyMapper(temp_enum_dir) + + +# ============================================================================= +# Test: normalize_text +# ============================================================================= + + +class TestNormalizeText: + """Tests for normalize_text function.""" + + def test_lowercase(self): + """Should convert to lowercase.""" + assert normalize_text("MUSEUM") == "museum" + assert normalize_text("Museum") == "museum" + + def test_strip_whitespace(self): + """Should strip leading/trailing whitespace.""" + assert normalize_text(" museum ") == "museum" + assert normalize_text("\tarchive\n") == "archive" + + def test_remove_diacritics(self): + """Should remove accents/diacritics.""" + assert normalize_text("Bibliothèque") == "bibliotheque" + assert normalize_text("musée") == "musee" + assert normalize_text("Müzeum") == "muzeum" + assert normalize_text("café") == "cafe" + assert normalize_text("naïve") == "naive" + + def test_combined(self): + """Should handle combined normalization.""" + assert normalize_text(" Musée Virtuel ") == "musee virtuel" + assert 
class TestParseLanguageTag:
    """Tests for parse_language_tag function."""

    def test_dutch_tag(self):
        """Should parse Dutch language tag."""
        lang, term = parse_language_tag("virtueel museum (nl)")
        assert (lang, term) == ("nl", "virtueel museum")

    def test_german_tag(self):
        """Should parse German language tag."""
        lang, term = parse_language_tag("Digitales Museum (de)")
        assert (lang, term) == ("de", "Digitales Museum")

    def test_french_tag(self):
        """Should parse French language tag."""
        lang, term = parse_language_tag("musée virtuel (fr)")
        assert (lang, term) == ("fr", "musée virtuel")

    def test_spanish_tag(self):
        """Should parse Spanish language tag."""
        lang, term = parse_language_tag("museo virtual (es)")
        assert (lang, term) == ("es", "museo virtual")

    def test_no_tag(self):
        """Should return None for lang when no tag present."""
        lang, term = parse_language_tag("Some plain comment")
        assert lang is None
        assert term == "Some plain comment"

    def test_unsupported_language(self):
        """Should treat unsupported language codes as no tag."""
        lang, _term = parse_language_tag("text (xyz)")
        assert lang is None  # xyz is not supported

    def test_uppercase_tag(self):
        """Should handle uppercase language tags."""
        lang, term = parse_language_tag("museum (NL)")
        assert (lang, term) == ("nl", "museum")
"""Should extract simple comma-separated terms.""" + terms = extract_comma_separated_terms("alpha, beta, gamma") + assert "alpha" in terms + assert "beta" in terms + assert "gamma" in terms + + def test_includes_prefix(self): + """Should strip 'Includes' prefix.""" + terms = extract_comma_separated_terms("Includes bibliotheken, bibliotecas, bibliothèques") + assert "bibliotheken" in terms + assert "bibliotecas" in terms + assert "bibliothèques" in terms + assert "Includes" not in " ".join(terms) + + def test_examples_prefix(self): + """Should strip 'Examples:' prefix.""" + terms = extract_comma_separated_terms("Examples: museum, archive, library") + assert "museum" in terms + assert "archive" in terms + assert "library" in terms + + def test_no_commas(self): + """Should return empty list for single term.""" + terms = extract_comma_separated_terms("Just a single comment") + assert terms == [] + + def test_skip_long_sentences(self): + """Should skip terms that look like sentences (> 50 chars).""" + long_term = "This is a very long sentence that should be skipped because it exceeds fifty characters" + terms = extract_comma_separated_terms(f"short term, {long_term}") + assert "short term" in terms + assert long_term not in terms + + def test_strip_wikidata_references(self): + """Should strip trailing Wikidata references.""" + terms = extract_comma_separated_terms("botanical gardens (Q473972), zoos") + assert "botanical gardens" in terms + assert "zoos" in terms + assert "(Q473972)" not in " ".join(terms) + + +# ============================================================================= +# Test: extract_wikidata_id +# ============================================================================= + + +class TestExtractWikidataId: + """Tests for extract_wikidata_id function.""" + + def test_wikidata_prefix(self): + """Should extract ID with wikidata: prefix.""" + assert extract_wikidata_id("wikidata:Q12345") == "Q12345" + assert extract_wikidata_id("wikidata:Q1225034") 
class TestEnumValueInfo:
    """Tests for EnumValueInfo dataclass."""

    def test_basic_creation(self):
        """Should create with minimal fields."""
        info = EnumValueInfo(name="TEST_VALUE")
        assert info.name == "TEST_VALUE"
        # Every optional field falls back to an empty/None default.
        assert info.description is None
        assert info.wikidata_id is None
        assert info.synonyms == {}
        assert info.all_synonyms_normalized == []

    def test_full_creation(self):
        """Should create with all fields."""
        info = EnumValueInfo(
            name="MUSEUM",
            description="A museum institution",
            wikidata_id="Q33506",
            synonyms={"nl": ["museum", "musea"], "de": ["Museum"]},
            all_synonyms_normalized=["museum", "musea"],
        )
        assert (info.name, info.description, info.wikidata_id) == (
            "MUSEUM",
            "A museum institution",
            "Q33506",
        )
        assert "nl" in info.synonyms
        assert "museum" in info.all_synonyms_normalized
temporary test file.""" + mapping = temp_mapper.load_enum("TestEnum") + assert mapping is not None + assert mapping.enum_name == "TestEnum" + assert len(mapping.values) == 3 + assert "VALUE_ONE" in mapping.values + assert "VALUE_TWO" in mapping.values + assert "VALUE_THREE" in mapping.values + + def test_load_nonexistent_enum(self, temp_mapper: OntologyMapper): + """Should return None for non-existent enum.""" + mapping = temp_mapper.load_enum("NonExistentEnum") + assert mapping is None + + def test_extract_wikidata_from_meaning(self, temp_mapper: OntologyMapper): + """Should extract Wikidata ID from meaning field.""" + mapping = temp_mapper.load_enum("TestEnum") + assert mapping is not None + value_one = mapping.values.get("VALUE_ONE") + assert value_one is not None + assert value_one.wikidata_id == "Q12345" + + def test_extract_synonyms_from_comments(self, temp_mapper: OntologyMapper): + """Should extract language-tagged synonyms from comments.""" + mapping = temp_mapper.load_enum("TestEnum") + assert mapping is not None + value_one = mapping.values.get("VALUE_ONE") + assert value_one is not None + # Check language-specific synonyms + assert "nl" in value_one.synonyms + assert "waarde een" in value_one.synonyms["nl"] + assert "de" in value_one.synonyms + assert "Wert eins" in value_one.synonyms["de"] + + def test_extract_comma_separated_from_comments(self, temp_mapper: OntologyMapper): + """Should extract comma-separated terms from comments.""" + mapping = temp_mapper.load_enum("TestEnum") + assert mapping is not None + value_two = mapping.values.get("VALUE_TWO") + assert value_two is not None + # Comma-separated terms should be in all_synonyms_normalized + assert "alpha" in value_two.all_synonyms_normalized + assert "beta" in value_two.all_synonyms_normalized + assert "gamma" in value_two.all_synonyms_normalized + + def test_load_real_custodian_type_enum(self, mapper: OntologyMapper): + """Should load real CustodianPrimaryTypeEnum from schema.""" + mapping = 
mapper.load_enum("CustodianPrimaryTypeEnum") + assert mapping is not None + assert len(mapping.values) >= 19 # GLAMORCUBESFIXPHDNT has 19 types + assert "MUSEUM" in mapping.values + assert "LIBRARY" in mapping.values + assert "ARCHIVE" in mapping.values + + def test_load_real_digital_platform_enum(self, mapper: OntologyMapper): + """Should load real DigitalPlatformTypeEnum from schema.""" + mapping = mapper.load_enum("DigitalPlatformTypeEnum") + assert mapping is not None + assert len(mapping.values) >= 50 # Should have many platform types + assert "VIRTUAL_MUSEUM" in mapping.values + + def test_load_all_enums(self, mapper: OntologyMapper): + """Should load all enum files from schema directory.""" + all_enums = mapper.load_all_enums() + assert len(all_enums) >= 10 # Should have many enums + # Check some expected enums + enum_names = list(all_enums.keys()) + assert "CustodianPrimaryTypeEnum" in enum_names + assert "DigitalPlatformTypeEnum" in enum_names + + +# ============================================================================= +# Test: OntologyMapper - Natural Language Matching +# ============================================================================= + + +class TestOntologyMapperMatching: + """Tests for OntologyMapper natural language matching.""" + + def test_exact_match(self, temp_mapper: OntologyMapper): + """Should match exact normalized text.""" + result = temp_mapper.match_natural_language("value one", "TestEnum") + assert result == "VALUE_ONE" + + def test_dutch_synonym_match(self, temp_mapper: OntologyMapper): + """Should match Dutch synonym from comments.""" + result = temp_mapper.match_natural_language("waarde een", "TestEnum") + assert result == "VALUE_ONE" + + def test_german_synonym_match(self, temp_mapper: OntologyMapper): + """Should match German synonym from comments.""" + result = temp_mapper.match_natural_language("Wert eins", "TestEnum") + assert result == "VALUE_ONE" + + def test_comma_term_match(self, temp_mapper: 
OntologyMapper): + """Should match comma-separated term.""" + result = temp_mapper.match_natural_language("alpha", "TestEnum") + assert result == "VALUE_TWO" + + def test_no_match(self, temp_mapper: OntologyMapper): + """Should return None when no match found.""" + result = temp_mapper.match_natural_language("xyz nonexistent", "TestEnum") + assert result is None + + def test_real_dutch_bibliotheek(self, mapper: OntologyMapper): + """Should match Dutch 'bibliotheek' to LIBRARY.""" + result = mapper.match_natural_language("bibliotheek", "CustodianPrimaryTypeEnum") + assert result == "LIBRARY" + + def test_real_dutch_bibliotheken(self, mapper: OntologyMapper): + """Should match Dutch plural 'bibliotheken' to LIBRARY (fuzzy).""" + result = mapper.match_natural_language("bibliotheken", "CustodianPrimaryTypeEnum") + assert result == "LIBRARY" + + def test_real_dutch_archief(self, mapper: OntologyMapper): + """Should match Dutch 'archief' to ARCHIVE.""" + result = mapper.match_natural_language("archief", "CustodianPrimaryTypeEnum") + assert result == "ARCHIVE" + + def test_real_dutch_virtueel_museum(self, mapper: OntologyMapper): + """Should match Dutch 'virtueel museum' to VIRTUAL_MUSEUM.""" + result = mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum") + assert result == "VIRTUAL_MUSEUM" + + def test_real_german_digitales_museum(self, mapper: OntologyMapper): + """Should match German 'Digitales Museum' to VIRTUAL_MUSEUM.""" + result = mapper.match_natural_language("Digitales Museum", "DigitalPlatformTypeEnum") + assert result == "VIRTUAL_MUSEUM" + + def test_real_spanish_museo_virtual(self, mapper: OntologyMapper): + """Should match Spanish 'museo virtual' to VIRTUAL_MUSEUM.""" + result = mapper.match_natural_language("museo virtual", "DigitalPlatformTypeEnum") + assert result == "VIRTUAL_MUSEUM" + + def test_case_insensitive(self, mapper: OntologyMapper): + """Should be case insensitive.""" + result1 = mapper.match_natural_language("MUSEUM", 
class TestOntologyMapperHeritageCodes:
    """Tests for heritage code mapping."""

    def test_museum_code(self, mapper: OntologyMapper):
        """Should map MUSEUM to M."""
        assert mapper.get_heritage_type_code("MUSEUM") == "M"

    def test_library_code(self, mapper: OntologyMapper):
        """Should map LIBRARY to L."""
        assert mapper.get_heritage_type_code("LIBRARY") == "L"

    def test_archive_code(self, mapper: OntologyMapper):
        """Should map ARCHIVE to A."""
        assert mapper.get_heritage_type_code("ARCHIVE") == "A"

    def test_gallery_code(self, mapper: OntologyMapper):
        """Should map GALLERY to G."""
        assert mapper.get_heritage_type_code("GALLERY") == "G"

    def test_unknown_code(self, mapper: OntologyMapper):
        """Should return None for unknown type."""
        assert mapper.get_heritage_type_code("UNKNOWN_TYPE") is None

    def test_get_full_mapping(self, mapper: OntologyMapper):
        """Should return complete type-to-code mapping."""
        mapping = mapper.get_custodian_type_to_code_mapping()
        assert len(mapping) == 19  # GLAMORCUBESFIXPHDNT has 19 types
        for custodian_type, code in (
            ("MUSEUM", "M"),
            ("LIBRARY", "L"),
            ("ARCHIVE", "A"),
            ("GALLERY", "G"),
        ):
            assert mapping[custodian_type] == code
        # Every letter of the mnemonic is used across the mapped types.
        assert set(mapping.values()) == set("GLAMORCUBESFIXPHDNT")
class TestOntologyMapperCaching:
    """Tests for caching behavior."""

    def test_enum_is_cached(self, mapper: OntologyMapper):
        """Should cache enum after first load."""
        first = mapper.load_enum("CustodianPrimaryTypeEnum")
        assert first is not None
        assert "CustodianPrimaryTypeEnum" in mapper._cache
        # A second load must hand back the exact cached object.
        assert mapper.load_enum("CustodianPrimaryTypeEnum") is first

    def test_force_reload(self, mapper: OntologyMapper):
        """Should reload when force_reload=True."""
        first = mapper.load_enum("CustodianPrimaryTypeEnum")
        reloaded = mapper.load_enum("CustodianPrimaryTypeEnum", force_reload=True)
        # Reload bypasses the cache and builds a fresh object.
        assert reloaded is not first

    def test_clear_cache(self, mapper: OntologyMapper):
        """Should clear all cached enums."""
        for enum_name in ("CustodianPrimaryTypeEnum", "DigitalPlatformTypeEnum"):
            mapper.load_enum(enum_name)
        assert len(mapper._cache) >= 2

        mapper.clear_cache()
        assert len(mapper._cache) == 0
        assert len(mapper._file_mtimes) == 0
test_match_digital_platform_type(self): + """Should match digital platform type via convenience function.""" + assert match_digital_platform_type("virtueel museum") == "VIRTUAL_MUSEUM" + + def test_match_museum_type(self): + """Should match museum type via convenience function.""" + # This tests against MuseumTypeEnum + result = match_museum_type("art museum") + # Result depends on what's in MuseumTypeEnum + assert result is None or isinstance(result, str) + + def test_get_heritage_code(self): + """Should get heritage code via convenience function.""" + assert get_heritage_code("MUSEUM") == "M" + assert get_heritage_code("LIBRARY") == "L" + assert get_heritage_code("ARCHIVE") == "A" + + def test_get_custodian_type_mapping(self): + """Should get full mapping via convenience function.""" + mapping = get_custodian_type_mapping() + assert len(mapping) == 19 + assert mapping["MUSEUM"] == "M" + + def test_get_ontology_mapper_singleton(self): + """Should return singleton instance.""" + mapper1 = get_ontology_mapper() + mapper2 = get_ontology_mapper() + assert mapper1 is mapper2 + + +# ============================================================================= +# Test: Role Category Keywords +# ============================================================================= + + +class TestRoleCategoryKeywords: + """Tests for role category keyword extraction.""" + + def test_get_role_keywords(self, mapper: OntologyMapper): + """Should extract role category keywords.""" + keywords = mapper.get_role_category_keywords() + # May return empty dict if StaffRole.yaml doesn't exist + assert isinstance(keywords, dict) + + def test_get_role_keywords_convenience(self): + """Should work via convenience function.""" + reset_ontology_mapper() + keywords = get_role_keywords() + assert isinstance(keywords, dict) + + +# ============================================================================= +# Test: Prompt Formatting +# 
class TestPromptFormatting:
    """Exercise helpers that format enum values for DSPy prompts."""

    def test_get_enum_values_for_prompt(self, mapper: OntologyMapper):
        """Truncated prompt text names the enum and flags omitted values."""
        prompt = mapper.get_enum_values_for_prompt("CustodianPrimaryTypeEnum", max_values=5)
        assert "Valid values for CustodianPrimaryTypeEnum:" in prompt
        # At least one well-known value must survive the truncation.
        assert "MUSEUM" in prompt or "LIBRARY" in prompt
        # The ellipsis marker signals that further values were cut off.
        assert "... and" in prompt

    def test_get_valid_filter_values(self, mapper: OntologyMapper):
        """Filter values come back as a list holding all 19 enum entries."""
        values = mapper.get_valid_filter_values("CustodianPrimaryTypeEnum")
        assert isinstance(values, list)
        assert len(values) >= 19
        assert "MUSEUM" in values
        assert "LIBRARY" in values


# =============================================================================
# Test: GLAMORCUBESFIXPHDNT Codes Constant
# =============================================================================


class TestGLAMORCUBESFIXPHDNTCodes:
    """Sanity-check the GLAMORCUBESFIXPHDNT_CODES constant itself."""

    def test_all_codes_present(self):
        """Every letter of the mnemonic appears among the code values."""
        assert set(GLAMORCUBESFIXPHDNT_CODES.values()) == set("GLAMORCUBESFIXPHDNT")

    def test_all_codes_single_letter(self):
        """Each code is exactly one uppercase ASCII letter."""
        for type_name, code in GLAMORCUBESFIXPHDNT_CODES.items():
            assert len(code) == 1, f"{type_name} has non-single-letter code: {code}"
            assert code.isalpha(), f"{type_name} has non-letter code: {code}"
            assert code.isupper(), f"{type_name} has non-uppercase code: {code}"

    def test_code_count(self):
        """The constant holds exactly 19 type-to-code pairs."""
        assert len(GLAMORCUBESFIXPHDNT_CODES) == 19


# =============================================================================
# Test: Similarity Function
# =============================================================================
class TestSimilarityFunction:
    """Tests for the _simple_similarity heuristic."""

    def test_exact_match(self, mapper: OntologyMapper):
        """Identical strings score a perfect 1.0."""
        assert mapper._simple_similarity("museum", "museum") == 1.0

    def test_prefix_match(self, mapper: OntologyMapper):
        """A shared prefix (Dutch singular/plural) scores highly."""
        # bibliotheek -> bibliotheken
        assert mapper._simple_similarity("bibliotheek", "bibliotheken") >= 0.9

    def test_stem_match(self, mapper: OntologyMapper):
        """A shared stem still scores well."""
        # archief -> archieven
        assert mapper._simple_similarity("archief", "archieven") >= 0.85

    def test_no_similarity(self, mapper: OntologyMapper):
        """Completely different strings score low."""
        assert mapper._simple_similarity("museum", "xyz") < 0.5

    def test_empty_string(self, mapper: OntologyMapper):
        """Any empty operand yields 0.0."""
        assert mapper._simple_similarity("", "museum") == 0.0
        assert mapper._simple_similarity("museum", "") == 0.0
        assert mapper._simple_similarity("", "") == 0.0


# =============================================================================
# Test: Integration with hybrid_retriever
# =============================================================================


class TestHybridRetrieverIntegration:
    """Tests verifying integration with hybrid_retriever.py."""

    @pytest.fixture(autouse=True)
    def reset(self):
        """Reset singleton before each test."""
        reset_ontology_mapper()
        yield

    def test_mapping_has_expected_format(self):
        """Mapping should match the format hybrid_retriever expects."""
        mapping = get_custodian_type_mapping()

        # All keys should be uppercase enum values (underscores allowed).
        # FIX: the previous check ORed isupper() with
        # `key == key.upper().replace("_", "_")` — the replace is a no-op,
        # so the whole disjunction reduced to exactly this comparison.
        for key in mapping:
            assert key == key.upper()

        # All values should be single uppercase letters.
        for value in mapping.values():
            assert len(value) == 1
            assert value.isupper()

    def test_heritage_code_returns_none_for_invalid(self):
        """get_heritage_code should return None for invalid types."""
        assert get_heritage_code("INVALID_TYPE") is None
        assert get_heritage_code("") is None

    def test_consistent_with_hardcoded_values(self):
        """Dynamic mapping should match the values hybrid_retriever relies on."""
        mapping = get_custodian_type_mapping()

        # These are the critical mappings that hybrid_retriever depends on.
        expected = {
            "GALLERY": "G",
            "LIBRARY": "L",
            "ARCHIVE": "A",
            "MUSEUM": "M",
            "OFFICIAL_INSTITUTION": "O",
            "RESEARCH_CENTER": "R",
            "DIGITAL_PLATFORM": "D",
        }

        for enum_val, code in expected.items():
            assert mapping.get(enum_val) == code, f"Mismatch for {enum_val}"


# =============================================================================
# Test: Edge Cases
# =============================================================================


class TestEdgeCases:
    """Tests for edge cases and error handling."""

    def test_match_empty_string(self, mapper: OntologyMapper):
        """Empty input should produce no match."""
        result = mapper.match_natural_language("", "CustodianPrimaryTypeEnum")
        assert result is None

    def test_match_whitespace_only(self, mapper: OntologyMapper):
        """Whitespace-only input should produce no match."""
        result = mapper.match_natural_language("   ", "CustodianPrimaryTypeEnum")
        assert result is None

    def test_match_nonexistent_enum(self, mapper: OntologyMapper):
        """An unknown enum name should yield None, not an error."""
        result = mapper.match_natural_language("museum", "NonExistentEnum")
        assert result is None

    def test_load_malformed_yaml(self, temp_enum_dir: Path):
        """Malformed YAML should be handled gracefully (None, no raise)."""
        enum_file = temp_enum_dir / "modules" / "enums" / "BrokenEnum.yaml"
        with open(enum_file, "w") as f:
            f.write("this is not: valid: yaml: content:")

        mapper = OntologyMapper(temp_enum_dir)
        result = mapper.load_enum("BrokenEnum")
        assert result is None

    def test_unicode_normalization(self, mapper: OntologyMapper):
        """Matching should be insensitive to Unicode composition form.

        FIX: the two literals previously looked identical in the source, so
        the test could not reliably exercise NFC-vs-NFD handling; explicit
        escape sequences guarantee the forms differ at the code-point level.
        """
        precomposed = "mus\u00e9e"   # U+00E9 LATIN SMALL LETTER E WITH ACUTE
        combining = "muse\u0301e"    # 'e' + U+0301 COMBINING ACUTE ACCENT
        assert precomposed != combining  # distinct code-point sequences
        result1 = mapper.match_natural_language(precomposed, "CustodianPrimaryTypeEnum")
        result2 = mapper.match_natural_language(combining, "CustodianPrimaryTypeEnum")
        # Both should normalize to "musee" and potentially match the same value.
        assert result1 == result2


# =============================================================================
# Test: Language Detection
# =============================================================================


class TestDetectTermLanguage:
    """Tests for the detect_term_language function.

    This function uses a hybrid approach:
    1. Heritage-specific vocabulary for known heritage terms (highest priority)
    2. fast-langdetect library for general language detection (with confidence threshold)
    3. English default for multi-word phrases without clear indicators

    The heritage vocabulary focuses on terms that general-purpose language
    detectors often misclassify (e.g., "musea" as Italian instead of Dutch).
    """

    def test_detect_dutch_museum_terms(self):
        """Dutch museum-related terms in heritage vocabulary should be 'nl'."""
        # "musea" is in heritage vocabulary - fast-langdetect often misclassifies it
        assert detect_term_language("musea") == "nl"
        # "museum" is generic - depends on fast-langdetect (en/nl/de all valid)
        result = detect_term_language("museum")
        assert result in ("nl", "de", "en")  # Accept any valid detection

    def test_detect_dutch_library_terms(self):
        """Dutch library terms should be detected as 'nl'."""
        assert detect_term_language("bibliotheken") == "nl"
        assert detect_term_language("bibliotheek") == "nl"
        # Multi-word terms without English indicators default to heritage vocab match
        assert detect_term_language("openbare bibliotheek") in ("nl", "en")

    def test_detect_dutch_archive_terms(self):
        """Dutch archive terms should be detected as 'nl'."""
        assert detect_term_language("archieven") == "nl"
        assert detect_term_language("archief") == "nl"
        # "nationaal" triggers heritage vocab match for Dutch
        assert detect_term_language("nationaal archief") in ("nl", "en")  # "national" may trigger English
        # Compound terms use prefix matching
        assert detect_term_language("gemeentearchief") in ("nl", None)

    def test_detect_french_terms(self):
        """French heritage terms with diacritics should be detected as 'fr'."""
        # Terms with diacritics are reliably detected by fast-langdetect
        assert detect_term_language("musées") == "fr"
        assert detect_term_language("musée") == "fr"
        assert detect_term_language("bibliothèques") == "fr"
        assert detect_term_language("bibliothèque") == "fr"
        # "archives" without diacritics is ambiguous (French/English)
        result = detect_term_language("archives")
        assert result in ("fr", "en")
        # Diacritics provide clear French signal
        result = detect_term_language("société historique")
        assert result in ("fr", "en")  # "historique" detected by fast-langdetect

    def test_detect_spanish_terms(self):
        """Spanish heritage terms should be detected as 'es'."""
        # "museos" is in heritage vocabulary
        result = detect_term_language("museos")
        assert result in ("es", None)  # May not match if not in reduced vocab
        # "bibliotecas" and "archivos" are in heritage vocabulary
        assert detect_term_language("bibliotecas") in ("es", "pt")  # Shared term
        assert detect_term_language("archivos") == "es"

    def test_detect_german_terms(self):
        """German heritage terms should be detected as 'de'."""
        assert detect_term_language("museen") == "de"
        # "bibliothek" may match Dutch vocabulary first due to prefix matching
        result = detect_term_language("bibliothek")
        assert result in ("de", "nl")  # Both have similar terms
        assert detect_term_language("archiv") == "de"
        assert detect_term_language("sammlung") == "de"

    def test_detect_english_terms(self):
        """English heritage terms should be detected as 'en'."""
        assert detect_term_language("museums") == "en"
        assert detect_term_language("libraries") == "en"
        assert detect_term_language("gallery") == "en"
        assert detect_term_language("national library") == "en"
        assert detect_term_language("public archives") == "en"

    def test_detect_italian_terms(self):
        """Italian heritage terms should be detected as 'it'."""
        assert detect_term_language("musei") == "it"
        assert detect_term_language("biblioteche") == "it"
        assert detect_term_language("archivi") == "it"

    def test_detect_portuguese_terms(self):
        """Portuguese heritage terms should be detected as 'pt'."""
        assert detect_term_language("museus") == "pt"
        assert detect_term_language("bibliotecas") in ("pt", "es")  # Shared term
        assert detect_term_language("arquivos") == "pt"

    def test_unknown_term_returns_none(self):
        """Unknown single-word terms should return None."""
        assert detect_term_language("xyz123") is None
        assert detect_term_language("asdfghjkl") is None

    def test_empty_string_defaults_to_english(self):
        """Empty string should return English as default."""
        assert detect_term_language("") == "en"

    def test_whitespace_only_defaults_to_english(self):
        """Whitespace-only input should return English as default."""
        assert detect_term_language("   ") == "en"

    def test_case_insensitive_detection(self):
        """Detection should be case-insensitive."""
        assert detect_term_language("MUSEA") == "nl"
        assert detect_term_language("Musées") == "fr"
        # "MUSEOS" relies on fast-langdetect after heritage vocab check
        result = detect_term_language("MUSEOS")
        assert result in ("es", None)
        assert detect_term_language("Libraries") == "en"

    def test_compound_dutch_terms(self):
        """Compound Dutch terms should be detected via heritage vocabulary or prefix matching."""
        # "rijks" is in heritage vocabulary as prefix
        assert detect_term_language("rijksmuseum") in ("nl", None)
        # "gemeente" matches via prefix with "gemeentelijk"
        assert detect_term_language("gemeentearchief") in ("nl", None)

    def test_priority_when_ambiguous(self):
        """Heritage vocabulary takes precedence for known terms.

        When a term is in heritage vocabulary, that language is returned.
        For terms not in vocabulary, fast-langdetect determines the result.
        """
        # "archiv" is in German heritage vocabulary
        assert detect_term_language("archiv") == "de"

        # "museum" is not in heritage vocabulary (too ambiguous)
        # fast-langdetect will classify it
        result = detect_term_language("museum")
        assert result in ("nl", "de", "en")

        # "musea" is specifically in Dutch heritage vocabulary
        assert detect_term_language("musea") == "nl"


if __name__ == "__main__":
    pytest.main([__file__, "-v"])