# rm_code / new.py
# hahayang012's picture
# Upload folder using huggingface_hub
# d8a76be verified
# -*- coding: utf-8 -*-
import os, math, argparse, warnings
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoConfig, AutoModelForSequenceClassification
# ========== Evaluation core: left-padded encoding + reward-model inference ==========
def build_left_padded_inputs(tokenizer, texts, max_length, device):
    """Tokenize ``texts`` into a left-padded batch placed on ``device``.

    The tokenizer passed in is mutated: its ``padding_side`` is set to
    ``"left"`` and, when it has no pad token, one is assigned (the EOS token
    if available, otherwise a newly registered ``<|pad|>`` token).
    Truncation is enabled with ``max_length``; the truncation side is
    whatever the tokenizer is already configured with, this function does
    not change it.

    Args:
        tokenizer: Callable tokenizer exposing ``padding_side``,
            ``pad_token_id``, ``eos_token_id``, ``eos_token``, ``pad_token``
            and ``add_special_tokens``.
        texts: Batch of strings to encode.
        max_length: Maximum number of tokens kept per sequence.
        device: Device the returned tensors are moved to.

    Returns:
        A ``(input_ids, attention_mask)`` tuple of tensors on ``device``.
    """
    # Pad on the left so the last column holds the final token of every row.
    tokenizer.padding_side = "left"

    # Fall back to a usable pad token when the tokenizer defines none.
    if tokenizer.pad_token_id is None:
        if tokenizer.eos_token_id is not None:
            tokenizer.pad_token = tokenizer.eos_token
        else:
            added = tokenizer.add_special_tokens({"pad_token": "<|pad|>"})
            if added:
                # A brand-new token id has no row in the model's embedding
                # table; make that visible instead of failing later inside
                # the forward pass.
                warnings.warn(
                    "Added a new pad token '<|pad|>' to the tokenizer; the "
                    "model's token embeddings may need to be resized "
                    "(e.g. resize_token_embeddings) before inference.",
                    stacklevel=2,
                )

    encoded = tokenizer(
        texts,
        padding=True,
        truncation=True,
        max_length=max_length,
        return_tensors="pt",
    )
    return (
        encoded["input_ids"].to(device),
        encoded["attention_mask"].to(device),
    )
@torch.inference_mode()
def score_texts_last_token(reward_model, tokenizer, texts, max_length, device):
    """Score each text with the reward model's output at the last position.

    The texts are encoded with ``build_left_padded_inputs`` (left padding, so
    index ``-1`` along the sequence axis is the last token of every row),
    passed to ``reward_model.model`` and the resulting ``last_hidden_state``
    is projected with ``reward_model.score``.  Runs under
    ``torch.inference_mode``.

    Args:
        reward_model: Object exposing ``model(input_ids, attention_mask)``
            (returning something with ``last_hidden_state``) and
            ``score(hidden)``.
        tokenizer: Tokenizer forwarded to ``build_left_padded_inputs``.
        texts: Batch of strings to score.
        max_length: Maximum number of tokens kept per sequence.
        device: Device the encoded batch is moved to.

    Returns:
        A list with one Python float per input text; an empty list when
        ``texts`` is empty.  NaN scores are replaced by ``-1e30``.
    """
    # Nothing to score: skip the tokenizer/model round trip entirely.
    if not isinstance(texts, str) and len(texts) == 0:
        return []

    inputs = build_left_padded_inputs(tokenizer, texts, max_length, device)

    # Expected shapes: hidden [B, T, H]; score_seq [B, T] or [B, T, 1].
    hidden = reward_model.model(*inputs).last_hidden_state
    score_seq = reward_model.score(hidden)
    if score_seq.dim() == 3 and score_seq.size(-1) == 1:
        score_seq = score_seq.squeeze(-1)

    # Left padding makes the last position the final real token of each row.
    # Cast to float32 before replacing NaNs so the -1e30 sentinel is
    # representable whatever dtype the model runs in (it overflows in fp16).
    scores = score_seq[:, -1].float()
    # torch.nan_to_num also maps +/-inf to the largest finite values.
    scores = torch.nan_to_num(scores, nan=-1e30)
    return scores.cpu().tolist()
# ========== Prompt/answer concatenation ==========
def join_prompt_answer(prompt, answer, joiner="\n"):
    """Concatenate a prompt and an answer with ``joiner`` in between.

    Trailing whitespace is stripped from both parts before joining.  Missing
    values (``None``, an empty value, or a float NaN such as pandas uses for
    missing cells) are treated as empty strings.

    Args:
        prompt: Prompt text, or a missing value.
        answer: Answer text, or a missing value.
        joiner: String placed between the two parts.

    Returns:
        The joined string ``"<prompt><joiner><answer>"``.
    """
    cleaned = []
    for value in (prompt, answer):
        # NaN is truthy, so ``value or ""`` alone would keep it and the
        # subsequent ``.rstrip()`` would raise AttributeError.
        if isinstance(value, float) and math.isnan(value):
            value = None
        cleaned.append((value or "").rstrip())
    p, a = cleaned
    return f"{p}{joiner}{a}"
# ========== Main flow ==========
def main():
    """Evaluate a reward model's pairwise accuracy on a parquet preference set.

    Command-line options:
        --data_path   Parquet file with columns ``chosen_prompt``, ``chosen``
                      and ``reject`` (required).
        --batch_size  Number of rows (pairs) scored per forward pass.
        --max_length  Maximum number of tokens kept per text.
        --joiner      String inserted between prompt and answer.
        --model_path  Directory of the reward model and tokenizer.
        --device      Optional device to move the model to; when omitted the
                      model stays on the device it was loaded on.

    For every row, ``prompt + chosen`` and ``prompt + reject`` are scored
    with ``score_texts_last_token``; the row counts as correct when the
    chosen score is strictly greater than the rejected one (a tie counts as
    incorrect).  Per-row results and the final accuracy are printed.

    Raises:
        FileNotFoundError: If ``--data_path`` does not exist.
        ValueError: If a required column is missing from the parquet file.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--data_path", type=str, required=True,
                    help="包含列 chosen_prompt/chosen/reject 的 parquet 路径")
    ap.add_argument("--batch_size", type=int, default=16)
    ap.add_argument("--max_length", type=int, default=1024)
    ap.add_argument("--joiner", type=str, default="")
    ap.add_argument("--model_path", type=str, default="/home/rm5.0_9e-6",
                    help="Directory holding the reward model and its tokenizer")
    ap.add_argument("--device", type=str, default=None,
                    help="Device to move the model to (e.g. cuda:0); "
                         "default keeps the device the model was loaded on")
    args = ap.parse_args()

    # A zero step makes range() raise and a negative one yields no batches.
    if args.batch_size <= 0:
        ap.error("--batch_size must be a positive integer")

    if not os.path.exists(args.data_path):
        raise FileNotFoundError(args.data_path)
    df = pd.read_parquet(args.data_path)
    for col in ["chosen_prompt", "chosen", "reject"]:
        if col not in df.columns:
            raise ValueError(f"缺列 `{col}`,实际列:{list(df.columns)}")

    # Placeholder loader: swap in your project's own reward_model/tokenizer
    # loading here if it differs from the Hugging Face API.
    reward_model = AutoModelForSequenceClassification.from_pretrained(
        args.model_path,
        num_labels=1,
        torch_dtype=torch.bfloat16,
        use_cache=False,
    )
    tokenizer = AutoTokenizer.from_pretrained(args.model_path)
    if args.device:
        reward_model.to(args.device)
    # Inputs are sent to wherever the model's parameters live.
    device = next(reward_model.parameters()).device

    total = len(df)
    correct = 0
    seen = 0
    print(f"Loaded {total} samples from {args.data_path}")
    print("Start evaluating (pairwise chosen vs reject)...\n" + "-" * 70)

    # Each row contributes two texts, interleaved as chosen, reject, ...
    for start in range(0, total, args.batch_size):
        batch = df.iloc[start:start + args.batch_size]
        pair_texts = []
        for prompt, chosen, reject in zip(
            batch["chosen_prompt"], batch["chosen"], batch["reject"]
        ):
            pair_texts.append(join_prompt_answer(prompt, chosen, args.joiner))
            pair_texts.append(join_prompt_answer(prompt, reject, args.joiner))

        # Batched scoring (left padding, last-position score).
        scores = score_texts_last_token(
            reward_model=reward_model,
            tokenizer=tokenizer,
            texts=pair_texts,
            max_length=args.max_length,
            device=device,
        )

        # Even indices are chosen scores, odd indices the matching rejects.
        for chosen_score, reject_score in zip(scores[0::2], scores[1::2]):
            chosen_score = float(chosen_score)
            reject_score = float(reject_score)
            seen += 1
            is_correct = chosen_score > reject_score
            correct += int(is_correct)
            running_acc = correct / seen
            print(
                f"[{seen:6d}] "
                f"Chosen={chosen_score:.6f} | Reject={reject_score:.6f} | "
                f"Correct={is_correct} | RunningAcc={running_acc*100:.2f}%"
            )

    print("\n" + "-" * 70)
    # An empty dataset has no defined accuracy; report NaN instead of
    # dividing by zero.
    final_acc = correct / seen * 100 if seen else float("nan")
    print(f"Finished. Total={seen}, Correct={correct}, FinalAcc={final_acc:.2f}%")
# Run the evaluation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()