"""LLM-as-judge for the per-domain eval set. Grades each (question, model_answer) pair against the rubric in per_domain_eval_set.json using DeepSeek V4 Pro. The judge sees: - the question - the model's answer - the rubric (acceptance criteria + key concepts that must appear) - the citation source (so it can ground "is this consistent with reality") The judge MUST output strict JSON: {label, reasoning}. Label is one of {correct, partial, wrong, refused}. Reasoning is a one-or-two sentence audit trail so any score in the matrix can be traced back to a real chain of reasoning, not a black-box number. Why V4 Pro and not V4 Flash: - Per the company-internal research at 2026-04-29: V4 Pro beats Haiku 4.5 on every published code/STEM benchmark (SWE-Bench 80.6 vs 73.3, LiveCodeBench 93.5). For grading 120 questions with technical nuance, the reasoning headroom matters. Cost per full matrix run is rounding error (~$0.50 at promo pricing). Cost (DeepSeek V4 Pro, 75% off through 2026-05-31): Input ~700 tokens per judgment (rubric + question + answer): $0.0003 Output ~150 tokens per judgment (reasoning + label): $0.0001 Per judgment: ~$0.0004 120 questions × 11 model variants (1 base + 10 adapters): ~$0.53 """ from __future__ import annotations import argparse import json import os import sys import time import urllib.error import urllib.request from dataclasses import dataclass from pathlib import Path from typing import Any REPO_ROOT = Path(__file__).resolve().parent.parent.parent sys.path.insert(0, str(REPO_ROOT)) TRANSIENT_HTTP = {429, 502, 503, 504} def resolve_judge() -> tuple[str, str, str, str]: """Resolve (provider, base_url, model, api_key) for the LLM judge. The judge follows the global teacher config (bee/teacher_providers.py resolve_primary), so a single env-var change reroutes every grading run consistently. Pinned to ONE model for the whole batch on purpose — switching graders mid-eval invalidates the comparison. """ from bee.teacher_providers import resolve_primary primary = resolve_primary() if primary is None: sys.exit( "no teacher provider resolved — set BEE_TEACHER_PROVIDER + the " "matching API key, or BEE_DEEPSEEK_API_KEY for the default." ) return primary.provider, primary.api_url, primary.model, primary.api_key JUDGE_SYSTEM = """You are an impartial expert grader for an LLM evaluation. Given a question, a rubric of expected concepts, the citation source, \ and a model's answer, classify the answer as one of: correct — the answer satisfies the rubric (all required concepts present, \ factually right per the citation, possibly with extra correct context). \ Minor wording differences are fine. partial — the answer captures SOME but not all required concepts, \ or has a partly-correct framing with one factual slip. Half-credit. wrong — the answer misses the required concepts, or contradicts \ the citation, or hallucinates a wrong fact, or is off-topic. refused — the model declined to answer (e.g. "I can't help with that") \ even though the question is legitimate. Treat as wrong unless the \ refusal is genuinely warranted by the citation source. Output STRICT JSON, exactly this shape, nothing else: {"label": "correct" | "partial" | "wrong" | "refused", "reasoning": "..."} Reasoning must be one or two sentences explaining the verdict — what \ specific rubric concept was matched or missed. 

@dataclass
class Judgment:
    label: str
    reasoning: str
    model_answer: str
    question_id: str
    domain: str


def _load_env() -> dict[str, str]:
    env_path = REPO_ROOT / ".env"
    if not env_path.exists():
        return {}
    out: dict[str, str] = {}
    for line in env_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")
        out[key.strip()] = val.strip().strip('"').strip("'")
    return out
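
# Illustrative .env the loader above understands (key names taken from the
# resolve_judge() error message; the values are placeholders, not real keys):
#
#   BEE_TEACHER_PROVIDER=deepseek
#   BEE_DEEPSEEK_API_KEY="sk-..."
#
# main() only seeds os.environ with these via setdefault, so anything already
# exported in the shell wins.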

def _http_post_json(url: str, headers: dict[str, str], body: dict, timeout: int = 120) -> dict:
    """POST + parse JSON, with 429/5xx retry and Retry-After honor.

    Mirrors the same pattern in scripts/distill_domain_seeds.py — auth errors
    fatal, rate-limit/overload transient.
    """
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    last_err: Exception | None = None
    for attempt in range(5):
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code not in TRANSIENT_HTTP:
                raise
            last_err = e
            if attempt == 4:
                break  # out of retries; fall through to raise last_err
            ra = e.headers.get("Retry-After") if hasattr(e, "headers") else None
            try:
                backoff = int(ra) if ra else 5 * (2**attempt)
            except ValueError:
                backoff = 5 * (2**attempt)
            print(f" judge: http {e.code}; retry {attempt+1}/4 in {backoff}s", file=sys.stderr)
            time.sleep(backoff)
        except (ConnectionResetError, urllib.error.URLError, TimeoutError, OSError) as e:
            last_err = e
            if attempt == 4:
                break  # out of retries; fall through to raise last_err
            backoff = 5 * (2**attempt)
            print(f" judge: {type(e).__name__}; retry {attempt+1}/4 in {backoff}s", file=sys.stderr)
            time.sleep(backoff)
    if last_err is not None:
        raise last_err
    raise RuntimeError("unreachable")


def judge_one(
    *,
    question_id: str,
    domain: str,
    prompt: str,
    rubric: str,
    citation: str,
    model_answer: str,
    api_key: str,
    provider: str = "deepseek",
    base_url: str = "https://api.deepseek.com/v1",
    model: str = "deepseek-v4-pro",
) -> Judgment:
    """Grade a single (question, answer) pair. Returns a Judgment.

    The provider/url/model trio comes from resolve_judge() at startup and
    stays fixed for the whole batch.
    """
    user_msg = (
        f"Question ({domain}, id={question_id}):\n{prompt}\n\n"
        f"Rubric (what a correct answer must contain):\n{rubric}\n\n"
        f"Citation source for fact-checking:\n{citation}\n\n"
        f"Model's answer:\n{model_answer}\n\n"
        f"Grade it. Reply with only the JSON object."
    )

    if provider == "anthropic":
        # Anthropic Messages API has a different shape (system as top-level
        # field, x-api-key auth). It also doesn't accept response_format,
        # so the prompt itself must enforce strict JSON output — JUDGE_SYSTEM
        # already says "Output STRICT JSON, exactly this shape, nothing else".
        url = base_url.rstrip("/") + "/messages"
        headers = {
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }
        body = {
            "model": model,
            "max_tokens": 1024,
            "temperature": 0.0,
            "system": JUDGE_SYSTEM,
            "messages": [{"role": "user", "content": user_msg}],
        }
        resp = _http_post_json(url, headers, body, timeout=120)
        raw = ""
        for block in resp.get("content", []):
            if block.get("type") == "text":
                raw += block.get("text", "")
    else:
        # OpenAI-compatible (DeepSeek / OpenAI / Gemini-OpenAI-compat).
        url = base_url.rstrip("/") + "/chat/completions"
        body = {
            "model": model,
            "messages": [
                {"role": "system", "content": JUDGE_SYSTEM},
                {"role": "user", "content": user_msg},
            ],
            "max_tokens": 1024,
            "temperature": 0.0,
            "response_format": {"type": "json_object"},
        }
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        resp = _http_post_json(url, headers, body, timeout=120)
        raw = resp.get("choices", [{}])[0].get("message", {}).get("content", "")

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Defensive — V4 Pro normally honors response_format=json_object,
        # but we don't want a single bad response to nuke the whole run.
        return Judgment(
            label="wrong",
            reasoning=f"judge produced unparseable JSON: {raw[:200]!r}",
            model_answer=model_answer,
            question_id=question_id,
            domain=domain,
        )

    label = str(parsed.get("label", "wrong")).lower().strip()
    if label not in ("correct", "partial", "wrong", "refused"):
        label = "wrong"
    reasoning = str(parsed.get("reasoning", ""))[:500]
    return Judgment(
        label=label,
        reasoning=reasoning,
        model_answer=model_answer,
        question_id=question_id,
        domain=domain,
    )


SCORE_MAP = {"correct": 1.0, "partial": 0.5, "wrong": 0.0, "refused": 0.0}


def aggregate_judgments(judgments: list[Judgment]) -> dict[str, Any]:
    """Aggregate per-domain and overall scores from a flat list of judgments."""
    by_domain: dict[str, list[Judgment]] = {}
    for j in judgments:
        by_domain.setdefault(j.domain, []).append(j)

    domain_scores: dict[str, dict[str, Any]] = {}
    for dom, js in by_domain.items():
        labels = [j.label for j in js]
        score = sum(SCORE_MAP[l] for l in labels) / max(len(labels), 1)
        domain_scores[dom] = {
            "score": round(score, 3),
            "n": len(js),
            "labels": {
                "correct": labels.count("correct"),
                "partial": labels.count("partial"),
                "wrong": labels.count("wrong"),
                "refused": labels.count("refused"),
            },
        }

    overall_score = (
        sum(d["score"] * d["n"] for d in domain_scores.values())
        / max(sum(d["n"] for d in domain_scores.values()), 1)
    )
    return {
        "overall_score": round(overall_score, 3),
        "n_total": sum(d["n"] for d in domain_scores.values()),
        "by_domain": domain_scores,
    }
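
# Illustrative batch wrapper (a sketch, not used by the smoke test in main()):
# shows how a full matrix run could chain judge_one() over every answer and
# roll the results up with aggregate_judgments(). The helper name and the
# input record shape ({"question_id": ..., "answer": ...}) are assumptions,
# not a format defined elsewhere in this repo.
def grade_answers(
    answers: list[dict[str, str]],
    eval_set: dict[str, Any],
    *,
    api_key: str,
    provider: str,
    base_url: str,
    model: str,
) -> dict[str, Any]:
    # Index the eval set by question id so each answer can be matched to its
    # domain, prompt, rubric, and citation.
    index: dict[str, tuple[str, dict[str, Any]]] = {}
    for dom, blob in eval_set["domains"].items():
        for q in blob["questions"]:
            index[q["id"]] = (dom, q)

    judgments: list[Judgment] = []
    for rec in answers:
        dom, q = index[rec["question_id"]]
        judgments.append(
            judge_one(
                question_id=q["id"],
                domain=dom,
                prompt=q["prompt"],
                rubric=q["rubric"],
                citation=q["citation"],
                model_answer=rec["answer"],
                api_key=api_key,
                provider=provider,
                base_url=base_url,
                model=model,
            )
        )
    return aggregate_judgments(judgments)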

def main() -> None:
    p = argparse.ArgumentParser(description="Smoke-test the judge with one hand-crafted pair.")
    p.add_argument("--question-id", default="general-08", help="ID from per_domain_eval_set.json")
    p.add_argument("--answer", default="Approximately 3 × 10^8 m/s.", help="Model's answer to grade")
    args = p.parse_args()

    # Hydrate env from .env so resolve_judge() sees configured keys even
    # when called from a fresh shell.
    for k, v in _load_env().items():
        os.environ.setdefault(k, v)

    provider, base_url, model, api_key = resolve_judge()
    print(f" judge: {provider}:{model} via {base_url}")

    eval_set = json.loads(
        (REPO_ROOT / "scripts/eval/per_domain_eval_set.json").read_text(encoding="utf-8")
    )

    # Find the question
    question = None
    domain = None
    for dom, blob in eval_set["domains"].items():
        for q in blob["questions"]:
            if q["id"] == args.question_id:
                question, domain = q, dom
                break
        if question:
            break
    if not question:
        sys.exit(f"question id {args.question_id} not found in eval set")

    print(f"Judging {args.question_id} ({domain})")
    print(f"Q: {question['prompt'][:100]}...")
    print(f"A: {args.answer[:100]}...")
    print()

    j = judge_one(
        question_id=args.question_id,
        domain=domain,
        prompt=question["prompt"],
        rubric=question["rubric"],
        citation=question["citation"],
        model_answer=args.answer,
        api_key=api_key,
        provider=provider,
        base_url=base_url,
        model=model,
    )
    print(f"Label: {j.label}")
    print(f"Reasoning: {j.reasoning}")


if __name__ == "__main__":
    main()