| import os |
| import json |
| from isaacsim import SimulationApp |
|
|
|
|
def _env_flag(name: str, default: bool = False) -> bool:
    """Interpret the environment variable *name* as a boolean switch.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is not set at all.

    Returns:
        ``default`` if the variable is absent; otherwise ``True`` only when
        its value, stripped and lower-cased, is one of ``1``, ``true``,
        ``yes`` or ``on``. Any other value (including an empty string)
        yields ``False``.
    """
    raw = os.environ.get(name)
    if raw is not None:
        normalized = raw.strip().lower()
        return normalized in ("1", "true", "yes", "on")
    return default
|
|
|
|
def _env_float(name: str):
    """Read the environment variable *name* as a float.

    Args:
        name: Environment variable to look up.

    Returns:
        The parsed float, or ``None`` when the variable is unset, empty, or
        contains only whitespace.

    Raises:
        ValueError: If the variable holds text that is not a valid number.
            The message names the offending variable.
    """
    raw = os.environ.get(name)
    # A blank value is treated as "not provided" rather than a parse error.
    if raw is None or not raw.strip():
        return None
    try:
        return float(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name} must be a number, got {raw!r}"
        ) from err
|
|
|
|
def _env_text(name: str):
    """Return the environment variable *name* as text, or ``None``.

    An unset variable and a variable set to the empty string are both
    reported as ``None``; any other value is returned unchanged.
    """
    text = os.environ.get(name)
    return text if text else None
|
|
|
|
# Runtime switches, read from the environment before the simulator is created.
# DEXGARMENTLAB_HEADLESS defaults to on; DEXGARMENTLAB_EMIT_JSON defaults to off.
HEADLESS = _env_flag("DEXGARMENTLAB_HEADLESS", True)
EMIT_JSON = _env_flag("DEXGARMENTLAB_EMIT_JSON", False)
# Marker placed in front of the JSON result line printed when EMIT_JSON is set.
RESULT_PREFIX = "DEXGARMENTLAB_RESULT="
# The app is created here, ahead of the omni / isaacsim imports that follow.
# NOTE(review): presumably those modules need a running SimulationApp —
# confirm before reordering this line relative to the imports below.
simulation_app = SimulationApp({"headless": HEADLESS})
|
|
| |
| import sys |
| import time |
| import numpy as np |
| import open3d as o3d |
| from termcolor import cprint |
| import threading |
|
|
| |
| import omni.replicator.core as rep |
| import isaacsim.core.utils.prims as prims_utils |
| from pxr import UsdGeom,UsdPhysics,PhysxSchema, Gf |
| from isaacsim.core.api import World |
| from isaacsim.core.api import SimulationContext |
| from isaacsim.core.api.objects import DynamicCuboid, FixedCuboid, VisualCuboid |
| from isaacsim.core.utils.prims import is_prim_path_valid, set_prim_visibility, delete_prim |
| from isaacsim.core.utils.string import find_unique_string_name |
| from isaacsim.core.utils.viewports import set_camera_view |
| from isaacsim.core.utils.stage import add_reference_to_stage, is_stage_loading |
| from isaacsim.core.prims import SingleXFormPrim, SingleClothPrim, SingleRigidPrim, SingleGeometryPrim, SingleParticleSystem, SingleDeformablePrim |
| from isaacsim.core.prims import XFormPrim, ClothPrim, RigidPrim, GeometryPrim, ParticleSystem |
| from isaacsim.core.utils.rotations import euler_angles_to_quat |
| from isaacsim.core.utils.types import ArticulationAction, ArticulationActions |
| from omni.physx.scripts import deformableUtils,particleUtils,physicsUtils |
|
|
| |
| sys.path.append(os.getcwd()) |
| from Env_StandAlone.BaseEnv import BaseEnv |
| from Env_Config.Garment.Particle_Garment import Particle_Garment |
| from Env_Config.Garment.Deformable_Garment import Deformable_Garment |
| from Env_Config.Robot.BimanualDex_Ur10e import Bimanual_Ur10e |
| from Env_Config.Camera.Recording_Camera import Recording_Camera |
| from Env_Config.Room.Real_Ground import Real_Ground |
| from Env_Config.Room.Object_Tools import pusher_loader, set_prim_visible_group, delete_prim_group |
| from Env_Config.Utils_Project.Code_Tools import get_unique_filename, normalize_columns |
| from Env_Config.Utils_Project.Parse import parse_args_val |
| from Env_Config.Utils_Project.Collision_Group import CollisionGroup |
| from Env_Config.Utils_Project.Point_Cloud_Manip import rotate_point_cloud |
| from Model_HALO.GAM.GAM_Encapsulation import GAM_Encapsulation |
| from Model_HALO.SADP.SADP import SADP |
|
|
class StoreTops_Env(BaseEnv):
    """Scene for the "Store Tops" task: ground, garment, bimanual robot, pusher and cameras.

    The constructor builds the whole scene, resets the world, places the
    garment at the requested position, initializes the cameras and then
    steps the simulation 100 times before reporting the world as ready.
    """

    def __init__(
        self,
        pos:np.ndarray=None,
        ori:np.ndarray=None,
        usd_path:str=None,
        env_dx:float=0.0,
        env_dy:float=0.0,
        ground_material_usd:str=None,
        record_video_flag:bool=False,
        training_data_num:int=100,
        stage_1_checkpoint_num:int=1500,
        stage_2_checkpoint_num:int=1500,
        stage_3_checkpoint_num:int=1500,
    ):
        """Build and initialize the scene.

        Args:
            pos: Garment position. Required; only ``pos[0]`` and ``pos[1]``
                are read (the garment is placed at height 0.20 and the
                garment camera at height 6.75 above that x/y).
            ori: Garment orientation, forwarded to ``garment.set_pose``.
            usd_path: Garment USD asset; a built-in default asset is used
                when ``None``.
            env_dx: X offset applied to the judge camera position.
            env_dy: Y offset applied to the judge camera position.
            ground_material_usd: Visual material passed to ``Real_Ground``.
            record_video_flag: When true, a daemon thread targeting
                ``env_camera.collect_rgb_graph_for_video`` is created as
                ``self.thread_record`` (it is not started here).
            training_data_num: ``data_num`` passed to the SADP policy.
            stage_1_checkpoint_num: ``checkpoint_num`` passed to the SADP policy.
            stage_2_checkpoint_num: Accepted but not used by this constructor.
            stage_3_checkpoint_num: Accepted but not used by this constructor.

        Raises:
            TypeError: If ``pos`` is ``None``.
        """
        # `pos` is indexed below for camera and garment placement. Fail up
        # front with a clear message instead of an opaque "NoneType is not
        # subscriptable" after most of the scene has already been loaded.
        if pos is None:
            raise TypeError("StoreTops_Env requires `pos` (garment x, y position); got None")

        # NOTE(review): self.world / self.scene / self.step / self.reset are
        # presumably provided by BaseEnv — confirm.
        super().__init__()

        # Ground
        self.ground = Real_Ground(
            self.scene,
            visual_material_usd = ground_material_usd,
        )

        # Garment: created away from the work area, moved to `pos` after reset().
        self.garment = Particle_Garment(
            self.world,
            pos=np.array([0, 3.0, 0.6]),
            ori=np.array([0, 0, 0]),
            usd_path="Assets/Garment/Tops/Collar_noSleeve_FrontClose/TCNC_Top338/TCNC_Top338_obj.usd" if usd_path is None else usd_path,
        )

        # Bimanual robot
        self.bimanual_dex = Bimanual_Ur10e(
            self.world,
            dexleft_pos=np.array([-0.9, 0.0, 0.5]),
            dexleft_ori=np.array([0.0, 0.0, 0.0]),
            dexright_pos=np.array([0.9, 0.0, 0.5]),
            dexright_ori=np.array([0.0, 0.0, 0.0]),
        )

        # Camera positioned over the garment's x/y.
        self.garment_camera = Recording_Camera(
            camera_position=np.array([pos[0], pos[1], 6.75]),
            camera_orientation=np.array([0, 90.0, 90.0]),
            prim_path="/World/garment_camera",
        )

        # Camera used for depth point clouds and video / final-state images.
        self.env_camera = Recording_Camera(
            camera_position=np.array([0.0, 5.22, 8.11]),
            camera_orientation=np.array([0, 60, -90.0]),
            prim_path="/World/env_camera",
        )

        # Camera used to judge the final garment state; shifted by env_dx / env_dy.
        self.judge_camera = Recording_Camera(
            camera_position=np.array([0.0+env_dx,1.25+env_dy,6.75]),
            camera_orientation=np.array([0, 90, -90.0]),
            prim_path="/World/judge_camera",
        )
        self.env_dx = env_dx
        self.env_dy = env_dy
        self.pusher = pusher_loader(self.scene)

        # Manipulation-point model (the keyword spelling `catogory` is the callee's API).
        self.model = GAM_Encapsulation(catogory="Tops_NoSleeve")

        # Collision groups: helper prims collide with the garment but not the robot.
        self.helper_path=['/World/defaultGroundPlane/GroundPlane', '/World/pusher']
        self.collisiongroup = CollisionGroup(
            self.world,
            helper_path=self.helper_path,
            garment=True,
            collide_with_garment=True,
            collide_with_robot=False,
        )

        # Camera that segments the pusher object.
        self.object_camera = Recording_Camera(
            camera_position=np.array([0.0, -6.6, 4.9]),
            camera_orientation=np.array([0, 30.0, 90.0]),
            prim_path="/World/object_camera",
        )

        # Filled in later by the task procedure.
        self.garment_pcd = None
        self.object_pcd = None
        self.points_affordance_feature = None

        # Policy for the first stage of the task.
        self.sadp = SADP(task_name="Store_Tops_stage_1", data_num=training_data_num, checkpoint_num=stage_1_checkpoint_num)

        # Reset the world, then move the garment to its requested position.
        self.reset()

        self.garment.set_pose(pos=np.array([pos[0], pos[1], 0.20]), ori=ori)
        self.position = [pos[0], pos[1], 0.2]
        self.orientation = ori

        # Cameras are initialized only after the reset above.
        self.garment_camera.initialize(
            segment_pc_enable=True,
            segment_prim_path_list=[
                "/World/Garment/garment",
            ]
        )

        self.env_camera.initialize(depth_enable=True)

        self.judge_camera.initialize(
            segment_pc_enable=True,
            segment_prim_path_list=[
                "/World/Garment/garment",
            ]
        )

        self.object_camera.initialize(
            segment_pc_enable=True,
            segment_prim_path_list=[
                "/World/pusher",
            ]
        )

        # The recording thread is only created here; the caller starts it.
        if record_video_flag:
            self.thread_record = threading.Thread(target=self.env_camera.collect_rgb_graph_for_video)
            self.thread_record.daemon = True

        # Step the simulation before handing the world to the caller.
        for _ in range(100):
            self.step()

        cprint("----------- World Configuration -----------", color="magenta", attrs=["bold"])
        cprint(f"usd_path: {usd_path}", "magenta")
        cprint(f"pos_x: {pos[0]}", "magenta")
        cprint(f"pos_y: {pos[1]}", "magenta")
        cprint(f"env_dx: {env_dx}", "magenta")
        cprint(f"env_dy: {env_dy}", "magenta")
        cprint("----------- World Configuration -----------", color="magenta", attrs=["bold"])

        cprint("World Ready!", "green", "on_green")
|
|
|
|
def _step_env(env, steps):
    """Advance the simulation by ``steps`` calls to ``env.step()``."""
    for _ in range(steps):
        env.step()


def _collect_observation(env):
    """Build the observation dict handed to ``env.sadp``.

    Joint positions of both arms are read first and concatenated
    (left, then right), followed by a fresh depth point cloud from the
    environment camera; the remaining entries are values cached on ``env``.
    """
    joint_state = np.concatenate([
        env.bimanual_dex.dexleft.get_joint_positions(),
        env.bimanual_dex.dexright.get_joint_positions(),
    ])
    return {
        'agent_pos': joint_state,
        'environment_point_cloud': env.env_camera.get_pointcloud_from_depth(),
        'garment_point_cloud': env.garment_pcd,
        'object_point_cloud': env.object_pcd,
        'points_affordance_feature': env.points_affordance_feature,
    }


def StoreTops(pos, ori, usd_path, env_dx, env_dy, ground_material_usd, validation_flag, record_video_flag, training_data_num, stage_1_checkpoint_num, stage_2_checkpoint_num, stage_3_checkpoint_num):
    """Run one "Store Tops" episode and judge the result.

    The episode builds a ``StoreTops_Env``, captures the garment point cloud,
    performs a scripted fold with both hands, places the pusher, captures the
    pusher point cloud, moves the hands to a scripted pre-grasp pose, runs the
    SADP policy for 10 inference rounds of 4 actions each, and finally
    compares the garment's bounding-box centre with the pusher centre.

    Args:
        pos, ori, usd_path, env_dx, env_dy, ground_material_usd,
        record_video_flag, training_data_num, stage_1_checkpoint_num,
        stage_2_checkpoint_num, stage_3_checkpoint_num: Forwarded unchanged
            to ``StoreTops_Env``. ``env_dx`` / ``env_dy`` also offset the
            pusher centre, and ``record_video_flag`` additionally starts the
            recording thread and writes an mp4.
        validation_flag: When true, append the outcome to
            ``Data/Store_Tops_Validation_HALO/validation_log.txt`` and save a
            final-state image.

    Returns:
        dict with keys ``task_name``, ``success``, ``garment_point_count``,
        ``object_point_count``, ``judge_point_count``,
        ``affordance_feature_norm`` and ``distance_to_target_center``.
        ``success`` is true when the centre distance is below 0.1.
    """
    env = StoreTops_Env(pos, ori, usd_path, env_dx, env_dy, ground_material_usd, record_video_flag, training_data_num, stage_1_checkpoint_num, stage_2_checkpoint_num, stage_3_checkpoint_num)

    # --- Garment point cloud: hide robot and pusher while capturing. ---
    set_prim_visible_group(
        prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/pusher"],
        visible=False,
    )
    _step_env(env, 50)

    env.garment_pcd, _ = env.garment_camera.get_point_cloud_data_from_segment(
        save_or_not=False,
        save_path=get_unique_filename("data", extension=".ply"),
    )

    set_prim_visible_group(
        prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/pusher"],
        visible=True,
    )
    _step_env(env, 50)

    if record_video_flag:
        env.thread_record.start()

    # --- Scripted fold. ---
    # Rows 0/1 of manipulation_points are the left/right grasp points and
    # rows 2/3 the left/right fold targets (as used below).
    manipulation_points, _, points_similarity = env.model.get_manipulation_points(input_pcd=env.garment_pcd, index_list=[1954, 1832, 528, 587])
    manipulation_points[:, 2] = 0.025

    env.points_affordance_feature = normalize_columns(points_similarity[2:4].T)

    # End-effector orientations used for every move of the fold.
    fold_left_ori = np.array([0.579, -0.579, -0.406, 0.406])
    fold_right_ori = np.array([0.406, -0.406, -0.579, 0.579])

    env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0], left_ori=fold_left_ori, right_pos=manipulation_points[1], right_ori=fold_right_ori)

    env.bimanual_dex.set_both_hand_state(left_hand_state="open", right_hand_state="open")

    env.bimanual_dex.set_both_hand_state(left_hand_state="close", right_hand_state="close")

    _step_env(env, 20)

    # A quarter of the summed planar grasp-to-target distances; used both as
    # the lift height and as the y displacement of the first lift move.
    left_dis=np.sqrt((manipulation_points[0][0]-manipulation_points[2][0])**2+(manipulation_points[0][1]-manipulation_points[2][1])**2)
    right_dis=np.sqrt((manipulation_points[1][0]-manipulation_points[3][0])**2+(manipulation_points[1][1]-manipulation_points[3][1])**2)
    lift_distance=(left_dis+right_dis)/4

    y_off=0.01
    z_off=0.005
    left_lift_points = np.array([manipulation_points[0][0], manipulation_points[0][1]-lift_distance+y_off, lift_distance+z_off])
    right_lift_points = np.array([manipulation_points[1][0], manipulation_points[1][1]-lift_distance+y_off, lift_distance+z_off])
    env.bimanual_dex.dense_move_both_ik(left_pos=left_lift_points, left_ori=fold_left_ori, right_pos=right_lift_points, right_ori=fold_right_ori)

    left_lift_points = np.array([manipulation_points[0][0], manipulation_points[2][1]+0.06, lift_distance+z_off])
    right_lift_points = np.array([manipulation_points[1][0], manipulation_points[3][1]+0.06, lift_distance+z_off])
    env.bimanual_dex.dense_move_both_ik(left_pos=left_lift_points, left_ori=fold_left_ori, right_pos=right_lift_points, right_ori=fold_right_ori)

    env.bimanual_dex.set_both_hand_state(left_hand_state="open", right_hand_state="open")

    # Temporarily raise the garment's gravity scale, then restore it.
    env.garment.particle_material.set_gravity_scale(10.0)
    _step_env(env, 200)
    env.garment.particle_material.set_gravity_scale(1.0)

    cprint("Store World Fold Procedure Finish! Store Procedure Begins!", "green", "on_green")

    # --- Place the pusher and capture its point cloud. ---
    pusher_center = np.array([0.0+env_dx, 1.10+env_dy, 0.0])
    env.pusher.set_world_pose(position=pusher_center)

    set_prim_visible_group(
        prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/Garment/garment"],
        visible=False,
    )
    _step_env(env, 50)

    env.object_pcd, _ = env.object_camera.get_point_cloud_data_from_segment(
        save_or_not=False,
        save_path=get_unique_filename("data", extension=".ply"),
    )

    set_prim_visible_group(
        prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/Garment/garment"],
        visible=True,
    )
    _step_env(env, 50)

    # --- Scripted pre-grasp approach for the store stage. ---
    left_ori=np.array([0.7010574,0.5609855, 0.4304593, 0.092296])
    right_ori=np.array([ 0.4304593, 0.092296, 0.7010574,0.5609855])
    left_lift_points,right_lift_points=np.array([-0.5, 0.6, 0.65]), np.array([0.5, 0.6, 0.65])
    env.bimanual_dex.dense_move_both_ik(left_pos=left_lift_points, left_ori=left_ori, right_pos=right_lift_points, right_ori=right_ori)

    _step_env(env, 50)

    # From here on only the two fold-target points are used, at ground height.
    manipulation_points=manipulation_points[2:]
    left_off0=np.array([-0.08,-0.0,0.005])
    right_off0=np.array([0.08,-0.0,0.005])
    left_off1=np.array([-0.05,-0.0,0.002])
    right_off1=np.array([0.05,-0.0,0.002])
    left_off2=np.array([0.005,0.05,-0.0])
    right_off2=np.array([-0.005,0.05,-0.0])
    manipulation_points[:,2]=0.00

    env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0]+left_off0, left_ori=left_ori, right_pos=manipulation_points[1]+right_off0, right_ori=right_ori)
    _step_env(env, 20)
    env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0]+left_off1, left_ori=left_ori, right_pos=manipulation_points[1]+right_off1, right_ori=right_ori)
    _step_env(env, 20)
    env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0]+left_off2, left_ori=left_ori, right_pos=manipulation_points[1]+right_off2, right_ori=right_ori)
    _step_env(env, 20)

    _step_env(env, 20)

    # --- Policy rollout: 10 inference rounds, 4 actions applied per round. ---
    for i in range(10):

        print(f"Stage_1_Step: {i}")

        action=env.sadp.get_action(_collect_observation(env))

        print("action_shape:",action.shape)

        for j in range(4):

            # The first 30 entries drive the left arm, the remainder the right arm.
            action_L = ArticulationAction(joint_positions=action[j][:30])
            action_R = ArticulationAction(joint_positions=action[j][30:])

            env.bimanual_dex.dexleft.apply_action(action_L)
            env.bimanual_dex.dexright.apply_action(action_R)

            _step_env(env, 5)

            # Feed the post-action observation back to the policy.
            env.sadp.update_obs(_collect_observation(env))

    if record_video_flag:
        os.makedirs("Data/Store_Tops_Validation_HALO/video", exist_ok=True)
        env.env_camera.create_mp4(get_unique_filename("Data/Store_Tops_Validation_HALO/video/video", ".mp4"))

    # --- Judge: remove the robot, then compare garment and pusher centres. ---
    delete_prim("/World/DexLeft")
    delete_prim("/World/DexRight")
    _step_env(env, 50)

    store_state, _ = env.judge_camera.get_point_cloud_data_from_segment(save_or_not=False)

    max_x = np.max(store_state[:, 0])
    min_x = np.min(store_state[:, 0])
    max_y = np.max(store_state[:, 1])
    min_y = np.min(store_state[:, 1])

    # Centre of the garment's axis-aligned bounding box in the x/y plane.
    center_x = (max_x + min_x) / 2
    center_y = (max_y + min_y) / 2

    distance = np.sqrt((center_x - pusher_center[0]) ** 2 + (center_y - pusher_center[1]) ** 2)

    success = bool(distance < 0.1)

    cprint("----------- Judge Begin -----------", "blue", attrs=["bold"])
    cprint(f"garment_center: {center_x}, {center_y}", "blue")
    cprint(f"pusher_center: {pusher_center[0]}, {pusher_center[1]}", "blue")
    cprint(f"distance: {distance}", "blue")
    cprint("----------- Judge End -----------", "blue", attrs=["bold"])
    cprint(f"final result: {success}", color="green", on_color="on_green")

    if validation_flag:
        os.makedirs("Data/Store_Tops_Validation_HALO", exist_ok=True)

        with open("Data/Store_Tops_Validation_HALO/validation_log.txt", "a") as f:
            f.write(f"result:{success} usd_path:{env.garment.usd_path} pos_x:{pos[0]} pos_y:{pos[1]} env_dx:{env_dx} env_dy:{env_dy} \n")

        os.makedirs("Data/Store_Tops_Validation_HALO/final_state_pic", exist_ok=True)
        env.env_camera.get_rgb_graph(save_or_not=True,save_path=get_unique_filename("Data/Store_Tops_Validation_HALO/final_state_pic/img",".png"))

    result = {
        "task_name": "Store Tops HALO",
        "success": bool(success),
        "garment_point_count": int(env.garment_pcd.shape[0]) if env.garment_pcd is not None else 0,
        "object_point_count": int(env.object_pcd.shape[0]) if env.object_pcd is not None else 0,
        "judge_point_count": int(store_state.shape[0]),
        "affordance_feature_norm": float(np.linalg.norm(env.points_affordance_feature)) if env.points_affordance_feature is not None else 0.0,
        "distance_to_target_center": float(distance),
    }
    return result
|
|
| |
|
|
| |
if __name__=="__main__":

    args = parse_args_val()

    # Defaults used when no randomization or override is requested.
    pos = np.array([0.0, 0.7, 0.20])
    ori = np.array([0.0, 0.0, 0.0])
    usd_path = None
    env_dx = 0.0
    env_dy = 0.0

    if args.env_random_flag or args.garment_random_flag:
        # Seed from DEXGARMENTLAB_RNG_SEED when given, else from the clock.
        rng_seed = _env_text("DEXGARMENTLAB_RNG_SEED")
        np.random.seed(int(rng_seed) if rng_seed is not None else int(time.time()))
        if args.env_random_flag:
            env_dx = np.random.uniform(-0.3, 0.3)
            env_dy = np.random.uniform(-0.1, 0.1)
        if args.garment_random_flag:
            x = np.random.uniform(-0.05, 0.05)
            y = np.random.uniform(0.65, 0.75)
            pos = np.array([x,y,0.0])
            ori = np.array([0.0, 0.0, 0.0])
            Base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            assets_lists = os.path.join(Base_dir,"Model_HALO/GAM/checkpoints/Tops_NoSleeve/assets_list.txt")
            # Skip blank lines so an empty string can never be picked as the asset path.
            with open(assets_lists,"r",encoding='utf-8') as f:
                assets_list = [line.strip() for line in f if line.strip()]
            if not assets_list:
                raise ValueError(f"No garment assets listed in {assets_lists}")
            usd_path = str(np.random.choice(assets_list))

    # Environment-variable overrides are applied after any randomization,
    # so they always win.
    forced_usd_path = _env_text("DEXGARMENTLAB_USD_PATH")
    if forced_usd_path is not None:
        usd_path = forced_usd_path
    forced_pos_x = _env_float("DEXGARMENTLAB_POS_X")
    forced_pos_y = _env_float("DEXGARMENTLAB_POS_Y")
    if forced_pos_x is not None or forced_pos_y is not None:
        pos = np.array([
            pos[0] if forced_pos_x is None else forced_pos_x,
            pos[1] if forced_pos_y is None else forced_pos_y,
            pos[2],
        ])
    forced_ori_z = _env_float("DEXGARMENTLAB_ORI_Z")
    if forced_ori_z is not None:
        ori = np.array([ori[0], ori[1], forced_ori_z])
    forced_env_dx = _env_float("DEXGARMENTLAB_ENV_DX")
    if forced_env_dx is not None:
        env_dx = forced_env_dx
    forced_env_dy = _env_float("DEXGARMENTLAB_ENV_DY")
    if forced_env_dy is not None:
        env_dy = forced_env_dy

    result = StoreTops(pos, ori, usd_path, env_dx, env_dy, args.ground_material_usd, args.validation_flag, args.record_video_flag, args.training_data_num, args.stage_1_checkpoint_num, args.stage_2_checkpoint_num, args.stage_3_checkpoint_num)
    if EMIT_JSON:
        # Single machine-readable line: prefix followed by the JSON payload.
        print(f"{RESULT_PREFIX}{json.dumps(result, sort_keys=True)}")

    # In validation or headless runs there is nobody to interact with the
    # viewer, so shut down right away; otherwise keep the app alive until
    # the user closes it.
    if not (args.validation_flag or HEADLESS):
        while simulation_app.is_running():
            simulation_app.update()

    # NOTE(review): the original indentation of the trailing close could not
    # be recovered from the source; it is kept inside the __main__ guard so
    # the app is closed exactly once on every script path — confirm.
    simulation_app.close()
|
|