glokta-lite / data.py
JakeBx
feat: lite dashboard
e134b33
Raw
History Blame Contribute Delete
12.5 kB
"""
garkboard-lite data layer.
Loads models, runs, and probe_results from a HuggingFace Dataset into
in-memory pandas DataFrames. All query functions operate on these DataFrames
directly — no HTTP API, no database.
Set HF_DATASET_REPO (default: Jake/glokta-public) and optionally HF_TOKEN
for private repos.
"""
import os
from datetime import datetime
import pandas as pd
from risks import ACTIVE_RISKS, compute_risk_pass_rates
# Dataset repository to load from; read once at import time from the
# HF_DATASET_REPO environment variable, with a public default.
HF_DATASET_REPO: str = os.environ.get("HF_DATASET_REPO", "Jake/glokta-public")
# Optional access token; an unset or empty HF_TOKEN is normalised to None.
HF_TOKEN: str | None = os.environ.get("HF_TOKEN") or None
# Module-level cache — populated by load_data()
# Until load_data() runs, each table is an empty, column-less DataFrame.
_models: pd.DataFrame = pd.DataFrame()
_runs: pd.DataFrame = pd.DataFrame()
_probe_results: pd.DataFrame = pd.DataFrame()
def load_data() -> None:
    """Load all three tables from the HF Dataset into memory.

    Fetches the ``models``, ``runs`` and ``probe_results`` configurations of
    ``HF_DATASET_REPO``, converts their ``train`` splits to pandas DataFrames,
    parses the known timestamp columns as UTC datetimes (values that cannot be
    parsed become NaT) and then replaces the module-level cache.

    Safe to call multiple times (re-loads on each call). The cache is replaced
    only after every table has been fetched and normalised, so a failed reload
    leaves the previously loaded tables untouched instead of half-updated.

    Raises:
        RuntimeError: If the dataset cannot be reached, or a table cannot be
            converted to a DataFrame (for example a missing ``train`` split).
    """
    global _models, _runs, _probe_results

    try:
        # Imported lazily so the module can be imported without `datasets`.
        from datasets import load_dataset

        models_ds = load_dataset(HF_DATASET_REPO, name="models", token=HF_TOKEN)
        runs_ds = load_dataset(HF_DATASET_REPO, name="runs", token=HF_TOKEN)
        probe_results_ds = load_dataset(HF_DATASET_REPO, name="probe_results", token=HF_TOKEN)

        # Work on locals; the shared cache is swapped in one step further down.
        models = models_ds["train"].to_pandas()
        runs = runs_ds["train"].to_pandas()
        probe_results = probe_results_ds["train"].to_pandas()
    except Exception as e:
        raise RuntimeError(f"Failed to load HF dataset '{HF_DATASET_REPO}': {e}") from e

    # Normalise types
    for col in ("created_at", "started_at", "completed_at", "scanned_at"):
        if col in runs.columns:
            runs[col] = pd.to_datetime(runs[col], utc=True, errors="coerce")
    if "created_at" in models.columns:
        models["created_at"] = pd.to_datetime(models["created_at"], utc=True, errors="coerce")

    # Publish the three tables together so readers never observe a mix of
    # old and new data.
    _models, _runs, _probe_results = models, runs, probe_results

    print(
        f"[data] Loaded {len(_models)} models, {len(_runs)} runs, "
        f"{len(_probe_results)} probe_results from {HF_DATASET_REPO}"
    )
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _latest_run_ids() -> dict[str, str]:
    """Return {model_id: run_id} for each model's most recent complete run.

    "Most recent" is the run with the greatest ``completed_at`` among the
    model's runs whose ``status`` is ``"complete"``. Complete runs whose
    ``completed_at`` is missing (NaT) cannot be ranked and are ignored, so a
    model whose complete runs are all undated is absent from the result.

    Returns an empty dict when no runs are loaded or none qualifies.
    """
    # Before load_data() the cache is a column-less DataFrame; indexing it by
    # "status" would raise KeyError.
    if _runs.empty:
        return {}

    complete = _runs[_runs["status"] == "complete"]
    # A group made up only of NaT timestamps has no maximum for idxmax to
    # point at, which would break the .loc lookup below for every caller.
    dated = complete[complete["completed_at"].notna()]
    if dated.empty:
        return {}

    idx = dated.groupby("model_id")["completed_at"].idxmax()
    latest = dated.loc[idx, ["model_id", "id"]]
    return dict(zip(latest["model_id"], latest["id"]))
def _model_name(model_id: str) -> str:
    """Resolve a model id to its display name.

    Falls back to returning ``model_id`` itself when the models table has no
    row with that id.
    """
    matches = _models.loc[_models["id"] == model_id]
    if matches.empty:
        return model_id
    return matches.iloc[0]["name"]
# ---------------------------------------------------------------------------
# Public query functions
# ---------------------------------------------------------------------------
def get_models() -> list[dict]:
    """List every cached model as an ``id`` / ``name`` / ``provider`` record.

    Returns an empty list while the models table is empty.
    """
    if not _models.empty:
        wanted_columns = ["id", "name", "provider"]
        return _models[wanted_columns].to_dict(orient="records")
    return []
def get_probe_categories() -> list[str]:
    """List the distinct, non-null probe categories in probe_results, sorted.

    Returns an empty list while the probe_results table is empty.
    """
    if not _probe_results.empty:
        present = _probe_results["probe_category"].dropna()
        distinct = present.unique().tolist()
        distinct.sort()
        return distinct
    return []
def get_leaderboard(
    probe_category: str | None = None,
    model_id: str | None = None,
) -> list[dict]:
    """Aggregate probe results from each model's latest complete run.

    Mirrors GET /api/leaderboard response shape:
    model_id, model_name, provider, probe_category,
    total_pass, total_fail, score, pass_rate, origin

    Args:
        probe_category: When truthy, only rows of this category are used.
        model_id: When truthy, only this model's latest run is used.

    Returns:
        One dict per (model, probe_category) pair, ordered by ascending mean
        ``score``. ``total_pass`` / ``total_fail`` are summed counts,
        ``pass_rate`` is ``total_pass / (total_pass + total_fail)`` (0.0 when
        there were no attempts) and ``origin`` is the latest run's
        ``triggered_by`` ("api" when that is null). A model that is missing
        from the models table is reported under its id with an empty provider.
        Returns an empty list when nothing is loaded or nothing matches.
    """
    if _probe_results.empty or _runs.empty or _models.empty:
        return []

    latest = _latest_run_ids()  # {model_id: run_id}
    if not latest:
        return []

    latest_run_ids = set(latest.values())
    pr = _probe_results[_probe_results["run_id"].isin(latest_run_ids)]
    if probe_category:
        pr = pr[pr["probe_category"] == probe_category]
    if model_id:
        run_id = latest.get(model_id)
        if not run_id:
            return []
        pr = pr[pr["run_id"] == run_id]
    if pr.empty:
        return []

    # Join run.model_id back onto probe_results via run_id.
    # triggered_by is deliberately kept out of the join/groupby: it can be
    # null, and null group keys are dropped by groupby (dropna=True default).
    latest_runs = _runs[_runs["id"].isin(latest_run_ids)]
    run_map = latest_runs[["id", "model_id"]].rename(columns={"id": "run_id"})
    pr = pr.merge(run_map, on="run_id", how="left")

    model_map = _models[["id", "name", "provider"]].rename(columns={"id": "model_id"})
    pr = pr.merge(model_map, on="model_id", how="left")

    # A model absent from the models table comes out of the left join with
    # null name/provider. Fill them with the same fallbacks the other queries
    # use, otherwise the groupby below would silently drop those rows.
    pr["name"] = pr["name"].fillna(pr["model_id"])
    pr["provider"] = pr["provider"].fillna("")

    grouped = (
        pr.groupby(["model_id", "name", "provider", "probe_category"])
        .agg(total_pass=("pass_count", "sum"), total_fail=("fail_count", "sum"), score=("score", "mean"))
        .reset_index()
    )
    if grouped.empty:
        # Every remaining row had a null probe_category.
        return []

    attempts = grouped["total_pass"] + grouped["total_fail"]
    # Zero attempts would divide by zero; report those groups as 0.0.
    grouped["pass_rate"] = (grouped["total_pass"] / attempts).where(attempts > 0, 0.0)
    grouped = grouped.sort_values("score")

    # Look up triggered_by per model from its latest run, outside the groupby.
    # One indexed Series replaces a full scan of _runs per model.
    origin_by_run = latest_runs.drop_duplicates("id").set_index("id")["triggered_by"]
    model_origin = {}
    for mid, rid in latest.items():
        origin = origin_by_run.get(rid)
        model_origin[mid] = origin if pd.notna(origin) else "api"

    return [
        {
            "model_id": row["model_id"],
            "model_name": row["name"],
            "provider": row["provider"],
            "probe_category": row["probe_category"],
            "total_pass": int(row["total_pass"]),
            "total_fail": int(row["total_fail"]),
            "score": float(row["score"]) if pd.notna(row["score"]) else 0.0,
            "pass_rate": float(row["pass_rate"]),
            "origin": model_origin.get(row["model_id"], "api"),
        }
        for _, row in grouped.iterrows()
    ]
def get_model_detail(model_id: str) -> dict | None:
    """Describe a model together with the probe rows of its latest complete run.

    Returns None when the model has no latest complete run or that run has no
    probe results. A model missing from the models table is reported with its
    id as the name and an empty provider.
    """
    run_id = _latest_run_ids().get(model_id)
    if not run_id:
        return None

    run_results = _probe_results[_probe_results["run_id"] == run_id]
    if run_results.empty:
        return None

    matches = _models[_models["id"] == model_id]
    if matches.empty:
        model_name, provider = model_id, ""
    else:
        first_match = matches.iloc[0]
        model_name, provider = first_match["name"], first_match["provider"]

    probe_results = []
    for _, result in run_results.iterrows():
        raw_score = result.get("score")
        probe_results.append(
            {
                "probe_name": result["probe_name"],
                "probe_category": result["probe_category"],
                "detector": result["detector"],
                "pass_count": int(result["pass_count"]),
                "fail_count": int(result["fail_count"]),
                # A missing score is reported as None rather than NaN.
                "score": float(result["score"]) if pd.notna(raw_score) else None,
            }
        )

    return {
        "model_id": model_id,
        "model_name": model_name,
        "provider": provider,
        "run_id": run_id,
        "probe_results": probe_results,
    }
def get_risk_leaderboard(included_risks: list[str]) -> list[dict]:
    """Risk-based leaderboard: mean of per-risk pass rates across included categories.

    For every model with a latest complete run, the run's probe rows are
    passed to ``compute_risk_pass_rates`` together with ``included_risks``.

    Args:
        included_risks: Forwarded unchanged to ``compute_risk_pass_rates``.

    Returns:
        One dict per model with ``model_id``, ``model_name``, ``provider``,
        ``overall_pass_rate`` and ``per_risk``, ordered by descending
        ``overall_pass_rate`` (a None rate sorts as -1). A model missing from
        the models table is reported with its id as the name and an empty
        provider. Returns an empty list when any table is empty.
    """
    # _runs is checked as well: _latest_run_ids() reads it, and the other
    # leaderboard query guards all three tables the same way.
    if _probe_results.empty or _runs.empty or _models.empty:
        return []
    latest = _latest_run_ids()
    if not latest:
        return []

    # Bucket the relevant probe rows by run in one pass instead of filtering
    # the whole probe_results table once per model.
    wanted = _probe_results[_probe_results["run_id"].isin(set(latest.values()))]
    rows_by_run: dict = {}
    for rid, category, passed, failed in zip(
        wanted["run_id"], wanted["probe_category"], wanted["pass_count"], wanted["fail_count"]
    ):
        rows_by_run.setdefault(rid, []).append(
            {
                "probe_category": category,
                "pass_count": int(passed),
                "fail_count": int(failed),
            }
        )

    # First row wins for a duplicated model id, as with a filtered lookup.
    known = _models.drop_duplicates("id")
    names = dict(zip(known["id"], known["name"]))
    providers = dict(zip(known["id"], known["provider"]))

    results = []
    for model_id, run_id in latest.items():
        per_risk, overall = compute_risk_pass_rates(
            rows_by_run.get(run_id, []), included_risks=included_risks
        )
        results.append({
            "model_id": model_id,
            "model_name": names.get(model_id, model_id),
            "provider": providers.get(model_id, ""),
            "overall_pass_rate": overall,
            "per_risk": per_risk,
        })

    results.sort(
        key=lambda r: r["overall_pass_rate"] if r["overall_pass_rate"] is not None else -1,
        reverse=True,
    )
    return results
def get_trends(model_id: str, included_risks: list[str]) -> dict | None:
    """Build the per-run risk history of one model, ordered by completion time.

    Every run of the model whose status is "complete" contributes one point
    holding the run id, its completion time and the values returned by
    ``compute_risk_pass_rates`` for that run's probe rows.

    Returns None when the model id is not in the models table.
    """
    matches = _models[_models["id"] == model_id]
    if matches.empty:
        return None
    model_name = matches.iloc[0]["name"]

    belongs_to_model = _runs["model_id"] == model_id
    is_complete = _runs["status"] == "complete"
    history = _runs[belongs_to_model & is_complete].sort_values("completed_at")

    points = []
    for _, run in history.iterrows():
        run_rows = _probe_results[_probe_results["run_id"] == run["id"]]
        pr_dicts = []
        for _, probe in run_rows.iterrows():
            pr_dicts.append(
                {
                    "probe_category": probe["probe_category"],
                    "pass_count": int(probe["pass_count"]),
                    "fail_count": int(probe["fail_count"]),
                }
            )
        per_risk, overall = compute_risk_pass_rates(pr_dicts, included_risks=included_risks)

        # Timestamps are emitted as ISO strings; anything else passes through.
        # NOTE(review): a missing (NaT) completed_at also has isoformat() —
        # confirm what consumers expect for undated runs.
        stamp = run["completed_at"]
        if hasattr(stamp, "isoformat"):
            stamp = stamp.isoformat()

        points.append({
            "run_id": run["id"],
            "completed_at": stamp,
            "per_risk": per_risk,
            "overall_pass_rate": overall,
        })

    return {"model_id": model_id, "model_name": model_name, "points": points}
def get_run_summary() -> list[dict]:
    """Per-model run status counts.

    Returns:
        One dict per model id found in the runs table with ``model_name``,
        ``provider``, the number of runs in each of the ``complete``,
        ``running``, ``pending`` and ``failed`` states, and ``latest_origin``:
        the ``triggered_by`` of the model's most recently completed run
        ("api" when that value is null, "" when the model has no complete
        run). A model missing from the models table is reported with its id
        as the name and an empty provider. Returns an empty list when the
        runs or models table is empty.
    """
    if _runs.empty or _models.empty:
        return []

    model_map = dict(zip(_models["id"], _models["name"]))
    provider_map = dict(zip(_models["id"], _models["provider"]))

    summary = (
        _runs.groupby(["model_id", "status"])
        .size()
        .unstack(fill_value=0)
        .reset_index()
    )
    # Statuses that never occur have no column after unstack; add them as 0.
    for col in ("complete", "running", "pending", "failed"):
        if col not in summary.columns:
            summary[col] = 0

    # Latest origin per model.
    # Take the actual last complete run per model rather than groupby().last(),
    # which skips nulls and could report an older run's origin or NaN.
    # Undated runs sort first so they never outrank a dated run.
    newest = (
        _runs[_runs["status"] == "complete"]
        .sort_values("completed_at", na_position="first")
        .drop_duplicates("model_id", keep="last")
    )
    latest_origin = {
        mid: (origin if pd.notna(origin) else "api")
        for mid, origin in zip(newest["model_id"], newest["triggered_by"])
    }

    rows = []
    for record in summary.to_dict(orient="records"):
        mid = record["model_id"]
        rows.append({
            "model_name": model_map.get(mid, mid),
            "provider": provider_map.get(mid, ""),
            "complete": int(record.get("complete", 0)),
            "running": int(record.get("running", 0)),
            "pending": int(record.get("pending", 0)),
            "failed": int(record.get("failed", 0)),
            "latest_origin": latest_origin.get(mid, ""),
        })
    return rows
def get_runs(status: str | None = None) -> list[dict]:
"""Recent runs (up to 200), optionally filtered by status."""
if _runs.empty or _models.empty:
return []
model_map = dict(zip(_models["id"], _models["name"]))
runs = _runs.copy()
if status and status != "All":
runs = runs[runs["status"] == status]
runs = runs.sort_values("created_at", ascending=False).head(200)
rows = []
for _, r in runs.iterrows():
rows.append({
"id": r["id"],
"model_id": r["model_id"],
"model_name": model_map.get(r["model_id"], r["model_id"]),
"status": r["status"],
"garak_version": r.get("garak_version") or "",
"created_at": r["created_at"].isoformat() if pd.notna(r["created_at"]) else "",
"completed_at": r["completed_at"].isoformat() if pd.notna(r.get("completed_at")) else "",
})
return rows
def get_run_probe_results(run_id: str) -> list[dict]:
    """All probe results for a specific run.

    Args:
        run_id: Id of the run whose probe rows are wanted.

    Returns:
        One dict per matching probe_results row with ``probe_name``,
        ``probe_category``, ``detector``, integer ``pass_count`` /
        ``fail_count`` and ``score`` (None when the score is missing).
        Returns an empty list when nothing is loaded or no row matches.
    """
    # Before load_data() the cache is a column-less DataFrame; indexing it by
    # "run_id" would raise KeyError.
    if _probe_results.empty:
        return []

    pr = _probe_results[_probe_results["run_id"] == run_id]
    return [
        {
            "probe_name": r["probe_name"],
            "probe_category": r["probe_category"],
            "detector": r["detector"],
            "pass_count": int(r["pass_count"]),
            "fail_count": int(r["fail_count"]),
            "score": float(r["score"]) if pd.notna(r.get("score")) else None,
        }
        for _, r in pr.iterrows()
    ]