# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
FastAPI application for the Toxic Royale Env Environment.

This module creates an HTTP server that exposes the ToxicRoyaleEnvironment
over HTTP and WebSocket endpoints, compatible with EnvClient.

Endpoints:
    - POST /reset: Reset the environment
    - POST /step: Execute an action
    - GET /state: Get current environment state
    - GET /schema: Get action/observation schemas
    - WS /ws: WebSocket endpoint for persistent sessions
    - POST /policy/act: Choose one action for a structured FramePacket
    - GET /live: HTML page showing the most recent policy decision
    - GET /live/stream: Server-sent events feed used by the /live page

Usage:
    # Development (with auto-reload):
    uvicorn toxic_royale_env.server.app:app --reload --host 0.0.0.0 --port 8000

    # Production:
    uvicorn toxic_royale_env.server.app:app --host 0.0.0.0 --port 8000 --workers 4

    # Or run directly:
    python -m toxic_royale_env.server.app
"""

try:
    from openenv.core.env_server.http_server import create_app
except Exception as e:  # pragma: no cover
    raise ImportError(
        "openenv is required for the web interface. "
        "Install dependencies with '\n uv sync\n'"
    ) from e

import json
import time
from collections import deque
from pathlib import Path
from threading import Lock

from fastapi.responses import HTMLResponse, StreamingResponse

from toxic_royale_env.models import ToxicRoyaleAction, ToxicRoyaleObservation
from toxic_royale_env.policy_models import FramePacket, PolicyActResponse, PolicyAction
from toxic_royale_env.policy_utils import append_jsonl, now_ms, rule_policy, should_gate
from toxic_royale_env.server.toxic_royale_env_environment import ToxicRoyaleEnvironment

# Create the app with web interface and README integration
app = create_app(
    ToxicRoyaleEnvironment,
    ToxicRoyaleAction,
    ToxicRoyaleObservation,
    env_name="toxic_royale_env",
    max_concurrent_envs=1,  # increase this number to allow more concurrent WebSocket sessions
)

# --- Live viewer memory (in-RAM ring buffer) ---
# NOTE: the buffer lives in this process's memory only. When the server runs
# with several worker processes, each worker keeps its own buffer, so a
# /live/stream connection only sees decisions made by the worker serving it.
_LIVE_LOCK = Lock()
_LIVE_EVENTS: deque[dict] = deque(maxlen=2000)  # last ~2000 ticks across matches

# Policy decisions are appended to toxic_royale_env/outputs/policy_logs/.
_POLICY_LOG_PATH = (
    Path(__file__).resolve().parents[1] / "outputs" / "policy_logs" / "policy_act.jsonl"
)

# Page served by GET /live.
# NOTE(review): the markup, element ids and script are a reconstruction; only
# the visible labels and the event keys written by policy_act are taken from
# this module. The hand table looks up the keys named in its header row
# (slot/card/cost/playable/conf) -- confirm against the hand card model.
_LIVE_PAGE_HTML = """<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8" />
  <title>ToxicRoyale Live Match</title>
  <style>
    body { font-family: sans-serif; margin: 1.5rem; }
    .grid { display: flex; flex-wrap: wrap; gap: 1rem; }
    .card { border: 1px solid #ccc; border-radius: 6px; padding: 0.5rem 1rem; }
    .label { font-size: 0.8rem; color: #666; }
    .value { font-size: 1.2rem; }
    table { border-collapse: collapse; }
    th, td { border: 1px solid #ccc; padding: 0.25rem 0.6rem; }
    pre { background: #f4f4f4; padding: 0.75rem; overflow: auto; }
  </style>
</head>
<body>
  <h1>Live Match Viewer</h1>
  <div class="grid">
    <div class="card"><div class="label">Match</div><div class="value" id="match">-</div></div>
    <div class="card"><div class="label">Tick</div><div class="value" id="tick">-</div></div>
    <div class="card"><div class="label">Time left</div><div class="value" id="time-left">-</div></div>
    <div class="card"><div class="label">Elixir</div><div class="value" id="elixir">-</div></div>
    <div class="card"><div class="label">Quality</div><div class="value" id="quality">-</div></div>
    <div class="card">
      <div class="label">Chosen action</div>
      <div class="value" id="action">-</div>
      <div class="label" id="action-meta">-</div>
    </div>
  </div>
  <h2>Hand</h2>
  <table>
    <thead>
      <tr><th>slot</th><th>card</th><th>cost</th><th>playable</th><th>conf</th></tr>
    </thead>
    <tbody id="hand"></tbody>
  </table>
  <h2>Raw event JSON</h2>
  <pre id="raw"></pre>
  <script>
    const HAND_COLUMNS = ["slot", "card", "cost", "playable", "conf"];

    function show(value) {
      return value === null || value === undefined ? "-" : String(value);
    }

    function setText(id, value) {
      // textContent (not innerHTML) so packet-supplied strings are never parsed as markup.
      document.getElementById(id).textContent = show(value);
    }

    function render(ev) {
      setText("match", ev.match_id);
      setText("tick", ev.tick_id);
      setText("time-left", ev.time_left_s);
      setText("elixir", ev.my_elixir);
      setText("quality", ev.overall_q);

      const action = ev.action || {};
      setText("action", action.kind);
      const meta = [ev.source, show(ev.latency_ms) + " ms", action.reasoning];
      setText("action-meta", meta.filter(Boolean).join(" | "));

      const body = document.getElementById("hand");
      body.replaceChildren();
      for (const card of ev.hand || []) {
        const row = document.createElement("tr");
        for (const key of HAND_COLUMNS) {
          const cell = document.createElement("td");
          cell.textContent = show(card[key]);
          row.appendChild(cell);
        }
        body.appendChild(row);
      }

      document.getElementById("raw").textContent = JSON.stringify(ev, null, 2);
    }

    // Forward this page's query string so /live?match_id=... filters the stream.
    const stream = new EventSource("/live/stream" + window.location.search);
    stream.onmessage = (msg) => render(JSON.parse(msg.data));
  </script>
</body>
</html>
"""


def _live_push(obj: dict) -> None:
    """Append one event to the ring buffer; the oldest entry drops when full."""
    with _LIVE_LOCK:
        _LIVE_EVENTS.append(obj)


def _live_snapshot(match_id: str | None = None) -> list[dict]:
    """Return a copy of the buffered events, oldest first.

    Args:
        match_id: When non-empty, keep only events whose "match_id" equals it.

    Returns:
        A new list; the event dicts themselves are shared with the buffer.
    """
    with _LIVE_LOCK:
        if match_id:
            return [e for e in _LIVE_EVENTS if e.get("match_id") == match_id]
        return list(_LIVE_EVENTS)


def _live_latest(match_id: str | None = None) -> dict | None:
    """Return the newest buffered event (optionally for one match), or None.

    Scans from the newest end so a poller does not copy the whole buffer just
    to read its last element.
    """
    with _LIVE_LOCK:
        for event in reversed(_LIVE_EVENTS):
            if not match_id or event.get("match_id") == match_id:
                return event
    return None


@app.post("/policy/act", response_model=PolicyActResponse)
def policy_act(packet: FramePacket) -> PolicyActResponse:
    """
    Stateless policy endpoint for the Windows live pipeline.

    Input: structured FramePacket (no images)
    Output: one action (play/wait) for this tick

    The decision depends only on the packet. As a side effect, a summary of
    the tick is pushed to the in-RAM live buffer and appended to the policy
    JSONL log.
    """
    t0 = now_ms()
    tick_id = int(packet.meta.tick_id or 0)

    if should_gate(packet):
        # Gated packets get a fixed "wait" instead of consulting the rules.
        action = PolicyAction(kind="wait", emote="yawn", reasoning="low_detection_quality")
        resp = PolicyActResponse(tick_id=tick_id, action=action, source="gated")
    else:
        action = rule_policy(packet)
        resp = PolicyActResponse(tick_id=tick_id, action=action, source="rules")

    latency_ms = now_ms() - t0

    # Optional packet sections (ui, debug, player) may be absent; missing
    # values are recorded as None, or an empty list for the hand.
    live_obj = {
        "ts_ms": now_ms(),
        "match_id": packet.meta.match_id,
        "tick_id": tick_id,
        "time_left_s": (packet.ui.time_left_s if packet.ui else None),
        "overall_q": (
            packet.debug.detections_quality.overall
            if packet.debug and packet.debug.detections_quality
            else None
        ),
        "my_elixir": packet.player.elixir if packet.player else None,
        "hand": [c.model_dump() for c in (packet.player.hand if packet.player else [])],
        "action": resp.action.model_dump(),
        "source": resp.source,
        "latency_ms": latency_ms,
    }
    _live_push(live_obj)
    append_jsonl(_POLICY_LOG_PATH, live_obj)
    return resp


@app.get("/live", response_class=HTMLResponse)
def live_page() -> str:
    """Serve the live match viewer page, which subscribes to /live/stream."""
    return _LIVE_PAGE_HTML


@app.get("/live/stream")
def live_stream(match_id: str | None = None):
    """Stream the newest policy event as server-sent events.

    Polls the live buffer every 0.2 s and emits the latest event whenever it
    differs from the one sent last. Intermediate events that arrive between
    two polls are not replayed.

    Args:
        match_id: When non-empty, only events of that match are streamed.

    Returns:
        A StreamingResponse with media type "text/event-stream".
    """

    def gen():
        last_sent = None
        while True:
            latest = _live_latest(match_id)
            # Compare by identity: every tick pushes a fresh dict, so this stays
            # correct even when two ticks share the same "ts_ms" value.
            if latest is not None and latest is not last_sent:
                last_sent = latest
                # An SSE message must end with a blank line, i.e. two real
                # newline characters (not the two-character text "\\n").
                yield f"data: {json.dumps(latest, ensure_ascii=False)}\n\n"
            time.sleep(0.2)

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )


def main(host: str = "0.0.0.0", port: int = 8000):
    """
    Entry point for direct execution via uv run or python -m.

    This function enables running the server without Docker:
        uv run --project . server
        uv run --project . server --port 8001
        python -m toxic_royale_env.server.app

    NOTE(review): this function does not parse command-line flags itself; the
    bind address comes only from the ``host`` and ``port`` arguments. Confirm
    how the ``--port`` example above is expected to reach it.

    Args:
        host: Host address to bind to (default: "0.0.0.0")
        port: Port number to listen on (default: 8000)

    For production deployments, consider using uvicorn directly with
    multiple workers:
        uvicorn toxic_royale_env.server.app:app --workers 4
    """
    import uvicorn

    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    main()