Spaces:
Sleeping
Sleeping
| import argparse | |
| import json | |
| import re | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| ROOT = Path(__file__).resolve().parents[1] | |
| if str(ROOT) not in sys.path: | |
| sys.path.insert(0, str(ROOT)) | |
| from app.db.sqlite import init_db | |
| from app.rag.graph import ClaimsRAGGraph | |
| from app.rag.ingestion import DocumentIngestionService | |
| from app.rag.qdrant_store import QdrantVectorStore | |
| DECISION_RE = re.compile(r"Decision:\s*(Likely covered|Likely not covered|Needs human review)", re.I) | |
| def load_jsonl(path: Path) -> list[dict[str, Any]]: | |
| rows = [] | |
| for line in path.read_text(encoding="utf-8").splitlines(): | |
| if line.strip(): | |
| rows.append(json.loads(line)) | |
| return rows | |
| def extract_decision(answer: str) -> str: | |
| match = DECISION_RE.search(answer) | |
| if not match: | |
| return "Unknown" | |
| return match.group(1).capitalize().replace("Not", "not") | |
| def accepted_decisions(case: dict[str, Any]) -> set[str]: | |
| values = case.get("acceptable_decisions") or [case["expected_decision"]] | |
| return {str(v).lower() for v in values} | |
| def evaluate(dataset_path: Path, user_id: str, limit: int | None = None) -> dict[str, Any]: | |
| init_db() | |
| QdrantVectorStore().ensure_collections() | |
| DocumentIngestionService().ingest_pdf_directory() | |
| graph = ClaimsRAGGraph() | |
| cases = load_jsonl(dataset_path) | |
| if limit: | |
| cases = cases[:limit] | |
| results = [] | |
| for case in cases: | |
| started = time.perf_counter() | |
| state = graph.run(case["query"], user_id=user_id, use_cache=False) | |
| latency_ms = round((time.perf_counter() - started) * 1000, 2) | |
| answer = state.get("answer", "") | |
| decision = extract_decision(answer) | |
| sources = state.get("reranked_sources") or state.get("sources", []) | |
| critique = state.get("self_rag", {}) | |
| decision_ok = decision.lower() in accepted_decisions(case) | |
| sources_ok = bool(sources) if case.get("must_have_sources", True) else True | |
| self_rag_ok = bool(critique.get("isrel")) and bool(critique.get("issup")) and bool(critique.get("isuse")) | |
| passed = decision_ok and sources_ok and self_rag_ok | |
| results.append( | |
| { | |
| "id": case["id"], | |
| "expected": case["expected_decision"], | |
| "decision": decision, | |
| "decision_ok": decision_ok, | |
| "sources": len(sources), | |
| "sources_ok": sources_ok, | |
| "self_rag": { | |
| "ISREL": critique.get("isrel"), | |
| "ISSUP": critique.get("issup"), | |
| "ISUSE": critique.get("isuse"), | |
| }, | |
| "self_rag_ok": self_rag_ok, | |
| "latency_ms": latency_ms, | |
| "passed": passed, | |
| } | |
| ) | |
| total = len(results) | |
| summary = { | |
| "total": total, | |
| "passed": sum(1 for r in results if r["passed"]), | |
| "decision_accuracy": round(sum(1 for r in results if r["decision_ok"]) / total, 3) if total else 0, | |
| "source_rate": round(sum(1 for r in results if r["sources_ok"]) / total, 3) if total else 0, | |
| "self_rag_pass_rate": round(sum(1 for r in results if r["self_rag_ok"]) / total, 3) if total else 0, | |
| "avg_latency_ms": round(sum(r["latency_ms"] for r in results) / total, 2) if total else 0, | |
| "results": results, | |
| } | |
| return summary | |
| def main() -> None: | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--dataset", default="data/eval/golden_claim_scenarios.jsonl") | |
| parser.add_argument("--user-id", default="eval_user") | |
| parser.add_argument("--limit", type=int, default=None) | |
| args = parser.parse_args() | |
| summary = evaluate(Path(args.dataset), args.user_id, args.limit) | |
| print(json.dumps(summary, indent=2)) | |
| if __name__ == "__main__": | |
| main() | |