Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """Run lm-eval across multiple models and tasks defined in eval_config.toml. | |
| Uses the lm_eval Python API directly (no subprocess). | |
| Usage: | |
| python run_eval.py # all models × all tasks | |
| python run_eval.py --models Qwen3.5-27b # one model, all tasks | |
| python run_eval.py --tasks wikipedia # all models, one task | |
| python run_eval.py --models Qwen3.5-27b --tasks wikipedia | |
| python run_eval.py --dry-run # print params only | |
| Generation parameters (max_gen_toks, temperature, top_p, top_k, min_p, …) are NOT | |
| baked into the task YAMLs so they can vary per model. Extra keys are merged into | |
| the chat-completions JSON body (OpenRouter-style); you do not need a separate | |
| ``extra_body`` wrapper. Set them: | |
| 1. Per-model in eval_config.toml → gen_kwargs = "max_gen_toks=4096,temperature=0.6" | |
| 2. Via CLI (overrides everything) → --gen-kwargs "max_gen_toks=8192,top_k=20,min_p=0.0" | |
| 3. Parallel API calls → [defaults] num_concurrent or --num-concurrent 50 | |
| Reasoning: monkey-patches LocalChatCompletion.parse_generations so API fields | |
| ``reasoning`` / ``reasoning_content`` are preserved (wire format matches | |
| ``f1_utils`` / ``judge_utils`` ``think`` tags). After each run, samples are split: | |
| - resps / filtered_resps: final model text only | |
| - reasoning_content: parallel nested shape (or "") | |
| Optional [hf_hub] in eval_config.toml: | |
| - **lm_eval_hub_upload** (default true): lm-eval ``EvaluationTracker`` (results + | |
| samples to details/results repos). | |
| - **custom_samples_repo**: single dataset repo; uploads **samples only** as | |
| ``{slug(name__model_id)}/{lang}/{task}_{YYYY-MM-DD}.jsonl`` (set | |
| **lm_eval_hub_upload** false to avoid duplicate Hub uploads). Folder | |
| is the slug of the composite ``name__model_id`` so two providers that | |
| expose the same ``model_id`` get distinct folders. ``name`` here is | |
| the raw ``models.name`` from Convex — alias is intentionally never | |
| used in the routing identifier. | |
| Without Hub, per-task ``samples_<task>_<timestamp>.jsonl`` is still written next | |
| to ``samples.json`` when log_samples is true. JSONL rows omit ``*_hash`` fields, | |
| add a plain ``prompt`` string, and keep ``target`` at top level (no nested | |
| ``gen_args_*`` mirror of lm-eval's Hub format). | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import copy | |
| import io | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import sys | |
| import traceback | |
| from datetime import datetime | |
| import lm_eval | |
| import f1_utils # noqa: F401 — registers regex_last for task YAMLs | |
| import _extra_body # shared mutable dict; see module docstring for the __main__ hazard | |
| import responses_model # noqa: F401 — registers `local-responses-completions` | |
| from register_sas_encoder_metric import ensure_sas_encoder_metric | |
| from lm_eval.utils import handle_non_serializable, make_table, sanitize_list | |
| ensure_sas_encoder_metric() | |
| logger = logging.getLogger(__name__) | |
| # Must match multilingual task helpers (f1_utils, judge_utils, spanish/qa/utils). | |
| _THINK_OPEN = "<think>" | |
| _THINK_CLOSE = "</think>" | |
def _patch_chat_completion():
    """Monkey-patch LocalChatCompletion for reasoning pass-through and extra_body.

    Two attributes of ``lm_eval.models.openai_completions.LocalChatCompletion``
    are replaced:

    * ``_create_payload`` wraps the original implementation and merges
      ``_extra_body.value`` (when non-empty) into the payload it returns.
    * ``parse_generations`` returns one string per choice. When a message
      carries ``reasoning`` or ``reasoning_content``, that text is prepended to
      the content wrapped in ``_THINK_OPEN`` / ``_THINK_CLOSE`` so it can be
      split back out after the run.
    """
    from lm_eval.models import openai_completions as oc

    _orig_create_payload = oc.LocalChatCompletion._create_payload

    def _create_payload_with_extra(self, *args, **kwargs):
        payload = _orig_create_payload(self, *args, **kwargs)
        # Read via the module attribute so both the __main__ run_eval and the
        # `import run_eval` instance see the same dict (Python loads them as
        # distinct module objects with their own globals).
        if _extra_body.value:
            payload.update(_extra_body.value)
        return payload

    def parse_generations(outputs, **kwargs):
        """Return the generated text of every choice in ``outputs``, in index order."""
        res = []
        if not isinstance(outputs, list):
            outputs = [outputs]
        for out in outputs:
            try:
                tmp = [None] * len(out["choices"])
                for choice in out["choices"]:
                    msg = choice.get("message") or {}
                    content = msg.get("content")
                    if content is None:
                        content = ""
                    # The `or ""` tail already maps a missing/None field to "".
                    reasoning = msg.get("reasoning") or msg.get("reasoning_content") or ""
                    if reasoning:
                        content = f"{_THINK_OPEN}{reasoning}{_THINK_CLOSE}{content}"
                    tmp[choice["index"]] = content
            except Exception as exc:
                # Best-effort: a malformed response yields a single empty
                # generation, but the failure is logged instead of hidden.
                logger.warning("Could not parse generations: %r", exc)
                tmp = [""]
            res.extend(tmp)
        return res

    oc.LocalChatCompletion._create_payload = _create_payload_with_extra
    # Install as a staticmethod: a plain function stored on the class would be
    # bound to the instance on attribute access, so a call such as
    # `self.parse_generations(outputs=...)` would pass the instance as `outputs`.
    oc.LocalChatCompletion.parse_generations = staticmethod(parse_generations)


_patch_chat_completion()
| try: | |
| import tomllib # Python 3.11+ stdlib | |
| except ModuleNotFoundError: | |
| try: | |
| import tomli as tomllib # pip install tomli (for Python < 3.11) | |
| except ModuleNotFoundError: | |
| raise SystemExit( | |
| "No TOML parser available. Either:\n" | |
| " • Use Python 3.11+ (has tomllib), e.g. python3.11 run_eval.py …\n" | |
| " • Or install tomli in this environment: pip install tomli\n" | |
| f" (current interpreter: {sys.executable})" | |
| ) from None | |
# Directory that contains this script; the default config path is derived from it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DEFAULT_CONFIG = os.path.join(SCRIPT_DIR, "eval_config.toml")

# Load .env.local from project root (one level up from evals/)
_ENV_LOCAL = os.path.join(SCRIPT_DIR, "..", ".env.local")
if os.path.isfile(_ENV_LOCAL):
    # Decode explicitly as UTF-8 so parsing does not depend on the locale's
    # default encoding.
    with open(_ENV_LOCAL, encoding="utf-8") as _f:
        for _line in _f:
            _line = _line.strip()
            # Skip blanks, comments, and lines without a KEY=VALUE shape.
            if _line and not _line.startswith("#") and "=" in _line:
                _k, _, _v = _line.partition("=")
                _k, _v = _k.strip(), _v.strip()
                # Variables already present in the environment take precedence.
                if _k and _v and _k not in os.environ:
                    os.environ[_k] = _v
| def load_config(path: str) -> dict: | |
| with open(path, "rb") as f: | |
| return tomllib.load(f) | |
def _ensure_lm_eval_api_key():
    """Intentionally do nothing; kept so existing call sites keep working.

    local-chat-completions uses OPENAI_API_KEY for the Bearer header only.
    The OpenRouter→OPENAI_API_KEY fallback is performed exactly once in
    main(), before the model loop, so per-model key swaps (including
    intentional clears when a model's api_key_env is unset) are respected
    for subsequent models.
    """
    return None
def _split_reasoning_from_text(text: str) -> tuple[str, str]:
    """Undo the think-tag wrapping and return ``(reasoning, content)``.

    Anything that is not a string starting with ``_THINK_OPEN``, or that has no
    ``_THINK_CLOSE`` after the opening tag, is returned unchanged as
    ``("", text)``.
    """
    if isinstance(text, str) and text.startswith(_THINK_OPEN):
        after_open = text[len(_THINK_OPEN):]
        reasoning, closing_tag, content = after_open.partition(_THINK_CLOSE)
        # partition() leaves the separator empty when the closing tag is absent.
        if closing_tag:
            return reasoning, content
    return "", text
def _split_resps_structure(resps):
    """Split a (possibly nested) response structure into content and reasoning.

    Returns ``(content_resps, reasoning_resps)``. Lists are processed
    recursively so both results mirror the nesting of the input; strings are
    split with ``_split_reasoning_from_text``; any other value is returned
    as-is with ``""`` as its reasoning.
    """
    if isinstance(resps, list):
        pairs = [_split_resps_structure(item) for item in resps]
        return [content for content, _ in pairs], [reasoning for _, reasoning in pairs]
    if isinstance(resps, str):
        reasoning, content = _split_reasoning_from_text(resps)
        return content, reasoning
    return resps, ""
def _add_reasoning_content_to_samples(samples: dict) -> None:
    """Strip think wrappers from logged samples in place.

    For every row that has ``resps``: ``resps`` is replaced by the content-only
    structure, ``reasoning_content`` receives the parallel reasoning structure,
    and ``filtered_resps`` (when present) is replaced by its content-only form.
    Rows without ``resps`` are left untouched.
    """
    for rows in samples.values():
        for row in rows:
            if "resps" in row:
                row["resps"], row["reasoning_content"] = _split_resps_structure(row["resps"])
                if "filtered_resps" in row:
                    row["filtered_resps"] = _split_resps_structure(row["filtered_resps"])[0]
def _is_lm_eval_generation_kwargs(d: dict) -> bool:
    """Return True when ``d`` contains at least one known generation-config key.

    Used to tell an lm_eval generation-kwargs dict apart from chat messages or
    multimodal blobs.
    """
    marker_keys = frozenset(
        (
            "until",
            "max_gen_toks",
            "do_sample",
            "temperature",
            "top_p",
            "top_k",
            "min_p",
            "repeats",
        )
    )
    return not marker_keys.isdisjoint(d)
def _text_from_prompt_ctx(part) -> list[str]:
    """Collect the text fragments contained in one logged prompt argument.

    * ``str``: returned as a single fragment, unless it looks like a
      JSON-encoded chat message list (starts with ``[`` and mentions both
      ``"role"`` and ``"content"``) and decodes to a list, in which case the
      decoded list is flattened instead.
    * ``list`` / ``tuple``: every element is flattened, in order.
    * ``dict``: ``content`` is flattened when the key exists; otherwise a
      string ``text`` value is used.
    * anything else (including ``None``): no fragments.
    """
    if isinstance(part, str):
        stripped = part.strip()
        looks_like_chat_json = (
            stripped.startswith("[") and '"role"' in stripped and '"content"' in stripped
        )
        if looks_like_chat_json:
            try:
                decoded = json.loads(stripped)
            except json.JSONDecodeError:
                decoded = None
            if isinstance(decoded, list):
                return _text_from_prompt_ctx(decoded)
        # Plain text, or something that only resembled a chat list.
        return [part]
    if isinstance(part, (list, tuple)):
        return [text for element in part for text in _text_from_prompt_ctx(element)]
    if isinstance(part, dict):
        if "content" in part:
            return _text_from_prompt_ctx(part["content"])
        text_value = part.get("text")
        return [text_value] if isinstance(text_value, str) else []
    return []
def _extract_prompt_and_gen_kwargs(arguments) -> tuple[str, dict | None]:
    """Derive a readable prompt and the generation kwargs from logged arguments.

    ``arguments`` is iterated as a sequence of per-request argument
    lists/tuples; entries of any other type are skipped. Within each request,
    a dict recognised by ``_is_lm_eval_generation_kwargs`` is treated as
    generation kwargs (only the first one found is kept) and every other item
    contributes text via ``_text_from_prompt_ctx``.

    Returns ``(prompt, gen_kwargs)`` where ``prompt`` is the non-empty text
    fragments joined with newlines and ``gen_kwargs`` is ``None`` when no
    generation dict was seen.
    """
    if not arguments:
        return "", None

    text_parts: list[str] = []
    found_kwargs: dict | None = None
    request_items = (
        item
        for request in arguments
        if isinstance(request, (list, tuple))
        for item in request
    )
    for item in request_items:
        if isinstance(item, dict) and _is_lm_eval_generation_kwargs(item):
            if found_kwargs is None:
                found_kwargs = item
        else:
            text_parts.extend(_text_from_prompt_ctx(item))

    return "\n".join(filter(None, text_parts)), found_kwargs
def _sample_row_for_jsonl(sample: dict) -> dict:
    """Readable JSONL row for local files and custom Hub upload (not lm-eval tracker).

    Works on a deep copy, so ``sample`` itself is never modified. The copy:

    * loses the ``doc_hash`` / ``prompt_hash`` / ``target_hash`` fields,
    * gains a plain ``prompt`` string (and ``gen_kwargs`` when present), both
      derived from ``arguments``, which is then removed,
    * has ``target`` coerced to ``str`` (``""`` when missing),
    * has ``resps``, ``filtered_resps`` and ``reasoning_content`` passed
      through ``sanitize_list`` when those keys exist.
    """
    out = copy.deepcopy(sample)
    for key in ("doc_hash", "prompt_hash", "target_hash"):
        out.pop(key, None)
    prompt, gen_kwargs = _extract_prompt_and_gen_kwargs(out.get("arguments"))
    out["prompt"] = prompt
    out["target"] = str(out.get("target", ""))
    out.pop("arguments", None)
    if gen_kwargs is not None:
        out["gen_kwargs"] = gen_kwargs
    # Only sanitize keys that are present: rows without `resps` are tolerated by
    # _add_reasoning_content_to_samples, so they must not raise KeyError here.
    for key in ("resps", "filtered_resps", "reasoning_content"):
        if key in out:
            out[key] = sanitize_list(out[key])
    return out
def _rows_to_jsonl_bytes(rows: list) -> bytes:
    """Serialize ``rows`` as UTF-8 JSONL, one ``_sample_row_for_jsonl`` object per line.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``) and
    values the json module cannot encode go through ``handle_non_serializable``.
    """
    serialized = (
        json.dumps(
            _sample_row_for_jsonl(sample),
            default=handle_non_serializable,
            ensure_ascii=False,
        )
        for sample in rows
    )
    return "".join(f"{line}\n" for line in serialized).encode("utf-8")
def _hf_path_segment(name: str) -> str:
    """Turn ``name`` into a safe Hub path segment (no slashes or odd chars).

    Every run of characters outside ``[a-zA-Z0-9_-]`` collapses to a single
    ``_``; leading/trailing ``_`` and ``-`` are trimmed; an empty result
    becomes ``"unknown"``.

    `.` is excluded on purpose: HF Jobs' tag validator rejects dots (model_ids
    like "minimax/minimax-m2.5" failed at the POST /api/jobs validation step
    with "tags must contain only alphanumeric characters, '-', '_', or '='").
    The label validator accepts dots, but the slug is reused as both, so it
    conforms to the strictest downstream rule.
    """
    trimmed = name.strip() if name else ""
    collapsed = re.sub(r"[^a-zA-Z0-9_-]+", "_", trimmed)
    segment = collapsed.strip("_-")
    return segment if segment else "unknown"
def _model_slug(model: dict) -> str:
    """Build the canonical eval-pipeline identifier for a model entry.

    The result is slug(`name__model_id__provider_name`) when the entry has a
    non-blank `provider_name`, and slug(`name__model_id`) otherwise (TOML-only
    local runs).

    The same string serves as:
      * the `--models` filter argument (`run_eval.py --models <slug>`),
      * the HF Job `model` label (slug-safe by construction),
      * the HF dataset upload folder.

    Including the provider removes the last ambiguity: two Convex `models`
    rows may share BOTH `name` and `model_id` yet route to different providers
    (e.g. one OpenRouter row and one direct row, both labeled "GPT-5 Nano"
    with id `openai/gpt-5-nano`). They are distinct entities and need distinct
    slugs to be benchmarked separately; without the provider component the
    dispatcher's duplicate-slug guard refuses to spawn for this legitimate
    case.

    `name` is the RAW `models.name` from Convex (alias-stripped by
    `models:listForEvals`); the alias must never participate in routing.
    `provider_name` comes from `providers.name` on the Convex side and is
    absent for TOML-only local runs, in which case the slug has two parts and
    folders keep the pre-refactor shape.
    """
    name = model.get("name", "")
    model_id = model.get("model_id", "")
    provider = (model.get("provider_name") or "").strip()
    components = [name, model_id, provider] if provider else [name, model_id]
    return _hf_path_segment("__".join(components))
def _infer_lang_folder(category_task: str, override: str) -> str:
    """Pick the language folder for a task, e.g. swahili_sib200 → swahili.

    A non-blank ``override`` (from config) wins. Otherwise the folder is the
    part of ``category_task`` before its first underscore, or the whole name
    when there is no underscore. The result is always slugged.
    """
    forced = (override or "").strip()
    if forced:
        return _hf_path_segment(forced)
    prefix, _, _ = category_task.partition("_")
    return _hf_path_segment(prefix)
def _write_local_samples_jsonl(output_path: str, task_name: str, rows: list) -> None:
    """Write ``rows`` to ``samples_<task_name>_<timestamp>.jsonl`` in ``output_path``.

    The directory is created when missing. The timestamp is the current local
    ISO time with ``:`` replaced by ``-``. Each line is one JSON object with a
    readable prompt/target and no hash fields.
    """
    os.makedirs(output_path, exist_ok=True)
    stamp = datetime.now().isoformat().replace(":", "-")
    target = os.path.join(output_path, f"samples_{task_name}_{stamp}.jsonl")
    with open(target, "wb") as handle:
        handle.write(_rows_to_jsonl_bytes(rows))
def _upload_custom_samples_repo(
    *,
    repo_id: str,
    token: str,
    model_folder: str,
    lang_folder: str,
    category: str,
    date_yyyy_mm_dd: str,
    rows: list,
    private: bool,
) -> None:
    """Upload one JSONL file of samples to a Hugging Face dataset repo.

    The file lands at ``<model>/<lang>/<category>_<date>.jsonl``, where the
    model, language and category parts are slugged with ``_hf_path_segment``.
    The dataset repo is created first if it does not exist yet.

    Raises:
        RuntimeError: if ``huggingface_hub`` is not installed.
    """
    try:
        from huggingface_hub import HfApi
    except ModuleNotFoundError as e:
        raise RuntimeError(
            "huggingface_hub is required for custom_samples_repo. "
            "Install with: pip install huggingface_hub"
        ) from e

    filename = f"{_hf_path_segment(category)}_{date_yyyy_mm_dd}.jsonl"
    path_in_repo = "/".join(
        (_hf_path_segment(model_folder), _hf_path_segment(lang_folder), filename)
    )

    hub = HfApi(token=token)
    hub.create_repo(repo_id, repo_type="dataset", private=private, exist_ok=True)
    payload = io.BytesIO(_rows_to_jsonl_bytes(rows))
    hub.upload_file(
        path_or_fileobj=payload,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        commit_message=f"samples {filename}",
    )
    logger.info(
        "Uploaded samples to Hugging Face dataset %s (path: %s)",
        repo_id,
        path_in_repo,
    )
| def _build_evaluation_tracker(output_path: str, hf_hub: dict | None): | |
| """Return EvaluationTracker if Hub push is enabled, else None.""" | |
| if not hf_hub: | |
| return None | |
| if hf_hub.get("lm_eval_hub_upload", True) is False: | |
| return None | |
| push_s = bool(hf_hub.get("push_samples_to_hub", False)) | |
| push_r = bool(hf_hub.get("push_results_to_hub", False)) | |
| if not push_s and not push_r: | |
| return None | |
| token = (hf_hub.get("token") or os.environ.get("HF_TOKEN") or "").strip() | |
| from lm_eval.loggers.evaluation_tracker import EvaluationTracker | |
| try: | |
| return EvaluationTracker( | |
| output_path=output_path, | |
| hub_results_org=str(hf_hub.get("hub_results_org", "") or ""), | |
| details_repo_name=str(hf_hub.get("details_repo_name", "") or ""), | |
| results_repo_name=str(hf_hub.get("results_repo_name", "") or ""), | |
| push_results_to_hub=push_r, | |
| push_samples_to_hub=push_s, | |
| public_repo=bool(hf_hub.get("public_repo", False)), | |
| token=token, | |
| gated=bool(hf_hub.get("gated", False)), | |
| ) | |
| except ValueError as e: | |
| logger.warning("HF Hub upload disabled: %s", e) | |
| return None | |
def _save_local_eval_outputs(
    results: dict,
    output_path: str,
    *,
    log_samples: bool,
    write_jsonl: bool,
) -> None:
    """Write results.json, and samples.json / per-task JSONL when samples were logged."""
    os.makedirs(output_path, exist_ok=True)
    aggregated = {k: v for k, v in results.items() if k != "samples"}
    with open(os.path.join(output_path, "results.json"), "w", encoding="utf-8") as f:
        json.dump(aggregated, f, indent=2, default=str)
    if not (log_samples and "samples" in results):
        return
    with open(os.path.join(output_path, "samples.json"), "w", encoding="utf-8") as f:
        json.dump(results["samples"], f, indent=2, default=str)
    if write_jsonl:
        for tname, rows in results["samples"].items():
            _write_local_samples_jsonl(output_path, tname, rows)


def _save_via_evaluation_tracker(tracker, results: dict, *, log_samples: bool) -> None:
    """Save results (and samples) through lm-eval's EvaluationTracker, best-effort."""
    samples = results.get("samples") if log_samples else None
    results_for_hub = {k: v for k, v in results.items() if k != "samples"}
    try:
        tracker.save_results_aggregated(
            results=results_for_hub,
            samples=samples,
        )
        if log_samples and samples:
            for tname in samples:
                tracker.save_results_samples(task_name=tname, samples=samples[tname])
        if tracker.push_results_to_hub or tracker.push_samples_to_hub:
            try:
                tracker.recreate_metadata_card()
            except Exception as e:
                logger.warning(
                    "Could not recreate HF metadata card (repo/auth?). Local saves OK."
                )
                logger.info(repr(e))
    except Exception as e:
        # A tracker/Hub failure must not discard the evaluation results.
        logger.warning("Hugging Face upload or tracker save failed.")
        logger.info(repr(e))


def _upload_custom_hub_outputs(
    *,
    results: dict,
    hf_hub: dict,
    task_name: str,
    model_id: str,
    model_display_name: str,
    model_slug: str,
) -> None:
    """Upload per-task samples and an aggregated results JSON to the custom dataset repo.

    Layout: ``{model_slug}/{lang}/{task}_{YYYY-MM-DD}.jsonl`` for samples and
    ``{model_slug}/results_{task_name}_{YYYY-MM-DD}.json`` for results. Every
    upload is best-effort: failures are logged and do not propagate.
    """
    # HF_DATASET_REPO, when set, takes precedence over the configured repo.
    repo_id = (os.environ.get("HF_DATASET_REPO") or str(hf_hub["custom_samples_repo"])).strip()
    token = (hf_hub.get("token") or os.environ.get("HF_TOKEN") or "").strip()
    if not token:
        logger.warning("custom_samples_repo set but no HF_TOKEN; skipping custom upload.")
        return

    lang_override = str(hf_hub.get("samples_lang", "") or "")
    private = bool(hf_hub.get("custom_samples_repo_private", True))
    date_day = datetime.now().strftime("%Y-%m-%d")
    for category, rows in results["samples"].items():
        lang = _infer_lang_folder(category, lang_override)
        try:
            _upload_custom_samples_repo(
                repo_id=repo_id,
                token=token,
                # Folder = `model_slug` = slug(name__model_id__provider).
                # Pre-computed by the caller so the same string is used
                # for the HF Job label, the `--models` filter, and the
                # upload folder — single source of truth. The provider
                # component disambiguates two Convex rows that share
                # BOTH name and model_id but route via different
                # providers (OpenRouter vs direct, etc).
                model_folder=model_slug,
                lang_folder=lang,
                category=category,
                date_yyyy_mm_dd=date_day,
                rows=rows,
                private=private,
            )
        except Exception as e:
            logger.warning("Custom HF samples upload failed for %s: %s", category, e)
            logger.info(repr(e))

    # Upload aggregated results.json alongside the samples
    try:
        from huggingface_hub import HfApi

        results_data = {
            "model_id": model_id,
            "model_name": model_display_name,
            "results": results.get("results", {}),
            "groups": results.get("groups", {}),
            "group_subtasks": results.get("group_subtasks", {}),
        }
        results_bytes = json.dumps(results_data, indent=2, default=str).encode("utf-8")
        # Same canonical slug as the samples upload above —
        # slug(name__model_id__provider). Single source of truth for
        # all eval-pipeline routing identifiers (HF label, --models
        # filter, dataset folder).
        results_path = f"{model_slug}/results_{task_name}_{date_day}.json"
        api = HfApi(token=token)
        api.upload_file(
            path_or_fileobj=io.BytesIO(results_bytes),
            path_in_repo=results_path,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"results {task_name} {date_day}",
        )
        logger.info("Uploaded results to %s (path: %s)", repo_id, results_path)
    except Exception as e:
        logger.warning("Custom HF results upload failed: %s", e)
        logger.info(repr(e))


def run_single_eval(
    *,
    include_path: str,
    task_name: str,
    model_id: str,
    model_display_name: str,
    model_slug: str,
    output_path: str,
    base_url: str,
    num_concurrent: int,
    num_fewshot: int,
    apply_chat_template: bool,
    log_samples: bool,
    gen_kwargs: str | None = None,
    hf_hub: dict | None = None,
    endpoint_kind: str = "chat_completions",
    max_retries: int = 3,
    timeout: int = 300,
) -> dict | None:
    """Run a single lm_eval evaluation via the Python API and return results.

    Steps, in order:
      1. Call ``lm_eval.simple_evaluate`` for ``task_name`` against the
         ``local-responses-completions`` model when ``endpoint_kind`` is
         ``"responses"``, otherwise ``local-chat-completions``.
      2. Split think-tag reasoning out of the logged samples.
      3. Write ``results.json`` / ``samples.json`` under ``output_path``; when
         no EvaluationTracker is active, also per-task JSONL files.
      4. Save through the EvaluationTracker when one was built from ``hf_hub``.
      5. Upload to ``hf_hub["custom_samples_repo"]`` when configured and
         samples were logged.

    ``max_retries`` and ``timeout`` are forwarded into lm-eval's TemplateAPI
    via ``model_args``. Defaults match lm-eval's own stock values; the TOML
    ``[defaults]`` block typically overrides them for Functionary endpoints
    (see eval_config.toml for the rationale).

    Returns whatever ``simple_evaluate`` returned (possibly ``None``); when
    samples were logged they have been modified in place by step 2.
    """
    _ensure_lm_eval_api_key()
    model_args = (
        f"model={model_id},base_url={base_url},num_concurrent={num_concurrent},"
        f"max_retries={max_retries},timeout={timeout}"
    )
    tracker = _build_evaluation_tracker(output_path, hf_hub)
    lm_eval_model = (
        "local-responses-completions" if endpoint_kind == "responses" else "local-chat-completions"
    )
    results = lm_eval.simple_evaluate(
        model=lm_eval_model,
        model_args=model_args,
        tasks=[task_name],
        num_fewshot=num_fewshot,
        log_samples=log_samples,
        task_manager=lm_eval.tasks.TaskManager(include_path=[include_path]),
        apply_chat_template=apply_chat_template if apply_chat_template else None,
        gen_kwargs=gen_kwargs,
        evaluation_tracker=tracker,
    )

    if results and log_samples and results.get("samples"):
        _add_reasoning_content_to_samples(results["samples"])

    if results and output_path:
        _save_local_eval_outputs(
            results,
            output_path,
            log_samples=log_samples,
            # With a tracker active, it is the one that saves per-task samples.
            write_jsonl=tracker is None,
        )

    if tracker and results:
        _save_via_evaluation_tracker(tracker, results, log_samples=log_samples)

    # Custom single-repo samples layout: {model}/{lang}/{task}_{YYYY-MM-DD}.jsonl
    if (
        hf_hub
        and results
        and log_samples
        and results.get("samples")
        and str(hf_hub.get("custom_samples_repo", "") or "").strip()
    ):
        _upload_custom_hub_outputs(
            results=results,
            hf_hub=hf_hub,
            task_name=task_name,
            model_id=model_id,
            model_display_name=model_display_name,
            model_slug=model_slug,
        )
    return results
def _filter_models_by_cost(
    models: list[dict],
    tasks: list[dict],
    max_cost_usd: float,
) -> list[dict]:
    """Return the models whose estimated cost for ``tasks`` fits the budget.

    Token totals are measured once for all tasks and priced per model. A model
    is kept when its estimated total is at most ``max_cost_usd`` or when its
    entry sets ``force_include``; models whose cost cannot be resolved are
    dropped unless forced. A kept/dropped table is printed so the run log
    explains each decision, cheapest first, unresolved costs last.

    Raises:
        RuntimeError: when the OpenRouter pricing fetch returns nothing and at
            least one model has no complete TOML ``pricing`` block.
    """
    from cost_core import (
        cost_per_model,
        fetch_openrouter_pricing,
        format_money,
        measure_tasks,
    )

    def _has_toml_pricing(entry: dict) -> bool:
        pricing = entry.get("pricing")
        return (
            isinstance(pricing, dict)
            and "input_per_1m" in pricing
            and "output_per_1m" in pricing
        )

    print(f"Estimating cost across {len(tasks)} task group(s) for {len(models)} model(s)…")
    task_names = [task["name"] for task in tasks]
    task_stats = measure_tasks(task_names, include_path=SCRIPT_DIR)
    total_input = sum(stat.input_tokens for stat in task_stats)
    total_output = sum(stat.output_tokens for stat in task_stats)

    pricing_data = fetch_openrouter_pricing()
    # Distinguish "OpenRouter unreachable / returned nothing" from "specific
    # model has no entry". The former silently dropped every OpenRouter model
    # from the roster in earlier versions and produced a green workflow with
    # zero benchmarks run; raising here surfaces the outage via the workflow's
    # `if: failure()` Slack handler instead of degrading silently.
    if not pricing_data:
        unpriced = [m for m in models if not _has_toml_pricing(m)]
        if unpriced:
            names = ", ".join(m["name"] for m in unpriced[:5])
            overflow = len(unpriced) - 5
            more = f" (+{overflow} more)" if overflow > 0 else ""
            raise RuntimeError(
                f"OpenRouter pricing fetch returned no entries; cannot estimate "
                f"cost for {len(unpriced)} model(s) lacking a TOML pricing "
                f"block: {names}{more}. Failing the discovery step so this "
                f"surfaces in alerting rather than silently dropping benchmarks."
            )

    rows = cost_per_model(
        models,
        total_input_tokens=total_input,
        total_output_tokens=total_output,
        openrouter_pricing=pricing_data,
    )

    # Index models by (name, model_id, provider_name) so two Convex rows that
    # share name AND model_id but route through different providers stay
    # separate entries. `provider_name` defaults to "" for TOML-only rows,
    # which cannot contain same-(name, model_id) duplicates by construction.
    by_key: dict = {}
    for entry in models:
        by_key[(entry["name"], entry["model_id"], entry.get("provider_name", ""))] = entry

    kept: list[dict] = []
    print()
    print(f"{'Model':<32} {'Total $':>10} Decision")
    print("-" * 70)
    for row in sorted(rows, key=lambda r: (r.total_cost is None, r.total_cost or 0)):
        entry = by_key[(row.name, row.model_id, row.provider_name)]
        forced = bool(entry.get("force_include"))
        cost = row.total_cost
        if cost is None:
            # Unresolved pricing: only an explicit force_include keeps the model.
            keep = forced
            cost_text = "—"
            decision = "keep (force_include)" if forced else f"drop ({row.note})"
        else:
            cost_text = format_money(cost)
            over_budget = f"over ${max_cost_usd:.2f}"
            if forced and cost > max_cost_usd:
                keep, decision = True, f"keep (force_include, {over_budget})"
            elif cost <= max_cost_usd:
                keep, decision = True, "keep"
            else:
                keep, decision = False, f"drop ({over_budget})"
        if keep:
            kept.append(entry)
        print(f"{row.name:<32} {cost_text:>10} {decision}")
    print()
    return kept
| def main(): | |
| parser = argparse.ArgumentParser(description="Run lm-eval from TOML config") | |
| parser.add_argument("--config", default=DEFAULT_CONFIG, help="Path to TOML config") | |
| parser.add_argument("--models", nargs="*", help="Filter to specific model name(s)") | |
| parser.add_argument( | |
| "--tasks", | |
| nargs="*", | |
| help="Task/group names: filter to [[tasks]] in TOML, or any lm-eval task name", | |
| ) | |
| parser.add_argument("--dry-run", action="store_true", help="Print params without running") | |
| parser.add_argument( | |
| "--gen-kwargs", | |
| type=str, | |
| default=None, | |
| help='Comma-separated gen params merged into API JSON, e.g. ' | |
| '"max_gen_toks=8192,temperature=0.6,top_p=0.95,top_k=20,min_p=0.0,until=[\'<|endoftext|>\']"', | |
| ) | |
| parser.add_argument( | |
| "--num-concurrent", | |
| type=int, | |
| default=None, | |
| metavar="N", | |
| help="Override parallel in-flight API requests (default: [defaults].num_concurrent in TOML)", | |
| ) | |
| parser.add_argument( | |
| "--extra-body", | |
| type=str, | |
| default=None, | |
| help='JSON string merged into every API request body, e.g. ' | |
| """'{"provider": {"order": ["alibaba"]}}'""", | |
| ) | |
| parser.add_argument( | |
| "--from-convex", | |
| action="store_true", | |
| help="Discover the model roster from a live Convex deployment " | |
| "(CONVEX_URL env var) instead of reading [[models]] from TOML. " | |
| "TOML still supplies per-model overrides (base_url, api_key_env, " | |
| "pricing, endpoint_kind) by matching on model_id.", | |
| ) | |
| parser.add_argument( | |
| "--max-cost", | |
| type=float, | |
| default=None, | |
| metavar="USD", | |
| help="Drop any discovered model whose estimated cost per full run " | |
| "exceeds this many USD. Models with no resolvable pricing (no " | |
| "OpenRouter entry and no TOML pricing block) are also dropped. " | |
| "Has no effect unless used with --from-convex (or in combination " | |
| "with the TOML roster, in which case it filters that too).", | |
| ) | |
| parser.add_argument( | |
| "--list-models", | |
| type=str, | |
| default=None, | |
| metavar="PATH", | |
| help="Resolve the roster (apply --from-convex / --max-cost), write " | |
| "the full filtered model entries as a JSON array to PATH, print one " | |
| "model name per line to stdout, and exit. Lets a workflow run each " | |
| "model in its own subprocess via --models-file, so a session/asyncio " | |
| "crash on one model can't poison the others.", | |
| ) | |
| parser.add_argument( | |
| "--models-file", | |
| type=str, | |
| default=None, | |
| metavar="PATH", | |
| help="Load the model roster from a JSON file (array of model entries) " | |
| "instead of querying Convex/reading TOML [[models]]. Used together " | |
| "with --models NAME for per-model subprocess invocations.", | |
| ) | |
| args = parser.parse_args() | |
| if args.extra_body: | |
| _extra_body.value = json.loads(args.extra_body) | |
| logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s") | |
| # One-time startup fallback: if OPENAI_API_KEY is empty but OPENROUTER_API_KEY | |
| # is set, populate OPENAI_API_KEY for models that don't override api_key_env. | |
| # This runs BEFORE the model loop so prev_api_key captures the correct value | |
| # and restoration between models works properly. | |
| if not os.environ.get("OPENAI_API_KEY", "").strip(): | |
| or_key = os.environ.get("OPENROUTER_API_KEY", "").strip() | |
| if or_key: | |
| os.environ["OPENAI_API_KEY"] = or_key | |
| cfg = load_config(args.config) | |
| defaults = cfg["defaults"] | |
| hf_hub = cfg.get("hf_hub") | |
| if isinstance(hf_hub, dict): | |
| hf_hub = {k: v for k, v in hf_hub.items()} | |
| else: | |
| hf_hub = None | |
| if args.models_file: | |
| with open(args.models_file) as f: | |
| models = json.load(f) | |
| if not isinstance(models, list): | |
| print(f"--models-file {args.models_file} must contain a JSON array", file=sys.stderr) | |
| sys.exit(1) | |
| print(f"Loaded {len(models)} model(s) from {args.models_file}.") | |
| elif args.from_convex: | |
| from model_discovery import fetch_active_models | |
| convex_url = os.environ.get("CONVEX_URL", "").strip() | |
| models = fetch_active_models(convex_url, cfg.get("models", [])) | |
| print(f"Discovered {len(models)} active text model(s) from Convex.") | |
| else: | |
| models = cfg["models"] | |
| tasks = cfg["tasks"] | |
| if args.models: | |
| # Per-arg matching with precedence: slug → model_id → name. | |
| # Each `--models` argument is resolved independently and the | |
| # first matching tier wins for that argument; lower tiers are | |
| # NOT tried as fallbacks for the same arg. This is critical | |
| # because: | |
| # * The dispatcher passes the canonical eval slug | |
| # (slug(name__model_id__provider)). A slug uniquely | |
| # identifies one entry. If we ALSO matched by `model_id` | |
| # for the same arg, every other entry sharing that | |
| # `model_id` (e.g. the same model exposed via a second | |
| # provider) would also match, and the child would run | |
| # all of them — duplicating work and double-uploading. | |
| # * Older code matched any of {slug, model_id, name} across | |
| # ALL args at once, which silently dropped name-only matches | |
| # in mixed invocations like `--models gpt-5-nano my-id` | |
| # (if `my-id` matched a model_id, the name-only `gpt-5-nano` | |
| # was never tried). | |
| # Accepts mixed input — slugs from the dispatcher AND raw | |
| # name/model_id from devs typing into a shell — without the | |
| # fallthrough hazards. | |
| matched: list[dict] = [] | |
| matched_ids: set[int] = set() | |
| unmatched_args: list[str] = [] | |
def _add(entries: list[dict]) -> None:
    """Record each entry in ``matched`` at most once, in first-seen order.

    Entries are de-duplicated by object identity (``id()``), not by value,
    so the same model dict reached through several ``--models`` arguments
    is only appended the first time it is seen. Mutates the enclosing
    scope's ``matched`` list and ``matched_ids`` set; returns nothing.
    """
    for candidate in entries:
        ident = id(candidate)
        if ident in matched_ids:
            # Already selected by an earlier argument; keep a single copy.
            continue
        matched_ids.add(ident)
        matched.append(candidate)
| for arg in args.models: | |
| arg_lower = arg.lower() | |
| slug_hits = [m for m in models if _model_slug(m).lower() == arg_lower] | |
| if slug_hits: | |
| _add(slug_hits) | |
| continue | |
| id_hits = [m for m in models if m.get("model_id", "").lower() == arg_lower] | |
| if id_hits: | |
| _add(id_hits) | |
| continue | |
| name_hits = [m for m in models if m["name"].lower() == arg_lower] | |
| if name_hits: | |
| _add(name_hits) | |
| continue | |
| unmatched_args.append(arg) | |
| if not matched: | |
| known = sorted(_model_slug(m) for m in models) | |
| print(f"No models matched: {args.models}", file=sys.stderr) | |
| print( | |
| f"Available identifiers (slug(name__model_id__provider)): {known}", | |
| file=sys.stderr, | |
| ) | |
| sys.exit(1) | |
| if unmatched_args: | |
| # Some args matched, some didn't — surface the misses but | |
| # continue with the partial set so a single typo in a long | |
| # invocation doesn't waste the whole run. | |
| print( | |
| f"WARNING: --models had no match for: {unmatched_args}", | |
| file=sys.stderr, | |
| ) | |
| models = matched | |
| if args.tasks: | |
| filter_set = {t.lower() for t in args.tasks} | |
| matched = [t for t in tasks if t["name"].lower() in filter_set] | |
| if matched: | |
| tasks = matched | |
| else: | |
| tasks = [{"name": n} for n in args.tasks] | |
| if args.max_cost is not None: | |
| models = _filter_models_by_cost(models, tasks, args.max_cost) | |
| # Write the roster file FIRST (even if empty), so the workflow's | |
| # `jq -r '.[].name' output/roster.json` step always finds the file. | |
| # An empty array → zero names → bash loop iterates zero times → workflow | |
| # logs "Per-model failures: 0 of 0" and finishes cleanly. Without this, | |
| # an over-aggressive --max-cost would race the file's existence and | |
| # produce a misleading "workflow failed" Slack alert. | |
| if args.list_models: | |
| os.makedirs(os.path.dirname(args.list_models) or ".", exist_ok=True) | |
| with open(args.list_models, "w") as f: | |
| json.dump(models, f, indent=2) | |
| for m in models: | |
| print(m["name"]) | |
| print( | |
| f"\nWrote {len(models)} model(s) to {args.list_models}.", | |
| file=sys.stderr, | |
| ) | |
| return | |
| if args.max_cost is not None and not models: | |
| print( | |
| f"No models passed the --max-cost ${args.max_cost:.2f} threshold; nothing to run.", | |
| file=sys.stderr, | |
| ) | |
| sys.exit(0) | |
| total = len(models) * len(tasks) | |
| print(f"Running {len(models)} model(s) x {len(tasks)} task(s) = {total} eval(s)\n") | |
| failures = 0 | |
| for i, model in enumerate(models, 1): | |
| # --- Multi-provider support --- | |
| # Each [[models]] entry can optionally override: | |
| # base_url — custom API endpoint (falls back to [defaults].base_url) | |
| # api_key_env — env var name holding the API key (default: OPENAI_API_KEY) | |
| # custom_headers — dict of extra HTTP headers | |
| model_base_url = model.get("base_url", defaults["base_url"]) | |
| # Save original API key so we can restore it after this model | |
| prev_api_key = os.environ.get("OPENAI_API_KEY", "") | |
| api_key_env = model.get("api_key_env", "OPENAI_API_KEY") | |
| if api_key_env != "OPENAI_API_KEY": | |
| env_val = os.environ.get(api_key_env, "").strip() | |
| if env_val: | |
| os.environ["OPENAI_API_KEY"] = env_val | |
| else: | |
| # Clear the stale key rather than silently inheriting the | |
| # previous model's credentials. This forces a clean auth | |
| # failure on the misconfigured model instead of making it | |
| # use the wrong key. | |
| logger.warning( | |
| "Model %s requests api_key_env=%s but that var is empty/unset; clearing OPENAI_API_KEY for this model.", | |
| model["name"], | |
| api_key_env, | |
| ) | |
| os.environ["OPENAI_API_KEY"] = "" | |
| model_custom_headers = model.get("custom_headers", {}) | |
| if model_custom_headers: | |
| prev_extra = _extra_body.value.copy() | |
| _extra_body.value = {**_extra_body.value, "extra_headers": model_custom_headers} | |
| for j, task in enumerate(tasks, 1): | |
| run_idx = (i - 1) * len(tasks) + j | |
| output_path = os.path.join( | |
| defaults.get("output_path", "output/results"), | |
| model["name"], | |
| task["name"], | |
| ) | |
| gen_kwargs = args.gen_kwargs or model.get("gen_kwargs") | |
| num_concurrent = ( | |
| args.num_concurrent | |
| if args.num_concurrent is not None | |
| else defaults.get("num_concurrent", 5) | |
| ) | |
| # Resilience knobs read from [defaults]; see eval_config.toml. | |
| # Stock lm-eval defaults (3 / 300) are kept as fallbacks so the | |
| # behavior is unchanged when the TOML doesn't override them. | |
| max_retries = int(defaults.get("max_retries", 3)) | |
| timeout = int(defaults.get("timeout", 300)) | |
| eval_kwargs = dict( | |
| include_path=SCRIPT_DIR, | |
| task_name=task["name"], | |
| model_id=model["model_id"], | |
| model_display_name=model["name"], | |
| # Pre-compute the canonical slug once per (model, task) so the | |
| # HF upload folder is guaranteed to match the dispatcher's | |
| # spawn label and the child's `--models` arg. | |
| model_slug=_model_slug(model), | |
| output_path=output_path, | |
| base_url=model_base_url, | |
| num_concurrent=num_concurrent, | |
| num_fewshot=defaults.get("num_fewshot", 0), | |
| apply_chat_template=defaults.get("apply_chat_template", True), | |
| log_samples=defaults.get("log_samples", True), | |
| gen_kwargs=gen_kwargs, | |
| hf_hub=hf_hub, | |
| endpoint_kind=model.get("endpoint_kind", "chat_completions"), | |
| max_retries=max_retries, | |
| timeout=timeout, | |
| ) | |
| header = f"[{run_idx}/{total}] {model['name']} x {task['name']}" | |
| print(f"{'-' * 60}") | |
| print(f" {header}") | |
| print(f" -> params: {eval_kwargs}") | |
| print(f"{'-' * 60}") | |
| if args.dry_run: | |
| continue | |
| try: | |
| results = run_single_eval(**eval_kwargs) | |
| if results: | |
| table_str = make_table(results) | |
| # Replace Unicode chars that fail on Windows cp1252 | |
| table_str = table_str.encode("ascii", "replace").decode("ascii") | |
| print(table_str) | |
| print(f"\n[DONE] {header}\n") | |
| except Exception: | |
| traceback.print_exc() | |
| print(f"\n[FAILED] {header}\n", file=sys.stderr) | |
| failures += 1 | |
| # Restore state after processing all tasks for this model | |
| if model_custom_headers: | |
| _extra_body.value = prev_extra | |
| if api_key_env != "OPENAI_API_KEY": | |
| os.environ["OPENAI_API_KEY"] = prev_api_key | |
| if failures: | |
| print(f"\n{failures}/{total} evaluation(s) failed.", file=sys.stderr) | |
| sys.exit(1) | |
# Script entry point: call main() only when this file is executed directly,
# so importing the module does not kick off an evaluation run.
if __name__ == "__main__":
    main()