"""gr.Server entrypoint for Townlet. `import spaces` MUST come before any CUDA-touching import per the official Zero-GPU docs (https://huggingface.co/docs/hub/spaces-zerogpu). The skill at https://github.com/huggingface/skills/blob/main/skills/huggingface-zerogpu reiterates: the `spaces` module patches torch internals; late imports break ZeroGPU. Architecture (see `.claude/plans/as-you-should-know-radiant-breeze.md` and `CONTEXT.md`): - The Townlet's background thread advances the world clock and queues "needs decision" markers. It performs NO LLM calls. - `tick_decisions` (decorated @spaces.GPU) drains the decision queue and runs the per-character CodeAgent loop. All LLM work happens inside this boundary so Zero-GPU's quota / device-mapping semantics hold. - `world_snapshot` is a cheap, non-GPU read for the frontend's poll loop. Vendor references cited: https://huggingface.co/blog/introducing-gradio-server (gr.Server API) https://huggingface.co/docs/hub/en/spaces-zerogpu (Zero-GPU) Discord recipe by Dean [UNRL] 2026-05-06 (llama-cpp-python on ZeroGPU) """ from __future__ import annotations # `import spaces` first per Zero-GPU docs — it patches torch before anything # else can pull torch in. 
try:
    import spaces

    # Resolved inside the ``try`` on purpose: an ImportError raised while
    # looking up the attribute also selects the fallback below.
    GPU = spaces.GPU
except ImportError:

    def GPU(fn=None, **_kwargs):
        """No-op stand-in for ``spaces.GPU`` when ``spaces`` is unavailable.

        Works for both decorator spellings: bare ``@GPU`` (``fn`` is the
        decorated function and is returned unchanged) and parameterised
        ``@GPU(duration=...)`` (``fn`` is None; an identity decorator is
        returned and the keyword arguments are ignored).
        """

        def _passthrough(func):
            return func

        return _passthrough if fn is None else fn


import os
import random
import tempfile
from pathlib import Path

from fastapi.responses import HTMLResponse
from gradio import Server
from gradio.data_classes import FileData

from backend.factory import get_image
from game.archetypes import archetype_by_name, random_archetype
from game.character import Character
from game.log import tlog
from game.models import DEFAULT_MODEL_ID, ROSTER, roster_for_ui
from game.scheduler import Townlet
from game.the_dot import THE_DOT
from game.world import stand_tile_for

# The frontend is a single HTML file read once at import time and served
# verbatim from "/".
FRONTEND_DIR = Path(__file__).parent / "frontend"
FRONTEND_HTML = (FRONTEND_DIR / "index.html").read_text(encoding="utf-8")

MAX_CHARACTERS = 10
# 3 characters: Ada (engineer), Bram (diplomat), Cyra (saboteur). Each has
# a pinned archetype that drives visible recording action (shell, board,
# sabotage). Dropped the 4th (Doro the wanderer) — fewer decisions per
# drain means less risk of overrunning the @spaces.GPU(duration=120)
# budget, and the 3 heroes all have explicit dramatic directives.
INITIAL_CHARACTERS = 3
DEFAULT_NAMES = ["Ada", "Bram", "Cyra", "Doro", "Eli", "Faye", "Gus", "Hild", "Ira", "Juno"]

# All characters run on Nemotron 4B (sponsor model, strong reasoning) and —
# critically — sharing one model across every character means each
# @spaces.GPU fork only pays ONE cold-load instead of one-per-distinct-
# model. Used to be a mixed stack (heroes=Nemotron, wanderer=Llama 3.2 3B)
# but the second model added ~17s of per-fork cold-load whenever Doro's
# turn came up, with no narrative benefit. HERO_NAMES still tracks who's
# pinned to a specific archetype (engineer/diplomat/saboteur); model is
# uniform.
HERO_MODEL_ID = "nemotron-4b"
HERO_NAMES = ("Ada", "Bram", "Cyra")

# Each hero is pinned to a specific provocative archetype so the recording
# always shows the dramatic actions: Ada lives in the shell, Bram lives on
# the message board. Both archetypes' personalities explicitly direct the
# agent toward those visible behaviours. Non-heroes draw from any OTHER
# archetype so we don't end up with three engineers crowding the shell.
HERO_ARCHETYPE_FOR = {
    "Ada": "the engineer",  # shell-obsessed
    "Bram": "the diplomat",  # board-obsessed
    "Cyra": "the saboteur",  # board-obsessed
}
HERO_ARCHETYPES = frozenset(HERO_ARCHETYPE_FOR.values())

TOWNLET = Townlet()
RNG = random.Random()


# Resolve GGUF paths at module scope and register them with RealLLMBackend.
# This is the canonical pattern used by Dean's Rizz Therapy and 1000-Rooms:
# the path is resolved once in the parent process (no GPU contention) and
# passed straight into Llama(model_path=...) inside @spaces.GPU. Avoids the
# cache-validation roundtrip per fork that Llama.from_pretrained does.
#
# Side benefit: opening + reading the file populates the OS page cache, so
# the first @spaces.GPU fork mmaps from cache rather than disk.
def _resolve_and_warm_models() -> None:
    """Download/resolve the GGUF file for the model in use and read it once.

    Does nothing unless the ``SPACE_ID`` environment variable is set. Every
    failure (missing optional imports, download error, read error) is logged
    via ``tlog`` and swallowed: this is a best-effort startup optimisation
    and must never prevent the module from importing.
    """
    if not os.getenv("SPACE_ID"):
        return
    try:
        from huggingface_hub import hf_hub_download
        from backend.real import RealLLMBackend
    except Exception as e:
        tlog(f"[townlet] model path resolve skipped: {e}")
        return
    # Single model — every character uses HERO_MODEL_ID. Resolving paths
    # for roster models we never load would waste ~5s of startup per model.
    # Iterating over the constant (rather than repeating its literal value)
    # keeps the warmed model in lockstep with what _spawn_one assigns.
    for model_id in (HERO_MODEL_ID,):
        spec = ROSTER.get(model_id)
        if spec is None:
            continue
        try:
            path = hf_hub_download(spec.repo_id, spec.gguf_filename)
            RealLLMBackend.register_model_path(model_id, path)
            tlog(f"[townlet] resolved {model_id} -> {path}")
            # Read-through to populate the OS page cache.
            with open(path, "rb") as f:
                while f.read(8 * 1024 * 1024):  # 8 MB chunks
                    pass
            tlog(f"[townlet] page cache warm: {model_id}")
        except Exception as e:
            tlog(f"[townlet] resolve/warm failed for {model_id}: {e}")


_resolve_and_warm_models()


def _spawn_one(name: str | None = None) -> Character | None:
    """Create one character and add it to the world.

    Args:
        name: Requested character name. ``None``, an empty string or a
            whitespace-only string means "pick the first unused entry of
            DEFAULT_NAMES". Surrounding whitespace is stripped.

    Returns:
        The new Character, or ``None`` when the world already holds
        MAX_CHARACTERS, no default name is free, or the name is taken.
    """
    # A blank form field arrives as "" rather than None; treat it as
    # "no name given" so we never create a nameless character.
    if name is not None:
        name = name.strip() or None

    snap = TOWNLET.snapshot()
    existing = {c["name"] for c in snap["characters"]}
    if len(existing) >= MAX_CHARACTERS:
        return None
    if name is None:
        name = next((c for c in DEFAULT_NAMES if c not in existing), None)
        if name is None:
            return None
    if name in existing:
        return None

    if name in HERO_ARCHETYPE_FOR:
        arch = archetype_by_name(HERO_ARCHETYPE_FOR[name])
    else:
        arch = random_archetype(RNG, exclude=HERO_ARCHETYPES)
    # Spawn at a random named place (never shell, so we don't auto-grab
    # the lock). Code, not the LLM, picks the exact stand-tile.
    spawn_place = RNG.choice(("cave", "well", "locker_row", "town_hall"))
    pos = stand_tile_for(spawn_place, name)
    # Single model for all characters — see the HERO_MODEL_ID comment.
    model_id = HERO_MODEL_ID
    char = Character(
        name=name,
        personality=arch.personality,
        archetype=arch.name,
        model_id=model_id,
        sprite_id=f"sprite_{len(existing) % 6}",
        pos=pos,
        goal=arch.starting_goal,
        reward=arch.reward,
    )
    TOWNLET.add_character(char)
    return char


def _bootstrap() -> None:
    """Start the Townlet and spawn the INITIAL_CHARACTERS default cast."""
    TOWNLET.start()
    for _ in range(INITIAL_CHARACTERS):
        _spawn_one()


_bootstrap()

app = Server()


def _log_req(endpoint: str, **kwargs) -> None:
    """One-line request marker so we can correlate frontend calls with
    server work in the Spaces / local logs.

    Keyword arguments are rendered as ``key=repr(value)`` pairs.
    """
    detail = " ".join(f"{k}={v!r}" for k, v in kwargs.items())
    tlog(f"[townlet] REQ {endpoint} {detail}".rstrip())


def _log_resp(endpoint: str, result) -> None:
    """Log a response summary: keys and per-value sizes for dicts, otherwise
    just the result's type name. Payloads themselves are never logged.
    """
    if isinstance(result, dict):
        keys = list(result)
        sizes = {k: (len(v) if isinstance(v, (list, dict, str)) else "—") for k, v in result.items()}
        tlog(f"[townlet] RES {endpoint} keys={keys} sizes={sizes}")
    else:
        tlog(f"[townlet] RES {endpoint} type={type(result).__name__}")


@app.get("/", response_class=HTMLResponse)
async def homepage() -> str:
    """Serve the frontend HTML that was read at import time."""
    tlog(f"[townlet] REQ GET / (html size={len(FRONTEND_HTML)})")
    return FRONTEND_HTML


@app.api(name="world_snapshot")
def world_snapshot() -> dict:
    """Return the current world snapshot (no GPU involved)."""
    _log_req("world_snapshot")
    result = TOWNLET.snapshot()
    _log_resp("world_snapshot", result)
    return result


# GPU "hog mode": once we've been granted a slot, the cold-load cost
# (~15-20s for Llama.from_pretrained inside the fork) is sunk — every
# subsequent drain in the same fork reuses the already-loaded Llama via
# backend.factory._llms. So instead of running ONE drain per @spaces.GPU
# call and giving the slot back, we run multiple drains back-to-back until
# we approach the time budget or run out of pending decisions.
#
# duration=120 chosen to maximise work-per-acquisition. Per-call quota
# burn is the same whether the fork does 2 drains or 12 drains (we use
# real wall time, not the declared duration). What we want is high
# decisions-per-fork so that even when the allocator denies us 1 in N
# calls, the calls that ARE granted produce a lot of visible action.
#
# CRITICAL: the hog loop must NEVER start a sub-drain that would cross the
# 120s declared duration boundary, or ZeroGPU aborts the worker mid-drain
# (observed 2026-06-15: a sub-drain started at 89s elapsed got killed at
# the 120s wall, "GPU task aborted" surfaced to the caller and the partial
# drain's state was lost).
# The adaptive deadline below estimates the next sub-drain's cost from the
# average of the sub-drains already run in this call and only starts a new
# one if it can finish with margin to spare.
GPU_DURATION = 120
_HARD_DEADLINE_S = 110.0  # absolute cutoff inside the function (10s safety)
_SAFETY_FACTOR = 1.5  # next sub-drain assumed to take 1.5x recent avg
_MAX_SUB_DRAINS = 12  # safety cap in case sub-drains run very fast


@GPU(duration=GPU_DURATION)
def _drain_gpu() -> dict:
    """Run as many decision drains as safely fit into one GPU grant.

    Calls ``TOWNLET.drain_decisions()`` repeatedly and stops as soon as any
    of the following holds:

    - ``_MAX_SUB_DRAINS`` sub-drains have run;
    - more than ``_HARD_DEADLINE_S`` seconds have elapsed;
    - the mean duration of the sub-drains so far, scaled by
      ``_SAFETY_FACTOR``, would push the next one past the hard deadline;
    - a sub-drain reports zero decisions (nothing left to do).

    The first sub-drain has no timing history, so only the hard deadline
    applies to it.

    Returns:
        ``{"decisions_run": <total decisions>, "sub_drains": <count>}``.
    """
    import time

    t0 = time.monotonic()
    durations: list[float] = []
    decisions_total = 0

    for _ in range(_MAX_SUB_DRAINS):
        elapsed = time.monotonic() - t0
        # Hard cutoff: too close to the cliff to bother with estimates.
        if elapsed > _HARD_DEADLINE_S:
            break
        # Adaptive cutoff: only once at least one sub-drain has been timed.
        if durations:
            mean_duration = sum(durations) / len(durations)
            projected = mean_duration * _SAFETY_FACTOR
            if elapsed + projected > _HARD_DEADLINE_S:
                break

        tick = time.monotonic()
        ran = TOWNLET.drain_decisions()
        durations.append(time.monotonic() - tick)
        decisions_total += ran
        if ran == 0:
            # Nothing was pending; looping again would just spin.
            break

    # One timing sample is recorded per completed sub-drain, so the list
    # length is the sub-drain count.
    return {"decisions_run": decisions_total, "sub_drains": len(durations)}


# Process-level mutex on CPU drains.
# CPU LLM inference takes 5+ minutes on 2 vCPU; multiple concurrent CPU
# drains hold the state file lock the whole time, blocking world_snapshot
# and freezing the UI. With this mutex, only one CPU drain runs at a time —
# every subsequent attempt bails immediately (reported as busy=True). The
# world snapshot poll stays responsive (it can still acquire the lock
# between the active CPU drain's sub-iterations), so the UI keeps animating
# even while CPU is churning in the background.
#
# NOTE: no endpoint in this file currently calls the CPU drain; the API
# handlers below report mode='gpu_unavailable' instead of falling back.
import threading

_cpu_drain_lock = threading.Lock()


def _drain_cpu_nonblocking() -> dict:
    """Try to run one CPU drain. If another is already in progress, return
    immediately with sub_drains=0 — don't queue, don't block.

    Returns:
        ``{"decisions_run": int, "sub_drains": 0 or 1, "busy": bool}``;
        ``busy`` is True when the drain was skipped.
    """
    if not _cpu_drain_lock.acquire(blocking=False):
        tlog("[townlet] CPU drain skipped: another CPU drain is in progress")
        return {"decisions_run": 0, "sub_drains": 0, "busy": True}
    # Remember the previous value so the override does not leak into the
    # rest of the process (and anything forked from it) once this drain is
    # done. Presumably the backend reads TOWNLET_FORCE_CPU while draining —
    # TODO confirm against the backend.
    previous = os.environ.get("TOWNLET_FORCE_CPU")
    try:
        os.environ["TOWNLET_FORCE_CPU"] = "1"
        n = TOWNLET.drain_decisions()
        return {"decisions_run": n, "sub_drains": 1, "busy": False}
    finally:
        if previous is None:
            os.environ.pop("TOWNLET_FORCE_CPU", None)
        else:
            os.environ["TOWNLET_FORCE_CPU"] = previous
        _cpu_drain_lock.release()


@app.api(name="tick_decisions")
def tick_decisions_api() -> dict:
    """Run pending decisions on the ZeroGPU path.

    On any failure of the GPU call (RuntimeError, BrokenProcessPool, a
    framework-level GPU error, etc.) the failure is logged and the call
    reports ``mode='gpu_unavailable'`` with zero decisions. There is no CPU
    fallback: the world keeps ticking via snapshot polls and the next call
    simply retries the GPU.

    Returns:
        ``{"decisions_run": int, "mode": "gpu" | "gpu_unavailable",
        "sub_drains": int}``.
    """
    _log_req("tick_decisions")
    sub_drains = 0
    try:
        gpu_result = _drain_gpu()
        n = gpu_result["decisions_run"]
        sub_drains = gpu_result["sub_drains"]
        mode = "gpu"
        tlog(f"[townlet] GPU drain hog: {sub_drains} sub-drain(s), {n} decisions")
    except (KeyboardInterrupt, SystemExit):
        # Never swallow interpreter shutdown / Ctrl-C.
        raise
    except BaseException as e:
        # Deliberately broad: GPU failures surface as many different
        # exception types and none of them should fail the request.
        #
        # CPU fallback DISABLED on Spaces: Nemotron 4B on the parent
        # process's 2 vCPU takes 5+ minutes per drain and holds the state
        # file lock the whole time, which freezes world_snapshot and the
        # entire UI. Better behaviour: just report gpu_unavailable, let
        # the world keep ticking via snapshots, retry GPU on the next
        # poll. When credits are funding overage, the next call may
        # succeed; when truly out of quota, we wait for reset.
        msg = f"{type(e).__name__}: {e}"
        tlog(f"[townlet] GPU drain failed ({msg[:120]}); skipping (CPU fallback disabled)")
        n = 0
        mode = "gpu_unavailable"
    result = {"decisions_run": n, "mode": mode, "sub_drains": sub_drains}
    _log_resp("tick_decisions", result)
    return result


@app.api(name="tick_decisions_cpu")
def tick_decisions_cpu_api() -> dict:
    """Kept as an endpoint for backward compat with old frontend builds, but
    CPU drain is disabled — Nemotron 4B on 2 vCPU froze the UI. We just
    return mode='gpu_unavailable' so the chip reflects reality without
    engaging the slow CPU path.

    The payload has the same keys as ``tick_decisions``.
    """
    _log_req("tick_decisions_cpu")
    result = {"decisions_run": 0, "mode": "gpu_unavailable", "sub_drains": 0}
    _log_resp("tick_decisions_cpu", result)
    return result


@app.api(name="list_models")
def list_models() -> dict:
    """Return the model roster for the UI plus the default model id."""
    _log_req("list_models")
    result = {"roster": roster_for_ui(), "default": DEFAULT_MODEL_ID}
    _log_resp("list_models", result)
    return result


@app.api(name="get_the_dot")
def get_the_dot() -> dict:
    """Returns THE_DOT verbatim. Powers the in-app Info modal so visitors
    can read exactly what every character is told before anything else.
    """
    return {"text": THE_DOT}


@app.api(name="set_personality")
def set_personality(name: str, personality: str) -> dict:
    """Set a character's personality; ``ok`` is whatever the Townlet returns."""
    _log_req("set_personality", name=name, personality_len=len(personality))
    ok = TOWNLET.set_personality(name, personality)
    result = {"ok": ok}
    _log_resp("set_personality", result)
    return result


@app.api(name="set_model")
def set_model(name: str, model_id: str) -> dict:
    """Set a character's model; ``ok`` is whatever the Townlet returns."""
    _log_req("set_model", name=name, model_id=model_id)
    ok = TOWNLET.set_model(name, model_id)
    result = {"ok": ok}
    _log_resp("set_model", result)
    return result


@app.api(name="spawn_character")
def spawn_character(name: str | None = None) -> dict:
    """Spawn one character, optionally with a requested name.

    Returns ``{"ok": True, "name": ...}`` on success, otherwise
    ``{"ok": False, "error": ...}``.
    """
    _log_req("spawn_character", name=name)
    char = _spawn_one(name)
    if char is None:
        result = {"ok": False, "error": "max characters or name taken"}
    else:
        result = {"ok": True, "name": char.name}
    _log_resp("spawn_character", result)
    return result


@app.api(name="set_paused")
def set_paused(paused: bool) -> dict:
    """Pause or resume the world; echoes the coerced flag back."""
    _log_req("set_paused", paused=paused)
    TOWNLET.set_paused(bool(paused))
    result = {"ok": True, "paused": bool(paused)}
    _log_resp("set_paused", result)
    return result


@app.api(name="reset_world")
def reset_world() -> dict:
    """Reset the Townlet and respawn the INITIAL_CHARACTERS default cast."""
    _log_req("reset_world")
    TOWNLET.reset()
    for _ in range(INITIAL_CHARACTERS):
        _spawn_one()
    result = {"ok": True}
    _log_resp("reset_world", result)
    return result


@app.api(name="generate_background")
def generate_background_api(prompt: str) -> FileData:
    """Generate a background image for ``prompt`` and return it as a file.

    The PNG is written to a temp file that is intentionally NOT deleted
    here (``delete=False``): the returned FileData refers to it by path.
    """
    # Flux 2 was dropped to save ZeroGPU quota. This endpoint now serves a
    # mock gradient (Pillow). Kept as an endpoint in case any hidden caller
    # still hits it; no @GPU decorator because no GPU is touched.
    png_bytes = get_image().generate(prompt)
    # The context manager flushes and closes the handle even if the write
    # raises; the file itself survives because of delete=False.
    with tempfile.NamedTemporaryFile(prefix="bg_", suffix=".png", delete=False) as tmp:
        tmp.write(png_bytes)
    return FileData(path=tmp.name)


if __name__ == "__main__":
    app.launch(show_error=True)