#!/usr/bin/env python3
"""Run lm-eval across multiple models and tasks defined in eval_config.toml.

Uses the lm_eval Python API directly (no subprocess).

Usage:
    python run_eval.py                                  # all models × all tasks
    python run_eval.py --models Qwen3.5-27b             # one model, all tasks
    python run_eval.py --tasks wikipedia                # all models, one task
    python run_eval.py --models Qwen3.5-27b --tasks wikipedia
    python run_eval.py --dry-run                        # print params only

Generation parameters (max_gen_toks, temperature, top_p, top_k, min_p, …) are
NOT baked into the task YAMLs so they can vary per model. Extra keys are merged
into the chat-completions JSON body (OpenRouter-style); you do not need a
separate ``extra_body`` wrapper. Set them:

    1. Per-model in eval_config.toml  → gen_kwargs = "max_gen_toks=4096,temperature=0.6"
    2. Via CLI (overrides everything) → --gen-kwargs "max_gen_toks=8192,top_k=20,min_p=0.0"
    3. Parallel API calls             → [defaults] num_concurrent or --num-concurrent 50

Reasoning: monkey-patches LocalChatCompletion.parse_generations so API fields
``reasoning`` / ``reasoning_content`` are preserved (wire format matches
``f1_utils`` / ``judge_utils`` ``think`` tags). After each run, samples are split:

    - resps / filtered_resps: final model text only
    - reasoning_content: parallel nested shape (or "")

Optional [hf_hub] in eval_config.toml:

    - **lm_eval_hub_upload** (default true): lm-eval ``EvaluationTracker``
      (results + samples to details/results repos).
    - **custom_samples_repo**: single dataset repo; uploads **samples only** as
      ``{slug(name__model_id)}/{lang}/{task}_{YYYY-MM-DD}.jsonl`` (set
      **lm_eval_hub_upload** false to avoid duplicate Hub uploads). Folder is
      the slug of the composite ``name__model_id`` so two providers that expose
      the same ``model_id`` get distinct folders. ``name`` here is the raw
      ``models.name`` from Convex — alias is intentionally never used in the
      routing identifier.

Without Hub, per-task ``samples_{task}_{timestamp}.jsonl`` is still written
next to ``samples.json`` when log_samples is true. JSONL rows omit ``*_hash``
fields, add a plain ``prompt`` string, and keep ``target`` at top level (no
nested ``gen_args_*`` mirror of lm-eval's Hub format).
"""

from __future__ import annotations

import argparse
import copy
import io
import json
import logging
import os
import re
import sys
import traceback
from datetime import datetime

import lm_eval

import f1_utils  # noqa: F401 — registers regex_last for task YAMLs
import _extra_body  # shared mutable dict; see module docstring for the __main__ hazard
import responses_model  # noqa: F401 — registers `local-responses-completions`
from register_sas_encoder_metric import ensure_sas_encoder_metric
from lm_eval.utils import handle_non_serializable, make_table, sanitize_list

ensure_sas_encoder_metric()

logger = logging.getLogger(__name__)

# Must match multilingual task helpers (f1_utils, judge_utils, spanish/qa/utils).
# NOTE(review): these delimiters must be non-empty, otherwise reasoning is glued
# onto the answer text and can never be split off again. Values restored from
# the "think tags" wording in the docstrings — confirm against the helpers.
_THINK_OPEN = "<think>"
_THINK_CLOSE = "</think>"


def _patch_chat_completion():
    """Monkey-patch LocalChatCompletion for reasoning pass-through and extra_body."""
    from lm_eval.models import openai_completions as oc

    _orig_create_payload = oc.LocalChatCompletion._create_payload

    def _create_payload_with_extra(self, *args, **kwargs):
        """Build the stock payload, then merge the shared extra-body dict into it."""
        payload = _orig_create_payload(self, *args, **kwargs)
        # Read via the module attribute so both the __main__ run_eval and the
        # `import run_eval` instance see the same dict (Python loads them as
        # distinct module objects with their own globals).
        if _extra_body.value:
            payload.update(_extra_body.value)
        return payload

    @staticmethod
    def parse_generations(outputs, **kwargs):
        """Return one string per choice, prefixing any reasoning in think tags.

        A response that cannot be parsed contributes a single empty generation
        (best-effort, so one malformed reply does not abort the whole run).
        """
        if not isinstance(outputs, list):
            outputs = [outputs]
        res = []
        for out in outputs:
            try:
                tmp = [None] * len(out["choices"])
                for choice in out["choices"]:
                    msg = choice.get("message") or {}
                    content = msg.get("content")
                    if content is None:
                        content = ""
                    # `or ""` already normalises a missing/None field.
                    reasoning = msg.get("reasoning") or msg.get("reasoning_content") or ""
                    if reasoning:
                        content = f"{_THINK_OPEN}{reasoning}{_THINK_CLOSE}{content}"
                    tmp[choice["index"]] = content
            except Exception as exc:
                # Keep the best-effort fallback, but leave a trace in the log.
                logger.warning(
                    "Could not parse chat-completion response (%r); using empty generation.",
                    exc,
                )
                tmp = [""]
            res.extend(tmp)
        return res

    oc.LocalChatCompletion._create_payload = _create_payload_with_extra
    oc.LocalChatCompletion.parse_generations = parse_generations


_patch_chat_completion()

try:
    import tomllib  # Python 3.11+ stdlib
except ModuleNotFoundError:
    try:
        import tomli as tomllib  # pip install tomli (for Python < 3.11)
    except ModuleNotFoundError:
        raise SystemExit(
            "No TOML parser available. Either:\n"
            " • Use Python 3.11+ (has tomllib), e.g. python3.11 run_eval.py …\n"
            " • Or install tomli in this environment: pip install tomli\n"
            f" (current interpreter: {sys.executable})"
        ) from None

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DEFAULT_CONFIG = os.path.join(SCRIPT_DIR, "eval_config.toml")

# Load .env.local from project root (one level up from evals/)
_ENV_LOCAL = os.path.join(SCRIPT_DIR, "..", ".env.local")
if os.path.isfile(_ENV_LOCAL):
    # Explicit UTF-8 so the file parses the same on every platform.
    with open(_ENV_LOCAL, encoding="utf-8") as _f:
        for _line in _f:
            _line = _line.strip()
            if _line and not _line.startswith("#") and "=" in _line:
                _k, _, _v = _line.partition("=")
                _k, _v = _k.strip(), _v.strip()
                # Values already present in the environment win over the file.
                if _k and _v and _k not in os.environ:
                    os.environ[_k] = _v


def load_config(path: str) -> dict:
    """Parse the TOML file at ``path`` and return it as a dict."""
    with open(path, "rb") as f:
        return tomllib.load(f)


def _ensure_lm_eval_api_key():
    """local-chat-completions uses OPENAI_API_KEY for the Bearer header only.

    No-op. The OpenRouter→OPENAI_API_KEY fallback is now done exactly once in
    main() before the model loop, so per-model key swaps (including intentional
    clears when a model's api_key_env is unset) are respected for subsequent
    models.
    """
    return


def _split_reasoning_from_text(text: str) -> tuple[str, str]:
    """If text was wrapped with think tags, return (reasoning, content); else ('', text)."""
    if not isinstance(text, str) or not text.startswith(_THINK_OPEN):
        return "", text
    idx = text.find(_THINK_CLOSE, len(_THINK_OPEN))
    if idx == -1:
        # Opening tag without a closing tag: treat the whole text as content.
        return "", text
    reasoning = text[len(_THINK_OPEN) : idx]
    content = text[idx + len(_THINK_CLOSE) :]
    return reasoning, content


def _split_resps_structure(resps):
    """Return (content_resps, reasoning_resps) with identical nesting."""
    if isinstance(resps, str):
        reasoning, content = _split_reasoning_from_text(resps)
        return content, reasoning
    if isinstance(resps, list):
        contents, reasons = [], []
        for x in resps:
            c, r = _split_resps_structure(x)
            contents.append(c)
            reasons.append(r)
        return contents, reasons
    # Non-string leaves pass through unchanged with no reasoning.
    return resps, ""


def _add_reasoning_content_to_samples(samples: dict) -> None:
    """Mutate samples: strip think wrapper from resps and filtered_resps; add reasoning_content."""
    for _task_name, rows in samples.items():
        for sample in rows:
            if "resps" not in sample:
                continue
            content, reasoning = _split_resps_structure(sample["resps"])
            sample["resps"] = content
            sample["reasoning_content"] = reasoning
            if "filtered_resps" in sample:
                fc, _ = _split_resps_structure(sample["filtered_resps"])
                sample["filtered_resps"] = fc


def _is_lm_eval_generation_kwargs(d: dict) -> bool:
    """Distinguish lm_eval gen config from chat messages / multimodal blobs."""
    return any(
        k in d
        for k in (
            "until",
            "max_gen_toks",
            "do_sample",
            "temperature",
            "top_p",
            "top_k",
            "min_p",
            "repeats",
        )
    )


def _text_from_prompt_ctx(part) -> list[str]:
    """Flatten ctx from Instance.args: str, nested list/tuple, or chat message dicts."""
    out: list[str] = []
    if part is None:
        return out
    if isinstance(part, str):
        s = part.strip()
        # A string that looks like a JSON-encoded chat message list is decoded
        # and flattened; anything else is returned verbatim.
        if s.startswith("[") and '"role"' in s and '"content"' in s:
            try:
                parsed = json.loads(s)
            except json.JSONDecodeError:
                return [part]
            if isinstance(parsed, list):
                return _text_from_prompt_ctx(parsed)
        return [part]
    if isinstance(part, (list, tuple)):
        for x in part:
            out.extend(_text_from_prompt_ctx(x))
        return out
    if isinstance(part, dict):
        if "content" in part:
            out.extend(_text_from_prompt_ctx(part["content"]))
        elif isinstance(part.get("text"), str):
            out.append(part["text"])
        return out
    return out


def _extract_prompt_and_gen_kwargs(arguments) -> tuple[str, dict | None]:
    """Pull human-readable prompt text and optional generation kwargs from logged arguments."""
    if not arguments:
        return "", None
    chunks: list[str] = []
    gen_kwargs: dict | None = None
    for req_args in arguments:
        if not isinstance(req_args, (list, tuple)):
            continue
        for item in req_args:
            if isinstance(item, dict) and _is_lm_eval_generation_kwargs(item):
                # Only the first generation-kwargs dict is kept.
                if gen_kwargs is None:
                    gen_kwargs = item
                continue
            chunks.extend(_text_from_prompt_ctx(item))
    prompt = "\n".join(s for s in chunks if s)
    return prompt, gen_kwargs


def _sample_row_for_jsonl(sample: dict) -> dict:
    """Readable JSONL row for local files and custom Hub upload (not lm-eval tracker).

    Works on a deep copy: drops the ``*_hash`` fields and ``arguments``, adds a
    flat ``prompt`` string, stringifies ``target`` and sanitizes the response
    fields that are present.
    """
    out = copy.deepcopy(sample)
    for key in ("doc_hash", "prompt_hash", "target_hash"):
        out.pop(key, None)
    prompt, gen_kwargs = _extract_prompt_and_gen_kwargs(out.get("arguments"))
    out["prompt"] = prompt
    out["target"] = str(out.get("target", ""))
    out.pop("arguments", None)
    if gen_kwargs is not None:
        out["gen_kwargs"] = gen_kwargs
    # Rows without resps are tolerated elsewhere in this file
    # (_add_reasoning_content_to_samples), so do not KeyError on them here.
    for key in ("resps", "filtered_resps", "reasoning_content"):
        if key in out:
            out[key] = sanitize_list(out[key])
    return out


def _rows_to_jsonl_bytes(rows: list) -> bytes:
    """UTF-8 JSONL; one object per line via _sample_row_for_jsonl."""
    lines = [
        json.dumps(
            _sample_row_for_jsonl(sample),
            default=handle_non_serializable,
            ensure_ascii=False,
        )
        + "\n"
        for sample in rows
    ]
    return "".join(lines).encode("utf-8")


def _hf_path_segment(name: str) -> str:
    """Safe path segment for Hub (no slashes or odd chars).

    Excludes `.` because HF Jobs' tag validator rejects dots (model_ids like
    "minimax/minimax-m2.5" silently failed to spawn at the POST /api/jobs
    validation step with "tags must contain only alphanumeric characters,
    '-', '_', or '='"). The label validator accepts dots; tags do not. The
    slug is reused as both, so we conform to the strictest downstream rule.
    """
    s = (name or "").strip()
    s = re.sub(r"[^a-zA-Z0-9_-]+", "_", s)
    return s.strip("_-") or "unknown"


def _model_slug(model: dict) -> str:
    """Canonical eval-pipeline identifier: slug(`name__model_id__provider_name`)
    when provider is known, slug(`name__model_id`) otherwise (TOML-only local runs).

    This is the single string used for:
      * `--models` filter argument (`run_eval.py --models <slug>`),
      * HF Job `model` label (slug-safe by construction),
      * HF dataset upload folder.

    The provider component closes the last remaining ambiguity: if two Convex
    `models` rows share BOTH `name` and `model_id` but route to different
    providers (e.g. one OpenRouter row and one direct row, both labeled
    "GPT-5 Nano" with id `openai/gpt-5-nano`), they're distinct entities and
    need distinct slugs to be benchmarked separately. Without provider in the
    slug, the dispatcher's duplicate-slug guard refuses to spawn for this
    otherwise-legitimate case.

    `name` is the RAW `models.name` from Convex (alias-stripped by
    `models:listForEvals`). Alias must never participate in routing.
    `provider_name` comes from `providers.name` on the Convex side and is
    absent for TOML-only local runs (no Convex roster) — in that case the slug
    is 2-part and folders share the pre-refactor shape.
    """
    parts = [model.get("name", ""), model.get("model_id", "")]
    provider = (model.get("provider_name") or "").strip()
    if provider:
        parts.append(provider)
    return _hf_path_segment("__".join(parts))


def _infer_lang_folder(category_task: str, override: str) -> str:
    """e.g. swahili_sib200 → swahili; override from config wins."""
    o = (override or "").strip()
    if o:
        return _hf_path_segment(o)
    parts = category_task.split("_")
    if len(parts) >= 2:
        return _hf_path_segment(parts[0])
    return _hf_path_segment(category_task)


def _write_local_samples_jsonl(output_path: str, task_name: str, rows: list) -> None:
    """One JSON object per line (readable prompt/target, no hash fields)."""
    os.makedirs(output_path, exist_ok=True)
    # ":" is replaced so the timestamp is usable in a file name.
    date_id = datetime.now().isoformat().replace(":", "-")
    filepath = os.path.join(output_path, f"samples_{task_name}_{date_id}.jsonl")
    with open(filepath, "wb") as f:
        f.write(_rows_to_jsonl_bytes(rows))


def _upload_custom_samples_repo(
    *,
    repo_id: str,
    token: str,
    model_folder: str,
    lang_folder: str,
    category: str,
    date_yyyy_mm_dd: str,
    rows: list,
    private: bool,
) -> None:
    """Upload a single JSONL to dataset repo at model/lang/category_date.jsonl.

    Raises:
        RuntimeError: if ``huggingface_hub`` is not installed.
    """
    try:
        from huggingface_hub import HfApi
    except ModuleNotFoundError as e:
        raise RuntimeError(
            "huggingface_hub is required for custom_samples_repo. "
            "Install with: pip install huggingface_hub"
        ) from e

    mf = _hf_path_segment(model_folder)
    lf = _hf_path_segment(lang_folder)
    cat = _hf_path_segment(category)
    filename = f"{cat}_{date_yyyy_mm_dd}.jsonl"
    path_in_repo = f"{mf}/{lf}/{filename}"

    api = HfApi(token=token)
    api.create_repo(repo_id, repo_type="dataset", private=private, exist_ok=True)
    data = _rows_to_jsonl_bytes(rows)
    api.upload_file(
        path_or_fileobj=io.BytesIO(data),
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        commit_message=f"samples {filename}",
    )
    logger.info(
        "Uploaded samples to Hugging Face dataset %s (path: %s)",
        repo_id,
        path_in_repo,
    )


def _build_evaluation_tracker(output_path: str, hf_hub: dict | None):
    """Return EvaluationTracker if Hub push is enabled, else None."""
    if not hf_hub:
        return None
    if hf_hub.get("lm_eval_hub_upload", True) is False:
        return None
    push_s = bool(hf_hub.get("push_samples_to_hub", False))
    push_r = bool(hf_hub.get("push_results_to_hub", False))
    if not push_s and not push_r:
        return None
    token = (hf_hub.get("token") or os.environ.get("HF_TOKEN") or "").strip()

    from lm_eval.loggers.evaluation_tracker import EvaluationTracker

    try:
        return EvaluationTracker(
            output_path=output_path,
            hub_results_org=str(hf_hub.get("hub_results_org", "") or ""),
            details_repo_name=str(hf_hub.get("details_repo_name", "") or ""),
            results_repo_name=str(hf_hub.get("results_repo_name", "") or ""),
            push_results_to_hub=push_r,
            push_samples_to_hub=push_s,
            public_repo=bool(hf_hub.get("public_repo", False)),
            token=token,
            gated=bool(hf_hub.get("gated", False)),
        )
    except ValueError as e:
        logger.warning("HF Hub upload disabled: %s", e)
        return None


def run_single_eval(
    *,
    include_path: str,
    task_name: str,
    model_id: str,
    model_display_name: str,
    model_slug: str,
    output_path: str,
    base_url: str,
    num_concurrent: int,
    num_fewshot: int,
    apply_chat_template: bool,
    log_samples: bool,
    gen_kwargs: str | None = None,
    hf_hub: dict | None = None,
    endpoint_kind: str = "chat_completions",
    max_retries: int = 3,
    timeout: int = 300,
) -> dict | None:
    """Run a single lm_eval evaluation via the Python API and return results.

    ``max_retries`` and ``timeout`` are forwarded into lm-eval's TemplateAPI
    via ``model_args``. Defaults match lm-eval's own stock values; the TOML
    ``[defaults]`` block typically overrides them for Functionary endpoints
    (see eval_config.toml for the rationale).

    Side effects: writes ``results.json`` / ``samples.json`` (and per-task
    JSONL when no tracker is active) under ``output_path``, and performs the
    optional Hub uploads configured in ``hf_hub``. Upload failures are logged
    and do not raise.
    """
    _ensure_lm_eval_api_key()

    model_args = (
        f"model={model_id},base_url={base_url},num_concurrent={num_concurrent},"
        f"max_retries={max_retries},timeout={timeout}"
    )

    tracker = _build_evaluation_tracker(output_path, hf_hub)

    lm_eval_model = (
        "local-responses-completions"
        if endpoint_kind == "responses"
        else "local-chat-completions"
    )

    results = lm_eval.simple_evaluate(
        model=lm_eval_model,
        model_args=model_args,
        tasks=[task_name],
        num_fewshot=num_fewshot,
        log_samples=log_samples,
        task_manager=lm_eval.tasks.TaskManager(include_path=[include_path]),
        apply_chat_template=apply_chat_template if apply_chat_template else None,
        gen_kwargs=gen_kwargs,
        evaluation_tracker=tracker,
    )

    # Split the think-tag wrapper back out before anything is persisted.
    if results and log_samples and results.get("samples"):
        _add_reasoning_content_to_samples(results["samples"])

    if results and output_path:
        os.makedirs(output_path, exist_ok=True)
        results_file = os.path.join(output_path, "results.json")
        dumped = {k: v for k, v in results.items() if k != "samples"}
        with open(results_file, "w") as f:
            json.dump(dumped, f, indent=2, default=str)

        if log_samples and "samples" in results:
            samples_file = os.path.join(output_path, "samples.json")
            with open(samples_file, "w") as f:
                json.dump(results["samples"], f, indent=2, default=str)
            # The readable JSONL is only written locally when no lm-eval
            # tracker is active.
            if tracker is None:
                for tname, rows in results["samples"].items():
                    _write_local_samples_jsonl(output_path, tname, rows)

    if tracker and results:
        samples = results.get("samples") if log_samples else None
        results_for_hub = {k: v for k, v in results.items() if k != "samples"}
        try:
            tracker.save_results_aggregated(
                results=results_for_hub,
                samples=samples,
            )
            if log_samples and samples:
                for tname in samples:
                    tracker.save_results_samples(task_name=tname, samples=samples[tname])
            if tracker.push_results_to_hub or tracker.push_samples_to_hub:
                try:
                    tracker.recreate_metadata_card()
                except Exception as e:
                    logger.warning(
                        "Could not recreate HF metadata card (repo/auth?). Local saves OK."
                    )
                    logger.info(repr(e))
        except Exception as e:
            logger.warning("Hugging Face upload or tracker save failed.")
            logger.info(repr(e))

    # Custom single-repo samples layout: {model}/{lang}/{task}_{YYYY-MM-DD}.jsonl
    if (
        hf_hub
        and results
        and log_samples
        and results.get("samples")
        and str(hf_hub.get("custom_samples_repo", "") or "").strip()
    ):
        repo_id = (os.environ.get("HF_DATASET_REPO") or str(hf_hub["custom_samples_repo"])).strip()
        token = (hf_hub.get("token") or os.environ.get("HF_TOKEN") or "").strip()
        if not token:
            logger.warning("custom_samples_repo set but no HF_TOKEN; skipping custom upload.")
        else:
            lang_override = str(hf_hub.get("samples_lang", "") or "")
            private = bool(hf_hub.get("custom_samples_repo_private", True))
            date_day = datetime.now().strftime("%Y-%m-%d")
            for category, rows in results["samples"].items():
                lang = _infer_lang_folder(category, lang_override)
                try:
                    _upload_custom_samples_repo(
                        repo_id=repo_id,
                        token=token,
                        # Folder = `model_slug` = slug(name__model_id__provider).
                        # Pre-computed by the caller so the same string is used
                        # for the HF Job label, the `--models` filter, and the
                        # upload folder — single source of truth. The provider
                        # component disambiguates two Convex rows that share
                        # BOTH name and model_id but route via different
                        # providers (OpenRouter vs direct, etc).
                        model_folder=model_slug,
                        lang_folder=lang,
                        category=category,
                        date_yyyy_mm_dd=date_day,
                        rows=rows,
                        private=private,
                    )
                except Exception as e:
                    logger.warning("Custom HF samples upload failed for %s: %s", category, e)
                    logger.info(repr(e))

            # Upload aggregated results.json alongside the samples
            try:
                from huggingface_hub import HfApi

                results_data = {
                    "model_id": model_id,
                    "model_name": model_display_name,
                    "results": results.get("results", {}),
                    "groups": results.get("groups", {}),
                    "group_subtasks": results.get("group_subtasks", {}),
                }
                results_bytes = json.dumps(results_data, indent=2, default=str).encode("utf-8")
                # Same canonical slug as the samples upload above —
                # slug(name__model_id__provider). Single source of truth for
                # all eval-pipeline routing identifiers (HF label, --models
                # filter, dataset folder).
                results_path = f"{model_slug}/results_{task_name}_{date_day}.json"
                api = HfApi(token=token)
                api.upload_file(
                    path_or_fileobj=io.BytesIO(results_bytes),
                    path_in_repo=results_path,
                    repo_id=repo_id,
                    repo_type="dataset",
                    commit_message=f"results {task_name} {date_day}",
                )
                logger.info("Uploaded results to %s (path: %s)", repo_id, results_path)
            except Exception as e:
                logger.warning("Custom HF results upload failed: %s", e)
                logger.info(repr(e))

    return results


def _filter_models_by_cost(
    models: list[dict],
    tasks: list[dict],
    max_cost_usd: float,
) -> list[dict]:
    """Estimate per-model cost for the given tasks; drop any over max_cost_usd.

    Models whose pricing cannot be resolved (no OpenRouter entry and no TOML
    `pricing` block) are also dropped, with a warning. Prints a kept/dropped
    table so the run log explains exactly why each model was included.

    Raises:
        RuntimeError: if the OpenRouter pricing fetch returns nothing while at
            least one model has no complete TOML ``pricing`` block.
    """
    from cost_core import (
        cost_per_model,
        fetch_openrouter_pricing,
        format_money,
        measure_tasks,
    )

    print(f"Estimating cost across {len(tasks)} task group(s) for {len(models)} model(s)…")
    task_stats = measure_tasks(
        [t["name"] for t in tasks],
        include_path=SCRIPT_DIR,
    )
    total_input = sum(t.input_tokens for t in task_stats)
    total_output = sum(t.output_tokens for t in task_stats)

    pricing_data = fetch_openrouter_pricing()
    # Distinguish "OpenRouter unreachable / returned nothing" from "specific
    # model has no entry". The former silently dropped every OpenRouter model
    # from the roster in earlier versions and produced a green workflow with
    # zero benchmarks run; raising here surfaces the outage via the workflow's
    # `if: failure()` Slack handler instead of degrading silently.
    if not pricing_data:
        needs_pricing = [
            m
            for m in models
            if not (
                isinstance(m.get("pricing"), dict)
                and "input_per_1m" in m["pricing"]
                and "output_per_1m" in m["pricing"]
            )
        ]
        if needs_pricing:
            names = ", ".join(m["name"] for m in needs_pricing[:5])
            more = "" if len(needs_pricing) <= 5 else f" (+{len(needs_pricing) - 5} more)"
            raise RuntimeError(
                f"OpenRouter pricing fetch returned no entries; cannot estimate "
                f"cost for {len(needs_pricing)} model(s) lacking a TOML pricing "
                f"block: {names}{more}. Failing the discovery step so this "
                f"surfaces in alerting rather than silently dropping benchmarks."
            )

    rows = cost_per_model(
        models,
        total_input_tokens=total_input,
        total_output_tokens=total_output,
        openrouter_pricing=pricing_data,
    )

    # Map row -> model dict by (name, model_id, provider_name) so two
    # Convex rows that share name AND model_id but route through different
    # providers don't collapse to a single entry here. Without provider in
    # the key, the cost-filter would drop one of the two before the
    # dispatcher ever sees it — defeating the provider-in-slug routing
    # work downstream. `provider_name` is empty string for TOML-only
    # rows (no Convex provider info), which still keeps each TOML entry
    # distinct because pure TOML rosters can't have same-(name, model_id)
    # duplicates by construction.
    by_key = {(m["name"], m["model_id"], m.get("provider_name", "")): m for m in models}

    kept: list[dict] = []
    print()
    print(f"{'Model':<32} {'Total $':>10} Decision")
    print("-" * 70)
    # Priced rows first (cheapest to most expensive), unpriced rows last.
    for r in sorted(rows, key=lambda r: (r.total_cost is None, r.total_cost or 0)):
        model_entry = by_key[(r.name, r.model_id, r.provider_name)]
        forced = bool(model_entry.get("force_include"))
        if r.total_cost is None:
            if forced:
                kept.append(model_entry)
                print(f"{r.name:<32} {'—':>10} keep (force_include)")
            else:
                print(f"{r.name:<32} {'—':>10} drop ({r.note})")
            continue
        if forced and r.total_cost > max_cost_usd:
            kept.append(model_entry)
            print(
                f"{r.name:<32} {format_money(r.total_cost):>10} "
                f"keep (force_include, over ${max_cost_usd:.2f})"
            )
        elif r.total_cost <= max_cost_usd:
            kept.append(model_entry)
            print(f"{r.name:<32} {format_money(r.total_cost):>10} keep")
        else:
            print(
                f"{r.name:<32} {format_money(r.total_cost):>10} "
                f"drop (over ${max_cost_usd:.2f})"
            )
    print()
    return kept


def main():
    """CLI entry point: resolve the model/task roster and run every evaluation.

    Exits with status 1 when no model matches ``--models``, when
    ``--models-file`` is not a JSON array, or when any evaluation failed.
    """
    parser = argparse.ArgumentParser(description="Run lm-eval from TOML config")
    parser.add_argument("--config", default=DEFAULT_CONFIG, help="Path to TOML config")
    parser.add_argument("--models", nargs="*", help="Filter to specific model name(s)")
    parser.add_argument(
        "--tasks",
        nargs="*",
        help="Task/group names: filter to [[tasks]] in TOML, or any lm-eval task name",
    )
    parser.add_argument("--dry-run", action="store_true", help="Print params without running")
    parser.add_argument(
        "--gen-kwargs",
        type=str,
        default=None,
        help='Comma-separated gen params merged into API JSON, e.g. '
        '"max_gen_toks=8192,temperature=0.6,top_p=0.95,top_k=20,min_p=0.0,until=[\'<|endoftext|>\']"',
    )
    parser.add_argument(
        "--num-concurrent",
        type=int,
        default=None,
        metavar="N",
        help="Override parallel in-flight API requests (default: [defaults].num_concurrent in TOML)",
    )
    parser.add_argument(
        "--extra-body",
        type=str,
        default=None,
        help='JSON string merged into every API request body, e.g. '
        """'{"provider": {"order": ["alibaba"]}}'""",
    )
    parser.add_argument(
        "--from-convex",
        action="store_true",
        help="Discover the model roster from a live Convex deployment "
        "(CONVEX_URL env var) instead of reading [[models]] from TOML. "
        "TOML still supplies per-model overrides (base_url, api_key_env, "
        "pricing, endpoint_kind) by matching on model_id.",
    )
    parser.add_argument(
        "--max-cost",
        type=float,
        default=None,
        metavar="USD",
        help="Drop any discovered model whose estimated cost per full run "
        "exceeds this many USD. Models with no resolvable pricing (no "
        "OpenRouter entry and no TOML pricing block) are also dropped. "
        "Has no effect unless used with --from-convex (or in combination "
        "with the TOML roster, in which case it filters that too).",
    )
    parser.add_argument(
        "--list-models",
        type=str,
        default=None,
        metavar="PATH",
        help="Resolve the roster (apply --from-convex / --max-cost), write "
        "the full filtered model entries as a JSON array to PATH, print one "
        "model name per line to stdout, and exit. Lets a workflow run each "
        "model in its own subprocess via --models-file, so a session/asyncio "
        "crash on one model can't poison the others.",
    )
    parser.add_argument(
        "--models-file",
        type=str,
        default=None,
        metavar="PATH",
        help="Load the model roster from a JSON file (array of model entries) "
        "instead of querying Convex/reading TOML [[models]]. Used together "
        "with --models NAME for per-model subprocess invocations.",
    )
    args = parser.parse_args()

    if args.extra_body:
        # Reject bad input up front: the value is later merged into every
        # request payload with dict.update(), so it must be a JSON object.
        try:
            extra_body = json.loads(args.extra_body)
        except json.JSONDecodeError as e:
            parser.error(f"--extra-body is not valid JSON: {e}")
        if not isinstance(extra_body, dict):
            parser.error("--extra-body must be a JSON object")
        _extra_body.value = extra_body

    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")

    # One-time startup fallback: if OPENAI_API_KEY is empty but OPENROUTER_API_KEY
    # is set, populate OPENAI_API_KEY for models that don't override api_key_env.
    # This runs BEFORE the model loop so prev_api_key captures the correct value
    # and restoration between models works properly.
    if not os.environ.get("OPENAI_API_KEY", "").strip():
        or_key = os.environ.get("OPENROUTER_API_KEY", "").strip()
        if or_key:
            os.environ["OPENAI_API_KEY"] = or_key

    cfg = load_config(args.config)
    defaults = cfg["defaults"]
    hf_hub = cfg.get("hf_hub")
    # Shallow copy so later code never mutates the parsed config.
    hf_hub = dict(hf_hub) if isinstance(hf_hub, dict) else None

    if args.models_file:
        with open(args.models_file, encoding="utf-8") as f:
            models = json.load(f)
        if not isinstance(models, list):
            print(f"--models-file {args.models_file} must contain a JSON array", file=sys.stderr)
            sys.exit(1)
        print(f"Loaded {len(models)} model(s) from {args.models_file}.")
    elif args.from_convex:
        from model_discovery import fetch_active_models

        convex_url = os.environ.get("CONVEX_URL", "").strip()
        models = fetch_active_models(convex_url, cfg.get("models", []))
        print(f"Discovered {len(models)} active text model(s) from Convex.")
    else:
        models = cfg["models"]
    tasks = cfg["tasks"]

    if args.models:
        # Per-arg matching with precedence: slug → model_id → name.
        # Each `--models` argument is resolved independently and the
        # first matching tier wins for that argument; lower tiers are
        # NOT tried as fallbacks for the same arg. This is critical
        # because:
        #   * The dispatcher passes the canonical eval slug
        #     (slug(name__model_id__provider)). A slug uniquely
        #     identifies one entry. If we ALSO matched by `model_id`
        #     for the same arg, every other entry sharing that
        #     `model_id` (e.g. the same model exposed via a second
        #     provider) would also match, and the child would run
        #     all of them — duplicating work and double-uploading.
        #   * Older code matched any of {slug, model_id, name} across
        #     ALL args at once, which silently dropped name-only matches
        #     in mixed invocations like `--models gpt-5-nano my-id`
        #     (if `my-id` matched a model_id, the name-only `gpt-5-nano`
        #     was never tried).
        # Accepts mixed input — slugs from the dispatcher AND raw
        # name/model_id from devs typing into a shell — without the
        # fallthrough hazards.
        matched: list[dict] = []
        matched_ids: set[int] = set()
        unmatched_args: list[str] = []

        def _add(entries: list[dict]) -> None:
            """Append entries not yet selected (deduplicated by object identity)."""
            for entry in entries:
                key = id(entry)
                if key not in matched_ids:
                    matched_ids.add(key)
                    matched.append(entry)

        for arg in args.models:
            arg_lower = arg.lower()
            slug_hits = [m for m in models if _model_slug(m).lower() == arg_lower]
            if slug_hits:
                _add(slug_hits)
                continue
            id_hits = [m for m in models if m.get("model_id", "").lower() == arg_lower]
            if id_hits:
                _add(id_hits)
                continue
            name_hits = [m for m in models if m["name"].lower() == arg_lower]
            if name_hits:
                _add(name_hits)
                continue
            unmatched_args.append(arg)

        if not matched:
            known = sorted(_model_slug(m) for m in models)
            print(f"No models matched: {args.models}", file=sys.stderr)
            print(
                f"Available identifiers (slug(name__model_id__provider)): {known}",
                file=sys.stderr,
            )
            sys.exit(1)
        if unmatched_args:
            # Some args matched, some didn't — surface the misses but
            # continue with the partial set so a single typo in a long
            # invocation doesn't waste the whole run.
            print(
                f"WARNING: --models had no match for: {unmatched_args}",
                file=sys.stderr,
            )
        models = matched

    if args.tasks:
        filter_set = {t.lower() for t in args.tasks}
        matched = [t for t in tasks if t["name"].lower() in filter_set]
        if matched:
            tasks = matched
        else:
            # Nothing in [[tasks]] matched: treat the args as raw lm-eval task names.
            tasks = [{"name": n} for n in args.tasks]

    if args.max_cost is not None:
        models = _filter_models_by_cost(models, tasks, args.max_cost)

    # Write the roster file FIRST (even if empty), so the workflow's
    # `jq -r '.[].name' output/roster.json` step always finds the file.
    # An empty array → zero names → bash loop iterates zero times → workflow
    # logs "Per-model failures: 0 of 0" and finishes cleanly. Without this,
    # an over-aggressive --max-cost would race the file's existence and
    # produce a misleading "workflow failed" Slack alert.
    if args.list_models:
        os.makedirs(os.path.dirname(args.list_models) or ".", exist_ok=True)
        with open(args.list_models, "w", encoding="utf-8") as f:
            json.dump(models, f, indent=2)
        for m in models:
            print(m["name"])
        print(
            f"\nWrote {len(models)} model(s) to {args.list_models}.",
            file=sys.stderr,
        )
        return

    if args.max_cost is not None and not models:
        print(
            f"No models passed the --max-cost ${args.max_cost:.2f} threshold; nothing to run.",
            file=sys.stderr,
        )
        sys.exit(0)

    total = len(models) * len(tasks)
    print(f"Running {len(models)} model(s) x {len(tasks)} task(s) = {total} eval(s)\n")

    failures = 0
    for i, model in enumerate(models, 1):
        # --- Multi-provider support ---
        # Each [[models]] entry can optionally override:
        #   base_url       — custom API endpoint (falls back to [defaults].base_url)
        #   api_key_env    — env var name holding the API key (default: OPENAI_API_KEY)
        #   custom_headers — dict of extra HTTP headers
        model_base_url = model.get("base_url", defaults["base_url"])

        # Save original API key so we can restore it after this model
        prev_api_key = os.environ.get("OPENAI_API_KEY", "")
        api_key_env = model.get("api_key_env", "OPENAI_API_KEY")
        if api_key_env != "OPENAI_API_KEY":
            env_val = os.environ.get(api_key_env, "").strip()
            if env_val:
                os.environ["OPENAI_API_KEY"] = env_val
            else:
                # Clear the stale key rather than silently inheriting the
                # previous model's credentials. This forces a clean auth
                # failure on the misconfigured model instead of making it
                # use the wrong key.
                logger.warning(
                    "Model %s requests api_key_env=%s but that var is empty/unset; "
                    "clearing OPENAI_API_KEY for this model.",
                    model["name"],
                    api_key_env,
                )
                os.environ["OPENAI_API_KEY"] = ""

        model_custom_headers = model.get("custom_headers", {})
        if model_custom_headers:
            prev_extra = _extra_body.value.copy()
            _extra_body.value = {**_extra_body.value, "extra_headers": model_custom_headers}

        for j, task in enumerate(tasks, 1):
            run_idx = (i - 1) * len(tasks) + j
            output_path = os.path.join(
                defaults.get("output_path", "output/results"),
                model["name"],
                task["name"],
            )

            gen_kwargs = args.gen_kwargs or model.get("gen_kwargs")

            num_concurrent = (
                args.num_concurrent
                if args.num_concurrent is not None
                else defaults.get("num_concurrent", 5)
            )

            # Resilience knobs read from [defaults]; see eval_config.toml.
            # Stock lm-eval defaults (3 / 300) are kept as fallbacks so the
            # behavior is unchanged when the TOML doesn't override them.
            max_retries = int(defaults.get("max_retries", 3))
            timeout = int(defaults.get("timeout", 300))

            eval_kwargs = dict(
                include_path=SCRIPT_DIR,
                task_name=task["name"],
                model_id=model["model_id"],
                model_display_name=model["name"],
                # Pre-compute the canonical slug once per (model, task) so the
                # HF upload folder is guaranteed to match the dispatcher's
                # spawn label and the child's `--models` arg.
                model_slug=_model_slug(model),
                output_path=output_path,
                base_url=model_base_url,
                num_concurrent=num_concurrent,
                num_fewshot=defaults.get("num_fewshot", 0),
                apply_chat_template=defaults.get("apply_chat_template", True),
                log_samples=defaults.get("log_samples", True),
                gen_kwargs=gen_kwargs,
                hf_hub=hf_hub,
                endpoint_kind=model.get("endpoint_kind", "chat_completions"),
                max_retries=max_retries,
                timeout=timeout,
            )

            # Never echo the Hub token into run logs: print a redacted copy,
            # but pass the real kwargs to run_single_eval.
            printable_kwargs = dict(eval_kwargs)
            if hf_hub and hf_hub.get("token"):
                printable_kwargs["hf_hub"] = {**hf_hub, "token": "***"}

            header = f"[{run_idx}/{total}] {model['name']} x {task['name']}"
            print(f"{'-' * 60}")
            print(f" {header}")
            print(f" -> params: {printable_kwargs}")
            print(f"{'-' * 60}")

            if args.dry_run:
                continue

            try:
                results = run_single_eval(**eval_kwargs)
                if results:
                    table_str = make_table(results)
                    # Replace Unicode chars that fail on Windows cp1252
                    table_str = table_str.encode("ascii", "replace").decode("ascii")
                    print(table_str)
                print(f"\n[DONE] {header}\n")
            except Exception:
                # One failed (model, task) pair must not stop the remaining runs.
                traceback.print_exc()
                print(f"\n[FAILED] {header}\n", file=sys.stderr)
                failures += 1

        # Restore state after processing all tasks for this model
        if model_custom_headers:
            _extra_body.value = prev_extra
        if api_key_env != "OPENAI_API_KEY":
            os.environ["OPENAI_API_KEY"] = prev_api_key

    if failures:
        print(f"\n{failures}/{total} evaluation(s) failed.", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()