AI_Learning_Hub / app.py
yiming-0120's picture
Update app.py
d634207 verified
import gradio as gr
import os
import tempfile
import openai
import PyPDF2
import docx
from pptx import Presentation
import json
from datetime import datetime
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
import re
from urllib.parse import urlparse
# 全域變數儲存處理後的資料
processed_data_store = None
# 檢查檔案大小(Whisper API 限制 25MB)
def check_file_size(file_path):
file_size = os.path.getsize(file_path) / (1024 * 1024) # 轉換為 MB
if file_size > 25:
raise ValueError("檔案大小超過 25MB,請上傳較小的檔案。")
return file_size
# 語音轉文字主函數
def transcribe_audio(audio_file, language="zh"):
try:
if not os.path.splitext(audio_file)[1].lower() in ['.wav', '.mp3']:
return "錯誤:僅支援 .wav 或 .mp3 檔案格式!", None
check_file_size(audio_file)
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
with open(audio_file, "rb") as file:
transcription = client.audio.transcriptions.create(
model="gpt-4o-transcribe",
file=file,
language=language
)
transcript_text = transcription.text
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as temp_file:
temp_file.write(transcript_text.encode('utf-8'))
temp_file_path = temp_file.name
return transcript_text, temp_file_path
except ValueError as ve:
return f"錯誤:{str(ve)}", None
except Exception as e:
return f"轉錄失敗:{str(e)}", None
# 文檔內容提取函數
def extract_text_from_pdf(file_path):
try:
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
return text.strip()
except Exception as e:
return f"PDF 讀取錯誤:{str(e)}"
def extract_text_from_docx(file_path):
try:
doc = docx.Document(file_path)
text = ""
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
return text.strip()
except Exception as e:
return f"DOCX 讀取錯誤:{str(e)}"
def extract_text_from_pptx(file_path):
try:
prs = Presentation(file_path)
text = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
text += shape.text + "\n"
return text.strip()
except Exception as e:
return f"PPTX 讀取錯誤:{str(e)}"
def extract_document_content(file_path):
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.pdf':
return extract_text_from_pdf(file_path)
elif file_ext == '.docx':
return extract_text_from_docx(file_path)
elif file_ext in ['.ppt', '.pptx']:
return extract_text_from_pptx(file_path)
else:
return f"不支援的檔案格式:{file_ext}"
# 文字分塊處理
def chunk_text(text, chunk_size=1000, overlap=200):
if len(text) <= chunk_size:
return [text]
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
if end < len(text):
for i in range(end, start + chunk_size//2, -1):
if text[i] in ['。', '!', '?', '\n', '.', '!', '?']:
end = i + 1
break
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
start = end - overlap if end < len(text) else end
return chunks
# 生成嵌入向量
def generate_embedding(text, client):
"""使用 OpenAI API 生成文字嵌入向量"""
try:
response = client.embeddings.create(
model="text-embedding-ada-002",
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"生成嵌入向量失敗:{str(e)}")
return None
# 新增:生成摘要功能
def generate_summary(text, summary_type="教材", max_tokens=400):
"""使用 OpenAI API 生成內容摘要"""
if not text or not text.strip():
return "無內容可摘要。"
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
if summary_type == "教材":
prompt = f"""請用繁體中文將以下教材內容摘要成300字的概述,包含:
1. 主要學科領域
2. 核心概念和主題
3. 學習重點
4. 內容結構
教材內容:
{text[:4000]} # 限制輸入長度避免超過token限制
請提供簡潔且全面的概述:"""
else: # 逐字稿
prompt = f"""請用繁體中文將以下錄音逐字稿摘要成300字,包含:
1. 主要討論主題
2. 重要觀點和概念
3. 關鍵資訊摘要
逐字稿內容:
{text[:4000]}
請提供簡潔且重點突出的摘要:"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.5,
max_tokens=max_tokens
)
summary = response.choices[0].message.content
return summary
except Exception as e:
return f"摘要生成失敗:{str(e)}"
# 新增:生成逐字稿摘要
def generate_transcript_summary(transcript_content):
"""為逐字稿生成摘要"""
if not transcript_content or not transcript_content.strip():
return "尚未有逐字稿內容"
return generate_summary(transcript_content, summary_type="逐字稿")
# 智能領域分析函數
def extract_domain_keywords_from_materials(material_context):
"""從教材上下文中動態提取領域關鍵詞"""
if not material_context:
return {}
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
prompt = f"""分析以下教材檔案名稱和內容,提取出主要的學科領域和相關關鍵詞:
教材內容:{material_context}
請提供:
1. 主要學科領域(例如:計算機科學、物理學、化學、生物學、經濟學等)
2. 5-10個該領域的核心英文關鍵詞
3. 可能產生歧義的詞彙(如果有的話)
格式:
領域:[學科名稱]
關鍵詞:[keyword1, keyword2, keyword3, ...]
歧義詞:[ambiguous_term1, ambiguous_term2, ...]"""
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=300
)
analysis = response.choices[0].message.content
return parse_domain_analysis(analysis)
except Exception as e:
print(f"領域分析失敗:{str(e)}")
return {}
def parse_domain_analysis(analysis):
"""解析領域分析結果"""
result = {
'domain': '',
'keywords': [],
'ambiguous_terms': []
}
lines = analysis.split('\n')
for line in lines:
line = line.strip()
if line.startswith('領域:'):
result['domain'] = line.replace('領域:', '').strip()
elif line.startswith('關鍵詞:'):
keywords_str = line.replace('關鍵詞:', '').strip()
keywords = [kw.strip().strip('[]') for kw in keywords_str.split(',')]
result['keywords'] = [kw for kw in keywords if kw]
elif line.startswith('歧義詞:'):
ambiguous_str = line.replace('歧義詞:', '').strip()
ambiguous = [term.strip().strip('[]') for term in ambiguous_str.split(',')]
result['ambiguous_terms'] = [term for term in ambiguous if term]
return result
def get_alternative_meanings(term, domain):
"""獲取詞彙在其他領域的含義"""
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
prompt = f"""詞彙 "{term}" 在 "{domain}" 領域有特定含義。
請列出這個詞彙在其他領域可能的含義或相關詞彙,用於排除不相關的搜尋結果。
請提供3-5個可能需要排除的相關詞彙:"""
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=150
)
result = response.choices[0].message.content
exclude_terms = []
for line in result.split('\n'):
if line.strip() and not line.startswith('例如'):
words = line.lower().split()
exclude_terms.extend([word for word in words if len(word) > 3])
return exclude_terms[:5]
except Exception as e:
print(f"生成排除詞彙失敗:{str(e)}")
return []
# 網路搜尋功能
def extract_web_content(url, max_chars=2000):
"""提取網頁內容"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
for script in soup(["script", "style"]):
script.decompose()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
if len(text) > max_chars:
text = text[:max_chars] + "..."
return text
except Exception as e:
print(f"提取網頁內容失敗 {url}: {str(e)}")
return ""
def intelligent_web_search(query, material_context=""):
"""基於教材上下文的智能網路搜尋"""
try:
# 動態分析領域
domain_info = extract_domain_keywords_from_materials(material_context)
# 構建增強查詢
enhanced_query = query
if domain_info.get('keywords'):
relevant_keywords = domain_info['keywords'][:3]
enhanced_query = f"{query} {' '.join(relevant_keywords)}"
# 生成排除詞彙
exclude_terms = []
for ambiguous_term in domain_info.get('ambiguous_terms', []):
if ambiguous_term.upper() in query.upper():
exclude_terms.extend(get_alternative_meanings(ambiguous_term, domain_info.get('domain', '')))
ddgs = DDGS()
results = []
search_results = ddgs.text(enhanced_query, max_results=8)
for result in search_results:
content_lower = (result.get('title', '') + ' ' + result.get('body', '')).lower()
# 檢查排除詞彙
if any(exclude_term.lower() in content_lower for exclude_term in exclude_terms):
continue
# 檢查相關性
if domain_info.get('keywords'):
keyword_count = sum(1 for keyword in domain_info['keywords']
if keyword.lower() in content_lower)
if keyword_count == 0:
continue
results.append({
'title': result.get('title', ''),
'url': result.get('href', ''),
'snippet': result.get('body', ''),
'source': 'web_search'
})
if len(results) >= 3:
break
return results
except Exception as e:
print(f"智能網路搜尋失敗:{str(e)}")
return []
def enhanced_web_search_with_content(query, num_results=3):
"""進行智能網路搜尋並提取內容"""
global processed_data_store
# 獲取豐富的教材上下文
material_context = ""
if processed_data_store:
if processed_data_store.get("materials"):
material_files = [material["filename"] for material in processed_data_store["materials"]]
material_context += f"檔案:{', '.join(material_files)}. "
if processed_data_store.get("chunks"):
content_samples = []
for chunk in processed_data_store["chunks"][:3]:
content_sample = chunk["content"][:200]
content_samples.append(content_sample)
material_context += f"內容範例:{' '.join(content_samples)}"
search_results = intelligent_web_search(query, material_context)
enhanced_results = []
for result in search_results[:num_results]:
content = extract_web_content(result['url'])
if content:
result['content'] = content
enhanced_results.append(result)
return enhanced_results
# 檢查搜尋狀態變化
def check_search_status_change(current_message, history, current_search_enabled):
"""檢查是否為重複問題且搜尋狀態改變"""
if not history:
return False
# 檢查最近的問題是否相同
for user_msg, bot_msg in reversed(history[-3:]): # 檢查最近3輪對話
if user_msg and user_msg.strip().lower() == current_message.strip().lower():
# 檢查之前的回答是否包含網路搜尋結果
if bot_msg:
has_web_sources = "網路來源:" in bot_msg
# 如果之前有網路來源但現在沒啟用,或之前沒有但現在啟用了
if (has_web_sources and not current_search_enabled) or (not has_web_sources and current_search_enabled):
return True
return False
# 修改:RAG 資料前處理主函數(新增摘要功能)
def process_rag_data(material_files, transcript_content):
global processed_data_store
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
processed_data = {
"timestamp": datetime.now().isoformat(),
"materials": [],
"transcript": None,
"chunks": [],
"material_summary": "",
"transcript_summary": ""
}
# 收集所有教材內容用於生成摘要
all_material_content = ""
# 處理教材檔案
if material_files:
for file in material_files:
file_name = os.path.basename(file.name)
file_content = extract_document_content(file.name)
if not file_content.startswith("錯誤") and not file_content.startswith("不支援"):
all_material_content += f"\n\n檔案:{file_name}\n{file_content}"
material_chunks = chunk_text(file_content)
material_info = {
"filename": file_name,
"content": file_content,
"chunks": len(material_chunks),
"type": "material"
}
processed_data["materials"].append(material_info)
for i, chunk in enumerate(material_chunks):
embedding = generate_embedding(chunk, client)
if embedding:
processed_data["chunks"].append({
"content": chunk,
"source": file_name,
"type": "material",
"chunk_id": f"{file_name}_chunk_{i+1}",
"embedding": embedding
})
# 生成教材摘要
if all_material_content.strip():
processed_data["material_summary"] = generate_summary(all_material_content, "教材")
# 處理逐字稿
if transcript_content and transcript_content.strip():
transcript_chunks = chunk_text(transcript_content)
processed_data["transcript"] = {
"content": transcript_content,
"chunks": len(transcript_chunks),
"type": "transcript"
}
# 生成逐字稿摘要
processed_data["transcript_summary"] = generate_summary(transcript_content, "逐字稿")
for i, chunk in enumerate(transcript_chunks):
embedding = generate_embedding(chunk, client)
if embedding:
processed_data["chunks"].append({
"content": chunk,
"source": "錄音逐字稿",
"type": "transcript",
"chunk_id": f"transcript_chunk_{i+1}",
"embedding": embedding
})
# 儲存到全域變數
processed_data_store = processed_data
# 儲存處理結果到臨時檔案
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".json", encoding='utf-8') as temp_file:
json.dump(processed_data, temp_file, ensure_ascii=False, indent=2)
temp_file_path = temp_file.name
# 生成處理報告
total_materials = len(processed_data["materials"])
total_chunks = len(processed_data["chunks"])
has_transcript = processed_data["transcript"] is not None
report = f"""✅ RAG 資料前處理完成!
📊 處理統計:
• 教材檔案數量:{total_materials}
• 逐字稿:{'已處理' if has_transcript else '無'}
• 總文字塊數:{total_chunks}
• 嵌入向量:已生成
• 摘要:已生成
📋 處理詳情:"""
if processed_data["materials"]:
report += "\n\n📚 教材檔案:"
for material in processed_data["materials"]:
report += f"\n • {material['filename']} ({material['chunks']} 個文字塊)"
if has_transcript:
report += f"\n\n🎤 錄音逐字稿:{processed_data['transcript']['chunks']} 個文字塊"
report += f"\n\n💾 資料已準備完成,可前往 AI ChatBot 頁面進行問答!"
return report, temp_file_path, processed_data["material_summary"], processed_data["transcript_summary"]
except Exception as e:
return f"❌ RAG 前處理失敗:{str(e)}", None, "", ""
# 處理函數
def handle_material_upload(files):
if not files:
return "尚未上傳任何教材檔案"
uploaded_files = []
for file in files:
file_name = os.path.basename(file.name)
file_size = os.path.getsize(file.name) / (1024 * 1024)
uploaded_files.append(f"📄 {file_name} ({file_size:.2f} MB)")
return f"已上傳 {len(files)} 個教材檔案:\n" + "\n".join(uploaded_files)
def handle_audio_transcription(audio_file, language):
if not audio_file:
return "請先上傳錄音檔案", "", None, ""
language_names = {
"zh": "中文", "en": "英文", "ja": "日文", "ko": "韓文",
"fr": "法文", "de": "德文", "es": "西班牙文"
}
language_name = language_names.get(language, language)
result, temp_file = transcribe_audio(audio_file, language)
if temp_file:
status_message = f"✅ 轉錄完成!使用語言:{language_name}"
# 生成逐字稿摘要
summary = generate_transcript_summary(result)
return status_message, result, temp_file, summary
else:
return result, "", None, ""
def handle_rag_processing(material_files, transcript_content):
if not material_files and not transcript_content:
return "❌ 請先上傳教材檔案或完成錄音轉錄", None, "", ""
return process_rag_data(material_files, transcript_content)
# ChatBot 核心功能(保持不變)
def search_relevant_chunks(query, top_k=5):
global processed_data_store
if processed_data_store is None or not processed_data_store["chunks"]:
return []
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
query_embedding = generate_embedding(query, client)
if query_embedding is None:
return []
similarities = []
for chunk in processed_data_store["chunks"]:
if "embedding" in chunk:
similarity = cosine_similarity(
[query_embedding],
[chunk["embedding"]]
)[0][0]
similarities.append((chunk, similarity))
similarities.sort(key=lambda x: x[1], reverse=True)
return [item[0] for item in similarities[:top_k]]
except Exception as e:
print(f"搜尋相關內容失敗:{str(e)}")
return []
def enhanced_chatbot_with_web_search(message, history, enable_web_search=False):
global processed_data_store
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# 檢查是否為重複問題且搜尋狀態改變
search_status_changed = check_search_status_change(message, history, enable_web_search)
# 準備對話歷史上下文
conversation_context = ""
if history:
recent_history = history[-3:] if len(history) > 3 else history
for user_msg, bot_msg in recent_history:
if user_msg and bot_msg:
conversation_context += f"用戶:{user_msg}\n助理:{bot_msg}\n\n"
# 1. 搜尋 RAG 資料
rag_chunks = search_relevant_chunks(message, top_k=5)
# 2. 分析教材上下文
material_context = ""
if processed_data_store and processed_data_store.get("materials"):
material_files = [material["filename"] for material in processed_data_store["materials"]]
material_context = f"教材檔案:{', '.join(material_files)}"
# 3. 智能網路搜尋(如果啟用)
web_results = []
if enable_web_search:
web_results = enhanced_web_search_with_content(message, num_results=3)
# 4. 構建帶正確註腳的上下文
all_context_parts = []
footnotes = []
footnote_counter = 1
# 處理 RAG 資料
if rag_chunks:
for chunk in rag_chunks:
footnote_ref = f"[{footnote_counter}]"
footnotes.append(f"[{footnote_counter}] 教材來源:{chunk['source']}")
all_context_parts.append(f"教材內容{footnote_ref}{chunk['content']}")
footnote_counter += 1
# 處理網路搜尋結果
if web_results:
for result in web_results:
footnote_ref = f"[{footnote_counter}]"
footnotes.append(f"[{footnote_counter}] 網路來源:{result['title']} - {result['url']}")
all_context_parts.append(f"網路內容{footnote_ref}{result['content'][:800]}...")
footnote_counter += 1
# 5. 根據可用資料決定回答策略
if all_context_parts:
all_context = "\n\n".join(all_context_parts)
if rag_chunks:
# 構建系統提示詞,明確說明搜尋狀態
search_status_info = f"""
當前搜尋設定:{'已啟用網路搜尋' if enable_web_search else '未啟用網路搜尋'}
"""
# 如果搜尋狀態改變,添加特別說明
status_change_instruction = ""
if search_status_changed:
if enable_web_search:
status_change_instruction = """
重要:用戶剛剛啟用了網路搜尋功能,請提供包含網路搜尋結果的更全面回答,即使之前已經回答過類似問題。
"""
else:
status_change_instruction = """
重要:用戶剛剛關閉了網路搜尋功能,請僅基於教材內容回答,不要參考之前可能包含網路搜尋的回答。
"""
system_prompt = f"""你是一個智能學習助理。請根據提供的教材內容、逐字稿、網路搜尋結果以及對話歷史來回答用戶的問題。
{search_status_info}
{status_change_instruction}
教材上下文:{material_context}
對話歷史:
{conversation_context}
可用資料來源:
{all_context}
重要回答規則:
1. **以教材為核心**:優先使用教材和逐字稿的內容作為回答基礎
2. **正確使用註腳**:在回答中使用對應的註腳編號 [1], [2], [3] 等來標註具體的資料來源
3. **教材優先原則**:當教材有相關內容時,必須以教材內容為主要回答依據
4. **網路資料處理**:
- 如果啟用網路搜尋:網路搜尋結果用於補充教材中沒有的細節或例子
- 如果未啟用網路搜尋:僅使用教材和逐字稿內容,不要參考可能的網路資訊
5. **避免歧義**:根據教材的領域和上下文來理解問題,不回答無關領域的內容
6. **具體例子**:提供具體例子時,優先使用教材中的例子,再補充網路資料(如果啟用)
7. **繁體中文回答**:使用繁體中文進行回答
8. **保持連貫性**:結合對話歷史,但要根據當前的搜尋設定調整回答內容
9. **重複問題處理**:如果是重複問題但搜尋設定改變,請提供符合當前設定的新回答
註腳使用說明:
- 每當引用特定資料來源時,必須在該句末尾加上對應的註腳編號
- 如果一個句子引用多個來源,可以使用多個註腳 [1][2]
- 確保註腳編號與實際提供的資料來源對應
請根據以上原則回答用戶的問題,並正確使用註腳標註。"""
else:
# 只有網路搜尋結果的情況
system_prompt = f"""你是一個智能學習助理。用戶的問題在教材中沒有找到相關內容,但有網路搜尋結果可供參考。
當前搜尋設定:已啟用網路搜尋
對話歷史:
{conversation_context}
網路搜尋資訊:
{all_context}
回答規則:
1. 說明在教材中沒有找到相關資訊
2. 基於網路搜尋結果提供有用的回答
3. 正確使用註腳標註網路來源
4. 結合對話歷史,保持對話的連貫性
5. 用繁體中文回答
請根據以上資訊回答用戶的問題。"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": message}
],
temperature=0.7,
max_tokens=1500
)
answer = response.choices[0].message.content
# 添加搜尋狀態說明
search_status_note = f"\n\n🔍 **搜尋狀態:** {'已啟用網路搜尋' if enable_web_search else '僅使用教材資料'}"
# 添加註腳列表
if footnotes:
footnote_section = "\n\n**參考資料:**\n" + "\n".join(footnotes)
return answer + search_status_note + footnote_section
else:
return answer + search_status_note
else:
# 沒有任何資料來源的備用回答
system_prompt = f"""你是一個智能學習助理。用戶的問題在提供的教材中找不到相關內容,且未啟用網路搜尋或搜尋無結果。
教材上下文:{material_context}
對話歷史:
{conversation_context}
回答規則:
1. 說明在用戶提供的教材中沒有找到相關資訊,且未進行網路搜尋
2. 基於一般知識提供有用的回答,但要說明這不是基於用戶的教材
3. 結合對話歷史,保持對話的連貫性
4. 建議用戶可以啟用網路搜尋獲得更多資訊
5. 用繁體中文回答
請回答用戶的問題。"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": message}
],
temperature=0.8,
max_tokens=1000
)
answer = response.choices[0].message.content
disclaimer = "\n\n💡 **說明:** 在您的教材中沒有找到相關資訊,且未啟用網路搜尋。以上回答基於一般知識提供。建議啟用「聯網搜尋」獲得更完整和最新的資訊。"
return answer + disclaimer
except Exception as e:
return f"抱歉,處理您的問題時發生錯誤:{str(e)}"
# 修改:建立資料處理頁面(新增摘要欄位)
def create_data_processing_interface():
with gr.Blocks(theme=gr.themes.Soft()) as data_demo:
gr.Markdown("# 🎓 AI 學習助理 - 資料處理")
gr.Markdown("上傳教材檔案和錄音檔,進行 RAG 前處理後可前往 ChatBot 頁面進行問答!")
transcript_state = gr.State("")
with gr.Row():
# 左側:教材上傳區域
with gr.Column(scale=1):
gr.Markdown("## 📚 教材上傳區")
material_files = gr.File(
label="上傳教材檔案",
file_count="multiple",
file_types=[".pdf", ".docx", ".ppt", ".pptx"],
height=200
)
material_status = gr.Textbox(
label="教材上傳狀態",
value="尚未上傳任何教材檔案",
interactive=False,
lines=5
)
material_files.change(
fn=handle_material_upload,
inputs=[material_files],
outputs=[material_status]
)
# 右側:錄音檔上傳與轉錄區域
with gr.Column(scale=1):
gr.Markdown("## 🎤 錄音檔轉錄區")
audio_file = gr.File(
label="上傳錄音檔案",
file_types=[".wav", ".mp3"],
height=100
)
language = gr.Dropdown(
label="選擇轉錄語言",
choices=[
("中文", "zh"), ("英文", "en"), ("日文", "ja"), ("韓文", "ko"),
("法文", "fr"), ("德文", "de"), ("西班牙文", "es")
],
value="zh",
info="選擇錄音檔案的主要語言"
)
transcribe_btn = gr.Button("🔄 開始轉錄", variant="primary", size="lg")
transcription_status = gr.Textbox(
label="轉錄狀態",
value="請上傳錄音檔案並選擇語言後點擊轉錄按鈕",
interactive=False,
lines=2
)
transcript_output = gr.Textbox(
label="逐字稿內容",
placeholder="轉錄完成後,逐字稿內容將顯示在這裡...",
interactive=False,
lines=6,
max_lines=10
)
# 新增:逐字稿摘要欄位
transcript_summary = gr.Textbox(
label="📝 逐字稿摘要",
placeholder="轉錄完成後,AI 將自動生成逐字稿摘要...",
interactive=False,
lines=4,
max_lines=6
)
download_file = gr.File(label="下載逐字稿", visible=False)
def transcribe_and_show_download(audio_file, language):
status, content, file_path, summary = handle_audio_transcription(audio_file, language)
if file_path:
return status, content, content, summary, gr.update(visible=True, value=file_path)
else:
return status, content, "", "", gr.update(visible=False, value=None)
transcribe_btn.click(
fn=transcribe_and_show_download,
inputs=[audio_file, language],
outputs=[transcription_status, transcript_output, transcript_state, transcript_summary, download_file]
)
# RAG 資料前處理區域
with gr.Row():
with gr.Column():
gr.Markdown("## 🔄 RAG 資料前處理")
gr.Markdown("將上傳的教材和轉錄的逐字稿進行前處理,準備用於 AI ChatBot")
with gr.Row():
rag_process_btn = gr.Button(
"🚀 開始 RAG 前處理",
variant="secondary",
size="lg",
scale=2
)
rag_status = gr.Textbox(
label="RAG 處理狀態",
value="準備就緒,點擊按鈕開始處理教材和逐字稿",
interactive=False,
lines=6
)
# 新增:教材概述欄位
material_overview = gr.Textbox(
label="📖 教材概述",
placeholder="RAG 前處理完成後,AI 將自動生成教材概述...",
interactive=False,
lines=6,
max_lines=8
)
rag_download = gr.File(label="下載處理結果 (JSON)", visible=False)
def process_and_show_result(material_files, transcript_content):
status, file_path, material_summary, transcript_summary = handle_rag_processing(material_files, transcript_content)
if file_path:
return status, material_summary, gr.update(visible=True, value=file_path)
else:
return status, "", gr.update(visible=False, value=None)
rag_process_btn.click(
fn=process_and_show_result,
inputs=[material_files, transcript_state],
outputs=[rag_status, material_overview, rag_download]
)
with gr.Row():
gr.Markdown("""
### 📋 使用說明
- **教材檔案**:支援 PDF、DOCX、PPT、PPTX 格式
- **錄音檔案**:支援 WAV、MP3 格式,檔案大小限制 25MB
- **轉錄語言**:支援中文、英文、日文、韓文、法文、德文、西班牙文
- **智能摘要**:AI 會自動生成逐字稿摘要和教材概述
- **RAG 前處理**:將教材和逐字稿分塊處理並生成嵌入向量
- **完成處理後**:前往 **AI ChatBot** 頁面進行智能問答
""")
return data_demo
# 建立增強版 ChatBot 頁面(保持不變)
def create_enhanced_chatbot_interface():
with gr.Blocks(theme=gr.themes.Soft()) as chatbot_demo:
gr.Markdown("# 🤖 AI ChatBot")
gr.Markdown("整合 RAG 資料和網路搜尋的智能學習助理,具備連貫對話能力")
# 檢查資料狀態
def check_data_status():
global processed_data_store
if processed_data_store is None:
return "❌ 尚未處理任何資料,請先前往「資料處理」頁面上傳並處理教材或錄音檔"
else:
total_chunks = len(processed_data_store["chunks"])
materials_count = len(processed_data_store["materials"])
has_transcript = processed_data_store["transcript"] is not None
return f"✅ 資料已載入!共 {total_chunks} 個文字塊({materials_count} 個教材檔案,{'含' if has_transcript else '不含'}逐字稿)"
# 介面元件
data_status = gr.Textbox(
label="資料狀態",
value=check_data_status(),
interactive=False,
lines=2
)
refresh_btn = gr.Button("🔄 重新檢查資料狀態", variant="secondary")
refresh_btn.click(fn=check_data_status, outputs=[data_status])
# 網路搜尋開關
with gr.Row():
web_search_toggle = gr.Checkbox(
label="🌐 啟用聯網搜尋",
value=False,
info="啟用後會搜尋網路資料並整合到回答中(狀態變化會影響重複問題的回答)"
)
# ChatBot 介面
chatbot = gr.Chatbot(
label="增強版 AI 學習助理",
height=500,
placeholder="請輸入您的問題,我會根據教材、逐字稿和網路搜尋(如啟用)來回答..."
)
msg = gr.Textbox(
label="輸入問題",
placeholder="例如:這份教材的主要重點是什麼?或者:最新的相關發展有哪些?",
lines=2
)
with gr.Row():
send_btn = gr.Button("💬 發送", variant="primary", scale=2)
clear_btn = gr.Button("🗑️ 清除對話", variant="secondary", scale=1)
# 事件處理函數
def user_message(message, history):
if not message.strip():
return "", history
return "", history + [[message, None]]
def bot_message(history, web_search_enabled):
if history and history[-1][1] is None:
user_msg = history[-1][0]
conversation_history = history[:-1]
bot_response = enhanced_chatbot_with_web_search(
user_msg,
conversation_history,
enable_web_search=web_search_enabled
)
history[-1][1] = bot_response
return history
# 綁定事件
msg.submit(
user_message,
[msg, chatbot],
[msg, chatbot],
queue=False
).then(
bot_message,
[chatbot, web_search_toggle],
chatbot
)
send_btn.click(
user_message,
[msg, chatbot],
[msg, chatbot],
queue=False
).then(
bot_message,
[chatbot, web_search_toggle],
chatbot
)
clear_btn.click(lambda: [], None, chatbot, queue=False)
# 使用說明
with gr.Row():
gr.Markdown("""
### 💡 功能特色
- **多重資料來源**:整合您的教材、逐字稿和網路搜尋結果
- **智能領域分析**:自動識別教材領域,進行相關搜尋
- **正確註腳標註**:清楚標明每個回答的資料來源和網址
- **連貫對話記憶**:記住對話歷史,支援連續問答
- **動態搜尋切換**:可隨時切換網路搜尋,重複問題會給出不同回答
- **教材優先策略**:以教材內容為核心,網路資料作補充
### 📝 使用建議
- **基礎問題**:關於教材內容的問題,可不啟用網路搜尋
- **延伸問題**:需要最新資訊或更多例子時,建議啟用網路搜尋
- **連續對話**:可以說「剛才你提到...」來參考之前的回答
- **搜尋切換**:同一問題可切換搜尋狀態獲得不同深度的回答
### 🔍 搜尋策略
- 🏠 **僅教材資料**:快速回答,基於您的專屬內容
- 🌐 **教材 + 網路搜尋**:全面回答,整合多重資料來源
- 📚 **智能註腳**:每個回答都會標明具體的資料來源
- 🎯 **領域相關**:自動過濾無關領域的搜尋結果
- 🔄 **狀態感知**:系統會檢測搜尋設定變化並調整回答
""")
return chatbot_demo
# 新增播客生成相關函數
def generate_podcast_script(materials, transcript):
"""生成播客文稿"""
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# 合併教材和逐字稿內容
combined_content = ""
material_titles = []
if materials:
for material in materials:
material_titles.append(material['filename'])
combined_content += f"\n教材檔案:{material['filename']}\n{material['content']}\n"
if transcript:
combined_content += f"\n錄音逐字稿:\n{transcript}\n"
# 限制輸入長度避免超過 token 限制
content_for_prompt = combined_content[:6000]
prompt = f"""請根據以下教材和逐字稿內容,撰寫一篇約1500字的播客節目文稿,目標播放時間約10分鐘。
教材檔案:{', '.join(material_titles) if material_titles else '無'}
是否包含逐字稿:{'是' if transcript else '否'}
內容:
{content_for_prompt}
請按照以下格式撰寫播客文稿:
1. **開場白**:簡潔有趣的開場,介紹本期主題
2. **主要內容**:
- 將教材內容轉化為口語化、易懂的說明
- 適當加入提問和思考點
- 使用生活化的比喻和例子
- 保持邏輯清晰的結構
3. **重點總結**:歸納核心概念和要點
4. **結尾**:簡短的總結和下期預告
要求:
- 使用繁體中文
- 語調親切自然,適合口語播報
- 內容深入淺出,適合學習者理解
- 約1500字,播放時間約10分鐘
- 適當加入停頓提示(用...表示)
請開始撰寫播客文稿:"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2000
)
script = response.choices[0].message.content
return script
except Exception as e:
return f"Podcast文稿生成失敗:{str(e)}"
def generate_podcast_audio(script_text):
"""將文稿轉換為音頻"""
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# 使用 OpenAI TTS API
response = client.audio.speech.create(
model="gpt-4o-mini-tts",
voice="alloy", # 可選:alloy, echo, fable, onyx, nova, shimmer
input=script_text,
speed=0.9 # 稍微放慢語速,適合學習
)
# 儲存音頻到臨時檔案
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
temp_audio.write(response.content)
temp_audio_path = temp_audio.name
return temp_audio_path
except Exception as e:
print(f"音頻生成失敗:{str(e)}")
return None
def estimate_reading_time(text):
"""估算播放時間"""
# 中文平均每分鐘約150-200字
word_count = len(text)
estimated_minutes = word_count / 175 # 使用中間值
return word_count, estimated_minutes
# 建立播客生成頁面
def create_podcast_page():
with gr.Blocks(theme=gr.themes.Soft()) as podcast_demo:
gr.Markdown("# 🎙️ AI Podcast生成器")
gr.Markdown("根據您上傳的教材和錄音逐字稿生成約10分鐘的Podcast,包含文稿和音頻")
# 檢查資料狀態
def check_podcast_data_status():
global processed_data_store
if processed_data_store is None:
return "❌ 尚未處理任何資料,請先前往「資料處理」頁面完成前處理"
else:
materials_count = len(processed_data_store["materials"])
has_transcript = processed_data_store["transcript"] is not None
return f"✅ 資料已準備!{materials_count} 個教材檔案,{'含' if has_transcript else '不含'}逐字稿"
# 資料狀態顯示
data_status = gr.Textbox(
label="資料狀態",
value=check_podcast_data_status(),
interactive=False,
lines=2
)
refresh_data_btn = gr.Button("🔄 重新檢查資料狀態", variant="secondary")
refresh_data_btn.click(fn=check_podcast_data_status, outputs=[data_status])
with gr.Row():
# 左側:文稿生成
with gr.Column(scale=1):
gr.Markdown("## 📝 Podcast文稿生成")
generate_script_btn = gr.Button(
"🚀 生成文稿",
variant="primary",
size="lg"
)
script_status = gr.Textbox(
label="生成狀態",
value="點擊按鈕開始生成文稿",
interactive=False,
lines=2
)
podcast_script = gr.Textbox(
label="文稿",
placeholder="文稿將在這裡顯示...",
interactive=True, # 允許用戶編輯
lines=15,
max_lines=20
)
script_info = gr.Textbox(
label="文稿資訊",
interactive=False,
lines=2
)
download_script = gr.File(label="下載文稿 (TXT)", visible=False)
# 右側:音頻生成
with gr.Column(scale=1):
gr.Markdown("## 🎵 音頻生成")
voice_selection = gr.Dropdown(
label="選擇語音",
choices=[
("Alloy - 中性聲音", "alloy"),
("Echo - 男性聲音", "echo"),
("Fable - 英式男性", "fable"),
("Onyx - 深沉男性", "onyx"),
("Nova - 年輕女性", "nova"),
("Shimmer - 溫和女性", "shimmer")
],
value="alloy",
info="選擇適合的聲音"
)
speed_control = gr.Slider(
label="播放速度",
minimum=0.5,
maximum=2.0,
value=0.9,
step=0.1,
info="調整播放速度(0.9倍適合學習)"
)
generate_audio_btn = gr.Button(
"🎤 生成音頻",
variant="secondary",
size="lg"
)
audio_status = gr.Textbox(
label="音頻生成狀態",
value="請先生成文稿,然後點擊生成音頻",
interactive=False,
lines=2
)
podcast_audio = gr.Audio(
label="播客音頻",
visible=False
)
download_audio = gr.File(label="下載音頻 (MP3)", visible=False)
# 事件處理函數
def generate_script():
global processed_data_store
if processed_data_store is None:
return "❌ 請先完成資料前處理", "", "", gr.update(visible=False)
try:
materials = processed_data_store.get("materials", [])
transcript_content = ""
if processed_data_store.get("transcript"):
transcript_content = processed_data_store["transcript"]["content"]
if not materials and not transcript_content:
return "❌ 沒有可用的教材或逐字稿內容", "", "", gr.update(visible=False)
# 生成文稿
script = generate_podcast_script(materials, transcript_content)
if script.startswith("播客文稿生成失敗"):
return script, "", "", gr.update(visible=False)
# 計算文稿資訊
word_count, estimated_time = estimate_reading_time(script)
info_text = f"字數:{word_count} 字 | 預估播放時間:{estimated_time:.1f} 分鐘"
# 儲存文稿到檔案
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".txt", encoding='utf-8') as temp_file:
temp_file.write(script)
temp_file_path = temp_file.name
return "✅ 播客文稿生成完成!", script, info_text, gr.update(visible=True, value=temp_file_path)
except Exception as e:
return f"❌ 生成失敗:{str(e)}", "", "", gr.update(visible=False)
def generate_audio(script, voice, speed):
if not script or not script.strip():
return "❌ 請先生成播客文稿", gr.update(visible=False), gr.update(visible=False)
try:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# 生成音頻
response = client.audio.speech.create(
model="gpt-4o-mini-tts",
voice=voice,
input=script,
speed=speed
)
# 儲存音頻
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
temp_audio.write(response.content)
temp_audio_path = temp_audio.name
return "✅ 音頻生成完成!可以播放和下載", gr.update(visible=True, value=temp_audio_path), gr.update(visible=True, value=temp_audio_path)
except Exception as e:
return f"❌ 音頻生成失敗:{str(e)}", gr.update(visible=False), gr.update(visible=False)
# 綁定事件
generate_script_btn.click(
fn=generate_script,
outputs=[script_status, podcast_script, script_info, download_script]
)
generate_audio_btn.click(
fn=generate_audio,
inputs=[podcast_script, voice_selection, speed_control],
outputs=[audio_status, podcast_audio, download_audio]
)
# 使用說明
with gr.Row():
gr.Markdown("""
### 📋 使用說明
- **資料準備**:確保已在「資料處理」頁面完成教材和逐字稿的前處理
- **文稿生成**:AI 會根據您的教材內容生成約1500字的播客文稿
- **文稿編輯**:生成後可以直接在文稿框中編輯內容
- **音頻生成**:選擇合適的聲音和速度,將文稿轉換為音頻
- **下載功能**:可分別下載文稿(TXT)和音頻(MP3)檔案
### 🎯 播客特色
- **口語化表達**:適合播客的自然語調和節奏
- **結構清晰**:包含開場、主要內容、總結和結尾
- **學習導向**:深入淺出,適合教育用途
- **時長適中**:約10分鐘,適合碎片化學習
""")
return podcast_demo
# 修改主應用建立函數,新增播客頁面
def create_complete_app():
# 創建三個介面
data_interface = create_data_processing_interface()
enhanced_chatbot_interface = create_enhanced_chatbot_interface()
podcast_interface = create_podcast_page()
# 使用 TabbedInterface 組合
demo = gr.TabbedInterface(
[data_interface, enhanced_chatbot_interface, podcast_interface],
["📚 資料處理", "🤖 AI ChatBot", "🎙️ AI Podcast生成器"],
title="🎓 AI Learning Hub"
)
return demo
# 啟動應用程式
if __name__ == "__main__":
app = create_complete_app()
app.launch(share=True, debug=True)