""" Template-Based SPARQL Query Generation System This module implements a template-based approach to SPARQL query generation, replacing error-prone LLM-generated queries with deterministic, validated templates. Architecture (CRITICAL ORDERING): ================================= 1. ConversationContextResolver (DSPy) - Resolves elliptical follow-ups FIRST "En in Enschede?" → "Welke archieven zijn er in Enschede?" 2. FykeFilter (DSPy) - Filters irrelevant questions on RESOLVED input ⚠️ MUST operate on resolved question, not raw input! 3. TemplateClassifier (DSPy) - Matches to SPARQL template 4. SlotExtractor (DSPy) - Extracts slot values with synonym resolution 5. TemplateInstantiator (Jinja2) - Renders final SPARQL query Based on: - docs/plan/prompt-query_template_mapping/ - Formica et al. (2023) - Template-based SPARQL achieves 65% precision vs 10% LLM-only - DSPy 2.6+ GEPA optimization Author: OpenCode Created: 2025-01-06 """ from __future__ import annotations import json import logging import re from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Literal, Optional import dspy from dspy import History from jinja2 import Environment, BaseLoader from pydantic import BaseModel, Field from rapidfuzz import fuzz, process logger = logging.getLogger(__name__) # ============================================================================= # CONFIGURATION # ============================================================================= def _find_data_path(filename: str) -> Path: """Find data file in multiple possible locations. Supports both local development (backend/rag/ → data/) and server deployment (e.g., /opt/glam-backend/rag/data/). 
""" # Try relative to module location (local dev: backend/rag → glam/data) module_dir = Path(__file__).parent candidates = [ module_dir.parent.parent / "data" / filename, # Local: glam/data/ module_dir / "data" / filename, # Server: rag/data/ Path("/opt/glam-backend/rag/data") / filename, # Server explicit path ] for candidate in candidates: if candidate.exists(): return candidate # Return first candidate (will report as missing in logs) return candidates[0] TEMPLATES_PATH = _find_data_path("sparql_templates.yaml") VALIDATION_RULES_PATH = _find_data_path("validation/sparql_validation_rules.json") # Standard SPARQL prefixes SPARQL_PREFIXES = """PREFIX hc: PREFIX crm: PREFIX schema: PREFIX skos: PREFIX org: PREFIX foaf: PREFIX dcterms: PREFIX xsd: PREFIX wd: """ # ============================================================================= # PYDANTIC MODELS # ============================================================================= class SlotType(str, Enum): """Types of template slots.""" INSTITUTION_TYPE = "institution_type" SUBREGION = "subregion" COUNTRY = "country" CITY = "city" INSTITUTION_NAME = "institution_name" STRING = "string" INTEGER = "integer" class SlotDefinition(BaseModel): """Definition of a template slot.""" type: SlotType required: bool = True default: Optional[str] = None examples: list[str] = Field(default_factory=list) fallback_types: list[SlotType] = Field(default_factory=list) valid_values: list[str] = Field(default_factory=list) class TemplateDefinition(BaseModel): """Definition of a SPARQL query template.""" id: str description: str intent: list[str] question_patterns: list[str] slots: dict[str, SlotDefinition] sparql_template: str sparql_template_alt: Optional[str] = None sparql_template_region: Optional[str] = None sparql_template_country: Optional[str] = None sparql_template_isil: Optional[str] = None sparql_template_ghcid: Optional[str] = None examples: list[dict[str, Any]] = Field(default_factory=list) class 
class ConversationTurn(BaseModel):
    """A single turn in conversation history.

    Besides the raw message, a turn can carry the pipeline artefacts produced
    for it (resolved question, matched template, slots, query results) so that
    later follow-up questions can be resolved against them.
    """
    # Who produced the message.
    role: Literal["user", "assistant"]
    # The message text as originally written.
    content: str
    # Self-contained form of the message; ConversationState.to_dspy_history()
    # prefers this over `content` when it is set.
    resolved_question: Optional[str] = None
    # ID of the SPARQL template matched for this turn, if any.
    template_id: Optional[str] = None
    # Slot values for this turn; ConversationState.add_turn() merges the slots
    # of user turns into the conversation-wide slot state.
    slots: dict[str, str] = Field(default_factory=dict)
    # Query result rows; ConversationContextResolver reads the "name" key of
    # the first few rows to build a summary for ordinal/pronoun resolution.
    results: list[dict[str, Any]] = Field(default_factory=list)
class SynonymResolver:
    """Resolve natural-language terms to canonical slot values.

    Mappings are loaded lazily on first use from the validation-rules JSON and
    the templates YAML, then topped up with built-in institution type
    synonyms. Each ``resolve_*`` lookup tries, in order: an exact
    case-insensitive match, a value that is already in canonical form, and
    finally a fuzzy match against the known synonyms.
    """

    # Single-letter institution type codes that are accepted verbatim.
    _TYPE_CODES = frozenset("MLAGORCUBESFIXPHDNT")

    # Built-in synonyms; only applied to keys the data files do not define.
    _BUILTIN_TYPE_SYNONYMS = {
        "museum": "M", "musea": "M", "museums": "M",
        "bibliotheek": "L", "bibliotheken": "L", "library": "L", "libraries": "L",
        "archief": "A", "archieven": "A", "archive": "A", "archives": "A",
        "galerie": "G", "galerij": "G", "galerijen": "G", "gallery": "G", "galleries": "G",
    }

    # Common spelling variants of Dutch city names and their canonical form.
    _CITY_CORRECTIONS = {
        "den haag": "Den Haag",
        "the hague": "Den Haag",
        "'s-gravenhage": "Den Haag",
        "s-gravenhage": "Den Haag",
        "'s-hertogenbosch": "'s-Hertogenbosch",
        "den bosch": "'s-Hertogenbosch",
    }

    def __init__(self):
        self._institution_types: dict[str, str] = {}
        self._subregions: dict[str, str] = {}
        self._countries: dict[str, str] = {}
        self._cities: set[str] = set()
        self._loaded = False

    def load(self) -> None:
        """Load synonym mappings from validation rules and templates.

        Loading is best-effort: a missing or malformed file is logged and
        skipped, and the resolver is marked as loaded either way so the files
        are read at most once per instance.
        """
        if self._loaded:
            return

        # Load from validation rules
        if VALIDATION_RULES_PATH.exists():
            try:
                # Explicit UTF-8: the mappings contain non-ASCII place names
                # and must not depend on the platform's default encoding.
                with open(VALIDATION_RULES_PATH, encoding="utf-8") as f:
                    rules = json.load(f)

                for section, target in (
                    ("institution_type_mappings", self._institution_types),
                    ("subregion_mappings", self._subregions),
                    ("country_mappings", self._countries),
                ):
                    for k, v in (rules.get(section) or {}).items():
                        target[k.lower()] = v
            except Exception as e:
                logger.warning(f"Failed to load validation rules: {e}")

        # Load additional synonyms from templates YAML
        if TEMPLATES_PATH.exists():
            try:
                import yaml
                with open(TEMPLATES_PATH, encoding="utf-8") as f:
                    templates = yaml.safe_load(f) or {}

                slot_types = templates.get("_slot_types") or {}
                for slot_name, target in (
                    ("institution_type", self._institution_types),
                    ("subregion", self._subregions),
                    ("country", self._countries),
                ):
                    # `or {}` keeps one empty/null YAML node from aborting the
                    # remaining sections.
                    synonyms = (slot_types.get(slot_name) or {}).get("synonyms") or {}
                    for k, v in synonyms.items():
                        # YAML keys use underscores where questions use spaces.
                        target[k.lower().replace("_", " ")] = v
            except Exception as e:
                logger.warning(f"Failed to load template synonyms: {e}")

        # Add common Dutch institution type synonyms (file data wins)
        for k, v in self._BUILTIN_TYPE_SYNONYMS.items():
            self._institution_types.setdefault(k, v)

        self._loaded = True
        logger.info(f"Loaded {len(self._institution_types)} institution types, "
                    f"{len(self._subregions)} subregions, {len(self._countries)} countries")

    def resolve_institution_type(self, term: str) -> Optional[str]:
        """Resolve institution type term to single-letter code.

        Returns:
            The single-letter code, or None when the term cannot be resolved.
        """
        self.load()
        term_clean = term.strip()
        term_lower = term_clean.lower()

        # Direct match
        if term_lower in self._institution_types:
            return self._institution_types[term_lower]

        # Already a valid code. Must be exactly one letter: a plain substring
        # test against the code string would also accept "", "la" or "fix".
        code = term_clean.upper()
        if len(code) == 1 and code in self._TYPE_CODES:
            return code

        # Fuzzy match
        if term_lower and self._institution_types:
            match = process.extractOne(
                term_lower,
                list(self._institution_types.keys()),
                scorer=fuzz.ratio,
                score_cutoff=80
            )
            if match:
                return self._institution_types[match[0]]

        return None

    def resolve_subregion(self, term: str) -> Optional[str]:
        """Resolve subregion term to ISO 3166-2 code.

        Returns:
            The subregion code (e.g. "NL-NH"), or None when unresolved.
        """
        self.load()
        term_clean = term.strip()
        term_lower = term_clean.lower()

        # Direct match
        if term_lower in self._subregions:
            return self._subregions[term_lower]

        # Already a valid code (e.g., NL-NH)
        code = term_clean.upper()
        if re.fullmatch(r'[A-Z]{2}-[A-Z]{2,3}', code):
            return code

        # Fuzzy match
        if term_lower and self._subregions:
            match = process.extractOne(
                term_lower,
                list(self._subregions.keys()),
                scorer=fuzz.ratio,
                score_cutoff=75
            )
            if match:
                return self._subregions[match[0]]

        return None

    def resolve_country(self, term: str) -> Optional[str]:
        """Resolve country term to Wikidata Q-number.

        Returns:
            The Q-number (e.g. "Q55"), or None when unresolved.
        """
        self.load()
        term_clean = term.strip()
        term_lower = term_clean.lower()

        # Direct match
        if term_lower in self._countries:
            return self._countries[term_lower]

        # Already a Q-number (accepted case-insensitively, returned upper-case)
        q_number = term_clean.upper()
        if re.fullmatch(r'Q\d+', q_number):
            return q_number

        # Fuzzy match
        if term_lower and self._countries:
            match = process.extractOne(
                term_lower,
                list(self._countries.keys()),
                scorer=fuzz.ratio,
                score_cutoff=80
            )
            if match:
                return self._countries[match[0]]

        return None

    def resolve_city(self, term: str) -> str:
        """Normalize city name (title case, common corrections).

        Surrounding whitespace is removed before normalizing, so the result
        never carries the padding of the input.
        """
        term_clean = term.strip()
        term_lower = term_clean.lower()

        if term_lower in self._CITY_CORRECTIONS:
            return self._CITY_CORRECTIONS[term_lower]

        # Dutch "'s-" prefix stays lower-case; only the name after it is
        # title-cased (e.g. "'S-GRAVENZANDE" -> "'s-Gravenzande").
        if term_lower.startswith("'s-"):
            return "'s-" + term_clean[3:].title()

        return term_clean.title()
CRITICAL: This module runs FIRST, before any filtering or classification. It expands short follow-up questions into complete, self-contained questions. Examples of resolution: - Previous: "Welke archieven zijn er in Den Haag?" Current: "En in Enschede?" Resolved: "Welke archieven zijn er in Enschede?" - Previous: "Welke musea zijn er in Amsterdam?" Current: "Hoeveel?" Resolved: "Hoeveel musea zijn er in Amsterdam?" - Previous: Listed 5 museums Current: "Vertel me meer over de eerste" Resolved: "Vertel me meer over [first museum name]" The resolved question must be a complete, standalone question that would make sense without any conversation history. """ question: str = dspy.InputField( desc="Current user question (may be elliptical follow-up)" ) history: History = dspy.InputField( desc="Previous conversation turns", default=History(messages=[]) ) previous_slots: str = dspy.InputField( desc="JSON string of slot values from previous query (e.g., {\"institution_type\": \"A\", \"city\": \"Den Haag\"})", default="{}" ) previous_results_summary: str = dspy.InputField( desc="Brief summary of previous query results for ordinal/pronoun resolution", default="" ) resolved_question: str = dspy.OutputField( desc="Fully expanded, self-contained question. If not a follow-up, return original question unchanged." ) is_follow_up: bool = dspy.OutputField( desc="True if this was an elliptical follow-up that needed resolution" ) follow_up_type: str = dspy.OutputField( desc="Type of follow-up: 'location_swap', 'type_swap', 'count_from_list', 'details_request', 'ordinal_reference', 'pronoun_reference', or 'none'" ) inherited_slots_json: str = dspy.OutputField( desc="JSON string of slots inherited from previous query (e.g., {\"institution_type\": \"A\"})" ) confidence: float = dspy.OutputField( desc="Confidence in resolution (0.0-1.0)" ) class FykeFilterSignature(dspy.Signature): """Determine if a RESOLVED question is relevant to heritage institutions. 
# NOTE: In DSPy a Signature's docstring is the task instruction sent to the
# language model and each field's `desc` is part of the prompt, so every edit
# to the text below changes model behaviour, not just documentation.
#
# The template IDs listed in the docstring are in the same order as the
# "1".."10" fallback table in TemplateClassifier.forward(), which maps a bare
# number returned by the model back to the corresponding ID. Keep both in sync.
class TemplateClassifierSignature(dspy.Signature):
    """Classify a heritage question to match it with a SPARQL template.

    Given a resolved question about heritage institutions, determine which
    SPARQL query template best matches the user's intent.

    Available template IDs (return the EXACT ID string, not the number):
    - list_institutions_by_type_city: List institutions of type X in city Y
    - list_institutions_by_type_region: List institutions of type X in region Y (province/state)
    - list_institutions_by_type_country: List institutions of type X in country Y
    - count_institutions_by_type_location: Count institutions of type X in location Y
    - count_institutions_by_type: Count all institutions grouped by type
    - find_institution_by_name: Find specific institution by name
    - list_all_institutions_in_city: List all institutions in city Y
    - find_institutions_by_founding_date: Find oldest/newest institutions
    - find_institution_by_identifier: Find by ISIL/GHCID
    - compare_locations: Compare institutions between locations
    - none: No template matches (fall back to LLM generation)

    CRITICAL DISAMBIGUATION - Province vs City:
    Some Dutch locations are BOTH a province AND a city with the same name.
    When the location name alone is used (without "stad" or "de stad"),
    DEFAULT TO PROVINCE (use list_institutions_by_type_region):
    - Groningen → province (NL-GR), NOT the city
    - Utrecht → province (NL-UT), NOT the city
    - Limburg → province (NL-LI), NOT the city
    - Friesland/Frisia → province (NL-FR)
    - Drenthe → province (NL-DR)
    - Gelderland → province (NL-GE)
    - Overijssel → province (NL-OV)
    - Flevoland → province (NL-FL)
    - Zeeland → province (NL-ZE)
    - Noord-Holland → province (NL-NH)
    - Zuid-Holland → province (NL-ZH)
    - Noord-Brabant → province (NL-NB)

    Use list_institutions_by_type_city ONLY when:
    - The question explicitly says "de stad" or "in de stad" (the city)
    - The location is clearly just a city (e.g., Amsterdam, Rotterdam, Den Haag)

    IMPORTANT: Return the template ID string exactly as shown (e.g. "count_institutions_by_type_location"),
    NOT a number. Return "none" if no template matches well.
    """

    # --- Inputs ---
    # The question after conversation-context resolution, not the raw input.
    question: str = dspy.InputField(
        desc="Resolved natural language question about heritage institutions"
    )
    language: str = dspy.InputField(
        desc="Language code: nl, en, de, fr",
        default="nl"
    )

    # --- Outputs ---
    template_id: str = dspy.OutputField(
        desc="EXACT template ID string from the list above (e.g. 'count_institutions_by_type_location'), or 'none'"
    )
    # The 0.6 threshold named here is also enforced in code by
    # TemplateClassifier.forward(), which treats lower confidences as no match.
    confidence: float = dspy.OutputField(
        desc="Confidence in template match (0.0-1.0). Return 'none' as template_id if below 0.6"
    )
    reasoning: str = dspy.OutputField(
        desc="Brief explanation of why this template matches"
    )
""" question: str = dspy.InputField( desc="The user's question" ) template_id: str = dspy.InputField( desc="ID of the matched template" ) required_slots: str = dspy.InputField( desc="Comma-separated list of required slot names for this template" ) inherited_slots: str = dspy.InputField( desc="JSON string of slots inherited from conversation context", default="{}" ) slots_json: str = dspy.OutputField( desc="JSON object with extracted slot values, e.g., {\"institution_type\": \"M\", \"city\": \"Amsterdam\"}" ) extraction_notes: str = dspy.OutputField( desc="Notes about any slots that couldn't be extracted or needed inference" ) # ============================================================================= # DSPy MODULES # ============================================================================= class ConversationContextResolver(dspy.Module): """Resolves elliptical follow-up questions using conversation history. CRITICAL: This module MUST run FIRST in the pipeline, before FykeFilter. It expands short follow-ups like "En in Enschede?" into complete questions like "Welke archieven zijn er in Enschede?" so that subsequent modules can properly understand the user's intent. """ def __init__(self): super().__init__() self.resolve = dspy.ChainOfThought(ConversationContextSignature) def forward( self, question: str, conversation_state: Optional[ConversationState] = None, ) -> ResolvedQuestion: """Resolve a potentially elliptical question. 
Args: question: Current user question (may be elliptical) conversation_state: Full conversation state with history Returns: ResolvedQuestion with expanded question and metadata """ # If no conversation history, return as-is if conversation_state is None or not conversation_state.turns: return ResolvedQuestion( original=question, resolved=question, is_follow_up=False, confidence=1.0 ) # Prepare inputs for DSPy history = conversation_state.to_dspy_history() previous_slots = json.dumps(conversation_state.current_slots) # Get previous results summary if available prev_turn = conversation_state.get_previous_user_turn() results_summary = "" if prev_turn and prev_turn.results: results_summary = ", ".join( r.get("name", str(r))[:50] for r in prev_turn.results[:5] ) try: result = self.resolve( question=question, history=history, previous_slots=previous_slots, previous_results_summary=results_summary ) # Parse inherited slots try: inherited = json.loads(result.inherited_slots_json) except (json.JSONDecodeError, TypeError): inherited = {} return ResolvedQuestion( original=question, resolved=result.resolved_question, is_follow_up=result.is_follow_up, follow_up_type=result.follow_up_type if result.follow_up_type != "none" else None, inherited_slots=inherited, confidence=result.confidence ) except Exception as e: logger.warning(f"Context resolution failed: {e}, returning original") return ResolvedQuestion( original=question, resolved=question, is_follow_up=False, confidence=0.5 ) class FykeFilter(dspy.Module): """Filters out irrelevant questions with standard response. CRITICAL: Must operate on RESOLVED question from ConversationContextResolver, not the raw user input. This prevents false positives on follow-ups. Named after Dutch "fuik" (fish trap) - catches irrelevant questions. 
""" def __init__(self, config: Optional[FykeFilterConfig] = None): super().__init__() self.classify = dspy.ChainOfThought(FykeFilterSignature) self.config = config or self._load_config() def _load_config(self) -> FykeFilterConfig: """Load Fyke configuration from templates YAML.""" default_config = FykeFilterConfig( out_of_scope_keywords=[ "tandpasta", "toothpaste", "supermarkt", "restaurant", "hotel", "weer", "weather", "voetbal", "soccer" ], out_of_scope_categories=["shopping", "sports", "cooking"], heritage_keywords=[ "museum", "musea", "archief", "archieven", "bibliotheek", "galerie", "erfgoed", "heritage", "collectie", "collection" ], standard_response={ "nl": "Ik kan je helpen met vragen over erfgoedinstellingen zoals musea, archieven, bibliotheken en galerijen.", "en": "I can help you with questions about heritage institutions such as museums, archives, libraries and galleries.", "de": "Ich kann Ihnen bei Fragen zu Kulturerbeinstitutionen wie Museen, Archiven und Bibliotheken helfen.", "fr": "Je peux vous aider avec des questions sur les institutions patrimoniales." } ) if TEMPLATES_PATH.exists(): try: import yaml with open(TEMPLATES_PATH) as f: templates = yaml.safe_load(f) fyke_config = templates.get("fyke_filter", {}) return FykeFilterConfig( out_of_scope_keywords=fyke_config.get("out_of_scope_keywords", default_config.out_of_scope_keywords), out_of_scope_categories=fyke_config.get("out_of_scope_categories", default_config.out_of_scope_categories), heritage_keywords=fyke_config.get("heritage_keywords", default_config.heritage_keywords), standard_response=fyke_config.get("standard_response", default_config.standard_response) ) except Exception as e: logger.warning(f"Failed to load Fyke config: {e}") return default_config def forward( self, resolved_question: str, conversation_topic: str = "heritage institutions", language: str = "nl" ) -> FykeResult: """Check if resolved question is relevant to heritage. 
class TemplateClassifier(dspy.Module):
    """Classifies questions to match SPARQL templates.

    Template definitions are read once from the templates YAML and cached on
    the instance. Classification itself is delegated to a DSPy
    chain-of-thought predictor; its answer is normalized and validated
    against the loaded template IDs before being accepted.
    """

    # Fallback for models that answer with the list position instead of the ID
    # (e.g. "4" instead of "count_institutions_by_type_location").
    _NUMERIC_TEMPLATE_IDS = {
        "1": "list_institutions_by_type_city",
        "2": "list_institutions_by_type_region",
        "3": "list_institutions_by_type_country",
        "4": "count_institutions_by_type_location",
        "5": "count_institutions_by_type",
        "6": "find_institution_by_name",
        "7": "list_all_institutions_in_city",
        "8": "find_institutions_by_founding_date",
        "9": "find_institution_by_identifier",
        "10": "compare_locations",
    }

    # Matches below this confidence are reported as unmatched.
    _MIN_CONFIDENCE = 0.6

    def __init__(self):
        super().__init__()
        self.classify = dspy.ChainOfThought(TemplateClassifierSignature)
        self._templates: Optional[dict[str, TemplateDefinition]] = None

    def _load_templates(self) -> dict[str, TemplateDefinition]:
        """Load template definitions from YAML.

        Returns:
            Mapping of template ID to definition. Templates that fail to parse
            are logged and skipped; a missing or unreadable file yields an
            empty mapping. The result is cached, so the file is read at most
            once per instance.
        """
        if self._templates is not None:
            return self._templates

        self._templates = {}
        if not TEMPLATES_PATH.exists():
            return self._templates

        # Computed once instead of once per slot.
        valid_slot_types = tuple(member.value for member in SlotType)

        try:
            import yaml
            with open(TEMPLATES_PATH, encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}

            # `or {}` / `or []` below tolerate keys that are present but null.
            for template_id, template_data in (data.get("templates") or {}).items():
                try:
                    # Convert slots
                    slots = {}
                    for slot_name, slot_data in (template_data.get("slots") or {}).items():
                        if not isinstance(slot_data, dict):
                            continue
                        slot_type = slot_data.get("type", "string")
                        slots[slot_name] = SlotDefinition(
                            # Unknown slot types degrade to plain strings.
                            type=SlotType(slot_type) if slot_type in valid_slot_types else SlotType.STRING,
                            required=slot_data.get("required", True),
                            default=slot_data.get("default"),
                            examples=slot_data.get("examples") or [],
                            fallback_types=[
                                SlotType(t)
                                for t in (slot_data.get("fallback_types") or [])
                                if t in valid_slot_types
                            ],
                            valid_values=slot_data.get("valid_values") or []
                        )

                    self._templates[template_id] = TemplateDefinition(
                        id=template_id,
                        description=template_data.get("description", ""),
                        intent=template_data.get("intent") or [],
                        question_patterns=template_data.get("question_patterns") or [],
                        slots=slots,
                        sparql_template=template_data.get("sparql_template", ""),
                        sparql_template_alt=template_data.get("sparql_template_alt"),
                        sparql_template_region=template_data.get("sparql_template_region"),
                        sparql_template_country=template_data.get("sparql_template_country"),
                        sparql_template_isil=template_data.get("sparql_template_isil"),
                        sparql_template_ghcid=template_data.get("sparql_template_ghcid"),
                        examples=template_data.get("examples") or []
                    )
                except Exception as e:
                    logger.warning(f"Failed to parse template {template_id}: {e}")

        except Exception as e:
            logger.error(f"Failed to load templates: {e}")

        return self._templates

    def forward(self, question: str, language: str = "nl") -> TemplateMatchResult:
        """Classify question to find matching template.

        Args:
            question: Resolved natural language question
            language: Language code

        Returns:
            TemplateMatchResult with template ID and confidence. The result is
            unmatched when no templates are loaded, the model answers "none",
            the ID cannot be mapped to a loaded template, the confidence is
            below the threshold, or classification raises.
        """
        templates = self._load_templates()

        if not templates:
            return TemplateMatchResult(
                matched=False,
                reasoning="No templates loaded"
            )

        try:
            result = self.classify(
                question=question,
                language=language
            )

            # Normalize the raw model output: IDs sometimes arrive padded or
            # quoted, confidence as a string, reasoning as None.
            template_id = str(result.template_id or "").strip().strip("'\"`").strip()
            try:
                confidence = float(result.confidence)
            except (TypeError, ValueError):
                confidence = 0.0
            reasoning = str(result.reasoning or "")

            # Debug logging to see what LLM returned
            logger.info(f"Template classifier returned: template_id='{template_id}', confidence={confidence}, reasoning='{reasoning[:100]}...'")
            logger.debug(f"Available templates: {list(templates.keys())}")

            # Handle numeric IDs (LLM sometimes returns "4" instead of "count_institutions_by_type_location")
            if template_id in self._NUMERIC_TEMPLATE_IDS:
                logger.info(f"Converting numeric template_id '{template_id}' to '{self._NUMERIC_TEMPLATE_IDS[template_id]}'")
                template_id = self._NUMERIC_TEMPLATE_IDS[template_id]

            if template_id.lower() in ("", "none"):
                template_id = "none"

            # Validate template exists
            if template_id != "none" and template_id not in templates:
                # Try fuzzy match on template IDs
                match = process.extractOne(
                    template_id,
                    list(templates.keys()),
                    scorer=fuzz.ratio,
                    score_cutoff=70
                )
                if match:
                    template_id = match[0]
                else:
                    template_id = "none"
                    confidence = 0.0

            if template_id == "none" or confidence < self._MIN_CONFIDENCE:
                return TemplateMatchResult(
                    matched=False,
                    template_id=None,
                    confidence=confidence,
                    reasoning=reasoning
                )

            return TemplateMatchResult(
                matched=True,
                template_id=template_id,
                confidence=confidence,
                reasoning=reasoning
            )

        except Exception as e:
            logger.warning(f"Template classification failed: {e}")
            return TemplateMatchResult(
                matched=False,
                reasoning=f"Classification error: {e}"
            )
class TemplateInstantiator:
    """Turns a template ID plus slot values into a SPARQL query via Jinja2."""

    def __init__(self):
        self.env = Environment(loader=BaseLoader())
        self._templates: Optional[dict[str, TemplateDefinition]] = None

    def _get_template(self, template_id: str) -> Optional[TemplateDefinition]:
        """Look up a template definition, loading all definitions on first use."""
        if self._templates is None:
            # Definitions are loaded through TemplateClassifier and then cached.
            self._templates = TemplateClassifier()._load_templates()
        return self._templates.get(template_id)

    def render(
        self,
        template_id: str,
        slots: dict[str, str],
        variant: Optional[str] = None
    ) -> Optional[str]:
        """Render the SPARQL query for a template.

        Args:
            template_id: Template to use
            slots: Resolved slot values, exposed to the template by name
            variant: Optional variant ('region', 'country', 'isil', 'ghcid'
                or 'alt'); an unknown or undefined variant uses the default
                template text

        Returns:
            The rendered query with blank lines collapsed, or None when the
            template is unknown, has no template text, or rendering raises.
        """
        definition = self._get_template(template_id)
        if not definition:
            logger.warning(f"Template not found: {template_id}")
            return None

        # Map each variant name to the attribute holding its template text.
        variant_attrs = {
            "region": "sparql_template_region",
            "country": "sparql_template_country",
            "isil": "sparql_template_isil",
            "ghcid": "sparql_template_ghcid",
            "alt": "sparql_template_alt",
        }
        attr_name = variant_attrs.get(variant)
        variant_text = getattr(definition, attr_name) if attr_name else None
        sparql_template = variant_text or definition.sparql_template

        if not sparql_template:
            logger.warning(f"No SPARQL template for {template_id} variant {variant}")
            return None

        try:
            # Defaults first, so slot values of the same name override them.
            # NOTE(review): slot values are interpolated as-is; presumably the
            # templates or an upstream step escape them — confirm.
            context = {"prefixes": SPARQL_PREFIXES, "limit": 10}
            context.update(slots)

            rendered = self.env.from_string(sparql_template).render(**context)

            # Collapse runs of blank lines left behind by template blocks.
            return re.sub(r'\n\s*\n', '\n', rendered.strip())
        except Exception as e:
            logger.error(f"Template rendering failed: {e}")
            return None
TemplateInstantiator - Render SPARQL Falls back to LLM generation if no template matches. """ def __init__(self): super().__init__() self.context_resolver = ConversationContextResolver() self.fyke_filter = FykeFilter() self.template_classifier = TemplateClassifier() self.slot_extractor = SlotExtractor() self.instantiator = TemplateInstantiator() def forward( self, question: str, conversation_state: Optional[ConversationState] = None, language: str = "nl" ) -> TemplateMatchResult: """Process question through complete pipeline. Args: question: User's question (may be elliptical follow-up) conversation_state: Conversation history and state language: Language code Returns: TemplateMatchResult with SPARQL query if successful """ # Step 1: Resolve conversation context FIRST resolved = self.context_resolver( question=question, conversation_state=conversation_state ) logger.info(f"Resolved question: '{question}' → '{resolved.resolved}'") # Step 2: Fyke filter on RESOLVED question fyke_result = self.fyke_filter( resolved_question=resolved.resolved, conversation_topic="heritage institutions", language=language ) if not fyke_result.is_relevant: logger.info(f"Question filtered by Fyke: {fyke_result.reasoning}") return TemplateMatchResult( matched=False, reasoning=f"Out of scope: {fyke_result.reasoning}", sparql=None # Will trigger standard response ) # Step 3: Classify to template match_result = self.template_classifier( question=resolved.resolved, language=language ) if not match_result.matched: logger.info(f"No template match: {match_result.reasoning}") return match_result # Falls back to LLM generation # Step 4: Extract slots template_id = match_result.template_id if template_id is None: return TemplateMatchResult( matched=False, reasoning="No template ID from classifier" ) slots = self.slot_extractor( question=resolved.resolved, template_id=template_id, inherited_slots=resolved.inherited_slots ) logger.info(f"Extracted slots: {slots}") # Step 5: Render SPARQL sparql = 
self.instantiator.render( template_id=template_id, slots=slots ) if not sparql: logger.warning(f"Failed to render template {match_result.template_id}") return TemplateMatchResult( matched=False, template_id=match_result.template_id, reasoning="Template rendering failed" ) # Update conversation state if provided if conversation_state: conversation_state.add_turn(ConversationTurn( role="user", content=question, resolved_question=resolved.resolved, template_id=match_result.template_id, slots=slots )) return TemplateMatchResult( matched=True, template_id=match_result.template_id, confidence=match_result.confidence, slots=slots, sparql=sparql, reasoning=match_result.reasoning ) # ============================================================================= # FACTORY FUNCTION # ============================================================================= def get_template_pipeline() -> TemplateSPARQLPipeline: """Get or create template SPARQL pipeline instance.""" return TemplateSPARQLPipeline()