"""Generate markdown benchmark report from evaluation results.""" from __future__ import annotations from collections import defaultdict from pathlib import Path import yaml from agent_bench.evaluation.harness import EvalResult def generate_report( results: list[EvalResult], config_dict: dict | None = None, provider_name: str = "unknown", corpus_size: int = 0, ) -> str: """Generate a markdown benchmark report.""" lines: list[str] = [] lines.append("# Benchmark Results — Technical Documentation Q&A") lines.append("") if "mock" in provider_name.lower(): lines.append( "> **Note:** This report was generated with MockProvider (deterministic, " "no API key). Metrics reflect the evaluation infrastructure, not real " "LLM performance. Run `make evaluate-fast` with a real provider for " "production numbers." ) lines.append("") lines.append(f"**Provider:** {provider_name} | **Corpus:** {corpus_size} markdown files") lines.append("") # --- Aggregate metrics --- lines.append("## Aggregate Metrics") lines.append("") positive = [r for r in results if r.category != "out_of_scope"] negative = [r for r in results if r.category == "out_of_scope"] calc_qs = [r for r in results if r.category == "calculation"] avg_p5 = _safe_avg([r.retrieval_precision for r in positive]) avg_r5 = _safe_avg([r.retrieval_recall for r in positive]) avg_khr = _safe_avg([r.keyword_hit_rate for r in positive]) source_rate = sum(1 for r in positive if r.has_source_citation) avg_citation = _safe_avg([r.citation_accuracy for r in positive]) refusal_rate = sum(1 for r in negative if r.grounded_refusal) calc_correct = sum(1 for r in calc_qs if r.calculator_used_correctly) latencies = sorted([r.latency_ms for r in results]) p50 = _percentile(latencies, 50) p95 = _percentile(latencies, 95) total_cost = sum(r.tokens_used.estimated_cost_usd for r in results) avg_cost = total_cost / max(len(results), 1) # Optional groundedness (replaces continuous faithfulness in v1). # Discrete-anchored binary 0/1; abstain ('Unknown' score) is excluded # from the average. The float() cast narrows ScoreResult.score from # `int | Literal["Unknown"]` to float for _safe_avg — abstained=False # already guarantees the value is int but mypy doesn't propagate that. grounded_scores: list[float] = [ float(r.judge_scores["groundedness"].score) # type: ignore[arg-type] for r in positive if "groundedness" in r.judge_scores and not r.judge_scores["groundedness"].abstained ] avg_grounded = _safe_avg(grounded_scores) if grounded_scores else None lines.append("| Metric | Value |") lines.append("|--------|-------|") lines.append(f"| Retrieval P@5 | {avg_p5:.2f} |") lines.append(f"| Retrieval R@5 | {avg_r5:.2f} |") lines.append(f"| Keyword Hit Rate | {avg_khr:.2f} |") lines.append(f"| Source Citation Rate | {source_rate}/{len(positive)} |") lines.append(f"| Citation Accuracy | {avg_citation:.2f} |") lines.append(f"| Grounded Refusal Rate | {refusal_rate}/{len(negative)} |") lines.append(f"| Calculator Accuracy | {calc_correct}/{len(calc_qs)} |") if avg_grounded is not None: lines.append(f"| Answer Groundedness (LLM judge) | {avg_grounded:.2f} |") lines.append(f"| Latency p50 | {p50:,.0f} ms |") lines.append(f"| Latency p95 | {p95:,.0f} ms |") lines.append(f"| Cost per query | ${avg_cost:.4f} |") lines.append("") # --- By Category --- lines.append("## By Category") lines.append("") lines.append("| Category | Count | P@5 | R@5 | Keyword Hit | Refusal |") lines.append("|----------|-------|-----|-----|-------------|---------|") by_cat: dict[str, list[EvalResult]] = defaultdict(list) for r in results: by_cat[r.category].append(r) for cat in ["retrieval", "calculation", "out_of_scope"]: cat_results = by_cat.get(cat, []) if not cat_results: continue count = len(cat_results) if cat == "out_of_scope": ref_count = sum(1 for r in cat_results if r.grounded_refusal) lines.append(f"| {cat} | {count} | n/a | n/a | n/a | {ref_count}/{count} |") else: cp5 = _safe_avg([r.retrieval_precision for r in cat_results]) cr5 = _safe_avg([r.retrieval_recall for r in cat_results]) ckhr = _safe_avg([r.keyword_hit_rate for r in cat_results]) lines.append(f"| {cat} | {count} | {cp5:.2f} | {cr5:.2f} | {ckhr:.2f} | n/a |") lines.append("") # --- By Difficulty --- lines.append("## By Difficulty") lines.append("") lines.append("| Difficulty | Count | P@5 | R@5 | Keyword Hit |") lines.append("|-----------|-------|-----|-----|-------------|") by_diff: dict[str, list[EvalResult]] = defaultdict(list) for r in results: by_diff[r.difficulty].append(r) for diff in ["easy", "medium", "hard"]: diff_results = by_diff.get(diff, []) if not diff_results: continue pos_only = [r for r in diff_results if r.category != "out_of_scope"] if not pos_only: lines.append(f"| {diff} | {len(diff_results)} | n/a | n/a | n/a |") continue dp5 = _safe_avg([r.retrieval_precision for r in pos_only]) dr5 = _safe_avg([r.retrieval_recall for r in pos_only]) dkhr = _safe_avg([r.keyword_hit_rate for r in pos_only]) lines.append(f"| {diff} | {len(diff_results)} | {dp5:.2f} | {dr5:.2f} | {dkhr:.2f} |") lines.append("") # --- Chunking strategy comparison --- lines.append("## Chunking Strategy Comparison") lines.append("") lines.append("| Strategy | Note |") lines.append("|----------|------|") lines.append("| Recursive (default) | Used for this benchmark run |") lines.append( "| Fixed-size | Available via `--chunk-strategy fixed` in ingest. " "Re-run evaluation to compare. |" ) lines.append("") lines.append( "_To generate a comparison, run `make ingest` with each strategy " "and `make evaluate-fast` for each, then compare the results JSON files._" ) lines.append("") # --- Failure analysis (3 worst by retrieval precision) --- lines.append("## Failure Analysis (3 worst queries)") lines.append("") scorable = [r for r in results if r.category != "out_of_scope"] worst = sorted(scorable, key=lambda r: r.retrieval_precision)[:3] for r in worst: lines.append(f'**{r.question_id}: "{r.question}"**') lines.append(f"- Retrieval P@5: {r.retrieval_precision:.2f}") lines.append(f"- Retrieval R@5: {r.retrieval_recall:.2f}") lines.append(f"- Keyword Hit Rate: {r.keyword_hit_rate:.2f}") lines.append(f"- Retrieved: {r.retrieved_sources[:3]}") is_mock = "mock" in provider_name.lower() if is_mock and r.retrieval_precision == 0.0 and r.keyword_hit_rate > 0.5: lines.append( "- Root cause: MockProvider returned canned answer — " "retrieval worked but answer text doesn't match expected sources" ) elif is_mock and r.retrieval_precision == 0.0: lines.append( "- Root cause: MockProvider canned response does not target " "this question's expected sources" ) elif r.retrieval_precision == 0.0: lines.append( "- Root cause: Retrieved sources did not match expected sources" ) else: lines.append("- Root cause: _(manual analysis needed)_") lines.append("") # --- Per-question detail --- lines.append("## Per-Question Results") lines.append("") lines.append("| ID | Cat | Diff | P@5 | R@5 | KHR | Citation | Refusal | Calc |") lines.append("|----|-----|------|-----|-----|-----|----------|---------|------|") for r in results: p5 = f"{r.retrieval_precision:.2f}" if r.category != "out_of_scope" else "n/a" r5 = f"{r.retrieval_recall:.2f}" if r.category != "out_of_scope" else "n/a" khr = f"{r.keyword_hit_rate:.2f}" cit = f"{r.citation_accuracy:.2f}" if r.category != "out_of_scope" else "n/a" ref = "PASS" if r.grounded_refusal else "FAIL" calc = "PASS" if r.calculator_used_correctly else "FAIL" lines.append( f"| {r.question_id} | {r.category} | {r.difficulty} " f"| {p5} | {r5} | {khr} | {cit} | {ref} | {calc} |" ) lines.append("") # --- Config snapshot --- if config_dict: lines.append("## Configuration Snapshot") lines.append("") lines.append("```yaml") lines.append(yaml.dump(config_dict, default_flow_style=False).strip()) lines.append("```") lines.append("") return "\n".join(lines) def save_report(report: str, path: str | Path) -> None: """Write report to file.""" p = Path(path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(report + "\n") def _safe_avg(values: list[float]) -> float: if not values: return 0.0 return sum(values) / len(values) def _percentile(sorted_values: list[float], p: float) -> float: if not sorted_values: return 0.0 idx = int(len(sorted_values) * p / 100) idx = min(idx, len(sorted_values) - 1) return sorted_values[idx]