if __name__ == "__main__":
    # Script-mode bootstrap: when this file is executed directly, put the
    # directory three levels above it on the import path before the
    # project-level import further down is attempted.
    import os
    import pathlib
    import sys

    ROOT_DIR = str(
        pathlib.Path(__file__).parent.parent.parent
    )
    sys.path.append(ROOT_DIR)
| |
|
| | import os |
| | import click |
| | import pathlib |
| | import zarr |
| | import cv2 |
| | import threadpoolctl |
| | from diffusion_policy.real_world.real_data_conversion import real_data_to_replay_buffer |
| |
|
def _parse_resolution(resolution):
    """Turn an ``AxB`` string such as ``'640x480'`` into a tuple of two ints.

    Raises:
        click.BadParameter: if the string is not exactly two positive
            integers separated by a lowercase ``x``.
    """
    try:
        dims = tuple(int(part) for part in resolution.split('x'))
    except ValueError:
        dims = ()
    if len(dims) != 2 or any(dim <= 0 for dim in dims):
        raise click.BadParameter(
            "expected two positive integers separated by 'x' "
            f"(e.g. 640x480), got {resolution!r}",
            param_hint='--resolution',
        )
    return dims


@click.command()
@click.option('--input', '-i', required=True)
@click.option('--output', '-o', default=None)
@click.option('--resolution', '-r', default='640x480')
@click.option('--n_decoding_threads', '-nd', default=-1, type=int)
@click.option('--n_encoding_threads', '-ne', default=-1, type=int)
def main(input, output, resolution, n_decoding_threads, n_encoding_threads):
    """Convert a recorded dataset directory into a single zarr replay buffer.

    INPUT must contain a 'replay_buffer.zarr' directory and a 'videos'
    directory. The result is written to OUTPUT, or to
    '<input>/<resolution>.zarr.zip' when OUTPUT is not given. An OUTPUT
    path ending in '.zip' is written as a zip store, anything else as a
    directory store.
    """
    out_resolution = _parse_resolution(resolution)

    # NOTE: ``input`` shadows the builtin, but the name is dictated by the
    # ``--input`` click option and is kept for CLI compatibility.
    input = pathlib.Path(os.path.expanduser(input))
    in_zarr_path = input.joinpath('replay_buffer.zarr')
    in_video_dir = input.joinpath('videos')
    # Explicit errors instead of ``assert``: asserts vanish under ``python -O``
    # and give the user no hint about which directory is missing.
    for required_dir in (in_zarr_path, in_video_dir):
        if not required_dir.is_dir():
            raise click.ClickException(
                f'Required input directory not found: {required_dir}')

    if output is None:
        output = input.joinpath(resolution + '.zarr.zip')
    else:
        output = pathlib.Path(os.path.expanduser(output))

    if output.exists():
        # abort=True makes click exit the program when the user declines.
        click.confirm('Output path already exists! Overwrite?', abort=True)

    # Pin OpenCV and the native thread pools to a single thread for the
    # duration of the conversion (presumably to avoid oversubscription with
    # the decoding/encoding worker threads -- confirm against
    # real_data_to_replay_buffer).
    cv2.setNumThreads(1)
    with threadpoolctl.threadpool_limits(1):
        replay_buffer = real_data_to_replay_buffer(
            dataset_path=str(input),
            out_resolutions=out_resolution,
            n_decoding_threads=n_decoding_threads,
            n_encoding_threads=n_encoding_threads
        )

    print('Saving to disk')
    if output.suffix == '.zip':
        # mode='w' truncates an existing archive. The ZipStore default is
        # append mode, which would keep the old entries around even though
        # the user has just confirmed that the file should be overwritten.
        with zarr.ZipStore(output, mode='w') as zip_store:
            replay_buffer.save_to_store(
                store=zip_store
            )
    else:
        with zarr.DirectoryStore(output) as store:
            replay_buffer.save_to_store(
                store=store
            )
| |
|
# Run the click command only when executed as a script, not on import.
if __name__ == "__main__":
    main()
| |
|