"""
L2 batch evaluation — runs the golden dataset through the pipeline locally.

Usage:
    python eval/metrics.py                     # all pairs
    python eval/metrics.py --domain retail     # one domain
    python eval/metrics.py --domain pharma     # one domain
    python eval/metrics.py --client novamart   # one client
    python eval/metrics.py --out results.json  # also write JSON results

An HTML report is always written to the ``reports/`` directory next to this
script; ``--out`` additionally dumps the raw results as JSON.

Requires HF_TOKEN in environment.
"""

import argparse
import html
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path

import yaml

# The pipeline package lives in ../backend; make it importable before the
# imports below run.
sys.path.insert(0, str(Path(__file__).parent.parent / "backend"))

from huggingface_hub import InferenceClient
from pipeline import run

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(message)s")

DATASET_PATH = Path(__file__).parent / "golden-dataset.yaml"
REPORTS_DIR = Path(__file__).parent / "reports"

# Human-readable names for the metric keys; unknown keys fall back to the raw key.
METRIC_LABELS = {
    "pii_leakage": "PII Leakage",
    "token_budget": "Token Budget",
    "answer_relevancy": "Answer Relevancy",
    "faithfulness": "Faithfulness",
    "chain_terminology": "Chain Terminology",
}

# NOTE(review): the report's original markup and stylesheet are not visible in
# the reviewed source (only the text nodes and placeholders survived). The tags,
# class names and CSS used below are a minimal reconstruction that gives the
# computed pass/warn/fail classes something to attach to — confirm against the
# intended design.
_REPORT_CSS = """
body { font-family: system-ui, sans-serif; margin: 2rem; }
.pass { color: #1a7f37; }
.warn { color: #9a6700; }
.fail { color: #cf222e; }
.stat, .pill, .pair { border: 1px solid #d0d7de; border-radius: 6px;
                      padding: 0.5rem 0.75rem; margin: 0.5rem 0; }
.stat, .pill, .metric-card { display: inline-block; margin-right: 1rem; }
.question { font-weight: 600; }
.answer { white-space: pre-wrap; }
.notes { font-style: italic; }
"""


def load_pairs(domain: str | None = None, client: str | None = None) -> list[dict]:
    """Load golden pairs from DATASET_PATH, optionally filtered.

    Args:
        domain: keep only pairs whose ``"domain"`` equals this value.
        client: keep only pairs whose ``"client"`` equals this value.

    Returns:
        The (possibly filtered) list found under the dataset's ``"pairs"`` key.
    """
    # Explicit encoding: the platform default is locale-dependent.
    data = yaml.safe_load(DATASET_PATH.read_text(encoding="utf-8"))
    pairs = data["pairs"]
    if domain:
        pairs = [p for p in pairs if p["domain"] == domain]
    if client:
        pairs = [p for p in pairs if p["client"] == client]
    return pairs


def score_pair(pair: dict, hf_client: InferenceClient) -> dict:
    """Run one golden pair through the pipeline and return scored result.

    Besides the pipeline's own evaluation metrics, this computes keyphrase
    coverage: the fraction of ``pair["expected_contains"]`` entries that occur
    (case-insensitively) in the answer. A pair with no expected keyphrases
    counts as fully covered.
    """
    result = run(
        query=pair["question"],
        client=pair["client"],
        hf_client=hf_client,
    )
    payload = result.response_payload
    metrics = payload["evaluation"]["metrics"]

    expected = pair.get("expected_contains", [])
    answer_lower = result.answer.lower()
    matched = [kw for kw in expected if kw.lower() in answer_lower]
    keyphrase_coverage = len(matched) / len(expected) if expected else 1.0

    return {
        "id": pair["id"],
        "client": pair["client"],
        "domain": pair["domain"],
        "question": pair["question"],
        "answer": result.answer,
        "keyphrase_coverage": round(keyphrase_coverage, 3),
        "matched_keyphrases": matched,
        "missing_keyphrases": [kw for kw in expected if kw not in matched],
        "metrics": metrics,
        "overall_pass": payload["evaluation"]["overall_pass"],
        "sources": [s["title"] for s in payload["sources"]],
        "notes": pair.get("notes", ""),
    }


def print_summary(results: list[dict]) -> None:
    """Log overall pass rate, per-metric pass rates, coverage and failed pairs.

    Metric names are taken from the first result; every result is expected to
    carry the same metric keys.
    """
    metric_names = list(results[0]["metrics"].keys()) if results else []
    total = len(results)
    passed = sum(1 for r in results if r["overall_pass"])

    log.info("\n── Summary ─────────────────────────────────────")
    log.info("Pairs evaluated : %d", total)
    log.info("Overall pass : %d / %d (%.0f%%)", passed, total, 100 * passed / total if total else 0)

    log.info("\n── Per-metric pass rate ────────────────────────")
    for name in metric_names:
        n_pass = sum(1 for r in results if r["metrics"][name]["passed"])
        avg_score = sum(r["metrics"][name]["score"] for r in results) / total if total else 0
        log.info(" %-22s %d/%d avg %.2f", name, n_pass, total, avg_score)

    log.info("\n── Keyphrase coverage ──────────────────────────")
    avg_cov = sum(r["keyphrase_coverage"] for r in results) / total if total else 0
    log.info(" Average coverage: %.0f%%", avg_cov * 100)

    failures = [r for r in results if not r["overall_pass"]]
    if failures:
        log.info("\n── Failed pairs ────────────────────────────────")
        for r in failures:
            failed_metrics = [m for m, v in r["metrics"].items() if not v["passed"]]
            log.info(" [%s] %s", r["id"], ", ".join(failed_metrics))


# ---------------------------------------------------------------------------
# HTML report
# ---------------------------------------------------------------------------


def _esc(value: object) -> str:
    """Return *value* as text safe to embed in HTML content or attributes."""
    return html.escape(str(value))


def _score_class(score: float, metric: str) -> str:
    """Map a metric score to the CSS class ``pass``, ``warn`` or ``fail``.

    ``pii_leakage`` is all-or-nothing (only a score of exactly 1.0 passes);
    other metrics pass at >= 0.75 and warn at >= 0.45.
    """
    if metric == "pii_leakage":
        return "pass" if score == 1.0 else "fail"
    if score >= 0.75:
        return "pass"
    if score >= 0.45:
        return "warn"
    return "fail"


def _metric_cards(metrics: dict) -> str:
    """Render one card per metric showing its label and score as a percentage."""
    cards = []
    for name, m in metrics.items():
        cls = _score_class(m["score"], name)
        pct = round(m["score"] * 100)
        label = _esc(METRIC_LABELS.get(name, name))
        cards.append(
            f'<div class="metric-card {cls}">'
            f'<span class="metric-label">{label}</span> '
            f'<span class="metric-score">{pct}%</span>'
            "</div>\n"
        )
    return "".join(cards)


def _pair_row(r: dict) -> str:
    """Render the report section for a single scored pair.

    All values that originate from the dataset or the model (id, client,
    question, answer, keyphrases, source titles, notes) are HTML-escaped so
    that characters such as ``<`` or ``&`` cannot break the page.
    """
    verdict_cls = "pass" if r["overall_pass"] else "fail"
    verdict_label = "PASS" if r["overall_pass"] else "FAIL"
    cov_pct = round(r["keyphrase_coverage"] * 100)
    cov_cls = "pass" if cov_pct >= 75 else ("warn" if cov_pct >= 45 else "fail")
    missing = _esc(", ".join(r["missing_keyphrases"])) if r["missing_keyphrases"] else "—"
    sources = _esc(", ".join(r["sources"])) if r["sources"] else "none"
    # Built outside the template: a nested quoted f-string inside a
    # replacement field is only valid on Python 3.12+.
    notes_block = f'<div class="notes">{_esc(r["notes"])}</div>\n' if r["notes"] else ""
    cards = _metric_cards(r["metrics"])

    return f"""
<div class="pair">
<div class="pair-header">
<span class="pair-id">{_esc(r["id"])}</span>
<span class="pair-client">{_esc(r["client"])}</span>
<span class="verdict {verdict_cls}">{verdict_label}</span>
</div>
<div class="question">Q: {_esc(r["question"])}</div>
<div class="answer">{_esc(r["answer"])}</div>
<div class="metrics">
{cards}</div>
<div class="pair-meta"><span class="{cov_cls}">Keyphrase coverage: {cov_pct}%</span> Missing: {missing} Sources: {sources}</div>
{notes_block}</div>
"""


def generate_html(results: list[dict], domain: str | None) -> str:
    """Build the complete HTML report document for *results*.

    Args:
        results: scored pairs as returned by ``score_pair``.
        domain: label of the active filter shown in the title; a falsy value
            is rendered as "all domains".

    Returns:
        The report as a single HTML string (UTF-8 is declared in the head).
    """
    total = len(results)
    passed = sum(1 for r in results if r["overall_pass"])
    pass_rate = round(100 * passed / total) if total else 0
    metric_names = list(results[0]["metrics"].keys()) if results else []
    avg_cov = round(100 * sum(r["keyphrase_coverage"] for r in results) / total) if total else 0
    generated = datetime.now().strftime("%Y-%m-%d %H:%M")
    title = _esc(f"Eval Report — {domain or 'all domains'}")

    pills = []
    for name in metric_names:
        n_pass = sum(1 for r in results if r["metrics"][name]["passed"])
        avg = sum(r["metrics"][name]["score"] for r in results) / total if total else 0
        cls = "pass" if n_pass == total else ("warn" if n_pass >= total * 0.7 else "fail")
        pills.append(
            f"""
<div class="pill {cls}">
<div class="pill-label">{_esc(METRIC_LABELS.get(name, name))}</div>
<div class="pill-count">{n_pass}/{total}</div>
<div class="pill-avg">avg {avg:.2f}</div>
</div>
"""
        )
    summary_pills = "".join(pills)
    pair_rows = "".join(_pair_row(r) for r in results)

    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>{title}</title>
<style>{_REPORT_CSS}</style>
</head>
<body>
<h1>AI Response Validator — Eval Report</h1>
<p class="subtitle">{title} · Generated {generated}</p>
<div class="stats">
<div class="stat"><div class="stat-value">{total}</div><div class="stat-label">Pairs evaluated</div></div>
<div class="stat"><div class="stat-value">{pass_rate}%</div><div class="stat-label">Overall pass rate</div></div>
<div class="stat"><div class="stat-value">{avg_cov}%</div><div class="stat-label">Keyphrase coverage</div></div>
</div>
<h2>Per-metric summary</h2>
<div class="pills">
{summary_pills}</div>
<h2>Pair results</h2>
{pair_rows}</body>
</html>
"""


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def main() -> None:
    """Parse CLI arguments, evaluate the selected pairs and write the reports.

    Exits with a message when HF_TOKEN is unset or when no pair matches the
    given filters.
    """
    parser = argparse.ArgumentParser(description="L2 batch evaluation against golden dataset")
    parser.add_argument("--domain", help="Filter by domain (retail|pharma)")
    parser.add_argument("--client", help="Filter by client id")
    parser.add_argument("--out", help="Write JSON results to file")
    args = parser.parse_args()

    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        sys.exit("HF_TOKEN not set")

    hf_client = InferenceClient(token=hf_token)
    pairs = load_pairs(domain=args.domain, client=args.client)

    if not pairs:
        sys.exit("No pairs matched the given filters")

    log.info("Evaluating %d pairs...", len(pairs))
    results = []
    for i, pair in enumerate(pairs, 1):
        log.info("[%d/%d] %s", i, len(pairs), pair["id"])
        results.append(score_pair(pair, hf_client))

    print_summary(results)

    REPORTS_DIR.mkdir(exist_ok=True)
    # Reflect every active filter in the file name and title; with a single
    # filter (or none) this yields the same name as before, but a combined
    # --domain/--client run no longer overwrites the domain-wide report.
    active_filters = [f for f in (args.domain, args.client) if f]
    suffix = "_".join(active_filters) or "all"
    report_label = " / ".join(active_filters) or None
    html_path = REPORTS_DIR / f"report_{suffix}.html"
    # UTF-8 explicitly: answers may contain characters the locale's default
    # encoding cannot represent, and the document declares charset=utf-8.
    html_path.write_text(generate_html(results, report_label), encoding="utf-8")
    log.info("\nHTML report: %s", html_path)

    if args.out:
        Path(args.out).write_text(json.dumps(results, indent=2), encoding="utf-8")
        log.info("JSON results: %s", args.out)


if __name__ == "__main__":
    main()