# glam/backend/rag/benchmark_optimization_v2.py
# Snapshot metadata: 2025-12-21 22:12:34 +01:00 — 394 lines, 14 KiB, Python
#!/usr/bin/env python3
"""
Benchmark script comparing baseline vs optimized Heritage RAG pipeline.
This version runs each query in a separate subprocess to eliminate
in-memory caching artifacts that caused false "0% improvement" results.
Usage:
python benchmark_optimization_v2.py
Requirements:
- SSH tunnel active: ssh -f -N -L 7878:localhost:7878 root@91.98.224.44
- Environment loaded: source .venv/bin/activate && source .env
"""
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
# =============================================================================
# TEST QUERIES
# =============================================================================
# Benchmark query set: each entry pairs a natural-language query with the
# intent and (bilingual EN/NL) keywords a good answer is expected to contain.
TEST_QUERIES = [
    {
        "query": "List all libraries in Utrecht",
        "expected_intent": "geographic",
        # Bilingual keywords so either an English or Dutch answer can score.
        "expected_keywords": ["library", "bibliotheek", "utrecht"],
        "description": "Geographic query - libraries in specific city",
    },
    {
        "query": "What is the history of the Rijksmuseum?",
        "expected_intent": "temporal",
        # Several intent labels are reasonable for an institution-history query.
        "accepted_intents": ["temporal", "entity_lookup", "relational", "exploration"],
        "expected_keywords": ["rijksmuseum", "museum", "1800", "opgericht", "founded", "geschiedenis", "history"],
        "description": "Entity lookup - specific institution history",
    },
    {
        "query": "How many archives are in the Netherlands?",
        "expected_intent": "statistical",
        "expected_keywords": ["archief", "archive", "nederland", "netherlands", "aantal", "number"],
        "description": "Statistical query - count institutions",
    },
    {
        "query": "Compare museums in Amsterdam and Rotterdam",
        "expected_intent": "comparative",
        "expected_keywords": ["amsterdam", "rotterdam", "museum", "musea"],
        "description": "Comparative query - two cities",
    },
    {
        "query": "Welke archieven zijn er in Groningen?",
        "expected_intent": "geographic",
        "expected_keywords": ["archief", "groningen", "groninger"],
        "description": "Dutch language - geographic query",
    },
    {
        "query": "What heritage institutions are in Drenthe?",
        "expected_intent": "geographic",
        "expected_keywords": ["drenthe", "museum", "archief", "heritage", "erfgoed", "drents"],
        "description": "Geographic query - heritage in province",
    },
    {
        "query": "Tell me about archives in Friesland",
        "expected_intent": "geographic",
        # Accept Dutch, Frisian, and institution-name variants.
        "expected_keywords": ["friesland", "fryslân", "archief", "fries", "tresoar", "frysk"],
        "description": "Geographic query - archives in province",
    },
]
def run_query_subprocess(query: str, use_optimized: bool, timeout: int = 120) -> dict:
    """
    Run a single query through the Heritage RAG pipeline in a fresh subprocess.

    Subprocess isolation guarantees no in-memory (DSPy/LiteLLM) cache leaks
    between the baseline and optimized runs — the artifact that previously
    produced false "0% improvement" results.

    Args:
        query: Natural-language query to send through the pipeline.
        use_optimized: When True, load the bootstrap-optimized demos before
            answering; otherwise run the bare baseline pipeline.
        timeout: Maximum seconds to wait for the subprocess.

    Returns:
        dict with keys ``success`` (bool), ``latency`` (float), ``demos``
        (int), plus ``intent``/``answer`` on success or ``error`` on failure.
    """
    if use_optimized:
        load_code = (
            "pipeline.load('backend/rag/optimized_models/heritage_rag_bootstrap_latest.json')\n"
            "demos = len(pipeline.answer_gen.predict.demos)"
        )
    else:
        load_code = "demos = 0"
    # json.dumps yields a fully escaped, quoted literal that is also valid
    # Python — unlike the previous manual quote replacement, this survives
    # backslashes, embedded quotes, and newlines in the query text.
    query_literal = json.dumps(query)
    code = f'''
import sys
import os
import time
import json
sys.path.insert(0, '.')
os.environ['LITELLM_CACHE'] = 'False'
import dspy
from backend.rag.dspy_heritage_rag import HeritageRAGPipeline
# Configure DSPy with no caching
lm = dspy.LM('openai/gpt-4o-mini', cache=False, max_tokens=1024)
dspy.configure(lm=lm)
# Create pipeline
pipeline = HeritageRAGPipeline()
{load_code}
# Run query
query = {query_literal}
start = time.time()
try:
    result = pipeline(query)
    elapsed = time.time() - start
    output = {{
        "success": True,
        "intent": result.intent,
        "answer": result.answer,
        "latency": elapsed,
        "demos": demos
    }}
except Exception as e:
    output = {{
        "success": False,
        "error": str(e),
        "latency": time.time() - start,
        "demos": demos
    }}
print("RESULT_JSON:" + json.dumps(output))
'''
    try:
        # sys.executable keeps the child on the same interpreter/venv as the
        # parent instead of whatever 'python3' resolves to on PATH.
        result = subprocess.run(
            [sys.executable, '-c', code],
            capture_output=True,
            text=True,
            env={**os.environ, 'PYTHONPATH': '.'},
            cwd=str(Path(__file__).parent.parent.parent),
            timeout=timeout,
        )
        # The marker line lets us find the JSON payload amid any library
        # logging on stdout/stderr.
        combined = result.stdout + result.stderr
        match = re.search(r'RESULT_JSON:(\{.*\})', combined, re.DOTALL)
        if match:
            return json.loads(match.group(1))
        return {
            "success": False,
            "error": f"Could not parse output: {combined[:500]}",
            "latency": 0,
            "demos": 0,
        }
    except subprocess.TimeoutExpired:
        return {
            "success": False,
            "error": "Timeout",
            "latency": timeout,
            "demos": 0,
        }
    except Exception as e:
        # Defensive: any spawn failure is reported as a scored error, not a crash.
        return {
            "success": False,
            "error": str(e),
            "latency": 0,
            "demos": 0,
        }
def score_result(query_info: dict, result: dict) -> dict:
    """
    Score a pipeline result against the expectations for its query.

    Weighting: intent match 40%, keyword coverage 40%, answer substance 20%.
    Failed runs score zero across the board.
    """
    if not result.get("success"):
        return {"intent": 0.0, "keywords": 0.0, "non_empty": 0.0, "total": 0.0}

    # --- Intent (40%) -------------------------------------------------------
    expected_intent = query_info.get('expected_intent', '').lower()
    actual_intent = result.get('intent', '').lower()
    # A query may accept several intent labels; default to the primary one.
    allowed = [name.lower() for name in query_info.get('accepted_intents', [expected_intent])]
    # Partial credit for semantically related (but not accepted) intents.
    related_credit = {
        ('geographic', 'entity_lookup'): 0.5,
        ('geographic', 'exploration'): 0.7,
        ('statistical', 'comparative'): 0.5,
        ('exploration', 'entity_lookup'): 0.7,
        ('exploration', 'geographic'): 0.7,
        ('temporal', 'entity_lookup'): 0.8,
        ('temporal', 'relational'): 0.7,
        ('relational', 'entity_lookup'): 0.7,
    }
    if actual_intent in allowed:
        intent_score = 1.0
    else:
        forward = (expected_intent, actual_intent)
        backward = (actual_intent, expected_intent)
        intent_score = related_credit.get(forward, related_credit.get(backward, 0.0))

    # --- Keywords (40%): fraction of expected keywords in the answer --------
    answer_text = result.get('answer', '').lower()
    expected_keywords = query_info.get('expected_keywords', [])
    if expected_keywords:
        hits = sum(1 for word in expected_keywords if word.lower() in answer_text)
        keyword_score = hits / len(expected_keywords)
    else:
        keyword_score = 0.0

    # --- Substance (20%): longer (stripped) answers earn more credit --------
    stripped_len = len(answer_text.strip())
    if stripped_len > 100:
        substance_score = 1.0
    elif stripped_len > 20:
        substance_score = 0.7
    elif stripped_len > 0:
        substance_score = 0.3
    else:
        substance_score = 0.0

    total = 0.40 * intent_score + 0.40 * keyword_score + 0.20 * substance_score
    return {
        "intent": intent_score,
        "keywords": keyword_score,
        "non_empty": substance_score,
        "total": total,
    }
def run_benchmark():
    """
    Run the full benchmark: every test query through the baseline and the
    optimized pipeline (each in its own subprocess), score both, print a
    per-query and aggregate comparison, and persist the results as JSON.

    Returns:
        dict: the summary that was written to benchmark_results_v2.json.
    """
    print("\n" + "=" * 70)
    print("Heritage RAG Pipeline - Optimization Benchmark v2")
    print("(Subprocess isolation to prevent caching artifacts)")
    print("=" * 70)
    baseline_results = []
    optimized_results = []

    # ==========================================================================
    # RUN ALL QUERIES
    # ==========================================================================
    for i, q in enumerate(TEST_QUERIES):
        print(f"\n{'='*70}")
        print(f"[{i+1}/{len(TEST_QUERIES)}] {q['description']}")
        print(f"Query: {q['query']}")
        print("=" * 70)

        # BASELINE: bare pipeline, no optimized demos loaded.
        print("\n[BASELINE] Running in subprocess...")
        base_result = run_query_subprocess(q['query'], use_optimized=False)
        base_scores = score_result(q, base_result)
        baseline_results.append({
            "query": q,
            "result": base_result,
            "scores": base_scores
        })
        if base_result.get("success"):
            print(f" Intent: {base_result['intent']} (expected: {q['expected_intent']}) → {base_scores['intent']:.0%}")
            print(f" Keywords: {base_scores['keywords']:.0%} matched")
            print(f" Score: {base_scores['total']:.2f} | Latency: {base_result['latency']:.1f}s")
            print(f" Answer: {base_result['answer'][:100]}...")
        else:
            print(f" ERROR: {base_result.get('error', 'Unknown')}")

        # OPTIMIZED: same query with bootstrap demos loaded.
        print("\n[OPTIMIZED] Running in subprocess...")
        opt_result = run_query_subprocess(q['query'], use_optimized=True)
        opt_scores = score_result(q, opt_result)
        optimized_results.append({
            "query": q,
            "result": opt_result,
            "scores": opt_scores
        })
        if opt_result.get("success"):
            print(f" Demos loaded: {opt_result.get('demos', 0)}")
            print(f" Intent: {opt_result['intent']} (expected: {q['expected_intent']}) → {opt_scores['intent']:.0%}")
            print(f" Keywords: {opt_scores['keywords']:.0%} matched")
            print(f" Score: {opt_scores['total']:.2f} | Latency: {opt_result['latency']:.1f}s")
            print(f" Answer: {opt_result['answer'][:100]}...")
        else:
            print(f" ERROR: {opt_result.get('error', 'Unknown')}")

        # Per-query comparison
        delta = opt_scores['total'] - base_scores['total']
        print(f"\n → Delta: {delta:+.2f} ({'improved' if delta > 0 else 'same' if delta == 0 else 'worse'})")

    # ==========================================================================
    # SUMMARY
    # ==========================================================================
    print("\n" + "=" * 70)
    print("BENCHMARK SUMMARY")
    print("=" * 70)
    baseline_scores = [r['scores']['total'] for r in baseline_results]
    optimized_scores = [r['scores']['total'] for r in optimized_results]
    baseline_avg = sum(baseline_scores) / len(baseline_scores)
    optimized_avg = sum(optimized_scores) / len(optimized_scores)
    improvement = optimized_avg - baseline_avg
    # Guard against division by zero when the baseline scored 0 everywhere.
    improvement_pct = (improvement / baseline_avg * 100) if baseline_avg > 0 else 0

    # Latency averages over successful runs only (failures report fake latencies).
    baseline_latencies = [r['result']['latency'] for r in baseline_results if r['result'].get('success')]
    optimized_latencies = [r['result']['latency'] for r in optimized_results if r['result'].get('success')]
    baseline_lat_avg = sum(baseline_latencies) / len(baseline_latencies) if baseline_latencies else 0
    optimized_lat_avg = sum(optimized_latencies) / len(optimized_latencies) if optimized_latencies else 0

    print("\n Quality Scores:")
    print(f" Baseline Score: {baseline_avg:.3f}")
    print(f" Optimized Score: {optimized_avg:.3f}")
    print(f" Improvement: {improvement:+.3f} ({improvement_pct:+.1f}%)")
    print("\n Latency (avg):")
    print(f" Baseline: {baseline_lat_avg:.1f}s")
    print(f" Optimized: {optimized_lat_avg:.1f}s")
    print(f" Speedup: {((baseline_lat_avg - optimized_lat_avg) / baseline_lat_avg * 100) if baseline_lat_avg > 0 else 0:.1f}%")
    print("\n Per-query comparison:")
    print(f" {'Query':<40} {'Base':>8} {'Opt':>8} {'Δ':>8} {'Base(s)':>8} {'Opt(s)':>8}")
    print(" " + "-" * 82)
    for i, q in enumerate(TEST_QUERIES):
        base = baseline_results[i]['scores']['total']
        opt = optimized_results[i]['scores']['total']
        delta = opt - base
        base_lat = baseline_results[i]['result']['latency']
        opt_lat = optimized_results[i]['result']['latency']
        desc = q['description'][:38]
        print(f" {desc:<40} {base:>8.2f} {opt:>8.2f} {delta:>+8.2f} {base_lat:>8.1f} {opt_lat:>8.1f}")
    print("\n" + "=" * 70)

    # Persist the summary so later runs can be compared.
    results = {
        "timestamp": datetime.now().isoformat(),
        "version": "v2_subprocess_isolation",
        "baseline_avg": baseline_avg,
        "optimized_avg": optimized_avg,
        "improvement": improvement,
        "improvement_pct": improvement_pct,
        "baseline_latency_avg": baseline_lat_avg,
        "optimized_latency_avg": optimized_lat_avg,
        "per_query": [
            {
                "query": q["query"],
                "description": q["description"],
                "baseline_score": baseline_results[i]['scores']['total'],
                "optimized_score": optimized_results[i]['scores']['total'],
                "baseline_latency": baseline_results[i]['result']['latency'],
                "optimized_latency": optimized_results[i]['result']['latency'],
                "baseline_intent": baseline_results[i]['result'].get('intent'),
                "optimized_intent": optimized_results[i]['result'].get('intent'),
            }
            for i, q in enumerate(TEST_QUERIES)
        ]
    }
    results_path = Path(__file__).parent / "optimized_models" / "benchmark_results_v2.json"
    # Ensure the output directory exists — a baseline-only checkout may not
    # have optimized_models/, and failing here would discard the whole run.
    results_path.parent.mkdir(parents=True, exist_ok=True)
    with open(results_path, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to: {results_path.name}")
    return results
# Script entry point: run the full baseline-vs-optimized benchmark.
if __name__ == "__main__":
    run_benchmark()