Spaces:
Running
Running
| """Prometheus metrics for the RAG pipeline. | |
| Self-hosted observability. Unlike Phoenix tracing (which captures prompts and | |
| completions and is therefore *hard-disabled* under BYOK), Prometheus metrics | |
| are aggregate counters and histograms — no request content, no keys, no user | |
| text ever lands in a label. Only low-cardinality categorical dimensions | |
| (provider name, gate verdict, outcome) are recorded, so they are safe to emit | |
| even on the public BYOK demo. | |
| Graceful degradation: if ``prometheus_client`` is not installed the module | |
| exposes no-op stubs and ``METRICS_ENABLED`` is ``False``. Nothing in the hot | |
| path raises — a missing dependency just means ``/metrics`` returns 501. | |
| The custom RAG metrics live on the global ``prometheus_client.REGISTRY`` so | |
| they share the same ``/metrics`` exposition as the HTTP-level metrics emitted | |
| by ``prometheus-fastapi-instrumentator`` (which also defaults to that | |
| registry). | |
| """ | |
| from __future__ import annotations | |
| from typing import TYPE_CHECKING | |
| from utils.logging import get_logger | |
| if TYPE_CHECKING: | |
| from core.state import GraphState | |
| _log = get_logger(__name__) | |
| try: | |
| from prometheus_client import ( | |
| CONTENT_TYPE_LATEST, | |
| Counter, | |
| Gauge, | |
| Histogram, | |
| generate_latest, | |
| ) | |
| METRICS_ENABLED = True | |
| except ImportError: # pragma: no cover - exercised only without the extra | |
| METRICS_ENABLED = False | |
| CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8" | |
| Counter = Gauge = Histogram = None # type: ignore[assignment] | |
| generate_latest = None # type: ignore[assignment] | |
| # ── Metric definitions ─────────────────────────────────────────────────── | |
| # Buckets tuned for the RAG SLO: sub-second cache-warm answers up to the | |
| # 180 s wall-clock budget (SAR_REQUEST_TIMEOUT_S). The wide tail captures | |
| # Groq free-tier rate-limit waits + reranker cold loads without smearing the | |
| # fast path across too few buckets. | |
# Upper bounds, in seconds, of the end-to-end latency histogram buckets.
_LATENCY_BUCKETS = (
    0.5,
    1.0,
    2.5,
    5.0,
    10.0,
    20.0,
    30.0,
    60.0,
    120.0,
    180.0,
)

if not METRICS_ENABLED:  # pragma: no cover
    # prometheus_client is unavailable: keep every metric name importable but
    # bound to None. The recording helpers check METRICS_ENABLED before
    # touching any of these.
    PIPELINE_LATENCY = PIPELINE_REQUESTS = None
    GUARDRAILS_BLOCKED = INFERENCE_ROUTED = None
    FAITHFULNESS_DROPPED = None
    AUDIT_VERIFICATIONS = AUDIT_CHAIN_VALID = None
else:
    # Per-run wall-clock latency, split by terminal outcome.
    PIPELINE_LATENCY = Histogram(
        name="rag_pipeline_latency_seconds",
        documentation="End-to-end RAG pipeline wall-clock latency, by terminal outcome.",
        labelnames=("outcome",),
        buckets=_LATENCY_BUCKETS,
    )

    # Per-run invocation count, split by terminal outcome.
    PIPELINE_REQUESTS = Counter(
        name="rag_pipeline_requests_total",
        documentation="RAG pipeline invocations, by terminal outcome.",
        labelnames=("outcome",),
    )

    # Blocked requests, split by the gate that fired and a reason category.
    GUARDRAILS_BLOCKED = Counter(
        name="guardrails_blocked_total",
        documentation="Requests blocked at a safety gate, by gate and reason category.",
        labelnames=("gate", "reason"),
    )

    # Synthesis calls, split by provider.
    INFERENCE_ROUTED = Counter(
        name="inference_routed_by_provider_total",
        documentation="Synthesis inference calls routed, by provider.",
        labelnames=("provider",),
    )

    # Unlabelled running total of sentences the faithfulness gate rejected.
    FAITHFULNESS_DROPPED = Counter(
        name="faithfulness_dropped_total",
        documentation="Total sentences dropped/flagged by the NLI faithfulness gate.",
    )

    # Audit-chain verification runs, split by result.
    AUDIT_VERIFICATIONS = Counter(
        name="audit_chain_verifications_total",
        documentation="Scheduled audit hash-chain verification runs, by result.",
        labelnames=("result",),
    )

    # Outcome of the most recent audit-chain verification (1 or 0).
    AUDIT_CHAIN_VALID = Gauge(
        name="audit_chain_valid",
        documentation="1 if the last audit-chain verification passed, 0 if tamper/error detected.",
    )
| # ── Recording helpers ────────────────────────────────────────────────────── | |
| # Each helper is a hard no-op when the extra is absent, and every increment is | |
| # wrapped so a metrics failure can never break a request. Labels are clamped | |
| # to a known vocabulary to keep cardinality bounded. | |
# Provider names that may appear verbatim as the ``provider`` label value;
# record_pipeline_run reports any other non-empty provider as "other".
_KNOWN_PROVIDERS = {
    "anthropic",
    "groq",
    "ollama",
    "openai",
}
def _outcome_from_state(state: GraphState | dict) -> str:
    """Map a final pipeline state onto its terminal-outcome label.

    The checks run in priority order and the first match wins:

    * ``blocked`` when ``guardrails_passed`` or ``security_passed`` is
      exactly ``False`` (a missing key or ``None`` does not count);
    * ``timeout`` when ``evaluation_notes`` equals ``"request_timeout"``;
    * ``review`` when ``needs_human_review`` is truthy;
    * ``success`` otherwise.
    """
    gate_flags = ("guardrails_passed", "security_passed")
    # Identity comparison on purpose: only an explicit False marks a block.
    if any(state.get(flag) is False for flag in gate_flags):
        return "blocked"

    timed_out = state.get("evaluation_notes") == "request_timeout"
    if timed_out:
        return "timeout"

    return "review" if state.get("needs_human_review") else "success"
def record_pipeline_run(state: GraphState | dict, latency_ms: float) -> None:
    """Emit the per-run metrics for one finished pipeline invocation.

    Observes the latency histogram (milliseconds converted to seconds) and
    bumps the request counter under the run's terminal outcome, counts the
    synthesis provider when ``synth_provider`` is set, adds the number of
    ``faithfulness_unsupported`` entries to the faithfulness counter and, for
    a blocked run, counts the gate that stopped it.

    Returns immediately when ``METRICS_ENABLED`` is false. Any exception
    raised while recording is logged at debug level and swallowed, so this
    never raises; updates made before the failure stay in place.
    """
    if not METRICS_ENABLED:
        return

    # (passed-flag key, gate label, key holding that gate's reason text).
    # Checked in this order; only the first failed gate is counted.
    gate_fields = (
        ("guardrails_passed", "guardrails", "guardrails_reason"),
        ("security_passed", "security", "security_message"),
    )

    try:
        terminal = _outcome_from_state(state)
        latency_series = PIPELINE_LATENCY.labels(outcome=terminal)
        latency_series.observe(latency_ms / 1000.0)
        PIPELINE_REQUESTS.labels(outcome=terminal).inc()

        # Unrecognised provider names share one "other" series.
        routed_to = (state.get("synth_provider") or "").lower()
        if routed_to:
            if routed_to not in _KNOWN_PROVIDERS:
                routed_to = "other"
            INFERENCE_ROUTED.labels(provider=routed_to).inc()

        unsupported_count = len(state.get("faithfulness_unsupported") or [])
        if unsupported_count > 0:
            FAITHFULNESS_DROPPED.inc(unsupported_count)

        if terminal == "blocked":
            for passed_key, gate, reason_key in gate_fields:
                if state.get(passed_key) is False:
                    _record_block(gate, state.get(reason_key))
                    break
    except Exception as exc:  # pragma: no cover - defensive
        _log.debug("metrics_record_failed", error=str(exc))
| # Bounded vocabulary for the guardrails reason label. Anything outside this | |
| # set collapses to "other" so an attacker-controlled rejection message can | |
| # never explode metric cardinality. | |
# Reason categories that may appear verbatim as the ``reason`` label value.
_KNOWN_GATE_REASONS = {
    "jailbreak",
    "off_topic",
    "pii_detected",
    "policy_violation",
    "prompt_injection",
    "rbac_denied",
    "sensitivity_block",
    "toxic",
}
def _record_block(gate: str, reason: str | None) -> None:
    """Count one blocked request on ``GUARDRAILS_BLOCKED``.

    ``reason`` is normalised (surrounding whitespace stripped, lower-cased,
    spaces turned into underscores, cut to 40 characters) and used as the
    ``reason`` label only when it is a member of ``_KNOWN_GATE_REASONS``;
    every other value, including a missing reason, is counted as ``other``.
    Does nothing when ``METRICS_ENABLED`` is false.

    There is no exception handling here; failures propagate to the caller.
    """
    if METRICS_ENABLED:
        normalized = (reason or "unknown").strip().lower().replace(" ", "_")
        normalized = normalized[:40]
        if normalized not in _KNOWN_GATE_REASONS:
            normalized = "other"
        GUARDRAILS_BLOCKED.labels(gate=gate, reason=normalized).inc()
# Closed vocabulary for the ``result`` label of AUDIT_VERIFICATIONS. Anything
# else is reported as "other" so a caller can never create unbounded series.
_KNOWN_AUDIT_RESULTS = frozenset({"valid", "broken", "error"})


def record_audit_verification(result: str, valid: bool) -> None:
    """Record one scheduled audit-chain verification outcome.

    ``result`` is expected to be one of ``valid`` / ``broken`` / ``error``.
    It is stripped and lower-cased before use; any value outside that set
    (including a non-string) is recorded under the ``other`` label so the
    counter's cardinality stays bounded. ``valid`` sets the
    ``AUDIT_CHAIN_VALID`` gauge to 1 when truthy and 0 otherwise.

    No-op when ``METRICS_ENABLED`` is false. Never raises: any exception
    raised while recording is logged at debug level and swallowed.
    """
    if not METRICS_ENABLED:
        return
    try:
        # The isinstance guard keeps non-string and unhashable values away
        # from the set lookup.
        key = result.strip().lower() if isinstance(result, str) else ""
        label = key if key in _KNOWN_AUDIT_RESULTS else "other"
        AUDIT_VERIFICATIONS.labels(result=label).inc()
        AUDIT_CHAIN_VALID.set(1 if valid else 0)
    except Exception as exc:  # pragma: no cover - defensive
        _log.debug("metrics_audit_record_failed", error=str(exc))
def render_latest() -> tuple[bytes, str]:
    """Build the body and content type for a ``/metrics`` response.

    Returns:
        A ``(payload, content_type)`` pair: the output of
        ``generate_latest()`` and ``CONTENT_TYPE_LATEST``.

    Raises:
        RuntimeError: when ``METRICS_ENABLED`` is false, i.e. the
            ``prometheus_client`` import failed. Callers can translate this
            into an HTTP 501.
    """
    if METRICS_ENABLED:
        payload = generate_latest()
        return payload, CONTENT_TYPE_LATEST
    raise RuntimeError("prometheus_client not installed")