import argparse import json import math import os import pickle from dataclasses import dataclass, asdict from pathlib import Path from typing import Dict, Iterable, List, Optional, Sequence, Tuple import numpy as np import pandas as pd from natsort import natsorted from PIL import Image from pyrep.const import ConfigurationPathAlgorithms as Algos from pyrep.errors import ConfigurationPathError from pyrep.objects.joint import Joint from pyrep.objects.shape import Shape from rlbench.action_modes.action_mode import BimanualJointPositionActionMode from rlbench.action_modes.gripper_action_modes import BimanualDiscrete from rlbench.backend.const import DEPTH_SCALE from rlbench.backend.utils import image_to_float_array, rgb_handles_to_mask from rlbench.bimanual_tasks.bimanual_take_tray_out_of_oven import ( BimanualTakeTrayOutOfOven, ) from rlbench.demo import Demo from rlbench.environment import Environment from rlbench.observation_config import CameraConfig, ObservationConfig from sklearn.metrics import ( average_precision_score, f1_score, roc_auc_score, ) FULL_CAMERA_SET = [ "front", "overhead", "wrist_right", "wrist_left", "over_shoulder_left", "over_shoulder_right", ] THREE_VIEW_SET = ["front", "wrist_right", "wrist_left"] DISPLAY = ":99" DEMO_DT = 0.05 DEFAULT_IMAGE_SIZE = (128, 128) DEFAULT_PATH_SCALE = 0.75 DEFAULT_VISIBILITY_TAU = 0.35 DEFAULT_PEXT_TAU = 0.45 DEFAULT_DOOR_SPEED_TAU = 0.08 DEFAULT_PLAN_TRIALS = 48 DEFAULT_PLAN_MAX_CONFIGS = 4 DEFAULT_PLAN_MAX_TIME_MS = 10 DEFAULT_PLAN_TRIALS_PER_GOAL = 4 @dataclass class SimulatorSnapshot: task_state: Tuple[bytes, int] right_arm_tree: bytes right_gripper_tree: bytes left_arm_tree: bytes left_gripper_tree: bytes right_grasped: Tuple[str, ...] left_grasped: Tuple[str, ...] @dataclass class ReplayState: frame_index: int tray_pose: np.ndarray door_angle: float right_gripper_pose: np.ndarray left_gripper_pose: np.ndarray right_gripper_open: float left_gripper_open: float snapshot: Optional[SimulatorSnapshot] = None @dataclass class MotionTemplates: pregrasp_rel_pose: np.ndarray grasp_rel_pose: np.ndarray retreat_rel_poses: List[np.ndarray] grasp_local_center: np.ndarray grasp_region_extents: np.ndarray hold_open_angle: float open_more_delta: float reference_tray_height: float def to_json(self) -> Dict[str, object]: return { "pregrasp_rel_pose": self.pregrasp_rel_pose.tolist(), "grasp_rel_pose": self.grasp_rel_pose.tolist(), "retreat_rel_poses": [pose.tolist() for pose in self.retreat_rel_poses], "grasp_local_center": self.grasp_local_center.tolist(), "grasp_region_extents": self.grasp_region_extents.tolist(), "hold_open_angle": float(self.hold_open_angle), "open_more_delta": float(self.open_more_delta), "reference_tray_height": float(self.reference_tray_height), } @dataclass class EpisodeArtifacts: episode_name: str dense: pd.DataFrame keyframes: pd.DataFrame metrics: Dict[str, object] template_frames: Dict[str, int] def _configure_runtime() -> None: os.environ["DISPLAY"] = DISPLAY os.environ["COPPELIASIM_ROOT"] = "/workspace/coppelia_sim" ld_library_path = os.environ.get("LD_LIBRARY_PATH", "") if "/workspace/coppelia_sim" not in ld_library_path: os.environ["LD_LIBRARY_PATH"] = ( f"{ld_library_path}:/workspace/coppelia_sim" if ld_library_path else "/workspace/coppelia_sim" ) os.environ["QT_QPA_PLATFORM_PLUGIN_PATH"] = "/workspace/coppelia_sim" os.environ.setdefault("XDG_RUNTIME_DIR", "/tmp/runtime-root") def _minimal_camera_config() -> Dict[str, CameraConfig]: return { "front": CameraConfig( rgb=False, depth=False, point_cloud=False, mask=False, image_size=DEFAULT_IMAGE_SIZE, ) } def _make_observation_config() -> ObservationConfig: return ObservationConfig( camera_configs=_minimal_camera_config(), joint_velocities=True, joint_positions=True, joint_forces=False, gripper_open=True, gripper_pose=True, gripper_matrix=False, gripper_joint_positions=True, gripper_touch_forces=False, task_low_dim_state=False, robot_name="bimanual", ) def _launch_replay_env() -> Environment: _configure_runtime() env = Environment( action_mode=BimanualJointPositionActionMode(), obs_config=_make_observation_config(), headless=True, robot_setup="dual_panda", ) env.launch() return env def _load_demo(episode_dir: Path) -> Demo: with episode_dir.joinpath("low_dim_obs.pkl").open("rb") as handle: return pickle.load(handle) def _load_descriptions(episode_dir: Path) -> List[str]: with episode_dir.joinpath("variation_descriptions.pkl").open("rb") as handle: return pickle.load(handle) def _episode_dirs(dataset_root: Path) -> List[Path]: episodes_dir = dataset_root.joinpath("all_variations", "episodes") return [ episodes_dir.joinpath(name) for name in natsorted(os.listdir(episodes_dir)) if episodes_dir.joinpath(name, "low_dim_obs.pkl").exists() ] def _camera_file(episode_dir: Path, camera_name: str, kind: str, frame_index: int) -> Path: if kind == "rgb": return episode_dir.joinpath(f"{camera_name}_rgb", f"rgb_{frame_index:04d}.png") if kind == "depth": return episode_dir.joinpath( f"{camera_name}_depth", f"depth_{frame_index:04d}.png" ) if kind == "mask": return episode_dir.joinpath(f"{camera_name}_mask", f"mask_{frame_index:04d}.png") raise ValueError(f"unknown kind: {kind}") def _load_depth_meters(episode_dir: Path, demo: Demo, frame_index: int, camera_name: str) -> np.ndarray: image = Image.open(_camera_file(episode_dir, camera_name, "depth", frame_index)) depth = image_to_float_array(image, DEPTH_SCALE) near = demo[frame_index].misc[f"{camera_name}_camera_near"] far = demo[frame_index].misc[f"{camera_name}_camera_far"] return near + depth * (far - near) def _load_mask(episode_dir: Path, frame_index: int, camera_name: str) -> np.ndarray: image = np.array(Image.open(_camera_file(episode_dir, camera_name, "mask", frame_index))) return rgb_handles_to_mask(image) def _capture_snapshot(task) -> SimulatorSnapshot: robot = task._scene.robot return SimulatorSnapshot( task_state=task._task.get_state(), right_arm_tree=robot.right_arm.get_configuration_tree(), right_gripper_tree=robot.right_gripper.get_configuration_tree(), left_arm_tree=robot.left_arm.get_configuration_tree(), left_gripper_tree=robot.left_gripper.get_configuration_tree(), right_grasped=tuple(obj.get_name() for obj in robot.right_gripper.get_grasped_objects()), left_grasped=tuple(obj.get_name() for obj in robot.left_gripper.get_grasped_objects()), ) def _restore_snapshot(task, snapshot: SimulatorSnapshot) -> None: robot = task._scene.robot robot.release_gripper() task._pyrep.set_configuration_tree(snapshot.right_arm_tree) task._pyrep.set_configuration_tree(snapshot.right_gripper_tree) task._pyrep.set_configuration_tree(snapshot.left_arm_tree) task._pyrep.set_configuration_tree(snapshot.left_gripper_tree) task._pyrep.set_configuration_tree(snapshot.task_state[0]) for name in snapshot.right_grasped: robot.right_gripper.grasp(Shape(name)) for name in snapshot.left_grasped: robot.left_gripper.grasp(Shape(name)) task._pyrep.step() def _build_joint_action(target_obs) -> np.ndarray: def _joint_vector(value, fallback) -> np.ndarray: array = np.asarray(fallback if value is None else value, dtype=np.float64) if array.ndim != 1 or array.shape[0] != 7: array = np.asarray(fallback, dtype=np.float64) if array.ndim != 1 or array.shape[0] != 7: raise ValueError(f"invalid joint vector shape: {array.shape}") return array right = _joint_vector( target_obs.misc.get("right_executed_demo_joint_position_action"), target_obs.right.joint_positions, ) left = _joint_vector( target_obs.misc.get("left_executed_demo_joint_position_action"), target_obs.left.joint_positions, ) return np.concatenate( [ right, np.array([target_obs.right.gripper_open], dtype=np.float64), left, np.array([target_obs.left.gripper_open], dtype=np.float64), ] ) class ReplayCache: def __init__(self, task, demo: Demo, checkpoint_stride: int = 16): self.task = task self.demo = demo self.checkpoint_stride = checkpoint_stride self.current_index = 0 self.current_obs = None self.checkpoints: Dict[int, SimulatorSnapshot] = {} self.discrete_gripper = BimanualDiscrete() def reset(self) -> None: _, self.current_obs = self.task.reset_to_demo(self.demo) self.current_index = 0 self.checkpoints = {0: _capture_snapshot(self.task)} def step_to(self, target_index: int): if target_index < self.current_index: checkpoint_index = max(i for i in self.checkpoints if i <= target_index) _restore_snapshot(self.task, self.checkpoints[checkpoint_index]) self.current_index = checkpoint_index self.current_obs = self._observation_from_scene() while self.current_index < target_index: next_index = self.current_index + 1 self.task._action_mode.action( self.task._scene, _build_joint_action(self.demo[next_index]) ) self._apply_gripper_replay(self.demo[next_index]) self.current_obs = self._observation_from_scene() self.current_index = next_index if self.current_index % self.checkpoint_stride == 0: self.checkpoints[self.current_index] = _capture_snapshot(self.task) return self.current_obs def current_state(self) -> ReplayState: return ReplayState( frame_index=self.current_index, tray_pose=Shape("tray").get_pose(), door_angle=Joint("oven_door_joint").get_joint_position(), right_gripper_pose=self.current_obs.right.gripper_pose.copy(), left_gripper_pose=self.current_obs.left.gripper_pose.copy(), right_gripper_open=float(self.current_obs.right.gripper_open), left_gripper_open=float(self.current_obs.left.gripper_open), snapshot=None, ) def snapshot(self) -> SimulatorSnapshot: return _capture_snapshot(self.task) def restore(self, snapshot: SimulatorSnapshot) -> None: _restore_snapshot(self.task, snapshot) self.current_obs = self._observation_from_scene() def restore_to_index(self, snapshot: SimulatorSnapshot, frame_index: int) -> None: self.restore(snapshot) self.current_index = frame_index def _observation_from_scene(self): return self.task._scene.get_observation() def _apply_gripper_replay(self, target_obs) -> None: desired = np.array( [target_obs.right.gripper_open, target_obs.left.gripper_open], dtype=np.float64, ) current = np.array( [ float(all(x > 0.9 for x in self.task._scene.robot.right_gripper.get_open_amount())), float(all(x > 0.9 for x in self.task._scene.robot.left_gripper.get_open_amount())), ], dtype=np.float64, ) if not np.allclose(current, desired): self.discrete_gripper.action(self.task._scene, desired) self.current_obs = self._observation_from_scene() def _quat_to_matrix(quat: Sequence[float]) -> np.ndarray: x, y, z, w = quat xx, yy, zz = x * x, y * y, z * z xy, xz, yz = x * y, x * z, y * z wx, wy, wz = w * x, w * y, w * z return np.array( [ [1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)], [2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)], [2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)], ], dtype=np.float64, ) def _matrix_to_quat(matrix: np.ndarray) -> np.ndarray: trace = np.trace(matrix) if trace > 0.0: s = math.sqrt(trace + 1.0) * 2.0 w = 0.25 * s x = (matrix[2, 1] - matrix[1, 2]) / s y = (matrix[0, 2] - matrix[2, 0]) / s z = (matrix[1, 0] - matrix[0, 1]) / s elif matrix[0, 0] > matrix[1, 1] and matrix[0, 0] > matrix[2, 2]: s = math.sqrt(1.0 + matrix[0, 0] - matrix[1, 1] - matrix[2, 2]) * 2.0 w = (matrix[2, 1] - matrix[1, 2]) / s x = 0.25 * s y = (matrix[0, 1] + matrix[1, 0]) / s z = (matrix[0, 2] + matrix[2, 0]) / s elif matrix[1, 1] > matrix[2, 2]: s = math.sqrt(1.0 + matrix[1, 1] - matrix[0, 0] - matrix[2, 2]) * 2.0 w = (matrix[0, 2] - matrix[2, 0]) / s x = (matrix[0, 1] + matrix[1, 0]) / s y = 0.25 * s z = (matrix[1, 2] + matrix[2, 1]) / s else: s = math.sqrt(1.0 + matrix[2, 2] - matrix[0, 0] - matrix[1, 1]) * 2.0 w = (matrix[1, 0] - matrix[0, 1]) / s x = (matrix[0, 2] + matrix[2, 0]) / s y = (matrix[1, 2] + matrix[2, 1]) / s z = 0.25 * s quat = np.array([x, y, z, w], dtype=np.float64) return quat / np.linalg.norm(quat) def _pose_to_matrix(pose: Sequence[float]) -> np.ndarray: matrix = np.eye(4, dtype=np.float64) matrix[:3, :3] = _quat_to_matrix(pose[3:]) matrix[:3, 3] = pose[:3] return matrix def _matrix_to_pose(matrix: np.ndarray) -> np.ndarray: return np.concatenate([matrix[:3, 3], _matrix_to_quat(matrix[:3, :3])], axis=0) def _relative_pose(reference_pose: Sequence[float], target_pose: Sequence[float]) -> np.ndarray: reference = _pose_to_matrix(reference_pose) target = _pose_to_matrix(target_pose) rel = np.linalg.inv(reference) @ target return _matrix_to_pose(rel) def _apply_relative_pose(reference_pose: Sequence[float], relative_pose: Sequence[float]) -> np.ndarray: reference = _pose_to_matrix(reference_pose) relative = _pose_to_matrix(relative_pose) world = reference @ relative return _matrix_to_pose(world) def _world_to_local(reference_pose: Sequence[float], point_world: Sequence[float]) -> np.ndarray: reference = _pose_to_matrix(reference_pose) point = np.concatenate([np.asarray(point_world, dtype=np.float64), [1.0]]) return (np.linalg.inv(reference) @ point)[:3] def _local_to_world(reference_pose: Sequence[float], point_local: Sequence[float]) -> np.ndarray: reference = _pose_to_matrix(reference_pose) point = np.concatenate([np.asarray(point_local, dtype=np.float64), [1.0]]) return (reference @ point)[:3] def _first_transition(demo: Demo, side: str, open_to_closed: bool) -> int: values = [getattr(demo[i], side).gripper_open for i in range(len(demo))] for i in range(1, len(values)): if open_to_closed and values[i - 1] > 0.5 and values[i] < 0.5: return i if not open_to_closed and values[i - 1] < 0.5 and values[i] > 0.5: return i raise RuntimeError(f"no gripper transition found for {side}") def _derive_templates(dataset_root: Path, template_episode_dir: Path) -> Tuple[MotionTemplates, Dict[str, int]]: env = _launch_replay_env() try: demo = _load_demo(template_episode_dir) task = env.get_task(BimanualTakeTrayOutOfOven) cache = ReplayCache(task, demo, checkpoint_stride=8) cache.reset() base_pose = task._task.get_base().get_pose() left_close = _first_transition(demo, "left", open_to_closed=True) left_open = _first_transition(demo, "left", open_to_closed=False) pregrasp_index = max(0, left_close - 5) right_close = _first_transition(demo, "right", open_to_closed=True) right_open = _first_transition(demo, "right", open_to_closed=False) interesting = sorted( { pregrasp_index, left_close, min(len(demo) - 1, left_close + 10), min(len(demo) - 1, left_close + 20), min(len(demo) - 1, left_close + 30), max(left_close + 1, left_open - 20), max(left_close + 1, left_open - 10), max(left_close + 1, left_open - 5), right_close, right_open, left_open, } ) states: Dict[int, ReplayState] = {} for frame_index in interesting: cache.step_to(frame_index) states[frame_index] = cache.current_state() pregrasp_rel_pose = _relative_pose( states[pregrasp_index].tray_pose, states[pregrasp_index].left_gripper_pose ) grasp_rel_pose = _relative_pose( states[left_close].tray_pose, states[left_close].left_gripper_pose ) retreat_rel_poses = [ _relative_pose(base_pose, states[index].left_gripper_pose) for index in interesting if index > left_close ] grasp_local_center = _world_to_local( states[left_close].tray_pose, states[left_close].left_gripper_pose[:3] ) templates = MotionTemplates( pregrasp_rel_pose=pregrasp_rel_pose, grasp_rel_pose=grasp_rel_pose, retreat_rel_poses=retreat_rel_poses, grasp_local_center=grasp_local_center, grasp_region_extents=np.array([0.03, 0.015, 0.004], dtype=np.float64), hold_open_angle=float(states[right_open].door_angle), open_more_delta=max( 0.12, float(states[right_open].door_angle - states[right_close].door_angle) * 0.25, ), reference_tray_height=float(states[left_close].tray_pose[2]), ) template_frames = { "pregrasp": pregrasp_index, "grasp": left_close, "right_close": right_close, "right_open": right_open, } return templates, template_frames finally: env.shutdown() def _camera_projection(extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: camera_pos = extrinsics[:3, 3:4] rotation = extrinsics[:3, :3] world_to_camera = np.concatenate([rotation.T, -(rotation.T @ camera_pos)], axis=1) projection = intrinsics @ world_to_camera return projection, world_to_camera def _project_points(points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: projection, world_to_camera = _camera_projection(extrinsics, intrinsics) homogeneous = np.concatenate([points_world, np.ones((len(points_world), 1))], axis=1) camera_xyz = (world_to_camera @ homogeneous.T).T image_xyz = (projection @ homogeneous.T).T uv = image_xyz[:, :2] / image_xyz[:, 2:3] return uv, camera_xyz def _sample_grasp_points(templates: MotionTemplates, tray_pose: np.ndarray) -> np.ndarray: center = templates.grasp_local_center extents = templates.grasp_region_extents xs = np.linspace(center[0] - extents[0], center[0] + extents[0], 9) ys = np.linspace(center[1] - extents[1], center[1] + extents[1], 5) zs = np.linspace(center[2] - extents[2], center[2] + extents[2], 3) points_local = np.array([[x, y, z] for x in xs for y in ys for z in zs], dtype=np.float64) return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64) def _sample_full_tray_points(tray_pose: np.ndarray) -> np.ndarray: tray = Shape("tray") bbox = np.asarray(tray.get_bounding_box(), dtype=np.float64) xs = np.linspace(bbox[0], bbox[1], 10) ys = np.linspace(bbox[2], bbox[3], 12) zs = np.linspace(bbox[4], bbox[5], 3) points_local = np.array([[x, y, z] for x in xs for y in ys for z in zs], dtype=np.float64) return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64) def _visibility_ratio( points_world: np.ndarray, depth_m: np.ndarray, mask: np.ndarray, tray_handle: int, extrinsics: np.ndarray, intrinsics: np.ndarray, depth_tol: float = 0.02, ) -> float: uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics) height, width = depth_m.shape visible = 0 total = 0 for (u, v), (_, _, camera_depth) in zip(uv, camera_xyz): if camera_depth <= 0: continue if not (0 <= u < width and 0 <= v < height): continue total += 1 px = int(round(u)) py = int(round(v)) px = min(max(px, 0), width - 1) py = min(max(py, 0), height - 1) observed = float(depth_m[py, px]) if abs(observed - camera_depth) <= depth_tol: visible += 1 return float(visible / total) if total else 0.0 def _union_visibility(values: Iterable[float]) -> float: product = 1.0 for value in values: product *= 1.0 - float(value) return 1.0 - product def _keypoint_discovery(demo: Demo, stopping_delta: float = 0.1) -> List[int]: keypoints: List[int] = [] right_prev = demo[0].right.gripper_open left_prev = demo[0].left.gripper_open stopped_buffer = 0 for i, obs in enumerate(demo._observations): if i < 2 or i >= len(demo) - 1: right_stopped = left_stopped = False else: right_stopped = ( np.allclose(obs.right.joint_velocities, 0, atol=stopping_delta) and obs.right.gripper_open == demo[i + 1].right.gripper_open and obs.right.gripper_open == demo[i - 1].right.gripper_open and demo[i - 2].right.gripper_open == demo[i - 1].right.gripper_open ) left_stopped = ( np.allclose(obs.left.joint_velocities, 0, atol=stopping_delta) and obs.left.gripper_open == demo[i + 1].left.gripper_open and obs.left.gripper_open == demo[i - 1].left.gripper_open and demo[i - 2].left.gripper_open == demo[i - 1].left.gripper_open ) stopped = stopped_buffer <= 0 and right_stopped and left_stopped stopped_buffer = 4 if stopped else stopped_buffer - 1 last = i == len(demo) - 1 state_changed = ( obs.right.gripper_open != right_prev or obs.left.gripper_open != left_prev ) if i != 0 and (state_changed or last or stopped): keypoints.append(i) right_prev = obs.right.gripper_open left_prev = obs.left.gripper_open if len(keypoints) > 1 and (keypoints[-1] - 1) == keypoints[-2]: keypoints.pop(-2) return keypoints def _plan_path(scene, arm_name: str, pose: np.ndarray, ignore_collisions: bool = False): arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm try: return arm.get_path( pose[:3], quaternion=pose[3:], ignore_collisions=ignore_collisions, trials=DEFAULT_PLAN_TRIALS, max_configs=DEFAULT_PLAN_MAX_CONFIGS, max_time_ms=DEFAULT_PLAN_MAX_TIME_MS, trials_per_goal=DEFAULT_PLAN_TRIALS_PER_GOAL, algorithm=Algos.RRTConnect, ) except Exception: return None def _path_length(path) -> float: if path is None: return math.inf try: return float(path._get_path_point_lengths()[-1]) except Exception: return math.inf def _pregrasp_candidates(tray_pose: np.ndarray, templates: MotionTemplates) -> List[np.ndarray]: candidates = [] base = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose) candidates.append(base) for dx in (-0.02, 0.02): perturbed = base.copy() perturbed[0] += dx candidates.append(perturbed) return candidates def _extract_sequence_poses( tray_pose: np.ndarray, task_base_pose: np.ndarray, templates: MotionTemplates ) -> List[np.ndarray]: poses = [ _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose), _apply_relative_pose(tray_pose, templates.grasp_rel_pose), ] poses.extend( _apply_relative_pose(task_base_pose, pose) for pose in templates.retreat_rel_poses[:3] ) return poses def _extract_height_threshold(templates: MotionTemplates) -> float: return templates.reference_tray_height + 0.06 def _extraction_progress_score(current_height: float, templates: MotionTemplates) -> float: threshold = _extract_height_threshold(templates) margin = max(0.0, float(current_height) - threshold) # Saturate smoothly once the tray is clearly lifted above the oven lip. return min(1.0, 0.8 + margin / 0.12) def _pregrasp_score_and_success(task, templates: MotionTemplates) -> Tuple[float, bool]: tray = Shape("tray") if any( grasped.get_name() == tray.get_name() for grasped in task._scene.robot.left_gripper.get_grasped_objects() ): return 1.0, True tray_pose = Shape("tray").get_pose() best = 0.0 success = False for pose in _pregrasp_candidates(tray_pose, templates): path = _plan_path(task._scene, "left", pose, ignore_collisions=False) length = _path_length(path) if np.isfinite(length): success = True best = max(best, math.exp(-length / DEFAULT_PATH_SCALE)) return best, success def _extract_score_and_success(task, templates: MotionTemplates) -> Tuple[float, bool]: tray = Shape("tray") robot = task._scene.robot snapshot = _capture_snapshot(task) try: total_length = 0.0 current_height = float(tray.get_position()[2]) already_grasped = any( grasped.get_name() == tray.get_name() for grasped in robot.left_gripper.get_grasped_objects() ) if already_grasped and current_height >= _extract_height_threshold(templates): return _extraction_progress_score(current_height, templates), True poses = _extract_sequence_poses( tray.get_pose(), task._task.get_base().get_pose(), templates ) approach_poses = [] if already_grasped else poses[:2] retreat_poses = poses[2:] if not already_grasped else poses[2:] for pose in approach_poses: path = _plan_path(task._scene, "left", pose, ignore_collisions=False) if path is None: return 0.0, False total_length += _path_length(path) path.set_to_end(disable_dynamics=True) task._pyrep.step() if not already_grasped: robot.left_gripper.grasp(tray) for pose in retreat_poses: path = _plan_path(task._scene, "left", pose, ignore_collisions=True) if path is None: return 0.0, False total_length += _path_length(path) path.set_to_end(disable_dynamics=True) task._pyrep.step() final_height = float(tray.get_position()[2]) success = final_height >= _extract_height_threshold(templates) score = max( math.exp(-total_length / (DEFAULT_PATH_SCALE * 2.5)), _extraction_progress_score(final_height, templates) if success else 0.0, ) return score, bool(success) finally: _restore_snapshot(task, snapshot) def _wait_branch(task, steps: int = 5) -> None: for _ in range(steps): task._scene.step() def _open_more_branch(task, templates: MotionTemplates) -> None: joint = Joint("oven_door_joint") current = joint.get_joint_position() joint.set_joint_position(current - templates.open_more_delta, disable_dynamics=True) for _ in range(3): task._pyrep.step() def _hold_open_branch(task, templates: MotionTemplates) -> None: joint = Joint("oven_door_joint") current = joint.get_joint_position() joint.set_joint_position(min(current, templates.hold_open_angle), disable_dynamics=True) for _ in range(3): task._pyrep.step() def _frame_metrics( episode_dir: Path, demo: Demo, frame_state: ReplayState, templates: MotionTemplates, tray_handle: int, ) -> Dict[str, float]: grasp_points = _sample_grasp_points(templates, frame_state.tray_pose) full_tray_points = _sample_full_tray_points(frame_state.tray_pose) camera_values: Dict[str, Dict[str, float]] = {} for camera_name in FULL_CAMERA_SET: depth_m = _load_depth_meters(episode_dir, demo, frame_state.frame_index, camera_name) mask = _load_mask(episode_dir, frame_state.frame_index, camera_name) extrinsics = demo[frame_state.frame_index].misc[f"{camera_name}_camera_extrinsics"] intrinsics = demo[frame_state.frame_index].misc[f"{camera_name}_camera_intrinsics"] camera_values[camera_name] = { "grasp_visibility": _visibility_ratio( grasp_points, depth_m, mask, tray_handle, extrinsics, intrinsics ), "tray_visibility": _visibility_ratio( full_tray_points, depth_m, mask, tray_handle, extrinsics, intrinsics ), } values: Dict[str, float] = {} for name, camera_set in {"three_view": THREE_VIEW_SET, "full_view": FULL_CAMERA_SET}.items(): values[f"{name}_visibility"] = _union_visibility( camera_values[camera_name]["grasp_visibility"] for camera_name in camera_set ) values[f"{name}_whole_tray_visibility"] = _union_visibility( camera_values[camera_name]["tray_visibility"] for camera_name in camera_set ) return values def _compute_frame_row_isolated( episode_dir: Path, demo: Demo, templates: MotionTemplates, checkpoint_stride: int, frame_index: int, ) -> Dict[str, float]: env = _launch_replay_env() try: task = env.get_task(BimanualTakeTrayOutOfOven) cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride) cache.reset() cache.step_to(frame_index) state = cache.current_state() visibility = _frame_metrics(episode_dir, demo, state, templates, Shape("tray").get_handle()) p_pre, y_pre = _pregrasp_score_and_success(task, templates) p_ext, y_ext = _extract_score_and_success(task, templates) return { "frame_index": frame_index, "time_norm": frame_index / max(1, len(demo) - 1), "door_angle": state.door_angle, "right_gripper_open": state.right_gripper_open, "left_gripper_open": state.left_gripper_open, "p_pre": p_pre, "p_ext": p_ext, "y_pre": float(bool(y_pre)), "y_ext": float(bool(y_ext)), **visibility, } finally: env.shutdown() def _safe_auc(y_true: np.ndarray, y_score: np.ndarray) -> float: if len(np.unique(y_true)) < 2: return float("nan") return float(roc_auc_score(y_true, y_score)) def _safe_auprc(y_true: np.ndarray, y_score: np.ndarray) -> float: if len(np.unique(y_true)) < 2: return float("nan") return float(average_precision_score(y_true, y_score)) def _first_crossing(values: np.ndarray, threshold: float) -> int: above = np.flatnonzero(values >= threshold) return int(above[0]) if len(above) else -1 def _transition_count(binary_values: np.ndarray) -> Tuple[int, int]: diffs = np.diff(binary_values.astype(int)) return int(np.sum(diffs == 1)), int(np.sum(diffs == -1)) def _keyframe_subset(frame_df: pd.DataFrame, keyframes: Sequence[int]) -> pd.DataFrame: key_df = frame_df.iloc[list(keyframes)].copy() key_df["keyframe_ordinal"] = np.arange(len(key_df)) return key_df def _interventional_validity( task, cache: ReplayCache, episode_dir: Path, demo: Demo, templates: MotionTemplates, tray_handle: int, frame_df: pd.DataFrame, ) -> Dict[str, float]: ready_indices = np.flatnonzero(frame_df["y_ready"].to_numpy(dtype=bool)) ready_onset = int(ready_indices[0]) if len(ready_indices) else len(frame_df) // 2 initial_snapshot = cache.snapshot() sample_indices = sorted( { max(0, ready_onset - 10), max(0, ready_onset - 5), ready_onset, min(len(frame_df) - 1, ready_onset + 20), } ) stats = { "pre_ready_open_more_increases_pext": 0, "pre_ready_open_more_trials": 0, "pre_ready_hold_open_increases_pext": 0, "pre_ready_hold_open_trials": 0, "pre_ready_extract_success": 0, "pre_ready_extract_trials": 0, "pre_ready_wait_extract_success": 0, "pre_ready_wait_trials": 0, "post_ready_extract_success": 0, "post_ready_extract_trials": 0, "post_ready_open_more_low_gain": 0, "post_ready_open_more_trials": 0, "post_ready_hold_open_low_gain": 0, "post_ready_hold_open_trials": 0, } for frame_index in sample_indices: cache.restore_to_index(initial_snapshot, 0) cache.step_to(frame_index) snapshot = cache.snapshot() base_pext, base_extract_success = _extract_score_and_success(task, templates) pre_ready = not bool(frame_df.iloc[frame_index]["y_ready"]) _restore_snapshot(task, snapshot) _open_more_branch(task, templates) open_pext, _ = _extract_score_and_success(task, templates) _restore_snapshot(task, snapshot) _hold_open_branch(task, templates) hold_pext, _ = _extract_score_and_success(task, templates) _restore_snapshot(task, snapshot) _wait_branch(task) _, wait_extract_success = _extract_score_and_success(task, templates) _restore_snapshot(task, snapshot) if pre_ready: stats["pre_ready_open_more_trials"] += 1 stats["pre_ready_hold_open_trials"] += 1 stats["pre_ready_extract_trials"] += 1 stats["pre_ready_wait_trials"] += 1 if open_pext > base_pext: stats["pre_ready_open_more_increases_pext"] += 1 if hold_pext > base_pext: stats["pre_ready_hold_open_increases_pext"] += 1 if base_extract_success: stats["pre_ready_extract_success"] += 1 if wait_extract_success: stats["pre_ready_wait_extract_success"] += 1 else: stats["post_ready_extract_trials"] += 1 stats["post_ready_open_more_trials"] += 1 stats["post_ready_hold_open_trials"] += 1 if base_extract_success: stats["post_ready_extract_success"] += 1 if (open_pext - base_pext) <= 0.05: stats["post_ready_open_more_low_gain"] += 1 if (hold_pext - base_pext) <= 0.05: stats["post_ready_hold_open_low_gain"] += 1 return { key: float(value) for key, value in stats.items() } def _analyze_episode( dataset_root: Path, episode_dir: Path, templates: MotionTemplates, template_frames: Dict[str, int], checkpoint_stride: int = 16, max_frames: Optional[int] = None, independent_replay: bool = False, ) -> EpisodeArtifacts: env = _launch_replay_env() try: demo = _load_demo(episode_dir) descriptions = _load_descriptions(episode_dir) task = env.get_task(BimanualTakeTrayOutOfOven) cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride) cache.reset() tray_handle = Shape("tray").get_handle() num_frames = len(demo) if max_frames is None else min(len(demo), max_frames) rows: List[Dict[str, float]] = [] door_angles: List[float] = [] initial_snapshot = cache.checkpoints[0] if independent_replay else None for frame_index in range(num_frames): if independent_replay: cache.restore_to_index(initial_snapshot, 0) cache.step_to(frame_index) frame_snapshot = cache.snapshot() if not independent_replay else None state = cache.current_state() door_angles.append(state.door_angle) visibility = _frame_metrics(episode_dir, demo, state, templates, tray_handle) p_pre, y_pre = _pregrasp_score_and_success(task, templates) p_ext, y_ext = _extract_score_and_success(task, templates) rows.append( { "frame_index": frame_index, "time_norm": frame_index / max(1, num_frames - 1), "door_angle": state.door_angle, "right_gripper_open": state.right_gripper_open, "left_gripper_open": state.left_gripper_open, "p_pre": p_pre, "p_ext": p_ext, "y_pre": float(bool(y_pre)), "y_ext": float(bool(y_ext)), **visibility, } ) if frame_snapshot is not None: cache.restore(frame_snapshot) if (frame_index + 1) % 25 == 0 or (frame_index + 1) == num_frames: print( f"[{episode_dir.name}] analyzed {frame_index + 1}/{num_frames} dense frames", flush=True, ) frame_df = pd.DataFrame(rows) door_speed = np.gradient(frame_df["door_angle"].to_numpy(), DEMO_DT) frame_df["door_speed_abs"] = np.abs(door_speed) y_ext_binary = frame_df["y_ext"].to_numpy(dtype=bool) y_ready = np.zeros(len(frame_df), dtype=bool) for i in range(len(frame_df)): window = y_ext_binary[i : i + 3] if len(window) == 3 and np.all(window) and frame_df.iloc[i]["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU: y_ready[i] = True if np.any(y_ready): first_ready = int(np.flatnonzero(y_ready)[0]) y_ready[first_ready:] = True frame_df["y_ready"] = y_ready.astype(float) phase_trigger = ( (frame_df["full_view_visibility"] >= DEFAULT_VISIBILITY_TAU) & (frame_df["p_ext"] >= DEFAULT_PEXT_TAU) & (frame_df["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU) ).to_numpy(dtype=bool) phase_switch = np.zeros(len(frame_df), dtype=bool) if np.any(phase_trigger): first_phase = int(np.flatnonzero(phase_trigger)[0]) phase_switch[first_phase:] = True frame_df["phase_switch"] = phase_switch.astype(float) keyframes = [index for index in _keypoint_discovery(demo) if index < num_frames] key_df = _keyframe_subset(frame_df, keyframes) y_pre_arr = frame_df["y_pre"].to_numpy(dtype=int) y_ext_arr = frame_df["y_ext"].to_numpy(dtype=int) y_ready_arr = frame_df["y_ready"].to_numpy(dtype=int) p_pre_arr = frame_df["p_pre"].to_numpy(dtype=float) p_ext_arr = frame_df["p_ext"].to_numpy(dtype=float) phase_arr = frame_df["phase_switch"].to_numpy(dtype=int) full_vis = frame_df["full_view_visibility"].to_numpy(dtype=float) whole_vis = frame_df["full_view_whole_tray_visibility"].to_numpy(dtype=float) door_angle_arr = frame_df["door_angle"].to_numpy(dtype=float) time_arr = frame_df["time_norm"].to_numpy(dtype=float) ppre_cross = _first_crossing(p_pre_arr, DEFAULT_PEXT_TAU) pext_cross = _first_crossing(p_ext_arr, DEFAULT_PEXT_TAU) phase_cross = _first_crossing(frame_df["phase_switch"].to_numpy(dtype=float), 0.5) ready_cross = _first_crossing(y_ready_arr.astype(float), 0.5) phase_rises, phase_falls = _transition_count(phase_arr) key_phase_cross = _first_crossing(key_df["phase_switch"].to_numpy(dtype=float), 0.5) key_ready_cross = _first_crossing(key_df["y_ready"].to_numpy(dtype=float), 0.5) interventions = _interventional_validity( task, cache, episode_dir, demo, templates, tray_handle, frame_df ) metrics = { "episode_name": episode_dir.name, "description": descriptions[0], "num_dense_frames": int(num_frames), "num_keyframes": int(len(key_df)), "phase_switch_rises": int(phase_rises), "phase_switch_falls": int(phase_falls), "ppre_cross_frame": int(ppre_cross), "pext_cross_frame": int(pext_cross), "phase_cross_frame": int(phase_cross), "ready_cross_frame": int(ready_cross), "ordering_ok": bool(ppre_cross == -1 or pext_cross == -1 or ppre_cross <= pext_cross), "dense_boundary_error_frames": float(abs(phase_cross - ready_cross)) if phase_cross >= 0 and ready_cross >= 0 else float("nan"), "dense_boundary_error_fraction": float(abs(phase_cross - ready_cross) / len(frame_df)) if phase_cross >= 0 and ready_cross >= 0 else float("nan"), "key_boundary_error_keyframes": float(abs(key_phase_cross - key_ready_cross)) if key_phase_cross >= 0 and key_ready_cross >= 0 else float("nan"), "auroc_vret_ypre_three": _safe_auc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)), "auprc_vret_ypre_three": _safe_auprc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)), "auroc_vret_ypre_full": _safe_auc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)), "auprc_vret_ypre_full": _safe_auprc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)), "auroc_ppre_ypre": _safe_auc(y_pre_arr, p_pre_arr), "auprc_ppre_ypre": _safe_auprc(y_pre_arr, p_pre_arr), "auroc_pext_yext": _safe_auc(y_ext_arr, p_ext_arr), "auprc_pext_yext": _safe_auprc(y_ext_arr, p_ext_arr), "auroc_phase_yready": _safe_auc(y_ready_arr, frame_df["phase_switch"].to_numpy(dtype=float)), "auprc_phase_yready": _safe_auprc(y_ready_arr, frame_df["phase_switch"].to_numpy(dtype=float)), "f1_phase_yready": float(f1_score(y_ready_arr, phase_arr)) if np.any(y_ready_arr) and np.any(phase_arr) else float("nan"), "baseline_auroc_door_yext": _safe_auc(y_ext_arr, door_angle_arr), "baseline_auprc_door_yext": _safe_auprc(y_ext_arr, door_angle_arr), "baseline_auroc_time_yext": _safe_auc(y_ext_arr, time_arr), "baseline_auprc_time_yext": _safe_auprc(y_ext_arr, time_arr), "baseline_auroc_whole_vis_yext": _safe_auc(y_ext_arr, whole_vis), "baseline_auprc_whole_vis_yext": _safe_auprc(y_ext_arr, whole_vis), **interventions, } return EpisodeArtifacts( episode_name=episode_dir.name, dense=frame_df, keyframes=key_df, metrics=metrics, template_frames=template_frames, ) finally: env.shutdown() def _aggregate_summary(episode_metrics: List[Dict[str, object]]) -> Dict[str, object]: frame = pd.DataFrame(episode_metrics) numeric = frame.select_dtypes(include=[np.number]) summary = { "num_episodes": int(len(frame)), "mean_metrics": numeric.mean(numeric_only=True).to_dict(), "median_metrics": numeric.median(numeric_only=True).to_dict(), "single_switch_rate": float((frame["phase_switch_rises"] == 1).mean()) if len(frame) else float("nan"), "reversion_rate": float((frame["phase_switch_falls"] > 0).mean()) if len(frame) else float("nan"), "ordering_ok_rate": float(frame["ordering_ok"].mean()) if len(frame) else float("nan"), } return summary def run_study( dataset_root: str, result_dir: str, max_episodes: Optional[int] = None, checkpoint_stride: int = 16, max_frames: Optional[int] = None, episode_offset: int = 0, template_episode_index: int = 0, independent_replay: bool = False, ) -> Dict[str, object]: dataset_path = Path(dataset_root) result_path = Path(result_dir) result_path.mkdir(parents=True, exist_ok=True) all_episode_dirs = _episode_dirs(dataset_path) if not all_episode_dirs: raise RuntimeError(f"no episodes available under {dataset_root}") if not (0 <= template_episode_index < len(all_episode_dirs)): raise ValueError( f"template_episode_index {template_episode_index} outside available range 0..{len(all_episode_dirs) - 1}" ) episode_dirs = all_episode_dirs[episode_offset:] if max_episodes is not None: episode_dirs = episode_dirs[:max_episodes] if not episode_dirs: raise RuntimeError( f"no episodes selected under {dataset_root} with offset={episode_offset} max_episodes={max_episodes}" ) template_episode_dir = all_episode_dirs[template_episode_index] templates, template_frames = _derive_templates(dataset_path, template_episode_dir) with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle: json.dump( { "templates": templates.to_json(), "template_episode": template_episode_dir.name, "template_frames": template_frames, "episode_offset": episode_offset, }, handle, indent=2, ) episode_metrics: List[Dict[str, object]] = [] for episode_dir in episode_dirs: artifacts = _analyze_episode( dataset_path, episode_dir, templates, template_frames, checkpoint_stride=checkpoint_stride, max_frames=max_frames, independent_replay=independent_replay, ) artifacts.dense.to_csv(result_path.joinpath(f"{episode_dir.name}.dense.csv"), index=False) artifacts.keyframes.to_csv( result_path.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False ) with result_path.joinpath(f"{episode_dir.name}.metrics.json").open( "w", encoding="utf-8" ) as handle: json.dump(artifacts.metrics, handle, indent=2) episode_metrics.append(artifacts.metrics) summary = _aggregate_summary(episode_metrics) with result_path.joinpath("summary.json").open("w", encoding="utf-8") as handle: json.dump(summary, handle, indent=2) return summary def main(argv: Optional[Sequence[str]] = None) -> int: parser = argparse.ArgumentParser() parser.add_argument( "--dataset-root", default="/workspace/data/bimanual_take_tray_out_of_oven_train_128", ) parser.add_argument( "--result-dir", default="/workspace/reveal_retrieve_label_study/results/oven_first_pass", ) parser.add_argument("--max-episodes", type=int, default=1) parser.add_argument("--checkpoint-stride", type=int, default=16) parser.add_argument("--max-frames", type=int) parser.add_argument("--episode-offset", type=int, default=0) parser.add_argument("--template-episode-index", type=int, default=0) parser.add_argument("--independent-replay", action="store_true") args = parser.parse_args(argv) summary = run_study( dataset_root=args.dataset_root, result_dir=args.result_dir, max_episodes=args.max_episodes, checkpoint_stride=args.checkpoint_stride, max_frames=args.max_frames, episode_offset=args.episode_offset, template_episode_index=args.template_episode_index, independent_replay=args.independent_replay, ) print(json.dumps(summary, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())