# secureagentrag-api / utils/metrics.py
# Commit 2a83c3b (LeomordKaly): deploy: phase 3 BYOK backend (Dockerfile.hf, FastAPI on 7860)
"""Prometheus metrics for the RAG pipeline.

Self-hosted observability. Unlike Phoenix tracing (which captures prompts and
completions and is therefore *hard-disabled* under BYOK), Prometheus metrics
are aggregate counters and histograms — no request content, no keys, no user
text ever lands in a label. Only low-cardinality categorical dimensions
(provider name, gate and reason category, outcome) are recorded, so they are
safe to emit even on the public BYOK demo.

Graceful degradation: if ``prometheus_client`` is not installed, the metric
objects are ``None``, every recording helper is a no-op, and
``METRICS_ENABLED`` is ``False``. Nothing in the hot path raises — a missing
dependency just means ``/metrics`` returns 501.

The custom RAG metrics live on the global ``prometheus_client.REGISTRY`` so
they share the same ``/metrics`` exposition as the HTTP-level metrics emitted
by ``prometheus-fastapi-instrumentator`` (which also defaults to that
registry).
"""
from __future__ import annotations

from typing import TYPE_CHECKING

from utils.logging import get_logger

# GraphState is only needed for annotations, so it is imported for type
# checkers alone and never at runtime (presumably to avoid an import cycle
# with core.state -- confirm).
if TYPE_CHECKING:
    from core.state import GraphState

# Module logger; the recording helpers use it to report swallowed failures.
_log = get_logger(__name__)
# ``prometheus_client`` is an optional extra. Every public name below is
# defined on both paths so importers of this module never fail.
try:
    from prometheus_client import (
        CONTENT_TYPE_LATEST,
        Counter,
        Gauge,
        Histogram,
        generate_latest,
    )
except ImportError:  # pragma: no cover - exercised only without the extra
    METRICS_ENABLED = False
    # Fallback plain-text exposition media type so the name always exists.
    CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
    Histogram = Gauge = Counter = None  # type: ignore[assignment]
    generate_latest = None  # type: ignore[assignment]
else:
    METRICS_ENABLED = True
# ── Metric definitions ───────────────────────────────────────────────────
# Latency histogram bucket bounds, in seconds, sized for the RAG SLO: from
# sub-second cache-warm answers up to the 180 s wall-clock budget
# (SAR_REQUEST_TIMEOUT_S). The wide tail absorbs Groq free-tier rate-limit
# waits and reranker cold loads without squeezing the fast path into too
# few buckets.
_LATENCY_BUCKETS = (
    0.5, 1.0, 2.5, 5.0,   # sub-second to a few seconds
    10.0, 20.0, 30.0,     # tens of seconds
    60.0, 120.0, 180.0,   # minutes, capped at the request budget
)
if not METRICS_ENABLED:  # pragma: no cover
    # Without the extra every metric handle is a ``None`` placeholder; the
    # recording helpers check METRICS_ENABLED before touching any of them.
    PIPELINE_LATENCY = PIPELINE_REQUESTS = GUARDRAILS_BLOCKED = None
    INFERENCE_ROUTED = FAITHFULNESS_DROPPED = None
    AUDIT_VERIFICATIONS = AUDIT_CHAIN_VALID = None
else:
    # Registered on the default global registry, in this order, so they share
    # one ``/metrics`` exposition with the HTTP-level instrumentator metrics.
    PIPELINE_LATENCY = Histogram(
        "rag_pipeline_latency_seconds",
        "End-to-end RAG pipeline wall-clock latency, by terminal outcome.",
        labelnames=("outcome",),
        buckets=_LATENCY_BUCKETS,
    )
    PIPELINE_REQUESTS = Counter(
        "rag_pipeline_requests_total",
        "RAG pipeline invocations, by terminal outcome.",
        labelnames=("outcome",),
    )
    GUARDRAILS_BLOCKED = Counter(
        "guardrails_blocked_total",
        "Requests blocked at a safety gate, by gate and reason category.",
        labelnames=("gate", "reason"),
    )
    INFERENCE_ROUTED = Counter(
        "inference_routed_by_provider_total",
        "Synthesis inference calls routed, by provider.",
        labelnames=("provider",),
    )
    # Unlabelled: incremented by the number of sentences dropped per run.
    FAITHFULNESS_DROPPED = Counter(
        "faithfulness_dropped_total",
        "Total sentences dropped/flagged by the NLI faithfulness gate.",
    )
    AUDIT_VERIFICATIONS = Counter(
        "audit_chain_verifications_total",
        "Scheduled audit hash-chain verification runs, by result.",
        labelnames=("result",),
    )
    # 1/0 flag reflecting only the most recent verification.
    AUDIT_CHAIN_VALID = Gauge(
        "audit_chain_valid",
        "1 if the last audit-chain verification passed, 0 if tamper/error detected.",
    )
# ── Recording helpers ──────────────────────────────────────────────────────
# Each helper is a hard no-op when the extra is absent, and every increment is
# wrapped so a metrics failure can never break a request. Labels are clamped
# to a known vocabulary to keep cardinality bounded.

# Provider names allowed verbatim in the ``provider`` label; any other
# provider is recorded as "other". A frozenset so this closed vocabulary
# cannot be mutated at runtime.
_KNOWN_PROVIDERS = frozenset({"ollama", "groq", "openai", "anthropic"})
def _outcome_from_state(state: GraphState | dict) -> str:
    """Classify a finished pipeline run into one terminal-outcome label.

    Checks run in precedence order and the first match wins:

    * ``"blocked"`` -- ``guardrails_passed`` or ``security_passed`` is
      exactly ``False`` (a safety gate stopped the request);
    * ``"timeout"`` -- ``evaluation_notes`` equals ``"request_timeout"``
      (wall-clock budget exceeded);
    * ``"review"`` -- ``needs_human_review`` is truthy (answered, but
      flagged for human review);
    * ``"success"`` -- anything else.
    """
    gate_flags = ("guardrails_passed", "security_passed")
    # Identity check on False: a missing/None flag does not count as a block.
    if any(state.get(flag) is False for flag in gate_flags):
        return "blocked"
    if state.get("evaluation_notes") == "request_timeout":
        return "timeout"
    return "review" if state.get("needs_human_review") else "success"
def record_pipeline_run(state: GraphState | dict, latency_ms: float) -> None:
    """Record latency, outcome, provider, and faithfulness drops for one run.

    Safe to call unconditionally at the end of every pipeline invocation
    (streaming and non-streaming). No-op without the ``prometheus_client``
    extra; never raises -- any failure is logged at debug level and swallowed.

    ``state`` is the final pipeline state, read only via ``.get``.
    ``latency_ms`` is the run's wall-clock latency in milliseconds; it is
    converted to seconds for the latency histogram.
    """
    if not METRICS_ENABLED:
        return
    try:
        outcome = _outcome_from_state(state)
        PIPELINE_LATENCY.labels(outcome=outcome).observe(latency_ms / 1000.0)
        PIPELINE_REQUESTS.labels(outcome=outcome).inc()

        # Coerce to str and strip before matching: a non-string provider value
        # would otherwise raise here and silently skip every metric below, and
        # stray whitespace would mislabel a known provider as "other".
        provider = str(state.get("synth_provider") or "").strip().lower()
        if provider:
            label = provider if provider in _KNOWN_PROVIDERS else "other"
            INFERENCE_ROUTED.labels(provider=label).inc()

        dropped = len(state.get("faithfulness_unsupported") or [])
        if dropped:
            FAITHFULNESS_DROPPED.inc(dropped)

        # Attribute the block to the first failing gate, in the same order
        # _outcome_from_state checks them.
        if outcome == "blocked":
            if state.get("guardrails_passed") is False:
                _record_block("guardrails", state.get("guardrails_reason"))
            elif state.get("security_passed") is False:
                _record_block("security", state.get("security_message"))
    except Exception as exc:  # pragma: no cover - defensive
        _log.debug("metrics_record_failed", error=str(exc))
# Bounded vocabulary for the ``reason`` label on ``guardrails_blocked_total``.
# Anything outside this set collapses to "other" so an attacker-controlled
# rejection message can never explode metric cardinality.
_KNOWN_GATE_REASONS = frozenset(
    {
        "prompt_injection",
        "jailbreak",
        "pii_detected",
        "toxic",
        "off_topic",
        "rbac_denied",
        "sensitivity_block",
        "policy_violation",
    }
)


def _reason_label(reason: object) -> str:
    """Normalise a gate's rejection reason onto the bounded label vocabulary.

    The reason is stringified and lower-cased. Hyphens and any run of
    whitespace collapse to a single ``_``, and the key is capped at 40
    characters. Only keys in ``_KNOWN_GATE_REASONS`` survive. Everything else
    becomes ``"other"``, including a missing or empty reason, a non-string
    value, and free text.
    """
    text = str(reason) if reason else "unknown"
    key = "_".join(text.lower().replace("-", " ").split())[:40]
    return key if key in _KNOWN_GATE_REASONS else "other"


def _record_block(gate: str, reason: str | None) -> None:
    """Increment ``guardrails_blocked_total`` for one request stopped at ``gate``.

    No-op without the ``prometheus_client`` extra. Never raises: a failure is
    logged at debug level so a metrics problem cannot break a request.
    """
    if not METRICS_ENABLED:
        return
    try:
        GUARDRAILS_BLOCKED.labels(gate=gate, reason=_reason_label(reason)).inc()
    except Exception as exc:  # pragma: no cover - defensive
        _log.debug("metrics_block_record_failed", error=str(exc))
# Bounded vocabulary for the ``result`` label on
# ``audit_chain_verifications_total``; anything else is recorded as "other"
# so a caller passing free text cannot grow label cardinality.
_KNOWN_AUDIT_RESULTS = frozenset({"valid", "broken", "error"})


def _audit_result_label(result: object) -> str:
    """Clamp an audit-verification result onto ``_KNOWN_AUDIT_RESULTS``.

    The value is stringified, stripped and lower-cased; unknown values map to
    ``"other"``.
    """
    key = str(result).strip().lower()
    return key if key in _KNOWN_AUDIT_RESULTS else "other"


def record_audit_verification(result: str, valid: bool) -> None:
    """Record one scheduled audit-chain verification outcome.

    ``result`` is expected to be one of ``valid`` / ``broken`` / ``error``.
    Any other value is counted under ``other`` to keep the label bounded.
    ``valid`` sets the ``audit_chain_valid`` gauge to 1 (True) or 0 (False).
    No-op without the extra; never raises.
    """
    if not METRICS_ENABLED:
        return
    try:
        AUDIT_VERIFICATIONS.labels(result=_audit_result_label(result)).inc()
        AUDIT_CHAIN_VALID.set(1 if valid else 0)
    except Exception as exc:  # pragma: no cover - defensive
        _log.debug("metrics_audit_record_failed", error=str(exc))
def render_latest() -> tuple[bytes, str]:
    """Build the body and media type for a ``/metrics`` HTTP response.

    Returns ``(payload, content_type)``: the exposition produced by
    ``generate_latest()`` and the ``CONTENT_TYPE_LATEST`` media type.

    Raises ``RuntimeError`` when ``prometheus_client`` is not installed, so
    the API layer can translate it into a 501.
    """
    if METRICS_ENABLED:
        payload = generate_latest()
        return payload, CONTENT_TYPE_LATEST
    raise RuntimeError("prometheus_client not installed")