InsuranceBot / backend /admin.py
rohitsar567's picture
feat!: remove cross-session profile recall (ADR-043) β€” net βˆ’3700 LOC
6d5684e
Raw
History Blame Contribute Delete
48.3 kB
"""Admin endpoints β€” password gated. NEVER exposed to ordinary users.
The four core endpoints:
GET /api/admin/health — full LLM health snapshot (every model)
POST /api/admin/probe — force a fresh probe of all models now
GET /api/admin/chain — current brain chain order
POST /api/admin/chain — reorder the brain chain (promote a model to primary)
Further password-gated endpoints in this module: /api/admin/usage,
/api/admin/profiles, /api/admin/performance and /api/admin/llm-health.
/api/profile/select and /api/profile/reject are also defined here, but they
are user-facing and NOT password gated.
Access gate (KI-097 — single check):
Request must include `X-Admin-Password` header matching ADMIN_PASSWORD env.
Setup:
Add to Space secrets (Settings → Variables and secrets):
ADMIN_PASSWORD = your-strong-password-here
Returns 401 Unauthorized on a bad or missing password. Access is gated by a
strong password only; there is no IP allowlist (it would add operational
complexity — changing networks would lock the operator out — without
meaningful additional security).
"""
from __future__ import annotations
import asyncio
import json
import os
import re
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, Request, Header
from pydantic import BaseModel
from backend import llm_health
from backend.config import settings
router = APIRouter()

# Upper bound on the number of trailing 40-data/llm_usage.jsonl lines held in
# memory while computing per-role stats: ~1000 lines x ~150 B = ~150 KB peak.
USAGE_TAIL_LINES = 1000

# A5 — Audit fix #7: shout on stderr at import time when ADMIN_PASSWORD is
# blank. Access itself is unaffected (the password check fails closed, so
# every admin request already gets 401), but operators need a loud signal
# that the admin surface is unreachable until the secret is configured.
if not os.getenv("ADMIN_PASSWORD", "").strip():
    import sys as _sys

    print(
        "[admin] WARNING: ADMIN_UNGATED β€” ADMIN_PASSWORD env var is empty. "
        "All /api/admin/* requests will return 401. Set ADMIN_PASSWORD in "
        "the deployment env to enable access.",
        file=_sys.stderr,
        flush=True,
    )
def _password_ok(supplied: Optional[str]) -> bool:
    """Return True iff *supplied* matches the configured ADMIN_PASSWORD.

    The expected value is read from the environment on every call and
    stripped of surrounding whitespace; *supplied* is compared as-is.

    Fails closed: an unset/blank ADMIN_PASSWORD, or a missing header
    (``supplied is None``), always yields False.

    The comparison uses ``hmac.compare_digest`` so its running time does not
    depend on how many leading characters match. A plain ``==`` leaks that
    through response timing and lets an attacker recover the password one
    character at a time.
    """
    import hmac

    expected = os.environ.get("ADMIN_PASSWORD", "").strip()
    if not expected or supplied is None:
        return False  # fail closed: nothing configured, or no header sent
    # compare_digest on str only accepts ASCII; compare UTF-8 bytes instead so
    # a non-ASCII password or header value cannot raise TypeError.
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
def _check_admin(request: Request, password: Optional[str]) -> None:
    """Enforce the admin gate for one request.

    Raises ``HTTPException(401, "Unauthorized")`` unless *password* passes
    ``_password_ok``. *request* is accepted only so existing call sites keep
    their shape; it is not inspected (there is no client-IP check).
    """
    # TODO: consider an IP allowlist as a second factor, paired with a
    # documented break-glass procedure so a network change cannot lock ops
    # out. Password-only is acceptable for the current threat model.
    if _password_ok(password):
        return
    raise HTTPException(status_code=401, detail="Unauthorized")
# --- Endpoints --------------------------------------------------------------
@router.get("/api/admin/health")
async def admin_health(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Full LLM health snapshot for every model (admin only, no probing)."""
    _check_admin(request, x_admin_password)
    snapshot = llm_health.status_summary()
    return snapshot
@router.post("/api/admin/probe")
async def admin_probe(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Probe every model right now, then return the count and fresh summary."""
    _check_admin(request, x_admin_password)
    probed_state = await llm_health.probe_all()
    return {
        "probed": len(probed_state),
        "summary": llm_health.status_summary(),
    }
@router.get("/api/admin/chain")
async def admin_chain_get(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Return the current brain chain order (admin only)."""
    _check_admin(request, x_admin_password)
    # Imported at call time, so the value reflects any in-process reorder made
    # by POST /api/admin/chain (which rebinds the module attribute).
    from backend.providers.nvidia_nim_llm import BRAIN_CHAIN as brain_chain
    # Only the brain role survives the three-chain collapse (2026-05-15).
    return {"brain": brain_chain}
class ChainReorderRequest(BaseModel):
    """Request body for ``POST /api/admin/chain``."""
    role: str  # chain to reorder; the endpoint only accepts 'brain'
    order: list[str]  # new ordering of model ids; the first entry is the primary
@router.post("/api/admin/chain")
async def admin_chain_set(
    body: ChainReorderRequest,
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Reorder the brain chain in memory and persist it for the next restart.

    The new order takes effect immediately (``nvidia_nim_llm.BRAIN_CHAIN`` is
    rebound). It is also saved under the role key in
    ``<DATA_DIR>/admin_overrides.json``, and any other keys already in that
    file are preserved.

    Raises:
        HTTPException(401): bad or missing admin password.
        HTTPException(400): role is not 'brain', or ``order`` is empty,
            contains a blank model id, or repeats a model id. Validation runs
            BEFORE the live chain is touched, so a bad request can never leave
            the process with an empty or degenerate chain.
    """
    _check_admin(request, x_admin_password)
    if body.role != "brain":
        raise HTTPException(status_code=400, detail="role must be 'brain'")
    new_order = list(body.order)
    if not new_order:
        raise HTTPException(status_code=400, detail="order must not be empty")
    if any(not model_id.strip() for model_id in new_order):
        raise HTTPException(status_code=400, detail="order must not contain blank model ids")
    if len(set(new_order)) != len(new_order):
        raise HTTPException(status_code=400, detail="order must not contain duplicate model ids")

    # Rebind the live chain. Readers that fetch the attribute at call time
    # (e.g. getattr(nim, "BRAIN_CHAIN", [])) see the new order immediately.
    from backend.providers import nvidia_nim_llm as nim
    setattr(nim, "BRAIN_CHAIN", list(new_order))

    # Persist for the next process restart.
    override_path = settings.DATA_DIR / "admin_overrides.json"
    override_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        state = json.loads(override_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        state = {}  # missing, unreadable or corrupt file: start fresh
    if not isinstance(state, dict):
        state = {}  # valid JSON but not an object: item assignment would fail
    state[body.role] = body.order
    # Write a sibling temp file, then atomically swap it in. A crash mid-write
    # then cannot leave a truncated overrides file behind.
    tmp_path = override_path.with_name(override_path.name + ".tmp")
    tmp_path.write_text(json.dumps(state, indent=2) + "\n", encoding="utf-8")
    os.replace(tmp_path, override_path)
    return {"ok": True, "role": body.role, "new_chain": body.order}
# ---------------------------------------------------------------------------
# /api/admin/usage — per-role usage stats + next-best-recommended
# ---------------------------------------------------------------------------
def _tail_jsonl(path: Path, n: int) -> list[dict]:
    """Return the JSON objects parsed from the last *n* lines of a JSONL file.

    Results are in file order (oldest first). Blank lines, invalid JSON and
    lines whose JSON value is not an object are skipped. Every element is
    therefore a dict, and callers can safely use ``row.get(...)``.

    Returns ``[]`` when the file does not exist or cannot be read, or when
    ``n <= 0``. This keeps endpoints alive before any usage has been logged.
    """
    if n <= 0:
        return []  # deque(maxlen=<negative>) would raise ValueError
    try:
        # deque(maxlen=n) streams the file and retains only the last n lines,
        # so memory stays bounded by n regardless of file size.
        with path.open("r", encoding="utf-8", errors="replace") as f:
            tail = deque(f, maxlen=n)
    except OSError:
        return []  # missing (FileNotFoundError) or unreadable file
    out: list[dict] = []
    for raw in tail:
        line = raw.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except (ValueError, RecursionError):
            continue  # malformed line: skip it, keep going
        if isinstance(obj, dict):
            out.append(obj)
    return out
def _stat_block_for_role(rows: list[dict], role: str, chain: list[str],
                         health_state: dict) -> dict:
    """Summarise usage-log *rows* for one *role* (payload of /api/admin/usage).

    Args:
        rows: parsed llm_usage.jsonl entries; only those whose ``role`` key
            equals *role* are counted.
        role: role name to filter on.
        chain: current model chain for the role; ``chain[0]`` is the primary.
        health_state: ``{model_id: health_record}``; only ``.status`` is read.

    Returns a dict with:
        total_calls_24h / success_rate: role-level count and success fraction
            (rows with no served_model still count here).
        by_model: one entry per served_model, sorted by call count
            (descending; ties keep first-seen order).
        primary_model: ``chain[0]`` or None.
        next_best_recommended: first model after the primary whose health
            status is ``"healthy"``. Otherwise ``chain[1]`` with
            ``next_best_is_unverified=True`` so the UI can flag it.
    """
    mine = [row for row in rows if row.get("role") == role]
    n_calls = len(mine)
    n_ok = len([row for row in mine if row.get("success") is True])

    # Per served model: [calls, latency_sum, successes]. Rows without a
    # served_model (total failures) only contribute to the role totals.
    per_model: dict[str, list[int]] = {}
    for row in mine:
        model = row.get("served_model")
        if not model:
            continue
        if model not in per_model:
            per_model[model] = [0, 0, 0]
        stats = per_model[model]
        stats[0] += 1
        try:
            stats[1] += int(row.get("latency_ms") or 0)
        except Exception:
            pass  # unparseable latency: count the call, skip its latency
        if row.get("success") is True:
            stats[2] += 1

    served_total = sum(stats[0] for stats in per_model.values()) or 1
    breakdown = [
        {
            "model": model,
            "calls": calls,
            "share": round(calls / served_total, 4),
            "avg_latency_ms": int(latency_sum / calls),
            "success_rate": round(successes / calls, 4),
        }
        for model, (calls, latency_sum, successes) in per_model.items()
    ]
    breakdown = sorted(breakdown, key=lambda entry: entry["calls"], reverse=True)

    def _is_healthy(model_id) -> bool:
        record = health_state.get(model_id)
        return (getattr(record, "status", None) if record else None) == "healthy"

    next_best = next((m for m in chain[1:] if _is_healthy(m)), None)
    unverified = False
    if next_best is None:
        # Nobody after the primary is known-healthy: fall back to chain[1]
        # and mark the recommendation as unproven.
        next_best = chain[1] if len(chain) > 1 else None
        unverified = next_best is not None

    return {
        "total_calls_24h": n_calls,
        "success_rate": round(n_ok / n_calls if n_calls else 0.0, 4),
        "by_model": breakdown,
        "primary_model": chain[0] if chain else None,
        "next_best_recommended": next_best,
        "next_best_is_unverified": unverified,
    }
@router.get("/api/admin/usage")
async def admin_usage(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Per-role usage stats computed from the last USAGE_TAIL_LINES log rows.

    Each role in the live chain config maps to a ``_stat_block_for_role``
    payload. If 40-data/llm_usage.jsonl does not exist yet, the blocks are
    zero-valued but still carry primary_model = the current chain[0], so the
    frontend can render an empty state without extra branching.
    """
    _check_admin(request, x_admin_password)
    # Live (post-override) chains; only the brain role remains after the
    # three-chain collapse (2026-05-15).
    chains = _chain_names_map()
    rows = _tail_jsonl(settings.DATA_DIR / "llm_usage.jsonl", USAGE_TAIL_LINES)
    health_state = llm_health.load()  # {model: health record}
    result = {}
    for role, chain in chains.items():
        result[role] = _stat_block_for_role(rows, role, chain, health_state)
    return result
# ---------------------------------------------------------------------------
# /api/admin/profiles — live-session snapshot (ADR-043, 2026-05-27)
# ---------------------------------------------------------------------------
@router.get("/api/admin/profiles")
async def admin_profiles(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Summarise every session currently held in memory (admin only).

    Before ADR-043 this read ``40-data/profiles/<name>.json`` and exposed a
    cross-session audit of named visitors. That on-disk store is gone, so
    this now reports whatever is in ``session_state._sessions`` right now
    (sessions not yet idle-evicted). The row shape deliberately mirrors the
    old schema so the existing admin UI renders unchanged; ``session_count``
    is therefore always 1.
    """
    _check_admin(request, x_admin_password)
    from backend import session_state as _ss

    iso_fmt = "%Y-%m-%dT%H:%M:%SZ"
    tracked_fields = (
        "name", "age", "dependents", "income_band",
        "location_tier", "primary_goal", "health_conditions",
    )
    snapshot_ts = datetime.now(timezone.utc).strftime(iso_fmt)
    profiles: list[dict] = []
    with _ss._lock:
        # Iterate over a snapshot of the items while holding the store lock.
        for session_id, sess in list(_ss._sessions.items()):
            prof = sess.profile
            display = (getattr(prof, "name", None) or "").strip()
            complete = 0
            for fld in tracked_fields:
                if getattr(prof, fld, None) not in (None, "", []):
                    complete += 1
            profiles.append({
                "name_display": display if display else "(anonymous)",
                "name_slug": display.lower().replace(" ", "-") if display else "",
                "session_id": session_id,
                "last_seen": datetime.fromtimestamp(
                    sess.last_touched, tz=timezone.utc,
                ).strftime(iso_fmt),
                "session_count": 1,
                "profile_complete_fields": complete,
            })
    return {"profiles": profiles, "total": len(profiles), "snapshot_ts": snapshot_ts}
# ---------------------------------------------------------------------------
# /api/profile/select + /api/profile/reject — IN-MEMORY shortlist tracker
# (ADR-043, 2026-05-27)
#
# Before ADR-043 these appended events to the named-profile JSON file. They
# now mutate only the live SessionState, so closing the tab discards the
# shortlist. The frontend's shortlist UI keeps working within one session.
# These endpoints are user-facing and are NOT admin-password gated.
# ---------------------------------------------------------------------------
class _PolicyEventBody(BaseModel):
    """Request body shared by ``/api/profile/select`` and ``/api/profile/reject``."""
    session_id: str  # live session whose in-memory profile is updated
    policy_slug: str  # policy identifier; repeat events for the same slug are deduplicated
    insurer: str
    reason: Optional[str] = None  # optional free text; a default reason is filled in when omitted
# Event type -> attribute on the session's profile that holds that event list.
_EVENT_TYPE_TO_FIELD = {
    "selected": "selected_policies",
    "rejected": "rejected_policies",
}
def _do_record_policy_event(body: _PolicyEventBody, event_type: str) -> dict:
    """Upsert a shortlist event into the live session's in-memory profile.

    Args:
        body: request body (session_id, policy_slug, insurer, optional reason).
        event_type: ``"selected"`` or ``"rejected"``; picks the profile list.

    Returns:
        ``{"ok", "event_type", "policy_slug", "count"}``, where ``count`` is
        the length of the updated list.

    Raises:
        HTTPException(400): a required field is empty or event_type is unknown.

    A repeat event for the same policy_slug overwrites that entry's fields
    (timestamp, reason, ...) instead of appending. Nothing is persisted.
    """
    if not (body.session_id and body.policy_slug and body.insurer):
        raise HTTPException(
            status_code=400,
            detail="session_id, policy_slug, and insurer are required",
        )
    field_name = _EVENT_TYPE_TO_FIELD.get(event_type)
    if field_name is None:
        raise HTTPException(status_code=400, detail="invalid event_type")

    from backend.session_state import get_session
    profile = get_session(body.session_id).profile
    entries: list[dict] = list(getattr(profile, field_name, None) or [])

    match_idx = None
    for idx, entry in enumerate(entries):
        if (entry or {}).get("policy_slug") == body.policy_slug:
            match_idx = idx
            break

    default_reason = "user_clicked_select" if event_type == "selected" else "user_clicked_reject"
    payload = {
        "policy_slug": body.policy_slug,
        "insurer": body.insurer,
        "event_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "session_id": body.session_id,
        "reason": body.reason or default_reason,
    }
    if match_idx is None:
        entries.append(payload)
    else:
        entries[match_idx] = {**entries[match_idx], **payload}
    setattr(profile, field_name, entries)

    return {
        "ok": True,
        "event_type": event_type,
        "policy_slug": body.policy_slug,
        "count": len(entries),
    }
@router.post("/api/profile/select")
async def profile_select(body: _PolicyEventBody):
    """Record a "shortlist / save" click on a policy card for this session."""
    # User-facing: no admin password; the body's session_id scopes the write.
    return _do_record_policy_event(body, event_type="selected")


@router.post("/api/profile/reject")
async def profile_reject(body: _PolicyEventBody):
    """Record a "not for me / reject" click on a policy card for this session."""
    return _do_record_policy_event(body, event_type="rejected")
# ---------------------------------------------------------------------------
# /api/admin/performance — aggregated performance/quality metrics
# ---------------------------------------------------------------------------
# Repository root: two levels up from this file (backend/admin.py -> repo).
_REPO_ROOT = Path(__file__).resolve().parents[1]


def _read_eval_summary() -> Optional[dict]:
    """Pick the admin-panel fields out of ``eval/results.json``'s ``summary``.

    Returns None when the file is missing or unreadable, when it is not valid
    JSON, or when it has no dict-valued ``summary``. Otherwise returns a dict.
    The scalar fields default to None when absent; ``by_brain`` and ``by_type``
    default to ``{}``.
    """
    results_path = _REPO_ROOT / "eval" / "results.json"
    if not results_path.exists():
        return None
    try:
        parsed = json.loads(results_path.read_text(encoding="utf-8"))
    except Exception:
        return None
    if not isinstance(parsed, dict):
        return None
    summary = parsed.get("summary")
    if not isinstance(summary, dict):
        return None
    scalar_keys = (
        "ran_at", "elapsed_seconds", "n_questions",
        "factual_accuracy", "citation_accuracy", "refusal_precision",
    )
    picked = {key: summary.get(key) for key in scalar_keys}
    picked["by_brain"] = summary.get("by_brain") or {}
    picked["by_type"] = summary.get("by_type") or {}
    return picked
def _latest_audit_dir() -> Optional[Path]:
    """Return the newest (by mtime) ``80-audit/full_*`` run that has a summary.json.

    Returns None when ``80-audit`` is absent or holds no qualifying run. On an
    mtime tie, the first run in glob order wins.
    """
    audit_root = _REPO_ROOT / "80-audit"
    if not audit_root.exists():
        return None
    runs = [d for d in audit_root.glob("full_*") if (d / "summary.json").exists()]
    if not runs:
        return None
    return max(runs, key=lambda d: d.stat().st_mtime)
# Regex helpers for parsing report.md (tools/audit/analyze.py output). Each
# pattern is keyed on one row's label text, so parsing does not depend on the
# order of rows in the table. All capture groups are decimal integers.
# "Personas completed | **<done>** of <total>": group(1)=done, group(2)=total
_RE_REPORT_PERSONAS = re.compile(r"Personas completed \| \*\*(\d+)\*\* of (\d+)")
# "Total turns executed | **<n>**"
_RE_REPORT_TURNS = re.compile(r"Total turns executed \| \*\*(\d+)\*\*")
# "Errors (HTTP / timeout / network) | <n>"
_RE_REPORT_ERRORS = re.compile(r"Errors \(HTTP / timeout / network\) \| (\d+)")
# "Refusals (blocked=true) | <n>"
_RE_REPORT_REFUSALS = re.compile(r"Refusals \(blocked=true\) \| (\d+)")
# "Latency pNN | <ms> ms": one pattern per percentile
_RE_REPORT_P50 = re.compile(r"Latency p50 \| (\d+)\s*ms")
_RE_REPORT_P95 = re.compile(r"Latency p95 \| (\d+)\s*ms")
_RE_REPORT_P99 = re.compile(r"Latency p99 \| (\d+)\s*ms")
# One full row of the brain-routing table: "| `<brain>` | <turns> |".
# Used with .match() per line; group(1)=brain id, group(2)=turn count.
_RE_BRAIN_ROW = re.compile(r"^\|\s*`([^`]+)`\s*\|\s*(\d+)\s*\|\s*$")
def _parse_brain_routing(report_text: str) -> dict[str, int]:
    """Parse report.md's ``## 2. Brain routing`` table into ``{brain: turns}``.

    The section spans from its heading to the next ``## 3.`` heading, or to
    the end of the text. Only lines that fully match ``_RE_BRAIN_ROW`` count.
    Returns ``{}`` when the heading is absent; a brain listed twice keeps its
    last count.
    """
    start = report_text.find("## 2. Brain routing")
    if start == -1:
        return {}
    end = report_text.find("## 3.", start)
    section = report_text[start:] if end == -1 else report_text[start:end]
    routing: dict[str, int] = {}
    for row in section.splitlines():
        hit = _RE_BRAIN_ROW.match(row)
        if hit is None:
            continue
        try:
            routing[hit.group(1)] = int(hit.group(2))
        except ValueError:
            continue
    return routing
def _read_audit_summary() -> Optional[dict]:
    """Return aggregate metrics for the latest persona-audit run, or None.

    The launcher's summary.json is sparse: run_id, persona counts and elapsed
    time. The detailed metrics live in report.md, which
    tools/audit/analyze.py writes later. report.md is parsed with regexes,
    which is much cheaper than re-walking the per-persona transcripts on
    every admin request.

    Returns:
        None when there is no ``80-audit/full_*`` run with a summary.json.
        Otherwise a dict. When report.md is missing or unreadable (analyze.py
        not run yet), the report-derived fields (turns_total, errors,
        refusals, p50/p95/p99_ms) stay None and brain_routing stays ``{}``.
        So do individual rows that report.md lacks.
    """
    run_dir = _latest_audit_dir()
    if run_dir is None:
        return None
    summary_path = run_dir / "summary.json"
    report_path = run_dir / "report.md"
    try:
        launcher_summary = json.loads(summary_path.read_text(encoding="utf-8"))
    except Exception:
        launcher_summary = {}
    if not isinstance(launcher_summary, dict):
        # Valid JSON but not an object (list, string, ...): the .get() calls
        # below would raise and 500 the whole /api/admin/performance response.
        launcher_summary = {}
    out: dict = {
        "run_id": launcher_summary.get("run_id") or run_dir.name,
        "personas_requested": launcher_summary.get("personas_requested"),
        "personas_completed": launcher_summary.get("personas_completed"),
        "elapsed_seconds": launcher_summary.get("elapsed_seconds"),
        "turns_total": None,
        "errors": None,
        "refusals": None,
        "p50_ms": None,
        "p95_ms": None,
        "p99_ms": None,
        "brain_routing": {},
    }
    if not report_path.exists():
        return out  # launcher ran but analyze.py hasn't; surface what we have
    try:
        report_text = report_path.read_text(encoding="utf-8")
    except Exception:
        return out

    def _int_match(rx: re.Pattern[str]) -> Optional[int]:
        """First capture group of *rx* in report_text as int, or None."""
        m = rx.search(report_text)
        if not m:
            return None
        try:
            return int(m.group(1))
        except (ValueError, IndexError):
            return None

    # personas_completed is in BOTH files; report.md wins because it reflects
    # the transcripts that were actually analyzed (the launcher may claim N
    # while only M transcripts were written).
    p_match = _RE_REPORT_PERSONAS.search(report_text)
    if p_match:
        try:
            out["personas_completed"] = int(p_match.group(1))
        except ValueError:
            pass
    out["turns_total"] = _int_match(_RE_REPORT_TURNS)
    out["errors"] = _int_match(_RE_REPORT_ERRORS)
    out["refusals"] = _int_match(_RE_REPORT_REFUSALS)
    out["p50_ms"] = _int_match(_RE_REPORT_P50)
    out["p95_ms"] = _int_match(_RE_REPORT_P95)
    out["p99_ms"] = _int_match(_RE_REPORT_P99)
    out["brain_routing"] = _parse_brain_routing(report_text)
    return out
def _read_usage_24h() -> Optional[dict]:
    """Per-role ``{count, success_rate, avg_latency_ms}`` from recent usage rows.

    Reads the last USAGE_TAIL_LINES entries of 40-data/llm_usage.jsonl. Only
    rows whose ``role`` is in ``llm_health.ROLES`` (the canonical role set,
    also used for _LLM_HEALTH_CHAIN_ROLES) are counted. This is the same
    filtering /api/admin/usage gets from its chain dict. The append-only log
    may contain non-canonical role values.

    The "24h" in the name is conventional; the window is really the last
    USAGE_TAIL_LINES rows. The name matches the admin UI label.

    avg_latency_ms averages only rows whose ``latency_ms`` is a finite
    number. Booleans are excluded, and so are NaN/Infinity (which ``json.loads``
    accepts and ``int()`` would choke on).

    Returns None when the file is missing or empty, or when it holds no rows
    for a canonical role, so the frontend renders its empty state.
    """
    import math

    usage_path = settings.DATA_DIR / "llm_usage.jsonl"
    rows = _tail_jsonl(usage_path, USAGE_TAIL_LINES)
    canonical_roles = set(llm_health.ROLES)
    agg: dict[str, dict] = {}
    for r in rows:
        if not isinstance(r, dict):
            continue
        role = r.get("role")
        # isinstance guard: an unhashable role value (e.g. a list) would make
        # the set membership test raise TypeError.
        if not isinstance(role, str) or role not in canonical_roles:
            continue
        bucket = agg.setdefault(role, {"count": 0, "success_count": 0, "latency_sum": 0,
                                       "latency_n": 0})
        bucket["count"] += 1
        if r.get("success") is True:
            bucket["success_count"] += 1
        lat = r.get("latency_ms")
        if (isinstance(lat, (int, float)) and not isinstance(lat, bool)
                and math.isfinite(lat)):
            bucket["latency_sum"] += int(lat)
            bucket["latency_n"] += 1
    if not agg:
        return None
    out: dict[str, dict] = {}
    for role, b in agg.items():
        out[role] = {
            "count": b["count"],
            "success_rate": round(b["success_count"] / b["count"], 4) if b["count"] else 0.0,
            "avg_latency_ms": int(b["latency_sum"] / b["latency_n"]) if b["latency_n"] else 0,
        }
    return out
@router.get("/api/admin/performance")
async def admin_performance(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Aggregated performance/quality metrics for the admin Performance section.

    The three metric blocks (``eval``, ``audit``, ``usage_24h``) are each
    independently nullable. A missing eval/results.json, audit run or usage
    log yields None for that block rather than failing the request.
    ``snapshot_ts`` is the UTC time the response was built.
    """
    _check_admin(request, x_admin_password)
    eval_block = _read_eval_summary()
    audit_block = _read_audit_summary()
    usage_block = _read_usage_24h()
    return {
        "eval": eval_block,
        "audit": audit_block,
        "usage_24h": usage_block,
        "snapshot_ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
# ---------------------------------------------------------------------------
# /api/admin/llm-health — KI-086 LLM Health & Credits snapshot
#
# Surfaces KI-080..KI-085 telemetry on the existing admin "LLM Chain" tab so
# the operator can see at a glance:
# - per-chain elected PRIMARY + BACKUP (KI-080)
# - each candidate's latest probe latency + success rate (KI-080)
# - credits remaining + unit + reset deadline (KI-085)
# - degraded-until window when a candidate was demoted on 429 (KI-084)
# - per-turn served-model distribution from the last N llm_usage.jsonl rows
#
# Response shape — three top-level keys (chains / candidates / recent_turns)
# plus a snapshot_ts. Durations on the wire are always relative, never raw
# monotonic values, so the frontend needs no knowledge of the server's
# monotonic clock origin. Deadlines (credits reset, degraded window) are
# seconds-from-now; probe ages are seconds-since.
# ---------------------------------------------------------------------------
# Chain roles reported by /api/admin/llm-health. These are the same strings
# llm_health.get_primary() accepts. Since the three-chain collapse
# (2026-05-15) only "brain" remains; deferring to llm_health.ROLES means a
# future role only has to be added in one place.
_LLM_HEALTH_CHAIN_ROLES = llm_health.ROLES


def _chain_names_map() -> dict[str, list[str]]:
    """Return the live chain config as ``{role: [model_id, ...]}`` (brain only).

    The attribute is read off the provider module at call time, so an admin
    reorder applied earlier in this process is reflected immediately. A copy
    is returned, so callers cannot mutate the live chain.
    """
    from backend.providers import nvidia_nim_llm as nim
    brain_chain = getattr(nim, "BRAIN_CHAIN", [])
    return {"brain": list(brain_chain)}
def _seconds_until_monotonic(deadline: Optional[float]) -> Optional[float]:
    """Seconds from now until a ``time.monotonic()`` *deadline*, for ETA display.

    Returns None when *deadline* is None and 0.0 when it has already passed.
    Otherwise returns the remaining time rounded to 0.1 s. The caller decides
    how to render None versus 0.
    """
    if deadline is None:
        return None
    import time as _time
    remaining = deadline - _time.monotonic()
    return 0.0 if remaining <= 0 else round(remaining, 1)
def _probe_age_seconds(iso_ts: Optional[str]) -> Optional[float]:
    """Wall-clock seconds elapsed since the ISO-8601 timestamp *iso_ts*.

    Thin wrapper over ``llm_health._iso_age_seconds``. Its None is passed
    through; otherwise the age is rounded to 0.1 s and negative ages are
    clamped to 0.0.
    """
    elapsed = llm_health._iso_age_seconds(iso_ts)
    return None if elapsed is None else max(0.0, round(elapsed, 1))
def _success_rate_for(h) -> Optional[float]:
    """Fraction of *h*'s recorded probes that succeeded, rounded to 4 places.

    Entries of ``h.probe_history`` are read with ``.get("ok")`` and counted
    when truthy. Returns None when *h* has no (or an empty) probe history.
    """
    history = getattr(h, "probe_history", None) or []
    if not history:
        return None
    ok_count = len([probe for probe in history if probe.get("ok")])
    return round(ok_count / len(history), 4)
def _candidate_snapshot(model: str, health, chain_membership: list[str],
                        now_mono: float) -> dict:
    """Build one Section-B table row for *model*.

    Every row carries the same keys in the same order, so the frontend table
    never has to special-case a missing row. A model with no health record
    (``health is None``) gets status/effective_status ``"unknown"`` and None
    everywhere else.

    Args:
        model: model id; ``provider`` is derived via ``llm_health.provider_of``.
        health: the model's health record, or None. NOTE(review): the
            attributes read here presumably match llm_health's ModelHealth;
            confirm there.
        chain_membership: chain roles the model belongs to (passed through).
        now_mono: ``time.monotonic()`` reading used for the degraded window.
    """
    if health is None:
        unknown_row = {
            "model": model,
            "provider": llm_health.provider_of(model),
            "chain_membership": chain_membership,
            "status": "unknown",
            "effective_status": "unknown",
        }
        unknown_row.update(dict.fromkeys((
            "latency_ms", "success_rate", "probe_age_seconds", "last_error",
            "last_status_code", "health_reason", "credits_remaining",
            "credits_unit", "credits_low_water", "credits_reset_in_seconds",
            "degraded_for_seconds",
        )))
        return unknown_row

    # Seconds left in the degraded (sin-bin) window, or None when not degraded.
    degraded_until = getattr(health, "degraded_until_monotonic", 0.0) or 0.0
    degraded_for = (
        round(degraded_until - now_mono, 1)
        if degraded_until and degraded_until > now_mono
        else None
    )

    # KI-202: operator-facing reason for the admin Health column. It is None
    # for healthy rows (which render just "Live") and "stale" for stale rows.
    # Otherwise it is the error classification.
    effective = llm_health.effective_status(health)
    if effective == "healthy":
        reason = None
    elif effective == "stale":
        reason = "stale"
    else:
        reason = llm_health._classify_error_reason(
            health.last_error, health.last_status_code,
        )

    return {
        "model": model,
        "provider": llm_health.provider_of(model),
        "chain_membership": chain_membership,
        "status": health.status,
        "effective_status": effective,
        "latency_ms": health.latency_ms,
        "success_rate": _success_rate_for(health),
        "probe_age_seconds": _probe_age_seconds(health.tested_at),
        "last_error": health.last_error,
        "last_status_code": health.last_status_code,
        "health_reason": reason,
        "credits_remaining": health.credits_remaining,
        "credits_unit": health.credits_unit,
        "credits_low_water": health.credits_low_water,
        "credits_reset_in_seconds": _seconds_until_monotonic(health.credits_reset_at),
        "degraded_for_seconds": degraded_for,
    }
def _candidate_available_for_calls(h, now_mono: float) -> bool:
    """KI-122: can this candidate serve real chat traffic right now?

    This is the operator-facing definition of "live and usable". All three
    must hold:
      - the probe status is neither 'down' nor 'unknown';
      - it is not inside a degraded window
        (``degraded_until_monotonic > now_mono``), e.g. one set after
        report_failure();
      - ``llm_health._has_credits(h, now_mono)`` passes.
    A None candidate is never available.
    """
    if h is None or h.status in ("down", "unknown"):
        return False
    sin_binned = bool(h.degraded_until_monotonic) and h.degraded_until_monotonic > now_mono
    if sin_binned:
        return False
    return bool(llm_health._has_credits(h, now_mono))
def _candidate_credit_exhausted_strict(h, now_mono: float) -> bool:
    """Strict "out of credits" test for one candidate (drives the chain banner).

    True only when all of these hold:
      1. ``credits_remaining`` is not None (there is a real signal);
      2. ``credits_remaining`` is at or below ``credits_low_water`` (0.0 if unset);
      3. ``credits_reset_at`` is set and still in the future, i.e. the
         snapshot lies inside an active gating window.
    This is deliberately stricter than ``not _has_credits()``. The banner is
    louder than the elector, and one quota-exhausted backup next to a healthy
    primary must not scare the operator.
    """
    if h is None or h.credits_remaining is None:
        return False  # no candidate, or no credit signal at all
    floor = h.credits_low_water or 0.0
    if h.credits_remaining > floor:
        return False  # still above the low-water mark
    reset_at = h.credits_reset_at
    if reset_at is None:
        # No scheduled reset (e.g. OpenRouter usd_balance, a prepaid wallet).
        # That could be a genuine "wallet empty", but OpenRouter free-tier
        # accounts report $0 even while calls succeed, so the banner is not
        # raised on this alone. Probe/chat success stays authoritative via
        # _candidate_available_for_calls.
        return False
    # Past the reset, the snapshot is stale (permissive); before it, gated.
    return not (now_mono >= reset_at)
def _chain_summary(role: str, chains: dict[str, list[str]],
                   state: dict, now_mono: float) -> dict:
    """Per-chain block for Section A of /api/admin/llm-health.

    Args:
        role: chain role name, passed to llm_health.get_primary/get_backup.
        chains: ``{role: [model_id, ...]}`` live chain config; a missing role
            is treated as an empty chain.
        state: ``{model_id: health_record}`` cached health snapshot.
        now_mono: ``time.monotonic()`` reading shared by every check below.

    Returns:
        A dict containing:
          - the chain itself, and one ``chain_members`` row per model
            (availability, strict credit exhaustion, primary/backup flags);
          - the elected primary and backup, with full snapshots;
          - ``chain_credit_exhausted``, True only when NO member is available
            AND at least one member is strictly credit-exhausted;
          - the newest ``tested_at`` across the chain and its age in seconds.
    """
    chain = chains.get(role) or []
    primary = llm_health.get_primary(role)
    backup = llm_health.get_backup(role)
    # KI-122 (2026-05-15) — STRICT chain_credit_exhausted rule.
    #
    # Earlier rules (KI-085, KI-116) tried to skip cold-start candidates
    # and elapsed-reset snapshots but still left a hole. Suppose one chain
    # member had a None reset_at + low credits (typical for OpenRouter's
    # usd_balance) while sibling NIM candidates had `credits_remaining=None`
    # (no signal yet). The loop then registered `any_signal=True` on the bad
    # OpenRouter row but never broke out via a fresh NIM peer, so the banner
    # fired even though the elected NIM primary was HEALTHY · 100%.
    #
    # New rule: the banner fires ONLY when EVERY chain member is
    #   (a) not available_for_calls (down / sin-binned / credit-gated), AND
    #   (b) at least one of those failures is a credit-exhaustion signal.
    # Without (b) it's a "chain entirely down" situation, which deserves a
    # different banner; the frontend handles that via the
    # elected_primary == None path.
    members_info: list[dict] = []
    any_available = False
    any_credit_exhausted = False
    last_probe_at_iso: Optional[str] = None
    for m in chain:
        h = state.get(m)  # None when the model has no cached health record
        available = _candidate_available_for_calls(h, now_mono)
        credit_exhausted = _candidate_credit_exhausted_strict(h, now_mono)
        if available:
            any_available = True
        if credit_exhausted:
            any_credit_exhausted = True
        # Track the most-recent probe timestamp across the chain. Plain string
        # comparison; assumes all tested_at values share one ISO-8601 format
        # (TODO confirm in llm_health).
        ts = getattr(h, "tested_at", None) if h is not None else None
        if ts and (last_probe_at_iso is None or ts > last_probe_at_iso):
            last_probe_at_iso = ts
        members_info.append({
            "model": m,
            "available_for_calls": available,
            "credit_exhausted": credit_exhausted,
            "is_current_primary": bool(primary and m == primary),
            "is_current_backup": bool(backup and m == backup),
        })
    # Banner fires only when: zero available members AND at least one
    # member is strictly credit-exhausted. If everyone is just 'down'
    # (probe failures), the banner is the wrong message. The operator needs
    # the "no eligible candidate" state, which the frontend already
    # renders via `elected_primary == None`.
    chain_credit_exhausted = bool((not any_available) and any_credit_exhausted)
    return {
        "role": role,
        "chain": chain,
        "chain_members": members_info,
        "elected_primary": primary,
        "elected_backup": backup,
        # KI-122 — explicit "currently in use" name + a boolean the
        # frontend uses to render the "● IN USE" pill without re-deriving.
        "current_primary": primary,
        "current_primary_available": _candidate_available_for_calls(
            state.get(primary) if primary else None, now_mono,
        ),
        "primary_snapshot": _candidate_snapshot(
            primary, state.get(primary), [role], now_mono,
        ) if primary else None,
        "backup_snapshot": _candidate_snapshot(
            backup, state.get(backup), [role], now_mono,
        ) if backup else None,
        "chain_credit_exhausted": chain_credit_exhausted,
        # KI-122 — operator-facing staleness signal: the ISO string plus a
        # convenience seconds-ago value so the UI can render "Probed Ns ago".
        "last_probe_at": last_probe_at_iso,
        "last_probe_age_seconds": _probe_age_seconds(last_probe_at_iso),
    }
def _recent_turns(n: int = 20) -> list[dict]:
    """Section C: the last *n* turns from 40-data/llm_usage.jsonl, newest first.

    Each row keeps only these keys, which are all optional in the raw log
    (rows written before KI-080 lack elected_primary/backup). Missing keys
    come back as None:
        ts / role / chain_primary / elected_primary / elected_backup /
        served_model / latency_ms / success / fallback_reason

    Returns ``[]`` when ``n <= 0`` or when the log is missing or empty.
    """
    if n <= 0:
        # Guard: rows[-0:] is the WHOLE list, so n=0 must not reach the slice.
        return []
    usage_path = settings.DATA_DIR / "llm_usage.jsonl"
    # Read at least 20 raw lines, not the full USAGE_TAIL_LINES tail.
    # _tail_jsonl drops blank/malformed lines, so reading a few extra gives
    # slack to still fill a small panel.
    rows = [r for r in _tail_jsonl(usage_path, max(n, 20)) if isinstance(r, dict)]
    fields = (
        "ts", "role", "chain_primary", "elected_primary", "elected_backup",
        "served_model", "latency_ms", "success", "fallback_reason",
    )
    # _tail_jsonl returns oldest-first: keep the newest n, then flip the order.
    return [{key: r.get(key) for key in fields} for r in reversed(rows[-n:])]
@router.get("/api/admin/llm-health")
async def admin_llm_health(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """KI-086 — composite LLM health + credits snapshot for the LLM Chain tab.

    Response keys:
        chains        — Section A: one entry per chain role (brain is the
                        only role left after the three-chain collapse),
                        carrying the elected primary/backup, their
                        snapshots and the chain_credit_exhausted banner flag.
        candidates    — Section B: one row per known candidate (every chain
                        member plus any model seen in the health state),
                        with chain_membership, latency / success / credits
                        and degraded-window state.
        recent_turns  — Section C: the 20 newest served turns from
                        llm_usage.jsonl.
        snapshot_ts   — UTC wall-clock time so the UI can show "updated ...".
    """
    _check_admin(request, x_admin_password)

    # KI-088: this endpoint must never trigger probes — it reads cached state
    # only. The frontend polls it every 30s; live probing here would stack
    # 6+ NIM candidates onto the shared per-key concurrency budget and starve
    # user chat traffic. Everything below comes from llm_health.load() (the
    # in-memory snapshot kept by background_probe_loop) and the append-only
    # llm_usage.jsonl log: zero outbound LLM calls.
    import time as _time

    now_mono = _time.monotonic()
    chains = _chain_names_map()
    state = llm_health.load()  # {model -> ModelHealth}, cached snapshot only

    # Section A: per-chain election + credit banner.
    chains_block = [
        _chain_summary(chain_role, chains, state, now_mono)
        for chain_role in _LLM_HEALTH_CHAIN_ROLES
    ]

    # Section B: model -> list of chain roles it belongs to (deduped by key).
    membership: dict[str, list[str]] = {}
    for chain_role, chain_models in chains.items():
        for model_name in chain_models:
            membership.setdefault(model_name, []).append(chain_role)
    # Models that are in the health state but in no chain (e.g. just removed
    # by an admin reorder) still get a row with an empty membership, so the
    # operator keeps seeing their last probe + credits.
    for model_name in state:
        membership.setdefault(model_name, [])

    # Problems float to the top: down, then degraded, then unknown, healthy
    # last (unrecognised statuses after that); ties broken by model name.
    status_order = {"down": 0, "degraded": 1, "unknown": 2, "healthy": 3}
    candidates_block = sorted(
        (
            _candidate_snapshot(model_name, state.get(model_name), roles, now_mono)
            for model_name, roles in membership.items()
        ),
        key=lambda snap: (status_order.get(snap["status"], 9), snap["model"]),
    )

    # Section C: last 20 turns.
    recent = _recent_turns(20)

    snapshot_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "chains": chains_block,
        "candidates": candidates_block,
        "recent_turns": recent,
        "snapshot_ts": snapshot_ts,
    }
# ---------------------------------------------------------------------------
# A5 — Audit fix #4: /api/admin/persona-drift — slot-capture completeness
# for the 20 most recently touched LIVE sessions (post-ADR-043 there is no
# on-disk persona store). Seven canonical required slots: name, age,
# dependents, location_tier, income_band, primary_goal, health_conditions.
# Capture below 50% is flagged red on the frontend.
# The seven canonical "ready-to-recommend" fact-find slots. Mirrors
# brain_tools._REQUIRED_FOR_READY (the recommendation-ready required set)
# and needs_finder.Profile's persisted fields. Adding/removing a slot must
# stay in sync with that required set. Health is captured as a list (empty
# list counts as "asked but no conditions" β†’ still a valid captured signal
# once the `asked` array contains the field name).
_PERSONA_DRIFT_SLOTS = ("name", "age", "dependents", "location_tier",
"income_band", "primary_goal", "health_conditions")
def _slot_captured(profile: dict, slot: str, asked: list[str]) -> bool:
"""A slot is 'captured' when the field has a non-empty value OR (for
health_conditions) when the user was asked and confirmed no conditions
(asked-list contains the slot but the list is empty β†’ still a positive
answer the bot heard, not a missing signal)."""
v = profile.get(slot)
if slot == "health_conditions":
if isinstance(v, list) and len(v) > 0:
return True
if "health_conditions" in (asked or []):
return True
return False
return v not in (None, "", [], 0)
@router.get("/api/admin/persona-drift")
async def admin_persona_drift(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Slot-capture completeness for LIVE in-memory sessions (newest first).

    Pre-ADR-043 this walked `40-data/profiles/*.json` and showed every
    named visitor's slot-capture progress. The on-disk store is gone;
    operators now see the live picture only.

    Returns a dict with ``personas`` (at most 20 rows, most recently touched
    session first), ``total`` (number of rows returned), ``slots`` (the
    canonical slot names, in order) and ``snapshot_ts`` (UTC wall clock).
    """
    _check_admin(request, x_admin_password)
    from backend import session_state as _ss

    slot_total = len(_PERSONA_DRIFT_SLOTS)
    rows: list[dict] = []
    # Scan under the session store's lock, over a snapshot of its items.
    # NOTE(review): assumes _ss._lock is what guards _sessions — confirm.
    with _ss._lock:
        for sid, s in list(_ss._sessions.items()):
            p = s.profile
            profile_dict = {fld: getattr(p, fld, None) for fld in _PERSONA_DRIFT_SLOTS}
            asked = list(getattr(p, "asked", None) or [])
            captured = [
                slot for slot in _PERSONA_DRIFT_SLOTS
                if _slot_captured(profile_dict, slot, asked)
            ]
            missing = [slot for slot in _PERSONA_DRIFT_SLOTS if slot not in captured]
            rows.append({
                "persona_id": sid,
                # Plain em-dash placeholder for sessions with no name yet
                # (the literal was previously mis-encoded mojibake).
                "name_display": getattr(p, "name", None) or "—",
                # NOTE(review): assumes last_touched is a Unix epoch
                # timestamp in seconds — confirm in session_state.
                "last_seen": datetime.fromtimestamp(
                    s.last_touched, tz=timezone.utc
                ).strftime("%Y-%m-%dT%H:%M:%SZ"),
                "captured_slots": captured,
                "missing_slots": missing,
                "completeness_pct": round(100.0 * len(captured) / slot_total, 1),
                "session_count": 1,
            })
    # Fixed-width ISO-8601 UTC strings sort chronologically as text.
    rows.sort(key=lambda r: (r["last_seen"] or ""), reverse=True)
    rows = rows[:20]
    return {
        "personas": rows,
        "total": len(rows),
        "slots": list(_PERSONA_DRIFT_SLOTS),
        "snapshot_ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
# ---------------------------------------------------------------------------
# A5 — Audit fix #5: /api/admin/recommendation-history — the 10 most recent
# policy recommendation events across all LIVE in-memory sessions, newest first.
# ---------------------------------------------------------------------------
@router.get("/api/admin/recommendation-history")
async def admin_recommendation_history(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Last 10 policy-event entries across LIVE in-memory sessions.

    Pre-ADR-043 this walked `40-data/profiles/*.json` and surfaced every
    historical shown/selected/rejected event the bot had ever logged. The
    on-disk store is gone; this now reflects the current container's live
    sessions only — historical events evict with the session.

    Returns a dict with ``events`` (at most 10, sorted by ``event_at``
    descending; events without a timestamp sort last), ``total`` (number of
    events returned) and ``snapshot_ts`` (UTC wall clock).
    """
    _check_admin(request, x_admin_password)
    from backend import session_state as _ss

    # (event type reported to the UI, profile attribute holding that list)
    event_sources = (
        ("shown", "shown_policies"),
        ("selected", "selected_policies"),
        ("rejected", "rejected_policies"),
    )
    events: list[dict] = []
    # NOTE(review): assumes _ss._lock is what guards _sessions — confirm.
    with _ss._lock:
        for sid, s in list(_ss._sessions.items()):
            p = s.profile
            # Plain em-dash placeholder for sessions with no name yet (the
            # literal was previously mis-encoded mojibake).
            name_display = getattr(p, "name", None) or "—"
            for evt_type, field_name in event_sources:
                for entry in (getattr(p, field_name, None) or []):
                    # Tolerate None entries: read every key from one dict.
                    record = entry or {}
                    events.append({
                        "persona_id": sid,
                        "name_display": name_display,
                        "event_type": evt_type,
                        "policy_slug": record.get("policy_slug"),
                        "insurer": record.get("insurer"),
                        "event_at": record.get("event_at"),
                        "session_id": record.get("session_id"),
                        "reason": record.get("reason"),
                        "outcome": evt_type,
                    })
    events.sort(key=lambda e: (e["event_at"] or ""), reverse=True)
    events = events[:10]
    return {
        "events": events,
        "total": len(events),
        "snapshot_ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
# ---------------------------------------------------------------------------
# #77 / #52-residual-5 — operator prune of persisted uploaded marketplace
# docs. Persisted uploads have NO TTL by design (they are public catalogue
# cards); this is the sanctioned way to remove a test/abuse upload. Removes
# the persisted dir (path-safety-guarded in backend.uploaded_docs) + the
# doc's chunks from the GLOBAL policies collection + busts the marketplace
# grade / corpus-pdf caches so /api/policies/all drops the card at once.
# ---------------------------------------------------------------------------
class UploadedDocPruneRequest(BaseModel):
    # Request body for POST /api/admin/uploaded-docs/prune. Both fields are
    # optional here; the endpoint returns 400 when neither is supplied. How a
    # policy_id and a prefix combine is decided by
    # backend.uploaded_docs.prune_persisted_upload.
    policy_id: Optional[str] = None  # presumably one persisted upload's id — confirm
    prefix: Optional[str] = None  # e.g. "user-upload__e2e-verify" — bulk
@router.post("/api/admin/uploaded-docs/prune")
async def admin_prune_uploaded_docs(
    body: UploadedDocPruneRequest,
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Prune persisted uploaded marketplace docs by policy_id and/or prefix.

    Steps:
      1. backend.uploaded_docs.prune_persisted_upload removes the persisted
         upload(s).
      2. Every removed policy_id has its chunks deleted from the Chroma
         collection.
      3. backend.main's corpus-PDF index and marketplace-grade caches are
         reset so the card disappears immediately.

    Chroma and cache failures are reported in the response
    (``chroma_errors`` / ``cache_bust``) instead of failing the request.

    Raises:
        HTTPException(400): neither a policy_id nor a non-blank prefix given.
    """
    _check_admin(request, x_admin_password)
    # Treat a blank prefix ("" or whitespace) as "not given", the same way a
    # blank policy_id already is. Previously {"prefix": ""} passed the 400
    # guard and was forwarded as a bulk prefix; if uploaded_docs matches by
    # prefix, an empty string selects every persisted upload.
    prefix = body.prefix if (body.prefix and body.prefix.strip()) else None
    if not body.policy_id and prefix is None:
        raise HTTPException(status_code=400, detail="provide policy_id or prefix")

    from backend import uploaded_docs as _udocs
    res = _udocs.prune_persisted_upload(body.policy_id, prefix=prefix)

    chroma_deleted: list[str] = []
    chroma_errors: list[str] = []
    if res.get("removed"):
        try:
            from rag.ingest import get_chroma_collection
            _col = get_chroma_collection()
            for pid in res["removed"]:
                # Per-id try: one failed delete must not skip the rest.
                try:
                    _col.delete(where={"policy_id": pid})
                    chroma_deleted.append(pid)
                except Exception as e:  # noqa: BLE001 — surface, don't swallow
                    chroma_errors.append(f"{pid}: {type(e).__name__}: {e}")
        except Exception as e:  # noqa: BLE001
            chroma_errors.append(f"collection: {type(e).__name__}: {e}")

    # Reset backend.main's module-level caches so /api/policies/all rebuilds.
    cache_bust = "ok"
    try:
        import backend.main as _m
        _m._CORPUS_PDF_IDX = None
        _mg = getattr(_m, "_MG_CACHE", None)
        if isinstance(_mg, dict):
            _mg["sig"] = None
            _mg["index"] = None
    except Exception as e:  # noqa: BLE001
        cache_bust = f"{type(e).__name__}: {e}"

    return {
        **res,
        "chroma_deleted": chroma_deleted,
        "chroma_errors": chroma_errors,
        "cache_bust": cache_bust,
    }