- Updated documentation to clarify integration points with existing components in the RAG pipeline and DSPy framework.
- Added detailed mapping of SPARQL templates to context templates for improved specificity filtering.
- Implemented wrapper patterns around existing classifiers to extend functionality without duplication.
- Introduced new tests for the SpecificityAwareClassifier and SPARQLToContextMapper to ensure proper integration and functionality.
- Enhanced the CustodianRDFConverter to include ISO country and subregion codes from GHCID for better geospatial data handling.
434 lines
14 KiB
Python
434 lines
14 KiB
Python
"""
|
|
Token counting utilities for benchmarking specificity filtering effectiveness.
|
|
|
|
This module provides tools to measure context size reduction achieved by
|
|
specificity-based filtering. It helps quantify:
|
|
1. Token count reduction per context template
|
|
2. Cost savings from reduced context
|
|
3. Comparison between filtered vs. unfiltered context
|
|
|
|
Usage:
|
|
from backend.rag.specificity.token_counter import (
|
|
count_tokens,
|
|
compare_context_sizes,
|
|
benchmark_all_templates,
|
|
)
|
|
|
|
# Count tokens in a string
|
|
tokens = count_tokens("Some text here")
|
|
|
|
# Compare filtered vs unfiltered
|
|
comparison = compare_context_sizes(
|
|
template="archive_search",
|
|
threshold=0.5
|
|
)
|
|
print(f"Reduction: {comparison['reduction_percent']:.1f}%")
|
|
|
|
# Benchmark all templates
|
|
results = benchmark_all_templates(threshold=0.5)
|
|
for template, stats in results.items():
|
|
print(f"{template}: {stats['tokens']} tokens ({stats['reduction_percent']:.1f}% reduction)")
|
|
"""
|
|
|
|
from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Optional  # NOTE(review): appears unused in this module — confirm before removing

logger = logging.getLogger(__name__)

# Try to import tiktoken for accurate token counting.
# When missing, count_tokens() falls back to a chars/4 approximation.
try:
    import tiktoken

    TIKTOKEN_AVAILABLE = True
except ImportError:
    tiktoken = None
    TIKTOKEN_AVAILABLE = False
    logger.warning("tiktoken not available, using approximate token counting")

# Try to import schema loader for context generation.
# Required by the comparison/benchmark functions below; they return 0 or
# raise RuntimeError when SCHEMA_LOADER_AVAILABLE is False.
try:
    from backend.rag.schema_loader import (
        format_filtered_ontology_context,
        get_filtered_classes_for_context,
        get_class_count_by_template,
    )

    SCHEMA_LOADER_AVAILABLE = True
except ImportError:
    SCHEMA_LOADER_AVAILABLE = False
    logger.warning("Schema loader not available for token counting benchmarks")

# Try to import context templates (enum iterated by benchmark_all_templates).
try:
    from backend.rag.specificity.models import ContextTemplate

    MODELS_AVAILABLE = True
except ImportError:
    MODELS_AVAILABLE = False
|
|
|
|
|
|
# =============================================================================
|
|
# Token Counting Functions
|
|
# =============================================================================
|
|
|
|
|
|
def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Return the number of tokens in *text*.

    Args:
        text: Text to count tokens for.
        model: Model name used to select a tiktoken encoder (default: gpt-4o).

    Returns:
        Token count; 0 for empty input.

    Note:
        Falls back to approximate counting (chars/4) if tiktoken unavailable.
    """
    if not text:
        return 0

    # Heuristic fallback: roughly four characters per token.
    approximate = len(text) // 4

    if not TIKTOKEN_AVAILABLE:
        return approximate

    try:
        # Unknown model names fall back to cl100k_base, the encoder shared
        # by GPT-4-family models.
        try:
            encoder = tiktoken.encoding_for_model(model)
        except KeyError:
            encoder = tiktoken.get_encoding("cl100k_base")
        return len(encoder.encode(text))
    except Exception as exc:
        logger.warning(f"tiktoken encoding failed: {exc}, using approximation")
        return approximate
|
|
|
|
|
|
def count_tokens_for_context(
    context_template: str = "general_heritage",
    threshold: float = 0.5,
    model: str = "gpt-4o",
) -> int:
    """Count tokens in the specificity-filtered context for one template.

    Args:
        context_template: Name of context template to render.
        threshold: Specificity threshold (0.0-1.0).
        model: Model name used to pick the tokenizer.

    Returns:
        Token count for the filtered context, or 0 when the schema loader
        could not be imported.
    """
    if not SCHEMA_LOADER_AVAILABLE:
        logger.warning("Schema loader not available")
        return 0

    rendered = format_filtered_ontology_context(context_template, threshold)
    return count_tokens(rendered, model)
|
|
|
|
|
|
# =============================================================================
|
|
# Comparison and Benchmarking
|
|
# =============================================================================
|
|
|
|
|
|
@dataclass
class ContextSizeComparison:
    """Comparison of filtered vs unfiltered context sizes."""
    template: str
    threshold: float

    # Token counts
    filtered_tokens: int
    unfiltered_tokens: int

    # Class counts
    filtered_classes: int
    unfiltered_classes: int

    # Derived metrics, populated by __post_init__
    token_reduction: int = field(init=False)
    token_reduction_percent: float = field(init=False)
    class_reduction: int = field(init=False)
    class_reduction_percent: float = field(init=False)

    @staticmethod
    def _reduction(before: int, after: int) -> tuple[int, float]:
        """Absolute and percentage drop going from *before* to *after*.

        Returns (0-safe) 0.0 percent when *before* is not positive.
        """
        delta = before - after
        percent = (delta / before * 100) if before > 0 else 0.0
        return delta, percent

    def __post_init__(self):
        # Derive reduction metrics from the raw counts supplied at init.
        self.token_reduction, self.token_reduction_percent = self._reduction(
            self.unfiltered_tokens, self.filtered_tokens
        )
        self.class_reduction, self.class_reduction_percent = self._reduction(
            self.unfiltered_classes, self.filtered_classes
        )

    def to_dict(self) -> dict:
        """Convert to dictionary."""
        keys = (
            "template",
            "threshold",
            "filtered_tokens",
            "unfiltered_tokens",
            "token_reduction",
            "token_reduction_percent",
            "filtered_classes",
            "unfiltered_classes",
            "class_reduction",
            "class_reduction_percent",
        )
        return {key: getattr(self, key) for key in keys}

    def __str__(self) -> str:
        return (
            f"ContextSizeComparison({self.template}, threshold={self.threshold}):\n"
            f" Tokens: {self.filtered_tokens:,} / {self.unfiltered_tokens:,} "
            f"(-{self.token_reduction_percent:.1f}%)\n"
            f" Classes: {self.filtered_classes} / {self.unfiltered_classes} "
            f"(-{self.class_reduction_percent:.1f}%)"
        )
|
|
|
|
|
|
def compare_context_sizes(
    template: str = "archive_search",
    threshold: float = 0.5,
    model: str = "gpt-4o",
) -> ContextSizeComparison:
    """Compare filtered vs unfiltered context sizes.

    The unfiltered baseline is the "general_heritage" template rendered at
    threshold 1.0, which includes every ontology class.

    Args:
        template: Context template name.
        threshold: Specificity threshold.
        model: Model for token counting.

    Returns:
        ContextSizeComparison with token and class counts.

    Raises:
        RuntimeError: If the schema loader could not be imported.
    """
    if not SCHEMA_LOADER_AVAILABLE:
        raise RuntimeError("Schema loader not available for comparison")

    # Filtered side: the requested template at the requested threshold.
    filtered_text = format_filtered_ontology_context(template, threshold)
    filtered_class_list = get_filtered_classes_for_context(template, threshold)

    # Baseline side: all classes, no filtering (threshold=1.0).
    baseline_text = format_filtered_ontology_context("general_heritage", 1.0)
    baseline_class_list = get_filtered_classes_for_context("general_heritage", 1.0)

    return ContextSizeComparison(
        template=template,
        threshold=threshold,
        filtered_tokens=count_tokens(filtered_text, model),
        unfiltered_tokens=count_tokens(baseline_text, model),
        filtered_classes=len(filtered_class_list),
        unfiltered_classes=len(baseline_class_list),
    )
|
|
|
|
|
|
def benchmark_all_templates(
    threshold: float = 0.5,
    model: str = "gpt-4o",
) -> dict[str, ContextSizeComparison]:
    """Benchmark token counts for all context templates.

    Args:
        threshold: Specificity threshold to use.
        model: Model for token counting.

    Returns:
        Dict mapping template name to ContextSizeComparison; templates
        that fail to benchmark are logged and omitted.

    Raises:
        RuntimeError: If the schema loader or models module is unavailable.
    """
    if not SCHEMA_LOADER_AVAILABLE or not MODELS_AVAILABLE:
        raise RuntimeError("Required modules not available for benchmarking")

    comparisons: dict[str, ContextSizeComparison] = {}

    for tmpl in ContextTemplate:
        name = tmpl.value
        try:
            comparisons[name] = compare_context_sizes(
                template=name,
                threshold=threshold,
                model=model,
            )
        except Exception as e:
            # Skip templates that cannot be rendered; keep going.
            logger.warning(f"Failed to benchmark {name}: {e}")

    return comparisons
|
|
|
|
|
|
def format_benchmark_report(
|
|
results: dict[str, ContextSizeComparison],
|
|
include_header: bool = True,
|
|
) -> str:
|
|
"""Format benchmark results as a readable report.
|
|
|
|
Args:
|
|
results: Dict from benchmark_all_templates()
|
|
include_header: Whether to include header
|
|
|
|
Returns:
|
|
Formatted string report
|
|
"""
|
|
lines = []
|
|
|
|
if include_header:
|
|
lines.append("=" * 80)
|
|
lines.append("SPECIFICITY FILTERING BENCHMARK REPORT")
|
|
lines.append("=" * 80)
|
|
lines.append("")
|
|
|
|
# Sort by token reduction percentage (highest first)
|
|
sorted_results = sorted(
|
|
results.items(),
|
|
key=lambda x: x[1].token_reduction_percent,
|
|
reverse=True,
|
|
)
|
|
|
|
# Table header
|
|
lines.append(f"{'Template':<25} {'Tokens':>12} {'Reduction':>12} {'Classes':>10}")
|
|
lines.append("-" * 60)
|
|
|
|
total_filtered = 0
|
|
total_unfiltered = 0
|
|
|
|
for template_name, comparison in sorted_results:
|
|
total_filtered += comparison.filtered_tokens
|
|
total_unfiltered += comparison.unfiltered_tokens
|
|
|
|
lines.append(
|
|
f"{template_name:<25} "
|
|
f"{comparison.filtered_tokens:>12,} "
|
|
f"{comparison.token_reduction_percent:>10.1f}% "
|
|
f"{comparison.filtered_classes:>10}"
|
|
)
|
|
|
|
# Summary
|
|
lines.append("-" * 60)
|
|
avg_reduction = (
|
|
(total_unfiltered - total_filtered) / total_unfiltered * 100
|
|
if total_unfiltered > 0 else 0.0
|
|
)
|
|
lines.append(f"{'Average Reduction:':<25} {avg_reduction:>22.1f}%")
|
|
lines.append("")
|
|
|
|
# Baseline info
|
|
if sorted_results:
|
|
baseline = sorted_results[0][1] # First result has baseline info
|
|
lines.append(f"Baseline (unfiltered): {baseline.unfiltered_tokens:,} tokens, "
|
|
f"{baseline.unfiltered_classes} classes")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
# =============================================================================
|
|
# Cost Estimation
|
|
# =============================================================================
|
|
|
|
|
|
@dataclass
class CostEstimate:
    """Estimated cost savings from token reduction."""
    template: str
    threshold: float

    # Token counts
    filtered_tokens: int
    unfiltered_tokens: int

    # Cost per 1000 queries (USD)
    filtered_cost_1k: float
    unfiltered_cost_1k: float
    savings_1k: float
    savings_percent: float

    def __str__(self) -> str:
        header = f"CostEstimate({self.template}):\n"
        per_1k = (
            f" Per 1000 queries: ${self.filtered_cost_1k:.4f} "
            f"vs ${self.unfiltered_cost_1k:.4f}\n"
        )
        saved = f" Savings: ${self.savings_1k:.4f} ({self.savings_percent:.1f}%)"
        return header + per_1k + saved
|
|
|
|
|
|
def estimate_cost_savings(
    template: str = "archive_search",
    threshold: float = 0.5,
    model: str = "gpt-4o-mini",
    input_price_per_1m: float = 0.15,  # GPT-4o-mini default
) -> CostEstimate:
    """Estimate cost savings from specificity filtering.

    Args:
        template: Context template.
        threshold: Specificity threshold.
        model: Model name (for token counting).
        input_price_per_1m: Price per 1M input tokens (USD).

    Returns:
        CostEstimate with savings calculation, expressed per 1000 queries.
    """
    sizes = compare_context_sizes(template, threshold, model)

    def cost_per_1k_queries(tokens: int) -> float:
        # tokens per query, priced per 1M input tokens, times 1000 queries.
        return (tokens / 1_000_000) * input_price_per_1m * 1000

    filtered_cost = cost_per_1k_queries(sizes.filtered_tokens)
    unfiltered_cost = cost_per_1k_queries(sizes.unfiltered_tokens)
    saved = unfiltered_cost - filtered_cost
    saved_percent = (saved / unfiltered_cost * 100) if unfiltered_cost > 0 else 0.0

    return CostEstimate(
        template=template,
        threshold=threshold,
        filtered_tokens=sizes.filtered_tokens,
        unfiltered_tokens=sizes.unfiltered_tokens,
        filtered_cost_1k=filtered_cost,
        unfiltered_cost_1k=unfiltered_cost,
        savings_1k=saved,
        savings_percent=saved_percent,
    )
|
|
|
|
|
|
# =============================================================================
|
|
# Quick Benchmarking Function
|
|
# =============================================================================
|
|
|
|
|
|
def quick_benchmark(threshold: float = 0.5) -> None:
    """Run a quick benchmark and print the results to stdout.

    Prints the per-template report plus cost estimates for a few
    representative templates. Failures are printed, never raised.

    Args:
        threshold: Specificity threshold to test.

    Example:
        >>> from backend.rag.specificity.token_counter import quick_benchmark
        >>> quick_benchmark(0.5)
    """
    print(f"\nRunning specificity filtering benchmark (threshold={threshold})...\n")

    try:
        all_results = benchmark_all_templates(threshold=threshold)
        print(format_benchmark_report(all_results))

        # Cost estimates for a representative subset of templates.
        banner = "=" * 80
        print("\n" + banner)
        print("COST ESTIMATE (GPT-4o-mini pricing)")
        print(banner)

        for name in ("archive_search", "person_research", "general_heritage"):
            if name not in all_results:
                continue
            est = estimate_cost_savings(name, threshold)
            print(f"\n{name}:")
            print(f" Filtered: ${est.filtered_cost_1k:.4f} per 1K queries")
            print(f" Unfiltered: ${est.unfiltered_cost_1k:.4f} per 1K queries")
            print(f" Savings: ${est.savings_1k:.4f} ({est.savings_percent:.1f}%)")

    except Exception as e:
        print(f"Benchmark failed: {e}")
        print("Make sure schema files are available.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this module directly as a CLI benchmark at the
    # default threshold of 0.5.
    quick_benchmark(0.5)
|