| """ | |
| Unified Evaluation Runner for RAG System | |
| This script provides comprehensive evaluation capabilities including: | |
| - Deterministic groundedness evaluation with reproducible scoring | |
| - Enhanced citation accuracy validation | |
| - Performance benchmarking and latency analysis | |
| - Comprehensive evaluation metrics and reporting | |
| Features: | |
| - LLM-based groundedness evaluation (with fallback to token overlap) | |
| - Citation accuracy checking with filename validation | |
| - Deterministic evaluation with fixed seeds for reproducibility | |
| - Performance tier analysis (fast/normal/slow responses) | |
| - Comprehensive reporting with statistical analysis | |
| """ | |
import json
import os
import re
import statistics
import time
from difflib import SequenceMatcher
from typing import Any, Dict, List

import requests
from tqdm import tqdm

ROOT = os.path.dirname(os.path.abspath(__file__))
EVAL_DIR = ROOT
QUESTIONS_FILE = os.path.join(EVAL_DIR, "questions.json")
GOLD_FILE = os.path.join(EVAL_DIR, "gold_answers.json")
OUT_FILE = os.path.join(EVAL_DIR, "results.json")
EVAL_RESULTS_DIR = os.path.join(os.path.dirname(EVAL_DIR), "evaluation_results")
os.makedirs(EVAL_RESULTS_DIR, exist_ok=True)

TARGET_URL = os.getenv("EVAL_TARGET_URL", "https://msse-team-3-ai-engineering-project.hf.space")
CHAT_ENDPOINT = os.getenv("EVAL_CHAT_PATH", "/chat")
TIMEOUT = int(os.getenv("EVAL_TIMEOUT", "30"))

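# NOTE: The schemas of questions.json and gold_answers.json are not documented in this file;
# the shapes sketched below are inferred from how run_eval() reads them, and the sample
# values are purely illustrative.
#
# questions.json — a list of question records:
#   [{"id": 1, "question": "How do I configure the retriever?"}, ...]
#
# gold_answers.json — a dict keyed by the stringified question id:
#   {"1": {"answer": "Set the retriever options in the config ...",
#          "expected_sources": ["retriever_setup.md"]}}
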
def load_json(path: str) -> Any:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def token_overlap_score(gold: str, response: str) -> float:
    """Simple partial match score based on token overlap."""
    gold_tokens = set(gold.lower().split())
    resp_tokens = set(response.lower().split())
    if not gold_tokens:
        return 0.0
    overlap = gold_tokens & resp_tokens
    return len(overlap) / len(gold_tokens)

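# Worked example (illustrative strings, not from the eval set):
#   token_overlap_score("The capital of France is Paris", "Paris is the capital")
#   gold tokens = {"the", "capital", "of", "france", "is", "paris"}  (6 tokens)
#   overlap     = {"the", "capital", "is", "paris"}                  (4 tokens)
#   score       = 4 / 6 ≈ 0.67
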
def citation_matches(expected: List[str], returned_sources: List[Dict[str, Any]]) -> float:
    """Fraction of expected sources that appear in returned sources by filename match."""
    # If no expected sources, treat as correct only if the model returned none
    if not expected:
        return 1.0 if not returned_sources else 0.0

    # Helper: normalize a filename or URL -> lowercase basename without common extensions
    def normalize(s: str) -> str:
        if not s:
            return ""
        s = s.strip()
        # If it's a URL or path-like string, take the basename;
        # remove query string / fragments first
        s = re.sub(r"[?#].*$", "", s)
        base = os.path.basename(s)
        # remove common extensions
        base = re.sub(r"\.(md|markdown|txt|html|htm|pdf|csv|json|yaml|yml|py|ipynb)$", "", base, flags=re.IGNORECASE)
        return base.lower()

    # Build a set of normalized returned filenames from various possible keys
    returned_filenames = set()
    for s in returned_sources or []:
        # s may be a dict containing keys like filename, source_file, file, url, path
        if isinstance(s, dict):
            candidates = [s.get(k) for k in ("filename", "source_file", "file", "url", "path", "source")]
            # also some sources embed metadata
            meta = s.get("metadata") or {}
            if isinstance(meta, dict):
                candidates += [meta.get(k) for k in ("filename", "file", "source_file")]
        else:
            # s might be a plain string
            candidates = [s]
        for c in candidates:
            if c:
                returned_filenames.add(normalize(str(c)))

    # For each expected source, try exact normalized match, then substring, then fuzzy match
    matched = 0
    # fuzzy-match threshold can be tuned via environment variable
    try:
        env_thresh = float(os.getenv("EVAL_CITATION_FUZZY_THRESHOLD", "0.72"))
    except Exception:
        env_thresh = 0.72
    for e in expected:
        ne = normalize(str(e))
        if not ne:
            continue
        found = False
        # exact match
        if ne in returned_filenames:
            found = True
        else:
            # substring match
            for rf in returned_filenames:
                if ne in rf or rf in ne:
                    found = True
                    break
        if not found:
            # fuzzy match using SequenceMatcher
            best = 0.0
            for rf in returned_filenames:
                if not rf:
                    continue
                score = SequenceMatcher(None, ne, rf).ratio()
                if score > best:
                    best = score
            # treat as a match if similarity >= env_thresh (default 0.72, tunable)
            if best >= env_thresh:
                found = True
        if found:
            matched += 1
    return matched / len(expected)

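# Worked example (hypothetical filenames): normalize() maps both
# "docs/Setup_Guide.md?ref=readme" and {"filename": "setup_guide.md"} to "setup_guide",
# so an expected source of "docs/Setup_Guide.md" counts as an exact match and the
# function returns 1.0 for that single expected entry.
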
def run_eval(target: str = TARGET_URL):
    questions = load_json(QUESTIONS_FILE)
    golds = load_json(GOLD_FILE)
    results = []
    latencies = []
    for q in tqdm(questions, desc="Questions"):
        qid = str(q["id"])
        payload = {"message": q["question"], "include_sources": True}
        url = target.rstrip("/") + CHAT_ENDPOINT
        start = time.time()
        try:
            r = requests.post(url, json=payload, timeout=TIMEOUT)
            latency = time.time() - start
            latencies.append(latency)
            if r.status_code != 200:
                results.append(
                    {
                        "id": qid,
                        "question": q["question"],
                        "status_code": r.status_code,
                        "error": r.text,
                    }
                )
                continue
            data = r.json()
            response_text = data.get("response", "")
            returned_sources = data.get("sources", []) or []
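            # Assumed response shape from the chat endpoint (inferred from the lookups above):
            # {"response": "<answer text>", "sources": [<source dicts or strings>]};
            # missing keys fall back to an empty string / empty list.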
            gold_answer = golds.get(qid, {}).get("answer", "")
            expected_sources = golds.get(qid, {}).get("expected_sources", [])
            overlap = token_overlap_score(gold_answer, response_text)
            citation_acc = citation_matches(expected_sources, returned_sources)
            results.append(
                {
                    "id": qid,
                    "question": q["question"],
                    "response": response_text,
                    "latency_s": latency,
                    "overlap_score": overlap,
                    "citation_accuracy": citation_acc,
                    "returned_sources": returned_sources,
                }
            )
        except Exception as e:
            latency = time.time() - start
            latencies.append(latency)
            results.append(
                {
                    "id": qid,
                    "question": q["question"],
                    "status_code": "error",
                    "error": str(e),
                }
            )

    # compute summary metrics
    success_latencies = [lat for lat in latencies if lat is not None]
    p50 = statistics.median(success_latencies) if success_latencies else None
    p95 = sorted(success_latencies)[max(0, int(len(success_latencies) * 0.95) - 1)] if success_latencies else None
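    # The p95 above is a crude nearest-rank estimate: with 20 latencies,
    # index = max(0, int(20 * 0.95) - 1) = 18, i.e. the 19th-smallest value.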
    # compute averages for overlap and citation (only for successful responses)
    overlaps = [r.get("overlap_score") for r in results if isinstance(r.get("overlap_score"), float)]
    citations = [r.get("citation_accuracy") for r in results if isinstance(r.get("citation_accuracy"), float)]
    summary = {
        "target": target,
        "n_questions": len(questions),
        "latency_p50_s": p50,
        "latency_p95_s": p95,
        "avg_overlap": sum(overlaps) / len(overlaps) if overlaps else None,
        "avg_citation_accuracy": sum(citations) / len(citations) if citations else None,
    }
| out = {"summary": summary, "results": results} | |
| with open(OUT_FILE, "w", encoding="utf-8") as f: | |
| json.dump(out, f, indent=2) | |
| # Also write a compact summary copy for CI collection | |
| try: | |
| summary_path = os.path.join(EVAL_RESULTS_DIR, "results_summary.json") | |
| with open(summary_path, "w", encoding="utf-8") as sf: | |
| json.dump(summary, sf, indent=2) | |
| except Exception: | |
| pass | |
| print("Evaluation complete. Summary:") | |
| print(json.dumps(summary, indent=2)) | |
| print(f"Results written to {OUT_FILE}") | |
if __name__ == "__main__":
    target = os.getenv("EVAL_TARGET_URL", TARGET_URL)
    run_eval(target)
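
# Example invocation (the local URL and the script filename are hypothetical; any
# reachable deployment of the chat API works):
#   EVAL_TARGET_URL=http://localhost:8000 EVAL_CHAT_PATH=/chat EVAL_TIMEOUT=60 python run_eval.py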