xfu314's picture
Add phantom project with submodules and dependencies
96da58e
"""
This file contains several utility functions for working with environment
wrappers provided by the repository, and with environment metadata saved
in dataset files.
"""
from copy import deepcopy
import robomimic.envs.env_base as EB
from robomimic.utils.log_utils import log_warning
def get_env_class(env_meta=None, env_type=None, env=None):
"""
Return env class from either env_meta, env_type, or env.
Note the use of lazy imports - this ensures that modules are only
imported when the corresponding env type is requested. This can
be useful in practice. For example, a training run that only
requires access to gym environments should not need to import
robosuite.
Args:
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
env_type (int): the type of environment, which determines the env class that will
be instantiated. Should be a value in EB.EnvType.
env (instance of EB.EnvBase): environment instance
"""
env_type = get_env_type(env_meta=env_meta, env_type=env_type, env=env)
if env_type == EB.EnvType.ROBOSUITE_TYPE:
from robomimic.envs.env_robosuite import EnvRobosuite
return EnvRobosuite
elif env_type == EB.EnvType.GYM_TYPE:
from robomimic.envs.env_gym import EnvGym
return EnvGym
elif env_type == EB.EnvType.IG_MOMART_TYPE:
from robomimic.envs.env_ig_momart import EnvGibsonMOMART
return EnvGibsonMOMART
elif env_type == EB.EnvType.REAL_TYPE:
from robomimic.envs.env_real_panda import EnvRealPanda
return EnvRealPanda
elif env_type == EB.EnvType.GPRS_REAL_TYPE:
from robomimic.envs.env_real_panda_gprs import EnvRealPandaGPRS
return EnvRealPandaGPRS
raise Exception("code should never reach this point")
def get_env_type(env_meta=None, env_type=None, env=None):
"""
Helper function to get env_type from a variety of inputs.
Args:
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
env_type (int): the type of environment, which determines the env class that will
be instantiated. Should be a value in EB.EnvType.
env (instance of EB.EnvBase): environment instance
"""
checks = [(env_meta is not None), (env_type is not None), (env is not None)]
assert sum(checks) == 1, "should provide only one of env_meta, env_type, env"
if env_meta is not None:
env_type = env_meta["type"]
elif env is not None:
env_type = env.type
return env_type
def check_env_type(type_to_check, env_meta=None, env_type=None, env=None):
"""
Checks whether the passed env_meta, env_type, or env is of type @type_to_check.
Type corresponds to EB.EnvType.
Args:
type_to_check (int): type to check equality against
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
env_type (int): the type of environment, which determines the env class that will
be instantiated. Should be a value in EB.EnvType.
env (instance of EB.EnvBase): environment instance
"""
env_type = get_env_type(env_meta=env_meta, env_type=env_type, env=env)
return (env_type == type_to_check)
def check_env_version(env, env_meta):
"""
Checks whether the passed env and env_meta dictionary having matching environment versions.
Logs warning if cannot find version or versions do not match.
Args:
env (instance of EB.EnvBase): environment instance
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains following key:
:`'env_version'`: environment version, type str
"""
env_system_version = env.version
env_meta_version = env_meta.get("env_version", None)
if env_meta_version is None:
log_warning(
"No environment version found in dataset!"\
"\nCannot verify if dataset and installed environment versions match"\
)
elif env_system_version != env_meta_version:
log_warning(
"Dataset and installed environment version mismatch!"\
"\nDataset environment version: {meta}"\
"\nInstalled environment version: {sys}".format(
sys=env_system_version,
meta=env_meta_version,
)
)
def is_robosuite_env(env_meta=None, env_type=None, env=None):
"""
Determines whether the environment is a robosuite environment. Accepts
either env_meta, env_type, or env.
"""
return check_env_type(type_to_check=EB.EnvType.ROBOSUITE_TYPE, env_meta=env_meta, env_type=env_type, env=env)
def is_simpler_env(env_meta=None, env_type=None, env=None):
return False
def is_simpler_ov_env(env_meta=None, env_type=None, env=None):
return False
def is_factory_env(env_meta=None, env_type=None, env=None):
return False
def is_furniture_sim_env(env_meta=None, env_type=None, env=None):
return False
def is_real_robot_env(env_meta=None, env_type=None, env=None):
"""
Determines whether the environment is a real robot environment. Accepts
either env_meta, env_type, or env.
"""
return check_env_type(type_to_check=EB.EnvType.REAL_TYPE, env_meta=env_meta, env_type=env_type, env=env)
def is_real_robot_gprs_env(env_meta=None, env_type=None, env=None):
"""
Determines whether the environment is a real robot environment. Accepts
either env_meta, env_type, or env.
"""
return check_env_type(type_to_check=EB.EnvType.GPRS_REAL_TYPE, env_meta=env_meta, env_type=env_type, env=env)
def create_env(
env_type,
env_name,
env_class=None,
render=False,
render_offscreen=False,
use_image_obs=False,
use_depth_obs=False,
**kwargs,
):
"""
Create environment.
Args:
env_type (int): the type of environment, which determines the env class that will
be instantiated. Should be a value in EB.EnvType.
env_name (str): name of environment
render (bool): if True, environment supports on-screen rendering
render_offscreen (bool): if True, environment supports off-screen rendering. This
is forced to be True if @use_image_obs is True.
use_image_obs (bool): if True, environment is expected to render rgb image observations
on every env.step call. Set this to False for efficiency reasons, if image
observations are not required.
use_depth_obs (bool): if True, environment is expected to render depth image observations
on every env.step call. Set this to False for efficiency reasons, if depth
observations are not required.
"""
# note: pass @postprocess_visual_obs True, to make sure images are processed for network inputs
if env_class is None:
env_class = get_env_class(env_type=env_type)
env = env_class(
env_name=env_name,
render=render,
render_offscreen=render_offscreen,
use_image_obs=use_image_obs,
use_depth_obs=use_depth_obs,
postprocess_visual_obs=True,
**kwargs,
)
print("Created environment with name {}".format(env_name))
print("Action size is {}".format(env.action_dimension))
return env
def create_env_from_metadata(
env_meta,
env_name=None,
env_class=None,
render=False,
render_offscreen=False,
use_image_obs=False,
use_depth_obs=False,
):
"""
Create environment.
Args:
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
env_name (str): name of environment. Only needs to be provided if making a different
environment from the one in @env_meta.
render (bool): if True, environment supports on-screen rendering
render_offscreen (bool): if True, environment supports off-screen rendering. This
is forced to be True if @use_image_obs is True.
use_image_obs (bool): if True, environment is expected to render rgb image observations
on every env.step call. Set this to False for efficiency reasons, if image
observations are not required.
use_depth_obs (bool): if True, environment is expected to render depth image observations
on every env.step call. Set this to False for efficiency reasons, if depth
observations are not required.
"""
if env_name is None:
env_name = env_meta["env_name"]
env_type = get_env_type(env_meta=env_meta)
env_kwargs = env_meta["env_kwargs"]
env_kwargs.pop("use_image_obs", None)
env_kwargs.pop("use_depth_obs", None)
env = create_env(
env_type=env_type,
env_name=env_name,
env_class=env_class,
render=render,
render_offscreen=render_offscreen,
use_image_obs=use_image_obs,
use_depth_obs=use_depth_obs,
**env_kwargs,
)
check_env_version(env, env_meta)
return env
def create_env_for_data_processing(
env_meta,
camera_names,
camera_height,
camera_width,
reward_shaping,
env_class=None,
render=None,
render_offscreen=None,
use_image_obs=None,
use_depth_obs=None,
):
"""
Creates environment for processing dataset observations and rewards.
Args:
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
camera_names (list of st): list of camera names that correspond to image observations
camera_height (int): camera height for all cameras
camera_width (int): camera width for all cameras
reward_shaping (bool): if True, use shaped environment rewards, else use sparse task completion rewards
render (bool or None): optionally override rendering behavior
render_offscreen (bool or None): optionally override rendering behavior
use_image_obs (bool or None): optionally override rendering behavior
use_depth_obs (bool or None): optionally override rendering behavior
"""
env_name = env_meta["env_name"]
env_type = get_env_type(env_meta=env_meta)
env_kwargs = env_meta["env_kwargs"]
if env_class is None:
env_class = get_env_class(env_type=env_type)
# remove possibly redundant values in kwargs
env_kwargs = deepcopy(env_kwargs)
env_kwargs.pop("env_name", None)
env_kwargs.pop("camera_names", None)
env_kwargs.pop("camera_height", None)
env_kwargs.pop("camera_width", None)
env_kwargs.pop("reward_shaping", None)
env_kwargs.pop("render", None)
env_kwargs.pop("render_offscreen", None)
env_kwargs.pop("use_image_obs", None)
env_kwargs.pop("use_depth_obs", None)
env = env_class.create_for_data_processing(
env_name=env_name,
camera_names=camera_names,
camera_height=camera_height,
camera_width=camera_width,
reward_shaping=reward_shaping,
render=render,
render_offscreen=render_offscreen,
use_image_obs=use_image_obs,
use_depth_obs=use_depth_obs,
**env_kwargs,
)
check_env_version(env, env_meta)
return env
def set_env_specific_obs_processing(env_meta=None, env_type=None, env=None):
"""
Sets env-specific observation processing. As an example, robosuite depth observations
correspond to raw depth and should not be normalized by default, while default depth
processing normalizes and clips all values to [0, 1].
"""
if is_robosuite_env(env_meta=env_meta, env_type=env_type, env=env):
from robomimic.utils.obs_utils import DepthModality, process_frame, unprocess_frame
DepthModality.set_obs_processor(processor=(
lambda obs: process_frame(frame=obs, channel_dim=1, scale=None)
))
DepthModality.set_obs_unprocessor(unprocessor=(
lambda obs: unprocess_frame(frame=obs, channel_dim=1, scale=None)
))
def wrap_env_from_config(env, config):
"""
Wraps environment using the provided Config object to determine which wrappers
to use (if any).
"""
if ("frame_stack" in config.train) and (config.train.frame_stack > 1):
from robomimic.envs.wrappers import FrameStackWrapper
env = FrameStackWrapper(env, num_frames=config.train.frame_stack)
return env