import numpy as np import pyrender import torch import trimesh from pyrender.trackball import Trackball from rlbench.backend.const import DEPTH_SCALE from scipy.spatial.transform import Rotation from rlbench.backend.observation import Observation from rlbench import CameraConfig, ObservationConfig from pyrep.const import RenderMode from typing import List SCALE_FACTOR = DEPTH_SCALE DEFAULT_SCENE_SCALE = 2.0 def loss_weights(replay_sample, beta=1.0): loss_weights = 1.0 if "sampling_probabilities" in replay_sample: probs = replay_sample["sampling_probabilities"] loss_weights = 1.0 / torch.sqrt(probs + 1e-10) loss_weights = (loss_weights / torch.max(loss_weights)) ** beta return loss_weights def soft_updates(net, target_net, tau): for param, target_param in zip(net.parameters(), target_net.parameters()): target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data) def stack_on_channel(x): # expect (B, T, C, ...) return torch.cat(torch.split(x, 1, dim=1), dim=2).squeeze(1) def normalize_quaternion(quat): return np.array(quat) / np.linalg.norm(quat, axis=-1, keepdims=True) def correct_rotation_instability(disc, resolution): # q1 = discrete_euler_to_quaternion(disc, resolution) # q2 = discrete_euler_to_quaternion(quaternion_to_discrete_euler(q1, resolution), resolution) # # d2 = quaternion_to_discrete_euler(q2, resolution) # # # choose the smallest change # if np.any(disc != d2): # if np.sum(disc) < np.sum(d2): # return disc # else: # return d2 return disc def check_gimbal_lock(pred_rot_and_grip, gt_rot_and_grip, resolution): pred_rot_and_grip_np = pred_rot_and_grip.detach().cpu().numpy() gt_rot_and_grip_np = gt_rot_and_grip.detach().cpu().numpy() pred_rot = discrete_euler_to_quaternion(pred_rot_and_grip_np[:, :3], resolution) gt_rot = discrete_euler_to_quaternion(gt_rot_and_grip_np[:, :3], resolution) gimbal_lock_matches = [ np.all(np.abs(pred_rot[i] - gt_rot[i]) < 1e-10) and np.any(pred_rot_and_grip_np[i, :3] != gt_rot_and_grip_np[i, :3]) for i in range(pred_rot.shape[0]) ] return 0 def quaternion_to_discrete_euler(quaternion, resolution): euler = Rotation.from_quat(quaternion).as_euler("xyz", degrees=True) + 180 assert np.min(euler) >= 0 and np.max(euler) <= 360 disc = np.around((euler / resolution)).astype(int) disc[disc == int(360 / resolution)] = 0 return disc def discrete_euler_to_quaternion(discrete_euler, resolution): euluer = (discrete_euler * resolution) - 180 return Rotation.from_euler("xyz", euluer, degrees=True).as_quat() def point_to_voxel_index( point: np.ndarray, voxel_size: np.ndarray, coord_bounds: np.ndarray ): bb_mins = np.array(coord_bounds[0:3]) bb_maxs = np.array(coord_bounds[3:]) dims_m_one = np.array([voxel_size] * 3) - 1 bb_ranges = bb_maxs - bb_mins res = bb_ranges / (np.array([voxel_size] * 3) + 1e-12) voxel_indicy = np.minimum( np.floor((point - bb_mins) / (res + 1e-12)).astype(np.int32), dims_m_one ) return voxel_indicy def voxel_index_to_point( voxel_index: torch.Tensor, voxel_size: int, coord_bounds: np.ndarray ): res = (coord_bounds[:, 3:] - coord_bounds[:, :3]) / voxel_size points = (voxel_index * res) + coord_bounds[:, :3] return points def point_to_pixel_index( point: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray ): point = np.array([point[0], point[1], point[2], 1]) world_to_cam = np.linalg.inv(extrinsics) point_in_cam_frame = world_to_cam.dot(point) px, py, pz = point_in_cam_frame[:3] px = 2 * intrinsics[0, 2] - int(-intrinsics[0, 0] * (px / pz) + intrinsics[0, 2]) py = 2 * intrinsics[1, 2] - int(-intrinsics[1, 1] * (py / pz) + intrinsics[1, 2]) return px, py def _compute_initial_camera_pose(scene): # Adapted from: # https://github.com/mmatl/pyrender/blob/master/pyrender/viewer.py#L1032 centroid = scene.centroid scale = scene.scale if scale == 0.0: scale = DEFAULT_SCENE_SCALE s2 = 1.0 / np.sqrt(2.0) cp = np.eye(4) cp[:3, :3] = np.array([[0.0, -s2, s2], [1.0, 0.0, 0.0], [0.0, s2, s2]]) hfov = np.pi / 6.0 dist = scale / (2.0 * np.tan(hfov)) cp[:3, 3] = dist * np.array([1.0, 0.0, 1.0]) + centroid return cp def _from_trimesh_scene(trimesh_scene, bg_color=None, ambient_light=None): # convert trimesh geometries to pyrender geometries geometries = { name: pyrender.Mesh.from_trimesh(geom, smooth=False) for name, geom in trimesh_scene.geometry.items() } # create the pyrender scene object scene_pr = pyrender.Scene(bg_color=bg_color, ambient_light=ambient_light) # add every node with geometry to the pyrender scene for node in trimesh_scene.graph.nodes_geometry: pose, geom_name = trimesh_scene.graph[node] scene_pr.add(geometries[geom_name], pose=pose) return scene_pr def _create_bounding_box(scene, voxel_size, res): l = voxel_size * res T = np.eye(4) w = 0.01 for trans in [[0, 0, l / 2], [0, l, l / 2], [l, l, l / 2], [l, 0, l / 2]]: T[:3, 3] = np.array(trans) - voxel_size / 2 scene.add_geometry( trimesh.creation.box([w, w, l], T, face_colors=[0, 0, 0, 255]) ) for trans in [[l / 2, 0, 0], [l / 2, 0, l], [l / 2, l, 0], [l / 2, l, l]]: T[:3, 3] = np.array(trans) - voxel_size / 2 scene.add_geometry( trimesh.creation.box([l, w, w], T, face_colors=[0, 0, 0, 255]) ) for trans in [[0, l / 2, 0], [0, l / 2, l], [l, l / 2, 0], [l, l / 2, l]]: T[:3, 3] = np.array(trans) - voxel_size / 2 scene.add_geometry( trimesh.creation.box([w, l, w], T, face_colors=[0, 0, 0, 255]) ) def create_voxel_scene( voxel_grid: np.ndarray, q_attention: np.ndarray = None, highlight_coordinate: np.ndarray = None, highlight_gt_coordinate: np.ndarray = None, highlight_alpha: float = 1.0, voxel_size: float = 0.1, show_bb: bool = False, alpha: float = 0.5, ): _, d, h, w = voxel_grid.shape v = voxel_grid.transpose((1, 2, 3, 0)) occupancy = v[:, :, :, -1] != 0 alpha = np.expand_dims(np.full_like(occupancy, alpha, dtype=np.float32), -1) rgb = np.concatenate([(v[:, :, :, 3:6] + 1) / 2.0, alpha], axis=-1) if q_attention is not None: q = np.max(q_attention, 0) q = q / np.max(q) show_q = q > 0.75 occupancy = (show_q + occupancy).astype(bool) q = np.expand_dims(q - 0.5, -1) # Max q can be is 0.9 q_rgb = np.concatenate( [q, np.zeros_like(q), np.zeros_like(q), np.clip(q, 0, 1)], axis=-1 ) rgb = np.where(np.expand_dims(show_q, -1), q_rgb, rgb) if highlight_coordinate is not None: x, y, z = highlight_coordinate occupancy[x, y, z] = True rgb[x, y, z] = [1.0, 0.0, 0.0, highlight_alpha] if highlight_gt_coordinate is not None: x, y, z = highlight_gt_coordinate occupancy[x, y, z] = True rgb[x, y, z] = [0.0, 0.0, 1.0, highlight_alpha] transform = trimesh.transformations.scale_and_translate( scale=voxel_size, translate=(0.0, 0.0, 0.0) ) trimesh_voxel_grid = trimesh.voxel.VoxelGrid( encoding=occupancy, transform=transform ) geometry = trimesh_voxel_grid.as_boxes(colors=rgb) scene = trimesh.Scene() scene.add_geometry(geometry) if show_bb: assert d == h == w _create_bounding_box(scene, voxel_size, d) return scene def visualise_voxel( voxel_grid: np.ndarray, q_attention: np.ndarray = None, highlight_coordinate: np.ndarray = None, highlight_gt_coordinate: np.ndarray = None, highlight_alpha: float = 1.0, rotation_amount: float = 0.0, show: bool = False, voxel_size: float = 0.1, offscreen_renderer: pyrender.OffscreenRenderer = None, show_bb: bool = False, alpha: float = 0.5, ): scene = create_voxel_scene( voxel_grid, q_attention, highlight_coordinate, highlight_gt_coordinate, highlight_alpha, voxel_size, show_bb, alpha, ) if show: scene.show() else: r = offscreen_renderer or pyrender.OffscreenRenderer( viewport_width=640, viewport_height=480, point_size=1.0 ) s = _from_trimesh_scene( scene, ambient_light=[0.8, 0.8, 0.8], bg_color=[1.0, 1.0, 1.0] ) cam = pyrender.PerspectiveCamera( yfov=np.pi / 4.0, aspectRatio=r.viewport_width / r.viewport_height ) p = _compute_initial_camera_pose(s) t = Trackball(p, (r.viewport_width, r.viewport_height), s.scale, s.centroid) t.rotate(rotation_amount, np.array([0.0, 0.0, 1.0])) s.add(cam, pose=t.pose) color, depth = r.render(s) return color.copy() def preprocess(img, dist="transporter"): """Pre-process input (subtract mean, divide by std).""" transporter_color_mean = [0.18877631, 0.18877631, 0.18877631] transporter_color_std = [0.07276466, 0.07276466, 0.07276466] transporter_depth_mean = 0.00509261 transporter_depth_std = 0.00903967 franka_color_mean = [0.622291933, 0.628313992, 0.623031488] franka_color_std = [0.168154213, 0.17626014, 0.184527364] franka_depth_mean = 0.872146842 franka_depth_std = 0.195743116 clip_color_mean = [0.48145466, 0.4578275, 0.40821073] clip_color_std = [0.26862954, 0.26130258, 0.27577711] # choose distribution if dist == "clip": color_mean = clip_color_mean color_std = clip_color_std elif dist == "franka": color_mean = franka_color_mean color_std = franka_color_std else: color_mean = transporter_color_mean color_std = transporter_color_std if dist == "franka": depth_mean = franka_depth_mean depth_std = franka_depth_std else: depth_mean = transporter_depth_mean depth_std = transporter_depth_std # convert to pytorch tensor (if required) if type(img) == torch.Tensor: def cast_shape(stat, img): tensor = torch.from_numpy(np.array(stat)).to( device=img.device, dtype=img.dtype ) tensor = tensor.unsqueeze(0).unsqueeze(-1).unsqueeze(-1) tensor = tensor.repeat(img.shape[0], 1, img.shape[-2], img.shape[-1]) return tensor color_mean = cast_shape(color_mean, img) color_std = cast_shape(color_std, img) depth_mean = cast_shape(depth_mean, img) depth_std = cast_shape(depth_std, img) # normalize img = img.clone() img[:, :3, :, :] = (img[:, :3, :, :] / 255 - color_mean) / color_std img[:, 3:, :, :] = (img[:, 3:, :, :] - depth_mean) / depth_std else: # normalize img[:, :, :3] = (img[:, :, :3] / 255 - color_mean) / color_std img[:, :, 3:] = (img[:, :, 3:] - depth_mean) / depth_std return img def rand_dist(size, min=-1.0, max=1.0): return (max - min) * torch.rand(size) + min def rand_discrete(size, min=0, max=1): if min == max: return torch.zeros(size) return torch.randint(min, max + 1, size) def split_list(lst, n): for i in range(0, len(lst), n): yield lst[i : i + n] def get_device(gpu): if gpu is not None and gpu >= 0 and torch.cuda.is_available(): device = torch.device("cuda:%d" % gpu) torch.backends.cudnn.enabled = torch.backends.cudnn.benchmark = True else: device = torch.device("cpu") return device