Spaces:
Sleeping
Sleeping
File size: 14,438 Bytes
96da58e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 |
"""
This file contains several utility functions for working with environment
wrappers provided by the repository, and with environment metadata saved
in dataset files.
"""
from copy import deepcopy
import robomimic.envs.env_base as EB
from robomimic.utils.log_utils import log_warning
def get_env_class(env_meta=None, env_type=None, env=None):
"""
Return env class from either env_meta, env_type, or env.
Note the use of lazy imports - this ensures that modules are only
imported when the corresponding env type is requested. This can
be useful in practice. For example, a training run that only
requires access to gym environments should not need to import
robosuite.
Args:
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
env_type (int): the type of environment, which determines the env class that will
be instantiated. Should be a value in EB.EnvType.
env (instance of EB.EnvBase): environment instance
"""
env_type = get_env_type(env_meta=env_meta, env_type=env_type, env=env)
if env_type == EB.EnvType.ROBOSUITE_TYPE:
from robomimic.envs.env_robosuite import EnvRobosuite
return EnvRobosuite
elif env_type == EB.EnvType.GYM_TYPE:
from robomimic.envs.env_gym import EnvGym
return EnvGym
elif env_type == EB.EnvType.IG_MOMART_TYPE:
from robomimic.envs.env_ig_momart import EnvGibsonMOMART
return EnvGibsonMOMART
elif env_type == EB.EnvType.REAL_TYPE:
from robomimic.envs.env_real_panda import EnvRealPanda
return EnvRealPanda
elif env_type == EB.EnvType.GPRS_REAL_TYPE:
from robomimic.envs.env_real_panda_gprs import EnvRealPandaGPRS
return EnvRealPandaGPRS
raise Exception("code should never reach this point")
def get_env_type(env_meta=None, env_type=None, env=None):
"""
Helper function to get env_type from a variety of inputs.
Args:
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
env_type (int): the type of environment, which determines the env class that will
be instantiated. Should be a value in EB.EnvType.
env (instance of EB.EnvBase): environment instance
"""
checks = [(env_meta is not None), (env_type is not None), (env is not None)]
assert sum(checks) == 1, "should provide only one of env_meta, env_type, env"
if env_meta is not None:
env_type = env_meta["type"]
elif env is not None:
env_type = env.type
return env_type
def check_env_type(type_to_check, env_meta=None, env_type=None, env=None):
"""
Checks whether the passed env_meta, env_type, or env is of type @type_to_check.
Type corresponds to EB.EnvType.
Args:
type_to_check (int): type to check equality against
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
env_type (int): the type of environment, which determines the env class that will
be instantiated. Should be a value in EB.EnvType.
env (instance of EB.EnvBase): environment instance
"""
env_type = get_env_type(env_meta=env_meta, env_type=env_type, env=env)
return (env_type == type_to_check)
def check_env_version(env, env_meta):
"""
Checks whether the passed env and env_meta dictionary having matching environment versions.
Logs warning if cannot find version or versions do not match.
Args:
env (instance of EB.EnvBase): environment instance
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains following key:
:`'env_version'`: environment version, type str
"""
env_system_version = env.version
env_meta_version = env_meta.get("env_version", None)
if env_meta_version is None:
log_warning(
"No environment version found in dataset!"\
"\nCannot verify if dataset and installed environment versions match"\
)
elif env_system_version != env_meta_version:
log_warning(
"Dataset and installed environment version mismatch!"\
"\nDataset environment version: {meta}"\
"\nInstalled environment version: {sys}".format(
sys=env_system_version,
meta=env_meta_version,
)
)
def is_robosuite_env(env_meta=None, env_type=None, env=None):
"""
Determines whether the environment is a robosuite environment. Accepts
either env_meta, env_type, or env.
"""
return check_env_type(type_to_check=EB.EnvType.ROBOSUITE_TYPE, env_meta=env_meta, env_type=env_type, env=env)
def is_simpler_env(env_meta=None, env_type=None, env=None):
return False
def is_simpler_ov_env(env_meta=None, env_type=None, env=None):
return False
def is_factory_env(env_meta=None, env_type=None, env=None):
return False
def is_furniture_sim_env(env_meta=None, env_type=None, env=None):
return False
def is_real_robot_env(env_meta=None, env_type=None, env=None):
"""
Determines whether the environment is a real robot environment. Accepts
either env_meta, env_type, or env.
"""
return check_env_type(type_to_check=EB.EnvType.REAL_TYPE, env_meta=env_meta, env_type=env_type, env=env)
def is_real_robot_gprs_env(env_meta=None, env_type=None, env=None):
"""
Determines whether the environment is a real robot environment. Accepts
either env_meta, env_type, or env.
"""
return check_env_type(type_to_check=EB.EnvType.GPRS_REAL_TYPE, env_meta=env_meta, env_type=env_type, env=env)
def create_env(
env_type,
env_name,
env_class=None,
render=False,
render_offscreen=False,
use_image_obs=False,
use_depth_obs=False,
**kwargs,
):
"""
Create environment.
Args:
env_type (int): the type of environment, which determines the env class that will
be instantiated. Should be a value in EB.EnvType.
env_name (str): name of environment
render (bool): if True, environment supports on-screen rendering
render_offscreen (bool): if True, environment supports off-screen rendering. This
is forced to be True if @use_image_obs is True.
use_image_obs (bool): if True, environment is expected to render rgb image observations
on every env.step call. Set this to False for efficiency reasons, if image
observations are not required.
use_depth_obs (bool): if True, environment is expected to render depth image observations
on every env.step call. Set this to False for efficiency reasons, if depth
observations are not required.
"""
# note: pass @postprocess_visual_obs True, to make sure images are processed for network inputs
if env_class is None:
env_class = get_env_class(env_type=env_type)
env = env_class(
env_name=env_name,
render=render,
render_offscreen=render_offscreen,
use_image_obs=use_image_obs,
use_depth_obs=use_depth_obs,
postprocess_visual_obs=True,
**kwargs,
)
print("Created environment with name {}".format(env_name))
print("Action size is {}".format(env.action_dimension))
return env
def create_env_from_metadata(
env_meta,
env_name=None,
env_class=None,
render=False,
render_offscreen=False,
use_image_obs=False,
use_depth_obs=False,
):
"""
Create environment.
Args:
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
env_name (str): name of environment. Only needs to be provided if making a different
environment from the one in @env_meta.
render (bool): if True, environment supports on-screen rendering
render_offscreen (bool): if True, environment supports off-screen rendering. This
is forced to be True if @use_image_obs is True.
use_image_obs (bool): if True, environment is expected to render rgb image observations
on every env.step call. Set this to False for efficiency reasons, if image
observations are not required.
use_depth_obs (bool): if True, environment is expected to render depth image observations
on every env.step call. Set this to False for efficiency reasons, if depth
observations are not required.
"""
if env_name is None:
env_name = env_meta["env_name"]
env_type = get_env_type(env_meta=env_meta)
env_kwargs = env_meta["env_kwargs"]
env_kwargs.pop("use_image_obs", None)
env_kwargs.pop("use_depth_obs", None)
env = create_env(
env_type=env_type,
env_name=env_name,
env_class=env_class,
render=render,
render_offscreen=render_offscreen,
use_image_obs=use_image_obs,
use_depth_obs=use_depth_obs,
**env_kwargs,
)
check_env_version(env, env_meta)
return env
def create_env_for_data_processing(
env_meta,
camera_names,
camera_height,
camera_width,
reward_shaping,
env_class=None,
render=None,
render_offscreen=None,
use_image_obs=None,
use_depth_obs=None,
):
"""
Creates environment for processing dataset observations and rewards.
Args:
env_meta (dict): environment metadata, which should be loaded from demonstration
hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
@FileUtils.env_from_checkpoint). Contains 3 keys:
:`'env_name'`: name of environment
:`'type'`: type of environment, should be a value in EB.EnvType
:`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor
camera_names (list of st): list of camera names that correspond to image observations
camera_height (int): camera height for all cameras
camera_width (int): camera width for all cameras
reward_shaping (bool): if True, use shaped environment rewards, else use sparse task completion rewards
render (bool or None): optionally override rendering behavior
render_offscreen (bool or None): optionally override rendering behavior
use_image_obs (bool or None): optionally override rendering behavior
use_depth_obs (bool or None): optionally override rendering behavior
"""
env_name = env_meta["env_name"]
env_type = get_env_type(env_meta=env_meta)
env_kwargs = env_meta["env_kwargs"]
if env_class is None:
env_class = get_env_class(env_type=env_type)
# remove possibly redundant values in kwargs
env_kwargs = deepcopy(env_kwargs)
env_kwargs.pop("env_name", None)
env_kwargs.pop("camera_names", None)
env_kwargs.pop("camera_height", None)
env_kwargs.pop("camera_width", None)
env_kwargs.pop("reward_shaping", None)
env_kwargs.pop("render", None)
env_kwargs.pop("render_offscreen", None)
env_kwargs.pop("use_image_obs", None)
env_kwargs.pop("use_depth_obs", None)
env = env_class.create_for_data_processing(
env_name=env_name,
camera_names=camera_names,
camera_height=camera_height,
camera_width=camera_width,
reward_shaping=reward_shaping,
render=render,
render_offscreen=render_offscreen,
use_image_obs=use_image_obs,
use_depth_obs=use_depth_obs,
**env_kwargs,
)
check_env_version(env, env_meta)
return env
def set_env_specific_obs_processing(env_meta=None, env_type=None, env=None):
"""
Sets env-specific observation processing. As an example, robosuite depth observations
correspond to raw depth and should not be normalized by default, while default depth
processing normalizes and clips all values to [0, 1].
"""
if is_robosuite_env(env_meta=env_meta, env_type=env_type, env=env):
from robomimic.utils.obs_utils import DepthModality, process_frame, unprocess_frame
DepthModality.set_obs_processor(processor=(
lambda obs: process_frame(frame=obs, channel_dim=1, scale=None)
))
DepthModality.set_obs_unprocessor(unprocessor=(
lambda obs: unprocess_frame(frame=obs, channel_dim=1, scale=None)
))
def wrap_env_from_config(env, config):
"""
Wraps environment using the provided Config object to determine which wrappers
to use (if any).
"""
if ("frame_stack" in config.train) and (config.train.frame_stack > 1):
from robomimic.envs.wrappers import FrameStackWrapper
env = FrameStackWrapper(env, num_frames=config.train.frame_stack)
return env
|