"""FastAPI application for the OpenRA-RL environment. Creates the OpenEnv-compatible server using create_app(). """ import asyncio import json import os import time from fastapi import Query from fastapi.responses import HTMLResponse, StreamingResponse from openenv.core.env_server import create_app from openra_env.models import OpenRAAction, OpenRAObservation from openra_env.server.openra_environment import OpenRAEnvironment app = create_app( OpenRAEnvironment, OpenRAAction, OpenRAObservation, env_name="openra_env", ) # ── Try Agent: LLM demo endpoint ──────────────────────────────────────────── _TRY_MAX_TURNS = 30 _TRY_MAX_TIME = 300 # 5 minutes _COMMENTARY_SYSTEM_PROMPT = ( "You are a real-time commentator for an AI playing Command & Conquer: Red Alert. " "Given the AI's recent actions and current game state, write 1-2 sentences " "explaining what the AI is doing and why, in an engaging style. " "Keep it concise and accessible to viewers who may not know RTS games well." ) _COMMENTARY_MAX_TOKENS = 150 def _sse(event_type: str, data: dict) -> str: """Format a Server-Sent Event.""" return f"event: {event_type}\ndata: {json.dumps(data)}\n\n" async def _generate_commentary(user_content: str, llm_config, broadcaster) -> None: """Generate commentary in the background and broadcast it.""" import httpx as _httpx try: headers = dict(llm_config.extra_headers) if llm_config.api_key: headers["Authorization"] = f"Bearer {llm_config.api_key}" payload = { "model": llm_config.model, "messages": [ {"role": "system", "content": _COMMENTARY_SYSTEM_PROMPT}, {"role": "user", "content": user_content}, ], "max_tokens": 400, "reasoning": {"effort": "low"}, "temperature": 0.6, "top_p": 0.95, } async with _httpx.AsyncClient() as client: resp = await client.post( llm_config.base_url, headers=headers, json=payload, timeout=llm_config.request_timeout_s, ) if resp.status_code != 200: return data = resp.json() msg = data["choices"][0]["message"] text = msg.get("content") or "" if not text: # Reasoning models may put output in 'reasoning' if content is empty text = msg.get("reasoning") or "" if text: sentences = [s.strip() for s in text.replace("\n", " ").split(".") if s.strip()] text = ". ".join(sentences[-2:]) + "." if sentences else "" if text: broadcaster._broadcast(_sse("commentary", {"text": text.strip()})) except Exception: pass # Commentary is non-essential class TryGameBroadcaster: """Manages a single game broadcast to multiple SSE subscribers.""" def __init__(self): self._event_history: list[str] = [] self._subscribers: set[asyncio.Queue] = set() self._game_running: bool = False self._game_task: asyncio.Task | None = None self._opponent: str = "" self._start_lock = asyncio.Lock() @property def game_running(self) -> bool: return self._game_running def subscribe(self) -> asyncio.Queue: queue: asyncio.Queue = asyncio.Queue() self._subscribers.add(queue) return queue def unsubscribe(self, queue: asyncio.Queue) -> None: self._subscribers.discard(queue) def _broadcast(self, event: str) -> None: self._event_history.append(event) for q in self._subscribers: q.put_nowait(event) async def replay_to(self, queue: asyncio.Queue) -> None: for event in list(self._event_history): await queue.put(event) async def start_game(self, opponent: str) -> None: async with self._start_lock: if self._game_running: return self._event_history.clear() self._opponent = opponent self._game_running = True self._game_task = asyncio.create_task(self._run_game(opponent)) async def _run_game(self, opponent: str) -> None: try: async for event in _run_try_agent(opponent): self._broadcast(event) finally: self._game_running = False sentinel = _sse("_stream_end", {}) for q in self._subscribers: q.put_nowait(sentinel) _broadcaster = TryGameBroadcaster() async def _run_try_agent(opponent: str): """Run LLM agent for one demo game, yielding SSE events.""" from openra_env.agent import ( SYSTEM_PROMPT, chat_completion, compose_pregame_briefing, compress_history, format_state_briefing, mcp_tools_to_openai, ) from openra_env.config import LLMConfig from openra_env.mcp_ws_client import OpenRAMCPClient api_key = os.environ.get("OPENROUTER_API_KEY", "") if not api_key: yield _sse("error_event", {"message": "Server not configured for demo play (no API key)."}) return llm_config = LLMConfig( api_key=api_key, model="stepfun/step-3.5-flash", base_url="https://openrouter.ai/api/v1/chat/completions", max_tokens=1500, temperature=1.0, top_p=0.95, reasoning_effort="low", extra_headers={ "HTTP-Referer": "https://openra-rl.dev", "X-Title": "OpenRA-RL Try Agent", }, ) commentary_config = LLMConfig( api_key=api_key, model="stepfun/step-3.5-flash", base_url="https://openrouter.ai/api/v1/chat/completions", max_tokens=_COMMENTARY_MAX_TOKENS, request_timeout_s=15.0, extra_headers={ "HTTP-Referer": "https://openra-rl.dev", "X-Title": "OpenRA-RL Commentary", }, ) # Configure opponent difficulty for the next game os.environ["BOT_TYPE"] = opponent.lower() yield _sse("status", {"message": f"Launching game vs {opponent} AI..."}) try: async with OpenRAMCPClient( base_url="http://localhost:8000", message_timeout_s=300.0 ) as env: yield _sse("status", {"message": "Resetting environment..."}) await env.reset() # Discover tools mcp_tools = await env.list_tools() openai_tools = mcp_tools_to_openai(mcp_tools) # Start + end planning to trigger session start (unpauses game) yield _sse("status", {"message": "Starting game session..."}) await env.call_tool("start_planning_phase") await env.call_tool("end_planning_phase", strategy="Demo game - aggressive rush") yield _sse("status", {"message": f"Game started. {len(mcp_tools)} tools available."}) # Initialize conversation messages = [{"role": "system", "content": SYSTEM_PROMPT}] # Get initial state and compose briefing state = await env.call_tool("get_game_state") briefing = compose_pregame_briefing(state) messages.append({ "role": "user", "content": ( f"Game started!\n\n{briefing}\n\n" f"## Current State\n```json\n{json.dumps(state, indent=2)}\n```\n\n" f"ACT NOW! Deploy your MCV immediately, then start building power plant + barracks. " f"Expand fast — every idle second costs you. Use plan() to chain: " f"deploy MCV → build power plant → build barracks → build refinery. " f"Then focus on economy (3+ refineries) and defense turrets toward the enemy." ), }) yield _sse("game_state", { "tick": state.get("tick", 0), "units": state.get("own_units", 0), "buildings": state.get("own_buildings", 0), "cash": state.get("economy", {}).get("cash", 0), }) total_tool_calls = 0 total_api_calls = 0 start_time = time.time() game_done = False consecutive_errors = 0 for turn in range(1, _TRY_MAX_TURNS + 1): elapsed = time.time() - start_time if elapsed >= _TRY_MAX_TIME: yield _sse("status", {"message": f"Time limit reached ({_TRY_MAX_TIME}s)."}) break # Compress history to stay within context limits messages = compress_history(messages, keep_last=40) # Inject state briefing (skip first turn — initial state already sent) if total_api_calls > 0: try: briefing_state = await env.call_tool("get_game_state") brief = format_state_briefing(briefing_state) if brief: messages.append({"role": "user", "content": brief}) if isinstance(briefing_state, dict) and briefing_state.get("done"): game_done = True yield _sse("done", { "result": briefing_state.get("result", "?"), "tick": briefing_state.get("tick", 0), }) break except Exception: pass # Call LLM try: response = await chat_completion(messages, openai_tools, llm_config) except Exception as e: yield _sse("error_event", {"message": f"LLM error: {e}"}) break total_api_calls += 1 choice = response["choices"][0] assistant_msg = choice["message"] messages.append(assistant_msg) # Emit LLM reasoning if assistant_msg.get("content"): yield _sse("llm", {"content": assistant_msg["content"][:500]}) yield _sse("turn", { "turn": turn, "api_calls": total_api_calls, "elapsed": round(elapsed), }) # Handle tool calls tool_calls = assistant_msg.get("tool_calls", []) if not tool_calls: messages.append({ "role": "user", "content": "Please use the game tools to take action.", }) continue for tc in tool_calls: fn_name = tc["function"]["name"] try: fn_args = json.loads(tc["function"].get("arguments", "{}")) except (json.JSONDecodeError, TypeError): fn_args = {} total_tool_calls += 1 args_str = json.dumps(fn_args) if len(args_str) > 120: args_str = args_str[:120] + "..." yield _sse("tool_call", {"name": fn_name, "args": args_str}) try: result = await env.call_tool(fn_name, **fn_args) consecutive_errors = 0 except Exception as e: result = {"error": str(e)} # Detect game crash if isinstance(result, dict) and "connection lost" in str( result.get("error", "") ).lower(): consecutive_errors += 1 if consecutive_errors >= 3: yield _sse("error_event", {"message": "Game connection lost."}) game_done = True result_str = ( json.dumps(result) if not isinstance(result, str) else result ) messages.append({ "role": "tool", "tool_call_id": tc["id"], "content": result_str, }) # Check game over if isinstance(result, dict): if result.get("done"): game_done = True yield _sse("done", { "result": result.get("result", "?"), "tick": result.get("tick", 0), }) elif "tick" in result and "economy" in result: yield _sse("game_state", { "tick": result.get("tick", 0), "units": result.get("own_units", 0), "buildings": result.get("own_buildings", 0), "cash": result.get("economy", {}).get("cash", 0), }) # Fire-and-forget async commentary (doesn't block game loop) if tool_calls and not game_done: action_summaries = [] for tc in tool_calls: fn = tc["function"]["name"] try: fa = json.loads(tc["function"].get("arguments", "{}")) except (json.JSONDecodeError, TypeError): fa = {} action_summaries.append(f"{fn}({json.dumps(fa)})") commentary_user = ( f"Turn {turn} actions:\n" + "\n".join(f"- {a}" for a in action_summaries[:8]) ) asyncio.create_task(_generate_commentary( commentary_user, commentary_config, _broadcaster, )) if game_done: break if choice.get("finish_reason") == "stop" and not tool_calls: messages.append({ "role": "user", "content": "Continue playing. Use game tools to check state and take actions.", }) # Surrender if game didn't end naturally if not game_done: try: await env.call_tool("surrender") except Exception: pass # Emit final scorecard try: final = await env.call_tool("get_game_state") mil = final.get("military", {}) eco = final.get("economy", {}) yield _sse("final", { "result": final.get("result", "ongoing"), "tick": final.get("tick", 0), "turns": total_api_calls, "tool_calls": total_tool_calls, "elapsed": round(time.time() - start_time), "kills_cost": mil.get("kills_cost", 0), "deaths_cost": mil.get("deaths_cost", 0), "units_killed": mil.get("units_killed", 0), "units_lost": mil.get("units_lost", 0), "cash": eco.get("cash", 0), "units": final.get("own_units", 0), "buildings": final.get("own_buildings", 0), }) except Exception: pass except Exception as e: yield _sse("error_event", {"message": str(e)}) @app.get("/try-agent") async def try_agent( opponent: str = Query("Normal", pattern="^(Easy|Normal|Hard)$"), ): """SSE stream of an LLM agent playing Red Alert. Multiple viewers can watch simultaneously. The first request starts a new game; subsequent requests join as spectators of the ongoing game. """ queue = _broadcaster.subscribe() if _broadcaster.game_running: await queue.put(_sse("status", {"message": "Joining ongoing game as spectator..."})) await _broadcaster.replay_to(queue) else: await _broadcaster.start_game(opponent) async def stream(): try: while True: event = await asyncio.wait_for(queue.get(), timeout=360) if '"_stream_end"' in event: break yield event except asyncio.TimeoutError: pass finally: _broadcaster.unsubscribe(queue) return StreamingResponse( stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) LANDING_PAGE = """\ OpenRA-RL — OpenEnv Environment
SYSTEM OVERRIDE ACTIVE

OPENRA-RL

OpenEnv environment for training AI agents to play Red Alert through the OpenRA engine. Connect via WebSocket or HTTP, send actions, observe the battlefield.
WATCH AI PLAY DOCUMENTATION LEADERBOARD

Endpoints

API DOCS

Interactive Swagger UI with all REST and WebSocket endpoints.

/docs →

HEALTH CHECK

Server status and readiness probe for monitoring.

/health →

ENV SCHEMA

JSON schemas for actions, observations, and game state.

/schema →

Connect to Environment

Use the Python client to connect, reset the environment, and step through the game loop. Works with both local Docker and this HuggingFace-hosted server.

API REFERENCE
terminal
$ pip install openra-rl

from openra_env.client import OpenRAEnv
from openra_env.models import OpenRAAction

url = "https://openra-rl-openra-rl.hf.space"

async with OpenRAEnv(url) as env:
    obs = await env.reset()
    while not obs.done:
        action = your_agent(obs)
        obs = await env.step(action)
""" @app.get("/", response_class=HTMLResponse) async def root(): """Landing page for the HuggingFace Space.""" return LANDING_PAGE # ── Try Page: Watch AI Play ────────────────────────────────────────────────── TRY_PAGE = """\ Try — Watch AI Play Red Alert

Watch AI Play

A pre-configured LLM agent plays Red Alert against the built-in AI. No setup needed.

Waiting to start...\n

Scorecard

""" @app.get("/try-status") async def try_status(): """Check if a game is currently running.""" return { "game_running": _broadcaster.game_running, "opponent": _broadcaster._opponent if _broadcaster.game_running else "", } @app.get("/try", response_class=HTMLResponse) async def try_page(): """Interactive page to watch an LLM agent play Red Alert.""" return TRY_PAGE def main(): import uvicorn uvicorn.run( app, host="0.0.0.0", port=8000, ws_ping_interval=None, ws_ping_timeout=None, ) if __name__ == "__main__": main()