# SignView2.0 / app.py
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import gradio as gr
from pathlib import Path
import mediapipe as mp
# MediaPipe setup: the holistic solution bundles pose, hand, and face tracking
mp_holistic = mp.solutions.holistic
# Select compute device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# Label-to-index mapping (34 glosses, matching training)
label_to_idx = {'again': 0, 'all': 1, 'apple': 2, 'bad': 3, 'bathroom': 4, 'beautiful': 5, 'bird': 6, 'black': 7, 'blue': 8, 'book': 9, 'bored': 10, 'boy': 11, 'brother': 12, 'brown': 13, 'but': 14, 'computer': 15, 'cousin': 16, 'dance': 17, 'day': 18, 'deaf': 19, 'doctor': 20, 'dog': 21, 'draw': 22, 'drink': 23, 'eat': 24, 'english': 25, 'family': 26, 'father': 27, 'fine': 28, 'finish': 29, 'fish': 30, 'forget': 31, 'friend': 32, 'girl': 33}
idx_to_label = {v: k for k, v in label_to_idx.items()}
class SignLanguageModel(nn.Module):
"""Sign Language Recognition Model"""
def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5, flow_dim=10):
super(SignLanguageModel, self).__init__()
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.num_classes = num_classes
# Keypoint feature projection
self.keypoint_projection = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(dropout/2),
nn.Linear(hidden_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(dropout/2)
)
# Flow feature projection
self.flow_projection = nn.Sequential(
nn.Linear(flow_dim, hidden_dim // 2),
nn.BatchNorm1d(hidden_dim // 2),
nn.ReLU(),
nn.Dropout(dropout/2),
nn.Linear(hidden_dim // 2, hidden_dim // 2),
nn.BatchNorm1d(hidden_dim // 2),
nn.ReLU(),
nn.Dropout(dropout/2)
)
# Feature fusion
self.fusion_layer = nn.Sequential(
nn.Linear(hidden_dim + (hidden_dim // 2), hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(dropout/2)
)
# Bidirectional LSTM
self.lstm = nn.LSTM(
input_size=hidden_dim,
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0,
bidirectional=True
)
# GRU for additional temporal features
self.gru = nn.GRU(
input_size=hidden_dim * 2,
hidden_size=hidden_dim,
num_layers=1,
batch_first=True,
bidirectional=True
)
# Batch normalization
self.lstm_bn = nn.BatchNorm1d(hidden_dim * 2)
self.gru_bn = nn.BatchNorm1d(hidden_dim * 2)
# Multi-head attention
self.multihead_attn = nn.MultiheadAttention(
embed_dim=hidden_dim * 2,
num_heads=4,
dropout=dropout,
batch_first=True
)
# Attention mechanism
self.attention = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, 1),
nn.Softmax(dim=1)
)
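        # The Softmax over dim=1 (the time axis) turns per-timestep scores
        # into weights that sum to 1 across the sequence, so each attended
        # context computed in forward() is a weighted average over timesteps.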
# Classifier
self.classifier = nn.Sequential(
nn.Linear(hidden_dim * 4, hidden_dim * 2),
nn.BatchNorm1d(hidden_dim * 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim * 2, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(dropout/2),
nn.Linear(hidden_dim, num_classes)
)
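        # The classifier input is hidden_dim * 4 because forward() concatenates
        # two pooled context vectors (the GRU branch and the multi-head
        # attention branch), each of size hidden_dim * 2.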
self._init_weights()
def _init_weights(self):
"""Initialize model weights"""
for m in self.modules():
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, (nn.LSTM, nn.GRU)):
for name, param in m.named_parameters():
if 'weight' in name:
nn.init.orthogonal_(param)
elif 'bias' in name:
nn.init.zeros_(param)
def forward(self, keypoints, flow=None):
"""Forward pass"""
batch_size, seq_len, _ = keypoints.size()
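        # The projection Sequentials below are applied layer by layer (rather
        # than called directly) because nn.BatchNorm1d normalizes over the
        # channel dimension: Linear/ReLU/Dropout run on a flattened
        # (batch * seq, features) view, while the BatchNorm1d layers (indices
        # 1 and 5) need a (batch, features, seq) view, hence the transposes.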
# Process keypoint features
kp_reshaped = keypoints.reshape(-1, keypoints.size(-1))
# First layer
kp_projected = self.keypoint_projection[0](kp_reshaped)
kp_projected = kp_projected.reshape(batch_size, seq_len, -1)
kp_projected = kp_projected.transpose(1, 2)
kp_projected = self.keypoint_projection[1](kp_projected)
kp_projected = kp_projected.transpose(1, 2)
kp_projected = self.keypoint_projection[2](kp_projected)
kp_projected = self.keypoint_projection[3](kp_projected)
# Second layer
kp_projected_reshaped = kp_projected.reshape(-1, kp_projected.size(-1))
kp_projected = self.keypoint_projection[4](kp_projected_reshaped)
kp_projected = kp_projected.reshape(batch_size, seq_len, -1)
kp_projected = kp_projected.transpose(1, 2)
kp_projected = self.keypoint_projection[5](kp_projected)
kp_projected = kp_projected.transpose(1, 2)
kp_projected = self.keypoint_projection[6](kp_projected)
kp_projected = self.keypoint_projection[7](kp_projected)
# Process flow features if provided
if flow is not None:
flow_reshaped = flow.reshape(-1, flow.size(-1))
# First layer
flow_projected = self.flow_projection[0](flow_reshaped)
flow_projected = flow_projected.reshape(batch_size, seq_len, -1)
flow_projected = flow_projected.transpose(1, 2)
flow_projected = self.flow_projection[1](flow_projected)
flow_projected = flow_projected.transpose(1, 2)
flow_projected = self.flow_projection[2](flow_projected)
flow_projected = self.flow_projection[3](flow_projected)
# Second layer
flow_projected_reshaped = flow_projected.reshape(-1, flow_projected.size(-1))
flow_projected = self.flow_projection[4](flow_projected_reshaped)
flow_projected = flow_projected.reshape(batch_size, seq_len, -1)
flow_projected = flow_projected.transpose(1, 2)
flow_projected = self.flow_projection[5](flow_projected)
flow_projected = flow_projected.transpose(1, 2)
flow_projected = self.flow_projection[6](flow_projected)
flow_projected = self.flow_projection[7](flow_projected)
# Feature fusion
combined_features = torch.cat([kp_projected, flow_projected], dim=2)
combined_reshaped = combined_features.reshape(-1, combined_features.size(-1))
fused_features = self.fusion_layer[0](combined_reshaped)
fused_features = fused_features.reshape(batch_size, seq_len, -1)
fused_features = fused_features.transpose(1, 2)
fused_features = self.fusion_layer[1](fused_features)
fused_features = fused_features.transpose(1, 2)
fused_features = self.fusion_layer[2](fused_features)
fused_features = self.fusion_layer[3](fused_features)
x_projected = fused_features
else:
x_projected = kp_projected
# Residual connection
x_residual = x_projected
# LSTM processing
lstm_out, _ = self.lstm(x_projected)
# Residual connection
x_residual_expanded = torch.cat([x_residual, x_residual], dim=2)
lstm_out_with_residual = lstm_out + x_residual_expanded
# BatchNorm for LSTM output
lstm_out_bn = lstm_out_with_residual.transpose(1, 2)
lstm_out_bn = self.lstm_bn(lstm_out_bn)
lstm_out = lstm_out_bn.transpose(1, 2)
# GRU processing
gru_out, _ = self.gru(lstm_out)
# BatchNorm for GRU output
gru_out_bn = gru_out.transpose(1, 2)
gru_out_bn = self.gru_bn(gru_out_bn)
gru_out = gru_out_bn.transpose(1, 2)
# Multi-head attention
attn_output, _ = self.multihead_attn(lstm_out, lstm_out, lstm_out)
# Traditional attention
attention_weights = self.attention(gru_out)
context_gru = torch.bmm(gru_out.transpose(1, 2), attention_weights)
context_gru = context_gru.squeeze(-1)
attention_weights_attn = self.attention(attn_output)
context_attn = torch.bmm(attn_output.transpose(1, 2), attention_weights_attn)
context_attn = context_attn.squeeze(-1)
# Combine contexts
combined_context = torch.cat([context_gru, context_attn], dim=1)
# Final classification
output = self.classifier(combined_context)
return output
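
# Minimal shape sanity check (a sketch; not executed at import time): with
# the hyperparameters used below, a (batch, 50, 225) keypoint tensor and a
# (batch, 50, 10) flow tensor should produce (batch, 34) logits:
#
#     m = SignLanguageModel(input_dim=225, hidden_dim=256, num_layers=2,
#                           num_classes=34, dropout=0.5, flow_dim=10).eval()
#     logits = m(torch.randn(2, 50, 225), torch.randn(2, 50, 10))
#     assert logits.shape == (2, 34)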
# Initialize the model
model = SignLanguageModel(
    input_dim=225,   # keypoint dim: 2 hands x 21 landmarks x 3 + 33 pose landmarks x 3
hidden_dim=256,
num_layers=2,
num_classes=len(label_to_idx),
dropout=0.5,
flow_dim=10
)
model = model.to(device)
# Load model weights
model_path = Path("tsflow/models/best_model.pt")
if model_path.exists():
try:
checkpoint = torch.load(model_path, map_location=device)
if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
model.load_state_dict(checkpoint['model_state_dict'])
else:
model.load_state_dict(checkpoint)
model.eval()
print("✅ 模型載入成功")
except Exception as e:
print(f"❌ 模型載入失敗: {e}")
raise
else:
print(f"❌ 找不到模型檔案: {model_path}")
raise FileNotFoundError(f"模型檔案不存在: {model_path}")
def extract_keypoints_from_frame(frame):
"""從單個frame提取關鍵點 - 與訓練時一致"""
try:
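        # Note: constructing a new Holistic instance per frame is simple but
        # slow; kept here to mirror the training-time extraction exactly.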
        with mp_holistic.Holistic(
static_image_mode=True,
model_complexity=1,
min_detection_confidence=0.5,
min_tracking_confidence=0.5) as holistic:
            # Convert BGR (OpenCV) to RGB (MediaPipe expects RGB)
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_rgb.flags.writeable = False
results = holistic.process(frame_rgb)
frame_rgb.flags.writeable = True
keypoints = []
            # Left-hand keypoints (21 landmarks * 3 dims = 63)
if results.left_hand_landmarks:
for landmark in results.left_hand_landmarks.landmark:
keypoints.extend([landmark.x, landmark.y, landmark.z])
else:
keypoints.extend([0] * (21 * 3))
            # Right-hand keypoints (21 landmarks * 3 dims = 63)
if results.right_hand_landmarks:
for landmark in results.right_hand_landmarks.landmark:
keypoints.extend([landmark.x, landmark.y, landmark.z])
else:
keypoints.extend([0] * (21 * 3))
            # Pose keypoints (33 landmarks * 3 dims = 99)
if results.pose_landmarks:
for landmark in results.pose_landmarks.landmark:
keypoints.extend([landmark.x, landmark.y, landmark.z])
else:
keypoints.extend([0] * (33 * 3))
return np.array(keypoints[:225], dtype=np.float32), results
except Exception as e:
print(f"關鍵點提取錯誤: {e}")
return np.zeros(225, dtype=np.float32), None
def create_hand_mask(frame, left_hand_landmarks, right_hand_landmarks, pose_landmarks):
"""創建手部和上半身的ROI遮罩 - 與訓練時一致"""
h, w = frame.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
def draw_landmarks_on_mask(landmarks, radius=15):
if landmarks:
for landmark in landmarks.landmark:
x, y = int(landmark.x * w), int(landmark.y * h)
if 0 <= x < w and 0 <= y < h:
cv2.circle(mask, (x, y), radius=radius, color=255, thickness=-1)
    # Draw left-hand keypoints
    draw_landmarks_on_mask(left_hand_landmarks, radius=20)
    # Draw right-hand keypoints
    draw_landmarks_on_mask(right_hand_landmarks, radius=20)
    # Draw upper-body pose keypoints only (head, shoulders, arms)
    if pose_landmarks:
        upper_body_indices = list(range(0, 25))  # indices 0-24 are the upper-body landmarks
for idx in upper_body_indices:
if idx < len(pose_landmarks.landmark):
landmark = pose_landmarks.landmark[idx]
x, y = int(landmark.x * w), int(landmark.y * h)
if 0 <= x < w and 0 <= y < h:
cv2.circle(mask, (x, y), radius=10, color=255, thickness=-1)
    # Enlarge the masked region with a dilation operation
kernel = np.ones((15, 15), np.uint8)
dilated_mask = cv2.dilate(mask, kernel, iterations=1)
return dilated_mask
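
# The mask marks dilated discs around the detected hand and upper-body
# landmarks; compute_regional_optical_flow() below uses it to restrict the
# flow statistics to these regions.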
def compute_regional_optical_flow(prev_frame, curr_frame, mask, downscale=0.5):
"""計算區域性光流特徵 - 與訓練時一致"""
try:
        # Downscale for speed
if downscale < 1.0:
h, w = prev_frame.shape[:2]
new_h, new_w = int(h * downscale), int(w * downscale)
prev_small = cv2.resize(prev_frame, (new_w, new_h))
curr_small = cv2.resize(curr_frame, (new_w, new_h))
mask_small = cv2.resize(mask, (new_w, new_h))
else:
prev_small = prev_frame
curr_small = curr_frame
mask_small = mask
        # Convert to grayscale
prev_gray = cv2.cvtColor(prev_small, cv2.COLOR_BGR2GRAY)
curr_gray = cv2.cvtColor(curr_small, cv2.COLOR_BGR2GRAY)
        # Dense Farneback optical flow (masked statistics are taken below)
flow = cv2.calcOpticalFlowFarneback(
prev_gray, curr_gray,
None, # flow
0.5, # pyr_scale
3, # levels
15, # winsize
3, # iterations
5, # poly_n
1.2, # poly_sigma
0 # flags
)
        # Convert mask_small to a boolean mask
bool_mask = mask_small > 0
        # Gather flow statistics only over the masked region
if np.any(bool_mask):
            # Flow components in x and y
fx = flow[..., 0][bool_mask]
fy = flow[..., 1][bool_mask]
            # Statistical summary features
flow_features = np.array([
np.mean(fx), np.std(fx),
np.mean(fy), np.std(fy),
np.percentile(fx, 25), np.percentile(fx, 75),
np.percentile(fy, 25), np.percentile(fy, 75),
np.max(np.abs(fx)), np.max(np.abs(fy))
], dtype=np.float16)
else:
flow_features = np.zeros(10, dtype=np.float16)
return flow_features
except Exception as e:
print(f"區域性光流計算錯誤: {e}")
return np.zeros(10, dtype=np.float16)
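
# The 10-dim flow descriptor returned above is, in order: mean and std of the
# x-flow, mean and std of the y-flow, the 25th/75th percentiles of each
# component, and the maximum absolute x- and y-flow, all computed only over
# the masked (hand / upper-body) pixels.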
def predict_sign_language(video_path):
"""預測手語影片"""
try:
cap = cv2.VideoCapture(video_path)
frames = []
while True:
ret, frame = cap.read()
if not ret:
break
frames.append(frame)
cap.release()
if len(frames) == 0:
return "錯誤:無法讀取影片幀", 0.0
        # Extract features: keypoints plus the raw MediaPipe results per frame
keypoints_sequence = []
all_results = []
for frame in frames:
keypoints, results = extract_keypoints_from_frame(frame)
keypoints_sequence.append(keypoints)
all_results.append(results)
        # Compute optical-flow features for each consecutive frame pair
flow_features = []
for i in range(len(frames) - 1):
            # Build the ROI mask from the current frame's MediaPipe results
current_results = all_results[i]
if current_results is not None:
mask = create_hand_mask(
frames[i],
current_results.left_hand_landmarks,
current_results.right_hand_landmarks,
current_results.pose_landmarks
)
else:
                # No MediaPipe results: fall back to an empty mask
h, w = frames[i].shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
flow = compute_regional_optical_flow(frames[i], frames[i + 1], mask)
flow_features.append(flow)
        # Align flow length with the keypoint sequence (N frames yield N-1 flows)
if len(flow_features) < len(keypoints_sequence):
            # Pad by repeating the last flow feature
while len(flow_features) < len(keypoints_sequence):
if flow_features:
flow_features.append(flow_features[-1])
else:
flow_features.append(np.zeros(10, dtype=np.float16))
        # Normalize the sequence length to 50 frames (matches training)
target_length = 50
if len(keypoints_sequence) > target_length:
            # Uniformly sample keypoints and flow features
indices = np.linspace(0, len(keypoints_sequence) - 1, target_length, dtype=int)
keypoints_sequence = [keypoints_sequence[i] for i in indices]
flow_features = [flow_features[min(i, len(flow_features)-1)] for i in indices]
elif len(keypoints_sequence) < target_length:
            # Pad by repeating the last frame
while len(keypoints_sequence) < target_length:
if keypoints_sequence:
keypoints_sequence.append(keypoints_sequence[-1])
flow_features.append(flow_features[-1] if flow_features else np.zeros(10, dtype=np.float16))
else:
keypoints_sequence.append(np.zeros(225, dtype=np.float32))
flow_features.append(np.zeros(10, dtype=np.float16))
        # Convert to a numpy array first, then to a tensor (avoids PyTorch's slow list-of-arrays warning)
keypoints_array = np.array(keypoints_sequence, dtype=np.float32)
flow_array = np.array(flow_features, dtype=np.float32)
keypoints_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(device)
flow_tensor = torch.from_numpy(flow_array).unsqueeze(0).to(device)
print(f"關鍵點張量形狀: {keypoints_tensor.shape}")
print(f"光流張量形狀: {flow_tensor.shape}")
with torch.no_grad():
outputs = model(keypoints_tensor, flow_tensor)
probabilities = torch.softmax(outputs, dim=1)
predicted_class = torch.argmax(probabilities, dim=1).item()
confidence = probabilities[0][predicted_class].item()
        predicted_label = idx_to_label.get(predicted_class, "unknown")
        return f"Prediction: {predicted_label}", confidence
except Exception as e:
print(f"預測錯誤: {e}")
return f"預測失敗: {str(e)}", 0.0
def gradio_predict(video):
"""Gradio介面的預測函數"""
if video is None:
return "請上傳影片", "信心度: 0%"
try:
result, confidence = predict_sign_language(video)
        confidence_text = f"Confidence: {confidence:.2%}"
return result, confidence_text
except Exception as e:
return f"處理錯誤: {str(e)}", "信心度: 0%"
# Build the Gradio interface
demo = gr.Interface(
fn=gradio_predict,
    inputs=gr.Video(label="Upload a sign language video"),
outputs=[
gr.Textbox(label="預測結果"),
gr.Textbox(label="信心度")
],
title="🤟 SignView2.0 - 手語辨識系統",
description="""
### 歡迎使用 SignView2.0 手語辨識系統!
**系統特色:**
- 🎯 準確率:94.25%
- 📚 支援34種手語詞彙
- 🧠 使用BiLSTM + GRU + 多頭注意力機制
- 👁️ MediaPipe + 光流特徵融合
**使用方法:**
1. 上傳手語影片(建議3-4秒)
2. 點擊提交進行辨識
3. 查看預測結果和信心度
**支援詞彙:** again, all, apple, bad, bathroom, beautiful, bird, black, blue, book, bored, boy, brother, brown, but, computer, cousin, dance, day, deaf, doctor, dog, draw, drink, eat, english, family, father, fine, finish, fish, forget, friend, girl
""",
examples=[]
)
if __name__ == "__main__":
demo.launch()