lsnu's picture
Add files using upload-large-folder tool
150d02a verified
raw
history blame
48.2 kB
import argparse
import json
import math
import os
import pickle
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
from natsort import natsorted
from PIL import Image
from pyrep.const import ConfigurationPathAlgorithms as Algos
from pyrep.errors import ConfigurationPathError
from pyrep.objects.joint import Joint
from pyrep.objects.shape import Shape
from rlbench.action_modes.action_mode import BimanualJointPositionActionMode
from rlbench.action_modes.gripper_action_modes import BimanualDiscrete
from rlbench.backend.const import DEPTH_SCALE
from rlbench.backend.utils import image_to_float_array, rgb_handles_to_mask
from rlbench.bimanual_tasks.bimanual_take_tray_out_of_oven import (
BimanualTakeTrayOutOfOven,
)
from rlbench.demo import Demo
from rlbench.environment import Environment
from rlbench.observation_config import CameraConfig, ObservationConfig
from sklearn.metrics import (
average_precision_score,
f1_score,
roc_auc_score,
)
FULL_CAMERA_SET = [
"front",
"overhead",
"wrist_right",
"wrist_left",
"over_shoulder_left",
"over_shoulder_right",
]
THREE_VIEW_SET = ["front", "wrist_right", "wrist_left"]
DISPLAY = ":99"
DEMO_DT = 0.05
DEFAULT_IMAGE_SIZE = (128, 128)
DEFAULT_PATH_SCALE = 0.75
DEFAULT_VISIBILITY_TAU = 0.35
DEFAULT_PEXT_TAU = 0.45
DEFAULT_DOOR_SPEED_TAU = 0.08
DEFAULT_PLAN_TRIALS = 48
DEFAULT_PLAN_MAX_CONFIGS = 4
DEFAULT_PLAN_MAX_TIME_MS = 10
DEFAULT_PLAN_TRIALS_PER_GOAL = 4
@dataclass
class SimulatorSnapshot:
task_state: Tuple[bytes, int]
right_arm_tree: bytes
right_gripper_tree: bytes
left_arm_tree: bytes
left_gripper_tree: bytes
right_grasped: Tuple[str, ...]
left_grasped: Tuple[str, ...]
@dataclass
class ReplayState:
frame_index: int
tray_pose: np.ndarray
door_angle: float
right_gripper_pose: np.ndarray
left_gripper_pose: np.ndarray
right_gripper_open: float
left_gripper_open: float
snapshot: Optional[SimulatorSnapshot] = None
@dataclass
class MotionTemplates:
pregrasp_rel_pose: np.ndarray
grasp_rel_pose: np.ndarray
retreat_rel_poses: List[np.ndarray]
grasp_local_center: np.ndarray
grasp_region_extents: np.ndarray
hold_open_angle: float
open_more_delta: float
reference_tray_height: float
def to_json(self) -> Dict[str, object]:
return {
"pregrasp_rel_pose": self.pregrasp_rel_pose.tolist(),
"grasp_rel_pose": self.grasp_rel_pose.tolist(),
"retreat_rel_poses": [pose.tolist() for pose in self.retreat_rel_poses],
"grasp_local_center": self.grasp_local_center.tolist(),
"grasp_region_extents": self.grasp_region_extents.tolist(),
"hold_open_angle": float(self.hold_open_angle),
"open_more_delta": float(self.open_more_delta),
"reference_tray_height": float(self.reference_tray_height),
}
@dataclass
class EpisodeArtifacts:
episode_name: str
dense: pd.DataFrame
keyframes: pd.DataFrame
metrics: Dict[str, object]
template_frames: Dict[str, int]
def _configure_runtime() -> None:
os.environ["DISPLAY"] = DISPLAY
os.environ["COPPELIASIM_ROOT"] = "/workspace/coppelia_sim"
ld_library_path = os.environ.get("LD_LIBRARY_PATH", "")
if "/workspace/coppelia_sim" not in ld_library_path:
os.environ["LD_LIBRARY_PATH"] = (
f"{ld_library_path}:/workspace/coppelia_sim"
if ld_library_path
else "/workspace/coppelia_sim"
)
os.environ["QT_QPA_PLATFORM_PLUGIN_PATH"] = "/workspace/coppelia_sim"
os.environ.setdefault("XDG_RUNTIME_DIR", "/tmp/runtime-root")
def _minimal_camera_config() -> Dict[str, CameraConfig]:
return {
"front": CameraConfig(
rgb=False,
depth=False,
point_cloud=False,
mask=False,
image_size=DEFAULT_IMAGE_SIZE,
)
}
def _make_observation_config() -> ObservationConfig:
return ObservationConfig(
camera_configs=_minimal_camera_config(),
joint_velocities=True,
joint_positions=True,
joint_forces=False,
gripper_open=True,
gripper_pose=True,
gripper_matrix=False,
gripper_joint_positions=True,
gripper_touch_forces=False,
task_low_dim_state=False,
robot_name="bimanual",
)
def _launch_replay_env() -> Environment:
_configure_runtime()
env = Environment(
action_mode=BimanualJointPositionActionMode(),
obs_config=_make_observation_config(),
headless=True,
robot_setup="dual_panda",
)
env.launch()
return env
def _load_demo(episode_dir: Path) -> Demo:
with episode_dir.joinpath("low_dim_obs.pkl").open("rb") as handle:
return pickle.load(handle)
def _load_descriptions(episode_dir: Path) -> List[str]:
with episode_dir.joinpath("variation_descriptions.pkl").open("rb") as handle:
return pickle.load(handle)
def _episode_dirs(dataset_root: Path) -> List[Path]:
episodes_dir = dataset_root.joinpath("all_variations", "episodes")
return [
episodes_dir.joinpath(name)
for name in natsorted(os.listdir(episodes_dir))
if episodes_dir.joinpath(name, "low_dim_obs.pkl").exists()
]
def _camera_file(episode_dir: Path, camera_name: str, kind: str, frame_index: int) -> Path:
if kind == "rgb":
return episode_dir.joinpath(f"{camera_name}_rgb", f"rgb_{frame_index:04d}.png")
if kind == "depth":
return episode_dir.joinpath(
f"{camera_name}_depth", f"depth_{frame_index:04d}.png"
)
if kind == "mask":
return episode_dir.joinpath(f"{camera_name}_mask", f"mask_{frame_index:04d}.png")
raise ValueError(f"unknown kind: {kind}")
def _load_depth_meters(episode_dir: Path, demo: Demo, frame_index: int, camera_name: str) -> np.ndarray:
image = Image.open(_camera_file(episode_dir, camera_name, "depth", frame_index))
depth = image_to_float_array(image, DEPTH_SCALE)
near = demo[frame_index].misc[f"{camera_name}_camera_near"]
far = demo[frame_index].misc[f"{camera_name}_camera_far"]
return near + depth * (far - near)
def _load_mask(episode_dir: Path, frame_index: int, camera_name: str) -> np.ndarray:
image = np.array(Image.open(_camera_file(episode_dir, camera_name, "mask", frame_index)))
return rgb_handles_to_mask(image)
def _capture_snapshot(task) -> SimulatorSnapshot:
robot = task._scene.robot
return SimulatorSnapshot(
task_state=task._task.get_state(),
right_arm_tree=robot.right_arm.get_configuration_tree(),
right_gripper_tree=robot.right_gripper.get_configuration_tree(),
left_arm_tree=robot.left_arm.get_configuration_tree(),
left_gripper_tree=robot.left_gripper.get_configuration_tree(),
right_grasped=tuple(obj.get_name() for obj in robot.right_gripper.get_grasped_objects()),
left_grasped=tuple(obj.get_name() for obj in robot.left_gripper.get_grasped_objects()),
)
def _restore_snapshot(task, snapshot: SimulatorSnapshot) -> None:
robot = task._scene.robot
robot.release_gripper()
task._pyrep.set_configuration_tree(snapshot.right_arm_tree)
task._pyrep.set_configuration_tree(snapshot.right_gripper_tree)
task._pyrep.set_configuration_tree(snapshot.left_arm_tree)
task._pyrep.set_configuration_tree(snapshot.left_gripper_tree)
task._pyrep.set_configuration_tree(snapshot.task_state[0])
for name in snapshot.right_grasped:
robot.right_gripper.grasp(Shape(name))
for name in snapshot.left_grasped:
robot.left_gripper.grasp(Shape(name))
task._pyrep.step()
def _build_joint_action(target_obs) -> np.ndarray:
def _joint_vector(value, fallback) -> np.ndarray:
array = np.asarray(fallback if value is None else value, dtype=np.float64)
if array.ndim != 1 or array.shape[0] != 7:
array = np.asarray(fallback, dtype=np.float64)
if array.ndim != 1 or array.shape[0] != 7:
raise ValueError(f"invalid joint vector shape: {array.shape}")
return array
right = _joint_vector(
target_obs.misc.get("right_executed_demo_joint_position_action"),
target_obs.right.joint_positions,
)
left = _joint_vector(
target_obs.misc.get("left_executed_demo_joint_position_action"),
target_obs.left.joint_positions,
)
return np.concatenate(
[
right,
np.array([target_obs.right.gripper_open], dtype=np.float64),
left,
np.array([target_obs.left.gripper_open], dtype=np.float64),
]
)
class ReplayCache:
def __init__(self, task, demo: Demo, checkpoint_stride: int = 16):
self.task = task
self.demo = demo
self.checkpoint_stride = checkpoint_stride
self.current_index = 0
self.current_obs = None
self.checkpoints: Dict[int, SimulatorSnapshot] = {}
self.discrete_gripper = BimanualDiscrete()
def reset(self) -> None:
_, self.current_obs = self.task.reset_to_demo(self.demo)
self.current_index = 0
self.checkpoints = {0: _capture_snapshot(self.task)}
def step_to(self, target_index: int):
if target_index < self.current_index:
checkpoint_index = max(i for i in self.checkpoints if i <= target_index)
_restore_snapshot(self.task, self.checkpoints[checkpoint_index])
self.current_index = checkpoint_index
self.current_obs = self._observation_from_scene()
while self.current_index < target_index:
next_index = self.current_index + 1
self.task._action_mode.action(
self.task._scene, _build_joint_action(self.demo[next_index])
)
self._apply_gripper_replay(self.demo[next_index])
self.current_obs = self._observation_from_scene()
self.current_index = next_index
if self.current_index % self.checkpoint_stride == 0:
self.checkpoints[self.current_index] = _capture_snapshot(self.task)
return self.current_obs
def current_state(self) -> ReplayState:
return ReplayState(
frame_index=self.current_index,
tray_pose=Shape("tray").get_pose(),
door_angle=Joint("oven_door_joint").get_joint_position(),
right_gripper_pose=self.current_obs.right.gripper_pose.copy(),
left_gripper_pose=self.current_obs.left.gripper_pose.copy(),
right_gripper_open=float(self.current_obs.right.gripper_open),
left_gripper_open=float(self.current_obs.left.gripper_open),
snapshot=None,
)
def snapshot(self) -> SimulatorSnapshot:
return _capture_snapshot(self.task)
def restore(self, snapshot: SimulatorSnapshot) -> None:
_restore_snapshot(self.task, snapshot)
self.current_obs = self._observation_from_scene()
def restore_to_index(self, snapshot: SimulatorSnapshot, frame_index: int) -> None:
self.restore(snapshot)
self.current_index = frame_index
def _observation_from_scene(self):
return self.task._scene.get_observation()
def _apply_gripper_replay(self, target_obs) -> None:
desired = np.array(
[target_obs.right.gripper_open, target_obs.left.gripper_open],
dtype=np.float64,
)
current = np.array(
[
float(all(x > 0.9 for x in self.task._scene.robot.right_gripper.get_open_amount())),
float(all(x > 0.9 for x in self.task._scene.robot.left_gripper.get_open_amount())),
],
dtype=np.float64,
)
if not np.allclose(current, desired):
self.discrete_gripper.action(self.task._scene, desired)
self.current_obs = self._observation_from_scene()
def _quat_to_matrix(quat: Sequence[float]) -> np.ndarray:
x, y, z, w = quat
xx, yy, zz = x * x, y * y, z * z
xy, xz, yz = x * y, x * z, y * z
wx, wy, wz = w * x, w * y, w * z
return np.array(
[
[1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)],
[2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)],
[2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)],
],
dtype=np.float64,
)
def _matrix_to_quat(matrix: np.ndarray) -> np.ndarray:
trace = np.trace(matrix)
if trace > 0.0:
s = math.sqrt(trace + 1.0) * 2.0
w = 0.25 * s
x = (matrix[2, 1] - matrix[1, 2]) / s
y = (matrix[0, 2] - matrix[2, 0]) / s
z = (matrix[1, 0] - matrix[0, 1]) / s
elif matrix[0, 0] > matrix[1, 1] and matrix[0, 0] > matrix[2, 2]:
s = math.sqrt(1.0 + matrix[0, 0] - matrix[1, 1] - matrix[2, 2]) * 2.0
w = (matrix[2, 1] - matrix[1, 2]) / s
x = 0.25 * s
y = (matrix[0, 1] + matrix[1, 0]) / s
z = (matrix[0, 2] + matrix[2, 0]) / s
elif matrix[1, 1] > matrix[2, 2]:
s = math.sqrt(1.0 + matrix[1, 1] - matrix[0, 0] - matrix[2, 2]) * 2.0
w = (matrix[0, 2] - matrix[2, 0]) / s
x = (matrix[0, 1] + matrix[1, 0]) / s
y = 0.25 * s
z = (matrix[1, 2] + matrix[2, 1]) / s
else:
s = math.sqrt(1.0 + matrix[2, 2] - matrix[0, 0] - matrix[1, 1]) * 2.0
w = (matrix[1, 0] - matrix[0, 1]) / s
x = (matrix[0, 2] + matrix[2, 0]) / s
y = (matrix[1, 2] + matrix[2, 1]) / s
z = 0.25 * s
quat = np.array([x, y, z, w], dtype=np.float64)
return quat / np.linalg.norm(quat)
def _pose_to_matrix(pose: Sequence[float]) -> np.ndarray:
matrix = np.eye(4, dtype=np.float64)
matrix[:3, :3] = _quat_to_matrix(pose[3:])
matrix[:3, 3] = pose[:3]
return matrix
def _matrix_to_pose(matrix: np.ndarray) -> np.ndarray:
return np.concatenate([matrix[:3, 3], _matrix_to_quat(matrix[:3, :3])], axis=0)
def _relative_pose(reference_pose: Sequence[float], target_pose: Sequence[float]) -> np.ndarray:
reference = _pose_to_matrix(reference_pose)
target = _pose_to_matrix(target_pose)
rel = np.linalg.inv(reference) @ target
return _matrix_to_pose(rel)
def _apply_relative_pose(reference_pose: Sequence[float], relative_pose: Sequence[float]) -> np.ndarray:
reference = _pose_to_matrix(reference_pose)
relative = _pose_to_matrix(relative_pose)
world = reference @ relative
return _matrix_to_pose(world)
def _world_to_local(reference_pose: Sequence[float], point_world: Sequence[float]) -> np.ndarray:
reference = _pose_to_matrix(reference_pose)
point = np.concatenate([np.asarray(point_world, dtype=np.float64), [1.0]])
return (np.linalg.inv(reference) @ point)[:3]
def _local_to_world(reference_pose: Sequence[float], point_local: Sequence[float]) -> np.ndarray:
reference = _pose_to_matrix(reference_pose)
point = np.concatenate([np.asarray(point_local, dtype=np.float64), [1.0]])
return (reference @ point)[:3]
def _first_transition(demo: Demo, side: str, open_to_closed: bool) -> int:
values = [getattr(demo[i], side).gripper_open for i in range(len(demo))]
for i in range(1, len(values)):
if open_to_closed and values[i - 1] > 0.5 and values[i] < 0.5:
return i
if not open_to_closed and values[i - 1] < 0.5 and values[i] > 0.5:
return i
raise RuntimeError(f"no gripper transition found for {side}")
def _derive_templates(dataset_root: Path, template_episode_dir: Path) -> Tuple[MotionTemplates, Dict[str, int]]:
env = _launch_replay_env()
try:
demo = _load_demo(template_episode_dir)
task = env.get_task(BimanualTakeTrayOutOfOven)
cache = ReplayCache(task, demo, checkpoint_stride=8)
cache.reset()
base_pose = task._task.get_base().get_pose()
left_close = _first_transition(demo, "left", open_to_closed=True)
left_open = _first_transition(demo, "left", open_to_closed=False)
pregrasp_index = max(0, left_close - 5)
right_close = _first_transition(demo, "right", open_to_closed=True)
right_open = _first_transition(demo, "right", open_to_closed=False)
interesting = sorted(
{
pregrasp_index,
left_close,
min(len(demo) - 1, left_close + 10),
min(len(demo) - 1, left_close + 20),
min(len(demo) - 1, left_close + 30),
max(left_close + 1, left_open - 20),
max(left_close + 1, left_open - 10),
max(left_close + 1, left_open - 5),
right_close,
right_open,
left_open,
}
)
states: Dict[int, ReplayState] = {}
for frame_index in interesting:
cache.step_to(frame_index)
states[frame_index] = cache.current_state()
pregrasp_rel_pose = _relative_pose(
states[pregrasp_index].tray_pose, states[pregrasp_index].left_gripper_pose
)
grasp_rel_pose = _relative_pose(
states[left_close].tray_pose, states[left_close].left_gripper_pose
)
retreat_rel_poses = [
_relative_pose(base_pose, states[index].left_gripper_pose)
for index in interesting
if index > left_close
]
grasp_local_center = _world_to_local(
states[left_close].tray_pose, states[left_close].left_gripper_pose[:3]
)
templates = MotionTemplates(
pregrasp_rel_pose=pregrasp_rel_pose,
grasp_rel_pose=grasp_rel_pose,
retreat_rel_poses=retreat_rel_poses,
grasp_local_center=grasp_local_center,
grasp_region_extents=np.array([0.03, 0.015, 0.004], dtype=np.float64),
hold_open_angle=float(states[right_open].door_angle),
open_more_delta=max(
0.12,
float(states[right_open].door_angle - states[right_close].door_angle) * 0.25,
),
reference_tray_height=float(states[left_close].tray_pose[2]),
)
template_frames = {
"pregrasp": pregrasp_index,
"grasp": left_close,
"right_close": right_close,
"right_open": right_open,
}
return templates, template_frames
finally:
env.shutdown()
def _camera_projection(extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
camera_pos = extrinsics[:3, 3:4]
rotation = extrinsics[:3, :3]
world_to_camera = np.concatenate([rotation.T, -(rotation.T @ camera_pos)], axis=1)
projection = intrinsics @ world_to_camera
return projection, world_to_camera
def _project_points(points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
projection, world_to_camera = _camera_projection(extrinsics, intrinsics)
homogeneous = np.concatenate([points_world, np.ones((len(points_world), 1))], axis=1)
camera_xyz = (world_to_camera @ homogeneous.T).T
image_xyz = (projection @ homogeneous.T).T
uv = image_xyz[:, :2] / image_xyz[:, 2:3]
return uv, camera_xyz
def _sample_grasp_points(templates: MotionTemplates, tray_pose: np.ndarray) -> np.ndarray:
center = templates.grasp_local_center
extents = templates.grasp_region_extents
xs = np.linspace(center[0] - extents[0], center[0] + extents[0], 9)
ys = np.linspace(center[1] - extents[1], center[1] + extents[1], 5)
zs = np.linspace(center[2] - extents[2], center[2] + extents[2], 3)
points_local = np.array([[x, y, z] for x in xs for y in ys for z in zs], dtype=np.float64)
return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64)
def _sample_full_tray_points(tray_pose: np.ndarray) -> np.ndarray:
tray = Shape("tray")
bbox = np.asarray(tray.get_bounding_box(), dtype=np.float64)
xs = np.linspace(bbox[0], bbox[1], 10)
ys = np.linspace(bbox[2], bbox[3], 12)
zs = np.linspace(bbox[4], bbox[5], 3)
points_local = np.array([[x, y, z] for x in xs for y in ys for z in zs], dtype=np.float64)
return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64)
def _visibility_ratio(
points_world: np.ndarray,
depth_m: np.ndarray,
mask: np.ndarray,
tray_handle: int,
extrinsics: np.ndarray,
intrinsics: np.ndarray,
depth_tol: float = 0.02,
) -> float:
uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics)
height, width = depth_m.shape
visible = 0
total = 0
for (u, v), (_, _, camera_depth) in zip(uv, camera_xyz):
if camera_depth <= 0:
continue
if not (0 <= u < width and 0 <= v < height):
continue
total += 1
px = int(round(u))
py = int(round(v))
px = min(max(px, 0), width - 1)
py = min(max(py, 0), height - 1)
observed = float(depth_m[py, px])
if abs(observed - camera_depth) <= depth_tol:
visible += 1
return float(visible / total) if total else 0.0
def _union_visibility(values: Iterable[float]) -> float:
product = 1.0
for value in values:
product *= 1.0 - float(value)
return 1.0 - product
def _keypoint_discovery(demo: Demo, stopping_delta: float = 0.1) -> List[int]:
keypoints: List[int] = []
right_prev = demo[0].right.gripper_open
left_prev = demo[0].left.gripper_open
stopped_buffer = 0
for i, obs in enumerate(demo._observations):
if i < 2 or i >= len(demo) - 1:
right_stopped = left_stopped = False
else:
right_stopped = (
np.allclose(obs.right.joint_velocities, 0, atol=stopping_delta)
and obs.right.gripper_open == demo[i + 1].right.gripper_open
and obs.right.gripper_open == demo[i - 1].right.gripper_open
and demo[i - 2].right.gripper_open == demo[i - 1].right.gripper_open
)
left_stopped = (
np.allclose(obs.left.joint_velocities, 0, atol=stopping_delta)
and obs.left.gripper_open == demo[i + 1].left.gripper_open
and obs.left.gripper_open == demo[i - 1].left.gripper_open
and demo[i - 2].left.gripper_open == demo[i - 1].left.gripper_open
)
stopped = stopped_buffer <= 0 and right_stopped and left_stopped
stopped_buffer = 4 if stopped else stopped_buffer - 1
last = i == len(demo) - 1
state_changed = (
obs.right.gripper_open != right_prev or obs.left.gripper_open != left_prev
)
if i != 0 and (state_changed or last or stopped):
keypoints.append(i)
right_prev = obs.right.gripper_open
left_prev = obs.left.gripper_open
if len(keypoints) > 1 and (keypoints[-1] - 1) == keypoints[-2]:
keypoints.pop(-2)
return keypoints
def _plan_path(scene, arm_name: str, pose: np.ndarray, ignore_collisions: bool = False):
arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm
try:
return arm.get_path(
pose[:3],
quaternion=pose[3:],
ignore_collisions=ignore_collisions,
trials=DEFAULT_PLAN_TRIALS,
max_configs=DEFAULT_PLAN_MAX_CONFIGS,
max_time_ms=DEFAULT_PLAN_MAX_TIME_MS,
trials_per_goal=DEFAULT_PLAN_TRIALS_PER_GOAL,
algorithm=Algos.RRTConnect,
)
except Exception:
return None
def _path_length(path) -> float:
if path is None:
return math.inf
try:
return float(path._get_path_point_lengths()[-1])
except Exception:
return math.inf
def _pregrasp_candidates(tray_pose: np.ndarray, templates: MotionTemplates) -> List[np.ndarray]:
candidates = []
base = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose)
candidates.append(base)
for dx in (-0.02, 0.02):
perturbed = base.copy()
perturbed[0] += dx
candidates.append(perturbed)
return candidates
def _extract_sequence_poses(
tray_pose: np.ndarray, task_base_pose: np.ndarray, templates: MotionTemplates
) -> List[np.ndarray]:
poses = [
_apply_relative_pose(tray_pose, templates.pregrasp_rel_pose),
_apply_relative_pose(tray_pose, templates.grasp_rel_pose),
]
poses.extend(
_apply_relative_pose(task_base_pose, pose) for pose in templates.retreat_rel_poses[:3]
)
return poses
def _extract_height_threshold(templates: MotionTemplates) -> float:
return templates.reference_tray_height + 0.06
def _extraction_progress_score(current_height: float, templates: MotionTemplates) -> float:
threshold = _extract_height_threshold(templates)
margin = max(0.0, float(current_height) - threshold)
# Saturate smoothly once the tray is clearly lifted above the oven lip.
return min(1.0, 0.8 + margin / 0.12)
def _pregrasp_score_and_success(task, templates: MotionTemplates) -> Tuple[float, bool]:
tray = Shape("tray")
if any(
grasped.get_name() == tray.get_name()
for grasped in task._scene.robot.left_gripper.get_grasped_objects()
):
return 1.0, True
tray_pose = Shape("tray").get_pose()
best = 0.0
success = False
for pose in _pregrasp_candidates(tray_pose, templates):
path = _plan_path(task._scene, "left", pose, ignore_collisions=False)
length = _path_length(path)
if np.isfinite(length):
success = True
best = max(best, math.exp(-length / DEFAULT_PATH_SCALE))
return best, success
def _extract_score_and_success(task, templates: MotionTemplates) -> Tuple[float, bool]:
tray = Shape("tray")
robot = task._scene.robot
snapshot = _capture_snapshot(task)
try:
total_length = 0.0
current_height = float(tray.get_position()[2])
already_grasped = any(
grasped.get_name() == tray.get_name()
for grasped in robot.left_gripper.get_grasped_objects()
)
if already_grasped and current_height >= _extract_height_threshold(templates):
return _extraction_progress_score(current_height, templates), True
poses = _extract_sequence_poses(
tray.get_pose(), task._task.get_base().get_pose(), templates
)
approach_poses = [] if already_grasped else poses[:2]
retreat_poses = poses[2:] if not already_grasped else poses[2:]
for pose in approach_poses:
path = _plan_path(task._scene, "left", pose, ignore_collisions=False)
if path is None:
return 0.0, False
total_length += _path_length(path)
path.set_to_end(disable_dynamics=True)
task._pyrep.step()
if not already_grasped:
robot.left_gripper.grasp(tray)
for pose in retreat_poses:
path = _plan_path(task._scene, "left", pose, ignore_collisions=True)
if path is None:
return 0.0, False
total_length += _path_length(path)
path.set_to_end(disable_dynamics=True)
task._pyrep.step()
final_height = float(tray.get_position()[2])
success = final_height >= _extract_height_threshold(templates)
score = max(
math.exp(-total_length / (DEFAULT_PATH_SCALE * 2.5)),
_extraction_progress_score(final_height, templates) if success else 0.0,
)
return score, bool(success)
finally:
_restore_snapshot(task, snapshot)
def _wait_branch(task, steps: int = 5) -> None:
for _ in range(steps):
task._scene.step()
def _open_more_branch(task, templates: MotionTemplates) -> None:
joint = Joint("oven_door_joint")
current = joint.get_joint_position()
joint.set_joint_position(current - templates.open_more_delta, disable_dynamics=True)
for _ in range(3):
task._pyrep.step()
def _hold_open_branch(task, templates: MotionTemplates) -> None:
joint = Joint("oven_door_joint")
current = joint.get_joint_position()
joint.set_joint_position(min(current, templates.hold_open_angle), disable_dynamics=True)
for _ in range(3):
task._pyrep.step()
def _frame_metrics(
episode_dir: Path,
demo: Demo,
frame_state: ReplayState,
templates: MotionTemplates,
tray_handle: int,
) -> Dict[str, float]:
grasp_points = _sample_grasp_points(templates, frame_state.tray_pose)
full_tray_points = _sample_full_tray_points(frame_state.tray_pose)
camera_values: Dict[str, Dict[str, float]] = {}
for camera_name in FULL_CAMERA_SET:
depth_m = _load_depth_meters(episode_dir, demo, frame_state.frame_index, camera_name)
mask = _load_mask(episode_dir, frame_state.frame_index, camera_name)
extrinsics = demo[frame_state.frame_index].misc[f"{camera_name}_camera_extrinsics"]
intrinsics = demo[frame_state.frame_index].misc[f"{camera_name}_camera_intrinsics"]
camera_values[camera_name] = {
"grasp_visibility": _visibility_ratio(
grasp_points, depth_m, mask, tray_handle, extrinsics, intrinsics
),
"tray_visibility": _visibility_ratio(
full_tray_points, depth_m, mask, tray_handle, extrinsics, intrinsics
),
}
values: Dict[str, float] = {}
for name, camera_set in {"three_view": THREE_VIEW_SET, "full_view": FULL_CAMERA_SET}.items():
values[f"{name}_visibility"] = _union_visibility(
camera_values[camera_name]["grasp_visibility"] for camera_name in camera_set
)
values[f"{name}_whole_tray_visibility"] = _union_visibility(
camera_values[camera_name]["tray_visibility"] for camera_name in camera_set
)
return values
def _compute_frame_row_isolated(
episode_dir: Path,
demo: Demo,
templates: MotionTemplates,
checkpoint_stride: int,
frame_index: int,
) -> Dict[str, float]:
env = _launch_replay_env()
try:
task = env.get_task(BimanualTakeTrayOutOfOven)
cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
cache.reset()
cache.step_to(frame_index)
state = cache.current_state()
visibility = _frame_metrics(episode_dir, demo, state, templates, Shape("tray").get_handle())
p_pre, y_pre = _pregrasp_score_and_success(task, templates)
p_ext, y_ext = _extract_score_and_success(task, templates)
return {
"frame_index": frame_index,
"time_norm": frame_index / max(1, len(demo) - 1),
"door_angle": state.door_angle,
"right_gripper_open": state.right_gripper_open,
"left_gripper_open": state.left_gripper_open,
"p_pre": p_pre,
"p_ext": p_ext,
"y_pre": float(bool(y_pre)),
"y_ext": float(bool(y_ext)),
**visibility,
}
finally:
env.shutdown()
def _safe_auc(y_true: np.ndarray, y_score: np.ndarray) -> float:
if len(np.unique(y_true)) < 2:
return float("nan")
return float(roc_auc_score(y_true, y_score))
def _safe_auprc(y_true: np.ndarray, y_score: np.ndarray) -> float:
if len(np.unique(y_true)) < 2:
return float("nan")
return float(average_precision_score(y_true, y_score))
def _first_crossing(values: np.ndarray, threshold: float) -> int:
above = np.flatnonzero(values >= threshold)
return int(above[0]) if len(above) else -1
def _transition_count(binary_values: np.ndarray) -> Tuple[int, int]:
diffs = np.diff(binary_values.astype(int))
return int(np.sum(diffs == 1)), int(np.sum(diffs == -1))
def _keyframe_subset(frame_df: pd.DataFrame, keyframes: Sequence[int]) -> pd.DataFrame:
key_df = frame_df.iloc[list(keyframes)].copy()
key_df["keyframe_ordinal"] = np.arange(len(key_df))
return key_df
def _interventional_validity(
task,
cache: ReplayCache,
episode_dir: Path,
demo: Demo,
templates: MotionTemplates,
tray_handle: int,
frame_df: pd.DataFrame,
) -> Dict[str, float]:
ready_indices = np.flatnonzero(frame_df["y_ready"].to_numpy(dtype=bool))
ready_onset = int(ready_indices[0]) if len(ready_indices) else len(frame_df) // 2
initial_snapshot = cache.snapshot()
sample_indices = sorted(
{
max(0, ready_onset - 10),
max(0, ready_onset - 5),
ready_onset,
min(len(frame_df) - 1, ready_onset + 20),
}
)
stats = {
"pre_ready_open_more_increases_pext": 0,
"pre_ready_open_more_trials": 0,
"pre_ready_hold_open_increases_pext": 0,
"pre_ready_hold_open_trials": 0,
"pre_ready_extract_success": 0,
"pre_ready_extract_trials": 0,
"pre_ready_wait_extract_success": 0,
"pre_ready_wait_trials": 0,
"post_ready_extract_success": 0,
"post_ready_extract_trials": 0,
"post_ready_open_more_low_gain": 0,
"post_ready_open_more_trials": 0,
"post_ready_hold_open_low_gain": 0,
"post_ready_hold_open_trials": 0,
}
for frame_index in sample_indices:
cache.restore_to_index(initial_snapshot, 0)
cache.step_to(frame_index)
snapshot = cache.snapshot()
base_pext, base_extract_success = _extract_score_and_success(task, templates)
pre_ready = not bool(frame_df.iloc[frame_index]["y_ready"])
_restore_snapshot(task, snapshot)
_open_more_branch(task, templates)
open_pext, _ = _extract_score_and_success(task, templates)
_restore_snapshot(task, snapshot)
_hold_open_branch(task, templates)
hold_pext, _ = _extract_score_and_success(task, templates)
_restore_snapshot(task, snapshot)
_wait_branch(task)
_, wait_extract_success = _extract_score_and_success(task, templates)
_restore_snapshot(task, snapshot)
if pre_ready:
stats["pre_ready_open_more_trials"] += 1
stats["pre_ready_hold_open_trials"] += 1
stats["pre_ready_extract_trials"] += 1
stats["pre_ready_wait_trials"] += 1
if open_pext > base_pext:
stats["pre_ready_open_more_increases_pext"] += 1
if hold_pext > base_pext:
stats["pre_ready_hold_open_increases_pext"] += 1
if base_extract_success:
stats["pre_ready_extract_success"] += 1
if wait_extract_success:
stats["pre_ready_wait_extract_success"] += 1
else:
stats["post_ready_extract_trials"] += 1
stats["post_ready_open_more_trials"] += 1
stats["post_ready_hold_open_trials"] += 1
if base_extract_success:
stats["post_ready_extract_success"] += 1
if (open_pext - base_pext) <= 0.05:
stats["post_ready_open_more_low_gain"] += 1
if (hold_pext - base_pext) <= 0.05:
stats["post_ready_hold_open_low_gain"] += 1
return {
key: float(value) for key, value in stats.items()
}
def _analyze_episode(
dataset_root: Path,
episode_dir: Path,
templates: MotionTemplates,
template_frames: Dict[str, int],
checkpoint_stride: int = 16,
max_frames: Optional[int] = None,
independent_replay: bool = False,
) -> EpisodeArtifacts:
env = _launch_replay_env()
try:
demo = _load_demo(episode_dir)
descriptions = _load_descriptions(episode_dir)
task = env.get_task(BimanualTakeTrayOutOfOven)
cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
cache.reset()
tray_handle = Shape("tray").get_handle()
num_frames = len(demo) if max_frames is None else min(len(demo), max_frames)
rows: List[Dict[str, float]] = []
door_angles: List[float] = []
initial_snapshot = cache.checkpoints[0] if independent_replay else None
for frame_index in range(num_frames):
if independent_replay:
cache.restore_to_index(initial_snapshot, 0)
cache.step_to(frame_index)
frame_snapshot = cache.snapshot() if not independent_replay else None
state = cache.current_state()
door_angles.append(state.door_angle)
visibility = _frame_metrics(episode_dir, demo, state, templates, tray_handle)
p_pre, y_pre = _pregrasp_score_and_success(task, templates)
p_ext, y_ext = _extract_score_and_success(task, templates)
rows.append(
{
"frame_index": frame_index,
"time_norm": frame_index / max(1, num_frames - 1),
"door_angle": state.door_angle,
"right_gripper_open": state.right_gripper_open,
"left_gripper_open": state.left_gripper_open,
"p_pre": p_pre,
"p_ext": p_ext,
"y_pre": float(bool(y_pre)),
"y_ext": float(bool(y_ext)),
**visibility,
}
)
if frame_snapshot is not None:
cache.restore(frame_snapshot)
if (frame_index + 1) % 25 == 0 or (frame_index + 1) == num_frames:
print(
f"[{episode_dir.name}] analyzed {frame_index + 1}/{num_frames} dense frames",
flush=True,
)
frame_df = pd.DataFrame(rows)
door_speed = np.gradient(frame_df["door_angle"].to_numpy(), DEMO_DT)
frame_df["door_speed_abs"] = np.abs(door_speed)
y_ext_binary = frame_df["y_ext"].to_numpy(dtype=bool)
y_ready = np.zeros(len(frame_df), dtype=bool)
for i in range(len(frame_df)):
window = y_ext_binary[i : i + 3]
if len(window) == 3 and np.all(window) and frame_df.iloc[i]["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU:
y_ready[i] = True
if np.any(y_ready):
first_ready = int(np.flatnonzero(y_ready)[0])
y_ready[first_ready:] = True
frame_df["y_ready"] = y_ready.astype(float)
phase_trigger = (
(frame_df["full_view_visibility"] >= DEFAULT_VISIBILITY_TAU)
& (frame_df["p_ext"] >= DEFAULT_PEXT_TAU)
& (frame_df["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU)
).to_numpy(dtype=bool)
phase_switch = np.zeros(len(frame_df), dtype=bool)
if np.any(phase_trigger):
first_phase = int(np.flatnonzero(phase_trigger)[0])
phase_switch[first_phase:] = True
frame_df["phase_switch"] = phase_switch.astype(float)
keyframes = [index for index in _keypoint_discovery(demo) if index < num_frames]
key_df = _keyframe_subset(frame_df, keyframes)
y_pre_arr = frame_df["y_pre"].to_numpy(dtype=int)
y_ext_arr = frame_df["y_ext"].to_numpy(dtype=int)
y_ready_arr = frame_df["y_ready"].to_numpy(dtype=int)
p_pre_arr = frame_df["p_pre"].to_numpy(dtype=float)
p_ext_arr = frame_df["p_ext"].to_numpy(dtype=float)
phase_arr = frame_df["phase_switch"].to_numpy(dtype=int)
full_vis = frame_df["full_view_visibility"].to_numpy(dtype=float)
whole_vis = frame_df["full_view_whole_tray_visibility"].to_numpy(dtype=float)
door_angle_arr = frame_df["door_angle"].to_numpy(dtype=float)
time_arr = frame_df["time_norm"].to_numpy(dtype=float)
ppre_cross = _first_crossing(p_pre_arr, DEFAULT_PEXT_TAU)
pext_cross = _first_crossing(p_ext_arr, DEFAULT_PEXT_TAU)
phase_cross = _first_crossing(frame_df["phase_switch"].to_numpy(dtype=float), 0.5)
ready_cross = _first_crossing(y_ready_arr.astype(float), 0.5)
phase_rises, phase_falls = _transition_count(phase_arr)
key_phase_cross = _first_crossing(key_df["phase_switch"].to_numpy(dtype=float), 0.5)
key_ready_cross = _first_crossing(key_df["y_ready"].to_numpy(dtype=float), 0.5)
interventions = _interventional_validity(
task, cache, episode_dir, demo, templates, tray_handle, frame_df
)
metrics = {
"episode_name": episode_dir.name,
"description": descriptions[0],
"num_dense_frames": int(num_frames),
"num_keyframes": int(len(key_df)),
"phase_switch_rises": int(phase_rises),
"phase_switch_falls": int(phase_falls),
"ppre_cross_frame": int(ppre_cross),
"pext_cross_frame": int(pext_cross),
"phase_cross_frame": int(phase_cross),
"ready_cross_frame": int(ready_cross),
"ordering_ok": bool(ppre_cross == -1 or pext_cross == -1 or ppre_cross <= pext_cross),
"dense_boundary_error_frames": float(abs(phase_cross - ready_cross))
if phase_cross >= 0 and ready_cross >= 0
else float("nan"),
"dense_boundary_error_fraction": float(abs(phase_cross - ready_cross) / len(frame_df))
if phase_cross >= 0 and ready_cross >= 0
else float("nan"),
"key_boundary_error_keyframes": float(abs(key_phase_cross - key_ready_cross))
if key_phase_cross >= 0 and key_ready_cross >= 0
else float("nan"),
"auroc_vret_ypre_three": _safe_auc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)),
"auprc_vret_ypre_three": _safe_auprc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)),
"auroc_vret_ypre_full": _safe_auc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)),
"auprc_vret_ypre_full": _safe_auprc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)),
"auroc_ppre_ypre": _safe_auc(y_pre_arr, p_pre_arr),
"auprc_ppre_ypre": _safe_auprc(y_pre_arr, p_pre_arr),
"auroc_pext_yext": _safe_auc(y_ext_arr, p_ext_arr),
"auprc_pext_yext": _safe_auprc(y_ext_arr, p_ext_arr),
"auroc_phase_yready": _safe_auc(y_ready_arr, frame_df["phase_switch"].to_numpy(dtype=float)),
"auprc_phase_yready": _safe_auprc(y_ready_arr, frame_df["phase_switch"].to_numpy(dtype=float)),
"f1_phase_yready": float(f1_score(y_ready_arr, phase_arr))
if np.any(y_ready_arr) and np.any(phase_arr)
else float("nan"),
"baseline_auroc_door_yext": _safe_auc(y_ext_arr, door_angle_arr),
"baseline_auprc_door_yext": _safe_auprc(y_ext_arr, door_angle_arr),
"baseline_auroc_time_yext": _safe_auc(y_ext_arr, time_arr),
"baseline_auprc_time_yext": _safe_auprc(y_ext_arr, time_arr),
"baseline_auroc_whole_vis_yext": _safe_auc(y_ext_arr, whole_vis),
"baseline_auprc_whole_vis_yext": _safe_auprc(y_ext_arr, whole_vis),
**interventions,
}
return EpisodeArtifacts(
episode_name=episode_dir.name,
dense=frame_df,
keyframes=key_df,
metrics=metrics,
template_frames=template_frames,
)
finally:
env.shutdown()
def _aggregate_summary(episode_metrics: List[Dict[str, object]]) -> Dict[str, object]:
frame = pd.DataFrame(episode_metrics)
numeric = frame.select_dtypes(include=[np.number])
summary = {
"num_episodes": int(len(frame)),
"mean_metrics": numeric.mean(numeric_only=True).to_dict(),
"median_metrics": numeric.median(numeric_only=True).to_dict(),
"single_switch_rate": float((frame["phase_switch_rises"] == 1).mean())
if len(frame)
else float("nan"),
"reversion_rate": float((frame["phase_switch_falls"] > 0).mean())
if len(frame)
else float("nan"),
"ordering_ok_rate": float(frame["ordering_ok"].mean()) if len(frame) else float("nan"),
}
return summary
def run_study(
dataset_root: str,
result_dir: str,
max_episodes: Optional[int] = None,
checkpoint_stride: int = 16,
max_frames: Optional[int] = None,
episode_offset: int = 0,
template_episode_index: int = 0,
independent_replay: bool = False,
) -> Dict[str, object]:
dataset_path = Path(dataset_root)
result_path = Path(result_dir)
result_path.mkdir(parents=True, exist_ok=True)
all_episode_dirs = _episode_dirs(dataset_path)
if not all_episode_dirs:
raise RuntimeError(f"no episodes available under {dataset_root}")
if not (0 <= template_episode_index < len(all_episode_dirs)):
raise ValueError(
f"template_episode_index {template_episode_index} outside available range 0..{len(all_episode_dirs) - 1}"
)
episode_dirs = all_episode_dirs[episode_offset:]
if max_episodes is not None:
episode_dirs = episode_dirs[:max_episodes]
if not episode_dirs:
raise RuntimeError(
f"no episodes selected under {dataset_root} with offset={episode_offset} max_episodes={max_episodes}"
)
template_episode_dir = all_episode_dirs[template_episode_index]
templates, template_frames = _derive_templates(dataset_path, template_episode_dir)
with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle:
json.dump(
{
"templates": templates.to_json(),
"template_episode": template_episode_dir.name,
"template_frames": template_frames,
"episode_offset": episode_offset,
},
handle,
indent=2,
)
episode_metrics: List[Dict[str, object]] = []
for episode_dir in episode_dirs:
artifacts = _analyze_episode(
dataset_path,
episode_dir,
templates,
template_frames,
checkpoint_stride=checkpoint_stride,
max_frames=max_frames,
independent_replay=independent_replay,
)
artifacts.dense.to_csv(result_path.joinpath(f"{episode_dir.name}.dense.csv"), index=False)
artifacts.keyframes.to_csv(
result_path.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False
)
with result_path.joinpath(f"{episode_dir.name}.metrics.json").open(
"w", encoding="utf-8"
) as handle:
json.dump(artifacts.metrics, handle, indent=2)
episode_metrics.append(artifacts.metrics)
summary = _aggregate_summary(episode_metrics)
with result_path.joinpath("summary.json").open("w", encoding="utf-8") as handle:
json.dump(summary, handle, indent=2)
return summary
def main(argv: Optional[Sequence[str]] = None) -> int:
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset-root",
default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
)
parser.add_argument(
"--result-dir",
default="/workspace/reveal_retrieve_label_study/results/oven_first_pass",
)
parser.add_argument("--max-episodes", type=int, default=1)
parser.add_argument("--checkpoint-stride", type=int, default=16)
parser.add_argument("--max-frames", type=int)
parser.add_argument("--episode-offset", type=int, default=0)
parser.add_argument("--template-episode-index", type=int, default=0)
parser.add_argument("--independent-replay", action="store_true")
args = parser.parse_args(argv)
summary = run_study(
dataset_root=args.dataset_root,
result_dir=args.result_dir,
max_episodes=args.max_episodes,
checkpoint_stride=args.checkpoint_stride,
max_frames=args.max_frames,
episode_offset=args.episode_offset,
template_episode_index=args.template_episode_index,
independent_replay=args.independent_replay,
)
print(json.dumps(summary, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())