# Source: rl4phyx-backup / scripts/variance_select.py
# (uploaded via huggingface_hub, commit d491800)
"""
Variance-based One-Shot Question Selection
===========================================
对 SFT 模型在 1533 道开放题上各跑 10 次推理,
用精确字符串匹配判断对错,选出方差最大的题目。
用法:
docker exec rl4phyx_env python3 /workspace/rl4phyx/RL4Phyx/SFT/variance_select.py
输出:
- variance_results.json: 每道题的正确率和方差
- best_question_for_rlvr.json: 方差最大的题目信息
- rlvr_train.parquet: 转好的训练数据 (1题 × 128行)
"""
import json
import re
import os
import torch
import numpy as np
from pathlib import Path
# ============ Configuration ============
MODEL_PATH = "/workspace/rl4phyx/RL4Phyx/SFT/checkpoints/sft_qwen25vl_3b/merged"  # merged SFT checkpoint
TEST_FILE = "/workspace/rl4phyx/RL4Phyx/SFT/sft_test.jsonl"  # JSONL test set
IMAGE_DIR = "/workspace/rl4phyx/RL4Phyx/SFT/images"  # images referenced by question index
OUTPUT_DIR = "/workspace/rl4phyx/RL4Phyx/SFT"  # where all result files are written
NUM_RUNS = 10  # inference passes per question
MAX_NEW_TOKENS = 1024  # max generation length (shortened for speed; only the boxed answer is needed)
TEMPERATURE = 0.7  # sampling temperature
BATCH_SIZE = 4  # inference batch size (tune to GPU memory)
RLVR_COPIES = 128  # one-shot duplication count for the training parquet
def extract_boxed(text):
    r"""Extract the contents of the last ``\boxed{...}`` in *text*.

    Handles one level of nested braces inside the box. Returns the
    stripped contents, or None when no boxed answer is present.
    """
    boxed_pattern = r'\\boxed\{([^{}]*(?:\{[^{}]*\}[^{}]*)*)\}'
    found = re.findall(boxed_pattern, text)
    if not found:
        return None
    # The final box is taken as the model's answer.
    return found[-1].strip()
def normalize_answer(ans):
    r"""Lightly normalize an answer string for comparison.

    Trims surrounding whitespace, drops trailing periods, unwraps
    ``\text{...}`` to its contents, and collapses whitespace runs into
    single spaces. A None input passes through as None.
    """
    if ans is None:
        return None
    cleaned = ans.strip().rstrip('.')
    # Unwrap \text{...} wrappers down to their contents.
    cleaned = re.sub(r'\\text\{([^}]*)\}', r'\1', cleaned)
    # Collapse any whitespace run into a single space.
    return re.sub(r'\s+', ' ', cleaned)
def exact_match(pred_answer, gt_answer):
    """Return True iff prediction and ground truth match after normalization.

    A missing prediction (None) or an answer that normalizes to None
    never matches.
    """
    if pred_answer is None:
        return False
    normalized_pred = normalize_answer(pred_answer)
    normalized_gt = normalize_answer(gt_answer)
    return (
        normalized_pred is not None
        and normalized_gt is not None
        and normalized_pred == normalized_gt
    )
def load_test_data(test_file):
    """Load JSONL test samples from *test_file*.

    Each non-blank line is parsed as one JSON record. Blank lines
    (including a trailing newline at EOF) are skipped instead of
    raising json.JSONDecodeError, which the original code did.

    Returns:
        A list of dicts, one per record, in file order.
    """
    data = []
    with open(test_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:  # tolerate blank / trailing lines
                continue
            data.append(json.loads(line))
    print(f"Loaded {len(data)} test samples")
    return data
def run_inference(model, processor, test_data, run_id):
    """Run one stochastic inference pass over every question.

    Args:
        model: loaded Qwen2.5-VL generation model (already on its device).
        processor: matching AutoProcessor (chat template + tokenization).
        test_data: list of dicts with at least 'prompt', 'index',
            'ground_truth' keys.
        run_id: zero-based pass number, used only for logging.

    Returns:
        A list, one entry per question in input order, of dicts:
        {'index', 'pred_answer', 'gt_answer', 'correct'}.
    """
    # Hoisted out of the per-question loop: the import is loop-invariant,
    # so there is no reason to resolve it on every iteration.
    from qwen_vl_utils import process_vision_info

    print(f"\n{'='*60}")
    print(f" Run {run_id + 1}/{NUM_RUNS}")
    print(f"{'='*60}")
    results = []
    for i, item in enumerate(test_data):
        if i % 10 == 0:
            print(f" Processing {i}/{len(test_data)}...", flush=True)
        prompt_text = item['prompt']
        image_path = os.path.join(IMAGE_DIR, f"{item['index']}.png")
        # Build the chat message: optional image first, then the text prompt.
        messages = [{"role": "user", "content": []}]
        if os.path.exists(image_path):
            messages[0]["content"].append({
                "type": "image",
                "image": f"file://{image_path}"
            })
        messages[0]["content"].append({
            "type": "text",
            "text": prompt_text
        })
        # Render the chat template and pack text + vision inputs.
        text = processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt"
        ).to(model.device)
        # Sampled generation (temperature/top_p) so repeated runs can differ.
        with torch.no_grad():
            output_ids = model.generate(
                **inputs,
                max_new_tokens=MAX_NEW_TOKENS,
                temperature=TEMPERATURE,
                do_sample=True,
                top_p=0.9,
            )
        # Decode only the newly generated tokens (strip the prompt prefix).
        input_len = inputs['input_ids'].shape[1]
        generated = output_ids[0][input_len:]
        prediction = processor.decode(generated, skip_special_tokens=True)
        # Extract the boxed answer and score it by exact match.
        pred_answer = extract_boxed(prediction)
        gt_answer = item['ground_truth']
        is_correct = exact_match(pred_answer, gt_answer)
        results.append({
            'index': item['index'],
            'pred_answer': pred_answer,
            'gt_answer': gt_answer,
            'correct': is_correct,
        })
    correct_count = sum(1 for r in results if r['correct'])
    print(f" Run {run_id + 1} accuracy: {correct_count}/{len(results)} "
          f"({100*correct_count/len(results):.1f}%)")
    return results
def compute_variance(all_runs, test_data):
    """Compute per-question correctness variance across all runs.

    For each question the empirical accuracy p over NUM_RUNS passes is
    turned into a Bernoulli variance p*(1-p), which peaks at p=0.5 —
    exactly the "sometimes right, sometimes wrong" questions we want.

    Returns:
        A list of per-question stat dicts sorted by variance, descending.
    """
    stats = []
    for qi in range(len(test_data)):
        # Gather this question's pass/fail flags across every run.
        flags = [all_runs[r][qi]['correct'] for r in range(NUM_RUNS)]
        hits = sum(flags)
        acc = hits / NUM_RUNS
        stats.append({
            'index': test_data[qi]['index'],
            'category': test_data[qi].get('category', ''),
            'ground_truth': test_data[qi]['ground_truth'],
            'n_correct': hits,
            'accuracy': acc,
            'variance': acc * (1 - acc),  # Bernoulli variance
            'correct_flags': flags,
            'pred_answers': [all_runs[r][qi]['pred_answer'] for r in range(NUM_RUNS)],
        })
    # Highest variance first.
    stats.sort(key=lambda s: s['variance'], reverse=True)
    return stats
def convert_to_training_format(question_item, copies=RLVR_COPIES):
    """Expand one selected question into a veRL training DataFrame.

    The single question is duplicated *copies* times, one row each, so a
    one-shot RLVR run sees it many times per epoch.

    Returns:
        A pandas DataFrame with columns: prompt, answer, image_path,
        data_source, category, index.
    """
    import pandas as pd
    # All rows are identical; build the template record once.
    record = {
        'prompt': [{"role": "user", "content": question_item['prompt']}],
        'answer': question_item['ground_truth'],
        'image_path': f"{question_item['index']}.png",
        'data_source': 'deepscaler',
        'category': question_item.get('category', 'Physics'),
        'index': question_item['index'],
    }
    return pd.DataFrame([dict(record) for _ in range(copies)])
def main():
    """End-to-end selection pipeline.

    1. Load the merged SFT Qwen2.5-VL model and its processor.
    2. Load the JSONL test set.
    3. Run NUM_RUNS stochastic inference passes, checkpointing each run's
       results to JSON.
    4. Rank questions by correctness variance, dump full stats, and export
       the top question both as JSON and as a duplicated training parquet.
    """
    print("=" * 60)
    print(" Variance-based One-Shot Question Selection")
    print("=" * 60)
    # 1. Load the model
    print("\n[1/4] Loading SFT model...")
    from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
    import os
    # Pin inference to the first GPU.
    os.environ["CUDA_VISIBLE_DEVICES"] = "0"
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        MODEL_PATH,
        torch_dtype=torch.bfloat16,
    ).to("cuda")
    processor = AutoProcessor.from_pretrained(MODEL_PATH)
    model.eval()
    print(f" Model loaded: {sum(p.numel() for p in model.parameters())/1e6:.0f}M params")
    # 2. Load the test data
    print("\n[2/4] Loading test data...")
    test_data = load_test_data(TEST_FILE)
    # 3. Run NUM_RUNS inference passes
    print(f"\n[3/4] Running {NUM_RUNS} inference passes on {len(test_data)} questions...")
    all_runs = []
    for run_id in range(NUM_RUNS):
        run_results = run_inference(model, processor, test_data, run_id)
        all_runs.append(run_results)
        # Checkpoint interim results after every run so a crash loses at most one pass.
        interim_path = os.path.join(OUTPUT_DIR, f"variance_run_{run_id}.json")
        with open(interim_path, 'w', encoding='utf-8') as f:
            json.dump(run_results, f, ensure_ascii=False, indent=2)
        print(f" Saved interim results to {interim_path}")
    # 4. Compute variance and pick the question
    print(f"\n[4/4] Computing variance and selecting best question...")
    stats = compute_variance(all_runs, test_data)
    # Persist the full per-question statistics.
    stats_path = os.path.join(OUTPUT_DIR, "variance_results.json")
    with open(stats_path, 'w', encoding='utf-8') as f:
        json.dump(stats, f, ensure_ascii=False, indent=2)
    print(f" Saved all variance stats to {stats_path}")
    # Show the top-20 highest-variance questions for manual inspection.
    print(f"\n{'='*60}")
    print(f" TOP 20 HIGHEST VARIANCE QUESTIONS")
    print(f"{'='*60}")
    for i, s in enumerate(stats[:20]):
        print(f" #{i+1}: idx={s['index']} | gt={s['ground_truth'][:30]:30s} | "
              f"correct={s['n_correct']}/{NUM_RUNS} | var={s['variance']:.4f} | "
              f"cat={s['category']}")
        print(f" preds: {s['pred_answers'][:5]}")
    # stats is sorted descending, so stats[0] is the highest-variance question.
    best = stats[0]
    print(f"\n{'='*60}")
    print(f" SELECTED QUESTION FOR ONE-SHOT RLVR")
    print(f"{'='*60}")
    print(f" Index: {best['index']}")
    print(f" Category: {best['category']}")
    print(f" Ground Truth: {best['ground_truth']}")
    print(f" Accuracy: {best['n_correct']}/{NUM_RUNS} ({best['accuracy']*100:.0f}%)")
    print(f" Variance: {best['variance']:.4f}")
    print(f" Pred Answers: {best['pred_answers']}")
    # Look the selected question back up in the raw test data by index.
    best_idx = int(best['index'])
    best_item = None
    for item in test_data:
        if int(item['index']) == best_idx:
            best_item = item
            break
    best_path = os.path.join(OUTPUT_DIR, "best_question_for_rlvr.json")
    with open(best_path, 'w', encoding='utf-8') as f:
        json.dump({
            'selected_question': best_item,
            'stats': best,
        }, f, ensure_ascii=False, indent=2)
    print(f" Saved best question to {best_path}")
    # Convert to the duplicated training parquet (skipped if lookup failed).
    if best_item:
        df = convert_to_training_format(best_item)
        parquet_path = os.path.join(OUTPUT_DIR, "rlvr_train.parquet")
        df.to_parquet(parquet_path, index=False)
        print(f" Saved training parquet ({len(df)} rows) to {parquet_path}")
    print(f"\n{'='*60}")
    print(f" DONE!")
    print(f"{'='*60}")
if __name__ == "__main__":
main()