Spaces:
Sleeping
Sleeping
File size: 9,799 Bytes
644f14a db6a296 644f14a 6dcee9d 644f14a 6dcee9d 644f14a 6dcee9d 644f14a 6dcee9d 2745ea7 6dcee9d 644f14a 6dcee9d db6a296 644f14a 7cd6a31 644f14a 6dcee9d 644f14a 6dcee9d 644f14a db6a296 6dcee9d db6a296 6dcee9d 644f14a 6dcee9d 644f14a db6a296 644f14a 6dcee9d 644f14a 6dcee9d 644f14a 6dcee9d 644f14a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# core.py
import google.generativeai as genai
import pysrt
import re
import os
import time
# 解析 SRT 檔案
def parse_srt(srt_path):
try:
subs = pysrt.open(srt_path, encoding='utf-8')
srt_data = {}
for sub in subs:
srt_data[sub.index] = {"time": str(sub.start) + " --> " + str(sub.end), "text": sub.text}
return srt_data
except Exception as e:
print(f"解析字幕檔案錯誤: {e}")
return None
# 改進的預處理逐字稿函數
def preprocess_transcript(transcript):
# 移除 SSML 標籤
transcript = re.sub(r'<[^>]*?>', '', transcript)
# 移除多個連續空格
transcript = re.sub(r'\s+', ' ', transcript)
# 簡單地將換行替換為空格(不使用特殊分隔符)
transcript = transcript.replace('\n', ' ')
# 移除文字前後的空白
transcript = transcript.strip()
return transcript
# 驗證SRT結構
def validate_srt(original_srt, modified_srt):
"""
驗證修改後的SRT是否保留了原始SRT的基本結構
Args:
original_srt: 原始SRT數據
modified_srt: 修改後的SRT數據
Returns:
(bool, str): (是否有效, 錯誤信息)
"""
# 檢查編號完整性
if set(original_srt.keys()) != set(modified_srt.keys()):
missing = set(original_srt.keys()) - set(modified_srt.keys())
extra = set(modified_srt.keys()) - set(original_srt.keys())
return False, f"編號不匹配: 缺少 {missing}, 多出 {extra}"
# 檢查時間戳保持不變
for idx in original_srt:
if original_srt[idx]['time'] != modified_srt[idx]['time']:
return False, f"編號 {idx} 的時間戳被修改"
# 檢查文字長度變化不太大(防止大規模刪減或添加)
for idx in original_srt:
orig_len = len(original_srt[idx]['text'])
mod_len = len(modified_srt[idx]['text'])
# 允許30%的文字長度變化
if abs(orig_len - mod_len) / max(1, orig_len) > 0.3:
return False, f"編號 {idx} 的文字長度變化過大: 原 {orig_len}, 現 {mod_len}"
return True, "驗證通過"
def process_files(api_key, test_transcript_file, test_srt_file, batch_size, delay_seconds=2):
# 1. 配置 Gemini API
try:
genai.configure(api_key=api_key)
model = genai.GenerativeModel('gemini-2.0-flash-exp') # 測試金鑰是否正確
except Exception as e:
return f"API 金鑰錯誤,請檢查設定: {e}", None, None
# 檢查測試用逐字稿檔案是否存在
if not os.path.exists(test_transcript_file):
return f"錯誤: 找不到逐字稿檔案: {test_transcript_file},請確認您已上傳該檔案", None, None
else:
with open(test_transcript_file, 'r', encoding='utf-8') as f:
transcript_content = f.read()
transcript_content = preprocess_transcript(transcript_content)
# 檢查測試用 srt 檔案是否存在
if not os.path.exists(test_srt_file):
return f"錯誤: 找不到口語稿檔案: {test_srt_file},請確認您已上傳該檔案", None, None
else:
original_srt_data = parse_srt(test_srt_file)
if original_srt_data is None:
return "錯誤: 解析口語稿檔案失敗,請確認檔案內容", None, None
# 創建工作副本
srt_data = original_srt_data.copy()
# 儲存所有報告
all_reports = []
keys = list(srt_data.keys()) # 取得編號
# 使用固定數量的重疊而非百分比
overlap = 2 # 固定重疊2條字幕
# 處理每個批次,使用固定重疊數量
for i in range(0, len(keys), batch_size - overlap):
end_idx = min(i + batch_size, len(keys))
batch_keys = keys[i:end_idx]
# 確保前一批次的最後部分有重疊
context_start = max(0, i - overlap)
context_keys = keys[context_start:i] if i > 0 else []
# 準備本批次處理的數據
batch_with_context = context_keys + batch_keys
context_batch_data = {key: srt_data[key] for key in batch_with_context}
# 在提示中標記哪些是實際需要處理的部分(非上下文)
subtitle_lines = []
for key in batch_with_context:
prefix = "處理→ " if key in batch_keys else "上下文: "
subtitle_lines.append(f"{prefix}編號{key}:{context_batch_data[key]['text']}")
subtitle_content = "\n".join(subtitle_lines)
prompt = (
"請扮演一位專業的文字編輯,比對以下字幕檔和逐字稿,並以逐字稿的內容為準,修正字幕檔中因語音轉文字造成的錯別字、漏字、多字、詞語誤用等問題,請勿修改語氣和修辭。\n"
"【重要規則】:\n"
"1. 只修正標記有「處理→」的字幕行,標記為「上下文:」的行僅供參考\n"
"2. 必須嚴格保持原始字幕的時間戳和斷句結構\n"
"3. 不允許合併任何字幕行,每個編號必須獨立輸出\n"
"4. 不允許拆分任何字幕行\n"
"5. 不要新增或移除任何字幕段落\n"
"6. 即使對於很長的字幕行,也必須完整處理,不能忽略任何部分\n"
"請使用以下格式輸出每個字幕行:\n"
"編號X:修正後的文字\n"
"接著,請將**當前批次**所有修改的內容整理成一個修改清單,並放在所有字幕內容之後。\n"
"請使用以下格式輸出修改清單: \n"
"編號: 原文: 修正後\n"
"請使用 `<<<分隔符號>>>` 來分隔字幕與修改清單。\n"
"字幕檔內容:\n"
f"{subtitle_content}\n\n"
"逐字稿內容:\n"
f"{transcript_content}\n\n"
)
try:
# 添加重試機制與間隔時間
max_retries = 3
retry_count = 0
retry_delay = 5 # 初始等待秒數
while retry_count < max_retries:
try:
# 添加間隔時間以避免觸發限流
if i > 0:
print(f"等待 {retry_delay} 秒以避免達到API限制...")
time.sleep(retry_delay)
response = model.generate_content(prompt)
corrected_subtitle = response.text
print(f"第 {i // (batch_size - overlap) + 1} 批次 Gemini 模型的回應:")
print(corrected_subtitle)
break # 成功獲取回應,跳出重試循環
except Exception as retry_error:
retry_count += 1
if "429" in str(retry_error):
print(f"遇到配額限制 (429),重試 {retry_count}/{max_retries}...")
retry_delay *= 2 # 指數退避策略
else:
# 其他錯誤,直接拋出
raise retry_error
if retry_count >= max_retries:
raise retry_error
# 使用 re.split 分割字幕和報告
parts = re.split(r'<<<分隔符號>>>', corrected_subtitle, maxsplit=1) # 只分割一次
# 改進的字幕解析方法
corrected_lines = parts[0].strip().split('\n')
# 建立一個映射表來追踪已處理的編號
processed_indices = set()
for line in corrected_lines:
# 確保只處理「處理→」標記的編號字幕行
match = re.match(r'(?:處理→ )?編號(\d+):(.*)', line)
if match:
index = int(match.group(1))
corrected_text = match.group(2).strip()
# 確保此編號在當前批次中且需要處理
if index in batch_keys:
srt_data[index]['text'] = corrected_text
processed_indices.add(index)
# 檢查是否所有批次中的編號都被處理了
for index in batch_keys:
if index not in processed_indices:
print(f"警告:編號 {index} 在AI處理後丟失,保持原始字幕內容")
# 處理報告部分
report = ""
if len(parts) > 1:
report = parts[1] # 取得修改清單文字
report_lines = report.strip().split('\n')
all_reports.extend(report_lines)
except Exception as e:
return f"第 {i // (batch_size - overlap) + 1} 批次修正過程失敗:{e}", None, None
# 驗證修改後的SRT結構
is_valid, error_msg = validate_srt(original_srt_data, srt_data)
if not is_valid:
return f"SRT結構驗證失敗: {error_msg}", None, None
# 寫回 SRT 檔案
new_subs = []
for index, data in sorted(srt_data.items()): # 確保按編號順序排序
time_str = data['time']
start_str, end_str = time_str.split(" --> ")
start = pysrt.srtitem.SubRipTime.from_string(start_str)
end = pysrt.srtitem.SubRipTime.from_string(end_str)
new_subs.append(pysrt.SubRipItem(index=index, start=start, end=end, text=data['text']))
# 將 new_subs 轉換為 SubRipFile 物件
new_srt = pysrt.SubRipFile(items=new_subs)
return None, new_srt, all_reports |