# -*- coding: utf-8 -*-
"""Pairwise reward-model evaluation: score (prompt+chosen) vs (prompt+reject)
for every row of a parquet file and report running/final accuracy."""
import os
import math
import argparse
import warnings

import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoConfig, AutoModelForSequenceClassification


# ========== Evaluation core: left-padded encoding + reward_model inference ==========
def build_left_padded_inputs(tokenizer, texts, max_length, device):
    """Tokenize *texts* with LEFT padding and move tensors to *device*.

    Left padding guarantees that index [:, -1] of every sequence is the last
    non-pad token, which is the position the scoring head reads.

    Returns a tuple ``(input_ids, attention_mask)``.
    """
    # Force left padding & provide a pad-token fallback.
    tokenizer.padding_side = "left"
    if tokenizer.pad_token_id is None:
        if tokenizer.eos_token_id is not None:
            tokenizer.pad_token = tokenizer.eos_token
        else:
            # NOTE(review): adding a brand-new special token grows the vocab;
            # the model's embedding matrix is NOT resized here — confirm this
            # branch cannot be hit with the tokenizers actually used.
            tokenizer.add_special_tokens({"pad_token": "<|pad|>"})
    enc = tokenizer(
        texts,
        padding=True,
        truncation=True,
        max_length=max_length,
        return_tensors="pt",
    )
    return (
        enc["input_ids"].to(device),
        enc["attention_mask"].to(device),
    )


@torch.inference_mode()
def score_texts_last_token(reward_model, tokenizer, texts, max_length, device):
    """Return a list of scalar scores, one per entry of *texts*.

    Uses strict left padding internally so that ``[:, -1]`` is always the
    last non-pad token of each sequence.
    """
    inputs = build_left_padded_inputs(tokenizer, texts, max_length, device)
    # Project inference convention (assumes a Llama-style sequence-classification
    # layout: a `.model` backbone plus a `.score` linear head — TODO confirm):
    #   hidden: [B, T, H]
    hidden = reward_model.model(*inputs).last_hidden_state
    # score_seq: [B, T] or [B, T, 1]
    score_seq = reward_model.score(hidden)
    if score_seq.dim() == 3 and score_seq.size(-1) == 1:
        score_seq = score_seq.squeeze(-1)
    # Take the last token's score (matches the training/inference examples).
    scores = score_seq[:, -1]
    # Guard against NaN so comparisons below stay well-defined.
    scores = torch.nan_to_num(scores, nan=-1e30)
    return scores.detach().float().cpu().tolist()


# ========== Data assembly ==========
def join_prompt_answer(prompt, answer, joiner="\n"):
    """Concatenate prompt and answer (each right-stripped, None-safe) with *joiner*."""
    p = (prompt or "").rstrip()
    a = (answer or "").rstrip()
    return f"{p}{joiner}{a}"


# ========== Main flow ==========
def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--data_path", type=str, required=True,
                    help="包含列 chosen_prompt/chosen/reject 的 parquet 路径")
    ap.add_argument("--batch_size", type=int, default=16)
    ap.add_argument("--max_length", type=int, default=1024)
    ap.add_argument("--joiner", type=str, default="")
    args = ap.parse_args()

    if not os.path.exists(args.data_path):
        raise FileNotFoundError(args.data_path)
    df = pd.read_parquet(args.data_path)
    for col in ["chosen_prompt", "chosen", "reject"]:
        if col not in df.columns:
            raise ValueError(f"缺列 `{col}`,实际列:{list(df.columns)}")

    # ================== TODO: load YOUR project's reward_model and tokenizer ==================
    # Example (pseudo-code), adapt to your training framework:
    #   from your_framework import load_reward_model
    #   reward_model, tokenizer = load_reward_model("/path/to/your/rm")
    # Placeholder below: replace with your own loading logic.
    # ==========================================================================================
    reward_model = AutoModelForSequenceClassification.from_pretrained(
        "/home/rm5.0_9e-6",
        num_labels=1,
        torch_dtype=torch.bfloat16,
        use_cache=False,
    )
    tokenizer = AutoTokenizer.from_pretrained("/home/rm5.0_9e-6")
    # Disable dropout etc. — this script is inference-only.
    reward_model.eval()
    device = next(reward_model.parameters()).device

    total = len(df)
    correct = 0
    seen = 0
    print(f"Loaded {total} samples from {args.data_path}")
    print("Start evaluating (pairwise chosen vs reject)...\n" + "-" * 70)

    # Process in batches: two texts per sample (chosen/reject).
    for start in range(0, total, args.batch_size):
        end = min(start + args.batch_size, total)
        batch = df.iloc[start:end]
        pair_texts = []
        for _, row in batch.iterrows():
            pair_texts.append(join_prompt_answer(row["chosen_prompt"], row["chosen"], args.joiner))  # chosen
            pair_texts.append(join_prompt_answer(row["chosen_prompt"], row["reject"], args.joiner))  # reject

        # Batch scoring (left padding, last-token score).
        scores = score_texts_last_token(
            reward_model=reward_model,
            tokenizer=tokenizer,
            texts=pair_texts,
            max_length=args.max_length,
            device=device,
        )

        # Unpack the two scores belonging to each sample.
        for i, (_, row) in enumerate(batch.iterrows()):
            chosen_score = float(scores[2 * i])
            reject_score = float(scores[2 * i + 1])
            seen += 1
            is_correct = chosen_score > reject_score
            correct += int(is_correct)
            running_acc = correct / seen
            print(
                f"[{seen:6d}] "
                f"Chosen={chosen_score:.6f} | Reject={reject_score:.6f} | "
                f"Correct={is_correct} | RunningAcc={running_acc*100:.2f}%"
            )

    print("\n" + "-" * 70)
    # Guard against an empty dataset so the summary never divides by zero.
    if seen:
        print(f"Finished. Total={seen}, Correct={correct}, FinalAcc={correct/seen*100:.2f}%")
    else:
        print("Finished. No samples evaluated.")


if __name__ == "__main__":
    main()