Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Doom Environment Implementation. | |
| Wraps ViZDoom for reinforcement learning research with OpenEnv interface. | |
| ViZDoom is a Doom-based AI research platform for visual RL. | |
| """ | |
| import uuid | |
| from typing import List, Literal, Optional | |
| try: | |
| from ..models import DoomAction, DoomObservation | |
| except ImportError: | |
| from models import DoomAction, DoomObservation | |
| from openenv_core.env_server.interfaces import Environment | |
| from openenv_core.env_server.types import State | |
| # Import ViZDoom | |
| try: | |
| import numpy as np | |
| import vizdoom as vzd | |
| except ImportError as e: | |
| raise ImportError( | |
| "ViZDoom is not installed. " "Please install it with: pip install vizdoom" | |
| ) from e | |
| class DoomEnvironment(Environment): | |
| """ | |
| Doom Environment wrapper for OpenEnv. | |
| This environment wraps ViZDoom scenarios and provides a clean interface | |
| for RL training with visual observations and game variables. | |
| Args: | |
| scenario: Name of the scenario to load (e.g., "basic", "deadly_corridor"). | |
| Can also be a path to a .cfg file. | |
| screen_resolution: Screen resolution - one of ViZDoom's resolutions. | |
| screen_format: Screen format - "CRCGCB", "RGB24", "GRAY8", etc. | |
| window_visible: Whether to show the game window. | |
| use_discrete_actions: If True, use pre-defined discrete action space. | |
| If False, use continuous button combinations. | |
| Example: | |
| >>> env = DoomEnvironment("basic") | |
| >>> obs = env.reset() | |
| >>> print(obs.screen_shape) # e.g., [120, 160, 3] | |
| >>> obs = env.step(DoomAction(action_id=0)) # Take action | |
| >>> print(obs.reward, obs.done) | |
| """ | |
| def __init__( | |
| self, | |
| scenario: str = "basic", | |
| screen_resolution: str = "RES_160X120", | |
| screen_format: str = "RGB24", | |
| window_visible: bool = False, | |
| use_discrete_actions: bool = True, | |
| ): | |
| """Initialize Doom environment.""" | |
| super().__init__() | |
| self.scenario = scenario | |
| self.screen_resolution = screen_resolution | |
| self.screen_format = screen_format | |
| self.window_visible = window_visible | |
| self.use_discrete_actions = use_discrete_actions | |
| # Create DoomGame instance | |
| self.game = vzd.DoomGame() | |
| # Load configuration | |
| self._load_scenario(scenario) | |
| # Configure visual settings | |
| self._configure_visuals(screen_resolution, screen_format, window_visible) | |
| # Initialize the game | |
| self.game.init() | |
| # Get available actions | |
| self.available_buttons = self.game.get_available_buttons() | |
| self.num_buttons = len(self.available_buttons) | |
| # Create discrete action space if requested | |
| if use_discrete_actions: | |
| self._create_discrete_actions() | |
| else: | |
| self.discrete_actions = None | |
| # Get screen dimensions | |
| self._update_screen_shape() | |
| # Initialize state | |
| self._state = State(episode_id=str(uuid.uuid4()), step_count=0) | |
| # Rendering state | |
| self._render_window = None | |
| def _load_scenario(self, scenario: str) -> None: | |
| """Load scenario configuration.""" | |
| # Check if scenario is a path to .cfg file | |
| if scenario.endswith(".cfg"): | |
| self.game.load_config(scenario) | |
| else: | |
| # Try to load built-in scenario | |
| try: | |
| # ViZDoom scenarios are typically in scenarios/ directory | |
| scenario_path = vzd.scenarios_path + f"/{scenario}.cfg" | |
| self.game.load_config(scenario_path) | |
| except Exception: | |
| # If built-in scenario not found, try as direct path | |
| try: | |
| self.game.load_config(scenario) | |
| except Exception as e: | |
| raise ValueError( | |
| f"Could not load scenario '{scenario}'. " | |
| f"Provide either a built-in scenario name (e.g., 'basic') " | |
| f"or a path to a .cfg file." | |
| ) from e | |
| def _configure_visuals( | |
| self, screen_resolution: str, screen_format: str, window_visible: bool | |
| ) -> None: | |
| """Configure visual settings.""" | |
| # Set screen resolution | |
| resolution_map = { | |
| "RES_160X120": vzd.ScreenResolution.RES_160X120, | |
| "RES_320X240": vzd.ScreenResolution.RES_320X240, | |
| "RES_640X480": vzd.ScreenResolution.RES_640X480, | |
| "RES_800X600": vzd.ScreenResolution.RES_800X600, | |
| "RES_1024X768": vzd.ScreenResolution.RES_1024X768, | |
| } | |
| if screen_resolution in resolution_map: | |
| self.game.set_screen_resolution(resolution_map[screen_resolution]) | |
| else: | |
| # Try to use it directly as a ViZDoom resolution | |
| try: | |
| self.game.set_screen_resolution( | |
| getattr(vzd.ScreenResolution, screen_resolution) | |
| ) | |
| except AttributeError: | |
| raise ValueError(f"Invalid screen resolution: {screen_resolution}") | |
| # Set screen format | |
| format_map = { | |
| "RGB24": vzd.ScreenFormat.RGB24, | |
| "GRAY8": vzd.ScreenFormat.GRAY8, | |
| "CRCGCB": vzd.ScreenFormat.CRCGCB, | |
| "CBCGCR": vzd.ScreenFormat.CBCGCR, | |
| "DOOM_256_COLORS8": vzd.ScreenFormat.DOOM_256_COLORS8, | |
| } | |
| if screen_format in format_map: | |
| self.game.set_screen_format(format_map[screen_format]) | |
| else: | |
| try: | |
| self.game.set_screen_format(getattr(vzd.ScreenFormat, screen_format)) | |
| except AttributeError: | |
| raise ValueError(f"Invalid screen format: {screen_format}") | |
| # Set window visibility | |
| self.game.set_window_visible(window_visible) | |
| def _create_discrete_actions(self) -> None: | |
| """ | |
| Create a discrete action space. | |
| Common actions for most scenarios: | |
| 0: No action | |
| 1: Move left | |
| 2: Move right | |
| 3: Move forward | |
| 4: Move backward | |
| 5: Turn left | |
| 6: Turn right | |
| 7: Attack/Shoot | |
| """ | |
| # Create some basic discrete actions | |
| # Each action is a list of button presses | |
| self.discrete_actions = [] | |
| # Action 0: No-op | |
| self.discrete_actions.append([0] * self.num_buttons) | |
| # Create single-button actions | |
| for i in range(self.num_buttons): | |
| action = [0] * self.num_buttons | |
| action[i] = 1 | |
| self.discrete_actions.append(action) | |
| # You can add more complex combinations here if needed | |
| # For example, move forward + shoot: | |
| # if self.num_buttons >= 2: | |
| # action = [0] * self.num_buttons | |
| # action[0] = 1 # e.g., move forward | |
| # action[1] = 1 # e.g., shoot | |
| # self.discrete_actions.append(action) | |
| def _update_screen_shape(self) -> None: | |
| """Update screen shape based on current state.""" | |
| if self.game.is_episode_finished(): | |
| # Use default shape if episode is finished | |
| if self.screen_format == "RGB24": | |
| channels = 3 | |
| elif self.screen_format == "GRAY8": | |
| channels = 1 | |
| else: | |
| channels = 3 # default | |
| # Get resolution from settings | |
| res_str = self.screen_resolution.replace("RES_", "").replace("X", "x") | |
| width, height = map(int, res_str.lower().split("x")) | |
| self.screen_shape = ( | |
| [height, width, channels] if channels > 1 else [height, width] | |
| ) | |
| else: | |
| state = self.game.get_state() | |
| if state is not None and state.screen_buffer is not None: | |
| self.screen_shape = list(state.screen_buffer.shape) | |
| else: | |
| # Fallback | |
| self.screen_shape = [120, 160, 3] | |
| def reset(self) -> DoomObservation: | |
| """ | |
| Reset the environment and return initial observation. | |
| Returns: | |
| Initial observation for the agent. | |
| """ | |
| # Start new episode | |
| self.game.new_episode() | |
| # Reset state tracking | |
| self._state.episode_id = str(uuid.uuid4()) | |
| self._state.step_count = 0 | |
| # Get initial observation | |
| return self._make_observation() | |
| def step(self, action: DoomAction) -> DoomObservation: | |
| """ | |
| Execute agent's action and return resulting observation. | |
| Args: | |
| action: DoomAction containing either action_id or buttons. | |
| Returns: | |
| Observation after action execution. | |
| Raises: | |
| ValueError: If action is invalid. | |
| """ | |
| if not isinstance(action, DoomAction): | |
| raise ValueError(f"Expected DoomAction, got {type(action)}") | |
| # Convert action to button presses | |
| if action.action_id is not None: | |
| # Use discrete action | |
| if not self.use_discrete_actions or self.discrete_actions is None: | |
| raise ValueError("discrete actions not enabled") | |
| if action.action_id < 0 or action.action_id >= len(self.discrete_actions): | |
| raise ValueError( | |
| f"Invalid action_id: {action.action_id}. " | |
| f"Valid range: [0, {len(self.discrete_actions) - 1}]" | |
| ) | |
| buttons = self.discrete_actions[action.action_id] | |
| elif action.buttons is not None: | |
| # Use button combination | |
| if len(action.buttons) != self.num_buttons: | |
| raise ValueError( | |
| f"Invalid button count: {len(action.buttons)}. " | |
| f"Expected {self.num_buttons} buttons." | |
| ) | |
| buttons = action.buttons | |
| else: | |
| raise ValueError("Either action_id or buttons must be provided") | |
| # Execute action | |
| reward = self.game.make_action(buttons) | |
| self._state.step_count += 1 | |
| # Get observation | |
| obs = self._make_observation() | |
| obs.reward = float(reward) | |
| return obs | |
| def state(self) -> State: | |
| """Get current environment state.""" | |
| return self._state | |
| def _make_observation(self) -> DoomObservation: | |
| """ | |
| Create a DoomObservation from current game state. | |
| Returns: | |
| DoomObservation for the agent. | |
| """ | |
| # Check if episode is finished | |
| episode_finished = self.game.is_episode_finished() | |
| if episode_finished: | |
| # Return empty observation when episode is done | |
| screen_flat = [0] * int(np.prod(self.screen_shape)) | |
| game_vars = [] | |
| else: | |
| # Get current state | |
| state = self.game.get_state() | |
| # Get screen buffer | |
| if state.screen_buffer is not None: | |
| screen = state.screen_buffer | |
| # Flatten screen for JSON serialization | |
| screen_flat = screen.flatten().tolist() | |
| self.screen_shape = list(screen.shape) | |
| else: | |
| screen_flat = [0] * int(np.prod(self.screen_shape)) | |
| # Get game variables | |
| if hasattr(state, "game_variables"): | |
| game_vars = ( | |
| state.game_variables.tolist() | |
| if state.game_variables is not None | |
| else [] | |
| ) | |
| else: | |
| game_vars = [] | |
| # Get available actions | |
| if self.use_discrete_actions and self.discrete_actions is not None: | |
| available_actions = list(range(len(self.discrete_actions))) | |
| else: | |
| available_actions = [] | |
| # Create observation | |
| obs = DoomObservation( | |
| screen_buffer=screen_flat, | |
| screen_shape=self.screen_shape, | |
| game_variables=game_vars, | |
| available_actions=available_actions, | |
| episode_finished=episode_finished, | |
| done=episode_finished, | |
| reward=0.0, # Will be filled in by step() | |
| metadata={ | |
| "scenario": self.scenario, | |
| "num_buttons": self.num_buttons, | |
| "available_buttons": [str(b) for b in self.available_buttons], | |
| }, | |
| ) | |
| return obs | |
| def render(self, mode: str = "human") -> Optional[np.ndarray]: | |
| """ | |
| Render the environment. | |
| Args: | |
| mode: Render mode - "human" for window display, "rgb_array" for array return. | |
| Returns: | |
| RGB array if mode is "rgb_array", None otherwise. | |
| """ | |
| if self.game.is_episode_finished(): | |
| # Can't render if episode is finished | |
| if mode == "rgb_array": | |
| return np.zeros(self.screen_shape, dtype=np.uint8) | |
| return None | |
| state = self.game.get_state() | |
| if state is None or state.screen_buffer is None: | |
| if mode == "rgb_array": | |
| return np.zeros(self.screen_shape, dtype=np.uint8) | |
| return None | |
| screen = state.screen_buffer | |
| if mode == "rgb_array": | |
| # Return the screen buffer as numpy array | |
| return screen | |
| elif mode == "human": | |
| # Display using matplotlib or cv2 | |
| try: | |
| import cv2 | |
| # Create window if it doesn't exist | |
| if self._render_window is None: | |
| self._render_window = "ViZDoom - Doom Environment" | |
| cv2.namedWindow(self._render_window, cv2.WINDOW_NORMAL) | |
| # Convert to BGR for OpenCV (if RGB) | |
| if len(screen.shape) == 3 and screen.shape[2] == 3: | |
| screen_bgr = cv2.cvtColor(screen, cv2.COLOR_RGB2BGR) | |
| else: | |
| screen_bgr = screen | |
| # Display | |
| cv2.imshow(self._render_window, screen_bgr) | |
| cv2.waitKey(1) # Small delay to update window | |
| except ImportError: | |
| # Fallback to matplotlib if cv2 not available | |
| try: | |
| import matplotlib.pyplot as plt | |
| if self._render_window is None: | |
| plt.ion() # Enable interactive mode | |
| self._render_window = plt.figure(figsize=(8, 6)) | |
| self._render_window.canvas.manager.set_window_title( | |
| "ViZDoom - Doom Environment" | |
| ) | |
| plt.clf() | |
| if len(screen.shape) == 3: | |
| plt.imshow(screen) | |
| else: | |
| plt.imshow(screen, cmap="gray") | |
| plt.axis("off") | |
| plt.pause(0.001) # Small delay to update window | |
| except ImportError: | |
| print( | |
| "Warning: Neither cv2 nor matplotlib available for rendering. " | |
| "Install with: pip install opencv-python or pip install matplotlib" | |
| ) | |
| return None | |
| else: | |
| raise ValueError( | |
| f"Invalid render mode: {mode}. Use 'human' or 'rgb_array'." | |
| ) | |
| def close(self) -> None: | |
| """Clean up resources.""" | |
| # Close render window if it exists | |
| if self._render_window is not None: | |
| try: | |
| import cv2 | |
| cv2.destroyAllWindows() | |
| except ImportError: | |
| try: | |
| import matplotlib.pyplot as plt | |
| plt.close("all") | |
| except ImportError: | |
| pass | |
| self._render_window = None | |
| # Close the game | |
| if hasattr(self, "game") and self.game is not None: | |
| self.game.close() | |
| def __del__(self): | |
| """Destructor to ensure game is closed.""" | |
| self.close() | |