"""
Atomic Query Decomposer for Heritage RAG Semantic Caching

Decomposes complex heritage queries into atomic sub-tasks for higher cache hit
rates. Based on research showing 40-70% cache hit rates with atomic
decomposition vs 5-15% for full queries.

Architecture:
1. Parse query into intent + entities + constraints
2. Generate cacheable sub-task keys
3. Check cache for each sub-task independently
4. Merge cached sub-results with fresh retrievals
5. Cache new sub-results for future queries

Example decomposition:
    "Hoeveel musea in Amsterdam hebben een ISIL code?"
    ↓
    ├── IntentTask(intent="statistical", entity_type="MUSEUM")
    ├── LocationTask(location="Amsterdam", level="city")
    ├── FilterTask(filter="has_isil", value=True)
    └── AggregateTask(aggregation="count")

Each sub-task can be cached independently:
- Statistical queries for "MUSEUM" entities → cacheable
- Location filtering for "Amsterdam" → cacheable
- ISIL filter application → cacheable
"""

from __future__ import annotations

import hashlib
import logging
import re
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

logger = logging.getLogger(__name__)

# Names treated as provinces / regions by ``_infer_location_level``. Anything
# else that matched a location pattern is reported as a city.
_PROVINCES = frozenset({
    "noord-holland", "zuid-holland", "utrecht", "gelderland",
    "brabant", "limburg", "overijssel", "friesland",
    "groningen", "drenthe", "flevoland", "zeeland",
})
_REGIONS = frozenset({"randstad", "veluwe", "achterhoek", "twente"})

# Whole-word markers that turn an identifier mention into an existence filter
# ("has an ISIL code" / "hebben een ISIL code"). Word boundaries keep words
# such as "purchase" or "phase" from triggering it.
_EXISTENCE_RE = re.compile(r"\b(?:has|have|heeft|hebben)\b", re.IGNORECASE)

# Ordered aggregation rules: the first rule whose pattern matches wins.
# Keywords are matched as whole words so that e.g. "ministerie" is not read
# as "min" and "almost" is not read as "most".
_AGGREGATION_RULES: tuple[tuple[str, re.Pattern[str]], ...] = (
    ("average", re.compile(r"\b(?:gemiddeld\w*|average\w*)\b", re.IGNORECASE)),
    ("sum", re.compile(r"\b(?:totaal|totale|totals?|sum)\b", re.IGNORECASE)),
    (
        "max",
        re.compile(
            r"\b(?:max|maxima|maximum|maximaal|maximale|maximal|meest|meeste|most)\b",
            re.IGNORECASE,
        ),
    ),
    (
        "min",
        re.compile(
            r"\b(?:min|minima|minimum|minimaal|minimale|minimal|minst|minste|least)\b",
            re.IGNORECASE,
        ),
    ),
)


class SubTaskType(str, Enum):
    """Types of atomic sub-tasks for heritage queries."""

    INTENT_CLASSIFICATION = "intent"
    ENTITY_EXTRACTION = "entity"
    LOCATION_FILTER = "location"
    TYPE_FILTER = "type"
    TEMPORAL_FILTER = "temporal"
    IDENTIFIER_FILTER = "identifier"
    AGGREGATION = "aggregate"
    RELATIONSHIP = "relation"
    COMPARISON = "compare"


@dataclass
class AtomicSubTask:
    """A single atomic sub-task that can be cached independently."""

    task_type: SubTaskType
    task_key: str  # Unique key for cache lookup
    parameters: dict[str, Any]

    # Dependencies on other sub-tasks
    depends_on: list[str] = field(default_factory=list)

    # Cached result (if available)
    cached_result: Any = None
    cache_hit: bool = False

    def __hash__(self) -> int:
        # Hash on the key only; the dataclass-generated __eq__ still compares
        # every field, so equal tasks always share a hash.
        return hash(self.task_key)

    def to_cache_key(self) -> str:
        """Generate deterministic cache key for this sub-task.

        Returns:
            The first 16 hex characters of the SHA-256 digest of
            ``"<task type value>:<k=v pairs sorted by key, joined by &>"``.
        """
        params_str = "&".join(
            f"{k}={v}" for k, v in sorted(self.parameters.items())
        )
        key_str = f"{self.task_type.value}:{params_str}"
        return hashlib.sha256(key_str.encode()).hexdigest()[:16]


@dataclass
class DecomposedQuery:
    """A query decomposed into atomic sub-tasks."""

    original_query: str
    language: str
    sub_tasks: list[AtomicSubTask]

    # Extracted components
    intent: str | None = None
    entities: list[str] = field(default_factory=list)
    location: str | None = None
    institution_type: str | None = None
    temporal_range: tuple[str, str] | None = None

    # Execution metadata
    fully_cached: bool = False
    partial_cache_hits: int = 0

    def get_cache_keys(self) -> list[str]:
        """Get all cache keys for sub-tasks (via ``AtomicSubTask.to_cache_key``)."""
        return [task.to_cache_key() for task in self.sub_tasks]


class HeritageQueryDecomposer:
    """Decomposes heritage queries into cacheable atomic sub-tasks.

    Uses pattern matching and linguistic analysis to break down complex
    queries into independent, cacheable components.

    Strategies:
    1. Intent decomposition - Separate intent from parameters
    2. Entity decomposition - Each entity is a separate lookup
    3. Filter decomposition - Each filter is independent
    4. Aggregation decomposition - Aggregation separate from data
    """

    def __init__(self):
        # Intent detection patterns
        self.intent_patterns = {
            "statistical": [
                r"hoeveel\b", r"how many\b", r"aantal\b", r"count\b",
                r"totaal\b", r"total\b", r"gemiddeld\b", r"average\b",
            ],
            "geographic": [
                r"waar\b", r"where\b", r"locatie\b", r"location\b",
                r"in de buurt\b", r"nearby\b", r"kaart\b", r"map\b",
            ],
            "temporal": [
                r"wanneer\b", r"when\b", r"opgericht\b", r"founded\b",
                r"geschiedenis\b", r"history\b", r"tijdlijn\b", r"timeline\b",
            ],
            "entity_lookup": [
                r"wat is\b", r"what is\b", r"informatie over\b", r"about\b",
                r"details\b", r"tell me\b", r"vertel\b",
            ],
            "relational": [
                r"relatie\b", r"relationship\b", r"connected\b", r"verbonden\b",
                r"netwerk\b", r"network\b", r"fusie\b", r"merger\b",
            ],
            "comparative": [
                r"vergelijk\b", r"compare\b", r"versus\b", r"verschil\b",
                r"difference\b", r"between\b", r"tussen\b",
            ],
        }

        # Institution type patterns (Dutch + English)
        self.type_patterns = {
            "MUSEUM": [r"museum", r"musea", r"museums"],
            "LIBRARY": [r"bibliothe[e]?k", r"librar(y|ies)", r"bieb"],
            "ARCHIVE": [r"archief", r"archiv(e|es)", r"archieven"],
            "GALLERY": [r"galerie", r"galler(y|ies)", r"kunsthal"],
            "RESEARCH_CENTER": [r"onderzoek", r"research", r"kenniscentrum"],
            "BOTANICAL_ZOO": [r"dierentuin", r"zoo", r"botanisch", r"artis"],
            "HOLY_SITES": [r"kerk", r"church", r"kathedraal", r"cathedral"],
        }

        # Location patterns
        self.location_patterns = [
            # Dutch cities
            r"\b(amsterdam|rotterdam|den haag|utrecht|eindhoven|tilburg|groningen|almere|breda|nijmegen|apeldoorn|haarlem|arnhem|zaanstad|amersfoort|haarlemmermeer|enschede|the hague|maastricht|leiden|dordrecht|zoetermeer|zwolle|deventer|delft|alkmaar|leeuwarden|venlo|hilversum|middelburg|sittard|assen|helmond|lelystad|emmen)\b",
            # Dutch provinces
            r"\b(noord-holland|zuid-holland|utrecht|gelderland|brabant|limburg|overijssel|friesland|groningen|drenthe|flevoland|zeeland)\b",
            # Dutch regions
            r"\b(randstad|veluwe|achterhoek|twente|brabant)\b",
        ]

        # Identifier patterns
        self.identifier_patterns = {
            "isil": r"isil|NL-[A-Z][a-z]+[A-Z]*",
            "wikidata": r"Q\d+|wikidata",
            "ghcid": r"NL-[A-Z]{2}-[A-Z]{3}-[A-Z]-",
        }

        # Compile patterns
        self._compile_patterns()

    def _compile_patterns(self) -> None:
        """Compile regex patterns for efficiency (all case-insensitive)."""
        self._intent_regexes = {
            intent: [re.compile(p, re.IGNORECASE) for p in patterns]
            for intent, patterns in self.intent_patterns.items()
        }
        self._type_regexes = {
            inst_type: [re.compile(p, re.IGNORECASE) for p in patterns]
            for inst_type, patterns in self.type_patterns.items()
        }
        self._location_regexes = [
            re.compile(p, re.IGNORECASE) for p in self.location_patterns
        ]
        self._identifier_regexes = {
            id_type: re.compile(p, re.IGNORECASE)
            for id_type, p in self.identifier_patterns.items()
        }

    def decompose(self, query: str, language: str = "nl") -> DecomposedQuery:
        """Decompose a query into atomic sub-tasks.

        The intent task always comes first; type, location and identifier
        filters follow when detected; an aggregation or relationship task is
        appended last for statistical / relational intents.

        Args:
            query: Natural language query
            language: Query language (nl, en)

        Returns:
            DecomposedQuery with atomic sub-tasks
        """
        sub_tasks: list[AtomicSubTask] = []

        # 1. Detect intent
        intent = self._detect_intent(query)
        sub_tasks.append(AtomicSubTask(
            task_type=SubTaskType.INTENT_CLASSIFICATION,
            task_key=f"intent:{intent}",
            parameters={"intent": intent, "language": language},
        ))

        # NOTE(review): the filter tasks below depend on the literal "intent"
        # (the INTENT_CLASSIFICATION type value), whereas the aggregation and
        # relationship tasks depend on concrete task keys - confirm which
        # convention consumers of ``depends_on`` expect.

        # 2. Extract institution type
        inst_type = self._extract_institution_type(query)
        if inst_type:
            sub_tasks.append(AtomicSubTask(
                task_type=SubTaskType.TYPE_FILTER,
                task_key=f"type:{inst_type}",
                parameters={"institution_type": inst_type},
                depends_on=["intent"],
            ))

        # 3. Extract location
        location = self._extract_location(query)
        if location:
            sub_tasks.append(AtomicSubTask(
                task_type=SubTaskType.LOCATION_FILTER,
                task_key=f"location:{location.lower()}",
                parameters={
                    "location": location,
                    "level": self._infer_location_level(location),
                },
                depends_on=["intent"],
            ))

        # 4. Extract identifier filters
        identifiers = self._extract_identifiers(query)
        for id_type, id_value in identifiers.items():
            sub_tasks.append(AtomicSubTask(
                task_type=SubTaskType.IDENTIFIER_FILTER,
                task_key=f"identifier:{id_type}:{id_value}",
                parameters={"identifier_type": id_type, "value": id_value},
                depends_on=["intent"],
            ))

        # 5. Add aggregation task if statistical
        if intent == "statistical":
            agg_type = self._infer_aggregation_type(query)
            sub_tasks.append(AtomicSubTask(
                task_type=SubTaskType.AGGREGATION,
                task_key=f"aggregate:{agg_type}",
                parameters={"aggregation": agg_type},
                # Aggregation runs over every filter extracted so far.
                depends_on=[
                    t.task_key for t in sub_tasks
                    if t.task_type != SubTaskType.INTENT_CLASSIFICATION
                ],
            ))

        # 6. Add relationship task if relational
        if intent == "relational":
            sub_tasks.append(AtomicSubTask(
                task_type=SubTaskType.RELATIONSHIP,
                task_key="relation:network",
                parameters={"relation_type": "network"},
                depends_on=[
                    t.task_key for t in sub_tasks
                    if t.task_type in [
                        SubTaskType.TYPE_FILTER,
                        SubTaskType.LOCATION_FILTER,
                    ]
                ],
            ))

        return DecomposedQuery(
            original_query=query,
            language=language,
            sub_tasks=sub_tasks,
            intent=intent,
            entities=self._extract_entities(query),
            location=location,
            institution_type=inst_type,
        )

    def _detect_intent(self, query: str) -> str:
        """Detect query intent using pattern matching.

        Each intent scores one point per pattern that matches. The highest
        score wins; ties go to the intent declared first in
        ``intent_patterns``. Returns "exploration" when nothing matches.
        """
        intent_scores: dict[str, int] = {intent: 0 for intent in self.intent_patterns}

        for intent, regexes in self._intent_regexes.items():
            for regex in regexes:
                if regex.search(query):
                    intent_scores[intent] += 1

        best_intent = max(intent_scores, key=intent_scores.get)  # type: ignore
        return best_intent if intent_scores[best_intent] > 0 else "exploration"

    def _extract_institution_type(self, query: str) -> str | None:
        """Extract institution type from query.

        Returns the first type (in ``type_patterns`` order) with a matching
        pattern, or None when no pattern matches.
        """
        for inst_type, regexes in self._type_regexes.items():
            for regex in regexes:
                if regex.search(query):
                    return inst_type
        return None

    def _extract_location(self, query: str) -> str | None:
        """Extract location from query.

        Pattern groups are tried in order (cities, provinces, regions); the
        matched text is returned title-cased, or None when nothing matches.
        """
        for regex in self._location_regexes:
            match = regex.search(query)
            if match:
                return match.group(0).title()
        return None

    def _infer_location_level(self, location: str) -> str:
        """Infer whether location is city, province, or region.

        Province membership is checked first, so names that are both a city
        and a province (e.g. "Utrecht", "Groningen") are reported as
        "province".
        """
        location_lower = location.lower()
        if location_lower in _PROVINCES:
            return "province"
        if location_lower in _REGIONS:
            return "region"
        return "city"

    def _extract_identifiers(self, query: str) -> dict[str, str | None]:
        """Extract identifier references from query.

        Returns:
            Mapping of identifier type to either the matched text or the
            literal "exists" when the query asks whether institutions *have*
            such an identifier (whole-word has/have/heeft/hebben).
        """
        # Computed once: the existence check is a property of the whole query.
        existence_only = _EXISTENCE_RE.search(query) is not None

        identifiers: dict[str, str | None] = {}
        for id_type, regex in self._identifier_regexes.items():
            match = regex.search(query)
            if match:
                identifiers[id_type] = "exists" if existence_only else match.group(0)
        return identifiers

    def _extract_entities(self, query: str) -> list[str]:
        """Extract named entities from query (simplified NER).

        Collects double-quoted strings, then the detected institution type
        and the detected location (when present), in that order.
        """
        entities: list[str] = []

        # Extract quoted strings
        quoted = re.findall(r'"([^"]+)"', query)
        entities.extend(quoted)

        # Extract institution type mentions
        inst_type = self._extract_institution_type(query)
        if inst_type:
            entities.append(inst_type)

        # Extract location mentions
        location = self._extract_location(query)
        if location:
            entities.append(location)

        return entities

    def _infer_aggregation_type(self, query: str) -> str:
        """Infer aggregation type from query.

        Keywords are matched as whole words (case-insensitive), so words such
        as "ministerie", "almost" or "summary" do not trigger "min", "max"
        or "sum".

        Returns:
            One of "average", "sum", "max", "min"; "count" when no keyword
            is present.
        """
        for aggregation, pattern in _AGGREGATION_RULES:
            if pattern.search(query):
                return aggregation
        return "count"

    def generate_sub_task_cache_key(
        self,
        task: AtomicSubTask,
        language: str = "nl",
    ) -> str:
        """Generate a cache key for a sub-task.

        Format: heritage:subtask:{type}:{hash}

        The hash is the first 12 hex characters of the SHA-256 digest of the
        task type plus the sorted parameters, with the language added under
        the "lang" key.
        """
        params_with_lang = {**task.parameters, "lang": language}
        params_str = "&".join(
            f"{k}={v}" for k, v in sorted(params_with_lang.items())
        )
        hash_input = f"{task.task_type.value}:{params_str}"
        hash_value = hashlib.sha256(hash_input.encode()).hexdigest()[:12]
        return f"heritage:subtask:{task.task_type.value}:{hash_value}"


class AtomicCacheManager:
    """Manages atomic sub-task caching for the heritage RAG pipeline.

    Integrates with HeritageSemanticCache for storing and retrieving
    atomic sub-task results, with an in-memory dict as fallback.
    """

    def __init__(self, semantic_cache: Any = None):
        """Initialize with optional semantic cache backend.

        Args:
            semantic_cache: HeritageSemanticCache instance
        """
        self.decomposer = HeritageQueryDecomposer()
        self.semantic_cache = semantic_cache

        # In-memory fallback for sub-task cache
        self._subtask_cache: dict[str, Any] = {}
        # Monotonic-clock expiry per in-memory key; keys without an entry
        # here never expire.
        self._subtask_expiry: dict[str, float] = {}

        # Statistics
        self.stats = {
            "queries_decomposed": 0,
            "subtask_hits": 0,
            "subtask_misses": 0,
            "full_query_reassemblies": 0,
        }

    async def process_query(
        self,
        query: str,
        language: str = "nl",
    ) -> tuple[DecomposedQuery, dict[str, Any]]:
        """Process a query with atomic caching.

        Decomposes the query, looks up every sub-task in the cache, marks
        hits on the tasks themselves and updates the hit/miss statistics.

        Returns:
            The decomposed query and a mapping of ``task_key`` to cached
            sub-result for every sub-task that was a cache hit.
        """
        self.stats["queries_decomposed"] += 1

        # Decompose query
        decomposed = self.decomposer.decompose(query, language)

        # Check cache for each sub-task
        cached_results: dict[str, Any] = {}
        for task in decomposed.sub_tasks:
            cache_key = self.decomposer.generate_sub_task_cache_key(task, language)

            # Try semantic cache first
            cached_value = await self._get_cached_subtask(cache_key)

            if cached_value is not None:
                task.cached_result = cached_value
                task.cache_hit = True
                cached_results[task.task_key] = cached_value
                decomposed.partial_cache_hits += 1
                self.stats["subtask_hits"] += 1
            else:
                self.stats["subtask_misses"] += 1

        # Check if fully cached
        decomposed.fully_cached = all(t.cache_hit for t in decomposed.sub_tasks)
        if decomposed.fully_cached:
            self.stats["full_query_reassemblies"] += 1

        return decomposed, cached_results

    async def cache_subtask_result(
        self,
        task: AtomicSubTask,
        result: Any,
        language: str = "nl",
        ttl: int = 3600,
    ) -> bool:
        """Cache a sub-task result.

        The result goes to the semantic cache when one is configured and the
        write succeeds; otherwise it goes to the in-memory fallback.

        Args:
            task: The atomic sub-task
            result: Result to cache
            language: Query language
            ttl: Time-to-live in seconds. Applied to the in-memory fallback
                only; a non-positive value makes the entry expire at once.

        Returns:
            True if cached successfully
        """
        cache_key = self.decomposer.generate_sub_task_cache_key(task, language)

        # Store in semantic cache if available
        if self.semantic_cache is not None:
            try:
                # Store under the same key _get_cached_subtask() queries with;
                # writing under task.task_key made entries unreachable.
                # NOTE(review): ttl is not forwarded because the backend's
                # set() signature is not visible here - confirm and pass it
                # through if supported.
                await self.semantic_cache.set(
                    query=cache_key,
                    response={"subtask_result": result},
                    intent=task.task_type.value,
                    language=language,
                )
                return True
            except Exception as e:
                # Best effort: fall through to the memory cache.
                logger.warning("Failed to cache subtask in semantic cache: %s", e)

        # Fallback to memory cache
        self._subtask_cache[cache_key] = result
        self._subtask_expiry[cache_key] = time.monotonic() + ttl
        return True

    async def _get_cached_subtask(self, cache_key: str) -> Any | None:
        """Get cached sub-task result.

        Tries the semantic cache first, then the in-memory fallback. Expired
        in-memory entries are evicted on lookup and reported as a miss.
        """
        # Try semantic cache
        if self.semantic_cache is not None:
            try:
                result = await self.semantic_cache.get(query=cache_key)
                if result and "subtask_result" in result:
                    return result["subtask_result"]
            except Exception as e:
                logger.debug("Semantic cache lookup failed: %s", e)

        # Fallback to memory
        expiry = self._subtask_expiry.get(cache_key)
        if expiry is not None and time.monotonic() >= expiry:
            self._subtask_cache.pop(cache_key, None)
            self._subtask_expiry.pop(cache_key, None)
            return None
        return self._subtask_cache.get(cache_key)

    def get_stats(self) -> dict[str, Any]:
        """Get atomic caching statistics.

        Returns the raw counters plus ``subtask_hit_rate`` (percentage,
        rounded to two decimals) and ``memory_cache_size`` (number of entries
        in the in-memory fallback, including expired entries that have not
        been looked up since expiring).
        """
        total_subtasks = self.stats["subtask_hits"] + self.stats["subtask_misses"]
        hit_rate = self.stats["subtask_hits"] / total_subtasks if total_subtasks > 0 else 0

        return {
            **self.stats,
            "subtask_hit_rate": round(hit_rate * 100, 2),
            "memory_cache_size": len(self._subtask_cache),
        }


# Convenience functions
_decomposer_instance: HeritageQueryDecomposer | None = None


def get_decomposer() -> HeritageQueryDecomposer:
    """Get or create global decomposer instance."""
    global _decomposer_instance
    if _decomposer_instance is None:
        _decomposer_instance = HeritageQueryDecomposer()
    return _decomposer_instance


def decompose_query(query: str, language: str = "nl") -> DecomposedQuery:
    """Convenience function to decompose a query."""
    return get_decomposer().decompose(query, language)


def _run_demo() -> None:
    """Print the decomposition of a handful of sample queries."""
    decomposer = HeritageQueryDecomposer()

    test_queries = [
        "Hoeveel musea zijn er in Amsterdam?",
        "Where is the Rijksmuseum located?",
        "Show me archives in Noord-Holland with ISIL codes",
        "Welke bibliotheken zijn gefuseerd sinds 2000?",
        "Find heritage institutions near Rotterdam Centraal",
        "Compare the collections of Rijksmuseum and Van Gogh Museum",
    ]

    for query in test_queries:
        print(f"\nQuery: {query}")
        decomposed = decomposer.decompose(query)
        print(f"  Intent: {decomposed.intent}")
        print(f"  Institution type: {decomposed.institution_type}")
        print(f"  Location: {decomposed.location}")
        print(f"  Sub-tasks: {len(decomposed.sub_tasks)}")
        for task in decomposed.sub_tasks:
            print(f"    - {task.task_type.value}: {task.parameters}")


# Example usage and testing
if __name__ == "__main__":
    _run_demo()