| """FastAPI server for GPU Goblin. |
| |
| One audit endpoint plus a health probe. Streams the agent loop's `SSEEvent`s |
| to the UI via Server-Sent Events. CORS is wide open because Streamlit runs on |
| a different port — fine for a hackathon. |
| |
| The agent runs on Qwen via Hugging Face Inference Providers. HF_TOKEN is |
| read at startup; if it's missing the server still starts (so the offline- |
| replay UI lane keeps working) but `/audit` yields a single error event. |
| We never crash on missing keys. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import os |
| import subprocess |
| import sys |
| import tempfile |
| from collections.abc import AsyncIterator |
| from pathlib import Path |
| from typing import Any |
|
|
| from fastapi import FastAPI, File, HTTPException, UploadFile |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, Field |
| from sse_starlette.sse import EventSourceResponse |
|
|
| from agent.backends import active_backend_name |
| from agent.loop import run_audit |
| from agent.schemas import SSEEvent |
| from agent.tools import ALL_TOOLS |
|
|
| _REPO_ROOT = Path(__file__).resolve().parent.parent |
| _AUTO_TUNE_SCRIPT = _REPO_ROOT / "scripts" / "auto_tune.py" |
|
|
| app = FastAPI(title="GPU Goblin Agent", version="0.1.0") |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=False, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| def _has_hf_token() -> bool: |
| return bool(os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACEHUB_API_TOKEN")) |
|
|
|
|
| @app.get("/healthz") |
| async def healthz() -> dict: |
| """Liveness + tool inventory + active backend. UI uses this to confirm |
| the agent is reachable and configured.""" |
| name = active_backend_name() |
| base = { |
| "ok": True, |
| "tools": [t.name for t in ALL_TOOLS], |
| "backend": name, |
| } |
| if name == "qwen-vllm": |
| base.update( |
| { |
| "model": os.environ.get( |
| "GOBLIN_QWEN_VLLM_MODEL", "Qwen/Qwen2.5-7B-Instruct" |
| ), |
| "vllm_url": os.environ.get( |
| "GOBLIN_QWEN_VLLM_URL", "http://localhost:8000/v1" |
| ), |
| "has_api_key": True, |
| } |
| ) |
| else: |
| base.update( |
| { |
| "model": os.environ.get( |
| "GOBLIN_QWEN_MODEL", "Qwen/Qwen2.5-7B-Instruct" |
| ), |
| "provider": os.environ.get("GOBLIN_QWEN_PROVIDER", "auto"), |
| "has_api_key": _has_hf_token(), |
| } |
| ) |
| return base |
|
|
|
|
| async def _stream_audit(file_path: str) -> AsyncIterator[dict]: |
| """Bridge `run_audit`'s SSEEvent generator into the dict shape that |
| sse-starlette expects. Each yielded dict becomes one `data: ...\\n\\n` |
| SSE message. |
| """ |
| if not _has_hf_token(): |
| |
| yield { |
| "data": SSEEvent( |
| type="error", |
| data={ |
| "message": ( |
| "HF_TOKEN not set on the server — Qwen agent loop is " |
| "unavailable. Set HF_TOKEN (or HUGGINGFACEHUB_API_TOKEN) " |
| "or use the offline-replay UI lane." |
| ) |
| }, |
| ).model_dump_json() |
| } |
| return |
|
|
| try: |
| async for event in run_audit(file_path): |
| yield {"data": event.model_dump_json()} |
| except Exception as exc: |
| yield { |
| "data": SSEEvent( |
| type="error", data={"message": f"server: {exc}"} |
| ).model_dump_json() |
| } |
|
|
|
|
| @app.post("/audit") |
| async def audit(file: UploadFile = File(...)) -> EventSourceResponse: |
| """Accept a multipart file upload and stream the agent's audit events. |
| |
| The uploaded file is saved to a tempfile (preserving the extension so |
| `parse_config`'s extension-dispatched parser picks the right path) and |
| handed to `run_audit`. We don't delete the temp file here — the audit |
| might still be reading it; the OS reaps it eventually and `bench_cache/` |
| is gitignored. |
| """ |
| suffix = Path(file.filename or "").suffix or ".bin" |
| fd, tmp_path = tempfile.mkstemp(prefix="goblin_upload_", suffix=suffix) |
| try: |
| with os.fdopen(fd, "wb") as f: |
| f.write(await file.read()) |
| except Exception: |
| |
| async def _err() -> AsyncIterator[dict]: |
| yield { |
| "data": SSEEvent( |
| type="error", |
| data={"message": "Failed to save uploaded file."}, |
| ).model_dump_json() |
| } |
|
|
| return EventSourceResponse(_err()) |
|
|
| return EventSourceResponse(_stream_audit(tmp_path)) |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| class AutoTuneRequest(BaseModel): |
| """JSON shape the /auto-tune endpoint accepts. Mirrors the auto_tune.py |
| CLI surface so the UI just sends what the user picked in the form.""" |
|
|
| model: str | None = Field( |
| default=None, |
| description="HuggingFace model id (e.g. Qwen/Qwen2.5-7B-Instruct). " |
| "Mutually exclusive with `workload`.", |
| ) |
| workload: str | None = Field( |
| default=None, |
| description="Path to a workload script ON THE SERVER's filesystem. " |
| "Mutually exclusive with `model`.", |
| ) |
| mode: str = Field(default="hardcoded", pattern="^(hardcoded|llm|llm-explore)$") |
| candidates_per_iteration: int = Field(default=3, ge=2, le=10) |
| steps: int = Field(default=20, ge=1, le=500) |
| max_iterations: int = Field(default=10, ge=1, le=50) |
| early_stop_after: int = Field(default=3, ge=1, le=20) |
| max_crashes: int = Field(default=4, ge=1, le=20) |
| improvement_threshold: float = Field(default=0.0, ge=0.0, le=20.0) |
|
|
|
|
| def _build_auto_tune_cmd(req: AutoTuneRequest, events_file: Path) -> list[str]: |
| cmd: list[str] = [sys.executable, "-u", str(_AUTO_TUNE_SCRIPT)] |
| if req.model: |
| cmd.extend(["--model", req.model]) |
| elif req.workload: |
| cmd.append(req.workload) |
| cmd.extend([ |
| "--mode", req.mode, |
| "--steps", str(req.steps), |
| "--max-iterations", str(req.max_iterations), |
| "--early-stop-after", str(req.early_stop_after), |
| "--max-crashes", str(req.max_crashes), |
| "--improvement-threshold", str(req.improvement_threshold), |
| "--events", str(events_file), |
| ]) |
| if req.mode == "llm-explore": |
| cmd.extend(["--candidates-per-iteration", str(req.candidates_per_iteration)]) |
| return cmd |
|
|
|
|
| async def _stream_auto_tune(req: AutoTuneRequest) -> AsyncIterator[dict]: |
| """Spawn auto_tune.py and forward its NDJSON --events stream as SSE. |
| |
| Each event is forwarded verbatim — the UI gets the same structured |
| payload it would see when running auto_tune.py locally. We discard |
| the subprocess's stdout/stderr; any errors are surfaced via the |
| `summary` event's absence at process exit. |
| """ |
| events_file = Path(tempfile.mktemp(prefix="auto_tune_events_", suffix=".ndjson")) |
| events_file.write_text("") |
|
|
| cmd = _build_auto_tune_cmd(req, events_file) |
|
|
| |
| |
| if not req.model and not req.workload: |
| yield {"data": json.dumps({ |
| "type": "error", |
| "message": "Pass either `model` or `workload`, not neither." |
| })} |
| return |
| if req.model and req.workload: |
| yield {"data": json.dumps({ |
| "type": "error", |
| "message": "Pass either `model` or `workload`, not both." |
| })} |
| return |
|
|
| proc = subprocess.Popen( |
| cmd, |
| cwd=str(_REPO_ROOT), |
| stdout=subprocess.DEVNULL, |
| stderr=subprocess.DEVNULL, |
| env={**os.environ}, |
| ) |
|
|
| seen_bytes = 0 |
| try: |
| while True: |
| |
| try: |
| with events_file.open("r") as f: |
| f.seek(seen_bytes) |
| chunk = f.read() |
| new_seen = f.tell() |
| except OSError: |
| chunk = "" |
| new_seen = seen_bytes |
|
|
| if chunk: |
| |
| |
| lines = chunk.splitlines(keepends=True) |
| if lines and not lines[-1].endswith("\n"): |
| partial = lines.pop() |
| new_seen -= len(partial.encode("utf-8")) |
| for line in lines: |
| line = line.strip() |
| if line: |
| yield {"data": line} |
| seen_bytes = new_seen |
|
|
| if proc.poll() is not None: |
| |
| try: |
| with events_file.open("r") as f: |
| f.seek(seen_bytes) |
| tail = f.read() |
| except OSError: |
| tail = "" |
| for line in tail.splitlines(): |
| line = line.strip() |
| if line: |
| yield {"data": line} |
| if proc.returncode != 0: |
| yield {"data": json.dumps({ |
| "type": "process_exit", |
| "returncode": proc.returncode, |
| "message": ( |
| f"auto_tune.py exited with code {proc.returncode}. " |
| "Check the server's stderr or check `last_runner_failure_*` " |
| "in `bench_cache/` for goblin_runner.sh failure logs." |
| ), |
| })} |
| break |
|
|
| await asyncio.sleep(0.5) |
| finally: |
| if proc.poll() is None: |
| proc.terminate() |
| try: |
| proc.wait(timeout=3) |
| except subprocess.TimeoutExpired: |
| proc.kill() |
| try: |
| events_file.unlink() |
| except OSError: |
| pass |
|
|
|
|
| @app.post("/auto-tune") |
| async def auto_tune_endpoint(req: AutoTuneRequest) -> EventSourceResponse: |
| """Stream auto_tune.py events back to the caller as SSE. |
| |
| Run a UI on any host (HF Spaces, local laptop), point it at this |
| endpoint, and the actual GPU work happens on the server hosting the |
| FastAPI app. Subprocess output is discarded — only the --events |
| NDJSON stream crosses the wire, one structured event per SSE message. |
| """ |
| if not _AUTO_TUNE_SCRIPT.exists(): |
| raise HTTPException( |
| status_code=500, |
| detail=f"auto_tune.py not found at {_AUTO_TUNE_SCRIPT}", |
| ) |
| return EventSourceResponse(_stream_auto_tune(req)) |
|
|
|
|
| |
| __all__ = ["app"] |
|
|
|
|
| def _decode_event(raw: str) -> dict: |
| """Helper for the CLI driver — parse a serialized SSEEvent JSON payload. |
| |
| Lives here so __main__.py and tests can share one parser. |
| """ |
| return json.loads(raw) |
|
|