glam/backend/rag/metrics.py
2025-12-30 03:43:31 +01:00

313 lines
9.4 KiB
Python

"""
Prometheus Metrics for Heritage RAG API
Exposes metrics for monitoring template-based SPARQL generation,
session management, and overall API performance.
Metrics exposed:
- rag_queries_total: Total queries by type (template/llm), status, endpoint
- rag_template_hits_total: Template SPARQL hits by template_id
- rag_query_duration_seconds: Query latency histogram
- rag_sessions_active: Active sessions gauge
- rag_cache_total: Cache hit/miss counter
Usage:
from backend.rag.metrics import (
record_query, create_metrics_endpoint, PROMETHEUS_AVAILABLE
)
# Record a query
record_query(
endpoint="dspy_query",
template_used=True,
template_id="count_by_province",
cache_hit=False,
status="success",
duration_seconds=1.5
)
"""
from __future__ import annotations

import logging
from functools import lru_cache
from typing import Any

logger = logging.getLogger(__name__)

# ============================================================================
# Prometheus Client Import (Lazy/Optional)
# ============================================================================
# prometheus_client is an optional dependency: when it is absent, the flag
# stays False and every helper in this module degrades to a silent no-op.
PROMETHEUS_AVAILABLE = False
_prometheus_client = None
try:
    import prometheus_client as _prometheus_client
except ImportError:
    logger.warning("prometheus_client not installed - metrics disabled")
else:
    PROMETHEUS_AVAILABLE = True
    logger.info("Prometheus metrics enabled")
# ============================================================================
# Metric Initialization
# ============================================================================
def _init_metrics():
    """Create and register all Prometheus metric objects for this module.

    Returns:
        Dict mapping internal metric keys to prometheus_client objects, or
        an empty dict when prometheus_client is not installed (so callers
        can treat "metrics disabled" and "metric missing" uniformly).
    """
    if not PROMETHEUS_AVAILABLE or _prometheus_client is None:
        return {}

    pc = _prometheus_client
    metrics: dict[str, Any] = {}

    # Counter of all processed queries, split by endpoint/method/status.
    metrics["query_counter"] = pc.Counter(
        "rag_queries_total",
        "Total RAG queries processed",
        labelnames=["endpoint", "method", "status"],
    )
    # Per-template usage counter.
    metrics["template_hit_counter"] = pc.Counter(
        "rag_template_hits_total",
        "Template SPARQL hits by template ID",
        labelnames=["template_id", "intent"],
    )
    # Cache outcome counter (result label is "hit" or "miss").
    metrics["cache_counter"] = pc.Counter(
        "rag_cache_total",
        "Cache hits and misses",
        labelnames=["result"],
    )
    # End-to-end query latency; buckets span sub-second to one minute.
    metrics["query_duration"] = pc.Histogram(
        "rag_query_duration_seconds",
        "Query processing time in seconds",
        labelnames=["endpoint", "method"],
        buckets=(0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0),
    )
    # Template matching is expected to be fast, hence millisecond buckets.
    metrics["template_matching_duration"] = pc.Histogram(
        "rag_template_matching_seconds",
        "Time to match query to template",
        labelnames=["matched"],
        buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25),
    )
    # Gauge tracking currently-open conversation sessions.
    metrics["active_sessions_gauge"] = pc.Gauge(
        "rag_sessions_active",
        "Number of active conversation sessions",
    )
    return metrics
# Initialize metrics at module load
# Empty dict when prometheus_client is unavailable; all helpers below
# check this before touching any metric.
_metrics = _init_metrics()
# ============================================================================
# Helper Functions
# ============================================================================
def record_query(
    endpoint: str,
    template_used: bool,
    template_id: str | None,
    cache_hit: bool,
    status: str,
    duration_seconds: float,
    intent: str | None = None,
) -> None:
    """Record metrics for a completed query.

    No-op when prometheus_client is unavailable.

    Args:
        endpoint: API endpoint name (e.g., "dspy_query", "dspy_query_stream")
        template_used: Whether template SPARQL was used vs LLM generation
        template_id: Template ID if template was used
        cache_hit: Whether response was served from cache
        status: Query status ("success", "error", "timeout")
        duration_seconds: Total query duration in seconds
        intent: Query intent classification if available
    """
    if not PROMETHEUS_AVAILABLE or not _metrics:
        return

    method = "template" if template_used else "llm"
    shared_labels = {"endpoint": endpoint, "method": method}

    # Count the query by endpoint/method/status.
    _metrics["query_counter"].labels(status=status, **shared_labels).inc()

    # Attribute the hit to a specific template when one was used.
    if template_used and template_id:
        _metrics["template_hit_counter"].labels(
            template_id=template_id,
            intent=intent or "unknown",
        ).inc()

    # Track cache outcome.
    outcome = "hit" if cache_hit else "miss"
    _metrics["cache_counter"].labels(result=outcome).inc()

    # Record end-to-end latency.
    _metrics["query_duration"].labels(**shared_labels).observe(duration_seconds)
def record_template_matching(matched: bool, duration_seconds: float) -> None:
    """Record template matching attempt metrics.

    No-op when prometheus_client is unavailable.

    Args:
        matched: Whether a template was successfully matched
        duration_seconds: Time taken to attempt template matching
    """
    if PROMETHEUS_AVAILABLE and _metrics:
        label = "true" if matched else "false"
        _metrics["template_matching_duration"].labels(matched=label).observe(
            duration_seconds
        )
def set_active_sessions(count: int) -> None:
    """Update the active sessions gauge.

    No-op when prometheus_client is unavailable.

    Args:
        count: Current number of active sessions
    """
    if PROMETHEUS_AVAILABLE and _metrics:
        _metrics["active_sessions_gauge"].set(count)
def increment_active_sessions() -> None:
    """Increment active sessions by 1 (no-op when metrics are disabled)."""
    if PROMETHEUS_AVAILABLE and _metrics:
        _metrics["active_sessions_gauge"].inc()
def decrement_active_sessions() -> None:
    """Decrement active sessions by 1 (no-op when metrics are disabled)."""
    if PROMETHEUS_AVAILABLE and _metrics:
        _metrics["active_sessions_gauge"].dec()
# ============================================================================
# Metrics Endpoint
# ============================================================================
def _get_metrics_bytes() -> tuple[bytes, str]:
    """Generate the Prometheus exposition payload.

    Returns:
        Tuple of (metrics_bytes, content_type). When prometheus_client is
        not installed, a plain-text placeholder body is returned instead.
    """
    if not PROMETHEUS_AVAILABLE or _prometheus_client is None:
        return b"# Prometheus metrics not available\n", "text/plain"
    return (
        _prometheus_client.generate_latest(_prometheus_client.REGISTRY),
        _prometheus_client.CONTENT_TYPE_LATEST,
    )


def get_metrics_response() -> tuple[bytes, str]:
    """Generate a fresh Prometheus metrics response.

    Fix: the previous implementation decorated _get_metrics_bytes with
    @lru_cache(maxsize=1) and then called cache_clear() on every request,
    so the cache could never serve a hit. The decorator and the clear call
    are removed; metrics are simply generated fresh on each call, which is
    the same observable behavior without the misleading caching layer.

    Returns:
        Tuple of (metrics_bytes, content_type)
    """
    return _get_metrics_bytes()
def create_metrics_endpoint():
    """Create a FastAPI router for the /metrics endpoint.

    Usage:
        from backend.rag.metrics import create_metrics_endpoint
        app.include_router(create_metrics_endpoint())

    Returns:
        FastAPI APIRouter with /metrics endpoint
    """
    # Imported lazily so the module stays usable when FastAPI is absent.
    from fastapi import APIRouter
    from fastapi.responses import Response

    router = APIRouter(tags=["monitoring"])

    @router.get("/metrics")
    async def metrics():
        """Prometheus metrics endpoint for scraping."""
        payload, media_type = get_metrics_response()
        return Response(content=payload, media_type=media_type)

    return router
# ============================================================================
# Metric Summary Helpers (for logging/debugging)
# ============================================================================
def get_template_hit_rate() -> dict[str, Any]:
    """Calculate template hit rate from current metrics.

    Returns:
        Dict with hit rate statistics, or {"available": False} when
        prometheus_client is not installed.
    """
    if not PROMETHEUS_AVAILABLE or not _metrics:
        return {"available": False}

    # Sum counter samples per generation method. Only the base series is
    # counted; auxiliary samples (e.g. *_created) are filtered by name.
    totals = {"template": 0.0, "llm": 0.0}
    for metric in _metrics["query_counter"].collect():
        for sample in metric.samples:
            if sample.name != "rag_queries_total":
                continue
            method = sample.labels.get("method")
            if method in totals:
                totals[method] += sample.value

    template_total = totals["template"]
    overall = template_total + totals["llm"]
    rate = template_total / overall if overall > 0 else 0.0
    return {
        "available": True,
        "total_queries": int(overall),
        "template_queries": int(template_total),
        "llm_queries": int(totals["llm"]),
        "template_hit_rate": round(rate, 4),
        "template_hit_rate_percent": round(rate * 100, 2),
    }
def get_template_breakdown() -> dict[str, int]:
    """Get breakdown of template usage by template_id.

    Returns:
        Dict mapping template_id to hit count; empty when metrics are
        disabled.
    """
    if not PROMETHEUS_AVAILABLE or not _metrics:
        return {}

    counts: dict[str, int] = {}
    for metric in _metrics["template_hit_counter"].collect():
        for sample in metric.samples:
            # Skip auxiliary series such as *_created.
            if sample.name != "rag_template_hits_total":
                continue
            tid = sample.labels.get("template_id", "unknown")
            counts[tid] = int(sample.value)
    return counts