# BIM-MSL / video_processor.py
# Antigravity Bot
# Full GPU acceleration and high-fidelity optimization for NVIDIA L40S
# 23019cc
import cv2
import torch
import numpy as np
import mediapipe as mp
import warnings
import os
import time
import threading
from collections import deque
import torch.nn as nn
import torch.nn.functional as F
warnings.filterwarnings('ignore')
class CNNTransformerHybrid(nn.Module):
    """CNN + Transformer sequence classifier (kept identical to the training-time model).

    Takes input of shape ``(batch, seq_len, input_size)`` and returns logits
    of shape ``(batch, num_classes)``.  Attribute names and layer order define
    the ``state_dict`` keys, so they must not be renamed or reordered.
    """

    def __init__(self, input_size=258, num_classes=51, dropout=0.35):
        super().__init__()

        # Convolutional front-end: three Conv/BatchNorm/GELU/Dropout stages
        # for local features, then pooling to a fixed 16 time steps.
        conv_stages = []
        for in_channels, out_channels in ((input_size, 128), (128, 256), (256, 256)):
            conv_stages += [
                nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm1d(out_channels),
                nn.GELU(),
                nn.Dropout(dropout),
            ]
        conv_stages.append(nn.AdaptiveAvgPool1d(16))
        self.cnn = nn.Sequential(*conv_stages)

        # Learned positional embedding, one vector per pooled time step.
        self.pos_embed = nn.Parameter(torch.randn(1, 16, 256))
        self.pos_dropout = nn.Dropout(dropout)

        # Pre-norm Transformer encoder for temporal dependencies.
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=256,
                nhead=8,
                dim_feedforward=512,
                dropout=dropout,
                batch_first=True,
                norm_first=True,
                activation='gelu',
            ),
            num_layers=3,
        )

        # Attention pooling: a single learned query attends over all steps.
        self.attention_pool = nn.MultiheadAttention(
            embed_dim=256,
            num_heads=4,
            dropout=dropout,
            batch_first=True,
        )
        self.pool_query = nn.Parameter(torch.randn(1, 1, 256))

        # Classification head: LayerNorm, three hidden Linear/GELU/Dropout
        # stages (the last with half the dropout), then the output layer.
        head = [nn.LayerNorm(256)]
        for in_features, out_features, drop_p in (
            (256, 512, dropout),
            (512, 256, dropout),
            (256, 128, dropout * 0.5),
        ):
            head += [nn.Linear(in_features, out_features), nn.GELU(), nn.Dropout(drop_p)]
        head.append(nn.Linear(128, num_classes))
        self.classifier = nn.Sequential(*head)

        self._init_weights()

    def _init_weights(self):
        """Apply Xavier-uniform init to every parameter with more than one dimension."""
        for weight in (p for p in self.parameters() if p.dim() > 1):
            nn.init.xavier_uniform_(weight)

    def forward(self, x):
        """Map a ``(batch, seq_len, input_size)`` tensor to ``(batch, num_classes)`` logits."""
        n_samples = x.size(0)

        # Conv1d wants channels-first, so swap to (batch, features, time) and back.
        tokens = self.cnn(x.transpose(1, 2)).transpose(1, 2)

        # Positional information, then temporal encoding.
        tokens = self.pos_dropout(tokens + self.pos_embed)
        tokens = self.transformer(tokens)

        # Collapse the time axis with the learned pooling query.
        query = self.pool_query.expand(n_samples, -1, -1)
        pooled, _ = self.attention_pool(query, tokens, tokens)

        return self.classifier(pooled.squeeze(1))
# Class index -> gesture label.  The position of a label in the tuple below
# is its class index, so the order must not change.
# NOTE(review): 50 labels are listed while CNNTransformerHybrid defaults to
# num_classes=51 -- confirm against the training label set.
GESTURE_NAMES = dict(enumerate((
    'abang',
    'anak_lelaki',
    'anak_perempuan',
    'apa',
    'apa_khabar',
    'assalamualaikum',
    'ayah',
    'bagaimana',
    'bahasa_isyarat',
    'baik',
    'bapa_saudara',
    'beli',
    'beli_2',
    'berapa',
    'bila',
    'bomba',
    'buat',
    'emak',
    'emak_saudara',
    'hari',
    'hi',
    'hujan',
    'jahat',
    'jangan',
    'kakak',
    'keluarga',
    'kereta',
    'lelaki',
    'lemak',
    'main',
    'mana',
    'masalah',
    'nasi',
    'nasi_lemak',
    'panas',
    'panas_2',
    'pandai',
    'pandai_2',
    'payung',
    'perempuan',
    'perlahan',
    'perlahan_2',
    'pinjam',
    'polis',
    'pukul',
    'ribut',
    'saudara',
    'sejuk',
    'siapa',
    'tandas',
)))
# Gesture label -> English display text for the Malaysian Sign Language
# vocabulary, plus an 'Unknown' entry.
GESTURE_TRANSLATIONS = dict((
    ('abang', 'Brother'),
    ('anak_lelaki', 'Son'),
    ('anak_perempuan', 'Daughter'),
    ('apa', 'What'),
    ('apa_khabar', 'How are you'),
    ('assalamualaikum', 'Peace be upon you'),
    ('ayah', 'Father'),
    ('bagaimana', 'How'),
    ('bahasa_isyarat', 'Sign Language'),
    ('baik', 'Good'),
    ('bapa_saudara', 'Uncle'),
    ('beli', 'Buy'),
    ('beli_2', 'Buy (variant)'),
    ('berapa', 'How much'),
    ('bila', 'When'),
    ('bomba', 'Firefighter'),
    ('buat', 'Do/Make'),
    ('emak', 'Mother'),
    ('emak_saudara', 'Aunt'),
    ('hari', 'Day'),
    ('hi', 'Hi'),
    ('hujan', 'Rain'),
    ('jahat', 'Bad'),
    ('jangan', "Don't"),
    ('kakak', 'Sister'),
    ('keluarga', 'Family'),
    ('kereta', 'Car'),
    ('lelaki', 'Male'),
    ('lemak', 'Fat'),
    ('main', 'Play'),
    ('mana', 'Where'),
    ('masalah', 'Problem'),
    ('nasi', 'Rice'),
    ('nasi_lemak', 'Nasi Lemak'),
    ('panas', 'Hot'),
    ('panas_2', 'Hot (variant)'),
    ('pandai', 'Smart'),
    ('pandai_2', 'Smart (variant)'),
    ('payung', 'Umbrella'),
    ('perempuan', 'Female'),
    ('perlahan', 'Slow'),
    ('perlahan_2', 'Slow (variant)'),
    ('pinjam', 'Borrow'),
    ('polis', 'Police'),
    ('pukul', 'Hit/Time'),
    ('ribut', 'Storm'),
    ('saudara', 'Sibling'),
    ('sejuk', 'Cold'),
    ('siapa', 'Who'),
    ('tandas', 'Toilet'),
    ('Unknown', 'Unknown'),
))
# --- GLOBAL SHARED RESOURCES (Singletons for Resource Optimization) ---
# Held while the MediaPipe landmarkers are created and around each
# detect_for_video call, including the global timestamp update.
_MP_RESOURCES_LOCK = threading.Lock()
# Held while the torch model is loaded and around each forward pass.
_TORCH_MODEL_LOCK = threading.Lock()
# Lazily created singletons; None until the matching _get_shared_* function
# has run successfully.
_SHARED_POSE_LANDMARKER = None
_SHARED_HAND_LANDMARKER = None
_SHARED_TORCH_MODEL = None
# Most recent timestamp (ms) passed to MediaPipe; extract_landmarks bumps
# incoming timestamps past this value so they are strictly increasing.
_GLOBAL_MP_TIMESTAMP_MS = 0
def _get_shared_mediapipe():
    """Return the shared ``(pose_landmarker, hand_landmarker)`` pair, creating it on first use.

    Both landmarkers are built in VIDEO running mode with the GPU delegate
    from ``pose_landmarker_lite.task`` and ``hand_landmarker.task``, looked
    up relative to the current working directory.

    The two module-level singletons are published together only after both
    have been created.  If the hand landmarker fails to build, the pose
    landmarker is closed and nothing is cached, so a later call retries from
    scratch instead of returning a ``(pose, None)`` pair.

    Returns:
        tuple: ``(pose_landmarker, hand_landmarker)``.

    Raises:
        FileNotFoundError: if either ``.task`` file does not exist.
    """
    global _SHARED_POSE_LANDMARKER, _SHARED_HAND_LANDMARKER
    with _MP_RESOURCES_LOCK:
        if _SHARED_POSE_LANDMARKER is None or _SHARED_HAND_LANDMARKER is None:
            print("[INFO] Initializing Shared MediaPipe Landmarkers...")
            BaseOptions = mp.tasks.BaseOptions
            PoseLandmarker = mp.tasks.vision.PoseLandmarker
            PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
            HandLandmarker = mp.tasks.vision.HandLandmarker
            HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions
            VisionRunningMode = mp.tasks.vision.RunningMode

            pose_model_path = 'pose_landmarker_lite.task'
            hand_model_path = 'hand_landmarker.task'
            if not os.path.exists(pose_model_path) or not os.path.exists(hand_model_path):
                raise FileNotFoundError("MediaPipe task files not found.")

            pose_options = PoseLandmarkerOptions(
                base_options=BaseOptions(
                    model_asset_path=pose_model_path,
                    delegate=BaseOptions.Delegate.GPU
                ),
                running_mode=VisionRunningMode.VIDEO,
                min_pose_detection_confidence=0.5,
                min_pose_presence_confidence=0.5,
                min_tracking_confidence=0.5,
                output_segmentation_masks=False
            )
            hand_options = HandLandmarkerOptions(
                base_options=BaseOptions(
                    model_asset_path=hand_model_path,
                    delegate=BaseOptions.Delegate.GPU
                ),
                num_hands=2,
                min_hand_detection_confidence=0.5,
                min_tracking_confidence=0.5,
                running_mode=VisionRunningMode.VIDEO
            )

            # Build into locals first so a failure on the second landmarker
            # cannot leave the globals half-initialised.
            pose_landmarker = PoseLandmarker.create_from_options(pose_options)
            try:
                hand_landmarker = HandLandmarker.create_from_options(hand_options)
            except Exception:
                # Release the pose landmarker we just created, then propagate.
                pose_landmarker.close()
                raise

            _SHARED_POSE_LANDMARKER = pose_landmarker
            _SHARED_HAND_LANDMARKER = hand_landmarker
        return _SHARED_POSE_LANDMARKER, _SHARED_HAND_LANDMARKER
def _get_shared_torch_model(model_path, device):
    """Return the shared gesture classifier, loading it from ``model_path`` on the first call.

    Once a model has been cached, later calls return it without looking at
    ``model_path`` or ``device`` again.

    Args:
        model_path: checkpoint file holding a ``'model_state_dict'`` entry.
        device: ``torch.device`` the model is moved to; on CUDA the model is
            also converted to half precision.

    Returns:
        The cached ``CNNTransformerHybrid`` in eval mode.

    Raises:
        FileNotFoundError: if ``model_path`` does not exist.
    """
    global _SHARED_TORCH_MODEL
    with _TORCH_MODEL_LOCK:
        # Fast path: already loaded.
        if _SHARED_TORCH_MODEL is not None:
            return _SHARED_TORCH_MODEL

        print(f"[INFO] Loading Shared Torch Model: {model_path}")
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")

        network = CNNTransformerHybrid(
            input_size=258,
            num_classes=len(GESTURE_NAMES),
            dropout=0.35,
        )
        network = network.to(device)

        # SECURITY: weights_only=False unpickles arbitrary objects; only load
        # checkpoints from a trusted source.
        saved = torch.load(model_path, map_location=device, weights_only=False)
        network.load_state_dict(saved['model_state_dict'])

        # FP16 weights plus cuDNN autotuning when running on the GPU.
        running_on_gpu = device.type == 'cuda'
        if running_on_gpu:
            network = network.half()
            torch.backends.cudnn.benchmark = True

        network.eval()
        _SHARED_TORCH_MODEL = network
        return _SHARED_TORCH_MODEL
class GestureRecognizer:
    """Per-user gesture recogniser built on the shared MediaPipe and torch singletons.

    Each instance owns only a sliding window of per-frame feature vectors and
    the detection flags of the last processed frame; the landmarkers and the
    classifier are shared and accessed under the module-level locks.
    """

    # Layout of the 258-value feature vector:
    #   [0, 132)   33 pose landmarks  x (x, y, z, visibility)
    #   [132, 195) 21 left-hand landmarks  x (x, y, z)
    #   [195, 258) 21 right-hand landmarks x (x, y, z)
    _FEATURE_SIZE = 258
    _POSE_LANDMARKS = 33
    _HAND_LANDMARKS = 21
    _LEFT_HAND_OFFSET = 132
    _RIGHT_HAND_OFFSET = 195

    def __init__(self, model_path='best_cnn_transformer_hybrid.pth'):
        """Attach to the shared model/landmarkers and create an empty 30-frame window."""
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.sequence_buffer = deque(maxlen=30)
        # Access shared singletons
        self.model = _get_shared_torch_model(model_path, self.device)
        self.pose_landmarker, self.hand_landmarker = _get_shared_mediapipe()
        self.last_hands_detected = False
        self.last_pose_detected = False

    def reset_tracking(self):
        """Clear this instance's frame window; the shared landmarkers are left untouched."""
        self.sequence_buffer.clear()

    @classmethod
    def _landmarks_to_features(cls, pose_result, hand_result):
        """Flatten pose and hand landmarks into the fixed 258-value float32 vector."""
        features = np.zeros(cls._FEATURE_SIZE, dtype=np.float32)

        if pose_result.pose_landmarks:
            # Only the first detected pose is used.
            for i, lm in enumerate(pose_result.pose_landmarks[0][:cls._POSE_LANDMARKS]):
                base = i * 4
                features[base:base + 4] = (lm.x, lm.y, lm.z, lm.visibility)

        if hand_result.hand_landmarks:
            # zip stops at the shorter list, so a hand without a handedness
            # entry is skipped.
            for landmarks, handedness in zip(hand_result.hand_landmarks, hand_result.handedness):
                is_left = handedness[0].category_name == 'Left'
                offset = cls._LEFT_HAND_OFFSET if is_left else cls._RIGHT_HAND_OFFSET
                for j, lm in enumerate(landmarks[:cls._HAND_LANDMARKS]):
                    base = offset + j * 3
                    features[base:base + 3] = (lm.x, lm.y, lm.z)

        return features

    def extract_landmarks(self, frame, timestamp_ms=None):
        """Run pose and hand detection on one BGR frame.

        Args:
            frame: image array as accepted by ``cv2.cvtColor(..., COLOR_BGR2RGB)``.
            timestamp_ms: frame timestamp in milliseconds; defaults to the
                current wall-clock time.  It is bumped if necessary so the
                value handed to MediaPipe is strictly greater than the one
                used for the previous frame of any instance.

        Returns:
            tuple: ``(features, pose_result, hand_result)`` where ``features``
            is a float32 array of length 258.  If MediaPipe raises, the error
            is printed and a zero vector with empty result objects is returned.

        Side effects:
            Updates ``last_pose_detected`` / ``last_hands_detected``; both are
            set to False when detection fails.
        """
        global _GLOBAL_MP_TIMESTAMP_MS
        if timestamp_ms is None:
            timestamp_ms = int(time.time() * 1000)

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

        # Shared block for MediaPipe inference
        with _MP_RESOURCES_LOCK:
            # VIDEO mode requires strictly increasing timestamps, and the
            # landmarkers are shared, so enforce monotonicity globally.
            if timestamp_ms <= _GLOBAL_MP_TIMESTAMP_MS:
                timestamp_ms = _GLOBAL_MP_TIMESTAMP_MS + 1
            _GLOBAL_MP_TIMESTAMP_MS = timestamp_ms

            try:
                pose_result = self.pose_landmarker.detect_for_video(mp_image, timestamp_ms)
                hand_result = self.hand_landmarker.detect_for_video(mp_image, timestamp_ms)
            except Exception as e:
                # Deliberate best-effort: drop this frame and keep the stream alive.
                print(f"MediaPipe Tracking Error (resetting): {e}")
                # Nothing was detected on this frame, so do not leave the
                # previous frame's flags in place.
                self.last_pose_detected = False
                self.last_hands_detected = False
                from types import SimpleNamespace
                return (
                    np.zeros(self._FEATURE_SIZE, dtype=np.float32),
                    SimpleNamespace(pose_landmarks=[]),
                    SimpleNamespace(hand_landmarks=[], handedness=[]),
                )

        # Track detection status
        self.last_pose_detected = bool(pose_result.pose_landmarks)
        self.last_hands_detected = bool(hand_result.hand_landmarks)

        features = self._landmarks_to_features(pose_result, hand_result)
        return features, pose_result, hand_result

    def _classify(self, sequence):
        """Run the shared model on one feature sequence.

        Accepts ``(1, T, 258)`` or an unbatched ``(T, 258)`` array and returns
        ``(prediction_idx, confidence, probabilities)``.
        """
        batch = torch.tensor(sequence, dtype=torch.float32)
        if batch.dim() == 2:
            batch = batch.unsqueeze(0)
        batch = batch.to(self.device)
        # Match model precision (the shared model is FP16 on CUDA).
        if self.device.type == 'cuda':
            batch = batch.half()

        with _TORCH_MODEL_LOCK, torch.no_grad():
            logits = self.model(batch)
            # Softmax in float32 so FP16 logits do not cost precision in the
            # reported probabilities.
            probs = F.softmax(logits, dim=1, dtype=torch.float32)
            conf, pred = torch.max(probs, dim=1)
            return pred.item(), conf.item(), probs[0].cpu().numpy()

    def predict(self, frame, timestamp_ms=None):
        """Process one frame and classify once the window is full.

        Returns:
            dict with ``prediction_idx``, ``confidence``, ``gesture_name``,
            ``pose_result``, ``hand_result`` and ``probabilities``.  Until the
            window holds ``maxlen`` frames the prediction fields are
            ``None`` / ``0.0`` / ``None``.
        """
        features, pose_res, hand_res = self.extract_landmarks(frame, timestamp_ms=timestamp_ms)
        self.sequence_buffer.append(features)

        prediction_idx, confidence, probabilities = None, 0.0, None
        if len(self.sequence_buffer) == self.sequence_buffer.maxlen:
            window = np.array(list(self.sequence_buffer), dtype=np.float32)
            prediction_idx, confidence, probabilities = self._classify(window)

        return {
            'prediction_idx': prediction_idx,
            'confidence': confidence,
            'gesture_name': GESTURE_NAMES.get(prediction_idx) if prediction_idx is not None else None,
            'pose_result': pose_res,
            'hand_result': hand_res,
            'probabilities': probabilities
        }

    def extract_features(self, frame, timestamp_ms=None):
        """Extract 258-dimensional feature vector from a single frame"""
        features, _, _ = self.extract_landmarks(frame, timestamp_ms=timestamp_ms)
        return features

    def predict_from_sequence(self, sequence):
        """Predict a gesture from a pre-built feature sequence.

        Args:
            sequence: numpy array of shape ``(1, 30, 258)``; an unbatched
                ``(30, 258)`` array is also accepted.

        Returns:
            dict with ``prediction_idx``, ``confidence``, ``gesture_name``,
            ``probabilities``, and the ``hands_detected`` / ``pose_detected``
            flags of the most recently processed frame.
        """
        prediction_idx, confidence, probabilities = self._classify(sequence)
        return {
            'prediction_idx': prediction_idx,
            'confidence': float(confidence),
            'gesture_name': GESTURE_NAMES.get(prediction_idx),
            'probabilities': probabilities,
            # getattr keeps the old tolerance for instances built without __init__.
            'hands_detected': getattr(self, 'last_hands_detected', False),
            'pose_detected': getattr(self, 'last_pose_detected', False)
        }

    def draw_landmarks(self, frame, pose_result, hand_result):
        """Return a copy of ``frame`` with pose points in (0, 255, 0) and hand points in (255, 0, 0)."""
        annotated_frame = frame.copy()
        height, width = frame.shape[:2]

        groups = []
        if pose_result.pose_landmarks:
            groups.append((pose_result.pose_landmarks[0], (0, 255, 0)))
        if hand_result.hand_landmarks:
            groups.extend((hand, (255, 0, 0)) for hand in hand_result.hand_landmarks)

        for landmarks, color in groups:
            for landmark in landmarks:
                # Landmark coordinates are scaled by the frame size to get pixels.
                center = (int(landmark.x * width), int(landmark.y * height))
                cv2.circle(annotated_frame, center, 3, color, -1)
        return annotated_frame