File size: 6,825 Bytes
45950ff | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | import sys
sys.path.append('/mnt/shenzhen2cephfs/capybarali/codes/humanoid')
import torch, yaml, os
from tqdm import tqdm
from src.utils.rotation_conversions import quaternion_to_matrix, matrix_to_rotation_6d, matrix_to_axis_angle, rotation_6d_to_matrix, matrix_to_quaternion, axis_angle_to_matrix
import numpy as np
from argparse import ArgumentParser
import joblib
from data.vis import vis_3d_motion
from data.vis_g1 import vis_3d_g1
from copy import deepcopy
from scipy.spatial.transform import Rotation as R
from scipy.spatial.transform import Slerp
from scipy import interpolate
from joblib import Parallel, delayed
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
parser = ArgumentParser(description="Launch MoCap processing")
parser.add_argument('--save_root', type=str, default="data/gmr_data")
parser.add_argument('--start_idx', type=int, default=0)
parser.add_argument('--interval', type=int, default=1)
args = parser.parse_args()
os.makedirs(args.save_root, exist_ok=True)
def extract_g1_component(x):
vel_xy = x[:, :2]
dof = x[:, -29:]
root_rot_mat = rotation_6d_to_matrix(x[:, 2:8])
trans_xyz = torch.cat([
torch.cumsum(vel_xy[:, :1], dim=0), # new_x = cumsum(Δnew_x)
x[:, 9:10], # new_y = height (direct)
torch.cumsum(vel_xy[:, 1:], dim=0), # new_z = cumsum(Δnew_z)
], dim=1)
rot_mat = torch.tensor([[1, 0, 0], [0, 0, -1], [0, 1, 0]]).float()
global_orient_mat = root_rot_mat.squeeze(1).float()
global_orient_mat = torch.einsum('ij,tjk->tik', rot_mat, global_orient_mat)
rot_quat = matrix_to_quaternion(global_orient_mat) # (T, 4) wxyz order
transl = trans_xyz.float()
transl = torch.einsum('ij,tj->ti', rot_mat, transl)
return dof, rot_quat, transl
def interpolate_pos(pos, fps, target_fps, method='linear'):
'''
高性能版本,使用向量化操作
pos: (T, ..., 3) 原始位置序列
fps: int 原始帧率
target_fps: int 目标帧率
'''
original_shape = pos.shape
T = original_shape[0]
total_time = T / fps
t_original = np.linspace(0, total_time, T)
target_T = int(np.ceil(total_time * target_fps))
t_target = np.linspace(0, total_time, target_T)
# 重塑数据为 (T, -1)
flattened_pos = pos.reshape(T, -1)
# 线性插值
f = interpolate.interp1d(t_original, flattened_pos, axis=0,
kind='linear', bounds_error=False,
fill_value='extrapolate')
# 插值
interpolated_flattened = f(t_target)
# 恢复形状
new_shape = list(original_shape)
new_shape[0] = target_T
interpolated_pos = interpolated_flattened.reshape(new_shape)
return interpolated_pos
def interpolate_quat(quat, fps, target_fps):
'''
使用scipy内置函数的简化版本
quat: (T, 4) 原始四元数序列,T为原始帧数,4为四元数维度
fps: int 原始帧率
target_fps: int 目标帧率
'''
T = quat.shape[0]
total_time = T / fps
# 原始时间点
t_original = np.linspace(0, total_time, T)
# 目标时间点
target_T = int(np.ceil(total_time * target_fps))
t_target = np.linspace(0, total_time, target_T)
# 确保四元数是单位四元数
quat_normalized = quat / np.linalg.norm(quat, axis=1, keepdims=True)
# 创建旋转对象
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.transform.Rotation.from_quat.html
# scalar-first order – (w, x, y, z)
rotations = R.from_quat(quat_normalized, scalar_first=True)
# 创建Slerp插值器
slerp = Slerp(t_original, rotations)
# 在目标时间点进行插值
rotations_interp = slerp(t_target)
# 获取插值后的四元数
quat_interp = rotations_interp.as_quat(scalar_first=True)
return quat_interp
def get_g1_motion(data_path):
g1_data = np.load(data_path)
ori_fps = g1_data['fps']
target_fps = 30
dof = g1_data['joint_pos'] # T, 29
root_ori = g1_data['body_quat_w'][:, 0] # T, 4, wxyz
joints = g1_data['body_pos_w'] # T, J, 3
# 插值
dof = interpolate_pos(dof, ori_fps, target_fps)
joints = interpolate_pos(joints, ori_fps, target_fps)
global_orient = interpolate_quat(root_ori, ori_fps, target_fps) # wxyz
rotation_matrix = torch.tensor([[1.0, 0, 0], [0, 0, -1], [0, 1, 0]]).inverse()
global_orient_mat = quaternion_to_matrix(torch.from_numpy(global_orient)).float()
global_orient_mat = torch.einsum('ij,tjk->tik', rotation_matrix, global_orient_mat)
global_orient = matrix_to_axis_angle(global_orient_mat)
position_data = torch.einsum('ij,tkj->tki', rotation_matrix, torch.from_numpy(joints).float())
position_val_data = position_data[1:] - position_data[:-1]
root_idx = 0
# put on floor and put root on origin for the first frame
ori = deepcopy(position_data[0, root_idx]) # first frame root position
y_min = torch.min(position_data[:, :, 1])
# ori[1] = y_min
# position_data = position_data - ori
velocities_root = position_data[1:, root_idx, :] - position_data[:-1, root_idx, :]
position_data_cp = deepcopy(position_data)
position_data[:,:,0] -= position_data_cp[:,0:1,0]
position_data[:,:,2] -= position_data_cp[:,0:1,2]
# vis_3d_g1([position_data.numpy()[:, 1:]], None, ['video.mp4'], fps=30)
T, njoint, _ = position_data.shape
final_x = torch.zeros((T, 2 + 6 + njoint * 3 + njoint * 3))
final_x[1:, 0] = velocities_root[:, 0]
final_x[1:, 1] = velocities_root[:, 2]
final_x[:, 2:2+6] = matrix_to_rotation_6d(global_orient_mat)
final_x[:, 8:8+njoint*3] = position_data.flatten(1, 2)
final_x[1:, 8+njoint*3:8+njoint*6] = position_val_data.flatten(1, 2) # T, 140
final_x = torch.concat([final_x, torch.from_numpy(dof).float()], dim=-1)
# if final_x.shape[0] > 200:
# import ipdb; ipdb.set_trace()
# dof, rot_quat, transl = extract_g1_component(final_x)
# joblib.dump(dict(dof=dof, rot_quat=rot_quat, transl=transl), 'data.pkl')
return final_x # 217
# python -m data.motionmillion.tools.process --save_root "data/motionmillion/final_data"
def func(line):
data_path = line.strip()
g1_motion = get_g1_motion(data_path)
data_path = data_path.replace('.npz', '.npy').replace('/mnt/shenzhen2cephfs/capybarali/codes/neobot/data/motionmillion/2_gmr_retarget_full/', '')
save_path = args.save_root + '/' + data_path
os.makedirs(os.path.dirname(save_path), exist_ok=True)
np.save(save_path, g1_motion)
if __name__ == '__main__':
with open('data/gmr_path.txt', 'r') as f:
paths = f.readlines()
Parallel(n_jobs=64)(delayed(func)(line) for line in tqdm(paths[args.start_idx::args.interval]))
|