# glam/backend/rag/template_sparql.py
# Snapshot: 2025-12-30 23:53:16 +01:00 (1706 lines, 67 KiB, Python)
"""
Template-Based SPARQL Query Generation System
This module implements a template-based approach to SPARQL query generation,
replacing error-prone LLM-generated queries with deterministic, validated templates.
Architecture (CRITICAL ORDERING):
=================================
1. ConversationContextResolver (DSPy) - Resolves elliptical follow-ups FIRST
"En in Enschede?" → "Welke archieven zijn er in Enschede?"
2. FykeFilter (DSPy) - Filters irrelevant questions on RESOLVED input
⚠️ MUST operate on resolved question, not raw input!
3. TemplateClassifier (DSPy) - Matches to SPARQL template
4. SlotExtractor (DSPy) - Extracts slot values with synonym resolution
5. TemplateInstantiator (Jinja2) - Renders final SPARQL query
Based on:
- docs/plan/prompt-query_template_mapping/
- Formica et al. (2023) - Template-based SPARQL achieves 65% precision vs 10% LLM-only
- DSPy 2.6+ GEPA optimization
Author: OpenCode
Created: 2025-01-06
"""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Literal, Optional
import dspy
import numpy as np
from dspy import History
from jinja2 import Environment, BaseLoader
from pydantic import BaseModel, Field
from rapidfuzz import fuzz, process
logger = logging.getLogger(__name__)
# =============================================================================
# CONFIGURATION
# =============================================================================
# Lazy-loaded sentence transformer model
# _embedding_model is a process-wide cache populated by _get_embedding_model();
# it stays None until the first successful load (or indefinitely if loading fails).
_embedding_model = None
_embedding_model_name = "paraphrase-multilingual-MiniLM-L12-v2" # Multilingual, 384-dim
def _get_embedding_model():
    """Lazy-load the sentence transformer model (process-wide singleton).

    Returns the cached SentenceTransformer on subsequent calls. Returns
    None when sentence-transformers is not installed or the model fails
    to load; failures are logged as warnings, never raised.
    """
    global _embedding_model
    if _embedding_model is not None:
        return _embedding_model
    try:
        from sentence_transformers import SentenceTransformer
        logger.info(f"Loading embedding model: {_embedding_model_name}")
        _embedding_model = SentenceTransformer(_embedding_model_name)
        logger.info("Embedding model loaded successfully")
        return _embedding_model
    except ImportError:
        logger.warning("sentence-transformers not installed, embedding matching disabled")
    except Exception as e:
        logger.warning(f"Failed to load embedding model: {e}")
    return None
def _find_data_path(filename: str) -> Path:
"""Find data file in multiple possible locations.
Supports both local development (backend/rag/ → data/) and
server deployment (e.g., /opt/glam-backend/rag/data/).
"""
# Try relative to module location (local dev: backend/rag → glam/data)
module_dir = Path(__file__).parent
candidates = [
module_dir.parent.parent / "data" / filename, # Local: glam/data/
module_dir / "data" / filename, # Server: rag/data/
Path("/opt/glam-backend/rag/data") / filename, # Server explicit path
]
for candidate in candidates:
if candidate.exists():
return candidate
# Return first candidate (will report as missing in logs)
return candidates[0]
# Resolved at import time; the files may be absent (loaders check .exists()
# and fall back to defaults with a logged warning).
TEMPLATES_PATH = _find_data_path("sparql_templates.yaml")
VALIDATION_RULES_PATH = _find_data_path("validation/sparql_validation_rules.json")
# Standard SPARQL prefixes
# Presumably prepended to every rendered query (NDE hc:, CIDOC-CRM, schema.org,
# SKOS, ORG, FOAF, DC Terms, XSD, Wikidata) — confirm against the instantiator.
SPARQL_PREFIXES = """PREFIX hc: <https://nde.nl/ontology/hc/>
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
PREFIX schema: <http://schema.org/>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX org: <http://www.w3.org/ns/org#>
PREFIX foaf: <http://xmlns.com/foaf/0.1/>
PREFIX dcterms: <http://purl.org/dc/terms/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX wd: <http://www.wikidata.org/entity/>"""
# =============================================================================
# PYDANTIC MODELS
# =============================================================================
class SlotType(str, Enum):
    """Types of template slots."""

    # Domain-specific slots resolved via SynonymResolver.
    INSTITUTION_TYPE = "institution_type"
    SUBREGION = "subregion"
    COUNTRY = "country"
    CITY = "city"
    INSTITUTION_NAME = "institution_name"
    BUDGET_CATEGORY = "budget_category"
    # Generic scalar slots.
    STRING = "string"
    INTEGER = "integer"
    DECIMAL = "decimal"
class SlotDefinition(BaseModel):
    """Definition of a template slot."""

    type: SlotType
    required: bool = True
    default: Optional[str] = None
    # Example surface forms shown to the extractor.
    examples: list[str] = Field(default_factory=list)
    # Alternate slot types to try when primary resolution fails.
    fallback_types: list[SlotType] = Field(default_factory=list)
    # Closed set of accepted values, when the slot is enumerated.
    valid_values: list[str] = Field(default_factory=list)
class TemplateDefinition(BaseModel):
    """Definition of a SPARQL query template."""

    id: str
    description: str
    intent: list[str]
    # Natural-language patterns containing {slot} markers.
    question_patterns: list[str]
    slots: dict[str, SlotDefinition]
    sparql_template: str
    # Optional specialized variants of the base query.
    sparql_template_alt: Optional[str] = None
    sparql_template_region: Optional[str] = None
    sparql_template_country: Optional[str] = None
    sparql_template_isil: Optional[str] = None
    sparql_template_ghcid: Optional[str] = None
    examples: list[dict[str, Any]] = Field(default_factory=list)
class FollowUpPattern(BaseModel):
    """Definition of a follow-up question pattern."""

    description: str
    patterns: list[str]
    # Names of slots carried over from the previous turn.
    slot_inheritance: list[str] = Field(default_factory=list)
    # Template ID the follow-up rewrites to, if any.
    transforms_to: Optional[str] = None
    resolution_strategy: str
    requires_previous_results: bool = False
class FykeFilterConfig(BaseModel):
    """Configuration for the Fyke filter."""

    # Keyword/category lists for the fast pre-LLM relevance checks.
    out_of_scope_keywords: list[str]
    out_of_scope_categories: list[str]
    heritage_keywords: list[str]
    # Canned rejection reply, keyed by language code.
    standard_response: dict[str, str]
class ConversationTurn(BaseModel):
    """A single turn in conversation history."""

    role: Literal["user", "assistant"]
    content: str
    # Populated on user turns once the pipeline has processed them.
    resolved_question: Optional[str] = None
    template_id: Optional[str] = None
    slots: dict[str, str] = Field(default_factory=dict)
    results: list[dict[str, Any]] = Field(default_factory=list)
class ConversationState(BaseModel):
    """State tracking across conversation turns."""

    turns: list[ConversationTurn] = Field(default_factory=list)
    current_slots: dict[str, str] = Field(default_factory=dict)
    current_template_id: Optional[str] = None
    language: str = "nl"

    def add_turn(self, turn: ConversationTurn) -> None:
        """Add a turn and update current state."""
        self.turns.append(turn)
        # Only user turns contribute context worth inheriting.
        if turn.role == "user" and turn.slots:
            self.current_slots.update(turn.slots)
            # NOTE(review): template_id update assumed to apply only to
            # slot-bearing user turns — confirm against original nesting.
            if turn.template_id:
                self.current_template_id = turn.template_id

    def get_previous_user_turn(self) -> Optional[ConversationTurn]:
        """Get the most recent user turn."""
        return next((t for t in reversed(self.turns) if t.role == "user"), None)

    def to_dspy_history(self) -> History:
        """Convert to DSPy History object."""
        # Cap context at the six most recent turns; prefer the resolved
        # form of each question when available.
        recent = self.turns[-6:]
        return History(messages=[
            {"role": t.role, "content": t.resolved_question or t.content}
            for t in recent
        ])
class TemplateMatchResult(BaseModel):
    """Result of template matching."""

    matched: bool
    template_id: Optional[str] = None
    confidence: float = 0.0
    slots: dict[str, str] = Field(default_factory=dict)
    # Rendered SPARQL, once the template has been instantiated.
    sparql: Optional[str] = None
    reasoning: str = ""
class ResolvedQuestion(BaseModel):
    """Result of conversation context resolution."""

    original: str
    resolved: str
    is_follow_up: bool = False
    follow_up_type: Optional[str] = None
    # Slots carried over from the previous turn during resolution.
    inherited_slots: dict[str, str] = Field(default_factory=dict)
    confidence: float = 1.0
class FykeResult(BaseModel):
    """Result of Fyke filter."""

    is_relevant: bool
    confidence: float
    reasoning: str
    # Localized canned reply, set only when the question is irrelevant.
    standard_response: Optional[str] = None
# =============================================================================
# SYNONYM MAPPINGS (loaded from validation rules)
# =============================================================================
class SynonymResolver:
    """Resolves natural language terms to canonical slot values.

    Mappings are loaded lazily from three layered sources:
      1. the validation rules JSON (institution types, subregions, countries)
      2. the templates YAML ``_slot_types`` synonym tables
      3. built-in Dutch/English institution-type defaults (lowest priority)

    Every resolver falls back to rapidfuzz fuzzy matching against the
    loaded synonym keys, with a per-slot score cutoff.
    """

    # Single-letter institution-type codes accepted verbatim.
    # BUGFIX: the previous check used substring membership
    # (`term.upper() in "MLAGORCUBESFIXPHDNT"`), which accepted the empty
    # string and multi-letter fragments such as "AG".
    _VALID_TYPE_CODES = frozenset("MLAGORCUBESFIXPHDNT")
    # ISO 3166-2 subdivision codes, e.g. NL-NH.
    _ISO_SUBREGION_RE = re.compile(r'^[A-Z]{2}-[A-Z]{2,3}$')
    # Common Dutch city name corrections (lowercased input -> canonical).
    _CITY_CORRECTIONS = {
        "den haag": "Den Haag",
        "the hague": "Den Haag",
        "'s-gravenhage": "Den Haag",
        "s-gravenhage": "Den Haag",
        "'s-hertogenbosch": "'s-Hertogenbosch",
        "den bosch": "'s-Hertogenbosch",
    }
    # Canonical budget category slot names.
    _VALID_BUDGET_CATEGORIES = frozenset({
        "innovation", "digitization", "preservation", "personnel",
        "acquisition", "operating", "capital", "external_funding",
        "internal_funding", "endowment_draw",
    })

    def __init__(self):
        # All maps key lowercased terms to canonical values.
        self._institution_types: dict[str, str] = {}
        self._subregions: dict[str, str] = {}
        self._countries: dict[str, str] = {}
        self._cities: set[str] = set()  # reserved; not populated here
        self._budget_categories: dict[str, str] = {}
        self._loaded = False

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load(self) -> None:
        """Load synonym mappings from validation rules and templates.

        Idempotent: subsequent calls return immediately.
        """
        if self._loaded:
            return
        self._load_validation_rules()
        self._load_template_synonyms()
        # Built-in Dutch/English defaults; never overwrite loaded entries.
        dutch_types = {
            "museum": "M", "musea": "M", "museums": "M",
            "bibliotheek": "L", "bibliotheken": "L", "library": "L", "libraries": "L",
            "archief": "A", "archieven": "A", "archive": "A", "archives": "A",
            "galerie": "G", "galerij": "G", "galerijen": "G", "gallery": "G", "galleries": "G",
        }
        for k, v in dutch_types.items():
            self._institution_types.setdefault(k, v)
        self._loaded = True
        logger.info(f"Loaded {len(self._institution_types)} institution types, "
                    f"{len(self._subregions)} subregions, {len(self._countries)} countries")

    def _load_validation_rules(self) -> None:
        """Merge mappings from the validation rules JSON, if present."""
        if not VALIDATION_RULES_PATH.exists():
            return
        try:
            with open(VALIDATION_RULES_PATH) as f:
                rules = json.load(f)
            for key, target in (
                ("institution_type_mappings", self._institution_types),
                ("subregion_mappings", self._subregions),
                ("country_mappings", self._countries),
            ):
                for k, v in rules.get(key, {}).items():
                    target[k.lower()] = v
        except Exception as e:
            logger.warning(f"Failed to load validation rules: {e}")

    def _load_template_synonyms(self) -> None:
        """Merge synonym tables from the templates YAML `_slot_types` section."""
        if not TEMPLATES_PATH.exists():
            return
        try:
            import yaml
            with open(TEMPLATES_PATH) as f:
                templates = yaml.safe_load(f)
            slot_types = templates.get("_slot_types", {})
            for key, target in (
                ("institution_type", self._institution_types),
                ("subregion", self._subregions),
                ("country", self._countries),
                ("budget_category", self._budget_categories),
            ):
                for k, v in slot_types.get(key, {}).get("synonyms", {}).items():
                    # YAML keys use underscores; users type spaces.
                    target[k.lower().replace("_", " ")] = v
        except Exception as e:
            logger.warning(f"Failed to load template synonyms: {e}")

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _fuzzy_lookup(term: str, mapping: dict[str, str],
                      cutoff: float) -> Optional[str]:
        """Return the mapping value for the best fuzzy key match, or None."""
        if not mapping:
            return None
        hit = process.extractOne(
            term, list(mapping.keys()), scorer=fuzz.ratio, score_cutoff=cutoff
        )
        return mapping[hit[0]] if hit else None

    # ------------------------------------------------------------------
    # Resolvers
    # ------------------------------------------------------------------
    def resolve_institution_type(self, term: str) -> Optional[str]:
        """Resolve institution type term to single-letter code.

        Returns None for unknown terms — including the empty string and
        multi-letter fragments, which the previous substring check
        wrongly accepted.
        """
        self.load()
        term_lower = term.lower().strip()
        if term_lower in self._institution_types:
            return self._institution_types[term_lower]
        # Already a valid single-letter code.
        code = term.strip().upper()
        if len(code) == 1 and code in self._VALID_TYPE_CODES:
            return code
        return self._fuzzy_lookup(term_lower, self._institution_types, 80)

    def resolve_subregion(self, term: str) -> Optional[str]:
        """Resolve subregion term to ISO 3166-2 code."""
        self.load()
        term_lower = term.lower().strip()
        if term_lower in self._subregions:
            return self._subregions[term_lower]
        # Already a valid code (e.g. NL-NH).
        code = term.strip().upper()
        if self._ISO_SUBREGION_RE.match(code):
            return code
        return self._fuzzy_lookup(term_lower, self._subregions, 75)

    def resolve_country(self, term: str) -> Optional[str]:
        """Resolve country term to Wikidata Q-number."""
        self.load()
        term_lower = term.lower().strip()
        if term_lower in self._countries:
            return self._countries[term_lower]
        # Already a Q-number.
        qid = term.strip()
        if re.match(r'^Q\d+$', qid):
            return qid
        return self._fuzzy_lookup(term_lower, self._countries, 80)

    def resolve_city(self, term: str) -> str:
        """Normalize city name (title case, common corrections)."""
        term = term.strip()
        term_lower = term.lower()
        if term_lower in self._CITY_CORRECTIONS:
            return self._CITY_CORRECTIONS[term_lower]
        # Dutch genitive prefix: keep the 's- apostrophe form intact.
        if term_lower.startswith("'s-"):
            return "'" + term[1:2] + "-" + term[3:].title()
        return term.title()

    def resolve_budget_category(self, term: str) -> Optional[str]:
        """Resolve budget category term to canonical slot name.

        Args:
            term: Budget category term (e.g., "innovatie", "digitalisering",
                "innovation budget")

        Returns:
            Canonical budget category slot name (e.g., "innovation",
            "digitization") or None
        """
        self.load()
        # Normalize consistently with how synonyms were loaded.
        term_normalized = term.lower().strip().replace("_", " ")
        if term_normalized in self._budget_categories:
            return self._budget_categories[term_normalized]
        # Already a valid category. BUGFIX: canonical names use underscores
        # ("external_funding"), but normalization replaced them with spaces,
        # so underscore categories could never pass this check before.
        candidate = term_normalized.replace(" ", "_")
        if candidate in self._VALID_BUDGET_CATEGORIES:
            return candidate
        return self._fuzzy_lookup(term_normalized, self._budget_categories, 75)

    def is_region(self, term: str) -> bool:
        """Check if a term is a known region/province name.

        Used to disambiguate between city and region patterns when both
        would match the same question structure.

        Args:
            term: Location term to check (e.g., "Noord-Holland", "Amsterdam")

        Returns:
            True if the term resolves to a known region, False otherwise
        """
        self.load()
        term_lower = term.lower().strip()
        if term_lower in self._subregions:
            return True
        if self._ISO_SUBREGION_RE.match(term.strip().upper()):
            return True
        # Higher cutoff than resolve_subregion to avoid false positives.
        return self._fuzzy_lookup(term_lower, self._subregions, 85) is not None
# Global synonym resolver instance
_synonym_resolver: Optional[SynonymResolver] = None


def get_synonym_resolver() -> SynonymResolver:
    """Get or create the global synonym resolver.

    The instance is created lazily; mapping files are only read when one
    of its resolve_* methods is first called.
    """
    global _synonym_resolver
    if _synonym_resolver is None:
        _synonym_resolver = SynonymResolver()
    return _synonym_resolver
# =============================================================================
# DSPy SIGNATURES
# =============================================================================
class ConversationContextSignature(dspy.Signature):
    # NOTE: this docstring is the LM prompt — edit with care.
    """Resolve elliptical follow-up questions using conversation history.

    CRITICAL: This module runs FIRST, before any filtering or classification.
    It expands short follow-up questions into complete, self-contained questions.

    Examples of resolution:
    - Previous: "Welke archieven zijn er in Den Haag?"
      Current: "En in Enschede?"
      Resolved: "Welke archieven zijn er in Enschede?"
    - Previous: "Welke musea zijn er in Amsterdam?"
      Current: "Hoeveel?"
      Resolved: "Hoeveel musea zijn er in Amsterdam?"
    - Previous: Listed 5 museums
      Current: "Vertel me meer over de eerste"
      Resolved: "Vertel me meer over [first museum name]"

    The resolved question must be a complete, standalone question that would
    make sense without any conversation history.
    """

    question: str = dspy.InputField(
        desc="Current user question (may be elliptical follow-up)"
    )
    history: History = dspy.InputField(
        desc="Previous conversation turns",
        default=History(messages=[])
    )
    previous_slots: str = dspy.InputField(
        desc="JSON string of slot values from previous query (e.g., {\"institution_type\": \"A\", \"city\": \"Den Haag\"})",
        default="{}"
    )
    previous_results_summary: str = dspy.InputField(
        desc="Brief summary of previous query results for ordinal/pronoun resolution",
        default=""
    )
    resolved_question: str = dspy.OutputField(
        desc="Fully expanded, self-contained question. If not a follow-up, return original question unchanged."
    )
    is_follow_up: bool = dspy.OutputField(
        desc="True if this was an elliptical follow-up that needed resolution"
    )
    follow_up_type: str = dspy.OutputField(
        desc="Type of follow-up: 'location_swap', 'type_swap', 'count_from_list', 'details_request', 'ordinal_reference', 'pronoun_reference', or 'none'"
    )
    inherited_slots_json: str = dspy.OutputField(
        desc="JSON string of slots inherited from previous query (e.g., {\"institution_type\": \"A\"})"
    )
    confidence: float = dspy.OutputField(
        desc="Confidence in resolution (0.0-1.0)"
    )
class FykeFilterSignature(dspy.Signature):
    # NOTE: this docstring is the LM prompt — edit with care.
    """Determine if a RESOLVED question is relevant to heritage institutions.

    CRITICAL: This filter operates on the RESOLVED question from ConversationContextResolver,
    NOT the raw user input. This prevents false positives on short follow-ups like
    "En in Enschede?" which resolves to "Welke archieven zijn er in Enschede?"

    Heritage institutions include:
    - Museums (musea)
    - Archives (archieven)
    - Libraries (bibliotheken)
    - Galleries (galerijen)
    - Heritage societies
    - Cultural institutions
    - Collections

    Questions about these topics are RELEVANT.
    Questions about shopping, weather, sports, restaurants, etc. are NOT relevant.
    When in doubt, err on the side of relevance (return True).
    """

    resolved_question: str = dspy.InputField(
        desc="The fully resolved question (after context resolution)"
    )
    conversation_topic: str = dspy.InputField(
        desc="Brief summary of what the conversation has been about so far",
        default="heritage institutions"
    )
    is_relevant: bool = dspy.OutputField(
        desc="True if question is about heritage institutions, False otherwise"
    )
    confidence: float = dspy.OutputField(
        desc="Confidence in relevance classification (0.0-1.0)"
    )
    reasoning: str = dspy.OutputField(
        desc="Brief explanation of why question is/isn't relevant"
    )
class TemplateClassifierSignature(dspy.Signature):
    # NOTE: this docstring is the LM prompt — edit with care.
    """Classify a heritage question to match it with a SPARQL template.

    Given a resolved question about heritage institutions, determine which
    SPARQL query template best matches the user's intent.

    Available template IDs (return the EXACT ID string, not the number):
    - list_institutions_by_type_city: List institutions of type X in city Y
    - list_institutions_by_type_region: List institutions of type X in region Y (province/state)
    - list_institutions_by_type_country: List institutions of type X in country Y
    - count_institutions_by_type_location: Count institutions of type X in location Y
    - count_institutions_by_type: Count all institutions grouped by type
    - find_institution_by_name: Find specific institution by name
    - list_all_institutions_in_city: List all institutions in city Y
    - find_institutions_by_founding_date: Find oldest/newest institutions
    - find_institution_by_identifier: Find by ISIL/GHCID
    - compare_locations: Compare institutions between locations
    - find_custodians_by_budget_threshold: Find custodians with budget category above/below threshold (e.g., "Which custodians spend more than 5000 euros on innovation?")
    - none: No template matches (fall back to LLM generation)

    CRITICAL DISAMBIGUATION - Province vs City:
    Some Dutch locations are BOTH a province AND a city with the same name.
    When the location name alone is used (without "stad" or "de stad"),
    DEFAULT TO PROVINCE (use list_institutions_by_type_region):
    - Groningen → province (NL-GR), NOT the city
    - Utrecht → province (NL-UT), NOT the city
    - Limburg → province (NL-LI), NOT the city
    - Friesland/Frisia → province (NL-FR)
    - Drenthe → province (NL-DR)
    - Gelderland → province (NL-GE)
    - Overijssel → province (NL-OV)
    - Flevoland → province (NL-FL)
    - Zeeland → province (NL-ZE)
    - Noord-Holland → province (NL-NH)
    - Zuid-Holland → province (NL-ZH)
    - Noord-Brabant → province (NL-NB)

    Use list_institutions_by_type_city ONLY when:
    - The question explicitly says "de stad" or "in de stad" (the city)
    - The location is clearly just a city (e.g., Amsterdam, Rotterdam, Den Haag)

    IMPORTANT: Return the template ID string exactly as shown (e.g. "count_institutions_by_type_location"),
    NOT a number. Return "none" if no template matches well.
    """

    question: str = dspy.InputField(
        desc="Resolved natural language question about heritage institutions"
    )
    language: str = dspy.InputField(
        desc="Language code: nl, en, de, fr",
        default="nl"
    )
    template_id: str = dspy.OutputField(
        desc="EXACT template ID string from the list above (e.g. 'count_institutions_by_type_location'), or 'none'"
    )
    confidence: float = dspy.OutputField(
        desc="Confidence in template match (0.0-1.0). Return 'none' as template_id if below 0.6"
    )
    reasoning: str = dspy.OutputField(
        desc="Brief explanation of why this template matches"
    )
class SlotExtractorSignature(dspy.Signature):
    # NOTE: this docstring is the LM prompt — edit with care.
    """Extract slot values from a question for a specific template.

    Given a question and the template it matched, extract the values needed
    to fill in the template's slots.

    Slot types and expected formats:
    - institution_type: Return single-letter code (M, L, A, G, O, R, C, U, B, E, S, F, I, X, P, H, D, N, T)
      Examples: "musea" → "M", "archieven" → "A", "bibliotheken" → "L"
    - city: Return city name with proper capitalization
      Examples: "amsterdam" → "Amsterdam", "den haag" → "Den Haag"
    - region: Return ISO 3166-2 code
      Examples: "Noord-Holland" → "NL-NH", "Gelderland" → "NL-GE"
    - country: Return Wikidata Q-number
      Examples: "Nederland" → "Q55", "Belgium" → "Q31"
    - institution_name: Return the institution name as mentioned
    - limit: Return integer (default 10)

    Return slots as a JSON object with slot names as keys.
    """

    question: str = dspy.InputField(
        desc="The user's question"
    )
    template_id: str = dspy.InputField(
        desc="ID of the matched template"
    )
    required_slots: str = dspy.InputField(
        desc="Comma-separated list of required slot names for this template"
    )
    inherited_slots: str = dspy.InputField(
        desc="JSON string of slots inherited from conversation context",
        default="{}"
    )
    slots_json: str = dspy.OutputField(
        desc="JSON object with extracted slot values, e.g., {\"institution_type\": \"M\", \"city\": \"Amsterdam\"}"
    )
    extraction_notes: str = dspy.OutputField(
        desc="Notes about any slots that couldn't be extracted or needed inference"
    )
# =============================================================================
# DSPy MODULES
# =============================================================================
class ConversationContextResolver(dspy.Module):
    """Resolves elliptical follow-up questions using conversation history.

    CRITICAL: This module MUST run FIRST in the pipeline, before FykeFilter.
    It expands short follow-ups like "En in Enschede?" into complete questions
    like "Welke archieven zijn er in Enschede?" so that subsequent modules
    can properly understand the user's intent.
    """

    def __init__(self):
        super().__init__()
        self.resolve = dspy.ChainOfThought(ConversationContextSignature)

    def forward(
        self,
        question: str,
        conversation_state: Optional[ConversationState] = None,
    ) -> ResolvedQuestion:
        """Resolve a potentially elliptical question.

        Args:
            question: Current user question (may be elliptical)
            conversation_state: Full conversation state with history

        Returns:
            ResolvedQuestion with expanded question and metadata
        """
        # Without history there is nothing to resolve against.
        if conversation_state is None or not conversation_state.turns:
            return ResolvedQuestion(
                original=question,
                resolved=question,
                is_follow_up=False,
                confidence=1.0,
            )

        history = conversation_state.to_dspy_history()
        slots_json = json.dumps(conversation_state.current_slots)

        # Summarize previous results so ordinals ("the first one") can resolve.
        summary = ""
        prev = conversation_state.get_previous_user_turn()
        if prev and prev.results:
            summary = ", ".join(
                r.get("name", str(r))[:50] for r in prev.results[:5]
            )

        try:
            pred = self.resolve(
                question=question,
                history=history,
                previous_slots=slots_json,
                previous_results_summary=summary,
            )
            try:
                inherited = json.loads(pred.inherited_slots_json)
            except (json.JSONDecodeError, TypeError):
                inherited = {}
            return ResolvedQuestion(
                original=question,
                resolved=pred.resolved_question,
                is_follow_up=pred.is_follow_up,
                follow_up_type=pred.follow_up_type if pred.follow_up_type != "none" else None,
                inherited_slots=inherited,
                confidence=pred.confidence,
            )
        except Exception as e:
            # Fail open: hand the unmodified question downstream.
            logger.warning(f"Context resolution failed: {e}, returning original")
            return ResolvedQuestion(
                original=question,
                resolved=question,
                is_follow_up=False,
                confidence=0.5,
            )
class FykeFilter(dspy.Module):
    """Filters out irrelevant questions with standard response.

    CRITICAL: Must operate on RESOLVED question from ConversationContextResolver,
    not the raw user input. This prevents false positives on follow-ups.
    Named after Dutch "fuik" (fish trap) - catches irrelevant questions.
    """

    def __init__(self, config: Optional[FykeFilterConfig] = None):
        super().__init__()
        self.classify = dspy.ChainOfThought(FykeFilterSignature)
        self.config = config or self._load_config()

    def _load_config(self) -> FykeFilterConfig:
        """Load Fyke configuration from the templates YAML, with built-in defaults."""
        default_config = FykeFilterConfig(
            out_of_scope_keywords=[
                "tandpasta", "toothpaste", "supermarkt", "restaurant",
                "hotel", "weer", "weather", "voetbal", "soccer"
            ],
            out_of_scope_categories=["shopping", "sports", "cooking"],
            heritage_keywords=[
                "museum", "musea", "archief", "archieven", "bibliotheek",
                "galerie", "erfgoed", "heritage", "collectie", "collection"
            ],
            standard_response={
                "nl": "Ik kan je helpen met vragen over erfgoedinstellingen zoals musea, archieven, bibliotheken en galerijen.",
                "en": "I can help you with questions about heritage institutions such as museums, archives, libraries and galleries.",
                "de": "Ich kann Ihnen bei Fragen zu Kulturerbeinstitutionen wie Museen, Archiven und Bibliotheken helfen.",
                "fr": "Je peux vous aider avec des questions sur les institutions patrimoniales."
            }
        )
        if TEMPLATES_PATH.exists():
            try:
                import yaml
                with open(TEMPLATES_PATH) as f:
                    templates = yaml.safe_load(f)
                fyke_config = templates.get("fyke_filter", {})
                # Each field individually falls back to the built-in default.
                return FykeFilterConfig(
                    out_of_scope_keywords=fyke_config.get("out_of_scope_keywords", default_config.out_of_scope_keywords),
                    out_of_scope_categories=fyke_config.get("out_of_scope_categories", default_config.out_of_scope_categories),
                    heritage_keywords=fyke_config.get("heritage_keywords", default_config.heritage_keywords),
                    standard_response=fyke_config.get("standard_response", default_config.standard_response)
                )
            except Exception as e:
                logger.warning(f"Failed to load Fyke config: {e}")
        return default_config

    def _standard_response(self, language: str) -> Optional[str]:
        """Localized canned rejection reply, falling back to English."""
        return self.config.standard_response.get(
            language, self.config.standard_response.get("en")
        )

    def forward(
        self,
        resolved_question: str,
        conversation_topic: str = "heritage institutions",
        language: str = "nl"
    ) -> FykeResult:
        """Check if resolved question is relevant to heritage.

        Args:
            resolved_question: The RESOLVED question (not raw input!)
            conversation_topic: Summary of conversation so far
            language: Language code for standard response

        Returns:
            FykeResult with relevance decision
        """
        text = resolved_question.lower()

        # Fast path 1: obvious heritage keywords → definitely relevant.
        hit = next((kw for kw in self.config.heritage_keywords if kw in text), None)
        if hit is not None:
            return FykeResult(
                is_relevant=True,
                confidence=0.95,
                reasoning=f"Contains heritage keyword: {hit}"
            )

        # Fast path 2: obvious out-of-scope keywords → definitely irrelevant.
        hit = next((kw for kw in self.config.out_of_scope_keywords if kw in text), None)
        if hit is not None:
            return FykeResult(
                is_relevant=False,
                confidence=0.95,
                reasoning=f"Contains out-of-scope keyword: {hit}",
                standard_response=self._standard_response(language)
            )

        # Ambiguous: delegate to the LM classifier.
        try:
            verdict = self.classify(
                resolved_question=resolved_question,
                conversation_topic=conversation_topic
            )
            return FykeResult(
                is_relevant=verdict.is_relevant,
                confidence=verdict.confidence,
                reasoning=verdict.reasoning,
                standard_response=None if verdict.is_relevant else self._standard_response(language)
            )
        except Exception as e:
            # Fail open: a lost question is worse than a stray answer.
            logger.warning(f"Fyke classification failed: {e}, assuming relevant")
            return FykeResult(
                is_relevant=True,
                confidence=0.5,
                reasoning=f"Classification failed, assuming relevant: {e}"
            )
# =============================================================================
# EMBEDDING-BASED TEMPLATE MATCHING
# =============================================================================
class TemplateEmbeddingMatcher:
    """Matches questions to templates using semantic embeddings.

    Uses sentence-transformers to compute embeddings for template patterns
    and find the best match for incoming questions based on cosine similarity.
    This provides semantic matching that can handle:
    - Paraphrases ("Welke musea..." vs "Zijn er musea die...")
    - Synonyms ("instellingen" vs "organisaties")
    - Different word orders
    - Multilingual queries (Dutch, English, German)
    """

    _instance = None
    # Class-level caches shared by the singleton; computed once per process.
    _pattern_embeddings: Optional[np.ndarray] = None
    _pattern_template_ids: Optional[list[str]] = None
    _pattern_texts: Optional[list[str]] = None

    def __new__(cls):
        """Singleton pattern - embeddings are expensive to compute."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def _ensure_embeddings_computed(self, templates: dict[str, "TemplateDefinition"]) -> bool:
        """Compute and cache embeddings for all template patterns.

        Returns:
            True if embeddings are available, False otherwise
        """
        if self._pattern_embeddings is not None:
            return True
        model = _get_embedding_model()
        if model is None:
            return False

        # Flatten every template's question patterns into one corpus,
        # remembering which template owns each pattern.
        pattern_texts: list[str] = []
        template_ids: list[str] = []
        for tid, tdef in templates.items():
            for raw in tdef.question_patterns:
                # Replace {slot} markers with a generic placeholder.
                pattern_texts.append(re.sub(r'\{[^}]+\}', '[VALUE]', raw))
                template_ids.append(tid)

        if not pattern_texts:
            logger.warning("No patterns found for embedding computation")
            return False

        logger.info(f"Computing embeddings for {len(pattern_texts)} template patterns...")
        try:
            embeddings = model.encode(pattern_texts, convert_to_numpy=True, show_progress_bar=False)
        except Exception as e:
            logger.warning(f"Failed to compute pattern embeddings: {e}")
            return False
        self._pattern_embeddings = embeddings
        self._pattern_template_ids = template_ids
        self._pattern_texts = pattern_texts
        logger.info(f"Computed {len(embeddings)} pattern embeddings (dim={embeddings.shape[1]})")
        return True

    def match(
        self,
        question: str,
        templates: dict[str, "TemplateDefinition"],
        min_similarity: float = 0.70
    ) -> Optional["TemplateMatchResult"]:
        """Find best matching template using embedding similarity.

        Args:
            question: Natural language question
            templates: Dictionary of template definitions
            min_similarity: Minimum cosine similarity threshold (0-1)

        Returns:
            TemplateMatchResult if similarity >= threshold, None otherwise
        """
        if not self._ensure_embeddings_computed(templates):
            return None
        model = _get_embedding_model()
        if model is None:
            return None

        # Numbers become placeholders so "5000 euro" matches "[VALUE] euro".
        normalized_question = re.sub(r'\d+(?:[.,]\d+)?', '[VALUE]', question)
        try:
            question_embedding = model.encode([normalized_question], convert_to_numpy=True)[0]
        except Exception as e:
            logger.warning(f"Failed to compute question embedding: {e}")
            return None

        # Defensive: caches should be populated after _ensure_embeddings_computed.
        if self._pattern_embeddings is None or self._pattern_template_ids is None or self._pattern_texts is None:
            return None

        # Cosine similarity = dot product of L2-normalized vectors.
        qvec = question_embedding / np.linalg.norm(question_embedding)
        mat = self._pattern_embeddings / np.linalg.norm(self._pattern_embeddings, axis=1, keepdims=True)
        similarities = mat @ qvec

        best_idx = int(np.argmax(similarities))
        best_similarity = float(similarities[best_idx])
        if best_similarity < min_similarity:
            logger.debug(f"Best embedding similarity {best_similarity:.3f} below threshold {min_similarity}")
            return None

        best_template_id = self._pattern_template_ids[best_idx]
        best_pattern = self._pattern_texts[best_idx]
        logger.info(f"Embedding match found: template='{best_template_id}', similarity={best_similarity:.3f}, pattern='{best_pattern}'")
        return TemplateMatchResult(
            matched=True,
            template_id=best_template_id,
            # Raw cosine similarity doubles as the confidence score.
            confidence=best_similarity,
            reasoning=f"Embedding similarity: {best_similarity:.3f} with pattern '{best_pattern}'"
        )
# Module-level singleton holding the shared embedding matcher.
_template_embedding_matcher: Optional[TemplateEmbeddingMatcher] = None


def get_template_embedding_matcher() -> TemplateEmbeddingMatcher:
    """Return the process-wide TemplateEmbeddingMatcher, creating it lazily."""
    global _template_embedding_matcher
    matcher = _template_embedding_matcher
    if matcher is None:
        matcher = TemplateEmbeddingMatcher()
        _template_embedding_matcher = matcher
    return matcher
class TemplateClassifier(dspy.Module):
    """Classifies questions to match SPARQL templates.

    Matching runs in three tiers of decreasing determinism (see forward()):
    exact/fuzzy regex patterns, embedding similarity, then an LLM
    ChainOfThought fallback. Template definitions are read once from the
    YAML file at TEMPLATES_PATH and cached on the instance.
    """

    def __init__(self):
        super().__init__()
        # LLM-based classifier, used only as the tier-3 fallback in forward().
        self.classify = dspy.ChainOfThought(TemplateClassifierSignature)
        # Lazily-populated cache of template definitions keyed by template ID.
        self._templates: Optional[dict[str, TemplateDefinition]] = None

    def _load_templates(self) -> dict[str, TemplateDefinition]:
        """Load template definitions from YAML.

        Returns the cached dict on subsequent calls. Individual templates
        that fail to parse are skipped with a warning; if the file cannot
        be read at all, the (empty) cache is kept and returned.
        """
        if self._templates is not None:
            return self._templates
        self._templates = {}
        if TEMPLATES_PATH.exists():
            try:
                import yaml
                with open(TEMPLATES_PATH) as f:
                    data = yaml.safe_load(f)
                templates = data.get("templates", {})
                for template_id, template_data in templates.items():
                    try:
                        # Convert slots; non-dict slot entries are silently skipped.
                        slots = {}
                        for slot_name, slot_data in template_data.get("slots", {}).items():
                            if isinstance(slot_data, dict):
                                slot_type = slot_data.get("type", "string")
                                # Unknown type strings degrade to STRING instead of raising.
                                slots[slot_name] = SlotDefinition(
                                    type=SlotType(slot_type) if slot_type in [e.value for e in SlotType] else SlotType.STRING,
                                    required=slot_data.get("required", True),
                                    default=slot_data.get("default"),
                                    examples=slot_data.get("examples", []),
                                    fallback_types=[SlotType(t) for t in slot_data.get("fallback_types", []) if t in [e.value for e in SlotType]],
                                    valid_values=slot_data.get("valid_values", [])
                                )
                        self._templates[template_id] = TemplateDefinition(
                            id=template_id,
                            description=template_data.get("description", ""),
                            intent=template_data.get("intent", []),
                            question_patterns=template_data.get("question_patterns", []),
                            slots=slots,
                            sparql_template=template_data.get("sparql_template", ""),
                            sparql_template_alt=template_data.get("sparql_template_alt"),
                            sparql_template_region=template_data.get("sparql_template_region"),
                            sparql_template_country=template_data.get("sparql_template_country"),
                            sparql_template_isil=template_data.get("sparql_template_isil"),
                            sparql_template_ghcid=template_data.get("sparql_template_ghcid"),
                            examples=template_data.get("examples", [])
                        )
                    except Exception as e:
                        logger.warning(f"Failed to parse template {template_id}: {e}")
            except Exception as e:
                logger.error(f"Failed to load templates: {e}")
        return self._templates

    def _pattern_to_regex(self, pattern: str) -> tuple[re.Pattern, list[str]]:
        """Convert a template pattern to a regex for matching.

        Converts patterns like:
            "Welke instellingen geven meer dan {amount} uit aan {budget_category}?"
        To regex like:
            "Welke instellingen geven meer dan (.+?) uit aan (.+?)\\?"

        Args:
            pattern: Template pattern with {slot_name} placeholders

        Returns:
            Tuple of (compiled regex pattern, list of slot names in order)
        """
        # Extract slot names in order of appearance; order must line up with
        # the capture groups produced below.
        slot_names = re.findall(r'\{([^}]+)\}', pattern)
        # Escape regex special characters; re.escape also escapes { and },
        # which is why the substitution below looks for \{...\}.
        escaped = re.escape(pattern)
        # Replace escaped braces with capture groups:
        # \{...\} becomes (.+?) for non-greedy capture
        regex_str = re.sub(r'\\{[^}]+\\}', r'(.+?)', escaped)
        # Compile with case-insensitive matching
        return re.compile(regex_str, re.IGNORECASE), slot_names

    def _validate_slot_value(self, slot_name: str, value: str, template_id: str) -> bool:
        """Validate a captured slot value against its expected type.

        This is used to disambiguate between templates that have identical
        patterns but different slot types (e.g., city vs region).

        Args:
            slot_name: Name of the slot (e.g., "city", "region")
            value: Captured value to validate
            template_id: Template ID for context (currently unused here)

        Returns:
            True if the value is valid for the slot type, False otherwise
        """
        resolver = get_synonym_resolver()
        # Slot-specific validation
        if slot_name in ("region", "subregion"):
            # For region slots, check if the value is a known region
            return resolver.is_region(value)
        elif slot_name == "city":
            # For city slots, check if the value is NOT a region (inverse logic).
            # This helps disambiguate "Noord-Holland" (region) from "Amsterdam" (city).
            return not resolver.is_region(value)
        # Default: accept any value
        return True

    def _match_by_patterns(
        self,
        question: str,
        templates: dict[str, TemplateDefinition]
    ) -> Optional[TemplateMatchResult]:
        """Try to match question against template patterns using regex.

        This provides a fast, deterministic fallback before using LLM
        classification. Patterns are defined in the YAML template's
        `question_patterns` field.

        When multiple patterns match, prefers:
        1. Exact (fullmatch) hits over fuzzy ones
        2. Patterns with more literal text (more specific)
        3. Patterns with higher confidence

        Args:
            question: The natural language question
            templates: Dictionary of template definitions

        Returns:
            TemplateMatchResult if high-confidence match found, None otherwise
        """
        all_matches: list[tuple[str, float, str, int, bool]] = []  # (template_id, confidence, pattern, literal_chars, is_exact)
        # Normalize question for matching
        question_normalized = question.strip()
        for template_id, template_def in templates.items():
            patterns = template_def.question_patterns
            if not patterns:
                continue
            for pattern in patterns:
                try:
                    regex, slot_names = self._pattern_to_regex(pattern)
                    match = regex.fullmatch(question_normalized)
                    # Calculate literal characters (non-slot text) in pattern;
                    # used both for ranking and for a small confidence bonus.
                    literal_text = re.sub(r'\{[^}]+\}', '', pattern)
                    literal_chars = len(literal_text.strip())
                    if match:
                        # Validate captured slot values
                        captured_values = match.groups()
                        slots_valid = True
                        for slot_name, value in zip(slot_names, captured_values):
                            if not self._validate_slot_value(slot_name, value, template_id):
                                logger.debug(f"Slot validation failed: {slot_name}='{value}' for template {template_id}")
                                slots_valid = False
                                break
                        if not slots_valid:
                            # Skip this match - slot value doesn't match expected type
                            continue
                        # Full match = high confidence, but scaled by specificity
                        # More literal chars = more specific = higher confidence
                        base_confidence = 0.95
                        # Boost confidence slightly for more specific patterns (caps at 0.99)
                        specificity_bonus = min(0.04, literal_chars / 200.0)
                        confidence = base_confidence + specificity_bonus
                        logger.debug(f"Pattern exact match: '{pattern}' -> {template_id} (literal_chars={literal_chars})")
                        all_matches.append((template_id, confidence, pattern, literal_chars, True))  # is_exact=True
                        continue
                    # Try partial/fuzzy matching with lower confidence.
                    # Use rapidfuzz to compare pattern structure (with slots replaced)
                    pattern_normalized = re.sub(r'\{[^}]+\}', '___', pattern.lower())
                    question_lower = question_normalized.lower()
                    # Replace common numeric patterns with placeholder
                    question_for_compare = re.sub(r'\d+(?:[.,]\d+)?', '___', question_lower)
                    # Calculate similarity
                    similarity = fuzz.ratio(pattern_normalized, question_for_compare) / 100.0
                    if similarity >= 0.75:
                        # Good fuzzy match
                        confidence = 0.70 + (similarity - 0.75) * 0.8  # Scale 0.75-1.0 to 0.70-0.90
                        logger.debug(f"Pattern fuzzy match: '{pattern}' -> {template_id} (sim={similarity:.2f}, conf={confidence:.2f})")
                        all_matches.append((template_id, confidence, pattern, literal_chars, False))  # is_exact=False
                except Exception as e:
                    logger.warning(f"Pattern matching error for '{pattern}': {e}")
                    continue
        # Debug: Print match count
        logger.debug(f"Pattern matching found {len(all_matches)} matches for '{question[:50]}...'")
        if not all_matches:
            return None
        # Sort by: 1) is_exact descending (exact matches first), 2) literal_chars descending, 3) confidence descending
        all_matches.sort(key=lambda x: (x[4], x[3], x[1]), reverse=True)
        # Debug: show best match
        best_match = all_matches[0]
        logger.debug(f"Best match after sort: {best_match}")
        template_id, confidence, matched_pattern, literal_chars, is_exact = best_match
        logger.debug(f"Extracted: template_id={template_id}, confidence={confidence}, literal_chars={literal_chars}, is_exact={is_exact}")
        if confidence >= 0.75:
            logger.info(f"Pattern-based match found: template='{template_id}', confidence={confidence:.2f}, pattern='{matched_pattern}' (literal_chars={literal_chars})")
            return TemplateMatchResult(
                matched=True,
                template_id=template_id,
                confidence=confidence,
                reasoning=f"Pattern match: '{matched_pattern}'"
            )
        return None

    def forward(self, question: str, language: str = "nl") -> TemplateMatchResult:
        """Classify question to find matching template.

        Tries three tiers in order of decreasing determinism: regex
        pattern matching, embedding similarity, then LLM classification.

        Args:
            question: Resolved natural language question
            language: Language code

        Returns:
            TemplateMatchResult with template ID and confidence
        """
        templates = self._load_templates()
        if not templates:
            return TemplateMatchResult(
                matched=False,
                reasoning="No templates loaded"
            )
        # TIER 1: Pattern-based matching (fast, deterministic, exact regex)
        pattern_match = self._match_by_patterns(question, templates)
        if pattern_match and pattern_match.confidence >= 0.75:
            logger.info(f"Using pattern-based match: {pattern_match.template_id} (confidence={pattern_match.confidence:.2f})")
            return pattern_match
        # TIER 2: Embedding-based matching (semantic similarity, handles paraphrases)
        embedding_matcher = get_template_embedding_matcher()
        embedding_match = embedding_matcher.match(question, templates, min_similarity=0.70)
        if embedding_match and embedding_match.confidence >= 0.70:
            logger.info(f"Using embedding-based match: {embedding_match.template_id} (confidence={embedding_match.confidence:.2f})")
            return embedding_match
        # TIER 3: LLM classification (fallback for complex/novel queries)
        try:
            result = self.classify(
                question=question,
                language=language
            )
            template_id = result.template_id
            confidence = result.confidence
            # Debug logging to see what LLM returned
            logger.info(f"Template classifier returned: template_id='{template_id}', confidence={confidence}, reasoning='{result.reasoning[:100]}...'")
            logger.debug(f"Available templates: {list(templates.keys())}")
            # Handle numeric IDs (LLM sometimes returns "4" instead of "count_institutions_by_type_location")
            numeric_to_template = {
                "1": "list_institutions_by_type_city",
                "2": "list_institutions_by_type_region",
                "3": "list_institutions_by_type_country",
                "4": "count_institutions_by_type_location",
                "5": "count_institutions_by_type",
                "6": "find_institution_by_name",
                "7": "list_all_institutions_in_city",
                "8": "find_institutions_by_founding_date",
                "9": "find_institution_by_identifier",
                "10": "compare_locations",
                "11": "find_custodians_by_budget_threshold",
            }
            if template_id in numeric_to_template:
                logger.info(f"Converting numeric template_id '{template_id}' to '{numeric_to_template[template_id]}'")
                template_id = numeric_to_template[template_id]
            # Validate template exists
            if template_id != "none" and template_id not in templates:
                # Try fuzzy match on template IDs
                match = process.extractOne(
                    template_id,
                    list(templates.keys()),
                    scorer=fuzz.ratio,
                    score_cutoff=70
                )
                if match:
                    template_id = match[0]
                else:
                    template_id = "none"
                    confidence = 0.0
            # NOTE(review): the LLM may emit confidence as a string; a TypeError
            # in the comparison below is swallowed by the except clause and
            # produces an unmatched result — confirm the signature's output type.
            if template_id == "none" or confidence < 0.6:
                return TemplateMatchResult(
                    matched=False,
                    template_id=None,
                    confidence=confidence,
                    reasoning=result.reasoning
                )
            return TemplateMatchResult(
                matched=True,
                template_id=template_id,
                confidence=confidence,
                reasoning=result.reasoning
            )
        except Exception as e:
            logger.warning(f"Template classification failed: {e}")
            return TemplateMatchResult(
                matched=False,
                reasoning=f"Classification error: {e}"
            )
class SlotExtractor(dspy.Module):
    """Pulls slot values out of a question and normalizes them via synonym lookup."""

    def __init__(self):
        super().__init__()
        self.extract = dspy.ChainOfThought(SlotExtractorSignature)
        self.resolver = get_synonym_resolver()
        self._templates: Optional[dict[str, TemplateDefinition]] = None

    def _get_template(self, template_id: str) -> Optional[TemplateDefinition]:
        """Return the definition for *template_id*, loading the catalog lazily."""
        if self._templates is None:
            self._templates = TemplateClassifier()._load_templates()
        return self._templates.get(template_id)

    def forward(
        self,
        question: str,
        template_id: str,
        inherited_slots: Optional[dict[str, str]] = None
    ) -> dict[str, str]:
        """Extract slot values from question.

        Args:
            question: User's question
            template_id: ID of matched template
            inherited_slots: Slots inherited from conversation context

        Returns:
            Dictionary of slot names to resolved values
        """
        template = self._get_template(template_id)
        fallback = inherited_slots or {}
        if not template:
            return fallback
        # Names of slots the template marks as required, passed to the LLM.
        required = ", ".join(
            name for name, slot in template.slots.items() if slot.required
        )
        try:
            prediction = self.extract(
                question=question,
                template_id=template_id,
                required_slots=required,
                inherited_slots=json.dumps(inherited_slots or {})
            )
            try:
                extracted = json.loads(prediction.slots_json)
            except (json.JSONDecodeError, TypeError):
                extracted = {}
            # Freshly extracted values win over inherited ones.
            merged = {**fallback, **extracted}
            # Per-type synonym resolvers; these fall back to the raw value
            # when resolution returns a falsy result.
            type_resolvers = {
                SlotType.INSTITUTION_TYPE: self.resolver.resolve_institution_type,
                SlotType.SUBREGION: self.resolver.resolve_subregion,
                SlotType.COUNTRY: self.resolver.resolve_country,
                SlotType.BUDGET_CATEGORY: self.resolver.resolve_budget_category,
            }
            resolved: dict[str, str] = {}
            for name, value in merged.items():
                if not value:
                    # Drop empty/None values entirely.
                    continue
                slot_def = template.slots.get(name)
                if slot_def is None:
                    # Unknown slot: pass through untouched.
                    resolved[name] = value
                elif slot_def.type == SlotType.CITY:
                    # City resolution is used as-is, with no raw-value fallback.
                    resolved[name] = self.resolver.resolve_city(value)
                else:
                    resolve_fn = type_resolvers.get(slot_def.type)
                    if resolve_fn is None:
                        resolved[name] = value
                    else:
                        resolved[name] = resolve_fn(value) or value
            return resolved
        except Exception as e:
            logger.warning(f"Slot extraction failed: {e}")
            return inherited_slots or {}
class TemplateInstantiator:
    """Turns a template definition plus slot values into a SPARQL query via Jinja2."""

    # Variant name -> TemplateDefinition attribute holding that variant's SPARQL text.
    _VARIANT_ATTRS = {
        "region": "sparql_template_region",
        "country": "sparql_template_country",
        "isil": "sparql_template_isil",
        "ghcid": "sparql_template_ghcid",
        "alt": "sparql_template_alt",
    }

    def __init__(self):
        self.env = Environment(loader=BaseLoader())
        self._templates: Optional[dict[str, TemplateDefinition]] = None

    def _get_template(self, template_id: str) -> Optional[TemplateDefinition]:
        """Return the definition for *template_id*, loading the catalog lazily."""
        if self._templates is None:
            self._templates = TemplateClassifier()._load_templates()
        return self._templates.get(template_id)

    def render(
        self,
        template_id: str,
        slots: dict[str, str],
        variant: Optional[str] = None
    ) -> Optional[str]:
        """Render SPARQL query from template and slots.

        Args:
            template_id: Template to use
            slots: Resolved slot values
            variant: Optional variant (e.g., 'region', 'country', 'isil')

        Returns:
            Rendered SPARQL query or None if rendering fails
        """
        template_def = self._get_template(template_id)
        if not template_def:
            logger.warning(f"Template not found: {template_id}")
            return None
        # Prefer the variant-specific SPARQL text when present and non-empty,
        # otherwise fall back to the default template.
        attr = self._VARIANT_ATTRS.get(variant) if variant else None
        sparql_template = getattr(template_def, attr, None) if attr else None
        if not sparql_template:
            sparql_template = template_def.sparql_template
        if not sparql_template:
            logger.warning(f"No SPARQL template for {template_id} variant {variant}")
            return None
        try:
            # Base context with defaults; explicit slot values override them.
            context = {
                "prefixes": SPARQL_PREFIXES,
                "limit": slots.get("limit", 10),
            }
            context.update(slots)
            rendered = self.env.from_string(sparql_template).render(**context)
            # Collapse runs of blank lines into single newlines.
            return re.sub(r'\n\s*\n', '\n', rendered.strip())
        except Exception as e:
            logger.error(f"Template rendering failed: {e}")
            return None
# =============================================================================
# MAIN PIPELINE
# =============================================================================
class TemplateSPARQLPipeline(dspy.Module):
    """Complete template-based SPARQL generation pipeline.

    Stages run in a fixed order (the ordering is load-bearing):
      1. ConversationContextResolver - expand elliptical follow-ups FIRST
      2. FykeFilter                  - relevance check on the RESOLVED question
      3. TemplateClassifier          - pick a SPARQL template
      4. SlotExtractor               - extract and normalize slot values
      5. TemplateInstantiator        - render the final SPARQL text

    When no template applies, the unmatched result lets the caller fall
    back to LLM-based query generation.
    """

    def __init__(self):
        super().__init__()
        self.context_resolver = ConversationContextResolver()
        self.fyke_filter = FykeFilter()
        self.template_classifier = TemplateClassifier()
        self.slot_extractor = SlotExtractor()
        self.instantiator = TemplateInstantiator()

    def forward(
        self,
        question: str,
        conversation_state: Optional[ConversationState] = None,
        language: str = "nl"
    ) -> TemplateMatchResult:
        """Run the full pipeline for one user question.

        Args:
            question: User's question (may be elliptical follow-up)
            conversation_state: Conversation history and state
            language: Language code

        Returns:
            TemplateMatchResult with SPARQL query if successful
        """
        # Stage 1: rewrite the question using conversation context.
        resolution = self.context_resolver.forward(
            question=question,
            conversation_state=conversation_state
        )
        logger.info(f"Resolved question: '{question}''{resolution.resolved}'")
        # Stage 2: relevance gate, applied to the resolved question.
        relevance = self.fyke_filter.forward(
            resolved_question=resolution.resolved,
            conversation_topic="heritage institutions",
            language=language
        )
        if not relevance.is_relevant:
            logger.info(f"Question filtered by Fyke: {relevance.reasoning}")
            return TemplateMatchResult(
                matched=False,
                reasoning=f"Out of scope: {relevance.reasoning}",
                sparql=None  # Will trigger standard response
            )
        # Stage 3: classify the resolved question to a template.
        classification = self.template_classifier.forward(
            question=resolution.resolved,
            language=language
        )
        if not classification.matched:
            logger.info(f"No template match: {classification.reasoning}")
            return classification  # Caller falls back to LLM generation
        template_id = classification.template_id
        if template_id is None:
            return TemplateMatchResult(
                matched=False,
                reasoning="No template ID from classifier"
            )
        # Stage 4: slot extraction with synonym resolution.
        slot_values = self.slot_extractor.forward(
            question=resolution.resolved,
            template_id=template_id,
            inherited_slots=resolution.inherited_slots
        )
        logger.info(f"Extracted slots: {slot_values}")
        # Stage 5: render the SPARQL query text.
        query = self.instantiator.render(
            template_id=template_id,
            slots=slot_values
        )
        if not query:
            logger.warning(f"Failed to render template {classification.template_id}")
            return TemplateMatchResult(
                matched=False,
                template_id=classification.template_id,
                reasoning="Template rendering failed"
            )
        # Record the turn so follow-up questions can inherit these slots.
        if conversation_state:
            conversation_state.add_turn(ConversationTurn(
                role="user",
                content=question,
                resolved_question=resolution.resolved,
                template_id=classification.template_id,
                slots=slot_values
            ))
        return TemplateMatchResult(
            matched=True,
            template_id=classification.template_id,
            confidence=classification.confidence,
            slots=slot_values,
            sparql=query,
            reasoning=classification.reasoning
        )
# =============================================================================
# FACTORY FUNCTION
# =============================================================================
# Module-level cache: building a pipeline constructs five DSPy sub-modules
# and re-loads the template catalog, so the instance is shared (mirroring
# the get_template_embedding_matcher singleton above).
_template_pipeline: Optional[TemplateSPARQLPipeline] = None


def get_template_pipeline() -> TemplateSPARQLPipeline:
    """Get or create template SPARQL pipeline instance.

    Returns:
        The shared TemplateSPARQLPipeline, constructed on first call and
        reused afterwards.
    """
    global _template_pipeline
    if _template_pipeline is None:
        _template_pipeline = TemplateSPARQLPipeline()
    return _template_pipeline