| """
|
| Phase 1: Open-ended inference on cluster (multi-GPU, no internet needed).
|
|
|
| Runs both Base and SFT models on the 1533 open-ended physics test set.
|
| Saves raw model outputs for later judging.
|
|
|
| Usage (inside Docker container):
|
| cd /tmp && python3 /path/to/eval_openended_inference.py
|
|
|
| Output:
|
| sft_eval_footprint/inference_results_base.jsonl
|
| sft_eval_footprint/inference_results_sft.jsonl
|
| """
|
import os
import json
import time
import multiprocessing as mp
from collections import Counter
|
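# Run fully offline: every model and processor load below must resolve from
# the local cache, since the cluster nodes have no internet access.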
| os.environ["HF_HUB_OFFLINE"] = "1"
|
| os.environ["TRANSFORMERS_OFFLINE"] = "1"
|
|
|
| BASE_MODEL = "/workspace/rl4phyx/models/Qwen2.5-VL-3B-Instruct"
|
| SFT_MODEL = "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b_fullft/final"
|
| TEST_FILE = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/test_1533_openended.jsonl"
|
| OUTPUT_DIR = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint"
|
| IMAGE_DIR = "/workspace/rl4phyx/RL4Phyx/MetaPhyX/test_images"
|
|
|
|
|
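# Disjoint GPU sets for the two models (assumes an 8-GPU node). Each model's
# samples are sharded across its four GPUs, one worker process per GPU.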
BASE_GPUS = [0, 1, 2, 3]
SFT_GPUS = [4, 5, 6, 7]
MAX_NEW_TOKENS = 2048
|
|
def load_test_data():
    """Load test samples from JSONL."""
    samples = []
    with open(TEST_FILE, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                samples.append(json.loads(line))
    return samples
|
|
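# Illustrative shape of one test record, inferred from the field accesses in
# this script (the authoritative schema is the JSONL file itself):
# {"index": 0, "category": "...", "subfield": "...", "image": "0001.png",
#  "description": "...", "question": "...",
#  "ground_truth_value": "...", "ground_truth_letter": "A"}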
def build_open_ended_prompt(sample):
    """Build an open-ended prompt (no MCQ options)."""
    desc = sample.get('description', '')
    question = sample.get('question', '')

    prompt = f"""Look at the image and answer the physics question.

{desc}

{question}

Please reason step by step, and put your final answer within \\boxed{{}}.
"""
    return prompt.strip()
|
|
def worker_inference(gpu_id, model_path, samples, output_file, model_name):
    """Worker: load a model on a specific GPU and run inference on the assigned samples."""
|
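    # Heavy imports happen inside the worker: each process is spawned fresh
    # (see mp.set_start_method in main) and initializes CUDA independently.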
    import torch
    from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
    from qwen_vl_utils import process_vision_info
|
| device = f"cuda:{gpu_id}"
|
| print(f"[{model_name}][GPU {gpu_id}] Loading model...", flush=True)
|
|
|
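    # min_pixels/max_pixels bound the per-image visual token budget for the
    # Qwen2.5-VL processor: 3136 = 4 * 28 * 28 and 200704 = 256 * 28 * 28,
    # i.e. roughly 4 to 256 image patches after resizing.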
    processor = AutoProcessor.from_pretrained(
        model_path,
        min_pixels=3136,
        max_pixels=200704,
        local_files_only=True,
        trust_remote_code=True,
    )
|
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16,
        device_map=device,
        local_files_only=True,
        trust_remote_code=True,
    )
    model.eval()
    print(f"[{model_name}][GPU {gpu_id}] Model loaded. Processing {len(samples)} samples.", flush=True)
|
    results = []
    for i, sample in enumerate(samples):
        idx = sample['index']
        prompt_text = build_open_ended_prompt(sample)
        image_path = os.path.join(IMAGE_DIR, sample['image'])

        messages = [{
            "role": "user",
            "content": [
                {"type": "image", "image": f"file://{image_path}"},
                {"type": "text", "text": prompt_text},
            ],
        }]
|
        try:
            text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            image_inputs, video_inputs = process_vision_info(messages)
            inputs = processor(
                text=[text],
                images=image_inputs,
                videos=video_inputs,
                padding=True,
                return_tensors="pt",
            ).to(device)
|
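            # No sampling arguments are passed, so decoding follows the
            # model's saved generation_config, capped at MAX_NEW_TOKENS.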
            with torch.no_grad():
                output_ids = model.generate(**inputs, max_new_tokens=MAX_NEW_TOKENS)

            # Keep only the newly generated tokens (strip the echoed prompt).
            generated = output_ids[0][inputs.input_ids.shape[1]:]
            response = processor.decode(generated, skip_special_tokens=True)
        except Exception as e:
            # Record the failure inline so one bad sample doesn't kill the shard.
            response = f"ERROR: {str(e)}"
|
        result = {
            "index": idx,
            "category": sample['category'],
            "subfield": sample.get('subfield', ''),
            "question": sample['question'],
            "ground_truth_value": sample['ground_truth_value'],
            "ground_truth_letter": sample.get('ground_truth_letter', ''),
            "model_output": response,
            "model_name": model_name,
            "gpu_id": gpu_id,
        }
        results.append(result)

        if (i + 1) % 20 == 0 or (i + 1) == len(samples):
            print(f"[{model_name}][GPU {gpu_id}] {i+1}/{len(samples)} done", flush=True)
|
    with open(output_file, 'w', encoding='utf-8') as f:
        for r in results:
            f.write(json.dumps(r, ensure_ascii=False) + '\n')

    print(f"[{model_name}][GPU {gpu_id}] Saved {len(results)} results to {output_file}", flush=True)
|
|
def run_model_parallel(model_path, model_name, gpu_ids, samples, output_base):
    """Split samples across GPUs and run one worker process per GPU."""
    n = len(samples)
    k = len(gpu_ids)
    chunk_size = (n + k - 1) // k  # ceiling division so no sample is dropped

    processes = []
    output_files = []
    for i, gpu_id in enumerate(gpu_ids):
        chunk = samples[i * chunk_size: (i + 1) * chunk_size]
        if not chunk:
            continue
        out_file = f"{output_base}_gpu{gpu_id}.jsonl"
        output_files.append(out_file)
        p = mp.Process(
            target=worker_inference,
            args=(gpu_id, model_path, chunk, out_file, model_name),
        )
        processes.append(p)

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    return output_files
|
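# With the 1533-sample test set and 4 GPUs per model, chunk_size is
# ceil(1533 / 4) = 384, giving shards of 384/384/384/381 samples.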
|
def merge_results(output_files, final_output):
    """Merge per-GPU result files into one, sorted by sample index."""
    all_results = []
    for f in output_files:
        if os.path.exists(f):
            with open(f, 'r', encoding='utf-8') as fh:
                for line in fh:
                    if line.strip():
                        all_results.append(json.loads(line))

    all_results.sort(key=lambda x: x['index'])

    with open(final_output, 'w', encoding='utf-8') as f:
        for r in all_results:
            f.write(json.dumps(r, ensure_ascii=False) + '\n')

    # The per-GPU shard files are intermediate; clean them up after merging.
    for f in output_files:
        if os.path.exists(f):
            os.remove(f)

    return all_results
|
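# Both merged files are sorted by "index", so base and SFT outputs line up
# row-for-row for the downstream judging phase.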
|
def main():
    # CUDA state does not survive fork; 'spawn' gives each worker a clean process.
    mp.set_start_method('spawn', force=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    print("=" * 60)
    print(" OPEN-ENDED EVAL: Base vs SFT (Multi-GPU)")
    print(f" Base model: {BASE_MODEL}")
    print(f" SFT model:  {SFT_MODEL}")
    print(f" Base GPUs:  {BASE_GPUS}")
    print(f" SFT GPUs:   {SFT_GPUS}")
    print("=" * 60)
|
    samples = load_test_data()
    print(f"\nLoaded {len(samples)} test samples")

    # Per-category sample counts, largest category first.
    cats = Counter(s['category'] for s in samples)
    for cat, cnt in sorted(cats.items(), key=lambda x: -x[1]):
        print(f"  {cat}: {cnt}")
|
    t0 = time.time()

    base_output = os.path.join(OUTPUT_DIR, "inference_results_base")
    sft_output = os.path.join(OUTPUT_DIR, "inference_results_sft")

    # The two models run back to back, each fanning out over its own GPU set.
    print("\n>>> Starting BASE model inference...", flush=True)
    base_files = run_model_parallel(BASE_MODEL, "base", BASE_GPUS, samples, base_output)

    print("\n>>> Starting SFT model inference...", flush=True)
    sft_files = run_model_parallel(SFT_MODEL, "sft", SFT_GPUS, samples, sft_output)
|
| base_files = [f"{base_output}_gpu{g}.jsonl" for g in BASE_GPUS]
|
| sft_files = [f"{sft_output}_gpu{g}.jsonl" for g in SFT_GPUS]
|
|
|
| base_final = os.path.join(OUTPUT_DIR, "inference_results_base.jsonl")
|
| sft_final = os.path.join(OUTPUT_DIR, "inference_results_sft.jsonl")
|
|
|
| base_results = merge_results(base_files, base_final)
|
| sft_results = merge_results(sft_files, sft_final)
|
|
|
    elapsed = time.time() - t0
    print(f"\n{'=' * 60}")
    print(f" INFERENCE COMPLETE in {elapsed/60:.1f} min")
    print(f" Base results: {len(base_results)} → {base_final}")
    print(f" SFT results:  {len(sft_results)} → {sft_final}")
    print(f"{'=' * 60}")
|
|
if __name__ == '__main__':
    main()