#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hmac
import hashlib
import json
import requests
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import base64
import threading
import time
import mediapipe as mp
import collections
from flask import Flask, request, jsonify, render_template, Response
from werkzeug.utils import secure_filename
from datetime import datetime
from flask_socketio import SocketIO, emit
from openai import OpenAI
from app_config import get_config
# Choose the SocketIO async mode (prefer eventlet when available)
ASYNC_MODE = os.environ.get('SOCKETIO_ASYNC_MODE', 'auto')
try:
    import eventlet
    if ASYNC_MODE in ('auto', 'eventlet'):
        eventlet.monkey_patch()
        ASYNC_MODE = 'eventlet'
except Exception:
    ASYNC_MODE = 'threading'
# Environment variables
# The OpenAI API key must come from the environment; never hard-code it.
# On HuggingFace Spaces, add OPENAI_API_KEY in the Space settings.
# Set environment variables to avoid permission issues and reduce log noise
os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # quieter TensorFlow logs
os.environ['MEDIAPIPE_DISABLE_GPU'] = '1'  # disable GPU to avoid warnings
# Avoid eventlet's greendns breaking DNS resolution for outbound calls (e.g. OpenAI)
os.environ.setdefault('EVENTLET_NO_GREENDNS', 'yes')
# Environment detection
IS_HUGGINGFACE = os.environ.get('SPACE_ID') is not None
IS_LOCAL_DEV = not IS_HUGGINGFACE
# Load the centralized configuration
CONFIG = get_config()
# Flask app initialization
app = Flask(__name__)
app.config['SECRET_KEY'] = 'sign_language_secret_key'
app.config['MAX_CONTENT_LENGTH'] = CONFIG.get('MAX_FILE_SIZE', 100 * 1024 * 1024)  # 100MB max file size
socketio = SocketIO(app, cors_allowed_origins="*", async_mode=ASYNC_MODE)
# Messenger Bot settings
VERIFY_TOKEN = CONFIG.get('VERIFY_TOKEN', 'your_verify_token')
PAGE_ACCESS_TOKEN = CONFIG.get('PAGE_ACCESS_TOKEN', 'your_page_access_token')
APP_SECRET = CONFIG.get('APP_SECRET')
FACEBOOK_API_URL = 'https://graph.facebook.com/v18.0/me/messages'
# Path settings - adapt to different environments
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, 'data')
MODEL_PATH = os.path.join(DATA_DIR, 'models', 'sign_language_model.pth')
LABELS_PATH = os.path.join(DATA_DIR, 'labels.csv')
UPLOAD_FOLDER = os.path.join(BASE_DIR, 'uploads')
# Create the required folders
for folder in [UPLOAD_FOLDER, os.path.join(DATA_DIR, 'models'), os.path.join(DATA_DIR, 'features', 'keypoints')]:
    os.makedirs(folder, exist_ok=True)
# Global state
camera = None
recognizer = None
is_running = False
frame_lock = threading.Lock()
current_frame = None
print(f"🌍 Environment: {'HuggingFace Spaces' if IS_HUGGINGFACE else 'local development'}")
print(f"📁 Base directory: {BASE_DIR}")
print(f"🤖 Model path: {MODEL_PATH}")
print(f"📊 Labels path: {LABELS_PATH}")
#--------------------
# AI model classes
#--------------------
class FeatureExtractor:
    def __init__(self):
        # Initialize the MediaPipe models
        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        # Keep one long-lived Holistic instance (rebuilding it per frame is slow)
        self.holistic = self.mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            smooth_landmarks=True,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

    def close(self):
        try:
            if self.holistic:
                self.holistic.close()
        except Exception:
            pass
    def extract_pose_keypoints(self, frame, holistic_results):
        """Extract skeleton keypoints as a flat feature vector."""
        keypoints = []
        # Hand keypoints (if detected)
        if holistic_results.left_hand_landmarks:
            for landmark in holistic_results.left_hand_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            # No hand detected: pad with zeros
            keypoints.extend([0] * (21 * 3))
        if holistic_results.right_hand_landmarks:
            for landmark in holistic_results.right_hand_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            keypoints.extend([0] * (21 * 3))
        # Pose keypoints
        if holistic_results.pose_landmarks:
            for landmark in holistic_results.pose_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            keypoints.extend([0] * (33 * 3))
        return np.array(keypoints)
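
# Illustrative sketch (not called anywhere in the app): the feature vector above
# is laid out as [left hand 21*3 | right hand 21*3 | pose 33*3] = 225 floats,
# which is where the recognizers' input_dim below comes from. Assuming a BGR
# frame from OpenCV, a single-frame extraction looks like:
#
#   extractor = FeatureExtractor()
#   results = extractor.holistic.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
#   vec = extractor.extract_pose_keypoints(frame, results)
#   assert vec.shape == (225,)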
class SignLanguageModel(nn.Module):
    """
    Sign language recognition model: a bidirectional LSTM with attention,
    plus batch normalization and a residual-style feature projection.
    """
    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5):
        super(SignLanguageModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.num_classes = num_classes
        # Feature projection: map the input to a common dimension
        self.feature_projection = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)  # lighter dropout
        )
        # Bidirectional LSTM
        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        # Batch normalization for the LSTM output
        self.lstm_bn = nn.BatchNorm1d(hidden_dim * 2)
        # Attention mechanism
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
            nn.Softmax(dim=1)
        )
        # Classifier head
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim // 2, num_classes)
        )
        # L2 regularization coefficient
        self.l2_reg_alpha = 0.001
        # Initialize the weights
        self._init_weights()

    def _init_weights(self):
        """Initialize model weights."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.LSTM):
                for name, param in m.named_parameters():
                    if 'weight' in name:
                        nn.init.orthogonal_(param)  # orthogonal init works well for RNNs
                    elif 'bias' in name:
                        nn.init.zeros_(param)

    def forward(self, x):
        """Forward pass."""
        # x shape: [batch_size, seq_len, feature_dim]
        batch_size, seq_len, _ = x.size()
        # Feature projection - reshape so BatchNorm1d sees the right dimensions
        x_reshaped = x.reshape(-1, x.size(-1))  # [batch_size*seq_len, feature_dim]
        x_projected = self.feature_projection[0](x_reshaped)  # Linear
        x_projected = x_projected.reshape(batch_size, seq_len, -1)  # [batch_size, seq_len, hidden_dim]
        x_projected = x_projected.transpose(1, 2)  # [batch_size, hidden_dim, seq_len] for BatchNorm
        x_projected = self.feature_projection[1](x_projected)  # BatchNorm
        x_projected = x_projected.transpose(1, 2)  # back to [batch_size, seq_len, hidden_dim]
        x_projected = self.feature_projection[2](x_projected)  # ReLU
        x_projected = self.feature_projection[3](x_projected)  # Dropout
        # Keep the projected features for a residual connection (not used further here)
        x_residual = x_projected
        # LSTM
        lstm_out, _ = self.lstm(x_projected)
        # lstm_out shape: [batch_size, seq_len, hidden_dim*2]
        # BatchNorm over the LSTM output
        lstm_out_bn = lstm_out.transpose(1, 2)  # [batch_size, hidden_dim*2, seq_len]
        lstm_out_bn = self.lstm_bn(lstm_out_bn)
        lstm_out = lstm_out_bn.transpose(1, 2)  # [batch_size, seq_len, hidden_dim*2]
        # Attention weights
        attention_weights = self.attention(lstm_out)
        # attention_weights shape: [batch_size, seq_len, 1]
        # Apply attention
        context = torch.bmm(lstm_out.transpose(1, 2), attention_weights)
        # context shape: [batch_size, hidden_dim*2, 1]
        context = context.squeeze(-1)
        # Final classification
        output = self.classifier(context)
        # output shape: [batch_size, num_classes]
        return output
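
# A minimal shape smoke test for the model above, assuming the 225-dim features
# and the hidden_dim=96 / num_layers=2 configuration used by the recognizers
# below. It is illustrative only and is never called by the app.
def _model_shape_sketch():
    model = SignLanguageModel(input_dim=225, hidden_dim=96, num_layers=2, num_classes=4)
    model.eval()  # eval mode so BatchNorm uses running stats and batch size 1 works
    with torch.no_grad():
        x = torch.randn(1, 12, 225)  # [batch, seq_len, feature_dim]
        logits = model(x)
    assert logits.shape == (1, 4)  # [batch, num_classes]
    return logits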
#--------------------
# Sign language recognizer classes
#--------------------
class VideoSignLanguageRecognizer:
    """Video sign language recognizer - for uploaded video files."""
    def __init__(self, model_path, threshold=0.7):
        self.model_path = model_path
        self.threshold = threshold
        self.effective_threshold = threshold
        # Feature extractor
        self.feature_extractor = FeatureExtractor()
        # Label mapping
        self.label_map = self._load_label_mapping()
        # Model
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model()
        # GPT integration
        try:
            base_url = os.environ.get('OPENAI_BASE_URL')
            if base_url:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5, base_url=base_url)
            else:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5)
        except Exception as e:
            print(f"Failed to initialize the OpenAI client: {e}")
            self.openai_client = None
        print(f"Video recognizer initialized. Device: {self.device}")

    def _load_label_mapping(self):
        """Load the label mapping (single source of truth: labels.csv)."""
        return load_label_mapping_from_csv()
    def _load_model(self):
        """Load the trained model."""
        input_dim = 225  # (21 + 21 + 33) * 3 = 225
        model = SignLanguageModel(
            input_dim=input_dim,
            hidden_dim=96,
            num_layers=2,
            num_classes=len(self.label_map),
            dropout=0.5
        )
        # Check that the checkpoint exists
        if not os.path.exists(self.model_path):
            print(f"⚠️ Warning: model file not found at {self.model_path}")
            print("🔧 Falling back to randomly initialized weights (testing only)")
            model.to(self.device)
            model.eval()
            return model
        try:
            # Load the weights
            model.load_state_dict(torch.load(self.model_path, map_location=self.device))
            model.to(self.device)
            model.eval()
            print(f"✅ Model loaded: {self.model_path}")
        except Exception as e:
            print(f"❌ Failed to load model: {e}")
            print("🔧 Falling back to randomly initialized weights")
            model.to(self.device)
            model.eval()
        return model
    def process_video(self, video_path):
        """Process an entire video file; returns a result dict, or None on failure."""
        print(f"🎬 Processing video: {video_path}")
        # Open the video
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"❌ Cannot open video file: {video_path}")
            return None
        # Extract the feature sequence
        keypoints_sequence = []
        frame_count = 0
        hands_present_count = 0
        motion_history = []
        prev_gray = None
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Frame skipping: process every 5th frame
            if frame_count % 5 == 0:
                keypoints, hands_detected = self._extract_features(frame)
                if keypoints is not None:
                    keypoints_sequence.append(keypoints)
                    if hands_detected:
                        hands_present_count += 1
                # Estimate the amount of motion via Farneback optical flow
                try:
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    if prev_gray is not None:
                        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
                        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
                        motion_history.append(float(np.mean(mag)))
                    prev_gray = gray
                except Exception:
                    pass
            frame_count += 1
            # Cap the number of processed frames
            if len(keypoints_sequence) >= 60:
                break
        cap.release()
        if len(keypoints_sequence) < 3:
            print("❌ Too few valid frames; cannot recognize")
            return None
        # Dynamically adjust the threshold (hand-presence ratio + motion)
        frames_used = max(1, len(keypoints_sequence))
        hand_ratio = hands_present_count / frames_used
        avg_motion = float(np.mean(motion_history)) if motion_history else 0.0
        dynamic_threshold = self.threshold
        if hand_ratio < 0.3:
            dynamic_threshold = min(0.9, dynamic_threshold + 0.1)
        if avg_motion < 0.05:
            dynamic_threshold = min(0.9, dynamic_threshold + 0.05)
        self.effective_threshold = dynamic_threshold
        # Predict (using the dynamic threshold)
        prediction, confidence, word_sequence, probabilities = self._predict_from_sequence(keypoints_sequence)
        # Generate a full sentence with GPT
        generated_sentence = self._generate_sentence_with_gpt(word_sequence)
        print(f"🎯 Recognition result: {word_sequence}")
        print(f"📈 Confidence: {confidence:.2f}")
        return {
            'predicted_class': prediction,
            'word_sequence': word_sequence,
            'confidence': confidence,
            'probabilities': probabilities,
            'generated_sentence': generated_sentence,
            'hand_presence_ratio': hand_ratio,
            'avg_motion': avg_motion,
            'effective_threshold': dynamic_threshold
        }
    def _extract_features(self, frame):
        """Extract hand and pose features from a single frame."""
        # Convert to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Run the long-lived holistic instance on the image
        results = self.feature_extractor.holistic.process(frame_rgb)
        # Check whether any hand was detected
        hands_detected = (results.left_hand_landmarks is not None or
                          results.right_hand_landmarks is not None)
        try:
            keypoints = self.feature_extractor.extract_pose_keypoints(frame, results)
            return keypoints, hands_detected
        except Exception:
            return None, hands_detected
    def _predict_from_sequence(self, keypoints_sequence):
        """Predict from a sequence of keypoints."""
        # Build the tensor from a single numpy array to avoid the slow-copy warning
        keypoints_array = np.array(keypoints_sequence, dtype=np.float32)
        sequence_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(self.device)
        with torch.no_grad():
            outputs = self.model(sequence_tensor)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            max_prob, predicted_class = torch.max(probabilities, 1)
            predicted_class = predicted_class.item()
            confidence = max_prob.item()
            # All class probabilities
            probs = probabilities[0].cpu().numpy()
        effective_thr = getattr(self, 'effective_threshold', self.threshold)
        if confidence >= effective_thr:
            predicted_word = self.label_map.get(predicted_class, f"類別{predicted_class}")
            word_sequence = [predicted_word]
        else:
            word_sequence = []
        return predicted_class, confidence, word_sequence, probs
    def _generate_sentence_with_gpt(self, word_sequence):
        """Use GPT to turn the word sequence into a complete sentence."""
        if not word_sequence:
            return "無法辨識手語內容"
        if not self.openai_client:
            return " ".join(word_sequence)
        try:
            # Keep the prompt tight so GPT replies with just the sentence
            prompt = f"手語詞彙: {', '.join(word_sequence)}。請組成一個簡潔的中文句子,只回覆句子內容,不要額外說明。"
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "你是手語翻譯助手。只回覆簡潔的中文句子,不要額外說明或範例。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=50,  # keep token usage low
                temperature=0.3  # low randomness for more literal output
            )
            result = response.choices[0].message.content.strip()
            # Strip quotes and stray characters
            result = result.replace('"', '').replace("'", '').strip()
            # If the reply is too long or looks explanatory, fall back to the raw words
            if len(result) > 30 or '例如' in result or '可以' in result:
                return " ".join(word_sequence)
            return result
        except Exception as e:
            print(f"Error calling the GPT API: {e}")
            return " ".join(word_sequence)
class SignLanguageRecognizer:
    """Real-time sign language recognizer - for the camera stream."""
    def __init__(self, model_path, frame_buffer_size=30, prediction_interval=15, threshold=0.7):
        self.model_path = model_path
        self.threshold = threshold
        self.dynamic_threshold = threshold
        self.max_buffer_size = frame_buffer_size
        self.prediction_interval = prediction_interval
        # Feature extractor
        self.feature_extractor = FeatureExtractor()
        # Label mapping
        self.label_map = self._load_label_mapping()
        # Model
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model()
        # Buffers and state
        self.keypoints_buffer = collections.deque(maxlen=frame_buffer_size)
        self.frame_count = 0
        self.current_prediction = None
        self.prediction_probabilities = None
        # Hand presence detection
        self.hand_present = False
        self.hand_absent_frames = 0
        self.hand_absent_threshold = 30
        # Word sequence
        self.word_sequence = []
        self.last_added_word = None
        self.word_cooldown = 0
        self.recent_top1_queue = collections.deque(maxlen=15)
        self.ema_confidence = 0.0
        self.ema_alpha = 0.3
        # Generated sentence
        self.generated_sentence = ""
        self.display_sentence_time = 0
        # GPT integration
        try:
            base_url = os.environ.get('OPENAI_BASE_URL')
            if base_url:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5, base_url=base_url)
            else:
                self.openai_client = OpenAI(timeout=30.0, max_retries=5)
        except Exception as e:
            print(f"Failed to initialize the OpenAI client: {e}")
            self.openai_client = None
        print(f"Real-time recognizer initialized. Device: {self.device}")
    def _load_label_mapping(self):
        """Load the label mapping (single source of truth: labels.csv)."""
        return load_label_mapping_from_csv()

    def _load_model(self):
        """Load the trained model."""
        input_dim = 225
        model = SignLanguageModel(
            input_dim=input_dim,
            hidden_dim=96,
            num_layers=2,
            num_classes=len(self.label_map),
            dropout=0.5
        )
        # Check that the checkpoint exists
        if not os.path.exists(self.model_path):
            print(f"⚠️ Warning: model file not found at {self.model_path}")
            print("🔧 Falling back to randomly initialized weights (testing only)")
            model.to(self.device)
            model.eval()
            return model
        try:
            model.load_state_dict(torch.load(self.model_path, map_location=self.device))
            model.to(self.device)
            model.eval()
            print(f"✅ Real-time model loaded: {self.model_path}")
        except Exception as e:
            print(f"❌ Failed to load real-time model: {e}")
            print("🔧 Falling back to randomly initialized weights")
            model.to(self.device)
            model.eval()
        return model
    def process_frame(self, frame):
        """Process a single video frame."""
        # Extract features and detect hands
        keypoint_features, hands_detected = self._extract_features(frame)
        # Update the hand presence state
        self._update_hand_presence(hands_detected)
        # Only buffer the frame if feature extraction succeeded
        if keypoint_features is not None:
            self.keypoints_buffer.append(keypoint_features)
        # Predict periodically
        if self.hand_present and self.frame_count % self.prediction_interval == 0 and len(self.keypoints_buffer) > 5:
            self._make_prediction()
            self._apply_smoothing_and_decide()
        # Generate a sentence once the hands leave the frame
        if not self.hand_present and self.hand_absent_frames == self.hand_absent_threshold and self.word_sequence:
            self._generate_sentence_with_gpt()
        self.frame_count += 1
        if self.word_cooldown > 0:
            self.word_cooldown -= 1
        # Build the status payload
        status = {
            "hand_present": self.hand_present,
            "frame_count": self.frame_count,
            "current_prediction": None,
            "word_sequence": self.word_sequence.copy(),
            "generated_sentence": self.generated_sentence,
            "display_sentence": (time.time() - self.display_sentence_time < 10)
        }
        if self.current_prediction is not None:
            if self.current_prediction == -1:
                status["current_prediction"] = {"label": "未知", "confidence": 0}
            else:
                label = self.label_map.get(self.current_prediction, f"類別{self.current_prediction}")
                confidence = float(self.prediction_probabilities[self.current_prediction]) if self.prediction_probabilities is not None else 0
                status["current_prediction"] = {"label": label, "confidence": confidence}
        if self.prediction_probabilities is not None:
            status["probabilities"] = []
            sorted_indices = np.argsort(self.prediction_probabilities)[::-1][:4]
            for idx in sorted_indices:
                prob = float(self.prediction_probabilities[idx])
                class_label = self.label_map.get(idx, f"類別{idx}")
                status["probabilities"].append({"label": class_label, "probability": prob})
        return status
    def _update_hand_presence(self, hands_detected):
        """Update the hand presence state."""
        if hands_detected:
            self.hand_present = True
            self.hand_absent_frames = 0
        else:
            self.hand_absent_frames += 1
            if self.hand_absent_frames >= self.hand_absent_threshold:
                if self.hand_present:
                    self.hand_present = False

    def _update_word_sequence(self):
        """Update the word sequence from the current prediction."""
        if self.current_prediction is not None and self.current_prediction >= 0:
            word = self.label_map.get(self.current_prediction, f"類別{self.current_prediction}")
            if word != self.last_added_word or self.word_cooldown == 0:
                self.word_sequence.append(word)
                self.last_added_word = word
                self.word_cooldown = 20
    def _generate_sentence_with_gpt(self):
        """Use GPT to turn the word sequence into a complete sentence."""
        if not self.word_sequence:
            return
        if not self.openai_client:
            self.generated_sentence = " ".join(self.word_sequence)
            self.display_sentence_time = time.time()
            print(f"Generated sentence: {self.generated_sentence}")
            self.word_sequence = []
            return
        try:
            # Keep the prompt tight so GPT replies with just the sentence
            prompt = f"手語詞彙: {', '.join(self.word_sequence)}。請組成一個簡潔的中文句子,只回覆句子內容,不要額外說明。"
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "你是手語翻譯助手。只回覆簡潔的中文句子,不要額外說明或範例。"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=50,  # keep token usage low
                temperature=0.3  # low randomness for more literal output
            )
            result = response.choices[0].message.content.strip()
            # Strip quotes and stray characters
            result = result.replace('"', '').replace("'", '').strip()
            # If the reply is too long or looks explanatory, fall back to the raw words
            if len(result) > 30 or '例如' in result or '可以' in result:
                self.generated_sentence = " ".join(self.word_sequence)
            else:
                self.generated_sentence = result
            self.display_sentence_time = time.time()
            print(f"GPT sentence: {self.generated_sentence}")
        except Exception as e:
            print(f"Error calling the GPT API: {e}")
            self.generated_sentence = " ".join(self.word_sequence)
            self.display_sentence_time = time.time()
        # Reset the word buffer whether or not the GPT call succeeded
        self.word_sequence = []
    def _extract_features(self, frame):
        """Extract hand and pose features from a single frame."""
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.feature_extractor.holistic.process(frame_rgb)
        hands_detected = (results.left_hand_landmarks is not None or
                          results.right_hand_landmarks is not None)
        try:
            keypoints = self.feature_extractor.extract_pose_keypoints(frame, results)
            return keypoints, hands_detected
        except Exception:
            return None, hands_detected
    def _make_prediction(self):
        """Predict from the buffered features and update the smoothing state."""
        if len(self.keypoints_buffer) < 2:
            return
        keypoints_array = np.array(list(self.keypoints_buffer), dtype=np.float32)
        keypoints_tensor = torch.from_numpy(keypoints_array).unsqueeze(0).to(self.device)
        with torch.no_grad():
            outputs = self.model(keypoints_tensor)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            max_prob, predicted_class = torch.max(probabilities, 1)
            predicted_class = predicted_class.item()
            max_prob = max_prob.item()
            probs = probabilities[0].cpu().numpy()
        # Update the EMA confidence
        self.ema_confidence = self.ema_alpha * max_prob + (1 - self.ema_alpha) * self.ema_confidence
        # Record the last N top-1 classes for vote smoothing
        self.recent_top1_queue.append(predicted_class)
        # Dynamic threshold: raise the bar when hands are absent or EMA confidence is low
        dyn_thr = self.threshold
        if not self.hand_present:
            dyn_thr = min(0.95, dyn_thr + 0.15)
        if self.ema_confidence < 0.5:
            dyn_thr = min(0.9, dyn_thr + 0.1)
        self.dynamic_threshold = dyn_thr
        if max_prob >= dyn_thr:
            self.current_prediction = predicted_class
            self.prediction_probabilities = probs
        else:
            self.current_prediction = -1
            self.prediction_probabilities = probs
    def _apply_smoothing_and_decide(self):
        """Multi-frame voting + cooldown to suppress jitter before adding a word."""
        if self.current_prediction is None:
            return
        # Multi-frame vote: take the majority class over the last N frames
        if len(self.recent_top1_queue) >= max(5, self.recent_top1_queue.maxlen // 2):
            counts = collections.Counter(self.recent_top1_queue)
            voted_class, voted_count = counts.most_common(1)[0]
            vote_ratio = voted_count / len(self.recent_top1_queue)
        else:
            voted_class, vote_ratio = self.current_prediction, 0.0
        # Tiered thresholds: the vote ratio and the EMA confidence decide together
        strong = (vote_ratio >= 0.6 and self.ema_confidence >= 0.6)
        medium = (vote_ratio >= 0.5 and self.ema_confidence >= 0.5)
        weak = (vote_ratio >= 0.4 and self.ema_confidence >= 0.45)
        decided_class = -1
        if strong or medium or weak:
            decided_class = voted_class
        # Emit a word
        if decided_class >= 0:
            self.current_prediction = decided_class
            self._update_word_sequence()
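
# Worked example of the decision above (illustrative numbers, not from a real
# run): with recent_top1_queue = [2]*9 + [0]*6 (maxlen 15), the majority class
# is 2 with vote_ratio = 9/15 = 0.6; if ema_confidence is 0.62, the "strong"
# tier (0.6 vote / 0.6 EMA) fires and class 2 is handed to
# _update_word_sequence(), which then enforces the 20-frame cooldown before
# the same word can repeat.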
def load_label_mapping_from_csv(labels_file: str = LABELS_PATH):
    """Load the label mapping from labels.csv; fall back to defaults on failure."""
    label_map = {}
    print(f"🔍 Trying to load label file: {labels_file}")
    if os.path.exists(labels_file):
        try:
            df = pd.read_csv(labels_file)
            for _, row in df.iterrows():
                label_map[int(row['index'])] = row['label']
            print(f"✅ Loaded {len(label_map)} class labels from {labels_file}")
            print(f"📊 Label mapping: {label_map}")
        except Exception as e:
            print(f"❌ Error reading labels.csv: {e}")
    else:
        print(f"❌ Label file not found: {labels_file}")
    if not label_map:
        label_map = {0: "eat", 1: "fish", 2: "like", 3: "want"}
        print(f"⚠️ Using the default label mapping: {label_map}")
    return label_map
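
# The expected labels.csv layout, inferred from the column names used above
# (an assumption about the data file, which is not included in this snippet):
#
#   index,label
#   0,eat
#   1,fish
#   2,like
#   3,want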
def initialize_recognizer():
    global recognizer
    model_path = MODEL_PATH
    recognizer = SignLanguageRecognizer(
        model_path=model_path,
        frame_buffer_size=30,
        prediction_interval=10,
        threshold=0.6
    )
def gen_frames():
    global camera, recognizer, is_running, current_frame, frame_lock
    while is_running:
        success, frame = camera.read()
        if not success:
            break
        status = recognizer.process_frame(frame)
        ret, buffer = cv2.imencode('.jpg', frame)
        if not ret:
            continue
        frame_data = base64.b64encode(buffer).decode('utf-8')
        with frame_lock:
            current_frame = {'image': frame_data, 'status': status}
        socketio.emit('update_frame', {'image': frame_data, 'status': status})
        time.sleep(0.1)  # ~10 FPS to limit bandwidth and CPU
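
# Shape of the 'update_frame' payload emitted above (as built in this file):
# 'image' is a base64-encoded JPEG string, and 'status' is the dict returned
# by SignLanguageRecognizer.process_frame, i.e. {'hand_present': ...,
# 'frame_count': ..., 'current_prediction': ..., 'word_sequence': [...],
# 'generated_sentence': ..., 'display_sentence': ...} plus an optional
# 'probabilities' list of {'label', 'probability'} entries.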
#--------------------
# Routes
#--------------------
# Messenger Bot routes
@app.route('/', methods=['GET'])
def home():
    """Home page - serves the web UI and Messenger Bot status."""
    return render_template('index.html')

@app.route('/health')
def health_check():
    """Health check."""
    return {
        'status': 'healthy',
        'environment': 'HuggingFace Spaces' if IS_HUGGINGFACE else 'Local Development',
        'model_loaded': os.path.exists(MODEL_PATH),
        'labels_loaded': os.path.exists(LABELS_PATH)
    }

@app.route('/webhook', methods=['GET'])
def verify_webhook():
    """Verify the webhook - Facebook calls this to validate your service."""
    mode = request.args.get('hub.mode')
    token = request.args.get('hub.verify_token')
    challenge = request.args.get('hub.challenge')
    if mode and token:
        if mode == 'subscribe' and token == VERIFY_TOKEN:
            print("Webhook verified!")
            return challenge
        else:
            print("Verification failed - wrong token")
            return "Verification failed", 403
    return "Missing verification parameters", 400
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Handle incoming Messenger messages."""
    try:
        # Verify the Facebook signature
        if APP_SECRET:
            signature = request.headers.get('X-Hub-Signature-256')
            if not _verify_facebook_signature(signature, request.data, APP_SECRET):
                print("Signature verification failed")
                return "Signature verification failed", 403
        data = request.get_json()
        if data.get('object') == 'page':
            for entry in data.get('entry', []):
                for messaging_event in entry.get('messaging', []):
                    if messaging_event.get('message'):
                        handle_message(messaging_event)
                    elif messaging_event.get('postback'):
                        handle_postback(messaging_event)
        return "EVENT_RECEIVED", 200
    except Exception as e:
        print(f"Error handling webhook: {e}")
        return "Error", 500

def _verify_facebook_signature(signature_header: str, payload: bytes, app_secret: str) -> bool:
    """Verify the X-Hub-Signature-256 header (Facebook webhook security)."""
    try:
        if not signature_header or not signature_header.startswith('sha256='):
            return False
        received_sig = signature_header.split('=')[1]
        mac = hmac.new(app_secret.encode('utf-8'), msg=payload, digestmod=hashlib.sha256)
        expected_sig = mac.hexdigest()
        return hmac.compare_digest(received_sig, expected_sig)
    except Exception:
        return False
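
# A quick local sanity check for the verifier above (illustrative only; the
# secret and payload are made up, not real credentials):
#
#   payload = b'{"object": "page"}'
#   secret = 'test_app_secret'
#   header = 'sha256=' + hmac.new(secret.encode('utf-8'), payload, hashlib.sha256).hexdigest()
#   assert _verify_facebook_signature(header, payload, secret)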
@app.route('/receive_recognition_result', methods=['POST'])
def receive_recognition_result():
    """Receive a sign recognition result (internal call)."""
    try:
        data = request.get_json()
        if not data:
            return jsonify({"status": "error", "message": "沒有收到資料"}), 400
        sender_id = data.get('sender_id')
        recognition_result = data.get('recognition_result', '無法辨識')
        confidence = data.get('confidence', 0)
        if not sender_id:
            return jsonify({"status": "error", "message": "缺少 sender_id"}), 400
        print(f"📝 Received recognition result - user: {sender_id}")
        print(f"🎯 Result: {recognition_result}")
        print(f"📊 Confidence: {confidence}")
        # Send the result to the user
        send_message(sender_id, recognition_result)
        return jsonify({
            "status": "success",
            "message": "辨識結果已發送給用戶"
        })
    except Exception as e:
        print(f"Error handling recognition result: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/process_video', methods=['POST'])
def process_video():
    """Process an uploaded video file (integrated version)."""
    try:
        # Check that a file was uploaded
        if 'video' not in request.files:
            return jsonify({"status": "error", "message": "沒有上傳影片檔案"}), 400
        video_file = request.files['video']
        sender_id = request.form.get('sender_id', 'unknown')
        if video_file.filename == '':
            return jsonify({"status": "error", "message": "沒有選擇檔案"}), 400
        # Basic MIME type and extension checks
        allowed_exts = {'.mp4', '.mov', '.avi', '.wmv', '.mkv'}
        _, ext = os.path.splitext(video_file.filename.lower())
        content_type = (video_file.content_type or '').lower()
        if ext not in allowed_exts and not content_type.startswith('video/'):
            return jsonify({"status": "error", "message": "不支援的影片格式"}), 400
        # Use a temp file to avoid permission problems
        import tempfile
        filename = secure_filename(video_file.filename)
        timestamp = int(time.time())
        # Create the temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4', prefix=f'upload_{sender_id}_') as temp_file:
            video_path = temp_file.name
        video_file.save(video_path)
        print(f"📁 Video saved: {video_path}")
        # Initialize the video recognizer
        model_path = MODEL_PATH
        print(f"🔍 Model path: {model_path}")
        print(f"🔍 Model file exists: {os.path.exists(model_path)}")
        if not os.path.exists(model_path):
            return jsonify({
                "status": "error",
                "message": f"模型檔案不存在: {model_path}"
            }), 500
        video_recognizer = VideoSignLanguageRecognizer(model_path, threshold=0.5)
        # Process the video
        result = video_recognizer.process_video(video_path)
        # Clean up the temp file
        try:
            os.remove(video_path)
        except Exception:
            pass
        if result is not None:
            # Unpack the result
            predicted_class = result.get('predicted_class', -1)
            word_sequence = result.get('word_sequence', [])
            confidence = result.get('confidence', 0.0)
            probabilities = result.get('probabilities', [])
            generated_sentence = result.get('generated_sentence', '無法辨識手語內容')
            # Build per-class probability data for the frontend
            prob_data = []
            if len(probabilities) > 0:
                sorted_indices = np.argsort(probabilities)[::-1][:4]  # top 4 probabilities
                for idx in sorted_indices:
                    prob = float(probabilities[idx])
                    class_label = video_recognizer.label_map.get(idx, f"類別{idx}")
                    prob_data.append({"label": class_label, "probability": prob})
            # Label of the predicted class
            predicted_label = video_recognizer.label_map.get(predicted_class, "未知") if predicted_class >= 0 else "未知"
            # For requests coming from Messenger, send the GPT-generated sentence
            if sender_id != 'unknown':
                send_message(sender_id, generated_sentence)
            return jsonify({
                "status": "success",
                "predicted_class": predicted_class,
                "predicted_label": predicted_label,
                "word_sequence": word_sequence,
                "confidence": float(confidence),
                "probabilities": prob_data,
                "generated_sentence": generated_sentence,
                "sender_id": sender_id
            })
        else:
            return jsonify({
                "status": "error",
                "message": "無法辨識手語內容",
                "sender_id": sender_id
            }), 400
    except Exception as e:
        print(f"Error processing video: {e}")
        import traceback
        traceback.print_exc()  # full stack trace
        return jsonify({"status": "error", "message": f"處理影片時發生錯誤: {str(e)}"}), 500
#--------------------
# Messenger Bot helpers
#--------------------
def handle_message(messaging_event):
    """Handle a regular message."""
    sender_id = messaging_event['sender']['id']
    message = messaging_event.get('message', {})
    message_text = message.get('text', '')
    attachments = message.get('attachments', [])
    print(f"Message from {sender_id}: {message_text}")
    # Check for attachments
    if attachments:
        for attachment in attachments:
            if attachment.get('type') == 'video':
                video_url = attachment.get('payload', {}).get('url')
                if video_url:
                    # Process the video directly (HuggingFace integrated version)
                    process_messenger_video(video_url, sender_id)
                    return
            else:
                send_message(sender_id, f"收到 {attachment.get('type')} 附件")
        return
    # Handle text messages
    if message_text:
        response_text = "您好!請發送手語影片給我,我會幫您辨識手語內容。"
        send_message(sender_id, response_text)

def handle_postback(messaging_event):
    """Handle postback events (button clicks etc.)."""
    sender_id = messaging_event['sender']['id']
    postback_payload = messaging_event['postback']['payload']
    print(f"Postback from {sender_id}: {postback_payload}")
    send_message(sender_id, f"收到 postback:{postback_payload}")
def send_message(recipient_id, message_text):
    """Send a message to the user."""
    headers = {
        'Content-Type': 'application/json'
    }
    data = {
        'recipient': {'id': recipient_id},
        'message': {'text': message_text}
    }
    params = {
        'access_token': PAGE_ACCESS_TOKEN
    }
    response = requests.post(
        FACEBOOK_API_URL,
        headers=headers,
        params=params,
        json=data
    )
    if response.status_code != 200:
        print(f"Failed to send message: {response.status_code} - {response.text}")
    else:
        print(f"Message sent to {recipient_id}")
def process_messenger_video(video_url, sender_id):
    """Process a video from Messenger (HuggingFace integrated version)."""
    import tempfile
    import time
    try:
        print(f"🎬 Processing Messenger video: {video_url}")
        # Auto-repair URLs that contain the 'xx' placeholder
        if 'xx.fbcdn.net' in video_url:
            print(f"🔧 Placeholder URL detected, attempting repair: {video_url}")
            video_url = _fix_facebook_cdn_url(video_url)
            print(f"🔄 Repaired URL: {video_url}")
        # Lightweight reachability check
        try:
            # HEAD request to check whether the URL is reachable
            head_response = requests.head(video_url, timeout=10, verify=False, allow_redirects=True)
            if head_response.status_code != 200:
                print(f"❌ Video URL unreachable, status code: {head_response.status_code}")
                send_message(sender_id, "影片連結已過期或無法訪問,請重新發送影片。")
                return
        except requests.exceptions.RequestException as e:
            print(f"❌ Video URL check failed: {e}")
            send_message(sender_id, "影片連結檢查失敗,請重新發送影片。")
            return
        # Download with retries
        max_retries = 3
        retry_delay = 2  # initial delay, seconds
        for attempt in range(max_retries):
            try:
                print(f"📥 Download attempt {attempt + 1}")
                # Download the video
                response = requests.get(video_url, stream=True, timeout=60, verify=False)
                response.raise_for_status()
                # Use a temp file to avoid permission problems
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"messenger_video_{sender_id}_{timestamp}.mp4"
                # Create the temp file
                with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4', prefix=f'messenger_{sender_id}_') as temp_file:
                    file_path = temp_file.name
                    # Write the payload
                    downloaded_size = 0
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            temp_file.write(chunk)
                            downloaded_size += len(chunk)
                # A very small download is probably an error page
                if downloaded_size < 1024:
                    raise ValueError(f"Downloaded file is too small: {downloaded_size} bytes")
                print(f"✅ Video downloaded: {file_path} ({downloaded_size} bytes)")
                # Initialize the video recognizer
                model_path = MODEL_PATH
                video_recognizer = VideoSignLanguageRecognizer(model_path, threshold=0.5)
                # Process the video
                result = video_recognizer.process_video(file_path)
                # Clean up the temp file
                try:
                    os.remove(file_path)
                except Exception:
                    pass
                if result:
                    generated_sentence = result.get('generated_sentence', '無法辨識手語內容')
                    confidence = result.get('confidence', 0.0)
                    word_sequence = result.get('word_sequence', [])
                    print(f"✅ Recognition done - user: {sender_id}")
                    print(f"📝 Model output: {word_sequence}")
                    print(f"💬 GPT translation: {generated_sentence}")
                    print(f"🎯 Confidence: {confidence:.2f}")
                    # Send the GPT translation to the user
                    send_message(sender_id, generated_sentence)
                else:
                    send_message(sender_id, "抱歉,無法辨識您的手語內容,請再試一次。")
                # Release the MediaPipe resources
                try:
                    video_recognizer.feature_extractor.close()
                except Exception:
                    pass
                return  # processed successfully, exit the function
            except requests.exceptions.RequestException as e:
                print(f"❌ Download failed (attempt {attempt + 1}): {e}")
                if attempt < max_retries - 1:
                    print(f"⏳ Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    print("❌ All retries exhausted")
                    send_message(sender_id, "影片下載失敗,請檢查網路連線後重新發送影片。")
            except Exception as e:
                print(f"❌ Error processing video: {e}")
                send_message(sender_id, "處理影片時發生錯誤,請稍後再試。")
                return
    except Exception as e:
        print(f"Error processing Messenger video: {e}")
        send_message(sender_id, "處理影片時發生錯誤,請稍後再試。")
def _fix_facebook_cdn_url(url):
    """Repair Facebook CDN URLs that contain the 'xx' placeholder subdomain."""
    if 'xx.fbcdn.net' not in url:
        return url
    # First test whether the original URL is actually unreachable
    print(f"🔍 Testing the original URL first: {url}")
    try:
        response = requests.head(url, timeout=10, verify=False, allow_redirects=True)
        if response.status_code == 200:
            print(f"✅ The original URL is reachable after all (status {response.status_code})")
            return url  # original URL works, return it directly
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Original URL test failed: {e}, trying repairs...")
    # Extended list of Facebook CDN subdomains (covering more possibilities)
    common_subdomains = [
        # Major data centers
        'fsin2-1', 'fsin2-2', 'fsin6-1', 'fsin6-2',  # Singapore
        'fsjc1-1', 'fsjc1-2', 'fsjc2-1', 'fsjc2-2',  # California
        'fmaa1-1', 'fmaa1-2', 'fmaa2-1', 'fmaa2-2',  # Malaysia
        'fatl1-1', 'fatl1-2',  # Atlanta
        'fsea1-1', 'fsea1-2',  # Seattle
        'fiad1-1', 'fiad1-2',  # Dublin, Ireland
        'flin1-1', 'flin1-2',  # London
        'ffor1-1', 'ffor1-2',  # Frankfurt
        'ftpe1-1', 'ftpe1-2',  # Taiwan
        'fhkg1-1', 'fhkg1-2',  # Hong Kong
        'fbom1-1', 'fbom1-2',  # Mumbai
        'fsyd1-1', 'fsyd1-2',  # Sydney
        'fssa1-1', 'fssa1-2',  # South Africa
        'fgig1-1', 'fgig1-2',  # Brazil
        # Fallback and test subdomains
        'video',     # sometimes just 'video'
        'scontent',  # static content
        'external',  # external content
    ]
    print(f"🔧 Testing {len(common_subdomains)} candidate subdomains...")
    # Substitute each candidate for 'xx' and test it
    for subdomain in common_subdomains:
        fixed_url = url.replace('xx.fbcdn.net', f'{subdomain}.fbcdn.net')
        print(f"🔍 Testing: {fixed_url}")
        try:
            # Quick reachability test
            response = requests.head(fixed_url, timeout=5, verify=False, allow_redirects=True)
            if response.status_code == 200:
                print(f"✅ Found a working URL: {fixed_url}")
                return fixed_url
        except requests.exceptions.RequestException:
            continue
    # All candidates failed; return the original URL (it may still work downstream)
    print(f"❌ No better URL found; the original may still be valid: {url}")
    return url
#--------------------
# WebSocket routes (real-time recognition)
#--------------------
@socketio.on('connect')
def handle_connect():
    """Handle a WebSocket connection."""
    print('Client connected')

@socketio.on('disconnect')
def handle_disconnect():
    """Handle a WebSocket disconnection."""
    print('Client disconnected')

@socketio.on('start_stream')
def handle_start_stream(data):
    """Start the video stream."""
    global camera, is_running
    # Cloud environment check
    if IS_HUGGINGFACE:
        return {'status': 'error', 'message': '雲端環境不支援攝像頭功能,請使用影片上傳功能'}
    if is_running:
        return {'status': 'already_running'}
    # Initialize the camera
    camera = cv2.VideoCapture(0)
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    if not camera.isOpened():
        return {'status': 'error', 'message': '無法打開攝像頭'}
    # Initialize the recognizer
    if recognizer is None:
        initialize_recognizer()
    # Start the processing thread
    is_running = True
    threading.Thread(target=gen_frames, daemon=True).start()
    return {'status': 'success'}

@socketio.on('stop_stream')
def handle_stop_stream(data):
    """Stop the video stream."""
    global camera, is_running
    is_running = False
    # Release the camera
    if camera is not None:
        camera.release()
        camera = None
    return {'status': 'success'}
#--------------------
# Application startup
#--------------------
if __name__ == '__main__':
    # HuggingFace Spaces environment detection
    port = int(CONFIG.get('PORT', 7860))  # HuggingFace default port
    print("🚀 Sign language recognition system starting...")
    print(f"📱 Messenger Bot: {'configured' if PAGE_ACCESS_TOKEN != 'your_page_access_token' else 'not configured'}")
    print(f"🤖 OpenAI API: {'configured' if CONFIG.get('OPENAI_API_KEY') else 'not configured'}")
    print(f"🔧 Mode: {'HuggingFace Spaces' if IS_HUGGINGFACE else 'local development'} | SocketIO: {ASYNC_MODE}")
    socketio.run(app, host='0.0.0.0', port=port, debug=CONFIG.get('DEBUG', False))