import cv2
import numpy as np
import pandas as pd
from tensorflow.keras.models import load_model
import mediapipe as mp
from typing import List

MODEL_PATH = 'ai_model/words/saved_models/best_sign_classifier_model_125_words_seq90.keras'
CSV_PATH = 'ai_model/words/wlasl_125_words_personal_final_processed_data_augmented_seq90.csv'
SEQUENCE_LENGTH = 90
EXPECTED_COORDS_PER_FRAME = 1662  # 33*4 (pose) + 2*21*3 (hands) + 468*3 (face)
CONFIDENCE_THRESHOLD = 0.1

# Load the trained classifier and rebuild the id -> gloss mapping from the
# training CSV (ids follow the order glosses first appear in the file)
model = load_model(MODEL_PATH)
df = pd.read_csv(CSV_PATH)
unique_glosses = df['gloss'].unique()
id_to_gloss = {i: g for i, g in enumerate(unique_glosses)}

# Initialize MediaPipe Holistic once; static_image_mode=True runs detection
# on every frame independently (no cross-frame tracking)
mp_holistic = mp.solutions.holistic.Holistic(
    static_image_mode=True,
    model_complexity=1,
    min_detection_confidence=0.2,
    min_tracking_confidence=0.5
)

NUM_POSE_COORDS_SINGLE = 33*4   # 33 pose landmarks x (x, y, z, visibility)
NUM_HAND_COORDS_SINGLE = 21*3   # 21 hand landmarks x (x, y, z)
NUM_FACE_COORDS_SINGLE = 468*3  # 468 face landmarks x (x, y, z)

def normalize_landmarks(landmarks_sequence):
    """Normalizes landmark coordinates for model input."""
    if landmarks_sequence.ndim == 1:
        landmarks_sequence = np.expand_dims(landmarks_sequence, axis=0)

    normalized_sequences = []
    for frame_landmarks in landmarks_sequence:
        if np.all(frame_landmarks == 0):
            normalized_sequences.append(np.zeros(EXPECTED_COORDS_PER_FRAME, dtype=np.float32))
            continue

        pose_coords_flat = frame_landmarks[0 : NUM_POSE_COORDS_SINGLE]
        left_hand_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE : NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE]
        right_hand_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE : NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE*2]
        face_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE*2 : ]

        all_parts_data = [
            (pose_coords_flat, 4, [0.0]*NUM_POSE_COORDS_SINGLE),
            (left_hand_coords_flat, 3, [0.0]*NUM_HAND_COORDS_SINGLE),
            (right_hand_coords_flat, 3, [0.0]*NUM_HAND_COORDS_SINGLE),
            (face_coords_flat, 3, [0.0]*NUM_FACE_COORDS_SINGLE)
        ]

        normalized_frame_parts = []
        for flat_lms, coords_per_lm, template in all_parts_data:
            if np.all(flat_lms==0):
                normalized_frame_parts.append(np.array(template, dtype=np.float32))
                continue

            lms_array = flat_lms.reshape(-1, coords_per_lm)
            coords_for_mean = lms_array[:, :3] if coords_per_lm==4 else lms_array
            mean_coords = np.mean(coords_for_mean, axis=0)
            translated_lms = lms_array.copy()
            translated_lms[:, :3] -= mean_coords
            scale_factor = np.max(np.linalg.norm(translated_lms[:, :3], axis=1))
            if scale_factor > 1e-6:
                translated_lms[:, :3] /= scale_factor
            normalized_frame_parts.append(translated_lms.flatten())

        combined_frame = np.concatenate(normalized_frame_parts).astype(np.float32)
        if len(combined_frame) < EXPECTED_COORDS_PER_FRAME:
            combined_frame = np.pad(combined_frame, (0, EXPECTED_COORDS_PER_FRAME - len(combined_frame)), 'constant')
        elif len(combined_frame) > EXPECTED_COORDS_PER_FRAME:
            combined_frame = combined_frame[:EXPECTED_COORDS_PER_FRAME]

        normalized_sequences.append(combined_frame)

    return np.array(normalized_sequences, dtype=np.float32)

def pad_or_truncate_sequence(sequence, target_length, feature_dimension):
    """Ensures the sequence has the exact target_length."""
    if sequence.shape[0] < target_length:
        padding = np.zeros((target_length - sequence.shape[0], feature_dimension), dtype=np.float32)
        return np.vstack((sequence, padding))
    return sequence[:target_length, :]
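
# Illustrative sanity check of the padding/truncation contract above (an
# added sketch, not part of the inference pipeline; safe to remove):
def _check_pad_or_truncate():
    short_seq = np.zeros((40, EXPECTED_COORDS_PER_FRAME), dtype=np.float32)
    long_seq = np.zeros((120, EXPECTED_COORDS_PER_FRAME), dtype=np.float32)
    # Short clips are zero-padded at the end; long clips keep the first
    # SEQUENCE_LENGTH frames -- both come out at the model's input shape.
    for seq in (short_seq, long_seq):
        out = pad_or_truncate_sequence(seq, SEQUENCE_LENGTH, EXPECTED_COORDS_PER_FRAME)
        assert out.shape == (SEQUENCE_LENGTH, EXPECTED_COORDS_PER_FRAME)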

def detectWords(image_bytes_list: List[bytes]):
    """
    Detects sign language words from a sequence of video frames.

    image_bytes_list holds the raw encoded bytes of each frame (e.g. JPEG)
    as received from the FastAPI endpoint.
    """
    sequence = []

    for idx, image_bytes in enumerate(image_bytes_list):
        # Decode the raw image bytes directly from memory (no temp files)
        np_arr = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)

        if img is None:
            print(f"Warning: Could not decode image in frame {idx}")
            continue

        # MediaPipe expects RGB; OpenCV decodes to BGR
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        mp_results = mp_holistic.process(img_rgb)

        frame_lms = np.zeros(EXPECTED_COORDS_PER_FRAME, dtype=np.float32)
        current_idx = 0

        # Extract and flatten landmarks
        if mp_results.pose_landmarks:
            pose_flat = [coord for lm in mp_results.pose_landmarks.landmark for coord in [lm.x, lm.y, lm.z, lm.visibility]]
            frame_lms[current_idx:current_idx+len(pose_flat)] = pose_flat
        else: 
            print(f"Warning: No pose landmarks detected in frame {idx}")
        current_idx += NUM_POSE_COORDS_SINGLE

        if mp_results.left_hand_landmarks:
            lh_flat = [coord for lm in mp_results.left_hand_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
            frame_lms[current_idx:current_idx+len(lh_flat)] = lh_flat
        else: 
            print(f"Warning: No left hand landmarks detected in frame {idx}")
        current_idx += NUM_HAND_COORDS_SINGLE

        if mp_results.right_hand_landmarks:
            rh_flat = [coord for lm in mp_results.right_hand_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
            frame_lms[current_idx:current_idx+len(rh_flat)] = rh_flat
        else: 
            print(f"Warning: No right hand landmarks detected in frame {idx}")
        current_idx += NUM_HAND_COORDS_SINGLE

        if mp_results.face_landmarks:
            face_flat = [coord for lm in mp_results.face_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
            frame_lms[current_idx:current_idx+len(face_flat)] = face_flat
        else:
            print(f"Warning: No face landmarks detected in frame {idx}")
        # Face landmarks fill the final block, so current_idx needs no
        # further advance before the frame is appended.

        sequence.append(frame_lms)

    # Guard: if no frame could be decoded, skip prediction entirely
    if not sequence:
        print("Warning: No decodable frames received")
        return {"word": "", "confidence": 0.0}

    # 1. Normalize the full sequence
    sequence = normalize_landmarks(np.array(sequence, dtype=np.float32))
    
    # 2. Pad/truncate to match model input length
    sequence = pad_or_truncate_sequence(sequence, SEQUENCE_LENGTH, EXPECTED_COORDS_PER_FRAME)
    
    # 3. Reshape for model prediction (batch dimension)
    sequence = np.expand_dims(sequence, axis=0) 

    # Prediction
    preds = model.predict(sequence, verbose=0)
    predicted_id = int(np.argmax(preds))
    confidence = float(np.max(preds))
    predicted_word = id_to_gloss.get(predicted_id, "Unknown")

    result = {"word": predicted_word if confidence >= CONFIDENCE_THRESHOLD else "",
              "confidence": confidence}
    
    print(f"Prediction result: {result}")
    return result
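
# ---------------------------------------------------------------------------
# Minimal local usage sketch (an assumption for illustration: this module is
# normally imported by a FastAPI endpoint, and "sample_frames" below is a
# hypothetical directory of JPEG frames -- adjust to your layout).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from pathlib import Path

    frame_dir = Path("sample_frames")  # hypothetical placeholder path
    frames = [p.read_bytes() for p in sorted(frame_dir.glob("*.jpg"))]
    if frames:
        print(detectWords(frames))
    else:
        print(f"No frames found in {frame_dir}")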