# resendLangChain / app.py
# Uploaded by z9760405 ("Update app.py", revision 1b52a16, verified)
from dotenv import load_dotenv
import os
import gradio as gr
from PyPDF2 import PdfReader
from langchain.text_splitter import CharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_community.vectorstores import FAISS
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
import shutil
import tempfile
from docx import Document
from docx.shared import Inches
from datetime import datetime
import resend
# Load environment variables (including any local .env file)
load_dotenv()

# SPACE_ID is used as the marker for a Hugging Face Spaces environment
IS_SPACES = os.getenv("SPACE_ID") is not None

# Credentials are read from the environment only. API keys must never be
# committed as in-code fallbacks; any key that was previously hard-coded in
# this file should be treated as leaked and revoked/rotated.
gemini_api_key = os.getenv("GOOGLE_API_KEY")
if not gemini_api_key:
    # Fail fast with a clear message instead of an obscure client error later.
    raise RuntimeError(
        "GOOGLE_API_KEY is not set. Define it in the environment or in a .env file."
    )
os.environ["GOOGLE_API_KEY"] = gemini_api_key

# May be None when unset; send_chat_history_email() checks for a missing key
# and reports it to the user instead of attempting to send.
resend.api_key = os.getenv("RESEND_API_KEY")
# PDF question-answering bot: extracts text from PDFs, indexes it in a FAISS
# vector store using Gemini embeddings, answers questions over that index,
# and exports the chat history as a .docx report or an HTML e-mail.
class PDFChatBot:
def __init__(self):
    """Initialise empty state, the embedding client and the index location.

    On Hugging Face Spaces the FAISS index is kept in a fresh temporary
    directory; otherwise the fixed relative directory ``faiss_index`` is used.
    """
    self.vector_store = None
    self.embeddings = GoogleGenerativeAIEmbeddings(
        model="models/text-embedding-004", google_api_key=gemini_api_key
    )
    self.processed_files = []
    self.chat_history = []  # chat history as a list of message dicts

    # Choose where the vector store is persisted, depending on the environment
    if not IS_SPACES:
        # Local run: fixed path
        self.vector_store_path = "faiss_index"
        print(f"💻 本地環境檢測,使用目錄: {self.vector_store_path}")
    else:
        # Hugging Face Spaces: temporary directory
        self.vector_store_path = tempfile.mkdtemp(prefix="faiss_index_")
        print(f"🌐 Hugging Face Spaces 環境檢測,使用臨時目錄: {self.vector_store_path}")
def get_pdf_text(self, pdf_files):
    """Extract the text of one or more PDF files.

    Args:
        pdf_files: A path, an uploaded-file object exposing ``.name``, or a
            list of either.

    Returns:
        A tuple ``(text, processed_count, error_messages)``: the concatenated
        text of every file that yielded text, how many files did so, and the
        list of problems encountered. The base name of each successfully
        read file is also appended to ``self.processed_files``.
    """
    failures = []
    if not pdf_files:
        return "", 0, ["沒有提供PDF文件"]

    # Accept a single file as well as a list of files
    sources = pdf_files if isinstance(pdf_files, list) else [pdf_files]

    collected = []
    ok_count = 0
    for pdf_file in sources:
        try:
            # Uploaded file objects carry their path in ``name``
            path = getattr(pdf_file, "name", pdf_file)
            print(f"正在處理文件: {path}")

            if not os.path.exists(path):
                failures.append(f"文件不存在: {path}")
                continue

            size = os.path.getsize(path)
            if size == 0:
                failures.append(f"文件為空: {path}")
                continue
            print(f"文件大小: {size} bytes")

            reader = PdfReader(path)
            page_total = len(reader.pages)
            if page_total == 0:
                failures.append(f"PDF文件沒有頁面: {path}")
                continue
            print(f"PDF頁數: {page_total}")

            page_texts = []
            for page_no, page in enumerate(reader.pages, start=1):
                try:
                    text = page.extract_text()
                    if text:
                        page_texts.append(text + "\n")
                    print(f"頁面 {page_no} 提取的文字長度: {len(text) if text else 0}")
                except Exception as e:
                    # A broken page is reported but does not abort the file
                    print(f"提取頁面 {page_no} 時發生錯誤: {e!s}")
                    failures.append(f"提取頁面 {page_no} 失敗: {e!s}")

            file_text = "".join(page_texts)
            if not file_text.strip():
                failures.append(f"無法從PDF中提取文字: {path}")
                continue

            collected.append(file_text)
            ok_count += 1
            self.processed_files.append(os.path.basename(path))
            print(f"成功處理文件: {path}, 提取文字長度: {len(file_text)}")
        except Exception as e:
            message = f"讀取PDF時發生錯誤 ({path}): {e!s}"
            print(message)
            failures.append(message)

    return "".join(collected), ok_count, failures
def get_text_chunks(self, text):
    """Split text into overlapping chunks for embedding.

    Uses newline-separated chunks of up to 10000 characters with an overlap
    of 1000. Returns an empty list when there is no text or splitting fails.
    """
    try:
        if text and text.strip():
            splitter = CharacterTextSplitter(
                separator="\n",
                chunk_size=10000,
                chunk_overlap=1000,
                length_function=len,
            )
            pieces = splitter.split_text(text)
            print(f"成功分割文字為 {len(pieces)} 個區塊")
            return pieces
        print("警告: 沒有文字可以分割")
        return []
    except Exception as e:
        print(f"分割文字時發生錯誤: {e!s}")
        return []
def create_vector_store(self, chunks):
    """Build the FAISS vector store from text chunks and try to persist it.

    Returns:
        True when the in-memory store was created (even if saving it to
        ``self.vector_store_path`` failed), False otherwise.
    """
    try:
        if not chunks:
            print("錯誤: 沒有文字區塊可以創建向量存儲")
            return False
        print(f"正在創建向量存儲,共 {len(chunks)} 個文字區塊...")

        # Probe the embedding backend first so a failure is reported clearly
        try:
            probe = self.embeddings.embed_query("測試文字")
        except Exception as e:
            print(f"Embedding測試失敗: {e!s}")
            return False
        else:
            print(f"Embedding測試成功,向量維度: {len(probe)}")

        self.vector_store = FAISS.from_texts(chunks, self.embeddings)

        # Persisting is best-effort: the in-memory store stays usable
        try:
            target_dir = self.vector_store_path
            os.makedirs(target_dir, exist_ok=True)
            self.vector_store.save_local(target_dir)
            print(f"向量存儲創建成功,保存至: {target_dir}")
        except Exception as e:
            print(f"保存向量存儲時發生錯誤,但向量存儲已在內存中創建: {e!s}")
        return True
    except Exception as e:
        print(f"創建向量存儲時發生錯誤:{e!s}")
        return False
def load_vector_store(self):
    """Load a previously saved FAISS index from ``self.vector_store_path``.

    Returns:
        True when an index was loaded into ``self.vector_store``; False when
        the directory is missing or empty, or loading raised an error.
    """
    try:
        index_dir = self.vector_store_path
        if not (os.path.exists(index_dir) and os.listdir(index_dir)):
            print(f"沒有找到現有的向量存儲在: {index_dir}")
            if IS_SPACES:
                print("ℹ️ 在 Hugging Face Spaces 環境中,這是正常的,因為每次重啟都會清空存儲")
            return False

        # NOTE(review): allow_dangerous_deserialization=True presumably lets
        # the loader unpickle the stored index; that is only safe if this
        # directory contains files written by this app - confirm.
        self.vector_store = FAISS.load_local(
            index_dir,
            embeddings=self.embeddings,
            allow_dangerous_deserialization=True,
        )
        print(f"成功載入現有的向量存儲從: {index_dir}")
        return True
    except Exception as e:
        print(f"載入向量存儲時發生錯誤:{e!s}")
        return False
def get_conversational_chain(self):
    """Create the question-answering chain used by ``answer_question``.

    The chain is a "stuff" QA chain driven by a prompt taking ``context`` and
    ``question``. The ``gemini-2.0-flash-exp`` model is tried first; if
    constructing it raises, ``gemini-pro`` is used instead.
    """
    prompt_template = """
根據提供的內容盡可能詳細地回答問題。確保提供所有細節。
如果你需要更多細節來完美回答問題,那麼請詢問你認為需要了解的更多細節。
如果答案不在提供的內容中,只需說"在您提供的內容中找不到答案"。不要提供錯誤的答案。
內容:\n {context}\n
問題: \n{question}\n
回答:
"""
    # Settings for the preferred (Flash 2.0) model
    primary_options = dict(
        model="gemini-2.0-flash-exp",
        google_api_key=gemini_api_key,
        temperature=0.3,
        max_tokens=8192,
        top_p=0.8,
        top_k=40,
    )
    try:
        llm = ChatGoogleGenerativeAI(**primary_options)
    except Exception as e:
        print(f"創建模型時發生錯誤,嘗試使用備用模型: {e!s}")
        # Fall back to another available model
        llm = ChatGoogleGenerativeAI(
            model="gemini-pro", google_api_key=gemini_api_key, temperature=0.3
        )

    qa_prompt = PromptTemplate(
        template=prompt_template, input_variables=["context", "question"]
    )
    return load_qa_chain(llm, chain_type="stuff", prompt=qa_prompt)
def answer_question(self, question):
    """Answer a user question from the indexed PDF content.

    Args:
        question: The question text. ``None``, empty or whitespace-only
            input is rejected with a prompt to enter a question.

    Returns:
        The answer text, or a user-facing message when no documents have
        been processed, the question is blank, nothing relevant was found,
        or an error occurred while answering.
    """
    if not self.vector_store:
        return "請先上傳並處理PDF文件!"
    # Guard against None as well as blank text: this check runs outside the
    # try block, so calling .strip() on None would otherwise raise.
    if not question or not question.strip():
        return "請輸入您的問題。"
    try:
        # Retrieve the most relevant chunks
        docs = self.vector_store.similarity_search(question, k=6)
        if not docs:
            return "在上傳的文檔中找不到相關信息。"
        # Generate the answer; invoke() replaces the deprecated __call__
        chain = self.get_conversational_chain()
        response = chain.invoke(
            {
                "input_documents": docs,
                "question": question,
            }
        )
        return response["output_text"]
    except Exception as e:
        return f"處理問題時發生錯誤:{str(e)}"
def process_pdfs(self, pdf_files, progress=gr.Progress()):
    """Run the full ingestion pipeline on uploaded PDF files.

    Extracts text, splits it into chunks and builds the vector store,
    reporting progress through ``progress``.

    Returns:
        A tuple ``(status_message, processed_file_list)``; the second item is
        an empty string whenever processing did not succeed.
    """
    if not pdf_files:
        return "請上傳至少一個PDF文件。", ""

    self.processed_files = []
    progress(0, desc="開始處理PDF文件...")

    # Step 1: text extraction
    progress(0.2, desc="提取PDF文字內容...")
    raw_text, processed_count, error_messages = self.get_pdf_text(pdf_files)
    joined_errors = "\n".join(error_messages)

    if not raw_text.strip():
        # Nothing usable: prefer the detailed errors when there are any
        if error_messages:
            return f"❌ 處理失敗!\n\n錯誤詳情:\n{joined_errors}", ""
        return (
            "❌ 無法從PDF文件中提取到文字。請確認PDF文件包含可提取的文字內容。",
            "",
        )

    # Step 2: chunking
    progress(0.4, desc="分割文字內容...")
    text_chunks = self.get_text_chunks(raw_text)
    if not text_chunks:
        return "❌ 文字分割失敗,請重試。", ""

    # Step 3: vector store
    progress(0.6, desc="創建向量存儲...")
    success = self.create_vector_store(text_chunks)
    progress(1.0, desc="處理完成!")

    if not success:
        details = joined_errors if error_messages else "未知錯誤"
        return f"❌ PDF處理失敗!\n\n錯誤詳情:\n{details}", ""

    file_list = "已處理的文件:\n" + "\n".join(
        f"• {file}" for file in self.processed_files
    )
    message_parts = [
        f"✅ 成功處理 {processed_count} 個PDF文件!\n總共 {len(text_chunks)} 個文字區塊\n總文字長度: {len(raw_text)} 字符\n現在您可以開始提問。"
    ]
    # Partial failures are still reported alongside the success message
    if error_messages:
        message_parts.append(f"\n\n⚠️ 部分文件處理時出現問題:\n{joined_errors}")
    if IS_SPACES:
        message_parts.append(
            "\n\nℹ️ 當前運行在 Hugging Face Spaces 環境,向量存儲僅在當前會話中有效。"
        )
    return "".join(message_parts), file_list
def clear_data(self):
    """Delete the stored index and reset all in-memory state.

    Returns:
        A tuple ``(status_message, "")``.
    """
    try:
        store_dir = self.vector_store_path
        if os.path.exists(store_dir):
            shutil.rmtree(store_dir)
            print(f"已刪除向量存儲目錄: {store_dir}")
        # On Spaces a new temporary directory replaces the removed one
        if IS_SPACES:
            self.vector_store_path = tempfile.mkdtemp(prefix="faiss_index_")
        self.vector_store = None
        self.processed_files = []
        self.chat_history = []
    except Exception as e:
        return f"❌ 清除資料時發生錯誤:{e!s}", ""
    return "✅ 已清除所有處理過的資料!", ""
def create_docx_report(self, chat_history):
    """Write the chat history to a temporary .docx report.

    Args:
        chat_history: List of message dicts read pairwise as
            (question, answer); a trailing unpaired message is ignored.

    Returns:
        The path of the generated file, or None if anything failed.
    """
    try:
        doc = Document()

        # Centred title
        heading = doc.add_heading("PDF聊天機器人 - 問答記錄", 0)
        heading.alignment = 1

        # Generation time
        generated_at = datetime.now().strftime("%Y年%m月%d日 %H:%M:%S")
        doc.add_paragraph(f"生成時間:{generated_at}")

        # Processed files
        if self.processed_files:
            doc.add_heading("已處理的PDF文件:", level=2)
            for position, file in enumerate(self.processed_files, 1):
                doc.add_paragraph(f"{position}. {file}", style="List Number")
            doc.add_paragraph("")  # blank line

        # Question / answer pairs
        doc.add_heading("問答記錄:", level=2)
        if not chat_history:
            doc.add_paragraph("目前沒有問答記錄。")
        else:
            pairs = zip(chat_history[0::2], chat_history[1::2])
            for number, (user_msg, bot_msg) in enumerate(pairs, start=1):
                question_par = doc.add_paragraph()
                question_label = question_par.add_run(f"問題 {number}:")
                question_label.bold = True
                question_label.font.size = Inches(0.14)
                question_par.add_run(user_msg["content"])

                answer_par = doc.add_paragraph()
                answer_label = answer_par.add_run("回答:")
                answer_label.bold = True
                answer_label.font.size = Inches(0.14)
                answer_par.add_run(bot_msg["content"])

                # Separator line
                doc.add_paragraph("─" * 50)

        # Save into a temporary file that outlives this call
        with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as temp_file:
            doc.save(temp_file.name)
        return temp_file.name
    except Exception as e:
        print(f"創建docx文件時發生錯誤:{e!s}")
        return None
def generate_email_html(self, chat_history):
    """Render the chat history as a standalone HTML document for e-mail.

    File names, questions and answers come from uploads, user input and
    model output, so each is HTML-escaped before being placed in the page.

    Args:
        chat_history: List of message dicts read pairwise as
            (question, answer); a trailing unpaired message is ignored.

    Returns:
        The complete HTML document as a string.
    """
    import html  # function-scope import keeps this block self-contained

    def esc(value):
        # Escape untrusted text so it is shown literally, never as markup
        return html.escape(str(value))

    timestamp = datetime.now().strftime("%Y年%m月%d日 %H:%M:%S")
    html_content = f"""<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>PDF聊天機器人 - 問答記錄</title>
<style>
body {{
font-family: 'Microsoft JhengHei', '微軟正黑體', Arial, sans-serif;
line-height: 1.6;
color: #333;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background-color: #f9f9f9;
}}
.container {{
background-color: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}}
.header {{
text-align: center;
border-bottom: 3px solid #007bff;
padding-bottom: 20px;
margin-bottom: 30px;
}}
.header h1 {{
color: #007bff;
margin-bottom: 10px;
}}
.timestamp {{
color: #666;
font-size: 14px;
}}
.files-section {{
background-color: #f8f9fa;
padding: 15px;
border-radius: 5px;
margin-bottom: 30px;
}}
.files-section h3 {{
color: #495057;
margin-top: 0;
}}
.file-list {{
list-style-type: none;
padding: 0;
}}
.file-list li {{
background-color: white;
padding: 8px 12px;
margin: 5px 0;
border-radius: 3px;
border-left: 4px solid #007bff;
}}
.qa-section {{
margin-bottom: 30px;
}}
.qa-item {{
background-color: #fff;
border: 1px solid #e9ecef;
border-radius: 8px;
margin-bottom: 20px;
overflow: hidden;
}}
.question {{
background-color: #007bff;
color: white;
padding: 15px;
margin: 0;
}}
.question-label {{
font-weight: bold;
font-size: 16px;
}}
.answer {{
padding: 20px;
background-color: #f8f9fa;
margin: 0;
}}
.answer-label {{
font-weight: bold;
color: #28a745;
margin-bottom: 10px;
display: block;
}}
.answer-content {{
white-space: pre-wrap;
line-height: 1.8;
}}
.footer {{
text-align: center;
margin-top: 30px;
padding-top: 20px;
border-top: 1px solid #dee2e6;
color: #666;
font-size: 12px;
}}
.no-chat {{
text-align: center;
color: #6c757d;
font-style: italic;
padding: 40px;
background-color: #f8f9fa;
border-radius: 8px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🤖 PDF聊天機器人 - 問答記錄</h1>
<div class="timestamp">生成時間:{timestamp}</div>
</div>"""
    # List of processed files
    if self.processed_files:
        html_content += """
<div class="files-section">
<h3>📁 已處理的PDF文件</h3>
<ul class="file-list">"""
        for file in self.processed_files:
            html_content += f"<li>{esc(file)}</li>"
        html_content += """
</ul>
</div>"""
    # Question / answer records
    html_content += '<div class="qa-section">'
    if not chat_history:
        html_content += '<div class="no-chat">目前沒有問答記錄。</div>'
    else:
        pairs = zip(chat_history[0::2], chat_history[1::2])
        for number, (user_msg, bot_msg) in enumerate(pairs, start=1):
            question = esc(user_msg["content"])
            answer = esc(bot_msg["content"])
            html_content += f"""
<div class="qa-item">
<div class="question">
<div class="question-label">問題 {number}:</div>
{question}
</div>
<div class="answer">
<span class="answer-label">💡 回答:</span>
<div class="answer-content">{answer}</div>
</div>
</div>"""
    html_content += """
</div>
<div class="footer">
此郵件由 PDF聊天機器人 自動生成<br>
如有任何問題,請聯繫系統管理員
</div>
</div>
</body>
</html>"""
    return html_content
def send_chat_history_email(self, recipient_email, subject="PDF聊天機器人 - 問答記錄"):
    """Send the stored chat history to ``recipient_email`` through Resend.

    Args:
        recipient_email: Destination address; must not be empty or blank.
        subject: Subject line of the e-mail.

    Returns:
        A user-facing status message describing success or the failure.
    """
    try:
        if not self.chat_history:
            return "❌ 沒有聊天記錄可以發送!"
        if not (recipient_email and recipient_email.strip()):
            return "❌ 請輸入有效的電子郵件地址!"

        print(f"準備發送郵件至: {recipient_email}")
        print(f"郵件主題: {subject}")

        # Build the HTML body
        try:
            html_content = self.generate_email_html(self.chat_history)
            print(f"HTML內容生成成功,長度: {len(html_content)}")
        except Exception as e:
            print(f"生成HTML內容時發生錯誤: {e!s}")
            return f"❌ 生成郵件內容時發生錯誤:{e!s}"

        # A Resend API key is required to send anything
        if not resend.api_key:
            return "❌ Resend API金鑰未設置!"

        # Send the message
        try:
            payload = {
                "from": "PDF聊天機器人 <onboarding@resend.dev>",
                "to": [recipient_email.strip()],
                "subject": subject,
                "html": html_content,
            }
            print("正在發送郵件...")
            result = resend.Emails.send(payload)
            print(f"郵件發送結果: {result}")
            if not (result and result.get("id")):
                return f"❌ 郵件發送失敗,回應: {result}"
            return f"✅ 郵件已成功發送至 {recipient_email}!\n郵件ID: {result.get('id')}"
        except Exception as e:
            print(f"發送郵件時發生錯誤: {e!s}")
            print(f"錯誤類型: {type(e).__name__}")
            return f"❌ 發送郵件時發生錯誤:{e!s}"
    except Exception as e:
        # Last-resort handler: log the traceback and report the failure
        print(f"郵件功能發生未預期錯誤: {e!s}")
        print(f"錯誤類型: {type(e).__name__}")
        import traceback

        traceback.print_exc()
        return f"❌ 郵件功能發生未預期錯誤:{e!s}"
# Initialise the chatbot: a single module-level instance used by the Gradio callbacks
bot = PDFChatBot()
# Gradio callback functions
def upload_and_process(files, progress=gr.Progress()):
    """Process the uploaded PDF files with the shared bot.

    Returns the ``(status_message, processed_file_list)`` pair produced by
    ``bot.process_pdfs``.
    """
    outcome = bot.process_pdfs(files, progress)
    return outcome
def ask_question(question, history):
    """Answer a question and append the exchange to the chat history.

    Args:
        question: The user's question; ``None`` or blank input is ignored.
        history: Current chatbot messages (list of role/content dicts), or
            ``None`` when the chat is still empty.

    Returns:
        A tuple ``(updated_history, "")``; the empty string clears the
        question input box.
    """
    # Work on a copy so the caller's list is never mutated, and treat a
    # missing history as an empty one.
    history = list(history) if history else []
    if not question or not question.strip():
        return history, ""

    response = bot.answer_question(question)

    # Messages use the role/content dict format
    history.append({"role": "user", "content": question})
    history.append({"role": "assistant", "content": response})

    # Keep the bot's own copy of the history in sync
    bot.chat_history = history.copy()
    return history, ""
def download_chat_history():
    """Export the chat history as a .docx file.

    Returns the path of the generated file, or None when there is no chat
    history (or the report could not be created).
    """
    history = bot.chat_history
    return bot.create_docx_report(history) if history else None
def send_email(recipient_email, email_subject):
    """E-mail the chat history, using the default subject when none is given.

    Returns the status message produced by ``bot.send_chat_history_email``.
    """
    subject = email_subject if email_subject.strip() else "PDF聊天機器人 - 問答記錄"
    return bot.send_chat_history_email(recipient_email, subject)
def clear_chat():
    """Drop the bot's chat history and reset the chat widgets.

    Returns an empty message list and an empty question string.
    """
    bot.chat_history = []
    cleared_messages, cleared_question = [], ""
    return cleared_messages, cleared_question
def clear_all_data():
    """Clear the stored index and all bot state via ``bot.clear_data``."""
    outcome = bot.clear_data()
    return outcome
def load_existing_data():
    """Try to load a previously saved vector store.

    Returns a ``(status_message, "")`` pair; the message depends on whether
    loading succeeded and on the runtime environment.
    """
    if bot.load_vector_store():
        return "✅ 成功載入已處理的資料!", ""
    if not IS_SPACES:
        return "❌ 沒有找到已處理的資料。", ""
    return "ℹ️ 在 Hugging Face Spaces 環境中,每次重啟都會清空存儲。請重新上傳PDF文件。", ""
# Build the Gradio interface: three tabs (file processing, Q&A chat, e-mail)
# followed by the event wiring that connects widgets to the callbacks above.
# NOTE(review): the indentation of this section appears to have been lost, so
# the nesting of the `with` layout blocks cannot be read from the text -
# restore it before running.
with gr.Blocks(title="PDF聊天機器人", theme=gr.themes.Soft()) as demo:
# Show which environment the app is running in
env_info = "🌐 Hugging Face Spaces" if IS_SPACES else "💻 本地環境"
gr.Markdown(
f"""
# 🤖 PDF聊天機器人 (Flash 2.0) - {env_info}
上傳您的PDF文件,然後就可以向文檔提問!支持多語言問答和郵件發送功能。
{"📝 **注意**: 在 Hugging Face Spaces 環境中,每次重啟都會清空存儲的向量數據,這是正常現象。" if IS_SPACES else ""}
"""
)
# Tab 1: upload and process PDF files
with gr.Tab("📁 文件處理"):
with gr.Row():
with gr.Column(scale=2):
file_upload = gr.File(
file_count="multiple",
file_types=[".pdf"],
label="上傳PDF文件",
height=200,
)
with gr.Row():
process_btn = gr.Button(
"🚀 處理PDF文件", variant="primary", size="lg"
)
if not IS_SPACES: # only show the load button outside Spaces
load_btn = gr.Button("📂 載入已處理資料", variant="secondary")
clear_btn = gr.Button("🗑️ 清除資料", variant="stop")
with gr.Column(scale=1):
status_text = gr.Textbox(label="處理狀態", lines=8, interactive=False)
file_list = gr.Textbox(label="已處理文件", lines=6, interactive=False)
# Tab 2: question-answering chat
with gr.Tab("💬 問答聊天"):
chatbot = gr.Chatbot(
label="聊天記錄", height=500, show_copy_button=True, type="messages"
)
with gr.Row():
question_input = gr.Textbox(
placeholder="請輸入您的問題...", label="問題", lines=2, scale=4
)
ask_btn = gr.Button("📤 提問", variant="primary", scale=1)
with gr.Row():
clear_chat_btn = gr.Button("🧹 清除聊天記錄", variant="secondary", scale=1)
download_btn = gr.Button("📥 下載問答記錄", variant="primary", scale=1)
# Hidden file component used to offer the download
download_file = gr.File(visible=False)
gr.Examples(
examples=[
"這份文檔的主要內容是什麼?",
"請總結文檔的重點。",
"文檔中提到了哪些重要概念?",
"能否詳細解釋某個特定主題?",
],
inputs=question_input,
label="問題範例",
)
# Tab 3: e-mail the chat history
with gr.Tab("📧 郵件發送"):
gr.Markdown("### 發送聊天記錄到您的郵箱")
with gr.Row():
with gr.Column(scale=2):
email_input = gr.Textbox(
label="收件人郵箱",
placeholder="請輸入接收郵件的電子郵箱地址...",
lines=1,
)
subject_input = gr.Textbox(
label="郵件主題", value="PDF聊天機器人 - 問答記錄", lines=1
)
send_email_btn = gr.Button("📨 發送郵件", variant="primary", size="lg")
with gr.Column(scale=1):
email_status = gr.Textbox(label="發送狀態", lines=4, interactive=False)
gr.Markdown(
"""
**注意事項:**
- 請確保已有聊天記錄才能發送郵件
- 郵件將包含完整的問答記錄和已處理的PDF文件列表
- 郵件格式為精美的HTML格式,便於閱讀
"""
)
# Download handler: reveals the file component with the generated report,
# or shows a warning and keeps it hidden when there is nothing to download
def handle_download():
file_path = download_chat_history()
if file_path:
return gr.update(value=file_path, visible=True)
else:
gr.Warning("沒有聊天記錄可以下載!")
return gr.update(visible=False)
# Event wiring
process_btn.click(
fn=upload_and_process,
inputs=[file_upload],
outputs=[status_text, file_list],
show_progress=True,
)
# The load button only exists (and is only wired) outside Spaces
if not IS_SPACES:
load_btn.click(fn=load_existing_data, outputs=[status_text, file_list])
clear_btn.click(fn=clear_all_data, outputs=[status_text, file_list])
ask_btn.click(
fn=ask_question,
inputs=[question_input, chatbot],
outputs=[chatbot, question_input],
)
question_input.submit(
fn=ask_question,
inputs=[question_input, chatbot],
outputs=[chatbot, question_input],
)
clear_chat_btn.click(fn=clear_chat, outputs=[chatbot, question_input])
download_btn.click(fn=handle_download, outputs=download_file)
send_email_btn.click(
fn=send_email, inputs=[email_input, subject_input], outputs=email_status
)
if __name__ == "__main__":
    running_locally = not IS_SPACES

    # Only a local run can have a previously saved vector store to reload
    if running_locally:
        bot.load_vector_store()

    # Start the app
    demo.launch(
        share=False,  # set to True to get a public link
        server_name="127.0.0.1" if running_locally else None,  # Spaces uses the default
        server_port=None,  # pick a free port automatically
        show_error=True,
        inbrowser=running_locally,  # do not open a browser on Spaces
    )