#!/usr/bin/env python3
"""
spec_rl.py — a small `verifiers` environment for the combined hackathon thesis:
"lossless DFlash speculative decoding makes RL post-training cheaper."

The environment is a HumanEval-style code-completion task. The policy model
(Laguna XS.2) is prompted with a function signature + docstring and must emit
the function body. The reward executes the candidate completion against the
problem's unit tests in a SUBPROCESS WITH A TIMEOUT and returns the FRACTION of
the problem's unit-test assertions that pass (a dense RL signal in [0,1]); the
pass@1 eval stays binary (evals/humaneval_subset.py). Reward is the dense
learning signal; the eval is the binary scoreboard.

Why this exists for the hackathon
---------------------------------
verifiers runs RL rollouts against an OpenAI-compatible endpoint declared in
`./configs/endpoints.toml`. Point that endpoint at the DFlash-speculated vLLM
server and the *same* reward curve is produced at higher rollout throughput,
because speculative decoding is lossless under greedy decoding (the drafted
tokens are verified by the target model, so accepted text is token-identical to
the no-speculator baseline). The reward signal does not change; only the cost
per rollout drops. That is the "cheaper RL" claim, made measurable.

Local-dev note (Apple Silicon, no CUDA): this module is import-safe even when
`verifiers` is not installed. `import verifiers as vf` is guarded; a clear
ImportError is raised only when `load_environment()` is actually called. The
reward's code-execution + pass/fail logic is plain stdlib and is unit-testable
without verifiers or a GPU.

SAFETY: this executes model-generated code to grade it. Each candidate runs in
a short-lived subprocess with a wall-clock timeout, isolated from this process.
Run RL rollouts only in the disposable venue sandbox, never against real data.
"""
from __future__ import annotations

import ast
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any

# ---------------------------------------------------------------------------
# Import guard: keep the module importable without `verifiers` installed so the
# reward logic can be unit-tested locally on the Mac. The real dependency is
# only required when building the live environment.
# ---------------------------------------------------------------------------
try:
    import verifiers as vf  # type: ignore
except ImportError:  # pragma: no cover - exercised only when dep is absent
    vf = None  # type: ignore

# Per-candidate execution budget (seconds). Generous enough for HumanEval's
# bounded reference tests, short enough to bound a runaway rollout.
EXEC_TIMEOUT_S = 8

# Stop sequences mirror evals/humaneval_subset.py so completion shape matches
# the parity/pass@1 harness used to prove losslessness.
STOP = ["\nclass ", "\ndef ", "\n#", "\nif __name__"]

# Marker that prefixes the JSON tally line the instrumented program prints.
_TALLY_PREFIX = "__FRAC__"


# ---------------------------------------------------------------------------
# Dataset — reuse the HumanEval subset shape: {prompt, test, entry_point}.
# We load the canonical HumanEval test split (same source as
# evals/humaneval_subset.py) and keep only the first `num_examples` problems so
# RL rollouts stay small and cheap during the hackathon.
# ---------------------------------------------------------------------------
def load_problems(num_examples: int, offset: int = 0, dataset: str | None = None,
                  dataset_split: str | None = None) -> list[dict[str, Any]]:
    """Return `num_examples` code problems (from `offset`) as {prompt, test, entry_point}.

    Default source is the canonical HumanEval test split (same as
    evals/humaneval_subset.py). Selection is resolved in precedence order
    EXPLICIT-ARG > ENV-VAR > DEFAULT, so the same env serves three callers cleanly:

    * ``dataset`` arg / ``SPEC_RL_DATASET`` env — a local ``.jsonl`` path (one
      problem per line) OR a Hugging Face dataset id. This is the drop-in seam
      for an Adaption-curated / exported dataset: as long as each row carries
      ``{prompt, test, entry_point}`` it runs unchanged, so a richer code
      taskset built with the hackathon's Adaption credits swaps in with one arg
      and no code change.
    * ``offset`` arg / ``SPEC_RL_OFFSET`` env — skip the first N problems. This
      is how a DISJOINT held-out split (e.g. HumanEval 50–74 for eval) is carved
      out of the same canonical source the train pool (0–49) draws from, with
      no second dataset to publish — train and eval differ by args alone.
    * ``HUMANEVAL_DATASET`` env — override just the default HF repo id if the
      venue image pins a mirror.

    ``dataset_split`` arg / ``SPEC_RL_DATASET_SPLIT`` env overrides the split
    (default ``test``). With no args and no env vars set the behaviour is
    identical to before (first ``num_examples`` of HumanEval test).

    Note: ``offset=0`` doubles as "not given", so an explicit ``offset=0``
    cannot override a non-zero ``SPEC_RL_OFFSET``.

    Raises:
        ValueError: if ``SPEC_RL_OFFSET`` is set to a non-integer value.
    """
    src = dataset or os.environ.get("SPEC_RL_DATASET")
    if offset == 0:
        offset = int(os.environ.get("SPEC_RL_OFFSET", "0"))

    if src and src.endswith(".jsonl") and os.path.exists(src):
        # JSON is UTF-8 by specification; do not depend on the locale encoding.
        with open(src, encoding="utf-8") as f:
            rows = [json.loads(line) for line in f if line.strip()]
        return rows[offset:offset + num_examples]

    # Imported lazily so the jsonl path (and the reward core) work without
    # the `datasets` package installed.
    from datasets import load_dataset

    dataset_id = src or os.environ.get("HUMANEVAL_DATASET", "openai/openai_humaneval")
    split = dataset_split or os.environ.get("SPEC_RL_DATASET_SPLIT", "test")
    ds = load_dataset(dataset_id, split=split)
    hi = min(offset + num_examples, len(ds))
    return [dict(ds[i]) for i in range(offset, hi)]


# ---------------------------------------------------------------------------
# Reward core — execute the candidate completion against the unit tests in a
# fresh subprocess with a timeout. Pure stdlib, no verifiers/GPU needed, so it
# can be tested locally. Returns True iff all tests pass within the budget.
# ---------------------------------------------------------------------------
def _build_program(problem: dict[str, Any], completion: str) -> str:
    """Assemble the runnable program: signature+docstring + body + tests."""
    return (
        problem["prompt"]
        + completion
        + "\n"
        + problem["test"]
        + f"\ncheck({problem['entry_point']})\n"
    )


def _run_program(program: str, timeout_s: float) -> subprocess.CompletedProcess[str] | None:
    """Run `program` in a throwaway directory; return None on timeout.

    The source is written as UTF-8 (Python's default source encoding) rather
    than the locale encoding, and the child's output is decoded with
    ``errors="replace"`` so arbitrary bytes printed by model-generated code
    cannot raise ``UnicodeDecodeError`` inside the rollout worker.
    """
    with tempfile.TemporaryDirectory() as tmp:
        prog_path = Path(tmp) / "candidate.py"
        prog_path.write_text(program, encoding="utf-8")
        try:
            return subprocess.run(
                [sys.executable, str(prog_path)],
                capture_output=True,
                text=True,
                errors="replace",
                timeout=timeout_s,
                cwd=tmp,
            )
        except subprocess.TimeoutExpired:
            return None


def passes(problem: dict[str, Any], completion: str, timeout_s: int = EXEC_TIMEOUT_S) -> bool:
    """True iff `completion` makes the problem's unit tests pass.

    Runs the assembled program in a separate `python` subprocess so a hang,
    crash, or `sys.exit` in model-generated code cannot take down the rollout
    worker. A non-zero exit code, a raised exception, or a timeout all count as
    a failure (reward 0.0).
    """
    result = _run_program(_build_program(problem, completion), timeout_s)
    return result is not None and result.returncode == 0


class _AssertCounter(ast.NodeTransformer):
    """Rewrite each ``assert`` so a failure is COUNTED, not fatal.

    ``assert <expr>`` becomes, roughly::

        try:
            __ok = bool(<expr>)
        except BaseException:
            __ok = False
        __tally['total'] += 1
        if __ok:
            __tally['passed'] += 1

    So every assertion that executes (including inside a ``for`` loop over many
    input/output pairs) contributes one test to the denominator, and the
    numerator is how many held — turning HumanEval's single all-or-nothing
    ``check()`` into a fractional pass rate.
    """

    def visit_Assert(self, node: ast.Assert):
        """Return the three statements that replace one ``assert`` node."""
        try_node = ast.Try(
            body=[ast.Assign(
                targets=[ast.Name(id="__ok", ctx=ast.Store())],
                value=ast.Call(func=ast.Name(id="bool", ctx=ast.Load()),
                               args=[node.test], keywords=[]),
            )],
            handlers=[ast.ExceptHandler(
                type=ast.Name(id="BaseException", ctx=ast.Load()),
                name=None,
                body=[ast.Assign(
                    targets=[ast.Name(id="__ok", ctx=ast.Store())],
                    value=ast.Constant(value=False))],
            )],
            orelse=[],
            finalbody=[],
        )
        incr_total = ast.parse("__tally['total'] += 1").body[0]
        incr_pass = ast.parse("if __ok:\n    __tally['passed'] += 1").body[0]
        out = [try_node, incr_total, incr_pass]
        for n in out:
            ast.copy_location(n, node)
            ast.fix_missing_locations(n)
        return out


def fraction_passing(problem: dict[str, Any], completion: str,
                     timeout_s: int = EXEC_TIMEOUT_S) -> float:
    """Fraction of the problem's unit-test assertions the completion passes.

    Returns a value in [0.0, 1.0]: 1.0 = all assertions pass, 0.5 = half,
    0.0 = none (or the code didn't even run). This is the dense RL TRAINING
    reward; the reported pass@1 EVAL stays binary (evals/humaneval_subset.py).
    Reward is the learning signal, eval is the scoreboard — a dense reward
    avoids GRPO's all-zero-group advantage collapse on hard prompts (every
    rollout failing a hard problem otherwise yields a zero-variance group with
    no gradient).

    Mechanism: instrument the test's ``assert``s (via _AssertCounter) so each is
    counted instead of aborting on the first failure, run the assembled program
    in a timed subprocess, and read back passed/total. An exception that escapes
    ``check()`` (e.g. the candidate raising on a line outside any ``assert``)
    is counted as one additional FAILED check, so a crash can never score 1.0.
    Falls back to the binary ``passes()`` result if the test can't be parsed or
    unparsed; a test that runs cleanly but executes no assertions scores 1.0.

    The tally is read from the candidate's own stdout, so this is a grading
    convenience, not a security boundary against adversarial completions.
    """
    try:
        tree = ast.parse(problem["test"])
    except SyntaxError:
        return 1.0 if passes(problem, completion, timeout_s) else 0.0

    tree = _AssertCounter().visit(tree)
    ast.fix_missing_locations(tree)
    try:
        instrumented_test = ast.unparse(tree)
    except Exception:  # pragma: no cover - ast.unparse needs py>=3.9
        return 1.0 if passes(problem, completion, timeout_s) else 0.0

    program = (
        "__tally = {'passed': 0, 'total': 0}\n"
        + problem["prompt"] + completion + "\n"
        + instrumented_test + "\n"
        + "try:\n"
        + f"    check({problem['entry_point']})\n"
        + "except BaseException:\n"
        # check() aborted outside an instrumented assert: the remaining
        # assertions never ran, so record the abort as a failed check.
        + "    __tally['total'] += 1\n"
        + "import json as __json\n"
        + f"print('{_TALLY_PREFIX}' + __json.dumps(__tally))\n"
    )

    result = _run_program(program, timeout_s)
    if result is None:  # timed out
        return 0.0

    # The genuine tally is the LAST thing the program prints; scanning from the
    # end ignores look-alike lines printed earlier by the candidate itself.
    for line in reversed(result.stdout.splitlines()):
        if not line.startswith(_TALLY_PREFIX):
            continue
        try:
            tally = json.loads(line[len(_TALLY_PREFIX):])
            total = int(tally.get("total", 0))
            passed = int(tally.get("passed", 0))
        except (ValueError, TypeError, AttributeError, OverflowError):
            return 0.0
        if total == 0:
            # check() ran to completion without executing any assertion
            # -> fall back to all-or-nothing
            return 1.0 if result.returncode == 0 else 0.0
        if total < 0 or passed < 0:
            # counters only ever increase; anything else means tampering
            return 0.0
        return max(0.0, min(1.0, passed / total))

    # No tally line => the program crashed before instrumentation ran (e.g. a
    # syntax error in the completion) => nothing passed.
    return 0.0


def _msg_role(message: Any) -> Any:
    """Role of a chat message in either shape: dict ``["role"]`` or object ``.role``."""
    if isinstance(message, dict):
        return message.get("role")
    return getattr(message, "role", None)


def _msg_content(message: Any) -> str:
    """Content of a chat message in either shape: dict ``["content"]`` or object ``.content``."""
    if isinstance(message, dict):
        return str(message.get("content") or "")
    return str(getattr(message, "content", "") or "")


def _extract_completion(state: Any) -> str:
    """Pull the assistant's text out of a verifiers rollout state.

    Tolerates the completion as a plain string, a list of dict messages, OR a
    list of pydantic ``Message`` OBJECTS (``.role``/``.content`` attributes).
    The object shape is what the verifiers version behind ``prime eval``/
    ``prime train`` returns; handling ONLY the dict shape (the original bug)
    made ``str(message)`` fall back to the object's repr
    ``"role='assistant' content='...'"``, which is unparseable as code -> a
    spurious 0 reward across every rollout -> no training signal.

    Returns the empty string when no completion can be found.
    """
    completion = None
    if isinstance(state, dict):
        completion = state.get("completion")
    elif hasattr(state, "get"):
        try:
            completion = state.get("completion")
        except Exception:
            # mapping-like objects with an incompatible .get(); try the attribute
            completion = None
    if completion is None:
        completion = getattr(state, "completion", None)

    if isinstance(completion, str):
        return completion
    if isinstance(completion, list):
        for message in reversed(completion):
            if _msg_role(message) == "assistant":
                return _msg_content(message)
        # fall back to the last item's content if roles are absent
        if completion:
            return _msg_content(completion[-1])
    return ""


# ---------------------------------------------------------------------------
# System prompt — module constant so the offline manual loop (eval_local.py),
# the classic SingleTurnEnv path, and the cookbook Taskset path all send the
# exact same instruction.
# ---------------------------------------------------------------------------
SYSTEM_PROMPT = (
    "You are an expert Python programmer. You will be given a function "
    "signature and docstring. Complete the function body only. Do not repeat "
    "the signature, do not add explanations, and do not wrap the code in "
    "markdown fences. Output only the indented function body."
)

# Chat/EOS control tags at which a completion is truncated.
# NOTE(review): reconstructed list — confirm against the special tokens the
# served tokenizer actually emits.
_CHAT_END_TAGS = ("</s>", "<|im_end|>", "<|endoftext|>", "<|eot_id|>", "<end_of_turn>", "<|end|>")

# Line starts that mark non-code chatter after an echoed function definition.
# NOTE(review): reconstructed — a line starting with "<" is never valid Python;
# the "if __name__" guard mirrors the module's STOP sequences.
_ECHO_TAIL_MARKERS = ("\n<", "\nif __name__")


def _problem_from(row: Any) -> dict[str, Any]:
    """Rebuild the gradeable problem from a task/info row (never the model output).

    Reads ``code_prompt``/``test``/``entry_point`` from ``row["info"]`` when the
    row has a non-empty ``info`` entry, otherwise from the row itself.
    """
    src = row.get("info") if hasattr(row, "get") and row.get("info") else row
    return {
        "prompt": src["code_prompt"],
        "test": src["test"],
        "entry_point": src["entry_point"],
    }


def _strip_chat_wrappers(text: str) -> str:
    """Remove markdown fences and a leading ``<s>``, then cut at the first control tag."""
    text = text.replace("```python", "").replace("```", "")
    text = text.replace("<s>", "")
    for tag in _CHAT_END_TAGS:
        k = text.find(tag)
        if k != -1:
            text = text[:k]
    return text


def _indent_body(text: str) -> str:
    """Indent `text` by 4 spaces when its first non-blank line is unindented."""
    first_code = next((ln for ln in text.split("\n") if ln.strip()), "")
    if first_code and not first_code[0].isspace():
        import textwrap

        # textwrap.indent keeps the body's own relative structure, so
        # multi-line bodies stay correct.
        return textwrap.indent(text, "    ")
    return text


def _score_completion(row: Any, completion_text: str) -> float:
    """Shared reward body: return the fractional unit-test pass rate.

    Handles BOTH completion shapes:
      * text-completion style — the model emits only the indented function
        body; we append it to the prompt's signature (trimming at the first
        STOP).
      * chat style — the model echoes the full ``def <name>(...):`` signature +
        docstring + body. Here, appending to the prompt would double-define the
        function, and trimming at the ``"\\ndef "`` STOP would (since the echoed
        completion *starts* with ``\\ndef``) chop it to nothing — the cause of
        the all-zero reward. So we detect the echoed signature, keep only the
        prompt's import/preamble, and score the completion's own function
        source.

    Args:
        row: task/info row; see ``_problem_from`` for the fields read.
        completion_text: raw model output (``None``/empty is scored as empty).

    Returns:
        The value of ``fraction_passing`` for the normalized completion.
    """
    problem = _problem_from(row)
    entry = problem["entry_point"]

    # Strip common chat wrappers before any logic: markdown fences, then
    # truncate at the first chat/EOS control tag. The classic SingleTurnEnv path
    # (used by both prime eval and the hosted trainer) returns completions
    # ending in a literal "</s>" (and models may emit other special tokens).
    # Left in place these become a stray line in the assembled program ->
    # SyntaxError -> spurious 0 reward -> no training signal.
    text = _strip_chat_wrappers(completion_text or "")

    marker = f"def {entry}"
    if marker in text:
        # Chat style: the completion carries its own definition of the entry point.
        preamble = problem["prompt"].split(marker, 1)[0]  # imports/helpers only
        func_src = text[text.index(marker):]
        # trim trailing non-code chatter after the function definition
        for tail in _ECHO_TAIL_MARKERS:
            cut = func_src.find(tail)
            if cut != -1:
                func_src = func_src[:cut]
        return fraction_passing({**problem, "prompt": preamble}, func_src)

    # Text-completion style: keep everything before the earliest STOP sequence.
    for stop in STOP:
        k = text.find(stop)
        if k != -1:
            text = text[:k]

    # Body-only completions can arrive without leading indentation; appended
    # under the signature that is an IndentationError -> spurious 0 reward. If
    # the first non-blank line isn't indented, re-indent the whole body by 4
    # spaces.
    return fraction_passing(problem, _indent_body(text))


def _task_rows(num_examples: int, offset: int = 0, dataset: str | None = None,
               dataset_split: str | None = None) -> list[dict[str, Any]]:
    """HumanEval-style rows carrying every field the reward needs — `info`
    nested AND flattened, so both verifiers API shapes can read them."""
    rows: list[dict[str, Any]] = []
    problems = load_problems(num_examples, offset=offset, dataset=dataset,
                             dataset_split=dataset_split)
    for i, prob in enumerate(problems):
        info = {
            "task_id": prob.get("task_id", f"example_{i}"),
            "code_prompt": prob["prompt"],
            "test": prob["test"],
            "entry_point": prob["entry_point"],
        }
        rows.append({"prompt": prob["prompt"], "answer": prob["entry_point"],
                     "info": info, **info})
    return rows


# ---------------------------------------------------------------------------
# Environment factory — the verifiers **v1** taskset+harness API
# (verifiers>=0.1.14, under `verifiers.v1`) is a
# Taskset(source=<generator>, rewards=[<@reward fns>]) adapted to a worker Env
# by vf.Env(taskset=<taskset>).
# This is the format `prime eval run` / `prime train` consume.
# Older verifiers used a different Taskset shape, so load_environment() builds
# the classic vf.SingleTurnEnv/vf.Rubric env; load_taskset() is the entry point
# for runners that do have `verifiers.v1`. Both share the same stdlib reward
# core (fraction_passing), so the reward is identical either way.
# ---------------------------------------------------------------------------
def _v1():
    """Return the `verifiers.v1` module if importable, else None."""
    try:
        import verifiers.v1 as v1  # type: ignore
        return v1
    except Exception:  # pragma: no cover - older verifiers without a v1 module
        return None


def _make_source(num: int | None = None, offset: int = 0, dataset: str | None = None,
                 dataset_split: str | None = None):
    """Build a zero-arg row generator for the v1 Taskset (lazy + import-cheap).

    Selection is resolved EXPLICIT-ARG > ENV-VAR > DEFAULT so one env serves
    both the TRAIN pool (e.g. ``num=50, offset=0`` -> HumanEval 0–49) and a
    DISJOINT held-out EVAL pool (``offset=50, num=25`` -> HumanEval 50–74)
    within a single training run, distinguished by ``[[env]].args`` alone — no
    second dataset to publish.

    Each row is a JSON-serializable task: a user-message prompt (the function
    signature + docstring), a per-task system prompt, and the gradeable fields
    (code_prompt/test/entry_point) the reward reads back off ``task``.
    ``max_turns=1`` because this is single-turn code completion.

    Args:
        num: pool size; when None it is read from ``SPEC_RL_NUM`` (default 50)
            each time the returned generator function is called.
        offset, dataset, dataset_split: forwarded unchanged to ``_task_rows``.

    Returns:
        A generator function taking no arguments; no dataset is loaded until it
        is called and iterated.
    """
    import os

    def _source():
        # Resolved at call time so a SPEC_RL_NUM set after import still applies.
        n = num if num is not None else int(os.environ.get("SPEC_RL_NUM", "50"))
        for row in _task_rows(n, offset=offset, dataset=dataset,
                              dataset_split=dataset_split):
            yield {
                "task_id": row["info"]["task_id"],
                "system_prompt": SYSTEM_PROMPT,
                "prompt": [{"role": "user", "content": row["code_prompt"]}],
                "answer": row["entry_point"],
                "code_prompt": row["code_prompt"],
                "test": row["test"],
                "entry_point": row["entry_point"],
                "max_turns": 1,
            }

    return _source


# Backward-compatible default source (env-var driven), kept so any caller that
# referenced `_spec_rl_source` directly still works.
_spec_rl_source = _make_source()


def load_taskset(config=None, *, num: int | None = None, offset: int = 0,
                 dataset: str | None = None, dataset_split: str | None = None):
    """Build the v1 Taskset: HumanEval-style code rows + the dense unit-test reward.

    The reward is a standalone ``@reward`` function (v1 passes ``(task, state)``);
    it hands the extracted completion to ``_score_completion`` and returns the
    fractional unit-test pass rate computed by the shared stdlib core.
    ``num``/``offset``/``dataset`` select the slice (see ``_make_source``).

    Raises:
        ImportError: if ``verifiers.v1`` cannot be imported.
    """
    v1 = _v1()
    if v1 is None:
        raise ImportError("verifiers.v1 is required for the v1 taskset path")

    @v1.reward(weight=1.0)
    async def code_reward(task, state) -> float:
        """Dense fractional unit-test pass rate in [0,1] — the RL training reward."""
        return _score_completion(task, _extract_completion(state))

    source = _make_source(num=num, offset=offset, dataset=dataset,
                          dataset_split=dataset_split)
    return v1.Taskset(source=source, rewards=[code_reward], config=config)


def _build_singleturn_env(num_examples: int, offset: int = 0, dataset: str | None = None,
                          dataset_split: str | None = None):
    """Classic verifiers path: a ``vf.SingleTurnEnv`` whose ``vf.Rubric`` scores
    the fractional unit-test reward.

    This is the STABLE API consumed by both ``prime eval run`` and hosted
    ``prime train`` (it matches the lab-cookbook reference env shape:
    ``SingleTurnEnv(dataset=, system_prompt=, rubric=Rubric(funcs=, weights=))``
    over a HF dataset with ``question``/``answer``/``info``/``task`` columns and
    a plain reward function). The newer ``verifiers.v1.Taskset(source=...)`` API
    is NOT used here because the hosted trainer pins a verifiers whose
    ``Taskset`` signature differs — this classic path is the common denominator
    that runs on both.

    Raises:
        ImportError: if ``verifiers`` is not installed.
    """
    if vf is None:
        # Fail with a clear message instead of an AttributeError on None below.
        raise ImportError("The 'verifiers' package is required to build the spec_rl environment.")

    from datasets import Dataset

    dataset_rows = [
        {
            "question": row["code_prompt"],
            "answer": row["entry_point"],
            "info": row["info"],
            "task": "spec-rl",
        }
        for row in _task_rows(num_examples, offset=offset, dataset=dataset,
                              dataset_split=dataset_split)
    ]
    ds = Dataset.from_list(dataset_rows)

    def code_reward(completion, info, **kwargs) -> float:
        """Dense fractional unit-test pass rate in [0,1] — the RL training reward."""
        text = completion if isinstance(completion, str) else _extract_completion(
            {"completion": completion}
        )
        return _score_completion({"info": info}, text)

    rubric = vf.Rubric(funcs=[code_reward], weights=[1.0])
    return vf.SingleTurnEnv(dataset=ds, system_prompt=SYSTEM_PROMPT, rubric=rubric)


def load_environment(config: Any = None, *, num_examples: int = 20, num: int | None = None,
                     offset: int = 0, dataset: str | None = None,
                     dataset_split: str | None = None, **kwargs):
    """Build the spec_rl environment.

    Returns the classic ``vf.SingleTurnEnv``/``vf.Rubric`` environment — the
    STABLE API consumed by BOTH ``prime eval run`` and hosted ``prime train``
    (matching the lab-cookbook reference env). The newer
    ``verifiers.v1.Taskset(source=...)`` API is intentionally not used: the
    hosted trainer pins a verifiers whose ``Taskset`` signature differs, so this
    classic path is the common denominator that runs on both surfaces. The
    reward logic (``fraction_passing``) is importable and unit-testable WITHOUT
    verifiers; the hard dependency is enforced only here.

    Dataset-slice args (passed through from ``[[env]].args`` in a train config
    or ``prime eval -a '{...}'``) select WHICH problems this env serves, so one
    pushed env covers both the train pool and a disjoint held-out eval pool in
    one run:

    * ``num`` — number of problems in the pool (default: ``SPEC_RL_NUM`` env,
      else ``num_examples``, which itself defaults to 20).
    * ``offset`` — skip the first N (e.g. ``offset=50`` for the held-out split).
    * ``dataset`` / ``dataset_split`` — override the source (HF id or local jsonl).

    ``config`` and any extra runner kwargs are accepted and ignored so the
    loader stays robust.

    Raises:
        ImportError: if ``verifiers`` is not installed.
    """
    if vf is None:
        raise ImportError(
            "The 'verifiers' package is required to build the spec_rl environment. "
            "Install it with `prime env install art87able/spec-rl` (or `pip install verifiers`). "
            "The reward logic (spec_rl.fraction_passing) is importable without it."
        )
    import os

    n = num if num is not None else int(os.environ.get("SPEC_RL_NUM", str(num_examples)))
    return _build_singleturn_env(n, offset=offset, dataset=dataset,
                                 dataset_split=dataset_split)


# ---------------------------------------------------------------------------
# Local smoke test (no verifiers, no GPU, no network): proves the reward core
# distinguishes a passing completion from a failing one. Run:
#   python spec_rl.py
# ---------------------------------------------------------------------------
def _selftest() -> None:
    """Exercise the reward core and completion normalization; raise on regression."""
    toy = {
        "prompt": "def add(a, b):\n    \"\"\"Return a + b.\"\"\"\n",
        "test": "def check(candidate):\n    assert candidate(2, 3) == 5\n    assert candidate(-1, 1) == 0\n",
        "entry_point": "add",
    }
    good = "    return a + b\n"
    bad = "    return a - b\n"
    partial = "    return a + b if a > 0 else a - b\n"  # passes 1 of 2 asserts
    loops_forever = "    while True:\n        pass\n"

    report = {
        "passing_fraction": fraction_passing(toy, good),
        "failing_fraction": fraction_passing(toy, bad),
        "partial_fraction": fraction_passing(toy, partial),
        "timeout_fraction": fraction_passing(toy, loops_forever, timeout_s=2),
        "binary_passes_good": passes(toy, good),
        "verifiers_available": vf is not None,
    }
    print(json.dumps(report, indent=2))
    assert report["passing_fraction"] == 1.0, "all asserts pass => 1.0"
    assert report["failing_fraction"] == 0.0, "no asserts pass => 0.0"
    assert report["partial_fraction"] == 0.5, "1 of 2 asserts => 0.5 (fractional)"
    assert report["timeout_fraction"] == 0.0, "timeout => 0.0"

    # Regression: the classic SingleTurnEnv path (and the hosted trainer) return
    # completions ending in a "</s>" chat tag. _score_completion must strip
    # it; otherwise it lands in the assembled program as a SyntaxError -> 0 reward
    # -> no training signal (this exact bug zeroed the first hosted-train attempt).
    info = {"code_prompt": toy["prompt"], "test": toy["test"], "entry_point": "add"}
    tagged_body = "\nreturn a + b\n</s>"  # body-only + chat tag
    tagged_echo = "\ndef add(a, b):\n    return a + b\n</s>"  # echoed sig + tag
    assert _score_completion({"info": info}, tagged_body) == 1.0, "body+tag must score 1.0"
    assert _score_completion({"info": info}, tagged_echo) == 1.0, "echoed+tag must score 1.0"

    # Regression: completion as a list of pydantic-style Message OBJECTS (not dicts).
    # This is what the prime eval/train verifiers returns; mishandling it (treating
    # only dicts) zeroed every reward in the hosted-train attempt.
    class _FakeMsg:
        def __init__(self, role, content):
            self.role, self.content = role, content

    obj_compl = [_FakeMsg("user", toy["prompt"]),
                 _FakeMsg("assistant", "\nreturn a + b\n</s>")]
    extracted = _extract_completion({"completion": obj_compl})
    assert "return a + b" in extracted and "role=" not in extracted, "must extract content, not repr"
    assert _score_completion({"info": info}, extracted) == 1.0, "object-message path must score 1.0"

    print("selftest OK (tag-strip + indent-normalize + object-message extraction)")


if __name__ == "__main__":
    _selftest()