import io
import os
import re
from collections import defaultdict

import PIL.Image
import uvicorn
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

# Google GenAI SDK
from google import genai
from google.genai import types

# LINE Bot SDK
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import (
    MessageEvent,
    TextMessage,
    TextSendMessage,
    ImageSendMessage,
    ImageMessage,
)

# LangChain imports (explicit module paths, no try/except fallbacks)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents.agent import AgentExecutor
from langchain.agents import create_tool_calling_agent

# ==========================
# Environment setup & helpers
# ==========================
google_api = os.environ.get("GOOGLE_API_KEY")
genai_client = genai.Client(api_key=google_api)
line_bot_api = LineBotApi(os.environ.get("CHANNEL_ACCESS_TOKEN"))
line_handler = WebhookHandler(os.environ.get("CHANNEL_SECRET"))

# Per-user rolling history: user_id -> list of {"type": ..., "content": ...} dicts.
# Used to pair a previously uploaded image with a follow-up text question.
user_message_history = defaultdict(list)

app = FastAPI()

# exist_ok avoids the check-then-create race of the original os.path.exists() guard.
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def get_image_url_from_line(message_id):
    """Download a LINE image message's binary content to static/<message_id>.png.

    Returns the local file path on success, or None on any failure
    (download error, filesystem error, ...). Despite the name, the return
    value is a filesystem path, not an HTTP URL — callers pass it to the
    image-analysis tool, which opens it from disk.
    """
    try:
        message_content = line_bot_api.get_message_content(message_id)
        file_path = f"static/{message_id}.png"
        with open(file_path, "wb") as f:
            # iter_content streams the body in chunks so large images
            # are never held fully in memory.
            for chunk in message_content.iter_content():
                f.write(chunk)
        return file_path
    except Exception as e:
        print(f"❌ 圖片取得失敗:{e}")
        return None


def store_user_message(user_id, message_type, message_content):
    """Append one {"type", "content"} record to the user's message history."""
    user_message_history[user_id].append(
        {"type": message_type, "content": message_content}
    )


def get_previous_message(user_id):
    """Return the user's most recent stored message.

    Falls back to a text placeholder when the user has no history.
    Uses .get() (not subscripting) so the defaultdict is not polluted
    with empty lists for unknown users.
    """
    history = user_message_history.get(user_id)
    if history:
        return history[-1]
    return {"type": "text", "content": "No message!"}


# ==========================
# LangChain tool definitions
# ==========================
@tool
def generate_and_upload_image(prompt: str) -> str:
    """根據文字提示生成圖片。"""
    # NOTE: the docstring above doubles as the tool description shown to the
    # LLM agent — do not translate or reword it casually.
    try:
        # Image-capable Gemini model; restrict the response to IMAGE parts.
        response = genai_client.models.generate_content(
            model="gemini-2.5-flash-image",
            contents=prompt,
            config=types.GenerateContentConfig(response_modalities=['IMAGE'])
        )
        # Pull the first inline image payload out of the response parts.
        image_binary = None
        for part in response.candidates[0].content.parts:
            if part.inline_data:
                image_binary = part.inline_data.data
                break
        if image_binary is None:
            return "圖片生成失敗:模型未回傳數據。"

        image = PIL.Image.open(io.BytesIO(image_binary))
        # Random hex name avoids collisions between concurrent generations.
        file_name = f"static/{os.urandom(8).hex()}.png"
        image.save(file_name, format="PNG")
        # HF_SPACE is the externally reachable base URL when deployed;
        # file_name already starts with "static/", which matches the mount.
        base_url = os.getenv("HF_SPACE", "http://localhost:7860").rstrip("/")
        return f"{base_url}/{file_name}"
    except Exception as e:
        return f"圖片生成失敗: {e}"


@tool
def analyze_image_with_text(image_path: str, user_text: str) -> str:
    """根據圖片路徑和文字提問進行分析。"""
    # NOTE: the docstring above doubles as the tool description shown to the
    # LLM agent — do not translate or reword it casually.
    try:
        if not os.path.exists(image_path):
            return "錯誤:找不到該圖片檔案。"
        img_user = PIL.Image.open(image_path)
        # Multimodal call: image + question in a single contents list.
        response = genai_client.models.generate_content(
            model="gemini-2.5-flash",
            contents=[img_user, user_text]
        )
        return response.text if response.text else "Gemini 沒答案!"
    except Exception as e:
        return f"分析出錯: {e}"


# ==========================
# LangChain agent setup
# ==========================
tools = [generate_and_upload_image, analyze_image_with_text]
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.4)

prompt_template = ChatPromptTemplate.from_messages([
    ("system", "你是一個強大的助理。在使用工具之前,請務必先說明你的思考步驟。如果生成了圖片,請直接給出 URL。"),
    ("user", "{input}"),
    # Required slot where the agent accumulates tool calls/results.
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

agent = create_tool_calling_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(
    agent=agent, tools=tools, verbose=False, handle_parsing_errors=True
)


# ==========================
# FastAPI routes
# ==========================
@app.get("/")
def root():
    """Health-check endpoint."""
    return {"status": "running"}


@app.post("/webhook")
async def webhook(request: Request, background_tasks: BackgroundTasks,
                  x_line_signature=Header(None)):
    """LINE platform webhook endpoint.

    BUG FIX: the original scheduled ``line_handler.handle`` as a background
    task inside the try block. ``InvalidSignatureError`` is raised when the
    handler actually runs — i.e. *after* the response was sent — so the
    except clause could never fire and forged requests were answered "ok".
    Running the handler synchronously restores signature enforcement
    (returns 400 on a bad signature, as intended).
    """
    body = await request.body()
    try:
        line_handler.handle(body.decode("utf-8"), x_line_signature)
    except InvalidSignatureError:
        raise HTTPException(status_code=400)
    return "ok"


@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
def handle_message(event):
    """Dispatch incoming LINE messages.

    Images are downloaded and remembered per user; a follow-up text message
    is then combined with the stored image and routed through the agent.
    Plain text with no pending image goes straight to the agent.
    """
    user_id = event.source.user_id

    # 1. Image message: stash it and ask what to do with it.
    if isinstance(event.message, ImageMessage):
        image_path = get_image_url_from_line(event.message.id)
        if image_path:
            store_user_message(user_id, "image", image_path)
            line_bot_api.reply_message(
                event.reply_token,
                TextSendMessage(text="收到圖片了!請問你想對這張圖做什麼分析?")
            )
        # NOTE(review): when the download fails the user gets no reply at
        # all — consider sending an error message here.

    # 2. Text message: either an image follow-up or a standalone query.
    elif isinstance(event.message, TextMessage):
        user_text = event.message.text
        previous_message = get_previous_message(user_id)

        if previous_message["type"] == "image":
            image_path = previous_message["content"]
            agent_input = {
                "input": f"請分析這張圖片 {image_path},問題是:{user_text}"
            }
            # Consume the pending image so the next text starts fresh.
            user_message_history[user_id].pop()
        else:
            agent_input = {"input": user_text}

        try:
            response = agent_executor.invoke(agent_input)
            out = response["output"]

            # If the agent's answer contains an image URL, reply with the
            # image itself; otherwise reply with the plain text.
            urls = re.findall(r'https?://[^\s<>"]+|www\.[^\s<>"]+', out)
            image_url = next(
                (u for u in urls
                 if any(ext in u.lower() for ext in ['.png', '.jpg'])),
                None
            )

            if image_url:
                line_bot_api.reply_message(
                    event.reply_token,
                    [
                        TextSendMessage(text="這是我為你生成的圖片:"),
                        ImageSendMessage(
                            original_content_url=image_url,
                            preview_image_url=image_url
                        )
                    ]
                )
            else:
                line_bot_api.reply_message(
                    event.reply_token, TextSendMessage(text=out)
                )
        except Exception as e:
            print(f"Agent Error: {e}")
            line_bot_api.reply_message(
                event.reply_token,
                TextSendMessage(text="抱歉,我現在無法處理這個請求。")
            )


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)