# lexir/src/evaluate.py
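"""Evaluate sentence-embedding retrieval models against per-language FAISS indexes.

For each model alias, the test queries are encoded with a SentenceTransformer,
searched against the per-language ("ru" / "kz") indexes, and a JSON report with
recall/hit/precision@k, MRR, NDCG, rank/score statistics, corpus coverage and
raw rank/score distributions is written to artifacts/reports/.
"""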
import json
from pathlib import Path
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from data_io import load_pairs, read_jsonl

def load_index(lang: str, alias: str):
    """Load one language's FAISS index, its metadata records and the pos -> doc-id map."""
base = Path("artifacts/indexes") / alias
idx_path = base / f"{lang}.faiss"
meta_path = base / f"{lang}_meta.jsonl"
index = faiss.read_index(str(idx_path))
meta = read_jsonl(str(meta_path))
pos_to_id = {int(x["pos"]): x["id"] for x in meta}
return index, meta, pos_to_id
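
# Layout assumed by load_index (aliases come from main(); the doc-id value in
# the example is hypothetical):
#   artifacts/indexes/<alias>/ru.faiss        FAISS index over the ru corpus
#   artifacts/indexes/<alias>/ru_meta.jsonl   one record per vector, e.g.
#       {"pos": 0, "id": "doc-0001"}
# pos_to_id maps a FAISS row position back to the document id that the test
# pairs reference via "positive_id".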
def _stats_from_values(values):
if not values:
return {
"mean": None,
"median": None,
"p10": None,
"p90": None,
}
arr = np.array(values, dtype=float)
return {
"mean": float(np.mean(arr)),
"median": float(np.median(arr)),
"p10": float(np.percentile(arr, 10)),
"p90": float(np.percentile(arr, 90)),
}
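
# Example: _stats_from_values([1, 2, 10]) gives mean ~4.33, median 2.0,
# p10 1.2 and p90 8.4 (numpy's default linear interpolation); an empty list
# returns all-None stats, so a language split with no queries still serialises.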

def metrics_from_ranks(ranks, ks=(1, 3, 5, 10)):
    """Aggregate 0-based ranks (None = miss) into recall/hit/precision@k, MRR, NDCG and a not-found rate."""
out = {}
for k in ks:
hits = [1.0 if r is not None and r < k else 0.0 for r in ranks]
hit_rate = float(np.mean(hits)) if ranks else 0.0
out[f"recall@{k}"] = hit_rate
out[f"hit@{k}"] = hit_rate
out[f"precision@{k}"] = float(np.mean([h / k for h in hits])) if ranks else 0.0
rr = []
dcg = []
for r in ranks:
if r is None:
rr.append(0.0)
dcg.append(0.0)
else:
rr.append(1.0 / (r + 1.0))
dcg.append(1.0 / np.log2(r + 2.0))
out["mrr@10"] = float(np.mean(rr)) if rr else 0.0
out["ndcg@10"] = float(np.mean(dcg)) if dcg else 0.0
out["not_found_rate"] = float(np.mean([1.0 if r is None else 0.0 for r in ranks])) if ranks else 0.0
return out
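
# Worked example with 0-based ranks (None = positive not retrieved):
#   metrics_from_ranks([0, 2, None]) ->
#     recall@1 = hit@1 = 1/3,   recall@3 = recall@5 = recall@10 = 2/3
#     precision@1 = 1/3,        precision@3 = 2/9
#     mrr@10  = mean(1/1, 1/3, 0)             ~ 0.444
#     ndcg@10 = mean(1/log2(2), 1/log2(4), 0) = 0.5
#     not_found_rate = 1/3
# With one relevant document per query the ideal DCG is 1, so NDCG reduces to
# the discounted gain, and precision@k is simply hit@k / k. The "@10" labels
# reflect that main() always searches with top_k=10; the function itself just
# scores whatever ranks it receives.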

def eval_model(model_name: str, index_alias: str, test_path: str, top_k=10):
    """Encode test queries, search per-language indexes and collect retrieval metrics."""
model = SentenceTransformer(model_name)
test = load_pairs(test_path)
groups = {
"ru": [x for x in test if x["lang"] == "ru"],
"kz": [x for x in test if x["lang"] == "kz"],
}
results = {
"model": model_name,
"index_alias": index_alias,
"test_path": test_path,
"top_k": top_k,
"by_lang": {},
}
all_ranks = []
all_top1_scores = []
all_top1_scores_tp = []
all_top1_scores_fp = []
all_margins = []
all_coverage_ids = set()
total_corpus_size = 0
for lang, items in groups.items():
if not items:
results["by_lang"][lang] = {"count": 0}
continue
index, meta, pos_to_id = load_index(lang, index_alias)
total_corpus_size += len(meta)
queries = [x["query"] for x in items]
q_emb = model.encode(
queries,
batch_size=64,
convert_to_numpy=True,
normalize_embeddings=True,
show_progress_bar=True,
).astype(np.float32)
scores, idxs = index.search(q_emb, top_k)
ranks = []
top1_scores = []
top1_scores_tp = []
top1_scores_fp = []
margins = []
coverage_ids = set()
for i, x in enumerate(items):
target = x["positive_id"]
found_rank = None
top_scores = [float(s) for s in scores[i].tolist()]
for r in range(top_k):
pos = int(idxs[i, r])
did = pos_to_id.get(pos)
if did is None:
continue
coverage_ids.add(did)
if did == target:
found_rank = r
break
ranks.append(found_rank)
if top_scores:
top1 = top_scores[0]
top1_scores.append(top1)
if found_rank == 0:
top1_scores_tp.append(top1)
else:
top1_scores_fp.append(top1)
if len(top_scores) >= 2:
margins.append(top_scores[0] - top_scores[1])
all_ranks.extend(ranks)
all_top1_scores.extend(top1_scores)
all_top1_scores_tp.extend(top1_scores_tp)
all_top1_scores_fp.extend(top1_scores_fp)
all_margins.extend(margins)
all_coverage_ids.update(coverage_ids)
found_ranks_1based = [r + 1 for r in ranks if r is not None]
rank_stats = _stats_from_values(found_ranks_1based)
rank_stats.update(
{
"found_count": len(found_ranks_1based),
"not_found_count": len(ranks) - len(found_ranks_1based),
"not_found_rate": float(np.mean([1.0 if r is None else 0.0 for r in ranks])) if ranks else 0.0,
}
)
score_stats = _stats_from_values(top1_scores)
margin_stats = _stats_from_values(margins)
coverage = {
"unique_ids": len(coverage_ids),
"corpus_size": len(meta),
"coverage_ratio": float(len(coverage_ids) / len(meta)) if meta else 0.0,
}
results["by_lang"][lang] = {
"count": len(items),
**metrics_from_ranks(ranks, ks=(1, 3, 5, 10)),
"rank_stats": {
"mean_rank": rank_stats["mean"],
"median_rank": rank_stats["median"],
"p10_rank": rank_stats["p10"],
"p90_rank": rank_stats["p90"],
"found_count": rank_stats["found_count"],
"not_found_count": rank_stats["not_found_count"],
"not_found_rate": rank_stats["not_found_rate"],
},
"score_stats": {
"top1_score": score_stats,
"margin_top1_top2": margin_stats,
},
"coverage": coverage,
"distributions": {
"ranks": [r if r is not None else -1 for r in ranks],
"top1_scores": top1_scores,
"top1_scores_tp": top1_scores_tp,
"top1_scores_fp": top1_scores_fp,
"margins": margins,
},
}
overall_found_ranks_1based = [r + 1 for r in all_ranks if r is not None]
overall_rank_stats = _stats_from_values(overall_found_ranks_1based)
overall_rank_stats.update(
{
"found_count": len(overall_found_ranks_1based),
"not_found_count": len(all_ranks) - len(overall_found_ranks_1based),
"not_found_rate": float(np.mean([1.0 if r is None else 0.0 for r in all_ranks])) if all_ranks else 0.0,
}
)
overall_score_stats = _stats_from_values(all_top1_scores)
overall_margin_stats = _stats_from_values(all_margins)
overall_coverage = {
"unique_ids": len(all_coverage_ids),
"corpus_size": total_corpus_size,
"coverage_ratio": float(len(all_coverage_ids) / total_corpus_size) if total_corpus_size else 0.0,
}
results["overall"] = {
"count": len(all_ranks),
**metrics_from_ranks(all_ranks, ks=(1, 3, 5, 10)),
"rank_stats": {
"mean_rank": overall_rank_stats["mean"],
"median_rank": overall_rank_stats["median"],
"p10_rank": overall_rank_stats["p10"],
"p90_rank": overall_rank_stats["p90"],
"found_count": overall_rank_stats["found_count"],
"not_found_count": overall_rank_stats["not_found_count"],
"not_found_rate": overall_rank_stats["not_found_rate"],
},
"score_stats": {
"top1_score": overall_score_stats,
"margin_top1_top2": overall_margin_stats,
},
"coverage": overall_coverage,
"distributions": {
"ranks": [r if r is not None else -1 for r in all_ranks],
"top1_scores": all_top1_scores,
"top1_scores_tp": all_top1_scores_tp,
"top1_scores_fp": all_top1_scores_fp,
"margins": all_margins,
},
}
return results
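
# Minimal usage sketch (assumes the indexes for the alias were already built
# under artifacts/indexes/ and that load_pairs yields dicts with "query",
# "lang" and "positive_id"):
#
#   report = eval_model(
#       "sentence-transformers/LaBSE",
#       index_alias="labse",
#       test_path="data/legal_assistant_test.jsonl",
#       top_k=10,
#   )
#   print(report["overall"]["recall@10"], report["by_lang"]["ru"]["mrr@10"])
#
# Note on coverage: unique ids are collected only while scanning each result
# list up to (and including) the first hit, or over the full top-k when the
# positive is missed, so coverage_ratio is not the coverage of every query's
# full top-k.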
def main():
test_path = "data/legal_assistant_test.jsonl"
models = [
("mpnet_base", "paraphrase-multilingual-mpnet-base-v2"),
("labse", "sentence-transformers/LaBSE"),
]
finetuned_dir = Path("artifacts/models/finetuned_mpnet")
if finetuned_dir.exists():
models.append(("finetuned", str(finetuned_dir)))
out_dir = Path("artifacts/reports")
out_dir.mkdir(parents=True, exist_ok=True)
for alias, model_name in models:
r = eval_model(model_name, alias, test_path, top_k=10)
(out_dir / f"eval_{alias}.json").write_text(
json.dumps(r, ensure_ascii=False, indent=2),
encoding="utf-8",
)

if __name__ == "__main__":
main()
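
# Run from the repository root (an assumption implied by the relative data/ and
# artifacts/ paths):
#   python src/evaluate.py
# This writes artifacts/reports/eval_<alias>.json for mpnet_base, labse and,
# when artifacts/models/finetuned_mpnet exists, the finetuned checkpoint.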