lsnu's picture
Add files using upload-large-folder tool
0d89eb9 verified
import numpy as np
import pyrender
import torch
import trimesh
from pyrender.trackball import Trackball
from rlbench.backend.const import DEPTH_SCALE
from scipy.spatial.transform import Rotation
from rlbench.backend.observation import Observation
from rlbench import CameraConfig, ObservationConfig
from pyrep.const import RenderMode
from typing import List
SCALE_FACTOR = DEPTH_SCALE
DEFAULT_SCENE_SCALE = 2.0
def loss_weights(replay_sample, beta=1.0):
loss_weights = 1.0
if "sampling_probabilities" in replay_sample:
probs = replay_sample["sampling_probabilities"]
loss_weights = 1.0 / torch.sqrt(probs + 1e-10)
loss_weights = (loss_weights / torch.max(loss_weights)) ** beta
return loss_weights
def soft_updates(net, target_net, tau):
for param, target_param in zip(net.parameters(), target_net.parameters()):
target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
def stack_on_channel(x):
# expect (B, T, C, ...)
return torch.cat(torch.split(x, 1, dim=1), dim=2).squeeze(1)
def normalize_quaternion(quat):
return np.array(quat) / np.linalg.norm(quat, axis=-1, keepdims=True)
def correct_rotation_instability(disc, resolution):
# q1 = discrete_euler_to_quaternion(disc, resolution)
# q2 = discrete_euler_to_quaternion(quaternion_to_discrete_euler(q1, resolution), resolution)
#
# d2 = quaternion_to_discrete_euler(q2, resolution)
#
# # choose the smallest change
# if np.any(disc != d2):
# if np.sum(disc) < np.sum(d2):
# return disc
# else:
# return d2
return disc
def check_gimbal_lock(pred_rot_and_grip, gt_rot_and_grip, resolution):
pred_rot_and_grip_np = pred_rot_and_grip.detach().cpu().numpy()
gt_rot_and_grip_np = gt_rot_and_grip.detach().cpu().numpy()
pred_rot = discrete_euler_to_quaternion(pred_rot_and_grip_np[:, :3], resolution)
gt_rot = discrete_euler_to_quaternion(gt_rot_and_grip_np[:, :3], resolution)
gimbal_lock_matches = [
np.all(np.abs(pred_rot[i] - gt_rot[i]) < 1e-10)
and np.any(pred_rot_and_grip_np[i, :3] != gt_rot_and_grip_np[i, :3])
for i in range(pred_rot.shape[0])
]
return 0
def quaternion_to_discrete_euler(quaternion, resolution):
euler = Rotation.from_quat(quaternion).as_euler("xyz", degrees=True) + 180
assert np.min(euler) >= 0 and np.max(euler) <= 360
disc = np.around((euler / resolution)).astype(int)
disc[disc == int(360 / resolution)] = 0
return disc
def discrete_euler_to_quaternion(discrete_euler, resolution):
euluer = (discrete_euler * resolution) - 180
return Rotation.from_euler("xyz", euluer, degrees=True).as_quat()
def point_to_voxel_index(
point: np.ndarray, voxel_size: np.ndarray, coord_bounds: np.ndarray
):
bb_mins = np.array(coord_bounds[0:3])
bb_maxs = np.array(coord_bounds[3:])
dims_m_one = np.array([voxel_size] * 3) - 1
bb_ranges = bb_maxs - bb_mins
res = bb_ranges / (np.array([voxel_size] * 3) + 1e-12)
voxel_indicy = np.minimum(
np.floor((point - bb_mins) / (res + 1e-12)).astype(np.int32), dims_m_one
)
return voxel_indicy
def voxel_index_to_point(
voxel_index: torch.Tensor, voxel_size: int, coord_bounds: np.ndarray
):
res = (coord_bounds[:, 3:] - coord_bounds[:, :3]) / voxel_size
points = (voxel_index * res) + coord_bounds[:, :3]
return points
def point_to_pixel_index(
point: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray
):
point = np.array([point[0], point[1], point[2], 1])
world_to_cam = np.linalg.inv(extrinsics)
point_in_cam_frame = world_to_cam.dot(point)
px, py, pz = point_in_cam_frame[:3]
px = 2 * intrinsics[0, 2] - int(-intrinsics[0, 0] * (px / pz) + intrinsics[0, 2])
py = 2 * intrinsics[1, 2] - int(-intrinsics[1, 1] * (py / pz) + intrinsics[1, 2])
return px, py
def _compute_initial_camera_pose(scene):
# Adapted from:
# https://github.com/mmatl/pyrender/blob/master/pyrender/viewer.py#L1032
centroid = scene.centroid
scale = scene.scale
if scale == 0.0:
scale = DEFAULT_SCENE_SCALE
s2 = 1.0 / np.sqrt(2.0)
cp = np.eye(4)
cp[:3, :3] = np.array([[0.0, -s2, s2], [1.0, 0.0, 0.0], [0.0, s2, s2]])
hfov = np.pi / 6.0
dist = scale / (2.0 * np.tan(hfov))
cp[:3, 3] = dist * np.array([1.0, 0.0, 1.0]) + centroid
return cp
def _from_trimesh_scene(trimesh_scene, bg_color=None, ambient_light=None):
# convert trimesh geometries to pyrender geometries
geometries = {
name: pyrender.Mesh.from_trimesh(geom, smooth=False)
for name, geom in trimesh_scene.geometry.items()
}
# create the pyrender scene object
scene_pr = pyrender.Scene(bg_color=bg_color, ambient_light=ambient_light)
# add every node with geometry to the pyrender scene
for node in trimesh_scene.graph.nodes_geometry:
pose, geom_name = trimesh_scene.graph[node]
scene_pr.add(geometries[geom_name], pose=pose)
return scene_pr
def _create_bounding_box(scene, voxel_size, res):
l = voxel_size * res
T = np.eye(4)
w = 0.01
for trans in [[0, 0, l / 2], [0, l, l / 2], [l, l, l / 2], [l, 0, l / 2]]:
T[:3, 3] = np.array(trans) - voxel_size / 2
scene.add_geometry(
trimesh.creation.box([w, w, l], T, face_colors=[0, 0, 0, 255])
)
for trans in [[l / 2, 0, 0], [l / 2, 0, l], [l / 2, l, 0], [l / 2, l, l]]:
T[:3, 3] = np.array(trans) - voxel_size / 2
scene.add_geometry(
trimesh.creation.box([l, w, w], T, face_colors=[0, 0, 0, 255])
)
for trans in [[0, l / 2, 0], [0, l / 2, l], [l, l / 2, 0], [l, l / 2, l]]:
T[:3, 3] = np.array(trans) - voxel_size / 2
scene.add_geometry(
trimesh.creation.box([w, l, w], T, face_colors=[0, 0, 0, 255])
)
def create_voxel_scene(
voxel_grid: np.ndarray,
q_attention: np.ndarray = None,
highlight_coordinate: np.ndarray = None,
highlight_gt_coordinate: np.ndarray = None,
highlight_alpha: float = 1.0,
voxel_size: float = 0.1,
show_bb: bool = False,
alpha: float = 0.5,
):
_, d, h, w = voxel_grid.shape
v = voxel_grid.transpose((1, 2, 3, 0))
occupancy = v[:, :, :, -1] != 0
alpha = np.expand_dims(np.full_like(occupancy, alpha, dtype=np.float32), -1)
rgb = np.concatenate([(v[:, :, :, 3:6] + 1) / 2.0, alpha], axis=-1)
if q_attention is not None:
q = np.max(q_attention, 0)
q = q / np.max(q)
show_q = q > 0.75
occupancy = (show_q + occupancy).astype(bool)
q = np.expand_dims(q - 0.5, -1) # Max q can be is 0.9
q_rgb = np.concatenate(
[q, np.zeros_like(q), np.zeros_like(q), np.clip(q, 0, 1)], axis=-1
)
rgb = np.where(np.expand_dims(show_q, -1), q_rgb, rgb)
if highlight_coordinate is not None:
x, y, z = highlight_coordinate
occupancy[x, y, z] = True
rgb[x, y, z] = [1.0, 0.0, 0.0, highlight_alpha]
if highlight_gt_coordinate is not None:
x, y, z = highlight_gt_coordinate
occupancy[x, y, z] = True
rgb[x, y, z] = [0.0, 0.0, 1.0, highlight_alpha]
transform = trimesh.transformations.scale_and_translate(
scale=voxel_size, translate=(0.0, 0.0, 0.0)
)
trimesh_voxel_grid = trimesh.voxel.VoxelGrid(
encoding=occupancy, transform=transform
)
geometry = trimesh_voxel_grid.as_boxes(colors=rgb)
scene = trimesh.Scene()
scene.add_geometry(geometry)
if show_bb:
assert d == h == w
_create_bounding_box(scene, voxel_size, d)
return scene
def visualise_voxel(
voxel_grid: np.ndarray,
q_attention: np.ndarray = None,
highlight_coordinate: np.ndarray = None,
highlight_gt_coordinate: np.ndarray = None,
highlight_alpha: float = 1.0,
rotation_amount: float = 0.0,
show: bool = False,
voxel_size: float = 0.1,
offscreen_renderer: pyrender.OffscreenRenderer = None,
show_bb: bool = False,
alpha: float = 0.5,
):
scene = create_voxel_scene(
voxel_grid,
q_attention,
highlight_coordinate,
highlight_gt_coordinate,
highlight_alpha,
voxel_size,
show_bb,
alpha,
)
if show:
scene.show()
else:
r = offscreen_renderer or pyrender.OffscreenRenderer(
viewport_width=640, viewport_height=480, point_size=1.0
)
s = _from_trimesh_scene(
scene, ambient_light=[0.8, 0.8, 0.8], bg_color=[1.0, 1.0, 1.0]
)
cam = pyrender.PerspectiveCamera(
yfov=np.pi / 4.0, aspectRatio=r.viewport_width / r.viewport_height
)
p = _compute_initial_camera_pose(s)
t = Trackball(p, (r.viewport_width, r.viewport_height), s.scale, s.centroid)
t.rotate(rotation_amount, np.array([0.0, 0.0, 1.0]))
s.add(cam, pose=t.pose)
color, depth = r.render(s)
return color.copy()
def preprocess(img, dist="transporter"):
"""Pre-process input (subtract mean, divide by std)."""
transporter_color_mean = [0.18877631, 0.18877631, 0.18877631]
transporter_color_std = [0.07276466, 0.07276466, 0.07276466]
transporter_depth_mean = 0.00509261
transporter_depth_std = 0.00903967
franka_color_mean = [0.622291933, 0.628313992, 0.623031488]
franka_color_std = [0.168154213, 0.17626014, 0.184527364]
franka_depth_mean = 0.872146842
franka_depth_std = 0.195743116
clip_color_mean = [0.48145466, 0.4578275, 0.40821073]
clip_color_std = [0.26862954, 0.26130258, 0.27577711]
# choose distribution
if dist == "clip":
color_mean = clip_color_mean
color_std = clip_color_std
elif dist == "franka":
color_mean = franka_color_mean
color_std = franka_color_std
else:
color_mean = transporter_color_mean
color_std = transporter_color_std
if dist == "franka":
depth_mean = franka_depth_mean
depth_std = franka_depth_std
else:
depth_mean = transporter_depth_mean
depth_std = transporter_depth_std
# convert to pytorch tensor (if required)
if type(img) == torch.Tensor:
def cast_shape(stat, img):
tensor = torch.from_numpy(np.array(stat)).to(
device=img.device, dtype=img.dtype
)
tensor = tensor.unsqueeze(0).unsqueeze(-1).unsqueeze(-1)
tensor = tensor.repeat(img.shape[0], 1, img.shape[-2], img.shape[-1])
return tensor
color_mean = cast_shape(color_mean, img)
color_std = cast_shape(color_std, img)
depth_mean = cast_shape(depth_mean, img)
depth_std = cast_shape(depth_std, img)
# normalize
img = img.clone()
img[:, :3, :, :] = (img[:, :3, :, :] / 255 - color_mean) / color_std
img[:, 3:, :, :] = (img[:, 3:, :, :] - depth_mean) / depth_std
else:
# normalize
img[:, :, :3] = (img[:, :, :3] / 255 - color_mean) / color_std
img[:, :, 3:] = (img[:, :, 3:] - depth_mean) / depth_std
return img
def rand_dist(size, min=-1.0, max=1.0):
return (max - min) * torch.rand(size) + min
def rand_discrete(size, min=0, max=1):
if min == max:
return torch.zeros(size)
return torch.randint(min, max + 1, size)
def split_list(lst, n):
for i in range(0, len(lst), n):
yield lst[i : i + n]
def get_device(gpu):
if gpu is not None and gpu >= 0 and torch.cuda.is_available():
device = torch.device("cuda:%d" % gpu)
torch.backends.cudnn.enabled = torch.backends.cudnn.benchmark = True
else:
device = torch.device("cpu")
return device