import os, re, base64 from langchain_core.documents import Document from langchain_chroma import Chroma from openai import OpenAI from langchain.embeddings.base import Embeddings from langchain_google_genai import ChatGoogleGenerativeAI from langchain_community.vectorstores import FAISS import gradio as gr from langchain.memory import ConversationBufferMemory # ============================================= # 1️⃣ 內建 Embedding:使用 Gemini embedding API # ============================================= from langchain_community.embeddings import HuggingFaceEmbeddings embedding = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5") # ============================================= # 2️⃣ 載入 QA 檔案並分類 # ============================================= BASE_DIR = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(BASE_DIR, "QA_v2.txt") if not os.path.exists(path): raise FileNotFoundError(f"❌ 找不到 QA 檔案:{path}") with open(path, "r", encoding="utf-8") as f: text = f.read() pattern = r"(Q[::].*?)(?=Q[::]|$)" qas = re.findall(pattern, text, flags=re.S) qa_docs = {"證券": [], "期貨": [], "複委託": []} for qa in qas: if "證券" in qa: qa_docs["證券"].append(Document(page_content=qa.strip(), metadata={"source": path})) elif "期貨" in qa: qa_docs["期貨"].append(Document(page_content=qa.strip(), metadata={"source": path})) elif "複委託" in qa: qa_docs["複委託"].append(Document(page_content=qa.strip(), metadata={"source": path})) print("✅ 已成功讀取 QA 並完成分類:", {k: len(v) for k, v in qa_docs.items()}) # ============================================= # 3️⃣ 建立向量資料庫(使用 FAISS,記憶體型) # ============================================= vectordbs = {} for k, docs in qa_docs.items(): vectordbs[k] = FAISS.from_documents(docs, embedding) # ============================================= # 4️⃣ 初始化 Gemini LLM # ============================================= API_KEY = os.getenv("GOOGLE_API_KEY") if not API_KEY: raise ValueError("⚠️ 未設定 GOOGLE_API_KEY,請在 Hugging Face Secrets 中新增。") llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash', google_api_key=API_KEY) memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) # ============================================= # 5️⃣ 對話邏輯 # ============================================= def auto_detect_category(text): if any(k in text for k in ["股票", "證券", "開戶", "下單", "交割", "現股"]): return "證券" elif any(k in text for k in ["期貨", "選擇權", "結算", "保證金", "契約"]): return "期貨" elif any(k in text for k in ["複委託", "海外", "美股", "港股", "國外"]): return "複委託" else: return "證券" def chat_fn(message, history): category = auto_detect_category(message) vectordb = vectordbs.get(category) if not vectordb: return "目前尚無此類別的知識庫。" docs = vectordb.similarity_search(message, k=2) context = "\n\n".join([d.page_content for d in docs]) if docs else "查無相關內容。" prompt = f""" 我是一位金融客服人員。根據以下公司規章內容回答使用者問題: --- {context} --- 使用者問題:{message} """ try: response = llm.invoke(prompt) reply = response.content.strip() except Exception as e: reply = f"⚠️ 生成錯誤:{e}" return reply or "請洽營業員" # ============================================= # 6️⃣ Gradio 介面 # ============================================= logo_path = os.path.join(BASE_DIR, "mega.png") logo_base64 = "" if os.path.exists(logo_path): with open(logo_path, "rb") as f: logo_base64 = base64.b64encode(f.read()).decode("utf-8") logo_path = os.path.join(BASE_DIR, "mega.png") logo_base64 = "" if os.path.exists(logo_path): with open(logo_path, "rb") as f: logo_base64 = base64.b64encode(f.read()).decode("utf-8") gr.HTML("""

👨‍💼 我是小智  您的金融好幫手 🫰

Powered by Gemini & LangChain

""") with gr.Row(): with gr.Column(scale=4): chatbox = gr.Chatbot(label="💬 對話紀錄", type="messages") with gr.Row(elem_id="input-row"): user_input = gr.Textbox( elem_id="user-input", show_label=False, placeholder="輸入訊息...", scale=8 ) send_btn = gr.Button("送出", elem_id="send-btn", scale=1) def handle_input(message, history): if not message.strip(): return history, gr.update(value="") reply = chat_fn(message, history) history = history + [ {"role": "user", "content": message}, {"role": "assistant", "content": reply} ] return history, gr.update(value="") user_input.submit(handle_input, [user_input, chatbox], [chatbox, user_input]) send_btn.click(handle_input, [user_input, chatbox], [chatbox, user_input]) with gr.Column(scale=1): gr.Markdown("### 👇 快速提問") btns = [ ("未成年可以開戶嗎?", "未成年可以開戶嗎?"), ("法人開戶要準備什麼?", "法人開戶要準備什麼?"), ("期貨交易保證金是什麼?", "期貨交易保證金是什麼?"), ("複委託要如何下單?", "複委託要如何下單?"), ("美股交易時間?", "美股交易時間?"), ("美股可以定期定額嗎?", "美股可以定期定額嗎?") ] for label, q in btns: gr.Button(label).click(lambda h, q=q: handle_input(q, h), [chatbox], [chatbox, user_input]) def clear_memory(): memory.clear() return [], gr.update(value="", placeholder="輸入訊息...") gr.Button("🧹 整理畫面").click(clear_memory, outputs=[chatbox, user_input]) # 底部版權列 gr.HTML("") # 手機鍵盤彈出時捲動補丁 demo.load(None, None, None, js=""" window.addEventListener('focusin', () => { document.querySelector('textarea')?.scrollIntoView({ behavior: 'smooth', block: 'center' }); }); """) demo.launch()