r"""
Multi-GPU parallel scoring using subprocess workers.

Splits a CSV of video edits across multiple GPUs for faster inference.

Usage:
    python examples/multi_gpu_scoring.py \
        --csv edits.csv \
        --output results.csv \
        --num_gpus 4
"""

import argparse
import csv
import json
import os
import shutil
import subprocess
import sys
import tempfile

# Score columns appended to the output CSV, in output order.
SCORE_COLUMNS = ["IF", "RQ", "EE", "Overall"]


def _format_score(value):
    """Render *value* with two decimals, falling back to ``str`` if it is not numeric."""
    try:
        return f"{value:.2f}"
    except (TypeError, ValueError):
        return str(value)


def _failed_result(item, message):
    """Return a copy of *item* with every score column set to None plus an ``error`` message."""
    return {**item, **dict.fromkeys(SCORE_COLUMNS), "error": message}


def worker_main(args):
    """Single-GPU worker: load model, score shard, write results.

    Reads the JSON list of row dicts at ``args.shard_file``, scores each item
    with ``VEFXReward`` on ``cuda:0`` (the launcher pins each worker to one GPU
    through ``CUDA_VISIBLE_DEVICES``), and writes exactly one result dict per
    input item, in input order, to ``args.output_file`` as JSON. A failure on
    one item is recorded as a row with ``None`` scores and an ``error`` message
    rather than aborting the whole shard.
    """
    from vefx_reward import VEFXReward

    with open(args.shard_file, encoding="utf-8") as f:
        shard = json.load(f)

    model = VEFXReward(args.model, device="cuda:0")
    total = len(shard)
    results = []
    for i, item in enumerate(shard, start=1):
        try:
            scores = model.score(item["original_video"], item["edited_video"], item["instruction"])
        except Exception as e:
            print(f"[GPU {args.gpu_id}] [{i}/{total}] ERROR: {e}", flush=True)
            results.append(_failed_result(item, str(e)))
            continue
        # Append exactly once per item. Logging is kept outside the try so a
        # formatting problem can no longer add a second (error) row for it.
        results.append({**item, **scores})
        print(f"[GPU {args.gpu_id}] [{i}/{total}] "
              f"IF={_format_score(scores.get('IF'))} "
              f"RQ={_format_score(scores.get('RQ'))} "
              f"EE={_format_score(scores.get('EE'))}", flush=True)

    with open(args.output_file, "w", encoding="utf-8") as f:
        json.dump(results, f)
    print(f"[GPU {args.gpu_id}] Done — {len(results)} results", flush=True)


def _split_round_robin(items, num_shards):
    """Deal *items* into *num_shards* lists: item ``i`` goes to shard ``i % num_shards``."""
    return [items[gid::num_shards] for gid in range(num_shards)]


def _merge_round_robin(shards, shard_results):
    """Undo ``_split_round_robin``: return per-shard results in the original item order.

    ``shard_results`` maps shard index -> list of result dicts aligned with that
    shard. Shards absent from the mapping (failed workers) contribute
    placeholder rows with empty scores, so the output always has one row per
    input item.
    """
    num_shards = len(shards)
    merged = []
    for idx in range(sum(map(len, shards))):
        # Item idx was dealt to shard idx % n at position idx // n.
        pos, gid = divmod(idx, num_shards)
        results = shard_results.get(gid)
        if results is None:
            merged.append(_failed_result(shards[gid][pos], "worker produced no result"))
        else:
            merged.append(results[pos])
    return merged


def _read_shard_results(path, expected_len):
    """Load a worker's result file; return None if it is missing, unreadable, or incomplete."""
    try:
        with open(path, encoding="utf-8") as f:
            results = json.load(f)
    except (OSError, ValueError):  # ValueError also covers json.JSONDecodeError
        return None
    if not isinstance(results, list) or len(results) != expected_len:
        return None
    return results


def _launch_worker(script, gid, shard_file, output_file, model):
    """Start a worker subprocess restricted to physical GPU *gid*."""
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = str(gid)
    env["TOKENIZERS_PARALLELISM"] = "false"
    # stdout/stderr are inherited at the OS level, so worker output goes to the
    # same terminal without requiring sys.stdout to expose a real fileno().
    return subprocess.Popen(
        [sys.executable, script, "--_worker",
         "--gpu_id", str(gid),
         "--shard_file", shard_file,
         "--output_file", output_file,
         "--model", args_model_placeholder] if False else
        [sys.executable, script, "--_worker",
         "--gpu_id", str(gid),
         "--shard_file", shard_file,
         "--output_file", output_file,
         "--model", model],
        env=env,
    )


def main(argv=None):
    """Entry point for both the launcher and (with ``--_worker``) a single worker.

    In launcher mode, the input CSV is split round-robin across ``--num_gpus``
    worker subprocesses. The launcher waits for all of them and writes the
    merged scores to ``--output`` in the same row order as the input. Rows
    whose worker failed get empty scores.

    Args:
        argv: Optional argument list. Defaults to ``sys.argv[1:]``.

    Returns:
        0 on success. 1 if at least one worker produced no usable results.
    """
    parser = argparse.ArgumentParser(description="Multi-GPU video edit scoring")
    parser.add_argument("--csv", required=True, help="Input CSV")
    parser.add_argument("--output", default="results.csv", help="Output CSV")
    parser.add_argument("--model", default="VEFX-Reward/VEFX-Reward-4B")
    parser.add_argument("--num_gpus", type=int, default=4)
    # Internal worker args
    parser.add_argument("--_worker", action="store_true", help=argparse.SUPPRESS)
    parser.add_argument("--gpu_id", type=int, default=0, help=argparse.SUPPRESS)
    parser.add_argument("--shard_file", type=str, default="", help=argparse.SUPPRESS)
    parser.add_argument("--output_file", type=str, default="", help=argparse.SUPPRESS)
    args = parser.parse_args(argv)

    if args._worker:
        worker_main(args)
        return 0

    if args.num_gpus < 1:
        parser.error("--num_gpus must be at least 1")

    # --- Launcher mode ---
    # The csv module requires files to be opened with newline="".
    with open(args.csv, newline="") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
        # Take the header from the reader so an empty CSV does not crash on rows[0].
        input_columns = list(reader.fieldnames or [])
    print(f"Loaded {len(rows)} samples, distributing across {args.num_gpus} GPUs")

    items = [dict(row) for row in rows]
    shards = _split_round_robin(items, args.num_gpus)

    tmpdir = tempfile.mkdtemp(prefix="vefx_multi_")
    script = os.path.abspath(__file__)
    procs = {}  # gid -> (Popen, result file path)
    failed = []
    shard_results = {}
    try:
        # Flush so our buffered output is not interleaved after the workers' output.
        sys.stdout.flush()
        for gid, shard in enumerate(shards):
            if not shard:
                continue  # more GPUs than rows
            sf = os.path.join(tmpdir, f"shard_{gid}.json")
            of = os.path.join(tmpdir, f"result_{gid}.json")
            with open(sf, "w", encoding="utf-8") as f:
                json.dump(shard, f)
            procs[gid] = (_launch_worker(script, gid, sf, of, args.model), of)

        for proc, _ in procs.values():
            proc.wait()

        for gid, (proc, of) in procs.items():
            # Read the file even if the exit code is non-zero: a worker can
            # finish writing its results and still crash during teardown.
            results = _read_shard_results(of, len(shards[gid]))
            if results is None:
                failed.append(gid)
                print(f"[GPU {gid}] worker failed (exit code {proc.returncode}); "
                      f"{len(shards[gid])} rows will have empty scores",
                      file=sys.stderr, flush=True)
            else:
                shard_results[gid] = results
    except BaseException:
        # Don't leave orphaned GPU workers running after an error or Ctrl-C.
        for proc, _ in procs.values():
            if proc.poll() is None:
                proc.terminate()
        raise
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

    all_results = _merge_round_robin(shards, shard_results)

    # Don't duplicate score columns that the input CSV already contains.
    fieldnames = input_columns + [c for c in SCORE_COLUMNS if c not in input_columns]
    with open(args.output, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(all_results)

    print(f"\nAll done — {len(all_results)} results saved to {args.output}")
    return 1 if failed else 0


if __name__ == "__main__":
    sys.exit(main())