if __name__ == "__main__": import sys import os import pathlib ROOT_DIR = str(pathlib.Path(__file__).parent.parent.parent) sys.path.append(ROOT_DIR) import os import click import pathlib import numpy as np from diffusion_policy.common.replay_buffer import ReplayBuffer @click.command() @click.option('-i', '--input', required=True, help='input dir contains npy files') @click.option('-o', '--output', required=True, help='output zarr path') @click.option('--abs_action', is_flag=True, default=False) def main(input, output, abs_action): data_directory = pathlib.Path(input) observations = np.load( data_directory / "multimodal_push_observations.npy" ) actions = np.load(data_directory / "multimodal_push_actions.npy") masks = np.load(data_directory / "multimodal_push_masks.npy") buffer = ReplayBuffer.create_empty_numpy() for i in range(len(masks)): eps_len = int(masks[i].sum()) obs = observations[i,:eps_len].astype(np.float32) action = actions[i,:eps_len].astype(np.float32) if abs_action: prev_eef_target = obs[:,8:10] next_eef_target = prev_eef_target + action action = next_eef_target data = { 'obs': obs, 'action': action } buffer.add_episode(data) buffer.save_to_path(zarr_path=output, chunk_length=-1) if __name__ == '__main__': main()