""" This file contains several utility functions for working with environment wrappers provided by the repository, and with environment metadata saved in dataset files. """ from copy import deepcopy import robomimic.envs.env_base as EB from robomimic.utils.log_utils import log_warning def get_env_class(env_meta=None, env_type=None, env=None): """ Return env class from either env_meta, env_type, or env. Note the use of lazy imports - this ensures that modules are only imported when the corresponding env type is requested. This can be useful in practice. For example, a training run that only requires access to gym environments should not need to import robosuite. Args: env_meta (dict): environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys: :`'env_name'`: name of environment :`'type'`: type of environment, should be a value in EB.EnvType :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor env_type (int): the type of environment, which determines the env class that will be instantiated. Should be a value in EB.EnvType. env (instance of EB.EnvBase): environment instance """ env_type = get_env_type(env_meta=env_meta, env_type=env_type, env=env) if env_type == EB.EnvType.ROBOSUITE_TYPE: from robomimic.envs.env_robosuite import EnvRobosuite return EnvRobosuite elif env_type == EB.EnvType.GYM_TYPE: from robomimic.envs.env_gym import EnvGym return EnvGym elif env_type == EB.EnvType.IG_MOMART_TYPE: from robomimic.envs.env_ig_momart import EnvGibsonMOMART return EnvGibsonMOMART elif env_type == EB.EnvType.REAL_TYPE: from robomimic.envs.env_real_panda import EnvRealPanda return EnvRealPanda elif env_type == EB.EnvType.GPRS_REAL_TYPE: from robomimic.envs.env_real_panda_gprs import EnvRealPandaGPRS return EnvRealPandaGPRS raise Exception("code should never reach this point") def get_env_type(env_meta=None, env_type=None, env=None): """ Helper function to get env_type from a variety of inputs. Args: env_meta (dict): environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys: :`'env_name'`: name of environment :`'type'`: type of environment, should be a value in EB.EnvType :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor env_type (int): the type of environment, which determines the env class that will be instantiated. Should be a value in EB.EnvType. env (instance of EB.EnvBase): environment instance """ checks = [(env_meta is not None), (env_type is not None), (env is not None)] assert sum(checks) == 1, "should provide only one of env_meta, env_type, env" if env_meta is not None: env_type = env_meta["type"] elif env is not None: env_type = env.type return env_type def check_env_type(type_to_check, env_meta=None, env_type=None, env=None): """ Checks whether the passed env_meta, env_type, or env is of type @type_to_check. Type corresponds to EB.EnvType. Args: type_to_check (int): type to check equality against env_meta (dict): environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys: :`'env_name'`: name of environment :`'type'`: type of environment, should be a value in EB.EnvType :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor env_type (int): the type of environment, which determines the env class that will be instantiated. Should be a value in EB.EnvType. env (instance of EB.EnvBase): environment instance """ env_type = get_env_type(env_meta=env_meta, env_type=env_type, env=env) return (env_type == type_to_check) def check_env_version(env, env_meta): """ Checks whether the passed env and env_meta dictionary having matching environment versions. Logs warning if cannot find version or versions do not match. Args: env (instance of EB.EnvBase): environment instance env_meta (dict): environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains following key: :`'env_version'`: environment version, type str """ env_system_version = env.version env_meta_version = env_meta.get("env_version", None) if env_meta_version is None: log_warning( "No environment version found in dataset!"\ "\nCannot verify if dataset and installed environment versions match"\ ) elif env_system_version != env_meta_version: log_warning( "Dataset and installed environment version mismatch!"\ "\nDataset environment version: {meta}"\ "\nInstalled environment version: {sys}".format( sys=env_system_version, meta=env_meta_version, ) ) def is_robosuite_env(env_meta=None, env_type=None, env=None): """ Determines whether the environment is a robosuite environment. Accepts either env_meta, env_type, or env. """ return check_env_type(type_to_check=EB.EnvType.ROBOSUITE_TYPE, env_meta=env_meta, env_type=env_type, env=env) def is_simpler_env(env_meta=None, env_type=None, env=None): return False def is_simpler_ov_env(env_meta=None, env_type=None, env=None): return False def is_factory_env(env_meta=None, env_type=None, env=None): return False def is_furniture_sim_env(env_meta=None, env_type=None, env=None): return False def is_real_robot_env(env_meta=None, env_type=None, env=None): """ Determines whether the environment is a real robot environment. Accepts either env_meta, env_type, or env. """ return check_env_type(type_to_check=EB.EnvType.REAL_TYPE, env_meta=env_meta, env_type=env_type, env=env) def is_real_robot_gprs_env(env_meta=None, env_type=None, env=None): """ Determines whether the environment is a real robot environment. Accepts either env_meta, env_type, or env. """ return check_env_type(type_to_check=EB.EnvType.GPRS_REAL_TYPE, env_meta=env_meta, env_type=env_type, env=env) def create_env( env_type, env_name, env_class=None, render=False, render_offscreen=False, use_image_obs=False, use_depth_obs=False, **kwargs, ): """ Create environment. Args: env_type (int): the type of environment, which determines the env class that will be instantiated. Should be a value in EB.EnvType. env_name (str): name of environment render (bool): if True, environment supports on-screen rendering render_offscreen (bool): if True, environment supports off-screen rendering. This is forced to be True if @use_image_obs is True. use_image_obs (bool): if True, environment is expected to render rgb image observations on every env.step call. Set this to False for efficiency reasons, if image observations are not required. use_depth_obs (bool): if True, environment is expected to render depth image observations on every env.step call. Set this to False for efficiency reasons, if depth observations are not required. """ # note: pass @postprocess_visual_obs True, to make sure images are processed for network inputs if env_class is None: env_class = get_env_class(env_type=env_type) env = env_class( env_name=env_name, render=render, render_offscreen=render_offscreen, use_image_obs=use_image_obs, use_depth_obs=use_depth_obs, postprocess_visual_obs=True, **kwargs, ) print("Created environment with name {}".format(env_name)) print("Action size is {}".format(env.action_dimension)) return env def create_env_from_metadata( env_meta, env_name=None, env_class=None, render=False, render_offscreen=False, use_image_obs=False, use_depth_obs=False, ): """ Create environment. Args: env_meta (dict): environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys: :`'env_name'`: name of environment :`'type'`: type of environment, should be a value in EB.EnvType :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor env_name (str): name of environment. Only needs to be provided if making a different environment from the one in @env_meta. render (bool): if True, environment supports on-screen rendering render_offscreen (bool): if True, environment supports off-screen rendering. This is forced to be True if @use_image_obs is True. use_image_obs (bool): if True, environment is expected to render rgb image observations on every env.step call. Set this to False for efficiency reasons, if image observations are not required. use_depth_obs (bool): if True, environment is expected to render depth image observations on every env.step call. Set this to False for efficiency reasons, if depth observations are not required. """ if env_name is None: env_name = env_meta["env_name"] env_type = get_env_type(env_meta=env_meta) env_kwargs = env_meta["env_kwargs"] env_kwargs.pop("use_image_obs", None) env_kwargs.pop("use_depth_obs", None) env = create_env( env_type=env_type, env_name=env_name, env_class=env_class, render=render, render_offscreen=render_offscreen, use_image_obs=use_image_obs, use_depth_obs=use_depth_obs, **env_kwargs, ) check_env_version(env, env_meta) return env def create_env_for_data_processing( env_meta, camera_names, camera_height, camera_width, reward_shaping, env_class=None, render=None, render_offscreen=None, use_image_obs=None, use_depth_obs=None, ): """ Creates environment for processing dataset observations and rewards. Args: env_meta (dict): environment metadata, which should be loaded from demonstration hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see @FileUtils.env_from_checkpoint). Contains 3 keys: :`'env_name'`: name of environment :`'type'`: type of environment, should be a value in EB.EnvType :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor camera_names (list of st): list of camera names that correspond to image observations camera_height (int): camera height for all cameras camera_width (int): camera width for all cameras reward_shaping (bool): if True, use shaped environment rewards, else use sparse task completion rewards render (bool or None): optionally override rendering behavior render_offscreen (bool or None): optionally override rendering behavior use_image_obs (bool or None): optionally override rendering behavior use_depth_obs (bool or None): optionally override rendering behavior """ env_name = env_meta["env_name"] env_type = get_env_type(env_meta=env_meta) env_kwargs = env_meta["env_kwargs"] if env_class is None: env_class = get_env_class(env_type=env_type) # remove possibly redundant values in kwargs env_kwargs = deepcopy(env_kwargs) env_kwargs.pop("env_name", None) env_kwargs.pop("camera_names", None) env_kwargs.pop("camera_height", None) env_kwargs.pop("camera_width", None) env_kwargs.pop("reward_shaping", None) env_kwargs.pop("render", None) env_kwargs.pop("render_offscreen", None) env_kwargs.pop("use_image_obs", None) env_kwargs.pop("use_depth_obs", None) env = env_class.create_for_data_processing( env_name=env_name, camera_names=camera_names, camera_height=camera_height, camera_width=camera_width, reward_shaping=reward_shaping, render=render, render_offscreen=render_offscreen, use_image_obs=use_image_obs, use_depth_obs=use_depth_obs, **env_kwargs, ) check_env_version(env, env_meta) return env def set_env_specific_obs_processing(env_meta=None, env_type=None, env=None): """ Sets env-specific observation processing. As an example, robosuite depth observations correspond to raw depth and should not be normalized by default, while default depth processing normalizes and clips all values to [0, 1]. """ if is_robosuite_env(env_meta=env_meta, env_type=env_type, env=env): from robomimic.utils.obs_utils import DepthModality, process_frame, unprocess_frame DepthModality.set_obs_processor(processor=( lambda obs: process_frame(frame=obs, channel_dim=1, scale=None) )) DepthModality.set_obs_unprocessor(unprocessor=( lambda obs: unprocess_frame(frame=obs, channel_dim=1, scale=None) )) def wrap_env_from_config(env, config): """ Wraps environment using the provided Config object to determine which wrappers to use (if any). """ if ("frame_stack" in config.train) and (config.train.frame_stack > 1): from robomimic.envs.wrappers import FrameStackWrapper env = FrameStackWrapper(env, num_frames=config.train.frame_stack) return env