import cv2
import torch
import numpy as np
import mediapipe as mp
import warnings
import os
import time
import threading
from collections import deque
import torch.nn as nn
import torch.nn.functional as F

warnings.filterwarnings('ignore')


class CNNTransformerHybrid(nn.Module):
    """EXACT SAME MODEL AS TRAINING"""

    def __init__(self, input_size=258, num_classes=51, dropout=0.35):
        super().__init__()

        # CNN FOR LOCAL FEATURES
        self.cnn = nn.Sequential(
            nn.Conv1d(input_size, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Conv1d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.AdaptiveAvgPool1d(16)
        )

        # TRANSFORMER FOR TEMPORAL DEPENDENCIES
        self.pos_embed = nn.Parameter(torch.randn(1, 16, 256))
        self.pos_dropout = nn.Dropout(dropout)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=256,
            nhead=8,
            dim_feedforward=512,
            dropout=dropout,
            batch_first=True,
            norm_first=True,
            activation='gelu'
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=3)

        # ATTENTION POOLING
        self.attention_pool = nn.MultiheadAttention(
            embed_dim=256,
            num_heads=4,
            dropout=dropout,
            batch_first=True
        )
        self.pool_query = nn.Parameter(torch.randn(1, 1, 256))

        # CLASSIFICATION HEAD
        self.classifier = nn.Sequential(
            nn.LayerNorm(256),
            nn.Linear(256, 512),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(256, 128),
            nn.GELU(),
            nn.Dropout(dropout * 0.5),
            nn.Linear(128, num_classes)
        )

        self._init_weights()

    def _init_weights(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, x):
        batch_size = x.size(0)

        # CNN processing: (B, T, F) -> (B, F, T) for Conv1d, then back
        x = x.transpose(1, 2)
        x = self.cnn(x)
        x = x.transpose(1, 2)

        # Add positional encoding
        x = x + self.pos_embed
        x = self.pos_dropout(x)

        # Transformer encoding
        x = self.transformer(x)

        # Attention pooling with a learned query
        query = self.pool_query.expand(batch_size, -1, -1)
        x, _ = self.attention_pool(query, x, x)
        x = x.squeeze(1)

        # Classification
        return self.classifier(x)


GESTURE_NAMES = {
    0: 'abang', 1: 'anak_lelaki', 2: 'anak_perempuan', 3: 'apa',
    4: 'apa_khabar', 5: 'assalamualaikum', 6: 'ayah', 7: 'bagaimana',
    8: 'bahasa_isyarat', 9: 'baik', 10: 'bapa_saudara', 11: 'beli',
    12: 'beli_2', 13: 'berapa', 14: 'bila', 15: 'bomba', 16: 'buat',
    17: 'emak', 18: 'emak_saudara', 19: 'hari', 20: 'hi', 21: 'hujan',
    22: 'jahat', 23: 'jangan', 24: 'kakak', 25: 'keluarga', 26: 'kereta',
    27: 'lelaki', 28: 'lemak', 29: 'main', 30: 'mana', 31: 'masalah',
    32: 'nasi', 33: 'nasi_lemak', 34: 'panas', 35: 'panas_2', 36: 'pandai',
    37: 'pandai_2', 38: 'payung', 39: 'perempuan', 40: 'perlahan',
    41: 'perlahan_2', 42: 'pinjam', 43: 'polis', 44: 'pukul', 45: 'ribut',
    46: 'saudara', 47: 'sejuk', 48: 'siapa', 49: 'tandas'
}
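
# A minimal shape smoke-test for CNNTransformerHybrid (a sketch of ours, not part
# of the original training code; the function name is hypothetical). It assumes
# the 30-frame x 258-feature windows that GestureRecognizer below feeds the model;
# other sequence lengths also pass through because AdaptiveAvgPool1d(16)
# normalizes the temporal axis to 16 steps before the transformer.
def _shape_smoke_test():
    model = CNNTransformerHybrid(input_size=258, num_classes=len(GESTURE_NAMES))
    model.eval()
    with torch.no_grad():
        out = model(torch.randn(2, 30, 258))  # (batch, frames, features)
    assert out.shape == (2, len(GESTURE_NAMES)), f"unexpected output shape: {out.shape}"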
# English translations for Malaysian Sign Language gestures
GESTURE_TRANSLATIONS = {
    'abang': 'Brother', 'anak_lelaki': 'Son', 'anak_perempuan': 'Daughter',
    'apa': 'What', 'apa_khabar': 'How are you',
    'assalamualaikum': 'Peace be upon you', 'ayah': 'Father',
    'bagaimana': 'How', 'bahasa_isyarat': 'Sign Language', 'baik': 'Good',
    'bapa_saudara': 'Uncle', 'beli': 'Buy', 'beli_2': 'Buy (variant)',
    'berapa': 'How much', 'bila': 'When', 'bomba': 'Firefighter',
    'buat': 'Do/Make', 'emak': 'Mother', 'emak_saudara': 'Aunt',
    'hari': 'Day', 'hi': 'Hi', 'hujan': 'Rain', 'jahat': 'Bad',
    'jangan': "Don't", 'kakak': 'Sister', 'keluarga': 'Family',
    'kereta': 'Car', 'lelaki': 'Male', 'lemak': 'Fat', 'main': 'Play',
    'mana': 'Where', 'masalah': 'Problem', 'nasi': 'Rice',
    'nasi_lemak': 'Nasi Lemak', 'panas': 'Hot', 'panas_2': 'Hot (variant)',
    'pandai': 'Smart', 'pandai_2': 'Smart (variant)', 'payung': 'Umbrella',
    'perempuan': 'Female', 'perlahan': 'Slow', 'perlahan_2': 'Slow (variant)',
    'pinjam': 'Borrow', 'polis': 'Police', 'pukul': 'Hit/Time',
    'ribut': 'Storm', 'saudara': 'Sibling', 'sejuk': 'Cold', 'siapa': 'Who',
    'tandas': 'Toilet', 'Unknown': 'Unknown'
}

# --- GLOBAL SHARED RESOURCES (Singletons for Resource Optimization) ---
_MP_RESOURCES_LOCK = threading.Lock()
_TORCH_MODEL_LOCK = threading.Lock()
_SHARED_POSE_LANDMARKER = None
_SHARED_HAND_LANDMARKER = None
_SHARED_TORCH_MODEL = None
_GLOBAL_MP_TIMESTAMP_MS = 0


def _get_shared_mediapipe():
    global _SHARED_POSE_LANDMARKER, _SHARED_HAND_LANDMARKER
    with _MP_RESOURCES_LOCK:
        if _SHARED_POSE_LANDMARKER is None:
            print("[INFO] Initializing Shared MediaPipe Landmarkers...")
            BaseOptions = mp.tasks.BaseOptions
            PoseLandmarker = mp.tasks.vision.PoseLandmarker
            PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
            HandLandmarker = mp.tasks.vision.HandLandmarker
            HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions
            VisionRunningMode = mp.tasks.vision.RunningMode

            pose_model_path = 'pose_landmarker_lite.task'
            hand_model_path = 'hand_landmarker.task'
            if not os.path.exists(pose_model_path) or not os.path.exists(hand_model_path):
                raise FileNotFoundError("MediaPipe task files not found.")

            pose_options = PoseLandmarkerOptions(
                base_options=BaseOptions(
                    model_asset_path=pose_model_path,
                    delegate=BaseOptions.Delegate.GPU
                ),
                running_mode=VisionRunningMode.VIDEO,
                min_pose_detection_confidence=0.5,
                min_pose_presence_confidence=0.5,
                min_tracking_confidence=0.5,
                output_segmentation_masks=False
            )
            hand_options = HandLandmarkerOptions(
                base_options=BaseOptions(
                    model_asset_path=hand_model_path,
                    delegate=BaseOptions.Delegate.GPU
                ),
                num_hands=2,
                min_hand_detection_confidence=0.5,
                min_tracking_confidence=0.5,
                running_mode=VisionRunningMode.VIDEO
            )
            _SHARED_POSE_LANDMARKER = PoseLandmarker.create_from_options(pose_options)
            _SHARED_HAND_LANDMARKER = HandLandmarker.create_from_options(hand_options)
    return _SHARED_POSE_LANDMARKER, _SHARED_HAND_LANDMARKER


def _get_shared_torch_model(model_path, device):
    global _SHARED_TORCH_MODEL
    with _TORCH_MODEL_LOCK:
        if _SHARED_TORCH_MODEL is None:
            print(f"[INFO] Loading Shared Torch Model: {model_path}")
            if not os.path.exists(model_path):
                raise FileNotFoundError(f"Model file not found: {model_path}")

            model = CNNTransformerHybrid(
                input_size=258,
                num_classes=len(GESTURE_NAMES),
                dropout=0.35
            ).to(device)
            checkpoint = torch.load(model_path, map_location=device, weights_only=False)
            model.load_state_dict(checkpoint['model_state_dict'])

            # Enable FP16 (Half Precision) for GPU optimization
            if device.type == 'cuda':
                model = model.half()
                torch.backends.cudnn.benchmark = True

            model.eval()
            _SHARED_TORCH_MODEL = model
    return _SHARED_TORCH_MODEL
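
# Optional warm-up sketch (ours, not in the original code; the function name is
# hypothetical): calling this once at process start-up moves the one-time
# model-load cost out of the first user request. The default path mirrors the
# one used by GestureRecognizer below.
def _warm_up_shared_resources(model_path='best_cnn_transformer_hybrid.pth'):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    _get_shared_torch_model(model_path, device)
    _get_shared_mediapipe()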
class GestureRecognizer:
    def __init__(self, model_path='best_cnn_transformer_hybrid.pth'):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.sequence_buffer = deque(maxlen=30)

        # Access shared singletons
        self.model = _get_shared_torch_model(model_path, self.device)
        self.pose_landmarker, self.hand_landmarker = _get_shared_mediapipe()

        self.last_hands_detected = False
        self.last_pose_detected = False

    def reset_tracking(self):
        """No-op in shared mode: don't close landmarkers other users rely on, just clear the buffer."""
        self.sequence_buffer.clear()

    def extract_landmarks(self, frame, timestamp_ms=None):
        global _GLOBAL_MP_TIMESTAMP_MS
        if timestamp_ms is None:
            timestamp_ms = int(time.time() * 1000)

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

        # Shared block for MediaPipe inference
        with _MP_RESOURCES_LOCK:
            # MediaPipe VIDEO mode requires strictly increasing timestamps, so
            # enforce a globally monotonic clock across all users sharing the
            # landmarkers; out-of-order or duplicate timestamps get bumped forward.
            if timestamp_ms <= _GLOBAL_MP_TIMESTAMP_MS:
                timestamp_ms = _GLOBAL_MP_TIMESTAMP_MS + 1
            _GLOBAL_MP_TIMESTAMP_MS = timestamp_ms

            try:
                pose_result = self.pose_landmarker.detect_for_video(mp_image, timestamp_ms)
                hand_result = self.hand_landmarker.detect_for_video(mp_image, timestamp_ms)
            except Exception as e:
                # Recover by returning empty results for this frame; recreating
                # the shared landmarkers here would disrupt other users.
                print(f"MediaPipe Tracking Error (skipping frame): {e}")
                return (np.zeros(258, dtype=np.float32),
                        type('obj', (object,), {'pose_landmarks': []}),
                        type('obj', (object,), {'hand_landmarks': [], 'handedness': []}))

        features = np.zeros(258, dtype=np.float32)

        # Track detection status
        self.last_pose_detected = bool(pose_result.pose_landmarks)
        self.last_hands_detected = bool(hand_result.hand_landmarks)

        # Pose: 33 landmarks x (x, y, z, visibility) -> indices 0..131
        if pose_result.pose_landmarks:
            pose_landmarks = pose_result.pose_landmarks[0]
            for i in range(min(33, len(pose_landmarks))):
                idx = i * 4
                features[idx] = pose_landmarks[i].x
                features[idx + 1] = pose_landmarks[i].y
                features[idx + 2] = pose_landmarks[i].z
                features[idx + 3] = pose_landmarks[i].visibility

        # Hands: 21 landmarks x (x, y, z) per hand
        left_hand_idx = 132
        right_hand_idx = 195
        if hand_result.hand_landmarks:
            for i, hand_landmarks in enumerate(hand_result.hand_landmarks):
                if i < len(hand_result.handedness):
                    handedness = hand_result.handedness[i][0].category_name
                    start_idx = left_hand_idx if handedness == 'Left' else right_hand_idx
                    for j in range(min(21, len(hand_landmarks))):
                        idx = start_idx + (j * 3)
                        features[idx] = hand_landmarks[j].x
                        features[idx + 1] = hand_landmarks[j].y
                        features[idx + 2] = hand_landmarks[j].z

        return features, pose_result, hand_result

    def predict(self, frame, timestamp_ms=None):
        features, pose_res, hand_res = self.extract_landmarks(frame, timestamp_ms=timestamp_ms)
        self.sequence_buffer.append(features)

        prediction_idx = None
        confidence = 0.0
        probabilities = None

        if len(self.sequence_buffer) == 30:
            sequence_array = np.array(list(self.sequence_buffer), dtype=np.float32)
            sequence_array = np.expand_dims(sequence_array, axis=0)
            input_tensor = torch.tensor(sequence_array, dtype=torch.float32).to(self.device)

            # Match model precision
            if self.device.type == 'cuda':
                input_tensor = input_tensor.half()

            # Lock around the shared model for thread-safe prediction
            with _TORCH_MODEL_LOCK:
                with torch.no_grad():
                    outputs = self.model(input_tensor)
                    probs = F.softmax(outputs, dim=1)
                    conf, pred = torch.max(probs, dim=1)
                    prediction_idx = pred.item()
                    confidence = conf.item()
                    probabilities = probs[0].cpu().numpy()

        return {
            'prediction_idx': prediction_idx,
            'confidence': confidence,
            'gesture_name': GESTURE_NAMES.get(prediction_idx) if prediction_idx is not None else None,
            'pose_result': pose_res,
            'hand_result': hand_res,
            'probabilities': probabilities
        }

    def extract_features(self, frame, timestamp_ms=None):
        """Extract the 258-dimensional feature vector from a single frame."""
        features, _, _ = self.extract_landmarks(frame, timestamp_ms=timestamp_ms)
        return features
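
    # Hedged debugging helper (ours, not in the original code; the method name is
    # hypothetical): splits a 258-dim feature vector back into the segments laid
    # out by extract_landmarks above: pose = 33 landmarks x (x, y, z, visibility)
    # -> indices 0..131, left hand = 21 x (x, y, z) -> 132..194,
    # right hand = 21 x (x, y, z) -> 195..257.
    def split_features(self, features):
        pose = features[:132].reshape(33, 4)
        left_hand = features[132:195].reshape(21, 3)
        right_hand = features[195:258].reshape(21, 3)
        return pose, left_hand, right_hand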
    def predict_from_sequence(self, sequence):
        """
        Predict a gesture from a pre-built sequence of features.

        Args:
            sequence: numpy array of shape (1, 30, 258)

        Returns:
            dict with prediction results
        """
        input_tensor = torch.tensor(sequence, dtype=torch.float32).to(self.device)

        # Match model precision
        if self.device.type == 'cuda':
            input_tensor = input_tensor.half()

        # Lock around the shared model for thread-safe prediction
        with _TORCH_MODEL_LOCK:
            with torch.no_grad():
                outputs = self.model(input_tensor)
                probabilities = F.softmax(outputs, dim=1)
                confidence, predicted = torch.max(probabilities, dim=1)
                prediction_idx = predicted.item()
                confidence = confidence.item()
                probabilities = probabilities[0].cpu().numpy()

        return {
            'prediction_idx': prediction_idx,
            'confidence': float(confidence),
            'gesture_name': GESTURE_NAMES.get(prediction_idx) if prediction_idx is not None else None,
            'probabilities': probabilities,
            # __init__ always sets these flags, so no hasattr guard is needed
            'hands_detected': self.last_hands_detected,
            'pose_detected': self.last_pose_detected
        }

    def draw_landmarks(self, frame, pose_result, hand_result):
        annotated_frame = frame.copy()

        if pose_result.pose_landmarks:
            for landmark in pose_result.pose_landmarks[0]:
                x = int(landmark.x * frame.shape[1])
                y = int(landmark.y * frame.shape[0])
                cv2.circle(annotated_frame, (x, y), 3, (0, 255, 0), -1)

        if hand_result.hand_landmarks:
            for hand_landmarks in hand_result.hand_landmarks:
                for landmark in hand_landmarks:
                    x = int(landmark.x * frame.shape[1])
                    y = int(landmark.y * frame.shape[0])
                    cv2.circle(annotated_frame, (x, y), 3, (255, 0, 0), -1)

        return annotated_frame
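
# Minimal usage sketch (ours, not in the original code), assuming the .pth
# checkpoint and MediaPipe .task files sit in the working directory and a webcam
# is available. The 0.7 confidence threshold is an arbitrary example value.
# Press 'q' to quit.
if __name__ == '__main__':
    recognizer = GestureRecognizer()
    cap = cv2.VideoCapture(0)
    try:
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break
            result = recognizer.predict(frame)
            annotated = recognizer.draw_landmarks(frame, result['pose_result'], result['hand_result'])
            if result['gesture_name'] is not None and result['confidence'] > 0.7:
                label = f"{result['gesture_name']} ({result['confidence']:.2f})"
                cv2.putText(annotated, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow('MSL Gesture Recognition', annotated)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()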