Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| import os | |
| import hmac | |
| import hashlib | |
| import json | |
| import requests | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import base64 | |
| import threading | |
| import time | |
| import mediapipe as mp | |
| import collections | |
| from flask import Flask, request, jsonify, render_template, Response | |
| from werkzeug.utils import secure_filename | |
| from datetime import datetime | |
| from flask_socketio import SocketIO, emit | |
| from openai import OpenAI | |
| from app_config import get_config | |
# Select the SocketIO execution mode (prefer eventlet when importable).
ASYNC_MODE = os.environ.get('SOCKETIO_ASYNC_MODE', 'auto')
try:
    import eventlet
    if ASYNC_MODE in ('auto', 'eventlet'):
        eventlet.monkey_patch()
        ASYNC_MODE = 'eventlet'
except Exception:
    # eventlet unavailable (or monkey_patch failed) -> thread-based mode.
    ASYNC_MODE = 'threading'
# Environment variables.
# The OpenAI API key must come from the environment (OPENAI_API_KEY); never
# hard-code it. On HuggingFace Spaces, add it in the Space settings.
# Reduce log noise and avoid permission issues in restricted containers.
os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # quieter TensorFlow logs
os.environ['MEDIAPIPE_DISABLE_GPU'] = '1'  # disable GPU to avoid warnings
# Keep eventlet's greendns from breaking outbound DNS (e.g. OpenAI API calls).
os.environ.setdefault('EVENTLET_NO_GREENDNS', 'yes')
# Environment detection (SPACE_ID is set by HuggingFace Spaces).
IS_HUGGINGFACE = os.environ.get('SPACE_ID') is not None
IS_LOCAL_DEV = not IS_HUGGINGFACE
# Load centralized configuration.
CONFIG = get_config()
# Flask application setup.
app = Flask(__name__)
# NOTE(review): hard-coded secret key -- should come from config/env in production.
app.config['SECRET_KEY'] = 'sign_language_secret_key'
app.config['MAX_CONTENT_LENGTH'] = CONFIG.get('MAX_FILE_SIZE', 100 * 1024 * 1024)  # 100MB max file size
socketio = SocketIO(app, cors_allowed_origins="*", async_mode=ASYNC_MODE)
# Messenger Bot settings.
VERIFY_TOKEN = CONFIG.get('VERIFY_TOKEN', 'your_verify_token')
PAGE_ACCESS_TOKEN = CONFIG.get('PAGE_ACCESS_TOKEN', 'your_page_access_token')
APP_SECRET = CONFIG.get('APP_SECRET')
FACEBOOK_API_URL = 'https://graph.facebook.com/v18.0/me/messages'
# Path layout -- works both locally and on HuggingFace Spaces.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, 'data')
MODEL_PATH = os.path.join(DATA_DIR, 'models', 'sign_language_model.pth')
LABELS_PATH = os.path.join(DATA_DIR, 'labels.csv')
UPLOAD_FOLDER = os.path.join(BASE_DIR, 'uploads')
# Create the folders the app writes into.
for folder in [UPLOAD_FOLDER, os.path.join(DATA_DIR, 'models'), os.path.join(DATA_DIR, 'features', 'keypoints')]:
    os.makedirs(folder, exist_ok=True)
# Globals shared between the camera loop and web handlers.
camera = None
recognizer = None
is_running = False
frame_lock = threading.Lock()
current_frame = None
print(f"🌍 運行環境: {'HuggingFace Spaces' if IS_HUGGINGFACE else '本地開發'}")
print(f"📁 基礎目錄: {BASE_DIR}")
print(f"🤖 模型路徑: {MODEL_PATH}")
print(f"📊 標籤路徑: {LABELS_PATH}")
| #-------------------- | |
| # AI 模型類別 | |
| #-------------------- | |
class FeatureExtractor:
    """Wrap a long-lived MediaPipe Holistic instance and flatten its results.

    Output layout of :meth:`extract_pose_keypoints`:
    left hand (21*3) + right hand (21*3) + pose (33*3) = 225 floats,
    with zero-filled segments for undetected parts.
    """

    def __init__(self):
        # MediaPipe modules (drawing helpers kept for callers that render).
        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        # One persistent Holistic instance -- re-creating it per frame is slow.
        self.holistic = self.mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            smooth_landmarks=True,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

    def close(self):
        """Release the underlying MediaPipe resources (best effort)."""
        try:
            if self.holistic:
                self.holistic.close()
        except Exception:
            pass

    def extract_pose_keypoints(self, frame, holistic_results):
        """Flatten holistic landmarks into a 225-element vector.

        Args:
            frame: Unused; kept for interface compatibility with callers.
            holistic_results: MediaPipe Holistic result object.
        Returns:
            np.ndarray of shape (225,), dtype float32.
            (Fix: padding previously used integer zeros, so the result dtype
            varied with which body parts were detected -- an all-missing frame
            produced an integer array.)
        """
        keypoints = []
        # Hands: 21 landmarks each, zero-filled when not detected.
        for hand in (holistic_results.left_hand_landmarks,
                     holistic_results.right_hand_landmarks):
            if hand:
                for landmark in hand.landmark:
                    keypoints.extend([landmark.x, landmark.y, landmark.z])
            else:
                keypoints.extend([0.0] * (21 * 3))
        # Pose: 33 landmarks, zero-filled when not detected.
        if holistic_results.pose_landmarks:
            for landmark in holistic_results.pose_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            keypoints.extend([0.0] * (33 * 3))
        return np.asarray(keypoints, dtype=np.float32)
class SignLanguageModel(nn.Module):
    """Sign-language sequence classifier.

    Architecture: linear feature projection (+BatchNorm/ReLU/Dropout) ->
    bidirectional LSTM -> BatchNorm -> additive attention pooling over time
    -> MLP classifier.

    Note: the original docstring advertised a residual connection, but no
    residual was ever applied; the dead ``x_residual`` assignment has been
    removed (behavior unchanged).
    """

    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5):
        """
        Args:
            input_dim: Per-frame feature dimension.
            hidden_dim: LSTM hidden size (bidirectional output is 2x this).
            num_layers: Number of stacked LSTM layers.
            num_classes: Number of output classes.
            dropout: Base dropout probability.
        """
        super(SignLanguageModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.num_classes = num_classes
        # Projection kept as nn.Sequential so the state-dict layout matches
        # previously trained checkpoints; forward() applies the submodules
        # individually to handle BatchNorm1d's channel-dim requirement.
        self.feature_projection = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)  # lighter dropout at the input stage
        )
        # Bidirectional LSTM over the projected sequence.
        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        # Normalizes the bidirectional LSTM output channels.
        self.lstm_bn = nn.BatchNorm1d(hidden_dim * 2)
        # Additive attention producing one weight per time step.
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
            nn.Softmax(dim=1)  # normalize over the sequence dimension
        )
        # Two-stage MLP classifier over the attention-pooled context.
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim // 2, num_classes)
        )
        # L2 regularization coefficient -- presumably consumed by a training
        # loop elsewhere; unused inside this module. TODO confirm.
        self.l2_reg_alpha = 0.001
        self._init_weights()

    def _init_weights(self):
        """Xavier init for linear layers, orthogonal init for LSTM weights."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.LSTM):
                for name, param in m.named_parameters():
                    if 'weight' in name:
                        nn.init.orthogonal_(param)  # orthogonal init works well for RNNs
                    elif 'bias' in name:
                        nn.init.zeros_(param)

    def forward(self, x):
        """Classify a batch of keypoint sequences.

        Args:
            x: Tensor of shape [batch_size, seq_len, input_dim].
        Returns:
            Logits of shape [batch_size, num_classes].
        """
        batch_size, seq_len, _ = x.size()
        # Apply projection submodules manually: BatchNorm1d expects the
        # channel dimension second, so reshape/transpose around it.
        h = x.reshape(-1, x.size(-1))           # [B*T, input_dim]
        h = self.feature_projection[0](h)       # Linear
        h = h.reshape(batch_size, seq_len, -1)  # [B, T, H]
        h = h.transpose(1, 2)                   # [B, H, T] for BatchNorm
        h = self.feature_projection[1](h)       # BatchNorm1d
        h = h.transpose(1, 2)                   # back to [B, T, H]
        h = self.feature_projection[2](h)       # ReLU
        h = self.feature_projection[3](h)       # Dropout
        # LSTM over time: output is [B, T, 2H].
        lstm_out, _ = self.lstm(h)
        # BatchNorm over the bidirectional channels.
        lstm_out = self.lstm_bn(lstm_out.transpose(1, 2)).transpose(1, 2)
        # Attention-weighted sum over the time axis.
        attention_weights = self.attention(lstm_out)                      # [B, T, 1]
        context = torch.bmm(lstm_out.transpose(1, 2), attention_weights)  # [B, 2H, 1]
        context = context.squeeze(-1)                                     # [B, 2H]
        return self.classifier(context)
| #-------------------- | |
| # 手語辨識器類別 | |
| #-------------------- | |
class VideoSignLanguageRecognizer:
    """Sign-language recognizer for uploaded video files.

    Samples every 5th frame, extracts MediaPipe holistic keypoints,
    classifies the keypoint sequence with the LSTM model and, when an
    OpenAI client is available, turns the recognized words into a short
    Chinese sentence.
    """

    def __init__(self, model_path, threshold=0.7):
        """
        Args:
            model_path: Path to trained model weights (.pth).
            threshold: Base confidence threshold; process_video() may raise
                it dynamically per clip.
        """
        self.model_path = model_path
        self.threshold = threshold
        self.effective_threshold = threshold
        # Long-lived MediaPipe Holistic wrapper (avoids per-frame re-creation).
        self.feature_extractor = FeatureExtractor()
        # index -> human-readable label, loaded from labels.csv.
        self.label_map = self._load_label_mapping()
        # Prefer GPU when available.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model()
        # GPT integration -- client construction can fail (e.g. no API key).
        try:
            base_url = os.environ.get('OPENAI_BASE_URL')
            if base_url:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5, base_url=base_url)
            else:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5)
        except Exception as e:
            print(f"初始化OpenAI客户端出錯: {e}")
            self.openai_client = None
        print(f"影片辨識器初始化完成!使用設備: {self.device}")

    def _load_label_mapping(self):
        """Load the index->label mapping (single source of truth: labels.csv)."""
        return load_label_mapping_from_csv()

    def _load_model(self):
        """Build the network and load trained weights; fall back to random weights."""
        input_dim = 225  # (21 + 21 + 33 landmarks) * 3 coordinates
        model = SignLanguageModel(
            input_dim=input_dim,
            hidden_dim=96,
            num_layers=2,
            num_classes=len(self.label_map),
            dropout=0.5
        )
        # Missing checkpoint: keep random weights (useful only for testing the UI).
        if not os.path.exists(self.model_path):
            print(f"⚠️ 警告:模型檔案不存在 {self.model_path}")
            print("🔧 將使用隨機初始化的模型(僅供測試)")
            model.to(self.device)
            model.eval()
            return model
        try:
            model.load_state_dict(torch.load(self.model_path, map_location=self.device))
            model.to(self.device)
            model.eval()
            print(f"✅ 模型載入成功:{self.model_path}")
        except Exception as e:
            print(f"❌ 模型載入失敗:{e}")
            print("🔧 使用隨機初始化的模型")
            model.to(self.device)
            model.eval()
        return model

    def process_video(self, video_path):
        """Run recognition over a whole video file.

        Returns:
            dict with prediction details on success, or None when the video
            cannot be opened or too few usable frames were found.
            (Bug fix: failure paths previously returned the tuple
            ``(None, 0)``, which is *not* None -- callers checking
            ``result is not None`` treated failures as success and then
            crashed on ``result.get``.)
        """
        print(f"🎬 開始處理影片:{video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"❌ 無法開啟影片檔:{video_path}")
            return None
        keypoints_sequence = []
        frame_count = 0
        hands_present_count = 0
        motion_history = []
        prev_gray = None
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Sample every 5th frame to keep processing cheap.
            if frame_count % 5 == 0:
                keypoints, hands_detected = self._extract_features(frame)
                if keypoints is not None:
                    keypoints_sequence.append(keypoints)
                    if hands_detected:
                        hands_present_count += 1
                # Optical-flow magnitude as a rough motion measure (best effort).
                try:
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    if prev_gray is not None:
                        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
                        mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
                        motion_history.append(float(np.mean(mag)))
                    prev_gray = gray
                except Exception:
                    pass
            frame_count += 1
            # Cap the sequence length.
            if len(keypoints_sequence) >= 60:
                break
        cap.release()
        if len(keypoints_sequence) < 3:
            print(f"❌ 有效幀數不足,無法進行辨識")
            return None
        # Raise the threshold when hands are rarely visible or there is
        # little motion -- both suggest an unreliable clip.
        frames_used = max(1, len(keypoints_sequence))
        hand_ratio = hands_present_count / frames_used
        avg_motion = float(np.mean(motion_history)) if motion_history else 0.0
        dynamic_threshold = self.threshold
        if hand_ratio < 0.3:
            dynamic_threshold = min(0.9, dynamic_threshold + 0.1)
        if avg_motion < 0.05:
            dynamic_threshold = min(0.9, dynamic_threshold + 0.05)
        self.effective_threshold = dynamic_threshold
        # Predict using the dynamic threshold, then phrase the result.
        prediction, confidence, word_sequence, probabilities = self._predict_from_sequence(keypoints_sequence)
        generated_sentence = self._generate_sentence_with_gpt(word_sequence)
        print(f"🎯 辨識結果:{word_sequence}")
        print(f"📈 信心度:{confidence:.2f}")
        return {
            'predicted_class': prediction,
            'word_sequence': word_sequence,
            'confidence': confidence,
            'probabilities': probabilities,
            'generated_sentence': generated_sentence,
            'hand_presence_ratio': hand_ratio,
            'avg_motion': avg_motion,
            'effective_threshold': dynamic_threshold
        }

    def _extract_features(self, frame):
        """Extract the 225-dim keypoint vector and a hands-visible flag from one frame."""
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.feature_extractor.holistic.process(frame_rgb)
        hands_detected = (results.left_hand_landmarks is not None or
                          results.right_hand_landmarks is not None)
        try:
            keypoints = self.feature_extractor.extract_pose_keypoints(frame, results)
            return keypoints, hands_detected
        except Exception:
            return None, hands_detected

    def _predict_from_sequence(self, keypoints_sequence):
        """Classify a keypoint sequence.

        Returns:
            (predicted_class, confidence, word_sequence, per-class probs).
            word_sequence is empty when confidence is below the effective
            threshold.
        """
        # Build one contiguous float32 array first to avoid the slow
        # list-of-arrays tensor construction path.
        keypoints_array = np.array(keypoints_sequence, dtype=np.float32)
        sequence_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(self.device)
        with torch.no_grad():
            outputs = self.model(sequence_tensor)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            max_prob, predicted_class = torch.max(probabilities, 1)
            predicted_class = predicted_class.item()
            confidence = max_prob.item()
            probs = probabilities[0].cpu().numpy()
        effective_thr = getattr(self, 'effective_threshold', self.threshold)
        if confidence >= effective_thr:
            predicted_word = self.label_map.get(predicted_class, f"類別{predicted_class}")
            word_sequence = [predicted_word]
        else:
            word_sequence = []
        return predicted_class, confidence, word_sequence, probs

    def _generate_sentence_with_gpt(self, word_sequence):
        """Build a short Chinese sentence from recognized words via GPT.

        Falls back to a plain space-join when no client is configured, the
        API call fails, or the reply looks too long / explanatory.
        """
        if not word_sequence:
            return "無法辨識手語內容"
        if not self.openai_client:
            return " ".join(word_sequence)
        try:
            # Prompt asks for a terse sentence only, no explanations.
            prompt = f"手語詞彙: {', '.join(word_sequence)}。請組成一個簡潔的中文句子,只回覆句子內容,不要額外說明。"
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "你是手語翻譯助手。只回覆簡潔的中文句子,不要額外說明或範例。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=50,  # keep replies short
                temperature=0.3  # low randomness for accuracy
            )
            result = response.choices[0].message.content.strip()
            # Strip stray quotes.
            result = result.replace('"', '').replace("'", '').strip()
            # Overly long or explanatory replies fall back to the raw words.
            if len(result) > 30 or '例如' in result or '可以' in result:
                return " ".join(word_sequence)
            return result
        except Exception as e:
            print(f"調用GPT API時出錯: {e}")
            return " ".join(word_sequence)
class SignLanguageRecognizer:
    """Realtime sign-language recognizer for a live camera stream.

    Per frame it extracts MediaPipe keypoints into a rolling buffer, runs the
    LSTM classifier every ``prediction_interval`` frames while hands are
    visible, smooths predictions with top-1 voting plus an EMA of confidence,
    accumulates accepted words, and generates a sentence with GPT once the
    hands have been absent long enough.
    """
    def __init__(self, model_path, frame_buffer_size=30, prediction_interval=15, threshold=0.7):
        """
        Args:
            model_path: Path to trained model weights (.pth).
            frame_buffer_size: Max keypoint frames kept for prediction.
            prediction_interval: Run the model every N frames.
            threshold: Base confidence threshold (raised dynamically).
        """
        self.model_path = model_path
        self.threshold = threshold
        # Last effective threshold after dynamic adjustment (see _make_prediction).
        self.dynamic_threshold = threshold
        self.max_buffer_size = frame_buffer_size
        self.prediction_interval = prediction_interval
        # Long-lived MediaPipe Holistic wrapper.
        self.feature_extractor = FeatureExtractor()
        # index -> label mapping from labels.csv.
        self.label_map = self._load_label_mapping()
        # Model on GPU when available.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model()
        # Rolling keypoint buffer and prediction state.
        self.keypoints_buffer = collections.deque(maxlen=frame_buffer_size)
        self.frame_count = 0
        self.current_prediction = None      # class index, -1 for "unknown", or None
        self.prediction_probabilities = None
        # Hand-presence debouncing.
        self.hand_present = False
        self.hand_absent_frames = 0
        self.hand_absent_threshold = 30  # consecutive hand-less frames before "absent"
        # Accumulated words + anti-repeat cooldown.
        self.word_sequence = []
        self.last_added_word = None
        self.word_cooldown = 0
        # Smoothing state: recent top-1 classes for voting, EMA of max confidence.
        self.recent_top1_queue = collections.deque(maxlen=15)
        self.ema_confidence = 0.0
        self.ema_alpha = 0.3
        # Last generated sentence and its timestamp (drives UI display timeout).
        self.generated_sentence = ""
        self.display_sentence_time = 0
        # GPT integration -- client construction may fail (e.g. missing API key).
        try:
            base_url = os.environ.get('OPENAI_BASE_URL')
            if base_url:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5, base_url=base_url)
            else:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5)
        except Exception as e:
            print(f"初始化OpenAI客户端出錯: {e}")
            self.openai_client = None
        print(f"即時辨識器初始化完成!使用設備: {self.device}")

    def _load_label_mapping(self):
        """Load the index->label mapping (single source of truth: labels.csv)."""
        return load_label_mapping_from_csv()

    def _load_model(self):
        """Build the network and load trained weights; fall back to random weights."""
        input_dim = 225  # (21 + 21 + 33 landmarks) * 3 coordinates
        model = SignLanguageModel(
            input_dim=input_dim,
            hidden_dim=96,
            num_layers=2,
            num_classes=len(self.label_map),
            dropout=0.5
        )
        # Missing checkpoint: keep random weights (useful only for UI testing).
        if not os.path.exists(self.model_path):
            print(f"⚠️ 警告:模型檔案不存在 {self.model_path}")
            print("🔧 將使用隨機初始化的模型(僅供測試)")
            model.to(self.device)
            model.eval()
            return model
        try:
            model.load_state_dict(torch.load(self.model_path, map_location=self.device))
            model.to(self.device)
            model.eval()
            print(f"✅ 即時辨識模型載入成功:{self.model_path}")
        except Exception as e:
            print(f"❌ 即時辨識模型載入失敗:{e}")
            print("🔧 使用隨機初始化的模型")
            model.to(self.device)
            model.eval()
        return model

    def process_frame(self, frame):
        """Process one camera frame and return a status dict for the web UI."""
        # Extract features and detect hand presence.
        keypoint_features, hands_detected = self._extract_features(frame)
        self._update_hand_presence(hands_detected)
        # Only buffer frames whose features were extracted successfully.
        if keypoint_features is not None:
            self.keypoints_buffer.append(keypoint_features)
        # Periodic prediction while hands are visible and the buffer has data.
        if self.hand_present and self.frame_count % self.prediction_interval == 0 and len(self.keypoints_buffer) > 5:
            self._make_prediction()
            self._apply_smoothing_and_decide()
        # Exactly when the hands cross the "absent" threshold, turn the
        # accumulated words into a sentence.
        if self.hand_present == False and self.hand_absent_frames == self.hand_absent_threshold and self.word_sequence:
            self._generate_sentence_with_gpt()
        self.frame_count += 1
        if self.word_cooldown > 0:
            self.word_cooldown -= 1
        # Status payload consumed by the front end.
        status = {
            "hand_present": self.hand_present,
            "frame_count": self.frame_count,
            "current_prediction": None,
            "word_sequence": self.word_sequence.copy(),
            "generated_sentence": self.generated_sentence,
            "display_sentence": (time.time() - self.display_sentence_time < 10)
        }
        if self.current_prediction is not None:
            if self.current_prediction == -1:
                status["current_prediction"] = {"label": "未知", "confidence": 0}
            else:
                label = self.label_map.get(self.current_prediction, f"類別{self.current_prediction}")
                confidence = float(self.prediction_probabilities[self.current_prediction]) if self.prediction_probabilities is not None else 0
                status["current_prediction"] = {"label": label, "confidence": confidence}
        # Top-4 class probabilities for the UI.
        if self.prediction_probabilities is not None:
            status["probabilities"] = []
            sorted_indices = np.argsort(self.prediction_probabilities)[::-1][:4]
            for idx in sorted_indices:
                prob = float(self.prediction_probabilities[idx])
                class_label = self.label_map.get(idx, f"類別{idx}")
                status["probabilities"].append({"label": class_label, "probability": prob})
        return status

    def _update_hand_presence(self, hands_detected):
        """Debounce hand presence: mark absent only after N consecutive misses."""
        if hands_detected:
            self.hand_present = True
            self.hand_absent_frames = 0
        else:
            self.hand_absent_frames += 1
            if self.hand_absent_frames >= self.hand_absent_threshold:
                if self.hand_present:
                    self.hand_present = False

    def _update_word_sequence(self):
        """Append the current prediction's label, honoring the repeat cooldown."""
        if self.current_prediction is not None and self.current_prediction >= 0:
            word = self.label_map.get(self.current_prediction, f"類別{self.current_prediction}")
            # NOTE(review): the `or` lets the same word be re-added once the
            # cooldown hits 0 -- confirm that repeat behavior is intended.
            if word != self.last_added_word or self.word_cooldown == 0:
                self.word_sequence.append(word)
                self.last_added_word = word
                self.word_cooldown = 20
    def _generate_sentence_with_gpt(self):
        """Turn the accumulated words into a sentence (GPT when available),
        store it for display, and clear the word buffer."""
        if not self.word_sequence:
            return
        # Without an OpenAI client, just join the raw words.
        if not self.openai_client:
            self.generated_sentence = " ".join(self.word_sequence)
            self.display_sentence_time = time.time()
            print(f"生成句子: {self.generated_sentence}")
            self.word_sequence = []
            return
        try:
            # Prompt asks for a terse sentence only, no explanations.
            prompt = f"手語詞彙: {', '.join(self.word_sequence)}。請組成一個簡潔的中文句子,只回覆句子內容,不要額外說明。"
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "你是手語翻譯助手。只回覆簡潔的中文句子,不要額外說明或範例。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=50,  # keep replies short
                temperature=0.3  # low randomness for accuracy
            )
            result = response.choices[0].message.content.strip()
            # Strip stray quotes.
            result = result.replace('"', '').replace("'", '').strip()
            # Overly long or explanatory replies fall back to the raw words.
            if len(result) > 30 or '例如' in result or '可以' in result:
                self.generated_sentence = " ".join(self.word_sequence)
            else:
                self.generated_sentence = result
            self.display_sentence_time = time.time()
            print(f"GPT生成句子: {self.generated_sentence}")
        except Exception as e:
            print(f"調用GPT API時出錯: {e}")
            self.generated_sentence = " ".join(self.word_sequence)
            self.display_sentence_time = time.time()
        self.word_sequence = []

    def _extract_features(self, frame):
        """Extract the 225-dim keypoint vector and a hands-visible flag."""
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.feature_extractor.holistic.process(frame_rgb)
        hands_detected = (results.left_hand_landmarks is not None or
                          results.right_hand_landmarks is not None)
        try:
            keypoints = self.feature_extractor.extract_pose_keypoints(frame, results)
            return keypoints, hands_detected
        except Exception as e:
            return None, hands_detected

    def _make_prediction(self):
        """Classify the buffered sequence and update smoothing state."""
        if len(self.keypoints_buffer) < 2:
            return
        keypoints_array = np.array(list(self.keypoints_buffer), dtype=np.float32)
        keypoints_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(self.device)
        with torch.no_grad():
            outputs = self.model(keypoints_tensor)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            max_prob, predicted_class = torch.max(probabilities, 1)
            predicted_class = predicted_class.item()
            max_prob = max_prob.item()
            probs = probabilities[0].cpu().numpy()
        # Exponential moving average of the top confidence.
        self.ema_confidence = self.ema_alpha * max_prob + (1 - self.ema_alpha) * self.ema_confidence
        # Record the top-1 class for the voting window.
        self.recent_top1_queue.append(predicted_class)
        # Dynamic threshold: stricter when hands absent or EMA confidence low.
        dyn_thr = self.threshold
        if not self.hand_present:
            dyn_thr = min(0.95, dyn_thr + 0.15)
        if self.ema_confidence < 0.5:
            dyn_thr = min(0.9, dyn_thr + 0.1)
        self.dynamic_threshold = dyn_thr
        if max_prob >= dyn_thr:
            self.current_prediction = predicted_class
            self.prediction_probabilities = probs
        else:
            # Below threshold -> "unknown" sentinel.
            self.current_prediction = -1
            self.prediction_probabilities = probs

    def _apply_smoothing_and_decide(self):
        """Vote over recent top-1 classes and gate by EMA confidence before
        committing a word, to suppress frame-to-frame jitter."""
        if self.current_prediction is None:
            return
        # Majority vote over the recent top-1 window (once it is half full).
        if len(self.recent_top1_queue) >= max(5, self.recent_top1_queue.maxlen // 2):
            counts = collections.Counter(self.recent_top1_queue)
            voted_class, voted_count = counts.most_common(1)[0]
            vote_ratio = voted_count / len(self.recent_top1_queue)
        else:
            voted_class, vote_ratio = self.current_prediction, 0.0
        # Tiered gates. NOTE(review): strong and medium each imply weak, so
        # `strong or medium or weak` reduces to the weak gate -- confirm the
        # tiers were meant to drive different behavior.
        strong = (vote_ratio >= 0.6 and self.ema_confidence >= 0.6)
        medium = (vote_ratio >= 0.5 and self.ema_confidence >= 0.5)
        weak = (vote_ratio >= 0.4 and self.ema_confidence >= 0.45)
        decided_class = -1
        if strong or medium or weak:
            decided_class = voted_class
        # Commit the decided word.
        if decided_class >= 0:
            self.current_prediction = decided_class
            self._update_word_sequence()
def load_label_mapping_from_csv(labels_file: str = LABELS_PATH):
    """Load the index -> label mapping from labels.csv.

    Falls back to a built-in default mapping when the file is missing,
    unreadable, or yields no entries.
    """
    label_map = {}
    print(f"🔍 嘗試載入標籤檔案: {labels_file}")
    if not os.path.exists(labels_file):
        print(f"❌ 標籤檔案不存在: {labels_file}")
    else:
        try:
            table = pd.read_csv(labels_file)
            for _, record in table.iterrows():
                label_map[int(record['index'])] = record['label']
            print(f"✅ 從 {labels_file} 載入了 {len(label_map)} 個類別標籤")
            print(f"📊 標籤映射: {label_map}")
        except Exception as e:
            print(f"❌ 讀取 labels.csv 出錯: {e}")
    if not label_map:
        # Default demo vocabulary.
        label_map = {0: "eat", 1: "fish", 2: "like", 3: "want"}
        print(f"⚠️ 使用預設標籤映射: {label_map}")
    return label_map
def initialize_recognizer():
    """(Re)build the global realtime recognizer from the configured model path."""
    global recognizer
    recognizer = SignLanguageRecognizer(
        model_path=MODEL_PATH,
        frame_buffer_size=30,
        prediction_interval=10,
        threshold=0.6,
    )
def gen_frames():
    """Camera capture loop: run recognition per frame and push JPEG frames
    plus status over Socket.IO until the global ``is_running`` flag clears.

    Assumes ``camera`` and ``recognizer`` were initialized before this loop
    starts -- NOTE(review): a None camera would raise here; confirm callers
    guarantee initialization.
    """
    global camera, recognizer, is_running, current_frame, frame_lock
    while is_running:
        success, frame = camera.read()
        if not success:
            break
        else:
            status = recognizer.process_frame(frame)
            ret, buffer = cv2.imencode('.jpg', frame)
            if not ret:
                # Encoding failed; skip this frame.
                continue
            frame_data = base64.b64encode(buffer).decode('utf-8')
            # Publish the latest frame for HTTP pollers under the lock.
            with frame_lock:
                current_frame = {'image': frame_data, 'status': status}
            socketio.emit('update_frame', {'image': frame_data, 'status': status})
            time.sleep(0.1)  # ~10 FPS to limit bandwidth and CPU
| #-------------------- | |
| # 路由定義 | |
| #-------------------- | |
| # Messenger Bot 路由 | |
def home():
    """Render the landing page (web UI and Messenger bot status)."""
    template_name = 'index.html'
    return render_template(template_name)
def health_check():
    """Report service liveness plus model/label file availability."""
    env_name = 'HuggingFace Spaces' if IS_HUGGINGFACE else 'Local Development'
    report = {'status': 'healthy', 'environment': env_name}
    report['model_loaded'] = os.path.exists(MODEL_PATH)
    report['labels_loaded'] = os.path.exists(LABELS_PATH)
    return report
def verify_webhook():
    """Answer Facebook's webhook verification handshake.

    Echoes ``hub.challenge`` back when the mode is 'subscribe' and the
    token matches VERIFY_TOKEN; otherwise rejects the request.
    """
    mode = request.args.get('hub.mode')
    token = request.args.get('hub.verify_token')
    challenge = request.args.get('hub.challenge')
    if not (mode and token):
        return "需要驗證參數", 400
    if mode == 'subscribe' and token == VERIFY_TOKEN:
        print("Webhook 驗證成功!")
        return challenge
    print("驗證失敗 - token 不正確")
    return "驗證失敗", 403
def handle_webhook():
    """Dispatch incoming Messenger webhook events.

    Verifies the Facebook signature when APP_SECRET is configured, then
    routes each messaging event to the message or postback handler.
    """
    try:
        if APP_SECRET:
            signature = request.headers.get('X-Hub-Signature-256')
            if not _verify_facebook_signature(signature, request.data, APP_SECRET):
                print("簽章驗證失敗")
                return "簽章驗證失敗", 403
        payload = request.get_json()
        if payload.get('object') == 'page':
            for entry in payload.get('entry', []):
                for event in entry.get('messaging', []):
                    if event.get('message'):
                        handle_message(event)
                    elif event.get('postback'):
                        handle_postback(event)
        return "EVENT_RECEIVED", 200
    except Exception as e:
        print(f"處理 webhook 時發生錯誤: {e}")
        return "錯誤", 500
| def _verify_facebook_signature(signature_header: str, payload: bytes, app_secret: str) -> bool: | |
| """驗證 X-Hub-Signature-256 簽章(Facebook Webhook 安全)""" | |
| try: | |
| if not signature_header or not signature_header.startswith('sha256='): | |
| return False | |
| received_sig = signature_header.split('=')[1] | |
| mac = hmac.new(app_secret.encode('utf-8'), msg=payload, digestmod=hashlib.sha256) | |
| expected_sig = mac.hexdigest() | |
| return hmac.compare_digest(received_sig, expected_sig) | |
| except Exception: | |
| return False | |
def receive_recognition_result():
    """Internal endpoint: relay a recognition result to a Messenger user.

    Expects a JSON body with ``sender_id``, ``recognition_result`` and
    ``confidence``; forwards the result text via send_message.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({"status": "error", "message": "沒有收到資料"}), 400
        sender_id = data.get('sender_id')
        recognition_result = data.get('recognition_result', '無法辨識')
        confidence = data.get('confidence', 0)
        if not sender_id:
            return jsonify({"status": "error", "message": "缺少 sender_id"}), 400
        print(f"📝 收到手語辨識結果 - 用戶:{sender_id}")
        print(f"🎯 辨識結果:{recognition_result}")
        print(f"📊 信心度:{confidence}")
        # Push the result text to the user.
        send_message(sender_id, recognition_result)
        return jsonify({
            "status": "success",
            "message": "辨識結果已發送給用戶"
        })
    except Exception as e:
        print(f"處理辨識結果時發生錯誤:{e}")
        return jsonify({"status": "error", "message": str(e)}), 500
def process_video():
    """Handle an uploaded video and return recognition results as JSON.

    Accepts multipart form data with a 'video' file and an optional
    'sender_id'. When sender_id identifies a Messenger user, the generated
    sentence is also pushed back to them via send_message.
    """
    try:
        # Reject requests without an uploaded file.
        if 'video' not in request.files:
            return jsonify({"status": "error", "message": "沒有上傳影片檔案"}), 400
        video_file = request.files['video']
        sender_id = request.form.get('sender_id', 'unknown')
        if video_file.filename == '':
            return jsonify({"status": "error", "message": "沒有選擇檔案"}), 400
        # Basic extension / MIME sanity check.
        allowed_exts = {'.mp4', '.mov', '.avi', '.wmv', '.mkv'}
        _, ext = os.path.splitext(video_file.filename.lower())
        content_type = (video_file.content_type or '').lower()
        if ext not in allowed_exts and not content_type.startswith('video/'):
            return jsonify({"status": "error", "message": "不支援的影片格式"}), 400
        # Save to a temp file to avoid permission issues in the container.
        import tempfile
        filename = secure_filename(video_file.filename)  # NOTE(review): unused local
        timestamp = int(time.time())  # NOTE(review): unused local
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4', prefix=f'upload_{sender_id}_') as temp_file:
            video_path = temp_file.name
            video_file.save(video_path)
        print(f"📁 影片已儲存:{video_path}")
        # Ensure a trained model exists before building the recognizer.
        model_path = MODEL_PATH
        print(f"🔍 模型路徑: {model_path}")
        print(f"🔍 模型檔案是否存在: {os.path.exists(model_path)}")
        if not os.path.exists(model_path):
            # NOTE(review): the temp file is not removed on this early return.
            return jsonify({
                "status": "error",
                "message": f"模型檔案不存在: {model_path}"
            }), 500
        video_recognizer = VideoSignLanguageRecognizer(model_path, threshold=0.5)
        # Run recognition over the saved clip.
        result = video_recognizer.process_video(video_path)
        # Best-effort temp-file cleanup.
        try:
            os.remove(video_path)
        except:
            pass
        # NOTE(review): this branch assumes the failure value of
        # process_video is exactly None; a non-None sentinel (e.g. a tuple)
        # would fall into the success path -- confirm.
        if result is not None:
            predicted_class = result.get('predicted_class', -1)
            word_sequence = result.get('word_sequence', [])
            confidence = result.get('confidence', 0.0)
            probabilities = result.get('probabilities', [])
            generated_sentence = result.get('generated_sentence', '無法辨識手語內容')
            # Top-4 class probabilities for the front end.
            prob_data = []
            if len(probabilities) > 0:
                sorted_indices = np.argsort(probabilities)[::-1][:4]
                for idx in sorted_indices:
                    prob = float(probabilities[idx])
                    class_label = video_recognizer.label_map.get(idx, f"類別{idx}")
                    prob_data.append({"label": class_label, "probability": prob})
            # Human-readable label for the winning class.
            predicted_label = video_recognizer.label_map.get(predicted_class, "未知") if predicted_class >= 0 else "未知"
            # Messenger-originated requests also get the sentence pushed back.
            if sender_id != 'unknown':
                send_message(sender_id, generated_sentence)
            return jsonify({
                "status": "success",
                "predicted_class": predicted_class,
                "predicted_label": predicted_label,
                "word_sequence": word_sequence,
                "confidence": float(confidence),
                "probabilities": prob_data,
                "generated_sentence": generated_sentence,
                "sender_id": sender_id
            })
        else:
            return jsonify({
                "status": "error",
                "message": "無法辨識手語內容",
                "sender_id": sender_id
            }), 400
    except Exception as e:
        print(f"處理影片時發生錯誤:{e}")
        import traceback
        traceback.print_exc()  # full stack trace for debugging
        return jsonify({"status": "error", "message": f"處理影片時發生錯誤: {str(e)}"}), 500
| #-------------------- | |
| # Messenger Bot 輔助函數 | |
| #-------------------- | |
def handle_message(messaging_event):
    """Handle an incoming Messenger message event.

    Video attachments are routed to the sign-language recognizer; other
    attachment types get a short acknowledgement; plain text gets a usage
    hint asking the user to send a sign-language video.

    Args:
        messaging_event: One entry of the webhook 'messaging' list. Must
            contain ``sender.id``; may contain ``message.text`` and
            ``message.attachments``.
    """
    sender_id = messaging_event['sender']['id']
    message = messaging_event.get('message', {})
    message_text = message.get('text', '')
    attachments = message.get('attachments', [])
    print(f"收到訊息 from {sender_id}: {message_text}")
    # Attachments take priority over any accompanying text.
    if attachments:
        for attachment in attachments:
            if attachment.get('type') == 'video':
                video_url = attachment.get('payload', {}).get('url')
                if video_url:
                    # Process synchronously (HuggingFace-integrated version).
                    process_messenger_video(video_url, sender_id)
                    return
            else:
                # NOTE: only the first non-video attachment is acknowledged;
                # the handler returns right after replying.
                send_message(sender_id, f"收到 {attachment.get('type')} 附件")
                return
    # Text-only message (or a video attachment without a URL): reply with
    # usage instructions.  (Was an f-string with no placeholders.)
    if message_text:
        response_text = "您好!請發送手語影片給我,我會幫您辨識手語內容。"
        send_message(sender_id, response_text)
def handle_postback(messaging_event):
    """Acknowledge a postback event (button click, quick reply, etc.)
    by echoing its payload back to the user."""
    user_id = messaging_event['sender']['id']
    payload = messaging_event['postback']['payload']
    print(f"收到 postback from {user_id}: {payload}")
    send_message(user_id, f"收到 postback:{payload}")
def send_message(recipient_id, message_text):
    """Send a text message to a Messenger user via the Facebook Send API.

    Args:
        recipient_id: PSID of the Messenger user to send to.
        message_text: Plain-text message body.

    Returns:
        None. Success or failure is only logged; callers do not branch
        on the outcome.
    """
    headers = {
        'Content-Type': 'application/json'
    }
    data = {
        'recipient': {'id': recipient_id},
        'message': {'text': message_text}
    }
    params = {
        'access_token': PAGE_ACCESS_TOKEN
    }
    try:
        # A timeout prevents a stalled Facebook endpoint from hanging the
        # webhook worker indefinitely (the original call had none).
        response = requests.post(
            FACEBOOK_API_URL,
            headers=headers,
            params=params,
            json=data,
            timeout=10
        )
    except requests.exceptions.RequestException as e:
        # Network-level failure (DNS, connect, timeout): log instead of
        # letting the exception crash the caller's message-handling path.
        print(f"發送訊息失敗: {e}")
        return
    if response.status_code != 200:
        print(f"發送訊息失敗: {response.status_code} - {response.text}")
    else:
        print(f"訊息發送成功給 {recipient_id}")
def process_messenger_video(video_url, sender_id):
    """Download a Messenger video attachment and run sign-language recognition.

    Pipeline: repair placeholder CDN URLs, verify reachability with a HEAD
    request, download with exponential-backoff retries, run
    VideoSignLanguageRecognizer on the temp file, and reply to the user via
    send_message with the generated sentence.

    Args:
        video_url: CDN URL of the video attachment from the webhook payload.
        sender_id: Messenger PSID used for replies.
    """
    import tempfile
    try:
        print(f"🎬 開始處理 Messenger 影片:{video_url}")
        # Repair URLs that contain the 'xx' placeholder subdomain.
        if 'xx.fbcdn.net' in video_url:
            print(f"🔧 檢測到佔位符 URL,嘗試自動修復:{video_url}")
            video_url = _fix_facebook_cdn_url(video_url)
            print(f"🔄 修復後的 URL:{video_url}")
        # Lightweight reachability check before committing to a download.
        try:
            # NOTE(security): verify=False disables TLS certificate checks;
            # kept for parity with existing behavior on fbcdn hosts.
            head_response = requests.head(video_url, timeout=10, verify=False, allow_redirects=True)
            if head_response.status_code != 200:
                print(f"❌ 影片 URL 不可訪問,狀態碼:{head_response.status_code}")
                send_message(sender_id, "影片連結已過期或無法訪問,請重新發送影片。")
                return
        except requests.exceptions.RequestException as e:
            print(f"❌ 影片 URL 檢查失敗:{e}")
            send_message(sender_id, "影片連結檢查失敗,請重新發送影片。")
            return
        # Download with exponential-backoff retries (network errors only).
        max_retries = 3
        retry_delay = 2  # initial delay in seconds; doubled per retry
        for attempt in range(max_retries):
            try:
                print(f"📥 嘗試下載影片(第 {attempt + 1} 次)")
                response = requests.get(video_url, stream=True, timeout=60, verify=False)
                response.raise_for_status()
                # Stream into a temp file to avoid permission issues and
                # holding the whole video in memory.
                with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4', prefix=f'messenger_{sender_id}_') as temp_file:
                    file_path = temp_file.name
                    downloaded_size = 0
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            temp_file.write(chunk)
                            downloaded_size += len(chunk)
                # A file under 1 KB is almost certainly an error page, not video.
                if downloaded_size < 1024:
                    raise ValueError(f"下載的檔案太小:{downloaded_size} bytes")
                print(f"✅ 影片下載完成:{file_path} ({downloaded_size} bytes)")
                # Run recognition on the downloaded file.
                model_path = MODEL_PATH
                video_recognizer = VideoSignLanguageRecognizer(model_path, threshold=0.5)
                result = video_recognizer.process_video(file_path)
                # Remove the temp file; ignore filesystem races only
                # (was a bare except).
                try:
                    os.remove(file_path)
                except OSError:
                    pass
                if result:
                    generated_sentence = result.get('generated_sentence', '無法辨識手語內容')
                    confidence = result.get('confidence', 0.0)
                    word_sequence = result.get('word_sequence', [])
                    print(f"✅ 手語辨識完成 - 用戶:{sender_id}")
                    print(f"📝 模型辨識:{word_sequence}")
                    print(f"💬 GPT翻譯:{generated_sentence}")
                    print(f"🎯 信心度:{confidence:.2f}")
                    # Reply to the user with the GPT-translated sentence.
                    send_message(sender_id, generated_sentence)
                else:
                    send_message(sender_id, "抱歉,無法辨識您的手語內容,請再試一次。")
                # Release Mediapipe resources held by the recognizer.
                try:
                    video_recognizer.feature_extractor.close()
                except Exception:
                    pass
                return  # success — exit the retry loop and the function
            except requests.exceptions.RequestException as e:
                print(f"❌ 下載失敗(第 {attempt + 1} 次):{e}")
                if attempt < max_retries - 1:
                    print(f"⏳ {retry_delay} 秒後重試...")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    print("❌ 所有重試次數已用完")
                    send_message(sender_id, "影片下載失敗,請檢查網路連線後重新發送影片。")
            except Exception as e:
                # Any non-network failure (bad file, recognizer error):
                # report once and stop — retrying would not help.
                print(f"❌ 處理影片時發生錯誤:{e}")
                send_message(sender_id, "處理影片時發生錯誤,請稍後再試。")
                return
    except Exception as e:
        print(f"處理 Messenger 影片時發生錯誤:{e}")
        send_message(sender_id, "處理影片時發生錯誤,請稍後再試。")
| def _fix_facebook_cdn_url(url): | |
| """修復包含佔位符 'xx' 的 Facebook CDN URL""" | |
| if 'xx.fbcdn.net' not in url: | |
| return url | |
| # 首先測試原始 URL 是否真的無法訪問 | |
| print(f"🔍 先測試原始 URL 是否可訪問:{url}") | |
| try: | |
| response = requests.head(url, timeout=10, verify=False, allow_redirects=True) | |
| if response.status_code == 200: | |
| print(f"✅ 原始 URL 實際上是可以訪問的!狀態碼:{response.status_code}") | |
| return url # 如果原始 URL 可以訪問,直接返回 | |
| except requests.exceptions.RequestException as e: | |
| print(f"⚠️ 原始 URL 測試失敗:{e},開始嘗試修復...") | |
| # 擴展的 Facebook CDN 子域名列表(包含更多可能性) | |
| common_subdomains = [ | |
| # 主要數據中心 | |
| 'fsin2-1', 'fsin2-2', 'fsin6-1', 'fsin6-2', # 新加坡 | |
| 'fsjc1-1', 'fsjc1-2', 'fsjc2-1', 'fsjc2-2', # 加州 | |
| 'fmaa1-1', 'fmaa1-2', 'fmaa2-1', 'fmaa2-2', # 馬來西亞 | |
| 'fatl1-1', 'fatl1-2', # 亞特蘭大 | |
| 'fsea1-1', 'fsea1-2', # 西雅圖 | |
| 'fiad1-1', 'fiad1-2', # 愛爾蘭都柏林 | |
| 'flin1-1', 'flin1-2', # 倫敦 | |
| 'ffor1-1', 'ffor1-2', # 法蘭克福 | |
| 'ftpe1-1', 'ftpe1-2', # 台灣 | |
| 'fhkg1-1', 'fhkg1-2', # 香港 | |
| 'fbom1-1', 'fbom1-2', # 孟買 | |
| 'fsyd1-1', 'fsyd1-2', # 悉尼 | |
| 'fssa1-1', 'fssa1-2', # 南非 | |
| 'fgig1-1', 'fgig1-2', # 巴西 | |
| # 備用和測試子域名 | |
| 'video', # 有時直接用 video | |
| 'scontent', # 靜態內容 | |
| 'external', # 外部內容 | |
| ] | |
| print(f"🔧 開始測試 {len(common_subdomains)} 個可能的子域名...") | |
| # 替換 'xx' 為每個可能的子域名並測試 | |
| for subdomain in common_subdomains: | |
| fixed_url = url.replace('xx.fbcdn.net', f'{subdomain}.fbcdn.net') | |
| print(f"🔍 測試:{fixed_url}") | |
| try: | |
| # 快速測試 URL 是否可訪問 | |
| response = requests.head(fixed_url, timeout=5, verify=False, allow_redirects=True) | |
| if response.status_code == 200: | |
| print(f"✅ 找到有效的 URL:{fixed_url}") | |
| return fixed_url | |
| except requests.exceptions.RequestException: | |
| continue | |
| # 如果都失敗,返回原始 URL(因為用戶說可以訪問) | |
| print(f"❌ 無法找到更好的 URL,但原始 URL 可能仍然有效:{url}") | |
| return url | |
| #-------------------- | |
| # WebSocket 路由 (即時手語辨識) | |
| #-------------------- | |
def handle_connect():
    """Log a new WebSocket client connection.

    NOTE(review): no @socketio.on decorator is visible on this handler —
    presumably it is registered elsewhere (e.g. socketio.on_event); verify.
    """
    print('客戶端已連接')
def handle_disconnect():
    """Log a WebSocket client disconnection.

    NOTE(review): no @socketio.on decorator is visible on this handler —
    presumably it is registered elsewhere (e.g. socketio.on_event); verify.
    """
    print('客戶端已斷開連接')
def handle_start_stream(data):
    """Start the webcam video stream for live sign-language recognition.

    Args:
        data: SocketIO event payload (unused; kept for the handler
            signature).

    Returns:
        dict: {'status': 'success'} on start, {'status': 'already_running'}
        if a stream is active, or {'status': 'error', 'message': ...} when
        the environment or camera does not allow streaming.
    """
    global camera, is_running
    # Camera access is unavailable on cloud hosting.
    if IS_HUGGINGFACE:
        return {'status': 'error', 'message': '雲端環境不支援攝像頭功能,請使用影片上傳功能'}
    if is_running:
        return {'status': 'already_running'}
    # Open the default camera at 640x480.
    camera = cv2.VideoCapture(0)
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    if not camera.isOpened():
        # Release the failed capture handle instead of leaking it in the
        # module-level global (the original left it assigned).
        camera.release()
        camera = None
        return {'status': 'error', 'message': '無法打開攝像頭'}
    # Lazily build the recognizer on first use.
    if recognizer is None:
        initialize_recognizer()
    # Process frames on a daemon thread so shutdown is not blocked.
    is_running = True
    threading.Thread(target=gen_frames, daemon=True).start()
    return {'status': 'success'}
def handle_stop_stream(data):
    """Stop the live video stream and release the camera.

    Args:
        data: SocketIO event payload (unused; kept for the handler
            signature).

    Returns:
        dict: always {'status': 'success'}.
    """
    global camera, is_running
    is_running = False
    # Detach the handle from the global before releasing it.
    active_camera, camera = camera, None
    if active_camera is not None:
        active_camera.release()
    return {'status': 'success'}
| #-------------------- | |
| # 應用程式啟動 | |
| #-------------------- | |
if __name__ == '__main__':
    # HuggingFace Spaces serves on 7860 by default; CONFIG may override.
    port = int(CONFIG.get('PORT', 7860))
    print("🚀 手語辨識整合系統啟動中...")
    # Tokens/keys still at their placeholder defaults are reported as unconfigured.
    print(f"📱 Messenger Bot: {'已配置' if PAGE_ACCESS_TOKEN != 'your_page_access_token' else '未配置'}")
    print(f"🤖 OpenAI API: {'已配置' if CONFIG.get('OPENAI_API_KEY') else '未配置'}")
    # NOTE(review): mode detection keys off port == 7860, which is only a
    # heuristic for "running on HuggingFace Spaces".
    print(f"🔧 運行模式: {'HuggingFace Spaces' if port == 7860 else '本地開發'} | SocketIO: {ASYNC_MODE}")
    # Run through SocketIO so the selected async mode (eventlet/threading)
    # is honored instead of the plain Flask dev server.
    socketio.run(app, host='0.0.0.0', port=port, debug=CONFIG.get('DEBUG', False))