"""FastAPI application for the OpenRA-RL environment. Creates the OpenEnv-compatible server using create_app(). """ import asyncio import json import os import time from fastapi import Query from fastapi.responses import HTMLResponse, StreamingResponse from openenv.core.env_server import create_app from openra_env.models import OpenRAAction, OpenRAObservation from openra_env.server.openra_environment import OpenRAEnvironment app = create_app( OpenRAEnvironment, OpenRAAction, OpenRAObservation, env_name="openra_env", ) # ── Try Agent: LLM demo endpoint ──────────────────────────────────────────── _TRY_MAX_TURNS = 30 _TRY_MAX_TIME = 300 # 5 minutes _COMMENTARY_SYSTEM_PROMPT = ( "You are a real-time commentator for an AI playing Command & Conquer: Red Alert. " "Given the AI's recent actions and current game state, write 1-2 sentences " "explaining what the AI is doing and why, in an engaging style. " "Keep it concise and accessible to viewers who may not know RTS games well." ) _COMMENTARY_MAX_TOKENS = 150 def _sse(event_type: str, data: dict) -> str: """Format a Server-Sent Event.""" return f"event: {event_type}\ndata: {json.dumps(data)}\n\n" async def _generate_commentary(user_content: str, llm_config, broadcaster) -> None: """Generate commentary in the background and broadcast it.""" import httpx as _httpx try: headers = dict(llm_config.extra_headers) if llm_config.api_key: headers["Authorization"] = f"Bearer {llm_config.api_key}" payload = { "model": llm_config.model, "messages": [ {"role": "system", "content": _COMMENTARY_SYSTEM_PROMPT}, {"role": "user", "content": user_content}, ], "max_tokens": 400, "reasoning": {"effort": "low"}, "temperature": 0.6, "top_p": 0.95, } async with _httpx.AsyncClient() as client: resp = await client.post( llm_config.base_url, headers=headers, json=payload, timeout=llm_config.request_timeout_s, ) if resp.status_code != 200: return data = resp.json() msg = data["choices"][0]["message"] text = msg.get("content") or "" if not text: # Reasoning models may put output in 'reasoning' if content is empty text = msg.get("reasoning") or "" if text: sentences = [s.strip() for s in text.replace("\n", " ").split(".") if s.strip()] text = ". ".join(sentences[-2:]) + "." if sentences else "" if text: broadcaster._broadcast(_sse("commentary", {"text": text.strip()})) except Exception: pass # Commentary is non-essential class TryGameBroadcaster: """Manages a single game broadcast to multiple SSE subscribers.""" def __init__(self): self._event_history: list[str] = [] self._subscribers: set[asyncio.Queue] = set() self._game_running: bool = False self._game_task: asyncio.Task | None = None self._opponent: str = "" self._start_lock = asyncio.Lock() @property def game_running(self) -> bool: return self._game_running def subscribe(self) -> asyncio.Queue: queue: asyncio.Queue = asyncio.Queue() self._subscribers.add(queue) return queue def unsubscribe(self, queue: asyncio.Queue) -> None: self._subscribers.discard(queue) def _broadcast(self, event: str) -> None: self._event_history.append(event) for q in self._subscribers: q.put_nowait(event) async def replay_to(self, queue: asyncio.Queue) -> None: for event in list(self._event_history): await queue.put(event) async def start_game(self, opponent: str) -> None: async with self._start_lock: if self._game_running: return self._event_history.clear() self._opponent = opponent self._game_running = True self._game_task = asyncio.create_task(self._run_game(opponent)) async def _run_game(self, opponent: str) -> None: try: async for event in _run_try_agent(opponent): self._broadcast(event) finally: self._game_running = False sentinel = _sse("_stream_end", {}) for q in self._subscribers: q.put_nowait(sentinel) _broadcaster = TryGameBroadcaster() async def _run_try_agent(opponent: str): """Run LLM agent for one demo game, yielding SSE events.""" from openra_env.agent import ( SYSTEM_PROMPT, chat_completion, compose_pregame_briefing, compress_history, format_state_briefing, mcp_tools_to_openai, ) from openra_env.config import LLMConfig from openra_env.mcp_ws_client import OpenRAMCPClient api_key = os.environ.get("OPENROUTER_API_KEY", "") if not api_key: yield _sse("error_event", {"message": "Server not configured for demo play (no API key)."}) return llm_config = LLMConfig( api_key=api_key, model="stepfun/step-3.5-flash", base_url="https://openrouter.ai/api/v1/chat/completions", max_tokens=1500, temperature=1.0, top_p=0.95, reasoning_effort="low", extra_headers={ "HTTP-Referer": "https://openra-rl.dev", "X-Title": "OpenRA-RL Try Agent", }, ) commentary_config = LLMConfig( api_key=api_key, model="stepfun/step-3.5-flash", base_url="https://openrouter.ai/api/v1/chat/completions", max_tokens=_COMMENTARY_MAX_TOKENS, request_timeout_s=15.0, extra_headers={ "HTTP-Referer": "https://openra-rl.dev", "X-Title": "OpenRA-RL Commentary", }, ) # Configure opponent difficulty for the next game os.environ["BOT_TYPE"] = opponent.lower() yield _sse("status", {"message": f"Launching game vs {opponent} AI..."}) try: async with OpenRAMCPClient( base_url="http://localhost:8000", message_timeout_s=300.0 ) as env: yield _sse("status", {"message": "Resetting environment..."}) await env.reset() # Discover tools mcp_tools = await env.list_tools() openai_tools = mcp_tools_to_openai(mcp_tools) # Start + end planning to trigger session start (unpauses game) yield _sse("status", {"message": "Starting game session..."}) await env.call_tool("start_planning_phase") await env.call_tool("end_planning_phase", strategy="Demo game - aggressive rush") yield _sse("status", {"message": f"Game started. {len(mcp_tools)} tools available."}) # Initialize conversation messages = [{"role": "system", "content": SYSTEM_PROMPT}] # Get initial state and compose briefing state = await env.call_tool("get_game_state") briefing = compose_pregame_briefing(state) messages.append({ "role": "user", "content": ( f"Game started!\n\n{briefing}\n\n" f"## Current State\n```json\n{json.dumps(state, indent=2)}\n```\n\n" f"ACT NOW! Deploy your MCV immediately, then start building power plant + barracks. " f"Expand fast — every idle second costs you. Use plan() to chain: " f"deploy MCV → build power plant → build barracks → build refinery. " f"Then focus on economy (3+ refineries) and defense turrets toward the enemy." ), }) yield _sse("game_state", { "tick": state.get("tick", 0), "units": state.get("own_units", 0), "buildings": state.get("own_buildings", 0), "cash": state.get("economy", {}).get("cash", 0), }) total_tool_calls = 0 total_api_calls = 0 start_time = time.time() game_done = False consecutive_errors = 0 for turn in range(1, _TRY_MAX_TURNS + 1): elapsed = time.time() - start_time if elapsed >= _TRY_MAX_TIME: yield _sse("status", {"message": f"Time limit reached ({_TRY_MAX_TIME}s)."}) break # Compress history to stay within context limits messages = compress_history(messages, keep_last=40) # Inject state briefing (skip first turn — initial state already sent) if total_api_calls > 0: try: briefing_state = await env.call_tool("get_game_state") brief = format_state_briefing(briefing_state) if brief: messages.append({"role": "user", "content": brief}) if isinstance(briefing_state, dict) and briefing_state.get("done"): game_done = True yield _sse("done", { "result": briefing_state.get("result", "?"), "tick": briefing_state.get("tick", 0), }) break except Exception: pass # Call LLM try: response = await chat_completion(messages, openai_tools, llm_config) except Exception as e: yield _sse("error_event", {"message": f"LLM error: {e}"}) break total_api_calls += 1 choice = response["choices"][0] assistant_msg = choice["message"] messages.append(assistant_msg) # Emit LLM reasoning if assistant_msg.get("content"): yield _sse("llm", {"content": assistant_msg["content"][:500]}) yield _sse("turn", { "turn": turn, "api_calls": total_api_calls, "elapsed": round(elapsed), }) # Handle tool calls tool_calls = assistant_msg.get("tool_calls", []) if not tool_calls: messages.append({ "role": "user", "content": "Please use the game tools to take action.", }) continue for tc in tool_calls: fn_name = tc["function"]["name"] try: fn_args = json.loads(tc["function"].get("arguments", "{}")) except (json.JSONDecodeError, TypeError): fn_args = {} total_tool_calls += 1 args_str = json.dumps(fn_args) if len(args_str) > 120: args_str = args_str[:120] + "..." yield _sse("tool_call", {"name": fn_name, "args": args_str}) try: result = await env.call_tool(fn_name, **fn_args) consecutive_errors = 0 except Exception as e: result = {"error": str(e)} # Detect game crash if isinstance(result, dict) and "connection lost" in str( result.get("error", "") ).lower(): consecutive_errors += 1 if consecutive_errors >= 3: yield _sse("error_event", {"message": "Game connection lost."}) game_done = True result_str = ( json.dumps(result) if not isinstance(result, str) else result ) messages.append({ "role": "tool", "tool_call_id": tc["id"], "content": result_str, }) # Check game over if isinstance(result, dict): if result.get("done"): game_done = True yield _sse("done", { "result": result.get("result", "?"), "tick": result.get("tick", 0), }) elif "tick" in result and "economy" in result: yield _sse("game_state", { "tick": result.get("tick", 0), "units": result.get("own_units", 0), "buildings": result.get("own_buildings", 0), "cash": result.get("economy", {}).get("cash", 0), }) # Fire-and-forget async commentary (doesn't block game loop) if tool_calls and not game_done: action_summaries = [] for tc in tool_calls: fn = tc["function"]["name"] try: fa = json.loads(tc["function"].get("arguments", "{}")) except (json.JSONDecodeError, TypeError): fa = {} action_summaries.append(f"{fn}({json.dumps(fa)})") commentary_user = ( f"Turn {turn} actions:\n" + "\n".join(f"- {a}" for a in action_summaries[:8]) ) asyncio.create_task(_generate_commentary( commentary_user, commentary_config, _broadcaster, )) if game_done: break if choice.get("finish_reason") == "stop" and not tool_calls: messages.append({ "role": "user", "content": "Continue playing. Use game tools to check state and take actions.", }) # Surrender if game didn't end naturally if not game_done: try: await env.call_tool("surrender") except Exception: pass # Emit final scorecard try: final = await env.call_tool("get_game_state") mil = final.get("military", {}) eco = final.get("economy", {}) yield _sse("final", { "result": final.get("result", "ongoing"), "tick": final.get("tick", 0), "turns": total_api_calls, "tool_calls": total_tool_calls, "elapsed": round(time.time() - start_time), "kills_cost": mil.get("kills_cost", 0), "deaths_cost": mil.get("deaths_cost", 0), "units_killed": mil.get("units_killed", 0), "units_lost": mil.get("units_lost", 0), "cash": eco.get("cash", 0), "units": final.get("own_units", 0), "buildings": final.get("own_buildings", 0), }) except Exception: pass except Exception as e: yield _sse("error_event", {"message": str(e)}) @app.get("/try-agent") async def try_agent( opponent: str = Query("Normal", pattern="^(Easy|Normal|Hard)$"), ): """SSE stream of an LLM agent playing Red Alert. Multiple viewers can watch simultaneously. The first request starts a new game; subsequent requests join as spectators of the ongoing game. """ queue = _broadcaster.subscribe() if _broadcaster.game_running: await queue.put(_sse("status", {"message": "Joining ongoing game as spectator..."})) await _broadcaster.replay_to(queue) else: await _broadcaster.start_game(opponent) async def stream(): try: while True: event = await asyncio.wait_for(queue.get(), timeout=360) if '"_stream_end"' in event: break yield event except asyncio.TimeoutError: pass finally: _broadcaster.unsubscribe(queue) return StreamingResponse( stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) LANDING_PAGE = """\
Use the Python client to connect, reset the environment, and step through the game loop. Works with both local Docker and this HuggingFace-hosted server.
API REFERENCE$ pip install openra-rl from openra_env.client import OpenRAEnv from openra_env.models import OpenRAAction url = "https://openra-rl-openra-rl.hf.space" async with OpenRAEnv(url) as env: obs = await env.reset() while not obs.done: action = your_agent(obs) obs = await env.step(action)
A pre-configured LLM agent plays Red Alert against the built-in AI. No setup needed.