godseed / app.py
AndresCarreon's picture
disable flaky LLM judge layer (default-denied every clean wish on llama.cpp); wordlist layers 1-2 remain the gate
db1aa38 verified
Raw
History Blame Contribute Delete
23.4 kB
"""GODSEED — Space entrypoint. gr.Server (FastAPI) + SSE + the WebGL planet.
Nine billion parameters. One shared world.
Wiring (ARCHITECTURE.md is the contract):
web/ (Three.js planet) <-- GET / static (mounted after launch)
Genesis Log <-- GET /book static (web/book.html)
world snapshot <-- GET /api/state
make a wish <-- POST /api/wish (rate-limited, moderation fast-path)
live liturgy <-- GET /api/stream SSE broadcast (X-Accel-Buffering: no)
the rite's token <-- GET /api/turn (cookie-authenticated; owner only)
grant the wish (GPU) <-- gradio @app.api "grant" (runs in the wisher's quota)
Genesis Log index <-- GET /api/wishes
single trace (replay) <-- GET /api/wishes/{id}
Run: python app.py (gr.Server launch on $PORT, default 7860)
Backends: GODSEED_BACKEND = mock | llamacpp | zerogpu (default mock; live = zerogpu)
Sync: HF_TOKEN + GODSEED_DATASET enable trace persistence to an HF dataset.
The deterministic engine owns all world state; the mind only interprets and narrates;
this file only moves bytes. Engine/mind constructor assumptions are concentrated in
``_default_wiring`` below — the single integration point.
"""
from __future__ import annotations
import asyncio
import inspect
import logging
import os
# ZeroGPU runtime: the `spaces` lib must be imported before torch anywhere in
# the process (it patches torch to virtualize the GPU). No-op everywhere else.
# Both steps below run at import time, before logging is configured, and are
# deliberately best-effort: a failure here must never stop the Space booting.
if os.environ.get("GODSEED_BACKEND", "").strip().lower() == "zerogpu":
    try:
        import spaces  # noqa: F401
    except ImportError:
        # `spaces` not installed (e.g. local dev with the zerogpu env set).
        pass
    # No GPU exists at startup, so torch never loads its bundled CUDA runtime —
    # but mamba_ssm's compiled kernels (imported at module level by NVIDIA's
    # NemotronH remote code) link libcudart.so.12. Preload it so imports resolve.
    # RTLD_GLOBAL makes the library's symbols visible to shared objects loaded
    # afterwards; the first candidate that loads wins.
    try:
        import ctypes
        import glob

        # Probes only this site-packages layout (presumably the pip
        # nvidia-cuda-runtime wheel) — TODO confirm for other images.
        for _lib in glob.glob(
            "/usr/local/lib/python3*/site-packages/nvidia/cuda_runtime/lib/libcudart.so.12*"
        ):
            try:
                ctypes.CDLL(_lib, mode=ctypes.RTLD_GLOBAL)
                break
            except OSError:
                continue
    except Exception:  # noqa: BLE001 — best-effort; worst case the slow path runs
        pass
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Callable, Optional, Protocol
from fastapi import FastAPI, HTTPException, Query, Request, Response
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from server.persistence import PersistenceService
from server.ratelimit import (
COOKIE_NAME,
DEFAULT_IP_LIMIT,
ClientIdentity,
RateLimiter,
client_ip,
)
from server.schemas import (
QueueInfo,
StateResponse,
WishAccepted,
WishRequest,
WishSummary,
WorldState,
)
from server.sse import SSEHub
log = logging.getLogger("godseed")

# Filesystem anchors, resolved from this file's own location so they do not
# depend on the process working directory.
ROOT = Path(__file__).resolve().parent
WEB_DIR, TRACES_DIR = ROOT / "web", ROOT / "traces"

# Accepted GODSEED_BACKEND values; anything else is downgraded to "mock".
VALID_BACKENDS = ("mock", "llamacpp", "zerogpu")

# User-facing refusals. The god answers in poetry — never with a stack trace.
RATE_LIMIT_REASON = (
    "The god hears only three wishes an hour from one voice. Walk the world "
    "a while; return when the shadows have moved."
)
QUEUE_FULL_REASON = "The god is overwhelmed; return at dusk."
NOT_FOUND_REASON = "No such wish has been granted."
# --------------------------------------------------------------------------- protocols
class WorldLike(Protocol):
    """Read-only view of the engine's world that this server relies on.

    The engine owns every write; the server only ever reads these attributes.
    """

    version: int
    epoch: int
    features: list[dict[str, Any]]


class QueueLike(Protocol):
    """The slice of ``engine.queue_worker`` the HTTP layer calls directly."""

    async def submit(self, text: str, client_id: str) -> tuple[str, int]:
        """Enqueue *text* on behalf of *client_id*; return ``(wish_id, position)``."""

    def snapshot(self) -> dict[str, Any]:
        """Return ``{"length": int, "current": {...} | None}``."""


# Synchronous pre-filter applied to wish text: a poetic rejection reason, or
# ``None`` when the wish may proceed to the queue.
FastCheck = Callable[[str], Optional[str]]
# ----------------------------------------------------------------- default (real) wiring
def _resolve_backend_name() -> str:
    """Return the configured GODSEED_BACKEND, or ``"mock"`` when unset/unknown."""
    backend_name = os.environ.get("GODSEED_BACKEND", "mock").strip().lower() or "mock"
    if backend_name not in VALID_BACKENDS:
        log.warning("unknown GODSEED_BACKEND=%r; falling back to mock", backend_name)
        return "mock"
    return backend_name


def _resolve_browser_invoked(backend_name: str) -> bool:
    """Decide who runs the GPU rite: the wisher's browser (True) or the server.

    ZeroGPU: GPU work must run inside the wisher's gradio request (their HF
    auth headers carry the quota), so the worker hands the rite to the browser
    via your_turn/invoke. GODSEED_INVOKE=server|browser overrides this; any
    other value defers to the backend.
    """
    invoke_mode = os.environ.get("GODSEED_INVOKE", "").strip().lower()
    if invoke_mode in ("browser", "server"):
        return invoke_mode == "browser"
    return backend_name == "zerogpu"


def _max_trace_wish_index(traces: Any) -> int:
    """Return the largest N among trace ids shaped ``w_NNNNNN`` (0 when none).

    Wish numbering must survive zero-feature wishes across restarts: the
    worker derives its counter from WORLD features, but a wish whose calls
    all failed leaves no feature — after a restart the same id was reissued
    (June 12: two w_000004 in the log). Traces are the complete record.
    Six-or-more digits are accepted so ids that ever outgrow the six-digit
    padding still count.
    """
    import re

    wish_id_re = re.compile(r"w_([0-9]{6,})")
    highest = 0
    for trace in traces:
        match = wish_id_re.fullmatch(str(trace.get("wish_id") or ""))
        if match:
            highest = max(highest, int(match.group(1)))
    return highest


def _default_wiring(traces_dir: Path) -> dict[str, Any]:
    """Build the real engine + mind stack. ALL cross-component wiring lives here so
    integration fixes touch exactly one function.

    Args:
        traces_dir: directory handed to ``PersistenceService``.

    Returns:
        ``{"world", "queue", "hub", "persistence", "fast_check"}`` — exactly the
        collaborators ``create_app`` consumes.
    """
    backend_name = _resolve_backend_name()

    from engine.genesis import genesis_features
    from engine.moderation import Moderator
    from engine.queue_worker import QueueWorker
    from engine.world import World
    from mind.backends import make_backend
    from mind.moderation_judge import judge as judge_fn
    from mind.planner import Planner

    # Boot: restore wishes.jsonl from the dataset if configured, rebuild the ordered
    # feature list (genesis + every trace's features), load the world verbatim.
    persistence = PersistenceService(traces_dir)
    features = persistence.boot(genesis_features())
    # Nothing in this function records a trace, so one snapshot serves the
    # epitaphs, the wish counter and the startup log line.
    boot_traces = persistence.traces()
    epitaphs = [t.get("epitaph") for t in boot_traces if t.get("epitaph")]
    world = World.load(features, epitaphs)

    # The zerogpu backend eager-loads the 9B at construction (ZeroGPU needs the
    # weights resident before the first request). If that fails — OOM, a model
    # download hiccup — DON'T crash-loop the whole Space: the planet, the Genesis
    # Log and every zero-GPU view must still serve. Fall back to mock so visitors
    # see the world; granting degrades to scripted until the Space is healthy.
    try:
        backend = make_backend(backend_name)
    except Exception:
        log.exception("backend %r failed to load; serving with mock granting", backend_name)
        backend = make_backend("mock")
        backend_name = "mock"
    planner = Planner(backend)

    # Moderation layers 1-2 (charset/length + a thorough wordlist with
    # confusable/leetspeak/skeleton folding) are the reliable gate and run on
    # EVERY wish. Layer 3 (an LLM yes/no judge through the same model) is opt-in:
    # on llama.cpp its grammar-constrained verdict often fails to parse and then
    # DEFAULT-DENIES every clean wish (June 12: "found a town" rejected as
    # 'uncertain'). Off by default; GODSEED_JUDGE=on re-enables it.
    use_judge = os.environ.get("GODSEED_JUDGE", "").strip().lower() in ("1", "on", "true")
    judge = (lambda text: judge_fn(text, backend)) if use_judge else None
    moderator = Moderator(judge=judge)

    def fast_check(text: str) -> Optional[str]:
        """Run the moderator's precheck; return a poetic refusal or None."""
        verdict = moderator.precheck(text)
        if verdict.allowed:
            return None
        return verdict.poetic_reason or "The god declines this wish."

    hub = SSEHub()

    async def persist(trace: Any) -> None:
        """Worker hands over the WishTrace; embed the wish's engine-built features
        (filtered from the world by wish_id) so boot rebuild is exact."""
        if not isinstance(trace, dict):
            trace = dict(getattr(trace, "__dict__", None) or {})
        wish_id = trace.get("wish_id")
        # A trace without an id must not claim features: world features whose
        # wish_id is also None (presumably genesis/unowned ones — confirm) would
        # otherwise be embedded and likely duplicated by the next boot rebuild.
        wish_features = (
            [f.to_dict() for f in world.features if f.wish_id == wish_id]
            if wish_id is not None
            else []
        )
        trace.setdefault("feature_ids", [f["id"] for f in wish_features])
        await persistence.record(trace, wish_features)

    browser_invoked = _resolve_browser_invoked(backend_name)
    trace_max = _max_trace_wish_index(boot_traces)
    queue = QueueWorker(
        world=world,
        moderator=moderator,
        planner=planner,
        emit=hub.emit,
        persist=persist,
        browser_invoked=browser_invoked,
        next_wish_index=max(QueueWorker._derive_wish_counter(world), trace_max + 1),
    )
    log.info(
        "godseed wired: backend=%s features=%d version=%d traces=%d dataset=%s",
        backend_name,
        len(world.features),
        world.version,
        len(boot_traces),
        persistence.sync.dataset if persistence.sync.enabled else "off",
    )
    return {
        "world": world,
        "queue": queue,
        "hub": hub,
        "persistence": persistence,
        "fast_check": fast_check,
    }
# --------------------------------------------------------------------------- helpers
async def _maybe_await(result: Any) -> Any:
if inspect.isawaitable(result):
return await result
return result
def _set_identity_cookie(response: Response, request: Request, value: str) -> None:
    """Attach the one-year, HttpOnly client-identity cookie to *response*.

    Spaces serve over https inside an iframe (cross-site -> SameSite=None+Secure);
    local dev is plain http (Lax, not Secure).

    The scheme is taken from ``X-Forwarded-Proto`` when a proxy set it, else
    from the request URL. Proxy chains may send a comma-separated list
    (``"https, http"``) and arbitrary casing; only the first (client-facing)
    entry matters, so it is normalised before comparison. Without this, such
    values would fall back to Lax/non-Secure, which the cross-site iframe
    cannot use.

    Args:
        response: response that receives the ``Set-Cookie`` header.
        request: incoming request, used only to detect https.
        value: signed identity value to store under ``COOKIE_NAME``.
    """
    forwarded = request.headers.get("x-forwarded-proto")
    raw_scheme = forwarded.split(",", 1)[0] if forwarded else request.url.scheme
    secure = raw_scheme.strip().lower() == "https"
    response.set_cookie(
        COOKIE_NAME,
        value,
        max_age=365 * 24 * 3600,  # one year
        path="/",
        httponly=True,  # not readable from page JavaScript
        secure=secure,
        samesite="none" if secure else "lax",
    )
def _world_state(world: WorldLike) -> WorldState:
    """Project *world* into the wire-level ``WorldState`` model.

    ``engine.world.World`` exposes ``state_dict()`` (features as plain dicts) and
    the model ignores its extra derived keys. Worlds without it are read
    attribute by attribute instead.
    """
    if not hasattr(world, "state_dict"):
        # Simpler worlds (e.g. test fakes) only carry the three protocol fields.
        return WorldState(
            version=int(world.version),
            epoch=int(world.epoch),
            features=list(world.features),
        )
    return WorldState.model_validate(world.state_dict())
def _queue_info(queue: QueueLike) -> QueueInfo:
    """Summarise the queue for the wire; a failing snapshot reads as an empty queue.

    Missing keys default to ``length=0`` and ``current=None``.
    """
    snap: dict[str, Any]
    try:
        snap = queue.snapshot() or {}
    except Exception:  # noqa: BLE001 — a queue hiccup must not kill /api/state
        log.exception("queue snapshot failed")
        snap = {}
    length = snap.get("length", 0)
    current = snap.get("current")
    return QueueInfo.model_validate({"length": length, "current": current})
def _wish_summary(trace: dict[str, Any], epoch: int) -> WishSummary:
    """Condense a stored trace into the Genesis Log's ``WishSummary``.

    Args:
        trace: a persisted wish trace. ``wish_id``, ``text``, ``epitaph`` and
            ``submitted_at`` are read; any of them may be missing or ``None``.
        epoch: the 1-based position of the trace in the log.

    Returns:
        The summary. Missing/``None`` text fields become ``""`` (not the string
        ``"None"``), and an absent or unparseable ``submitted_at`` becomes ``0.0``,
        so one malformed trace cannot 500 the whole log.
    """
    raw_ts = trace.get("submitted_at")
    try:
        ts = float(raw_ts or 0.0)
    except (TypeError, ValueError):
        ts = 0.0
    return WishSummary(
        wish_id=str(trace.get("wish_id") or ""),
        text=str(trace.get("text") or ""),
        epitaph=str(trace.get("epitaph") or ""),
        epoch=epoch,
        ts=ts,
    )
# --------------------------------------------------------------------------- the app
def create_app(
    *,
    world: Optional[WorldLike] = None,
    queue: Optional[QueueLike] = None,
    hub: Optional[SSEHub] = None,
    persistence: Optional[PersistenceService] = None,
    fast_check: Optional[FastCheck] = None,
    rate_limiter: Optional[RateLimiter] = None,
    ip_rate_limiter: Optional[RateLimiter] = None,
    identity: Optional[ClientIdentity] = None,
    traces_dir: Optional[Path | str] = None,
    mount_web: bool = True,
    mount_panel: bool = True,
) -> FastAPI:
    """Build the ASGI app.

    With no arguments this wires the real engine + mind from env config; tests
    inject fakes for everything engine/mind-shaped.

    Args:
        world, queue, hub, persistence: engine-side collaborators. Injecting
            ``queue`` requires injecting ``world``, ``hub`` and ``persistence``
            too. When ``queue`` is omitted, all four plus ``fast_check`` come
            from ``_default_wiring`` and any injected values for them are ignored.
        fast_check: optional synchronous pre-filter run on every wish.
        rate_limiter: per-client limiter (default ``RateLimiter()``).
        ip_rate_limiter: per-IP limiter (default ``RateLimiter(limit=DEFAULT_IP_LIMIT)``).
        identity: resolves/issues the signed identity cookie.
        traces_dir: trace directory for the default wiring (default ``TRACES_DIR``).
        mount_web: on a plain FastAPI app, mount ``web/`` at ``/`` immediately.
            A gradio Server instead stores the mount as ``app.state.mount_static``
            for ``_serve`` to call after launch.
        mount_panel: accepted for backward compatibility; unused (there is no
            Blocks panel in Server mode).

    Raises:
        ValueError: ``queue`` injected without ``world``, ``hub`` and ``persistence``.
    """
    traces_path = Path(traces_dir) if traces_dir is not None else TRACES_DIR
    if queue is None:
        wired = _default_wiring(traces_path)
        world = wired["world"]
        queue = wired["queue"]
        # The default queue emits into the hub _default_wiring built; an injected hub
        # is only honored together with an injected queue.
        hub = wired["hub"]
        persistence = wired["persistence"]
        fast_check = wired["fast_check"]
    if world is None or hub is None or persistence is None:
        raise ValueError(
            "create_app: when injecting `queue`, also inject `world`, `hub`, and "
            "`persistence`"
        )
    # `is None` rather than `or`: an injected collaborator that happens to be
    # falsy (e.g. one defining __len__) must not be silently replaced.
    if rate_limiter is None:
        rate_limiter = RateLimiter()
    if ip_rate_limiter is None:
        ip_rate_limiter = RateLimiter(limit=DEFAULT_IP_LIMIT)
    if identity is None:
        identity = ClientIdentity()

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Start the queue worker on startup; stop it and drain persistence on shutdown."""
        queue_task: Optional[asyncio.Task[Any]] = None
        if hasattr(queue, "start"):
            await _maybe_await(queue.start())
        elif hasattr(queue, "run"):
            queue_task = asyncio.create_task(queue.run())
        try:
            yield
        finally:
            try:
                if hasattr(queue, "stop"):
                    await _maybe_await(queue.stop())
                elif queue_task is not None:
                    queue_task.cancel()
                    await asyncio.gather(queue_task, return_exceptions=True)
            finally:
                # Drain persistence even if stopping the worker failed
                # (presumably flushes pending trace writes).
                await persistence.drain()

    # Gradio Server mode (the "her" idiom): gr.Server IS a FastAPI app with
    # gradio's API engine attached — the sdk:gradio runtime serves it natively,
    # and @app.api endpoints are gradio endpoints the browser calls via
    # @gradio/client, which forwards the HF auth headers ZeroGPU quota needs.
    try:
        import gradio as gr

        app: FastAPI = gr.Server(title="GODSEED")
        app.router.lifespan_context = lifespan
    except Exception:  # pragma: no cover — gradio always present in prod
        app = FastAPI(title="GODSEED", docs_url=None, redoc_url=None, lifespan=lifespan)
    app.state.world = world
    app.state.queue = queue
    app.state.hub = hub
    app.state.persistence = persistence

    async def ensure_worker() -> None:
        """Start the queue worker on the SERVING event loop, lazily. gradio's
        launch() runs uvicorn without honoring an injected lifespan (verified
        June 12: the worker never started under Server mode), so the first
        request wakes the god instead. Idempotent and loop-correct everywhere."""
        if (
            hasattr(queue, "start")
            and hasattr(queue, "is_running")
            and not queue.is_running
        ):
            await _maybe_await(queue.start())

    # GPU rite endpoint: when the queue announces your_turn over SSE, the
    # wisher's browser calls this named gradio endpoint with the one-time token.
    if hasattr(app, "api") and hasattr(queue, "invoke"):

        @app.api(name="grant")
        async def grant(wish_id: str = "", token: str = "") -> dict:
            try:
                await ensure_worker()
                return await queue.invoke(wish_id, token)
            except Exception as exc:  # noqa: BLE001 — the rite answers in poetry
                reason = getattr(exc, "reason", None)
                if reason:
                    # Engine refusal carrying a poetic .reason: expected, no traceback.
                    log.info("grant refused for %s: %s", wish_id, reason)
                else:
                    # Anything else is a real failure; keep the traceback server-side.
                    log.exception("grant failed for %s", wish_id)
                    reason = "the rite failed; the god moved on"
                return {"ok": False, "wish_id": wish_id, "reason": str(reason)}

    # ------------------------------------------------------------------- /api/turn
    # The owner's browser fetches its secret invocation token here. The token is
    # identity-bound (signed cookie) and never broadcast over SSE, so a stranger
    # tailing /api/stream cannot steal another visitor's rite (verify-fleet
    # critical, June 12). A plain FastAPI route DOES see the cookie (unlike the
    # gradio @app.api endpoint), which is why the auth check lives here.
    if hasattr(queue, "turn_token"):

        @app.get("/api/turn", include_in_schema=False)
        async def turn(wish_id: str, request: Request) -> JSONResponse:
            await ensure_worker()
            cid, new_cookie = identity.resolve(request)
            tok = queue.turn_token(wish_id, cid)
            # The token is a secret: forbid any cache from storing the reply.
            no_store = {"Cache-Control": "no-store"}
            if tok is None:
                reply = JSONResponse(
                    {"error": "not your turn"}, status_code=403, headers=no_store
                )
            else:
                reply = JSONResponse(
                    {"wish_id": wish_id, "token": tok}, headers=no_store
                )
            # The cookie goes on the response actually returned: FastAPI does not
            # merge headers from an injected `Response` parameter into a
            # Response object the endpoint returns itself.
            if new_cookie:
                _set_identity_cookie(reply, request, new_cookie)
            return reply

    # ------------------------------------------------------------------- /api/state
    @app.get("/api/state", response_model=StateResponse)
    async def get_state() -> StateResponse:
        await ensure_worker()
        traces = list(persistence.traces())
        # Summarise only the newest entries instead of the whole log on every
        # poll; epochs remain 1-based positions in the full trace list.
        recent_limit = 10
        first_epoch = max(len(traces) - recent_limit, 0) + 1
        recent = [
            _wish_summary(trace, epoch)
            for epoch, trace in enumerate(traces[-recent_limit:], start=first_epoch)
        ]
        recent.reverse()  # newest first
        return StateResponse(
            world=_world_state(world),
            queue=_queue_info(queue),
            wishes_recent=recent,
        )

    # -------------------------------------------------------------------- /api/wish
    @app.post("/api/wish", response_model=WishAccepted)
    async def post_wish(
        payload: WishRequest, request: Request, response: Response
    ) -> WishAccepted:
        await ensure_worker()
        cid, new_cookie = identity.resolve(request)
        if new_cookie:
            # NOTE(review): FastAPI merges headers from this injected `response`
            # only on the success path; the HTTPException exits below leave
            # without the freshly issued cookie.
            _set_identity_cookie(response, request, new_cookie)
        # The per-IP budget is checked first; the per-client budget is only
        # spent while the IP is still within its limit.
        ip_ok, ip_retry = ip_rate_limiter.hit(f"ip:{client_ip(request)}")
        allowed, retry_after = (
            rate_limiter.hit(cid) if ip_ok else (False, ip_retry)
        )
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail=RATE_LIMIT_REASON,
                headers={"Retry-After": str(int(retry_after) + 1)},
            )
        if fast_check is not None:
            reason = fast_check(payload.text)
            if reason:
                raise HTTPException(status_code=400, detail=str(reason))
        try:
            wish_id, position = await queue.submit(payload.text, cid)
        except HTTPException:
            raise
        except Exception as exc:  # queue full / per-client pending cap (engine policy)
            log.info("wish refused by queue: %s", exc)
            # engine.queue_worker raises QueueError subclasses with a poetic .reason;
            # AlreadyPending is a client-pacing problem (429), the rest read as 503.
            # A missing, None, non-numeric or non-error status_code must not
            # crash this handler, so only 4xx/5xx values are honoured.
            try:
                status = int(getattr(exc, "status_code", 0) or 0)
            except (TypeError, ValueError):
                status = 0
            if not 400 <= status <= 599:
                status = 429 if type(exc).__name__ == "AlreadyPending" else 503
            detail = str(getattr(exc, "reason", "") or exc) or QUEUE_FULL_REASON
            raise HTTPException(status_code=status, detail=detail) from exc
        return WishAccepted(wish_id=wish_id, position=position)

    # ------------------------------------------------------------------ /api/stream
    @app.get("/api/stream")
    async def stream(
        limit: Optional[int] = Query(
            default=None,
            ge=1,
            le=10_000,
            description="debug/test aid: close the stream after N events",
        ),
    ) -> StreamingResponse:
        # Every subscriber first receives the current world header and queue state.
        initial = [
            {
                "type": "hello",
                "world_version": int(world.version),
                "epoch": int(world.epoch),
            },
            {"type": "queue", **_queue_info(queue).model_dump()},
        ]
        return StreamingResponse(
            hub.event_stream(initial_events=initial, limit=limit),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",  # required on Spaces or the proxy buffers
                "Connection": "keep-alive",
            },
        )

    # ------------------------------------------------------- /api/wishes (Genesis Log)
    @app.get("/api/wishes", response_model=list[WishSummary])
    async def wishes_index() -> list[WishSummary]:
        return [
            _wish_summary(trace, epoch)
            for epoch, trace in enumerate(persistence.traces(), start=1)
        ]

    @app.get("/api/wishes/{wish_id}")
    async def wish_trace(wish_id: str) -> JSONResponse:
        trace = persistence.find(wish_id)
        if trace is None:
            raise HTTPException(status_code=404, detail=NOT_FOUND_REASON)
        return JSONResponse(trace)

    # ------------------------------------------------------------------ static + book
    @app.get("/book", include_in_schema=False)
    async def book() -> FileResponse:
        page = WEB_DIR / "book.html"
        if not page.is_file():
            raise HTTPException(status_code=404, detail=NOT_FOUND_REASON)
        return FileResponse(page)

    # (No Blocks panel: in Server mode the whole app IS the gradio app — a
    # Blocks instance would race the runtime's auto-launch for port 7860.)
    def _mount_static() -> None:
        """Root static mount. A "/" catch-all registered BEFORE gr.Server.launch()
        shadows the /gradio_api routes gradio adds at launch time (verified June
        12: the launch self-check 404s and the app dies). So the mount is
        deferred: _serve() calls this AFTER launch; the plain-FastAPI/uvicorn
        path mounts immediately below. Not idempotent — call it once."""
        if WEB_DIR.is_dir():
            # gradio's launch registers its own SPA index at "/" (GET+HEAD) —
            # evict exactly those so the planet owns the root; every other
            # gradio route (/config, /gradio_api/*, assets) must survive for
            # @gradio/client connectivity.
            app.router.routes[:] = [
                r for r in app.router.routes if getattr(r, "path", None) != "/"
            ]
            app.mount("/", StaticFiles(directory=str(WEB_DIR), html=True), name="web")
        else:
            log.warning("web/ not found at %s; serving API only", WEB_DIR)

            # Registered after any existing "/" route, so it only answers when
            # nothing earlier claims the root.
            @app.get("/", include_in_schema=False)
            async def root_placeholder() -> JSONResponse:
                return JSONResponse({"ok": True, "hint": "frontend not built"})

    app.state.mount_static = _mount_static
    if mount_web and not hasattr(app, "launch"):
        _mount_static()
    return app
# --------------------------------------------------------------------------- entrypoint
def _serve(application: FastAPI) -> None:
    """Serve *application* on ``$PORT`` / ``$GRADIO_SERVER_PORT`` (default 7860).

    Blocks the calling thread. In gradio Server mode it launches without
    holding the thread, mounts the web root, then parks the main thread forever
    while gradio's own thread serves. The gradio-less fallback blocks inside
    ``uvicorn.run``.
    """
    # `or`-chaining (rather than nested .get defaults) lets a variable that is
    # set but empty fall through to the next source instead of crashing int("").
    port = int(os.environ.get("PORT") or os.environ.get("GRADIO_SERVER_PORT") or 7860)
    log.info("GODSEED listening on http://0.0.0.0:%d", port)
    if hasattr(application, "launch"):
        # Gradio Server mode — launch non-blocking so the web root can mount
        # AFTER gradio registers its /gradio_api routes (order = precedence).
        application.launch(
            server_name="0.0.0.0",
            server_port=port,
            show_error=False,
            prevent_thread_lock=True,
        )
        mount_static = getattr(application.state, "mount_static", None)
        if mount_static is not None:
            mount_static()
        import time as _time

        while True:  # keep the process alive; the server runs in gradio's thread
            _time.sleep(3600)
    else:  # pragma: no cover — gradio-less dev fallback
        import uvicorn

        uvicorn.run(application, host="0.0.0.0", port=port)
# HF Spaces (sdk:gradio) executes this file; locally it's `python app.py`.
# Test imports leave both conditions false and get no side effects.
# Caveat: any import of this module while SPACE_ID is set (e.g. a test run
# inside a Space) also configures logging, builds the app and blocks serving.
if __name__ == "__main__" or os.environ.get("SPACE_ID"):
    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s"
    )
    _serve(create_app())