File size: 10,893 Bytes
e817788
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import numpy as np
import pandas as pd
import cv2 # for camera feed
import mediapipe as mp # for accessing and reading from webcam
import tensorflow as tf

# developer modules
from params import  LENGTH, DROP_Z, averaging_sets, point_landmarks_left, point_landmarks_right, FLATTEN, INPUT_SHAPE, RIGHT_HAND, LEFT_HAND, PADDING, CONSTANT_VALUE

# Initiate mediapipe model and utils
# Module-level aliases so the functions below can refer to the MediaPipe namespaces by a short name.
mp_holistic = mp.solutions.holistic # holistic model; supplies the connection sets passed to draw_landmarks
mp_drawing = mp.solutions.drawing_utils # drawing utilities (draw_landmarks, DrawingSpec)


# ------------------------------
# Mediapipe
# ------------------------------

# function to extract the x, y, z coordinates of all landmarks --> keypoints
# and stack them into one 2-D array (one row per landmark; not flattened, no visibility value)
def extract_keypoints(results):
    """Stack the landmark coordinates of one holistic result into one array.

    Every landmark group is read as rows of ``[x, y, z]``. A group that is
    missing (falsy) on ``results`` is replaced by an all-zero block of fixed
    size: 468 rows for the face, 21 for each hand and 33 for the pose.

    Args:
        results: Object exposing ``face_landmarks``, ``left_hand_landmarks``,
            ``pose_landmarks`` and ``right_hand_landmarks``. Each attribute is
            either falsy or has a ``landmark`` iterable whose items carry
            ``x``, ``y`` and ``z``.

    Returns:
        A 2-D array with three columns; the groups are stacked in the order
        face, left hand, pose, right hand. The array is not flattened and no
        visibility value is included.
    """
    # NOTE(review): missing groups become zeros, not NaN, while the NaN-aware
    # helpers in the pre-processing section only skip NaN -- confirm intended.
    def _coords(group, fallback_rows):
        # One [x, y, z] row per landmark, or a zero block if the group is absent.
        if group:
            return np.array([[point.x, point.y, point.z] for point in group.landmark])
        return np.zeros([fallback_rows, 3])

    stacked = [
        _coords(results.face_landmarks, 468),
        _coords(results.left_hand_landmarks, 21),
        _coords(results.pose_landmarks, 33),
        _coords(results.right_hand_landmarks, 21),
    ]
    return np.concatenate(stacked)


# ------------------------------
# Visualization
# ------------------------------

# function to draw landmarks points and connecting lines on top of an image, e.g. on top of your camera feed
def draw_styled_landmarks(image, results):
    """Draw face, pose and hand landmarks with their connections onto ``image``.

    The four landmark groups are drawn in the order face, pose, left hand,
    right hand, each with its own landmark and connection style. Drawing is
    delegated to ``mp_drawing.draw_landmarks``; nothing is returned.

    Args:
        image: Frame handed to ``mp_drawing.draw_landmarks`` as drawing target.
        results: Object exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    style = mp_drawing.DrawingSpec
    # (attribute on results, connection set on mp_holistic, landmark style, connection style)
    layers = (
        ("face_landmarks", "FACEMESH_TESSELATION",
         style(color=(80,110,10), thickness=1, circle_radius=1),
         style(color=(224,208,64), thickness=1, circle_radius=1)),
        ("pose_landmarks", "POSE_CONNECTIONS",
         style(color=(80,22,10), thickness=2, circle_radius=4),
         style(color=(224,208,64), thickness=2, circle_radius=2)),
        ("left_hand_landmarks", "HAND_CONNECTIONS",
         style(color=(224,208,64), thickness=2, circle_radius=4),
         style(color=(235,206,135), thickness=2, circle_radius=2)),
        ("right_hand_landmarks", "HAND_CONNECTIONS",
         style(color=(224,208,64), thickness=2, circle_radius=4),
         style(color=(128,128,240), thickness=2, circle_radius=2)),
    )
    for landmark_attr, connections_attr, point_style, line_style in layers:
        mp_drawing.draw_landmarks(image,
                                  getattr(results, landmark_attr),
                                  getattr(mp_holistic, connections_attr),
                                  point_style,
                                  line_style)
 
# function to visualize predicted word probabilities with a dynamic real-time bar chart
def prob_viz(pred, SELECTED_SIGNS, input_frame):
    """Render one horizontal probability bar per prediction on a copy of the frame.

    Row ``i`` occupies the vertical band 65+50*i .. 95+50*i. Its filled bar
    starts at x=15 and is ``int(prob*100*5)`` pixels long; the matching sign
    label is written on top of the bar.

    Args:
        pred: Iterable of probabilities, one per sign.
        SELECTED_SIGNS: Sequence of label strings, indexed like ``pred``.
        input_frame: Frame to annotate; it is copied, not modified.

    Returns:
        The annotated copy of ``input_frame``.
    """
    canvas = input_frame.copy()
    left_edge = 15

    for row, probability in enumerate(pred):
        row_shift = row*50
        bar_end = left_edge+int(probability*100*5)

        # filled (thickness=-1) bar whose length scales with the probability
        cv2.rectangle(canvas,
                      pt1=(left_edge, 65+row_shift),
                      pt2=(bar_end, 95+row_shift),
                      color=(200, 200, 200), thickness=-1)
        # label drawn over the bar, baseline slightly above the bar's lower edge
        cv2.putText(img=canvas,
                    text=SELECTED_SIGNS[row],
                    org=(left_edge, 90+row_shift),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=1,
                    color=(50, 50, 50),
                    thickness=1, lineType=cv2.LINE_AA)

    return canvas


# ------------------------------
# Pre-processing
# ------------------------------

# helper function for pre-processing
def tf_nan_mean(x, axis=0):
    """Return the mean of ``x`` along ``axis``, leaving NaN entries out.

    NaN entries contribute neither to the sum nor to the element count. A
    slice without any non-NaN entry is computed as 0 / 0.

    Args:
        x: Floating-point tensor, possibly containing NaN.
        axis: Axis to reduce over.

    Returns:
        Tensor of NaN-ignoring means with ``axis`` reduced away.
    """
    nan_mask = tf.math.is_nan(x)
    zeros = tf.zeros_like(x)
    # sum of the valid entries (NaN replaced by 0)
    valid_sum = tf.reduce_sum(tf.where(nan_mask, zeros, x), axis=axis)
    # number of valid entries (1 for every non-NaN position)
    valid_count = tf.reduce_sum(tf.where(nan_mask, zeros, tf.ones_like(x)), axis=axis)
    return valid_sum / valid_count

# helper function for pre-processing
def right_hand_percentage(x):
    """Return the share of non-NaN hand values that belong to the right hand.

    Counts the non-NaN entries of the ``RIGHT_HAND`` and ``LEFT_HAND``
    landmark columns (gathered along axis 1) and returns
    ``right / (left + right)``. Despite the name the result is a fraction,
    not a value scaled to 100. If neither hand has a non-NaN entry the
    result is 0 / 0.

    Args:
        x: Landmark tensor whose axis 1 indexes landmarks.

    Returns:
        Scalar tensor with the right-hand fraction.
    """
    def _count_present(landmark_indices):
        # number of non-NaN entries among the selected landmark columns
        selected = tf.gather(x, landmark_indices, axis=1)
        present = tf.where(tf.math.is_nan(selected), tf.zeros_like(selected), tf.ones_like(selected))
        return tf.reduce_sum(present)

    right_count = _count_present(RIGHT_HAND)
    left_count = _count_present(LEFT_HAND)
    return right_count / (left_count+right_count)

#generating preprocessing layer that will be added to final model
class FeatureGen(tf.keras.layers.Layer):
    """Pre-processing layer that turns a landmark sequence into one model input.

    The layer optionally drops z, optionally mirrors x, averages the landmark
    ranges listed in ``averaging_sets``, appends the point landmarks of the
    dominant hand, pads or truncates to ``LENGTH`` frames, fills NaN values
    and reshapes the result into a batch of one.

    NOTE(review): ``call`` converts tensors with NumPy / pandas and branches
    in Python on tensor values, so it presumably has to run eagerly -- confirm
    before wrapping it in a ``tf.function`` or exporting it.
    """

    def __init__(self):
        """Initialize the layer; it defines no weights or settings of its own."""
        super().__init__()

    @staticmethod
    def _pad_or_truncate(x):
        """Bring ``x`` to ``LENGTH`` rows along axis 0 as selected by ``PADDING``.

        Modes: 1 = constant padding at both ends, 2 = constant padding at the
        end, 3 = no padding (the sequence stays short), 4 = repeat the first
        and last frame, 5 = repeat the last frame. When the number of missing
        rows is odd, the front receives the extra row.

        Raises:
            ValueError: If padding is needed and ``PADDING`` is not 1-5.
        """
        current_rows = tf.shape(x)[0]

        #if there are more rows than desired, truncate the excess rows
        if current_rows > LENGTH:
            return x[:LENGTH, :, :]

        if current_rows < LENGTH:
            pad_rows = LENGTH - current_rows
            back_rows = pad_rows//2
            front_rows = pad_rows - back_rows #equals pad_rows//2 (+1 if pad_rows is odd)

            if PADDING == 4: #copy first/last frame
                padding_front = tf.repeat(x[0:1, :], front_rows, axis=0)
                padding_back = tf.repeat(x[-1:, :], back_rows, axis=0)
                return tf.concat([padding_front, x, padding_back], axis=0)
            if PADDING == 5: #copy last frame
                padding_back = tf.repeat(x[-1:, :], pad_rows, axis=0)
                return tf.concat([x, padding_back], axis=0)

            if PADDING == 1: #padding at start and end
                paddings = [[front_rows, back_rows], [0, 0], [0, 0]]
            elif PADDING == 2: #padding only at the end of sequence
                paddings = [[0, pad_rows], [0, 0], [0, 0]]
            elif PADDING == 3: #no padding
                paddings = [[0, 0], [0, 0], [0, 0]]
            else:
                #previously this fell through and crashed on an unbound `paddings`
                raise ValueError(f"Unsupported PADDING mode {PADDING!r}; expected one of 1, 2, 3, 4, 5")
            return tf.pad(x, paddings, mode='CONSTANT', constant_values=CONSTANT_VALUE)

        #already at the desired length
        return x

    def call(self, x_in, MIRROR=False):
        """Convert one landmark sequence into the model input.

        Args:
            x_in: Landmark sequence indexed as (frame, landmark, coordinate).
            MIRROR: If true, the x coordinate (index 0 of the last axis) is
                flipped via ``(x - 1) * (-1)``.

        Returns:
            Tensor of shape ``(1, rows * INPUT_SHAPE[1])`` if ``FLATTEN`` is
            set, otherwise ``(1, rows, INPUT_SHAPE[1])``, where ``rows`` is
            the number of frames after padding/truncation.

        Raises:
            ValueError: If the sequence is shorter than ``LENGTH`` and
                ``PADDING`` is not a supported mode.
        """
        #drop z coordinates if required
        if DROP_Z:
            x_in = x_in[:, :, 0:2]
        if MIRROR:
            #flipping x coordinates (done on a NumPy copy because tensors cannot be assigned to)
            x_in = np.array(x_in)
            x_in[:, :, 0] = (x_in[:, :, 0]-1)*(-1)
            x_in = tf.convert_to_tensor(x_in)

        #one NaN-ignoring mean landmark per averaging set; av_set is (start index, count)
        x_list = [tf.expand_dims(tf_nan_mean(x_in[:, av_set[0]:av_set[0]+av_set[1], :], axis=1), axis=1) for av_set in averaging_sets]

        #append the point landmarks of the hand that has more non-NaN values
        handedness = right_hand_percentage(x_in)
        if handedness > 0.5:
            x_list.append(tf.gather(x_in, point_landmarks_right, axis=1))
        else:
            x_list.append(tf.gather(x_in, point_landmarks_left, axis=1))

        #concatenate averaged and point landmarks along the landmark axis,
        #then pad/truncate to the desired sequence length (defined by LENGTH)
        x = self._pad_or_truncate(tf.concat(x_list, 1))
        current_rows = tf.shape(x)[0]

        #interpolate gaps of up to 2 consecutive missing values
        #NOTE(review): interpolation runs over the flattened vector, so neighbours
        #are adjacent coordinates/landmarks rather than adjacent frames -- confirm intended
        x = pd.DataFrame(np.array(x).flatten()).interpolate(method='linear', limit=2, limit_direction='both')
        #fill remaining missing values with zeros
        x = tf.where(tf.math.is_nan(x), tf.zeros_like(x), x)

        #reshape data to 2D or 3D array with a leading batch dimension of 1
        if FLATTEN:
            x = tf.reshape(x, (1, current_rows*INPUT_SHAPE[1]))
        else:
            x = tf.reshape(x, (1, current_rows, INPUT_SHAPE[1]))

        return x

#define converter using generated layer
#single module-level FeatureGen instance; real_time_prediction calls it on every full sequence
feature_converter = FeatureGen()


# ------------------------------
# Real-time prediction 
# ------------------------------

def real_time_prediction(results, sequence, predictions, threshold, LENGTH, MODEL, SELECTED_LABELS, TRANSITION_FRAMES, SELECTED_SIGNS):
    """Update the frame buffer with one holistic result and predict the current sign.

    The keypoints of ``results`` are appended to ``sequence``. Once the buffer
    holds ``LENGTH`` frames it is pre-processed with ``feature_converter`` and
    passed to ``MODEL``. A sign is reported only if every one of the last
    ``TRANSITION_FRAMES`` predictions agrees with the current one and its
    probability exceeds ``threshold``.

    Args:
        results: MediaPipe holistic result for the current frame.
        sequence: List of per-frame keypoint arrays; appended to and trimmed
            in place to the last ``LENGTH`` frames.
        predictions: List of predicted indices (positions in
            ``SELECTED_LABELS``); the current prediction is appended.
        threshold: Minimum probability for a sign to be reported.
        LENGTH: Number of frames required for a prediction. This parameter
            shadows the module-level ``LENGTH`` that ``feature_converter``
            pads/truncates to.
        MODEL: Object whose ``predict(model_input)`` returns one probability
            vector per input sequence.
        SELECTED_LABELS: Indices of the model outputs to keep.
        TRANSITION_FRAMES: Number of most recent predictions that must agree.
        SELECTED_SIGNS: Sign names, indexed like ``SELECTED_LABELS``.

    Returns:
        ``(sign, prob)``: ``('', 0)`` if no prediction was made or the recent
        predictions disagree, ``(' ', 0)`` if they agree but stay at or below
        ``threshold``, otherwise the sign name and its probability rounded to
        two decimals.
    """
    sign = ''
    prob = 0

    # Extract key points into a sequence
    keypoints = extract_keypoints(results) # x, y, z for face, left_hand, pose, right_hand; shape e.g. (543, 3)
    sequence.append(keypoints)
    # Trim the caller's buffer in place. Rebinding a local slice would leave the
    # caller's list growing by one landmark array per frame for the whole session.
    del sequence[:-LENGTH]

    # Predict upon full sequence
    if len(sequence) == LENGTH:
        # pre-processing
        model_input = feature_converter(np.array(sequence))

        # prediction
        pred = MODEL.predict(model_input)[0] # MODEL.predict() takes a batch; [0] picks the single sequence
        pred = pred[SELECTED_LABELS] # selects only a subset of signs, as defined in SELECTED_LABELS
        best = np.argmax(pred)
        predictions.append(best) # appends all predictions

        # Visualization logic
        # Require every recent prediction to equal the current one. np.unique(...)[0]
        # is only the smallest recent label, so it let mixed windows through and
        # rejected stable ones that still contained a smaller older label.
        recent = predictions[-TRANSITION_FRAMES:]
        if all(previous == best for previous in recent):
            # if the confidence of the most confident prediction is above threshold
            if pred[best] > threshold:
                sign = SELECTED_SIGNS[best]
                prob = np.round(float(pred[best]), 2)
            else:
                sign = ' '
                prob = 0

    return sign, prob
                


# ------------------------------
# Streamlit
# ------------------------------