#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import mediapipe as mp
from openai import OpenAI
# --------------------
# Feature extraction module
# --------------------
class FeatureExtractor:
    def __init__(self):
        self.mp_holistic = mp.solutions.holistic

    def extract_pose_keypoints(self, frame, holistic_results):
        """Flatten MediaPipe Holistic landmarks into a fixed 225-dim vector:
        left hand (21*3) + right hand (21*3) + pose (33*3). Missing parts
        are zero-filled so the layout never changes."""
        keypoints = []
        if holistic_results.left_hand_landmarks:
            for landmark in holistic_results.left_hand_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            keypoints.extend([0] * (21 * 3))
        if holistic_results.right_hand_landmarks:
            for landmark in holistic_results.right_hand_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            keypoints.extend([0] * (21 * 3))
        if holistic_results.pose_landmarks:
            for landmark in holistic_results.pose_landmarks.landmark:
                keypoints.extend([landmark.x, landmark.y, landmark.z])
        else:
            keypoints.extend([0] * (33 * 3))
        return np.array(keypoints)
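
# A minimal sanity check for the 225-dim layout above (an illustrative sketch,
# not part of the pipeline): with nothing detected, every block falls back to
# zeros and the vector length must still be 21*3 + 21*3 + 33*3 = 225.
def _check_feature_layout():
    class _EmptyResults:  # hypothetical stand-in for a Holistic result
        left_hand_landmarks = None
        right_hand_landmarks = None
        pose_landmarks = None
    feats = FeatureExtractor().extract_pose_keypoints(None, _EmptyResults())
    assert feats.shape == (225,)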
# --------------------
# Model architecture
# --------------------
class SignLanguageModel(nn.Module):
    """
    Sign language recognition model: a bidirectional LSTM with an attention
    mechanism, with batch normalization added for training stability.
    """
    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5):
        super(SignLanguageModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        # Feature projection layer: maps the raw input to a unified dimension
        self.feature_projection = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout / 2)
        )
        # Bidirectional LSTM layers
        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        # Batch normalization layer (normalizes the LSTM output)
        self.lstm_bn = nn.BatchNorm1d(hidden_dim * 2)
        # Attention mechanism: scores each timestep, softmax over time
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
            nn.Softmax(dim=1)
        )
        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(hidden_dim // 2, num_classes)
        )

    def forward(self, x):
        batch_size, seq_len, _ = x.size()
        # Feature projection, applied layer by layer because BatchNorm1d expects
        # (N, C) or (N, C, L): Linear on flattened frames, then BN over the
        # channel dimension, then ReLU and Dropout
        x_reshaped = x.reshape(-1, x.size(-1))
        x_projected_linear = self.feature_projection[0](x_reshaped)
        x_projected_reshaped = x_projected_linear.reshape(batch_size, seq_len, -1)
        x_projected_transposed = x_projected_reshaped.transpose(1, 2)
        x_projected_bn = self.feature_projection[1](x_projected_transposed)
        x_projected_transposed_back = x_projected_bn.transpose(1, 2)
        x_projected = self.feature_projection[2](x_projected_transposed_back)
        x_projected = self.feature_projection[3](x_projected)
        # LSTM processing
        lstm_out, _ = self.lstm(x_projected)
        # Apply BatchNorm to the LSTM output
        lstm_out_bn = self.lstm_bn(lstm_out.transpose(1, 2)).transpose(1, 2)
        # Compute attention weights over the time dimension
        attention_weights = self.attention(lstm_out_bn)
        # Apply the attention weights: weighted sum over timesteps
        context = torch.bmm(lstm_out_bn.transpose(1, 2), attention_weights).squeeze(-1)
        # Final classification
        output = self.classifier(context)
        return output
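
# A minimal shape smoke test for the model (an illustrative sketch; the
# dimensions mirror the defaults used in VideoSignLanguageRecognizer._load_model
# below). eval() matters here: the BatchNorm1d layers would otherwise require
# training-mode batch statistics.
def _check_model_shapes():
    model = SignLanguageModel(input_dim=225, hidden_dim=96, num_layers=2, num_classes=4)
    model.eval()
    logits = model(torch.randn(2, 30, 225))  # (batch, frames, features)
    assert logits.shape == (2, 4)             # one logit per class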
# --------------------
# Video recognizer
# --------------------
class VideoSignLanguageRecognizer:
    def __init__(self, model_path, threshold=0.7):
        self.model_path = model_path
        self.threshold = threshold
        self.feature_extractor = FeatureExtractor()
        self.label_map = self._load_label_mapping()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model()
        try:
            self.openai_client = OpenAI()
        except Exception as e:
            print(f"Error initializing OpenAI client: {e}")
            self.openai_client = None
        print(f"Video recognizer initialized! Using device: {self.device}")

    def _load_label_mapping(self):
        label_map = {}
        labels_file = "labels.csv"
        if os.path.exists(labels_file):
            try:
                df = pd.read_csv(labels_file)
                for _, row in df.iterrows():
                    label_map[int(row['index'])] = row['label']
                print(f"✅ Loaded {len(label_map)} class labels from {labels_file}")
            except Exception as e:
                print(f"❌ Error reading labels.csv: {e}")
        else:
            print(f"⚠️ Label file not found: {labels_file}; using an empty mapping.")
        return label_map
    def _load_model(self):
        # The parameters here must match the original model's training parameters
        num_classes = len(self.label_map) if self.label_map else 4  # fall back to 4 classes if no labels.csv
        input_dim = 225
        hidden_dim = 96  # adjusted to match the original model
        num_layers = 2
        model = SignLanguageModel(
            input_dim=input_dim,
            hidden_dim=hidden_dim,
            num_layers=num_layers,
            num_classes=num_classes
        )
        if os.path.exists(self.model_path):
            try:
                model.load_state_dict(torch.load(self.model_path, map_location=self.device))
                model.to(self.device)
                model.eval()
                print(f"✅ Model loaded successfully from {self.model_path}")
                return model
            except Exception as e:
                print(f"❌ Error loading model weights: {e}")
        else:
            print(f"⚠️ Model file not found: {self.model_path}")
        return None
    def process_video(self, video_path):
        if not self.model:
            return {"status": "error", "message": "Model was not loaded successfully"}
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return {"status": "error", "message": f"Could not open video file: {video_path}"}
        all_keypoints = []
        with self.feature_extractor.mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                keypoints = self._extract_features(frame, holistic)
                all_keypoints.append(keypoints)
        cap.release()
        if not all_keypoints:
            return {"recognition_result": "No valid motion detected in the video.", "confidence": 0}
        keypoints_sequence = np.array(all_keypoints)
        word_sequence, confidence = self._predict_from_sequence(keypoints_sequence)
        if self.openai_client and word_sequence:
            final_sentence = self._generate_sentence_with_gpt(word_sequence)
        else:
            final_sentence = " ".join(word_sequence) if word_sequence else "Unable to recognize"
        return {"recognition_result": final_sentence, "confidence": confidence}

    def _extract_features(self, frame, holistic):
        # MediaPipe expects RGB; mark the image read-only while it is processed
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = holistic.process(image)
        image.flags.writeable = True
        return self.feature_extractor.extract_pose_keypoints(frame, results)
    def _predict_from_sequence(self, keypoints_sequence):
        # Slide a fixed-size window over the frame sequence and classify each window
        WINDOW_SIZE = 30
        STRIDE = 10
        if len(keypoints_sequence) < WINDOW_SIZE:
            return self._single_prediction(keypoints_sequence)
        predictions = []
        for i in range(0, len(keypoints_sequence) - WINDOW_SIZE + 1, STRIDE):
            window = keypoints_sequence[i:i + WINDOW_SIZE]
            prediction, confidence = self._predict_single_window(window)
            if prediction is not None:
                predictions.append({"word": prediction, "confidence": confidence})
        if not predictions:
            return [], 0
        processed_words, avg_confidence = self._post_process_predictions(predictions)
        return processed_words, avg_confidence
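
    # Illustrative window math (not from the original source): with
    # WINDOW_SIZE=30 and STRIDE=10, a 60-frame clip yields windows starting
    # at frames 0, 10, 20 and 30, so most frames are voted on by up to
    # three overlapping windows before deduplication.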
    def _single_prediction(self, keypoints_sequence):
        if len(keypoints_sequence) == 0:
            return [], 0.0
        padded_sequence = self._normalize_sequence_length(keypoints_sequence, 30)
        word, confidence = self._predict_single_window(padded_sequence)
        # Wrap the single word in a list so callers always receive a sequence
        return ([word] if word is not None else []), confidence
    def _predict_single_window(self, window_sequence):
        sequence_tensor = torch.tensor(window_sequence, dtype=torch.float32).unsqueeze(0).to(self.device)
        with torch.no_grad():
            outputs = self.model(sequence_tensor)
            probabilities = torch.softmax(outputs, dim=1)
            confidence, predicted_idx = torch.max(probabilities, 1)
        predicted_label = self.label_map.get(predicted_idx.item())
        if confidence.item() > self.threshold:
            return predicted_label, confidence.item()
        return None, 0.0
    def _normalize_sequence_length(self, sequence, target_length):
        # Truncate long sequences; zero-pad short ones to target_length frames
        current_length = len(sequence)
        if current_length == 0:
            return np.zeros((target_length, sequence.shape[1] if len(sequence.shape) > 1 else 225))
        if current_length > target_length:
            return sequence[:target_length]
        else:
            padding = np.zeros((target_length - current_length, sequence.shape[1]))
            return np.vstack((sequence, padding))
    def _post_process_predictions(self, predictions):
        if not predictions:
            return [], 0.0
        # Simple deduplication: drop a word when it repeats the previous window's word
        final_words = [predictions[0]['word']]
        for i in range(1, len(predictions)):
            if predictions[i]['word'] != predictions[i - 1]['word']:
                final_words.append(predictions[i]['word'])
        total_confidence = sum(p['confidence'] for p in predictions)
        avg_confidence = total_confidence / len(predictions)
        return final_words, avg_confidence
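
    # Example of the deduplication above (illustrative values, not from the
    # source): window predictions ["hello", "hello", "thanks", "thanks", "you"]
    # collapse to ["hello", "thanks", "you"], while the returned confidence is
    # the mean over all windows, including the dropped duplicates.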
    def _generate_sentence_with_gpt(self, word_sequence):
        if not self.openai_client:
            return " ".join(word_sequence)
        prompt = ("You are a sign language translation expert. Convert the following "
                  "sequence of isolated words into one fluent, complete sentence in "
                  "Taiwanese Traditional Chinese.\n"
                  f"Original word sequence: [{', '.join(word_sequence)}]\n\nTranslated sentence:")
        try:
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.5,
                max_tokens=150
            )
            sentence = response.choices[0].message.content.strip()
            print(f"🤖 GPT-generated sentence: {sentence}")
            return sentence
        except Exception as e:
            print(f"❌ GPT API call failed: {e}")
            return " ".join(word_sequence)
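
# --------------------
# Usage sketch
# --------------------
# A minimal end-to-end example (an illustrative sketch, not part of the
# deployed app). "model.pth" and "example.mp4" are hypothetical paths; set
# OPENAI_API_KEY in the environment if GPT post-processing is desired.
if __name__ == "__main__":
    recognizer = VideoSignLanguageRecognizer(model_path="model.pth", threshold=0.7)
    result = recognizer.process_video("example.mp4")
    print(result)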