"""Shared Modal image, volumes, and command builders for finetune + server apps.""" from __future__ import annotations import json import os from pathlib import Path from typing import Any import modal import yaml _file = Path(__file__).resolve() try: LOCAL_REPO_ROOT = _file.parents[2] except IndexError: LOCAL_REPO_ROOT = Path("/repo") if (_file.parent / "experiments.yaml").is_file(): EXPERIMENTS_PATH = _file.parent / "experiments.yaml" else: EXPERIMENTS_PATH = Path("/repo/research/modal/experiments.yaml") _EVAL_PROFILES_REL = "research/evals/configs/eval_profiles.yaml" if (LOCAL_REPO_ROOT / _EVAL_PROFILES_REL).is_file(): EVAL_PROFILES_PATH = LOCAL_REPO_ROOT / _EVAL_PROFILES_REL else: EVAL_PROFILES_PATH = Path("/repo") / _EVAL_PROFILES_REL REPO_ROOT = LOCAL_REPO_ROOT HF_CACHE_PATH = "/root/.cache/huggingface" FINETUNE_VOL_PATH = "/vol/finetuned" LM_EVAL_OUTPUT = f"{FINETUNE_VOL_PATH}/results/lm_eval" BASE_MODEL_ID = "openbmb/MiniCPM5-1B" BASELINE_EXPERIMENT = "minicpm5-1b__modal-baseline" BASELINE_RESULTS_JSON = f"{LM_EVAL_OUTPUT}/{BASELINE_EXPERIMENT}/results.json" # Shared general-capability profile for publish gates (limit 100; see compare_study). GENERAL_EVAL_PROFILE = "compare_study" # Metric keys to prefer when picking a task's "primary" score, in priority # order. Covers lm-eval-harness multiple-choice (acc), generation (exact_match), # and code (pass@1) tasks so gates and model cards pick a real score, not a stderr. 
_METRIC_PRIORITY = (
    "acc,none",
    "acc_norm,none",
    "exact_match,strict-match",
    "exact_match,flexible-extract",
    "pass_at_1,create_test",
    "pass_at_1,none",
    "f1,none",
    "bleu,none",
)

# Named Modal resources shared by the apps that import this module.
hf_cache_vol = modal.Volume.from_name("hf-cache", create_if_missing=True)
finetune_vol = modal.Volume.from_name("slm-finetune", create_if_missing=True)
hf_secret = modal.Secret.from_name("huggingface")

# Container image: the repository is copied to /repo and its uv environment is
# synced there at build time.
image = (
    modal.Image.debian_slim(python_version="3.12")
    .apt_install("git", "build-essential")
    .pip_install("uv", "pyyaml", "huggingface_hub")
    .add_local_dir(
        str(REPO_ROOT),
        remote_path="/repo",
        copy=True,
        ignore=[
            ".git/**",
            ".venv/**",
            "models/**",
            "results/**",
            "outputs/**",
            "**/__pycache__/**",
            "**/.pytest_cache/**",
            "**/node_modules/**",
        ],
    )
    .run_commands(
        "cd /repo && uv sync --frozen --group finetune --group lm-eval --no-dev",
        # lm-eval's ifeval task (instructions profile) needs these, declared via
        # the lm-eval[ifeval] extra but not activated into the project venv by the
        # frozen group sync. Install the lock-pinned versions into /repo/.venv so
        # `uv run slm-lm-eval` can import them.
        "cd /repo && uv pip install langdetect==1.0.9 immutabledict==4.3.1",
    )
)

COMMON_ENV = {
    "TRUST_REMOTE_CODE": "true",
    "HF_HOME": HF_CACHE_PATH,
    # Keep hf-xet logs off the HF cache Volume mount so volume.reload() is not
    # blocked by open log file handles on warm containers.
    "HF_XET_LOG_DEST": "/tmp/xet-logs/",
    "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
}

DEFAULT_GPU = "A10G"
DEFAULT_KEEPALIVE_HOURS = 4.0
DEFAULT_SCALEDOWN_WINDOW = 3600  # max allowed by Modal (1h idle before scale-down)
DEFAULT_WORKER_TIMEOUT = 14400  # 4h per method call


def repo_env() -> dict[str, str]:
    """Return the current process environment overlaid with `COMMON_ENV`."""
    return {**os.environ, **COMMON_ENV}


def _reload_volume_safe(vol: modal.Volume, *, label: str) -> None:
    """Reload a Volume; skip (with warning) when open files block the operation.

    Any other `RuntimeError` / `ConflictError` is re-raised unchanged.
    """
    try:
        vol.reload()
    except (RuntimeError, modal.exception.ConflictError) as exc:
        if "open files preventing the operation" in str(exc):
            print(f"warning: skipping {label} volume reload ({exc})")
            return
        raise


def reload_finetune_volume() -> None:
    """Reload the fine-tune Volume; errors propagate to the caller."""
    finetune_vol.reload()


def reload_volumes() -> None:
    """Reload the fine-tune Volume, then best-effort reload the HF cache Volume."""
    reload_finetune_volume()
    _reload_volume_safe(hf_cache_vol, label="hf-cache")


def commit_volumes() -> None:
    """Commit both the fine-tune Volume and the HF cache Volume."""
    finetune_vol.commit()
    hf_cache_vol.commit()


def load_experiments() -> dict[str, Any]:
    """Parse `EXPERIMENTS_PATH` and return its top-level mapping.

    An empty file yields `{}`.

    Raises:
        SystemExit: if the document's top level is not a mapping, so the
            problem is reported here instead of as an `AttributeError` in a
            caller that does `spec.get(...)`.
    """
    # Explicit encoding: do not depend on the platform's locale default.
    with EXPERIMENTS_PATH.open(encoding="utf-8") as f:
        spec = yaml.safe_load(f) or {}
    if not isinstance(spec, dict):
        raise SystemExit(
            f"{EXPERIMENTS_PATH} must contain a mapping at the top level, "
            f"got {type(spec).__name__}"
        )
    return spec


def apply_defaults(job: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict with `defaults` overlaid by `job` (shallow merge).

    Neither input is mutated. A `None` value for either argument (what YAML
    produces for an empty key such as `defaults:`) is treated as an empty
    mapping instead of raising `TypeError`.
    """
    return {**(defaults or {}), **(job or {})}


# Scalar hyperparameters an experiments.yaml job (or its nested `args:` block)
# may set; each maps 1:1 onto a research/finetune.py flag so any run is tunable
# from config without code changes.
_FINETUNE_FLAGS: dict[str, str] = {
    "model": "--model",
    "lr": "--lr",
    "batch_size": "--batch_size",
    "grad_accum": "--grad_accum",
    "max_len": "--max_len",
    "warmup_ratio": "--warmup_ratio",
    "weight_decay": "--weight_decay",
    "max_grad_norm": "--max_grad_norm",
    "lr_scheduler": "--lr_scheduler",
    "logging_steps": "--logging_steps",
    "eval_steps": "--eval_steps",
    "save_steps": "--save_steps",
    "save_total_limit": "--save_total_limit",
    "early_stopping_patience": "--early_stopping_patience",
    "neftune_noise_alpha": "--neftune_noise_alpha",
    "report_to": "--report_to",
    "seed": "--seed",
    "lora_r": "--lora_r",
    "lora_alpha": "--lora_alpha",
    "lora_dropout": "--lora_dropout",
    "lora_targets": "--lora_targets",
    "val_split": "--val_split",
    "device": "--device",
}


def split_csv(value: str | None) -> list[str] | None:
    """Split a comma-separated string into stripped, non-empty items.

    Returns `None` when `value` is empty/`None` or contains no non-blank item.
    """
    if not value:
        return None
    items = [item.strip() for item in value.split(",") if item.strip()]
    return items or None


def parse_json_object(value: str | None, *, flag: str) -> dict[str, Any]:
    """Parse `value` as a JSON object; an empty/`None` value yields `{}`.

    `flag` is only used to name the offending option in the error message.

    Raises:
        SystemExit: if `value` is not valid JSON or is not a JSON object.
    """
    if not value:
        return {}
    try:
        parsed = json.loads(value)
    except json.JSONDecodeError as exc:
        raise SystemExit(f"{flag} must be a JSON object: {exc}") from exc
    if not isinstance(parsed, dict):
        raise SystemExit(f"{flag} must be a JSON object")
    return parsed


def job_plan_rows(jobs: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Compact, printable description of selected jobs and their eval profile."""
    return [
        {
            "name": job.get("name"),
            "category": job.get("category"),
            "usecase": job.get("usecase") or job.get("use_case"),
            "profile": job.get("eval_profile", "compare_study"),
            "dataset": "mix" if job.get("mix") else job.get("dataset"),
            "mode": job.get("mode", "lora"),
            "max_steps": job.get("max_steps"),
            "max_samples": job.get("max_samples"),
            "publish": bool(job.get("publish")),
        }
        for job in jobs
    ]


def build_finetune_cmd(job: dict[str, Any], out_dir: str) -> list[str]:
    """Build the `uv run python research/finetune.py ...` argv for one job.

    Args:
        job: Job mapping (defaults already applied). Needs either a truthy
            `mix` or both `dataset` and `format`.
        out_dir: Value passed to `--out`.

    Returns:
        The command as a list of strings.

    Raises:
        KeyError: if the job has no `mix` and lacks `dataset` or `format`.
    """
    cmd = [
        "uv",
        "run",
        "python",
        "research/finetune.py",
        "--preset",
        job.get("preset", "minicpm5-1b"),
        "--mode",
        job.get("mode", "lora"),
        "--out",
        out_dir,
    ]
    # Dataset: a `mix:` list (skill data + general replay) takes precedence over
    # a single --dataset/--format source.
    if job.get("mix"):
        cmd.extend(["--mix-json", json.dumps(job["mix"])])
    else:
        cmd.extend(["--dataset", job["dataset"], "--format", job["format"]])
    # NOTE(review): the optional dataset flags below are emitted whenever the
    # key is set, for mix and single-dataset jobs alike — confirm this nesting
    # against the original layout.
    if job.get("dataset_config"):
        cmd.extend(["--dataset-config", job["dataset_config"]])
    if job.get("dataset_split"):
        cmd.extend(["--dataset-split", str(job["dataset_split"])])
    if job.get("max_samples") is not None:
        cmd.extend(["--dataset-max-samples", str(int(job["max_samples"]))])
    # Optional column remap so a dataset's own columns fit the --format
    # (e.g. MetaMathQA query/response -> prompt format).
    for field, col in (job.get("columns") or {}).items():
        cmd.extend([f"--{field}-key", str(col)])
    if job.get("max_steps") is not None:
        cmd.extend(["--max_steps", str(int(job["max_steps"]))])
    if job.get("epochs") is not None:
        cmd.extend(["--epochs", str(job["epochs"])])
    if job.get("mask_prompt") is False:
        cmd.append("--no_mask_prompt")
    # Scalar hyperparameters: top-level keys plus an optional nested `args:` block.
    # Entries in `args:` win over top-level keys of the same name.
    overrides = {k: job[k] for k in _FINETUNE_FLAGS if k in job}
    overrides.update(job.get("args") or {})
    for key, value in overrides.items():
        # A null value means "not set"; passing the literal string "None"
        # would only be rejected (or misread) by the flag parser downstream.
        if value is None:
            continue
        flag = _FINETUNE_FLAGS.get(key, f"--{key}")
        if isinstance(value, bool):
            # Booleans are bare switches: present when true, omitted when false.
            if value:
                cmd.append(flag)
        else:
            cmd.extend([flag, str(value)])
    return cmd


def build_lm_eval_cmd(
    *,
    experiment_name: str,
    config: str,
    preset: str | None = None,
    model_path: str | None = None,
    adapter_path: str | None = None,
    compare_to: str | None = None,
    tasks: list[str] | None = None,
    limit: int | None = None,
    num_fewshot: int | None = None,
    batch_size: str | None = None,
    device: str | None = None,
    dtype: str | None = None,
    seed: int | None = None,
) -> list[str]:
    """Build the `uv run --package slm-evals slm-lm-eval ...` argv.

    `config` and `experiment_name` are always passed and results always go to
    `LM_EVAL_OUTPUT`. Every other option is appended only when provided;
    `limit`, `num_fewshot` and `seed` are included whenever they are not
    `None`, so `0` is passed through.
    """
    cmd = [
        "uv",
        "run",
        "--package",
        "slm-evals",
        "slm-lm-eval",
        "--config",
        config,
        "--experiment-name",
        experiment_name,
        "--output-dir",
        LM_EVAL_OUTPUT,
    ]
    if preset:
        cmd.extend(["--preset", preset])
    if model_path:
        cmd.extend(["--model", model_path])
    if adapter_path:
        cmd.extend(["--adapter", adapter_path])
    if compare_to:
        cmd.extend(["--compare-to", compare_to])
    if tasks:
        cmd.append("--tasks")
        cmd.extend(tasks)
    if limit is not None:
        cmd.extend(["--limit", str(int(limit))])
    if num_fewshot is not None:
        cmd.extend(["--num-fewshot", str(int(num_fewshot))])
    if batch_size:
        cmd.extend(["--batch-size", str(batch_size)])
    if device:
        cmd.extend(["--device", str(device)])
    if dtype:
        cmd.extend(["--dtype", str(dtype)])
    if seed is not None:
        cmd.extend(["--seed", str(int(seed))])
    return cmd


def _matches_job_filters(
    job: dict[str, Any],
    *,
    sector: str | None = None,
    usecase: str | None = None,
    profiles: list[str] | None = None,
) -> bool:
    """Return True when `job` satisfies every filter that was supplied.

    * `sector` is compared with the job's `sector`, falling back to `category`.
    * `usecase` matches any of `usecase`, `use_case`, `category`, `name`, or
      an entry of `tags`.
    * `profiles` must contain the job's `eval_profile` (default
      `compare_study`).
    """
    if sector and job.get("sector", job.get("category")) != sector:
        return False
    if usecase:
        values = {
            job.get("usecase"),
            job.get("use_case"),
            job.get("category"),
            job.get("name"),
        }
        tags = job.get("tags") or []
        # A scalar `tags: foo` must count as one tag, not as its characters.
        if isinstance(tags, str):
            tags = [tags]
        values.update(tags)
        if usecase not in values:
            return False
    if profiles and job.get("eval_profile", "compare_study") not in profiles:
        return False
    return True


def prepare_jobs(
    *,
    job: str | None = None,
    category: str | None = None,
    sector: str | None = None,
    usecase: str | None = None,
    profiles: list[str] | None = None,
    max_steps: int | None = None,
    max_samples: int | None = None,
    finetune_overrides: dict[str, Any] | None = None,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Load experiments, select jobs, and merge defaults and overrides.

    Selection filters are applied in order: `job` (exact name), `category`,
    then `sector` / `usecase` / `profiles`. Each prepared job is a fresh dict
    built from the defaults overlaid by the job entry. `max_steps` and
    `max_samples` replace the job's values when given. Keys of
    `finetune_overrides` that appear in `_FINETUNE_FLAGS` are placed in the
    job's `args` block; all other keys are set on the job itself.

    Returns:
        `(defaults, prepared_jobs)`.

    Raises:
        SystemExit: if a filter leaves no job selected.
    """
    spec = load_experiments()
    # `defaults:` / `finetune:` present but empty parse as None; treat as empty.
    defaults = spec.get("defaults") or {}
    jobs = spec.get("finetune") or []

    if job:
        jobs = [j for j in jobs if j.get("name") == job]
        if not jobs:
            raise SystemExit(
                f"Unknown job {job!r}; check research/modal/experiments.yaml"
            )
    if category:
        jobs = [j for j in jobs if j.get("category") == category]
        if not jobs:
            raise SystemExit(f"No jobs with category {category!r}")
    if sector or usecase or profiles:
        jobs = [
            j
            for j in jobs
            if _matches_job_filters(
                j,
                sector=sector,
                usecase=usecase,
                profiles=profiles,
            )
        ]
        if not jobs:
            filters = {
                "sector": sector,
                "usecase": usecase,
                "profiles": profiles,
            }
            raise SystemExit(f"No jobs matched filters: {filters}")

    prepared: list[dict[str, Any]] = []
    for raw in jobs:
        merged = apply_defaults(raw, defaults)
        if max_steps is not None:
            merged["max_steps"] = max_steps
        if max_samples is not None:
            merged["max_samples"] = max_samples
        if finetune_overrides:
            # Copy so the `args` mapping from the loaded spec is never mutated.
            args = {**(merged.get("args") or {})}
            for key, value in finetune_overrides.items():
                if key in _FINETUNE_FLAGS:
                    args[key] = value
                else:
                    merged[key] = value
            if args:
                merged["args"] = args
        prepared.append(merged)
    return defaults, prepared


def job_gpu(job: dict[str, Any]) -> str:
    """Return the job's `gpu`, or `DEFAULT_GPU` when unset/empty."""
    return job.get("gpu") or DEFAULT_GPU


def job_needs_general_gate(job: dict[str, Any]) -> bool:
    """Publishable jobs run a second general eval and must pass `general_goals`."""
    return bool(job.get("goals") and job.get("publish"))


def general_eval_profile(defaults: dict[str, Any]) -> str:
    """Return the configured general eval profile, else `GENERAL_EVAL_PROFILE`.

    A present-but-empty `general_eval_profile:` also falls back to the
    constant, so a null never reaches the `sorted()` of profile names in
    `baseline_profiles_for_jobs`.
    """
    return defaults.get("general_eval_profile") or GENERAL_EVAL_PROFILE


def general_goals_for_job(
    job: dict[str, Any], defaults: dict[str, Any]
) -> dict[str, Any] | None:
    """Return the general-capability goals for `job`, or `None`.

    `None` is returned when the job does not need the general gate or when
    neither the job nor the defaults define non-empty `general_goals`.
    """
    if not job_needs_general_gate(job):
        return None
    goals = job.get("general_goals") or defaults.get("general_goals")
    return goals if goals else None


def baseline_profiles_for_jobs(
    jobs: list[dict[str, Any]], defaults: dict[str, Any]
) -> list[str]:
    """Sorted, de-duplicated eval profiles that need a base-model baseline.

    Contains each job's `eval_profile` (default `compare_study`), plus the
    general eval profile when any job needs the general gate.
    """
    profiles = {j.get("eval_profile", "compare_study") for j in jobs}
    if any(job_needs_general_gate(j) for j in jobs):
        profiles.add(general_eval_profile(defaults))
    return sorted(profiles)


def baseline_experiment_name(preset: str, profile: str) -> str:
    """Volume path key for the unfine-tuned base model on a given eval profile."""
    return f"{preset}__baseline__{profile}"


def _load_models_registry() -> dict[str, Any]:
    """Load `models.yaml` from `REPO_ROOT`, else from `/repo`; `{}` if absent."""
    path = REPO_ROOT / "models.yaml"
    if not path.is_file():
        path = Path("/repo") / "models.yaml"
    if not path.is_file():
        return {}
    with path.open(encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def resolve_base_model_id(job: dict[str, Any], defaults: dict[str, Any]) -> str:
    """Hub/path id of the base model this job fine-tunes — used as the eval baseline.

    Resolution order: the job's `model`, then `args.model`, then the
    `model_id` registered for the job's preset in `models.yaml`, then
    `BASE_MODEL_ID`.
    """
    explicit = job.get("model") or (job.get("args") or {}).get("model")
    if explicit:
        return str(explicit)
    preset = job.get("preset", defaults.get("preset", "minicpm5-1b"))
    entry = (_load_models_registry().get("models") or {}).get(preset) or {}
    return entry.get("model_id") or BASE_MODEL_ID


def discover_cached_baselines(
    profile_names: list[str],
    *,
    preset: str,
    eval_tasks: list[str] | None = None,
    eval_limit: int | None = None,
    eval_num_fewshot: int | None = None,
    eval_seed: int | None = None,
) -> dict[str, bool]:
    """True per profile when base-model baseline results already exist on the Volume."""
    return {
        profile: baseline_is_cached(
            baseline_experiment_name(preset, profile),
            config_for_profile(profile),
            tasks=eval_tasks,
            limit=eval_limit,
            num_fewshot=eval_num_fewshot,
            seed=eval_seed,
        )
        for profile in profile_names
    }


def profiles_needing_baseline_run(
    profile_names: list[str],
    cached: dict[str, bool],
    *,
    skip_baseline: bool,
) -> list[str]:
    """Profiles lacking a cached baseline; always empty when `skip_baseline`."""
    if skip_baseline:
        return []
    return [profile for profile in profile_names if not cached.get(profile)]


def eval_paths(
    *,
    job_name: str,
    preset: str,
    profile: str,
) -> tuple[str, str, str]:
    """Return (candidate_results_path, baseline_results_path, experiment_name)."""
    exp_name = f"{job_name}__{profile}"
    candidate = f"{LM_EVAL_OUTPUT}/{exp_name}/results.json"
    baseline = f"{LM_EVAL_OUTPUT}/{baseline_experiment_name(preset, profile)}/results.json"
    return candidate, baseline, exp_name


def config_for_profile(profile: str) -> str:
    """Map an eval_profiles.yaml profile name to its config path (relative to repo root).

    Raises:
        SystemExit: if the profile is unknown or has no `config` entry.
    """
    with EVAL_PROFILES_PATH.open(encoding="utf-8") as f:
        catalog = yaml.safe_load(f) or {}
    meta = (catalog.get("profiles") or {}).get(profile)
    if not meta or not meta.get("config"):
        known = ", ".join(sorted((catalog.get("profiles") or {})))
        raise SystemExit(
            f"Unknown eval_profile {profile!r}; check {_EVAL_PROFILES_REL} (known: {known})"
        )
    return f"research/evals/configs/{meta['config']}"


def primary_metric(task_metrics: dict[str, Any]) -> tuple[str, float] | None:
    """Pick a task's headline (metric_name, score), matching slm_evals summary tables.

    Keys in `_METRIC_PRIORITY` are tried first, in order. Otherwise the first
    numeric value whose key does not contain "stderr" is used. Returns `None`
    when no numeric metric is found.
    """
    for key in _METRIC_PRIORITY:
        if key in task_metrics and isinstance(task_metrics[key], (int, float)):
            return key, float(task_metrics[key])
    for key, value in task_metrics.items():
        if "stderr" in key:
            continue
        if isinstance(value, (int, float)):
            return key, float(value)
    return None


def baseline_is_cached(
    experiment_name: str,
    config_path: str,
    *,
    tasks: list[str] | None = None,
    limit: int | None = None,
    num_fewshot: int | None = None,
    seed: int | None = None,
) -> bool:
    """True if a baseline results.json exists AND its run_meta still matches
    the profile config's tasks/limit/num_fewshot.

    Config changes (e.g. new guard tasks or a higher limit) therefore
    correctly force a fresh baseline. Explicit `tasks` / `limit` /
    `num_fewshot` / `seed` arguments take precedence over the config values.
    The seed is only compared when an expected seed is known. Unreadable or
    malformed files count as "not cached".
    """
    results = Path(LM_EVAL_OUTPUT) / experiment_name / "results.json"
    if not results.is_file():
        return False

    # A relative config path is tried as given, then under REPO_ROOT, then /repo.
    candidates = [Path(config_path)]
    if not Path(config_path).is_absolute():
        candidates += [REPO_ROOT / config_path, Path("/repo") / config_path]
    cfg_file = next((p for p in candidates if p.is_file()), None)
    if cfg_file is None:
        return False

    try:
        payload = json.loads(results.read_text())
        cfg = yaml.safe_load(cfg_file.read_text()) or {}
    except (OSError, ValueError, yaml.YAMLError):
        # Covers I/O failures, invalid JSON / undecodable bytes, and bad YAML.
        return False
    if not isinstance(payload, dict) or not isinstance(cfg, dict):
        return False
    # A null `run_meta` is treated like a missing one.
    meta = payload.get("run_meta") or {}
    if not isinstance(meta, dict):
        return False

    expected_tasks = tasks or cfg.get("tasks") or []
    expected_limit = limit if limit is not None else cfg.get("limit")
    expected_fewshot = (
        num_fewshot if num_fewshot is not None else cfg.get("num_fewshot", 0)
    )
    expected_seed = seed if seed is not None else cfg.get("seed")

    same = (
        sorted(meta.get("tasks") or []) == sorted(expected_tasks)
        and meta.get("limit") == expected_limit
        and meta.get("num_fewshot") == expected_fewshot
    )
    if expected_seed is not None:
        same = same and meta.get("seed") == expected_seed
    return same


def evaluate_gate(
    *,
    candidate: dict[str, Any],
    baseline: dict[str, Any] | None,
    goals: dict[str, Any],
) -> dict[str, Any]:
    """Check a candidate's lm-eval results dict against `goals` (Hub publish gate).

    `goals` schema:
        task:         task name whose primary metric is gated
        min_score:    candidate score must be >= this
        min_improve:  candidate - baseline must be >= this
        guard_tasks:  optional regression guards
          - task:         guarded task name
            max_regress:  baseline - candidate must be <= this

    A check whose score is unavailable fails. Goals that define no check at
    all fail the gate.

    Returns:
        Dict with `passed`, `checks` (each with `check`, `value`, `ok`),
        `task`, `candidate_score` and `baseline_score`.
    """
    cand_tasks = candidate.get("results", {})
    base_tasks = (baseline or {}).get("results", {})

    def _score(tasks: dict[str, Any], task_name: str) -> float | None:
        # Primary metric of one task, or None when the task/metric is absent.
        metrics = tasks.get(task_name)
        if not metrics:
            return None
        picked = primary_metric(metrics)
        return picked[1] if picked else None

    checks: list[dict[str, Any]] = []
    passed = True
    task = goals.get("task")
    cand_score = base_score = None
    if task:
        cand_score = _score(cand_tasks, task)
        base_score = _score(base_tasks, task)

    # Tolerance so a score landing exactly on a threshold (e.g. a clean +0.02
    # improvement stored as 0.0199999996) is not rejected by float epsilon.
    eps = 1e-9

    if goals.get("min_score") is not None:
        ok = cand_score is not None and cand_score >= goals["min_score"] - eps
        checks.append({"check": f"{task} >= {goals['min_score']}", "value": cand_score, "ok": ok})
        passed = passed and ok

    if goals.get("min_improve") is not None:
        delta = (
            cand_score - base_score
            if (cand_score is not None and base_score is not None)
            else None
        )
        ok = delta is not None and delta >= goals["min_improve"] - eps
        checks.append(
            {"check": f"{task} improve >= {goals['min_improve']}", "value": delta, "ok": ok}
        )
        passed = passed and ok

    # `guard_tasks:` present but empty parses as None; treat it as no guards.
    for guard in goals.get("guard_tasks") or []:
        g_task = guard["task"]
        g_cand = _score(cand_tasks, g_task)
        g_base = _score(base_tasks, g_task)
        regress = g_base - g_cand if (g_cand is not None and g_base is not None) else None
        ok = regress is not None and regress <= guard["max_regress"] + eps
        checks.append(
            {"check": f"{g_task} regress <= {guard['max_regress']}", "value": regress, "ok": ok}
        )
        passed = passed and ok

    if not checks:
        passed = False
        checks.append({"check": "goals defined no checks", "value": None, "ok": False})

    return {
        "passed": passed,
        "checks": checks,
        "task": task,
        "candidate_score": cand_score,
        "baseline_score": base_score,
    }


def pull_artifacts(job_name: str, exp_name: str, dest: str = "models/finetuned") -> None:
    """Download an adapter and its lm-eval results from the `slm-finetune` Volume (run locally).

    The adapter folder lands in `dest/job_name`; results land under
    `results/lm_eval/`. A failed download does not raise, but a warning with
    the exit code is printed.
    """
    import shutil
    import subprocess

    def _get(remote: str, parent: str) -> None:
        # For a folder REMOTE_PATH, `modal volume get` expects the *parent*
        # directory as the destination and recreates the folder inside it.
        # Passing the full target path (parent/name) raises
        # "[Errno 21] Is a directory". Clear the target first for a clean pull.
        name = remote.rsplit("/", 1)[-1]
        shutil.rmtree(Path(parent) / name, ignore_errors=True)
        Path(parent).mkdir(parents=True, exist_ok=True)
        proc = subprocess.run(
            ["modal", "volume", "get", "slm-finetune", remote, f"{parent}/", "--force"],
            check=False,
        )
        # Stay best-effort, but make a failed pull visible.
        if proc.returncode != 0:
            print(
                f"warning: `modal volume get` for {remote} exited with "
                f"code {proc.returncode}"
            )

    print(f"--- pulling {job_name} -> {dest}/{job_name} ---")
    _get(job_name, dest)
    exp_dir = f"results/lm_eval/{exp_name}"
    print(f"--- pulling {exp_dir} ---")
    _get(exp_dir, "results/lm_eval")


def check_gate_files(
    *,
    candidate_results_path: str,
    baseline_results_path: str | None,
    goals: dict[str, Any],
) -> dict[str, Any]:
    """Like evaluate_gate(), but reads results.json files (run inside a volume-mounted function).

    A missing candidate file yields a failed gate with a `reason`. A missing
    baseline file is treated as "no baseline".
    """
    cand_path = Path(candidate_results_path)
    if not cand_path.is_file():
        return {"passed": False, "checks": [], "reason": f"missing results file: {cand_path}"}
    candidate = json.loads(cand_path.read_text())
    baseline = None
    if baseline_results_path and Path(baseline_results_path).is_file():
        baseline = json.loads(Path(baseline_results_path).read_text())
    return evaluate_gate(candidate=candidate, baseline=baseline, goals=goals)


def check_publish_gate_files(
    *,
    skill_candidate_path: str,
    skill_baseline_path: str | None,
    skill_goals: dict[str, Any],
    general_candidate_path: str | None = None,
    general_baseline_path: str | None = None,
    general_goals: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Gate on skill-specific eval plus optional general-capability eval.

    The general gate only runs when `general_goals` is non-empty. In that
    case a missing `general_candidate_path` fails the gate. The combined
    `checks` list holds the skill checks followed by the general checks
    prefixed with "general: ". `passed` is always a strict bool.
    """
    skill_gate = check_gate_files(
        candidate_results_path=skill_candidate_path,
        baseline_results_path=skill_baseline_path,
        goals=skill_goals,
    )

    general_gate: dict[str, Any] | None = None
    if general_goals:
        if not general_candidate_path:
            general_gate = {
                "passed": False,
                "checks": [
                    {
                        "check": "general eval results missing",
                        "value": None,
                        "ok": False,
                    }
                ],
                "reason": "general candidate results path not provided",
            }
        else:
            general_gate = check_gate_files(
                candidate_results_path=general_candidate_path,
                baseline_results_path=general_baseline_path,
                goals=general_goals,
            )

    passed = bool(
        skill_gate.get("passed")
        and (general_gate is None or general_gate.get("passed"))
    )
    checks = list(skill_gate.get("checks", []))
    if general_gate:
        checks.extend(
            {**check, "check": f"general: {check['check']}"}
            for check in general_gate.get("checks", [])
        )

    return {
        "passed": passed,
        "checks": checks,
        "skill": skill_gate,
        "general": general_gate,
        "task": skill_gate.get("task"),
        "candidate_score": skill_gate.get("candidate_score"),
        "baseline_score": skill_gate.get("baseline_score"),
    }


def render_model_card(
    *,
    job: dict[str, Any],
    gate_result: dict[str, Any],
    candidate: dict[str, Any],
    baseline: dict[str, Any] | None,
    training_payload: dict[str, Any] | None,
) -> str:
    """Render the Markdown model card (README) for a fine-tuned adapter.

    The card has YAML front matter, a summary line, the gate check tables
    (skill, plus general when present), a per-task baseline/candidate table,
    an optional training section, and a PEFT loading snippet.

    Args:
        job: Job mapping; `name` is required.
        gate_result: Output of `check_publish_gate_files` or `evaluate_gate`.
        candidate: Candidate lm-eval results (reads the `results` key).
        baseline: Baseline lm-eval results, or `None`.
        training_payload: Parsed training results, or `None`.

    Returns:
        The card text, terminated by a newline.
    """

    def _fmt(v: float | None) -> str:
        return "—" if v is None else f"{v:.4f}"

    cand_tasks = candidate.get("results", {})
    base_tasks = (baseline or {}).get("results", {})
    base_model = (training_payload or {}).get("model") or BASE_MODEL_ID

    # A job is either a single dataset (`dataset`/`format`) or a `mix:` of sources.
    if job.get("mix"):
        dataset_desc = " + ".join(
            f"`{s.get('dataset', '?')}`" for s in job["mix"]
        )
        format_desc = "mix"
    else:
        dataset_desc = f"`{job.get('dataset', '?')}`"
        format_desc = job.get("format", "?")

    # Same default as the rest of this module, so the card never prints `None`
    # for a job that relies on the implicit profile.
    skill_profile = job.get("eval_profile", "compare_study")

    # `publish` may be absent, null, or a non-mapping; only a mapping can
    # carry `hub_repo`.
    publish_cfg = job.get("publish")
    hub_repo = publish_cfg.get("hub_repo", "") if isinstance(publish_cfg, dict) else ""

    lines = [
        "---",
        "library_name: peft",
        f"base_model: {base_model}",
        "license: apache-2.0",
        "tags:",
        " - lora",
        " - qlora",
        " - build-small-hackathon",
        " - well-tuned",
        f" - {job.get('category', 'general')}",
        "---",
        "",
        f"# {job['name']}",
        "",
        f"QLoRA adapter for **{job.get('category', 'general')}**, fine-tuned from "
        f"`{base_model}` on {dataset_desc} (format: `{format_desc}`).",
        "",
        "Trained, evaluated, and gated on [Modal](https://modal.com/docs/guide) via "
        "`research/modal/` (app `slm-finetune-benchmark`).",
        "",
        "## Benchmark gate",
        "",
        f"- skill eval profile: `{skill_profile}`",
        f"- gate: {'**PASSED**' if gate_result.get('passed') else '**FAILED**'}",
        "",
    ]

    def _gate_table(section: dict[str, Any] | None, *, prefix: str = "") -> list[str]:
        # Markdown table of one gate's checks; a placeholder row when empty.
        if not section:
            return []
        out = [
            f"### {prefix}checks".strip(),
            "",
            "| check | value | result |",
            "| --- | ---: | --- |",
        ]
        for c in section.get("checks", []):
            out.append(
                f"| {c['check']} | {_fmt(c['value'])} | {'pass' if c['ok'] else 'fail'} |"
            )
        if not section.get("checks"):
            out.append("| — | — | — |")
        out.append("")
        return out

    # A plain evaluate_gate() result has no "skill" key; use it directly then.
    skill_section = gate_result.get("skill") or gate_result
    lines.extend(_gate_table(skill_section, prefix="Skill "))
    if gate_result.get("general"):
        gen_profile = job.get("general_eval_profile") or GENERAL_EVAL_PROFILE
        lines.append(f"- general eval profile: `{gen_profile}`")
        lines.append("")
        lines.extend(_gate_table(gate_result["general"], prefix="General "))

    lines.extend(
        [
            "",
            "## lm-eval results",
            "",
            "| task | metric | baseline | candidate | delta |",
            "| --- | --- | ---: | ---: | ---: |",
        ]
    )
    for task in sorted(set(cand_tasks) | set(base_tasks)):
        c = primary_metric(cand_tasks.get(task, {}))
        b = primary_metric(base_tasks.get(task, {}))
        metric_name = (c or b or (None, None))[0] or "—"
        c_val = c[1] if c else None
        b_val = b[1] if b else None
        delta = c_val - b_val if (c_val is not None and b_val is not None) else None
        sign = "+" if (delta is not None and delta >= 0) else ""
        delta_str = "—" if delta is None else f"{sign}{delta:.4f}"
        lines.append(f"| {task} | {metric_name} | {_fmt(b_val)} | {_fmt(c_val)} | {delta_str} |")

    if training_payload:
        # `metrics` may be missing or null in the payload.
        metrics = training_payload.get("metrics") or {}
        lines.extend(
            [
                "",
                "## Training",
                "",
                f"- dataset: `{training_payload.get('dataset')}`",
                f"- mode: `{training_payload.get('mode')}`",
                f"- samples: {training_payload.get('samples')}",
                f"- final train loss: {metrics.get('final_train_loss')}",
                f"- eval loss: {metrics.get('eval_loss')}",
            ]
        )

    lines.extend(
        [
            "",
            "## Load with PEFT",
            "",
            "```python",
            "from peft import PeftModel",
            "from transformers import AutoModelForCausalLM, AutoTokenizer",
            "",
            f'base = "{base_model}"',
            f'adapter = "{hub_repo}"',
            "",
            "tokenizer = AutoTokenizer.from_pretrained(base, trust_remote_code=True)",
            "model = AutoModelForCausalLM.from_pretrained(",
            ' base, torch_dtype="auto", device_map="auto", trust_remote_code=True',
            ")",
            "model = PeftModel.from_pretrained(model, adapter)",
            "```",
            "",
        ]
    )
    return "\n".join(lines) + "\n"


def publish_adapter_files(
    *,
    job: dict[str, Any],
    adapter_dir: str,
    gate_result: dict[str, Any],
    candidate_results_path: str,
    baseline_results_path: str | None,
) -> dict[str, Any]:
    """Write a model card and push the adapter to the Hub — only if the gate passed.

    Run inside a function with `finetune_vol` mounted and `hf_secret` set.

    Returns:
        `{"published": False, "reason": ...}` when nothing was uploaded (no
        publish config, failed gate, publish config without `hub_repo`, or
        missing adapter directory). Otherwise `{"published": True, ...}` with
        the primary `repo_id` / `url` and the full `uploads` list.
    """
    publish_cfg = job.get("publish")
    if not publish_cfg:
        return {"published": False, "reason": "no publish config for this job"}
    if not gate_result.get("passed"):
        return {"published": False, "reason": "gate failed", "gate": gate_result}
    # Report a malformed publish block instead of failing with KeyError/TypeError.
    hub_repo = publish_cfg.get("hub_repo") if isinstance(publish_cfg, dict) else None
    if not hub_repo:
        return {"published": False, "reason": "publish config missing hub_repo"}

    adapter_path = Path(adapter_dir)
    if not adapter_path.is_dir():
        return {"published": False, "reason": f"adapter dir missing: {adapter_dir}"}

    candidate = {}
    cand_path = Path(candidate_results_path)
    if cand_path.is_file():
        candidate = json.loads(cand_path.read_text())
    baseline = None
    if baseline_results_path and Path(baseline_results_path).is_file():
        baseline = json.loads(Path(baseline_results_path).read_text())

    training_payload = None
    training_results_path = adapter_path / "training_results.json"
    if training_results_path.is_file():
        training_payload = json.loads(training_results_path.read_text())

    card = render_model_card(
        job=job,
        gate_result=gate_result,
        candidate=candidate,
        baseline=baseline,
        training_payload=training_payload,
    )
    # The card contains non-ASCII characters (em dashes), so write it as UTF-8
    # rather than relying on the locale's default encoding.
    (adapter_path / "README.md").write_text(card, encoding="utf-8")
    commit_volumes()

    from huggingface_hub import HfApi

    mirrors = publish_cfg.get("mirror_repos") or []
    # A scalar `mirror_repos: org/name` is one repo, not a list of characters.
    if isinstance(mirrors, str):
        mirrors = [mirrors]
    repo_ids = [hub_repo, *mirrors]
    private = publish_cfg.get("private", True)

    api = HfApi()
    uploads = []
    # dict.fromkeys de-duplicates while keeping the primary repo first.
    for repo_id in dict.fromkeys(repo_ids):
        api.create_repo(repo_id=repo_id, repo_type="model", private=private, exist_ok=True)
        api.upload_folder(
            folder_path=str(adapter_path),
            repo_id=repo_id,
            repo_type="model",
            commit_message=f"Publish {job['name']} (gate passed: {gate_result.get('task')})",
        )
        uploads.append({"repo_id": repo_id, "url": f"https://huggingface.co/{repo_id}"})

    return {
        "published": True,
        "repo_id": uploads[0]["repo_id"],
        "url": uploads[0]["url"],
        "uploads": uploads,
    }