Spaces:
Running
Running
| import os | |
| import io | |
| import re | |
| import PIL.Image | |
| import uvicorn | |
| from collections import defaultdict | |
| from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| # Google GenAI SDK | |
| from google import genai | |
| from google.genai import types | |
| # Line Bot | |
| from linebot import LineBotApi, WebhookHandler | |
| from linebot.exceptions import InvalidSignatureError | |
| from linebot.models import ( | |
| MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, ImageMessage, | |
| ) | |
| # LangChain imports (修正後) | |
| from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain_core.tools import tool | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| # 直接使用標準路徑,不再使用 try-except | |
| # 從具體路徑匯入 AgentExecutor | |
| from langchain.agents.agent import AgentExecutor | |
| from langchain.agents import create_tool_calling_agent | |
| # ========================== | |
| # 環境設定與工具函式 | |
| # ========================== | |
| google_api = os.environ.get("GOOGLE_API_KEY") | |
| genai_client = genai.Client(api_key=google_api) | |
| line_bot_api = LineBotApi(os.environ.get("CHANNEL_ACCESS_TOKEN")) | |
| line_handler = WebhookHandler(os.environ.get("CHANNEL_SECRET")) | |
| user_message_history = defaultdict(list) | |
| app = FastAPI() | |
| if not os.path.exists("static"): | |
| os.makedirs("static") | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| def get_image_url_from_line(message_id): | |
| try: | |
| message_content = line_bot_api.get_message_content(message_id) | |
| file_path = f"static/{message_id}.png" | |
| with open(file_path, "wb") as f: | |
| for chunk in message_content.iter_content(): | |
| f.write(chunk) | |
| return file_path | |
| except Exception as e: | |
| print(f"❌ 圖片取得失敗:{e}") | |
| return None | |
| def store_user_message(user_id, message_type, message_content): | |
| user_message_history[user_id].append({"type": message_type, "content": message_content}) | |
| def get_previous_message(user_id): | |
| if user_id in user_message_history and len(user_message_history[user_id]) > 0: | |
| return user_message_history[user_id][-1] | |
| return {"type": "text", "content": "No message!"} | |
| # ========================== | |
| # LangChain 工具定義 | |
| # ========================== | |
| def generate_and_upload_image(prompt: str) -> str: | |
| """根據文字提示生成圖片。""" | |
| try: | |
| # 修正:2026 年請使用實質存在的模型 imagen-3.0 | |
| response = genai_client.models.generate_content( | |
| model="gemini-2.5-flash-image", | |
| contents=prompt, | |
| config=types.GenerateContentConfig(response_modalities=['IMAGE']) | |
| ) | |
| image_binary = None | |
| for part in response.candidates[0].content.parts: | |
| if part.inline_data: | |
| image_binary = part.inline_data.data | |
| break | |
| if image_binary: | |
| image = PIL.Image.open(io.BytesIO(image_binary)) | |
| file_name = f"static/{os.urandom(8).hex()}.png" | |
| image.save(file_name, format="PNG") | |
| base_url = os.getenv("HF_SPACE", "http://localhost:7860").rstrip("/") | |
| return f"{base_url}/{file_name}" | |
| return "圖片生成失敗:模型未回傳數據。" | |
| except Exception as e: | |
| return f"圖片生成失敗: {e}" | |
| def analyze_image_with_text(image_path: str, user_text: str) -> str: | |
| """根據圖片路徑和文字提問進行分析。""" | |
| try: | |
| if not os.path.exists(image_path): | |
| return "錯誤:找不到該圖片檔案。" | |
| img_user = PIL.Image.open(image_path) | |
| # 修正模型:gemini-1.5-flash 或 2.0-flash | |
| response = genai_client.models.generate_content( | |
| model="gemini-2.5-flash", | |
| contents=[img_user, user_text] | |
| ) | |
| return response.text if response.text else "Gemini 沒答案!" | |
| except Exception as e: | |
| return f"分析出錯: {e}" | |
| # ========================== | |
| # LangChain 代理人設定 | |
| # ========================== | |
| tools = [generate_and_upload_image, analyze_image_with_text] | |
| # 修正模型:gemini-3-flash-preview 目前在正式 API 中通常不可用 | |
| llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.4) | |
| prompt_template = ChatPromptTemplate.from_messages([ | |
| ("system", "你是一個強大的助理。在使用工具之前,請務必先說明你的思考步驟。如果生成了圖片,請直接給出 URL。"), | |
| ("user", "{input}"), | |
| MessagesPlaceholder(variable_name="agent_scratchpad"), | |
| ]) | |
| agent = create_tool_calling_agent(llm, tools, prompt_template) | |
| agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=False, handle_parsing_errors=True) | |
| # ========================== | |
| # FastAPI 路由 | |
| # ========================== | |
| def root(): | |
| return {"status": "running"} | |
| async def webhook(request: Request, background_tasks: BackgroundTasks, x_line_signature=Header(None)): | |
| body = await request.body() | |
| try: | |
| background_tasks.add_task(line_handler.handle, body.decode("utf-8"), x_line_signature) | |
| except InvalidSignatureError: | |
| raise HTTPException(status_code=400) | |
| return "ok" | |
| def handle_message(event): | |
| user_id = event.source.user_id | |
| # 1. 處理圖片訊息 | |
| if isinstance(event.message, ImageMessage): | |
| image_path = get_image_url_from_line(event.message.id) | |
| if image_path: | |
| store_user_message(user_id, "image", image_path) | |
| line_bot_api.reply_message(event.reply_token, TextSendMessage(text="收到圖片了!請問你想對這張圖做什麼分析?")) | |
| # 2. 處理文字訊息 (修正縮進,確保它與上面的 if 對齊) | |
| elif isinstance(event.message, TextMessage): | |
| user_text = event.message.text | |
| previous_message = get_previous_message(user_id) | |
| if previous_message["type"] == "image": | |
| image_path = previous_message["content"] | |
| agent_input = {"input": f"請分析這張圖片 {image_path},問題是:{user_text}"} | |
| user_message_history[user_id].pop() | |
| else: | |
| agent_input = {"input": user_text} | |
| try: | |
| response = agent_executor.invoke(agent_input) | |
| out = response["output"] | |
| # 搜尋 URL 邏輯 | |
| urls = re.findall(r'https?://[^\s<>"]+|www\.[^\s<>"]+', out) | |
| image_url = next((u for u in urls if any(ext in u.lower() for ext in ['.png', '.jpg'])), None) | |
| if image_url: | |
| line_bot_api.reply_message( | |
| event.reply_token, | |
| [ | |
| TextSendMessage(text="這是我為你生成的圖片:"), | |
| ImageSendMessage(original_content_url=image_url, preview_image_url=image_url) | |
| ] | |
| ) | |
| else: | |
| line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out)) | |
| except Exception as e: | |
| print(f"Agent Error: {e}") | |
| line_bot_api.reply_message(event.reply_token, TextSendMessage(text="抱歉,我現在無法處理這個請求。")) | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |