Spaces:
Build error
Build error
| from collections import OrderedDict | |
| from typing import Any, Callable, List, Optional, Sequence, Type, Union | |
| import gym | |
| import numpy as np | |
| from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvIndices, VecEnvObs, VecEnvStepReturn | |
| from stable_baselines3.common.vec_env.util import dict_to_obs, obs_space_info | |
| import torch | |
class MyDummyVecEnv(VecEnv):
    """
    Creates a simple vectorized wrapper for multiple environments, calling each
    environment in sequence on the current Python process. This is useful for
    computationally simple environments such as ``cartpole-v1``, as the overhead
    of multiprocess or multithread outweighs the environment computation time.
    This can also be used for RL methods that require a vectorized environment,
    but that you want a single environment to train with.

    Observations are stored in ``torch`` tensors allocated on ``device`` rather
    than in NumPy arrays; rewards and done flags stay in NumPy arrays.

    :param env_fns: a list of functions that return environments to vectorize
    :param device: torch device on which the observation buffers are allocated
    """

    def __init__(self, env_fns: List[Callable[[], gym.Env]], device):
        self.envs = [fn() for fn in env_fns]
        # The first environment defines the spaces shared by the whole batch.
        env = self.envs[0]
        VecEnv.__init__(self, len(env_fns), env.observation_space, env.action_space)
        obs_space = env.observation_space
        # The per-key dtypes are not used: every buffer is allocated as torch.float.
        self.keys, shapes, _ = obs_space_info(obs_space)
        self.device = device

        # One (num_envs, *shape) float tensor per observation key.
        self.buf_obs = OrderedDict(
            (k, torch.zeros((self.num_envs,) + tuple(shapes[k]), dtype=torch.float, device=self.device))
            for k in self.keys
        )
        self.buf_dones = np.zeros((self.num_envs,), dtype=bool)
        self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32)
        self.buf_infos = [{} for _ in range(self.num_envs)]
        self.actions = None

    def step_async(self, actions: np.ndarray) -> None:
        """
        Store the actions that the next ``step_wait`` call will apply.

        :param actions: one action per environment, indexed by environment
        """
        self.actions = actions

    def step_wait(self) -> VecEnvStepReturn:
        """
        Step every environment with the stored actions, one after the other.

        An environment that reports ``done`` is reset immediately; its last
        observation is kept in ``info["terminal_observation"]`` and the
        observation written to the buffer is the one returned by ``reset()``.

        The reward, done and info objects returned are the internal buffers
        themselves (no copy is made), so they are overwritten by the next step.

        :return: observations, rewards, dones, infos
        """
        for env_idx in range(self.num_envs):
            obs, self.buf_rews[env_idx], self.buf_dones[env_idx], self.buf_infos[env_idx] = self.envs[env_idx].step(
                self.actions[env_idx]
            )
            if self.buf_dones[env_idx]:
                # save final observation where user can get it, then reset
                self.buf_infos[env_idx]["terminal_observation"] = obs
                obs = self.envs[env_idx].reset()
            self._save_obs(env_idx, obs)
        return (self._obs_from_buf(), self.buf_rews, self.buf_dones, self.buf_infos)

    def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]:
        """
        Seed every environment.

        Environment ``idx`` is seeded with ``seed + idx`` so that the
        environments do not share a seed. When ``seed`` is ``None``, ``None``
        is forwarded to each environment's ``seed`` method instead of
        attempting ``None + idx``, which would raise ``TypeError``.

        :param seed: base seed, or ``None`` to pass ``None`` to each environment
        :return: the value returned by each environment's ``seed`` call
        """
        if seed is None:
            return [env.seed(None) for env in self.envs]
        return [env.seed(seed + idx) for idx, env in enumerate(self.envs)]

    def reset(self) -> VecEnvObs:
        """
        Reset all environments and return the buffered observations.

        :return: observations after reset
        """
        for env_idx in range(self.num_envs):
            obs = self.envs[env_idx].reset()
            self._save_obs(env_idx, obs)
        return self._obs_from_buf()

    def close(self) -> None:
        """Close every wrapped environment."""
        for env in self.envs:
            env.close()

    def get_images(self) -> Sequence[np.ndarray]:
        """Return the ``rgb_array`` rendering of each environment."""
        return [env.render(mode="rgb_array") for env in self.envs]

    def render(self, mode: str = "human") -> Optional[np.ndarray]:
        """
        Gym environment rendering. If there are multiple environments then
        they are tiled together in one image via ``BaseVecEnv.render()``.
        Otherwise (if ``self.num_envs == 1``), we pass the render call directly to the
        underlying environment.

        Therefore, some arguments such as ``mode`` will have values that are valid
        only when ``num_envs == 1``.

        :param mode: The rendering type.
        """
        if self.num_envs == 1:
            return self.envs[0].render(mode=mode)
        return super().render(mode=mode)

    def _save_obs(self, env_idx: int, obs: VecEnvObs) -> None:
        """
        Copy one environment's observation into row ``env_idx`` of the buffers.

        Each ``obs[key]`` must be a NumPy array because it is converted with
        ``torch.from_numpy`` before being moved to ``self.device``.

        :param env_idx: index of the environment that produced ``obs``
        :param obs: observation indexed by the keys in ``self.keys``
        """
        # NOTE(review): every key is indexed as ``obs[key]``; for non-Dict spaces
        # ``obs_space_info`` presumably yields a single ``None`` key -- confirm
        # that path is intended before using this class with such spaces.
        for key in self.keys:
            self.buf_obs[key][env_idx] = torch.from_numpy(obs[key]).to(self.device, non_blocking=True)

    def _obs_from_buf(self) -> VecEnvObs:
        """Convert the observation buffers to the layout of the observation space."""
        # NOTE(review): the buffers are handed to ``dict_to_obs`` without copying,
        # so the result presumably aliases ``self.buf_obs`` -- verify with callers.
        return dict_to_obs(self.observation_space, self.buf_obs)

    def get_attr(self, attr_name: str, indices: VecEnvIndices = None) -> List[Any]:
        """Return attribute from vectorized environment (see base class)."""
        target_envs = self._get_target_envs(indices)
        return [getattr(env_i, attr_name) for env_i in target_envs]

    def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None:
        """Set attribute inside vectorized environments (see base class)."""
        target_envs = self._get_target_envs(indices)
        for env_i in target_envs:
            setattr(env_i, attr_name, value)

    def env_method(self, method_name: str, *method_args, indices: VecEnvIndices = None, **method_kwargs) -> List[Any]:
        """Call instance methods of vectorized environments."""
        target_envs = self._get_target_envs(indices)
        return [getattr(env_i, method_name)(*method_args, **method_kwargs) for env_i in target_envs]

    def env_is_wrapped(self, wrapper_class: Type[gym.Wrapper], indices: VecEnvIndices = None) -> List[bool]:
        """Check if worker environments are wrapped with a given wrapper"""
        target_envs = self._get_target_envs(indices)
        # Import here to avoid a circular import
        from stable_baselines3.common import env_util

        return [env_util.is_wrapped(env_i, wrapper_class) for env_i in target_envs]

    def _get_target_envs(self, indices: VecEnvIndices) -> List[gym.Env]:
        """Return the wrapped environments selected by ``indices``."""
        indices = self._get_indices(indices)
        return [self.envs[i] for i in indices]