import os
import concurrent.futures

import h5py
import imageio
import numpy as np
import tensorflow as tf
from tqdm import tqdm

from data.utils import clean_task_instruction, quaternion_to_euler


def get_frames(file_path):
    """Decode all frames of an MP4 file; return [] if the path is not a valid .mp4."""
    if not os.path.isfile(file_path) or not file_path.endswith('.mp4'):
        return []
    frames = []
    with imageio.get_reader(file_path, 'ffmpeg') as reader:
        for frame in reader:
            frames.append(np.array(frame, dtype=np.uint8))
    return frames


def parallel_get_frames(paths):
    """Decode several videos concurrently.

    Note: results come back in completion order, not in the order of `paths`.
    """
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_path = {executor.submit(get_frames, path): path for path in paths}
        return [future.result() for future in concurrent.futures.as_completed(future_to_path)]


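# `parallel_get_frames` loses the correspondence between inputs and outputs.
# A minimal order-preserving sketch using `executor.map` (illustrative only;
# nothing else in this file calls it):
def parallel_get_frames_ordered(paths):
    """Like parallel_get_frames, but results align with the input order."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return list(executor.map(get_frames, paths))

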
def count_total_samples(filename):
    """Count the total number of demos across all users in the HDF5 file."""
    total_samples = 0
    with h5py.File(filename, 'r') as f:
        data = f['data']
        for user_key in data.keys():
            user = data[user_key]
            for demo_key in user.keys():
                total_samples += 1
    return total_samples


# Per-step signature, shared by the inner `steps` dataset below and by the
# episode spec in load_dataset.
STEP_SPEC = {
    'task_demo_id': tf.TensorSpec(shape=(), dtype=tf.string),
    'eef_poses': tf.TensorSpec(shape=(7,), dtype=tf.float32),
    'joint_states_arm': tf.TensorSpec(shape=(27,), dtype=tf.float32),
    'joint_states_gripper': tf.TensorSpec(shape=(3,), dtype=tf.float32),
    'user_control': tf.TensorSpec(shape=(22,), dtype=tf.float32),
    'observation': {
        'top_depth_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
        'top_rgb_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
        'front_rgb_frame': tf.TensorSpec(shape=(None, None, 3), dtype=tf.uint8),
    },
    'terminate_episode': tf.TensorSpec(shape=(), dtype=tf.bool),
}


def dataset_generator(filename, total_samples):
    """Yield one episode per demo; each episode wraps its steps in a tf.data.Dataset."""
    video_root_path = './data/datasets/roboturk/'
    with h5py.File(filename, 'r') as f:
        data = f['data']
        # `total_samples` comes from count_total_samples and drives the progress bar.
        progress = tqdm(total=total_samples, desc='demos')
        for user_key in data.keys():
            user = data[user_key]
            for demo_key in user.keys():
                progress.update(1)
                demo = user[demo_key]
                robot_observation = demo['robot_observation']
                user_control = demo['user_control']

                eef_poses = robot_observation['eef_poses']
                joint_states_arm = robot_observation['joint_states_arm']
                joint_states_gripper = robot_observation['joint_states_gripper']
                user_control_data = user_control['user_control']

                attrs = dict(demo.attrs)
                top_depth_video_file = attrs['top_depth_video_file']
                top_rgb_video_file = attrs['top_rgb_video_file']
                front_rgb_video_file = attrs['front_rgb_video_file']

                top_depth_frames = get_frames(os.path.join(video_root_path, top_depth_video_file))
                top_rgb_frames = get_frames(os.path.join(video_root_path, top_rgb_video_file))
                front_rgb_frames = get_frames(os.path.join(video_root_path, front_rgb_video_file))

                # Skip demos whose RGB videos are missing or unreadable.
                if len(top_rgb_frames) == 0 or len(front_rgb_frames) == 0:
                    continue

                steps = []
                for i in range(len(eef_poses)):
                    task_demo_id = f"SawyerTowerCreation_{demo_key}_{i}"
                    step = {
                        'task_demo_id': task_demo_id,
                        'eef_poses': eef_poses[i],
                        'joint_states_arm': joint_states_arm[i],
                        'joint_states_gripper': joint_states_gripper[i],
                        # Pad with zeros when the control stream is shorter than
                        # the pose stream (the original length-only check could
                        # index out of bounds).
                        'user_control': user_control_data[i] if i < user_control_data.shape[0] else np.zeros(22, dtype=np.float32),
                        'observation': {
                            'top_depth_frame': top_depth_frames[i] if i < len(top_depth_frames) else np.zeros((0, 0, 3), dtype=np.uint8),
                            'top_rgb_frame': top_rgb_frames[i] if i < len(top_rgb_frames) else np.zeros((0, 0, 3), dtype=np.uint8),
                            'front_rgb_frame': front_rgb_frames[i] if i < len(front_rgb_frames) else np.zeros((0, 0, 3), dtype=np.uint8),
                        },
                        'terminate_episode': i == len(eef_poses) - 1,
                    }
                    steps.append(step)

                # Bind `steps` as a default argument: a bare `lambda: iter(steps)`
                # would capture the loop variable by reference and replay the
                # last demo's steps for every episode.
                steps_dataset = tf.data.Dataset.from_generator(
                    lambda steps=steps: iter(steps),
                    output_signature=STEP_SPEC,
                )

                yield {'steps': steps_dataset}


def load_dataset():
    filename = './data/datasets/roboturk/SawyerObjectSearch_aligned_dataset.hdf5'
    total_samples = count_total_samples(filename)
    dataset = tf.data.Dataset.from_generator(
        lambda: dataset_generator(filename, total_samples),
        output_signature={
            'steps': tf.data.DatasetSpec(element_spec=STEP_SPEC),
        },
    )
    return dataset


def terminate_act_to_bool(terminate_act: tf.Tensor) -> tf.Tensor:
    """Convert a terminate action to a boolean, where True means terminate."""
    return tf.where(
        tf.equal(terminate_act, tf.constant(0.0, dtype=tf.float32)),
        tf.constant(False),
        tf.constant(True),
    )


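# For example (illustrative): terminate_act_to_bool(tf.constant(0.0)) yields
# False, while any non-zero value yields True.

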
def process_step(step: dict) -> dict:
    """
    Unify the action format and clean the task instruction.

    DO NOT use python list, use tf.TensorArray instead.
    """
    step['action'] = {}
    action = step['action']
    action['terminate'] = step['terminate_episode']

    # eef_poses holds an xyz position followed by an xyzw quaternion.
    eef_delta_pos = step['eef_poses'][:3]
    eef_ang = step['eef_poses'][3:]

    arm_action = tf.concat([eef_delta_pos, eef_ang], axis=0)
    action['arm_concat'] = arm_action
    action['format'] = tf.constant(
        "eef_delta_pos_x,eef_delta_pos_y,eef_delta_pos_z,"
        "eef_delta_angle_x,eef_delta_angle_y,eef_delta_angle_z,eef_delta_angle_w")

    state = step['observation']

    # Slice the seven joint positions (indices 3, 6, ..., 21) and velocities
    # (indices 4, 7, ..., 22) out of the 27-dim arm state.
    joint_states_arm = step['joint_states_arm']
    joint_pos = joint_states_arm[3:24:3]
    joint_vel = joint_states_arm[4:25:3]

    joint_states_gripper = step['joint_states_gripper']
    gripper_pos = joint_states_gripper[:1]

    state['arm_concat'] = tf.concat([joint_pos, joint_vel, gripper_pos], axis=0)
    state['format'] = tf.constant(
        "arm_joint_0_pos,arm_joint_1_pos,arm_joint_2_pos,arm_joint_3_pos,"
        "arm_joint_4_pos,arm_joint_5_pos,arm_joint_6_pos,"
        "arm_joint_0_vel,arm_joint_1_vel,arm_joint_2_vel,arm_joint_3_vel,"
        "arm_joint_4_vel,arm_joint_5_vel,arm_joint_6_vel,gripper_joint_0_pos")

    # Strip file-name artifacts from the instruction text.
    replacements = {
        '_': ' ',
        '1f': ' ',
        '4f': ' ',
        '-': ' ',
        '50': ' ',
        '55': ' ',
        '56': ' ',
    }
    instr = b'create tower'
    instr = clean_task_instruction(instr, replacements)
    step['observation']['natural_language_instruction'] = instr

    return step


if __name__ == "__main__":
    import tensorflow_datasets as tfds
    from data.utils import dataset_to_path

    DATASET_DIR = '/cephfs-thu/gsm_data/openx_embod'
    DATASET_NAME = 'roboturk_real_laundrylayout'

    dataset = load_dataset()
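
    # Minimal smoke test, a sketch: pull one episode, unify its first step,
    # and print the resulting format strings. Assumes the HDF5 file and the
    # referenced videos are present under ./data/datasets/roboturk/.
    for episode in dataset.take(1):
        for step in episode['steps'].take(1):
            step = process_step(step)
            print(step['task_demo_id'])
            print(step['action']['format'])
            print(step['observation']['format'])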