Spaces:
Sleeping
Sleeping
"""Admin endpoints — password gated. NEVER exposed to ordinary users.

The original four endpoints:
    GET  /api/admin/health — full LLM health snapshot (every model)
    POST /api/admin/probe  — force a fresh probe of all models now
    GET  /api/admin/chain  — current brain chain order
    POST /api/admin/chain  — reorder the brain chain (promote a model to primary)

Later sections add /api/admin/usage, /api/admin/profiles,
/api/admin/performance and /api/admin/llm-health. The
/api/profile/select + /api/profile/reject pair also lives here. Those two
are called by the user-facing shortlist UI and are NOT password gated.

Access gate (KI-097 — single check):
    Requests must include an `X-Admin-Password` header matching the
    ADMIN_PASSWORD env var. If ADMIN_PASSWORD is unset or blank, every
    request is denied.

Setup:
    Add to Space secrets (Settings → Variables and secrets):
        ADMIN_PASSWORD = your-strong-password-here

A bad or missing password returns 401 Unauthorized. Access is gated by a
strong password only. There is no IP allowlist: it would add operational
complexity (changing networks would lock the operator out) without
meaningful additional security.
"""
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| from collections import deque | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import APIRouter, HTTPException, Request, Header | |
| from pydantic import BaseModel | |
| from backend import llm_health | |
| from backend.config import settings | |
| router = APIRouter() | |
# Cap how many tail lines of 40-data/llm_usage.jsonl we hold in memory while
# computing per-role stats. 1000 lines @ ~150 B each = ~150 KB peak — bounded.
USAGE_TAIL_LINES = 1000

# A5 — Audit fix #7: emit a console warning at import time when the admin
# password is unset. Access is NOT opened up: the password check below
# returns False, so every request gets a 401. Ops still needs a loud signal
# that the gate is effectively unconfigured, so a deployment doesn't silently
# sit behind an unreachable admin surface.
if not os.environ.get("ADMIN_PASSWORD", "").strip():
    import sys as _sys

    print(
        "[admin] WARNING: ADMIN_UNGATED — ADMIN_PASSWORD env var is empty. "
        "All /api/admin/* requests will return 401. Set ADMIN_PASSWORD in "
        "the deployment env to enable access.",
        file=_sys.stderr,
        flush=True,
    )
| def _password_ok(supplied: Optional[str]) -> bool: | |
| expected = os.environ.get("ADMIN_PASSWORD", "").strip() | |
| if not expected: | |
| return False # no password configured => deny | |
| return supplied is not None and supplied == expected | |
def _check_admin(request: Request, password: Optional[str]) -> None:
    """Enforce the admin gate: raise HTTP 401 unless *password* is valid.

    Only the password is checked. ``request`` is accepted and ignored, so
    every endpoint can keep passing it without churn. The client IP is never
    inspected.
    """
    # TODO: add an IP allowlist as a second hardening factor, paired with a
    # documented break-glass procedure so a network change cannot lock ops
    # out. Password-only is acceptable for the current threat model.
    if _password_ok(password):
        return
    raise HTTPException(status_code=401, detail="Unauthorized")
| # --- Endpoints -------------------------------------------------------------- | |
async def admin_health(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Return the full LLM health snapshot (every model); admin only."""
    _check_admin(request, x_admin_password)
    snapshot = llm_health.status_summary()
    return snapshot
async def admin_probe(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Force a fresh probe of every model (admin only).

    Returns the number of entries in the probe result plus the post-probe
    health summary.
    """
    _check_admin(request, x_admin_password)
    probed_state = await llm_health.probe_all()
    return {
        "probed": len(probed_state),
        "summary": llm_health.status_summary(),
    }
async def admin_chain_get(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Return the current brain-chain order (admin only).

    The name is imported at call time, so a reorder applied by
    admin_chain_set (which rebinds the module attribute) shows up
    immediately.
    """
    _check_admin(request, x_admin_password)
    # Three-chain collapse (2026-05-15) — only the brain role remains.
    from backend.providers.nvidia_nim_llm import BRAIN_CHAIN as current_chain

    return {"brain": current_chain}
class ChainReorderRequest(BaseModel):
    """Request body for reordering a model chain."""

    # Which chain to reorder; only 'brain' is accepted by admin_chain_set.
    role: str
    # The complete new ordering of model ids; element 0 becomes the primary.
    order: list[str]
async def admin_chain_set(
    body: ChainReorderRequest,
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Reorder the brain chain and persist the order across restarts.

    Steps:
      1. Validate the request.
      2. Write ``{role: order}`` into ``DATA_DIR/admin_overrides.json``,
         keeping any other keys already in that file.
      3. Only after the write succeeds, rebind
         ``nvidia_nim_llm.BRAIN_CHAIN`` so the change takes effect in this
         process immediately.

    Raises:
        HTTPException(401): bad or missing admin password.
        HTTPException(400): the role is not 'brain', or ``order`` is empty
            or contains blank or duplicate model ids. An empty order would
            otherwise leave the process with no brain models at all.
    """
    _check_admin(request, x_admin_password)
    if body.role != "brain":
        raise HTTPException(status_code=400, detail="role must be 'brain'")
    new_order = list(body.order)
    if not new_order:
        raise HTTPException(status_code=400, detail="order must not be empty")
    if any(not m.strip() for m in new_order):
        raise HTTPException(status_code=400, detail="order must not contain blank model ids")
    if len(set(new_order)) != len(new_order):
        raise HTTPException(status_code=400, detail="order must not contain duplicate model ids")

    # Persist first. If the write fails, the request errors out with the
    # in-memory chain untouched, so memory and disk never disagree.
    override_path = settings.DATA_DIR / "admin_overrides.json"
    override_path.parent.mkdir(parents=True, exist_ok=True)
    state = _load_override_state(override_path)
    state[body.role] = new_order
    _write_json_atomic(override_path, state)

    # Rebind the module attribute; readers in this file look it up at call time.
    from backend.providers import nvidia_nim_llm as nim

    nim.BRAIN_CHAIN = list(new_order)
    return {"ok": True, "role": body.role, "new_chain": new_order}


def _load_override_state(path: Path) -> dict:
    """Return the JSON object stored at *path*.

    Returns ``{}`` when the file is missing, unreadable, not valid JSON, or
    not a JSON object. A non-object would make the caller's item assignment
    raise.
    """
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _write_json_atomic(path: Path, data: dict) -> None:
    """Write *data* to *path* as indented JSON without ever exposing a partial file.

    The payload goes to a sibling ``.tmp`` file first and is then swapped in
    with ``os.replace``. On POSIX that rename is atomic, so a crash mid-write
    cannot leave a truncated override file behind.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
    os.replace(tmp_path, path)
# ---------------------------------------------------------------------------
# /api/admin/usage — per-role usage stats + next-best-recommended model
# ---------------------------------------------------------------------------
| def _tail_jsonl(path: Path, n: int) -> list[dict]: | |
| """Read the last `n` lines of a JSONL file, returning parsed dicts. | |
| Bad lines are skipped silently. Returns [] when the file doesn't exist β | |
| backward-compatible behavior so the endpoint stays alive before any usage | |
| has been logged. | |
| """ | |
| if not path.exists(): | |
| return [] | |
| try: | |
| # File is bounded by the 1 MB rotation cap in nvidia_nim_llm._append_usage, | |
| # so reading the whole thing and slicing tail is fine; deque keeps memory | |
| # bounded even when no rotation has happened yet. | |
| with path.open("r", encoding="utf-8", errors="replace") as f: | |
| tail = deque(f, maxlen=n) | |
| out: list[dict] = [] | |
| for line in tail: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| out.append(json.loads(line)) | |
| except Exception: | |
| continue # skip malformed line, keep going | |
| return out | |
| except Exception: | |
| return [] | |
| def _stat_block_for_role(rows: list[dict], role: str, chain: list[str], | |
| health_state: dict) -> dict: | |
| """Compute the per-role JSON payload (see endpoint docstring for shape).""" | |
| role_rows = [r for r in rows if r.get("role") == role] | |
| total = len(role_rows) | |
| success_count = sum(1 for r in role_rows if r.get("success") is True) | |
| success_rate = (success_count / total) if total else 0.0 | |
| # Group by served_model (skip rows with no served_model β those are total | |
| # failures already captured in the role-level success_rate). | |
| by_model: dict[str, dict] = {} | |
| for r in role_rows: | |
| m = r.get("served_model") | |
| if not m: | |
| continue | |
| bucket = by_model.setdefault(m, {"calls": 0, "latency_sum": 0, | |
| "success_count": 0}) | |
| bucket["calls"] += 1 | |
| try: | |
| bucket["latency_sum"] += int(r.get("latency_ms") or 0) | |
| except Exception: | |
| pass | |
| if r.get("success") is True: | |
| bucket["success_count"] += 1 | |
| by_model_list = [] | |
| total_served = sum(b["calls"] for b in by_model.values()) or 1 | |
| for m, b in by_model.items(): | |
| avg_lat = int(b["latency_sum"] / b["calls"]) if b["calls"] else 0 | |
| m_success = (b["success_count"] / b["calls"]) if b["calls"] else 0.0 | |
| by_model_list.append({ | |
| "model": m, | |
| "calls": b["calls"], | |
| "share": round(b["calls"] / total_served, 4) if total_served else 0.0, | |
| "avg_latency_ms": avg_lat, | |
| "success_rate": round(m_success, 4), | |
| }) | |
| by_model_list.sort(key=lambda x: -x["calls"]) | |
| primary_model = chain[0] if chain else None | |
| # next_best_recommended: highest-position model in the chain (after the | |
| # primary) whose llm_health status is 'healthy'. If none are healthy, | |
| # fall back to chain[1] with next_best_is_unverified=true so the UI can | |
| # flag the recommendation as unproven. | |
| next_best = None | |
| next_best_unverified = False | |
| for m in chain[1:]: | |
| h = health_state.get(m) | |
| status = getattr(h, "status", None) if h else None | |
| if status == "healthy": | |
| next_best = m | |
| break | |
| if next_best is None: | |
| next_best = chain[1] if len(chain) > 1 else None | |
| next_best_unverified = next_best is not None | |
| return { | |
| "total_calls_24h": total, | |
| "success_rate": round(success_rate, 4), | |
| "by_model": by_model_list, | |
| "primary_model": primary_model, | |
| "next_best_recommended": next_best, | |
| "next_best_is_unverified": next_best_unverified, | |
| } | |
async def admin_usage(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Per-role usage stats over the last USAGE_TAIL_LINES log entries.

    Backward-compatible: if 40-data/llm_usage.jsonl doesn't exist yet, every
    role gets a zero-stat block whose primary_model is the current chain[0].
    The frontend can render an empty state without any extra branching.
    """
    _check_admin(request, x_admin_password)
    # Live (post-override) chains come from the shared _chain_names_map
    # helper instead of a second hand-built copy. Admin reorders applied
    # earlier in this process are reflected immediately, and the role set is
    # defined in one place. Three-chain collapse (2026-05-15) — only the
    # brain role remains.
    chains = _chain_names_map()
    rows = _tail_jsonl(settings.DATA_DIR / "llm_usage.jsonl", USAGE_TAIL_LINES)
    health_state = llm_health.load()  # {model: health record}
    return {
        role: _stat_block_for_role(rows, role, chain, health_state)
        for role, chain in chains.items()
    }
# ---------------------------------------------------------------------------
# /api/admin/profiles — live-session snapshot (ADR-043, 2026-05-27)
# ---------------------------------------------------------------------------
async def admin_profiles(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Snapshot every session currently held in memory, with a light summary.

    Before ADR-043 this listed `40-data/profiles/<name>.json`: a
    cross-session audit of every named visitor. That on-disk store is gone.
    Operators now see only the sessions present in `session_state._sessions`
    right now, i.e. not yet idle-evicted. Each row keeps close to the old
    schema so the existing admin UI renders unchanged.
    """
    _check_admin(request, x_admin_password)
    from backend import session_state as _ss

    iso_fmt = "%Y-%m-%dT%H:%M:%SZ"
    tracked_fields = (
        "name", "age", "dependents", "income_band",
        "location_tier", "primary_goal", "health_conditions",
    )
    now_iso = datetime.now(timezone.utc).strftime(iso_fmt)
    profiles: list[dict] = []
    with _ss._lock:
        # Iterate over a copy of the items rather than the live dict.
        for session_id, sess in list(_ss._sessions.items()):
            prof = sess.profile
            display_name = (getattr(prof, "name", None) or "").strip()
            filled = len([
                fld for fld in tracked_fields
                if getattr(prof, fld, None) not in (None, "", [])
            ])
            # NOTE(review): assumes last_touched is wall-clock epoch seconds;
            # a monotonic value would render as a 1970 date. Confirm.
            last_seen = datetime.fromtimestamp(
                sess.last_touched, tz=timezone.utc
            ).strftime(iso_fmt)
            profiles.append({
                "name_display": display_name or "(anonymous)",
                "name_slug": display_name.lower().replace(" ", "-") if display_name else "",
                "session_id": session_id,
                "last_seen": last_seen,
                "session_count": 1,
                "profile_complete_fields": filled,
            })
    return {"profiles": profiles, "total": len(profiles), "snapshot_ts": now_iso}
# ---------------------------------------------------------------------------
# /api/profile/select + /api/profile/reject — IN-MEMORY shortlist tracker
# (ADR-043, 2026-05-27)
#
# Before ADR-043 these appended events to the named-profile JSON file. They
# now mutate only the live SessionState: close the tab → shortlist gone.
# The frontend's shortlist UI keeps working within a single session.
# ---------------------------------------------------------------------------
class _PolicyEventBody(BaseModel):
    """Request body shared by the shortlist select/reject endpoints."""

    session_id: str  # live in-memory session whose profile is updated
    policy_slug: str  # policy identifier; also the dedup key for repeat clicks
    insurer: str
    reason: Optional[str] = None  # free text; a default reason is used when omitted
| _EVENT_TYPE_TO_FIELD = { | |
| "selected": "selected_policies", | |
| "rejected": "rejected_policies", | |
| } | |
def _do_record_policy_event(body: _PolicyEventBody, event_type: str) -> dict:
    """Upsert one shortlist event into the LIVE in-memory profile.

    Repeat events for the same policy_slug update the existing entry (fresh
    timestamp and reason) instead of appending a duplicate.

    Returns a small ack with the entry count for that event type.

    Raises:
        HTTPException(400): session_id, policy_slug or insurer is empty, or
            event_type is not a key of _EVENT_TYPE_TO_FIELD.
    """
    if not (body.session_id and body.policy_slug and body.insurer):
        raise HTTPException(
            status_code=400,
            detail="session_id, policy_slug, and insurer are required",
        )
    field_name = _EVENT_TYPE_TO_FIELD.get(event_type)
    if field_name is None:
        raise HTTPException(status_code=400, detail="invalid event_type")

    from backend.session_state import get_session

    profile = get_session(body.session_id).profile
    entries: list[dict] = list(getattr(profile, field_name, None) or [])

    if event_type == "selected":
        default_reason = "user_clicked_select"
    else:
        default_reason = "user_clicked_reject"
    payload = {
        "policy_slug": body.policy_slug,
        "insurer": body.insurer,
        "event_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "session_id": body.session_id,
        "reason": body.reason or default_reason,
    }

    # Merge into the first entry with the same slug, otherwise append.
    for idx, existing in enumerate(entries):
        if (existing or {}).get("policy_slug") == body.policy_slug:
            entries[idx] = {**existing, **payload}
            break
    else:
        entries.append(payload)

    setattr(profile, field_name, entries)
    return {
        "ok": True,
        "event_type": event_type,
        "policy_slug": body.policy_slug,
        "count": len(entries),
    }
async def profile_select(body: _PolicyEventBody):
    """Record a "shortlist / save" click on a policy card (in-memory only)."""
    return _do_record_policy_event(body, event_type="selected")
async def profile_reject(body: _PolicyEventBody):
    """Record a "not for me / reject" click on a policy card (in-memory only)."""
    return _do_record_policy_event(body, event_type="rejected")
# ---------------------------------------------------------------------------
# /api/admin/performance — aggregated performance/quality metrics
# ---------------------------------------------------------------------------
# Repository root: the parent of the directory containing this module
# (symlinks resolved).
_REPO_ROOT = Path(__file__).resolve().parents[1]
def _read_eval_summary() -> Optional[dict]:
    """Return the admin-relevant slice of `summary` in eval/results.json.

    Returns None when the file is missing, cannot be read or parsed, or has
    no `summary` object. Individual missing fields never raise: scalar
    fields become None and the two breakdown maps become {}.
    """
    results_path = _REPO_ROOT / "eval" / "results.json"
    if not results_path.exists():
        return None
    try:
        parsed = json.loads(results_path.read_text(encoding="utf-8"))
    except Exception:
        return None
    summary = parsed.get("summary") if isinstance(parsed, dict) else None
    if not isinstance(summary, dict):
        return None
    scalar_keys = (
        "ran_at",
        "elapsed_seconds",
        "n_questions",
        "factual_accuracy",
        "citation_accuracy",
        "refusal_precision",
    )
    picked = {key: summary.get(key) for key in scalar_keys}
    picked["by_brain"] = summary.get("by_brain") or {}
    picked["by_type"] = summary.get("by_type") or {}
    return picked
def _latest_audit_dir() -> Optional[Path]:
    """Return the most recently modified `80-audit/full_*` directory.

    Only directories that contain a summary.json are considered. Returns
    None when there is no such directory.
    """
    audit_root = _REPO_ROOT / "80-audit"
    if not audit_root.exists():
        return None
    runs = [run for run in audit_root.glob("full_*") if (run / "summary.json").exists()]
    if not runs:
        return None
    # max() keeps the first of any mtime ties, exactly like a stable
    # descending sort followed by [0].
    return max(runs, key=lambda run: run.stat().st_mtime)
# Regex helpers for parsing report.md (tools/audit/analyze.py output). Each
# pattern keys on its table row's label text, not the row's position, so the
# parse still works if the rows are reordered. Group 1 holds the integer the
# caller reads.
_RE_REPORT_PERSONAS = re.compile(r"Personas completed \| \*\*(\d+)\*\* of (\d+)")
_RE_REPORT_TURNS = re.compile(r"Total turns executed \| \*\*(\d+)\*\*")
_RE_REPORT_ERRORS = re.compile(r"Errors \(HTTP / timeout / network\) \| (\d+)")
_RE_REPORT_REFUSALS = re.compile(r"Refusals \(blocked=true\) \| (\d+)")
_RE_REPORT_P50 = re.compile(r"Latency p50 \| (\d+)\s*ms")
_RE_REPORT_P95 = re.compile(r"Latency p95 \| (\d+)\s*ms")
_RE_REPORT_P99 = re.compile(r"Latency p99 \| (\d+)\s*ms")
# One full brain-routing table row: | `model-id` | <turn count> |
_RE_BRAIN_ROW = re.compile(r"^\|\s*`([^`]+)`\s*\|\s*(\d+)\s*\|\s*$")
def _parse_brain_routing(report_text: str) -> dict[str, int]:
    """Parse the `## 2. Brain routing` table of report.md into {brain: turns}.

    The section runs from its heading to the next `## 3.` heading, or to the
    end of the text. Lines that aren't a ``| `model` | count |`` row are
    ignored. Returns {} when the section is absent.
    """
    start = report_text.find("## 2. Brain routing")
    if start == -1:
        return {}
    end = report_text.find("## 3.", start)
    section = report_text[start:] if end == -1 else report_text[start:end]
    routing: dict[str, int] = {}
    for row in section.splitlines():
        match = _RE_BRAIN_ROW.match(row)
        if match is None:
            continue
        try:
            routing[match.group(1)] = int(match.group(2))
        except ValueError:
            continue
    return routing
def _read_audit_summary() -> Optional[dict]:
    """Return aggregate metrics for the latest persona-audit run.

    The run's summary.json is intentionally sparse: it is just the
    launcher's config. The real metrics live in `report.md`, produced by
    tools/audit/analyze.py. Parsing that report with regexes is much cheaper
    than re-walking 100+ transcript JSONs on every admin request.

    Returns:
        None when no audit run exists yet. Otherwise a dict that always has
        the same keys. The report-derived fields stay None (brain_routing
        stays {}) when report.md can't be read. That includes the window
        where the launcher has written summary.json but analyze.py hasn't
        run yet.
    """
    run_dir = _latest_audit_dir()
    if run_dir is None:
        return None
    launcher_summary = _load_launcher_summary(run_dir / "summary.json")
    out: dict = {
        "run_id": launcher_summary.get("run_id") or run_dir.name,
        "personas_requested": launcher_summary.get("personas_requested"),
        "personas_completed": launcher_summary.get("personas_completed"),
        "elapsed_seconds": launcher_summary.get("elapsed_seconds"),
        "turns_total": None,
        "errors": None,
        "refusals": None,
        "p50_ms": None,
        "p95_ms": None,
        "p99_ms": None,
        "brain_routing": {},
    }
    try:
        report_text = (run_dir / "report.md").read_text(encoding="utf-8")
    except (OSError, ValueError):
        # Missing (analyze.py hasn't run) or unreadable: surface what we have.
        return out

    # personas_completed lives in BOTH summary.json and report.md. report.md
    # wins because it reflects the transcripts actually analyzed (the
    # launcher may have claimed N while only M wrote transcripts).
    personas = _first_int(_RE_REPORT_PERSONAS, report_text)
    if personas is not None:
        out["personas_completed"] = personas
    out["turns_total"] = _first_int(_RE_REPORT_TURNS, report_text)
    out["errors"] = _first_int(_RE_REPORT_ERRORS, report_text)
    out["refusals"] = _first_int(_RE_REPORT_REFUSALS, report_text)
    out["p50_ms"] = _first_int(_RE_REPORT_P50, report_text)
    out["p95_ms"] = _first_int(_RE_REPORT_P95, report_text)
    out["p99_ms"] = _first_int(_RE_REPORT_P99, report_text)
    out["brain_routing"] = _parse_brain_routing(report_text)
    return out


def _load_launcher_summary(path: Path) -> dict:
    """Return the launcher's summary.json as a dict.

    Returns {} when the file is unreadable, not JSON, or not a JSON object.
    A non-object would break the caller's ``.get()`` calls.
    """
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _first_int(pattern: re.Pattern[str], text: str) -> Optional[int]:
    """Return group 1 of the first *pattern* match in *text* as an int.

    Returns None when there is no match or the group isn't an integer.
    """
    match = pattern.search(text)
    if match is None:
        return None
    try:
        return int(match.group(1))
    except (ValueError, IndexError):
        return None
def _read_usage_24h() -> Optional[dict]:
    """Compute {role: {count, success_rate, avg_latency_ms}} from recent usage.

    The window is the last USAGE_TAIL_LINES entries of
    40-data/llm_usage.jsonl. Returns None if the file is missing or empty,
    so the frontend can render an empty state.

    Note: "24h" in the field name is conventional. The actual window is the
    last USAGE_TAIL_LINES rows (typically ≈24h of activity at current
    traffic); the name is kept to match the admin UI label.

    Only the canonical roles in llm_health.ROLES are reported, the same way
    /api/admin/usage limits itself to its chains. The append-only log can
    contain rows tagged with other role values.

    Rows are skipped defensively when:
      - the row is not a dict;
      - the role is not a string (an unhashable role would raise TypeError
        on the set lookup).

    Latencies that are NaN or ±Infinity are excluded from the average.
    ``json.loads`` accepts those values, and ``int()`` raises on them, which
    used to 500 the endpoint.
    """
    import math

    rows = _tail_jsonl(settings.DATA_DIR / "llm_usage.jsonl", USAGE_TAIL_LINES)
    if not rows:
        return None
    canonical_roles = set(llm_health.ROLES)
    agg: dict[str, dict] = {}
    for r in rows:
        if not isinstance(r, dict):
            continue
        role = r.get("role")
        if not role or not isinstance(role, str) or role not in canonical_roles:
            continue
        bucket = agg.setdefault(
            role, {"count": 0, "success_count": 0, "latency_sum": 0, "latency_n": 0}
        )
        bucket["count"] += 1
        if r.get("success") is True:
            bucket["success_count"] += 1
        lat = r.get("latency_ms")
        if isinstance(lat, int) or (isinstance(lat, float) and math.isfinite(lat)):
            bucket["latency_sum"] += int(lat)
            bucket["latency_n"] += 1
    out: dict[str, dict] = {}
    for role, b in agg.items():
        out[role] = {
            "count": b["count"],
            "success_rate": round(b["success_count"] / b["count"], 4) if b["count"] else 0.0,
            "avg_latency_ms": int(b["latency_sum"] / b["latency_n"]) if b["latency_n"] else 0,
        }
    return out
async def admin_performance(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Aggregated performance/quality metrics for the admin Performance section.

    Each of the three data blocks (`eval`, `audit`, `usage_24h`) is
    independently nullable. A missing eval/results.json or an absent audit
    run yields None for that block instead of a 500. `snapshot_ts` is
    always set.
    """
    _check_admin(request, x_admin_password)
    eval_block = _read_eval_summary()
    audit_block = _read_audit_summary()
    usage_block = _read_usage_24h()
    snapshot_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "eval": eval_block,
        "audit": audit_block,
        "usage_24h": usage_block,
        "snapshot_ts": snapshot_ts,
    }
# ---------------------------------------------------------------------------
# /api/admin/llm-health — KI-086 LLM Health & Credits snapshot
#
# Surfaces KI-080..KI-085 telemetry on the existing admin "LLM Chain" tab so
# the operator can see at a glance:
#   - per-chain elected PRIMARY + BACKUP (KI-080)
#   - each candidate's latest probe latency + success rate (KI-080)
#   - credits remaining + unit + reset deadline (KI-085)
#   - the degraded-until window when a candidate was demoted on a 429 (KI-084)
#   - the per-turn served-model distribution from the last N llm_usage.jsonl rows
#
# Response shape — three top-level keys (chains / candidates / recent_turns)
# plus a snapshot_ts. All durations on the wire are seconds-from-now
# (relative, never absolute monotonic), so the frontend doesn't need to know
# the server's monotonic clock origin.
# ---------------------------------------------------------------------------
# The chain roles reported on the wire. This is a direct alias of
# llm_health.ROLES (not a mapping to display labels). The role strings are
# the same ones llm_health.get_primary() takes as input. Since the
# three-chain collapse (2026-05-15) only the brain role remains; aliasing
# ROLES means a future role only needs to be added in one place.
_LLM_HEALTH_CHAIN_ROLES = llm_health.ROLES
def _chain_names_map() -> dict[str, list[str]]:
    """Return {role: model ids} for the live (post-admin-override) chains.

    The attribute is read off the module at call time, so admin reorders
    applied earlier in this process show up immediately. Each chain is
    returned as a fresh list copy. Only the brain role exists.
    """
    from backend.providers import nvidia_nim_llm as nim

    brain_chain = getattr(nim, "BRAIN_CHAIN", [])
    return {"brain": list(brain_chain)}
| def _seconds_until_monotonic(deadline: Optional[float]) -> Optional[float]: | |
| """Convert a monotonic-time deadline (as stamped in ModelHealth) into a | |
| seconds-from-now value the frontend can render as an ETA. Returns None | |
| when the deadline is missing OR already in the past; the caller decides | |
| how to render `None` vs `0`.""" | |
| if deadline is None: | |
| return None | |
| import time as _time | |
| rem = deadline - _time.monotonic() | |
| if rem <= 0: | |
| return 0.0 | |
| return round(rem, 1) | |
def _probe_age_seconds(iso_ts: Optional[str]) -> Optional[float]:
    """Wall-clock seconds since an ISO-8601 probe timestamp, for the wire payload.

    Thin wrapper over llm_health._iso_age_seconds. A None age passes
    through. Otherwise the age is rounded to 0.1 s and clamped at 0, so a
    timestamp slightly in the future never yields a negative age.
    """
    age = llm_health._iso_age_seconds(iso_ts)
    return None if age is None else max(0.0, round(age, 1))
| def _success_rate_for(h) -> Optional[float]: | |
| """Last-N probes success rate as a float fraction. Returns None when no | |
| probe history yet.""" | |
| hist = getattr(h, "probe_history", None) or [] | |
| if not hist: | |
| return None | |
| hits = sum(1 for r in hist if r.get("ok")) | |
| return round(hits / len(hist), 4) | |
def _candidate_snapshot(model: str, health, chain_membership: list[str],
                        now_mono: float) -> dict:
    """Build one Section B (candidate table) row.

    Always returns a fully populated dict, so the frontend table never has
    to handle a missing row. A model that has never been probed gets
    status / effective_status 'unknown' and None everywhere else.
    """
    if health is None:
        unknown_row = {
            "model": model,
            "provider": llm_health.provider_of(model),
            "chain_membership": chain_membership,
            "status": "unknown",
            "effective_status": "unknown",
        }
        unknown_row.update(dict.fromkeys((
            "latency_ms",
            "success_rate",
            "probe_age_seconds",
            "last_error",
            "last_status_code",
            "health_reason",
            "credits_remaining",
            "credits_unit",
            "credits_low_water",
            "credits_reset_in_seconds",
            "degraded_for_seconds",
        )))
        return unknown_row

    degraded_until = getattr(health, "degraded_until_monotonic", 0.0) or 0.0
    degraded_for = (
        round(degraded_until - now_mono, 1)
        if degraded_until and degraded_until > now_mono
        else None
    )

    # KI-202 — operator-facing reason for the admin Health column:
    #   - None for healthy rows (the UI shows just "Live");
    #   - "stale" for stale rows;
    #   - otherwise llm_health's classification of the last error.
    effective = llm_health.effective_status(health)
    if effective == "healthy":
        reason = None
    elif effective == "stale":
        reason = "stale"
    else:
        reason = llm_health._classify_error_reason(
            health.last_error, health.last_status_code,
        )

    return {
        "model": model,
        "provider": llm_health.provider_of(model),
        "chain_membership": chain_membership,
        "status": health.status,
        "effective_status": effective,
        "latency_ms": health.latency_ms,
        "success_rate": _success_rate_for(health),
        "probe_age_seconds": _probe_age_seconds(health.tested_at),
        "last_error": health.last_error,
        "last_status_code": health.last_status_code,
        "health_reason": reason,
        "credits_remaining": health.credits_remaining,
        "credits_unit": health.credits_unit,
        "credits_low_water": health.credits_low_water,
        "credits_reset_in_seconds": _seconds_until_monotonic(health.credits_reset_at),
        "degraded_for_seconds": degraded_for,
    }
def _candidate_available_for_calls(h, now_mono: float) -> bool:
    """KI-122 — can this candidate take real chat traffic right now?

    This is the operator-facing meaning of "live and usable". All of these
    must hold:
      - the probe status is neither 'down' nor 'unknown';
      - it is not inside a degraded (sin-bin) window: either
        ``degraded_until_monotonic`` is unset, or it is not after
        ``now_mono``;
      - the credits gate ``llm_health._has_credits(h, now_mono)`` passes.

    A None candidate is never available.
    """
    if h is None or h.status in ("down", "unknown"):
        return False
    sin_binned = bool(
        h.degraded_until_monotonic and h.degraded_until_monotonic > now_mono
    )
    if sin_binned:
        return False
    return bool(llm_health._has_credits(h, now_mono))
| def _candidate_credit_exhausted_strict(h, now_mono: float) -> bool: | |
| """STRICT credit-exhausted rule for a single candidate. | |
| True ONLY when ALL three hold: | |
| (1) credits_remaining is NOT None (we have a real signal) | |
| (2) credits_remaining <= credits_low_water | |
| (3) credits_reset_at is set AND in the FUTURE (we're inside an | |
| active gating window β stale or absent reset means the | |
| snapshot is not authoritative). | |
| This is intentionally stricter than `not _has_credits()` because the | |
| banner is louder than the elector. The elector falls through cheaply | |
| on a single bad candidate; this strictness avoids falsely scaring the | |
| operator when one quota-exhausted backup co-exists with a perfectly | |
| healthy primary.""" | |
| if h is None: | |
| return False | |
| if h.credits_remaining is None: | |
| return False | |
| if h.credits_remaining > (h.credits_low_water or 0.0): | |
| return False | |
| if h.credits_reset_at is None: | |
| # No scheduled reset (e.g. OpenRouter usd_balance β prepaid wallet). | |
| # Could still be a real "wallet empty" signal, BUT we won't flag the | |
| # chain banner on it alone because OpenRouter free-tier accounts | |
| # report $0 even when calls succeed. The probe / chat success | |
| # signal is the authoritative truth β fall through to the probe | |
| # status check in `_candidate_available_for_calls` instead. | |
| return False | |
| if now_mono >= h.credits_reset_at: | |
| # Reset window has elapsed β snapshot is stale, treat as permissive. | |
| return False | |
| return True | |
def _chain_summary(role: str, chains: dict[str, list[str]],
                   state: dict, now_mono: float) -> dict:
    """Build the Section A (per-chain) block of the LLM Chain admin tab.

    Args:
        role: chain role to summarise; its member list is ``chains[role]``
            (an unknown role yields an empty chain).
        chains: ``{role -> [model, ...]}`` chain membership map.
        state: ``{model -> health record}`` cached health snapshot. Models
            absent from it are handed to the helpers as ``None``.
        now_mono: a single ``time.monotonic()`` reading shared by every
            availability / credit check in this block.

    Returns:
        A dict with the chain order, one ``chain_members`` row per model,
        the elected primary/backup and their snapshots, the
        ``chain_credit_exhausted`` banner flag, and the newest probe time
        (plus its age in seconds).
    """
    members = chains.get(role) or []
    elected_primary = llm_health.get_primary(role)
    elected_backup = llm_health.get_backup(role)

    # KI-122 (2026-05-15) — STRICT banner rule. Earlier rules (KI-085,
    # KI-116) could raise the "credits exhausted" banner while the elected
    # primary was healthy, because a single low-balance member with no
    # scheduled reset was enough. The banner now requires BOTH:
    #   (a) no chain member is available_for_calls, AND
    #   (b) at least one member is strictly credit-exhausted.
    # A chain that is merely down (probe failures, no credit signal) is
    # surfaced by the frontend through `elected_primary is None` instead.
    member_rows: list[dict] = []
    saw_available = False
    saw_exhausted = False
    newest_probe: Optional[str] = None

    for model in members:
        health = state.get(model)
        is_available = _candidate_available_for_calls(health, now_mono)
        is_exhausted = _candidate_credit_exhausted_strict(health, now_mono)
        if is_available:
            saw_available = True
        if is_exhausted:
            saw_exhausted = True

        # Keep the greatest truthy `tested_at` across the chain. This is a
        # plain `>` comparison, so it presumes uniformly formatted ISO-8601
        # strings (lexical order == time order) — TODO confirm in llm_health.
        probed_at = None if health is None else getattr(health, "tested_at", None)
        if probed_at and (newest_probe is None or probed_at > newest_probe):
            newest_probe = probed_at

        member_rows.append({
            "model": model,
            "available_for_calls": is_available,
            "credit_exhausted": is_exhausted,
            "is_current_primary": bool(elected_primary and model == elected_primary),
            "is_current_backup": bool(elected_backup and model == elected_backup),
        })

    banner = bool(not saw_available and saw_exhausted)

    # Evaluated in the key order of the returned dict.
    primary_available = _candidate_available_for_calls(
        state.get(elected_primary) if elected_primary else None, now_mono,
    )
    primary_snapshot = (
        _candidate_snapshot(elected_primary, state.get(elected_primary), [role], now_mono)
        if elected_primary else None
    )
    backup_snapshot = (
        _candidate_snapshot(elected_backup, state.get(elected_backup), [role], now_mono)
        if elected_backup else None
    )

    return {
        "role": role,
        "chain": members,
        "chain_members": member_rows,
        "elected_primary": elected_primary,
        "elected_backup": elected_backup,
        # KI-122 — explicit "currently in use" model plus a boolean the
        # frontend uses for its "IN USE" pill without re-deriving it.
        "current_primary": elected_primary,
        "current_primary_available": primary_available,
        "primary_snapshot": primary_snapshot,
        "backup_snapshot": backup_snapshot,
        "chain_credit_exhausted": banner,
        # KI-122 — staleness signal: raw timestamp plus seconds-ago so the
        # UI can render "Probed Ns ago".
        "last_probe_at": newest_probe,
        "last_probe_age_seconds": _probe_age_seconds(newest_probe),
    }
| def _recent_turns(n: int = 20) -> list[dict]: | |
| """Section C β last N completed turns from 40-data/llm_usage.jsonl, | |
| most-recent-first. We keep the row shape close to the raw log entries | |
| so the frontend can render new fields if the producer adds them. | |
| Fields surfaced (all optional β older rows pre-KI-080 won't carry | |
| elected_primary/backup): | |
| ts / role / chain_primary / elected_primary / elected_backup / | |
| served_model / latency_ms / success / fallback_reason | |
| """ | |
| usage_path = settings.DATA_DIR / "llm_usage.jsonl" | |
| # Re-use the existing tail-reader; cap at N so we don't pay for the | |
| # full 1000-row tail when the panel only renders 20. | |
| rows = _tail_jsonl(usage_path, max(n, 20)) | |
| if not rows: | |
| return [] | |
| out: list[dict] = [] | |
| # tail_jsonl returns oldest-first; reverse to newest-first. | |
| for r in reversed(rows[-n:]): | |
| out.append({ | |
| "ts": r.get("ts"), | |
| "role": r.get("role"), | |
| "chain_primary": r.get("chain_primary"), | |
| "elected_primary": r.get("elected_primary"), | |
| "elected_backup": r.get("elected_backup"), | |
| "served_model": r.get("served_model"), | |
| "latency_ms": r.get("latency_ms"), | |
| "success": r.get("success"), | |
| "fallback_reason": r.get("fallback_reason"), | |
| }) | |
| return out | |
async def admin_llm_health(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """KI-086 — composite LLM health + credits snapshot for the LLM Chain tab.

    Response keys:
        chains        Section A: one summary per role in
                      ``_LLM_HEALTH_CHAIN_ROLES`` (elected primary/backup,
                      their snapshots, ``chain_credit_exhausted`` flag).
        candidates    Section B: one row per known model — every chain member
                      plus any model only present in the health state — with
                      the list of chains it belongs to. Sorted down, degraded,
                      unknown, healthy (anything else last), then by name.
        recent_turns  Section C: last 20 served turns from llm_usage.jsonl.
        snapshot_ts   UTC wall-clock time the response was built.

    Raises whatever ``_check_admin`` raises on a bad/missing password.
    """
    _check_admin(request, x_admin_password)

    # KI-088: this endpoint must never trigger probes. The frontend polls it
    # every 30s; live probing here would pile NIM candidates onto the shared
    # per-key concurrency budget and starve user chat. Everything below reads
    # the cached llm_health snapshot and the append-only usage log only.
    import time as _time

    now_mono = _time.monotonic()
    chains = _chain_names_map()
    state = llm_health.load()  # {model -> health record}, cached snapshot only

    # Section A — per-chain election + credit banner.
    chains_block = [
        _chain_summary(chain_role, chains, state, now_mono)
        for chain_role in _LLM_HEALTH_CHAIN_ROLES
    ]

    # Section B — model -> [roles it appears in]. Models known to the health
    # state but in no chain (e.g. just removed by an admin reorder) get an
    # empty list so their last probe/credits stay visible.
    membership: dict[str, list[str]] = {}
    for chain_role, chain_models in chains.items():
        for model in chain_models:
            membership.setdefault(model, []).append(chain_role)
    for model in state.keys():
        membership.setdefault(model, [])

    # Problems float to the top: down, degraded, unknown, healthy; any other
    # status sorts after those. Ties break on model name.
    status_order = {"down": 0, "degraded": 1, "unknown": 2, "healthy": 3}
    candidates_block = sorted(
        (
            _candidate_snapshot(model, state.get(model), roles, now_mono)
            for model, roles in membership.items()
        ),
        key=lambda snap: (status_order.get(snap["status"], 9), snap["model"]),
    )

    # Section C — last 20 turns.
    recent = _recent_turns(20)

    return {
        "chains": chains_block,
        "candidates": candidates_block,
        "recent_turns": recent,
        "snapshot_ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
# ---------------------------------------------------------------------------
# A5 — Audit fix #4: /api/admin/persona-drift — slot-capture completeness
# for the 20 most recently touched live sessions. Seven canonical required
# slots: name, age, dependents, location_tier, income_band, primary_goal,
# health_conditions. <50% capture is flagged red on the frontend.
# ---------------------------------------------------------------------------
| # The seven canonical "ready-to-recommend" fact-find slots. Mirrors | |
| # brain_tools._REQUIRED_FOR_READY (the recommendation-ready required set) | |
| # and needs_finder.Profile's persisted fields. Adding/removing a slot must | |
| # stay in sync with that required set. Health is captured as a list (empty | |
| # list counts as "asked but no conditions" β still a valid captured signal | |
| # once the `asked` array contains the field name). | |
| _PERSONA_DRIFT_SLOTS = ("name", "age", "dependents", "location_tier", | |
| "income_band", "primary_goal", "health_conditions") | |
| def _slot_captured(profile: dict, slot: str, asked: list[str]) -> bool: | |
| """A slot is 'captured' when the field has a non-empty value OR (for | |
| health_conditions) when the user was asked and confirmed no conditions | |
| (asked-list contains the slot but the list is empty β still a positive | |
| answer the bot heard, not a missing signal).""" | |
| v = profile.get(slot) | |
| if slot == "health_conditions": | |
| if isinstance(v, list) and len(v) > 0: | |
| return True | |
| if "health_conditions" in (asked or []): | |
| return True | |
| return False | |
| return v not in (None, "", [], 0) | |
async def admin_persona_drift(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Slot-capture completeness for LIVE in-memory sessions, newest first.

    One row per session in ``session_state._sessions`` (read under its lock).
    Each row lists which ``_PERSONA_DRIFT_SLOTS`` are captured/missing and a
    completeness percentage rounded to one decimal. Rows are ordered by the
    formatted ``last_seen`` UTC string, descending, and cut to 20; ``total``
    is the number of rows returned.

    Pre-ADR-043 this walked ``40-data/profiles/*.json`` and covered every
    named visitor; that on-disk store is gone, so only live sessions show.
    """
    _check_admin(request, x_admin_password)
    from backend import session_state as _ss

    slot_total = len(_PERSONA_DRIFT_SLOTS)
    personas: list[dict] = []
    with _ss._lock:
        for session_id, session in list(_ss._sessions.items()):
            prof = session.profile
            values = {slot: getattr(prof, slot, None) for slot in _PERSONA_DRIFT_SLOTS}
            asked_slots = list(getattr(prof, "asked", None) or [])

            captured: list[str] = []
            missing: list[str] = []
            for slot in _PERSONA_DRIFT_SLOTS:
                if _slot_captured(values, slot, asked_slots):
                    captured.append(slot)
                else:
                    missing.append(slot)

            personas.append({
                "persona_id": session_id,
                "name_display": (getattr(prof, "name", None) or "β"),
                "last_seen": datetime.fromtimestamp(
                    session.last_touched, tz=timezone.utc,
                ).strftime("%Y-%m-%dT%H:%M:%SZ"),
                "captured_slots": captured,
                "missing_slots": missing,
                "completeness_pct": round(100.0 * len(captured) / slot_total, 1),
                # Each row is exactly one live session.
                "session_count": 1,
            })

    newest = sorted(personas, key=lambda row: (row["last_seen"] or ""), reverse=True)[:20]
    return {
        "personas": newest,
        "total": len(newest),
        "slots": list(_PERSONA_DRIFT_SLOTS),
        "snapshot_ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
# ---------------------------------------------------------------------------
# A5 — Audit fix #5: /api/admin/recommendation-history — the 10 most recent
# policy recommendation events across live in-memory sessions, newest first.
# ---------------------------------------------------------------------------
async def admin_recommendation_history(
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Last 10 policy events (shown / selected / rejected) across LIVE sessions.

    Flattens each live session profile's ``shown_policies``,
    ``selected_policies`` and ``rejected_policies`` lists (read under the
    session-state lock) into one event list. It is ordered by ``event_at``
    descending, with events lacking ``event_at`` sorted last, and cut to 10.
    ``outcome`` repeats ``event_type``; ``total`` is the number of events
    returned.

    Pre-ADR-043 this walked ``40-data/profiles/*.json`` and showed every
    event ever logged. That store is gone, so events now disappear when
    their session is evicted.
    """
    _check_admin(request, x_admin_password)
    from backend import session_state as _ss

    event_sources = (
        ("shown", "shown_policies"),
        ("selected", "selected_policies"),
        ("rejected", "rejected_policies"),
    )
    collected: list[dict] = []
    with _ss._lock:
        for session_id, session in list(_ss._sessions.items()):
            prof = session.profile
            display = (getattr(prof, "name", None) or "β")
            for event_kind, attr_name in event_sources:
                for raw_entry in (getattr(prof, attr_name, None) or []):
                    # Falsy entries (e.g. None) become {} so every field is None.
                    entry = raw_entry or {}
                    collected.append({
                        "persona_id": session_id,
                        "name_display": display,
                        "event_type": event_kind,
                        "policy_slug": entry.get("policy_slug"),
                        "insurer": entry.get("insurer"),
                        "event_at": entry.get("event_at"),
                        "session_id": entry.get("session_id"),
                        "reason": entry.get("reason"),
                        "outcome": event_kind,
                    })

    latest = sorted(collected, key=lambda ev: (ev["event_at"] or ""), reverse=True)[:10]
    return {
        "events": latest,
        "total": len(latest),
        "snapshot_ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
# ---------------------------------------------------------------------------
# #77 / #52-residual-5 — operator prune of persisted uploaded marketplace
# docs. Persisted uploads have NO TTL by design (they are public catalogue
# cards); this is the sanctioned way to remove a test/abuse upload. It
# removes the persisted dir (path-safety-guarded in backend.uploaded_docs),
# deletes the doc's chunks from the GLOBAL policies collection, and busts
# the marketplace grade / corpus-pdf caches so /api/policies/all drops the
# card at once.
# ---------------------------------------------------------------------------
class UploadedDocPruneRequest(BaseModel):
    # Request body for admin_prune_uploaded_docs. Both fields are optional at
    # the model level; the endpoint rejects a body that supplies neither.
    # `#` comments are used deliberately: a class docstring would become the
    # schema description in the generated OpenAPI docs.
    policy_id: Optional[str] = None  # single persisted upload to prune
    # Bulk prune, e.g. "user-upload__e2e-verify". Presumably matched as an id
    # prefix by backend.uploaded_docs.prune_persisted_upload — confirm there.
    prefix: Optional[str] = None
def _purge_pruned_uploads_from_chroma(removed: Optional[list[str]]) -> tuple[list[str], list[str]]:
    """Delete each pruned upload's chunks from the global Chroma collection.

    Returns ``(deleted_ids, error_messages)``. Failures are collected rather
    than raised, so the operator gets a partial-success report: one message
    per failed id, or a single ``"collection: …"`` message if the collection
    itself could not be obtained (or iteration failed).
    """
    deleted: list[str] = []
    errors: list[str] = []
    if not removed:
        return deleted, errors
    try:
        from rag.ingest import get_chroma_collection
        collection = get_chroma_collection()
        for pid in removed:
            try:
                collection.delete(where={"policy_id": pid})
                deleted.append(pid)
            except Exception as e:  # noqa: BLE001 — surface, don't swallow
                errors.append(f"{pid}: {type(e).__name__}: {e}")
    except Exception as e:  # noqa: BLE001
        errors.append(f"collection: {type(e).__name__}: {e}")
    return deleted, errors


def _bust_uploaded_doc_caches() -> str:
    """Invalidate backend.main's corpus-pdf index and marketplace-grade cache.

    Returns ``"ok"`` on success, otherwise ``"<ExcType>: <message>"``
    (best-effort; never raises).
    """
    try:
        import backend.main as _m
        _m._CORPUS_PDF_IDX = None
        _mg = getattr(_m, "_MG_CACHE", None)
        if isinstance(_mg, dict):
            _mg["sig"] = None
            _mg["index"] = None
    except Exception as e:  # noqa: BLE001
        return f"{type(e).__name__}: {e}"
    return "ok"


async def admin_prune_uploaded_docs(
    body: UploadedDocPruneRequest,
    request: Request,
    x_admin_password: Optional[str] = Header(default=None, alias="X-Admin-Password"),
):
    """Prune persisted uploaded marketplace docs (admin only).

    Removes the upload(s) selected by ``policy_id`` and/or ``prefix`` via
    ``backend.uploaded_docs.prune_persisted_upload``. It then deletes their
    chunks from the global Chroma collection and busts the marketplace
    caches. The response is the prune result merged with ``chroma_deleted``,
    ``chroma_errors`` and ``cache_bust``.

    Raises:
        HTTPException(400): when neither a non-empty ``policy_id`` nor a
            non-blank ``prefix`` is supplied.
    """
    _check_admin(request, x_admin_password)

    # A blank prefix ("" or whitespace) is treated as "not supplied".
    # Previously only `None` was rejected, so {"prefix": ""} slipped past the
    # guard and reached a destructive bulk prune, where an empty prefix
    # presumably matches every persisted upload.
    prefix = body.prefix if (body.prefix is not None and body.prefix.strip()) else None
    if not body.policy_id and prefix is None:
        raise HTTPException(status_code=400, detail="provide policy_id or prefix")

    from backend import uploaded_docs as _udocs

    res = _udocs.prune_persisted_upload(body.policy_id, prefix=prefix)
    chroma_deleted, chroma_errors = _purge_pruned_uploads_from_chroma(res.get("removed"))
    cache_bust = _bust_uploaded_doc_caches()
    return {
        **res,
        "chroma_deleted": chroma_deleted,
        "chroma_errors": chroma_errors,
        "cache_bust": cache_bust,
    }