#!/usr/bin/env python3
"""
Phase 1: Open-ended inference on the cluster (multi-GPU, no internet needed).
Runs the Base and SFT models on the 1,533-sample open-ended physics test set
(the Base pass is currently skipped in main()); saves raw model outputs for
later judging.

Usage (inside the Docker container):
    cd /tmp && python3 /path/to/eval_phyx_final.py

Output:
    sft_eval_footprint/inference_results_base.jsonl
    sft_eval_footprint/inference_results_phyx.jsonl
"""
import os
import sys
import json
import re
import time
import torch
import multiprocessing as mp
from collections import Counter
# ============ CONFIG ============
os.environ["HF_HUB_OFFLINE"] = "1"
os.environ["TRANSFORMERS_OFFLINE"] = "1"
BASE_MODEL = "/workspace/rl4phyx/models/Qwen2.5-VL-3B-Instruct"
SFT_MODEL = "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b_fullft_phyx/final"
TEST_FILE = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/test_1533_openended.jsonl"
OUTPUT_DIR = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint"
IMAGE_DIR = "/workspace/rl4phyx/RL4Phyx/MetaPhyX/test_images"
# Multi-GPU config: GPU ids used for each model (one worker process per GPU).
# NOTE: both lists currently point at GPUs 0-3; the Base pass is skipped in main().
BASE_GPUS = [0, 1, 2, 3]
SFT_GPUS = [0, 1, 2, 3]
MAX_NEW_TOKENS = 2048
# ================================
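# Each line of TEST_FILE is expected to be a JSON object with (at least) the fields
# accessed below; an illustrative record (values are made up):
#   {"index": 0, "image": "q_0000.png", "description": "...", "question": "...",
#    "category": "Mechanics", "subfield": "Kinematics",
#    "ground_truth_value": "9.8 m/s^2", "ground_truth_letter": "A"}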
def load_test_data():
    """Load test samples from JSONL."""
    samples = []
    with open(TEST_FILE, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                samples.append(json.loads(line))
    return samples
def build_open_ended_prompt(sample):
    """Build an open-ended prompt (no MCQ options)."""
    desc = sample.get('description', '')
    question = sample.get('question', '')
    prompt = f"""Look at the image and answer the physics question.
{desc}
{question}
Please reason step by step, and put your final answer within \\boxed{{}}.
"""
    return prompt.strip()
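# Illustrative rendered prompt for a sample whose description is "A block slides down a
# frictionless incline..." and whose question is "What is its acceleration?":
#   Look at the image and answer the physics question.
#   A block slides down a frictionless incline...
#   What is its acceleration?
#   Please reason step by step, and put your final answer within \boxed{}.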
def worker_inference(gpu_id, model_path, samples, output_file, model_name):
    """Worker: load the model on a specific GPU and run inference on its assigned samples."""
    import torch
    from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
    from qwen_vl_utils import process_vision_info
    from PIL import Image

    device = f"cuda:{gpu_id}"
    print(f"[{model_name}][GPU {gpu_id}] Loading model...", flush=True)
    # min_pixels / max_pixels bound the image resolution (and hence the number of
    # visual tokens) the Qwen2.5-VL processor produces
    processor = AutoProcessor.from_pretrained(
        model_path,
        min_pixels=3136,
        max_pixels=200704,
        local_files_only=True,
        trust_remote_code=True,
    )
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16,
        device_map=device,
        local_files_only=True,
        trust_remote_code=True,
    )
    model.eval()
    print(f"[{model_name}][GPU {gpu_id}] Model loaded. Processing {len(samples)} samples.", flush=True)

    results = []
    for i, sample in enumerate(samples):
        idx = sample['index']
        prompt_text = build_open_ended_prompt(sample)
        image_path = os.path.join(IMAGE_DIR, sample['image'])
        # Build the multimodal chat messages (one image plus the text prompt)
        messages = [{
            "role": "user",
            "content": [
                {"type": "image", "image": f"file://{image_path}"},
                {"type": "text", "text": prompt_text},
            ],
        }]
        try:
            text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            image_inputs, video_inputs = process_vision_info(messages)
            inputs = processor(
                text=[text],
                images=image_inputs,
                videos=video_inputs,
                padding=True,
                return_tensors="pt",
            ).to(device)
            with torch.no_grad():
                output_ids = model.generate(**inputs, max_new_tokens=MAX_NEW_TOKENS)
            # Drop the prompt tokens so only the newly generated answer is decoded
            generated = output_ids[0][inputs.input_ids.shape[1]:]
            response = processor.decode(generated, skip_special_tokens=True)
        except Exception as e:
            response = f"ERROR: {str(e)}"
        result = {
            "index": idx,
            "category": sample['category'],
            "subfield": sample.get('subfield', ''),
            "question": sample['question'],
            "ground_truth_value": sample['ground_truth_value'],
            "ground_truth_letter": sample.get('ground_truth_letter', ''),
            "model_output": response,
            "model_name": model_name,
            "gpu_id": gpu_id,
        }
        results.append(result)
        if (i + 1) % 20 == 0 or (i + 1) == len(samples):
            print(f"[{model_name}][GPU {gpu_id}] {i+1}/{len(samples)} done", flush=True)

    # Write this worker's results to its per-GPU shard file
    with open(output_file, 'w', encoding='utf-8') as f:
        for r in results:
            f.write(json.dumps(r, ensure_ascii=False) + '\n')
    print(f"[{model_name}][GPU {gpu_id}] Saved {len(results)} results to {output_file}", flush=True)
    return len(results)
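# Samples that fail (e.g. a missing image or CUDA OOM) are still written out, with
# model_output starting with "ERROR:", so the judging step can spot or re-run them;
# an illustrative post-hoc check on the merged results:
#   errors = [r for r in merged_results if r["model_output"].startswith("ERROR")]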
def run_model_parallel(model_path, model_name, gpu_ids, samples, output_base):
    """Split samples across GPUs and run one worker process per GPU in parallel."""
    n = len(samples)
    k = len(gpu_ids)
    chunk_size = (n + k - 1) // k  # ceiling division so every sample gets assigned
    processes = []
    output_files = []
    for i, gpu_id in enumerate(gpu_ids):
        chunk = samples[i * chunk_size: (i + 1) * chunk_size]
        if not chunk:
            continue
        out_file = f"{output_base}_gpu{gpu_id}.jsonl"
        output_files.append(out_file)
        p = mp.Process(
            target=worker_inference,
            args=(gpu_id, model_path, chunk, out_file, model_name)
        )
        processes.append(p)
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return output_files
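# Illustrative usage, mirroring how main() drives the SFT pass (the Base pass is
# currently skipped there):
#   files = run_model_parallel(BASE_MODEL, "base", BASE_GPUS, samples,
#                              os.path.join(OUTPUT_DIR, "inference_results_base"))
#   merge_results(files, os.path.join(OUTPUT_DIR, "inference_results_base.jsonl"))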
def merge_results(output_files, final_output):
    """Merge per-GPU result files into one."""
    all_results = []
    for f in output_files:
        if os.path.exists(f):
            with open(f, 'r', encoding='utf-8') as fh:
                for line in fh:
                    if line.strip():
                        all_results.append(json.loads(line))
    # Sort by index for consistency
    all_results.sort(key=lambda x: x['index'])
    with open(final_output, 'w', encoding='utf-8') as f:
        for r in all_results:
            f.write(json.dumps(r, ensure_ascii=False) + '\n')
    # Cleanup per-GPU files
    for f in output_files:
        if os.path.exists(f):
            os.remove(f)
    return all_results
def main():
    mp.set_start_method('spawn', force=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    print("=" * 60)
    print(" OPEN-ENDED EVAL: Base vs SFT (Multi-GPU)")
    print(f" Base model: {BASE_MODEL}")
    print(f" SFT model: {SFT_MODEL}")
    print(f" Base GPUs: {BASE_GPUS}")
    print(f" SFT GPUs: {SFT_GPUS}")
    print("=" * 60)

    # Load test data
    samples = load_test_data()
    print(f"\nLoaded {len(samples)} test samples")
    cats = Counter(s['category'] for s in samples)
    for cat, cnt in sorted(cats.items(), key=lambda x: -x[1]):
        print(f" {cat}: {cnt}")

    # Run the models (each uses its configured GPU list, one worker per GPU)
    t0 = time.time()
    base_output = os.path.join(OUTPUT_DIR, "inference_results_base")
    sft_output = os.path.join(OUTPUT_DIR, "inference_results_phyx")

    # Base model inference is intentionally skipped in this run
    # (see the usage sketch after run_model_parallel to restore it)
    pass  # SKIP BASE

    # Run the SFT model on the GPUs listed in SFT_GPUS
    print("\n>>> Starting SFT model inference...", flush=True)
    run_model_parallel(SFT_MODEL, "sft", SFT_GPUS, samples, sft_output)

    # Merge per-GPU results
    base_files = [f"{base_output}_gpu{g}.jsonl" for g in BASE_GPUS]  # unused while the Base pass is skipped
    sft_files = [f"{sft_output}_gpu{g}.jsonl" for g in SFT_GPUS]
    base_final = os.path.join(OUTPUT_DIR, "inference_results_base.jsonl")
    sft_final = os.path.join(OUTPUT_DIR, "inference_results_phyx.jsonl")
    base_results = []  # empty because the Base pass was skipped
    sft_results = merge_results(sft_files, sft_final)

    elapsed = time.time() - t0
    print(f"\n{'=' * 60}")
    print(f" INFERENCE COMPLETE in {elapsed/60:.1f} min")
    print(f" Base results: {len(base_results)} -> {base_final}")
    print(f" SFT results: {len(sft_results)} -> {sft_final}")
    print(f"{'=' * 60}")
if __name__ == '__main__':
    main()