# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "httpx",
#     "huggingface_hub",
# ]
# ///
"""
Scheduled job: regenerate data.json and upload to the benchmark-race Space.
Run locally:
uv run update_data.py
Schedule on HF Jobs (twice daily):
hf jobs scheduled uv run "0 8,20 * * *" \
--secrets HF_TOKEN \
https://huggingface.co/spaces/davanstrien/benchmark-race/resolve/main/update_data.py
"""
import json
import os
import re
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path

import httpx
from huggingface_hub import HfApi

SPACE_REPO = "davanstrien/benchmark-race"
BENCHMARK_CONFIGS = [
{"dataset": "SWE-bench/SWE-bench_Verified", "key": "sweVerified", "name": "SWE-bench Verified", "gated": False},
{"dataset": "ScaleAI/SWE-bench_Pro", "key": "swePro", "name": "SWE-bench Pro", "gated": False},
{"dataset": "TIGER-Lab/MMLU-Pro", "key": "mmluPro", "name": "MMLU-Pro", "gated": False},
{"dataset": "Idavidrein/gpqa", "key": "gpqa", "name": "GPQA Diamond", "gated": True},
{"dataset": "cais/hle", "key": "hle", "name": "HLE", "gated": True},
{"dataset": "MathArena/aime_2026", "key": "aime2026", "name": "AIME 2026", "gated": False},
{"dataset": "MathArena/hmmt_feb_2026", "key": "hmmt2026", "name": "HMMT Feb 2026", "gated": False},
{"dataset": "allenai/olmOCR-bench", "key": "olmOcr", "name": "olmOCR-bench", "gated": False},
{"dataset": "harborframework/terminal-bench-2.0", "key": "terminalBench", "name": "Terminal-Bench 2.0", "gated": False},
{"dataset": "FutureMa/EvasionBench", "key": "evasionBench", "name": "EvasionBench", "gated": False},
]
PALETTE = [
"#6366f1", "#0d9488", "#d97706", "#e11d48", "#7c3aed",
"#16a34a", "#2563eb", "#ea580c", "#8b5cf6", "#0891b2",
"#c026d3", "#65a30d", "#dc2626", "#0284c7", "#a21caf",
"#059669", "#9333ea", "#ca8a04", "#be185d", "#0369a1",
]


def fetch_leaderboard(config: dict, hf_token: str | None) -> list[dict]:
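    """Fetch leaderboard entries for one benchmark from the Hub dataset API.

    Gated datasets are skipped when no token is available. If a model appears
    more than once, only its best score is kept.
    """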
url = f"https://huggingface.co/api/datasets/{config['dataset']}/leaderboard"
headers = {}
if config["gated"] and hf_token:
headers["Authorization"] = f"Bearer {hf_token}"
elif config["gated"]:
print(f" {config['name']}: skipped (gated, no token)")
return []
print(f" {config['name']}: fetching scores...")
try:
resp = httpx.get(url, headers=headers, timeout=30)
if resp.status_code != 200:
print(f" skip (status {resp.status_code})")
return []
data = resp.json()
if not isinstance(data, list):
return []
except Exception as e:
print(f" error: {e}")
return []
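    # Each leaderboard entry is expected to carry "modelId" and "value";
    # keep only the best score per model id.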
seen = {}
for entry in data:
model_id = entry.get("modelId")
score = entry.get("value")
if model_id and score is not None:
score = float(score)
if model_id not in seen or score > seen[model_id]:
seen[model_id] = score
print(f" {len(seen)} models")
return [{"model_id": mid, "score": s} for mid, s in seen.items()]


def fetch_model_dates(model_ids: list[str], hf_token: str | None) -> dict[str, dict]:
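    """Look up creation date and approximate parameter count for each model id.

    Parameter counts come from safetensors metadata when available, otherwise
    from a size hint in the model id (e.g. "-70B"). Models whose info cannot
    be fetched are omitted from the result.
    """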
api = HfApi()
results = {}
def _get_info(mid):
try:
info = api.model_info(mid, token=hf_token)
params_b = None
if info.safetensors and hasattr(info.safetensors, "total"):
params_b = round(info.safetensors.total / 1_000_000_000, 1)
if params_b is None:
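                # Fall back to a size hint embedded in the model id, e.g. "-70B" or "_7.5b".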
m = re.findall(r"[-_/](\d+\.?\d*)[Bb](?:[-_/]|$)", mid)
if m:
params_b = max(float(x) for x in m)
return mid, info.created_at.strftime("%Y-%m-%d"), params_b
except Exception:
return mid, None, None
with ThreadPoolExecutor(max_workers=8) as pool:
futures = {pool.submit(_get_info, mid): mid for mid in model_ids}
for f in as_completed(futures):
mid, date, params = f.result()
if date:
results[mid] = {"date": date, "parameters_b": params}
return results


def fetch_logo(provider: str) -> str | None:
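    """Return the avatar URL for an organization, or None if it cannot be fetched."""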
try:
resp = httpx.get(
f"https://huggingface.co/api/organizations/{provider}/avatar",
timeout=5,
)
if resp.status_code == 200:
return resp.json().get("avatarUrl")
except Exception:
pass
return None


def fetch_all_logos(providers: set[str]) -> dict[str, str]:
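    """Fetch organization avatars concurrently; providers without one are dropped."""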
logos = {}
with ThreadPoolExecutor(max_workers=8) as pool:
futures = {pool.submit(fetch_logo, p): p for p in providers}
for f in as_completed(futures):
p = futures[f]
url = f.result()
if url:
logos[p] = url
return logos


def main():
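    """Collect benchmark scores, enrich with model metadata, and upload data.json to the Space."""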
hf_token = os.environ.get("HF_TOKEN")
print("Generating data.json for bar chart race\n")
    all_scores: dict[str, dict] = {}
all_model_ids: set[str] = set()
for config in BENCHMARK_CONFIGS:
rows = fetch_leaderboard(config, hf_token)
if rows:
all_scores[config["key"]] = {"name": config["name"], "rows": rows}
all_model_ids.update(r["model_id"] for r in rows)
print(f"\n{len(all_model_ids)} unique models across {len(all_scores)} benchmarks")
print("Fetching model dates...")
model_dates = fetch_model_dates(list(all_model_ids), hf_token)
print(f" got dates for {len(model_dates)}/{len(all_model_ids)} models")
all_providers: set[str] = set()
benchmarks = {}
for key, info in all_scores.items():
models = []
for row in info["rows"]:
mid = row["model_id"]
if mid not in model_dates:
continue
provider = mid.split("/")[0] if "/" in mid else mid
short_name = mid.split("/")[-1]
all_providers.add(provider)
            models.append({
                "model_id": mid,
                "short_name": short_name,
                "provider": provider,
                "score": round(row["score"], 2),
                "date": model_dates[mid]["date"],
                "parameters_b": model_dates[mid]["parameters_b"],
            })
if models:
benchmarks[key] = {"name": info["name"], "models": models}
print(f"\nFetching logos for {len(all_providers)} providers...")
logos = fetch_all_logos(all_providers)
print(f" got {len(logos)} logos")
color_map = {}
for i, provider in enumerate(sorted(all_providers)):
color_map[provider] = PALETTE[i % len(PALETTE)]
output = {
"benchmarks": benchmarks,
"logos": logos,
"colors": color_map,
"generated_at": datetime.now(timezone.utc).isoformat(),
}
data_json = json.dumps(output, indent=2)
print(f"\nGenerated {len(data_json) / 1024:.1f} KB")
for key, bm in benchmarks.items():
print(f" {bm['name']}: {len(bm['models'])} models")
# Upload to Space
print(f"\nUploading data.json to {SPACE_REPO}...")
api = HfApi()
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
f.write(data_json)
tmp_path = f.name
try:
api.upload_file(
path_or_fileobj=tmp_path,
path_in_repo="data.json",
repo_id=SPACE_REPO,
repo_type="space",
commit_message=f"Update data.json ({datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')})",
)
print("Done!")
finally:
Path(tmp_path).unlink(missing_ok=True)


if __name__ == "__main__":
main()