import os import json from isaacsim import SimulationApp def _env_flag(name: str, default: bool = False) -> bool: value = os.environ.get(name) if value is None: return default return value.strip().lower() in {"1", "true", "yes", "on"} def _env_float(name: str): value = os.environ.get(name) if value is None or value == "": return None return float(value) def _env_text(name: str): value = os.environ.get(name) if value is None or value == "": return None return value HEADLESS = _env_flag("DEXGARMENTLAB_HEADLESS", True) EMIT_JSON = _env_flag("DEXGARMENTLAB_EMIT_JSON", False) RESULT_PREFIX = "DEXGARMENTLAB_RESULT=" simulation_app = SimulationApp({"headless": HEADLESS}) # load external package import sys import time import numpy as np import open3d as o3d from termcolor import cprint import threading # load isaac-relevant package import omni.replicator.core as rep import isaacsim.core.utils.prims as prims_utils from pxr import UsdGeom,UsdPhysics,PhysxSchema, Gf from isaacsim.core.api import World from isaacsim.core.api import SimulationContext from isaacsim.core.api.objects import DynamicCuboid, FixedCuboid, VisualCuboid from isaacsim.core.utils.prims import is_prim_path_valid, set_prim_visibility, delete_prim from isaacsim.core.utils.string import find_unique_string_name from isaacsim.core.utils.viewports import set_camera_view from isaacsim.core.utils.stage import add_reference_to_stage, is_stage_loading from isaacsim.core.prims import SingleXFormPrim, SingleClothPrim, SingleRigidPrim, SingleGeometryPrim, SingleParticleSystem, SingleDeformablePrim from isaacsim.core.prims import XFormPrim, ClothPrim, RigidPrim, GeometryPrim, ParticleSystem from isaacsim.core.utils.rotations import euler_angles_to_quat from isaacsim.core.utils.types import ArticulationAction, ArticulationActions from omni.physx.scripts import deformableUtils,particleUtils,physicsUtils # load custom package sys.path.append(os.getcwd()) from Env_StandAlone.BaseEnv import BaseEnv from Env_Config.Garment.Particle_Garment import Particle_Garment from Env_Config.Garment.Deformable_Garment import Deformable_Garment from Env_Config.Robot.BimanualDex_Ur10e import Bimanual_Ur10e from Env_Config.Camera.Recording_Camera import Recording_Camera from Env_Config.Room.Real_Ground import Real_Ground from Env_Config.Room.Object_Tools import pusher_loader, set_prim_visible_group, delete_prim_group from Env_Config.Utils_Project.Code_Tools import get_unique_filename, normalize_columns from Env_Config.Utils_Project.Parse import parse_args_val from Env_Config.Utils_Project.Collision_Group import CollisionGroup from Env_Config.Utils_Project.Point_Cloud_Manip import rotate_point_cloud from Model_HALO.GAM.GAM_Encapsulation import GAM_Encapsulation from Model_HALO.SADP.SADP import SADP class StoreTops_Env(BaseEnv): def __init__( self, pos:np.ndarray=None, ori:np.ndarray=None, usd_path:str=None, env_dx:float=0.0, env_dy:float=0.0, ground_material_usd:str=None, record_video_flag:bool=False, training_data_num:int=100, stage_1_checkpoint_num:int=1500, stage_2_checkpoint_num:int=1500, stage_3_checkpoint_num:int=1500, ): # load BaseEnv super().__init__() # ------------------------------------ # # --- Add Env Assets --- # # ------------------------------------ # # add ground self.ground = Real_Ground( self.scene, visual_material_usd = ground_material_usd, # you can use materials in 'Assets/Material/Floor' to change the texture of ground. ) # load garment self.garment = Particle_Garment( self.world, pos=np.array([0, 3.0, 0.6]), ori=np.array([0, 0, 0]), usd_path="Assets/Garment/Tops/Collar_noSleeve_FrontClose/TCNC_Top338/TCNC_Top338_obj.usd" if usd_path is None else usd_path, ) # load bimanual_dex self.bimanual_dex = Bimanual_Ur10e( self.world, dexleft_pos=np.array([-0.9, 0.0, 0.5]), dexleft_ori=np.array([0.0, 0.0, 0.0]), dexright_pos=np.array([0.9, 0.0, 0.5]), dexright_ori=np.array([0.0, 0.0, 0.0]), ) # load camera self.garment_camera = Recording_Camera( camera_position=np.array([pos[0], pos[1], 6.75]), camera_orientation=np.array([0, 90.0, 90.0]), prim_path="/World/garment_camera", ) self.env_camera = Recording_Camera( camera_position=np.array([0.0, 5.22, 8.11]), camera_orientation=np.array([0, 60, -90.0]), prim_path="/World/env_camera", ) self.judge_camera = Recording_Camera( camera_position=np.array([0.0+env_dx,1.25+env_dy,6.75]), camera_orientation=np.array([0, 90, -90.0]), prim_path="/World/judge_camera", ) self.env_dx = env_dx self.env_dy = env_dy self.pusher = pusher_loader(self.scene) # load GAM Model self.model = GAM_Encapsulation(catogory="Tops_NoSleeve") # define collision group - helper path self.helper_path=['/World/defaultGroundPlane/GroundPlane', '/World/pusher'] self.collisiongroup = CollisionGroup( self.world, helper_path=self.helper_path, garment=True, collide_with_garment=True, collide_with_robot=False, ) self.object_camera = Recording_Camera( camera_position=np.array([0.0, -6.6, 4.9]), camera_orientation=np.array([0, 30.0, 90.0]), prim_path="/World/object_camera", ) self.garment_pcd = None self.object_pcd = None self.points_affordance_feature = None self.sadp = SADP(task_name="Store_Tops_stage_1", data_num=training_data_num, checkpoint_num=stage_1_checkpoint_num) # ------------------------------------ # # --- Initialize World to be Ready --- # # ------------------------------------ # # initialize world self.reset() self.garment.set_pose(pos=np.array([pos[0], pos[1], 0.20]), ori=ori) self.position = [pos[0], pos[1], 0.2] self.orientation = ori # initialize recording camera to obtain point cloud data of garment self.garment_camera.initialize( segment_pc_enable=True, segment_prim_path_list=[ "/World/Garment/garment", ] ) # initialize gif camera to obtain rgb with the aim of creating gif self.env_camera.initialize(depth_enable=True) self.judge_camera.initialize( segment_pc_enable=True, segment_prim_path_list=[ "/World/Garment/garment", ] ) self.object_camera.initialize( segment_pc_enable=True, segment_prim_path_list=[ "/World/pusher", ] ) # add thread and record gif Asynchronously(use to collect rgb data for generating gif) if record_video_flag: self.thread_record = threading.Thread(target=self.env_camera.collect_rgb_graph_for_video) self.thread_record.daemon = True # step world to make it ready for i in range(100): self.step() cprint("----------- World Configuration -----------", color="magenta", attrs=["bold"]) cprint(f"usd_path: {usd_path}", "magenta") cprint(f"pos_x: {pos[0]}", "magenta") cprint(f"pos_y: {pos[1]}", "magenta") cprint(f"env_dx: {env_dx}", "magenta") cprint(f"env_dy: {env_dy}", "magenta") cprint("----------- World Configuration -----------", color="magenta", attrs=["bold"]) cprint("World Ready!", "green", "on_green") def StoreTops(pos, ori, usd_path, env_dx, env_dy, ground_material_usd, validation_flag, record_video_flag, training_data_num, stage_1_checkpoint_num, stage_2_checkpoint_num, stage_3_checkpoint_num): env = StoreTops_Env(pos, ori, usd_path, env_dx, env_dy, ground_material_usd, record_video_flag, training_data_num, stage_1_checkpoint_num, stage_2_checkpoint_num, stage_3_checkpoint_num) # hide prim to get garment point cloud set_prim_visible_group( prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/pusher"], visible=False, ) for i in range(50): env.step() env.garment_pcd, color = env.garment_camera.get_point_cloud_data_from_segment( save_or_not=False, save_path=get_unique_filename("data", extension=".ply"), ) set_prim_visible_group( prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/pusher"], visible=True, ) for i in range(50): env.step() if record_video_flag: env.thread_record.start() manipulation_points, indices, points_similarity = env.model.get_manipulation_points(input_pcd=env.garment_pcd, index_list=[1954, 1832, 528, 587]) manipulation_points[:, 2] = 0.025 # set z-axis to 0.005 to make sure dexhand can grasp the garment env.points_affordance_feature = normalize_columns(points_similarity[2:4].T) # move both dexhand to the manipulation points env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0], left_ori=np.array([0.579, -0.579, -0.406, 0.406]), right_pos=manipulation_points[1], right_ori=np.array([0.406, -0.406, -0.579, 0.579])) env.bimanual_dex.set_both_hand_state(left_hand_state="open", right_hand_state="open") env.bimanual_dex.set_both_hand_state(left_hand_state="close", right_hand_state="close") for i in range(20): env.step() left_dis=np.sqrt((manipulation_points[0][0]-manipulation_points[2][0])**2+(manipulation_points[0][1]-manipulation_points[2][1])**2) right_dis=np.sqrt((manipulation_points[1][0]-manipulation_points[3][0])**2+(manipulation_points[1][1]-manipulation_points[3][1])**2) distance=(left_dis+right_dis)/4 # get lift points y_off=0.01 z_off=0.005 left_lift_points,right_lift_points=np.array([manipulation_points[0][0], manipulation_points[0][1]-distance+y_off, distance+z_off]), np.array([manipulation_points[1][0], manipulation_points[1][1]-distance+y_off, distance+z_off]) # move both dexhand to the lift points env.bimanual_dex.dense_move_both_ik(left_pos=left_lift_points, left_ori=np.array([0.579, -0.579, -0.406, 0.406]), right_pos=right_lift_points, right_ori=np.array([0.406, -0.406, -0.579, 0.579])) left_lift_points,right_lift_points=np.array([manipulation_points[0][0], manipulation_points[2][1]+0.06, distance+z_off]), np.array([manipulation_points[1][0], manipulation_points[3][1]+0.06, distance+z_off]) env.bimanual_dex.dense_move_both_ik(left_pos=left_lift_points, left_ori=np.array([0.579, -0.579, -0.406, 0.406]), right_pos=right_lift_points, right_ori=np.array([0.406, -0.406, -0.579, 0.579])) # release the garment env.bimanual_dex.set_both_hand_state(left_hand_state="open", right_hand_state="open") env.garment.particle_material.set_gravity_scale(10.0) for i in range(200): env.step() env.garment.particle_material.set_gravity_scale(1.0) cprint("Store World Fold Procedure Finish! Store Procedure Begins!", "green", "on_green") pusher_center = np.array([0.0+env_dx, 1.10+env_dy, 0.0]) env.pusher.set_world_pose(position=pusher_center) # hide prim to get object point cloud set_prim_visible_group( prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/Garment/garment"], visible=False, ) for i in range(50): env.step() env.object_pcd, color = env.object_camera.get_point_cloud_data_from_segment( save_or_not=False, save_path=get_unique_filename("data", extension=".ply"), # real_time_watch=True, ) set_prim_visible_group( prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/Garment/garment"], visible=True, ) for i in range(50): env.step() # # hide prim to get garment point cloud # set_prim_visible_group( # prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/pusher"], # visible=False, # ) # for i in range(50): # env.step() # env.garment_pcd, color = env.garment_camera.get_point_cloud_data_from_segment( # save_or_not=False, # save_path=get_unique_filename("data", extension=".ply"), # ) # set_prim_visible_group( # prim_path_list=["/World/DexLeft", "/World/DexRight", "/World/pusher"], # visible=True, # ) # for i in range(50): # env.step() left_ori=np.array([0.7010574,0.5609855, 0.4304593, 0.092296]) right_ori=np.array([ 0.4304593, 0.092296, 0.7010574,0.5609855]) left_lift_points,right_lift_points=np.array([-0.5, 0.6, 0.65]), np.array([0.5, 0.6, 0.65]) env.bimanual_dex.dense_move_both_ik(left_pos=left_lift_points, left_ori=left_ori, right_pos=right_lift_points, right_ori=right_ori) # env.bimanual_dex.set_both_hand_state(left_hand_state="smooth", right_hand_state="smooth") for i in range(50): env.step() garment_fold_length = np.max([ abs(manipulation_points[0][1] - manipulation_points[2][1]), abs(manipulation_points[1][1] - manipulation_points[3][1]) ]) / 2 garment_fold_width = np.max([ abs(manipulation_points[0][0] - manipulation_points[1][0]), abs(manipulation_points[2][0] - manipulation_points[3][0]) ]) manipulation_points=manipulation_points[2:] left_off0=np.array([-0.08,-0.0,0.005]) right_off0=np.array([0.08,-0.0,0.005]) left_off1=np.array([-0.05,-0.0,0.002]) right_off1=np.array([0.05,-0.0,0.002]) left_off2=np.array([0.005,0.05,-0.0]) right_off2=np.array([-0.005,0.05,-0.0]) manipulation_points[:,2]=0.00 env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0]+left_off0, left_ori=left_ori, right_pos=manipulation_points[1]+right_off0, right_ori=right_ori) for i in range(20): env.step() env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0]+left_off1, left_ori=left_ori, right_pos=manipulation_points[1]+right_off1, right_ori=right_ori) for i in range(20): env.step() env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0]+left_off2, left_ori=left_ori, right_pos=manipulation_points[1]+right_off2, right_ori=right_ori) for i in range(20): env.step() # env.bimanual_dex.dense_move_both_ik(left_pos=manipulation_points[0]+left_off3, left_ori=left_ori, right_pos=manipulation_points[1]+right_off3, right_ori=right_ori) # for i in range(20): # env.step() for i in range(20): env.step() for i in range(10): print(f"Stage_1_Step: {i}") joint_pos_L = env.bimanual_dex.dexleft.get_joint_positions() joint_pos_R = env.bimanual_dex.dexright.get_joint_positions() joint_state = np.concatenate([joint_pos_L, joint_pos_R]) obs = dict() obs['agent_pos']=joint_state obs['environment_point_cloud']=env.env_camera.get_pointcloud_from_depth() obs['garment_point_cloud']=env.garment_pcd obs['object_point_cloud']=env.object_pcd obs['points_affordance_feature']=env.points_affordance_feature action=env.sadp.get_action(obs) print("action_shape:",action.shape) for j in range(4): action_L = ArticulationAction(joint_positions=action[j][:30]) action_R = ArticulationAction(joint_positions=action[j][30:]) env.bimanual_dex.dexleft.apply_action(action_L) env.bimanual_dex.dexright.apply_action(action_R) for _ in range(5): env.step() joint_pos_L = env.bimanual_dex.dexleft.get_joint_positions() joint_pos_R = env.bimanual_dex.dexright.get_joint_positions() joint_state = np.concatenate([joint_pos_L, joint_pos_R]) obs = dict() obs['agent_pos']=joint_state obs['environment_point_cloud']=env.env_camera.get_pointcloud_from_depth() obs['garment_point_cloud']=env.garment_pcd obs['object_point_cloud']=env.object_pcd obs['points_affordance_feature']=env.points_affordance_feature env.sadp.update_obs(obs) # if you wanna create gif, use this code. Need Cooperation with thread. if record_video_flag: if not os.path.exists("Data/Store_Tops_Validation_HALO/video"): os.makedirs("Data/Store_Tops_Validation_HALO/video") env.env_camera.create_mp4(get_unique_filename("Data/Store_Tops_Validation_HALO/video/video", ".mp4")) delete_prim("/World/DexLeft") delete_prim("/World/DexRight") for i in range(50): env.step() success=True store_state, color = env.judge_camera.get_point_cloud_data_from_segment(save_or_not=False) # get max_x min_x max_y min_y max_x = np.max(store_state[:, 0]) min_x = np.min(store_state[:, 0]) max_y = np.max(store_state[:, 1]) min_y = np.min(store_state[:, 1]) # get the center of the point cloud center_x = (max_x + min_x) / 2 center_y = (max_y + min_y) / 2 # get the distance between the center and the pusher distance = np.sqrt((center_x - pusher_center[0]) ** 2 + (center_y - pusher_center[1]) ** 2) # if the distance is less than 0.05, it is considered as success if distance < 0.1: success=True else: success=False cprint("----------- Judge Begin -----------", "blue", attrs=["bold"]) cprint(f"garment_center: {center_x}, {center_y}", "blue") cprint(f"pusher_center: {pusher_center[0]}, {pusher_center[1]}", "blue") cprint(f"distance: {distance}", "blue") cprint("----------- Judge End -----------", "blue", attrs=["bold"]) cprint(f"final result: {success}", color="green", on_color="on_green") if validation_flag: if not os.path.exists("Data/Store_Tops_Validation_HALO"): os.makedirs("Data/Store_Tops_Validation_HALO") # write into .log file with open("Data/Store_Tops_Validation_HALO/validation_log.txt", "a") as f: f.write(f"result:{success} usd_path:{env.garment.usd_path} pos_x:{pos[0]} pos_y:{pos[1]} env_dx:{env_dx} env_dy:{env_dy} \n") if not os.path.exists("Data/Store_Tops_Validation_HALO/final_state_pic"): os.makedirs("Data/Store_Tops_Validation_HALO/final_state_pic") env.env_camera.get_rgb_graph(save_or_not=True,save_path=get_unique_filename("Data/Store_Tops_Validation_HALO/final_state_pic/img",".png")) result = { "task_name": "Store Tops HALO", "success": bool(success), "garment_point_count": int(env.garment_pcd.shape[0]) if env.garment_pcd is not None else 0, "object_point_count": int(env.object_pcd.shape[0]) if env.object_pcd is not None else 0, "judge_point_count": int(store_state.shape[0]), "affordance_feature_norm": float(np.linalg.norm(env.points_affordance_feature)) if env.points_affordance_feature is not None else 0.0, "distance_to_target_center": float(distance), } return result if __name__=="__main__": args = parse_args_val() # initial setting pos = np.array([0.0, 0.7, 0.20]) ori = np.array([0.0, 0.0, 0.0]) usd_path = None env_dx = 0.0 env_dy = 0.0 if args.env_random_flag or args.garment_random_flag: rng_seed = _env_text("DEXGARMENTLAB_RNG_SEED") np.random.seed(int(rng_seed) if rng_seed is not None else int(time.time())) if args.env_random_flag: env_dx = np.random.uniform(-0.3, 0.3) # changeable env_dy = np.random.uniform(-0.1, 0.1) # changeable if args.garment_random_flag: x = np.random.uniform(-0.05, 0.05) # changeable y = np.random.uniform(0.65, 0.75) # changeable pos = np.array([x,y,0.0]) ori = np.array([0.0, 0.0, 0.0]) Base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) assets_lists = os.path.join(Base_dir,"Model_HALO/GAM/checkpoints/Tops_NoSleeve/assets_list.txt") assets_list = [] with open(assets_lists,"r",encoding='utf-8') as f: for line in f: clean_line = line.rstrip('\n') assets_list.append(clean_line) usd_path=np.random.choice(assets_list) forced_usd_path = _env_text("DEXGARMENTLAB_USD_PATH") if forced_usd_path is not None: usd_path = forced_usd_path forced_pos_x = _env_float("DEXGARMENTLAB_POS_X") forced_pos_y = _env_float("DEXGARMENTLAB_POS_Y") if forced_pos_x is not None or forced_pos_y is not None: pos = np.array([ pos[0] if forced_pos_x is None else forced_pos_x, pos[1] if forced_pos_y is None else forced_pos_y, pos[2], ]) forced_ori_z = _env_float("DEXGARMENTLAB_ORI_Z") if forced_ori_z is not None: ori = np.array([ori[0], ori[1], forced_ori_z]) forced_env_dx = _env_float("DEXGARMENTLAB_ENV_DX") if forced_env_dx is not None: env_dx = forced_env_dx forced_env_dy = _env_float("DEXGARMENTLAB_ENV_DY") if forced_env_dy is not None: env_dy = forced_env_dy result = StoreTops(pos, ori, usd_path, env_dx, env_dy, args.ground_material_usd, args.validation_flag, args.record_video_flag, args.training_data_num, args.stage_1_checkpoint_num, args.stage_2_checkpoint_num, args.stage_3_checkpoint_num) if EMIT_JSON: print(f"{RESULT_PREFIX}{json.dumps(result, sort_keys=True)}") if args.validation_flag: simulation_app.close() elif HEADLESS: simulation_app.close() else: while simulation_app.is_running(): simulation_app.update() simulation_app.close()