Spaces:
Running
Running
| """ | |
| eval-runner entrypoint — Two modes: | |
| 1. WEBHOOK RECEIVER (Space mode): Runs an HTTP server on port 7860 that receives | |
| webhook POSTs from HF and spawns GPU Jobs with the payload. | |
| 2. EVALUATION (Job mode): When WEBHOOK_PAYLOAD is set, runs the eval pipeline. | |
| """ | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from pathlib import Path | |
| import requests | |
| import yaml | |
| from huggingface_hub import HfApi | |
# ─── Configuration ───────────────────────────────────────────────────────────
# Suite names to run, read from a comma-separated env var. Entries are trimmed
# and blank ones dropped so a value such as "a, b," does not produce phantom
# suite names (" b", "") that can never match a key in eval.yaml.
EVAL_SUITES = [
    name.strip()
    for name in os.environ.get("EVAL_SUITES", "post_training,post_training_es").split(",")
    if name.strip()
]
# URL the suite definitions (eval.yaml) are fetched from.
EVAL_YAML_URL = os.environ.get(
    "EVAL_YAML_URL",
    "https://raw.githubusercontent.com/latam-gpt/olmo_framework/main/configs/eval.yaml",
)
# Hub dataset repo that receives the summary JSON for each evaluated revision.
RESULTS_REPO = os.environ.get("RESULTS_REPO", "latam-gpt/eval-results")
# Inclusive bounds on the estimated parameter count; models outside are skipped.
PARAM_MIN = 7.5e9
PARAM_MAX = 8.5e9
# GPU count, forwarded to the eval backends as tensor_parallel_size.
GPUS = int(os.environ.get("GPUS", "1"))
# Local root directory for per-suite outputs and the fallback summary file.
OUTPUT_DIR = Path(os.environ.get("OUTPUT_DIR", "/tmp/eval_results"))
# ─── Param estimation ────────────────────────────────────────────────────────
def estimate_params(cfg: dict) -> int:
    """Estimate total parameters from a Llama-style config.json.

    Counts, per layer, the Q/K/V/O attention projections and the three
    SwiGLU FFN matrices, plus the embedding table (doubled unless
    ``tie_word_embeddings`` is truthy). No other terms are included.

    ``num_key_value_heads`` and ``head_dim`` are optional. A key that is
    absent *or* present with a null value falls back to the derived default
    (``num_attention_heads`` and ``hidden_size // num_attention_heads``).

    Args:
        cfg: Parsed config.json contents.

    Returns:
        The estimated parameter count.

    Raises:
        KeyError: If a required field (``hidden_size``, ``num_hidden_layers``,
            ``intermediate_size``, ``vocab_size``, ``num_attention_heads``)
            is missing.
    """
    hidden = cfg["hidden_size"]
    n_layers = cfg["num_hidden_layers"]
    ffn_dim = cfg["intermediate_size"]
    vocab = cfg["vocab_size"]
    n_heads = cfg["num_attention_heads"]
    # `or` rather than a .get() default: an explicit null in the config would
    # otherwise come back as None and break the arithmetic below.
    n_kv_heads = cfg.get("num_key_value_heads") or n_heads
    head_dim = cfg.get("head_dim") or hidden // n_heads

    q_width = n_heads * head_dim
    kv_width = n_kv_heads * head_dim
    attn = hidden * q_width + hidden * kv_width * 2 + q_width * hidden  # Q + K + V + O
    ffn = 3 * hidden * ffn_dim  # gate + up + down (SwiGLU)
    emb = vocab * hidden * (1 if cfg.get("tie_word_embeddings") else 2)
    return n_layers * (attn + ffn) + emb
# ─── Eval YAML fetching ──────────────────────────────────────────────────────
# Copy of eval.yaml read when the remote fetch fails.
LOCAL_EVAL_YAML = Path("/app/eval.yaml")


def fetch_eval_config(token: str) -> dict:
    """Load the eval suite configuration, preferring the remote copy.

    The remote file at ``EVAL_YAML_URL`` is tried first so config changes are
    picked up without rebuilding the image. Any failure there (network, HTTP
    status, YAML syntax, or a document that is not a mapping) is logged and
    the local file ``LOCAL_EVAL_YAML`` is used instead.

    Args:
        token: Sent as a ``Bearer`` token in the Authorization header of the
            remote request.

    Returns:
        The parsed YAML document, always a dict.

    Raises:
        RuntimeError: If the remote fetch failed and the local copy is missing
            or is not a YAML mapping.
    """
    # NOTE(review): main() passes the HF_TOKEN value here while the default
    # EVAL_YAML_URL points at raw.githubusercontent.com. Confirm that sending
    # this credential to that host is intended.
    try:
        resp = requests.get(
            EVAL_YAML_URL,
            headers={"Authorization": f"Bearer {token}"},
            timeout=30,
        )
        resp.raise_for_status()
        remote_cfg = yaml.safe_load(resp.text)
        # An empty or scalar document parses fine but is unusable downstream;
        # treat it like any other remote failure so the fallback kicks in.
        if not isinstance(remote_cfg, dict):
            raise ValueError(f"expected a YAML mapping, got {type(remote_cfg).__name__}")
        return remote_cfg
    except Exception as e:  # deliberate best-effort: any remote problem means fall back
        print(f"WARN: remote eval.yaml fetch failed ({e}), using local copy.")

    # Fall back to local copy
    if LOCAL_EVAL_YAML.exists():
        local_cfg = yaml.safe_load(LOCAL_EVAL_YAML.read_text(encoding="utf-8"))
        if not isinstance(local_cfg, dict):
            raise RuntimeError(f"Local eval.yaml at {LOCAL_EVAL_YAML} is not a YAML mapping.")
        return local_cfg
    raise RuntimeError("No eval.yaml available (remote fetch failed and no local copy).")
# ─── CLI runners ─────────────────────────────────────────────────────────────
def run_olmes_eval(model_id: str, tasks: list[str], output_dir: Path) -> bool:
    """Invoke the ``olmes`` CLI on ``tasks`` with the vLLM model type.

    Creates ``output_dir`` if needed, then runs the command as a subprocess
    with ``VLLM_WORKER_MULTIPROC_METHOD=spawn`` and, unless the caller's
    environment already sets it, ``VLLM_USE_V1=1``.

    Args:
        model_id: Value passed to ``--model``.
        tasks: Task names passed after ``--task``.
        output_dir: Directory passed to ``--output-dir``.

    Returns:
        True if the command exited with status 0; False if it exited non-zero
        or the executable could not be found.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    vllm_args = {"trust_remote_code": True, "max_length": 2560}
    if GPUS > 1:
        # Only request tensor parallelism when there is more than one GPU.
        vllm_args["tensor_parallel_size"] = GPUS

    argv = ["olmes", "--model", model_id, "--task"]
    argv.extend(tasks)
    argv += [
        "--output-dir", str(output_dir),
        "--model-type", "vllm",
        "--model-args", json.dumps(vllm_args),
    ]

    child_env = dict(os.environ)
    child_env.setdefault("VLLM_USE_V1", "1")  # respect an explicit override
    child_env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"

    print(f"Running olmes: {' '.join(argv)}")
    try:
        subprocess.run(argv, check=True, env=child_env)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"ERROR running olmes: {e}", file=sys.stderr)
        return False
    return True
def run_lm_eval(model_id: str, tasks: list[str], output_dir: Path) -> bool:
    """Invoke the ``lm_eval`` CLI on ``tasks`` with the vLLM model backend.

    Creates ``output_dir`` if needed, then runs the command as a subprocess
    with ``VLLM_WORKER_MULTIPROC_METHOD=spawn`` added to the environment.

    Args:
        model_id: Used as ``pretrained=`` inside ``--model_args``.
        tasks: Task names, joined with commas for ``--tasks``.
        output_dir: Directory passed to ``--output_path``.

    Returns:
        True if the command exited with status 0; False if it exited non-zero
        or the executable could not be found.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    backend_args = ",".join(
        (
            f"pretrained={model_id}",
            f"tensor_parallel_size={GPUS}",
            "trust_remote_code=True",
        )
    )
    argv = ["lm_eval"]
    argv += ["--model", "vllm"]
    argv += ["--model_args", backend_args]
    argv += ["--tasks", ",".join(tasks)]
    argv += ["--output_path", str(output_dir)]
    argv += ["--batch_size", "auto"]
    argv += ["--device", "cuda"]

    child_env = {**os.environ, "VLLM_WORKER_MULTIPROC_METHOD": "spawn"}

    print(f"Running lm_eval: {' '.join(argv)}")
    try:
        subprocess.run(argv, check=True, env=child_env)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"ERROR running lm_eval: {e}", file=sys.stderr)
        return False
    return True
# ─── Result collection ───────────────────────────────────────────────────────
def collect_olmes_results(output_dir: Path) -> dict:
    """Collect olmes results from per-task JSON files.

    Every ``*.json`` file under ``output_dir`` (recursively) except
    ``summary.json`` is read. For a file whose top level is a JSON object,
    the entry stored under the file's stem is, in order of preference, its
    ``"metrics"`` value, its ``"results"`` value, or the whole object. Files
    with a non-object top level are ignored.

    Files that cannot be read or parsed (I/O error, invalid UTF-8, invalid
    JSON) are reported on stderr and skipped, so one bad file does not discard
    the results of the others.

    Args:
        output_dir: Root directory to search.

    Returns:
        Mapping of file stem to the extracted metrics.
    """
    metrics = {}
    # Sorted so that, if two files share a stem, which one wins does not
    # depend on filesystem enumeration order.
    for jf in sorted(output_dir.glob("**/*.json")):
        if jf.name == "summary.json":
            continue
        try:
            with open(jf, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            print(f"WARN: could not parse {jf}: {e}", file=sys.stderr)
            continue
        if not isinstance(data, dict):
            continue
        if "metrics" in data:
            metrics[jf.stem] = data["metrics"]
        elif "results" in data:
            metrics[jf.stem] = data["results"]
        else:
            metrics[jf.stem] = data
    return metrics
def collect_lm_eval_results(output_dir: Path) -> dict:
    """Collect lm-eval results from a single JSON file with a 'results' key.

    ``*.json`` files under ``output_dir`` (recursively, ``summary.json``
    excluded) are examined in sorted path order. The first one whose top
    level is an object containing a dict under ``"results"`` supplies the
    return value; later files are not read. Within that dict, only entries
    whose value is itself a dict are kept.

    Files that cannot be read or parsed (I/O error, invalid UTF-8, invalid
    JSON) are reported on stderr and skipped.

    Args:
        output_dir: Root directory to search.

    Returns:
        Mapping of task name to that task's metrics dict; empty if no results
        file was found.
    """
    metrics = {}
    # Sorted so the file chosen is deterministic if more than one qualifies.
    for jf in sorted(output_dir.glob("**/*.json")):
        if jf.name == "summary.json":
            continue
        try:
            with open(jf, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            print(f"WARN: could not parse {jf}: {e}", file=sys.stderr)
            continue
        results = data.get("results") if isinstance(data, dict) else None
        if not isinstance(results, dict):
            continue
        for task_name, task_metrics in results.items():
            if isinstance(task_metrics, dict):
                metrics[task_name] = task_metrics
        break  # single results file expected
    return metrics
| def compute_mean_accuracy(metrics: dict) -> float | None: | |
| """Compute mean accuracy across tasks.""" | |
| accuracies = [] | |
| for task_metrics in metrics.values(): | |
| if not isinstance(task_metrics, dict): | |
| continue | |
| for key in ["acc,none", "accuracy", "acc", "exact_match", "pass@1"]: | |
| if key in task_metrics: | |
| accuracies.append(task_metrics[key]) | |
| break | |
| return sum(accuracies) / len(accuracies) if accuracies else None | |
# ─── Webhook receiver (Space mode) ───────────────────────────────────────────
def run_webhook_server():
    """Run an HTTP server that receives webhook POSTs and spawns eval Jobs.

    Configuration is read from the environment: ``HF_TOKEN`` (required),
    ``WEBHOOK_SECRET`` (optional; when set, POSTs must carry a matching
    ``X-Webhook-Secret`` header), ``JOB_NAMESPACE``, ``JOB_SPACE_ID``,
    ``JOB_FLAVOR``, ``JOB_TIMEOUT`` and ``PORT``.

    Request handling:
        * ``GET``  -> 200 with a liveness message.
        * ``POST`` -> 403 on a secret mismatch; 400 on an invalid
          Content-Length, undecodable body, or a body that is not a JSON
          object; 200 "Skipped" for anything other than a model ``update``
          event; otherwise a Job is started via ``run_job`` with the raw
          payload in ``WEBHOOK_PAYLOAD`` and the reply is 200 with
          ``{"job_id": ...}``. A failure while spawning yields 500.

    Exits the process with status 1 if ``HF_TOKEN`` is not set. Otherwise
    blocks in ``serve_forever()``.
    """
    import hmac
    from http.server import HTTPServer, BaseHTTPRequestHandler
    from huggingface_hub import run_job

    token = os.environ.get("HF_TOKEN")
    webhook_secret = os.environ.get("WEBHOOK_SECRET")
    namespace = os.environ.get("JOB_NAMESPACE", "latam-gpt")
    space_id = os.environ.get("JOB_SPACE_ID", "latam-gpt/eval-runner")
    flavor = os.environ.get("JOB_FLAVOR", "a100-large")
    timeout = os.environ.get("JOB_TIMEOUT", "3h")
    if not token:
        print("ERROR: HF_TOKEN must be set as a Space secret.", file=sys.stderr)
        sys.exit(1)

    # Encoded once for hmac.compare_digest, which needs bytes (or ASCII-only
    # str). An empty secret means verification is disabled.
    expected_secret = (
        webhook_secret.encode("utf-8", "surrogateescape") if webhook_secret else None
    )

    class WebhookHandler(BaseHTTPRequestHandler):
        def _reply(self, status: int, body: bytes) -> None:
            """Send a header-less response with the given status and body."""
            self.send_response(status)
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self):
            self._reply(200, b"eval-runner webhook receiver is running")

        def do_POST(self):
            # Verify webhook secret if configured. compare_digest avoids
            # leaking, through timing, how much of the secret matched.
            if expected_secret is not None:
                supplied = self.headers.get("X-Webhook-Secret", "").encode(
                    "utf-8", "surrogateescape"
                )
                if not hmac.compare_digest(supplied, expected_secret):
                    self._reply(403, b"Invalid webhook secret")
                    return

            # Read payload. A non-numeric or negative length is a client
            # error; a negative value would also make read() wait for EOF.
            try:
                content_length = int(self.headers.get("Content-Length", 0))
            except ValueError:
                content_length = -1
            if content_length < 0:
                self._reply(400, b"Invalid Content-Length")
                return

            try:
                body = self.rfile.read(content_length).decode("utf-8")
                payload = json.loads(body)
            except ValueError:
                # Covers UnicodeDecodeError and json.JSONDecodeError.
                self._reply(400, b"Invalid JSON payload")
                return
            if not isinstance(payload, dict):
                self._reply(400, b"Payload must be a JSON object")
                return

            # Tolerate missing or null sections; they simply fail the filter.
            repo = payload.get("repo")
            event = payload.get("event")
            repo = repo if isinstance(repo, dict) else {}
            event = event if isinstance(event, dict) else {}
            repo_type = repo.get("type", "")
            action = event.get("action", "")
            repo_name = repo.get("name", "unknown")
            # NOTE(review): the 'β' in the log strings below looks like a
            # mis-encoded separator character; left as found.
            print(f"Webhook received: {repo_type} {action} β {repo_name}")

            # Only spawn a Job for model updates
            if repo_type != "model" or action != "update":
                print(f"Skipping: {repo_type} {action}")
                self._reply(200, b"Skipped: not a model update")
                return

            try:
                # Spawn a GPU Job with the webhook payload
                print(f"Spawning eval Job for {repo_name}...")
                job = run_job(
                    image=f"hf.co/spaces/{space_id}",
                    command=["python3", "/app/entrypoint.py"],
                    flavor=flavor,
                    timeout=timeout,
                    namespace=namespace,
                    secrets={"HF_TOKEN": token},
                    env={"WEBHOOK_PAYLOAD": body},
                    token=token,
                )
                print(f"Job spawned: {job.id} β {job.url}")
                self._reply(200, json.dumps({"job_id": job.id}).encode())
            except Exception as e:  # request boundary: report and keep serving
                print(f"ERROR handling webhook: {e}", file=sys.stderr)
                self._reply(500, str(e).encode())

        def log_message(self, format, *args):
            # Log only the first argument (the request line for access logs);
            # fall back to the format string if no arguments were given.
            print(f"[webhook] {args[0] if args else format}")

    port = int(os.environ.get("PORT", "7860"))
    # Context manager closes the listening socket if serve_forever() exits.
    with HTTPServer(("0.0.0.0", port), WebhookHandler) as server:
        print(f"Webhook receiver listening on port {port}")
        print(f" Namespace: {namespace} | Flavor: {flavor} | Timeout: {timeout}")
        print(f" Secret verification: {'enabled' if webhook_secret else 'disabled'}")
        server.serve_forever()
# ─── Main ────────────────────────────────────────────────────────────────────
def main():
    """Entry point: serve webhooks, or evaluate the model in WEBHOOK_PAYLOAD.

    Without ``WEBHOOK_PAYLOAD`` in the environment this runs the webhook
    receiver and does not return. With it, the steps are:

    1. Parse the payload (exit 1 if malformed).
    2. Exit 0 unless it is a model ``update`` event.
    3. Fetch the model's ``config.json`` at the event's revision.
    4. Estimate the parameter count; exit 0 if it cannot be estimated or is
       outside ``[PARAM_MIN, PARAM_MAX]``.
    5. Exit 0 if a result file for this model and revision already exists in
       ``RESULTS_REPO`` (a failed check is only a warning).
    6. Load eval.yaml (exit 1 on failure).
    7-8. Run each suite in ``EVAL_SUITES`` and collect its metrics. A suite
       that is missing, malformed, or fails is recorded with an ``"error"``
       entry and does not stop the remaining suites.
    9-10. Build the summary and upload it, writing it under ``OUTPUT_DIR``
       if the upload fails. The summary is also printed to stdout.

    Requires ``HF_TOKEN`` in the environment when running an evaluation.
    """
    # 1. Parse webhook payload; if not set, run as webhook receiver
    payload_raw = os.environ.get("WEBHOOK_PAYLOAD")
    if not payload_raw:
        run_webhook_server()
        return  # never reached (serve_forever)

    try:
        payload = json.loads(payload_raw)
        repo = payload["repo"]
        repo_type = repo["type"]
        action = payload["event"]["action"]
        model_id = repo["name"]
        # `or` so that a present-but-null headSha also falls back; sha is
        # sliced below and None would raise.
        sha = repo.get("headSha") or "main"
        if not isinstance(model_id, str) or not isinstance(sha, str):
            raise TypeError("repo.name and repo.headSha must be strings")
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        print(f"ERROR: malformed WEBHOOK_PAYLOAD: {e!r}", file=sys.stderr)
        sys.exit(1)

    # 2. Filter: only model update events
    if repo_type != "model" or action != "update":
        print(f"Skipping: {repo_type} {action}")
        sys.exit(0)
    print(f"Evaluating {model_id} @ {sha[:8]}")
    token = os.environ["HF_TOKEN"]

    # NOTE(review): several messages below contain 'β', which looks like a
    # mis-encoded dash or box-drawing character; left as found.

    # 3. Fetch config.json (lightweight, no weights)
    try:
        resp = requests.get(
            f"https://huggingface.co/{model_id}/raw/{sha}/config.json",
            headers={"Authorization": f"Bearer {token}"},
            timeout=30,
        )
    except requests.RequestException as e:
        print(f"ERROR: could not fetch config.json: {e}", file=sys.stderr)
        sys.exit(1)
    if resp.status_code != 200:
        print(f"Could not fetch config.json (HTTP {resp.status_code}) β skipping.")
        sys.exit(0)
    try:
        config = resp.json()
    except ValueError as e:
        print(f"config.json is not valid JSON ({e}): skipping.")
        sys.exit(0)

    # 4. Estimate params; skip if outside the 8B range
    try:
        total_params = estimate_params(config)
    except KeyError as e:
        print(f"Missing config field {e} β skipping.")
        sys.exit(0)
    except (TypeError, AttributeError, ZeroDivisionError) as e:
        # Null/non-numeric fields or a non-object config.json: not a model
        # this pipeline can size, so skip it like a missing field.
        print(f"Could not estimate params from config.json ({e}): skipping.")
        sys.exit(0)
    print(f"Estimated params: {total_params / 1e9:.2f}B")
    if not (PARAM_MIN <= total_params <= PARAM_MAX):
        print(f"Skipping: {total_params / 1e9:.2f}B is outside {PARAM_MIN/1e9:.1f}Bβ{PARAM_MAX/1e9:.1f}B range.")
        sys.exit(0)

    # 5. Duplicate check: skip if results already exist
    api = HfApi(token=token)
    result_filename = f"{model_id.replace('/', '__')}__{sha[:8]}.json"
    try:
        existing = api.list_repo_files(repo_id=RESULTS_REPO, repo_type="dataset")
        if result_filename in existing:
            print(f"Results already exist: {result_filename} β skipping.")
            sys.exit(0)  # SystemExit is not an Exception, so it is not swallowed below
    except Exception as e:
        print(f"WARN: could not check for existing results: {e}")

    # 6. Fetch eval.yaml from olmo_framework repo
    try:
        eval_config = fetch_eval_config(token)
    except Exception as e:
        print(f"ERROR: could not fetch eval.yaml: {e}", file=sys.stderr)
        sys.exit(1)
    suites_config = eval_config.get("suites") if isinstance(eval_config, dict) else None
    if not isinstance(suites_config, dict):
        suites_config = {}

    # 7-8. Run evaluations
    # NOTE(review): the backends receive only model_id, not sha, so the
    # weights evaluated are whatever revision the backend resolves, which is
    # presumably the repo's default branch. Confirm this matches the intent
    # of keying results by sha.
    results = {}
    for suite_name in EVAL_SUITES:
        suite_name = suite_name.strip()
        suite = suites_config.get(suite_name)
        if not suite:
            print(f"WARN: suite '{suite_name}' not found in eval.yaml β skipping.")
            results[suite_name] = {"error": "suite not found in eval.yaml"}
            continue

        # A malformed suite entry must not abort the job and discard the
        # results of suites that already ran.
        tasks = suite.get("tasks") if isinstance(suite, dict) else None
        if not isinstance(tasks, list) or not tasks:
            print(f"WARN: suite '{suite_name}' has no task list in eval.yaml: skipping.")
            results[suite_name] = {"error": "suite has no task list in eval.yaml"}
            continue
        tasks = [str(task) for task in tasks]

        backend = suite.get("backend", "olmes")
        suite_output = OUTPUT_DIR / model_id.replace("/", "__") / suite_name
        print(f"\n{'β' * 60}")
        print(f"Suite: {suite_name} | Backend: {backend} | Tasks: {', '.join(tasks)}")
        print(f"{'β' * 60}")

        if backend == "lm_eval":
            success = run_lm_eval(model_id, tasks, suite_output)
            metrics = collect_lm_eval_results(suite_output) if success else {}
        else:
            success = run_olmes_eval(model_id, tasks, suite_output)
            metrics = collect_olmes_results(suite_output) if success else {}

        mean_acc = compute_mean_accuracy(metrics)
        suite_result = {"metrics": metrics}
        if mean_acc is not None:
            suite_result["mean_accuracy"] = mean_acc
        if not success:
            suite_result["error"] = "evaluation failed"
        results[suite_name] = suite_result
        print(f"Suite {suite_name}: {'OK' if success else 'FAILED'} | mean_accuracy={mean_acc}")

    # 9. Build summary
    summary = {
        "model": model_id,
        "sha": sha,
        "params_b": round(total_params / 1e9, 2),
        "results": results,
    }
    summary_json = json.dumps(summary, indent=2)

    # 10. Upload to eval-results dataset
    print(f"\nUploading results to {RESULTS_REPO}/{result_filename}")
    try:
        api.upload_file(
            path_or_fileobj=summary_json.encode(),
            path_in_repo=result_filename,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
        )
        print(f"Results uploaded: {result_filename}")
    except Exception as e:
        print(f"ERROR uploading results: {e}", file=sys.stderr)
        # Save locally as fallback
        fallback_path = OUTPUT_DIR / result_filename
        fallback_path.parent.mkdir(parents=True, exist_ok=True)
        fallback_path.write_text(summary_json)
        print(f"Results saved locally: {fallback_path}")
    print(summary_json)


if __name__ == "__main__":
    main()