| import os |
| import h5py |
| import yaml |
| import numpy as np |
| |
| from configs.state_vec import STATE_VEC_IDX_MAPPING |
| import glob |
| from scipy.interpolate import interp1d |
| from PIL import Image |
|
|
|
|
| def interpolate_action_sequence(action_sequence, target_size): |
| """ |
| Extend the action sequece to `target_size` by linear interpolation. |
| |
| Args: |
| action_sequence (np.ndarray): original action sequence, shape (N, D). |
| target_size (int): target sequence length. |
| |
| Returns: |
| extended_sequence (np.ndarray): extended action sequence, shape (target_size, D). |
| """ |
| N, D = action_sequence.shape |
| indices_old = np.arange(N) |
| indices_new = np.linspace(0, N - 1, target_size) |
|
|
| interp_func = interp1d(indices_old, action_sequence, |
| kind='linear', axis=0, assume_sorted=True) |
| action_sequence_new = interp_func(indices_new) |
|
|
| return action_sequence_new |
|
|
|
|
| class HDF5VLADataset: |
| """ |
| This class is used to sample episodes from the embodiment dataset |
| stored in HDF5 files. |
| """ |
| def __init__(self): |
| |
| self.DATASET_NAME = "agilex" |
|
|
| self.data_dir = "data/datasets/rdt-ft-data/demo_1k" |
| self.tasks = os.listdir(self.data_dir) |
| |
| self.tasks = ['PickCube-v1', 'StackCube-v1', 'PlugCharger-v1', 'PushCube-v1', 'PegInsertionSide-v1'] |
| |
| with open('configs/base.yaml', 'r') as file: |
| config = yaml.safe_load(file) |
| self.CHUNK_SIZE = config['common']['action_chunk_size'] |
| self.IMG_HISTORY_SIZE = config['common']['img_history_size'] |
| self.STATE_DIM = config['common']['state_dim'] |
|
|
| self.num_episode_per_task = 1000 |
| self.img = [] |
| self.state = [] |
| self.action = [] |
|
|
| |
| for task in self.tasks: |
| file_path = glob.glob(os.path.join(self.data_dir, task, 'motionplanning', '*.h5'))[0] |
| with h5py.File(file_path, "r") as f: |
| trajs = f.keys() |
| |
| trajs = sorted(trajs, key=lambda x: int(x.split('_')[-1])) |
| for traj in trajs: |
| |
| states = f[traj]['obs']['agent']['qpos'][:] |
| actions = f[traj]['actions'][:] |
|
|
| self.state.append(states) |
| self.action.append(actions) |
| |
| |
| self.state_min = np.concatenate(self.state).min(axis=0) |
| self.state_max = np.concatenate(self.state).max(axis=0) |
| self.action_min = np.concatenate(self.action).min(axis=0) |
| self.action_max = np.concatenate(self.action).max(axis=0) |
| self.action_std = np.concatenate(self.action).std(axis=0) |
| self.action_mean = np.concatenate(self.action).mean(axis=0) |
| |
| self.task2lang = { |
| "PegInsertionSide-v1": "Pick up a orange-white peg and insert the orange end into the box with a hole in it.", |
| "PickCube-v1": "Grasp a red cube and move it to a target goal position.", |
| "StackCube-v1": "Pick up a red cube and stack it on top of a green cube and let go of the cube without it falling.", |
| "PlugCharger-v1": "Pick up one of the misplaced shapes on the board/kit and insert it into the correct empty slot.", |
| "PushCube-v1": "Push and move a cube to a goal region in front of it." |
| } |
|
|
| def __len__(self): |
| |
| return len(self.tasks) * self.num_episode_per_task |
|
|
| def get_dataset_name(self): |
| return self.DATASET_NAME |
|
|
| def get_item(self, index=None): |
| """ |
| Get a training sample at a random timestep. |
| |
| Args: |
| index (int, optional): The index of the episode. |
| If not provided, a random episode will be selected. |
| state_only (bool, optional): Whether to return only the state. |
| In this way, the sample will contain a complete trajectory rather |
| than a single timestep. Defaults to False. |
| |
| Returns: |
| sample (dict): A dictionary containing the training sample. |
| """ |
| while True: |
| if index is None: |
| index = np.random.randint(0, self.__len__()) |
| valid, sample = self.parse_hdf5_file(index) |
| if valid: |
| return sample |
| else: |
| index = np.random.randint(0, self.__len__()) |
|
|
| def parse_hdf5_file(self, index): |
| """ |
| Parse an HDF5 file to generate a training sample at a random timestep. |
| |
| Args: |
| file_path (str): The path to the HDF5 file. |
| |
| Returns: |
| valid (bool): Whether the episode is valid. |
| dict: A dictionary containing the training sample. |
| """ |
| num_steps = len(self.action[index]) |
| step_index = np.random.randint(0, num_steps) |
| task_index = index // self.num_episode_per_task |
| language = self.task2lang[self.tasks[task_index]] |
| task_inner_index = index % self.num_episode_per_task |
| |
| if self.tasks[task_index] == 'PegInsertionSide-v1' and task_inner_index > 400: |
| return False, None |
| proc_index = task_inner_index // 100 |
| episode_index = task_inner_index % 100 |
| |
| |
| states = (self.state[index] - self.state_min) / (self.state_max - self.state_min) * 2 - 1 |
| states = states[:, :-1] |
| actions = (self.action[index] - self.action_min) / (self.action_max - self.action_min) * 2 - 1 |
| |
| |
| start_img_idx = max(0, step_index - self.IMG_HISTORY_SIZE + 1) |
| end_img_idx = step_index + 1 |
| img_history = [] |
| for i in range(start_img_idx, end_img_idx): |
| img_path = os.path.join(self.data_dir, self.tasks[task_index], 'motionplanning', f'{proc_index}', f'{episode_index}', f"{i + 1}.png") |
| img = np.array(Image.open(img_path)) |
| img_history.append(img) |
| img_history = np.array(img_history) |
| |
| img_valid_len = img_history.shape[0] |
|
|
| |
| if img_valid_len < self.IMG_HISTORY_SIZE: |
| padding = np.tile(img_history[0:1], (self.IMG_HISTORY_SIZE - img_valid_len, 1, 1, 1)) |
| img_history = np.concatenate([padding, img_history], axis=0) |
|
|
| img_history_mask = np.array( |
| [False] * (self.IMG_HISTORY_SIZE - img_valid_len) + [True] * img_valid_len |
| ) |
|
|
| |
| state_std = np.std(states, axis=0) |
| state_mean = np.mean(states, axis=0) |
| state_norm = np.sqrt(np.mean(states ** 2, axis=0)) |
|
|
| |
| state = states[step_index: step_index + 1] |
| runtime_chunksize = self.CHUNK_SIZE // 4 |
| action_sequence = actions[step_index: step_index + runtime_chunksize] |
| |
|
|
| |
| if action_sequence.shape[0] < runtime_chunksize: |
| padding = np.tile(action_sequence[-1:], (runtime_chunksize - action_sequence.shape[0], 1)) |
| action_sequence = np.concatenate([action_sequence, padding], axis=0) |
|
|
| action_sequence = interpolate_action_sequence(action_sequence, self.CHUNK_SIZE) |
|
|
| |
| def fill_in_state(values): |
| UNI_STATE_INDICES = [ |
| STATE_VEC_IDX_MAPPING[f"right_arm_joint_{i}_pos"] for i in range(7) |
| ] + [ |
| STATE_VEC_IDX_MAPPING[f"right_gripper_open"] |
| ] |
| uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,)) |
| uni_vec[..., UNI_STATE_INDICES] = values |
| return uni_vec |
|
|
| state_indicator = fill_in_state(np.ones_like(state_std)) |
| state = fill_in_state(state) |
| state_std = fill_in_state(state_std) |
| state_mean = fill_in_state(state_mean) |
| state_norm = fill_in_state(state_norm) |
| action_sequence = fill_in_state(action_sequence) |
|
|
| |
| meta = { |
| "dataset_name": self.DATASET_NAME, |
| "#steps": num_steps, |
| "step_id": step_index, |
| "instruction": language |
| } |
|
|
| |
| return True, { |
| "meta": meta, |
| "state": state, |
| "state_std": state_std, |
| "state_mean": state_mean, |
| "state_norm": state_norm, |
| "actions": action_sequence, |
| "state_indicator": state_indicator, |
| "cam_high": img_history, |
| "cam_high_mask": img_history_mask, |
| "cam_left_wrist": np.zeros((self.IMG_HISTORY_SIZE, 0, 0, 0)), |
| "cam_left_wrist_mask": np.zeros(self.IMG_HISTORY_SIZE, dtype=bool), |
| "cam_right_wrist": np.zeros((self.IMG_HISTORY_SIZE, 0, 0, 0)), |
| "cam_right_wrist_mask": np.zeros(self.IMG_HISTORY_SIZE, dtype=bool), |
| } |
|
|
|
|
| if __name__ == "__main__": |
| from PIL import Image |
| |
| ds = HDF5VLADataset() |
|
|
| json_data = { |
| 'state_min': ds.state_min.tolist(), |
| 'state_max': ds.state_max.tolist(), |
| 'action_min': ds.action_min.tolist(), |
| 'action_max': ds.action_max.tolist(), |
| } |
| print(json_data) |
|
|
| |
|
|