# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "httpx",
#     "huggingface_hub",
# ]
# ///
"""
Scheduled job: regenerate data.json and upload to the benchmark-race Space.

Run locally:
    uv run update_data.py

Schedule on HF Jobs (twice daily):
    hf jobs scheduled uv run "0 8,20 * * *" \
        --secrets HF_TOKEN \
        https://huggingface.co/spaces/davanstrien/benchmark-race/resolve/main/update_data.py
"""
import json
import os
import re
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path

import httpx
from huggingface_hub import HfApi

# Space that hosts the visualization; the generated data.json is pushed here.
SPACE_REPO = "davanstrien/benchmark-race"

# Benchmarks to chart. "key" is the identifier used in data.json, "gated"
# marks datasets whose leaderboard endpoint requires an HF token.
BENCHMARK_CONFIGS = [
    {"dataset": "SWE-bench/SWE-bench_Verified", "key": "sweVerified", "name": "SWE-bench Verified", "gated": False},
    {"dataset": "ScaleAI/SWE-bench_Pro", "key": "swePro", "name": "SWE-bench Pro", "gated": False},
    {"dataset": "TIGER-Lab/MMLU-Pro", "key": "mmluPro", "name": "MMLU-Pro", "gated": False},
    {"dataset": "Idavidrein/gpqa", "key": "gpqa", "name": "GPQA Diamond", "gated": True},
    {"dataset": "cais/hle", "key": "hle", "name": "HLE", "gated": True},
    {"dataset": "MathArena/aime_2026", "key": "aime2026", "name": "AIME 2026", "gated": False},
    {"dataset": "MathArena/hmmt_feb_2026", "key": "hmmt2026", "name": "HMMT Feb 2026", "gated": False},
    {"dataset": "allenai/olmOCR-bench", "key": "olmOcr", "name": "olmOCR-bench", "gated": False},
    {"dataset": "harborframework/terminal-bench-2.0", "key": "terminalBench", "name": "Terminal-Bench 2.0", "gated": False},
    {"dataset": "FutureMa/EvasionBench", "key": "evasionBench", "name": "EvasionBench", "gated": False},
]

# Fixed palette cycled through providers (sorted) so colors stay stable
# between regenerations.
PALETTE = [
    "#6366f1", "#0d9488", "#d97706", "#e11d48", "#7c3aed",
    "#16a34a", "#2563eb", "#ea580c", "#8b5cf6", "#0891b2",
    "#c026d3", "#65a30d", "#dc2626", "#0284c7", "#a21caf",
    "#059669", "#9333ea", "#ca8a04", "#be185d", "#0369a1",
]
def fetch_leaderboard(config: dict, hf_token: str | None) -> list[dict]:
    """Fetch leaderboard scores for one benchmark dataset.

    Args:
        config: one entry of BENCHMARK_CONFIGS ("dataset", "name", "gated", ...).
        hf_token: HF access token, required only for gated datasets.

    Returns:
        A list of {"model_id": str, "score": float} dicts, keeping the best
        (highest) score per model. Returns [] on any failure so one broken
        benchmark never aborts the whole job.
    """
    url = f"https://huggingface.co/api/datasets/{config['dataset']}/leaderboard"
    headers = {}
    if config["gated"] and hf_token:
        headers["Authorization"] = f"Bearer {hf_token}"
    elif config["gated"]:
        # Gated datasets need auth; without a token the request would just 401.
        print(f" {config['name']}: skipped (gated, no token)")
        return []
    print(f" {config['name']}: fetching scores...")
    try:
        resp = httpx.get(url, headers=headers, timeout=30)
        if resp.status_code != 200:
            print(f" skip (status {resp.status_code})")
            return []
        data = resp.json()
        if not isinstance(data, list):
            return []
    except Exception as e:
        # Best-effort: log and move on to the next benchmark.
        print(f" error: {e}")
        return []
    # Deduplicate: a model may appear multiple times; keep its highest score.
    seen: dict[str, float] = {}
    for entry in data:
        model_id = entry.get("modelId")
        score = entry.get("value")
        if not model_id or score is None:
            continue
        try:
            score = float(score)
        except (TypeError, ValueError):
            # Fix: a non-numeric "value" from the API previously raised
            # outside the try block and crashed the whole job.
            continue
        if model_id not in seen or score > seen[model_id]:
            seen[model_id] = score
    print(f" {len(seen)} models")
    return [{"model_id": mid, "score": s} for mid, s in seen.items()]
def fetch_model_dates(model_ids: list[str], hf_token: str | None) -> dict[str, dict]:
    """Look up creation dates and parameter counts for models, in parallel.

    Returns {model_id: {"date": "YYYY-MM-DD", "parameters_b": float | None}}.
    Models whose info lookup fails (private, deleted, network error) are
    simply omitted from the result.
    """
    api = HfApi()

    def _lookup(model_id: str):
        try:
            info = api.model_info(model_id, token=hf_token)
            size_b = None
            # Prefer the exact safetensors parameter count when present.
            if info.safetensors and hasattr(info.safetensors, "total"):
                size_b = round(info.safetensors.total / 1_000_000_000, 1)
            if size_b is None:
                # Fall back to a size hint embedded in the repo id (e.g. "-7B-").
                hints = re.findall(r"[-_/](\d+\.?\d*)[Bb](?:[-_/]|$)", model_id)
                if hints:
                    size_b = max(float(h) for h in hints)
            return model_id, info.created_at.strftime("%Y-%m-%d"), size_b
        except Exception:
            return model_id, None, None

    results: dict[str, dict] = {}
    with ThreadPoolExecutor(max_workers=8) as pool:
        pending = [pool.submit(_lookup, model_id) for model_id in model_ids]
        for future in as_completed(pending):
            model_id, date, size_b = future.result()
            if date:
                results[model_id] = {"date": date, "parameters_b": size_b}
    return results
def fetch_logo(provider: str) -> str | None:
    """Return the avatar URL for a HF organization, or None if unavailable."""
    endpoint = f"https://huggingface.co/api/organizations/{provider}/avatar"
    try:
        resp = httpx.get(endpoint, timeout=5)
        if resp.status_code == 200:
            return resp.json().get("avatarUrl")
    except Exception:
        # Best-effort lookup: a missing logo is not an error.
        pass
    return None
def fetch_all_logos(providers: set[str]) -> dict[str, str]:
    """Fetch org avatar URLs concurrently; providers without one are omitted."""
    found: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=8) as pool:
        pending = {pool.submit(fetch_logo, name): name for name in providers}
        for done in as_completed(pending):
            avatar_url = done.result()
            if avatar_url:
                found[pending[done]] = avatar_url
    return found
def main():
    """Regenerate data.json from all benchmark leaderboards and upload it."""
    hf_token = os.environ.get("HF_TOKEN")
    print("Generating data.json for bar chart race\n")

    # 1) Pull scores for every configured benchmark.
    # Fix: the annotation claimed list[dict] values, but entries are
    # {"name": str, "rows": list[dict]} dicts.
    all_scores: dict[str, dict] = {}
    all_model_ids: set[str] = set()
    for config in BENCHMARK_CONFIGS:
        rows = fetch_leaderboard(config, hf_token)
        if rows:
            all_scores[config["key"]] = {"name": config["name"], "rows": rows}
            all_model_ids.update(r["model_id"] for r in rows)
    print(f"\n{len(all_model_ids)} unique models across {len(all_scores)} benchmarks")

    # 2) Release dates are required to place models on the race timeline.
    print("Fetching model dates...")
    model_dates = fetch_model_dates(list(all_model_ids), hf_token)
    print(f" got dates for {len(model_dates)}/{len(all_model_ids)} models")

    # 3) Assemble per-benchmark model records; models with no date are dropped.
    all_providers: set[str] = set()
    benchmarks = {}
    for key, info in all_scores.items():
        models = []
        for row in info["rows"]:
            mid = row["model_id"]
            if mid not in model_dates:
                continue
            provider = mid.split("/")[0] if "/" in mid else mid
            short_name = mid.split("/")[-1]
            all_providers.add(provider)
            models.append({
                "model_id": mid,
                "short_name": short_name,
                "provider": provider,
                "score": round(row["score"], 2),
                "date": model_dates[mid]["date"],
            })
        if models:
            benchmarks[key] = {"name": info["name"], "models": models}

    # 4) Branding: org logos plus deterministic provider colors (sorting
    #    providers keeps color assignment stable across regenerations).
    print(f"\nFetching logos for {len(all_providers)} providers...")
    logos = fetch_all_logos(all_providers)
    print(f" got {len(logos)} logos")
    color_map = {
        provider: PALETTE[i % len(PALETTE)]
        for i, provider in enumerate(sorted(all_providers))
    }

    output = {
        "benchmarks": benchmarks,
        "logos": logos,
        "colors": color_map,
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
    data_json = json.dumps(output, indent=2)
    print(f"\nGenerated {len(data_json) / 1024:.1f} KB")
    # Fix: the key was unused in this summary loop — iterate values only.
    for bm in benchmarks.values():
        print(f" {bm['name']}: {len(bm['models'])} models")

    # 5) Upload to the Space. upload_file wants a path, so stage the JSON
    #    in a temp file and always clean it up afterwards.
    print(f"\nUploading data.json to {SPACE_REPO}...")
    api = HfApi()
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        f.write(data_json)
        tmp_path = f.name
    try:
        api.upload_file(
            path_or_fileobj=tmp_path,
            path_in_repo="data.json",
            repo_id=SPACE_REPO,
            repo_type="space",
            commit_message=f"Update data.json ({datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')})",
        )
        print("Done!")
    finally:
        Path(tmp_path).unlink(missing_ok=True)


if __name__ == "__main__":
    main()