Spaces:
Sleeping
Sleeping
File size: 7,445 Bytes
de8ea8e 6c95b02 de8ea8e 641b34a de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 de8ea8e 6c95b02 641b34a d958b94 641b34a de8ea8e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import cv2
import numpy as np
import pandas as pd
from tensorflow.keras.models import load_model
import mediapipe as mp
from typing import List
# We need to import bytes explicitly if using type hinting for clarity
from builtins import bytes
MODEL_PATH = 'ai_model/words/saved_models/best_sign_classifier_model_125_words_seq90.keras'
CSV_PATH = 'ai_model/words/wlasl_125_words_personal_final_processed_data_augmented_seq90.csv'
SEQUENCE_LENGTH = 90
EXPECTED_COORDS_PER_FRAME = 1662
CONFIDENCE_THRESHOLD = 0.1
model = load_model(MODEL_PATH)
df = pd.read_csv(CSV_PATH)
unique_glosses = df['gloss'].unique()
id_to_gloss = {i: g for i, g in enumerate(unique_glosses)}
# Initialize MediaPipe Holistic once
mp_holistic = mp.solutions.holistic.Holistic(
static_image_mode=True,
model_complexity=1,
min_detection_confidence=0.2,
min_tracking_confidence=0.5
)
NUM_POSE_COORDS_SINGLE = 33*4
NUM_HAND_COORDS_SINGLE = 21*3
NUM_FACE_COORDS_SINGLE = 468*3
def normalize_landmarks(landmarks_sequence):
    """Center and scale each frame's landmarks, one body part at a time.

    Every frame is split into pose / left-hand / right-hand / face segments.
    Each non-empty segment is translated so its (x, y, z) centroid sits at the
    origin, then divided by the largest landmark distance from that origin.
    All-zero frames or segments pass through as zeros. The pose segment keeps
    its 4th (visibility) column untouched.

    Args:
        landmarks_sequence: array of shape (frames, EXPECTED_COORDS_PER_FRAME),
            or a single flat frame of that length.

    Returns:
        float32 array of shape (frames, EXPECTED_COORDS_PER_FRAME).
    """
    if landmarks_sequence.ndim == 1:
        landmarks_sequence = landmarks_sequence[np.newaxis, :]

    # Segment boundaries inside one flattened frame.
    pose_end = NUM_POSE_COORDS_SINGLE
    lh_end = pose_end + NUM_HAND_COORDS_SINGLE
    rh_end = lh_end + NUM_HAND_COORDS_SINGLE

    out_frames = []
    for frame in landmarks_sequence:
        if not frame.any():
            # Nothing detected in this frame: keep a zero placeholder.
            out_frames.append(np.zeros(EXPECTED_COORDS_PER_FRAME, dtype=np.float32))
            continue

        # (flat segment, coords per landmark, zero-template length)
        segments = (
            (frame[:pose_end], 4, NUM_POSE_COORDS_SINGLE),
            (frame[pose_end:lh_end], 3, NUM_HAND_COORDS_SINGLE),
            (frame[lh_end:rh_end], 3, NUM_HAND_COORDS_SINGLE),
            (frame[rh_end:], 3, NUM_FACE_COORDS_SINGLE),
        )

        pieces = []
        for seg, width, template_len in segments:
            if not seg.any():
                pieces.append(np.zeros(template_len, dtype=np.float32))
                continue
            pts = seg.reshape(-1, width).copy()
            # Center on the x/y/z centroid; a width-4 (pose) row keeps its
            # visibility column as-is.
            pts[:, :3] -= np.mean(pts[:, :3], axis=0)
            scale = np.max(np.linalg.norm(pts[:, :3], axis=1))
            if scale > 1e-6:
                pts[:, :3] /= scale
            pieces.append(pts.flatten())

        joined = np.concatenate(pieces).astype(np.float32)
        # Defensive length fix-up against any segment-size drift.
        if joined.size < EXPECTED_COORDS_PER_FRAME:
            joined = np.pad(joined, (0, EXPECTED_COORDS_PER_FRAME - joined.size), 'constant')
        elif joined.size > EXPECTED_COORDS_PER_FRAME:
            joined = joined[:EXPECTED_COORDS_PER_FRAME]
        out_frames.append(joined)
    return np.array(out_frames, dtype=np.float32)
def pad_or_truncate_sequence(sequence, target_length, feature_dimension):
    """Force *sequence* to exactly target_length frames.

    Shorter inputs are extended with zero rows of width feature_dimension;
    longer inputs are cut after the first target_length frames.

    Args:
        sequence: 2-D array of shape (frames, feature_dimension).
        target_length: required number of frames.
        feature_dimension: per-frame feature count, used to size the padding.

    Returns:
        2-D array of shape (target_length, feature_dimension).
    """
    n_frames = sequence.shape[0]
    if n_frames >= target_length:
        return sequence[:target_length, :]
    filler = np.zeros((target_length - n_frames, feature_dimension), dtype=np.float32)
    return np.vstack((sequence, filler))
def detectWords(image_bytes_list: List[bytes]):
    """Predict a sign-language word from a sequence of encoded image frames.

    Each element of *image_bytes_list* holds the raw bytes of one encoded
    image (as received from the FastAPI endpoint). Frames are decoded in
    memory with cv2.imdecode, run through MediaPipe Holistic, flattened into
    fixed-layout landmark vectors, normalized, padded/truncated to
    SEQUENCE_LENGTH frames, and classified by the loaded Keras model.

    Args:
        image_bytes_list: raw encoded-image byte strings, in temporal order.

    Returns:
        dict with keys:
            "word": predicted gloss, or "" when confidence is below
                CONFIDENCE_THRESHOLD or no frame could be decoded.
            "confidence": softmax score of the top class (0.0 when no frame
                could be decoded).
    """
    sequence = []
    for idx, image_bytes in enumerate(image_bytes_list):
        # Decode straight from memory — frames arrive as bytes, not files.
        np_arr = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
        if img is None:
            print(f"Warning: Could not decode image in frame {idx}")
            continue
        # MediaPipe expects RGB; OpenCV decodes to BGR.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        mp_results = mp_holistic.process(img_rgb)

        # Fixed layout: pose | left hand | right hand | face.
        # Undetected parts stay zero-filled.
        frame_lms = np.zeros(EXPECTED_COORDS_PER_FRAME, dtype=np.float32)
        current_idx = 0
        if mp_results.pose_landmarks:
            pose_flat = [coord for lm in mp_results.pose_landmarks.landmark
                         for coord in (lm.x, lm.y, lm.z, lm.visibility)]
            frame_lms[current_idx:current_idx + len(pose_flat)] = pose_flat
        else:
            print(f"Warning: No pose landmarks detected in frame {idx}")
        current_idx += NUM_POSE_COORDS_SINGLE
        if mp_results.left_hand_landmarks:
            lh_flat = [coord for lm in mp_results.left_hand_landmarks.landmark
                       for coord in (lm.x, lm.y, lm.z)]
            frame_lms[current_idx:current_idx + len(lh_flat)] = lh_flat
        else:
            print(f"Warning: No left hand landmarks detected in frame {idx}")
        current_idx += NUM_HAND_COORDS_SINGLE
        if mp_results.right_hand_landmarks:
            rh_flat = [coord for lm in mp_results.right_hand_landmarks.landmark
                       for coord in (lm.x, lm.y, lm.z)]
            frame_lms[current_idx:current_idx + len(rh_flat)] = rh_flat
        else:
            print(f"Warning: No right hand landmarks detected in frame {idx}")
        current_idx += NUM_HAND_COORDS_SINGLE
        if mp_results.face_landmarks:
            face_flat = [coord for lm in mp_results.face_landmarks.landmark
                         for coord in (lm.x, lm.y, lm.z)]
            frame_lms[current_idx:current_idx + len(face_flat)] = face_flat
        else:
            print(f"Warning: No face landmarks detected in frame {idx}")
        # Face is the last segment, so no further offset advance is needed.
        sequence.append(frame_lms)

    if not sequence:
        # Every frame failed to decode; don't feed a fabricated all-zero
        # sequence to the model.
        print("Warning: No decodable frames received")
        return {"word": "", "confidence": 0.0}

    # 1. Normalize the full sequence per frame / per body part.
    sequence = normalize_landmarks(np.array(sequence, dtype=np.float32))
    # 2. Pad/truncate to match the model's fixed input length.
    sequence = pad_or_truncate_sequence(sequence, SEQUENCE_LENGTH, EXPECTED_COORDS_PER_FRAME)
    # 3. Add the batch dimension for prediction.
    sequence = np.expand_dims(sequence, axis=0)

    preds = model.predict(sequence, verbose=0)
    predicted_id = int(np.argmax(preds))
    confidence = float(np.max(preds))
    predicted_word = id_to_gloss.get(predicted_id, "Unknown")
    # Suppress low-confidence guesses rather than returning a wrong word.
    result = {"word": predicted_word if confidence >= CONFIDENCE_THRESHOLD else "",
              "confidence": confidence}
    print(f"Prediction result: {result}")
    return result
|