Baladithya Balamurugan
Wave 21: adversarial-review fixes — all 9 verified findings closed
3bbcf21
Raw
History Blame Contribute Delete
10.5 kB
"""Tests for the Stage-0 pipeline: contract, dedup, driver.
Load-bearing coverage: the sentinel leak guard on write_tasks (finding D-8),
holdout exclusion + budget stop + idempotency in build_corpus (D-21), and the
cross-generation dedup path (D-12).
"""
from __future__ import annotations
import io
import json
from pathlib import Path
import pytest
from composer_replication.datagen.env import FeatureDeletionEnv
from composer_replication.datagen.rollout_harness import ScriptedPolicy
from composer_replication.datagen.sandbox import FakeSandbox
from composer_replication.datagen.schema import FeatureDeletionTask
from composer_replication.datagen.trajectory import ToolCall
from composer_replication.pipeline.build_corpus import build_corpus
from composer_replication.pipeline.dedup import (
dedup,
find_near_duplicates,
jaccard_estimate,
load_signatures,
minhash_signature,
signatures_to_jsonl,
)
from composer_replication.pipeline.s3_contract import (
RunLayout,
RunManifest,
write_dataset_card,
write_tasks,
write_tasks_full,
)
def _task(i: int, **over) -> FeatureDeletionTask:
    """Build the fixture ``FeatureDeletionTask`` numbered *i*.

    Any keyword argument in *over* replaces the default field of the same
    name. The default ``golden_diff`` and ``deleted_symbols`` hold marker
    strings that the leak tests in this module search for in written files.
    """
    defaults = {
        "task_id": f"task-{i:03d}",
        "repo": "org/repo",
        "base_commit": "abc",
        "broken_image": "img:1",
        "test_command": "pytest -q",
        "fail_to_pass": (f"t/a.py::t{i}",),
        "pass_to_pass": ("t/a.py::keep",),
        "golden_diff": "SENTINEL_NEVER_LEAK",
        "deleted_symbols": ("secret_fn",),
    }
    # Caller overrides win over the defaults.
    return FeatureDeletionTask(**{**defaults, **over})
# ---------------------------------------------------------------------
# RunLayout / RunManifest
# ---------------------------------------------------------------------
def test_layout_paths_are_pure_and_namespaced():
    """Path attributes nest under ``runs/<run_id>`` for local and S3 roots."""
    local = RunLayout(root="/data/corpora", run_id="run42")
    assert local.sft_path == "/data/corpora/runs/run42/corpus_sft/rows.jsonl"
    assert local.manifest_path == "/data/corpora/runs/run42/manifest.json"

    # The root is given with a trailing slash; the expected path has no
    # doubled separator.
    remote = RunLayout(root="s3://bucket/prefix/", run_id="r")
    assert remote.tasks_path == "s3://bucket/prefix/runs/r/tasks/manifest.jsonl"
def test_manifest_round_trip_and_budget(tmp_path):
    """Spending flips ``over_budget``; write/read preserves cost and budget."""
    layout = RunLayout(root=str(tmp_path), run_id="r1")
    manifest = RunManifest(
        run_id="r1",
        created_at="2026-06-09T00:00:00Z",
        source="test",
        budget_usd=1.0,
    )

    manifest.spend(0.4)
    assert not manifest.over_budget
    # Second charge brings the two spends to the full 1.0 budget.
    manifest.spend(0.6)
    assert manifest.over_budget

    manifest.write(layout)
    reloaded = RunManifest.read(layout)
    assert reloaded.cost_usd == pytest.approx(1.0)
    assert reloaded.budget_usd == 1.0
# ---------------------------------------------------------------------
# THE leak guard (finding D-8)
# ---------------------------------------------------------------------
def test_write_tasks_never_leaks_golden_diff(tmp_path):
    """``write_tasks`` omits the sentinel diff/symbol; ``write_tasks_full`` keeps the diff."""
    layout = RunLayout(root=str(tmp_path), run_id="r1")

    write_tasks(layout, [_task(1)])
    public_text = Path(layout.tasks_path).read_text()
    for forbidden in ("SENTINEL_NEVER_LEAK", "secret_fn"):
        assert forbidden not in public_text
    first_row = json.loads(public_text.splitlines()[0])
    # Provenance is still present, as a hash field rather than the diff.
    assert first_row["golden_diff_sha256"]

    # The restricted full writer DOES carry the diff (construction side only).
    write_tasks_full(layout, [_task(1)])
    full_text = Path(layout.tasks_full_path).read_text()
    assert "SENTINEL_NEVER_LEAK" in full_text
# ---------------------------------------------------------------------
# MinHash dedup
# ---------------------------------------------------------------------
# Fixture texts for the MinHash tests. _TEXT_A and _TEXT_A2 differ only in
# their final word ("tonight" vs "today"); _TEXT_B shares no
# whitespace-separated word with either of them.
_TEXT_A = "the quick brown fox jumps over the lazy dog and then runs far away home tonight"
_TEXT_A2 = "the quick brown fox jumps over the lazy dog and then runs far away home today"
_TEXT_B = "import numpy as np def main(): return np.zeros(10) print(main()) totally different content here"
def test_jaccard_estimate_near_duplicates_high_disjoint_low():
    """Near-identical texts score above 0.5, unrelated below 0.2, self exactly 1.0."""
    sig_a = minhash_signature(_TEXT_A)
    sig_a2 = minhash_signature(_TEXT_A2)
    sig_b = minhash_signature(_TEXT_B)

    assert jaccard_estimate(sig_a, sig_a2) > 0.5
    assert jaccard_estimate(sig_a, sig_b) < 0.2
    assert jaccard_estimate(sig_a, sig_a) == 1.0
def test_dedup_keeps_first_and_drops_near_dup():
    """Of two near-duplicate rows the earlier one survives; one drop is counted."""
    rows = [{"text": text} for text in (_TEXT_A, _TEXT_A2, _TEXT_B)]

    kept, stats = dedup(rows, lambda row: row["text"], threshold=0.5)

    kept_texts = [row["text"] for row in kept]
    assert kept_texts == [_TEXT_A, _TEXT_B]
    assert stats["dropped_within_run"] == 1
def test_cross_generation_dedup_via_signature_file():
    """Signatures written to JSONL and reloaded drop a near-duplicate new row."""

    def text_of(row):
        return row["text"]

    # Serialize the prior generation's signatures into an in-memory stream,
    # then rewind it so it can be read back.
    stream = io.StringIO()
    signatures_to_jsonl([{"text": _TEXT_A}], text_of, stream)
    stream.seek(0)
    prior_sigs = load_signatures(stream)

    new_rows = [{"text": _TEXT_A2}, {"text": _TEXT_B}]
    kept, stats = dedup(
        new_rows,
        text_of,
        threshold=0.5,
        prior_signatures=prior_sigs,
    )

    assert [row["text"] for row in kept] == [_TEXT_B]
    assert stats["dropped_cross_generation"] == 1
def test_find_near_duplicates_pairs():
    """Two near-identical rows are reported as the single index pair ``(0, 1)``."""
    rows = [{"t": text} for text in (_TEXT_A, _TEXT_A2)]
    pairs = find_near_duplicates(rows, lambda row: row["t"], 0.5)
    assert pairs == [(0, 1)]
# ---------------------------------------------------------------------
# build_corpus end-to-end (FakeSandbox + ScriptedPolicy)
# ---------------------------------------------------------------------
def _passing_policy():
    """Return a ``ScriptedPolicy`` that sets every fixture test to passing, then says "done".

    The ``set_outcome`` tool call takes explicit test names, so rather than
    tailoring it per task it lists ``t/a.py::t0`` .. ``t/a.py::t19`` -- a
    superset of the ``fail_to_pass`` names ``_task`` generates in these
    tests -- together with the shared ``t/a.py::keep``.
    """
    test_names = [f"t/a.py::t{i}" for i in range(20)]
    test_names.append("t/a.py::keep")
    outcomes = dict.fromkeys(test_names, True)
    flip_green = ToolCall("set_outcome", {"outcomes": outcomes})
    return ScriptedPolicy(actions=[flip_green, "done"])
def _failing_policy():
    """Return a ``ScriptedPolicy`` whose only action is a plain give-up message."""
    script = ["gave up immediately"]
    return ScriptedPolicy(actions=script)
def _env():
    """Return a fresh ``FeatureDeletionEnv`` over a ``FakeSandbox``.

    The sandbox's preset test outcomes mark ``t/a.py::keep`` as passing.
    """
    sandbox = FakeSandbox(test_outcomes={"t/a.py::keep": True})
    return FeatureDeletionEnv(sandbox)
def test_build_corpus_end_to_end(tmp_path):
    """A passing policy over six fixture tasks yields SFT rows, a holdout, and a card."""
    layout = RunLayout(root=str(tmp_path), run_id="e2e")
    manifest = RunManifest(
        run_id="e2e",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
    )
    tasks = [_task(i) for i in range(6)]

    out = build_corpus(
        tasks,
        _env,
        _passing_policy,
        layout,
        manifest,
        holdout_frac=0.34,
        holdout_seed=7,
    )
    counts = out.counts

    # Holdout exclusion: rollouts match the train-task count, so holdout
    # tasks were never rolled out.
    assert counts["tasks_holdout"] >= 1
    assert counts["rollouts"] == counts["tasks_train"]

    # Full passes are routed to SFT. Only ">= 1" is asserted because the
    # fixture tasks produce near-identical messages that dedup collapses --
    # itself a realistic dedup scenario.
    assert counts["sft_rows"] >= 1
    assert counts["quarantined"] == 0

    # The SFT corpus never leaks the sentinel, and the side files exist.
    assert "SENTINEL_NEVER_LEAK" not in Path(layout.sft_path).read_text()
    for produced in (layout.card_path, layout.holdout_path):
        assert Path(produced).exists()
def test_build_corpus_quarantines_failures(tmp_path):
    """With a policy that gives up, every rollout is quarantined and no SFT row is written."""
    layout = RunLayout(root=str(tmp_path), run_id="fail")
    manifest = RunManifest(
        run_id="fail",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
    )
    tasks = [_task(i) for i in range(3)]

    out = build_corpus(
        tasks,
        _env,
        _failing_policy,
        layout,
        manifest,
        holdout_frac=0.34,
        holdout_seed=7,
    )
    counts = out.counts

    assert counts["sft_rows"] == 0
    # At least one rollout ran, and all of them were quarantined.
    assert counts["rollouts"] > 0
    assert counts["quarantined"] == counts["rollouts"]
def test_build_corpus_budget_stop_marks_partial(tmp_path):
    """A 0.25 budget at 0.1 per rollout ends the run early with status "partial"."""
    layout = RunLayout(root=str(tmp_path), run_id="budget")
    manifest = RunManifest(
        run_id="budget",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
        budget_usd=0.25,
    )
    tasks = [_task(i) for i in range(6)]

    out = build_corpus(
        tasks,
        _env,
        _passing_policy,
        layout,
        manifest,
        holdout_frac=0.2,
        holdout_seed=7,
        cost_per_rollout_usd=0.1,
    )

    assert out.status == "partial"
    # Not every train task was rolled out.
    assert out.counts["rollouts"] < out.counts["tasks_train"]
def test_build_corpus_is_write_once(tmp_path):
    """A second build into the same run layout raises ``FileExistsError``."""
    tasks = [_task(i) for i in range(3)]
    layout = RunLayout(root=str(tmp_path), run_id="once")

    first = RunManifest(
        run_id="once",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
    )
    build_corpus(tasks, _env, _passing_policy, layout, first, holdout_frac=0.34)

    # Same run_id and layout, later timestamp: the rerun must be refused.
    second = RunManifest(
        run_id="once",
        created_at="2026-06-09T00:00:01Z",
        source="fixture",
    )
    with pytest.raises(FileExistsError, match="write-once"):
        build_corpus(tasks, _env, _passing_policy, layout, second, holdout_frac=0.34)
def test_dataset_card_contents(tmp_path):
    """The written card mentions the run id, counts, license tiers, and decontamination."""
    layout = RunLayout(root=str(tmp_path), run_id="card")
    manifest = RunManifest(
        run_id="card",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
        counts={"sft_rows": 3},
    )

    write_dataset_card(
        layout,
        manifest,
        license_tiers={"REDISTRIBUTABLE": 3},
        dedup_stats={"rows_kept": 3},
    )

    card = Path(layout.card_path).read_text()
    expected_snippets = (
        "run `card`",
        "sft_rows: 3",
        "REDISTRIBUTABLE: 3",
        "Decontamination",
    )
    for snippet in expected_snippets:
        assert snippet in card
# ---------------------------------------------------------------------
# Wave-21 adversarial-review regressions
# ---------------------------------------------------------------------
def test_budget_is_a_hard_ceiling(tmp_path):
    """Review P1: reported cost stays at or below the budget (pre-charge check)."""
    layout = RunLayout(root=str(tmp_path), run_id="hardcap")
    manifest = RunManifest(
        run_id="hardcap",
        created_at="2026-06-09T00:00:00Z",
        source="fixture",
        budget_usd=0.25,
    )
    tasks = [_task(i) for i in range(6)]

    out = build_corpus(
        tasks,
        _env,
        _passing_policy,
        layout,
        manifest,
        holdout_frac=0.2,
        holdout_seed=7,
        cost_per_rollout_usd=0.1,
    )

    assert out.cost_usd <= out.budget_usd
    assert out.status == "partial"
def test_run_id_path_traversal_rejected():
    """Review P2: a run_id with separators, ``..``, or no text fails at construction."""
    bad_run_ids = ("../../escape", "a/b", "a\\b", "", "..")
    for run_id in bad_run_ids:
        with pytest.raises(ValueError, match="path"):
            RunLayout(root="/data", run_id=run_id)
def test_dedup_stats_partition_disjoint():
    """Review P2: the two drop counters together equal ``rows_in - rows_kept``.

    Both input rows are near-duplicates of the prior signature and of each
    other, so a row counted under both reasons would break the equality.
    """
    prior_sigs = [minhash_signature(_TEXT_A)]
    rows = [{"text": text} for text in (_TEXT_A, _TEXT_A2)]

    _kept, stats = dedup(
        rows,
        lambda row: row["text"],
        threshold=0.5,
        prior_signatures=prior_sigs,
    )

    dropped_by_reason = (
        stats["dropped_within_run"] + stats["dropped_cross_generation"]
    )
    assert dropped_by_reason == stats["rows_in"] - stats["rows_kept"]