"""Tests for the Stage-0 pipeline: contract, dedup, driver.

Load-bearing coverage: the sentinel leak guard on ``write_tasks``
(finding D-8), holdout exclusion + budget stop + idempotency in
``build_corpus`` (D-21), and the cross-generation dedup path (D-12).
"""

from __future__ import annotations

import io
import json
from pathlib import Path

import pytest

from composer_replication.datagen.env import FeatureDeletionEnv
from composer_replication.datagen.rollout_harness import ScriptedPolicy
from composer_replication.datagen.sandbox import FakeSandbox
from composer_replication.datagen.schema import FeatureDeletionTask
from composer_replication.datagen.trajectory import ToolCall
from composer_replication.pipeline.build_corpus import build_corpus
from composer_replication.pipeline.dedup import (
    dedup,
    find_near_duplicates,
    jaccard_estimate,
    load_signatures,
    minhash_signature,
    signatures_to_jsonl,
)
from composer_replication.pipeline.s3_contract import (
    RunLayout,
    RunManifest,
    write_dataset_card,
    write_tasks,
    write_tasks_full,
)

# Fixed timestamp for fixture manifests; the value itself is arbitrary.
_CREATED_AT = "2026-06-09T00:00:00Z"

# Golden-diff payload and deleted symbol: neither may ever appear in an
# agent-visible artifact (tasks manifest, SFT corpus).
_SENTINEL = "SENTINEL_NEVER_LEAK"
_SECRET_SYMBOL = "secret_fn"

# Upper bound on fixture task indices that _passing_policy can flip green.
# Every test in this module builds fewer tasks than this.
_MAX_FIXTURE_TASKS = 20


def _task(i: int, **over: object) -> FeatureDeletionTask:
    """Build fixture task ``i``; keyword arguments override default fields.

    Each task has a unique fail-to-pass test ``t/a.py::t{i}`` and the shared
    pass-to-pass test ``t/a.py::keep``. ``golden_diff`` and
    ``deleted_symbols`` carry the sentinels the leak-guard tests search for.
    """
    base = dict(
        task_id=f"task-{i:03d}",
        repo="org/repo",
        base_commit="abc",
        broken_image="img:1",
        test_command="pytest -q",
        fail_to_pass=(f"t/a.py::t{i}",),
        pass_to_pass=("t/a.py::keep",),
        golden_diff=_SENTINEL,
        deleted_symbols=(_SECRET_SYMBOL,),
    )
    base.update(over)
    return FeatureDeletionTask(**base)


def _manifest(run_id: str, **over: object) -> RunManifest:
    """Build a fixture ``RunManifest`` for ``run_id``; kwargs override defaults."""
    fields: dict[str, object] = {
        "run_id": run_id,
        "created_at": _CREATED_AT,
        "source": "fixture",
    }
    fields.update(over)
    return RunManifest(**fields)


# ---------------------------------------------------------------------
# RunLayout / RunManifest
# ---------------------------------------------------------------------


def test_layout_paths_are_pure_and_namespaced():
    lay = RunLayout(root="/data/corpora", run_id="run42")
    assert lay.sft_path == "/data/corpora/runs/run42/corpus_sft/rows.jsonl"
    assert lay.manifest_path == "/data/corpora/runs/run42/manifest.json"
    s3 = RunLayout(root="s3://bucket/prefix/", run_id="r")
    assert s3.tasks_path == "s3://bucket/prefix/runs/r/tasks/manifest.jsonl"


def test_manifest_round_trip_and_budget(tmp_path):
    lay = RunLayout(root=str(tmp_path), run_id="r1")
    m = _manifest("r1", source="test", budget_usd=1.0)
    m.spend(0.4)
    assert not m.over_budget
    m.spend(0.6)
    assert m.over_budget
    m.write(lay)
    m2 = RunManifest.read(lay)
    assert m2.cost_usd == pytest.approx(1.0)
    assert m2.budget_usd == 1.0


# ---------------------------------------------------------------------
# THE leak guard (finding D-8)
# ---------------------------------------------------------------------


def test_write_tasks_never_leaks_golden_diff(tmp_path):
    lay = RunLayout(root=str(tmp_path), run_id="r1")
    write_tasks(lay, [_task(1)])
    blob = Path(lay.tasks_path).read_text(encoding="utf-8")
    assert _SENTINEL not in blob
    assert _SECRET_SYMBOL not in blob
    row = json.loads(blob.splitlines()[0])
    assert row["golden_diff_sha256"]  # provenance preserved as a hash
    # The restricted full writer DOES carry it (construction side only).
    write_tasks_full(lay, [_task(1)])
    assert _SENTINEL in Path(lay.tasks_full_path).read_text(encoding="utf-8")


# ---------------------------------------------------------------------
# MinHash dedup
# ---------------------------------------------------------------------

_TEXT_A = "the quick brown fox jumps over the lazy dog and then runs far away home tonight"
_TEXT_A2 = "the quick brown fox jumps over the lazy dog and then runs far away home today"
_TEXT_B = "import numpy as np def main(): return np.zeros(10) print(main()) totally different content here"


def test_jaccard_estimate_near_duplicates_high_disjoint_low():
    sa, sa2, sb = (minhash_signature(t) for t in (_TEXT_A, _TEXT_A2, _TEXT_B))
    assert jaccard_estimate(sa, sa2) > 0.5
    assert jaccard_estimate(sa, sb) < 0.2
    assert jaccard_estimate(sa, sa) == 1.0


def test_dedup_keeps_first_and_drops_near_dup():
    rows = [{"text": _TEXT_A}, {"text": _TEXT_A2}, {"text": _TEXT_B}]
    kept, stats = dedup(rows, lambda r: r["text"], threshold=0.5)
    assert [r["text"] for r in kept] == [_TEXT_A, _TEXT_B]
    assert stats["dropped_within_run"] == 1


def test_cross_generation_dedup_via_signature_file():
    prior_rows = [{"text": _TEXT_A}]
    buf = io.StringIO()
    signatures_to_jsonl(prior_rows, lambda r: r["text"], buf)
    buf.seek(0)
    prior_sigs = load_signatures(buf)
    rows = [{"text": _TEXT_A2}, {"text": _TEXT_B}]
    kept, stats = dedup(
        rows, lambda r: r["text"], threshold=0.5, prior_signatures=prior_sigs
    )
    assert [r["text"] for r in kept] == [_TEXT_B]
    assert stats["dropped_cross_generation"] == 1


def test_find_near_duplicates_pairs():
    rows = [{"t": _TEXT_A}, {"t": _TEXT_A2}]
    assert find_near_duplicates(rows, lambda r: r["t"], 0.5) == [(0, 1)]


# ---------------------------------------------------------------------
# build_corpus end-to-end (FakeSandbox + ScriptedPolicy)
# ---------------------------------------------------------------------


def _passing_policy():
    """Return a scripted policy that marks every fixture test as passing.

    FakeSandbox's ``set_outcome`` tool needs explicit test names, and one
    policy factory serves every task. So we mark a superset passing: each
    possible fail-to-pass name ``t/a.py::t{i}`` (see ``_task``) up to
    ``_MAX_FIXTURE_TASKS``, plus the shared pass-to-pass ``t/a.py::keep``.
    """
    outcomes = {f"t/a.py::t{i}": True for i in range(_MAX_FIXTURE_TASKS)}
    outcomes["t/a.py::keep"] = True
    return ScriptedPolicy(
        actions=[ToolCall("set_outcome", {"outcomes": outcomes}), "done"]
    )


def _failing_policy():
    """Return a scripted policy that gives up without touching any test."""
    return ScriptedPolicy(actions=["gave up immediately"])


def _env():
    """Return a fresh env whose sandbox starts with only the P2P test green."""
    return FeatureDeletionEnv(FakeSandbox(test_outcomes={"t/a.py::keep": True}))


def test_build_corpus_end_to_end(tmp_path):
    tasks = [_task(i) for i in range(6)]
    lay = RunLayout(root=str(tmp_path), run_id="e2e")
    manifest = _manifest("e2e")
    out = build_corpus(
        tasks, _env, _passing_policy, lay, manifest,
        holdout_frac=0.34, holdout_seed=7,
    )
    # Holdout exclusion: holdout tasks were never rolled out.
    assert out.counts["tasks_holdout"] >= 1
    assert out.counts["rollouts"] == out.counts["tasks_train"]
    # Full passes routed to SFT (post-dedup near-identical rows collapse —
    # the fixture tasks produce near-identical messages, which is itself a
    # realistic dedup scenario).
    assert out.counts["sft_rows"] >= 1
    assert out.counts["quarantined"] == 0
    # Files exist and the SFT corpus never leaks the sentinel.
    sft_blob = Path(lay.sft_path).read_text(encoding="utf-8")
    assert _SENTINEL not in sft_blob
    assert Path(lay.card_path).exists()
    assert Path(lay.holdout_path).exists()


def test_build_corpus_quarantines_failures(tmp_path):
    tasks = [_task(i) for i in range(3)]
    lay = RunLayout(root=str(tmp_path), run_id="fail")
    manifest = _manifest("fail")
    out = build_corpus(
        tasks, _env, _failing_policy, lay, manifest,
        holdout_frac=0.34, holdout_seed=7,
    )
    assert out.counts["sft_rows"] == 0
    # Spelled out rather than as a chained comparison: at least one rollout
    # happened, and every rollout was quarantined.
    assert out.counts["rollouts"] > 0
    assert out.counts["quarantined"] == out.counts["rollouts"]


# Budget fixture: 0.25 USD at 0.1 USD per rollout affords at most two
# rollouts, fewer than the training split of six tasks needs.
_BUDGET_USD = 0.25
_COST_PER_ROLLOUT_USD = 0.1


def _run_with_budget(tmp_path, run_id: str):
    """Run ``build_corpus`` over six passing tasks under the tight fixture budget."""
    tasks = [_task(i) for i in range(6)]
    lay = RunLayout(root=str(tmp_path), run_id=run_id)
    manifest = _manifest(run_id, budget_usd=_BUDGET_USD)
    return build_corpus(
        tasks, _env, _passing_policy, lay, manifest,
        holdout_frac=0.2, holdout_seed=7,
        cost_per_rollout_usd=_COST_PER_ROLLOUT_USD,
    )


def test_build_corpus_budget_stop_marks_partial(tmp_path):
    out = _run_with_budget(tmp_path, "budget")
    assert out.status == "partial"
    assert out.counts["rollouts"] < out.counts["tasks_train"]


def test_build_corpus_is_write_once(tmp_path):
    tasks = [_task(i) for i in range(3)]
    lay = RunLayout(root=str(tmp_path), run_id="once")
    m1 = _manifest("once")
    build_corpus(tasks, _env, _passing_policy, lay, m1, holdout_frac=0.34)
    # A second manifest for the same run_id must not overwrite the first run.
    m2 = _manifest("once", created_at="2026-06-09T00:00:01Z")
    with pytest.raises(FileExistsError, match="write-once"):
        build_corpus(tasks, _env, _passing_policy, lay, m2, holdout_frac=0.34)


def test_dataset_card_contents(tmp_path):
    lay = RunLayout(root=str(tmp_path), run_id="card")
    m = _manifest("card", counts={"sft_rows": 3})
    write_dataset_card(
        lay, m,
        license_tiers={"REDISTRIBUTABLE": 3},
        dedup_stats={"rows_kept": 3},
    )
    card = Path(lay.card_path).read_text(encoding="utf-8")
    assert "run `card`" in card
    assert "sft_rows: 3" in card
    assert "REDISTRIBUTABLE: 3" in card
    assert "Decontamination" in card


# ---------------------------------------------------------------------
# Wave-21 adversarial-review regressions
# ---------------------------------------------------------------------


def test_budget_is_a_hard_ceiling(tmp_path):
    """Review P1: cost must never exceed budget (pre-charge check)."""
    out = _run_with_budget(tmp_path, "hardcap")
    assert out.cost_usd <= out.budget_usd
    assert out.status == "partial"


@pytest.mark.parametrize("bad_run_id", ["../../escape", "a/b", "a\\b", "", ".."])
def test_run_id_path_traversal_rejected(bad_run_id: str):
    """Review P2: separators / .. in run_id must be rejected at construction.

    Parametrized so each bad id is reported on its own instead of the first
    failure masking the rest.
    """
    with pytest.raises(ValueError, match="path"):
        RunLayout(root="/data", run_id=bad_run_id)


def test_dedup_stats_partition_disjoint():
    """Review P2: a row that is both within-run and cross-gen dup counts once."""
    prior_sigs = [minhash_signature(_TEXT_A)]
    rows = [{"text": _TEXT_A}, {"text": _TEXT_A2}]
    kept, stats = dedup(
        rows, lambda r: r["text"], threshold=0.5, prior_signatures=prior_sigs
    )
    # Without this, the partition check below also passes when the prior
    # signatures are silently ignored (A kept, A2 dropped within-run).
    assert stats["dropped_cross_generation"] >= 1
    assert stats["rows_in"] == len(rows)
    assert stats["rows_kept"] == len(kept)
    total_dropped = stats["dropped_within_run"] + stats["dropped_cross_generation"]
    assert total_dropped == stats["rows_in"] - stats["rows_kept"]