if __name__ == "__main__": import sys import os import pathlib ROOT_DIR = str(pathlib.Path(__file__).parent.parent.parent) sys.path.append(ROOT_DIR) import os import click import pathlib import zarr import cv2 import threadpoolctl from diffusion_policy.real_world.real_data_conversion import real_data_to_replay_buffer @click.command() @click.option('--input', '-i', required=True) @click.option('--output', '-o', default=None) @click.option('--resolution', '-r', default='640x480') @click.option('--n_decoding_threads', '-nd', default=-1, type=int) @click.option('--n_encoding_threads', '-ne', default=-1, type=int) def main(input, output, resolution, n_decoding_threads, n_encoding_threads): out_resolution = tuple(int(x) for x in resolution.split('x')) input = pathlib.Path(os.path.expanduser(input)) in_zarr_path = input.joinpath('replay_buffer.zarr') in_video_dir = input.joinpath('videos') assert in_zarr_path.is_dir() assert in_video_dir.is_dir() if output is None: output = input.joinpath(resolution + '.zarr.zip') else: output = pathlib.Path(os.path.expanduser(output)) if output.exists(): click.confirm('Output path already exists! Overrite?', abort=True) cv2.setNumThreads(1) with threadpoolctl.threadpool_limits(1): replay_buffer = real_data_to_replay_buffer( dataset_path=str(input), out_resolutions=out_resolution, n_decoding_threads=n_decoding_threads, n_encoding_threads=n_encoding_threads ) print('Saving to disk') if output.suffix == '.zip': with zarr.ZipStore(output) as zip_store: replay_buffer.save_to_store( store=zip_store ) else: with zarr.DirectoryStore(output) as store: replay_buffer.save_to_store( store=store ) if __name__ == '__main__': main()