"""Token counting utilities for benchmarking specificity filtering effectiveness.

This module provides tools to measure context size reduction achieved by
specificity-based filtering. It helps quantify:

1. Token count reduction per context template
2. Cost savings from reduced context
3. Comparison between filtered vs. unfiltered context

Usage:
    from backend.rag.specificity.token_counter import (
        count_tokens,
        compare_context_sizes,
        benchmark_all_templates,
    )

    # Count tokens in a string
    tokens = count_tokens("Some text here")

    # Compare filtered vs unfiltered (returns a ContextSizeComparison object)
    comparison = compare_context_sizes(
        template="archive_search",
        threshold=0.5
    )
    print(f"Reduction: {comparison.token_reduction_percent:.1f}%")

    # Benchmark all templates (maps template name -> ContextSizeComparison)
    results = benchmark_all_templates(threshold=0.5)
    for template, stats in results.items():
        print(f"{template}: {stats.filtered_tokens} tokens "
              f"({stats.token_reduction_percent:.1f}% reduction)")
"""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)

# Try to import tiktoken for accurate token counting.
# When unavailable, count_tokens() falls back to an approximate chars/4 count.
try:
    import tiktoken

    TIKTOKEN_AVAILABLE = True
except ImportError:
    tiktoken = None
    TIKTOKEN_AVAILABLE = False
    logger.warning("tiktoken not available, using approximate token counting")

# Try to import schema loader for context generation.
# All benchmarking functions below require these helpers; they either return 0
# or raise RuntimeError when SCHEMA_LOADER_AVAILABLE is False.
try:
    from backend.rag.schema_loader import (
        format_filtered_ontology_context,
        get_filtered_classes_for_context,
        get_class_count_by_template,
    )

    SCHEMA_LOADER_AVAILABLE = True
except ImportError:
    SCHEMA_LOADER_AVAILABLE = False
    logger.warning("Schema loader not available for token counting benchmarks")

# Try to import context templates (enum iterated by benchmark_all_templates).
try:
    from backend.rag.specificity.models import ContextTemplate

    MODELS_AVAILABLE = True
except ImportError:
    MODELS_AVAILABLE = False


# =============================================================================
# Token Counting Functions
#
# =============================================================================


def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens in text using tiktoken.

    Args:
        text: Text to count tokens for
        model: Model name for tokenizer selection (default: gpt-4o)

    Returns:
        Number of tokens

    Note:
        Falls back to approximate counting (chars/4) if tiktoken unavailable
        OR if tiktoken raises during encoding — both paths end at the same
        approximation below.
    """
    if not text:
        return 0

    if TIKTOKEN_AVAILABLE:
        try:
            # Get encoder for model
            try:
                encoding = tiktoken.encoding_for_model(model)
            except KeyError:
                # Fallback to cl100k_base (used by GPT-4, Claude, etc.)
                encoding = tiktoken.get_encoding("cl100k_base")
            return len(encoding.encode(text))
        except Exception as e:
            # Deliberate best-effort: log and fall through to the approximation.
            logger.warning(f"tiktoken encoding failed: {e}, using approximation")

    # Fallback: approximate tokens as chars/4
    return len(text) // 4


def count_tokens_for_context(
    context_template: str = "general_heritage",
    threshold: float = 0.5,
    model: str = "gpt-4o",
) -> int:
    """Count tokens in a specificity-filtered context.

    Args:
        context_template: Name of context template
        threshold: Specificity threshold (0.0-1.0)
        model: Model name for tokenizer

    Returns:
        Token count for the filtered context, or 0 if the schema loader
        is unavailable (logged as a warning rather than raised).
    """
    if not SCHEMA_LOADER_AVAILABLE:
        logger.warning("Schema loader not available")
        return 0

    context = format_filtered_ontology_context(context_template, threshold)
    return count_tokens(context, model)


# =============================================================================
# Comparison and Benchmarking
# =============================================================================


@dataclass
class ContextSizeComparison:
    """Comparison of filtered vs unfiltered context sizes.

    Derived reduction metrics are computed in __post_init__ from the four
    required counts; callers never pass them.
    """

    template: str
    threshold: float

    # Token counts
    filtered_tokens: int
    unfiltered_tokens: int

    # Class counts
    filtered_classes: int
    unfiltered_classes: int

    # Derived metrics (filled in by __post_init__, excluded from the ctor)
    token_reduction: int = field(init=False)
    token_reduction_percent: float = field(init=False)
    class_reduction: int = field(init=False)
    class_reduction_percent: float = field(init=False)

    def __post_init__(self) -> None:
        # Percentages guard against division by zero and report 0.0 for an
        # empty baseline.
        self.token_reduction = self.unfiltered_tokens - self.filtered_tokens
        self.token_reduction_percent = (
            (self.token_reduction / self.unfiltered_tokens * 100)
            if self.unfiltered_tokens > 0
            else 0.0
        )
        self.class_reduction = self.unfiltered_classes - self.filtered_classes
        self.class_reduction_percent = (
            (self.class_reduction / self.unfiltered_classes * 100)
            if self.unfiltered_classes > 0
            else 0.0
        )

    def to_dict(self) -> dict:
        """Convert to dictionary."""
        return {
            "template": self.template,
            "threshold": self.threshold,
            "filtered_tokens": self.filtered_tokens,
            "unfiltered_tokens": self.unfiltered_tokens,
            "token_reduction": self.token_reduction,
            "token_reduction_percent": self.token_reduction_percent,
            "filtered_classes": self.filtered_classes,
            "unfiltered_classes": self.unfiltered_classes,
            "class_reduction": self.class_reduction,
            "class_reduction_percent": self.class_reduction_percent,
        }

    def __str__(self) -> str:
        return (
            f"ContextSizeComparison({self.template}, threshold={self.threshold}):\n"
            f" Tokens: {self.filtered_tokens:,} / {self.unfiltered_tokens:,} "
            f"(-{self.token_reduction_percent:.1f}%)\n"
            f" Classes: {self.filtered_classes} / {self.unfiltered_classes} "
            f"(-{self.class_reduction_percent:.1f}%)"
        )


def compare_context_sizes(
    template: str = "archive_search",
    threshold: float = 0.5,
    model: str = "gpt-4o",
) -> ContextSizeComparison:
    """Compare filtered vs unfiltered context sizes.

    Args:
        template: Context template name
        threshold: Specificity threshold
        model: Model for token counting

    Returns:
        ContextSizeComparison with token and class counts

    Raises:
        RuntimeError: If the schema loader import failed at module load time.
    """
    if not SCHEMA_LOADER_AVAILABLE:
        raise RuntimeError("Schema loader not available for comparison")

    # Get filtered context
    filtered_context = format_filtered_ontology_context(template, threshold)
    filtered_tokens = count_tokens(filtered_context, model)
    filtered_classes = len(get_filtered_classes_for_context(template, threshold))

    # Get unfiltered context (threshold=1.0 includes all classes).
    # NOTE(review): the baseline is always "general_heritage" at 1.0 regardless
    # of `template` — presumably the full-ontology baseline; confirm intended.
    unfiltered_context = format_filtered_ontology_context("general_heritage", 1.0)
    unfiltered_tokens = count_tokens(unfiltered_context, model)
    unfiltered_classes = len(get_filtered_classes_for_context("general_heritage", 1.0))

    return ContextSizeComparison(
        template=template,
        threshold=threshold,
        filtered_tokens=filtered_tokens,
        unfiltered_tokens=unfiltered_tokens,
        filtered_classes=filtered_classes,
        unfiltered_classes=unfiltered_classes,
    )


def benchmark_all_templates(
    threshold: float = 0.5,
    model: str = "gpt-4o",
) -> dict[str, ContextSizeComparison]:
    """Benchmark token counts for all context templates.

    Args:
        threshold: Specificity threshold to use
        model: Model for token counting

    Returns:
        Dict mapping template name to ContextSizeComparison. Templates whose
        comparison raises are logged and omitted from the result.

    Raises:
        RuntimeError: If the schema loader or models import is unavailable.
    """
    if not SCHEMA_LOADER_AVAILABLE or not MODELS_AVAILABLE:
        raise RuntimeError("Required modules not available for benchmarking")

    results: dict[str, ContextSizeComparison] = {}

    for template in ContextTemplate:
        try:
            comparison = compare_context_sizes(
                template=template.value,
                threshold=threshold,
                model=model,
            )
            results[template.value] = comparison
        except Exception as e:
            # Best-effort benchmark: a failing template must not abort the run.
            logger.warning(f"Failed to benchmark {template.value}: {e}")

    return results


def format_benchmark_report(
    results: dict[str, ContextSizeComparison],
    include_header: bool = True,
) -> str:
    """Format benchmark results as a readable report.

    Args:
        results: Dict from benchmark_all_templates()
        include_header: Whether to include header

    Returns:
        Formatted string report
    """
    lines: list[str] = []

    if include_header:
        lines.append("=" * 80)
        lines.append("SPECIFICITY FILTERING BENCHMARK REPORT")
        lines.append("=" * 80)
        lines.append("")

    # Sort by token reduction percentage (highest first)
    sorted_results = sorted(
        results.items(),
        key=lambda x: x[1].token_reduction_percent,
        reverse=True,
    )

    # Table header
    lines.append(f"{'Template':<25} {'Tokens':>12} {'Reduction':>12} {'Classes':>10}")
    lines.append("-" * 60)

    total_filtered = 0
    total_unfiltered = 0

    for template_name, comparison in sorted_results:
        total_filtered += comparison.filtered_tokens
        total_unfiltered += comparison.unfiltered_tokens
        lines.append(
            f"{template_name:<25} "
            f"{comparison.filtered_tokens:>12,} "
            f"{comparison.token_reduction_percent:>10.1f}% "
            f"{comparison.filtered_classes:>10}"
        )

    # Summary: average reduction weighted by token totals, not a mean of the
    # per-template percentages.
    lines.append("-" * 60)
    avg_reduction = (
        (total_unfiltered - total_filtered) / total_unfiltered * 100
        if total_unfiltered > 0
        else 0.0
    )
    lines.append(f"{'Average Reduction:':<25} {avg_reduction:>22.1f}%")
    lines.append("")

    # Baseline info — every comparison carries the same unfiltered baseline,
    # so reading it off the first sorted entry suffices.
    if sorted_results:
        baseline = sorted_results[0][1]  # First result has baseline info
        lines.append(f"Baseline (unfiltered): {baseline.unfiltered_tokens:,} tokens, "
                     f"{baseline.unfiltered_classes} classes")

    return "\n".join(lines)


# =============================================================================
# Cost Estimation
# =============================================================================


@dataclass
class CostEstimate:
    """Estimated cost savings from token reduction."""

    template: str
    threshold: float

    # Token counts
    filtered_tokens: int
    unfiltered_tokens: int

    # Cost per 1000 queries (USD)
    filtered_cost_1k: float
    unfiltered_cost_1k: float
    savings_1k: float
    savings_percent: float

    def __str__(self) -> str:
        return (
            f"CostEstimate({self.template}):\n"
            f" Per 1000 queries: ${self.filtered_cost_1k:.4f} vs "
            f"${self.unfiltered_cost_1k:.4f}\n"
            f" Savings: ${self.savings_1k:.4f} ({self.savings_percent:.1f}%)"
        )


def estimate_cost_savings(
    template: str = "archive_search",
    threshold: float = 0.5,
    model: str = "gpt-4o-mini",
    input_price_per_1m: float = 0.15,  # GPT-4o-mini default
) -> CostEstimate:
    """Estimate cost savings from specificity filtering.

    Args:
        template: Context template
        threshold: Specificity threshold
        model: Model name (for token counting)
        input_price_per_1m: Price per 1M input tokens (USD)

    Returns:
        CostEstimate with savings calculation

    Raises:
        RuntimeError: Propagated from compare_context_sizes when the schema
            loader is unavailable.
    """
    comparison = compare_context_sizes(template, threshold, model)

    # Calculate cost per 1000 queries: tokens/1M * price-per-1M * 1000 queries.
    filtered_cost = (comparison.filtered_tokens / 1_000_000) * input_price_per_1m * 1000
    unfiltered_cost = (comparison.unfiltered_tokens / 1_000_000) * input_price_per_1m * 1000

    savings = unfiltered_cost - filtered_cost
    savings_percent = (savings / unfiltered_cost * 100) if unfiltered_cost > 0 else 0.0

    return CostEstimate(
        template=template,
        threshold=threshold,
        filtered_tokens=comparison.filtered_tokens,
        unfiltered_tokens=comparison.unfiltered_tokens,
        filtered_cost_1k=filtered_cost,
        unfiltered_cost_1k=unfiltered_cost,
        savings_1k=savings,
        savings_percent=savings_percent,
    )


# =============================================================================
# Quick Benchmarking Function
# =============================================================================


def quick_benchmark(threshold: float = 0.5) -> None:
    """Run a quick benchmark and print results.

    Args:
        threshold: Specificity threshold to test

    Example:
        >>> from backend.rag.specificity.token_counter import quick_benchmark
        >>> quick_benchmark(0.5)
    """
    print(f"\nRunning specificity filtering benchmark (threshold={threshold})...\n")

    try:
        results = benchmark_all_templates(threshold=threshold)
        report = format_benchmark_report(results)
        print(report)

        # Show cost estimate for one template
        print("\n" + "=" * 80)
        print("COST ESTIMATE (GPT-4o-mini pricing)")
        print("=" * 80)

        for template_name in ["archive_search", "person_research", "general_heritage"]:
            if template_name in results:
                estimate = estimate_cost_savings(template_name, threshold)
                print(f"\n{template_name}:")
                print(f" Filtered: ${estimate.filtered_cost_1k:.4f} per 1K queries")
                print(f" Unfiltered: ${estimate.unfiltered_cost_1k:.4f} per 1K queries")
                print(f" Savings: ${estimate.savings_1k:.4f} ({estimate.savings_percent:.1f}%)")

    except Exception as e:
        # CLI convenience wrapper: report the failure instead of crashing.
        print(f"Benchmark failed: {e}")
        print("Make sure schema files are available.")


if __name__ == "__main__":
    quick_benchmark(0.5)