from typing import Any, Dict, Union

import numpy as np
import sapien
import torch

import mani_skill.envs.utils.randomization as randomization
from mani_skill.agents.robots import SO100, Fetch, Panda
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.envs.tasks.tabletop.pick_cube_cfgs import PICK_CUBE_CONFIGS
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import sapien_utils
from mani_skill.utils.building import actors
from mani_skill.utils.registration import register_env
from mani_skill.utils.scene_builder.table import TableSceneBuilder
from mani_skill.utils.structs.pose import Pose
from mani_skill.utils.structs import Actor, Link

# Robomme
import matplotlib.pyplot as plt
import random

from mani_skill.utils.geometry.rotation_conversions import (
    euler_angles_to_matrix,
    matrix_to_quaternion,
)

from .utils import *
from .utils.subgoal_evaluate_func import static_check
from .utils.object_generation import spawn_fixed_cube, build_board_with_hole
from .utils import reset_panda
from .utils import subgoal_language
from .utils.difficulty import normalize_robomme_difficulty
from ..logging_utils import logger


@register_env("BinFill")
class BinFill(BaseEnv):
    """Tabletop task: pick colored cubes, drop the requested number of each color
    into a bin (a board with a hole), then press a button to finish.

    Difficulty ("easy"/"medium"/"hard") controls how many colors are involved,
    how many cubes spawn, and how many must be placed in the bin. All
    randomization is driven by a dedicated ``torch.Generator`` seeded in
    ``__init__`` so episodes are reproducible per seed.
    """

    _sample_video_link = "https://github.com/haosulab/ManiSkill/raw/main/figures/environment_demos/PickCube-v1_rt.mp4"

    SUPPORTED_ROBOTS = [
        "panda",
        "fetch",
        "xarm6_robotiq",
        "so100",
        "widowxai",
    ]
    agent: Union[Panda]

    goal_thresh = 0.025
    cube_spawn_half_size = 0.05
    cube_spawn_center = (0, 0)

    # Per-difficulty parameters:
    #   color          - number of cube colors present in the scene
    #   spawn_cubes    - [min, max] total cubes to spawn
    #   put_in_color   - [min, max] number of colors that must go into the bin
    #   put_in_numbers - [min, max] count of cubes (per color or total) to put in
    config_easy = {
        'color': 1,
        'spawn_cubes': [4, 6],
        "put_in_color": [1, 1],
        "put_in_numbers": [1, 3],
    }
    config_medium = {
        'color': 2,
        'spawn_cubes': [8, 10],
        "put_in_color": [1, 2],
        "put_in_numbers": [2, 4],
    }
    config_hard = {
        'color': 3,
        'spawn_cubes': [10, 12],
        "put_in_color": [2, 3],
        "put_in_numbers": [3, 5],
    }
    # Combine into a dictionary keyed by difficulty name.
    configs = {
        'hard': config_hard,
        'easy': config_easy,
        'medium': config_medium,
    }

    def __init__(self, *args, robot_uids="panda_wristcam", robot_init_qpos_noise=0,
                 seed=0, Robomme_video_episode=None, Robomme_video_path=None, **kwargs):
        """Configure difficulty, recovery flags, camera/robot config and RNG.

        Args:
            robot_uids: robot to load; falls back to the "panda" entry of
                ``PICK_CUBE_CONFIGS`` when the uid has no dedicated config.
            robot_init_qpos_noise: noise applied to the robot's initial qpos.
            seed: seeds the private ``torch.Generator``; when no explicit
                ``difficulty`` kwarg is given, ``seed % 3`` picks
                easy/medium/hard.
            Robomme_video_episode / Robomme_video_path: accepted for interface
                compatibility; not used here.
        """
        self.robot_init_qpos_noise = robot_init_qpos_noise
        self.use_demonstrationwrapper = False
        self.demonstration_record_traj = False

        normalized_robomme_difficulty = normalize_robomme_difficulty(
            kwargs.pop("difficulty", None)
        )
        self.robomme_failure_recovery = bool(
            kwargs.pop("robomme_failure_recovery", False)
        )
        self.robomme_failure_recovery_mode = kwargs.pop(
            "robomme_failure_recovery_mode", None
        )
        if isinstance(self.robomme_failure_recovery_mode, str):
            self.robomme_failure_recovery_mode = self.robomme_failure_recovery_mode.lower()

        if normalized_robomme_difficulty is not None:
            self.difficulty = normalized_robomme_difficulty
        else:
            # Determine difficulty based on seed % 3
            seed_mod = seed % 3
            if seed_mod == 0:
                self.difficulty = "easy"
            elif seed_mod == 1:
                self.difficulty = "medium"
            else:  # seed_mod == 2
                self.difficulty = "hard"

        if robot_uids in PICK_CUBE_CONFIGS:
            cfg = PICK_CUBE_CONFIGS[robot_uids]
        else:
            cfg = PICK_CUBE_CONFIGS["panda"]
        self.cube_half_size = cfg["cube_half_size"]
        self.goal_thresh = cfg["goal_thresh"]
        self.cube_spawn_half_size = cfg["cube_spawn_half_size"]
        self.cube_spawn_center = cfg["cube_spawn_center"]
        self.max_goal_height = cfg["max_goal_height"]
        self.sensor_cam_eye_pos = cfg["sensor_cam_eye_pos"]
        self.sensor_cam_target_pos = cfg["sensor_cam_target_pos"]
        self.human_cam_eye_pos = cfg["human_cam_eye_pos"]
        self.human_cam_target_pos = cfg["human_cam_target_pos"]

        self.seed = seed
        self.generator = torch.Generator()
        self.generator.manual_seed(seed)
        # Coin flip: when True, step() periodically lifts/drops cubes ("dynamic" scene).
        self.dynamic = bool(torch.randint(0, 2, (1,), generator=self.generator).item())

        # Track the color order and counts used to describe the language goal.
        self.binfill_language_sequence = []

        super().__init__(*args, robot_uids=robot_uids, **kwargs)

    @property
    def _default_sensor_configs(self):
        # NOTE(review): the cfg-derived sensor_cam_* pose was computed then
        # immediately overwritten by this fixed pose in the original; the dead
        # computation has been dropped.
        camera_eye = [0.3, 0, 0.4]
        camera_target = [0, 0, -0.2]
        pose = sapien_utils.look_at(eye=camera_eye, target=camera_target)
        return [CameraConfig("base_camera", pose, 256, 256, np.pi / 2, 0.01, 100)]

    @property
    def _default_human_render_camera_configs(self):
        # NOTE(review): same as the sensor camera — the cfg-derived human_cam_*
        # pose was dead code (overwritten by this fixed pose) and was dropped.
        camera_eye = [1, 0, 0.4]
        camera_target = [0, 0, 0.4]
        pose = sapien_utils.look_at(eye=camera_eye, target=camera_target)
        return CameraConfig("render_camera", pose, 512, 512, 1, 0.01, 100)

    def _load_agent(self, options: dict):
        # Place the robot base behind the table origin.
        super()._load_agent(options, sapien.Pose(p=[-0.615, 0, 0]))

    def _load_scene(self, options: dict):
        """Build the table, button, bin (board with hole) and colored cubes.

        Target/spawn counts per color are sampled from the difficulty config
        such that spawn count >= target count for every color.
        """
        self.table_scene = TableSceneBuilder(
            self, robot_init_qpos_noise=self.robot_init_qpos_noise
        )
        self.table_scene.build()

        # Single generator drives all randomization for reproducibility.
        generator = self.generator

        button_obb = build_button(
            self,
            center_xy=(-0.2, 0),
            scale=1.5,
            generator=generator,
        )
        avoid = [button_obb]

        # Create square board with square hole (the "bin").
        x_var = torch.rand(1, generator=generator).item() * 0.2 - 0.2   # [-0.2, 0.0]
        y_var = torch.rand(1, generator=generator).item() * 0.4 - 0.2   # [-0.2, 0.2]
        z_rot_deg = torch.rand(1, generator=generator).item() * 40.0 - 20.0  # [-20, 20] degrees
        z_rot_rad = torch.deg2rad(torch.tensor(z_rot_deg))
        # Rotation quaternion for the z-axis yaw.
        rot_mat = euler_angles_to_matrix(
            torch.tensor([[0.0, 0.0, z_rot_rad]]), convention="XYZ"
        )
        rot_quat = matrix_to_quaternion(rot_mat)[0]  # [w, x, y, z]
        self.board_with_hole = build_board_with_hole(
            self,
            board_side=0.1,    # Side length of square board
            hole_side=0.08,    # Side length of square hole, sized so a cube passes through
            thickness=0.05,    # Board thickness
            position=[0.15 + x_var, 0.0 + y_var, 0.0],  # Board position
            rotation_quat=rot_quat.tolist(),            # z-axis rotation
            name="board_with_hole",
        )
        avoid += [self.board_with_hole]

        # Sampling scheme:
        # 1) target_numbers (cubes to put in per color):
        #    - one active color: its target drawn from put_in_numbers range
        #    - several active colors: draw a TOTAL target, distribute randomly
        #      (no minimum of 1 per color)
        # 2) spawn_numbers: each selected color spawns at least its target
        #    (and at least 1 when multiple colors are active); the remainder of
        #    the total spawn budget is distributed randomly.
        config = self.configs[self.difficulty]
        num_colors = config['color']
        spawn_range = config['spawn_cubes']           # [min, max]
        put_in_color_range = config['put_in_color']

        # Random subset of the 3 color slots (0=red, 1=blue, 2=green).
        color_pool = torch.randperm(3, generator=generator).tolist()[:num_colors]
        put_in_color = torch.randint(
            put_in_color_range[0], put_in_color_range[1] + 1, (1,), generator=generator
        ).item()
        put_in_color = max(1, min(3, put_in_color))
        put_in_color = min(put_in_color, max(1, num_colors))
        active_color_indices = color_pool[:put_in_color]

        put_in_range = config['put_in_numbers']       # [min, max]

        # First generate target_numbers (put_in).
        target_numbers = [0, 0, 0]
        if put_in_color == 1:
            # Only one color needs to be put in bin.
            selected_idx = active_color_indices[0]
            target_numbers[selected_idx] = torch.randint(
                put_in_range[0], put_in_range[1] + 1, (1,), generator=generator
            ).item()
        else:
            # Draw total target first, then distribute over the active colors.
            total_target = torch.randint(
                put_in_range[0], put_in_range[1] + 1, (1,), generator=generator
            ).item()
            for _ in range(total_target):
                idx = torch.randint(
                    0, len(active_color_indices), (1,), generator=generator
                ).item()
                target_numbers[active_color_indices[idx]] += 1

        self.red_cubes_target_number = target_numbers[0]
        self.blue_cubes_target_number = target_numbers[1]
        self.green_cubes_target_number = target_numbers[2]

        # Then generate spawn_numbers, ensuring spawn >= target.
        total_spawn = torch.randint(
            spawn_range[0], spawn_range[1] + 1, (1,), generator=generator
        ).item()
        if num_colors == 1:
            # Only one color spawns: the one with a target (fallback: first in pool).
            spawn_numbers = [0, 0, 0]
            active_idx = next(
                (i for i in color_pool if target_numbers[i] > 0), color_pool[0]
            )
            spawn_numbers[active_idx] = max(total_spawn, target_numbers[active_idx])
        else:
            # Each selected color gets at least 1 spawn and at least its target;
            # the remaining budget is scattered randomly.
            spawn_numbers = [0, 0, 0]
            for i in color_pool:
                spawn_numbers[i] = max(target_numbers[i], 1)
            used_spawn = sum(spawn_numbers[i] for i in color_pool)
            remaining = total_spawn - used_spawn
            for _ in range(max(0, remaining)):
                idx = torch.randint(0, len(color_pool), (1,), generator=generator).item()
                spawn_numbers[color_pool[idx]] += 1

        self.red_cubes_spawn_number = spawn_numbers[0]
        self.blue_cubes_spawn_number = spawn_numbers[1]
        self.green_cubes_spawn_number = spawn_numbers[2]

        logger.debug(f"Target numbers - Red: {self.red_cubes_target_number}, Blue: {self.blue_cubes_target_number}, Green: {self.green_cubes_target_number}")
        logger.debug(f"Spawn numbers - Red: {self.red_cubes_spawn_number}, Blue: {self.blue_cubes_spawn_number}, Green: {self.green_cubes_spawn_number}")

        self.all_cubes = []
        self.red_cubes, self.blue_cubes, self.green_cubes = [], [], []
        color_info = [
            {"color": (1, 0, 0, 1), "name": "red", "list": self.red_cubes,
             "spawn_num": self.red_cubes_spawn_number},
            {"color": (0, 0, 1, 1), "name": "blue", "list": self.blue_cubes,
             "spawn_num": self.blue_cubes_spawn_number},
            {"color": (0, 1, 0, 1), "name": "green", "list": self.green_cubes,
             "spawn_num": self.green_cubes_spawn_number},
        ]

        # Generate task list for all cubes and shuffle the spawn order so
        # colors are interleaved on the table.
        cube_tasks = []
        for info in color_info:
            for idx in range(info["spawn_num"]):
                cube_tasks.append({
                    "color": info["color"],
                    "name": info["name"],
                    "list": info["list"],
                    "idx": idx,
                })
        shuffle_order = torch.randperm(len(cube_tasks), generator=generator).tolist()
        cube_tasks = [cube_tasks[i] for i in shuffle_order]

        # Spawn cubes in shuffled order; a failed placement is logged and skipped.
        for task in cube_tasks:
            try:
                cube = spawn_random_cube(
                    self,
                    color=task["color"],
                    avoid=avoid,
                    include_existing=False,
                    include_goal=False,
                    region_center=[-0.1, 0],
                    region_half_size=[0.2, 0.25],
                    half_size=self.cube_half_size,
                    min_gap=self.cube_half_size,
                    random_yaw=True,
                    name_prefix=f"cube_{task['name']}_{task['idx']}",
                    generator=generator,
                )
                self.all_cubes.append(cube)
                task["list"].append(cube)
                avoid.append(cube)
            except RuntimeError as e:
                logger.debug(f"Failed to spawn {task['name']} cube {task['idx']}: {e}")

        logger.debug(f"Generated {len(self.all_cubes)} cubes total (red: {len(self.red_cubes)}, blue: {len(self.blue_cubes)}, green: {len(self.green_cubes)})")

    def _initialize_episode(self, env_idx: torch.Tensor, options: dict):
        """Reset the scene and build the ordered subgoal task list.

        For each color with a positive target (in random order): N times
        (pick up cube, put it into bin); finally press the button. When
        failure-recovery mode is enabled, an intentional failed grasp is
        injected into the task list.
        """
        with torch.device(self.device):
            b = len(env_idx)
            self.table_scene.initialize(env_idx)
            qpos = reset_panda.get_reset_panda_param("qpos")
            self.agent.reset(qpos)

            tasks = []
            self.red_cubes_in_bin = 0
            self.blue_cubes_in_bin = 0
            self.green_cubes_in_bin = 0
            self.binfill_language_sequence = []

            color_task_definitions = [
                ("blue", self.blue_cubes, self.blue_cubes_target_number),
                ("red", self.red_cubes, self.red_cubes_target_number),
                ("green", self.green_cubes, self.green_cubes_target_number),
            ]
            color_order = torch.randperm(
                len(color_task_definitions), generator=self.generator
            ).tolist()
            for color_idx in color_order:
                color_name, cube_collection, target_number = color_task_definitions[color_idx]
                if target_number <= 0:
                    continue
                self.binfill_language_sequence.append((color_name, target_number))
                for i in range(target_number):
                    cube = cube_collection[i]
                    # NOTE: default args (c=..., c=cube) bind values NOW to
                    # avoid the late-binding closure pitfall.
                    tasks.append({
                        "func": lambda c=self.all_cubes: is_any_obj_pickup_flag_currentpickup(self, objects=c),
                        "name": subgoal_language.get_subgoal_with_index(i, "pick up the {idx} {color} cube", color=color_name),
                        "subgoal_segment": subgoal_language.get_subgoal_with_index(i, "pick up the {idx} {color} cube at <>", color=color_name),
                        "choice_label": "pick up the cube",
                        "demonstration": False,
                        "failure_func": lambda: is_button_pressed(self, obj=self.button),
                        "solve": lambda env, planner, c=cube: solve_pickup(env, planner, obj=c),
                        "segment": [cube_collection[i]],
                    })
                    tasks.append({
                        "func": lambda c=self.all_cubes: is_any_obj_dropped_onto_delete(self, objects=c, target=self.board_with_hole),
                        "name": "put it into the bin",
                        "subgoal_segment": "put it into the bin at <>",
                        "choice_label": "put it into the bin",
                        "demonstration": False,
                        "failure_func": lambda: is_button_pressed(self, obj=self.button),
                        "solve": lambda env, planner, c=cube: [
                            solve_putonto_whenhold_binspecial(env, planner, target=self.board_with_hole),
                        ],
                        "segment": [self.board_with_hole],
                    })

            # Final task: press the button; it fails if pressed before the
            # required bin counts are reached or while a cube is still falling in.
            tasks.append({
                "func": lambda: is_button_pressed(self, obj=self.button),
                "name": "press the button",
                "subgoal_segment": "press the button at <>",
                "choice_label": "press the button",
                "demonstration": False,
                "failure_func": lambda c=self.all_cubes: [
                    not check_in_bin_number(
                        self,
                        in_bin_list=[self.red_cubes_in_bin, self.blue_cubes_in_bin, self.green_cubes_in_bin],
                        total_number_list=[self.red_cubes_target_number, self.blue_cubes_target_number, self.green_cubes_target_number],
                    ),
                    is_any_obj_dropped_onto_delete(self, objects=c, target=self.board_with_hole),
                ],
                "solve": lambda env, planner: [solve_button(env, planner, obj=self.button)],
                "segment": self.cap_link,
            })

            self.task_list = tasks

            # Record pickup related task indices and items for recovery.
            self.recovery_pickup_indices, self.recovery_pickup_tasks = task4recovery(self.task_list)
            if self.robomme_failure_recovery:
                # Only inject an intentional failed grasp when recovery mode is enabled.
                self.fail_grasp_task_index = inject_fail_grasp(
                    self.task_list,
                    generator=self.generator,
                    mode=self.robomme_failure_recovery_mode,
                )
            else:
                self.fail_grasp_task_index = None

    def _get_obs_extra(self, info: Dict):
        # No extra observations for this task.
        return dict()

    def evaluate(self, solve_complete_eval=False):
        """Run the sequential subgoal check and return success/fail flags.

        A failure detected during step() is sticky: ``current_task_failure``
        is sampled BEFORE ``sequential_task_check`` (which may reset it) so a
        previously observed failure still marks the episode failed.
        """
        self.successflag = torch.tensor([False])
        previous_failure = getattr(self, "current_task_failure", False)
        self.failureflag = torch.tensor([False])

        # Decide whether the subgoal pointer may advance this timestep.
        if not self.use_demonstrationwrapper:
            # Change subgoal only after the planner finishes (during recording).
            allow_subgoal_change_this_timestep = bool(solve_complete_eval)
        else:
            # During demonstration, the wrapper calls evaluate(solve_complete_eval);
            # the flag also flips when trajectory recording is off.
            allow_subgoal_change_this_timestep = (
                solve_complete_eval or not self.demonstration_record_traj
            )

        # Encapsulated sequential task check.
        all_tasks_completed, current_task_name, task_failed, self.current_task_specialflag = sequential_task_check(
            self,
            self.task_list,
            allow_subgoal_change_this_timestep=allow_subgoal_change_this_timestep,
        )

        # Mark failed either on a fresh failure or a previously recorded one.
        if task_failed or previous_failure:
            self.failureflag = torch.tensor([True])
            if task_failed:
                logger.debug(f"Task failed: {current_task_name}")
            elif previous_failure:
                # Re-assert the sticky failure flag that sequential_task_check may have cleared.
                self.current_task_failure = True

        if all_tasks_completed and not task_failed:
            self.successflag = torch.tensor([True])

        return {
            "success": self.successflag,
            "fail": self.failureflag,
        }

    def compute_dense_reward(self, obs: Any, action: torch.Tensor, info: Dict):
        # NOTE(review): intentionally a zero reward (sparse task). The distance
        # below is tcp-to-tcp (always 0) and the result is multiplied by 0.
        tcp_to_obj_dist = torch.linalg.norm(
            self.agent.tcp_pose.p - self.agent.tcp_pose.p, axis=1
        )
        reaching_reward = 1 - torch.tanh(5 * tcp_to_obj_dist)
        reward = reaching_reward * 0
        return reward

    def compute_normalized_dense_reward(
        self, obs: Any, action: torch.Tensor, info: Dict
    ):
        return self.compute_dense_reward(obs=obs, action=action, info=info) / 5

    # Robomme
    def step(self, action: Union[None, np.ndarray, torch.Tensor, Dict]):
        """Optionally animate cubes (dynamic mode), then run the base step."""
        self.vis_obj_id_list = []
        timestep = self.elapsed_steps
        if self.dynamic:
            # Dynamically lift cubes for each color (starting from the 2nd cube).
            for cube_list in [self.red_cubes, self.blue_cubes, self.green_cubes]:
                for idx in range(1, len(cube_list)):
                    lift_and_drop_objects_back_to_original(
                        self,
                        obj=cube_list[idx],
                        start_step=0,
                        end_step=idx * 100,
                        cur_step=timestep,
                    )
        obs, reward, terminated, truncated, info = super().step(action)
        return obs, reward, terminated, truncated, info