RoboMME / src /robomme /robomme_env /PickXtimes.py
HongzeFu's picture
change to 256
467d2ce
from typing import Any, Dict, Union
import numpy as np
import sapien
import torch
import mani_skill.envs.utils.randomization as randomization
from mani_skill.agents.robots import SO100, Fetch, Panda
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.envs.tasks.tabletop.pick_cube_cfgs import PICK_CUBE_CONFIGS
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import sapien_utils
from mani_skill.utils.building import actors
from mani_skill.utils.registration import register_env
from mani_skill.utils.scene_builder.table import TableSceneBuilder
from mani_skill.utils.structs.pose import Pose
#Robomme
import matplotlib.pyplot as plt
import random
from mani_skill.utils.geometry.rotation_conversions import (
euler_angles_to_matrix,
matrix_to_quaternion,
)
from .utils import *
from .utils.subgoal_evaluate_func import static_check
from .utils import subgoal_language
from .utils.object_generation import spawn_fixed_cube, build_board_with_hole
from .utils import reset_panda
from .utils.difficulty import normalize_robomme_difficulty
from ..logging_utils import logger
PICK_CUBE_DOC_STRING = """**Task Description:**
A simple task where the objective is to grasp a red cube with the {robot_id} robot and move it to a target goal position. This is also the *baseline* task to test whether a robot with manipulation
capabilities can be simulated and trained properly. Hence there is extra code for some robots to set them up properly in this environment as well as the table scene builder.
**Randomizations:**
- the cube's xy position is randomized on top of a table in the region [0.1, 0.1] x [-0.1, -0.1]. It is placed flat on the table
- the cube's z-axis rotation is randomized to a random angle
- the target goal position (marked by a green sphere) of the cube has its xy position randomized in the region [0.1, 0.1] x [-0.1, -0.1] and z randomized in [0, 0.3]
**Success Conditions:**
- the cube position is within `goal_thresh` (default 0.025m) euclidean distance of the goal position
- the robot is static (q velocity < 0.2)
"""
@register_env("PickXtimes")
class PickXtimes(BaseEnv):
_sample_video_link = "https://github.com/haosulab/ManiSkill/raw/main/figures/environment_demos/PickCube-v1_rt.mp4"
SUPPORTED_ROBOTS = [
"panda",
"fetch",
"xarm6_robotiq",
"so100",
"widowxai",
]
agent: Union[Panda]
goal_thresh = 0.025
cube_spawn_half_size = 0.05
cube_spawn_center = (0, 0)
config_hard = {
'color': 3,
'number_min': 4,
'number_max':5,
}
config_easy = {
'color': 1,
'number_min': 1,
'number_max':3
}
config_medium = {
'color': 3,
'number_min': 1,
'number_max':3
}
# Combine into a dictionary
configs = {
'hard': config_hard,
'easy': config_easy,
'medium': config_medium
}
def __init__(self, *args, robot_uids="panda_wristcam", robot_init_qpos_noise=0,seed=0,Robomme_video_episode=None,Robomme_video_path=None,
**kwargs):
self.use_demonstrationwrapper=False
self.demonstration_record_traj=False
self.robot_init_qpos_noise = robot_init_qpos_noise
if robot_uids in PICK_CUBE_CONFIGS:
cfg = PICK_CUBE_CONFIGS[robot_uids]
else:
cfg = PICK_CUBE_CONFIGS["panda"]
self.cube_half_size = cfg["cube_half_size"]
self.goal_thresh = cfg["goal_thresh"]
self.cube_spawn_half_size = cfg["cube_spawn_half_size"]
self.cube_spawn_center = cfg["cube_spawn_center"]
self.max_goal_height = cfg["max_goal_height"]
self.sensor_cam_eye_pos = cfg["sensor_cam_eye_pos"]
self.sensor_cam_target_pos = cfg["sensor_cam_target_pos"]
self.human_cam_eye_pos = cfg["human_cam_eye_pos"]
self.human_cam_target_pos = cfg["human_cam_target_pos"]
self.robomme_failure_recovery = bool(
kwargs.pop("robomme_failure_recovery", False)
)
self.robomme_failure_recovery_mode = kwargs.pop(
"robomme_failure_recovery_mode", None
)
if isinstance(self.robomme_failure_recovery_mode, str):
self.robomme_failure_recovery_mode = (
self.robomme_failure_recovery_mode.lower()
)
self.seed = seed
normalized_robomme_difficulty = normalize_robomme_difficulty(
kwargs.pop("difficulty", None)
)
if normalized_robomme_difficulty is not None:
self.difficulty = normalized_robomme_difficulty
else:
# Determine difficulty based on seed % 3
seed_mod = seed % 3
if seed_mod == 0:
self.difficulty = "easy"
elif seed_mod == 1:
self.difficulty = "medium"
else: # seed_mod == 2
self.difficulty = "hard"
# Use seed to randomly determine number of repetitions (1-5)
generator = torch.Generator()
generator.manual_seed(seed)
self.num_repeats = torch.randint(self.configs[self.difficulty]['number_min'], self.configs[self.difficulty]['number_max']+1, (1,), generator=generator).item()
logger.debug(f"Task will repeat {self.num_repeats} times (pickup-drop cycles)")
super().__init__(*args, robot_uids=robot_uids, **kwargs)
@property
def _default_sensor_configs(self):
pose = sapien_utils.look_at(
eye=self.sensor_cam_eye_pos, target=self.sensor_cam_target_pos
)
camera_eye=[0.3,0,0.4]
camera_target =[0,0,-0.2]
pose = sapien_utils.look_at(
eye=camera_eye, target=camera_target
)
return [CameraConfig("base_camera", pose, 256, 256, np.pi / 2, 0.01, 100)]
@property
def _default_human_render_camera_configs(self):
pose = sapien_utils.look_at(
eye=self.human_cam_eye_pos, target=self.human_cam_target_pos
)
return CameraConfig("render_camera", pose, 512, 512, 1, 0.01, 100)
def _load_agent(self, options: dict):
super()._load_agent(options, sapien.Pose(p=[-0.615, 0, 0]))
def _load_scene(self, options: dict):
generator = torch.Generator()
generator.manual_seed(self.seed)
self.table_scene = TableSceneBuilder(
self, robot_init_qpos_noise=self.robot_init_qpos_noise
)
self.table_scene.build()
button_obb = build_button(
self,
center_xy=(-0.2, 0),
scale=1.5,
generator=generator,
)
avoid = [button_obb]
self.all_cubes = [] # Save all cube objects
# Initialize storage for each color group
self.red_cubes = []
self.red_cube_names = []
self.blue_cubes = []
self.blue_cube_names = []
self.green_cubes = []
self.green_cube_names = []
cubes_per_color = 1
color_groups = [
{"color": (1, 0, 0, 1), "name": "red", "list": self.red_cubes, "name_list": self.red_cube_names},
{"color": (0, 0, 1, 1), "name": "blue", "list": self.blue_cubes, "name_list": self.blue_cube_names},
{"color": (0, 1, 0, 1), "name": "green", "list": self.green_cubes, "name_list": self.green_cube_names}
]
shuffle_indices = torch.randperm(len(color_groups), generator=generator).tolist()
color_groups = [color_groups[i] for i in shuffle_indices]
# Randomly select target color using generator
target_color_idx = torch.randint(0, len(color_groups), (1,), generator=generator).item()
self.target_color_name = color_groups[target_color_idx]["name"]
logger.debug(f"Target color selected: {self.target_color_name}")
# Generate 5 cubes for each color group
for idx, group in enumerate(color_groups):
if idx < self.configs[self.difficulty]['color']:
for idx in range(cubes_per_color):
try:
cube = spawn_random_cube(
self,
color=group["color"],
avoid=avoid,
include_existing=False,
include_goal=False,
region_center=[-0.1, 0],
region_half_size=0.2,
half_size=self.cube_half_size,
min_gap=self.cube_half_size,
random_yaw=True,
name_prefix=f"cube_{group['name']}_{idx}",
generator=generator,
)
except RuntimeError as e:
logger.debug(f"Failed to generate {group['name']} cube {idx}: {e}")
break
self.all_cubes.append(cube)
group["list"].append(cube)
cube_name = f"cube_{group['name']}_{idx}"
group["name_list"].append(cube_name)
setattr(self, cube_name, cube)
avoid.append(cube)
logger.debug(f"Generated {len(group['list'])} {group['name']} cubes")
logger.debug(f"Generated {len(self.all_cubes)} cubes total (red: {len(self.red_cubes)}, blue: {len(self.blue_cubes)}, green: {len(self.green_cubes)})")
try:
target = spawn_random_target(
self,
avoid=avoid, # Use current avoidance list, containing all spawned cubes
include_existing=False, # Manually maintain list
include_goal=False, # Manually maintain list
region_center=[-0.1, 0],
region_half_size=0.2,
radius=self.cube_half_size*2, # Use radius instead of half_size
thickness=0.005, # target thickness
min_gap=self.cube_half_size*2, # Gap requirement same as cube
name_prefix=f"target",
generator=generator
)
except RuntimeError as e:
logger.debug(f"Target sampling failed: {e}")
# Assign target to self.target_0, self.target_1 etc. attributes
setattr(self, f"target", target)
# Add newly generated target to avoidance list
avoid.append(target)
# Randomly select one cube from all available cubes as the target
if len(self.all_cubes) > 0:
target_cube_idx = torch.randint(0, len(self.all_cubes), (1,), generator=generator).item()
self.target_cube = self.all_cubes[target_cube_idx]
# Determine the color of the selected target cube
if self.target_cube in self.red_cubes:
self.target_color_name = "red"
elif self.target_cube in self.blue_cubes:
self.target_color_name = "blue"
elif self.target_cube in self.green_cubes:
self.target_color_name = "green"
logger.debug(f"Target cube selected: {self.target_color_name} cube (index {target_cube_idx} in all_cubes)")
else:
self.target_cube = None
self.target_color_name = None
logger.debug("No cubes generated, no target cube selected")
# Create list of non-target cubes for failure checking
self.non_target_cubes = [cube for cube in self.all_cubes if cube != self.target_cube]
logger.debug(f"Non-target cubes: {len(self.non_target_cubes)}")
# Dynamically generate task list for N pickup-drop cycles
tasks = []
for i in range(self.num_repeats):
tasks.append({
"func": (lambda i=i: is_obj_pickup(self, obj=self.target_cube)),
"name": subgoal_language.get_subgoal_with_index(i, "pick up the {color} cube for the {idx} time", color=self.target_color_name),
"subgoal_segment": subgoal_language.get_subgoal_with_index(i, "pick up the {color} cube at <> for the {idx} time", color=self.target_color_name),
"choice_label": "pick up the cube",
"demonstration": False,
"failure_func": lambda: [is_any_obj_pickup(self, self.non_target_cubes),is_button_pressed(self, obj=self.button)],
"solve": lambda env, planner: solve_pickup(env,planner,self.target_cube),
"segment":self.target_cube
})
tasks.append({
"func": (lambda: is_obj_dropped_onto(self,obj=self.target_cube,target=self.target)),
"name": f"place the {self.target_color_name} cube onto the target",
"subgoal_segment": f"place the {self.target_color_name} cube onto the target at <>",
"choice_label": "place the cube onto the target",
"demonstration": False,
"failure_func": lambda: [is_any_obj_pickup(self, self.non_target_cubes),is_button_pressed(self, obj=self.button)],
"solve": lambda env, planner: solve_putonto_whenhold(env, planner, target=self.target),
"segment":self.target
})
tasks.append( {
"func": lambda:is_button_pressed(self, obj=self.button),
"name": "press the button to stop",
"subgoal_segment": "press the button at <> to stop",
"choice_label": "press the button to stop",
"demonstration": False,
"failure_func":lambda:is_any_obj_pickup(self, self.all_cubes),
"solve": lambda env, planner: solve_button(env, planner, obj=self.button),
"segment":self.cap_link
})
# Store task list for RecordWrapper use
self.task_list = tasks
# Record pickup related task indices and items for recovery
self.recovery_pickup_indices, self.recovery_pickup_tasks = task4recovery(self.task_list)
if self.robomme_failure_recovery:
# Only inject an intentional failed grasp when recovery mode is enabled
self.fail_grasp_task_index = inject_fail_grasp(
self.task_list,
generator=generator,
mode=self.robomme_failure_recovery_mode,
)
else:
self.fail_grasp_task_index = None
def _initialize_episode(self, env_idx: torch.Tensor, options: dict):
with torch.device(self.device):
b = len(env_idx)
self.table_scene.initialize(env_idx)
qpos=reset_panda.get_reset_panda_param("qpos")
self.agent.reset(qpos)
logger.debug(self.agent.robot.qpos)
def _get_obs_extra(self, info: Dict):
return dict()
def evaluate(self,solve_complete_eval=False):
previous_failure = getattr(self, "failureflag", None)
self.successflag = torch.tensor([False])
if previous_failure is not None and bool(previous_failure.item()):
self.failureflag = previous_failure
else:
self.failureflag = torch.tensor([False])
# Use encapsulated sequence task check function
if(self.use_demonstrationwrapper==False):# change subgoal after planner ends during recording
if solve_complete_eval==True:
allow_subgoal_change_this_timestep=True
else:
allow_subgoal_change_this_timestep=False
else:# during demonstration, video needs to call evaluate(solve_complete_eval), video ends and flag changes in demonstrationwrapper
if solve_complete_eval==True or self.demonstration_record_traj==False:
allow_subgoal_change_this_timestep=True
else:
allow_subgoal_change_this_timestep=False
all_tasks_completed, current_task_name, task_failed,self.current_task_specialflag= sequential_task_check(self, self.task_list,allow_subgoal_change_this_timestep=allow_subgoal_change_this_timestep)
# If task failed, mark as failed immediately
if task_failed:
self.failureflag = torch.tensor([True])
logger.debug(f"Task failed: {current_task_name}")
# If static_check succeeds or all tasks completed, set success flag
if all_tasks_completed and not task_failed:
self.successflag = torch.tensor([True])
return {
"success": self.successflag,
"fail": self.failureflag,
}
def compute_dense_reward(self, obs: Any, action: torch.Tensor, info: Dict):
tcp_to_obj_dist = torch.linalg.norm(
self.agent.tcp_pose.p - self.agent.tcp_pose.p, axis=1
)
reaching_reward = 1 - torch.tanh(5 * tcp_to_obj_dist)
reward = reaching_reward*0
return reward
def compute_normalized_dense_reward(
self, obs: Any, action: torch.Tensor, info: Dict
):
return self.compute_dense_reward(obs=obs, action=action, info=info) / 5
#Robomme
def step(self, action: Union[None, np.ndarray, torch.Tensor, Dict]):
# highlight_obj(self,self.target_cube, start_step=0, end_step=30, cur_step=timestep)
obs, reward, terminated, truncated, info = super().step(action)
return obs, reward, terminated, truncated, info