"""Run a structured CORP-ENV data pipeline for SFT -> RLVR. This script standardizes where artifacts are written so data prep stays tidy: - raw/imported: imported E1/M1 generated examples - raw/synthetic: synthetic seed traces (H1 by default) - processed/verified: clean/rejected outputs after strict verification - sft/merged: final SFT chat JSONL It wraps existing scripts and keeps compatibility by optionally writing the legacy flat output files as copies. """ from __future__ import annotations import argparse import shutil import subprocess import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] def run(cmd: list[str], *, use_uv: bool) -> None: final_cmd = (["uv", "run"] + cmd) if use_uv else cmd print("+", " ".join(final_cmd)) subprocess.run(final_cmd, check=True, cwd=ROOT) def ensure_dirs(paths: list[Path]) -> None: for path in paths: path.mkdir(parents=True, exist_ok=True) def copy_if_exists(src: Path, dst: Path) -> None: if src.exists(): dst.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src, dst) def main() -> None: parser = argparse.ArgumentParser(description="Structured data pipeline for CORP-ENV.") parser.add_argument("--python", default=sys.executable) parser.add_argument( "--use-uv", action="store_true", help="Run each pipeline stage with `uv run` to use the project environment.", ) parser.add_argument("--h1-per-task", type=int, default=24) parser.add_argument("--h1-variant-stride", type=int, default=2) parser.add_argument("--min-pass-rate", type=float, default=0.85) parser.add_argument("--min-reasoning-steps", type=int, default=1) parser.add_argument("--min-conflict-steps", type=int, default=0) parser.add_argument("--min-resolution-steps", type=int, default=0) parser.add_argument("--max-per-task", type=int, default=0) parser.add_argument( "--write-legacy-copies", action="store_true", help="Also copy outputs to legacy flat paths for backward compatibility.", ) args = parser.parse_args() data = ROOT / "data" raw_imported = data / "raw" / "imported" raw_synthetic = data / "raw" / "synthetic" processed_verified = data / "processed" / "verified" sft_merged = data / "sft" / "merged" summaries = ROOT / "results" / "data_pipeline" ensure_dirs([raw_imported, raw_synthetic, processed_verified, sft_merged, summaries]) e1m1_raw = raw_imported / "e1_m1_examples.jsonl" h1_raw = raw_synthetic / "h1_seed.jsonl" e1m1_clean = processed_verified / "e1_m1_clean.jsonl" e1m1_rejected = processed_verified / "e1_m1_rejected.jsonl" h1_clean = processed_verified / "h1_seed_clean.jsonl" h1_rejected = processed_verified / "h1_seed_rejected.jsonl" sft_out = sft_merged / "e1_m1_h1_examples.jsonl" run( [ args.python, "scripts/import_generated_examples.py", "--inputs", "data/raw/e1_to_e100_tasks.py", "data/raw/m1_to_m100_tasks.py", "--output", str(e1m1_raw), ], use_uv=args.use_uv, ) run( [ args.python, "scripts/generate_sft_data.py", "--tasks", "h1_acquisition_defence", "--per-task", str(args.h1_per_task), "--variant-stride", str(args.h1_variant_stride), "--output", str(h1_raw), ], use_uv=args.use_uv, ) run( [ args.python, "scripts/verify_examples.py", "--input", str(e1m1_raw), "--clean", str(e1m1_clean), "--rejected", str(e1m1_rejected), "--all-records", str(processed_verified / "e1_m1_all_records.jsonl"), "--summary", str(summaries / "e1_m1_summary.json"), "--strict-json", "--require-stepwise-deliberation", ], use_uv=args.use_uv, ) run( [ args.python, "scripts/verify_examples.py", "--input", str(h1_raw), "--clean", str(h1_clean), "--rejected", str(h1_rejected), "--all-records", str(processed_verified / "h1_seed_all_records.jsonl"), "--summary", str(summaries / "h1_seed_summary.json"), "--strict-json", "--require-stepwise-deliberation", ], use_uv=args.use_uv, ) prep_cmd = [ args.python, "scripts/prepare_sft_data.py", "--input", str(e1m1_clean), "--input", str(h1_clean), "--output", str(sft_out), "--min-pass-rate", str(args.min_pass_rate), "--min-reasoning-steps", str(args.min_reasoning_steps), "--min-conflict-steps", str(args.min_conflict_steps), "--min-resolution-steps", str(args.min_resolution_steps), "--require-stepwise-deliberation", ] if args.max_per_task > 0: prep_cmd.extend(["--max-per-task", str(args.max_per_task)]) run(prep_cmd, use_uv=args.use_uv) if args.write_legacy_copies: copy_if_exists(e1m1_raw, data / "raw" / "e1_m1_examples.jsonl") copy_if_exists(h1_raw, data / "raw" / "h1_seed.jsonl") copy_if_exists(e1m1_clean, data / "processed" / "e1_m1_clean.jsonl") copy_if_exists(e1m1_rejected, data / "processed" / "e1_m1_rejected.jsonl") copy_if_exists(h1_clean, data / "processed" / "h1_seed_clean.jsonl") copy_if_exists(h1_rejected, data / "processed" / "h1_seed_rejected.jsonl") copy_if_exists(sft_out, data / "sft" / "e1_m1_h1_examples.jsonl") print("\nStructured data pipeline complete.") print(f"SFT dataset: {sft_out}") print(f"Summaries: {summaries}") if __name__ == "__main__": main()