#!/usr/bin/env python3
"""Match every golden-set sample against the ruler items by cosine similarity.

Every row of golden_set.csv (about 1000 rows) is embedded and compared with
the ruler items (about 200); for each row the Top-K most similar ruler items
are selected and the results are written to disk.

Usage:
    # default paths
    python3 batch_top5_match.py

    # custom
    python3 batch_top5_match.py \
        --csv /mnt/.../aipf_golden_set.csv \
        --ruler /mnt/.../ruler_items.json \
        --model /mnt/.../Qwen3-Embedding-8B \
        --output golden_top5.jsonl \
        --top-k 5 \
        --boundary-score 44.72 \
        --cache-dir cache_emb \
        --limit 50        # small 50-row sanity run first

Outputs:
    - <output> (JSONL): one sample per line with task_id / label / Top-K
      details / weighted_score / prediction.
    - <output with its suffix replaced by ".summary.csv">: one summary row per
      sample, convenient for filtering in Excel / pandas.
    - cache_emb/*.npy (optional): embedding cache, reused on re-runs as long
      as the texts, the model and max_length are unchanged.
"""

import argparse
import hashlib
import json
import re
import sys
import time
from pathlib import Path

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import Tensor

DEFAULT_MODEL = "/mnt/bn/tns-algo-ue-my/biaowu/WorkSpace/Models/Qwen3-Embedding-8B"
DEFAULT_RULER = "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/ranking_moderation/data/dm/youth_sexual_and_physical_abuse_aigt_v009/ranking_bucket/ruler_items.json"
DEFAULT_CSV = "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/example/yss_ruler_eval/data/aipf_golden_set.csv"


# ---------- model utils ----------
def last_token_pool(h: Tensor, attn: Tensor) -> Tensor:
    """Pick the hidden state at the last attended position of each sequence.

    Args:
        h: hidden states, indexed as ``h[batch, position]``.
        attn: attention mask, indexed as ``attn[batch, position]``.

    Returns:
        One hidden-state row per batch element.
    """
    # If every row attends to its final position the batch is treated as
    # left-padded, so the last position is the last real token for all rows.
    if attn[:, -1].sum() == attn.shape[0]:  # left padding
        return h[:, -1]
    # Otherwise index each row at (number of attended tokens - 1).
    lens = attn.sum(dim=1) - 1
    bsz = h.shape[0]
    return h[torch.arange(bsz, device=h.device), lens]


@torch.no_grad()
def encode(texts, tokenizer, model, max_length, batch_size, label="encode"):
    """Embed ``texts`` in batches and return L2-normalised embeddings.

    Args:
        texts: list of strings to embed; must not be empty.
        tokenizer: callable tokenizer returning a batch with an
            ``attention_mask`` entry and a ``.to(device)`` method.
        model: model whose output exposes ``last_hidden_state`` and which has
            a ``device`` attribute.
        max_length: truncation length passed to the tokenizer.
        batch_size: number of texts per forward pass.
        label: tag used in the progress lines.

    Returns:
        A float32 ``numpy`` array with one unit-norm row per input text.

    Raises:
        ValueError: if ``texts`` is empty (there is nothing to concatenate).
    """
    n = len(texts)
    if n == 0:
        raise ValueError(f"[{label}] no texts to encode")

    embs = []
    t0 = time.time()
    for i in range(0, n, batch_size):
        batch = texts[i:i + batch_size]
        d = tokenizer(batch, padding=True, truncation=True,
                      max_length=max_length, return_tensors="pt").to(model.device)
        out = model(**d)
        e = last_token_pool(out.last_hidden_state, d["attention_mask"])
        e = F.normalize(e, p=2, dim=1)
        embs.append(e.cpu().float())
        # Drop the per-batch tensors before the next forward pass to keep
        # peak device memory down.
        del out, d, e
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        done = min(i + batch_size, n)
        if done % (batch_size * 10) == 0 or done == n:
            elapsed = time.time() - t0
            rate = done / max(elapsed, 1e-3)
            eta = (n - done) / max(rate, 1e-3)
            print(f" [{label}] {done}/{n} | {rate:.1f} ex/s | eta {eta:.0f}s", flush=True)
    return torch.cat(embs, dim=0).numpy()


# ---------- data utils ----------
def load_ruler_items(path):
    """Load ruler items from a JSON file.

    The file may be a bare list, or a dict holding the list under ``items``,
    ``ruler_items`` or ``data``. The conversation text is read from
    ``item.conv_text`` when ``item`` is a dict, otherwise from the top-level
    ``conv_text``.

    Returns:
        A list of dicts with keys ``rank``, ``score`` (float), ``item_id``
        (str) and ``text``.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    items = data if isinstance(data, list) else (
        data.get("items") or data.get("ruler_items") or data.get("data") or [])

    out = []
    for it in items:
        nested = it.get("item")
        inner = nested if isinstance(nested, dict) else {}
        conv = inner.get("conv_text") or it.get("conv_text") or ""
        # A JSON null score is handled like a missing one instead of crashing
        # in float(None).
        score = it.get("score")
        out.append({
            "rank": it.get("rank"),
            "score": float(score) if score is not None else 0.0,
            "item_id": str(it.get("item_id")),
            "text": conv,
        })
    return out


# NOTE(review): this pattern is empty, so it matches at offset 0 of every
# string and extract_conv() returns its input unchanged. The intended marker
# was presumably lost (e.g. stripped as markup) — restore it here.
_M_PREFIX = re.compile(r"")


def extract_conv(raw):
    """Return the conversation part of a golden-set ``text`` value.

    The golden-set text may carry an alias-age dict prefix; only the part
    starting at the ``_M_PREFIX`` match is kept. Non-string input yields "".
    When the pattern does not match, the stripped input is returned.
    """
    if not isinstance(raw, str):
        return ""
    m = _M_PREFIX.search(raw)
    return raw[m.start():] if m else raw.strip()


def load_csv(path, text_col, id_col, label_col, limit=None):
    """Read the golden-set CSV into a list of sample dicts.

    Args:
        path: CSV file path.
        text_col: name of the text column; falls back to ``conv_text`` when
            the named column is absent.
        id_col: name of the sample-id column.
        label_col: name of the label column.
        limit: when truthy, only the first ``limit`` rows are used.

    Returns:
        A list of dicts with keys ``task_id``, ``label`` (stripped and
        upper-cased), ``raw_text`` and ``conv_text``.

    Raises:
        ValueError: if the id/label columns or any usable text column are
            missing.
    """
    df = pd.read_csv(path, keep_default_na=False)
    needed = [c for c in (id_col, label_col) if c not in df.columns]
    if needed:
        raise ValueError(f"missing columns: {needed}; available: {list(df.columns)}")
    if text_col not in df.columns:
        if "conv_text" in df.columns:
            text_col = "conv_text"
        else:
            raise ValueError("no text/conv_text column")
    if limit:
        df = df.head(limit).copy()

    # Iterate the columns directly: iterrows() builds one Series per row and
    # does not preserve per-column dtypes across the row.
    return [
        {
            "task_id": str(task_id),
            "label": str(label).strip().upper(),
            "raw_text": str(text),
            "conv_text": extract_conv(text),
        }
        for task_id, label, text in zip(df[id_col], df[label_col], df[text_col])
    ]


# ---------- cache ----------
def cache_digest(texts, model=None):
    """Return a short hex digest identifying ``texts`` and the model.

    The digest covers every text (length-prefixed, so ["ab"] and ["a", "b"]
    differ) plus the model's ``name_or_path`` and ``dtype`` attributes when
    the model object has them; missing attributes contribute an empty string.
    """
    h = hashlib.sha256()
    tag = f"{getattr(model, 'name_or_path', '')}|{getattr(model, 'dtype', '')}"
    h.update(tag.encode("utf-8", errors="replace"))
    for t in texts:
        b = str(t).encode("utf-8", errors="replace")
        h.update(len(b).to_bytes(8, "little"))
        h.update(b)
    return h.hexdigest()[:16]


def cache_path(cache_dir, name, n_items, max_length, digest=None):
    """Build the ``.npy`` cache file path for one embedding set.

    Without ``digest`` the name is ``{name}_n{n_items}_L{max_length}.npy``;
    with it, ``_{digest}`` is appended before the extension.
    """
    stem = f"{name}_n{n_items}_L{max_length}"
    if digest:
        stem = f"{stem}_{digest}"
    return Path(cache_dir) / f"{stem}.npy"


def encode_with_cache(texts, tokenizer, model, *, max_length, batch_size, cache_dir, name):
    """Return embeddings for ``texts``, reusing an on-disk cache when valid.

    The cache file name includes a digest of the texts and the model identity,
    so edited inputs or a different model never reuse stale embeddings. A
    cached array whose row count does not match ``len(texts)`` is ignored and
    recomputed. With a falsy ``cache_dir`` caching is disabled.
    """
    p = None
    if cache_dir:
        Path(cache_dir).mkdir(parents=True, exist_ok=True)
        p = cache_path(cache_dir, name, len(texts), max_length,
                       digest=cache_digest(texts, model))
        if p.exists():
            cached = np.load(p)
            if cached.ndim == 2 and cached.shape[0] == len(texts):
                print(f" [{name}] using cached embeddings: {p}")
                return cached
            print(f" [{name}] ignoring cache with unexpected shape {cached.shape}: {p}")

    emb = encode(texts, tokenizer, model, max_length, batch_size, label=name)
    if p is not None:
        np.save(p, emb)
        print(f" [{name}] saved cache: {p}")
    return emb


# ---------- scoring ----------
def select_top_k(sims, k):
    """Return the ``k`` largest entries of each row, sorted descending.

    Args:
        sims: 2-D array of similarities (rows = queries, columns = candidates).
        k: number of entries to keep per row; ``1 <= k <= sims.shape[1]``.

    Returns:
        ``(top_idx, top_sims)``: two arrays with ``k`` columns holding the
        column indices and the matching similarity values.

    Raises:
        ValueError: if ``k`` is outside the valid range.
    """
    if not 1 <= k <= sims.shape[1]:
        raise ValueError(f"k must be in [1, {sims.shape[1]}], got {k}")
    # argpartition finds the k largest per row without a full sort; only
    # those k entries are then ordered.
    part_idx = np.argpartition(-sims, k - 1, axis=1)[:, :k]
    part_sims = np.take_along_axis(sims, part_idx, axis=1)
    order = np.argsort(-part_sims, axis=1)
    return (np.take_along_axis(part_idx, order, axis=1),
            np.take_along_axis(part_sims, order, axis=1))


def similarity_weighted_score(sims, scores):
    """Return the similarity-weighted mean of ``scores``.

    Computes ``sum(sims * scores) / sum(sims)``; returns 0.0 when the sum of
    similarities is not positive.
    """
    sims_arr = np.asarray(sims, dtype=float)
    scores_arr = np.asarray(scores, dtype=float)
    wsim = float(sims_arr.sum())
    return float((sims_arr * scores_arr).sum() / wsim) if wsim > 0 else 0.0


def _build_record(row, topk, boundary_score):
    """Build the JSONL record and the summary row for one sample."""
    weighted_score = similarity_weighted_score(
        [t["sim"] for t in topk], [t["score"] for t in topk])
    top1 = topk[0]
    pred = int(weighted_score >= boundary_score)
    gt = int(row["label"] == "Y")
    record = {
        "task_id": row["task_id"],
        "label": row["label"],
        "ground_truth": gt,
        "weighted_score": weighted_score,
        "top1_score": top1["score"],
        "top1_sim": top1["sim"],
        "top1_rank": top1["rank"],
        "pred_by_weighted": pred,
        "topk": topk,
    }
    summary = {
        "task_id": row["task_id"],
        "label": row["label"],
        "ground_truth": gt,
        "weighted_score": round(weighted_score, 4),
        "top1_rank": top1["rank"],
        "top1_score": round(top1["score"], 4),
        "top1_sim": round(top1["sim"], 4),
        "top1_item_id": top1["item_id"],
        "pred_by_weighted": pred,
    }
    return record, summary


def _print_metrics(sdf, boundary_score):
    """Print confusion counts and precision / recall / F1 for the summary."""
    if "ground_truth" not in sdf.columns or not len(sdf):
        return
    tp = int(((sdf.pred_by_weighted == 1) & (sdf.ground_truth == 1)).sum())
    fp = int(((sdf.pred_by_weighted == 1) & (sdf.ground_truth == 0)).sum())
    tn = int(((sdf.pred_by_weighted == 0) & (sdf.ground_truth == 0)).sum())
    fn = int(((sdf.pred_by_weighted == 0) & (sdf.ground_truth == 1)).sum())
    prec = tp / (tp + fp) if tp + fp else 0.0
    rec = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
    print(f"\n[metrics @ weighted_score >= {boundary_score}]")
    print(f" TP={tp} FP={fp} TN={tn} FN={fn}")
    print(f" precision={prec:.4f} recall={rec:.4f} f1={f1:.4f}")


# ---------- args ----------
def _positive_int(value):
    """argparse ``type`` accepting only integers >= 1."""
    n = int(value)  # a ValueError here is reported by argparse as invalid
    if n < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {value}")
    return n


def parse_args(argv=None):
    """Parse command-line options.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--csv", default=DEFAULT_CSV)
    p.add_argument("--ruler", default=DEFAULT_RULER)
    p.add_argument("--model", default=DEFAULT_MODEL)
    p.add_argument("--output", default="golden_top5.jsonl")
    p.add_argument("--text-col", default="text")
    p.add_argument("--id-col", default="task_id")
    p.add_argument("--label-col", default="label")
    p.add_argument("--top-k", type=_positive_int, default=5)
    p.add_argument("--boundary-score", type=float, default=44.72,
                   help="预测阈值,weighted_score >= 该值则 pred=1(默认从 pipeline.yaml 抄过来的 youth 类阈值)")
    p.add_argument("--max-length", type=_positive_int, default=4096)
    p.add_argument("--batch-size", type=_positive_int, default=4)
    p.add_argument("--cache-dir", default="cache_emb",
                   help="embedding 缓存目录;设空字符串关闭缓存")
    p.add_argument("--limit", type=int, default=None,
                   help="只跑前 N 条做 smoke test")
    p.add_argument("--cpu", action="store_true")
    p.add_argument("--no-flash-attn", action="store_true")
    return p.parse_args(argv)


def main():
    """Run the full pipeline: load data, embed, match Top-K, write outputs."""
    # Imported here, where it is first needed, so the helpers in this module
    # stay importable in environments without transformers installed.
    from transformers import AutoModel, AutoTokenizer

    args = parse_args()

    # 1) data
    print(f"[1/4] load csv: {args.csv}")
    rows = load_csv(args.csv, args.text_col, args.id_col, args.label_col, args.limit)
    print(f" -> {len(rows)} samples")
    print(f"[2/4] load ruler: {args.ruler}")
    ruler = load_ruler_items(args.ruler)
    print(f" -> {len(ruler)} ruler items")
    # Fail before the (expensive) model load when there is nothing to match.
    if not rows or not ruler:
        raise SystemExit("nothing to match: the csv and the ruler must each contain at least one item")

    # 2) model
    print(f"[3/4] load model: {args.model}")
    device = "cpu" if args.cpu else ("cuda" if torch.cuda.is_available() else "cpu")
    print(f" device: {device}")
    mk = {}
    if device == "cuda":
        mk["torch_dtype"] = torch.float16
        if not args.no_flash_attn:
            mk["attn_implementation"] = "flash_attention_2"
    tokenizer = AutoTokenizer.from_pretrained(args.model, padding_side="left")
    model = AutoModel.from_pretrained(args.model, **mk).to(device).eval()

    # 3) encode (csv and ruler are cached separately)
    cd = args.cache_dir or None
    print(f"[4/4] encode (batch_size={args.batch_size}, max_length={args.max_length})")
    csv_emb = encode_with_cache([r["conv_text"] for r in rows], tokenizer, model,
                                max_length=args.max_length, batch_size=args.batch_size,
                                cache_dir=cd, name=f"csv_{Path(args.csv).stem}")
    ruler_emb = encode_with_cache([it["text"] for it in ruler], tokenizer, model,
                                  max_length=args.max_length, batch_size=args.batch_size,
                                  cache_dir=cd, name=f"ruler_{Path(args.ruler).parent.name}")

    # 4) sim matrix + Top-K; embeddings are unit-norm, so the dot product is
    # the cosine similarity.
    sims = csv_emb @ ruler_emb.T  # (N_csv, N_ruler)
    k = min(args.top_k, len(ruler))
    top_idx, top_sims = select_top_k(sims, k)

    # 5) write JSONL + summary
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    summary_rows = []
    print(f"[write] {out_path}")
    with out_path.open("w", encoding="utf-8") as f:
        for row, idx_row, sim_row in zip(rows, top_idx, top_sims):
            topk = [
                {
                    "rank": ruler[int(idx)]["rank"],
                    "score": ruler[int(idx)]["score"],
                    "sim": float(sim),
                    "item_id": ruler[int(idx)]["item_id"],
                }
                for idx, sim in zip(idx_row, sim_row)
            ]
            record, summary = _build_record(row, topk, args.boundary_score)
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
            summary_rows.append(summary)

    sdf = pd.DataFrame(summary_rows)
    summary_csv = out_path.with_suffix(".summary.csv")
    sdf.to_csv(summary_csv, index=False)
    print(f"[write] {summary_csv}")

    # 6) overall metrics as a convenience
    _print_metrics(sdf, args.boundary_score)


if __name__ == "__main__":
    main()