#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Parallel evaluation script for Qwen2.5-7B-Math (or your fine-tuned HF-format
checkpoint).

Single-GPU example:
    python simple_valid.py \
        --model_path /pfs/lichenyi/work/finetune_output_train1/checkpoint-300 \
        --data_path /pfs/lichenyi/work/evaluation/valid.json \
        --dtype bf16 \
        --use_system \
        --temperature 0.0

Multi-GPU example (4 GPUs):
    torchrun --nproc_per_node 4 simple_valid.py \
        --model_path /pfs/lichenyi/work/finetune_output_train1/checkpoint-300 \
        --data_path /pfs/lichenyi/work/evaluation/valid.json \
        --dtype bf16 \
        --use_system \
        --temperature 0.0

If --out_path is not given, predictions are written to:
    /pfs/lichenyi/work/evaluation/predictions/predictions_<name>.json
e.g.:
    /pfs/lichenyi/work/evaluation/predictions/predictions_checkpoint-300.json
"""

import argparse
import json
import os
from typing import List, Dict, Any

import torch
import torch.distributed as dist
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer


# ===================== model & tokenizer =====================
def load_model(
    model_path: str,
    load_in_8bit: bool,
    load_in_4bit: bool,
    dtype: str,
    device_map="auto",
):
    """Load a causal LM and its tokenizer from *model_path*.

    Quantization flags take precedence over *dtype*: 4-bit beats 8-bit beats
    plain-dtype loading. Without quantization, bf16 is used when requested
    and CUDA is available, otherwise fp16.

    Returns:
        (model, tokenizer) with the model already in eval mode and the
        tokenizer guaranteed to have a pad token (falls back to EOS).
    """
    kwargs = {}
    if load_in_4bit:
        # 4-bit quantization (bitsandbytes)
        kwargs.update(dict(load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16))
    elif load_in_8bit:
        # 8-bit quantization
        kwargs.update(dict(load_in_8bit=True))
    else:
        # Prefer bf16 when available, otherwise fp16.
        if dtype == "bf16" and torch.cuda.is_available():
            kwargs.update(dict(dtype=torch.bfloat16))
        else:
            kwargs.update(dict(dtype=torch.float16))

    if torch.cuda.is_available():
        # TF32 generally speeds up matmuls with negligible accuracy impact
        # for inference.
        torch.backends.cuda.matmul.allow_tf32 = True

    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        device_map=device_map,
        trust_remote_code=True,
        **kwargs,
    )
    model.eval()

    tokenizer = AutoTokenizer.from_pretrained(
        model_path,
        trust_remote_code=True,
        use_fast=True,
    )
    if tokenizer.pad_token is None:
        # generate() needs a pad token; reuse EOS when none is defined.
        tokenizer.pad_token = tokenizer.eos_token

    return model, tokenizer


# ===================== text utilities =====================
def canonicalize_human(value: str) -> str:
    """Strip the ':::'-suffixed tail that human turns carry in this dataset."""
    return value.split(":::")[0].strip()


def decode_only_new(gen_ids: torch.Tensor, prompt_len: int, tokenizer) -> str:
    """Decode only the newly generated tokens of a single-sample batch.

    Truncates at the first known end-of-text marker, then keeps only the
    first non-empty paragraph of the remainder.
    """
    new_tokens = gen_ids[0, prompt_len:]
    text = tokenizer.decode(new_tokens, skip_special_tokens=False)

    # Cut at the first end-of-text marker we recognize.
    stop_markers = []
    # The tokenizer's own eos_token counts as one.
    if getattr(tokenizer, "eos_token", None):
        stop_markers.append(tokenizer.eos_token)
    # Common explicit markers; extend as needed.
    stop_markers.extend([
        "<|im_end|>",
        "<|endoftext|>",
    ])
    for marker in stop_markers:
        if marker and marker in text:
            text = text.split(marker)[0]
            break

    # Optionally keep only the first non-empty paragraph.
    block = []
    for line in text.splitlines():
        if line.strip() == "":
            break
        block.append(line)
    return "\n".join(block).strip()


def build_model_inputs(messages, tokenizer, device):
    """Tokenize chat *messages* into model inputs placed on *device*.

    Compatible with Qwen2.5-7B-Math checkpoints with or without a chat
    template:
    - prefer tokenizer.apply_chat_template;
    - if the checkpoint has no chat_template, fall back to a simple
      tagged-string prompt plus a plain tokenizer() call.
    """
    try:
        # return_dict=True is essential: without it apply_chat_template
        # returns a bare tensor, the .items() below raises AttributeError,
        # and every sample silently falls through to the fallback prompt
        # even when the checkpoint ships a proper chat template.
        model_inputs = tokenizer.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=True,
            return_dict=True,
            return_tensors="pt",
        )
        return {k: v.to(device) for k, v in model_inputs.items()}
    except Exception:
        # Fallback for checkpoints without a chat_template: build a custom
        # prompt string (adjust to match your fine-tuning format if needed).
        text_parts = []
        for m in messages:
            role = m["role"]
            content = m["content"]
            if role == "system":
                text_parts.append(f"[SYSTEM]\n{content}\n")
            elif role == "user":
                text_parts.append(f"[USER]\n{content}\n")
        # Prompt the model to continue as the assistant.
        text = "\n".join(text_parts) + "\n[ASSISTANT]\n"
        enc = tokenizer(
            text,
            return_tensors="pt",
        )
        return {k: v.to(device) for k, v in enc.items()}


# ===================== distributed helpers =====================
def setup_distributed():
    """Initialize torch.distributed when launched via torchrun.

    Falls back to single-process mode when WORLD_SIZE is absent or 1.

    Returns:
        (distributed, rank, world_size, local_rank)
    """
    world_size = int(os.environ.get("WORLD_SIZE", "1"))
    distributed = world_size > 1
    if not distributed:
        return False, 0, 1, 0

    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    torch.cuda.set_device(local_rank)
    return True, rank, world_size, local_rank


def _resolve_out_path(out_dir: str, model_path: str) -> str:
    """Return <out_dir>/predictions_<basename(model_path)>.json, creating out_dir.

    The basename of *model_path* (e.g. 'checkpoint-300') tags the output file.
    """
    os.makedirs(out_dir, exist_ok=True)
    base_name = os.path.basename(os.path.normpath(model_path))
    if not base_name:
        # Guard against a trailing slash yielding an empty basename.
        base_name = os.path.basename(model_path.rstrip("/"))
    return os.path.join(out_dir, f"predictions_{base_name}.json")


def _collect_eos_ids(tokenizer):
    """Gather all end-of-sequence token ids the model may emit.

    Combines the tokenizer's own eos_token_id with a few common explicit
    markers, when present in the vocabulary.

    Returns:
        None when nothing is found, a single int for one id, or a list of
        ints (transformers accepts a list as multiple EOS ids).
    """
    extra_eos_tokens = ["<|im_end|>", "<|endoftext|>"]
    eos_ids = set()
    if getattr(tokenizer, "eos_token_id", None) is not None:
        if isinstance(tokenizer.eos_token_id, int):
            eos_ids.add(tokenizer.eos_token_id)
        else:
            eos_ids.update(tokenizer.eos_token_id)

    vocab = tokenizer.get_vocab()
    for tok in extra_eos_tokens:
        if tok in vocab:
            eos_ids.add(vocab[tok])

    if not eos_ids:
        return None
    if len(eos_ids) == 1:
        return next(iter(eos_ids))
    return list(eos_ids)


# ===================== main logic =====================
def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--model_path", type=str, required=True,
                    help="本地模型路径或HF模型名,如 /pfs/.../Qwen2.5-7B-Math")
    ap.add_argument("--data_path", type=str, required=True,
                    help="测试集 JSON 路径")
    ap.add_argument(
        "--out_path",
        type=str,
        default="",
        help="输出预测 JSON 路径;留空则自动根据 model_path 生成",
    )
    ap.add_argument("--max_new_tokens", type=int, default=128)
    ap.add_argument("--temperature", type=float, default=0.1)
    ap.add_argument("--top_p", type=float, default=0.95)
    ap.add_argument("--load_in_8bit", action="store_true")
    ap.add_argument("--load_in_4bit", action="store_true")
    ap.add_argument("--dtype", choices=["bf16", "fp16"], default="bf16")
    ap.add_argument("--use_system", action="store_true",
                    help="把样本里的 system 也塞到对话中")
    args = ap.parse_args()

    # ---------- distributed setup ----------
    distributed, rank, world_size, local_rank = setup_distributed()
    if distributed and rank == 0:
        print(f"[INFO] Distributed inference, world_size={world_size}")

    if distributed:
        # One GPU per process.
        device_map = {"": local_rank}
    else:
        device_map = "auto"  # "auto" is fine for a single process

    # ---------- resolve out_path ----------
    # --out_path is treated as an output DIRECTORY; the file name is always
    # predictions_<checkpoint-name>.json.
    if args.out_path:
        out_path = _resolve_out_path(args.out_path, args.model_path)
    else:
        # Default output directory.
        out_path = _resolve_out_path(
            "/pfs/lichenyi/work/evaluation/predictions", args.model_path
        )

    if rank == 0:
        print(f"[INFO] Output path: {out_path}")

    # ---------- load model ----------
    if rank == 0:
        print(f"[INFO] Loading model from {args.model_path} ...")
    model, tokenizer = load_model(
        args.model_path,
        args.load_in_8bit,
        args.load_in_4bit,
        args.dtype,
        device_map=device_map,
    )

    # Unified eos_token_id supporting multiple end markers.
    eos_token_id = _collect_eos_ids(tokenizer)

    # ---------- load dataset ----------
    if rank == 0:
        print(f"[INFO] Loading dataset from {args.data_path} ...")
    with open(args.data_path, "r", encoding="utf-8") as f:
        dataset: List[Dict[str, Any]] = json.load(f)
    num_samples = len(dataset)

    # ---------- shard data: each rank handles a strided slice ----------
    indices = list(range(rank, num_samples, world_size))
    if rank == 0:
        iter_indices = tqdm(indices, desc="Running inference")
    else:
        iter_indices = indices

    results = []
    for idx in iter_indices:
        item = dataset[idx]

        # Optional system prompt plus the human turn.
        system_text = item.get("system", "").strip()
        prompt_text = ""
        gt_text = ""

        # Locate human (prompt) and gpt (ground-truth answer) turns.
        for turn in item.get("conversations", []):
            if turn.get("from") == "human":
                prompt_text = canonicalize_human(turn.get("value", ""))
            elif turn.get("from") == "gpt":
                gt_text = turn.get("value", "").strip()

        # Assemble chat messages.
        messages = []
        if args.use_system and system_text:
            messages.append({"role": "system", "content": system_text})
        messages.append({"role": "user", "content": prompt_text})

        # Build model inputs (works with or without a chat_template).
        model_inputs = build_model_inputs(messages, tokenizer, model.device)

        gen_kwargs = dict(
            max_new_tokens=args.max_new_tokens,
            do_sample=args.temperature > 0,
            temperature=args.temperature,
            top_p=args.top_p,
            pad_token_id=tokenizer.pad_token_id,
        )
        if eos_token_id is not None:
            gen_kwargs["eos_token_id"] = eos_token_id

        with torch.no_grad():
            output_ids = model.generate(
                **model_inputs,
                **gen_kwargs,
            )

        prompt_len = model_inputs["input_ids"].shape[-1]
        pred = decode_only_new(output_ids, prompt_len, tokenizer)

        results.append({
            "id": idx,  # global index, used for merge/sort across ranks
            "system": system_text if args.use_system else "",
            "prompt": prompt_text,
            "ground_truth": gt_text,   # reference answer (gpt.value)
            "model_output": pred       # model generation
        })

    # ---------- gather & write results ----------
    if distributed:
        # Collect results from every rank.
        all_results = [None for _ in range(world_size)]
        dist.all_gather_object(all_results, results)

        if rank == 0:
            merged = []
            for part in all_results:
                merged.extend(part)
            merged.sort(key=lambda x: x["id"])

            with open(out_path, "w", encoding="utf-8") as f:
                json.dump(merged, f, ensure_ascii=False, indent=2)
            print(f"[OK] 写入 {out_path} (共 {len(merged)} 条)")

        dist.barrier()
        dist.destroy_process_group()
    else:
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"[OK] 写入 {out_path} (共 {len(results)} 条)")


if __name__ == "__main__":
    main()