handsUp-backend / wordsController.py
mutarisi
fixed wordController
6c95b02
import cv2
import numpy as np
import pandas as pd
from tensorflow.keras.models import load_model
import mediapipe as mp
from typing import List
# We need to import bytes explicitly if using type hinting for clarity
from builtins import bytes
MODEL_PATH = 'ai_model/words/saved_models/best_sign_classifier_model_125_words_seq90.keras'
CSV_PATH = 'ai_model/words/wlasl_125_words_personal_final_processed_data_augmented_seq90.csv'
SEQUENCE_LENGTH = 90
EXPECTED_COORDS_PER_FRAME = 1662
CONFIDENCE_THRESHOLD = 0.1
model = load_model(MODEL_PATH)
df = pd.read_csv(CSV_PATH)
unique_glosses = df['gloss'].unique()
id_to_gloss = {i: g for i, g in enumerate(unique_glosses)}
# Initialize MediaPipe Holistic once
mp_holistic = mp.solutions.holistic.Holistic(
static_image_mode=True,
model_complexity=1,
min_detection_confidence=0.2,
min_tracking_confidence=0.5
)
NUM_POSE_COORDS_SINGLE = 33*4
NUM_HAND_COORDS_SINGLE = 21*3
NUM_FACE_COORDS_SINGLE = 468*3
def normalize_landmarks(landmarks_sequence):
"""Normalizes landmark coordinates for model input."""
if landmarks_sequence.ndim == 1:
landmarks_sequence = np.expand_dims(landmarks_sequence, axis=0)
normalized_sequences = []
for frame_landmarks in landmarks_sequence:
if np.all(frame_landmarks == 0):
normalized_sequences.append(np.zeros(EXPECTED_COORDS_PER_FRAME, dtype=np.float32))
continue
pose_coords_flat = frame_landmarks[0 : NUM_POSE_COORDS_SINGLE]
left_hand_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE : NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE]
right_hand_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE : NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE*2]
face_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE*2 : ]
all_parts_data = [
(pose_coords_flat, 4, [0.0]*NUM_POSE_COORDS_SINGLE),
(left_hand_coords_flat, 3, [0.0]*NUM_HAND_COORDS_SINGLE),
(right_hand_coords_flat, 3, [0.0]*NUM_HAND_COORDS_SINGLE),
(face_coords_flat, 3, [0.0]*NUM_FACE_COORDS_SINGLE)
]
normalized_frame_parts = []
for flat_lms, coords_per_lm, template in all_parts_data:
if np.all(flat_lms==0):
normalized_frame_parts.append(np.array(template, dtype=np.float32))
continue
lms_array = flat_lms.reshape(-1, coords_per_lm)
coords_for_mean = lms_array[:, :3] if coords_per_lm==4 else lms_array
mean_coords = np.mean(coords_for_mean, axis=0)
translated_lms = lms_array.copy()
translated_lms[:, :3] -= mean_coords
scale_factor = np.max(np.linalg.norm(translated_lms[:, :3], axis=1))
if scale_factor > 1e-6:
translated_lms[:, :3] /= scale_factor
normalized_frame_parts.append(translated_lms.flatten())
combined_frame = np.concatenate(normalized_frame_parts).astype(np.float32)
if len(combined_frame) < EXPECTED_COORDS_PER_FRAME:
combined_frame = np.pad(combined_frame, (0, EXPECTED_COORDS_PER_FRAME - len(combined_frame)), 'constant')
elif len(combined_frame) > EXPECTED_COORDS_PER_FRAME:
combined_frame = combined_frame[:EXPECTED_COORDS_PER_FRAME]
normalized_sequences.append(combined_frame)
return np.array(normalized_sequences, dtype=np.float32)
def pad_or_truncate_sequence(sequence, target_length, feature_dimension):
"""Ensures the sequence has the exact target_length."""
if sequence.shape[0] < target_length:
padding = np.zeros((target_length - sequence.shape[0], feature_dimension), dtype=np.float32)
return np.vstack((sequence, padding))
return sequence[:target_length, :]
def detectWords(image_bytes_list: List[bytes]):
"""
Detects sign language words from a sequence of frames provided as image bytes.
The input sequenceList is a list of raw image bytes from the FastAPI endpoint.
"""
results_dict = {}
sequence = []
# Rename loop variable to reflect the actual content (bytes)
for idx, image_bytes in enumerate(image_bytes_list):
# --------------------------------------------------------
# CRITICAL FIX: Use cv2.imdecode to read bytes from memory
# --------------------------------------------------------
np_arr = np.frombuffer(image_bytes, np.uint8)
img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
if img is None:
print(f"Warning: Could not decode image in frame {idx}")
continue
# OLD LINE (Removed): img = cv2.imread(path)
# --------------------------------------------------------
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
mp_results = mp_holistic.process(img_rgb)
frame_lms = np.zeros(EXPECTED_COORDS_PER_FRAME, dtype=np.float32)
current_idx = 0
# Extract and flatten landmarks
if mp_results.pose_landmarks:
pose_flat = [coord for lm in mp_results.pose_landmarks.landmark for coord in [lm.x, lm.y, lm.z, lm.visibility]]
frame_lms[current_idx:current_idx+len(pose_flat)] = pose_flat
else:
print(f"Warning: No pose landmarks detected in frame {idx}")
current_idx += NUM_POSE_COORDS_SINGLE
if mp_results.left_hand_landmarks:
lh_flat = [coord for lm in mp_results.left_hand_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
frame_lms[current_idx:current_idx+len(lh_flat)] = lh_flat
else:
print(f"Warning: No left hand landmarks detected in frame {idx}")
current_idx += NUM_HAND_COORDS_SINGLE
if mp_results.right_hand_landmarks:
rh_flat = [coord for lm in mp_results.right_hand_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
frame_lms[current_idx:current_idx+len(rh_flat)] = rh_flat
else:
print(f"Warning: No right hand landmarks detected in frame {idx}")
current_idx += NUM_HAND_COORDS_SINGLE
if mp_results.face_landmarks:
face_flat = [coord for lm in mp_results.face_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
frame_lms[current_idx:current_idx+len(face_flat)] = face_flat
else:
# Note: This print statement is a duplicate and should likely be Face landmarks based on index check
print(f"Warning: No face landmarks detected in frame {idx}")
# current_idx += NUM_FACE_COORDS_SINGLE (Not needed as it's the last step before appending)
sequence.append(frame_lms)
# 1. Normalize the full sequence
sequence = normalize_landmarks(np.array(sequence, dtype=np.float32))
# 2. Pad/truncate to match model input length
sequence = pad_or_truncate_sequence(sequence, SEQUENCE_LENGTH, EXPECTED_COORDS_PER_FRAME)
# 3. Reshape for model prediction (batch dimension)
sequence = np.expand_dims(sequence, axis=0)
# Prediction
preds = model.predict(sequence, verbose=0)
predicted_id = int(np.argmax(preds))
confidence = float(np.max(preds))
predicted_word = id_to_gloss.get(predicted_id, "Unknown")
result = {"word": predicted_word if confidence >= CONFIDENCE_THRESHOLD else "",
"confidence": confidence}
print(f"Prediction result: {result}")
return result