lean-laguna / spec_rl /spec_rl.py
art87able's picture
Upload folder using huggingface_hub
4706f06 verified
#!/usr/bin/env python3
"""
spec_rl.py — a small `verifiers` environment for the combined hackathon thesis:
"lossless DFlash speculative decoding makes RL post-training cheaper."
The environment is a HumanEval-style code-completion task. The policy model
(Laguna XS.2) is prompted with a function signature + docstring and must emit
the function body. The reward executes the candidate completion against the
problem's unit tests in a SUBPROCESS WITH A TIMEOUT and returns the
FRACTION of the problem's unit-test assertions that pass (a dense RL signal in
[0,1]); the pass@1 eval stays binary (evals/humaneval_subset.py). Reward is the
dense learning signal; the eval is the binary scoreboard.
Why this exists for the hackathon
---------------------------------
verifiers runs RL rollouts against an OpenAI-compatible endpoint declared in
`./configs/endpoints.toml`. Point that endpoint at the DFlash-speculated vLLM
server and the *same* reward curve is produced at higher rollout throughput,
because speculative decoding is lossless under greedy decoding (the drafted
tokens are verified by the target model, so accepted text is token-identical to
the no-speculator baseline). The reward signal does not change; only the cost
per rollout drops. That is the "cheaper RL" claim, made measurable.
Local-dev note (Apple Silicon, no CUDA): this module is import-safe even when
`verifiers` is not installed. `import verifiers as vf` is guarded; a clear
ImportError is raised only when `load_environment()` is actually called. The
reward's code-execution + pass/fail logic is plain stdlib and is unit-testable
without verifiers or a GPU.
SAFETY: this executes model-generated code to grade it. Each candidate runs in a
short-lived subprocess with a wall-clock timeout, isolated from this process.
Run RL rollouts only in the disposable venue sandbox, never against real data.
"""
from __future__ import annotations
import ast
import json
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any
# ---------------------------------------------------------------------------
# Import guard: keep the module importable without `verifiers` installed so the
# reward logic can be unit-tested locally on the Mac. The real dependency is
# only required when building the live environment.
# ---------------------------------------------------------------------------
try:
import verifiers as vf # type: ignore
except ImportError: # pragma: no cover - exercised only when dep is absent
vf = None # type: ignore
# Per-candidate execution budget (seconds). Generous enough for HumanEval's
# bounded reference tests, short enough to bound a runaway rollout. This is the
# default `timeout_s` of passes() and fraction_passing().
EXEC_TIMEOUT_S = 8
# Stop sequences mirror evals/humaneval_subset.py so completion shape matches
# the parity/pass@1 harness used to prove losslessness. _score_completion()
# cuts a body-only completion at the first occurrence of each of these.
STOP = ["\nclass ", "\ndef ", "\n#", "\nif __name__"]
# ---------------------------------------------------------------------------
# Dataset — reuse the HumanEval subset shape: {prompt, test, entry_point}.
# We load the canonical HumanEval test split (same source as
# evals/humaneval_subset.py) and keep only the first `num_examples` problems so
# RL rollouts stay small and cheap during the hackathon.
# ---------------------------------------------------------------------------
def load_problems(num_examples: int, offset: int = 0,
                  dataset: str | None = None,
                  dataset_split: str | None = None) -> list[dict[str, Any]]:
    """Return `num_examples` code problems (from `offset`) as {prompt, test, entry_point}.

    Default source is the canonical HumanEval test split (same as
    evals/humaneval_subset.py). Selection is resolved in precedence order
    EXPLICIT-ARG > ENV-VAR > DEFAULT, so the same env serves three callers
    cleanly:

    * ``dataset`` arg / ``SPEC_RL_DATASET`` env — a local ``.jsonl`` path (one
      problem per line, read as UTF-8) OR a Hugging Face dataset id. This is
      the drop-in seam for an Adaption-curated / exported dataset: as long as
      each row carries ``{prompt, test, entry_point}`` it runs unchanged.
    * ``offset`` arg / ``SPEC_RL_OFFSET`` env — skip the first N problems. This
      is how a DISJOINT held-out split (e.g. HumanEval 50–74 for eval) is
      carved out of the same canonical source the train pool (0–49) draws from.
      Note that an explicit ``offset=0`` is indistinguishable from "not given",
      so the env var still applies in that case.
    * ``HUMANEVAL_DATASET`` env — override just the default HF repo id if the
      venue image pins a mirror. ``dataset_split`` arg / ``SPEC_RL_DATASET_SPLIT``
      env overrides the split (default ``test``).

    With no args and no env vars set this returns the first ``num_examples``
    problems of the HumanEval test split.

    Raises:
        ValueError: if ``num_examples`` or the resolved ``offset`` is negative
            (a negative value would otherwise silently wrap around and select
            rows from the end of the dataset).
    """
    import os

    src = dataset or os.environ.get("SPEC_RL_DATASET")
    if offset == 0:
        offset = int(os.environ.get("SPEC_RL_OFFSET", "0"))
    if num_examples < 0 or offset < 0:
        raise ValueError(
            f"num_examples and offset must be >= 0 (got num_examples={num_examples}, offset={offset})"
        )

    # Local JSONL file: one JSON object per non-blank line.
    if src and src.endswith(".jsonl") and os.path.exists(src):
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(src, encoding="utf-8") as f:
            rows = [json.loads(line) for line in f if line.strip()]
        return rows[offset:offset + num_examples]

    # Otherwise treat the source as a Hugging Face dataset id. Imported lazily
    # so the JSONL path and the reward core work without `datasets` installed.
    from datasets import load_dataset

    dataset_id = src or os.environ.get("HUMANEVAL_DATASET", "openai/openai_humaneval")
    split = dataset_split or os.environ.get("SPEC_RL_DATASET_SPLIT", "test")
    ds = load_dataset(dataset_id, split=split)
    hi = min(offset + num_examples, len(ds))
    return [dict(ds[i]) for i in range(offset, hi)]
# ---------------------------------------------------------------------------
# Reward core — execute the candidate completion against the unit tests in a
# fresh subprocess with a timeout. Pure stdlib, no verifiers/GPU needed, so it
# can be tested locally. Returns True iff all tests pass within the budget.
# ---------------------------------------------------------------------------
def _build_program(problem: dict[str, Any], completion: str) -> str:
"""Assemble the runnable program: signature+docstring + body + tests."""
return (
problem["prompt"]
+ completion
+ "\n"
+ problem["test"]
+ f"\ncheck({problem['entry_point']})\n"
)
def passes(problem: dict[str, Any], completion: str, timeout_s: int = EXEC_TIMEOUT_S) -> bool:
    """True iff `completion` makes the problem's unit tests pass.

    Runs the assembled program in a separate `python` subprocess so a hang,
    crash, or `sys.exit` in model-generated code cannot take down the rollout
    worker. A non-zero exit code, a raised exception, or a timeout all count as
    a failure.

    Args:
        problem: mapping with ``prompt``, ``test`` and ``entry_point``.
        completion: model-generated function body appended to the prompt.
        timeout_s: wall-clock budget for the subprocess, in seconds.

    Returns:
        ``True`` when the subprocess exits with status 0 within the budget.
    """
    program = _build_program(problem, completion)
    with tempfile.TemporaryDirectory() as tmp:
        prog_path = Path(tmp) / "candidate.py"
        # Python reads source files as UTF-8 by default, so write UTF-8
        # explicitly rather than whatever the platform locale happens to be.
        prog_path.write_text(program, encoding="utf-8")
        try:
            result = subprocess.run(
                [sys.executable, str(prog_path)],
                # Only the exit status is used. Discarding the streams avoids
                # buffering unbounded model-generated output in this process
                # and avoids decode errors on non-text output; a closed stdin
                # makes `input()` fail fast instead of reading the worker's.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=timeout_s,
                cwd=tmp,
            )
        except subprocess.TimeoutExpired:
            return False
    return result.returncode == 0
class _AssertCounter(ast.NodeTransformer):
"""Rewrite each ``assert`` so a failure is COUNTED, not fatal.
``assert <test>`` becomes, roughly::
try: __ok = bool(<test>)
except BaseException: __ok = False
__tally['total'] += 1
if __ok: __tally['passed'] += 1
So every assertion that executes (including inside a ``for`` loop over many
input/output pairs) contributes one test to the denominator, and the
numerator is how many held — turning HumanEval's single all-or-nothing
``check()`` into a fractional pass rate.
"""
def visit_Assert(self, node: ast.Assert):
try_node = ast.Try(
body=[ast.Assign(
targets=[ast.Name(id="__ok", ctx=ast.Store())],
value=ast.Call(func=ast.Name(id="bool", ctx=ast.Load()),
args=[node.test], keywords=[]),
)],
handlers=[ast.ExceptHandler(
type=ast.Name(id="BaseException", ctx=ast.Load()),
name=None,
body=[ast.Assign(
targets=[ast.Name(id="__ok", ctx=ast.Store())],
value=ast.Constant(value=False))],
)],
orelse=[], finalbody=[],
)
incr_total = ast.parse("__tally['total'] += 1").body[0]
incr_pass = ast.parse("if __ok:\n __tally['passed'] += 1").body[0]
out = [try_node, incr_total, incr_pass]
for n in out:
ast.copy_location(n, node)
ast.fix_missing_locations(n)
return out
def fraction_passing(problem: dict[str, Any], completion: str,
                     timeout_s: int = EXEC_TIMEOUT_S) -> float:
    """Fraction of the problem's unit-test assertions the completion passes.

    Returns a value in [0.0, 1.0]: 1.0 = all assertions pass, 0.5 = half, 0.0 =
    none (or the code didn't even run). This is the dense RL TRAINING reward; the
    reported pass@1 EVAL stays binary (evals/humaneval_subset.py). A dense reward
    avoids GRPO's all-zero-group advantage collapse on hard prompts (every
    rollout failing a hard problem otherwise yields a zero-variance group with
    no gradient).

    Mechanism: instrument the test's ``assert``s (via _AssertCounter) so each is
    counted instead of aborting on the first failure, run the assembled program
    in a timed subprocess, and read back passed/total from a marker line on
    stdout.

    * An exception that escapes ``check()`` (for example the candidate raising
      in a statement that is not itself an ``assert``) aborts the remaining
      checks, so it is counted as one additional FAILED check. Without this a
      crashing completion could score 1.0 with zero assertions executed.
    * The LAST marker line is used: it is the one printed after the tests ran,
      so a completion cannot pre-empt it by printing its own marker earlier.
    * If the test source cannot be parsed/unparsed, the binary ``passes()``
      result is returned instead. If the tests ran cleanly but executed no
      assertions, the result is 1.0 on exit status 0 and 0.0 otherwise.

    Args:
        problem: mapping with ``prompt``, ``test`` and ``entry_point``.
        completion: model-generated function body appended to the prompt.
        timeout_s: wall-clock budget for the subprocess, in seconds.
    """
    try:
        tree = ast.parse(problem["test"])
    except SyntaxError:
        return 1.0 if passes(problem, completion, timeout_s) else 0.0
    tree = _AssertCounter().visit(tree)
    ast.fix_missing_locations(tree)
    try:
        instrumented_test = ast.unparse(tree)
    except Exception:  # pragma: no cover - ast.unparse needs py>=3.9
        return 1.0 if passes(problem, completion, timeout_s) else 0.0

    marker = "__FRAC__"
    program = (
        "__tally = {'passed': 0, 'total': 0}\n"
        + problem["prompt"] + completion + "\n"
        + instrumented_test + "\n"
        + "try:\n"
        + f"    check({problem['entry_point']})\n"
        + "except BaseException:\n"
        # check() was aborted: count the abort itself as a failed check.
        + "    __tally['total'] += 1\n"
        + "import json as __json\n"
        # Leading newline keeps the marker at the start of a line even when the
        # candidate printed something without a trailing newline.
        + f"print('\\n{marker}' + __json.dumps(__tally))\n"
    )
    with tempfile.TemporaryDirectory() as tmp:
        prog_path = Path(tmp) / "candidate.py"
        # Python reads source files as UTF-8 by default; be explicit.
        prog_path.write_text(program, encoding="utf-8")
        try:
            result = subprocess.run(
                [sys.executable, str(prog_path)],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,  # never read, so don't buffer it
                # The marker line is ASCII; tolerate arbitrary bytes printed by
                # the candidate instead of raising UnicodeDecodeError here.
                encoding="utf-8",
                errors="replace",
                timeout=timeout_s,
                cwd=tmp,
            )
        except subprocess.TimeoutExpired:
            return 0.0

    tally_line = next(
        (ln for ln in reversed(result.stdout.splitlines()) if ln.startswith(marker)),
        None,
    )
    if tally_line is None:
        # No tally line => the program crashed before instrumentation ran
        # (e.g. a syntax error in the completion) => nothing passed.
        return 0.0
    try:
        tally = json.loads(tally_line[len(marker):])
        total = int(tally.get("total", 0))
        passed = int(tally.get("passed", 0))
    except (ValueError, TypeError, AttributeError):
        return 0.0
    if total == 0:  # no assertions executed and no crash -> all-or-nothing
        return 1.0 if result.returncode == 0 else 0.0
    return max(0.0, min(1.0, passed / total))
def _msg_role(message: Any) -> Any:
"""Role of a chat message in either shape: dict ``["role"]`` or object ``.role``."""
if isinstance(message, dict):
return message.get("role")
return getattr(message, "role", None)
def _msg_content(message: Any) -> str:
"""Content of a chat message in either shape: dict ``["content"]`` or object ``.content``."""
if isinstance(message, dict):
return str(message.get("content") or "")
return str(getattr(message, "content", "") or "")
def _extract_completion(state: Any) -> str:
"""Pull the assistant's text out of a verifiers rollout state.
Tolerates the completion as a plain string, a list of dict messages, OR a list
of pydantic ``Message`` OBJECTS (``.role``/``.content`` attributes). The object
shape is what the verifiers version behind ``prime eval``/``prime train`` returns;
handling ONLY the dict shape (the original bug) made ``str(message)`` fall back to
the object's repr ``"role='assistant' content='...'"``, which is unparseable as
code -> a spurious 0 reward across every rollout -> no training signal.
"""
completion = None
if isinstance(state, dict):
completion = state.get("completion")
elif hasattr(state, "get"):
try:
completion = state.get("completion")
except Exception:
completion = None
if completion is None:
completion = getattr(state, "completion", None)
if isinstance(completion, str):
return completion
if isinstance(completion, list):
for message in reversed(completion):
if _msg_role(message) == "assistant":
return _msg_content(message)
# fall back to the last item's content if roles are absent
if completion:
return _msg_content(completion[-1])
return ""
# ---------------------------------------------------------------------------
# System prompt — module constant so the offline manual loop (eval_local.py),
# the classic SingleTurnEnv path, and the cookbook Taskset path all send the
# exact same instruction.
# ---------------------------------------------------------------------------
# Asks for the indented function body only: no repeated signature, no prose,
# no markdown fences. _score_completion() still tolerates replies that ignore
# this (echoed signature, fences, chat tags, unindented body).
SYSTEM_PROMPT = (
"You are an expert Python programmer. You will be given a function "
"signature and docstring. Complete the function body only. Do not repeat "
"the signature, do not add explanations, and do not wrap the code in "
"markdown fences. Output only the indented function body."
)
def _problem_from(row: Any) -> dict[str, Any]:
"""Rebuild the gradeable problem from a task/info row (never the model output)."""
src = row.get("info") if hasattr(row, "get") and row.get("info") else row
return {
"prompt": src["code_prompt"],
"test": src["test"],
"entry_point": src["entry_point"],
}
def _prompt_body_indent(prompt: str, entry: str, default: str = "    ") -> str:
    """Return the whitespace that indents ``entry``'s body inside ``prompt``.

    The prompt is parsed and the column of the first statement in the entry
    point's body (normally its docstring) is used. ``default`` is returned when
    the prompt does not parse (e.g. it ends on the bare signature), the function
    is not found, or its body sits on the ``def`` line itself.
    """
    try:
        tree = ast.parse(prompt)
    except (SyntaxError, ValueError):
        return default
    lines = prompt.split("\n")
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        if node.name != entry or not node.body:
            continue
        first = node.body[0]
        if first.lineno == node.lineno or first.lineno > len(lines):
            return default
        prefix = lines[first.lineno - 1][:first.col_offset]
        return prefix if prefix and prefix.isspace() else default
    return default


def _score_completion(row: Any, completion_text: str) -> float:
    """Shared reward body: return the fractional unit-test pass rate.

    Handles BOTH completion shapes:

    * text-completion style — the model emits only the function body; it is
      trimmed at the first STOP sequence and appended to the prompt's signature.
      A body that starts at column 0 is re-indented to the prompt's own body
      indentation first (four spaces when that cannot be determined).
    * chat style — the model echoes the full ``def <entry>(...)`` signature +
      docstring + body. Appending that to the prompt would double-define the
      function, and STOP-trimming at a newline followed by ``def`` would cut an
      echoed completion down to nothing. So when the entry point's signature is
      found in the completion, only the prompt text BEFORE that signature
      (imports/helpers) is kept and the completion's own function source is
      scored.

    The signature is matched as ``def <entry>(`` with a word boundary, so a
    helper such as ``def add_all(`` is not mistaken for the entry point ``add``.

    Before either path, markdown fences and a leading ``<assistant>`` tag are
    removed and the text is cut at the first chat/EOS control tag; left in
    place those end up as stray lines in the assembled program and cause a
    SyntaxError (a spurious 0 reward).

    Args:
        row: task row; the gradeable fields are read via ``_problem_from``.
        completion_text: the assistant's raw text (``None``/empty scores as an
            empty body).

    Returns:
        The value of ``fraction_passing`` for the assembled program, in [0, 1].
    """
    import re
    import textwrap

    problem = _problem_from(row)
    entry = problem["entry_point"]
    text = completion_text or ""

    # Strip chat wrappers: fences, a stray leading role tag, then everything
    # from the first control/EOS tag onwards.
    text = text.replace("```python", "").replace("```", "")
    text = text.replace("<assistant>", "")
    for tag in ("</assistant>", "<|im_end|>", "<|endoftext|>", "<|eot_id|>", "</s>", "<|end|>"):
        k = text.find(tag)
        if k != -1:
            text = text[:k]

    signature = re.compile(rf"\bdef\s+{re.escape(entry)}\s*\(")
    echoed = signature.search(text)
    if echoed:
        # Chat style: keep only what precedes the entry point in the prompt.
        in_prompt = signature.search(problem["prompt"])
        preamble = problem["prompt"][:in_prompt.start()] if in_prompt else problem["prompt"]
        func_src = text[echoed.start():]
        # trim trailing non-code chatter after the function definition
        for tail in ("\n</", "\nif __name__", "\n#", "\nclass "):
            j = func_src.find(tail)
            if j != -1:
                func_src = func_src[:j]
        scoring_problem = {"prompt": preamble, "test": problem["test"], "entry_point": entry}
        return fraction_passing(scoring_problem, func_src)

    # body-only: trim at the first STOP and append to the prompt signature.
    for stop in STOP:
        idx = text.find(stop)
        if idx != -1:
            text = text[:idx]

    # A chat model frequently emits the body at COLUMN 0 (e.g. "\nreturn a + b").
    # Appended verbatim under the signature that is a SyntaxError. If the first
    # non-blank line isn't indented, indent the whole body; textwrap.indent
    # keeps the body's own relative structure, so multi-line bodies stay valid.
    first_code = next((ln for ln in text.split("\n") if ln.strip()), "")
    if first_code and not first_code[0].isspace():
        text = textwrap.indent(text, _prompt_body_indent(problem["prompt"], entry))
    return fraction_passing(problem, text)
def _task_rows(num_examples: int, offset: int = 0,
               dataset: str | None = None,
               dataset_split: str | None = None) -> list[dict[str, Any]]:
    """Load problems and shape them into rows the reward can read.

    Each row carries ``prompt`` and ``answer`` (the entry-point name) plus the
    grading fields ``task_id`` / ``code_prompt`` / ``test`` / ``entry_point``
    twice: nested under ``info`` AND flattened at the top level, so both
    verifiers API shapes can read them. Problems without a ``task_id`` get
    ``example_<i>``, where ``i`` is the position within the loaded slice.
    """
    problems = load_problems(num_examples, offset=offset,
                             dataset=dataset, dataset_split=dataset_split)

    def _as_row(index: int, prob: dict[str, Any]) -> dict[str, Any]:
        info = {
            "task_id": prob.get("task_id", f"example_{index}"),
            "code_prompt": prob["prompt"],
            "test": prob["test"],
            "entry_point": prob["entry_point"],
        }
        return {"prompt": prob["prompt"], "answer": prob["entry_point"],
                "info": info, **info}

    return [_as_row(index, prob) for index, prob in enumerate(problems)]
# ---------------------------------------------------------------------------
# Environment factories — two builders over the same stdlib reward core
# (fraction_passing), so the reward is identical either way:
#   * load_taskset() targets the verifiers **v1** taskset+harness API
#     (verifiers>=0.1.14, under `verifiers.v1`): a Taskset(source=<zero-arg row
#     generator>, rewards=[<@reward fns>]) adapted to a worker Env by
#     vf.Env(taskset=). Older verifiers used a different Taskset shape.
#   * load_environment() does NOT go through v1: it always builds the classic
#     vf.SingleTurnEnv/vf.Rubric environment (see its docstring for why).
# ---------------------------------------------------------------------------
def _v1():
"""Return the `verifiers.v1` module if importable, else None."""
try:
import verifiers.v1 as v1 # type: ignore
return v1
except Exception: # pragma: no cover - older verifiers without a v1 module
return None
def _make_source(num: int | None = None, offset: int = 0,
dataset: str | None = None, dataset_split: str | None = None):
"""Build a zero-arg row generator for the v1 Taskset (lazy + import-cheap).
Selection is resolved EXPLICIT-ARG > ENV-VAR > DEFAULT so one env serves both
the TRAIN pool (e.g. ``num=50, offset=0`` -> HumanEval 0–49) and a DISJOINT
held-out EVAL pool (``offset=50, num=25`` -> HumanEval 50–74) within a single
training run, distinguished by ``[[env]].args`` alone — no second dataset to
publish. Each row is a JSON-serializable task: a user-message prompt (the
function signature + docstring), a per-task system prompt, and the gradeable
fields (code_prompt/test/entry_point) the reward reads back off ``task``.
``max_turns=1`` because this is single-turn code completion.
"""
import os
def _source():
n = num if num is not None else int(os.environ.get("SPEC_RL_NUM", "50"))
for row in _task_rows(n, offset=offset, dataset=dataset,
dataset_split=dataset_split):
yield {
"task_id": row["info"]["task_id"],
"system_prompt": SYSTEM_PROMPT,
"prompt": [{"role": "user", "content": row["code_prompt"]}],
"answer": row["entry_point"],
"code_prompt": row["code_prompt"],
"test": row["test"],
"entry_point": row["entry_point"],
"max_turns": 1,
}
return _source
# Backward-compatible default source (env-var driven), kept so any caller that
# referenced `_spec_rl_source` directly still works. It is built with no
# explicit arguments, so the pool size comes from SPEC_RL_NUM (default 50) when
# the generator is iterated; creating it here loads nothing.
_spec_rl_source = _make_source()
def load_taskset(config=None, *, num: int | None = None, offset: int = 0,
                 dataset: str | None = None, dataset_split: str | None = None):
    """Build the v1 Taskset: HumanEval-style code rows + the dense unit-test reward.

    The reward is a standalone ``@reward`` coroutine (v1 passes ``(task, state)``)
    that extracts the assistant text from ``state`` and returns the fractional
    unit-test pass rate computed by the shared stdlib core. Grading spawns a
    subprocess and can block for the whole execution timeout, so it is run in a
    worker thread rather than on the event loop.

    Args:
        config: passed through to ``v1.Taskset`` unchanged.
        num, offset, dataset, dataset_split: select the problem slice (see
            ``_make_source``).

    Raises:
        ImportError: if ``verifiers.v1`` cannot be imported.
    """
    import asyncio

    v1 = _v1()
    if v1 is None:
        raise ImportError("verifiers.v1 is required for the v1 taskset path")

    @v1.reward(weight=1.0)
    async def code_reward(task, state) -> float:
        """Dense fractional unit-test pass rate in [0,1] — the RL training reward."""
        completion_text = _extract_completion(state)
        # Blocking subprocess work: keep it off the event loop so other
        # rollouts can be scored/served concurrently.
        return await asyncio.to_thread(_score_completion, task, completion_text)

    source = _make_source(num=num, offset=offset, dataset=dataset,
                          dataset_split=dataset_split)
    return v1.Taskset(source=source, rewards=[code_reward], config=config)
def _build_singleturn_env(num_examples: int, offset: int = 0,
                          dataset: str | None = None,
                          dataset_split: str | None = None):
    """Build the classic ``vf.SingleTurnEnv`` scored by the fractional unit-test reward.

    The selected problems are turned into a Hugging Face ``Dataset`` with
    ``question`` (signature + docstring), ``answer`` (entry-point name),
    ``info`` (grading fields) and a constant ``task`` column, then wrapped in
    ``SingleTurnEnv(dataset=, system_prompt=, rubric=Rubric(funcs=, weights=))``
    with a single plain reward function of weight 1.0.

    This classic shape is used instead of ``verifiers.v1.Taskset(source=...)``
    because the hosted trainer pins a verifiers whose ``Taskset`` signature
    differs; the classic path is the common denominator for ``prime eval run``
    and hosted ``prime train``.
    """
    from datasets import Dataset

    records = []
    for row in _task_rows(num_examples, offset=offset, dataset=dataset,
                          dataset_split=dataset_split):
        records.append({
            "question": row["code_prompt"],
            "answer": row["entry_point"],
            "info": row["info"],
            "task": "spec-rl",
        })
    hf_dataset = Dataset.from_list(records)

    def code_reward(completion, info, **kwargs) -> float:
        """Dense fractional unit-test pass rate in [0,1] — the RL training reward."""
        if isinstance(completion, str):
            text = completion
        else:
            # Message list (dicts or objects): pull out the assistant's text.
            text = _extract_completion({"completion": completion})
        return _score_completion({"info": info}, text)

    scoring = vf.Rubric(funcs=[code_reward], weights=[1.0])
    return vf.SingleTurnEnv(dataset=hf_dataset, system_prompt=SYSTEM_PROMPT,
                            rubric=scoring)
def load_environment(config: Any = None, *, num_examples: int = 20,
                     num: int | None = None, offset: int = 0,
                     dataset: str | None = None, dataset_split: str | None = None,
                     **kwargs):
    """Build the spec_rl environment.

    Always returns the classic ``vf.SingleTurnEnv``/``vf.Rubric`` environment —
    the STABLE API consumed by BOTH ``prime eval run`` and hosted ``prime train``.
    The newer ``verifiers.v1.Taskset(source=...)`` API is intentionally not used
    here: the hosted trainer pins a verifiers whose ``Taskset`` signature
    differs, so the classic path is the common denominator. The reward logic
    (``fraction_passing``) is importable and unit-testable WITHOUT verifiers;
    the hard dependency is enforced only here.

    Dataset-slice args (passed through from ``[[env]].args`` in a train config or
    ``prime eval -a '{...}'``) select WHICH problems this env serves, so one
    pushed env covers both the train pool and a disjoint held-out eval pool:

    * ``num`` — number of problems in the pool. When omitted, the ``SPEC_RL_NUM``
      env var is used, and failing that ``num_examples`` (default 20).
    * ``offset`` — skip the first N (e.g. ``offset=50`` for the held-out split).
    * ``dataset`` / ``dataset_split`` — override the source (HF id or local jsonl).

    ``config`` and any extra runner kwargs are accepted and ignored so the
    loader stays robust.

    Raises:
        ImportError: if the ``verifiers`` package is not installed.
    """
    if vf is None:
        raise ImportError(
            "The 'verifiers' package is required to build the spec_rl environment. "
            "Install it with `prime env install art87able/spec-rl` (or `pip install verifiers`). "
            "The reward logic (spec_rl.fraction_passing) is importable without it."
        )
    import os

    if num is not None:
        pool_size = num
    else:
        pool_size = int(os.environ.get("SPEC_RL_NUM", str(num_examples)))
    return _build_singleturn_env(pool_size, offset=offset, dataset=dataset,
                                 dataset_split=dataset_split)
# ---------------------------------------------------------------------------
# Local smoke test (no verifiers, no GPU, no network): proves the reward core
# distinguishes a passing completion from a failing one. Run:
# python spec_rl.py
# ---------------------------------------------------------------------------
def _selftest() -> None:
    """Local smoke test of the reward core (no verifiers, GPU or network needed).

    Scores a toy ``add`` problem with a passing, a failing, a half-passing and
    a non-terminating completion, prints the results as JSON, and asserts the
    expected rewards. Then runs three regressions against ``_score_completion``
    / ``_extract_completion``: a trailing ``</assistant>`` chat tag, an echoed
    signature, and a completion delivered as message OBJECTS rather than dicts.

    The embedded code snippets use real four-space indentation so that each one
    is valid Python and the non-terminating case genuinely hits the timeout.
    """
    toy = {
        "prompt": "def add(a, b):\n    \"\"\"Return a + b.\"\"\"\n",
        "test": (
            "def check(candidate):\n"
            "    assert candidate(2, 3) == 5\n"
            "    assert candidate(-1, 1) == 0\n"
        ),
        "entry_point": "add",
    }
    good = "    return a + b\n"
    bad = "    return a - b\n"
    partial = "    return a + b if a > 0 else a - b\n"  # passes 1 of 2 asserts
    loops_forever = "    while True:\n        pass\n"

    report = {
        "passing_fraction": fraction_passing(toy, good),
        "failing_fraction": fraction_passing(toy, bad),
        "partial_fraction": fraction_passing(toy, partial),
        "timeout_fraction": fraction_passing(toy, loops_forever, timeout_s=2),
        "binary_passes_good": passes(toy, good),
        "verifiers_available": vf is not None,
    }
    print(json.dumps(report, indent=2))
    assert report["passing_fraction"] == 1.0, "all asserts pass => 1.0"
    assert report["failing_fraction"] == 0.0, "no asserts pass => 0.0"
    assert report["partial_fraction"] == 0.5, "1 of 2 asserts => 0.5 (fractional)"
    assert report["timeout_fraction"] == 0.0, "timeout => 0.0"

    # Regression: the classic SingleTurnEnv path (and the hosted trainer) return
    # completions ending in a "</assistant>" chat tag. _score_completion must
    # strip it; otherwise it lands in the assembled program as a SyntaxError ->
    # 0 reward -> no training signal.
    info = {"code_prompt": toy["prompt"], "test": toy["test"], "entry_point": "add"}
    tagged_body = "\nreturn a + b\n</assistant>"  # body-only + chat tag
    tagged_echo = "\ndef add(a, b):\n    return a + b\n</assistant>"  # echoed sig + tag
    assert _score_completion({"info": info}, tagged_body) == 1.0, "body+tag must score 1.0"
    assert _score_completion({"info": info}, tagged_echo) == 1.0, "echoed+tag must score 1.0"

    # Regression: completion as a list of pydantic-style Message OBJECTS (not
    # dicts). Treating only dicts as messages would score the object's repr.
    class _FakeMsg:
        def __init__(self, role, content):
            self.role, self.content = role, content

    obj_compl = [
        _FakeMsg("user", toy["prompt"]),
        _FakeMsg("assistant", "\nreturn a + b\n</assistant>"),
    ]
    extracted = _extract_completion({"completion": obj_compl})
    assert "return a + b" in extracted and "role=" not in extracted, "must extract content, not repr"
    assert _score_completion({"info": info}, extracted) == 1.0, "object-message path must score 1.0"
    print("selftest OK (tag-strip + indent-normalize + object-message extraction)")
# Script entry point: run the local smoke test when executed directly
# (`python spec_rl.py`); importing the module does not run it.
if __name__ == "__main__":
    _selftest()