"""OpenRA-Bench: Agent Leaderboard for OpenRA-RL. A Gradio app that displays agent rankings, supports filtering by type and opponent difficulty, and lets users run evaluations in-browser. Run locally: python app.py Deploy on HuggingFace Spaces: Push app.py, requirements.txt, data/, and README.md to your HF Space. """ import csv import html import json import os import re import time from collections import defaultdict from datetime import datetime, timezone from pathlib import Path import gradio as gr import pandas as pd from evaluate_runner import DEFAULT_SERVER, compute_composite_score, compute_game_metrics # ── Data Loading ────────────────────────────────────────────────────────────── DATA_PATH = Path(__file__).parent / "data" / "results.csv" AGENT_TYPE_COLORS = { "Scripted": "#ffcd75", # Gold "LLM": "#7497db", # Blue "RL": "#75809c", # Gray-blue } DISPLAY_COLUMNS = [ "Rank", "Agent", "Type", "Opponent", "Games", "Win Rate (%)", "Score", "K/D Ratio", "Avg Kills", "Avg Deaths", "Avg Economy", "Avg Game Length", "Date", "Replay", ] def _safe_agent_link(name: str, url) -> str: """Render agent name, optionally as a hyperlink. HTML-escaped to prevent XSS.""" safe_name = html.escape(str(name)) if pd.notna(url) and str(url).strip(): url_str = str(url).strip() # Only allow http/https URLs — block javascript:, data:, etc. if url_str.startswith(("http://", "https://")): safe_url = html.escape(url_str, quote=True) return f'{safe_name}' return safe_name def _safe_replay_link(url) -> str: """Render replay download link. Filename is sanitized to prevent XSS.""" if pd.notna(url) and str(url).strip(): # Sanitize: only allow alphanumeric, dash, underscore, dot safe_name = re.sub(r"[^a-zA-Z0-9._-]", "", str(url).strip()) if safe_name: escaped = html.escape(safe_name, quote=True) return f'' return "" def load_data() -> pd.DataFrame: """Load leaderboard data from CSV.""" if not DATA_PATH.exists(): return pd.DataFrame(columns=DISPLAY_COLUMNS) df = pd.read_csv(DATA_PATH) df = df.sort_values("score", ascending=False).reset_index(drop=True) df.insert(0, "Rank", range(1, len(df) + 1)) # Build agent name with optional hyperlink (XSS-safe) if "agent_url" in df.columns: df["Agent"] = df.apply( lambda r: _safe_agent_link(r.get("agent_name", ""), r.get("agent_url", "")), axis=1, ) else: df["Agent"] = df["agent_name"].apply(lambda n: html.escape(str(n))) # Build replay download link (XSS-safe) if "replay_url" in df.columns: df["Replay"] = df["replay_url"].apply(_safe_replay_link) else: df["Replay"] = "" # Rename for display df = df.rename(columns={ "agent_type": "Type", "opponent": "Opponent", "games": "Games", "win_rate": "Win Rate (%)", "score": "Score", "kd_ratio": "K/D Ratio", "avg_kills": "Avg Kills", "avg_deaths": "Avg Deaths", "avg_economy": "Avg Economy", "avg_game_length": "Avg Game Length", "timestamp": "Date", }) return df[DISPLAY_COLUMNS] def add_type_badges(df: pd.DataFrame) -> pd.DataFrame: """Add color-coded HTML badges to the Type column.""" def badge(agent_type: str) -> str: color = AGENT_TYPE_COLORS.get(agent_type, "#ccc") text_color = "#fff" if agent_type != "Scripted" else "#333" return ( f'' f"{agent_type}" ) df = df.copy() df["Type"] = df["Type"].apply(badge) return df # ── Filtering ───────────────────────────────────────────────────────────────── def filter_leaderboard( search: str, agent_types: list[str], opponent: str, ) -> pd.DataFrame: """Filter leaderboard by search, agent type, and opponent.""" df = load_data() # Filter by agent type if agent_types: df = df[df["Type"].isin(agent_types)] # Filter by opponent if opponent and opponent != "All": df = df[df["Opponent"] == opponent] # Search by agent name (regex with fallback to literal on invalid patterns) if search and search.strip(): patterns = [p.strip() for p in search.split(",") if p.strip()] mask = pd.Series([False] * len(df), index=df.index) for pattern in patterns: try: mask |= df["Agent"].str.contains(pattern, case=False, regex=True, na=False) except re.error: mask |= df["Agent"].str.contains( re.escape(pattern), case=False, regex=True, na=False ) df = df[mask] # Re-rank after filtering df = df.reset_index(drop=True) df["Rank"] = range(1, len(df) + 1) return add_type_badges(df) # ── Result Persistence ──────────────────────────────────────────────────────── SUBMISSIONS_DIR = Path(__file__).parent / "submissions" SUBMISSIONS_DIR.mkdir(exist_ok=True) # CommitScheduler pushes submissions to HF dataset (only on HF Spaces) _scheduler = None if os.environ.get("HF_TOKEN") and os.environ.get("SPACE_ID"): try: from huggingface_hub import CommitScheduler _scheduler = CommitScheduler( repo_id="openra-rl/bench-results", repo_type="dataset", folder_path=str(SUBMISSIONS_DIR), every=5, token=os.environ["HF_TOKEN"], ) except Exception: pass # Running locally without HF token — skip def _sanitize_csv_value(val): """Strip leading characters that trigger formula execution in spreadsheets.""" if isinstance(val, str): while val and val[0] in ("=", "+", "-", "@", "\t", "\r", "\n"): val = val[1:] val = val.replace("\n", " ").replace("\r", " ") return val # ── Rate Limiting ──────────────────────────────────────────────────────────── _submit_times: dict[str, list[float]] = defaultdict(list) MAX_SUBMITS_PER_HOUR = 20 def _check_rate_limit(identifier: str = "global") -> tuple[bool, str]: """Simple in-memory rate limiter. Returns (allowed, error_message).""" now = time.time() times = _submit_times[identifier] _submit_times[identifier] = [t for t in times if now - t < 3600] if len(_submit_times[identifier]) >= MAX_SUBMITS_PER_HOUR: return False, "Rate limit exceeded (max 20 submissions per hour). Try again later." _submit_times[identifier].append(now) return True, "" def save_submission(results: dict) -> None: """Append results to local JSONL and CSV.""" # JSONL for CommitScheduler → HF dataset jsonl_path = SUBMISSIONS_DIR / "results.jsonl" with open(jsonl_path, "a") as f: f.write(json.dumps(results) + "\n") # Also append to data/results.csv for the leaderboard csv_path = DATA_PATH file_exists = csv_path.exists() and csv_path.stat().st_size > 0 fieldnames = [ "agent_name", "agent_type", "opponent", "games", "win_rate", "score", "avg_kills", "avg_deaths", "kd_ratio", "avg_economy", "avg_game_length", "timestamp", "replay_url", "agent_url", ] safe_results = {k: _sanitize_csv_value(v) for k, v in results.items()} with open(csv_path, "a", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) if not file_exists: writer.writeheader() writer.writerow(safe_results) # ── Submission Handling ─────────────────────────────────────────────────────── MAX_REPLAY_SIZE = 10 * 1024 * 1024 # 10 MB VALID_OPPONENTS = {"Beginner", "Easy", "Medium", "Normal", "Hard"} VALID_AGENT_TYPES = {"Scripted", "LLM", "RL"} REQUIRED_FIELDS = [ "agent_name", "agent_type", "opponent", "result", "ticks", "kills_cost", "deaths_cost", "assets_value", ] def validate_submission(data: dict) -> tuple[bool, str]: """Validate an uploaded JSON submission. Returns (is_valid, error_message). """ for field in REQUIRED_FIELDS: if field not in data: return False, f"Missing required field: {field}" if data["agent_type"] not in VALID_AGENT_TYPES: return False, ( f"Invalid agent_type: {data['agent_type']}. " f"Must be one of: {', '.join(sorted(VALID_AGENT_TYPES))}" ) if data["opponent"] not in VALID_OPPONENTS: return False, ( f"Invalid opponent: {data['opponent']}. " f"Must be one of: {', '.join(sorted(VALID_OPPONENTS))}" ) # Type checks for numeric fields for field in ("ticks", "kills_cost", "deaths_cost", "assets_value"): if not isinstance(data[field], (int, float)): return False, f"Field '{field}' must be a number" # String length limits if len(str(data["agent_name"])) > 100: return False, "agent_name must be 100 characters or fewer" # agent_url: optional, but must be http(s) if provided agent_url = str(data.get("agent_url", "")).strip() if agent_url and not agent_url.startswith(("http://", "https://")): return False, "agent_url must be an HTTP(S) URL" if len(agent_url) > 500: return False, "agent_url must be 500 characters or fewer" return True, "" def _score_from_submission(data: dict) -> dict: """Build a CSV-ready results dict from a validated submission.""" game_result = { "result": data.get("result", ""), "win": data.get("win", data.get("result") == "win"), "ticks": data.get("ticks", 0), "kills_cost": data.get("kills_cost", 0), "deaths_cost": data.get("deaths_cost", 0), "kd_ratio": data.get("kd_ratio", 0), "assets_value": data.get("assets_value", 0), "cash": data.get("cash", 0), } score = compute_composite_score([game_result]) kills = data.get("kills_cost", 0) deaths = data.get("deaths_cost", 0) games = data.get("games", 1) return { "agent_name": data["agent_name"], "agent_type": data["agent_type"], "opponent": data["opponent"], "games": games, "win_rate": round(100.0 * (1 if data.get("win") else 0) / max(games, 1), 1), "score": round(score, 1), "avg_kills": kills, "avg_deaths": deaths, "kd_ratio": round(kills / max(deaths, 1), 2), "avg_economy": data.get("assets_value", 0), "avg_game_length": data.get("ticks", 0), "timestamp": data.get("timestamp", datetime.now(timezone.utc).strftime("%Y-%m-%d"))[:10], "replay_url": "", "agent_url": data.get("agent_url", ""), } def handle_upload(json_file, replay_file) -> tuple[str, pd.DataFrame]: """Process an uploaded bench submission JSON + optional replay.""" if json_file is None: return "Please upload a JSON file.", add_type_badges(load_data()) allowed, err = _check_rate_limit() if not allowed: return err, add_type_badges(load_data()) try: with open(json_file.name) as f: data = json.load(f) except (json.JSONDecodeError, Exception) as e: return f"Invalid JSON: {e}", add_type_badges(load_data()) is_valid, error = validate_submission(data) if not is_valid: return f"Validation error: {error}", add_type_badges(load_data()) results_row = _score_from_submission(data) # Save replay if provided if replay_file is not None: import shutil orig = Path(replay_file.name) if orig.stat().st_size > MAX_REPLAY_SIZE: return "Replay file too large (max 10 MB).", add_type_badges(load_data()) ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") slug = re.sub(r"[^a-zA-Z0-9_-]", "", data["agent_name"].replace("/", "_").replace(" ", "_"))[:30] replay_name = f"replay-{slug}-{ts}.orarep" shutil.copy2(str(orig), SUBMISSIONS_DIR / replay_name) results_row["replay_url"] = replay_name save_submission(results_row) return ( f"Submitted! **{data['agent_name']}** ({data['agent_type']}) " f"vs {data['opponent']}: score **{results_row['score']}**", add_type_badges(load_data()), ) def handle_api_submit(json_data: str) -> str: """API endpoint: accept JSON string submission. Used by CLI auto-upload.""" allowed, err = _check_rate_limit() if not allowed: return err try: data = json.loads(json_data) except (json.JSONDecodeError, Exception) as e: return f"Invalid JSON: {e}" is_valid, error = validate_submission(data) if not is_valid: return f"Validation error: {error}" results_row = _score_from_submission(data) save_submission(results_row) return ( f"OK: {data['agent_name']} ({data['agent_type']}) " f"vs {data['opponent']}: score {results_row['score']}" ) def handle_api_submit_with_replay(json_data: str, replay_file) -> str: """API endpoint: accept JSON + replay file. Used by CLI with --replay.""" allowed, err = _check_rate_limit() if not allowed: return err try: data = json.loads(json_data) except (json.JSONDecodeError, Exception) as e: return f"Invalid JSON: {e}" is_valid, error = validate_submission(data) if not is_valid: return f"Validation error: {error}" results_row = _score_from_submission(data) # Save replay if provided if replay_file is not None: import shutil orig = Path(replay_file) if isinstance(replay_file, str) else Path(replay_file.name) if orig.exists() and orig.stat().st_size > MAX_REPLAY_SIZE: return "Replay file too large (max 10 MB)" ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") slug = re.sub(r"[^a-zA-Z0-9_-]", "", data["agent_name"].replace("/", "_").replace(" ", "_"))[:30] replay_name = f"replay-{slug}-{ts}.orarep" shutil.copy2(str(orig), SUBMISSIONS_DIR / replay_name) results_row["replay_url"] = replay_name save_submission(results_row) return ( f"OK: {data['agent_name']} ({data['agent_type']}) " f"vs {data['opponent']}: score {results_row['score']}" ) # ── UI ──────────────────────────────────────────────────────────────────────── ABOUT_MD = """ ## What is OpenRA-Bench? **OpenRA-Bench** is a standardized benchmark for evaluating AI agents playing [Red Alert](https://www.openra.net/) through the [OpenRA-RL](https://openra-rl.dev) environment. ### Evaluation Protocol - **Game**: Red Alert (OpenRA engine) - **Format**: 1v1 agent vs built-in AI - **Opponents**: Beginner, Easy, Medium, Normal, Hard difficulty - **Games per entry**: Minimum 10 games per configuration - **Metrics**: Win rate, composite score, K/D ratio, economy ### Composite Score The benchmark score combines three components: | Component | Weight | Description | |-----------|--------|-------------| | Win Rate | 50% | Percentage of games won | | Military Efficiency | 25% | Kill/death cost ratio (normalized) | | Economy | 25% | Final asset value (normalized) | ### Agent Types - **Scripted**: Rule-based bots with hardcoded strategies - **LLM**: Language model agents (Claude, GPT, etc.) - **RL**: Reinforcement learning policies (PPO, SAC, etc.) ### Links - [OpenRA-RL Documentation](https://openra-rl.dev) - [GitHub Repository](https://github.com/yxc20089/OpenRA-RL) - [OpenRA-Bench Source](https://github.com/yxc20089/OpenRA-Bench) - [OpenEnv Framework](https://huggingface.co/openenv) - [HuggingFace Space](https://huggingface.co/spaces/openra-rl/OpenRA-Bench) """ SUBMIT_MD = """ --- ## Other Submission Methods ### CLI Auto-Upload Set `BENCH_URL` in your OpenRA-RL config and results upload automatically after each game: ```yaml # config.yaml agent: bench_url: "https://openra-rl-openra-bench.hf.space" ``` ### CLI Manual Upload Upload a previously exported bench JSON: ```bash python -m openra_env.bench_submit ~/.openra-rl/bench-exports/bench-*.json ``` ### Batch Evaluation (10+ games) ```bash git clone https://github.com/yxc20089/OpenRA-Bench.git cd OpenRA-Bench pip install -r requirements.txt pip install openra-rl openra-rl-util python evaluate.py \\ --agent scripted \\ --agent-name "MyBot-v1" \\ --agent-type Scripted \\ --opponent Normal \\ --games 10 \\ --server http://localhost:8000 ``` ### Evaluation Parameters | Parameter | Description | |-----------|-------------| | `--agent` | Agent type: `scripted`, `llm`, `mcp`, `custom` | | `--agent-name` | Display name on the leaderboard | | `--agent-type` | Category: `Scripted`, `LLM`, `RL` | | `--opponent` | AI difficulty: `Beginner`, `Easy`, `Medium`, `Normal`, `Hard` | | `--games` | Number of games (minimum 10) | | `--server` | OpenRA-RL server URL (local or HuggingFace-hosted) | ### Custom Agents Implement the standard `reset/step` loop: ```python from openra_env.client import OpenRAEnv from openra_env.models import OpenRAAction async with OpenRAEnv("http://localhost:8000") as env: obs = await env.reset() while not obs.done: action = your_agent.decide(obs) obs = await env.step(action) ``` Then run `evaluate.py --agent custom` with your agent integrated. """ def build_app() -> gr.Blocks: """Build the Gradio leaderboard app.""" initial_df = add_type_badges(load_data()) with gr.Blocks(title="OpenRA-Bench") as app: gr.Markdown( "# OpenRA-Bench\n" "**Agent Leaderboard for OpenRA-RL** — " "Train AI to Play Real-Time Strategy" ) with gr.Tabs(): # ── Leaderboard Tab ─────────────────────────────────────────── with gr.Tab("Leaderboard"): with gr.Row(): search_box = gr.Textbox( label="Search agents", placeholder="Search by name (supports regex, comma-separated)...", scale=3, ) type_filter = gr.CheckboxGroup( choices=["Scripted", "LLM", "RL"], value=["Scripted", "LLM", "RL"], label="Agent Type", scale=2, ) opponent_filter = gr.Dropdown( choices=["All", "Beginner", "Easy", "Medium", "Normal", "Hard"], value="All", label="Opponent", scale=1, ) leaderboard = gr.Dataframe( value=initial_df, datatype=[ "number", # Rank "html", # Agent (may contain hyperlink) "html", # Type (badge) "str", # Opponent "number", # Games "number", # Win Rate "number", # Score "number", # K/D Ratio "number", # Avg Kills "number", # Avg Deaths "number", # Avg Economy "number", # Avg Game Length "str", # Date "html", # Replay (download link) ], interactive=False, show_label=False, ) # Wire up filters for component in [search_box, type_filter, opponent_filter]: component.change( fn=filter_leaderboard, inputs=[search_box, type_filter, opponent_filter], outputs=leaderboard, ) # ── About Tab ───────────────────────────────────────────────── with gr.Tab("About"): gr.Markdown(ABOUT_MD) # ── Submit Tab ──────────────────────────────────────────────── with gr.Tab("Submit"): gr.Markdown( "## Upload Results\n\n" "Upload a bench export JSON from your OpenRA-RL game. " "After each game, the agent saves a JSON file to " "`~/.openra-rl/bench-exports/`." ) with gr.Row(): json_upload = gr.File( label="Bench export JSON", file_types=[".json"], scale=3, ) replay_upload = gr.File( label="Replay file (optional)", file_types=[".orarep"], scale=2, ) submit_btn = gr.Button("Submit Results", variant="primary") submit_output = gr.Markdown() submit_btn.click( fn=handle_upload, inputs=[json_upload, replay_upload], outputs=[submit_output, leaderboard], ) # API endpoint for CLI auto-upload (JSON only) api_json_input = gr.Textbox(visible=False) api_result = gr.Textbox(visible=False) api_btn = gr.Button(visible=False) api_btn.click( fn=handle_api_submit, inputs=[api_json_input], outputs=[api_result], api_name="submit", ) # API endpoint for CLI upload with replay api_json_input2 = gr.Textbox(visible=False) api_replay_input = gr.File(visible=False) api_result2 = gr.Textbox(visible=False) api_btn2 = gr.Button(visible=False) api_btn2.click( fn=handle_api_submit_with_replay, inputs=[api_json_input2, api_replay_input], outputs=[api_result2], api_name="submit_with_replay", ) gr.Markdown(SUBMIT_MD) return app if __name__ == "__main__": app = build_app() app.launch(allowed_paths=[str(SUBMISSIONS_DIR)])