#!/usr/bin/env python3
"""
Phase 1: Open-ended inference on cluster (multi-GPU, no internet needed).

Runs both Base and SFT models on the 1533 open-ended physics test set.
Saves raw model outputs for later judging.

Usage (inside Docker container):
    cd /tmp && python3 /path/to/eval_openended_inference.py

Output:
    sft_eval_footprint/inference_results_base.jsonl
    sft_eval_footprint/inference_results_phyx.jsonl
"""
import os
import json
import time
import torch
import multiprocessing as mp
from collections import Counter

# ============ CONFIG ============
os.environ["HF_HUB_OFFLINE"] = "1"
os.environ["TRANSFORMERS_OFFLINE"] = "1"

BASE_MODEL = "/workspace/rl4phyx/models/Qwen2.5-VL-3B-Instruct"
SFT_MODEL = "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b_fullft_phyx/final"
TEST_FILE = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint/test_1533_openended.jsonl"
OUTPUT_DIR = "/workspace/rl4phyx/RL4Phyx/SFT/sft_eval_footprint"
IMAGE_DIR = "/workspace/rl4phyx/RL4Phyx/MetaPhyX/test_images"

# Multi-GPU config: each model's samples are sharded across the GPUs listed below (one worker per GPU)
BASE_GPUS = [0, 1, 2, 3]
SFT_GPUS = [0, 1, 2, 3]
MAX_NEW_TOKENS = 2048  # generous cap so step-by-step reasoning fits before the \boxed{} answer
# ================================
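# NOTE: both GPU lists currently point at GPUs 0-3, so the two passes run sequentially
# in main(); use disjoint lists if the base and SFT passes should run concurrently.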


def load_test_data():
    """Load test samples from JSONL."""
    samples = []
    with open(TEST_FILE, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                samples.append(json.loads(line))
    return samples


def build_open_ended_prompt(sample):
    """Build an open-ended prompt (no MCQ options)."""
    desc = sample.get('description', '')
    question = sample.get('question', '')

    prompt = f"""Look at the image and answer the physics question.

{desc}

{question}

Please reason step by step, and put your final answer within \\boxed{{}}.
"""
    return prompt.strip()


def worker_inference(gpu_id, model_path, samples, output_file, model_name):
    """Worker: load model on specific GPU and run inference on assigned samples."""
    import torch
    from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
    from qwen_vl_utils import process_vision_info
    from PIL import Image

    device = f"cuda:{gpu_id}"
    print(f"[{model_name}][GPU {gpu_id}] Loading model...", flush=True)

    # min_pixels / max_pixels bound the image resolution (and hence visual-token count) per sample
    processor = AutoProcessor.from_pretrained(
        model_path,
        min_pixels=3136,
        max_pixels=200704,
        local_files_only=True,
        trust_remote_code=True,
    )
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16,
        device_map=device,
        local_files_only=True,
        trust_remote_code=True,
    )
    model.eval()
    print(f"[{model_name}][GPU {gpu_id}] Model loaded. Processing {len(samples)} samples.", flush=True)

    results = []
    for i, sample in enumerate(samples):
        idx = sample['index']
        prompt_text = build_open_ended_prompt(sample)
        image_path = os.path.join(IMAGE_DIR, sample['image'])

        # Build chat messages; qwen_vl_utils loads the local image from the file:// URI
        messages = [{
            "role": "user",
            "content": [
                {"type": "image", "image": f"file://{image_path}"},
                {"type": "text", "text": prompt_text},
            ],
        }]

        try:
            text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            image_inputs, video_inputs = process_vision_info(messages)
            inputs = processor(
                text=[text],
                images=image_inputs,
                videos=video_inputs,
                padding=True,
                return_tensors="pt",
            ).to(device)

            with torch.no_grad():
                output_ids = model.generate(**inputs, max_new_tokens=MAX_NEW_TOKENS)

            # Drop the prompt tokens so only the newly generated continuation is decoded
            generated = output_ids[0][inputs.input_ids.shape[1]:]
            response = processor.decode(generated, skip_special_tokens=True)
        except Exception as e:
            print(f"[{model_name}][GPU {gpu_id}] Sample {idx} failed: {e}", flush=True)
            response = f"ERROR: {str(e)}"

        result = {
            "index": idx,
            "category": sample['category'],
            "subfield": sample.get('subfield', ''),
            "question": sample['question'],
            "ground_truth_value": sample['ground_truth_value'],
            "ground_truth_letter": sample.get('ground_truth_letter', ''),
            "model_output": response,
            "model_name": model_name,
            "gpu_id": gpu_id,
        }
        results.append(result)

        if (i + 1) % 20 == 0 or (i + 1) == len(samples):
            print(f"[{model_name}][GPU {gpu_id}] {i+1}/{len(samples)} done", flush=True)

    # Write results
    with open(output_file, 'w', encoding='utf-8') as f:
        for r in results:
            f.write(json.dumps(r, ensure_ascii=False) + '\n')

    print(f"[{model_name}][GPU {gpu_id}] Saved {len(results)} results to {output_file}", flush=True)
    return len(results)


def run_model_parallel(model_path, model_name, gpu_ids, samples, output_base):
    """Split samples across GPUs and run in parallel."""
    n = len(samples)
    k = len(gpu_ids)
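    # Ceiling division so the remainder samples land on the last worker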
    chunk_size = (n + k - 1) // k

    processes = []
    output_files = []
    for i, gpu_id in enumerate(gpu_ids):
        chunk = samples[i * chunk_size: (i + 1) * chunk_size]
        if not chunk:
            continue
        out_file = f"{output_base}_gpu{gpu_id}.jsonl"
        output_files.append(out_file)
        p = mp.Process(
            target=worker_inference,
            args=(gpu_id, model_path, chunk, out_file, model_name)
        )
        processes.append(p)

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    return output_files


def merge_results(output_files, final_output):
    """Merge per-GPU result files into one."""
    all_results = []
    for f in output_files:
        if os.path.exists(f):
            with open(f, 'r', encoding='utf-8') as fh:
                for line in fh:
                    if line.strip():
                        all_results.append(json.loads(line))

    # Sort by index for consistency
    all_results.sort(key=lambda x: x['index'])

    with open(final_output, 'w', encoding='utf-8') as f:
        for r in all_results:
            f.write(json.dumps(r, ensure_ascii=False) + '\n')

    # Cleanup per-GPU files
    for f in output_files:
        if os.path.exists(f):
            os.remove(f)

    return all_results


def main():
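    # 'spawn' start method is required: CUDA cannot be initialized in fork-started workers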
    mp.set_start_method('spawn', force=True)
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    print("=" * 60)
    print("  OPEN-ENDED EVAL: Base vs SFT (Multi-GPU)")
    print(f"  Base model: {BASE_MODEL}")
    print(f"  SFT model:  {SFT_MODEL}")
    print(f"  Base GPUs:  {BASE_GPUS}")
    print(f"  SFT GPUs:   {SFT_GPUS}")
    print("=" * 60)

    # Load test data
    samples = load_test_data()
    print(f"\nLoaded {len(samples)} test samples")

    cats = Counter(s['category'] for s in samples)
    for cat, cnt in sorted(cats.items(), key=lambda x: -x[1]):
        print(f"  {cat}: {cnt}")

    # Run inference (each model's samples are sharded across its assigned GPUs; the base pass is skipped below)
    t0 = time.time()

    base_output = os.path.join(OUTPUT_DIR, "inference_results_base")
    sft_output = os.path.join(OUTPUT_DIR, "inference_results_phyx")

    # Base-model inference is intentionally skipped in this run; re-enable it with:
    # run_model_parallel(BASE_MODEL, "base", BASE_GPUS, samples, base_output)

    # Run SFT model across SFT_GPUS (one worker per GPU)
    print("\n>>> Starting SFT model inference...", flush=True)
    run_model_parallel(SFT_MODEL, "sft", SFT_GPUS, samples, sft_output)

    # Merge results
    base_files = [f"{base_output}_gpu{g}.jsonl" for g in BASE_GPUS]
    sft_files = [f"{sft_output}_gpu{g}.jsonl" for g in SFT_GPUS]

    base_final = os.path.join(OUTPUT_DIR, "inference_results_base.jsonl")
    sft_final = os.path.join(OUTPUT_DIR, "inference_results_phyx.jsonl")

    base_results = []  # nothing to merge while the base pass is skipped
    sft_results = merge_results(sft_files, sft_final)

    elapsed = time.time() - t0
    print(f"\n{'=' * 60}")
    print(f"  INFERENCE COMPLETE in {elapsed/60:.1f} min")
    print(f"  Base results: {len(base_results)}{base_final}")
    print(f"  SFT results:  {len(sft_results)}{sft_final}")
    print(f"{'=' * 60}")


if __name__ == '__main__':
    main()