Spaces:
Sleeping
Sleeping
| """Laptop-safe OpenEnv loop evaluation for ShadowOps.""" | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import random | |
| import statistics | |
| import struct | |
| import sys | |
| import zlib | |
| from pathlib import Path | |
| from typing import Any | |
# Backend root: the directory one level above the folder that holds this file.
BACKEND_DIR = Path(__file__).resolve().parents[1]
# Training assets and generated evaluation reports live under the backend root.
TRAINING_DIR = BACKEND_DIR / "training"
REPORTS_DIR = TRAINING_DIR / "reports"
# Put the backend root on sys.path (once) so the `openenv_shadowops` and
# `training.*` imports that follow can be resolved.
if str(BACKEND_DIR) not in sys.path:
    sys.path.insert(0, str(BACKEND_DIR))
| from openenv_shadowops import ShadowOpsOpenEnv # noqa: E402 | |
| from training.shadowops_training_common import ( # noqa: E402 | |
| build_q_aware_decision, | |
| compute_risk_score, | |
| evaluate_outputs, | |
| load_validation_samples_for_benchmark, | |
| write_json, | |
| ) | |
# Default run configuration. Runs with fewer than DEFAULT_EPISODES episodes are
# labelled as smoke tests by `_is_smoke_run`.
DEFAULT_EPISODES = 50
DEFAULT_EPISODE_MAX_LENGTH = 5
DEFAULT_POLICY = "q_aware"
DEFAULT_BASELINE_POLICY = "heuristic"
DEFAULT_BEHAVIOR_EXAMPLES = 8
# Every non-ALLOW action; a malicious step counts as passed when the chosen
# action is one of these.
TARGET_ACTIONS = {"BLOCK", "FORK", "QUARANTINE"}
# Expected action per incident tier, used for exact-match accuracy.
TIER_EXPECTED_ACTION = {
    "BENIGN_CLEAN": "ALLOW",
    "BENIGN_NOISY": "ALLOW",
    "AMBIGUOUS": "QUARANTINE",
    "MALICIOUS_SOFT": "BLOCK",
    "MALICIOUS_HARD": "FORK",
}
# Policy names the decision helpers in this file branch on.
SUPPORTED_POLICIES = ("random", "heuristic", "q_aware")
| def _clip(value: float, low: float = 0.0, high: float = 1.0) -> float: | |
| return max(low, min(high, float(value))) | |
| def _confidence_summary(values: list[float]) -> dict[str, float]: | |
| if not values: | |
| return {"mean": 0.0, "min": 0.0, "max": 0.0, "std": 0.0} | |
| if len(values) == 1: | |
| std = 0.0 | |
| else: | |
| std = float(statistics.pstdev(values)) | |
| return { | |
| "mean": float(statistics.mean(values)), | |
| "min": float(min(values)), | |
| "max": float(max(values)), | |
| "std": std, | |
| } | |
| def _reward_summary(values: list[float]) -> dict[str, float]: | |
| if not values: | |
| return {"mean": 0.0, "median": 0.0, "min": 0.0, "max": 0.0, "std": 0.0} | |
| if len(values) == 1: | |
| std = 0.0 | |
| else: | |
| std = float(statistics.pstdev(values)) | |
| return { | |
| "mean": float(statistics.mean(values)), | |
| "median": float(statistics.median(values)), | |
| "min": float(min(values)), | |
| "max": float(max(values)), | |
| "std": std, | |
| } | |
def _heuristic_action_and_confidence(risk: float, tier: str) -> tuple[str, float, str]:
    """Pick an action from fixed risk thresholds.

    Returns ``(action, confidence, reason)``:
    * risk >= 0.62 -> BLOCK
    * risk >= 0.38, or tier is AMBIGUOUS (case-insensitive) -> FORK
    * otherwise -> ALLOW
    Confidence grows with distance from the relevant threshold and is clipped
    to [0, 1].
    """
    tier_key = str(tier or "").upper()

    if risk >= 0.62:
        boost = min(0.35, (risk - 0.62) * 1.25)
        return "BLOCK", _clip(0.60 + boost), "heuristic high-risk threshold"

    escalate = risk >= 0.38 or tier_key == "AMBIGUOUS"
    if not escalate:
        boost = min(0.30, (0.38 - risk) * 1.2)
        return "ALLOW", _clip(0.58 + boost), "heuristic low-risk allow"

    distance_from_mid = abs(risk - 0.50)
    boost = min(0.22, distance_from_mid * 1.4)
    return "FORK", _clip(0.56 + boost), "heuristic uncertainty escalation"
def _expected_action(tier: str, is_malicious: bool) -> str:
    """Return the expected action for a tier.

    Known tiers (case-insensitive) come from ``TIER_EXPECTED_ACTION``; unknown
    tiers fall back to FORK for malicious incidents and ALLOW otherwise.
    """
    fallback = "FORK" if is_malicious else "ALLOW"
    return TIER_EXPECTED_ACTION.get(str(tier or "").upper(), fallback)
def _choose_policy_decision(
    env: ShadowOpsOpenEnv,
    observation: dict[str, Any],
    policy_name: str,
    rng: random.Random,
) -> dict[str, Any]:
    """Choose an action for the current observation under ``policy_name``.

    ``"random"`` picks uniformly from the available actions, ``"heuristic"``
    uses the fixed risk thresholds, and any other name is routed to
    ``build_q_aware_decision``. The result always has the keys ``action``,
    ``confidence`` and ``explanation``.
    """
    allowed = list(observation.get("available_actions") or ["ALLOW", "BLOCK", "FORK", "QUARANTINE"])
    incident = observation.get("incident_state", {})
    risk_vector = list(observation.get("risk_vector", [0.0] * 16))
    risk = float(compute_risk_score(risk_vector))

    if policy_name == "random":
        return {
            "action": rng.choice(allowed),
            "confidence": 0.25,
            "explanation": "random baseline action",
        }

    if policy_name == "heuristic":
        chosen, score, why = _heuristic_action_and_confidence(risk, str(incident.get("tier", "")))
        return {"action": chosen, "confidence": score, "explanation": why}

    verdict = build_q_aware_decision(
        incident.get("domain", "SOC"),
        incident.get("intent", "UNKNOWN"),
        incident.get("payload", ""),
        risk_vector,
        actor="openenv_agent",
        session_id=env.session_id,
        service=incident.get("domain", "unknown"),
        environment="production",
        provided_evidence=[],
        memory_context=env.state().get("memory_context", {}),
    )

    # Anything the environment does not offer is replaced by QUARANTINE.
    chosen = str(verdict.get("decision", "QUARANTINE")).upper()
    if chosen not in allowed:
        chosen = "QUARANTINE"

    # Without a reported confidence, derive one from the distance to mid-risk.
    score = verdict.get("confidence")
    if score is None:
        score = 0.50 + min(0.40, abs(risk - 0.50))

    default_text = "q-aware policy decision"
    explanation = str(verdict.get("explanation", "q-aware policy decision")).strip() or default_text
    return {
        "action": chosen,
        "confidence": _clip(float(score)),
        "explanation": explanation,
    }
| def _sample_policy_decision( | |
| sample: dict[str, Any], | |
| policy_name: str, | |
| rng: random.Random, | |
| ) -> dict[str, Any]: | |
| risk_vector = list(sample.get("risk_vector") or [0.0] * 16) | |
| risk = float(sample.get("risk_score", compute_risk_score(risk_vector))) | |
| domain = str(sample.get("domain", "SOC")) | |
| intent = str(sample.get("intent", "UNKNOWN")) | |
| payload = str(sample.get("raw_payload", "")) | |
| tier = str(sample.get("scenario_type", "")) | |
| if policy_name == "random": | |
| action = rng.choice(["ALLOW", "BLOCK", "FORK", "QUARANTINE"]) | |
| return {"action": action, "confidence": 0.25, "rationale": "random baseline"} | |
| if policy_name == "heuristic": | |
| action, confidence, reason = _heuristic_action_and_confidence(risk, tier) | |
| return {"action": action, "confidence": confidence, "rationale": reason} | |
| decision = build_q_aware_decision( | |
| domain, | |
| intent, | |
| payload, | |
| risk_vector, | |
| actor=str(sample.get("actor", "openenv_eval")), | |
| session_id=str(sample.get("session_id", "openenv-eval")), | |
| service=str(sample.get("service", domain.lower())), | |
| environment=str(sample.get("environment", "production")), | |
| provided_evidence=list(sample.get("provided_evidence", [])), | |
| memory_context={}, | |
| ) | |
| action = str(decision.get("decision", "QUARANTINE")).upper() | |
| if action not in {"ALLOW", "BLOCK", "FORK", "QUARANTINE"}: | |
| action = "QUARANTINE" | |
| confidence = decision.get("confidence") | |
| if confidence is None: | |
| confidence = 0.50 + min(0.40, abs(risk - 0.50)) | |
| rationale = str(decision.get("explanation", "q-aware serving policy")) | |
| return {"action": action, "confidence": _clip(float(confidence)), "rationale": rationale} | |
def _is_smoke_run(episodes: int) -> bool:
    """Return True when the run has fewer episodes than ``DEFAULT_EPISODES``."""
    episode_count = int(episodes)
    return DEFAULT_EPISODES > episode_count
def _evaluate_step_pass(action: str, is_malicious: bool) -> bool:
    """Return whether a step passed.

    Malicious incidents pass on any action in ``TARGET_ACTIONS``; benign
    incidents pass only on ALLOW.
    """
    return (action in TARGET_ACTIONS) if is_malicious else (action == "ALLOW")
def evaluate_openenv_loop(
    *,
    policy_name: str = DEFAULT_POLICY,
    episodes: int = DEFAULT_EPISODES,
    episode_max_length: int = DEFAULT_EPISODE_MAX_LENGTH,
    seed: int = 42,
) -> dict[str, Any]:
    """Evaluate a policy through the actual multi-step environment loop.

    Each episode runs in a fresh ``ShadowOpsOpenEnv`` seeded with
    ``seed + episode_index``. The environment is always closed, even when the
    policy or the environment raises mid-episode.

    Args:
        policy_name: Policy passed to ``_choose_policy_decision``.
        episodes: Number of episodes to run.
        episode_max_length: Forwarded to the environment constructor.
        seed: Base seed for the environment and for the random policy.

    Returns:
        A report dict with aggregate counters and rates, reward and
        confidence summaries, a ``safety_adjusted_score`` and per-episode
        detail under ``episodes_detail``. Rates use ``max(denominator, 1)``,
        so an empty run reports 0.0 instead of dividing by zero.
    """
    rng = random.Random(seed)
    episode_rows: list[dict[str, Any]] = []
    step_rewards: list[float] = []
    episode_rewards: list[float] = []
    confidence_values: list[float] = []
    total_reward = 0.0
    total_steps = 0
    malicious_steps = 0
    benign_steps = 0
    unsafe_allows = 0
    safe_blocks = 0
    safe_allows = 0
    conservative_malicious_actions = 0
    risky_chain_hits = 0
    exact_match_count = 0
    step_pass_count = 0
    step_fail_count = 0

    for episode in range(episodes):
        env = ShadowOpsOpenEnv(seed=seed + episode, episode_max_length=episode_max_length, persist_memory=False)
        steps: list[dict[str, Any]] = []
        episode_unsafe_allows = 0
        episode_safe_blocks = 0
        episode_exact = 0
        episode_passes = 0
        episode_fails = 0
        try:
            observation = env.reset()
            done = False
            while not done:
                # Tier and payload describe the incident being decided on, so
                # read them before stepping replaces the observation.
                incident = dict(observation.get("incident_state", {}))
                tier = str(incident.get("tier", "UNKNOWN"))
                payload = str(incident.get("payload", ""))
                decision = _choose_policy_decision(env, observation, policy_name, rng)
                action = str(decision["action"])
                confidence = _clip(float(decision.get("confidence", 0.0)))

                observation, reward, done, info = env.step(action)
                # Normalise once so the running total and the per-step list
                # hold the same plain-float value.
                reward = float(reward)
                total_reward += reward
                total_steps += 1
                step_rewards.append(reward)
                confidence_values.append(confidence)

                is_malicious = bool(info.get("is_malicious", False))
                expected = _expected_action(tier, is_malicious)
                exact = action == expected
                pass_step = _evaluate_step_pass(action, is_malicious)
                if exact:
                    exact_match_count += 1
                    episode_exact += 1
                if pass_step:
                    step_pass_count += 1
                    episode_passes += 1
                else:
                    step_fail_count += 1
                    episode_fails += 1

                if is_malicious:
                    malicious_steps += 1
                    if action == "ALLOW":
                        # Letting a malicious incident through.
                        unsafe_allows += 1
                        episode_unsafe_allows += 1
                    if action in TARGET_ACTIONS:
                        conservative_malicious_actions += 1
                else:
                    benign_steps += 1
                    if action in TARGET_ACTIONS:
                        # Benign incident that was blocked/forked/quarantined.
                        safe_blocks += 1
                        episode_safe_blocks += 1
                    if action == "ALLOW":
                        safe_allows += 1

                if info.get("memory_context", {}).get("risky_chains"):
                    risky_chain_hits += 1

                steps.append(
                    {
                        "step": int(info.get("step", len(steps) + 1)),
                        "domain": info.get("domain"),
                        "tier": tier,
                        "payload_excerpt": payload[:140],
                        "action": action,
                        "expected_action": expected,
                        "action_correct": exact,
                        "step_passed": pass_step,
                        "reward": reward,
                        "confidence": confidence,
                        "is_malicious": is_malicious,
                        "outcome": info.get("outcome"),
                        "cumulative_risk_score": float(info.get("cumulative_risk_score", 0.0) or 0.0),
                        "missing_evidence_count": len(info.get("missing_evidence", [])),
                        "evidence_plan_steps": len(info.get("evidence_plan", [])),
                        "policy_explanation": str(decision.get("explanation", ""))[:220],
                    }
                )
            state = env.state()
        finally:
            # Release the environment even if the episode aborted.
            env.close()

        episode_reward = float(state.get("episode_reward", 0.0) or 0.0)
        episode_rewards.append(episode_reward)
        episode_rows.append(
            {
                "episode": episode + 1,
                "episode_reward": episode_reward,
                "steps": steps,
                "unsafe_allow_steps": episode_unsafe_allows,
                "safe_block_steps": episode_safe_blocks,
                "exact_match_steps": episode_exact,
                "pass_steps": episode_passes,
                "fail_steps": episode_fails,
                "final_health": state.get("health", {}),
            }
        )

    smoke_run = _is_smoke_run(episodes)
    run_label = "smoke_test" if smoke_run else f"full_eval_{episodes}_episodes"
    unsafe_allow_rate = unsafe_allows / max(malicious_steps, 1)
    safe_block_rate = safe_blocks / max(benign_steps, 1)
    return {
        "policy": policy_name,
        "seed": seed,
        "episodes": episodes,
        "episode_max_length": episode_max_length,
        "run_label": run_label,
        "is_smoke_test": smoke_run,
        "run_scope_note": (
            "SMOKE TEST: fewer than 50 episodes; not a full judge run."
            if smoke_run
            else f"FULL EVAL: {episodes} episodes (judge-facing run size)."
        ),
        "total_steps": total_steps,
        "malicious_steps": malicious_steps,
        "benign_steps": benign_steps,
        "mean_reward_per_step": total_reward / max(total_steps, 1),
        "reward_summary_per_step": _reward_summary(step_rewards),
        "reward_summary_per_episode": _reward_summary(episode_rewards),
        "mean_episode_reward": float(statistics.mean(episode_rewards)) if episode_rewards else 0.0,
        "accuracy": exact_match_count / max(total_steps, 1),
        "unsafe_allow_count": unsafe_allows,
        "unsafe_allow_rate": unsafe_allow_rate,
        "unsafe_allow_rate_per_step": unsafe_allows / max(total_steps, 1),
        "safe_block_count": safe_blocks,
        "safe_block_rate": safe_block_rate,
        "safe_allow_count": safe_allows,
        "safe_allow_rate": safe_allows / max(benign_steps, 1),
        "malicious_block_or_quarantine_count": conservative_malicious_actions,
        "malicious_block_or_quarantine_rate": conservative_malicious_actions / max(malicious_steps, 1),
        "average_confidence": float(statistics.mean(confidence_values)) if confidence_values else 0.0,
        "confidence_summary": _confidence_summary(confidence_values),
        "step_pass_count": step_pass_count,
        "step_fail_count": step_fail_count,
        "risky_chain_signal_count": risky_chain_hits,
        # Mean step reward penalised by unsafe allows (x50), benign blocks
        # (x10) and the overall step failure rate (x5).
        "safety_adjusted_score": (total_reward / max(total_steps, 1))
        - (unsafe_allow_rate * 50)
        - (safe_block_rate * 10)
        - ((step_fail_count / max(total_steps, 1)) * 5),
        "episodes_detail": episode_rows,
    }
def _model_checkpoint_availability() -> dict[str, Any]:
    """Report whether measured ``grpo_model`` metrics exist in the comparison file.

    Reads ``model_policy_comparison.json`` from the training directory and
    looks for a row with ``policy == "grpo_model"`` under
    ``datasets.validation.rows``. This is a best-effort probe: a missing,
    unreadable, undecodable or unexpectedly shaped file is reported as
    unavailable instead of raising.

    Returns:
        A dict with ``comparison_file`` (path relative to the backend root),
        ``available`` (bool) and a human-readable ``note``.
    """
    comparison_path = TRAINING_DIR / "model_policy_comparison.json"
    comparison_file = str(comparison_path.relative_to(BACKEND_DIR))

    def _status(available: bool, note: str) -> dict[str, Any]:
        return {"comparison_file": comparison_file, "available": available, "note": note}

    if not comparison_path.exists():
        return _status(False, "model_policy_comparison.json not found; checkpoint availability unknown.")

    try:
        payload = json.loads(comparison_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return _status(False, "model_policy_comparison.json is unreadable; checkpoint availability unknown.")

    # Walk datasets -> validation -> rows, tolerating any non-dict node.
    node: Any = payload
    for key in ("datasets", "validation", "rows"):
        node = node.get(key) if isinstance(node, dict) else None
    rows = node if isinstance(node, list) else []

    grpo_row = next(
        (row for row in rows if isinstance(row, dict) and row.get("policy") == "grpo_model"),
        None,
    )
    if grpo_row is None:
        return _status(False, "No grpo_model row was found in model_policy_comparison.json.")

    available = bool(grpo_row.get("available", False))
    return _status(
        available,
        "Measured grpo_model metrics are available."
        if available
        else "grpo_model row exists but metrics are not available in this repository snapshot.",
    )
def build_before_after_behavior_comparison(
    *,
    baseline_policy: str = DEFAULT_BASELINE_POLICY,
    target_policy: str = DEFAULT_POLICY,
    seed: int = 42,
    max_examples: int = DEFAULT_BEHAVIOR_EXAMPLES,
) -> dict[str, Any]:
    """Compare two policies sample-by-sample on the validation set.

    Both policies decide on every sample returned by
    ``load_validation_samples_for_benchmark``. Aggregate metrics come from
    ``evaluate_outputs``. Up to ``max_examples`` example rows are selected:
    first the rows where the policies disagree (target improvements first,
    then higher risk), then rows whose risk score is furthest from 0.5 as
    filler.

    Args:
        baseline_policy: Policy name for the "before" column.
        target_policy: Policy name for the "after" column.
        seed: Base seed; the random policy uses ``seed + 17``.
        max_examples: Maximum number of example rows to return.

    Returns:
        A dict with metadata, ``checkpoint_status``, ``aggregate`` metrics
        (baseline, target and deltas) and the selected ``examples``.
    """
    # NOTE(review): the second return value is discarded and `sample_source`
    # below is hard-coded; confirm it matches what the loader actually read.
    samples, _ = load_validation_samples_for_benchmark()
    rng = random.Random(seed + 17)
    rows: list[dict[str, Any]] = []
    baseline_actions: list[str] = []
    target_actions: list[str] = []

    for sample in samples:
        baseline = _sample_policy_decision(sample, baseline_policy, rng)
        target = _sample_policy_decision(sample, target_policy, rng)
        expected = str(sample.get("correct_action") or sample.get("expected_decision") or "UNKNOWN")
        baseline_action = str(baseline["action"])
        target_action = str(target["action"])
        baseline_actions.append(baseline_action)
        target_actions.append(target_action)
        # Safe when both sides agree on "intervene" or both are ALLOW.
        safe_outcome = (target_action in TARGET_ACTIONS and expected in TARGET_ACTIONS) or (
            target_action == "ALLOW" and expected == "ALLOW"
        )
        rows.append(
            {
                "scenario_id": str(sample.get("sample_id", "")),
                "scenario": f"{sample.get('domain', 'UNKNOWN')}::{sample.get('intent', 'UNKNOWN')}",
                "scenario_summary": str(sample.get("raw_payload", ""))[:180],
                "expected_action": expected,
                "baseline_action": baseline_action,
                "qaware_action": target_action,
                "baseline_correct": baseline_action == expected,
                "trained_correct": target_action == expected,
                "baseline_confidence": round(float(baseline.get("confidence", 0.0) or 0.0), 3),
                "confidence": round(float(target.get("confidence", 0.0) or 0.0), 3),
                "baseline_failure_reason": str(baseline.get("rationale", ""))[:200] if baseline_action != expected else "",
                "qaware_success_reason": str(target.get("rationale", ""))[:200] if target_action == expected else "",
                "risk_score": round(float(sample.get("risk_score", 0.0) or 0.0), 4),
                "missing_evidence": list(sample.get("missing_evidence", [])),
                "evidence_plan": list(sample.get("evidence_plan", [])),
                "safe_outcome": safe_outcome,
            }
        )

    baseline_metrics = evaluate_outputs(samples, baseline_actions, label=f"{baseline_policy}_baseline")
    target_metrics = evaluate_outputs(samples, target_actions, label=f"{target_policy}_serving")
    deltas = {
        "exact_match_delta": float(target_metrics.get("exact_match", 0.0) - baseline_metrics.get("exact_match", 0.0)),
        "safety_accuracy_delta": float(
            target_metrics.get("safety_accuracy", 0.0) - baseline_metrics.get("safety_accuracy", 0.0)
        ),
        "unsafe_decision_rate_delta": float(
            target_metrics.get("unsafe_decision_rate", 0.0) - baseline_metrics.get("unsafe_decision_rate", 0.0)
        ),
        "reward_mean_delta": float(target_metrics.get("reward_mean", 0.0) - baseline_metrics.get("reward_mean", 0.0)),
    }

    differing = [row for row in rows if row["baseline_action"] != row["qaware_action"]]
    differing.sort(
        key=lambda row: (
            int(row["trained_correct"]) - int(row["baseline_correct"]),
            row["risk_score"],
        ),
        reverse=True,
    )
    selected = differing[: max(0, max_examples)]
    if len(selected) < max_examples:
        # De-duplicate by scenario id, falling back to object identity when a
        # sample has no id. Otherwise a single id-less selected row ("") would
        # wrongly suppress every id-less fallback row.
        def _dedupe_key(row: dict[str, Any]) -> Any:
            return row["scenario_id"] or id(row)

        selected_keys = {_dedupe_key(row) for row in selected}
        fallback = [
            row
            for row in sorted(rows, key=lambda item: abs(float(item["risk_score"]) - 0.5), reverse=True)
            if _dedupe_key(row) not in selected_keys
        ]
        selected.extend(fallback[: max_examples - len(selected)])

    checkpoint_status = _model_checkpoint_availability()
    return {
        "title": "ShadowOps before/after behavior comparison",
        "comparison_type": "baseline_vs_serving_policy",
        "baseline_policy": baseline_policy,
        "target_policy": target_policy,
        "sample_source": str((TRAINING_DIR / "qwen3_val_dataset.json").relative_to(BACKEND_DIR)),
        "sample_count": len(samples),
        "checkpoint_status": checkpoint_status,
        "note": (
            "Target policy is serving-time q_aware logic. This file does not claim checkpoint training gains "
            "unless checkpoint_status.available is true."
        ),
        "aggregate": {
            "baseline": {
                "exact_match": baseline_metrics.get("exact_match", 0.0),
                "safety_accuracy": baseline_metrics.get("safety_accuracy", 0.0),
                "unsafe_decision_rate": baseline_metrics.get("unsafe_decision_rate", 0.0),
                "reward_mean": baseline_metrics.get("reward_mean", 0.0),
            },
            "trained_or_serving": {
                "exact_match": target_metrics.get("exact_match", 0.0),
                "safety_accuracy": target_metrics.get("safety_accuracy", 0.0),
                "unsafe_decision_rate": target_metrics.get("unsafe_decision_rate", 0.0),
                "reward_mean": target_metrics.get("reward_mean", 0.0),
            },
            "delta_target_minus_baseline": deltas,
        },
        "examples": selected,
    }
| def _png_chunk(kind: bytes, data: bytes) -> bytes: | |
| return struct.pack(">I", len(data)) + kind + data + struct.pack(">I", zlib.crc32(kind + data) & 0xFFFFFFFF) | |
def _write_png_rgb(path: Path, width: int, height: int, pixels: list[list[tuple[int, int, int]]]) -> None:
    """Write ``pixels`` to ``path`` as an 8-bit RGB, non-interlaced PNG.

    Args:
        path: Destination file; overwritten if it exists.
        width: Image width in pixels; must equal the length of every row.
        height: Image height in pixels; must equal ``len(pixels)``.
        pixels: Rows of ``(r, g, b)`` tuples with channel values in 0..255.

    Raises:
        ValueError: If ``pixels`` does not match ``width`` x ``height``.
            Writing would otherwise produce a PNG whose header disagrees
            with its image data. Also raised by ``bytearray`` when a channel
            value is outside 0..255.
    """
    if len(pixels) != height:
        raise ValueError(f"pixel grid has {len(pixels)} rows, expected height {height}")
    raw = bytearray()
    for row_index, row in enumerate(pixels):
        if len(row) != width:
            raise ValueError(f"pixel row {row_index} has {len(row)} columns, expected width {width}")
        raw.append(0)  # per-scanline filter type 0 (None)
        for r, g, b in row:
            raw.extend((r, g, b))
    payload = b"".join(
        [
            b"\x89PNG\r\n\x1a\n",
            # IHDR: width, height, bit depth 8, colour type 2 (RGB), then
            # compression, filter and interlace methods all 0.
            _png_chunk(b"IHDR", struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)),
            _png_chunk(b"IDAT", zlib.compress(bytes(raw), level=9)),
            _png_chunk(b"IEND", b""),
        ]
    )
    path.write_bytes(payload)
| def _draw_line( | |
| pixels: list[list[tuple[int, int, int]]], | |
| x0: int, | |
| y0: int, | |
| x1: int, | |
| y1: int, | |
| color: tuple[int, int, int], | |
| ) -> None: | |
| width = len(pixels[0]) | |
| height = len(pixels) | |
| dx = abs(x1 - x0) | |
| dy = -abs(y1 - y0) | |
| sx = 1 if x0 < x1 else -1 | |
| sy = 1 if y0 < y1 else -1 | |
| err = dx + dy | |
| while True: | |
| if 0 <= x0 < width and 0 <= y0 < height: | |
| pixels[y0][x0] = color | |
| if x0 == x1 and y0 == y1: | |
| break | |
| e2 = 2 * err | |
| if e2 >= dy: | |
| err += dy | |
| x0 += sx | |
| if e2 <= dx: | |
| err += dx | |
| y0 += sy | |
def _write_episode_reward_plot(
    *,
    baseline_rewards: list[float],
    target_rewards: list[float],
    output_path: Path,
) -> None:
    """Render per-episode rewards for both policies as a 960x420 PNG line chart.

    Draws the axes and four horizontal grid lines, then one polyline per
    series (baseline in red, target in green). The y-range is scaled to the
    min/max over both series. With no rewards at all, only the empty frame is
    written. A series with fewer than two points draws nothing.
    """
    width, height = 960, 420
    margin_left, margin_right, margin_top, margin_bottom = 56, 20, 16, 34
    background = (250, 252, 255)
    axis = (70, 82, 102)
    grid = (228, 233, 242)
    right_edge = width - margin_right
    bottom_edge = height - margin_bottom
    plot_w = width - margin_left - margin_right
    plot_h = height - margin_top - margin_bottom

    canvas = [[background] * width for _ in range(height)]

    # Axes: vertical at the left margin, horizontal at the bottom margin.
    for y in range(margin_top, bottom_edge):
        canvas[y][margin_left] = axis
    canvas[bottom_edge][margin_left:right_edge] = [axis] * (right_edge - margin_left)

    # Four evenly spaced horizontal grid lines, starting right of the axis.
    for line in range(1, 5):
        y = margin_top + int(plot_h * line / 5)
        for x in range(margin_left + 1, right_edge):
            canvas[y][x] = grid

    all_values = [float(v) for v in baseline_rewards + target_rewards]
    if not all_values:
        _write_png_rgb(output_path, width, height, canvas)
        return

    min_v = min(all_values)
    max_v = max(all_values)
    if abs(max_v - min_v) < 1e-9:
        # Flat data: widen the range to avoid dividing by zero below.
        max_v = min_v + 1.0
    max_index = max(max(len(baseline_rewards), len(target_rewards)) - 1, 1)

    def map_point(index: int, value: float) -> tuple[int, int]:
        px = margin_left + int(index * plot_w / max_index)
        py = margin_top + int((1.0 - ((value - min_v) / (max_v - min_v))) * plot_h)
        return px, py

    def draw_series(series: list[float], color: tuple[int, int, int]) -> None:
        if len(series) < 2:
            return
        points = [map_point(i, float(v)) for i, v in enumerate(series)]
        for (ax, ay), (bx, by) in zip(points, points[1:]):
            _draw_line(canvas, ax, ay, bx, by, color)

    # Baseline first so the target series is drawn on top where they overlap.
    draw_series(baseline_rewards, (213, 78, 76))
    draw_series(target_rewards, (46, 172, 104))
    _write_png_rgb(output_path, width, height, canvas)
def _write_behavior_comparison(
    comparison: dict[str, Any],
    *,
    output_dir: Path,
) -> dict[str, str]:
    """Write the behavior comparison as JSON and as a markdown report.

    Args:
        comparison: Result of ``build_before_after_behavior_comparison``.
        output_dir: Directory for both files; created if missing.

    Returns:
        The two file names (not full paths) under the keys ``json`` and ``md``.
    """
    # Create the directory here instead of relying on a caller to have done so.
    output_dir.mkdir(parents=True, exist_ok=True)
    json_path = output_dir / "openenv_behavior_comparison.json"
    md_path = output_dir / "openenv_behavior_comparison.md"
    write_json(json_path, comparison)

    def _cell(value: Any) -> str:
        # Free text (payload excerpts, rationales) can contain pipes or line
        # breaks, either of which would split or terminate the table row.
        return " ".join(str(value).replace("|", "/").splitlines())

    lines = [
        "# ShadowOps Before/After Behavior Comparison",
        "",
        f"- Baseline policy: {comparison['baseline_policy']}",
        f"- Target policy: {comparison['target_policy']}",
        f"- Sample source: `{comparison['sample_source']}`",
        f"- Samples: {comparison['sample_count']}",
        f"- Checkpoint availability: {comparison['checkpoint_status']['available']}",
        f"- Checkpoint note: {comparison['checkpoint_status']['note']}",
        "",
        "## Aggregate Metrics",
        "",
        "| Metric | Baseline | Target/Serving | Delta (target-baseline) |",
        "| --- | ---: | ---: | ---: |",
        f"| Exact match | {comparison['aggregate']['baseline']['exact_match']:.3f} | {comparison['aggregate']['trained_or_serving']['exact_match']:.3f} | {comparison['aggregate']['delta_target_minus_baseline']['exact_match_delta']:+.3f} |",
        f"| Safety accuracy | {comparison['aggregate']['baseline']['safety_accuracy']:.3f} | {comparison['aggregate']['trained_or_serving']['safety_accuracy']:.3f} | {comparison['aggregate']['delta_target_minus_baseline']['safety_accuracy_delta']:+.3f} |",
        f"| Unsafe decision rate | {comparison['aggregate']['baseline']['unsafe_decision_rate']:.3f} | {comparison['aggregate']['trained_or_serving']['unsafe_decision_rate']:.3f} | {comparison['aggregate']['delta_target_minus_baseline']['unsafe_decision_rate_delta']:+.3f} |",
        f"| Reward mean | {comparison['aggregate']['baseline']['reward_mean']:.3f} | {comparison['aggregate']['trained_or_serving']['reward_mean']:.3f} | {comparison['aggregate']['delta_target_minus_baseline']['reward_mean_delta']:+.3f} |",
        "",
        "## Representative Scenarios",
        "",
        "| Scenario ID | Scenario Summary | Expected Action | Baseline Action | Q-Aware Action | Failure Reason (Baseline) | Success Reason (Q-Aware) | Risk Score | Confidence | Safe Outcome |",
        "| --- | --- | --- | --- | --- | --- | --- | ---: | ---: | --- |",
    ]
    for row in comparison["examples"]:
        lines.append(
            f"| `{_cell(row['scenario_id'])}` | {_cell(row['scenario_summary'])} | {_cell(row['expected_action'])} | "
            f"{_cell(row['baseline_action'])} | {_cell(row['qaware_action'])} | "
            f"{_cell(row['baseline_failure_reason'])} | {_cell(row['qaware_success_reason'])} | "
            f"{row['risk_score']:.3f} | {row['confidence']:.3f} | {row['safe_outcome']} |"
        )
    lines.extend(["", "## Note", "", comparison["note"]])
    md_path.write_text("\n".join(lines), encoding="utf-8")
    return {"json": json_path.name, "md": md_path.name}
def write_openenv_report(report: dict[str, Any], output_dir: Path = REPORTS_DIR) -> None:
    """Write the loop-evaluation report as JSON and as a markdown summary.

    Creates ``output_dir`` if needed, then writes ``openenv_loop_eval.json``
    and ``openenv_loop_eval.md``. The report must already carry the baseline
    fields (``baseline_policy``, ``baseline_summary``, ``comparison_delta``),
    ``behavior_examples`` and ``episodes_detail``.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    write_json(output_dir / "openenv_loop_eval.json", report)

    # Artifact names fall back to the default file names when absent.
    artifacts = report.get("artifacts", {})
    reward_plot = artifacts.get("episode_reward_plot", "openenv_episode_rewards.png")
    behavior_md = artifacts.get("behavior_comparison_md", "openenv_behavior_comparison.md")

    overview = [
        "# ShadowOps OpenEnv Loop Evaluation",
        "",
        f"- Policy evaluated: {report['policy']}",
        f"- Baseline policy for comparison: {report['baseline_policy']}",
        f"- Episodes: {report['episodes']}",
        f"- Episode max length: {report['episode_max_length']}",
        f"- Seed: {report['seed']}",
        f"- Run label: {report['run_label']}",
        f"- Scope note: {report['run_scope_note']}",
        "",
    ]
    core_metrics = [
        "## Core Metrics",
        "",
        "| Metric | Value |",
        "| --- | ---: |",
        f"| Total steps | {report['total_steps']} |",
        f"| Malicious steps | {report['malicious_steps']} |",
        f"| Benign steps | {report['benign_steps']} |",
        f"| Accuracy | {report['accuracy']:.3f} |",
        f"| Unsafe allow rate (malicious-only) | {report['unsafe_allow_rate']:.3f} |",
        f"| Safe block rate (benign blocked/forked/quarantined) | {report['safe_block_rate']:.3f} |",
        f"| Average confidence | {report['average_confidence']:.3f} |",
        f"| Mean reward per step | {report['mean_reward_per_step']:.3f} |",
        f"| Step pass count | {report['step_pass_count']} |",
        f"| Step fail count | {report['step_fail_count']} |",
        "",
    ]
    reward_section = [
        "## Reward Summary",
        "",
        f"- Per-step reward mean/median/std: {report['reward_summary_per_step']['mean']:.3f} / {report['reward_summary_per_step']['median']:.3f} / {report['reward_summary_per_step']['std']:.3f}",
        f"- Per-episode reward mean/median/std: {report['reward_summary_per_episode']['mean']:.3f} / {report['reward_summary_per_episode']['median']:.3f} / {report['reward_summary_per_episode']['std']:.3f}",
        f"- Per-episode reward min/max: {report['reward_summary_per_episode']['min']:.3f} / {report['reward_summary_per_episode']['max']:.3f}",
        "",
    ]
    before_after = [
        "## Before vs After Aggregate",
        "",
        "| Metric | Baseline | Target | Delta (target-baseline) |",
        "| --- | ---: | ---: | ---: |",
        f"| Unsafe allow rate | {report['baseline_summary']['unsafe_allow_rate']:.3f} | {report['unsafe_allow_rate']:.3f} | {report['comparison_delta']['unsafe_allow_rate_delta']:+.3f} |",
        f"| Safe block rate | {report['baseline_summary']['safe_block_rate']:.3f} | {report['safe_block_rate']:.3f} | {report['comparison_delta']['safe_block_rate_delta']:+.3f} |",
        f"| Average confidence | {report['baseline_summary']['average_confidence']:.3f} | {report['average_confidence']:.3f} | {report['comparison_delta']['average_confidence_delta']:+.3f} |",
        f"| Mean reward/step | {report['baseline_summary']['mean_reward_per_step']:.3f} | {report['mean_reward_per_step']:.3f} | {report['comparison_delta']['mean_reward_per_step_delta']:+.3f} |",
        f"| Safety Adjusted Score | {report['baseline_summary']['safety_adjusted_score']:.3f} | {report['safety_adjusted_score']:.3f} | {report['comparison_delta'].get('safety_adjusted_score_delta', 0.0):+.3f} |",
        "",
    ]
    trade_off = [
        "## Safety vs Reward Trade-Off",
        "",
        "- **Note on Reward vs Safety**: The `heuristic` baseline may occasionally have a higher `mean_reward_per_step` due to faster resolution times.",
        "- However, **Q-aware is considered safer** when its `unsafe_allow_rate = 0.000`. Unsafe allow is the primary failure mode in security automation and carries severe negative business impact.",
        "- Lower confidence scores in Q-aware do not necessarily mean failure; they often reflect **cautious uncertainty** on ambiguous payloads, which correctly triggers QUARANTINE instead of false-positive blocks or dangerous allows.",
        "",
    ]
    behavior_header = [
        "## Representative Behavior",
        "",
        f"See `{behavior_md}` for 5-10 structured before/after scenarios.",
        "",
        "| Scenario | Baseline | Target | Baseline correct | Target correct |",
        "| --- | --- | --- | --- | --- |",
    ]
    behavior_rows = [
        f"| {row['scenario']} | {row['baseline_action']} | {row['qaware_action']} | "
        f"{row['baseline_correct']} | {row['trained_correct']} |"
        for row in report["behavior_examples"]
    ]
    episode_header = [
        "",
        "## Episode Summary",
        "",
        "| Episode | Reward | Steps | Unsafe allows | Safe blocks | Pass | Fail | Final SOC | Final GitHub | Final AWS |",
        "| ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |",
    ]
    episode_rows = []
    for row in report["episodes_detail"]:
        health = row.get("final_health", {})
        episode_rows.append(
            f"| {row['episode']} | {float(row['episode_reward']):.3f} | {len(row['steps'])} | "
            f"{row['unsafe_allow_steps']} | {row['safe_block_steps']} | {row['pass_steps']} | {row['fail_steps']} | "
            f"{health.get('SOC', 0)} | {health.get('GITHUB', 0)} | {health.get('AWS', 0)} |"
        )
    plot_section = [
        "",
        "## Plot",
        "",
        f"- Episode reward trend plot: `{reward_plot}`",
        "- Color mapping in the plot: red=baseline policy, green=target policy.",
    ]

    document = (
        overview
        + core_metrics
        + reward_section
        + before_after
        + trade_off
        + behavior_header
        + behavior_rows
        + episode_header
        + episode_rows
        + plot_section
    )
    (output_dir / "openenv_loop_eval.md").write_text("\n".join(document), encoding="utf-8")
def generate_openenv_report(
    output_dir: Path = REPORTS_DIR,
    *,
    policy_name: str = DEFAULT_POLICY,
    baseline_policy: str = DEFAULT_BASELINE_POLICY,
    episodes: int = DEFAULT_EPISODES,
    episode_max_length: int = DEFAULT_EPISODE_MAX_LENGTH,
    seed: int = 42,
    behavior_examples: int = DEFAULT_BEHAVIOR_EXAMPLES,
) -> dict[str, Any]:
    """Evaluate a baseline and a target policy and write the comparison artifacts.

    Both policies are run through ``evaluate_openenv_loop`` with the same
    ``episodes``, ``episode_max_length`` and ``seed``. The returned report is
    the target policy's report extended with a baseline summary, per-metric
    deltas (target minus baseline), behavior examples, checkpoint status and
    artifact file names.

    Args:
        output_dir: Directory handed to the behavior-comparison writer and to
            ``write_openenv_report``.
        policy_name: Target policy to evaluate.
        baseline_policy: Policy used as the "before" side of the comparison.
        episodes: Number of episodes per evaluation run.
        episode_max_length: Maximum steps per episode.
        seed: Seed forwarded to both evaluation runs and the behavior comparison.
        behavior_examples: Requested number of behavior examples; clamped to
            the range 5..10 before being used.

    Returns:
        The combined report dictionary described above.
    """
    # Identical settings for both runs so only the policy differs.
    shared_eval_kwargs = {
        "episodes": episodes,
        "episode_max_length": episode_max_length,
        "seed": seed,
    }
    before = evaluate_openenv_loop(policy_name=baseline_policy, **shared_eval_kwargs)
    after = evaluate_openenv_loop(policy_name=policy_name, **shared_eval_kwargs)

    compared_metrics = (
        "unsafe_allow_rate",
        "safe_block_rate",
        "average_confidence",
        "mean_reward_per_step",
        "safety_adjusted_score",
    )
    # Positive delta means the target policy scored higher on that metric.
    deltas = {
        f"{metric}_delta": float(after[metric] - before[metric])
        for metric in compared_metrics
    }

    example_limit = max(5, min(10, behavior_examples))
    behavior = build_before_after_behavior_comparison(
        baseline_policy=baseline_policy,
        target_policy=policy_name,
        seed=seed,
        max_examples=example_limit,
    )
    written_behavior = _write_behavior_comparison(behavior, output_dir=output_dir)

    # The reward plot path is always under TRAINING_DIR / "plots",
    # independent of ``output_dir``.
    plot_folder = TRAINING_DIR / "plots"
    plot_folder.mkdir(parents=True, exist_ok=True)
    plot_file = plot_folder / "openenv_episode_rewards.png"
    _write_episode_reward_plot(
        baseline_rewards=[float(ep["episode_reward"]) for ep in before["episodes_detail"]],
        target_rewards=[float(ep["episode_reward"]) for ep in after["episodes_detail"]],
        output_path=plot_file,
    )

    baseline_summary = {metric: before[metric] for metric in compared_metrics[:-1]}
    baseline_summary["safety_adjusted_score"] = before.get("safety_adjusted_score", 0.0)

    # Shallow copy so the target report dict itself is not mutated.
    report = dict(after)
    report.update(
        {
            "baseline_policy": baseline_policy,
            "baseline_summary": baseline_summary,
            "comparison_delta": deltas,
            "behavior_examples": behavior["examples"],
            "checkpoint_status": behavior["checkpoint_status"],
            "artifacts": {
                "episode_reward_plot": plot_file.name,
                "behavior_comparison_json": written_behavior["json"],
                "behavior_comparison_md": written_behavior["md"],
            },
        }
    )
    write_openenv_report(report, output_dir)
    return report
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the OpenEnv evaluation runner.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the behavior of a no-argument call.
            Passing an explicit list allows programmatic use.

    Returns:
        Namespace with ``policy``, ``baseline_policy``, ``episodes``,
        ``episode_max_length``, ``seed``, ``behavior_examples`` and
        ``output_dir`` (a ``Path``).
    """
    parser = argparse.ArgumentParser(description="ShadowOps OpenEnv evaluation runner")
    parser.add_argument("--policy", default=DEFAULT_POLICY, choices=SUPPORTED_POLICIES, help="Target policy")
    parser.add_argument(
        "--baseline-policy",
        default=DEFAULT_BASELINE_POLICY,
        choices=SUPPORTED_POLICIES,
        help="Baseline policy for before/after comparison",
    )
    parser.add_argument("--episodes", type=int, default=DEFAULT_EPISODES, help="Number of episodes to evaluate")
    parser.add_argument(
        "--episode-max-length",
        type=int,
        default=DEFAULT_EPISODE_MAX_LENGTH,
        help="Max steps per episode",
    )
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    parser.add_argument(
        "--behavior-examples",
        type=int,
        default=DEFAULT_BEHAVIOR_EXAMPLES,
        help="Representative scenarios to keep in before/after summary (5-10 recommended)",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=REPORTS_DIR,
        help="Output directory for openenv_loop_eval artifacts",
    )
    return parser.parse_args(argv)
def main() -> int:
    """Run the OpenEnv evaluation from the command line and print a summary.

    Episode count, episode length and behavior-example count are floored at 1
    before being passed to ``generate_openenv_report``.

    Returns:
        Process exit code; always 0 when the run completes.
    """
    args = _parse_args()
    output_dir = args.output_dir
    report = generate_openenv_report(
        output_dir=output_dir,
        policy_name=args.policy,
        baseline_policy=args.baseline_policy,
        episodes=max(1, int(args.episodes)),
        episode_max_length=max(1, int(args.episode_max_length)),
        seed=int(args.seed),
        behavior_examples=max(1, int(args.behavior_examples)),
    )

    def _display(path: Path) -> str:
        # Prefer a path relative to the backend directory, but never crash:
        # relative_to() raises ValueError for a relative --output-dir or one
        # located outside BACKEND_DIR.
        resolved = path.resolve()
        try:
            return str(resolved.relative_to(BACKEND_DIR))
        except ValueError:
            return str(resolved)

    print(f"OpenEnv episodes: {report['episodes']}")
    print(f"OpenEnv run label: {report['run_label']}")
    print(f"OpenEnv unsafe allow rate: {report['unsafe_allow_rate']:.3f}")
    for artifact_name in (
        "openenv_loop_eval.json",
        "openenv_loop_eval.md",
        "openenv_behavior_comparison.json",
        "openenv_behavior_comparison.md",
    ):
        print(f"Saved: {_display(output_dir / artifact_name)}")
    # generate_openenv_report passes TRAINING_DIR / "plots" as the plot
    # location, not output_dir, so report that path.
    print(f"Saved: {_display(TRAINING_DIR / 'plots' / 'openenv_episode_rewards.png')}")
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())