Spaces:
Runtime error
Runtime error
| """ | |
| Preprocessing Tranformers Based on sci-kit's API | |
| By Omid Alemi | |
| Created on June 12, 2017 | |
| """ | |
| import copy | |
| import numpy as np | |
| import pandas as pd | |
| import scipy.ndimage.filters as filters | |
| import transforms3d as t3d | |
| from scipy import signal | |
| from sklearn.base import BaseEstimator, TransformerMixin | |
| from sklearn.pipeline import Pipeline | |
| from pymo.Pivots import Pivots | |
| from pymo.Quaternions import Quaternions | |
| from pymo.rotation_tools import ( | |
| Rotation, | |
| euler2expmap, | |
| euler2expmap2, | |
| euler2vectors, | |
| euler_reorder, | |
| expmap2euler, | |
| unroll, | |
| vectors2euler, | |
| ) | |
| class MocapParameterizer(BaseEstimator, TransformerMixin): | |
| def __init__(self, param_type="euler", ref_pose=None): | |
| """ | |
| param_type = {'euler', 'quat', 'expmap', 'position', 'expmap2pos'} | |
| """ | |
| self.param_type = param_type | |
| if ref_pose is not None: | |
| self.ref_pose = self._to_quat(ref_pose)[0] | |
| else: | |
| self.ref_pose = None | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| print("MocapParameterizer: " + self.param_type) | |
| if self.param_type == "euler": | |
| return X | |
| elif self.param_type == "expmap": | |
| if self.ref_pose is None: | |
| return self._to_expmap(X) | |
| else: | |
| return self._to_expmap2(X) | |
| elif self.param_type == "vectors": | |
| return self._euler_to_vectors(X) | |
| elif self.param_type == "quat": | |
| return self._to_quat(X) | |
| elif self.param_type == "position": | |
| return self._to_pos(X) | |
| elif self.param_type == "expmap2pos": | |
| return self._expmap_to_pos(X) | |
| else: | |
| raise "param types: euler, quat, expmap, position, expmap2pos" | |
| # return X | |
| def inverse_transform(self, X, copy=None): | |
| if self.param_type == "euler": | |
| return X | |
| elif self.param_type == "expmap": | |
| if self.ref_pose is None: | |
| return self._expmap_to_euler(X) | |
| else: | |
| return self._expmap_to_euler2(X) | |
| elif self.param_type == "vectors": | |
| return self._vectors_to_euler(X) | |
| elif self.param_type == "quat": | |
| return self._quat_to_euler(X) | |
| elif self.param_type == "position": | |
| # raise 'positions 2 eulers is not supported' | |
| print("positions 2 eulers is not supported") | |
| return X | |
| else: | |
| raise "param types: euler, quat, expmap, position" | |
| def _to_quat(self, X): | |
| """Converts joints rotations in quaternions""" | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| euler_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| quat_df = euler_df.copy() | |
| # List the columns that contain rotation channels | |
| rot_cols = [c for c in euler_df.columns if ("rotation" in c and "Nub" not in c)] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| for joint in joints: | |
| rot_order = track.skeleton[joint]["order"] | |
| # Get the rotation columns that belong to this joint | |
| rc = euler_df[[c for c in rot_cols if joint in c]] | |
| r1_col = "%s_%srotation" % (joint, rot_order[0]) | |
| r2_col = "%s_%srotation" % (joint, rot_order[1]) | |
| r3_col = "%s_%srotation" % (joint, rot_order[2]) | |
| # Make sure the columns are organized in xyz order | |
| if rc.shape[1] < 3: | |
| euler_values = np.zeros((euler_df.shape[0], 3)) | |
| rot_order = "XYZ" | |
| else: | |
| euler_values = ( | |
| np.pi | |
| / 180.0 | |
| * np.transpose(np.array([track.values[r1_col], track.values[r2_col], track.values[r3_col]])) | |
| ) | |
| quat_df.drop([r1_col, r2_col, r3_col], axis=1, inplace=True) | |
| quats = Quaternions.from_euler(np.asarray(euler_values), order=rot_order.lower(), world=False) | |
| # Create the corresponding columns in the new DataFrame | |
| quat_df["%s_qWrotation" % joint] = pd.Series(data=[e[0] for e in quats], index=quat_df.index) | |
| quat_df["%s_qXrotation" % joint] = pd.Series(data=[e[1] for e in quats], index=quat_df.index) | |
| quat_df["%s_qYrotation" % joint] = pd.Series(data=[e[2] for e in quats], index=quat_df.index) | |
| quat_df["%s_qZrotation" % joint] = pd.Series(data=[e[3] for e in quats], index=quat_df.index) | |
| new_track = track.clone() | |
| new_track.values = quat_df | |
| Q.append(new_track) | |
| return Q | |
| def _quat_to_euler(self, X): | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| quat_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| # euler_df = pd.DataFrame(index=exp_df.index) | |
| euler_df = quat_df.copy() | |
| # Copy the root positions into the new DataFrame | |
| # rxp = '%s_Xposition'%track.root_name | |
| # ryp = '%s_Yposition'%track.root_name | |
| # rzp = '%s_Zposition'%track.root_name | |
| # euler_df[rxp] = pd.Series(data=exp_df[rxp], index=euler_df.index) | |
| # euler_df[ryp] = pd.Series(data=exp_df[ryp], index=euler_df.index) | |
| # euler_df[rzp] = pd.Series(data=exp_df[rzp], index=euler_df.index) | |
| # List the columns that contain rotation channels | |
| quat_params = [ | |
| c | |
| for c in quat_df.columns | |
| if (any(p in c for p in ["qWrotation", "qXrotation", "qYrotation", "qZrotation"]) and "Nub" not in c) | |
| ] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| for joint in joints: | |
| r = quat_df[[c for c in quat_params if joint in c]] # Get the columns that belong to this joint | |
| euler_df.drop( | |
| [ | |
| "%s_qWrotation" % joint, | |
| "%s_qXrotation" % joint, | |
| "%s_qYrotation" % joint, | |
| "%s_qZrotation" % joint, | |
| ], | |
| axis=1, | |
| inplace=True, | |
| ) | |
| quat = [ | |
| [ | |
| f[1]["%s_qWrotation" % joint], | |
| f[1]["%s_qXrotation" % joint], | |
| f[1]["%s_qYrotation" % joint], | |
| f[1]["%s_qZrotation" % joint], | |
| ] | |
| for f in r.iterrows() | |
| ] # Make sure the columsn are organized in xyz order | |
| quats = Quaternions(np.asarray(quat)) | |
| euler_rots = 180 / np.pi * quats.euler() | |
| track.skeleton[joint]["order"] = "ZYX" | |
| rot_order = track.skeleton[joint]["order"] | |
| # euler_rots = [Rotation(f, 'expmap').to_euler(True, rot_order) for f in expmap] # Convert the exp maps to eulers | |
| # euler_rots = [expmap2euler(f, rot_order, True) for f in expmap] # Convert the exp maps to eulers | |
| # Create the corresponding columns in the new DataFrame | |
| euler_df["%s_%srotation" % (joint, rot_order[2])] = pd.Series( | |
| data=[e[0] for e in euler_rots], index=euler_df.index | |
| ) | |
| euler_df["%s_%srotation" % (joint, rot_order[1])] = pd.Series( | |
| data=[e[1] for e in euler_rots], index=euler_df.index | |
| ) | |
| euler_df["%s_%srotation" % (joint, rot_order[0])] = pd.Series( | |
| data=[e[2] for e in euler_rots], index=euler_df.index | |
| ) | |
| new_track = track.clone() | |
| new_track.values = euler_df | |
| Q.append(new_track) | |
| return Q | |
| def _to_pos(self, X): | |
| """Converts joints rotations in Euler angles to joint positions""" | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| euler_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| pos_df = pd.DataFrame(index=euler_df.index) | |
| # Copy the root rotations into the new DataFrame | |
| # rxp = '%s_Xrotation'%track.root_name | |
| # ryp = '%s_Yrotation'%track.root_name | |
| # rzp = '%s_Zrotation'%track.root_name | |
| # pos_df[rxp] = pd.Series(data=euler_df[rxp], index=pos_df.index) | |
| # pos_df[ryp] = pd.Series(data=euler_df[ryp], index=pos_df.index) | |
| # pos_df[rzp] = pd.Series(data=euler_df[rzp], index=pos_df.index) | |
| # List the columns that contain rotation channels | |
| rot_cols = [c for c in euler_df.columns if ("rotation" in c)] | |
| # List the columns that contain position channels | |
| pos_cols = [c for c in euler_df.columns if ("position" in c)] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton) | |
| tree_data = {} | |
| for joint in track.traverse(): | |
| parent = track.skeleton[joint]["parent"] | |
| rot_order = track.skeleton[joint]["order"] | |
| # print("rot_order:" + joint + " :" + rot_order) | |
| # Get the rotation columns that belong to this joint | |
| rc = euler_df[[c for c in rot_cols if joint in c]] | |
| # Get the position columns that belong to this joint | |
| pc = euler_df[[c for c in pos_cols if joint in c]] | |
| # Make sure the columns are organized in xyz order | |
| if rc.shape[1] < 3: | |
| euler_values = np.zeros((euler_df.shape[0], 3)) | |
| rot_order = "XYZ" | |
| else: | |
| euler_values = ( | |
| np.pi | |
| / 180.0 | |
| * np.transpose( | |
| np.array( | |
| [ | |
| track.values["%s_%srotation" % (joint, rot_order[0])], | |
| track.values["%s_%srotation" % (joint, rot_order[1])], | |
| track.values["%s_%srotation" % (joint, rot_order[2])], | |
| ] | |
| ) | |
| ) | |
| ) | |
| if pc.shape[1] < 3: | |
| pos_values = np.asarray([[0, 0, 0] for f in pc.iterrows()]) | |
| else: | |
| pos_values = np.asarray( | |
| [ | |
| [f[1]["%s_Xposition" % joint], f[1]["%s_Yposition" % joint], f[1]["%s_Zposition" % joint]] | |
| for f in pc.iterrows() | |
| ] | |
| ) | |
| quats = Quaternions.from_euler(np.asarray(euler_values), order=rot_order.lower(), world=False) | |
| tree_data[joint] = [[], []] # to store the rotation matrix # to store the calculated position | |
| if track.root_name == joint: | |
| tree_data[joint][0] = quats # rotmats | |
| # tree_data[joint][1] = np.add(pos_values, track.skeleton[joint]['offsets']) | |
| tree_data[joint][1] = pos_values | |
| else: | |
| # for every frame i, multiply this joint's rotmat to the rotmat of its parent | |
| tree_data[joint][0] = tree_data[parent][0] * quats # np.matmul(rotmats, tree_data[parent][0]) | |
| # add the position channel to the offset and store it in k, for every frame i | |
| k = pos_values + np.asarray(track.skeleton[joint]["offsets"]) | |
| # multiply k to the rotmat of the parent for every frame i | |
| q = tree_data[parent][0] * k # np.matmul(k.reshape(k.shape[0],1,3), tree_data[parent][0]) | |
| # add q to the position of the parent, for every frame i | |
| tree_data[joint][1] = tree_data[parent][1] + q # q.reshape(k.shape[0],3) + tree_data[parent][1] | |
| # Create the corresponding columns in the new DataFrame | |
| pos_df["%s_Xposition" % joint] = pd.Series(data=[e[0] for e in tree_data[joint][1]], index=pos_df.index) | |
| pos_df["%s_Yposition" % joint] = pd.Series(data=[e[1] for e in tree_data[joint][1]], index=pos_df.index) | |
| pos_df["%s_Zposition" % joint] = pd.Series(data=[e[2] for e in tree_data[joint][1]], index=pos_df.index) | |
| new_track = track.clone() | |
| new_track.values = pos_df | |
| Q.append(new_track) | |
| return Q | |
| def _expmap2rot(self, expmap): | |
| theta = np.linalg.norm(expmap, axis=1, keepdims=True) | |
| nz = np.nonzero(theta)[0] | |
| expmap[nz, :] = expmap[nz, :] / theta[nz] | |
| nrows = expmap.shape[0] | |
| x = expmap[:, 0] | |
| y = expmap[:, 1] | |
| z = expmap[:, 2] | |
| s = np.sin(theta * 0.5).reshape(nrows) | |
| c = np.cos(theta * 0.5).reshape(nrows) | |
| rotmats = np.zeros((nrows, 3, 3)) | |
| rotmats[:, 0, 0] = 2 * (x * x - 1) * s * s + 1 | |
| rotmats[:, 0, 1] = 2 * x * y * s * s - 2 * z * c * s | |
| rotmats[:, 0, 2] = 2 * x * z * s * s + 2 * y * c * s | |
| rotmats[:, 1, 0] = 2 * x * y * s * s + 2 * z * c * s | |
| rotmats[:, 1, 1] = 2 * (y * y - 1) * s * s + 1 | |
| rotmats[:, 1, 2] = 2 * y * z * s * s - 2 * x * c * s | |
| rotmats[:, 2, 0] = 2 * x * z * s * s - 2 * y * c * s | |
| rotmats[:, 2, 1] = 2 * y * z * s * s + 2 * x * c * s | |
| rotmats[:, 2, 2] = 2 * (z * z - 1) * s * s + 1 | |
| return rotmats | |
| def _expmap_to_pos(self, X): | |
| """Converts joints rotations in expmap notation to joint positions""" | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| exp_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| pos_df = pd.DataFrame(index=exp_df.index) | |
| # Copy the root rotations into the new DataFrame | |
| # rxp = '%s_Xrotation'%track.root_name | |
| # ryp = '%s_Yrotation'%track.root_name | |
| # rzp = '%s_Zrotation'%track.root_name | |
| # pos_df[rxp] = pd.Series(data=euler_df[rxp], index=pos_df.index) | |
| # pos_df[ryp] = pd.Series(data=euler_df[ryp], index=pos_df.index) | |
| # pos_df[rzp] = pd.Series(data=euler_df[rzp], index=pos_df.index) | |
| # List the columns that contain rotation channels | |
| exp_params = [ | |
| c for c in exp_df.columns if (any(p in c for p in ["alpha", "beta", "gamma"]) and "Nub" not in c) | |
| ] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton) | |
| tree_data = {} | |
| for joint in track.traverse(): | |
| parent = track.skeleton[joint]["parent"] | |
| if "Nub" not in joint: | |
| r = exp_df[[c for c in exp_params if joint in c]] # Get the columns that belong to this joint | |
| expmap = r.values | |
| # expmap = [[f[1]['%s_alpha'%joint], f[1]['%s_beta'%joint], f[1]['%s_gamma'%joint]] for f in r.iterrows()] | |
| else: | |
| expmap = np.zeros((exp_df.shape[0], 3)) | |
| # Convert the eulers to rotation matrices | |
| # rotmats = np.asarray([Rotation(f, 'expmap').rotmat for f in expmap]) | |
| # angs = np.linalg.norm(expmap,axis=1, keepdims=True) | |
| rotmats = self._expmap2rot(expmap) | |
| tree_data[joint] = [[], []] # to store the rotation matrix # to store the calculated position | |
| pos_values = np.zeros((exp_df.shape[0], 3)) | |
| if track.root_name == joint: | |
| tree_data[joint][0] = rotmats | |
| # tree_data[joint][1] = np.add(pos_values, track.skeleton[joint]['offsets']) | |
| tree_data[joint][1] = pos_values | |
| else: | |
| # for every frame i, multiply this joint's rotmat to the rotmat of its parent | |
| tree_data[joint][0] = np.matmul(rotmats, tree_data[parent][0]) | |
| # add the position channel to the offset and store it in k, for every frame i | |
| k = pos_values + track.skeleton[joint]["offsets"] | |
| # multiply k to the rotmat of the parent for every frame i | |
| q = np.matmul(k.reshape(k.shape[0], 1, 3), tree_data[parent][0]) | |
| # add q to the position of the parent, for every frame i | |
| tree_data[joint][1] = q.reshape(k.shape[0], 3) + tree_data[parent][1] | |
| # Create the corresponding columns in the new DataFrame | |
| pos_df["%s_Xposition" % joint] = pd.Series(data=tree_data[joint][1][:, 0], index=pos_df.index) | |
| pos_df["%s_Yposition" % joint] = pd.Series(data=tree_data[joint][1][:, 1], index=pos_df.index) | |
| pos_df["%s_Zposition" % joint] = pd.Series(data=tree_data[joint][1][:, 2], index=pos_df.index) | |
| new_track = track.clone() | |
| new_track.values = pos_df | |
| Q.append(new_track) | |
| return Q | |
| def _to_expmap(self, X): | |
| """Converts Euler angles to Exponential Maps""" | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| euler_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| exp_df = euler_df.copy() # pd.DataFrame(index=euler_df.index) | |
| # Copy the root positions into the new DataFrame | |
| # rxp = '%s_Xposition'%track.root_name | |
| # ryp = '%s_Yposition'%track.root_name | |
| # rzp = '%s_Zposition'%track.root_name | |
| # exp_df[rxp] = pd.Series(data=euler_df[rxp], index=exp_df.index) | |
| # exp_df[ryp] = pd.Series(data=euler_df[ryp], index=exp_df.index) | |
| # exp_df[rzp] = pd.Series(data=euler_df[rzp], index=exp_df.index) | |
| # List the columns that contain rotation channels | |
| rots = [c for c in euler_df.columns if ("rotation" in c and "Nub" not in c)] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| for joint in joints: | |
| # print(joint) | |
| r = euler_df[[c for c in rots if joint in c]] # Get the columns that belong to this joint | |
| rot_order = track.skeleton[joint]["order"] | |
| r1_col = "%s_%srotation" % (joint, rot_order[0]) | |
| r2_col = "%s_%srotation" % (joint, rot_order[1]) | |
| r3_col = "%s_%srotation" % (joint, rot_order[2]) | |
| exp_df.drop([r1_col, r2_col, r3_col], axis=1, inplace=True) | |
| euler = [[f[1][r1_col], f[1][r2_col], f[1][r3_col]] for f in r.iterrows()] | |
| # exps = [Rotation(f, 'euler', from_deg=True, order=rot_order).to_expmap() for f in euler] # Convert the eulers to exp maps | |
| exps = unroll( | |
| np.array([euler2expmap(f, rot_order, True) for f in euler]) | |
| ) # Convert the exp maps to eulers | |
| # exps = euler2expmap2(euler, rot_order, True) # Convert the eulers to exp maps | |
| # Create the corresponding columns in the new DataFrame | |
| exp_df.insert( | |
| loc=0, column="%s_gamma" % joint, value=pd.Series(data=[e[2] for e in exps], index=exp_df.index) | |
| ) | |
| exp_df.insert( | |
| loc=0, column="%s_beta" % joint, value=pd.Series(data=[e[1] for e in exps], index=exp_df.index) | |
| ) | |
| exp_df.insert( | |
| loc=0, column="%s_alpha" % joint, value=pd.Series(data=[e[0] for e in exps], index=exp_df.index) | |
| ) | |
| # print(exp_df.columns) | |
| new_track = track.clone() | |
| new_track.values = exp_df | |
| Q.append(new_track) | |
| return Q | |
| def _expmap_to_euler(self, X): | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| exp_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| # euler_df = pd.DataFrame(index=exp_df.index) | |
| euler_df = exp_df.copy() | |
| # Copy the root positions into the new DataFrame | |
| # rxp = '%s_Xposition'%track.root_name | |
| # ryp = '%s_Yposition'%track.root_name | |
| # rzp = '%s_Zposition'%track.root_name | |
| # euler_df[rxp] = pd.Series(data=exp_df[rxp], index=euler_df.index) | |
| # euler_df[ryp] = pd.Series(data=exp_df[ryp], index=euler_df.index) | |
| # euler_df[rzp] = pd.Series(data=exp_df[rzp], index=euler_df.index) | |
| # List the columns that contain rotation channels | |
| exp_params = [ | |
| c for c in exp_df.columns if (any(p in c for p in ["alpha", "beta", "gamma"]) and "Nub" not in c) | |
| ] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| for joint in joints: | |
| r = exp_df[[c for c in exp_params if joint in c]] # Get the columns that belong to this joint | |
| euler_df.drop(["%s_alpha" % joint, "%s_beta" % joint, "%s_gamma" % joint], axis=1, inplace=True) | |
| expmap = [ | |
| [f[1]["%s_alpha" % joint], f[1]["%s_beta" % joint], f[1]["%s_gamma" % joint]] for f in r.iterrows() | |
| ] # Make sure the columsn are organized in xyz order | |
| rot_order = track.skeleton[joint]["order"] | |
| # euler_rots = [Rotation(f, 'expmap').to_euler(True, rot_order) for f in expmap] # Convert the exp maps to eulers | |
| euler_rots = [expmap2euler(f, rot_order, True) for f in expmap] # Convert the exp maps to eulers | |
| # Create the corresponding columns in the new DataFrame | |
| euler_df["%s_%srotation" % (joint, rot_order[0])] = pd.Series( | |
| data=[e[0] for e in euler_rots], index=euler_df.index | |
| ) | |
| euler_df["%s_%srotation" % (joint, rot_order[1])] = pd.Series( | |
| data=[e[1] for e in euler_rots], index=euler_df.index | |
| ) | |
| euler_df["%s_%srotation" % (joint, rot_order[2])] = pd.Series( | |
| data=[e[2] for e in euler_rots], index=euler_df.index | |
| ) | |
| new_track = track.clone() | |
| new_track.values = euler_df | |
| Q.append(new_track) | |
| return Q | |
| def _to_expmap2(self, X): | |
| """Converts Euler angles to Exponential Maps""" | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| euler_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| exp_df = euler_df.copy() # pd.DataFrame(index=euler_df.index) | |
| # Copy the root positions into the new DataFrame | |
| # rxp = '%s_Xposition'%track.root_name | |
| # ryp = '%s_Yposition'%track.root_name | |
| # rzp = '%s_Zposition'%track.root_name | |
| # exp_df[rxp] = pd.Series(data=euler_df[rxp], index=exp_df.index) | |
| # exp_df[ryp] = pd.Series(data=euler_df[ryp], index=exp_df.index) | |
| # exp_df[rzp] = pd.Series(data=euler_df[rzp], index=exp_df.index) | |
| # List the columns that contain rotation channels | |
| rots = [c for c in euler_df.columns if ("rotation" in c and "Nub" not in c)] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| for joint in joints: | |
| r = euler_df[[c for c in rots if joint in c]] # Get the columns that belong to this joint | |
| rot_order = track.skeleton[joint]["order"] | |
| # Get the rotation columns that belong to this joint | |
| rc = euler_df[[c for c in rots if joint in c]] | |
| r1_col = "%s_%srotation" % (joint, rot_order[0]) | |
| r2_col = "%s_%srotation" % (joint, rot_order[1]) | |
| r3_col = "%s_%srotation" % (joint, rot_order[2]) | |
| # Make sure the columns are organized in xyz order | |
| # print("joint:" + str(joint) + " rot_order:" + str(rot_order)) | |
| if rc.shape[1] < 3: | |
| euler_values = np.zeros((euler_df.shape[0], 3)) | |
| rot_order = "XYZ" | |
| else: | |
| euler_values = ( | |
| np.pi | |
| / 180.0 | |
| * np.transpose(np.array([track.values[r1_col], track.values[r2_col], track.values[r3_col]])) | |
| ) | |
| quats = Quaternions.from_euler(np.asarray(euler_values), order=rot_order.lower(), world=False) | |
| # exps = [Rotation(f, 'euler', from_deg=True, order=rot_order).to_expmap() for f in euler] # Convert the eulers to exp maps | |
| # exps = unroll(np.array([euler2expmap(f, rot_order, True) for f in euler])) # Convert the exp maps to eulers | |
| # exps = euler2expmap2(euler, rot_order, True) # Convert the eulers to exp maps | |
| # Create the corresponding columns in the new DataFrame | |
| if self.ref_pose is not None: | |
| q1_col = "%s_qWrotation" % (joint) | |
| q2_col = "%s_qXrotation" % (joint) | |
| q3_col = "%s_qYrotation" % (joint) | |
| q4_col = "%s_qZrotation" % (joint) | |
| ref_q = Quaternions( | |
| np.asarray( | |
| [ | |
| [f[1][q1_col], f[1][q2_col], f[1][q3_col], f[1][q4_col]] | |
| for f in self.ref_pose.values.iterrows() | |
| ] | |
| ) | |
| ) | |
| # print("ref_q:" + str(ref_q.shape)) | |
| ref_q = ref_q[0, :] | |
| quats = (-ref_q) * quats | |
| angles, axis = quats.angle_axis() | |
| aa = np.where(angles > np.pi) | |
| angles[aa] = angles[aa] - 2 * np.pi | |
| # exps = unroll(angles[:,None]*axis) | |
| exps = angles[:, None] * axis | |
| # print(f"{joint}: {str(exps[0,:])}") | |
| # exps = np.array([quat2expmap(f) for f in quats]) | |
| exp_df.drop([r1_col, r2_col, r3_col], axis=1, inplace=True) | |
| exp_df.insert( | |
| loc=0, column="%s_gamma" % joint, value=pd.Series(data=[e[2] for e in exps], index=exp_df.index) | |
| ) | |
| exp_df.insert( | |
| loc=0, column="%s_beta" % joint, value=pd.Series(data=[e[1] for e in exps], index=exp_df.index) | |
| ) | |
| exp_df.insert( | |
| loc=0, column="%s_alpha" % joint, value=pd.Series(data=[e[0] for e in exps], index=exp_df.index) | |
| ) | |
| # print(exp_df.columns) | |
| new_track = track.clone() | |
| new_track.values = exp_df | |
| Q.append(new_track) | |
| return Q | |
| def _expmap_to_euler2(self, X): | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| exp_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| # euler_df = pd.DataFrame(index=exp_df.index) | |
| euler_df = exp_df.copy() | |
| # Copy the root positions into the new DataFrame | |
| # rxp = '%s_Xposition'%track.root_name | |
| # ryp = '%s_Yposition'%track.root_name | |
| # rzp = '%s_Zposition'%track.root_name | |
| # euler_df[rxp] = pd.Series(data=exp_df[rxp], index=euler_df.index) | |
| # euler_df[ryp] = pd.Series(data=exp_df[ryp], index=euler_df.index) | |
| # euler_df[rzp] = pd.Series(data=exp_df[rzp], index=euler_df.index) | |
| # List the columns that contain rotation channels | |
| exp_params = [ | |
| c for c in exp_df.columns if (any(p in c for p in ["alpha", "beta", "gamma"]) and "Nub" not in c) | |
| ] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| for joint in joints: | |
| r = exp_df[[c for c in exp_params if joint in c]] # Get the columns that belong to this joint | |
| euler_df.drop(["%s_alpha" % joint, "%s_beta" % joint, "%s_gamma" % joint], axis=1, inplace=True) | |
| expmap = [ | |
| [f[1]["%s_alpha" % joint], f[1]["%s_beta" % joint], f[1]["%s_gamma" % joint]] for f in r.iterrows() | |
| ] # Make sure the columsn are organized in xyz order | |
| angs = np.linalg.norm(expmap, axis=1) | |
| quats = Quaternions.from_angle_axis(angs, expmap / (np.tile(angs[:, None] + 1e-10, (1, 3)))) | |
| if self.ref_pose is not None: | |
| q1_col = "%s_qWrotation" % (joint) | |
| q2_col = "%s_qXrotation" % (joint) | |
| q3_col = "%s_qYrotation" % (joint) | |
| q4_col = "%s_qZrotation" % (joint) | |
| ref_q = Quaternions( | |
| np.asarray( | |
| [ | |
| [f[1][q1_col], f[1][q2_col], f[1][q3_col], f[1][q4_col]] | |
| for f in self.ref_pose.values.iterrows() | |
| ] | |
| ) | |
| ) | |
| # print("ref_q:" + str(ref_q.shape)) | |
| ref_q = ref_q[0, :] | |
| quats = ref_q * quats | |
| euler_rots = 180 / np.pi * quats.euler() | |
| track.skeleton[joint]["order"] = "ZYX" | |
| rot_order = track.skeleton[joint]["order"] | |
| # euler_rots = [Rotation(f, 'expmap').to_euler(True, rot_order) for f in expmap] # Convert the exp maps to eulers | |
| # euler_rots = [expmap2euler(f, rot_order, True) for f in expmap] # Convert the exp maps to eulers | |
| # Create the corresponding columns in the new DataFrame | |
| euler_df["%s_%srotation" % (joint, rot_order[2])] = pd.Series( | |
| data=[e[0] for e in euler_rots], index=euler_df.index | |
| ) | |
| euler_df["%s_%srotation" % (joint, rot_order[1])] = pd.Series( | |
| data=[e[1] for e in euler_rots], index=euler_df.index | |
| ) | |
| euler_df["%s_%srotation" % (joint, rot_order[0])] = pd.Series( | |
| data=[e[2] for e in euler_rots], index=euler_df.index | |
| ) | |
| new_track = track.clone() | |
| new_track.values = euler_df | |
| Q.append(new_track) | |
| return Q | |
| def _euler_to_vectors(self, X): | |
| """Converts Euler angles to Up and Fwd vectors""" | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| euler_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| vec_df = euler_df.copy() # pd.DataFrame(index=euler_df.index) | |
| # List the columns that contain rotation channels | |
| rots = [c for c in euler_df.columns if ("rotation" in c and "Nub" not in c)] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| for joint in joints: | |
| # print(joint) | |
| r = euler_df[[c for c in rots if joint in c]] # Get the columns that belong to this joint | |
| rot_order = track.skeleton[joint]["order"] | |
| r1_col = "%s_%srotation" % (joint, rot_order[0]) | |
| r2_col = "%s_%srotation" % (joint, rot_order[1]) | |
| r3_col = "%s_%srotation" % (joint, rot_order[2]) | |
| vec_df.drop([r1_col, r2_col, r3_col], axis=1, inplace=True) | |
| euler = [[f[1][r1_col], f[1][r2_col], f[1][r3_col]] for f in r.iterrows()] | |
| vectors = np.array([euler2vectors(f, rot_order, True) for f in euler]) | |
| vec_df.insert( | |
| loc=0, column="%s_xUp" % joint, value=pd.Series(data=[e[0] for e in vectors], index=vec_df.index) | |
| ) | |
| vec_df.insert( | |
| loc=0, column="%s_yUp" % joint, value=pd.Series(data=[e[1] for e in vectors], index=vec_df.index) | |
| ) | |
| vec_df.insert( | |
| loc=0, column="%s_zUp" % joint, value=pd.Series(data=[e[2] for e in vectors], index=vec_df.index) | |
| ) | |
| vec_df.insert( | |
| loc=0, column="%s_xFwd" % joint, value=pd.Series(data=[e[3] for e in vectors], index=vec_df.index) | |
| ) | |
| vec_df.insert( | |
| loc=0, column="%s_yFwd" % joint, value=pd.Series(data=[e[4] for e in vectors], index=vec_df.index) | |
| ) | |
| vec_df.insert( | |
| loc=0, column="%s_zFwd" % joint, value=pd.Series(data=[e[5] for e in vectors], index=vec_df.index) | |
| ) | |
| # print(exp_df.columns) | |
| new_track = track.clone() | |
| new_track.values = vec_df | |
| Q.append(new_track) | |
| return Q | |
| def _vectors_to_euler(self, X): | |
| """Converts Up and Fwd vectors to Euler angles""" | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| vec_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| # euler_df = pd.DataFrame(index=exp_df.index) | |
| euler_df = vec_df.copy() | |
| # List the columns that contain rotation channels | |
| vec_params = [ | |
| c | |
| for c in vec_df.columns | |
| if (any(p in c for p in ["xUp", "yUp", "zUp", "xFwd", "yFwd", "zFwd"]) and "Nub" not in c) | |
| ] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| for joint in joints: | |
| r = vec_df[[c for c in vec_params if joint in c]] # Get the columns that belong to this joint | |
| euler_df.drop( | |
| [ | |
| "%s_xUp" % joint, | |
| "%s_yUp" % joint, | |
| "%s_zUp" % joint, | |
| "%s_xFwd" % joint, | |
| "%s_yFwd" % joint, | |
| "%s_zFwd" % joint, | |
| ], | |
| axis=1, | |
| inplace=True, | |
| ) | |
| vectors = [ | |
| [ | |
| f[1]["%s_xUp" % joint], | |
| f[1]["%s_yUp" % joint], | |
| f[1]["%s_zUp" % joint], | |
| f[1]["%s_xFwd" % joint], | |
| f[1]["%s_yFwd" % joint], | |
| f[1]["%s_zFwd" % joint], | |
| ] | |
| for f in r.iterrows() | |
| ] # Make sure the columsn are organized in xyz order | |
| rot_order = track.skeleton[joint]["order"] | |
| euler_rots = [vectors2euler(f, rot_order, True) for f in vectors] | |
| # Create the corresponding columns in the new DataFrame | |
| euler_df["%s_%srotation" % (joint, rot_order[0])] = pd.Series( | |
| data=[e[0] for e in euler_rots], index=euler_df.index | |
| ) | |
| euler_df["%s_%srotation" % (joint, rot_order[1])] = pd.Series( | |
| data=[e[1] for e in euler_rots], index=euler_df.index | |
| ) | |
| euler_df["%s_%srotation" % (joint, rot_order[2])] = pd.Series( | |
| data=[e[2] for e in euler_rots], index=euler_df.index | |
| ) | |
| new_track = track.clone() | |
| new_track.values = euler_df | |
| Q.append(new_track) | |
| return Q | |
| class Mirror(BaseEstimator, TransformerMixin): | |
| def __init__(self, axis="X", append=True): | |
| """ | |
| Mirrors the data | |
| """ | |
| self.axis = axis | |
| self.append = append | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| print("Mirror: " + self.axis) | |
| Q = [] | |
| if self.append: | |
| for track in X: | |
| Q.append(track) | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| if self.axis == "X": | |
| signs = np.array([1, -1, -1]) | |
| if self.axis == "Y": | |
| signs = np.array([-1, 1, -1]) | |
| if self.axis == "Z": | |
| signs = np.array([-1, -1, 1]) | |
| euler_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| new_df = pd.DataFrame(index=euler_df.index) | |
| # Copy the root positions into the new DataFrame | |
| rxp = "%s_Xposition" % track.root_name | |
| ryp = "%s_Yposition" % track.root_name | |
| rzp = "%s_Zposition" % track.root_name | |
| new_df[rxp] = pd.Series(data=-signs[0] * euler_df[rxp], index=new_df.index) | |
| new_df[ryp] = pd.Series(data=-signs[1] * euler_df[ryp], index=new_df.index) | |
| new_df[rzp] = pd.Series(data=-signs[2] * euler_df[rzp], index=new_df.index) | |
| # List the columns that contain rotation channels | |
| rots = [c for c in euler_df.columns if ("rotation" in c and "Nub" not in c)] | |
| # lft_rots = [c for c in euler_df.columns if ('Left' in c and 'rotation' in c and 'Nub' not in c)] | |
| # rgt_rots = [c for c in euler_df.columns if ('Right' in c and 'rotation' in c and 'Nub' not in c)] | |
| lft_joints = (joint for joint in track.skeleton if "Left" in joint and "Nub" not in joint) | |
| rgt_joints = (joint for joint in track.skeleton if "Right" in joint and "Nub" not in joint) | |
| new_track = track.clone() | |
| for lft_joint in lft_joints: | |
| # lr = euler_df[[c for c in rots if lft_joint + "_" in c]] | |
| # rot_order = track.skeleton[lft_joint]['order'] | |
| # lft_eulers = [[f[1]['%s_Xrotation'%lft_joint], f[1]['%s_Yrotation'%lft_joint], f[1]['%s_Zrotation'%lft_joint]] for f in lr.iterrows()] | |
| rgt_joint = lft_joint.replace("Left", "Right") | |
| # rr = euler_df[[c for c in rots if rgt_joint + "_" in c]] | |
| # rot_order = track.skeleton[rgt_joint]['order'] | |
| # rgt_eulers = [[f[1]['%s_Xrotation'%rgt_joint], f[1]['%s_Yrotation'%rgt_joint], f[1]['%s_Zrotation'%rgt_joint]] for f in rr.iterrows()] | |
| # Create the corresponding columns in the new DataFrame | |
| new_df["%s_Xrotation" % lft_joint] = pd.Series( | |
| data=signs[0] * track.values["%s_Xrotation" % rgt_joint], index=new_df.index | |
| ) | |
| new_df["%s_Yrotation" % lft_joint] = pd.Series( | |
| data=signs[1] * track.values["%s_Yrotation" % rgt_joint], index=new_df.index | |
| ) | |
| new_df["%s_Zrotation" % lft_joint] = pd.Series( | |
| data=signs[2] * track.values["%s_Zrotation" % rgt_joint], index=new_df.index | |
| ) | |
| new_df["%s_Xrotation" % rgt_joint] = pd.Series( | |
| data=signs[0] * track.values["%s_Xrotation" % lft_joint], index=new_df.index | |
| ) | |
| new_df["%s_Yrotation" % rgt_joint] = pd.Series( | |
| data=signs[1] * track.values["%s_Yrotation" % lft_joint], index=new_df.index | |
| ) | |
| new_df["%s_Zrotation" % rgt_joint] = pd.Series( | |
| data=signs[2] * track.values["%s_Zrotation" % lft_joint], index=new_df.index | |
| ) | |
| # List the joints that are not left or right, i.e. are on the trunk | |
| joints = ( | |
| joint for joint in track.skeleton if "Nub" not in joint and "Left" not in joint and "Right" not in joint | |
| ) | |
| for joint in joints: | |
| # r = euler_df[[c for c in rots if joint in c]] # Get the columns that belong to this joint | |
| # rot_order = track.skeleton[joint]['order'] | |
| # eulers = [[f[1]['%s_Xrotation'%joint], f[1]['%s_Yrotation'%joint], f[1]['%s_Zrotation'%joint]] for f in r.iterrows()] | |
| # Create the corresponding columns in the new DataFrame | |
| new_df["%s_Xrotation" % joint] = pd.Series( | |
| data=signs[0] * track.values["%s_Xrotation" % joint], index=new_df.index | |
| ) | |
| new_df["%s_Yrotation" % joint] = pd.Series( | |
| data=signs[1] * track.values["%s_Yrotation" % joint], index=new_df.index | |
| ) | |
| new_df["%s_Zrotation" % joint] = pd.Series( | |
| data=signs[2] * track.values["%s_Zrotation" % joint], index=new_df.index | |
| ) | |
| new_track.values = new_df | |
| new_track.take_name = track.take_name + "_mirrored" | |
| Q.append(new_track) | |
| return Q | |
| def inverse_transform(self, X, copy=None, start_pos=None): | |
| return X | |
| class EulerReorder(BaseEstimator, TransformerMixin): | |
| def __init__(self, new_order): | |
| """ | |
| Add a | |
| """ | |
| self.new_order = new_order | |
| def fit(self, X, y=None): | |
| self.orig_skeleton = copy.deepcopy(X[0].skeleton) | |
| print(self.orig_skeleton) | |
| return self | |
| def transform(self, X, y=None): | |
| print("EulerReorder") | |
| Q = [] | |
| for track in X: | |
| channels = [] | |
| titles = [] | |
| euler_df = track.values | |
| # Create a new DataFrame to store the exponential map rep | |
| # new_df = pd.DataFrame(index=euler_df.index) | |
| new_df = euler_df.copy() | |
| # Copy the root positions into the new DataFrame | |
| rxp = "%s_Xposition" % track.root_name | |
| ryp = "%s_Yposition" % track.root_name | |
| rzp = "%s_Zposition" % track.root_name | |
| new_df[rxp] = pd.Series(data=euler_df[rxp], index=new_df.index) | |
| new_df[ryp] = pd.Series(data=euler_df[ryp], index=new_df.index) | |
| new_df[rzp] = pd.Series(data=euler_df[rzp], index=new_df.index) | |
| # List the columns that contain rotation channels | |
| rots = [c for c in euler_df.columns if ("rotation" in c and "Nub" not in c)] | |
| # List the joints that are not end sites, i.e., have channels | |
| joints = (joint for joint in track.skeleton if "Nub" not in joint) | |
| new_track = track.clone() | |
| for joint in joints: | |
| r = euler_df[[c for c in rots if joint in c]] # Get the columns that belong to this joint | |
| rot_order = track.skeleton[joint]["order"] | |
| r1_col = "%s_%srotation" % (joint, rot_order[0]) | |
| r2_col = "%s_%srotation" % (joint, rot_order[1]) | |
| r3_col = "%s_%srotation" % (joint, rot_order[2]) | |
| euler = [[f[1][r1_col], f[1][r2_col], f[1][r3_col]] for f in r.iterrows()] | |
| # euler = [[f[1]['%s_Xrotation'%(joint)], f[1]['%s_Yrotation'%(joint)], f[1]['%s_Zrotation'%(joint)]] for f in r.iterrows()] | |
| new_euler = [euler_reorder(f, rot_order, self.new_order, True) for f in euler] | |
| # new_euler = euler_reorder2(np.array(euler), rot_order, self.new_order, True) | |
| # Create the corresponding columns in the new DataFrame | |
| new_df["%s_%srotation" % (joint, self.new_order[0])] = pd.Series( | |
| data=[e[0] for e in new_euler], index=new_df.index | |
| ) | |
| new_df["%s_%srotation" % (joint, self.new_order[1])] = pd.Series( | |
| data=[e[1] for e in new_euler], index=new_df.index | |
| ) | |
| new_df["%s_%srotation" % (joint, self.new_order[2])] = pd.Series( | |
| data=[e[2] for e in new_euler], index=new_df.index | |
| ) | |
| new_track.skeleton[joint]["order"] = self.new_order | |
| new_track.values = new_df | |
| Q.append(new_track) | |
| return Q | |
| def inverse_transform(self, X, copy=None, start_pos=None): | |
| return X | |
| class JointSelector(BaseEstimator, TransformerMixin): | |
| """ | |
| Allows for filtering the mocap data to include only the selected joints | |
| """ | |
| def __init__(self, joints, include_root=False): | |
| self.joints = joints | |
| self.include_root = include_root | |
| def fit(self, X, y=None): | |
| selected_joints = [] | |
| selected_channels = [] | |
| if self.include_root: | |
| selected_joints.append(X[0].root_name) | |
| selected_joints.extend(self.joints) | |
| for joint_name in selected_joints: | |
| selected_channels.extend([o for o in X[0].values.columns if (joint_name + "_") in o and "Nub" not in o]) | |
| self.selected_joints = selected_joints | |
| self.selected_channels = selected_channels | |
| self.not_selected = X[0].values.columns.difference(selected_channels) | |
| self.not_selected_values = {c: X[0].values[c].values[0] for c in self.not_selected} | |
| self.orig_skeleton = X[0].skeleton | |
| return self | |
| def transform(self, X, y=None): | |
| print("JointSelector") | |
| Q = [] | |
| for track in X: | |
| t2 = track.clone() | |
| for key in track.skeleton.keys(): | |
| if key not in self.selected_joints: | |
| t2.skeleton.pop(key) | |
| t2.values = track.values[self.selected_channels] | |
| for key in t2.skeleton.keys(): | |
| to_remove = list(set(t2.skeleton[key]["children"]) - set(self.selected_joints)) | |
| [t2.skeleton[key]["children"].remove(c) for c in to_remove] | |
| Q.append(t2) | |
| return Q | |
| def inverse_transform(self, X, copy=None): | |
| Q = [] | |
| for track in X: | |
| t2 = track.clone() | |
| skeleton = self.orig_skeleton | |
| for key in track.skeleton.keys(): | |
| skeleton[key]["order"] = track.skeleton[key]["order"] | |
| t2.skeleton = skeleton | |
| for d in self.not_selected: | |
| t2.values[d] = self.not_selected_values[d] | |
| Q.append(t2) | |
| return Q | |
| class Numpyfier(BaseEstimator, TransformerMixin): | |
| """ | |
| Just converts the values in a MocapData object into a numpy array | |
| Useful for the final stage of a pipeline before training | |
| """ | |
| def __init__(self): | |
| pass | |
| def fit(self, X, y=None): | |
| self.org_mocap_ = X[0].clone() | |
| self.org_mocap_.values.drop(self.org_mocap_.values.index, inplace=True) | |
| return self | |
| def transform(self, X, y=None): | |
| print("Numpyfier") | |
| Q = [] | |
| for track in X: | |
| Q.append(track.values.values) | |
| # print("Numpyfier:" + str(track.values.columns)) | |
| return np.array(Q) | |
| def inverse_transform(self, X, copy=None): | |
| Q = [] | |
| for track in X: | |
| new_mocap = self.org_mocap_.clone() | |
| time_index = pd.to_timedelta([f for f in range(track.shape[0])], unit="s") * self.org_mocap_.framerate | |
| new_df = pd.DataFrame(data=track, index=time_index, columns=self.org_mocap_.values.columns) | |
| new_mocap.values = new_df | |
| Q.append(new_mocap) | |
| return Q | |
| class Slicer(BaseEstimator, TransformerMixin): | |
| """ | |
| Slice the data into intervals of equal size | |
| """ | |
| def __init__(self, window_size, overlap=0.5): | |
| self.window_size = window_size | |
| self.overlap = overlap | |
| pass | |
| def fit(self, X, y=None): | |
| self.org_mocap_ = X[0].clone() | |
| self.org_mocap_.values.drop(self.org_mocap_.values.index, inplace=True) | |
| return self | |
| def transform(self, X, y=None): | |
| print("Slicer") | |
| Q = [] | |
| for track in X: | |
| vals = track.values.values | |
| nframes = vals.shape[0] | |
| overlap_frames = (int)(self.overlap * self.window_size) | |
| n_sequences = (nframes - overlap_frames) // (self.window_size - overlap_frames) | |
| if n_sequences > 0: | |
| y = np.zeros((n_sequences, self.window_size, vals.shape[1])) | |
| # extract sequences from the input data | |
| for i in range(0, n_sequences): | |
| frameIdx = (self.window_size - overlap_frames) * i | |
| Q.append(vals[frameIdx : frameIdx + self.window_size, :]) | |
| return np.array(Q) | |
| def inverse_transform(self, X, copy=None): | |
| Q = [] | |
| for track in X: | |
| new_mocap = self.org_mocap_.clone() | |
| time_index = pd.to_timedelta([f for f in range(track.shape[0])], unit="s") | |
| new_df = pd.DataFrame(data=track, index=time_index, columns=self.org_mocap_.values.columns) | |
| new_mocap.values = new_df | |
| Q.append(new_mocap) | |
| return Q | |
| class RootTransformer(BaseEstimator, TransformerMixin): | |
| def __init__(self, method, hips_axis_order="XYZ", position_smoothing=0, rotation_smoothing=0, separate_root=True): | |
| """ | |
| Accepted methods: | |
| abdolute_translation_deltas | |
| pos_rot_deltas | |
| """ | |
| self.method = method | |
| self.position_smoothing = position_smoothing | |
| self.rotation_smoothing = rotation_smoothing | |
| self.separate_root = separate_root | |
| self.hips_axis_order = hips_axis_order | |
| # relative rotation from the hips awis the the x-side, y-up, z-forward convention | |
| rot_mat = np.zeros((3, 3)) | |
| for i in range(3): | |
| ax_i = ord(hips_axis_order[i]) - ord("X") | |
| rot_mat[i, ax_i] = 1 | |
| self.root_rotation_offset = Quaternions.from_transforms(rot_mat[np.newaxis, :, :]) | |
| self.hips_side_axis = -rot_mat[0, :] | |
| # self.hips_forward_axis = ord(hips_forward_axis)-ord('X') | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| print("RootTransformer") | |
| Q = [] | |
| for track in X: | |
| if self.method == "abdolute_translation_deltas": | |
| new_df = track.values.copy() | |
| xpcol = "%s_Xposition" % track.root_name | |
| ypcol = "%s_Yposition" % track.root_name | |
| zpcol = "%s_Zposition" % track.root_name | |
| dxpcol = "%s_dXposition" % track.root_name | |
| dzpcol = "%s_dZposition" % track.root_name | |
| x = track.values[xpcol].copy() | |
| z = track.values[zpcol].copy() | |
| if self.position_smoothing > 0: | |
| x_sm = filters.gaussian_filter1d(x, self.position_smoothing, axis=0, mode="nearest") | |
| z_sm = filters.gaussian_filter1d(z, self.position_smoothing, axis=0, mode="nearest") | |
| dx = pd.Series(data=x_sm, index=new_df.index).diff() | |
| dz = pd.Series(data=z_sm, index=new_df.index).diff() | |
| new_df[xpcol] = x - x_sm | |
| new_df[zpcol] = z - z_sm | |
| else: | |
| dx = x.diff() | |
| dz = z.diff() | |
| new_df.drop([xpcol, zpcol], axis=1, inplace=True) | |
| dx[0] = dx[1] | |
| dz[0] = dz[1] | |
| new_df[dxpcol] = dx | |
| new_df[dzpcol] = dz | |
| new_track = track.clone() | |
| new_track.values = new_df | |
| # end of abdolute_translation_deltas | |
| elif self.method == "pos_rot_deltas": | |
| new_track = track.clone() | |
| # Absolute columns | |
| xp_col = "%s_Xposition" % track.root_name | |
| yp_col = "%s_Yposition" % track.root_name | |
| zp_col = "%s_Zposition" % track.root_name | |
| # rot_order = track.skeleton[track.root_name]['order'] | |
| # %(joint, rot_order[0]) | |
| rot_order = track.skeleton[track.root_name]["order"] | |
| r1_col = "%s_%srotation" % (track.root_name, rot_order[0]) | |
| r2_col = "%s_%srotation" % (track.root_name, rot_order[1]) | |
| r3_col = "%s_%srotation" % (track.root_name, rot_order[2]) | |
| # Delta columns | |
| # dxp_col = '%s_dXposition'%track.root_name | |
| # dzp_col = '%s_dZposition'%track.root_name | |
| # dxr_col = '%s_dXrotation'%track.root_name | |
| # dyr_col = '%s_dYrotation'%track.root_name | |
| # dzr_col = '%s_dZrotation'%track.root_name | |
| dxp_col = "reference_dXposition" | |
| dzp_col = "reference_dZposition" | |
| dxr_col = "reference_dXrotation" | |
| dyr_col = "reference_dYrotation" | |
| dzr_col = "reference_dZrotation" | |
| positions = np.transpose(np.array([track.values[xp_col], track.values[yp_col], track.values[zp_col]])) | |
| rotations = ( | |
| np.pi | |
| / 180.0 | |
| * np.transpose(np.array([track.values[r1_col], track.values[r2_col], track.values[r3_col]])) | |
| ) | |
| """ Get Trajectory and smooth it""" | |
| trajectory_filterwidth = self.position_smoothing | |
| reference = positions.copy() * np.array([1, 0, 1]) | |
| if trajectory_filterwidth > 0: | |
| reference = filters.gaussian_filter1d(reference, trajectory_filterwidth, axis=0, mode="nearest") | |
| """ Get Root Velocity """ | |
| velocity = np.diff(reference, axis=0) | |
| velocity = np.vstack((velocity[0, :], velocity)) | |
| """ Remove Root Translation """ | |
| positions = positions - reference | |
| """ Get Forward Direction along the x-z plane, assuming character is facig z-forward """ | |
| # forward = [Rotation(f, 'euler', from_deg=True, order=rot_order).rotmat[:,2] for f in rotations] # get the z-axis of the rotation matrix, assuming character is facig z-forward | |
| # print("order:" + rot_order.lower()) | |
| quats = Quaternions.from_euler(rotations, order=rot_order.lower(), world=False) | |
| # forward = quats*np.array([[0,0,1]]) | |
| # forward[:,1] = 0 | |
| side_dirs = quats * self.hips_side_axis | |
| forward = np.cross(np.array([[0, 1, 0]]), side_dirs) | |
| """ Smooth Forward Direction """ | |
| direction_filterwidth = self.rotation_smoothing | |
| if direction_filterwidth > 0: | |
| forward = filters.gaussian_filter1d(forward, direction_filterwidth, axis=0, mode="nearest") | |
| forward = forward / np.sqrt((forward**2).sum(axis=-1))[..., np.newaxis] | |
| """ Remove Y Rotation """ | |
| target = np.array([[0, 0, 1]]).repeat(len(forward), axis=0) | |
| rotation = Quaternions.between(target, forward)[:, np.newaxis] | |
| positions = (-rotation[:, 0]) * positions | |
| # new_rotations = (-rotation[:,0]) * quats | |
| new_rotations = (-self.root_rotation_offset) * (-rotation[:, 0]) * quats | |
| """ Get Root Rotation """ | |
| # print(rotation[:,0]) | |
| velocity = (-rotation[:, 0]) * velocity | |
| rvelocity = Pivots.from_quaternions(rotation[1:] * -rotation[:-1]).ps | |
| rvelocity = np.vstack((rvelocity[0], rvelocity)) | |
| eulers = ( | |
| np.array( | |
| [t3d.euler.quat2euler(q, axes=("s" + rot_order.lower()[::-1]))[::-1] for q in new_rotations] | |
| ) | |
| * 180.0 | |
| / np.pi | |
| ) | |
| new_df = track.values.copy() | |
| root_pos_x = pd.Series(data=positions[:, 0], index=new_df.index) | |
| root_pos_y = pd.Series(data=positions[:, 1], index=new_df.index) | |
| root_pos_z = pd.Series(data=positions[:, 2], index=new_df.index) | |
| root_pos_x_diff = pd.Series(data=velocity[:, 0], index=new_df.index) | |
| root_pos_z_diff = pd.Series(data=velocity[:, 2], index=new_df.index) | |
| root_rot_1 = pd.Series(data=eulers[:, 0], index=new_df.index) | |
| root_rot_2 = pd.Series(data=eulers[:, 1], index=new_df.index) | |
| root_rot_3 = pd.Series(data=eulers[:, 2], index=new_df.index) | |
| root_rot_y_diff = pd.Series(data=rvelocity[:, 0], index=new_df.index) | |
| # new_df.drop([xr_col, yr_col, zr_col, xp_col, zp_col], axis=1, inplace=True) | |
| new_df[xp_col] = root_pos_x | |
| new_df[yp_col] = root_pos_y | |
| new_df[zp_col] = root_pos_z | |
| new_df[dxp_col] = root_pos_x_diff | |
| new_df[dzp_col] = root_pos_z_diff | |
| new_df[r1_col] = root_rot_1 | |
| new_df[r2_col] = root_rot_2 | |
| new_df[r3_col] = root_rot_3 | |
| # new_df[dxr_col] = root_rot_x_diff | |
| new_df[dyr_col] = root_rot_y_diff | |
| # new_df[dzr_col] = root_rot_z_diff | |
| new_track.values = new_df | |
| elif self.method == "pos_xyz_rot_deltas": | |
| new_track = track.clone() | |
| # Absolute columns | |
| xp_col = "%s_Xposition" % track.root_name | |
| yp_col = "%s_Yposition" % track.root_name | |
| zp_col = "%s_Zposition" % track.root_name | |
| # rot_order = track.skeleton[track.root_name]['order'] | |
| # %(joint, rot_order[0]) | |
| rot_order = track.skeleton[track.root_name]["order"] | |
| r1_col = "%s_%srotation" % (track.root_name, rot_order[0]) | |
| r2_col = "%s_%srotation" % (track.root_name, rot_order[1]) | |
| r3_col = "%s_%srotation" % (track.root_name, rot_order[2]) | |
| # Delta columns | |
| # dxp_col = '%s_dXposition'%track.root_name | |
| # dzp_col = '%s_dZposition'%track.root_name | |
| # dxr_col = '%s_dXrotation'%track.root_name | |
| # dyr_col = '%s_dYrotation'%track.root_name | |
| # dzr_col = '%s_dZrotation'%track.root_name | |
| dxp_col = "reference_dXposition" | |
| dyp_col = "reference_dYposition" | |
| dzp_col = "reference_dZposition" | |
| dxr_col = "reference_dXrotation" | |
| dyr_col = "reference_dYrotation" | |
| dzr_col = "reference_dZrotation" | |
| positions = np.transpose(np.array([track.values[xp_col], track.values[yp_col], track.values[zp_col]])) | |
| rotations = ( | |
| np.pi | |
| / 180.0 | |
| * np.transpose(np.array([track.values[r1_col], track.values[r2_col], track.values[r3_col]])) | |
| ) | |
| """ Get Trajectory and smooth it""" | |
| trajectory_filterwidth = self.position_smoothing | |
| # reference = positions.copy()*np.array([1,0,1]) | |
| if trajectory_filterwidth > 0: | |
| reference = filters.gaussian_filter1d(positions, trajectory_filterwidth, axis=0, mode="nearest") | |
| """ Get Root Velocity """ | |
| velocity = np.diff(reference, axis=0) | |
| velocity = np.vstack((velocity[0, :], velocity)) | |
| """ Remove Root Translation """ | |
| positions = positions - reference | |
| """ Get Forward Direction along the x-z plane, assuming character is facig z-forward """ | |
| # forward = [Rotation(f, 'euler', from_deg=True, order=rot_order).rotmat[:,2] for f in rotations] # get the z-axis of the rotation matrix, assuming character is facig z-forward | |
| # print("order:" + rot_order.lower()) | |
| quats = Quaternions.from_euler(rotations, order=rot_order.lower(), world=False) | |
| # calculate the hips forward directions given in global cordinates | |
| # side_ax = np.zeros((1,3)) | |
| # side_ax[0,self.hips_side_axis]=1 | |
| # side_dirs = quats*side_ax | |
| side_dirs = quats * self.hips_side_axis | |
| forward = np.cross(np.array([[0, 1, 0]]), side_dirs) | |
| """ Smooth Forward Direction """ | |
| direction_filterwidth = self.rotation_smoothing | |
| if direction_filterwidth > 0: | |
| forward = filters.gaussian_filter1d(forward, direction_filterwidth, axis=0, mode="nearest") | |
| # make unit vector | |
| forward = forward / np.sqrt((forward**2).sum(axis=-1))[..., np.newaxis] | |
| """ Remove Y Rotation """ | |
| target = np.array([[0, 0, 1]]).repeat(len(forward), axis=0) | |
| rotation = Quaternions.between(target, forward)[:, np.newaxis] | |
| positions = (-rotation[:, 0]) * positions | |
| new_rotations = (-self.root_rotation_offset) * (-rotation[:, 0]) * quats | |
| """ Get Root Rotation """ | |
| # print(rotation[:,0]) | |
| velocity = (-rotation[:, 0]) * velocity | |
| rvelocity = Pivots.from_quaternions(rotation[1:] * -rotation[:-1]).ps | |
| rvelocity = np.vstack((rvelocity[0], rvelocity)) | |
| eulers = ( | |
| np.array( | |
| [t3d.euler.quat2euler(q, axes=("s" + rot_order.lower()[::-1]))[::-1] for q in new_rotations] | |
| ) | |
| * 180.0 | |
| / np.pi | |
| ) | |
| new_df = track.values.copy() | |
| root_pos_x = pd.Series(data=positions[:, 0], index=new_df.index) | |
| root_pos_y = pd.Series(data=positions[:, 1], index=new_df.index) | |
| root_pos_z = pd.Series(data=positions[:, 2], index=new_df.index) | |
| root_pos_x_diff = pd.Series(data=velocity[:, 0], index=new_df.index) | |
| root_pos_y_diff = pd.Series(data=velocity[:, 1], index=new_df.index) | |
| root_pos_z_diff = pd.Series(data=velocity[:, 2], index=new_df.index) | |
| root_rot_1 = pd.Series(data=eulers[:, 0], index=new_df.index) | |
| root_rot_2 = pd.Series(data=eulers[:, 1], index=new_df.index) | |
| root_rot_3 = pd.Series(data=eulers[:, 2], index=new_df.index) | |
| root_rot_y_diff = pd.Series(data=rvelocity[:, 0], index=new_df.index) | |
| # new_df.drop([xr_col, yr_col, zr_col, xp_col, zp_col], axis=1, inplace=True) | |
| new_df[xp_col] = root_pos_x | |
| new_df[yp_col] = root_pos_y | |
| new_df[zp_col] = root_pos_z | |
| new_df[dxp_col] = root_pos_x_diff | |
| new_df[dyp_col] = root_pos_y_diff | |
| new_df[dzp_col] = root_pos_z_diff | |
| new_df[r1_col] = root_rot_1 | |
| new_df[r2_col] = root_rot_2 | |
| new_df[r3_col] = root_rot_3 | |
| # new_df[dxr_col] = root_rot_x_diff | |
| new_df[dyr_col] = root_rot_y_diff | |
| # new_df[dzr_col] = root_rot_z_diff | |
| new_track.values = new_df | |
| elif self.method == "hip_centric": | |
| new_track = track.clone() | |
| # Absolute columns | |
| xp_col = "%s_Xposition" % track.root_name | |
| yp_col = "%s_Yposition" % track.root_name | |
| zp_col = "%s_Zposition" % track.root_name | |
| xr_col = "%s_Xrotation" % track.root_name | |
| yr_col = "%s_Yrotation" % track.root_name | |
| zr_col = "%s_Zrotation" % track.root_name | |
| new_df = track.values.copy() | |
| all_zeros = np.zeros(track.values[xp_col].values.shape) | |
| new_df[xp_col] = pd.Series(data=all_zeros, index=new_df.index) | |
| new_df[yp_col] = pd.Series(data=all_zeros, index=new_df.index) | |
| new_df[zp_col] = pd.Series(data=all_zeros, index=new_df.index) | |
| new_df[xr_col] = pd.Series(data=all_zeros, index=new_df.index) | |
| new_df[yr_col] = pd.Series(data=all_zeros, index=new_df.index) | |
| new_df[zr_col] = pd.Series(data=all_zeros, index=new_df.index) | |
| new_track.values = new_df | |
| # print(new_track.values.columns) | |
| Q.append(new_track) | |
| return Q | |
| def inverse_transform(self, X, copy=None, start_pos=None): | |
| Q = [] | |
| # TODO: simplify this implementation | |
| startx = 0 | |
| startz = 0 | |
| if start_pos is not None: | |
| startx, startz = start_pos | |
| for track in X: | |
| new_track = track.clone() | |
| if self.method == "abdolute_translation_deltas": | |
| new_df = new_track.values | |
| xpcol = "%s_Xposition" % track.root_name | |
| ypcol = "%s_Yposition" % track.root_name | |
| zpcol = "%s_Zposition" % track.root_name | |
| dxpcol = "%s_dXposition" % track.root_name | |
| dzpcol = "%s_dZposition" % track.root_name | |
| dx = track.values[dxpcol].values | |
| dz = track.values[dzpcol].values | |
| recx = [startx] | |
| recz = [startz] | |
| for i in range(dx.shape[0] - 1): | |
| recx.append(recx[i] + dx[i + 1]) | |
| recz.append(recz[i] + dz[i + 1]) | |
| # recx = [recx[i]+dx[i+1] for i in range(dx.shape[0]-1)] | |
| # recz = [recz[i]+dz[i+1] for i in range(dz.shape[0]-1)] | |
| # recx = dx[:-1] + dx[1:] | |
| # recz = dz[:-1] + dz[1:] | |
| if self.position_smoothing > 0: | |
| new_df[xpcol] = pd.Series(data=new_df[xpcol] + recx, index=new_df.index) | |
| new_df[zpcol] = pd.Series(data=new_df[zpcol] + recz, index=new_df.index) | |
| else: | |
| new_df[xpcol] = pd.Series(data=recx, index=new_df.index) | |
| new_df[zpcol] = pd.Series(data=recz, index=new_df.index) | |
| new_df.drop([dxpcol, dzpcol], axis=1, inplace=True) | |
| new_track.values = new_df | |
| # end of abdolute_translation_deltas | |
| elif self.method == "pos_rot_deltas": | |
| # Absolute columns | |
| rot_order = track.skeleton[track.root_name]["order"] | |
| xp_col = "%s_Xposition" % track.root_name | |
| yp_col = "%s_Yposition" % track.root_name | |
| zp_col = "%s_Zposition" % track.root_name | |
| xr_col = "%s_Xrotation" % track.root_name | |
| yr_col = "%s_Yrotation" % track.root_name | |
| zr_col = "%s_Zrotation" % track.root_name | |
| r1_col = "%s_%srotation" % (track.root_name, rot_order[0]) | |
| r2_col = "%s_%srotation" % (track.root_name, rot_order[1]) | |
| r3_col = "%s_%srotation" % (track.root_name, rot_order[2]) | |
| # Delta columns | |
| # dxp_col = '%s_dXposition'%track.root_name | |
| # dzp_col = '%s_dZposition'%track.root_name | |
| # dyr_col = '%s_dYrotation'%track.root_name | |
| dxp_col = "reference_dXposition" | |
| dzp_col = "reference_dZposition" | |
| dyr_col = "reference_dYrotation" | |
| positions = np.transpose(np.array([track.values[xp_col], track.values[yp_col], track.values[zp_col]])) | |
| rotations = ( | |
| np.pi | |
| / 180.0 | |
| * np.transpose(np.array([track.values[r1_col], track.values[r2_col], track.values[r3_col]])) | |
| ) | |
| quats = Quaternions.from_euler(rotations, order=rot_order.lower(), world=False) | |
| new_df = track.values.copy() | |
| dx = track.values[dxp_col].values | |
| dz = track.values[dzp_col].values | |
| dry = track.values[dyr_col].values | |
| # rec_p = np.array([startx, 0, startz])+positions[0,:] | |
| rec_ry = Quaternions.id(quats.shape[0]) | |
| rec_xp = [0] | |
| rec_zp = [0] | |
| # rec_r = Quaternions.id(quats.shape[0]) | |
| for i in range(dx.shape[0] - 1): | |
| # print(dry[i]) | |
| q_y = Quaternions.from_angle_axis(np.array(dry[i + 1]), np.array([0, 1, 0])) | |
| rec_ry[i + 1] = q_y * rec_ry[i] | |
| # print("dx: + " + str(dx[i+1])) | |
| dp = rec_ry[i + 1] * np.array([dx[i + 1], 0, dz[i + 1]]) | |
| rec_xp.append(rec_xp[i] + dp[0, 0]) | |
| rec_zp.append(rec_zp[i] + dp[0, 2]) | |
| if self.separate_root: | |
| qq = quats | |
| xx = positions[:, 0] | |
| zz = positions[:, 2] | |
| else: | |
| qq = rec_ry * self.root_rotation_offset * quats | |
| pp = rec_ry * positions | |
| xx = rec_xp + pp[:, 0] | |
| zz = rec_zp + pp[:, 2] | |
| eulers = ( | |
| np.array([t3d.euler.quat2euler(q, axes=("s" + rot_order.lower()[::-1]))[::-1] for q in qq]) | |
| * 180.0 | |
| / np.pi | |
| ) | |
| new_df = track.values.copy() | |
| root_rot_1 = pd.Series(data=eulers[:, 0], index=new_df.index) | |
| root_rot_2 = pd.Series(data=eulers[:, 1], index=new_df.index) | |
| root_rot_3 = pd.Series(data=eulers[:, 2], index=new_df.index) | |
| new_df[xp_col] = pd.Series(data=xx, index=new_df.index) | |
| new_df[zp_col] = pd.Series(data=zz, index=new_df.index) | |
| new_df[r1_col] = pd.Series(data=root_rot_1, index=new_df.index) | |
| new_df[r2_col] = pd.Series(data=root_rot_2, index=new_df.index) | |
| new_df[r3_col] = pd.Series(data=root_rot_3, index=new_df.index) | |
| if self.separate_root: | |
| new_df["reference_Xposition"] = pd.Series(data=rec_xp, index=new_df.index) | |
| new_df["reference_Zposition"] = pd.Series(data=rec_zp, index=new_df.index) | |
| eulers_ry = ( | |
| np.array([t3d.euler.quat2euler(q, axes=("s" + rot_order.lower()[::-1]))[::-1] for q in rec_ry]) | |
| * 180.0 | |
| / np.pi | |
| ) | |
| new_df["reference_Yrotation"] = pd.Series( | |
| data=eulers_ry[:, rot_order.find("Y")], index=new_df.index | |
| ) | |
| new_df.drop([dyr_col, dxp_col, dzp_col], axis=1, inplace=True) | |
| new_track.values = new_df | |
| elif self.method == "pos_xyz_rot_deltas": | |
| # Absolute columns | |
| rot_order = track.skeleton[track.root_name]["order"] | |
| xp_col = "%s_Xposition" % track.root_name | |
| yp_col = "%s_Yposition" % track.root_name | |
| zp_col = "%s_Zposition" % track.root_name | |
| xr_col = "%s_Xrotation" % track.root_name | |
| yr_col = "%s_Yrotation" % track.root_name | |
| zr_col = "%s_Zrotation" % track.root_name | |
| r1_col = "%s_%srotation" % (track.root_name, rot_order[0]) | |
| r2_col = "%s_%srotation" % (track.root_name, rot_order[1]) | |
| r3_col = "%s_%srotation" % (track.root_name, rot_order[2]) | |
| # Delta columns | |
| # dxp_col = '%s_dXposition'%track.root_name | |
| # dzp_col = '%s_dZposition'%track.root_name | |
| # dyr_col = '%s_dYrotation'%track.root_name | |
| dxp_col = "reference_dXposition" | |
| dyp_col = "reference_dYposition" | |
| dzp_col = "reference_dZposition" | |
| dyr_col = "reference_dYrotation" | |
| positions = np.transpose(np.array([track.values[xp_col], track.values[yp_col], track.values[zp_col]])) | |
| rotations = ( | |
| np.pi | |
| / 180.0 | |
| * np.transpose(np.array([track.values[r1_col], track.values[r2_col], track.values[r3_col]])) | |
| ) | |
| quats = Quaternions.from_euler(rotations, order=rot_order.lower(), world=False) | |
| new_df = track.values.copy() | |
| dx = track.values[dxp_col].values | |
| dy = track.values[dyp_col].values | |
| dz = track.values[dzp_col].values | |
| dry = track.values[dyr_col].values | |
| # rec_p = np.array([startx, 0, startz])+positions[0,:] | |
| rec_ry = Quaternions.id(quats.shape[0]) | |
| rec_xp = [0] | |
| rec_yp = [0] | |
| rec_zp = [0] | |
| # rec_r = Quaternions.id(quats.shape[0]) | |
| for i in range(dx.shape[0] - 1): | |
| # print(dry[i]) | |
| q_y = Quaternions.from_angle_axis(np.array(dry[i + 1]), np.array([0, 1, 0])) | |
| rec_ry[i + 1] = q_y * rec_ry[i] | |
| # print("dx: + " + str(dx[i+1])) | |
| dp = rec_ry[i + 1] * np.array([dx[i + 1], dy[i + 1], dz[i + 1]]) | |
| rec_xp.append(rec_xp[i] + dp[0, 0]) | |
| rec_yp.append(rec_yp[i] + dp[0, 1]) | |
| rec_zp.append(rec_zp[i] + dp[0, 2]) | |
| if self.separate_root: | |
| qq = quats | |
| xx = positions[:, 0] | |
| yy = positions[:, 1] | |
| zz = positions[:, 2] | |
| else: | |
| qq = rec_ry * self.root_rotation_offset * quats | |
| pp = rec_ry * positions | |
| xx = rec_xp + pp[:, 0] | |
| yy = rec_yp + pp[:, 1] | |
| zz = rec_zp + pp[:, 2] | |
| eulers = ( | |
| np.array([t3d.euler.quat2euler(q, axes=("s" + rot_order.lower()[::-1]))[::-1] for q in qq]) | |
| * 180.0 | |
| / np.pi | |
| ) | |
| new_df = track.values.copy() | |
| root_rot_1 = pd.Series(data=eulers[:, 0], index=new_df.index) | |
| root_rot_2 = pd.Series(data=eulers[:, 1], index=new_df.index) | |
| root_rot_3 = pd.Series(data=eulers[:, 2], index=new_df.index) | |
| new_df[xp_col] = pd.Series(data=xx, index=new_df.index) | |
| new_df[yp_col] = pd.Series(data=yy, index=new_df.index) | |
| new_df[zp_col] = pd.Series(data=zz, index=new_df.index) | |
| new_df[r1_col] = pd.Series(data=root_rot_1, index=new_df.index) | |
| new_df[r2_col] = pd.Series(data=root_rot_2, index=new_df.index) | |
| new_df[r3_col] = pd.Series(data=root_rot_3, index=new_df.index) | |
| if self.separate_root: | |
| new_df["reference_Xposition"] = pd.Series(data=rec_xp, index=new_df.index) | |
| new_df["reference_Yposition"] = pd.Series(data=rec_yp, index=new_df.index) | |
| new_df["reference_Zposition"] = pd.Series(data=rec_zp, index=new_df.index) | |
| eulers_ry = ( | |
| np.array([t3d.euler.quat2euler(q, axes=("s" + rot_order.lower()[::-1]))[::-1] for q in rec_ry]) | |
| * 180.0 | |
| / np.pi | |
| ) | |
| new_df["reference_Yrotation"] = pd.Series( | |
| data=eulers_ry[:, rot_order.find("Y")], index=new_df.index | |
| ) | |
| new_df.drop([dyr_col, dxp_col, dyp_col, dzp_col], axis=1, inplace=True) | |
| new_track.values = new_df | |
| # print(new_track.values.columns) | |
| Q.append(new_track) | |
| return Q | |
| class RootCentricPositionNormalizer(BaseEstimator, TransformerMixin): | |
| def __init__(self): | |
| pass | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| Q = [] | |
| for track in X: | |
| new_track = track.clone() | |
| rxp = "%s_Xposition" % track.root_name | |
| ryp = "%s_Yposition" % track.root_name | |
| rzp = "%s_Zposition" % track.root_name | |
| projected_root_pos = track.values[[rxp, ryp, rzp]] | |
| projected_root_pos.loc[:, ryp] = 0 # we want the root's projection on the floor plane as the ref | |
| new_df = pd.DataFrame(index=track.values.index) | |
| all_but_root = [joint for joint in track.skeleton if track.root_name not in joint] | |
| # all_but_root = [joint for joint in track.skeleton] | |
| for joint in all_but_root: | |
| new_df["%s_Xposition" % joint] = pd.Series( | |
| data=track.values["%s_Xposition" % joint] - projected_root_pos[rxp], index=new_df.index | |
| ) | |
| new_df["%s_Yposition" % joint] = pd.Series( | |
| data=track.values["%s_Yposition" % joint] - projected_root_pos[ryp], index=new_df.index | |
| ) | |
| new_df["%s_Zposition" % joint] = pd.Series( | |
| data=track.values["%s_Zposition" % joint] - projected_root_pos[rzp], index=new_df.index | |
| ) | |
| # keep the root as it is now | |
| new_df[rxp] = track.values[rxp] | |
| new_df[ryp] = track.values[ryp] | |
| new_df[rzp] = track.values[rzp] | |
| new_track.values = new_df | |
| Q.append(new_track) | |
| return Q | |
| def inverse_transform(self, X, copy=None): | |
| Q = [] | |
| for track in X: | |
| new_track = track.clone() | |
| rxp = "%s_Xposition" % track.root_name | |
| ryp = "%s_Yposition" % track.root_name | |
| rzp = "%s_Zposition" % track.root_name | |
| projected_root_pos = track.values[[rxp, ryp, rzp]] | |
| projected_root_pos.loc[:, ryp] = 0 # we want the root's projection on the floor plane as the ref | |
| new_df = pd.DataFrame(index=track.values.index) | |
| for joint in track.skeleton: | |
| new_df["%s_Xposition" % joint] = pd.Series( | |
| data=track.values["%s_Xposition" % joint] + projected_root_pos[rxp], index=new_df.index | |
| ) | |
| new_df["%s_Yposition" % joint] = pd.Series( | |
| data=track.values["%s_Yposition" % joint] + projected_root_pos[ryp], index=new_df.index | |
| ) | |
| new_df["%s_Zposition" % joint] = pd.Series( | |
| data=track.values["%s_Zposition" % joint] + projected_root_pos[rzp], index=new_df.index | |
| ) | |
| new_track.values = new_df | |
| Q.append(new_track) | |
| return Q | |
| class Flattener(BaseEstimator, TransformerMixin): | |
| def __init__(self): | |
| pass | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| return np.concatenate(X, axis=0) | |
| class ConstantsRemover(BaseEstimator, TransformerMixin): | |
| """ | |
| For now it just looks at the first track | |
| """ | |
| def __init__(self, eps=1e-6): | |
| self.eps = eps | |
| def fit(self, X, y=None): | |
| stds = X[0].values.std() | |
| cols = X[0].values.columns.values | |
| self.const_dims_ = [c for c in cols if (stds[c] < self.eps).any()] | |
| self.const_values_ = {c: X[0].values[c].values[0] for c in cols if (stds[c] < self.eps).any()} | |
| return self | |
| def transform(self, X, y=None): | |
| Q = [] | |
| for track in X: | |
| t2 = track.clone() | |
| # for key in t2.skeleton.keys(): | |
| # if key in self.ConstDims_: | |
| # t2.skeleton.pop(key) | |
| # print(track.values.columns.difference(self.const_dims_)) | |
| t2.values.drop(self.const_dims_, axis=1, inplace=True) | |
| # t2.values = track.values[track.values.columns.difference(self.const_dims_)] | |
| Q.append(t2) | |
| return Q | |
| def inverse_transform(self, X, copy=None): | |
| Q = [] | |
| for track in X: | |
| t2 = track.clone() | |
| for d in self.const_dims_: | |
| t2.values[d] = self.const_values_[d] | |
| # t2.values.assign(d=pd.Series(data=self.const_values_[d], index = t2.values.index)) | |
| Q.append(t2) | |
| return Q | |
| class ListStandardScaler(BaseEstimator, TransformerMixin): | |
| def __init__(self, is_DataFrame=False): | |
| self.is_DataFrame = is_DataFrame | |
| def fit(self, X, y=None): | |
| if self.is_DataFrame: | |
| X_train_flat = np.concatenate([m.values for m in X], axis=0) | |
| else: | |
| X_train_flat = np.concatenate([m for m in X], axis=0) | |
| self.data_mean_ = np.mean(X_train_flat, axis=0) | |
| self.data_std_ = np.std(X_train_flat, axis=0) | |
| return self | |
| def transform(self, X, y=None): | |
| Q = [] | |
| for track in X: | |
| if self.is_DataFrame: | |
| normalized_track = track.copy() | |
| normalized_track.values = (track.values - self.data_mean_) / self.data_std_ | |
| else: | |
| normalized_track = (track - self.data_mean_) / self.data_std_ | |
| Q.append(normalized_track) | |
| if self.is_DataFrame: | |
| return Q | |
| else: | |
| return np.array(Q) | |
| def inverse_transform(self, X, copy=None): | |
| Q = [] | |
| for track in X: | |
| if self.is_DataFrame: | |
| unnormalized_track = track.copy() | |
| unnormalized_track.values = (track.values * self.data_std_) + self.data_mean_ | |
| else: | |
| unnormalized_track = (track * self.data_std_) + self.data_mean_ | |
| Q.append(unnormalized_track) | |
| if self.is_DataFrame: | |
| return Q | |
| else: | |
| return np.array(Q) | |
| class ListMinMaxScaler(BaseEstimator, TransformerMixin): | |
| def __init__(self, is_DataFrame=False): | |
| self.is_DataFrame = is_DataFrame | |
| def fit(self, X, y=None): | |
| if self.is_DataFrame: | |
| X_train_flat = np.concatenate([m.values for m in X], axis=0) | |
| else: | |
| X_train_flat = np.concatenate([m for m in X], axis=0) | |
| self.data_max_ = np.max(X_train_flat, axis=0) | |
| self.data_min_ = np.min(X_train_flat, axis=0) | |
| return self | |
| def transform(self, X, y=None): | |
| Q = [] | |
| for track in X: | |
| if self.is_DataFrame: | |
| normalized_track = track.copy() | |
| normalized_track.values = (track.values - self.data_min_) / (self.data_max_ - self.data_min_) | |
| else: | |
| normalized_track = (track - self.data_min_) / (self.data_max_ - self.data_min_) | |
| Q.append(normalized_track) | |
| if self.is_DataFrame: | |
| return Q | |
| else: | |
| return np.array(Q) | |
| def inverse_transform(self, X, copy=None): | |
| Q = [] | |
| for track in X: | |
| if self.is_DataFrame: | |
| unnormalized_track = track.copy() | |
| unnormalized_track.values = (track.values * (self.data_max_ - self.data_min_)) + self.data_min_ | |
| else: | |
| unnormalized_track = (track * (self.data_max_ - self.data_min_)) + self.data_min_ | |
| Q.append(unnormalized_track) | |
| if self.is_DataFrame: | |
| return Q | |
| else: | |
| return np.array(Q) | |
| class Resampler(BaseEstimator, TransformerMixin): | |
| def __init__(self, fps, method="cubic"): | |
| """ | |
| Method to resample a pandas dataframe to a different framerate. | |
| NOTE: Pandas resampling is quit unintuitive when resampling to odd framerates using interpolation. | |
| Thus we do it in this complex way. | |
| """ | |
| self.tgt_frametime = 1.0 / fps | |
| self.method = method | |
| def fit(self, X, y=None): | |
| print("Resampling to tgt_frametime: " + str(self.tgt_frametime)) | |
| self.orig_frametime = X[0].framerate | |
| return self | |
| def resample_dataframe(self, df, frametime, method="cubic"): | |
| # Create a time index for the resampled data | |
| rate = str(round(1.0e9 * frametime)) + "N" | |
| time_index = df.resample(rate).indices | |
| # reindex the old data. This will turn all non-matching indices to NAN | |
| tmp = df.reindex(time_index) | |
| # merge with the old data and sort | |
| tmp = pd.concat([df, tmp]).sort_index() | |
| # remove duplicate time indices. Then fill the NAN values using interpolation | |
| tmp = tmp[~tmp.index.duplicated(keep="first")].interpolate(method=method) | |
| # return the values using the resampled indices | |
| return tmp.loc[list(time_index)] | |
| def resample_poly_df(self, df, new_frametime, old_frametime): | |
| old_fps = round(1 / old_frametime) | |
| new_fps = round(1 / new_frametime) | |
| lcm = np.lcm(old_fps, new_fps) | |
| up = lcm // old_fps | |
| down = lcm // new_fps | |
| new_vals = signal.resample_poly(df.values, up, down) | |
| time_index = pd.to_timedelta([f for f in range(new_vals.shape[0])], unit="s") * new_frametime | |
| new_df = pd.DataFrame(data=new_vals, index=time_index, columns=df.columns) | |
| return new_df | |
| def transform(self, X, y=None): | |
| Q = [] | |
| for track in X: | |
| new_track = track.clone() | |
| # new_track.values = self.resample_dataframe(track.values, self.tgt_frametime, method=self.method) | |
| new_track.values = self.resample_poly_df(track.values, self.tgt_frametime, track.framerate) | |
| new_track.framerate = self.tgt_frametime | |
| Q.append(new_track) | |
| return Q | |
| def inverse_transform(self, X, copy=None): | |
| Q = [] | |
| for track in X: | |
| new_track = track.clone() | |
| # new_track.values = self.resample_dataframe(track.values, self.orig_frametime, method=self.method) | |
| new_track.values = self.resample_poly_df(track.values, self.orig_frametime, track.framerate) | |
| new_track.framerate = self.orig_frametime | |
| Q.append(new_track) | |
| return Q | |
| class DownSampler(BaseEstimator, TransformerMixin): | |
| def __init__(self, tgt_fps, keep_all=False): | |
| self.tgt_fps = tgt_fps | |
| self.keep_all = keep_all | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| Q = [] | |
| for track in X: | |
| orig_fps = round(1.0 / track.framerate) | |
| rate = orig_fps // self.tgt_fps | |
| if orig_fps % self.tgt_fps != 0: | |
| print( | |
| "error orig_fps (" + str(orig_fps) + ") is not dividable with tgt_fps (" + str(self.tgt_fps) + ")" | |
| ) | |
| else: | |
| print("downsampling with rate: " + str(rate)) | |
| # print(track.values.size) | |
| for ii in range(0, rate): | |
| new_track = track.clone() | |
| if self.keep_all: | |
| new_track.take_name = new_track.take_name + "_" + str(ii).zfill(2) | |
| new_track.values = track.values[ii:-1:rate].copy() | |
| # print(new_track.values.size) | |
| # new_track = track[0:-1:self.rate] | |
| new_track.framerate = 1.0 / self.tgt_fps | |
| Q.append(new_track) | |
| if not self.keep_all: | |
| break | |
| return Q | |
| def inverse_transform(self, X, copy=None): | |
| return X | |
| class ReverseTime(BaseEstimator, TransformerMixin): | |
| def __init__(self, append=True): | |
| self.append = append | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| print("ReverseTime") | |
| Q = [] | |
| if self.append: | |
| for track in X: | |
| Q.append(track) | |
| for track in X: | |
| new_track = track.clone() | |
| new_track.values = track.values[-1::-1] | |
| new_track.values.index = new_track.values.index[0] - new_track.values.index | |
| Q.append(new_track) | |
| return Q | |
| def inverse_transform(self, X, copy=None): | |
| return X | |
| class ListFeatureUnion(BaseEstimator, TransformerMixin): | |
| def __init__(self, processors): | |
| self.processors = processors | |
| def fit(self, X, y=None): | |
| assert y is None | |
| for proc in self.processors: | |
| if isinstance(proc, Pipeline): | |
| # Loop steps and run fit on each. This is necessary since | |
| # running fit on a Pipeline runs fit_transform on all steps | |
| # and not only fit. | |
| for step in proc.steps: | |
| step[1].fit(X) | |
| else: | |
| proc.fit(X) | |
| return self | |
| def transform(self, X, y=None): | |
| assert y is None | |
| print("ListFeatureUnion") | |
| Q = [] | |
| idx = 0 | |
| for proc in self.processors: | |
| Z = proc.transform(X) | |
| if idx == 0: | |
| Q = Z | |
| else: | |
| assert len(Q) == len(Z) | |
| for idx2, track in enumerate(Z): | |
| Q[idx2].values = pd.concat([Q[idx2].values, Z[idx2].values], axis=1) | |
| idx += 1 | |
| return Q | |
| def inverse_transform(self, X, y=None): | |
| return X | |
| class RollingStatsCalculator(BaseEstimator, TransformerMixin): | |
| """ | |
| Creates a causal mean and std filter with a rolling window of length win (based on using prev and current values) | |
| """ | |
| def __init__(self, win): | |
| self.win = win | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| print("RollingStatsCalculator: " + str(self.win)) | |
| Q = [] | |
| for track in X: | |
| new_track = track.clone() | |
| mean_df = track.values.rolling(window=self.win).mean() | |
| std_df = track.values.rolling(window=self.win).std() | |
| # rolling.mean results in Nans in start seq. Here we fill these | |
| win = min(self.win, new_track.values.shape[0]) | |
| for i in range(1, win): | |
| mm = track.values[:i].rolling(window=i).mean() | |
| ss = track.values[:i].rolling(window=i).std() | |
| mean_df.iloc[i - 1] = mm.iloc[i - 1] | |
| std_df.iloc[i - 1] = ss.iloc[i - 1] | |
| std_df.iloc[0] = std_df.iloc[1] | |
| # Append to | |
| new_track.values = pd.concat([mean_df.add_suffix("_mean"), std_df.add_suffix("_std")], axis=1) | |
| Q.append(new_track) | |
| return Q | |
| def inverse_transform(self, X, copy=None): | |
| return X | |
| class FeatureCounter(BaseEstimator, TransformerMixin): | |
| def __init__(self): | |
| pass | |
| def fit(self, X, y=None): | |
| self.n_features = len(X[0].values.columns) | |
| return self | |
| def transform(self, X, y=None): | |
| return X | |
| def inverse_transform(self, X, copy=None): | |
| return X | |
| # TODO: JointsSelector (x) | |
| # TODO: SegmentMaker | |
| # TODO: DynamicFeaturesAdder | |
| # TODO: ShapeFeaturesAdder | |
| # TODO: DataFrameNumpier (x) | |
| class TemplateTransform(BaseEstimator, TransformerMixin): | |
| def __init__(self): | |
| pass | |
| def fit(self, X, y=None): | |
| return self | |
| def transform(self, X, y=None): | |
| return X | |