# ricl/preprocessing/process_collected_demos.py
# doanh25032004's picture
# Add files using upload-large-folder tool
# 1ae1bd3 verified
import numpy as np
import json
import h5py
import os
import argparse
from PIL import Image
from openpi.policies.utils import embed_with_batches, load_dinov2, EMBED_DIM
from openpi_client.image_tools import resize_with_pad
import logging
# Configure the root logger at INFO level. force=True removes any handlers already
# attached to the root logger before applying this configuration, so the INFO
# messages emitted below are not swallowed by an earlier logging setup.
logging.basicConfig(level=logging.INFO, force=True)
# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)
def _natural_sort_key(name):
    """Return a sort key that orders the digit runs in *name* by numeric value.

    ``re.split`` with a capturing group always alternates non-digit and digit
    tokens, starting with a (possibly empty) non-digit token. Every odd index
    therefore holds a digit run, so keys only ever compare str with str and
    int with int. This orders ``2.png`` before ``10.png`` and also handles
    zero-padded names.
    """
    import re  # local import keeps this block self-contained

    tokens = re.split(r"(\d+)", name)
    return [int(tok) if i % 2 else tok for i, tok in enumerate(tokens)]


def _load_frame(path):
    """Read one image file into a numpy array, closing the file handle afterwards."""
    with Image.open(path) as img:
        return np.array(img)


def process(dir, prompts):
    """Convert every raw demo folder under *dir* into a ``processed_demo.npz`` file.

    For each sub-folder of *dir* that does not already contain
    ``processed_demo.npz`` this:

    * reads ``trajectory.h5`` and drops the steps flagged in ``skip_action``;
    * builds ``state`` (joint positions + gripper position) and ``actions``
      (joint velocities + gripper position), each asserted to be
      ``(num_steps, 8)``;
    * loads the frames of the three cameras, passes them through
      ``resize_with_pad(frames, 224, 224)`` and embeds them with DINOv2;
    * picks one prompt at random from *prompts*;
    * saves everything to ``<demo_folder>/processed_demo.npz``.

    Args:
        dir: Directory holding the demo folders. A relative path is resolved
            against the directory containing this file; an absolute path is
            used as given.
        prompts: Sequence of candidate prompt strings; one is sampled per demo.

    Raises:
        AssertionError: If array shapes, dtypes or frame counts do not match
            the expectations checked below.
    """
    # load model for embedding images
    dinov2 = load_dinov2()
    logger.info('loaded dinov2 for image embedding')

    # resolve dir against this file's directory; os.path.join leaves an absolute dir unchanged
    current_dir = os.path.dirname(os.path.abspath(__file__))
    demo_dir = os.path.join(current_dir, dir)
    logger.info(f'absolute path of the {demo_dir=}')

    # get all the folders (demos) in the demo_dir, in a deterministic order
    demo_folders = [
        os.path.join(demo_dir, f)
        for f in sorted(os.listdir(demo_dir))
        if os.path.isdir(os.path.join(demo_dir, f))
    ]
    logger.info(f'number of demo folders: {len(demo_folders)}')

    camera_to_key = (
        ('hand_camera', 'wrist_image'),
        ('varied_camera_1', 'top_image'),
        ('varied_camera_2', 'right_image'),
    )

    # iterate over the demo_folders and read the trajectory.h5 files and the frames
    for demo_folder in demo_folders:
        output_path = os.path.join(demo_folder, "processed_demo.npz")
        if os.path.exists(output_path):
            logger.info(f'{demo_folder=} already processed')
            continue

        processed_demo = {}
        logger.info(f'processing {demo_folder=}')

        # the context manager closes the HDF5 file even if a read below fails;
        # every dataset is materialised as a numpy array before the file is closed
        with h5py.File(os.path.join(demo_folder, "trajectory.h5"), 'r') as traj_h5:
            skip_bools = traj_h5["observation"]["timestamp"]["skip_action"][:]
            # cast first: `~` is only a logical NOT for boolean arrays
            keep_bools = ~skip_bools.astype(bool)
            obs_gripper_pos = traj_h5["observation"]["robot_state"]["gripper_position"][:].reshape(-1, 1)[keep_bools]
            act_gripper_pos = traj_h5["action"]["gripper_position"][:].reshape(-1, 1)[keep_bools]
            obs_joint_pos = traj_h5["observation"]["robot_state"]["joint_positions"][keep_bools]
            act_joint_vel = traj_h5["action"]["joint_velocity"][keep_bools]

        processed_demo["state"] = np.concatenate([obs_joint_pos, obs_gripper_pos], axis=1)
        processed_demo["actions"] = np.concatenate([act_joint_vel, act_gripper_pos], axis=1)
        num_steps = processed_demo["state"].shape[0]
        assert processed_demo["state"].shape == processed_demo["actions"].shape == (num_steps, 8)

        for camera_name, key in camera_to_key:
            frames_dir = os.path.join(demo_folder, "recordings", "frames", camera_name)
            logger.info(f'{frames_dir=}')
            # os.listdir returns entries in arbitrary order, so sort explicitly to keep
            # frames aligned with state/actions.
            # NOTE(review): assumes the frame file names encode the time-step order — confirm.
            frames = [
                os.path.join(frames_dir, f)
                for f in sorted(os.listdir(frames_dir), key=_natural_sort_key)
            ]
            assert len(frames) == num_steps, f'{len(frames)=} {num_steps=}'
            frames = np.stack([_load_frame(frame) for frame in frames], axis=0)
            assert frames.shape == (num_steps, 720, 1280, 3) and frames.dtype == np.uint8, f'{frames.shape=} {frames.dtype=}'
            frames = resize_with_pad(frames, 224, 224)
            assert frames.shape == (num_steps, 224, 224, 3) and frames.dtype == np.uint8, f'{frames.shape=} {frames.dtype=}'
            processed_demo[key] = frames
            embeddings = embed_with_batches(frames, dinov2)
            assert embeddings.shape == (num_steps, EMBED_DIM), f'{embeddings.shape=}'
            processed_demo[f"{key}_embeddings"] = embeddings

        # randomly sample a prompt from the prompts
        prompt = np.random.choice(prompts)
        processed_demo["prompt"] = prompt

        # save the processed episode as a npz file. Write to a temporary name first and
        # move it into place, so an interrupted run never leaves a partial
        # processed_demo.npz that a later run would skip as "already processed".
        tmp_path = os.path.join(demo_folder, "processed_demo.tmp.npz")
        np.savez(tmp_path, **processed_demo)
        os.replace(tmp_path, output_path)
def main():
    """Parse the command line and process one demo directory or a directory of them.

    ``--dir`` processes a single directory and requires ``--prompts``.
    ``--dir_of_dirs`` processes every sub-directory of the given directory; the
    prompt for each is derived from the sub-directory name by dropping the text
    before the first underscore and replacing the remaining underscores with
    spaces. When both options are given, ``--dir`` takes precedence.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dir", type=str, default=None)
    parser.add_argument("--dir_of_dirs", type=str, default=None)
    parser.add_argument("--prompts", nargs="+", type=str, default=None)
    args = parser.parse_args()

    # parser.error is used instead of assert so validation still runs under `python -O`
    if args.dir is None and args.dir_of_dirs is None:
        parser.error("Either --dir or --dir_of_dirs must be provided")

    if args.dir is not None:
        if args.prompts is None:
            parser.error("If --dir is provided, --prompts must also be provided")
        process(args.dir, args.prompts)
    else:
        # process() resolves its argument relative to this file's directory, so the
        # listing must use the same base or the two can disagree when the script is
        # launched from another working directory.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        parent_dir = os.path.join(script_dir, args.dir_of_dirs)
        for task_dir in sorted(os.listdir(parent_dir)):
            # skip stray files (e.g. .DS_Store); process() expects a directory
            if not os.path.isdir(os.path.join(parent_dir, task_dir)):
                continue
            # a name without an underscore yields an empty prompt string
            temp_prompts = [" ".join(task_dir.split("_")[1:])]
            logger.info(f'**About to start processing dir {args.dir_of_dirs}/{task_dir} with prompts {temp_prompts}**')
            process(f"{args.dir_of_dirs}/{task_dir}", temp_prompts)

    print('done!')


if __name__ == "__main__":
    main()