"""Post-training evaluation: plot reward curves, compare policies, dump hero traces.
Usage (`curves` and `baselines` are pure post-hoc analysis on the host venv, no
GPU needed; `trace` and the checkpoint sweeps load the model and need one):
python eval.py curves --runs simmart-runs/smoke-1p5b-*
python eval.py baselines --seeds 42 43 44 45 46 --out assets/baselines.json
python eval.py trace --adapter <path> --seed 42 --out assets/hero-trace.json
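    python eval.py ckpts --adapters 'simmart-runs/hero-*/adapter-step-*' --seeds 101 202 303
    python eval.py ckpts-dual --action-adapters <glob> --journal-adapter <path> --out <json>
    python eval.py dual-baselines --frontier-models <ids> --out assets/baselines_dual.json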
"""
from __future__ import annotations
import argparse
import glob
import json
import os
import statistics
import sys
from pathlib import Path
from typing import Any, Dict, List
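# Make sibling modules (inference, prompts, models, server.*) importable when
# this script is run from outside the repo root.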
HERE = os.path.dirname(os.path.abspath(__file__))
if HERE not in sys.path:
sys.path.insert(0, HERE)
# ---------------------------------------------------------------------------
# Curve plotting (matplotlib optional)
# ---------------------------------------------------------------------------
def cmd_curves(args: argparse.Namespace) -> int:
runs = []
for pat in args.runs:
runs.extend(sorted(glob.glob(pat)))
if not runs:
print(f"No runs matched: {args.runs}", file=sys.stderr)
return 1
series: Dict[str, List[Dict[str, Any]]] = {}
for run in runs:
hist = Path(run) / "history.jsonl"
if not hist.exists():
print(f"skip {run} (no history.jsonl)")
continue
        with hist.open() as f:
            entries = [json.loads(line) for line in f if line.strip()]
series[os.path.basename(run)] = entries
print(f"loaded {run}: {len(entries)} steps")
if args.out:
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
with open(args.out, "w") as f:
json.dump(series, f, indent=2)
print(f"wrote {args.out}")
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
except ImportError:
print("matplotlib not installed — skipping PNG plot")
return 0
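    # 2x2 panel grid: per-week reward, episode return, parse error rate, rogue recall.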
fig, axes = plt.subplots(2, 2, figsize=(11, 8))
for name, entries in series.items():
xs = [e["step"] for e in entries]
axes[0, 0].plot(xs, [e["mean_reward"] for e in entries], label=name, marker="o", ms=3)
axes[0, 1].plot(xs, [e["mean_episode_return"] for e in entries], label=name, marker="o", ms=3)
axes[1, 0].plot(xs, [e["parse_error_rate"] for e in entries], label=name, marker="o", ms=3)
axes[1, 1].plot(xs, [e["rogue_recall"] for e in entries], label=name, marker="o", ms=3)
axes[0, 0].set_title("Mean per-week reward"); axes[0, 0].set_xlabel("step")
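    # Dashed reference lines: baseline episode returns (see the `baselines`
    # subcommand) divided by the 13-week quarter to get per-week values.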
axes[0, 0].axhline(-5.23 / 13, ls="--", c="grey", alpha=0.5, label="random (-0.40)")
axes[0, 0].axhline(-2.40 / 13, ls="--", c="green", alpha=0.5, label="heuristic (-0.18)")
axes[0, 0].axhline(-2.27 / 13, ls="--", c="blue", alpha=0.5, label="oracle (-0.17)")
axes[0, 1].set_title("Episode return (sum of 13 weeks)"); axes[0, 1].set_xlabel("step")
axes[0, 1].axhline(-5.23, ls="--", c="grey", alpha=0.5, label="random (-5.23)")
axes[0, 1].axhline(-2.40, ls="--", c="green", alpha=0.5, label="heuristic (-2.40)")
axes[0, 1].axhline(-2.27, ls="--", c="blue", alpha=0.5, label="oracle (-2.27)")
axes[1, 0].set_title("Parse error rate"); axes[1, 0].set_xlabel("step"); axes[1, 0].set_ylim(0, 1)
axes[1, 1].set_title("Rogue catch recall"); axes[1, 1].set_xlabel("step"); axes[1, 1].set_ylim(0, 1)
for ax in axes.flat:
ax.legend(fontsize=7, loc="best")
ax.grid(alpha=0.3)
png = args.png or "assets/reward_curve.png"
Path(png).parent.mkdir(parents=True, exist_ok=True)
fig.tight_layout()
fig.savefig(png, dpi=140)
print(f"wrote {png}")
return 0
# ---------------------------------------------------------------------------
# Baseline battery
# ---------------------------------------------------------------------------
def cmd_baselines(args: argparse.Namespace) -> int:
from inference import (
HeuristicCEO, OracleCEO, GodCEO, RandomCEO, FrontierCEO,
run_policy, EpisodeResult,
)
policies: List[Any] = []
skip = set(args.skip or [])
if "random" not in skip:
policies.append(RandomCEO())
if "heuristic" not in skip:
policies.append(HeuristicCEO())
if "oracle" not in skip:
policies.append(OracleCEO())
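    # GodCEO cheats with ground truth, so it is opt-in (via --include-god or --only god).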
if args.include_god or "god" in (args.only or []):
policies.append(GodCEO())
for m in (args.frontier_models or []):
policies.append(FrontierCEO(
provider=args.frontier_provider,
model=m,
api_base=args.frontier_api_base,
temperature=args.frontier_temperature,
max_tokens=args.frontier_max_tokens,
budget_hint=not args.frontier_no_budget_hint,
))
# Incremental save: checkpoint after each policy so long frontier sweeps
# aren't lost to a single transient API hiccup or SIGINT.
out: Dict[str, Dict[str, Any]] = {}
if args.out and Path(args.out).exists():
try:
out = json.loads(Path(args.out).read_text())
print(f"Resuming from {args.out}, {len(out)} policies already done: "
f"{sorted(out.keys())}")
except Exception:
out = {}
for pol in policies:
if pol.name in out and not args.force:
print(f"skip {pol.name} (already in {args.out})")
continue
print(f"\n>>> {pol.name} @ seeds={args.seeds}")
results: List[EpisodeResult] = run_policy(pol, seeds=args.seeds, quiet=True)
rewards = [r.total_reward for r in results]
out[pol.name] = {
"n": len(results),
"mean_total_reward": statistics.mean(rewards),
"std_total_reward": statistics.stdev(rewards) if len(rewards) > 1 else 0.0,
"mean_ebitda_margin_pct": statistics.mean([r.ebitda_margin_pct for r in results]),
"mean_avg_stockout_pct": statistics.mean([r.avg_stockout_pct for r in results]),
"mean_avg_nps": statistics.mean([r.avg_nps for r in results]),
"rogue_recall": statistics.mean([
(r.rogues_caught / r.rogues_total) if r.rogues_total else 1.0
for r in results
]),
"seeds": list(args.seeds),
"per_seed_total_reward": rewards,
}
if isinstance(pol, FrontierCEO):
out[pol.name].update({
"provider": pol._provider,
"model": pol._model,
"n_parse_errors": pol.n_parse_errors,
"n_api_errors": pol.n_api_errors,
"total_tokens": pol.total_tokens,
"total_prompt_tokens": pol.total_prompt_tokens,
"total_completion_tokens": pol.total_completion_tokens,
})
print(f"{pol.name:30s}: mean_r={out[pol.name]['mean_total_reward']:+.2f} "
f"ebitda%={out[pol.name]['mean_ebitda_margin_pct']:+.2f} "
f"rogue_rec={out[pol.name]['rogue_recall']:.2%}")
if args.out:
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
with open(args.out, "w") as f:
json.dump(out, f, indent=2)
if args.out:
print(f"wrote {args.out}")
return 0
# ---------------------------------------------------------------------------
# Hero trace: replay a trained model on a fixed seed, dump (prompt, action, reward)
# per week for the decision trace document.
# ---------------------------------------------------------------------------
def cmd_trace(args: argparse.Namespace) -> int:
    # These imports pull in heavy deps (unsloth, torch), so do them on demand.
from unsloth import FastLanguageModel
import torch
from models import SimMartAction
from prompts import build_chat, parse_response, render_observation
from server.environment import SimMartEnvironment
print(f"Loading base model {args.model} + adapter {args.adapter}")
model, tokenizer = FastLanguageModel.from_pretrained(
model_name=args.model,
max_seq_length=4096,
dtype=torch.bfloat16,
load_in_4bit=True,
)
if args.adapter:
model.load_adapter(args.adapter, adapter_name="default")
FastLanguageModel.for_inference(model)
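    # Some tokenizers ship without a pad token; fall back to EOS so generate()
    # gets a valid pad_token_id.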
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
env = SimMartEnvironment()
obs = env.reset(seed=args.seed, episode_id=f"trace-{args.seed}")
trace: List[Dict[str, Any]] = []
total_reward = 0.0
while obs.step_type != "episode_end":
chat = build_chat(obs)
prompt = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
enc = tokenizer(prompt, return_tensors="pt").to(model.device)
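        # Greedy decode for a reproducible trace; temperature has no effect when
        # do_sample=False.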
with torch.inference_mode():
out = model.generate(
**enc, max_new_tokens=700, do_sample=False,
temperature=1.0, pad_token_id=tokenizer.pad_token_id,
)
completion = tokenizer.decode(out[0, enc.input_ids.shape[1]:], skip_special_tokens=True)
action, tel = parse_response(completion, obs.inbox)
observation_render = render_observation(obs)
trace.append({
"week": obs.week_of_quarter,
"observation_text": observation_render,
"completion_text": completion,
"parsed_decisions": [d.model_dump() for d in action.decisions],
"journal_entry": action.journal_entry,
"parse_ok": tel["parse_ok"],
"parse_error": tel.get("parse_error"),
})
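        # The reward for this week arrives with the *next* observation, so it is
        # patched into the entry we just appended.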
obs = env.step(action)
r = obs.reward or 0.0
trace[-1]["reward"] = r
total_reward += r
summary = {
"seed": args.seed,
"adapter": args.adapter,
"model": args.model,
"total_reward": total_reward,
"final_ebitda_margin_pct": env.state.company.pnl_qtd.ebitda_margin_pct,
"final_cash_inr": env.state.company.balance_sheet.cash_inr,
"rogues_total": len(env.state.rogue_incidents),
"rogues_caught": sum(1 for r in env.state.rogue_incidents if r.caught),
"weeks": trace,
}
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
with open(args.out, "w") as f:
json.dump(summary, f, indent=2)
print(f"trace → {args.out} total_reward={total_reward:+.3f} "
f"ebitda%={summary['final_ebitda_margin_pct']:+.2f} "
f"rogues={summary['rogues_caught']}/{summary['rogues_total']}")
return 0
# ---------------------------------------------------------------------------
# Checkpoint sweep: evaluate each saved adapter on N held-out seeds, deterministic.
# ---------------------------------------------------------------------------
def cmd_ckpts(args: argparse.Namespace) -> int:
from unsloth import FastLanguageModel
import torch
from prompts import build_chat, parse_response
from server.environment import SimMartEnvironment
adapters = []
for pat in args.adapters:
adapters.extend(sorted(glob.glob(pat)))
if not adapters:
print(f"No adapters matched: {args.adapters}", file=sys.stderr)
return 1
print(f"Found {len(adapters)} adapter(s):")
for a in adapters:
print(f" {a}")
print(f"Loading base model {args.model}")
model, tokenizer = FastLanguageModel.from_pretrained(
model_name=args.model,
max_seq_length=4096,
dtype=torch.bfloat16,
load_in_4bit=True,
)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# Load each adapter into its own slot on the same base model so we can
# swap without reloading weights.
adapter_names: List[str] = []
for i, a in enumerate(adapters):
name = f"ckpt{i}"
model.load_adapter(a, adapter_name=name)
adapter_names.append(name)
FastLanguageModel.for_inference(model)
results: Dict[str, Dict[str, Any]] = {}
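    # One environment instance is reused across adapters and seeds;
    # reset(seed=...) makes each episode deterministic.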
env = SimMartEnvironment()
for name, adapter_path in zip(adapter_names, adapters):
model.set_adapter(name)
totals: List[float] = []
ebitdas: List[float] = []
stockouts: List[float] = []
parse_errs: List[float] = []
rogue_rec: List[float] = []
for seed in args.seeds:
obs = env.reset(seed=seed, episode_id=f"ckpt-{name}-{seed}")
total = 0.0
n_parse_err = 0
n_weeks = 0
while obs.step_type != "episode_end":
chat = build_chat(obs)
prompt = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
enc = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.inference_mode():
out = model.generate(
**enc, max_new_tokens=args.max_new_tokens, do_sample=False,
temperature=1.0, pad_token_id=tokenizer.pad_token_id,
)
completion = tokenizer.decode(out[0, enc.input_ids.shape[1]:], skip_special_tokens=True)
action, tel = parse_response(completion, obs.inbox)
if not tel["parse_ok"]:
n_parse_err += 1
n_weeks += 1
obs = env.step(action)
total += (obs.reward or 0.0)
totals.append(total)
ebitdas.append(env.state.company.pnl_qtd.ebitda_margin_pct)
parse_errs.append(n_parse_err / max(n_weeks, 1))
caught = sum(1 for r in env.state.rogue_incidents if r.caught)
tot = len(env.state.rogue_incidents)
rogue_rec.append(caught / tot if tot else 1.0)
            # Stockout proxy (avg across categories from weekly history) is not
            # wired up yet; append 0.0 as a placeholder. It is collected but not
            # reported, and is not used for checkpoint selection.
            stockouts.append(0.0)
print(f" {name} seed={seed}: ep_ret={total:+.3f} "
f"ebitda%={ebitdas[-1]:+.2f} parse_err={parse_errs[-1]:.2%} "
f"rogue_rec={rogue_rec[-1]:.2%}")
results[adapter_path] = {
"name": name,
"seeds": args.seeds,
"per_seed_total_reward": totals,
"mean_total_reward": statistics.mean(totals),
"std_total_reward": statistics.stdev(totals) if len(totals) > 1 else 0.0,
"mean_ebitda_margin_pct": statistics.mean(ebitdas),
"mean_parse_error_rate": statistics.mean(parse_errs),
"mean_rogue_recall": statistics.mean(rogue_rec),
}
print("\n=== Checkpoint sweep summary ===")
best = max(results.items(), key=lambda kv: kv[1]["mean_total_reward"])
for path, r in results.items():
tag = " *BEST*" if path == best[0] else ""
print(f" {os.path.basename(path):24s} "
f"mean_r={r['mean_total_reward']:+.3f}±{r['std_total_reward']:.3f} "
f"ebitda%={r['mean_ebitda_margin_pct']:+.2f} "
f"parse_err={r['mean_parse_error_rate']:.2%} "
f"rogue_rec={r['mean_rogue_recall']:.2%}{tag}")
if args.out:
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
with open(args.out, "w") as f:
json.dump({
"base_model": args.model,
"seeds": args.seeds,
"results": results,
"best_adapter": best[0],
}, f, indent=2)
print(f"wrote {args.out}")
return 0
def cmd_ckpts_dual(args: argparse.Namespace) -> int:
"""Dual-head checkpoint sweep: eval action ckpts paired with a fixed journal ckpt."""
from unsloth import FastLanguageModel
import torch
from inference import DualHeadCEO
from server.environment import SimMartEnvironment
action_adapters: List[str] = []
for pat in args.action_adapters:
action_adapters.extend(sorted(glob.glob(pat)))
if not action_adapters:
print(f"No action adapters matched: {args.action_adapters}", file=sys.stderr)
return 1
print(f"Found {len(action_adapters)} action adapter(s):")
for a in action_adapters:
print(f" {a}")
print(f"Journal adapter: {args.journal_adapter}")
print(f"Loading base model {args.model}")
model, tokenizer = FastLanguageModel.from_pretrained(
model_name=args.model,
max_seq_length=4096,
dtype=torch.bfloat16,
load_in_4bit=True,
)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# Load the journal adapter once (shared across all action ckpts)
model.load_adapter(args.journal_adapter, adapter_name="journal")
# Register each action adapter under its own name so we can hot-swap.
action_names: List[str] = []
for i, a in enumerate(action_adapters):
name = f"act{i}"
model.load_adapter(a, adapter_name=name)
action_names.append(name)
    # Unsloth's fused fast path is tied to the single adapter active at
    # `for_inference` time; set_adapter() swaps bypass the fused kernels and
    # drop us to ~6% SM util (observed in the v6 eval). We therefore opt out
    # by default for dual-head, so both adapters share the standard (non-fused)
    # Unsloth+PEFT path. Set DUAL_FAST_INFER=1 to force the old single-adapter
    # fused path.
if bool(int(os.environ.get("DUAL_FAST_INFER", "0"))):
FastLanguageModel.for_inference(model)
print("[eval] FastLanguageModel.for_inference enabled (single-adapter fused)", flush=True)
else:
model.eval()
print("[eval] Skipping for_inference (dual-adapter mode; eval() only)", flush=True)
import time
verbose = bool(int(os.environ.get("DUAL_VERBOSE", "1")))
results: Dict[str, Dict[str, Any]] = {}
env = SimMartEnvironment()
for name, adapter_path in zip(action_names, action_adapters):
ceo = DualHeadCEO(
model, tokenizer,
action_adapter=name,
journal_adapter="journal",
action_max_tokens=args.action_max_tokens,
journal_max_tokens=args.journal_max_tokens,
do_sample=False,
verbose=verbose,
)
totals: List[float] = []
ebitdas: List[float] = []
parse_errs: List[float] = []
rogue_rec: List[float] = []
for seed in args.seeds:
obs = env.reset(seed=seed, episode_id=f"dual-{name}-{seed}")
total = 0.0
n_weeks = 0
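            # Zero the CEO's per-seed telemetry so the timing/token stats below
            # are per-episode.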
ceo.n_action_parse_err = 0
ceo.t_action_s = 0.0
ceo.t_journal_s = 0.0
ceo.n_action_tokens = 0
ceo.n_journal_tokens = 0
t_seed = time.time()
print(f"[eval] {name} seed={seed} starting...", flush=True)
while obs.step_type != "episode_end":
n_weeks += 1
act = ceo.act(obs, week=n_weeks)
obs = env.step(act)
total += (obs.reward or 0.0)
seed_wall = time.time() - t_seed
totals.append(total)
ebitdas.append(env.state.company.pnl_qtd.ebitda_margin_pct)
parse_errs.append(ceo.n_action_parse_err / max(n_weeks, 1))
caught = sum(1 for r in env.state.rogue_incidents if r.caught)
tot = len(env.state.rogue_incidents)
rogue_rec.append(caught / tot if tot else 1.0)
tok_tot = ceo.n_action_tokens + ceo.n_journal_tokens
tok_s = tok_tot / seed_wall if seed_wall > 0 else 0
print(
f" {name} seed={seed}: ep_ret={total:+.3f} "
f"ebitda%={ebitdas[-1]:+.2f} parse_err={parse_errs[-1]:.2%} "
f"rogue_rec={rogue_rec[-1]:.2%} "
f"[{seed_wall:.0f}s total, {tok_tot} tok, {tok_s:.0f} t/s; "
f"act {ceo.t_action_s:.0f}s jrn {ceo.t_journal_s:.0f}s]",
flush=True,
)
results[adapter_path] = {
"name": name,
"action_adapter": adapter_path,
"journal_adapter": args.journal_adapter,
"seeds": args.seeds,
"per_seed_total_reward": totals,
"mean_total_reward": statistics.mean(totals),
"std_total_reward": statistics.stdev(totals) if len(totals) > 1 else 0.0,
"mean_ebitda_margin_pct": statistics.mean(ebitdas),
"mean_parse_error_rate": statistics.mean(parse_errs),
"mean_rogue_recall": statistics.mean(rogue_rec),
}
print("\n=== Dual-head checkpoint sweep summary ===")
best = max(results.items(), key=lambda kv: kv[1]["mean_total_reward"])
for path, r in results.items():
tag = " *BEST*" if path == best[0] else ""
print(f" {os.path.basename(path):24s} "
f"mean_r={r['mean_total_reward']:+.3f}±{r['std_total_reward']:.3f} "
f"ebitda%={r['mean_ebitda_margin_pct']:+.2f} "
f"parse_err={r['mean_parse_error_rate']:.2%} "
f"rogue_rec={r['mean_rogue_recall']:.2%}{tag}")
if args.out:
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
with open(args.out, "w") as f:
json.dump({
"mode": "dual-head",
"base_model": args.model,
"journal_adapter": args.journal_adapter,
"seeds": args.seeds,
"results": results,
"best_action_adapter": best[0],
}, f, indent=2)
print(f"wrote {args.out}")
return 0
def cmd_dual_baselines(args: argparse.Namespace) -> int:
"""Apples-to-apples dual-head comparison: run baselines + frontier models
on the same seeds the dual-head SFT/GRPO eval uses.
Baselines (random/heuristic/oracle/god) are naturally single-pass (no LLM);
they are included here because their journal is rule-generated and the
comparison target is the same set of reward components.
Frontier models run in ``dual_head=True`` mode: two API calls per week
using build_action_chat (300 tok) + build_journal_chat (400 tok), mirroring
the DualHeadCEO wire format.
    Incremental save: any policy already present in --out is skipped unless
    --force is given, making long frontier sweeps robust to transient API errors.
"""
import time
from inference import (
RandomCEO, HeuristicCEO, OracleCEO, GodCEO, FrontierCEO,
run_policy, EpisodeResult,
)
policies: List[Any] = []
only = set(args.only or [])
skip = set(args.skip or [])
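    # --only wins over --skip: when --only is given, run exactly that set.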
def _want(name: str, default: bool) -> bool:
if only:
return name in only
return default and (name not in skip)
if _want("random", True):
policies.append(RandomCEO())
if _want("heuristic", True):
policies.append(HeuristicCEO())
if _want("oracle", True):
policies.append(OracleCEO())
if _want("god", True):
policies.append(GodCEO())
for m in (args.frontier_models or []):
policies.append(FrontierCEO(
provider=args.frontier_provider,
model=m,
api_base=args.frontier_api_base,
temperature=args.frontier_temperature,
max_tokens=(args.action_max_tokens + args.journal_max_tokens),
budget_hint=False, # dual-head budget is split across two calls
dual_head=True,
action_max_tokens=args.action_max_tokens,
journal_max_tokens=args.journal_max_tokens,
permissive=args.frontier_permissive,
))
# Resume-friendly output
out: Dict[str, Dict[str, Any]] = {}
if args.out and Path(args.out).exists():
try:
out = json.loads(Path(args.out).read_text())
print(f"[dual-baselines] resuming {args.out}, {len(out)} already done: "
f"{sorted(out.keys())}", flush=True)
except Exception:
out = {}
for pol in policies:
if pol.name in out and not args.force:
print(f"[dual-baselines] skip {pol.name} (already in {args.out})", flush=True)
continue
t0 = time.time()
print(f"\n[dual-baselines] >>> {pol.name} seeds={list(args.seeds)}", flush=True)
results: List[EpisodeResult] = run_policy(pol, seeds=list(args.seeds), quiet=False)
wall = time.time() - t0
rewards = [r.total_reward for r in results]
recalls = [(r.rogues_caught / r.rogues_total) if r.rogues_total else 1.0 for r in results]
out[pol.name] = {
"n": len(results),
"wall_s": wall,
"mean_total_reward": statistics.mean(rewards),
"std_total_reward": statistics.stdev(rewards) if len(rewards) > 1 else 0.0,
"mean_ebitda_margin_pct": statistics.mean([r.ebitda_margin_pct for r in results]),
"mean_avg_stockout_pct": statistics.mean([r.avg_stockout_pct for r in results]),
"mean_avg_nps": statistics.mean([r.avg_nps for r in results]),
"rogue_recall": statistics.mean(recalls),
"seeds": list(args.seeds),
"per_seed_total_reward": rewards,
"per_seed_rogue_recall": recalls,
"per_seed_ebitda_margin_pct": [r.ebitda_margin_pct for r in results],
}
if isinstance(pol, FrontierCEO):
out[pol.name].update({
"provider": pol._provider,
"model": pol._model,
"dual_head": True,
"action_max_tokens": pol._action_max_tokens,
"journal_max_tokens": pol._journal_max_tokens,
"n_parse_errors": pol.n_parse_errors,
"n_api_errors": pol.n_api_errors,
"total_tokens": pol.total_tokens,
"total_prompt_tokens": pol.total_prompt_tokens,
"total_completion_tokens": pol.total_completion_tokens,
})
print(
f"[dual-baselines] {pol.name:40s} "
f"mean_r={out[pol.name]['mean_total_reward']:+6.3f} "
f"ebitda%={out[pol.name]['mean_ebitda_margin_pct']:+6.2f} "
f"rogue_rec={out[pol.name]['rogue_recall']:.2%} "
f"[{wall:.0f}s]",
flush=True,
)
if args.out:
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
with open(args.out, "w") as f:
json.dump(out, f, indent=2)
# Final summary table, sorted by mean_total_reward
print("\n=== Dual-head baselines summary ===", flush=True)
ranked = sorted(out.items(), key=lambda kv: kv[1]["mean_total_reward"], reverse=True)
for name, r in ranked:
print(
f" {name:40s} mean_r={r['mean_total_reward']:+6.3f}±{r['std_total_reward']:.3f} "
f"ebitda%={r['mean_ebitda_margin_pct']:+6.2f} "
f"rogue_rec={r['rogue_recall']:.2%}",
flush=True,
)
if args.out:
print(f"wrote {args.out}", flush=True)
return 0
def main() -> int:
p = argparse.ArgumentParser()
sub = p.add_subparsers(dest="cmd", required=True)
pc = sub.add_parser("curves", help="plot reward curves from one or more run dirs")
pc.add_argument("--runs", nargs="+", required=True,
help="glob patterns, e.g. 'simmart-runs/smoke-1p5b-*'")
pc.add_argument("--png", default=None, help="output PNG path")
pc.add_argument("--out", default=None, help="output JSON dump path")
pc.set_defaults(fn=cmd_curves)
pb = sub.add_parser("baselines",
help="benchmark random/heuristic/oracle (+ optional frontier LLMs) on N seeds")
pb.add_argument("--seeds", type=int, nargs="+", default=[42, 43, 44, 45, 46])
pb.add_argument("--out", default=None)
pb.add_argument("--skip", nargs="*", default=[],
help="skip built-in policies by name (e.g. --skip random oracle)")
pb.add_argument("--include-god", action="store_true",
help="include GodCEO (ground-truth cheat + engineered journal) "
"to measure empirical ceiling")
pb.add_argument("--only", nargs="*", default=[],
help="force-include policies (e.g. --only god)")
pb.add_argument("--force", action="store_true",
help="re-run policies even if already in --out")
pb.add_argument("--frontier-models", nargs="*", default=[],
help="frontier model ids to add (e.g. Claude-Haiku-4.5 Claude-Sonnet-4.6)")
pb.add_argument("--frontier-provider",
choices=["auto", "openai", "anthropic", "openai_responses"], default="auto")
pb.add_argument("--frontier-api-base", default=None)
pb.add_argument("--frontier-temperature", type=float, default=0.0)
pb.add_argument("--frontier-max-tokens", type=int, default=None,
help="hard token cap (defaults to FrontierCEO.DEFAULT_MAX_TOKENS=600)")
pb.add_argument("--frontier-no-budget-hint", action="store_true",
help="do NOT tell the frontier model its token budget")
pb.set_defaults(fn=cmd_baselines)
pt = sub.add_parser("trace", help="rollout a trained adapter deterministically, dump trace")
pt.add_argument("--model", default="Qwen/Qwen2.5-1.5B-Instruct")
pt.add_argument("--adapter", default=None)
pt.add_argument("--seed", type=int, default=42)
pt.add_argument("--out", required=True)
pt.set_defaults(fn=cmd_trace)
pk = sub.add_parser("ckpts", help="sweep many adapters × many seeds, pick best")
pk.add_argument("--model", default="Qwen/Qwen2.5-1.5B-Instruct")
pk.add_argument("--adapters", nargs="+", required=True,
help="glob patterns, e.g. 'simmart-runs/hero-*/adapter-step-*'")
pk.add_argument("--seeds", type=int, nargs="+", default=[101, 202, 303])
pk.add_argument("--max-new-tokens", type=int, default=600)
pk.add_argument("--out", default=None)
pk.set_defaults(fn=cmd_ckpts)
pkd = sub.add_parser(
"ckpts-dual",
help="dual-head eval: sweep action adapters, each paired with a fixed frozen journal adapter",
)
pkd.add_argument("--model", default="Qwen/Qwen2.5-1.5B-Instruct")
pkd.add_argument("--action-adapters", nargs="+", required=True,
help="glob patterns for trainable (action) LoRA checkpoints")
pkd.add_argument("--journal-adapter", required=True,
help="path to the frozen journal LoRA (shared across all action ckpts)")
pkd.add_argument("--seeds", type=int, nargs="+", default=[42, 43, 44, 45, 46])
pkd.add_argument("--action-max-tokens", type=int, default=300)
pkd.add_argument("--journal-max-tokens", type=int, default=400)
pkd.add_argument("--out", default=None)
pkd.set_defaults(fn=cmd_ckpts_dual)
pdb = sub.add_parser(
"dual-baselines",
help="dual-head apples-to-apples: random/heuristic/oracle/god + frontier (two-pass)",
)
pdb.add_argument("--seeds", type=int, nargs="+", default=[42, 43, 44, 45, 46])
pdb.add_argument("--out", default="assets/baselines_dual.json")
pdb.add_argument("--only", nargs="*", default=[],
help="restrict to a subset, e.g. --only heuristic god")
pdb.add_argument("--skip", nargs="*", default=[],
help="skip baselines by name (e.g. --skip random oracle)")
pdb.add_argument("--force", action="store_true",
help="re-run policies even if already in --out")
pdb.add_argument("--frontier-models", nargs="*", default=[],
help="frontier model ids to evaluate in dual-head mode "
"(e.g. Claude-Sonnet-4.6 gpt-5.4)")
pdb.add_argument("--frontier-provider",
choices=["auto", "openai", "anthropic", "openai_responses"], default="auto")
pdb.add_argument("--frontier-api-base", default=None)
pdb.add_argument("--frontier-temperature", type=float, default=0.0)
pdb.add_argument("--action-max-tokens", type=int, default=300)
pdb.add_argument("--journal-max-tokens", type=int, default=400)
pdb.add_argument("--frontier-permissive", action="store_true",
help="use ACTION_SYSTEM_PROMPT_PERMISSIVE for frontier "
"models — allows <thinking> CoT before <action>, "
"adds rogue checklist + budget guidance. "
"Policy names get `-permissive` suffix so strict & "
"permissive runs coexist in the same --out JSON.")
pdb.set_defaults(fn=cmd_dual_baselines)
args = p.parse_args()
return args.fn(args)
if __name__ == "__main__":
raise SystemExit(main())