glam/docs/plan/specificity_score/03-rag-dspy-integration.md
kempersc 11983014bb Enhance specificity scoring system integration with existing infrastructure
- Updated documentation to clarify integration points with existing components in the RAG pipeline and DSPy framework.
- Added detailed mapping of SPARQL templates to context templates for improved specificity filtering.
- Implemented wrapper patterns around existing classifiers to extend functionality without duplication.
- Introduced new tests for the SpecificityAwareClassifier and SPARQLToContextMapper to ensure proper integration and functionality.
- Enhanced the CustodianRDFConverter to include ISO country and subregion codes from GHCID for better geospatial data handling.
2026-01-05 17:37:49 +01:00


Specificity Score System - RAG/DSPy Integration

Overview

This document describes how the specificity scoring system integrates with the existing RAG pipeline and DSPy framework to improve retrieval precision for follow-up questions.

CRITICAL: This system extends the existing TemplateClassifier infrastructure in backend/rag/template_sparql.py. We do NOT create a new classifier - we add a mapping layer on top of existing SPARQL template classification.


Existing RAG Architecture

The GLAM project already has a sophisticated template-based RAG pipeline:

User Question
     |
     v
+----------------------------------+
| ConversationContextResolver      |  <-- EXISTING: backend/rag/template_sparql.py:745
+----------------------------------+     Resolves elliptical follow-ups
     |
     v
+----------------------------------+
| FykeFilter                       |  <-- EXISTING: Filters irrelevant questions
+----------------------------------+
     |
     v
+----------------------------------+
| TemplateClassifier               |  <-- EXISTING: backend/rag/template_sparql.py:1104
+----------------------------------+     Returns SPARQL template_id
     |
     v
+----------------------------------+
| SlotExtractor                    |  <-- EXISTING: Extracts slot values
+----------------------------------+
     |
     v
+----------------------------------+
| TemplateInstantiator             |  <-- EXISTING: Generates SPARQL from template
+----------------------------------+
     |
     v
+----------------------------------+
| SPARQL Execution + Response      |  <-- EXISTING: Query execution
+----------------------------------+

Existing Components Reference

| Component | Location | Description |
|---|---|---|
| TemplateClassifierSignature | backend/rag/template_sparql.py:634 | DSPy Signature for SPARQL template classification |
| TemplateClassifier | backend/rag/template_sparql.py:1104 | DSPy Module loading templates from YAML |
| ConversationContextResolver | backend/rag/template_sparql.py:745 | Resolves elliptical follow-ups |
| SlotExtractorSignature | backend/rag/template_sparql.py:698 | DSPy Signature for slot extraction |
| sparql_templates.yaml | data/sparql_templates.yaml | SPARQL template definitions |

Existing SPARQL Templates

From data/sparql_templates.yaml:

  • list_institutions_by_type_city - List institutions by type in a city
  • list_institutions_by_type_region - List institutions by type in a region
  • list_institutions_by_type_country - List institutions by type in a country
  • count_institutions_by_type_location - Count institutions by type and location
  • find_institution_by_name - Find institution by name
  • find_institution_by_identifier - Find by ISIL/GHCID
  • find_institutions_by_founding_date - Find oldest/newest institutions
  • compare_locations - Compare institutions between locations
  • find_custodians_by_budget_threshold - Find by budget category
  • none - Fallback (no template matches)

Enhanced Architecture with Specificity Filtering

We extend the existing pipeline with a new component: SPARQL → Context Template Mapper

User Question
     |
     v
+----------------------------------+
| ConversationContextResolver      |  <-- EXISTING
+----------------------------------+
     |
     v
+----------------------------------+
| FykeFilter                       |  <-- EXISTING
+----------------------------------+
     |
     v
+----------------------------------+
| TemplateClassifier               |  <-- EXISTING: Returns SPARQL template_id
+----------------------------------+
     |
     +---> [SPARQL query generation continues as normal]
     |
     v
+----------------------------------+
| SPARQLToContextMapper (NEW)      |  <-- Maps SPARQL template → context template
+----------------------------------+     Uses slot values for refinement
     |
     v
+----------------------------------+
| SpecificityLookup (NEW)          |  <-- Retrieves template-specific class scores
+----------------------------------+
     |
     v
+----------------------------------+
| ClassFilter/Ranker (NEW)         |  <-- Filters classes below threshold
+----------------------------------+
     |
     v
+----------------------------------+
| RAG Context Builder (NEW)        |  <-- Builds context from high-specificity classes
+----------------------------------+
     |
     v
+----------------------------------+
| UML View Renderer (Existing)     |  <-- Filters/highlights UML based on specificity
+----------------------------------+

SPARQL → Context Template Mapping

Mapping Module

# backend/rag/specificity_mapper.py

from typing import Optional
from enum import Enum

class ContextTemplate(str, Enum):
    """Context templates for specificity scoring."""
    ARCHIVE_SEARCH = "archive_search"
    MUSEUM_SEARCH = "museum_search"
    LIBRARY_SEARCH = "library_search"
    COLLECTION_DISCOVERY = "collection_discovery"
    PERSON_RESEARCH = "person_research"
    LOCATION_BROWSE = "location_browse"
    IDENTIFIER_LOOKUP = "identifier_lookup"
    ORGANIZATIONAL_CHANGE = "organizational_change"
    DIGITAL_PLATFORM = "digital_platform"
    GENERAL_HERITAGE = "general_heritage"


# Mapping from SPARQL templates to context templates
SPARQL_TO_CONTEXT_MAP = {
    "list_institutions_by_type_city": ContextTemplate.LOCATION_BROWSE,
    "list_institutions_by_type_region": ContextTemplate.LOCATION_BROWSE,
    "list_institutions_by_type_country": ContextTemplate.LOCATION_BROWSE,
    "count_institutions_by_type_location": ContextTemplate.LOCATION_BROWSE,
    "count_institutions_by_type": ContextTemplate.GENERAL_HERITAGE,
    "find_institution_by_name": ContextTemplate.GENERAL_HERITAGE,
    "list_all_institutions_in_city": ContextTemplate.LOCATION_BROWSE,
    "find_institutions_by_founding_date": ContextTemplate.ORGANIZATIONAL_CHANGE,
    "find_institution_by_identifier": ContextTemplate.IDENTIFIER_LOOKUP,
    "compare_locations": ContextTemplate.LOCATION_BROWSE,
    "find_custodians_by_budget_threshold": ContextTemplate.GENERAL_HERITAGE,
    "none": ContextTemplate.GENERAL_HERITAGE,
}

# Institution type refinement map
INSTITUTION_TYPE_TO_CONTEXT = {
    "A": ContextTemplate.ARCHIVE_SEARCH,
    "M": ContextTemplate.MUSEUM_SEARCH,
    "L": ContextTemplate.LIBRARY_SEARCH,
    "G": ContextTemplate.MUSEUM_SEARCH,  # Galleries similar to museums
    "R": ContextTemplate.GENERAL_HERITAGE,  # Research centers
    "C": ContextTemplate.GENERAL_HERITAGE,  # Corporations
    "H": ContextTemplate.GENERAL_HERITAGE,  # Holy sites
    "D": ContextTemplate.DIGITAL_PLATFORM,  # Digital platforms
}


class SPARQLToContextMapper:
    """Maps SPARQL template classification to context template for specificity scoring."""
    
    def map(
        self,
        sparql_template_id: str,
        extracted_slots: Optional[dict] = None
    ) -> ContextTemplate:
        """
        Map a SPARQL template ID to a context template.
        
        Args:
            sparql_template_id: ID from existing TemplateClassifier
            extracted_slots: Slot values from SlotExtractor (optional)
            
        Returns:
            ContextTemplate for specificity score lookup
        """
        # Base mapping from SPARQL template
        base_context = SPARQL_TO_CONTEXT_MAP.get(
            sparql_template_id, 
            ContextTemplate.GENERAL_HERITAGE
        )
        
        # Refine by institution type if available
        if extracted_slots and "institution_type" in extracted_slots:
            inst_type = extracted_slots["institution_type"]
            refined = INSTITUTION_TYPE_TO_CONTEXT.get(inst_type)
            if refined:
                return refined
        
        return base_context
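The refinement step means an institution-type slot can override the base mapping. A minimal, self-contained sketch of that precedence (the enum members and map entries are condensed from the module above):

```python
from enum import Enum

class ContextTemplate(str, Enum):
    ARCHIVE_SEARCH = "archive_search"
    LOCATION_BROWSE = "location_browse"
    GENERAL_HERITAGE = "general_heritage"

SPARQL_TO_CONTEXT_MAP = {"list_institutions_by_type_city": ContextTemplate.LOCATION_BROWSE}
INSTITUTION_TYPE_TO_CONTEXT = {"A": ContextTemplate.ARCHIVE_SEARCH}

def map_context(sparql_template_id, extracted_slots=None):
    # Base mapping, falling back to the general template for unknown IDs
    base = SPARQL_TO_CONTEXT_MAP.get(sparql_template_id, ContextTemplate.GENERAL_HERITAGE)
    # An institution-type slot refines (overrides) the base mapping
    if extracted_slots and extracted_slots.get("institution_type") in INSTITUTION_TYPE_TO_CONTEXT:
        return INSTITUTION_TYPE_TO_CONTEXT[extracted_slots["institution_type"]]
    return base

# "Which archives are in Amsterdam?" -> city template + type A -> archive_search
print(map_context("list_institutions_by_type_city", {"institution_type": "A"}).value)
# No type slot -> base mapping wins
print(map_context("list_institutions_by_type_city").value)  # -> location_browse
# Unknown template -> general fallback
print(map_context("made_up_template").value)  # -> general_heritage
```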

Integration with Existing TemplateClassifier

# backend/rag/specificity_integration.py

from backend.rag.template_sparql import TemplateClassifier, SlotExtractor
from backend.rag.specificity_mapper import SPARQLToContextMapper, ContextTemplate

class SpecificityAwareClassifier:
    """
    Wraps the existing TemplateClassifier to add context template mapping.
    
    This class does NOT replace the existing classifier - it extends it.
    """
    
    def __init__(self):
        # Use EXISTING classifiers
        self.sparql_classifier = TemplateClassifier()
        self.slot_extractor = SlotExtractor()
        
        # Add NEW context mapper
        self.context_mapper = SPARQLToContextMapper()
    
    def classify(self, question: str, language: str = "nl") -> dict:
        """
        Classify a question and return both SPARQL and context templates.
        
        Returns:
            {
                "sparql_template_id": "list_institutions_by_type_city",
                "context_template_id": "archive_search",
                "extracted_slots": {"institution_type": "A", "city": "Amsterdam"},
                "confidence": 0.95
            }
        """
        # Step 1: Use EXISTING TemplateClassifier
        sparql_result = self.sparql_classifier(question=question, language=language)
        sparql_template_id = sparql_result.template_id
        confidence = sparql_result.confidence
        
        # Step 2: Extract slots using EXISTING SlotExtractor
        extracted_slots = {}
        if sparql_template_id != "none":
            # Note: reuses the classifier's (private) template loader;
            # a public accessor would be cleaner long-term
            templates = self.sparql_classifier._load_templates()
            if sparql_template_id in templates:
                template_def = templates[sparql_template_id]
                required_slots = ",".join(template_def.slots.keys())
                slot_result = self.slot_extractor(
                    question=question,
                    template_id=sparql_template_id,
                    required_slots=required_slots
                )
                import json
                try:
                    extracted_slots = json.loads(slot_result.slots_json)
                except json.JSONDecodeError:
                    pass
        
        # Step 3: Map to context template (NEW)
        context_template = self.context_mapper.map(
            sparql_template_id,
            extracted_slots
        )
        
        return {
            "sparql_template_id": sparql_template_id,
            "context_template_id": context_template.value,
            "extracted_slots": extracted_slots,
            "confidence": confidence
        }

Specificity Lookup Service

# backend/rag/specificity_lookup.py

from pathlib import Path
from typing import Dict, Optional
import yaml

class SpecificityLookup:
    """Looks up specificity scores for classes based on context template."""
    
    def __init__(self, schema_dir: Path):
        self.schema_dir = schema_dir
        self._cache: Dict[str, Dict[str, float]] = {}
        self._load_scores()
    
    def _load_scores(self):
        """Load specificity scores from LinkML class annotations."""
        classes_dir = self.schema_dir / "modules" / "classes"
        
        for yaml_file in classes_dir.glob("*.yaml"):
            with open(yaml_file) as f:
                data = yaml.safe_load(f)
            
            for class_name, class_def in data.get("classes", {}).items():
                annotations = class_def.get("annotations", {})
                
                # General score
                general_score = annotations.get("specificity_score", 0.5)
                
                # Template-specific scores
                template_scores = annotations.get("template_specificity", {})
                
                self._cache[class_name] = {
                    "general": general_score,
                    **template_scores
                }
    
    def get_score(
        self, 
        class_name: str, 
        context_template: str = "general"
    ) -> float:
        """Get specificity score for a class in a context."""
        if class_name not in self._cache:
            return 0.5  # Default if class not found
        
        scores = self._cache[class_name]
        
        # Try template-specific score first
        if context_template in scores:
            return scores[context_template]
        
        # Fall back to general score
        return scores.get("general", 0.5)
    
    def get_all_scores(self, context_template: str = "general") -> Dict[str, float]:
        """Get all class scores for a context template."""
        return {
            class_name: self.get_score(class_name, context_template)
            for class_name in self._cache
        }
    
    def filter_classes(
        self, 
        context_template: str, 
        threshold: float = 0.5
    ) -> list[str]:
        """Get classes above threshold for a context template."""
        all_scores = self.get_all_scores(context_template)
        return [
            class_name for class_name, score in all_scores.items()
            if score >= threshold
        ]

Specificity-Aware Retriever Module

# backend/rag/specificity_retriever.py

import dspy
from typing import Optional, Dict
from backend.rag.specificity_integration import SpecificityAwareClassifier
from backend.rag.specificity_lookup import SpecificityLookup

class SpecificityAwareRetriever(dspy.Module):
    """
    DSPy module that retrieves context filtered by specificity scores.
    
    Integrates with EXISTING TemplateClassifier infrastructure.
    """
    
    def __init__(
        self, 
        score_lookup: SpecificityLookup,
        vector_store,
        threshold: float = 0.5
    ):
        super().__init__()
        self.classifier = SpecificityAwareClassifier()
        self.score_lookup = score_lookup
        self.vector_store = vector_store
        self.threshold = threshold
    
    def forward(self, question: str, k: int = 10) -> dspy.Prediction:
        # Step 1: Classify using EXISTING infrastructure + NEW context mapping
        classification = self.classifier.classify(question)
        context_template = classification["context_template_id"]
        sparql_template = classification["sparql_template_id"]
        
        # Step 2: Get relevant classes above threshold
        all_scores = self.score_lookup.get_all_scores(context_template)
        relevant_classes = [
            cls for cls, score in all_scores.items()
            if score >= self.threshold
        ]
        
        # Step 3: Retrieve chunks only from relevant classes
        results = self.vector_store.similarity_search(
            question,
            k=k,
            filter={"class_name": {"$in": relevant_classes}}
        )
        
        # Step 4: Rank results by combined relevance + specificity
        ranked_results = self._rank_by_specificity(results, all_scores)
        
        return dspy.Prediction(
            sparql_template_id=sparql_template,
            context_template_id=context_template,
            template_confidence=classification["confidence"],
            extracted_slots=classification["extracted_slots"],
            relevant_classes=relevant_classes,
            context=ranked_results,
            scores=all_scores
        )
    
    def _rank_by_specificity(self, results, scores: Dict[str, float]):
        """Re-rank results by combining vector similarity with specificity score."""
        for result in results:
            class_name = result.metadata.get("class_name", "Unknown")
            specificity = scores.get(class_name, 0.5)
            # Combine scores: 70% vector similarity, 30% specificity
            result.combined_score = (0.7 * result.score) + (0.3 * specificity)
        
        return sorted(results, key=lambda r: r.combined_score, reverse=True)
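The 70/30 blend in _rank_by_specificity can be checked in isolation. A self-contained sketch with stand-in result objects (the Result dataclass is hypothetical; the real objects come from the vector store):

```python
from dataclasses import dataclass, field

@dataclass
class Result:
    doc: str
    score: float                        # vector similarity
    metadata: dict = field(default_factory=dict)
    combined_score: float = 0.0

def rank_by_specificity(results, scores):
    """70% vector similarity + 30% specificity, highest first."""
    for r in results:
        specificity = scores.get(r.metadata.get("class_name", "Unknown"), 0.5)
        r.combined_score = 0.7 * r.score + 0.3 * specificity
    return sorted(results, key=lambda r: r.combined_score, reverse=True)

results = [
    Result("budget table", 0.80, {"class_name": "Budget"}),
    Result("archive desc", 0.75, {"class_name": "Archive"}),
]
ranked = rank_by_specificity(results, {"Archive": 0.95, "Budget": 0.20})
# Archive: 0.7*0.75 + 0.3*0.95 = 0.810; Budget: 0.7*0.80 + 0.3*0.20 = 0.620
print([r.doc for r in ranked])  # -> ['archive desc', 'budget table']
```

A slightly lower-similarity chunk from a high-specificity class can thus outrank a closer match from a low-specificity class, which is the intended effect.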

Conversation Context Integration

Follow-up Question Handling

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Set

@dataclass
class Turn:
    """A single recorded conversation turn."""
    question: str
    template_id: str
    classes: List[str]
    timestamp: datetime


class ConversationContext:
    """Manages conversation context for follow-up questions."""
    
    def __init__(self):
        self.history: List[Turn] = []
        self.current_template: Optional[str] = None
        self.active_classes: Set[str] = set()
    
    def add_turn(self, question: str, template_id: str, classes: List[str]):
        """Record a conversation turn."""
        self.history.append(Turn(
            question=question,
            template_id=template_id,
            classes=classes,
            timestamp=datetime.now()
        ))
        
        # Update active context
        self.current_template = template_id
        self.active_classes.update(classes)
    
    def get_context_boost(self) -> Dict[str, float]:
        """Get score boosts for classes mentioned in conversation."""
        boosts = {}
        
        # Recent classes get higher boost
        for i, turn in enumerate(reversed(self.history[-5:])):
            recency_factor = 1.0 - (i * 0.15)  # 1.0, 0.85, 0.70, 0.55, 0.40
            for class_name in turn.classes:
                current_boost = boosts.get(class_name, 0)
                boosts[class_name] = max(current_boost, 0.1 * recency_factor)
        
        return boosts


class ConversationAwareRetriever(SpecificityAwareRetriever):
    """Retriever that considers conversation history."""
    
    def __init__(self, *args, context: ConversationContext = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.context = context or ConversationContext()
    
    def forward(self, question: str, k: int = 10) -> dspy.Prediction:
        # Get base prediction
        prediction = super().forward(question, k)
        
        # Apply conversation context boosts
        context_boosts = self.context.get_context_boost()
        
        for class_name, boost in context_boosts.items():
            if class_name in prediction.scores:
                prediction.scores[class_name] = min(
                    1.0,
                    prediction.scores[class_name] + boost
                )
        
        # Re-filter with boosted scores
        prediction.relevant_classes = [
            cls for cls, score in prediction.scores.items()
            if score >= self.threshold
        ]
        
        # Update conversation context (the Prediction exposes
        # context_template_id, not a bare template_id)
        self.context.add_turn(
            question,
            prediction.context_template_id,
            prediction.relevant_classes
        )
        
        return prediction
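get_context_boost decays boosts by recency: the most recent turn contributes up to 0.1, each older turn 15% less. A standalone sketch of the decay over a three-turn history:

```python
def context_boost(history_classes):
    """history_classes: list of per-turn class-name lists, oldest turn first."""
    boosts = {}
    # Walk the last five turns, newest first: factors 1.0, 0.85, 0.70, 0.55, 0.40
    for i, classes in enumerate(reversed(history_classes[-5:])):
        recency_factor = 1.0 - (i * 0.15)
        for class_name in classes:
            # A class mentioned in several turns keeps its highest boost
            boosts[class_name] = max(boosts.get(class_name, 0), 0.1 * recency_factor)
    return boosts

history = [["Archive"], ["Archive", "City"], ["Museum"]]
print(context_boost(history))
# Museum is newest (boost 0.1), City one turn back (0.085);
# Archive appears twice - its more recent mention (0.085) beats the older 0.07
```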

DSPy History Integration

# Using DSPy's built-in history tracking

class HeritageRAGWithHistory(dspy.Module):
    """Heritage RAG using DSPy history for multi-turn conversations."""
    
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.retriever = ConversationAwareRetriever(...)
        self.qa = dspy.ChainOfThought(HeritageQA)
    
    def forward(self, question: str, history: List[dict] = None) -> dspy.Prediction:
        # Build context from history
        context_prompt = ""
        if history:
            context_prompt = "Previous conversation:\n"
            for turn in history[-3:]:  # Last 3 turns
                context_prompt += f"Q: {turn['question']}\n"
                context_prompt += f"A: {turn['answer']}\n\n"
        
        # Retrieve with conversation awareness
        retrieval = self.retriever(
            question=f"{context_prompt}Current question: {question}"
        )
        
        # Generate answer
        answer = self.qa(
            question=question,
            context=retrieval.context,
            conversation_history=context_prompt
        )
        
        return dspy.Prediction(
            answer=answer.answer,
            template=retrieval.context_template_id,
            classes_used=retrieval.relevant_classes
        )
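The history prompt is just the most recent turns flattened into a Q/A transcript; a minimal sketch of that windowing:

```python
def build_history_prompt(history, max_turns=3):
    """Flatten the last max_turns turns into a Q/A transcript for the LLM."""
    if not history:
        return ""
    prompt = "Previous conversation:\n"
    for turn in history[-max_turns:]:
        prompt += f"Q: {turn['question']}\nA: {turn['answer']}\n\n"
    return prompt

history = [{"question": f"q{i}", "answer": f"a{i}"} for i in range(5)]
print(build_history_prompt(history))
# Only q2..q4 survive the 3-turn window; q0 and q1 are dropped
```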

Metrics and Evaluation

Retrieval Quality Metrics

import time

def evaluate_specificity_filtering(test_set, retriever):
    """Evaluate retrieval quality with specificity filtering."""
    
    metrics = {
        "precision": [],
        "recall": [],
        "class_reduction": [],
        "response_time": []
    }
    
    for example in test_set:
        start_time = time.time()
        
        prediction = retriever(example.question)
        
        elapsed = time.time() - start_time
        
        # Calculate precision: relevant classes / retrieved classes
        relevant = set(example.expected_classes)
        retrieved = set(prediction.relevant_classes)
        
        precision = len(relevant & retrieved) / len(retrieved) if retrieved else 0
        recall = len(relevant & retrieved) / len(relevant) if relevant else 0
        
        # Calculate class reduction relative to the full class inventory
        all_classes = retriever.score_lookup.get_all_scores()
        reduction = 1 - (len(retrieved) / len(all_classes))
        
        metrics["precision"].append(precision)
        metrics["recall"].append(recall)
        metrics["class_reduction"].append(reduction)
        metrics["response_time"].append(elapsed)
    
    return {
        k: sum(v) / len(v) for k, v in metrics.items()
    }
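The per-example arithmetic is ordinary set precision and recall; a worked toy case:

```python
def precision_recall(expected, retrieved):
    """Set precision/recall over class names, guarding against empty sets."""
    relevant, got = set(expected), set(retrieved)
    precision = len(relevant & got) / len(got) if got else 0
    recall = len(relevant & got) / len(relevant) if relevant else 0
    return precision, recall

# 2 of 3 retrieved classes are relevant; 2 of 3 relevant classes were found
p, r = precision_recall({"Archive", "City", "Collection"}, {"Archive", "City", "Budget"})
print(p, r)  # both 2/3
```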

A/B Testing Configuration

import hashlib

class SpecificityExperiment:
    """A/B test configuration for specificity filtering."""
    
    def __init__(self, traffic_split=0.5):
        self.traffic_split = traffic_split
        self.metrics = {"control": [], "treatment": []}
    
    def is_treatment(self, session_id: str) -> bool:
        """Deterministic assignment based on a stable hash of the session ID.
        
        hashlib is used instead of hash(), which is salted per process
        and would reassign sessions across restarts.
        """
        bucket = int(hashlib.md5(session_id.encode()).hexdigest(), 16) % 100
        return bucket < (self.traffic_split * 100)
    
    def get_retriever(self, session_id: str):
        """Get retriever based on traffic split."""
        if self.is_treatment(session_id):
            return SpecificityAwareRetriever(...)  # Treatment
        else:
            return BaseRetriever(...)  # Control
    
    def log_result(self, session_id: str, quality_score: float):
        """Log result for analysis."""
        group = "treatment" if self.is_treatment(session_id) else "control"
        self.metrics[group].append(quality_score)
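Two properties worth pinning down for the assignment: it is stable per session, and over many sessions it approximates the traffic split. A standalone sketch (a digest-based bucket is used because Python's built-in hash() is salted per process):

```python
import hashlib

def assign(session_id: str, traffic_split: float = 0.5) -> str:
    """Map a session deterministically into 'treatment' or 'control'."""
    bucket = int(hashlib.md5(session_id.encode()).hexdigest(), 16) % 100
    return "treatment" if bucket < traffic_split * 100 else "control"

# Stable: the same session always lands in the same group
assert assign("session-42") == assign("session-42")

# A split of 1.0 routes everyone to treatment
assert assign("session-42", traffic_split=1.0) == "treatment"

# Roughly split: over many sessions the treatment share approaches 50%
share = sum(assign(f"s{i}") == "treatment" for i in range(1000)) / 1000
print(round(share, 2))
```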

Deployment Considerations

Caching Strategy

from datetime import datetime
from typing import Dict

class CachedScoreRepository:
    """Score repository with caching for production performance."""
    
    def __init__(self, base_repo, cache_ttl_seconds=300):
        self.base_repo = base_repo
        self.cache_ttl = cache_ttl_seconds
        self._cache = {}
        self._cache_time = {}
    
    def get_all_scores(self, template_id: str) -> Dict[str, float]:
        cache_key = f"scores_{template_id}"
        
        # Check cache validity (total_seconds handles ages over a day,
        # unlike the timedelta .seconds attribute)
        if cache_key in self._cache:
            cache_age = (datetime.now() - self._cache_time[cache_key]).total_seconds()
            if cache_age < self.cache_ttl:
                return self._cache[cache_key]
        
        # Refresh cache
        scores = self.base_repo.get_all_scores(template_id)
        self._cache[cache_key] = scores
        self._cache_time[cache_key] = datetime.now()
        
        return scores
    
    def invalidate(self, template_id: str = None):
        """Invalidate cache (called when scores are updated)."""
        if template_id:
            cache_key = f"scores_{template_id}"
            self._cache.pop(cache_key, None)
        else:
            self._cache.clear()
            self._cache_time.clear()
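The TTL logic is easiest to verify with an injectable clock rather than wall time. A condensed sketch (the fake loader and clock are test doubles, not project code):

```python
class TTLCache:
    """Minimal TTL cache: now_fn is injected so expiry can be tested."""
    def __init__(self, load_fn, ttl_seconds, now_fn):
        self.load_fn, self.ttl, self.now = load_fn, ttl_seconds, now_fn
        self._cache, self._cached_at, self.loads = {}, {}, 0

    def get(self, key):
        if key in self._cache and self.now() - self._cached_at[key] < self.ttl:
            return self._cache[key]           # fresh hit
        self.loads += 1                       # miss or expired: reload
        self._cache[key] = self.load_fn(key)
        self._cached_at[key] = self.now()
        return self._cache[key]

clock = {"t": 0.0}
cache = TTLCache(lambda k: f"scores-for-{k}", ttl_seconds=300, now_fn=lambda: clock["t"])

cache.get("archive_search")          # cold: load 1
cache.get("archive_search")          # within TTL: served from cache
clock["t"] = 301.0
cache.get("archive_search")          # expired: load 2
print(cache.loads)  # -> 2
```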

Monitoring

# Prometheus metrics for specificity filtering

from prometheus_client import Counter, Histogram, Gauge

# Track template classification
template_classifications = Counter(
    'specificity_template_classifications_total',
    'Number of questions classified by template',
    ['template_id']
)

# Track filtering effectiveness
classes_before_filter = Histogram(
    'specificity_classes_before_filter',
    'Number of classes before specificity filtering'
)

classes_after_filter = Histogram(
    'specificity_classes_after_filter',
    'Number of classes after specificity filtering'
)

# Track threshold adjustments
current_threshold = Gauge(
    'specificity_threshold',
    'Current specificity threshold',
    ['template_id']
)

References