| """Environment class.""" | |
| import os | |
| import tempfile | |
| import time | |
| import cv2 | |
| import imageio | |
| import gym | |
| import numpy as np | |
| from cliport.tasks import cameras | |
| from cliport.utils import pybullet_utils | |
| from cliport.utils import utils | |
| import string | |
| import pybullet as p | |
| import tempfile | |
| import random | |
| import sys | |
| PLACE_STEP = 0.0003 | |
| PLACE_DELTA_THRESHOLD = 0.005 | |
| UR5_URDF_PATH = 'ur5/ur5.urdf' | |
| UR5_WORKSPACE_URDF_PATH = 'ur5/workspace.urdf' | |
| PLANE_URDF_PATH = 'plane/plane.urdf' | |


class Environment(gym.Env):
    """OpenAI Gym-style environment class."""

    def __init__(self,
                 assets_root,
                 task=None,
                 disp=False,
                 shared_memory=False,
                 hz=240,
                 record_cfg=None):
        """Creates OpenAI Gym-style environment with PyBullet.

        Args:
          assets_root: root directory of assets.
          task: the task to use. If None, the user must call set_task for the
            environment to work properly.
          disp: show environment with PyBullet's built-in display viewer.
          shared_memory: run with shared memory.
          hz: PyBullet physics simulation step speed. Set to 480 for deformables.
          record_cfg: optional dict of video-recording settings (this file reads
            save_video_path, fps, video_height, video_width, and blender_render).

        Raises:
          RuntimeError: if pybullet cannot load fileIOPlugin.
        """
        self.task = None  # Set via the task arg below or set_task().
        self.curr_video = []
        self.pix_size = 0.003125
        self.obj_ids = {'fixed': [], 'rigid': [], 'deformable': []}
        self.objects = self.obj_ids  # Note: alias of obj_ids, not a copy.
        self.homej = np.array([-1, -0.5, 0.5, -0.5, -0.5, 0]) * np.pi
        self.agent_cams = cameras.RealSenseD415.CONFIG
        self.record_cfg = record_cfg
        self.save_video = True
        self.step_counter = 0
        self.assets_root = assets_root

        color_tuple = [
            gym.spaces.Box(0, 255, config['image_size'] + (3,), dtype=np.uint8)
            for config in self.agent_cams
        ]
        depth_tuple = [
            gym.spaces.Box(0.0, 20.0, config['image_size'], dtype=np.float32)
            for config in self.agent_cams
        ]
        self.observation_space = gym.spaces.Dict({
            'color': gym.spaces.Tuple(color_tuple),
            'depth': gym.spaces.Tuple(depth_tuple),
        })
        self.position_bounds = gym.spaces.Box(
            low=np.array([0.25, -0.5, 0.], dtype=np.float32),
            high=np.array([0.75, 0.5, 0.28], dtype=np.float32),
            dtype=np.float32)
        self.bounds = np.array([[0.25, 0.75], [-0.5, 0.5], [0, 0.3]])
        self.action_space = gym.spaces.Dict({
            'pose0':
                gym.spaces.Tuple(
                    (self.position_bounds,
                     gym.spaces.Box(-1.0, 1.0, shape=(4,), dtype=np.float32))),
            'pose1':
                gym.spaces.Tuple(
                    (self.position_bounds,
                     gym.spaces.Box(-1.0, 1.0, shape=(4,), dtype=np.float32))),
        })
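
        # A hedged example of one well-formed action (values are illustrative):
        # each pose is a (position_xyz, quaternion_xyzw) pair, e.g.
        #   action = {
        #       'pose0': ((0.4, -0.1, 0.02), (0.0, 0.0, 0.0, 1.0)),  # pick pose
        #       'pose1': ((0.6, 0.2, 0.02), (0.0, 0.0, 0.0, 1.0)),   # place pose
        #   }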

        # Start PyBullet.
        disp_option = p.DIRECT
        if disp:
            disp_option = p.GUI
        if shared_memory:
            disp_option = p.SHARED_MEMORY
        client = p.connect(disp_option)
        file_io = p.loadPlugin('fileIOPlugin', physicsClientId=client)
        if file_io < 0:
            raise RuntimeError('pybullet: cannot load FileIO!')
        p.executePluginCommand(
            file_io,
            textArgument=assets_root,
            intArgs=[p.AddFileIOAction],
            physicsClientId=client)

        p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
        p.setPhysicsEngineParameter(enableFileCaching=0)
        p.setAdditionalSearchPath(assets_root)
        p.setAdditionalSearchPath(tempfile.gettempdir())
        p.setTimeStep(1. / hz)

        # If using --disp, move default camera closer to the scene.
        if disp:
            target = p.getDebugVisualizerCamera()[11]
            p.resetDebugVisualizerCamera(
                cameraDistance=1.1,
                cameraYaw=90,
                cameraPitch=-25,
                cameraTargetPosition=target)

        if task:
            self.set_task(task)

    def __del__(self):
        if hasattr(self, 'video_writer'):
            self.video_writer.close()

    @property
    def is_static(self):
        """True if all rigid objects are (nearly) no longer moving."""
        v = [np.linalg.norm(p.getBaseVelocity(i)[0])
             for i in self.obj_ids['rigid']]
        return all(np.array(v) < 5e-3)

    def fill_dummy_template(self, template):
        """Fill any template fields that have not been substituted with dummy values."""
        full_template_path = os.path.join(self.assets_root, template)
        with open(full_template_path, 'r') as file:
            fdata = file.read()
        fill = False
        for field in ['DIMH', 'DIMR', 'DIMX', 'DIMY', 'DIMZ', 'DIM']:
            # Three indexed placeholders per field is usually enough.
            if field in fdata:
                default_replace_vals = np.random.uniform(0.03, 0.05, size=(3,)).tolist()
                for i in range(len(default_replace_vals)):
                    fdata = fdata.replace(f'{field}{i}', str(default_replace_vals[i]))
                fill = True

        for field in ['HALF']:
            if field in fdata:
                default_replace_vals = np.random.uniform(0.01, 0.03, size=(3,)).tolist()
                for i in range(len(default_replace_vals)):
                    fdata = fdata.replace(f'{field}{i}', str(default_replace_vals[i]))
                fill = True

        if fill:
            # Write the filled-in template to a uniquely named temp file.
            alphabet = string.ascii_lowercase + string.digits
            rname = ''.join(random.choices(alphabet, k=16))
            tmpdir = tempfile.gettempdir()
            template_filename = os.path.split(template)[-1]
            fname = os.path.join(tmpdir, f'{template_filename}.{rname}')
            with open(fname, 'w') as file:
                file.write(fdata)
            return fname
        else:
            return template
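
    # A minimal sketch of what fill_dummy_template does, assuming a URDF
    # template with indexed placeholders (this snippet is hypothetical):
    #
    #   <geometry><box size="DIM0 DIM1 DIM2"/></geometry>
    #
    # becomes, after substituting random values drawn from [0.03, 0.05):
    #
    #   <geometry><box size="0.041 0.033 0.048"/></geometry>
    #
    # The rewritten file lands in tempfile.gettempdir(), which __init__ adds
    # to PyBullet's search path, so it loads like any other asset.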

    def add_object(self, urdf, pose, category='rigid', color=None, **kwargs):
        """Load an object URDF and track it as fixed, rigid, or deformable."""
        fixed_base = 1 if category == 'fixed' else 0

        if 'template' in urdf:
            if not os.path.exists(os.path.join(self.assets_root, urdf)):
                urdf = urdf.replace("-template", "")
            urdf = self.fill_dummy_template(urdf)

        if not os.path.exists(os.path.join(self.assets_root, urdf)):
            print(f"missing urdf error: {os.path.join(self.assets_root, urdf)}. use dummy block.")
            urdf = 'stacking/block.urdf'

        obj_id = pybullet_utils.load_urdf(
            p,
            os.path.join(self.assets_root, urdf),
            pose[0],
            pose[1],
            useFixedBase=fixed_base)
        if obj_id is not None:
            self.obj_ids[category].append(obj_id)

        if color is not None:
            if isinstance(color, str):
                color = utils.COLORS[color]
            color = color + [1.]
            p.changeVisualShape(obj_id, -1, rgbaColor=color)

        if hasattr(self, 'record_cfg') and self.record_cfg and self.record_cfg.get('blender_render', False):
            self.blender_recorder.register_object(obj_id, os.path.join(self.assets_root, urdf), color=color)
        return obj_id
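
    # A hedged usage sketch (the URDF path and pose are illustrative; color
    # names resolve through utils.COLORS):
    #
    #   pose = ((0.5, 0.0, 0.02), (0.0, 0.0, 0.0, 1.0))
    #   block_id = env.add_object('stacking/block.urdf', pose,
    #                             category='rigid', color='red')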

    def set_color(self, obj_id, color):
        p.changeVisualShape(obj_id, -1, rgbaColor=color + [1])

    def set_object_color(self, *args, **kwargs):
        return self.set_color(*args, **kwargs)

    # ---------------------------------------------------------------------------
    # Standard Gym Functions
    # ---------------------------------------------------------------------------

    def seed(self, seed=None):
        self._random = np.random.RandomState(seed)
        return seed

    def reset(self):
        """Performs common reset functionality for all supported tasks."""
        if not self.task:
            raise ValueError('environment task must be set. Call set_task or pass '
                             'the task arg in the environment constructor.')
        self.obj_ids = {'fixed': [], 'rigid': [], 'deformable': []}
        p.resetSimulation(p.RESET_USE_DEFORMABLE_WORLD)
        p.setGravity(0, 0, -9.8)

        # Temporarily disable rendering to load scene faster.
        p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 0)

        plane = pybullet_utils.load_urdf(p, os.path.join(self.assets_root, PLANE_URDF_PATH),
                                         [0, 0, -0.001])
        workspace = pybullet_utils.load_urdf(
            p, os.path.join(self.assets_root, UR5_WORKSPACE_URDF_PATH), [0.5, 0, 0])

        # Load UR5 robot arm equipped with suction end effector.
        # TODO(andyzeng): add back parallel-jaw grippers.
        self.ur5 = pybullet_utils.load_urdf(
            p, os.path.join(self.assets_root, UR5_URDF_PATH))
        self.ee = self.task.ee(self.assets_root, self.ur5, 9, self.obj_ids)
        self.ee_tip = 10  # Link ID of suction cup.

        if hasattr(self, 'record_cfg') and self.record_cfg and self.record_cfg.get('blender_render', False):
            from misc.pyBulletSimRecorder import PyBulletRecorder
            self.blender_recorder = PyBulletRecorder()
            self.blender_recorder.register_object(plane, os.path.join(self.assets_root, PLANE_URDF_PATH))
            self.blender_recorder.register_object(workspace, os.path.join(self.assets_root, UR5_WORKSPACE_URDF_PATH))
            self.blender_recorder.register_object(self.ur5, os.path.join(self.assets_root, UR5_URDF_PATH))
            self.blender_recorder.register_object(self.ee.base, self.ee.base_urdf_path)
            if hasattr(self.ee, 'body'):
                self.blender_recorder.register_object(self.ee.body, self.ee.urdf_path)

        # Get revolute joint indices of robot (skip fixed joints).
        n_joints = p.getNumJoints(self.ur5)
        joints = [p.getJointInfo(self.ur5, i) for i in range(n_joints)]
        self.joints = [j[0] for j in joints if j[2] == p.JOINT_REVOLUTE]

        # Move robot to home joint configuration.
        for i in range(len(self.joints)):
            p.resetJointState(self.ur5, self.joints[i], self.homej[i])

        # Reset end effector.
        self.ee.release()

        # Reset task.
        self.task.reset(self)

        # Re-enable rendering.
        p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1)

        # Settle the scene; step() caches obs/reward/done/info on the instance.
        self.step()

    def step(self, action=None):
        """Execute action with specified primitive.

        Args:
          action: action to execute.

        Returns:
          (obs, reward, done, info) tuple containing MDP step data.
        """
        if action is not None:
            timeout = self.task.primitive(self.movej, self.movep, self.ee,
                                          action['pose0'], action['pose1'])

            # Exit early if action times out. We still return an observation
            # so that we don't break the Gym API contract.
            if timeout:
                obs = {'color': (), 'depth': ()}
                for config in self.agent_cams:
                    color, depth, _ = self.render_camera(config)
                    obs['color'] += (color,)
                    obs['depth'] += (depth,)
                return obs, 0.0, True, self.info

        start_time = time.time()
        # Step simulator asynchronously until objects settle.
        while not self.is_static:
            self.step_simulation()
            if time.time() - start_time > 5:  # timeout
                break

        # Get task rewards.
        reward, info = self.task.reward() if action is not None else (0, {})
        done = self.task.done()

        # Add ground truth robot state into info.
        info.update(self.info)

        obs = self._get_obs()

        # Cache the step results on the instance and return them.
        self.cur_obs = obs
        self.cur_reward = reward
        self.cur_done = done
        self.cur_info = info
        return obs, reward, done, info

    def step_simulation(self):
        p.stepSimulation()
        self.step_counter += 1

        if self.save_video and self.step_counter % 60 == 0:
            self.add_video_frame()

    def render(self, mode='rgb_array'):
        # Render only the color image from the first camera.
        # Only rgb_array is supported for now.
        if mode != 'rgb_array':
            raise NotImplementedError('Only rgb_array implemented')
        color, _, _ = self.render_camera(self.agent_cams[0])
        return color

    def render_camera_without_seg(self, config, image_size=None, shadow=1):
        """Render RGB-D image with specified camera configuration."""
        if not image_size:
            image_size = config['image_size']

        # OpenGL camera settings.
        lookdir = np.float32([0, 0, 1]).reshape(3, 1)
        updir = np.float32([0, -1, 0]).reshape(3, 1)
        rotation = p.getMatrixFromQuaternion(config['rotation'])
        rotm = np.float32(rotation).reshape(3, 3)
        lookdir = (rotm @ lookdir).reshape(-1)
        updir = (rotm @ updir).reshape(-1)
        lookat = config['position'] + lookdir
        focal_len = config['intrinsics'][0]
        znear, zfar = config['zrange']
        viewm = p.computeViewMatrix(config['position'], lookat, updir)
        # Vertical FOV is derived here from a fixed 640-pixel half-extent
        # rather than the requested image height (contrast render_camera).
        fovh = (640 / 2) / focal_len
        fovh = 180 * np.arctan(fovh) * 2 / np.pi

        # Notes: 1) FOV is vertical FOV 2) aspect must be float
        aspect_ratio = image_size[1] / image_size[0]
        projm = p.computeProjectionMatrixFOV(fovh, aspect_ratio, znear, zfar)

        # Render with OpenGL camera settings.
        _, _, color, depth, segm = p.getCameraImage(
            width=image_size[1],
            height=image_size[0],
            viewMatrix=viewm,
            projectionMatrix=projm,
            shadow=shadow,
            flags=p.ER_SEGMENTATION_MASK_OBJECT_AND_LINKINDEX,
            renderer=p.ER_BULLET_HARDWARE_OPENGL)

        # Get color image.
        color_image_size = (image_size[0], image_size[1], 4)
        color = np.array(color, dtype=np.uint8).reshape(color_image_size)
        color = color[:, :, :3]  # remove alpha channel
        if config['noise']:
            color = np.int32(color)
            color += np.int32(self._random.normal(0, 3, color.shape))
            color = np.uint8(np.clip(color, 0, 255))

        # Get depth image.
        depth_image_size = (image_size[0], image_size[1])
        zbuffer = np.array(depth).reshape(depth_image_size)
        depth = (zfar + znear - (2. * zbuffer - 1.) * (zfar - znear))
        depth = (2. * znear * zfar) / depth
        if config['noise']:
            depth += self._random.normal(0, 0.003, depth_image_size)

        # Get segmentation image.
        segm = np.uint8(segm).reshape(depth_image_size)

        return color, depth, segm
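
    # A short note on the depth conversion above (standard OpenGL math, not
    # specific to this repo): getCameraImage returns a nonlinear z-buffer in
    # [0, 1]; mapping it to NDC via 2*z - 1 and inverting the projection gives
    # metric depth:
    #
    #   depth = 2 * znear * zfar / (zfar + znear - (2*z - 1) * (zfar - znear))
    #
    # For example, with znear=0.01 and zfar=10.0, a z-buffer value of 0.999
    # maps to 0.2 / (10.01 - 0.998 * 9.99) ≈ 5.0 m.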

    def render_camera(self, config, image_size=None, shadow=1):
        """Render RGB-D image with specified camera configuration."""
        if not image_size:
            image_size = config['image_size']

        # OpenGL camera settings.
        lookdir = np.float32([0, 0, 1]).reshape(3, 1)
        updir = np.float32([0, -1, 0]).reshape(3, 1)
        rotation = p.getMatrixFromQuaternion(config['rotation'])
        rotm = np.float32(rotation).reshape(3, 3)
        lookdir = (rotm @ lookdir).reshape(-1)
        updir = (rotm @ updir).reshape(-1)
        lookat = config['position'] + lookdir
        focal_len = config['intrinsics'][0]
        znear, zfar = config['zrange']
        viewm = p.computeViewMatrix(config['position'], lookat, updir)
        fovh = (image_size[0] / 2) / focal_len
        fovh = 180 * np.arctan(fovh) * 2 / np.pi

        # Notes: 1) FOV is vertical FOV 2) aspect must be float
        aspect_ratio = image_size[1] / image_size[0]
        projm = p.computeProjectionMatrixFOV(fovh, aspect_ratio, znear, zfar)

        # Render with OpenGL camera settings.
        _, _, color, depth, segm = p.getCameraImage(
            width=image_size[1],
            height=image_size[0],
            viewMatrix=viewm,
            projectionMatrix=projm,
            shadow=shadow,
            flags=p.ER_SEGMENTATION_MASK_OBJECT_AND_LINKINDEX,
            renderer=p.ER_BULLET_HARDWARE_OPENGL)

        # Get color image.
        color_image_size = (image_size[0], image_size[1], 4)
        color = np.array(color, dtype=np.uint8).reshape(color_image_size)
        color = color[:, :, :3]  # remove alpha channel
        if config['noise']:
            color = np.int32(color)
            color += np.int32(self._random.normal(0, 3, color.shape))
            color = np.uint8(np.clip(color, 0, 255))

        # Get depth image.
        depth_image_size = (image_size[0], image_size[1])
        zbuffer = np.array(depth).reshape(depth_image_size)
        depth = (zfar + znear - (2. * zbuffer - 1.) * (zfar - znear))
        depth = (2. * znear * zfar) / depth
        if config['noise']:
            depth += self._random.normal(0, 0.003, depth_image_size)

        # Get segmentation image.
        segm = np.uint8(segm).reshape(depth_image_size)

        return color, depth, segm

    @property
    def info(self):
        """Environment info variable with object poses, dimensions, and colors."""
        # Some tasks create and remove zones, so ignore those IDs.
        info = {}  # object id : (position, rotation, dimensions)
        for obj_ids in self.obj_ids.values():
            for obj_id in obj_ids:
                pos, rot = p.getBasePositionAndOrientation(obj_id)
                dim = p.getVisualShapeData(obj_id)[0][3]
                info[obj_id] = (pos, rot, dim)
        info['lang_goal'] = self.get_lang_goal()
        return info
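
    # A hedged sketch of the info structure for a scene with one block
    # (the object ID and all numbers are illustrative):
    #
    #   {
    #       4: ((0.5, 0.0, 0.02),        # position (x, y, z)
    #           (0.0, 0.0, 0.0, 1.0),    # rotation quaternion (x, y, z, w)
    #           (0.04, 0.04, 0.04)),     # visual-shape dimensions
    #       'lang_goal': 'put the red block in the green bowl',
    #   }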

    def set_task(self, task):
        task.set_assets_root(self.assets_root)
        self.task = task

    def get_task_name(self):
        return type(self.task).__name__

    def get_lang_goal(self):
        if self.task:
            return self.task.get_lang_goal()
        else:
            raise Exception("No task has been set.")

    # ---------------------------------------------------------------------------
    # Robot Movement Functions
    # ---------------------------------------------------------------------------

    def movej(self, targj, speed=0.01, timeout=5):
        """Move UR5 to target joint configuration. Returns True on timeout."""
        if self.save_video:
            timeout = timeout * 2  # allow extra time while recording

        t0 = time.time()
        while (time.time() - t0) < timeout:
            currj = [p.getJointState(self.ur5, i)[0] for i in self.joints]
            currj = np.array(currj)
            diffj = targj - currj
            if all(np.abs(diffj) < 1e-2):
                return False

            # Move with constant velocity.
            norm = np.linalg.norm(diffj)
            v = diffj / norm if norm > 0 else 0
            stepj = currj + v * speed
            gains = np.ones(len(self.joints))
            p.setJointMotorControlArray(
                bodyIndex=self.ur5,
                jointIndices=self.joints,
                controlMode=p.POSITION_CONTROL,
                targetPositions=stepj,
                positionGains=gains)
            # This extra increment doubles the video frame-capture rate while
            # the arm is in motion (step_simulation increments it again).
            self.step_counter += 1
            self.step_simulation()
        print(f'Warning: movej exceeded {timeout} second timeout. Skipping.')
        return True

    def start_rec(self, video_filename):
        assert self.record_cfg

        # Make video directory.
        if not os.path.exists(self.record_cfg['save_video_path']):
            os.makedirs(self.record_cfg['save_video_path'])

        # Close and save existing writer.
        if hasattr(self, 'video_writer'):
            self.video_writer.close()

        # Initialize writer.
        self.video_writer = imageio.get_writer(os.path.join(self.record_cfg['save_video_path'],
                                                            f"{video_filename}.mp4"),
                                               fps=self.record_cfg['fps'],
                                               format='FFMPEG',
                                               codec='h264')
        p.setRealTimeSimulation(False)
        self.save_video = True

    def end_rec(self):
        if hasattr(self, 'video_writer'):
            self.video_writer.close()
        p.setRealTimeSimulation(True)
        self.save_video = False

    def add_video_frame(self):
        # Render frame.
        config = self.agent_cams[0]
        image_size = (self.record_cfg['video_height'], self.record_cfg['video_width'])
        color, _, _ = self.render_camera_without_seg(config, image_size, shadow=0)
        color = np.array(color)
        # (Disabled: cv2.putText overlays of the language goal and task name.)
        self.curr_video.append(color)

    def movep(self, pose, speed=0.01):
        """Move UR5 to target end effector pose."""
        targj = self.solve_ik(pose)
        return self.movej(targj, speed)

    def solve_ik(self, pose):
        """Calculate joint configuration with inverse kinematics."""
        joints = p.calculateInverseKinematics(
            bodyUniqueId=self.ur5,
            endEffectorLinkIndex=self.ee_tip,
            targetPosition=pose[0],
            targetOrientation=pose[1],
            lowerLimits=[-3 * np.pi / 2, -2.3562, -17, -17, -17, -17],
            upperLimits=[-np.pi / 2, 0, 17, 17, 17, 17],
            jointRanges=[np.pi, 2.3562, 34, 34, 34, 34],
            restPoses=np.float32(self.homej).tolist(),
            maxNumIterations=100,
            residualThreshold=1e-5)
        joints = np.float32(joints)
        # Wrap all joints after the first two into [-pi, pi).
        joints[2:] = (joints[2:] + np.pi) % (2 * np.pi) - np.pi
        return joints
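
    # The wrap step above maps the last four joint angles into [-pi, pi);
    # a quick worked example:
    #   (4.0 + pi) % (2*pi) - pi   ->  4.0 - 2*pi ≈ -2.283
    #   (-3.5 + pi) % (2*pi) - pi  ->  -3.5 + 2*pi ≈  2.783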

    def _get_obs(self):
        # Get RGB-D camera image observations.
        obs = {'color': (), 'depth': ()}
        for config in self.agent_cams:
            color, depth, _ = self.render_camera(config)
            obs['color'] += (color,)
            obs['depth'] += (depth,)
        return obs

    def get_object_pose(self, obj_id):
        return p.getBasePositionAndOrientation(obj_id)

    def get_object_size(self, obj_id):
        """Approximate the object's volume from its axis-aligned bounding box."""
        aabb_min, aabb_max = p.getAABB(obj_id)
        size_x = aabb_max[0] - aabb_min[0]
        size_y = aabb_max[1] - aabb_min[1]
        size_z = aabb_max[2] - aabb_min[2]
        return size_z * size_y * size_x
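
    # Note that, despite its name, get_object_size returns a volume in m^3:
    # an object whose AABB is a 0.04 m cube yields 0.04**3 = 6.4e-05.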


class EnvironmentNoRotationsWithHeightmap(Environment):
    """Environment that disables any rotations and always passes [0, 0, 0, 1]."""

    def __init__(self,
                 assets_root,
                 task=None,
                 disp=False,
                 shared_memory=False,
                 hz=240):
        super(EnvironmentNoRotationsWithHeightmap,
              self).__init__(assets_root, task, disp, shared_memory, hz)

        heightmap_tuple = [
            gym.spaces.Box(0.0, 20.0, (320, 160, 3), dtype=np.float32),
            gym.spaces.Box(0.0, 20.0, (320, 160), dtype=np.float32),
        ]
        self.observation_space = gym.spaces.Dict({
            'heightmap': gym.spaces.Tuple(heightmap_tuple),
        })
        self.action_space = gym.spaces.Dict({
            'pose0': gym.spaces.Tuple((self.position_bounds,)),
            'pose1': gym.spaces.Tuple((self.position_bounds,)),
        })

    def step(self, action=None):
        """Execute action with specified primitive.

        Args:
          action: action to execute.

        Returns:
          (obs, reward, done, info) tuple containing MDP step data.
        """
        if action is not None:
            action = {
                'pose0': (action['pose0'][0], [0., 0., 0., 1.]),
                'pose1': (action['pose1'][0], [0., 0., 0., 1.]),
            }
        return super(EnvironmentNoRotationsWithHeightmap, self).step(action)

    def _get_obs(self):
        obs = {}

        color_depth_obs = {'color': (), 'depth': ()}
        for config in self.agent_cams:
            color, depth, _ = self.render_camera(config)
            color_depth_obs['color'] += (color,)
            color_depth_obs['depth'] += (depth,)
        cmap, hmap = utils.get_fused_heightmap(color_depth_obs, self.agent_cams,
                                               self.task.bounds, pix_size=0.003125)
        obs['heightmap'] = (cmap, hmap)
        return obs
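

# A minimal usage sketch, assuming a cliport-style assets checkout and task
# class; the assets path and the PlaceRedInGreen import below are hypothetical
# and may not match this repo's exact layout.
if __name__ == '__main__':
    from cliport.tasks.place_red_in_green import PlaceRedInGreen  # hypothetical

    env = Environment(assets_root='environments/assets', disp=False, hz=240)
    env.seed(0)
    env.set_task(PlaceRedInGreen())
    env.reset()

    # Sample a random pick-and-place action and run one step.
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    print('reward:', reward, 'done:', done, 'lang goal:', info['lang_goal'])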