| | from typing import Any, Dict, Union |
| |
|
| | import numpy as np |
| | import sapien |
| | import torch |
| |
|
| | import mani_skill.envs.utils.randomization as randomization |
| | from mani_skill.agents.robots import SO100, Fetch, Panda |
| | from mani_skill.envs.sapien_env import BaseEnv |
| | from mani_skill.envs.tasks.tabletop.pick_cube_cfgs import PICK_CUBE_CONFIGS |
| | from mani_skill.sensors.camera import CameraConfig |
| | from mani_skill.utils import sapien_utils |
| | from mani_skill.utils.building import actors |
| | from mani_skill.utils.registration import register_env |
| | from mani_skill.utils.scene_builder.table import TableSceneBuilder |
| | from mani_skill.utils.structs.pose import Pose |
| |
|
| | |
| | import matplotlib.pyplot as plt |
| |
|
| | import random |
| | from mani_skill.utils.geometry.rotation_conversions import ( |
| | euler_angles_to_matrix, |
| | matrix_to_quaternion, |
| | ) |
| | from ...logging_utils import logger |
| |
|
| | TARGET_GRAY = (np.array([128, 128, 128, 255]) / 255).tolist() |
| |
|
| |
|
| |
|
| | def _vec3_of(self, actor): |
| | """Get actor's (x,y,z), compatible with torch / numpy / sapien""" |
| | p = actor.pose.p if hasattr(actor, "pose") else actor.get_pose().p |
| | try: |
| | import torch, numpy as np |
| | if isinstance(p, torch.Tensor): |
| | p = p.detach().cpu().numpy() |
| | p = np.asarray(p).reshape(-1) |
| | except Exception: |
| | p = [float(p[0]), float(p[1]), float(p[2])] |
| | return float(p[0]), float(p[1]), float(p[2]) |
| |
|
| |
|
| | def swap_flat_two_lane( |
| | self, |
| | cube_a, cube_b, |
| | start_step: int, end_step: int, |
| | cur_step: int, |
| | ax=None, ay=None, bx=None, by=None, |
| | z=None, |
| | lane_offset=0.05, |
| | keep_upright=True, |
| | smooth=True, |
| | lock_cube_offset=True, |
| | other_cube=None, |
| | ): |
| | """ |
| | Perform "lane change" swap on same plane (A<->B): |
| | - Use [start_step, end_step] to control time window; |
| | - Automatically capture endpoint poses when entering window for first time, fixed for entire animation; |
| | - Auto no-op outside window; clear cache when reaching end_step. |
| | - If other_bins provided, keep them in original position at each timestep to prevent collision |
| | - other_bins can be single bin or list of bins |
| | """ |
| |
|
| | import math |
| | import numpy as np |
| | import sapien |
| |
|
| | |
| | if cur_step < int(start_step) or cur_step > int(end_step): |
| | return |
| |
|
| | |
| | if not hasattr(self, "_two_lane_swaps"): |
| | self._two_lane_swaps = {} |
| | |
| | key = (id(cube_a), id(cube_b), int(start_step), int(end_step)) |
| |
|
| | |
| | if cur_step == int(start_step) or key not in self._two_lane_swaps: |
| | ax0, ay0, az0 = _vec3_of(self,cube_a) |
| | bx0, by0, bz0 = _vec3_of(self,cube_b) |
| |
|
| | |
| | qa0 = cube_a.pose.q if hasattr(cube_a, "pose") else cube_a.get_pose().q |
| | qb0 = cube_b.pose.q if hasattr(cube_b, "pose") else cube_b.get_pose().q |
| |
|
| | |
| | try: |
| | import torch |
| | if isinstance(qa0, torch.Tensor): |
| | qa0 = qa0.detach().cpu().numpy() |
| | if isinstance(qb0, torch.Tensor): |
| | qb0 = qb0.detach().cpu().numpy() |
| |
|
| | |
| | qa0 = np.asarray(qa0, dtype=np.float32).flatten() |
| | qb0 = np.asarray(qb0, dtype=np.float32).flatten() |
| | except Exception: |
| | pass |
| |
|
| | if ax is None or ay is None or bx is None or by is None: |
| | ax_c, ay_c, bx_c, by_c = ax0, ay0, bx0, by0 |
| | else: |
| | ax_c, ay_c, bx_c, by_c = float(ax), float(ay), float(bx), float(by) |
| |
|
| | if z is None: |
| | z_a0, z_b0 = az0, bz0 |
| | else: |
| | z_a0 = z_b0 = float(z) |
| |
|
| | |
| | other_bins_poses = [] |
| | if other_cube is not None: |
| | |
| | bins_to_lock = other_cube if isinstance(other_cube, list) else [other_cube] |
| |
|
| | for bin_obj in bins_to_lock: |
| | bx0, by0, bz0 = _vec3_of(self, bin_obj) |
| | qb_lock = bin_obj.pose.q if hasattr(bin_obj, "pose") else bin_obj.get_pose().q |
| |
|
| | try: |
| | import torch |
| | if isinstance(qb_lock, torch.Tensor): |
| | qb_lock = qb_lock.detach().cpu().numpy() |
| | qb_lock = np.asarray(qb_lock, dtype=np.float32).flatten() |
| | except Exception: |
| | pass |
| |
|
| | other_bins_poses.append({ |
| | "bin": bin_obj, |
| | "x": bx0, |
| | "y": by0, |
| | "z": bz0, |
| | "q": qb_lock |
| | }) |
| |
|
| | self._two_lane_swaps[key] = { |
| | "ax": ax_c, "ay": ay_c, "az": z_a0, "qa": qa0, |
| | "bx": bx_c, "by": by_c, "bz": z_b0, "qb": qb0, |
| | "other_bins_poses": other_bins_poses |
| | } |
| |
|
| | |
| | ax = self._two_lane_swaps[key]["ax"] |
| | ay = self._two_lane_swaps[key]["ay"] |
| | z_a0 = self._two_lane_swaps[key]["az"] |
| | qa0 = self._two_lane_swaps[key]["qa"] |
| | bx = self._two_lane_swaps[key]["bx"] |
| | by = self._two_lane_swaps[key]["by"] |
| | z_b0 = self._two_lane_swaps[key]["bz"] |
| | qb0 = self._two_lane_swaps[key]["qb"] |
| | other_bins_poses = self._two_lane_swaps[key]["other_bins_poses"] |
| |
|
| | |
| | denom = max(1, int(end_step) - int(start_step)) |
| | alpha = (int(cur_step) - int(start_step)) / denom |
| | alpha = float(np.clip(alpha, 0.0, 1.0)) |
| | if smooth: |
| | alpha = alpha * alpha * (3.0 - 2.0 * alpha) |
| |
|
| | |
| | dx, dy = (bx - ax), (by - ay) |
| | nx, ny = -dy, dx |
| | n_norm = (nx * nx + ny * ny) ** 0.5 |
| | if n_norm > 1e-9: |
| | nx, ny = nx / n_norm, ny / n_norm |
| | else: |
| | nx, ny = 0.0, 0.0 |
| |
|
| | |
| | offset = float(lane_offset) * math.sin(math.pi * alpha) |
| |
|
| | |
| | if other_bins_poses: |
| | for pose_data in other_bins_poses: |
| | try: |
| | pose_data["bin"].set_pose(sapien.Pose( |
| | p=[pose_data["x"], pose_data["y"], pose_data["z"]], |
| | q=pose_data["q"] |
| | )) |
| | except Exception as e: |
| | logger.debug(f"Failed to set pose for locked bin: {e}") |
| |
|
| | |
| | if int(cur_step) >= int(end_step): |
| | cube_a.set_pose(sapien.Pose(p=[bx, by, z_a0], q=qb0)) |
| | cube_b.set_pose(sapien.Pose(p=[ax, ay, z_b0], q=qa0)) |
| |
|
| | |
| | if key in self._two_lane_swaps: |
| | del self._two_lane_swaps[key] |
| | return |
| |
|
| | |
| | xa = ax + dx * alpha + nx * offset |
| | ya = ay + dy * alpha + ny * offset |
| | xb = bx - dx * alpha - nx * offset |
| | yb = by - dy * alpha - ny * offset |
| |
|
| | |
| | |
| | try: |
| | |
| | qa_current = qa0 * (1 - alpha) + qb0 * alpha |
| | qb_current = qb0 * (1 - alpha) + qa0 * alpha |
| |
|
| | |
| | qa_norm = np.linalg.norm(qa_current) |
| | qb_norm = np.linalg.norm(qb_current) |
| | if qa_norm > 1e-6: |
| | qa_current = qa_current / qa_norm |
| | if qb_norm > 1e-6: |
| | qb_current = qb_current / qb_norm |
| |
|
| | cube_a.set_pose(sapien.Pose(p=[xa, ya, z_a0], q=qa_current)) |
| | cube_b.set_pose(sapien.Pose(p=[xb, yb, z_b0], q=qb_current)) |
| |
|
| | except Exception as e: |
| | |
| | cube_a.set_pose(sapien.Pose(p=[xa, ya, z_a0], q=qa0)) |
| | cube_b.set_pose(sapien.Pose(p=[xb, yb, z_b0], q=qb0)) |
| | |
| | def highlight_obj(self, obj, start_step: int, end_step: int, cur_step: int, |
| | highlight_color=None, disk_radius=0.05,disk_half_length=0.001, |
| | use_target_style: bool = False): |
| | """ |
| | Highlight an object during specified timesteps by adding/removing a visible disk below it. |
| | |
| | Args: |
| | obj: The object to highlight |
| | start_step: Start timestep for highlighting |
| | end_step: End timestep for highlighting |
| | cur_step: Current timestep |
| | highlight_color: RGBA color for the disk (default: white) |
| | disk_radius: Radius of the highlight disk |
| | use_target_style: When True, draw concentric target-style rings instead of a solid disk |
| | """ |
| | if highlight_color is None: |
| | highlight_color = TARGET_GRAY if use_target_style else [1.0, 1.0, 1.0, 1.0] |
| |
|
| | |
| | if not hasattr(self, "_highlight_cache"): |
| | self._highlight_cache = {} |
| |
|
| | obj_id = id(obj) |
| |
|
| | |
| | if obj_id not in self._highlight_cache: |
| | self._highlight_cache[obj_id] = { |
| | 'highlight_disk': None, |
| | 'is_highlighted': False, |
| | 'highlight_color_key': None, |
| | } |
| |
|
| | def _to_int(value): |
| | if isinstance(value, torch.Tensor): |
| | if value.numel() == 0: |
| | return 0 |
| | value = value.reshape(-1)[0].item() |
| | elif isinstance(value, np.ndarray): |
| | value = float(np.asarray(value).reshape(-1)[0]) |
| | return int(value) |
| |
|
| | def _set_visibility(disk_actor, visible: bool): |
| | try: |
| | for visual_body in disk_actor.visual_bodies: |
| | visual_body.set_visibility(1.0 if visible else 0.0) |
| | except Exception: |
| | pass |
| |
|
| | def _ensure_disk_position(disk_actor): |
| | try: |
| | obj_pos = obj.pose.p |
| | if hasattr(obj_pos, "cpu"): |
| | obj_pos = obj_pos.cpu().numpy() |
| | elif hasattr(obj_pos, "numpy"): |
| | obj_pos = obj_pos.numpy() |
| | obj_pos = np.asarray(obj_pos, dtype=np.float32).flatten() |
| | if len(obj_pos) < 3: |
| | logger.debug(f"Warning: Object position has insufficient coordinates: {obj_pos}") |
| | return None |
| | disk_pos = [float(obj_pos[0]), float(obj_pos[1]), float(obj_pos[2]) - 0.01] |
| | disk_actor.set_pose(sapien.Pose(p=disk_pos)) |
| | return disk_pos |
| | except Exception as e: |
| | logger.debug(f"Failed to update highlight disk position: {e}") |
| | return None |
| |
|
| | def _rgba_from_spec(color_spec): |
| | if isinstance(color_spec, sapien.render.RenderMaterial): |
| | color_value = getattr(color_spec, "base_color", None) |
| | if color_value is None: |
| | getter = getattr(color_spec, "get_base_color", None) |
| | if callable(getter): |
| | color_value = getter() |
| | if color_value is None: |
| | raise ValueError("RenderMaterial missing base_color information") |
| | else: |
| | color_value = color_spec |
| |
|
| | if isinstance(color_value, torch.Tensor): |
| | color_value = color_value.detach().cpu().reshape(-1).tolist() |
| | elif isinstance(color_value, np.ndarray): |
| | color_value = np.asarray(color_value, dtype=np.float32).reshape(-1).tolist() |
| | elif isinstance(color_value, (tuple, list)): |
| | color_value = list(color_value) |
| | elif isinstance(color_value, str) and color_value.startswith("#"): |
| | color_value = list(sapien_utils.hex2rgba(color_value)) |
| |
|
| | if not isinstance(color_value, (list, tuple)) or len(color_value) == 0: |
| | raise ValueError(f"Unsupported highlight color specification: {color_spec}") |
| |
|
| | color_list = [float(x) for x in color_value] |
| | if len(color_list) == 3: |
| | color_list.append(1.0) |
| | elif len(color_list) > 4: |
| | color_list = color_list[:4] |
| | return color_list |
| |
|
| | def _resolve_material_and_key(color_spec): |
| | if isinstance(color_spec, sapien.render.RenderMaterial): |
| | key = None |
| | base_color_attr = getattr(color_spec, "base_color", None) |
| | if base_color_attr is not None: |
| | try: |
| | color_tuple = tuple(float(x) for x in base_color_attr) |
| | if len(color_tuple) == 3: |
| | color_tuple = (*color_tuple, 1.0) |
| | key = ("rgba", color_tuple) |
| | except Exception: |
| | key = None |
| | if key is None: |
| | maybe_getter = getattr(color_spec, "get_base_color", None) |
| | if callable(maybe_getter): |
| | try: |
| | color_tuple = tuple(float(x) for x in maybe_getter()) |
| | if len(color_tuple) == 3: |
| | color_tuple = (*color_tuple, 1.0) |
| | key = ("rgba", color_tuple) |
| | except Exception: |
| | key = None |
| | if key is None: |
| | key = ("material", id(color_spec)) |
| | return color_spec, key |
| |
|
| | color_value = color_spec |
| | if isinstance(color_value, torch.Tensor): |
| | color_value = color_value.detach().cpu().reshape(-1).tolist() |
| | elif isinstance(color_value, np.ndarray): |
| | color_value = np.asarray(color_value, dtype=np.float32).reshape(-1).tolist() |
| | elif isinstance(color_value, (tuple, list)): |
| | color_value = list(color_value) |
| | elif isinstance(color_value, str) and color_value.startswith("#"): |
| | color_value = list(sapien_utils.hex2rgba(color_value)) |
| |
|
| | if not isinstance(color_value, (list, tuple)) or len(color_value) == 0: |
| | raise ValueError(f"Unsupported highlight color specification: {color_spec}") |
| |
|
| | color_list = [float(x) for x in color_value] |
| | if len(color_list) == 3: |
| | color_list.append(1.0) |
| | elif len(color_list) > 4: |
| | color_list = color_list[:4] |
| |
|
| | material = sapien.render.RenderMaterial() |
| | material.set_base_color(color_list) |
| | return material, ("rgba", tuple(color_list)) |
| |
|
| | def _add_target_visuals(builder, orientation, radius, half_length, ring_color): |
| | target_material = sapien.render.RenderMaterial() |
| | target_material.set_base_color(ring_color) |
| | white_material = sapien.render.RenderMaterial() |
| | white_material.set_base_color([1.0, 1.0, 1.0, 1.0]) |
| | offsets = [0.0, 1e-5, 2e-5, 3e-5, 4e-5] |
| | radii = [radius, radius * 4 / 5, radius * 3 / 5, radius * 2 / 5, radius * 1 / 5] |
| | materials = [target_material, white_material, target_material, white_material, target_material] |
| | for r, offset, mat in zip(radii, offsets, materials): |
| | builder.add_cylinder_visual( |
| | radius=r, |
| | half_length=half_length + offset, |
| | material=mat, |
| | pose=sapien.Pose(q=orientation), |
| | ) |
| |
|
| | start_step = _to_int(start_step) |
| | end_step = _to_int(end_step) |
| | cur_step = _to_int(cur_step) |
| |
|
| | |
| | should_highlight = start_step <= cur_step <= end_step |
| | cache_entry = self._highlight_cache[obj_id] |
| | disk = cache_entry['highlight_disk'] |
| | is_currently_highlighted = cache_entry['is_highlighted'] |
| |
|
| | |
| | if should_highlight: |
| | style_key = "target" if use_target_style else "solid" |
| | ring_color = None |
| | material = None |
| | if use_target_style: |
| | ring_color = _rgba_from_spec(highlight_color) |
| | color_key = (style_key, tuple(ring_color)) |
| | else: |
| | material, material_key = _resolve_material_and_key(highlight_color) |
| | color_key = (style_key, material_key) |
| | needs_new_actor = False |
| |
|
| | if cache_entry.get('highlight_color_key') != color_key and disk is not None: |
| | if style_key == "solid": |
| | updated = False |
| | try: |
| | for visual_body in getattr(disk, "visual_bodies", []): |
| | setter = getattr(visual_body, "set_render_material", None) |
| | if callable(setter): |
| | setter(material) |
| | updated = True |
| | else: |
| | setter = getattr(visual_body, "set_material", None) |
| | if callable(setter): |
| | setter(material) |
| | updated = True |
| | if updated: |
| | cache_entry['highlight_color_key'] = color_key |
| | else: |
| | needs_new_actor = True |
| | except Exception: |
| | needs_new_actor = True |
| | else: |
| | needs_new_actor = True |
| |
|
| | if disk is None or needs_new_actor: |
| | if disk is not None: |
| | try: |
| | remove_actor = getattr(self.scene, "remove_actor", None) |
| | if callable(remove_actor): |
| | remove_actor(disk) |
| | except Exception: |
| | pass |
| | try: |
| | angles = torch.deg2rad(torch.tensor([0.0, 90.0, 0.0], dtype=torch.float32)) |
| | disk_orientation = matrix_to_quaternion( |
| | euler_angles_to_matrix(angles, convention="XYZ") |
| | ) |
| | builder = self.scene.create_actor_builder() |
| | disk_orientation_np = disk_orientation.cpu().numpy() |
| | if use_target_style: |
| | _add_target_visuals( |
| | builder, |
| | disk_orientation_np, |
| | disk_radius, |
| | disk_half_length, |
| | ring_color, |
| | ) |
| | else: |
| | builder.add_cylinder_visual( |
| | radius=disk_radius, |
| | half_length=disk_half_length, |
| | material=material, |
| | pose=sapien.Pose(q=disk_orientation_np), |
| | ) |
| | suffix = cache_entry.get('disk_instance_counter', 0) + 1 |
| | cache_entry['disk_instance_counter'] = suffix |
| | disk_name = f"highlight_disk_{obj_id}_{suffix}" |
| | disk = builder.build_kinematic(name=disk_name) |
| | cache_entry['highlight_disk'] = disk |
| | cache_entry['highlight_color_key'] = color_key |
| | except Exception as e: |
| | logger.debug(f"Failed to create highlight disk: {e}") |
| | cache_entry['highlight_disk'] = None |
| | return |
| | else: |
| | cache_entry.setdefault('highlight_color_key', color_key) |
| |
|
| | _set_visibility(disk, True) |
| | _ensure_disk_position(disk) |
| | cache_entry['is_highlighted'] = True |
| | cache_entry['highlight_disk'] = disk |
| |
|
| | elif is_currently_highlighted and disk is not None: |
| | |
| | _set_visibility(disk, False) |
| | try: |
| | disk.set_pose(sapien.Pose(p=[0.0, 0.0, -10.0])) |
| | except Exception: |
| | pass |
| | cache_entry['is_highlighted'] = False |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
| | def highlight_position( |
| | self, |
| | position, |
| | start_step: int, |
| | end_step: int, |
| | cur_step: int, |
| | highlight_color=None, |
| | disk_radius: float = 0.02, |
| | disk_half_length: float = 0.01, |
| | ): |
| | """ |
| | Highlight an arbitrary 3D position for a finite timestep window by spawning small spheres. |
| | Each call can introduce a new highlight that remains visible for the requested duration. |
| | Note: `disk_radius` now controls the radius of the highlight sphere. `disk_half_length` |
| | is kept for backward compatibility but is unused. |
| | """ |
| | if highlight_color is None: |
| | highlight_color =[1.0, 1.0, 1.0, 1.0] |
| |
|
| | |
| | if self.agent.tcp.pose.p[0][2] <0.1: |
| | gripper_close_flag = True |
| | else: |
| | gripper_close_flag = False |
| |
|
| |
|
| | if not hasattr(self, "_position_highlight_cache"): |
| | self._position_highlight_cache = {} |
| | self._position_highlight_counter = 0 |
| |
|
| | def _to_int(value): |
| | if isinstance(value, torch.Tensor): |
| | if value.numel() == 0: |
| | return 0 |
| | value = value.reshape(-1)[0].item() |
| | elif isinstance(value, np.ndarray): |
| | value = float(np.asarray(value).reshape(-1)[0]) |
| | return int(value) |
| |
|
| | def _to_vec3(value): |
| | if value is None: |
| | return np.zeros(3, dtype=np.float32) |
| | if isinstance(value, sapien.Pose): |
| | value = value.p |
| | if hasattr(value, "p"): |
| | value = value.p |
| | if isinstance(value, torch.Tensor): |
| | arr = value.detach().cpu().numpy() |
| | elif isinstance(value, np.ndarray): |
| | arr = value |
| | elif hasattr(value, "tolist"): |
| | arr = np.asarray(value.tolist(), dtype=np.float32) |
| | else: |
| | arr = np.asarray(value, dtype=np.float32) |
| | arr = arr.reshape(-1) |
| | if arr.size < 3: |
| | pad = np.zeros(3, dtype=np.float32) |
| | pad[: arr.size] = arr |
| | arr = pad |
| | return np.asarray(arr[:3], dtype=np.float32) |
| |
|
| | def _set_visibility(actor, visible: bool): |
| | try: |
| | for visual_body in actor.visual_bodies: |
| | visual_body.set_visibility(0.1 if visible else 0.0) |
| | except Exception: |
| | pass |
| |
|
| | def _resolve_material(color_spec): |
| | if isinstance(color_spec, sapien.render.RenderMaterial): |
| | return color_spec |
| |
|
| | color_value = color_spec |
| | if isinstance(color_value, torch.Tensor): |
| | color_value = color_value.detach().cpu().reshape(-1).tolist() |
| | elif isinstance(color_value, np.ndarray): |
| | color_value = np.asarray(color_value, dtype=np.float32).reshape(-1).tolist() |
| | elif isinstance(color_value, (tuple, list)): |
| | color_value = list(color_value) |
| | elif isinstance(color_value, str) and color_value.startswith("#"): |
| | color_value = list(sapien_utils.hex2rgba(color_value)) |
| |
|
| | if not isinstance(color_value, (list, tuple)) or len(color_value) == 0: |
| | raise ValueError(f"Unsupported highlight color specification: {color_spec}") |
| |
|
| | color_list = [float(x) for x in color_value] |
| | if len(color_list) == 3: |
| | color_list.append(1.0) |
| | elif len(color_list) > 4: |
| | color_list = color_list[:4] |
| |
|
| | material = sapien.render.RenderMaterial() |
| | material.set_base_color(color_list) |
| | return material |
| |
|
| | def _set_sphere_pose(actor, pos_vec): |
| | sphere_pos = [ |
| | float(pos_vec[0]), |
| | float(pos_vec[1]), |
| | float(0.01), |
| | ] |
| | try: |
| | actor.set_pose(sapien.Pose(p=sphere_pos)) |
| | except Exception: |
| | pass |
| |
|
| | def _teleport_away(actor): |
| | try: |
| | actor.set_pose(sapien.Pose(p=[0.0, 0.0, -10.0])) |
| | except Exception: |
| | pass |
| |
|
| | start_step_i = _to_int(start_step) |
| | end_step_i = _to_int(end_step) |
| | cur_step_i = _to_int(cur_step) |
| |
|
| | cache = self._position_highlight_cache |
| |
|
| | |
| | for key, entry in list(cache.items()): |
| | actor = entry.get("actor") |
| | if actor is None: |
| | cache.pop(key, None) |
| | continue |
| |
|
| | if cur_step_i > entry["end_step"]: |
| | _set_visibility(actor, False) |
| | |
| | _teleport_away(actor) |
| | try: |
| | if hasattr(self.scene, "remove_actor"): |
| | self.scene.remove_actor(actor) |
| | except Exception: |
| | pass |
| | cache.pop(key, None) |
| | continue |
| |
|
| | if entry["start_step"] <= cur_step_i <= entry["end_step"]: |
| | _set_visibility(actor, True) |
| | _set_sphere_pose(actor, entry["position"]) |
| | else: |
| | _set_visibility(actor, False) |
| |
|
| | if not gripper_close_flag: |
| | |
| | return |
| |
|
| | if cur_step_i < start_step_i or cur_step_i > end_step_i: |
| | return |
| |
|
| | pos_vec = _to_vec3(position) |
| | material = _resolve_material(highlight_color) |
| |
|
| | builder = self.scene.create_actor_builder() |
| | builder.add_sphere_visual( |
| | radius=float(disk_radius), |
| | material=material, |
| | ) |
| |
|
| | self._position_highlight_counter += 1 |
| | actor_name = f"position_highlight_{self._position_highlight_counter}" |
| | initial_pose = sapien.Pose( |
| | p=[float(pos_vec[0]), float(pos_vec[1]), float(0.01)] |
| | ) |
| | builder.set_initial_pose(initial_pose) |
| | disk_actor = builder.build_kinematic(name=actor_name) |
| |
|
| | _set_visibility(disk_actor, True) |
| | _set_sphere_pose(disk_actor, pos_vec) |
| |
|
| | cache[self._position_highlight_counter] = { |
| | "actor": disk_actor, |
| | "start_step": start_step_i, |
| | "end_step": end_step_i, |
| | "position": pos_vec, |
| | } |
| |
|
| | def lift_and_drop_objects_back_to_original( |
| | self, |
| | obj, |
| | start_step: int, |
| | end_step: int, |
| | cur_step: int, |
| | ): |
| | """Temporarily move object away within window, and release above original position at midpoint.""" |
| |
|
| | import numpy as np |
| | import sapien |
| |
|
| | start_step = int(start_step) |
| | end_step = int(end_step) |
| | cur_step = int(cur_step) |
| |
|
| | if not hasattr(self, "_lift_drop_cache"): |
| | self._lift_drop_cache = {} |
| |
|
| | key = (id(obj), start_step, end_step) |
| |
|
| | if cur_step > end_step: |
| | self._lift_drop_cache.pop(key, None) |
| | return |
| |
|
| | if cur_step < start_step: |
| | return |
| |
|
| | cache = self._lift_drop_cache.get(key) |
| | if cache is None: |
| | x0, y0, z0 = _vec3_of(self, obj) |
| | quat = obj.pose.q if hasattr(obj, "pose") else obj.get_pose().q |
| |
|
| | try: |
| | import torch |
| | if isinstance(quat, torch.Tensor): |
| | quat = quat.detach().cpu().numpy() |
| | except Exception: |
| | pass |
| |
|
| | quat = np.asarray(quat, dtype=np.float32).flatten() |
| |
|
| | cache = { |
| | "origin": np.array([x0, y0, z0], dtype=np.float32), |
| | "quat": quat, |
| | "height": 0.0, |
| | "drop_done": False, |
| | } |
| |
|
| | drop_height = cache["height"] |
| |
|
| | |
| | away = np.array([10.0, 10.0, 10.0], dtype=np.float32) |
| |
|
| | |
| | drop_target = cache["origin"].copy() |
| | drop_target[2] += drop_height |
| |
|
| | duration = max(1, end_step - start_step) |
| | half_window = max(1, duration // 2) |
| | drop_step = min(end_step, start_step + half_window) |
| |
|
| | cache["away_pos"] = away |
| | cache["drop_pos"] = drop_target |
| | cache["drop_step"] = drop_step |
| |
|
| | self._lift_drop_cache[key] = cache |
| |
|
| | def _teleport(target_pos): |
| | pose = sapien.Pose(p=target_pos.tolist(), q=cache["quat"]) |
| | try: |
| | obj.set_pose(pose) |
| | try: |
| | obj.set_linear_velocity(np.zeros(3)) |
| | obj.set_angular_velocity(np.zeros(3)) |
| | except Exception: |
| | pass |
| | except Exception as exc: |
| | logger.debug(f"Failed to teleport object: {exc}") |
| |
|
| | drop_step = cache["drop_step"] |
| |
|
| | if cur_step < drop_step: |
| | _teleport(cache["away_pos"]) |
| | return |
| |
|
| | if cur_step == drop_step: |
| | _teleport(cache["drop_pos"]) |
| | return |
| |
|
| | if cur_step > drop_step: |
| | self._lift_drop_cache.pop(key, None) |
| |
|
| |
|
| | def lift_and_drop_objectA_onto_objectB( |
| | self, |
| | obj_a, |
| | obj_b, |
| | start_step: int, |
| | end_step: int, |
| | cur_step: int, |
| | ): |
| | """ |
| | Teleport obj_a away during [start_step, end_step), then place it on top of obj_b at end_step. |
| | |
| | Args: |
| | obj_a: Object to be moved |
| | obj_b: Target object (obj_a will be placed on top of this) |
| | start_step: Start timestep (obj_a teleported away) |
| | end_step: End timestep (obj_a placed on obj_b) |
| | cur_step: Current timestep |
| | z_offset: Additional height offset above obj_b (default: 0.0) |
| | """ |
| |
|
| | import numpy as np |
| | import sapien |
| |
|
| | start_step = int(start_step) |
| | end_step = int(end_step) |
| | cur_step = int(cur_step) |
| |
|
| | |
| | if not hasattr(self, "_lift_drop_onto_cache"): |
| | self._lift_drop_onto_cache = {} |
| |
|
| | key = (id(obj_a), id(obj_b), start_step, end_step) |
| |
|
| | |
| | if cur_step < start_step: |
| | return |
| |
|
| | |
| | if cur_step > end_step: |
| | self._lift_drop_onto_cache.pop(key, None) |
| | return |
| |
|
| | |
| | cache = self._lift_drop_onto_cache.get(key) |
| | if cache is None: |
| | |
| | quat_a = obj_a.pose.q if hasattr(obj_a, "pose") else obj_a.get_pose().q |
| |
|
| | try: |
| | import torch |
| | if isinstance(quat_a, torch.Tensor): |
| | quat_a = quat_a.detach().cpu().numpy() |
| | except Exception: |
| | pass |
| |
|
| | quat_a = np.asarray(quat_a, dtype=np.float32).flatten() |
| |
|
| | cache = { |
| | "quat_a": quat_a, |
| | "away_pos": np.array([10.0, 10.0, 10.0], dtype=np.float32), |
| | |
| | "origin_z": _vec3_of(self, obj_a)[2], |
| | } |
| | self._lift_drop_onto_cache[key] = cache |
| |
|
| | def _teleport(target_pos, quat): |
| | """Helper function to teleport object and zero velocities""" |
| | pose = sapien.Pose(p=target_pos.tolist(), q=quat) |
| | try: |
| | obj_a.set_pose(pose) |
| | try: |
| | obj_a.set_linear_velocity(np.zeros(3)) |
| | obj_a.set_angular_velocity(np.zeros(3)) |
| | except Exception: |
| | pass |
| | except Exception as exc: |
| | logger.debug(f"Failed to teleport object: {exc}") |
| |
|
| | |
| | if cur_step < end_step: |
| | _teleport(cache["away_pos"], cache["quat_a"]) |
| | return |
| |
|
| | |
| | if cur_step == end_step: |
| | |
| | bx, by, bz = _vec3_of(self, obj_b) |
| |
|
| | |
| | obj_a_half_size = getattr(obj_a, "_cube_half_size", 0.02) |
| |
|
| | |
| | |
| | if hasattr(obj_b, "_bin_height"): |
| | obj_b_height = obj_b._bin_height |
| | else: |
| | |
| | obj_b_height = getattr(self, "cube_half_size", 0.02) * 2.5 |
| |
|
| | |
| | target_z = cache["origin_z"] |
| |
|
| | target_pos = np.array([bx, by, target_z], dtype=np.float32) |
| | _teleport(target_pos, cache["quat_a"]) |
| |
|
| | |
| | self._lift_drop_onto_cache.pop(key, None) |
| | return |
| |
|
| |
|
| | def rotate_points_random(points, angle_range, generator=None): |
| | """ |
| | Generate randomly rotated points |
| | |
| | Args: |
| | points: torch.Tensor or list, shape (N, 2) - N 2D points |
| | angle_range: tuple - (min_angle, max_angle) angle range (radians) |
| | generator: torch.Generator, optional - random number generator |
| | |
| | Returns: |
| | tuple: (angle, rotated_points) - rotation angle and rotated points |
| | """ |
| | |
| | if not isinstance(points, torch.Tensor): |
| | points = torch.tensor(points, dtype=torch.float32) |
| |
|
| | min_angle, max_angle = angle_range |
| |
|
| | |
| | angle = torch.rand(1, generator=generator) * (max_angle - min_angle) + min_angle |
| |
|
| | |
| | cos_angle = torch.cos(angle) |
| | sin_angle = torch.sin(angle) |
| | rotation_matrix = torch.tensor([ |
| | [cos_angle, -sin_angle], |
| | [sin_angle, cos_angle] |
| | ], dtype=points.dtype).squeeze() |
| |
|
| | |
| | rotated_points = torch.matmul(points, rotation_matrix.T) |
| |
|
| | return angle.item(), rotated_points.tolist() |
| |
|
| |
|
| | def move_straight_line(self, cube, start_step, end_step, cur_step, start_pos, end_pos,stop=False): |
| | """ |
| | Move a cube in a straight line from start_pos to end_pos during [start_step, end_step]. |
| | |
| | Args: |
| | cube: The cube object to move |
| | start_step: Start timestep for movement |
| | end_step: End timestep for movement |
| | cur_step: Current timestep |
| | start_pos: Starting position [x, y, z] or tuple (x, y, z) |
| | end_pos: Ending position [x, y, z] or tuple (x, y, z) |
| | """ |
| | import numpy as np |
| | import sapien |
| | if not stop: |
| | start_step = int(start_step) |
| | end_step = int(end_step) |
| | cur_step = int(cur_step) |
| |
|
| | |
| | if cur_step < start_step: |
| | return |
| |
|
| | |
| | if not hasattr(self, "_move_straight_cache"): |
| | self._move_straight_cache = {} |
| |
|
| | key = (id(cube), start_step, end_step) |
| |
|
| | if cur_step > end_step: |
| | self._move_straight_cache.pop(key, None) |
| | return |
| |
|
| | |
| | if cur_step == start_step or key not in self._move_straight_cache: |
| | |
| | quat = cube.pose.q if hasattr(cube, "pose") else cube.get_pose().q |
| |
|
| | try: |
| | import torch |
| | if isinstance(quat, torch.Tensor): |
| | quat = quat.detach().cpu().numpy() |
| | except Exception: |
| | pass |
| |
|
| | quat = np.asarray(quat, dtype=np.float32).flatten() |
| |
|
| | |
| | start = np.array(start_pos, dtype=np.float32) |
| | end = np.array(end_pos, dtype=np.float32) |
| |
|
| | self._move_straight_cache[key] = { |
| | "start_pos": start, |
| | "end_pos": end, |
| | "quat": quat, |
| | } |
| |
|
| | |
| | cache = self._move_straight_cache[key] |
| | start = cache["start_pos"] |
| | end = cache["end_pos"] |
| | quat = cache["quat"] |
| |
|
| | |
| | duration = max(1, end_step - start_step) |
| | alpha = (cur_step - start_step) / duration |
| | alpha = float(np.clip(alpha, 0.0, 1.0)) |
| |
|
| | |
| | alpha = alpha * alpha * (3.0 - 2.0 * alpha) |
| |
|
| | |
| | current_pos = start + (end - start) * alpha |
| |
|
| | |
| | try: |
| | cube.set_pose(sapien.Pose(p=current_pos.tolist(), q=quat)) |
| | except Exception as e: |
| | logger.debug(f"Failed to set cube pose in move_straight_line: {e}") |
| |
|
| | |
| | if cur_step >= end_step: |
| | self._move_straight_cache.pop(key, None) |
| |
|