#!/usr/bin/env python3
"""Evaluate two SFT checkpoints on the 1533-sample open-ended test set, one pass each."""
import os
import json
import multiprocessing as mp

import torch
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info

# Config
TEST_FILE = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/test_1533_openended.jsonl"
IMAGE_DIR = "/workspace/rl4phyx/RL4Phyx/MetaPhyX/test_images"

MODELS = {
    "phyx": {
        "path": "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b_fullft_phyx/final",
        "output": "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/inference_results_phyx.jsonl",
        "gpus": [0, 1, 2, 3],
    },
    "phyx_50000": {
        "path": "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b_fullft_phyx_50000/final",
        "output": "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/inference_results_phyx_50000.jsonl",
        "gpus": [4, 5, 6, 7],
    },
}


def build_open_ended_prompt(sample):
    q = sample.get("question", "")
    return (
        "Look at this image and answer the physics question. "
        "Think step by step and put your final answer in \\boxed{}."
        f"\n\nQuestion: {q}"
    )


def run_inference_on_gpu(gpu_id, model_path, samples, output_file):
    """Run inference for a subset of samples on a specific GPU."""
    device = f"cuda:{gpu_id}"
    print(f"[GPU {gpu_id}] Loading model from {model_path}...", flush=True)
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16,
        device_map=device,
        trust_remote_code=True,
        local_files_only=True,
        attn_implementation="sdpa",
    )
    model.eval()
    processor = AutoProcessor.from_pretrained(
        model_path, trust_remote_code=True, local_files_only=True
    )

    print(f"[GPU {gpu_id}] Processing {len(samples)} samples...", flush=True)
    results = []
    for idx, sample in enumerate(samples):
        try:
            image_path = os.path.join(IMAGE_DIR, sample["image"])
            if not os.path.exists(image_path):
                results.append({**sample, "prediction": "ERROR: image not found"})
                continue

            prompt_text = build_open_ended_prompt(sample)
            messages = [{
                "role": "user",
                "content": [
                    {"type": "image", "image": f"file://{image_path}"},
                    {"type": "text", "text": prompt_text},
                ],
            }]
            text = processor.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            image_inputs, video_inputs = process_vision_info(messages)
            inputs = processor(
                text=[text],
                images=image_inputs,
                videos=video_inputs,
                padding=True,
                return_tensors="pt",
            ).to(device)

            # Greedy decoding; the temperature argument is omitted because it
            # is ignored (and triggers a warning) when do_sample=False.
            with torch.no_grad():
                output_ids = model.generate(**inputs, max_new_tokens=2048, do_sample=False)
            # Decode only the newly generated tokens, not the echoed prompt.
            output_text = processor.batch_decode(
                output_ids[:, inputs.input_ids.shape[1]:], skip_special_tokens=True
            )[0]
            results.append({**sample, "prediction": output_text})

            if (idx + 1) % 50 == 0:
                print(f"[GPU {gpu_id}] {idx + 1}/{len(samples)} done", flush=True)
        except Exception as e:
            print(f"[GPU {gpu_id}] Error on sample {idx}: {e}", flush=True)
            results.append({**sample, "prediction": f"ERROR: {e}"})

    # Each worker writes its own temp file; the parent merges them afterwards.
    tmp_file = f"{output_file}_gpu{gpu_id}.jsonl"
    with open(tmp_file, "w") as f:
        for r in results:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    print(f"[GPU {gpu_id}] Saved {len(results)} results to {tmp_file}", flush=True)
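
# The worker above instructs the model to wrap its final answer in \boxed{}.
# A minimal sketch of a brace-aware extractor for that answer; the helper name
# and its use are illustrative and not part of the eval pipeline itself.
# Brace counting handles nested braces such as \boxed{\frac{1}{2}}.
def extract_boxed(text):
    """Return the contents of the last \\boxed{...} in text, or None."""
    start = text.rfind("\\boxed{")
    if start == -1:
        return None
    depth = 0
    open_brace = start + len("\\boxed{") - 1  # index of the opening "{"
    for j in range(open_brace, len(text)):
        if text[j] == "{":
            depth += 1
        elif text[j] == "}":
            depth -= 1
            if depth == 0:
                return text[open_brace + 1:j]
    return None  # unbalanced braces, e.g. the generation was truncated
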
def run_model_eval(name, config):
    """Run evaluation for a single model across its assigned GPUs."""
    print(f"\n{'=' * 50}")
    print(f"Evaluating model: {name}")
    print(f"Path: {config['path']}")
    print(f"GPUs: {config['gpus']}")
    print("=" * 50, flush=True)

    # Load test data
    with open(TEST_FILE) as f:
        samples = [json.loads(line) for line in f]
    print(f"Loaded {len(samples)} test samples", flush=True)

    # Shard samples round-robin across the assigned GPUs (equivalent to
    # chunks = [samples[i::len(gpus)] for i in range(len(gpus))])
    gpus = config["gpus"]
    chunks = [[] for _ in gpus]
    for i, s in enumerate(samples):
        chunks[i % len(gpus)].append(s)

    # Launch one worker process per GPU and wait for all of them
    processes = []
    for gpu_id, chunk in zip(gpus, chunks):
        p = mp.Process(
            target=run_inference_on_gpu,
            args=(gpu_id, config["path"], chunk, config["output"]),
        )
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

    # Merge the per-GPU temp files into the final output, then clean up
    all_results = []
    for gpu_id in gpus:
        tmp_file = f"{config['output']}_gpu{gpu_id}.jsonl"
        if os.path.exists(tmp_file):
            with open(tmp_file) as f:
                all_results.extend(json.loads(line) for line in f)
            os.remove(tmp_file)
    with open(config["output"], "w") as f:
        for r in all_results:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    print(f"Merged {len(all_results)} results to {config['output']}", flush=True)


if __name__ == "__main__":
    # "spawn" is required so each child process can initialize CUDA cleanly
    mp.set_start_method("spawn")
    # Run the two models sequentially; each fans out over its own four GPUs
    run_model_eval("phyx", MODELS["phyx"])
    run_model_eval("phyx_50000", MODELS["phyx_50000"])
    print("\nALL_EVAL_DONE")
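

# Optional post-hoc scoring sketch, kept below the __main__ guard so the eval
# flow above is untouched. Assumptions: each test sample carries a ground-truth
# "answer" field (adjust answer_key to your schema), and plain string equality
# is an acceptable stand-in for the project's actual grader.
def score_file(results_path, answer_key="answer"):
    """Naive exact-match accuracy over one merged results JSONL file."""
    correct = total = 0
    with open(results_path) as f:
        for line in f:
            r = json.loads(line)
            pred = extract_boxed(r.get("prediction", ""))
            gold = str(r.get(answer_key, "")).strip()
            total += 1
            correct += int(pred is not None and pred.strip() == gold)
    return correct / max(total, 1)

# Example usage from a separate session after the run (module name illustrative):
#   >>> from eval_sft_openended import score_file, MODELS
#   >>> score_file(MODELS["phyx"]["output"])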