"""Calibration runner: generate-outputs | run-judges | build-table.
Orchestrates Steps A, C, D from the design doc's data flow. Step B
(hand-labeling) is manual — done in a Jupyter notebook reading
results/calibration_v1_system_outputs.json and appending to
measurements/2026-05-04-judge-calibration-labels.jsonl.
Examples:
python scripts/run_calibration.py generate-outputs --concurrency 5
python scripts/run_calibration.py run-judges --row-config=configs/calibration/rows/baseline.yaml
python scripts/run_calibration.py build-table
python scripts/run_calibration.py build-table --strict
"""
from __future__ import annotations
import argparse
import asyncio
import hashlib
import json
from pathlib import Path
import structlog
import yaml
logger = structlog.get_logger()
REPO = Path(__file__).resolve().parents[1]
CALIBRATION_SPEC = REPO / "agent_bench/evaluation/datasets/calibration_v1.json"
SYSTEM_OUTPUTS = REPO / "results/calibration_v1_system_outputs.json"
LABELS_PATH = REPO / "measurements/2026-05-04-judge-calibration-labels.jsonl"
KAPPA_TABLE_OUT = REPO / "docs/_generated/kappa_table.md"
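# Shape of a hand-labeled record in LABELS_PATH, as consumed by
# _compute_kappa_weights below (field names taken from that reader; the
# labeling notebook may write additional fields — values illustrative):
#   {"system_output_hash": "<sha256>", "dimension": "groundedness",
#    "score": 1, "abstained": false}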
def _resolve_concurrency(cli_value: int | None) -> int:
"""CLI flag overrides config field; default is 5. Logs the resolved value."""
if cli_value is not None:
resolved = cli_value
else:
cfg_path = REPO / "configs/default.yaml"
cfg_concurrency = None
if cfg_path.exists():
cfg = yaml.safe_load(cfg_path.read_text()) or {}
cfg_concurrency = (cfg.get("evaluation", {}) or {}).get(
"calibration_concurrency"
)
resolved = cfg_concurrency if cfg_concurrency is not None else 5
logger.info("calibration_concurrency_resolved", value=resolved)
return resolved
# --- Subcommand: generate-outputs (Step A) ---
def _build_corpus_orchestrator(cfg, corpus_name: str, embedder, provider):
"""Build a per-corpus Orchestrator wired to that corpus's HybridStore.
Mirrors the per-corpus construction in scripts/evaluate.py so calibration
runs use the same retrieval stack as production evaluation. The embedder
and provider are shared across corpora — only the store/retriever/
SearchTool differ.
"""
from agent_bench.agents.orchestrator import Orchestrator
from agent_bench.rag.retriever import Retriever
from agent_bench.rag.store import HybridStore
from agent_bench.tools.calculator import CalculatorTool
from agent_bench.tools.registry import ToolRegistry
from agent_bench.tools.search import SearchTool
corpus_cfg = cfg.corpora[corpus_name]
store = HybridStore.load(corpus_cfg.store_path, rrf_k=cfg.rag.retrieval.rrf_k)
reranker = None
if cfg.rag.reranker.enabled:
from agent_bench.rag.reranker import CrossEncoderReranker
reranker = CrossEncoderReranker(model_name=cfg.rag.reranker.model_name)
retriever = Retriever(
embedder=embedder,
store=store,
default_strategy=cfg.rag.retrieval.strategy,
candidates_per_system=cfg.rag.retrieval.candidates_per_system,
reranker=reranker,
reranker_top_k=cfg.rag.reranker.top_k,
)
registry = ToolRegistry()
registry.register(
SearchTool(
retriever=retriever,
default_top_k=cfg.rag.retrieval.top_k,
refusal_threshold=corpus_cfg.refusal_threshold,
)
)
registry.register(CalculatorTool())
return Orchestrator(
provider=provider,
registry=registry,
max_iterations=cfg.agent.max_iterations,
temperature=cfg.agent.temperature,
)
async def cmd_generate_outputs(concurrency: int) -> None:
"""Run the orchestrator against the 30 calibration items with a frozen
configuration; write results/calibration_v1_system_outputs.json.
The calibration spec is mixed-corpus (k8s + fastapi). Each item carries a
`corpus` field; we build one Orchestrator per corpus and route by that
field. A KeyError on an unrecognized corpus is preferable to silently
misrouting an item to the wrong store.
"""
from agent_bench.core.config import load_config
from agent_bench.core.provider import AnthropicProvider
from agent_bench.evaluation.harness import load_golden_dataset
from agent_bench.rag.embedder import Embedder
spec = json.loads(CALIBRATION_SPEC.read_text())
target_ids = {i["id"]: i for i in spec["items"]}
fastapi = load_golden_dataset(
REPO / "agent_bench/evaluation/datasets/tech_docs_golden.json"
)
k8s = load_golden_dataset(
REPO / "agent_bench/evaluation/datasets/k8s_golden.json"
)
items = [q for q in (fastapi + k8s) if q.id in target_ids]
if len(items) != len(target_ids):
missing = set(target_ids) - {q.id for q in items}
raise SystemExit(
f"calibration items not found in goldens: {sorted(missing)}"
)
cfg = load_config()
provider = AnthropicProvider(cfg)
embedder = Embedder(model_name=cfg.embedding.model, cache_dir=cfg.embedding.cache_dir)
item_corpus = {it.id: target_ids[it.id]["corpus"] for it in items}
unknown: dict[str, list[str]] = {}
for it_id, corpus in item_corpus.items():
if corpus not in cfg.corpora:
unknown.setdefault(corpus, []).append(it_id)
if unknown:
examples = "; ".join(
f"{cor!r}: {sorted(ids)[:3]}" for cor, ids in sorted(unknown.items())
)
raise KeyError(
f"calibration spec references corpora not in cfg.corpora — "
f"{examples}; configured corpora: {sorted(cfg.corpora)!r}"
)
corpora_needed = sorted(set(item_corpus.values()))
orchestrators = {
name: _build_corpus_orchestrator(cfg, name, embedder, provider)
for name in corpora_needed
}
sem = asyncio.Semaphore(concurrency)
async def _run_one(item):
async with sem:
response = await orchestrators[item_corpus[item.id]].run(
question=item.question,
system_prompt="You are a helpful assistant.",
)
answer = response.answer
sources = sorted(s.source for s in response.sources)
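            # Content hash identifying this exact (item, answer, sources)
            # triple; the labeling notebook and _compute_kappa_weights join
            # on it. Sources are sorted so the hash is order-insensitive.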
sys_hash = hashlib.sha256(
f"{item.id}\x00{answer}\x00{','.join(sources)}".encode("utf-8")
).hexdigest()
return {
"item_id": item.id,
"question": item.question,
"category": item.category,
"answer": answer,
"sources": [s.source for s in response.sources],
"ranked_sources": response.ranked_sources,
"source_chunks": response.source_chunks,
"source_snippets": item.source_snippets,
"reference_answer": item.reference_answer,
"system_output_hash": sys_hash,
"stratum": target_ids[item.id]["stratum"],
"corpus": target_ids[item.id]["corpus"],
}
records = await asyncio.gather(*[_run_one(it) for it in items])
SYSTEM_OUTPUTS.parent.mkdir(parents=True, exist_ok=True)
SYSTEM_OUTPUTS.write_text(json.dumps(records, indent=2) + "\n")
logger.info(
"generate_outputs_complete", count=len(records), path=str(SYSTEM_OUTPUTS)
)
# --- Subcommand: run-judges (Step C, one row per invocation) ---
def _make_provider(name: str, cfg, *, model: str | None = None):
from agent_bench.core.provider import AnthropicProvider, OpenAIProvider
if name == "anthropic":
return AnthropicProvider(cfg, model=model)
if name == "openai":
return OpenAIProvider(cfg, model=model)
raise ValueError(f"unknown provider: {name}")
def _make_judge(
provider_name: str,
model_id: str,
dimension: str,
cfg,
*,
use_cot: bool = True,
use_anchors: bool = True,
abstain_allowed_override: bool | None = None,
):
from agent_bench.evaluation.judges.base import Rubric
from agent_bench.evaluation.judges.citation_faithfulness import (
CitationFaithfulnessJudge,
)
from agent_bench.evaluation.judges.completeness import CompletenessJudge
from agent_bench.evaluation.judges.groundedness import GroundednessJudge
from agent_bench.evaluation.judges.relevance import RelevanceJudge
    judge_classes = {
        "groundedness": GroundednessJudge,
        "relevance": RelevanceJudge,
        "completeness": CompletenessJudge,
        "citation_faithfulness": CitationFaithfulnessJudge,
    }
rubric_dir = REPO / "agent_bench/evaluation/rubrics"
rubric = Rubric.from_markdown_file(rubric_dir / f"{dimension}.md")
if not use_anchors:
# Strip ### Example sections — body_markdown changes, so
# ScoreResult.rubric_version naturally distinguishes anchored vs
# stripped variants when the calibration report buckets results.
rubric = rubric.strip_anchors()
    return judge_classes[dimension](
judge_provider=_make_provider(provider_name, cfg, model=model_id),
rubric=rubric,
model_id=model_id,
use_cot=use_cot,
abstain_allowed_override=abstain_allowed_override,
)
def _row_judge_options(row: dict) -> dict:
"""Pull `options` from a row config and project to _make_judge kwargs.
Defaults (when keys are missing) match the baseline contract: CoT on,
anchors on, abstain follows the rubric (no override).
"""
opts = row.get("options") or {}
abstain_allowed = opts.get("abstain_allowed")
return {
"use_cot": bool(opts.get("use_cot", True)),
"use_anchors": bool(opts.get("use_anchors", True)),
# None = follow rubric; explicit True/False = override
"abstain_allowed_override": (
None if abstain_allowed is None else bool(abstain_allowed)
),
}
def _build_item_and_output(rec: dict):
from agent_bench.agents.orchestrator import AgentResponse, SourceReference
from agent_bench.core.types import TokenUsage
from agent_bench.evaluation.harness import GoldenQuestion
item = GoldenQuestion(
id=rec["item_id"],
question=rec["question"],
expected_answer_keywords=[],
expected_sources=[],
category=rec["category"],
difficulty="easy",
requires_calculator=False,
source_snippets=rec.get("source_snippets", []),
reference_answer=rec.get("reference_answer", ""),
)
output = AgentResponse(
answer=rec["answer"],
sources=[SourceReference(source=s) for s in rec["sources"]],
ranked_sources=rec.get("ranked_sources", []),
source_chunks=rec.get("source_chunks", []),
iterations=1,
usage=TokenUsage(input_tokens=0, output_tokens=0, estimated_cost_usd=0),
latency_ms=0,
)
return item, output
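# Hedged sketch of a row config as consumed by cmd_run_judges below,
# reconstructed from the keys it reads (values are illustrative, not
# canonical; single/rubric_permute rows use top-level provider/model_id,
# jury rows use `members` instead):
#
#   label: jury_kappa_weighted
#   strategy: jury
#   dimensions: [groundedness, relevance]
#   options: {use_cot: true, use_anchors: true, abstain_allowed: null}
#   members:
#     - {provider: anthropic, model_id: <model-a>}
#     - {provider: openai, model_id: <model-b>}
#   aggregation: kappa_weighted
#   weights_source: results/<prior-jury-sidecar>.jsonl
#   quorum: 2
#   sidecar_path: results/<this-run-sidecar>.jsonl
#   output_path: results/calibration_v1_judge_jury.json
#
# rubric_permute rows additionally require options.n_permutations and
# options.seeds.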
async def cmd_run_judges(row_config_path: Path, concurrency: int) -> None:
"""Score the frozen system outputs with the row's judge configuration."""
from agent_bench.core.config import load_config
from agent_bench.evaluation.variance.jury import jury
from agent_bench.evaluation.variance.rubric_permute import rubric_permute
if not SYSTEM_OUTPUTS.exists():
raise SystemExit(
f"{SYSTEM_OUTPUTS} not found — run `generate-outputs` first."
)
row = yaml.safe_load(row_config_path.read_text())
outputs = json.loads(SYSTEM_OUTPUTS.read_text())
cfg = load_config()
sem = asyncio.Semaphore(concurrency)
all_results: list[dict] = []
strategy = row["strategy"]
def _skip_oos(rec: dict, dim: str) -> bool:
return rec["category"] == "out_of_scope" and dim != "relevance"
judge_opts = _row_judge_options(row)
if strategy == "single":
        # Build one judge per dimension up-front, then gather all
        # (dim, item) pairs in a single asyncio.gather call. The previous
        # design serialized across dimensions (each dim awaited fully
        # before the next started), leaving Phase-11 wall-clock time on
        # the table when the calibration run is API-rate-limited.
judges_by_dim = {
dim: _make_judge(
row["provider"], row["model_id"], dim, cfg, **judge_opts
)
for dim in row["dimensions"]
}
        async def score_one(rec: dict, dim: str, judge):
            # Skip before acquiring the semaphore so skipped items don't
            # consume a concurrency slot.
            if _skip_oos(rec, dim):
                return None
            async with sem:
                item, output = _build_item_and_output(rec)
                result = await judge.score(item, output)
            return {
                "item_id": rec["item_id"],
                "dimension": dim,
                **result.model_dump(),
            }
coros = [
score_one(rec, dim, judge)
for dim, judge in judges_by_dim.items()
for rec in outputs
]
gathered = await asyncio.gather(*coros)
all_results.extend([r for r in gathered if r is not None])
    elif strategy == "rubric_permute":
        # Sequential per-item by design: PermutedJudge appends to the
        # sidecar JSONL, and within-call ordering matters for downstream
        # per-permutation analysis (the kappa_table joins by item_id, but
        # the sidecar order encodes the permutation seed sequence).
        # Across-dim parallelism is left for v1.1 once the sidecar
        # contract proves stable.
        sidecar = REPO / row.get(
            "sidecar_path", "results/calibration_v1_permute_members.jsonl"
        )
        for dim in row["dimensions"]:
            judge = _make_judge(
                row["provider"], row["model_id"], dim, cfg, **judge_opts
            )
            permuted = rubric_permute(
                judge,
                n=row["options"]["n_permutations"],
                seeds=row["options"]["seeds"],
                sidecar_path=sidecar,
            )
            for rec in outputs:
                if _skip_oos(rec, dim):
                    continue
                item, output = _build_item_and_output(rec)
                result = await permuted.score(item, output)
                all_results.append({
                    "item_id": rec["item_id"],
                    "dimension": dim,
                    **result.model_dump(),
                })
elif strategy == "jury":
# Same sequential rationale as rubric_permute: jury writes a
# per-member sidecar and downstream analysis benefits from stable
# ordering. The asyncio.gather inside Jury.score does parallelize
# member calls within an item; the across-item / across-dim
# serialization is the conservative choice.
        sidecar = REPO / row["sidecar_path"]
        for dim in row["dimensions"]:
            members = [
                _make_judge(m["provider"], m["model_id"], dim, cfg, **judge_opts)
                for m in row["members"]
            ]
weights = (
_compute_kappa_weights(
REPO / row["weights_source"],
dim,
expected_judge_ids={m.judge_id for m in members},
)
if row.get("aggregation") == "kappa_weighted"
else None
)
j = jury(
judges=members,
aggregation=row["aggregation"],
weights=weights,
quorum=row.get("quorum"),
sidecar_path=sidecar,
)
for rec in outputs:
if _skip_oos(rec, dim):
continue
item, output = _build_item_and_output(rec)
result = await j.score(item, output)
                all_results.append({
                    "item_id": rec["item_id"],
                    "dimension": dim,
                    **result.model_dump(),
                })
else:
raise SystemExit(f"unknown strategy: {strategy}")
out_path = REPO / row["output_path"]
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(all_results, indent=2) + "\n")
logger.info(
"run_judges_complete",
row=row["label"],
count=len(all_results),
path=str(out_path),
)
def _compute_kappa_weights(
predictions_path: Path,
dimension: str,
expected_judge_ids: set[str],
) -> dict[str, float]:
"""Compute per-judge weight = max(0, Cohen's κ vs gold labels) for the
dimension, from a predictions file (JSON list or JSONL).
v1.1 replaces v1's stub (which returned 1.0 for every judge_id seen,
causing asymmetric coverage to amplify rather than suppress an
unweighted member). Hard-errors if `predictions_path` is missing,
if any `expected_judge_ids` member has no scored (non-abstain)
predictions for `dimension`, or if no labels are available for the
dimension.
The κ → weight mapping clips negative κ to 0; a member with κ ≤ 0 on
a dimension contributes weight 0 (effective exclusion via weighting).
This is the "soft exclusion" behavior — explicit per-dimension
exclusion is tracked separately on the v1.2 fix-list.
Pragmatic v1.1: `predictions_path` may point at the same calibration
set used for κ reporting (circular weighting); this is documented in
the v1.1 jury-rescue DECISIONS entry. v1.2 will require a held-out
validation set.
"""
from agent_bench.evaluation.calibration.metrics import cohen_kappa
if not predictions_path.exists():
raise FileNotFoundError(
f"weights source {predictions_path} does not exist; v1.1 "
f"requires explicit κ-derived weights — no silent fallback"
)
# Load predictions: JSON list (baseline-style) or JSONL (sidecar-style).
raw = predictions_path.read_text()
if predictions_path.suffix == ".jsonl":
preds = [json.loads(line) for line in raw.splitlines() if line.strip()]
else:
preds = json.loads(raw)
if not LABELS_PATH.exists():
raise FileNotFoundError(
f"labels file {LABELS_PATH} does not exist; cannot compute "
f"κ-derived weights"
)
labels: dict[str, int] = {}
for line in LABELS_PATH.read_text().splitlines():
if not line.strip():
continue
rec = json.loads(line)
if rec.get("dimension") != dimension or rec.get("abstained"):
continue
labels[rec["system_output_hash"]] = int(rec["score"])
if not labels:
raise ValueError(
f"no gold labels for dimension={dimension!r} in {LABELS_PATH}; "
f"cannot compute κ-derived weights"
)
# Group predictions by judge_id, joining to labels by system_output_hash.
# The sidecar JSONL has one record per (judge × item × dim); the baseline
# JSON has the same. Both expose `judge_id` of the form `{model}_{dim}`,
# `system_output_hash`, `score`, and (for the abstain-aware filter) the
# `Unknown` sentinel.
by_judge: dict[str, list[tuple[int, int]]] = {}
for p in preds:
        # The JSONL sidecar lacks a `dimension` field; filter by the
        # judge_id suffix instead, which encodes the dimension.
        if not p["judge_id"].endswith(f"_{dimension}"):
            continue
if p["score"] == "Unknown":
continue
h = p["system_output_hash"]
if h not in labels:
continue
by_judge.setdefault(p["judge_id"], []).append(
(labels[h], int(p["score"]))
)
missing = expected_judge_ids - by_judge.keys()
if missing:
raise ValueError(
f"weights source {predictions_path} has no predictions for "
f"expected judge_ids {sorted(missing)} on dimension={dimension!r}. "
f"Source covers {sorted(by_judge.keys())}. v1.1 requires "
f"symmetric coverage — point weights_source at a predictions "
f"file containing every jury member's verdicts (e.g. the jury "
f"sidecar from a prior run)."
)
weights: dict[str, float] = {}
for jid in expected_judge_ids:
pairs = by_judge[jid]
y_lab = [a for a, _ in pairs]
y_pred = [b for _, b in pairs]
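        # Cohen's κ = (p_o − p_e) / (1 − p_e): observed agreement p_o
        # corrected for the chance agreement p_e implied by the marginals.
        # κ ≤ 0 means chance-level or worse, hence the clip to 0 below.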
kappa = cohen_kappa(y_lab, y_pred)
weights[jid] = max(0.0, kappa)
logger.info(
"kappa_weight_computed",
judge_id=jid,
dimension=dimension,
kappa=kappa,
weight=weights[jid],
n=len(pairs),
)
return weights
# --- Subcommand: build-table (Step D) ---
def cmd_build_table(strict: bool) -> None:
from agent_bench.evaluation.calibration.report import generate_kappa_table
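    # Note: only row outputs matching this *.json glob feed the table; the
    # permute/jury member sidecars are *.jsonl and are not swept here.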
predictions_glob = str(REPO / "results/calibration_v1_judge_*.json")
generate_kappa_table(
predictions_glob=predictions_glob,
labels_path=str(LABELS_PATH),
output_path=str(KAPPA_TABLE_OUT),
strict=strict,
)
logger.info("build_table_complete", path=str(KAPPA_TABLE_OUT), strict=strict)
def main() -> None:
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
sub = parser.add_subparsers(dest="cmd", required=True)
p_gen = sub.add_parser(
"generate-outputs", help="Step A: generate frozen system outputs"
)
p_gen.add_argument("--concurrency", type=int, default=None)
p_run = sub.add_parser("run-judges", help="Step C: score one ablation row")
p_run.add_argument("--row-config", type=Path, required=True)
p_run.add_argument("--concurrency", type=int, default=None)
p_tab = sub.add_parser(
"build-table", help="Step D: aggregate predictions into κ table"
)
p_tab.add_argument(
"--strict",
action="store_true",
help="Raise on missing predictions/labels (final-artifact path)",
)
args = parser.parse_args()
if args.cmd == "generate-outputs":
asyncio.run(cmd_generate_outputs(_resolve_concurrency(args.concurrency)))
elif args.cmd == "run-judges":
asyncio.run(
cmd_run_judges(args.row_config, _resolve_concurrency(args.concurrency))
)
elif args.cmd == "build-table":
cmd_build_table(strict=args.strict)
if __name__ == "__main__":
main()