| |
| """ |
| Run eval.py across all checkpoints and datasets in parallel (multi-GPU), |
| and collect results to ./eval.csv. |
| |
| - Discovers checkpoints under outputs/checkpoint-* |
| - Evaluates on: data/test/*.parquet and data/val/*.parquet |
| - Uses up to N GPUs concurrently (default: 4) by setting CUDA_VISIBLE_DEVICES |
| - Parses the "Summary ..." line(s) from eval.py logs |
| - Appends rows to ./eval.csv |
| |
| Example: |
| python batch_eval.py \ |
| --outputs_dir outputs \ |
| --embeddings_dir embeddings \ |
| --datasets 'data/test/*.parquet' 'data/val/*.parquet' \ |
| --splits test val \ |
| --num_samples 12800 \ |
| --batch_size 4 \ |
| --gpus 0 1 2 3 \ |
| --eval_script eval.py \ |
| --device cuda |
| |
| Notes: |
| - This script *does not* modify your eval.py. It just orchestrates/launches it. |
| - Requires Python 3.9+ (standard library only); the type hints use built-in generics such as list[Path]. |
| """ |
|
|
| import argparse |
| import csv |
| import os |
| import re |
| import sys |
| import time |
| import glob |
| import queue |
| import threading |
| import subprocess |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
|
|
# Numeric token used for every metric value in the summary lines. "+" is part
# of the character class so that exponents such as "1.5e+02" are captured
# whole instead of being cut off at the sign (which would yield "1.5e").
_METRIC_NUM = r"[-+\d\.eE]+"

# Matches a line containing "Summary over <n> samples →" followed, in order,
# by "CE=", "CODON-acc=" and "AA-acc=" values.
# Groups: (sample count, CE, codon accuracy, amino-acid accuracy).
TF_SUMMARY_RE = re.compile(
    rf"Summary over\s+(\d+)\s+samples\s+→.*?CE=({_METRIC_NUM}).*?CODON-acc=({_METRIC_NUM}).*?AA-acc=({_METRIC_NUM})"
)

# Matches a line containing "Full-dataset summary" followed, in order, by
# "tokens=", "CE=", "CODON-acc=" and "AA-acc=" values.
# Groups: (token count, CE, codon accuracy, amino-acid accuracy).
EVALALL_SUMMARY_RE = re.compile(
    rf"Full-dataset summary.*?tokens=(\d+).*?CE=({_METRIC_NUM}).*?CODON-acc=({_METRIC_NUM}).*?AA-acc=({_METRIC_NUM})"
)
|
|
# Column order of the results CSV. The header is written from this list and
# every appended row is serialized through it, so the order here is the order
# on disk.
CSV_FIELDS = (
    # Which run this row describes.
    ["timestamp_iso", "model_path", "checkpoint_step", "split", "data_path"]
    # Settings the run was launched with, and the GPU it was assigned.
    + ["num_samples", "batch_size", "seed", "eval_all", "gpu_id"]
    # Wall-clock time and the metrics parsed from the eval output.
    + ["runtime_sec", "tokens", "mean_ce", "mean_codon_acc", "mean_aa_acc"]
    # Outcome and debugging information.
    + ["status", "error", "command"]
)
|
|
|
|
def parse_args():
    """
    Parse the command-line options from sys.argv.

    Returns:
        argparse.Namespace with one attribute per option defined below.
        ``skip_existing`` defaults to True and is switched off with
        ``--no-skip-existing`` (or its underscore alias ``--no_skip_existing``,
        which matches the spelling convention of every other flag).
    """
    p = argparse.ArgumentParser(description="Parallel evaluator for CodonTranslator checkpoints.")

    # Locations.
    p.add_argument("--outputs_dir", type=str, default="outputs/", help="Folder containing checkpoint-* subdirs.")
    p.add_argument("--embeddings_dir", type=str, default="embeddings/", help="Embeddings dir to pass to eval.py")
    p.add_argument("--datasets", nargs="+", default=["data/test/*.parquet", "data/val/*.parquet"],
                   help="One or more dataset globs.")
    p.add_argument("--splits", nargs="+", default=["test", "val"],
                   help="Split names aligned with --datasets (same length).")

    # Options forwarded to eval.py.
    p.add_argument("--num_samples", type=int, default=12800, help="num_samples for eval.py (random subset mode)")
    p.add_argument("--batch_size", type=int, default=4, help="batch_size for eval.py")
    p.add_argument("--seed", type=int, default=42, help="seed for eval.py")
    p.add_argument("--device", type=str, default="cuda", choices=["cuda", "cpu"], help="Device flag for eval.py")
    p.add_argument("--gpus", nargs="+", default=["0", "1", "2", "3"], help="GPU IDs to use (as CUDA_VISIBLE_DEVICES)")
    p.add_argument("--eval_script", type=str, default="eval.py", help="Path to eval.py")
    p.add_argument("--csv_path", type=str, default="eval.csv", help="Output CSV file")
    p.add_argument("--eval_all", action="store_true",
                   help="Use eval.py --eval_all (streaming, no num_samples). If set, ignores --num_samples.")
    p.add_argument("--workers", type=int, default=4,
                   help="--workers passed to eval.py when --eval_all is set.")
    p.add_argument("--dry_run", action="store_true", help="List planned runs but do not execute.")

    # Checkpoint range selection and resume behaviour.
    p.add_argument("--start_after_step", type=int, default=-1,
                   help="Only evaluate checkpoints with step > this value (e.g., 73700)")
    p.add_argument("--end_step", type=int, default=-1,
                   help="If >0, only evaluate checkpoints with step <= this value")
    p.add_argument("--skip_existing", dest="skip_existing", action="store_true", default=True,
                   help="Skip tasks already recorded as OK in csv_path")
    # The hyphenated spelling is kept for existing callers; the underscore
    # alias is consistent with the rest of the flags.
    p.add_argument("--no-skip-existing", "--no_skip_existing", dest="skip_existing", action="store_false",
                   help="Do not skip existing OK rows; re-run everything in range")
    return p.parse_args()
|
|
|
|
def natural_step(dirpath: Path) -> int:
    """
    Return the training step encoded in a checkpoint directory name.

    Only the final path component is inspected; the first occurrence of
    'checkpoint-<digits>' in it supplies the step (e.g. 'checkpoint-21000'
    gives 21000). When the name carries no such marker the result is -1.
    """
    found = re.search(r"checkpoint-(\d+)", dirpath.name)
    if found is None:
        return -1
    return int(found.group(1))
|
|
|
|
def discover_checkpoints(outputs_dir: str) -> list[Path]:
    """
    List usable checkpoint directories under ``outputs_dir``, ordered by step.

    A directory matching 'checkpoint-*' is kept only when it contains a config
    file ('config.json' or 'trainer_config.json') and a weights file
    ('model.safetensors' or 'pytorch_model.bin'). Ordering uses the step
    number returned by natural_step().
    """
    pattern = os.path.join(outputs_dir, "checkpoint-*")
    candidates = [Path(hit) for hit in glob.glob(pattern) if os.path.isdir(hit)]
    candidates.sort(key=lambda ckpt: natural_step(ckpt))

    def _has_any(ckpt: Path, filenames: tuple) -> bool:
        # True as soon as one of the candidate files is present.
        return any((ckpt / name).exists() for name in filenames)

    return [
        ckpt
        for ckpt in candidates
        if _has_any(ckpt, ("config.json", "trainer_config.json"))
        and _has_any(ckpt, ("model.safetensors", "pytorch_model.bin"))
    ]
|
|
|
|
def build_cmd(py_exec: str,
              eval_script: str,
              model_path: str,
              data_path: str,
              embeddings_dir: str,
              device: str,
              num_samples: int,
              batch_size: int,
              seed: int,
              eval_all: bool,
              workers: int) -> list[str]:
    """
    Assemble the argv list for a single eval-script invocation.

    The common flags (--model_path, --data_path, --embeddings_dir,
    --batch_size, --device, --seed) are always present. The tail depends on
    the mode: with ``eval_all`` it is '--eval_all --workers <workers>',
    otherwise '--num_samples <num_samples>'.
    """
    command = [py_exec, eval_script]
    command.extend(["--model_path", model_path])
    command.extend(["--data_path", data_path])
    command.extend(["--embeddings_dir", embeddings_dir])
    command.extend(["--batch_size", str(batch_size)])
    command.extend(["--device", device])
    command.extend(["--seed", str(seed)])

    if eval_all:
        mode_flags = ["--eval_all", "--workers", str(workers)]
    else:
        mode_flags = ["--num_samples", str(num_samples)]
    return command + mode_flags
|
|
|
|
def parse_metrics(stdout: str, stderr: str) -> dict:
    """
    Extract the summary metrics from the captured output of one eval run.

    stdout and stderr are searched together. The full-dataset summary pattern
    is tried first; the sampled-summary pattern is the fallback and leaves
    "tokens" empty because its first group is a sample count, not a token
    count.

    Returns:
        dict with string values under "tokens", "mean_ce", "mean_codon_acc"
        and "mean_aa_acc".

    Raises:
        ValueError: neither summary pattern occurs in the output.
    """
    combined = "\n".join((stdout, stderr))

    # (pattern, whether its first capture group is the token count)
    attempts = ((EVALALL_SUMMARY_RE, True), (TF_SUMMARY_RE, False))
    for pattern, leads_with_tokens in attempts:
        found = pattern.search(combined)
        if found is None:
            continue
        first, ce, codon, aa = found.groups()
        return {
            "tokens": first if leads_with_tokens else "",
            "mean_ce": ce,
            "mean_codon_acc": codon,
            "mean_aa_acc": aa,
        }

    raise ValueError("Could not find summary line in eval.py output.")
|
|
|
|
def run_one(task: dict, gpu_queue: "queue.Queue[str]", csv_lock: threading.Lock) -> dict:
    """
    Execute one eval command on a GPU borrowed from ``gpu_queue``.

    Args:
        task: dict with keys "cmd" (argv list), "model_path", "step", "split",
            "data_path", "num_samples", "batch_size", "seed" and "eval_all".
        gpu_queue: queue of GPU ids. One id is taken (blocking until one is
            free) and always put back, whatever happens to the child process.
        csv_lock: not used by this function; kept so the signature stays
            compatible with existing callers.

    Returns:
        A row dict whose keys are the CSV column names. "status" is "OK" only
        when the process exited with code 0 and a summary line was parsed.
        Any other outcome, including a failure to start the process at all,
        yields "FAIL" with details in "error", so one broken task is recorded
        rather than aborting the whole batch.
    """
    gpu_id = gpu_queue.get()
    start = time.time()
    status = "OK"
    err_text = ""
    # Empty metrics are the default for every failure path.
    metrics = {"tokens": "", "mean_ce": "", "mean_codon_acc": "", "mean_aa_acc": ""}

    try:
        env = os.environ.copy()
        # Pin the child process to the borrowed GPU.
        env["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
        # Only set these when the caller's environment has not already.
        env.setdefault("TOKENIZERS_PARALLELISM", "false")
        env.setdefault("CUDA_DEVICE_ORDER", "PCI_BUS_ID")

        try:
            result = subprocess.run(
                task["cmd"],
                env=env,
                capture_output=True,
                text=True,
                # Undecodable bytes in the child's output must not raise here.
                errors="replace",
                check=False,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # The process could not be started (missing executable, bad
            # argument, ...). Record it instead of letting it propagate.
            status = "FAIL"
            err_text = f"Failed to launch eval process: {type(e).__name__}: {e}"
        else:
            try:
                metrics = parse_metrics(result.stdout, result.stderr)
            except Exception as e:
                status = "FAIL"
                err_text = f"{e}\n--- STDOUT ---\n{result.stdout}\n--- STDERR ---\n{result.stderr}"

            # A parsed summary does not make up for a non-zero exit code.
            if result.returncode != 0 and status == "OK":
                status = "FAIL"
                err_text = f"Non-zero exit code {result.returncode}\n--- STDOUT ---\n{result.stdout}\n--- STDERR ---\n{result.stderr}"
    finally:
        runtime = time.time() - start
        gpu_queue.put(gpu_id)

    row = {
        "timestamp_iso": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "model_path": task["model_path"],
        "checkpoint_step": task["step"],
        "split": task["split"],
        "data_path": task["data_path"],
        # num_samples is not passed to the eval script in eval_all mode.
        "num_samples": task["num_samples"] if not task["eval_all"] else "",
        "batch_size": task["batch_size"],
        "seed": task["seed"],
        "eval_all": str(task["eval_all"]),
        "gpu_id": str(gpu_id),
        "runtime_sec": f"{runtime:.2f}",
        "tokens": metrics.get("tokens", ""),
        "mean_ce": metrics.get("mean_ce", ""),
        "mean_codon_acc": metrics.get("mean_codon_acc", ""),
        "mean_aa_acc": metrics.get("mean_aa_acc", ""),
        "status": status,
        "error": err_text.strip(),
        "command": " ".join(task["cmd"]),
    }
    return row
|
|
|
|
def ensure_csv(path: str):
    """Write the CSV header to ``path`` unless a non-empty file is already there."""
    if os.path.exists(path) and os.path.getsize(path) != 0:
        # Existing content is left untouched.
        return
    with open(path, "w", newline="") as handle:
        csv.DictWriter(handle, fieldnames=CSV_FIELDS).writeheader()
|
|
|
|
def read_completed_keys(path: str) -> set[tuple[int, str, str]]:
    """
    Collect the tasks already recorded as successful in an existing CSV.

    Args:
        path: CSV file to read.

    Returns:
        Set of (step, split, data_path) for every row whose status is "OK"
        (case-insensitive, surrounding whitespace ignored). Rows whose
        checkpoint_step is not an integer are skipped. A missing or empty
        file gives an empty set.

    Reading is best-effort: if the file cannot be read to the end, a warning
    is printed to stderr and the keys collected so far are returned.
    """
    keys: set[tuple[int, str, str]] = set()
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        return keys

    # The "error" column can hold the full stdout/stderr of a failed run,
    # which may exceed csv's default per-field limit (131072 characters).
    # Hitting that limit would stop the scan and hide every later OK row, so
    # the limit is raised while reading and restored afterwards. 2**31 - 1
    # fits a C long on every platform.
    previous_limit = csv.field_size_limit(2**31 - 1)
    try:
        with open(path, "r", newline="") as f:
            for row in csv.DictReader(f):
                if (row.get("status") or "").strip().upper() != "OK":
                    continue
                try:
                    step = int(row.get("checkpoint_step", "-1"))
                except (TypeError, ValueError):
                    # Blank, missing or non-numeric step: not a usable key.
                    continue
                split = row.get("split") or ""
                data_path = row.get("data_path") or ""
                keys.add((step, split, data_path))
    except (OSError, csv.Error, UnicodeError) as exc:
        print(
            f"WARNING: could not fully read {path} ({type(exc).__name__}: {exc}); "
            f"only {len(keys)} completed task(s) were recognized.",
            file=sys.stderr,
        )
    finally:
        csv.field_size_limit(previous_limit)
    return keys
|
|
|
|
def append_row(path: str, row: dict, lock: threading.Lock):
    """
    Append ``row`` as one CSV line to ``path`` while holding ``lock``.

    The file is opened in append mode for each call and flushed before it is
    closed; columns follow CSV_FIELDS.
    """
    with lock, open(path, "a", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=CSV_FIELDS)
        writer.writerow(row)
        handle.flush()
|
|
|
|
def main():
    """
    Entry point: plan one eval run per (checkpoint, dataset) pair and execute them.

    Steps:
      1. Parse arguments; exit with code 2 if --datasets and --splits differ
         in length, or with code 1 if no checkpoint is discovered.
      2. Build the task list, applying the step range filters and (when
         skip_existing is on) dropping tasks already marked OK in the CSV.
      3. With --dry_run, print the planned commands and return.
      4. Otherwise run the tasks on a thread pool with one worker per GPU id,
         appending one CSV row per finished task and printing a progress line.
    """
    args = parse_args()

    if len(args.splits) != len(args.datasets):
        print("ERROR: --datasets and --splits must have the same length.", file=sys.stderr)
        sys.exit(2)

    checkpoints = discover_checkpoints(args.outputs_dir)
    if not checkpoints:
        print(f"No checkpoints found in {args.outputs_dir}/checkpoint-*", file=sys.stderr)
        sys.exit(1)

    # Startup banner.
    ds_pairs = list(zip(args.splits, args.datasets))
    dataset_summary = ", ".join(f"{s}:{d}" for s, d in ds_pairs)
    gpu_summary = ", ".join(args.gpus)
    lower_bound_active = args.start_after_step >= 0
    upper_bound_active = args.end_step > 0

    print(f"Discovered {len(checkpoints)} checkpoints.")
    print(f"Datasets: {dataset_summary}")
    print(f"GPUs: {gpu_summary}")
    print(f"Writing results to: {args.csv_path}")
    if lower_bound_active:
        print(f"Filtering: step > {args.start_after_step}")
    if upper_bound_active:
        print(f"Filtering: step <= {args.end_step}")
    print(f"Skip existing OK rows in CSV: {args.skip_existing}")

    # Plan the runs.
    py_exec = sys.executable
    already_done = read_completed_keys(args.csv_path) if args.skip_existing else set()
    tasks = []
    for ckpt in checkpoints:
        step = natural_step(ckpt)
        below_range = lower_bound_active and step <= args.start_after_step
        above_range = upper_bound_active and step > args.end_step
        if below_range or above_range:
            continue

        for split, data_path in ds_pairs:
            if (step, split, data_path) in already_done:
                continue
            tasks.append({
                "model_path": str(ckpt),
                "step": step,
                "split": split,
                "data_path": data_path,
                "num_samples": args.num_samples,
                "batch_size": args.batch_size,
                "seed": args.seed,
                "eval_all": args.eval_all,
                "cmd": build_cmd(
                    py_exec=py_exec,
                    eval_script=args.eval_script,
                    model_path=str(ckpt),
                    data_path=data_path,
                    embeddings_dir=args.embeddings_dir,
                    device=args.device,
                    num_samples=args.num_samples,
                    batch_size=args.batch_size,
                    seed=args.seed,
                    eval_all=args.eval_all,
                    workers=args.workers,
                ),
            })

    # Dry run: show the plan and stop before touching the CSV.
    if args.dry_run:
        for t in tasks:
            print(f"[DRY RUN] GPU=? step={t['step']} split={t['split']} -> {' '.join(t['cmd'])}")
        print(f"Planned runs: {len(tasks)}")
        return

    ensure_csv(args.csv_path)
    csv_lock = threading.Lock()

    # Each worker borrows one GPU id from this queue for the duration of a run.
    gpu_queue: "queue.Queue[str]" = queue.Queue()
    for gid in args.gpus:
        gpu_queue.put(str(gid))

    # One worker thread per GPU id.
    pool_size = max(1, len(args.gpus))
    with ThreadPoolExecutor(max_workers=pool_size) as ex:
        futures = [ex.submit(run_one, t, gpu_queue, csv_lock) for t in tasks]
        total = len(futures)
        for completed, fut in enumerate(as_completed(futures), start=1):
            row = fut.result()
            append_row(args.csv_path, row, csv_lock)
            if row["status"] != "OK":
                print(f"[{completed}/{total}] ❌ step={row['checkpoint_step']} split={row['split']} "
                      f"gpu={row['gpu_id']} See CSV 'error' column for details.")
                continue
            print(f"[{completed}/{total}] ✅ step={row['checkpoint_step']} split={row['split']} "
                  f"CE={row['mean_ce']} CODON={row['mean_codon_acc']} AA={row['mean_aa_acc']} "
                  f"gpu={row['gpu_id']} in {row['runtime_sec']}s")

    print(f"Done. Results appended to {args.csv_path}")
|
|
|
|
# Run the batch evaluation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|