| |
| """ |
| spec_rl.py — a small `verifiers` environment for the combined hackathon thesis: |
| "lossless DFlash speculative decoding makes RL post-training cheaper." |
| |
| The environment is a HumanEval-style code-completion task. The policy model |
| (Laguna XS.2) is prompted with a function signature + docstring and must emit |
| the function body. The reward executes the candidate completion against the |
problem's unit tests in a SUBPROCESS WITH A TIMEOUT and returns the
FRACTION of the problem's unit-test assertions that pass (a dense RL signal in
| [0,1]); the pass@1 eval stays binary (evals/humaneval_subset.py). Reward is the |
| dense learning signal; the eval is the binary scoreboard. |
| |
| Why this exists for the hackathon |
| --------------------------------- |
| verifiers runs RL rollouts against an OpenAI-compatible endpoint declared in |
| `./configs/endpoints.toml`. Point that endpoint at the DFlash-speculated vLLM |
| server and the *same* reward curve is produced at higher rollout throughput, |
| because speculative decoding is lossless under greedy decoding (the drafted |
| tokens are verified by the target model, so accepted text is token-identical to |
| the no-speculator baseline). The reward signal does not change; only the cost |
| per rollout drops. That is the "cheaper RL" claim, made measurable. |
| |
| Local-dev note (Apple Silicon, no CUDA): this module is import-safe even when |
| `verifiers` is not installed. `import verifiers as vf` is guarded; a clear |
| ImportError is raised only when `load_environment()` is actually called. The |
| reward's code-execution + pass/fail logic is plain stdlib and is unit-testable |
| without verifiers or a GPU. |
| |
| SAFETY: this executes model-generated code to grade it. Each candidate runs in a |
| short-lived subprocess with a wall-clock timeout, isolated from this process. |
| Run RL rollouts only in the disposable venue sandbox, never against real data. |
| """ |
| from __future__ import annotations |
|
|
| import ast |
| import json |
| import subprocess |
| import sys |
| import tempfile |
| from pathlib import Path |
| from typing import Any |
|
|
| |
| |
| |
| |
| |
# `verifiers` is optional at import time so the stdlib reward core below stays
# importable and unit-testable without it; load_environment() raises a clear
# ImportError when vf is None.
try:
    import verifiers as vf
except ImportError:
    vf = None


# Wall-clock limit (seconds) for one candidate program's subprocess run; this is
# the default `timeout_s` of passes() and fraction_passing().
EXEC_TIMEOUT_S = 8


# Stop sequences for body-only (text-completion style) output: _score_completion()
# truncates the completion at each of these (a new top-level class/def, a
# column-0 comment, or a `__main__` guard), since that text is outside the body.
STOP = ["\nclass ", "\ndef ", "\n#", "\nif __name__"]
|
|
|
|
| |
| |
| |
| |
| |
| |
def load_problems(num_examples: int, offset: int = 0,
                  dataset: str | None = None,
                  dataset_split: str | None = None) -> list[dict[str, Any]]:
    """Return `num_examples` code problems (from `offset`) as {prompt, test, entry_point}.

    Default source is the canonical HumanEval test split (same as
    evals/humaneval_subset.py). Selection is resolved in precedence order
    EXPLICIT-ARG > ENV-VAR > DEFAULT, so the same env serves three callers
    cleanly:

    * ``dataset`` arg / ``SPEC_RL_DATASET`` env — a local ``.jsonl`` path (one
      problem per line, UTF-8) OR a Hugging Face dataset id. This is the
      drop-in seam for an Adaption-curated / exported dataset: as long as each
      row carries ``{prompt, test, entry_point}`` it runs unchanged, so a
      richer code taskset swaps in with one arg and no code change.
    * ``offset`` arg / ``SPEC_RL_OFFSET`` env — skip the first N problems. This
      is how a DISJOINT held-out split (e.g. HumanEval 50–74 for eval) is
      carved out of the same canonical source the train pool (0–49) draws from,
      with no second dataset to publish — train and eval differ by args alone.
      Note: an explicit ``offset=0`` cannot be told apart from the default, so
      only a NON-zero ``offset`` argument overrides ``SPEC_RL_OFFSET``.
    * ``HUMANEVAL_DATASET`` env — override just the default HF repo id if the
      venue image pins a mirror. ``dataset_split`` arg / ``SPEC_RL_DATASET_SPLIT``
      env overrides the split (default ``test``); it is ignored for ``.jsonl``.

    With no args and no env vars set the behaviour is identical to before
    (first ``num_examples`` of HumanEval test).

    Raises:
        FileNotFoundError: the source names a ``.jsonl`` file that does not exist
            (instead of falling through to an opaque Hugging Face hub lookup).
    """
    import os

    src = dataset or os.environ.get("SPEC_RL_DATASET")
    if offset == 0:
        offset = int(os.environ.get("SPEC_RL_OFFSET", "0"))

    if src and src.endswith(".jsonl"):
        if not os.path.isfile(src):
            raise FileNotFoundError(f"spec_rl dataset file not found: {src!r}")
        # JSONL is UTF-8 by definition; don't depend on the platform locale.
        with open(src, encoding="utf-8") as f:
            rows = [json.loads(line) for line in f if line.strip()]
        return rows[offset:offset + num_examples]

    # Imported lazily so the .jsonl path (and this module) work without `datasets`.
    from datasets import load_dataset

    dataset_id = src or os.environ.get("HUMANEVAL_DATASET", "openai/openai_humaneval")
    split = dataset_split or os.environ.get("SPEC_RL_DATASET_SPLIT", "test")
    ds = load_dataset(dataset_id, split=split)
    hi = min(offset + num_examples, len(ds))
    return [dict(ds[i]) for i in range(offset, hi)]
|
|
|
|
| |
| |
| |
| |
| |
| def _build_program(problem: dict[str, Any], completion: str) -> str: |
| """Assemble the runnable program: signature+docstring + body + tests.""" |
| return ( |
| problem["prompt"] |
| + completion |
| + "\n" |
| + problem["test"] |
| + f"\ncheck({problem['entry_point']})\n" |
| ) |
|
|
|
|
def passes(problem: dict[str, Any], completion: str, timeout_s: int = EXEC_TIMEOUT_S) -> bool:
    """True iff `completion` makes the problem's unit tests pass.

    Runs the assembled program in a separate `python` subprocess so a hang,
    crash, or `sys.exit` in model-generated code cannot take down the rollout
    worker. A non-zero exit code, a raised exception, or a timeout all count as
    a failure (reward 0.0).

    The child gets no stdin, so an ``input()`` call fails fast instead of blocking
    until the timeout. Its stdout/stderr are discarded, because only the exit code
    is used.
    """
    program = _build_program(problem, completion)
    with tempfile.TemporaryDirectory() as tmp:
        prog_path = Path(tmp) / "candidate.py"
        # Python reads source files as UTF-8; write it that way regardless of the
        # locale so non-ASCII docstrings don't turn into a SyntaxError.
        prog_path.write_text(program, encoding="utf-8")
        try:
            result = subprocess.run(
                [sys.executable, str(prog_path)],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=timeout_s,
                cwd=tmp,
            )
        except subprocess.TimeoutExpired:
            return False
    return result.returncode == 0
|
|
|
|
| class _AssertCounter(ast.NodeTransformer): |
| """Rewrite each ``assert`` so a failure is COUNTED, not fatal. |
| |
| ``assert <test>`` becomes, roughly:: |
| |
| try: __ok = bool(<test>) |
| except BaseException: __ok = False |
| __tally['total'] += 1 |
| if __ok: __tally['passed'] += 1 |
| |
| So every assertion that executes (including inside a ``for`` loop over many |
| input/output pairs) contributes one test to the denominator, and the |
| numerator is how many held — turning HumanEval's single all-or-nothing |
| ``check()`` into a fractional pass rate. |
| """ |
|
|
| def visit_Assert(self, node: ast.Assert): |
| try_node = ast.Try( |
| body=[ast.Assign( |
| targets=[ast.Name(id="__ok", ctx=ast.Store())], |
| value=ast.Call(func=ast.Name(id="bool", ctx=ast.Load()), |
| args=[node.test], keywords=[]), |
| )], |
| handlers=[ast.ExceptHandler( |
| type=ast.Name(id="BaseException", ctx=ast.Load()), |
| name=None, |
| body=[ast.Assign( |
| targets=[ast.Name(id="__ok", ctx=ast.Store())], |
| value=ast.Constant(value=False))], |
| )], |
| orelse=[], finalbody=[], |
| ) |
| incr_total = ast.parse("__tally['total'] += 1").body[0] |
| incr_pass = ast.parse("if __ok:\n __tally['passed'] += 1").body[0] |
| out = [try_node, incr_total, incr_pass] |
| for n in out: |
| ast.copy_location(n, node) |
| ast.fix_missing_locations(n) |
| return out |
|
|
|
|
def fraction_passing(problem: dict[str, Any], completion: str,
                     timeout_s: int = EXEC_TIMEOUT_S) -> float:
    """Fraction of the problem's unit-test assertions the completion passes.

    Returns a value in [0.0, 1.0]: 1.0 = all assertions pass, 0.5 = half, 0.0 =
    none (or the code didn't even run, or it timed out). This is the dense RL
    TRAINING reward; the reported pass@1 EVAL stays binary
    (evals/humaneval_subset.py). Reward is the learning signal, eval is the
    scoreboard — a dense reward avoids GRPO's all-zero-group advantage collapse
    on hard prompts (every rollout failing a hard problem otherwise yields a
    zero-variance group with no gradient).

    Mechanism: instrument the test's ``assert``s (via _AssertCounter) so each is
    counted instead of aborting on the first failure, run the assembled program
    in a timed subprocess, and read back passed/total. Falls back to the binary
    ``passes()`` result if the test can't be parsed or unparsed. If no assertion
    executes, the score is 1.0 on a clean exit and 0.0 otherwise.

    The tally is reported after a per-run random marker, and the LAST occurrence
    in stdout is parsed. Candidate code therefore can't spoof the result by
    printing a fixed marker, and output it leaves without a trailing newline
    can't hide the real report. This is best-effort hardening, not a sandbox:
    the candidate runs in the same interpreter as the tally.
    """
    import secrets

    try:
        tree = ast.parse(problem["test"])
    except SyntaxError:
        return 1.0 if passes(problem, completion, timeout_s) else 0.0
    tree = _AssertCounter().visit(tree)
    ast.fix_missing_locations(tree)
    try:
        instrumented_test = ast.unparse(tree)
    except Exception:
        return 1.0 if passes(problem, completion, timeout_s) else 0.0

    marker = f"__FRAC_{secrets.token_hex(8)}__"
    # Leading newline so the report starts a fresh line even if the candidate
    # printed without one.
    report_prefix = repr("\n" + marker)
    program = (
        "__tally = {'passed': 0, 'total': 0}\n"
        + problem["prompt"] + completion + "\n"
        + instrumented_test + "\n"
        + "try:\n"
        + f"    check({problem['entry_point']})\n"
        + "except BaseException:\n"
        + "    pass\n"
        + "import json as __json\n"
        + f"print({report_prefix} + __json.dumps(__tally))\n"
    )
    with tempfile.TemporaryDirectory() as tmp:
        prog_path = Path(tmp) / "candidate.py"
        # Source files are read as UTF-8; don't depend on the locale.
        prog_path.write_text(program, encoding="utf-8")
        try:
            result = subprocess.run(
                [sys.executable, str(prog_path)],
                stdin=subprocess.DEVNULL,   # input() fails fast instead of hanging
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                encoding="utf-8",
                errors="replace",           # undecodable candidate output must not raise
                timeout=timeout_s,
                cwd=tmp,
            )
        except subprocess.TimeoutExpired:
            return 0.0

    stdout = result.stdout or ""
    pos = stdout.rfind(marker)
    if pos == -1:
        return 0.0
    payload = stdout[pos + len(marker):].split("\n", 1)[0]
    try:
        tally = json.loads(payload)
        total = int(tally.get("total", 0))
        passed = int(tally.get("passed", 0))
    except Exception:
        return 0.0
    if total == 0:
        return 1.0 if result.returncode == 0 else 0.0
    return max(0.0, min(1.0, passed / total))
|
|
|
|
| def _msg_role(message: Any) -> Any: |
| """Role of a chat message in either shape: dict ``["role"]`` or object ``.role``.""" |
| if isinstance(message, dict): |
| return message.get("role") |
| return getattr(message, "role", None) |
|
|
|
|
| def _msg_content(message: Any) -> str: |
| """Content of a chat message in either shape: dict ``["content"]`` or object ``.content``.""" |
| if isinstance(message, dict): |
| return str(message.get("content") or "") |
| return str(getattr(message, "content", "") or "") |
|
|
|
|
def _extract_completion(state: Any) -> str:
    """Pull the assistant's text out of a verifiers rollout state.

    ``state`` may be a dict, anything with ``.get``, or an object with a
    ``.completion`` attribute. The completion itself is tolerated as:

    * a plain string (returned unchanged);
    * a list/tuple of messages, either dicts or pydantic ``Message`` OBJECTS with
      ``.role``/``.content``. The last ``assistant`` message wins; without one,
      the last message is used.
    * a single message (dict or object with ``.content``).

    The object shape is what the verifiers version behind ``prime eval``/``prime
    train`` returns. Handling ONLY the dict shape (the original bug) made
    ``str(message)`` fall back to the object's repr ``"role='assistant'
    content='...'"``, which is unparseable as code -> a spurious 0 reward across
    every rollout -> no training signal.

    Returns ``""`` when no completion can be found.
    """
    completion = None
    if isinstance(state, dict):
        completion = state.get("completion")
    elif hasattr(state, "get"):
        try:
            completion = state.get("completion")
        except Exception:
            completion = None
    if completion is None:
        completion = getattr(state, "completion", None)

    if isinstance(completion, str):
        return completion
    if isinstance(completion, (list, tuple)):
        for message in reversed(completion):
            if _msg_role(message) == "assistant":
                return _msg_content(message)
        # No assistant-tagged message: fall back to the final one, if any.
        return _msg_content(completion[-1]) if completion else ""
    if isinstance(completion, dict) or hasattr(completion, "content"):
        # A single message rather than a list of them; indexing it would crash.
        return _msg_content(completion)
    return ""
|
|
|
|
| |
| |
| |
| |
| |
# System prompt used by both environment paths (the classic SingleTurnEnv and the
# v1 row source). It asks for a bare indented body, but _score_completion() also
# scores completions that echo the full `def` signature.
SYSTEM_PROMPT = (
    "You are an expert Python programmer. You will be given a function "
    "signature and docstring. Complete the function body only. Do not repeat "
    "the signature, do not add explanations, and do not wrap the code in "
    "markdown fences. Output only the indented function body."
)
|
|
|
|
| def _problem_from(row: Any) -> dict[str, Any]: |
| """Rebuild the gradeable problem from a task/info row (never the model output).""" |
| src = row.get("info") if hasattr(row, "get") and row.get("info") else row |
| return { |
| "prompt": src["code_prompt"], |
| "test": src["test"], |
| "entry_point": src["entry_point"], |
| } |
|
|
|
|
def _score_completion(row: Any, completion_text: str) -> float:
    """Shared reward body: return the fractional unit-test pass rate.

    First normalizes the text:
    * removes markdown fences, including a ``python``/``python3``/``py`` tag;
    * drops ``<assistant>`` and cuts at the first end-of-turn tag.

    Then handles BOTH completion shapes:
    * text-completion style — the model emits only the indented function body;
      we append it to the prompt's signature (trimming at the first STOP, and
      re-indenting if the first code line lost its indentation).
    * chat style — the model echoes the full ``def <entry>(...):`` signature +
      docstring + body. Here, appending to the prompt would double-define the
      function, and trimming at the ``"\\ndef "`` STOP would (since the echoed
      completion *starts* with ``\\ndef``) chop it to nothing — the cause of the
      all-zero reward. So we detect the echoed signature, keep only the prompt's
      import/preamble, and score the completion's own function source.

    Echo detection matches ``def <entry>(`` with the EXACT name. This keeps a
    helper like ``def <entry>_helper(`` (for example one nested inside a
    body-only completion) from being mistaken for an echoed signature.
    """
    import re
    import textwrap

    problem = _problem_from(row)
    entry = problem["entry_point"]
    text = completion_text or ""

    # Strip fences together with a language tag so e.g. "```py" doesn't leave a
    # stray "py" line of code behind.
    text = re.sub(r"```(?:python3|python|py)?", "", text, flags=re.IGNORECASE)
    text = text.replace("<assistant>", "")
    # Cut at the first leaked end-of-turn / chat-template token.
    for tag in ("</assistant>", "<|im_end|>", "<|endoftext|>", "<|eot_id|>", "</s>", "<|end|>"):
        k = text.find(tag)
        if k != -1:
            text = text[:k]

    def_entry = re.compile(rf"\bdef\s+{re.escape(entry)}\s*\(")
    echoed = def_entry.search(text)
    if echoed:
        prompt = problem["prompt"]
        in_prompt = def_entry.search(prompt)
        # Keep only what precedes the prompt's own signature (imports, helpers).
        preamble = prompt[:in_prompt.start()] if in_prompt else prompt
        func_src = text[echoed.start():]
        # "\ndef " is deliberately not a tail here: later top-level defs are kept.
        for tail in ("\n</", "\nif __name__", "\n#", "\nclass "):
            j = func_src.find(tail)
            if j != -1:
                func_src = func_src[:j]
        scoring_problem = {"prompt": preamble, "test": problem["test"], "entry_point": entry}
        return fraction_passing(scoring_problem, func_src)

    for stop in STOP:
        idx = text.find(stop)
        if idx != -1:
            text = text[:idx]

    # A body whose first code line sits at column 0 would end the function right
    # after its docstring; indent the whole body (relative indentation is kept).
    first_code = next((ln for ln in text.split("\n") if ln.strip()), "")
    if first_code and not first_code[0].isspace():
        text = textwrap.indent(text, "    ")
    return fraction_passing(problem, text)
|
|
|
|
def _task_rows(num_examples: int, offset: int = 0,
               dataset: str | None = None,
               dataset_split: str | None = None) -> list[dict[str, Any]]:
    """HumanEval-style rows carrying every field the reward needs.

    The fields are stored both nested under ``info`` AND flattened, so both
    verifiers API shapes can read them. Rows without a ``task_id`` get
    ``example_<absolute index>``, so a held-out pool (offset > 0) never reuses the
    train pool's ids.
    """
    import os

    # Resolve the offset the same way load_problems() does (a zero argument defers
    # to SPEC_RL_OFFSET), so fallback ids are absolute dataset positions.
    if offset == 0:
        offset = int(os.environ.get("SPEC_RL_OFFSET", "0"))

    problems = load_problems(num_examples, offset=offset,
                             dataset=dataset, dataset_split=dataset_split)
    rows: list[dict[str, Any]] = []
    for i, prob in enumerate(problems, start=offset):
        info = {
            "task_id": prob.get("task_id", f"example_{i}"),
            "code_prompt": prob["prompt"],
            "test": prob["test"],
            "entry_point": prob["entry_point"],
        }
        rows.append({"prompt": prob["prompt"], "answer": prob["entry_point"],
                     "info": info, **info})
    return rows
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| def _v1(): |
| """Return the `verifiers.v1` module if importable, else None.""" |
| try: |
| import verifiers.v1 as v1 |
| return v1 |
| except Exception: |
| return None |
|
|
|
|
| def _make_source(num: int | None = None, offset: int = 0, |
| dataset: str | None = None, dataset_split: str | None = None): |
| """Build a zero-arg row generator for the v1 Taskset (lazy + import-cheap). |
| |
| Selection is resolved EXPLICIT-ARG > ENV-VAR > DEFAULT so one env serves both |
| the TRAIN pool (e.g. ``num=50, offset=0`` -> HumanEval 0–49) and a DISJOINT |
| held-out EVAL pool (``offset=50, num=25`` -> HumanEval 50–74) within a single |
| training run, distinguished by ``[[env]].args`` alone — no second dataset to |
| publish. Each row is a JSON-serializable task: a user-message prompt (the |
| function signature + docstring), a per-task system prompt, and the gradeable |
| fields (code_prompt/test/entry_point) the reward reads back off ``task``. |
| ``max_turns=1`` because this is single-turn code completion. |
| """ |
| import os |
|
|
| def _source(): |
| n = num if num is not None else int(os.environ.get("SPEC_RL_NUM", "50")) |
| for row in _task_rows(n, offset=offset, dataset=dataset, |
| dataset_split=dataset_split): |
| yield { |
| "task_id": row["info"]["task_id"], |
| "system_prompt": SYSTEM_PROMPT, |
| "prompt": [{"role": "user", "content": row["code_prompt"]}], |
| "answer": row["entry_point"], |
| "code_prompt": row["code_prompt"], |
| "test": row["test"], |
| "entry_point": row["entry_point"], |
| "max_turns": 1, |
| } |
| return _source |
|
|
|
|
| |
| |
# Default v1 row source: default slice args; SPEC_RL_NUM (default 50) and the
# loader's env vars apply when it is iterated. _make_source() only returns a
# generator function, so nothing is loaded at import time.
# NOTE(review): not referenced elsewhere in this file — presumably kept as an
# importable default for external callers; confirm before removing.
_spec_rl_source = _make_source()
|
|
|
|
def load_taskset(config=None, *, num: int | None = None, offset: int = 0,
                 dataset: str | None = None, dataset_split: str | None = None):
    """Build the v1 Taskset: HumanEval-style code rows + the dense unit-test reward.

    The reward is a standalone ``@reward`` function (v1 passes ``(task, state)``).
    It extracts the completion text, then normalizes it: fences, chat tags, and
    an echoed signature vs. a body-only answer. It returns the fractional
    unit-test pass rate computed by the shared stdlib core. Grading spawns a
    subprocess and can block for up to the execution timeout, so it runs in a
    worker thread rather than on the event loop. ``num``/``offset``/``dataset``/
    ``dataset_split`` select the slice (see ``_make_source``).

    Raises:
        ImportError: ``verifiers.v1`` is not importable.
    """
    import asyncio

    v1 = _v1()
    if v1 is None:
        raise ImportError("verifiers.v1 is required for the v1 taskset path")

    @v1.reward(weight=1.0)
    async def code_reward(task, state) -> float:
        """Dense fractional unit-test pass rate in [0,1] — the RL training reward."""
        completion_text = _extract_completion(state)
        # Off-load the blocking subprocess grader so concurrent rollouts sharing
        # this event loop are not serialized behind it.
        return await asyncio.to_thread(_score_completion, task, completion_text)

    source = _make_source(num=num, offset=offset, dataset=dataset,
                          dataset_split=dataset_split)
    return v1.Taskset(source=source, rewards=[code_reward], config=config)
|
|
|
|
def _build_singleturn_env(num_examples: int, offset: int = 0,
                          dataset: str | None = None,
                          dataset_split: str | None = None):
    """Build the classic ``vf.SingleTurnEnv`` scored by a ``vf.Rubric`` whose single
    reward is the fractional unit-test pass rate.

    This is the STABLE shape consumed by both ``prime eval run`` and hosted
    ``prime train``. It follows the lab-cookbook reference env:
    ``SingleTurnEnv(dataset=, system_prompt=, rubric=Rubric(funcs=, weights=))``
    over a HF dataset with ``question``/``answer``/``info``/``task`` columns and a
    plain reward function.

    The newer ``verifiers.v1.Taskset(source=...)`` API is deliberately avoided
    here: the hosted trainer pins a verifiers release whose ``Taskset`` signature
    differs, so this path is the common denominator for both surfaces.
    """
    from datasets import Dataset

    task_rows = _task_rows(num_examples, offset=offset, dataset=dataset,
                           dataset_split=dataset_split)
    ds = Dataset.from_list([
        {
            "question": r["code_prompt"],
            "answer": r["entry_point"],
            "info": r["info"],
            "task": "spec-rl",
        }
        for r in task_rows
    ])

    def code_reward(completion, info, **kwargs) -> float:
        """Dense fractional unit-test pass rate in [0,1] — the RL training reward."""
        # _extract_completion passes a plain-string completion through unchanged
        # and pulls the assistant text out of a message list.
        completion_text = _extract_completion({"completion": completion})
        return _score_completion({"info": info}, completion_text)

    rubric = vf.Rubric(funcs=[code_reward], weights=[1.0])
    return vf.SingleTurnEnv(dataset=ds, system_prompt=SYSTEM_PROMPT, rubric=rubric)
|
|
|
|
def load_environment(config: Any = None, *, num_examples: int | None = None,
                     num: int | None = None, offset: int = 0,
                     dataset: str | None = None, dataset_split: str | None = None,
                     **kwargs):
    """Build the spec_rl environment.

    Returns the classic ``vf.SingleTurnEnv``/``vf.Rubric`` environment — the STABLE
    API consumed by BOTH ``prime eval run`` and hosted ``prime train`` (matching the
    lab-cookbook reference env). The newer ``verifiers.v1.Taskset(source=...)`` API is
    intentionally not used: the hosted trainer pins a verifiers whose ``Taskset``
    signature differs, so this classic path is the common denominator that runs on
    both surfaces. The reward logic (``fraction_passing``) is importable and
    unit-testable WITHOUT verifiers; the hard dependency is enforced only here.

    Dataset-slice args (passed through from ``[[env]].args`` in a train config or
    ``prime eval -a '{...}'``) select WHICH problems this env serves, so one pushed
    env covers both the train pool and a disjoint held-out eval pool in one run:

    * ``num`` / ``num_examples`` — number of problems in the pool. Resolved as
      ``num`` > ``num_examples`` > ``SPEC_RL_NUM`` env > 20, so an explicitly
      passed size is never silently overridden by the environment variable.
    * ``offset`` — skip the first N (e.g. ``offset=50`` for the held-out split).
    * ``dataset`` / ``dataset_split`` — override the source (HF id or local jsonl).

    ``config`` and extra runner kwargs are accepted and ignored so the loader
    stays robust.

    Raises:
        ImportError: the ``verifiers`` package is not installed.
    """
    if vf is None:
        raise ImportError(
            "The 'verifiers' package is required to build the spec_rl environment. "
            "Install it with `prime env install art87able/spec-rl` (or `pip install verifiers`). "
            "The reward logic (spec_rl.fraction_passing) is importable without it."
        )
    import os

    if num is not None:
        n = num
    elif num_examples is not None:
        n = num_examples
    else:
        n = int(os.environ.get("SPEC_RL_NUM", "20"))
    return _build_singleturn_env(n, offset=offset, dataset=dataset,
                                 dataset_split=dataset_split)
|
|
|
|
| |
| |
| |
| |
| |
def _selftest() -> None:
    """Exercise the stdlib reward core on a toy ``add`` problem and print a report.

    It checks the fractional reward for four completions:
    * all assertions passing;
    * none passing;
    * half passing;
    * a timeout.

    It then checks completion normalization in _score_completion (a leaked
    ``</assistant>`` tag, an unindented body, an echoed signature) and text
    extraction from attribute-style message objects. Any mismatch raises
    AssertionError. Several short Python subprocesses are spawned; one is
    expected to hit its 2-second timeout.
    """
    toy = {
        "prompt": "def add(a, b):\n    \"\"\"Return a + b.\"\"\"\n",
        "test": "def check(candidate):\n    assert candidate(2, 3) == 5\n    assert candidate(-1, 1) == 0\n",
        "entry_point": "add",
    }
    good_body = "    return a + b\n"
    report = {
        "passing_fraction": fraction_passing(toy, good_body),
        "failing_fraction": fraction_passing(toy, "    return a - b\n"),
        "partial_fraction": fraction_passing(toy, "    return a + b if a > 0 else a - b\n"),
        "timeout_fraction": fraction_passing(toy, "    while True:\n        pass\n", timeout_s=2),
        "binary_passes_good": passes(toy, good_body),
        "verifiers_available": vf is not None,
    }
    print(json.dumps(report, indent=2))
    expectations = (
        ("passing_fraction", 1.0, "all asserts pass => 1.0"),
        ("failing_fraction", 0.0, "no asserts pass => 0.0"),
        ("partial_fraction", 0.5, "1 of 2 asserts => 0.5 (fractional)"),
        ("timeout_fraction", 0.0, "timeout => 0.0"),
    )
    for key, expected, why in expectations:
        assert report[key] == expected, why

    # Normalization: a leaked chat tag is cut, an unindented body re-indented, and
    # an echoed signature scored as the completion's own function source.
    row = {"info": {"code_prompt": toy["prompt"], "test": toy["test"], "entry_point": "add"}}
    assert _score_completion(row, "\nreturn a + b\n</assistant>") == 1.0, "body+tag must score 1.0"
    assert _score_completion(row, "\ndef add(a, b):\n    return a + b\n</assistant>") == 1.0, "echoed+tag must score 1.0"

    # Attribute-style message objects must yield their content, not their repr.
    class _Msg:
        def __init__(self, role, content):
            self.role = role
            self.content = content

    transcript = [_Msg("user", toy["prompt"]), _Msg("assistant", "\nreturn a + b\n</assistant>")]
    extracted = _extract_completion({"completion": transcript})
    assert "return a + b" in extracted and "role=" not in extracted, "must extract content, not repr"
    assert _score_completion(row, extracted) == 1.0, "object-message path must score 1.0"
    print("selftest OK (tag-strip + indent-normalize + object-message extraction)")
|
|
|
|
# Running this file directly executes the self-test. It needs neither verifiers nor
# a GPU, but it does spawn short-lived Python subprocesses.
if __name__ == "__main__":
    _selftest()
|
|