modelchorus-evals / evals /run_eval.py
brycemeetkai's picture
Mirror evals/ from c1978f83e59e
caf09eb verified
#!/usr/bin/env python3
"""Run lm-eval across multiple models and tasks defined in eval_config.toml.
Uses the lm_eval Python API directly (no subprocess).
Usage:
python run_eval.py # all models × all tasks
python run_eval.py --models Qwen3.5-27b # one model, all tasks
python run_eval.py --tasks wikipedia # all models, one task
python run_eval.py --models Qwen3.5-27b --tasks wikipedia
python run_eval.py --dry-run # print params only
Generation parameters (max_gen_toks, temperature, top_p, top_k, min_p, …) are NOT
baked into the task YAMLs so they can vary per model. Extra keys are merged into
the chat-completions JSON body (OpenRouter-style); you do not need a separate
``extra_body`` wrapper. Set them:
1. Per-model in eval_config.toml → gen_kwargs = "max_gen_toks=4096,temperature=0.6"
2. Via CLI (overrides everything) → --gen-kwargs "max_gen_toks=8192,top_k=20,min_p=0.0"
3. Parallel API calls → [defaults] num_concurrent or --num-concurrent 50
Reasoning: monkey-patches LocalChatCompletion.parse_generations so API fields
``reasoning`` / ``reasoning_content`` are preserved (wire format matches
``f1_utils`` / ``judge_utils`` ``think`` tags). After each run, samples are split:
- resps / filtered_resps: final model text only
- reasoning_content: parallel nested shape (or "")
Optional [hf_hub] in eval_config.toml:
- **lm_eval_hub_upload** (default true): lm-eval ``EvaluationTracker`` (results +
samples to details/results repos).
- **custom_samples_repo**: single dataset repo; uploads **samples only** as
``{slug(name__model_id)}/{lang}/{task}_{YYYY-MM-DD}.jsonl`` (set
**lm_eval_hub_upload** false to avoid duplicate Hub uploads). Folder
is the slug of the composite ``name__model_id`` so two providers that
expose the same ``model_id`` get distinct folders. ``name`` here is
the raw ``models.name`` from Convex — alias is intentionally never
used in the routing identifier.
Without Hub, per-task ``samples_<task>_<timestamp>.jsonl`` is still written next
to ``samples.json`` when log_samples is true. JSONL rows omit ``*_hash`` fields,
add a plain ``prompt`` string, and keep ``target`` at top level (no nested
``gen_args_*`` mirror of lm-eval's Hub format).
"""
from __future__ import annotations
import argparse
import copy
import io
import json
import logging
import os
import re
import sys
import traceback
from datetime import datetime
import lm_eval
import f1_utils # noqa: F401 — registers regex_last for task YAMLs
import _extra_body # shared mutable dict; see module docstring for the __main__ hazard
import responses_model # noqa: F401 — registers `local-responses-completions`
from register_sas_encoder_metric import ensure_sas_encoder_metric
from lm_eval.utils import handle_non_serializable, make_table, sanitize_list
# Runs once at import time, before any evaluation is started. Presumably this
# registers the SAS encoder metric with lm-eval — confirm in
# register_sas_encoder_metric.
ensure_sas_encoder_metric()
# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)
# Must match multilingual task helpers (f1_utils, judge_utils, spanish/qa/utils).
# parse_generations wraps API reasoning text in these tags in front of the
# final answer; _split_reasoning_from_text removes the wrapper again.
_THINK_OPEN = "<think>"
_THINK_CLOSE = "</think>"
def _patch_chat_completion():
    """Monkey-patch LocalChatCompletion for reasoning pass-through and extra_body.

    Two attributes of ``lm_eval.models.openai_completions.LocalChatCompletion``
    are replaced:

    * ``_create_payload`` — calls the original implementation, then merges
      ``_extra_body.value`` (when non-empty) into the returned payload.
    * ``parse_generations`` — returns one string per choice. When the message
      carries ``reasoning`` or ``reasoning_content``, that text is prepended
      to the content wrapped in ``_THINK_OPEN`` / ``_THINK_CLOSE``.

    The patch is applied at most once per process: this file can be loaded
    both as ``__main__`` and via ``import run_eval``, and a second call would
    otherwise wrap the already-wrapped ``_create_payload``.
    """
    from lm_eval.models import openai_completions as oc

    if getattr(oc.LocalChatCompletion._create_payload, "_run_eval_patched", False):
        return

    _orig_create_payload = oc.LocalChatCompletion._create_payload

    def _create_payload_with_extra(self, *args, **kwargs):
        payload = _orig_create_payload(self, *args, **kwargs)
        # Read via the module attribute so both the __main__ run_eval and the
        # `import run_eval` instance see the same dict (Python loads them as
        # distinct module objects with their own globals).
        if _extra_body.value:
            payload.update(_extra_body.value)
        return payload

    # Marker checked by the idempotency guard above.
    _create_payload_with_extra._run_eval_patched = True

    @staticmethod
    def parse_generations(outputs, **kwargs):
        if not isinstance(outputs, list):
            outputs = [outputs]
        res = []
        for out in outputs:
            try:
                tmp = [None] * len(out["choices"])
                for choice in out["choices"]:
                    msg = choice.get("message") or {}
                    content = msg.get("content")
                    if content is None:
                        content = ""
                    reasoning = msg.get("reasoning") or msg.get("reasoning_content") or ""
                    if reasoning:
                        content = f"{_THINK_OPEN}{reasoning}{_THINK_CLOSE}{content}"
                    tmp[choice["index"]] = content
            except Exception as exc:
                # Keep the best-effort fallback (one empty generation) but make
                # the malformed response visible instead of dropping it silently.
                logger.warning("Could not parse generations: %s", exc)
                tmp = [""]
            res.extend(tmp)
        return res

    oc.LocalChatCompletion._create_payload = _create_payload_with_extra
    oc.LocalChatCompletion.parse_generations = parse_generations


_patch_chat_completion()
try:
import tomllib # Python 3.11+ stdlib
except ModuleNotFoundError:
try:
import tomli as tomllib # pip install tomli (for Python < 3.11)
except ModuleNotFoundError:
raise SystemExit(
"No TOML parser available. Either:\n"
" • Use Python 3.11+ (has tomllib), e.g. python3.11 run_eval.py …\n"
" • Or install tomli in this environment: pip install tomli\n"
f" (current interpreter: {sys.executable})"
) from None
# Directory containing this script; also used as the lm-eval include path.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Config file used when --config is not given.
DEFAULT_CONFIG = os.path.join(SCRIPT_DIR, "eval_config.toml")
# Load .env.local from project root (one level up from evals/)
_ENV_LOCAL = os.path.join(SCRIPT_DIR, "..", ".env.local")
if os.path.isfile(_ENV_LOCAL):
    # Decode explicitly instead of relying on the platform locale (cp1252 on
    # Windows); "utf-8-sig" also drops a leading BOM so the first key is not
    # polluted with "\ufeff".
    with open(_ENV_LOCAL, encoding="utf-8-sig") as _f:
        for _line in _f:
            _line = _line.strip()
            # Skip blanks, comments, and lines without a KEY=VALUE shape.
            if _line and not _line.startswith("#") and "=" in _line:
                _k, _, _v = _line.partition("=")
                _k, _v = _k.strip(), _v.strip()
                # Variables already present in the environment win over the file.
                if _k and _v and _k not in os.environ:
                    os.environ[_k] = _v
def load_config(path: str) -> dict:
with open(path, "rb") as f:
return tomllib.load(f)
def _ensure_lm_eval_api_key():
"""local-chat-completions uses OPENAI_API_KEY for the Bearer header only.
No-op. The OpenRouter→OPENAI_API_KEY fallback is now done exactly once
in main() before the model loop, so per-model key swaps (including
intentional clears when a model's api_key_env is unset) are respected
for subsequent models.
"""
return
def _split_reasoning_from_text(text: str) -> tuple[str, str]:
    """Split a think-wrapped string into ``(reasoning, content)``.

    Anything that is not a string, does not start with the opening tag, or
    has no closing tag after it is returned unchanged as ``("", text)``.
    """
    if not isinstance(text, str):
        return "", text
    if not text.startswith(_THINK_OPEN):
        return "", text
    # Look for the first closing tag that follows the opening tag.
    reasoning, closing, content = text[len(_THINK_OPEN):].partition(_THINK_CLOSE)
    if not closing:
        return "", text
    return reasoning, content
def _split_resps_structure(resps):
    """Return ``(content, reasoning)`` mirroring the nesting of *resps*.

    Lists are split element by element (recursively), strings are split with
    ``_split_reasoning_from_text``, and any other value is passed through as
    content with ``""`` as its reasoning.
    """
    if isinstance(resps, list):
        pairs = [_split_resps_structure(item) for item in resps]
        content_side = [content for content, _ in pairs]
        reasoning_side = [reasoning for _, reasoning in pairs]
        return content_side, reasoning_side
    if isinstance(resps, str):
        reasoning, content = _split_reasoning_from_text(resps)
        return content, reasoning
    return resps, ""
def _add_reasoning_content_to_samples(samples: dict) -> None:
    """Strip think wrappers from every sample in place.

    For each row that has ``resps``: ``resps`` becomes the content-only
    structure, ``reasoning_content`` receives the parallel reasoning
    structure, and ``filtered_resps`` (when present) is reduced to content
    only. Rows without ``resps`` are left untouched.
    """
    for rows in samples.values():
        for row in rows:
            if "resps" not in row:
                continue
            row["resps"], row["reasoning_content"] = _split_resps_structure(row["resps"])
            if "filtered_resps" in row:
                row["filtered_resps"] = _split_resps_structure(row["filtered_resps"])[0]
def _is_lm_eval_generation_kwargs(d: dict) -> bool:
"""Distinguish lm_eval gen config from chat messages / multimodal blobs."""
return any(
k in d
for k in (
"until",
"max_gen_toks",
"do_sample",
"temperature",
"top_p",
"top_k",
"min_p",
"repeats",
)
)
def _text_from_prompt_ctx(part) -> list[str]:
"""Flatten ctx from Instance.args: str, nested list/tuple, or chat message dicts."""
out: list[str] = []
if part is None:
return out
if isinstance(part, str):
s = part.strip()
if s.startswith("[") and '"role"' in s and '"content"' in s:
try:
parsed = json.loads(s)
except json.JSONDecodeError:
return [part]
if isinstance(parsed, list):
return _text_from_prompt_ctx(parsed)
return [part]
if isinstance(part, (list, tuple)):
for x in part:
out.extend(_text_from_prompt_ctx(x))
return out
if isinstance(part, dict):
if "content" in part:
out.extend(_text_from_prompt_ctx(part["content"]))
elif isinstance(part.get("text"), str):
out.append(part["text"])
return out
return out
def _extract_prompt_and_gen_kwargs(arguments) -> tuple[str, dict | None]:
    """Derive a readable prompt and the generation kwargs from logged arguments.

    Each list/tuple entry of *arguments* is scanned item by item. The first
    dict recognised as generation kwargs is returned as-is (later ones are
    ignored); every other item is flattened to text. Non-empty text pieces
    are joined with newlines.
    """
    if not arguments:
        return "", None
    pieces: list[str] = []
    found_kwargs: dict | None = None
    for request in arguments:
        if not isinstance(request, (list, tuple)):
            continue
        for element in request:
            looks_like_kwargs = isinstance(element, dict) and _is_lm_eval_generation_kwargs(element)
            if not looks_like_kwargs:
                pieces.extend(_text_from_prompt_ctx(element))
            elif found_kwargs is None:
                found_kwargs = element
    return "\n".join(filter(None, pieces)), found_kwargs
def _sample_row_for_jsonl(sample: dict) -> dict:
    """Readable JSONL row for local files and custom Hub upload (not lm-eval tracker).

    Works on a deep copy, so *sample* is never mutated. The copy:

    * loses the ``doc_hash`` / ``prompt_hash`` / ``target_hash`` fields,
    * gets a plain ``prompt`` string derived from ``arguments`` (which is
      then removed) and, when found, the ``gen_kwargs`` dict,
    * has ``target`` coerced to ``str`` (``""`` when absent),
    * has ``resps``, ``filtered_resps`` and ``reasoning_content`` passed
      through ``sanitize_list`` when present.

    Response fields are optional here, matching
    ``_add_reasoning_content_to_samples`` which also tolerates rows without
    ``resps``; a missing field no longer raises ``KeyError``.
    """
    out = copy.deepcopy(sample)
    for key in ("doc_hash", "prompt_hash", "target_hash"):
        out.pop(key, None)
    prompt, gen_kwargs = _extract_prompt_and_gen_kwargs(out.pop("arguments", None))
    out["prompt"] = prompt
    out["target"] = str(out.get("target", ""))
    if gen_kwargs is not None:
        out["gen_kwargs"] = gen_kwargs
    for key in ("resps", "filtered_resps", "reasoning_content"):
        if key in out:
            out[key] = sanitize_list(out[key])
    return out
def _rows_to_jsonl_bytes(rows: list) -> bytes:
    """Serialize *rows* as UTF-8 JSONL, one ``_sample_row_for_jsonl`` object per line."""
    encoded_rows = (
        json.dumps(
            _sample_row_for_jsonl(sample),
            default=handle_non_serializable,
            ensure_ascii=False,
        )
        for sample in rows
    )
    return "".join(f"{line}\n" for line in encoded_rows).encode("utf-8")
def _hf_path_segment(name: str) -> str:
"""Safe path segment for Hub (no slashes or odd chars).
Excludes `.` because HF Jobs' tag validator rejects dots (model_ids
like "minimax/minimax-m2.5" silently failed to spawn at the
POST /api/jobs validation step with "tags must contain only
alphanumeric characters, '-', '_', or '='"). The label validator
accepts dots; tags do not. The slug is reused as both, so we conform
to the strictest downstream rule.
"""
s = (name or "").strip()
s = re.sub(r"[^a-zA-Z0-9_-]+", "_", s)
return s.strip("_-") or "unknown"
def _model_slug(model: dict) -> str:
    """Build the canonical eval-pipeline identifier for *model*.

    The result is slug(`name__model_id__provider_name`) when a provider is
    known and slug(`name__model_id`) otherwise (TOML-only local runs).

    The same string serves as:
      * the `--models` filter argument (`run_eval.py --models <slug>`),
      * the HF Job `model` label (slug-safe by construction),
      * the HF dataset upload folder.

    Why the provider is part of it: two Convex `models` rows may share BOTH
    `name` and `model_id` yet route to different providers (e.g. one
    OpenRouter row and one direct row, both labeled "GPT-5 Nano" with id
    `openai/gpt-5-nano`). They are distinct entities that must be
    benchmarked separately; without the provider component the dispatcher's
    duplicate-slug guard refuses to spawn for this legitimate case.

    `name` is the RAW `models.name` from Convex (alias-stripped by
    `models:listForEvals`); an alias must never take part in routing.
    `provider_name` comes from `providers.name` on the Convex side and is
    absent for TOML-only local runs (no Convex roster), in which case the
    slug has two parts and folders keep the pre-refactor shape.
    """
    name = model.get("name", "")
    model_id = model.get("model_id", "")
    provider = (model.get("provider_name") or "").strip()
    pieces = (name, model_id, provider) if provider else (name, model_id)
    return _hf_path_segment("__".join(pieces))
def _infer_lang_folder(category_task: str, override: str) -> str:
    """Pick the language folder, e.g. swahili_sib200 -> swahili.

    A non-blank *override* always wins. Otherwise the text before the first
    underscore is used, or the whole task name when it has no underscore.
    The chosen value is slugged with ``_hf_path_segment``.
    """
    forced = (override or "").strip()
    if forced:
        return _hf_path_segment(forced)
    prefix, underscore, _ = category_task.partition("_")
    return _hf_path_segment(prefix if underscore else category_task)
def _write_local_samples_jsonl(output_path: str, task_name: str, rows: list) -> None:
    """Write *rows* as JSONL (readable prompt/target, no hash fields).

    The file is ``samples_<task_name>_<timestamp>.jsonl`` inside
    *output_path*, which is created when missing. The timestamp is the
    current local ISO time with ``:`` replaced by ``-``.
    """
    os.makedirs(output_path, exist_ok=True)
    stamp = datetime.now().isoformat().replace(":", "-")
    target = os.path.join(output_path, f"samples_{task_name}_{stamp}.jsonl")
    with open(target, "wb") as sink:
        sink.write(_rows_to_jsonl_bytes(rows))
def _upload_custom_samples_repo(
    *,
    repo_id: str,
    token: str,
    model_folder: str,
    lang_folder: str,
    category: str,
    date_yyyy_mm_dd: str,
    rows: list,
    private: bool,
) -> None:
    """Upload one JSONL file to a dataset repo at model/lang/category_date.jsonl.

    The three path components are slugged with ``_hf_path_segment``. The
    dataset repo is created when it does not exist yet.

    Raises:
        RuntimeError: if ``huggingface_hub`` is not installed.
    """
    try:
        from huggingface_hub import HfApi
    except ModuleNotFoundError as missing:
        raise RuntimeError(
            "huggingface_hub is required for custom_samples_repo. "
            "Install with: pip install huggingface_hub"
        ) from missing

    folders = [_hf_path_segment(part) for part in (model_folder, lang_folder)]
    filename = f"{_hf_path_segment(category)}_{date_yyyy_mm_dd}.jsonl"
    path_in_repo = "/".join([*folders, filename])

    hub = HfApi(token=token)
    hub.create_repo(repo_id, repo_type="dataset", private=private, exist_ok=True)
    payload = io.BytesIO(_rows_to_jsonl_bytes(rows))
    hub.upload_file(
        path_or_fileobj=payload,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        commit_message=f"samples {filename}",
    )
    logger.info(
        "Uploaded samples to Hugging Face dataset %s (path: %s)",
        repo_id,
        path_in_repo,
    )
def _build_evaluation_tracker(output_path: str, hf_hub: dict | None):
"""Return EvaluationTracker if Hub push is enabled, else None."""
if not hf_hub:
return None
if hf_hub.get("lm_eval_hub_upload", True) is False:
return None
push_s = bool(hf_hub.get("push_samples_to_hub", False))
push_r = bool(hf_hub.get("push_results_to_hub", False))
if not push_s and not push_r:
return None
token = (hf_hub.get("token") or os.environ.get("HF_TOKEN") or "").strip()
from lm_eval.loggers.evaluation_tracker import EvaluationTracker
try:
return EvaluationTracker(
output_path=output_path,
hub_results_org=str(hf_hub.get("hub_results_org", "") or ""),
details_repo_name=str(hf_hub.get("details_repo_name", "") or ""),
results_repo_name=str(hf_hub.get("results_repo_name", "") or ""),
push_results_to_hub=push_r,
push_samples_to_hub=push_s,
public_repo=bool(hf_hub.get("public_repo", False)),
token=token,
gated=bool(hf_hub.get("gated", False)),
)
except ValueError as e:
logger.warning("HF Hub upload disabled: %s", e)
return None
def run_single_eval(
    *,
    include_path: str,
    task_name: str,
    model_id: str,
    model_display_name: str,
    model_slug: str,
    output_path: str,
    base_url: str,
    num_concurrent: int,
    num_fewshot: int,
    apply_chat_template: bool,
    log_samples: bool,
    gen_kwargs: str | None = None,
    hf_hub: dict | None = None,
    endpoint_kind: str = "chat_completions",
    max_retries: int = 3,
    timeout: int = 300,
) -> dict | None:
    """Run a single lm_eval evaluation via the Python API and return results.

    Steps, in order:
      1. call ``lm_eval.simple_evaluate`` for ``task_name`` against the
         ``local-chat-completions`` model (or ``local-responses-completions``
         when ``endpoint_kind == "responses"``),
      2. split reasoning out of the logged samples,
      3. write ``results.json`` / ``samples.json`` (and per-task JSONL when no
         tracker is active) under ``output_path``,
      4. push through the lm-eval ``EvaluationTracker`` when one is configured,
      5. upload samples and an aggregated results file to
         ``hf_hub["custom_samples_repo"]`` when that is set.

    Hub failures in steps 4 and 5 are logged and never raised.

    ``max_retries`` and ``timeout`` are forwarded into lm-eval's TemplateAPI
    via ``model_args``. Defaults match lm-eval's own stock values; the TOML
    ``[defaults]`` block typically overrides them for Functionary endpoints
    (see eval_config.toml for the rationale).

    Returns:
        Whatever ``simple_evaluate`` returned (possibly ``None``), with the
        samples mutated by step 2.
    """
    _ensure_lm_eval_api_key()
    model_args = (
        f"model={model_id},base_url={base_url},num_concurrent={num_concurrent},"
        f"max_retries={max_retries},timeout={timeout}"
    )
    tracker = _build_evaluation_tracker(output_path, hf_hub)
    lm_eval_model = (
        "local-responses-completions" if endpoint_kind == "responses" else "local-chat-completions"
    )
    results = lm_eval.simple_evaluate(
        model=lm_eval_model,
        model_args=model_args,
        tasks=[task_name],
        num_fewshot=num_fewshot,
        log_samples=log_samples,
        task_manager=lm_eval.tasks.TaskManager(include_path=[include_path]),
        apply_chat_template=apply_chat_template if apply_chat_template else None,
        gen_kwargs=gen_kwargs,
        evaluation_tracker=tracker,
    )
    if not results:
        # Nothing to post-process, save, or upload.
        return results

    if log_samples and results.get("samples"):
        _add_reasoning_content_to_samples(results["samples"])

    if output_path:
        _save_results_locally(
            results,
            output_path,
            log_samples=log_samples,
            write_jsonl=tracker is None,
        )

    if tracker:
        _push_results_via_tracker(tracker, results, log_samples=log_samples)

    # Custom single-repo samples layout: {model}/{lang}/{task}_{YYYY-MM-DD}.jsonl
    if (
        hf_hub
        and log_samples
        and results.get("samples")
        and str(hf_hub.get("custom_samples_repo", "") or "").strip()
    ):
        _upload_custom_layout(
            hf_hub=hf_hub,
            results=results,
            model_id=model_id,
            model_display_name=model_display_name,
            model_slug=model_slug,
            task_name=task_name,
        )
    return results


def _save_results_locally(
    results: dict,
    output_path: str,
    *,
    log_samples: bool,
    write_jsonl: bool,
) -> None:
    """Write results.json, samples.json and (optionally) per-task JSONL files."""
    os.makedirs(output_path, exist_ok=True)
    results_file = os.path.join(output_path, "results.json")
    # Samples go to their own file; keep results.json small.
    dumped = {k: v for k, v in results.items() if k != "samples"}
    with open(results_file, "w") as f:
        json.dump(dumped, f, indent=2, default=str)
    if not (log_samples and "samples" in results):
        return
    samples_file = os.path.join(output_path, "samples.json")
    with open(samples_file, "w") as f:
        json.dump(results["samples"], f, indent=2, default=str)
    if write_jsonl:
        for tname, rows in results["samples"].items():
            _write_local_samples_jsonl(output_path, tname, rows)


def _push_results_via_tracker(tracker, results: dict, *, log_samples: bool) -> None:
    """Save/push results through the lm-eval tracker; failures are only logged."""
    samples = results.get("samples") if log_samples else None
    results_for_hub = {k: v for k, v in results.items() if k != "samples"}
    try:
        tracker.save_results_aggregated(
            results=results_for_hub,
            samples=samples,
        )
        if log_samples and samples:
            for tname in samples:
                tracker.save_results_samples(task_name=tname, samples=samples[tname])
        if tracker.push_results_to_hub or tracker.push_samples_to_hub:
            try:
                tracker.recreate_metadata_card()
            except Exception as e:
                logger.warning(
                    "Could not recreate HF metadata card (repo/auth?). Local saves OK."
                )
                logger.info(repr(e))
    except Exception as e:
        logger.warning("Hugging Face upload or tracker save failed.")
        logger.info(repr(e))


def _upload_custom_layout(
    *,
    hf_hub: dict,
    results: dict,
    model_id: str,
    model_display_name: str,
    model_slug: str,
    task_name: str,
) -> None:
    """Upload per-task samples plus an aggregated results file to the custom repo."""
    repo_id = (os.environ.get("HF_DATASET_REPO") or str(hf_hub["custom_samples_repo"])).strip()
    token = (hf_hub.get("token") or os.environ.get("HF_TOKEN") or "").strip()
    if not token:
        logger.warning("custom_samples_repo set but no HF_TOKEN; skipping custom upload.")
        return

    lang_override = str(hf_hub.get("samples_lang", "") or "")
    private = bool(hf_hub.get("custom_samples_repo_private", True))
    date_day = datetime.now().strftime("%Y-%m-%d")
    for category, rows in results["samples"].items():
        lang = _infer_lang_folder(category, lang_override)
        try:
            _upload_custom_samples_repo(
                repo_id=repo_id,
                token=token,
                # Folder = `model_slug` = slug(name__model_id__provider).
                # Pre-computed by the caller so the same string is used
                # for the HF Job label, the `--models` filter, and the
                # upload folder — single source of truth. The provider
                # component disambiguates two Convex rows that share
                # BOTH name and model_id but route via different
                # providers (OpenRouter vs direct, etc).
                model_folder=model_slug,
                lang_folder=lang,
                category=category,
                date_yyyy_mm_dd=date_day,
                rows=rows,
                private=private,
            )
        except Exception as e:
            logger.warning("Custom HF samples upload failed for %s: %s", category, e)
            logger.info(repr(e))

    # Upload aggregated results.json alongside the samples
    try:
        from huggingface_hub import HfApi

        results_data = {
            "model_id": model_id,
            "model_name": model_display_name,
            "results": results.get("results", {}),
            "groups": results.get("groups", {}),
            "group_subtasks": results.get("group_subtasks", {}),
        }
        results_bytes = json.dumps(results_data, indent=2, default=str).encode("utf-8")
        # Same canonical slug as the samples upload above. It is passed
        # through _hf_path_segment exactly like the samples folder so both
        # uploads land in the same directory even if a caller hands in an
        # un-slugged value; an already-canonical slug is unchanged.
        results_path = f"{_hf_path_segment(model_slug)}/results_{task_name}_{date_day}.json"
        api = HfApi(token=token)
        api.upload_file(
            path_or_fileobj=io.BytesIO(results_bytes),
            path_in_repo=results_path,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"results {task_name} {date_day}",
        )
        logger.info("Uploaded results to %s (path: %s)", repo_id, results_path)
    except Exception as e:
        logger.warning("Custom HF results upload failed: %s", e)
        logger.info(repr(e))
def _filter_models_by_cost(
    models: list[dict],
    tasks: list[dict],
    max_cost_usd: float,
) -> list[dict]:
    """Estimate each model's cost for *tasks* and keep those within budget.

    A model is kept when its estimated total is at most ``max_cost_usd`` or
    when its entry sets ``force_include``. Models whose pricing cannot be
    resolved (no OpenRouter entry and no TOML `pricing` block) are dropped
    unless forced. A kept/dropped table is printed so the run log explains
    every decision.

    Raises:
        RuntimeError: when the OpenRouter pricing fetch returns nothing and
            at least one model has no complete TOML `pricing` block.
    """
    from cost_core import (
        cost_per_model,
        fetch_openrouter_pricing,
        format_money,
        measure_tasks,
    )

    print(f"Estimating cost across {len(tasks)} task group(s) for {len(models)} model(s)…")
    task_names = [t["name"] for t in tasks]
    task_stats = measure_tasks(task_names, include_path=SCRIPT_DIR)
    total_input = sum(t.input_tokens for t in task_stats)
    total_output = sum(t.output_tokens for t in task_stats)
    pricing_data = fetch_openrouter_pricing()

    # "OpenRouter unreachable / returned nothing" is a different failure from
    # "this particular model has no entry". Earlier versions treated the
    # former like the latter, silently dropped every OpenRouter model and
    # produced a green workflow with zero benchmarks run. Raising here lets
    # the workflow's `if: failure()` Slack handler report the outage.
    if not pricing_data:

        def _has_toml_pricing(entry: dict) -> bool:
            block = entry.get("pricing")
            return (
                isinstance(block, dict)
                and "input_per_1m" in block
                and "output_per_1m" in block
            )

        needs_pricing = [m for m in models if not _has_toml_pricing(m)]
        if needs_pricing:
            names = ", ".join(m["name"] for m in needs_pricing[:5])
            more = "" if len(needs_pricing) <= 5 else f" (+{len(needs_pricing) - 5} more)"
            raise RuntimeError(
                f"OpenRouter pricing fetch returned no entries; cannot estimate "
                f"cost for {len(needs_pricing)} model(s) lacking a TOML pricing "
                f"block: {names}{more}. Failing the discovery step so this "
                f"surfaces in alerting rather than silently dropping benchmarks."
            )

    rows = cost_per_model(
        models,
        total_input_tokens=total_input,
        total_output_tokens=total_output,
        openrouter_pricing=pricing_data,
    )

    # Rows are mapped back to model dicts by (name, model_id, provider_name)
    # so two Convex rows sharing name AND model_id but routed through
    # different providers stay separate. Without the provider in the key one
    # of them would be lost here, before the dispatcher ever sees it, undoing
    # the provider-in-slug routing downstream. `provider_name` is the empty
    # string for TOML-only rows (no Convex provider info); those stay
    # distinct because a pure TOML roster cannot contain duplicates of the
    # same (name, model_id) by construction.
    by_key = {(m["name"], m["model_id"], m.get("provider_name", "")): m for m in models}

    kept: list[dict] = []
    print()
    print(f"{'Model':<32} {'Total $':>10} Decision")
    print("-" * 70)
    # Cheapest first; rows without a cost estimate sort last.
    for row in sorted(rows, key=lambda r: (r.total_cost is None, r.total_cost or 0)):
        entry = by_key[(row.name, row.model_id, row.provider_name)]
        forced = bool(entry.get("force_include"))
        cost = row.total_cost
        if cost is None:
            keep = forced
            cost_text = "—"
            verdict = "keep (force_include)" if forced else f"drop ({row.note})"
        elif forced and cost > max_cost_usd:
            keep = True
            cost_text = format_money(cost)
            verdict = f"keep (force_include, over ${max_cost_usd:.2f})"
        elif cost <= max_cost_usd:
            keep = True
            cost_text = format_money(cost)
            verdict = "keep"
        else:
            keep = False
            cost_text = format_money(cost)
            verdict = f"drop (over ${max_cost_usd:.2f})"
        if keep:
            kept.append(entry)
        print(f"{row.name:<32} {cost_text:>10} {verdict}")
    print()
    return kept
def main():
    """CLI entry point: resolve models and tasks, then run every combination.

    Flow:
      1. parse arguments and apply ``--extra-body``,
      2. load the TOML config and the model roster (``--models-file``,
         ``--from-convex`` or TOML ``[[models]]``),
      3. narrow the roster with ``--models`` / ``--tasks`` / ``--max-cost``,
      4. with ``--list-models``: write the roster as JSON and return,
      5. otherwise run each model x task pair (or only print the parameters
         with ``--dry-run``).

    Exits with status 1 when no model matches ``--models``, when
    ``--models-file`` is not a JSON array, or when any evaluation failed;
    with status 0 when ``--max-cost`` filtered out every model.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    if args.extra_body:
        try:
            extra_body = json.loads(args.extra_body)
        except json.JSONDecodeError as exc:
            parser.error(f"--extra-body is not valid JSON: {exc}")
        # The value is merged into the request payload with dict.update, so
        # anything but a JSON object would fail (or misbehave) much later.
        if not isinstance(extra_body, dict):
            parser.error("--extra-body must be a JSON object")
        _extra_body.value = extra_body
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")

    # One-time startup fallback: if OPENAI_API_KEY is empty but OPENROUTER_API_KEY
    # is set, populate OPENAI_API_KEY for models that don't override api_key_env.
    # This runs BEFORE the model loop so prev_api_key captures the correct value
    # and restoration between models works properly.
    if not os.environ.get("OPENAI_API_KEY", "").strip():
        or_key = os.environ.get("OPENROUTER_API_KEY", "").strip()
        if or_key:
            os.environ["OPENAI_API_KEY"] = or_key

    cfg = load_config(args.config)
    defaults = cfg["defaults"]
    hf_hub = cfg.get("hf_hub")
    hf_hub = dict(hf_hub) if isinstance(hf_hub, dict) else None

    if args.models_file:
        with open(args.models_file) as f:
            models = json.load(f)
        if not isinstance(models, list):
            print(f"--models-file {args.models_file} must contain a JSON array", file=sys.stderr)
            sys.exit(1)
        print(f"Loaded {len(models)} model(s) from {args.models_file}.")
    elif args.from_convex:
        from model_discovery import fetch_active_models

        convex_url = os.environ.get("CONVEX_URL", "").strip()
        models = fetch_active_models(convex_url, cfg.get("models", []))
        print(f"Discovered {len(models)} active text model(s) from Convex.")
    else:
        models = cfg["models"]
    tasks = cfg["tasks"]

    if args.models:
        matched, unmatched_args = _select_models(models, args.models)
        if not matched:
            known = sorted(_model_slug(m) for m in models)
            print(f"No models matched: {args.models}", file=sys.stderr)
            print(
                f"Available identifiers (slug(name__model_id__provider)): {known}",
                file=sys.stderr,
            )
            sys.exit(1)
        if unmatched_args:
            # Some args matched, some didn't — surface the misses but
            # continue with the partial set so a single typo in a long
            # invocation doesn't waste the whole run.
            print(
                f"WARNING: --models had no match for: {unmatched_args}",
                file=sys.stderr,
            )
        models = matched

    if args.tasks:
        filter_set = {t.lower() for t in args.tasks}
        matched_tasks = [t for t in tasks if t["name"].lower() in filter_set]
        if matched_tasks:
            tasks = matched_tasks
        else:
            # No configured [[tasks]] entry matched: treat the arguments as
            # raw lm-eval task names.
            tasks = [{"name": n} for n in args.tasks]

    if args.max_cost is not None:
        models = _filter_models_by_cost(models, tasks, args.max_cost)

    # Write the roster file FIRST (even if empty), so the workflow's
    # `jq -r '.[].name' output/roster.json` step always finds the file.
    # An empty array → zero names → bash loop iterates zero times → workflow
    # logs "Per-model failures: 0 of 0" and finishes cleanly. Without this,
    # an over-aggressive --max-cost would race the file's existence and
    # produce a misleading "workflow failed" Slack alert.
    if args.list_models:
        os.makedirs(os.path.dirname(args.list_models) or ".", exist_ok=True)
        with open(args.list_models, "w") as f:
            json.dump(models, f, indent=2)
        for m in models:
            print(m["name"])
        print(
            f"\nWrote {len(models)} model(s) to {args.list_models}.",
            file=sys.stderr,
        )
        return

    if args.max_cost is not None and not models:
        print(
            f"No models passed the --max-cost ${args.max_cost:.2f} threshold; nothing to run.",
            file=sys.stderr,
        )
        sys.exit(0)

    total = len(models) * len(tasks)
    print(f"Running {len(models)} model(s) x {len(tasks)} task(s) = {total} eval(s)\n")
    failures = 0
    for i, model in enumerate(models, 1):
        # --- Multi-provider support ---
        # Each [[models]] entry can optionally override:
        #   base_url       — custom API endpoint (falls back to [defaults].base_url)
        #   api_key_env    — env var name holding the API key (default: OPENAI_API_KEY)
        #   custom_headers — dict of extra HTTP headers
        model_base_url = model.get("base_url", defaults["base_url"])
        # Save original API key so we can restore it after this model
        prev_api_key = os.environ.get("OPENAI_API_KEY", "")
        api_key_env = model.get("api_key_env", "OPENAI_API_KEY")
        if api_key_env != "OPENAI_API_KEY":
            env_val = os.environ.get(api_key_env, "").strip()
            if env_val:
                os.environ["OPENAI_API_KEY"] = env_val
            else:
                # Clear the stale key rather than silently inheriting the
                # previous model's credentials. This forces a clean auth
                # failure on the misconfigured model instead of making it
                # use the wrong key.
                logger.warning(
                    "Model %s requests api_key_env=%s but that var is empty/unset; clearing OPENAI_API_KEY for this model.",
                    model["name"],
                    api_key_env,
                )
                os.environ["OPENAI_API_KEY"] = ""
        model_custom_headers = model.get("custom_headers", {})
        prev_extra = None
        if model_custom_headers:
            prev_extra = _extra_body.value.copy()
            _extra_body.value = {**_extra_body.value, "extra_headers": model_custom_headers}

        try:
            for j, task in enumerate(tasks, 1):
                run_idx = (i - 1) * len(tasks) + j
                output_path = os.path.join(
                    defaults.get("output_path", "output/results"),
                    model["name"],
                    task["name"],
                )
                gen_kwargs = args.gen_kwargs or model.get("gen_kwargs")
                num_concurrent = (
                    args.num_concurrent
                    if args.num_concurrent is not None
                    else defaults.get("num_concurrent", 5)
                )
                # Resilience knobs read from [defaults]; see eval_config.toml.
                # Stock lm-eval defaults (3 / 300) are kept as fallbacks so the
                # behavior is unchanged when the TOML doesn't override them.
                max_retries = int(defaults.get("max_retries", 3))
                timeout = int(defaults.get("timeout", 300))
                eval_kwargs = dict(
                    include_path=SCRIPT_DIR,
                    task_name=task["name"],
                    model_id=model["model_id"],
                    model_display_name=model["name"],
                    # Pre-compute the canonical slug once per (model, task) so the
                    # HF upload folder is guaranteed to match the dispatcher's
                    # spawn label and the child's `--models` arg.
                    model_slug=_model_slug(model),
                    output_path=output_path,
                    base_url=model_base_url,
                    num_concurrent=num_concurrent,
                    num_fewshot=defaults.get("num_fewshot", 0),
                    apply_chat_template=defaults.get("apply_chat_template", True),
                    log_samples=defaults.get("log_samples", True),
                    gen_kwargs=gen_kwargs,
                    hf_hub=hf_hub,
                    endpoint_kind=model.get("endpoint_kind", "chat_completions"),
                    max_retries=max_retries,
                    timeout=timeout,
                )
                header = f"[{run_idx}/{total}] {model['name']} x {task['name']}"
                print(f"{'-' * 60}")
                print(f" {header}")
                # Printed copy has the Hub token masked so it never reaches
                # stdout / CI logs; the real kwargs are passed on unchanged.
                print(f" -> params: {_params_for_display(eval_kwargs)}")
                print(f"{'-' * 60}")
                if args.dry_run:
                    continue
                try:
                    results = run_single_eval(**eval_kwargs)
                    if results:
                        table_str = make_table(results)
                        # Replace Unicode chars that fail on Windows cp1252
                        table_str = table_str.encode("ascii", "replace").decode("ascii")
                        print(table_str)
                    print(f"\n[DONE] {header}\n")
                except Exception:
                    traceback.print_exc()
                    print(f"\n[FAILED] {header}\n", file=sys.stderr)
                    failures += 1
        finally:
            # Restore state after processing all tasks for this model, even
            # when something outside the per-task try block raised.
            if model_custom_headers:
                _extra_body.value = prev_extra
            if api_key_env != "OPENAI_API_KEY":
                os.environ["OPENAI_API_KEY"] = prev_api_key

    if failures:
        print(f"\n{failures}/{total} evaluation(s) failed.", file=sys.stderr)
        sys.exit(1)


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with every option main() understands."""
    parser = argparse.ArgumentParser(description="Run lm-eval from TOML config")
    parser.add_argument("--config", default=DEFAULT_CONFIG, help="Path to TOML config")
    parser.add_argument("--models", nargs="*", help="Filter to specific model name(s)")
    parser.add_argument(
        "--tasks",
        nargs="*",
        help="Task/group names: filter to [[tasks]] in TOML, or any lm-eval task name",
    )
    parser.add_argument("--dry-run", action="store_true", help="Print params without running")
    parser.add_argument(
        "--gen-kwargs",
        type=str,
        default=None,
        help='Comma-separated gen params merged into API JSON, e.g. '
        '"max_gen_toks=8192,temperature=0.6,top_p=0.95,top_k=20,min_p=0.0,until=[\'<|endoftext|>\']"',
    )
    parser.add_argument(
        "--num-concurrent",
        type=int,
        default=None,
        metavar="N",
        help="Override parallel in-flight API requests (default: [defaults].num_concurrent in TOML)",
    )
    parser.add_argument(
        "--extra-body",
        type=str,
        default=None,
        help='JSON string merged into every API request body, e.g. '
        """'{"provider": {"order": ["alibaba"]}}'""",
    )
    parser.add_argument(
        "--from-convex",
        action="store_true",
        help="Discover the model roster from a live Convex deployment "
        "(CONVEX_URL env var) instead of reading [[models]] from TOML. "
        "TOML still supplies per-model overrides (base_url, api_key_env, "
        "pricing, endpoint_kind) by matching on model_id.",
    )
    parser.add_argument(
        "--max-cost",
        type=float,
        default=None,
        metavar="USD",
        help="Drop any discovered model whose estimated cost per full run "
        "exceeds this many USD. Models with no resolvable pricing (no "
        "OpenRouter entry and no TOML pricing block) are also dropped. "
        "Has no effect unless used with --from-convex (or in combination "
        "with the TOML roster, in which case it filters that too).",
    )
    parser.add_argument(
        "--list-models",
        type=str,
        default=None,
        metavar="PATH",
        help="Resolve the roster (apply --from-convex / --max-cost), write "
        "the full filtered model entries as a JSON array to PATH, print one "
        "model name per line to stdout, and exit. Lets a workflow run each "
        "model in its own subprocess via --models-file, so a session/asyncio "
        "crash on one model can't poison the others.",
    )
    parser.add_argument(
        "--models-file",
        type=str,
        default=None,
        metavar="PATH",
        help="Load the model roster from a JSON file (array of model entries) "
        "instead of querying Convex/reading TOML [[models]]. Used together "
        "with --models NAME for per-model subprocess invocations.",
    )
    return parser


def _select_models(models: list[dict], requested: list[str]) -> tuple[list[dict], list[str]]:
    """Resolve ``--models`` arguments; return ``(matched, unmatched_args)``.

    Per-arg matching with precedence: slug → model_id → name. Each argument
    is resolved independently and the first tier with a hit wins for that
    argument; lower tiers are NOT tried as fallbacks for the same arg. This
    is critical because:

      * The dispatcher passes the canonical eval slug
        (slug(name__model_id__provider)), which uniquely identifies one
        entry. If `model_id` were ALSO matched for the same arg, every other
        entry sharing that `model_id` (e.g. the same model exposed via a
        second provider) would match too, and the child would run all of
        them — duplicating work and double-uploading.
      * Older code matched any of {slug, model_id, name} across ALL args at
        once, which silently dropped name-only matches in mixed invocations
        like `--models gpt-5-nano my-id` (if `my-id` matched a model_id, the
        name-only `gpt-5-nano` was never tried).

    Mixed input is accepted — slugs from the dispatcher AND raw
    name/model_id typed into a shell. Comparison is case-insensitive and an
    entry is returned at most once, in first-match order.
    """
    tiers = (
        lambda m: _model_slug(m).lower(),
        lambda m: m.get("model_id", "").lower(),
        lambda m: m["name"].lower(),
    )
    matched: list[dict] = []
    seen: set[int] = set()
    unmatched: list[str] = []
    for arg in requested:
        wanted = arg.lower()
        for key_of in tiers:
            hits = [m for m in models if key_of(m) == wanted]
            if not hits:
                continue
            for entry in hits:
                # Dedupe by object identity: entries are plain dicts and
                # two distinct rows may compare equal.
                if id(entry) not in seen:
                    seen.add(id(entry))
                    matched.append(entry)
            break
        else:
            unmatched.append(arg)
    return matched, unmatched


def _params_for_display(eval_kwargs: dict) -> dict:
    """Return a shallow copy of *eval_kwargs* with the Hub token masked."""
    shown = dict(eval_kwargs)
    hub = shown.get("hf_hub")
    if isinstance(hub, dict) and hub.get("token"):
        shown["hf_hub"] = {**hub, "token": "***"}
    return shown


if __name__ == "__main__":
    main()