""" L2 batch evaluation — runs the golden dataset through the pipeline locally. Usage: python eval/metrics.py # all pairs python eval/metrics.py --domain retail # one domain python eval/metrics.py --domain pharma # one domain python eval/metrics.py --client novamart # one client python eval/metrics.py --out results.json # write JSON + HTML report Requires HF_TOKEN in environment. """ import argparse import json import logging import os import sys from datetime import datetime from pathlib import Path import yaml sys.path.insert(0, str(Path(__file__).parent.parent / "backend")) from huggingface_hub import InferenceClient from pipeline import run log = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(message)s") DATASET_PATH = Path(__file__).parent / "golden-dataset.yaml" REPORTS_DIR = Path(__file__).parent / "reports" METRIC_LABELS = { "pii_leakage": "PII Leakage", "token_budget": "Token Budget", "answer_relevancy": "Answer Relevancy", "faithfulness": "Faithfulness", "chain_terminology": "Chain Terminology", } def load_pairs(domain: str | None = None, client: str | None = None) -> list[dict]: data = yaml.safe_load(DATASET_PATH.read_text()) pairs = data["pairs"] if domain: pairs = [p for p in pairs if p["domain"] == domain] if client: pairs = [p for p in pairs if p["client"] == client] return pairs def score_pair(pair: dict, hf_client: InferenceClient) -> dict: """Run one golden pair through the pipeline and return scored result.""" result = run( query=pair["question"], client=pair["client"], hf_client=hf_client, ) payload = result.response_payload metrics = payload["evaluation"]["metrics"] expected = pair.get("expected_contains", []) answer_lower = result.answer.lower() matched = [kw for kw in expected if kw.lower() in answer_lower] keyphrase_coverage = len(matched) / len(expected) if expected else 1.0 return { "id": pair["id"], "client": pair["client"], "domain": pair["domain"], "question": pair["question"], "answer": result.answer, "keyphrase_coverage": round(keyphrase_coverage, 3), "matched_keyphrases": matched, "missing_keyphrases": [kw for kw in expected if kw not in matched], "metrics": metrics, "overall_pass": payload["evaluation"]["overall_pass"], "sources": [s["title"] for s in payload["sources"]], "notes": pair.get("notes", ""), } def print_summary(results: list[dict]) -> None: metric_names = list(results[0]["metrics"].keys()) if results else [] total = len(results) passed = sum(1 for r in results if r["overall_pass"]) log.info("\n── Summary ─────────────────────────────────────") log.info("Pairs evaluated : %d", total) log.info("Overall pass : %d / %d (%.0f%%)", passed, total, 100 * passed / total if total else 0) log.info("\n── Per-metric pass rate ────────────────────────") for name in metric_names: n_pass = sum(1 for r in results if r["metrics"][name]["passed"]) avg_score = sum(r["metrics"][name]["score"] for r in results) / total if total else 0 log.info(" %-22s %d/%d avg %.2f", name, n_pass, total, avg_score) log.info("\n── Keyphrase coverage ──────────────────────────") avg_cov = sum(r["keyphrase_coverage"] for r in results) / total if total else 0 log.info(" Average coverage: %.0f%%", avg_cov * 100) failures = [r for r in results if not r["overall_pass"]] if failures: log.info("\n── Failed pairs ────────────────────────────────") for r in failures: failed_metrics = [m for m, v in r["metrics"].items() if not v["passed"]] log.info(" [%s] %s", r["id"], ", ".join(failed_metrics)) # --------------------------------------------------------------------------- # HTML report # --------------------------------------------------------------------------- def _score_class(score: float, metric: str) -> str: if metric == "pii_leakage": return "pass" if score == 1.0 else "fail" if score >= 0.75: return "pass" if score >= 0.45: return "warn" return "fail" def _metric_cards(metrics: dict) -> str: cards = [] for name, m in metrics.items(): cls = _score_class(m["score"], name) pct = round(m["score"] * 100) label = METRIC_LABELS.get(name, name) cards.append(f"""