Spaces:
Running
Running
| """Generate markdown benchmark report from evaluation results.""" | |
| from __future__ import annotations | |
| from collections import defaultdict | |
| from pathlib import Path | |
| import yaml | |
| from agent_bench.evaluation.harness import EvalResult | |
# Display order for the breakdown tables; values not listed here are appended afterwards.
_CATEGORY_ORDER = ("retrieval", "calculation", "out_of_scope")
_DIFFICULTY_ORDER = ("easy", "medium", "hard")


def generate_report(
    results: list[EvalResult],
    config_dict: dict | None = None,
    provider_name: str = "unknown",
    corpus_size: int = 0,
) -> str:
    """Generate a markdown benchmark report.

    The report is assembled from these sections, in order: title/provider
    header, aggregate metrics, per-category and per-difficulty breakdowns, a
    static chunking-strategy note, the three lowest-precision queries, a
    per-question table and, when ``config_dict`` is truthy, a YAML snapshot
    of it.

    Args:
        results: Evaluation results to summarise. May be empty; averages and
            percentiles then fall back to 0.0.
        config_dict: Optional configuration mapping dumped as YAML at the end
            of the report. Skipped when ``None`` or empty.
        provider_name: Name shown in the header. When it contains "mock"
            (case-insensitive) a disclaimer is added and the failure analysis
            uses MockProvider-specific root-cause text.
        corpus_size: Number of markdown files, shown in the header.

    Returns:
        The report as a single newline-joined string.
    """
    is_mock = "mock" in provider_name.lower()
    # Retrieval/answer metrics are computed over every category except
    # "out_of_scope"; this one list feeds both the aggregates and the
    # failure analysis.
    positive = [r for r in results if r.category != "out_of_scope"]

    lines: list[str] = []
    lines.extend(_header_section(provider_name, corpus_size, is_mock))
    lines.extend(_aggregate_section(results, positive))
    lines.extend(_category_section(results))
    lines.extend(_difficulty_section(results))
    lines.extend(_chunking_section())
    lines.extend(_failure_section(positive, is_mock))
    lines.extend(_per_question_section(results))
    if config_dict:
        lines.extend(_config_section(config_dict))
    return "\n".join(lines)


def _group_by(results: list[EvalResult], attr: str) -> dict:
    """Group results into lists keyed by the value of attribute *attr*."""
    groups: dict = defaultdict(list)
    for r in results:
        groups[getattr(r, attr)].append(r)
    return groups


def _ordered_keys(groups: dict, preferred: tuple[str, ...]) -> list:
    """Return the *preferred* keys that have results, then any other keys sorted by ``str``."""
    known = [key for key in preferred if groups.get(key)]
    # Keys outside the preferred list are still counted in the aggregate
    # metrics, so they get a row too instead of being silently dropped.
    extra = sorted((key for key in groups if key not in preferred), key=str)
    return known + extra


def _header_section(provider_name: str, corpus_size: int, is_mock: bool) -> list[str]:
    """Build the title, the optional MockProvider disclaimer and the provider/corpus line."""
    lines = ["# Benchmark Results — Technical Documentation Q&A", ""]
    if is_mock:
        lines.append(
            "> **Note:** This report was generated with MockProvider (deterministic, "
            "no API key). Metrics reflect the evaluation infrastructure, not real "
            "LLM performance. Run `make evaluate-fast` with a real provider for "
            "production numbers."
        )
        lines.append("")
    lines.append(f"**Provider:** {provider_name} | **Corpus:** {corpus_size} markdown files")
    lines.append("")
    return lines


def _aggregate_section(results: list[EvalResult], positive: list[EvalResult]) -> list[str]:
    """Build the "Aggregate Metrics" table over the whole run."""
    negative = [r for r in results if r.category == "out_of_scope"]
    calc_qs = [r for r in results if r.category == "calculation"]

    avg_p5 = _safe_avg([r.retrieval_precision for r in positive])
    avg_r5 = _safe_avg([r.retrieval_recall for r in positive])
    avg_khr = _safe_avg([r.keyword_hit_rate for r in positive])
    source_rate = sum(1 for r in positive if r.has_source_citation)
    avg_citation = _safe_avg([r.citation_accuracy for r in positive])
    refusal_rate = sum(1 for r in negative if r.grounded_refusal)
    calc_correct = sum(1 for r in calc_qs if r.calculator_used_correctly)

    latencies = sorted(r.latency_ms for r in results)
    p50 = _percentile(latencies, 50)
    p95 = _percentile(latencies, 95)

    total_cost = sum(r.tokens_used.estimated_cost_usd for r in results)
    # max(..., 1) avoids a ZeroDivisionError on an empty run.
    avg_cost = total_cost / max(len(results), 1)

    # Optional groundedness (replaces continuous faithfulness in v1).
    # Discrete-anchored binary 0/1; abstained ('Unknown') scores are excluded
    # from the average. The float() cast narrows ScoreResult.score from
    # `int | Literal["Unknown"]` to float for _safe_avg — abstained=False
    # already guarantees the value is int but mypy doesn't propagate that.
    grounded_scores: list[float] = [
        float(r.judge_scores["groundedness"].score)  # type: ignore[arg-type]
        for r in positive
        if "groundedness" in r.judge_scores
        and not r.judge_scores["groundedness"].abstained
    ]
    # None (rather than 0.0) means "no judge scores": the row is then omitted.
    avg_grounded = _safe_avg(grounded_scores) if grounded_scores else None

    lines = [
        "## Aggregate Metrics",
        "",
        "| Metric | Value |",
        "|--------|-------|",
        f"| Retrieval P@5 | {avg_p5:.2f} |",
        f"| Retrieval R@5 | {avg_r5:.2f} |",
        f"| Keyword Hit Rate | {avg_khr:.2f} |",
        f"| Source Citation Rate | {source_rate}/{len(positive)} |",
        f"| Citation Accuracy | {avg_citation:.2f} |",
        f"| Grounded Refusal Rate | {refusal_rate}/{len(negative)} |",
        f"| Calculator Accuracy | {calc_correct}/{len(calc_qs)} |",
    ]
    if avg_grounded is not None:
        lines.append(f"| Answer Groundedness (LLM judge) | {avg_grounded:.2f} |")
    lines.append(f"| Latency p50 | {p50:,.0f} ms |")
    lines.append(f"| Latency p95 | {p95:,.0f} ms |")
    lines.append(f"| Cost per query | ${avg_cost:.4f} |")
    lines.append("")
    return lines


def _category_section(results: list[EvalResult]) -> list[str]:
    """Build the "By Category" table, one row per category present in *results*."""
    lines = [
        "## By Category",
        "",
        "| Category | Count | P@5 | R@5 | Keyword Hit | Refusal |",
        "|----------|-------|-----|-----|-------------|---------|",
    ]
    by_cat = _group_by(results, "category")
    for cat in _ordered_keys(by_cat, _CATEGORY_ORDER):
        cat_results = by_cat[cat]
        count = len(cat_results)
        if cat == "out_of_scope":
            # Out-of-scope rows report only the refusal count.
            ref_count = sum(1 for r in cat_results if r.grounded_refusal)
            lines.append(f"| {cat} | {count} | n/a | n/a | n/a | {ref_count}/{count} |")
        else:
            cp5 = _safe_avg([r.retrieval_precision for r in cat_results])
            cr5 = _safe_avg([r.retrieval_recall for r in cat_results])
            ckhr = _safe_avg([r.keyword_hit_rate for r in cat_results])
            lines.append(f"| {cat} | {count} | {cp5:.2f} | {cr5:.2f} | {ckhr:.2f} | n/a |")
    lines.append("")
    return lines


def _difficulty_section(results: list[EvalResult]) -> list[str]:
    """Build the "By Difficulty" table, one row per difficulty present in *results*."""
    lines = [
        "## By Difficulty",
        "",
        "| Difficulty | Count | P@5 | R@5 | Keyword Hit |",
        "|-----------|-------|-----|-----|-------------|",
    ]
    by_diff = _group_by(results, "difficulty")
    for diff in _ordered_keys(by_diff, _DIFFICULTY_ORDER):
        diff_results = by_diff[diff]
        # Count covers every question; the averages skip out_of_scope ones.
        pos_only = [r for r in diff_results if r.category != "out_of_scope"]
        if not pos_only:
            lines.append(f"| {diff} | {len(diff_results)} | n/a | n/a | n/a |")
            continue
        dp5 = _safe_avg([r.retrieval_precision for r in pos_only])
        dr5 = _safe_avg([r.retrieval_recall for r in pos_only])
        dkhr = _safe_avg([r.keyword_hit_rate for r in pos_only])
        lines.append(f"| {diff} | {len(diff_results)} | {dp5:.2f} | {dr5:.2f} | {dkhr:.2f} |")
    lines.append("")
    return lines


def _chunking_section() -> list[str]:
    """Build the static "Chunking Strategy Comparison" section (does not depend on results)."""
    return [
        "## Chunking Strategy Comparison",
        "",
        "| Strategy | Note |",
        "|----------|------|",
        "| Recursive (default) | Used for this benchmark run |",
        "| Fixed-size | Available via `--chunk-strategy fixed` in ingest. "
        "Re-run evaluation to compare. |",
        "",
        "_To generate a comparison, run `make ingest` with each strategy "
        "and `make evaluate-fast` for each, then compare the results JSON files._",
        "",
    ]


def _failure_section(positive: list[EvalResult], is_mock: bool) -> list[str]:
    """Build the failure analysis for the three results with the lowest retrieval precision."""
    lines = ["## Failure Analysis (3 worst queries)", ""]
    # sorted() is stable, so ties keep their original order.
    worst = sorted(positive, key=lambda r: r.retrieval_precision)[:3]
    for r in worst:
        lines.append(f'**{r.question_id}: "{r.question}"**')
        lines.append(f"- Retrieval P@5: {r.retrieval_precision:.2f}")
        lines.append(f"- Retrieval R@5: {r.retrieval_recall:.2f}")
        lines.append(f"- Keyword Hit Rate: {r.keyword_hit_rate:.2f}")
        lines.append(f"- Retrieved: {r.retrieved_sources[:3]}")
        lines.append(_root_cause(r, is_mock))
        lines.append("")
    return lines


def _root_cause(result: EvalResult, is_mock: bool) -> str:
    """Return the heuristic root-cause bullet for one failure-analysis entry."""
    if result.retrieval_precision != 0.0:
        # Only a precision of exactly zero gets an automatic explanation.
        return "- Root cause: _(manual analysis needed)_"
    if not is_mock:
        return "- Root cause: Retrieved sources did not match expected sources"
    if result.keyword_hit_rate > 0.5:
        return (
            "- Root cause: MockProvider returned canned answer — "
            "retrieval worked but answer text doesn't match expected sources"
        )
    return (
        "- Root cause: MockProvider canned response does not target "
        "this question's expected sources"
    )


def _per_question_section(results: list[EvalResult]) -> list[str]:
    """Build the "Per-Question Results" table, one row per result in input order."""
    lines = [
        "## Per-Question Results",
        "",
        "| ID | Cat | Diff | P@5 | R@5 | KHR | Citation | Refusal | Calc |",
        "|----|-----|------|-----|-----|-----|----------|---------|------|",
    ]
    for r in results:
        scored = r.category != "out_of_scope"
        p5 = f"{r.retrieval_precision:.2f}" if scored else "n/a"
        r5 = f"{r.retrieval_recall:.2f}" if scored else "n/a"
        khr = f"{r.keyword_hit_rate:.2f}"
        cit = f"{r.citation_accuracy:.2f}" if scored else "n/a"
        # NOTE(review): Refusal and Calc are printed for every row, including
        # categories where they may not apply — confirm this is intended.
        ref = "PASS" if r.grounded_refusal else "FAIL"
        calc = "PASS" if r.calculator_used_correctly else "FAIL"
        lines.append(
            f"| {r.question_id} | {r.category} | {r.difficulty} "
            f"| {p5} | {r5} | {khr} | {cit} | {ref} | {calc} |"
        )
    lines.append("")
    return lines


def _config_section(config_dict: dict) -> list[str]:
    """Build the "Configuration Snapshot" section as a fenced YAML block."""
    return [
        "## Configuration Snapshot",
        "",
        "```yaml",
        yaml.dump(config_dict, default_flow_style=False).strip(),
        "```",
        "",
    ]
def save_report(report: str, path: str | Path) -> None:
    """Write *report* to *path* as UTF-8, creating parent directories as needed.

    Args:
        report: Report text; a single trailing newline is appended on write.
        path: Destination file. An existing file is overwritten.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the generated report contains non-ASCII characters
    # (e.g. the em dash in its title), so relying on the platform default
    # encoding would make the output platform-dependent or fail outright.
    target.write_text(report + "\n", encoding="utf-8")
def _safe_avg(values: list[float]) -> float:
    """Return the arithmetic mean of *values*, or 0.0 when *values* is empty."""
    count = len(values)
    return sum(values) / count if count else 0.0
def _percentile(sorted_values: list[float], p: float) -> float:
    """Return the value at percentile *p* of an ascending-sorted list.

    Rank-based lookup without interpolation: the element at index
    ``int(len(sorted_values) * p / 100)``, clamped to the valid index range.

    Args:
        sorted_values: Values already sorted in ascending order; the list is
            not re-sorted here.
        p: Percentile, normally in the range 0-100.

    Returns:
        The selected element, or 0.0 when *sorted_values* is empty.
    """
    if not sorted_values:
        return 0.0
    last = len(sorted_values) - 1
    rank = int(len(sorted_values) * p / 100)
    # Clamp on both sides: p >= 100 would index past the end, and a negative
    # p would otherwise wrap round via negative indexing and silently return
    # a value from the top of the list.
    return sorted_values[max(0, min(rank, last))]