tbdavid2019's picture
feat: 更新 README、app.py 和 requirements.txt,調整 SDK 版本及啟動方式
6a45158
import concurrent.futures as cf
import glob
import io
import os
import time
import warnings
import logging
from pathlib import Path
from typing import List, Literal
import gradio as gr
import requests
from dotenv import load_dotenv
import pymupdf
from bs4 import BeautifulSoup
import ebooklib
from ebooklib import epub
# 導入自定義模組
from prompts import get_prompt, get_all_template_names
from quality_control import DialogueQualityChecker, validate_dialogue_structure, suggest_improvements
from content_planner import ContentPlanner, SmartContentSplitter, create_adaptive_prompts
# 設置日誌
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# 忽略 ebooklib 的警告
warnings.filterwarnings('ignore', category=UserWarning, module='ebooklib.epub')
warnings.filterwarnings('ignore', category=FutureWarning, module='ebooklib.epub')
load_dotenv()
# 初始化品質控制和內容規劃器
quality_checker = DialogueQualityChecker()
content_planner = ContentPlanner()
content_splitter = SmartContentSplitter()
def fetch_models(api_key, api_base=None):
"""
Fetch the list of models from the given API base.
"""
base_url = api_base.rstrip("/") + "/models" if api_base else "https://api.openai.com/v1/models"
headers = {"Authorization": f"Bearer {api_key}"}
try:
response = requests.get(base_url, headers=headers)
if response.status_code == 200:
models = response.json().get('data', [])
return [model['id'] for model in models]
else:
return [f"Error fetching models: {response.status_code} {response.reason}"]
except requests.RequestException as e:
return [f"Error fetching models: {str(e)}"]
def generate_dialogue_via_requests(
pdf_text: str,
model: str,
llm_api_key: str,
api_base: str,
user_feedback: str = None,
num_parts: int = 3,
max_input_length: int = 1000000,
max_output_tokens: int = 65536,
progress_callback=None,
template_type: str = "podcast"
) -> str:
"""
Generate dialogue by making a direct request to the LLM API.
使用簡化版本,暫時不使用複雜的內容規劃
"""
logger.info(f"準備生成對話,使用模型: {model}")
# 檢查輸入文本是否為空或過短
if not pdf_text or len(pdf_text.strip()) < 50:
error_msg = f"錯誤:輸入文本過短或為空(當前長度: {len(pdf_text)} 字符)。請確認上傳的文件包含有效內容。"
logger.error(error_msg)
if progress_callback:
progress_callback(error_msg)
return error_msg
# 限制輸入文本長度
original_length = len(pdf_text)
if len(pdf_text) > max_input_length:
pdf_text = pdf_text[:max_input_length]
logger.info(f"輸入文本已截斷: {original_length} -> {max_input_length} 字符")
logger.info(f"輸入文本長度: {len(pdf_text)} 字符")
# 使用直接從 prompts 模組獲取的模板
try:
base_prompt = get_prompt(template_type, pdf_text)
except KeyError:
# 如果模板不存在,使用默認的 podcast 模板
logger.warning(f"模板 '{template_type}' 不存在,使用默認 podcast 模板")
base_prompt = get_prompt("podcast", pdf_text)
# 如果有自定義提示詞,添加到提示中
if user_feedback:
base_prompt += f"\n\n【額外要求】\n{user_feedback}"
headers = {
"Authorization": f"Bearer {llm_api_key}",
"Content-Type": "application/json"
}
base_url = api_base.rstrip("/")
url = f"{base_url}/chat/completions"
logger.info(f"準備發送請求到 API: {url}")
if progress_callback:
progress_callback("正在發送請求到 LLM API...")
# 重試參數
max_retries = 5
retry_delay = 5
# 使用可調整的輸出 token 限制
payload = {
"model": model,
"messages": [
{
"role": "user",
"content": base_prompt
}
],
"temperature": 0.7,
"max_tokens": max_output_tokens, # 可調整的輸出 token 數
"stream": False # 先不用流式,確保穩定性
}
for attempt in range(max_retries):
try:
logger.info(f"發送 API 請求 (嘗試 {attempt+1}/{max_retries})...")
if progress_callback:
progress_callback(f"API 請求中 (嘗試 {attempt+1}/{max_retries})...")
response = requests.post(url, headers=headers, json=payload)
# 處理速率限制錯誤
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', retry_delay))
logger.warning(f"速率限制錯誤 (429)。將在 {retry_after} 秒後重試。嘗試 {attempt+1}/{max_retries}")
if progress_callback:
progress_callback(f"速率限制錯誤 (429)。將在 {retry_after} 秒後重試...")
time.sleep(retry_after)
retry_delay *= 2
continue
if response.status_code != 200:
logger.error(f"API 請求失敗: 狀態碼 {response.status_code}, 原因: {response.reason}")
if progress_callback:
progress_callback(f"API 錯誤: {response.status_code} {response.reason}")
response.raise_for_status()
result = response.json()
generated_content = result['choices'][0]['message']['content']
logger.info("API 請求成功,已收到回應")
if progress_callback:
progress_callback("已成功從 LLM 獲取回應")
# 檢查內容是否被截斷(更精確的檢查方式)
content_lines = generated_content.strip().split('\n')
last_line = content_lines[-1] if content_lines else ""
# 更精確的截斷檢測
is_truncated = (
len(generated_content) < 2000 or # 內容太短
not last_line.strip() or # 最後一行為空
(last_line.startswith('speaker-') and len(last_line.split(':', 1)) > 1 and
len(last_line.split(':', 1)[1].strip()) < 10) or # speaker 行內容太短
generated_content.strip().endswith(('在', '的', '了', '是', '會', '但', '因為', '所以', '這', '那'))
)
# 如果原始輸入文本為空或太短,不進行分批生成
if len(pdf_text.strip()) < 100:
logger.warning("輸入文本太短或為空,跳過分批生成")
is_truncated = False
if is_truncated:
logger.warning("檢測到內容可能被截斷,嘗試分批生成...")
if progress_callback:
progress_callback("檢測到內容可能被截斷,嘗試分批生成...")
# 如果內容被截斷,使用分批生成
full_content = _generate_in_batches(
pdf_text, base_prompt, headers, url, model, num_parts,
progress_callback, max_retries, retry_delay
)
if full_content:
generated_content = full_content
logger.info("使用分批生成成功獲得完整內容")
else:
logger.warning("分批生成失敗,使用原始內容")
# **對完整文稿進行品質檢查**
try:
logger.info("開始進行對話品質檢查")
quality_report = quality_checker.check_dialogue_quality(generated_content, ['speaker-1', 'speaker-2'])
logger.info(f"品質檢查完成,總分: {quality_report.overall_score:.1f}")
if progress_callback:
progress_callback(f"品質檢查完成,分數: {quality_report.overall_score:.1f}/100")
except Exception as e:
logger.warning(f"品質檢查失敗: {e}")
return generated_content
except requests.exceptions.RequestException as e:
error_msg = f"請求失敗: {str(e)}"
logger.error(error_msg)
if attempt < max_retries - 1:
retry_msg = f"將在 {retry_delay} 秒後重試。嘗試 {attempt+1}/{max_retries}"
logger.info(retry_msg)
if progress_callback:
progress_callback(f"{error_msg} {retry_msg}")
time.sleep(retry_delay)
retry_delay *= 2
else:
final_error = f"在 {max_retries} 次嘗試後失敗: {str(e)}"
logger.error(final_error)
if progress_callback:
progress_callback(final_error)
return f"Error after {max_retries} attempts: {str(e)}"
return "生成失敗"
def send_to_discord_webhook(webhook_url: str, script_content: str, summary_content: str = None) -> str:
"""
將腳本和摘要內容以文件形式發送到 Discord webhook
Args:
webhook_url: Discord webhook URL
script_content: 生成的腳本內容
summary_content: 摘要內容(可選)
Returns:
str: 發送結果消息
"""
if not webhook_url or not webhook_url.strip():
return "錯誤:請填寫 Discord Webhook URL"
if not script_content or not script_content.strip():
return "錯誤:沒有腳本內容可以發送"
try:
import tempfile
import json
from datetime import datetime
import socket
import urllib3
# 驗證 webhook URL 格式
if not webhook_url.startswith("https://discord.com/api/webhooks/"):
return "錯誤:請輸入有效的 Discord Webhook URL"
# 測試網路連線
try:
# 嘗試解析 Discord 域名
socket.gethostbyname('discord.com')
logger.info("DNS 解析成功:discord.com")
except socket.gaierror as dns_error:
logger.error(f"DNS 解析失敗: {dns_error}")
return f"❌ 網路連線問題:無法解析 discord.com。請檢查:\n1. 網路連線是否正常\n2. DNS 設定是否正確\n3. 是否需要代理設定\n4. 防火牆是否阻擋連線"
# 創建臨時文件
files_to_send = []
temp_files = [] # 追蹤需要清理的文件
try:
# 創建腳本文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8') as script_file:
script_file.write(script_content)
script_filename = f"podcast_script_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
temp_files.append(script_file.name)
with open(script_file.name, 'rb') as f:
files_to_send.append(('file', (script_filename, f.read(), 'text/plain')))
# 如果有摘要內容,創建摘要文件
if summary_content and summary_content.strip():
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8') as summary_file:
summary_file.write(summary_content)
summary_filename = f"podcast_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
temp_files.append(summary_file.name)
with open(summary_file.name, 'rb') as f:
files_to_send.append(('file', (summary_filename, f.read(), 'text/plain')))
# 準備 Discord 消息和文件
message_data = {
"content": "📻 **David888 Podcast 內容分享**\n\n🎙️ 新的播客腳本和摘要已生成完成!",
"username": "David888 Podcast Bot"
}
# 使用更強的重試機制發送到 Discord
max_retries = 3
retry_delay = 2
for attempt in range(max_retries):
try:
logger.info(f"嘗試發送到 Discord (第 {attempt + 1}/{max_retries} 次)")
# 發送請求,設置超時和重試
response = requests.post(
webhook_url,
data=message_data,
files=files_to_send,
timeout=30, # 30秒超時
)
if response.status_code == 204:
file_count = len(files_to_send)
logger.info(f"成功發送 {file_count} 個文件到 Discord")
return f"✅ 成功發送 {file_count} 個文件到 Discord!"
elif response.status_code == 404:
return "❌ Webhook URL 無效或已過期,請檢查 URL 是否正確"
elif response.status_code == 403:
return "❌ 權限不足,請檢查 Webhook 權限設定"
elif response.status_code == 413:
return "❌ 文件過大,Discord 限制為 8MB (免費) 或 50MB (Nitro)"
else:
logger.warning(f"Discord API 回應: {response.status_code} - {response.text}")
if attempt < max_retries - 1:
import time
time.sleep(retry_delay)
retry_delay *= 2
continue
return f"❌ 發送失敗: HTTP {response.status_code} - {response.text[:100]}"
except requests.exceptions.ConnectionError as conn_error:
logger.error(f"連線錯誤 (嘗試 {attempt + 1}): {conn_error}")
if attempt < max_retries - 1:
import time
time.sleep(retry_delay)
retry_delay *= 2
continue
return f"❌ 網路連線失敗:{str(conn_error)}\n\n建議檢查:\n1. 網路連線穩定性\n2. 防火牆設定\n3. 代理伺服器設定"
except requests.exceptions.Timeout:
logger.error(f"請求超時 (嘗試 {attempt + 1})")
if attempt < max_retries - 1:
import time
time.sleep(retry_delay)
retry_delay *= 2
continue
return "❌ 請求超時,請檢查網路連線或稍後再試"
return "❌ 重試多次後仍然失敗,請檢查網路連線"
finally:
# 清理臨時文件
for temp_file in temp_files:
try:
os.unlink(temp_file)
except:
pass
except ImportError as import_error:
return f"❌ 缺少必要模組: {import_error}"
except Exception as e:
error_msg = f"Discord 發送過程中發生錯誤: {str(e)}"
logger.error(error_msg)
return f"❌ {error_msg}\n\n常見解決方案:\n1. 檢查網路連線\n2. 確認 Webhook URL 正確\n3. 檢查防火牆設定\n4. 如在企業網路,可能需要代理設定"
def generate_summary(
script_content: str,
summary_type: str,
model: str,
llm_api_key: str,
api_base: str,
max_output_tokens: int = 4096,
progress_callback=None
) -> str:
"""
為生成的腳本創建摘要
"""
if not script_content or not script_content.strip():
return "錯誤:請先生成腳本內容"
logger.info(f"開始生成摘要,類型: {summary_type}")
# 從 prompts 模組獲取摘要模板
try:
prompt = get_prompt(summary_type, script_content)
except KeyError:
return f"錯誤:未找到摘要模板 '{summary_type}'"
headers = {
"Authorization": f"Bearer {llm_api_key}",
"Content-Type": "application/json"
}
base_url = api_base.rstrip("/")
url = f"{base_url}/chat/completions"
payload = {
"model": model,
"messages": [
{
"role": "user",
"content": prompt
}
],
"temperature": 0.7,
"max_tokens": max_output_tokens
}
if progress_callback:
progress_callback(f"正在生成{summary_type}摘要...")
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
result = response.json()
summary = result['choices'][0]['message']['content']
logger.info(f"摘要生成完成,長度: {len(summary)} 字符")
if progress_callback:
progress_callback(f"摘要生成完成!")
return summary
except requests.exceptions.RequestException as e:
error_msg = f"摘要生成失敗: {str(e)}"
logger.error(error_msg)
if progress_callback:
progress_callback(error_msg)
return error_msg
def handle_discord_share(webhook_url, script_content, summary_content):
"""處理 Discord 分享"""
if not webhook_url or not webhook_url.strip():
return gr.update(visible=True, value="⚠️ 請先填寫 Discord Webhook URL")
if not script_content or not script_content.strip():
return gr.update(visible=True, value="⚠️ 請先生成腳本內容")
logger.info("開始發送內容到 Discord")
result = send_to_discord_webhook(webhook_url, script_content, summary_content)
if result.startswith("✅"):
return gr.update(visible=True, value=result)
else:
return gr.update(visible=True, value=result)
def _generate_in_batches(pdf_text, base_prompt, headers, url, model, num_parts, progress_callback, max_retries, retry_delay):
"""
分批生成的備用機制,只在單次生成被截斷時使用
"""
try:
# 檢查輸入文本是否足夠
if not pdf_text or len(pdf_text.strip()) < 100:
logger.error("輸入文本為空或太短,無法進行分批生成")
return None
logger.info(f"開始分批生成,共 {num_parts} 個部分")
# 生成內容大綱(簡化版)
outline_prompt = f"""
請為以下內容生成一個簡潔的討論大綱,包含 {num_parts} 個主要部分:
{pdf_text[:5000]}...
請用繁體中文列出 {num_parts} 個主要討論主題,每個主題一行。
"""
# 獲取大綱
outline_payload = {
"model": model,
"messages": [{"role": "user", "content": outline_prompt}],
"temperature": 0.3,
"max_tokens": 1000
}
outline_response = requests.post(url, headers=headers, json=outline_payload)
outline = ""
if outline_response.status_code == 200:
outline = outline_response.json()['choices'][0]['message']['content']
logger.info(f"獲得內容大綱: {outline[:100]}...")
# 分批生成
dialogue_parts = []
context_summary = ""
for part_index in range(num_parts):
is_first_part = part_index == 0
is_last_part = part_index == num_parts - 1
if is_first_part:
part_prompt = f"""
將以下內容轉換成播客對話的第 1/{num_parts} 部分:
【內容來源】
{pdf_text[:10000]}...
【大綱參考】
{outline}
【要求】
- 按照正常格式開場:speaker-1: 歡迎收聽 David888 Podcast,我是 David...
- speaker-2 首次發言時自我介紹為 Cordelia
- 討論前面的主題
- **生成至少 20-30 輪對話**
- **不要結束對話**,在一個開放的討論點停止
- 必須使用繁體中文,格式為 speaker-1: 和 speaker-2:
"""
elif is_last_part:
part_prompt = f"""
延續之前的播客對話,這是第 {part_index+1}/{num_parts} 部分(最後一部分)。
【前文摘要】
{context_summary[-3000:]}
【大綱參考】
{outline}
【內容來源】
{pdf_text}
請:
1. **不要重複開場**,直接繼續前面的對話
2. 完成剩餘主題的討論
3. **生成至少 20-30 輪對話**
4. **自然地結束對話**,包含總結和告別
**必須使用繁體中文,格式為 speaker-1: 和 speaker-2:**
"""
else:
part_prompt = f"""
延續之前的播客對話,這是第 {part_index+1}/{num_parts} 部分(中間部分)。
【前文摘要】
{context_summary[-3000:]}
【大綱參考】
{outline}
【內容來源】
{pdf_text}
請:
1. **不要重複開場**,直接繼續前面的對話
2. 討論相應的主題
3. **生成至少 20-30 輪對話**
4. **不要結束對話**,在一個開放的討論點停止
**必須使用繁體中文,格式為 speaker-1: 和 speaker-2:**
"""
# 生成當前部分,使用更高的 token 限制
part_payload = {
"model": model,
"messages": [{"role": "user", "content": part_prompt}],
"temperature": 0.7,
"max_tokens": 16384 # 提高每部分的 token 限制
}
for attempt in range(max_retries):
try:
if progress_callback:
progress_callback(f"生成第 {part_index+1}/{num_parts} 部分 (嘗試 {attempt+1})...")
part_response = requests.post(url, headers=headers, json=part_payload)
part_response.raise_for_status()
current_part = part_response.json()['choices'][0]['message']['content']
dialogue_parts.append(current_part)
# 更新上下文摘要
if context_summary:
context_summary += "\n\n" + current_part
else:
context_summary = current_part
logger.info(f"完成第 {part_index+1}/{num_parts} 部分")
break
except Exception as e:
logger.error(f"生成第 {part_index+1} 部分失敗: {e}")
if attempt == max_retries - 1:
return None
time.sleep(retry_delay)
# 合併所有部分
full_dialogue = "\n\n".join(dialogue_parts)
logger.info(f"分批生成完成,總長度: {len(full_dialogue)} 字符")
return full_dialogue
except Exception as e:
logger.error(f"分批生成失敗: {e}")
return None
def validate_and_generate_script(
files,
openai_api_key,
text_model,
api_base_value,
template_type,
user_feedback,
num_parts=3,
max_input_length=1000000,
max_output_tokens=65536,
progress_callback=None
):
"""驗證輸入並生成腳本"""
if not files:
logger.warning("未上傳文件")
if progress_callback:
progress_callback("錯誤:請上傳至少一個文件")
return None, "請在生成腳本前上傳至少一個文件。"
try:
logger.info(f"開始處理 {len(files)} 個文件")
if progress_callback:
progress_callback(f"開始處理 {len(files)} 個文件...")
# 從檔案中提取文字
combined_text = ""
for file in files:
filename = file.name.lower()
logger.info(f"處理文件: {filename}")
if progress_callback:
progress_callback(f"處理文件: {os.path.basename(filename)}")
if filename.endswith(".pdf"):
try:
logger.info(f"使用 PyMuPDF 開啟 PDF: {filename}")
doc = pymupdf.open(file.name)
page_count = len(doc)
logger.info(f"PDF 頁數: {page_count}")
for i, page in enumerate(doc):
page_text = page.get_text()
combined_text += page_text + "\n\n"
if i % 10 == 0:
logger.debug(f"已處理 PDF 第 {i+1}/{page_count} 頁")
if progress_callback and page_count > 10:
progress_callback(f"處理 PDF: {os.path.basename(filename)} - {i+1}/{page_count} 頁")
logger.info(f"PDF 處理完成: {filename}")
if progress_callback:
progress_callback(f"PDF 處理完成: {os.path.basename(filename)}")
except Exception as e:
error_msg = f"PDF 處理錯誤 ({filename}): {str(e)}"
logger.error(error_msg)
if progress_callback:
progress_callback(error_msg)
elif filename.endswith(".txt"):
try:
logger.info(f"處理文本文件: {filename}")
with open(file.name, "r", encoding="utf-8", errors="ignore") as f:
file_text = f.read()
combined_text += file_text + "\n\n"
logger.info(f"文本文件處理完成,長度: {len(file_text)} 字符")
if progress_callback:
progress_callback(f"文本文件處理完成: {os.path.basename(filename)}")
except Exception as e:
error_msg = f"TXT 文件處理錯誤 ({filename}): {str(e)}"
logger.error(error_msg)
if progress_callback:
progress_callback(error_msg)
elif filename.endswith(".epub"):
try:
logger.info(f"處理 EPUB 文件: {filename}")
if progress_callback:
progress_callback(f"處理 EPUB 文件: {os.path.basename(filename)}")
book = epub.read_epub(file.name)
items = list(book.get_items())
item_count = len(items)
processed_count = 0
# 如果沒有找到任何項目,嘗試替代方法
if item_count == 0:
logger.warning(f"EPUB 文件沒有找到項目,嘗試替代處理方法: {filename}")
# 嘗試直接讀取 spine 中的文檔
for spine_item in book.spine:
try:
item_id = spine_item[0]
item = book.get_item_by_id(item_id)
if item:
soup = BeautifulSoup(item.get_body_content(), 'html.parser')
item_text = soup.get_text()
if item_text.strip(): # 只添加非空內容
combined_text += item_text + "\n\n"
processed_count += 1
except Exception as spine_error:
logger.debug(f"處理 spine 項目失敗: {spine_error}")
continue
else:
# 正常處理流程
for item in items:
if item.get_type() == ebooklib.ITEM_DOCUMENT:
try:
processed_count += 1
soup = BeautifulSoup(item.get_body_content(), 'html.parser')
item_text = soup.get_text()
if item_text.strip(): # 只添加非空內容
combined_text += item_text + "\n\n"
logger.debug(f"已處理 EPUB 項目 {processed_count}/{item_count}")
if processed_count % 5 == 0 and progress_callback:
progress_callback(f"處理 EPUB: {os.path.basename(filename)} - {processed_count}/{item_count} 項目")
except Exception as item_error:
logger.error(f"EPUB 項目處理錯誤: {item_error}")
continue
logger.info(f"EPUB 處理完成: {filename}, 共處理 {processed_count} 個項目")
if progress_callback:
progress_callback(f"EPUB 處理完成: {os.path.basename(filename)}, 共處理 {processed_count} 個項目")
except Exception as e:
error_msg = f"EPUB 處理錯誤 ({filename}): {str(e)}"
logger.error(error_msg)
if progress_callback:
progress_callback(error_msg)
# 提供 EPUB 處理失敗的具體建議
suggestion = "建議:1) 檢查 EPUB 文件是否完整;2) 嘗試用其他工具轉換為 PDF 或 TXT 格式後重新上傳"
logger.info(suggestion)
if progress_callback:
progress_callback(suggestion)
else:
logger.warning(f"跳過不支持的文件格式: {filename}")
if progress_callback:
progress_callback(f"跳過不支持的文件格式: {os.path.basename(filename)}")
text_length = len(combined_text)
logger.info(f"所有文件處理完成,合併文本長度: {text_length} 字符")
if progress_callback:
progress_callback(f"所有文件處理完成,合併文本長度: {text_length} 字符")
# 檢查是否成功提取到文本內容
if text_length < 50:
error_msg = f"錯誤:未能從上傳的文件中提取到足夠的文本內容(提取到 {text_length} 字符)。請檢查文件格式和內容是否正確。"
logger.error(error_msg)
if progress_callback:
progress_callback(error_msg)
return None, error_msg
# 生成對話腳本
logger.info("開始生成腳本...")
if progress_callback:
progress_callback("開始生成腳本,正在發送請求到 LLM API...")
script = generate_dialogue_via_requests(
pdf_text=combined_text,
model=text_model,
llm_api_key=openai_api_key,
api_base=api_base_value,
user_feedback=user_feedback,
num_parts=num_parts,
max_input_length=max_input_length,
max_output_tokens=max_output_tokens,
progress_callback=progress_callback,
template_type=template_type
)
logger.info("腳本生成完成")
if progress_callback:
progress_callback("腳本生成完成!")
return script, None
except Exception as e:
error_msg = f"腳本生成過程中發生錯誤: {str(e)}"
logger.error(error_msg)
if progress_callback:
progress_callback(error_msg)
return None, error_msg
# Gradio 介面
with gr.Blocks(title="Script Generator", css="""
#generate-btn {
background-color: #FF9800 !important;
color: white !important;
}
#header { text-align: center; margin-bottom: 20px; }
.error { color: red; }
.discord-status {
padding: 10px;
border-radius: 5px;
margin-top: 10px;
}
""") as demo:
gr.Markdown("# 腳本生成器 | Script Generator (重構版)", elem_id="header")
with gr.Row():
with gr.Column(scale=1):
# 輸入區
files = gr.Files(
label="上傳檔案 | Upload Files",
file_types=[".pdf", ".txt", ".epub"],
file_count="multiple",
interactive=True
)
api_base = gr.Textbox(
label="API Base URL",
placeholder="https://generativelanguage.googleapis.com/v1beta/openai",
value="https://generativelanguage.googleapis.com/v1beta/openai"
)
api_key = gr.Textbox(
label="LLM API Key",
type="password"
)
model_dropdown = gr.Dropdown(
label="選擇模型 | Select Model",
choices=[],
interactive=True
)
fetch_button = gr.Button("獲取模型列表 | Fetch Models")
template_dropdown = gr.Dropdown(
label="提示詞模板 | Prompt Template",
choices=get_all_template_names(),
value="podcast",
interactive=True
)
# 移除舊版本的多個提示詞欄位,現在使用模板化系統
# 只保留一個模板預覽欄位
dialog = gr.Textbox(
label="模板預覽 | Template Preview",
lines=8,
value="選擇模板後將顯示提示詞內容",
interactive=False,
info="這是當前選擇模板的提示詞內容預覽"
)
custom_prompt = gr.Textbox(
label="自定義提示詞 | Custom Prompt",
placeholder="Optional: Enter your custom prompt here",
lines=5
)
# 添加分批生成部分數量的滑動條
num_parts_slider = gr.Slider(
minimum=1,
maximum=5,
value=1,
step=1,
label="分批生成部分數量 | Number of Generation Parts",
info="暫時設為1,未來版本將支援智能分批生成"
)
# 添加最大輸入文本長度的滑動條
max_input_length_slider = gr.Slider(
minimum=50000,
maximum=2000000,
value=1000000,
step=50000,
label="最大輸入文本長度 | Max Input Text Length",
info="調整模型可處理的最大輸入文本長度(字符數)"
)
# 添加最大輸出 token 數的滑動條
max_output_tokens_slider = gr.Slider(
minimum=1024,
maximum=131072,
value=65536,
step=1024,
label="最大輸出 Token 數 | Max Output Tokens",
info="調整模型最大輸出 token 數。Gemini Flash 2.5: 65536, GPT-4: 4096, Claude: 8192"
)
with gr.Column(scale=1):
# 輸出區
generate_button = gr.Button("生成腳本 | Generate Script", elem_id="generate-btn")
output_text = gr.Textbox(
label="生成的腳本 | Generated Script",
lines=20,
show_copy_button=True
)
# 摘要生成區域
gr.Markdown("### 📝 Podcast 摘要生成 | Summary Generation")
with gr.Row():
summary_type_dropdown = gr.Dropdown(
label="摘要類型 | Summary Type",
choices=["blog-summary", "intro-summary"],
value="intro-summary",
interactive=True
)
generate_summary_button = gr.Button("生成摘要 | Generate Summary", size="sm")
summary_output = gr.Textbox(
label="生成的摘要 | Generated Summary",
lines=10,
show_copy_button=True,
placeholder="請先生成腳本,然後點擊「生成摘要」按鈕"
)
# Discord Webhook 功能區域
gr.Markdown("### 🔗 Discord 分享 | Discord Sharing")
with gr.Row():
discord_webhook = gr.Textbox(
label="Discord Webhook URL(可選)",
placeholder="https://discord.com/api/webhooks/...",
lines=2,
info="填寫 Discord Webhook URL 可將生成的內容發送到 Discord 頻道"
)
with gr.Row():
share_discord_button = gr.Button(
"📤 Share to Discord",
size="sm",
variant="secondary"
)
discord_status = gr.Markdown(
visible=False,
elem_classes=["discord-status"]
)
error_output = gr.Markdown(
visible=False,
elem_classes=["error"]
)
# 事件處理
def handle_model_fetch(key, base):
logger.info(f"嘗試從 {base} 獲取模型列表")
if not key:
logger.warning("未提供 API 密鑰")
return gr.update(choices=[], value=None), gr.update(visible=True, value="錯誤: 需要 API 密鑰")
models = fetch_models(key, base)
if isinstance(models, list) and models and not models[0].startswith("Error"):
logger.info(f"成功獲取 {len(models)} 個模型")
return gr.update(choices=models, value=models[0]), gr.update(visible=False)
error_msg = models[0] if models else "未知錯誤"
logger.error(f"獲取模型失敗: {error_msg}")
return gr.update(choices=[], value=None), gr.update(visible=True, value=error_msg)
def update_template(template):
logger.info(f"切換模板至: {template}")
try:
# 使用新的模板系統
template_content = get_prompt(template, "[內容將在此處顯示]")
return template_content
except KeyError:
logger.error(f"模板 {template} 不存在")
return "錯誤:模板不存在"
fetch_button.click(
fn=handle_model_fetch,
inputs=[api_key, api_base],
outputs=[model_dropdown, error_output]
)
template_dropdown.change(
fn=update_template,
inputs=[template_dropdown],
outputs=[dialog]
)
def handle_script_generation(*args):
logger.info("開始生成腳本")
script, error = validate_and_generate_script(*args)
if error:
logger.error(f"腳本生成失敗: {error}")
return None, gr.update(visible=True, value=error)
logger.info("腳本生成成功")
return script, gr.update(visible=False)
def handle_summary_generation(script_content, summary_type, api_key_val, model_val, api_base_val, max_tokens_val):
if not script_content or not script_content.strip():
return "錯誤:請先生成腳本內容"
if not api_key_val or not model_val:
return "錯誤:請確保已設定 API 金鑰和模型"
logger.info(f"開始生成摘要,類型: {summary_type}")
def progress_callback(msg):
pass # 簡化版本,不顯示進度
summary = generate_summary(
script_content=script_content,
summary_type=summary_type,
model=model_val,
llm_api_key=api_key_val,
api_base=api_base_val,
max_output_tokens=max_tokens_val // 2, # 摘要使用較少的 tokens
progress_callback=progress_callback
)
return summary
generate_button.click(
fn=handle_script_generation,
inputs=[
files,
api_key,
model_dropdown,
api_base,
template_dropdown, # 使用模板選擇
custom_prompt, # user_feedback
num_parts_slider, # 添加滑動條參數
max_input_length_slider, # 添加最大輸入文本長度參數
max_output_tokens_slider # 添加最大輸出 token 數參數
],
outputs=[output_text, error_output]
)
generate_summary_button.click(
fn=handle_summary_generation,
inputs=[
output_text, # 腳本內容
summary_type_dropdown, # 摘要類型
api_key, # API 金鑰
model_dropdown, # 模型
api_base, # API 基礎 URL
max_output_tokens_slider # 最大輸出 tokens
],
outputs=[summary_output]
)
share_discord_button.click(
fn=handle_discord_share,
inputs=[
discord_webhook, # Discord Webhook URL
output_text, # 腳本內容
summary_output # 摘要內容
],
outputs=[discord_status]
)
app = demo.queue()
if __name__ == "__main__":
logger.info("啟動腳本生成器應用 (重構版)")
app.launch(server_name="0.0.0.0", server_port=7860)