#!/usr/bin/env python3
"""
Benchmark OpenAI Prompt Caching for Heritage RAG Pipeline

This script measures:
1. Token counts of schema-aware DSPy signatures (need 1024+ for caching)
2. Actual cache hit rates via OpenAI API
3. Latency improvements from prompt caching

OpenAI Prompt Caching Requirements:
- Minimum 1024 tokens in prompt for caching
- Cache lifetime: 5-10 minutes active, up to 24h extended
- 50% discount on cached input tokens
- Up to 80% latency reduction

Date: 2025-12-20
"""

import json
import os
import time
import sys
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional

# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

# Load .env if python-dotenv is installed. Guarded like the other optional
# dependencies below so the benchmark still runs when it is missing.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    print("Warning: python-dotenv not installed; using existing environment variables")

# Token counting
try:
    import tiktoken
    TIKTOKEN_AVAILABLE = True
except ImportError:
    TIKTOKEN_AVAILABLE = False
    print("Warning: tiktoken not installed. Install with: pip install tiktoken")

# DSPy
try:
    import dspy
    DSPY_AVAILABLE = True
except ImportError:
    DSPY_AVAILABLE = False
    print("Warning: dspy not installed")

# OpenAI
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    print("Warning: openai not installed")


@dataclass
class SignatureTokenStats:
    """Statistics for a DSPy signature's token counts."""

    name: str
    docstring_tokens: int = 0
    input_fields_tokens: int = 0
    output_fields_tokens: int = 0
    total_tokens: int = 0
    cacheable: bool = False  # True if >= 1024 tokens
    notes: list[str] = field(default_factory=list)


@dataclass
class CacheTestResult:
    """Result of a cache hit test."""

    request_num: int
    latency_ms: float
    prompt_tokens: int
    cached_tokens: int
    completion_tokens: int
    cache_hit_rate: float  # 0.0 to 1.0


def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens in text using tiktoken.

    Falls back to a rough ~4-characters-per-token estimate when tiktoken
    is not installed, and to the cl100k_base encoding when the model name
    is unknown to tiktoken.
    """
    if not TIKTOKEN_AVAILABLE:
        # Rough estimate: ~4 characters per token
        return len(text) // 4
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except KeyError:
        # Fallback to cl100k_base for newer models
        encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))


def analyze_signature_tokens(signature_class, name: str) -> SignatureTokenStats:
    """Analyze token counts for a DSPy signature class.

    Sums the docstring, input-field, and output-field token counts and
    flags the signature as cacheable when the total reaches OpenAI's
    1024-token prompt caching threshold.
    """
    stats = SignatureTokenStats(name=name)

    # Get docstring
    docstring = signature_class.__doc__ or ""
    stats.docstring_tokens = count_tokens(docstring)

    # Analyze input fields
    input_desc = ""
    output_desc = ""

    # Get field annotations from signature (pydantic-style model_fields)
    if hasattr(signature_class, 'model_fields'):
        for field_name, field_info in signature_class.model_fields.items():
            desc = str(field_info.default) if hasattr(field_info, 'default') else ""
            # Heuristic: classify as input when the field type or default
            # mentions "input"; everything else counts as output.
            if 'input' in str(type(field_info)).lower() or 'Input' in str(field_info.default):
                input_desc += f"{field_name}: {desc}\n"
            else:
                output_desc += f"{field_name}: {desc}\n"

    # For DSPy signatures, also check __fields__
    if hasattr(signature_class, '__fields__'):
        for field_name, field_info in signature_class.__fields__.items():
            field_obj = getattr(signature_class, field_name, None)
            if field_obj:
                desc = getattr(field_obj, 'desc', str(field_obj))
                # Guard the dspy reference: dspy may not be importable, in
                # which case we cannot distinguish input fields here.
                if DSPY_AVAILABLE and isinstance(field_obj, dspy.InputField):
                    input_desc += f"{field_name}: {desc}\n"
                else:
                    output_desc += f"{field_name}: {desc}\n"

    stats.input_fields_tokens = count_tokens(input_desc)
    stats.output_fields_tokens = count_tokens(output_desc)

    # Total tokens in the full prompt template
    stats.total_tokens = (
        stats.docstring_tokens + stats.input_fields_tokens + stats.output_fields_tokens
    )

    # Check if cacheable (>= 1024 tokens)
    stats.cacheable = stats.total_tokens >= 1024
    if stats.total_tokens < 1024:
        stats.notes.append(f"Need {1024 - stats.total_tokens} more tokens for caching")
    else:
        stats.notes.append(f"Cacheable! {stats.total_tokens - 1024} tokens over threshold")

    return stats


def test_openai_caching(num_requests: int = 5, model: str = "gpt-4o") -> list[CacheTestResult]:
    """Test OpenAI prompt caching with identical requests.

    Makes multiple identical requests and measures cache hit rates.
    Returns an empty list when the openai package or OPENAI_API_KEY is
    unavailable; failed individual requests are logged and skipped.
    """
    if not OPENAI_AVAILABLE:
        print("OpenAI not available, skipping cache test")
        return []

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("OPENAI_API_KEY not set, skipping cache test")
        return []

    client = OpenAI(api_key=api_key)
    results = []

    # Create a long system prompt (>1024 tokens) to test caching
    # This simulates the schema-aware context
    system_prompt = """You are an expert in GLAM (Galleries, Libraries, Archives, Museums) heritage institutions.
Your role is to classify user queries and route them to appropriate data sources.

HERITAGE CUSTODIAN ONTOLOGY CONTEXT
============================================================

Hub Architecture:
- Custodian (crm:E39_Actor): Central hub entity
- CustodianObservation: Evidence from sources
- CustodianName: Standardized emic names
- CustodianLegalStatus: Formal legal entity
- CustodianPlace: Geographic location
- CustodianCollection: Heritage collections

Heritage Custodian Types (GLAMORCUBESFIXPHDNT taxonomy):
- GALLERY: Art gallery or exhibition space for visual arts
- LIBRARY: Institution maintaining collections of books, periodicals, and other media
- ARCHIVE: Institution maintaining historical records and documents
- MUSEUM: Institution collecting, preserving, and displaying objects of cultural or scientific significance
- OFFICIAL_INSTITUTION: Government-operated heritage institution or registry
- RESEARCH_CENTER: Academic or research institution focused on heritage studies
- COMMERCIAL: For-profit organization involved in heritage sector
- UNSPECIFIED: Heritage institution whose specific type is not yet classified
- BIO_CUSTODIAN: Institution focused on botanical, zoological, or natural heritage
- EDUCATION_PROVIDER: Educational institution with significant heritage focus
- HERITAGE_SOCIETY: Non-profit society or association focused on heritage preservation
- FEATURE_CUSTODIAN: Institution responsible for physical landscape features
- INTANGIBLE_HERITAGE_GROUP: Organization preserving intangible cultural heritage
- MIXED: Institution combining multiple heritage types
- PERSONAL_COLLECTION: Private collection maintained by an individual
- HOLY_SACRED_SITE: Religious or sacred site with heritage significance
- DIGITAL_PLATFORM: Online platform or digital repository for heritage content
- NON_PROFIT: Non-governmental organization in heritage sector
- TASTE_SCENT_HERITAGE: Institution preserving culinary, olfactory, or gustatory heritage

Key Properties:
- hc:hc_id: Global Heritage Custodian Identifier (GHCID)
- hc:preferred_label: Primary name of the institution
- hc:custodian_type: Type classification from GLAMORCUBESFIXPHDNT taxonomy
- hc:legal_status: Legal form and registration status
- hc:place_designation: Geographic location and address
- hc:has_collection: Collections maintained by the institution
- hc:identifiers: External identifiers (ISIL, Wikidata, VIAF)
- hc:organizational_structure: Internal organization and departments
- hc:encompassing_body: Parent organization or governing body

Staff Role Categories (13 categories):
- CURATORIAL: Curator, Collections Manager, Registrar
- ARCHIVAL: Archivist, Records Manager, Digital Archivist
- LIBRARY: Librarian, Cataloger, Reference Specialist
- CONSERVATION: Conservator, Restorer, Conservation Scientist
- DIGITAL: Data Engineer, Digital Curator, Software Developer
- EDUCATION: Museum Educator, Public Programs Manager
- RESEARCH: Researcher, Historian, Archaeologist
- ADMINISTRATIVE: Director, Manager, Coordinator
- VISITOR_SERVICES: Front Desk, Tour Guide, Security
- DEVELOPMENT: Fundraiser, Grant Writer, Donor Relations
- COMMUNICATIONS: Marketing, Public Relations, Social Media
- FACILITIES: Building Manager, Exhibition Installer
- VOLUNTEER: Volunteer Coordinator, Docent

Key Ontology Prefixes:
PREFIX hc:
PREFIX crm:
PREFIX prov:
PREFIX schema:
PREFIX cpov:
PREFIX rico:
PREFIX foaf:
PREFIX tooi:
PREFIX org:
PREFIX skos:
PREFIX dcterms:
PREFIX wdt:
PREFIX wikidata:
PREFIX geo:
PREFIX ghcid:

MULTILINGUAL SYNONYMS:
- MUSEUM: "museum", "musea", "museo", "musée", "музей" (ru), "博物馆" (zh)
- LIBRARY: "library", "bibliotheek", "bibliothèque", "biblioteca", "библиотека" (ru)
- ARCHIVE: "archive", "archief", "archiv", "archivo", "архив" (ru), "档案馆" (zh)
- GALLERY: "gallery", "galerie", "galería", "galleria", "галерея" (ru)

When classifying queries:
1. Identify the primary intent (geographic, statistical, relational, temporal, entity_lookup, comparative, exploration)
2. Extract named entities (institution names, places, dates)
3. Recommend data sources (qdrant, sparql, typedb, postgis)
4. Classify entity type (person, institution, or both)
5. If person-related, identify the role category and specific role
6. If institution-related, identify the custodian type
============================================================
"""

    # Count tokens in system prompt
    system_tokens = count_tokens(system_prompt)
    print(f"\nSystem prompt tokens: {system_tokens}")
    print(f"Cacheable: {system_tokens >= 1024}")

    # Make identical requests
    user_message = "How many museums are there in Amsterdam?"

    print(f"\nMaking {num_requests} identical requests to test caching...")
    print("-" * 60)

    for i in range(num_requests):
        start_time = time.time()
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message}
                ],
                max_tokens=150,
            )
            latency_ms = (time.time() - start_time) * 1000

            # Extract usage stats
            usage = response.usage
            prompt_tokens = usage.prompt_tokens
            completion_tokens = usage.completion_tokens

            # Check for cached tokens (new OpenAI feature)
            cached_tokens = 0
            if hasattr(usage, 'prompt_tokens_details') and usage.prompt_tokens_details:
                cached_tokens = getattr(usage.prompt_tokens_details, 'cached_tokens', 0) or 0

            cache_hit_rate = cached_tokens / prompt_tokens if prompt_tokens > 0 else 0

            result = CacheTestResult(
                request_num=i + 1,
                latency_ms=latency_ms,
                prompt_tokens=prompt_tokens,
                cached_tokens=cached_tokens,
                completion_tokens=completion_tokens,
                cache_hit_rate=cache_hit_rate,
            )
            results.append(result)

            print(f"Request {i+1}: {latency_ms:.0f}ms | "
                  f"Prompt: {prompt_tokens} | "
                  f"Cached: {cached_tokens} ({cache_hit_rate:.1%}) | "
                  f"Completion: {completion_tokens}")

            # Small delay between requests
            if i < num_requests - 1:
                time.sleep(0.5)

        except Exception as e:
            print(f"Request {i+1} failed: {e}")

    return results


def analyze_heritage_rag_signatures():
    """Analyze all DSPy signatures in the Heritage RAG pipeline.

    Prints token counts for the schema-aware contexts and each signature,
    marking which ones clear the 1024-token caching threshold, then prints
    restructuring recommendations. Safe no-op (with a message) when the
    heritage RAG modules cannot be imported.
    """
    print("\n" + "=" * 70)
    print("HERITAGE RAG SIGNATURE TOKEN ANALYSIS")
    print("=" * 70)

    try:
        from backend.rag.dspy_heritage_rag import (
            get_schema_aware_query_intent_signature,
            HeritageQueryIntent,
            HeritageEntityExtraction,
            HeritageAnswerGeneration,
        )
        from backend.rag.schema_loader import (
            create_schema_aware_sparql_docstring,
            create_schema_aware_entity_docstring,
            get_ontology_context,
        )
    except ImportError as e:
        print(f"Could not import heritage RAG modules: {e}")
        return

    print("\n1. SCHEMA-AWARE CONTEXT TOKEN COUNTS")
    print("-" * 50)

    # Analyze ontology context
    try:
        ontology_context = get_ontology_context()
        ont_tokens = count_tokens(ontology_context)
        print(f"Ontology Context: {ont_tokens:,} tokens "
              f"{'[CACHEABLE]' if ont_tokens >= 1024 else '[TOO SHORT]'}")
    except Exception as e:
        print(f"Could not get ontology context: {e}")

    # Analyze SPARQL docstring
    try:
        sparql_doc = create_schema_aware_sparql_docstring()
        sparql_tokens = count_tokens(sparql_doc)
        print(f"SPARQL Docstring: {sparql_tokens:,} tokens "
              f"{'[CACHEABLE]' if sparql_tokens >= 1024 else '[TOO SHORT]'}")
    except Exception as e:
        print(f"Could not get SPARQL docstring: {e}")

    # Analyze entity docstring
    try:
        entity_doc = create_schema_aware_entity_docstring()
        entity_tokens = count_tokens(entity_doc)
        print(f"Entity Extractor Doc: {entity_tokens:,} tokens "
              f"{'[CACHEABLE]' if entity_tokens >= 1024 else '[TOO SHORT]'}")
    except Exception as e:
        print(f"Could not get entity docstring: {e}")

    print("\n2. DSPY SIGNATURE TOKEN COUNTS")
    print("-" * 50)

    # Analyze each signature
    signatures_to_analyze = [
        ("HeritageQueryIntent (base)", HeritageQueryIntent),
    ]

    try:
        schema_aware_sig = get_schema_aware_query_intent_signature()
        signatures_to_analyze.append(("SchemaAwareQueryIntent", schema_aware_sig))
    except Exception as e:
        print(f"Could not get schema-aware signature: {e}")

    signatures_to_analyze.extend([
        ("HeritageEntityExtraction", HeritageEntityExtraction),
        ("HeritageAnswerGeneration", HeritageAnswerGeneration),
    ])

    for name, sig_class in signatures_to_analyze:
        try:
            stats = analyze_signature_tokens(sig_class, name)
            status = "[CACHEABLE]" if stats.cacheable else "[TOO SHORT]"
            print(f"{name}: {stats.total_tokens:,} tokens {status}")
            print(f"  Docstring: {stats.docstring_tokens}, "
                  f"Fields: {stats.input_fields_tokens + stats.output_fields_tokens}")
            for note in stats.notes:
                print(f"  Note: {note}")
        except Exception as e:
            print(f"{name}: Error - {e}")

    print("\n3. RECOMMENDATIONS")
    print("-" * 50)

    recommendations = [
        "1. Restructure prompts with STATIC content first (ontology context)",
        "   - Move schema definitions, type lists, prefix declarations to prompt start",
        "   - Put user query and dynamic content at the end",
        "",
        "2. Consider consolidating short signatures:",
        "   - If docstrings are under 1024 tokens, merge related signatures",
        "   - Or add more static ontology context to reach caching threshold",
        "",
        "3. Prompt structure for caching:",
        "   [SYSTEM - STATIC] Ontology context, type definitions (~1500 tokens)",
        "   [SYSTEM - STATIC] Role categories, properties (~500 tokens)",
        "   [USER - DYNAMIC] Actual query (variable)",
        "",
        "4. Cache hit optimization:",
        "   - Use identical system prompts across requests",
        "   - Vary only user message content",
        "   - Keep prompt prefix consistent for 1024+ tokens",
    ]
    for rec in recommendations:
        print(rec)


def main():
    """Run all prompt caching benchmarks."""
    print("\n" + "=" * 70)
    print("OPENAI PROMPT CACHING BENCHMARK FOR HERITAGE RAG")
    print("=" * 70)
    print(f"Tiktoken available: {TIKTOKEN_AVAILABLE}")
    print(f"DSPy available: {DSPY_AVAILABLE}")
    print(f"OpenAI available: {OPENAI_AVAILABLE}")

    # 1. Analyze signature token counts
    analyze_heritage_rag_signatures()

    # 2. Test actual OpenAI caching
    print("\n" + "=" * 70)
    print("OPENAI CACHE HIT TEST")
    print("=" * 70)

    results = test_openai_caching(num_requests=5)

    if results:
        print("\n4. CACHE TEST SUMMARY")
        print("-" * 50)

        avg_latency = sum(r.latency_ms for r in results) / len(results)
        avg_cache_rate = sum(r.cache_hit_rate for r in results) / len(results)

        # Compare first vs subsequent requests: only requests after the first
        # can hit a freshly-warmed cache.
        first_latency = results[0].latency_ms
        subsequent_latencies = [r.latency_ms for r in results[1:]]
        avg_subsequent = (
            sum(subsequent_latencies) / len(subsequent_latencies)
            if subsequent_latencies else 0
        )

        print(f"Average latency: {avg_latency:.0f}ms")
        print(f"First request: {first_latency:.0f}ms")
        print(f"Subsequent avg: {avg_subsequent:.0f}ms")
        if first_latency > 0:
            improvement = (first_latency - avg_subsequent) / first_latency * 100
            print(f"Latency improvement: {improvement:.1f}%")

        print(f"Average cache hit rate: {avg_cache_rate:.1%}")

        # Calculate potential savings
        total_prompt_tokens = sum(r.prompt_tokens for r in results)
        total_cached_tokens = sum(r.cached_tokens for r in results)
        print(f"\nTotal prompt tokens: {total_prompt_tokens:,}")
        print(f"Total cached tokens: {total_cached_tokens:,}")
        if total_prompt_tokens > 0:
            # 50% discount on cached input tokens per OpenAI pricing
            savings_pct = total_cached_tokens / total_prompt_tokens * 50
            print(f"Estimated cost savings: {savings_pct:.1f}%")


if __name__ == "__main__":
    main()