#!/usr/bin/env python3
"""Evaluate 2 SFT models on open-ended 1533 test set, 1 round each."""
import os, json, torch
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
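# process_vision_info converts the image entries in the chat messages into model-ready image/video inputs.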
from qwen_vl_utils import process_vision_info
import multiprocessing as mp
# Config
TEST_FILE = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/test_1533_openended.jsonl"
IMAGE_DIR = "/workspace/rl4phyx/RL4Phyx/MetaPhyX/test_images"
MODELS = {
"phyx": {
"path": "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b_fullft_phyx/final",
"output": "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/inference_results_phyx.jsonl",
"gpus": [0, 1, 2, 3],
},
"phyx_50000": {
"path": "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b_fullft_phyx_50000/final",
"output": "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/inference_results_phyx_50000.jsonl",
"gpus": [4, 5, 6, 7],
},
}
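
# Open-ended prompt: ask for step-by-step reasoning with the final answer wrapped in \boxed{}.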
def build_open_ended_prompt(sample):
q = sample.get("question", "")
return f"Look at this image and answer the physics question. Think step by step and put your final answer in \\boxed{{}}.\n\nQuestion: {q}"
def run_inference_on_gpu(gpu_id, model_path, samples, output_file):
"""Run inference for a subset of samples on a specific GPU."""
device = f"cuda:{gpu_id}"
print(f"[GPU {gpu_id}] Loading model from {model_path}...", flush=True)
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
device_map=device,
trust_remote_code=True,
local_files_only=True,
attn_implementation="sdpa",
)
model.eval()
processor = AutoProcessor.from_pretrained(model_path, trust_remote_code=True, local_files_only=True)
print(f"[GPU {gpu_id}] Processing {len(samples)} samples...", flush=True)
results = []
for idx, sample in enumerate(samples):
try:
image_path = os.path.join(IMAGE_DIR, sample["image"])
if not os.path.exists(image_path):
results.append({**sample, "prediction": "ERROR: image not found"})
continue
prompt_text = build_open_ended_prompt(sample)
messages = [{"role": "user", "content": [
{"type": "image", "image": f"file://{image_path}"},
{"type": "text", "text": prompt_text},
]}]
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(text=[text], images=image_inputs, videos=video_inputs,
padding=True, return_tensors="pt").to(device)
with torch.no_grad():
                output_ids = model.generate(**inputs, max_new_tokens=2048, do_sample=False)
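            # Decode only the newly generated tokens (skip the prompt tokens at the start of output_ids).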
output_text = processor.batch_decode(output_ids[:, inputs.input_ids.shape[1]:],
skip_special_tokens=True)[0]
results.append({**sample, "prediction": output_text})
if (idx + 1) % 50 == 0:
print(f"[GPU {gpu_id}] {idx+1}/{len(samples)} done", flush=True)
except Exception as e:
print(f"[GPU {gpu_id}] Error on sample {idx}: {e}", flush=True)
results.append({**sample, "prediction": f"ERROR: {str(e)}"})
    # Write this GPU's results to a per-GPU temp file; run_model_eval merges them afterwards
tmp_file = f"{output_file}_gpu{gpu_id}.jsonl"
with open(tmp_file, "w") as f:
for r in results:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
print(f"[GPU {gpu_id}] Saved {len(results)} results to {tmp_file}", flush=True)
def run_model_eval(name, config):
"""Run evaluation for a single model across its assigned GPUs."""
print(f"\n{'='*50}")
print(f"Evaluating model: {name}")
print(f"Path: {config['path']}")
print(f"GPUs: {config['gpus']}")
print(f"{'='*50}", flush=True)
# Load test data
with open(TEST_FILE) as f:
samples = [json.loads(line) for line in f]
print(f"Loaded {len(samples)} test samples", flush=True)
    # Split samples round-robin across the model's GPUs so each gets a near-equal share
gpus = config["gpus"]
chunks = [[] for _ in gpus]
for i, s in enumerate(samples):
chunks[i % len(gpus)].append(s)
# Launch parallel processes
processes = []
for gpu_id, chunk in zip(gpus, chunks):
p = mp.Process(target=run_inference_on_gpu,
args=(gpu_id, config["path"], chunk, config["output"]))
p.start()
processes.append(p)
for p in processes:
p.join()
# Merge results
all_results = []
for gpu_id in gpus:
tmp_file = f"{config['output']}_gpu{gpu_id}.jsonl"
if os.path.exists(tmp_file):
with open(tmp_file) as f:
                all_results.extend(json.loads(line) for line in f)
os.remove(tmp_file)
with open(config["output"], "w") as f:
for r in all_results:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
print(f"Merged {len(all_results)} results to {config['output']}", flush=True)
if __name__ == "__main__":
mp.set_start_method("spawn")
# Run first model (phyx) on GPUs 0-3
run_model_eval("phyx", MODELS["phyx"])
# Run second model (phyx_50000) on GPUs 4-7
run_model_eval("phyx_50000", MODELS["phyx_50000"])
print("\nALL_EVAL_DONE")