| | |
| | import os |
| | import math |
| | import pandas as pd |
| | from tqdm import tqdm |
| | import torch |
| | from datasets import load_dataset |
| | from transformers import AutoModelForSequenceClassification, AutoTokenizer |
| | from sentence_transformers import CrossEncoder |
# --- Configuration -------------------------------------------------------
# HF hub id of the reranker checkpoint (sequence-classification head).
MODEL_NAME = "deeppin/Qwen3-Reranker-8B-SequenceClassification"
# Parquet file expected to carry chosen_prompt / chosen / reject columns.
DATA_PATH = "data/valid.parquet"
# NOTE(review): BATCH_SIZE is never read — the scoring loop below encodes
# exactly one (chosen, reject) pair per iteration. Remove or use it.
BATCH_SIZE = 8
# Token budget per formatted (instruction, query, document) input.
MAX_LENGTH = 8192
# NOTE(review): the model itself is moved with a hard-coded .to("cuda")
# further down, not DEVICE — on a CPU-only host that call fails even
# though this fallback exists. Inputs ARE moved to DEVICE in the loop.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
def format_instruction(instruction, query, doc):
    """Render one reranker input in the <Instruct>/<Query>/<Document> layout."""
    sections = (
        f"<Instruct>: {instruction}",
        f"<Query>: {query}",
        f"<Document>: {doc}",
    )
    return "\n".join(sections)
# NOTE(review): duplicated `import re` — one of the two lines is redundant.
import re
import re

# NOTE(review): the two regex families below (_SYS_BLOCK/_TURN_BLOCK/
# _ANY_CHATML_TAG and _SYS/_TURN/_TAG) are never referenced anywhere in
# this file; flatten_chatml only uses _START/_END/_ANY/_THINK_BLOCK.
# They look like leftovers from earlier iterations — candidates for removal.

# Whole <|im_start|>system ... <|im_end|> block (dead code, see note above).
_SYS_BLOCK = re.compile(
    r"<\|im_start\|\>\s*system\b.*?<\|im_end\|\>", re.IGNORECASE | re.DOTALL
)
# Captures role + body of a user/assistant turn (dead code, see note above).
_TURN_BLOCK = re.compile(
    r"<\|im_start\|\>\s*(user|assistant)\b\s*(.*?)\s*<\|im_end\|\>",
    re.IGNORECASE | re.DOTALL,
)
# Any <|...|> special token (dead code, see note above).
_ANY_CHATML_TAG = re.compile(r"<\|[^|]+?\|>")

# Second, near-identical dead family (see note above).
_SYS = re.compile(r"<\|im_start\|\>\s*system\b(.*?)<\|im_end\|\>", re.I|re.S)
_TURN = re.compile(r"<\|im_start\|\>\s*(user|assistant)\b(.*?)<\|im_end\|\>", re.I|re.S)
_TAG = re.compile(r"<\|[^|]+?\|>")

# --- Patterns actually used by flatten_chatml ---------------------------
# Role header: "<|im_start|>system|user|assistant" plus trailing whitespace.
_START = re.compile(r"<\|im_start\|\>\s*(system|user|assistant)\s*", re.IGNORECASE)
# Turn terminator; flatten_chatml replaces it with a newline.
_END = re.compile(r"<\|im_end\|\>", re.IGNORECASE)
# Any other <|...|> special token (e.g. <|endoftext|>), removed outright.
_ANY = re.compile(r"<\|[^|>]+?\|>", re.IGNORECASE)
# Hidden chain-of-thought block, stripped unless keep_think=True.
_THINK_BLOCK = re.compile(r"<think>.*?</think>", re.IGNORECASE | re.DOTALL)
def flatten_chatml(text: str, keep_think: bool = False, *, single_line: bool = False, sep: str = " ") -> str:
    """Strip ChatML markup from *text* and normalise its whitespace.

    Role headers (``<|im_start|>system/user/assistant``) are removed, each
    ``<|im_end|>`` becomes a newline, other ``<|...|>`` tokens are dropped,
    and ``<think>...</think>`` blocks are erased unless *keep_think* is true.
    Non-string input yields "".  With *single_line*, remaining line breaks
    are replaced by *sep* and runs of blanks collapse to a single space.
    """
    if not isinstance(text, str):
        return ""

    flat = text.replace("\r\n", "\n")
    if not keep_think:
        flat = re.sub(r"<think>.*?</think>", "", flat, flags=re.IGNORECASE | re.DOTALL)

    # Peel off the ChatML framing: role headers vanish, turn terminators
    # become newlines, any leftover special token is dropped.
    flat = re.sub(r"<\|im_start\|\>\s*(system|user|assistant)\s*", "", flat, flags=re.IGNORECASE)
    flat = re.sub(r"<\|im_end\|\>", "\n", flat, flags=re.IGNORECASE)
    flat = re.sub(r"<\|[^|>]+?\|>", "", flat, flags=re.IGNORECASE)

    # Trim blanks hugging line breaks, cap consecutive breaks at two.
    flat = re.sub(r"[ \t]*\n[ \t]*", "\n", flat)
    flat = re.sub(r"\n{3,}", "\n\n", flat)
    flat = flat.strip()

    if single_line:
        flat = flat.replace("\r", "\n")
        # Fold every line/paragraph separator into *sep*, then squeeze blanks.
        flat = re.sub(r"[\n\u2028\u2029]+", sep, flat)
        flat = re.sub(r"[ \t\u00A0]{2,}", " ", flat)
        flat = re.sub(r"\s{2,}", " ", flat)
        flat = flat.strip()
    return flat
| |
|
| | |
# Tokenizer: left padding + left truncation so the tail of an over-long
# input (the most recent context / document end) is what survives.
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    padding_side="left",
    use_fast=False,
    trust_remote_code=True,
)
tokenizer.truncation_side = "left"

# Guarantee a pad token for batched encoding: reuse EOS when the checkpoint
# has one, otherwise register <|endoftext|> (Qwen's conventional pad) as pad.
if tokenizer.pad_token_id is None:
    if tokenizer.eos_token_id is not None:
        tokenizer.pad_token = tokenizer.eos_token
    else:
        tokenizer.add_special_tokens({"pad_token": "<|endoftext|>"})
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
# Load the reranker with its sequence-classification head (one logit per
# (query, document) pair).
# Fix: move the model to DEVICE — the cuda-or-cpu fallback computed above —
# instead of the hard-coded "cuda"; batch tensors are already sent to DEVICE
# in the scoring loop, so the old code device-mismatched on CPU-only hosts.
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16,
    # flash-attention kernels are CUDA-only; fall back to SDPA elsewhere.
    attn_implementation="flash_attention_2" if DEVICE == "cuda" else "sdpa",
    trust_remote_code=True,
).to(DEVICE).eval()
# Propagate the (possibly just-registered) pad token id to the model config
# so padded batches are handled correctly.
model.config.pad_token_id = tokenizer.pad_token_id

# Task instruction prepended to every (query, document) pair scored below.
TASK = "Given a roleplay prompt and recent context, score candidate replies higher when they stay in character, continue the scene coherently, and feel vivid and engaging."
| |
|
| | |
# Load the evaluation pairs and fail fast if a required column is missing.
df = pd.read_parquet(DATA_PATH)
need_cols = ["chosen_prompt", "chosen", "reject"]
for col in need_cols:
    if col not in df.columns:
        # Error text is user-facing (Chinese): "missing required column".
        raise ValueError(f"缺少必要列:{col}")
| |
|
def norm_text(x):
    """Coerce a dataframe cell to a stripped string; None/NaN become ""."""
    is_missing = x is None or (isinstance(x, float) and math.isnan(x))
    return "" if is_missing else str(x).strip()
| |
|
# Normalise every column: NaN -> "", strip ChatML markup, and flatten each
# cell onto one line (sep="" joins the fragments with no separator).
df = df[need_cols].copy()
for col in need_cols:
    df[col] = df[col].map(lambda s: flatten_chatml(norm_text(s), single_line=True, sep=""))

# Keep only rows where all three fields are non-empty after cleaning.
mask = (df["chosen_prompt"].str.len()>0) & (df["chosen"].str.len()>0) & (df["reject"].str.len()>0)
df = df[mask].reset_index(drop=True)
total = len(df)
if total == 0:
    # User-facing (Chinese): "no valid samples after filtering; check data".
    raise ValueError("过滤后无有效样本。请检查数据内容。")
# "[Info] number of valid samples".
print(f"[Info] 有效样本数: {total}")
| |
|
| | |
# Pairwise accuracy: for each row the reranker should assign the "chosen"
# reply a higher logit than the "reject" reply under the same prompt.
correct = 0
seen = 0

# NOTE(review): despite BATCH_SIZE above, scoring runs per-sample — each
# iteration encodes exactly the two candidates of one row.
for idx, row in tqdm(df.iterrows(), total=len(df), desc="Scoring (per-sample)"):
    q_clean = row["chosen_prompt"]
    c_clean = row["chosen"]
    r_clean = row["reject"]

    # Identical instruction + prompt; only the candidate document differs.
    p1 = format_instruction(TASK, q_clean, c_clean)
    p2 = format_instruction(TASK, q_clean, r_clean)

    enc = tokenizer([p1, p2], padding=True, truncation=True, max_length=MAX_LENGTH, return_tensors="pt")
    enc = {k: v.to(DEVICE) for k, v in enc.items()}

    with torch.no_grad():
        # Single-logit classification head: squeeze(-1) -> one score per input.
        logits = model(**enc).logits.squeeze(-1)

    l1, l2 = float(logits[0]), float(logits[1])
    is_correct = (l1 > l2)

    correct += int(is_correct)
    seen += 1
    # Per-sample trace; interleaves with the tqdm bar by design.
    print(f"[{idx}] logits={[l1, l2]} | first>second={is_correct} | running_acc={correct/seen:.2%} ({correct}/{seen})")

# seen > 0 is guaranteed by the total == 0 guard above, so no ZeroDivisionError.
print(f"\n[Result] Total={seen} | Correct={correct} | Accuracy={correct/seen:.2%}")