import cv2
import numpy as np
import pandas as pd
from tensorflow.keras.models import load_model
import mediapipe as mp
from typing import List
MODEL_PATH = 'ai_model/words/saved_models/best_sign_classifier_model_125_words_seq90.keras'
CSV_PATH = 'ai_model/words/wlasl_125_words_personal_final_processed_data_augmented_seq90.csv'
SEQUENCE_LENGTH = 90
EXPECTED_COORDS_PER_FRAME = 1662
CONFIDENCE_THRESHOLD = 0.1

model = load_model(MODEL_PATH)
df = pd.read_csv(CSV_PATH)
unique_glosses = df['gloss'].unique()
id_to_gloss = {i: g for i, g in enumerate(unique_glosses)}

# Initialize MediaPipe Holistic once
mp_holistic = mp.solutions.holistic.Holistic(
    static_image_mode=True,
    model_complexity=1,
    min_detection_confidence=0.2,
    min_tracking_confidence=0.5
)

NUM_POSE_COORDS_SINGLE = 33 * 4
NUM_HAND_COORDS_SINGLE = 21 * 3
NUM_FACE_COORDS_SINGLE = 468 * 3
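
# Sanity check: pose (33 landmarks * 4 values) + two hands (2 * 21 * 3) + face (468 * 3)
# must add up to the per-frame feature size the model expects (1662).
assert NUM_POSE_COORDS_SINGLE + 2 * NUM_HAND_COORDS_SINGLE + NUM_FACE_COORDS_SINGLE == EXPECTED_COORDS_PER_FRAME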

def normalize_landmarks(landmarks_sequence):
    """Normalizes landmark coordinates for model input."""
    if landmarks_sequence.ndim == 1:
        landmarks_sequence = np.expand_dims(landmarks_sequence, axis=0)
    normalized_sequences = []
    for frame_landmarks in landmarks_sequence:
        if np.all(frame_landmarks == 0):
            normalized_sequences.append(np.zeros(EXPECTED_COORDS_PER_FRAME, dtype=np.float32))
            continue
        pose_coords_flat = frame_landmarks[0 : NUM_POSE_COORDS_SINGLE]
        left_hand_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE : NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE]
        right_hand_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE : NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE * 2]
        face_coords_flat = frame_landmarks[NUM_POSE_COORDS_SINGLE + NUM_HAND_COORDS_SINGLE * 2 :]
        all_parts_data = [
            (pose_coords_flat, 4, [0.0] * NUM_POSE_COORDS_SINGLE),
            (left_hand_coords_flat, 3, [0.0] * NUM_HAND_COORDS_SINGLE),
            (right_hand_coords_flat, 3, [0.0] * NUM_HAND_COORDS_SINGLE),
            (face_coords_flat, 3, [0.0] * NUM_FACE_COORDS_SINGLE)
        ]
        normalized_frame_parts = []
        for flat_lms, coords_per_lm, template in all_parts_data:
            if np.all(flat_lms == 0):
                normalized_frame_parts.append(np.array(template, dtype=np.float32))
                continue
            lms_array = flat_lms.reshape(-1, coords_per_lm)
            coords_for_mean = lms_array[:, :3] if coords_per_lm == 4 else lms_array
            mean_coords = np.mean(coords_for_mean, axis=0)
            translated_lms = lms_array.copy()
            translated_lms[:, :3] -= mean_coords
            scale_factor = np.max(np.linalg.norm(translated_lms[:, :3], axis=1))
            if scale_factor > 1e-6:
                translated_lms[:, :3] /= scale_factor
            normalized_frame_parts.append(translated_lms.flatten())
        combined_frame = np.concatenate(normalized_frame_parts).astype(np.float32)
        if len(combined_frame) < EXPECTED_COORDS_PER_FRAME:
            combined_frame = np.pad(combined_frame, (0, EXPECTED_COORDS_PER_FRAME - len(combined_frame)), 'constant')
        elif len(combined_frame) > EXPECTED_COORDS_PER_FRAME:
            combined_frame = combined_frame[:EXPECTED_COORDS_PER_FRAME]
        normalized_sequences.append(combined_frame)
    return np.array(normalized_sequences, dtype=np.float32)
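
# Note on the normalization above: an input of shape (num_frames, 1662) comes back with the
# same shape; each body part is centred on its mean and scaled by its max landmark distance,
# and for the pose block only x, y, z are touched while the visibility value passes through.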

def pad_or_truncate_sequence(sequence, target_length, feature_dimension):
    """Ensures the sequence has the exact target_length."""
    if sequence.shape[0] < target_length:
        padding = np.zeros((target_length - sequence.shape[0], feature_dimension), dtype=np.float32)
        return np.vstack((sequence, padding))
    return sequence[:target_length, :]
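
# For example, with SEQUENCE_LENGTH = 90 a (40, 1662) sequence is padded with 50 zero rows
# to (90, 1662), while a (120, 1662) sequence is truncated to its first 90 frames.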

def detectWords(image_bytes_list: List[bytes]):
    """
    Detects sign language words from a sequence of frames provided as image bytes.
    image_bytes_list is a list of raw image bytes received from the FastAPI endpoint.
    """
    sequence = []
    for idx, image_bytes in enumerate(image_bytes_list):
        # Decode the raw image bytes directly from memory (no file path involved)
        np_arr = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
        if img is None:
            print(f"Warning: Could not decode image in frame {idx}")
            continue
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        mp_results = mp_holistic.process(img_rgb)
        frame_lms = np.zeros(EXPECTED_COORDS_PER_FRAME, dtype=np.float32)
        current_idx = 0
        # Extract and flatten landmarks
        if mp_results.pose_landmarks:
            pose_flat = [coord for lm in mp_results.pose_landmarks.landmark for coord in [lm.x, lm.y, lm.z, lm.visibility]]
            frame_lms[current_idx:current_idx + len(pose_flat)] = pose_flat
        else:
            print(f"Warning: No pose landmarks detected in frame {idx}")
        current_idx += NUM_POSE_COORDS_SINGLE
        if mp_results.left_hand_landmarks:
            lh_flat = [coord for lm in mp_results.left_hand_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
            frame_lms[current_idx:current_idx + len(lh_flat)] = lh_flat
        else:
            print(f"Warning: No left hand landmarks detected in frame {idx}")
        current_idx += NUM_HAND_COORDS_SINGLE
        if mp_results.right_hand_landmarks:
            rh_flat = [coord for lm in mp_results.right_hand_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
            frame_lms[current_idx:current_idx + len(rh_flat)] = rh_flat
        else:
            print(f"Warning: No right hand landmarks detected in frame {idx}")
        current_idx += NUM_HAND_COORDS_SINGLE
        if mp_results.face_landmarks:
            face_flat = [coord for lm in mp_results.face_landmarks.landmark for coord in [lm.x, lm.y, lm.z]]
            frame_lms[current_idx:current_idx + len(face_flat)] = face_flat
        else:
            print(f"Warning: No face landmarks detected in frame {idx}")
        # No need to advance current_idx further; the face block is the last segment of the frame.
        sequence.append(frame_lms)

    # 1. Normalize the full sequence
    sequence = normalize_landmarks(np.array(sequence, dtype=np.float32))
    # 2. Pad/truncate to match model input length
    sequence = pad_or_truncate_sequence(sequence, SEQUENCE_LENGTH, EXPECTED_COORDS_PER_FRAME)
    # 3. Reshape for model prediction (batch dimension)
    sequence = np.expand_dims(sequence, axis=0)

    # Prediction
    preds = model.predict(sequence, verbose=0)
    predicted_id = int(np.argmax(preds))
    confidence = float(np.max(preds))
    predicted_word = id_to_gloss.get(predicted_id, "Unknown")

    result = {"word": predicted_word if confidence >= CONFIDENCE_THRESHOLD else "",
              "confidence": confidence}
    print(f"Prediction result: {result}")
    return result
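
# Minimal usage sketch (illustrative only; the route name, parameter names, and upload
# handling below are assumptions, not part of this module). A FastAPI endpoint could read
# the uploaded frame files and pass their raw bytes straight to detectWords:
#
#   from fastapi import FastAPI, File, UploadFile
#
#   app = FastAPI()
#
#   @app.post("/detect-words")
#   async def detect_words_endpoint(frames: List[UploadFile] = File(...)):
#       image_bytes_list = [await frame.read() for frame in frames]
#       return detectWords(image_bytes_list)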