Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import mediapipe as mp
from openai import OpenAI
# --------------------
# Feature extraction module
# --------------------
class FeatureExtractor:
    """Extract hand and body keypoints from MediaPipe Holistic results.

    The output is a flat vector of length 225:
    21*3 (left hand) + 21*3 (right hand) + 33*3 (body pose).
    """

    def __init__(self):
        # Keep a handle to the Holistic solution so callers can open a
        # detector via `self.mp_holistic.Holistic(...)`.
        self.mp_holistic = mp.solutions.holistic

    def extract_pose_keypoints(self, frame, holistic_results):
        """Flatten (x, y, z) of every landmark into one numpy vector.

        `frame` is unused here and kept only for interface compatibility.
        Any body part that was not detected is zero-filled, so the result
        always has length 225.
        """
        def part_coords(landmarks, point_count):
            # Real coordinates when the part was detected, zeros otherwise.
            if landmarks:
                return [c for lm in landmarks.landmark for c in (lm.x, lm.y, lm.z)]
            return [0] * (point_count * 3)

        features = []
        features += part_coords(holistic_results.left_hand_landmarks, 21)
        features += part_coords(holistic_results.right_hand_landmarks, 21)
        features += part_coords(holistic_results.pose_landmarks, 33)
        return np.array(features)
# --------------------
# Model architecture
# --------------------
class SignLanguageModel(nn.Module):
    """Sign-language classifier: projection -> BiLSTM -> attention -> MLP head.

    Input is a batch of keypoint sequences of shape
    (batch, seq_len, input_dim); output is class logits (batch, num_classes).
    """

    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5):
        super(SignLanguageModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        # Project raw keypoints to hidden_dim. forward() applies this on the
        # flattened (batch*seq, input_dim) view, so BatchNorm1d operates in
        # its (N, C) mode — per-feature statistics over all timesteps.
        self.feature_projection = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)
        )
        # Bidirectional LSTM over the projected sequence; inter-layer dropout
        # is only valid when num_layers > 1 (PyTorch warns otherwise).
        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        # Normalizes the concatenated forward+backward LSTM features.
        self.lstm_bn = nn.BatchNorm1d(hidden_dim * 2)
        # Additive attention producing one weight per timestep
        # (softmax over dim=1, the sequence dimension).
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
            nn.Softmax(dim=1)
        )
        # Classification head on the attention-pooled context vector.
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim // 2, num_classes)
        )

    def forward(self, x):
        """x: (batch, seq_len, input_dim) -> logits (batch, num_classes)."""
        batch_size, seq_len, _ = x.size()
        # Run the whole projection on the flattened (batch*seq, feat) view.
        # (Previously the Sequential was bypassed by indexing its submodules
        # with redundant reshape/transpose round-trips; BatchNorm1d on
        # (batch*seq, C) computes the same per-channel statistics as the
        # (N, C, L) form, so this is numerically identical and simpler.)
        flat = x.reshape(-1, x.size(-1))
        x_projected = self.feature_projection(flat).reshape(batch_size, seq_len, -1)
        # Sequence modeling.
        lstm_out, _ = self.lstm(x_projected)
        # BatchNorm1d expects channels in dim 1, hence the transpose pair.
        lstm_out_bn = self.lstm_bn(lstm_out.transpose(1, 2)).transpose(1, 2)
        # Attention weights: (batch, seq, 1), summing to 1 over timesteps.
        attention_weights = self.attention(lstm_out_bn)
        # Weighted sum over time -> context vector (batch, hidden_dim * 2).
        context = torch.bmm(lstm_out_bn.transpose(1, 2), attention_weights).squeeze(-1)
        return self.classifier(context)
# --------------------
# Video recognizer
# --------------------
class VideoSignLanguageRecognizer:
    """End-to-end video sign-language recognizer.

    Pipeline: per-frame MediaPipe Holistic keypoints -> sliding-window model
    inference -> consecutive-duplicate word removal -> (optionally) sentence
    generation with the OpenAI chat API.
    """

    def __init__(self, model_path, threshold=0.7):
        # threshold: minimum softmax confidence for a window prediction to be kept.
        self.model_path = model_path
        self.threshold = threshold
        self.feature_extractor = FeatureExtractor()
        self.label_map = self._load_label_mapping()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model()
        try:
            self.openai_client = OpenAI()
        except Exception as e:
            # Missing API key etc. — sentence generation degrades to a plain join.
            print(f"初始化OpenAI客户端出錯: {e}")
            self.openai_client = None
        print(f"影片辨識器初始化完成!使用設備: {self.device}")

    def _load_label_mapping(self):
        """Load the index -> label mapping from labels.csv; empty dict on failure."""
        label_map = {}
        labels_file = "labels.csv"
        if os.path.exists(labels_file):
            try:
                df = pd.read_csv(labels_file)
                for _, row in df.iterrows():
                    label_map[int(row['index'])] = row['label']
                print(f"✅ 從 {labels_file} 載入了 {len(label_map)} 個類別標籤")
            except Exception as e:
                print(f"❌ 讀取 labels.csv 出錯: {e}")
        else:
            print(f"⚠️ 找不到標籤檔案: {labels_file},將使用空映射。")
        return label_map

    def _load_model(self):
        """Build the network and load weights; return None on any failure.

        The hyper-parameters here must match the checkpoint's training
        configuration, otherwise load_state_dict raises.
        """
        num_classes = len(self.label_map) if self.label_map else 4  # fallback when labels.csv is absent
        input_dim = 225   # 21*3 + 21*3 + 33*3 keypoint coordinates
        hidden_dim = 96   # must match the trained checkpoint
        num_layers = 2
        model = SignLanguageModel(
            input_dim=input_dim,
            hidden_dim=hidden_dim,
            num_layers=num_layers,
            num_classes=num_classes
        )
        if os.path.exists(self.model_path):
            try:
                model.load_state_dict(torch.load(self.model_path, map_location=self.device))
                model.to(self.device)
                model.eval()
                print(f"✅ 模型成功從 {self.model_path} 載入")
                return model
            except Exception as e:
                print(f"❌ 載入模型權重時出錯: {e}")
        else:
            print(f"⚠️ 找不到模型檔案: {self.model_path}")
        return None

    def process_video(self, video_path):
        """Run the full pipeline on a video file and return a result dict."""
        if not self.model:
            return {"status": "error", "message": "模型未成功載入"}
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return {"status": "error", "message": f"無法打開影片檔案: {video_path}"}
        all_keypoints = []
        with self.feature_extractor.mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                all_keypoints.append(self._extract_features(frame, holistic))
        cap.release()
        if not all_keypoints:
            return {"recognition_result": "影片中未偵測到有效動作。", "confidence": 0}
        keypoints_sequence = np.array(all_keypoints)
        word_sequence, confidence = self._predict_from_sequence(keypoints_sequence)
        if self.openai_client and word_sequence:
            final_sentence = self._generate_sentence_with_gpt(word_sequence)
        else:
            final_sentence = " ".join(word_sequence) if word_sequence else "無法辨識"
        return {"recognition_result": final_sentence, "confidence": confidence}

    def _extract_features(self, frame, holistic):
        """Run MediaPipe Holistic on one BGR frame and flatten its keypoints."""
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False  # lets MediaPipe process without copying
        results = holistic.process(image)
        image.flags.writeable = True
        return self.feature_extractor.extract_pose_keypoints(frame, results)

    def _predict_from_sequence(self, keypoints_sequence):
        """Slide a fixed window over the sequence; return (word_list, avg_conf)."""
        WINDOW_SIZE = 30
        STRIDE = 10
        if len(keypoints_sequence) < WINDOW_SIZE:
            # BUG FIX: this path previously returned (word, conf) with a bare
            # string (or None) as the first element, while every caller expects
            # a *list* of words — process_video would then " ".join(...) the
            # string and split it into single characters. Wrap it in a list.
            word, confidence = self._single_prediction(keypoints_sequence)
            return ([word], confidence) if word is not None else ([], 0.0)
        predictions = []
        for start in range(0, len(keypoints_sequence) - WINDOW_SIZE + 1, STRIDE):
            window = keypoints_sequence[start:start + WINDOW_SIZE]
            prediction, confidence = self._predict_single_window(window)
            if prediction is not None:
                predictions.append({"word": prediction, "confidence": confidence})
        if not predictions:
            return [], 0
        return self._post_process_predictions(predictions)

    def _single_prediction(self, keypoints_sequence):
        """Predict once on a short clip, zero-padding it to the window size."""
        if len(keypoints_sequence) == 0:
            return None, 0.0
        padded_sequence = self._normalize_sequence_length(keypoints_sequence, 30)
        return self._predict_single_window(padded_sequence)

    def _predict_single_window(self, window_sequence):
        """Return (label, confidence) for one window, or (None, 0.0) if rejected."""
        sequence_tensor = torch.tensor(window_sequence, dtype=torch.float32).unsqueeze(0).to(self.device)
        with torch.no_grad():
            outputs = self.model(sequence_tensor)
            probabilities = torch.softmax(outputs, dim=1)
            confidence, predicted_idx = torch.max(probabilities, 1)
        predicted_label = self.label_map.get(predicted_idx.item())
        # Also reject indices missing from the label map: previously a None
        # "word" could slip through whenever confidence was above threshold.
        if predicted_label is not None and confidence.item() > self.threshold:
            return predicted_label, confidence.item()
        return None, 0.0

    def _normalize_sequence_length(self, sequence, target_length):
        """Truncate or zero-pad `sequence` to exactly `target_length` frames."""
        current_length = len(sequence)
        if current_length == 0:
            # 225 is the expected feature width when it cannot be inferred.
            return np.zeros((target_length, sequence.shape[1] if len(sequence.shape) > 1 else 225))
        if current_length > target_length:
            return sequence[:target_length]
        padding = np.zeros((target_length - current_length, sequence.shape[1]))
        return np.vstack((sequence, padding))

    def _post_process_predictions(self, predictions):
        """Collapse consecutive duplicate words; return (words, mean confidence)."""
        if not predictions:
            return [], 0.0
        final_words = [predictions[0]['word']]
        for prev, cur in zip(predictions, predictions[1:]):
            if cur['word'] != prev['word']:
                final_words.append(cur['word'])
        total_confidence = sum(p['confidence'] for p in predictions)
        return final_words, total_confidence / len(predictions)

    def _generate_sentence_with_gpt(self, word_sequence):
        """Ask the chat model to turn the word list into a fluent sentence.

        Falls back to a plain space-join when the client is unavailable or
        the API call fails.
        """
        if not self.openai_client:
            return " ".join(word_sequence)
        prompt = (f"你是一個手語翻譯專家。請將以下由獨立單詞組成的序列轉換成一句通順、完整的台灣繁體中文句子。"
                  f"原始單詞序列: [{', '.join(word_sequence)}]\n\n翻譯後的句子:")
        try:
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.5,
                max_tokens=150
            )
            sentence = response.choices[0].message.content.strip()
            print(f"🤖 GPT生成句子: {sentence}")
            return sentence
        except Exception as e:
            print(f"❌ GPT API 調用失敗: {e}")
            return " ".join(word_sequence)