Spaces:
Sleeping
Sleeping
disable flaky LLM judge layer (default-denied every clean wish on llama.cpp); wordlist layers 1-2 remain the gate
db1aa38 verified | """GODSEED — Space entrypoint. gr.Server (FastAPI) + SSE + the WebGL planet. | |
| Nine billion parameters. One shared world. | |
| Wiring (ARCHITECTURE.md is the contract): | |
| web/ (Three.js planet) <-- GET / static (mounted after launch) | |
| Genesis Log <-- GET /book static (web/book.html) | |
| world snapshot <-- GET /api/state | |
| make a wish <-- POST /api/wish (rate-limited, moderation fast-path) | |
| live liturgy <-- GET /api/stream SSE broadcast (X-Accel-Buffering: no) | |
| the rite's token <-- GET /api/turn (cookie-authenticated; owner only) | |
| grant the wish (GPU) <-- gradio @app.api "grant" (runs in the wisher's quota) | |
| Genesis Log index <-- GET /api/wishes | |
| single trace (replay) <-- GET /api/wishes/{id} | |
| Run: python app.py (gr.Server launch on $PORT, default 7860) | |
| Backends: GODSEED_BACKEND = mock | llamacpp | zerogpu (default mock; live = zerogpu) | |
| Sync: HF_TOKEN + GODSEED_DATASET enable trace persistence to an HF dataset. | |
| The deterministic engine owns all world state; the mind only interprets and narrates; | |
| this file only moves bytes. Engine/mind constructor assumptions are concentrated in | |
| ``_default_wiring`` below — the single integration point. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import inspect | |
| import logging | |
| import os | |
# ZeroGPU runtime: the `spaces` lib must be imported before torch anywhere in
# the process (it patches torch to virtualize the GPU). No-op everywhere else.
if os.environ.get("GODSEED_BACKEND", "").strip().lower() == "zerogpu":
    try:
        import spaces  # noqa: F401
    except ImportError:
        # Tolerated so the module still imports where `spaces` is not installed.
        pass
    # No GPU exists at startup, so torch never loads its bundled CUDA runtime —
    # but mamba_ssm's compiled kernels (imported at module level by NVIDIA's
    # NemotronH remote code) link libcudart.so.12. Preload it so imports resolve.
    # NOTE(review): the glob only matches a system-wide /usr/local install; a
    # virtualenv layout would find nothing and silently skip the preload.
    try:
        import ctypes
        import glob
        for _lib in glob.glob(
            "/usr/local/lib/python3*/site-packages/nvidia/cuda_runtime/lib/libcudart.so.12*"
        ):
            try:
                # RTLD_GLOBAL makes the library's symbols available to shared
                # objects loaded later; the first candidate that loads wins.
                ctypes.CDLL(_lib, mode=ctypes.RTLD_GLOBAL)
                break
            except OSError:
                continue
    except Exception:  # noqa: BLE001 — best-effort; worst case the slow path runs
        pass
| from contextlib import asynccontextmanager | |
| from pathlib import Path | |
| from typing import Any, Callable, Optional, Protocol | |
| from fastapi import FastAPI, HTTPException, Query, Request, Response | |
| from fastapi.responses import FileResponse, JSONResponse, StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from server.persistence import PersistenceService | |
| from server.ratelimit import ( | |
| COOKIE_NAME, | |
| DEFAULT_IP_LIMIT, | |
| ClientIdentity, | |
| RateLimiter, | |
| client_ip, | |
| ) | |
| from server.schemas import ( | |
| QueueInfo, | |
| StateResponse, | |
| WishAccepted, | |
| WishRequest, | |
| WishSummary, | |
| WorldState, | |
| ) | |
| from server.sse import SSEHub | |
log = logging.getLogger("godseed")

# Filesystem anchors, all resolved relative to the directory holding this file.
ROOT = Path(__file__).resolve().parent
WEB_DIR = ROOT.joinpath("web")
TRACES_DIR = ROOT.joinpath("traces")

# Accepted GODSEED_BACKEND values; _default_wiring falls back to "mock" otherwise.
VALID_BACKENDS = ("mock", "llamacpp", "zerogpu")

# Poetry for the unhappy paths — the god never returns a stack trace.
RATE_LIMIT_REASON = (
    "The god hears only three wishes an hour from one voice. "
    "Walk the world a while; return when the shadows have moved."
)
QUEUE_FULL_REASON = "The god is overwhelmed; return at dusk."
NOT_FOUND_REASON = "No such wish has been granted."
# --------------------------------------------------------------------------- protocols
class WorldLike(Protocol):
    """What the server reads from the engine's world (engine owns all writes)."""

    # version and epoch are int()-cast into /api/state and the SSE hello event.
    version: int
    epoch: int
    # NOTE(review): typed as plain dicts, but _default_wiring's persist() reads
    # `.wish_id` and calls `.to_dict()` on the real world's features — confirm
    # which shape the protocol should promise.
    features: list[dict[str, Any]]


class QueueLike(Protocol):
    """What the server needs from engine.queue_worker.

    create_app additionally probes, via hasattr, for optional members:
    start/stop/run (worker lifecycle), is_running (lazy start), invoke (the
    gradio "grant" endpoint) and turn_token (/api/turn).
    """

    # Returns (wish_id, queue position); raises on refusal — post_wish maps an
    # exception's .status_code / .reason onto the HTTP error it sends.
    async def submit(self, text: str, client_id: str) -> tuple[str, int]: ...
    def snapshot(self) -> dict[str, Any]: ...  # {"length": int, "current": {...}|None}


FastCheck = Callable[[str], Optional[str]]  # returns a poetic rejection reason or None
# ----------------------------------------------------------------- default (real) wiring
def _max_trace_wish_index(traces: Any) -> int:
    """Return the highest N among trace wish_ids of the exact form ``w_`` + six
    digits, or 0 when no trace carries such an id."""
    import re

    pattern = re.compile(r"w_(\d{6})")
    highest = 0
    for trace in traces:
        # fullmatch: unlike `$`, it does not also accept a trailing newline.
        match = pattern.fullmatch(str(trace.get("wish_id") or ""))
        if match:
            highest = max(highest, int(match.group(1)))
    return highest


def _trace_as_dict(trace: Any) -> dict[str, Any]:
    """Return a dict view of a worker trace.

    Dicts are returned as-is (same object). Other objects are shallow-copied from
    their ``__dict__``; slotted dataclasses, which have no ``__dict__``, are copied
    field by field. Anything else yields an empty dict.
    """
    import dataclasses

    if isinstance(trace, dict):
        return trace
    attrs = getattr(trace, "__dict__", None)
    if attrs:
        return dict(attrs)
    if dataclasses.is_dataclass(trace) and not isinstance(trace, type):
        return {f.name: getattr(trace, f.name) for f in dataclasses.fields(trace)}
    return {}


def _default_wiring(traces_dir: Path) -> dict[str, Any]:
    """Build the real engine + mind stack. ALL cross-component wiring lives here so
    integration fixes touch exactly one function.

    Reads GODSEED_BACKEND, GODSEED_JUDGE and GODSEED_INVOKE from the environment.
    Returns a dict with keys "world", "queue", "hub", "persistence" and
    "fast_check", which create_app unpacks.
    """
    backend_name = os.environ.get("GODSEED_BACKEND", "mock").strip().lower() or "mock"
    if backend_name not in VALID_BACKENDS:
        log.warning("unknown GODSEED_BACKEND=%r; falling back to mock", backend_name)
        backend_name = "mock"
    from engine.genesis import genesis_features
    from engine.moderation import Moderator
    from engine.queue_worker import QueueWorker
    from engine.world import World
    from mind.backends import make_backend
    from mind.moderation_judge import judge as judge_fn
    from mind.planner import Planner

    # Boot: restore wishes.jsonl from the dataset if configured, rebuild the ordered
    # feature list (genesis + every trace's features), load the world verbatim.
    persistence = PersistenceService(traces_dir)
    features = persistence.boot(genesis_features())
    # Read once: nothing records new traces until the queue worker starts.
    boot_traces = persistence.traces()
    epitaphs = [t.get("epitaph") for t in boot_traces if t.get("epitaph")]
    world = World.load(features, epitaphs)

    # The zerogpu backend eager-loads the 9B at construction (ZeroGPU needs the
    # weights resident before the first request). If that fails — OOM, a model
    # download hiccup — DON'T crash-loop the whole Space: the planet, the Genesis
    # Log and every zero-GPU view must still serve. Fall back to mock so visitors
    # see the world; granting degrades to scripted until the Space is healthy.
    try:
        backend = make_backend(backend_name)
    except Exception:
        log.exception("backend %r failed to load; serving with mock granting", backend_name)
        backend = make_backend("mock")
        backend_name = "mock"
    planner = Planner(backend)

    # Moderation layers 1-2 (charset/length + a thorough wordlist with
    # confusable/leetspeak/skeleton folding) are the reliable gate and run on
    # EVERY wish. Layer 3 (an LLM yes/no judge through the same model) is opt-in:
    # on llama.cpp its grammar-constrained verdict often fails to parse and then
    # DEFAULT-DENIES every clean wish (June 12: "found a town" rejected as
    # 'uncertain'). Off by default; GODSEED_JUDGE=on re-enables it.
    use_judge = os.environ.get("GODSEED_JUDGE", "").strip().lower() in ("1", "on", "true")
    judge = (lambda text: judge_fn(text, backend)) if use_judge else None
    moderator = Moderator(judge=judge)

    def fast_check(text: str) -> Optional[str]:
        """Synchronous pre-queue precheck: None when allowed, else a poetic reason."""
        verdict = moderator.precheck(text)
        if verdict.allowed:
            return None
        return verdict.poetic_reason or "The god declines this wish."

    hub = SSEHub()

    async def persist(trace: Any) -> None:
        """Worker hands over the WishTrace; embed the wish's engine-built features
        (filtered from the world by wish_id) so boot rebuild is exact."""
        record = _trace_as_dict(trace)
        wish_id = record.get("wish_id")
        if wish_id is None:
            # Filtering by a None id could match every feature that carries no
            # wish_id (e.g. genesis features, if they leave it unset) and attach
            # them to this record; attach none instead.
            log.warning("persisting a trace without wish_id; no features attached")
            wish_features: list[dict[str, Any]] = []
        else:
            wish_features = [
                f.to_dict() for f in world.features if f.wish_id == wish_id
            ]
        record.setdefault("feature_ids", [f["id"] for f in wish_features])
        await persistence.record(record, wish_features)

    # ZeroGPU: GPU work must run inside the wisher's gradio request (their HF
    # auth headers carry the quota), so the worker hands the rite to the browser
    # via your_turn/invoke. Override with GODSEED_INVOKE=server|browser.
    invoke_mode = os.environ.get("GODSEED_INVOKE", "").strip().lower()
    browser_invoked = (
        invoke_mode == "browser"
        if invoke_mode in ("browser", "server")
        else backend_name == "zerogpu"
    )

    # Wish numbering must survive zero-feature wishes across restarts: the
    # worker derives its counter from WORLD features, but a wish whose calls
    # all failed leaves no feature — after a restart the same id was reissued
    # (June 12: two w_000004 in the log). Traces are the complete record.
    trace_max = _max_trace_wish_index(boot_traces)
    queue = QueueWorker(
        world=world,
        moderator=moderator,
        planner=planner,
        emit=hub.emit,
        persist=persist,
        browser_invoked=browser_invoked,
        next_wish_index=max(QueueWorker._derive_wish_counter(world), trace_max + 1),
    )
    log.info(
        "godseed wired: backend=%s version=%d features=%d traces=%d dataset=%s",
        backend_name,
        world.version,
        len(world.features),
        len(boot_traces),
        persistence.sync.dataset if persistence.sync.enabled else "off",
    )
    return {
        "world": world,
        "queue": queue,
        "hub": hub,
        "persistence": persistence,
        "fast_check": fast_check,
    }
| # --------------------------------------------------------------------------- helpers | |
| async def _maybe_await(result: Any) -> Any: | |
| if inspect.isawaitable(result): | |
| return await result | |
| return result | |
def _set_identity_cookie(response: Response, request: Request, value: str) -> None:
    """Attach the signed identity cookie *value* to *response*.

    Spaces serve over https inside an iframe (cross-site -> SameSite=None+Secure);
    local dev is plain http (Lax, not Secure). The scheme is taken from the
    proxy's X-Forwarded-Proto header when present, else from the request URL.
    """
    forwarded = request.headers.get("x-forwarded-proto")
    if forwarded:
        # A proxy chain can send a comma-separated list ("https, http"); the first
        # entry is the scheme the browser used. Comparing the raw header against
        # "https" would mark such requests insecure and the cross-site iframe
        # would then drop the resulting SameSite=Lax cookie.
        scheme = forwarded.split(",", 1)[0].strip().lower()
    else:
        scheme = request.url.scheme
    secure = scheme == "https"
    response.set_cookie(
        COOKIE_NAME,
        value,
        max_age=365 * 24 * 3600,
        path="/",
        httponly=True,
        secure=secure,
        samesite="none" if secure else "lax",
    )
def _world_state(world: WorldLike) -> WorldState:
    """Project the engine's world into the WorldState API model.

    engine.world.World offers state_dict() (features as plain dicts; the model
    ignores its extra derived keys). Simpler worlds without it are read through
    the three protocol attributes instead.
    """
    if not hasattr(world, "state_dict"):
        return WorldState(
            version=int(world.version),
            epoch=int(world.epoch),
            features=list(world.features),
        )
    return WorldState.model_validate(world.state_dict())
def _queue_info(queue: QueueLike) -> QueueInfo:
    """Summarize the queue as a QueueInfo model.

    A snapshot() that raises is logged and treated as an empty queue, and a
    falsy snapshot counts as empty too.
    """
    try:
        raw = queue.snapshot() or {}
    except Exception:  # noqa: BLE001 — a queue hiccup must not kill /api/state
        log.exception("queue snapshot failed")
        raw = {}
    fields = {"length": raw.get("length", 0), "current": raw.get("current")}
    return QueueInfo.model_validate(fields)
def _wish_summary(trace: dict[str, Any], epoch: int) -> WishSummary:
    """Build the Genesis Log summary row for one persisted trace.

    *epoch* is supplied by the caller. Missing or null text fields degrade to ""
    (not the literal string "None"). A missing or unparseable ``submitted_at``
    degrades to 0.0 instead of raising, so one malformed trace cannot fail a
    whole listing.
    """
    try:
        ts = float(trace.get("submitted_at") or 0.0)
    except (TypeError, ValueError):
        ts = 0.0
    return WishSummary(
        wish_id=str(trace.get("wish_id") or ""),
        text=str(trace.get("text") or ""),
        epitaph=str(trace.get("epitaph") or ""),
        epoch=epoch,
        ts=ts,
    )
# --------------------------------------------------------------------------- the app
def create_app(
    *,
    world: Optional[WorldLike] = None,
    queue: Optional[QueueLike] = None,
    hub: Optional[SSEHub] = None,
    persistence: Optional[PersistenceService] = None,
    fast_check: Optional[FastCheck] = None,
    rate_limiter: Optional[RateLimiter] = None,
    ip_rate_limiter: Optional[RateLimiter] = None,
    identity: Optional[ClientIdentity] = None,
    traces_dir: Optional[Path | str] = None,
    mount_web: bool = True,
    mount_panel: bool = True,
) -> FastAPI:
    """Build the ASGI app. With no arguments this wires the real engine + mind from
    env config; tests inject fakes for everything engine/mind-shaped.

    Keyword args:
        world, queue, hub, persistence: engine pieces. When ``queue`` is omitted,
            all of them plus ``fast_check`` come from ``_default_wiring`` (injected
            values are ignored). When ``queue`` is injected, ``world``, ``hub`` and
            ``persistence`` must be injected too, or ValueError is raised.
        fast_check: optional synchronous pre-queue moderation returning a
            rejection reason or None.
        rate_limiter, ip_rate_limiter, identity: default-constructed when omitted.
        traces_dir: trace directory for the default wiring (defaults to TRACES_DIR).
        mount_web: mount web/ at "/" right away on a plain FastAPI app. On
            gr.Server the mount is deferred to ``app.state.mount_static`` so it can
            run after launch.
        mount_panel: accepted for compatibility; unused (there is no Blocks panel).
    """
    traces_path = Path(traces_dir) if traces_dir is not None else TRACES_DIR
    if queue is None:
        wired = _default_wiring(traces_path)
        world = wired["world"]
        queue = wired["queue"]
        # The default queue emits into the hub _default_wiring built; an injected hub
        # is only honored together with an injected queue.
        hub = wired["hub"]
        persistence = wired["persistence"]
        fast_check = wired["fast_check"]
    if world is None or hub is None or persistence is None:
        raise ValueError(
            "create_app: when injecting `queue`, also inject `world`, `hub`, and "
            "`persistence`"
        )
    rate_limiter = rate_limiter or RateLimiter()
    ip_rate_limiter = ip_rate_limiter or RateLimiter(limit=DEFAULT_IP_LIMIT)
    identity = identity or ClientIdentity()

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Start the queue worker on startup; stop it and drain persistence on shutdown."""
        queue_task: Optional[asyncio.Task[Any]] = None
        if hasattr(queue, "start"):
            await _maybe_await(queue.start())
        elif hasattr(queue, "run"):
            queue_task = asyncio.create_task(queue.run())
        try:
            yield
        finally:
            # Shutdown must run even when the server exits through an error;
            # otherwise the worker is never stopped and drain() never runs.
            try:
                if hasattr(queue, "stop"):
                    await _maybe_await(queue.stop())
                elif queue_task is not None:
                    queue_task.cancel()
                    await asyncio.gather(queue_task, return_exceptions=True)
            finally:
                await persistence.drain()

    # Gradio Server mode (the "her" idiom): gr.Server IS a FastAPI app with
    # gradio's API engine attached — the sdk:gradio runtime serves it natively,
    # and @app.api endpoints are gradio endpoints the browser calls via
    # @gradio/client, which forwards the HF auth headers ZeroGPU quota needs.
    try:
        import gradio as gr

        app: FastAPI = gr.Server(title="GODSEED")
        app.router.lifespan_context = lifespan
    except Exception:  # pragma: no cover — gradio always present in prod
        app = FastAPI(title="GODSEED", docs_url=None, redoc_url=None, lifespan=lifespan)
    app.state.world = world
    app.state.queue = queue
    app.state.hub = hub
    app.state.persistence = persistence

    async def ensure_worker() -> None:
        """Start the queue worker on the SERVING event loop, lazily. gradio's
        launch() runs uvicorn without honoring an injected lifespan (verified
        June 12: the worker never started under Server mode), so the first
        request wakes the god instead. Idempotent and loop-correct everywhere."""
        if (
            hasattr(queue, "start")
            and hasattr(queue, "is_running")
            and not queue.is_running
        ):
            await _maybe_await(queue.start())

    # GPU rite endpoint: when the queue announces your_turn over SSE, the
    # wisher's browser calls this named gradio endpoint with the one-time token.
    if hasattr(app, "api") and hasattr(queue, "invoke"):

        # NOTE(review): confirm gr.Server.api's keyword for the endpoint name.
        @app.api(name="grant")
        async def grant(wish_id: str = "", token: str = "") -> dict:
            """Run the rite for *wish_id*; failures come back as {"ok": False, ...}."""
            try:
                await ensure_worker()
                return await queue.invoke(wish_id, token)
            except Exception as exc:  # noqa: BLE001 — the browser gets poetry, not a trace
                reason = getattr(exc, "reason", None)
                if reason:
                    # Engine refusals carry a poetic .reason: expected, no traceback.
                    log.info("grant %r refused: %s", wish_id, reason)
                else:
                    log.exception("grant %r failed", wish_id)
                reason = reason or "the rite failed; the god moved on"
                return {"ok": False, "wish_id": wish_id, "reason": str(reason)}

    # ------------------------------------------------------------------- /api/turn
    # The owner's browser fetches its secret invocation token here. The token is
    # identity-bound (signed cookie) and never broadcast over SSE, so a stranger
    # tailing /api/stream cannot steal another visitor's rite (verify-fleet
    # critical, June 12). A plain FastAPI route DOES see the cookie (unlike the
    # gradio @app.api endpoint), which is why the auth check lives here.
    if hasattr(queue, "turn_token"):

        @app.get("/api/turn")
        async def turn(wish_id: str, request: Request) -> JSONResponse:
            await ensure_worker()
            cid, new_cookie = identity.resolve(request)
            tok = queue.turn_token(wish_id, cid)
            if tok is None:
                reply = JSONResponse({"error": "not your turn"}, status_code=403)
            else:
                reply = JSONResponse({"wish_id": wish_id, "token": tok})
            # FastAPI merges cookies from an injected Response parameter only when
            # the handler returns plain data; a Response returned directly is sent
            # as-is, so the cookie must go on the object actually returned.
            if new_cookie:
                _set_identity_cookie(reply, request, new_cookie)
            return reply

    # ------------------------------------------------------------------- /api/state
    @app.get("/api/state")
    async def get_state() -> StateResponse:
        await ensure_worker()
        traces = list(persistence.traces())
        # Only the ten newest are shown (newest first): summarize just those,
        # keeping each trace's 1-based position in the full log as its epoch.
        first = max(len(traces) - 10, 0)
        recent = [
            _wish_summary(trace, epoch)
            for epoch, trace in enumerate(traces[first:], start=first + 1)
        ][::-1]
        return StateResponse(
            world=_world_state(world),
            queue=_queue_info(queue),
            wishes_recent=recent,
        )

    # -------------------------------------------------------------------- /api/wish
    @app.post("/api/wish")
    async def post_wish(
        payload: WishRequest, request: Request, response: Response
    ) -> WishAccepted:
        await ensure_worker()
        cid, new_cookie = identity.resolve(request)
        if new_cookie:
            _set_identity_cookie(response, request, new_cookie)
        # The per-IP budget is checked first; the per-client budget is only
        # charged when the IP is still within its limit.
        ip_ok, ip_retry = ip_rate_limiter.hit(f"ip:{client_ip(request)}")
        allowed, retry_after = (
            rate_limiter.hit(cid) if ip_ok else (False, ip_retry)
        )
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail=RATE_LIMIT_REASON,
                headers={"Retry-After": str(int(retry_after) + 1)},
            )
        if fast_check is not None:
            reason = fast_check(payload.text)
            if reason:
                raise HTTPException(status_code=400, detail=str(reason))
        try:
            wish_id, position = await queue.submit(payload.text, cid)
        except HTTPException:
            raise
        except Exception as exc:  # queue full / per-client pending cap (engine policy)
            log.info("wish refused by queue: %s", exc)
            # engine.queue_worker raises QueueError subclasses with a poetic .reason;
            # AlreadyPending is a client-pacing problem (429), the rest read as 503.
            status = int(getattr(exc, "status_code", 0)) or (
                429 if type(exc).__name__ == "AlreadyPending" else 503
            )
            detail = str(getattr(exc, "reason", "") or exc) or QUEUE_FULL_REASON
            raise HTTPException(status_code=status, detail=detail) from exc
        return WishAccepted(wish_id=wish_id, position=position)

    # ------------------------------------------------------------------ /api/stream
    @app.get("/api/stream")
    async def stream(
        limit: Optional[int] = Query(
            default=None,
            ge=1,
            le=10_000,
            description="debug/test aid: close the stream after N events",
        ),
    ) -> StreamingResponse:
        initial = [
            {
                "type": "hello",
                "world_version": int(world.version),
                "epoch": int(world.epoch),
            },
            {"type": "queue", **_queue_info(queue).model_dump()},
        ]
        return StreamingResponse(
            hub.event_stream(initial_events=initial, limit=limit),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",  # required on Spaces or the proxy buffers
                "Connection": "keep-alive",
            },
        )

    # ------------------------------------------------------- /api/wishes (Genesis Log)
    @app.get("/api/wishes")
    async def wishes_index() -> list[WishSummary]:
        return [
            _wish_summary(trace, epoch)
            for epoch, trace in enumerate(persistence.traces(), start=1)
        ]

    @app.get("/api/wishes/{wish_id}")
    async def wish_trace(wish_id: str) -> JSONResponse:
        trace = persistence.find(wish_id)
        if trace is None:
            raise HTTPException(status_code=404, detail=NOT_FOUND_REASON)
        return JSONResponse(trace)

    # ------------------------------------------------------------------ static + book
    @app.get("/book")
    async def book() -> FileResponse:
        page = WEB_DIR / "book.html"
        if not page.is_file():
            raise HTTPException(status_code=404, detail=NOT_FOUND_REASON)
        return FileResponse(page)

    # (No Blocks panel: in Server mode the whole app IS the gradio app — a
    # Blocks instance would race the runtime's auto-launch for port 7860.)
    def _mount_static() -> None:
        """Root static mount. A "/" catch-all registered BEFORE gr.Server.launch()
        shadows the /gradio_api routes gradio adds at launch time (verified June
        12: the launch self-check 404s and the app dies). So the mount is
        deferred: _serve() calls this AFTER launch; the plain-FastAPI/uvicorn
        path mounts immediately below."""
        if WEB_DIR.is_dir():
            # gradio's launch registers its own SPA index at "/" (GET+HEAD) —
            # evict exactly those so the planet owns the root; every other
            # gradio route (/config, /gradio_api/*, assets) must survive for
            # @gradio/client connectivity.
            app.router.routes[:] = [
                r for r in app.router.routes if getattr(r, "path", None) != "/"
            ]
            app.mount("/", StaticFiles(directory=str(WEB_DIR), html=True), name="web")
        else:
            log.warning("web/ not found at %s; serving API only", WEB_DIR)

            @app.get("/")
            async def root_placeholder() -> JSONResponse:
                return JSONResponse({"ok": True, "hint": "frontend not built"})

    app.state.mount_static = _mount_static
    if mount_web and not hasattr(app, "launch"):
        _mount_static()
    return app
# --------------------------------------------------------------------------- entrypoint
def _serve(application: FastAPI) -> None:
    """Serve *application* on $PORT (else $GRADIO_SERVER_PORT, else 7860).

    Under gr.Server the app is launched without blocking, the web root is mounted
    afterwards, and this function then sleeps forever to keep the process alive.
    A gradio-less app is handed to uvicorn instead.
    """
    fallback_port = os.environ.get("GRADIO_SERVER_PORT", 7860)
    port = int(os.environ.get("PORT", fallback_port))
    log.info("GODSEED listening on http://0.0.0.0:%d", port)
    if not hasattr(application, "launch"):  # pragma: no cover — gradio-less dev fallback
        import uvicorn

        uvicorn.run(application, host="0.0.0.0", port=port)
        return
    # Gradio Server mode — launch non-blocking so the web root can mount
    # AFTER gradio registers its /gradio_api routes (order = precedence).
    application.launch(
        server_name="0.0.0.0",
        server_port=port,
        show_error=False,
        prevent_thread_lock=True,
    )
    deferred_mount = getattr(application.state, "mount_static", None)
    if deferred_mount is not None:
        deferred_mount()
    import time

    # The server runs in gradio's thread; this thread only keeps the process up.
    while True:
        time.sleep(3600)
# HF Spaces (sdk:gradio) executes this file; locally it's `python app.py`.
# Test imports leave both conditions false and get no side effects.
if os.environ.get("SPACE_ID") or __name__ == "__main__":
    logging.basicConfig(
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        level=logging.INFO,
    )
    _serve(create_app())