Reinforcement Learning
Transformers
English
post-training
distillation
agentic-coding
composer-2.5
cursor
kimi-k2
grpo
dapo
diloco
openenv
trl
verl
research
methodology
Instructions to use Codeseys/composer-replication-framework with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use Codeseys/composer-replication-framework with Transformers:
```python
# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("Codeseys/composer-replication-framework", dtype="auto")
```
- Notebooks
- Google Colab
- Kaggle
| """Tests for the Stage-0 pipeline: contract, dedup, driver. | |
| Load-bearing coverage: the sentinel leak guard on write_tasks (finding D-8), | |
| holdout exclusion + budget stop + idempotency in build_corpus (D-21), and the | |
| cross-generation dedup path (D-12). | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import json | |
| from pathlib import Path | |
| import pytest | |
| from composer_replication.datagen.env import FeatureDeletionEnv | |
| from composer_replication.datagen.rollout_harness import ScriptedPolicy | |
| from composer_replication.datagen.sandbox import FakeSandbox | |
| from composer_replication.datagen.schema import FeatureDeletionTask | |
| from composer_replication.datagen.trajectory import ToolCall | |
| from composer_replication.pipeline.build_corpus import build_corpus | |
| from composer_replication.pipeline.dedup import ( | |
| dedup, | |
| find_near_duplicates, | |
| jaccard_estimate, | |
| load_signatures, | |
| minhash_signature, | |
| signatures_to_jsonl, | |
| ) | |
| from composer_replication.pipeline.s3_contract import ( | |
| RunLayout, | |
| RunManifest, | |
| write_dataset_card, | |
| write_tasks, | |
| write_tasks_full, | |
| ) | |
def _task(i: int, **over) -> FeatureDeletionTask:
    """Build the fixture task numbered *i*.

    Every task shares the same repo, image, test command and pass-to-pass
    test; only ``task_id`` and the single fail-to-pass test name depend on
    *i*. ``golden_diff`` and ``deleted_symbols`` carry sentinel values that
    the leak-guard tests search for in written output.

    Args:
        i: Index baked into ``task_id`` (zero-padded to 3 digits) and into
            the fail-to-pass test name.
        **over: Field overrides applied on top of the defaults.
    """
    defaults = {
        "task_id": f"task-{i:03d}",
        "repo": "org/repo",
        "base_commit": "abc",
        "broken_image": "img:1",
        "test_command": "pytest -q",
        "fail_to_pass": (f"t/a.py::t{i}",),
        "pass_to_pass": ("t/a.py::keep",),
        "golden_diff": "SENTINEL_NEVER_LEAK",
        "deleted_symbols": ("secret_fn",),
    }
    # Overrides win over defaults, exactly like dict.update().
    return FeatureDeletionTask(**{**defaults, **over})
| # --------------------------------------------------------------------- | |
| # RunLayout / RunManifest | |
| # --------------------------------------------------------------------- | |
def test_layout_paths_are_pure_and_namespaced():
    """Layout paths live under ``<root>/runs/<run_id>/`` for local and s3 roots."""
    local_layout = RunLayout(root="/data/corpora", run_id="run42")
    expected_local = {
        "sft_path": "/data/corpora/runs/run42/corpus_sft/rows.jsonl",
        "manifest_path": "/data/corpora/runs/run42/manifest.json",
    }
    for attr, want in expected_local.items():
        assert getattr(local_layout, attr) == want

    # A trailing slash on the root must not produce a doubled separator.
    bucket_layout = RunLayout(root="s3://bucket/prefix/", run_id="r")
    assert bucket_layout.tasks_path == "s3://bucket/prefix/runs/r/tasks/manifest.jsonl"
def test_manifest_round_trip_and_budget(tmp_path):
    """Spending up to the budget flips ``over_budget``; write/read keeps the totals."""
    layout = RunLayout(root=str(tmp_path), run_id="r1")
    manifest = RunManifest(
        run_id="r1",
        created_at="2026-06-09T00:00:00Z",
        source="test",
        budget_usd=1.0,
    )

    # 0.4 of a 1.0 budget: still under.
    manifest.spend(0.4)
    assert not manifest.over_budget

    # Cumulative 1.0 reaches the budget and counts as over.
    manifest.spend(0.6)
    assert manifest.over_budget

    # Persist, reload, and confirm both cost and budget survive.
    manifest.write(layout)
    reloaded = RunManifest.read(layout)
    assert reloaded.cost_usd == pytest.approx(1.0)
    assert reloaded.budget_usd == 1.0
| # --------------------------------------------------------------------- | |
| # THE leak guard (finding D-8) | |
| # --------------------------------------------------------------------- | |
def test_write_tasks_never_leaks_golden_diff(tmp_path):
    """``write_tasks`` output omits the golden diff and deleted symbols.

    The row keeps a ``golden_diff_sha256`` field instead, while
    ``write_tasks_full`` is expected to carry the raw diff.
    """
    layout = RunLayout(root=str(tmp_path), run_id="r1")

    write_tasks(layout, [_task(1)])
    public_text = Path(layout.tasks_path).read_text()
    for forbidden in ("SENTINEL_NEVER_LEAK", "secret_fn"):
        assert forbidden not in public_text

    first_row = json.loads(public_text.splitlines()[0])
    assert first_row["golden_diff_sha256"]  # provenance preserved as a hash

    # The restricted full writer DOES carry it (construction side only).
    write_tasks_full(layout, [_task(1)])
    full_text = Path(layout.tasks_full_path).read_text()
    assert "SENTINEL_NEVER_LEAK" in full_text
| # --------------------------------------------------------------------- | |
| # MinHash dedup | |
| # --------------------------------------------------------------------- | |
# Near-duplicate pair: the two texts are identical except for the final word.
_TEXT_A = (
    "the quick brown fox jumps over the lazy dog "
    "and then runs far away home tonight"
)
_TEXT_A2 = (
    "the quick brown fox jumps over the lazy dog "
    "and then runs far away home today"
)
# Unrelated text used as the "disjoint" counterpart in the dedup tests.
_TEXT_B = (
    "import numpy as np def main(): return np.zeros(10) "
    "print(main()) totally different content here"
)
def test_jaccard_estimate_near_duplicates_high_disjoint_low():
    """Near-duplicates score above 0.5, unrelated text below 0.2, self exactly 1.0."""
    sig_a = minhash_signature(_TEXT_A)
    sig_a2 = minhash_signature(_TEXT_A2)
    sig_b = minhash_signature(_TEXT_B)

    assert jaccard_estimate(sig_a, sig_a2) > 0.5
    assert jaccard_estimate(sig_a, sig_b) < 0.2
    assert jaccard_estimate(sig_a, sig_a) == 1.0
def test_dedup_keeps_first_and_drops_near_dup():
    """Of a near-duplicate pair the first row survives; the drop is counted once."""
    corpus = [{"text": text} for text in (_TEXT_A, _TEXT_A2, _TEXT_B)]

    survivors, report = dedup(corpus, lambda row: row["text"], threshold=0.5)

    surviving_texts = [row["text"] for row in survivors]
    assert surviving_texts == [_TEXT_A, _TEXT_B]
    assert report["dropped_within_run"] == 1
def test_cross_generation_dedup_via_signature_file():
    """Signatures serialized from an earlier run drop near-duplicates in a new run."""

    def text_of(row):
        return row["text"]

    # Write the earlier generation's signatures to an in-memory JSONL stream
    # and load them back, exercising the serialize -> load path.
    stream = io.StringIO()
    signatures_to_jsonl([{"text": _TEXT_A}], text_of, stream)
    stream.seek(0)
    earlier_sigs = load_signatures(stream)

    new_rows = [{"text": _TEXT_A2}, {"text": _TEXT_B}]
    survivors, report = dedup(
        new_rows,
        text_of,
        threshold=0.5,
        prior_signatures=earlier_sigs,
    )

    # _TEXT_A2 is a near-duplicate of the prior _TEXT_A, so only _TEXT_B remains.
    assert [row["text"] for row in survivors] == [_TEXT_B]
    assert report["dropped_cross_generation"] == 1
def test_find_near_duplicates_pairs():
    """A near-duplicate pair is reported as one ``(0, 1)`` index tuple."""
    pair = [{"t": _TEXT_A}, {"t": _TEXT_A2}]
    found = find_near_duplicates(pair, lambda row: row["t"], 0.5)
    assert found == [(0, 1)]
| # --------------------------------------------------------------------- | |
| # build_corpus end-to-end (FakeSandbox + ScriptedPolicy) | |
| # --------------------------------------------------------------------- | |
def _passing_policy():
    """Return a scripted policy that turns the fixture tests green, then stops.

    FakeSandbox's set_outcome takes explicit test names, so rather than
    tailoring the script per task we mark a superset as passing: the
    fail-to-pass names ``t/a.py::t0`` .. ``t/a.py::t19`` plus the shared
    pass-to-pass test ``t/a.py::keep``.
    """
    green = dict.fromkeys((f"t/a.py::t{i}" for i in range(20)), True)
    green["t/a.py::keep"] = True
    script = [ToolCall("set_outcome", {"outcomes": green}), "done"]
    return ScriptedPolicy(actions=script)
def _failing_policy():
    """Return a scripted policy whose only action is a plain-text give-up message."""
    script = ["gave up immediately"]
    return ScriptedPolicy(actions=script)
def _env():
    """Return a fresh env over a FakeSandbox where only ``t/a.py::keep`` passes."""
    sandbox = FakeSandbox(test_outcomes={"t/a.py::keep": True})
    return FeatureDeletionEnv(sandbox)
def test_build_corpus_end_to_end(tmp_path):
    """Run ``build_corpus`` on six passing fixture tasks and check counts and files."""
    fixture_tasks = [_task(i) for i in range(6)]
    layout = RunLayout(root=str(tmp_path), run_id="e2e")
    manifest = RunManifest(
        run_id="e2e",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
    )

    result = build_corpus(
        fixture_tasks,
        _env,
        _passing_policy,
        layout,
        manifest,
        holdout_frac=0.34,
        holdout_seed=7,
    )
    counts = result.counts

    # Holdout exclusion: holdout tasks were never rolled out.
    assert counts["tasks_holdout"] >= 1
    assert counts["rollouts"] == counts["tasks_train"]

    # Full passes are routed to SFT. The fixture tasks produce near-identical
    # messages, so post-dedup rows may collapse -- itself a realistic dedup
    # scenario -- hence ">= 1" rather than an exact count.
    assert counts["sft_rows"] >= 1
    assert counts["quarantined"] == 0

    # The SFT corpus never leaks the sentinel.
    sft_text = Path(layout.sft_path).read_text()
    assert "SENTINEL_NEVER_LEAK" not in sft_text

    # Card and holdout artifacts were written.
    for produced in (layout.card_path, layout.holdout_path):
        assert Path(produced).exists()
def test_build_corpus_quarantines_failures(tmp_path):
    """With a policy that gives up, every rollout is quarantined and none reach SFT."""
    fixture_tasks = [_task(i) for i in range(3)]
    layout = RunLayout(root=str(tmp_path), run_id="fail")
    manifest = RunManifest(
        run_id="fail",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
    )

    result = build_corpus(
        fixture_tasks,
        _env,
        _failing_policy,
        layout,
        manifest,
        holdout_frac=0.34,
        holdout_seed=7,
    )
    counts = result.counts

    assert counts["sft_rows"] == 0
    # All rollouts were quarantined, and at least one rollout happened.
    assert counts["quarantined"] == counts["rollouts"]
    assert counts["rollouts"] > 0
def test_build_corpus_budget_stop_marks_partial(tmp_path):
    """A 0.25 budget at 0.1 per rollout stops the run early with status ``partial``."""
    fixture_tasks = [_task(i) for i in range(6)]
    layout = RunLayout(root=str(tmp_path), run_id="budget")
    manifest = RunManifest(
        run_id="budget",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
        budget_usd=0.25,
    )

    result = build_corpus(
        fixture_tasks,
        _env,
        _passing_policy,
        layout,
        manifest,
        holdout_frac=0.2,
        holdout_seed=7,
        cost_per_rollout_usd=0.1,
    )

    assert result.status == "partial"
    # Not every training task was rolled out before the budget stop.
    assert result.counts["rollouts"] < result.counts["tasks_train"]
def test_build_corpus_is_write_once(tmp_path):
    """A second ``build_corpus`` into the same layout raises ``FileExistsError``."""
    fixture_tasks = [_task(i) for i in range(3)]
    layout = RunLayout(root=str(tmp_path), run_id="once")

    first_manifest = RunManifest(
        run_id="once",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
    )
    build_corpus(fixture_tasks, _env, _passing_policy, layout, first_manifest,
                 holdout_frac=0.34)

    # Same run_id and layout, later timestamp: must be refused.
    second_manifest = RunManifest(
        run_id="once",
        created_at="2026-06-09T00:00:01Z",
        source="fixture",
    )
    with pytest.raises(FileExistsError, match="write-once"):
        build_corpus(fixture_tasks, _env, _passing_policy, layout, second_manifest,
                     holdout_frac=0.34)
def test_dataset_card_contents(tmp_path):
    """The card names the run, its counts, its license tiers and a Decontamination section."""
    layout = RunLayout(root=str(tmp_path), run_id="card")
    manifest = RunManifest(
        run_id="card",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
        counts={"sft_rows": 3},
    )

    write_dataset_card(
        layout,
        manifest,
        license_tiers={"REDISTRIBUTABLE": 3},
        dedup_stats={"rows_kept": 3},
    )

    card_text = Path(layout.card_path).read_text()
    expected_fragments = (
        "run `card`",
        "sft_rows: 3",
        "REDISTRIBUTABLE: 3",
        "Decontamination",
    )
    for fragment in expected_fragments:
        assert fragment in card_text
| # --------------------------------------------------------------------- | |
| # Wave-21 adversarial-review regressions | |
| # --------------------------------------------------------------------- | |
def test_budget_is_a_hard_ceiling(tmp_path):
    """Review P1: cost must never exceed budget (pre-charge check)."""
    fixture_tasks = [_task(i) for i in range(6)]
    layout = RunLayout(root=str(tmp_path), run_id="hardcap")
    manifest = RunManifest(
        run_id="hardcap",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
        budget_usd=0.25,
    )

    result = build_corpus(
        fixture_tasks,
        _env,
        _passing_policy,
        layout,
        manifest,
        holdout_frac=0.2,
        holdout_seed=7,
        cost_per_rollout_usd=0.1,
    )

    # Recorded cost stays at or below the budget, and the run is flagged partial.
    assert result.cost_usd <= result.budget_usd
    assert result.status == "partial"
def test_run_id_path_traversal_rejected():
    """Review P2: separators / .. in run_id must be rejected at construction."""
    rejected_ids = ("../../escape", "a/b", "a\\b", "", "..")
    for candidate in rejected_ids:
        # Each bad id must raise on its own; the message must mention "path".
        with pytest.raises(ValueError, match="path"):
            RunLayout(root="/data", run_id=candidate)
def test_dedup_stats_partition_disjoint():
    """Review P2: a row that is both within-run and cross-gen dup counts once."""
    seen_before = [minhash_signature(_TEXT_A)]
    batch = [{"text": _TEXT_A}, {"text": _TEXT_A2}]

    # Only the stats matter here; the kept rows are discarded.
    _, stats = dedup(
        batch,
        lambda row: row["text"],
        threshold=0.5,
        prior_signatures=seen_before,
    )

    # The two drop counters must add up to exactly the rows that disappeared.
    dropped_by_reason = stats["dropped_within_run"] + stats["dropped_cross_generation"]
    assert dropped_by_reason == stats["rows_in"] - stats["rows_kept"]