""" Template-Based SPARQL Query Generation System This module implements a template-based approach to SPARQL query generation, replacing error-prone LLM-generated queries with deterministic, validated templates. Architecture (CRITICAL ORDERING): ================================= 1. ConversationContextResolver (DSPy) - Resolves elliptical follow-ups FIRST "En in Enschede?" → "Welke archieven zijn er in Enschede?" 2. FykeFilter (DSPy) - Filters irrelevant questions on RESOLVED input ⚠️ MUST operate on resolved question, not raw input! 3. TemplateClassifier (DSPy) - Matches to SPARQL template 4. SlotExtractor (DSPy) - Extracts slot values with synonym resolution 5. TemplateInstantiator (Jinja2) - Renders final SPARQL query Based on: - docs/plan/prompt-query_template_mapping/ - Formica et al. (2023) - Template-based SPARQL achieves 65% precision vs 10% LLM-only - DSPy 2.6+ GEPA optimization Author: OpenCode Created: 2025-01-06 """ from __future__ import annotations import json import logging import re from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Literal, Optional import dspy import numpy as np from dspy import History from jinja2 import Environment, BaseLoader from pydantic import BaseModel, Field from rapidfuzz import fuzz, process logger = logging.getLogger(__name__) # ============================================================================= # CONFIGURATION # ============================================================================= # Lazy-loaded sentence transformer model _embedding_model = None _embedding_model_name = "paraphrase-multilingual-MiniLM-L12-v2" # Multilingual, 384-dim def _get_embedding_model(): """Lazy-load the sentence transformer model.""" global _embedding_model if _embedding_model is None: try: from sentence_transformers import SentenceTransformer logger.info(f"Loading embedding model: {_embedding_model_name}") _embedding_model = SentenceTransformer(_embedding_model_name) logger.info("Embedding model loaded successfully") except ImportError: logger.warning("sentence-transformers not installed, embedding matching disabled") return None except Exception as e: logger.warning(f"Failed to load embedding model: {e}") return None return _embedding_model def _find_data_path(filename: str) -> Path: """Find data file in multiple possible locations. Supports both local development (backend/rag/ → data/) and server deployment (e.g., /opt/glam-backend/rag/data/). """ # Try relative to module location (local dev: backend/rag → glam/data) module_dir = Path(__file__).parent candidates = [ module_dir.parent.parent / "data" / filename, # Local: glam/data/ module_dir / "data" / filename, # Server: rag/data/ Path("/opt/glam-backend/rag/data") / filename, # Server explicit path ] for candidate in candidates: if candidate.exists(): return candidate # Return first candidate (will report as missing in logs) return candidates[0] TEMPLATES_PATH = _find_data_path("sparql_templates.yaml") VALIDATION_RULES_PATH = _find_data_path("validation/sparql_validation_rules.json") # Standard SPARQL prefixes SPARQL_PREFIXES = """PREFIX hc: PREFIX crm: PREFIX schema: PREFIX skos: PREFIX org: PREFIX foaf: PREFIX dcterms: PREFIX xsd: PREFIX wd: """ # ============================================================================= # PYDANTIC MODELS # ============================================================================= class SlotType(str, Enum): """Types of template slots.""" INSTITUTION_TYPE = "institution_type" SUBREGION = "subregion" COUNTRY = "country" CITY = "city" INSTITUTION_NAME = "institution_name" BUDGET_CATEGORY = "budget_category" STRING = "string" INTEGER = "integer" DECIMAL = "decimal" class SlotDefinition(BaseModel): """Definition of a template slot.""" type: SlotType required: bool = True default: Optional[str] = None examples: list[str] = Field(default_factory=list) fallback_types: list[SlotType] = Field(default_factory=list) valid_values: list[str] = Field(default_factory=list) class TemplateDefinition(BaseModel): """Definition of a SPARQL query template.""" id: str description: str intent: list[str] question_patterns: list[str] slots: dict[str, SlotDefinition] sparql_template: str sparql_template_alt: Optional[str] = None sparql_template_region: Optional[str] = None sparql_template_country: Optional[str] = None sparql_template_isil: Optional[str] = None sparql_template_ghcid: Optional[str] = None examples: list[dict[str, Any]] = Field(default_factory=list) class FollowUpPattern(BaseModel): """Definition of a follow-up question pattern.""" description: str patterns: list[str] slot_inheritance: list[str] = Field(default_factory=list) transforms_to: Optional[str] = None resolution_strategy: str requires_previous_results: bool = False class FykeFilterConfig(BaseModel): """Configuration for the Fyke filter.""" out_of_scope_keywords: list[str] out_of_scope_categories: list[str] heritage_keywords: list[str] standard_response: dict[str, str] class ConversationTurn(BaseModel): """A single turn in conversation history.""" role: Literal["user", "assistant"] content: str resolved_question: Optional[str] = None template_id: Optional[str] = None slots: dict[str, str] = Field(default_factory=dict) results: list[dict[str, Any]] = Field(default_factory=list) class ConversationState(BaseModel): """State tracking across conversation turns.""" turns: list[ConversationTurn] = Field(default_factory=list) current_slots: dict[str, str] = Field(default_factory=dict) current_template_id: Optional[str] = None language: str = "nl" def add_turn(self, turn: ConversationTurn) -> None: """Add a turn and update current state.""" self.turns.append(turn) if turn.role == "user" and turn.slots: # Inherit slots from user turns self.current_slots.update(turn.slots) if turn.template_id: self.current_template_id = turn.template_id def get_previous_user_turn(self) -> Optional[ConversationTurn]: """Get the most recent user turn.""" for turn in reversed(self.turns): if turn.role == "user": return turn return None def to_dspy_history(self) -> History: """Convert to DSPy History object.""" messages = [] for turn in self.turns[-6:]: # Keep last 6 turns for context messages.append({ "role": turn.role, "content": turn.resolved_question or turn.content }) return History(messages=messages) class TemplateMatchResult(BaseModel): """Result of template matching.""" matched: bool template_id: Optional[str] = None confidence: float = 0.0 slots: dict[str, str] = Field(default_factory=dict) sparql: Optional[str] = None reasoning: str = "" class ResolvedQuestion(BaseModel): """Result of conversation context resolution.""" original: str resolved: str is_follow_up: bool = False follow_up_type: Optional[str] = None inherited_slots: dict[str, str] = Field(default_factory=dict) confidence: float = 1.0 class FykeResult(BaseModel): """Result of Fyke filter.""" is_relevant: bool confidence: float reasoning: str standard_response: Optional[str] = None # ============================================================================= # SYNONYM MAPPINGS (loaded from validation rules) # ============================================================================= class SynonymResolver: """Resolves natural language terms to canonical slot values.""" def __init__(self): self._institution_types: dict[str, str] = {} self._subregions: dict[str, str] = {} self._countries: dict[str, str] = {} self._cities: set[str] = set() self._budget_categories: dict[str, str] = {} self._loaded = False def load(self) -> None: """Load synonym mappings from validation rules and templates.""" if self._loaded: return # Load from validation rules if VALIDATION_RULES_PATH.exists(): try: with open(VALIDATION_RULES_PATH) as f: rules = json.load(f) # Institution type mappings if "institution_type_mappings" in rules: for k, v in rules["institution_type_mappings"].items(): self._institution_types[k.lower()] = v # Subregion mappings if "subregion_mappings" in rules: for k, v in rules["subregion_mappings"].items(): self._subregions[k.lower()] = v # Country mappings if "country_mappings" in rules: for k, v in rules["country_mappings"].items(): self._countries[k.lower()] = v except Exception as e: logger.warning(f"Failed to load validation rules: {e}") # Load additional synonyms from templates YAML if TEMPLATES_PATH.exists(): try: import yaml with open(TEMPLATES_PATH) as f: templates = yaml.safe_load(f) slot_types = templates.get("_slot_types", {}) # Institution type synonyms inst_synonyms = slot_types.get("institution_type", {}).get("synonyms", {}) for k, v in inst_synonyms.items(): self._institution_types[k.lower().replace("_", " ")] = v # Subregion synonyms region_synonyms = slot_types.get("subregion", {}).get("synonyms", {}) for k, v in region_synonyms.items(): self._subregions[k.lower().replace("_", " ")] = v # Country synonyms country_synonyms = slot_types.get("country", {}).get("synonyms", {}) for k, v in country_synonyms.items(): self._countries[k.lower().replace("_", " ")] = v # Budget category synonyms budget_synonyms = slot_types.get("budget_category", {}).get("synonyms", {}) for k, v in budget_synonyms.items(): self._budget_categories[k.lower().replace("_", " ")] = v except Exception as e: logger.warning(f"Failed to load template synonyms: {e}") # Add common Dutch institution type synonyms dutch_types = { "museum": "M", "musea": "M", "museums": "M", "bibliotheek": "L", "bibliotheken": "L", "library": "L", "libraries": "L", "archief": "A", "archieven": "A", "archive": "A", "archives": "A", "galerie": "G", "galerij": "G", "galerijen": "G", "gallery": "G", "galleries": "G", } for k, v in dutch_types.items(): if k not in self._institution_types: self._institution_types[k] = v self._loaded = True logger.info(f"Loaded {len(self._institution_types)} institution types, " f"{len(self._subregions)} subregions, {len(self._countries)} countries") def resolve_institution_type(self, term: str) -> Optional[str]: """Resolve institution type term to single-letter code.""" self.load() term_lower = term.lower().strip() # Direct match if term_lower in self._institution_types: return self._institution_types[term_lower] # Already a valid code if term.upper() in "MLAGORCUBESFIXPHDNT": return term.upper() # Fuzzy match if self._institution_types: match = process.extractOne( term_lower, list(self._institution_types.keys()), scorer=fuzz.ratio, score_cutoff=80 ) if match: return self._institution_types[match[0]] return None def resolve_subregion(self, term: str) -> Optional[str]: """Resolve subregion term to ISO 3166-2 code.""" self.load() term_lower = term.lower().strip() # Direct match if term_lower in self._subregions: return self._subregions[term_lower] # Already a valid code (e.g., NL-NH) if re.match(r'^[A-Z]{2}-[A-Z]{2,3}$', term.upper()): return term.upper() # Fuzzy match if self._subregions: match = process.extractOne( term_lower, list(self._subregions.keys()), scorer=fuzz.ratio, score_cutoff=75 ) if match: return self._subregions[match[0]] return None def resolve_country(self, term: str) -> Optional[str]: """Resolve country term to Wikidata Q-number.""" self.load() term_lower = term.lower().strip() # Direct match if term_lower in self._countries: return self._countries[term_lower] # Already a Q-number if re.match(r'^Q\d+$', term): return term # Fuzzy match if self._countries: match = process.extractOne( term_lower, list(self._countries.keys()), scorer=fuzz.ratio, score_cutoff=80 ) if match: return self._countries[match[0]] return None def resolve_city(self, term: str) -> str: """Normalize city name (title case, common corrections).""" # Common Dutch city name corrections corrections = { "den haag": "Den Haag", "the hague": "Den Haag", "'s-gravenhage": "Den Haag", "s-gravenhage": "Den Haag", "'s-hertogenbosch": "'s-Hertogenbosch", "den bosch": "'s-Hertogenbosch", } term_lower = term.lower().strip() if term_lower in corrections: return corrections[term_lower] # Title case with Dutch article handling if term_lower.startswith("'s-"): return "'" + term[1:2] + "-" + term[3:].title() return term.title() def resolve_budget_category(self, term: str) -> Optional[str]: """Resolve budget category term to canonical slot name. Args: term: Budget category term (e.g., "innovatie", "digitalisering", "innovation budget") Returns: Canonical budget category slot name (e.g., "innovation", "digitization") or None """ self.load() # Normalize: lowercase and replace underscores with spaces (consistent with synonym loading) term_normalized = term.lower().strip().replace("_", " ") # Direct match from synonyms if term_normalized in self._budget_categories: return self._budget_categories[term_normalized] # Already a valid category valid_categories = [ "innovation", "digitization", "preservation", "personnel", "acquisition", "operating", "capital", "external_funding", "internal_funding", "endowment_draw" ] if term_normalized in valid_categories: return term_normalized # Fuzzy match against loaded synonyms if self._budget_categories: match = process.extractOne( term_normalized, list(self._budget_categories.keys()), scorer=fuzz.ratio, score_cutoff=75 ) if match: return self._budget_categories[match[0]] return None def is_region(self, term: str) -> bool: """Check if a term is a known region/province name. This is used to disambiguate between city and region patterns when both would match the same question structure. Args: term: Location term to check (e.g., "Noord-Holland", "Amsterdam") Returns: True if the term resolves to a known region, False otherwise """ self.load() term_lower = term.lower().strip() # Check if it's in our subregions mapping if term_lower in self._subregions: return True # Check if it's already a valid ISO code if re.match(r'^[A-Z]{2}-[A-Z]{2,3}$', term.upper()): return True # Fuzzy match with high threshold to avoid false positives if self._subregions: match = process.extractOne( term_lower, list(self._subregions.keys()), scorer=fuzz.ratio, score_cutoff=85 # Higher threshold than resolve_subregion ) if match: return True return False # Global synonym resolver instance _synonym_resolver: Optional[SynonymResolver] = None def get_synonym_resolver() -> SynonymResolver: """Get or create the global synonym resolver.""" global _synonym_resolver if _synonym_resolver is None: _synonym_resolver = SynonymResolver() return _synonym_resolver # ============================================================================= # DSPy SIGNATURES # ============================================================================= class ConversationContextSignature(dspy.Signature): """Resolve elliptical follow-up questions using conversation history. CRITICAL: This module runs FIRST, before any filtering or classification. It expands short follow-up questions into complete, self-contained questions. Examples of resolution: - Previous: "Welke archieven zijn er in Den Haag?" Current: "En in Enschede?" Resolved: "Welke archieven zijn er in Enschede?" - Previous: "Welke musea zijn er in Amsterdam?" Current: "Hoeveel?" Resolved: "Hoeveel musea zijn er in Amsterdam?" - Previous: Listed 5 museums Current: "Vertel me meer over de eerste" Resolved: "Vertel me meer over [first museum name]" The resolved question must be a complete, standalone question that would make sense without any conversation history. """ question: str = dspy.InputField( desc="Current user question (may be elliptical follow-up)" ) history: History = dspy.InputField( desc="Previous conversation turns", default=History(messages=[]) ) previous_slots: str = dspy.InputField( desc="JSON string of slot values from previous query (e.g., {\"institution_type\": \"A\", \"city\": \"Den Haag\"})", default="{}" ) previous_results_summary: str = dspy.InputField( desc="Brief summary of previous query results for ordinal/pronoun resolution", default="" ) resolved_question: str = dspy.OutputField( desc="Fully expanded, self-contained question. If not a follow-up, return original question unchanged." ) is_follow_up: bool = dspy.OutputField( desc="True if this was an elliptical follow-up that needed resolution" ) follow_up_type: str = dspy.OutputField( desc="Type of follow-up: 'location_swap', 'type_swap', 'count_from_list', 'details_request', 'ordinal_reference', 'pronoun_reference', or 'none'" ) inherited_slots_json: str = dspy.OutputField( desc="JSON string of slots inherited from previous query (e.g., {\"institution_type\": \"A\"})" ) confidence: float = dspy.OutputField( desc="Confidence in resolution (0.0-1.0)" ) class FykeFilterSignature(dspy.Signature): """Determine if a RESOLVED question is relevant to heritage institutions. CRITICAL: This filter operates on the RESOLVED question from ConversationContextResolver, NOT the raw user input. This prevents false positives on short follow-ups like "En in Enschede?" which resolves to "Welke archieven zijn er in Enschede?" Heritage institutions include: - Museums (musea) - Archives (archieven) - Libraries (bibliotheken) - Galleries (galerijen) - Heritage societies - Cultural institutions - Collections Questions about these topics are RELEVANT. Questions about shopping, weather, sports, restaurants, etc. are NOT relevant. When in doubt, err on the side of relevance (return True). """ resolved_question: str = dspy.InputField( desc="The fully resolved question (after context resolution)" ) conversation_topic: str = dspy.InputField( desc="Brief summary of what the conversation has been about so far", default="heritage institutions" ) is_relevant: bool = dspy.OutputField( desc="True if question is about heritage institutions, False otherwise" ) confidence: float = dspy.OutputField( desc="Confidence in relevance classification (0.0-1.0)" ) reasoning: str = dspy.OutputField( desc="Brief explanation of why question is/isn't relevant" ) class TemplateClassifierSignature(dspy.Signature): """Classify a heritage question to match it with a SPARQL template. Given a resolved question about heritage institutions, determine which SPARQL query template best matches the user's intent. Available template IDs (return the EXACT ID string, not the number): - list_institutions_by_type_city: List institutions of type X in city Y - list_institutions_by_type_region: List institutions of type X in region Y (province/state) - list_institutions_by_type_country: List institutions of type X in country Y - count_institutions_by_type_location: Count institutions of type X in location Y - count_institutions_by_type: Count all institutions grouped by type - find_institution_by_name: Find specific institution by name - list_all_institutions_in_city: List all institutions in city Y - find_institutions_by_founding_date: Find oldest/newest institutions - find_institution_by_identifier: Find by ISIL/GHCID - compare_locations: Compare institutions between locations - find_custodians_by_budget_threshold: Find custodians with budget category above/below threshold (e.g., "Which custodians spend more than 5000 euros on innovation?") - none: No template matches (fall back to LLM generation) CRITICAL DISAMBIGUATION - Province vs City: Some Dutch locations are BOTH a province AND a city with the same name. When the location name alone is used (without "stad" or "de stad"), DEFAULT TO PROVINCE (use list_institutions_by_type_region): - Groningen → province (NL-GR), NOT the city - Utrecht → province (NL-UT), NOT the city - Limburg → province (NL-LI), NOT the city - Friesland/Frisia → province (NL-FR) - Drenthe → province (NL-DR) - Gelderland → province (NL-GE) - Overijssel → province (NL-OV) - Flevoland → province (NL-FL) - Zeeland → province (NL-ZE) - Noord-Holland → province (NL-NH) - Zuid-Holland → province (NL-ZH) - Noord-Brabant → province (NL-NB) Use list_institutions_by_type_city ONLY when: - The question explicitly says "de stad" or "in de stad" (the city) - The location is clearly just a city (e.g., Amsterdam, Rotterdam, Den Haag) IMPORTANT: Return the template ID string exactly as shown (e.g. "count_institutions_by_type_location"), NOT a number. Return "none" if no template matches well. """ question: str = dspy.InputField( desc="Resolved natural language question about heritage institutions" ) language: str = dspy.InputField( desc="Language code: nl, en, de, fr", default="nl" ) template_id: str = dspy.OutputField( desc="EXACT template ID string from the list above (e.g. 'count_institutions_by_type_location'), or 'none'" ) confidence: float = dspy.OutputField( desc="Confidence in template match (0.0-1.0). Return 'none' as template_id if below 0.6" ) reasoning: str = dspy.OutputField( desc="Brief explanation of why this template matches" ) class SlotExtractorSignature(dspy.Signature): """Extract slot values from a question for a specific template. Given a question and the template it matched, extract the values needed to fill in the template's slots. Slot types and expected formats: - institution_type: Return single-letter code (M, L, A, G, O, R, C, U, B, E, S, F, I, X, P, H, D, N, T) Examples: "musea" → "M", "archieven" → "A", "bibliotheken" → "L" - city: Return city name with proper capitalization Examples: "amsterdam" → "Amsterdam", "den haag" → "Den Haag" - region: Return ISO 3166-2 code Examples: "Noord-Holland" → "NL-NH", "Gelderland" → "NL-GE" - country: Return Wikidata Q-number Examples: "Nederland" → "Q55", "Belgium" → "Q31" - institution_name: Return the institution name as mentioned - limit: Return integer (default 10) Return slots as a JSON object with slot names as keys. """ question: str = dspy.InputField( desc="The user's question" ) template_id: str = dspy.InputField( desc="ID of the matched template" ) required_slots: str = dspy.InputField( desc="Comma-separated list of required slot names for this template" ) inherited_slots: str = dspy.InputField( desc="JSON string of slots inherited from conversation context", default="{}" ) slots_json: str = dspy.OutputField( desc="JSON object with extracted slot values, e.g., {\"institution_type\": \"M\", \"city\": \"Amsterdam\"}" ) extraction_notes: str = dspy.OutputField( desc="Notes about any slots that couldn't be extracted or needed inference" ) # ============================================================================= # DSPy MODULES # ============================================================================= class ConversationContextResolver(dspy.Module): """Resolves elliptical follow-up questions using conversation history. CRITICAL: This module MUST run FIRST in the pipeline, before FykeFilter. It expands short follow-ups like "En in Enschede?" into complete questions like "Welke archieven zijn er in Enschede?" so that subsequent modules can properly understand the user's intent. """ def __init__(self): super().__init__() self.resolve = dspy.ChainOfThought(ConversationContextSignature) def forward( self, question: str, conversation_state: Optional[ConversationState] = None, ) -> ResolvedQuestion: """Resolve a potentially elliptical question. Args: question: Current user question (may be elliptical) conversation_state: Full conversation state with history Returns: ResolvedQuestion with expanded question and metadata """ # If no conversation history, return as-is if conversation_state is None or not conversation_state.turns: return ResolvedQuestion( original=question, resolved=question, is_follow_up=False, confidence=1.0 ) # Prepare inputs for DSPy history = conversation_state.to_dspy_history() previous_slots = json.dumps(conversation_state.current_slots) # Get previous results summary if available prev_turn = conversation_state.get_previous_user_turn() results_summary = "" if prev_turn and prev_turn.results: results_summary = ", ".join( r.get("name", str(r))[:50] for r in prev_turn.results[:5] ) try: result = self.resolve( question=question, history=history, previous_slots=previous_slots, previous_results_summary=results_summary ) # Parse inherited slots try: inherited = json.loads(result.inherited_slots_json) except (json.JSONDecodeError, TypeError): inherited = {} return ResolvedQuestion( original=question, resolved=result.resolved_question, is_follow_up=result.is_follow_up, follow_up_type=result.follow_up_type if result.follow_up_type != "none" else None, inherited_slots=inherited, confidence=result.confidence ) except Exception as e: logger.warning(f"Context resolution failed: {e}, returning original") return ResolvedQuestion( original=question, resolved=question, is_follow_up=False, confidence=0.5 ) class FykeFilter(dspy.Module): """Filters out irrelevant questions with standard response. CRITICAL: Must operate on RESOLVED question from ConversationContextResolver, not the raw user input. This prevents false positives on follow-ups. Named after Dutch "fuik" (fish trap) - catches irrelevant questions. """ def __init__(self, config: Optional[FykeFilterConfig] = None): super().__init__() self.classify = dspy.ChainOfThought(FykeFilterSignature) self.config = config or self._load_config() def _load_config(self) -> FykeFilterConfig: """Load Fyke configuration from templates YAML.""" default_config = FykeFilterConfig( out_of_scope_keywords=[ "tandpasta", "toothpaste", "supermarkt", "restaurant", "hotel", "weer", "weather", "voetbal", "soccer" ], out_of_scope_categories=["shopping", "sports", "cooking"], heritage_keywords=[ "museum", "musea", "archief", "archieven", "bibliotheek", "galerie", "erfgoed", "heritage", "collectie", "collection" ], standard_response={ "nl": "Ik kan je helpen met vragen over erfgoedinstellingen zoals musea, archieven, bibliotheken en galerijen.", "en": "I can help you with questions about heritage institutions such as museums, archives, libraries and galleries.", "de": "Ich kann Ihnen bei Fragen zu Kulturerbeinstitutionen wie Museen, Archiven und Bibliotheken helfen.", "fr": "Je peux vous aider avec des questions sur les institutions patrimoniales." } ) if TEMPLATES_PATH.exists(): try: import yaml with open(TEMPLATES_PATH) as f: templates = yaml.safe_load(f) fyke_config = templates.get("fyke_filter", {}) return FykeFilterConfig( out_of_scope_keywords=fyke_config.get("out_of_scope_keywords", default_config.out_of_scope_keywords), out_of_scope_categories=fyke_config.get("out_of_scope_categories", default_config.out_of_scope_categories), heritage_keywords=fyke_config.get("heritage_keywords", default_config.heritage_keywords), standard_response=fyke_config.get("standard_response", default_config.standard_response) ) except Exception as e: logger.warning(f"Failed to load Fyke config: {e}") return default_config def forward( self, resolved_question: str, conversation_topic: str = "heritage institutions", language: str = "nl" ) -> FykeResult: """Check if resolved question is relevant to heritage. Args: resolved_question: The RESOLVED question (not raw input!) conversation_topic: Summary of conversation so far language: Language code for standard response Returns: FykeResult with relevance decision """ question_lower = resolved_question.lower() # Quick check: obvious heritage keywords → definitely relevant for keyword in self.config.heritage_keywords: if keyword in question_lower: return FykeResult( is_relevant=True, confidence=0.95, reasoning=f"Contains heritage keyword: {keyword}" ) # Quick check: obvious out-of-scope keywords → definitely irrelevant for keyword in self.config.out_of_scope_keywords: if keyword in question_lower: return FykeResult( is_relevant=False, confidence=0.95, reasoning=f"Contains out-of-scope keyword: {keyword}", standard_response=self.config.standard_response.get( language, self.config.standard_response.get("en") ) ) # Use DSPy for ambiguous cases try: result = self.classify( resolved_question=resolved_question, conversation_topic=conversation_topic ) return FykeResult( is_relevant=result.is_relevant, confidence=result.confidence, reasoning=result.reasoning, standard_response=None if result.is_relevant else self.config.standard_response.get( language, self.config.standard_response.get("en") ) ) except Exception as e: logger.warning(f"Fyke classification failed: {e}, assuming relevant") # Err on side of relevance return FykeResult( is_relevant=True, confidence=0.5, reasoning=f"Classification failed, assuming relevant: {e}" ) # ============================================================================= # EMBEDDING-BASED TEMPLATE MATCHING # ============================================================================= class TemplateEmbeddingMatcher: """Matches questions to templates using semantic embeddings. Uses sentence-transformers to compute embeddings for template patterns and find the best match for incoming questions based on cosine similarity. This provides semantic matching that can handle: - Paraphrases ("Welke musea..." vs "Zijn er musea die...") - Synonyms ("instellingen" vs "organisaties") - Different word orders - Multilingual queries (Dutch, English, German) """ _instance = None _pattern_embeddings: Optional[np.ndarray] = None _pattern_template_ids: Optional[list[str]] = None _pattern_texts: Optional[list[str]] = None def __new__(cls): """Singleton pattern - embeddings are expensive to compute.""" if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def _ensure_embeddings_computed(self, templates: dict[str, "TemplateDefinition"]) -> bool: """Compute and cache embeddings for all template patterns. Returns: True if embeddings are available, False otherwise """ if self._pattern_embeddings is not None: return True model = _get_embedding_model() if model is None: return False # Collect all patterns with their template IDs pattern_texts = [] template_ids = [] for template_id, template_def in templates.items(): for pattern in template_def.question_patterns: # Normalize pattern: replace {slot} with generic placeholder normalized = re.sub(r'\{[^}]+\}', '[VALUE]', pattern) pattern_texts.append(normalized) template_ids.append(template_id) if not pattern_texts: logger.warning("No patterns found for embedding computation") return False # Compute embeddings for all patterns logger.info(f"Computing embeddings for {len(pattern_texts)} template patterns...") try: embeddings = model.encode(pattern_texts, convert_to_numpy=True, show_progress_bar=False) self._pattern_embeddings = embeddings self._pattern_template_ids = template_ids self._pattern_texts = pattern_texts logger.info(f"Computed {len(embeddings)} pattern embeddings (dim={embeddings.shape[1]})") return True except Exception as e: logger.warning(f"Failed to compute pattern embeddings: {e}") return False def match( self, question: str, templates: dict[str, "TemplateDefinition"], min_similarity: float = 0.70 ) -> Optional["TemplateMatchResult"]: """Find best matching template using embedding similarity. Args: question: Natural language question templates: Dictionary of template definitions min_similarity: Minimum cosine similarity threshold (0-1) Returns: TemplateMatchResult if similarity >= threshold, None otherwise """ if not self._ensure_embeddings_computed(templates): return None model = _get_embedding_model() if model is None: return None # Normalize question: replace numbers with placeholder normalized_question = re.sub(r'\d+(?:[.,]\d+)?', '[VALUE]', question) # Compute question embedding try: question_embedding = model.encode([normalized_question], convert_to_numpy=True)[0] except Exception as e: logger.warning(f"Failed to compute question embedding: {e}") return None # Guard against None embeddings (should not happen after _ensure_embeddings_computed) if self._pattern_embeddings is None or self._pattern_template_ids is None or self._pattern_texts is None: return None # Compute cosine similarities # Normalize vectors for cosine similarity question_norm = question_embedding / np.linalg.norm(question_embedding) pattern_norms = self._pattern_embeddings / np.linalg.norm(self._pattern_embeddings, axis=1, keepdims=True) similarities = np.dot(pattern_norms, question_norm) # Find best match best_idx = int(np.argmax(similarities)) best_similarity = float(similarities[best_idx]) if best_similarity < min_similarity: logger.debug(f"Best embedding similarity {best_similarity:.3f} below threshold {min_similarity}") return None best_template_id = self._pattern_template_ids[best_idx] best_pattern = self._pattern_texts[best_idx] # Scale similarity to confidence (0.70 → 0.70, 0.85 → 0.85, etc.) confidence = best_similarity logger.info(f"Embedding match found: template='{best_template_id}', similarity={best_similarity:.3f}, pattern='{best_pattern}'") return TemplateMatchResult( matched=True, template_id=best_template_id, confidence=confidence, reasoning=f"Embedding similarity: {best_similarity:.3f} with pattern '{best_pattern}'" ) # Singleton instance _template_embedding_matcher: Optional[TemplateEmbeddingMatcher] = None def get_template_embedding_matcher() -> TemplateEmbeddingMatcher: """Get or create the singleton embedding matcher.""" global _template_embedding_matcher if _template_embedding_matcher is None: _template_embedding_matcher = TemplateEmbeddingMatcher() return _template_embedding_matcher class TemplateClassifier(dspy.Module): """Classifies questions to match SPARQL templates.""" def __init__(self): super().__init__() self.classify = dspy.ChainOfThought(TemplateClassifierSignature) self._templates: Optional[dict[str, TemplateDefinition]] = None def _load_templates(self) -> dict[str, TemplateDefinition]: """Load template definitions from YAML.""" if self._templates is not None: return self._templates self._templates = {} if TEMPLATES_PATH.exists(): try: import yaml with open(TEMPLATES_PATH) as f: data = yaml.safe_load(f) templates = data.get("templates", {}) for template_id, template_data in templates.items(): try: # Convert slots slots = {} for slot_name, slot_data in template_data.get("slots", {}).items(): if isinstance(slot_data, dict): slot_type = slot_data.get("type", "string") slots[slot_name] = SlotDefinition( type=SlotType(slot_type) if slot_type in [e.value for e in SlotType] else SlotType.STRING, required=slot_data.get("required", True), default=slot_data.get("default"), examples=slot_data.get("examples", []), fallback_types=[SlotType(t) for t in slot_data.get("fallback_types", []) if t in [e.value for e in SlotType]], valid_values=slot_data.get("valid_values", []) ) self._templates[template_id] = TemplateDefinition( id=template_id, description=template_data.get("description", ""), intent=template_data.get("intent", []), question_patterns=template_data.get("question_patterns", []), slots=slots, sparql_template=template_data.get("sparql_template", ""), sparql_template_alt=template_data.get("sparql_template_alt"), sparql_template_region=template_data.get("sparql_template_region"), sparql_template_country=template_data.get("sparql_template_country"), sparql_template_isil=template_data.get("sparql_template_isil"), sparql_template_ghcid=template_data.get("sparql_template_ghcid"), examples=template_data.get("examples", []) ) except Exception as e: logger.warning(f"Failed to parse template {template_id}: {e}") except Exception as e: logger.error(f"Failed to load templates: {e}") return self._templates def _pattern_to_regex(self, pattern: str) -> tuple[re.Pattern, list[str]]: """Convert a template pattern to a regex for matching. Converts patterns like: "Welke instellingen geven meer dan {amount} uit aan {budget_category}?" To regex like: "Welke instellingen geven meer dan (.+?) uit aan (.+?)\\?" Args: pattern: Template pattern with {slot_name} placeholders Returns: Tuple of (compiled regex pattern, list of slot names in order) """ # Extract slot names in order slot_names = re.findall(r'\{([^}]+)\}', pattern) # Escape regex special characters (except { and }) escaped = re.escape(pattern) # Replace escaped braces with capture groups # \{...\} becomes (.+?) for non-greedy capture regex_str = re.sub(r'\\{[^}]+\\}', r'(.+?)', escaped) # Compile with case-insensitive matching return re.compile(regex_str, re.IGNORECASE), slot_names def _validate_slot_value(self, slot_name: str, value: str, template_id: str) -> bool: """Validate a captured slot value against its expected type. This is used to disambiguate between templates that have identical patterns but different slot types (e.g., city vs region). Args: slot_name: Name of the slot (e.g., "city", "region") value: Captured value to validate template_id: Template ID for context Returns: True if the value is valid for the slot type, False otherwise """ resolver = get_synonym_resolver() # Slot-specific validation if slot_name in ("region", "subregion"): # For region slots, check if the value is a known region return resolver.is_region(value) elif slot_name == "city": # For city slots, check if the value is NOT a region (inverse logic) # This helps disambiguate "Noord-Holland" (region) from "Amsterdam" (city) return not resolver.is_region(value) # Default: accept any value return True def _match_by_patterns( self, question: str, templates: dict[str, TemplateDefinition] ) -> Optional[TemplateMatchResult]: """Try to match question against template patterns using regex. This provides a fast, deterministic fallback before using LLM classification. Patterns are defined in the YAML template's `question_patterns` field. When multiple patterns match, prefers: 1. Patterns with more literal text (more specific) 2. Patterns with higher fuzzy similarity Args: question: The natural language question templates: Dictionary of template definitions Returns: TemplateMatchResult if high-confidence match found, None otherwise """ all_matches: list[tuple[str, float, str, int, bool]] = [] # (template_id, confidence, pattern, literal_chars, is_exact) # Normalize question for matching question_normalized = question.strip() for template_id, template_def in templates.items(): patterns = template_def.question_patterns if not patterns: continue for pattern in patterns: try: regex, slot_names = self._pattern_to_regex(pattern) match = regex.fullmatch(question_normalized) # Calculate literal characters (non-slot text) in pattern literal_text = re.sub(r'\{[^}]+\}', '', pattern) literal_chars = len(literal_text.strip()) if match: # Validate captured slot values captured_values = match.groups() slots_valid = True for slot_name, value in zip(slot_names, captured_values): if not self._validate_slot_value(slot_name, value, template_id): logger.debug(f"Slot validation failed: {slot_name}='{value}' for template {template_id}") slots_valid = False break if not slots_valid: # Skip this match - slot value doesn't match expected type continue # Full match = high confidence, but scaled by specificity # More literal chars = more specific = higher confidence base_confidence = 0.95 # Boost confidence slightly for more specific patterns specificity_bonus = min(0.04, literal_chars / 200.0) confidence = base_confidence + specificity_bonus logger.debug(f"Pattern exact match: '{pattern}' -> {template_id} (literal_chars={literal_chars})") all_matches.append((template_id, confidence, pattern, literal_chars, True)) # is_exact=True continue # Try partial/fuzzy matching with lower confidence # Use rapidfuzz to compare pattern structure (with slots replaced) pattern_normalized = re.sub(r'\{[^}]+\}', '___', pattern.lower()) question_lower = question_normalized.lower() # Replace common numeric patterns with placeholder question_for_compare = re.sub(r'\d+(?:[.,]\d+)?', '___', question_lower) # Calculate similarity similarity = fuzz.ratio(pattern_normalized, question_for_compare) / 100.0 if similarity >= 0.75: # Good fuzzy match confidence = 0.70 + (similarity - 0.75) * 0.8 # Scale 0.75-1.0 to 0.70-0.90 logger.debug(f"Pattern fuzzy match: '{pattern}' -> {template_id} (sim={similarity:.2f}, conf={confidence:.2f})") all_matches.append((template_id, confidence, pattern, literal_chars, False)) # is_exact=False except Exception as e: logger.warning(f"Pattern matching error for '{pattern}': {e}") continue # Debug: Print match count logger.debug(f"Pattern matching found {len(all_matches)} matches for '{question[:50]}...'") if not all_matches: return None # Sort by: 1) is_exact descending (exact matches first), 2) literal_chars descending, 3) confidence descending all_matches.sort(key=lambda x: (x[4], x[3], x[1]), reverse=True) # Debug: show best match best_match = all_matches[0] logger.debug(f"Best match after sort: {best_match}") template_id, confidence, matched_pattern, literal_chars, is_exact = best_match logger.debug(f"Extracted: template_id={template_id}, confidence={confidence}, literal_chars={literal_chars}, is_exact={is_exact}") if confidence >= 0.75: logger.info(f"Pattern-based match found: template='{template_id}', confidence={confidence:.2f}, pattern='{matched_pattern}' (literal_chars={literal_chars})") return TemplateMatchResult( matched=True, template_id=template_id, confidence=confidence, reasoning=f"Pattern match: '{matched_pattern}'" ) return None def forward(self, question: str, language: str = "nl") -> TemplateMatchResult: """Classify question to find matching template. Args: question: Resolved natural language question language: Language code Returns: TemplateMatchResult with template ID and confidence """ templates = self._load_templates() if not templates: return TemplateMatchResult( matched=False, reasoning="No templates loaded" ) # TIER 1: Pattern-based matching (fast, deterministic, exact regex) pattern_match = self._match_by_patterns(question, templates) if pattern_match and pattern_match.confidence >= 0.75: logger.info(f"Using pattern-based match: {pattern_match.template_id} (confidence={pattern_match.confidence:.2f})") return pattern_match # TIER 2: Embedding-based matching (semantic similarity, handles paraphrases) embedding_matcher = get_template_embedding_matcher() embedding_match = embedding_matcher.match(question, templates, min_similarity=0.70) if embedding_match and embedding_match.confidence >= 0.70: logger.info(f"Using embedding-based match: {embedding_match.template_id} (confidence={embedding_match.confidence:.2f})") return embedding_match # TIER 3: LLM classification (fallback for complex/novel queries) try: result = self.classify( question=question, language=language ) template_id = result.template_id confidence = result.confidence # Debug logging to see what LLM returned logger.info(f"Template classifier returned: template_id='{template_id}', confidence={confidence}, reasoning='{result.reasoning[:100]}...'") logger.debug(f"Available templates: {list(templates.keys())}") # Handle numeric IDs (LLM sometimes returns "4" instead of "count_institutions_by_type_location") numeric_to_template = { "1": "list_institutions_by_type_city", "2": "list_institutions_by_type_region", "3": "list_institutions_by_type_country", "4": "count_institutions_by_type_location", "5": "count_institutions_by_type", "6": "find_institution_by_name", "7": "list_all_institutions_in_city", "8": "find_institutions_by_founding_date", "9": "find_institution_by_identifier", "10": "compare_locations", "11": "find_custodians_by_budget_threshold", } if template_id in numeric_to_template: logger.info(f"Converting numeric template_id '{template_id}' to '{numeric_to_template[template_id]}'") template_id = numeric_to_template[template_id] # Validate template exists if template_id != "none" and template_id not in templates: # Try fuzzy match on template IDs match = process.extractOne( template_id, list(templates.keys()), scorer=fuzz.ratio, score_cutoff=70 ) if match: template_id = match[0] else: template_id = "none" confidence = 0.0 if template_id == "none" or confidence < 0.6: return TemplateMatchResult( matched=False, template_id=None, confidence=confidence, reasoning=result.reasoning ) return TemplateMatchResult( matched=True, template_id=template_id, confidence=confidence, reasoning=result.reasoning ) except Exception as e: logger.warning(f"Template classification failed: {e}") return TemplateMatchResult( matched=False, reasoning=f"Classification error: {e}" ) class SlotExtractor(dspy.Module): """Extracts slot values from questions with synonym resolution.""" def __init__(self): super().__init__() self.extract = dspy.ChainOfThought(SlotExtractorSignature) self.resolver = get_synonym_resolver() self._templates: Optional[dict[str, TemplateDefinition]] = None def _get_template(self, template_id: str) -> Optional[TemplateDefinition]: """Get template definition by ID.""" if self._templates is None: classifier = TemplateClassifier() self._templates = classifier._load_templates() return self._templates.get(template_id) def forward( self, question: str, template_id: str, inherited_slots: Optional[dict[str, str]] = None ) -> dict[str, str]: """Extract slot values from question. Args: question: User's question template_id: ID of matched template inherited_slots: Slots inherited from conversation context Returns: Dictionary of slot names to resolved values """ template = self._get_template(template_id) if not template: return inherited_slots or {} # Get required slots required_slots = [ name for name, slot in template.slots.items() if slot.required ] try: result = self.extract( question=question, template_id=template_id, required_slots=", ".join(required_slots), inherited_slots=json.dumps(inherited_slots or {}) ) # Parse extracted slots try: raw_slots = json.loads(result.slots_json) except (json.JSONDecodeError, TypeError): raw_slots = {} # Merge with inherited slots (extracted takes precedence) slots = {**(inherited_slots or {}), **raw_slots} # Resolve synonyms for each slot resolved_slots = {} for name, value in slots.items(): if not value: continue slot_def = template.slots.get(name) if not slot_def: resolved_slots[name] = value continue # Resolve based on slot type if slot_def.type == SlotType.INSTITUTION_TYPE: resolved = self.resolver.resolve_institution_type(value) resolved_slots[name] = resolved or value elif slot_def.type == SlotType.SUBREGION: resolved = self.resolver.resolve_subregion(value) resolved_slots[name] = resolved or value elif slot_def.type == SlotType.COUNTRY: resolved = self.resolver.resolve_country(value) resolved_slots[name] = resolved or value elif slot_def.type == SlotType.CITY: resolved_slots[name] = self.resolver.resolve_city(value) elif slot_def.type == SlotType.BUDGET_CATEGORY: resolved = self.resolver.resolve_budget_category(value) resolved_slots[name] = resolved or value else: resolved_slots[name] = value return resolved_slots except Exception as e: logger.warning(f"Slot extraction failed: {e}") return inherited_slots or {} class TemplateInstantiator: """Renders SPARQL queries from templates using Jinja2.""" def __init__(self): self.env = Environment(loader=BaseLoader()) self._templates: Optional[dict[str, TemplateDefinition]] = None def _get_template(self, template_id: str) -> Optional[TemplateDefinition]: """Get template definition by ID.""" if self._templates is None: classifier = TemplateClassifier() self._templates = classifier._load_templates() return self._templates.get(template_id) def render( self, template_id: str, slots: dict[str, str], variant: Optional[str] = None ) -> Optional[str]: """Render SPARQL query from template and slots. Args: template_id: Template to use slots: Resolved slot values variant: Optional variant (e.g., 'region', 'country', 'isil') Returns: Rendered SPARQL query or None if rendering fails """ template_def = self._get_template(template_id) if not template_def: logger.warning(f"Template not found: {template_id}") return None # Select template variant if variant == "region" and template_def.sparql_template_region: sparql_template = template_def.sparql_template_region elif variant == "country" and template_def.sparql_template_country: sparql_template = template_def.sparql_template_country elif variant == "isil" and template_def.sparql_template_isil: sparql_template = template_def.sparql_template_isil elif variant == "ghcid" and template_def.sparql_template_ghcid: sparql_template = template_def.sparql_template_ghcid elif variant == "alt" and template_def.sparql_template_alt: sparql_template = template_def.sparql_template_alt else: sparql_template = template_def.sparql_template if not sparql_template: logger.warning(f"No SPARQL template for {template_id} variant {variant}") return None try: # Add prefixes to context context = { "prefixes": SPARQL_PREFIXES, "limit": slots.get("limit", 10), **slots } # Render template jinja_template = self.env.from_string(sparql_template) sparql = jinja_template.render(**context) # Clean up whitespace sparql = re.sub(r'\n\s*\n', '\n', sparql.strip()) return sparql except Exception as e: logger.error(f"Template rendering failed: {e}") return None # ============================================================================= # MAIN PIPELINE # ============================================================================= class TemplateSPARQLPipeline(dspy.Module): """Complete template-based SPARQL generation pipeline. Pipeline order (CRITICAL): 1. ConversationContextResolver - Expand follow-ups FIRST 2. FykeFilter - Filter irrelevant on RESOLVED question 3. TemplateClassifier - Match to template 4. SlotExtractor - Extract and resolve slots 5. TemplateInstantiator - Render SPARQL Falls back to LLM generation if no template matches. """ def __init__(self): super().__init__() self.context_resolver = ConversationContextResolver() self.fyke_filter = FykeFilter() self.template_classifier = TemplateClassifier() self.slot_extractor = SlotExtractor() self.instantiator = TemplateInstantiator() def forward( self, question: str, conversation_state: Optional[ConversationState] = None, language: str = "nl" ) -> TemplateMatchResult: """Process question through complete pipeline. Args: question: User's question (may be elliptical follow-up) conversation_state: Conversation history and state language: Language code Returns: TemplateMatchResult with SPARQL query if successful """ # Step 1: Resolve conversation context FIRST resolved = self.context_resolver.forward( question=question, conversation_state=conversation_state ) logger.info(f"Resolved question: '{question}' → '{resolved.resolved}'") # Step 2: Fyke filter on RESOLVED question fyke_result = self.fyke_filter.forward( resolved_question=resolved.resolved, conversation_topic="heritage institutions", language=language ) if not fyke_result.is_relevant: logger.info(f"Question filtered by Fyke: {fyke_result.reasoning}") return TemplateMatchResult( matched=False, reasoning=f"Out of scope: {fyke_result.reasoning}", sparql=None # Will trigger standard response ) # Step 3: Classify to template match_result = self.template_classifier.forward( question=resolved.resolved, language=language ) if not match_result.matched: logger.info(f"No template match: {match_result.reasoning}") return match_result # Falls back to LLM generation # Step 4: Extract slots template_id = match_result.template_id if template_id is None: return TemplateMatchResult( matched=False, reasoning="No template ID from classifier" ) slots = self.slot_extractor.forward( question=resolved.resolved, template_id=template_id, inherited_slots=resolved.inherited_slots ) logger.info(f"Extracted slots: {slots}") # Step 5: Render SPARQL sparql = self.instantiator.render( template_id=template_id, slots=slots ) if not sparql: logger.warning(f"Failed to render template {match_result.template_id}") return TemplateMatchResult( matched=False, template_id=match_result.template_id, reasoning="Template rendering failed" ) # Update conversation state if provided if conversation_state: conversation_state.add_turn(ConversationTurn( role="user", content=question, resolved_question=resolved.resolved, template_id=match_result.template_id, slots=slots )) return TemplateMatchResult( matched=True, template_id=match_result.template_id, confidence=match_result.confidence, slots=slots, sparql=sparql, reasoning=match_result.reasoning ) # ============================================================================= # FACTORY FUNCTION # ============================================================================= def get_template_pipeline() -> TemplateSPARQLPipeline: """Get or create template SPARQL pipeline instance.""" return TemplateSPARQLPipeline()