Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| rag_eval_metrics.py | |
| Evaluate RAG retrieval quality by comparing app logs (JSONL) with a gold file (CSV). | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Any, Optional | |
| import pandas as pd | |
| import numpy as np | |
| # ----------------------------- Small Utils ----------------------------- # | |
| def filename_key(s: str) -> str: | |
| s = (s or "").strip().replace("\\", "/").split("/")[-1] | |
| return s.casefold() | |
| def re_split_sc(s: str) -> List[str]: | |
| import re | |
| return re.split(r"[;,]", s) | |
| def _pick_last_non_empty(hit_lists) -> List[dict]: | |
| """ | |
| Robustly select the last non-empty hits list from a pandas Series or iterable. | |
| This fixes the KeyError that happens when using reversed() directly on a Series | |
| with a non-range index. | |
| """ | |
| # Convert pandas Series or other iterables to a plain Python list | |
| try: | |
| values = list(hit_lists.tolist()) | |
| except AttributeError: | |
| values = list(hit_lists) | |
| # Walk from last to first, return first non-empty list-like | |
| for lst in reversed(values): | |
| if isinstance(lst, (list, tuple)) and len(lst) > 0: | |
| return lst | |
| # If everything was empty / NaN | |
| return [] | |
| # ----------------------------- IO Helpers ----------------------------- # | |
| def read_logs(jsonl_path: Path) -> pd.DataFrame: | |
| rows = [] | |
| if (not jsonl_path.exists()) or jsonl_path.stat().st_size == 0: | |
| return pd.DataFrame(columns=["question", "hits"]) | |
| with open(jsonl_path, "r", encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| rec = json.loads(line) | |
| except Exception: | |
| continue | |
| # Extract question | |
| q = (((rec.get("inputs") or {}).get("question")) or "").strip() | |
| # Extract retrieval hits (if present) | |
| retr = (rec.get("retrieval") or {}) | |
| hits = retr.get("hits", []) | |
| norm_hits = [] | |
| for h in hits or []: | |
| doc = (h.get("doc") or "").strip() | |
| page = str(h.get("page") or "").strip() | |
| # Normalize page to int or None | |
| try: | |
| page_int = int(page) | |
| except Exception: | |
| page_int = None | |
| norm_hits.append({"doc": doc, "page": page_int}) | |
| rows.append({"question": q, "hits": norm_hits}) | |
| df = pd.DataFrame(rows) | |
| if df.empty: | |
| return pd.DataFrame(columns=["question", "hits"]) | |
| # Group by normalized question text and keep last non-empty hits list per question | |
| df = ( | |
| df.groupby(df["question"].astype(str).str.casefold().str.strip(), as_index=False) | |
| .agg({"question": "last", "hits": _pick_last_non_empty}) | |
| ) | |
| return df | |
| def read_gold(csv_path: Path) -> pd.DataFrame: | |
| df = pd.read_csv(csv_path) | |
| cols = {c.lower().strip(): c for c in df.columns} | |
| # --- question column --- | |
| q_col = None | |
| for cand in ["question", "query", "q"]: | |
| if cand in cols: | |
| q_col = cols[cand] | |
| break | |
| if q_col is None: | |
| raise ValueError("Gold CSV must contain a 'question' column (case-insensitive).") | |
| # --- possible relevant_docs (list-in-cell) column --- | |
| rel_list_col = None | |
| for cand in ["relevant_docs", "relevant", "docs"]: | |
| if cand in cols: | |
| rel_list_col = cols[cand] | |
| break | |
| # --- single-doc-per-row column --- | |
| doc_col = None | |
| for cand in ["doc", "document", "file", "doc_name"]: | |
| if cand in cols: | |
| doc_col = cols[cand] | |
| break | |
| # --- optional page column --- | |
| page_col = None | |
| for cand in ["page", "page_num", "page_number"]: | |
| if cand in cols: | |
| page_col = cols[cand] | |
| break | |
| rows = [] | |
| # Case 1: relevant_docs list column (no explicit doc_col) | |
| if rel_list_col and doc_col is None: | |
| for _, r in df.iterrows(): | |
| q_raw = str(r[q_col]).strip() | |
| q_norm = q_raw.casefold().strip() | |
| rel_val = str(r[rel_list_col]) if pd.notna(r[rel_list_col]) else "" | |
| if not rel_val: | |
| rows.append({ | |
| "question_raw": q_raw, | |
| "question": q_norm, | |
| "doc": None, | |
| "page": np.nan | |
| }) | |
| continue | |
| parts = [p.strip() for p in re_split_sc(rel_val)] | |
| for d in parts: | |
| rows.append({ | |
| "question_raw": q_raw, | |
| "question": q_norm, | |
| "doc": filename_key(d), | |
| "page": np.nan | |
| }) | |
| # Case 2: doc/page columns (one relevant doc per row) | |
| elif doc_col: | |
| for _, r in df.iterrows(): | |
| q_raw = str(r[q_col]).strip() | |
| q_norm = q_raw.casefold().strip() | |
| d = str(r[doc_col]).strip() if pd.notna(r[doc_col]) else "" | |
| p = r[page_col] if (page_col and pd.notna(r[page_col])) else np.nan | |
| try: | |
| p = int(p) | |
| except Exception: | |
| p = np.nan | |
| rows.append({ | |
| "question_raw": q_raw, | |
| "question": q_norm, | |
| "doc": filename_key(d), | |
| "page": p | |
| }) | |
| else: | |
| raise ValueError("Gold CSV must contain either a 'doc' column or a 'relevant_docs' column.") | |
| gold = pd.DataFrame(rows) | |
| # Keep only rows with a valid doc (when docs exist) | |
| gold["has_doc"] = gold["doc"].apply(lambda x: isinstance(x, str) and len(x) > 0) | |
| if gold["has_doc"].any(): | |
| gold = gold[gold["has_doc"]].copy() | |
| gold.drop(columns=["has_doc"], inplace=True, errors="ignore") | |
| # Remove duplicates | |
| gold = gold.drop_duplicates(subset=["question", "doc", "page"]) | |
| return gold | |
| # ----------------------------- Metric Core ----------------------------- # | |
| def dcg_at_k(relevances: List[int]) -> float: | |
| dcg = 0.0 | |
| for i, rel in enumerate(relevances, start=1): | |
| if rel > 0: | |
| dcg += 1.0 / np.log2(i + 1.0) | |
| return float(dcg) | |
| def ndcg_at_k(relevances: List[int]) -> float: | |
| dcg = dcg_at_k(relevances) | |
| ideal = sorted(relevances, reverse=True) | |
| idcg = dcg_at_k(ideal) | |
| if idcg == 0.0: | |
| return 0.0 | |
| return float(dcg / idcg) | |
| def compute_metrics_for_question(gold_docs, gold_pages, hits, k): | |
| top = hits[:k] if hits else [] | |
| pred_docs = [filename_key(h.get("doc", "")) for h in top] | |
| pred_pairs = [(filename_key(h.get("doc", "")), h.get("page", None)) for h in top] | |
| # --- Doc-level metrics --- | |
| gold_doc_set = set([d for d in gold_docs if isinstance(d, str) and d]) | |
| rel_bin_doc = [1 if d in gold_doc_set else 0 for d in pred_docs] | |
| hitk_doc = 1 if any(rel_bin_doc) else 0 | |
| prec_doc = (sum(rel_bin_doc) / max(1, len(pred_docs))) if pred_docs else 0.0 | |
| rec_doc = (sum(rel_bin_doc) / max(1, len(gold_doc_set))) if gold_doc_set else 0.0 | |
| ndcg_doc = ndcg_at_k(rel_bin_doc) | |
| # --- Page-level metrics (only if gold has page labels) --- | |
| gold_pairs = set() | |
| for d, p in zip(gold_docs, gold_pages): | |
| if isinstance(d, str) and d and (p is not None) and (not (isinstance(p, float) and np.isnan(p))): | |
| try: | |
| p_int = int(p) | |
| except Exception: | |
| continue | |
| gold_pairs.add((d, p_int)) | |
| if gold_pairs: | |
| rel_bin_page = [] | |
| for (d, p) in pred_pairs: | |
| if p is None or not isinstance(p, int): | |
| rel_bin_page.append(0) | |
| else: | |
| rel_bin_page.append(1 if (d, p) in gold_pairs else 0) | |
| hitk_page = 1 if any(rel_bin_page) else 0 | |
| prec_page = (sum(rel_bin_page) / max(1, len(pred_pairs))) if pred_pairs else 0.0 | |
| rec_page = (sum(rel_bin_page) / max(1, len(gold_pairs))) if gold_pairs else 0.0 | |
| ndcg_page = ndcg_at_k(rel_bin_page) | |
| else: | |
| hitk_page = prec_page = rec_page = ndcg_page = np.nan | |
| return { | |
| "hit@k_doc": hitk_doc, | |
| "precision@k_doc": prec_doc, | |
| "recall@k_doc": rec_doc, | |
| "ndcg@k_doc": ndcg_doc, | |
| "hit@k_page": hitk_page, | |
| "precision@k_page": prec_page, | |
| "recall@k_page": rec_page, | |
| "ndcg@k_page": ndcg_page, | |
| "n_gold_docs": int(len(gold_doc_set)), | |
| "n_gold_doc_pages": int(len(gold_pairs)), | |
| "n_pred": int(len(pred_docs)) | |
| } | |
| # ----------------------------- Orchestration ----------------------------- # | |
| # === Dark blue and accent colors === | |
| COLOR_TITLE = "\033[94m" # light blue for titles | |
| COLOR_TEXT = "\033[34m" # dark blue | |
| COLOR_ACCENT = "\033[36m" # cyan for metrics | |
| COLOR_RESET = "\033[0m" | |
| def _fmt(x: Any) -> str: | |
| try: | |
| return f"{float(x):.3f}" | |
| except Exception: | |
| return "-" | |
| def main(): | |
| ap = argparse.ArgumentParser() | |
| ap.add_argument("--gold_csv", required=True, type=str) | |
| ap.add_argument("--logs_jsonl", required=True, type=str) | |
| ap.add_argument("--k", type=int, default=8) | |
| ap.add_argument("--out_dir", type=str, default="rag_artifacts") | |
| args = ap.parse_args() | |
| out_dir = Path(args.out_dir) | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| gold_path = Path(args.gold_csv) | |
| logs_path = Path(args.logs_jsonl) | |
| if not gold_path.exists(): | |
| print(f"{COLOR_TEXT}❌ gold.csv not found at {gold_path}{COLOR_RESET}", file=sys.stderr) | |
| sys.exit(0) | |
| if not logs_path.exists() or logs_path.stat().st_size == 0: | |
| print(f"{COLOR_TEXT}❌ logs JSONL not found or empty at {logs_path}{COLOR_RESET}", file=sys.stderr) | |
| sys.exit(0) | |
| # Read gold | |
| try: | |
| gold = read_gold(gold_path) | |
| except Exception as e: | |
| print(f"{COLOR_TEXT}❌ Failed to read gold: {e}{COLOR_RESET}", file=sys.stderr) | |
| sys.exit(0) | |
| # Read logs (with robust aggregation) | |
| try: | |
| logs = read_logs(logs_path) | |
| except Exception as e: | |
| print(f"{COLOR_TEXT}❌ Failed to read logs: {e}{COLOR_RESET}", file=sys.stderr) | |
| sys.exit(0) | |
| if gold.empty: | |
| print(f"{COLOR_TEXT}❌ Gold file contains no usable rows.{COLOR_RESET}", file=sys.stderr) | |
| sys.exit(0) | |
| if logs.empty: | |
| print(f"{COLOR_TEXT}❌ Logs file contains no usable entries.{COLOR_RESET}", file=sys.stderr) | |
| sys.exit(0) | |
| # Build gold dict: normalized_question -> list of (doc, page) | |
| gdict: Dict[str, List[Tuple[str, Optional[int]]]] = {} | |
| for _, r in gold.iterrows(): | |
| q = str(r["question"]).strip() | |
| d = r["doc"] | |
| p = r["page"] if "page" in r else np.nan | |
| gdict.setdefault(q, []).append((d, p)) | |
| # Normalize log questions for join | |
| logs["q_norm"] = logs["question"].astype(str).str.casefold().str.strip() | |
| perq_rows = [] | |
| not_in_logs, not_in_gold = [], [] | |
| # For each gold question, compute metrics using logs | |
| for q_norm, pairs in gdict.items(): | |
| row = logs[logs["q_norm"] == q_norm] | |
| gdocs = [d for (d, _) in pairs] | |
| gpages = [p for (_, p) in pairs] | |
| if row.empty: | |
| # No logs for this gold question → zero retrieval | |
| not_in_logs.append(q_norm) | |
| metrics = { | |
| "hit@k_doc": 0, | |
| "precision@k_doc": 0.0, | |
| "recall@k_doc": 0.0, | |
| "ndcg@k_doc": 0.0, | |
| "hit@k_page": np.nan, | |
| "precision@k_page": np.nan, | |
| "recall@k_page": np.nan, | |
| "ndcg@k_page": np.nan, | |
| "n_gold_docs": int(len(set([d for d in gdocs if isinstance(d, str) and d]))), | |
| "n_gold_doc_pages": int(len([ | |
| (d, p) for (d, p) in zip(gdocs, gpages) | |
| if isinstance(d, str) and d and pd.notna(p) | |
| ])), | |
| "n_pred": 0 | |
| } | |
| perq_rows.append({ | |
| "question": q_norm, | |
| "covered_in_logs": 0, | |
| **metrics | |
| }) | |
| continue | |
| # Use aggregated hits from read_logs | |
| hits = row.iloc[0]["hits"] or [] | |
| metrics = compute_metrics_for_question(gdocs, gpages, hits, args.k) | |
| perq_rows.append({ | |
| "question": q_norm, | |
| "covered_in_logs": 1, | |
| **metrics | |
| }) | |
| # Any log questions not in gold | |
| gold_qs = set(gdict.keys()) | |
| for qn in logs["q_norm"].tolist(): | |
| if qn not in gold_qs: | |
| not_in_gold.append(qn) | |
| perq = pd.DataFrame(perq_rows) | |
| covered = perq[perq["covered_in_logs"] == 1].copy() | |
| agg = { | |
| "questions_total_gold": int(len(gdict)), | |
| "questions_covered_in_logs": int(covered.shape[0]), | |
| "questions_missing_in_logs": int(len(not_in_logs)), | |
| "questions_in_logs_not_in_gold": int(len(set(not_in_gold))), | |
| "k": int(args.k), | |
| "mean_hit@k_doc": float(covered["hit@k_doc"].mean()) if not covered.empty else 0.0, | |
| "mean_precision@k_doc": float(covered["precision@k_doc"].mean()) if not covered.empty else 0.0, | |
| "mean_recall@k_doc": float(covered["recall@k_doc"].mean()) if not covered.empty else 0.0, | |
| "mean_ndcg@k_doc": float(covered["ndcg@k_doc"].mean()) if not covered.empty else 0.0, | |
| "mean_hit@k_page": float(covered["hit@k_page"].dropna().mean()) if covered["hit@k_page"].notna().any() else None, | |
| "mean_precision@k_page": float(covered["precision@k_page"].dropna().mean()) if covered["precision@k_page"].notna().any() else None, | |
| "mean_recall@k_page": float(covered["recall@k_page"].dropna().mean()) if covered["recall@k_page"].notna().any() else None, | |
| "mean_ndcg@k_page": float(covered["ndcg@k_page"].dropna().mean()) if covered["ndcg@k_page"].notna().any() else None, | |
| "avg_gold_docs_per_q": float(perq["n_gold_docs"].mean()) if not perq.empty else 0.0, | |
| "avg_preds_per_q": float(perq["n_pred"].mean()) if not perq.empty else 0.0, | |
| "examples_missing_in_logs": list(not_in_logs[:10]), | |
| "examples_in_logs_not_in_gold": list(dict.fromkeys(not_in_gold))[:10], | |
| } | |
| perq_path = out_dir / "metrics_per_question.csv" | |
| agg_path = out_dir / "metrics_aggregate.json" | |
| perq.to_csv(perq_path, index=False) | |
| with open(agg_path, "w", encoding="utf-8") as f: | |
| json.dump(agg, f, ensure_ascii=False, indent=2) | |
| # === Console summary with color === | |
| print(f"{COLOR_TITLE}RAG Evaluation Summary{COLOR_RESET}") | |
| print(f"{COLOR_TITLE}----------------------{COLOR_RESET}") | |
| print(f"{COLOR_TEXT}Gold questions: {COLOR_ACCENT}{agg['questions_total_gold']}{COLOR_RESET}") | |
| print(f"{COLOR_TEXT}Covered in logs: {COLOR_ACCENT}{agg['questions_covered_in_logs']}{COLOR_RESET}") | |
| print(f"{COLOR_TEXT}Missing in logs: {COLOR_ACCENT}{agg['questions_missing_in_logs']}{COLOR_RESET}") | |
| print(f"{COLOR_TEXT}In logs but not in gold: {COLOR_ACCENT}{agg['questions_in_logs_not_in_gold']}{COLOR_RESET}") | |
| print(f"{COLOR_TEXT}k = {COLOR_ACCENT}{agg['k']}{COLOR_RESET}\n") | |
| print( | |
| f"{COLOR_TEXT}Doc-level:{COLOR_RESET} " | |
| f"{COLOR_ACCENT}Hit@k={_fmt(agg['mean_hit@k_doc'])} " | |
| f"Precision@k={_fmt(agg['mean_precision@k_doc'])} " | |
| f"Recall@k={_fmt(agg['mean_recall@k_doc'])} " | |
| f"nDCG@k={_fmt(agg['mean_ndcg@k_doc'])}{COLOR_RESET}" | |
| ) | |
| if agg['mean_hit@k_page'] is not None: | |
| print( | |
| f"{COLOR_TEXT}Page-level:{COLOR_RESET} " | |
| f"{COLOR_ACCENT}Hit@k={_fmt(agg['mean_hit@k_page'])} " | |
| f"Precision@k={_fmt(agg['mean_precision@k_page'])} " | |
| f"Recall@k={_fmt(agg['mean_recall@k_page'])} " | |
| f"nDCG@k={_fmt(agg['mean_ndcg@k_page'])}{COLOR_RESET}" | |
| ) | |
| else: | |
| print(f"{COLOR_TEXT}Page-level: (no page labels in gold){COLOR_RESET}") | |
| print() | |
| print(f"{COLOR_TEXT}Wrote per-question CSV → {COLOR_ACCENT}{perq_path}{COLOR_RESET}") | |
| print(f"{COLOR_TEXT}Wrote aggregate JSON → {COLOR_ACCENT}{agg_path}{COLOR_RESET}") | |
| if __name__ == "__main__": | |
| main() | |