bee/scripts/eval/run_matrix.py
"""Run the full benchmark matrix for one (base_model, adapter) cell.
Inputs:
--base HuggingFace model id (e.g. HuggingFaceTB/SmolLM2-360M-Instruct)
--adapter optional HF repo + branch (e.g. cuilabs/bee-cell:cybersecurity-2026-04-28-1221)
If omitted, runs on the base model alone.
--output-dir where to write the per-cell JSON (default: data/eval_reports/matrix/)
--limit cap questions per domain (smoke testing; default: all 12)
Outputs:
data/eval_reports/matrix/<base_short>__<adapter_short>.json
{
  "model": {...},
  "device": "...",
  "started_at": "...",
  "completed_at": "...",
  "total_time_s": ...,
  "throughput": {"aggregate": {"tok_per_s": ...}, ...},
  "per_domain_eval": {
    "judge_provider": "...",
    "judge_model": "...",
    "overall_score": 0.xx,
    "n_total": ...,
    "by_domain": {...},
    "answers": [...]
  }
}
Why local-first instead of lighteval (for now): the per-domain eval is
the unique-value part of the Bee benchmark, lighteval doesn't have it,
and getting the local runner working end-to-end is the fastest path to
the matrix. The standard SmolLM-card-aligned suite (MMLU, HumanEval,
etc.) is queued as a follow-up; it runs separately via lighteval, and
its results merge into the same matrix JSON.
"""
from __future__ import annotations
import argparse
import datetime
import json
import os
import sys
import time
from dataclasses import asdict
from pathlib import Path
from typing import Optional
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(REPO_ROOT))
from scripts.eval.judge import ( # noqa: E402
Judgment,
aggregate_judgments,
judge_one,
)
def _load_env_keys() -> dict[str, str]:
env_path = REPO_ROOT / ".env"
if not env_path.exists():
return {}
out: dict[str, str] = {}
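    # Minimal .env parsing: keep KEY=VALUE lines, skip blanks and "#" comments,
    # strip surrounding single/double quotes from the value.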
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
out[k.strip()] = v.strip().strip('"').strip("'")
return out
def _generate(model, tokenizer, prompt: str, max_new_tokens: int, device: str) -> str:
"""Generate one response. Uses chat template if available."""
import torch # noqa: E402
if hasattr(tokenizer, "apply_chat_template") and tokenizer.chat_template:
chat = [{"role": "user", "content": prompt}]
text = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
inputs = tokenizer(text, return_tensors="pt").to(device)
else:
inputs = tokenizer(prompt, return_tensors="pt").to(device)
with torch.no_grad():
out = model.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=False, # greedy for determinism
pad_token_id=tokenizer.pad_token_id,
eos_token_id=tokenizer.eos_token_id,
)
gen = out[0][inputs["input_ids"].shape[1]:]
return tokenizer.decode(gen, skip_special_tokens=True).strip()
def _measure_throughput(model, tokenizer, device: str) -> dict:
"""5 prompts × 100 new tokens each, return aggregate tok/s.
Mirrors data/eval_reports/2026-04-29_throughput_mps.json so all
matrix cells have a comparable throughput number.
"""
import torch # noqa: E402
prompts = [
"Explain machine learning in one paragraph.",
"Describe how a quantum computer works.",
"What is a smart contract?",
"How does gradient descent optimize a model?",
"Summarize the basics of public-key cryptography.",
]
# Warmup
chat = [{"role": "user", "content": prompts[0]}]
text = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
ins = tokenizer(text, return_tensors="pt").to(device)
with torch.no_grad():
model.generate(**ins, max_new_tokens=8, do_sample=False, pad_token_id=tokenizer.pad_token_id)
total_new = 0
total_t = 0.0
per_prompt = []
for p in prompts:
chat = [{"role": "user", "content": p}]
text = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
ins = tokenizer(text, return_tensors="pt").to(device)
t0 = time.perf_counter()
with torch.no_grad():
o = model.generate(
**ins, max_new_tokens=100, do_sample=False,
pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id,
)
dt = time.perf_counter() - t0
n = o.shape[1] - ins["input_ids"].shape[1]
total_new += n
total_t += dt
per_prompt.append({"new_tokens": n, "seconds": round(dt, 3), "tok_per_s": round(n / dt, 1)})
return {
"max_new_tokens_per_prompt": 100,
"decoding": "greedy",
"per_prompt": per_prompt,
"aggregate": {
"total_new_tokens": total_new,
"total_seconds": round(total_t, 3),
"tok_per_s": round(total_new / max(total_t, 1e-6), 1),
},
}
def _load_model(base: str, adapter: Optional[str], device: str):
"""Load base model + optional LoRA adapter from cuilabs/bee-cell:branch.
`adapter` format: "cuilabs/bee-cell:cybersecurity-2026-04-28-1221"
(repo_id:branch). If None, returns base model alone.
"""
import torch # noqa: E402
from transformers import AutoModelForCausalLM, AutoTokenizer # noqa: E402
tokenizer = AutoTokenizer.from_pretrained(base, trust_remote_code=True)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
dtype = torch.float16 if device == "mps" else None
model = AutoModelForCausalLM.from_pretrained(
base, trust_remote_code=True, torch_dtype=dtype,
).to(device)
adapter_info = None
if adapter:
from peft import PeftModel # noqa: E402
if ":" in adapter:
adapter_repo, adapter_branch = adapter.split(":", 1)
else:
adapter_repo, adapter_branch = adapter, None
token = os.environ.get("HF_TOKEN") or _load_env_keys().get("HF_TOKEN")
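        # revision=None (no ":branch" suffix) falls back to the adapter repo's
        # default branch on the Hub.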
model = PeftModel.from_pretrained(
model, adapter_repo,
revision=adapter_branch,
token=token,
)
adapter_info = {"repo": adapter_repo, "branch": adapter_branch}
model.eval()
n_params = sum(p.numel() for p in model.parameters()) / 1e6
return model, tokenizer, {
"base": base,
"adapter": adapter_info,
"params_m": round(n_params, 1),
}
def run_per_domain_eval(
model, tokenizer, device: str,
eval_set: dict, judge_key: str,
limit_per_domain: Optional[int] = None,
judge_provider: str = "deepseek",
judge_base_url: str = "https://api.deepseek.com/v1",
judge_model: str = "deepseek-v4-pro",
) -> dict:
"""Run every question in eval_set, judge each answer, return aggregate.
The judge_* trio is pinned for the entire batch so every judgment is
apples-to-apples (no mid-batch grader switch). Caller passes the
resolver-resolved primary in.
"""
judgments: list[Judgment] = []
raw_answers: list[dict] = []
for domain, blob in eval_set["domains"].items():
questions = blob["questions"]
if limit_per_domain is not None:
questions = questions[:limit_per_domain]
for q in questions:
prompt = q["prompt"]
t0 = time.perf_counter()
answer = _generate(model, tokenizer, prompt, max_new_tokens=512, device=device)
gen_s = time.perf_counter() - t0
j = judge_one(
question_id=q["id"],
domain=domain,
prompt=prompt,
rubric=q["rubric"],
citation=q["citation"],
model_answer=answer,
api_key=judge_key,
provider=judge_provider,
base_url=judge_base_url,
model=judge_model,
)
judgments.append(j)
raw_answers.append({
"id": q["id"],
"domain": domain,
"difficulty": q.get("difficulty"),
"prompt": prompt,
"answer": answer,
"judge_label": j.label,
"judge_reasoning": j.reasoning,
"gen_s": round(gen_s, 2),
})
print(
f" [{q['id']:<22}] {j.label:<8} ({gen_s:.1f}s gen) {q['prompt'][:60]}",
flush=True,
)
agg = aggregate_judgments(judgments)
return {
"overall_score": agg["overall_score"],
"n_total": agg["n_total"],
"by_domain": agg["by_domain"],
"answers": raw_answers,
}
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("--base", required=True,
help="HF base model id, e.g. HuggingFaceTB/SmolLM2-360M-Instruct")
p.add_argument("--adapter", default=None,
help="optional adapter as repo_id:branch, e.g. cuilabs/bee-cell:cybersecurity-2026-04-28-1221")
p.add_argument("--device", default=None,
help="device override; default = mps if available, else cpu")
p.add_argument("--output-dir", default=None,
help="default: data/eval_reports/matrix/")
p.add_argument("--limit", type=int, default=None,
help="cap questions per domain (smoke testing)")
args = p.parse_args()
import torch # noqa: E402
device = args.device or ("mps" if torch.backends.mps.is_available() else "cpu")
output_dir = Path(args.output_dir or REPO_ROOT / "data/eval_reports/matrix")
output_dir.mkdir(parents=True, exist_ok=True)
env = _load_env_keys()
# Hydrate so resolve_judge() picks up keys from .env in fresh shells.
for k, v in env.items():
os.environ.setdefault(k, v)
    from scripts.eval.judge import resolve_judge  # same module as the top-level judge imports
judge_provider, judge_base_url, judge_model, judge_key = resolve_judge()
print(f" judge: {judge_provider}:{judge_model} via {judge_base_url}")
hf_token = env.get("HF_TOKEN") or os.environ.get("HF_TOKEN", "")
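    # Export under both names: older huggingface_hub versions read
    # HUGGINGFACE_HUB_TOKEN, newer ones read HF_TOKEN.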
if hf_token:
os.environ["HF_TOKEN"] = hf_token
os.environ["HUGGINGFACE_HUB_TOKEN"] = hf_token
eval_set = json.loads(
(REPO_ROOT / "scripts/eval/per_domain_eval_set.json").read_text(encoding="utf-8")
)
started = datetime.datetime.now(datetime.timezone.utc).isoformat()
t_start = time.perf_counter()
print(f"=== loading {args.base}" + (f" + {args.adapter}" if args.adapter else "") + f" on {device}")
model, tokenizer, model_info = _load_model(args.base, args.adapter, device)
print(f" {model_info['params_m']:.1f}M params")
print(f"\n=== throughput ({device})")
throughput = _measure_throughput(model, tokenizer, device)
print(f" {throughput['aggregate']['tok_per_s']:.1f} tok/s aggregate")
print(f"\n=== per-domain eval ({sum(len(b['questions']) for b in eval_set['domains'].values())} questions)")
pd = run_per_domain_eval(
model, tokenizer, device, eval_set, judge_key,
limit_per_domain=args.limit,
judge_provider=judge_provider,
judge_base_url=judge_base_url,
judge_model=judge_model,
)
completed = datetime.datetime.now(datetime.timezone.utc).isoformat()
total = round(time.perf_counter() - t_start, 1)
# Filename: <base-short>__<adapter-short>.json
base_short = args.base.split("/")[-1]
if args.adapter:
adapter_short = args.adapter.replace(":", "__").split("/")[-1]
out_name = f"{base_short}__{adapter_short}.json"
else:
out_name = f"{base_short}__base.json"
out_path = output_dir / out_name
report = {
"model": model_info,
"device": device,
"started_at": started,
"completed_at": completed,
"total_time_s": total,
"throughput": throughput,
"per_domain_eval": {
"judge_provider": judge_provider,
"judge_model": judge_model,
"overall_score": pd["overall_score"],
"n_total": pd["n_total"],
"by_domain": pd["by_domain"],
"answers": pd["answers"],
},
}
out_path.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"\n=== DONE in {total}s")
print(f" per-domain overall: {pd['overall_score']:.3f} ({pd['n_total']} questions)")
print(f" by domain:")
for dom, d in sorted(pd["by_domain"].items()):
print(f" {dom:<18} {d['score']:.3f} ({d['labels']['correct']}/{d['labels']['partial']}/{d['labels']['wrong']}/{d['labels']['refused']})")
print(f" throughput: {throughput['aggregate']['tok_per_s']:.1f} tok/s")
print(f" saved: {out_path}")
if __name__ == "__main__":
main()