# Vendored camera utilities (upstream commit 96da58e,
# "Add phantom project with submodules and dependencies", author xfu314).
"""
This module includes:
- Utility classes for modifying sim cameras
- Utility functions for performing common camera operations such as retrieving
camera matrices and transforming from world to camera frame or vice-versa.
"""
import json
import xml.etree.ElementTree as ET
import h5py
import numpy as np
import robosuite
import robosuite.utils.transform_utils as T
from robosuite.wrappers import DomainRandomizationWrapper, VisualizationWrapper
def get_camera_intrinsic_matrix(sim, camera_name, camera_height, camera_width):
    """
    Obtains camera intrinsic matrix.
    Args:
        sim (MjSim): simulator instance
        camera_name (str): name of camera
        camera_height (int): height of camera images in pixels
        camera_width (int): width of camera images in pixels
    Return:
        K (np.array): 3x3 camera matrix
    """
    fovy_deg = sim.model.cam_fovy[sim.model.camera_name2id(camera_name)]
    # focal length in pixels, derived from the vertical field of view:
    # f = (H / 2) / tan(fovy / 2)
    focal = 0.5 * camera_height / np.tan(np.radians(fovy_deg) / 2.0)
    intrinsics = np.array(
        [
            [focal, 0.0, camera_width / 2],
            [0.0, focal, camera_height / 2],
            [0.0, 0.0, 1.0],
        ]
    )
    return intrinsics
def get_camera_extrinsic_matrix(sim, camera_name):
    """
    Returns a 4x4 homogenous matrix corresponding to the camera pose in the
    world frame. MuJoCo has a weird convention for how it sets up the
    camera body axis, so we also apply a correction so that the x and y
    axis are along the camera view and the z axis points along the
    viewpoint.
    Normal camera convention: https://docs.opencv.org/2.4/modules/calib3d/doc/camera_calibration_and_3d_reconstruction.html
    Args:
        sim (MjSim): simulator instance
        camera_name (str): name of camera
    Return:
        R (np.array): 4x4 camera extrinsic matrix
    """
    cam_id = sim.model.camera_name2id(camera_name)
    pose = T.make_pose(sim.data.cam_xpos[cam_id], sim.data.cam_xmat[cam_id].reshape(3, 3))
    # IMPORTANT correction: negate the y and z axes of the MuJoCo camera body
    # frame so the resulting pose follows the usual camera-view convention.
    flip_yz = np.eye(4)
    flip_yz[1, 1] = -1.0
    flip_yz[2, 2] = -1.0
    return pose @ flip_yz
def get_camera_transform_matrix(sim, camera_name, camera_height, camera_width):
    """
    Camera transform matrix to project from world coordinates to pixel coordinates.
    Args:
        sim (MjSim): simulator instance
        camera_name (str): name of camera
        camera_height (int): height of camera images in pixels
        camera_width (int): width of camera images in pixels
    Return:
        K (np.array): 4x4 camera matrix to project from world coordinates to pixel coordinates
    """
    # world -> camera frame
    world_to_camera = T.pose_inv(get_camera_extrinsic_matrix(sim=sim, camera_name=camera_name))
    # camera frame -> image plane (embed the 3x3 intrinsics in a 4x4 matrix)
    intrinsics = get_camera_intrinsic_matrix(
        sim=sim, camera_name=camera_name, camera_height=camera_height, camera_width=camera_width
    )
    projection = np.eye(4)
    projection[:3, :3] = intrinsics
    return projection @ world_to_camera
def get_camera_segmentation(sim, camera_name, camera_height, camera_width):
    """
    Obtains camera segmentation matrix.
    Args:
        sim (MjSim): simulator instance
        camera_name (str): name of camera
        camera_height (int): height of camera images in pixels
        camera_width (int): width of camera images in pixels
    Return:
        im (np.array): 2-channel segmented image where the first contains the
            geom types and the second contains the geom IDs
    """
    seg = sim.render(camera_name=camera_name, height=camera_height, width=camera_width, segmentation=True)
    # flip the rows so the image is right side up (the raw render comes back flipped)
    return seg[::-1]
def get_real_depth_map(sim, depth_map):
    """
    By default, MuJoCo will return a depth map that is normalized in [0, 1]. This
    helper function converts the map so that the entries correspond to actual distances.
    (see https://github.com/deepmind/dm_control/blob/master/dm_control/mujoco/engine.py#L742)
    Args:
        sim (MjSim): simulator instance
        depth_map (np.array): depth map with values normalized in [0, 1] (default depth map
            returned by MuJoCo)
    Return:
        depth_map (np.array): depth map that corresponds to actual distances
    """
    # Inputs must already be normalized to [0, 1]
    assert np.all(depth_map >= 0.0) and np.all(depth_map <= 1.0)
    # near / far clipping planes are stored as fractions of the model extent
    extent = sim.model.stat.extent
    near_dist = extent * sim.model.vis.map.znear
    far_dist = extent * sim.model.vis.map.zfar
    # invert the depth-buffer mapping to recover metric distances
    return near_dist / (1.0 - depth_map * (1.0 - near_dist / far_dist))
def project_points_from_world_to_camera(points, world_to_camera_transform, camera_height, camera_width):
    """
    Helper function to project a batch of points in the world frame
    into camera pixels using the world to camera transformation.
    Args:
        points (np.array): 3D points in world frame to project onto camera pixel locations. Should
            be shape [..., 3].
        world_to_camera_transform (np.array): 4x4 Tensor to go from robot coordinates to pixel
            coordinates.
        camera_height (int): height of the camera image
        camera_width (int): width of the camera image
    Return:
        pixels (np.array): projected pixel indices of shape [..., 2]
    """
    assert points.shape[-1] == 3  # last dimension must be 3D
    assert world_to_camera_transform.ndim == 2
    assert world_to_camera_transform.shape == (4, 4)

    # homogeneous coordinates: (px, py, pz) -> (px, py, pz, 1)
    homog = np.concatenate([points, np.ones(points.shape[:-1] + (1,))], axis=-1)

    # broadcast the single 4x4 transform over all leading batch dimensions
    transform = world_to_camera_transform.reshape((1,) * (homog.ndim - 1) + (4, 4))
    projected = np.matmul(transform, homog[..., None])[..., 0]  # shape [..., 4]

    # perspective divide (x, y, z) -> (x / z, y / z), then round to pixel indices
    projected = projected / projected[..., 2:3]
    pix_xy = projected[..., :2].round().astype(int)  # shape [..., 2]

    # swap (x, y) -> (row, col) so indices correspond to (height, width),
    # clipping anything that falls outside the camera image
    rows = pix_xy[..., 1:2].clip(0, camera_height - 1)
    cols = pix_xy[..., 0:1].clip(0, camera_width - 1)
    return np.concatenate((rows, cols), axis=-1)
def transform_from_pixels_to_world(pixels, depth_map, camera_to_world_transform):
    """
    Helper function to take a batch of pixel locations and the corresponding depth image
    and transform these points from the camera frame to the world frame.
    Args:
        pixels (np.array): pixel coordinates of shape [..., 2]
        depth_map (np.array): depth images of shape [..., H, W, 1]
        camera_to_world_transform (np.array): 4x4 Tensor to go from pixel coordinates to world
            coordinates.
    Return:
        points (np.array): 3D points in robot frame of shape [..., 3]
    """
    # make sure leading dimensions are consistent
    pixels_leading_shape = pixels.shape[:-1]
    depth_map_leading_shape = depth_map.shape[:-3]
    assert depth_map_leading_shape == pixels_leading_shape
    # sample from the depth map using the pixel locations with bilinear sampling
    pixels = pixels.astype(float)
    # NOTE(review): for a depth map of shape [..., H, W, 1] the last two dims are
    # (W, 1), so im_h / im_w here actually pick up (W, 1). The reshape below happens
    # to restore an (H, W, 1, 1) layout in the unbatched case, but batched inputs
    # look suspect — TODO confirm intended shapes against upstream.
    im_h, im_w = depth_map.shape[-2:]
    depth_map_reshaped = depth_map.reshape(-1, im_h, im_w, 1)
    # pixels are (row, col) so row -> y and col -> x for the sampler
    z = bilinear_interpolate(im=depth_map_reshaped, x=pixels[..., 1:2], y=pixels[..., 0:1])
    z = z.reshape(*depth_map_leading_shape, 1)  # shape [..., 1]
    # form 4D homogenous camera vector to transform - [x * z, y * z, z, 1]
    # (note that we need to swap the first 2 dimensions of pixels to go from pixel indices
    # to camera coordinates)
    cam_pts = [pixels[..., 1:2] * z, pixels[..., 0:1] * z, z, np.ones_like(z)]
    cam_pts = np.concatenate(cam_pts, axis=-1)  # shape [..., 4]
    # batch matrix multiplication of 4 x 4 matrix and 4 x 1 vectors to do camera to robot frame transform
    mat_reshape = [1] * len(cam_pts.shape[:-1]) + [4, 4]
    cam_trans = camera_to_world_transform.reshape(mat_reshape)  # shape [..., 4, 4]
    points = np.matmul(cam_trans, cam_pts[..., None])[..., 0]  # shape [..., 4]
    # drop the homogeneous coordinate
    return points[..., :3]
def bilinear_interpolate(im, x, y):
    """
    Bilinear sampling for pixel coordinates x and y from source image im.

    Adapted from https://stackoverflow.com/questions/12729228/simple-efficient-bilinear-interpolation-of-images-in-numpy-and-python

    Args:
        im (np.array): source image indexed as im[y, x]; any trailing
            dimensions (e.g. channels) are carried through unchanged
        x (float or np.array): x (column) sample coordinates
        y (float or np.array): y (row) sample coordinates
    Return:
        np.array: interpolated values with the broadcast shape of x and y

    Note:
        Unlike the original snippet, sample coordinates are clamped into the
        image bounds up front. The original returned 0 when sampling exactly
        at the right / bottom edge (e.g. x == W - 1), because both integer
        corners clipped onto the same pixel and all four weights vanished.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)

    # clamp the query points into the valid coordinate range
    x = np.clip(x, 0, im.shape[1] - 1)
    y = np.clip(y, 0, im.shape[0] - 1)

    # integer corners of the cell containing each sample; keep the low corner
    # at most one pixel away from the far edge so the high corner stays in
    # bounds and the four weights sum to 1 (for images >= 2 pixels per axis)
    x0 = np.clip(np.floor(x).astype(int), 0, max(im.shape[1] - 2, 0))
    y0 = np.clip(np.floor(y).astype(int), 0, max(im.shape[0] - 2, 0))
    x1 = np.clip(x0 + 1, 0, im.shape[1] - 1)
    y1 = np.clip(y0 + 1, 0, im.shape[0] - 1)

    # values at the four surrounding pixels
    Ia = im[y0, x0]
    Ib = im[y1, x0]
    Ic = im[y0, x1]
    Id = im[y1, x1]

    # bilinear weights: each corner weighted by the opposite sub-pixel area
    wa = (x1 - x) * (y1 - y)
    wb = (x1 - x) * (y - y0)
    wc = (x - x0) * (y1 - y)
    wd = (x - x0) * (y - y0)
    return wa * Ia + wb * Ib + wc * Ic + wd * Id
class CameraMover:
    """
    A class for manipulating a camera.
    WARNING: This class will initially RE-INITIALIZE the environment.
    Args:
        env (MujocoEnv): Mujoco environment to modify camera
        camera (str): Which camera to mobilize during playback, e.g.: frontview, agentview, etc.
        init_camera_pos (None or 3-array): If specified, should be the (x,y,z) global cartesian pos to
            initialize camera to
        init_camera_quat (None or 4-array): If specified, should be the (x,y,z,w) global quaternion orientation to
            initialize camera to
    """

    def __init__(
        self,
        env,
        camera="frontview",
        init_camera_pos=None,
        init_camera_quat=None,
    ):
        # Store relevant values and initialize other values
        self.env = env
        self.camera = camera
        self.mover_body_name = f"{self.camera}_cameramover"

        # Snapshot the current sim state so it can be restored after the reset below
        state = self.env.sim.get_state().flatten()

        # Grab environment xml and attach the target camera to a movable mocap body
        xml = env.sim.model.get_xml()
        xml = self.modify_xml_for_camera_movement(xml=xml, camera_name=self.camera)

        # Reset the environment with the modified model and restore the saved state
        self.env.reset_from_xml_string(xml)
        self.env.sim.reset()
        self.env.sim.set_state_from_flattened(state)
        self.env.sim.forward()

        # Set initial camera pose
        self.set_camera_pose(pos=init_camera_pos, quat=init_camera_quat)

    def set_camera_pose(self, pos=None, quat=None):
        """
        Sets the camera pose, which optionally includes position and / or quaternion
        Args:
            pos (None or 3-array): If specified, should be the (x,y,z) global cartesian pos to set camera to
            quat (None or 4-array): If specified, should be the (x,y,z,w) global quaternion orientation to set camera to
        """
        if pos is not None:
            self.env.sim.data.set_mocap_pos(self.mover_body_name, pos)
        if quat is not None:
            # mocap quats are stored in (w,x,y,z) order internally
            self.env.sim.data.set_mocap_quat(self.mover_body_name, T.convert_quat(quat, to="wxyz"))

        # Make sure changes propagate in sim
        self.env.sim.forward()

    def get_camera_pose(self):
        """
        Grab the current camera pose, which optionally includes position and / or quaternion
        Returns:
            2-tuple:
                - 3-array: (x,y,z) camera global cartesian pos
                - 4-array: (x,y,z,w) camera global quaternion orientation
        """
        # Grab values from sim, converting the quat back to (x,y,z,w) convention
        pos = self.env.sim.data.get_mocap_pos(self.mover_body_name)
        quat = T.convert_quat(self.env.sim.data.get_mocap_quat(self.mover_body_name), to="xyzw")
        return pos, quat

    def modify_xml_for_camera_movement(self, xml, camera_name):
        """
        Cameras in mujoco are 'fixed', so they can't be moved by default.
        Although it's possible to hack position movement, rotation movement
        does not work. An alternative is to attach a camera to a mocap body,
        and move the mocap body.
        This function modifies the camera with name @camera_name in the xml
        by attaching it to a mocap body that can move around freely. In this
        way, we can move the camera by moving the mocap body.
        See http://www.mujoco.org/forum/index.php?threads/move-camera.2201/ for
        further details.
        Args:
            xml (str): Mujoco sim XML file as a string
            camera_name (str): Name of camera to tune
        Returns:
            str: Modified XML string
        Raises:
            ValueError: If no camera named @camera_name exists under the worldbody
        """
        tree = ET.fromstring(xml)
        wb = tree.find("worldbody")

        # find the correct camera anywhere under the worldbody (it may be nested
        # inside another body rather than being a direct child)
        camera_elem = None
        for candidate in wb.findall(".//camera"):
            if candidate.get("name") == camera_name:
                camera_elem = candidate
                break
        if camera_elem is None:
            raise ValueError(f"No camera named '{camera_name}' found in the model XML!")

        # add mocap body at the camera's current pose; fall back to identity pose
        # when the camera element does not specify pos / quat attributes
        mocap = ET.SubElement(wb, "body")
        mocap.set("name", self.mover_body_name)
        mocap.set("mocap", "true")
        mocap.set("pos", camera_elem.get("pos", "0 0 0"))
        mocap.set("quat", camera_elem.get("quat", "1 0 0 0"))

        # re-create the camera as a child of the mocap body so it moves with it
        new_camera = ET.SubElement(mocap, "camera")
        new_camera.set("mode", "fixed")
        new_camera.set("name", camera_elem.get("name"))
        new_camera.set("pos", "0 0 0")

        # remove old camera element; ElementTree has no parent pointers, so build
        # a child -> parent map to remove cameras that are not direct children
        parent_map = {child: parent for parent in tree.iter() for child in parent}
        parent_map[camera_elem].remove(camera_elem)

        return ET.tostring(tree, encoding="utf8").decode("utf8")

    def rotate_camera(self, point, axis, angle):
        """
        Rotate the camera view about a direction (in the camera frame).
        Args:
            point (None or 3-array): (x,y,z) cartesian coordinates about which to rotate camera in camera frame. If None,
                assumes the point is the current location of the camera
            axis (3-array): (ax,ay,az) axis about which to rotate camera in camera frame
            angle (float): how much to rotate about that direction (in degrees)
        Returns:
            2-tuple:
                pos: (x,y,z) updated camera position
                quat: (x,y,z,w) updated camera quaternion orientation
        """
        # current camera rotation + pos
        camera_pos = np.array(self.env.sim.data.get_mocap_pos(self.mover_body_name))
        camera_rot = T.quat2mat(T.convert_quat(self.env.sim.data.get_mocap_quat(self.mover_body_name), to="xyzw"))

        # rotate by angle and direction to get new camera rotation
        rad = np.pi * angle / 180.0
        R = T.rotation_matrix(rad, axis, point=point)
        # bottom row is left as zeros; only the top 3 rows are consumed below
        camera_pose = np.zeros((4, 4))
        camera_pose[:3, :3] = camera_rot
        camera_pose[:3, 3] = camera_pos
        camera_pose = camera_pose @ R

        # Update camera pose
        pos, quat = camera_pose[:3, 3], T.mat2quat(camera_pose[:3, :3])
        self.set_camera_pose(pos=pos, quat=quat)
        return pos, quat

    def move_camera(self, direction, scale):
        """
        Move the camera view along a direction (in the camera frame).
        Args:
            direction (3-array): direction vector for where to move camera in camera frame
            scale (float): how much to move along that direction
        Returns:
            2-tuple:
                pos: (x,y,z) updated camera position
                quat: (w,x,y,z) camera quaternion orientation, as stored by the sim
        """
        # current camera rotation + pos
        camera_pos = np.array(self.env.sim.data.get_mocap_pos(self.mover_body_name))
        camera_quat = self.env.sim.data.get_mocap_quat(self.mover_body_name)
        camera_rot = T.quat2mat(T.convert_quat(camera_quat, to="xyzw"))

        # move along camera frame axis and set new position
        camera_pos += scale * camera_rot.dot(direction)
        self.set_camera_pose(pos=camera_pos)
        return camera_pos, camera_quat
class DemoPlaybackCameraMover(CameraMover):
    """
    A class for playing back demonstrations and recording the resulting frames with the flexibility of a mobile camera
    that can be set manually or panned automatically frame-by-frame
    Note: domain randomization is also supported for playback!
    Args:
        demo (str): absolute fpath to .hdf5 demo
        env_config (None or dict): (optional) values to override inferred environment information from demonstration.
            (e.g.: camera h / w, depths, segmentations, etc...)
            Any value not specified will be inferred from the extracted demonstration metadata
            Note that there are some specific arguments that MUST be set a certain way, if any of these values
            are specified with @env_config, an error will be raised
        replay_from_actions (bool): If True, will replay demonstration's actions. Otherwise, replays will be hardcoded
            from the demonstration states
        visualize_sites (bool): If True, will visualize sites during playback. Note that this CANNOT be paired
            simultaneously with camera segmentations
        camera (str): Which camera to mobilize during playback, e.g.: frontview, agentview, etc.
        init_camera_pos (None or 3-array): If specified, should be the (x,y,z) global cartesian pos to
            initialize camera to
        init_camera_quat (None or 4-array): If specified, should be the (x,y,z,w) global quaternion orientation to
            initialize camera to
        use_dr (bool): If True, will use domain randomization during playback
        dr_args (None or dict): If specified, will set the domain randomization wrapper arguments if using dr
    """

    def __init__(
        self,
        demo,
        env_config=None,
        replay_from_actions=False,
        visualize_sites=False,
        camera="frontview",
        init_camera_pos=None,
        init_camera_quat=None,
        use_dr=False,
        dr_args=None,
    ):
        # Store relevant values and initialize other values
        self.camera_id = None
        self.replay_from_actions = replay_from_actions
        self.states = None
        self.actions = None
        self.step = None
        self.n_steps = None
        self.current_ep = None
        self.started = False

        # Load the demo
        self.f = h5py.File(demo, "r")

        # Extract relevant info
        env_info = json.loads(self.f["data"].attrs["env_info"])

        # Construct default env arguments
        default_args = {
            "has_renderer": False,
            "has_offscreen_renderer": True,
            "ignore_done": True,
            "use_camera_obs": True,
            "reward_shaping": True,
            "hard_reset": False,
            "camera_names": camera,
        }

        # If custom env_config is specified, make sure that there's no overlap with default args and merge with config
        if env_config is not None:
            for k in env_config.keys():
                assert k not in default_args, f"Key {k} cannot be specified in env_config!"
            env_info.update(env_config)

        # Merge in default args
        env_info.update(default_args)

        # Create env
        env = robosuite.make(**env_info)

        # Optionally wrap with visualization wrapper
        if visualize_sites:
            # BUGFIX: wrap the local env; self.env does not exist until super().__init__() runs
            env = VisualizationWrapper(env=env)

        # Optionally use domain randomization if specified
        self.use_dr = use_dr
        if self.use_dr:
            default_dr_args = {
                "seed": 1,
                "randomize_camera": False,
                "randomize_every_n_steps": 10,
            }
            # BUGFIX: dr_args defaults to None; only merge when it is actually provided
            if dr_args is not None:
                default_dr_args.update(dr_args)
            # BUGFIX: wrap the local env (self.env does not exist yet, see above)
            env = DomainRandomizationWrapper(
                env=env,
                **default_dr_args,
            )

        # list of all demonstrations episodes
        self.demos = list(self.f["data"].keys())

        # Run super init
        super().__init__(
            env=env,
            camera=camera,
            init_camera_pos=init_camera_pos,
            init_camera_quat=init_camera_quat,
        )

        # Load episode 0 by default
        self.load_episode_xml(demo_num=0)

    def load_episode_xml(self, demo_num):
        """
        Loads demo episode with specified @demo_num into the simulator.
        Args:
            demo_num (int): Demonstration number to load
        """
        # Grab raw xml file
        ep = self.demos[demo_num]
        model_xml = self.f[f"data/{ep}"].attrs["model_file"]

        # Reset environment with the episode's model, keeping the camera mocap rig
        self.env.reset()
        xml = self.env.edit_model_xml(model_xml)
        xml = self.modify_xml_for_camera_movement(xml, camera_name=self.camera)
        self.env.reset_from_xml_string(xml)
        self.env.sim.reset()

        # Update camera info
        self.camera_id = self.env.sim.model.camera_name2id(self.camera)

        # Load states and actions
        # BUGFIX: Dataset.value was removed in h5py 3.0; [()] reads the full array
        self.states = self.f[f"data/{ep}/states"][()]
        self.actions = np.array(self.f[f"data/{ep}/actions"][()])

        # Set initial state
        self.env.sim.set_state_from_flattened(self.states[0])

        # Reset step count and set current episode number
        self.step = 0
        self.n_steps = len(self.actions)
        self.current_ep = demo_num

        # Notify user of loaded episode
        print(f"Loaded episode {demo_num}.")

    def grab_next_frame(self):
        """
        Grabs the next frame in the demo sequence by stepping the simulation and returning the resulting value(s)
        Returns:
            dict: Keyword-mapped np.arrays from the demonstration sequence, corresponding to all image modalities used
                in the playback environment (e.g.: "image", "depth", "segmentation_instance")
        """
        # Make sure the episode isn't completed yet, if so, we load the next episode
        # NOTE(review): if the current episode is the last one, this will raise an
        # IndexError inside load_episode_xml — confirm callers stop before the end
        if self.step == self.n_steps:
            self.load_episode_xml(demo_num=self.current_ep + 1)

        # Step the environment and grab obs
        if self.replay_from_actions:
            obs, _, _, _ = self.env.step(self.actions[self.step])
        else:  # replay from states
            self.env.sim.set_state_from_flattened(self.states[self.step + 1])
            if self.use_dr:
                self.env.step_randomization()
            self.env.sim.forward()
            obs = self.env._get_observation()

        # Increment the step counter
        self.step += 1

        # Return all relevant frames, keyed by modality (camera-name prefix stripped)
        return {k.split(f"{self.camera}_")[-1]: obs[k] for k in obs if self.camera in k}

    def grab_episode_frames(self, demo_num, pan_point=(0, 0, 0.8), pan_axis=(0, 0, 1), pan_rate=0.01):
        """
        Playback entire episode @demo_num, while optionally rotating the camera about point @pan_point and
        axis @pan_axis if @pan_rate > 0
        Args:
            demo_num (int): Demonstration episode number to load for playback
            pan_point (3-array): (x,y,z) cartesian coordinates about which to rotate camera in camera frame
            pan_axis (3-array): (ax,ay,az) axis about which to rotate camera in camera frame
            pan_rate (float): how quickly to pan camera if not 0
        Returns:
            dict: Keyword-mapped stacked np.arrays from the demonstration sequence, corresponding to all image
                modalities used in the playback environment (e.g.: "image", "depth", "segmentation_instance")
        """
        # First, load env
        self.load_episode_xml(demo_num=demo_num)

        # Initialize dict to return, one frame list per image modality
        obs = self.env._get_observation()
        frames_dict = {k.split(f"{self.camera}_")[-1]: [] for k in obs if self.camera in k}

        # Continue to loop playback steps while there are still frames left in the episode
        while self.step < self.n_steps:
            # Take playback step and add frames
            for k, frame in self.grab_next_frame().items():
                frames_dict[k].append(frame)

            # Update camera pose
            self.rotate_camera(point=pan_point, axis=pan_axis, angle=pan_rate)

        # Stack all frames and return
        return {k: np.stack(frames) for k, frames in frames_dict.items()}