# RAG Integration: SPARQL-First Retrieval Flow

## Overview

This document describes how template-based SPARQL generation integrates with the existing Heritage RAG system in `dspy_heritage_rag.py`. The key principle is **SPARQL-first retrieval**: use precise SPARQL queries to filter structured data before falling back to semantic vector search.

## Current RAG Architecture

The existing `HeritageRAG` class in `backend/rag/dspy_heritage_rag.py` follows this flow:

```
User Question
      |
      v
+----------------------+
| HeritageQueryRouter  | <-- Classifies intent, entity_type, sources
+----------------------+
      |
      v
+--------------------------+
| HeritageSPARQLGenerator  | <-- LLM generates SPARQL (PROBLEMATIC)
+--------------------------+
      |
      v
+----------------------+
|  execute_sparql()    | <-- Execute against Oxigraph endpoint
+----------------------+
      |
      v
+----------------------+
| Qdrant Vector Search | <-- Semantic search for context enrichment
+----------------------+
      |
      v
+--------------------------+
| HeritageAnswerGenerator  | <-- Synthesize final answer
+--------------------------+
```

### Problem with Current Flow

The `HeritageSPARQLGenerator` (lines 344-478) uses an LLM to generate SPARQL queries. Despite extensive docstrings with examples, this produces:

1. **Syntax errors** - Orphaned punctuation, missing prefixes
2. **Invalid predicates** - Uses CIDOC-CRM properties not in our ontology
3. **Inconsistent results** - The same question generates different queries

## Enhanced Flow with Template-Based SPARQL

```
User Question
      |
      v
+----------------------+
| HeritageQueryRouter  | <-- Classifies intent, extracts entities
+----------------------+
      |
      v
+----------------------+
|  TemplateClassifier  | <-- NEW: DSPy module to select template
+----------------------+
      |
      +---> Template Found?
      |                     |
     Yes                    No
      |                     |
      v                     v
+--------------+   +--------------------------+
|   Template   |   | HeritageSPARQLGenerator  | <-- Fallback to LLM
|    Router    |   +--------------------------+
+--------------+                |
      |                         |
      v                         |
+--------------+                |
|     Slot     |                |
|    Filler    |                |
+--------------+                |
      |                         |
      v                         |
+--------------+                |
|   Template   |                |
|  Instantiate |                |
+--------------+                |
      |                         |
      +------------+------------+
                   |
                   v
      +----------------------+
      |   SPARQL Validator   | <-- Validate before execution
      +----------------------+
                   |
                   v
      +----------------------+
      |   execute_sparql()   | <-- Execute validated query
      +----------------------+
                   |
                   v
      +----------------------+
      | Qdrant Filter/Enrich | <-- Use SPARQL results to filter Qdrant
      +----------------------+
                   |
                   v
      +--------------------------+
      | HeritageAnswerGenerator  |
      +--------------------------+
```

## Integration Points

### 1. HeritageQueryRouter Enhancement

The existing `HeritageQueryRouter` (lines 1442-1594) already extracts:

- `intent`: geographic, statistical, relational, etc.
- `entities`: Named entities in the question
- `entity_type`: person, institution, or both
- `target_custodian_type`: MUSEUM, LIBRARY, ARCHIVE, etc.

We add three new fields for template routing:

```python
class HeritageQueryRouter(dspy.Module):
    def forward(
        self,
        question: str,
        language: str = "nl",
        history: History | None = None,
    ) -> Prediction:
        # ... existing classification, producing `result` ...

        # NEW: Attempt template classification
        template_match = self.template_classifier.classify(
            question=question,
            intent=result.intent,
            entities=result.entities,
            custodian_type=result.target_custodian_type,
        )

        return Prediction(
            # ... existing fields ...
            template_id=template_match.template_id,          # NEW
            template_slots=template_match.slots,             # NEW
            template_confidence=template_match.confidence,   # NEW
        )
```

### 2. New TemplateClassifier Module

```python
class TemplateClassifier(dspy.Module):
    """Classify a question to a pre-defined SPARQL template.

    Uses a DSPy Signature with Pydantic output for structured classification.
    This module is optimizable with GEPA alongside the existing modules.
    """

    def __init__(self, templates_path: str = "data/sparql_templates.yaml"):
        super().__init__()
        self.templates = self._load_templates(templates_path)
        self.classifier = dspy.ChainOfThought(ClassifyTemplateSignature)

    def forward(
        self,
        question: str,
        intent: str,
        entities: list[str],
        custodian_type: str | None = None,
    ) -> Prediction:
        """Classify question to template with slot values."""
        # Build template descriptions for the classifier
        template_descriptions = self._format_template_descriptions()

        result = self.classifier(
            question=question,
            intent=intent,
            entities=entities,
            custodian_type=custodian_type or "UNKNOWN",
            available_templates=template_descriptions,
        )

        return Prediction(
            template_id=result.template_id,
            slots=result.slots,
            confidence=result.confidence,
            reasoning=result.reasoning,
        )
```

### 3. SPARQL-First Retrieval Pattern

The key integration pattern is using SPARQL results to **filter** the Qdrant vector search:

```python
async def retrieve_with_sparql_filter(
    self,
    question: str,
    routing: Prediction,
    language: str = "nl",
) -> list[dict]:
    """SPARQL-first retrieval with Qdrant enrichment.

    1. Execute SPARQL query (template or LLM-generated)
    2. Extract entity URIs/IDs from results
    3. Use URIs to filter Qdrant search
    4. Combine structured + semantic results
    """
    # Step 1: Generate and execute SPARQL
    if (
        routing.template_id
        and routing.template_confidence >= TEMPLATE_CONFIDENCE_THRESHOLD
    ):
        # Use template-based SPARQL (confidence meets the 0.7 threshold)
        sparql = self.template_instantiator.instantiate(
            template_id=routing.template_id,
            slots=routing.template_slots,
        )
    else:
        # Fallback to LLM-generated SPARQL
        sparql_result = self.sparql_generator(
            question=question,
            intent=routing.intent,
            entities=routing.entities,
        )
        sparql = sparql_result.sparql

    # Step 2: Validate and execute SPARQL
    is_valid, errors = validate_sparql(sparql)
    if not is_valid:
        logger.warning(f"SPARQL validation failed: {errors}")
        # Fall back to pure vector search
        return await self.qdrant_retriever.search(question, k=10)

    sparql_results = await self.execute_sparql(sparql)

    # Step 3: Extract entity identifiers for Qdrant filtering
    entity_uris = [r.get("institution") or r.get("s") for r in sparql_results]
    ghcids = [self._uri_to_ghcid(uri) for uri in entity_uris if uri]

    # Step 4: Qdrant search with SPARQL-derived filters
    if ghcids:
        # Filter Qdrant to only these entities
        qdrant_results = await self.qdrant_retriever.search(
            query=question,
            k=20,
            filter={"ghcid": {"$in": ghcids}},
        )
    else:
        # SPARQL returned no results; use semantic search
        qdrant_results = await self.qdrant_retriever.search(question, k=10)

    # Step 5: Merge and deduplicate
    return self._merge_results(sparql_results, qdrant_results)
```

### 4. Integration with execute_sparql()

The existing `execute_sparql()` method (around line 1945, in the tool definitions) needs enhancement:

```python
async def execute_sparql(
    self,
    sparql_query: str,
    validate: bool = True,
    auto_correct: bool = True,
) -> list[dict]:
    """Execute SPARQL query with validation and auto-correction.
    Args:
        sparql_query: SPARQL query string
        validate: Whether to validate before execution
        auto_correct: Whether to attempt auto-correction on syntax errors

    Returns:
        List of result bindings as dictionaries
    """
    # Import auto-correction from the existing module
    from glam_extractor.api.sparql_linter import auto_correct_sparql, validate_sparql

    # Step 1: Validate
    if validate:
        is_valid, errors = validate_sparql(sparql_query)
        if not is_valid:
            if auto_correct:
                sparql_query, was_modified = auto_correct_sparql(sparql_query)
                if was_modified:
                    logger.info("SPARQL auto-corrected")
                # Re-validate after correction
                is_valid, errors = validate_sparql(sparql_query)
                if not is_valid:
                    raise ValueError(f"SPARQL still invalid after correction: {errors}")
            else:
                raise ValueError(f"Invalid SPARQL: {errors}")

    # Step 2: Execute
    try:
        response = await self.http_client.post(
            self.sparql_endpoint,
            data={"query": sparql_query},
            headers={"Accept": "application/sparql-results+json"},
            timeout=30.0,
        )
        response.raise_for_status()

        data = response.json()
        bindings = data.get("results", {}).get("bindings", [])

        # Convert to simple dicts
        return [
            {k: v.get("value") for k, v in binding.items()}
            for binding in bindings
        ]
    except httpx.HTTPStatusError as e:
        logger.error(f"SPARQL execution failed: {e.response.status_code}")
        if e.response.status_code == 400:
            # Query syntax error - log the query for debugging
            logger.error(f"Query: {sparql_query}")
        raise
```

## Fallback Behavior

When template-based SPARQL fails or returns no results:

```python
class HeritageRAG(dspy.Module):
    async def answer(self, question: str, language: str = "nl") -> Prediction:
        # Route the question
        routing = self.router(question=question, language=language)

        # Try template-based retrieval first
        results = []
        sparql_used = None

        if routing.template_id:
            try:
                sparql_used = self.instantiate_template(routing)
                results = await self.execute_sparql(sparql_used)
            except Exception as e:
                logger.warning(f"Template SPARQL failed: {e}")
                # Fall through to LLM generation

        # Fallback 1: LLM-generated SPARQL
        if not results:
            try:
                sparql_result = self.sparql_generator(
                    question=question,
                    intent=routing.intent,
                    entities=routing.entities,
                )
                sparql_used = sparql_result.sparql
                results = await self.execute_sparql(sparql_used)
            except Exception as e:
                logger.warning(f"LLM SPARQL failed: {e}")

        # Fallback 2: Pure vector search
        if not results:
            logger.info("Falling back to pure vector search")
            results = await self.qdrant_retriever.search(question, k=10)
            sparql_used = None

        # Generate answer
        context = self._format_results(results)
        answer = self.answer_generator(
            question=question,
            context=context,
            sources=["sparql" if sparql_used else "qdrant"],
            language=language,
        )

        return Prediction(
            answer=answer.answer,
            citations=answer.citations,
            sparql_query=sparql_used,
            result_count=len(results),
        )
```

## Template Confidence Thresholds

| Confidence | Behavior |
|------------|----------|
| > 0.9 | Use template directly, skip LLM |
| 0.7 - 0.9 | Use template, validate with LLM |
| 0.5 - 0.7 | Use template but prepare LLM fallback |
| < 0.5 | Skip template, use LLM directly |

```python
TEMPLATE_CONFIDENCE_THRESHOLD = 0.7

def should_use_template(confidence: float) -> bool:
    return confidence >= TEMPLATE_CONFIDENCE_THRESHOLD
```

Note that `should_use_template` implements only the hard 0.7 cut-off; the softer 0.5-0.7 behavior in the table relies on the retrieval flow always keeping the LLM path available as a fallback.

## Qdrant Collection Integration

For person queries (`entity_type='person'`), the flow integrates with the `heritage_persons` collection:

```python
async def retrieve_persons_with_sparql(
    self,
    question: str,
    routing: Prediction,
) -> list[dict]:
    """Retrieve person records using a SPARQL + Qdrant hybrid.

    SPARQL queries the RDF graph for schema:Person records.
    Qdrant filters by custodian_slug if extracted from routing.
    """
    # Generate person-specific SPARQL
    if routing.entity_type == "person":
        sparql = self.person_sparql_generator(
            question=question,
            intent=routing.intent,
            entities=routing.entities,
        )

        # Execute SPARQL to get person names/roles
        sparql_results = await self.execute_sparql(sparql.sparql)

        # Use results to filter Qdrant
        if routing.target_custodian_slug:
            # Filter by institution
            qdrant_results = await self.qdrant_retriever.search_persons(
                query=question,
                filter_custodian=routing.target_custodian_slug,
                k=20,
            )
        else:
            # Use SPARQL-derived names to post-filter the semantic hits
            names = {r.get("name") for r in sparql_results if r.get("name")}
            qdrant_results = await self.qdrant_retriever.search_persons(
                query=question,
                k=20,
            )
            if names:
                qdrant_results = [
                    r for r in qdrant_results if r.get("name") in names
                ]

        return self._merge_person_results(sparql_results, qdrant_results)
```

## Metrics and Monitoring

Track template vs LLM query performance:

```python
@dataclass
class QueryMetrics:
    """Metrics for SPARQL query execution."""

    template_used: bool
    template_id: str | None
    confidence: float
    sparql_valid: bool
    execution_time_ms: float
    result_count: int
    fallback_used: bool
    error: str | None = None


async def execute_with_metrics(
    self,
    sparql: str,
    template_id: str | None = None,
    confidence: float = 0.0,
) -> tuple[list[dict], QueryMetrics]:
    """Execute SPARQL and collect metrics."""
    start = time.perf_counter()
    metrics = QueryMetrics(
        template_used=template_id is not None,
        template_id=template_id,
        confidence=confidence,
        sparql_valid=True,
        execution_time_ms=0,
        result_count=0,
        fallback_used=False,
    )

    try:
        results = await self.execute_sparql(sparql)
        metrics.result_count = len(results)
    except Exception as e:
        metrics.sparql_valid = False
        metrics.error = str(e)
        results = []

    metrics.execution_time_ms = (time.perf_counter() - start) * 1000

    # Log for monitoring
    logger.info(
        f"SPARQL {'template' if metrics.template_used else 'LLM'} "
        f"[{template_id or 'generated'}] "
        f"valid={metrics.sparql_valid} "
        f"results={metrics.result_count} "
        f"time={metrics.execution_time_ms:.1f}ms"
    )

    return results, metrics
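# --- Sketch (not in the original module): rolling a batch of QueryMetrics
# records up into the ratios a monitoring dashboard would chart. The name
# `summarize_metrics` and the output keys are illustrative assumptions.
# Accepts any records exposing .template_used, .sparql_valid, and
# .execution_time_ms attributes.
def summarize_metrics(history: list) -> dict:
    """Aggregate per-query metrics into dashboard-ready numbers."""
    total = len(history)
    if total == 0:
        return {"total": 0, "template_share": 0.0,
                "valid_share": 0.0, "avg_time_ms": 0.0}
    return {
        "total": total,
        # Fraction of queries answered from a template rather than the LLM
        "template_share": sum(1 for m in history if m.template_used) / total,
        # Fraction of queries whose SPARQL executed without error
        "valid_share": sum(1 for m in history if m.sparql_valid) / total,
        # Mean end-to-end execution latency
        "avg_time_ms": sum(m.execution_time_ms for m in history) / total,
    }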
```

## File Changes Required

| File | Changes |
|------|---------|
| `backend/rag/dspy_heritage_rag.py` | Add TemplateClassifier, modify HeritageQueryRouter |
| `backend/rag/template_sparql.py` | NEW: Template loading, instantiation, validation |
| `backend/rag/sparql_templates.yaml` | NEW: Template definitions |
| `src/glam_extractor/api/sparql_linter.py` | Enhance validation for template output |
| `data/validation/sparql_validation_rules.json` | Already exists; source for slot values |

## Testing Integration

```python
# tests/rag/test_template_integration.py

@pytest.mark.asyncio
async def test_template_sparql_execution():
    """Test that template SPARQL executes successfully."""
    rag = HeritageRAG()

    # Question that should match a template
    result = await rag.answer(
        question="Welke archieven zijn er in Drenthe?",  # "Which archives are there in Drenthe?"
        language="nl",
    )

    assert result.sparql_query is not None
    assert "NL-DR" in result.sparql_query
    assert result.result_count > 0


@pytest.mark.asyncio
async def test_template_fallback_to_llm():
    """Test fallback to LLM for unmatched questions."""
    rag = HeritageRAG()

    # Question unlikely to match any template
    result = await rag.answer(
        question="Wat is de relatie tussen het Rijksmuseum en Vincent van Gogh?",  # "What is the relationship between the Rijksmuseum and Vincent van Gogh?"
        language="nl",
    )

    # Should still produce an answer via the LLM fallback
    assert result.answer is not None
```

## Summary

The template-based SPARQL integration follows these principles:

1. **SPARQL-first**: Try precise structured queries before semantic search
2. **Graceful degradation**: Template → LLM SPARQL → Vector search
3. **DSPy compatibility**: The template classifier is a DSPy module, optimizable with GEPA
4. **Metrics-driven**: Track template vs LLM performance for continuous improvement
5. **Hybrid results**: Merge SPARQL + Qdrant for comprehensive answers
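The retrieval sketches above call a `_merge_results` helper that is never defined. A minimal standalone version might look like the following, under the assumptions (not confirmed by the codebase) that both SPARQL rows and Qdrant payloads are plain dicts and that `ghcid` (falling back to the raw `institution`/`s` URI) is a usable deduplication key:

```python
# Sketch of the _merge_results behavior: structured hits first, semantic
# hits appended, duplicates (same identifier) dropped. The function name,
# the "ghcid" key, and the URI fallbacks are illustrative assumptions.
def merge_results(
    sparql_results: list[dict],
    qdrant_results: list[dict],
    key: str = "ghcid",
) -> list[dict]:
    """Combine SPARQL and Qdrant hits, keeping first occurrence per id."""
    merged: list[dict] = []
    seen = set()
    for row in sparql_results + qdrant_results:
        ident = row.get(key) or row.get("institution") or row.get("s")
        if ident is not None and ident in seen:
            continue  # already covered by an earlier (SPARQL) row
        if ident is not None:
            seen.add(ident)
        merged.append(row)  # rows without any identifier are kept as-is
    return merged
```

Because SPARQL rows are iterated first, a structured hit wins any deduplication tie against a semantic hit for the same entity, which matches the SPARQL-first principle.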