#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hmac
import hashlib
import json
import requests
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import base64
import threading
import time
import mediapipe as mp
import collections
from flask import Flask, request, jsonify, render_template, Response
from werkzeug.utils import secure_filename
from datetime import datetime
from flask_socketio import SocketIO, emit
from openai import OpenAI
from app_config import get_config

# Select the SocketIO async mode (prefer eventlet when available)
ASYNC_MODE = os.environ.get('SOCKETIO_ASYNC_MODE', 'auto')
try:
    import eventlet
    if ASYNC_MODE in ('auto', 'eventlet'):
        eventlet.monkey_patch()
        ASYNC_MODE = 'eventlet'
except Exception:
    ASYNC_MODE = 'threading'

# Environment variables
# The OpenAI API key must come from the environment; never hard-code it.
# On HuggingFace Spaces, add an OPENAI_API_KEY variable in the Space settings.

# Avoid permission problems and cut down log noise
os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'   # reduce TensorFlow logging
os.environ['MEDIAPIPE_DISABLE_GPU'] = '1'  # disable GPU to avoid warnings
# Keep eventlet's greendns from breaking external DNS resolution (e.g. OpenAI)
os.environ.setdefault('EVENTLET_NO_GREENDNS', 'yes')

# Environment detection
IS_HUGGINGFACE = os.environ.get('SPACE_ID') is not None
IS_LOCAL_DEV = not IS_HUGGINGFACE

# Load the centralized configuration
CONFIG = get_config()

# Flask application setup
app = Flask(__name__)
app.config['SECRET_KEY'] = 'sign_language_secret_key'
app.config['MAX_CONTENT_LENGTH'] = CONFIG.get('MAX_FILE_SIZE', 100 * 1024 * 1024)  # 100MB max upload
socketio = SocketIO(app, cors_allowed_origins="*", async_mode=ASYNC_MODE)

# Messenger Bot settings
VERIFY_TOKEN = CONFIG.get('VERIFY_TOKEN', 'your_verify_token')
PAGE_ACCESS_TOKEN = CONFIG.get('PAGE_ACCESS_TOKEN', 'your_page_access_token')
APP_SECRET = CONFIG.get('APP_SECRET')
FACEBOOK_API_URL = 'https://graph.facebook.com/v18.0/me/messages'

# Path settings - adapt to the runtime environment
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, 'data')
MODEL_PATH = os.path.join(DATA_DIR, 'models', 'sign_language_model.pth')
LABELS_PATH = os.path.join(DATA_DIR, 'labels.csv')
UPLOAD_FOLDER = os.path.join(BASE_DIR, 'uploads')

# Create the required folders
for folder in [UPLOAD_FOLDER,
               os.path.join(DATA_DIR, 'models'),
               os.path.join(DATA_DIR, 'features', 'keypoints')]:
    os.makedirs(folder, exist_ok=True)

# Global state
camera = None
recognizer = None
is_running = False
frame_lock = threading.Lock()
current_frame = None

print(f"🌍 Environment: {'HuggingFace Spaces' if IS_HUGGINGFACE else 'local development'}")
print(f"📁 Base directory: {BASE_DIR}")
print(f"🤖 Model path: {MODEL_PATH}")
print(f"📊 Labels path: {LABELS_PATH}")

#--------------------
# AI model classes
#--------------------
class FeatureExtractor:
    def __init__(self):
        # Initialize the MediaPipe models
        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        # Keep one long-lived Holistic instance (recreating it per frame is very slow)
        self.holistic = self.mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            smooth_landmarks=True,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

    def close(self):
        try:
            if self.holistic:
                self.holistic.close()
        except Exception:
            pass

    def extract_pose_keypoints(self, frame, holistic_results):
        """Extract skeleton keypoints from MediaPipe Holistic results."""
        keypoints = []
        # Hand keypoints (when detected)
        if holistic_results.left_hand_landmarks:
            for landmark in holistic_results.left_hand_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            # Zero-fill when no hand was detected
            keypoints.extend([0] * (21 * 3))
        if holistic_results.right_hand_landmarks:
            for landmark in holistic_results.right_hand_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            keypoints.extend([0] * (21 * 3))
        # Pose keypoints
        if holistic_results.pose_landmarks:
            for landmark in holistic_results.pose_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            keypoints.extend([0] * (33 * 3))
        return np.array(keypoints)
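
# The vector above has a fixed layout: left hand (21 landmarks), right hand
# (21) and pose (33), three coordinates each, always 225 values. A minimal
# sanity-check sketch (illustrative only; nothing in the app calls it):
def _illustrate_feature_layout():
    """Hypothetical helper documenting the keypoint vector layout."""
    left_hand, right_hand, pose = 21 * 3, 21 * 3, 33 * 3
    assert left_hand + right_hand + pose == 225  # matches input_dim below
    return {'left_hand': (0, 63), 'right_hand': (63, 126), 'pose': (126, 225)}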


class SignLanguageModel(nn.Module):
    """
    Sign language recognition model: a bidirectional LSTM with an attention
    head, plus batch normalization and a residual connection on the features.
    """
    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5):
        super(SignLanguageModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.num_classes = num_classes

        # Feature projection: map the input into a common dimension
        self.feature_projection = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)  # lighter dropout here
        )

        # Bidirectional LSTM
        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )

        # Batch normalization for the LSTM output
        self.lstm_bn = nn.BatchNorm1d(hidden_dim * 2)

        # Attention mechanism
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
            nn.Softmax(dim=1)
        )

        # Classifier head
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim // 2, num_classes)
        )

        # L2 regularization coefficient (applied by the training loop)
        self.l2_reg_alpha = 0.001

        # Initialize weights
        self._init_weights()

    def _init_weights(self):
        """Initialize model weights."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.LSTM):
                for name, param in m.named_parameters():
                    if 'weight' in name:
                        nn.init.orthogonal_(param)  # orthogonal init works well for RNNs
                    elif 'bias' in name:
                        nn.init.zeros_(param)

    def forward(self, x):
        """Forward pass. x has shape [batch_size, seq_len, feature_dim]."""
        batch_size, seq_len, _ = x.size()

        # Feature projection - reshape so BatchNorm1d sees the right dimensions
        x_reshaped = x.reshape(-1, x.size(-1))                      # [batch*seq, feature_dim]
        x_projected = self.feature_projection[0](x_reshaped)        # Linear
        x_projected = x_projected.reshape(batch_size, seq_len, -1)  # [batch, seq, hidden]
        x_projected = x_projected.transpose(1, 2)                   # [batch, hidden, seq] for BatchNorm
        x_projected = self.feature_projection[1](x_projected)       # BatchNorm
        x_projected = x_projected.transpose(1, 2)                   # back to [batch, seq, hidden]
        x_projected = self.feature_projection[2](x_projected)       # ReLU
        x_projected = self.feature_projection[3](x_projected)       # Dropout

        # Keep the projected features for a residual connection (currently unused)
        x_residual = x_projected

        # LSTM; lstm_out has shape [batch, seq, hidden*2]
        lstm_out, _ = self.lstm(x_projected)

        # BatchNorm over the LSTM output
        lstm_out_bn = lstm_out.transpose(1, 2)   # [batch, hidden*2, seq]
        lstm_out_bn = self.lstm_bn(lstm_out_bn)
        lstm_out = lstm_out_bn.transpose(1, 2)   # [batch, seq, hidden*2]

        # Attention weights: [batch, seq, 1]
        attention_weights = self.attention(lstm_out)

        # Apply attention: context is [batch, hidden*2, 1], squeezed to [batch, hidden*2]
        context = torch.bmm(lstm_out.transpose(1, 2), attention_weights)
        context = context.squeeze(-1)

        # Final classification: [batch, num_classes]
        output = self.classifier(context)
        return output
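
# Illustrative smoke test (a sketch; defined but not executed on import).
# A dummy batch pushed through SignLanguageModel should yield one logit row
# per sequence; the dimensions mirror the defaults used by the recognizers
# below.
def _smoke_test_model(num_classes=4):
    """Hypothetical shape check for SignLanguageModel."""
    model = SignLanguageModel(input_dim=225, hidden_dim=96, num_layers=2,
                              num_classes=num_classes, dropout=0.5)
    model.eval()  # eval mode so BatchNorm uses running statistics
    with torch.no_grad():
        dummy = torch.zeros(2, 30, 225)  # [batch, seq_len, feature_dim]
        out = model(dummy)
    assert out.shape == (2, num_classes)
    return out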


#--------------------
# Sign language recognizer classes
#--------------------
class VideoSignLanguageRecognizer:
    """Video sign language recognizer - dedicated to video files."""
    def __init__(self, model_path, threshold=0.7):
        self.model_path = model_path
        self.threshold = threshold
        self.effective_threshold = threshold

        # Feature extractor
        self.feature_extractor = FeatureExtractor()
        # Label mapping
        self.label_map = self._load_label_mapping()
        # Model
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model()

        # GPT integration
        try:
            base_url = os.environ.get('OPENAI_BASE_URL')
            if base_url:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5, base_url=base_url)
            else:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5)
        except Exception as e:
            print(f"Failed to initialize the OpenAI client: {e}")
            self.openai_client = None

        print(f"Video recognizer initialized! Device: {self.device}")

    def _load_label_mapping(self):
        """Load the label mapping (single source of truth: labels.csv)."""
        return load_label_mapping_from_csv()

    def _load_model(self):
        """Load the trained model."""
        input_dim = 225  # (21 + 21 + 33) * 3 = 225
        model = SignLanguageModel(
            input_dim=input_dim,
            hidden_dim=96,
            num_layers=2,
            num_classes=len(self.label_map),
            dropout=0.5
        )

        # Check that the model file exists
        if not os.path.exists(self.model_path):
            print(f"⚠️ Warning: model file not found at {self.model_path}")
            print("🔧 Falling back to randomly initialized weights (testing only)")
            model.to(self.device)
            model.eval()
            return model

        try:
            model.load_state_dict(torch.load(self.model_path, map_location=self.device))
            model.to(self.device)
            model.eval()
            print(f"✅ Model loaded: {self.model_path}")
        except Exception as e:
            print(f"❌ Failed to load model: {e}")
            print("🔧 Falling back to randomly initialized weights")
            model.to(self.device)
            model.eval()
        return model

    def process_video(self, video_path):
        """Process an entire video file."""
        print(f"🎬 Processing video: {video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"❌ Could not open video file: {video_path}")
            return None

        # Extract the feature sequence
        keypoints_sequence = []
        frame_count = 0
        hands_present_count = 0
        motion_history = []
        prev_gray = None

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Frame skipping: process every 5th frame
            if frame_count % 5 == 0:
                keypoints, hands_detected = self._extract_features(frame)
                if keypoints is not None:
                    keypoints_sequence.append(keypoints)
                    if hands_detected:
                        hands_present_count += 1
                # Optical-flow motion estimate between sampled frames
                try:
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    if prev_gray is not None:
                        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None,
                                                            0.5, 3, 15, 3, 5, 1.2, 0)
                        mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
                        motion_history.append(float(np.mean(mag)))
                    prev_gray = gray
                except Exception:
                    pass

            frame_count += 1
            # Cap the number of processed frames
            if len(keypoints_sequence) >= 60:
                break

        cap.release()

        if len(keypoints_sequence) < 3:
            print("❌ Too few valid frames; cannot run recognition")
            return None

        # Dynamically adjust the threshold (hand presence ratio + motion)
        frames_used = max(1, len(keypoints_sequence))
        hand_ratio = hands_present_count / frames_used
        avg_motion = float(np.mean(motion_history)) if motion_history else 0.0
        dynamic_threshold = self.threshold
        if hand_ratio < 0.3:
            dynamic_threshold = min(0.9, dynamic_threshold + 0.1)
        if avg_motion < 0.05:
            dynamic_threshold = min(0.9, dynamic_threshold + 0.05)
        self.effective_threshold = dynamic_threshold

        # Predict (using the dynamic threshold)
        prediction, confidence, word_sequence, probabilities = \
            self._predict_from_sequence(keypoints_sequence)

        # Generate a full sentence with GPT
        generated_sentence = self._generate_sentence_with_gpt(word_sequence)

        print(f"🎯 Recognition result: {word_sequence}")
        print(f"📈 Confidence: {confidence:.2f}")

        return {
            'predicted_class': prediction,
            'word_sequence': word_sequence,
            'confidence': confidence,
            'probabilities': probabilities,
            'generated_sentence': generated_sentence,
            'hand_presence_ratio': hand_ratio,
            'avg_motion': avg_motion,
            'effective_threshold': dynamic_threshold
        }

    def _extract_features(self, frame):
        """Extract hand and pose features from a single frame."""
        # Convert to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Run the long-lived holistic instance on the frame
        results = self.feature_extractor.holistic.process(frame_rgb)
        # Was any hand detected?
        hands_detected = (results.left_hand_landmarks is not None or
                          results.right_hand_landmarks is not None)
        try:
            keypoints = self.feature_extractor.extract_pose_keypoints(frame, results)
            return keypoints, hands_detected
        except Exception:
            return None, hands_detected

    def _predict_from_sequence(self, keypoints_sequence):
        """Run prediction on a keypoint sequence."""
        # Build the tensor from a single ndarray to avoid the slow-creation warning
        keypoints_array = np.array(keypoints_sequence, dtype=np.float32)
        sequence_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(self.device)

        with torch.no_grad():
            outputs = self.model(sequence_tensor)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            max_prob, predicted_class = torch.max(probabilities, 1)
            predicted_class = predicted_class.item()
            confidence = max_prob.item()
            # Per-class probabilities
            probs = probabilities[0].cpu().numpy()

        effective_thr = getattr(self, 'effective_threshold', self.threshold)
        if confidence >= effective_thr:
            predicted_word = self.label_map.get(predicted_class, f"class {predicted_class}")
            word_sequence = [predicted_word]
        else:
            word_sequence = []

        return predicted_class, confidence, word_sequence, probs

    def _generate_sentence_with_gpt(self, word_sequence):
        """Use GPT to build a full sentence from the word sequence."""
        if not word_sequence:
            return "Could not recognize any sign language content"
        if not self.openai_client:
            return " ".join(word_sequence)

        try:
            # Keep the prompt tight: ask GPT for the sentence only
            prompt = (f"Sign language glosses: {', '.join(word_sequence)}. "
                      "Compose one concise Chinese sentence and reply with the sentence only, no extra explanation.")
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are a sign language translation assistant. Reply with one concise Chinese sentence only; no explanations or examples."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=50,   # keep the reply short
                temperature=0.3  # low randomness for accuracy
            )
            result = response.choices[0].message.content.strip()
            # Strip stray quotes
            result = result.replace('"', '').replace("'", '').strip()
            # Fall back to the raw glosses if the reply is long or explanatory
            # ('例如' = "for example", '可以' = "can": common filler in verbose replies)
            if len(result) > 30 or '例如' in result or '可以' in result:
                return " ".join(word_sequence)
            return result
        except Exception as e:
            print(f"Error calling the GPT API: {e}")
            return " ".join(word_sequence)
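
# Standalone mirror of the dynamic-threshold rule in process_video above
# (illustrative): low hand presence or low motion raises the confidence bar,
# capped at 0.9, so dubious clips need a more confident model to pass.
def _video_dynamic_threshold(base, hand_ratio, avg_motion):
    """Hypothetical copy of the in-method threshold adjustment."""
    thr = base
    if hand_ratio < 0.3:   # hands visible in under 30% of sampled frames
        thr = min(0.9, thr + 0.1)
    if avg_motion < 0.05:  # almost no optical-flow motion
        thr = min(0.9, thr + 0.05)
    return thr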


class SignLanguageRecognizer:
    """Real-time sign language recognizer - for the camera stream."""
    def __init__(self, model_path, frame_buffer_size=30, prediction_interval=15, threshold=0.7):
        self.model_path = model_path
        self.threshold = threshold
        self.dynamic_threshold = threshold
        self.max_buffer_size = frame_buffer_size
        self.prediction_interval = prediction_interval

        # Feature extractor
        self.feature_extractor = FeatureExtractor()
        # Label mapping
        self.label_map = self._load_label_mapping()
        # Model
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model()

        # Buffers and state
        self.keypoints_buffer = collections.deque(maxlen=frame_buffer_size)
        self.frame_count = 0
        self.current_prediction = None
        self.prediction_probabilities = None

        # Hand-presence detection
        self.hand_present = False
        self.hand_absent_frames = 0
        self.hand_absent_threshold = 30

        # Word sequence
        self.word_sequence = []
        self.last_added_word = None
        self.word_cooldown = 0
        self.recent_top1_queue = collections.deque(maxlen=15)
        self.ema_confidence = 0.0
        self.ema_alpha = 0.3

        # Generated sentence
        self.generated_sentence = ""
        self.display_sentence_time = 0

        # GPT integration
        try:
            base_url = os.environ.get('OPENAI_BASE_URL')
            if base_url:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5, base_url=base_url)
            else:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5)
        except Exception as e:
            print(f"Failed to initialize the OpenAI client: {e}")
            self.openai_client = None

        print(f"Real-time recognizer initialized! Device: {self.device}")

    def _load_label_mapping(self):
        """Load the label mapping (single source of truth: labels.csv)."""
        return load_label_mapping_from_csv()

    def _load_model(self):
        """Load the trained model."""
        input_dim = 225
        model = SignLanguageModel(
            input_dim=input_dim,
            hidden_dim=96,
            num_layers=2,
            num_classes=len(self.label_map),
            dropout=0.5
        )

        # Check that the model file exists
        if not os.path.exists(self.model_path):
            print(f"⚠️ Warning: model file not found at {self.model_path}")
            print("🔧 Falling back to randomly initialized weights (testing only)")
            model.to(self.device)
            model.eval()
            return model

        try:
            model.load_state_dict(torch.load(self.model_path, map_location=self.device))
            model.to(self.device)
            model.eval()
            print(f"✅ Real-time model loaded: {self.model_path}")
        except Exception as e:
            print(f"❌ Failed to load the real-time model: {e}")
            print("🔧 Falling back to randomly initialized weights")
            model.to(self.device)
            model.eval()
        return model

    def process_frame(self, frame):
        """Process a single video frame."""
        # Extract features and detect hands
        keypoint_features, hands_detected = self._extract_features(frame)

        # Update hand-presence state
        self._update_hand_presence(hands_detected)

        # Only continue when feature extraction succeeded
        if keypoint_features is not None:
            self.keypoints_buffer.append(keypoint_features)

        # Predict periodically
        if (self.hand_present and self.frame_count % self.prediction_interval == 0
                and len(self.keypoints_buffer) > 5):
            self._make_prediction()
            self._apply_smoothing_and_decide()

        # Generate a sentence once the hands have left the frame
        if (not self.hand_present and self.hand_absent_frames == self.hand_absent_threshold
                and self.word_sequence):
            self._generate_sentence_with_gpt()

        self.frame_count += 1
        if self.word_cooldown > 0:
            self.word_cooldown -= 1

        # Build the status payload
        status = {
            "hand_present": self.hand_present,
            "frame_count": self.frame_count,
            "current_prediction": None,
            "word_sequence": self.word_sequence.copy(),
            "generated_sentence": self.generated_sentence,
            "display_sentence": (time.time() - self.display_sentence_time < 10)
        }

        if self.current_prediction is not None:
            if self.current_prediction == -1:
                status["current_prediction"] = {"label": "unknown", "confidence": 0}
            else:
                label = self.label_map.get(self.current_prediction, f"class {self.current_prediction}")
                confidence = (float(self.prediction_probabilities[self.current_prediction])
                              if self.prediction_probabilities is not None else 0)
                status["current_prediction"] = {"label": label, "confidence": confidence}

        if self.prediction_probabilities is not None:
            status["probabilities"] = []
            sorted_indices = np.argsort(self.prediction_probabilities)[::-1][:4]
            for idx in sorted_indices:
                prob = float(self.prediction_probabilities[idx])
                class_label = self.label_map.get(int(idx), f"class {int(idx)}")
                status["probabilities"].append({"label": class_label, "probability": prob})

        return status
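
    # The payload above is what the front end receives with each
    # 'update_frame' socket event (summarized from the code):
    #   hand_present: bool; frame_count: int
    #   current_prediction: {"label": str, "confidence": float} or None
    #   word_sequence: glosses decided so far; generated_sentence: str
    #   display_sentence: True for ~10 s after a sentence was generated
    #   probabilities: top-4 [{"label", "probability"}] entries, when available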

    def _update_hand_presence(self, hands_detected):
        """Update the hand-presence state."""
        if hands_detected:
            self.hand_present = True
            self.hand_absent_frames = 0
        else:
            self.hand_absent_frames += 1
            if self.hand_absent_frames >= self.hand_absent_threshold:
                if self.hand_present:
                    self.hand_present = False

    def _update_word_sequence(self):
        """Update the word sequence from the current prediction."""
        if self.current_prediction is not None and self.current_prediction >= 0:
            word = self.label_map.get(self.current_prediction, f"class {self.current_prediction}")
            if word != self.last_added_word or self.word_cooldown == 0:
                self.word_sequence.append(word)
                self.last_added_word = word
                self.word_cooldown = 20

    def _generate_sentence_with_gpt(self):
        """Use GPT to build a full sentence from the word sequence."""
        if not self.word_sequence:
            return
        if not self.openai_client:
            self.generated_sentence = " ".join(self.word_sequence)
            self.display_sentence_time = time.time()
            print(f"Generated sentence: {self.generated_sentence}")
            self.word_sequence = []
            return

        try:
            # Keep the prompt tight: ask GPT for the sentence only
            prompt = (f"Sign language glosses: {', '.join(self.word_sequence)}. "
                      "Compose one concise Chinese sentence and reply with the sentence only, no extra explanation.")
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are a sign language translation assistant. Reply with one concise Chinese sentence only; no explanations or examples."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=50,   # keep the reply short
                temperature=0.3  # low randomness for accuracy
            )
            result = response.choices[0].message.content.strip()
            # Strip stray quotes
            result = result.replace('"', '').replace("'", '').strip()
            # Fall back to the raw glosses if the reply is long or explanatory
            # ('例如' = "for example", '可以' = "can": common filler in verbose replies)
            if len(result) > 30 or '例如' in result or '可以' in result:
                self.generated_sentence = " ".join(self.word_sequence)
            else:
                self.generated_sentence = result
            self.display_sentence_time = time.time()
            print(f"GPT sentence: {self.generated_sentence}")
        except Exception as e:
            print(f"Error calling the GPT API: {e}")
            self.generated_sentence = " ".join(self.word_sequence)
            self.display_sentence_time = time.time()

        self.word_sequence = []

    def _extract_features(self, frame):
        """Extract hand and pose features from a single frame."""
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.feature_extractor.holistic.process(frame_rgb)
        hands_detected = (results.left_hand_landmarks is not None or
                          results.right_hand_landmarks is not None)
        try:
            keypoints = self.feature_extractor.extract_pose_keypoints(frame, results)
            return keypoints, hands_detected
        except Exception:
            return None, hands_detected

    def _make_prediction(self):
        """Predict from the buffered features and update the smoothing state."""
        if len(self.keypoints_buffer) < 2:
            return

        keypoints_array = np.array(list(self.keypoints_buffer), dtype=np.float32)
        keypoints_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(self.device)

        with torch.no_grad():
            outputs = self.model(keypoints_tensor)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            max_prob, predicted_class = torch.max(probabilities, 1)
            predicted_class = predicted_class.item()
            max_prob = max_prob.item()
            probs = probabilities[0].cpu().numpy()

        # Update the EMA confidence
        self.ema_confidence = self.ema_alpha * max_prob + (1 - self.ema_alpha) * self.ema_confidence
        # Record the recent top-1 classes for vote smoothing
        self.recent_top1_queue.append(predicted_class)

        # Dynamic threshold: raise the bar when hands are absent or the EMA is low
        dyn_thr = self.threshold
        if not self.hand_present:
            dyn_thr = min(0.95, dyn_thr + 0.15)
        if self.ema_confidence < 0.5:
            dyn_thr = min(0.9, dyn_thr + 0.1)
        self.dynamic_threshold = dyn_thr

        if max_prob >= dyn_thr:
            self.current_prediction = predicted_class
            self.prediction_probabilities = probs
        else:
            self.current_prediction = -1
            self.prediction_probabilities = probs

    def _apply_smoothing_and_decide(self):
        """Multi-frame voting + cooldown to suppress jitter before adding a word."""
        if self.current_prediction is None:
            return

        # Majority vote: most common class over the last N frames
        if len(self.recent_top1_queue) >= max(5, self.recent_top1_queue.maxlen // 2):
            counts = collections.Counter(self.recent_top1_queue)
            voted_class, voted_count = counts.most_common(1)[0]
            vote_ratio = voted_count / len(self.recent_top1_queue)
        else:
            voted_class, vote_ratio = self.current_prediction, 0.0

        # Tiered gating: vote share and EMA confidence decide together
        strong = (vote_ratio >= 0.6 and self.ema_confidence >= 0.6)
        medium = (vote_ratio >= 0.5 and self.ema_confidence >= 0.5)
        weak = (vote_ratio >= 0.4 and self.ema_confidence >= 0.45)

        decided_class = -1
        if strong or medium or weak:
            decided_class = voted_class

        # Emit the word
        if decided_class >= 0:
            self.current_prediction = decided_class
            self._update_word_sequence()
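
# Standalone sketch of the smoothing scheme used above (illustrative): an
# exponential moving average of top-1 confidence plus a majority vote over
# recent top-1 classes, as in _make_prediction / _apply_smoothing_and_decide.
def _ema_update(prev, value, alpha=0.3):
    """One EMA step; alpha weights the newest observation."""
    return alpha * value + (1 - alpha) * prev


def _majority_vote(recent_classes):
    """Return (class, vote_ratio) for the most common recent prediction."""
    counts = collections.Counter(recent_classes)
    voted_class, voted_count = counts.most_common(1)[0]
    return voted_class, voted_count / len(recent_classes)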


def load_label_mapping_from_csv(labels_file: str = LABELS_PATH):
    """Load the label mapping from labels.csv; fall back to defaults on failure."""
    label_map = {}
    print(f"🔍 Trying to load label file: {labels_file}")
    if os.path.exists(labels_file):
        try:
            df = pd.read_csv(labels_file)
            for _, row in df.iterrows():
                label_map[int(row['index'])] = row['label']
            print(f"✅ Loaded {len(label_map)} class labels from {labels_file}")
            print(f"📊 Label mapping: {label_map}")
        except Exception as e:
            print(f"❌ Error reading labels.csv: {e}")
    else:
        print(f"❌ Label file not found: {labels_file}")

    if not label_map:
        label_map = {0: "eat", 1: "fish", 2: "like", 3: "want"}
        print(f"⚠️ Using the default label mapping: {label_map}")

    return label_map
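
# Expected labels.csv layout, inferred from the loader above: two columns,
# an integer class index and its gloss. For example:
#
#     index,label
#     0,eat
#     1,fish
#     2,like
#     3,want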


def initialize_recognizer():
    global recognizer
    model_path = MODEL_PATH
    recognizer = SignLanguageRecognizer(
        model_path=model_path,
        frame_buffer_size=30,
        prediction_interval=10,
        threshold=0.6
    )


def gen_frames():
    global camera, recognizer, is_running, current_frame, frame_lock
    while is_running:
        if camera is None:  # the stream may be stopped from another thread
            break
        success, frame = camera.read()
        if not success:
            break
        status = recognizer.process_frame(frame)
        ret, buffer = cv2.imencode('.jpg', frame)
        if not ret:
            continue
        frame_data = base64.b64encode(buffer).decode('utf-8')
        with frame_lock:
            current_frame = {'image': frame_data, 'status': status}
        socketio.emit('update_frame', {'image': frame_data, 'status': status})
        time.sleep(0.1)  # ~10 FPS to limit bandwidth and CPU


#--------------------
# Route definitions
#--------------------
# Messenger Bot routes
@app.route('/', methods=['GET'])
def home():
    """Home page - serves the web UI and the Messenger Bot status."""
    return render_template('index.html')


@app.route('/health')
def health_check():
    """Health check."""
    return {
        'status': 'healthy',
        'environment': 'HuggingFace Spaces' if IS_HUGGINGFACE else 'Local Development',
        'model_loaded': os.path.exists(MODEL_PATH),
        'labels_loaded': os.path.exists(LABELS_PATH)
    }


@app.route('/webhook', methods=['GET'])
def verify_webhook():
    """Webhook verification - Facebook calls this to validate the endpoint."""
    mode = request.args.get('hub.mode')
    token = request.args.get('hub.verify_token')
    challenge = request.args.get('hub.challenge')

    if mode and token:
        if mode == 'subscribe' and token == VERIFY_TOKEN:
            print("Webhook verified!")
            return challenge
        else:
            print("Verification failed - incorrect token")
            return "Verification failed", 403
    return "Missing verification parameters", 400


@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Handle incoming Messenger events."""
    try:
        # Verify the Facebook signature
        if APP_SECRET:
            signature = request.headers.get('X-Hub-Signature-256')
            if not _verify_facebook_signature(signature, request.data, APP_SECRET):
                print("Signature verification failed")
                return "Signature verification failed", 403

        data = request.get_json()
        if data.get('object') == 'page':
            for entry in data.get('entry', []):
                for messaging_event in entry.get('messaging', []):
                    if messaging_event.get('message'):
                        handle_message(messaging_event)
                    elif messaging_event.get('postback'):
                        handle_postback(messaging_event)
        return "EVENT_RECEIVED", 200
    except Exception as e:
        print(f"Error while handling the webhook: {e}")
        return "Error", 500


def _verify_facebook_signature(signature_header: str, payload: bytes, app_secret: str) -> bool:
    """Verify the X-Hub-Signature-256 header (Facebook webhook security)."""
    try:
        if not signature_header or not signature_header.startswith('sha256='):
            return False
        received_sig = signature_header.split('=')[1]
        mac = hmac.new(app_secret.encode('utf-8'), msg=payload, digestmod=hashlib.sha256)
        expected_sig = mac.hexdigest()
        return hmac.compare_digest(received_sig, expected_sig)
    except Exception:
        return False
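
# Illustrative counterpart of the check above, e.g. for local testing
# (assumption: not part of the deployed app): compute the X-Hub-Signature-256
# header that _verify_facebook_signature expects.
def _sign_payload_for_test(payload: bytes, app_secret: str) -> str:
    """Hypothetical helper producing a Facebook-style signature header."""
    digest = hmac.new(app_secret.encode('utf-8'), msg=payload,
                      digestmod=hashlib.sha256).hexdigest()
    return f'sha256={digest}'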


@app.route('/receive_recognition_result', methods=['POST'])
def receive_recognition_result():
    """Receive a sign language recognition result (internal call)."""
    try:
        data = request.get_json()
        if not data:
            return jsonify({"status": "error", "message": "No data received"}), 400

        sender_id = data.get('sender_id')
        recognition_result = data.get('recognition_result', 'unrecognized')
        confidence = data.get('confidence', 0)

        if not sender_id:
            return jsonify({"status": "error", "message": "Missing sender_id"}), 400

        print(f"📝 Recognition result received - user: {sender_id}")
        print(f"🎯 Result: {recognition_result}")
        print(f"📊 Confidence: {confidence}")

        # Send the result to the user
        send_message(sender_id, recognition_result)

        return jsonify({
            "status": "success",
            "message": "Recognition result sent to the user"
        })
    except Exception as e:
        print(f"Error while handling the recognition result: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500


@app.route('/process_video', methods=['POST'])
def process_video():
    """Process an uploaded video file (integrated version)."""
    try:
        # Check that a file was uploaded
        if 'video' not in request.files:
            return jsonify({"status": "error", "message": "No video file uploaded"}), 400

        video_file = request.files['video']
        sender_id = request.form.get('sender_id', 'unknown')

        if video_file.filename == '':
            return jsonify({"status": "error", "message": "No file selected"}), 400

        # Basic MIME type and extension checks
        allowed_exts = {'.mp4', '.mov', '.avi', '.wmv', '.mkv'}
        _, ext = os.path.splitext(video_file.filename.lower())
        content_type = (video_file.content_type or '').lower()
        if ext not in allowed_exts and not content_type.startswith('video/'):
            return jsonify({"status": "error", "message": "Unsupported video format"}), 400

        # Use a temporary file to avoid permission problems
        import tempfile
        filename = secure_filename(video_file.filename)
        timestamp = int(time.time())

        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4',
                                         prefix=f'upload_{sender_id}_') as temp_file:
            video_path = temp_file.name
            video_file.save(video_path)

        print(f"📁 Video saved: {video_path}")

        # Initialize the video recognizer
        model_path = MODEL_PATH
        print(f"🔍 Model path: {model_path}")
        print(f"🔍 Model file exists: {os.path.exists(model_path)}")
        if not os.path.exists(model_path):
            return jsonify({
                "status": "error",
                "message": f"Model file not found: {model_path}"
            }), 500

        video_recognizer = VideoSignLanguageRecognizer(model_path, threshold=0.5)

        # Process the video
        result = video_recognizer.process_video(video_path)

        # Clean up the temporary file
        try:
            os.remove(video_path)
        except Exception:
            pass

        if result is not None:
            # Unpack the result
            predicted_class = result.get('predicted_class', -1)
            word_sequence = result.get('word_sequence', [])
            confidence = result.get('confidence', 0.0)
            probabilities = result.get('probabilities', [])
            generated_sentence = result.get('generated_sentence', 'Could not recognize any sign language content')

            # Build per-class probability data for the front end
            prob_data = []
            if len(probabilities) > 0:
                sorted_indices = np.argsort(probabilities)[::-1][:4]  # top-4 probabilities
                for idx in sorted_indices:
                    prob = float(probabilities[idx])
                    class_label = video_recognizer.label_map.get(int(idx), f"class {int(idx)}")
                    prob_data.append({"label": class_label, "probability": prob})

            # Label for the predicted class
            predicted_label = (video_recognizer.label_map.get(predicted_class, "unknown")
                               if predicted_class >= 0 else "unknown")

            # If the request came from Messenger, send the GPT sentence back
            if sender_id != 'unknown':
                send_message(sender_id, generated_sentence)

            return jsonify({
                "status": "success",
                "predicted_class": predicted_class,
                "predicted_label": predicted_label,
                "word_sequence": word_sequence,
                "confidence": float(confidence),
                "probabilities": prob_data,
                "generated_sentence": generated_sentence,
                "sender_id": sender_id
            })
        else:
            return jsonify({
                "status": "error",
                "message": "Could not recognize any sign language content",
                "sender_id": sender_id
            }), 400

    except Exception as e:
        print(f"Error while processing the video: {e}")
        import traceback
        traceback.print_exc()  # print the full stack trace
        return jsonify({"status": "error", "message": f"Error while processing the video: {str(e)}"}), 500
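
# Example client call for the /process_video endpoint above (illustrative;
# host and port assume the local defaults at the bottom of this file):
#
#     curl -X POST http://localhost:7860/process_video \
#          -F "video=@sample.mp4" \
#          -F "sender_id=unknown"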


#--------------------
# Messenger Bot helper functions
#--------------------
def handle_message(messaging_event):
    """Handle a regular message."""
    sender_id = messaging_event['sender']['id']
    message = messaging_event.get('message', {})
    message_text = message.get('text', '')
    attachments = message.get('attachments', [])

    print(f"Message received from {sender_id}: {message_text}")

    # Check for attachments
    if attachments:
        for attachment in attachments:
            if attachment.get('type') == 'video':
                video_url = attachment.get('payload', {}).get('url')
                if video_url:
                    # Process the video directly (HuggingFace integrated version)
                    process_messenger_video(video_url, sender_id)
                    return
            else:
                send_message(sender_id, f"Received a {attachment.get('type')} attachment")
                return

    # Handle text messages
    if message_text:
        response_text = "Hello! Please send me a sign language video and I will recognize its content for you."
        send_message(sender_id, response_text)


def handle_postback(messaging_event):
    """Handle postback events (button clicks, etc.)."""
    sender_id = messaging_event['sender']['id']
    postback_payload = messaging_event['postback']['payload']
    print(f"Postback received from {sender_id}: {postback_payload}")
    send_message(sender_id, f"Postback received: {postback_payload}")


def send_message(recipient_id, message_text):
    """Send a message to a user."""
    headers = {'Content-Type': 'application/json'}
    data = {
        'recipient': {'id': recipient_id},
        'message': {'text': message_text}
    }
    params = {'access_token': PAGE_ACCESS_TOKEN}

    response = requests.post(
        FACEBOOK_API_URL,
        headers=headers,
        params=params,
        json=data
    )

    if response.status_code != 200:
        print(f"Failed to send message: {response.status_code} - {response.text}")
    else:
        print(f"Message sent to {recipient_id}")


def process_messenger_video(video_url, sender_id):
    """Process a video coming from Messenger (HuggingFace integrated version)."""
    import tempfile

    try:
        print(f"🎬 Processing Messenger video: {video_url}")

        # Auto-repair URLs that contain the 'xx' placeholder
        if 'xx.fbcdn.net' in video_url:
            print(f"🔧 Placeholder URL detected, attempting repair: {video_url}")
            video_url = _fix_facebook_cdn_url(video_url)
            print(f"🔄 Repaired URL: {video_url}")

        # Lightweight reachability check via a HEAD request
        try:
            head_response = requests.head(video_url, timeout=10, verify=False, allow_redirects=True)
            if head_response.status_code != 200:
                print(f"❌ Video URL unreachable, status code: {head_response.status_code}")
                send_message(sender_id, "The video link has expired or is unreachable; please resend the video.")
                return
        except requests.exceptions.RequestException as e:
            print(f"❌ Video URL check failed: {e}")
            send_message(sender_id, "The video link check failed; please resend the video.")
            return

        # Download with retries
        max_retries = 3
        retry_delay = 2  # initial delay: 2 seconds

        for attempt in range(max_retries):
            try:
                print(f"📥 Download attempt {attempt + 1}")
                response = requests.get(video_url, stream=True, timeout=60, verify=False)
                response.raise_for_status()

                # Use a temporary file to avoid permission problems
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"messenger_video_{sender_id}_{timestamp}.mp4"

                with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4',
                                                 prefix=f'messenger_{sender_id}_') as temp_file:
                    file_path = temp_file.name
                    downloaded_size = 0
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            temp_file.write(chunk)
                            downloaded_size += len(chunk)

                # Sanity-check the downloaded size
                if downloaded_size < 1024:  # under 1KB is probably an error page
                    raise ValueError(f"Downloaded file is too small: {downloaded_size} bytes")

                print(f"✅ Video downloaded: {file_path} ({downloaded_size} bytes)")

                # Initialize the video recognizer
                model_path = MODEL_PATH
                video_recognizer = VideoSignLanguageRecognizer(model_path, threshold=0.5)

                # Process the video
                result = video_recognizer.process_video(file_path)

                # Clean up the temporary file
                try:
                    os.remove(file_path)
                except Exception:
                    pass

                if result:
                    generated_sentence = result.get('generated_sentence', 'Could not recognize any sign language content')
                    confidence = result.get('confidence', 0.0)
                    word_sequence = result.get('word_sequence', [])

                    print(f"✅ Recognition finished - user: {sender_id}")
                    print(f"📝 Model output: {word_sequence}")
                    print(f"💬 GPT translation: {generated_sentence}")
                    print(f"🎯 Confidence: {confidence:.2f}")

                    # Send the GPT translation to the user
                    send_message(sender_id, generated_sentence)
                else:
                    send_message(sender_id, "Sorry, your sign language could not be recognized; please try again.")

                # Release the MediaPipe resources
                try:
                    video_recognizer.feature_extractor.close()
                except Exception:
                    pass

                return  # processed successfully, leave the function

            except requests.exceptions.RequestException as e:
                print(f"❌ Download failed (attempt {attempt + 1}): {e}")
                if attempt < max_retries - 1:
                    print(f"⏳ Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    print("❌ All retries exhausted")
                    send_message(sender_id, "The video download failed; please check your connection and resend the video.")
            except Exception as e:
                print(f"❌ Error while processing the video: {e}")
                send_message(sender_id, "An error occurred while processing the video; please try again later.")
                return

    except Exception as e:
        print(f"Error while processing the Messenger video: {e}")
        send_message(sender_id, "An error occurred while processing the video; please try again later.")
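
# Minimal retry sketch (illustrative): the same exponential-backoff pattern
# the download loop above applies, with up to 3 attempts and the delay
# doubling from 2 seconds.
def _with_retries(fn, max_retries=3, initial_delay=2):
    """Hypothetical wrapper mirroring the download retry loop."""
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return fn()
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise
            time.sleep(delay)
            delay *= 2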


def _fix_facebook_cdn_url(url):
    """Repair Facebook CDN URLs that contain the 'xx' placeholder subdomain."""
    if 'xx.fbcdn.net' not in url:
        return url

    # First test whether the original URL is actually unreachable
    print(f"🔍 Testing the original URL first: {url}")
    try:
        response = requests.head(url, timeout=10, verify=False, allow_redirects=True)
        if response.status_code == 200:
            print(f"✅ The original URL is reachable after all! Status code: {response.status_code}")
            return url  # the original URL works; return it unchanged
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Original URL test failed: {e}; trying to repair...")

    # Extended list of Facebook CDN subdomains (IATA-style datacenter codes)
    common_subdomains = [
        # Major datacenters
        'fsin2-1', 'fsin2-2', 'fsin6-1', 'fsin6-2',  # Singapore
        'fsjc1-1', 'fsjc1-2', 'fsjc2-1', 'fsjc2-2',  # San Jose, California
        'fmaa1-1', 'fmaa1-2', 'fmaa2-1', 'fmaa2-2',  # Chennai
        'fatl1-1', 'fatl1-2',  # Atlanta
        'fsea1-1', 'fsea1-2',  # Seattle
        'fiad1-1', 'fiad1-2',  # Washington (Dulles)
        'flin1-1', 'flin1-2',  # Milan (Linate)
        'ffor1-1', 'ffor1-2',  # Fortaleza
        'ftpe1-1', 'ftpe1-2',  # Taipei
        'fhkg1-1', 'fhkg1-2',  # Hong Kong
        'fbom1-1', 'fbom1-2',  # Mumbai
        'fsyd1-1', 'fsyd1-2',  # Sydney
        'fssa1-1', 'fssa1-2',  # Salvador
        'fgig1-1', 'fgig1-2',  # Rio de Janeiro
        # Fallback and test subdomains
        'video',     # sometimes plain 'video' works
        'scontent',  # static content
        'external',  # external content
    ]

    print(f"🔧 Testing {len(common_subdomains)} candidate subdomains...")

    # Substitute each candidate for 'xx' and probe it
    for subdomain in common_subdomains:
        fixed_url = url.replace('xx.fbcdn.net', f'{subdomain}.fbcdn.net')
        print(f"🔍 Testing: {fixed_url}")
        try:
            # Quick reachability probe
            response = requests.head(fixed_url, timeout=5, verify=False, allow_redirects=True)
            if response.status_code == 200:
                print(f"✅ Found a working URL: {fixed_url}")
                return fixed_url
        except requests.exceptions.RequestException:
            continue

    # If everything failed, return the original URL (it may still be valid)
    print(f"❌ No better URL found; the original one may still work: {url}")
    return url


#--------------------
# WebSocket routes (real-time sign language recognition)
#--------------------
@socketio.on('connect')
def handle_connect():
    """Handle a WebSocket connection."""
    print('Client connected')


@socketio.on('disconnect')
def handle_disconnect():
    """Handle a WebSocket disconnection."""
    print('Client disconnected')


@socketio.on('start_stream')
def handle_start_stream(data):
    """Start the video stream."""
    global camera, is_running

    # Cloud environment check
    if IS_HUGGINGFACE:
        return {'status': 'error', 'message': 'The cloud environment has no camera; please use the video upload feature'}

    if is_running:
        return {'status': 'already_running'}

    # Initialize the camera
    camera = cv2.VideoCapture(0)
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    if not camera.isOpened():
        return {'status': 'error', 'message': 'Could not open the camera'}

    # Initialize the recognizer
    if recognizer is None:
        initialize_recognizer()

    # Start the processing thread
    is_running = True
    threading.Thread(target=gen_frames, daemon=True).start()

    return {'status': 'success'}


@socketio.on('stop_stream')
def handle_stop_stream(data):
    """Stop the video stream."""
    global camera, is_running
    is_running = False

    # Release the camera
    if camera is not None:
        camera.release()
        camera = None

    return {'status': 'success'}


#--------------------
# Application startup
#--------------------
if __name__ == '__main__':
    # HuggingFace Spaces uses port 7860 by default
    port = int(CONFIG.get('PORT', 7860))

    print("🚀 Starting the integrated sign language recognition system...")
    print(f"📱 Messenger Bot: {'configured' if PAGE_ACCESS_TOKEN != 'your_page_access_token' else 'not configured'}")
    print(f"🤖 OpenAI API: {'configured' if CONFIG.get('OPENAI_API_KEY') else 'not configured'}")
    print(f"🔧 Mode: {'HuggingFace Spaces' if IS_HUGGINGFACE else 'local development'} | SocketIO: {ASYNC_MODE}")

    socketio.run(app, host='0.0.0.0', port=port, debug=CONFIG.get('DEBUG', False))
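
# Socket event summary for the stream above (as implemented):
#   client -> server: 'start_stream' {} / 'stop_stream' {}
#   server -> client: 'update_frame' {'image': <base64 JPEG>,
#                                     'status': <process_frame payload>}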