lesson-agent-dev / research /modal /server_app.py
MSG
Feat/last sprintos (#23)
28543d3
Raw
History Blame Contribute Delete
24 kB
"""
Long-lived Modal GPU worker — reuse one warm container for many finetune / eval runs.
Deploy once (enables min_containers warm pool across separate CLI invocations):
modal deploy research/modal/server_app.py
Default: keep a GPU worker alive for several hours (blocks local terminal):
modal run research/modal/server_app.py
modal run research/modal/server_app.py --hours 6
Detached keep-alive (local terminal free):
modal run -d research/modal/server_app.py --hours 6
Run the skill-matrix pipeline on the warm worker (separate terminal, same
container when deployed) — per-profile baselines -> finetune -> eval -> gate -> publish:
modal run research/modal/server_app.py --job math-lora --max-steps 20
modal run research/modal/server_app.py --category science
modal run research/modal/server_app.py --pipeline --no-publish
modal run research/modal/server_app.py --eval-only --job math-lora
modal run research/modal/server_app.py --publish-only --job math-lora
modal run research/modal/server_app.py --cmd "uv run python research/finetune.py --help"
Stop deployed app:
modal app stop slm-gpu-worker
"""
from __future__ import annotations
import json
import shlex
import subprocess
import sys
import time
from pathlib import Path
from typing import Any
import modal
# `_common` sits next to this file when run locally, but inside the Modal
# container the entrypoint lands at /root while the repo is baked into the
# image at /repo (see add_local_dir in _common.py). Put whichever of those
# directories exists at the front of sys.path so the import below resolves in
# both environments.
for _candidate in (Path(__file__).resolve().parent, Path("/repo/research/modal")):
    # Skip locations that do not exist here or are already importable.
    if not _candidate.is_dir() or str(_candidate) in sys.path:
        continue
    sys.path.insert(0, str(_candidate))
from _common import ( # noqa: E402
DEFAULT_GPU,
DEFAULT_KEEPALIVE_HOURS,
DEFAULT_SCALEDOWN_WINDOW,
DEFAULT_WORKER_TIMEOUT,
FINETUNE_VOL_PATH,
HF_CACHE_PATH,
LM_EVAL_OUTPUT,
baseline_experiment_name,
baseline_profiles_for_jobs,
build_finetune_cmd,
build_lm_eval_cmd,
check_gate_files,
check_publish_gate_files,
commit_volumes,
config_for_profile,
discover_cached_baselines,
eval_paths,
finetune_vol,
general_eval_profile,
general_goals_for_job,
hf_cache_vol,
hf_secret,
image,
job_plan_rows,
parse_json_object,
prepare_jobs,
profiles_needing_baseline_run,
resolve_base_model_id,
split_csv,
publish_adapter_files,
pull_artifacts,
reload_finetune_volume,
reload_volumes,
repo_env,
)
# Name of the Modal app. It is echoed by GpuWorker.ping() and used by _worker()
# to look up the deployed class via modal.Cls.from_name.
APP_NAME = "slm-gpu-worker"
# App object that GpuWorker and the local entrypoint are registered on; it uses
# the `image` imported from _common.
app = modal.App(APP_NAME, image=image)
@app.cls(
    gpu=DEFAULT_GPU,
    volumes={
        HF_CACHE_PATH: hf_cache_vol,
        FINETUNE_VOL_PATH: finetune_vol,
    },
    secrets=[hf_secret],
    timeout=DEFAULT_WORKER_TIMEOUT,
    scaledown_window=DEFAULT_SCALEDOWN_WINDOW,
    min_containers=1,
    max_containers=1,  # single warm container; serialize work, never sprawl
)
class GpuWorker:
    """Single warm GPU container for sequential finetune / lm-eval / shell commands.

    The HF cache volume and the finetune volume are mounted at ``HF_CACHE_PATH``
    and ``FINETUNE_VOL_PATH``. Methods that launch a subprocess run it with
    ``repo_env()`` as the environment and call ``commit_volumes()`` afterwards.
    """

    @modal.enter()
    def startup(self) -> None:
        """Reload the mounted volumes and log readiness when the container starts."""
        reload_volumes()
        print(
            f"GpuWorker ready (HF cache={HF_CACHE_PATH}, finetune vol={FINETUNE_VOL_PATH})"
        )

    @modal.method()
    def ping(self) -> dict[str, str]:
        """Return a small status payload containing the app name."""
        return {"status": "ok", "app": APP_NAME}

    @modal.method()
    def keep_alive(self, hours: float = DEFAULT_KEEPALIVE_HOURS) -> dict[str, Any]:
        """Hold this container open; cheap heartbeat so scaledown_window stays fresh.

        Sleeps in steps of at most 60 seconds until ``hours`` have elapsed and
        prints the remaining time on every fifth step. A non-positive ``hours``
        returns immediately.

        NOTE(review): the class is capped at ``max_containers=1`` and no input
        concurrency is configured in this file, so a running ``keep_alive`` call
        presumably occupies the container while other calls wait — confirm this
        is intended.

        Args:
            hours: How long to stay alive, in hours.

        Returns:
            ``{"status": "done", "hours": hours}`` once the deadline has passed.
        """
        # Monotonic clock: unaffected by wall-clock adjustments during a long wait.
        deadline = time.monotonic() + hours * 3600
        ticks = 0
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            if ticks % 5 == 0:
                print(f"keep_alive: {int(remaining)}s remaining")
            # Cap the sleep at the time left so the call does not run up to a
            # minute past the requested deadline.
            time.sleep(min(60.0, remaining))
            ticks += 1
        return {"status": "done", "hours": hours}

    @modal.method()
    def exec_cmd(self, argv: list[str], cwd: str = "/repo") -> dict[str, Any]:
        """Run an arbitrary command in the repo (same env as finetune.py).

        Args:
            argv: Command and arguments, already split (no shell is involved).
            cwd: Working directory for the command.

        Returns:
            A dict with ``argv``, ``exit_code``, ``ok`` (exit code is zero) and
            the captured ``stdout`` / ``stderr`` text.

        Raises:
            ValueError: If ``argv`` is empty.
        """
        if not argv:
            raise ValueError("exec_cmd requires a non-empty argv")
        # shlex.join quotes arguments, so the logged line is unambiguous.
        print("Running:", shlex.join(argv))
        proc = subprocess.run(
            argv,
            cwd=cwd,
            check=False,  # a failing command is reported via exit_code, not raised
            env=repo_env(),
            capture_output=True,
            text=True,
        )
        commit_volumes()
        return {
            "argv": argv,
            "exit_code": proc.returncode,
            "ok": proc.returncode == 0,
            "stdout": proc.stdout,
            "stderr": proc.stderr,
        }

    @modal.method()
    def finetune(self, job: dict[str, Any]) -> dict[str, Any]:
        """Fine-tune one dataset job via research/finetune.py.

        Args:
            job: Job definition; ``job["name"]`` selects the output directory
                ``{FINETUNE_VOL_PATH}/{name}``. The command line is produced by
                ``build_finetune_cmd``.

        Returns:
            The parsed ``training_results.json`` from the output directory,
            extended with ``job_name`` and ``output_dir``.

        Raises:
            subprocess.CalledProcessError: If the training command exits non-zero
                (volumes are not committed in that case).
        """
        name = job["name"]
        out_dir = f"{FINETUNE_VOL_PATH}/{name}"
        Path(out_dir).mkdir(parents=True, exist_ok=True)
        cmd = build_finetune_cmd(job, out_dir)
        print("Running:", shlex.join(cmd))
        subprocess.run(cmd, cwd="/repo", check=True, env=repo_env())
        commit_volumes()
        results_path = Path(out_dir) / "training_results.json"
        payload = json.loads(results_path.read_text())
        payload["job_name"] = name
        payload["output_dir"] = out_dir
        return payload

    @modal.method()
    def lm_eval(
        self,
        *,
        experiment_name: str,
        config: str = "research/evals/configs/lm_eval_smoke.yaml",
        preset: str | None = None,
        model_path: str | None = None,
        adapter_path: str | None = None,
        compare_to: str | None = None,
        tasks: list[str] | None = None,
        limit: int | None = None,
        num_fewshot: int | None = None,
        batch_size: str | None = None,
        device: str | None = None,
        dtype: str | None = None,
        seed: int | None = None,
    ) -> dict[str, Any]:
        """Run slm-lm-eval on base model or finetuned checkpoint.

        All keyword arguments are forwarded unchanged to ``build_lm_eval_cmd``.

        Returns:
            A dict echoing the arguments plus ``results_json`` and ``summary_md``
            (paths under ``{LM_EVAL_OUTPUT}/{experiment_name}``, not checked for
            existence), ``comparison_md`` (``None`` when that file is absent),
            ``exit_code`` and ``ok``. A non-zero exit is reported, not raised.

        Raises:
            FileNotFoundError: If ``adapter_path`` is given but contains no
                ``adapter_config.json``.
        """
        # Pick up adapters committed by another container (e.g. a separate
        # eval-only invocation) — the warm container's mount may predate them.
        # Only finetune_vol is needed here; reloading hf-cache can fail when
        # hf-xet keeps log files open on the warm container's HF cache mount.
        reload_finetune_volume()
        if adapter_path:
            adapter_dir = Path(adapter_path)
            adapter_cfg = adapter_dir / "adapter_config.json"
            if not adapter_cfg.is_file():
                raise FileNotFoundError(
                    f"LoRA adapter not visible at {adapter_path} "
                    f"(missing {adapter_cfg.name})."
                )
        cmd = build_lm_eval_cmd(
            experiment_name=experiment_name,
            config=config,
            preset=preset,
            model_path=model_path,
            adapter_path=adapter_path,
            compare_to=compare_to,
            tasks=tasks,
            limit=limit,
            num_fewshot=num_fewshot,
            batch_size=batch_size,
            device=device,
            dtype=dtype,
            seed=seed,
        )
        print("Running:", shlex.join(cmd))
        proc = subprocess.run(cmd, cwd="/repo", check=False, env=repo_env())
        commit_volumes()
        out_root = Path(LM_EVAL_OUTPUT) / experiment_name
        results_json = out_root / "results.json"
        summary_md = out_root / "summary.md"
        comparison_md = out_root / "comparison.md"
        return {
            "experiment_name": experiment_name,
            "config": config,
            "preset": preset,
            "model_path": model_path,
            "adapter_path": adapter_path,
            "compare_to": compare_to,
            "tasks": tasks,
            "limit": limit,
            "num_fewshot": num_fewshot,
            "batch_size": batch_size,
            "device": device,
            "dtype": dtype,
            "seed": seed,
            "results_json": str(results_json),
            "summary_md": str(summary_md),
            "comparison_md": str(comparison_md) if comparison_md.is_file() else None,
            "exit_code": proc.returncode,
            "ok": proc.returncode == 0,
        }

    @modal.method()
    def check_gate(
        self,
        *,
        candidate_results_path: str,
        baseline_results_path: str | None,
        goals: dict[str, Any],
        general_candidate_results_path: str | None = None,
        general_baseline_results_path: str | None = None,
        general_goals: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Check skill + general lm-eval results against publish goals.

        When ``general_goals`` is truthy the combined check
        ``check_publish_gate_files`` is used; otherwise only the skill results
        are checked via ``check_gate_files`` and the ``general_*`` paths are
        ignored.

        Returns:
            Whatever the selected helper returns.
        """
        if general_goals:
            return check_publish_gate_files(
                skill_candidate_path=candidate_results_path,
                skill_baseline_path=baseline_results_path,
                skill_goals=goals,
                general_candidate_path=general_candidate_results_path,
                general_baseline_path=general_baseline_results_path,
                general_goals=general_goals,
            )
        return check_gate_files(
            candidate_results_path=candidate_results_path,
            baseline_results_path=baseline_results_path,
            goals=goals,
        )

    @modal.method()
    def publish_adapter(
        self,
        *,
        job: dict[str, Any],
        adapter_dir: str,
        gate_result: dict[str, Any],
        candidate_results_path: str,
        baseline_results_path: str | None,
    ) -> dict[str, Any]:
        """Write a model card and push the adapter to the Hub, but only if the gate passed.

        Thin wrapper: every argument is forwarded to ``publish_adapter_files``
        and its result is returned unchanged.
        """
        return publish_adapter_files(
            job=job,
            adapter_dir=adapter_dir,
            gate_result=gate_result,
            candidate_results_path=candidate_results_path,
            baseline_results_path=baseline_results_path,
        )

    @modal.method()
    def run_pipeline(
        self,
        *,
        job_names: list[str] | None = None,
        category: str | None = None,
        sector: str | None = None,
        usecase: str | None = None,
        profiles: list[str] | None = None,
        max_steps: int | None = None,
        max_samples: int | None = None,
        finetune_overrides: dict[str, Any] | None = None,
        train: bool = True,
        eval_only: bool = False,
        eval_tasks: list[str] | None = None,
        eval_limit: int | None = None,
        eval_num_fewshot: int | None = None,
        eval_batch_size: str | None = None,
        eval_device: str | None = None,
        eval_dtype: str | None = None,
        eval_seed: int | None = None,
        skip_baseline: bool = False,
        publish: bool = True,
        plan_only: bool = False,
    ) -> dict[str, Any]:
        """Per-profile baselines -> finetune -> eval -> gate -> publish (same container).

        Args:
            job_names: If given, keep only prepared jobs whose name is listed.
            category, sector, usecase, profiles, max_steps, max_samples,
                finetune_overrides: Forwarded to ``prepare_jobs``.
            train: Run fine-tuning before evaluation (ignored when ``eval_only``).
            eval_only: Skip fine-tuning and evaluate the adapter expected at
                ``{FINETUNE_VOL_PATH}/{job name}``.
            eval_tasks, eval_limit, eval_num_fewshot, eval_batch_size,
                eval_device, eval_dtype, eval_seed: Forwarded to every
                ``lm_eval`` call (baseline, skill and general).
            skip_baseline: Forwarded to ``profiles_needing_baseline_run``.
            publish: Allow publishing for jobs that set ``publish``.
            plan_only: Return the job plan without running anything.

        Returns:
            ``{"preset": ..., "jobs": plan}`` when ``plan_only`` is set,
            otherwise ``{"jobs": rows}`` with one row per job holding the eval
            result and, when produced, ``general_eval``, ``gate`` and
            ``publish`` entries.

        Raises:
            ValueError: If no prepared job survives the filters.
        """
        defaults, prepared = prepare_jobs(
            job=None,
            category=category,
            sector=sector,
            usecase=usecase,
            profiles=profiles,
            max_steps=max_steps,
            max_samples=max_samples,
            finetune_overrides=finetune_overrides,
        )
        if job_names:
            wanted = set(job_names)
            prepared = [j for j in prepared if j.get("name") in wanted]
            if not prepared:
                raise ValueError(f"No matching jobs in experiments.yaml: {job_names}")
        if not prepared:
            raise ValueError("No jobs matched the requested filters")

        preset = defaults.get("preset", "minicpm5-1b")
        profile_names = baseline_profiles_for_jobs(prepared, defaults)
        plan = job_plan_rows(prepared)
        if plan_only:
            return {"preset": preset, "jobs": plan}

        # --- Baselines: reuse cached results, run only the missing profiles.
        baselines_ok = discover_cached_baselines(
            profile_names,
            preset=preset,
            eval_tasks=eval_tasks,
            eval_limit=eval_limit,
            eval_num_fewshot=eval_num_fewshot,
            eval_seed=eval_seed,
        )
        missing_baselines = profiles_needing_baseline_run(
            profile_names, baselines_ok, skip_baseline=skip_baseline
        )
        for profile in missing_baselines:
            exp = baseline_experiment_name(preset, profile)
            cfg_path = config_for_profile(profile)
            # .local() runs the method in this container instead of dispatching
            # a new remote call.
            result = self.lm_eval.local(
                experiment_name=exp,
                config=cfg_path,
                preset=preset,
                tasks=eval_tasks,
                limit=eval_limit,
                num_fewshot=eval_num_fewshot,
                batch_size=eval_batch_size,
                device=eval_device,
                dtype=eval_dtype,
                seed=eval_seed,
            )
            baselines_ok[profile] = bool(result.get("ok"))

        # --- Training: one finetune run per job, unless evaluating only.
        train_results: dict[str, dict[str, Any]] = {}
        if train and not eval_only:
            for j in prepared:
                train_results[j["name"]] = self.finetune.local(j)

        # --- Per-job evaluation, gate check and optional publish.
        rows: list[dict[str, Any]] = []
        gen_profile = general_eval_profile(defaults)
        for j in prepared:
            job_name = j["name"]
            profile = j.get("eval_profile", "compare_study")
            train_payload = train_results.get(job_name)
            # Without a fresh training run, fall back to the adapter directory
            # a previous run would have written on the finetune volume.
            adapter_path = (
                train_payload["output_dir"]
                if train_payload
                else f"{FINETUNE_VOL_PATH}/{job_name}"
            )
            baseline_path = (
                f"{LM_EVAL_OUTPUT}/{baseline_experiment_name(preset, profile)}/results.json"
            )
            compare_to = baseline_path if baselines_ok.get(profile) else None
            base_model_id = resolve_base_model_id(j, defaults)
            exp_name = f"{job_name}__{profile}"
            eval_result = self.lm_eval.local(
                experiment_name=exp_name,
                config=config_for_profile(profile),
                model_path=base_model_id,
                adapter_path=adapter_path,
                compare_to=compare_to,
                tasks=eval_tasks,
                limit=eval_limit,
                num_fewshot=eval_num_fewshot,
                batch_size=eval_batch_size,
                device=eval_device,
                dtype=eval_dtype,
                seed=eval_seed,
            )

            # Optional second eval on the general profile, run only when the
            # job has general goals to be gated on.
            general_goals = general_goals_for_job(j, defaults)
            general_eval_result: dict[str, Any] | None = None
            general_candidate_path: str | None = None
            general_baseline_path: str | None = None
            if general_goals:
                general_baseline_path = (
                    f"{LM_EVAL_OUTPUT}/{baseline_experiment_name(preset, gen_profile)}/results.json"
                )
                gen_compare_to = (
                    general_baseline_path if baselines_ok.get(gen_profile) else None
                )
                gen_exp_name = f"{job_name}__{gen_profile}"
                general_eval_result = self.lm_eval.local(
                    experiment_name=gen_exp_name,
                    config=config_for_profile(gen_profile),
                    model_path=base_model_id,
                    adapter_path=adapter_path,
                    compare_to=gen_compare_to,
                    tasks=eval_tasks,
                    limit=eval_limit,
                    num_fewshot=eval_num_fewshot,
                    batch_size=eval_batch_size,
                    device=eval_device,
                    dtype=eval_dtype,
                    seed=eval_seed,
                )
                general_candidate_path = general_eval_result["results_json"]

            row: dict[str, Any] = {
                "name": job_name,
                "category": j.get("category"),
                "profile": profile,
                "general_profile": gen_profile if general_goals else None,
                "plan": next((p for p in plan if p["name"] == job_name), None),
                "eval": eval_result,
            }
            if general_eval_result:
                row["general_eval"] = general_eval_result

            gate_result: dict[str, Any] | None = None
            if j.get("goals"):
                skill_ok = bool(eval_result.get("ok"))
                general_ok = (
                    not general_goals
                    or bool(general_eval_result and general_eval_result.get("ok"))
                )
                # Only gate when every required eval actually succeeded.
                if skill_ok and general_ok:
                    # NOTE(review): unlike compare_to above, the gate always
                    # receives baseline_path, even when baselines_ok marks this
                    # profile's baseline as unavailable — confirm
                    # check_gate_files tolerates a missing file.
                    gate_result = self.check_gate.local(
                        candidate_results_path=eval_result["results_json"],
                        baseline_results_path=baseline_path,
                        goals=j["goals"],
                        general_candidate_results_path=general_candidate_path,
                        general_baseline_results_path=general_baseline_path,
                        general_goals=general_goals,
                    )
                    row["gate"] = gate_result
            # Publishing needs the job's opt-in, the caller's opt-in and a gate
            # result to hand to publish_adapter.
            if j.get("publish") and publish and gate_result is not None:
                row["publish"] = self.publish_adapter.local(
                    job=j,
                    adapter_dir=adapter_path,
                    gate_result=gate_result,
                    candidate_results_path=eval_result["results_json"],
                    baseline_results_path=baseline_path,
                )
            rows.append(row)
        return {"jobs": rows}
def _worker() -> GpuWorker:
    """Prefer deployed warm worker; fall back to ephemeral cls for first deploy.

    Returns:
        An instance of the deployed ``GpuWorker`` class when the app
        ``APP_NAME`` is deployed, otherwise an instance of the class defined in
        this file.
    """
    try:
        cls = modal.Cls.from_name(APP_NAME, "GpuWorker")
        # from_name is lazy: it does not contact Modal until the object is
        # first used, so a missing deployment would otherwise only surface at
        # the first remote call and skip the fallback below. Hydrating forces
        # the lookup to happen here.
        cls.hydrate()
        return cls()
    except modal.exception.NotFoundError:
        return GpuWorker()
@app.local_entrypoint()
def main(
    serve: bool = True,
    hours: float = DEFAULT_KEEPALIVE_HOURS,
    cmd: str | None = None,
    job: str | None = None,
    category: str | None = None,
    sector: str | None = None,
    usecase: str | None = None,
    profiles: str | None = None,
    max_steps: int | None = None,
    max_samples: int | None = None,
    finetune_args_json: str | None = None,
    eval_only: bool = False,
    pipeline: bool = False,
    publish: bool = True,
    publish_only: bool = False,
    pull: bool = True,
    ping: bool = False,
    plan: bool = False,
    skip_baseline: bool = False,
    eval_tasks: str | None = None,
    eval_limit: int | None = None,
    eval_num_fewshot: int | None = None,
    eval_batch_size: str | None = None,
    eval_device: str | None = None,
    eval_dtype: str | None = None,
    eval_seed: int | None = None,
):
    """
    GPU worker CLI.

    With no task flags, keeps one container alive (default). With --job/--category,
    --cmd, --eval-only, --pipeline, or --publish-only, runs that task on the warm
    worker instead. --pipeline (and --job/--category/--eval-only) run the skill-matrix
    pipeline: per-profile baselines -> finetune -> eval -> gate -> publish.

    Task precedence when several flags are given: --ping, then --cmd, then
    --publish-only, then the pipeline (which --plan turns into a dry run that
    only prints the job plan).

    Examples:
        modal deploy research/modal/server_app.py
        modal run research/modal/server_app.py
        modal run research/modal/server_app.py --pipeline --job math-lora --max-steps 20
        modal run research/modal/server_app.py --pipeline --category science --no-publish
        modal run research/modal/server_app.py --pipeline --sector science --eval-limit 25
        modal run research/modal/server_app.py --plan --profiles math,science
        modal run research/modal/server_app.py --eval-only --job math-lora
        modal run research/modal/server_app.py --publish-only --job math-lora
        modal run research/modal/server_app.py --cmd "uv run python research/finetune.py --help"
    """
    has_task = bool(
        cmd
        or job
        or category
        or sector
        or usecase
        or profiles
        or eval_only
        or pipeline
        or publish_only
        or ping
        or plan
    )
    # Any explicit task overrides the default keep-alive behaviour.
    if has_task:
        serve = False
    worker = _worker()

    if ping:
        print(json.dumps(worker.ping.remote(), indent=2))
        return

    if cmd:
        argv = shlex.split(cmd)
        result = worker.exec_cmd.remote(argv)
        # Replay the remote command's captured output on the matching local streams.
        if result.get("stdout"):
            print(result["stdout"], end="")
        if result.get("stderr"):
            print(result["stderr"], end="", file=sys.stderr)
        if not result.get("ok"):
            # Propagate the remote exit code to the local shell.
            raise SystemExit(result.get("exit_code", 1))
        return

    if publish_only:
        if not job:
            raise SystemExit("--publish-only requires --job")
        defaults, prepared = prepare_jobs(job=job)
        if not prepared:
            # Exit cleanly instead of failing with IndexError on prepared[0].
            raise SystemExit(f"No job matching {job!r} found in experiments.yaml")
        j = prepared[0]
        if not j.get("goals") or not j.get("publish"):
            raise SystemExit(f"Job {job!r} needs `goals` and `publish` in experiments.yaml")
        preset = defaults.get("preset", "minicpm5-1b")
        profile = j.get("eval_profile", "compare_study")
        gen_profile = general_eval_profile(defaults)
        general_goals = general_goals_for_job(j, defaults)
        adapter_path = f"{FINETUNE_VOL_PATH}/{job}"
        candidate_results_path, baseline_results_path, _ = eval_paths(
            job_name=job, preset=preset, profile=profile
        )
        general_candidate_path = None
        general_baseline_path = None
        if general_goals:
            general_candidate_path, general_baseline_path, _ = eval_paths(
                job_name=job, preset=preset, profile=gen_profile
            )
        # Re-check the gate against existing eval results, then publish with
        # that gate result.
        gate_result = worker.check_gate.remote(
            candidate_results_path=candidate_results_path,
            baseline_results_path=baseline_results_path,
            goals=j["goals"],
            general_candidate_results_path=general_candidate_path,
            general_baseline_results_path=general_baseline_path,
            general_goals=general_goals,
        )
        print(json.dumps(gate_result, indent=2))
        result = worker.publish_adapter.remote(
            job=j,
            adapter_dir=adapter_path,
            gate_result=gate_result,
            candidate_results_path=candidate_results_path,
            baseline_results_path=baseline_results_path,
        )
        print(json.dumps(result, indent=2))
        return

    if pipeline or job or category or sector or usecase or profiles or eval_only or plan:
        job_names = [job] if job else None
        result = worker.run_pipeline.remote(
            job_names=job_names,
            category=category,
            sector=sector,
            usecase=usecase,
            profiles=split_csv(profiles),
            max_steps=max_steps,
            max_samples=max_samples,
            finetune_overrides=parse_json_object(
                finetune_args_json, flag="--finetune-args-json"
            ),
            train=not eval_only,
            eval_only=eval_only,
            eval_tasks=split_csv(eval_tasks),
            eval_limit=eval_limit,
            eval_num_fewshot=eval_num_fewshot,
            eval_batch_size=eval_batch_size,
            eval_device=eval_device,
            eval_dtype=eval_dtype,
            eval_seed=eval_seed,
            skip_baseline=skip_baseline,
            publish=publish,
            plan_only=plan,
        )
        print(json.dumps(result, indent=2))
        if plan:
            # Plan-only output has no artifacts to pull.
            return
        if pull:
            for row in result.get("jobs", []):
                # NOTE(review): this call relies on pull_artifacts' default
                # destination while the general-profile call below passes
                # dest="models/finetuned" — confirm both are meant to land in
                # the same place.
                pull_artifacts(row["name"], f"{row['name']}__{row['profile']}")
                if row.get("general_profile"):
                    pull_artifacts(
                        row["name"],
                        f"{row['name']}__{row['general_profile']}",
                        dest="models/finetuned",
                    )
        return

    if serve:
        print(
            f"Keeping GpuWorker alive for {hours}h "
            f"(deploy with `modal deploy` so other terminals reuse this container)"
        )
        worker.ping.remote()
        result = worker.keep_alive.remote(hours=hours)
        print(json.dumps(result, indent=2))
        return

    # Reached only when serving was disabled and no task flag was given.
    raise SystemExit(
        "Nothing to do. Use default serve mode, or pass --job, --category, --cmd, "
        "--pipeline, --eval-only, --publish-only, or --ping."
    )