"""
garkboard-lite data layer.

Loads models, runs, and probe_results from a HuggingFace Dataset into
in-memory pandas DataFrames. All query functions operate on these DataFrames
directly — no HTTP API, no database.

Set HF_DATASET_REPO (default: Jake/glokta-public) and optionally HF_TOKEN
for private repos.

Every query function tolerates being called before load_data(): the caches
start out as empty, column-less DataFrames, so each function checks for
emptiness before indexing a column.
"""

import os
from datetime import datetime

import pandas as pd

from risks import ACTIVE_RISKS, compute_risk_pass_rates

HF_DATASET_REPO: str = os.environ.get("HF_DATASET_REPO", "Jake/glokta-public")
HF_TOKEN: str | None = os.environ.get("HF_TOKEN") or None

# Module-level cache — populated by load_data()
_models: pd.DataFrame = pd.DataFrame()
_runs: pd.DataFrame = pd.DataFrame()
_probe_results: pd.DataFrame = pd.DataFrame()


def load_data() -> None:
    """Load all three tables from the HF Dataset into memory.

    Safe to call multiple times (re-loads on each call). The module-level
    caches are replaced only after all three tables have been converted and
    normalised, so a failure part-way through leaves the previous data intact.

    Raises:
        RuntimeError: if the dataset cannot be reached (or the ``datasets``
            package cannot be imported).
    """
    global _models, _runs, _probe_results

    try:
        from datasets import load_dataset

        models_ds = load_dataset(HF_DATASET_REPO, name="models", token=HF_TOKEN)
        runs_ds = load_dataset(HF_DATASET_REPO, name="runs", token=HF_TOKEN)
        probe_results_ds = load_dataset(HF_DATASET_REPO, name="probe_results", token=HF_TOKEN)
    except Exception as e:
        raise RuntimeError(f"Failed to load HF dataset '{HF_DATASET_REPO}': {e}") from e

    # Build into locals first; the globals are swapped in one step below.
    models = models_ds["train"].to_pandas()
    runs = runs_ds["train"].to_pandas()
    probe_results = probe_results_ds["train"].to_pandas()

    # Normalise types. Unparseable timestamps become NaT (errors="coerce").
    for col in ("created_at", "started_at", "completed_at", "scanned_at"):
        if col in runs.columns:
            runs[col] = pd.to_datetime(runs[col], utc=True, errors="coerce")
    if "created_at" in models.columns:
        models["created_at"] = pd.to_datetime(models["created_at"], utc=True, errors="coerce")

    _models, _runs, _probe_results = models, runs, probe_results

    print(
        f"[data] Loaded {len(_models)} models, {len(_runs)} runs, "
        f"{len(_probe_results)} probe_results from {HF_DATASET_REPO}"
    )


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _latest_run_ids() -> dict[str, str]:
    """Return {model_id: run_id} for each model's most recent complete run.

    Complete runs whose ``completed_at`` is missing (NaT) are ignored: they
    cannot be ordered, and an all-NaT group would make ``idxmax`` fail.
    """
    if _runs.empty:
        return {}
    complete = _runs[_runs["status"] == "complete"].dropna(subset=["completed_at"])
    if complete.empty:
        return {}
    idx = complete.groupby("model_id")["completed_at"].idxmax()
    latest = complete.loc[idx, ["model_id", "id"]]
    return dict(zip(latest["model_id"], latest["id"]))


def _model_name(model_id: str) -> str:
    """Return the model's name, or ``model_id`` itself when it is unknown."""
    if _models.empty:
        return model_id
    rows = _models[_models["id"] == model_id]
    return rows.iloc[0]["name"] if not rows.empty else model_id


def _model_info(model_id: str) -> tuple[str, str]:
    """Return (name, provider) for a model; (model_id, "") when it is unknown."""
    if _models.empty:
        return model_id, ""
    rows = _models[_models["id"] == model_id]
    if rows.empty:
        return model_id, ""
    first = rows.iloc[0]
    return first["name"], first["provider"]


def _model_maps() -> tuple[dict, dict]:
    """Return ({model_id: name}, {model_id: provider}) from the models table.

    Assumes model ids are unique; with duplicates the last row wins.
    """
    if _models.empty:
        return {}, {}
    ids = _models["id"]
    return dict(zip(ids, _models["name"])), dict(zip(ids, _models["provider"]))


def _probe_rows_for_run(run_id: str) -> pd.DataFrame:
    """Return the probe_results rows belonging to ``run_id`` (possibly empty)."""
    if _probe_results.empty:
        return _probe_results
    return _probe_results[_probe_results["run_id"] == run_id]


def _probe_result_records(pr: pd.DataFrame) -> list[dict]:
    """Serialise probe_results rows into the per-probe dict shape used by the UI."""
    return [
        {
            "probe_name": r["probe_name"],
            "probe_category": r["probe_category"],
            "detector": r["detector"],
            "pass_count": int(r["pass_count"]),
            "fail_count": int(r["fail_count"]),
            # A missing score column or a null score is reported as None.
            "score": float(r["score"]) if pd.notna(r.get("score")) else None,
        }
        for r in pr.to_dict(orient="records")
    ]


def _risk_inputs_by_run(run_ids) -> dict:
    """Return {run_id: [probe dicts]} in the shape compute_risk_pass_rates takes.

    Every requested run id is present in the result; runs without probe
    results map to an empty list. ``_probe_results`` is filtered once for all
    runs rather than once per run.
    """
    wanted = list(run_ids)
    inputs: dict = {rid: [] for rid in wanted}
    if _probe_results.empty or not wanted:
        return inputs
    subset = _probe_results[_probe_results["run_id"].isin(wanted)]
    columns = subset[["run_id", "probe_category", "pass_count", "fail_count"]]
    for r in columns.to_dict(orient="records"):
        inputs.setdefault(r["run_id"], []).append({
            "probe_category": r["probe_category"],
            "pass_count": int(r["pass_count"]),
            "fail_count": int(r["fail_count"]),
        })
    return inputs


# ---------------------------------------------------------------------------
# Public query functions
# ---------------------------------------------------------------------------

def get_models() -> list[dict]:
    """Return all models as a list of dicts."""
    if _models.empty:
        return []
    return _models[["id", "name", "provider"]].to_dict(orient="records")


def get_probe_categories() -> list[str]:
    """Return all unique probe categories present in probe_results."""
    if _probe_results.empty:
        return []
    return sorted(_probe_results["probe_category"].dropna().unique().tolist())


def get_leaderboard(
    probe_category: str | None = None,
    model_id: str | None = None,
) -> list[dict]:
    """Aggregate probe results from each model's latest complete run.

    Mirrors GET /api/leaderboard response shape:
        model_id, model_name, provider, probe_category,
        total_pass, total_fail, score, pass_rate, origin

    Args:
        probe_category: if given, only this category is aggregated.
        model_id: if given, only this model's latest run is aggregated.

    Returns:
        One dict per (model, probe_category), ordered by ascending mean score
        (ties broken by model_id, then probe_category). Empty when no data is
        loaded or nothing matches the filters.
    """
    if _probe_results.empty or _runs.empty or _models.empty:
        return []

    latest = _latest_run_ids()  # {model_id: run_id}
    if not latest:
        return []

    if model_id:
        run_id = latest.get(model_id)
        if not run_id:
            return []
        wanted_runs = {run_id}
    else:
        wanted_runs = set(latest.values())

    pr = _probe_results[_probe_results["run_id"].isin(wanted_runs)]
    if probe_category:
        pr = pr[pr["probe_category"] == probe_category]
    if pr.empty:
        return []

    # Group on run_id rather than on name/provider/triggered_by: those can be
    # null, and pandas' default dropna=True would silently drop such groups.
    # Each latest run belongs to exactly one model, so the grouping is the same.
    grouped = (
        pr.groupby(["run_id", "probe_category"])
        .agg(total_pass=("pass_count", "sum"), total_fail=("fail_count", "sum"), score=("score", "mean"))
        .reset_index()
    )
    if grouped.empty:
        return []

    run_to_model = {rid: mid for mid, rid in latest.items()}
    grouped["model_id"] = grouped["run_id"].map(run_to_model)

    # Vectorised pass rate; groups with no attempts get 0.0 instead of NaN.
    attempts = grouped["total_pass"] + grouped["total_fail"]
    grouped["pass_rate"] = (grouped["total_pass"] / attempts).where(attempts > 0, 0.0)

    grouped = grouped.sort_values(["score", "model_id", "probe_category"])

    # triggered_by of each latest run, looked up once; null falls back to "api".
    latest_runs = _runs[_runs["id"].isin(wanted_runs)].drop_duplicates("id")
    origin_by_run = dict(zip(latest_runs["id"], latest_runs["triggered_by"]))

    names, providers = _model_maps()

    rows = []
    for rec in grouped.to_dict(orient="records"):
        mid = rec["model_id"]
        origin = origin_by_run.get(rec["run_id"])
        rows.append({
            "model_id": mid,
            "model_name": names.get(mid, mid),
            "provider": providers.get(mid, ""),
            "probe_category": rec["probe_category"],
            "total_pass": int(rec["total_pass"]),
            "total_fail": int(rec["total_fail"]),
            "score": float(rec["score"]) if pd.notna(rec["score"]) else 0.0,
            "pass_rate": float(rec["pass_rate"]),
            "origin": origin if pd.notna(origin) else "api",
        })
    return rows


def get_model_detail(model_id: str) -> dict | None:
    """Return probe results for a model's latest complete run.

    Returns None when the model has no complete run or that run has no
    probe results.
    """
    run_id = _latest_run_ids().get(model_id)
    if not run_id:
        return None

    pr = _probe_rows_for_run(run_id)
    if pr.empty:
        return None

    model_name, provider = _model_info(model_id)
    return {
        "model_id": model_id,
        "model_name": model_name,
        "provider": provider,
        "run_id": run_id,
        "probe_results": _probe_result_records(pr),
    }


def get_risk_leaderboard(included_risks: list[str]) -> list[dict]:
    """Risk-based leaderboard: mean of per-risk pass rates across included categories.

    Results are ordered by overall pass rate, best first; models whose
    overall rate is None sort last.
    """
    if _probe_results.empty or _models.empty:
        return []

    latest = _latest_run_ids()
    if not latest:
        return []

    names, providers = _model_maps()
    inputs = _risk_inputs_by_run(latest.values())

    results = []
    for model_id, run_id in latest.items():
        per_risk, overall = compute_risk_pass_rates(inputs.get(run_id, []), included_risks=included_risks)
        results.append({
            "model_id": model_id,
            "model_name": names.get(model_id, model_id),
            "provider": providers.get(model_id, ""),
            "overall_pass_rate": overall,
            "per_risk": per_risk,
        })

    results.sort(key=lambda r: r["overall_pass_rate"] if r["overall_pass_rate"] is not None else -1, reverse=True)
    return results


def get_trends(model_id: str, included_risks: list[str]) -> dict | None:
    """All completed scan results for a model, ordered by date.

    Returns None when the model is unknown; otherwise a dict with one point
    per complete run (possibly none).
    """
    if _models.empty:
        return None
    model_row = _models[_models["id"] == model_id]
    if model_row.empty:
        return None
    model_name = model_row.iloc[0]["name"]

    points = []
    if not _runs.empty:
        runs = _runs[(_runs["model_id"] == model_id) & (_runs["status"] == "complete")]
        runs = runs.sort_values("completed_at")
        inputs = _risk_inputs_by_run(runs["id"])

        for run_id, completed_at in zip(runs["id"], runs["completed_at"]):
            per_risk, overall = compute_risk_pass_rates(inputs.get(run_id, []), included_risks=included_risks)
            if hasattr(completed_at, "isoformat"):
                completed_at = completed_at.isoformat()
            points.append({
                "run_id": run_id,
                "completed_at": completed_at,
                "per_risk": per_risk,
                "overall_pass_rate": overall,
            })

    return {"model_id": model_id, "model_name": model_name, "points": points}


def get_run_summary() -> list[dict]:
    """Per-model run status counts.

    ``latest_origin`` is the last non-null ``triggered_by`` among the model's
    complete runs ordered by ``completed_at``, or "" when there is none.
    """
    if _runs.empty or _models.empty:
        return []

    names, providers = _model_maps()

    summary = (
        _runs.groupby(["model_id", "status"])
        .size()
        .unstack(fill_value=0)
        .reset_index()
    )
    # Guarantee every status column exists even if no run has that status.
    for col in ("complete", "running", "pending", "failed"):
        if col not in summary.columns:
            summary[col] = 0

    # Latest origin per model
    latest_origin = (
        _runs[_runs["status"] == "complete"]
        .sort_values("completed_at")
        .groupby("model_id")["triggered_by"]
        .last()
    )

    rows = []
    for rec in summary.to_dict(orient="records"):
        mid = rec["model_id"]
        origin = latest_origin.get(mid, "")
        rows.append({
            "model_name": names.get(mid, mid),
            "provider": providers.get(mid, ""),
            "complete": int(rec.get("complete", 0)),
            "running": int(rec.get("running", 0)),
            "pending": int(rec.get("pending", 0)),
            "failed": int(rec.get("failed", 0)),
            # A model whose complete runs all lack triggered_by yields null here.
            "latest_origin": origin if pd.notna(origin) else "",
        })
    return rows


def get_runs(status: str | None = None) -> list[dict]:
    """Recent runs (up to 200), optionally filtered by status.

    Args:
        status: exact status to keep; None, "" or "All" disables the filter.

    Returns:
        Newest-first list of run dicts. Missing timestamps and versions are
        rendered as "".
    """
    if _runs.empty or _models.empty:
        return []

    names, _ = _model_maps()

    runs = _runs
    if status and status != "All":
        runs = runs[runs["status"] == status]
    runs = runs.sort_values("created_at", ascending=False).head(200)

    rows = []
    for r in runs.to_dict(orient="records"):
        version = r.get("garak_version")
        created_at = r["created_at"]
        completed_at = r.get("completed_at")
        rows.append({
            "id": r["id"],
            "model_id": r["model_id"],
            "model_name": names.get(r["model_id"], r["model_id"]),
            "status": r["status"],
            # NaN is truthy, so `version or ""` would leak NaN for null versions.
            "garak_version": version if pd.notna(version) and version else "",
            "created_at": created_at.isoformat() if pd.notna(created_at) else "",
            "completed_at": completed_at.isoformat() if pd.notna(completed_at) else "",
        })
    return rows


def get_run_probe_results(run_id: str) -> list[dict]:
    """All probe results for a specific run (empty list when there are none)."""
    pr = _probe_rows_for_run(run_id)
    if pr.empty:
        return []
    return _probe_result_records(pr)