"""
eval-runner entrypoint — Two modes:

1. WEBHOOK RECEIVER (Space mode): Runs an HTTP server on port 7860 that
   receives webhook POSTs from HF and spawns GPU Jobs with the payload.
2. EVALUATION (Job mode): When WEBHOOK_PAYLOAD is set, runs the eval pipeline.
"""

import hmac
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path

import requests
import yaml
from huggingface_hub import HfApi

# ─── Configuration ───────────────────────────────────────────────────────────

EVAL_SUITES = os.environ.get("EVAL_SUITES", "post_training,post_training_es").split(",")
EVAL_YAML_URL = os.environ.get(
    "EVAL_YAML_URL",
    "https://raw.githubusercontent.com/latam-gpt/olmo_framework/main/configs/eval.yaml",
)
RESULTS_REPO = os.environ.get("RESULTS_REPO", "latam-gpt/eval-results")
PARAM_MIN = 7.5e9
PARAM_MAX = 8.5e9
GPUS = int(os.environ.get("GPUS", "1"))
OUTPUT_DIR = Path(os.environ.get("OUTPUT_DIR", "/tmp/eval_results"))

# Metric keys probed, in priority order, when averaging accuracy across tasks.
_ACCURACY_KEYS = ("acc,none", "accuracy", "acc", "exact_match", "pass@1")


# ─── Param estimation ────────────────────────────────────────────────────────

def estimate_params(cfg: dict) -> int:
    """Estimate total parameters from a Llama-style config.json.

    Accounts for Q/K/V/O attention projections, SwiGLU FFN (3 matrices),
    and embeddings (doubled if tie_word_embeddings is false).

    Raises:
        KeyError: if a required config field is missing.
    """
    H = cfg["hidden_size"]
    L = cfg["num_hidden_layers"]
    FFN = cfg["intermediate_size"]
    V = cfg["vocab_size"]
    nH = cfg["num_attention_heads"]
    nKV = cfg.get("num_key_value_heads", nH)
    D = cfg.get("head_dim", H // nH)

    attn = H * (nH * D) + H * (nKV * D) * 2 + (nH * D) * H  # Q + K + V + O
    ffn = 3 * H * FFN  # gate + up + down (SwiGLU)
    emb = V * H * (1 if cfg.get("tie_word_embeddings") else 2)
    return L * (attn + ffn) + emb


# ─── Eval YAML fetching ──────────────────────────────────────────────────────

LOCAL_EVAL_YAML = Path("/app/eval.yaml")


def _load_eval_yaml(text: str, origin: str) -> dict:
    """Parse eval.yaml text and require the top level to be a mapping."""
    config = yaml.safe_load(text)
    if not isinstance(config, dict):
        # safe_load returns None for an empty document; callers expect a dict.
        raise ValueError(
            f"{origin} eval.yaml is not a mapping (got {type(config).__name__})"
        )
    return config


def fetch_eval_config(token: str) -> dict:
    """Fetch eval.yaml from GitHub, falling back to local copy baked into the image.

    Args:
        token: Sent as a Bearer token with the remote request.

    Returns:
        The parsed eval.yaml as a dict.

    Raises:
        RuntimeError: if the remote fetch failed and no local copy exists.
        ValueError: if the local copy does not parse to a mapping.
    """
    # Try remote first (picks up changes without rebuilding the image)
    try:
        # NOTE(review): main() passes HF_TOKEN here, which is then sent to
        # whatever host EVAL_YAML_URL points at (GitHub by default) — confirm
        # this is intended rather than a dedicated GitHub credential.
        headers = {"Authorization": f"Bearer {token}"}
        resp = requests.get(EVAL_YAML_URL, headers=headers, timeout=30)
        resp.raise_for_status()
        return _load_eval_yaml(resp.text, "remote")
    except Exception as e:
        print(f"WARN: remote eval.yaml fetch failed ({e}), using local copy.")

    # Fall back to local copy
    if LOCAL_EVAL_YAML.exists():
        return _load_eval_yaml(LOCAL_EVAL_YAML.read_text(encoding="utf-8"), "local")

    raise RuntimeError("No eval.yaml available (remote fetch failed and no local copy).")


# ─── CLI runners ─────────────────────────────────────────────────────────────

def run_olmes_eval(model_id: str, tasks: list[str], output_dir: Path) -> bool:
    """Run olmes CLI for a set of tasks. Returns True on success."""
    output_dir.mkdir(parents=True, exist_ok=True)

    model_args = {"trust_remote_code": True, "max_length": 2560}
    if GPUS > 1:
        model_args["tensor_parallel_size"] = GPUS

    cmd = [
        "olmes",
        "--model", model_id,
        "--task", *tasks,
        "--output-dir", str(output_dir),
        "--model-type", "vllm",
        "--model-args", json.dumps(model_args),
    ]

    env = os.environ.copy()
    # Respect an explicit VLLM_USE_V1 from the environment; default to "1".
    env.setdefault("VLLM_USE_V1", "1")
    env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"

    print(f"Running olmes: {' '.join(cmd)}")
    try:
        subprocess.run(cmd, check=True, env=env)
        return True
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"ERROR running olmes: {e}", file=sys.stderr)
        return False


def run_lm_eval(model_id: str, tasks: list[str], output_dir: Path) -> bool:
    """Run lm_eval CLI for a set of tasks. Returns True on success."""
    output_dir.mkdir(parents=True, exist_ok=True)

    model_args = f"pretrained={model_id},tensor_parallel_size={GPUS},trust_remote_code=True"
    cmd = [
        "lm_eval",
        "--model", "vllm",
        "--model_args", model_args,
        "--tasks", ",".join(tasks),
        "--output_path", str(output_dir),
        "--batch_size", "auto",
        "--device", "cuda",
    ]

    env = os.environ.copy()
    env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"

    print(f"Running lm_eval: {' '.join(cmd)}")
    try:
        subprocess.run(cmd, check=True, env=env)
        return True
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"ERROR running lm_eval: {e}", file=sys.stderr)
        return False


# ─── Result collection ────────────────────────────────────────────────────────

def _result_json_files(output_dir: Path) -> list[Path]:
    """Return every *.json under output_dir except summary.json, sorted by path."""
    # glob order is unspecified; sort so collection is deterministic.
    return sorted(p for p in output_dir.glob("**/*.json") if p.name != "summary.json")


def _read_json(path: Path):
    """Load a JSON file, returning None (after a warning) if it is unreadable."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"WARN: could not parse {path}: {e}", file=sys.stderr)
        return None


def _mtime_or_zero(path: Path) -> float:
    """Return the file's modification time, or 0.0 if it cannot be stat'ed."""
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0


def collect_olmes_results(output_dir: Path) -> dict:
    """Collect olmes results from per-task JSON files.

    Each JSON file holding a dict contributes one entry keyed by the file
    stem: its "metrics" value if present, else its "results" value, else the
    whole dict. Unreadable files are skipped with a warning.
    """
    metrics = {}
    for jf in _result_json_files(output_dir):
        data = _read_json(jf)
        if not isinstance(data, dict):
            continue
        task_name = jf.stem
        if "metrics" in data:
            metrics[task_name] = data["metrics"]
        elif "results" in data:
            metrics[task_name] = data["results"]
        else:
            metrics[task_name] = data
    return metrics


def collect_lm_eval_results(output_dir: Path) -> dict:
    """Collect lm-eval results from single JSON with 'results' key.

    Only one results file is used. If several exist (e.g. leftovers from a
    previous run), the most recently modified one wins.
    """
    metrics = {}
    candidates = sorted(
        _result_json_files(output_dir),
        key=lambda p: (_mtime_or_zero(p), str(p)),
        reverse=True,
    )
    for jf in candidates:
        data = _read_json(jf)
        if not isinstance(data, dict) or not isinstance(data.get("results"), dict):
            continue
        for task_name, task_metrics in data["results"].items():
            if isinstance(task_metrics, dict):
                metrics[task_name] = task_metrics
        break  # single results file expected
    return metrics


def compute_mean_accuracy(metrics: dict) -> float | None:
    """Compute mean accuracy across tasks.

    For each task, the first key of _ACCURACY_KEYS holding a numeric value is
    used. Tasks without a numeric accuracy are ignored. Returns None when no
    task contributes a value.
    """
    accuracies = []
    for task_metrics in metrics.values():
        if not isinstance(task_metrics, dict):
            continue
        for key in _ACCURACY_KEYS:
            value = task_metrics.get(key)
            # Skip strings/None/bools so sum() below cannot raise TypeError.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                accuracies.append(value)
                break
    return sum(accuracies) / len(accuracies) if accuracies else None


# ─── Webhook receiver (Space mode) ────────────────────────────────────────────

def run_webhook_server():
    """Run an HTTP server that receives webhook POSTs and spawns eval Jobs.

    Reads HF_TOKEN (required), WEBHOOK_SECRET, JOB_NAMESPACE, JOB_SPACE_ID,
    JOB_FLAVOR, JOB_TIMEOUT and PORT from the environment. Exits with status 1
    if HF_TOKEN is missing; otherwise serves forever.
    """
    from http.server import HTTPServer, BaseHTTPRequestHandler
    from huggingface_hub import run_job

    token = os.environ.get("HF_TOKEN")
    webhook_secret = os.environ.get("WEBHOOK_SECRET")
    namespace = os.environ.get("JOB_NAMESPACE", "latam-gpt")
    space_id = os.environ.get("JOB_SPACE_ID", "latam-gpt/eval-runner")
    flavor = os.environ.get("JOB_FLAVOR", "a100-large")
    timeout = os.environ.get("JOB_TIMEOUT", "3h")

    if not token:
        print("ERROR: HF_TOKEN must be set as a Space secret.", file=sys.stderr)
        sys.exit(1)

    # Encoded once so every request can use a constant-time bytes comparison.
    secret_bytes = webhook_secret.encode("utf-8") if webhook_secret else None

    class WebhookHandler(BaseHTTPRequestHandler):
        def _reply(self, status: int, body: bytes) -> None:
            """Send a header-less response with the given status and body."""
            self.send_response(status)
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self):
            self._reply(200, b"eval-runner webhook receiver is running")

        def do_POST(self):
            # Verify webhook secret if configured
            if secret_bytes is not None:
                req_secret = self.headers.get("X-Webhook-Secret", "")
                # compare_digest avoids leaking the secret through timing.
                if not hmac.compare_digest(req_secret.encode("utf-8"), secret_bytes):
                    self._reply(403, b"Invalid webhook secret")
                    return

            # Read payload
            try:
                content_length = int(self.headers.get("Content-Length", 0))
            except ValueError:
                self._reply(400, b"Invalid Content-Length")
                return

            try:
                # A negative length would make read() block until EOF.
                body = self.rfile.read(max(content_length, 0)).decode("utf-8")
                payload = json.loads(body)
            except ValueError:
                # Covers UnicodeDecodeError and json.JSONDecodeError.
                self._reply(400, b"Invalid JSON payload")
                return
            if not isinstance(payload, dict):
                self._reply(400, b"Invalid JSON payload")
                return

            try:
                repo_type = payload.get("repo", {}).get("type", "")
                action = payload.get("event", {}).get("action", "")
                repo_name = payload.get("repo", {}).get("name", "unknown")

                print(f"Webhook received: {repo_type} {action} — {repo_name}")

                # Only spawn a Job for model updates
                if repo_type != "model" or action != "update":
                    print(f"Skipping: {repo_type} {action}")
                    self._reply(200, b"Skipped: not a model update")
                    return

                # Spawn a GPU Job with the webhook payload
                print(f"Spawning eval Job for {repo_name}...")
                job = run_job(
                    image=f"hf.co/spaces/{space_id}",
                    command=["python3", "/app/entrypoint.py"],
                    flavor=flavor,
                    timeout=timeout,
                    namespace=namespace,
                    secrets={"HF_TOKEN": token},
                    env={"WEBHOOK_PAYLOAD": body},
                    token=token,
                )
                print(f"Job spawned: {job.id} — {job.url}")
                self._reply(200, json.dumps({"job_id": job.id}).encode())

            except Exception as e:
                print(f"ERROR handling webhook: {e}", file=sys.stderr)
                self._reply(500, str(e).encode())

        def log_message(self, format, *args):
            # Print only the first format argument; fall back to the raw
            # format string when no arguments were supplied.
            print(f"[webhook] {args[0] if args else format}")

    port = int(os.environ.get("PORT", "7860"))
    server = HTTPServer(("0.0.0.0", port), WebhookHandler)
    print(f"Webhook receiver listening on port {port}")
    print(f"  Namespace: {namespace} | Flavor: {flavor} | Timeout: {timeout}")
    print(f"  Secret verification: {'enabled' if webhook_secret else 'disabled'}")
    server.serve_forever()


# ─── Main ─────────────────────────────────────────────────────────────────────

def _run_suite(model_id: str, suite_name: str, suite: dict) -> dict:
    """Run one eval suite and return its result entry for the summary."""
    tasks = suite.get("tasks")
    if not tasks:
        # A malformed suite must not abort the run and lose other suites' results.
        print(f"WARN: suite '{suite_name}' has no tasks in eval.yaml — skipping.")
        return {"error": "suite has no tasks in eval.yaml"}

    backend = suite.get("backend", "olmes")
    suite_output = OUTPUT_DIR / model_id.replace("/", "__") / suite_name

    print(f"\n{'─' * 60}")
    print(f"Suite: {suite_name} | Backend: {backend} | Tasks: {', '.join(tasks)}")
    print(f"{'─' * 60}")

    if backend == "lm_eval":
        success = run_lm_eval(model_id, tasks, suite_output)
        metrics = collect_lm_eval_results(suite_output) if success else {}
    else:
        success = run_olmes_eval(model_id, tasks, suite_output)
        metrics = collect_olmes_results(suite_output) if success else {}

    mean_acc = compute_mean_accuracy(metrics)
    suite_result = {"metrics": metrics}
    if mean_acc is not None:
        suite_result["mean_accuracy"] = mean_acc
    if not success:
        suite_result["error"] = "evaluation failed"

    print(f"Suite {suite_name}: {'OK' if success else 'FAILED'} | mean_accuracy={mean_acc}")
    return suite_result


def main():
    """Entry point: serve webhooks, or run the eval pipeline for WEBHOOK_PAYLOAD.

    Without WEBHOOK_PAYLOAD the process becomes the webhook receiver. With it,
    the payload is filtered (model update, estimated params within
    PARAM_MIN..PARAM_MAX, no existing result file), the configured suites are
    run, and a JSON summary is uploaded to RESULTS_REPO (saved under
    OUTPUT_DIR if the upload fails).
    """
    # 1. Parse webhook payload — if not set, run as webhook receiver
    payload_raw = os.environ.get("WEBHOOK_PAYLOAD")
    if not payload_raw:
        run_webhook_server()
        return  # never reached (serve_forever)

    payload = json.loads(payload_raw)
    repo_type = payload["repo"]["type"]
    action = payload["event"]["action"]
    model_id = payload["repo"]["name"]
    # `or` also covers an explicit null headSha, which would break sha[:8].
    sha = payload["repo"].get("headSha") or "main"

    # 2. Filter: only model update events
    if repo_type != "model" or action != "update":
        print(f"Skipping: {repo_type} {action}")
        sys.exit(0)

    print(f"Evaluating {model_id} @ {sha[:8]}")

    token = os.environ["HF_TOKEN"]

    # 3. Fetch config.json (lightweight, no weights)
    resp = requests.get(
        f"https://huggingface.co/{model_id}/raw/{sha}/config.json",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    if resp.status_code != 200:
        print(f"Could not fetch config.json (HTTP {resp.status_code}) — skipping.")
        sys.exit(0)
    try:
        config = resp.json()
    except ValueError as e:
        print(f"config.json is not valid JSON ({e}) — skipping.")
        sys.exit(0)

    # 4. Estimate params — skip if outside 8B range
    try:
        total_params = estimate_params(config)
    except KeyError as e:
        print(f"Missing config field {e} — skipping.")
        sys.exit(0)
    except (TypeError, ZeroDivisionError) as e:
        print(f"Unusable config.json ({e}) — skipping.")
        sys.exit(0)

    print(f"Estimated params: {total_params / 1e9:.2f}B")

    if not (PARAM_MIN <= total_params <= PARAM_MAX):
        print(f"Skipping: {total_params / 1e9:.2f}B is outside {PARAM_MIN/1e9:.1f}B–{PARAM_MAX/1e9:.1f}B range.")
        sys.exit(0)

    # 5. Duplicate check — skip if results already exist
    api = HfApi(token=token)
    result_filename = f"{model_id.replace('/', '__')}__{sha[:8]}.json"
    try:
        existing = api.list_repo_files(repo_id=RESULTS_REPO, repo_type="dataset")
        if result_filename in existing:
            print(f"Results already exist: {result_filename} — skipping.")
            sys.exit(0)
    except Exception as e:
        print(f"WARN: could not check for existing results: {e}")

    # 6. Fetch eval.yaml from olmo_framework repo
    try:
        eval_config = fetch_eval_config(token)
    except Exception as e:
        print(f"ERROR: could not fetch eval.yaml: {e}", file=sys.stderr)
        sys.exit(1)

    # `or {}` covers both a missing config and an empty `suites:` key.
    suites_config = (eval_config or {}).get("suites") or {}

    # 7–8. Run evaluations
    # NOTE(review): the runners receive only model_id, so the eval appears to
    # run against the repo head rather than `sha` — confirm that is acceptable.
    results = {}
    for suite_name in EVAL_SUITES:
        suite_name = suite_name.strip()
        if not suite_name:
            continue  # e.g. EVAL_SUITES="" or a trailing comma

        suite = suites_config.get(suite_name)
        if not suite:
            print(f"WARN: suite '{suite_name}' not found in eval.yaml — skipping.")
            results[suite_name] = {"error": "suite not found in eval.yaml"}
            continue

        results[suite_name] = _run_suite(model_id, suite_name, suite)

    # 9. Build summary
    summary = {
        "model": model_id,
        "sha": sha,
        "params_b": round(total_params / 1e9, 2),
        "results": results,
    }

    # 10. Upload to eval-results dataset
    print(f"\nUploading results to {RESULTS_REPO}/{result_filename}")
    try:
        api.upload_file(
            path_or_fileobj=json.dumps(summary, indent=2).encode(),
            path_in_repo=result_filename,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
        )
        print(f"Results uploaded: {result_filename}")
    except Exception as e:
        print(f"ERROR uploading results: {e}", file=sys.stderr)
        # Save locally as fallback
        fallback_path = OUTPUT_DIR / result_filename
        fallback_path.parent.mkdir(parents=True, exist_ok=True)
        fallback_path.write_text(json.dumps(summary, indent=2))
        print(f"Results saved locally: {fallback_path}")

    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()