| import time |
| import numpy as np |
| import open3d as o3d |
| import spatialmath.base as sm |
| from pyrep.const import RenderMode |
| from pfp.envs.base_env import BaseEnv |
| from pyrep.errors import IKError |
| from rlbench.environment import Environment |
| from rlbench.backend.observation import Observation |
| from rlbench.backend.exceptions import InvalidActionError |
| from rlbench.action_modes.action_mode import MoveArmThenGripper |
| from rlbench.action_modes.gripper_action_modes import Discrete |
| from rlbench.action_modes.arm_action_modes import EndEffectorPoseViaPlanning |
| from rlbench.observation_config import ObservationConfig, CameraConfig |
| from rlbench.utils import name_to_task_class |
| from pfp.common.visualization import RerunViewer as RV |
| from pfp.common.o3d_utils import make_pcd, merge_pcds |
| from pfp.common.se3_utils import rot6d_to_quat_np, pfp_to_pose_np |
|
|
| try: |
| import rerun as rr |
| except ImportError: |
| print("WARNING: Rerun not installed. Visualization will not work.") |
|
|
|
|
| class RLBenchEnv(BaseEnv): |
| """ |
| DT = 0.05 (50ms/20Hz) |
| robot_state = [px, py, pz, r00, r10, r20, r01, r11, r21, gripper] |
| The pose is the ttip frame, with x pointing backwards, y pointing left, and z pointing down. |
| """ |
|
|
| def __init__( |
| self, |
| task_name: str, |
| voxel_size: float, |
| n_points: int, |
| use_pc_color: bool, |
| headless: bool, |
| vis: bool, |
| obs_mode: str = "pcd", |
| ): |
| assert obs_mode in ["pcd", "rgb"], "Invalid obs_mode" |
| self.obs_mode = obs_mode |
| |
| self.voxel_size = voxel_size |
| self.n_points = n_points |
| self.use_pc_color = use_pc_color |
| camera_config = CameraConfig( |
| rgb=True, |
| depth=False, |
| mask=False, |
| point_cloud=True, |
| image_size=(128, 128), |
| render_mode=RenderMode.OPENGL, |
| ) |
| obs_config = ObservationConfig( |
| camera_configs={ |
| "over_shoulder_left": camera_config, |
| "over_shoulder_right": camera_config, |
| "overhead": camera_config, |
| "wrist": camera_config, |
| "front": camera_config, |
| }, |
| gripper_matrix=True, |
| gripper_joint_positions=True, |
| ) |
| |
| action_mode = MoveArmThenGripper( |
| arm_action_mode=EndEffectorPoseViaPlanning(), gripper_action_mode=Discrete() |
| ) |
| self.env = Environment( |
| action_mode, |
| obs_config=obs_config, |
| headless=headless, |
| ) |
| self.env.launch() |
| self.task = self.env.get_task(name_to_task_class(task_name)) |
| self.robot_position = self.env._robot.arm.get_position() |
| self.ws_aabb = o3d.geometry.AxisAlignedBoundingBox( |
| min_bound=(self.robot_position[0] + 0.1, -0.65, self.robot_position[2] - 0.05), |
| max_bound=(1, 0.65, 2), |
| ) |
| self.vis = vis |
| self.last_obs = None |
| if self.vis: |
| RV.add_axis("vis/origin", np.eye(4), size=0.01, timeless=True) |
| RV.add_aabb( |
| "vis/ws_aabb", self.ws_aabb.get_center(), self.ws_aabb.get_extent(), timeless=True |
| ) |
| return |
|
|
| def reset(self): |
| self.task.reset() |
| return |
|
|
| def reset_rng(self): |
| return |
|
|
| def step(self, robot_state: np.ndarray): |
| ee_position = robot_state[:3] |
| ee_quat = rot6d_to_quat_np(robot_state[3:9]) |
| gripper = robot_state[-1:] |
| action = np.concatenate([ee_position, ee_quat, gripper]) |
| reward, terminate = self._step_safe(action) |
| return reward, terminate |
|
|
| def _step_safe(self, action: np.ndarray, recursion_depth=0): |
| if recursion_depth > 15: |
| print("Warning: Recursion depth limit reached.") |
| return 0.0, True |
| try: |
| _, reward, terminate = self.task.step(action) |
| except (IKError, InvalidActionError, AttributeError, RuntimeError) as e: |
| print(e) |
| cur_position = self.last_obs.gripper_pose[:3] |
| des_position = action[:3] |
| new_position = cur_position + (des_position - cur_position) * 0.25 |
|
|
| cur_quat = self.last_obs.gripper_pose[3:] |
| cur_quat = np.array([cur_quat[3], cur_quat[0], cur_quat[1], cur_quat[2]]) |
| des_quat = action[3:7] |
| des_quat = np.array([des_quat[3], des_quat[0], des_quat[1], des_quat[2]]) |
| new_quat = sm.qslerp(cur_quat, des_quat, 0.25, shortest=True) |
| new_quat = np.array([new_quat[1], new_quat[2], new_quat[3], new_quat[0]]) |
|
|
| new_action = np.concatenate([new_position, new_quat, action[-1:]]) |
| reward, terminate = self._step_safe(new_action, recursion_depth + 1) |
| return reward, terminate |
|
|
| def get_obs(self) -> tuple[np.ndarray, ...]: |
| obs_rlbench = self.task.get_observation() |
| self.last_obs = obs_rlbench |
| robot_state = self.get_robot_state(obs_rlbench) |
| if self.obs_mode == "pcd": |
| pcd_o3d = self.get_pcd(obs_rlbench) |
| pcd = np.asarray(pcd_o3d.points) |
| if self.use_pc_color: |
| pcd_color = np.asarray(pcd_o3d.colors, dtype=np.float32) |
| pcd = np.concatenate([pcd, pcd_color], axis=-1) |
| obs = pcd |
| elif self.obs_mode == "rgb": |
| obs = self.get_images(obs_rlbench) |
| return robot_state, obs |
|
|
| def get_robot_state(self, obs: Observation) -> np.ndarray: |
| ee_position = obs.gripper_matrix[:3, 3] |
| ee_rot6d = obs.gripper_matrix[:3, :2].flatten(order="F") |
| gripper = np.array([obs.gripper_open]) |
| robot_state = np.concatenate([ee_position, ee_rot6d, gripper]) |
| return robot_state |
|
|
| def get_pcd(self, obs: Observation) -> o3d.geometry.PointCloud: |
| perception = obs.perception_data |
| right_pcd = make_pcd( |
| perception["over_shoulder_right_point_cloud"], perception["over_shoulder_right_rgb"] |
| ) |
| left_pcd = make_pcd( |
| perception["over_shoulder_left_point_cloud"], perception["over_shoulder_left_rgb"] |
| ) |
| overhead_pcd = make_pcd(perception["overhead_point_cloud"], perception["overhead_rgb"]) |
| front_pcd = make_pcd(perception["front_point_cloud"], perception["front_rgb"]) |
| wrist_pcd = make_pcd(perception["wrist_point_cloud"], perception["wrist_rgb"]) |
| pcd_list = [right_pcd, left_pcd, overhead_pcd, front_pcd, wrist_pcd] |
| pcd = merge_pcds(self.voxel_size, self.n_points, pcd_list, self.ws_aabb) |
| return pcd |
|
|
| def get_images(self, obs: Observation) -> np.ndarray: |
| perception = obs.perception_data |
| images = np.stack( |
| ( |
| perception["over_shoulder_right_rgb"], |
| perception["over_shoulder_left_rgb"], |
| perception["overhead_rgb"], |
| perception["front_rgb"], |
| perception["wrist_rgb"], |
| ) |
| ) |
| return images |
|
|
| def vis_step(self, robot_state: np.ndarray, obs: np.ndarray, prediction: np.ndarray = None): |
| """ |
| robot_state: the current robot state (10,) |
| obs: either pcd or images |
| - pcd: the current point cloud (N, 6) or (N, 3) |
| - images: the current images (5, H, W, 3) |
| prediction: the full trajectory of robot states (T, 10) |
| """ |
| VIS_FLOW = False |
| if not self.vis: |
| return |
| rr.set_time_seconds("time", time.time()) |
|
|
| |
| if self.obs_mode == "pcd": |
| pcd = obs |
| pcd_xyz = pcd[:, :3] |
| pcd_color = (pcd[:, 3:6] * 255).astype(np.uint8) if self.use_pc_color else None |
| RV.add_np_pointcloud("vis/pcd_obs", points=pcd_xyz, colors_uint8=pcd_color, radii=0.003) |
|
|
| |
| elif self.obs_mode == "rgb": |
| images = obs |
| for i, img in enumerate(images): |
| RV.add_rgb(f"vis/rgb_obs_{i}", img) |
|
|
| |
| ee_pose = pfp_to_pose_np(robot_state[np.newaxis, ...]).squeeze() |
| RV.add_axis("vis/ee_state", ee_pose) |
| rr.log("plot/gripper_state", rr.Scalar(robot_state[-1])) |
|
|
| if prediction is None: |
| return |
|
|
| |
| final_pred = prediction[-1] |
| if VIS_FLOW: |
| for traj in prediction: |
| RV.add_traj("vis/traj_k", traj) |
| else: |
| RV.add_traj("vis/ee_pred", final_pred) |
|
|
| |
| rr.log("plot/gripper_pred", rr.Scalar(final_pred[0, -1])) |
| return |
|
|
| def close(self): |
| self.env.shutdown() |
| return |
|
|
|
|
| if __name__ == "__main__": |
| env = RLBenchEnv( |
| "close_microwave", |
| voxel_size=0.01, |
| n_points=5500, |
| use_pc_color=False, |
| headless=True, |
| vis=True, |
| ) |
| env.reset() |
| for i in range(1000): |
| robot_state, pcd = env.get_obs() |
| next_robot_state = robot_state.copy() |
| next_robot_state[:3] += np.array([-0.005, 0.005, 0.0]) |
| env.step(next_robot_state) |
| env.close() |
|
|