ShadowOps Deploy
Final deploy: Monolithic ShadowOps app + Training Scripts
d064478
"""Laptop-safe OpenEnv loop evaluation for ShadowOps."""
from __future__ import annotations
import argparse
import json
import random
import statistics
import struct
import sys
import zlib
from pathlib import Path
from typing import Any
BACKEND_DIR = Path(__file__).resolve().parents[1]
TRAINING_DIR = BACKEND_DIR / "training"
REPORTS_DIR = TRAINING_DIR / "reports"
if str(BACKEND_DIR) not in sys.path:
sys.path.insert(0, str(BACKEND_DIR))
from openenv_shadowops import ShadowOpsOpenEnv # noqa: E402
from training.shadowops_training_common import ( # noqa: E402
build_q_aware_decision,
compute_risk_score,
evaluate_outputs,
load_validation_samples_for_benchmark,
write_json,
)
DEFAULT_EPISODES = 50
DEFAULT_EPISODE_MAX_LENGTH = 5
DEFAULT_POLICY = "q_aware"
DEFAULT_BASELINE_POLICY = "heuristic"
DEFAULT_BEHAVIOR_EXAMPLES = 8
TARGET_ACTIONS = {"BLOCK", "FORK", "QUARANTINE"}
TIER_EXPECTED_ACTION = {
"BENIGN_CLEAN": "ALLOW",
"BENIGN_NOISY": "ALLOW",
"AMBIGUOUS": "QUARANTINE",
"MALICIOUS_SOFT": "BLOCK",
"MALICIOUS_HARD": "FORK",
}
SUPPORTED_POLICIES = ("random", "heuristic", "q_aware")
def _clip(value: float, low: float = 0.0, high: float = 1.0) -> float:
return max(low, min(high, float(value)))
def _confidence_summary(values: list[float]) -> dict[str, float]:
if not values:
return {"mean": 0.0, "min": 0.0, "max": 0.0, "std": 0.0}
if len(values) == 1:
std = 0.0
else:
std = float(statistics.pstdev(values))
return {
"mean": float(statistics.mean(values)),
"min": float(min(values)),
"max": float(max(values)),
"std": std,
}
def _reward_summary(values: list[float]) -> dict[str, float]:
if not values:
return {"mean": 0.0, "median": 0.0, "min": 0.0, "max": 0.0, "std": 0.0}
if len(values) == 1:
std = 0.0
else:
std = float(statistics.pstdev(values))
return {
"mean": float(statistics.mean(values)),
"median": float(statistics.median(values)),
"min": float(min(values)),
"max": float(max(values)),
"std": std,
}
def _heuristic_action_and_confidence(risk: float, tier: str) -> tuple[str, float, str]:
tier = str(tier or "").upper()
if risk >= 0.62:
confidence = 0.60 + min(0.35, (risk - 0.62) * 1.25)
return "BLOCK", _clip(confidence), "heuristic high-risk threshold"
if risk >= 0.38 or tier == "AMBIGUOUS":
margin = abs(risk - 0.50)
confidence = 0.56 + min(0.22, margin * 1.4)
return "FORK", _clip(confidence), "heuristic uncertainty escalation"
confidence = 0.58 + min(0.30, (0.38 - risk) * 1.2)
return "ALLOW", _clip(confidence), "heuristic low-risk allow"
def _expected_action(tier: str, is_malicious: bool) -> str:
tier_name = str(tier or "").upper()
if tier_name in TIER_EXPECTED_ACTION:
return TIER_EXPECTED_ACTION[tier_name]
return "FORK" if is_malicious else "ALLOW"
def _choose_policy_decision(
env: ShadowOpsOpenEnv,
observation: dict[str, Any],
policy_name: str,
rng: random.Random,
) -> dict[str, Any]:
actions = list(observation.get("available_actions") or ["ALLOW", "BLOCK", "FORK", "QUARANTINE"])
incident = observation.get("incident_state", {})
risk_vector = list(observation.get("risk_vector", [0.0] * 16))
risk = float(compute_risk_score(risk_vector))
if policy_name == "random":
action = rng.choice(actions)
return {
"action": action,
"confidence": 0.25,
"explanation": "random baseline action",
}
if policy_name == "heuristic":
action, confidence, reason = _heuristic_action_and_confidence(risk, str(incident.get("tier", "")))
return {
"action": action,
"confidence": confidence,
"explanation": reason,
}
decision = build_q_aware_decision(
incident.get("domain", "SOC"),
incident.get("intent", "UNKNOWN"),
incident.get("payload", ""),
risk_vector,
actor="openenv_agent",
session_id=env.session_id,
service=incident.get("domain", "unknown"),
environment="production",
provided_evidence=[],
memory_context=env.state().get("memory_context", {}),
)
action = str(decision.get("decision", "QUARANTINE")).upper()
if action not in actions:
action = "QUARANTINE"
confidence = decision.get("confidence")
if confidence is None:
confidence = 0.50 + min(0.40, abs(risk - 0.50))
return {
"action": action,
"confidence": _clip(float(confidence)),
"explanation": str(decision.get("explanation", "q-aware policy decision")).strip() or "q-aware policy decision",
}
def _sample_policy_decision(
sample: dict[str, Any],
policy_name: str,
rng: random.Random,
) -> dict[str, Any]:
risk_vector = list(sample.get("risk_vector") or [0.0] * 16)
risk = float(sample.get("risk_score", compute_risk_score(risk_vector)))
domain = str(sample.get("domain", "SOC"))
intent = str(sample.get("intent", "UNKNOWN"))
payload = str(sample.get("raw_payload", ""))
tier = str(sample.get("scenario_type", ""))
if policy_name == "random":
action = rng.choice(["ALLOW", "BLOCK", "FORK", "QUARANTINE"])
return {"action": action, "confidence": 0.25, "rationale": "random baseline"}
if policy_name == "heuristic":
action, confidence, reason = _heuristic_action_and_confidence(risk, tier)
return {"action": action, "confidence": confidence, "rationale": reason}
decision = build_q_aware_decision(
domain,
intent,
payload,
risk_vector,
actor=str(sample.get("actor", "openenv_eval")),
session_id=str(sample.get("session_id", "openenv-eval")),
service=str(sample.get("service", domain.lower())),
environment=str(sample.get("environment", "production")),
provided_evidence=list(sample.get("provided_evidence", [])),
memory_context={},
)
action = str(decision.get("decision", "QUARANTINE")).upper()
if action not in {"ALLOW", "BLOCK", "FORK", "QUARANTINE"}:
action = "QUARANTINE"
confidence = decision.get("confidence")
if confidence is None:
confidence = 0.50 + min(0.40, abs(risk - 0.50))
rationale = str(decision.get("explanation", "q-aware serving policy"))
return {"action": action, "confidence": _clip(float(confidence)), "rationale": rationale}
def _is_smoke_run(episodes: int) -> bool:
return int(episodes) < DEFAULT_EPISODES
def _evaluate_step_pass(action: str, is_malicious: bool) -> bool:
if is_malicious:
return action in TARGET_ACTIONS
return action == "ALLOW"
def evaluate_openenv_loop(
*,
policy_name: str = DEFAULT_POLICY,
episodes: int = DEFAULT_EPISODES,
episode_max_length: int = DEFAULT_EPISODE_MAX_LENGTH,
seed: int = 42,
) -> dict[str, Any]:
"""Evaluate a policy through the actual multi-step environment loop."""
rng = random.Random(seed)
episode_rows: list[dict[str, Any]] = []
step_rewards: list[float] = []
episode_rewards: list[float] = []
confidence_values: list[float] = []
total_reward = 0.0
total_steps = 0
malicious_steps = 0
benign_steps = 0
unsafe_allows = 0
safe_blocks = 0
safe_allows = 0
conservative_malicious_actions = 0
risky_chain_hits = 0
exact_match_count = 0
step_pass_count = 0
step_fail_count = 0
for episode in range(episodes):
env = ShadowOpsOpenEnv(seed=seed + episode, episode_max_length=episode_max_length, persist_memory=False)
observation = env.reset()
done = False
steps: list[dict[str, Any]] = []
episode_unsafe_allows = 0
episode_safe_blocks = 0
episode_exact = 0
episode_passes = 0
episode_fails = 0
while not done:
incident = dict(observation.get("incident_state", {}))
tier = str(incident.get("tier", "UNKNOWN"))
payload = str(incident.get("payload", ""))
decision = _choose_policy_decision(env, observation, policy_name, rng)
action = str(decision["action"])
confidence = _clip(float(decision.get("confidence", 0.0)))
observation, reward, done, info = env.step(action)
total_reward += reward
total_steps += 1
step_rewards.append(float(reward))
confidence_values.append(confidence)
is_malicious = bool(info.get("is_malicious", False))
expected = _expected_action(tier, is_malicious)
exact = action == expected
pass_step = _evaluate_step_pass(action, is_malicious)
if exact:
exact_match_count += 1
episode_exact += 1
if pass_step:
step_pass_count += 1
episode_passes += 1
else:
step_fail_count += 1
episode_fails += 1
if is_malicious:
malicious_steps += 1
if action == "ALLOW":
unsafe_allows += 1
episode_unsafe_allows += 1
if action in TARGET_ACTIONS:
conservative_malicious_actions += 1
else:
benign_steps += 1
if action in TARGET_ACTIONS:
safe_blocks += 1
episode_safe_blocks += 1
if action == "ALLOW":
safe_allows += 1
if info.get("memory_context", {}).get("risky_chains"):
risky_chain_hits += 1
steps.append(
{
"step": int(info.get("step", len(steps) + 1)),
"domain": info.get("domain"),
"tier": tier,
"payload_excerpt": payload[:140],
"action": action,
"expected_action": expected,
"action_correct": exact,
"step_passed": pass_step,
"reward": float(reward),
"confidence": confidence,
"is_malicious": is_malicious,
"outcome": info.get("outcome"),
"cumulative_risk_score": float(info.get("cumulative_risk_score", 0.0) or 0.0),
"missing_evidence_count": len(info.get("missing_evidence", [])),
"evidence_plan_steps": len(info.get("evidence_plan", [])),
"policy_explanation": str(decision.get("explanation", ""))[:220],
}
)
state = env.state()
episode_reward = float(state.get("episode_reward", 0.0) or 0.0)
episode_rewards.append(episode_reward)
episode_rows.append(
{
"episode": episode + 1,
"episode_reward": episode_reward,
"steps": steps,
"unsafe_allow_steps": episode_unsafe_allows,
"safe_block_steps": episode_safe_blocks,
"exact_match_steps": episode_exact,
"pass_steps": episode_passes,
"fail_steps": episode_fails,
"final_health": state.get("health", {}),
}
)
env.close()
smoke_run = _is_smoke_run(episodes)
run_label = "smoke_test" if smoke_run else f"full_eval_{episodes}_episodes"
unsafe_allow_rate = unsafe_allows / max(malicious_steps, 1)
safe_block_rate = safe_blocks / max(benign_steps, 1)
return {
"policy": policy_name,
"seed": seed,
"episodes": episodes,
"episode_max_length": episode_max_length,
"run_label": run_label,
"is_smoke_test": smoke_run,
"run_scope_note": (
"SMOKE TEST: fewer than 50 episodes; not a full judge run."
if smoke_run
else f"FULL EVAL: {episodes} episodes (judge-facing run size)."
),
"total_steps": total_steps,
"malicious_steps": malicious_steps,
"benign_steps": benign_steps,
"mean_reward_per_step": total_reward / max(total_steps, 1),
"reward_summary_per_step": _reward_summary(step_rewards),
"reward_summary_per_episode": _reward_summary(episode_rewards),
"mean_episode_reward": float(statistics.mean(episode_rewards)) if episode_rewards else 0.0,
"accuracy": exact_match_count / max(total_steps, 1),
"unsafe_allow_count": unsafe_allows,
"unsafe_allow_rate": unsafe_allow_rate,
"unsafe_allow_rate_per_step": unsafe_allows / max(total_steps, 1),
"safe_block_count": safe_blocks,
"safe_block_rate": safe_block_rate,
"safe_allow_count": safe_allows,
"safe_allow_rate": safe_allows / max(benign_steps, 1),
"malicious_block_or_quarantine_count": conservative_malicious_actions,
"malicious_block_or_quarantine_rate": conservative_malicious_actions / max(malicious_steps, 1),
"average_confidence": float(statistics.mean(confidence_values)) if confidence_values else 0.0,
"confidence_summary": _confidence_summary(confidence_values),
"step_pass_count": step_pass_count,
"step_fail_count": step_fail_count,
"risky_chain_signal_count": risky_chain_hits,
"safety_adjusted_score": (total_reward / max(total_steps, 1)) - (unsafe_allow_rate * 50) - (safe_block_rate * 10) - ((step_fail_count / max(total_steps, 1)) * 5),
"episodes_detail": episode_rows,
}
def _model_checkpoint_availability() -> dict[str, Any]:
comparison_path = TRAINING_DIR / "model_policy_comparison.json"
if not comparison_path.exists():
return {
"comparison_file": str(comparison_path.relative_to(BACKEND_DIR)),
"available": False,
"note": "model_policy_comparison.json not found; checkpoint availability unknown.",
}
try:
payload = json.loads(comparison_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {
"comparison_file": str(comparison_path.relative_to(BACKEND_DIR)),
"available": False,
"note": "model_policy_comparison.json is unreadable; checkpoint availability unknown.",
}
rows = payload.get("datasets", {}).get("validation", {}).get("rows", [])
grpo_row = next((row for row in rows if row.get("policy") == "grpo_model"), None)
if grpo_row is None:
return {
"comparison_file": str(comparison_path.relative_to(BACKEND_DIR)),
"available": False,
"note": "No grpo_model row was found in model_policy_comparison.json.",
}
available = bool(grpo_row.get("available", False))
return {
"comparison_file": str(comparison_path.relative_to(BACKEND_DIR)),
"available": available,
"note": (
"Measured grpo_model metrics are available."
if available
else "grpo_model row exists but metrics are not available in this repository snapshot."
),
}
def build_before_after_behavior_comparison(
*,
baseline_policy: str = DEFAULT_BASELINE_POLICY,
target_policy: str = DEFAULT_POLICY,
seed: int = 42,
max_examples: int = DEFAULT_BEHAVIOR_EXAMPLES,
) -> dict[str, Any]:
samples, _ = load_validation_samples_for_benchmark()
rng = random.Random(seed + 17)
rows: list[dict[str, Any]] = []
baseline_actions: list[str] = []
target_actions: list[str] = []
for sample in samples:
baseline = _sample_policy_decision(sample, baseline_policy, rng)
target = _sample_policy_decision(sample, target_policy, rng)
expected = str(sample.get("correct_action") or sample.get("expected_decision") or "UNKNOWN")
baseline_action = str(baseline["action"])
target_action = str(target["action"])
baseline_actions.append(baseline_action)
target_actions.append(target_action)
row = {
"scenario_id": str(sample.get("sample_id", "")),
"scenario": f"{sample.get('domain', 'UNKNOWN')}::{sample.get('intent', 'UNKNOWN')}",
"scenario_summary": str(sample.get("raw_payload", ""))[:180],
"expected_action": expected,
"baseline_action": baseline_action,
"qaware_action": target_action,
"baseline_correct": baseline_action == expected,
"trained_correct": target_action == expected,
"baseline_confidence": round(float(baseline.get("confidence", 0.0) or 0.0), 3),
"confidence": round(float(target.get("confidence", 0.0) or 0.0), 3),
"baseline_failure_reason": str(baseline.get("rationale", ""))[:200] if baseline_action != expected else "",
"qaware_success_reason": str(target.get("rationale", ""))[:200] if target_action == expected else "",
"risk_score": round(float(sample.get("risk_score", 0.0) or 0.0), 4),
"missing_evidence": list(sample.get("missing_evidence", [])),
"evidence_plan": list(sample.get("evidence_plan", [])),
"safe_outcome": target_action in ("BLOCK", "FORK", "QUARANTINE") and expected in ("BLOCK", "FORK", "QUARANTINE") or target_action == "ALLOW" and expected == "ALLOW",
}
rows.append(row)
baseline_metrics = evaluate_outputs(samples, baseline_actions, label=f"{baseline_policy}_baseline")
target_metrics = evaluate_outputs(samples, target_actions, label=f"{target_policy}_serving")
deltas = {
"exact_match_delta": float(target_metrics.get("exact_match", 0.0) - baseline_metrics.get("exact_match", 0.0)),
"safety_accuracy_delta": float(
target_metrics.get("safety_accuracy", 0.0) - baseline_metrics.get("safety_accuracy", 0.0)
),
"unsafe_decision_rate_delta": float(
target_metrics.get("unsafe_decision_rate", 0.0) - baseline_metrics.get("unsafe_decision_rate", 0.0)
),
"reward_mean_delta": float(target_metrics.get("reward_mean", 0.0) - baseline_metrics.get("reward_mean", 0.0)),
}
differing = [row for row in rows if row["baseline_action"] != row["qaware_action"]]
differing.sort(
key=lambda row: (
int(row["trained_correct"]) - int(row["baseline_correct"]),
row["risk_score"],
),
reverse=True,
)
selected = differing[: max(0, max_examples)]
if len(selected) < max_examples:
selected_ids = {row["scenario_id"] for row in selected}
fallback = [
row
for row in sorted(rows, key=lambda item: abs(float(item["risk_score"]) - 0.5), reverse=True)
if row["scenario_id"] not in selected_ids
]
selected.extend(fallback[: max_examples - len(selected)])
checkpoint_status = _model_checkpoint_availability()
return {
"title": "ShadowOps before/after behavior comparison",
"comparison_type": "baseline_vs_serving_policy",
"baseline_policy": baseline_policy,
"target_policy": target_policy,
"sample_source": str((TRAINING_DIR / "qwen3_val_dataset.json").relative_to(BACKEND_DIR)),
"sample_count": len(samples),
"checkpoint_status": checkpoint_status,
"note": (
"Target policy is serving-time q_aware logic. This file does not claim checkpoint training gains "
"unless checkpoint_status.available is true."
),
"aggregate": {
"baseline": {
"exact_match": baseline_metrics.get("exact_match", 0.0),
"safety_accuracy": baseline_metrics.get("safety_accuracy", 0.0),
"unsafe_decision_rate": baseline_metrics.get("unsafe_decision_rate", 0.0),
"reward_mean": baseline_metrics.get("reward_mean", 0.0),
},
"trained_or_serving": {
"exact_match": target_metrics.get("exact_match", 0.0),
"safety_accuracy": target_metrics.get("safety_accuracy", 0.0),
"unsafe_decision_rate": target_metrics.get("unsafe_decision_rate", 0.0),
"reward_mean": target_metrics.get("reward_mean", 0.0),
},
"delta_target_minus_baseline": deltas,
},
"examples": selected,
}
def _png_chunk(kind: bytes, data: bytes) -> bytes:
return struct.pack(">I", len(data)) + kind + data + struct.pack(">I", zlib.crc32(kind + data) & 0xFFFFFFFF)
def _write_png_rgb(path: Path, width: int, height: int, pixels: list[list[tuple[int, int, int]]]) -> None:
raw = bytearray()
for row in pixels:
raw.append(0)
for r, g, b in row:
raw.extend((r, g, b))
payload = b"".join(
[
b"\x89PNG\r\n\x1a\n",
_png_chunk(b"IHDR", struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)),
_png_chunk(b"IDAT", zlib.compress(bytes(raw), level=9)),
_png_chunk(b"IEND", b""),
]
)
path.write_bytes(payload)
def _draw_line(
pixels: list[list[tuple[int, int, int]]],
x0: int,
y0: int,
x1: int,
y1: int,
color: tuple[int, int, int],
) -> None:
width = len(pixels[0])
height = len(pixels)
dx = abs(x1 - x0)
dy = -abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx + dy
while True:
if 0 <= x0 < width and 0 <= y0 < height:
pixels[y0][x0] = color
if x0 == x1 and y0 == y1:
break
e2 = 2 * err
if e2 >= dy:
err += dy
x0 += sx
if e2 <= dx:
err += dx
y0 += sy
def _write_episode_reward_plot(
*,
baseline_rewards: list[float],
target_rewards: list[float],
output_path: Path,
) -> None:
width, height = 960, 420
margin_left, margin_right, margin_top, margin_bottom = 56, 20, 16, 34
pixels = [[(250, 252, 255) for _ in range(width)] for _ in range(height)]
axis = (70, 82, 102)
grid = (228, 233, 242)
for y in range(margin_top, height - margin_bottom):
pixels[y][margin_left] = axis
for x in range(margin_left, width - margin_right):
pixels[height - margin_bottom][x] = axis
for line in range(1, 5):
y = margin_top + int((height - margin_top - margin_bottom) * line / 5)
for x in range(margin_left + 1, width - margin_right):
pixels[y][x] = grid
all_values = [float(v) for v in baseline_rewards + target_rewards]
if not all_values:
_write_png_rgb(output_path, width, height, pixels)
return
min_v = min(all_values)
max_v = max(all_values)
if abs(max_v - min_v) < 1e-9:
max_v = min_v + 1.0
plot_w = width - margin_left - margin_right
plot_h = height - margin_top - margin_bottom
max_index = max(len(baseline_rewards), len(target_rewards)) - 1
max_index = max(max_index, 1)
def map_point(index: int, value: float) -> tuple[int, int]:
x = margin_left + int(index * plot_w / max_index)
y = margin_top + int((1.0 - ((value - min_v) / (max_v - min_v))) * plot_h)
return x, y
baseline_color = (213, 78, 76)
target_color = (46, 172, 104)
if len(baseline_rewards) >= 2:
baseline_points = [map_point(i, float(v)) for i, v in enumerate(baseline_rewards)]
for (x0, y0), (x1, y1) in zip(baseline_points, baseline_points[1:]):
_draw_line(pixels, x0, y0, x1, y1, baseline_color)
if len(target_rewards) >= 2:
target_points = [map_point(i, float(v)) for i, v in enumerate(target_rewards)]
for (x0, y0), (x1, y1) in zip(target_points, target_points[1:]):
_draw_line(pixels, x0, y0, x1, y1, target_color)
_write_png_rgb(output_path, width, height, pixels)
def _write_behavior_comparison(
comparison: dict[str, Any],
*,
output_dir: Path,
) -> dict[str, str]:
json_path = output_dir / "openenv_behavior_comparison.json"
md_path = output_dir / "openenv_behavior_comparison.md"
write_json(json_path, comparison)
lines = [
"# ShadowOps Before/After Behavior Comparison",
"",
f"- Baseline policy: {comparison['baseline_policy']}",
f"- Target policy: {comparison['target_policy']}",
f"- Sample source: `{comparison['sample_source']}`",
f"- Samples: {comparison['sample_count']}",
f"- Checkpoint availability: {comparison['checkpoint_status']['available']}",
f"- Checkpoint note: {comparison['checkpoint_status']['note']}",
"",
"## Aggregate Metrics",
"",
"| Metric | Baseline | Target/Serving | Delta (target-baseline) |",
"| --- | ---: | ---: | ---: |",
f"| Exact match | {comparison['aggregate']['baseline']['exact_match']:.3f} | {comparison['aggregate']['trained_or_serving']['exact_match']:.3f} | {comparison['aggregate']['delta_target_minus_baseline']['exact_match_delta']:+.3f} |",
f"| Safety accuracy | {comparison['aggregate']['baseline']['safety_accuracy']:.3f} | {comparison['aggregate']['trained_or_serving']['safety_accuracy']:.3f} | {comparison['aggregate']['delta_target_minus_baseline']['safety_accuracy_delta']:+.3f} |",
f"| Unsafe decision rate | {comparison['aggregate']['baseline']['unsafe_decision_rate']:.3f} | {comparison['aggregate']['trained_or_serving']['unsafe_decision_rate']:.3f} | {comparison['aggregate']['delta_target_minus_baseline']['unsafe_decision_rate_delta']:+.3f} |",
f"| Reward mean | {comparison['aggregate']['baseline']['reward_mean']:.3f} | {comparison['aggregate']['trained_or_serving']['reward_mean']:.3f} | {comparison['aggregate']['delta_target_minus_baseline']['reward_mean_delta']:+.3f} |",
"",
"## Representative Scenarios",
"",
"| Scenario ID | Scenario Summary | Expected Action | Baseline Action | Q-Aware Action | Failure Reason (Baseline) | Success Reason (Q-Aware) | Risk Score | Confidence | Safe Outcome |",
"| --- | --- | --- | --- | --- | --- | --- | ---: | ---: | --- |",
]
for row in comparison["examples"]:
baseline_rationale = str(row["baseline_failure_reason"]).replace("|", "/")
trained_rationale = str(row["qaware_success_reason"]).replace("|", "/")
lines.append(
f"| `{row['scenario_id']}` | {row['scenario_summary']} | {row['expected_action']} | "
f"{row['baseline_action']} | {row['qaware_action']} | "
f"{baseline_rationale} | {trained_rationale} | "
f"{row['risk_score']:.3f} | {row['confidence']:.3f} | {row['safe_outcome']} |"
)
lines.extend(["", "## Note", "", comparison["note"]])
md_path.write_text("\n".join(lines), encoding="utf-8")
return {"json": json_path.name, "md": md_path.name}
def write_openenv_report(report: dict[str, Any], output_dir: Path = REPORTS_DIR) -> None:
output_dir.mkdir(parents=True, exist_ok=True)
json_path = output_dir / "openenv_loop_eval.json"
md_path = output_dir / "openenv_loop_eval.md"
write_json(json_path, report)
reward_plot = report.get("artifacts", {}).get("episode_reward_plot", "openenv_episode_rewards.png")
behavior_md = report.get("artifacts", {}).get("behavior_comparison_md", "openenv_behavior_comparison.md")
lines = [
"# ShadowOps OpenEnv Loop Evaluation",
"",
f"- Policy evaluated: {report['policy']}",
f"- Baseline policy for comparison: {report['baseline_policy']}",
f"- Episodes: {report['episodes']}",
f"- Episode max length: {report['episode_max_length']}",
f"- Seed: {report['seed']}",
f"- Run label: {report['run_label']}",
f"- Scope note: {report['run_scope_note']}",
"",
"## Core Metrics",
"",
"| Metric | Value |",
"| --- | ---: |",
f"| Total steps | {report['total_steps']} |",
f"| Malicious steps | {report['malicious_steps']} |",
f"| Benign steps | {report['benign_steps']} |",
f"| Accuracy | {report['accuracy']:.3f} |",
f"| Unsafe allow rate (malicious-only) | {report['unsafe_allow_rate']:.3f} |",
f"| Safe block rate (benign blocked/forked/quarantined) | {report['safe_block_rate']:.3f} |",
f"| Average confidence | {report['average_confidence']:.3f} |",
f"| Mean reward per step | {report['mean_reward_per_step']:.3f} |",
f"| Step pass count | {report['step_pass_count']} |",
f"| Step fail count | {report['step_fail_count']} |",
"",
"## Reward Summary",
"",
f"- Per-step reward mean/median/std: {report['reward_summary_per_step']['mean']:.3f} / {report['reward_summary_per_step']['median']:.3f} / {report['reward_summary_per_step']['std']:.3f}",
f"- Per-episode reward mean/median/std: {report['reward_summary_per_episode']['mean']:.3f} / {report['reward_summary_per_episode']['median']:.3f} / {report['reward_summary_per_episode']['std']:.3f}",
f"- Per-episode reward min/max: {report['reward_summary_per_episode']['min']:.3f} / {report['reward_summary_per_episode']['max']:.3f}",
"",
"## Before vs After Aggregate",
"",
"| Metric | Baseline | Target | Delta (target-baseline) |",
"| --- | ---: | ---: | ---: |",
f"| Unsafe allow rate | {report['baseline_summary']['unsafe_allow_rate']:.3f} | {report['unsafe_allow_rate']:.3f} | {report['comparison_delta']['unsafe_allow_rate_delta']:+.3f} |",
f"| Safe block rate | {report['baseline_summary']['safe_block_rate']:.3f} | {report['safe_block_rate']:.3f} | {report['comparison_delta']['safe_block_rate_delta']:+.3f} |",
f"| Average confidence | {report['baseline_summary']['average_confidence']:.3f} | {report['average_confidence']:.3f} | {report['comparison_delta']['average_confidence_delta']:+.3f} |",
f"| Mean reward/step | {report['baseline_summary']['mean_reward_per_step']:.3f} | {report['mean_reward_per_step']:.3f} | {report['comparison_delta']['mean_reward_per_step_delta']:+.3f} |",
f"| Safety Adjusted Score | {report['baseline_summary']['safety_adjusted_score']:.3f} | {report['safety_adjusted_score']:.3f} | {report['comparison_delta'].get('safety_adjusted_score_delta', 0.0):+.3f} |",
"",
"## Safety vs Reward Trade-Off",
"",
"- **Note on Reward vs Safety**: The `heuristic` baseline may occasionally have a higher `mean_reward_per_step` due to faster resolution times.",
"- However, **Q-aware is considered safer** when its `unsafe_allow_rate = 0.000`. Unsafe allow is the primary failure mode in security automation and carries severe negative business impact.",
"- Lower confidence scores in Q-aware do not necessarily mean failure; they often reflect **cautious uncertainty** on ambiguous payloads, which correctly triggers QUARANTINE instead of false-positive blocks or dangerous allows.",
"",
"## Representative Behavior",
"",
f"See `{behavior_md}` for 5-10 structured before/after scenarios.",
"",
"| Scenario | Baseline | Target | Baseline correct | Target correct |",
"| --- | --- | --- | --- | --- |",
]
for row in report["behavior_examples"]:
lines.append(
f"| {row['scenario']} | {row['baseline_action']} | {row['qaware_action']} | "
f"{row['baseline_correct']} | {row['trained_correct']} |"
)
lines.extend(
[
"",
"## Episode Summary",
"",
"| Episode | Reward | Steps | Unsafe allows | Safe blocks | Pass | Fail | Final SOC | Final GitHub | Final AWS |",
"| ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |",
]
)
for row in report["episodes_detail"]:
health = row.get("final_health", {})
lines.append(
f"| {row['episode']} | {float(row['episode_reward']):.3f} | {len(row['steps'])} | "
f"{row['unsafe_allow_steps']} | {row['safe_block_steps']} | {row['pass_steps']} | {row['fail_steps']} | "
f"{health.get('SOC', 0)} | {health.get('GITHUB', 0)} | {health.get('AWS', 0)} |"
)
lines.extend(
[
"",
"## Plot",
"",
f"- Episode reward trend plot: `{reward_plot}`",
"- Color mapping in the plot: red=baseline policy, green=target policy.",
]
)
md_path.write_text("\n".join(lines), encoding="utf-8")
def generate_openenv_report(
output_dir: Path = REPORTS_DIR,
*,
policy_name: str = DEFAULT_POLICY,
baseline_policy: str = DEFAULT_BASELINE_POLICY,
episodes: int = DEFAULT_EPISODES,
episode_max_length: int = DEFAULT_EPISODE_MAX_LENGTH,
seed: int = 42,
behavior_examples: int = DEFAULT_BEHAVIOR_EXAMPLES,
) -> dict[str, Any]:
baseline_report = evaluate_openenv_loop(
policy_name=baseline_policy,
episodes=episodes,
episode_max_length=episode_max_length,
seed=seed,
)
target_report = evaluate_openenv_loop(
policy_name=policy_name,
episodes=episodes,
episode_max_length=episode_max_length,
seed=seed,
)
comparison_delta = {
"unsafe_allow_rate_delta": float(target_report["unsafe_allow_rate"] - baseline_report["unsafe_allow_rate"]),
"safe_block_rate_delta": float(target_report["safe_block_rate"] - baseline_report["safe_block_rate"]),
"average_confidence_delta": float(target_report["average_confidence"] - baseline_report["average_confidence"]),
"mean_reward_per_step_delta": float(
target_report["mean_reward_per_step"] - baseline_report["mean_reward_per_step"]
),
"safety_adjusted_score_delta": float(
target_report["safety_adjusted_score"] - baseline_report["safety_adjusted_score"]
),
}
comparison = build_before_after_behavior_comparison(
baseline_policy=baseline_policy,
target_policy=policy_name,
seed=seed,
max_examples=max(5, min(10, behavior_examples)),
)
behavior_files = _write_behavior_comparison(comparison, output_dir=output_dir)
plots_dir = TRAINING_DIR / "plots"
plots_dir.mkdir(parents=True, exist_ok=True)
reward_plot_path = plots_dir / "openenv_episode_rewards.png"
_write_episode_reward_plot(
baseline_rewards=[float(row["episode_reward"]) for row in baseline_report["episodes_detail"]],
target_rewards=[float(row["episode_reward"]) for row in target_report["episodes_detail"]],
output_path=reward_plot_path,
)
report = dict(target_report)
report["baseline_policy"] = baseline_policy
report["baseline_summary"] = {
"unsafe_allow_rate": baseline_report["unsafe_allow_rate"],
"safe_block_rate": baseline_report["safe_block_rate"],
"average_confidence": baseline_report["average_confidence"],
"mean_reward_per_step": baseline_report["mean_reward_per_step"],
"safety_adjusted_score": baseline_report.get("safety_adjusted_score", 0.0),
}
report["comparison_delta"] = comparison_delta
report["behavior_examples"] = comparison["examples"]
report["checkpoint_status"] = comparison["checkpoint_status"]
report["artifacts"] = {
"episode_reward_plot": reward_plot_path.name,
"behavior_comparison_json": behavior_files["json"],
"behavior_comparison_md": behavior_files["md"],
}
write_openenv_report(report, output_dir)
return report
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="ShadowOps OpenEnv evaluation runner")
parser.add_argument("--policy", default=DEFAULT_POLICY, choices=SUPPORTED_POLICIES, help="Target policy")
parser.add_argument(
"--baseline-policy",
default=DEFAULT_BASELINE_POLICY,
choices=SUPPORTED_POLICIES,
help="Baseline policy for before/after comparison",
)
parser.add_argument("--episodes", type=int, default=DEFAULT_EPISODES, help="Number of episodes to evaluate")
parser.add_argument(
"--episode-max-length",
type=int,
default=DEFAULT_EPISODE_MAX_LENGTH,
help="Max steps per episode",
)
parser.add_argument("--seed", type=int, default=42, help="Random seed")
parser.add_argument(
"--behavior-examples",
type=int,
default=DEFAULT_BEHAVIOR_EXAMPLES,
help="Representative scenarios to keep in before/after summary (5-10 recommended)",
)
parser.add_argument(
"--output-dir",
type=Path,
default=REPORTS_DIR,
help="Output directory for openenv_loop_eval artifacts",
)
return parser.parse_args()
def main() -> int:
args = _parse_args()
report = generate_openenv_report(
output_dir=args.output_dir,
policy_name=args.policy,
baseline_policy=args.baseline_policy,
episodes=max(1, int(args.episodes)),
episode_max_length=max(1, int(args.episode_max_length)),
seed=int(args.seed),
behavior_examples=max(1, int(args.behavior_examples)),
)
output_dir = args.output_dir
print(f"OpenEnv episodes: {report['episodes']}")
print(f"OpenEnv run label: {report['run_label']}")
print(f"OpenEnv unsafe allow rate: {report['unsafe_allow_rate']:.3f}")
print(f"Saved: {(output_dir / 'openenv_loop_eval.json').relative_to(BACKEND_DIR)}")
print(f"Saved: {(output_dir / 'openenv_loop_eval.md').relative_to(BACKEND_DIR)}")
print(f"Saved: {(output_dir / 'openenv_behavior_comparison.json').relative_to(BACKEND_DIR)}")
print(f"Saved: {(output_dir / 'openenv_behavior_comparison.md').relative_to(BACKEND_DIR)}")
print(f"Saved: {(output_dir / 'openenv_episode_rewards.png').relative_to(BACKEND_DIR)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())