#!/usr/bin/env python3 """ Phase 1: Open-ended inference on cluster (multi-GPU, no internet needed). Runs both Base and SFT models on the 1533 open-ended physics test set. Saves raw model outputs for later judging. Usage (inside Docker container): cd /tmp && python3 /path/to/eval_openended_inference.py Output: sft_eval_footprint/inference_results_base.jsonl sft_eval_footprint/inference_results_sft.jsonl """ import os import sys import json import re import time import torch import multiprocessing as mp from collections import Counter # ============ CONFIG ============ os.environ["HF_HUB_OFFLINE"] = "1" os.environ["TRANSFORMERS_OFFLINE"] = "1" BASE_MODEL = "/workspace/rl4phyx/models/Qwen2.5-VL-3B-Instruct" SFT_MODEL = "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b_fullft/final" TEST_FILE = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/test_1533_openended.jsonl" OUTPUT_DIR = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint" IMAGE_DIR = "/workspace/rl4phyx/RL4Phyx/MetaPhyX/test_images" # Multi-GPU config: base on GPUs 0-3, SFT on GPUs 4-7 BASE_GPUS = [0, 1, 2, 3] SFT_GPUS = [4, 5, 6, 7] MAX_NEW_TOKENS = 2048 # ================================ def load_test_data(): """Load test samples from JSONL.""" samples = [] with open(TEST_FILE, 'r', encoding='utf-8') as f: for line in f: if line.strip(): samples.append(json.loads(line)) return samples def build_open_ended_prompt(sample): """Build an open-ended prompt (no MCQ options).""" desc = sample.get('description', '') question = sample.get('question', '') prompt = f"""Look at the image and answer the physics question. {desc} {question} Please reason step by step, and put your final answer within \\boxed{{}}. """ return prompt.strip() def worker_inference(gpu_id, model_path, samples, output_file, model_name): """Worker: load model on specific GPU and run inference on assigned samples.""" import torch from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor from qwen_vl_utils import process_vision_info from PIL import Image device = f"cuda:{gpu_id}" print(f"[{model_name}][GPU {gpu_id}] Loading model...", flush=True) processor = AutoProcessor.from_pretrained( model_path, min_pixels=3136, max_pixels=200704, local_files_only=True, trust_remote_code=True, ) model = Qwen2_5_VLForConditionalGeneration.from_pretrained( model_path, torch_dtype=torch.bfloat16, device_map=device, local_files_only=True, trust_remote_code=True, ) model.eval() print(f"[{model_name}][GPU {gpu_id}] Model loaded. Processing {len(samples)} samples.", flush=True) results = [] for i, sample in enumerate(samples): idx = sample['index'] prompt_text = build_open_ended_prompt(sample) image_path = os.path.join(IMAGE_DIR, sample['image']) # Build messages messages = [{ "role": "user", "content": [ {"type": "image", "image": f"file://{image_path}"}, {"type": "text", "text": prompt_text}, ], }] try: text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) image_inputs, video_inputs = process_vision_info(messages) inputs = processor( text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt", ).to(device) with torch.no_grad(): output_ids = model.generate(**inputs, max_new_tokens=MAX_NEW_TOKENS) generated = output_ids[0][inputs.input_ids.shape[1]:] response = processor.decode(generated, skip_special_tokens=True) except Exception as e: response = f"ERROR: {str(e)}" result = { "index": idx, "category": sample['category'], "subfield": sample.get('subfield', ''), "question": sample['question'], "ground_truth_value": sample['ground_truth_value'], "ground_truth_letter": sample.get('ground_truth_letter', ''), "model_output": response, "model_name": model_name, "gpu_id": gpu_id, } results.append(result) if (i + 1) % 20 == 0 or (i + 1) == len(samples): print(f"[{model_name}][GPU {gpu_id}] {i+1}/{len(samples)} done", flush=True) # Write results with open(output_file, 'w', encoding='utf-8') as f: for r in results: f.write(json.dumps(r, ensure_ascii=False) + '\n') print(f"[{model_name}][GPU {gpu_id}] Saved {len(results)} results to {output_file}", flush=True) return len(results) def run_model_parallel(model_path, model_name, gpu_ids, samples, output_base): """Split samples across GPUs and run in parallel.""" n = len(samples) k = len(gpu_ids) chunk_size = (n + k - 1) // k processes = [] output_files = [] for i, gpu_id in enumerate(gpu_ids): chunk = samples[i * chunk_size: (i + 1) * chunk_size] if not chunk: continue out_file = f"{output_base}_gpu{gpu_id}.jsonl" output_files.append(out_file) p = mp.Process( target=worker_inference, args=(gpu_id, model_path, chunk, out_file, model_name) ) processes.append(p) for p in processes: p.start() for p in processes: p.join() return output_files def merge_results(output_files, final_output): """Merge per-GPU result files into one.""" all_results = [] for f in output_files: if os.path.exists(f): with open(f, 'r', encoding='utf-8') as fh: for line in fh: if line.strip(): all_results.append(json.loads(line)) # Sort by index for consistency all_results.sort(key=lambda x: x['index']) with open(final_output, 'w', encoding='utf-8') as f: for r in all_results: f.write(json.dumps(r, ensure_ascii=False) + '\n') # Cleanup per-GPU files for f in output_files: if os.path.exists(f): os.remove(f) return all_results def main(): mp.set_start_method('spawn', force=True) os.makedirs(OUTPUT_DIR, exist_ok=True) print("=" * 60) print(" OPEN-ENDED EVAL: Base vs SFT (Multi-GPU)") print(f" Base model: {BASE_MODEL}") print(f" SFT model: {SFT_MODEL}") print(f" Base GPUs: {BASE_GPUS}") print(f" SFT GPUs: {SFT_GPUS}") print("=" * 60) # Load test data samples = load_test_data() print(f"\nLoaded {len(samples)} test samples") cats = Counter(s['category'] for s in samples) for cat, cnt in sorted(cats.items(), key=lambda x: -x[1]): print(f" {cat}: {cnt}") # Run both models (each uses 4 GPUs internally for parallel inference) t0 = time.time() base_output = os.path.join(OUTPUT_DIR, "inference_results_base") sft_output = os.path.join(OUTPUT_DIR, "inference_results_sft") # Run base model on GPUs 0-3 (4 workers in parallel) print("\n>>> Starting BASE model inference...", flush=True) run_model_parallel(BASE_MODEL, "base", BASE_GPUS, samples, base_output) # Run SFT model on GPUs 4-7 (4 workers in parallel) print("\n>>> Starting SFT model inference...", flush=True) run_model_parallel(SFT_MODEL, "sft", SFT_GPUS, samples, sft_output) # Merge results base_files = [f"{base_output}_gpu{g}.jsonl" for g in BASE_GPUS] sft_files = [f"{sft_output}_gpu{g}.jsonl" for g in SFT_GPUS] base_final = os.path.join(OUTPUT_DIR, "inference_results_base.jsonl") sft_final = os.path.join(OUTPUT_DIR, "inference_results_sft.jsonl") base_results = merge_results(base_files, base_final) sft_results = merge_results(sft_files, sft_final) elapsed = time.time() - t0 print(f"\n{'=' * 60}") print(f" INFERENCE COMPLETE in {elapsed/60:.1f} min") print(f" Base results: {len(base_results)} → {base_final}") print(f" SFT results: {len(sft_results)} → {sft_final}") print(f"{'=' * 60}") if __name__ == '__main__': main()