Spaces:
Sleeping
Sleeping
| """Shared Modal image, volumes, and command builders for finetune + server apps.""" | |
| from __future__ import annotations | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Any | |
| import modal | |
| import yaml | |
| _file = Path(__file__).resolve() | |
| try: | |
| LOCAL_REPO_ROOT = _file.parents[2] | |
| except IndexError: | |
| LOCAL_REPO_ROOT = Path("/repo") | |
| if (_file.parent / "experiments.yaml").is_file(): | |
| EXPERIMENTS_PATH = _file.parent / "experiments.yaml" | |
| else: | |
| EXPERIMENTS_PATH = Path("/repo/research/modal/experiments.yaml") | |
| _EVAL_PROFILES_REL = "research/evals/configs/eval_profiles.yaml" | |
| if (LOCAL_REPO_ROOT / _EVAL_PROFILES_REL).is_file(): | |
| EVAL_PROFILES_PATH = LOCAL_REPO_ROOT / _EVAL_PROFILES_REL | |
| else: | |
| EVAL_PROFILES_PATH = Path("/repo") / _EVAL_PROFILES_REL | |
| REPO_ROOT = LOCAL_REPO_ROOT | |
| HF_CACHE_PATH = "/root/.cache/huggingface" | |
| FINETUNE_VOL_PATH = "/vol/finetuned" | |
| LM_EVAL_OUTPUT = f"{FINETUNE_VOL_PATH}/results/lm_eval" | |
| BASE_MODEL_ID = "openbmb/MiniCPM5-1B" | |
| BASELINE_EXPERIMENT = "minicpm5-1b__modal-baseline" | |
| BASELINE_RESULTS_JSON = f"{LM_EVAL_OUTPUT}/{BASELINE_EXPERIMENT}/results.json" | |
| # Shared general-capability profile for publish gates (limit 100; see compare_study). | |
| GENERAL_EVAL_PROFILE = "compare_study" | |
| # Metric keys to prefer when picking a task's "primary" score, in priority | |
| # order. Covers lm-eval-harness multiple-choice (acc), generation (exact_match), | |
| # and code (pass@1) tasks so gates and model cards pick a real score, not a stderr. | |
| _METRIC_PRIORITY = ( | |
| "acc,none", | |
| "acc_norm,none", | |
| "exact_match,strict-match", | |
| "exact_match,flexible-extract", | |
| "pass_at_1,create_test", | |
| "pass_at_1,none", | |
| "f1,none", | |
| "bleu,none", | |
| ) | |
| hf_cache_vol = modal.Volume.from_name("hf-cache", create_if_missing=True) | |
| finetune_vol = modal.Volume.from_name("slm-finetune", create_if_missing=True) | |
| hf_secret = modal.Secret.from_name("huggingface") | |
| image = ( | |
| modal.Image.debian_slim(python_version="3.12") | |
| .apt_install("git", "build-essential") | |
| .pip_install("uv", "pyyaml", "huggingface_hub") | |
| .add_local_dir( | |
| str(REPO_ROOT), | |
| remote_path="/repo", | |
| copy=True, | |
| ignore=[ | |
| ".git/**", | |
| ".venv/**", | |
| "models/**", | |
| "results/**", | |
| "outputs/**", | |
| "**/__pycache__/**", | |
| "**/.pytest_cache/**", | |
| "**/node_modules/**", | |
| ], | |
| ) | |
| .run_commands( | |
| "cd /repo && uv sync --frozen --group finetune --group lm-eval --no-dev", | |
| # lm-eval's ifeval task (instructions profile) needs these, declared via | |
| # the lm-eval[ifeval] extra but not activated into the project venv by the | |
| # frozen group sync. Install the lock-pinned versions into /repo/.venv so | |
| # `uv run slm-lm-eval` can import them. | |
| "cd /repo && uv pip install langdetect==1.0.9 immutabledict==4.3.1", | |
| ) | |
| ) | |
| COMMON_ENV = { | |
| "TRUST_REMOTE_CODE": "true", | |
| "HF_HOME": HF_CACHE_PATH, | |
| # Keep hf-xet logs off the HF cache Volume mount so volume.reload() is not | |
| # blocked by open log file handles on warm containers. | |
| "HF_XET_LOG_DEST": "/tmp/xet-logs/", | |
| "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", | |
| } | |
| DEFAULT_GPU = "A10G" | |
| DEFAULT_KEEPALIVE_HOURS = 4.0 | |
| DEFAULT_SCALEDOWN_WINDOW = 3600 # max allowed by Modal (1h idle before scale-down) | |
| DEFAULT_WORKER_TIMEOUT = 14400 # 4h per method call | |
| def repo_env() -> dict[str, str]: | |
| return {**os.environ, **COMMON_ENV} | |
| def _reload_volume_safe(vol: modal.Volume, *, label: str) -> None: | |
| """Reload a Volume; skip (with warning) when open files block the operation.""" | |
| try: | |
| vol.reload() | |
| except (RuntimeError, modal.exception.ConflictError) as exc: | |
| if "open files preventing the operation" in str(exc): | |
| print(f"warning: skipping {label} volume reload ({exc})") | |
| return | |
| raise | |
| def reload_finetune_volume() -> None: | |
| finetune_vol.reload() | |
| def reload_volumes() -> None: | |
| reload_finetune_volume() | |
| _reload_volume_safe(hf_cache_vol, label="hf-cache") | |
| def commit_volumes() -> None: | |
| finetune_vol.commit() | |
| hf_cache_vol.commit() | |
| def load_experiments() -> dict[str, Any]: | |
| with EXPERIMENTS_PATH.open() as f: | |
| return yaml.safe_load(f) or {} | |
| def apply_defaults(job: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]: | |
| return {**defaults, **job} | |
| # Scalar hyperparameters an experiments.yaml job (or its nested `args:` block) | |
| # may set; each maps 1:1 onto a research/finetune.py flag so any run is tunable | |
| # from config without code changes. | |
| _FINETUNE_FLAGS: dict[str, str] = { | |
| "model": "--model", | |
| "lr": "--lr", | |
| "batch_size": "--batch_size", | |
| "grad_accum": "--grad_accum", | |
| "max_len": "--max_len", | |
| "warmup_ratio": "--warmup_ratio", | |
| "weight_decay": "--weight_decay", | |
| "max_grad_norm": "--max_grad_norm", | |
| "lr_scheduler": "--lr_scheduler", | |
| "logging_steps": "--logging_steps", | |
| "eval_steps": "--eval_steps", | |
| "save_steps": "--save_steps", | |
| "save_total_limit": "--save_total_limit", | |
| "early_stopping_patience": "--early_stopping_patience", | |
| "neftune_noise_alpha": "--neftune_noise_alpha", | |
| "report_to": "--report_to", | |
| "seed": "--seed", | |
| "lora_r": "--lora_r", | |
| "lora_alpha": "--lora_alpha", | |
| "lora_dropout": "--lora_dropout", | |
| "lora_targets": "--lora_targets", | |
| "val_split": "--val_split", | |
| "device": "--device", | |
| } | |
| def split_csv(value: str | None) -> list[str] | None: | |
| if not value: | |
| return None | |
| items = [item.strip() for item in value.split(",") if item.strip()] | |
| return items or None | |
| def parse_json_object(value: str | None, *, flag: str) -> dict[str, Any]: | |
| if not value: | |
| return {} | |
| try: | |
| parsed = json.loads(value) | |
| except json.JSONDecodeError as exc: | |
| raise SystemExit(f"{flag} must be a JSON object: {exc}") from exc | |
| if not isinstance(parsed, dict): | |
| raise SystemExit(f"{flag} must be a JSON object") | |
| return parsed | |
| def job_plan_rows(jobs: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| """Compact, printable description of selected jobs and their eval profile.""" | |
| rows = [] | |
| for job in jobs: | |
| rows.append( | |
| { | |
| "name": job.get("name"), | |
| "category": job.get("category"), | |
| "usecase": job.get("usecase") or job.get("use_case"), | |
| "profile": job.get("eval_profile", "compare_study"), | |
| "dataset": "mix" if job.get("mix") else job.get("dataset"), | |
| "mode": job.get("mode", "lora"), | |
| "max_steps": job.get("max_steps"), | |
| "max_samples": job.get("max_samples"), | |
| "publish": bool(job.get("publish")), | |
| } | |
| ) | |
| return rows | |
| def build_finetune_cmd(job: dict[str, Any], out_dir: str) -> list[str]: | |
| cmd = [ | |
| "uv", | |
| "run", | |
| "python", | |
| "research/finetune.py", | |
| "--preset", | |
| job.get("preset", "minicpm5-1b"), | |
| "--mode", | |
| job.get("mode", "lora"), | |
| "--out", | |
| out_dir, | |
| ] | |
| # Dataset: a `mix:` list (skill data + general replay) takes precedence over | |
| # a single --dataset/--format source. | |
| if job.get("mix"): | |
| cmd.extend(["--mix-json", json.dumps(job["mix"])]) | |
| else: | |
| cmd.extend(["--dataset", job["dataset"], "--format", job["format"]]) | |
| if job.get("dataset_config"): | |
| cmd.extend(["--dataset-config", job["dataset_config"]]) | |
| if job.get("dataset_split"): | |
| cmd.extend(["--dataset-split", str(job["dataset_split"])]) | |
| if job.get("max_samples") is not None: | |
| cmd.extend(["--dataset-max-samples", str(int(job["max_samples"]))]) | |
| # Optional column remap so a dataset's own columns fit the --format | |
| # (e.g. MetaMathQA query/response -> prompt format). | |
| for field, col in (job.get("columns") or {}).items(): | |
| cmd.extend([f"--{field}-key", str(col)]) | |
| if job.get("max_steps") is not None: | |
| cmd.extend(["--max_steps", str(int(job["max_steps"]))]) | |
| if job.get("epochs") is not None: | |
| cmd.extend(["--epochs", str(job["epochs"])]) | |
| if job.get("mask_prompt") is False: | |
| cmd.append("--no_mask_prompt") | |
| # Scalar hyperparameters: top-level keys plus an optional nested `args:` block. | |
| overrides = {k: job[k] for k in _FINETUNE_FLAGS if k in job} | |
| overrides.update(job.get("args") or {}) | |
| for key, value in overrides.items(): | |
| flag = _FINETUNE_FLAGS.get(key, f"--{key}") | |
| if isinstance(value, bool): | |
| if value: | |
| cmd.append(flag) | |
| else: | |
| cmd.extend([flag, str(value)]) | |
| return cmd | |
| def build_lm_eval_cmd( | |
| *, | |
| experiment_name: str, | |
| config: str, | |
| preset: str | None = None, | |
| model_path: str | None = None, | |
| adapter_path: str | None = None, | |
| compare_to: str | None = None, | |
| tasks: list[str] | None = None, | |
| limit: int | None = None, | |
| num_fewshot: int | None = None, | |
| batch_size: str | None = None, | |
| device: str | None = None, | |
| dtype: str | None = None, | |
| seed: int | None = None, | |
| ) -> list[str]: | |
| cmd = [ | |
| "uv", | |
| "run", | |
| "--package", | |
| "slm-evals", | |
| "slm-lm-eval", | |
| "--config", | |
| config, | |
| "--experiment-name", | |
| experiment_name, | |
| "--output-dir", | |
| LM_EVAL_OUTPUT, | |
| ] | |
| if preset: | |
| cmd.extend(["--preset", preset]) | |
| if model_path: | |
| cmd.extend(["--model", model_path]) | |
| if adapter_path: | |
| cmd.extend(["--adapter", adapter_path]) | |
| if compare_to: | |
| cmd.extend(["--compare-to", compare_to]) | |
| if tasks: | |
| cmd.append("--tasks") | |
| cmd.extend(tasks) | |
| if limit is not None: | |
| cmd.extend(["--limit", str(int(limit))]) | |
| if num_fewshot is not None: | |
| cmd.extend(["--num-fewshot", str(int(num_fewshot))]) | |
| if batch_size: | |
| cmd.extend(["--batch-size", str(batch_size)]) | |
| if device: | |
| cmd.extend(["--device", str(device)]) | |
| if dtype: | |
| cmd.extend(["--dtype", str(dtype)]) | |
| if seed is not None: | |
| cmd.extend(["--seed", str(int(seed))]) | |
| return cmd | |
| def _matches_job_filters( | |
| job: dict[str, Any], | |
| *, | |
| sector: str | None = None, | |
| usecase: str | None = None, | |
| profiles: list[str] | None = None, | |
| ) -> bool: | |
| if sector and job.get("sector", job.get("category")) != sector: | |
| return False | |
| if usecase: | |
| values = { | |
| job.get("usecase"), | |
| job.get("use_case"), | |
| job.get("category"), | |
| job.get("name"), | |
| } | |
| values.update(job.get("tags") or []) | |
| if usecase not in values: | |
| return False | |
| if profiles and job.get("eval_profile", "compare_study") not in profiles: | |
| return False | |
| return True | |
| def prepare_jobs( | |
| *, | |
| job: str | None = None, | |
| category: str | None = None, | |
| sector: str | None = None, | |
| usecase: str | None = None, | |
| profiles: list[str] | None = None, | |
| max_steps: int | None = None, | |
| max_samples: int | None = None, | |
| finetune_overrides: dict[str, Any] | None = None, | |
| ) -> tuple[dict[str, Any], list[dict[str, Any]]]: | |
| spec = load_experiments() | |
| defaults = spec.get("defaults", {}) | |
| jobs = spec.get("finetune", []) | |
| if job: | |
| jobs = [j for j in jobs if j.get("name") == job] | |
| if not jobs: | |
| raise SystemExit( | |
| f"Unknown job {job!r}; check research/modal/experiments.yaml" | |
| ) | |
| if category: | |
| jobs = [j for j in jobs if j.get("category") == category] | |
| if not jobs: | |
| raise SystemExit(f"No jobs with category {category!r}") | |
| if sector or usecase or profiles: | |
| jobs = [ | |
| j | |
| for j in jobs | |
| if _matches_job_filters( | |
| j, | |
| sector=sector, | |
| usecase=usecase, | |
| profiles=profiles, | |
| ) | |
| ] | |
| if not jobs: | |
| filters = { | |
| "sector": sector, | |
| "usecase": usecase, | |
| "profiles": profiles, | |
| } | |
| raise SystemExit(f"No jobs matched filters: {filters}") | |
| prepared: list[dict[str, Any]] = [] | |
| for raw in jobs: | |
| merged = apply_defaults(raw, defaults) | |
| if max_steps is not None: | |
| merged["max_steps"] = max_steps | |
| if max_samples is not None: | |
| merged["max_samples"] = max_samples | |
| if finetune_overrides: | |
| args = {**(merged.get("args") or {})} | |
| for key, value in finetune_overrides.items(): | |
| if key in _FINETUNE_FLAGS: | |
| args[key] = value | |
| else: | |
| merged[key] = value | |
| if args: | |
| merged["args"] = args | |
| prepared.append(merged) | |
| return defaults, prepared | |
| def job_gpu(job: dict[str, Any]) -> str: | |
| return job.get("gpu") or DEFAULT_GPU | |
| def job_needs_general_gate(job: dict[str, Any]) -> bool: | |
| """Publishable jobs run a second general eval and must pass `general_goals`.""" | |
| return bool(job.get("goals") and job.get("publish")) | |
| def general_eval_profile(defaults: dict[str, Any]) -> str: | |
| return defaults.get("general_eval_profile", GENERAL_EVAL_PROFILE) | |
| def general_goals_for_job( | |
| job: dict[str, Any], defaults: dict[str, Any] | |
| ) -> dict[str, Any] | None: | |
| if not job_needs_general_gate(job): | |
| return None | |
| goals = job.get("general_goals") or defaults.get("general_goals") | |
| return goals if goals else None | |
| def baseline_profiles_for_jobs( | |
| jobs: list[dict[str, Any]], defaults: dict[str, Any] | |
| ) -> list[str]: | |
| profiles = {j.get("eval_profile", "compare_study") for j in jobs} | |
| if any(job_needs_general_gate(j) for j in jobs): | |
| profiles.add(general_eval_profile(defaults)) | |
| return sorted(profiles) | |
| def baseline_experiment_name(preset: str, profile: str) -> str: | |
| """Volume path key for the unfine-tuned base model on a given eval profile.""" | |
| return f"{preset}__baseline__{profile}" | |
| def _load_models_registry() -> dict[str, Any]: | |
| path = REPO_ROOT / "models.yaml" | |
| if not path.is_file(): | |
| path = Path("/repo") / "models.yaml" | |
| if not path.is_file(): | |
| return {} | |
| with path.open() as f: | |
| return yaml.safe_load(f) or {} | |
| def resolve_base_model_id(job: dict[str, Any], defaults: dict[str, Any]) -> str: | |
| """Hub/path id of the base model this job fine-tunes — used as the eval baseline.""" | |
| explicit = job.get("model") or (job.get("args") or {}).get("model") | |
| if explicit: | |
| return str(explicit) | |
| preset = job.get("preset", defaults.get("preset", "minicpm5-1b")) | |
| entry = (_load_models_registry().get("models") or {}).get(preset) or {} | |
| return entry.get("model_id") or BASE_MODEL_ID | |
| def discover_cached_baselines( | |
| profile_names: list[str], | |
| *, | |
| preset: str, | |
| eval_tasks: list[str] | None = None, | |
| eval_limit: int | None = None, | |
| eval_num_fewshot: int | None = None, | |
| eval_seed: int | None = None, | |
| ) -> dict[str, bool]: | |
| """True per profile when base-model baseline results already exist on the Volume.""" | |
| cached: dict[str, bool] = {} | |
| for profile in profile_names: | |
| cached[profile] = baseline_is_cached( | |
| baseline_experiment_name(preset, profile), | |
| config_for_profile(profile), | |
| tasks=eval_tasks, | |
| limit=eval_limit, | |
| num_fewshot=eval_num_fewshot, | |
| seed=eval_seed, | |
| ) | |
| return cached | |
| def profiles_needing_baseline_run( | |
| profile_names: list[str], | |
| cached: dict[str, bool], | |
| *, | |
| skip_baseline: bool, | |
| ) -> list[str]: | |
| if skip_baseline: | |
| return [] | |
| return [profile for profile in profile_names if not cached.get(profile)] | |
| def eval_paths( | |
| *, | |
| job_name: str, | |
| preset: str, | |
| profile: str, | |
| ) -> tuple[str, str, str]: | |
| """Return (candidate_results_path, baseline_results_path, experiment_name).""" | |
| exp_name = f"{job_name}__{profile}" | |
| candidate = f"{LM_EVAL_OUTPUT}/{exp_name}/results.json" | |
| baseline = f"{LM_EVAL_OUTPUT}/{baseline_experiment_name(preset, profile)}/results.json" | |
| return candidate, baseline, exp_name | |
| def config_for_profile(profile: str) -> str: | |
| """Map an eval_profiles.yaml profile name to its config path (relative to repo root).""" | |
| with EVAL_PROFILES_PATH.open() as f: | |
| catalog = yaml.safe_load(f) or {} | |
| meta = (catalog.get("profiles") or {}).get(profile) | |
| if not meta or not meta.get("config"): | |
| known = ", ".join(sorted((catalog.get("profiles") or {}))) | |
| raise SystemExit( | |
| f"Unknown eval_profile {profile!r}; check {_EVAL_PROFILES_REL} (known: {known})" | |
| ) | |
| return f"research/evals/configs/{meta['config']}" | |
| def primary_metric(task_metrics: dict[str, Any]) -> tuple[str, float] | None: | |
| """Pick a task's headline (metric_name, score), matching slm_evals summary tables.""" | |
| for key in _METRIC_PRIORITY: | |
| if key in task_metrics and isinstance(task_metrics[key], (int, float)): | |
| return key, float(task_metrics[key]) | |
| for key, value in task_metrics.items(): | |
| if "stderr" in key: | |
| continue | |
| if isinstance(value, (int, float)): | |
| return key, float(value) | |
| return None | |
| def baseline_is_cached( | |
| experiment_name: str, | |
| config_path: str, | |
| *, | |
| tasks: list[str] | None = None, | |
| limit: int | None = None, | |
| num_fewshot: int | None = None, | |
| seed: int | None = None, | |
| ) -> bool: | |
| """True if a baseline results.json exists AND its run_meta still matches the | |
| profile config's tasks/limit/num_fewshot. Config changes (e.g. new guard | |
| tasks or a higher limit) therefore correctly force a fresh baseline.""" | |
| results = Path(LM_EVAL_OUTPUT) / experiment_name / "results.json" | |
| if not results.is_file(): | |
| return False | |
| candidates = [Path(config_path)] | |
| if not Path(config_path).is_absolute(): | |
| candidates += [REPO_ROOT / config_path, Path("/repo") / config_path] | |
| cfg_file = next((p for p in candidates if p.is_file()), None) | |
| if cfg_file is None: | |
| return False | |
| try: | |
| meta = json.loads(results.read_text()).get("run_meta", {}) | |
| cfg = yaml.safe_load(cfg_file.read_text()) or {} | |
| except Exception: | |
| return False | |
| expected_tasks = tasks or cfg.get("tasks") or [] | |
| expected_limit = limit if limit is not None else cfg.get("limit") | |
| expected_fewshot = ( | |
| num_fewshot if num_fewshot is not None else cfg.get("num_fewshot", 0) | |
| ) | |
| expected_seed = seed if seed is not None else cfg.get("seed") | |
| same = ( | |
| sorted(meta.get("tasks") or []) == sorted(expected_tasks) | |
| and meta.get("limit") == expected_limit | |
| and meta.get("num_fewshot") == expected_fewshot | |
| ) | |
| if expected_seed is not None: | |
| same = same and meta.get("seed") == expected_seed | |
| return same | |
| def evaluate_gate( | |
| *, | |
| candidate: dict[str, Any], | |
| baseline: dict[str, Any] | None, | |
| goals: dict[str, Any], | |
| ) -> dict[str, Any]: | |
| """Check a candidate's lm-eval results dict against `goals` (Hub publish gate). | |
| `goals` schema: | |
| task: <lm-eval task name, optional when only guard_tasks are set> | |
| min_score: <float, optional> # candidate score must be >= this | |
| min_improve: <float, optional> # candidate - baseline must be >= this | |
| guard_tasks: # optional regression guards | |
| - task: <lm-eval task name> | |
| max_regress: <float> # baseline - candidate must be <= this | |
| """ | |
| cand_tasks = candidate.get("results", {}) | |
| base_tasks = (baseline or {}).get("results", {}) | |
| def _score(tasks: dict[str, Any], task_name: str) -> float | None: | |
| metrics = tasks.get(task_name) | |
| if not metrics: | |
| return None | |
| picked = primary_metric(metrics) | |
| return picked[1] if picked else None | |
| checks: list[dict[str, Any]] = [] | |
| passed = True | |
| task = goals.get("task") | |
| cand_score = base_score = None | |
| if task: | |
| cand_score = _score(cand_tasks, task) | |
| base_score = _score(base_tasks, task) | |
| # Tolerance so a score landing exactly on a threshold (e.g. a clean +0.02 | |
| # improvement stored as 0.0199999996) is not rejected by float epsilon. | |
| eps = 1e-9 | |
| if goals.get("min_score") is not None: | |
| ok = cand_score is not None and cand_score >= goals["min_score"] - eps | |
| checks.append({"check": f"{task} >= {goals['min_score']}", "value": cand_score, "ok": ok}) | |
| passed = passed and ok | |
| if goals.get("min_improve") is not None: | |
| delta = ( | |
| cand_score - base_score | |
| if (cand_score is not None and base_score is not None) | |
| else None | |
| ) | |
| ok = delta is not None and delta >= goals["min_improve"] - eps | |
| checks.append( | |
| {"check": f"{task} improve >= {goals['min_improve']}", "value": delta, "ok": ok} | |
| ) | |
| passed = passed and ok | |
| for guard in goals.get("guard_tasks", []): | |
| g_task = guard["task"] | |
| g_cand = _score(cand_tasks, g_task) | |
| g_base = _score(base_tasks, g_task) | |
| regress = g_base - g_cand if (g_cand is not None and g_base is not None) else None | |
| ok = regress is not None and regress <= guard["max_regress"] + eps | |
| checks.append( | |
| {"check": f"{g_task} regress <= {guard['max_regress']}", "value": regress, "ok": ok} | |
| ) | |
| passed = passed and ok | |
| if not checks: | |
| passed = False | |
| checks.append({"check": "goals defined no checks", "value": None, "ok": False}) | |
| return { | |
| "passed": passed, | |
| "checks": checks, | |
| "task": task, | |
| "candidate_score": cand_score, | |
| "baseline_score": base_score, | |
| } | |
| def pull_artifacts(job_name: str, exp_name: str, dest: str = "models/finetuned") -> None: | |
| """Download an adapter and its lm-eval results from the `slm-finetune` Volume (run locally).""" | |
| import shutil | |
| import subprocess | |
| def _get(remote: str, parent: str) -> None: | |
| # For a folder REMOTE_PATH, `modal volume get` expects the *parent* | |
| # directory as the destination and recreates the folder inside it. | |
| # Passing the full target path (parent/<name>) raises | |
| # "[Errno 21] Is a directory". Clear the target first for a clean pull. | |
| name = remote.rsplit("/", 1)[-1] | |
| shutil.rmtree(Path(parent) / name, ignore_errors=True) | |
| Path(parent).mkdir(parents=True, exist_ok=True) | |
| subprocess.run( | |
| ["modal", "volume", "get", "slm-finetune", remote, f"{parent}/", "--force"], | |
| check=False, | |
| ) | |
| print(f"--- pulling {job_name} -> {dest}/{job_name} ---") | |
| _get(job_name, dest) | |
| exp_dir = f"results/lm_eval/{exp_name}" | |
| print(f"--- pulling {exp_dir} ---") | |
| _get(exp_dir, "results/lm_eval") | |
| def check_gate_files( | |
| *, | |
| candidate_results_path: str, | |
| baseline_results_path: str | None, | |
| goals: dict[str, Any], | |
| ) -> dict[str, Any]: | |
| """Like evaluate_gate(), but reads results.json files (run inside a volume-mounted function).""" | |
| cand_path = Path(candidate_results_path) | |
| if not cand_path.is_file(): | |
| return {"passed": False, "checks": [], "reason": f"missing results file: {cand_path}"} | |
| candidate = json.loads(cand_path.read_text()) | |
| baseline = None | |
| if baseline_results_path and Path(baseline_results_path).is_file(): | |
| baseline = json.loads(Path(baseline_results_path).read_text()) | |
| return evaluate_gate(candidate=candidate, baseline=baseline, goals=goals) | |
| def check_publish_gate_files( | |
| *, | |
| skill_candidate_path: str, | |
| skill_baseline_path: str | None, | |
| skill_goals: dict[str, Any], | |
| general_candidate_path: str | None = None, | |
| general_baseline_path: str | None = None, | |
| general_goals: dict[str, Any] | None = None, | |
| ) -> dict[str, Any]: | |
| """Gate on skill-specific eval plus optional general-capability eval.""" | |
| skill_gate = check_gate_files( | |
| candidate_results_path=skill_candidate_path, | |
| baseline_results_path=skill_baseline_path, | |
| goals=skill_goals, | |
| ) | |
| general_gate: dict[str, Any] | None = None | |
| if general_goals: | |
| if not general_candidate_path: | |
| general_gate = { | |
| "passed": False, | |
| "checks": [ | |
| { | |
| "check": "general eval results missing", | |
| "value": None, | |
| "ok": False, | |
| } | |
| ], | |
| "reason": "general candidate results path not provided", | |
| } | |
| else: | |
| general_gate = check_gate_files( | |
| candidate_results_path=general_candidate_path, | |
| baseline_results_path=general_baseline_path, | |
| goals=general_goals, | |
| ) | |
| passed = skill_gate.get("passed") and ( | |
| general_gate is None or general_gate.get("passed") | |
| ) | |
| checks = list(skill_gate.get("checks", [])) | |
| if general_gate: | |
| for check in general_gate.get("checks", []): | |
| checks.append({**check, "check": f"general: {check['check']}"}) | |
| return { | |
| "passed": passed, | |
| "checks": checks, | |
| "skill": skill_gate, | |
| "general": general_gate, | |
| "task": skill_gate.get("task"), | |
| "candidate_score": skill_gate.get("candidate_score"), | |
| "baseline_score": skill_gate.get("baseline_score"), | |
| } | |
| def render_model_card( | |
| *, | |
| job: dict[str, Any], | |
| gate_result: dict[str, Any], | |
| candidate: dict[str, Any], | |
| baseline: dict[str, Any] | None, | |
| training_payload: dict[str, Any] | None, | |
| ) -> str: | |
| def _fmt(v: float | None) -> str: | |
| return "—" if v is None else f"{v:.4f}" | |
| cand_tasks = candidate.get("results", {}) | |
| base_tasks = (baseline or {}).get("results", {}) | |
| base_model = (training_payload or {}).get("model") or BASE_MODEL_ID | |
| # A job is either a single dataset (`dataset`/`format`) or a `mix:` of sources. | |
| if job.get("mix"): | |
| dataset_desc = " + ".join( | |
| f"`{s.get('dataset', '?')}`" for s in job["mix"] | |
| ) | |
| format_desc = "mix" | |
| else: | |
| dataset_desc = f"`{job.get('dataset', '?')}`" | |
| format_desc = job.get("format", "?") | |
| lines = [ | |
| "---", | |
| "library_name: peft", | |
| f"base_model: {base_model}", | |
| "license: apache-2.0", | |
| "tags:", | |
| " - lora", | |
| " - qlora", | |
| " - build-small-hackathon", | |
| " - well-tuned", | |
| f" - {job.get('category', 'general')}", | |
| "---", | |
| "", | |
| f"# {job['name']}", | |
| "", | |
| f"QLoRA adapter for **{job.get('category', 'general')}**, fine-tuned from " | |
| f"`{base_model}` on {dataset_desc} (format: `{format_desc}`).", | |
| "", | |
| "Trained, evaluated, and gated on [Modal](https://modal.com/docs/guide) via " | |
| "`research/modal/` (app `slm-finetune-benchmark`).", | |
| "", | |
| "## Benchmark gate", | |
| "", | |
| f"- skill eval profile: `{job.get('eval_profile')}`", | |
| f"- gate: {'**PASSED**' if gate_result.get('passed') else '**FAILED**'}", | |
| "", | |
| ] | |
| def _gate_table(section: dict[str, Any] | None, *, prefix: str = "") -> list[str]: | |
| if not section: | |
| return [] | |
| out = [ | |
| f"### {prefix}checks".strip(), | |
| "", | |
| "| check | value | result |", | |
| "| --- | ---: | --- |", | |
| ] | |
| for c in section.get("checks", []): | |
| out.append( | |
| f"| {c['check']} | {_fmt(c['value'])} | {'pass' if c['ok'] else 'fail'} |" | |
| ) | |
| if not section.get("checks"): | |
| out.append("| — | — | — |") | |
| out.append("") | |
| return out | |
| skill_section = gate_result.get("skill") or gate_result | |
| lines.extend(_gate_table(skill_section, prefix="Skill ")) | |
| if gate_result.get("general"): | |
| gen_profile = job.get("general_eval_profile") or GENERAL_EVAL_PROFILE | |
| lines.append(f"- general eval profile: `{gen_profile}`") | |
| lines.append("") | |
| lines.extend(_gate_table(gate_result["general"], prefix="General ")) | |
| lines.extend( | |
| [ | |
| "", | |
| "## lm-eval results", | |
| "", | |
| "| task | metric | baseline | candidate | delta |", | |
| "| --- | --- | ---: | ---: | ---: |", | |
| ] | |
| ) | |
| for task in sorted(set(cand_tasks) | set(base_tasks)): | |
| c = primary_metric(cand_tasks.get(task, {})) | |
| b = primary_metric(base_tasks.get(task, {})) | |
| metric_name = (c or b or (None, None))[0] or "—" | |
| c_val = c[1] if c else None | |
| b_val = b[1] if b else None | |
| delta = c_val - b_val if (c_val is not None and b_val is not None) else None | |
| sign = "+" if (delta is not None and delta >= 0) else "" | |
| delta_str = "—" if delta is None else f"{sign}{delta:.4f}" | |
| lines.append(f"| {task} | {metric_name} | {_fmt(b_val)} | {_fmt(c_val)} | {delta_str} |") | |
| if training_payload: | |
| lines.extend( | |
| [ | |
| "", | |
| "## Training", | |
| "", | |
| f"- dataset: `{training_payload.get('dataset')}`", | |
| f"- mode: `{training_payload.get('mode')}`", | |
| f"- samples: {training_payload.get('samples')}", | |
| f"- final train loss: {training_payload.get('metrics', {}).get('final_train_loss')}", | |
| f"- eval loss: {training_payload.get('metrics', {}).get('eval_loss')}", | |
| ] | |
| ) | |
| lines.extend( | |
| [ | |
| "", | |
| "## Load with PEFT", | |
| "", | |
| "```python", | |
| "from peft import PeftModel", | |
| "from transformers import AutoModelForCausalLM, AutoTokenizer", | |
| "", | |
| f'base = "{base_model}"', | |
| f'adapter = "{job.get("publish", {}).get("hub_repo", "<hub-repo>")}"', | |
| "", | |
| "tokenizer = AutoTokenizer.from_pretrained(base, trust_remote_code=True)", | |
| "model = AutoModelForCausalLM.from_pretrained(", | |
| ' base, torch_dtype="auto", device_map="auto", trust_remote_code=True', | |
| ")", | |
| "model = PeftModel.from_pretrained(model, adapter)", | |
| "```", | |
| "", | |
| ] | |
| ) | |
| return "\n".join(lines) + "\n" | |
| def publish_adapter_files( | |
| *, | |
| job: dict[str, Any], | |
| adapter_dir: str, | |
| gate_result: dict[str, Any], | |
| candidate_results_path: str, | |
| baseline_results_path: str | None, | |
| ) -> dict[str, Any]: | |
| """Write a model card and push the adapter to the Hub — only if the gate passed. | |
| Run inside a function with `finetune_vol` mounted and `hf_secret` set. | |
| """ | |
| publish_cfg = job.get("publish") | |
| if not publish_cfg: | |
| return {"published": False, "reason": "no publish config for this job"} | |
| if not gate_result.get("passed"): | |
| return {"published": False, "reason": "gate failed", "gate": gate_result} | |
| adapter_path = Path(adapter_dir) | |
| if not adapter_path.is_dir(): | |
| return {"published": False, "reason": f"adapter dir missing: {adapter_dir}"} | |
| candidate = {} | |
| cand_path = Path(candidate_results_path) | |
| if cand_path.is_file(): | |
| candidate = json.loads(cand_path.read_text()) | |
| baseline = None | |
| if baseline_results_path and Path(baseline_results_path).is_file(): | |
| baseline = json.loads(Path(baseline_results_path).read_text()) | |
| training_payload = None | |
| training_results_path = adapter_path / "training_results.json" | |
| if training_results_path.is_file(): | |
| training_payload = json.loads(training_results_path.read_text()) | |
| card = render_model_card( | |
| job=job, | |
| gate_result=gate_result, | |
| candidate=candidate, | |
| baseline=baseline, | |
| training_payload=training_payload, | |
| ) | |
| (adapter_path / "README.md").write_text(card) | |
| commit_volumes() | |
| from huggingface_hub import HfApi | |
| repo_ids = [publish_cfg["hub_repo"], *(publish_cfg.get("mirror_repos") or [])] | |
| private = publish_cfg.get("private", True) | |
| api = HfApi() | |
| uploads = [] | |
| for repo_id in dict.fromkeys(repo_ids): | |
| api.create_repo(repo_id=repo_id, repo_type="model", private=private, exist_ok=True) | |
| api.upload_folder( | |
| folder_path=str(adapter_path), | |
| repo_id=repo_id, | |
| repo_type="model", | |
| commit_message=f"Publish {job['name']} (gate passed: {gate_result.get('task')})", | |
| ) | |
| uploads.append({"repo_id": repo_id, "url": f"https://huggingface.co/{repo_id}"}) | |
| return { | |
| "published": True, | |
| "repo_id": uploads[0]["repo_id"], | |
| "url": uploads[0]["url"], | |
| "uploads": uploads, | |
| } | |