Test suite layers:
- Layer 1: 35 unit tests (no LLM required)
- Layer 2: 56 DSPy module tests with LLM
- Layer 3: 10 integration tests with Oxigraph
- Layer 4: comprehensive evaluation suite

Fixed:
- Coordinate queries to use schema:location -> blank node pattern
- Golden query expected intent for location questions
- Health check test filtering in Layer 4

Added GitHub Actions workflow for CI/CD evaluation.

407 lines · 14 KiB · Python
"""
|
|
Layer 4: Comprehensive Evaluation - Full pipeline evaluation
|
|
|
|
Runs complete evaluation on full datasets:
|
|
- Full dev set evaluation
|
|
- Regression detection
|
|
- Performance benchmarking
|
|
- Quality trend tracking
|
|
|
|
Target: Nightly runs, overall RAG score ≥75% (warning, not blocking)
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import pytest
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
# Add backend to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "backend" / "rag"))
|
|
|
|
from .conftest import requires_dspy, requires_llm
|
|
|
|
|
|
# =============================================================================
# Evaluation Results Storage
# =============================================================================

# Directory (next to this test file) where eval_*.json result files are
# written by save_evaluation_results and read back by load_previous_results.
RESULTS_DIR = Path(__file__).parent / "results"
|
|
|
|
|
|
def save_evaluation_results(
    results: dict,
    run_id: Optional[str] = None,
    results_dir: Optional[Path] = None,
) -> Path:
    """Save evaluation results to a JSON file.

    Args:
        results: Evaluation results dict (non-JSON-serializable values are
            stringified via ``default=str``).
        run_id: Optional run identifier; defaults to a UTC timestamp.
        results_dir: Target directory; defaults to RESULTS_DIR.

    Returns:
        Path to saved results file
    """
    target_dir = results_dir if results_dir is not None else RESULTS_DIR
    # parents=True so a missing intermediate directory doesn't crash the run.
    target_dir.mkdir(parents=True, exist_ok=True)

    if run_id is None:
        run_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

    filepath = target_dir / f"eval_{run_id}.json"

    # Explicit encoding keeps output stable across platforms.
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)

    return filepath
|
|
|
|
|
|
def load_previous_results(results_dir: Optional[Path] = None) -> list[dict]:
    """Load previous evaluation results for comparison.

    Args:
        results_dir: Directory to scan; defaults to RESULTS_DIR.

    Returns:
        List of previous result dicts, sorted by date (filename order,
        since run IDs are timestamps)
    """
    target_dir = results_dir if results_dir is not None else RESULTS_DIR
    if not target_dir.exists():
        return []

    results = []
    for filepath in sorted(target_dir.glob("eval_*.json")):
        try:
            with open(filepath, encoding="utf-8") as f:
                results.append(json.load(f))
        except (OSError, json.JSONDecodeError):
            # Best-effort: skip unreadable or corrupt result files
            # without aborting the comparison.
            continue

    return results
|
|
|
|
|
|
# =============================================================================
|
|
# Full Pipeline Evaluation
|
|
# =============================================================================
|
|
|
|
@requires_dspy
@requires_llm
class TestFullPipelineEvaluation:
    """Comprehensive pipeline evaluation."""

    def test_full_dev_set_evaluation(self, dev_set, dspy_lm):
        """Evaluate full pipeline on dev set.

        Classifies every dev-set example, scores intent/entity/overall
        quality, persists a JSON report, prints a summary, and asserts a
        minimal quality floor (warning only above it).
        """
        import dspy
        # Hoisted out of the per-example loop: these imports are
        # loop-invariant, so resolve them once.
        from tests.dspy_gitops.metrics import (
            entity_f1,
            heritage_rag_metric,
            intent_accuracy_metric,
        )

        # Import pipeline components
        try:
            from backend.rag.dspy_heritage_rag import HeritageQueryIntent
        except ImportError:
            pytest.skip("Heritage RAG pipeline not available")

        classifier = dspy.Predict(HeritageQueryIntent)

        results = {
            "run_timestamp": datetime.now(timezone.utc).isoformat(),
            "model": "claude-sonnet-4-20250514",
            "dataset": "heritage_rag_dev.json",
            "dataset_size": len(dev_set),
            "scores": {
                "intent_accuracy": [],
                "entity_f1": [],
                "overall": [],
            },
            "per_example": [],
        }

        for i, example in enumerate(dev_set):
            try:
                pred = classifier(
                    question=example.question,
                    language=example.language,
                )

                # Add mock fields for full metric evaluation
                pred.sparql = "SELECT ?s WHERE { ?s a ?t }"
                pred.answer = "Generated answer"
                pred.citations = ["oxigraph"]
                pred.confidence = 0.8

                score = heritage_rag_metric(example, pred)

                # Calculate component scores
                intent_score = intent_accuracy_metric(example, pred)
                entity_score = entity_f1(
                    getattr(example, "expected_entities", []),
                    getattr(pred, "entities", []),
                )

                results["scores"]["intent_accuracy"].append(intent_score)
                results["scores"]["entity_f1"].append(entity_score)
                results["scores"]["overall"].append(score)

                results["per_example"].append({
                    "index": i,
                    "question": example.question[:100],
                    "expected_intent": example.expected_intent,
                    "predicted_intent": pred.intent,
                    "intent_correct": intent_score == 1.0,
                    "entity_f1": entity_score,
                    "overall_score": score,
                })

            except Exception as e:
                # Record the failure but keep evaluating remaining examples.
                results["per_example"].append({
                    "index": i,
                    "question": example.question[:100],
                    "error": str(e),
                    "overall_score": 0.0,
                })
                results["scores"]["overall"].append(0.0)

        def _avg(xs: list) -> float:
            """Mean of xs, or 0 for an empty list."""
            return sum(xs) / len(xs) if xs else 0

        # Calculate aggregates
        overall_scores = results["scores"]["overall"]
        results["aggregates"] = {
            "intent_accuracy": _avg(results["scores"]["intent_accuracy"]),
            "entity_f1_avg": _avg(results["scores"]["entity_f1"]),
            "overall_avg": _avg(overall_scores),
            "pass_rate": _avg([1 if s >= 0.5 else 0 for s in overall_scores]),
        }

        # Save results
        save_evaluation_results(results)

        # Print summary
        print("\n" + "=" * 60)
        print("FULL PIPELINE EVALUATION RESULTS")
        print("=" * 60)
        print(f"Dataset size: {results['dataset_size']}")
        print(f"Intent accuracy: {results['aggregates']['intent_accuracy']:.2%}")
        print(f"Entity F1 avg: {results['aggregates']['entity_f1_avg']:.2%}")
        print(f"Overall avg: {results['aggregates']['overall_avg']:.2%}")
        print(f"Pass rate (≥50%): {results['aggregates']['pass_rate']:.2%}")
        print("=" * 60)

        # Assert minimum quality (warning level, not hard fail)
        overall = results["aggregates"]["overall_avg"]
        if overall < 0.75:
            print(f"WARNING: Overall score {overall:.2%} below 75% target")

        assert overall > 0.3, f"Overall score {overall:.2%} critically low"
|
|
|
|
|
|
# =============================================================================
|
|
# Regression Detection
|
|
# =============================================================================
|
|
|
|
@requires_dspy
@requires_llm
class TestRegressionDetection:
    """Detect quality regressions from previous runs."""

    def test_no_regression_from_baseline(self, dev_set, dspy_lm):
        """Compare a quick sample evaluation against the most recent saved run.

        Warns (does not fail) when the sampled score drops more than 10%
        below the baseline.
        """
        import dspy
        from tests.dspy_gitops.metrics import heritage_rag_metric

        try:
            from backend.rag.dspy_heritage_rag import HeritageQueryIntent
        except ImportError:
            pytest.skip("Heritage RAG pipeline not available")

        # Previous runs are required to have something to compare against.
        history = load_previous_results()
        if not history:
            pytest.skip("No previous results for regression comparison")

        # The most recent saved run acts as the baseline.
        baseline_score = history[-1].get("aggregates", {}).get("overall_avg", 0)

        classifier = dspy.Predict(HeritageQueryIntent)

        # Evaluate only a 10-example sample to keep this check fast.
        sample_scores = []
        for example in dev_set[:10]:
            try:
                prediction = classifier(
                    question=example.question,
                    language=example.language,
                )
                # Mock downstream fields so the full metric can score it.
                prediction.sparql = "SELECT ?s WHERE { ?s a ?t }"
                prediction.answer = "Generated answer"
                prediction.citations = []
                prediction.confidence = 0.8

                sample_scores.append(heritage_rag_metric(example, prediction))
            except Exception:
                sample_scores.append(0.0)

        current_avg = sum(sample_scores) / len(sample_scores) if sample_scores else 0

        # 10% tolerance before flagging a regression.
        regression_threshold = baseline_score * 0.9

        print(f"\nBaseline score: {baseline_score:.2%}")
        print(f"Current score: {current_avg:.2%}")
        print(f"Regression threshold: {regression_threshold:.2%}")

        if current_avg < regression_threshold:
            print("WARNING: Potential regression detected!")
            # Don't fail, just warn
        else:
            print("No regression detected")
|
|
|
|
|
|
# =============================================================================
|
|
# Golden Test Suite
|
|
# =============================================================================
|
|
|
|
@requires_dspy
@requires_llm
class TestGoldenQueries:
    """Test critical golden queries that must always pass."""

    def test_all_golden_queries(self, golden_tests, dspy_lm):
        """Classify every golden query and require the expected intent.

        Unlike the evaluation tests, any failure here fails the test.
        """
        import dspy

        try:
            from backend.rag.dspy_heritage_rag import HeritageQueryIntent
        except ImportError:
            pytest.skip("Heritage RAG pipeline not available")

        classifier = dspy.Predict(HeritageQueryIntent)

        failures = []

        # Health-check entries carry no "question" key; only query tests
        # can be classified.
        query_tests = [case for case in golden_tests if "question" in case]

        for case in query_tests:
            try:
                prediction = classifier(
                    question=case["question"],
                    language=case.get("language", "nl"),
                )

                # The predicted intent must match the golden expectation
                # (when one is specified).
                want_intent = case.get("expected_intent")
                if want_intent and prediction.intent != want_intent:
                    failures.append({
                        "test_id": case.get("id", "unknown"),
                        "question": case["question"],
                        "expected_intent": want_intent,
                        "actual_intent": prediction.intent,
                    })

            except Exception as e:
                failures.append({
                    "test_id": case.get("id", "unknown"),
                    "question": case.get("question", "N/A"),
                    "error": str(e),
                })

        if failures:
            print("\nGolden test failures:")
            for failure in failures:
                print(f" - {failure.get('test_id')}: {failure}")

        # Golden tests are critical - they should pass
        assert len(failures) == 0, f"{len(failures)} golden tests failed"
|
|
|
|
|
|
# =============================================================================
|
|
# Performance Benchmarking
|
|
# =============================================================================
|
|
|
|
@requires_dspy
@requires_llm
class TestPerformanceBenchmark:
    """Benchmark response times."""

    def test_classification_latency(self, sample_queries, dspy_lm):
        """Classification should complete within time budget.

        Times up to 5 sample classifications and asserts the average
        stays under 5 seconds.
        """
        import time
        import dspy

        try:
            from backend.rag.dspy_heritage_rag import HeritageQueryIntent
        except ImportError:
            pytest.skip("Heritage RAG pipeline not available")

        classifier = dspy.Predict(HeritageQueryIntent)

        latencies = []

        for query in sample_queries[:5]:
            # perf_counter is monotonic; time.time() can jump with clock
            # adjustments and skew latency measurements.
            start = time.perf_counter()
            try:
                _ = classifier(
                    question=query["question"],
                    language=query["language"],
                )
            except Exception:
                # A failed call still contributes a timing sample.
                pass
            latencies.append(time.perf_counter() - start)

        # Guard: an empty sample set would otherwise crash on the
        # division below and on max() of an empty sequence.
        if not latencies:
            pytest.skip("No sample queries available for benchmarking")

        avg_latency = sum(latencies) / len(latencies)
        max_latency = max(latencies)

        print(f"\nClassification latency:")
        print(f" Average: {avg_latency:.2f}s")
        print(f" Max: {max_latency:.2f}s")

        # Classification should be fast (< 5s average)
        assert avg_latency < 5.0, f"Average latency {avg_latency:.2f}s too high"
|
|
|
|
|
|
# =============================================================================
|
|
# Quality Trend Analysis
|
|
# =============================================================================
|
|
|
|
class TestQualityTrends:
|
|
"""Analyze quality trends over time."""
|
|
|
|
def test_quality_trend_positive(self):
|
|
"""Quality should not be declining over time."""
|
|
previous = load_previous_results()
|
|
|
|
if len(previous) < 3:
|
|
pytest.skip("Need at least 3 previous runs for trend analysis")
|
|
|
|
# Get last 5 runs
|
|
recent = previous[-5:]
|
|
scores = [r.get("aggregates", {}).get("overall_avg", 0) for r in recent]
|
|
|
|
# Check trend (simple linear regression slope)
|
|
n = len(scores)
|
|
x_mean = (n - 1) / 2
|
|
y_mean = sum(scores) / n
|
|
|
|
numerator = sum((i - x_mean) * (scores[i] - y_mean) for i in range(n))
|
|
denominator = sum((i - x_mean) ** 2 for i in range(n))
|
|
|
|
slope = numerator / denominator if denominator > 0 else 0
|
|
|
|
print(f"\nQuality trend (last {n} runs):")
|
|
print(f" Scores: {[f'{s:.2%}' for s in scores]}")
|
|
print(f" Trend slope: {slope:+.4f}")
|
|
|
|
if slope < -0.05:
|
|
print("WARNING: Negative quality trend detected!")
|
|
else:
|
|
print("Quality trend is stable or improving")
|
|
|
|
|
|
# =============================================================================
|
|
# Run comprehensive evaluation
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
# Run with verbose output
|
|
pytest.main([
|
|
__file__,
|
|
"-v",
|
|
"--tb=short",
|
|
"-s", # Show prints
|
|
"--durations=10", # Show slowest tests
|
|
])
|