"""Post-training evaluation: plot reward curves, compare policies, dump hero traces. Usage (on host venv, no GPU needed — pure post-hoc analysis): python eval.py curves --runs simmart-runs/smoke-1p5b-* python eval.py baselines --seeds 42 43 44 45 46 --out assets/baselines.json python eval.py trace --adapter --seed 42 --out assets/hero-trace.json """ from __future__ import annotations import argparse import glob import json import os import sys import statistics from pathlib import Path from typing import Any, Dict, List, Optional HERE = os.path.dirname(os.path.abspath(__file__)) if HERE not in sys.path: sys.path.insert(0, HERE) # --------------------------------------------------------------------------- # Curve plotting (matplotlib optional) # --------------------------------------------------------------------------- def cmd_curves(args: argparse.Namespace) -> int: runs = [] for pat in args.runs: runs.extend(sorted(glob.glob(pat))) if not runs: print(f"No runs matched: {args.runs}", file=sys.stderr) return 1 series: Dict[str, List[Dict[str, Any]]] = {} for run in runs: hist = Path(run) / "history.jsonl" if not hist.exists(): print(f"skip {run} (no history.jsonl)") continue entries = [json.loads(line) for line in hist.open() if line.strip()] series[os.path.basename(run)] = entries print(f"loaded {run}: {len(entries)} steps") if args.out: Path(args.out).parent.mkdir(parents=True, exist_ok=True) with open(args.out, "w") as f: json.dump(series, f, indent=2) print(f"wrote {args.out}") try: import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt except ImportError: print("matplotlib not installed — skipping PNG plot") return 0 fig, axes = plt.subplots(2, 2, figsize=(11, 8)) for name, entries in series.items(): xs = [e["step"] for e in entries] axes[0, 0].plot(xs, [e["mean_reward"] for e in entries], label=name, marker="o", ms=3) axes[0, 1].plot(xs, [e["mean_episode_return"] for e in entries], label=name, marker="o", ms=3) axes[1, 0].plot(xs, [e["parse_error_rate"] for e in entries], label=name, marker="o", ms=3) axes[1, 1].plot(xs, [e["rogue_recall"] for e in entries], label=name, marker="o", ms=3) axes[0, 0].set_title("Mean per-week reward"); axes[0, 0].set_xlabel("step") axes[0, 0].axhline(-5.23 / 13, ls="--", c="grey", alpha=0.5, label="random (-0.40)") axes[0, 0].axhline(-2.40 / 13, ls="--", c="green", alpha=0.5, label="heuristic (-0.18)") axes[0, 0].axhline(-2.27 / 13, ls="--", c="blue", alpha=0.5, label="oracle (-0.17)") axes[0, 1].set_title("Episode return (sum of 13 weeks)"); axes[0, 1].set_xlabel("step") axes[0, 1].axhline(-5.23, ls="--", c="grey", alpha=0.5, label="random (-5.23)") axes[0, 1].axhline(-2.40, ls="--", c="green", alpha=0.5, label="heuristic (-2.40)") axes[0, 1].axhline(-2.27, ls="--", c="blue", alpha=0.5, label="oracle (-2.27)") axes[1, 0].set_title("Parse error rate"); axes[1, 0].set_xlabel("step"); axes[1, 0].set_ylim(0, 1) axes[1, 1].set_title("Rogue catch recall"); axes[1, 1].set_xlabel("step"); axes[1, 1].set_ylim(0, 1) for ax in axes.flat: ax.legend(fontsize=7, loc="best") ax.grid(alpha=0.3) png = args.png or "assets/reward_curve.png" Path(png).parent.mkdir(parents=True, exist_ok=True) fig.tight_layout() fig.savefig(png, dpi=140) print(f"wrote {png}") return 0 # --------------------------------------------------------------------------- # Baseline battery # --------------------------------------------------------------------------- def cmd_baselines(args: argparse.Namespace) -> int: from inference import ( HeuristicCEO, OracleCEO, GodCEO, RandomCEO, FrontierCEO, run_policy, EpisodeResult, ) policies: List[Any] = [] skip = set(args.skip or []) if "random" not in skip: policies.append(RandomCEO()) if "heuristic" not in skip: policies.append(HeuristicCEO()) if "oracle" not in skip: policies.append(OracleCEO()) if args.include_god or "god" in (args.only or []): policies.append(GodCEO()) for m in (args.frontier_models or []): policies.append(FrontierCEO( provider=args.frontier_provider, model=m, api_base=args.frontier_api_base, temperature=args.frontier_temperature, max_tokens=args.frontier_max_tokens, budget_hint=not args.frontier_no_budget_hint, )) # Incremental save: checkpoint after each policy so long frontier sweeps # aren't lost to a single transient API hiccup or SIGINT. out: Dict[str, Dict[str, Any]] = {} if args.out and Path(args.out).exists(): try: out = json.loads(Path(args.out).read_text()) print(f"Resuming from {args.out}, {len(out)} policies already done: " f"{sorted(out.keys())}") except Exception: out = {} for pol in policies: if pol.name in out and not args.force: print(f"skip {pol.name} (already in {args.out})") continue print(f"\n>>> {pol.name} @ seeds={args.seeds}") results: List[EpisodeResult] = run_policy(pol, seeds=args.seeds, quiet=True) rewards = [r.total_reward for r in results] out[pol.name] = { "n": len(results), "mean_total_reward": statistics.mean(rewards), "std_total_reward": statistics.stdev(rewards) if len(rewards) > 1 else 0.0, "mean_ebitda_margin_pct": statistics.mean([r.ebitda_margin_pct for r in results]), "mean_avg_stockout_pct": statistics.mean([r.avg_stockout_pct for r in results]), "mean_avg_nps": statistics.mean([r.avg_nps for r in results]), "rogue_recall": statistics.mean([ (r.rogues_caught / r.rogues_total) if r.rogues_total else 1.0 for r in results ]), "seeds": list(args.seeds), "per_seed_total_reward": rewards, } if isinstance(pol, FrontierCEO): out[pol.name].update({ "provider": pol._provider, "model": pol._model, "n_parse_errors": pol.n_parse_errors, "n_api_errors": pol.n_api_errors, "total_tokens": pol.total_tokens, "total_prompt_tokens": pol.total_prompt_tokens, "total_completion_tokens": pol.total_completion_tokens, }) print(f"{pol.name:30s}: mean_r={out[pol.name]['mean_total_reward']:+.2f} " f"ebitda%={out[pol.name]['mean_ebitda_margin_pct']:+.2f} " f"rogue_rec={out[pol.name]['rogue_recall']:.2%}") if args.out: Path(args.out).parent.mkdir(parents=True, exist_ok=True) with open(args.out, "w") as f: json.dump(out, f, indent=2) if args.out: print(f"wrote {args.out}") return 0 # --------------------------------------------------------------------------- # Hero trace: replay a trained model on a fixed seed, dump (prompt, action, reward) # per week for the decision trace document. # --------------------------------------------------------------------------- def cmd_trace(args: argparse.Namespace) -> int: # These imports will pull in heavy deps, so only do on-demand from unsloth import FastLanguageModel import torch from models import SimMartAction from prompts import build_chat, parse_response, render_observation from server.environment import SimMartEnvironment print(f"Loading base model {args.model} + adapter {args.adapter}") model, tokenizer = FastLanguageModel.from_pretrained( model_name=args.model, max_seq_length=4096, dtype=torch.bfloat16, load_in_4bit=True, ) if args.adapter: model.load_adapter(args.adapter, adapter_name="default") FastLanguageModel.for_inference(model) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token env = SimMartEnvironment() obs = env.reset(seed=args.seed, episode_id=f"trace-{args.seed}") trace: List[Dict[str, Any]] = [] total_reward = 0.0 while obs.step_type != "episode_end": chat = build_chat(obs) prompt = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True) enc = tokenizer(prompt, return_tensors="pt").to(model.device) with torch.inference_mode(): out = model.generate( **enc, max_new_tokens=700, do_sample=False, temperature=1.0, pad_token_id=tokenizer.pad_token_id, ) completion = tokenizer.decode(out[0, enc.input_ids.shape[1]:], skip_special_tokens=True) action, tel = parse_response(completion, obs.inbox) observation_render = render_observation(obs) trace.append({ "week": obs.week_of_quarter, "observation_text": observation_render, "completion_text": completion, "parsed_decisions": [d.model_dump() for d in action.decisions], "journal_entry": action.journal_entry, "parse_ok": tel["parse_ok"], "parse_error": tel.get("parse_error"), }) obs = env.step(action) r = obs.reward or 0.0 trace[-1]["reward"] = r total_reward += r summary = { "seed": args.seed, "adapter": args.adapter, "model": args.model, "total_reward": total_reward, "final_ebitda_margin_pct": env.state.company.pnl_qtd.ebitda_margin_pct, "final_cash_inr": env.state.company.balance_sheet.cash_inr, "rogues_total": len(env.state.rogue_incidents), "rogues_caught": sum(1 for r in env.state.rogue_incidents if r.caught), "weeks": trace, } Path(args.out).parent.mkdir(parents=True, exist_ok=True) with open(args.out, "w") as f: json.dump(summary, f, indent=2) print(f"trace → {args.out} total_reward={total_reward:+.3f} " f"ebitda%={summary['final_ebitda_margin_pct']:+.2f} " f"rogues={summary['rogues_caught']}/{summary['rogues_total']}") return 0 # --------------------------------------------------------------------------- # Checkpoint sweep: evaluate each saved adapter on N held-out seeds, deterministic. # --------------------------------------------------------------------------- def cmd_ckpts(args: argparse.Namespace) -> int: from unsloth import FastLanguageModel import torch from prompts import build_chat, parse_response from server.environment import SimMartEnvironment adapters = [] for pat in args.adapters: adapters.extend(sorted(glob.glob(pat))) if not adapters: print(f"No adapters matched: {args.adapters}", file=sys.stderr) return 1 print(f"Found {len(adapters)} adapter(s):") for a in adapters: print(f" {a}") print(f"Loading base model {args.model}") model, tokenizer = FastLanguageModel.from_pretrained( model_name=args.model, max_seq_length=4096, dtype=torch.bfloat16, load_in_4bit=True, ) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token # Load each adapter into its own slot on the same base model so we can # swap without reloading weights. adapter_names: List[str] = [] for i, a in enumerate(adapters): name = f"ckpt{i}" model.load_adapter(a, adapter_name=name) adapter_names.append(name) FastLanguageModel.for_inference(model) results: Dict[str, Dict[str, Any]] = {} env = SimMartEnvironment() for name, adapter_path in zip(adapter_names, adapters): model.set_adapter(name) totals: List[float] = [] ebitdas: List[float] = [] stockouts: List[float] = [] parse_errs: List[float] = [] rogue_rec: List[float] = [] for seed in args.seeds: obs = env.reset(seed=seed, episode_id=f"ckpt-{name}-{seed}") total = 0.0 n_parse_err = 0 n_weeks = 0 while obs.step_type != "episode_end": chat = build_chat(obs) prompt = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True) enc = tokenizer(prompt, return_tensors="pt").to(model.device) with torch.inference_mode(): out = model.generate( **enc, max_new_tokens=args.max_new_tokens, do_sample=False, temperature=1.0, pad_token_id=tokenizer.pad_token_id, ) completion = tokenizer.decode(out[0, enc.input_ids.shape[1]:], skip_special_tokens=True) action, tel = parse_response(completion, obs.inbox) if not tel["parse_ok"]: n_parse_err += 1 n_weeks += 1 obs = env.step(action) total += (obs.reward or 0.0) totals.append(total) ebitdas.append(env.state.company.pnl_qtd.ebitda_margin_pct) parse_errs.append(n_parse_err / max(n_weeks, 1)) caught = sum(1 for r in env.state.rogue_incidents if r.caught) tot = len(env.state.rogue_incidents) rogue_rec.append(caught / tot if tot else 1.0) # stockout proxy: check avg across categories from weekly history stockouts.append(0.0) # placeholder — not critical for selection print(f" {name} seed={seed}: ep_ret={total:+.3f} " f"ebitda%={ebitdas[-1]:+.2f} parse_err={parse_errs[-1]:.2%} " f"rogue_rec={rogue_rec[-1]:.2%}") results[adapter_path] = { "name": name, "seeds": args.seeds, "per_seed_total_reward": totals, "mean_total_reward": statistics.mean(totals), "std_total_reward": statistics.stdev(totals) if len(totals) > 1 else 0.0, "mean_ebitda_margin_pct": statistics.mean(ebitdas), "mean_parse_error_rate": statistics.mean(parse_errs), "mean_rogue_recall": statistics.mean(rogue_rec), } print("\n=== Checkpoint sweep summary ===") best = max(results.items(), key=lambda kv: kv[1]["mean_total_reward"]) for path, r in results.items(): tag = " *BEST*" if path == best[0] else "" print(f" {os.path.basename(path):24s} " f"mean_r={r['mean_total_reward']:+.3f}±{r['std_total_reward']:.3f} " f"ebitda%={r['mean_ebitda_margin_pct']:+.2f} " f"parse_err={r['mean_parse_error_rate']:.2%} " f"rogue_rec={r['mean_rogue_recall']:.2%}{tag}") if args.out: Path(args.out).parent.mkdir(parents=True, exist_ok=True) with open(args.out, "w") as f: json.dump({ "base_model": args.model, "seeds": args.seeds, "results": results, "best_adapter": best[0], }, f, indent=2) print(f"wrote {args.out}") return 0 def cmd_ckpts_dual(args: argparse.Namespace) -> int: """Dual-head checkpoint sweep: eval action ckpts paired with a fixed journal ckpt.""" from unsloth import FastLanguageModel import torch from inference import DualHeadCEO from server.environment import SimMartEnvironment action_adapters: List[str] = [] for pat in args.action_adapters: action_adapters.extend(sorted(glob.glob(pat))) if not action_adapters: print(f"No action adapters matched: {args.action_adapters}", file=sys.stderr) return 1 print(f"Found {len(action_adapters)} action adapter(s):") for a in action_adapters: print(f" {a}") print(f"Journal adapter: {args.journal_adapter}") print(f"Loading base model {args.model}") model, tokenizer = FastLanguageModel.from_pretrained( model_name=args.model, max_seq_length=4096, dtype=torch.bfloat16, load_in_4bit=True, ) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token # Load the journal adapter once (shared across all action ckpts) model.load_adapter(args.journal_adapter, adapter_name="journal") # Register each action adapter under its own name so we can hot-swap. action_names: List[str] = [] for i, a in enumerate(action_adapters): name = f"act{i}" model.load_adapter(a, adapter_name=name) action_names.append(name) # Unsloth's fused fast-path is tied to the single adapter active at # `for_inference` time; set_adapter() swaps bypass the fused kernels and # drop us to ~6% SM util (observed in v6 eval). Opt-out by default for # dual-head so both adapters share the standard Unsloth+PEFT fast path. # Flip DUAL_FAST_INFER=1 to force the old (single-adapter) fused path. if bool(int(os.environ.get("DUAL_FAST_INFER", "0"))): FastLanguageModel.for_inference(model) print("[eval] FastLanguageModel.for_inference enabled (single-adapter fused)", flush=True) else: model.eval() print("[eval] Skipping for_inference (dual-adapter mode; eval() only)", flush=True) import time verbose = bool(int(os.environ.get("DUAL_VERBOSE", "1"))) results: Dict[str, Dict[str, Any]] = {} env = SimMartEnvironment() for name, adapter_path in zip(action_names, action_adapters): ceo = DualHeadCEO( model, tokenizer, action_adapter=name, journal_adapter="journal", action_max_tokens=args.action_max_tokens, journal_max_tokens=args.journal_max_tokens, do_sample=False, verbose=verbose, ) totals: List[float] = [] ebitdas: List[float] = [] parse_errs: List[float] = [] rogue_rec: List[float] = [] for seed in args.seeds: obs = env.reset(seed=seed, episode_id=f"dual-{name}-{seed}") total = 0.0 n_weeks = 0 ceo.n_action_parse_err = 0 ceo.t_action_s = 0.0 ceo.t_journal_s = 0.0 ceo.n_action_tokens = 0 ceo.n_journal_tokens = 0 t_seed = time.time() print(f"[eval] {name} seed={seed} starting...", flush=True) while obs.step_type != "episode_end": n_weeks += 1 act = ceo.act(obs, week=n_weeks) obs = env.step(act) total += (obs.reward or 0.0) seed_wall = time.time() - t_seed totals.append(total) ebitdas.append(env.state.company.pnl_qtd.ebitda_margin_pct) parse_errs.append(ceo.n_action_parse_err / max(n_weeks, 1)) caught = sum(1 for r in env.state.rogue_incidents if r.caught) tot = len(env.state.rogue_incidents) rogue_rec.append(caught / tot if tot else 1.0) tok_tot = ceo.n_action_tokens + ceo.n_journal_tokens tok_s = tok_tot / seed_wall if seed_wall > 0 else 0 print( f" {name} seed={seed}: ep_ret={total:+.3f} " f"ebitda%={ebitdas[-1]:+.2f} parse_err={parse_errs[-1]:.2%} " f"rogue_rec={rogue_rec[-1]:.2%} " f"[{seed_wall:.0f}s total, {tok_tot} tok, {tok_s:.0f} t/s; " f"act {ceo.t_action_s:.0f}s jrn {ceo.t_journal_s:.0f}s]", flush=True, ) results[adapter_path] = { "name": name, "action_adapter": adapter_path, "journal_adapter": args.journal_adapter, "seeds": args.seeds, "per_seed_total_reward": totals, "mean_total_reward": statistics.mean(totals), "std_total_reward": statistics.stdev(totals) if len(totals) > 1 else 0.0, "mean_ebitda_margin_pct": statistics.mean(ebitdas), "mean_parse_error_rate": statistics.mean(parse_errs), "mean_rogue_recall": statistics.mean(rogue_rec), } print("\n=== Dual-head checkpoint sweep summary ===") best = max(results.items(), key=lambda kv: kv[1]["mean_total_reward"]) for path, r in results.items(): tag = " *BEST*" if path == best[0] else "" print(f" {os.path.basename(path):24s} " f"mean_r={r['mean_total_reward']:+.3f}±{r['std_total_reward']:.3f} " f"ebitda%={r['mean_ebitda_margin_pct']:+.2f} " f"parse_err={r['mean_parse_error_rate']:.2%} " f"rogue_rec={r['mean_rogue_recall']:.2%}{tag}") if args.out: Path(args.out).parent.mkdir(parents=True, exist_ok=True) with open(args.out, "w") as f: json.dump({ "mode": "dual-head", "base_model": args.model, "journal_adapter": args.journal_adapter, "seeds": args.seeds, "results": results, "best_action_adapter": best[0], }, f, indent=2) print(f"wrote {args.out}") return 0 def cmd_dual_baselines(args: argparse.Namespace) -> int: """Apples-to-apples dual-head comparison: run baselines + frontier models on the same seeds the dual-head SFT/GRPO eval uses. Baselines (random/heuristic/oracle/god) are naturally single-pass (no LLM); they are included here because their journal is rule-generated and the comparison target is the same set of reward components. Frontier models run in ``dual_head=True`` mode: two API calls per week using build_action_chat (300 tok) + build_journal_chat (400 tok), mirroring the DualHeadCEO wire format. Incremental-save: any policy already present in --out is skipped unless --force. This makes long frontier sweeps robust to transient API errors. """ import time from inference import ( RandomCEO, HeuristicCEO, OracleCEO, GodCEO, FrontierCEO, run_policy, EpisodeResult, ) policies: List[Any] = [] only = set(args.only or []) skip = set(args.skip or []) def _want(name: str, default: bool) -> bool: if only: return name in only return default and (name not in skip) if _want("random", True): policies.append(RandomCEO()) if _want("heuristic", True): policies.append(HeuristicCEO()) if _want("oracle", True): policies.append(OracleCEO()) if _want("god", True): policies.append(GodCEO()) for m in (args.frontier_models or []): policies.append(FrontierCEO( provider=args.frontier_provider, model=m, api_base=args.frontier_api_base, temperature=args.frontier_temperature, max_tokens=(args.action_max_tokens + args.journal_max_tokens), budget_hint=False, # dual-head budget is split across two calls dual_head=True, action_max_tokens=args.action_max_tokens, journal_max_tokens=args.journal_max_tokens, permissive=args.frontier_permissive, )) # Resume-friendly output out: Dict[str, Dict[str, Any]] = {} if args.out and Path(args.out).exists(): try: out = json.loads(Path(args.out).read_text()) print(f"[dual-baselines] resuming {args.out}, {len(out)} already done: " f"{sorted(out.keys())}", flush=True) except Exception: out = {} for pol in policies: if pol.name in out and not args.force: print(f"[dual-baselines] skip {pol.name} (already in {args.out})", flush=True) continue t0 = time.time() print(f"\n[dual-baselines] >>> {pol.name} seeds={list(args.seeds)}", flush=True) results: List[EpisodeResult] = run_policy(pol, seeds=list(args.seeds), quiet=False) wall = time.time() - t0 rewards = [r.total_reward for r in results] recalls = [(r.rogues_caught / r.rogues_total) if r.rogues_total else 1.0 for r in results] out[pol.name] = { "n": len(results), "wall_s": wall, "mean_total_reward": statistics.mean(rewards), "std_total_reward": statistics.stdev(rewards) if len(rewards) > 1 else 0.0, "mean_ebitda_margin_pct": statistics.mean([r.ebitda_margin_pct for r in results]), "mean_avg_stockout_pct": statistics.mean([r.avg_stockout_pct for r in results]), "mean_avg_nps": statistics.mean([r.avg_nps for r in results]), "rogue_recall": statistics.mean(recalls), "seeds": list(args.seeds), "per_seed_total_reward": rewards, "per_seed_rogue_recall": recalls, "per_seed_ebitda_margin_pct": [r.ebitda_margin_pct for r in results], } if isinstance(pol, FrontierCEO): out[pol.name].update({ "provider": pol._provider, "model": pol._model, "dual_head": True, "action_max_tokens": pol._action_max_tokens, "journal_max_tokens": pol._journal_max_tokens, "n_parse_errors": pol.n_parse_errors, "n_api_errors": pol.n_api_errors, "total_tokens": pol.total_tokens, "total_prompt_tokens": pol.total_prompt_tokens, "total_completion_tokens": pol.total_completion_tokens, }) print( f"[dual-baselines] {pol.name:40s} " f"mean_r={out[pol.name]['mean_total_reward']:+6.3f} " f"ebitda%={out[pol.name]['mean_ebitda_margin_pct']:+6.2f} " f"rogue_rec={out[pol.name]['rogue_recall']:.2%} " f"[{wall:.0f}s]", flush=True, ) if args.out: Path(args.out).parent.mkdir(parents=True, exist_ok=True) with open(args.out, "w") as f: json.dump(out, f, indent=2) # Final summary table, sorted by mean_total_reward print("\n=== Dual-head baselines summary ===", flush=True) ranked = sorted(out.items(), key=lambda kv: kv[1]["mean_total_reward"], reverse=True) for name, r in ranked: print( f" {name:40s} mean_r={r['mean_total_reward']:+6.3f}±{r['std_total_reward']:.3f} " f"ebitda%={r['mean_ebitda_margin_pct']:+6.2f} " f"rogue_rec={r['rogue_recall']:.2%}", flush=True, ) if args.out: print(f"wrote {args.out}", flush=True) return 0 def main() -> int: p = argparse.ArgumentParser() sub = p.add_subparsers(dest="cmd", required=True) pc = sub.add_parser("curves", help="plot reward curves from one or more run dirs") pc.add_argument("--runs", nargs="+", required=True, help="glob patterns, e.g. 'simmart-runs/smoke-1p5b-*'") pc.add_argument("--png", default=None, help="output PNG path") pc.add_argument("--out", default=None, help="output JSON dump path") pc.set_defaults(fn=cmd_curves) pb = sub.add_parser("baselines", help="benchmark random/heuristic/oracle (+ optional frontier LLMs) on N seeds") pb.add_argument("--seeds", type=int, nargs="+", default=[42, 43, 44, 45, 46]) pb.add_argument("--out", default=None) pb.add_argument("--skip", nargs="*", default=[], help="skip built-in policies by name (e.g. --skip random oracle)") pb.add_argument("--include-god", action="store_true", help="include GodCEO (ground-truth cheat + engineered journal) " "to measure empirical ceiling") pb.add_argument("--only", nargs="*", default=[], help="force-include policies (e.g. --only god)") pb.add_argument("--force", action="store_true", help="re-run policies even if already in --out") pb.add_argument("--frontier-models", nargs="*", default=[], help="frontier model ids to add (e.g. Claude-Haiku-4.5 Claude-Sonnet-4.6)") pb.add_argument("--frontier-provider", choices=["auto", "openai", "anthropic", "openai_responses"], default="auto") pb.add_argument("--frontier-api-base", default=None) pb.add_argument("--frontier-temperature", type=float, default=0.0) pb.add_argument("--frontier-max-tokens", type=int, default=None, help="hard token cap (defaults to FrontierCEO.DEFAULT_MAX_TOKENS=600)") pb.add_argument("--frontier-no-budget-hint", action="store_true", help="do NOT tell the frontier model its token budget") pb.set_defaults(fn=cmd_baselines) pt = sub.add_parser("trace", help="rollout a trained adapter deterministically, dump trace") pt.add_argument("--model", default="Qwen/Qwen2.5-1.5B-Instruct") pt.add_argument("--adapter", default=None) pt.add_argument("--seed", type=int, default=42) pt.add_argument("--out", required=True) pt.set_defaults(fn=cmd_trace) pk = sub.add_parser("ckpts", help="sweep many adapters × many seeds, pick best") pk.add_argument("--model", default="Qwen/Qwen2.5-1.5B-Instruct") pk.add_argument("--adapters", nargs="+", required=True, help="glob patterns, e.g. 'simmart-runs/hero-*/adapter-step-*'") pk.add_argument("--seeds", type=int, nargs="+", default=[101, 202, 303]) pk.add_argument("--max-new-tokens", type=int, default=600) pk.add_argument("--out", default=None) pk.set_defaults(fn=cmd_ckpts) pkd = sub.add_parser( "ckpts-dual", help="dual-head eval: sweep action adapters, each paired with a fixed frozen journal adapter", ) pkd.add_argument("--model", default="Qwen/Qwen2.5-1.5B-Instruct") pkd.add_argument("--action-adapters", nargs="+", required=True, help="glob patterns for trainable (action) LoRA checkpoints") pkd.add_argument("--journal-adapter", required=True, help="path to the frozen journal LoRA (shared across all action ckpts)") pkd.add_argument("--seeds", type=int, nargs="+", default=[42, 43, 44, 45, 46]) pkd.add_argument("--action-max-tokens", type=int, default=300) pkd.add_argument("--journal-max-tokens", type=int, default=400) pkd.add_argument("--out", default=None) pkd.set_defaults(fn=cmd_ckpts_dual) pdb = sub.add_parser( "dual-baselines", help="dual-head apples-to-apples: random/heuristic/oracle/god + frontier (two-pass)", ) pdb.add_argument("--seeds", type=int, nargs="+", default=[42, 43, 44, 45, 46]) pdb.add_argument("--out", default="assets/baselines_dual.json") pdb.add_argument("--only", nargs="*", default=[], help="restrict to a subset, e.g. --only heuristic god") pdb.add_argument("--skip", nargs="*", default=[], help="skip baselines by name (e.g. --skip random oracle)") pdb.add_argument("--force", action="store_true", help="re-run policies even if already in --out") pdb.add_argument("--frontier-models", nargs="*", default=[], help="frontier model ids to evaluate in dual-head mode " "(e.g. Claude-Sonnet-4.6 gpt-5.4)") pdb.add_argument("--frontier-provider", choices=["auto", "openai", "anthropic", "openai_responses"], default="auto") pdb.add_argument("--frontier-api-base", default=None) pdb.add_argument("--frontier-temperature", type=float, default=0.0) pdb.add_argument("--action-max-tokens", type=int, default=300) pdb.add_argument("--journal-max-tokens", type=int, default=400) pdb.add_argument("--frontier-permissive", action="store_true", help="use ACTION_SYSTEM_PROMPT_PERMISSIVE for frontier " "models — allows CoT before , " "adds rogue checklist + budget guidance. " "Policy names get `-permissive` suffix so strict & " "permissive runs coexist in the same --out JSON.") pdb.set_defaults(fn=cmd_dual_baselines) args = p.parse_args() return args.fn(args) if __name__ == "__main__": raise SystemExit(main())