File size: 7,573 Bytes
2a83c3b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""Prometheus metrics for the RAG pipeline.

Self-hosted observability. Unlike Phoenix tracing (which captures prompts and
completions and is therefore *hard-disabled* under BYOK), Prometheus metrics
are aggregate counters and histograms — no request content, no keys, no user
text ever lands in a label. Only low-cardinality categorical dimensions
(provider name, gate verdict, outcome) are recorded, so they are safe to emit
even on the public BYOK demo.

Graceful degradation: if ``prometheus_client`` is not installed, the metric
objects are bound to ``None``, every recording helper becomes a no-op, and
``METRICS_ENABLED`` is ``False``. Nothing in the hot path raises — a missing
dependency just means ``/metrics`` returns 501.

The custom RAG metrics live on the global ``prometheus_client.REGISTRY`` so
they share the same ``/metrics`` exposition as the HTTP-level metrics emitted
by ``prometheus-fastapi-instrumentator`` (which also defaults to that
registry).
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from utils.logging import get_logger

if TYPE_CHECKING:
    from core.state import GraphState

# Module-level logger. The recording helpers below use it to report, at debug
# level, metric updates that failed instead of letting them reach the request.
_log = get_logger(__name__)

try:
    from prometheus_client import (
        CONTENT_TYPE_LATEST,
        Counter,
        Gauge,
        Histogram,
        generate_latest,
    )
except ImportError:  # pragma: no cover - exercised only without the extra
    # Optional extra missing: the module must still import cleanly. The
    # collector classes and ``generate_latest`` become ``None``, and every
    # helper below short-circuits on ``METRICS_ENABLED`` before touching them.
    Counter = Gauge = Histogram = None  # type: ignore[assignment]
    generate_latest = None  # type: ignore[assignment]
    CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
    METRICS_ENABLED = False
else:
    METRICS_ENABLED = True


# ── Metric definitions ───────────────────────────────────────────────────
# Histogram buckets (seconds) span sub-second cache-warm answers up to the
# 180 s wall-clock budget (SAR_REQUEST_TIMEOUT_S). The long tail leaves room
# for Groq free-tier rate-limit waits and reranker cold loads without
# squeezing the fast path into too few buckets.
_LATENCY_BUCKETS = (0.5, 1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0, 120.0, 180.0)

if METRICS_ENABLED:
    # Each constructor registers its collector on the global
    # ``prometheus_client.REGISTRY``. A labelled metric only exposes a series
    # once that label combination has been used via ``.labels(...)``.
    PIPELINE_LATENCY = Histogram(
        name="rag_pipeline_latency_seconds",
        documentation="End-to-end RAG pipeline wall-clock latency, by terminal outcome.",
        labelnames=("outcome",),
        buckets=_LATENCY_BUCKETS,
    )
    PIPELINE_REQUESTS = Counter(
        name="rag_pipeline_requests_total",
        documentation="RAG pipeline invocations, by terminal outcome.",
        labelnames=("outcome",),
    )
    GUARDRAILS_BLOCKED = Counter(
        name="guardrails_blocked_total",
        documentation="Requests blocked at a safety gate, by gate and reason category.",
        labelnames=("gate", "reason"),
    )
    INFERENCE_ROUTED = Counter(
        name="inference_routed_by_provider_total",
        documentation="Synthesis inference calls routed, by provider.",
        labelnames=("provider",),
    )
    FAITHFULNESS_DROPPED = Counter(
        name="faithfulness_dropped_total",
        documentation="Total sentences dropped/flagged by the NLI faithfulness gate.",
    )
    AUDIT_VERIFICATIONS = Counter(
        name="audit_chain_verifications_total",
        documentation="Scheduled audit hash-chain verification runs, by result.",
        labelnames=("result",),
    )
    # NOTE(review): a Gauge starts at 0, so this reads as "tamper/error" until
    # the first verification is recorded through ``record_audit_verification``.
    AUDIT_CHAIN_VALID = Gauge(
        name="audit_chain_valid",
        documentation="1 if the last audit-chain verification passed, 0 if tamper/error detected.",
    )
else:  # pragma: no cover
    PIPELINE_LATENCY = PIPELINE_REQUESTS = GUARDRAILS_BLOCKED = None
    INFERENCE_ROUTED = FAITHFULNESS_DROPPED = None
    AUDIT_VERIFICATIONS = AUDIT_CHAIN_VALID = None


# ── Recording helpers ──────────────────────────────────────────────────────
# Every helper returns immediately when the extra is absent.
# ``record_pipeline_run`` and ``record_audit_verification`` catch any exception
# raised while updating metrics, so a metrics failure can never break a
# request. Free-form label inputs (provider name, block reason) are mapped onto
# a fixed vocabulary, with unknown values collapsing to "other", to keep
# series cardinality bounded.

# Providers that get their own ``provider`` label value; any other non-empty
# provider name is reported as "other".
_KNOWN_PROVIDERS = {"anthropic", "groq", "ollama", "openai"}


def _outcome_from_state(state: GraphState | dict) -> str:
    """Map a final pipeline state onto the ``outcome`` label value.

    Checks run in priority order and the first match wins:

    * ``"blocked"`` – ``guardrails_passed`` or ``security_passed`` is exactly
      ``False`` (a missing/``None`` flag does not count as a block);
    * ``"timeout"`` – ``evaluation_notes`` equals ``"request_timeout"``;
    * ``"review"`` – ``needs_human_review`` is truthy;
    * ``"success"`` – none of the above.
    """
    gate_flags = ("guardrails_passed", "security_passed")
    if any(state.get(flag) is False for flag in gate_flags):
        return "blocked"
    if state.get("evaluation_notes") == "request_timeout":
        return "timeout"
    return "review" if state.get("needs_human_review") else "success"


def record_pipeline_run(state: GraphState | dict, latency_ms: float) -> None:
    """Emit every per-run metric for one finished pipeline invocation.

    In order, this:

    * observes the latency (``latency_ms`` converted to seconds) and bumps the
      request counter, both labelled with the terminal outcome;
    * counts the synthesis provider from ``synth_provider``, reporting names
      outside the known set as ``"other"``;
    * adds the number of entries in ``faithfulness_unsupported``;
    * for blocked runs, records which gate fired and its reason.

    Intended to be called once at the end of every run, streaming or not.
    Returns immediately when ``prometheus_client`` is missing. Any exception
    raised while recording is logged at debug level and swallowed.
    """
    if not METRICS_ENABLED:
        return
    try:
        outcome = _outcome_from_state(state)
        latency_series = PIPELINE_LATENCY.labels(outcome=outcome)
        latency_series.observe(latency_ms / 1000.0)
        PIPELINE_REQUESTS.labels(outcome=outcome).inc()

        provider_name = (state.get("synth_provider") or "").lower()
        if provider_name:
            provider_label = (
                provider_name if provider_name in _KNOWN_PROVIDERS else "other"
            )
            INFERENCE_ROUTED.labels(provider=provider_label).inc()

        unsupported_count = len(state.get("faithfulness_unsupported") or [])
        if unsupported_count:
            FAITHFULNESS_DROPPED.inc(unsupported_count)

        if outcome == "blocked":
            # Guardrails take precedence: only the first failing gate is
            # attributed, using that gate's own reason field.
            for flag_key, gate_name, reason_key in (
                ("guardrails_passed", "guardrails", "guardrails_reason"),
                ("security_passed", "security", "security_message"),
            ):
                if state.get(flag_key) is False:
                    _record_block(gate_name, state.get(reason_key))
                    break
    except Exception as exc:  # pragma: no cover - defensive
        _log.debug("metrics_record_failed", error=str(exc))


# Bounded vocabulary for the guardrails reason label. Anything outside this
# set collapses to "other" so an attacker-controlled rejection message can
# never explode metric cardinality.
_KNOWN_GATE_REASONS = {
    "prompt_injection",
    "jailbreak",
    "pii_detected",
    "toxic",
    "off_topic",
    "rbac_denied",
    "sensitivity_block",
    "policy_violation",
}


def _record_block(gate: str, reason: str | None) -> None:
    """Increment ``guardrails_blocked_total`` for one blocked request.

    Args:
        gate: Which gate stopped the request (``"guardrails"`` or
            ``"security"`` at the visible call sites).
        reason: Free-form reason text. It is lower-cased, hyphens and runs of
            whitespace are collapsed to ``_``, and leading/trailing whitespace
            is dropped. The result is kept only if it is in
            ``_KNOWN_GATE_REASONS``; anything else, including a missing or
            non-string reason, is recorded as ``"other"``.

    No-op without the extra; never raises.
    """
    if not METRICS_ENABLED:
        return
    try:
        # str() guards against non-string reasons (e.g. a list of findings)
        # that would otherwise fail on string methods. No length cap is needed
        # for correctness: the key is only used for a membership test against
        # a small fixed set.
        words = str(reason or "unknown").lower().replace("-", " ").split()
        key = "_".join(words)
        label = key if key in _KNOWN_GATE_REASONS else "other"
        GUARDRAILS_BLOCKED.labels(gate=gate, reason=label).inc()
    except Exception as exc:  # pragma: no cover - defensive
        _log.debug("metrics_block_record_failed", error=str(exc))


# Closed vocabulary for the ``result`` label of audit_chain_verifications_total.
_KNOWN_AUDIT_RESULTS = {"valid", "broken", "error"}


def record_audit_verification(result: str, valid: bool) -> None:
    """Record one scheduled audit-chain verification outcome.

    Args:
        result: ``"valid"``, ``"broken"`` or ``"error"`` (case and surrounding
            whitespace are ignored). Any other value is recorded as
            ``"other"``, so a caller bug cannot create unbounded label values.
        valid: Whether the chain verified. Sets the ``audit_chain_valid``
            gauge to 1 (true) or 0 (false).

    No-op without the extra; never raises.
    """
    if not METRICS_ENABLED:
        return
    try:
        key = str(result).strip().lower()
        label = key if key in _KNOWN_AUDIT_RESULTS else "other"
        AUDIT_VERIFICATIONS.labels(result=label).inc()
        AUDIT_CHAIN_VALID.set(1 if valid else 0)
    except Exception as exc:  # pragma: no cover - defensive
        _log.debug("metrics_audit_record_failed", error=str(exc))


def render_latest() -> tuple[bytes, str]:
    """Build the body and content type for a ``/metrics`` response.

    Returns:
        ``(payload, content_type)``: the output of ``generate_latest()`` and
        the matching ``CONTENT_TYPE_LATEST`` MIME type.

    Raises:
        RuntimeError: ``prometheus_client`` is not installed. The API layer
            is expected to translate this into a 501.
    """
    if METRICS_ENABLED:
        payload = generate_latest()
        return payload, CONTENT_TYPE_LATEST
    raise RuntimeError("prometheus_client not installed")