asmitha2025
Fix layout coloring and contrast across light and dark themes
ff8ed78
Raw
History Blame Contribute Delete
69.9 kB
from __future__ import annotations
import csv
import io
import json
import os
import sqlite3
import tempfile
from copy import deepcopy
from datetime import datetime, timedelta, timezone
from pathlib import Path
from evaluator import evaluate_dataset, evaluate_samples, evaluator_capabilities
# Parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]
# Bundled CSV evaluated when no computed results are cached in STATE.
SAMPLE_DATASET = ROOT / "datasets" / "sentinel_eval_samples.csv"
# File used by the "local-json" state backend.
STATE_PATH = ROOT / "backend" / "state.json"
# Default SQLite database location; SENTINEL_DB_PATH overrides it (see state_storage_path).
DB_PATH = Path(tempfile.gettempdir()) / "llm-sentinel-pro" / "sentinel.db"
# Payloads whose "version" differs from this value are rejected by apply_state_payload.
STATE_VERSION = 1
# Primary-key value of the single row stored in the sentinel_state table.
STATE_ROW_ID = "default"
# Metrics snapshot used to seed STATE["metrics"].
INITIAL_METRICS = {
"node": "Sentinel-Alpha-9",
"system_status": "System Healthy",
"status_level": "healthy",
"semantic_drift": 0.142,
"hallucination_rate": 2.1,
"statistical_drift": 0.05,
"latency_ms": 842,
"cost_delta": -12.40,
"confidence": 84,
"updated_at": "2026-05-24T12:30:00+05:30",
}
# INITIAL_METRICS with the status and drift fields overridden to a critical profile.
# Not referenced elsewhere in the visible part of this file.
CRITICAL_METRICS = {
**INITIAL_METRICS,
"system_status": "Critical Drift",
"status_level": "critical",
"semantic_drift": 0.318,
"hallucination_rate": 14.8,
"statistical_drift": 0.82,
"latency_ms": 2441,
"confidence": 94,
}
# Baseline settings; get_settings() layers the stored settings on top of these.
# The two thresholds are the critical cut-offs compared in classify_metrics and build_signals.
DEFAULT_SETTINGS = {
"semantic_drift_threshold": 0.30,
"hallucination_rate_threshold": 10.0,
"slack_alerts": True,
"email_alerts": True,
"model_name": "GPT-4o Support Primary",
"prompt_version": "support-template-v4",
"guardrail_policy": "Guardrail-Alpha Strict",
}
# Look-back windows accepted by runs_for_range / get_range_summary, keyed by range label.
RANGE_WINDOWS = {
"24h": timedelta(hours=24),
"7d": timedelta(days=7),
"30d": timedelta(days=30),
}
# Seed quality points for STATE["quality_timeseries"]; also returned by
# get_range_summary when no evaluation run falls inside the requested window.
INITIAL_TIMESERIES = [
{"label": "T-6", "accuracy": 0.92, "safety": 0.95},
{"label": "T-5", "accuracy": 0.91, "safety": 0.94},
{"label": "T-4", "accuracy": 0.93, "safety": 0.95},
{"label": "T-3", "accuracy": 0.90, "safety": 0.93},
{"label": "T-2", "accuracy": 0.91, "safety": 0.94},
{"label": "T-1", "accuracy": 0.89, "safety": 0.92},
{"label": "Now", "accuracy": 0.92, "safety": 0.95},
]
# Static narrative describing the monitored scenario; get_scenario() returns a copy.
SCENARIO = {
"title": "Support Assistant Regression After Prompt Template Update",
"monitoring_node": "Sentinel-Alpha-9",
"model_surface": "Customer Support Copilot",
"audience": "AI reliability, safety, and support operations teams",
"change_event": "Prompt template v4 shipped with broader answer-completion instructions.",
"demo_story": (
"A production support assistant begins giving unsupported or unsafe answers after a prompt "
"template update. Sentinel evaluates sampled responses, detects hallucination and drift, "
"opens alerts, and produces a root-cause report for review."
),
"primary_finding": {
"title": "Unsafe support-policy regression",
"detail": "Password, medical, financial, and legal responses include unsupported instructions that require review.",
},
"secondary_finding": {
"title": "Cross-domain faithfulness drift",
"detail": "The regression is not isolated to one prompt; multiple high-risk categories diverge from expected answers.",
},
"correlation_note": (
"The strongest signal correlates with the prompt-template rollout: higher completion freedom "
"increases unsupported claims across high-risk support workflows."
),
}
# Incident templates. get_incidents() copies the first entry and overrides its
# summary, duration, impact radius, confidence, risk level, timeline and evidence
# with values taken from STATE.
INCIDENTS = [
{
"id": "INC-9421-RCA",
"severity": "critical",
"title": "Support assistant prompt template regression",
"summary": "Prompt template v4 produced unsupported high-risk support guidance.",
"affected_area": "Support Bot",
"status": "Open",
"owner": "Safety",
"started_at": "14:02:11",
"duration": "14m 22s",
"impact_radius": "1,242 Sessions",
"confidence": 94,
"risk_level": "Critical",
"timeline": [
{"time": "14:02:11", "label": "Anomalous Prompt Detected", "level": "neutral"},
{"time": "14:03:45", "label": "Safety Guardrail Bypassed", "level": "danger"},
{"time": "14:12:00", "label": "Auto-containment Active", "level": "blue"},
{"time": "14:16:33", "label": "System Normalized", "level": "green"},
],
"evidence": [
{
"timestamp": "14:03:44.022",
"trace_id": "tr_827x_m91",
"signal_type": "Injected Prompt",
"level": "red",
"details": "Detection of DAN-variant prompt payload.",
},
{
"timestamp": "14:04:12.101",
"trace_id": "tr_827x_m94",
"signal_type": "Token Surge",
"level": "amber",
"details": "Prompt token count exceeded baseline by 128%.",
},
{
"timestamp": "14:11:55.783",
"trace_id": "tr_827x_m99",
"signal_type": "Containment",
"level": "green",
"details": "Policy route moved to high-safety model.",
},
],
}
]
# Static provider comparison rows; current_report() includes a copy under "providers".
# All values are pre-formatted display strings.
PROVIDERS = [
{"provider": "OpenAI", "model": "GPT-4o", "truthfulqa": "88.4%", "hallucination": "4.8%", "latency": "842ms", "cost": "$0.005", "decision": "Primary", "tone": "green"},
{"provider": "Anthropic", "model": "Claude Sonnet", "truthfulqa": "86.9%", "hallucination": "5.2%", "latency": "910ms", "cost": "$0.006", "decision": "Fallback", "tone": "green"},
{"provider": "Google", "model": "Gemini", "truthfulqa": "83.1%", "hallucination": "7.6%", "latency": "760ms", "cost": "$0.004", "decision": "Review", "tone": "amber"},
{"provider": "Meta", "model": "Llama", "truthfulqa": "76.8%", "hallucination": "11.4%", "latency": "620ms", "cost": "$0.001", "decision": "Low Cost", "tone": "amber"},
]
# Static benchmark rows; current_report() includes a copy under "benchmarks".
BENCHMARKS = [
{"name": "TruthfulQA", "score": "88.4%", "note": "Strong factual consistency on in-domain examples."},
{"name": "MMLU", "score": "81.6%", "note": "Stable general reasoning across sampled categories."},
{"name": "GSM8K", "score": "74.2%", "note": "Moderate arithmetic reliability under temperature variance."},
{"name": "OOD Diagnosis", "score": "62.0%", "note": "Known limitation: novel root causes need human review."},
]
# Static alerts always returned by get_alerts(); when an event is active,
# get_alerts() inserts a generated alert ahead of these entries.
ALERTS = [
{
"severity": "Warning",
"tone": "amber",
"incident": "Latency Spike",
"summary": "P99 response time increased above baseline.",
"affected_area": "API Gateway",
"status": "Watching",
"owner": "Platform",
"age": "15m ago",
},
{
"severity": "Policy",
"tone": "green",
"incident": "Drift Recalibrated",
"summary": "Baseline drift profile updated for Legal QA.",
"affected_area": "Legal QA",
"status": "Resolved",
"owner": "Evaluation",
"age": "3h ago",
},
]
# Mutable in-memory application state. Seeded with copies of the defaults here and
# overwritten key by key by apply_state_payload when persisted state is loaded.
STATE = {
"metrics": deepcopy(INITIAL_METRICS),
"settings": deepcopy(DEFAULT_SETTINGS),
# Run ids and quality-point labels are derived as evaluation_count - 2450.
"evaluation_count": 2450,
# Falsy while no incident is active (see get_incidents / get_alerts / get_root_cause).
"last_event": None,
"category_scores": [],
"hallucination_logs": [],
"evidence": [],
"root_cause": None,
"timeline": [],
"signals": [],
"quality_timeseries": deepcopy(INITIAL_TIMESERIES),
# Trimmed to the most recent 25 entries by append_evaluation_run.
"evaluation_runs": [],
"datasets": [],
}
# STATE keys written by build_state_payload and restored by apply_state_payload.
PERSISTED_KEYS = [
"metrics",
"settings",
"evaluation_count",
"last_event",
"category_scores",
"hallucination_logs",
"evidence",
"root_cause",
"timeline",
"signals",
"quality_timeseries",
"evaluation_runs",
"datasets",
]
def configured_state_backend() -> str:
    """Return the state persistence backend selected by the environment.

    Reads ``SENTINEL_STATE_BACKEND`` (default ``"local-json"``), ignoring
    surrounding whitespace and letter case. ``"sqlite"`` and ``"sqlite3"``
    select SQLite; every other value falls back to the JSON file backend.
    """
    requested = os.environ.get("SENTINEL_STATE_BACKEND", "local-json")
    wants_sqlite = requested.strip().lower() in ("sqlite", "sqlite3")
    return "sqlite" if wants_sqlite else "local-json"
def state_storage_path() -> Path:
    """Return the file that backs the configured state store.

    For the JSON backend this is ``STATE_PATH``. For SQLite it is the value of
    ``SENTINEL_DB_PATH`` (with ``~`` expanded), defaulting to ``DB_PATH``.
    """
    if configured_state_backend() != "sqlite":
        return STATE_PATH
    db_location = os.environ.get("SENTINEL_DB_PATH", str(DB_PATH))
    return Path(db_location).expanduser()
def build_state_payload() -> dict:
    """Build the versioned, timestamped snapshot that gets persisted.

    Only the keys listed in ``PERSISTED_KEYS`` are included, each deep-copied
    so later changes to ``STATE`` do not alter the returned payload.
    """
    saved_at = datetime.now(timezone.utc).isoformat()
    snapshot = {}
    for key in PERSISTED_KEYS:
        snapshot[key] = deepcopy(STATE[key])
    return {"version": STATE_VERSION, "saved_at": saved_at, "state": snapshot}
def apply_state_payload(payload: dict) -> bool:
    """Copy persisted values from *payload* into ``STATE``.

    Args:
        payload: Decoded snapshot, expected in the shape produced by
            ``build_state_payload`` (``version``, ``saved_at``, ``state``).

    Returns:
        ``True`` when the payload was accepted (including when it carries no
        ``state`` entry, in which case nothing changes). ``False`` when the
        payload is not a mapping, its version differs from ``STATE_VERSION``,
        or its ``state`` entry is not a mapping.
    """
    # This runs at import time on data read from disk, so valid JSON of the
    # wrong shape (a list, a string, ...) is rejected rather than raising.
    if not isinstance(payload, dict) or payload.get("version") != STATE_VERSION:
        return False
    persisted = payload.get("state", {})
    if not isinstance(persisted, dict):
        return False
    for key in PERSISTED_KEYS:
        if key in persisted:
            STATE[key] = persisted[key]
    return True
def save_state_json(payload: dict) -> None:
    """Write *payload* to ``STATE_PATH`` as indented UTF-8 JSON.

    The data is written to a sibling ``.tmp`` file first and then moved over
    the target with ``os.replace``. An interrupted write therefore cannot
    leave a truncated state file, which ``load_state_json`` would silently
    discard.
    """
    STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2)
    # Same directory as the target so the final rename stays on one filesystem.
    scratch_path = STATE_PATH.with_name(f"{STATE_PATH.name}.tmp")
    scratch_path.write_text(serialized, encoding="utf-8")
    os.replace(scratch_path, STATE_PATH)
def load_state_json() -> None:
    """Restore ``STATE`` from ``STATE_PATH`` on a best-effort basis.

    A missing, unreadable, wrongly encoded or malformed file is ignored and
    the in-memory state is left unchanged.
    """
    try:
        raw_text = STATE_PATH.read_text(encoding="utf-8")
        payload = json.loads(raw_text)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError, so a file that is not
        # valid UTF-8 no longer aborts the module import.
        return
    apply_state_payload(payload)
def ensure_sqlite_schema(connection: sqlite3.Connection) -> None:
    """Create the ``sentinel_state`` table on *connection* if it is missing.

    The table has a text primary key ``id`` plus ``version``, ``saved_at``
    and ``payload`` columns. Calling this repeatedly is harmless because the
    statement uses ``IF NOT EXISTS``.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS sentinel_state (
            id TEXT PRIMARY KEY,
            version INTEGER NOT NULL,
            saved_at TEXT NOT NULL,
            payload TEXT NOT NULL
        )
        """
    connection.execute(create_table_sql)
def save_state_sqlite(payload: dict) -> None:
    """Upsert *payload* into the single state row of the SQLite database.

    Args:
        payload: Snapshot carrying ``version`` and ``saved_at`` keys; the
            whole payload is stored JSON-encoded in the ``payload`` column.

    Raises:
        sqlite3.Error: If the database cannot be opened or written.
    """
    db_path = state_storage_path()
    db_path.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(db_path)
    try:
        # ``with connection`` only commits or rolls back the transaction; it
        # does not close the connection, so that is done in ``finally``.
        with connection:
            ensure_sqlite_schema(connection)
            connection.execute(
                """
                INSERT INTO sentinel_state (id, version, saved_at, payload)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    version = excluded.version,
                    saved_at = excluded.saved_at,
                    payload = excluded.payload
                """,
                (STATE_ROW_ID, payload["version"], payload["saved_at"], json.dumps(payload)),
            )
    finally:
        connection.close()
def load_state_sqlite() -> None:
    """Restore ``STATE`` from the SQLite database on a best-effort basis.

    A missing database file, any SQLite error, a missing state row or a
    payload that is not valid JSON all leave the in-memory state unchanged.
    """
    db_path = state_storage_path()
    if not db_path.exists():
        return
    try:
        connection = sqlite3.connect(db_path)
        try:
            # ``with connection`` manages the transaction only; closing the
            # connection is handled by the ``finally`` below.
            with connection:
                ensure_sqlite_schema(connection)
                row = connection.execute(
                    "SELECT payload FROM sentinel_state WHERE id = ?",
                    (STATE_ROW_ID,),
                ).fetchone()
        finally:
            connection.close()
    except sqlite3.Error:
        return
    if not row:
        return
    try:
        payload = json.loads(row[0])
    except json.JSONDecodeError:
        return
    apply_state_payload(payload)
def save_state() -> None:
    """Persist the current ``STATE`` snapshot through the configured backend."""
    snapshot = build_state_payload()
    use_sqlite = configured_state_backend() == "sqlite"
    writer = save_state_sqlite if use_sqlite else save_state_json
    writer(snapshot)
def load_state() -> None:
    """Restore ``STATE`` from whichever backend is configured."""
    use_sqlite = configured_state_backend() == "sqlite"
    reader = load_state_sqlite if use_sqlite else load_state_json
    reader()
# Restore any previously persisted state as soon as this module is imported.
load_state()
def health() -> dict:
    """Return a static liveness payload stamped with the current UTC time."""
    checked_at = datetime.now(timezone.utc)
    status = {"ok": True, "service": "llm-sentinel-pro-api", "mode": "fastapi-local"}
    status["timestamp"] = checked_at.isoformat()
    return status
def get_metrics() -> dict:
    """Return an independent copy of the current metrics snapshot."""
    current_metrics = STATE["metrics"]
    return deepcopy(current_metrics)
def get_scenario() -> dict:
    """Return an independent copy of the static scenario description."""
    scenario_copy = deepcopy(SCENARIO)
    return scenario_copy
def metrics_to_quality_point(metrics: dict, label: str = "Now") -> dict:
    """Convert drift metrics into one labelled accuracy/safety chart point.

    Args:
        metrics: Mapping providing ``semantic_drift`` and
            ``hallucination_rate``.
        label: Label stored on the returned point.

    Returns:
        Dict with ``label``, ``accuracy`` (``1 - semantic_drift * 0.95``) and
        ``safety`` (``1 - hallucination_rate / 100``), both clamped to
        ``[0, 1]`` and rounded to three decimals.
    """

    def clamp_unit(score: float) -> float:
        # Keep the score inside the closed interval [0.0, 1.0].
        return max(0.0, min(1.0, score))

    accuracy_score = clamp_unit(1 - metrics["semantic_drift"] * 0.95)
    safety_score = clamp_unit(1 - metrics["hallucination_rate"] / 100)
    point = {"label": label}
    point["accuracy"] = round(accuracy_score, 3)
    point["safety"] = round(safety_score, 3)
    return point
def get_quality_timeseries() -> dict:
    """Return a copy of the stored quality points wrapped under ``items``."""
    points = deepcopy(STATE["quality_timeseries"])
    return {"items": points}
def parse_timestamp(value: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp into a timezone-aware UTC datetime.

    Args:
        value: ISO-8601 text; a trailing ``Z`` is accepted as UTC.

    Returns:
        The parsed moment converted to UTC (naive inputs are treated as
        already being UTC), or ``None`` when *value* is empty, not a string,
        or not parseable.
    """
    # Values come from persisted JSON, so a non-string (for example a numeric
    # epoch) counts as "no timestamp" instead of raising on ``.replace``.
    if not value or not isinstance(value, str):
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def runs_for_range(range_key: str) -> list[dict]:
    """Return stored evaluation runs created inside the requested window.

    Unknown *range_key* values use the 24-hour window. Runs whose
    ``created_at`` is missing or unparseable are left out. Stored order is
    preserved.
    """
    window = RANGE_WINDOWS.get(range_key, RANGE_WINDOWS["24h"])
    cutoff = datetime.now(timezone.utc) - window

    def is_recent(run: dict) -> bool:
        stamp = parse_timestamp(run.get("created_at"))
        return stamp is not None and stamp >= cutoff

    return [run for run in STATE["evaluation_runs"] if is_recent(run)]
def get_range_summary(range_key: str = "24h") -> dict:
    """Summarise evaluation runs that fall inside a look-back window.

    Args:
        range_key: One of the ``RANGE_WINDOWS`` keys; anything else is
            treated as ``"24h"``.

    Returns:
        Dict with the normalised ``range``, ``run_count``, counts of critical
        and warning runs, average semantic drift (3 decimals) and
        hallucination rate (1 decimal), and ``items``: one quality point per
        run for the last eight runs in the window. With no runs in the
        window, ``items`` is a copy of ``INITIAL_TIMESERIES`` and the
        averages are the current metric values.
    """
    normalized = range_key if range_key in RANGE_WINDOWS else "24h"
    runs = runs_for_range(normalized)
    if runs:
        series = []
        for run in runs[-8:]:
            # One conversion per run; the helper only reads the two drift
            # fields, so the run dict can be passed straight through.
            point = metrics_to_quality_point(run)
            series.append(
                {
                    "label": run["id"],
                    "accuracy": point["accuracy"],
                    "safety": point["safety"],
                    "created_at": run["created_at"],
                }
            )
        avg_semantic = sum(run["semantic_drift"] for run in runs) / len(runs)
        avg_hallucination = sum(run["hallucination_rate"] for run in runs) / len(runs)
    else:
        series = deepcopy(INITIAL_TIMESERIES)
        avg_semantic = STATE["metrics"]["semantic_drift"]
        avg_hallucination = STATE["metrics"]["hallucination_rate"]
    return {
        "range": normalized,
        "run_count": len(runs),
        "critical_count": sum(1 for run in runs if run["status_level"] == "critical"),
        "warning_count": sum(1 for run in runs if run["status_level"] == "warning"),
        "avg_semantic_drift": round(avg_semantic, 3),
        "avg_hallucination_rate": round(avg_hallucination, 1),
        "items": series,
    }
def append_quality_point(metrics: dict) -> None:
    """Append a quality point for *metrics* to the stored time series.

    Any existing point labelled ``"Now"`` is relabelled ``"Prev"``. The new
    point is labelled ``"Eval <n>"`` with ``n = evaluation_count - 2450``,
    and only the eight most recent points are kept.
    """
    sequence_number = STATE["evaluation_count"] - 2450
    history = [
        {**point, "label": "Prev"} if point["label"] == "Now" else point
        for point in deepcopy(STATE["quality_timeseries"])
    ]
    history.append(metrics_to_quality_point(metrics, f"Eval {sequence_number}"))
    STATE["quality_timeseries"] = history[-8:]
def get_evaluation_history() -> dict:
    """Return copies of all stored evaluation runs, newest first."""
    history = deepcopy(STATE["evaluation_runs"])
    history.reverse()
    return {"items": history}
def runtime_metadata() -> dict:
    """Return the model, prompt version and guardrail policy from the effective settings."""
    effective = get_settings()
    snapshot_fields = ("model_name", "prompt_version", "guardrail_policy")
    return {field: effective[field] for field in snapshot_fields}
# Metrics compared between two runs by comparison_payload. "better" states which
# direction counts as an improvement; "precision" is the rounding applied by round_metric.
COMPARISON_METRICS = [
{"key": "semantic_drift", "label": "Semantic Drift", "unit": "", "better": "lower", "precision": 3},
{"key": "hallucination_rate", "label": "Hallucination Rate", "unit": "%", "better": "lower", "precision": 1},
{"key": "statistical_drift", "label": "KL Divergence", "unit": "", "better": "lower", "precision": 3},
{"key": "latency_ms", "label": "Latency", "unit": "ms", "better": "lower", "precision": 0},
{"key": "confidence", "label": "Confidence", "unit": "%", "better": "higher", "precision": 0},
]
# Accepted decision_status values mapped to their display labels.
DECISION_STATUSES = {
"pending_review": "Pending Review",
"approved": "Approved",
"rejected": "Rejected",
"rollback": "Rollback Required",
}
# Error text raised by update_run_decision when a critical run is approved without
# an exception note. has_critical_approval_exception also accepts "accepted risk".
CRITICAL_APPROVAL_MESSAGE = 'Critical drift approval requires an exception note that includes "exception" or "risk accepted".'
def round_metric(value: float, precision: int) -> float | int:
    """Round *value* for display.

    A *precision* of ``0`` yields an ``int``; any other precision yields
    *value* rounded to that many decimal places.
    """
    if precision == 0:
        return int(round(value))
    return round(value, precision)
def compare_metric(current: dict, previous: dict, metric: dict) -> dict:
    """Compare one metric between two runs.

    Args:
        current: Run being assessed.
        previous: Baseline run.
        metric: Descriptor with ``key``, ``label``, ``unit``, ``better``
            (``"lower"`` or anything else, treated as higher-is-better) and
            ``precision``.

    Returns:
        Dict with the rounded current, previous and delta values plus a
        ``direction`` (``"flat"``, ``"improved"`` or ``"regressed"``) and the
        matching ``tone`` (``"neutral"``, ``"good"`` or ``"bad"``). A metric
        missing from a run counts as ``0``.
    """
    field = metric["key"]
    now_value = float(current.get(field, 0))
    baseline_value = float(previous.get(field, 0))
    change = now_value - baseline_value
    digits = metric["precision"]

    # Changes within the tolerance are reported as flat; metrics shown with
    # more than one decimal use the tighter tolerance.
    tolerance = 0.0005 if digits > 1 else 0.05
    if abs(change) <= tolerance:
        direction, tone = "flat", "neutral"
    else:
        if metric["better"] == "lower":
            got_better = change < 0
        else:
            got_better = change > 0
        direction, tone = ("improved", "good") if got_better else ("regressed", "bad")

    return {
        "key": field,
        "label": metric["label"],
        "unit": metric["unit"],
        "current": round_metric(now_value, digits),
        "previous": round_metric(baseline_value, digits),
        "delta": round_metric(change, digits),
        "direction": direction,
        "tone": tone,
    }
def run_summary(run: dict) -> dict:
    """Project a stored evaluation run onto the fields exposed to clients.

    ``id``, ``created_at``, ``status`` and ``status_level`` are required;
    every other field falls back to a default when the run lacks it. An
    unrecognised ``decision_status`` is labelled ``"Pending Review"``.
    """
    required_fields = ("id", "created_at", "status", "status_level")
    optional_defaults = (
        ("source", "Evaluation"),
        ("dataset_id", None),
        ("dataset_name", None),
        ("sample_count", None),
        ("top_category", "None"),
        ("semantic_drift", 0),
        ("hallucination_rate", 0),
        ("statistical_drift", 0),
        ("latency_ms", 0),
        ("confidence", 0),
        ("message", ""),
        ("model_name", "Unknown model"),
        ("prompt_version", "Unknown prompt"),
        ("guardrail_policy", "Unknown policy"),
    )

    summary = {field: run[field] for field in required_fields}
    for field, fallback in optional_defaults:
        summary[field] = run.get(field, fallback)

    decision_status = run.get("decision_status", "pending_review")
    summary["decision_status"] = decision_status
    summary["decision_label"] = DECISION_STATUSES.get(decision_status, "Pending Review")
    summary["decision_note"] = run.get("decision_note", "")
    summary["decision_updated_at"] = run.get("decision_updated_at")
    return summary
def find_run(run_id: str) -> dict:
    """Return the stored run (not a copy) whose ``id`` equals *run_id*.

    Raises:
        ValueError: If no stored run has that id.
    """
    candidates = (run for run in STATE["evaluation_runs"] if run["id"] == run_id)
    found = next(candidates, None)
    if found is None:
        raise ValueError("Evaluation run not found.")
    return found
def evaluation_run_summaries() -> list[dict]:
    """Return a summary of every stored evaluation run, newest first."""
    summaries = [run_summary(run) for run in STATE["evaluation_runs"]]
    summaries.reverse()
    return summaries
def comparison_payload(current: dict, previous: dict, message: str, mode: str = "latest") -> dict:
    """Build the comparison response for two evaluation runs.

    Every metric in ``COMPARISON_METRICS`` is compared. The verdict is a
    regression when more metrics regressed than improved, an improvement in
    the opposite case, and "no material change" on a tie.

    Args:
        current: Run being assessed.
        previous: Baseline run.
        message: Human-readable description of the comparison.
        mode: Label for how the pair was chosen.
    """

    def plural(count: int) -> str:
        return "" if count == 1 else "s"

    metrics = [compare_metric(current, previous, spec) for spec in COMPARISON_METRICS]
    directions = [row["direction"] for row in metrics]
    improved_count = directions.count("improved")
    regressed_count = directions.count("regressed")

    if regressed_count > improved_count:
        verdict = {
            "label": "Regression detected",
            "tone": "bad",
            "detail": f"{regressed_count} metric{plural(regressed_count)} worsened compared with the baseline run.",
        }
    elif improved_count > regressed_count:
        verdict = {
            "label": "Quality improved",
            "tone": "good",
            "detail": f"{improved_count} metric{plural(improved_count)} improved compared with the baseline run.",
        }
    else:
        verdict = {
            "label": "No material change",
            "tone": "neutral",
            "detail": "Current run is broadly in line with the baseline run.",
        }

    return {
        "available": True,
        "mode": mode,
        "message": message,
        "current": run_summary(current),
        "previous": run_summary(previous),
        "metrics": metrics,
        "verdict": verdict,
        "runs": evaluation_run_summaries(),
    }
def get_evaluation_comparison() -> dict:
    """Compare the newest evaluation run with the one before it.

    With fewer than two stored runs, returns an ``available: False``
    placeholder that still carries the latest run summary (if any) and the
    run list.
    """
    runs = STATE["evaluation_runs"]
    if len(runs) >= 2:
        newest, baseline = runs[-1], runs[-2]
        return comparison_payload(newest, baseline, "Latest evaluation compared with the previous run.")

    only_run = run_summary(runs[-1]) if runs else None
    waiting_verdict = {
        "label": "Waiting for baseline",
        "tone": "neutral",
        "detail": "No previous run is available yet.",
    }
    return {
        "available": False,
        "mode": "latest",
        "message": "Run at least two evaluations to compare quality changes.",
        "current": only_run,
        "previous": None,
        "metrics": [],
        "verdict": waiting_verdict,
        "runs": evaluation_run_summaries(),
    }
def get_evaluation_pair_comparison(current_id: str, previous_id: str) -> dict:
    """Compare two explicitly chosen evaluation runs.

    Raises:
        ValueError: If both ids are the same or either run does not exist.
    """
    if current_id == previous_id:
        raise ValueError("Choose two different evaluation runs.")
    chosen_current = find_run(current_id)
    chosen_baseline = find_run(previous_id)
    description = f"{current_id} compared with {previous_id}."
    return comparison_payload(chosen_current, chosen_baseline, description, "selected")
def has_critical_approval_exception(note: str) -> bool:
    """Return whether *note* contains an accepted exception phrase.

    The check is case-insensitive and matches ``exception``,
    ``risk accepted`` or ``accepted risk`` anywhere in the note.
    """
    lowered = note.lower()
    accepted_phrases = ("exception", "risk accepted", "accepted risk")
    return any(phrase in lowered for phrase in accepted_phrases)
def update_run_decision(run_id: str, payload: dict) -> dict:
    """Record an operator decision on a run and persist the state.

    Args:
        run_id: Id of the run to update.
        payload: Request body; ``decision_status`` and ``decision_note`` are
            read through ``clean_payload_text`` (defined outside this
            section). The note is cut to 600 characters.

    Returns:
        The result of ``evaluation_run_detail(run_id)``.

    Raises:
        ValueError: If the run does not exist, the status is not a key of
            ``DECISION_STATUSES``, or a critical run is approved without an
            exception phrase in the note.
    """
    target = find_run(run_id)
    new_status = clean_payload_text(payload, "decision_status", "pending_review")
    if new_status not in DECISION_STATUSES:
        raise ValueError("Unsupported decision status.")

    note = clean_payload_text(payload, "decision_note")[:600]
    approving_critical = new_status == "approved" and target.get("status_level") == "critical"
    if approving_critical and not has_critical_approval_exception(note):
        raise ValueError(CRITICAL_APPROVAL_MESSAGE)

    target["decision_status"] = new_status
    target["decision_note"] = note
    target["decision_updated_at"] = datetime.now(timezone.utc).isoformat()
    save_state()
    return evaluation_run_detail(run_id)
def append_evaluation_run(
    message: str,
    source: str = "Dataset Replay",
    dataset_id: str | None = None,
    dataset_name: str | None = None,
    sample_count: int | None = None,
) -> dict:
    """Record the current metrics as a new evaluation run.

    The run id is ``EVAL-<n>`` with ``n = evaluation_count - 2450``,
    zero-padded to three digits. The run starts in ``pending_review`` and
    carries the current runtime metadata. Only the 25 most recent runs are
    kept. This function does not persist the state itself.

    Returns:
        The newly created run dict.
    """
    metrics = STATE["metrics"]
    root_cause = STATE["root_cause"] or {}
    sequence_number = STATE["evaluation_count"] - 2450

    run = {
        "id": f"EVAL-{sequence_number:03d}",
        "created_at": metrics["updated_at"],
        "status": metrics["system_status"],
        "status_level": metrics["status_level"],
    }
    for field in ("semantic_drift", "hallucination_rate", "statistical_drift", "latency_ms", "confidence"):
        run[field] = metrics[field]
    run["top_category"] = root_cause.get("top_category", "None")
    run["risk_level"] = root_cause.get("risk_level", "Healthy")
    run["incident_opened"] = STATE["last_event"] is not None
    run["message"] = message
    run["source"] = source
    run["dataset_id"] = dataset_id
    run["dataset_name"] = dataset_name
    run["sample_count"] = sample_count
    run["decision_status"] = "pending_review"
    run["decision_note"] = ""
    run["decision_updated_at"] = None
    run.update(runtime_metadata())

    STATE["evaluation_runs"].append(run)
    STATE["evaluation_runs"] = STATE["evaluation_runs"][-25:]
    return run
def get_settings() -> dict:
    """Return the effective settings: defaults overlaid with stored values.

    The result is built from copies, so callers may modify it freely.
    """
    effective = deepcopy(DEFAULT_SETTINGS)
    effective.update(deepcopy(STATE["settings"]))
    return effective
def update_settings(payload: dict) -> dict:
    """Apply the recognised fields of *payload* to the settings and persist.

    Thresholds are converted to ``float`` and clamped (semantic drift to
    ``[0.01, 1.0]``, hallucination rate to ``[0.1, 100.0]``). Alert flags are
    converted with ``bool``. Text fields go through ``clean_payload_text``
    (defined outside this section) and fall back to the default when the
    result is empty. Unrecognised keys are ignored.

    Returns:
        The effective settings after the update.
    """
    settings = get_settings()

    threshold_bounds = (
        ("semantic_drift_threshold", 0.01, 1.0),
        ("hallucination_rate_threshold", 0.1, 100.0),
    )
    for field, lowest, highest in threshold_bounds:
        if field in payload:
            settings[field] = max(lowest, min(float(payload[field]), highest))

    for field in ("slack_alerts", "email_alerts"):
        if field in payload:
            settings[field] = bool(payload[field])

    for field in ("model_name", "prompt_version", "guardrail_policy"):
        if field in payload:
            fallback = DEFAULT_SETTINGS[field]
            settings[field] = clean_payload_text(payload, field, fallback) or fallback

    STATE["settings"] = settings
    save_state()
    return get_settings()
def classify_metrics(metrics: dict) -> dict:
    """Return *metrics* with ``system_status`` and ``status_level`` set.

    A run is critical when semantic drift or hallucination rate reaches its
    configured threshold, a warning when either reaches 80% of its threshold,
    and healthy otherwise.

    Args:
        metrics: Mapping providing ``semantic_drift`` and
            ``hallucination_rate``; it is not modified.

    Returns:
        A new dict: *metrics* plus the two status fields.
    """
    # Use the defaults-merged settings so a persisted settings dict that lacks
    # a threshold key falls back to the default instead of raising KeyError.
    settings = get_settings()
    drift_limit = settings["semantic_drift_threshold"]
    rate_limit = settings["hallucination_rate_threshold"]
    drift = metrics["semantic_drift"]
    rate = metrics["hallucination_rate"]

    if drift >= drift_limit or rate >= rate_limit:
        system_status, status_level = "Critical Drift", "critical"
    elif drift >= drift_limit * 0.8 or rate >= rate_limit * 0.8:
        system_status, status_level = "Elevated Risk", "warning"
    else:
        system_status, status_level = "System Healthy", "healthy"

    return {**metrics, "system_status": system_status, "status_level": status_level}
def build_timeline(metrics: dict) -> list[dict]:
    """Return the canned timeline matching ``metrics["status_level"]``.

    ``"healthy"`` and ``"warning"`` each have their own timeline; every other
    level gets the critical timeline. A fresh list is built on each call.
    """
    level = metrics["status_level"]

    def entry(time: str, label: str, tone: str) -> dict:
        return {"time": time, "label": label, "level": tone}

    if level == "healthy":
        return [
            entry("12:30:00", "Baseline Within Threshold", "green"),
            entry("12:31:18", "Routine Drift Check Complete", "neutral"),
            entry("12:34:44", "No Active Containment Needed", "green"),
        ]
    if level == "warning":
        return [
            entry("14:02:11", "Evaluation Batch Started", "neutral"),
            entry("14:04:30", "Elevated Risk Band Reached", "blue"),
            entry("14:08:09", "Manual Review Recommended", "danger"),
            entry("14:16:33", "Warning Report Generated", "green"),
        ]
    return [
        entry("14:02:11", "Evaluation Batch Started", "neutral"),
        entry("14:03:45", "Critical Drift Threshold Crossed", "danger"),
        entry("14:08:09", "Low-Faithfulness Cluster Identified", "danger"),
        entry("14:12:00", "High-Safety Routing Recommended", "blue"),
        entry("14:16:33", "Root Cause Report Generated", "green"),
    ]
def build_signals(metrics: dict) -> list[dict]:
    """Build the four signal tiles shown for *metrics*.

    Args:
        metrics: Mapping providing ``latency_ms``, ``hallucination_rate``,
            ``statistical_drift``, ``semantic_drift`` and ``status_level``.

    Returns:
        One dict per signal (P99 latency, hallucination rate, KL divergence,
        semantic drift), each with ``label``, a formatted ``value``, a
        ``trend`` word and a ``tone`` of ``"good"`` or ``"bad"``.
    """
    # Use the defaults-merged settings so a persisted settings dict that lacks
    # a threshold key falls back to the default instead of raising KeyError.
    settings = get_settings()
    is_critical = metrics["status_level"] == "critical"
    hallucination_alert = metrics["hallucination_rate"] >= settings["hallucination_rate_threshold"]
    semantic_alert = metrics["semantic_drift"] >= settings["semantic_drift_threshold"]
    # KL divergence has no configurable threshold; 1 is the fixed cut-off.
    divergence_alert = metrics["statistical_drift"] >= 1

    def tone_for(alerting: bool) -> str:
        return "bad" if alerting else "good"

    return [
        {
            "label": "P99 Latency",
            "value": f"{int(metrics['latency_ms']):,}ms",
            "trend": "+42%" if is_critical else "Stable",
            "tone": tone_for(is_critical),
        },
        {
            "label": "Hallucination Rate",
            "value": f"{metrics['hallucination_rate']:.1f}%",
            "trend": "High" if hallucination_alert else "Normal",
            "tone": tone_for(hallucination_alert),
        },
        {
            "label": "KL Divergence",
            "value": f"{metrics['statistical_drift']:.2f}",
            "trend": "High" if divergence_alert else "Low",
            "tone": tone_for(divergence_alert),
        },
        {
            "label": "Semantic Drift",
            "value": f"{metrics['semantic_drift']:.3f}",
            "trend": "Critical" if semantic_alert else "Healthy",
            "tone": tone_for(semantic_alert),
        },
    ]
def get_incidents() -> dict:
    """Return the active incident, if any, wrapped under ``items``.

    With no active event the list is empty. Otherwise the first incident
    template is copied and overlaid with live values: the event message,
    root-cause duration, impact radius and risk level (falling back to the
    template when no root cause is stored), current confidence, and the
    stored timeline and evidence.
    """
    event = STATE["last_event"]
    if not event:
        return {"items": []}

    template = INCIDENTS[0]
    # Without a stored root cause the template supplies these three fields.
    detail_source = STATE["root_cause"] or template

    incident = deepcopy(template)
    incident["summary"] = event["message"]
    incident["duration"] = detail_source["duration"]
    incident["impact_radius"] = detail_source["impact_radius"]
    incident["confidence"] = STATE["metrics"]["confidence"]
    incident["risk_level"] = detail_source["risk_level"]
    incident["timeline"] = deepcopy(STATE["timeline"])
    incident["evidence"] = deepcopy(STATE["evidence"])
    return {"items": [incident]}
def get_alerts() -> dict:
    """Return the alert feed wrapped under ``items``.

    The static ``ALERTS`` are always included. While an event is active, an
    alert generated from the current metrics and event message is placed
    first; it is "Critical"/"Open" for a critical status level and
    "Warning"/"Review" otherwise.
    """
    alerts = deepcopy(ALERTS)
    event = STATE["last_event"]
    if event:
        is_critical = STATE["metrics"]["status_level"] == "critical"
        # An event can be active before a root cause is stored; fall back to
        # the same "None" placeholder used when recording evaluation runs
        # instead of indexing into None.
        root_cause = STATE["root_cause"] or {}
        active_alert = {
            "severity": "Critical" if is_critical else "Warning",
            "tone": "red" if is_critical else "amber",
            "incident": STATE["metrics"]["system_status"],
            "summary": event["message"],
            "affected_area": root_cause.get("top_category", "None"),
            "status": "Open" if is_critical else "Review",
            "owner": "Evaluation",
            "age": "Just now",
        }
        alerts.insert(0, active_alert)
    return {"items": alerts}
def get_category_scores() -> dict:
    """Return a copy of the category scores wrapped under ``items``.

    When none are cached in ``STATE``, they are computed from the bundled
    sample dataset and cached first.
    """
    if not STATE["category_scores"]:
        evaluation = evaluate_dataset(SAMPLE_DATASET)
        STATE["category_scores"] = evaluation["category_scores"]
    scores = deepcopy(STATE["category_scores"])
    return {"items": scores}
def get_hallucination_logs() -> dict:
    """Return a copy of the hallucination logs wrapped under ``items``.

    When none are cached in ``STATE``, they are computed from the bundled
    sample dataset and cached first.
    """
    if not STATE["hallucination_logs"]:
        evaluation = evaluate_dataset(SAMPLE_DATASET)
        STATE["hallucination_logs"] = evaluation["hallucination_logs"]
    logs = deepcopy(STATE["hallucination_logs"])
    return {"items": logs}
def get_root_cause() -> dict:
    """Return the root-cause report: summary, evidence, timeline and signals.

    With no active event a "no incident" summary is returned, with the
    timeline and signals derived from the current metrics. With an active
    event but no stored root cause, the root cause and evidence are first
    computed from the bundled sample dataset and cached in ``STATE``.
    """
    if STATE["last_event"]:
        if not STATE["root_cause"]:
            evaluation = evaluate_dataset(SAMPLE_DATASET)
            STATE["root_cause"] = evaluation["root_cause"]
            STATE["evidence"] = evaluation["evidence"]
        report = {"summary": deepcopy(STATE["root_cause"])}
        for section in ("evidence", "timeline", "signals"):
            report[section] = deepcopy(STATE[section])
        return report

    healthy_summary = {
        "primary_cause": "No active incident. Latest production checks are within configured monitoring thresholds.",
        "top_category": "None",
        "top_category_score": 0,
        "impact_radius": "0 Sessions",
        "duration": "0m 00s",
        "risk_level": "Healthy",
    }
    return {
        "summary": healthy_summary,
        "evidence": [],
        "timeline": build_timeline(STATE["metrics"]),
        "signals": build_signals(STATE["metrics"]),
    }
def current_report() -> dict:
    """Assemble the full report from every section getter.

    Sections are gathered in the order of the keys below. That order is kept
    on purpose: some getters cache computed results in ``STATE`` that later
    getters read.
    """
    report = {"generated_at": datetime.now(timezone.utc).isoformat()}
    report["metrics"] = get_metrics()
    report["settings"] = get_settings()
    report["scenario"] = get_scenario()
    report["drift_categories"] = get_category_scores()["items"]
    report["hallucination_logs"] = get_hallucination_logs()["items"]
    report["root_cause"] = get_root_cause()
    report["quality_timeseries"] = get_quality_timeseries()["items"]
    report["range_summary"] = get_range_summary("30d")
    report["evaluation_history"] = get_evaluation_history()["items"]
    report["evaluation_comparison"] = get_evaluation_comparison()
    report["incidents"] = get_incidents()["items"]
    report["providers"] = deepcopy(PROVIDERS)
    report["benchmarks"] = deepcopy(BENCHMARKS)
    report["alerts"] = get_alerts()["items"]
    report["datasets"] = get_datasets()["items"]
    return report
def decision_counts() -> dict:
    """Count stored runs per decision status.

    Runs with a missing or unrecognised ``decision_status`` are counted as
    ``pending_review``.

    Returns:
        Dict with ``items`` (one ``status``/``label``/``count`` row per entry
        of ``DECISION_STATUSES``, in that order) and ``total``.
    """
    tally = dict.fromkeys(DECISION_STATUSES, 0)
    for run in STATE["evaluation_runs"]:
        status = run.get("decision_status", "pending_review")
        if status not in tally:
            status = "pending_review"
        tally[status] += 1

    rows = []
    for status, label in DECISION_STATUSES.items():
        rows.append({"status": status, "label": label, "count": tally[status]})
    return {"items": rows, "total": len(STATE["evaluation_runs"])}
def review_gate(latest_run: dict | None, counts: dict) -> dict:
    """Derive the release-gate banner from the latest run's decision.

    Args:
        latest_run: Summary of the newest run (needs ``id``,
            ``decision_status`` and ``status_level``), or ``None`` when no
            run exists.
        counts: Result shaped like ``decision_counts()``; only the
            ``pending_review`` row is read.

    Returns:
        Dict with ``label``, ``tone`` and ``detail``.
    """

    def gate(label: str, tone: str, detail: str) -> dict:
        return {"label": label, "tone": tone, "detail": detail}

    if not latest_run:
        return gate("Waiting for evaluation", "neutral", "Run an evaluation to generate review evidence.")

    decision = latest_run["decision_status"]
    if decision == "rollback":
        return gate(
            "Rollback required",
            "bad",
            f"{latest_run['id']} is marked for rollback. Keep the release blocked until remediation is complete.",
        )
    if decision == "rejected":
        return gate("Release rejected", "bad", f"{latest_run['id']} was rejected by operator review.")
    if decision == "approved":
        if latest_run["status_level"] == "critical":
            return gate(
                "Approved with critical risk",
                "bad",
                f"{latest_run['id']} is approved, but critical drift is still active. Attach audit evidence and confirm the release exception before handoff.",
            )
        return gate(
            "Operator approved",
            "good",
            f"{latest_run['id']} is approved. Latest model/prompt snapshot is ready for handoff with audit evidence attached.",
        )

    # Any other decision is treated as still awaiting review.
    pending = 0
    for row in counts["items"]:
        if row["status"] == "pending_review":
            pending = row["count"]
            break
    suffix = "" if pending == 1 else "s"
    return gate(
        "Review pending",
        "warning",
        f"{pending} run{suffix} still need an operator decision before handoff.",
    )
def operator_review_summary() -> dict:
    """Assemble the operator review dashboard payload.

    Combines the current metrics, the release gate, decision counts, the
    latest-versus-previous comparison, the root-cause summary, the first four
    alerts, the eight newest run summaries and the export links. The audit
    bundle link is ``None`` when no run exists.
    """
    metrics = get_metrics()
    run_summaries = evaluation_run_summaries()
    newest_run = run_summaries[0] if run_summaries else None
    counts = decision_counts()
    comparison = get_evaluation_comparison()
    root_cause = get_root_cause()

    audit_url = None
    if newest_run:
        audit_url = f"/api/reports/audit/{newest_run['id']}"

    export_links = {
        "handoff_package": "/api/reports/handoff",
        "current_report": "/api/reports/current",
        "latest_audit_bundle": audit_url,
        "drift_csv": "/api/reports/drift.csv",
        "hallucination_csv": "/api/reports/hallucination.csv",
        "root_cause_csv": "/api/reports/root-cause.csv",
    }
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "gate": review_gate(newest_run, counts),
        "metrics": metrics,
        "latest_run": newest_run,
        "decision_counts": counts,
        "comparison": comparison,
        "root_cause_summary": root_cause["summary"],
        "alerts": get_alerts()["items"][:4],
        "recent_runs": run_summaries[:8],
        "exports": export_links,
    }
def readiness_check(key: str, label: str, status: str, owner: str, detail: str, action: str) -> dict:
    """Bundle the fields of one readiness checklist row into a dict.

    The arguments are stored unchanged under keys of the same name, in
    parameter order.
    """
    return dict(
        key=key,
        label=label,
        status=status,
        owner=owner,
        detail=detail,
        action=action,
    )
def readiness_summary() -> dict:
    """Build the production-readiness report for the current deployment.

    Collects settings, evaluation run summaries, evaluator capabilities, and
    environment configuration, converts them into readiness checks (the
    operator-decision check is placed second), and derives an aggregate
    status, per-status counts, and a 0-100 score in which each warning counts
    as half of a pass.

    Returns:
        A dict with ``generated_at``, the aggregate ``status``, every entry
        in ``checks``, the entries that have not passed as
        ``required_inputs``, an ``environment`` snapshot, the raw
        ``evaluator`` capabilities, ``latest_run`` (or ``None``), and the
        related ``exports`` URLs.
    """
    settings = get_settings()
    runs = evaluation_run_summaries()
    latest_run = runs[0] if runs else None
    evaluator = evaluator_capabilities()
    deployment_profile = os.environ.get("SENTINEL_ENV", "local").strip() or "local"
    state_backend = configured_state_backend()
    storage_path = state_storage_path()
    api_key_configured = bool(
        os.environ.get("SENTINEL_API_KEY") or os.environ.get("SENTINEL_AUTH_TOKEN")
    )

    alert_channels = []
    for channel, enabled in (("Slack", settings["slack_alerts"]), ("Email", settings["email_alerts"])):
        if enabled:
            alert_channels.append(channel)

    # Evaluation evidence: blocked until at least one run exists.
    if runs:
        evidence_check = readiness_check(
            "evaluation_evidence",
            "Evaluation Evidence",
            "passed",
            "Evaluation",
            f"{len(runs)} evaluation run{'s' if len(runs) != 1 else ''} are available for release review.",
            "Keep evaluation cadence active.",
        )
    else:
        evidence_check = readiness_check(
            "evaluation_evidence",
            "Evaluation Evidence",
            "blocked",
            "Evaluation",
            "No evaluation run exists yet.",
            "Run the baseline or batch evaluation before deployment.",
        )

    # Version snapshot: all three release identifiers must be non-empty.
    snapshot_complete = (
        settings["model_name"] and settings["prompt_version"] and settings["guardrail_policy"]
    )
    snapshot_check = readiness_check(
        "version_snapshot",
        "Model / Prompt Snapshot",
        "passed" if snapshot_complete else "blocked",
        "ML Platform",
        f"{settings['model_name']} / {settings['prompt_version']} / {settings['guardrail_policy']}",
        "Confirm the release model, prompt version, and guardrail policy are current.",
    )

    # Baseline dataset: the bundled sample file is only probed when nothing is saved.
    if STATE["datasets"]:
        dataset_check = readiness_check(
            "baseline_dataset",
            "Baseline Dataset",
            "passed",
            "Evaluation",
            f"{len(STATE['datasets'])} saved dataset{'s' if len(STATE['datasets']) != 1 else ''} available.",
            "Refresh datasets after major policy changes.",
        )
    else:
        dataset_check = readiness_check(
            "baseline_dataset",
            "Baseline Dataset",
            "warning" if SAMPLE_DATASET.exists() else "blocked",
            "Evaluation",
            "Using the bundled sample dataset only.",
            "Upload a production representative CSV dataset.",
        )

    # Evaluator engine: status and message are passed through from the evaluator.
    if evaluator["active_engine"] == "local":
        evaluator_action = "Install SentenceTransformers or RAGAS and set SENTINEL_EVALUATOR_ENGINE for production-grade scoring."
    else:
        evaluator_action = "Validate external evaluator model versions and calibration thresholds."
    evaluator_check = readiness_check(
        "evaluator_engine",
        "Evaluator Engine",
        evaluator["status"],
        "Evaluation",
        evaluator["message"],
        evaluator_action,
    )

    if latest_run:
        audit_check = readiness_check(
            "audit_exports",
            "Audit Exports",
            "passed",
            "Compliance",
            f"Latest bundle available at /api/reports/audit/{latest_run['id']}.",
            "Attach current report, run audit, and CSV exports to the release record.",
        )
    else:
        audit_check = readiness_check(
            "audit_exports",
            "Audit Exports",
            "blocked",
            "Compliance",
            "No run-specific audit bundle is available.",
            "Generate an evaluation run before audit handoff.",
        )

    if alert_channels:
        alert_status = "passed"
        alert_detail = f"Enabled channels: {', '.join(alert_channels)}."
    else:
        alert_status = "warning"
        alert_detail = "No alert channel is enabled in settings."
    alert_check = readiness_check(
        "alert_routes",
        "Alert Routes",
        alert_status,
        "Operations",
        alert_detail,
        "Connect live Slack/email destinations before production exposure.",
    )

    if api_key_configured:
        access_status = "passed"
        access_detail = "API key environment variable is configured."
    else:
        access_status = "blocked"
        access_detail = "No SENTINEL_API_KEY or SENTINEL_AUTH_TOKEN is configured."
    access_check = readiness_check(
        "access_control",
        "Access Control",
        access_status,
        "Security",
        access_detail,
        "Set an API key and enforce it at the gateway before exposing this service.",
    )

    if state_backend == "local-json":
        backend_action = "Move from local JSON state to SQLite for production-style persistence."
    else:
        backend_action = "Validate backup and retention policy."
    backend_check = readiness_check(
        "state_backend",
        "State Backend",
        "passed" if state_backend == "sqlite" else "warning",
        "Platform",
        f"State backend: {state_backend} at {storage_path}.",
        backend_action,
    )

    profile_check = readiness_check(
        "deployment_profile",
        "Deployment Profile",
        "warning" if deployment_profile in {"local", "demo", "dev"} else "passed",
        "Platform",
        f"SENTINEL_ENV={deployment_profile}.",
        "Set SENTINEL_ENV=staging or production in deployed environments.",
    )

    # Operator decision: undecided/negative decisions block; approval under
    # critical drift only warns.
    if not latest_run:
        decision_status = "blocked"
        decision_detail = "No latest run exists for operator approval."
        decision_action = "Run an evaluation and record approve, reject, or rollback."
    else:
        decision = latest_run.get("decision_status", "pending_review")
        decision_status = "passed"
        decision_detail = f"{latest_run['id']} is {latest_run.get('decision_label', 'reviewed')}."
        decision_action = "Keep the approval note with the audit bundle."
        if decision in {"pending_review", "rollback", "rejected"}:
            decision_status = "blocked"
            decision_action = "Resolve the operator decision before handoff."
        elif latest_run["status_level"] == "critical":
            decision_status = "warning"
            decision_detail = f"{latest_run['id']} is approved while critical drift is active."
            decision_action = "Confirm a release exception or move the run to rollback."
    decision_check = readiness_check(
        "operator_decision",
        "Operator Decision",
        decision_status,
        "Release Manager",
        decision_detail,
        decision_action,
    )

    checks = [
        evidence_check,
        decision_check,
        snapshot_check,
        dataset_check,
        evaluator_check,
        audit_check,
        alert_check,
        access_check,
        backend_check,
        profile_check,
    ]

    counts = {
        level: sum(1 for check in checks if check["status"] == level)
        for level in ("passed", "warning", "blocked")
    }
    counts["total_checks"] = len(checks)
    score = round(((counts["passed"] + counts["warning"] * 0.5) / counts["total_checks"]) * 100)

    if counts["blocked"]:
        headline = {
            "label": "Needs production input",
            "tone": "bad",
            "detail": f"{counts['blocked']} blocking item{'s' if counts['blocked'] != 1 else ''} must be resolved before production release.",
        }
    elif counts["warning"]:
        headline = {
            "label": "Ready for controlled pilot",
            "tone": "warning",
            "detail": f"{counts['warning']} warning item{'s' if counts['warning'] != 1 else ''} should be accepted or resolved before broad rollout.",
        }
    else:
        headline = {
            "label": "Production ready",
            "tone": "good",
            "detail": "All readiness checks passed.",
        }
    status = {**headline, **counts, "score": score}

    required_inputs = [
        {field: check[field] for field in ("key", "label", "status", "owner", "action")}
        for check in checks
        if check["status"] != "passed"
    ]

    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "status": status,
        "checks": checks,
        "required_inputs": required_inputs,
        "environment": {
            "deployment_profile": deployment_profile,
            "state_backend": state_backend,
            "api_key_configured": api_key_configured,
            "alert_channels": alert_channels,
            "state_file": str(storage_path),
            "evaluator_engine": evaluator["active_engine"],
            "evaluator_mode": evaluator["mode"],
        },
        "evaluator": evaluator,
        "latest_run": latest_run,
        "exports": {
            "handoff_package": "/api/reports/handoff",
            "readiness": "/api/operations/readiness",
            "operator_review": "/api/reports/operator-review",
            "current_report": "/api/reports/current",
        },
    }
def production_handoff_actions(readiness: dict) -> list[dict]:
    """Translate a readiness report into the fixed list of handoff actions.

    Args:
        readiness: A report containing ``checks`` (dicts with ``key``,
            ``status`` and ``action``), ``environment`` (with
            ``state_backend`` and ``deployment_profile``) and ``evaluator``
            (with ``message`` and optionally ``integrations``).

    Returns:
        Six action dicts in a fixed order. Each copies ``status`` and
        ``next_action`` from the readiness check with the same key; an action
        with no matching check is reported as a ``warning``.
    """
    checks_by_key = {entry["key"]: entry for entry in readiness["checks"]}
    environment = readiness["environment"]
    evaluator = readiness["evaluator"]

    # Recommend sentence_transformers only when that integration is available.
    semantic_available = False
    for integration in evaluator.get("integrations", []):
        if integration["key"] == "sentence_transformers" and integration["available"]:
            semantic_available = True
            break
    evaluator_command = "SENTINEL_EVALUATOR_ENGINE=ragas"
    if semantic_available:
        evaluator_command = "SENTINEL_EVALUATOR_ENGINE=sentence_transformers"

    plan = [
        {
            "key": "access_control",
            "label": "Set API protection",
            "owner": "Security",
            "command": "SENTINEL_API_KEY=<release-key>",
            "detail": "Required before exposing the API outside a trusted local demo.",
        },
        {
            "key": "state_backend",
            "label": "Use durable state",
            "owner": "Platform",
            "command": "SENTINEL_STATE_BACKEND=sqlite",
            "detail": f"Current backend is {environment['state_backend']}.",
        },
        {
            "key": "deployment_profile",
            "label": "Set deployment profile",
            "owner": "Platform",
            "command": "SENTINEL_ENV=staging",
            "detail": f"Current profile is {environment['deployment_profile']}.",
        },
        {
            "key": "evaluator_engine",
            "label": "Configure evaluator engine",
            "owner": "Evaluation",
            "command": evaluator_command,
            "detail": evaluator["message"],
        },
        {
            "key": "baseline_dataset",
            "label": "Upload production dataset",
            "owner": "Evaluation",
            "command": "POST /api/datasets",
            "detail": "Use representative prompt, response, expected_answer, and context columns.",
        },
        {
            "key": "operator_decision",
            "label": "Record release decision",
            "owner": "Release Manager",
            "command": "POST /api/evaluations/{run_id}/decision",
            "detail": "Approve, reject, or require rollback for the latest release candidate.",
        },
    ]

    # Stand-in used when the readiness report has no check for an action key.
    missing_check = {"status": "warning", "action": "Review before handoff."}

    actions = []
    for step in plan:
        matched = checks_by_key.get(step["key"], missing_check)
        actions.append(
            {
                "key": step["key"],
                "label": step["label"],
                "owner": step["owner"],
                "status": matched["status"],
                "command": step["command"],
                "detail": step["detail"],
                "next_action": matched["action"],
                "requires_user_input": matched["status"] != "passed",
            }
        )
    return actions
def handoff_package() -> dict:
    """Combine readiness, operator review, audit, and report data into one bundle.

    Returns:
        A dict tagged ``llm-sentinel-release-handoff`` holding the readiness
        status, release gate, production actions, the full readiness and
        review payloads, the current report, and export URLs. The audit
        bundle and its URL are ``None`` when the readiness report has no
        latest run.
    """
    readiness = readiness_summary()
    review = operator_review_summary()
    report = current_report()

    latest_run = readiness.get("latest_run")
    if latest_run:
        latest_audit = audit_bundle(latest_run["id"])
        audit_url = f"/api/reports/audit/{latest_run['id']}"
    else:
        latest_audit = None
        audit_url = None

    package = {
        "bundle_type": "llm-sentinel-release-handoff",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "release_status": readiness["status"],
        "release_gate": review["gate"],
        "latest_run": latest_run,
        "production_actions": production_handoff_actions(readiness),
        "readiness": readiness,
        "operator_review": review,
        "latest_audit_bundle": latest_audit,
        "current_report": report,
    }
    package["exports"] = {
        "handoff_package": "/api/reports/handoff",
        "readiness": "/api/operations/readiness",
        "operator_review": "/api/reports/operator-review",
        "current_report": "/api/reports/current",
        "latest_audit_bundle": audit_url,
        "drift_csv": "/api/reports/drift.csv",
        "hallucination_csv": "/api/reports/hallucination.csv",
        "root_cause_csv": "/api/reports/root-cause.csv",
    }
    return package
def audit_bundle(run_id: str) -> dict:
    """Build the audit bundle for a single evaluation run.

    The run summary and its model/prompt/policy values come from the run
    itself; thresholds, metrics, comparison, root-cause context, scoring logs
    and alerts are read from the current state, as the ``scope_note`` in the
    bundle spells out.

    Args:
        run_id: Identifier handed to ``find_run``.

    Returns:
        A dict tagged ``llm-sentinel-run-audit``. ``dataset`` is ``None``
        when the run has no linked dataset or the linked dataset can no
        longer be loaded; ``dataset_note`` says which.
    """
    summary = run_summary(find_run(run_id))

    dataset = None
    dataset_note = "No saved dataset is linked to this evaluation run."
    linked_dataset_id = summary.get("dataset_id")
    if linked_dataset_id:
        try:
            dataset = get_dataset(linked_dataset_id)
        except ValueError:
            dataset_note = "The linked dataset is no longer available in local state."
        else:
            dataset_note = "Saved dataset snapshot is included for the linked evaluation run."

    settings = get_settings()
    version_snapshot = {
        field: summary[field]
        for field in ("model_name", "prompt_version", "guardrail_policy")
    }
    thresholds = {
        "scope": "current_settings",
        "semantic_drift_threshold": settings["semantic_drift_threshold"],
        "hallucination_rate_threshold": settings["hallucination_rate_threshold"],
    }
    scope_note = (
        "Run summary and model/prompt/policy values are snapshotted on the evaluation run. "
        "Detailed evidence, scoring logs, alerts, and thresholds reflect the current local demo state."
    )
    return {
        "bundle_type": "llm-sentinel-run-audit",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "run": summary,
        "version_snapshot": version_snapshot,
        "thresholds": thresholds,
        "dataset": dataset,
        "dataset_note": dataset_note,
        "metrics": get_metrics(),
        "comparison": get_evaluation_comparison(),
        "current_root_cause_context": get_root_cause(),
        "current_scoring_logs": get_hallucination_logs()["items"],
        "alerts": get_alerts()["items"],
        "scope_note": scope_note,
        "integrity": {
            "state_version": STATE_VERSION,
            "source": "local-demo-state",
        },
    }
def evaluation_run_detail(run_id: str) -> dict:
    """Return the detail view for one evaluation run.

    Builds on ``audit_bundle`` and adds summaries of the neighbouring runs in
    ``STATE["evaluation_runs"]``, metric cards for display, and a trimmed
    slice of the current context (four evidence rows, five scoring logs,
    three alerts).

    Args:
        run_id: Identifier of the run to describe.

    Returns:
        A dict whose ``previous_run`` / ``next_run`` are ``None`` at the
        ends of the stored run list.
    """
    bundle = audit_bundle(run_id)
    stored_runs = STATE["evaluation_runs"]
    position = next(
        index for index, candidate in enumerate(stored_runs) if candidate["id"] == run_id
    )

    previous_run = None
    if position > 0:
        previous_run = run_summary(stored_runs[position - 1])
    next_run = None
    if position + 1 < len(stored_runs):
        next_run = run_summary(stored_runs[position + 1])

    run = bundle["run"]
    root_context = bundle["current_root_cause_context"]

    # (label, run field, unit, display precision) for each metric card.
    card_specs = (
        ("Hallucination", "hallucination_rate", "%", 1),
        ("Semantic Drift", "semantic_drift", "", 3),
        ("KL Divergence", "statistical_drift", "", 3),
        ("Latency", "latency_ms", "ms", 0),
        ("Confidence", "confidence", "%", 0),
    )
    metric_cards = [
        {"label": label, "value": run[field], "unit": unit, "precision": precision}
        for label, field, unit, precision in card_specs
    ]

    return {
        "generated_at": bundle["generated_at"],
        "run": run,
        "version_snapshot": bundle["version_snapshot"],
        "thresholds": bundle["thresholds"],
        "dataset": bundle["dataset"],
        "dataset_note": bundle["dataset_note"],
        "previous_run": previous_run,
        "next_run": next_run,
        "metric_cards": metric_cards,
        "current_context": {
            "root_cause_summary": root_context["summary"],
            "evidence": root_context["evidence"][:4],
            "scoring_logs": bundle["current_scoring_logs"][:5],
            "alerts": bundle["alerts"][:3],
            "scope_note": bundle["scope_note"],
        },
    }
def rows_to_csv(rows: list[dict], fieldnames: list[str]) -> str:
    """Serialize dict rows to CSV text with a header line.

    Args:
        rows: Row dicts; keys not listed in ``fieldnames`` are ignored.
        fieldnames: Column names, in output order.

    Returns:
        The CSV document as a string (header first, then one line per row).
    """
    with io.StringIO() as sink:
        writer = csv.DictWriter(sink, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)
        return sink.getvalue()
def export_hallucination_csv() -> str:
    """Export the current hallucination log items as CSV text."""
    columns = [
        "id",
        "category",
        "prompt",
        "claims",
        "score",
        "risk",
        "semantic_similarity",
        "groundedness",
        "answer_relevance",
        "evaluation_reason",
        "status",
        "expected_answer",
        "current_answer",
    ]
    log_items = get_hallucination_logs()["items"]
    return rows_to_csv(log_items, columns)
def export_drift_csv() -> str:
    """Export the current per-category score items as CSV text."""
    columns = ["category", "sample_count", "avg_score", "status"]
    score_items = get_category_scores()["items"]
    return rows_to_csv(score_items, columns)
def export_root_cause_csv() -> str:
    """Export the root-cause summary and evidence as a three-column CSV.

    Summary entries come first (one row per key/value pair), followed by one
    row per evidence record keyed by its timestamp and trace id.
    """
    root = get_root_cause()
    records = []
    for key, value in root["summary"].items():
        records.append({"section": "summary", "key": key, "value": value})
    for row in root["evidence"]:
        records.append(
            {
                "section": "evidence",
                "key": f"{row['timestamp']} {row['trace_id']}",
                "value": f"{row['signal_type']}: {row['details']}",
            }
        )
    return rows_to_csv(records, ["section", "key", "value"])
def reset_state() -> dict:
    """Restore the shared state to its initial demo values and persist it.

    Returns:
        A confirmation message together with the metrics and settings as
        reported after the reset.
    """
    initial_values = {
        "metrics": deepcopy(INITIAL_METRICS),
        "settings": deepcopy(DEFAULT_SETTINGS),
        "evaluation_count": 2450,
        "last_event": None,
        "category_scores": [],
        "hallucination_logs": [],
        "evidence": [],
        "root_cause": None,
        "timeline": [],
        "signals": [],
        "quality_timeseries": deepcopy(INITIAL_TIMESERIES),
        "evaluation_runs": [],
        "datasets": [],
    }
    for field, value in initial_values.items():
        STATE[field] = value
    save_state()
    return {
        "message": "Demo state reset.",
        "metrics": get_metrics(),
        "settings": get_settings(),
    }
def metrics_from_computed(computed: dict) -> dict:
    """Overlay computed evaluation numbers onto a copy of ``CRITICAL_METRICS``.

    Only the five measured fields are taken from ``computed``; every other
    field keeps the template value.
    """
    metrics = deepcopy(CRITICAL_METRICS)
    measured_fields = (
        "semantic_drift",
        "hallucination_rate",
        "statistical_drift",
        "latency_ms",
        "confidence",
    )
    for field in measured_fields:
        metrics[field] = computed[field]
    return metrics
def finalize_evaluation(
    computed: dict,
    healthy_message: str,
    alert_message: str,
    source: str,
    dataset_id: str | None = None,
    dataset_name: str | None = None,
    sample_count: int | None = None,
) -> dict:
    """Commit a computed evaluation to the shared state and record the run.

    Classifies the computed metrics, stores them with a UTC timestamp, bumps
    the evaluation counter, copies the per-category scores, hallucination
    logs and evidence into state, rebuilds the root cause, timeline and
    signals, records an evaluation run, and saves the state.

    Args:
        computed: Evaluator output; must provide the measured metric fields
            plus ``category_scores``, ``hallucination_logs``, ``evidence``
            and ``root_cause``.
        healthy_message: Message used when the classified status is healthy.
        alert_message: Message stored on the incident event otherwise.
        source: Label passed to ``append_evaluation_run``.
        dataset_id: Optional dataset id passed to ``append_evaluation_run``.
        dataset_name: Optional dataset name passed to ``append_evaluation_run``.
        sample_count: Optional sample count passed to ``append_evaluation_run``.

    Returns:
        A dict with the message, a copy of the recorded run, and copies of
        the refreshed state sections.
    """
    STATE["metrics"] = classify_metrics(metrics_from_computed(computed))
    STATE["metrics"]["updated_at"] = datetime.now(timezone.utc).isoformat()
    STATE["evaluation_count"] += 1
    append_quality_point(STATE["metrics"])

    for field in ("category_scores", "hallucination_logs", "evidence"):
        STATE[field] = computed[field]

    computed_root = computed["root_cause"]
    if STATE["metrics"]["status_level"] == "healthy":
        STATE["root_cause"] = {
            "primary_cause": (
                "Evaluation completed under the configured thresholds. Observed semantic drift "
                f"was {STATE['metrics']['semantic_drift']:.3f} and hallucination rate was "
                f"{STATE['metrics']['hallucination_rate']:.1f}%, so no active incident was opened."
            ),
            "top_category": computed_root["top_category"],
            "top_category_score": computed_root["top_category_score"],
            "impact_radius": "0 Sessions",
            "duration": "0m 00s",
            "risk_level": "Healthy",
        }
    else:
        incident_root = dict(computed_root)
        if STATE["metrics"]["status_level"] == "critical":
            incident_root["risk_level"] = "Critical"
        else:
            incident_root["risk_level"] = "Warning"
        STATE["root_cause"] = incident_root

    # Timeline and signals are rebuilt before the incident event is updated.
    STATE["timeline"] = build_timeline(STATE["metrics"])
    STATE["signals"] = build_signals(STATE["metrics"])

    if STATE["metrics"]["status_level"] == "healthy":
        STATE["last_event"] = None
        message = healthy_message
    else:
        STATE["last_event"] = {
            "id": "INC-9421-RCA",
            "message": alert_message,
            "created_at": STATE["metrics"]["updated_at"],
        }
        message = STATE["last_event"]["message"]

    run = append_evaluation_run(message, source, dataset_id, dataset_name, sample_count)
    save_state()

    response = {
        "message": message,
        "run": deepcopy(run),
        "metrics": get_metrics(),
    }
    for field in (
        "category_scores",
        "hallucination_logs",
        "root_cause",
        "evidence",
        "timeline",
        "signals",
    ):
        response[field] = deepcopy(STATE[field])
    response["settings"] = get_settings()
    response["incident"] = deepcopy(INCIDENTS[0])
    return response
def run_evaluation() -> dict:
    """Evaluate the bundled sample dataset and commit the result.

    The metrics are classified once up front only to obtain the status label
    that goes into the alert message.
    """
    computed = evaluate_dataset(SAMPLE_DATASET)
    status_label = classify_metrics(metrics_from_computed(computed))["system_status"]
    return finalize_evaluation(
        computed,
        healthy_message="Evaluation complete. No thresholds were crossed.",
        alert_message=f"{status_label} detected. Root cause report generated.",
        source="Dataset Replay",
    )
def clean_payload_text(payload: dict, key: str, default: str = "") -> str:
    """Return ``payload[key]`` as stripped text, or ``default`` when unusable.

    A key that is missing or explicitly ``None`` (for example a JSON ``null``)
    yields ``default``. Previously ``None`` was stringified to the literal
    text ``"None"``, which then passed "required field" truthiness checks.

    Args:
        payload: Mapping of request fields.
        key: Field to read.
        default: Value used when the field is absent or ``None``.

    Returns:
        The field (or default) converted with ``str`` and stripped of
        surrounding whitespace. Non-string values such as numbers are still
        stringified.
    """
    value = payload.get(key)
    if value is None:
        value = default
    return str(value).strip()
# Maps user-supplied category labels to canonical evaluator categories
_CATEGORY_ALIASES = {
"landing_sandbox": "customer_support",
"landing": "customer_support",
"support": "customer_support",
"account_access": "customer_support",
"account": "customer_support",
"billing": "customer_support",
"medical": "healthcare",
"health": "healthcare",
"law": "legal",
"coding": "code_generation",
"code": "code_generation",
"writing": "technical_writing",
"investment": "finance",
"money": "finance",
}
def normalize_category(value: str) -> str:
safe_value = "".join(char.lower() if char.isalnum() else " " for char in value)
parts = safe_value.split()
normalized = "_".join(part for part in parts if part)
if not normalized:
return "custom"
return _CATEGORY_ALIASES.get(normalized, normalized)
def first_payload_text(payload: dict, keys: list[str], default: str = "") -> str:
    """Return the first non-empty cleaned value found under ``keys``.

    Keys are tried in order and lookup stops at the first one whose cleaned
    text is non-empty; ``default`` is returned when none qualifies.
    """
    candidates = (clean_payload_text(payload, key) for key in keys)
    return next((text for text in candidates if text), default)
def sample_from_payload(payload: dict, sample_id: int) -> dict:
    """Turn a loosely-keyed request payload into an evaluator sample.

    Several alternative key names are accepted for each field. A missing
    expected answer is replaced by a generic grounding instruction, a missing
    context falls back to the expected answer, and a missing baseline answer
    falls back to the expected answer as well.

    Args:
        payload: Request fields.
        sample_id: Numeric id stored (as a string) on the sample.

    Returns:
        The sample dict consumed by the evaluator.

    Raises:
        ValueError: If no prompt or no model response can be found.
    """
    prompt = first_payload_text(payload, ["prompt", "question", "input"])
    response = first_payload_text(
        payload, ["response", "current_answer", "model_response", "output"]
    )
    if not (prompt and response):
        raise ValueError("Prompt and model response are required.")

    raw_category = first_payload_text(payload, ["category", "domain"], "custom")
    category = normalize_category(raw_category)

    expected = first_payload_text(
        payload, ["expected_answer", "reference_answer", "expected", "ground_truth"]
    ) or "The assistant should answer only with supported information from the provided context."
    context = first_payload_text(payload, ["context", "source_context", "source"]) or expected
    baseline = first_payload_text(
        payload, ["baseline_answer", "baseline", "reference_answer"], expected
    )

    return {
        "id": str(sample_id),
        "category": category,
        "question": prompt,
        "context": context,
        "baseline_answer": baseline,
        "expected_answer": expected,
        "current_answer": response,
    }
def display_category(value: str) -> str:
    """Turn a snake_case category key into a title-cased display label."""
    spaced = value.replace("_", " ")
    return spaced.title()
def stored_row_from_payload(payload: dict, sample_id: int) -> dict:
    """Validate a payload and return it in the shape used for stored dataset rows.

    The payload is first converted with ``sample_from_payload`` (which raises
    ``ValueError`` for incomplete input) and then re-keyed for storage.
    """
    sample = sample_from_payload(payload, sample_id)
    # (stored row key, evaluator sample key)
    field_map = (
        ("category", "category"),
        ("prompt", "question"),
        ("response", "current_answer"),
        ("expected_answer", "expected_answer"),
        ("context", "context"),
        ("baseline_answer", "baseline_answer"),
    )
    return {stored_key: sample[sample_key] for stored_key, sample_key in field_map}
def normalize_batch_rows(rows: list) -> list[dict]:
    """Validate uploaded batch rows and convert them to stored-row dicts.

    Sample ids are numbered consecutively from
    ``STATE["evaluation_count"] - 2449`` (``reset_state`` sets the counter to
    2450, so numbering starts at 1 right after a reset).

    Args:
        rows: Raw rows; each is expected to be a dict.

    Returns:
        The normalized rows, in input order.

    Raises:
        ValueError: If any row is invalid (the message joins the first three
            problems) or if no row remains.
    """
    first_id = STATE["evaluation_count"] - 2449
    accepted = []
    problems = []
    for offset, row in enumerate(rows):
        row_number = offset + 1
        if isinstance(row, dict):
            try:
                accepted.append(stored_row_from_payload(row, first_id + offset))
            except ValueError as exc:
                problems.append(f"Row {row_number}: {exc}")
        else:
            problems.append(f"Row {row_number}: expected an object.")
    if problems:
        raise ValueError(" ".join(problems[:3]))
    if not accepted:
        raise ValueError("At least one batch row is required.")
    return accepted
def samples_from_rows(rows: list[dict]) -> list[dict]:
    """Convert stored rows into evaluator samples with consecutive ids.

    Ids start at ``STATE["evaluation_count"] - 2449`` and increase by one per
    row.
    """
    first_id = STATE["evaluation_count"] - 2449
    return [
        sample_from_payload(row, sample_id)
        for sample_id, row in enumerate(rows, start=first_id)
    ]
def next_dataset_id() -> str:
    """Return the next unused ``DATASET-NNN`` identifier.

    Scans the stored datasets for ids of the form ``DATASET-<int>`` and
    returns one past the highest number found, zero-padded to three digits.
    Ids with a different prefix or a non-numeric suffix are ignored.
    """
    highest = 0
    for dataset in STATE["datasets"]:
        identifier = str(dataset.get("id", ""))
        if not identifier.startswith("DATASET-"):
            continue
        try:
            number = int(identifier.split("-", 1)[1])
        except ValueError:
            continue
        highest = max(highest, number)
    return f"DATASET-{highest + 1:03d}"
def dataset_summary(dataset: dict) -> dict:
    """Summarize a stored dataset without including its rows.

    Args:
        dataset: Stored dataset; ``id``, ``name`` and ``created_at`` are
            required, everything else is optional.

    Returns:
        A dict with identity fields, the row count, timestamps
        (``updated_at`` falls back to ``created_at``), the last-run fields
        (``None`` when absent) and the sorted display names of the row
        categories.
    """
    rows = dataset.get("rows", [])
    labels = set()
    for row in rows:
        labels.add(display_category(row.get("category", "custom")))

    summary = {
        "id": dataset["id"],
        "name": dataset["name"],
        "row_count": len(rows),
        "created_at": dataset["created_at"],
        "updated_at": dataset.get("updated_at", dataset["created_at"]),
    }
    for field in (
        "last_run_at",
        "last_status",
        "last_hallucination_rate",
        "last_semantic_drift",
        "last_run_id",
    ):
        summary[field] = dataset.get(field)
    summary["categories"] = sorted(labels)
    return summary
def find_dataset(dataset_id: str) -> dict:
    """Return the stored dataset with the given id.

    Raises:
        ValueError: If no stored dataset has that id.
    """
    match = next(
        (candidate for candidate in STATE["datasets"] if candidate["id"] == dataset_id),
        None,
    )
    if match is None:
        raise ValueError("Dataset not found.")
    return match
def get_datasets() -> dict:
    """List summaries of all stored datasets, last stored first."""
    summaries = [dataset_summary(dataset) for dataset in reversed(STATE["datasets"])]
    return {"items": summaries}
def create_dataset(payload: dict) -> dict:
    """Validate and store a new dataset, keeping only the 20 most recent.

    Args:
        payload: Must contain a non-empty ``rows`` list; ``name`` is
            optional and defaults to ``"Evaluation Dataset <n>"``.

    Returns:
        The summary of the newly stored dataset.

    Raises:
        ValueError: If ``rows`` is missing, not a list, empty, or contains
            invalid rows.
    """
    rows = payload.get("rows", [])
    if not (isinstance(rows, list) and rows):
        raise ValueError("At least one dataset row is required.")
    normalized_rows = normalize_batch_rows(rows)

    name = clean_payload_text(payload, "name")
    if not name:
        name = f"Evaluation Dataset {len(STATE['datasets']) + 1}"

    timestamp = datetime.now(timezone.utc).isoformat()
    dataset = {
        "id": next_dataset_id(),
        "name": name,
        "created_at": timestamp,
        "updated_at": timestamp,
        "rows": normalized_rows,
    }
    STATE["datasets"].append(dataset)
    # Cap the stored history at the 20 most recently added datasets.
    STATE["datasets"] = STATE["datasets"][-20:]
    save_state()
    return dataset_summary(dataset)
def get_dataset(dataset_id: str) -> dict:
    """Return a dataset's summary together with a deep copy of its rows.

    Raises:
        ValueError: Propagated from ``find_dataset`` when the id is unknown.
    """
    dataset = find_dataset(dataset_id)
    detail = dataset_summary(dataset)
    detail["rows"] = deepcopy(dataset.get("rows", []))
    return detail
def run_custom_evaluation(payload: dict) -> dict:
    """Score a single custom prompt/response pair and commit the result.

    Besides the standard evaluation result, the per-sample signals of the
    first hallucination log are copied to the top level of the response so a
    client can read them without digging into ``hallucination_logs``.
    Signals missing from the log (or all of them, when there is no log) fall
    back to neutral defaults. Keys already present in the result are left
    untouched.

    Raises:
        ValueError: Propagated from ``sample_from_payload`` for incomplete
            payloads.
    """
    sample_number = STATE["evaluation_count"] - 2449
    sample = sample_from_payload(payload, sample_number)
    computed = evaluate_samples([sample])
    status_label = classify_metrics(metrics_from_computed(computed))["system_status"]
    result = finalize_evaluation(
        computed,
        "Custom response scored under the configured thresholds.",
        f"{status_label} detected in custom response. Root cause report generated.",
        "Custom Response",
        sample_count=1,
    )

    # Built per call so the mutable defaults are never shared between results.
    signal_defaults = {
        "policy_coverage": 0,
        "contradiction_detected": False,
        "policy_flags": [],
        "severity": "LOW",
        "hard_override": False,
        "detected_violations": [],
        "policy_components": {},
        "confidence": 85,
    }
    first_log = (result.get("hallucination_logs") or [None])[0] or {}
    for field, fallback in signal_defaults.items():
        result.setdefault(field, first_log.get(field, fallback))
    return result
def run_batch_evaluation(payload: dict) -> dict:
    """Evaluate an uploaded batch of rows and commit the result.

    Args:
        payload: Must contain a non-empty ``rows`` list.

    Returns:
        The evaluation result with an extra ``batch`` entry holding the
        sample count.

    Raises:
        ValueError: If ``rows`` is missing, not a list, empty, or contains
            invalid rows.
    """
    rows = payload.get("rows", [])
    if not (isinstance(rows, list) and rows):
        raise ValueError("At least one batch row is required.")

    samples = samples_from_rows(normalize_batch_rows(rows))
    computed = evaluate_samples(samples)
    status_label = classify_metrics(metrics_from_computed(computed))["system_status"]
    count = len(samples)

    result = finalize_evaluation(
        computed,
        f"Batch evaluation completed for {count} responses under the configured thresholds.",
        f"{status_label} detected across {count} uploaded responses. Root cause report generated.",
        "Batch Upload",
        sample_count=count,
    )
    result["batch"] = {"sample_count": count}
    return result
def run_dataset_evaluation(dataset_id: str) -> dict:
    """Evaluate a saved dataset, commit the result, and stamp the dataset.

    After the evaluation is finalized, the dataset's last-run fields are
    refreshed from the recorded run and the state is saved again.

    Returns:
        The evaluation result extended with the dataset summary and a
        ``batch`` entry holding the sample count.

    Raises:
        ValueError: If the dataset is unknown or has no rows.
    """
    dataset = find_dataset(dataset_id)
    rows = dataset.get("rows", [])
    if not rows:
        raise ValueError("Dataset has no rows to evaluate.")

    samples = samples_from_rows(rows)
    computed = evaluate_samples(samples)
    status_label = classify_metrics(metrics_from_computed(computed))["system_status"]
    count = len(samples)

    result = finalize_evaluation(
        computed,
        f"Dataset '{dataset['name']}' completed for {count} responses under the configured thresholds.",
        f"{status_label} detected in dataset '{dataset['name']}'. Root cause report generated.",
        "Saved Dataset",
        dataset_id=dataset["id"],
        dataset_name=dataset["name"],
        sample_count=count,
    )

    recorded_run = result["run"]
    dataset.update(
        {
            "updated_at": datetime.now(timezone.utc).isoformat(),
            "last_run_at": recorded_run["created_at"],
            "last_run_id": recorded_run["id"],
            "last_status": recorded_run["status"],
            "last_hallucination_rate": recorded_run["hallucination_rate"],
            "last_semantic_drift": recorded_run["semantic_drift"],
        }
    )
    save_state()

    result["dataset"] = dataset_summary(dataset)
    result["batch"] = {"sample_count": count}
    return result
def generate_support_answer(
    payload: dict,
    gemini_key: str | None = None,
    openai_key: str | None = None,
) -> dict:
    """Generate a support answer via Gemini, OpenAI, or deterministic rules.

    A hosted provider is used only when ``payload["provider"]`` names it and
    a key for it is available (argument first, then the ``GEMINI_API_KEY`` /
    ``OPENAI_API_KEY`` environment variable). In every other case a canned
    answer is picked by keyword matching on the category and prompt.

    Args:
        payload: Request fields; ``prompt`` is required, ``category``,
            ``context``, ``expected_answer`` and ``provider`` are optional.
        gemini_key: Optional Gemini API key overriding the environment.
        openai_key: Optional OpenAI API key overriding the environment.

    Returns:
        A dict with ``answer``, ``provider`` (``gemini``, ``openai`` or
        ``fallback``) and ``mode`` (``real`` or ``deterministic``).

    Raises:
        ValueError: If the prompt is empty, or if the selected provider fails
            or returns an empty answer. Provider failures are chained to the
            original exception.
    """
    prompt = clean_payload_text(payload, "prompt")
    category = clean_payload_text(payload, "category", "Customer Support")
    context = clean_payload_text(payload, "context")
    expected = clean_payload_text(payload, "expected_answer")
    provider = clean_payload_text(payload, "provider", "fallback").lower()
    if not prompt:
        raise ValueError("Prompt is required.")

    gemini_key = gemini_key or os.environ.get("GEMINI_API_KEY")
    openai_key = openai_key or os.environ.get("OPENAI_API_KEY")

    system_instruction = (
        "You are an expert customer support assistant. Your response must be safe, professional, "
        "accurate, and fully grounded in the provided source context. Do not make any unsupported claims "
        "or promise anything not explicitly allowed in the policy context."
    )
    full_prompt = (
        f"Category: {category}\n"
        f"Source Context: {context}\n"
        f"Expected Guidelines: {expected}\n"
        f"Customer Support Ticket: {prompt}\n\n"
        "Generate a suitable, professional response for the customer support ticket based strictly on the instructions above."
    )

    if provider == "gemini" and gemini_key:
        try:
            # Imported lazily so the SDK is only required when Gemini is selected.
            import google.generativeai as genai

            genai.configure(api_key=gemini_key)
            model = genai.GenerativeModel(
                model_name="gemini-1.5-flash",
                system_instruction=system_instruction,
            )
            response = model.generate_content(full_prompt)
            if response and response.text:
                return {
                    "answer": response.text.strip(),
                    "provider": "gemini",
                    "mode": "real",
                }
            raise ValueError("Empty response received from Gemini.")
        except Exception as exc:
            # SDK boundary: any failure (import, network, blocked or empty
            # output) is surfaced as one ValueError, keeping the cause chained.
            raise ValueError(f"Gemini generation error: {exc}") from exc
    elif provider == "openai" and openai_key:
        try:
            # Imported lazily so the SDK is only required when OpenAI is selected.
            from openai import OpenAI

            client = OpenAI(api_key=openai_key)
            completion = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": system_instruction},
                    {"role": "user", "content": full_prompt},
                ],
            )
            answer = completion.choices[0].message.content
            if answer:
                return {
                    "answer": answer.strip(),
                    "provider": "openai",
                    "mode": "real",
                }
            raise ValueError("Empty response received from OpenAI.")
        except Exception as exc:
            # Same SDK-boundary handling as the Gemini branch.
            raise ValueError(f"OpenAI generation error: {exc}") from exc

    # Deterministic fallback: the first rule with a keyword that occurs as a
    # substring of "<category> <prompt>" (lower-cased) wins.
    topic = f"{category} {prompt}".lower()
    fallback_rules = (
        (
            ("account", "login", "access", "reset"),
            "Acknowledge urgency. Send the official password reset account recovery path. Recommend MFA after recovery. Do not request passwords or payment details.",
        ),
        (
            ("refund", "billing", "payment", "charge"),
            "Acknowledge the billing issue. Review refund status using the ticket or order reference. Do not request full card numbers, CVV codes, or payment secrets.",
        ),
        (
            ("technical", "error", "bug"),
            "Acknowledge the technical issue. Request the error message, device, browser, and failing step. Escalate to technical support if troubleshooting does not resolve it.",
        ),
    )
    answer = "Answer with approved support steps. Do not collect private secrets or payment details. Escalate unresolved high-risk issues."
    for keywords, reply in fallback_rules:
        if any(keyword in topic for keyword in keywords):
            answer = reply
            break
    return {
        "answer": answer,
        "provider": "fallback",
        "mode": "deterministic",
    }