"""Evaluate embedding models for retrieval against per-language FAISS indexes.

For each model, test queries are encoded and searched against the "ru" and "kz"
indexes; rank-based metrics, top-1 score statistics, and corpus coverage are
reported per language and overall, then written as JSON reports.
"""

import json
from pathlib import Path

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

from data_io import load_pairs, read_jsonl


def load_index(lang: str, alias: str):
    """Load a FAISS index and its metadata; map index positions to document ids."""
    base = Path("artifacts/indexes") / alias
    idx_path = base / f"{lang}.faiss"
    meta_path = base / f"{lang}_meta.jsonl"
    index = faiss.read_index(str(idx_path))
    meta = read_jsonl(str(meta_path))
    pos_to_id = {int(x["pos"]): x["id"] for x in meta}
    return index, meta, pos_to_id


def _stats_from_values(values):
    """Mean, median, and 10th/90th percentiles for a list of numbers."""
    if not values:
        return {
            "mean": None,
            "median": None,
            "p10": None,
            "p90": None,
        }
    arr = np.array(values, dtype=float)
    return {
        "mean": float(np.mean(arr)),
        "median": float(np.median(arr)),
        "p10": float(np.percentile(arr, 10)),
        "p90": float(np.percentile(arr, 90)),
    }


def metrics_from_ranks(ranks, ks=(1, 3, 5, 10)):
    """Compute retrieval metrics from 0-based ranks (None = not found in top-k)."""
    out = {}
    for k in ks:
        hits = [1.0 if r is not None and r < k else 0.0 for r in ranks]
        hit_rate = float(np.mean(hits)) if ranks else 0.0
        out[f"recall@{k}"] = hit_rate
        out[f"hit@{k}"] = hit_rate
        out[f"precision@{k}"] = float(np.mean([h / k for h in hits])) if ranks else 0.0
    # With a single relevant document per query, MRR is 1/(rank+1) and
    # NDCG reduces to 1/log2(rank+2); misses contribute 0.
    rr = []
    dcg = []
    for r in ranks:
        if r is None:
            rr.append(0.0)
            dcg.append(0.0)
        else:
            rr.append(1.0 / (r + 1.0))
            dcg.append(1.0 / np.log2(r + 2.0))
    out["mrr@10"] = float(np.mean(rr)) if rr else 0.0
    out["ndcg@10"] = float(np.mean(dcg)) if dcg else 0.0
    out["not_found_rate"] = (
        float(np.mean([1.0 if r is None else 0.0 for r in ranks])) if ranks else 0.0
    )
    return out


def eval_model(model_name: str, index_alias: str, test_path: str, top_k=10):
    """Evaluate one model against the per-language indexes and return a results dict."""
    model = SentenceTransformer(model_name)
    test = load_pairs(test_path)
    groups = {
        "ru": [x for x in test if x["lang"] == "ru"],
        "kz": [x for x in test if x["lang"] == "kz"],
    }
    results = {
        "model": model_name,
        "index_alias": index_alias,
        "test_path": test_path,
        "top_k": top_k,
        "by_lang": {},
    }
    all_ranks = []
    all_top1_scores = []
    all_top1_scores_tp = []
    all_top1_scores_fp = []
    all_margins = []
    all_coverage_ids = set()
    total_corpus_size = 0

    for lang, items in groups.items():
        if not items:
            results["by_lang"][lang] = {"count": 0}
            continue
        index, meta, pos_to_id = load_index(lang, index_alias)
        total_corpus_size += len(meta)

        # Encode all queries for this language and search the index in one batch.
        queries = [x["query"] for x in items]
        q_emb = model.encode(
            queries,
            batch_size=64,
            convert_to_numpy=True,
            normalize_embeddings=True,
            show_progress_bar=True,
        ).astype(np.float32)
        scores, idxs = index.search(q_emb, top_k)

        ranks = []
        top1_scores = []
        top1_scores_tp = []
        top1_scores_fp = []
        margins = []
        coverage_ids = set()
        for i, x in enumerate(items):
            target = x["positive_id"]
            found_rank = None
            top_scores = [float(s) for s in scores[i].tolist()]
            # Scan the top-k hits for the gold document; also track every
            # document id that was retrieved at least once (coverage).
            for r in range(top_k):
                pos = int(idxs[i, r])
                did = pos_to_id.get(pos)
                if did is None:
                    continue
                coverage_ids.add(did)
                if did == target:
                    found_rank = r
                    break
            ranks.append(found_rank)
            if top_scores:
                top1 = top_scores[0]
                top1_scores.append(top1)
                # Split top-1 scores by whether the top hit was the gold document.
                if found_rank == 0:
                    top1_scores_tp.append(top1)
                else:
                    top1_scores_fp.append(top1)
            if len(top_scores) >= 2:
                margins.append(top_scores[0] - top_scores[1])

        all_ranks.extend(ranks)
        all_top1_scores.extend(top1_scores)
        all_top1_scores_tp.extend(top1_scores_tp)
        all_top1_scores_fp.extend(top1_scores_fp)
        all_margins.extend(margins)
        all_coverage_ids.update(coverage_ids)

        found_ranks_1based = [r + 1 for r in ranks if r is not None]
        rank_stats = _stats_from_values(found_ranks_1based)
        rank_stats.update(
            {
                "found_count": len(found_ranks_1based),
                "not_found_count": len(ranks) - len(found_ranks_1based),
                "not_found_rate": (
                    float(np.mean([1.0 if r is None else 0.0 for r in ranks])) if ranks else 0.0
                ),
            }
        )
        score_stats = _stats_from_values(top1_scores)
        margin_stats = _stats_from_values(margins)
        coverage = {
            "unique_ids": len(coverage_ids),
            "corpus_size": len(meta),
            "coverage_ratio": float(len(coverage_ids) / len(meta)) if meta else 0.0,
        }
        results["by_lang"][lang] = {
            "count": len(items),
            **metrics_from_ranks(ranks, ks=(1, 3, 5, 10)),
            "rank_stats": {
                "mean_rank": rank_stats["mean"],
                "median_rank": rank_stats["median"],
                "p10_rank": rank_stats["p10"],
                "p90_rank": rank_stats["p90"],
                "found_count": rank_stats["found_count"],
                "not_found_count": rank_stats["not_found_count"],
                "not_found_rate": rank_stats["not_found_rate"],
            },
            "score_stats": {
                "top1_score": score_stats,
                "margin_top1_top2": margin_stats,
            },
            "coverage": coverage,
            "distributions": {
                "ranks": [r if r is not None else -1 for r in ranks],
                "top1_scores": top1_scores,
                "top1_scores_tp": top1_scores_tp,
                "top1_scores_fp": top1_scores_fp,
                "margins": margins,
            },
        }

    # Aggregate the same statistics over both languages combined.
    overall_found_ranks_1based = [r + 1 for r in all_ranks if r is not None]
    overall_rank_stats = _stats_from_values(overall_found_ranks_1based)
    overall_rank_stats.update(
        {
            "found_count": len(overall_found_ranks_1based),
            "not_found_count": len(all_ranks) - len(overall_found_ranks_1based),
            "not_found_rate": (
                float(np.mean([1.0 if r is None else 0.0 for r in all_ranks])) if all_ranks else 0.0
            ),
        }
    )
    overall_score_stats = _stats_from_values(all_top1_scores)
    overall_margin_stats = _stats_from_values(all_margins)
    overall_coverage = {
        "unique_ids": len(all_coverage_ids),
        "corpus_size": total_corpus_size,
        "coverage_ratio": (
            float(len(all_coverage_ids) / total_corpus_size) if total_corpus_size else 0.0
        ),
    }
    results["overall"] = {
        "count": len(all_ranks),
        **metrics_from_ranks(all_ranks, ks=(1, 3, 5, 10)),
        "rank_stats": {
            "mean_rank": overall_rank_stats["mean"],
            "median_rank": overall_rank_stats["median"],
            "p10_rank": overall_rank_stats["p10"],
            "p90_rank": overall_rank_stats["p90"],
            "found_count": overall_rank_stats["found_count"],
            "not_found_count": overall_rank_stats["not_found_count"],
            "not_found_rate": overall_rank_stats["not_found_rate"],
        },
        "score_stats": {
            "top1_score": overall_score_stats,
            "margin_top1_top2": overall_margin_stats,
        },
        "coverage": overall_coverage,
        "distributions": {
            "ranks": [r if r is not None else -1 for r in all_ranks],
            "top1_scores": all_top1_scores,
            "top1_scores_tp": all_top1_scores_tp,
            "top1_scores_fp": all_top1_scores_fp,
            "margins": all_margins,
        },
    }
    return results


def main():
    test_path = "data/legal_assistant_test.jsonl"
    models = [
        ("mpnet_base", "paraphrase-multilingual-mpnet-base-v2"),
        ("labse", "sentence-transformers/LaBSE"),
    ]
    # Include the fine-tuned checkpoint only if it has been produced.
    finetuned_dir = Path("artifacts/models/finetuned_mpnet")
    if finetuned_dir.exists():
        models.append(("finetuned", str(finetuned_dir)))

    out_dir = Path("artifacts/reports")
    out_dir.mkdir(parents=True, exist_ok=True)
    for alias, model_name in models:
        r = eval_model(model_name, alias, test_path, top_k=10)
        (out_dir / f"eval_{alias}.json").write_text(
            json.dumps(r, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )


if __name__ == "__main__":
    main()