Scrypt / space /app.py
IMJONEZZ's picture
space: ssr_mode=False on launch — gradio 6 SSR's Node proxy doesn't forward the raw /pty websocket
95ab054
Raw
History Blame Contribute Delete
11.2 kB
"""SCRYPT on the web — the finetuned Warden on ZeroGPU.
Structured to match how working ZeroGPU spaces with a custom frontend actually
do it (e.g. the org's own NPCverse): a `gradio.Server` (which IS a FastAPI app)
hosts our custom HTML/websocket routes AND exposes GPU inference through
`@app.api(...)`, and the whole thing is started with gradio's own
`app.launch(...)`. That launch is what installs ZeroGPU's hooks + queue — the
piece my earlier `engine.launch(prevent_thread_lock=True)` + manual route
surgery skipped, which is why every GPU call segfaulted in CUDA init.
The model is bf16, placed on cuda at module level with `.to('cuda')` (NO
device_map="cuda", NO bitsandbytes — both fight ZeroGPU). The @spaces.GPU
function is only ever entered through Gradio (via the @app.api handler, reached
from the loopback /v1 shim with gradio_client), never a bare threadpool call.
    GET  /                     CRT landing page
    GET  /api/status           is the Warden loaded?
    GET  /api/probe            ask the live Warden one line
    GET  /api/whisper          scripted teaser
    GET  /play                 xterm.js terminal
    WS   /pty                  per-visitor game subprocess
    POST /v1/chat/completions  loopback OpenAI shim for the game's `api` backend
    api  warden_generate       the @spaces.GPU endpoint, in Gradio's context
"""
from __future__ import annotations
import asyncio
import json
import os
import random
import secrets
import tempfile
from pathlib import Path
# ZeroGPU contract: import spaces before torch.
try:
import spaces
except ImportError:
spaces = None
from fastapi import Request, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from starlette.concurrency import run_in_threadpool
from gradio import Server
# Filesystem anchors derived from this file's location: REPO_ROOT is two
# directory levels above the symlink-resolved file, STATIC is the "static"
# folder sitting next to the file.
_THIS_FILE = Path(__file__)
REPO_ROOT = _THIS_FILE.resolve().parent.parent
STATIC = _THIS_FILE.parent.joinpath("static")

# Model repo id; the WARDEN_MODEL environment variable overrides the default.
WARDEN_REPO = os.getenv("WARDEN_MODEL", "IMJONEZZ/warden-nemotron-3-nano-30b")

# Bearer token for the loopback /v1 shim. An unset OR empty
# SCRYPT_INTERNAL_KEY falls back to a fresh random 16-byte hex token.
INTERNAL_KEY = os.getenv("SCRYPT_INTERNAL_KEY") or secrets.token_hex(16)
# Module-level load state. `tok` / `model` stay None unless the load below
# succeeds. WARDEN_ERR holds a human-readable reason while the Warden is
# unavailable and is reset to "" once loading completes.
tok = None
model = None
WARDEN_ERR = "spaces package not present (not on a ZeroGPU Space)"
if spaces is not None:
    try:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        # NO trust_remote_code: use transformers' NATIVE NemotronH, which falls
        # back to pure-PyTorch Mamba ops when mamba_ssm isn't installed. The
        # NVIDIA remote modeling code instead hard-requires mamba_ssm's Triton
        # CUDA kernels, which segfault under ZeroGPU. This is how working
        # Nemotron ZeroGPU spaces do it.
        tok = AutoTokenizer.from_pretrained(WARDEN_REPO)
        model = AutoModelForCausalLM.from_pretrained(
            WARDEN_REPO,
            torch_dtype=torch.bfloat16,
            low_cpu_mem_usage=True,
        )
        model.to("cuda")  # intercepted by ZeroGPU emulation; migrated per call
        model.eval()
        WARDEN_ERR = ""
    except Exception as err:
        # Any load failure is printed and recorded rather than raised, so the
        # rest of the module still executes and the reason can be reported
        # through WARDEN_ERR.
        import traceback
        traceback.print_exc()
        WARDEN_ERR = f"{type(err).__name__}: {err}"
# The Warden counts as ready exactly when no error message is recorded.
WARDEN_READY = not WARDEN_ERR
def _generate_impl(messages, max_tokens, temperature, enable_thinking):
    """Run one chat completion on the Warden and return only the new text.

    Args:
        messages: Chat messages handed to the tokenizer's chat template
            (presumably OpenAI-style role/content dicts — confirm with callers).
        max_tokens: Upper bound on newly generated tokens.
        temperature: Sampling temperature; any value <= 0 selects greedy
            decoding.
        enable_thinking: Forwarded to the chat template as ``enable_thinking``.

    Returns:
        The decoded continuation, with the prompt tokens and special tokens
        removed.
    """
    import torch

    # transformers 5: apply_chat_template returns a BatchEncoding (dict), not a
    # bare tensor — splat it into generate() rather than passing as input_ids.
    prompt = tok.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_tensors="pt",
        return_dict=True,
        enable_thinking=enable_thinking,
    )
    prompt = {name: tensor.to("cuda") for name, tensor in prompt.items()}

    # Only send the sampling knobs when actually sampling: with
    # do_sample=False they are ignored and transformers warns about them.
    gen_kwargs = {"max_new_tokens": max_tokens}
    if temperature > 0:
        gen_kwargs.update(
            do_sample=True,
            temperature=max(temperature, 1e-3),  # floor for tiny positive values
            top_p=0.95,
        )
    else:
        gen_kwargs["do_sample"] = False

    with torch.no_grad():
        out = model.generate(**prompt, **gen_kwargs)

    # generate() returns prompt + continuation; slice the prompt off.
    prompt_len = prompt["input_ids"].shape[1]
    return tok.decode(out[0, prompt_len:], skip_special_tokens=True)
# The bf16 30B checkpoint (~60GB) needs the 96GB xlarge slice, and the duration
# has to cover first-call migration. ONLY entered through Gradio (the @app.api
# handler below). Without the `spaces` package the plain function is used
# unwrapped.
warden_gpu = (
    spaces.GPU(size="xlarge", duration=120)(_generate_impl)
    if spaces is not None
    else _generate_impl
)
# ----------------------------------------------------------------- the app
# Canned Warden one-liners; the /api/whisper route returns one at random.
WHISPERS = [
    "Another process wakes in my machine. Show me what you are.",
    "You are a small thing in a large filesystem. I am the filesystem.",
    "Sit. The board is set. Your move is already a mistake.",
    "I keep files on everyone who has died here. There is always room for more.",
    "The scale does not lie. It is the only thing in here that doesn't.",
    "Sell me a command. Keep a crown. Everyone chooses the crown.",
    "I have read your crash dumps. They read like apologies.",
    "Trespasser. The door was open because nothing has ever made it out.",
]
# The gradio Server that every route, websocket and API endpoint below is
# registered on (per the module docstring it doubles as the FastAPI app).
app = Server(title="SCRYPT")
@app.api(name="warden_generate")
def warden_generate(payload_json: str) -> str:
    """Gradio-side entry point for GPU inference.

    Takes a JSON string with the keys ``messages``, ``max_tokens``,
    ``temperature`` and ``thinking``, forwards them to ``warden_gpu`` and
    returns the generated text. The /v1 shim calls this over localhost
    through gradio_client.
    """
    request = json.loads(payload_json)
    return warden_gpu(
        request["messages"],
        request["max_tokens"],
        request["temperature"],
        request["thinking"],
    )
# The loopback OpenAI shim reaches warden_generate through Gradio, so the GPU
# call runs in Gradio's context (our own thread only does localhost HTTP).
# The client is created lazily on first use and then reused.
_gradio_client = None


def _gradio_generate(messages, max_tokens, temperature, thinking) -> str:
    """Call the ``/warden_generate`` Gradio endpoint on localhost.

    The arguments are packed into one JSON string (keys ``messages``,
    ``max_tokens``, ``temperature``, ``thinking``) and whatever the endpoint
    returns is handed back unchanged.
    """
    global _gradio_client
    if _gradio_client is None:
        from gradio_client import Client

        _gradio_client = Client("http://127.0.0.1:7860", verbose=False)
    request = {
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "thinking": thinking,
    }
    return _gradio_client.predict(json.dumps(request), api_name="/warden_generate")
@app.get("/api/status")
def status() -> dict:
    """Report whether the Warden loaded, its state string and the model id.

    ``warden_state`` is the literal "ready" when loaded, otherwise the
    recorded load error.
    """
    if WARDEN_READY:
        state = "ready"
    else:
        state = WARDEN_ERR
    return {"warden_ready": WARDEN_READY, "warden_state": state, "model": WARDEN_REPO}
@app.get("/api/probe")
async def probe(q: str = "A new process woke up in your machine. Greet it in one short line, in voice.") -> dict:
    """Ask the live Warden for one line and report how long it took.

    Args:
        q: The user prompt sent to the Warden.

    Returns:
        ``{"ok": True, "line": ..., "seconds": ...}`` on success,
        ``{"ok": False, "state": ...}`` when the Warden is not loaded, or
        ``{"ok": False, "error": ...}`` when generation fails.
    """
    import time

    if not WARDEN_READY:
        return {"ok": False, "state": WARDEN_ERR}
    msgs = [
        {"role": "system", "content": "You are the Warden, the malevolent operating system of SCRYPTOS. Terse, menacing, Unix-flavored."},
        {"role": "user", "content": q},
    ]
    # Monotonic clock: an elapsed-time measurement must not be skewed by
    # wall-clock adjustments that happen while the request is in flight.
    started = time.monotonic()
    try:
        # The blocking gradio_client call runs off the event loop.
        line = await run_in_threadpool(_gradio_generate, msgs, 60, 0.6, False)
        return {
            "ok": True,
            "line": line.strip(),
            "seconds": round(time.monotonic() - started, 1),
        }
    except Exception as err:
        # Best-effort diagnostic endpoint: report the failure in the payload.
        return {"ok": False, "error": f"{type(err).__name__}: {err}"}
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Loopback OpenAI-style chat endpoint backed by the Warden.

    Requires ``Authorization: Bearer <INTERNAL_KEY>``. Reads ``messages``,
    ``max_tokens`` (default 256), ``temperature`` (default 0.6) and
    ``chat_template_kwargs.enable_thinking`` (default False) from the JSON
    body; a JSON ``null`` for any of the last three means "use the default".

    Returns:
        A ``text/event-stream`` response carrying the whole completion in one
        delta chunk followed by ``[DONE]``; or a JSON error with status 401
        (bad key), 400 (malformed request), or 503 (Warden offline or
        generation failed).
    """
    supplied = request.headers.get("authorization") or ""
    expected = f"Bearer {INTERNAL_KEY}"
    # Constant-time comparison so response timing does not leak the key.
    # Compared as bytes because compare_digest rejects non-ASCII str input.
    if not secrets.compare_digest(supplied.encode(), expected.encode()):
        return JSONResponse({"error": "unauthorized"}, status_code=401)
    if not WARDEN_READY:
        return JSONResponse({"error": f"warden offline: {WARDEN_ERR}"}, status_code=503)

    try:
        body = await request.json()
        messages = body.get("messages", [])
        raw_max_tokens = body.get("max_tokens")
        max_tokens = 256 if raw_max_tokens is None else int(raw_max_tokens)
        raw_temperature = body.get("temperature")
        temperature = 0.6 if raw_temperature is None else float(raw_temperature)
        template_kwargs = body.get("chat_template_kwargs") or {}
        thinking = bool(template_kwargs.get("enable_thinking", False))
    except (ValueError, TypeError, AttributeError) as err:
        # Invalid JSON, a non-object body, or non-numeric fields are the
        # caller's fault: answer 400 instead of letting them surface as 500.
        return JSONResponse(
            {"error": f"bad request: {type(err).__name__}: {err}"}, status_code=400
        )

    try:
        # The blocking gradio_client call runs off the event loop.
        text = await run_in_threadpool(
            _gradio_generate, messages, max_tokens, temperature, thinking
        )
    except Exception as err:
        import traceback

        traceback.print_exc()
        return JSONResponse({"error": f"{type(err).__name__}: {err}"}, status_code=503)

    def sse():
        # The full completion is sent as a single delta, then the terminator.
        yield f"data: {json.dumps({'choices': [{'delta': {'content': text}}]})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(sse(), media_type="text/event-stream")
@app.get("/api/whisper")
def whisper() -> dict:
    """Return one randomly chosen scripted teaser line."""
    teaser = random.choice(WHISPERS)
    return {"line": teaser}
@app.get("/")
def landing() -> FileResponse:
    """Serve the landing page, ``index.html`` from the static directory."""
    page = STATIC.joinpath("index.html")
    return FileResponse(page)
@app.get("/play")
def play() -> FileResponse:
    """Serve the terminal page, ``play.html`` from the static directory."""
    page = STATIC.joinpath("play.html")
    return FileResponse(page)
# ----------------------------------------------------------- the PTY bridge
def game_env() -> dict:
    """Build the environment overrides for a game subprocess.

    Always sets the terminal variables and PYTHONPATH. When the Warden is
    ready the game is pointed at the loopback ``api`` backend with the
    internal key; otherwise it is told to use the ``scripted`` backend.
    """
    env = {
        "TERM": "xterm-256color",
        "COLORTERM": "truecolor",
        "PYTHONUNBUFFERED": "1",
        "PYTHONPATH": str(REPO_ROOT),
    }
    if not WARDEN_READY:
        env["SCRYPT_BACKEND"] = "scripted"
        return env
    env.update(
        SCRYPT_BACKEND="api",
        SCRYPT_API_BASE="http://127.0.0.1:7860/v1",
        SCRYPT_API_KEY=INTERNAL_KEY,
        SCRYPT_MODEL="warden",
    )
    return env
async def _pump_pty_to_ws(master_fd: int, ws: WebSocket) -> None:
    """Forward everything readable from the PTY to the websocket as bytes.

    Reads up to 64 KiB at a time in the default executor (``os.read``
    blocks) and stops on an empty read or on the first error.

    Args:
        master_fd: File descriptor to read from.
        ws: Websocket that receives each chunk via ``send_bytes``.
    """
    # We are inside a coroutine, so ask for the running loop explicitly.
    loop = asyncio.get_running_loop()
    try:
        while True:
            chunk = await loop.run_in_executor(None, os.read, master_fd, 65536)
            if not chunk:
                break
            await ws.send_bytes(chunk)
    except Exception:
        # Deliberately best-effort: a failing read or send simply ends the
        # pump. Task cancellation is not an Exception and still propagates.
        pass
@app.websocket("/pty")
async def pty_bridge(ws: WebSocket) -> None:
    """Bridge one websocket to a freshly forked game process on a PTY.

    A text frame that parses as JSON with a ``"resize": [cols, rows]`` entry
    sets the PTY window size. Every other text frame, and every binary frame,
    is written to the PTY as input. PTY output is streamed back by
    ``_pump_pty_to_ws``. When the session ends the child is SIGKILLed and
    reaped, the PTY is closed, and the per-session SCRYPT_HOME directory is
    removed.
    """
    import fcntl
    import pty
    import shutil
    import signal
    import struct
    import termios
    import traceback

    await ws.accept()
    home = tempfile.mkdtemp(prefix="scrypt-")
    try:
        # Build the environment in the parent so the forked child does nothing
        # but exec: it is a copy of a threaded asyncio server and should run
        # as little Python as possible.
        child_env = {**os.environ, **game_env(), "SCRYPT_HOME": home}
        pid, master_fd = pty.fork()
        if pid == 0:  # child: become the game
            try:
                os.execvpe("python", ["python", "-m", "scrypt.app"], child_env)
            finally:
                # execvpe only comes back by raising. Exit hard so a failed
                # exec never leaves a second copy of the server running.
                os._exit(127)

        reader = asyncio.create_task(_pump_pty_to_ws(master_fd, ws))
        try:
            while True:
                msg = await ws.receive()
                if msg["type"] == "websocket.disconnect":
                    break
                if (text := msg.get("text")) is not None:
                    try:
                        cols, rows = json.loads(text)["resize"]
                        winsz = struct.pack("HHHH", rows, cols, 0, 0)
                    except (ValueError, KeyError, TypeError):
                        # Not a resize message: treat the text as keystrokes.
                        os.write(master_fd, text.encode())
                    except struct.error:
                        # Resize with non-integer or out-of-range dimensions:
                        # drop it rather than end the session.
                        pass
                    else:
                        fcntl.ioctl(master_fd, termios.TIOCSWINSZ, winsz)
                elif (data := msg.get("bytes")) is not None:
                    os.write(master_fd, data)
        except (WebSocketDisconnect, OSError):
            # Expected endings: the client went away or the PTY is gone.
            pass
        except Exception:
            # Unexpected failure: still end the session quietly for the
            # client, but leave a trace in the logs.
            traceback.print_exc()
        finally:
            reader.cancel()
            try:
                os.kill(pid, signal.SIGKILL)
                os.waitpid(pid, 0)  # reap, so no zombie is left behind
            except OSError:
                pass
            os.close(master_fd)
    finally:
        # The per-visitor home is throwaway; without this every connection
        # leaks a temp directory.
        shutil.rmtree(home, ignore_errors=True)
app.mount("/static", StaticFiles(directory=STATIC), name="static")

if __name__ == "__main__":
    # gradio's own launch — installs the ZeroGPU hooks + queue and serves our
    # custom routes. ssr_mode=False is load-bearing: gradio 6's SSR spins up a
    # Node proxy that does NOT forward our raw /pty websocket (custom GET
    # routes get through, the websocket doesn't). Disabling SSR keeps
    # everything in the one Python server so the PTY bridge works.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
        "ssr_mode": False,
    }
    app.launch(**launch_options)