| """Calibration runner: generate-outputs | run-judges | build-table. | |
| Orchestrates Steps A, C, D from the design doc's data flow. Step B | |
| (hand-labeling) is manual — done in a Jupyter notebook reading | |
| results/calibration_v1_system_outputs.json and appending to | |
| measurements/2026-05-04-judge-calibration-labels.jsonl. | |
| Examples: | |
| python scripts/run_calibration.py generate-outputs --concurrency 5 | |
| python scripts/run_calibration.py run-judges --row-config=configs/calibration/rows/baseline.yaml | |
| python scripts/run_calibration.py build-table | |
| python scripts/run_calibration.py build-table --strict | |
| """ | |

from __future__ import annotations

import argparse
import asyncio
import hashlib
import json
from pathlib import Path

import structlog
import yaml

logger = structlog.get_logger()

REPO = Path(__file__).resolve().parents[1]
CALIBRATION_SPEC = REPO / "agent_bench/evaluation/datasets/calibration_v1.json"
SYSTEM_OUTPUTS = REPO / "results/calibration_v1_system_outputs.json"
LABELS_PATH = REPO / "measurements/2026-05-04-judge-calibration-labels.jsonl"
KAPPA_TABLE_OUT = REPO / "docs/_generated/kappa_table.md"
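
# Step B hand labels land in LABELS_PATH as one JSONL record per
# (item × dimension). A minimal sketch of the shape _compute_kappa_weights
# reads below; only `dimension`, `abstained`, `system_output_hash`, and
# `score` are consumed, and the concrete values here are illustrative:
#
#   {"item_id": "k8s-012", "dimension": "groundedness",
#    "system_output_hash": "3f9a...", "score": 4, "abstained": false}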


def _resolve_concurrency(cli_value: int | None) -> int:
    """CLI flag overrides config field; default is 5. Logs the resolved value."""
    if cli_value is not None:
        resolved = cli_value
    else:
        cfg_path = REPO / "configs/default.yaml"
        cfg_concurrency = None
        if cfg_path.exists():
            cfg = yaml.safe_load(cfg_path.read_text()) or {}
            cfg_concurrency = (cfg.get("evaluation", {}) or {}).get(
                "calibration_concurrency"
            )
        resolved = cfg_concurrency if cfg_concurrency is not None else 5
    logger.info("calibration_concurrency_resolved", value=resolved)
    return resolved
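
# The lookup above expects a shape like this in configs/default.yaml
# (illustrative sketch; only the `evaluation.calibration_concurrency`
# key is read):
#
#   evaluation:
#     calibration_concurrency: 8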


# --- Subcommand: generate-outputs (Step A) ---


def _build_corpus_orchestrator(cfg, corpus_name: str, embedder, provider):
    """Build a per-corpus Orchestrator wired to that corpus's HybridStore.

    Mirrors the per-corpus construction in scripts/evaluate.py so calibration
    runs use the same retrieval stack as production evaluation. The embedder
    and provider are shared across corpora — only the store/retriever/
    SearchTool differ.
    """
    from agent_bench.agents.orchestrator import Orchestrator
    from agent_bench.rag.retriever import Retriever
    from agent_bench.rag.store import HybridStore
    from agent_bench.tools.calculator import CalculatorTool
    from agent_bench.tools.registry import ToolRegistry
    from agent_bench.tools.search import SearchTool

    corpus_cfg = cfg.corpora[corpus_name]
    store = HybridStore.load(corpus_cfg.store_path, rrf_k=cfg.rag.retrieval.rrf_k)

    reranker = None
    if cfg.rag.reranker.enabled:
        from agent_bench.rag.reranker import CrossEncoderReranker

        reranker = CrossEncoderReranker(model_name=cfg.rag.reranker.model_name)

    retriever = Retriever(
        embedder=embedder,
        store=store,
        default_strategy=cfg.rag.retrieval.strategy,
        candidates_per_system=cfg.rag.retrieval.candidates_per_system,
        reranker=reranker,
        reranker_top_k=cfg.rag.reranker.top_k,
    )

    registry = ToolRegistry()
    registry.register(
        SearchTool(
            retriever=retriever,
            default_top_k=cfg.rag.retrieval.top_k,
            refusal_threshold=corpus_cfg.refusal_threshold,
        )
    )
    registry.register(CalculatorTool())

    return Orchestrator(
        provider=provider,
        registry=registry,
        max_iterations=cfg.agent.max_iterations,
        temperature=cfg.agent.temperature,
    )


async def cmd_generate_outputs(concurrency: int) -> None:
    """Run the orchestrator against the 30 calibration items with a frozen
    configuration; write results/calibration_v1_system_outputs.json.

    The calibration spec is mixed-corpus (k8s + fastapi). Each item carries a
    `corpus` field; we build one Orchestrator per corpus and route by that
    field. A KeyError on an unrecognized corpus is preferable to silently
    misrouting an item to the wrong store.
    """
    from agent_bench.core.config import load_config
    from agent_bench.core.provider import AnthropicProvider
    from agent_bench.evaluation.harness import load_golden_dataset
    from agent_bench.rag.embedder import Embedder

    spec = json.loads(CALIBRATION_SPEC.read_text())
    target_ids = {i["id"]: i for i in spec["items"]}

    fastapi = load_golden_dataset(
        REPO / "agent_bench/evaluation/datasets/tech_docs_golden.json"
    )
    k8s = load_golden_dataset(
        REPO / "agent_bench/evaluation/datasets/k8s_golden.json"
    )
    items = [q for q in (fastapi + k8s) if q.id in target_ids]
    if len(items) != len(target_ids):
        missing = set(target_ids) - {q.id for q in items}
        raise SystemExit(
            f"calibration items not found in goldens: {sorted(missing)}"
        )

    cfg = load_config()
    provider = AnthropicProvider(cfg)
    embedder = Embedder(model_name=cfg.embedding.model, cache_dir=cfg.embedding.cache_dir)

    item_corpus = {it.id: target_ids[it.id]["corpus"] for it in items}
    unknown: dict[str, list[str]] = {}
    for it_id, corpus in item_corpus.items():
        if corpus not in cfg.corpora:
            unknown.setdefault(corpus, []).append(it_id)
    if unknown:
        examples = "; ".join(
            f"{cor!r}: {sorted(ids)[:3]}" for cor, ids in sorted(unknown.items())
        )
        raise KeyError(
            f"calibration spec references corpora not in cfg.corpora — "
            f"{examples}; configured corpora: {sorted(cfg.corpora)!r}"
        )

    corpora_needed = sorted(set(item_corpus.values()))
    orchestrators = {
        name: _build_corpus_orchestrator(cfg, name, embedder, provider)
        for name in corpora_needed
    }

    sem = asyncio.Semaphore(concurrency)

    async def _run_one(item):
        async with sem:
            response = await orchestrators[item_corpus[item.id]].run(
                question=item.question,
                system_prompt="You are a helpful assistant.",
            )
            answer = response.answer
            sources = sorted(s.source for s in response.sources)
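            # system_output_hash is the stable join key across the pipeline:
            # Step B hand labels and Step C judge predictions both carry it,
            # so _compute_kappa_weights can align them regardless of record
            # ordering.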
            sys_hash = hashlib.sha256(
                f"{item.id}\x00{answer}\x00{','.join(sources)}".encode("utf-8")
            ).hexdigest()
            return {
                "item_id": item.id,
                "question": item.question,
                "category": item.category,
                "answer": answer,
                "sources": [s.source for s in response.sources],
                "ranked_sources": response.ranked_sources,
                "source_chunks": response.source_chunks,
                "source_snippets": item.source_snippets,
                "reference_answer": item.reference_answer,
                "system_output_hash": sys_hash,
                "stratum": target_ids[item.id]["stratum"],
                "corpus": target_ids[item.id]["corpus"],
            }

    records = await asyncio.gather(*[_run_one(it) for it in items])
    SYSTEM_OUTPUTS.parent.mkdir(parents=True, exist_ok=True)
    SYSTEM_OUTPUTS.write_text(json.dumps(records, indent=2) + "\n")
    logger.info(
        "generate_outputs_complete", count=len(records), path=str(SYSTEM_OUTPUTS)
    )


# --- Subcommand: run-judges (Step C, one row per invocation) ---


def _make_provider(name: str, cfg, *, model: str | None = None):
    from agent_bench.core.provider import AnthropicProvider, OpenAIProvider

    if name == "anthropic":
        return AnthropicProvider(cfg, model=model)
    if name == "openai":
        return OpenAIProvider(cfg, model=model)
    raise ValueError(f"unknown provider: {name}")


def _make_judge(
    provider_name: str,
    model_id: str,
    dimension: str,
    cfg,
    *,
    use_cot: bool = True,
    use_anchors: bool = True,
    abstain_allowed_override: bool | None = None,
):
    from agent_bench.evaluation.judges.base import Rubric
    from agent_bench.evaluation.judges.citation_faithfulness import (
        CitationFaithfulnessJudge,
    )
    from agent_bench.evaluation.judges.completeness import CompletenessJudge
    from agent_bench.evaluation.judges.groundedness import GroundednessJudge
    from agent_bench.evaluation.judges.relevance import RelevanceJudge

    judge_class = {
        "groundedness": GroundednessJudge,
        "relevance": RelevanceJudge,
        "completeness": CompletenessJudge,
        "citation_faithfulness": CitationFaithfulnessJudge,
    }
    rubric_dir = REPO / "agent_bench/evaluation/rubrics"
    rubric = Rubric.from_markdown_file(rubric_dir / f"{dimension}.md")
    if not use_anchors:
        # Strip ### Example sections — body_markdown changes, so
        # ScoreResult.rubric_version naturally distinguishes anchored vs
        # stripped variants when the calibration report buckets results.
        rubric = rubric.strip_anchors()
    return judge_class[dimension](
        judge_provider=_make_provider(provider_name, cfg, model=model_id),
        rubric=rubric,
        model_id=model_id,
        use_cot=use_cot,
        abstain_allowed_override=abstain_allowed_override,
    )


def _row_judge_options(row: dict) -> dict:
    """Pull `options` from a row config and project to _make_judge kwargs.

    Defaults (when keys are missing) match the baseline contract: CoT on,
    anchors on, abstain follows the rubric (no override).
    """
    opts = row.get("options") or {}
    abstain_allowed = opts.get("abstain_allowed")
    return {
        "use_cot": bool(opts.get("use_cot", True)),
        "use_anchors": bool(opts.get("use_anchors", True)),
        # None = follow rubric; explicit True/False = override
        "abstain_allowed_override": (
            None if abstain_allowed is None else bool(abstain_allowed)
        ),
    }
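
# A single-judge row config for the baseline ablation might look like the
# sketch below. Field names come from the accesses in cmd_run_judges; the
# concrete values (label, model id, output path) are illustrative
# assumptions, not checked-in configs:
#
#   # configs/calibration/rows/baseline.yaml
#   label: baseline
#   strategy: single
#   provider: anthropic
#   model_id: claude-sonnet-4
#   dimensions: [groundedness, relevance, completeness, citation_faithfulness]
#   output_path: results/calibration_v1_judge_baseline.json
#   options:
#     use_cot: true
#     use_anchors: true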


def _build_item_and_output(rec: dict):
    """Rehydrate a frozen output record into the (GoldenQuestion, AgentResponse)
    pair the judges expect. Usage and latency are zero placeholders: the frozen
    record does not store the original run's telemetry.
    """
    from agent_bench.agents.orchestrator import AgentResponse, SourceReference
    from agent_bench.core.types import TokenUsage
    from agent_bench.evaluation.harness import GoldenQuestion

    item = GoldenQuestion(
        id=rec["item_id"],
        question=rec["question"],
        expected_answer_keywords=[],
        expected_sources=[],
        category=rec["category"],
        difficulty="easy",
        requires_calculator=False,
        source_snippets=rec.get("source_snippets", []),
        reference_answer=rec.get("reference_answer", ""),
    )
    output = AgentResponse(
        answer=rec["answer"],
        sources=[SourceReference(source=s) for s in rec["sources"]],
        ranked_sources=rec.get("ranked_sources", []),
        source_chunks=rec.get("source_chunks", []),
        iterations=1,
        usage=TokenUsage(input_tokens=0, output_tokens=0, estimated_cost_usd=0),
        latency_ms=0,
    )
    return item, output


async def cmd_run_judges(row_config_path: Path, concurrency: int) -> None:
    """Score the frozen system outputs with the row's judge configuration."""
    from agent_bench.core.config import load_config
    from agent_bench.evaluation.variance.jury import jury
    from agent_bench.evaluation.variance.rubric_permute import rubric_permute

    if not SYSTEM_OUTPUTS.exists():
        raise SystemExit(
            f"{SYSTEM_OUTPUTS} not found — run `generate-outputs` first."
        )

    row = yaml.safe_load(row_config_path.read_text())
    outputs = json.loads(SYSTEM_OUTPUTS.read_text())
    cfg = load_config()
    sem = asyncio.Semaphore(concurrency)
    all_results: list[dict] = []
    strategy = row["strategy"]

    def _skip_oos(rec: dict, dim: str) -> bool:
        return rec["category"] == "out_of_scope" and dim != "relevance"

    judge_opts = _row_judge_options(row)

    if strategy == "single":
        # Build one judge per dimension up-front, then gather all
        # (dim, item) pairs in a single asyncio.gather call. The previous
        # design serialized across dimensions (each dim awaited fully
        # before the next started), leaving Phase-11 wall-clock on the
        # table when the calibration spend is API-rate-limited.
        judges_by_dim = {
            dim: _make_judge(
                row["provider"], row["model_id"], dim, cfg, **judge_opts
            )
            for dim in row["dimensions"]
        }

        async def score_one(rec: dict, dim: str, judge):
            async with sem:
                if _skip_oos(rec, dim):
                    return None
                item, output = _build_item_and_output(rec)
                result = await judge.score(item, output)
                return {
                    "item_id": rec["item_id"],
                    "dimension": dim,
                    **result.model_dump(),
                }

        coros = [
            score_one(rec, dim, judge)
            for dim, judge in judges_by_dim.items()
            for rec in outputs
        ]
        gathered = await asyncio.gather(*coros)
        all_results.extend([r for r in gathered if r is not None])
    elif strategy == "rubric_permute":
        # Sequential per-item by design: PermutedJudge writes to the
        # sidecar JSONL in append mode, and within-call ordering matters
        # for downstream per-permutation analysis (the kappa_table joins
        # by item_id, but the sidecar order encodes the permutation seed
        # sequence). Across-dim parallelism is left for v1.1 once the
        # sidecar contract proves stable.
        for dim in row["dimensions"]:
            judge = _make_judge(
                row["provider"], row["model_id"], dim, cfg, **judge_opts
            )
            sidecar = REPO / row.get(
                "sidecar_path", "results/calibration_v1_permute_members.jsonl"
            )
            permuted = rubric_permute(
                judge,
                n=row["options"]["n_permutations"],
                seeds=row["options"]["seeds"],
                sidecar_path=sidecar,
            )
            for rec in outputs:
                if _skip_oos(rec, dim):
                    continue
                item, output = _build_item_and_output(rec)
                result = await permuted.score(item, output)
                all_results.append(
                    {"item_id": rec["item_id"], "dimension": dim, **result.model_dump()}
                )
    elif strategy == "jury":
        # Same sequential rationale as rubric_permute: the jury writes a
        # per-member sidecar, and downstream analysis benefits from stable
        # ordering. The asyncio.gather inside Jury.score does parallelize
        # member calls within an item; the across-item / across-dim
        # serialization is the conservative choice.
        for dim in row["dimensions"]:
            members = [
                _make_judge(m["provider"], m["model_id"], dim, cfg, **judge_opts)
                for m in row["members"]
            ]
            sidecar = REPO / row["sidecar_path"]
            weights = (
                _compute_kappa_weights(
                    REPO / row["weights_source"],
                    dim,
                    expected_judge_ids={m.judge_id for m in members},
                )
                if row.get("aggregation") == "kappa_weighted"
                else None
            )
            j = jury(
                judges=members,
                aggregation=row["aggregation"],
                weights=weights,
                quorum=row.get("quorum"),
                sidecar_path=sidecar,
            )
            for rec in outputs:
                if _skip_oos(rec, dim):
                    continue
                item, output = _build_item_and_output(rec)
                result = await j.score(item, output)
                all_results.append(
                    {"item_id": rec["item_id"], "dimension": dim, **result.model_dump()}
                )
    else:
        raise SystemExit(f"unknown strategy: {strategy}")

    out_path = REPO / row["output_path"]
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(all_results, indent=2) + "\n")
    logger.info(
        "run_judges_complete",
        row=row["label"],
        count=len(all_results),
        path=str(out_path),
    )
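
# A jury row config for the kappa_weighted path might look like the sketch
# below. Field names come from the accesses above; the values, model ids,
# and paths are illustrative assumptions, e.g. pointing weights_source at a
# prior run's jury sidecar as the coverage error in _compute_kappa_weights
# suggests:
#
#   label: jury_kappa_weighted
#   strategy: jury
#   dimensions: [groundedness]
#   aggregation: kappa_weighted
#   quorum: 2
#   weights_source: results/prior_run_jury_members.jsonl
#   sidecar_path: results/calibration_v1_jury_members.jsonl
#   output_path: results/calibration_v1_judge_jury.json
#   members:
#     - {provider: anthropic, model_id: claude-sonnet-4}
#     - {provider: openai, model_id: gpt-4o}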


def _compute_kappa_weights(
    predictions_path: Path,
    dimension: str,
    expected_judge_ids: set[str],
) -> dict[str, float]:
    """Compute per-judge weight = max(0, Cohen's κ vs gold labels) for the
    dimension, from a predictions file (JSON list or JSONL).

    v1.1 replaces v1's stub (which returned 1.0 for every judge_id seen,
    causing asymmetric coverage to amplify rather than suppress an
    unweighted member). Hard-errors if `predictions_path` is missing,
    if any `expected_judge_ids` member has no scored (non-abstain)
    predictions for `dimension`, or if no labels are available for the
    dimension.

    The κ → weight mapping clips negative κ to 0; a member with κ ≤ 0 on
    a dimension contributes weight 0 (effective exclusion via weighting).
    This is the "soft exclusion" behavior — explicit per-dimension
    exclusion is tracked separately on the v1.2 fix-list.

    Pragmatic v1.1: `predictions_path` may point at the same calibration
    set used for κ reporting (circular weighting); this is documented in
    the v1.1 jury-rescue DECISIONS entry. v1.2 will require a held-out
    validation set.
    """
    from agent_bench.evaluation.calibration.metrics import cohen_kappa

    if not predictions_path.exists():
        raise FileNotFoundError(
            f"weights source {predictions_path} does not exist; v1.1 "
            f"requires explicit κ-derived weights — no silent fallback"
        )

    # Load predictions: JSON list (baseline-style) or JSONL (sidecar-style).
    raw = predictions_path.read_text()
    if predictions_path.suffix == ".jsonl":
        preds = [json.loads(line) for line in raw.splitlines() if line.strip()]
    else:
        preds = json.loads(raw)

    if not LABELS_PATH.exists():
        raise FileNotFoundError(
            f"labels file {LABELS_PATH} does not exist; cannot compute "
            f"κ-derived weights"
        )
    labels: dict[str, int] = {}
    for line in LABELS_PATH.read_text().splitlines():
        if not line.strip():
            continue
        rec = json.loads(line)
        if rec.get("dimension") != dimension or rec.get("abstained"):
            continue
        labels[rec["system_output_hash"]] = int(rec["score"])
    if not labels:
        raise ValueError(
            f"no gold labels for dimension={dimension!r} in {LABELS_PATH}; "
            f"cannot compute κ-derived weights"
        )

    # Group predictions by judge_id, joining to labels by system_output_hash.
    # The sidecar JSONL has one record per (judge × item × dim); the baseline
    # JSON has the same. Both expose `judge_id` of the form `{model}_{dim}`,
    # `system_output_hash`, `score`, and (for the abstain-aware filter) the
    # `Unknown` sentinel.
    by_judge: dict[str, list[tuple[int, int]]] = {}
    for p in preds:
        # The JSONL sidecar lacks a `dimension` field; we filter by suffix
        # on judge_id instead, which encodes the dimension.
        if not p["judge_id"].endswith(f"_{dimension}"):
            continue
        if p["score"] == "Unknown":
            continue
        h = p["system_output_hash"]
        if h not in labels:
            continue
        by_judge.setdefault(p["judge_id"], []).append(
            (labels[h], int(p["score"]))
        )

    missing = expected_judge_ids - by_judge.keys()
    if missing:
        raise ValueError(
            f"weights source {predictions_path} has no predictions for "
            f"expected judge_ids {sorted(missing)} on dimension={dimension!r}. "
            f"Source covers {sorted(by_judge.keys())}. v1.1 requires "
            f"symmetric coverage — point weights_source at a predictions "
            f"file containing every jury member's verdicts (e.g. the jury "
            f"sidecar from a prior run)."
        )

    weights: dict[str, float] = {}
    for jid in expected_judge_ids:
        pairs = by_judge[jid]
        y_lab = [a for a, _ in pairs]
        y_pred = [b for _, b in pairs]
        kappa = cohen_kappa(y_lab, y_pred)
        weights[jid] = max(0.0, kappa)
        logger.info(
            "kappa_weight_computed",
            judge_id=jid,
            dimension=dimension,
            kappa=kappa,
            weight=weights[jid],
            n=len(pairs),
        )
    return weights
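
# Worked example of the κ → weight mapping above, assuming agent_bench's
# cohen_kappa follows the standard definition κ = (p_o - p_e) / (1 - p_e),
# where p_o is observed agreement and p_e is chance agreement from the
# marginals. With gold = [2, 2, 3, 3] and judge = [2, 3, 3, 3]: p_o = 3/4;
# p_e = 0.5*0.25 + 0.5*0.75 = 0.5; so κ = (0.75 - 0.5) / (1 - 0.5) = 0.5 and
# the member's weight is 0.5. A judge that agrees at worse-than-chance rates
# gets κ < 0, which max(0.0, κ) clips to a weight of 0 (the "soft exclusion"
# described in the docstring).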


# --- Subcommand: build-table (Step D) ---


def cmd_build_table(strict: bool) -> None:
    from agent_bench.evaluation.calibration.report import generate_kappa_table

    # Only row outputs named results/calibration_v1_judge_*.json are picked
    # up, so row configs must set `output_path` to match this pattern.
    predictions_glob = str(REPO / "results/calibration_v1_judge_*.json")
    generate_kappa_table(
        predictions_glob=predictions_glob,
        labels_path=str(LABELS_PATH),
        output_path=str(KAPPA_TABLE_OUT),
        strict=strict,
    )
    logger.info("build_table_complete", path=str(KAPPA_TABLE_OUT), strict=strict)


def main() -> None:
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_gen = sub.add_parser(
        "generate-outputs", help="Step A: generate frozen system outputs"
    )
    p_gen.add_argument("--concurrency", type=int, default=None)

    p_run = sub.add_parser("run-judges", help="Step C: score one ablation row")
    p_run.add_argument("--row-config", type=Path, required=True)
    p_run.add_argument("--concurrency", type=int, default=None)

    p_tab = sub.add_parser(
        "build-table", help="Step D: aggregate predictions into κ table"
    )
    p_tab.add_argument(
        "--strict",
        action="store_true",
        help="Raise on missing predictions/labels (final-artifact path)",
    )

    args = parser.parse_args()
    if args.cmd == "generate-outputs":
        asyncio.run(cmd_generate_outputs(_resolve_concurrency(args.concurrency)))
    elif args.cmd == "run-judges":
        asyncio.run(
            cmd_run_judges(args.row_config, _resolve_concurrency(args.concurrency))
        )
    elif args.cmd == "build-table":
        cmd_build_table(strict=args.strict)


if __name__ == "__main__":
    main()