| import argparse |
| import json |
| import math |
| import os |
| import pickle |
| from dataclasses import dataclass, asdict |
| from pathlib import Path |
| from typing import Dict, Iterable, List, Optional, Sequence, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| from natsort import natsorted |
| from PIL import Image |
| from pyrep.const import ConfigurationPathAlgorithms as Algos |
| from pyrep.errors import ConfigurationPathError |
| from pyrep.objects.joint import Joint |
| from pyrep.objects.shape import Shape |
| from rlbench.action_modes.action_mode import BimanualJointPositionActionMode |
| from rlbench.action_modes.gripper_action_modes import BimanualDiscrete |
| from rlbench.backend.const import DEPTH_SCALE |
| from rlbench.backend.utils import image_to_float_array, rgb_handles_to_mask |
| from rlbench.bimanual_tasks.bimanual_take_tray_out_of_oven import ( |
| BimanualTakeTrayOutOfOven, |
| ) |
| from rlbench.demo import Demo |
| from rlbench.environment import Environment |
| from rlbench.observation_config import CameraConfig, ObservationConfig |
| from sklearn.metrics import ( |
| average_precision_score, |
| f1_score, |
| roc_auc_score, |
| ) |
|
|
|
|
| FULL_CAMERA_SET = [ |
| "front", |
| "overhead", |
| "wrist_right", |
| "wrist_left", |
| "over_shoulder_left", |
| "over_shoulder_right", |
| ] |
| THREE_VIEW_SET = ["front", "wrist_right", "wrist_left"] |
| DISPLAY = ":99" |
| DEMO_DT = 0.05 |
| DEFAULT_IMAGE_SIZE = (128, 128) |
| DEFAULT_PATH_SCALE = 0.75 |
| DEFAULT_VISIBILITY_TAU = 0.35 |
| DEFAULT_PEXT_TAU = 0.45 |
| DEFAULT_DOOR_SPEED_TAU = 0.08 |
| DEFAULT_PLAN_TRIALS = 48 |
| DEFAULT_PLAN_MAX_CONFIGS = 4 |
| DEFAULT_PLAN_MAX_TIME_MS = 10 |
| DEFAULT_PLAN_TRIALS_PER_GOAL = 4 |
|
|
|
|
| @dataclass |
| class SimulatorSnapshot: |
| task_state: Tuple[bytes, int] |
| right_arm_tree: bytes |
| right_gripper_tree: bytes |
| left_arm_tree: bytes |
| left_gripper_tree: bytes |
| right_grasped: Tuple[str, ...] |
| left_grasped: Tuple[str, ...] |
|
|
|
|
| @dataclass |
| class ReplayState: |
| frame_index: int |
| tray_pose: np.ndarray |
| door_angle: float |
| right_gripper_pose: np.ndarray |
| left_gripper_pose: np.ndarray |
| right_gripper_open: float |
| left_gripper_open: float |
| snapshot: Optional[SimulatorSnapshot] = None |
|
|
|
|
| @dataclass |
| class MotionTemplates: |
| pregrasp_rel_pose: np.ndarray |
| grasp_rel_pose: np.ndarray |
| retreat_rel_poses: List[np.ndarray] |
| grasp_local_center: np.ndarray |
| grasp_region_extents: np.ndarray |
| hold_open_angle: float |
| open_more_delta: float |
| reference_tray_height: float |
|
|
| def to_json(self) -> Dict[str, object]: |
| return { |
| "pregrasp_rel_pose": self.pregrasp_rel_pose.tolist(), |
| "grasp_rel_pose": self.grasp_rel_pose.tolist(), |
| "retreat_rel_poses": [pose.tolist() for pose in self.retreat_rel_poses], |
| "grasp_local_center": self.grasp_local_center.tolist(), |
| "grasp_region_extents": self.grasp_region_extents.tolist(), |
| "hold_open_angle": float(self.hold_open_angle), |
| "open_more_delta": float(self.open_more_delta), |
| "reference_tray_height": float(self.reference_tray_height), |
| } |
|
|
|
|
| @dataclass |
| class EpisodeArtifacts: |
| episode_name: str |
| dense: pd.DataFrame |
| keyframes: pd.DataFrame |
| metrics: Dict[str, object] |
| template_frames: Dict[str, int] |
|
|
|
|
| def _configure_runtime() -> None: |
| os.environ["DISPLAY"] = DISPLAY |
| os.environ["COPPELIASIM_ROOT"] = "/workspace/coppelia_sim" |
| ld_library_path = os.environ.get("LD_LIBRARY_PATH", "") |
| if "/workspace/coppelia_sim" not in ld_library_path: |
| os.environ["LD_LIBRARY_PATH"] = ( |
| f"{ld_library_path}:/workspace/coppelia_sim" |
| if ld_library_path |
| else "/workspace/coppelia_sim" |
| ) |
| os.environ["QT_QPA_PLATFORM_PLUGIN_PATH"] = "/workspace/coppelia_sim" |
| os.environ.setdefault("XDG_RUNTIME_DIR", "/tmp/runtime-root") |
|
|
|
|
| def _minimal_camera_config() -> Dict[str, CameraConfig]: |
| return { |
| "front": CameraConfig( |
| rgb=False, |
| depth=False, |
| point_cloud=False, |
| mask=False, |
| image_size=DEFAULT_IMAGE_SIZE, |
| ) |
| } |
|
|
|
|
| def _make_observation_config() -> ObservationConfig: |
| return ObservationConfig( |
| camera_configs=_minimal_camera_config(), |
| joint_velocities=True, |
| joint_positions=True, |
| joint_forces=False, |
| gripper_open=True, |
| gripper_pose=True, |
| gripper_matrix=False, |
| gripper_joint_positions=True, |
| gripper_touch_forces=False, |
| task_low_dim_state=False, |
| robot_name="bimanual", |
| ) |
|
|
|
|
| def _launch_replay_env() -> Environment: |
| _configure_runtime() |
| env = Environment( |
| action_mode=BimanualJointPositionActionMode(), |
| obs_config=_make_observation_config(), |
| headless=True, |
| robot_setup="dual_panda", |
| ) |
| env.launch() |
| return env |
|
|
|
|
| def _load_demo(episode_dir: Path) -> Demo: |
| with episode_dir.joinpath("low_dim_obs.pkl").open("rb") as handle: |
| return pickle.load(handle) |
|
|
|
|
| def _load_descriptions(episode_dir: Path) -> List[str]: |
| with episode_dir.joinpath("variation_descriptions.pkl").open("rb") as handle: |
| return pickle.load(handle) |
|
|
|
|
| def _episode_dirs(dataset_root: Path) -> List[Path]: |
| episodes_dir = dataset_root.joinpath("all_variations", "episodes") |
| return [ |
| episodes_dir.joinpath(name) |
| for name in natsorted(os.listdir(episodes_dir)) |
| if episodes_dir.joinpath(name, "low_dim_obs.pkl").exists() |
| ] |
|
|
|
|
| def _camera_file(episode_dir: Path, camera_name: str, kind: str, frame_index: int) -> Path: |
| if kind == "rgb": |
| return episode_dir.joinpath(f"{camera_name}_rgb", f"rgb_{frame_index:04d}.png") |
| if kind == "depth": |
| return episode_dir.joinpath( |
| f"{camera_name}_depth", f"depth_{frame_index:04d}.png" |
| ) |
| if kind == "mask": |
| return episode_dir.joinpath(f"{camera_name}_mask", f"mask_{frame_index:04d}.png") |
| raise ValueError(f"unknown kind: {kind}") |
|
|
|
|
| def _load_depth_meters(episode_dir: Path, demo: Demo, frame_index: int, camera_name: str) -> np.ndarray: |
| image = Image.open(_camera_file(episode_dir, camera_name, "depth", frame_index)) |
| depth = image_to_float_array(image, DEPTH_SCALE) |
| near = demo[frame_index].misc[f"{camera_name}_camera_near"] |
| far = demo[frame_index].misc[f"{camera_name}_camera_far"] |
| return near + depth * (far - near) |
|
|
|
|
| def _load_mask(episode_dir: Path, frame_index: int, camera_name: str) -> np.ndarray: |
| image = np.array(Image.open(_camera_file(episode_dir, camera_name, "mask", frame_index))) |
| return rgb_handles_to_mask(image) |
|
|
|
|
| def _capture_snapshot(task) -> SimulatorSnapshot: |
| robot = task._scene.robot |
| return SimulatorSnapshot( |
| task_state=task._task.get_state(), |
| right_arm_tree=robot.right_arm.get_configuration_tree(), |
| right_gripper_tree=robot.right_gripper.get_configuration_tree(), |
| left_arm_tree=robot.left_arm.get_configuration_tree(), |
| left_gripper_tree=robot.left_gripper.get_configuration_tree(), |
| right_grasped=tuple(obj.get_name() for obj in robot.right_gripper.get_grasped_objects()), |
| left_grasped=tuple(obj.get_name() for obj in robot.left_gripper.get_grasped_objects()), |
| ) |
|
|
|
|
| def _restore_snapshot(task, snapshot: SimulatorSnapshot) -> None: |
| robot = task._scene.robot |
| robot.release_gripper() |
| task._pyrep.set_configuration_tree(snapshot.right_arm_tree) |
| task._pyrep.set_configuration_tree(snapshot.right_gripper_tree) |
| task._pyrep.set_configuration_tree(snapshot.left_arm_tree) |
| task._pyrep.set_configuration_tree(snapshot.left_gripper_tree) |
| task._pyrep.set_configuration_tree(snapshot.task_state[0]) |
| for name in snapshot.right_grasped: |
| robot.right_gripper.grasp(Shape(name)) |
| for name in snapshot.left_grasped: |
| robot.left_gripper.grasp(Shape(name)) |
| task._pyrep.step() |
|
|
|
|
| def _build_joint_action(target_obs) -> np.ndarray: |
| def _joint_vector(value, fallback) -> np.ndarray: |
| array = np.asarray(fallback if value is None else value, dtype=np.float64) |
| if array.ndim != 1 or array.shape[0] != 7: |
| array = np.asarray(fallback, dtype=np.float64) |
| if array.ndim != 1 or array.shape[0] != 7: |
| raise ValueError(f"invalid joint vector shape: {array.shape}") |
| return array |
|
|
| right = _joint_vector( |
| target_obs.misc.get("right_executed_demo_joint_position_action"), |
| target_obs.right.joint_positions, |
| ) |
| left = _joint_vector( |
| target_obs.misc.get("left_executed_demo_joint_position_action"), |
| target_obs.left.joint_positions, |
| ) |
| return np.concatenate( |
| [ |
| right, |
| np.array([target_obs.right.gripper_open], dtype=np.float64), |
| left, |
| np.array([target_obs.left.gripper_open], dtype=np.float64), |
| ] |
| ) |
|
|
|
|
| class ReplayCache: |
| def __init__(self, task, demo: Demo, checkpoint_stride: int = 16): |
| self.task = task |
| self.demo = demo |
| self.checkpoint_stride = checkpoint_stride |
| self.current_index = 0 |
| self.current_obs = None |
| self.checkpoints: Dict[int, SimulatorSnapshot] = {} |
| self.discrete_gripper = BimanualDiscrete() |
|
|
| def reset(self) -> None: |
| _, self.current_obs = self.task.reset_to_demo(self.demo) |
| self.current_index = 0 |
| self.checkpoints = {0: _capture_snapshot(self.task)} |
|
|
| def step_to(self, target_index: int): |
| if target_index < self.current_index: |
| checkpoint_index = max(i for i in self.checkpoints if i <= target_index) |
| _restore_snapshot(self.task, self.checkpoints[checkpoint_index]) |
| self.current_index = checkpoint_index |
| self.current_obs = self._observation_from_scene() |
| while self.current_index < target_index: |
| next_index = self.current_index + 1 |
| self.task._action_mode.action( |
| self.task._scene, _build_joint_action(self.demo[next_index]) |
| ) |
| self._apply_gripper_replay(self.demo[next_index]) |
| self.current_obs = self._observation_from_scene() |
| self.current_index = next_index |
| if self.current_index % self.checkpoint_stride == 0: |
| self.checkpoints[self.current_index] = _capture_snapshot(self.task) |
| return self.current_obs |
|
|
| def current_state(self) -> ReplayState: |
| return ReplayState( |
| frame_index=self.current_index, |
| tray_pose=Shape("tray").get_pose(), |
| door_angle=Joint("oven_door_joint").get_joint_position(), |
| right_gripper_pose=self.current_obs.right.gripper_pose.copy(), |
| left_gripper_pose=self.current_obs.left.gripper_pose.copy(), |
| right_gripper_open=float(self.current_obs.right.gripper_open), |
| left_gripper_open=float(self.current_obs.left.gripper_open), |
| snapshot=None, |
| ) |
|
|
| def snapshot(self) -> SimulatorSnapshot: |
| return _capture_snapshot(self.task) |
|
|
| def restore(self, snapshot: SimulatorSnapshot) -> None: |
| _restore_snapshot(self.task, snapshot) |
| self.current_obs = self._observation_from_scene() |
|
|
| def restore_to_index(self, snapshot: SimulatorSnapshot, frame_index: int) -> None: |
| self.restore(snapshot) |
| self.current_index = frame_index |
|
|
| def _observation_from_scene(self): |
| return self.task._scene.get_observation() |
|
|
| def _apply_gripper_replay(self, target_obs) -> None: |
| desired = np.array( |
| [target_obs.right.gripper_open, target_obs.left.gripper_open], |
| dtype=np.float64, |
| ) |
| current = np.array( |
| [ |
| float(all(x > 0.9 for x in self.task._scene.robot.right_gripper.get_open_amount())), |
| float(all(x > 0.9 for x in self.task._scene.robot.left_gripper.get_open_amount())), |
| ], |
| dtype=np.float64, |
| ) |
| if not np.allclose(current, desired): |
| self.discrete_gripper.action(self.task._scene, desired) |
| self.current_obs = self._observation_from_scene() |
|
|
|
|
| def _quat_to_matrix(quat: Sequence[float]) -> np.ndarray: |
| x, y, z, w = quat |
| xx, yy, zz = x * x, y * y, z * z |
| xy, xz, yz = x * y, x * z, y * z |
| wx, wy, wz = w * x, w * y, w * z |
| return np.array( |
| [ |
| [1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)], |
| [2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)], |
| [2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)], |
| ], |
| dtype=np.float64, |
| ) |
|
|
|
|
| def _matrix_to_quat(matrix: np.ndarray) -> np.ndarray: |
| trace = np.trace(matrix) |
| if trace > 0.0: |
| s = math.sqrt(trace + 1.0) * 2.0 |
| w = 0.25 * s |
| x = (matrix[2, 1] - matrix[1, 2]) / s |
| y = (matrix[0, 2] - matrix[2, 0]) / s |
| z = (matrix[1, 0] - matrix[0, 1]) / s |
| elif matrix[0, 0] > matrix[1, 1] and matrix[0, 0] > matrix[2, 2]: |
| s = math.sqrt(1.0 + matrix[0, 0] - matrix[1, 1] - matrix[2, 2]) * 2.0 |
| w = (matrix[2, 1] - matrix[1, 2]) / s |
| x = 0.25 * s |
| y = (matrix[0, 1] + matrix[1, 0]) / s |
| z = (matrix[0, 2] + matrix[2, 0]) / s |
| elif matrix[1, 1] > matrix[2, 2]: |
| s = math.sqrt(1.0 + matrix[1, 1] - matrix[0, 0] - matrix[2, 2]) * 2.0 |
| w = (matrix[0, 2] - matrix[2, 0]) / s |
| x = (matrix[0, 1] + matrix[1, 0]) / s |
| y = 0.25 * s |
| z = (matrix[1, 2] + matrix[2, 1]) / s |
| else: |
| s = math.sqrt(1.0 + matrix[2, 2] - matrix[0, 0] - matrix[1, 1]) * 2.0 |
| w = (matrix[1, 0] - matrix[0, 1]) / s |
| x = (matrix[0, 2] + matrix[2, 0]) / s |
| y = (matrix[1, 2] + matrix[2, 1]) / s |
| z = 0.25 * s |
| quat = np.array([x, y, z, w], dtype=np.float64) |
| return quat / np.linalg.norm(quat) |
|
|
|
|
| def _pose_to_matrix(pose: Sequence[float]) -> np.ndarray: |
| matrix = np.eye(4, dtype=np.float64) |
| matrix[:3, :3] = _quat_to_matrix(pose[3:]) |
| matrix[:3, 3] = pose[:3] |
| return matrix |
|
|
|
|
| def _matrix_to_pose(matrix: np.ndarray) -> np.ndarray: |
| return np.concatenate([matrix[:3, 3], _matrix_to_quat(matrix[:3, :3])], axis=0) |
|
|
|
|
| def _relative_pose(reference_pose: Sequence[float], target_pose: Sequence[float]) -> np.ndarray: |
| reference = _pose_to_matrix(reference_pose) |
| target = _pose_to_matrix(target_pose) |
| rel = np.linalg.inv(reference) @ target |
| return _matrix_to_pose(rel) |
|
|
|
|
| def _apply_relative_pose(reference_pose: Sequence[float], relative_pose: Sequence[float]) -> np.ndarray: |
| reference = _pose_to_matrix(reference_pose) |
| relative = _pose_to_matrix(relative_pose) |
| world = reference @ relative |
| return _matrix_to_pose(world) |
|
|
|
|
| def _world_to_local(reference_pose: Sequence[float], point_world: Sequence[float]) -> np.ndarray: |
| reference = _pose_to_matrix(reference_pose) |
| point = np.concatenate([np.asarray(point_world, dtype=np.float64), [1.0]]) |
| return (np.linalg.inv(reference) @ point)[:3] |
|
|
|
|
| def _local_to_world(reference_pose: Sequence[float], point_local: Sequence[float]) -> np.ndarray: |
| reference = _pose_to_matrix(reference_pose) |
| point = np.concatenate([np.asarray(point_local, dtype=np.float64), [1.0]]) |
| return (reference @ point)[:3] |
|
|
|
|
| def _first_transition(demo: Demo, side: str, open_to_closed: bool) -> int: |
| values = [getattr(demo[i], side).gripper_open for i in range(len(demo))] |
| for i in range(1, len(values)): |
| if open_to_closed and values[i - 1] > 0.5 and values[i] < 0.5: |
| return i |
| if not open_to_closed and values[i - 1] < 0.5 and values[i] > 0.5: |
| return i |
| raise RuntimeError(f"no gripper transition found for {side}") |
|
|
|
|
| def _derive_templates(dataset_root: Path, template_episode_dir: Path) -> Tuple[MotionTemplates, Dict[str, int]]: |
| env = _launch_replay_env() |
| try: |
| demo = _load_demo(template_episode_dir) |
| task = env.get_task(BimanualTakeTrayOutOfOven) |
| cache = ReplayCache(task, demo, checkpoint_stride=8) |
| cache.reset() |
| base_pose = task._task.get_base().get_pose() |
|
|
| left_close = _first_transition(demo, "left", open_to_closed=True) |
| left_open = _first_transition(demo, "left", open_to_closed=False) |
| pregrasp_index = max(0, left_close - 5) |
| right_close = _first_transition(demo, "right", open_to_closed=True) |
| right_open = _first_transition(demo, "right", open_to_closed=False) |
|
|
| interesting = sorted( |
| { |
| pregrasp_index, |
| left_close, |
| min(len(demo) - 1, left_close + 10), |
| min(len(demo) - 1, left_close + 20), |
| min(len(demo) - 1, left_close + 30), |
| max(left_close + 1, left_open - 20), |
| max(left_close + 1, left_open - 10), |
| max(left_close + 1, left_open - 5), |
| right_close, |
| right_open, |
| left_open, |
| } |
| ) |
| states: Dict[int, ReplayState] = {} |
| for frame_index in interesting: |
| cache.step_to(frame_index) |
| states[frame_index] = cache.current_state() |
|
|
| pregrasp_rel_pose = _relative_pose( |
| states[pregrasp_index].tray_pose, states[pregrasp_index].left_gripper_pose |
| ) |
| grasp_rel_pose = _relative_pose( |
| states[left_close].tray_pose, states[left_close].left_gripper_pose |
| ) |
| retreat_rel_poses = [ |
| _relative_pose(base_pose, states[index].left_gripper_pose) |
| for index in interesting |
| if index > left_close |
| ] |
| grasp_local_center = _world_to_local( |
| states[left_close].tray_pose, states[left_close].left_gripper_pose[:3] |
| ) |
| templates = MotionTemplates( |
| pregrasp_rel_pose=pregrasp_rel_pose, |
| grasp_rel_pose=grasp_rel_pose, |
| retreat_rel_poses=retreat_rel_poses, |
| grasp_local_center=grasp_local_center, |
| grasp_region_extents=np.array([0.03, 0.015, 0.004], dtype=np.float64), |
| hold_open_angle=float(states[right_open].door_angle), |
| open_more_delta=max( |
| 0.12, |
| float(states[right_open].door_angle - states[right_close].door_angle) * 0.25, |
| ), |
| reference_tray_height=float(states[left_close].tray_pose[2]), |
| ) |
| template_frames = { |
| "pregrasp": pregrasp_index, |
| "grasp": left_close, |
| "right_close": right_close, |
| "right_open": right_open, |
| } |
| return templates, template_frames |
| finally: |
| env.shutdown() |
|
|
|
|
| def _camera_projection(extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| camera_pos = extrinsics[:3, 3:4] |
| rotation = extrinsics[:3, :3] |
| world_to_camera = np.concatenate([rotation.T, -(rotation.T @ camera_pos)], axis=1) |
| projection = intrinsics @ world_to_camera |
| return projection, world_to_camera |
|
|
|
|
| def _project_points(points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| projection, world_to_camera = _camera_projection(extrinsics, intrinsics) |
| homogeneous = np.concatenate([points_world, np.ones((len(points_world), 1))], axis=1) |
| camera_xyz = (world_to_camera @ homogeneous.T).T |
| image_xyz = (projection @ homogeneous.T).T |
| uv = image_xyz[:, :2] / image_xyz[:, 2:3] |
| return uv, camera_xyz |
|
|
|
|
| def _sample_grasp_points(templates: MotionTemplates, tray_pose: np.ndarray) -> np.ndarray: |
| center = templates.grasp_local_center |
| extents = templates.grasp_region_extents |
| xs = np.linspace(center[0] - extents[0], center[0] + extents[0], 9) |
| ys = np.linspace(center[1] - extents[1], center[1] + extents[1], 5) |
| zs = np.linspace(center[2] - extents[2], center[2] + extents[2], 3) |
| points_local = np.array([[x, y, z] for x in xs for y in ys for z in zs], dtype=np.float64) |
| return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64) |
|
|
|
|
| def _sample_full_tray_points(tray_pose: np.ndarray) -> np.ndarray: |
| tray = Shape("tray") |
| bbox = np.asarray(tray.get_bounding_box(), dtype=np.float64) |
| xs = np.linspace(bbox[0], bbox[1], 10) |
| ys = np.linspace(bbox[2], bbox[3], 12) |
| zs = np.linspace(bbox[4], bbox[5], 3) |
| points_local = np.array([[x, y, z] for x in xs for y in ys for z in zs], dtype=np.float64) |
| return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64) |
|
|
|
|
| def _visibility_ratio( |
| points_world: np.ndarray, |
| depth_m: np.ndarray, |
| mask: np.ndarray, |
| tray_handle: int, |
| extrinsics: np.ndarray, |
| intrinsics: np.ndarray, |
| depth_tol: float = 0.02, |
| ) -> float: |
| uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics) |
| height, width = depth_m.shape |
| visible = 0 |
| total = 0 |
| for (u, v), (_, _, camera_depth) in zip(uv, camera_xyz): |
| if camera_depth <= 0: |
| continue |
| if not (0 <= u < width and 0 <= v < height): |
| continue |
| total += 1 |
| px = int(round(u)) |
| py = int(round(v)) |
| px = min(max(px, 0), width - 1) |
| py = min(max(py, 0), height - 1) |
| observed = float(depth_m[py, px]) |
| if abs(observed - camera_depth) <= depth_tol: |
| visible += 1 |
| return float(visible / total) if total else 0.0 |
|
|
|
|
| def _union_visibility(values: Iterable[float]) -> float: |
| product = 1.0 |
| for value in values: |
| product *= 1.0 - float(value) |
| return 1.0 - product |
|
|
|
|
| def _keypoint_discovery(demo: Demo, stopping_delta: float = 0.1) -> List[int]: |
| keypoints: List[int] = [] |
| right_prev = demo[0].right.gripper_open |
| left_prev = demo[0].left.gripper_open |
| stopped_buffer = 0 |
| for i, obs in enumerate(demo._observations): |
| if i < 2 or i >= len(demo) - 1: |
| right_stopped = left_stopped = False |
| else: |
| right_stopped = ( |
| np.allclose(obs.right.joint_velocities, 0, atol=stopping_delta) |
| and obs.right.gripper_open == demo[i + 1].right.gripper_open |
| and obs.right.gripper_open == demo[i - 1].right.gripper_open |
| and demo[i - 2].right.gripper_open == demo[i - 1].right.gripper_open |
| ) |
| left_stopped = ( |
| np.allclose(obs.left.joint_velocities, 0, atol=stopping_delta) |
| and obs.left.gripper_open == demo[i + 1].left.gripper_open |
| and obs.left.gripper_open == demo[i - 1].left.gripper_open |
| and demo[i - 2].left.gripper_open == demo[i - 1].left.gripper_open |
| ) |
| stopped = stopped_buffer <= 0 and right_stopped and left_stopped |
| stopped_buffer = 4 if stopped else stopped_buffer - 1 |
| last = i == len(demo) - 1 |
| state_changed = ( |
| obs.right.gripper_open != right_prev or obs.left.gripper_open != left_prev |
| ) |
| if i != 0 and (state_changed or last or stopped): |
| keypoints.append(i) |
| right_prev = obs.right.gripper_open |
| left_prev = obs.left.gripper_open |
| if len(keypoints) > 1 and (keypoints[-1] - 1) == keypoints[-2]: |
| keypoints.pop(-2) |
| return keypoints |
|
|
|
|
| def _plan_path(scene, arm_name: str, pose: np.ndarray, ignore_collisions: bool = False): |
| arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm |
| try: |
| return arm.get_path( |
| pose[:3], |
| quaternion=pose[3:], |
| ignore_collisions=ignore_collisions, |
| trials=DEFAULT_PLAN_TRIALS, |
| max_configs=DEFAULT_PLAN_MAX_CONFIGS, |
| max_time_ms=DEFAULT_PLAN_MAX_TIME_MS, |
| trials_per_goal=DEFAULT_PLAN_TRIALS_PER_GOAL, |
| algorithm=Algos.RRTConnect, |
| ) |
| except Exception: |
| return None |
|
|
|
|
| def _path_length(path) -> float: |
| if path is None: |
| return math.inf |
| try: |
| return float(path._get_path_point_lengths()[-1]) |
| except Exception: |
| return math.inf |
|
|
|
|
| def _pregrasp_candidates(tray_pose: np.ndarray, templates: MotionTemplates) -> List[np.ndarray]: |
| candidates = [] |
| base = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose) |
| candidates.append(base) |
| for dx in (-0.02, 0.02): |
| perturbed = base.copy() |
| perturbed[0] += dx |
| candidates.append(perturbed) |
| return candidates |
|
|
|
|
| def _extract_sequence_poses( |
| tray_pose: np.ndarray, task_base_pose: np.ndarray, templates: MotionTemplates |
| ) -> List[np.ndarray]: |
| poses = [ |
| _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose), |
| _apply_relative_pose(tray_pose, templates.grasp_rel_pose), |
| ] |
| poses.extend( |
| _apply_relative_pose(task_base_pose, pose) for pose in templates.retreat_rel_poses[:3] |
| ) |
| return poses |
|
|
|
|
| def _extract_height_threshold(templates: MotionTemplates) -> float: |
| return templates.reference_tray_height + 0.06 |
|
|
|
|
| def _extraction_progress_score(current_height: float, templates: MotionTemplates) -> float: |
| threshold = _extract_height_threshold(templates) |
| margin = max(0.0, float(current_height) - threshold) |
| |
| return min(1.0, 0.8 + margin / 0.12) |
|
|
|
|
| def _pregrasp_score_and_success(task, templates: MotionTemplates) -> Tuple[float, bool]: |
| tray = Shape("tray") |
| if any( |
| grasped.get_name() == tray.get_name() |
| for grasped in task._scene.robot.left_gripper.get_grasped_objects() |
| ): |
| return 1.0, True |
| tray_pose = Shape("tray").get_pose() |
| best = 0.0 |
| success = False |
| for pose in _pregrasp_candidates(tray_pose, templates): |
| path = _plan_path(task._scene, "left", pose, ignore_collisions=False) |
| length = _path_length(path) |
| if np.isfinite(length): |
| success = True |
| best = max(best, math.exp(-length / DEFAULT_PATH_SCALE)) |
| return best, success |
|
|
|
|
| def _extract_score_and_success(task, templates: MotionTemplates) -> Tuple[float, bool]: |
| tray = Shape("tray") |
| robot = task._scene.robot |
| snapshot = _capture_snapshot(task) |
| try: |
| total_length = 0.0 |
| current_height = float(tray.get_position()[2]) |
| already_grasped = any( |
| grasped.get_name() == tray.get_name() |
| for grasped in robot.left_gripper.get_grasped_objects() |
| ) |
| if already_grasped and current_height >= _extract_height_threshold(templates): |
| return _extraction_progress_score(current_height, templates), True |
| poses = _extract_sequence_poses( |
| tray.get_pose(), task._task.get_base().get_pose(), templates |
| ) |
| approach_poses = [] if already_grasped else poses[:2] |
| retreat_poses = poses[2:] if not already_grasped else poses[2:] |
|
|
| for pose in approach_poses: |
| path = _plan_path(task._scene, "left", pose, ignore_collisions=False) |
| if path is None: |
| return 0.0, False |
| total_length += _path_length(path) |
| path.set_to_end(disable_dynamics=True) |
| task._pyrep.step() |
| if not already_grasped: |
| robot.left_gripper.grasp(tray) |
| for pose in retreat_poses: |
| path = _plan_path(task._scene, "left", pose, ignore_collisions=True) |
| if path is None: |
| return 0.0, False |
| total_length += _path_length(path) |
| path.set_to_end(disable_dynamics=True) |
| task._pyrep.step() |
| final_height = float(tray.get_position()[2]) |
| success = final_height >= _extract_height_threshold(templates) |
| score = max( |
| math.exp(-total_length / (DEFAULT_PATH_SCALE * 2.5)), |
| _extraction_progress_score(final_height, templates) if success else 0.0, |
| ) |
| return score, bool(success) |
| finally: |
| _restore_snapshot(task, snapshot) |
|
|
|
|
| def _wait_branch(task, steps: int = 5) -> None: |
| for _ in range(steps): |
| task._scene.step() |
|
|
|
|
| def _open_more_branch(task, templates: MotionTemplates) -> None: |
| joint = Joint("oven_door_joint") |
| current = joint.get_joint_position() |
| joint.set_joint_position(current - templates.open_more_delta, disable_dynamics=True) |
| for _ in range(3): |
| task._pyrep.step() |
|
|
|
|
| def _hold_open_branch(task, templates: MotionTemplates) -> None: |
| joint = Joint("oven_door_joint") |
| current = joint.get_joint_position() |
| joint.set_joint_position(min(current, templates.hold_open_angle), disable_dynamics=True) |
| for _ in range(3): |
| task._pyrep.step() |
|
|
|
|
| def _frame_metrics( |
| episode_dir: Path, |
| demo: Demo, |
| frame_state: ReplayState, |
| templates: MotionTemplates, |
| tray_handle: int, |
| ) -> Dict[str, float]: |
| grasp_points = _sample_grasp_points(templates, frame_state.tray_pose) |
| full_tray_points = _sample_full_tray_points(frame_state.tray_pose) |
| camera_values: Dict[str, Dict[str, float]] = {} |
| for camera_name in FULL_CAMERA_SET: |
| depth_m = _load_depth_meters(episode_dir, demo, frame_state.frame_index, camera_name) |
| mask = _load_mask(episode_dir, frame_state.frame_index, camera_name) |
| extrinsics = demo[frame_state.frame_index].misc[f"{camera_name}_camera_extrinsics"] |
| intrinsics = demo[frame_state.frame_index].misc[f"{camera_name}_camera_intrinsics"] |
| camera_values[camera_name] = { |
| "grasp_visibility": _visibility_ratio( |
| grasp_points, depth_m, mask, tray_handle, extrinsics, intrinsics |
| ), |
| "tray_visibility": _visibility_ratio( |
| full_tray_points, depth_m, mask, tray_handle, extrinsics, intrinsics |
| ), |
| } |
|
|
| values: Dict[str, float] = {} |
| for name, camera_set in {"three_view": THREE_VIEW_SET, "full_view": FULL_CAMERA_SET}.items(): |
| values[f"{name}_visibility"] = _union_visibility( |
| camera_values[camera_name]["grasp_visibility"] for camera_name in camera_set |
| ) |
| values[f"{name}_whole_tray_visibility"] = _union_visibility( |
| camera_values[camera_name]["tray_visibility"] for camera_name in camera_set |
| ) |
| return values |
|
|
|
|
| def _compute_frame_row_isolated( |
| episode_dir: Path, |
| demo: Demo, |
| templates: MotionTemplates, |
| checkpoint_stride: int, |
| frame_index: int, |
| ) -> Dict[str, float]: |
| env = _launch_replay_env() |
| try: |
| task = env.get_task(BimanualTakeTrayOutOfOven) |
| cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride) |
| cache.reset() |
| cache.step_to(frame_index) |
| state = cache.current_state() |
| visibility = _frame_metrics(episode_dir, demo, state, templates, Shape("tray").get_handle()) |
| p_pre, y_pre = _pregrasp_score_and_success(task, templates) |
| p_ext, y_ext = _extract_score_and_success(task, templates) |
| return { |
| "frame_index": frame_index, |
| "time_norm": frame_index / max(1, len(demo) - 1), |
| "door_angle": state.door_angle, |
| "right_gripper_open": state.right_gripper_open, |
| "left_gripper_open": state.left_gripper_open, |
| "p_pre": p_pre, |
| "p_ext": p_ext, |
| "y_pre": float(bool(y_pre)), |
| "y_ext": float(bool(y_ext)), |
| **visibility, |
| } |
| finally: |
| env.shutdown() |
|
|
|
|
| def _safe_auc(y_true: np.ndarray, y_score: np.ndarray) -> float: |
| if len(np.unique(y_true)) < 2: |
| return float("nan") |
| return float(roc_auc_score(y_true, y_score)) |
|
|
|
|
| def _safe_auprc(y_true: np.ndarray, y_score: np.ndarray) -> float: |
| if len(np.unique(y_true)) < 2: |
| return float("nan") |
| return float(average_precision_score(y_true, y_score)) |
|
|
|
|
| def _first_crossing(values: np.ndarray, threshold: float) -> int: |
| above = np.flatnonzero(values >= threshold) |
| return int(above[0]) if len(above) else -1 |
|
|
|
|
| def _transition_count(binary_values: np.ndarray) -> Tuple[int, int]: |
| diffs = np.diff(binary_values.astype(int)) |
| return int(np.sum(diffs == 1)), int(np.sum(diffs == -1)) |
|
|
|
|
| def _keyframe_subset(frame_df: pd.DataFrame, keyframes: Sequence[int]) -> pd.DataFrame: |
| key_df = frame_df.iloc[list(keyframes)].copy() |
| key_df["keyframe_ordinal"] = np.arange(len(key_df)) |
| return key_df |
|
|
|
|
| def _interventional_validity( |
| task, |
| cache: ReplayCache, |
| episode_dir: Path, |
| demo: Demo, |
| templates: MotionTemplates, |
| tray_handle: int, |
| frame_df: pd.DataFrame, |
| ) -> Dict[str, float]: |
| ready_indices = np.flatnonzero(frame_df["y_ready"].to_numpy(dtype=bool)) |
| ready_onset = int(ready_indices[0]) if len(ready_indices) else len(frame_df) // 2 |
| initial_snapshot = cache.snapshot() |
| sample_indices = sorted( |
| { |
| max(0, ready_onset - 10), |
| max(0, ready_onset - 5), |
| ready_onset, |
| min(len(frame_df) - 1, ready_onset + 20), |
| } |
| ) |
| stats = { |
| "pre_ready_open_more_increases_pext": 0, |
| "pre_ready_open_more_trials": 0, |
| "pre_ready_hold_open_increases_pext": 0, |
| "pre_ready_hold_open_trials": 0, |
| "pre_ready_extract_success": 0, |
| "pre_ready_extract_trials": 0, |
| "pre_ready_wait_extract_success": 0, |
| "pre_ready_wait_trials": 0, |
| "post_ready_extract_success": 0, |
| "post_ready_extract_trials": 0, |
| "post_ready_open_more_low_gain": 0, |
| "post_ready_open_more_trials": 0, |
| "post_ready_hold_open_low_gain": 0, |
| "post_ready_hold_open_trials": 0, |
| } |
| for frame_index in sample_indices: |
| cache.restore_to_index(initial_snapshot, 0) |
| cache.step_to(frame_index) |
| snapshot = cache.snapshot() |
| base_pext, base_extract_success = _extract_score_and_success(task, templates) |
| pre_ready = not bool(frame_df.iloc[frame_index]["y_ready"]) |
|
|
| _restore_snapshot(task, snapshot) |
| _open_more_branch(task, templates) |
| open_pext, _ = _extract_score_and_success(task, templates) |
|
|
| _restore_snapshot(task, snapshot) |
| _hold_open_branch(task, templates) |
| hold_pext, _ = _extract_score_and_success(task, templates) |
|
|
| _restore_snapshot(task, snapshot) |
| _wait_branch(task) |
| _, wait_extract_success = _extract_score_and_success(task, templates) |
| _restore_snapshot(task, snapshot) |
|
|
| if pre_ready: |
| stats["pre_ready_open_more_trials"] += 1 |
| stats["pre_ready_hold_open_trials"] += 1 |
| stats["pre_ready_extract_trials"] += 1 |
| stats["pre_ready_wait_trials"] += 1 |
| if open_pext > base_pext: |
| stats["pre_ready_open_more_increases_pext"] += 1 |
| if hold_pext > base_pext: |
| stats["pre_ready_hold_open_increases_pext"] += 1 |
| if base_extract_success: |
| stats["pre_ready_extract_success"] += 1 |
| if wait_extract_success: |
| stats["pre_ready_wait_extract_success"] += 1 |
| else: |
| stats["post_ready_extract_trials"] += 1 |
| stats["post_ready_open_more_trials"] += 1 |
| stats["post_ready_hold_open_trials"] += 1 |
| if base_extract_success: |
| stats["post_ready_extract_success"] += 1 |
| if (open_pext - base_pext) <= 0.05: |
| stats["post_ready_open_more_low_gain"] += 1 |
| if (hold_pext - base_pext) <= 0.05: |
| stats["post_ready_hold_open_low_gain"] += 1 |
| return { |
| key: float(value) for key, value in stats.items() |
| } |
|
|
|
|
| def _analyze_episode( |
| dataset_root: Path, |
| episode_dir: Path, |
| templates: MotionTemplates, |
| template_frames: Dict[str, int], |
| checkpoint_stride: int = 16, |
| max_frames: Optional[int] = None, |
| independent_replay: bool = False, |
| ) -> EpisodeArtifacts: |
| env = _launch_replay_env() |
| try: |
| demo = _load_demo(episode_dir) |
| descriptions = _load_descriptions(episode_dir) |
| task = env.get_task(BimanualTakeTrayOutOfOven) |
| cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride) |
| cache.reset() |
| tray_handle = Shape("tray").get_handle() |
| num_frames = len(demo) if max_frames is None else min(len(demo), max_frames) |
|
|
| rows: List[Dict[str, float]] = [] |
| door_angles: List[float] = [] |
| initial_snapshot = cache.checkpoints[0] if independent_replay else None |
| for frame_index in range(num_frames): |
| if independent_replay: |
| cache.restore_to_index(initial_snapshot, 0) |
| cache.step_to(frame_index) |
| frame_snapshot = cache.snapshot() if not independent_replay else None |
| state = cache.current_state() |
| door_angles.append(state.door_angle) |
| visibility = _frame_metrics(episode_dir, demo, state, templates, tray_handle) |
| p_pre, y_pre = _pregrasp_score_and_success(task, templates) |
| p_ext, y_ext = _extract_score_and_success(task, templates) |
| rows.append( |
| { |
| "frame_index": frame_index, |
| "time_norm": frame_index / max(1, num_frames - 1), |
| "door_angle": state.door_angle, |
| "right_gripper_open": state.right_gripper_open, |
| "left_gripper_open": state.left_gripper_open, |
| "p_pre": p_pre, |
| "p_ext": p_ext, |
| "y_pre": float(bool(y_pre)), |
| "y_ext": float(bool(y_ext)), |
| **visibility, |
| } |
| ) |
| if frame_snapshot is not None: |
| cache.restore(frame_snapshot) |
| if (frame_index + 1) % 25 == 0 or (frame_index + 1) == num_frames: |
| print( |
| f"[{episode_dir.name}] analyzed {frame_index + 1}/{num_frames} dense frames", |
| flush=True, |
| ) |
|
|
| frame_df = pd.DataFrame(rows) |
| door_speed = np.gradient(frame_df["door_angle"].to_numpy(), DEMO_DT) |
| frame_df["door_speed_abs"] = np.abs(door_speed) |
| y_ext_binary = frame_df["y_ext"].to_numpy(dtype=bool) |
| y_ready = np.zeros(len(frame_df), dtype=bool) |
| for i in range(len(frame_df)): |
| window = y_ext_binary[i : i + 3] |
| if len(window) == 3 and np.all(window) and frame_df.iloc[i]["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU: |
| y_ready[i] = True |
| if np.any(y_ready): |
| first_ready = int(np.flatnonzero(y_ready)[0]) |
| y_ready[first_ready:] = True |
| frame_df["y_ready"] = y_ready.astype(float) |
| phase_trigger = ( |
| (frame_df["full_view_visibility"] >= DEFAULT_VISIBILITY_TAU) |
| & (frame_df["p_ext"] >= DEFAULT_PEXT_TAU) |
| & (frame_df["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU) |
| ).to_numpy(dtype=bool) |
| phase_switch = np.zeros(len(frame_df), dtype=bool) |
| if np.any(phase_trigger): |
| first_phase = int(np.flatnonzero(phase_trigger)[0]) |
| phase_switch[first_phase:] = True |
| frame_df["phase_switch"] = phase_switch.astype(float) |
|
|
| keyframes = [index for index in _keypoint_discovery(demo) if index < num_frames] |
| key_df = _keyframe_subset(frame_df, keyframes) |
|
|
| y_pre_arr = frame_df["y_pre"].to_numpy(dtype=int) |
| y_ext_arr = frame_df["y_ext"].to_numpy(dtype=int) |
| y_ready_arr = frame_df["y_ready"].to_numpy(dtype=int) |
| p_pre_arr = frame_df["p_pre"].to_numpy(dtype=float) |
| p_ext_arr = frame_df["p_ext"].to_numpy(dtype=float) |
| phase_arr = frame_df["phase_switch"].to_numpy(dtype=int) |
| full_vis = frame_df["full_view_visibility"].to_numpy(dtype=float) |
| whole_vis = frame_df["full_view_whole_tray_visibility"].to_numpy(dtype=float) |
| door_angle_arr = frame_df["door_angle"].to_numpy(dtype=float) |
| time_arr = frame_df["time_norm"].to_numpy(dtype=float) |
|
|
| ppre_cross = _first_crossing(p_pre_arr, DEFAULT_PEXT_TAU) |
| pext_cross = _first_crossing(p_ext_arr, DEFAULT_PEXT_TAU) |
| phase_cross = _first_crossing(frame_df["phase_switch"].to_numpy(dtype=float), 0.5) |
| ready_cross = _first_crossing(y_ready_arr.astype(float), 0.5) |
| phase_rises, phase_falls = _transition_count(phase_arr) |
|
|
| key_phase_cross = _first_crossing(key_df["phase_switch"].to_numpy(dtype=float), 0.5) |
| key_ready_cross = _first_crossing(key_df["y_ready"].to_numpy(dtype=float), 0.5) |
|
|
| interventions = _interventional_validity( |
| task, cache, episode_dir, demo, templates, tray_handle, frame_df |
| ) |
| metrics = { |
| "episode_name": episode_dir.name, |
| "description": descriptions[0], |
| "num_dense_frames": int(num_frames), |
| "num_keyframes": int(len(key_df)), |
| "phase_switch_rises": int(phase_rises), |
| "phase_switch_falls": int(phase_falls), |
| "ppre_cross_frame": int(ppre_cross), |
| "pext_cross_frame": int(pext_cross), |
| "phase_cross_frame": int(phase_cross), |
| "ready_cross_frame": int(ready_cross), |
| "ordering_ok": bool(ppre_cross == -1 or pext_cross == -1 or ppre_cross <= pext_cross), |
| "dense_boundary_error_frames": float(abs(phase_cross - ready_cross)) |
| if phase_cross >= 0 and ready_cross >= 0 |
| else float("nan"), |
| "dense_boundary_error_fraction": float(abs(phase_cross - ready_cross) / len(frame_df)) |
| if phase_cross >= 0 and ready_cross >= 0 |
| else float("nan"), |
| "key_boundary_error_keyframes": float(abs(key_phase_cross - key_ready_cross)) |
| if key_phase_cross >= 0 and key_ready_cross >= 0 |
| else float("nan"), |
| "auroc_vret_ypre_three": _safe_auc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)), |
| "auprc_vret_ypre_three": _safe_auprc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)), |
| "auroc_vret_ypre_full": _safe_auc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)), |
| "auprc_vret_ypre_full": _safe_auprc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)), |
| "auroc_ppre_ypre": _safe_auc(y_pre_arr, p_pre_arr), |
| "auprc_ppre_ypre": _safe_auprc(y_pre_arr, p_pre_arr), |
| "auroc_pext_yext": _safe_auc(y_ext_arr, p_ext_arr), |
| "auprc_pext_yext": _safe_auprc(y_ext_arr, p_ext_arr), |
| "auroc_phase_yready": _safe_auc(y_ready_arr, frame_df["phase_switch"].to_numpy(dtype=float)), |
| "auprc_phase_yready": _safe_auprc(y_ready_arr, frame_df["phase_switch"].to_numpy(dtype=float)), |
| "f1_phase_yready": float(f1_score(y_ready_arr, phase_arr)) |
| if np.any(y_ready_arr) and np.any(phase_arr) |
| else float("nan"), |
| "baseline_auroc_door_yext": _safe_auc(y_ext_arr, door_angle_arr), |
| "baseline_auprc_door_yext": _safe_auprc(y_ext_arr, door_angle_arr), |
| "baseline_auroc_time_yext": _safe_auc(y_ext_arr, time_arr), |
| "baseline_auprc_time_yext": _safe_auprc(y_ext_arr, time_arr), |
| "baseline_auroc_whole_vis_yext": _safe_auc(y_ext_arr, whole_vis), |
| "baseline_auprc_whole_vis_yext": _safe_auprc(y_ext_arr, whole_vis), |
| **interventions, |
| } |
| return EpisodeArtifacts( |
| episode_name=episode_dir.name, |
| dense=frame_df, |
| keyframes=key_df, |
| metrics=metrics, |
| template_frames=template_frames, |
| ) |
| finally: |
| env.shutdown() |
|
|
|
|
| def _aggregate_summary(episode_metrics: List[Dict[str, object]]) -> Dict[str, object]: |
| frame = pd.DataFrame(episode_metrics) |
| numeric = frame.select_dtypes(include=[np.number]) |
| summary = { |
| "num_episodes": int(len(frame)), |
| "mean_metrics": numeric.mean(numeric_only=True).to_dict(), |
| "median_metrics": numeric.median(numeric_only=True).to_dict(), |
| "single_switch_rate": float((frame["phase_switch_rises"] == 1).mean()) |
| if len(frame) |
| else float("nan"), |
| "reversion_rate": float((frame["phase_switch_falls"] > 0).mean()) |
| if len(frame) |
| else float("nan"), |
| "ordering_ok_rate": float(frame["ordering_ok"].mean()) if len(frame) else float("nan"), |
| } |
| return summary |
|
|
|
|
| def run_study( |
| dataset_root: str, |
| result_dir: str, |
| max_episodes: Optional[int] = None, |
| checkpoint_stride: int = 16, |
| max_frames: Optional[int] = None, |
| episode_offset: int = 0, |
| template_episode_index: int = 0, |
| independent_replay: bool = False, |
| ) -> Dict[str, object]: |
| dataset_path = Path(dataset_root) |
| result_path = Path(result_dir) |
| result_path.mkdir(parents=True, exist_ok=True) |
|
|
| all_episode_dirs = _episode_dirs(dataset_path) |
| if not all_episode_dirs: |
| raise RuntimeError(f"no episodes available under {dataset_root}") |
| if not (0 <= template_episode_index < len(all_episode_dirs)): |
| raise ValueError( |
| f"template_episode_index {template_episode_index} outside available range 0..{len(all_episode_dirs) - 1}" |
| ) |
|
|
| episode_dirs = all_episode_dirs[episode_offset:] |
| if max_episodes is not None: |
| episode_dirs = episode_dirs[:max_episodes] |
| if not episode_dirs: |
| raise RuntimeError( |
| f"no episodes selected under {dataset_root} with offset={episode_offset} max_episodes={max_episodes}" |
| ) |
|
|
| template_episode_dir = all_episode_dirs[template_episode_index] |
| templates, template_frames = _derive_templates(dataset_path, template_episode_dir) |
| with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle: |
| json.dump( |
| { |
| "templates": templates.to_json(), |
| "template_episode": template_episode_dir.name, |
| "template_frames": template_frames, |
| "episode_offset": episode_offset, |
| }, |
| handle, |
| indent=2, |
| ) |
|
|
| episode_metrics: List[Dict[str, object]] = [] |
| for episode_dir in episode_dirs: |
| artifacts = _analyze_episode( |
| dataset_path, |
| episode_dir, |
| templates, |
| template_frames, |
| checkpoint_stride=checkpoint_stride, |
| max_frames=max_frames, |
| independent_replay=independent_replay, |
| ) |
| artifacts.dense.to_csv(result_path.joinpath(f"{episode_dir.name}.dense.csv"), index=False) |
| artifacts.keyframes.to_csv( |
| result_path.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False |
| ) |
| with result_path.joinpath(f"{episode_dir.name}.metrics.json").open( |
| "w", encoding="utf-8" |
| ) as handle: |
| json.dump(artifacts.metrics, handle, indent=2) |
| episode_metrics.append(artifacts.metrics) |
|
|
| summary = _aggregate_summary(episode_metrics) |
| with result_path.joinpath("summary.json").open("w", encoding="utf-8") as handle: |
| json.dump(summary, handle, indent=2) |
| return summary |
|
|
|
|
| def main(argv: Optional[Sequence[str]] = None) -> int: |
| parser = argparse.ArgumentParser() |
| parser.add_argument( |
| "--dataset-root", |
| default="/workspace/data/bimanual_take_tray_out_of_oven_train_128", |
| ) |
| parser.add_argument( |
| "--result-dir", |
| default="/workspace/reveal_retrieve_label_study/results/oven_first_pass", |
| ) |
| parser.add_argument("--max-episodes", type=int, default=1) |
| parser.add_argument("--checkpoint-stride", type=int, default=16) |
| parser.add_argument("--max-frames", type=int) |
| parser.add_argument("--episode-offset", type=int, default=0) |
| parser.add_argument("--template-episode-index", type=int, default=0) |
| parser.add_argument("--independent-replay", action="store_true") |
| args = parser.parse_args(argv) |
|
|
| summary = run_study( |
| dataset_root=args.dataset_root, |
| result_dir=args.result_dir, |
| max_episodes=args.max_episodes, |
| checkpoint_stride=args.checkpoint_stride, |
| max_frames=args.max_frames, |
| episode_offset=args.episode_offset, |
| template_episode_index=args.template_episode_index, |
| independent_replay=args.independent_replay, |
| ) |
| print(json.dumps(summary, indent=2)) |
| return 0 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|