import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import gradio as gr
from pathlib import Path
import mediapipe as mp
import json

# MediaPipe setup
mp_pose = mp.solutions.pose
mp_hands = mp.solutions.hands
mp_face_mesh = mp.solutions.face_mesh

# Select device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用設備: {device}")

# Label mapping (class name -> index)
label_to_idx = {
    'again': 0, 'all': 1, 'apple': 2, 'bad': 3, 'bathroom': 4, 'beautiful': 5,
    'bird': 6, 'black': 7, 'blue': 8, 'book': 9, 'bored': 10, 'boy': 11,
    'brother': 12, 'brown': 13, 'but': 14, 'computer': 15, 'cousin': 16,
    'dance': 17, 'day': 18, 'deaf': 19, 'doctor': 20, 'dog': 21, 'draw': 22,
    'drink': 23, 'eat': 24, 'english': 25, 'family': 26, 'father': 27,
    'fine': 28, 'finish': 29, 'fish': 30, 'forget': 31, 'friend': 32, 'girl': 33
}
idx_to_label = {v: k for k, v in label_to_idx.items()}


class SignLanguageModel(nn.Module):
    """Sign Language Recognition Model"""

    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5, flow_dim=10):
        super(SignLanguageModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.num_classes = num_classes

        # Keypoint feature projection
        self.keypoint_projection = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)
        )

        # Flow feature projection
        self.flow_projection = nn.Sequential(
            nn.Linear(flow_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim // 2, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2)
        )

        # Feature fusion
        self.fusion_layer = nn.Sequential(
            nn.Linear(hidden_dim + (hidden_dim // 2), hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)
        )

        # Bidirectional LSTM
        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )

        # GRU for additional temporal features
        self.gru = nn.GRU(
            input_size=hidden_dim * 2,
            hidden_size=hidden_dim,
            num_layers=1,
            batch_first=True,
            bidirectional=True
        )

        # Batch normalization
        self.lstm_bn = nn.BatchNorm1d(hidden_dim * 2)
        self.gru_bn = nn.BatchNorm1d(hidden_dim * 2)

        # Multi-head attention
        self.multihead_attn = nn.MultiheadAttention(
            embed_dim=hidden_dim * 2,
            num_heads=4,
            dropout=dropout,
            batch_first=True
        )

        # Attention mechanism
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
            nn.Softmax(dim=1)
        )

        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim * 4, hidden_dim * 2),
            nn.BatchNorm1d(hidden_dim * 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim, num_classes)
        )

        self._init_weights()

    def _init_weights(self):
        """Initialize model weights"""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, (nn.LSTM, nn.GRU)):
                for name, param in m.named_parameters():
                    if 'weight' in name:
                        nn.init.orthogonal_(param)
                    elif 'bias' in name:
                        nn.init.zeros_(param)

    def forward(self, keypoints, flow=None):
        """Forward pass"""
        batch_size, seq_len, _ = keypoints.size()

        # Process keypoint features
        kp_reshaped = keypoints.reshape(-1, keypoints.size(-1))
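        # The layers of self.keypoint_projection are indexed and applied one by one
        # rather than called as a single Sequential: nn.BatchNorm1d expects (N, C)
        # or (N, C, L) input, so the (batch, seq, feat) tensor is flattened for the
        # Linear layers and transposed to (batch, feat, seq) around each BatchNorm1d.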
        # First layer
        kp_projected = self.keypoint_projection[0](kp_reshaped)
        kp_projected = kp_projected.reshape(batch_size, seq_len, -1)
        kp_projected = kp_projected.transpose(1, 2)
        kp_projected = self.keypoint_projection[1](kp_projected)
        kp_projected = kp_projected.transpose(1, 2)
        kp_projected = self.keypoint_projection[2](kp_projected)
        kp_projected = self.keypoint_projection[3](kp_projected)

        # Second layer
        kp_projected_reshaped = kp_projected.reshape(-1, kp_projected.size(-1))
        kp_projected = self.keypoint_projection[4](kp_projected_reshaped)
        kp_projected = kp_projected.reshape(batch_size, seq_len, -1)
        kp_projected = kp_projected.transpose(1, 2)
        kp_projected = self.keypoint_projection[5](kp_projected)
        kp_projected = kp_projected.transpose(1, 2)
        kp_projected = self.keypoint_projection[6](kp_projected)
        kp_projected = self.keypoint_projection[7](kp_projected)

        # Process flow features if provided
        if flow is not None:
            flow_reshaped = flow.reshape(-1, flow.size(-1))

            # First layer
            flow_projected = self.flow_projection[0](flow_reshaped)
            flow_projected = flow_projected.reshape(batch_size, seq_len, -1)
            flow_projected = flow_projected.transpose(1, 2)
            flow_projected = self.flow_projection[1](flow_projected)
            flow_projected = flow_projected.transpose(1, 2)
            flow_projected = self.flow_projection[2](flow_projected)
            flow_projected = self.flow_projection[3](flow_projected)

            # Second layer
            flow_projected_reshaped = flow_projected.reshape(-1, flow_projected.size(-1))
            flow_projected = self.flow_projection[4](flow_projected_reshaped)
            flow_projected = flow_projected.reshape(batch_size, seq_len, -1)
            flow_projected = flow_projected.transpose(1, 2)
            flow_projected = self.flow_projection[5](flow_projected)
            flow_projected = flow_projected.transpose(1, 2)
            flow_projected = self.flow_projection[6](flow_projected)
            flow_projected = self.flow_projection[7](flow_projected)

            # Feature fusion
            combined_features = torch.cat([kp_projected, flow_projected], dim=2)
            combined_reshaped = combined_features.reshape(-1, combined_features.size(-1))
            fused_features = self.fusion_layer[0](combined_reshaped)
            fused_features = fused_features.reshape(batch_size, seq_len, -1)
            fused_features = fused_features.transpose(1, 2)
            fused_features = self.fusion_layer[1](fused_features)
            fused_features = fused_features.transpose(1, 2)
            fused_features = self.fusion_layer[2](fused_features)
            fused_features = self.fusion_layer[3](fused_features)
            x_projected = fused_features
        else:
            x_projected = kp_projected

        # Residual connection
        x_residual = x_projected

        # LSTM processing
        lstm_out, _ = self.lstm(x_projected)

        # Residual connection
        x_residual_expanded = torch.cat([x_residual, x_residual], dim=2)
        lstm_out_with_residual = lstm_out + x_residual_expanded

        # BatchNorm for LSTM output
        lstm_out_bn = lstm_out_with_residual.transpose(1, 2)
        lstm_out_bn = self.lstm_bn(lstm_out_bn)
        lstm_out = lstm_out_bn.transpose(1, 2)

        # GRU processing
        gru_out, _ = self.gru(lstm_out)

        # BatchNorm for GRU output
        gru_out_bn = gru_out.transpose(1, 2)
        gru_out_bn = self.gru_bn(gru_out_bn)
        gru_out = gru_out_bn.transpose(1, 2)

        # Multi-head attention
        attn_output, _ = self.multihead_attn(lstm_out, lstm_out, lstm_out)

        # Traditional attention
        attention_weights = self.attention(gru_out)
        context_gru = torch.bmm(gru_out.transpose(1, 2), attention_weights)
        context_gru = context_gru.squeeze(-1)

        attention_weights_attn = self.attention(attn_output)
        context_attn = torch.bmm(attn_output.transpose(1, 2), attention_weights_attn)
        context_attn = context_attn.squeeze(-1)

        # Combine contexts
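        # context_gru and context_attn are each (batch, hidden_dim * 2), so the
        # concatenation below yields (batch, hidden_dim * 4), matching the input
        # size of the first Linear layer in self.classifier.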
        combined_context = torch.cat([context_gru, context_attn], dim=1)

        # Final classification
        output = self.classifier(combined_context)
        return output


# Initialize the model
model = SignLanguageModel(
    input_dim=225,  # keypoint dimension
    hidden_dim=256,
    num_layers=2,
    num_classes=len(label_to_idx),
    dropout=0.5,
    flow_dim=10
)
model = model.to(device)

# Load model weights
model_path = Path("tsflow/models/best_model.pt")
if model_path.exists():
    try:
        checkpoint = torch.load(model_path, map_location=device)
        if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
            model.load_state_dict(checkpoint['model_state_dict'])
        else:
            model.load_state_dict(checkpoint)
        model.eval()
        print("✅ 模型載入成功")
    except Exception as e:
        print(f"❌ 模型載入失敗: {e}")
        raise
else:
    print(f"❌ 找不到模型檔案: {model_path}")
    raise FileNotFoundError(f"模型檔案不存在: {model_path}")


def extract_keypoints_from_frame(frame):
    """Extract keypoints from a single frame - consistent with training"""
    try:
        with mp.solutions.holistic.Holistic(
                static_image_mode=True,
                model_complexity=1,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as holistic:
            # Convert to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb.flags.writeable = False
            results = holistic.process(frame_rgb)
            frame_rgb.flags.writeable = True

            keypoints = []

            # Left-hand keypoints (21 points * 3 dims = 63)
            if results.left_hand_landmarks:
                for landmark in results.left_hand_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * (21 * 3))

            # Right-hand keypoints (21 points * 3 dims = 63)
            if results.right_hand_landmarks:
                for landmark in results.right_hand_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * (21 * 3))

            # Pose keypoints (33 points * 3 dims = 99)
            if results.pose_landmarks:
                for landmark in results.pose_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * (33 * 3))

            return np.array(keypoints[:225], dtype=np.float32), results
    except Exception as e:
        print(f"關鍵點提取錯誤: {e}")
        return np.zeros(225, dtype=np.float32), None


def create_hand_mask(frame, left_hand_landmarks, right_hand_landmarks, pose_landmarks):
    """Create an ROI mask for the hands and upper body - consistent with training"""
    h, w = frame.shape[:2]
    mask = np.zeros((h, w), dtype=np.uint8)

    def draw_landmarks_on_mask(landmarks, radius=15):
        if landmarks:
            for landmark in landmarks.landmark:
                x, y = int(landmark.x * w), int(landmark.y * h)
                if 0 <= x < w and 0 <= y < h:
                    cv2.circle(mask, (x, y), radius=radius, color=255, thickness=-1)

    # Draw left-hand keypoints
    draw_landmarks_on_mask(left_hand_landmarks, radius=20)
    # Draw right-hand keypoints
    draw_landmarks_on_mask(right_hand_landmarks, radius=20)

    # Draw upper-body keypoints only (head, shoulders, arms)
    if pose_landmarks:
        upper_body_indices = list(range(0, 25))  # indices 0-24 are upper-body keypoints
        for idx in upper_body_indices:
            if idx < len(pose_landmarks.landmark):
                landmark = pose_landmarks.landmark[idx]
                x, y = int(landmark.x * w), int(landmark.y * h)
                if 0 <= x < w and 0 <= y < h:
                    cv2.circle(mask, (x, y), radius=10, color=255, thickness=-1)

    # Enlarge the mask region with a dilation operation
    kernel = np.ones((15, 15), np.uint8)
    dilated_mask = cv2.dilate(mask, kernel, iterations=1)
    return dilated_mask


def compute_regional_optical_flow(prev_frame, curr_frame, mask, downscale=0.5):
    """Compute regional optical-flow features - consistent with training"""
    try:
        # Reduce resolution
        if downscale < 1.0:
            h, w = prev_frame.shape[:2]
            new_h, new_w = int(h * downscale), int(w * downscale)
            prev_small = cv2.resize(prev_frame, (new_w, new_h))
            curr_small = cv2.resize(curr_frame, (new_w, new_h))
            mask_small = cv2.resize(mask, (new_w, new_h))
        else:
            prev_small = prev_frame
            curr_small = curr_frame
            mask_small = mask

        # Convert to grayscale
        prev_gray = cv2.cvtColor(prev_small, cv2.COLOR_BGR2GRAY)
        curr_gray = cv2.cvtColor(curr_small, cv2.COLOR_BGR2GRAY)

        # Compute optical flow for the masked region
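        # Note: Farneback flow is dense and is computed over the whole (downscaled)
        # frame; the ROI mask is only applied afterwards when the statistics are
        # aggregated below.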
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, curr_gray,
            None,  # flow
            0.5,   # pyr_scale
            3,     # levels
            15,    # winsize
            3,     # iterations
            5,     # poly_n
            1.2,   # poly_sigma
            0      # flags
        )

        # Convert mask_small to a boolean mask
        bool_mask = mask_small > 0

        # Only compute flow features inside the masked region
        if np.any(bool_mask):
            # Extract the x- and y-direction flow
            fx = flow[..., 0][bool_mask]
            fy = flow[..., 1][bool_mask]

            # Compute statistical features
            flow_features = np.array([
                np.mean(fx), np.std(fx),
                np.mean(fy), np.std(fy),
                np.percentile(fx, 25), np.percentile(fx, 75),
                np.percentile(fy, 25), np.percentile(fy, 75),
                np.max(np.abs(fx)), np.max(np.abs(fy))
            ], dtype=np.float16)
        else:
            flow_features = np.zeros(10, dtype=np.float16)

        return flow_features
    except Exception as e:
        print(f"區域性光流計算錯誤: {e}")
        return np.zeros(10, dtype=np.float16)


def predict_sign_language(video_path):
    """Predict the sign shown in a video"""
    try:
        cap = cv2.VideoCapture(video_path)
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
        cap.release()

        if len(frames) == 0:
            return "錯誤:無法讀取影片幀", 0.0

        # Extract features - collect keypoints and the MediaPipe results together
        keypoints_sequence = []
        all_results = []
        for frame in frames:
            keypoints, results = extract_keypoints_from_frame(frame)
            keypoints_sequence.append(keypoints)
            all_results.append(results)

        # Compute optical-flow features for each frame pair
        flow_features = []
        for i in range(len(frames) - 1):
            # Build the mask from the current frame's MediaPipe results
            current_results = all_results[i]
            if current_results is not None:
                mask = create_hand_mask(
                    frames[i],
                    current_results.left_hand_landmarks,
                    current_results.right_hand_landmarks,
                    current_results.pose_landmarks
                )
            else:
                # No MediaPipe result: fall back to an empty mask
                h, w = frames[i].shape[:2]
                mask = np.zeros((h, w), dtype=np.uint8)
            flow = compute_regional_optical_flow(frames[i], frames[i + 1], mask)
            flow_features.append(flow)

        # Keep the number of flow frames consistent with the keypoint frames
        if len(flow_features) < len(keypoints_sequence):
            # If there are fewer flow frames, repeat the last flow feature
            while len(flow_features) < len(keypoints_sequence):
                if flow_features:
                    flow_features.append(flow_features[-1])
                else:
                    flow_features.append(np.zeros(10, dtype=np.float16))

        # Force the sequence length to 50 (consistent with training)
        target_length = 50
        if len(keypoints_sequence) > target_length:
            # Uniformly sample keypoints and flow features
            indices = np.linspace(0, len(keypoints_sequence) - 1, target_length, dtype=int)
            keypoints_sequence = [keypoints_sequence[i] for i in indices]
            flow_features = [flow_features[min(i, len(flow_features) - 1)] for i in indices]
        elif len(keypoints_sequence) < target_length:
            # Repeat the last frame
            while len(keypoints_sequence) < target_length:
                if keypoints_sequence:
                    keypoints_sequence.append(keypoints_sequence[-1])
                    flow_features.append(flow_features[-1] if flow_features else np.zeros(10, dtype=np.float16))
                else:
                    keypoints_sequence.append(np.zeros(225, dtype=np.float32))
                    flow_features.append(np.zeros(10, dtype=np.float16))

        # Convert to numpy arrays first, then to tensors (avoids a warning)
        keypoints_array = np.array(keypoints_sequence, dtype=np.float32)
        flow_array = np.array(flow_features, dtype=np.float32)
        keypoints_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(device)
        flow_tensor = torch.from_numpy(flow_array).unsqueeze(0).to(device)

        print(f"關鍵點張量形狀: {keypoints_tensor.shape}")
        print(f"光流張量形狀: {flow_tensor.shape}")

        with torch.no_grad():
            outputs = model(keypoints_tensor, flow_tensor)
            probabilities = torch.softmax(outputs, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
            confidence = probabilities[0][predicted_class].item()

        predicted_label = idx_to_label.get(predicted_class, "未知")
        return f"預測結果: {predicted_label}", confidence
    except Exception as e:
        print(f"預測錯誤: {e}")
        return f"預測失敗: {str(e)}", 0.0


def gradio_predict(video):
    """Prediction function for the Gradio interface"""
    if video is None:
        return "請上傳影片", "信心度: 0%"
    try:
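        # gr.Video passes the uploaded clip as a file path by default, which
        # predict_sign_language hands straight to cv2.VideoCapture.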
        result, confidence = predict_sign_language(video)
        confidence_text = f"信心度: {confidence:.2%}"
        return result, confidence_text
    except Exception as e:
        return f"處理錯誤: {str(e)}", "信心度: 0%"


# Build the Gradio interface
demo = gr.Interface(
    fn=gradio_predict,
    inputs=gr.Video(label="上傳手語影片"),
    outputs=[
        gr.Textbox(label="預測結果"),
        gr.Textbox(label="信心度")
    ],
    title="🤟 SignView2.0 - 手語辨識系統",
    description="""
    ### 歡迎使用 SignView2.0 手語辨識系統!

    **系統特色:**
    - 🎯 準確率:94.25%
    - 📚 支援34種手語詞彙
    - 🧠 使用BiLSTM + GRU + 多頭注意力機制
    - 👁️ MediaPipe + 光流特徵融合

    **使用方法:**
    1. 上傳手語影片(建議3-4秒)
    2. 點擊提交進行辨識
    3. 查看預測結果和信心度

    **支援詞彙:**
    again, all, apple, bad, bathroom, beautiful, bird, black, blue, book, bored, boy, brother, brown, but,
    computer, cousin, dance, day, deaf, doctor, dog, draw, drink, eat, english, family, father, fine,
    finish, fish, forget, friend, girl
    """,
    examples=[]
)

if __name__ == "__main__":
    demo.launch()
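# A minimal sketch of calling the predictor without the web UI, e.g. for a quick
# smoke test; "sample_sign.mp4" is only a placeholder path, not a file shipped
# with this project:
#
#     label, conf = predict_sign_language("sample_sign.mp4")
#     print(label, f"{conf:.2%}")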