import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import gradio as gr
from pathlib import Path
import mediapipe as mp
import json

# MediaPipe setup
mp_pose = mp.solutions.pose
mp_hands = mp.solutions.hands
mp_face_mesh = mp.solutions.face_mesh

# Select device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Label mapping
label_to_idx = {'again': 0, 'all': 1, 'apple': 2, 'bad': 3, 'bathroom': 4, 'beautiful': 5, 'bird': 6, 'black': 7, 'blue': 8, 'book': 9, 'bored': 10, 'boy': 11, 'brother': 12, 'brown': 13, 'but': 14, 'computer': 15, 'cousin': 16, 'dance': 17, 'day': 18, 'deaf': 19, 'doctor': 20, 'dog': 21, 'draw': 22, 'drink': 23, 'eat': 24, 'english': 25, 'family': 26, 'father': 27, 'fine': 28, 'finish': 29, 'fish': 30, 'forget': 31, 'friend': 32, 'girl': 33}
idx_to_label = {v: k for k, v in label_to_idx.items()}
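
# Optionally, the label mapping could be kept in a JSON file instead of being
# hard-coded above. A minimal sketch, assuming a hypothetical "label_map.json"
# next to the app; the app as written does not call this helper.
def load_label_map(path="label_map.json"):
    """Load a {label: index} mapping from a JSON file (hypothetical file name)."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
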
class SignLanguageModel(nn.Module):
    """Sign Language Recognition Model"""

    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5, flow_dim=10):
        super(SignLanguageModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.num_classes = num_classes

        # Keypoint feature projection
        self.keypoint_projection = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)
        )

        # Flow feature projection
        self.flow_projection = nn.Sequential(
            nn.Linear(flow_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim // 2, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2)
        )

        # Feature fusion
        self.fusion_layer = nn.Sequential(
            nn.Linear(hidden_dim + (hidden_dim // 2), hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)
        )

        # Bidirectional LSTM
        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )

        # GRU for additional temporal features
        self.gru = nn.GRU(
            input_size=hidden_dim * 2,
            hidden_size=hidden_dim,
            num_layers=1,
            batch_first=True,
            bidirectional=True
        )

        # Batch normalization
        self.lstm_bn = nn.BatchNorm1d(hidden_dim * 2)
        self.gru_bn = nn.BatchNorm1d(hidden_dim * 2)

        # Multi-head attention
        self.multihead_attn = nn.MultiheadAttention(
            embed_dim=hidden_dim * 2,
            num_heads=4,
            dropout=dropout,
            batch_first=True
        )

        # Attention mechanism
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
            nn.Softmax(dim=1)
        )

        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim * 4, hidden_dim * 2),
            nn.BatchNorm1d(hidden_dim * 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim, num_classes)
        )

        self._init_weights()

    def _init_weights(self):
        """Initialize model weights"""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, (nn.LSTM, nn.GRU)):
                for name, param in m.named_parameters():
                    if 'weight' in name:
                        nn.init.orthogonal_(param)
                    elif 'bias' in name:
                        nn.init.zeros_(param)
    def forward(self, keypoints, flow=None):
        """Forward pass"""
        batch_size, seq_len, _ = keypoints.size()

        # Process keypoint features. The projection Sequential is applied layer by
        # layer so that BatchNorm1d can be fed a (batch, channels, seq) layout.
        kp_reshaped = keypoints.reshape(-1, keypoints.size(-1))

        # First layer
        kp_projected = self.keypoint_projection[0](kp_reshaped)
        kp_projected = kp_projected.reshape(batch_size, seq_len, -1)
        kp_projected = kp_projected.transpose(1, 2)
        kp_projected = self.keypoint_projection[1](kp_projected)
        kp_projected = kp_projected.transpose(1, 2)
        kp_projected = self.keypoint_projection[2](kp_projected)
        kp_projected = self.keypoint_projection[3](kp_projected)

        # Second layer
        kp_projected_reshaped = kp_projected.reshape(-1, kp_projected.size(-1))
        kp_projected = self.keypoint_projection[4](kp_projected_reshaped)
        kp_projected = kp_projected.reshape(batch_size, seq_len, -1)
        kp_projected = kp_projected.transpose(1, 2)
        kp_projected = self.keypoint_projection[5](kp_projected)
        kp_projected = kp_projected.transpose(1, 2)
        kp_projected = self.keypoint_projection[6](kp_projected)
        kp_projected = self.keypoint_projection[7](kp_projected)

        # Process flow features if provided
        if flow is not None:
            flow_reshaped = flow.reshape(-1, flow.size(-1))

            # First layer
            flow_projected = self.flow_projection[0](flow_reshaped)
            flow_projected = flow_projected.reshape(batch_size, seq_len, -1)
            flow_projected = flow_projected.transpose(1, 2)
            flow_projected = self.flow_projection[1](flow_projected)
            flow_projected = flow_projected.transpose(1, 2)
            flow_projected = self.flow_projection[2](flow_projected)
            flow_projected = self.flow_projection[3](flow_projected)

            # Second layer
            flow_projected_reshaped = flow_projected.reshape(-1, flow_projected.size(-1))
            flow_projected = self.flow_projection[4](flow_projected_reshaped)
            flow_projected = flow_projected.reshape(batch_size, seq_len, -1)
            flow_projected = flow_projected.transpose(1, 2)
            flow_projected = self.flow_projection[5](flow_projected)
            flow_projected = flow_projected.transpose(1, 2)
            flow_projected = self.flow_projection[6](flow_projected)
            flow_projected = self.flow_projection[7](flow_projected)

            # Feature fusion
            combined_features = torch.cat([kp_projected, flow_projected], dim=2)
            combined_reshaped = combined_features.reshape(-1, combined_features.size(-1))
            fused_features = self.fusion_layer[0](combined_reshaped)
            fused_features = fused_features.reshape(batch_size, seq_len, -1)
            fused_features = fused_features.transpose(1, 2)
            fused_features = self.fusion_layer[1](fused_features)
            fused_features = fused_features.transpose(1, 2)
            fused_features = self.fusion_layer[2](fused_features)
            fused_features = self.fusion_layer[3](fused_features)
            x_projected = fused_features
        else:
            x_projected = kp_projected

        # Residual connection input
        x_residual = x_projected

        # LSTM processing
        lstm_out, _ = self.lstm(x_projected)

        # Residual connection (duplicate features to match the bidirectional width)
        x_residual_expanded = torch.cat([x_residual, x_residual], dim=2)
        lstm_out_with_residual = lstm_out + x_residual_expanded

        # BatchNorm for LSTM output
        lstm_out_bn = lstm_out_with_residual.transpose(1, 2)
        lstm_out_bn = self.lstm_bn(lstm_out_bn)
        lstm_out = lstm_out_bn.transpose(1, 2)

        # GRU processing
        gru_out, _ = self.gru(lstm_out)

        # BatchNorm for GRU output
        gru_out_bn = gru_out.transpose(1, 2)
        gru_out_bn = self.gru_bn(gru_out_bn)
        gru_out = gru_out_bn.transpose(1, 2)

        # Multi-head attention
        attn_output, _ = self.multihead_attn(lstm_out, lstm_out, lstm_out)

        # Attention pooling over time
        attention_weights = self.attention(gru_out)
        context_gru = torch.bmm(gru_out.transpose(1, 2), attention_weights)
        context_gru = context_gru.squeeze(-1)

        attention_weights_attn = self.attention(attn_output)
        context_attn = torch.bmm(attn_output.transpose(1, 2), attention_weights_attn)
        context_attn = context_attn.squeeze(-1)

        # Combine contexts
        combined_context = torch.cat([context_gru, context_attn], dim=1)

        # Final classification
        output = self.classifier(combined_context)
        return output
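
# A minimal shape sanity check, assuming the training configuration used below
# (sequence length 50, 225 keypoint features, 10 flow features). It is not called
# by the app; it only illustrates the expected tensor shapes through the model.
def _check_model_shapes():
    m = SignLanguageModel(input_dim=225, hidden_dim=256, num_layers=2,
                          num_classes=len(label_to_idx), dropout=0.5, flow_dim=10)
    m.eval()
    with torch.no_grad():
        kp = torch.zeros(2, 50, 225)   # (batch, seq_len, keypoint_dim)
        fl = torch.zeros(2, 50, 10)    # (batch, seq_len, flow_dim)
        out = m(kp, fl)                # expected shape: (2, num_classes)
    print(out.shape)
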
# Initialize the model
model = SignLanguageModel(
    input_dim=225,  # keypoint dimension
    hidden_dim=256,
    num_layers=2,
    num_classes=len(label_to_idx),
    dropout=0.5,
    flow_dim=10
)
model = model.to(device)

# Load model weights
model_path = Path("tsflow/models/best_model.pt")
if model_path.exists():
    try:
        checkpoint = torch.load(model_path, map_location=device)
        if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
            model.load_state_dict(checkpoint['model_state_dict'])
        else:
            model.load_state_dict(checkpoint)
        model.eval()
        print("✅ Model loaded successfully")
    except Exception as e:
        print(f"❌ Failed to load model: {e}")
        raise
else:
    print(f"❌ Model file not found: {model_path}")
    raise FileNotFoundError(f"Model file does not exist: {model_path}")
def extract_keypoints_from_frame(frame):
    """Extract keypoints from a single frame, matching the training pipeline.

    Feature layout: left hand (21 x 3) + right hand (21 x 3) + pose (33 x 3) = 225.
    """
    try:
        with mp.solutions.holistic.Holistic(
                static_image_mode=True,
                model_complexity=1,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as holistic:
            # Convert to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb.flags.writeable = False
            results = holistic.process(frame_rgb)
            frame_rgb.flags.writeable = True

            keypoints = []

            # Left-hand keypoints (21 points x 3 dims = 63)
            if results.left_hand_landmarks:
                for landmark in results.left_hand_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * (21 * 3))

            # Right-hand keypoints (21 points x 3 dims = 63)
            if results.right_hand_landmarks:
                for landmark in results.right_hand_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * (21 * 3))

            # Pose keypoints (33 points x 3 dims = 99)
            if results.pose_landmarks:
                for landmark in results.pose_landmarks.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0] * (33 * 3))

            return np.array(keypoints[:225], dtype=np.float32), results
    except Exception as e:
        print(f"Keypoint extraction error: {e}")
        return np.zeros(225, dtype=np.float32), None
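
# Creating a fresh Holistic instance per frame mirrors the training pipeline but is
# slow. A sketch of an alternative that reuses one instance in tracking mode follows
# (assumption: tracking mode is acceptable for this model). The app keeps the
# per-frame version above so inference matches training exactly; this helper is unused.
def extract_keypoints_with_shared_holistic(frames):
    keypoints_list = []
    with mp.solutions.holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5) as holistic:
        for frame in frames:
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = holistic.process(rgb)
            kp = []
            # Same 225-dim layout: left hand, right hand, pose
            for landmarks, n_points in (
                    (results.left_hand_landmarks, 21),
                    (results.right_hand_landmarks, 21),
                    (results.pose_landmarks, 33)):
                if landmarks:
                    for lm in landmarks.landmark:
                        kp.extend([lm.x, lm.y, lm.z])
                else:
                    kp.extend([0] * (n_points * 3))
            keypoints_list.append(np.array(kp[:225], dtype=np.float32))
    return keypoints_list
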
def create_hand_mask(frame, left_hand_landmarks, right_hand_landmarks, pose_landmarks):
    """Create an ROI mask over the hands and upper body, matching the training pipeline."""
    h, w = frame.shape[:2]
    mask = np.zeros((h, w), dtype=np.uint8)

    def draw_landmarks_on_mask(landmarks, radius=15):
        if landmarks:
            for landmark in landmarks.landmark:
                x, y = int(landmark.x * w), int(landmark.y * h)
                if 0 <= x < w and 0 <= y < h:
                    cv2.circle(mask, (x, y), radius=radius, color=255, thickness=-1)

    # Left-hand keypoints
    draw_landmarks_on_mask(left_hand_landmarks, radius=20)
    # Right-hand keypoints
    draw_landmarks_on_mask(right_hand_landmarks, radius=20)

    # Upper-body pose keypoints only (head, shoulders, arms)
    if pose_landmarks:
        upper_body_indices = list(range(0, 25))  # indices 0-24 cover the upper body
        for idx in upper_body_indices:
            if idx < len(pose_landmarks.landmark):
                landmark = pose_landmarks.landmark[idx]
                x, y = int(landmark.x * w), int(landmark.y * h)
                if 0 <= x < w and 0 <= y < h:
                    cv2.circle(mask, (x, y), radius=10, color=255, thickness=-1)

    # Expand the mask region with a dilation
    kernel = np.ones((15, 15), np.uint8)
    dilated_mask = cv2.dilate(mask, kernel, iterations=1)
    return dilated_mask
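
# A minimal debugging sketch (not part of the pipeline) for inspecting the ROI mask:
#   masked_frame = cv2.bitwise_and(frame, frame, mask=create_hand_mask(frame, lh, rh, pose))
#   cv2.imwrite("mask_debug.png", masked_frame)  # hypothetical output path
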
def compute_regional_optical_flow(prev_frame, curr_frame, mask, downscale=0.5):
    """Compute regional optical-flow features, matching the training pipeline."""
    try:
        # Downscale the frames
        if downscale < 1.0:
            h, w = prev_frame.shape[:2]
            new_h, new_w = int(h * downscale), int(w * downscale)
            prev_small = cv2.resize(prev_frame, (new_w, new_h))
            curr_small = cv2.resize(curr_frame, (new_w, new_h))
            mask_small = cv2.resize(mask, (new_w, new_h))
        else:
            prev_small = prev_frame
            curr_small = curr_frame
            mask_small = mask

        # Convert to grayscale
        prev_gray = cv2.cvtColor(prev_small, cv2.COLOR_BGR2GRAY)
        curr_gray = cv2.cvtColor(curr_small, cv2.COLOR_BGR2GRAY)

        # Dense optical flow (Farneback)
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, curr_gray,
            None,  # flow
            0.5,   # pyr_scale
            3,     # levels
            15,    # winsize
            3,     # iterations
            5,     # poly_n
            1.2,   # poly_sigma
            0      # flags
        )

        # Boolean mask for the ROI
        bool_mask = mask_small > 0

        # Compute flow statistics inside the ROI only
        if np.any(bool_mask):
            # Flow components in x and y
            fx = flow[..., 0][bool_mask]
            fy = flow[..., 1][bool_mask]
            # Statistical features (10 values)
            flow_features = np.array([
                np.mean(fx), np.std(fx),
                np.mean(fy), np.std(fy),
                np.percentile(fx, 25), np.percentile(fx, 75),
                np.percentile(fy, 25), np.percentile(fy, 75),
                np.max(np.abs(fx)), np.max(np.abs(fy))
            ], dtype=np.float16)
        else:
            flow_features = np.zeros(10, dtype=np.float16)

        return flow_features
    except Exception as e:
        print(f"Regional optical-flow error: {e}")
        return np.zeros(10, dtype=np.float16)
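
# A minimal usage sketch with synthetic frames, assuming 480x640 BGR inputs and a
# full-frame mask; it only illustrates the call and the 10-dim output, and is unused.
def _flow_features_demo():
    prev = np.zeros((480, 640, 3), dtype=np.uint8)
    curr = np.zeros((480, 640, 3), dtype=np.uint8)
    curr[200:240, 300:340] = 255                     # a small bright patch that "appears"
    roi = np.full((480, 640), 255, dtype=np.uint8)   # treat the whole frame as ROI
    feats = compute_regional_optical_flow(prev, curr, roi)
    print(feats.shape)  # expected: (10,)
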
def predict_sign_language(video_path):
    """Run sign-language prediction on a video file."""
    try:
        cap = cv2.VideoCapture(video_path)
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
        cap.release()

        if len(frames) == 0:
            return "Error: could not read any video frames", 0.0

        # Extract features: keypoints plus the raw MediaPipe results for each frame
        keypoints_sequence = []
        all_results = []
        for frame in frames:
            keypoints, results = extract_keypoints_from_frame(frame)
            keypoints_sequence.append(keypoints)
            all_results.append(results)

        # Compute optical-flow features for each consecutive frame pair
        flow_features = []
        for i in range(len(frames) - 1):
            # Build the ROI mask from the current frame's MediaPipe results
            current_results = all_results[i]
            if current_results is not None:
                mask = create_hand_mask(
                    frames[i],
                    current_results.left_hand_landmarks,
                    current_results.right_hand_landmarks,
                    current_results.pose_landmarks
                )
            else:
                # No MediaPipe results: fall back to an empty mask
                h, w = frames[i].shape[:2]
                mask = np.zeros((h, w), dtype=np.uint8)
            flow = compute_regional_optical_flow(frames[i], frames[i + 1], mask)
            flow_features.append(flow)

        # Keep the flow sequence the same length as the keypoint sequence
        if len(flow_features) < len(keypoints_sequence):
            # Pad by repeating the last flow feature (or zeros if none exist)
            while len(flow_features) < len(keypoints_sequence):
                if flow_features:
                    flow_features.append(flow_features[-1])
                else:
                    flow_features.append(np.zeros(10, dtype=np.float16))

        # Force the sequence length to 50 frames, as used during training
        target_length = 50
        if len(keypoints_sequence) > target_length:
            # Uniformly sample keypoints and flow features
            indices = np.linspace(0, len(keypoints_sequence) - 1, target_length, dtype=int)
            keypoints_sequence = [keypoints_sequence[i] for i in indices]
            flow_features = [flow_features[min(i, len(flow_features) - 1)] for i in indices]
        elif len(keypoints_sequence) < target_length:
            # Pad by repeating the last frame
            while len(keypoints_sequence) < target_length:
                if keypoints_sequence:
                    keypoints_sequence.append(keypoints_sequence[-1])
                    flow_features.append(flow_features[-1] if flow_features else np.zeros(10, dtype=np.float16))
                else:
                    keypoints_sequence.append(np.zeros(225, dtype=np.float32))
                    flow_features.append(np.zeros(10, dtype=np.float16))

        # Convert to numpy arrays first, then to tensors (avoids a PyTorch warning)
        keypoints_array = np.array(keypoints_sequence, dtype=np.float32)
        flow_array = np.array(flow_features, dtype=np.float32)
        keypoints_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(device)
        flow_tensor = torch.from_numpy(flow_array).unsqueeze(0).to(device)

        print(f"Keypoint tensor shape: {keypoints_tensor.shape}")
        print(f"Flow tensor shape: {flow_tensor.shape}")

        with torch.no_grad():
            outputs = model(keypoints_tensor, flow_tensor)
            probabilities = torch.softmax(outputs, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
            confidence = probabilities[0][predicted_class].item()

        predicted_label = idx_to_label.get(predicted_class, "unknown")
        return f"Prediction: {predicted_label}", confidence
    except Exception as e:
        print(f"Prediction error: {e}")
        return f"Prediction failed: {str(e)}", 0.0
def gradio_predict(video):
    """Prediction wrapper for the Gradio interface."""
    if video is None:
        return "Please upload a video", "Confidence: 0%"
    try:
        result, confidence = predict_sign_language(video)
        confidence_text = f"Confidence: {confidence:.2%}"
        return result, confidence_text
    except Exception as e:
        return f"Processing error: {str(e)}", "Confidence: 0%"
# Build the Gradio interface
demo = gr.Interface(
    fn=gradio_predict,
    inputs=gr.Video(label="Upload a sign language video"),
    outputs=[
        gr.Textbox(label="Prediction"),
        gr.Textbox(label="Confidence")
    ],
    title="🤟 SignView2.0 - Sign Language Recognition System",
    description="""
    ### Welcome to the SignView2.0 sign language recognition system!

    **Highlights:**
    - 🎯 Accuracy: 94.25%
    - 📚 Supports 34 sign vocabulary items
    - 🧠 BiLSTM + GRU + multi-head attention
    - 👁️ MediaPipe keypoints fused with optical-flow features

    **How to use:**
    1. Upload a sign language video (3-4 seconds recommended)
    2. Click Submit to run recognition
    3. Review the prediction and its confidence

    **Supported vocabulary:** again, all, apple, bad, bathroom, beautiful, bird, black, blue, book, bored, boy, brother, brown, but, computer, cousin, dance, day, deaf, doctor, dog, draw, drink, eat, english, family, father, fine, finish, fish, forget, friend, girl
    """,
    examples=[]
)
if __name__ == "__main__":
    demo.launch()
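    # When running locally (outside Spaces), Gradio can also expose a temporary
    # public URL via demo.launch(share=True); the default above serves locally only.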