File size: 14,556 Bytes
5b2e2d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a9aa4ae
5b2e2d7
 
a9aa4ae
 
 
 
 
5b2e2d7
 
a9aa4ae
5b2e2d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
"""RunnerProtocol — the seam between profile_run/benchmark and the actual GPU.

This is the testability fix from the brooks-audit (Warning #2): without this
abstraction, every change to a tool that touches profiling required an
MI300X cloud session. With it, Backend Lead develops on a laptop using
FakeRunner; Day-3 swaps in the real runner for the canonical demo.

Real implementations satisfy the `Runner` protocol (no subclassing needed) and call into goblin_runner.sh.
Tests and laptop dev use FakeRunner, which loads canned RunMetrics from
workloads/synthetic/.

`LiveRunner` is the production path: it shells out to `goblin_runner.sh`
(which itself wraps rocprofv3 + torch.profiler), parses the resulting
artefacts via `runner.profile_parser.parse`, and on ANY failure
(missing tools, no GPU, subprocess error) falls back to FakeRunner so the
demo still works on a laptop.
"""

from __future__ import annotations

import json
import logging
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Protocol

from agent.schemas import RunMetrics, WorkloadConfig

# Module-level logger; used below for fallback and failure-archive warnings.
_LOG = logging.getLogger(__name__)


class Runner(Protocol):
    """Anything that can take a WorkloadConfig and produce RunMetrics.

    This is a structural (`typing.Protocol`) interface: the runner classes in
    this module satisfy it by defining a matching `run` method, not by
    inheriting from it.
    """

    def run(self, config: WorkloadConfig, steps: int) -> RunMetrics:  # pragma: no cover
        """Produce RunMetrics for `config`; `steps` is the requested step count."""
        ...


# ---------------------------------------------------------------------------
# GPU detection — used by LiveRunner to decide live vs. fallback.
# ---------------------------------------------------------------------------


def _has_render_device() -> bool:
    """Report whether /dev/dri contains any entry named `renderD*`.

    Returns False when /dev/dri is absent or cannot be listed.
    """
    dri_root = Path("/dev/dri")
    if dri_root.exists():
        try:
            for node in dri_root.iterdir():
                if node.name.startswith("renderD"):
                    return True
        except OSError:
            # Listing failed (permissions, not a directory, ...): treat it as
            # "no device" rather than propagating.
            return False
    return False


def gpu_available() -> tuple[bool, str | None]:
    """Check the prerequisites for a live profile run.

    Returns `(True, None)` when every check passes, otherwise
    `(False, reason)` for the first check that fails. Checks, in order:
      1. `rocprofv3` resolves on PATH.
      2. `amd-smi` resolves on PATH.
      3. /dev/dri holds at least one `renderD*` node.

    LiveRunner uses the reason text in its fallback warning.
    """
    for tool in ("rocprofv3", "amd-smi"):
        if shutil.which(tool) is None:
            return False, f"{tool} not found on PATH"
    if _has_render_device():
        return True, None
    return False, "no /dev/dri/renderD* device present"


# ---------------------------------------------------------------------------
# FakeRunner — loads canned RunMetrics from workloads/synthetic/.
# ---------------------------------------------------------------------------


class FakeRunner:
    """Loads pre-recorded RunMetrics from workloads/synthetic/<scenario>/cached_metrics.json.

    The scenario is selected by matching `WorkloadConfig` fields against each
    synthetic scenario's `match` block in its manifest. If multiple scenarios
    match, the most specific one wins (highest number of matched keys); ties
    go to the scenario whose directory sorts first. If none match, returns a
    generic baseline.

    Manifests that cannot be read, are not valid JSON, or do not have the
    expected object shape are skipped, so one bad scenario directory never
    takes the rest of the corpus down with it.

    This lets us:
      1. Develop the agent loop without an MI300X.
      2. Demo when MI300X cloud is unreachable (offline-replay lane).
      3. Run integration tests deterministically.
    """

    def __init__(self, corpus_dir: Path | str = "workloads/synthetic") -> None:
        """Remember the directory that holds one sub-directory per scenario."""
        self.corpus_dir = Path(corpus_dir)

    def run(self, config: WorkloadConfig, steps: int) -> RunMetrics:
        """Return canned metrics for the scenario that best matches `config`.

        Falls back to `_default_metrics(steps)` when no scenario matches or
        the matched scenario has no `cached_metrics.json`. A cached file that
        exists but is malformed still raises, so corpus mistakes stay visible.
        """
        scenario = self._match_scenario(config)
        if scenario is None:
            return self._default_metrics(steps)

        metrics_path = scenario / "cached_metrics.json"
        if not metrics_path.exists():
            return self._default_metrics(steps)

        # Explicit UTF-8 so decoding does not depend on the platform locale.
        data = json.loads(metrics_path.read_text(encoding="utf-8"))
        # The cached file may not have steps populated; let the caller's
        # request take precedence so profile_run vs benchmark return as expected.
        data["steps"] = steps
        data["runner_kind"] = "fake"
        return RunMetrics.model_validate(data)

    # ------------------------------------------------------------------
    # Scenario matching
    # ------------------------------------------------------------------

    def _match_scenario(self, config: WorkloadConfig) -> Path | None:
        """Return the best-matching scenario directory, or None if none match."""
        # is_dir() rather than exists(): a stray file at corpus_dir means
        # "no corpus", not an iterdir() crash.
        if not self.corpus_dir.is_dir():
            return None
        best: tuple[int, Path] | None = None
        for scenario_dir in sorted(self.corpus_dir.iterdir()):
            if not scenario_dir.is_dir():
                continue
            manifest = scenario_dir / "manifest.json"
            if not manifest.exists():
                continue
            try:
                spec = json.loads(manifest.read_text(encoding="utf-8"))
            except (OSError, UnicodeDecodeError, json.JSONDecodeError):
                # Unreadable or malformed manifest: ignore this scenario.
                continue
            if not isinstance(spec, dict):
                # Valid JSON but not an object (e.g. a list): nothing to match.
                continue
            match_block = spec.get("match", {})
            if not isinstance(match_block, dict):
                continue
            score = self._score(config, match_block)
            if score < 0:
                continue
            # Strict '>' keeps the first (sorted) scenario on ties.
            if best is None or score > best[0]:
                best = (score, scenario_dir)
        return best[1] if best else None

    @staticmethod
    def _score(config: WorkloadConfig, match: dict) -> int:
        """Return number of keys that match, or -1 if any key conflicts.

        An empty `match` block scores 0, i.e. it matches every config but
        loses to any more specific scenario.
        """
        cfg = config.model_dump()
        score = 0
        for key, expected in match.items():
            if cfg.get(key) != expected:
                return -1
            score += 1
        return score

    @staticmethod
    def _default_metrics(steps: int) -> RunMetrics:
        """Build the generic baseline returned when no scenario applies."""
        from agent.schemas import KernelEntry, WasteBudget

        return RunMetrics(
            steps=steps,
            tokens_per_sec=120.0,
            mfu_pct=22.0,
            hbm_peak_gb=72.0,
            hbm_avg_gb=58.0,
            gpu_util_pct=45.0,
            top_kernels=[
                KernelEntry(name="aten::matmul", pct_time=42.0),
                KernelEntry(name="aten::scaled_dot_product_attention", pct_time=18.0),
                KernelEntry(name="aten::layer_norm", pct_time=7.0),
            ],
            attention_kernel_loaded="sdpa",
            waste_budget=WasteBudget(
                useful_gpu=0.55,
                data_wait=0.18,
                host_gap=0.07,
                comm_excess=0.0,
                memory_headroom=0.10,
                precision_path=0.06,
                kernel_shape=0.04,
            ),
            warnings=["FakeRunner: no matching scenario, returning generic baseline."],
            runner_kind="fake",
        )


# ---------------------------------------------------------------------------
# LiveRunner — production path. Spawns goblin_runner.sh under rocprofv3.
# ---------------------------------------------------------------------------


# Default locations, derived from the repo layout (this file sits one directory
# below the repo root). LiveRunner's constructor accepts explicit paths that
# replace the two script defaults.
# NOTE(review): an earlier comment mentioned env-var overrides; nothing in this
# module reads such variables — confirm whether that happens elsewhere.
_REPO_ROOT = Path(__file__).resolve().parent.parent
_DEFAULT_RUNNER_SCRIPT = _REPO_ROOT.joinpath("runner", "goblin_runner.sh")
_DEFAULT_USER_SCRIPT = _REPO_ROOT.joinpath("workloads", "train_qwen_lora.py")
_FAILURE_ARCHIVE_ROOT = _REPO_ROOT.joinpath("bench_cache")


def _archive_failure(out_dir: Path, proc: subprocess.CompletedProcess) -> Path:
    """Preserve a failed runner's artefacts and captured output for inspection.

    Copies everything in `out_dir` into
    `bench_cache/last_runner_failure_<timestamp>/` and then writes the
    subprocess's captured stdout, stderr and return code next to it. The
    archive outlives the caller's temporary directory, so the user can
    `tail -n 100 subprocess_stderr.log` afterwards.

    Archiving is best-effort: filesystem errors are logged as warnings and
    never raised. The artefact copy and the captured-output writes fail
    independently, so a copy error does not cost us the captured logs.

    Args:
        out_dir: Directory the runner wrote its artefacts to (may be missing).
        proc: Finished subprocess; `stdout`/`stderr` are expected to be text.

    Returns:
        The archive directory path (returned even if archiving failed).
    """
    import time

    # Second-resolution timestamp; failures within the same second share a dir.
    ts = time.strftime("%Y%m%dT%H%M%S")
    dest = _FAILURE_ARCHIVE_ROOT / f"last_runner_failure_{ts}"

    try:
        dest.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        _LOG.warning("LiveRunner: could not archive failure logs (%s)", exc)
        return dest

    try:
        if out_dir.exists():
            for child in out_dir.iterdir():
                target = dest / child.name
                if child.is_dir():
                    shutil.copytree(child, target, dirs_exist_ok=True)
                else:
                    shutil.copy2(child, target)
    except OSError as exc:
        _LOG.warning("LiveRunner: could not archive failure logs (%s)", exc)

    # Also persist the subprocess's own captured output. Written after the
    # copy so these files win any name clash. UTF-8 with errors="replace"
    # keeps an un-encodable character from raising UnicodeEncodeError (which
    # is not an OSError) in the middle of failure handling.
    captured = (
        ("subprocess_stdout.log", proc.stdout or ""),
        ("subprocess_stderr.log", proc.stderr or ""),
        ("subprocess_returncode", str(proc.returncode)),
    )
    try:
        for file_name, text in captured:
            (dest / file_name).write_text(text, encoding="utf-8", errors="replace")
    except OSError as exc:
        _LOG.warning("LiveRunner: could not archive failure logs (%s)", exc)
    return dest


class LiveRunner:
    """Real-MI300X path: shells out to goblin_runner.sh and parses artefacts.

    Auto-falls-back to FakeRunner whenever the host can't actually run a live
    profile: missing rocprofv3/amd-smi, no AMD GPU, missing or non-executable
    runner script, parser module not importable, subprocess spawn error,
    timeout, non-zero exit, or parser failure. The fallback path is the demo
    safety net; every fallback prepends an explanatory warning to the
    returned metrics.

    Public API matches the `Runner` Protocol: `run(config, steps) -> RunMetrics`.
    """

    def __init__(
        self,
        runner_script: Path | str = _DEFAULT_RUNNER_SCRIPT,
        user_script: Path | str = _DEFAULT_USER_SCRIPT,
        timeout_seconds: int = 600,
        fake_fallback: FakeRunner | None = None,
    ) -> None:
        """Configure the live runner.

        Args:
            runner_script: Executable wrapper script to spawn.
            user_script: Training script, handed to the wrapper via USER_SCRIPT.
            timeout_seconds: Wall-clock limit for one wrapper invocation.
            fake_fallback: Runner used on fallback; a default FakeRunner is
                created when omitted.
        """
        # Default 600s (10 min). Profile runs (10 steps) finish in seconds
        # on a healthy MI300X; benchmarks (50 steps) in a couple of minutes.
        # 30 minutes was a leftover from a workload that wasn't honoring
        # --max_steps and silently trained for hours. With max_steps wired
        # correctly, 600s is generous.
        self.runner_script = Path(runner_script)
        self.user_script = Path(user_script)
        self.timeout_seconds = timeout_seconds
        # Explicit None check: never replace a caller-supplied runner just
        # because it happens to be falsy.
        self._fake = FakeRunner() if fake_fallback is None else fake_fallback

    # ------------------------------------------------------------------

    def run(self, config: WorkloadConfig, steps: int) -> RunMetrics:
        """Profile `config` for `steps` steps on the GPU, or fall back.

        Returns metrics tagged `runner_kind="live"` on success. On any
        handled failure returns the fallback runner's metrics tagged
        `runner_kind="fake"` with the reason prepended to `warnings`.
        """
        ok, reason = gpu_available()
        if not ok:
            return self._fallback(
                config,
                steps,
                f"LiveRunner: GPU/profiler unavailable ({reason}); using FakeRunner.",
            )

        # Sanity-check the runner script before spawning anything.
        if not self.runner_script.exists():
            return self._fallback(
                config,
                steps,
                f"LiveRunner: runner script not found at {self.runner_script}; using FakeRunner.",
            )
        if not os.access(self.runner_script, os.X_OK):
            return self._fallback(
                config,
                steps,
                f"LiveRunner: runner script {self.runner_script} not executable; using FakeRunner.",
            )

        # Late import: only needed on the live path, which keeps laptop-only
        # test runs from importing the parser. A failed import is a fallback
        # reason like any other rather than an uncaught crash.
        try:
            from runner import profile_parser
        except ImportError as exc:
            return self._fallback(
                config,
                steps,
                f"LiveRunner: could not import runner.profile_parser ({exc}); using FakeRunner.",
            )

        with tempfile.TemporaryDirectory(prefix="goblin_run_") as out_dir_str:
            out_dir = Path(out_dir_str)

            # The wrapper script receives its inputs through the environment.
            env = os.environ.copy()
            env["USER_SCRIPT"] = str(self.user_script)
            env["OUT_DIR"] = str(out_dir)
            env["STEPS"] = str(steps)

            cmd = [str(self.runner_script)]
            try:
                # NOTE(review): on timeout subprocess.run kills only the direct
                # child; processes the script spawned may keep running. Consider
                # start_new_session + os.killpg — TODO confirm whether
                # goblin_runner.sh forwards signals to its children.
                proc = subprocess.run(
                    cmd,
                    env=env,
                    capture_output=True,
                    text=True,
                    timeout=self.timeout_seconds,
                    check=False,
                )
            except subprocess.TimeoutExpired as exc:
                # Keep whatever was produced before the timeout, as the
                # non-zero-exit path does. TimeoutExpired does not expose an
                # exit status; -9 mirrors the SIGKILL subprocess.run sends on
                # POSIX.
                timed_out = subprocess.CompletedProcess(
                    cmd,
                    -9,
                    stdout=self._partial_text(exc.stdout),
                    stderr=self._partial_text(exc.stderr),
                )
                archive_path = _archive_failure(out_dir, timed_out)
                return self._fallback(
                    config,
                    steps,
                    f"LiveRunner: goblin_runner.sh timed out after "
                    f"{self.timeout_seconds}s; using FakeRunner. "
                    f"Failure logs archived at {archive_path}.",
                )
            except OSError as exc:
                return self._fallback(
                    config,
                    steps,
                    f"LiveRunner: failed to spawn goblin_runner.sh ({exc}); using FakeRunner.",
                )

            if proc.returncode != 0:
                # Archive the full out_dir to bench_cache/last_runner_failure_<ts>/
                # so the user can inspect stdout.log / stderr.log / amd_smi.err
                # after the tempdir cleanup. The path goes into the warning
                # message so it's surfaced through ToolResult.warnings.
                archive_path = _archive_failure(out_dir, proc)
                stderr_tail = (proc.stderr or "").strip().splitlines()[-15:]
                stdout_tail = (proc.stdout or "").strip().splitlines()[-5:]
                return self._fallback(
                    config,
                    steps,
                    "LiveRunner: goblin_runner.sh exited with "
                    f"code {proc.returncode}; using FakeRunner. "
                    f"Failure logs archived at {archive_path}. "
                    f"stderr tail: {stderr_tail}. "
                    f"stdout tail: {stdout_tail}.",
                )

            try:
                metrics = profile_parser.parse(out_dir, config=config, steps=steps)
            except Exception as exc:  # pragma: no cover - defensive
                return self._fallback(
                    config,
                    steps,
                    f"LiveRunner: profile_parser.parse failed ({type(exc).__name__}: {exc}); "
                    "using FakeRunner.",
                )

            metrics.runner_kind = "live"
            return metrics

    # ------------------------------------------------------------------

    @staticmethod
    def _partial_text(captured: str | bytes | None) -> str:
        """Normalise TimeoutExpired's partial output (None, bytes or str) to str."""
        if captured is None:
            return ""
        if isinstance(captured, bytes):
            return captured.decode("utf-8", errors="replace")
        return captured

    def _fallback(self, config: WorkloadConfig, steps: int, warning: str) -> RunMetrics:
        """Log `warning`, run the fallback runner, and tag the result as fake."""
        _LOG.warning(warning)
        metrics = self._fake.run(config, steps)
        # Make the fallback observable to upstream tools: they surface
        # warnings into the final report.
        metrics.warnings = [warning, *metrics.warnings]
        metrics.runner_kind = "fake"
        return metrics


# ---------------------------------------------------------------------------
# Module-level factory used by agent/tools/{profile_run,benchmark}.py.
# ---------------------------------------------------------------------------


def _default_runner() -> Runner:
    """Build the runner that profile_run / benchmark use by default.

    The result is always a `LiveRunner`; its `run` method chooses between the
    real GPU pipeline and the FakeRunner fallback, so the live-vs-fake
    decision is made in exactly one place.
    """
    runner: Runner = LiveRunner()
    return runner