File size: 7,239 Bytes
96da58e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
"""
This file contains the base class for environment wrappers that are used
to provide a standardized environment API for training policies and interacting
with metadata present in datasets.
"""
import abc


class EnvType:
    """
    Integer identifiers for the environment classes known to this repo.

    There is one constant per environment class; the value serves as a
    compact tag that lets other code tell the different environments apart.
    """
    # NOTE(review): judging by the names, these three refer to simulated /
    # benchmark environments - confirm against the wrappers that use them.
    ROBOSUITE_TYPE = 1
    GYM_TYPE = 2
    IG_MOMART_TYPE = 3

    # Values 4 and 5 are not assigned in this class.
    # NOTE(review): judging by the names, the remaining constants refer to
    # real-robot setups - confirm against the wrappers that use them.
    REAL_TYPE = 6
    GPRS_REAL_TYPE = 7
    REAL_UR5E_TYPE = 8
    REAL_KINOVA_TYPE = 9


class EnvBase(abc.ABC):
    """
    Abstract base class for the environment wrappers used by this repo.

    Concrete wrappers must implement every abstract method and property
    declared here; only @version ships with a default implementation.
    """

    @abc.abstractmethod
    def __init__(
        self,
        env_name,
        render=False,
        render_offscreen=False,
        use_image_obs=False,
        use_depth_obs=False,
        postprocess_visual_obs=True,
        **kwargs,
    ):
        """
        Construct the environment wrapper.

        Args:
            env_name (str): name of the environment. Only needs to be provided
                if making a different environment from the one in @env_meta.

            render (bool): whether the environment supports on-screen rendering

            render_offscreen (bool): whether the environment supports off-screen
                rendering. This is forced to be True if @env_meta["use_images"]
                is True.

            use_image_obs (bool): whether the environment is expected to render
                rgb image observations on every env.step call. Leave this False
                for efficiency when image observations are not required.

            use_depth_obs (bool): whether the environment is expected to render
                depth image observations on every env.step call. Leave this
                False for efficiency when depth observations are not required.

            postprocess_visual_obs (bool): whether to postprocess image
                observations to prepare them for learning. This should only be
                False when extracting observations for saving to a dataset
                (to save space on RGB images for example).

            kwargs (dict): additional keyword arguments; how they are used is
                up to the concrete subclass.
        """

    @abc.abstractmethod
    def step(self, action):
        """
        Advance the environment by one step using the given action.

        Args:
            action (np.array): action to take

        Returns:
            observation (dict): new observation dictionary
            reward (float): reward for this step
            done (bool): whether the task is done
            info (dict): extra information
        """

    @abc.abstractmethod
    def reset(self):
        """
        Reset the environment.

        Returns:
            observation (dict): initial observation dictionary.
        """

    @abc.abstractmethod
    def reset_to(self, state):
        """
        Reset the environment to a specific simulator state.

        Args:
            state (dict): simulator state to restore

        Returns:
            observation (dict): observation dictionary after setting the simulator state
        """

    @abc.abstractmethod
    def render(self, mode="human", height=None, width=None, camera_name=None):
        """
        Render the environment. The meaning of each argument is defined by
        the concrete subclass.
        """

    @abc.abstractmethod
    def get_observation(self):
        """Return the current environment observation."""

    @abc.abstractmethod
    def get_state(self):
        """Return the environment simulator state, in a form compatible with @reset_to."""

    @abc.abstractmethod
    def get_reward(self):
        """
        Return the current reward.
        """

    @abc.abstractmethod
    def get_goal(self):
        """
        Return the goal observation. Not all environments support this.
        """

    @abc.abstractmethod
    def set_goal(self, **kwargs):
        """
        Set the goal observation from an external specification. Not all
        environments support this.
        """

    @abc.abstractmethod
    def is_done(self):
        """
        Report whether the task is done (which does not necessarily mean it
        was successful).
        """

    @abc.abstractmethod
    def is_success(self):
        """
        Report whether the task condition(s) have been reached. Should return
        a dictionary { str: bool } containing at least a "task" key for the
        overall task success, plus optional additional keys for other task
        criteria.
        """

    @property
    @abc.abstractmethod
    def action_dimension(self):
        """
        Dimension of actions (int).
        """

    @property
    @abc.abstractmethod
    def name(self):
        """
        Name of the environment (str).
        """

    @property
    @abc.abstractmethod
    def type(self):
        """
        Environment type (int) for this kind of environment. This helps
        identify this env class.
        """

    @property
    def version(self):
        """
        Version of the environment (str).

        Deliberately not abstract, since some subclasses do not implement it;
        this default reports None.
        """
        return None

    @abc.abstractmethod
    def serialize(self):
        """
        Collect, in a dictionary, all information needed to re-instantiate this
        environment. This is the same as @env_meta - the environment metadata
        stored in hdf5 datasets and used in utils/env_utils.py.
        """

    @classmethod
    @abc.abstractmethod
    def create_for_data_processing(
        cls,
        camera_names,
        camera_height,
        camera_width,
        reward_shaping,
        render=None,
        render_offscreen=None,
        use_image_obs=None,
        use_depth_obs=None,
        **kwargs,
    ):
        """
        Build an environment intended for dataset processing, i.e. extracting
        observations, labeling dense / sparse rewards, and annotating dones in
        transitions.

        Args:
            camera_names ([str]): list of camera names that correspond to image observations
            camera_height (int): camera height for all cameras
            camera_width (int): camera width for all cameras
            reward_shaping (bool): if True, use shaped environment rewards, else use sparse
                task completion rewards
            render (bool or None): optionally override rendering behavior. Defaults to False.
            render_offscreen (bool or None): optionally override rendering behavior. The default
                value is True if @camera_names is non-empty, False otherwise.
            use_image_obs (bool or None): optionally override rendering behavior. The default
                value is True if @camera_names is non-empty, False otherwise.
            use_depth_obs (bool or None): if True, use depth observations
            kwargs (dict): additional keyword arguments; how they are used is up to the
                concrete subclass.

        Returns:
            env (EnvBase instance)
        """

    @property
    @abc.abstractmethod
    def rollout_exceptions(self):
        """
        Tuple of exception types to catch while doing rollouts. Catching these
        keeps an entire training run from crashing when a bad policy causes
        unstable simulation computations.
        """

    @property
    @abc.abstractmethod
    def base_env(self):
        """
        The underlying base simulation environment.
        """