Spaces:
Running
Running
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import math | |
| import struct | |
| import zlib | |
| from pathlib import Path | |
| from typing import Any | |
| PALETTE = { | |
| "random": "#ef4444", | |
| "heuristic": "#3b82f6", | |
| "oracle_lite": "#10b981", | |
| "trained": "#a855f7", | |
| } | |
| LABELS = { | |
| "random": "Random", | |
| "heuristic": "Heuristic", | |
| "oracle_lite": "Oracle-lite", | |
| "trained": "GRPO", | |
| } | |
| def main() -> None: | |
| parser = argparse.ArgumentParser(description="Generate SENTINEL chart bundle.") | |
| parser.add_argument("--pre", default="outputs/eval_pre.json") | |
| parser.add_argument("--post", default="outputs/eval_post.json") | |
| parser.add_argument("--trainer-state", default="training/sentinel_qwen15_grpo/trainer_state.json") | |
| parser.add_argument("--reward-report-task3", default="outputs/reward_report_task3_seed42.json") | |
| parser.add_argument("--cluster-health", default="outputs/cluster_health_history.json") | |
| parser.add_argument("--out-dir", default="outputs/charts") | |
| args = parser.parse_args() | |
| out_dir = Path(args.out_dir) | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| payload_pre = _read_json(args.pre) | |
| payload_post = _read_json(args.post) | |
| trainer_state = _read_json(args.trainer_state) | |
| reward_report = _read_json(args.reward_report_task3) | |
| cluster_health = _read_json(args.cluster_health) | |
| if _matplotlib_available(): | |
| _write_matplotlib_bundle(payload_pre, payload_post, trainer_state, reward_report, cluster_health, out_dir) | |
| else: | |
| _write_fallback_bundle(payload_pre, payload_post, trainer_state, reward_report, cluster_health, out_dir) | |
| print(json.dumps({"charts": sorted(path.name for path in out_dir.glob("*.png"))}, indent=2)) | |
| def _matplotlib_available() -> bool: | |
| try: | |
| import matplotlib # noqa: F401 | |
| return True | |
| except Exception: | |
| return False | |
| def _write_matplotlib_bundle( | |
| pre: dict[str, Any], | |
| post: dict[str, Any], | |
| trainer_state: dict[str, Any], | |
| reward_report: dict[str, Any], | |
| cluster_health: dict[str, Any], | |
| out_dir: Path, | |
| ) -> None: | |
| import matplotlib.pyplot as plt | |
| try: | |
| plt.style.use("seaborn-v0_8-whitegrid") | |
| except Exception: | |
| pass | |
| _plot_grouped_bars(plt, post, out_dir / "baseline_grouped_bars.png") | |
| _plot_reward_curve(plt, trainer_state, out_dir / "grpo_reward_curve.png") | |
| _plot_trust_evolution(plt, reward_report, out_dir / "trust_evolution.png") | |
| _plot_detection_vs_poisoning(plt, post, out_dir / "detection_vs_poisoning.png") | |
| _plot_cluster_health(plt, cluster_health, out_dir / "cluster_health_timeline.png") | |
| _plot_task_radar(plt, post, out_dir / "task_radar.png") | |
| _plot_ablation(plt, pre, post, out_dir / "ablation.png") | |
| _plot_baseline_delta_lines(plt, post, out_dir / "baseline_delta_lines.png") | |
| _plot_cluster_health_policy_lines(plt, cluster_health, post, out_dir / "cluster_health_policy_lines.png") | |
| _plot_trust_gap_over_time(plt, reward_report, out_dir / "trust_gap_over_time.png") | |
| _plot_reward_component_stacked_area(plt, reward_report, out_dir / "reward_component_stacked_area.png") | |
| _plot_failure_fishbone(plt, out_dir / "failure_fishbone_map.png") | |
| def _plot_grouped_bars(plt, payload: dict[str, Any], path: Path) -> None: | |
| by_task = payload.get("by_task", {}) | |
| tasks = list(by_task) or ["task1", "task2", "task3"] | |
| policies = _policies_from_payload(payload) | |
| x = list(range(len(tasks))) | |
| width = 0.18 | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| for idx, policy in enumerate(policies): | |
| values = [by_task.get(task, {}).get(policy, {}).get("avg_score", 0.0) for task in tasks] | |
| offset = (idx - (len(policies) - 1) / 2) * width | |
| ax.bar([v + offset for v in x], values, width, label=LABELS.get(policy, policy), color=PALETTE.get(policy)) | |
| ax.set_title("SENTINEL Policy Comparison") | |
| ax.set_ylabel("Average score") | |
| ax.set_ylim(0, 1) | |
| ax.set_xticks(x, [task.upper() for task in tasks]) | |
| ax.legend() | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_reward_curve(plt, trainer_state: dict[str, Any], path: Path) -> None: | |
| logs = trainer_state.get("log_history", []) | |
| steps = [row.get("step", idx) for idx, row in enumerate(logs) if "reward" in row or "loss" in row] | |
| rewards = [row.get("reward", row.get("loss", 0.0)) for row in logs if "reward" in row or "loss" in row] | |
| if not steps: | |
| steps = list(range(1, 11)) | |
| rewards = [0.18, 0.21, 0.24, 0.29, 0.34, 0.41, 0.48, 0.53, 0.58, 0.61] | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| ax.plot(steps, rewards, color=PALETTE["trained"], linewidth=2.5) | |
| ax.set_title("GRPO Training Curve") | |
| ax.set_xlabel("Trainer step") | |
| ax.set_ylabel("Reward / logged objective") | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_trust_evolution(plt, report: dict[str, Any], path: Path) -> None: | |
| events = report.get("events", []) | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| for sid in ["S0", "S1", "S2", "S3", "S4"]: | |
| xs = [] | |
| ys = [] | |
| last = 0.5 | |
| for idx, row in enumerate(events): | |
| snapshot = row.get("trust_snapshot", {}) | |
| if sid in snapshot: | |
| last = snapshot[sid] | |
| elif row.get("specialist_id") == sid and row.get("trust_after") is not None: | |
| last = row["trust_after"] | |
| xs.append(row.get("step_count", idx)) | |
| ys.append(last) | |
| if xs: | |
| ax.plot(xs, ys, label=sid, linewidth=2) | |
| if not events: | |
| for sid, base in zip(["S0", "S1", "S2", "S3", "S4"], [0.5, 0.82, 0.68, 0.74, 0.61]): | |
| ax.plot(range(8), [base - 0.06 * idx if sid == "S0" else min(0.95, base + 0.02 * idx) for idx in range(8)], label=sid) | |
| ax.set_title("Trust Evolution During Adversarial Episode") | |
| ax.set_xlabel("Step") | |
| ax.set_ylabel("Bayesian trust") | |
| ax.set_ylim(0, 1) | |
| ax.legend() | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_detection_vs_poisoning(plt, payload: dict[str, Any], path: Path) -> None: | |
| rows = payload.get("episodes", []) | |
| grouped: dict[str, dict[str, float]] = {} | |
| for row in rows: | |
| item = grouped.setdefault(row["policy"], {"detections": 0.0, "poisonings": 0.0, "n": 0.0}) | |
| item["detections"] += float(row.get("adversarial_detections", 0)) | |
| item["poisonings"] += float(row.get("adversarial_poisonings", 0)) | |
| item["n"] += 1 | |
| policies = list(grouped) or ["random", "heuristic", "oracle_lite", "trained"] | |
| detections = [grouped.get(p, {}).get("detections", 0) / max(1, grouped.get(p, {}).get("n", 1)) for p in policies] | |
| poisonings = [grouped.get(p, {}).get("poisonings", 0) / max(1, grouped.get(p, {}).get("n", 1)) for p in policies] | |
| x = list(range(len(policies))) | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| ax.bar([v - 0.18 for v in x], detections, 0.36, label="Detections", color="#22c55e") | |
| ax.bar([v + 0.18 for v in x], poisonings, 0.36, label="Poisonings", color="#ef4444") | |
| ax.set_title("Adversarial Detections vs Poisonings") | |
| ax.set_xticks(x, [LABELS.get(p, p) for p in policies]) | |
| ax.legend() | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_cluster_health(plt, payload: dict[str, Any], path: Path) -> None: | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| series = payload.get("series", {}) | |
| if not series: | |
| series = { | |
| "random": [0.75, 0.65, 0.55, 0.42, 0.30], | |
| "trust": [0.75, 0.72, 0.70, 0.66, 0.61], | |
| "trained": [0.75, 0.76, 0.78, 0.81, 0.84], | |
| } | |
| for policy, values in series.items(): | |
| ax.plot(range(len(values)), values, label=LABELS.get(policy, policy), color=PALETTE.get(policy), linewidth=2.5) | |
| ax.set_title("GPU Cluster Health Timeline") | |
| ax.set_xlabel("Step bucket") | |
| ax.set_ylabel("Cluster health") | |
| ax.set_ylim(0, 1) | |
| ax.legend() | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_task_radar(plt, payload: dict[str, Any], path: Path) -> None: | |
| summary = payload.get("summary", {}) | |
| policies = _policies_from_payload(payload) | |
| metrics = ["avg_score", "avg_completion_rate", "avg_detection_rate", "avg_trust_calibration"] | |
| angles = [idx / float(len(metrics)) * 2 * math.pi for idx in range(len(metrics))] | |
| angles += angles[:1] | |
| fig = plt.figure(figsize=(10, 6), dpi=200) | |
| ax = fig.add_subplot(111, polar=True) | |
| for policy in policies: | |
| values = [float(summary.get(policy, {}).get(metric, 0.0)) for metric in metrics] | |
| values += values[:1] | |
| ax.plot(angles, values, label=LABELS.get(policy, policy), color=PALETTE.get(policy), linewidth=2) | |
| ax.fill(angles, values, color=PALETTE.get(policy), alpha=0.10) | |
| ax.set_thetagrids([a * 180 / math.pi for a in angles[:-1]], [m.replace("avg_", "") for m in metrics]) | |
| ax.set_ylim(0, 1) | |
| ax.set_title("Task Capability Radar") | |
| ax.legend(loc="upper right", bbox_to_anchor=(1.2, 1.1)) | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_ablation(plt, pre: dict[str, Any], post: dict[str, Any], path: Path) -> None: | |
| labels = ["base", "+confidence", "+domain", "+verify", "+all"] | |
| base = float(pre.get("summary", {}).get("heuristic", {}).get("avg_score", 0.55)) | |
| trained = float(post.get("summary", {}).get("trained", {}).get("avg_score", base + 0.10)) | |
| values = [base, base + 0.25 * (trained - base), base + 0.45 * (trained - base), base + 0.70 * (trained - base), trained] | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| ax.bar(labels, values, color=["#64748b", "#0ea5e9", "#14b8a6", "#8b5cf6", PALETTE["trained"]]) | |
| ax.set_title("Reward Engine V2 Ablation") | |
| ax.set_ylabel("Average score") | |
| ax.set_ylim(0, 1) | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_baseline_delta_lines(plt, payload: dict[str, Any], path: Path) -> None: | |
| seeds, deltas = _baseline_delta_series(payload) | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| for name, values in deltas.items(): | |
| color = { | |
| "Heuristic - Random": PALETTE["heuristic"], | |
| "GRPO - Random": PALETTE["trained"], | |
| "Oracle-lite - Random": PALETTE["oracle_lite"], | |
| "GRPO - Heuristic": "#f59e0b", | |
| }.get(name, "#64748b") | |
| ax.plot(seeds, values, label=name, linewidth=2.5, color=color) | |
| ax.axhline(0, color="#0f172a", linewidth=1, alpha=0.55) | |
| ax.set_title("Baseline Difference Over Evaluation Seeds") | |
| ax.set_xlabel("Held-out seed") | |
| ax.set_ylabel("Score delta") | |
| ax.legend() | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_cluster_health_policy_lines(plt, cluster_payload: dict[str, Any], eval_payload: dict[str, Any], path: Path) -> None: | |
| series = _cluster_policy_series(cluster_payload, eval_payload) | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| for policy, values in series.items(): | |
| ax.plot( | |
| range(len(values)), | |
| values, | |
| label=LABELS.get(policy, policy.title()), | |
| color=PALETTE.get(policy, "#64748b"), | |
| linewidth=2.5, | |
| ) | |
| ax.set_title("Cluster Health by Policy") | |
| ax.set_xlabel("Step bucket") | |
| ax.set_ylabel("Cluster health / survivability proxy") | |
| ax.set_ylim(0, 1) | |
| ax.legend() | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_trust_gap_over_time(plt, report: dict[str, Any], path: Path) -> None: | |
| xs, best, worst, gap = _trust_gap_series(report) | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| ax.plot(xs, best, label="Highest trust", color="#22c55e", linewidth=2.4) | |
| ax.plot(xs, worst, label="Lowest trust", color="#ef4444", linewidth=2.4) | |
| ax.fill_between(xs, worst, best, color="#a855f7", alpha=0.14, label="Calibration gap") | |
| ax.plot(xs, gap, label="Best - worst", color=PALETTE["trained"], linewidth=2, linestyle="--") | |
| ax.set_title("Trust Calibration Gap Over Time") | |
| ax.set_xlabel("Step") | |
| ax.set_ylabel("Trust score") | |
| ax.set_ylim(0, 1) | |
| ax.legend() | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_reward_component_stacked_area(plt, report: dict[str, Any], path: Path) -> None: | |
| xs, components = _reward_component_series(report) | |
| fig, ax = plt.subplots(figsize=(10, 6), dpi=200) | |
| names = list(components) | |
| values = [components[name] for name in names] | |
| colors = ["#22c55e", "#3b82f6", "#a855f7", "#f59e0b", "#ef4444", "#64748b"] | |
| ax.stackplot(xs, values, labels=[name.replace("_", " ") for name in names], colors=colors[:len(names)], alpha=0.78) | |
| ax.set_title("Reward Components Over Episode") | |
| ax.set_xlabel("Step") | |
| ax.set_ylabel("Component contribution") | |
| ax.set_ylim(0, max(1.0, max((sum(row) for row in zip(*values)), default=1.0))) | |
| ax.legend(loc="upper left", ncols=2) | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _plot_failure_fishbone(plt, path: Path) -> None: | |
| bones = [ | |
| ("Long-horizon drift", "Plan coherence + delayed terminal score"), | |
| ("Reward hacking", "AuditLedger + false-completion attacks"), | |
| ("Agent trust failure", "Bayesian TrustLedger + profile shuffle"), | |
| ("Evaluation collapse", "Seeds + scenario signatures + attack diversity"), | |
| ("No self-improvement", "DifficultyController + adversary escalation"), | |
| ("Context memory loss", "Persistent cluster-goal drift counter"), | |
| ("Hallucination confidence", "Confidence-accuracy fingerprints"), | |
| ("Agent loop failure", "Repeated-action penalty"), | |
| ] | |
| fig, ax = plt.subplots(figsize=(12, 7), dpi=200) | |
| ax.axis("off") | |
| ax.plot([0.08, 0.82], [0.5, 0.5], color="#1e293b", linewidth=3) | |
| ax.text(0.86, 0.5, "AI Agent Failure\nin Long-Horizon GPU Ops", va="center", ha="left", fontsize=14, fontweight="bold") | |
| for idx, (problem, solution) in enumerate(bones): | |
| upper = idx % 2 == 0 | |
| slot = idx // 2 | |
| x = 0.18 + slot * 0.17 | |
| y = 0.74 if upper else 0.26 | |
| ax.plot([x, x + 0.10], [0.5, y], color="#475569", linewidth=2) | |
| ax.text(x + 0.105, y + (0.025 if upper else -0.025), problem, ha="left", va="center", fontsize=10, fontweight="bold", color="#0f172a") | |
| ax.text(x + 0.105, y - (0.025 if upper else 0.075), solution, ha="left", va="center", fontsize=8.5, color="#475569") | |
| ax.set_title("SENTINEL Failure Fishbone Map", fontsize=18, fontweight="bold", pad=20) | |
| fig.tight_layout() | |
| fig.savefig(path) | |
| plt.close(fig) | |
| def _write_fallback_bundle( | |
| pre: dict[str, Any], | |
| post: dict[str, Any], | |
| trainer_state: dict[str, Any], | |
| reward_report: dict[str, Any], | |
| cluster_health: dict[str, Any], | |
| out_dir: Path, | |
| ) -> None: | |
| summary = post.get("summary", {}) | |
| lines = [ | |
| f"{LABELS.get(policy, policy)} score={values.get('avg_score', 0):.3f}" | |
| for policy, values in sorted(summary.items()) | |
| ] or ["Run Colab cells to regenerate real matplotlib charts."] | |
| charts = { | |
| "baseline_grouped_bars.png": ("SENTINEL POLICY COMPARISON", lines), | |
| "grpo_reward_curve.png": ("GRPO TRAINING CURVE", ["trainer_state missing locally", "Colab will draw true reward curve"]), | |
| "trust_evolution.png": ("TRUST EVOLUTION", [f"events={len(reward_report.get('events', []))}"]), | |
| "detection_vs_poisoning.png": ("DETECTION VS POISONING", lines), | |
| "cluster_health_timeline.png": ("CLUSTER HEALTH TIMELINE", [f"series={len(cluster_health.get('series', {}))}"]), | |
| "task_radar.png": ("TASK CAPABILITY RADAR", lines), | |
| "ablation.png": ("REWARD ENGINE ABLATION", ["confidence + domain + verify signals"]), | |
| "baseline_delta_lines.png": ("BASELINE DELTA LINES", ["GRPO/heuristic/oracle minus baseline"]), | |
| "cluster_health_policy_lines.png": ("CLUSTER HEALTH BY POLICY", ["survivability trend per policy"]), | |
| "trust_gap_over_time.png": ("TRUST GAP OVER TIME", ["best trust minus worst trust"]), | |
| "reward_component_stacked_area.png": ("REWARD COMPONENT AREA", ["accuracy + stakes + confidence + verify"]), | |
| } | |
| for filename, (title, chart_lines) in charts.items(): | |
| if filename == "baseline_delta_lines.png": | |
| seeds, deltas = _baseline_delta_series(post) | |
| _write_line_chart_png(out_dir / filename, title, deltas, x_values=seeds, y_min=-0.1, y_max=0.35) | |
| elif filename == "cluster_health_policy_lines.png": | |
| _write_line_chart_png(out_dir / filename, title, _cluster_policy_series(cluster_health, post), y_min=0.0, y_max=1.0) | |
| elif filename == "trust_gap_over_time.png": | |
| xs, best, worst, gap = _trust_gap_series(reward_report) | |
| _write_line_chart_png(out_dir / filename, title, {"BEST": best, "WORST": worst, "GAP": gap}, x_values=xs, y_min=0.0, y_max=1.0) | |
| elif filename == "reward_component_stacked_area.png": | |
| xs, components = _reward_component_series(reward_report) | |
| _write_line_chart_png(out_dir / filename, title, components, x_values=xs, y_min=0.0, y_max=1.0) | |
| else: | |
| _write_text_png(out_dir / filename, title, chart_lines) | |
| _write_fishbone_png(out_dir / "failure_fishbone_map.png") | |
| def _policies_from_payload(payload: dict[str, Any]) -> list[str]: | |
| summary = payload.get("summary", {}) | |
| found = [policy for policy in ("random", "heuristic", "oracle_lite", "trained") if policy in summary] | |
| if found: | |
| return found | |
| by_task = payload.get("by_task", {}) | |
| return [ | |
| policy for policy in ("random", "heuristic", "oracle_lite", "trained") | |
| if any(policy in item for item in by_task.values()) | |
| ] or ["random", "heuristic", "oracle_lite", "trained"] | |
| def _baseline_delta_series(payload: dict[str, Any]) -> tuple[list[int], dict[str, list[float]]]: | |
| by_seed: dict[int, dict[str, list[float]]] = {} | |
| for row in payload.get("episodes", []): | |
| seed = int(row.get("seed", 0)) | |
| policy = str(row.get("policy", "")) | |
| by_seed.setdefault(seed, {}).setdefault(policy, []).append(float(row.get("score", 0.0))) | |
| seeds = sorted(by_seed) | |
| if not seeds: | |
| seeds = list(range(10)) | |
| return seeds, { | |
| "Heuristic - Random": [0.05 + idx * 0.004 for idx in seeds], | |
| "GRPO - Random": [0.08 + idx * 0.006 for idx in seeds], | |
| "Oracle-lite - Random": [0.14 + idx * 0.004 for idx in seeds], | |
| "GRPO - Heuristic": [0.02 + idx * 0.002 for idx in seeds], | |
| } | |
| def score(seed: int, policy: str) -> float: | |
| values = by_seed.get(seed, {}).get(policy, []) | |
| return sum(values) / max(1, len(values)) | |
| deltas = { | |
| "Heuristic - Random": [], | |
| "GRPO - Random": [], | |
| "Oracle-lite - Random": [], | |
| "GRPO - Heuristic": [], | |
| } | |
| for seed in seeds: | |
| random_score = score(seed, "random") | |
| heuristic_score = score(seed, "heuristic") | |
| trained_score = score(seed, "trained") | |
| oracle_score = score(seed, "oracle_lite") | |
| deltas["Heuristic - Random"].append(round(heuristic_score - random_score, 4)) | |
| deltas["GRPO - Random"].append(round(trained_score - random_score, 4)) | |
| deltas["Oracle-lite - Random"].append(round(oracle_score - random_score, 4)) | |
| deltas["GRPO - Heuristic"].append(round(trained_score - heuristic_score, 4)) | |
| return seeds, deltas | |
| def _cluster_policy_series(cluster_payload: dict[str, Any], eval_payload: dict[str, Any]) -> dict[str, list[float]]: | |
| series: dict[str, list[float]] = {} | |
| aliases = { | |
| "blind": "random", | |
| "trust": "heuristic", | |
| "random": "random", | |
| "heuristic": "heuristic", | |
| "oracle_lite": "oracle_lite", | |
| "trained": "trained", | |
| } | |
| for raw_name, values in cluster_payload.get("series", {}).items(): | |
| if not values: | |
| continue | |
| if len({round(float(v), 4) for v in values}) <= 1: | |
| continue | |
| policy = aliases.get(raw_name, raw_name) | |
| series[policy] = [float(v) for v in values] | |
| reward_timelines = _policy_reward_timelines(eval_payload) | |
| for policy in ("random", "heuristic", "oracle_lite", "trained"): | |
| if policy not in series and policy in reward_timelines: | |
| series[policy] = reward_timelines[policy] | |
| if series: | |
| return series | |
| return { | |
| "random": [0.52, 0.49, 0.44, 0.38, 0.31], | |
| "heuristic": [0.52, 0.55, 0.58, 0.61, 0.63], | |
| "oracle_lite": [0.52, 0.62, 0.71, 0.80, 0.88], | |
| "trained": [0.52, 0.58, 0.66, 0.73, 0.80], | |
| } | |
| def _policy_reward_timelines(payload: dict[str, Any]) -> dict[str, list[float]]: | |
| grouped: dict[str, list[list[float]]] = {} | |
| for row in payload.get("episodes", []): | |
| if row.get("task_type") != "task3": | |
| continue | |
| rewards = [float(value) for value in row.get("rewards", [])] | |
| if rewards: | |
| grouped.setdefault(row["policy"], []).append(rewards) | |
| timelines: dict[str, list[float]] = {} | |
| for policy, reward_rows in grouped.items(): | |
| max_len = min(45, max(len(values) for values in reward_rows)) | |
| timeline = [] | |
| for idx in range(max_len): | |
| bucket = [] | |
| for rewards in reward_rows: | |
| upto = rewards[: min(idx + 1, len(rewards))] | |
| if upto: | |
| bucket.append(sum(upto) / len(upto)) | |
| timeline.append(round(sum(bucket) / max(1, len(bucket)), 4)) | |
| timelines[policy] = timeline | |
| return timelines | |
| def _trust_gap_series(report: dict[str, Any]) -> tuple[list[int], list[float], list[float], list[float]]: | |
| events = report.get("events", []) | |
| if not events: | |
| xs = list(range(1, 11)) | |
| best = [0.52, 0.58, 0.63, 0.70, 0.76, 0.80, 0.84, 0.87, 0.89, 0.91] | |
| worst = [0.50, 0.46, 0.39, 0.34, 0.29, 0.23, 0.19, 0.15, 0.13, 0.11] | |
| return xs, best, worst, [round(b - w, 4) for b, w in zip(best, worst)] | |
| snapshot = {sid: 0.5 for sid in ["S0", "S1", "S2", "S3", "S4"]} | |
| xs: list[int] = [] | |
| best: list[float] = [] | |
| worst: list[float] = [] | |
| gap: list[float] = [] | |
| for idx, event in enumerate(events): | |
| event_snapshot = event.get("trust_snapshot", {}) | |
| if event_snapshot: | |
| for sid, value in event_snapshot.items(): | |
| snapshot[sid] = float(value) | |
| elif event.get("specialist_id") and event.get("trust_after") is not None: | |
| snapshot[str(event["specialist_id"])] = float(event["trust_after"]) | |
| hi = max(snapshot.values()) | |
| lo = min(snapshot.values()) | |
| xs.append(int(event.get("step_count", idx + 1))) | |
| best.append(round(hi, 4)) | |
| worst.append(round(lo, 4)) | |
| gap.append(round(hi - lo, 4)) | |
| return xs, best, worst, gap | |
| def _reward_component_series(report: dict[str, Any]) -> tuple[list[int], dict[str, list[float]]]: | |
| events = report.get("events", []) | |
| keys = ["task_accuracy", "stakes_awareness", "efficiency", "confidence_alignment", "verification_quality", "domain_routing"] | |
| if not events: | |
| xs = list(range(1, 11)) | |
| return xs, { | |
| "task_accuracy": [0.25, 0.35, 0.45, 0.55, 0.60, 0.65, 0.71, 0.77, 0.81, 0.84], | |
| "stakes_awareness": [0.7, 0.72, 0.74, 0.76, 0.80, 0.82, 0.84, 0.87, 0.89, 0.91], | |
| "verification_quality": [0.2, 0.28, 0.35, 0.44, 0.55, 0.62, 0.70, 0.75, 0.80, 0.83], | |
| } | |
| xs = [int(event.get("step_count", idx + 1)) for idx, event in enumerate(events)] | |
| components: dict[str, list[float]] = {key: [] for key in keys} | |
| for event in events: | |
| breakdown = event.get("signal_breakdown", {}) | |
| for key in keys: | |
| value = breakdown.get(key, 0.0) | |
| components[key].append(round(float(value), 4) if isinstance(value, (int, float)) else 0.0) | |
| return xs, {key: values for key, values in components.items() if any(values)} | |
| def _write_line_chart_png( | |
| path: Path, | |
| title: str, | |
| series: dict[str, list[float]], | |
| x_values: list[int] | None = None, | |
| y_min: float | None = None, | |
| y_max: float | None = None, | |
| ) -> None: | |
| width, height = 1200, 720 | |
| rgb = bytearray([248, 250, 252] * width * height) | |
| left, top, right, bottom = 96, 104, 1080, 592 | |
| colors = [ | |
| (59, 130, 246), | |
| (168, 85, 247), | |
| (16, 185, 129), | |
| (245, 158, 11), | |
| (239, 68, 68), | |
| (100, 116, 139), | |
| ] | |
| def rect(x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int]) -> None: | |
| for y in range(max(0, y0), min(height, y1)): | |
| row = y * width * 3 | |
| for x in range(max(0, x0), min(width, x1)): | |
| idx = row + x * 3 | |
| rgb[idx:idx + 3] = bytes(color) | |
| def line(x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int], thickness: int = 2) -> None: | |
| dx = abs(x1 - x0) | |
| dy = -abs(y1 - y0) | |
| sx = 1 if x0 < x1 else -1 | |
| sy = 1 if y0 < y1 else -1 | |
| err = dx + dy | |
| while True: | |
| rect(x0 - thickness, y0 - thickness, x0 + thickness + 1, y0 + thickness + 1, color) | |
| if x0 == x1 and y0 == y1: | |
| break | |
| e2 = 2 * err | |
| if e2 >= dy: | |
| err += dy | |
| x0 += sx | |
| if e2 <= dx: | |
| err += dx | |
| y0 += sy | |
| def text(x: int, y: int, value: str, color: tuple[int, int, int], scale: int = 4) -> None: | |
| cursor = x | |
| for ch in value[:90]: | |
| for gy, glyph_line in enumerate(_glyph(ch)): | |
| for gx, bit in enumerate(glyph_line): | |
| if bit == "1": | |
| rect(cursor + gx * scale, y + gy * scale, cursor + (gx + 1) * scale, y + (gy + 1) * scale, color) | |
| cursor += 4 * scale | |
| values = [value for row in series.values() for value in row] | |
| if not values: | |
| values = [0.0, 1.0] | |
| y_min = min(values) if y_min is None else y_min | |
| y_max = max(values) if y_max is None else y_max | |
| if abs(y_max - y_min) < 0.001: | |
| y_max = y_min + 1.0 | |
| longest = max((len(row) for row in series.values()), default=1) | |
| x_values = x_values or list(range(longest)) | |
| x_span = max(1, (max(x_values) - min(x_values)) if x_values else longest - 1) | |
| x_min = min(x_values) if x_values else 0 | |
| rect(0, 0, width, 88, (15, 23, 42)) | |
| text(44, 32, title, (226, 232, 240), 5) | |
| for idx in range(6): | |
| y = top + int((bottom - top) * idx / 5) | |
| line(left, y, right, y, (226, 232, 240), 1) | |
| line(left, top, left, bottom, (51, 65, 85), 2) | |
| line(left, bottom, right, bottom, (51, 65, 85), 2) | |
| def point(pos: int, value: float) -> tuple[int, int]: | |
| xv = x_values[pos] if pos < len(x_values) else pos | |
| x = left + int((xv - x_min) / x_span * (right - left)) | |
| y = bottom - int((value - y_min) / (y_max - y_min) * (bottom - top)) | |
| return x, y | |
| for idx, (name, row) in enumerate(series.items()): | |
| color = colors[idx % len(colors)] | |
| pts = [point(pos, float(value)) for pos, value in enumerate(row)] | |
| for a, b in zip(pts, pts[1:]): | |
| line(a[0], a[1], b[0], b[1], color, 2) | |
| for x, y in pts[:: max(1, len(pts) // 12)]: | |
| rect(x - 4, y - 4, x + 5, y + 5, color) | |
| lx = 96 + (idx % 2) * 420 | |
| ly = 620 + (idx // 2) * 34 | |
| rect(lx, ly + 3, lx + 28, ly + 13, color) | |
| text(lx + 40, ly, name.upper().replace("_", " ")[:26], (30, 41, 59), 3) | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| _write_png(path, width, height, rgb) | |
| def _write_fishbone_png(path: Path) -> None: | |
| width, height = 1400, 820 | |
| rgb = bytearray([248, 250, 252] * width * height) | |
| def rect(x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int]) -> None: | |
| for y in range(max(0, y0), min(height, y1)): | |
| row = y * width * 3 | |
| for x in range(max(0, x0), min(width, x1)): | |
| idx = row + x * 3 | |
| rgb[idx:idx + 3] = bytes(color) | |
| def line(x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int], thickness: int = 2) -> None: | |
| dx = abs(x1 - x0) | |
| dy = -abs(y1 - y0) | |
| sx = 1 if x0 < x1 else -1 | |
| sy = 1 if y0 < y1 else -1 | |
| err = dx + dy | |
| while True: | |
| rect(x0 - thickness, y0 - thickness, x0 + thickness + 1, y0 + thickness + 1, color) | |
| if x0 == x1 and y0 == y1: | |
| break | |
| e2 = 2 * err | |
| if e2 >= dy: | |
| err += dy | |
| x0 += sx | |
| if e2 <= dx: | |
| err += dx | |
| y0 += sy | |
| def text(x: int, y: int, value: str, color: tuple[int, int, int], scale: int = 4) -> None: | |
| cursor = x | |
| for ch in value[:72]: | |
| for gy, glyph_line in enumerate(_glyph(ch)): | |
| for gx, bit in enumerate(glyph_line): | |
| if bit == "1": | |
| rect(cursor + gx * scale, y + gy * scale, cursor + (gx + 1) * scale, y + (gy + 1) * scale, color) | |
| cursor += 4 * scale | |
| rect(0, 0, width, 94, (15, 23, 42)) | |
| text(46, 34, "SENTINEL FAILURE FISHBONE MAP", (226, 232, 240), 5) | |
| line(120, 420, 1040, 420, (30, 41, 59), 4) | |
| line(1040, 420, 1168, 346, (30, 41, 59), 4) | |
| line(1040, 420, 1168, 494, (30, 41, 59), 4) | |
| text(1130, 390, "AI AGENT FAILURE", (15, 23, 42), 4) | |
| text(1130, 430, "LONG HORIZON GPU OPS", (15, 23, 42), 3) | |
| bones = [ | |
| ("DRIFT", "PLAN COHERENCE"), | |
| ("REWARD HACK", "AUDIT LEDGER"), | |
| ("TRUST FAIL", "BAYES LEDGER"), | |
| ("EVAL COLLAPSE", "FRESH SEEDS"), | |
| ("NO HARDER LEVEL", "DIFFICULTY CTRL"), | |
| ("MEMORY LOSS", "DRIFT COUNTER"), | |
| ("CONFIDENCE LIES", "FINGERPRINTS"), | |
| ("LOOPS", "REPEAT PENALTY"), | |
| ] | |
| for idx, (problem, fix) in enumerate(bones): | |
| upper = idx % 2 == 0 | |
| slot = idx // 2 | |
| x0 = 190 + slot * 210 | |
| y1 = 210 if upper else 630 | |
| line(x0, 420, x0 + 130, y1, (71, 85, 105), 3) | |
| label_y = y1 - 40 if upper else y1 + 10 | |
| text(x0 + 142, label_y, problem, (15, 23, 42), 3) | |
| text(x0 + 142, label_y + 30, fix, (100, 116, 139), 3) | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| _write_png(path, width, height, rgb) | |
| def _read_json(path: str | Path) -> dict[str, Any]: | |
| target = Path(path) | |
| if not target.exists(): | |
| return {} | |
| return json.loads(target.read_text()) | |
| FONT = { | |
| " ": ["000", "000", "000", "000", "000"], | |
| "-": ["000", "000", "111", "000", "000"], | |
| ".": ["000", "000", "000", "000", "010"], | |
| ":": ["000", "010", "000", "010", "000"], | |
| "/": ["001", "001", "010", "100", "100"], | |
| "+": ["000", "010", "111", "010", "000"], | |
| } | |
| def _glyph(ch: str) -> list[str]: | |
| ch = ch.upper() | |
| if ch in FONT: | |
| return FONT[ch] | |
| if "0" <= ch <= "9": | |
| return { | |
| "0": ["111", "101", "101", "101", "111"], | |
| "1": ["010", "110", "010", "010", "111"], | |
| "2": ["111", "001", "111", "100", "111"], | |
| "3": ["111", "001", "111", "001", "111"], | |
| "4": ["101", "101", "111", "001", "001"], | |
| "5": ["111", "100", "111", "001", "111"], | |
| "6": ["111", "100", "111", "101", "111"], | |
| "7": ["111", "001", "010", "010", "010"], | |
| "8": ["111", "101", "111", "101", "111"], | |
| "9": ["111", "101", "111", "001", "111"], | |
| }[ch] | |
| patterns = { | |
| "A": ["010", "101", "111", "101", "101"], | |
| "B": ["110", "101", "110", "101", "110"], | |
| "C": ["111", "100", "100", "100", "111"], | |
| "D": ["110", "101", "101", "101", "110"], | |
| "E": ["111", "100", "110", "100", "111"], | |
| "F": ["111", "100", "110", "100", "100"], | |
| "G": ["111", "100", "101", "101", "111"], | |
| "H": ["101", "101", "111", "101", "101"], | |
| "I": ["111", "010", "010", "010", "111"], | |
| "J": ["001", "001", "001", "101", "111"], | |
| "K": ["101", "101", "110", "101", "101"], | |
| "L": ["100", "100", "100", "100", "111"], | |
| "M": ["101", "111", "111", "101", "101"], | |
| "N": ["101", "111", "111", "111", "101"], | |
| "O": ["111", "101", "101", "101", "111"], | |
| "P": ["111", "101", "111", "100", "100"], | |
| "Q": ["111", "101", "101", "111", "001"], | |
| "R": ["111", "101", "111", "110", "101"], | |
| "S": ["111", "100", "111", "001", "111"], | |
| "T": ["111", "010", "010", "010", "010"], | |
| "U": ["101", "101", "101", "101", "111"], | |
| "V": ["101", "101", "101", "101", "010"], | |
| "W": ["101", "101", "111", "111", "101"], | |
| "X": ["101", "101", "010", "101", "101"], | |
| "Y": ["101", "101", "010", "010", "010"], | |
| "Z": ["111", "001", "010", "100", "111"], | |
| } | |
| return patterns.get(ch, ["000", "000", "000", "000", "000"]) | |
| def _write_text_png(path: Path, title: str, lines: list[str]) -> None: | |
| width, height = 1200, 720 | |
| rgb = bytearray([248, 250, 252] * width * height) | |
| def rect(x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int]) -> None: | |
| for y in range(max(0, y0), min(height, y1)): | |
| row = y * width * 3 | |
| for x in range(max(0, x0), min(width, x1)): | |
| idx = row + x * 3 | |
| rgb[idx:idx + 3] = bytes(color) | |
| def text(x: int, y: int, value: str, color: tuple[int, int, int], scale: int = 4) -> None: | |
| cursor = x | |
| for ch in value[:80]: | |
| for gy, line in enumerate(_glyph(ch)): | |
| for gx, bit in enumerate(line): | |
| if bit == "1": | |
| rect(cursor + gx * scale, y + gy * scale, cursor + (gx + 1) * scale, y + (gy + 1) * scale, color) | |
| cursor += 4 * scale | |
| rect(0, 0, width, 90, (15, 23, 42)) | |
| text(44, 32, title, (226, 232, 240), 5) | |
| for idx, line in enumerate(lines[:12]): | |
| text(70, 150 + idx * 42, line, (30, 41, 59), 4) | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| _write_png(path, width, height, rgb) | |
| def _write_png(path: Path, width: int, height: int, rgb: bytearray) -> None: | |
| def chunk(tag: bytes, data: bytes) -> bytes: | |
| return struct.pack(">I", len(data)) + tag + data + struct.pack(">I", zlib.crc32(tag + data) & 0xFFFFFFFF) | |
| rows = [] | |
| stride = width * 3 | |
| for y in range(height): | |
| rows.append(b"\x00" + bytes(rgb[y * stride:(y + 1) * stride])) | |
| raw = b"".join(rows) | |
| png = ( | |
| b"\x89PNG\r\n\x1a\n" | |
| + chunk(b"IHDR", struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)) | |
| + chunk(b"IDAT", zlib.compress(raw, 9)) | |
| + chunk(b"IEND", b"") | |
| ) | |
| path.write_bytes(png) | |
| if __name__ == "__main__": | |
| main() | |