import argparse
import asyncio
import inspect
import json
import math
import os
import random
import textwrap
import zlib
from contextlib import AsyncExitStack, asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlparse
|
|
| from dotenv import load_dotenv |
| from openai import AsyncOpenAI |
|
|
| try: |
| from AntiAtropos.client import AntiAtroposEnv |
| from AntiAtropos.grader import EpisodeGrader |
| from AntiAtropos.models import ActionType, SREAction |
| from AntiAtropos.replay import EpisodeReplayBuffer, compress_trajectory |
| except ModuleNotFoundError: |
| from client import AntiAtroposEnv |
| from grader import EpisodeGrader |
| from models import ActionType, SREAction |
| from replay import EpisodeReplayBuffer, compress_trajectory |
|
|
# Load environment variables from a .env file (python-dotenv) before the
# module-level configuration below reads them via os.getenv.
load_dotenv()
|
|
# --- Model endpoint -----------------------------------------------------------
# A Groq key switches the defaults to Groq's OpenAI-compatible endpoint and a
# Groq-hosted model; otherwise the Hugging Face router is the default.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
API_BASE_URL = os.getenv("API_BASE_URL") or (
    "https://api.groq.com/openai/v1" if GROQ_API_KEY else "https://router.huggingface.co/v1"
)
MODEL_NAME = os.getenv("MODEL_NAME") or (
    "llama-3.1-8b-instant" if GROQ_API_KEY else "Qwen/Qwen2.5-72B-Instruct"
)
# Key resolution order: API_KEY, GROQ_API_KEY, HF_TOKEN, OPENAI_API_KEY.
API_KEY = os.getenv("API_KEY") or GROQ_API_KEY
if not API_KEY:
    API_KEY = os.getenv("HF_TOKEN") or os.getenv("OPENAI_API_KEY")
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")


# --- Environment target -------------------------------------------------------
DEFAULT_ENV_URL = "https://pranavkk-antiatropos.hf.space"
# An explicitly configured URL always wins. The hosted default is used only
# when no local Docker image was requested: if it were applied unconditionally
# ENV_URL would always be truthy and LOCAL_IMAGE_NAME could never be selected.
ENV_URL = (
    os.getenv("ENV_URL")
    or os.getenv("ANTIATROPOS_ENV_URL")
    or (None if LOCAL_IMAGE_NAME else DEFAULT_ENV_URL)
)
ENV_MODE = os.getenv("ANTIATROPOS_MODE", "simulated")
TASK_NAME = os.getenv("ANTIATROPOS_TASK", "task-1")
BENCHMARK = os.getenv("ANTIATROPOS_BENCHMARK", "antiatropos")


# --- Episode limits and timeouts ---------------------------------------------
MAX_STEPS_PER_TASK = 60  # hard cap on agent steps per episode
MESSAGE_TIMEOUT_S = 300  # passed to the environment client as message_timeout_s
MODEL_TIMEOUT_S = 25  # per-request timeout for the chat-completion call


# --- Sampling / evaluation knobs ---------------------------------------------
TEMPERATURE = float(os.getenv("ANTIATROPOS_TEMPERATURE", "0.0"))
MAX_TOKENS = int(os.getenv("ANTIATROPOS_MAX_TOKENS", "180"))
SEED = int(os.getenv("ANTIATROPOS_SEED", "42"))
SUCCESS_SCORE_THRESHOLD = float(os.getenv("ANTIATROPOS_SUCCESS_THRESHOLD", "0.55"))
EVAL_RUNS = int(os.getenv("ANTIATROPOS_EVAL_RUNS", "3"))
# Per-run temperatures, cycled by run index during evaluation.
TEMPERATURE_SWEEP = [0.6, 0.3, 0.7]
|
|
| |
# Per-task weights for the combined leaderboard score (they sum to 1.0).
LEADERBOARD_WEIGHTS = {"task-1": 0.4, "task-2": 0.3, "task-3": 0.3}


# Task-specific guidance shown to the model as the "Objective" line of each
# user prompt; task ids not listed here get a generic objective instead.
TASK_BRIEFS: Dict[str, str] = {
    "task-1": "Traffic ramps linearly every tick. Scale up proactively — new capacity takes 5 ticks to boot. Keep latency under SLA (200ms) while minimizing cost. Scale down when queues are safe.",
    "task-2": "One node (node-1 through node-4) will fail permanently. Wait until you SEE a FAILED node — do NOT pre-scale. Once a node shows status=FAILED: reroute traffic FROM the failed node to healthy peers, and scale up any starved children. Do NOT scale node-0 unless node-4 failed independently. SCALE_DOWN cancels pending boots and reduces cost. If reward is falling, stop scaling.",
    "task-3": "A surge (~75 req/tick) will hit node-1 and node-2 via a side channel bypassing node-0. Do NOT scale node-0 — it is NOT affected. ONLY scale node-1 or node-2 when their queue_depth rises. Do NOT pre-scale. 3-4 SCALE_UPs on each is sufficient. SCALE_DOWN cancels pending boots and reduces cost — use it when queues are safe. If reward is falling, STOP scaling and SCALE_DOWN to recover.",
}
|
|
# System message sent with every chat-completion request: cluster topology,
# action space, reward guidance, and the exact JSON reply schema that the
# response parser expects (action_type / target_node_id / parameter).
# NOTE(review): the reward bands stated here (> 0.5 good, < 0.15 bad, "< 0.1"
# repeat rule) differ from the per-step tags computed in build_user_prompt
# (GOOD > 0.5, OK > 0.2, BAD > 0.05, else STOP-SCALING) — confirm which
# thresholds are intended.
SYSTEM_PROMPT = textwrap.dedent(
"""
You are an autonomous SRE controller managing a five-node microservice cluster.

CLUSTER TOPOLOGY (traffic flows parent → children):
node-0 → node-1, node-2
node-2 → node-3
node-4 (independent ingress)
FAILED nodes have outflow=0 — their children are starved.
Backpressure: overloaded children reduce parent capacity.

ACTIONS (new capacity takes 5 ticks to boot):
SCALE_UP <node> <amount> — add capacity (0.3-0.5 normal, 0.6-0.8 heavy surge), clears DEGRADED
SCALE_DOWN <node> <amount> — cancel pending boots first, then remove active capacity (0.2-0.4 safe, 0.5-0.7 aggressive)
REROUTE_TRAFFIC <node> <fraction> — reduce THIS node capacity, redistribute to peers (0.3-0.5)
SHED_LOAD <node> <fraction> — drop incoming traffic (0.3-0.5), NEVER on node-0 (payment gateway)
NO_OP — do nothing

REWARD PRIORITIES (in order):
1. Avoid SLA violations (latency > 200ms or error rate > 5%)
2. Keep queues low (growing queues = destabilizing system)
3. Don't over-provision (excess capacity costs money)

REWARD SIGNAL: Each step returns a reward [0,1].
> 0.5 = good. 0.15–0.5 = acceptable. < 0.15 = you are making things worse.
If reward is falling, STOP the current strategy — try a different action or NO_OP.
Repeating the same action when reward < 0.1 is always wrong.

Scale when your observations demand it, not preemptively.
Boot delay is 5 ticks — factor this into your timing.
Scale back down when safe to save cost.

Return exactly one JSON object:
{
"action_type": "SCALE_UP" | "SCALE_DOWN" | "REROUTE_TRAFFIC" | "SHED_LOAD" | "NO_OP",
"target_node_id": "node-0" | "node-1" | "node-2" | "node-3" | "node-4",
"parameter": 0.0
}
"""
).strip()
|
|
|
|
class InferenceError(RuntimeError):
    """Signals that model inference, or a task run built on it, failed.

    Raised when a chat-completion request, its response, or the parsing of
    that response into an action fails, and when an evaluation run is
    aborted because of such a failure.
    """
|
|
|
|
def log_start(task: str, env: str, model: str) -> None:
    """Print the machine-parseable ``[START]`` marker that opens an episode log."""
    print("[START]", f"task={task}", f"env={env}", f"model={model}", flush=True)
|
|
|
|
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print the ``[STEP]`` marker for one environment step.

    ``reward`` is rendered with two decimals, ``done`` as lower-cased text,
    and a missing or empty ``error`` as the literal ``null``.
    """
    fields = (
        f"step={step}",
        f"action={action}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"error={error or 'null'}",
    )
    print("[STEP]", *fields, flush=True)
|
|
|
|
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` marker summarising an episode.

    Per-step rewards are emitted as a comma-separated list with two decimals.
    """
    reward_csv = ",".join(map("{:.2f}".format, rewards))
    print(
        "[END]",
        f"success={str(success).lower()}",
        f"steps={steps}",
        f"score={score:.2f}",
        f"rewards={reward_csv}",
        flush=True,
    )
|
|
|
|
| def _seed_everything(seed: int) -> None: |
| random.seed(seed) |
| try: |
| import numpy as np |
|
|
| np.random.seed(seed) |
| except ImportError: |
| pass |
|
|
|
|
| def _task_seed(base_seed: int, task_id: str) -> int: |
| offsets = {"task-1": 0, "task-2": 1, "task-3": 2} |
| return int(base_seed + offsets.get(task_id, 0)) |
|
|
|
|
| def _strict_score(score: float, eps: float = 0.001) -> float: |
| del eps |
| return min(1.0, max(0.0, float(score))) |
|
|
|
|
| def _hf_web_fallback_url(base_url: str) -> str: |
| parsed = urlparse(base_url) |
| host = parsed.netloc.lower() |
| path = parsed.path.rstrip("/") |
| if host.endswith(".hf.space") and path == "": |
| return base_url.rstrip("/") + "/web" |
| return base_url |
|
|
|
|
@asynccontextmanager
async def open_env(message_timeout_s: int):
    """Yield a ready AntiAtroposEnv client for the configured target.

    Target selection:
      * ``ENV_URL`` set: connect to it. If connecting raises a ConnectionError
        whose message contains "404" and ENV_URL is a bare ``*.hf.space`` root,
        retry once against the ``/web`` fallback URL.
      * otherwise ``LOCAL_IMAGE_NAME`` set: create the env from that Docker
        image and call ``close()`` on exit (awaiting it if it is awaitable).

    Args:
        message_timeout_s: Forwarded to the URL-based client as
            ``message_timeout_s``.

    Raises:
        ConnectionError: if connecting fails and no fallback applies.
        RuntimeError: if neither target is configured.
    """
    if ENV_URL:
        async with AsyncExitStack() as stack:
            # Guard only the connection attempt. Exceptions raised inside the
            # caller's ``async with`` body are thrown back in at ``yield`` and
            # must propagate untouched; catching them here would trigger a
            # second connect and a second ``yield``, which contextlib reports
            # as "generator didn't stop after athrow()".
            try:
                env = await stack.enter_async_context(
                    AntiAtroposEnv(ENV_URL, message_timeout_s=message_timeout_s)
                )
            except ConnectionError as e:
                fallback_url = _hf_web_fallback_url(ENV_URL)
                if fallback_url == ENV_URL or "404" not in str(e):
                    raise
                env = await stack.enter_async_context(
                    AntiAtroposEnv(fallback_url, message_timeout_s=message_timeout_s)
                )
            yield env
        return

    if LOCAL_IMAGE_NAME:
        env = AntiAtroposEnv.from_docker_image(LOCAL_IMAGE_NAME)
        try:
            yield env
        finally:
            # close() may be sync or async depending on the client; handle both.
            close_result = env.close()
            if inspect.isawaitable(close_result):
                await close_result
        return

    raise RuntimeError("Missing environment target. Set ENV_URL/ANTIATROPOS_ENV_URL or LOCAL_IMAGE_NAME.")
|
|
|
|
def build_user_prompt(task_id: str, step: int, obs: dict, history: List[str], demo_text: str = "") -> str:
    """Render the per-step user message sent to the model.

    Args:
        task_id: Task identifier; selects the objective from TASK_BRIEFS
            (a generic objective is used for unknown ids).
        step: Step number shown to the model.
        obs: Compact observation dict; embedded verbatim as compact JSON and
            read for ``current_cost_per_hour``, ``total_queue_backlog`` and
            ``sla_violations`` (missing or None values count as 0).
        history: Prior decision lines of the form
            ``"step=N action=<json> reward=R"``. The last four are shown; the
            last three rewards drive the reward feedback tag and trend arrow.
        demo_text: Optional demonstrations appended after the history.

    Returns:
        The prompt text.
    """
    recent = "\n".join(history[-4:]) if history else "None"
    brief = TASK_BRIEFS.get(task_id, "Maintain SLA, stability, and efficient cost.")
    demo_section = f"\n\n{demo_text}" if demo_text else ""

    # Coarse labels so the model does not have to interpret raw numbers.
    # ``or`` covers keys that are present but None.
    cost_hour = obs.get("current_cost_per_hour", 0.0) or 0.0
    if cost_hour < 1.2:
        cost_dev = "low"
    elif cost_hour > 1.8:
        cost_dev = "high"
    else:
        cost_dev = "baseline"

    # NOTE: despite the variable name this labels the current backlog level,
    # not its change over time.
    queue_backlog = obs.get("total_queue_backlog", 0.0) or 0.0
    if queue_backlog > 0.3:
        queue_trend = "rising"
    elif queue_backlog < 0.1:
        queue_trend = "stable"
    else:
        queue_trend = "moderate"

    sla_violations = obs.get("sla_violations", 0) or 0
    sla_note = f" ({sla_violations} violations)" if sla_violations > 0 else ""

    # Rewards are parsed back out of the history lines ("... reward=0.42").
    recent_rewards = [
        float(entry.split("reward=")[1].split()[0])
        for entry in history[-3:]
        if "reward=" in entry
    ]
    reward_feedback = ""
    if recent_rewards:
        last_r = recent_rewards[-1]
        if last_r > 0.5:
            r_tag = "GOOD"
        elif last_r > 0.2:
            r_tag = "OK"
        elif last_r > 0.05:
            r_tag = "BAD"
        else:
            r_tag = "STOP-SCALING"
        trend = ""
        if len(recent_rewards) > 1:
            if last_r < recent_rewards[0]:
                trend = "↓"
            elif last_r > recent_rewards[0]:
                trend = "↑"
        reward_feedback = f" | Reward: {last_r:.2f}={r_tag}{trend}"
    cluster_summary = f"Cost: {cost_dev} (${cost_hour:.2f}/hr) | Queues: {queue_trend}{sla_note}{reward_feedback}"

    # Assemble line by line rather than textwrap.dedent(f"""..."""): dedent
    # runs after interpolation, so multi-line history/demo text starting at
    # column 0 leaves no common indent and the template indentation would leak
    # into the prompt.
    lines = [
        f"Task: {task_id}",
        f"Objective: {brief}",
        f"Step: {step}",
        f"Status: {cluster_summary}",
        "",
        "Current state:",
        json.dumps(obs, separators=(",", ":")),
        "",
        "Recent decisions:",
        f"{recent}{demo_section}",
        "",
        "Choose the next SRE action.",
    ]
    return "\n".join(lines)
|
|
|
|
def observation_for_model(obs) -> dict:
    """
    Build a compact observation dict for the LLM.

    DESIGN: only raw physical metrics a human SRE sees on their dashboard.
    Reward decomposition and pre-digested scoring signals are EXCLUDED —
    the LLM must reason from physics, not reverse-engineer the scorer.

    The scalar reward for past steps is already in the history (correct).

    Enum-like fields (node ``status``, ``mode``) are reported by ``.value``
    when they have one, otherwise via ``str(...)``. Optional per-node
    attributes default to 0.0 (or [] for the topology lists) when absent.
    Key order is fixed because the dict is serialized into the prompt.
    """

    def status_label(node):
        # Enum members expose ``.value``; anything else is stringified.
        return getattr(node.status, "value", str(node.status))

    flagged = {"failed": [], "degraded": []}
    for node in obs.nodes:
        bucket = flagged.get(str(status_label(node)))
        if bucket is not None:
            bucket.append(node.node_id)

    def node_view(node) -> dict:
        return {
            "node_id": node.node_id,
            "status": status_label(node),
            "queue_depth": node.queue_depth,
            "latency_ms": node.latency_ms,
            "incoming_request_rate": node.incoming_request_rate,
            "cpu_utilization": node.cpu_utilization,
            "capacity": getattr(node, "capacity", 0.0),
            "pending_capacity": getattr(node, "pending_capacity", 0.0),
            "queue_delta": getattr(node, "queue_delta", 0.0),
            "outflow_rate": getattr(node, "outflow_rate", 0.0),
            "downstream_nodes": getattr(node, "downstream_nodes", []),
            "upstream_nodes": getattr(node, "upstream_nodes", []),
            "upstream_pressure": getattr(node, "upstream_pressure", 0.0),
        }

    return {
        "task_id": obs.task_id,
        "mode": getattr(obs.mode, "value", str(obs.mode)),
        "step": obs.step,
        "max_steps": obs.max_steps,
        "failed_nodes": flagged["failed"],
        "degraded_nodes": flagged["degraded"],
        "average_latency_ms": obs.average_latency_ms,
        "error_rate": obs.error_rate,
        "total_queue_backlog": obs.total_queue_backlog,
        "current_cost_per_hour": getattr(obs, "current_cost_per_hour", 0.0),
        "sla_violations": obs.sla_violations,
        "invalid_action_count": obs.invalid_action_count,
        "nodes": [node_view(node) for node in obs.nodes],
    }
|
|
|
|
| def _extract_json_object(text: str) -> dict: |
| stripped = text.strip() |
| start = stripped.find("{") |
| end = stripped.rfind("}") |
| if start == -1 or end == -1 or end < start: |
| raise ValueError("no JSON object found") |
| return json.loads(stripped[start : end + 1]) |
|
|
|
|
def _parse_action(payload: dict) -> SREAction:
    """Turn the model's decoded JSON payload into an SREAction.

    Missing or null fields fall back to ``NO_OP`` / ``"node-0"`` / ``0.0``.
    The action type is upper-cased before ``ActionType(...)`` lookup; an
    unrecognized name is expected to raise there (presumably ValueError if
    ActionType is an Enum — confirm), as does a non-numeric parameter.
    """
    action_name = str(payload.get("action_type", "NO_OP")).upper()
    node_id = str(payload.get("target_node_id") or "node-0")
    amount = float(payload.get("parameter") or 0.0)
    fields = {
        "action_type": ActionType(action_name),
        "target_node_id": node_id,
        "parameter": amount,
    }
    return SREAction(**fields)
|
|
|
|
| def _action_had_effect(ack_status: str) -> bool: |
| """Determine if an action had measurable effect from the ack status.""" |
| if not ack_status: |
| return True |
| if ack_status.startswith("Rejected:") or ack_status.startswith("Error:"): |
| return False |
| return True |
|
|
|
|
async def get_model_action(
    client: AsyncOpenAI,
    task_id: str,
    step: int,
    obs: dict,
    history: List[str],
    demo_text: str = "",
    temperature: Optional[float] = None,
) -> SREAction:
    """Ask the model for the next action and parse it into an SREAction.

    Args:
        client: OpenAI-compatible async client.
        task_id, step, obs, history, demo_text: Forwarded to build_user_prompt.
        temperature: Sampling temperature for this request; None (default)
            uses the module-level TEMPERATURE.

    Raises:
        InferenceError: wrapping any failure of the request itself, an empty
            reply, or JSON/action parsing.
    """
    prompt = build_user_prompt(task_id=task_id, step=step, obs=obs, history=history, demo_text=demo_text)
    sampling_temperature = TEMPERATURE if temperature is None else temperature
    try:
        completion = await client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=sampling_temperature,
            max_tokens=MAX_TOKENS,
            presence_penalty=0.3,
            response_format={"type": "json_object"},
            timeout=MODEL_TIMEOUT_S,
        )
        content = completion.choices[0].message.content or ""
        if not content.strip():
            raise InferenceError("Model returned empty content.")
        return _parse_action(_extract_json_object(content))
    except Exception as exc:
        raise InferenceError(f"Model inference failed at step {step} for {task_id}: {exc}") from exc


def _compact_action(action: SREAction) -> str:
    """Serialize an action as compact JSON (parameter rounded to 4 decimals)."""
    payload = {
        "action_type": action.action_type.value,
        "target_node_id": action.target_node_id,
        "parameter": round(float(action.parameter), 4),
    }
    return json.dumps(payload, separators=(",", ":"))


def _finite_int(value, default: int = 0) -> int:
    """Coerce a grader score to int, mapping None/NaN/inf to ``default``."""
    if value is None:
        return default
    number = float(value)
    if not math.isfinite(number):
        return default
    return int(number)


async def run_single_task(
    env: AntiAtroposEnv,
    client: AsyncOpenAI,
    task_id: str,
    temperature: Optional[float] = None,
    replay_buffer: Optional[EpisodeReplayBuffer] = None,
    run_seed: Optional[int] = None,
    is_baseline: bool = False,
) -> dict:
    """Play one episode of ``task_id`` with the model and grade it.

    Args:
        env: Connected environment client.
        client: OpenAI-compatible async client used to choose actions.
        task_id: Task to reset the environment into.
        temperature: Sampling temperature applied to every model call in this
            episode; None uses the module-level TEMPERATURE. (Previously this
            argument was accepted but never forwarded to the model.)
        replay_buffer: Source of demonstrations for non-baseline runs and sink
            for this episode's compressed trajectory.
        run_seed: Explicit reset seed; defaults to ``_task_seed(SEED, task_id)``.
        is_baseline: When True no demonstrations are shown and nothing is
            stored in the replay buffer.

    Returns:
        Dict with ``task_id``, ``success``, ``score`` (composite clamped to
        [0, 1]), ``steps``, per-step ``rewards``, ``grade_scores`` (NaN floats
        replaced by None) and the raw ``composite``.

    Raises:
        InferenceError: propagated from get_model_action.
    """
    task_seed = run_seed if run_seed is not None else _task_seed(SEED, task_id)
    result = await env.reset(task_id=task_id, mode=ENV_MODE, seed=task_seed)

    grader = EpisodeGrader(task_id=task_id)
    grader.record(result.observation)
    history: List[str] = []
    rewards: List[float] = []
    raw_steps: List[dict] = []
    steps_taken = 0

    # Baseline runs never see replay-buffer demonstrations.
    demo_text = ""
    if not is_baseline and replay_buffer:
        demo_text = replay_buffer.format_demonstrations()

    for step in range(1, MAX_STEPS_PER_TASK + 1):
        if result.done:
            break

        action = await get_model_action(
            client=client,
            task_id=task_id,
            step=step,
            obs=observation_for_model(result.observation),
            history=history,
            demo_text=demo_text,
            temperature=temperature,
        )
        result = await env.step(action)
        obs = result.observation
        grader.record(obs)

        reward = float(result.reward or 0.0)
        rewards.append(reward)
        steps_taken = step
        action_str = _compact_action(action)

        # The ack status may be absent or None; normalize to "" so the prefix
        # checks below cannot fail on a non-string.
        ack_status = getattr(obs, "action_ack_status", "") or ""
        had_effect = _action_had_effect(ack_status)

        grader.record_action(
            action_type=action.action_type.value,
            target_node_id=action.target_node_id,
            parameter=float(action.parameter),
            had_effect=had_effect,
        )

        # Format is parsed back by build_user_prompt ("... reward=R").
        history.append(f"step={step} action={action_str} reward={reward:.2f}")

        raw_steps.append({
            "step": step,
            "action_type": action.action_type.value,
            "target_node_id": action.target_node_id,
            "parameter": float(action.parameter),
            "reward": reward,
            "avg_latency_norm": getattr(obs, "average_latency_ms", 0.0),
            "error_rate": getattr(obs, "error_rate", 0.0),
            "queue_backlog_norm": getattr(obs, "total_queue_backlog", 0.0),
            # Reward-based proxy, not the environment's own SLA counter.
            "sla_violation": reward < 0.3,
        })

        # Rejected/errored acks are surfaced in the step log; anything else is null.
        log_step(
            step=step,
            action=action_str,
            reward=reward,
            done=bool(result.done),
            error=None if had_effect else ack_status,
        )

    grade = grader.score()
    score = _strict_score(float(grade.composite))
    success = score >= SUCCESS_SCORE_THRESHOLD

    if not is_baseline and replay_buffer is not None and raw_steps:
        trajectory = compress_trajectory(
            steps=raw_steps,
            task_id=task_id,
            score=score,
            total_steps=steps_taken,
            # Grader scores may contain NaN (see below); int(nan) would raise.
            final_sla_violations=_finite_int(grade.scores.get("violations", 0)),
            final_invalid_actions=_finite_int(grade.scores.get("invalid_actions", 0)),
        )
        replay_buffer.store(trajectory, score)

    # NaN is not valid JSON, so report it as None (null).
    grade_scores = {
        key: (None if isinstance(value, float) and math.isnan(value) else value)
        for key, value in grade.scores.items()
    }

    return {
        "task_id": task_id,
        "success": success,
        "score": score,
        "steps": steps_taken,
        "rewards": rewards,
        "grade_scores": grade_scores,
        "composite": float(grade.composite),
    }
|
|
|
|
| def _compute_aggregates(scores: List[float]) -> Dict[str, float]: |
| """Compute aggregate metrics from a list of per-run composite scores.""" |
| if not scores: |
| return {"mean": 0.0, "std": 0.0, "worst_case": 0.0, "pass_rate": 0.0, "consistency": 0.0} |
| mean = sum(scores) / len(scores) |
| std = (sum((s - mean) ** 2 for s in scores) / len(scores)) ** 0.5 |
| worst_case = min(scores) |
| pass_rate = sum(1 for s in scores if s >= SUCCESS_SCORE_THRESHOLD) / len(scores) |
| consistency = max(0.0, 1.0 - std / mean) if mean > 0 else 0.0 |
| return { |
| "mean": round(mean, 4), |
| "std": round(std, 4), |
| "worst_case": round(worst_case, 4), |
| "pass_rate": round(pass_rate, 4), |
| "consistency": round(consistency, 4), |
| } |
|
|
|
|
def _compute_leaderboard(task_composites: Dict[str, float]) -> float:
    """Combine per-task scores into one weighted leaderboard number.

    Weights come from LEADERBOARD_WEIGHTS. A weighted task missing from
    ``task_composites`` contributes 0, and tasks without a weight are ignored.
    The result is rounded to 4 decimals.
    """
    score = 0.0
    for name in LEADERBOARD_WEIGHTS:
        score += LEADERBOARD_WEIGHTS[name] * task_composites.get(name, 0.0)
    return round(score, 4)
|
|
|
|
def _write_eval_report(
    output_dir: Path,
    run_mode: str,
    task_results: Dict[str, dict],
    leaderboard_score: float,
) -> Path:
    """Write the structured JSON eval report to disk and return its path.

    The file is ``<output_dir>/<run_mode>_<UTC timestamp>.json``. If a report
    for the opposite mode exists ("baseline" for "trained"; "trained" for any
    other run_mode), the lexically last one (i.e. newest, given the timestamp
    format) is loaded. A ``delta_from_<mode>`` entry then records this run's
    mean minus the other run's mean for every task present in both reports.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    filepath = output_dir / f"{run_mode}_{timestamp}.json"

    report = {
        "run_type": run_mode,
        "timestamp": timestamp,
        "model": MODEL_NAME,
        "seed_base": SEED,
        "leaderboard_score": leaderboard_score,
        "tasks": task_results,
    }

    opposite_mode = "baseline" if run_mode == "trained" else "trained"
    previous_reports = sorted(output_dir.glob(f"{opposite_mode}_*.json"))
    if previous_reports:
        other = json.loads(previous_reports[-1].read_text(encoding="utf-8"))
        other_tasks = other.get("tasks", {})
        delta = {}
        for tid, result in task_results.items():
            if tid not in other_tasks:
                # The other run never evaluated this task; a delta against an
                # implicit 0.0 mean would be misleading.
                continue
            other_mean = other_tasks[tid].get("aggregates", {}).get("mean", 0.0)
            this_mean = result.get("aggregates", {}).get("mean", 0.0)
            delta[tid] = round(this_mean - other_mean, 4)
        report["delta_from_" + opposite_mode] = delta

    filepath.write_text(json.dumps(report, indent=2, default=str), encoding="utf-8")
    return filepath
|
|
|
|
def _stable_run_seed(task: str, run_idx: int) -> int:
    """Return a reproducible reset seed for one (task, run) pair.

    CRC-32 of the task id is used instead of the built-in ``hash()``: string
    hashes are randomized per interpreter process (PYTHONHASHSEED), so seeds
    derived from them differ between invocations and make trained-vs-baseline
    comparisons irreproducible.
    """
    return SEED * 1000 + zlib.crc32(task.encode("utf-8")) % 100 + run_idx


async def run_all_tasks(overrides: Optional[argparse.Namespace] = None) -> None:
    """Evaluate the model on the selected tasks and write a JSON report.

    Task selection: ``overrides.task`` when given and not "all"; otherwise all
    tasks, or only TASK_NAME when ANTIATROPOS_RUN_SINGLE_TASK=true. Each task
    runs ``eval_runs`` episodes (override or EVAL_RUNS) with seeds from
    _stable_run_seed and temperatures cycled from TEMPERATURE_SWEEP. After all
    tasks, it prints per-task aggregates, the weighted leaderboard score, the
    report path, and deltas against the newest report of the other mode.

    Raises:
        RuntimeError: if no API key is configured.
        InferenceError: if any episode fails (the evaluation stops there).
    """
    _seed_everything(SEED)
    all_tasks = ["task-1", "task-2", "task-3"]
    if overrides and overrides.task != "all":
        tasks_to_run = [overrides.task]
    else:
        run_single = os.getenv("ANTIATROPOS_RUN_SINGLE_TASK", "false").lower() == "true"
        task_id = TASK_NAME if TASK_NAME in all_tasks else "task-1"
        tasks_to_run = [task_id] if run_single else all_tasks
    if not API_KEY:
        raise RuntimeError("Missing API key (API_KEY/HF_TOKEN/OPENAI_API_KEY).")

    # getattr defaults keep this working when overrides is None.
    is_baseline = getattr(overrides, "mode", "trained") == "baseline"
    run_mode = "baseline" if is_baseline else "trained"
    eval_runs_val = getattr(overrides, "eval_runs", None)
    eval_runs = eval_runs_val if eval_runs_val is not None else EVAL_RUNS
    output_dir = Path(getattr(overrides, "output_dir", "eval_results"))

    client = AsyncOpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    # One buffer shared by all runs, so later runs can see earlier episodes.
    replay_buffer = EpisodeReplayBuffer()
    task_results: Dict[str, dict] = {}

    try:
        async with open_env(MESSAGE_TIMEOUT_S) as env:
            for task in tasks_to_run:
                task_composites: List[float] = []
                episode_breakdown: List[dict] = []

                for run_idx in range(eval_runs):
                    run_seed = _stable_run_seed(task, run_idx)
                    temperature = TEMPERATURE_SWEEP[run_idx % len(TEMPERATURE_SWEEP)]

                    # Defaults reported by log_end if the run fails.
                    success = False
                    steps = 0
                    score = 0.0
                    rewards: List[float] = []
                    task_error: Optional[Exception] = None
                    log_start(task=f"{task} run={run_idx+1}/{eval_runs} temp={temperature} mode={run_mode}", env=BENCHMARK, model=MODEL_NAME)
                    try:
                        report = await run_single_task(
                            env=env,
                            client=client,
                            task_id=task,
                            temperature=temperature,
                            replay_buffer=replay_buffer,
                            run_seed=run_seed,
                            is_baseline=is_baseline,
                        )
                        success = bool(report["success"])
                        steps = int(report["steps"])
                        score = _strict_score(float(report["score"]))
                        composite = float(report.get("composite", score))
                        rewards = list(report["rewards"])
                        grade_scores = report.get("grade_scores", {})
                        task_composites.append(composite)
                        episode_breakdown.append({
                            "run": run_idx + 1,
                            "temp": temperature,
                            "composite": round(composite, 4),
                            "score": round(score, 4),
                            "steps": steps,
                            "success": success,
                            **{k: v for k, v in grade_scores.items() if k not in ("action_distribution", "node_heatmap")},
                        })
                    except Exception as exc:
                        task_error = exc
                        score = 0.0
                    finally:
                        # Always close the episode log, even on failure.
                        log_end(success=success, steps=steps, score=score, rewards=rewards)
                    if task_error is not None:
                        raise InferenceError(f"Task {task} run {run_idx+1} failed.") from task_error

                aggregates = _compute_aggregates(task_composites)
                task_results[task] = {
                    "aggregates": aggregates,
                    "episodes": episode_breakdown,
                }

                print(
                    f"[AGGREGATE] task={task} mean={aggregates['mean']:.3f} "
                    f"std={aggregates['std']:.3f} worst={aggregates['worst_case']:.3f} "
                    f"pass_rate={aggregates['pass_rate']:.2f} consistency={aggregates['consistency']:.3f} "
                    f"runs={len(task_composites)}",
                    flush=True,
                )
    finally:
        await client.close()

    task_means = {tid: result["aggregates"]["mean"] for tid, result in task_results.items()}
    leaderboard = _compute_leaderboard(task_means)
    print(f"\n[LEADERBOARD] score={leaderboard:.3f}", flush=True)
    for tid, mean in task_means.items():
        weight = LEADERBOARD_WEIGHTS.get(tid, 0.0)
        print(f" {tid}: mean={mean:.3f} (weight={weight})", flush=True)

    filepath = _write_eval_report(output_dir, run_mode, task_results, leaderboard)
    print(f"\n[REPORT] written to {filepath}", flush=True)

    # Compare against the newest report of the *other* mode. Using the same
    # mode would pick up the report just written and always show a zero delta.
    opposite = "trained" if is_baseline else "baseline"
    existing = sorted(output_dir.glob(f"{opposite}_*.json"))
    if existing:
        other = json.loads(existing[-1].read_text(encoding="utf-8"))
        other_tasks = other.get("tasks", {})
        print(f"\n[DELTA] vs {opposite} ({existing[-1].name}):", flush=True)
        for tid, result in task_results.items():
            if tid not in other_tasks:
                continue  # the other run never evaluated this task
            other_mean = other_tasks[tid].get("aggregates", {}).get("mean", 0.0)
            this_mean = result["aggregates"]["mean"]
            delta = this_mean - other_mean
            sign = "+" if delta >= 0 else ""
            print(f" {tid}: {sign}{delta:.3f} ({opposite}={other_mean:.3f} → {run_mode}={this_mean:.3f})", flush=True)
|
|
|
|
def main() -> None:
    """Command-line entry point: parse flags and run the async evaluation.

    The parsed namespace (``task``, ``mode``, ``eval_runs``, ``output_dir``)
    is handed to run_all_tasks as ``overrides``.
    """
    cli = argparse.ArgumentParser(description="AntiAtropos SRE inference")
    option_table = [
        (
            ("--task", "-t"),
            {
                "choices": ["task-1", "task-2", "task-3", "all"],
                "default": "all",
                "help": "Run a specific task or all tasks (default: all)",
            },
        ),
        (
            ("--mode", "-m"),
            {
                "choices": ["trained", "baseline"],
                "default": "trained",
                "help": "Evaluation mode: 'trained' uses replay buffer demos, 'baseline' runs without (default: trained)",
            },
        ),
        (
            ("--eval-runs", "-n"),
            {
                "type": int,
                "default": None,
                "help": f"Number of evaluation runs per task (default: {EVAL_RUNS})",
            },
        ),
        (
            ("--output-dir", "-o"),
            {
                "type": str,
                "default": "eval_results",
                "help": "Directory for JSON eval reports (default: eval_results)",
            },
        ),
    ]
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    asyncio.run(run_all_tasks(overrides=cli.parse_args()))
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|
|