# RoboticsDiffusionTransformer/data/preprocess_scripts/roboturk_real_objectsearch.py
import os
import concurrent.futures

import h5py
import imageio
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
from tqdm import tqdm

from data.utils import clean_task_instruction, quaternion_to_euler
def get_frames(file_path):
    """Decode an .mp4 video into a list of uint8 (H, W, 3) frames.

    Returns an empty list if the path is missing or is not an .mp4 file.
    """
    if not os.path.exists(file_path) or not os.path.isfile(file_path) \
            or not file_path.endswith('.mp4'):
        return []
    frames = []
    with imageio.get_reader(file_path, 'ffmpeg') as reader:
        for frame in reader:
            frames.append(np.array(frame, dtype=np.uint8))
    return frames
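# Usage sketch (hypothetical path): decode one camera stream into memory. All
# frames are materialized at once, so long demos can be memory-heavy:
#   frames = get_frames('./data/datasets/roboturk/some_demo_top_rgb.mp4')
#   print(len(frames), frames[0].shape if frames else None)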
def parallel_get_frames(paths):
    """Decode several videos concurrently, preserving the input order."""
    # executor.map keeps results aligned with `paths`; gathering futures with
    # as_completed would return the frame lists in completion order instead.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return list(executor.map(get_frames, paths))
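# Usage sketch (hypothetical paths): decode the three camera streams of one
# demo concurrently; the results stay aligned with the input list.
#   top_depth, top_rgb, front_rgb = parallel_get_frames([
#       './data/datasets/roboturk/demo_top_depth.mp4',
#       './data/datasets/roboturk/demo_top_rgb.mp4',
#       './data/datasets/roboturk/demo_front_rgb.mp4',
#   ])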
def count_total_samples(filename):
    """Count the total number of demos across all users in the HDF5 file."""
    total_samples = 0
    with h5py.File(filename, 'r') as f:
        data = f['data']
        for user_key in data.keys():
            total_samples += len(data[user_key].keys())
    return total_samples
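# Expected HDF5 layout, as traversed above and in dataset_generator below:
#   /data/<user_key>/<demo_key>/robot_observation/{eef_poses,
#       joint_states_arm, joint_states_gripper}
#   /data/<user_key>/<demo_key>/user_control/user_control
# with per-demo attrs top_depth_video_file, top_rgb_video_file and
# front_rgb_video_file, holding paths relative to ./data/datasets/roboturk/.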
def dataset_generator(filename, total_samples):
    """Yield one episode dict per demo, each wrapping its steps in a nested
    tf.data.Dataset. `total_samples` is the demo count from
    count_total_samples."""
    with h5py.File(filename, 'r') as f:
data = f['data']
for user_key in data.keys():
user = data[user_key]
for demo_key in user.keys():
demo = user[demo_key]
robot_observation = demo['robot_observation']
user_control = demo['user_control']
eef_poses = robot_observation['eef_poses']
joint_states_arm = robot_observation['joint_states_arm']
joint_states_gripper = robot_observation['joint_states_gripper']
user_control_data = user_control['user_control']
attrs = dict(demo.attrs)
top_depth_video_file = attrs['top_depth_video_file']
top_rgb_video_file = attrs['top_rgb_video_file']
front_rgb_video_file = attrs['front_rgb_video_file']
video_root_path = './data/datasets/roboturk/'
top_depth_frames = get_frames(os.path.join(video_root_path, top_depth_video_file))
top_rgb_frames = get_frames(os.path.join(video_root_path, top_rgb_video_file))
front_rgb_frames = get_frames(os.path.join(video_root_path, front_rgb_video_file))
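                # parallel_get_frames above could decode these three streams
                # concurrently should sequential decoding become the
                # bottleneck.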
if len(top_rgb_frames) == 0 or len(front_rgb_frames) == 0:
continue
steps = []
for i in range(len(eef_poses)):
task_demo_id = f"SawyerTowerCreation_{demo_key}_{i}"
step = {
'task_demo_id': task_demo_id,
'eef_poses': eef_poses[i],
'joint_states_arm': joint_states_arm[i],
'joint_states_gripper': joint_states_gripper[i],
                        # Guard the index itself: a demo may have fewer
                        # control samples than observation timestamps.
                        'user_control': user_control_data[i] if i < user_control_data.shape[0] else np.zeros(22, dtype=np.float32),
                        'observation': {
                            'top_depth_frame': top_depth_frames[i] if i < len(top_depth_frames) else np.zeros((0, 0, 3), dtype=np.uint8),
'top_rgb_frame': top_rgb_frames[i] if i < len(top_rgb_frames) else np.zeros((0, 0, 3), dtype=np.uint8),
'front_rgb_frame': front_rgb_frames[i] if i < len(front_rgb_frames) else np.zeros((0, 0, 3), dtype=np.uint8),
},
'terminate_episode': i == len(eef_poses) - 1
}
steps.append(step)
                steps_dataset = tf.data.Dataset.from_generator(
                    # Bind the current list via a default argument: a bare
                    # `lambda: iter(steps)` would late-bind and iterate the
                    # most recent demo's steps instead.
                    lambda steps=steps: iter(steps),
output_signature={
'task_demo_id': tf.TensorSpec(shape=(), dtype=tf.string),
'eef_poses': tf.TensorSpec(shape=(7,), dtype=tf.float32),
'joint_states_arm': tf.TensorSpec(shape=(27,), dtype=tf.float32),
'joint_states_gripper': tf.TensorSpec(shape=(3,), dtype=tf.float32),
'user_control': tf.TensorSpec(shape=(22,), dtype=tf.float32),
                        'observation': {
'top_depth_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
'top_rgb_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
'front_rgb_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
},
'terminate_episode': tf.TensorSpec(shape=(), dtype=tf.bool),
}
)
yield {'steps': steps_dataset}
def load_dataset():
    """Build the episode-level tf.data.Dataset for RoboTurk-real object search."""
    filename = './data/datasets/roboturk/SawyerObjectSearch_aligned_dataset.hdf5'
    total_samples = count_total_samples(filename)
dataset = tf.data.Dataset.from_generator(
lambda: dataset_generator(filename, total_samples),
output_signature={
'steps': tf.data.DatasetSpec(
element_spec={
'task_demo_id': tf.TensorSpec(shape=(), dtype=tf.string),
'eef_poses': tf.TensorSpec(shape=(7,), dtype=tf.float32),
'joint_states_arm': tf.TensorSpec(shape=(27,), dtype=tf.float32),
'joint_states_gripper': tf.TensorSpec(shape=(3,), dtype=tf.float32),
'user_control': tf.TensorSpec(shape=(22,), dtype=tf.float32),
                    'observation': {
'top_depth_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
'top_rgb_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
'front_rgb_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
},
                    'terminate_episode': tf.TensorSpec(shape=(), dtype=tf.bool),
}
)
}
)
return dataset
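# Consumption sketch: each outer element is one episode whose 'steps' field is
# itself a tf.data.Dataset (hence the tf.data.DatasetSpec above). A minimal
# eager loop, assuming the HDF5 and video files are in place:
#   for episode in load_dataset().take(1):
#       for step in episode['steps']:
#           print(step['task_demo_id'].numpy(), step['eef_poses'].numpy())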
def terminate_act_to_bool(terminate_act: tf.Tensor) -> tf.Tensor:
"""
Convert terminate action to a boolean, where True means terminate.
"""
    return tf.where(
        tf.equal(terminate_act, tf.constant(0.0, dtype=tf.float32)),
        tf.constant(False), tf.constant(True))
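# Example: terminate_act_to_bool(tf.constant(0.0)) -> False; any nonzero value
# -> True. This helper appears unused in this script, since dataset_generator
# already emits 'terminate_episode' as a boolean.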
def process_step(step: dict) -> dict:
"""
Unify the action format and clean the task instruction.
DO NOT use python list, use tf.TensorArray instead.
"""
# Convert raw action to our action
step['action'] = {}
action = step['action']
action['terminate'] = step['terminate_episode']
eef_delta_pos = step['eef_poses'][:3]
eef_ang = step['eef_poses'][3:]
# No base found
# Concatenate the action
arm_action = tf.concat([eef_delta_pos, eef_ang], axis=0)
action['arm_concat'] = arm_action
# Write the action format
action['format'] = tf.constant(
"eef_delta_pos_x,eef_delta_pos_y,eef_delta_pos_z,eef_delta_angle_x,eef_delta_angle_y,eef_delta_angle_z,eef_delta_angle_w")
    # Convert raw state to our state
state = step['observation']
# joint_states_arm: dataset of (num_timestamps, 27) shape where each of the 9 joints is represented by the JointState message
# (the nine joints are in order by their ROSBAG names: ['head_pan', 'right_j0', 'right_j1', 'right_j2', 'right_j3', 'right_j4', 'right_j5', 'right_j6', 'torso_t0']. For the most part, head_pan and torso should be zeros)
# [0] the position of the first joint (rad or m)
# [1] the velocity of the first joint (rad/s or m/s)
# [2] the effort that is applied in the first joint
# [3] the position of the second joint...
joint_states_arm = step['joint_states_arm']
joint_pos = joint_states_arm[3:24:3]
joint_vel = joint_states_arm[4:25:3]
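    # Index check: joint k occupies entries [3k, 3k+1, 3k+2] = (pos, vel,
    # effort), and right_j0..right_j6 are joints 1..7 in the ROSBAG order
    # above, so their positions land at indices 3, 6, ..., 21 and their
    # velocities at 4, 7, ..., 22 -- exactly the two strided slices taken here.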
# joint_states_gripper: dataset of (num_timestamps, 3) shape
# [0] the position of the gripper (rad or m)
# [1] the velocity of the gripper (rad/s or m/s)
# [2] the effort that is applied in the gripper
joint_states_gripper = step['joint_states_gripper']
gripper_pos = joint_states_gripper[:1]
# remove gripper_vel due to they are all zeros
# gripper_vel = joint_states_gripper[1:2]
# Concatenate the state
# state['arm_concat'] = tf.concat([joint_pos,joint_vel,gripper_pos,gripper_vel], axis=0)
    state['arm_concat'] = tf.concat([joint_pos, joint_vel, gripper_pos], axis=0)
# Write the state format
state['format'] = tf.constant(
"arm_joint_0_pos,arm_joint_1_pos,arm_joint_2_pos,arm_joint_3_pos,arm_joint_4_pos,arm_joint_5_pos,arm_joint_6_pos,arm_joint_0_vel,arm_joint_1_vel,arm_joint_2_vel,arm_joint_3_vel,arm_joint_4_vel,arm_joint_5_vel,arm_joint_6_vel,gripper_joint_0_pos")
# Clean the task instruction
# Define the replacements (old, new) as a dictionary
replacements = {
'_': ' ',
'1f': ' ',
'4f': ' ',
'-': ' ',
'50': ' ',
'55': ' ',
'56': ' ',
}
# copied from openxembod
    instr = b'search for objects'
instr = clean_task_instruction(instr, replacements)
step['observation']['natural_language_instruction'] = instr
return step
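# Sketch of the intended downstream use (assumption: process_step is mapped
# over each episode's step dataset in graph mode, hence the TensorArray note
# in its docstring):
#   for episode in load_dataset():
#       steps = episode['steps'].map(process_step)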
if __name__ == "__main__":
import tensorflow_datasets as tfds
from data.utils import dataset_to_path
DATASET_DIR = '/cephfs-thu/gsm_data/openx_embod'
DATASET_NAME = 'roboturk_real_laundrylayout'
# Load the dataset
dataset = load_dataset()
# save_dir = os.path.join(DATASET_DIR, DATASET_NAME)
# if not os.path.exists(save_dir):
# os.makedirs(save_dir)
# tf.data.experimental.save(dataset, save_dir)