Spaces:
Sleeping
Sleeping
| """ | |
| Long-lived Modal GPU worker — reuse one warm container for many finetune / eval runs. | |
| Deploy once (enables min_containers warm pool across separate CLI invocations): | |
| modal deploy research/modal/server_app.py | |
| Default: keep a GPU worker alive for several hours (blocks local terminal): | |
| modal run research/modal/server_app.py | |
| modal run research/modal/server_app.py --hours 6 | |
| Detached keep-alive (local terminal free): | |
| modal run -d research/modal/server_app.py --hours 6 | |
| Run the skill-matrix pipeline on the warm worker (separate terminal, same | |
| container when deployed) — per-profile baselines -> finetune -> eval -> gate -> publish: | |
| modal run research/modal/server_app.py --job math-lora --max-steps 20 | |
| modal run research/modal/server_app.py --category science | |
| modal run research/modal/server_app.py --pipeline --no-publish | |
| modal run research/modal/server_app.py --eval-only --job math-lora | |
| modal run research/modal/server_app.py --publish-only --job math-lora | |
| modal run research/modal/server_app.py --cmd "uv run python research/finetune.py --help" | |
| Stop deployed app: | |
| modal app stop slm-gpu-worker | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import shlex | |
| import subprocess | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| import modal | |
# `_common` lives next to this file when run locally, but inside the Modal
# container the entrypoint is placed at /root while the repo is baked into the
# image at /repo (see add_local_dir in _common.py). Put whichever of the two
# directories exists at the front of sys.path so the import below resolves.
for _candidate in (Path(__file__).resolve().parent, Path("/repo/research/modal")):
    # Skip locations that are absent on this machine or already importable.
    if not _candidate.is_dir() or str(_candidate) in sys.path:
        continue
    sys.path.insert(0, str(_candidate))
| from _common import ( # noqa: E402 | |
| DEFAULT_GPU, | |
| DEFAULT_KEEPALIVE_HOURS, | |
| DEFAULT_SCALEDOWN_WINDOW, | |
| DEFAULT_WORKER_TIMEOUT, | |
| FINETUNE_VOL_PATH, | |
| HF_CACHE_PATH, | |
| LM_EVAL_OUTPUT, | |
| baseline_experiment_name, | |
| baseline_profiles_for_jobs, | |
| build_finetune_cmd, | |
| build_lm_eval_cmd, | |
| check_gate_files, | |
| check_publish_gate_files, | |
| commit_volumes, | |
| config_for_profile, | |
| discover_cached_baselines, | |
| eval_paths, | |
| finetune_vol, | |
| general_eval_profile, | |
| general_goals_for_job, | |
| hf_cache_vol, | |
| hf_secret, | |
| image, | |
| job_plan_rows, | |
| parse_json_object, | |
| prepare_jobs, | |
| profiles_needing_baseline_run, | |
| resolve_base_model_id, | |
| split_csv, | |
| publish_adapter_files, | |
| pull_artifacts, | |
| reload_finetune_volume, | |
| reload_volumes, | |
| repo_env, | |
| ) | |
# Modal app name; also the name used by `modal app stop` and by the
# `modal.Cls.from_name(APP_NAME, ...)` lookup in `_worker`.
APP_NAME = "slm-gpu-worker"
# App object for this module, built on the shared `image` imported from _common.
app = modal.App(APP_NAME, image=image)
class GpuWorker:
    """Single warm GPU container for sequential finetune / lm-eval / shell commands.

    NOTE(review): methods of this class are invoked through ``.local()`` and
    ``.remote()`` elsewhere in this file, which implies Modal decorators
    (``@app.cls`` on the class, ``@modal.method`` / ``@modal.enter`` on the
    methods) that are not visible in this view -- confirm they are present.
    """

    def startup(self) -> None:
        """Reload the mounted volumes and log where the caches are mounted."""
        reload_volumes()
        print(
            f"GpuWorker ready (HF cache={HF_CACHE_PATH}, finetune vol={FINETUNE_VOL_PATH})"
        )

    def ping(self) -> dict[str, str]:
        """Return a small liveness payload containing the app name."""
        return {"status": "ok", "app": APP_NAME}

    def keep_alive(self, hours: float = DEFAULT_KEEPALIVE_HOURS) -> dict[str, Any]:
        """Hold this container open; cheap heartbeat so scaledown_window stays fresh.

        Args:
            hours: How long to block, in hours. Zero or a negative value
                returns immediately.

        Returns:
            ``{"status": "done", "hours": hours}`` once the deadline has passed.
        """
        deadline = time.time() + hours * 3600
        ticks = 0
        while (remaining := deadline - time.time()) > 0:
            # Log on the first tick and then every fifth one to keep output quiet.
            if ticks % 5 == 0:
                print(f"keep_alive: {int(remaining)}s remaining")
            # Clamp the sleep so a short (or nearly finished) keep-alive does not
            # overshoot its deadline by up to a full minute.
            time.sleep(min(60.0, remaining))
            ticks += 1
        return {"status": "done", "hours": hours}

    def exec_cmd(self, argv: list[str], cwd: str = "/repo") -> dict[str, Any]:
        """Run an arbitrary command in the repo (same env as finetune.py).

        Args:
            argv: Command and arguments, already split (no shell is involved).
            cwd: Working directory for the command.

        Returns:
            A dict with ``argv``, ``exit_code``, ``ok``, ``stdout`` and ``stderr``.
            A non-zero exit code is reported, not raised.
        """
        # shlex.join keeps the log unambiguous when an argument contains spaces.
        print("Running:", shlex.join(argv))
        proc = subprocess.run(
            argv,
            cwd=cwd,
            check=False,
            env=repo_env(),
            capture_output=True,
            text=True,
        )
        # Commit regardless of the exit code so partial outputs are not lost.
        commit_volumes()
        return {
            "argv": argv,
            "exit_code": proc.returncode,
            "ok": proc.returncode == 0,
            "stdout": proc.stdout,
            "stderr": proc.stderr,
        }

    def finetune(self, job: dict[str, Any]) -> dict[str, Any]:
        """Fine-tune one dataset job via research/finetune.py.

        Args:
            job: Job description; ``job["name"]`` selects the output directory
                under ``FINETUNE_VOL_PATH``.

        Returns:
            The parsed ``training_results.json`` written by the run, extended
            with ``job_name`` and ``output_dir``.

        Raises:
            subprocess.CalledProcessError: If the training command exits non-zero.
        """
        name = job["name"]
        out_dir = f"{FINETUNE_VOL_PATH}/{name}"
        Path(out_dir).mkdir(parents=True, exist_ok=True)

        cmd = build_finetune_cmd(job, out_dir)
        print("Running:", shlex.join(cmd))
        subprocess.run(cmd, cwd="/repo", check=True, env=repo_env())
        commit_volumes()

        results_path = Path(out_dir) / "training_results.json"
        payload = json.loads(results_path.read_text())
        payload["job_name"] = name
        payload["output_dir"] = out_dir
        return payload

    def lm_eval(
        self,
        *,
        experiment_name: str,
        config: str = "research/evals/configs/lm_eval_smoke.yaml",
        preset: str | None = None,
        model_path: str | None = None,
        adapter_path: str | None = None,
        compare_to: str | None = None,
        tasks: list[str] | None = None,
        limit: int | None = None,
        num_fewshot: int | None = None,
        batch_size: str | None = None,
        device: str | None = None,
        dtype: str | None = None,
        seed: int | None = None,
    ) -> dict[str, Any]:
        """Run slm-lm-eval on base model or finetuned checkpoint.

        All keyword arguments are forwarded to ``build_lm_eval_cmd`` and echoed
        back in the returned dict.

        Returns:
            The echoed arguments plus ``results_json`` / ``summary_md`` paths
            under ``LM_EVAL_OUTPUT/<experiment_name>``, ``comparison_md`` (``None``
            when that file was not produced), ``exit_code`` and ``ok``. A failing
            eval is reported through ``ok``/``exit_code`` rather than raised.

        Raises:
            FileNotFoundError: If ``adapter_path`` is given but contains no
                ``adapter_config.json``.
        """
        # Pick up adapters committed by another container (e.g. a separate
        # eval-only invocation) — the warm container's mount may predate them.
        # Only finetune_vol is needed here; reloading hf-cache can fail when
        # hf-xet keeps log files open on the warm container's HF cache mount.
        reload_finetune_volume()

        if adapter_path:
            adapter_dir = Path(adapter_path)
            adapter_cfg = adapter_dir / "adapter_config.json"
            if not adapter_cfg.is_file():
                raise FileNotFoundError(
                    f"LoRA adapter not visible at {adapter_path} "
                    f"(missing {adapter_cfg.name})."
                )

        cmd = build_lm_eval_cmd(
            experiment_name=experiment_name,
            config=config,
            preset=preset,
            model_path=model_path,
            adapter_path=adapter_path,
            compare_to=compare_to,
            tasks=tasks,
            limit=limit,
            num_fewshot=num_fewshot,
            batch_size=batch_size,
            device=device,
            dtype=dtype,
            seed=seed,
        )
        print("Running:", shlex.join(cmd))
        proc = subprocess.run(cmd, cwd="/repo", check=False, env=repo_env())
        commit_volumes()

        out_root = Path(LM_EVAL_OUTPUT) / experiment_name
        results_json = out_root / "results.json"
        summary_md = out_root / "summary.md"
        comparison_md = out_root / "comparison.md"
        return {
            "experiment_name": experiment_name,
            "config": config,
            "preset": preset,
            "model_path": model_path,
            "adapter_path": adapter_path,
            "compare_to": compare_to,
            "tasks": tasks,
            "limit": limit,
            "num_fewshot": num_fewshot,
            "batch_size": batch_size,
            "device": device,
            "dtype": dtype,
            "seed": seed,
            "results_json": str(results_json),
            "summary_md": str(summary_md),
            "comparison_md": str(comparison_md) if comparison_md.is_file() else None,
            "exit_code": proc.returncode,
            "ok": proc.returncode == 0,
        }

    def check_gate(
        self,
        *,
        candidate_results_path: str,
        baseline_results_path: str | None,
        goals: dict[str, Any],
        general_candidate_results_path: str | None = None,
        general_baseline_results_path: str | None = None,
        general_goals: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Check skill + general lm-eval results against publish goals.

        When ``general_goals`` is non-empty the combined skill + general check
        (``check_publish_gate_files``) is used; otherwise only the skill goals
        are checked via ``check_gate_files``.
        """
        if general_goals:
            return check_publish_gate_files(
                skill_candidate_path=candidate_results_path,
                skill_baseline_path=baseline_results_path,
                skill_goals=goals,
                general_candidate_path=general_candidate_results_path,
                general_baseline_path=general_baseline_results_path,
                general_goals=general_goals,
            )
        return check_gate_files(
            candidate_results_path=candidate_results_path,
            baseline_results_path=baseline_results_path,
            goals=goals,
        )

    def publish_adapter(
        self,
        *,
        job: dict[str, Any],
        adapter_dir: str,
        gate_result: dict[str, Any],
        candidate_results_path: str,
        baseline_results_path: str | None,
    ) -> dict[str, Any]:
        """Write a model card and push the adapter to the Hub, but only if the gate passed.

        Thin wrapper: all arguments are forwarded to ``publish_adapter_files``,
        whose result is returned unchanged.
        """
        return publish_adapter_files(
            job=job,
            adapter_dir=adapter_dir,
            gate_result=gate_result,
            candidate_results_path=candidate_results_path,
            baseline_results_path=baseline_results_path,
        )

    def run_pipeline(
        self,
        *,
        job_names: list[str] | None = None,
        category: str | None = None,
        sector: str | None = None,
        usecase: str | None = None,
        profiles: list[str] | None = None,
        max_steps: int | None = None,
        max_samples: int | None = None,
        finetune_overrides: dict[str, Any] | None = None,
        train: bool = True,
        eval_only: bool = False,
        eval_tasks: list[str] | None = None,
        eval_limit: int | None = None,
        eval_num_fewshot: int | None = None,
        eval_batch_size: str | None = None,
        eval_device: str | None = None,
        eval_dtype: str | None = None,
        eval_seed: int | None = None,
        skip_baseline: bool = False,
        publish: bool = True,
        plan_only: bool = False,
    ) -> dict[str, Any]:
        """Per-profile baselines -> finetune -> eval -> gate -> publish (same container).

        Args:
            job_names: Optional allow-list of job names applied after the
                category / sector / usecase / profiles filters.
            train: Run the finetune step; ignored when ``eval_only`` is set.
            eval_only: Skip training and evaluate the adapter already stored
                under ``FINETUNE_VOL_PATH/<job name>``.
            skip_baseline: Forwarded to ``profiles_needing_baseline_run``.
            publish: Allow publishing for jobs that opt in and have a gate result.
            plan_only: Return the job plan without running anything.
            Remaining arguments are forwarded to ``prepare_jobs`` or to the
            eval calls.

        Returns:
            ``{"preset": ..., "jobs": plan}`` when ``plan_only`` is set, otherwise
            ``{"jobs": rows}`` with one row per job (eval, optional general eval,
            gate and publish results).

        Raises:
            ValueError: If no job matches the requested filters or names.
        """
        defaults, prepared = prepare_jobs(
            job=None,
            category=category,
            sector=sector,
            usecase=usecase,
            profiles=profiles,
            max_steps=max_steps,
            max_samples=max_samples,
            finetune_overrides=finetune_overrides,
        )
        if job_names:
            wanted = set(job_names)
            prepared = [j for j in prepared if j.get("name") in wanted]
            if not prepared:
                raise ValueError(f"No matching jobs in experiments.yaml: {job_names}")
        if not prepared:
            raise ValueError("No jobs matched the requested filters")

        preset = defaults.get("preset", "minicpm5-1b")
        profile_names = baseline_profiles_for_jobs(prepared, defaults)
        plan = job_plan_rows(prepared)
        if plan_only:
            return {"preset": preset, "jobs": plan}

        # --- Baselines: reuse cached results, run only the missing profiles. ---
        baselines_ok = discover_cached_baselines(
            profile_names,
            preset=preset,
            eval_tasks=eval_tasks,
            eval_limit=eval_limit,
            eval_num_fewshot=eval_num_fewshot,
            eval_seed=eval_seed,
        )
        missing_baselines = profiles_needing_baseline_run(
            profile_names, baselines_ok, skip_baseline=skip_baseline
        )
        for profile in missing_baselines:
            exp = baseline_experiment_name(preset, profile)
            cfg_path = config_for_profile(profile)
            result = self.lm_eval.local(
                experiment_name=exp,
                config=cfg_path,
                preset=preset,
                tasks=eval_tasks,
                limit=eval_limit,
                num_fewshot=eval_num_fewshot,
                batch_size=eval_batch_size,
                device=eval_device,
                dtype=eval_dtype,
                seed=eval_seed,
            )
            baselines_ok[profile] = bool(result.get("ok"))

        # --- Training: every job is finetuned before any candidate eval. ---
        train_results: dict[str, dict[str, Any]] = {}
        if train and not eval_only:
            for j in prepared:
                train_results[j["name"]] = self.finetune.local(j)

        # --- Per-job eval -> gate -> publish. ---
        rows: list[dict[str, Any]] = []
        gen_profile = general_eval_profile(defaults)
        for j in prepared:
            job_name = j["name"]
            profile = j.get("eval_profile", "compare_study")
            train_payload = train_results.get(job_name)
            # Without a fresh training run, fall back to the adapter on the volume.
            adapter_path = (
                train_payload["output_dir"]
                if train_payload
                else f"{FINETUNE_VOL_PATH}/{job_name}"
            )
            baseline_path = f"{LM_EVAL_OUTPUT}/{baseline_experiment_name(preset, profile)}/results.json"
            compare_to = baseline_path if baselines_ok.get(profile) else None
            base_model_id = resolve_base_model_id(j, defaults)
            exp_name = f"{job_name}__{profile}"
            eval_result = self.lm_eval.local(
                experiment_name=exp_name,
                config=config_for_profile(profile),
                model_path=base_model_id,
                adapter_path=adapter_path,
                compare_to=compare_to,
                tasks=eval_tasks,
                limit=eval_limit,
                num_fewshot=eval_num_fewshot,
                batch_size=eval_batch_size,
                device=eval_device,
                dtype=eval_dtype,
                seed=eval_seed,
            )

            general_goals = general_goals_for_job(j, defaults)
            general_eval_result: dict[str, Any] | None = None
            general_candidate_path: str | None = None
            general_baseline_path: str | None = None
            if general_goals:
                general_baseline_path = (
                    f"{LM_EVAL_OUTPUT}/{baseline_experiment_name(preset, gen_profile)}/results.json"
                )
                gen_compare_to = (
                    general_baseline_path if baselines_ok.get(gen_profile) else None
                )
                gen_exp_name = f"{job_name}__{gen_profile}"
                general_eval_result = self.lm_eval.local(
                    experiment_name=gen_exp_name,
                    config=config_for_profile(gen_profile),
                    model_path=base_model_id,
                    adapter_path=adapter_path,
                    compare_to=gen_compare_to,
                    tasks=eval_tasks,
                    limit=eval_limit,
                    num_fewshot=eval_num_fewshot,
                    batch_size=eval_batch_size,
                    device=eval_device,
                    dtype=eval_dtype,
                    seed=eval_seed,
                )
                general_candidate_path = general_eval_result["results_json"]

            row: dict[str, Any] = {
                "name": job_name,
                "category": j.get("category"),
                "profile": profile,
                "general_profile": gen_profile if general_goals else None,
                "plan": next((p for p in plan if p["name"] == job_name), None),
                "eval": eval_result,
            }
            if general_eval_result:
                row["general_eval"] = general_eval_result

            gate_result: dict[str, Any] | None = None
            if j.get("goals"):
                skill_ok = bool(eval_result.get("ok"))
                general_ok = (
                    not general_goals
                    or bool(general_eval_result and general_eval_result.get("ok"))
                )
                # Only gate on evals that actually completed.
                if skill_ok and general_ok:
                    # NOTE(review): the gate receives the baseline paths even when
                    # baselines_ok is false for the profile (unlike compare_to) --
                    # confirm the gate helpers tolerate a missing baseline file.
                    gate_result = self.check_gate.local(
                        candidate_results_path=eval_result["results_json"],
                        baseline_results_path=baseline_path,
                        goals=j["goals"],
                        general_candidate_results_path=general_candidate_path,
                        general_baseline_results_path=general_baseline_path,
                        general_goals=general_goals,
                    )
                    row["gate"] = gate_result

            # Publishing needs the job opt-in, the caller's flag and a gate result.
            if j.get("publish") and publish and gate_result is not None:
                row["publish"] = self.publish_adapter.local(
                    job=j,
                    adapter_dir=adapter_path,
                    gate_result=gate_result,
                    candidate_results_path=eval_result["results_json"],
                    baseline_results_path=baseline_path,
                )
            rows.append(row)
        return {"jobs": rows}
def _worker() -> GpuWorker:
    """Prefer deployed warm worker; fall back to ephemeral cls for first deploy.

    Returns:
        An instance handle for the deployed ``GpuWorker`` class when the app
        ``APP_NAME`` is deployed, otherwise an instance of the local class.
    """
    try:
        cls = modal.Cls.from_name(APP_NAME, "GpuWorker")
        # `from_name` is lazy: without forcing the lookup here, a missing
        # deployment would only surface on the first `.remote()` call, outside
        # this try block, and the fallback below would never run.
        hydrate = getattr(cls, "hydrate", None)
        if callable(hydrate):
            hydrate()
    except modal.exception.NotFoundError:
        return GpuWorker()
    return cls()
def main(
    serve: bool = True,
    hours: float = DEFAULT_KEEPALIVE_HOURS,
    cmd: str | None = None,
    job: str | None = None,
    category: str | None = None,
    sector: str | None = None,
    usecase: str | None = None,
    profiles: str | None = None,
    max_steps: int | None = None,
    max_samples: int | None = None,
    finetune_args_json: str | None = None,
    eval_only: bool = False,
    pipeline: bool = False,
    publish: bool = True,
    publish_only: bool = False,
    pull: bool = True,
    ping: bool = False,
    plan: bool = False,
    skip_baseline: bool = False,
    eval_tasks: str | None = None,
    eval_limit: int | None = None,
    eval_num_fewshot: int | None = None,
    eval_batch_size: str | None = None,
    eval_device: str | None = None,
    eval_dtype: str | None = None,
    eval_seed: int | None = None,
):
    """
    GPU worker CLI.

    With no task flags, keeps one container alive (default). With --job/--category,
    --cmd, --eval-only, --pipeline, or --publish-only, runs that task on the warm
    worker instead. --pipeline (and --job/--category/--eval-only) run the skill-matrix
    pipeline: per-profile baselines -> finetune -> eval -> gate -> publish.

    Examples:
        modal deploy research/modal/server_app.py
        modal run research/modal/server_app.py
        modal run research/modal/server_app.py --pipeline --job math-lora --max-steps 20
        modal run research/modal/server_app.py --pipeline --category science --no-publish
        modal run research/modal/server_app.py --pipeline --sector science --eval-limit 25
        modal run research/modal/server_app.py --plan --profiles math,science
        modal run research/modal/server_app.py --eval-only --job math-lora
        modal run research/modal/server_app.py --publish-only --job math-lora
        modal run research/modal/server_app.py --cmd "uv run python research/finetune.py --help"
    """
    # NOTE(review): this function reads like a Modal local entrypoint; the
    # decorator is not visible in this view -- confirm it is present.

    # Any of these flags selects the skill-matrix pipeline (or its plan).
    wants_pipeline = bool(
        pipeline or job or category or sector or usecase or profiles or eval_only or plan
    )
    # A task flag of any kind disables the default keep-alive mode.
    has_task = bool(cmd or publish_only or ping or wants_pipeline)
    if has_task:
        serve = False

    worker = _worker()

    if ping:
        print(json.dumps(worker.ping.remote(), indent=2))
        return

    if cmd:
        argv = shlex.split(cmd)
        result = worker.exec_cmd.remote(argv)
        # Relay the captured streams, then mirror the remote exit code locally.
        if result.get("stdout"):
            print(result["stdout"], end="")
        if result.get("stderr"):
            print(result["stderr"], end="", file=sys.stderr)
        if not result.get("ok"):
            raise SystemExit(result.get("exit_code", 1))
        return

    if publish_only:
        if not job:
            raise SystemExit("--publish-only requires --job")
        defaults, prepared = prepare_jobs(job=job)
        if not prepared:
            # Fail with a readable message instead of an IndexError below.
            raise SystemExit(f"Job {job!r} not found in experiments.yaml")
        j = prepared[0]
        if not j.get("goals") or not j.get("publish"):
            raise SystemExit(f"Job {job!r} needs `goals` and `publish` in experiments.yaml")

        preset = defaults.get("preset", "minicpm5-1b")
        profile = j.get("eval_profile", "compare_study")
        gen_profile = general_eval_profile(defaults)
        general_goals = general_goals_for_job(j, defaults)
        adapter_path = f"{FINETUNE_VOL_PATH}/{job}"
        candidate_results_path, baseline_results_path, _ = eval_paths(
            job_name=job, preset=preset, profile=profile
        )
        general_candidate_path = None
        general_baseline_path = None
        if general_goals:
            general_candidate_path, general_baseline_path, _ = eval_paths(
                job_name=job, preset=preset, profile=gen_profile
            )

        # Re-check the gate on existing eval results, then hand it to publish.
        gate_result = worker.check_gate.remote(
            candidate_results_path=candidate_results_path,
            baseline_results_path=baseline_results_path,
            goals=j["goals"],
            general_candidate_results_path=general_candidate_path,
            general_baseline_results_path=general_baseline_path,
            general_goals=general_goals,
        )
        print(json.dumps(gate_result, indent=2))
        result = worker.publish_adapter.remote(
            job=j,
            adapter_dir=adapter_path,
            gate_result=gate_result,
            candidate_results_path=candidate_results_path,
            baseline_results_path=baseline_results_path,
        )
        print(json.dumps(result, indent=2))
        return

    if wants_pipeline:
        job_names = [job] if job else None
        result = worker.run_pipeline.remote(
            job_names=job_names,
            category=category,
            sector=sector,
            usecase=usecase,
            profiles=split_csv(profiles),
            max_steps=max_steps,
            max_samples=max_samples,
            finetune_overrides=parse_json_object(
                finetune_args_json, flag="--finetune-args-json"
            ),
            train=not eval_only,
            eval_only=eval_only,
            eval_tasks=split_csv(eval_tasks),
            eval_limit=eval_limit,
            eval_num_fewshot=eval_num_fewshot,
            eval_batch_size=eval_batch_size,
            eval_device=eval_device,
            eval_dtype=eval_dtype,
            eval_seed=eval_seed,
            skip_baseline=skip_baseline,
            publish=publish,
            plan_only=plan,
        )
        print(json.dumps(result, indent=2))
        if plan:
            return
        if pull:
            # Pull each job's skill-eval artifacts and, if run, its general eval.
            for row in result.get("jobs", []):
                pull_artifacts(row["name"], f"{row['name']}__{row['profile']}")
                if row.get("general_profile"):
                    pull_artifacts(
                        row["name"],
                        f"{row['name']}__{row['general_profile']}",
                        dest="models/finetuned",
                    )
        return

    if serve:
        print(
            f"Keeping GpuWorker alive for {hours}h "
            f"(deploy with `modal deploy` so other terminals reuse this container)"
        )
        # Ping first, then block in keep_alive for the requested duration.
        worker.ping.remote()
        result = worker.keep_alive.remote(hours=hours)
        print(json.dumps(result, indent=2))
        return

    # Only reachable when serve mode was switched off and no task flag was given.
    raise SystemExit(
        "Nothing to do. Use default serve mode, or pass --job, --category, --cmd, "
        "--pipeline, --eval-only, --publish-only, or --ping."
    )