File size: 14,438 Bytes
96da58e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
"""
This file contains several utility functions for working with environment
wrappers provided by the repository, and with environment metadata saved
in dataset files.
"""
from copy import deepcopy
import robomimic.envs.env_base as EB
from robomimic.utils.log_utils import log_warning


def get_env_class(env_meta=None, env_type=None, env=None):
    """
    Return env class from either env_meta, env_type, or env.
    Note the use of lazy imports - this ensures that modules are only
    imported when the corresponding env type is requested. This can
    be useful in practice. For example, a training run that only
    requires access to gym environments should not need to import
    robosuite.

    Args:
        env_meta (dict): environment metadata, which should be loaded from demonstration
            hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
            @FileUtils.env_from_checkpoint). Contains 3 keys:

                :`'env_name'`: name of environment
                :`'type'`: type of environment, should be a value in EB.EnvType
                :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor

        env_type (int): the type of environment, which determines the env class that will
            be instantiated. Should be a value in EB.EnvType.

        env (instance of EB.EnvBase): environment instance
    """
    env_type = get_env_type(env_meta=env_meta, env_type=env_type, env=env)
    if env_type == EB.EnvType.ROBOSUITE_TYPE:
        from robomimic.envs.env_robosuite import EnvRobosuite
        return EnvRobosuite
    elif env_type == EB.EnvType.GYM_TYPE:
        from robomimic.envs.env_gym import EnvGym
        return EnvGym
    elif env_type == EB.EnvType.IG_MOMART_TYPE:
        from robomimic.envs.env_ig_momart import EnvGibsonMOMART
        return EnvGibsonMOMART
    elif env_type == EB.EnvType.REAL_TYPE:
        from robomimic.envs.env_real_panda import EnvRealPanda
        return EnvRealPanda
    elif env_type == EB.EnvType.GPRS_REAL_TYPE:
        from robomimic.envs.env_real_panda_gprs import EnvRealPandaGPRS
        return EnvRealPandaGPRS
    raise Exception("code should never reach this point")


def get_env_type(env_meta=None, env_type=None, env=None):
    """
    Helper function to get env_type from a variety of inputs.

    Args:
        env_meta (dict): environment metadata, which should be loaded from demonstration
            hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
            @FileUtils.env_from_checkpoint). Contains 3 keys:

                :`'env_name'`: name of environment
                :`'type'`: type of environment, should be a value in EB.EnvType
                :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor

        env_type (int): the type of environment, which determines the env class that will
            be instantiated. Should be a value in EB.EnvType.

        env (instance of EB.EnvBase): environment instance
    """
    checks = [(env_meta is not None), (env_type is not None), (env is not None)]
    assert sum(checks) == 1, "should provide only one of env_meta, env_type, env"
    if env_meta is not None:
        env_type = env_meta["type"]
    elif env is not None:
        env_type = env.type
    return env_type


def check_env_type(type_to_check, env_meta=None, env_type=None, env=None):
    """
    Checks whether the passed env_meta, env_type, or env is of type @type_to_check.
    Type corresponds to EB.EnvType.

    Args:
        type_to_check (int): type to check equality against

        env_meta (dict): environment metadata, which should be loaded from demonstration
            hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
            @FileUtils.env_from_checkpoint). Contains 3 keys:

                :`'env_name'`: name of environment
                :`'type'`: type of environment, should be a value in EB.EnvType
                :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor

        env_type (int): the type of environment, which determines the env class that will
            be instantiated. Should be a value in EB.EnvType.

        env (instance of EB.EnvBase): environment instance
    """
    env_type = get_env_type(env_meta=env_meta, env_type=env_type, env=env)
    return (env_type == type_to_check)


def check_env_version(env, env_meta):
    """
    Checks whether the passed env and env_meta dictionary having matching environment versions.
    Logs warning if cannot find version or versions do not match.

    Args:
        env (instance of EB.EnvBase): environment instance

        env_meta (dict): environment metadata, which should be loaded from demonstration
            hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
            @FileUtils.env_from_checkpoint). Contains following key:

                :`'env_version'`: environment version, type str
    """
    env_system_version = env.version
    env_meta_version = env_meta.get("env_version", None)

    if env_meta_version is None:
        log_warning(
            "No environment version found in dataset!"\
            "\nCannot verify if dataset and installed environment versions match"\
        )
    elif env_system_version != env_meta_version:
        log_warning(
            "Dataset and installed environment version mismatch!"\
            "\nDataset environment version: {meta}"\
            "\nInstalled environment version: {sys}".format(
                sys=env_system_version,
                meta=env_meta_version,
            )
        )


def is_robosuite_env(env_meta=None, env_type=None, env=None):
    """
    Determines whether the environment is a robosuite environment. Accepts
    either env_meta, env_type, or env.
    """
    return check_env_type(type_to_check=EB.EnvType.ROBOSUITE_TYPE, env_meta=env_meta, env_type=env_type, env=env)


def is_simpler_env(env_meta=None, env_type=None, env=None):
    return False


def is_simpler_ov_env(env_meta=None, env_type=None, env=None):
    return False


def is_factory_env(env_meta=None, env_type=None, env=None):
    return False


def is_furniture_sim_env(env_meta=None, env_type=None, env=None):
    return False


def is_real_robot_env(env_meta=None, env_type=None, env=None):
    """
    Determines whether the environment is a real robot environment. Accepts
    either env_meta, env_type, or env.
    """
    return check_env_type(type_to_check=EB.EnvType.REAL_TYPE, env_meta=env_meta, env_type=env_type, env=env)


def is_real_robot_gprs_env(env_meta=None, env_type=None, env=None):
    """
    Determines whether the environment is a real robot environment. Accepts
    either env_meta, env_type, or env.
    """
    return check_env_type(type_to_check=EB.EnvType.GPRS_REAL_TYPE, env_meta=env_meta, env_type=env_type, env=env)


def create_env(
    env_type,
    env_name,
    env_class=None,
    render=False, 
    render_offscreen=False, 
    use_image_obs=False, 
    use_depth_obs=False,
    **kwargs,
):
    """
    Create environment.

    Args:
        env_type (int): the type of environment, which determines the env class that will
            be instantiated. Should be a value in EB.EnvType.

        env_name (str): name of environment

        render (bool): if True, environment supports on-screen rendering

        render_offscreen (bool): if True, environment supports off-screen rendering. This
            is forced to be True if @use_image_obs is True.

        use_image_obs (bool): if True, environment is expected to render rgb image observations
            on every env.step call. Set this to False for efficiency reasons, if image
            observations are not required.

        use_depth_obs (bool): if True, environment is expected to render depth image observations
            on every env.step call. Set this to False for efficiency reasons, if depth
            observations are not required.
    """

    # note: pass @postprocess_visual_obs True, to make sure images are processed for network inputs
    if env_class is None:
        env_class = get_env_class(env_type=env_type)
    env = env_class(
        env_name=env_name, 
        render=render, 
        render_offscreen=render_offscreen, 
        use_image_obs=use_image_obs,
        use_depth_obs=use_depth_obs,
        postprocess_visual_obs=True,
        **kwargs,
    )
    print("Created environment with name {}".format(env_name))
    print("Action size is {}".format(env.action_dimension))
    return env


def create_env_from_metadata(
    env_meta,
    env_name=None,
    env_class=None,
    render=False, 
    render_offscreen=False, 
    use_image_obs=False, 
    use_depth_obs=False,
):
    """
    Create environment.

    Args:
        env_meta (dict): environment metadata, which should be loaded from demonstration
            hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
            @FileUtils.env_from_checkpoint). Contains 3 keys:

                :`'env_name'`: name of environment
                :`'type'`: type of environment, should be a value in EB.EnvType
                :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor

        env_name (str): name of environment. Only needs to be provided if making a different
            environment from the one in @env_meta.

        render (bool): if True, environment supports on-screen rendering

        render_offscreen (bool): if True, environment supports off-screen rendering. This
            is forced to be True if @use_image_obs is True.

        use_image_obs (bool): if True, environment is expected to render rgb image observations
            on every env.step call. Set this to False for efficiency reasons, if image
            observations are not required.

        use_depth_obs (bool): if True, environment is expected to render depth image observations
            on every env.step call. Set this to False for efficiency reasons, if depth
            observations are not required.
    """
    if env_name is None:
        env_name = env_meta["env_name"]
    env_type = get_env_type(env_meta=env_meta)
    env_kwargs = env_meta["env_kwargs"]
    env_kwargs.pop("use_image_obs", None)
    env_kwargs.pop("use_depth_obs", None)

    env = create_env(
        env_type=env_type,
        env_name=env_name,  
        env_class=env_class,
        render=render,
        render_offscreen=render_offscreen, 
        use_image_obs=use_image_obs, 
        use_depth_obs=use_depth_obs,
        **env_kwargs,
    )
    check_env_version(env, env_meta)
    return env


def create_env_for_data_processing(
    env_meta,
    camera_names, 
    camera_height, 
    camera_width, 
    reward_shaping,
    env_class=None,
    render=None,
    render_offscreen=None,
    use_image_obs=None,
    use_depth_obs=None,
):
    """
    Creates environment for processing dataset observations and rewards.

    Args:
        env_meta (dict): environment metadata, which should be loaded from demonstration
            hdf5 with @FileUtils.get_env_metadata_from_dataset or from checkpoint (see
            @FileUtils.env_from_checkpoint). Contains 3 keys:

                :`'env_name'`: name of environment
                :`'type'`: type of environment, should be a value in EB.EnvType
                :`'env_kwargs'`: dictionary of keyword arguments to pass to environment constructor

        camera_names (list of st): list of camera names that correspond to image observations

        camera_height (int): camera height for all cameras

        camera_width (int): camera width for all cameras

        reward_shaping (bool): if True, use shaped environment rewards, else use sparse task completion rewards

        render (bool or None): optionally override rendering behavior

        render_offscreen (bool or None): optionally override rendering behavior

        use_image_obs (bool or None): optionally override rendering behavior

        use_depth_obs (bool or None): optionally override rendering behavior
    """
    env_name = env_meta["env_name"]
    env_type = get_env_type(env_meta=env_meta)
    env_kwargs = env_meta["env_kwargs"]
    if env_class is None:
        env_class = get_env_class(env_type=env_type)

    # remove possibly redundant values in kwargs
    env_kwargs = deepcopy(env_kwargs)
    env_kwargs.pop("env_name", None)
    env_kwargs.pop("camera_names", None)
    env_kwargs.pop("camera_height", None)
    env_kwargs.pop("camera_width", None)
    env_kwargs.pop("reward_shaping", None)
    env_kwargs.pop("render", None)
    env_kwargs.pop("render_offscreen", None)
    env_kwargs.pop("use_image_obs", None)
    env_kwargs.pop("use_depth_obs", None)

    env = env_class.create_for_data_processing(
        env_name=env_name, 
        camera_names=camera_names, 
        camera_height=camera_height, 
        camera_width=camera_width, 
        reward_shaping=reward_shaping, 
        render=render,
        render_offscreen=render_offscreen,
        use_image_obs=use_image_obs,
        use_depth_obs=use_depth_obs,
        **env_kwargs,
    )
    check_env_version(env, env_meta)
    return env


def set_env_specific_obs_processing(env_meta=None, env_type=None, env=None):
    """
    Sets env-specific observation processing. As an example, robosuite depth observations
    correspond to raw depth and should not be normalized by default, while default depth
    processing normalizes and clips all values to [0, 1].
    """
    if is_robosuite_env(env_meta=env_meta, env_type=env_type, env=env):
        from robomimic.utils.obs_utils import DepthModality, process_frame, unprocess_frame
        DepthModality.set_obs_processor(processor=(
            lambda obs: process_frame(frame=obs, channel_dim=1, scale=None)
        ))
        DepthModality.set_obs_unprocessor(unprocessor=(
            lambda obs: unprocess_frame(frame=obs, channel_dim=1, scale=None)
        ))


def wrap_env_from_config(env, config):
    """
    Wraps environment using the provided Config object to determine which wrappers
    to use (if any).
    """
    if ("frame_stack" in config.train) and (config.train.frame_stack > 1):
        from robomimic.envs.wrappers import FrameStackWrapper
        env = FrameStackWrapper(env, num_frames=config.train.frame_stack)

    return env