| | import numpy as np |
| | import sapien |
| | import gymnasium as gym |
| |
|
| | import torch |
| | from robomme.robomme_env import * |
| | from . import reset_panda |
| |
|
| | from ...logging_utils import logger |
| |
|
| | from mani_skill.examples.motionplanning.panda.motionplanner import \ |
| | PandaArmMotionPlanningSolver |
| | from mani_skill.examples.motionplanning.base_motionplanner.utils import ( |
| | compute_grasp_info_by_obb, |
| | get_actor_obb, |
| | ) |
| |
|
| | from mani_skill.utils.geometry.rotation_conversions import ( |
| | euler_angles_to_matrix, |
| | matrix_to_quaternion, |
| | quaternion_multiply, |
| | ) |
| | from robomme.robomme_env.utils import * |
| |
|
| | |
| | FAILED_HOVER_PROB = 0.03 |
| |
|
| |
|
| | def _coerce_seed_to_int(value, default: int = 0) -> int: |
| | if value is None: |
| | return default |
| | if isinstance(value, torch.Tensor): |
| | value = value.detach().cpu().item() |
| | elif isinstance(value, np.ndarray): |
| | value = np.asarray(value).reshape(-1)[0] |
| | try: |
| | return int(value) |
| | except (TypeError, ValueError): |
| | return default |
| |
|
| |
|
| | def _get_fail_recover_rng(env): |
| | env_unwrapped = getattr(env, "unwrapped", env) |
| | seed_anchor = _coerce_seed_to_int(getattr(env_unwrapped, "seed", None), default=0) |
| | cached_seed = getattr(env_unwrapped, "_fail_recover_seed_anchor", None) |
| | generator = getattr(env_unwrapped, "_fail_recover_rng", None) |
| | if not isinstance(generator, torch.Generator) or cached_seed != seed_anchor: |
| | generator = torch.Generator() |
| | generator.manual_seed(seed_anchor) |
| | env_unwrapped._fail_recover_rng = generator |
| | env_unwrapped._fail_recover_seed_anchor = seed_anchor |
| | return generator, seed_anchor |
| |
|
| |
|
| | def _sample_fail_recover_xy_signs(env) -> tuple[np.ndarray, int]: |
| | generator, seed_anchor = _get_fail_recover_rng(env) |
| | signs = torch.randint(-1, 2, (2,), generator=generator, dtype=torch.int64) |
| | while bool(torch.all(signs == 0)): |
| | signs = torch.randint(-1, 2, (2,), generator=generator, dtype=torch.int64) |
| | return signs.detach().cpu().numpy().astype(np.int32), seed_anchor |
| |
|
| |
|
| | def grasp_and_lift_peg_side(env, planner,obj): |
| | planner.open_gripper() |
| | """Move to the peg tail, close gripper, lift, and keep holding.""" |
| | pose = obj.pose |
| | lift_height=0.2 |
| |
|
| | grasp_pose_p = pose.p |
| | if isinstance(grasp_pose_p, torch.Tensor): |
| | grasp_pose_p = grasp_pose_p.detach().cpu().numpy() |
| | grasp_pose_p = np.asarray(grasp_pose_p, dtype=np.float32).reshape(-1) |
| |
|
| | grasp_pose_q = pose.q |
| | if isinstance(grasp_pose_q, torch.Tensor): |
| | grasp_pose_q = grasp_pose_q.detach().cpu().numpy() |
| | grasp_pose_q = np.asarray(grasp_pose_q, dtype=np.float32).reshape(-1) |
| |
|
| | flip_angles = torch.tensor([[np.pi, 0.0, 0.0]], dtype=torch.float32) |
| | flip_matrix = euler_angles_to_matrix(flip_angles, convention="XYZ") |
| | flip_quat = matrix_to_quaternion(flip_matrix)[0] |
| |
|
| | grasp_pose_q_tensor = torch.from_numpy(grasp_pose_q).to( |
| | dtype=torch.float32, device=flip_quat.device |
| | ) |
| | rotated_quat = quaternion_multiply( |
| | grasp_pose_q_tensor.unsqueeze(0), flip_quat.unsqueeze(0) |
| | ).squeeze(0) |
| | grasp_pose_q = rotated_quat.detach().cpu().numpy().astype(np.float32) |
| | norm = np.linalg.norm(grasp_pose_q) |
| | if norm > 0: |
| | grasp_pose_q /= norm |
| |
|
| | lifted_pose_p = grasp_pose_p.copy() |
| | lifted_pose_p[2] = lift_height |
| |
|
| |
|
| | _record_waypoint( |
| | env, |
| | "grasp_and_lift_peg_side", |
| | "open", |
| | waypoint_p=lifted_pose_p, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=lifted_pose_p, q=grasp_pose_q)) |
| |
|
| | _record_waypoint( |
| | env, |
| | "grasp_and_lift_peg_side", |
| | "close", |
| | waypoint_p=grasp_pose_p, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=grasp_pose_p, q=grasp_pose_q)) |
| | planner.close_gripper() |
| |
|
| | lifted_pose_p = grasp_pose_p.copy() |
| | lifted_pose_p[2] = lift_height |
| | _record_waypoint( |
| | env, |
| | "grasp_and_lift_peg_side", |
| | "close", |
| | waypoint_p=lifted_pose_p, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=lifted_pose_p, q=grasp_pose_q)) |
| |
|
| | planner.close_gripper() |
| |
|
| | current_grasp_pose=sapien.Pose(p=grasp_pose_p, q=grasp_pose_q) |
| | env.current_grasp_pose = current_grasp_pose |
| |
|
| |
|
| | def return_to_original_pose(env,planner,current_grasp_pose): |
| |
|
| | grasp_pose_p=current_grasp_pose.p |
| | grasp_pose_q=current_grasp_pose.q |
| | lifted_pose_p = grasp_pose_p.copy() |
| | lifted_pose_p[2] = 0.2 |
| | planner.move_to_pose_with_screw(sapien.Pose(p=lifted_pose_p, q=grasp_pose_q)) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=grasp_pose_p, q=grasp_pose_q)) |
| | planner.open_gripper() |
| |
|
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | def insert_peg(env, planner,direction,obj,insert_obj=None,cut_retreat=False): |
| | """Insert the peg into the box.""" |
| |
|
| | if insert_obj is None: |
| | raise ValueError("insert_obj must be provided to compute the insert pose.") |
| |
|
| | def _compute_insert_pose(): |
| | pose = env.box.pose * insert_obj.pose.inv() * env.agent.tcp.pose |
| | if obj == -1: |
| | pose = pose * sapien.Pose(q=[0, 0, 0, 1]) |
| | if direction == -1: |
| | pose = pose * sapien.Pose(q=[0, 0, 0, 1]) |
| | return pose |
| |
|
| | def _resolve_target_pose(offset): |
| | current_pose = _compute_insert_pose() |
| | offset_vec = np.asarray(offset, dtype=np.float32).reshape(-1).copy() |
| | if obj == 1 and direction == -1 and offset_vec[0] < 0: |
| | |
| | relative_pose = insert_obj.pose.inv() * env.agent.tcp.pose |
| | relative_p = np.asarray(relative_pose.p, dtype=np.float32).reshape(-1) |
| | offset_vec[0] += relative_p[0] |
| | return current_pose * sapien.Pose(offset_vec.tolist()) |
| |
|
| | def _record_target_waypoint(target_pose, waypoint_type="close"): |
| | _record_waypoint( |
| | env, |
| | "insert_peg", |
| | waypoint_type, |
| | waypoint_p=np.asarray(target_pose.p, dtype=np.float32).reshape(-1), |
| | waypoint_q=np.asarray(target_pose.q, dtype=np.float32).reshape(-1), |
| | ) |
| |
|
| | def _move_with_offset(offset, *, target_pose=None): |
| | if target_pose is None: |
| | target_pose = _resolve_target_pose(offset) |
| | planner.move_to_pose_with_screw(target_pose) |
| |
|
| | def _move_with_offset_with_break(offset, *, target_pose=None): |
| | """Move with interrupt check; stop when elapsed_steps > end_steps + 3.""" |
| | end_steps = getattr(env, "end_steps", None) |
| | if end_steps is not None and int(getattr(env, "elapsed_steps", 0)) > end_steps + 3: |
| | return True |
| |
|
| | if target_pose is None: |
| | target_pose = _resolve_target_pose(offset) |
| |
|
| | |
| | pose_for_plan = planner._transform_pose_for_planning(target_pose) |
| | pose_p = np.asarray(pose_for_plan.p, dtype=np.float32).reshape(-1) |
| | pose_q = np.asarray(pose_for_plan.q, dtype=np.float32).reshape(-1) |
| | result = planner.planner.plan_screw( |
| | np.concatenate([pose_p, pose_q]), |
| | planner.robot.get_qpos().cpu().numpy()[0], |
| | time_step=planner.base_env.control_timestep, |
| | use_point_cloud=planner.use_point_cloud, |
| | ) |
| | if result["status"] != "Success": |
| | return False |
| |
|
| | |
| | n_step = result["position"].shape[0] |
| | for i in range(n_step): |
| | if end_steps is not None and int(getattr(env, "elapsed_steps", 0)) > end_steps + 3: |
| | logger.debug("break early") |
| | return True |
| | qpos = result["position"][i] |
| | if planner.control_mode == "pd_joint_pos_vel": |
| | qvel = result["velocity"][i] |
| | action = np.hstack([qpos, qvel, planner.gripper_state]) |
| | else: |
| | action = np.hstack([qpos, planner.gripper_state]) |
| | planner.env.step(action) |
| | planner.elapsed_steps += 1 |
| | return True |
| |
|
| | def _record_and_move(offset, *, with_break=False, waypoint_type="close"): |
| | target_pose = _resolve_target_pose(offset) |
| | _record_target_waypoint(target_pose, waypoint_type=waypoint_type) |
| | if with_break: |
| | return _move_with_offset_with_break(offset, target_pose=target_pose) |
| | _move_with_offset(offset, target_pose=target_pose) |
| | return True |
| | |
| | if obj==-1: |
| | _record_and_move([0.2, 0, -0.15]) |
| | _record_and_move([0.2, 0, 0]) |
| |
|
| | _move_with_offset([0.15 , 0, 0]) |
| | |
| |
|
| |
|
| | |
| | |
| | if cut_retreat!=True: |
| | _record_and_move([-0.05, 0, 0]) |
| |
|
| |
|
| |
|
| | else: |
| | logger.debug( |
| | f"cut_retreat mode (obj=-1): " |
| | f"elapsed_steps={int(getattr(env, 'elapsed_steps', 0))}, " |
| | f"end_steps={getattr(env, 'end_steps', None)}" |
| | ) |
| | _record_and_move([-0.05, 0, 0], with_break=True) |
| |
|
| |
|
| |
|
| |
|
| | else: |
| | _record_and_move([-0.2, 0, -0.15]) |
| | _record_and_move([-0.2, 0, 0]) |
| |
|
| | _move_with_offset([-0.15 , 0, 0]) |
| | |
| |
|
| |
|
| | |
| | |
| | if cut_retreat!=True: |
| | _record_and_move([-0.05, 0, 0]) |
| |
|
| |
|
| | else: |
| | logger.debug( |
| | f"cut_retreat mode (obj=1): " |
| | f"elapsed_steps={int(getattr(env, 'elapsed_steps', 0))}, " |
| | f"end_steps={getattr(env, 'end_steps', None)}" |
| | ) |
| | _record_and_move([-0.05, 0, 0], with_break=True) |
| |
|
| |
|
| |
|
| | def _zero_action_for_space(space): |
| | if isinstance(space, gym.spaces.Box): |
| | return np.zeros(space.shape, dtype=space.dtype) |
| | if isinstance(space, gym.spaces.Dict): |
| | return {k: _zero_action_for_space(subspace) for k, subspace in space.spaces.items()} |
| | sample = space.sample() |
| | if isinstance(sample, np.ndarray): |
| | return np.zeros_like(sample) |
| | if isinstance(sample, dict): |
| | return {k: np.zeros_like(v) for k, v in sample.items()} |
| | raise NotImplementedError("Unsupported action space type for zero action generation") |
| |
|
| |
|
| | def _flag_to_bool(flag): |
| | if flag is None: |
| | return False |
| | if isinstance(flag, (bool, np.bool_, np.bool8)): |
| | return bool(flag) |
| | if isinstance(flag, torch.Tensor): |
| | return bool(flag.detach().cpu().bool().any()) |
| | if isinstance(flag, np.ndarray): |
| | return bool(flag.any()) |
| | return bool(flag) |
| |
|
| | def solve_liftup_Xdistance(env,planner,distance): |
| | original_pose = env.agent.tcp.pose |
| | lift_pose_p=original_pose.p.tolist()[0] |
| | lift_pose_q=original_pose.q.tolist()[0] |
| | lift_pose_p[2]+=distance |
| | planner.move_to_pose_with_screw(sapien.Pose(p=lift_pose_p,q=lift_pose_q)) |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | def solve_push_to_target(env, planner, obj=None, target=None): |
| | planner.open_gripper() |
| | FINGER_LENGTH = 0.025 |
| | env = env.unwrapped |
| |
|
| | |
| | obj_pos = obj.pose.sp.p if hasattr(obj.pose.sp.p, '__iter__') else np.array(obj.pose.sp.p) |
| | target_pos = target.pose.sp.p if hasattr(target.pose.sp.p, '__iter__') else np.array(target.pose.sp.p) |
| | |
| | |
| | push_direction_xy = target_pos[:2] - obj_pos[:2] |
| | push_direction_xy = push_direction_xy / np.linalg.norm(push_direction_xy) |
| |
|
| | |
| | push_direction_3d = np.array([push_direction_xy[0], push_direction_xy[1], 0]) |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | x_axis = np.array([push_direction_xy[0], push_direction_xy[1], 0]) |
| |
|
| | |
| | z_axis = np.array([0, 0, -1]) |
| |
|
| | |
| | y_axis = np.cross(z_axis, x_axis) |
| | y_norm = np.linalg.norm(y_axis) |
| | if y_norm < 1e-6: |
| | raise ValueError("Push direction invalid; cannot construct gripper frame.") |
| | y_axis = y_axis / y_norm |
| |
|
| | |
| | rotation_matrix = np.column_stack([x_axis, y_axis, z_axis]) |
| |
|
| | |
| | rotation_matrix_torch = torch.from_numpy(rotation_matrix).float().unsqueeze(0) |
| | push_quat = matrix_to_quaternion(rotation_matrix_torch)[0] |
| |
|
| | |
| | |
| | |
| | if target_pos[0] < obj_pos[0] : |
| | z_rotation_angles = torch.deg2rad( |
| | torch.tensor([0.0, 0.0, 180], dtype=torch.float32) |
| | ) |
| | z_rotation_matrix = euler_angles_to_matrix(z_rotation_angles, convention="XYZ") |
| | z_rotation_quat = matrix_to_quaternion(z_rotation_matrix.unsqueeze(0))[0] |
| |
|
| | |
| | push_quat = quaternion_multiply( |
| | push_quat.unsqueeze(0), z_rotation_quat.unsqueeze(0) |
| | )[0] |
| |
|
| |
|
| | |
| | push_pose = sapien.Pose(p=obj_pos, q=push_quat.detach().cpu().numpy()) |
| | |
| | |
| | |
| | |
| | offset_distance = 0.05 |
| | start_pos = obj_pos - push_direction_3d * offset_distance |
| | start_pos[2] = push_pose.p[2] |
| | |
| | reach_pose_q = push_pose.q.tolist() if hasattr(push_pose.q, 'tolist') else list(push_pose.q) |
| | |
| | |
| | |
| | _record_waypoint( |
| | env, |
| | "solve_push_to_target", |
| | "close", |
| | waypoint_p=start_pos, |
| | waypoint_q=reach_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=start_pos.tolist(), q=reach_pose_q)) |
| |
|
| | |
| | planner.close_gripper() |
| | |
| | |
| | |
| | |
| | end_pos = target_pos.copy() - push_direction_3d * env.cube_half_size |
| | end_pos[2] = start_pos[2] |
| |
|
| |
|
| | _record_waypoint( |
| | env, |
| | "solve_push_to_target", |
| | "close", |
| | waypoint_p=end_pos, |
| | waypoint_q=reach_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=end_pos.tolist(), q=reach_pose_q)) |
| |
|
| | |
| | _record_waypoint( |
| | env, |
| | "solve_push_to_target", |
| | "open", |
| | waypoint_p=end_pos, |
| | waypoint_q=reach_pose_q, |
| | ) |
| | planner.open_gripper() |
| |
|
| |
|
| |
|
| | |
| |
|
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | def solve_push_to_target_with_peg(env, planner, obj=None, target=None, direction=None, obj_flag=None): |
| |
|
| | FINGER_LENGTH = 0.025 |
| | env = env.unwrapped |
| |
|
| | |
| | |
| | |
| | obj_pos = obj.pose.sp.p if hasattr(obj.pose.sp.p, '__iter__') else np.array(obj.pose.sp.p) |
| | target_pos = target.pose.sp.p if hasattr(target.pose.sp.p, '__iter__') else np.array(target.pose.sp.p) |
| |
|
| | |
| | |
| | |
| | push_direction_xy = target_pos[:2] - obj_pos[:2] |
| | push_direction_xy = push_direction_xy / np.linalg.norm(push_direction_xy) |
| |
|
| | |
| | push_direction_3d = np.array([push_direction_xy[0], push_direction_xy[1], 0]) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | x_axis = np.array([push_direction_xy[0], push_direction_xy[1], 0]) |
| | z_axis = np.array([0, 0, -1]) |
| | y_axis = np.cross(z_axis, x_axis) |
| |
|
| | |
| | y_norm = np.linalg.norm(y_axis) |
| | if y_norm < 1e-6: |
| | raise ValueError("Invalid push direction; failed to construct gripper frame.") |
| | y_axis = y_axis / y_norm |
| |
|
| | |
| | rotation_matrix = np.column_stack([x_axis, y_axis, z_axis]) |
| |
|
| | |
| | |
| | |
| | rotation_matrix_torch = torch.from_numpy(rotation_matrix).float().unsqueeze(0) |
| | base_quat = matrix_to_quaternion(rotation_matrix_torch)[0] |
| |
|
| | |
| | |
| | |
| | z_rotation_angles = torch.deg2rad( |
| | torch.tensor([0.0, 0.0, 90.0 * direction * obj_flag], dtype=torch.float32) |
| | ) |
| | z_rotation_matrix = euler_angles_to_matrix(z_rotation_angles, convention="XYZ") |
| | z_rotation_quat = matrix_to_quaternion(z_rotation_matrix.unsqueeze(0))[0] |
| |
|
| | |
| | push_quat = quaternion_multiply( |
| | base_quat.unsqueeze(0), z_rotation_quat.unsqueeze(0) |
| | )[0].cpu().numpy() |
| |
|
| | |
| | push_pose = sapien.Pose(p=obj_pos, q=push_quat) |
| |
|
| | |
| | |
| | |
| | offset_distance = 0.1 |
| | start_pos = obj_pos - push_direction_3d * offset_distance |
| | start_pos[2] = obj_pos[2] |
| |
|
| | |
| | lateral_unit = np.array([-push_direction_xy[1], push_direction_xy[0], 0], dtype=np.float32) |
| | lateral_norm = np.linalg.norm(lateral_unit[:2]) |
| |
|
| | |
| | if lateral_norm < 1e-6: |
| | lateral_unit = np.array([0.0, 1.0, 0.0], dtype=np.float32) |
| | else: |
| | lateral_unit /= lateral_norm |
| |
|
| | |
| | lateral_distance = 0.1 * direction |
| | start_pos = start_pos - lateral_unit * lateral_distance |
| |
|
| | |
| | reach_pose_q = push_pose.q.tolist() if hasattr(push_pose.q, "tolist") else list(push_pose.q) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | start_ready_pos = start_pos.copy() |
| | |
| | |
| | _record_waypoint( |
| | env, |
| | "solve_push_to_target_with_peg", |
| | "close", |
| | waypoint_p=start_pos, |
| | waypoint_q=reach_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=start_ready_pos.tolist(), q=reach_pose_q)) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=start_pos.tolist(), q=reach_pose_q)) |
| |
|
| |
|
| | |
| | |
| | |
| | planner.close_gripper() |
| |
|
| | |
| | |
| | |
| | |
| | end_pos = target_pos - push_direction_3d * 0.03 |
| | end_pos[2] = start_pos[2] |
| | end_pos = end_pos - lateral_unit * lateral_distance |
| | _record_waypoint( |
| | env, |
| | "solve_push_to_target_with_peg", |
| | "close", |
| | waypoint_p=end_pos, |
| | waypoint_q=reach_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=end_pos.tolist(), q=reach_pose_q)) |
| |
|
| | |
| |
|
| | _record_waypoint( |
| | env, |
| | "solve_push_to_target_with_peg", |
| | "open", |
| | waypoint_p=end_pos, |
| | waypoint_q=reach_pose_q, |
| | ) |
| | planner.open_gripper() |
| | |
| |
|
| | def move_to_avoid(env, planner): |
| |
|
| | original_pose = env.agent.tcp.pose |
| | lift_pose_p=original_pose.p.tolist()[0] |
| | lift_pose_q=original_pose.q.tolist()[0] |
| | lift_pose_p=[-0.1,0,0.1] |
| | planner.move_to_pose_with_screw(sapien.Pose(p=lift_pose_p,q=lift_pose_q)) |
| |
|
| | return None |
| |
|
| |
|
| | def solve_pickup_fail(env, planner, obj=None,z_offset=None,xy_offset=None,obj_type="cube",mode=None): |
| | """Hover directly above grasp pose with slight random +z, close then reopen gripper.""" |
| | if obj is None: |
| | return None |
| |
|
| | env = getattr(env, "unwrapped", env) |
| | planner.open_gripper() |
| |
|
| | |
| | obb = get_actor_obb(obj) |
| | approaching = np.array([0, 0, -1]) |
| | target_closing = env.agent.tcp.pose.to_transformation_matrix()[0, :3, 1].cpu().numpy() |
| | grasp_info = compute_grasp_info_by_obb( |
| | obb, |
| | approaching=approaching, |
| | target_closing=target_closing, |
| | depth=0.025, |
| | ) |
| | closing = grasp_info["closing"] |
| | grasp_pose = env.agent.build_grasp_pose(approaching, closing, obj.pose.sp.p) |
| | if obj_type=="bin": |
| | grasp_pose = grasp_pose * sapien.Pose([0, 0, -0.01]) |
| |
|
| | fail_pose_p = np.asarray(grasp_pose.p, dtype=np.float32).reshape(-1).tolist() |
| | fail_pose_q = np.asarray(grasp_pose.q, dtype=np.float32).reshape(-1).tolist() |
| |
|
| | normalized_mode = mode.lower() if isinstance(mode, str) else mode |
| |
|
| | fail_seed_anchor = _coerce_seed_to_int(getattr(env, "seed", None), default=0) |
| | env.fail_recover_mode = normalized_mode |
| | env.fail_recover_seed_anchor = fail_seed_anchor |
| | env.fail_recover_xy_signs = None |
| | env.fail_recover_xy_signed_offset = None |
| |
|
| | if normalized_mode == "xy": |
| | env.fail="xy" |
| | xy_offset = np.asarray(xy_offset, dtype=np.float32).reshape(-1) |
| | if xy_offset.size == 1: |
| | xy_offset = np.repeat(xy_offset, 2) |
| | signs, fail_seed_anchor = _sample_fail_recover_xy_signs(env) |
| | signed_offset = xy_offset * signs.astype(np.float32) |
| | fail_pose_p[0] += float(signed_offset[0]) |
| | fail_pose_p[1] += float(signed_offset[1]) |
| | env.fail_recover_seed_anchor = fail_seed_anchor |
| | env.fail_recover_xy_signs = signs.astype(np.int32) |
| | env.fail_recover_xy_signed_offset = signed_offset.astype(np.float32) |
| | elif normalized_mode == "z": |
| | env.fail="z" |
| | z_shift = z_offset |
| | fail_pose_p[2] += z_shift |
| | else: |
| | raise ValueError(f"Invalid fail mode: {mode}") |
| | |
| | ready_pose_p = fail_pose_p.copy() |
| | ready_pose_p[2] = 0.15 |
| | if obj_type=="bin": |
| | ready_pose_p[2] = 0.2 |
| | ready_pose = sapien.Pose(p=ready_pose_p, q=fail_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_pickup_fail", |
| | "open", |
| | waypoint_p=ready_pose_p, |
| | waypoint_q=fail_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(ready_pose) |
| |
|
| | fail_pose = sapien.Pose(p=fail_pose_p, q=fail_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_pickup_fail", |
| | "close", |
| | waypoint_p=fail_pose_p, |
| | waypoint_q=fail_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(fail_pose) |
| | planner.close_gripper() |
| |
|
| | _record_waypoint( |
| | env, |
| | "solve_pickup_fail", |
| | "open", |
| | waypoint_p=ready_pose_p, |
| | waypoint_q=fail_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(ready_pose) |
| | planner.open_gripper() |
| |
|
| | env.use_fail_planner=True |
| |
|
| | return None |
| |
|
| |
|
| | def _record_waypoint(env, solve_function, waypoint_type, *, waypoint_p, waypoint_q): |
| | """ |
| | Record waypoint info to env attribute, for subsequent writing to hdf5 file. |
| | |
| | Args: |
| | env: Environment object |
| | solve_function: solve function name string (e.g., 'solve_pickup') |
| | waypoint_type: waypoint type string (e.g., 'reach_pose', 'grasp_pose') |
| | waypoint_p: waypoint position (3D) |
| | waypoint_q: waypoint quaternion (4D) |
| | """ |
| | if waypoint_type not in ["open", "close"]: |
| | raise ValueError(f"waypoint_type must be 'open' or 'close', but got '{waypoint_type}'") |
| |
|
| | waypoint_p_np = np.asarray(waypoint_p, dtype=np.float32).reshape(-1) |
| | waypoint_q_np = np.asarray(waypoint_q, dtype=np.float32).reshape(-1) |
| | if waypoint_p_np.size != 3: |
| | raise ValueError(f"waypoint_p must have 3 elements, got shape {waypoint_p_np.shape}") |
| | if waypoint_q_np.size != 4: |
| | raise ValueError(f"waypoint_q must have 4 elements, got shape {waypoint_q_np.shape}") |
| | if not np.isfinite(waypoint_p_np).all(): |
| | raise ValueError(f"waypoint_p contains non-finite values: {waypoint_p_np}") |
| | if not np.isfinite(waypoint_q_np).all(): |
| | raise ValueError(f"waypoint_q contains non-finite values: {waypoint_q_np}") |
| |
|
| | env = getattr(env, "unwrapped", env) |
| | |
| | |
| | waypoint_info = { |
| | 'solve_function': solve_function, |
| | 'waypoint_type': waypoint_type, |
| | 'waypoint_p': waypoint_p_np, |
| | 'waypoint_q': waypoint_q_np, |
| | |
| | |
| | 'waypoint_phase_is_demo': bool(getattr(env, "current_task_demonstration", False)), |
| | } |
| | |
| | env._pending_waypoint = waypoint_info |
| |
|
| |
|
| | def solve_pickup(env, planner, obj=None,fail_grasp=False,mode=None): |
| | |
| | planner.open_gripper() |
| | if(env.use_demonstrationwrapper==False): |
| | if fail_grasp==True: |
| | solve_pickup_fail(env, planner, obj,z_offset=env.cube_half_size*2,xy_offset=env.cube_half_size*2,obj_type="cube",mode=mode) |
| |
|
| | FINGER_LENGTH = 0.025 |
| | env = env.unwrapped |
| |
|
| | |
| | obb = get_actor_obb(obj) |
| |
|
| | approaching = np.array([0, 0, -1]) |
| | |
| | target_closing = env.agent.tcp.pose.to_transformation_matrix()[0, :3, 1].cpu().numpy() |
| | |
| | grasp_info = compute_grasp_info_by_obb( |
| | obb, |
| | approaching=approaching, |
| | target_closing=target_closing, |
| | depth=FINGER_LENGTH, |
| | ) |
| | closing, center = grasp_info["closing"], grasp_info["center"] |
| | grasp_pose = env.agent.build_grasp_pose(approaching, closing, obj.pose.sp.p) |
| |
|
| | |
| | |
| | |
| | reach_pose_p = grasp_pose.p.tolist() if hasattr(grasp_pose.p, 'tolist') else list(grasp_pose.p) |
| | reach_pose_q = grasp_pose.q.tolist() if hasattr(grasp_pose.q, 'tolist') else list(grasp_pose.q) |
| | reach_pose_p[2]=0.15 |
| | _record_waypoint( |
| | env, |
| | "solve_pickup", |
| | "open", |
| | waypoint_p=reach_pose_p, |
| | waypoint_q=reach_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=reach_pose_p,q=reach_pose_q)) |
| |
|
| | planner.open_gripper() |
| |
|
| | |
| | |
| | |
| | grasp_pose_p = grasp_pose.p.tolist() if hasattr(grasp_pose.p, 'tolist') else list(grasp_pose.p) |
| | grasp_pose_q = grasp_pose.q.tolist() if hasattr(grasp_pose.q, 'tolist') else list(grasp_pose.q) |
| |
|
| | _record_waypoint( |
| | env, |
| | "solve_pickup", |
| | "close", |
| | waypoint_p=grasp_pose_p, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(grasp_pose) |
| |
|
| | planner.close_gripper() |
| |
|
| | |
| | |
| | |
| | goal_pose_P=obj.pose.p.tolist()[0] |
| | goal_pose_P[2]=0.15 |
| | goal_pose = sapien.Pose(goal_pose_P, grasp_pose.q) |
| | _record_waypoint( |
| | env, |
| | "solve_pickup", |
| | "close", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| |
|
| | |
| |
|
| | planner.close() |
| | return res |
| |
|
| | def solve_pickup_bin(env, planner, obj=None, fail_grasp=False, mode=None): |
| | planner.open_gripper() |
| | |
| | if(env.use_demonstrationwrapper==False): |
| | if fail_grasp==True: |
| | solve_pickup_fail(env, planner, obj,z_offset=0.035,xy_offset=0.035,obj_type="bin", mode=mode) |
| |
|
| | FINGER_LENGTH = 0.025 |
| | env = env.unwrapped |
| |
|
| | |
| | obb = get_actor_obb(obj) |
| |
|
| | approaching = np.array([0, 0, -1]) |
| | |
| | target_closing = env.agent.tcp.pose.to_transformation_matrix()[0, :3, 1].cpu().numpy() |
| | |
| | grasp_info = compute_grasp_info_by_obb( |
| | obb, |
| | approaching=approaching, |
| | target_closing=target_closing, |
| | depth=FINGER_LENGTH, |
| | ) |
| | closing, center = grasp_info["closing"], grasp_info["center"] |
| | grasp_pose = env.agent.build_grasp_pose(approaching, closing, obj.pose.sp.p) |
| |
|
| | |
| | |
| | |
| | reach_pose = grasp_pose * sapien.Pose([0, 0, -0.15]) |
| | reach_pose_p=reach_pose.p.tolist() |
| | reach_pose_p[2]=0.2 |
| | reach_pose_q=reach_pose.q.tolist() |
| | reach_pose_fix=sapien.Pose(reach_pose_p,reach_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_pickup_bin", |
| | "open", |
| | waypoint_p=reach_pose_p, |
| | waypoint_q=reach_pose_q, |
| | ) |
| | planner.move_to_pose_with_screw(reach_pose_fix) |
| | planner.open_gripper() |
| |
|
| | |
| | |
| | |
| | grasp_pose_up=grasp_pose * sapien.Pose([0, 0, -0.01]) |
| | _record_waypoint( |
| | env, |
| | "solve_pickup_bin", |
| | "close", |
| | waypoint_p=np.asarray(grasp_pose_up.p, dtype=np.float32).reshape(-1), |
| | waypoint_q=np.asarray(grasp_pose_up.q, dtype=np.float32).reshape(-1), |
| | ) |
| | planner.move_to_pose_with_screw(grasp_pose_up) |
| | planner.close_gripper() |
| | |
| | |
| |
|
| | |
| | |
| | |
| | goal_pose_P=obj.pose.p.tolist()[0] |
| | goal_pose_P[2]=0.2 |
| | goal_pose = sapien.Pose(goal_pose_P, grasp_pose.q) |
| | _record_waypoint( |
| | env, |
| | "solve_pickup_bin", |
| | "close", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=np.asarray(grasp_pose.q, dtype=np.float32).reshape(-1), |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| | |
| |
|
| | planner.close() |
| | return res |
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
| | def solve_putonto_whenhold(env, planner,target=None,height=None): |
| | FINGER_LENGTH = 0.025 |
| | env = env.unwrapped |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | grasp_pose_q=env.agent.tcp.pose.q.tolist()[0] |
| |
|
| | goal_pose_P_prepare=target.pose.p.tolist()[0] |
| | goal_pose_P_prepare[2]=0.15 |
| | goal_pose = sapien.Pose(goal_pose_P_prepare, grasp_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_putonto_whenhold", |
| | "close", |
| | waypoint_p=goal_pose_P_prepare, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| |
|
| | goal_pose_P=target.pose.p.tolist()[0] |
| | |
| | if height is not None: |
| | goal_pose_P[2]=height |
| |
|
| | goal_pose = sapien.Pose(goal_pose_P, grasp_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_putonto_whenhold", |
| | "open", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| |
|
| | planner.open_gripper() |
| |
|
| | goal_pose_P=target.pose.p.tolist()[0] |
| | goal_pose_P[2]=0.15 |
| | goal_pose = sapien.Pose(goal_pose_P, grasp_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_putonto_whenhold", |
| | "open", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| | |
| | planner.close() |
| | return res |
| | def solve_swingonto_whenhold(env, planner,target=None,height=0.05): |
| | FINGER_LENGTH = 0.025 |
| | env = env.unwrapped |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | grasp_pose_q=env.agent.tcp.pose.q.tolist()[0] |
| | goal_pose_P=target.pose.p.tolist()[0] |
| | goal_pose_P[2]=height |
| | goal_pose = sapien.Pose(goal_pose_P, grasp_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_swingonto_whenhold", |
| | "close", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| | planner.close() |
| | return res |
| | def solve_swingonto_withDirection(env, planner, target=None, radius=0.1, direction="counterclockwise"): |
| | """Planar arc motion at z=0.07 from current TCP to target. |
| | |
| | direction: "counterclockwise" means left side of t_start->t_end (positive cross product); "clockwise" means right side.""" |
| | if target is None: |
| | raise ValueError("target must be provided for swing onto motion.") |
| |
|
| | start_pos = env.agent.tcp.pose.p.reshape(-1, 3)[0] |
| | end_pos = target.pose.p.reshape(-1, 3)[0] |
| | start_xy = np.asarray(start_pos[:2], dtype=np.float32) |
| | end_xy = np.asarray(end_pos[:2], dtype=np.float32) |
| |
|
| | chord_vec = end_xy - start_xy |
| | chord_len = np.linalg.norm(chord_vec) |
| | current_qpos = env.agent.tcp.pose.q.reshape(-1, 4)[0].tolist() |
| |
|
| | |
| | init_qpos_tensor = planner.robot.get_qpos() |
| | qpos_device = init_qpos_tensor.device if hasattr(init_qpos_tensor, "device") else None |
| | qpos_dtype = init_qpos_tensor.dtype if hasattr(init_qpos_tensor, "dtype") else torch.float32 |
| | init_qpos = ( |
| | init_qpos_tensor.detach().cpu().numpy() if hasattr(init_qpos_tensor, "detach") else np.asarray(init_qpos_tensor) |
| | ).reshape(-1) |
| | plan_start_qpos = init_qpos.copy() |
| |
|
| |
|
| | waypoints = [] |
| | if chord_len < 1e-6: |
| | goal_p = end_pos.tolist() |
| | goal_p[2] = 0.07 |
| | waypoints.append(sapien.Pose(goal_p, current_qpos)) |
| | else: |
| | |
| | lateral_offset = float(max(radius, 1e-4)) |
| | dir_unit = chord_vec / chord_len |
| | perp = np.array([-dir_unit[1], dir_unit[0]]) |
| | direction_l = str(direction).lower() |
| | if direction_l == "counterclockwise": |
| | sign = -1.0 |
| | elif direction_l == "clockwise": |
| | sign = 1.0 |
| | else: |
| | |
| | sign = 1.0 if direction_l == "left" else -1.0 |
| | control_xy = (start_xy + end_xy) / 2.0 + sign * lateral_offset * perp |
| |
|
| | |
| | num_steps = 45 |
| | for t in np.linspace(0.0, 1.0, num_steps): |
| | one_minus_t = 1.0 - t |
| | waypoint_xy = ( |
| | (one_minus_t ** 2) * start_xy |
| | + 2 * one_minus_t * t * control_xy |
| | + (t ** 2) * end_xy |
| | ) |
| | goal_p = [float(waypoint_xy[0]), float(waypoint_xy[1]), 0.07] |
| | waypoints.append(sapien.Pose(goal_p, current_qpos)) |
| | if waypoints: |
| | |
| | last_p = np.asarray(waypoints[-1].p, dtype=np.float32).reshape(-1).tolist() |
| | last_q = np.asarray(waypoints[-1].q, dtype=np.float32).reshape(-1).tolist() |
| | for _ in range(5): |
| | waypoints.append(sapien.Pose(last_p, last_q)) |
| | logger.debug(" get waypoint") |
| | |
| | positions = [] |
| | last_res = None |
| | |
| | plan_start_qpos_full = planner.planner.pad_qpos(plan_start_qpos.copy()) |
| | for idx, wp in enumerate(waypoints): |
| | pose_for_plan = planner._transform_pose_for_planning(wp) |
| | pose_p = np.asarray(pose_for_plan.p, dtype=np.float32).reshape(-1) |
| | pose_q = np.asarray(pose_for_plan.q, dtype=np.float32).reshape(-1) |
| | goal_world = np.concatenate([pose_p, pose_q]) |
| | goal_base = planner.planner.transform_goal_to_wrt_base(goal_world) |
| |
|
| | ik_status, ik_solutions = planner.planner.IK( |
| | goal_base, |
| | plan_start_qpos_full.copy(), |
| | ) |
| | if ik_status != "Success" or len(ik_solutions) == 0: |
| | logger.debug(f"IK failed at waypoint {idx}: {ik_status}") |
| | continue |
| |
|
| | |
| | qpos_sol = ik_solutions[0] |
| | padded_qpos = plan_start_qpos_full.copy() |
| | padded_qpos[: qpos_sol.shape[0]] = qpos_sol |
| | positions.append(padded_qpos) |
| | |
| | plan_start_qpos_full = padded_qpos |
| |
|
| | if len(positions) == 0: |
| | logger.debug("No IK solutions found for waypoints, aborting follow_path.") |
| | return None |
| |
|
| | full_positions = np.stack(positions, axis=0) |
| | mid_idx = len(full_positions) // 2 |
| |
|
| | |
| | traj_res_1 = { |
| | "status": "Success", |
| | "position": full_positions[:mid_idx], |
| | } |
| | if planner.control_mode == "pd_joint_pos_vel": |
| | traj_res_1["velocity"] = np.zeros_like(traj_res_1["position"]) |
| | |
| | |
| | |
| | mid_pose = waypoints[mid_idx - 1] if mid_idx > 0 else waypoints[0] |
| | _record_waypoint( |
| | env, |
| | "solve_swingonto_withDirection", |
| | "close", |
| | waypoint_p=np.asarray(mid_pose.p, dtype=np.float32).reshape(-1), |
| | waypoint_q=np.asarray(mid_pose.q, dtype=np.float32).reshape(-1), |
| | ) |
| | planner.follow_path(traj_res_1) |
| |
|
| | |
| | traj_res_2 = { |
| | "status": "Success", |
| | "position": full_positions[mid_idx:], |
| | } |
| | if planner.control_mode == "pd_joint_pos_vel": |
| | traj_res_2["velocity"] = np.zeros_like(traj_res_2["position"]) |
| |
|
| | |
| | |
| | end_pose = waypoints[-1] |
| | _record_waypoint( |
| | env, |
| | "solve_swingonto_withDirection", |
| | "close", |
| | waypoint_p=np.asarray(end_pose.p, dtype=np.float32).reshape(-1), |
| | waypoint_q=np.asarray(end_pose.q, dtype=np.float32).reshape(-1), |
| | ) |
| | last_res = planner.follow_path(traj_res_2) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | return last_res |
| |
|
| | def solve_swingonto(env, planner,target=None,record_swing_qpos=False): |
| | env = env.unwrapped |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | current_qpos = env.agent.tcp.pose.q.reshape(-1, 4)[0] |
| |
|
| | |
| | goal_pose_P=target.pose.p.tolist()[0] |
| | goal_pose_P[2]=0.07 |
| | goal_pose = sapien.Pose(goal_pose_P, current_qpos) |
| | for i in range(2): |
| | if i == 1 : |
| | _record_waypoint( |
| | env, |
| | "solve_swingonto", |
| | "close", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=current_qpos, |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| | try: |
| | planner.close_gripper() |
| | except: |
| | AttributeError |
| |
|
| |
|
| | if record_swing_qpos==True: |
| | env.swing_qpos=env.agent.robot.qpos |
| |
|
| | |
| | |
| |
|
| | return res |
| |
|
| | def solve_strong_reset(env, planner,timestep=30,gripper=None,action=None): |
| | try: |
| | planner.open_gripper() |
| | except: |
| | AttributeError |
| | if action==None: |
| | action=reset_panda.get_reset_panda_param("action",gripper=gripper) |
| | for i in range(timestep): |
| | env.step(action) |
| | logger.debug("strong reset!!") |
| | env.unwrapped.reset_in_proecess=True |
| | env.unwrapped.after_demo=True |
| | env.unwrapped.reset_in_proecess=False |
| | def solve_reset(env, planner): |
| | pose_p=[0,0,0.2] |
| | pose_q=env.agent.tcp.pose.q.tolist()[0] |
| | planner.move_to_pose_with_screw(sapien.Pose(p=pose_p,q=pose_q)) |
| | planner.open_gripper() |
| |
|
| | def solve_putdown_whenhold(env, planner,release_z=0.07): |
| | FINGER_LENGTH = 0.025 |
| | env = env.unwrapped |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | grasp_pose_q=env.agent.tcp.pose.q.tolist()[0] |
| |
|
| | goal_pose_P=env.agent.tcp.pose.p.tolist()[0] |
| | goal_pose_P[2]=release_z |
| | |
| | goal_pose = sapien.Pose(goal_pose_P,grasp_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_putdown_whenhold", |
| | "open", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| | planner.open_gripper() |
| |
|
| | goal_pose_P=env.agent.tcp.pose.p.tolist()[0] |
| | goal_pose_P[2]=0.15 |
| |
|
| | goal_pose = sapien.Pose(goal_pose_P, grasp_pose_q) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| |
|
| |
|
| | |
| |
|
| | planner.close() |
| | return res |
| |
|
| |
|
| | def solve_putonto_whenhold_binspecial(env, planner,target=None): |
| | FINGER_LENGTH = 0.025 |
| | env = env.unwrapped |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | grasp_pose_q=env.agent.tcp.pose.q.tolist()[0] |
| |
|
| | goal_pose_P=target.pose.p.tolist()[0] |
| | goal_pose_P[2]=0.2 |
| | goal_pose = sapien.Pose(goal_pose_P, grasp_pose_q) |
| | _record_waypoint( |
| | env, |
| | "solve_putonto_whenhold_binspecial", |
| | "close", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| |
|
| | _record_waypoint( |
| | env, |
| | "solve_putonto_whenhold_binspecial", |
| | "open", |
| | waypoint_p=goal_pose_P, |
| | waypoint_q=grasp_pose_q, |
| | ) |
| | planner.open_gripper() |
| |
|
| | goal_pose_P=target.pose.p.tolist()[0] |
| | goal_pose_P[0]=goal_pose_P[0]-0.1 |
| | goal_pose_P[2]=0.2 |
| | goal_pose = sapien.Pose(goal_pose_P, grasp_pose_q) |
| | res = planner.move_to_pose_with_screw(goal_pose) |
| | |
| | planner.close() |
| | return res |
| |
|
| | def solve_hold_obj(env, planner, static_steps,close=False): |
| | start_step = int(getattr(env, "elapsed_steps", 0)) |
| | target_step = start_step + static_steps |
| | while int(getattr(env, "elapsed_steps", 0)) < target_step: |
| | if close: |
| | try: |
| | planner.close_gripper() |
| | except: |
| | AttributeError |
| |
|
| | else: |
| | try: |
| | planner.open_gripper() |
| | except: |
| | AttributeError |
| |
|
| | current_step = int(getattr(env, "elapsed_steps", 0)) |
| | |
| | return None |
| |
|
| | def solve_hold_obj_absTimestep(env,planner,absTimestep): |
| | current_pose = env.agent.tcp.pose |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | while int(getattr(env, "elapsed_steps", 0)) < absTimestep: |
| | planner.close_gripper() |
| | return None |
| |
|
| | def solve_button(env, planner,obj,steps_press=None,interval=20,without_hold=False): |
| | |
| | |
| | |
| | |
| | |
| |
|
| | FINGER_LENGTH = 0.025 |
| | env=env.unwrapped |
| | position=obj.pose.p.tolist()[0] |
| | ready_position=position.copy() |
| | ready_position[2]=0.15 |
| |
|
| | angles = torch.deg2rad(torch.tensor([180.0, 0.0, 0.0], dtype=torch.float32)) |
| | rotate = matrix_to_quaternion( |
| | euler_angles_to_matrix(angles, convention="XYZ") |
| | ) |
| | _record_waypoint( |
| | env, |
| | "solve_button", |
| | "open", |
| | waypoint_p=ready_position, |
| | waypoint_q=rotate, |
| | ) |
| | if without_hold==False: |
| | planner.move_to_pose_with_screw(sapien.Pose(p=ready_position,q=rotate)) |
| |
|
| | _record_waypoint( |
| | env, |
| | "solve_button", |
| | "close", |
| | waypoint_p=ready_position, |
| | waypoint_q=rotate, |
| | ) |
| | planner.close_gripper() |
| |
|
| | steps=env.elapsed_steps.item() |
| | logger.debug("press button at step %s", steps) |
| | |
| | rotate_list = rotate.tolist() if hasattr(rotate, 'tolist') else rotate |
| |
|
| | _record_waypoint( |
| | env, |
| | "solve_button", |
| | "close", |
| | waypoint_p=position, |
| | waypoint_q=rotate, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=position,q=rotate)) |
| |
|
| | _record_waypoint( |
| | env, |
| | "solve_button", |
| | "close", |
| | waypoint_p=ready_position, |
| | waypoint_q=rotate, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=ready_position, q=rotate)) |
| | |
| |
|
| | def solve_button_ready(env, planner,obj): |
| | FINGER_LENGTH = 0.025 |
| | env=env.unwrapped |
| | position=obj.pose.p.tolist()[0] |
| | ready_position=position.copy() |
| | ready_position[2]=0.15 |
| |
|
| | angles = torch.deg2rad(torch.tensor([180.0, 0.0, 0.0], dtype=torch.float32)) |
| | rotate = matrix_to_quaternion( |
| | euler_angles_to_matrix(angles, convention="XYZ") |
| | ) |
| | |
| | _record_waypoint( |
| | env, |
| | "solve_button_ready", |
| | "close", |
| | waypoint_p=ready_position, |
| | waypoint_q=rotate, |
| | ) |
| | planner.move_to_pose_with_screw(sapien.Pose(p=ready_position,q=rotate)) |
| | planner.close_gripper() |
| |
|