NMR / tools /data_process /process /process_g1.py
Xxx999's picture
upload
45950ff
import sys
sys.path.append('/mnt/shenzhen2cephfs/capybarali/codes/humanoid')
import torch, yaml, os
from tqdm import tqdm
from src.utils.rotation_conversions import quaternion_to_matrix, matrix_to_rotation_6d, matrix_to_axis_angle, rotation_6d_to_matrix, matrix_to_quaternion, axis_angle_to_matrix
import numpy as np
from argparse import ArgumentParser
import joblib
from data.vis import vis_3d_motion
from data.vis_g1 import vis_3d_g1
from copy import deepcopy
from scipy.spatial.transform import Rotation as R
from scipy.spatial.transform import Slerp
from scipy import interpolate
from joblib import Parallel, delayed
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
parser = ArgumentParser(description="Launch MoCap processing")
parser.add_argument('--save_root', type=str, default="data/gmr_data")
parser.add_argument('--start_idx', type=int, default=0)
parser.add_argument('--interval', type=int, default=1)
args = parser.parse_args()
os.makedirs(args.save_root, exist_ok=True)
def extract_g1_component(x):
vel_xy = x[:, :2]
dof = x[:, -29:]
root_rot_mat = rotation_6d_to_matrix(x[:, 2:8])
trans_xyz = torch.cat([
torch.cumsum(vel_xy[:, :1], dim=0), # new_x = cumsum(Δnew_x)
x[:, 9:10], # new_y = height (direct)
torch.cumsum(vel_xy[:, 1:], dim=0), # new_z = cumsum(Δnew_z)
], dim=1)
rot_mat = torch.tensor([[1, 0, 0], [0, 0, -1], [0, 1, 0]]).float()
global_orient_mat = root_rot_mat.squeeze(1).float()
global_orient_mat = torch.einsum('ij,tjk->tik', rot_mat, global_orient_mat)
rot_quat = matrix_to_quaternion(global_orient_mat) # (T, 4) wxyz order
transl = trans_xyz.float()
transl = torch.einsum('ij,tj->ti', rot_mat, transl)
return dof, rot_quat, transl
def interpolate_pos(pos, fps, target_fps, method='linear'):
'''
高性能版本,使用向量化操作
pos: (T, ..., 3) 原始位置序列
fps: int 原始帧率
target_fps: int 目标帧率
'''
original_shape = pos.shape
T = original_shape[0]
total_time = T / fps
t_original = np.linspace(0, total_time, T)
target_T = int(np.ceil(total_time * target_fps))
t_target = np.linspace(0, total_time, target_T)
# 重塑数据为 (T, -1)
flattened_pos = pos.reshape(T, -1)
# 线性插值
f = interpolate.interp1d(t_original, flattened_pos, axis=0,
kind='linear', bounds_error=False,
fill_value='extrapolate')
# 插值
interpolated_flattened = f(t_target)
# 恢复形状
new_shape = list(original_shape)
new_shape[0] = target_T
interpolated_pos = interpolated_flattened.reshape(new_shape)
return interpolated_pos
def interpolate_quat(quat, fps, target_fps):
'''
使用scipy内置函数的简化版本
quat: (T, 4) 原始四元数序列,T为原始帧数,4为四元数维度
fps: int 原始帧率
target_fps: int 目标帧率
'''
T = quat.shape[0]
total_time = T / fps
# 原始时间点
t_original = np.linspace(0, total_time, T)
# 目标时间点
target_T = int(np.ceil(total_time * target_fps))
t_target = np.linspace(0, total_time, target_T)
# 确保四元数是单位四元数
quat_normalized = quat / np.linalg.norm(quat, axis=1, keepdims=True)
# 创建旋转对象
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.transform.Rotation.from_quat.html
# scalar-first order – (w, x, y, z)
rotations = R.from_quat(quat_normalized, scalar_first=True)
# 创建Slerp插值器
slerp = Slerp(t_original, rotations)
# 在目标时间点进行插值
rotations_interp = slerp(t_target)
# 获取插值后的四元数
quat_interp = rotations_interp.as_quat(scalar_first=True)
return quat_interp
def get_g1_motion(data_path):
g1_data = np.load(data_path)
ori_fps = g1_data['fps']
target_fps = 30
dof = g1_data['joint_pos'] # T, 29
root_ori = g1_data['body_quat_w'][:, 0] # T, 4, wxyz
joints = g1_data['body_pos_w'] # T, J, 3
# 插值
dof = interpolate_pos(dof, ori_fps, target_fps)
joints = interpolate_pos(joints, ori_fps, target_fps)
global_orient = interpolate_quat(root_ori, ori_fps, target_fps) # wxyz
rotation_matrix = torch.tensor([[1.0, 0, 0], [0, 0, -1], [0, 1, 0]]).inverse()
global_orient_mat = quaternion_to_matrix(torch.from_numpy(global_orient)).float()
global_orient_mat = torch.einsum('ij,tjk->tik', rotation_matrix, global_orient_mat)
global_orient = matrix_to_axis_angle(global_orient_mat)
position_data = torch.einsum('ij,tkj->tki', rotation_matrix, torch.from_numpy(joints).float())
position_val_data = position_data[1:] - position_data[:-1]
root_idx = 0
# put on floor and put root on origin for the first frame
ori = deepcopy(position_data[0, root_idx]) # first frame root position
y_min = torch.min(position_data[:, :, 1])
# ori[1] = y_min
# position_data = position_data - ori
velocities_root = position_data[1:, root_idx, :] - position_data[:-1, root_idx, :]
position_data_cp = deepcopy(position_data)
position_data[:,:,0] -= position_data_cp[:,0:1,0]
position_data[:,:,2] -= position_data_cp[:,0:1,2]
# vis_3d_g1([position_data.numpy()[:, 1:]], None, ['video.mp4'], fps=30)
T, njoint, _ = position_data.shape
final_x = torch.zeros((T, 2 + 6 + njoint * 3 + njoint * 3))
final_x[1:, 0] = velocities_root[:, 0]
final_x[1:, 1] = velocities_root[:, 2]
final_x[:, 2:2+6] = matrix_to_rotation_6d(global_orient_mat)
final_x[:, 8:8+njoint*3] = position_data.flatten(1, 2)
final_x[1:, 8+njoint*3:8+njoint*6] = position_val_data.flatten(1, 2) # T, 140
final_x = torch.concat([final_x, torch.from_numpy(dof).float()], dim=-1)
# if final_x.shape[0] > 200:
# import ipdb; ipdb.set_trace()
# dof, rot_quat, transl = extract_g1_component(final_x)
# joblib.dump(dict(dof=dof, rot_quat=rot_quat, transl=transl), 'data.pkl')
return final_x # 217
# python -m data.motionmillion.tools.process --save_root "data/motionmillion/final_data"
def func(line):
data_path = line.strip()
g1_motion = get_g1_motion(data_path)
data_path = data_path.replace('.npz', '.npy').replace('/mnt/shenzhen2cephfs/capybarali/codes/neobot/data/motionmillion/2_gmr_retarget_full/', '')
save_path = args.save_root + '/' + data_path
os.makedirs(os.path.dirname(save_path), exist_ok=True)
np.save(save_path, g1_motion)
if __name__ == '__main__':
with open('data/gmr_path.txt', 'r') as f:
paths = f.readlines()
Parallel(n_jobs=64)(delayed(func)(line) for line in tqdm(paths[args.start_idx::args.interval]))