Spaces:
Sleeping
Sleeping
| """ | |
| eval/eval_rag.py | |
| RAG pipeline evaluation with LLM-as-Judge quality metrics. | |
| Quality metrics (per-query, inline): | |
| faithfulness - are answer claims grounded in retrieved code? | |
| 1. LLM extracts atomic claims from the answer | |
| 2. LLM verifies all claims in one batched call | |
| 3. score = verified / total | |
| relevancy - does the answer address the question? | |
| 1. LLM generates 3 questions the answer would address | |
| 2. Embed those + original query (all-MiniLM-L6-v2) | |
| 3. score = mean cosine similarity | |
| No external eval libraries. Ollama (local GPU) for LLM calls. | |
| Usage: | |
| python eval/eval_rag.py --profile A5 | |
| python eval/eval_rag.py --tier 1 | |
| python eval/eval_rag.py | |
| python eval/eval_rag.py --no-judge # keyword scoring only, fastest | |
| python eval/eval_rag.py --query T1-005 | |
| python eval/eval_rag.py --dry-run | |
| """ | |
| import argparse | |
| import json | |
| import logging | |
| import sys | |
| import time | |
| from pathlib import Path | |
| import numpy as np | |
| import requests | |
| import yaml | |
| from dotenv import load_dotenv | |
| from sentence_transformers import SentenceTransformer | |
| load_dotenv() | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from app.judge import LLMJudge | |
| from app.retrieval import Retriever | |
# Root logging is configured once at import time: timestamp, padded level, message.
_LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"
_LOG_DATEFMT = "%H:%M:%S"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATEFMT)

# Module-level logger shared by the functions in this script.
log = logging.getLogger(__name__)
| # Config | |
def load_config() -> dict:
    """Load ``config.yaml`` from the current working directory.

    The file is read as UTF-8 explicitly so that non-ASCII text in the
    config (e.g. inside prompts) parses the same on every platform instead
    of depending on the locale's default encoding.

    Returns:
        The parsed YAML document.

    Raises:
        FileNotFoundError: if ``config.yaml`` is not in the working directory.
    """
    with open("config.yaml", encoding="utf-8") as f:
        return yaml.safe_load(f)
def load_queries(path: str) -> list[dict]:
    """Load the evaluation queries from a YAML file.

    The file is read as UTF-8 explicitly rather than with the
    locale-dependent default encoding.

    Args:
        path: Path to a YAML document with a top-level ``queries`` key.

    Returns:
        The value stored under ``queries``.

    Raises:
        KeyError: if the document has no ``queries`` key.
    """
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)["queries"]
def load_baseline(path: str) -> dict:
    """Load a baseline results file and index its per-query entries by id.

    The file is read as UTF-8 explicitly (the JSON interchange encoding)
    rather than with the locale-dependent default.

    Args:
        path: Path to a JSON file that may contain a ``per_query`` list.

    Returns:
        A dict mapping each entry's ``query_id`` to the entry itself; empty
        when the file has no ``per_query`` key.

    Raises:
        KeyError: if a per-query entry has no ``query_id``.
    """
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    return {entry["query_id"]: entry for entry in data.get("per_query", [])}
| # Context builder | |
def build_context(chunks: list[dict], max_code_chars: int = 700) -> str:
    """Render retrieved chunks as one prompt-ready context string.

    Each chunk becomes a ``filepath::function_name`` header followed by a
    fenced ``python`` code block; chunks are separated by a blank line.

    Args:
        chunks: Retrieved chunk dicts. ``filepath`` and ``function_name``
            default to ``"?"`` when absent. The code is taken from
            ``raw_code``, falling back to ``_text``.
        max_code_chars: Maximum number of code characters kept per chunk
            (default 700, the previously hard-coded limit).

    Returns:
        The joined context, or ``""`` for an empty chunk list.
    """
    parts = []
    for chunk in chunks:
        filepath = chunk.get("filepath", "?")
        function_name = chunk.get("function_name", "?")
        code = chunk.get("raw_code")
        if code is None:
            # Covers both a missing key and an explicit None value, which
            # would otherwise fail on slicing; a None "_text" maps to "".
            code = chunk.get("_text") or ""
        snippet = code[:max_code_chars]
        parts.append(f"{filepath}::{function_name}\n```python\n{snippet}\n```")
    return "\n\n".join(parts)
| # Keyword scoring | |
def score_keywords(answer: str, keywords: list[str]) -> dict:
    """Score an answer by case-insensitive keyword containment.

    Returns a dict with ``score`` (fraction of keywords found as substrings
    of the answer), ``found`` and ``missed`` (keywords in their original
    order and casing). When the answer or the keyword list is empty, the
    score is ``None`` and both lists are empty.
    """
    if not (answer and keywords):
        return {"score": None, "found": [], "missed": []}

    haystack = answer.lower()
    hits: list[str] = []
    misses: list[str] = []
    # Single pass: route each keyword into exactly one of the two buckets.
    for keyword in keywords:
        bucket = hits if keyword.lower() in haystack else misses
        bucket.append(keyword)

    return {"score": len(hits) / len(keywords), "found": hits, "missed": misses}
| # Main eval loop | |
def _result_record(q, chunks, repo_counts, method, ret_t, gen_t, answer,
                   kw=None, kw_pass=None,
                   faith_val=None, faith_details=None,
                   relev_val=None, relev_details=None) -> dict:
    """Build one per-query result row with the same key set on every path."""
    if kw is None:
        # Nothing was scored (dry run / no answer): report every expected
        # keyword as missed.
        kw = {"score": None, "found": [], "missed": q.get("keywords", [])}
    return {
        "query_id": q["id"],
        "tier": q["tier"],
        "query": q["query"],
        "answer": answer,
        "chunks_retrieved": len(chunks),
        "repos_hit": list(repo_counts),
        "kw_score": kw["score"],
        "kw_passed": kw_pass,
        "kw_found": kw["found"],
        "kw_missed": kw["missed"],
        "faithfulness": faith_val,
        "faithfulness_details": faith_details if faith_details is not None else {},
        "relevancy": relev_val,
        "relevancy_details": relev_details if relev_details is not None else {},
        "ret_time_s": ret_t,
        "gen_time_s": gen_t,
        "method": method,
        "condition": "rag",
    }


def run_eval(queries, retriever, system_prompt, gen_model, judge,
             embed_model, method, top_k, kw_threshold,
             run_judge, dry_run) -> list[dict]:
    """Run retrieval, generation and scoring for every query.

    Per query: retrieve chunks, build the context, ask ``gen_model`` for an
    answer through ``judge.call_ollama``, score keyword coverage, and
    (optionally) compute the LLM-as-Judge faithfulness and relevancy scores.

    Args:
        queries: Query dicts with ``id``, ``query``, ``tier`` and an optional
            ``keywords`` list.
        retriever: Object exposing ``retrieve(query, method=..., top_k=...)``.
        system_prompt: System message sent with every generation request.
        gen_model: Model name passed to ``judge.call_ollama``.
        judge: Object exposing ``call_ollama``, ``faithfulness`` and
            ``relevancy``.
        embed_model: Embedding model handed to ``judge.relevancy``; judge
            scoring is skipped when it is ``None``.
        method: Retrieval method forwarded to the retriever.
        top_k: Number of chunks to retrieve.
        kw_threshold: Minimum keyword score that counts as a pass.
        run_judge: Whether to compute the judge metrics.
        dry_run: When true, only retrieval runs; the answer is ``"[DRY RUN]"``.

    Returns:
        One result dict per query. Every record carries the same keys,
        including ``repos_hit`` and ``condition``, whether the query was
        fully evaluated, dry-run, or produced no answer.
    """
    results = []
    for i, q in enumerate(queries, 1):
        qid = q["id"]
        query = q["query"]
        kws = q.get("keywords", [])
        log.info("[%d/%d] %s - %s", i, len(queries), qid, query[:65])

        # Retrieve
        t0 = time.time()
        chunks = retriever.retrieve(query, method=method, top_k=top_k)
        ret_t = round(time.time() - t0, 2)

        repo_counts: dict[str, int] = {}
        for c in chunks:
            repo = c.get("repo", "?")
            repo_counts[repo] = repo_counts.get(repo, 0) + 1
        log.info(" Retrieved: %s (%.2fs)",
                 ", ".join(f"{r}x{n}" for r, n in repo_counts.items()), ret_t)

        context = build_context(chunks)

        if dry_run:
            results.append(_result_record(
                q, chunks, repo_counts, method, ret_t, 0, "[DRY RUN]"))
            continue

        # Generate
        user_msg = f"Retrieved source code:\n\n{context}\n\n---\n\nQuestion: {query}"
        t1 = time.time()
        answer = judge.call_ollama(
            [{"role": "system", "content": system_prompt},
             {"role": "user", "content": user_msg}],
            model=gen_model, max_tokens=600, temp=0.1,
        )
        gen_t = round(time.time() - t1, 2)

        if answer is None:
            log.warning(" No answer - skipping")
            results.append(_result_record(
                q, chunks, repo_counts, method, ret_t, gen_t, None))
            continue

        # Keyword score
        kw = score_keywords(answer, kws)
        if kw["score"] is None:
            # Nothing to score against: report that instead of a bogus
            # "0.00 ... FAIL" line.
            kw_pass = None
            log.info(" KW: n/a (no keywords or empty answer)")
        else:
            kw_pass = kw["score"] >= kw_threshold
            log.info(" KW: %.2f (%d/%d) - %s",
                     kw["score"], len(kw["found"]), len(kws),
                     "PASS" if kw_pass else "FAIL")

        # LLM-as-Judge (inline, per query)
        faith_val, faith_details = None, {}
        relev_val, relev_details = None, {}
        if run_judge and chunks and embed_model is not None:
            faith_val, faith_details = judge.faithfulness(answer, context)
            relev_val, relev_details = judge.relevancy(query, answer, embed_model)
            if faith_val is not None and relev_val is not None:
                log.info(" Judge: faithfulness=%.3f relevancy=%.3f",
                         faith_val, relev_val)
            else:
                # Name the metric that failed rather than printing 0 for it.
                log.warning(" Judge: scoring failed (faithfulness=%s relevancy=%s)"
                            " - check debug logs", faith_val, relev_val)

        results.append(_result_record(
            q, chunks, repo_counts, method, ret_t, gen_t, answer,
            kw=kw, kw_pass=kw_pass,
            faith_val=faith_val, faith_details=faith_details,
            relev_val=relev_val, relev_details=relev_details,
        ))
    return results
| # Report | |
def _baseline_scores(rows: list[dict], baseline: dict) -> list:
    """Return the non-None baseline scores for the rows that have a baseline entry."""
    scores = []
    for row in rows:
        qid = row["query_id"]
        if qid in baseline and baseline[qid].get("score") is not None:
            scores.append(baseline[qid]["score"])
    return scores


def print_report(results: list[dict], baseline: dict) -> dict:
    """Log the evaluation report and return the aggregate summary.

    Args:
        results: Per-query result dicts as produced by the eval loop.
        baseline: Mapping of ``query_id`` to a baseline entry with a
            ``score``; may be empty.

    Returns:
        A summary dict with counts, pass rate, average keyword score,
        average judge scores (``None`` when nothing was judged) and average
        retrieval / generation times (``None`` when unavailable).
    """
    log.info("")
    log.info("=" * 72)
    log.info("RAG Evaluation Results")
    log.info("=" * 72)

    scored = [r for r in results if r.get("kw_score") is not None]
    passed = [r for r in scored if r.get("kw_passed")]
    # Judge metrics do not depend on keyword scoring, so a query without
    # keywords (kw_score None) still counts toward the judge averages.
    judged = [r for r in results
              if r.get("faithfulness") is not None
              and r.get("relevancy") is not None]

    for tier in sorted({r["tier"] for r in results}):
        tier_rows = [r for r in scored if r["tier"] == tier]
        if not tier_rows:
            continue
        tier_passed = sum(1 for r in tier_rows if r.get("kw_passed"))
        avg = float(np.mean([r["kw_score"] for r in tier_rows]))
        bl_scores = _baseline_scores(tier_rows, baseline)
        extra = ""
        if bl_scores:
            bl_avg = float(np.mean(bl_scores))
            extra = f" baseline {bl_avg:.3f} {avg - bl_avg:+.3f}"
        log.info(" Tier %d: %d/%d passed (%.0f%%) KW %.3f%s",
                 tier, tier_passed, len(tier_rows),
                 100 * tier_passed / len(tier_rows), avg, extra)

    log.info("")
    rag_mean = float(np.mean([r["kw_score"] for r in scored])) if scored else 0
    log.info(" Queries answered : %d / %d", len(scored), len(results))
    log.info(" Passed : %d (%.1f%%)",
             len(passed), 100 * len(passed) / len(scored) if scored else 0)
    log.info(" Avg KW score : %.3f", rag_mean)

    bl_all = _baseline_scores(scored, baseline)
    if bl_all:
        bl_mean = float(np.mean(bl_all))
        log.info(" Baseline avg : %.3f", bl_mean)
        log.info(" RAG improvement : %+.3f (%.1f%% relative)",
                 rag_mean - bl_mean,
                 100 * (rag_mean - bl_mean) / bl_mean if bl_mean else 0)

    avg_f = avg_r = None
    if judged:
        avg_f = float(np.mean([r["faithfulness"] for r in judged]))
        avg_r = float(np.mean([r["relevancy"] for r in judged]))
        log.info("")
        log.info(" LLM-as-Judge (n=%d):", len(judged))
        log.info(" Faithfulness (claim-based) : %.3f", avg_f)
        log.info(" Relevancy (cosine sim) : %.3f", avg_r)

    log.info("")
    # A retrieval time that rounds to 0.0 is a real measurement, so only
    # missing values are excluded.
    ret_t = [r["ret_time_s"] for r in results if r.get("ret_time_s") is not None]
    # Generation time keeps the truthiness filter: dry-run rows store 0 as a
    # placeholder and must not pull the average down.
    gen_t = [r["gen_time_s"] for r in results if r.get("gen_time_s")]
    if ret_t:
        log.info(" Avg retrieval time : %.2fs", np.mean(ret_t))
    if gen_t:
        log.info(" Avg gen time : %.2fs", np.mean(gen_t))
    log.info("=" * 72)

    return {
        "condition": "rag",
        "total": len(results),
        "scored": len(scored),
        "passed": len(passed),
        "pass_rate": len(passed) / len(scored) if scored else 0,
        "avg_kw_score": rag_mean,
        "avg_faithfulness": avg_f,
        "avg_relevancy": avg_r,
        "judge_n": len(judged),
        "avg_ret_time_s": float(np.mean(ret_t)) if ret_t else None,
        "avg_gen_time_s": float(np.mean(gen_t)) if gen_t else None,
    }
def save_results(summary: dict, per_query: list[dict], output_dir: Path, profile: str = "A5"):
    """Write the summary and per-query results to a JSON file.

    The file is ``rag_eval_<profile lowercased>.json`` inside ``output_dir``,
    which is created (including parents) when it does not exist.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    target = output_dir / f"rag_eval_{profile.lower()}.json"
    payload = {"summary": summary, "per_query": per_query}
    with target.open("w") as fh:
        json.dump(payload, fh, indent=2)
    log.info("Saved -> %s", target)
| # Main | |
def main():
    """CLI entry point: parse arguments, load config and models, run the eval.

    Exits with status 1 when no query matches the filters or when Ollama is
    not reachable (the reachability probe is skipped for ``--dry-run``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--profile", type=str, choices=["A1", "A2", "A3", "A4", "A5"], default="A5", help="Ablation profile (default: A5)")
    parser.add_argument("--method", default="hybrid", choices=["hybrid", "dense", "bm25"])
    parser.add_argument("--top-k", type=int, default=5)
    parser.add_argument("--tier", type=int, default=None)
    parser.add_argument("--query", type=str, default=None)
    parser.add_argument("--threshold", type=float, default=0.4)
    parser.add_argument("--no-judge", action="store_true", help="Keyword scoring only - skip LLM-as-Judge")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    cfg = load_config()
    profile = cfg["profiles"][args.profile]
    strategy = profile["chunking"]  # e.g., "function", "fixed", "recursive"
    gen_cfg = cfg["generation"]
    system_prompt = gen_cfg["system_prompt"].strip()
    gen_model = gen_cfg["model"]
    judge_model = cfg["evaluation"].get("judge", gen_model)
    output_dir = Path(cfg["evaluation"]["results_dir"])

    queries = load_queries(cfg["evaluation"]["test_queries_path"])
    # Compare against None so that an explicit `--tier 0` still filters.
    if args.tier is not None:
        queries = [q for q in queries if q["tier"] == args.tier]
    if args.query:
        queries = [q for q in queries if q["id"] == args.query]
    if not queries:
        log.error("No queries matched")
        sys.exit(1)

    baseline_path = output_dir / "baseline.json"
    baseline = {}
    if baseline_path.exists():
        baseline = load_baseline(str(baseline_path))
        log.info("Loaded baseline: %d queries", len(baseline))
    else:
        log.warning("No baseline.json - run eval_baseline.py first")

    if not args.dry_run:
        try:
            requests.get("http://localhost:11434", timeout=3)
        except requests.RequestException:
            # Only network-level failures mean "Ollama is down"; any other
            # exception is a programming error and should surface.
            log.error("Ollama not reachable. Run: ollama serve")
            sys.exit(1)

    # Load embed model for relevancy scoring
    embed_model = None
    if not args.no_judge and not args.dry_run:
        emb_name = cfg["embedding"]["model"]
        log.info("Loading embed model: %s", emb_name)
        embed_model = SentenceTransformer(emb_name)

    log.info("=" * 72)
    log.info("Substrate - RAG Evaluation")
    # NOTE(review): the profile's "retrieval" value is only logged here; the
    # retrieval method actually used comes from --method. Confirm whether the
    # profile is meant to drive the method.
    log.info("Profile: %s (%s chunking + %s retrieval)", args.profile, profile["chunking"], profile["retrieval"])
    log.info("Model : %s (judge: %s)", gen_model, judge_model)
    log.info("Method : %s top-k=%d", args.method, args.top_k)
    log.info("Queries: %d llm-as-judge: %s", len(queries), not args.no_judge)
    log.info("=" * 72)

    retriever = Retriever(strategy=strategy)
    retriever.load()

    # Initialize LLM judge (embedding model passed separately to relevancy method)
    judge = LLMJudge(judge_model=judge_model)

    per_query = run_eval(
        queries=queries,
        retriever=retriever,
        system_prompt=system_prompt,
        gen_model=gen_model,
        judge=judge,
        embed_model=embed_model,
        method=args.method,
        top_k=args.top_k,
        kw_threshold=args.threshold,
        run_judge=not args.no_judge,
        dry_run=args.dry_run,
    )
    summary = print_report(per_query, baseline)
    save_results(summary, per_query, output_dir, args.profile)
    log.info("Done.")
# Run the evaluation only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()