""" Variance-based One-Shot Question Selection =========================================== 对 SFT 模型在 1533 道开放题上各跑 10 次推理, 用精确字符串匹配判断对错,选出方差最大的题目。 用法: docker exec rl4phyx_env python3 /workspace/rl4phyx/RL4Phyx/SFT/variance_select.py 输出: - variance_results.json: 每道题的正确率和方差 - best_question_for_rlvr.json: 方差最大的题目信息 - rlvr_train.parquet: 转好的训练数据 (1题 × 128行) """ import json import re import os import torch import numpy as np from pathlib import Path # ============ 配置 ============ MODEL_PATH = "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b/merged" TEST_FILE = "/workspace/rl4phyx/RL4Phyx/SFT/sft_test.jsonl" IMAGE_DIR = "/workspace/rl4phyx/RL4Phyx/SFT/images" OUTPUT_DIR = "/workspace/rl4phyx/RL4Phyx/SFT" NUM_RUNS = 10 # 每题推理次数 MAX_NEW_TOKENS = 1024 # 最大生成长度 (缩短加速,只需提取boxed答案) TEMPERATURE = 0.7 # 采样温度 BATCH_SIZE = 4 # 推理 batch size (根据显存调) RLVR_COPIES = 128 # one-shot 复制次数 def extract_boxed(text): """从模型输出中提取 \\boxed{} 内的答案""" # 找最后一个 \boxed{} matches = re.findall(r'\\boxed\{([^{}]*(?:\{[^{}]*\}[^{}]*)*)\}', text) if matches: return matches[-1].strip() return None def normalize_answer(ans): """轻度归一化:去空格、去末尾句号""" if ans is None: return None ans = ans.strip() ans = ans.rstrip('.') # 去掉 \text{} 包裹 ans = re.sub(r'\\text\{([^}]*)\}', r'\1', ans) # 去多余空格 ans = re.sub(r'\s+', ' ', ans) return ans def exact_match(pred_answer, gt_answer): """精确字符串匹配""" if pred_answer is None: return False pred_norm = normalize_answer(pred_answer) gt_norm = normalize_answer(gt_answer) if pred_norm is None or gt_norm is None: return False return pred_norm == gt_norm def load_test_data(test_file): """加载测试数据""" data = [] with open(test_file, 'r', encoding='utf-8') as f: for line in f: r = json.loads(line.strip()) data.append(r) print(f"Loaded {len(data)} test samples") return data def run_inference(model, processor, test_data, run_id): """对所有题目跑一次推理""" print(f"\n{'='*60}") print(f" Run {run_id + 1}/{NUM_RUNS}") print(f"{'='*60}") results = [] for i, item in enumerate(test_data): if i % 10 == 0: print(f" Processing {i}/{len(test_data)}...", flush=True) prompt_text = item['prompt'] image_path = os.path.join(IMAGE_DIR, f"{item['index']}.png") # 构建消息 messages = [{"role": "user", "content": []}] # 添加图片(如果存在) if os.path.exists(image_path): messages[0]["content"].append({ "type": "image", "image": f"file://{image_path}" }) messages[0]["content"].append({ "type": "text", "text": prompt_text }) # 处理输入 text = processor.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) from qwen_vl_utils import process_vision_info image_inputs, video_inputs = process_vision_info(messages) inputs = processor( text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt" ).to(model.device) # 生成 with torch.no_grad(): output_ids = model.generate( **inputs, max_new_tokens=MAX_NEW_TOKENS, temperature=TEMPERATURE, do_sample=True, top_p=0.9, ) # 解码 (只取生成部分) input_len = inputs['input_ids'].shape[1] generated = output_ids[0][input_len:] prediction = processor.decode(generated, skip_special_tokens=True) # 提取答案 pred_answer = extract_boxed(prediction) gt_answer = item['ground_truth'] is_correct = exact_match(pred_answer, gt_answer) results.append({ 'index': item['index'], 'pred_answer': pred_answer, 'gt_answer': gt_answer, 'correct': is_correct, }) correct_count = sum(1 for r in results if r['correct']) print(f" Run {run_id + 1} accuracy: {correct_count}/{len(results)} " f"({100*correct_count/len(results):.1f}%)") return results def compute_variance(all_runs, test_data): """计算每道题的正确率方差""" n_questions = len(test_data) stats = [] for qi in range(n_questions): # 收集这道题在 10 次 run 中的对错情况 correct_flags = [all_runs[run_id][qi]['correct'] for run_id in range(NUM_RUNS)] n_correct = sum(correct_flags) p = n_correct / NUM_RUNS # 正确率 variance = p * (1 - p) # 伯努利方差 stats.append({ 'index': test_data[qi]['index'], 'category': test_data[qi].get('category', ''), 'ground_truth': test_data[qi]['ground_truth'], 'n_correct': n_correct, 'accuracy': p, 'variance': variance, 'correct_flags': correct_flags, 'pred_answers': [all_runs[run_id][qi]['pred_answer'] for run_id in range(NUM_RUNS)], }) # 按方差降序排列 stats.sort(key=lambda x: x['variance'], reverse=True) return stats def convert_to_training_format(question_item, copies=RLVR_COPIES): """将选中的题目转成 RLVR 训练 parquet 格式""" import pandas as pd prompt_text = question_item['prompt'] image_path = f"{question_item['index']}.png" # 构建 veRL 格式的 prompt prompt_messages = [{"role": "user", "content": prompt_text}] records = [] for _ in range(copies): records.append({ 'prompt': prompt_messages, 'answer': question_item['ground_truth'], 'image_path': image_path, 'data_source': 'deepscaler', 'category': question_item.get('category', 'Physics'), 'index': question_item['index'], }) df = pd.DataFrame(records) return df def main(): print("=" * 60) print(" Variance-based One-Shot Question Selection") print("=" * 60) # 1. 加载模型 print("\n[1/4] Loading SFT model...") from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor import os os.environ["CUDA_VISIBLE_DEVICES"] = "0" model = Qwen2_5_VLForConditionalGeneration.from_pretrained( MODEL_PATH, torch_dtype=torch.bfloat16, ).to("cuda") processor = AutoProcessor.from_pretrained(MODEL_PATH) model.eval() print(f" Model loaded: {sum(p.numel() for p in model.parameters())/1e6:.0f}M params") # 2. 加载测试数据 print("\n[2/4] Loading test data...") test_data = load_test_data(TEST_FILE) # 3. 跑 10 次推理 print(f"\n[3/4] Running {NUM_RUNS} inference passes on {len(test_data)} questions...") all_runs = [] for run_id in range(NUM_RUNS): run_results = run_inference(model, processor, test_data, run_id) all_runs.append(run_results) # 每次 run 后保存中间结果 interim_path = os.path.join(OUTPUT_DIR, f"variance_run_{run_id}.json") with open(interim_path, 'w', encoding='utf-8') as f: json.dump(run_results, f, ensure_ascii=False, indent=2) print(f" Saved interim results to {interim_path}") # 4. 计算方差并选题 print(f"\n[4/4] Computing variance and selecting best question...") stats = compute_variance(all_runs, test_data) # 保存完整统计 stats_path = os.path.join(OUTPUT_DIR, "variance_results.json") with open(stats_path, 'w', encoding='utf-8') as f: json.dump(stats, f, ensure_ascii=False, indent=2) print(f" Saved all variance stats to {stats_path}") # 打印 Top 20 最高方差题目 print(f"\n{'='*60}") print(f" TOP 20 HIGHEST VARIANCE QUESTIONS") print(f"{'='*60}") for i, s in enumerate(stats[:20]): print(f" #{i+1}: idx={s['index']} | gt={s['ground_truth'][:30]:30s} | " f"correct={s['n_correct']}/{NUM_RUNS} | var={s['variance']:.4f} | " f"cat={s['category']}") print(f" preds: {s['pred_answers'][:5]}") # 选方差最大的题 best = stats[0] print(f"\n{'='*60}") print(f" SELECTED QUESTION FOR ONE-SHOT RLVR") print(f"{'='*60}") print(f" Index: {best['index']}") print(f" Category: {best['category']}") print(f" Ground Truth: {best['ground_truth']}") print(f" Accuracy: {best['n_correct']}/{NUM_RUNS} ({best['accuracy']*100:.0f}%)") print(f" Variance: {best['variance']:.4f}") print(f" Pred Answers: {best['pred_answers']}") # 保存选中题目 best_idx = int(best['index']) best_item = None for item in test_data: if int(item['index']) == best_idx: best_item = item break best_path = os.path.join(OUTPUT_DIR, "best_question_for_rlvr.json") with open(best_path, 'w', encoding='utf-8') as f: json.dump({ 'selected_question': best_item, 'stats': best, }, f, ensure_ascii=False, indent=2) print(f" Saved best question to {best_path}") # 转成训练 parquet if best_item: df = convert_to_training_format(best_item) parquet_path = os.path.join(OUTPUT_DIR, "rlvr_train.parquet") df.to_parquet(parquet_path, index=False) print(f" Saved training parquet ({len(df)} rows) to {parquet_path}") print(f"\n{'='*60}") print(f" DONE!") print(f"{'='*60}") if __name__ == "__main__": main()