eval-runner / entrypoint.py
ouhenio's picture
Webhook receiver mode: Space receives POSTs, spawns GPU Jobs
9a21135
"""
eval-runner entrypoint — Two modes:
1. WEBHOOK RECEIVER (Space mode): Runs an HTTP server on port 7860 that receives
webhook POSTs from HF and spawns GPU Jobs with the payload.
2. EVALUATION (Job mode): When WEBHOOK_PAYLOAD is set, runs the eval pipeline.
"""
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
import requests
import yaml
from huggingface_hub import HfApi
# ─── Configuration ───────────────────────────────────────────────────────────
# Comma-separated suite names to run (overridable through the environment).
EVAL_SUITES = os.getenv("EVAL_SUITES", "post_training,post_training_es").split(",")

# Location of the remote eval.yaml that defines the suites.
EVAL_YAML_URL = os.getenv(
    "EVAL_YAML_URL",
    "https://raw.githubusercontent.com/latam-gpt/olmo_framework/main/configs/eval.yaml",
)

# Dataset repo that receives the per-model result files.
RESULTS_REPO = os.getenv("RESULTS_REPO", "latam-gpt/eval-results")

# Inclusive bounds on the estimated parameter count a model must fall in to be evaluated.
PARAM_MIN = 7_500_000_000.0
PARAM_MAX = 8_500_000_000.0

# Number of GPUs handed to the eval backends as tensor-parallel size.
GPUS = int(os.getenv("GPUS", "1"))

# Root directory under which each suite writes its raw outputs.
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/tmp/eval_results"))
# ─── Param estimation ────────────────────────────────────────────────────────
def estimate_params(cfg: dict) -> int:
    """Estimate total parameters from a Llama-style config.json.

    Accounts for Q/K/V/O attention projections, SwiGLU FFN (3 matrices),
    and embeddings (doubled if tie_word_embeddings is false). Norm weights
    and biases are not counted.

    Args:
        cfg: Parsed config.json. Must contain ``hidden_size``,
            ``num_hidden_layers``, ``intermediate_size``, ``vocab_size`` and
            ``num_attention_heads``. ``num_key_value_heads`` and ``head_dim``
            are optional; a missing or null value falls back to
            ``num_attention_heads`` and ``hidden_size // num_attention_heads``
            respectively.

    Returns:
        The estimated parameter count.

    Raises:
        KeyError: If a required field is missing.
    """
    hidden = cfg["hidden_size"]
    layers = cfg["num_hidden_layers"]
    ffn_dim = cfg["intermediate_size"]
    vocab = cfg["vocab_size"]
    n_heads = cfg["num_attention_heads"]

    # Configs may spell these optional fields as an explicit null; treat that
    # the same as the key being absent instead of failing on None arithmetic.
    n_kv_heads = cfg.get("num_key_value_heads")
    if n_kv_heads is None:
        n_kv_heads = n_heads
    head_dim = cfg.get("head_dim")
    if head_dim is None:
        head_dim = hidden // n_heads

    q_out = n_heads * head_dim
    kv_out = n_kv_heads * head_dim
    attn = hidden * q_out + 2 * hidden * kv_out + q_out * hidden  # Q + K + V + O
    ffn = 3 * hidden * ffn_dim  # gate + up + down (SwiGLU)
    emb = vocab * hidden * (1 if cfg.get("tie_word_embeddings") else 2)
    return layers * (attn + ffn) + emb
# ─── Eval YAML fetching ──────────────────────────────────────────────────────
# Copy of eval.yaml used when the remote fetch fails.
LOCAL_EVAL_YAML = Path("/app/eval.yaml")


def fetch_eval_config(token: str) -> dict:
    """Fetch eval.yaml from EVAL_YAML_URL, falling back to the local copy.

    The remote file is tried first so that config changes are picked up
    without rebuilding the image. Any failure on the remote path (network
    error, HTTP error, unparsable or non-mapping YAML) is logged and the
    local copy at LOCAL_EVAL_YAML is used instead.

    Args:
        token: Bearer token sent in the Authorization header of the request.

    Returns:
        The parsed eval.yaml as a dict.

    Raises:
        RuntimeError: If the remote fetch failed and the local copy is
            missing or does not contain a YAML mapping.
    """
    # Try remote first (picks up changes without rebuilding the image)
    try:
        # NOTE(review): the caller passes its HF token here, so that token is
        # sent to whatever host EVAL_YAML_URL points at — confirm this is
        # intended before pointing the URL at a third-party host.
        headers = {"Authorization": f"Bearer {token}"}
        resp = requests.get(EVAL_YAML_URL, headers=headers, timeout=30)
        resp.raise_for_status()
        remote_config = yaml.safe_load(resp.text)
        # An empty body parses to None and an error page may parse to a str;
        # neither is usable, so treat them as a failed fetch.
        if not isinstance(remote_config, dict):
            raise ValueError(
                f"expected a YAML mapping, got {type(remote_config).__name__}"
            )
        return remote_config
    except Exception as e:
        print(f"WARN: remote eval.yaml fetch failed ({e}), using local copy.")

    # Fall back to local copy
    if LOCAL_EVAL_YAML.exists():
        local_config = yaml.safe_load(LOCAL_EVAL_YAML.read_text(encoding="utf-8"))
        if not isinstance(local_config, dict):
            raise RuntimeError("Local eval.yaml is empty or not a YAML mapping.")
        return local_config
    raise RuntimeError("No eval.yaml available (remote fetch failed and no local copy).")
# ─── CLI runners ─────────────────────────────────────────────────────────────
def run_olmes_eval(model_id: str, tasks: list[str], output_dir: Path) -> bool:
    """Invoke the ``olmes`` CLI (vllm model type) on *tasks* for *model_id*.

    Creates *output_dir* if needed and runs the command with a copy of the
    current environment. ``tensor_parallel_size`` is only added to the model
    args when more than one GPU is configured.

    Returns:
        True when the command exits successfully; False when it exits
        non-zero or the executable cannot be found.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    vllm_args = {"trust_remote_code": True, "max_length": 2560}
    if GPUS > 1:
        vllm_args["tensor_parallel_size"] = GPUS

    argv = ["olmes", "--model", model_id, "--task"]
    argv.extend(tasks)
    argv += [
        "--output-dir", str(output_dir),
        "--model-type", "vllm",
        "--model-args", json.dumps(vllm_args),
    ]

    child_env = dict(os.environ)
    # Respect a VLLM_USE_V1 value already present in the environment.
    child_env.setdefault("VLLM_USE_V1", "1")
    child_env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"

    print(f"Running olmes: {' '.join(argv)}")
    try:
        subprocess.run(argv, check=True, env=child_env)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"ERROR running olmes: {e}", file=sys.stderr)
        return False
    return True
def run_lm_eval(model_id: str, tasks: list[str], output_dir: Path) -> bool:
    """Invoke the ``lm_eval`` CLI (vllm backend) on *tasks* for *model_id*.

    Creates *output_dir* if needed and runs the command with a copy of the
    current environment. The configured GPU count is always passed as
    ``tensor_parallel_size``.

    Returns:
        True when the command exits successfully; False when it exits
        non-zero or the executable cannot be found.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    arg_parts = [
        f"pretrained={model_id}",
        f"tensor_parallel_size={GPUS}",
        "trust_remote_code=True",
    ]
    argv = ["lm_eval"]
    argv += ["--model", "vllm"]
    argv += ["--model_args", ",".join(arg_parts)]
    argv += ["--tasks", ",".join(tasks)]
    argv += ["--output_path", str(output_dir)]
    argv += ["--batch_size", "auto"]
    argv += ["--device", "cuda"]

    child_env = dict(os.environ)
    child_env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"

    print(f"Running lm_eval: {' '.join(argv)}")
    try:
        subprocess.run(argv, check=True, env=child_env)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"ERROR running lm_eval: {e}", file=sys.stderr)
        return False
    return True
# ─── Result collection ────────────────────────────────────────────────────────
def collect_olmes_results(output_dir: Path) -> dict:
    """Collect olmes results from per-task JSON files.

    Every ``*.json`` under *output_dir* (recursively) except ``summary.json``
    is read. The file stem is used as the task name and the stored value is
    the file's ``"metrics"`` entry, else its ``"results"`` entry, else the
    whole object. Files whose top-level JSON value is not an object are
    ignored.

    Files are visited in sorted path order so that, if two files share a
    stem, the surviving entry is deterministic. A file that cannot be read
    or parsed is reported on stderr and skipped rather than aborting the
    whole collection.

    Returns:
        Mapping of task name to its metrics.
    """
    metrics = {}
    for jf in sorted(output_dir.glob("**/*.json")):
        if jf.name == "summary.json":
            continue
        try:
            with open(jf, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError; OSError covers unreadable paths and
            # directories that happen to match the glob.
            print(f"WARN: could not parse {jf}: {e}", file=sys.stderr)
            continue
        if not isinstance(data, dict):
            continue
        if "metrics" in data:
            metrics[jf.stem] = data["metrics"]
        elif "results" in data:
            metrics[jf.stem] = data["results"]
        else:
            metrics[jf.stem] = data
    return metrics
def collect_lm_eval_results(output_dir: Path) -> dict:
    """Collect lm-eval results from a single JSON file with a 'results' key.

    ``*.json`` files under *output_dir* (recursively, ``summary.json``
    excluded) are scanned in sorted path order. The first file whose
    top-level object has a dict under ``"results"`` is used, and scanning
    stops there. Only entries of that dict whose value is itself a dict are
    kept.

    A file that cannot be read or parsed is reported on stderr and skipped
    rather than aborting the scan.

    Returns:
        Mapping of task name to its metrics; empty if no results file is found.
    """
    metrics = {}
    for jf in sorted(output_dir.glob("**/*.json")):
        if jf.name == "summary.json":
            continue
        try:
            with open(jf, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError; OSError covers unreadable paths and
            # directories that happen to match the glob.
            print(f"WARN: could not parse {jf}: {e}", file=sys.stderr)
            continue
        if not isinstance(data, dict):
            continue
        task_results = data.get("results")
        if not isinstance(task_results, dict):
            continue
        for task_name, task_metrics in task_results.items():
            if isinstance(task_metrics, dict):
                metrics[task_name] = task_metrics
        break  # single results file expected
    return metrics
def compute_mean_accuracy(metrics: dict) -> float | None:
"""Compute mean accuracy across tasks."""
accuracies = []
for task_metrics in metrics.values():
if not isinstance(task_metrics, dict):
continue
for key in ["acc,none", "accuracy", "acc", "exact_match", "pass@1"]:
if key in task_metrics:
accuracies.append(task_metrics[key])
break
return sum(accuracies) / len(accuracies) if accuracies else None
# ─── Webhook receiver (Space mode) ────────────────────────────────────────────
def run_webhook_server():
    """Run an HTTP server that receives webhook POSTs and spawns eval Jobs.

    Configuration is read from the environment:
        HF_TOKEN        required; used to spawn Jobs and passed to them as a secret.
        WEBHOOK_SECRET  optional; when set, POSTs must carry it in X-Webhook-Secret.
        JOB_NAMESPACE, JOB_SPACE_ID, JOB_FLAVOR, JOB_TIMEOUT
                        Job parameters (defaults below).
        PORT            listening port (default 7860).

    Exits the process with status 1 if HF_TOKEN is not set. Otherwise this
    function blocks in ``serve_forever()`` and does not return.
    """
    import hmac
    from http.server import HTTPServer, BaseHTTPRequestHandler
    from huggingface_hub import run_job

    token = os.environ.get("HF_TOKEN")
    webhook_secret = os.environ.get("WEBHOOK_SECRET")
    namespace = os.environ.get("JOB_NAMESPACE", "latam-gpt")
    space_id = os.environ.get("JOB_SPACE_ID", "latam-gpt/eval-runner")
    flavor = os.environ.get("JOB_FLAVOR", "a100-large")
    timeout = os.environ.get("JOB_TIMEOUT", "3h")
    if not token:
        print("ERROR: HF_TOKEN must be set as a Space secret.", file=sys.stderr)
        sys.exit(1)

    class WebhookHandler(BaseHTTPRequestHandler):
        def _reply(self, status: int, body: bytes) -> None:
            """Send a bare response with the given status code and body."""
            self.send_response(status)
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self):
            """Liveness endpoint: always answers 200."""
            self._reply(200, b"eval-runner webhook receiver is running")

        def do_POST(self):
            """Validate the webhook and spawn a Job for model update events."""
            # Verify webhook secret if configured. compare_digest keeps the
            # comparison time independent of how many characters match.
            if webhook_secret:
                req_secret = self.headers.get("X-Webhook-Secret", "")
                if not hmac.compare_digest(
                    req_secret.encode("utf-8"), webhook_secret.encode("utf-8")
                ):
                    self._reply(403, b"Invalid webhook secret")
                    return

            # Read and parse the payload. Anything wrong with the request
            # itself is the client's fault, so answer 400 rather than 500.
            try:
                content_length = int(self.headers.get("Content-Length", 0))
                if content_length < 0:
                    # read(-1) would block until the client closes the socket.
                    raise ValueError("negative Content-Length")
                body = self.rfile.read(content_length).decode("utf-8")
                payload = json.loads(body)
                if not isinstance(payload, dict):
                    raise ValueError("payload is not a JSON object")
            except ValueError as e:
                # Covers bad Content-Length, invalid UTF-8 and invalid JSON.
                print(f"ERROR handling webhook: {e}", file=sys.stderr)
                self._reply(400, b"Malformed webhook request")
                return

            try:
                repo = payload.get("repo")
                repo = repo if isinstance(repo, dict) else {}
                event = payload.get("event")
                event = event if isinstance(event, dict) else {}
                repo_type = repo.get("type", "")
                action = event.get("action", "")
                repo_name = repo.get("name", "unknown")
                print(f"Webhook received: {repo_type} {action} — {repo_name}")

                # Only spawn a Job for model updates
                if repo_type != "model" or action != "update":
                    print(f"Skipping: {repo_type} {action}")
                    self._reply(200, b"Skipped: not a model update")
                    return

                # Spawn a GPU Job with the webhook payload; the Job sees
                # WEBHOOK_PAYLOAD and therefore runs in evaluation mode.
                print(f"Spawning eval Job for {repo_name}...")
                job = run_job(
                    image=f"hf.co/spaces/{space_id}",
                    command=["python3", "/app/entrypoint.py"],
                    flavor=flavor,
                    timeout=timeout,
                    namespace=namespace,
                    secrets={"HF_TOKEN": token},
                    env={"WEBHOOK_PAYLOAD": body},
                    token=token,
                )
                print(f"Job spawned: {job.id} — {job.url}")
                self._reply(200, json.dumps({"job_id": job.id}).encode())
            except Exception as e:
                # Top-level boundary for one request: report and keep serving.
                print(f"ERROR handling webhook: {e}", file=sys.stderr)
                self._reply(500, str(e).encode())

        def log_message(self, format, *args):
            """Route the default access log through print with a prefix."""
            print(f"[webhook] {args[0] if args else format}")

    port = int(os.environ.get("PORT", "7860"))
    server = HTTPServer(("0.0.0.0", port), WebhookHandler)
    print(f"Webhook receiver listening on port {port}")
    print(f" Namespace: {namespace} | Flavor: {flavor} | Timeout: {timeout}")
    print(f" Secret verification: {'enabled' if webhook_secret else 'disabled'}")
    server.serve_forever()
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Entry point for both modes.

    Without WEBHOOK_PAYLOAD in the environment this starts the webhook
    receiver. With it, the payload is parsed and the evaluation pipeline
    runs: filter to model update events, fetch config.json, check the
    estimated parameter count against PARAM_MIN/PARAM_MAX, skip if a result
    file already exists, run every configured suite, then upload a JSON
    summary to RESULTS_REPO (saved locally if the upload fails).

    Events that are filtered out exit with status 0; failure to obtain
    eval.yaml exits with status 1.
    """
    # 1. Parse webhook payload — if not set, run as webhook receiver
    payload_raw = os.environ.get("WEBHOOK_PAYLOAD")
    if not payload_raw:
        run_webhook_server()
        return  # never reached (serve_forever)
    payload = json.loads(payload_raw)
    repo_type = payload["repo"]["type"]
    action = payload["event"]["action"]
    model_id = payload["repo"]["name"]
    # headSha may be absent or null; either way fall back to the branch name.
    sha = payload["repo"].get("headSha") or "main"

    # 2. Filter: only model update events
    if repo_type != "model" or action != "update":
        print(f"Skipping: {repo_type} {action}")
        sys.exit(0)
    print(f"Evaluating {model_id} @ {sha[:8]}")
    token = os.environ["HF_TOKEN"]

    # 3. Fetch config.json (lightweight, no weights)
    resp = requests.get(
        f"https://huggingface.co/{model_id}/raw/{sha}/config.json",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    if resp.status_code != 200:
        print(f"Could not fetch config.json (HTTP {resp.status_code}) — skipping.")
        sys.exit(0)
    try:
        config = resp.json()
    except ValueError as e:
        print(f"config.json is not valid JSON ({e}) — skipping.")
        sys.exit(0)

    # 4. Estimate params — skip if outside 8B range
    try:
        total_params = estimate_params(config)
    except KeyError as e:
        print(f"Missing config field {e} — skipping.")
        sys.exit(0)
    except (TypeError, ZeroDivisionError) as e:
        # Null/non-numeric fields or a zero head count in config.json.
        print(f"Invalid config field ({e}) — skipping.")
        sys.exit(0)
    print(f"Estimated params: {total_params / 1e9:.2f}B")
    if not (PARAM_MIN <= total_params <= PARAM_MAX):
        print(f"Skipping: {total_params / 1e9:.2f}B is outside {PARAM_MIN/1e9:.1f}B–{PARAM_MAX/1e9:.1f}B range.")
        sys.exit(0)

    # 5. Duplicate check — skip if results already exist
    api = HfApi(token=token)
    result_filename = f"{model_id.replace('/', '__')}__{sha[:8]}.json"
    try:
        existing = api.list_repo_files(repo_id=RESULTS_REPO, repo_type="dataset")
    except Exception as e:
        # Best effort: if the listing fails, evaluate anyway.
        print(f"WARN: could not check for existing results: {e}")
    else:
        if result_filename in existing:
            print(f"Results already exist: {result_filename} — skipping.")
            sys.exit(0)

    # 6. Fetch eval.yaml from olmo_framework repo
    try:
        eval_config = fetch_eval_config(token)
    except Exception as e:
        print(f"ERROR: could not fetch eval.yaml: {e}", file=sys.stderr)
        sys.exit(1)
    # Tolerate an empty document or a null "suites" entry.
    suites_config = (eval_config or {}).get("suites") or {}

    # 7–8. Run evaluations
    # NOTE(review): the runners receive only model_id, not sha, so the
    # revision they evaluate is whatever the backend resolves by default —
    # confirm this matches the sha recorded in the summary.
    results = {}
    for suite_name in EVAL_SUITES:
        suite_name = suite_name.strip()
        if not suite_name:
            # Blank entries come from an empty or trailing-comma EVAL_SUITES.
            continue
        suite = suites_config.get(suite_name)
        if not suite:
            print(f"WARN: suite '{suite_name}' not found in eval.yaml — skipping.")
            results[suite_name] = {"error": "suite not found in eval.yaml"}
            continue
        tasks = suite.get("tasks") if isinstance(suite, dict) else None
        if not isinstance(tasks, list) or not tasks:
            # Record the problem and move on so one malformed suite does not
            # discard the results of the suites that already ran.
            print(f"WARN: suite '{suite_name}' has no task list in eval.yaml — skipping.")
            results[suite_name] = {"error": "suite has no tasks in eval.yaml"}
            continue
        backend = suite.get("backend", "olmes")
        suite_output = OUTPUT_DIR / model_id.replace("/", "__") / suite_name
        print(f"\n{'─' * 60}")
        print(f"Suite: {suite_name} | Backend: {backend} | Tasks: {', '.join(tasks)}")
        print(f"{'─' * 60}")
        if backend == "lm_eval":
            success = run_lm_eval(model_id, tasks, suite_output)
            metrics = collect_lm_eval_results(suite_output) if success else {}
        else:
            success = run_olmes_eval(model_id, tasks, suite_output)
            metrics = collect_olmes_results(suite_output) if success else {}
        mean_acc = compute_mean_accuracy(metrics)
        suite_result = {"metrics": metrics}
        if mean_acc is not None:
            suite_result["mean_accuracy"] = mean_acc
        if not success:
            suite_result["error"] = "evaluation failed"
        results[suite_name] = suite_result
        print(f"Suite {suite_name}: {'OK' if success else 'FAILED'} | mean_accuracy={mean_acc}")

    # 9. Build summary
    summary = {
        "model": model_id,
        "sha": sha,
        "params_b": round(total_params / 1e9, 2),
        "results": results,
    }
    summary_json = json.dumps(summary, indent=2)

    # 10. Upload to eval-results dataset
    print(f"\nUploading results to {RESULTS_REPO}/{result_filename}")
    try:
        api.upload_file(
            path_or_fileobj=summary_json.encode(),
            path_in_repo=result_filename,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
        )
        print(f"Results uploaded: {result_filename}")
    except Exception as e:
        print(f"ERROR uploading results: {e}", file=sys.stderr)
        # Save locally as fallback
        fallback_path = OUTPUT_DIR / result_filename
        fallback_path.parent.mkdir(parents=True, exist_ok=True)
        fallback_path.write_text(summary_json)
        print(f"Results saved locally: {fallback_path}")
    print(summary_json)


if __name__ == "__main__":
    main()