dp-franka-joint / diffusion_policy /scripts /bet_blockpush_conversion.py
ewykric's picture
Upload folder using huggingface_hub
33c751d verified
if __name__ == "__main__":
    # Script-mode bootstrap: only runs when this file is executed directly,
    # never when it is imported as a module.
    import os
    import pathlib
    import sys

    # ROOT_DIR is the directory three levels above this file. Appending it to
    # sys.path presumably lets the `diffusion_policy` package import below
    # resolve without the project being installed -- confirm against the
    # repository layout.
    ROOT_DIR = str(pathlib.Path(__file__).parent.parent.parent)
    sys.path.append(ROOT_DIR)
import os
import click
import pathlib
import numpy as np
from diffusion_policy.common.replay_buffer import ReplayBuffer
# Convert the multimodal block-push ``.npy`` arrays into a replay buffer.
#
# Reads ``multimodal_push_observations.npy``, ``multimodal_push_actions.npy``
# and ``multimodal_push_masks.npy`` from the ``--input`` directory, trims each
# episode to the length given by the sum of its mask row, and saves all
# episodes to the ``--output`` path.
#
# NOTE: documented with comments rather than a docstring on purpose -- click
# would surface a docstring as the command's ``--help`` text.
@click.command()
@click.option('-i', '--input', required=True, help='input dir contains npy files')
@click.option('-o', '--output', required=True, help='output zarr path')
@click.option('--abs_action', is_flag=True, default=False)
def main(input, output, abs_action):
    src_dir = pathlib.Path(input)

    # All three arrays are indexed by episode along their first axis.
    observations = np.load(
        src_dir / "multimodal_push_observations.npy"
    )
    actions = np.load(src_dir / "multimodal_push_actions.npy")
    masks = np.load(src_dir / "multimodal_push_masks.npy")

    buffer = ReplayBuffer.create_empty_numpy()
    for episode_idx, episode_mask in enumerate(masks):
        # The mask row is summed to get the number of steps to keep; this
        # presumably assumes valid steps form a prefix of the row -- confirm
        # against how the dataset was generated.
        n_steps = int(episode_mask.sum())
        episode_obs = observations[episode_idx, :n_steps].astype(np.float32)
        episode_action = actions[episode_idx, :n_steps].astype(np.float32)

        if abs_action:
            # Turn the stored action into an absolute target by adding it to
            # observation columns 8:10 (presumably the current end-effector
            # target position -- verify against the observation layout).
            episode_action = episode_obs[:, 8:10] + episode_action

        buffer.add_episode({
            'obs': episode_obs,
            'action': episode_action
        })

    # chunk_length=-1 is forwarded unchanged; see ReplayBuffer.save_to_path
    # for its exact meaning.
    buffer.save_to_path(zarr_path=output, chunk_length=-1)
# Run the click command only when executed as a script; click parses the
# command-line arguments itself.
if __name__ == "__main__":
    main()