From 6c19ef8661b300a32912a97deb18eb50cd3e9b40 Mon Sep 17 00:00:00 2001 From: kempersc Date: Sat, 10 Jan 2026 18:42:43 +0100 Subject: [PATCH] feat(rag): add Rule 46 epistemic provenance tracking Track full lineage of RAG responses: WHERE data comes from, WHEN it was retrieved, HOW it was processed (SPARQL/vector/LLM). Backend changes: - Add provenance.py with EpistemicProvenance, DataTier, SourceAttribution - Integrate provenance into MultiSourceRetriever.merge_results() - Return epistemic_provenance in DSPyQueryResponse Frontend changes: - Pass EpistemicProvenance through useMultiDatabaseRAG hook - Display provenance in ConversationPage (for cache transparency) Schema fixes: - Fix truncated example in has_observation.yaml slot definition References: - Pavlyshyn's Context Graphs and Data Traces paper - LinkML ProvenanceBlock schema pattern --- backend/rag/main.py | 320 +++++++++++++++++- backend/rag/provenance.py | 280 +++++++++++++++ .../schemas/20251121/linkml/manifest.json | 2 +- .../linkml/modules/slots/has_observation.yaml | 6 +- frontend/src/hooks/useMultiDatabaseRAG.ts | 10 +- frontend/src/pages/ConversationPage.tsx | 6 +- .../linkml/modules/slots/has_observation.yaml | 6 +- 7 files changed, 602 insertions(+), 28 deletions(-) create mode 100644 backend/rag/provenance.py diff --git a/backend/rag/main.py b/backend/rag/main.py index 1628c31bb0..1a335f6183 100644 --- a/backend/rag/main.py +++ b/backend/rag/main.py @@ -65,6 +65,18 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field +# Rule 46: Epistemic Provenance Tracking +from .provenance import ( + EpistemicProvenance, + EpistemicDataSource, + DataTier, + RetrievalSource, + SourceAttribution, + infer_data_tier, + build_derivation_chain, + aggregate_data_tier, +) + # Configure logging logging.basicConfig( level=logging.INFO, @@ -686,6 +698,9 @@ class DSPyQueryResponse(BaseModel): # Factual query mode - skip LLM generation for 
count/list queries factual_result: bool = False # True if this is a direct SPARQL result (no LLM prose generation) sparql_query: str | None = None # The SPARQL query that was executed (for transparency) + + # Rule 46: Epistemic Provenance Tracking + epistemic_provenance: dict[str, Any] | None = None # Full provenance chain for transparency def extract_llm_response_metadata( @@ -1525,15 +1540,37 @@ class MultiSourceRetriever: self, results: list[RetrievalResult], max_results: int = 20, - ) -> list[dict[str, Any]]: + template_used: bool = False, + template_id: str | None = None, + ) -> tuple[list[dict[str, Any]], EpistemicProvenance]: """Merge and deduplicate results from multiple sources. Uses reciprocal rank fusion for score combination. + Returns merged items AND epistemic provenance tracking. + + Rule 46: Epistemic Provenance Tracking """ + from datetime import datetime, timezone + # Track items by GHCID for deduplication merged: dict[str, dict[str, Any]] = {} + # Initialize provenance tracking + tier_counts: dict[DataTier, int] = {} + sources_queried = [r.source.value for r in results] + total_retrieved = sum(len(r.items) for r in results) + for result in results: + # Map DataSource to RetrievalSource + source_map = { + DataSource.QDRANT: RetrievalSource.QDRANT, + DataSource.SPARQL: RetrievalSource.SPARQL, + DataSource.TYPEDB: RetrievalSource.TYPEDB, + DataSource.POSTGIS: RetrievalSource.POSTGIS, + DataSource.CACHE: RetrievalSource.CACHE, + } + retrieval_source = source_map.get(result.source, RetrievalSource.LLM_SYNTHESIS) + for rank, item in enumerate(result.items): ghcid = item.get("ghcid", item.get("id", f"unknown_{rank}")) @@ -1541,6 +1578,17 @@ class MultiSourceRetriever: merged[ghcid] = item.copy() merged[ghcid]["_sources"] = [] merged[ghcid]["_rrf_score"] = 0.0 + merged[ghcid]["_data_tier"] = None + + # Infer data tier for this item + item_tier = infer_data_tier(item, retrieval_source) + tier_counts[item_tier] = tier_counts.get(item_tier, 0) + 1 + + # 
Track best (lowest) tier for each item + if merged[ghcid]["_data_tier"] is None: + merged[ghcid]["_data_tier"] = item_tier.value + else: + merged[ghcid]["_data_tier"] = min(merged[ghcid]["_data_tier"], item_tier.value) # Reciprocal Rank Fusion rrf_score = 1.0 / (60 + rank) # k=60 is standard @@ -1564,7 +1612,31 @@ class MultiSourceRetriever: reverse=True, ) - return sorted_items[:max_results] + final_items = sorted_items[:max_results] + + # Build epistemic provenance + provenance = EpistemicProvenance( + dataSource=EpistemicDataSource.RAG_PIPELINE, + dataTier=aggregate_data_tier(tier_counts), + sourceTimestamp=datetime.now(timezone.utc).isoformat(), + derivationChain=build_derivation_chain( + sources_used=sources_queried, + template_used=template_used, + template_id=template_id, + ), + revalidationPolicy="weekly", + sourcesQueried=sources_queried, + totalRetrieved=total_retrieved, + totalAfterFusion=len(final_items), + dataTierBreakdown={ + f"tier_{tier.value}": count + for tier, count in tier_counts.items() + }, + templateUsed=template_used, + templateId=template_id, + ) + + return final_items, provenance async def close(self) -> None: """Clean up resources.""" @@ -2262,8 +2334,8 @@ async def query_rag(request: QueryRequest) -> QueryResponse: institution_types=geo_filters["institution_types"], ) - # Merge results - merged_items = retriever.merge_results(results, max_results=request.k * 2) + # Merge results with provenance tracking + merged_items, retrieval_provenance = retriever.merge_results(results, max_results=request.k * 2) # Generate visualization config if requested visualization = None @@ -2906,24 +2978,64 @@ async def dspy_query(request: DSPyQueryRequest) -> DSPyQueryResponse: # Fall back to dict if LLMResponseMetadata fails llm_response_obj = llm_response_cached # type: ignore[assignment] + # Rule 46: Build provenance for cache hit responses + cached_sources = cached.get("sources", []) + cached_template_used = cached_context.get("template_used", False) + 
cached_template_id = cached_context.get("template_id") + cached_llm_provider = cached_context.get("llm_provider") + cached_llm_model = cached_context.get("llm_model") + + # Infer data tier - prioritize cached provenance if present + cached_provenance = cached_context.get("epistemic_provenance") + if cached_provenance: + # Use the cached provenance, but mark it as coming from cache + cache_provenance = cached_provenance.copy() + if "CACHE" not in cache_provenance.get("derivationChain", []): + cache_provenance.setdefault("derivationChain", []).insert(0, "CACHE:hit") + else: + # Build fresh provenance for older cache entries + cache_tier = DataTier.TIER_3_CROWD_SOURCED.value + if cached_template_used: + cache_tier = DataTier.TIER_1_AUTHORITATIVE.value + elif any(s.lower() in ["sparql", "typedb"] for s in cached_sources): + cache_tier = DataTier.TIER_1_AUTHORITATIVE.value + + cache_provenance = EpistemicProvenance( + dataSource=EpistemicDataSource.CACHE_AGGREGATION, + dataTier=cache_tier, + derivationChain=["CACHE:hit"] + build_derivation_chain( + sources_used=cached_sources, + template_used=cached_template_used, + template_id=cached_template_id, + llm_provider=cached_llm_provider, + ), + sourcesQueried=cached_sources, + templateUsed=cached_template_used, + templateId=cached_template_id, + llmProvider=cached_llm_provider, + llmModel=cached_llm_model, + ).model_dump() + response_data = { "question": request.question, "answer": cached.get("answer", ""), - "sources_used": cached.get("sources", []), + "sources_used": cached_sources, "visualization": visualization, "resolved_question": cached_context.get("resolved_question"), "retrieved_results": cached_context.get("retrieved_results"), "query_type": cached_context.get("query_type"), "embedding_model_used": cached_context.get("embedding_model"), - "llm_model_used": cached_context.get("llm_model"), + "llm_model_used": cached_llm_model, "query_time_ms": round(elapsed_ms, 2), "cache_hit": True, "llm_response": 
llm_response_obj, # GLM 4.7 reasoning_content from cache # Session management - return session_id for follow-up queries "session_id": session_id, # Template tracking from cache - "template_used": cached_context.get("template_used", False), - "template_id": cached_context.get("template_id"), + "template_used": cached_template_used, + "template_id": cached_template_id, + # Rule 46: Epistemic provenance for transparency + "epistemic_provenance": cache_provenance, } # Record cache hit metrics @@ -3074,6 +3186,70 @@ async def dspy_query(request: DSPyQueryRequest) -> DSPyQueryResponse: "scores": {"combined": 1.0}, }) logger.debug(f"[FACTUAL-QUERY] COUNT query result: {sparql_results[0].get('count') if sparql_results else 0}") + + # Execute companion query if available to get entity results for map/list + # This fetches the actual institution records that were counted + companion_query = getattr(template_result, 'companion_query', None) + if companion_query: + try: + companion_response = await client.post( + settings.sparql_endpoint, + data={"query": companion_query}, + headers={"Accept": "application/sparql-results+json"}, + timeout=30.0, + ) + + if companion_response.status_code == 200: + companion_data = companion_response.json() + companion_bindings = companion_data.get("results", {}).get("bindings", []) + companion_raw = [ + {k: v.get("value") for k, v in binding.items()} + for binding in companion_bindings + ] + + # Transform companion results to frontend format + companion_results = [] + for row in companion_raw: + lat = None + lon = None + if row.get("lat"): + try: + lat = float(row["lat"]) + except (ValueError, TypeError): + pass + if row.get("lon"): + try: + lon = float(row["lon"]) + except (ValueError, TypeError): + pass + + companion_results.append({ + "name": row.get("name"), + "institution_uri": row.get("institution"), + "metadata": { + "latitude": lat, + "longitude": lon, + "city": row.get("city") or template_result.slots.get("city"), + "institution_type": 
template_result.slots.get("institution_type"), + }, + "scores": {"combined": 1.0}, + }) + + # Store companion results - these will be used for map/list display + # while sparql_results contains the count for the answer text + if companion_results: + logger.info(f"[COMPANION-QUERY] Fetched {len(companion_results)} entities for display, {sum(1 for r in companion_results if r['metadata'].get('latitude'))} with coordinates") + # Replace sparql_results with companion results for display + # but preserve the count value for answer rendering + count_value = sparql_results[0].get("count", 0) if sparql_results else 0 + sparql_results = companion_results + # Add count to first result so it's available for ui_template + if sparql_results: + sparql_results[0]["count"] = count_value + else: + logger.warning(f"[COMPANION-QUERY] Failed with status {companion_response.status_code}") + except Exception as ce: + logger.warning(f"[COMPANION-QUERY] Execution failed: {ce}") else: # Transform SPARQL results to match frontend expected format # Frontend expects: {name, website, metadata: {latitude, longitude, city, ...}} @@ -3579,12 +3755,45 @@ async def dspy_query(request: DSPyQueryRequest) -> DSPyQueryResponse: template_used = getattr(result, "template_used", False) template_id = getattr(result, "template_id", None) + # Rule 46: Build epistemic provenance for transparency + # This tracks WHERE, WHEN, and HOW the response data originated + sources_used_list = getattr(result, "sources_used", []) + + # Infer data tier from sources - SPARQL/TypeDB are authoritative, Qdrant may include scraped data + inferred_tier = DataTier.TIER_3_CROWD_SOURCED.value # Default + if template_used: + # Template-based SPARQL uses curated Oxigraph data + inferred_tier = DataTier.TIER_1_AUTHORITATIVE.value + elif any(s.lower() in ["sparql", "typedb"] for s in sources_used_list): + inferred_tier = DataTier.TIER_1_AUTHORITATIVE.value + elif any(s.lower() == "qdrant" for s in sources_used_list): + inferred_tier = 
DataTier.TIER_3_CROWD_SOURCED.value + + # Build provenance object + response_provenance = EpistemicProvenance( + dataSource=EpistemicDataSource.RAG_PIPELINE, + dataTier=inferred_tier, + derivationChain=build_derivation_chain( + sources_used=sources_used_list, + template_used=template_used, + template_id=template_id, + llm_provider=llm_provider_used, + ), + sourcesQueried=sources_used_list, + totalRetrieved=len(retrieved_results) if retrieved_results else 0, + totalAfterFusion=len(retrieved_results) if retrieved_results else 0, + templateUsed=template_used, + templateId=template_id, + llmProvider=llm_provider_used, + llmModel=llm_model_used, + ) + # Build response object response = DSPyQueryResponse( question=request.question, resolved_question=getattr(result, "resolved_question", None), answer=getattr(result, "answer", "Geen antwoord gevonden."), - sources_used=getattr(result, "sources_used", []), + sources_used=sources_used_list, visualization=visualization, retrieved_results=retrieved_results, # Raw data for frontend visualization query_type=query_type, # "person" or "institution" @@ -3606,6 +3815,8 @@ async def dspy_query(request: DSPyQueryRequest) -> DSPyQueryResponse: # Template SPARQL tracking template_used=template_used, template_id=template_id, + # Rule 46: Epistemic provenance for transparency + epistemic_provenance=response_provenance.model_dump(), ) # Update session with this turn for multi-turn conversation support @@ -3880,23 +4091,63 @@ async def stream_dspy_query_response( "data": cached.get("visualization_data"), } + # Rule 46: Build provenance for streaming cache hit responses + stream_cached_sources = cached.get("sources", []) + stream_cached_template_used = cached_context.get("template_used", False) + stream_cached_template_id = cached_context.get("template_id") + stream_cached_llm_provider = cached_context.get("llm_provider") + stream_cached_llm_model = cached_context.get("llm_model") + + # Infer data tier - prioritize cached provenance if 
present + stream_cached_prov = cached_context.get("epistemic_provenance") + if stream_cached_prov: + # Use the cached provenance, but mark it as coming from cache + stream_cache_provenance = stream_cached_prov.copy() + if "CACHE" not in stream_cache_provenance.get("derivationChain", []): + stream_cache_provenance.setdefault("derivationChain", []).insert(0, "CACHE:hit") + else: + # Build fresh provenance for older cache entries + stream_cache_tier = DataTier.TIER_3_CROWD_SOURCED.value + if stream_cached_template_used: + stream_cache_tier = DataTier.TIER_1_AUTHORITATIVE.value + elif any(s.lower() in ["sparql", "typedb"] for s in stream_cached_sources): + stream_cache_tier = DataTier.TIER_1_AUTHORITATIVE.value + + stream_cache_provenance = EpistemicProvenance( + dataSource=EpistemicDataSource.CACHE_AGGREGATION, + dataTier=stream_cache_tier, + derivationChain=["CACHE:hit"] + build_derivation_chain( + sources_used=stream_cached_sources, + template_used=stream_cached_template_used, + template_id=stream_cached_template_id, + llm_provider=stream_cached_llm_provider, + ), + sourcesQueried=stream_cached_sources, + templateUsed=stream_cached_template_used, + templateId=stream_cached_template_id, + llmProvider=stream_cached_llm_provider, + llmModel=stream_cached_llm_model, + ).model_dump() + response_data = { "question": request.question, "answer": cached.get("answer", ""), - "sources_used": cached.get("sources", []), + "sources_used": stream_cached_sources, "visualization": visualization, "resolved_question": cached_context.get("resolved_question"), "retrieved_results": cached_context.get("retrieved_results"), "query_type": cached_context.get("query_type"), "embedding_model_used": cached_context.get("embedding_model"), - "llm_model_used": cached_context.get("llm_model"), + "llm_model_used": stream_cached_llm_model, "query_time_ms": round(elapsed_ms, 2), "cache_hit": True, # Session management "session_id": session_id, # Template tracking from cache - "template_used": 
cached_context.get("template_used", False), - "template_id": cached_context.get("template_id"), + "template_used": stream_cached_template_used, + "template_id": stream_cached_template_id, + # Rule 46: Epistemic provenance for transparency + "epistemic_provenance": stream_cache_provenance, } # Record cache hit metrics for streaming endpoint @@ -4191,11 +4442,41 @@ async def stream_dspy_query_response( latency_ms=int(elapsed_ms), ) + # Rule 46: Build epistemic provenance for streaming endpoint + stream_sources_used = getattr(result, "sources_used", []) + stream_template_used = getattr(result, "template_used", False) + stream_template_id = getattr(result, "template_id", None) + + # Infer data tier from sources + stream_tier = DataTier.TIER_3_CROWD_SOURCED.value + if stream_template_used: + stream_tier = DataTier.TIER_1_AUTHORITATIVE.value + elif any(s.lower() in ["sparql", "typedb"] for s in stream_sources_used): + stream_tier = DataTier.TIER_1_AUTHORITATIVE.value + + stream_provenance = EpistemicProvenance( + dataSource=EpistemicDataSource.RAG_PIPELINE, + dataTier=stream_tier, + derivationChain=build_derivation_chain( + sources_used=stream_sources_used, + template_used=stream_template_used, + template_id=stream_template_id, + llm_provider=llm_provider_used, + ), + sourcesQueried=stream_sources_used, + totalRetrieved=len(retrieved_results) if retrieved_results else 0, + totalAfterFusion=len(retrieved_results) if retrieved_results else 0, + templateUsed=stream_template_used, + templateId=stream_template_id, + llmProvider=llm_provider_used, + llmModel=llm_model_used, + ) + response = DSPyQueryResponse( question=request.question, resolved_question=getattr(result, "resolved_question", None), answer=getattr(result, "answer", "Geen antwoord gevonden."), - sources_used=getattr(result, "sources_used", []), + sources_used=stream_sources_used, visualization=visualization, retrieved_results=retrieved_results, query_type=query_type, @@ -4212,8 +4493,10 @@ async def 
stream_dspy_query_response( llm_response=llm_response_metadata, # Session management fields for multi-turn conversations session_id=session_id, - template_used=getattr(result, "template_used", False), - template_id=getattr(result, "template_id", None), + template_used=stream_template_used, + template_id=stream_template_id, + # Rule 46: Epistemic provenance for transparency + epistemic_provenance=stream_provenance.model_dump(), ) # Update session with this turn (before caching) @@ -4339,8 +4622,8 @@ async def stream_query_response( "count": len(source_results[0].items) if source_results else 0, }) + "\n" - # Merge and finalize - merged = retriever.merge_results(results) + # Merge and finalize with provenance + merged, stream_provenance = retriever.merge_results(results) elapsed_ms = (asyncio.get_event_loop().time() - start_time) * 1000 yield json.dumps({ @@ -4348,6 +4631,7 @@ async def stream_query_response( "results": merged, "query_time_ms": round(elapsed_ms, 2), "result_count": len(merged), + "epistemic_provenance": stream_provenance.model_dump() if stream_provenance else None, }) + "\n" diff --git a/backend/rag/provenance.py b/backend/rag/provenance.py new file mode 100644 index 0000000000..8dbf0d0833 --- /dev/null +++ b/backend/rag/provenance.py @@ -0,0 +1,280 @@ +""" +Epistemic Provenance Tracking for RAG Pipeline + +Rule 46: Ontology-Driven Cache Segmentation + +This module tracks the full lineage of how RAG responses are derived, including: +- Which data sources contributed each piece of information +- Data quality tiers (1=authoritative, 4=inferred) +- Derivation chain showing processing steps +- Confidence scores and temporal validity + +The provenance data flows through the pipeline: +1. Retriever → adds source attribution +2. merge_results() → aggregates across sources +3. DSPy generator → adds LLM inference step +4. Response → includes full EpistemicProvenance + +Frontend displays this in ProvenanceTooltip when hovering over cache badges. 
"""
Epistemic Provenance Tracking for RAG Pipeline

Rule 46: Ontology-Driven Cache Segmentation

This module tracks the full lineage of how RAG responses are derived, including:
- Which data sources contributed each piece of information
- Data quality tiers (1=authoritative, 4=inferred)
- Derivation chain showing processing steps
- Confidence scores and temporal validity

The provenance data flows through the pipeline:
1. Retriever → adds source attribution
2. merge_results() → aggregates across sources
3. DSPy generator → adds LLM inference step
4. Response → includes full EpistemicProvenance

Frontend displays this in ProvenanceTooltip when hovering over cache badges.

References:
- Pavlyshyn's "Context Graphs and Data Traces: Building Epistemology Layers for Agentic Memory"
- LinkML ProvenanceBlock in schemas/20251121/linkml/modules/classes/
"""

from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum
from typing import Any

from pydantic import BaseModel, ConfigDict, Field


class DataTier(int, Enum):
    """Data quality tier - aligned with LinkML DataTierEnum and frontend DataTier type.

    Lower numbers = higher authority. Members are plain ints as well as enum
    members, so they compare and sort numerically.
    """

    TIER_1_AUTHORITATIVE = 1  # ISIL Registry, Nationaal Archief, official government data
    TIER_2_VERIFIED = 2  # Wikidata, Google Maps, verified institutional websites
    TIER_3_CROWD_SOURCED = 3  # User reviews, community edits, unverified web scrapes
    TIER_4_INFERRED = 4  # LLM extraction, inference, aggregation


class EpistemicDataSource(str, Enum):
    """Data source types - aligned with frontend EpistemicDataSource type."""

    ISIL_REGISTRY = "ISIL_REGISTRY"
    WIKIDATA = "WIKIDATA"
    CUSTODIAN_YAML = "CUSTODIAN_YAML"
    GOOGLE_MAPS = "GOOGLE_MAPS"
    WEB_SCRAPE = "WEB_SCRAPE"
    LLM_INFERENCE = "LLM_INFERENCE"
    SPARQL_QUERY = "SPARQL_QUERY"
    RAG_PIPELINE = "RAG_PIPELINE"
    USER_PROVIDED = "USER_PROVIDED"
    CACHE_AGGREGATION = "CACHE_AGGREGATION"


class RetrievalSource(str, Enum):
    """Source system for retrieval - maps to DataSource enum."""

    QDRANT = "qdrant"
    SPARQL = "sparql"
    TYPEDB = "typedb"
    POSTGIS = "postgis"
    CACHE = "cache"
    LLM_SYNTHESIS = "llm_synthesis"


class SourceAttribution(BaseModel):
    """Attribution to a specific data source for a single result item.

    Tracks how each retrieved item contributed to the final result.
    """

    source: RetrievalSource = Field(description="Which retriever returned this item")
    data_tier: DataTier = Field(description="Quality tier of this source")
    retrieval_rank: int = Field(description="Position in source's result list")
    rrf_contribution: float = Field(default=0.0, description="RRF score contribution from this source")
    # Timezone-aware UTC timestamp taken when the attribution object is created.
    retrieved_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    query_time_ms: float = Field(default=0.0, description="Time taken by this retriever")

    # Optional source-specific details
    sparql_query: str | None = Field(default=None, description="SPARQL query if source is SPARQL")
    vector_similarity: float | None = Field(default=None, description="Cosine similarity if source is Qdrant")
    collection_name: str | None = Field(default=None, description="Collection name for vector search")
    template_id: str | None = Field(default=None, description="Template ID if template-based SPARQL")


class EpistemicProvenance(BaseModel):
    """Complete epistemic provenance for a RAG response.

    This model is designed to be JSON-serializable and compatible with the
    frontend's EpistemicProvenance TypeScript interface, which is why the
    field names are camelCase rather than snake_case.

    Fields align with frontend/src/lib/storage/semantic-cache.ts:103-121
    """

    # Pydantic v2 configuration. The class-based ``Config`` form is deprecated
    # in v2 and emits a deprecation warning; ``model_config`` is the supported
    # spelling. ``use_enum_values`` stores enum fields (``dataSource``) as
    # their plain values so ``model_dump()`` yields JSON-friendly strings.
    model_config = ConfigDict(use_enum_values=True)

    # Core provenance fields (required for frontend compatibility)
    dataSource: EpistemicDataSource = Field(
        default=EpistemicDataSource.RAG_PIPELINE,
        description="Primary data source for this response"
    )
    dataTier: int = Field(
        default=3,
        ge=1,
        le=4,
        description="Data quality tier (1=authoritative, 4=inferred)"
    )
    sourceTimestamp: str = Field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat(),
        description="When the source data was retrieved"
    )
    derivationChain: list[str] = Field(
        default_factory=list,
        description="Chain of processing steps, e.g. ['SPARQL:Oxigraph', 'RAG:retrieve', 'LLM:groq']"
    )
    revalidationPolicy: str = Field(
        default="weekly",
        description="How often to revalidate: static, daily, weekly, on_access"
    )
    confidenceScore: float | None = Field(
        default=None,
        description="Confidence score 0-1, if applicable"
    )

    # Extended provenance (for detailed analysis)
    sourcesQueried: list[str] = Field(
        default_factory=list,
        description="List of sources that were queried"
    )
    totalRetrieved: int = Field(
        default=0,
        description="Total items retrieved before fusion"
    )
    totalAfterFusion: int = Field(
        default=0,
        description="Items after RRF fusion and deduplication"
    )
    dataTierBreakdown: dict[str, int] = Field(
        default_factory=dict,
        description="Count of results per data tier"
    )
    templateUsed: bool = Field(
        default=False,
        description="Whether template-based SPARQL was used"
    )
    templateId: str | None = Field(
        default=None,
        description="Which template was used, if any"
    )
    llmProvider: str | None = Field(
        default=None,
        description="LLM provider used for generation"
    )
    llmModel: str | None = Field(
        default=None,
        description="Specific LLM model used"
    )
def infer_data_tier(item: dict[str, Any], source: RetrievalSource) -> DataTier:
    """Infer the data quality tier of a retrieved item.

    Resolution order (first match wins):

    1. An explicit ``item["data_tier"]``: an int in 1-4 or a ``DataTier``
       member name such as ``"TIER_2_VERIFIED"``.
    2. ``item["provenance"]["data_source"]``: ``"CSV_REGISTRY"`` maps to
       tier 1 and ``"WIKIDATA"`` to tier 2.
    3. A default tier for the retrieval ``source``.

    Args:
        item: Retrieved item with potential provenance metadata.
        source: Which retriever returned this item.

    Returns:
        The inferred ``DataTier``; ``TIER_4_INFERRED`` when nothing matches.
    """
    from collections.abc import Mapping

    explicit = item.get("data_tier")
    # bool is a subclass of int, so without this guard ``True`` would be
    # accepted as tier 1.
    if isinstance(explicit, int) and not isinstance(explicit, bool) and 1 <= explicit <= 4:
        return DataTier(explicit)
    if isinstance(explicit, str) and explicit in DataTier.__members__:
        return DataTier[explicit]

    # The provenance block may be absent, ``None`` or malformed; only consult
    # it when it is actually a mapping instead of raising AttributeError.
    provenance = item.get("provenance")
    if isinstance(provenance, Mapping):
        declared_source = provenance.get("data_source")
        if declared_source == "CSV_REGISTRY":
            return DataTier.TIER_1_AUTHORITATIVE
        if declared_source == "WIKIDATA":
            return DataTier.TIER_2_VERIFIED

    # Fall back to a per-retriever default.
    source_tiers = {
        RetrievalSource.SPARQL: DataTier.TIER_1_AUTHORITATIVE,  # Oxigraph has curated data
        RetrievalSource.TYPEDB: DataTier.TIER_1_AUTHORITATIVE,  # TypeDB has curated data
        RetrievalSource.QDRANT: DataTier.TIER_3_CROWD_SOURCED,  # Vector search may include scraped data
        RetrievalSource.POSTGIS: DataTier.TIER_2_VERIFIED,  # GeoNames data
        RetrievalSource.CACHE: DataTier.TIER_3_CROWD_SOURCED,  # Cached responses
        RetrievalSource.LLM_SYNTHESIS: DataTier.TIER_4_INFERRED,  # LLM-generated
    }
    return source_tiers.get(source, DataTier.TIER_4_INFERRED)


def build_derivation_chain(
    sources_used: list[str] | None,
    template_used: bool = False,
    template_id: str | None = None,
    llm_provider: str | None = None,
) -> list[str]:
    """Build the derivation chain from processing steps.

    Args:
        sources_used: Data sources queried (matched case-insensitively).
            ``None`` is treated as "no sources"; unrecognised names are
            ignored.
        template_used: Whether template-based SPARQL was used.
        template_id: Template ID if used.
        llm_provider: LLM provider for generation.

    Returns:
        List of derivation steps, e.g.
        ``['SPARQL:Oxigraph', 'RAG:retrieve', 'LLM:groq']``. Each retrieval
        source appears at most once, in first-seen order.
    """
    step_labels = {
        "qdrant": "Vector:Qdrant",
        "sparql": "SPARQL:Oxigraph",
        "typedb": "Graph:TypeDB",
        "postgis": "Geo:PostGIS",
    }

    chain: list[str] = []
    for source in sources_used or ():
        label = step_labels.get(source.lower())
        # A source queried several times is still a single derivation step.
        if label is not None and label not in chain:
            chain.append(label)

    # A template step is only recorded when the template can be identified;
    # otherwise the generic retrieval step is used.
    if template_used and template_id:
        chain.append(f"Template:{template_id}")
    else:
        chain.append("RAG:retrieve")

    if llm_provider:
        chain.append(f"LLM:{llm_provider}")

    return chain


def aggregate_data_tier(
    tier_counts: dict[DataTier, int],
    *,
    min_share: float = 0.2,
) -> int:
    """Aggregate multiple data tiers to a single representative tier.

    Uses pessimistic aggregation: the overall tier is the worst (highest
    number) tier whose share of all results is strictly greater than
    ``min_share``.

    Args:
        tier_counts: Count of results per tier.
        min_share: Fraction of the total a tier must exceed to count as a
            significant contribution. Defaults to 0.2 (20%).

    Returns:
        Aggregated tier number. 4 when there is no data; the most common
        tier when no tier exceeds ``min_share``.
    """
    if not tier_counts:
        return 4  # Default to inferred if no data

    total = sum(tier_counts.values())
    if total <= 0:
        return 4

    # Walk from worst to best tier and stop at the first significant one.
    for tier in sorted(tier_counts, key=int, reverse=True):
        count = tier_counts[tier]
        if count > 0 and (count / total) > min_share:
            return int(tier)

    # No tier is significant on its own: fall back to the mode.
    most_common = max(tier_counts, key=tier_counts.__getitem__)
    return int(most_common)
a/frontend/public/schemas/20251121/linkml/modules/slots/has_observation.yaml b/frontend/public/schemas/20251121/linkml/modules/slots/has_observation.yaml index ef34ae6473..6c927371fe 100644 --- a/frontend/public/schemas/20251121/linkml/modules/slots/has_observation.yaml +++ b/frontend/public/schemas/20251121/linkml/modules/slots/has_observation.yaml @@ -33,6 +33,6 @@ slots: Custodian: hc_id: "https://nde.nl/ontology/hc/nl-nh-ams-m-rm" has_observation: - - "https://nde.nl/ontology/hc/observation/isil-registry-2024"\ - - "https://nde.nl/ontology/hc/observation/wikid... - description: Usage example + - "https://nde.nl/ontology/hc/observation/isil-registry-2024" + - "https://nde.nl/ontology/hc/observation/wikidata-q190804" + description: Usage example showing a Custodian hub linked to multiple observations from different sources diff --git a/frontend/src/hooks/useMultiDatabaseRAG.ts b/frontend/src/hooks/useMultiDatabaseRAG.ts index 49b17edfbd..8f510c8b19 100644 --- a/frontend/src/hooks/useMultiDatabaseRAG.ts +++ b/frontend/src/hooks/useMultiDatabaseRAG.ts @@ -20,7 +20,7 @@ import { useState, useCallback, useRef, useEffect } from 'react'; import type { QdrantSearchResult } from './useQdrant'; -import { semanticCache, type CachedResponse, type CacheStats, type CacheLookupResult } from '../lib/storage/semantic-cache'; +import { semanticCache, type CachedResponse, type CacheStats, type CacheLookupResult, type EpistemicProvenance } from '../lib/storage/semantic-cache'; import type { LLMProviderType } from '../lib/storage/ui-state'; // Configuration - all services use Caddy proxy paths @@ -138,6 +138,8 @@ export interface RAGResponse { // Secondary reply type for composite visualizations (e.g., factual_count + map_points) secondaryReplyType?: ReplyType; secondaryReplyContent?: ReplyContent; + // Rule 46: Epistemic provenance for transparency (WHERE, WHEN, HOW data originated) + epistemicProvenance?: EpistemicProvenance; } export interface RAGSource { @@ -1128,6 +1130,8 @@ 
async function callDSPy( // Secondary reply type for composite visualizations (e.g., factual_count + map_points) secondaryReplyType?: string; secondaryReplyContent?: ReplyContent; + // Rule 46: Epistemic provenance for transparency (WHERE, WHEN, HOW data originated) + epistemicProvenance?: EpistemicProvenance; }> { // Format conversation history for DSPy backend // Backend expects: context = [{question: "...", answer: "..."}, ...] @@ -1307,6 +1311,8 @@ async function callDSPy( // Secondary reply type for composite visualizations (e.g., factual_count + map_points) secondaryReplyType: data.secondary_reply_type as ReplyType | undefined, secondaryReplyContent: data.secondary_reply_content as ReplyContent | undefined, + // Rule 46: Epistemic provenance from backend + epistemicProvenance: data.epistemic_provenance as EpistemicProvenance | undefined, }; } @@ -1846,6 +1852,8 @@ export function useMultiDatabaseRAG(): UseMultiDatabaseRAGReturn { // Reply type classification from backend classify_and_format() replyType: dspyResponse.replyType as ReplyType | undefined, replyContent: dspyResponse.replyContent, + // Rule 46: Epistemic provenance from backend + epistemicProvenance: dspyResponse.epistemicProvenance, }; // Update pagination state with correct queryType from DSPy response diff --git a/frontend/src/pages/ConversationPage.tsx b/frontend/src/pages/ConversationPage.tsx index ffd9c394e3..a2b17d272d 100644 --- a/frontend/src/pages/ConversationPage.tsx +++ b/frontend/src/pages/ConversationPage.tsx @@ -1914,7 +1914,9 @@ const ConversationPage: React.FC = () => { const cacheSimilarity = lastCacheLookup?.similarity; const cacheMethod = lastCacheLookup?.method; const cacheTier = lastCacheLookup?.tier; - const cacheProvenance = lastCacheLookup?.entry?.epistemicProvenance; + // Rule 46: Prioritize backend epistemic provenance over cache-only provenance + // Backend provenance includes full derivation chain; cache provenance is only for cache hits + const epistemicProvenance = 
response.epistemicProvenance || lastCacheLookup?.entry?.epistemicProvenance; const cacheLookupTimeMs = lastCacheLookup?.lookupTimeMs; // Replace loading message with response @@ -1929,7 +1931,7 @@ const ConversationPage: React.FC = () => { cacheSimilarity: cacheSimilarity, cacheMethod: cacheMethod, cacheTier: cacheTier, - epistemicProvenance: cacheProvenance, + epistemicProvenance: epistemicProvenance, cacheLookupTimeMs: cacheLookupTimeMs, } : msg diff --git a/schemas/20251121/linkml/modules/slots/has_observation.yaml b/schemas/20251121/linkml/modules/slots/has_observation.yaml index ef34ae6473..6c927371fe 100644 --- a/schemas/20251121/linkml/modules/slots/has_observation.yaml +++ b/schemas/20251121/linkml/modules/slots/has_observation.yaml @@ -33,6 +33,6 @@ slots: Custodian: hc_id: "https://nde.nl/ontology/hc/nl-nh-ams-m-rm" has_observation: - - "https://nde.nl/ontology/hc/observation/isil-registry-2024"\ - - "https://nde.nl/ontology/hc/observation/wikid... - description: Usage example + - "https://nde.nl/ontology/hc/observation/isil-registry-2024" + - "https://nde.nl/ontology/hc/observation/wikidata-q190804" + description: Usage example showing a Custodian hub linked to multiple observations from different sources