### Packages import

In [68]:
# .\movella\Scripts\activate


In [69]:
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import untangle
import numpy as np
import os
import random


In [70]:
import scipy

In [71]:
import mvnx

In [72]:
import tensorflow as tf
from tensorflow.keras.utils import to_categorical


In [73]:
folder_path = 'Data/bench/'


### Data preprocessing and padding

Bench samples

In [74]:
import os
import numpy as np
import tensorflow as tf
from mvnx import MVNX

# Specify the path to the folder containing MVNX files
mvnx_files = [f for f in os.listdir(folder_path) if f.endswith('.mvnx')]

# Lists to store processed tensor data and labels
tensor_data_list = []

# Iterate through each MVNX file
for mvnx_file in mvnx_files:
 mvnx_path = os.path.join(folder_path, mvnx_file)
 
 # Load MVNX data and extract position data
 data = MVNX(mvnx_path)
 position_data = data.get_info('position') # Change to the appropriate attribute

 # Convert position data to a tensor
 position_tensor = np.array([list(frame.values()) for frame in position_data])

 # Get the dimensions of the position tensor
 num_frames, num_joints, num_features = position_tensor.shape
 
 # Determine the desired sequence length
 max_sequence_length = 1200 # Choose an appropriate value

 # Calculate the amount of padding needed
 padding_amount = max_sequence_length - num_frames

 # Create a tensor with zeros to represent padding
 padding_tensor = tf.zeros((padding_amount, num_joints, num_features))

 # Concatenate the padding tensor to the position tensor along the first axis
 padded_position_tensor = tf.concat([position_tensor, padding_tensor], axis=0)

 # Append the padded tensor data to the list
 tensor_data_list.append(padded_position_tensor)

# Convert the list of tensors to a TensorFlow tensor
tensor_data = tf.convert_to_tensor(tensor_data_list, dtype=tf.float32)

# Now tensor_data has shape (num_samples, max_sequence_length, num_joints, num_features)


In [75]:
tensor_data.shape

TensorShape([5, 1200, 23, 3])

Squat samples

In [76]:
folder_path = 'Data/squat/'


In [77]:


# Specify the path to the folder containing MVNX files
mvnx_files_squat = [f for f in os.listdir(folder_path) if f.endswith('.mvnx')]

# Lists to store processed tensor data and labels
tensor_data_list_squat = []

# Iterate through each MVNX file
for mvnx_file in mvnx_files_squat:
 mvnx_path = os.path.join(folder_path, mvnx_file)
 
 # Load MVNX data and extract position data
 data = MVNX(mvnx_path)
 position_data = data.get_info('position') # Change to the appropriate attribute

 # Convert position data to a tensor
 position_tensor = np.array([list(frame.values()) for frame in position_data])

 # Get the dimensions of the position tensor
 num_frames, num_joints, num_features = position_tensor.shape
 
 # Determine the desired sequence length
 max_sequence_length = 1200 # Choose an appropriate value

 # Calculate the amount of padding needed
 padding_amount = max_sequence_length - num_frames

 # Create a tensor with zeros to represent padding
 padding_tensor = tf.zeros((padding_amount, num_joints, num_features))

 # Concatenate the padding tensor to the position tensor along the first axis
 padded_position_tensor = tf.concat([position_tensor, padding_tensor], axis=0)

 # Append the padded tensor data to the list
 tensor_data_list_squat.append(padded_position_tensor)

# Convert the list of tensors to a TensorFlow tensor
tensor_data_squat = tf.convert_to_tensor(tensor_data_list_squat, dtype=tf.float32)

# Now tensor_data has shape (num_samples, max_sequence_length, num_joints, num_features)


In [78]:
tensor_data_squat.shape

TensorShape([2, 1200, 23, 3])

### Data plotting

In [80]:
import numpy as np
import plotly.graph_objs as go

# Assuming you have a tensor named 'tensor_data' with shape (num_samples, num_timesteps, num_joints, num_features)

# Convert the tensor to a NumPy array
tensor_array = tensor_data.numpy()

# Choose a sample to visualize
sample_index = 0

# Get the number of joints and timesteps
num_joints = tensor_array.shape[2]
num_timesteps = tensor_array.shape[1]

# Joint names dictionary
joint_names = [
 'pelvis', 'l5', 'l3', 't12', 't8', 'neck', 'head',
 'right_shoulder', 'right_upper_arm', 'right_forearm', 'right_hand',
 'left_shoulder', 'left_upper_arm', 'left_forearm', 'left_hand',
 'right_upper_leg', 'right_lower_leg', 'right_foot', 'right_toe',
 'left_upper_leg', 'left_lower_leg', 'left_foot', 'left_toe'
]

# Create a list to store scatter plot traces
traces = []

# Iterate through all joints
for joint_index, joint_name in enumerate(joint_names):
 # Extract the X, Y, and Z coordinates of the joint over all time steps
 x_coords = tensor_array[sample_index, :, joint_index, 0]
 y_coords = tensor_array[sample_index, :, joint_index, 1]
 z_coords = tensor_array[sample_index, :, joint_index, 2]
 
 # Create a scatter plot trace for the joint's movement trajectory
 trace = go.Scatter3d(
 x=x_coords,
 y=y_coords,
 z=z_coords,
 mode='lines',
 name=joint_name
 )
 traces.append(trace)

# Create the figure
fig = go.Figure(data=traces)

# Set layout
fig.update_layout(
 scene=dict(
 xaxis_title='X',
 yaxis_title='Y',
 zaxis_title='Z'
 ),
 title='Interactive 3D Movement Trajectories of All Joints'
)

# Show the interactive plot in a web browser
fig.show()



In [81]:
import numpy as np
import plotly.graph_objs as go

def plot_3d_joint_trajectories(tensor_data, sample_index):
 """
 Plot interactive 3D movement trajectories of joints using Plotly.

 Parameters:
 tensor_data (np.ndarray): Tensor data of shape (num_samples, num_timesteps, num_joints, num_features).
 sample_index (int): Index of the sample to visualize.

 Returns:
 None
 """
 # Assuming tensor_data is a NumPy array with shape (num_samples, num_timesteps, num_joints, num_features)

 # Choose a sample to visualize
 sample = tensor_data[sample_index]

 # Get the number of joints and timesteps
 num_joints = sample.shape[1]
 num_timesteps = sample.shape[0]

 # Create a list to store scatter plot traces
 traces = []

 # Iterate through all joints
 for joint_index in range(num_joints):
 # Extract the X, Y, and Z coordinates of the joint over all time steps
 x_coords = sample[:, joint_index, 0]
 y_coords = sample[:, joint_index, 1]
 z_coords = sample[:, joint_index, 2]

 # Create a scatter plot trace for the joint's movement trajectory
 trace = go.Scatter3d(
 x=x_coords,
 y=y_coords,
 z=z_coords,
 mode='lines',
 name=f'Joint {joint_index}'
 )
 traces.append(trace)

 # Create the figure
 fig = go.Figure(data=traces)

 # Set layout
 fig.update_layout(
 scene=dict(
 xaxis_title='X',
 yaxis_title='Y',
 zaxis_title='Z'
 ),
 title='Interactive 3D Movement Trajectories of All Joints'
 )

 # Show the interactive plot in a web browser
 fig.show()




### Data Augmentation

Data Augmentation - Random translation

In [82]:
import numpy as np
import tensorflow as tf

def random_translation(sample, max_translation=0.002):
 """
 Apply random translation to the joint coordinates in a sample.

 Parameters:
 sample (tf.Tensor): Single sample of shape (num_timesteps, num_joints, num_features).
 max_translation (float): Maximum translation in each direction. Default is 0.002.

 Returns:
 translated_sample (tf.Tensor): Translated sample.
 """
 translation = np.random.uniform(-max_translation, max_translation, size=sample.shape)
 translated_sample = sample + translation
 return translated_sample

def generate_rand_transl_samples(original_data, num_augmented_samples=5, max_translation=0.002):
 """
 Generate augmented samples by applying random translation to joint positions.

 Parameters:
 original_data (tf.Tensor): Original data of shape (num_samples, num_timesteps, num_joints, num_features).
 num_augmented_samples (int): Number of augmented samples to generate per original sample. Default is 5.
 max_translation (float): Maximum translation in each direction. Default is 0.002.

 Returns:
 augmented_data (tf.Tensor): Augmented data of shape (num_samples * num_augmented_samples, num_timesteps, num_joints, num_features).
 """
 augmented_samples = []

 for sample_index in range(original_data.shape[0]):
 original_sample = original_data[sample_index]

 for _ in range(num_augmented_samples):
 augmented_sample = random_translation(original_sample, max_translation)
 augmented_samples.append(augmented_sample)

 augmented_data = tf.stack(augmented_samples)

 return augmented_data

# Assuming you have 'tensor_data' with shape (num_samples, num_timesteps, num_joints, num_features)

# Number of augmented samples to create
num_augmented_samples = 5

# Generate augmented samples
augmented_data = generate_rand_transl_samples(tensor_data, num_augmented_samples)

# Print the shapes of the original data and augmented data
print("Original Data Shape:", tensor_data.shape)
print("Augmented Data Shape:", augmented_data.shape)


Original Data Shape: (5, 1200, 23, 3)
Augmented Data Shape: (25, 1200, 23, 3)


In [83]:
plot_3d_joint_trajectories(augmented_data, 1)

Data Augmentation - augmented samples by perturbing

In [84]:
def perturb_joint_positions(sample, max_perturbation=0.01):
 """
 Perturb joint positions in a coherent manner to simulate variation.

 Parameters:
 sample (np.ndarray): Single sample of shape (num_timesteps, num_joints, num_features).
 max_perturbation (float): Maximum perturbation value. Default is 0.01.

 Returns:
 perturbed_sample (np.ndarray): Perturbed sample.
 """
 perturbation = np.random.uniform(-max_perturbation, max_perturbation, size=sample.shape)
 perturbed_sample = sample + perturbation

 return perturbed_sample

In [85]:
import numpy as np

def generate_augmented_samples(original_samples, num_augmentations=5, max_perturbation=0.005):
 """
 Generate augmented samples by perturbing joint positions.

 Parameters:
 original_samples (np.ndarray): Original samples of shape (num_samples, num_timesteps, num_joints, num_features).
 num_augmentations (int): Number of augmented samples to generate per original sample. Default is 5.
 max_perturbation (float): Maximum perturbation value. Default is 0.01.

 Returns:
 augmented_samples (np.ndarray): Augmented samples of shape (num_samples * num_augmentations, num_timesteps, num_joints, num_features).
 """
 augmented_samples = []

 for sample_index in range(original_samples.shape[0]):
 original_sample = original_samples[sample_index]

 for _ in range(num_augmentations):
 perturbed_sample = perturb_joint_positions(original_sample, max_perturbation)
 augmented_samples.append(perturbed_sample)

 augmented_samples = tf.stack(augmented_samples)

 return augmented_samples

# Assuming you have 'tensor_data' with shape (num_samples, num_timesteps, num_joints, num_features)

# Generate augmented samples
num_augmentations = 5
max_perturbation = 0.01
augmented_samples = generate_augmented_samples(tensor_data, num_augmentations, max_perturbation)

# Print the shape of the augmented samples
print("Original Samples Shape:", tensor_data.shape)
print("Augmented Samples Shape:", augmented_samples.shape)


Original Samples Shape: (5, 1200, 23, 3)
Augmented Samples Shape: (25, 1200, 23, 3)


In [86]:
augmented_data = generate_augmented_samples(tensor_data, num_augmentations=5, max_perturbation=0.005)

In [87]:
plot_3d_joint_trajectories(augmented_data, 1)

Data Augmentation - scaling

In [88]:
import numpy as np

def random_scaling(sample, min_scale=0.5, max_scale=1.5):
 """
 Apply random scaling to the joint coordinates in a sample.

 Parameters:
 sample (np.ndarray): Single sample of shape (num_timesteps, num_joints, num_features).
 min_scale (float): Minimum scaling factor. Default is 0.9.
 max_scale (float): Maximum scaling factor. Default is 1.1.

 Returns:
 scaled_sample (np.ndarray): Scaled sample.
 """
 # Generate random scaling factors for each coordinate (X, Y, Z)
 scaling_factors = np.random.uniform(min_scale, max_scale, size=(1, sample.shape[1], 1)) # Shape: (1, num_joints, 1)

 # Apply scaling to the joint coordinates
 scaled_sample = sample * scaling_factors

 return scaled_sample



In [89]:
def generate_scaled_samples(original_samples, num_samples=5, min_scale=0.5, max_scale=1.5):
 """
 Generate scaled samples by applying random scaling to joint positions.

 Parameters:
 original_samples (np.ndarray): Original samples of shape (num_samples, num_timesteps, num_joints, num_features).
 num_samples (int): Number of scaled samples to generate per original sample. Default is 5.
 min_scale (float): Minimum scaling factor. Default is 0.9.
 max_scale (float): Maximum scaling factor. Default is 1.1.

 Returns:
 scaled_samples (np.ndarray): Scaled samples of shape (num_samples * num_samples, num_timesteps, num_joints, num_features).
 """
 scaled_samples = []

 for sample_index in range(original_samples.shape[0]):
 original_sample = original_samples[sample_index]

 for _ in range(num_samples):
 scaled_sample = random_scaling(original_sample, min_scale, max_scale)
 scaled_samples.append(scaled_sample)

 scaled_samples = tf.stack(scaled_samples)

 return scaled_samples

# Assuming you have 'tensor_data' with shape (num_samples, num_timesteps, num_joints, num_features)

# Generate scaled samples
num_scaled_samples = 5
min_scale = 0.9
max_scale = 1.1
scaled_samples = generate_scaled_samples(tensor_data, num_scaled_samples, min_scale, max_scale)

# Print the shape of the scaled samples
print("Original Samples Shape:", tensor_data.shape)
print("Scaled Samples Shape:", scaled_samples.shape)

Original Samples Shape: (5, 1200, 23, 3)
Scaled Samples Shape: (25, 1200, 23, 3)


In [90]:
plot_3d_joint_trajectories(scaled_samples, 1)


In [91]:
plot_3d_joint_trajectories(tensor_data, 1)


### Data set generation

Bench data set generation

In [92]:
# Augmentation functions:
# generate_scaled_samples(original_samples)
# generate_augmented_samples
# generate_rand_transl_samples


In [93]:
aug_scale = generate_scaled_samples(tensor_data, 100)

In [94]:
aug_scale.shape

TensorShape([500, 1200, 23, 3])

In [95]:
aug_pert = generate_augmented_samples(tensor_data, 100)

In [96]:
aug_pert.shape

TensorShape([500, 1200, 23, 3])

In [97]:
aug_tran = generate_rand_transl_samples(tensor_data, 100)

In [98]:
aug_tran.shape

TensorShape([500, 1200, 23, 3])

In [99]:
bench_tensors_list = [aug_scale, aug_pert, aug_tran] 

In [100]:
bench_tensors_stack = tf.concat(bench_tensors_list, axis=0)


In [101]:
bench_tensors_stack.shape

TensorShape([1500, 1200, 23, 3])

Squat data set generation 

In [102]:
aug_scale_squat = generate_scaled_samples(tensor_data_squat, 200)
aug_scale_squat.shape
aug_pert_squat = generate_augmented_samples(tensor_data_squat, 200)
aug_pert_squat.shape
aug_tran_squat = generate_rand_transl_samples(tensor_data_squat, 200)
aug_tran_squat.shape
squat_tensors_list = [aug_scale_squat, aug_pert_squat, aug_tran_squat] 
squat_tensors_stack = tf.concat(squat_tensors_list, axis=0)

squat_tensors_stack.shape

TensorShape([1200, 1200, 23, 3])

### Data set train/test split

Data labeling

In [103]:
import numpy as np

num_bench_samples = bench_tensors_stack.shape[0]
num_squat_samples = squat_tensors_stack.shape[0]

bench_labels = np.zeros(num_bench_samples) # Label 0 for bench samples
squat_labels = np.ones(num_squat_samples) # Label 1 for squat samples

# Combine labels and tensors
all_labels = np.concatenate([bench_labels, squat_labels], axis=0)
all_tensors = np.concatenate([bench_tensors_stack, squat_tensors_stack], axis=0)


Data split

In [104]:
import tensorflow as tf

# Assuming you have 'all_tensors' and 'all_labels'

# Combine tensors and labels into a dataset
dataset = tf.data.Dataset.from_tensor_slices((all_tensors, all_labels))

# Split the dataset into train and test sets
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset = dataset.take(train_size)
test_dataset = dataset.skip(train_size)

batch_size = 32

# Optionally, shuffle and batch the datasets
train_dataset = train_dataset.shuffle(buffer_size=train_size).batch(batch_size)
test_dataset = test_dataset.batch(batch_size)


In [105]:
train_dataset

<_BatchDataset element_spec=(TensorSpec(shape=(None, 1200, 23, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.float64, name=None))>

### Model training

In [107]:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Flatten, Dense
from tensorflow.keras.models import Sequential

# Define your model
model = Sequential([
 Conv2D(32, (3, 3), activation='relu', input_shape=(1200, 23, 3)),
 Flatten(),
 Dense(64, activation='relu'),
 Dense(2, activation='softmax')
])

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train the model
model.fit(train_dataset, epochs=5)

# Evaluate the model
loss, accuracy = model.evaluate(test_dataset)
print("Test Loss:", loss)
print("Test Accuracy:", accuracy)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Test Loss: 0.0
Test Accuracy: 1.0


In [None]:
import tensorflow as tf

In [108]:
from tensorflow.keras.models import load_model


In [109]:

# Assuming 'model' is your trained model
model.save('model/my_model.keras')

In [110]:
loaded_model = load_model('model/my_model.keras') 

In [141]:
loaded_model.summary()


Model: "sequential_3"
_________________________________________________________________
 Layer (type) Output Shape Param # 
 conv2d_3 (Conv2D) (None, 1198, 21, 32) 896 
 
 flatten_3 (Flatten) (None, 805056) 0 
 
 dense_6 (Dense) (None, 64) 51523648 
 
 dense_7 (Dense) (None, 2) 130 
 
Total params: 51,524,674
Trainable params: 51,524,674
Non-trainable params: 0
_________________________________________________________________


In [142]:
loaded_model.input_shape

(None, 1200, 23, 3)

### Model samples prediction

In [None]:
input_data_squat = tensor_data[0]

In [None]:
tensor_data[0].shape

TensorShape([1200, 23, 3])

In [143]:
transformed_tensor = tf.expand_dims(tensor_data_squat[0], axis=0)


In [144]:
transformed_tensor.shape

TensorShape([1, 1200, 23, 3])

In [145]:
predictions = loaded_model.predict(transformed_tensor)




In [146]:
print(predictions)

[[0. 1.]]


In [None]:
# Label 0 for bench samples
# Label 1 for squat samples

In [147]:
probabilities = tf.nn.softmax(predictions)

In [148]:
probabilities



In [149]:
pn = probabilities.numpy()


In [150]:
pn

array([[0.26894143, 0.7310586 ]], dtype=float32)

In [151]:
probabilities.numpy()[0]


array([0.26894143, 0.7310586 ], dtype=float32)

In [132]:
labels = ['bench', 'squat']

In [152]:
prob_dict = {}
for i, label in enumerate(labels):
 prob_dict[label] = probabilities.numpy()[0][i]

print(prob_dict)







{'bench': 0.26894143, 'squat': 0.7310586}
