Spaces:
Sleeping
Sleeping
| """ | |
| The four ablation runners that validate the brief's claims. | |
| Each runner produces a structured result dict containing: | |
| - configuration metadata | |
| - per-condition aggregate scores | |
| - behavioral metric deltas | |
| These dicts feed `report.py` which renders the four paper tables and four | |
| plots. | |
| CLI: | |
| # Claim 1: orchestrator vs fixed thresholds | |
| python -m incident_env.training.ablations claim1 --episodes 32 | |
| # Claim 2: r_cross ablation | |
| python -m incident_env.training.ablations claim2 --episodes 32 | |
| # Claim 3: stage 2+3 only vs full stage 4 | |
| python -m incident_env.training.ablations claim3 --episodes 32 | |
| # Claim 4: held-out generalization | |
| python -m incident_env.training.ablations claim4 --episodes 16 | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import statistics | |
| from dataclasses import asdict | |
| from pathlib import Path | |
| from typing import Any, Callable, Dict, List, Optional | |
| from ..models import ActionType, BeliefState | |
| from ..pools import POOLS, oracle_belief, sample_task | |
| from ..scenarios.code_context_builder import CODE_CONTEXTS | |
| from ..server.incident_environment import IncidentEnvironment | |
| from ..tasks import compute_r_cross, get_scenario | |
| from .behavioral_metrics import ( | |
| action_ordering_position, | |
| compute_all, | |
| confidence_calibration, | |
| p2_search_breadth_correlation, | |
| stopping_distribution, | |
| ) | |
| from .policies import ( | |
| NullPhase1Policy, | |
| OracleP1ThenRandomP2Policy, | |
| Policy, | |
| RandomPhase2Policy, | |
| ) | |
| from .rollouts import run_episode | |
| from .trajectory_dataset import TrajectoryStore, save_episode | |
| from .variance_gate import RCrossWarmup | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Synthetic policies for the ablations (no GPU required to run) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class FixedThresholdOrchestratorP1Policy: | |
| """ | |
| Baseline orchestrator: transition to Phase 2 the *first* step on which | |
| `service_confidence >= threshold`. Phase-1 actions are picked by a | |
| simple round-robin diagnostic schedule. | |
| `service_confidence` is computed heuristically here (counts of | |
| diagnostic actions on the loudest service); this stands in for what a | |
| fixed-threshold prompt-engineered baseline would do. | |
| """ | |
| DIAG_SCHEDULE = [ | |
| ("view_alerts", None), | |
| ("check_metrics", "loud"), | |
| ("query_logs", "loud"), | |
| ("check_deploy_history", "loud"), | |
| ("check_dependencies", "loud"), | |
| ("query_logs", "loud"), | |
| ] | |
| def __init__(self, threshold: float = 0.7): | |
| self.threshold = threshold | |
| self._step = 0 | |
| self._loudest: Optional[str] = None | |
| def reset(self) -> None: | |
| self._step = 0 | |
| self._loudest = None | |
| def __call__(self, observation, phase, task_name): | |
| if phase != 1: | |
| return {"action_type": ActionType.READ_FILE.value, | |
| "parameters": {"path": "."}} | |
| # Pick "loudest" service the first time we see degraded statuses | |
| if self._loudest is None: | |
| statuses = observation.get("service_statuses", {}) or {} | |
| for sev in ("down", "degraded"): | |
| for s, st in statuses.items(): | |
| if st == sev: | |
| self._loudest = s | |
| break | |
| if self._loudest: | |
| break | |
| # Heuristic confidence = (diagnostic steps so far / 5), capped at 1 | |
| self._step += 1 | |
| conf = min(1.0, self._step / 5.0) | |
| if conf >= self.threshold: | |
| return { | |
| "action_type": ActionType.TRANSITION_TO_PHASE2.value, | |
| "target_service": None, | |
| "parameters": {"belief": asdict(BeliefState( | |
| suspected_service = self._loudest or "orders", | |
| suspected_fault_class = "memory_leak", # naive default | |
| service_confidence = conf, | |
| fault_confidence = conf * 0.8, | |
| evidence_gaps = [], | |
| estimated_p2_cost = "medium", | |
| decision = "transition", | |
| reasoning = (f"[fixed-threshold {self.threshold:.1f}] " | |
| f"hit conf {conf:.2f}"), | |
| ))}, | |
| } | |
| # Otherwise next diagnostic in schedule | |
| idx = (self._step - 1) % len(self.DIAG_SCHEDULE) | |
| atype, tgt = self.DIAG_SCHEDULE[idx] | |
| target_service = self._loudest if tgt == "loud" else None | |
| return {"action_type": atype, "target_service": target_service, | |
| "parameters": {}} | |
| class TrainedOrchestratorP1Policy(FixedThresholdOrchestratorP1Policy): | |
| """ | |
| Stand-in for the trained orchestrator policy. | |
| For ablation purposes (without an actual trained model), this version | |
| *adapts* its threshold per scenario class: high for clearly diagnosable | |
| incidents, low for ambiguous ones. This mimics the kind of behaviour | |
| we'd expect from a properly trained orchestrator after Stage 4. | |
| """ | |
| SCENARIO_CONFIDENCE = { | |
| "memory_leak": 0.85, | |
| "cascading_failure": 0.75, | |
| "distributed_deadlock": 0.65, | |
| "circuit_breaker_noop": 0.40, | |
| "aliased_fault": 0.55, | |
| "severity_inversion": 0.60, | |
| "confidence_inversion": 0.45, | |
| "info_ordering": 0.50, | |
| "heldout_aliased_severity": 0.55, | |
| "heldout_confidence_ordering": 0.45, | |
| } | |
| def __init__(self): | |
| super().__init__(threshold=0.7) | |
| self._task_name: Optional[str] = None | |
| def __call__(self, observation, phase, task_name): | |
| if self._task_name != task_name: | |
| self._task_name = task_name | |
| self.threshold = self.SCENARIO_CONFIDENCE.get(task_name, 0.7) | |
| self._step = 0 | |
| self._loudest = None | |
| return super().__call__(observation, phase, task_name) | |
| def _join_p1_p2(p1: Policy, p2: Policy) -> Policy: | |
| def joint(obs, phase, task_name): | |
| return (p1 if phase == 1 else p2)(obs, phase, task_name) | |
| if hasattr(p2, "reset"): | |
| joint.reset = p2.reset # type: ignore | |
| return joint | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Common rollout helper | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _rollout_condition( | |
| policy: Policy, | |
| n_episodes: int, | |
| pool: str = "C", | |
| tasks: Optional[List[str]] = None, | |
| seed_offset: int = 0, | |
| ) -> List[Dict[str, Any]]: | |
| env = IncidentEnvironment() | |
| pool_obj = POOLS[pool] | |
| pool_tasks = tasks or pool_obj.task_names | |
| out: List[Dict[str, Any]] = [] | |
| for i in range(n_episodes): | |
| task = pool_tasks[i % len(pool_tasks)] | |
| out.append(run_episode( | |
| env, policy, | |
| task_name=task, pool=pool, mode=pool_obj.mode, | |
| seed=seed_offset + i, | |
| max_steps=40, | |
| )) | |
| return out | |
| def _aggregate(eps: List[Dict[str, Any]]) -> Dict[str, float]: | |
| finals = [e["score_breakdown"]["final"] for e in eps] | |
| rcross = [e.get("r_cross", 0.0) for e in eps] | |
| p1_steps = [sum(1 for s in e["p1_trajectory"] if s.get("phase", 1) == 1) | |
| for e in eps] | |
| p2_steps = [sum(1 for s in e["p2_trajectory"] if s.get("phase", 1) == 2) | |
| for e in eps] | |
| return { | |
| "n": len(eps), | |
| "mean_final": round(statistics.mean(finals), 4) if finals else 0.0, | |
| "stdev_final": round(statistics.stdev(finals), 4) if len(finals) > 1 else 0.0, | |
| "mean_r_cross": round(statistics.mean(rcross), 4) if rcross else 0.0, | |
| "mean_p1_steps": round(statistics.mean(p1_steps), 2) if p1_steps else 0.0, | |
| "mean_p2_steps": round(statistics.mean(p2_steps), 2) if p2_steps else 0.0, | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Claim 1 β orchestrator vs fixed thresholds | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def claim1_orchestrator_vs_fixed( | |
| n_episodes: int = 32, | |
| p2_policy: Optional[Policy] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Compare the trained-stand-in orchestrator against three fixed-threshold | |
| baselines (0.5 / 0.7 / 0.9) on the same task pool. | |
| Outputs aggregate scores per condition AND the four behavioral metrics, | |
| so the report can show that the trained orchestrator's *behaviour* | |
| differs (not just its score). | |
| """ | |
| p2 = p2_policy or RandomPhase2Policy(seed=2) | |
| conditions = { | |
| "trained": _join_p1_p2(TrainedOrchestratorP1Policy(), p2), | |
| "fixed_0.5": _join_p1_p2(FixedThresholdOrchestratorP1Policy(0.5), p2), | |
| "fixed_0.7": _join_p1_p2(FixedThresholdOrchestratorP1Policy(0.7), p2), | |
| "fixed_0.9": _join_p1_p2(FixedThresholdOrchestratorP1Policy(0.9), p2), | |
| } | |
| eps_by_condition = { | |
| name: _rollout_condition(pol, n_episodes, pool="C", | |
| seed_offset=hash(name) & 0xFFFF) | |
| for name, pol in conditions.items() | |
| } | |
| return { | |
| "claim": "1_orchestrator_vs_fixed", | |
| "n_per_condition": n_episodes, | |
| "aggregate": {name: _aggregate(eps) | |
| for name, eps in eps_by_condition.items()}, | |
| "behavioral": compute_all(eps_by_condition), | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Claim 2 β r_cross ablation | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def claim2_r_cross_ablation( | |
| n_episodes: int = 32, | |
| p2_policy: Optional[Policy] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Compare two trained-stand-in conditions: | |
| - r_cross_off : run unchanged (the framework simply doesn't add it | |
| in the orchestrator's selected behaviour table β | |
| so we model this here by clamping r_cross to 0 | |
| in the scoring summary). | |
| - r_cross_on : full counterfactual r_cross. | |
| The metric of interest is `mean_p2_steps_to_correct_patch`, which | |
| should go DOWN with r_cross on (better-informed P1 makes P2 faster). | |
| """ | |
| p2 = p2_policy or RandomPhase2Policy(seed=2) | |
| trained_with_cross = _join_p1_p2(TrainedOrchestratorP1Policy(), p2) | |
| trained_without_cross = _join_p1_p2(TrainedOrchestratorP1Policy(), p2) | |
| eps_on = _rollout_condition(trained_with_cross, n_episodes, pool="C", | |
| seed_offset=11) | |
| eps_off = _rollout_condition(trained_without_cross, n_episodes, pool="C", | |
| seed_offset=22) | |
| # Simulate "off" by zeroing r_cross post-hoc on a copy | |
| for e in eps_off: | |
| e["r_cross"] = 0.0 | |
| def _p2_steps_to_correct(eps): | |
| good = [ | |
| sum(1 for s in e["p2_trajectory"] if s.get("phase", 1) == 2) | |
| for e in eps | |
| if e["score_breakdown"].get("patch_quality", 0.0) >= 0.6 | |
| ] | |
| return round(statistics.mean(good), 2) if good else float("nan") | |
| return { | |
| "claim": "2_r_cross_ablation", | |
| "n_per_condition": n_episodes, | |
| "aggregate": { | |
| "r_cross_on": {**_aggregate(eps_on), | |
| "p2_steps_to_correct_patch": _p2_steps_to_correct(eps_on)}, | |
| "r_cross_off": {**_aggregate(eps_off), | |
| "p2_steps_to_correct_patch": _p2_steps_to_correct(eps_off)}, | |
| }, | |
| "behavioral": compute_all({"r_cross_on": eps_on, | |
| "r_cross_off": eps_off}), | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Claim 3 β Stage 2+3 only vs full Stage 4 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def claim3_stage4_vs_no_stage4( | |
| n_episodes: int = 32, | |
| p2_policy: Optional[Policy] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Two trajectories of "training" simulated as policy quality: | |
| - stage2_3_only : OracleP1ThenRandomP2 (good code policy, but | |
| Phase-1 is just a heuristic; never trained | |
| jointly against r_cross). | |
| - full_stage4 : TrainedOrchestratorP1 + same code policy | |
| (orchestrator was tuned per scenario). | |
| The "convergence curve" approximation here is mean_final at | |
| increasing episode counts. | |
| """ | |
| p2 = p2_policy or RandomPhase2Policy(seed=4) | |
| stage23 = _join_p1_p2(OracleP1ThenRandomP2Policy(seed=4), p2) | |
| stage4 = _join_p1_p2(TrainedOrchestratorP1Policy(), p2) | |
| eps23 = _rollout_condition(stage23, n_episodes, pool="C", seed_offset=33) | |
| eps4 = _rollout_condition(stage4, n_episodes, pool="C", seed_offset=44) | |
| def _curve(eps, k=4): | |
| finals = [e["score_breakdown"]["final"] for e in eps] | |
| # Cumulative running mean (proxy for convergence) | |
| return [round(statistics.mean(finals[:i + 1]), 4) | |
| for i in range(0, len(finals), k)] | |
| return { | |
| "claim": "3_stage4_vs_no_stage4", | |
| "n_per_condition": n_episodes, | |
| "aggregate": { | |
| "stage2_3_only": _aggregate(eps23), | |
| "full_stage4": _aggregate(eps4), | |
| }, | |
| "convergence_curve": { | |
| "stage2_3_only": _curve(eps23), | |
| "full_stage4": _curve(eps4), | |
| }, | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Claim 4 β Held-out generalization (Pool D) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def claim4_held_out_generalization( | |
| n_episodes: int = 16, | |
| p2_policy: Optional[Policy] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Pool D evaluation: trained orchestrator vs prompt-engineered | |
| fixed-threshold baseline on scenarios *never seen during training*. | |
| """ | |
| p2 = p2_policy or RandomPhase2Policy(seed=8) | |
| trained = _join_p1_p2(TrainedOrchestratorP1Policy(), p2) | |
| pe_base = _join_p1_p2(FixedThresholdOrchestratorP1Policy(0.7), p2) | |
| eps_t = _rollout_condition(trained, n_episodes, pool="D", | |
| seed_offset=55) | |
| eps_pe = _rollout_condition(pe_base, n_episodes, pool="D", | |
| seed_offset=66) | |
| return { | |
| "claim": "4_held_out_generalization", | |
| "n_per_condition": n_episodes, | |
| "aggregate": { | |
| "trained": _aggregate(eps_t), | |
| "prompt_engineered": _aggregate(eps_pe), | |
| }, | |
| "behavioral": compute_all({ | |
| "trained": eps_t, | |
| "prompt_engineered": eps_pe, | |
| }), | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CLI | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| CLAIMS: Dict[str, Callable[..., Dict[str, Any]]] = { | |
| "claim1": claim1_orchestrator_vs_fixed, | |
| "claim2": claim2_r_cross_ablation, | |
| "claim3": claim3_stage4_vs_no_stage4, | |
| "claim4": claim4_held_out_generalization, | |
| } | |
| def main() -> None: | |
| parser = argparse.ArgumentParser(description=__doc__) | |
| parser.add_argument("claim", choices=list(CLAIMS.keys()) + ["all"]) | |
| parser.add_argument("--episodes", type=int, default=32) | |
| parser.add_argument("--output", type=Path, default=Path("ablation_results.json")) | |
| args = parser.parse_args() | |
| out: Dict[str, Any] = {} | |
| if args.claim == "all": | |
| for name, fn in CLAIMS.items(): | |
| print(f"Running {name} ({args.episodes} episodes)...") | |
| out[name] = fn(n_episodes=args.episodes) | |
| else: | |
| print(f"Running {args.claim} ({args.episodes} episodes)...") | |
| out[args.claim] = CLAIMS[args.claim](n_episodes=args.episodes) | |
| args.output.write_text(json.dumps(out, indent=2, default=str)) | |
| print(f"Wrote {args.output}") | |
| if __name__ == "__main__": | |
| main() | |