openapp_env / server /openapp_environment.py
Crashbandicoote2's picture
Upload folder using huggingface_hub
446b0a9 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
OpenApp Environment Implementation.
A web application simulation environment that wraps OpenApps and BrowserGym.
This environment provides agent interaction with simulated web apps including
calendar, todo, messenger, and maps applications.
"""
import os
import subprocess
import time
import urllib.request
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from uuid import uuid4
# Support both in-repo and standalone imports
try:
# In-repo imports (when running from OpenEnv repository)
from core.env_server.interfaces import Environment
from core.env_server.types import State
from ..models import OpenAppAction, OpenAppObservation
except ImportError:
# Standalone imports (when environment is standalone with openenv-core from pip)
from openenv_core.env_server.interfaces import Environment
from openenv_core.env_server.types import State
from openapp_env.models import OpenAppAction, OpenAppObservation
class GenericOpenAppsTask:
"""
A generic task for OpenApps interaction without specific goals.
This is a simple wrapper that allows BrowserGym to interact with OpenApps
without requiring a specific task. For task-based interaction, use the
OpenAppsTask from open_apps.tasks.add_tasks_to_browsergym.
"""
def __init__(
self,
base_url: str,
seed: int = 1,
**kwargs,
) -> None:
"""
Initialize generic OpenApps task.
Args:
base_url: Base URL of the OpenApps server
seed: Random seed (required by BrowserGym)
**kwargs: Additional arguments (ignored)
"""
try:
from browsergym.core.task import AbstractBrowserTask
import playwright.sync_api
except ImportError:
raise ImportError(
"BrowserGym is required. Install with: pip install browsergym"
)
# Store as instance attributes
self.base_url = base_url
self.seed = seed
# BrowserGym task properties
self.viewport = {"width": 1024, "height": 768}
self.slow_mo = 100
self.timeout = 5000
# Additional properties that BrowserGym might expect
self.locale = None
self.timezone_id = None
self.geolocation = None
def setup(
self, page: "playwright.sync_api.Page"
) -> Tuple[str, Dict[str, Any]]:
"""
Set up the task by navigating to the base URL.
Args:
page: Playwright page object
Returns:
Tuple of (goal_string, info_dict)
"""
page.goto(self.base_url)
return "Explore OpenApps", {}
def teardown(self) -> None:
"""Clean up after task completion."""
pass
def validate(
self, page: "playwright.sync_api.Page", chat_messages: list[str]
) -> Tuple[float, bool, str, Dict[str, Any]]:
"""
Validate task state and return reward.
Args:
page: Playwright page object
chat_messages: List of chat messages
Returns:
Tuple of (reward, done, message, info)
"""
# Generic task never completes automatically
return 0.0, False, "", {}
def cheat(
self, page: "playwright.sync_api.Page", chat_messages: list[str]
) -> None:
"""Cheat method (no-op for generic task)."""
pass
class OpenAppEnvironment(Environment):
"""
A web application environment that wraps OpenApps and BrowserGym.
This environment launches OpenApps web server and provides a BrowserGym-like
interface for agents to interact with simulated web applications.
Args:
openapps_path: Path to OpenApps directory (default: auto-detect)
web_app_port: Port for OpenApps web server (default: 5001)
headless: Run browser in headless mode (default: True)
task_name: Optional task name to evaluate (e.g., "add_meeting_with_dennis")
apps_config: Configuration for apps (default: all enabled)
max_steps: Maximum steps per episode (default: 50)
Example:
>>> env = OpenAppEnvironment()
>>> obs = env.reset()
>>> print(obs.url) # Starting page URL
>>>
>>> # Click on an element
>>> action = OpenAppAction(action_type="click", bid="calendar-btn")
>>> obs = env.step(action)
>>> print(obs.html)
"""
def __init__(
self,
openapps_url: Optional[str] = None,
openapps_path: Optional[str] = None,
web_app_port: int = 5001,
headless: bool = True,
task_name: Optional[str] = None,
apps_config: Optional[Dict[str, Any]] = None,
max_steps: int = 50,
):
"""Initialize the OpenApp environment."""
self._state = State(episode_id=str(uuid4()), step_count=0)
self._max_steps = max_steps
# OpenApps configuration
# Priority: 1. openapps_url, 2. OPENAPPS_URL env var, 3. Try to find/launch
self.openapps_url = openapps_url or os.environ.get("OPENAPPS_URL")
if not self.openapps_url:
self.web_app_port = web_app_port
self.openapps_url = f"http://localhost:{web_app_port}"
self.openapps_path = openapps_path
self.headless = headless
self.task_name = task_name
self.apps_config = apps_config or {}
# Runtime state
self._apps_process: Optional[subprocess.Popen] = None
self._browser_env = None
self._current_html = ""
self._current_url = ""
self._current_axtree = ""
self._app_state = {}
self._last_action_error = None
self._episode_reward = 0.0
def _detect_openapps_path(self) -> str:
"""
Auto-detect OpenApps path.
Since OpenApps is installed as a Python package, we use the installed
package location instead of requiring a separate directory.
"""
# Check if user provided a custom path via environment variable
env_path = os.environ.get("OPENAPPS_PATH")
if env_path and Path(env_path).exists():
return env_path
# Try to find OpenApps as an installed package
try:
import open_apps
openapps_pkg_path = Path(open_apps.__file__).parent.parent
if openapps_pkg_path.exists():
return str(openapps_pkg_path)
except ImportError:
pass
raise ValueError(
"OpenApps not found. Please install it with: "
"pip install git+https://github.com/facebookresearch/OpenApps.git "
"or set OPENAPPS_PATH environment variable."
)
def _launch_openapps_server(self) -> Optional[subprocess.Popen]:
"""
Launch OpenApps web server in background.
Returns None if server is expected to be already running (OPENAPPS_URL set).
"""
# If OPENAPPS_URL is set, assume server is already running
if os.environ.get("OPENAPPS_URL"):
print(f"Using existing OpenApps server at {self.openapps_url}")
# Wait for server to be available
self._wait_for_server(max_wait=5)
return None
# Otherwise, provide helpful error message
raise NotImplementedError(
"Automatic OpenApps server launch is not yet implemented.\n"
"\n"
"Please start OpenApps manually in a separate terminal:\n"
" 1. Clone OpenApps: git clone https://github.com/facebookresearch/OpenApps.git\n"
" 2. Install: cd OpenApps && uv sync\n"
" 3. Run: uv run launch.py\n"
"\n"
"Then set the OPENAPPS_URL environment variable:\n"
" export OPENAPPS_URL=http://localhost:5001\n"
"\n"
"Or use Docker mode which handles this automatically:\n"
" python examples/openapp_example.py --mode docker\n"
)
def _wait_for_server(self, max_wait: int = 30):
"""Wait for OpenApps server to become available."""
for i in range(max_wait):
try:
response = urllib.request.urlopen(self.openapps_url, timeout=2)
if response.status == 200:
return
except Exception:
pass
time.sleep(1)
raise TimeoutError(f"OpenApps server did not start within {max_wait} seconds")
def _initialize_browser(self):
"""Initialize BrowserGym environment for interaction."""
try:
from browsergym.core.env import BrowserEnv
except ImportError:
raise ImportError(
"BrowserGym is required for OpenApp environment. "
"Install it with: pip install browsergym"
)
# Create BrowserGym environment with generic OpenApps task
self._browser_env = BrowserEnv(
task_entrypoint=GenericOpenAppsTask,
task_kwargs={"base_url": self.openapps_url},
headless=self.headless,
slow_mo=200, # Slow down actions so they're visible (200ms delay)
)
def _get_current_observation(self) -> Dict[str, Any]:
"""Extract current observation from browser state."""
if self._browser_env is None:
return {
"html": "",
"url": self.openapps_url,
"open_pages_urls": [self.openapps_url],
"active_page_index": 0,
"axtree_txt": "",
"app_state": {},
}
# Get browser state (implementation depends on BrowserGym API)
# This is a simplified version - actual implementation would use BrowserGym's observation
return {
"html": self._current_html,
"url": self._current_url,
"open_pages_urls": [self._current_url],
"active_page_index": 0,
"axtree_txt": self._current_axtree,
"app_state": self._app_state,
}
def reset(self) -> OpenAppObservation:
"""
Reset the environment.
Returns:
OpenAppObservation with initial state
"""
# Reset state
self._state = State(episode_id=str(uuid4()), step_count=0)
self._episode_reward = 0.0
self._last_action_error = None
# Check if OpenApps server is running, start if needed
if self._apps_process is None and not os.environ.get("OPENAPPS_URL"):
self._apps_process = self._launch_openapps_server()
# Initialize browser
if self._browser_env is None:
self._initialize_browser()
# Reset the BrowserGym environment
try:
obs, info = self._browser_env.reset()
# Extract observation data from BrowserGym
self._current_url = obs.get("url", self.openapps_url)
self._current_html = obs.get("dom_txt", "")
self._current_axtree = obs.get("axtree_txt", "")
self._app_state = {}
except Exception as e:
print(f"Warning: Failed to reset browser environment: {e}")
# Fallback to placeholder values
self._current_url = self.openapps_url
self._current_html = "<html><body>OpenApps Ready</body></html>"
self._current_axtree = ""
self._app_state = {}
obs_data = self._get_current_observation()
return OpenAppObservation(
html=obs_data["html"],
url=obs_data["url"],
open_pages_urls=obs_data["open_pages_urls"],
active_page_index=obs_data["active_page_index"],
axtree_txt=obs_data["axtree_txt"],
app_state=obs_data["app_state"],
task_info={"task_name": self.task_name} if self.task_name else None,
last_action_error=None,
done=False,
reward=0.0,
)
def step(self, action: OpenAppAction) -> OpenAppObservation: # type: ignore[override]
"""
Execute a step in the environment.
Args:
action: OpenAppAction to execute
Returns:
OpenAppObservation with resulting state and reward
"""
self._state.step_count += 1
self._last_action_error = None
reward = 0.0
try:
# Execute action based on type
if action.action_type == "click":
reward = self._execute_click(action.bid)
elif action.action_type == "fill":
reward = self._execute_fill(action.bid, action.text)
elif action.action_type == "select_option":
reward = self._execute_select(action.bid, action.value)
elif action.action_type == "goto":
reward = self._execute_goto(action.url)
elif action.action_type == "scroll":
reward = self._execute_scroll(action.direction)
elif action.action_type == "send_keys":
reward = self._execute_send_keys(action.text)
elif action.action_type == "noop":
reward = 0.0
else:
self._last_action_error = f"Unknown action type: {action.action_type}"
reward = -0.1
except Exception as e:
self._last_action_error = str(e)
reward = -0.1
# Update cumulative reward
self._episode_reward += reward
# Check if episode is done
done = self._state.step_count >= self._max_steps
# Get current observation
obs_data = self._get_current_observation()
return OpenAppObservation(
html=obs_data["html"],
url=obs_data["url"],
open_pages_urls=obs_data["open_pages_urls"],
active_page_index=obs_data["active_page_index"],
axtree_txt=obs_data["axtree_txt"],
app_state=obs_data["app_state"],
task_info={"task_name": self.task_name} if self.task_name else None,
last_action_error=self._last_action_error,
done=done,
reward=reward,
metadata={"cumulative_reward": self._episode_reward},
)
def _execute_click(self, bid: str) -> float:
"""Execute click action. Returns reward.
Supports two modes:
1. CSS selector mode: If bid starts with '#', '.', or '[', it's treated as a CSS selector
and uses Playwright directly (e.g., bid="#msg-input")
2. BrowserGym mode: Otherwise, uses BrowserGym's accessibility tree bid
"""
if self._browser_env is None:
return 0.0
try:
# Check if bid is a CSS selector (starts with # or other CSS selector chars)
if bid.startswith('#') or bid.startswith('.') or bid.startswith('['):
# Use Playwright directly for CSS selectors
return self._execute_click_playwright(bid)
# BrowserGym action format: click("bid")
action = f'click("{bid}")'
obs, reward, done, truncated, info = self._browser_env.step(action)
# Update current state from observation
self._current_url = obs.get("url", self._current_url)
self._current_html = obs.get("dom_txt", self._current_html)
self._current_axtree = obs.get("axtree_txt", self._current_axtree)
return float(reward) if reward else 0.0
except Exception as e:
self._last_action_error = f"Click failed: {str(e)}"
return -0.1
def _execute_fill(self, bid: str, text: str) -> float:
"""Execute fill action. Returns reward.
Supports two modes:
1. CSS selector mode: If bid starts with '#', it's treated as an HTML ID selector
and uses Playwright directly (e.g., bid="#msg-input")
2. BrowserGym mode: Otherwise, uses BrowserGym's accessibility tree bid
"""
if self._browser_env is None:
return 0.0
try:
# Check if bid is a CSS selector (starts with # or other CSS selector chars)
if bid.startswith('#') or bid.startswith('.') or bid.startswith('['):
# Use Playwright directly for CSS selectors
return self._execute_fill_playwright(bid, text)
# BrowserGym action format: fill("bid", "text")
action = f'fill("{bid}", "{text}")'
obs, reward, done, truncated, info = self._browser_env.step(action)
# Update current state from observation
self._current_url = obs.get("url", self._current_url)
self._current_html = obs.get("dom_txt", self._current_html)
self._current_axtree = obs.get("axtree_txt", self._current_axtree)
return float(reward) if reward else 0.0
except Exception as e:
self._last_action_error = f"Fill failed: {str(e)}"
return -0.1
def _execute_fill_playwright(self, selector: str, text: str) -> float:
"""Execute fill action using Playwright directly with CSS selector."""
try:
# Access the underlying Playwright page from BrowserGym
page = self._browser_env.unwrapped.page
# Wait for element and fill it
page.wait_for_selector(selector, timeout=5000)
page.fill(selector, text)
# Small delay to let the page update
page.wait_for_timeout(200)
# Update observation after action
self._update_observation_from_page(page)
return 0.0
except Exception as e:
self._last_action_error = f"Fill (Playwright) failed: {str(e)}"
return -0.1
def _execute_click_playwright(self, selector: str) -> float:
"""Execute click action using Playwright directly with CSS selector."""
try:
# Access the underlying Playwright page from BrowserGym
page = self._browser_env.unwrapped.page
# Wait for element and click it
page.wait_for_selector(selector, timeout=5000)
page.click(selector)
# Longer delay to let HTMX process the request
page.wait_for_timeout(500)
# Update observation after action
self._update_observation_from_page(page)
return 0.0
except Exception as e:
self._last_action_error = f"Click (Playwright) failed: {str(e)}"
return -0.1
def _execute_press_key_playwright(self, key: str) -> float:
"""Execute key press using Playwright directly."""
try:
# Access the underlying Playwright page from BrowserGym
page = self._browser_env.unwrapped.page
# Press the key
page.keyboard.press(key)
# Delay to let the page update
page.wait_for_timeout(500)
# Update observation after action
self._update_observation_from_page(page)
return 0.0
except Exception as e:
self._last_action_error = f"Press key (Playwright) failed: {str(e)}"
return -0.1
def _update_observation_from_page(self, page) -> None:
"""Update internal observation state from Playwright page."""
try:
self._current_url = page.url
# Note: We can't easily get axtree from Playwright directly,
# so we'll just update URL. The next BrowserGym action will sync the state.
except Exception:
pass
def _execute_select(self, bid: str, value: str) -> float:
"""Execute select option action. Returns reward."""
if self._browser_env is None:
return 0.0
try:
# BrowserGym action format: select_option("bid", "value")
action = f'select_option("{bid}", "{value}")'
obs, reward, done, truncated, info = self._browser_env.step(action)
# Update current state from observation
self._current_url = obs.get("url", self._current_url)
self._current_html = obs.get("dom_txt", self._current_html)
self._current_axtree = obs.get("axtree_txt", self._current_axtree)
return float(reward) if reward else 0.0
except Exception as e:
self._last_action_error = f"Select failed: {str(e)}"
return -0.1
def _execute_goto(self, url: str) -> float:
"""Execute navigation action. Returns reward."""
if self._browser_env is None:
self._current_url = url
return 0.0
try:
# BrowserGym action format: goto("url")
action = f'goto("{url}")'
obs, reward, done, truncated, info = self._browser_env.step(action)
# Update current state from observation
self._current_url = obs.get("url", url)
self._current_html = obs.get("dom_txt", self._current_html)
self._current_axtree = obs.get("axtree_txt", self._current_axtree)
return float(reward) if reward else 0.0
except Exception as e:
self._last_action_error = f"Goto failed: {str(e)}"
self._current_url = url # Update URL even if failed
return -0.1
def _execute_scroll(self, direction: str) -> float:
"""Execute scroll action. Returns reward."""
if self._browser_env is None:
return 0.0
try:
# BrowserGym action format: scroll("direction")
action = f'scroll("{direction}")'
obs, reward, done, truncated, info = self._browser_env.step(action)
# Update current state from observation
self._current_url = obs.get("url", self._current_url)
self._current_html = obs.get("dom_txt", self._current_html)
self._current_axtree = obs.get("axtree_txt", self._current_axtree)
return float(reward) if reward else 0.0
except Exception as e:
self._last_action_error = f"Scroll failed: {str(e)}"
return -0.1
def _execute_send_keys(self, text: str) -> float:
"""Execute send keys action. Returns reward."""
if self._browser_env is None:
return 0.0
try:
# Special handling for Enter key - use Playwright directly for reliability
if text == "\n" or text.lower() == "enter":
return self._execute_press_key_playwright("Enter")
# BrowserGym action format: send_keys("text")
action = f'send_keys("{text}")'
obs, reward, done, truncated, info = self._browser_env.step(action)
# Update current state from observation
self._current_url = obs.get("url", self._current_url)
self._current_html = obs.get("dom_txt", self._current_html)
self._current_axtree = obs.get("axtree_txt", self._current_axtree)
return float(reward) if reward else 0.0
except Exception as e:
self._last_action_error = f"Send keys failed: {str(e)}"
return -0.1
@property
def state(self) -> State:
"""
Get the current environment state.
Returns:
Current State with episode_id and step_count
"""
return self._state
def close(self):
"""Clean up resources."""
if hasattr(self, "_browser_env") and self._browser_env is not None:
try:
self._browser_env.close()
except Exception:
pass
self._browser_env = None
if hasattr(self, "_apps_process") and self._apps_process is not None:
try:
self._apps_process.terminate()
self._apps_process.wait(timeout=5)
except Exception:
self._apps_process.kill()
self._apps_process = None
def __del__(self):
"""Cleanup on deletion."""
self.close()