gpu-goblin / runner /protocol.py
bharathtelu's picture
Deploy auto-tune UI + scripts (work-from-91d0cf0)
a9aa4ae verified
Raw
History Blame Contribute Delete
14.6 kB
"""RunnerProtocol — the seam between profile_run/benchmark and the actual GPU.
This is the testability fix from the brooks-audit (Warning #2): without this
abstraction, every change to a tool that touches profiling required an
MI300X cloud session. With it, Backend Lead develops on a laptop using
FakeRunner; Day-3 swaps in the real runner for the canonical demo.
Real implementations satisfy `Runner` structurally (it is a typing.Protocol, so no subclassing is needed) and call into goblin_runner.sh.
Tests and laptop dev use FakeRunner, which loads canned RunMetrics from
workloads/synthetic/.
`LiveRunner` is the production path: it shells out to `goblin_runner.sh`
(which itself wraps rocprofv3 + torch.profiler), parses the resulting
artefacts via `runner.profile_parser.parse`, and on ANY failure
(missing tools, no GPU, subprocess error) falls back to FakeRunner so the
demo still works on a laptop.
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Protocol
from agent.schemas import RunMetrics, WorkloadConfig
# Module logger; LiveRunner reports every fallback through it.
_LOG = logging.getLogger(__name__)


class Runner(Protocol):
    """Structural interface for anything that turns a WorkloadConfig into RunMetrics.

    Implementations do not need to inherit from this class: any object with a
    compatible ``run(config, steps) -> RunMetrics`` method satisfies it.
    """

    def run(self, config: WorkloadConfig, steps: int) -> RunMetrics:  # pragma: no cover
        """Produce RunMetrics for ``config`` over ``steps`` steps."""
# ---------------------------------------------------------------------------
# GPU detection — used by LiveRunner to decide live vs. fallback.
# ---------------------------------------------------------------------------
def _has_render_device() -> bool:
    """Report whether /dev/dri holds at least one entry named ``renderD*``.

    A missing /dev/dri, or any OSError raised while listing it, counts as
    "no device" rather than propagating.
    """
    dri_root = Path("/dev/dri")
    if not dri_root.exists():
        return False
    try:
        # Lazily scan the directory and stop at the first render node.
        render_nodes = (
            entry for entry in dri_root.iterdir() if entry.name.startswith("renderD")
        )
        return next(render_nodes, None) is not None
    except OSError:
        return False
def gpu_available() -> tuple[bool, str | None]:
    """Decide whether LiveRunner may attempt a real profile on this host.

    Checks run in this order and stop at the first failure:
      1. `rocprofv3` is on PATH (the kernel-trace driver).
      2. `amd-smi` is on PATH (HBM/power telemetry sampler).
      3. /dev/dri has at least one `renderD*` node (taken as evidence of a GPU).

    Returns:
        ``(True, None)`` when every check passes, otherwise ``(False, reason)``
        with a human-readable reason. LiveRunner falls back to FakeRunner on
        ``False`` and surfaces the reason as a warning.
    """
    required_tools = (
        ("rocprofv3", "rocprofv3 not found on PATH"),
        ("amd-smi", "amd-smi not found on PATH"),
    )
    for executable, missing_reason in required_tools:
        if shutil.which(executable) is None:
            return False, missing_reason
    if _has_render_device():
        return True, None
    return False, "no /dev/dri/renderD* device present"
# ---------------------------------------------------------------------------
# FakeRunner — loads canned RunMetrics from workloads/synthetic/.
# ---------------------------------------------------------------------------
class FakeRunner:
    """Loads pre-recorded RunMetrics from workloads/synthetic/<scenario>/cached_metrics.json.

    The scenario is selected by matching `WorkloadConfig` fields against each
    synthetic scenario's `match` block in its manifest. If multiple scenarios
    match, the most specific one wins (highest number of matched keys); ties
    go to the scenario directory that sorts first by name.
    If none match, returns a generic baseline.

    This lets us:
    1. Develop the agent loop without an MI300X.
    2. Demo when MI300X cloud is unreachable (offline-replay lane).
    3. Run integration tests deterministically.

    This class is also LiveRunner's fallback. A broken corpus entry therefore
    degrades instead of raising: an unreadable or malformed manifest is
    skipped, and unusable cached metrics yield the generic baseline with a
    warning.
    """

    def __init__(self, corpus_dir: Path | str = "workloads/synthetic") -> None:
        # NOTE(review): the default is resolved against the current working
        # directory, not the repo root -- confirm callers run from the repo root.
        self.corpus_dir = Path(corpus_dir)

    def run(self, config: WorkloadConfig, steps: int) -> RunMetrics:
        """Return canned metrics for the scenario that best matches ``config``.

        Args:
            config: Workload whose fields are matched against scenario manifests.
            steps: Step count stamped onto the returned metrics.

        Returns:
            The scenario's cached metrics with ``runner_kind="fake"``. If no
            scenario matches, or the matched scenario's cached_metrics.json is
            missing or unreadable, returns the generic baseline with a warning
            that says which.
        """
        scenario = self._match_scenario(config)
        if scenario is None:
            return self._default_metrics(steps)
        data, problem = self._load_cached_metrics(scenario / "cached_metrics.json")
        if data is None:
            return self._default_metrics(
                steps,
                warning=(
                    f"FakeRunner: scenario {scenario.name!r} has no usable "
                    f"cached_metrics.json ({problem}); returning generic baseline."
                ),
            )
        # The cached file may not have steps populated; let the caller's
        # request take precedence so profile_run vs benchmark return as expected.
        data["steps"] = steps
        data["runner_kind"] = "fake"
        return RunMetrics.model_validate(data)

    # ------------------------------------------------------------------
    # Corpus loading
    # ------------------------------------------------------------------
    @staticmethod
    def _load_cached_metrics(metrics_path: Path) -> tuple[dict | None, str | None]:
        """Read a scenario's cached metrics file.

        Returns:
            ``(data, None)`` when the file holds a JSON object, otherwise
            ``(None, reason)`` with a short human-readable reason.
        """
        try:
            data = json.loads(metrics_path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return None, "file missing"
        except (OSError, ValueError) as exc:
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            return None, f"{type(exc).__name__}: {exc}"
        if not isinstance(data, dict):
            return None, f"expected a JSON object, got {type(data).__name__}"
        return data, None

    # ------------------------------------------------------------------
    # Scenario matching
    # ------------------------------------------------------------------
    def _match_scenario(self, config: WorkloadConfig) -> Path | None:
        """Return the most specific scenario dir whose manifest matches ``config``.

        Returns ``None`` when the corpus directory is absent (or not a
        directory) or no manifest matches. Unreadable or malformed manifests
        are skipped.
        """
        if not self.corpus_dir.is_dir():
            return None
        best: tuple[int, Path] | None = None
        for scenario_dir in sorted(self.corpus_dir.iterdir()):
            if not scenario_dir.is_dir():
                continue
            try:
                spec = json.loads((scenario_dir / "manifest.json").read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # Missing/unreadable/corrupt manifest: skip this scenario only.
                continue
            match_block = spec.get("match", {}) if isinstance(spec, dict) else None
            if not isinstance(match_block, dict):
                # e.g. a top-level JSON list or `"match": null` -- not matchable.
                continue
            score = self._score(config, match_block)
            # Strict `>` keeps the first (alphabetically) scenario on ties.
            if score >= 0 and (best is None or score > best[0]):
                best = (score, scenario_dir)
        return best[1] if best else None

    @staticmethod
    def _score(config: WorkloadConfig, match: dict) -> int:
        """Return number of keys that match, or -1 if any key conflicts.

        A match key the config does not have counts as a conflict. Without
        this, a manifest entry like ``"typo_key": null`` would silently match
        every config, since ``dict.get`` returns ``None`` for missing keys.
        """
        cfg = config.model_dump()
        missing = object()
        for key, expected in match.items():
            if cfg.get(key, missing) != expected:
                return -1
        return len(match)

    @staticmethod
    def _default_metrics(
        steps: int,
        warning: str = "FakeRunner: no matching scenario, returning generic baseline.",
    ) -> RunMetrics:
        """Build the generic baseline from hard-coded placeholder values.

        Args:
            steps: Step count to report.
            warning: Explanation placed in ``warnings``. Defaults to the
                no-matching-scenario message.
        """
        from agent.schemas import KernelEntry, WasteBudget

        return RunMetrics(
            steps=steps,
            tokens_per_sec=120.0,
            mfu_pct=22.0,
            hbm_peak_gb=72.0,
            hbm_avg_gb=58.0,
            gpu_util_pct=45.0,
            top_kernels=[
                KernelEntry(name="aten::matmul", pct_time=42.0),
                KernelEntry(name="aten::scaled_dot_product_attention", pct_time=18.0),
                KernelEntry(name="aten::layer_norm", pct_time=7.0),
            ],
            attention_kernel_loaded="sdpa",
            waste_budget=WasteBudget(
                useful_gpu=0.55,
                data_wait=0.18,
                host_gap=0.07,
                comm_excess=0.0,
                memory_headroom=0.10,
                precision_path=0.06,
                kernel_shape=0.04,
            ),
            warnings=[warning],
            runner_kind="fake",
        )
# ---------------------------------------------------------------------------
# LiveRunner — production path. Spawns goblin_runner.sh under rocprofv3.
# ---------------------------------------------------------------------------
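# Defaults are pinned to the repo layout, resolved from this file's location
# (not the CWD). Nothing here reads env vars: tests / CI override the scripts
# per instance via LiveRunner(runner_script=..., user_script=...).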
# Repo root = the directory two levels above this file (runner/protocol.py).
_REPO_ROOT = Path(__file__).resolve().parent.parent
# Shell wrapper that LiveRunner executes.
_DEFAULT_RUNNER_SCRIPT = _REPO_ROOT.joinpath("runner", "goblin_runner.sh")
# Training script handed to the wrapper through the USER_SCRIPT env var.
_DEFAULT_USER_SCRIPT = _REPO_ROOT.joinpath("workloads", "train_qwen_lora.py")
# Parent directory for archived failure logs.
_FAILURE_ARCHIVE_ROOT = _REPO_ROOT.joinpath("bench_cache")
def _archive_failure(
    out_dir: Path,
    proc: subprocess.CompletedProcess,
    archive_root: Path | None = None,
) -> Path:
    """Copy a failed runner's out_dir into bench_cache/last_runner_failure_<ts>/.

    The subprocess's captured stdout/stderr and return code are written next to
    the copied artefacts. The directory survives after the caller's tempdir
    cleanup so the user can `tail -n 100 stderr.log` etc.

    Args:
        out_dir: The runner's output directory. It may be missing, in which
            case only the subprocess output is archived.
        proc: The completed runner process whose output is persisted.
        archive_root: Parent directory for the archive. Defaults to
            ``bench_cache/`` under the repo root.

    Returns:
        The archive directory. Archiving is best-effort: on OSError a warning
        is logged and the path is still returned, though it may be incomplete
        or absent.
    """
    import time

    root = _FAILURE_ARCHIVE_ROOT if archive_root is None else Path(archive_root)
    ts = time.strftime("%Y%m%dT%H%M%S")
    dest = root / f"last_runner_failure_{ts}"
    try:
        root.mkdir(parents=True, exist_ok=True)
        # The timestamp has one-second resolution. Claim a fresh directory
        # instead of merging into (and overwriting) an earlier failure's logs.
        attempt = 0
        while True:
            try:
                dest.mkdir()
                break
            except FileExistsError:
                attempt += 1
                dest = root / f"last_runner_failure_{ts}_{attempt}"
        if out_dir.exists():
            for child in out_dir.iterdir():
                target = dest / child.name
                if child.is_dir():
                    shutil.copytree(child, target, dirs_exist_ok=True)
                else:
                    shutil.copy2(child, target)
        # Also persist the subprocess's own captured output -- these are what
        # goblin_runner.sh's failure trap dumped. UTF-8 is explicit so text
        # that is not representable in the locale encoding (e.g. U+FFFD
        # replacement chars) cannot raise UnicodeEncodeError here.
        (dest / "subprocess_stdout.log").write_text(proc.stdout or "", encoding="utf-8")
        (dest / "subprocess_stderr.log").write_text(proc.stderr or "", encoding="utf-8")
        (dest / "subprocess_returncode").write_text(str(proc.returncode), encoding="utf-8")
    except OSError as exc:
        _LOG.warning("LiveRunner: could not archive failure logs (%s)", exc)
    return dest
class LiveRunner:
    """Real-MI300X path: shells out to goblin_runner.sh and parses artefacts.

    Auto-falls-back to FakeRunner whenever the host can't actually run a live
    profile. That covers: missing rocprofv3/amd-smi, no AMD GPU, a missing or
    non-executable runner script, a subprocess spawn error, timeout or
    non-zero exit, and parser failure. The fallback path is the demo safety
    net.

    Public API matches the `Runner` Protocol: `run(config, steps) -> RunMetrics`.
    """

    def __init__(
        self,
        runner_script: Path | str = _DEFAULT_RUNNER_SCRIPT,
        user_script: Path | str = _DEFAULT_USER_SCRIPT,
        timeout_seconds: int = 600,
        fake_fallback: FakeRunner | None = None,
    ) -> None:
        """Configure the live runner.

        Args:
            runner_script: Shell wrapper to execute (must exist and be executable).
            user_script: Training script passed to the wrapper via USER_SCRIPT.
            timeout_seconds: Wall-clock limit for one wrapper invocation.
            fake_fallback: Runner used on any failure; defaults to a new FakeRunner.
        """
        # Default 600s (10 min). Profile runs (10 steps) finish in seconds
        # on a healthy MI300X; benchmarks (50 steps) in a couple of minutes.
        # 30 minutes was a leftover from a workload that wasn't honoring
        # --max_steps and silently trained for hours. With max_steps wired
        # correctly, 600s is generous.
        self.runner_script = Path(runner_script)
        self.user_script = Path(user_script)
        self.timeout_seconds = timeout_seconds
        self._fake = fake_fallback if fake_fallback is not None else FakeRunner()

    # ------------------------------------------------------------------
    def run(self, config: WorkloadConfig, steps: int) -> RunMetrics:
        """Profile ``config`` for ``steps`` steps on the GPU, or fall back.

        Returns:
            Parsed live metrics (``runner_kind="live"``). On any failure,
            returns FakeRunner metrics (``runner_kind="fake"``) whose first
            warning explains why.
        """
        problem = self._preflight_problem()
        if problem is not None:
            return self._fallback(config, steps, problem)

        # Late import — only needed on the live path. Keeps laptop-only test
        # runs from importing parser dependencies (csv stdlib is fine, but
        # this also keeps the dependency direction explicit).
        from runner import profile_parser

        with tempfile.TemporaryDirectory(prefix="goblin_run_") as out_dir_str:
            out_dir = Path(out_dir_str)
            env = os.environ.copy()
            env["USER_SCRIPT"] = str(self.user_script)
            env["OUT_DIR"] = str(out_dir)
            env["STEPS"] = str(steps)
            # NOTE(review): `config` is only handed to the parser below; nothing
            # in this env tells goblin_runner.sh which WorkloadConfig to apply.
            # Confirm the wrapper / user script picks it up some other way.
            cmd = [str(self.runner_script)]
            try:
                proc = subprocess.run(
                    cmd,
                    env=env,
                    capture_output=True,
                    text=True,
                    # Profiler / training output is not guaranteed to decode in
                    # the locale encoding. Without this, one bad byte raises
                    # UnicodeDecodeError, which the handlers below do not
                    # catch, and the FakeRunner fallback would be skipped.
                    errors="replace",
                    timeout=self.timeout_seconds,
                    check=False,
                )
            except subprocess.TimeoutExpired:
                # NOTE(review): subprocess.run kills only the direct child on
                # timeout; anything goblin_runner.sh started itself may keep
                # running on the GPU -- confirm the wrapper cleans up.
                return self._fallback(
                    config,
                    steps,
                    f"LiveRunner: goblin_runner.sh timed out after "
                    f"{self.timeout_seconds}s; using FakeRunner.",
                )
            except OSError as exc:
                return self._fallback(
                    config,
                    steps,
                    f"LiveRunner: failed to spawn goblin_runner.sh ({exc}); using FakeRunner.",
                )

            if proc.returncode != 0:
                # Archive the full out_dir to bench_cache/last_runner_failure_<ts>/
                # so the user can inspect stdout.log / stderr.log / amd_smi.err
                # after the tempdir cleanup. The path goes into the warning
                # message so it's surfaced through ToolResult.warnings.
                archive_path = _archive_failure(out_dir, proc)
                stderr_tail = (proc.stderr or "").strip().splitlines()[-15:]
                stdout_tail = (proc.stdout or "").strip().splitlines()[-5:]
                return self._fallback(
                    config,
                    steps,
                    "LiveRunner: goblin_runner.sh exited with "
                    f"code {proc.returncode}; using FakeRunner. "
                    f"Failure logs archived at {archive_path}. "
                    f"stderr tail: {stderr_tail}. "
                    f"stdout tail: {stdout_tail}.",
                )

            # Parse while the tempdir (and its artefacts) still exists.
            try:
                metrics = profile_parser.parse(out_dir, config=config, steps=steps)
            except Exception as exc:  # pragma: no cover — defensive
                return self._fallback(
                    config,
                    steps,
                    f"LiveRunner: profile_parser.parse failed ({type(exc).__name__}: {exc}); "
                    "using FakeRunner.",
                )
            metrics.runner_kind = "live"
            return metrics

    # ------------------------------------------------------------------
    def _preflight_problem(self) -> str | None:
        """Return a fallback warning if the live path cannot start, else None.

        Checks, in order: GPU/profiler availability, that the runner script
        exists, and that it is executable.
        """
        ok, reason = gpu_available()
        if not ok:
            return f"LiveRunner: GPU/profiler unavailable ({reason}); using FakeRunner."
        if not self.runner_script.exists():
            return (
                f"LiveRunner: runner script not found at {self.runner_script}; "
                "using FakeRunner."
            )
        if not os.access(self.runner_script, os.X_OK):
            return (
                f"LiveRunner: runner script {self.runner_script} not executable; "
                "using FakeRunner."
            )
        return None

    def _fallback(self, config: WorkloadConfig, steps: int, warning: str) -> RunMetrics:
        """Log ``warning`` and return FakeRunner metrics with it prepended."""
        _LOG.warning(warning)
        metrics = self._fake.run(config, steps)
        # Make the fallback observable to upstream tools — they surface
        # warnings into the final report.
        metrics.warnings = [warning, *metrics.warnings]
        metrics.runner_kind = "fake"
        return metrics
# ---------------------------------------------------------------------------
# Module-level factory used by agent/tools/{profile_run,benchmark}.py.
# ---------------------------------------------------------------------------
def _default_runner() -> Runner:
    """Build the runner that profile_run / benchmark use when none is injected.

    The result is always a LiveRunner. Whether a call really touches the GPU
    or degrades to FakeRunner is decided later, per call, inside
    LiveRunner.run. Keeping this factory trivial means that live-vs-fake
    decision lives in exactly one place.
    """
    runner: Runner = LiveRunner()
    return runner