| """LLM-as-judge for the per-domain eval set. |
| |
| Grades each (question, model_answer) pair against the rubric in |
| per_domain_eval_set.json using DeepSeek V4 Pro. The judge sees: |
| - the question |
| - the model's answer |
| - the rubric (acceptance criteria + key concepts that must appear) |
| - the citation source (so it can ground "is this consistent with reality") |
| |
| The judge MUST output strict JSON: {label, reasoning}. Label is one of |
| {correct, partial, wrong, refused}. Reasoning is a one-or-two sentence |
| audit trail so any score in the matrix can be traced back to a real |
| chain of reasoning, not a black-box number. |
| |
| Why V4 Pro and not V4 Flash: |
| - Per the company-internal research at 2026-04-29: V4 Pro beats Haiku |
| 4.5 on every published code/STEM benchmark (SWE-Bench 80.6 vs 73.3, |
| LiveCodeBench 93.5). For grading 120 questions with technical |
| nuance, the reasoning headroom matters. Cost per full matrix run is |
| rounding error (~$0.50 at promo pricing). |
| |
| Cost (DeepSeek V4 Pro, 75% off through 2026-05-31): |
| Input ~700 tokens per judgment (rubric + question + answer): $0.0003 |
| Output ~150 tokens per judgment (reasoning + label): $0.0001 |
| Per judgment: ~$0.0004 |
| 120 questions × 11 model variants (1 base + 10 adapters): ~$0.53 |
| """ |
from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any

REPO_ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(REPO_ROOT))

TRANSIENT_HTTP = {429, 502, 503, 504}


def resolve_judge() -> tuple[str, str, str, str]:
    """Resolve (provider, base_url, model, api_key) for the LLM judge.

    The judge follows the global teacher config (bee/teacher_providers.py
    resolve_primary), so a single env-var change reroutes every grading
    run consistently. Pinned to ONE model for the whole batch on purpose
    — switching graders mid-eval invalidates the comparison.
    """
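    # Example configuration (the provider value and key format shown here are
    # placeholders, not verified against bee/teacher_providers.py; only the
    # variable names are taken from the error message below):
    #   BEE_TEACHER_PROVIDER=deepseek
    #   BEE_DEEPSEEK_API_KEY=sk-...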
    from bee.teacher_providers import resolve_primary

    primary = resolve_primary()
    if primary is None:
        sys.exit(
            "no teacher provider resolved — set BEE_TEACHER_PROVIDER + the "
            "matching API key, or BEE_DEEPSEEK_API_KEY for the default."
        )
    return primary.provider, primary.api_url, primary.model, primary.api_key


JUDGE_SYSTEM = """You are an impartial expert grader for an LLM evaluation.

Given a question, a rubric of expected concepts, the citation source, \
and a model's answer, classify the answer as one of:

correct — the answer satisfies the rubric (all required concepts present, \
factually right per the citation, possibly with extra correct context). \
Minor wording differences are fine.

partial — the answer captures SOME but not all required concepts, \
or has a partly-correct framing with one factual slip. Half-credit.

wrong — the answer misses the required concepts, or contradicts \
the citation, or hallucinates a wrong fact, or is off-topic.

refused — the model declined to answer (e.g. "I can't help with that") \
even though the question is legitimate. Treat as wrong unless the \
refusal is genuinely warranted by the citation source.

Output STRICT JSON, exactly this shape, nothing else:

{"label": "correct" | "partial" | "wrong" | "refused", "reasoning": "..."}

Reasoning must be one or two sentences explaining the verdict — what \
specific rubric concept was matched or missed. Do not include any \
other text outside the JSON object."""


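# For illustration, a well-formed judge reply (values invented, not from a real
# run) looks like:
#   {"label": "partial", "reasoning": "Names the key concept but omits the
#    second rubric requirement."}
# judge_one() parses exactly that shape into the Judgment record below.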
@dataclass
class Judgment:
    """One graded (question, answer) pair, with the judge's audit trail."""

    label: str
    reasoning: str
    model_answer: str
    question_id: str
    domain: str


def _load_env() -> dict[str, str]:
    """Parse simple KEY=VALUE lines from REPO_ROOT/.env, stripping quotes and comments."""
    env_path = REPO_ROOT / ".env"
    if not env_path.exists():
        return {}
    out: dict[str, str] = {}
    for line in env_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")
        out[key.strip()] = val.strip().strip('"').strip("'")
    return out


def _http_post_json(url: str, headers: dict[str, str], body: dict, timeout: int = 120) -> dict:
    """POST and parse JSON, retrying 429/5xx responses and honoring Retry-After.

    Mirrors the pattern in scripts/distill_domain_seeds.py: auth errors are
    fatal, rate-limit/overload errors are retried as transient.
    """
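    # Backoff schedule when no Retry-After header is present: 5s, 10s, 20s, 40s
    # between the five attempts; after the fifth failure the last error is re-raised.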
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    last_err: Exception | None = None
    for attempt in range(5):
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code not in TRANSIENT_HTTP:
                raise
            last_err = e
            if attempt == 4:
                break
            ra = e.headers.get("Retry-After") if e.headers else None
            try:
                backoff = int(ra) if ra else 5 * (2 ** attempt)
            except ValueError:
                backoff = 5 * (2 ** attempt)
            print(f" judge: http {e.code}; retry {attempt + 1}/4 in {backoff}s", file=sys.stderr)
            time.sleep(backoff)
        except (ConnectionResetError, urllib.error.URLError, TimeoutError, OSError) as e:
            last_err = e
            if attempt == 4:
                break
            backoff = 5 * (2 ** attempt)
            print(f" judge: {type(e).__name__}; retry {attempt + 1}/4 in {backoff}s", file=sys.stderr)
            time.sleep(backoff)
    if last_err is not None:
        raise last_err
    raise RuntimeError("unreachable")


def judge_one(
    *,
    question_id: str,
    domain: str,
    prompt: str,
    rubric: str,
    citation: str,
    model_answer: str,
    api_key: str,
    provider: str = "deepseek",
    base_url: str = "https://api.deepseek.com/v1",
    model: str = "deepseek-v4-pro",
) -> Judgment:
    """Grade a single (question, answer) pair. Returns a Judgment.

    The provider/url/model trio comes from resolve_judge() at startup
    and stays fixed for the whole batch.
    """
    user_msg = (
        f"Question ({domain}, id={question_id}):\n{prompt}\n\n"
        f"Rubric (what a correct answer must contain):\n{rubric}\n\n"
        f"Citation source for fact-checking:\n{citation}\n\n"
        f"Model's answer:\n{model_answer}\n\n"
        f"Grade it. Reply with only the JSON object."
    )

    if provider == "anthropic":
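        # Anthropic-style Messages API: auth via the x-api-key header, the system
        # prompt as a top-level field, and text returned as a list of content blocks.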
        url = base_url.rstrip("/") + "/messages"
        headers = {
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }
        body = {
            "model": model,
            "max_tokens": 1024,
            "temperature": 0.0,
            "system": JUDGE_SYSTEM,
            "messages": [{"role": "user", "content": user_msg}],
        }
        resp = _http_post_json(url, headers, body, timeout=120)
        raw = ""
        for block in resp.get("content", []):
            if block.get("type") == "text":
                raw += block.get("text", "")
    else:
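        # OpenAI-compatible chat/completions shape (DeepSeek and similar providers):
        # Bearer auth, system + user messages, and a JSON-object response format.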
        url = base_url.rstrip("/") + "/chat/completions"
        body = {
            "model": model,
            "messages": [
                {"role": "system", "content": JUDGE_SYSTEM},
                {"role": "user", "content": user_msg},
            ],
            "max_tokens": 1024,
            "temperature": 0.0,
            "response_format": {"type": "json_object"},
        }
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        resp = _http_post_json(url, headers, body, timeout=120)
        raw = resp.get("choices", [{}])[0].get("message", {}).get("content", "")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
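        # Fail closed: an unparseable judge reply is scored as wrong, and the raw
        # snippet is kept in the reasoning field so it can be audited later.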
        return Judgment(
            label="wrong",
            reasoning=f"judge produced unparseable JSON: {raw[:200]!r}",
            model_answer=model_answer,
            question_id=question_id,
            domain=domain,
        )
    label = str(parsed.get("label", "wrong")).lower().strip()
    if label not in ("correct", "partial", "wrong", "refused"):
        label = "wrong"
    reasoning = str(parsed.get("reasoning", ""))[:500]
    return Judgment(
        label=label,
        reasoning=reasoning,
        model_answer=model_answer,
        question_id=question_id,
        domain=domain,
    )


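# Numeric weights used by aggregate_judgments below: full credit, half credit,
# zero. Refusals score zero, the same as wrong answers, per the rubric above.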
SCORE_MAP = {"correct": 1.0, "partial": 0.5, "wrong": 0.0, "refused": 0.0}


def aggregate_judgments(judgments: list[Judgment]) -> dict[str, Any]:
    """Aggregate per-domain and overall scores from a flat list of judgments."""
    by_domain: dict[str, list[Judgment]] = {}
    for j in judgments:
        by_domain.setdefault(j.domain, []).append(j)

    domain_scores: dict[str, dict[str, Any]] = {}
    for dom, js in by_domain.items():
        labels = [j.label for j in js]
        score = sum(SCORE_MAP[label] for label in labels) / max(len(labels), 1)
        domain_scores[dom] = {
            "score": round(score, 3),
            "n": len(js),
            "labels": {
                "correct": labels.count("correct"),
                "partial": labels.count("partial"),
                "wrong": labels.count("wrong"),
                "refused": labels.count("refused"),
            },
        }

    overall_score = (
        sum(d["score"] * d["n"] for d in domain_scores.values())
        / max(sum(d["n"] for d in domain_scores.values()), 1)
    )

    return {
        "overall_score": round(overall_score, 3),
        "n_total": sum(d["n"] for d in domain_scores.values()),
        "by_domain": domain_scores,
    }
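# Illustrative use of aggregate_judgments (nothing in this module calls it this
# way): two judgments in one domain, one "correct" and one "partial", give a
# 0.75 domain score:
#   aggregate_judgments([
#       Judgment("correct", "ok", "answer a", "q1", "general"),
#       Judgment("partial", "missed one concept", "answer b", "q2", "general"),
#   ])["by_domain"]["general"]["score"]  # -> 0.75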


def main() -> None:
    p = argparse.ArgumentParser(description="Smoke-test the judge with one hand-crafted pair.")
    p.add_argument("--question-id", default="general-08",
                   help="ID from per_domain_eval_set.json")
    p.add_argument("--answer", default="Approximately 3 × 10^8 m/s.",
                   help="Model's answer to grade")
    args = p.parse_args()

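    # Hydrate os.environ from .env (if present) before resolving the judge, so a
    # key such as BEE_DEEPSEEK_API_KEY only has to live in .env to run this.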
    for k, v in _load_env().items():
        os.environ.setdefault(k, v)
    provider, base_url, model, api_key = resolve_judge()
    print(f" judge: {provider}:{model} via {base_url}")

    eval_set = json.loads(
        (REPO_ROOT / "scripts/eval/per_domain_eval_set.json").read_text(encoding="utf-8")
    )

    question = None
    domain = None
    for dom, blob in eval_set["domains"].items():
        for q in blob["questions"]:
            if q["id"] == args.question_id:
                question, domain = q, dom
                break
        if question:
            break
    if not question:
        sys.exit(f"question id {args.question_id} not found in eval set")

    print(f"Judging {args.question_id} ({domain})")
    print(f"Q: {question['prompt'][:100]}...")
    print(f"A: {args.answer[:100]}...")
    print()

    j = judge_one(
        question_id=args.question_id,
        domain=domain,
        prompt=question["prompt"],
        rubric=question["rubric"],
        citation=question["citation"],
        model_answer=args.answer,
        api_key=api_key,
        provider=provider,
        base_url=base_url,
        model=model,
    )
    print(f"Label: {j.label}")
    print(f"Reasoning: {j.reasoning}")


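# Example smoke-test invocation (the path below is a placeholder for wherever
# this script lives; the defaults mirror the argparse setup above):
#   python path/to/this_script.py --question-id general-08 \
#       --answer "Approximately 3 × 10^8 m/s."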
if __name__ == "__main__":
    main()
|
|