# townlet / app.py  (17.1 kB)
# Last commit e0c5d7e (Budlee, verified):
#   Disable CPU fallback: GPU-only on Spaces (CPU froze UI)
"""gr.Server entrypoint for Townlet.
`import spaces` MUST come before any CUDA-touching import per the official
Zero-GPU docs (https://huggingface.co/docs/hub/spaces-zerogpu). The skill at
https://github.com/huggingface/skills/blob/main/skills/huggingface-zerogpu
reiterates: the `spaces` module patches torch internals; late imports break
ZeroGPU.
Architecture (see `.claude/plans/as-you-should-know-radiant-breeze.md` and
`CONTEXT.md`):
- The Townlet's background thread advances the world clock and queues
"needs decision" markers. It performs NO LLM calls.
- `tick_decisions` drains the decision queue through `_drain_gpu` (the
function decorated with @spaces.GPU) and runs the per-character CodeAgent
loop. All LLM work happens inside this boundary so Zero-GPU's quota /
device-mapping semantics hold.
- `world_snapshot` is a cheap, non-GPU read for the frontend's poll loop.
Vendor references cited:
https://huggingface.co/blog/introducing-gradio-server (gr.Server API)
https://huggingface.co/docs/hub/en/spaces-zerogpu (Zero-GPU)
Discord recipe by Dean [UNRL] 2026-05-06 (llama-cpp-python on ZeroGPU)
"""
from __future__ import annotations
# `import spaces` first per Zero-GPU docs — it patches torch before anything
# else can pull torch in.
try:
    import spaces
except ImportError:
    # `spaces` could not be imported: install a stand-in so the
    # `@GPU` / `@GPU(duration=...)` decorations below still work.
    def GPU(fn=None, **_kwargs):
        """No-op replacement for `spaces.GPU`.

        Supports the bare form (`@GPU`, where the function arrives as
        `fn`) and the parameterised form (`@GPU(duration=...)`, where
        `fn` is None and a pass-through decorator is returned). Keyword
        options are accepted and ignored.
        """
        if fn is not None:
            return fn
        return lambda f: f
else:
    GPU = spaces.GPU
import os
import random
import tempfile
from pathlib import Path
from fastapi.responses import HTMLResponse
from gradio import Server
from gradio.data_classes import FileData
from backend.factory import get_image
from game.archetypes import archetype_by_name, random_archetype
from game.character import Character
from game.log import tlog
from game.models import DEFAULT_MODEL_ID, ROSTER, roster_for_ui
from game.scheduler import Townlet
from game.the_dot import THE_DOT
from game.world import stand_tile_for
# Frontend assets live in a `frontend/` directory next to this module. The
# page HTML is read once at import time and served verbatim by the `/` route.
FRONTEND_DIR = Path(__file__).parent / "frontend"
FRONTEND_HTML = (FRONTEND_DIR / "index.html").read_text(encoding="utf-8")
# Upper bound on the number of characters; _spawn_one refuses to go past it.
MAX_CHARACTERS = 10
# 3 characters: Ada (engineer), Bram (diplomat), Cyra (saboteur). Each has
# a pinned archetype that drives visible recording action (shell, board,
# sabotage). Dropped the 4th (Doro the wanderer) — fewer decisions per
# drain means less risk of overrunning the @spaces.GPU(duration=120)
# budget, and the 3 heroes all have explicit dramatic directives.
INITIAL_CHARACTERS = 3
# Names handed out, in list order, to spawns that do not supply their own.
DEFAULT_NAMES = ["Ada", "Bram", "Cyra", "Doro", "Eli", "Faye", "Gus", "Hild", "Ira", "Juno"]
# All characters run on Nemotron 4B (sponsor model, strong reasoning) and —
# critically — sharing one model across every character means each
# @spaces.GPU fork only pays ONE cold-load instead of one-per-distinct-
# model. Used to be a mixed stack (heroes=Nemotron, wanderer=Llama 3.2 3B)
# but the second model added ~17s of per-fork cold-load whenever Doro's
# turn came up, with no narrative benefit. HERO_NAMES still tracks who's
# pinned to a specific archetype (engineer/diplomat/saboteur); model is
# uniform.
HERO_MODEL_ID = "nemotron-4b"
HERO_NAMES = ("Ada", "Bram", "Cyra")
# Each hero is pinned to a specific provocative archetype so the recording
# always shows the dramatic actions: Ada (engineer) lives in the shell,
# Bram (diplomat) lives on the message board, and Cyra (saboteur) is pinned
# the same way. The pinned archetypes' personalities explicitly direct the
# agent toward those visible behaviours. Non-heroes draw from any OTHER
# archetype so we don't end up with three engineers crowding the shell.
HERO_ARCHETYPE_FOR = {
    "Ada": "the engineer", # shell-obsessed
    "Bram": "the diplomat", # board-obsessed
    "Cyra": "the saboteur", # board-obsessed
}
# Archetype names reserved for heroes; passed as `exclude=` when a non-hero
# draws a random archetype.
HERO_ARCHETYPES = frozenset(HERO_ARCHETYPE_FOR.values())
# Process-wide singletons: the world scheduler and the RNG used for spawn
# placement and archetype draws.
TOWNLET = Townlet()
RNG = random.Random()
# Resolve GGUF paths at module scope and register them with RealLLMBackend.
# This is the canonical pattern used by Dean's Rizz Therapy and 1000-Rooms:
# the path is resolved once in the parent process (no GPU contention) and
# passed straight into Llama(model_path=...) inside @spaces.GPU. Avoids the
# cache-validation roundtrip per fork that Llama.from_pretrained does.
#
# Side benefit: opening + reading the file populates the OS page cache, so
# the first @spaces.GPU fork mmaps from cache rather than disk.
def _resolve_and_warm_models() -> None:
    """Resolve the shared model's GGUF path and pre-read the file.

    Does nothing unless the `SPACE_ID` environment variable is set. When
    it is, the model file for `HERO_MODEL_ID` is located via
    `hf_hub_download`, its path is registered with `RealLLMBackend`, and
    the file is read end-to-end in 8 MB chunks so later readers can be
    served from the OS page cache.

    Best-effort: import problems and per-model failures are logged through
    `tlog` and never raised.
    """
    if not os.getenv("SPACE_ID"):
        return
    try:
        from huggingface_hub import hf_hub_download
        from backend.real import RealLLMBackend
    except Exception as e:
        tlog(f"[townlet] model path resolve skipped: {e}")
        return
    # Single model — every character uses HERO_MODEL_ID. Resolving paths
    # for roster models we never load would waste ~5s of startup per model.
    # Using the constant (not a repeated literal) keeps the warm-up in sync
    # with the model that _spawn_one actually assigns.
    for model_id in (HERO_MODEL_ID,):
        spec = ROSTER.get(model_id)
        if spec is None:
            # Surface the misconfiguration instead of skipping silently.
            tlog(f"[townlet] resolve skipped: {model_id} not in roster")
            continue
        try:
            path = hf_hub_download(spec.repo_id, spec.gguf_filename)
            RealLLMBackend.register_model_path(model_id, path)
            tlog(f"[townlet] resolved {model_id} -> {path}")
            # Read-through to populate the OS page cache.
            with open(path, "rb") as f:
                while f.read(8 * 1024 * 1024):  # 8 MB chunks
                    pass
            tlog(f"[townlet] page cache warm: {model_id}")
        except Exception as e:
            tlog(f"[townlet] resolve/warm failed for {model_id}: {e}")


_resolve_and_warm_models()
def _spawn_one(name: str | None = None) -> Character | None:
    """Create one character and add it to the world.

    Args:
        name: Requested name. When None, the first entry of DEFAULT_NAMES
            that is not already in use is chosen.

    Returns:
        The new Character, or None when the world already holds
        MAX_CHARACTERS, no default name is free, or the name is taken.
    """
    taken = {entry["name"] for entry in TOWNLET.snapshot()["characters"]}
    if len(taken) >= MAX_CHARACTERS:
        return None

    if name is None:
        name = next((n for n in DEFAULT_NAMES if n not in taken), None)
        if name is None:
            return None
    if name in taken:
        return None

    # Heroes get their pinned archetype; everyone else draws from the rest.
    pinned = HERO_ARCHETYPE_FOR.get(name)
    if pinned is not None:
        arch = archetype_by_name(pinned)
    else:
        arch = random_archetype(RNG, exclude=HERO_ARCHETYPES)

    # Spawn at a random named place (never shell, so we don't auto-grab
    # the lock). Code, not the LLM, picks the exact stand-tile.
    spawn_place = RNG.choice(("cave", "well", "locker_row", "town_hall"))
    tile = stand_tile_for(spawn_place, name)

    # Single model for all characters — see the HERO_MODEL_ID comment.
    newcomer = Character(
        name=name,
        personality=arch.personality,
        archetype=arch.name,
        model_id=HERO_MODEL_ID,
        sprite_id=f"sprite_{len(taken) % 6}",
        pos=tile,
        goal=arch.starting_goal,
        reward=arch.reward,
    )
    TOWNLET.add_character(newcomer)
    return newcomer
def _bootstrap() -> None:
    """Start the Townlet, then spawn the INITIAL_CHARACTERS starting cast."""
    TOWNLET.start()
    for _slot in range(INITIAL_CHARACTERS):
        # No name given: _spawn_one picks the next free default name.
        _spawn_one()


# Runs at import time, before the server object is created.
_bootstrap()
app = Server()
def _log_req(endpoint: str, **kwargs) -> None:
    """Log a single `REQ` line for an incoming call.

    Keyword arguments are rendered as space-separated `key=repr(value)`
    pairs after the endpoint name; with no keyword arguments the trailing
    space is stripped. Lets frontend calls be correlated with server work
    in the Spaces / local logs.
    """
    pairs = [f"{key}={value!r}" for key, value in kwargs.items()]
    detail = " ".join(pairs)
    line = f"[townlet] REQ {endpoint} {detail}"
    tlog(line.rstrip())
def _log_resp(endpoint: str, result) -> None:
    """Log a single `RES` line summarising an endpoint's return value.

    For dict results the keys are listed together with the length of each
    list/dict/str value ("—" for anything else). For every other result
    only the type name is logged.
    """
    if not isinstance(result, dict):
        tlog(f"[townlet] RES {endpoint} type={type(result).__name__}")
        return

    keys = list(result.keys())
    sizes = {}
    for key, value in result.items():
        measurable = isinstance(value, (list, dict, str))
        sizes[key] = len(value) if measurable else "—"
    tlog(f"[townlet] RES {endpoint} keys={keys} sizes={sizes}")
@app.get("/", response_class=HTMLResponse)
async def homepage() -> str:
    """Serve the frontend page that was loaded at import time."""
    html = FRONTEND_HTML
    tlog(f"[townlet] REQ GET / (html size={len(html)})")
    return html
@app.api(name="world_snapshot")
def world_snapshot() -> dict:
    """Return the current world snapshot (the non-GPU read used by polling)."""
    endpoint = "world_snapshot"
    _log_req(endpoint)
    snapshot = TOWNLET.snapshot()
    _log_resp(endpoint, snapshot)
    return snapshot
# GPU "hog mode": once we've been granted a slot, the cold-load cost
# (~15-20s for Llama.from_pretrained inside the fork) is sunk — every
# subsequent drain in the same fork reuses the already-loaded Llama via
# backend.factory._llms. So instead of running ONE drain per @spaces.GPU
# call and giving the slot back, we run multiple drains back-to-back until
# we approach the time budget or run out of pending decisions.
#
# duration=120 chosen to maximise work-per-acquisition. Per-call quota
# burn is the same whether the fork does 2 drains or 12 drains (we use
# real wall time, not the declared duration). What we want is high
# decisions-per-fork so that even when the allocator denies us 1 in N
# calls, the calls that ARE granted produce a lot of visible action.
#
# CRITICAL: the hog loop must NEVER start a sub-drain that would cross the
# 120s declared duration boundary, or ZeroGPU aborts the worker mid-drain
# (observed 2026-06-15: a sub-drain started at 89s elapsed got killed at
# the 120s wall, "GPU task aborted" surfaced to the caller and the partial
# drain's state was lost). The adaptive deadline below estimates the next
# sub-drain's cost from the rolling average and only starts a new one if
# it can finish with margin to spare.
GPU_DURATION = 120
_HARD_DEADLINE_S = 110.0  # absolute cutoff inside the function (10s safety)
_SAFETY_FACTOR = 1.5  # next sub-drain assumed to take 1.5x recent avg
_MAX_SUB_DRAINS = 12  # safety cap in case sub-drains run very fast


@GPU(duration=GPU_DURATION)
def _drain_gpu() -> dict:
    """Run as many decision drains as safely fit in one GPU grant.

    Calls `TOWNLET.drain_decisions()` repeatedly and stops when any of
    these holds:
      - elapsed time has passed `_HARD_DEADLINE_S`
      - the projected cost of the next sub-drain (mean of the measured
        sub-drain times multiplied by `_SAFETY_FACTOR`) would cross
        `_HARD_DEADLINE_S`
      - a sub-drain reports 0 decisions
      - `_MAX_SUB_DRAINS` sub-drains have run

    The projection needs at least one measurement, so it is skipped
    before the first sub-drain.

    Returns:
        {"decisions_run": <sum over sub-drains>, "sub_drains": <count>}
    """
    import time

    clock = time.monotonic
    t0 = clock()
    decisions = 0
    durations: list[float] = []

    for _attempt in range(_MAX_SUB_DRAINS):
        elapsed = clock() - t0
        # Hard cutoff: don't even check timing math close to the cliff.
        if elapsed > _HARD_DEADLINE_S:
            break
        # Adaptive cutoff: only once there is history to estimate from.
        if durations:
            mean_cost = sum(durations) / len(durations)
            projected = mean_cost * _SAFETY_FACTOR
            if elapsed + projected > _HARD_DEADLINE_S:
                break

        began = clock()
        ran = TOWNLET.drain_decisions()
        durations.append(clock() - began)
        decisions += ran
        if ran == 0:
            # Nothing was pending; looping again would just spin.
            break

    return {"decisions_run": decisions, "sub_drains": len(durations)}
# Process-level mutex on CPU drains. CPU LLM inference takes 5+ minutes
# on 2 vCPU; multiple concurrent CPU drains hold the state file lock the
# whole time, blocking world_snapshot and freezing the UI. With this
# mutex, only one CPU drain runs at a time — every subsequent attempt
# bails immediately and reports busy=True (a caller can surface that as
# mode='cpu_busy'). The world snapshot poll stays responsive (it can
# still acquire the lock between the active CPU drain's sub-iterations),
# so the UI keeps animating even while CPU is churning in the background.
# NOTE: with CPU fallback disabled on Spaces, no endpoint in this file
# currently calls the CPU drain helper.
import threading
_cpu_drain_lock = threading.Lock()


def _drain_cpu_nonblocking() -> dict:
    """Run one CPU drain unless another one is already in progress.

    Never queues and never blocks: if the process-level lock is held the
    call returns immediately.

    Returns:
        {"decisions_run": 0, "sub_drains": 0, "busy": True} when another
        CPU drain holds the lock; otherwise
        {"decisions_run": n, "sub_drains": 1, "busy": False} where n is
        the value returned by `TOWNLET.drain_decisions()`.
    """
    if not _cpu_drain_lock.acquire(blocking=False):
        tlog("[townlet] CPU drain skipped: another CPU drain is in progress")
        return {"decisions_run": 0, "sub_drains": 0, "busy": True}
    # os.environ is process-wide. Remember the previous value so the
    # override is scoped to this drain instead of leaking into every later
    # drain. (Presumably the backend reads TOWNLET_FORCE_CPU to pick a
    # device — confirm against the backend.)
    previous = os.environ.get("TOWNLET_FORCE_CPU")
    try:
        os.environ["TOWNLET_FORCE_CPU"] = "1"
        n = TOWNLET.drain_decisions()
        return {"decisions_run": n, "sub_drains": 1, "busy": False}
    finally:
        if previous is None:
            os.environ.pop("TOWNLET_FORCE_CPU", None)
        else:
            os.environ["TOWNLET_FORCE_CPU"] = previous
        _cpu_drain_lock.release()
@app.api(name="tick_decisions")
def tick_decisions_api() -> dict:
    """Run one ZeroGPU drain and report the outcome.

    There is NO CPU fallback: if the GPU path fails for any reason
    (RuntimeError, BrokenProcessPool, a framework-level GPU error, ...)
    the failure is logged and the call reports `mode="gpu_unavailable"`
    with zero decisions, so the frontend can simply retry on its next
    poll.

    Returns:
        {"decisions_run": int, "mode": "gpu" | "gpu_unavailable",
         "sub_drains": int}

    Raises:
        KeyboardInterrupt, SystemExit: re-raised untouched so interpreter
        shutdown is never mistaken for a GPU failure.
    """
    _log_req("tick_decisions")
    sub_drains = 0
    try:
        gpu_result = _drain_gpu()
        n = gpu_result["decisions_run"]
        sub_drains = gpu_result["sub_drains"]
        mode = "gpu"
        tlog(f"[townlet] GPU drain hog: {sub_drains} sub-drain(s), {n} decisions")
    except (KeyboardInterrupt, SystemExit):
        # Shutdown signals are not GPU failures; let them propagate.
        raise
    except BaseException as e:
        # Still deliberately broad (covers non-Exception errors such as
        # cancellation) — only the interpreter-exit signals are excluded.
        #
        # CPU fallback DISABLED on Spaces: Nemotron 4B on the parent
        # process's 2 vCPU takes 5+ minutes per drain and holds the state
        # file lock the whole time, which freezes world_snapshot and the
        # entire UI. Better behaviour: just report gpu_unavailable, let
        # the world keep ticking via snapshots, retry GPU on the next
        # poll. When credits are funding overage, the next call may
        # succeed; when truly out of quota, we wait for reset.
        msg = f"{type(e).__name__}: {e}"
        tlog(f"[townlet] GPU drain failed ({msg[:120]}); skipping (CPU fallback disabled)")
        n = 0
        # Keep the failure response self-consistent even if the error
        # was raised after sub_drains had already been assigned.
        sub_drains = 0
        mode = "gpu_unavailable"
    result = {"decisions_run": n, "mode": mode, "sub_drains": sub_drains}
    _log_resp("tick_decisions", result)
    return result
@app.api(name="tick_decisions_cpu")
def tick_decisions_cpu_api() -> dict:
    """Legacy endpoint: always reports that no CPU drain was run.

    Kept as an endpoint for backward compat with old frontend builds,
    but CPU drain is disabled — Nemotron 4B on 2 vCPU froze the UI. We
    just return mode='gpu_unavailable' so the chip reflects reality
    without engaging the slow CPU path.

    Returns:
        {"decisions_run": 0, "mode": "gpu_unavailable", "sub_drains": 0}
        — the same key set that `tick_decisions` returns, so a client can
        treat both responses uniformly.
    """
    _log_req("tick_decisions_cpu")
    result = {"decisions_run": 0, "mode": "gpu_unavailable", "sub_drains": 0}
    _log_resp("tick_decisions_cpu", result)
    return result
@app.api(name="list_models")
def list_models() -> dict:
    """Return the UI model roster together with the default model id."""
    endpoint = "list_models"
    _log_req(endpoint)
    payload = {
        "roster": roster_for_ui(),
        "default": DEFAULT_MODEL_ID,
    }
    _log_resp(endpoint, payload)
    return payload
@app.api(name="get_the_dot")
def get_the_dot() -> dict:
    """Returns THE_DOT verbatim. Powers the in-app Info modal so visitors can
    read exactly what every character is told before anything else.

    Returns:
        {"text": THE_DOT}
    """
    # Emit the same REQ/RES markers as the other API endpoints so these
    # calls can be correlated in the logs too.
    _log_req("get_the_dot")
    result = {"text": THE_DOT}
    _log_resp("get_the_dot", result)
    return result
@app.api(name="set_personality")
def set_personality(name: str, personality: str) -> dict:
    """Set a character's personality text.

    Only the length of the personality text is logged, not its content.

    Returns:
        {"ok": <value returned by TOWNLET.set_personality>}
    """
    endpoint = "set_personality"
    _log_req(endpoint, name=name, personality_len=len(personality))
    response = {"ok": TOWNLET.set_personality(name, personality)}
    _log_resp(endpoint, response)
    return response
@app.api(name="set_model")
def set_model(name: str, model_id: str) -> dict:
    """Set the model id used by the named character.

    Returns:
        {"ok": <value returned by TOWNLET.set_model>}
    """
    endpoint = "set_model"
    _log_req(endpoint, name=name, model_id=model_id)
    response = {"ok": TOWNLET.set_model(name, model_id)}
    _log_resp(endpoint, response)
    return response
@app.api(name="spawn_character")
def spawn_character(name: str | None = None) -> dict:
    """Spawn one character, optionally with a caller-chosen name.

    Returns:
        {"ok": True, "name": <name>} on success, otherwise
        {"ok": False, "error": "max characters or name taken"}.
    """
    endpoint = "spawn_character"
    _log_req(endpoint, name=name)
    spawned = _spawn_one(name)
    response = (
        {"ok": True, "name": spawned.name}
        if spawned is not None
        else {"ok": False, "error": "max characters or name taken"}
    )
    _log_resp(endpoint, response)
    return response
@app.api(name="set_paused")
def set_paused(paused: bool) -> dict:
    """Pause or resume the world.

    The raw argument is logged; its truthiness is what gets applied and
    echoed back.

    Returns:
        {"ok": True, "paused": <bool(paused)>}
    """
    endpoint = "set_paused"
    _log_req(endpoint, paused=paused)
    flag = bool(paused)
    TOWNLET.set_paused(flag)
    response = {"ok": True, "paused": flag}
    _log_resp(endpoint, response)
    return response
@app.api(name="reset_world")
def reset_world() -> dict:
    """Reset the world and spawn the INITIAL_CHARACTERS starting cast again.

    Returns:
        {"ok": True}
    """
    endpoint = "reset_world"
    _log_req(endpoint)
    TOWNLET.reset()
    for _slot in range(INITIAL_CHARACTERS):
        # No name given: _spawn_one picks the next free default name.
        _spawn_one()
    response = {"ok": True}
    _log_resp(endpoint, response)
    return response
@app.api(name="generate_background")
def generate_background_api(prompt: str) -> FileData:
    """Generate a background image for `prompt` and return it as a file.

    The PNG bytes produced by `get_image().generate(prompt)` are written
    to a fresh temporary file (`bg_*.png`), and a `FileData` pointing at
    that path is returned. The file is intentionally not deleted here
    because only its path is handed back.
    """
    # Flux 2 was dropped to save ZeroGPU quota. This endpoint now serves a
    # mock gradient (Pillow). Kept as an endpoint in case any hidden caller
    # still hits it; no @GPU decorator because no GPU is touched.
    # REQ/RES markers make any such hidden caller visible in the logs.
    _log_req("generate_background")
    png_bytes = get_image().generate(prompt)
    # The context manager closes the handle even if write() raises;
    # delete=False keeps the file on disk after the handle is closed.
    with tempfile.NamedTemporaryFile(prefix="bg_", suffix=".png", delete=False) as tmp:
        tmp.write(png_bytes)
    result = FileData(path=tmp.name)
    _log_resp("generate_background", result)
    return result
# Script entry point: launch the server only when this file is executed
# directly, not when it is imported. show_error=True is passed straight
# through to launch().
if __name__ == "__main__":
    app.launch(show_error=True)