""" Semantic Routing for Heritage RAG Implements Signal-Decision architecture for fast, accurate query routing. Based on: docs/plan/external_design_patterns/04_temporal_semantic_hypergraph.md Key concepts: - Signal extraction (no LLM) for fast query analysis - Decision routing based on extracted signals - Falls back to LLM classification for low-confidence cases """ from dataclasses import dataclass, field from typing import Literal, Optional import re import logging logger = logging.getLogger(__name__) @dataclass class QuerySignals: """Semantic signals extracted from query.""" # Primary classification # Using str instead of Literal for runtime flexibility entity_type: str # "person", "institution", "collection", "event", "mixed" intent: str # "geographic", "statistical", "relational", "temporal", etc. # Extracted entities institution_mentions: list[str] = field(default_factory=list) person_mentions: list[str] = field(default_factory=list) location_mentions: list[str] = field(default_factory=list) # Query characteristics language: str = "nl" has_temporal_constraint: bool = False has_geographic_constraint: bool = False requires_aggregation: bool = False # Confidence confidence: float = 0.85 @dataclass class RouteConfig: """Configuration for query routing.""" primary_backend: str secondary_backend: Optional[str] = None qdrant_collection: Optional[str] = None use_temporal_templates: bool = False qdrant_filters: dict = field(default_factory=dict) sparql_variant: Optional[str] = None class SemanticSignalExtractor: """ Extract semantic signals from queries without LLM calls. Uses: - Keyword patterns for entity type detection - Embedding similarity for intent classification - Regex for entity extraction """ # Entity type indicators PERSON_INDICATORS = [ "wie", "who", "curator", "archivist", "archivaris", "bibliothecaris", "directeur", "director", "medewerker", "staff", "employee", "werkt", "works", "persoon", "person", "hoofd", "manager" ] INSTITUTION_INDICATORS = [ "museum", "musea", "archief", "archieven", "bibliotheek", "bibliotheken", "galerie", "gallery", "instelling", "institution", "organisatie" ] AGGREGATION_INDICATORS = [ "hoeveel", "how many", "count", "aantal", "total", "totaal", "per", "verdeling", "distribution", "gemiddelde", "average" ] # NOTE: Short words like "in" removed - too many false positives # "in" matches "interessant", "instituut", etc. GEOGRAPHIC_INDICATORS = [ "nabij", "near", "waar", "where", "locatie", "location", "provincie", "province", "stad", "city", "regio", "region" ] # NOTE: Short words like "na" removed - too many false positives # "na" matches "nationaal", "naam", etc. # Use word boundary matching for remaining short indicators TEMPORAL_INDICATORS = [ "wanneer", "when", "voor", "before", "tussen", "between", "oudste", "oldest", "nieuwste", "newest", "opgericht", "founded", "gesloten", "closed", "fusie", "merger", "geschiedenis", "history", "tijdlijn", "timeline" ] # Short indicators that require word boundary matching TEMPORAL_INDICATORS_SHORT = ["na", "after"] # Require \b matching GEOGRAPHIC_INDICATORS_SHORT = ["in"] # Require \b matching # Year pattern for temporal detection YEAR_PATTERN = re.compile(r'\b(1[0-9]{3}|20[0-2][0-9])\b') # 1000-2029 # Known Dutch cities and provinces for location extraction KNOWN_LOCATIONS = [ "Amsterdam", "Rotterdam", "Den Haag", "Utrecht", "Groningen", "Noord-Holland", "Zuid-Holland", "Noord-Brabant", "Limburg", "Gelderland", "Friesland", "Overijssel", "Drenthe", "Zeeland", "Flevoland", "Haarlem", "Leiden", "Maastricht", "Eindhoven", "Arnhem", "Nijmegen", "Enschede", "Tilburg", "Breda", "Delft" ] def __init__(self): self._intent_embeddings = None self._model = None # Precompile word boundary patterns for short indicators self._temporal_short_patterns = [ re.compile(rf'\b{ind}\b', re.IGNORECASE) for ind in self.TEMPORAL_INDICATORS_SHORT ] self._geographic_short_patterns = [ re.compile(rf'\b{ind}\b', re.IGNORECASE) for ind in self.GEOGRAPHIC_INDICATORS_SHORT ] def _has_word_boundary_match(self, query: str, patterns: list) -> bool: """Check if any pattern matches with word boundaries.""" return any(p.search(query) for p in patterns) def extract_signals(self, query: str) -> QuerySignals: """ Extract all semantic signals from query. Fast operation - no LLM calls. """ query_lower = query.lower() # Entity type detection entity_type = self._detect_entity_type(query_lower) # Intent classification intent = self._classify_intent(query, query_lower) # Entity extraction institutions = self._extract_institutions(query) persons = self._extract_persons(query) locations = self._extract_locations(query) # Constraint detection (with word boundary matching for short indicators) has_temporal = ( any(ind in query_lower for ind in self.TEMPORAL_INDICATORS) or self._has_word_boundary_match(query, self._temporal_short_patterns) or bool(self.YEAR_PATTERN.search(query)) # Year mention implies temporal ) has_geographic = ( any(ind in query_lower for ind in self.GEOGRAPHIC_INDICATORS) or self._has_word_boundary_match(query, self._geographic_short_patterns) or bool(locations) ) requires_aggregation = any(ind in query_lower for ind in self.AGGREGATION_INDICATORS) # Language detection language = self._detect_language(query) # Confidence based on signal clarity confidence = self._compute_confidence(entity_type, intent, query_lower) return QuerySignals( entity_type=entity_type, intent=intent, institution_mentions=institutions, person_mentions=persons, location_mentions=locations, language=language, has_temporal_constraint=has_temporal, has_geographic_constraint=has_geographic, requires_aggregation=requires_aggregation, confidence=confidence ) def _detect_entity_type(self, query_lower: str) -> str: """Detect primary entity type in query.""" person_score = sum(1 for p in self.PERSON_INDICATORS if p in query_lower) institution_score = sum(1 for p in self.INSTITUTION_INDICATORS if p in query_lower) if person_score > 0 and institution_score > 0: return "mixed" elif person_score > institution_score: return "person" elif institution_score > 0: return "institution" else: return "institution" # Default def _classify_intent(self, query: str, query_lower: str) -> str: """Classify query intent.""" # Quick rule-based classification if any(ind in query_lower for ind in self.AGGREGATION_INDICATORS): return "statistical" # Temporal: check long indicators, short indicators with word boundary, AND year patterns if (any(ind in query_lower for ind in self.TEMPORAL_INDICATORS) or self._has_word_boundary_match(query, self._temporal_short_patterns) or bool(self.YEAR_PATTERN.search(query))): # Year implies temporal intent return "temporal" if "vergelijk" in query_lower or "compare" in query_lower: return "comparative" if any(ind in query_lower for ind in ["wat is", "what is", "tell me about", "vertel"]): return "entity_lookup" # Geographic: check both long indicators and short with word boundary if (any(ind in query_lower for ind in self.GEOGRAPHIC_INDICATORS) or self._has_word_boundary_match(query, self._geographic_short_patterns)): return "geographic" # Default based on question type if query_lower.startswith(("welke", "which", "wat", "what")): return "exploration" return "exploration" def _extract_institutions(self, query: str) -> list[str]: """Extract institution mentions from query.""" # Known institution patterns patterns = [ r"(?:het\s+)?(\w+\s+(?:Museum|Archief|Bibliotheek|Galerie))", r"(Rijksmuseum|Nationaal Archief|KB|Koninklijke Bibliotheek)", r"(Noord-Hollands Archief|Stadsarchief Amsterdam|Gemeentearchief)", r"(\w+archief|\w+museum|\w+bibliotheek)", ] mentions = [] for pattern in patterns: for match in re.finditer(pattern, query, re.IGNORECASE): mentions.append(match.group(1)) return list(set(mentions)) def _extract_persons(self, query: str) -> list[str]: """Extract person mentions from query.""" # Basic person name pattern (capitalized words with optional tussenvoegsel) pattern = r"\b([A-Z][a-z]+\s+(?:van\s+(?:de\s+)?|de\s+)?[A-Z][a-z]+)\b" matches = re.findall(pattern, query) return matches def _extract_locations(self, query: str) -> list[str]: """Extract location mentions from query.""" mentions = [] query_lower = query.lower() for loc in self.KNOWN_LOCATIONS: if loc.lower() in query_lower: mentions.append(loc) return mentions def _detect_language(self, query: str) -> str: """Detect query language.""" dutch_indicators = ["welke", "hoeveel", "waar", "wanneer", "wie", "het", "de", "zijn", "er"] english_indicators = ["which", "how many", "where", "when", "who", "the", "are", "there"] query_lower = query.lower() dutch_score = sum(1 for w in dutch_indicators if w in query_lower) english_score = sum(1 for w in english_indicators if w in query_lower) return "nl" if dutch_score >= english_score else "en" def _compute_confidence(self, entity_type: str, intent: str, query_lower: str) -> float: """Compute confidence in signal extraction.""" confidence = 0.7 # Base # Boost for clear entity type if entity_type != "mixed": confidence += 0.1 # Boost for clear intent indicators if any(ind in query_lower for ind in self.AGGREGATION_INDICATORS + self.TEMPORAL_INDICATORS): confidence += 0.1 # Boost for clear question structure if query_lower.startswith(("welke", "which", "hoeveel", "how many", "waar", "where")): confidence += 0.05 return min(confidence, 0.95) class SemanticDecisionRouter: """ Route queries to backends based on signals. """ def route(self, signals: QuerySignals) -> RouteConfig: """ Determine routing based on signals. """ # Person queries → Qdrant persons collection if signals.entity_type == "person": config = RouteConfig( primary_backend="qdrant", secondary_backend="sparql", qdrant_collection="heritage_persons", ) # Add institution filter if mentioned if signals.institution_mentions: config.qdrant_filters["custodian_slug"] = self._to_slug( signals.institution_mentions[0] ) return config # Statistical queries → SPARQL (aggregations via COUNT, SUM, etc.) if signals.requires_aggregation: return RouteConfig( primary_backend="sparql", secondary_backend="qdrant", qdrant_collection="heritage_custodians", ) # Temporal queries → Temporal SPARQL templates if signals.has_temporal_constraint: return RouteConfig( primary_backend="sparql", secondary_backend="qdrant", use_temporal_templates=True, qdrant_collection="heritage_custodians", ) # Geographic queries → SPARQL with location filter if signals.has_geographic_constraint: return RouteConfig( primary_backend="sparql", secondary_backend="qdrant", qdrant_collection="heritage_custodians", ) # Default: hybrid SPARQL + Qdrant return RouteConfig( primary_backend="qdrant", secondary_backend="sparql", qdrant_collection="heritage_custodians", ) def _to_slug(self, institution_name: str) -> str: """Convert institution name to slug format.""" import unicodedata normalized = unicodedata.normalize('NFD', institution_name) ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn') slug = ascii_name.lower() slug = re.sub(r"[''`\",.:;!?()[\]{}]", '', slug) slug = re.sub(r'[\s_]+', '-', slug) slug = re.sub(r'-+', '-', slug).strip('-') return slug # Singleton instances _signal_extractor: Optional[SemanticSignalExtractor] = None _decision_router: Optional[SemanticDecisionRouter] = None def get_signal_extractor() -> SemanticSignalExtractor: """Get or create singleton signal extractor instance.""" global _signal_extractor if _signal_extractor is None: _signal_extractor = SemanticSignalExtractor() return _signal_extractor def get_decision_router() -> SemanticDecisionRouter: """Get or create singleton decision router instance.""" global _decision_router if _decision_router is None: _decision_router = SemanticDecisionRouter() return _decision_router