Buckets:
| """Trial runner: process lifecycle and metrics monitoring. | |
| The runner manages GPU-isolated training processes, polls their metrics files, | |
| and detects failures. State is persisted to JSON so the runner can recover | |
| after MCP server restarts while training processes continue running. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import signal | |
| import subprocess | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| from pawn.lab.monitor import check_health, is_alive, read_metrics | |
| from pawn.lab.state import Trial, _format_duration, _now_iso | |
| log = logging.getLogger("pawn.lab") | |
| def _validate_config(config: dict[str, Any]) -> dict[str, Any]: | |
| """Validate a config dict against RunConfig and return the normalized dict. | |
| Raises ``pydantic.ValidationError`` on bad input. | |
| """ | |
| from pydantic import TypeAdapter | |
| from pawn.run_config import AdapterConfig, PretrainConfig | |
| run_type = config.get("run_type") | |
| if run_type not in ("pretrain", "adapter"): | |
| raise ValueError(f"run_type must be 'pretrain' or 'adapter', got {run_type!r}") | |
| ta = TypeAdapter( | |
| PretrainConfig if run_type == "pretrain" else AdapterConfig | |
| ) | |
| return ta.validate_python(config).model_dump() | |
| class TrialRunner: | |
| """Manages GPU-isolated training processes.""" | |
| def __init__( | |
| self, | |
| workspace: str | None = None, | |
| code_dir: str | None = None, | |
| python: str = "python3", | |
| ): | |
| ws = workspace or os.environ.get("PAWN_WORKSPACE") | |
| if ws is None: | |
| # On pods: /workspace. Locally: runs/ under the repo root. | |
| ws = "/workspace" if Path("/workspace").exists() else str( | |
| Path(__file__).resolve().parents[2] / "runs" | |
| ) | |
| self.workspace = Path(ws) | |
| self.code_dir = Path( | |
| code_dir | |
| or os.environ.get("PAWN_CODE_DIR") | |
| or str(Path(__file__).resolve().parents[2]) | |
| ) | |
| self.python = python | |
| self.log_dir = self.workspace / "logs" | |
| self.results_dir = self.workspace / "sweep_results" | |
| self.state_path = self.workspace / "lab_state.json" | |
| self.events_path = self.workspace / "lab_events.jsonl" | |
| self.progress_log_path = self.workspace / "pod_manager.md" | |
| # State | |
| self.trials: dict[int, Trial] = {} | |
| self.next_trial_id: int = 0 | |
| self.gpu_count: int = 0 | |
| self.gpu_names: list[str] = [] | |
| self.gpu_vram_mb: list[int] = [] | |
| self.gpu_assignments: dict[int, int | None] = {} | |
| # Events | |
| self.events: list[dict[str, Any]] = [] | |
| self.event_seq: int = 0 | |
| self.start_time: float = time.time() | |
| self.cost_per_hour: float | None = None | |
| # Async | |
| self._monitor_tasks: dict[int, asyncio.Task[None]] = {} | |
| self._metrics_offsets: dict[int, int] = {} | |
| self._ensure_dirs() | |
| self._gpus_discovered = False | |
| self._mps_active: bool | None = None | |
| # ======================================================================= | |
| # Setup | |
| # ======================================================================= | |
| def _ensure_dirs(self) -> None: | |
| self.log_dir.mkdir(parents=True, exist_ok=True) | |
| self.results_dir.mkdir(parents=True, exist_ok=True) | |
| def _discover_gpus(self) -> None: | |
| """Detect GPUs via a subprocess to avoid loading torch into this process. | |
| Torch's ROCm/HIP runtime spawns background threads that busy-spin, | |
| burning ~30% CPU permanently. Running discovery in a subprocess | |
| keeps the MCP server process clean. | |
| """ | |
| if self._gpus_discovered: | |
| return | |
| self._gpus_discovered = True | |
| try: | |
| out = subprocess.check_output( | |
| [ | |
| self.python.split()[0], "-c", | |
| "import json, torch; " | |
| "gpus = [{" | |
| "'name': torch.cuda.get_device_name(i), " | |
| "'vram_mb': torch.cuda.get_device_properties(i).total_memory // (1024*1024)" | |
| "} for i in range(torch.cuda.device_count())]; " | |
| "print(json.dumps(gpus))", | |
| ], | |
| text=True, timeout=30, | |
| ) | |
| gpus = json.loads(out.strip()) | |
| self.gpu_count = len(gpus) | |
| for i, g in enumerate(gpus): | |
| self.gpu_names.append(g["name"]) | |
| self.gpu_vram_mb.append(g["vram_mb"]) | |
| self.gpu_assignments.setdefault(i, None) | |
| log.info("Found %d GPUs: %s", self.gpu_count, self.gpu_names) | |
| except Exception as e: | |
| log.warning("GPU discovery failed: %s", e) | |
| self.gpu_count = 0 | |
| # ======================================================================= | |
| # State persistence | |
| # ======================================================================= | |
| def _save_state(self) -> None: | |
| state = { | |
| "next_trial_id": self.next_trial_id, | |
| "trials": {str(k): v.to_dict() for k, v in self.trials.items()}, | |
| "event_seq": self.event_seq, | |
| "start_time": self.start_time, | |
| "cost_per_hour": self.cost_per_hour, | |
| } | |
| tmp = self.state_path.with_suffix(".tmp") | |
| tmp.write_text(json.dumps(state, indent=2, default=str)) | |
| tmp.rename(self.state_path) | |
| def _load_state(self) -> None: | |
| if not self.state_path.exists(): | |
| return | |
| try: | |
| state = json.loads(self.state_path.read_text()) | |
| self.next_trial_id = state.get("next_trial_id", 0) | |
| for k, v in state.get("trials", {}).items(): | |
| self.trials[int(k)] = Trial.from_dict(v) | |
| self.event_seq = state.get("event_seq", 0) | |
| self.start_time = state.get("start_time", self.start_time) | |
| self.cost_per_hour = state.get("cost_per_hour") | |
| log.info("Loaded state: %d trials", len(self.trials)) | |
| except Exception as e: | |
| log.error("Failed to load state: %s", e) | |
| # ======================================================================= | |
| # GPU management | |
| # ======================================================================= | |
| def _is_mps_active(self) -> bool: | |
| """Detect if CUDA MPS daemon is running.""" | |
| if self._mps_active is None: | |
| try: | |
| out = subprocess.check_output( | |
| ["pgrep", "-f", "nvidia-cuda-mps"], | |
| text=True, timeout=5, | |
| ) | |
| self._mps_active = bool(out.strip()) | |
| except Exception: | |
| self._mps_active = False | |
| if self._mps_active: | |
| log.info("CUDA MPS detected — GPU isolation disabled") | |
| return self._mps_active | |
| def _find_free_gpu(self) -> int | None: | |
| self._discover_gpus() | |
| if self._is_mps_active(): | |
| return 0 if self.gpu_count > 0 else None | |
| for gpu_id, trial_id in self.gpu_assignments.items(): | |
| if trial_id is None: | |
| return gpu_id | |
| return None | |
| def _assign_gpu(self, trial_id: int, gpu_id: int) -> None: | |
| if not self._is_mps_active(): | |
| self.gpu_assignments[gpu_id] = trial_id | |
| def _release_gpu(self, gpu_id: int) -> None: | |
| if not self._is_mps_active(): | |
| self.gpu_assignments[gpu_id] = None | |
| def gpu_utilization(self) -> list[dict[str, Any]]: | |
| """Return GPU info without importing torch into this process.""" | |
| self._discover_gpus() | |
| return [ | |
| { | |
| "gpu": i, | |
| "total_mb": self.gpu_vram_mb[i] if i < len(self.gpu_vram_mb) else 0, | |
| "assigned_trial": self.gpu_assignments.get(i), | |
| "mps": self._is_mps_active(), | |
| } | |
| for i in range(self.gpu_count) | |
| ] | |
| # ======================================================================= | |
| # Trial lifecycle | |
| # ======================================================================= | |
| async def launch( | |
| self, | |
| config: dict[str, Any], | |
| *, | |
| # Legacy compat: if strategy/params/base_args are passed, merge | |
| # them into a config dict automatically. | |
| strategy: str | None = None, | |
| params: dict[str, Any] | None = None, | |
| base_args: dict[str, Any] | None = None, | |
| ) -> int: | |
| """Launch a single trial. Returns trial_id. | |
| ``config`` is a dict matching ``RunConfig`` (either | |
| ``PretrainConfig`` or ``AdapterConfig``). It is validated and | |
| written to a JSON file, then passed to ``scripts/train.py | |
| --config``. | |
| """ | |
| # Legacy shim: build a config dict from strategy/params/base_args | |
| if strategy is not None: | |
| merged: dict[str, Any] = {"run_type": "adapter", "strategy": strategy} | |
| merged.update(base_args or {}) | |
| merged.update(params or {}) | |
| merged.update(config or {}) | |
| config = merged | |
| gpu_id = self._find_free_gpu() | |
| if gpu_id is None: | |
| raise RuntimeError(f"No free GPU (all {self.gpu_count} assigned)") | |
| trial_id = self.next_trial_id | |
| self.next_trial_id += 1 | |
| # Apply defaults | |
| trial_log_dir = str(self.log_dir / f"trial_{trial_id:04d}") | |
| config.setdefault("log_dir", trial_log_dir) | |
| config.setdefault("local_checkpoints", True) | |
| # Validate via Pydantic | |
| validated = _validate_config(config) | |
| cmd = self._build_command(validated, trial_id) | |
| strategy_display = validated.get("strategy") or validated.get("variant", "pretrain") | |
| trial = Trial( | |
| trial_id=trial_id, | |
| strategy=strategy_display, | |
| config=validated, | |
| params=validated, | |
| cli_command=cmd, | |
| gpu_id=gpu_id, | |
| log_path=str(self.results_dir / f"trial_{trial_id:04d}.log"), | |
| total_steps=validated.get("total_steps", 0) or 0, | |
| ) | |
| self.trials[trial_id] = trial | |
| self._assign_gpu(trial_id, gpu_id) | |
| await self._spawn(trial) | |
| self._emit("trial_started", trial_id, { | |
| "strategy": strategy_display, "gpu": gpu_id, | |
| "config": validated, | |
| }) | |
| self._save_state() | |
| self.render_progress_log() | |
| return trial_id | |
| async def resume_trial( | |
| self, | |
| trial_id: int, | |
| total_steps: int | None = None, | |
| pause_after_steps: int | None = None, | |
| ) -> int: | |
| """Resume a completed/failed trial from its best checkpoint.""" | |
| old = self.trials.get(trial_id) | |
| if not old: | |
| raise RuntimeError(f"Trial {trial_id} not found") | |
| if not old.run_dir: | |
| raise RuntimeError(f"Trial {trial_id} has no run directory") | |
| ckpt_base = Path(old.run_dir) / "checkpoints" | |
| ckpt_dir = ckpt_base / "best" | |
| if not ckpt_dir.exists(): | |
| ckpt_dir = ckpt_base / "final" | |
| if not ckpt_dir.exists(): | |
| # Pretraining uses step_XXXXXXXX naming — pick the highest step | |
| step_dirs = sorted(ckpt_base.glob("step_*")) | |
| if step_dirs: | |
| ckpt_dir = step_dirs[-1] | |
| if not ckpt_dir.exists(): | |
| raise RuntimeError(f"No checkpoint found for trial {trial_id}") | |
| new_config = dict(old.config) | |
| new_config.pop("pause_after_steps", None) | |
| new_config["resume"] = str(ckpt_dir) | |
| if total_steps is not None: | |
| new_config["total_steps"] = total_steps | |
| if pause_after_steps is not None: | |
| new_config["pause_after_steps"] = pause_after_steps | |
| return await self.launch(new_config) | |
| def _build_command( | |
| self, config: dict[str, Any], trial_id: int, | |
| ) -> list[str]: | |
| script = str(self.code_dir / "scripts" / "train.py") | |
| config_dir = self.log_dir / f"trial_{trial_id:04d}" | |
| config_dir.mkdir(parents=True, exist_ok=True) | |
| config_path = config_dir / "run_config.json" | |
| config_path.write_text(json.dumps(config, indent=2, default=str)) | |
| cmd = [*self.python.split(), script, "--config", str(config_path)] | |
| return cmd | |
| async def _spawn(self, trial: Trial) -> None: | |
| """Start the training process, with GPU isolation unless MPS is active.""" | |
| env = os.environ.copy() | |
| if trial.gpu_id is not None and not self._is_mps_active(): | |
| env["CUDA_VISIBLE_DEVICES"] = str(trial.gpu_id) | |
| Path(trial.log_path).parent.mkdir(parents=True, exist_ok=True) | |
| log_fd = open(trial.log_path, "w") | |
| try: | |
| proc = subprocess.Popen( | |
| trial.cli_command, | |
| stdout=log_fd, | |
| stderr=subprocess.STDOUT, | |
| env=env, | |
| cwd=str(self.code_dir), | |
| ) | |
| finally: | |
| log_fd.close() | |
| trial.pid = proc.pid | |
| trial.status = "running" | |
| trial.start_time = time.time() | |
| log.info("Spawned trial %d (PID %d) on GPU %d: %s", | |
| trial.trial_id, proc.pid, trial.gpu_id, trial.strategy) | |
| self._monitor_tasks[trial.trial_id] = asyncio.create_task( | |
| self._monitor(trial.trial_id) | |
| ) | |
| async def _monitor(self, trial_id: int) -> None: | |
| """Poll a running trial: check process + read metrics.""" | |
| trial = self.trials[trial_id] | |
| exit_code: int | None = None | |
| try: | |
| while trial.status == "running": | |
| await asyncio.sleep(5.0) | |
| if trial.pid: | |
| alive, code = is_alive(trial.pid) | |
| if not alive: | |
| exit_code = code | |
| break | |
| read_metrics(trial, self.log_dir, self._metrics_offsets) | |
| issue = check_health(trial) | |
| if issue: | |
| log.warning("Trial %d health issue: %s", trial_id, issue) | |
| self._emit("health_warning", trial_id, {"issue": issue}) | |
| # Process exited — final metrics read | |
| read_metrics(trial, self.log_dir, self._metrics_offsets) | |
| if trial.status == "killed": | |
| # Wait for the process to actually exit before releasing GPU. | |
| # kill() sends SIGTERM but graceful shutdown (checkpoint save) | |
| # can take 30-60s. The while loop above exits immediately when | |
| # status changes to "killed", so we poll here. | |
| if trial.pid: | |
| while True: | |
| alive, _ = is_alive(trial.pid) | |
| if not alive: | |
| break | |
| await asyncio.sleep(1.0) | |
| if trial.gpu_id is not None: | |
| self._release_gpu(trial.gpu_id) | |
| read_metrics(trial, self.log_dir, self._metrics_offsets) | |
| self._save_state() | |
| elif exit_code == 0 or trial.best_val_loss is not None: | |
| self._complete(trial_id) | |
| else: | |
| reason = f"exit code {exit_code}" if exit_code is not None else "process exited" | |
| self._fail(trial_id, reason) | |
| except asyncio.CancelledError: | |
| pass | |
| except Exception as e: | |
| log.error("Monitor error for trial %d: %s", trial_id, e, exc_info=True) | |
| self._fail(trial_id, str(e)) | |
| def _complete(self, trial_id: int) -> None: | |
| trial = self.trials[trial_id] | |
| trial.status = "completed" | |
| trial.end_time = time.time() | |
| if trial.gpu_id is not None: | |
| self._release_gpu(trial.gpu_id) | |
| log.info("Trial %d completed: val_loss=%s acc=%s", | |
| trial_id, trial.best_val_loss, trial.best_accuracy) | |
| self._emit("trial_completed", trial_id, { | |
| "best_val_loss": trial.best_val_loss, | |
| "best_accuracy": trial.best_accuracy, | |
| "param_count": trial.actual_param_count, | |
| "steps": trial.current_step, | |
| }) | |
| if all(v is None for v in self.gpu_assignments.values()): | |
| self._emit("gpu_idle", data={"message": "All GPUs are idle"}) | |
| self._save_state() | |
| self.render_progress_log() | |
| def _fail(self, trial_id: int, reason: str) -> None: | |
| trial = self.trials[trial_id] | |
| trial.status = "failed" | |
| trial.end_time = time.time() | |
| if trial.gpu_id is not None: | |
| self._release_gpu(trial.gpu_id) | |
| log.warning("Trial %d failed: %s", trial_id, reason) | |
| self._emit("trial_failed", trial_id, {"reason": reason}) | |
| self._save_state() | |
| self.render_progress_log() | |
| async def kill(self, trial_id: int) -> dict[str, Any]: | |
| """Kill a running trial via SIGTERM. | |
| Sets status to 'killed' and sends SIGTERM, but lets the monitor | |
| task detect the actual process exit and release the GPU. This | |
| avoids a window where the GPU appears free while the process is | |
| still doing graceful shutdown (checkpoint saving). | |
| """ | |
| trial = self.trials.get(trial_id) | |
| if not trial: | |
| return {"error": f"Trial {trial_id} not found"} | |
| if trial.status != "running": | |
| return {"error": f"Trial {trial_id} is {trial.status}, not running"} | |
| if trial.pid: | |
| try: | |
| os.kill(trial.pid, signal.SIGTERM) | |
| except ProcessLookupError: | |
| pass | |
| trial.status = "killed" | |
| trial.end_time = time.time() | |
| self._emit("trial_killed", trial_id) | |
| self._save_state() | |
| self.render_progress_log() | |
| return {"killed": trial_id} | |
| # ======================================================================= | |
| # Events | |
| # ======================================================================= | |
| def _emit( | |
| self, | |
| event_type: str, | |
| trial_id: int | None = None, | |
| data: dict[str, Any] | None = None, | |
| ) -> None: | |
| self.event_seq += 1 | |
| event = { | |
| "seq": self.event_seq, | |
| "type": event_type, | |
| "trial_id": trial_id, | |
| "timestamp": _now_iso(), | |
| "data": data or {}, | |
| } | |
| self.events.append(event) | |
| try: | |
| with open(self.events_path, "a") as f: | |
| f.write(json.dumps(event, default=str) + "\n") | |
| except OSError as e: | |
| log.error("Failed to write event: %s", e) | |
| def events_since(self, seq: int = 0) -> list[dict[str, Any]]: | |
| return [e for e in self.events if e["seq"] > seq] | |
| # ======================================================================= | |
| # Reporting | |
| # ======================================================================= | |
| def status(self) -> dict[str, Any]: | |
| running = [] | |
| for t in self.trials.values(): | |
| if t.status == "running": | |
| cfg = t.config or t.params | |
| running.append({ | |
| "trial": t.trial_id, "strategy": t.strategy, | |
| "step": t.current_step, "total": t.total_steps, | |
| "sps": round(t.steps_per_sec, 2), | |
| "eta": _format_duration(t.eta_seconds()), | |
| "val_loss": t.best_val_loss, "acc": t.best_accuracy, | |
| "params": t.actual_param_count, "pid": t.pid, "gpu": t.gpu_id, | |
| "key_hp": {k: v for k, v in cfg.items() | |
| if k in ("lr", "lora_rank", "bottleneck_dim", | |
| "density", "d_model", "n_layers", "batch_size")}, | |
| }) | |
| elapsed = time.time() - self.start_time | |
| cost = (self.cost_per_hour * elapsed / 3600) if self.cost_per_hour else None | |
| return { | |
| "gpus": self.gpu_utilization(), | |
| "gpu_count": self.gpu_count, | |
| "gpu_names": self.gpu_names, | |
| "running": running, | |
| "total_trials": len(self.trials), | |
| "completed": sum(1 for t in self.trials.values() if t.status == "completed"), | |
| "failed": sum(1 for t in self.trials.values() if t.status == "failed"), | |
| "elapsed": _format_duration(elapsed), | |
| "cost_per_hour": self.cost_per_hour, | |
| "estimated_cost": round(cost, 2) if cost else None, | |
| } | |
| def results(self, strategy: str | None = None) -> dict[str, Any]: | |
| rows = [] | |
| for t in sorted(self.trials.values(), key=lambda t: t.trial_id): | |
| elapsed = (t.end_time - t.start_time) if t.end_time and t.start_time else None | |
| cfg = t.config or t.params | |
| rows.append({ | |
| "trial": t.trial_id, "strategy": t.strategy, | |
| "params": t.actual_param_count, "steps": t.current_step, | |
| "val_loss": t.best_val_loss, "accuracy": t.best_accuracy, | |
| "status": t.status, "notes": t.notes, | |
| "wall_time": _format_duration(elapsed), | |
| "key_hp": {k: v for k, v in cfg.items() | |
| if k in ("lr", "lora_rank", "bottleneck_dim", "density", | |
| "d_model", "n_layers", "batch_size")}, | |
| }) | |
| # Pareto front: trials not dominated on (param_count, val_loss). | |
| # A trial is dominated if another trial has both fewer (or equal) | |
| # params AND lower (or equal) val_loss, with at least one strict. | |
| completed = [r for r in rows if r["status"] == "completed" | |
| and r["val_loss"] is not None and r["params"] is not None] | |
| pareto: list[dict[str, Any]] = [] | |
| for r in completed: | |
| dominated = False | |
| for other in completed: | |
| if other is r: | |
| continue | |
| if (other["params"] <= r["params"] | |
| and other["val_loss"] <= r["val_loss"] | |
| and (other["params"] < r["params"] | |
| or other["val_loss"] < r["val_loss"])): | |
| dominated = True | |
| break | |
| if not dominated: | |
| pareto.append(r) | |
| pareto.sort(key=lambda r: r["params"]) | |
| # Infer strategy for suggestions from completed trials if not provided | |
| suggest_strategy = strategy | |
| if suggest_strategy is None and completed: | |
| strategies = {r["strategy"] for r in completed} | |
| if len(strategies) == 1: | |
| suggest_strategy = strategies.pop() | |
| result: dict[str, Any] = {"trials": rows, "pareto_front": pareto} | |
| if suggest_strategy: | |
| result["suggestions"] = self._suggest(suggest_strategy, completed) | |
| else: | |
| result["suggestions"] = [] | |
| return result | |
| def _suggest(self, strategy: str, completed: list[dict[str, Any]], n: int = 3) -> list[dict[str, Any]]: | |
| """Create an ephemeral Optuna study, seed it, and return N suggestions.""" | |
| try: | |
| import optuna | |
| from pawn.lab.sweep import builtin_distributions | |
| optuna.logging.set_verbosity(optuna.logging.WARNING) | |
| dists = builtin_distributions(strategy) | |
| study = optuna.create_study(study_name="suggest", direction="minimize") | |
| seeded = 0 | |
| for r in completed: | |
| hp = r.get("key_hp", {}) | |
| trial_dists = {k: v for k, v in dists.items() if k in hp} | |
| trial_params = {k: v for k, v in hp.items() if k in dists} | |
| if not trial_dists: | |
| continue | |
| try: | |
| frozen = optuna.trial.create_trial( | |
| params=trial_params, distributions=trial_dists, | |
| values=[r["val_loss"]], state=optuna.trial.TrialState.COMPLETE, | |
| ) | |
| study.add_trial(frozen) | |
| seeded += 1 | |
| except Exception: | |
| pass | |
| suggestions = [] | |
| for _ in range(n): | |
| trial = study.ask(dists) | |
| suggestions.append(trial.params) | |
| return suggestions | |
| except Exception as e: | |
| log.debug("Suggestion failed: %s", e) | |
| return [] | |
| def trial_log(self, trial_id: int, lines: int = 50) -> dict[str, Any]: | |
| """Return the last N lines of a trial's stdout log.""" | |
| trial = self.trials.get(trial_id) | |
| if not trial: | |
| return {"error": f"Trial {trial_id} not found"} | |
| log_path = Path(trial.log_path) | |
| if not log_path.exists(): | |
| return {"error": f"Log file not found: {trial.log_path}"} | |
| all_lines = log_path.read_text().splitlines() | |
| return {"trial": trial_id, "lines": all_lines[-lines:]} | |
| def add_notes(self, trial_id: int, notes: str) -> dict[str, Any]: | |
| trial = self.trials.get(trial_id) | |
| if not trial: | |
| return {"error": f"Trial {trial_id} not found"} | |
| trial.notes = notes | |
| self._save_state() | |
| return {"ok": True} | |
| # ======================================================================= | |
| # Progress log | |
| # ======================================================================= | |
| def render_progress_log(self) -> str: | |
| """Render pod_manager.md from current state.""" | |
| lines: list[str] = ["# Pod Manager Log\n"] | |
| lines.append("## Environment") | |
| lines.append(f"- GPUs: {self.gpu_count}x {self.gpu_names[0] if self.gpu_names else '?'}, " | |
| f"{self.gpu_vram_mb[0] if self.gpu_vram_mb else '?'} MB each") | |
| lines.append(f"- Persistent storage: {self.workspace}") | |
| lines.append("") | |
| elapsed = time.time() - self.start_time | |
| lines.append("## Current Status") | |
| lines.append(f"- Uptime: {_format_duration(elapsed)}") | |
| if self.cost_per_hour: | |
| cost = self.cost_per_hour * elapsed / 3600 | |
| lines.append(f"- Cost: ${self.cost_per_hour}/hr, ~${cost:.2f} so far") | |
| lines.append("") | |
| running = [t for t in self.trials.values() if t.status == "running"] | |
| if running: | |
| lines.append("## Active Processes") | |
| lines.append("| PID | GPU | Trial | Strategy | Step | Total | Step/s | ETA |") | |
| lines.append("|-----|-----|-------|----------|------|-------|--------|-----|") | |
| for t in running: | |
| eta = _format_duration(t.eta_seconds()) | |
| lines.append( | |
| f"| {t.pid} | {t.gpu_id} | {t.trial_id} | {t.strategy} " | |
| f"| {t.current_step} | {t.total_steps} " | |
| f"| {t.steps_per_sec:.1f} | {eta} |" | |
| ) | |
| lines.append("") | |
| completed = [t for t in self.trials.values() | |
| if t.status in ("completed", "failed", "killed")] | |
| if completed: | |
| lines.append("## Results") | |
| lines.append("| Trial | Strategy | Params | val_loss | Acc | Status | Notes |") | |
| lines.append("|-------|----------|--------|----------|-----|--------|-------|") | |
| for t in sorted(completed, key=lambda t: t.trial_id): | |
| vl = f"{t.best_val_loss:.4f}" if t.best_val_loss else "---" | |
| acc = f"{t.best_accuracy:.1%}" if t.best_accuracy else "---" | |
| pc = f"{t.actual_param_count:,}" if t.actual_param_count else "?" | |
| lines.append( | |
| f"| {t.trial_id} | {t.strategy} | {pc} | {vl} " | |
| f"| {acc} | {t.status} | {t.notes} |" | |
| ) | |
| lines.append("") | |
| recent = self.events[-10:] | |
| if recent: | |
| lines.append("## Recent Events") | |
| for e in recent: | |
| tid = e.get("trial_id") | |
| tid_str = f" (trial {tid})" if tid is not None else "" | |
| data_str = json.dumps(e.get("data", {}), default=str) | |
| lines.append(f"- [{e['timestamp']}] {e['type']}{tid_str} {data_str}") | |
| lines.append("") | |
| content = "\n".join(lines) | |
| try: | |
| self.progress_log_path.write_text(content) | |
| except OSError as e: | |
| log.error("Failed to write progress log: %s", e) | |
| return content | |
| # ======================================================================= | |
| # Recovery | |
| # ======================================================================= | |
| async def recover(self) -> None: | |
| """Re-attach to running processes from persisted state.""" | |
| self._load_state() | |
| if self.events_path.exists(): | |
| self.events = [] | |
| for line in self.events_path.read_text().splitlines(): | |
| try: | |
| self.events.append(json.loads(line)) | |
| except (json.JSONDecodeError, ValueError): | |
| pass | |
| if self.events: | |
| self.event_seq = max(e.get("seq", 0) for e in self.events) | |
| for trial_id, trial in self.trials.items(): | |
| if trial.status != "running": | |
| continue | |
| if trial.pid: | |
| alive, _ = is_alive(trial.pid) | |
| else: | |
| alive = False | |
| if alive: | |
| log.info("Recovering trial %d (PID %d)", trial_id, trial.pid) | |
| self._monitor_tasks[trial_id] = asyncio.create_task( | |
| self._monitor(trial_id) | |
| ) | |
| if trial.gpu_id is not None: | |
| self._assign_gpu(trial_id, trial.gpu_id) | |
| else: | |
| log.warning("Trial %d (PID %d) no longer running", trial_id, trial.pid) | |
| read_metrics(trial, self.log_dir, self._metrics_offsets) | |
| trial.end_time = time.time() | |
| if trial.best_val_loss is not None: | |
| trial.status = "completed" | |
| self._emit("trial_completed", trial_id, { | |
| "best_val_loss": trial.best_val_loss, | |
| "best_accuracy": trial.best_accuracy, | |
| "param_count": trial.actual_param_count, | |
| "steps": trial.current_step, | |
| "recovered": True, | |
| }) | |
| else: | |
| trial.status = "failed" | |
| self._emit("trial_failed", trial_id, { | |
| "reason": "process exited during server downtime", | |
| "recovered": True, | |
| }) | |
| self._save_state() | |
| self.render_progress_log() | |
| log.info("Recovery complete: %d trials, %d still running", | |
| len(self.trials), | |
| sum(1 for t in self.trials.values() if t.status == "running")) | |
| def shutdown(self) -> None: | |
| """Save state on shutdown. Training processes continue independently.""" | |
| for task in self._monitor_tasks.values(): | |
| task.cancel() | |
| self._monitor_tasks.clear() | |
| self._save_state() | |
| self.render_progress_log() | |
| log.info("Runner shutdown (training processes continue)") | |
Xet Storage Details
- Size:
- 31.1 kB
- Xet hash:
- e9c3710d7f8e37e885d760aa2faa7a8214079691a89270153717396763691bfb
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.