glam/backend/rag/semantic_router.py
kempersc 98c42bf272 Fix LinkML URI conflicts and generate RDF outputs
- Fix scope_note → finding_aid_scope_note in FindingAid.yaml
- Remove duplicate wikidata_entity slot from CustodianType.yaml (import instead)
- Remove duplicate rico_record_set_type from class_metadata_slots.yaml
- Fix range types for equals_string compatibility (uriorcurie → string)
- Move class names from close_mappings to see_also in 10 RecordSetTypes files
- Generate all RDF formats: OWL, N-Triples, RDF/XML, N3, JSON-LD context
- Sync schemas to frontend/public/schemas/

Files: 1,151 changed (includes prior CustodianType migration)
2026-01-07 12:32:59 +01:00

372 lines
14 KiB
Python

"""
Semantic Routing for Heritage RAG
Implements Signal-Decision architecture for fast, accurate query routing.
Based on: docs/plan/external_design_patterns/04_temporal_semantic_hypergraph.md
Key concepts:
- Signal extraction (no LLM) for fast query analysis
- Decision routing based on extracted signals
- Falls back to LLM classification for low-confidence cases
"""
from dataclasses import dataclass, field
from typing import Literal, Optional
import re
import logging
logger = logging.getLogger(__name__)
@dataclass
class QuerySignals:
    """Semantic signals extracted from a query.

    Produced by SemanticSignalExtractor.extract_signals() and consumed by
    SemanticDecisionRouter.route() to choose a backend.
    """
    # Primary classification
    # Using str instead of Literal for runtime flexibility
    entity_type: str # "person", "institution", "collection", "event", "mixed"
    intent: str # "geographic", "statistical", "relational", "temporal", etc.
    # Entities recognized in the query text (verbatim surface forms)
    institution_mentions: list[str] = field(default_factory=list)
    person_mentions: list[str] = field(default_factory=list)
    location_mentions: list[str] = field(default_factory=list)
    # Query characteristics
    language: str = "nl"  # "nl" or "en"
    has_temporal_constraint: bool = False
    has_geographic_constraint: bool = False
    requires_aggregation: bool = False  # query asks for counts/statistics
    # Heuristic confidence in the extracted signals (capped at 0.95)
    confidence: float = 0.85
@dataclass
class RouteConfig:
    """Configuration for query routing.

    Returned by SemanticDecisionRouter.route(); names the primary backend
    to query, an optional fallback, and backend-specific options.
    """
    primary_backend: str  # "qdrant", "sparql" or "ducklake"
    secondary_backend: Optional[str] = None  # fallback backend, if any
    qdrant_collection: Optional[str] = None  # e.g. "heritage_persons", "heritage_custodians"
    use_temporal_templates: bool = False  # route through temporal SPARQL templates
    qdrant_filters: dict = field(default_factory=dict)  # Qdrant payload filters, e.g. {"custodian_slug": ...}
    sparql_variant: Optional[str] = None  # never set in this module — presumably selects a SPARQL query shape; verify against consumers
class SemanticSignalExtractor:
    """
    Extract semantic signals from queries without LLM calls.

    Uses:
    - Keyword patterns for entity type detection
    - Embedding similarity for intent classification
    - Regex for entity extraction

    Indicator lists are matched against the lowercased query as substrings
    unless noted otherwise; ambiguous short words get dedicated
    word-boundary patterns (see the *_INDICATORS_SHORT lists).
    """

    # Entity type indicators
    PERSON_INDICATORS = [
        "wie", "who", "curator", "archivist", "archivaris", "bibliothecaris",
        "directeur", "director", "medewerker", "staff", "employee",
        "werkt", "works", "persoon", "person", "hoofd", "manager"
    ]

    INSTITUTION_INDICATORS = [
        "museum", "musea", "archief", "archieven", "bibliotheek", "bibliotheken",
        "galerie", "gallery", "instelling", "institution", "organisatie"
    ]

    AGGREGATION_INDICATORS = [
        "hoeveel", "how many", "count", "aantal", "total", "totaal",
        "per", "verdeling", "distribution", "gemiddelde", "average"
    ]

    # NOTE: Short words like "in" removed - too many false positives
    # "in" matches "interessant", "instituut", etc.
    GEOGRAPHIC_INDICATORS = [
        "nabij", "near", "waar", "where", "locatie", "location",
        "provincie", "province", "stad", "city", "regio", "region"
    ]

    # NOTE: Short words like "na" removed - too many false positives
    # "na" matches "nationaal", "naam", etc.
    # Use word boundary matching for remaining short indicators
    TEMPORAL_INDICATORS = [
        "wanneer", "when", "voor", "before", "tussen", "between",
        "oudste", "oldest", "nieuwste", "newest",
        "opgericht", "founded", "gesloten", "closed", "fusie", "merger",
        "geschiedenis", "history", "tijdlijn", "timeline"
    ]

    # Short indicators that require word boundary matching
    TEMPORAL_INDICATORS_SHORT = ["na", "after"]  # Require \b matching
    GEOGRAPHIC_INDICATORS_SHORT = ["in"]  # Require \b matching

    # Year pattern for temporal detection
    YEAR_PATTERN = re.compile(r'\b(1[0-9]{3}|20[0-2][0-9])\b')  # 1000-2029

    # Known Dutch cities and provinces for location extraction
    KNOWN_LOCATIONS = [
        "Amsterdam", "Rotterdam", "Den Haag", "Utrecht", "Groningen",
        "Noord-Holland", "Zuid-Holland", "Noord-Brabant", "Limburg",
        "Gelderland", "Friesland", "Overijssel", "Drenthe", "Zeeland",
        "Flevoland", "Haarlem", "Leiden", "Maastricht", "Eindhoven",
        "Arnhem", "Nijmegen", "Enschede", "Tilburg", "Breda", "Delft"
    ]

    def __init__(self):
        # Reserved for a lazily loaded embedding model / intent embeddings;
        # not populated anywhere in this module yet.
        self._intent_embeddings = None
        self._model = None
        # Precompile word-boundary patterns for short indicators.
        # re.escape guards against regex metacharacters in future indicators.
        self._temporal_short_patterns = [
            re.compile(rf'\b{re.escape(ind)}\b', re.IGNORECASE)
            for ind in self.TEMPORAL_INDICATORS_SHORT
        ]
        self._geographic_short_patterns = [
            re.compile(rf'\b{re.escape(ind)}\b', re.IGNORECASE)
            for ind in self.GEOGRAPHIC_INDICATORS_SHORT
        ]

    def _has_word_boundary_match(self, query: str, patterns: list) -> bool:
        """Check if any precompiled pattern matches with word boundaries."""
        return any(p.search(query) for p in patterns)

    def extract_signals(self, query: str) -> "QuerySignals":
        """
        Extract all semantic signals from query.

        Fast operation - no LLM calls. Combines entity type detection,
        intent classification, entity/location extraction, constraint
        detection, language detection and a heuristic confidence score.
        """
        query_lower = query.lower()

        # Entity type detection
        entity_type = self._detect_entity_type(query_lower)

        # Intent classification
        intent = self._classify_intent(query, query_lower)

        # Entity extraction
        institutions = self._extract_institutions(query)
        persons = self._extract_persons(query)
        locations = self._extract_locations(query)

        # Constraint detection (with word boundary matching for short indicators)
        has_temporal = (
            any(ind in query_lower for ind in self.TEMPORAL_INDICATORS) or
            self._has_word_boundary_match(query, self._temporal_short_patterns) or
            bool(self.YEAR_PATTERN.search(query))  # Year mention implies temporal
        )
        has_geographic = (
            any(ind in query_lower for ind in self.GEOGRAPHIC_INDICATORS) or
            self._has_word_boundary_match(query, self._geographic_short_patterns) or
            bool(locations)
        )
        requires_aggregation = any(ind in query_lower for ind in self.AGGREGATION_INDICATORS)

        # Language detection
        language = self._detect_language(query)

        # Confidence based on signal clarity
        confidence = self._compute_confidence(entity_type, intent, query_lower)

        return QuerySignals(
            entity_type=entity_type,
            intent=intent,
            institution_mentions=institutions,
            person_mentions=persons,
            location_mentions=locations,
            language=language,
            has_temporal_constraint=has_temporal,
            has_geographic_constraint=has_geographic,
            requires_aggregation=requires_aggregation,
            confidence=confidence
        )

    def _detect_entity_type(self, query_lower: str) -> str:
        """Detect primary entity type: "person", "institution" or "mixed".

        Defaults to "institution" when no indicator matches.
        """
        person_score = sum(1 for p in self.PERSON_INDICATORS if p in query_lower)
        institution_score = sum(1 for p in self.INSTITUTION_INDICATORS if p in query_lower)
        if person_score > 0 and institution_score > 0:
            return "mixed"
        elif person_score > institution_score:
            return "person"
        elif institution_score > 0:
            return "institution"
        else:
            return "institution"  # Default

    def _classify_intent(self, query: str, query_lower: str) -> str:
        """Classify query intent via ordered rules (first match wins)."""
        # Quick rule-based classification
        if any(ind in query_lower for ind in self.AGGREGATION_INDICATORS):
            return "statistical"
        # Temporal: check long indicators, short indicators with word boundary, AND year patterns
        if (any(ind in query_lower for ind in self.TEMPORAL_INDICATORS) or
                self._has_word_boundary_match(query, self._temporal_short_patterns) or
                bool(self.YEAR_PATTERN.search(query))):  # Year implies temporal intent
            return "temporal"
        if "vergelijk" in query_lower or "compare" in query_lower:
            return "comparative"
        if any(ind in query_lower for ind in ["wat is", "what is", "tell me about", "vertel"]):
            return "entity_lookup"
        # Geographic: check both long indicators and short with word boundary
        if (any(ind in query_lower for ind in self.GEOGRAPHIC_INDICATORS) or
                self._has_word_boundary_match(query, self._geographic_short_patterns)):
            return "geographic"
        # Default. (A former "welke/which/wat/what" branch returned the same
        # value, so it was folded into this single return.)
        return "exploration"

    def _extract_institutions(self, query: str) -> list[str]:
        """Extract institution mentions from query.

        Returns a sorted, de-duplicated list so output order is
        deterministic (list(set(...)) previously made it hash-dependent).
        """
        # Known institution patterns
        patterns = [
            r"(?:het\s+)?(\w+\s+(?:Museum|Archief|Bibliotheek|Galerie))",
            r"(Rijksmuseum|Nationaal Archief|KB|Koninklijke Bibliotheek)",
            r"(Noord-Hollands Archief|Stadsarchief Amsterdam|Gemeentearchief)",
            r"(\w+archief|\w+museum|\w+bibliotheek)",
        ]
        mentions = []
        for pattern in patterns:
            for match in re.finditer(pattern, query, re.IGNORECASE):
                mentions.append(match.group(1))
        return sorted(set(mentions))

    def _extract_persons(self, query: str) -> list[str]:
        """Extract person mentions from query.

        Matches capitalized name pairs with an optional Dutch tussenvoegsel
        ("van", "van de", "de"). Capitalized place names that fit the same
        shape (e.g. "Den Haag") are filtered out via KNOWN_LOCATIONS.
        """
        pattern = r"\b([A-Z][a-z]+\s+(?:van\s+(?:de\s+)?|de\s+)?[A-Z][a-z]+)\b"
        matches = re.findall(pattern, query)
        known_locations = {loc.lower() for loc in self.KNOWN_LOCATIONS}
        return [m for m in matches if m.lower() not in known_locations]

    def _extract_locations(self, query: str) -> list[str]:
        """Extract known Dutch location mentions (case-insensitive)."""
        mentions = []
        query_lower = query.lower()
        for loc in self.KNOWN_LOCATIONS:
            if loc.lower() in query_lower:
                mentions.append(loc)
        return mentions

    def _detect_language(self, query: str) -> str:
        """Detect query language: "nl" or "en".

        Indicators are matched as whole tokens rather than substrings:
        substring matching misclassified queries because e.g. "de" occurs
        inside "federal" and "er" inside "where"/"there". Ties (including
        zero hits) default to Dutch, the primary corpus language.
        """
        dutch_indicators = ["welke", "hoeveel", "waar", "wanneer", "wie", "het", "de", "zijn", "er"]
        english_indicators = ["which", "where", "when", "who", "the", "are", "there"]
        query_lower = query.lower()
        words = set(re.findall(r"[^\W\d_]+", query_lower))
        dutch_score = sum(1 for w in dutch_indicators if w in words)
        english_score = sum(1 for w in english_indicators if w in words)
        # "how many" is a two-word indicator, so test it as a phrase.
        if "how many" in query_lower:
            english_score += 1
        return "nl" if dutch_score >= english_score else "en"

    def _compute_confidence(self, entity_type: str, intent: str, query_lower: str) -> float:
        """Compute heuristic confidence in signal extraction (0.7 - 0.95)."""
        confidence = 0.7  # Base
        # Boost for clear entity type
        if entity_type != "mixed":
            confidence += 0.1
        # Boost for clear intent indicators
        if any(ind in query_lower for ind in self.AGGREGATION_INDICATORS + self.TEMPORAL_INDICATORS):
            confidence += 0.1
        # Boost for clear question structure
        if query_lower.startswith(("welke", "which", "hoeveel", "how many", "waar", "where")):
            confidence += 0.05
        return min(confidence, 0.95)
class SemanticDecisionRouter:
    """
    Route queries to backends based on signals.

    Rules are applied in priority order: person -> aggregation -> temporal
    -> geographic -> hybrid default.
    """

    def route(self, signals: QuerySignals) -> RouteConfig:
        """
        Determine routing based on signals.
        """
        # Person queries → Qdrant persons collection
        if signals.entity_type == "person":
            route_config = RouteConfig(
                primary_backend="qdrant",
                secondary_backend="sparql",
                qdrant_collection="heritage_persons",
            )
            # Narrow the vector search to one institution when one was named.
            if signals.institution_mentions:
                first_institution = signals.institution_mentions[0]
                route_config.qdrant_filters["custodian_slug"] = self._to_slug(first_institution)
            return route_config

        # Statistical queries → DuckLake
        if signals.requires_aggregation:
            return RouteConfig(primary_backend="ducklake", secondary_backend="sparql")

        # Temporal queries → Temporal SPARQL templates
        if signals.has_temporal_constraint:
            return RouteConfig(
                primary_backend="sparql",
                secondary_backend="qdrant",
                use_temporal_templates=True,
                qdrant_collection="heritage_custodians",
            )

        # Geographic queries → SPARQL with location filter
        if signals.has_geographic_constraint:
            return RouteConfig(
                primary_backend="sparql",
                secondary_backend="qdrant",
                qdrant_collection="heritage_custodians",
            )

        # Default: hybrid SPARQL + Qdrant
        return RouteConfig(
            primary_backend="qdrant",
            secondary_backend="sparql",
            qdrant_collection="heritage_custodians",
        )

    def _to_slug(self, institution_name: str) -> str:
        """Convert institution name to slug format."""
        import unicodedata

        # Strip diacritics: decompose, then drop the combining marks.
        decomposed = unicodedata.normalize('NFD', institution_name)
        without_marks = ''.join(
            ch for ch in decomposed if unicodedata.category(ch) != 'Mn'
        )
        slug = without_marks.lower()
        slug = re.sub(r"[''`\",.:;!?()[\]{}]", '', slug)  # drop punctuation
        slug = re.sub(r'[\s_]+', '-', slug)               # whitespace/underscores -> hyphen
        slug = re.sub(r'-+', '-', slug).strip('-')        # collapse and trim hyphens
        return slug
# Singleton instances
# (module-level caches; obtain them via get_signal_extractor() /
# get_decision_router() rather than reading these directly)
_signal_extractor: Optional[SemanticSignalExtractor] = None
_decision_router: Optional[SemanticDecisionRouter] = None
def get_signal_extractor() -> SemanticSignalExtractor:
    """Return the process-wide signal extractor, creating it on first call."""
    global _signal_extractor
    instance = _signal_extractor
    if instance is None:
        instance = SemanticSignalExtractor()
        _signal_extractor = instance
    return instance
def get_decision_router() -> SemanticDecisionRouter:
    """Return the process-wide decision router, creating it on first call."""
    global _decision_router
    instance = _decision_router
    if instance is None:
        instance = SemanticDecisionRouter()
        _decision_router = instance
    return instance