Spaces:
Running on Zero
Running on Zero
| """SCRYPT on the web — the finetuned Warden on ZeroGPU. | |
| Structured to match how working ZeroGPU spaces with a custom frontend actually | |
| do it (e.g. the org's own NPCverse): a `gradio.Server` (which IS a FastAPI app) | |
| hosts our custom HTML/websocket routes AND exposes GPU inference through | |
| `@app.api(...)`, and the whole thing is started with gradio's own | |
| `app.launch(...)`. That launch is what installs ZeroGPU's hooks + queue — the | |
| piece my earlier `engine.launch(prevent_thread_lock=True)` + manual route | |
| surgery skipped, which is why every GPU call segfaulted in CUDA init. | |
| The model is bf16, placed on cuda at module level with `.to('cuda')` (NO | |
| device_map="cuda", NO bitsandbytes — both fight ZeroGPU). The @spaces.GPU | |
| function is only ever entered through Gradio (via the @app.api handler, reached | |
| from the loopback /v1 shim with gradio_client), never a bare threadpool call. | |
    GET   /                       CRT landing page
    GET   /api/status             is the Warden loaded?
    GET   /api/probe              ask the live Warden one line
    GET   /api/whisper            scripted teaser
    GET   /play                   xterm.js terminal
    WS    /pty                    per-visitor game subprocess
    POST  /v1/chat/completions    loopback OpenAI shim for the game's `api` backend
    api   warden_generate         the @spaces.GPU endpoint, in Gradio's context
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import random | |
| import secrets | |
| import tempfile | |
| from pathlib import Path | |
| # ZeroGPU contract: import spaces before torch. | |
| try: | |
| import spaces | |
| except ImportError: | |
| spaces = None | |
| from fastapi import Request, WebSocket, WebSocketDisconnect | |
| from fastapi.responses import FileResponse, JSONResponse, StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from starlette.concurrency import run_in_threadpool | |
| from gradio import Server | |
# Filesystem anchors: REPO_ROOT is the parent of the directory holding this
# file (fully resolved); STATIC is the "static" directory next to this file.
REPO_ROOT = Path(__file__).resolve().parent.parent
STATIC = Path(__file__).parent / "static"
# Model repo id; the WARDEN_MODEL environment variable overrides the default.
WARDEN_REPO = os.environ.get("WARDEN_MODEL", "IMJONEZZ/warden-nemotron-3-nano-30b")
# Bearer token guarding the loopback /v1 shim. A fresh random token is minted
# for this process when SCRYPT_INTERNAL_KEY is unset or empty.
INTERNAL_KEY = os.environ.get("SCRYPT_INTERNAL_KEY") or secrets.token_hex(16)

# Tokenizer and model handles; both stay None unless the load below succeeds.
tok = None
model = None
# A non-empty WARDEN_ERR means the Warden is offline; the text says why.
WARDEN_ERR = "spaces package not present (not on a ZeroGPU Space)"
if spaces is not None:
    try:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer

        # NO trust_remote_code: use transformers' NATIVE NemotronH, which falls
        # back to pure-PyTorch Mamba ops when mamba_ssm isn't installed. The
        # NVIDIA remote modeling code instead hard-requires mamba_ssm's Triton
        # CUDA kernels, which segfault under ZeroGPU. This is how working
        # Nemotron ZeroGPU spaces do it.
        tok = AutoTokenizer.from_pretrained(WARDEN_REPO)
        model = AutoModelForCausalLM.from_pretrained(
            WARDEN_REPO,
            torch_dtype=torch.bfloat16,
            low_cpu_mem_usage=True,
        )
        model.to("cuda")  # intercepted by ZeroGPU emulation; migrated per call
        model.eval()
        # Only reached when every step above succeeded.
        WARDEN_ERR = ""
    except Exception as err:
        # Any load failure is recorded instead of crashing the import, so the
        # module still comes up with the Warden marked offline.
        import traceback

        traceback.print_exc()
        WARDEN_ERR = f"{type(err).__name__}: {err}"
# Single readiness flag derived from the error text.
WARDEN_READY = not WARDEN_ERR
def _generate_impl(messages, max_tokens, temperature, enable_thinking):
    """Generate one reply from the Warden and return only the new text.

    Args:
        messages: chat messages handed to the tokenizer's chat template.
        max_tokens: upper bound on newly generated tokens.
        temperature: sampling temperature; values <= 0 switch sampling off.
        enable_thinking: forwarded to the chat template unchanged.

    Returns:
        The decoded completion with the prompt tokens and special tokens
        stripped.
    """
    import torch

    # transformers 5: apply_chat_template yields a BatchEncoding (a mapping),
    # not a bare tensor, so the whole mapping is forwarded to generate() as
    # keyword arguments after each entry is moved to the GPU.
    encoded = tok.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_tensors="pt",
        return_dict=True,
        enable_thinking=enable_thinking,
    )
    gpu_inputs = {name: tensor.to("cuda") for name, tensor in encoded.items()}
    prompt_len = gpu_inputs["input_ids"].shape[1]

    sampling = {
        "max_new_tokens": max_tokens,
        "do_sample": temperature > 0,
        # Clamped so a zero/negative temperature never reaches generate().
        "temperature": max(temperature, 1e-3),
        "top_p": 0.95,
    }
    with torch.no_grad():
        generated = model.generate(**gpu_inputs, **sampling)

    # Row 0 holds prompt + completion; drop the prompt prefix before decoding.
    completion_ids = generated[0, prompt_len:]
    return tok.decode(completion_ids, skip_special_tokens=True)
# bf16 30B (~60GB) needs the 96GB xlarge slice; duration covers first-call
# migration. ONLY entered through Gradio (the @app.api handler below).
# Without the `spaces` package the plain implementation is used directly.
warden_gpu = (
    spaces.GPU(size="xlarge", duration=120)(_generate_impl)
    if spaces is not None
    else _generate_impl
)
# ----------------------------------------------------------------- the app
# Canned Warden one-liners; whisper() returns one of these at random, so no
# model call is involved.
WHISPERS = [
    "Another process wakes in my machine. Show me what you are.",
    "You are a small thing in a large filesystem. I am the filesystem.",
    "Sit. The board is set. Your move is already a mistake.",
    "I keep files on everyone who has died here. There is always room for more.",
    "The scale does not lie. It is the only thing in here that doesn't.",
    "Sell me a command. Keep a crown. Everyone chooses the crown.",
    "I have read your crash dumps. They read like apologies.",
    "Trespasser. The door was open because nothing has ever made it out.",
]

# The single application object: static files are mounted on it and it is
# started with app.launch() in the __main__ guard.
app = Server(title="SCRYPT")
def warden_generate(payload_json: str) -> str:
    """Decode a JSON request and run it through the GPU-wrapped generator.

    This is the @spaces.GPU entry point, meant to run in Gradio's hooked
    context; the /v1 shim reaches it over localhost via gradio_client.

    Args:
        payload_json: JSON object with the keys "messages", "max_tokens",
            "temperature" and "thinking" (all required).

    Returns:
        The generated text.
    """
    request = json.loads(payload_json)
    messages, max_tokens, temperature, thinking = (
        request[key]
        for key in ("messages", "max_tokens", "temperature", "thinking")
    )
    return warden_gpu(messages, max_tokens, temperature, thinking)
# The loopback OpenAI shim hits warden_generate through Gradio, so the GPU call
# executes in Gradio's context (our own thread only does localhost HTTP).
# The client is created lazily on first use and then reused.
_gradio_client = None


def _gradio_generate(messages, max_tokens, temperature, thinking) -> str:
    """Call the /warden_generate Gradio endpoint on this same server.

    The four arguments are packed into one JSON string, which is the only
    argument the endpoint takes.

    Returns:
        Whatever the endpoint's predict call returns (the generated text).
    """
    global _gradio_client
    client = _gradio_client
    if client is None:
        # Imported here so gradio_client is only needed once a call is made.
        from gradio_client import Client

        client = _gradio_client = Client("http://127.0.0.1:7860", verbose=False)

    request = {
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "thinking": thinking,
    }
    return client.predict(json.dumps(request), api_name="/warden_generate")
def status() -> dict:
    """Report whether the Warden model loaded (GET /api/status per the module docstring).

    Returns:
        A dict with "warden_ready" (bool), "warden_state" ("ready" or the
        recorded load error) and "model" (the model repo id).
    """
    if WARDEN_READY:
        state = "ready"
    else:
        state = WARDEN_ERR
    return {"warden_ready": WARDEN_READY, "warden_state": state, "model": WARDEN_REPO}
async def probe(q: str = "A new process woke up in your machine. Greet it in one short line, in voice.") -> dict:
    """Ask the live Warden for one short line (GET /api/probe per the module docstring).

    Args:
        q: the user prompt sent to the model after a fixed system prompt.

    Returns:
        {"ok": False, "state": ...} when the model is not loaded,
        {"ok": True, "line": ..., "seconds": ...} on success, or
        {"ok": False, "error": ...} when generation fails. Never raises for
        generation errors.
    """
    # NOTE(review): `q` is caller-supplied and this handler performs no auth
    # check, so each call can trigger a GPU generation — confirm that is intended.
    import time

    if not WARDEN_READY:
        return {"ok": False, "state": WARDEN_ERR}
    msgs = [
        {"role": "system", "content": "You are the Warden, the malevolent operating system of SCRYPTOS. Terse, menacing, Unix-flavored."},
        {"role": "user", "content": q},
    ]
    # perf_counter is monotonic: unlike time.time() it cannot jump backwards or
    # forwards when the system clock is adjusted mid-request.
    started = time.perf_counter()
    try:
        # The blocking gradio_client call runs in a worker thread so the event
        # loop stays responsive.
        line = await run_in_threadpool(_gradio_generate, msgs, 60, 0.6, False)
        elapsed = round(time.perf_counter() - started, 1)
        return {"ok": True, "line": line.strip(), "seconds": elapsed}
    except Exception as err:
        # Best-effort diagnostic endpoint: report the failure instead of a 500.
        return {"ok": False, "error": f"{type(err).__name__}: {err}"}
async def chat_completions(request: Request):
    """Loopback OpenAI-style chat endpoint for the game's `api` backend.

    Requires `Authorization: Bearer <INTERNAL_KEY>`. Reads "messages",
    "max_tokens" (default 256), "temperature" (default 0.6) and
    "chat_template_kwargs.enable_thinking" (default False) from the JSON body;
    a JSON null for any of them is treated as "use the default".

    Returns:
        401 JSON when the bearer token is wrong, 503 JSON when the model is not
        loaded or generation fails, 400 JSON when the body is not a usable JSON
        object, otherwise a server-sent-event stream carrying the whole reply in
        a single delta chunk followed by "[DONE]". The reply is always streamed
        as SSE; a "stream" field in the body is not consulted.
    """
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compared as bytes because compare_digest rejects
    # non-ASCII str, and header values are attacker-controlled.
    supplied = request.headers.get("authorization") or ""
    expected = f"Bearer {INTERNAL_KEY}"
    if not secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        return JSONResponse({"error": "unauthorized"}, status_code=401)
    if not WARDEN_READY:
        return JSONResponse({"error": f"warden offline: {WARDEN_ERR}"}, status_code=503)

    try:
        body = await request.json()
    except ValueError as err:  # JSONDecodeError and UnicodeDecodeError both derive from it
        return JSONResponse({"error": f"bad request: {type(err).__name__}: {err}"}, status_code=400)
    if not isinstance(body, dict):
        return JSONResponse({"error": "bad request: body must be a JSON object"}, status_code=400)

    def field(key, default):
        """Return body[key], falling back to default when missing or null."""
        value = body.get(key)
        return default if value is None else value

    try:
        messages = field("messages", [])
        max_tokens = int(field("max_tokens", 256))
        temperature = float(field("temperature", 0.6))
        thinking = bool(field("chat_template_kwargs", {}).get("enable_thinking", False))
    except (TypeError, ValueError, AttributeError) as err:
        return JSONResponse({"error": f"bad request: {type(err).__name__}: {err}"}, status_code=400)

    try:
        # Blocking gradio_client round-trip; kept off the event loop.
        text = await run_in_threadpool(
            _gradio_generate, messages, max_tokens, temperature, thinking
        )
    except Exception as err:
        import traceback

        traceback.print_exc()
        return JSONResponse({"error": f"{type(err).__name__}: {err}"}, status_code=503)

    def sse():
        # The full completion goes out as one delta, then the terminator.
        yield f"data: {json.dumps({'choices': [{'delta': {'content': text}}]})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(sse(), media_type="text/event-stream")
def whisper() -> dict:
    """Return one scripted teaser line chosen at random from WHISPERS."""
    teaser = random.choice(WHISPERS)
    return {"line": teaser}
def landing() -> FileResponse:
    """Serve the landing page, index.html from the static directory."""
    page = STATIC / "index.html"
    return FileResponse(page)
def play() -> FileResponse:
    """Serve the terminal page, play.html from the static directory."""
    page = STATIC / "play.html"
    return FileResponse(page)
# ----------------------------------------------------------- the PTY bridge
def game_env() -> dict:
    """Build the environment overrides for a game subprocess.

    Returns:
        Terminal and Python settings, plus backend selection: the `api`
        backend pointed at this server's loopback /v1 shim when the Warden is
        loaded, otherwise the `scripted` backend.
    """
    base = {
        "TERM": "xterm-256color",
        "COLORTERM": "truecolor",
        "PYTHONUNBUFFERED": "1",
        "PYTHONPATH": str(REPO_ROOT),
    }
    if not WARDEN_READY:
        return {**base, "SCRYPT_BACKEND": "scripted"}
    return {
        **base,
        "SCRYPT_BACKEND": "api",
        "SCRYPT_API_BASE": "http://127.0.0.1:7860/v1",
        "SCRYPT_API_KEY": INTERNAL_KEY,
        "SCRYPT_MODEL": "warden",
    }
| async def _pump_pty_to_ws(master_fd: int, ws: WebSocket) -> None: | |
| loop = asyncio.get_event_loop() | |
| try: | |
| while True: | |
| data = await loop.run_in_executor(None, os.read, master_fd, 65536) | |
| if not data: | |
| break | |
| await ws.send_bytes(data) | |
| except Exception: | |
| pass | |
async def pty_bridge(ws: WebSocket) -> None:
    """Run one game subprocess on a PTY and bridge it to a websocket.

    Protocol, as implemented here:
      * binary frames are written to the PTY verbatim;
      * a text frame that parses as JSON with a two-element "resize" entry
        ([cols, rows]) resizes the PTY;
      * any other text frame is written to the PTY as typed input.

    On exit the child is killed and reaped, the PTY descriptor is closed and
    the per-visitor home directory is deleted.
    """
    # Imported locally, as in the rest of this handler, so the module itself
    # does not import these at load time.
    import fcntl
    import pty
    import shutil
    import signal
    import struct
    import termios

    await ws.accept()
    home = tempfile.mkdtemp(prefix="scrypt-")
    # Built before forking so the child does nothing between fork and exec.
    child_env = {**os.environ, **game_env(), "SCRYPT_HOME": home}
    try:
        pid, master_fd = pty.fork()
    except OSError:
        # No child was created; do not leave the temp directory behind.
        shutil.rmtree(home, ignore_errors=True)
        raise
    if pid == 0:  # child: become the game
        try:
            os.execvpe("python", ["python", "-m", "scrypt.app"], child_env)
        finally:
            # execvpe only comes back by raising. Without this guard the
            # exception would unwind into a forked copy of the server.
            os._exit(127)

    reader = asyncio.create_task(_pump_pty_to_ws(master_fd, ws))
    try:
        while True:
            msg = await ws.receive()
            if msg["type"] == "websocket.disconnect":
                break
            if (text := msg.get("text")) is not None:
                try:
                    payload = json.loads(text)
                    cols, rows = payload["resize"]
                except (ValueError, KeyError, TypeError):
                    # Not a resize control message: treat it as keystrokes.
                    os.write(master_fd, text.encode())
                    continue
                try:
                    winsz = struct.pack("HHHH", rows, cols, 0, 0)
                except struct.error:
                    # Non-integer or out-of-range size: ignore the request
                    # rather than ending the visitor's session.
                    continue
                fcntl.ioctl(master_fd, termios.TIOCSWINSZ, winsz)
            elif (data := msg.get("bytes")) is not None:
                os.write(master_fd, data)
    except WebSocketDisconnect:
        pass
    except Exception:
        # Deliberate best-effort: any failure ends the session and falls
        # through to cleanup.
        pass
    finally:
        reader.cancel()
        try:
            os.kill(pid, signal.SIGKILL)
            os.waitpid(pid, 0)  # reap so no zombie is left behind
        except OSError:
            pass
        os.close(master_fd)
        # The directory is unique to this connection, so remove it once the
        # child is gone.
        shutil.rmtree(home, ignore_errors=True)
# Expose the files under STATIC at the /static URL prefix.
app.mount("/static", StaticFiles(directory=STATIC), name="static")

if __name__ == "__main__":
    # gradio's own launch — installs the ZeroGPU hooks + queue and serves our
    # custom routes. ssr_mode=False is load-bearing: gradio 6's SSR spins up a
    # Node proxy that does NOT forward our raw /pty websocket (custom GET routes
    # get through, the websocket doesn't). Disabling SSR keeps everything in the
    # one Python server so the PTY bridge works.
    # Port 7860 must stay in sync with the loopback URLs hard-coded in
    # _gradio_generate and game_env.
    app.launch(
        server_name="0.0.0.0", server_port=7860, show_error=True, ssr_mode=False
    )