Linebotpic / main.py
alanchen1115's picture
Update main.py
82a6007 verified
import os
import io
import re
import PIL.Image
import uvicorn
from collections import defaultdict
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
# Google GenAI SDK
from google import genai
from google.genai import types
# Line Bot
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import (
MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, ImageMessage,
)
# LangChain imports (修正後)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
# 直接使用標準路徑,不再使用 try-except
# 從具體路徑匯入 AgentExecutor
from langchain.agents.agent import AgentExecutor
from langchain.agents import create_tool_calling_agent
# ==========================
# 環境設定與工具函式
# ==========================
google_api = os.environ.get("GOOGLE_API_KEY")
genai_client = genai.Client(api_key=google_api)
line_bot_api = LineBotApi(os.environ.get("CHANNEL_ACCESS_TOKEN"))
line_handler = WebhookHandler(os.environ.get("CHANNEL_SECRET"))
user_message_history = defaultdict(list)
app = FastAPI()
if not os.path.exists("static"):
os.makedirs("static")
app.mount("/static", StaticFiles(directory="static"), name="static")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def get_image_url_from_line(message_id):
try:
message_content = line_bot_api.get_message_content(message_id)
file_path = f"static/{message_id}.png"
with open(file_path, "wb") as f:
for chunk in message_content.iter_content():
f.write(chunk)
return file_path
except Exception as e:
print(f"❌ 圖片取得失敗:{e}")
return None
def store_user_message(user_id, message_type, message_content):
user_message_history[user_id].append({"type": message_type, "content": message_content})
def get_previous_message(user_id):
if user_id in user_message_history and len(user_message_history[user_id]) > 0:
return user_message_history[user_id][-1]
return {"type": "text", "content": "No message!"}
# ==========================
# LangChain 工具定義
# ==========================
@tool
def generate_and_upload_image(prompt: str) -> str:
"""根據文字提示生成圖片。"""
try:
# 修正:2026 年請使用實質存在的模型 imagen-3.0
response = genai_client.models.generate_content(
model="gemini-2.5-flash-image",
contents=prompt,
config=types.GenerateContentConfig(response_modalities=['IMAGE'])
)
image_binary = None
for part in response.candidates[0].content.parts:
if part.inline_data:
image_binary = part.inline_data.data
break
if image_binary:
image = PIL.Image.open(io.BytesIO(image_binary))
file_name = f"static/{os.urandom(8).hex()}.png"
image.save(file_name, format="PNG")
base_url = os.getenv("HF_SPACE", "http://localhost:7860").rstrip("/")
return f"{base_url}/{file_name}"
return "圖片生成失敗:模型未回傳數據。"
except Exception as e:
return f"圖片生成失敗: {e}"
@tool
def analyze_image_with_text(image_path: str, user_text: str) -> str:
"""根據圖片路徑和文字提問進行分析。"""
try:
if not os.path.exists(image_path):
return "錯誤:找不到該圖片檔案。"
img_user = PIL.Image.open(image_path)
# 修正模型:gemini-1.5-flash 或 2.0-flash
response = genai_client.models.generate_content(
model="gemini-2.5-flash",
contents=[img_user, user_text]
)
return response.text if response.text else "Gemini 沒答案!"
except Exception as e:
return f"分析出錯: {e}"
# ==========================
# LangChain 代理人設定
# ==========================
tools = [generate_and_upload_image, analyze_image_with_text]
# 修正模型:gemini-3-flash-preview 目前在正式 API 中通常不可用
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.4)
prompt_template = ChatPromptTemplate.from_messages([
("system", "你是一個強大的助理。在使用工具之前,請務必先說明你的思考步驟。如果生成了圖片,請直接給出 URL。"),
("user", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
agent = create_tool_calling_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=False, handle_parsing_errors=True)
# ==========================
# FastAPI 路由
# ==========================
@app.get("/")
def root():
return {"status": "running"}
@app.post("/webhook")
async def webhook(request: Request, background_tasks: BackgroundTasks, x_line_signature=Header(None)):
body = await request.body()
try:
background_tasks.add_task(line_handler.handle, body.decode("utf-8"), x_line_signature)
except InvalidSignatureError:
raise HTTPException(status_code=400)
return "ok"
@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
def handle_message(event):
user_id = event.source.user_id
# 1. 處理圖片訊息
if isinstance(event.message, ImageMessage):
image_path = get_image_url_from_line(event.message.id)
if image_path:
store_user_message(user_id, "image", image_path)
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="收到圖片了!請問你想對這張圖做什麼分析?"))
# 2. 處理文字訊息 (修正縮進,確保它與上面的 if 對齊)
elif isinstance(event.message, TextMessage):
user_text = event.message.text
previous_message = get_previous_message(user_id)
if previous_message["type"] == "image":
image_path = previous_message["content"]
agent_input = {"input": f"請分析這張圖片 {image_path},問題是:{user_text}"}
user_message_history[user_id].pop()
else:
agent_input = {"input": user_text}
try:
response = agent_executor.invoke(agent_input)
out = response["output"]
# 搜尋 URL 邏輯
urls = re.findall(r'https?://[^\s<>"]+|www\.[^\s<>"]+', out)
image_url = next((u for u in urls if any(ext in u.lower() for ext in ['.png', '.jpg'])), None)
if image_url:
line_bot_api.reply_message(
event.reply_token,
[
TextSendMessage(text="這是我為你生成的圖片:"),
ImageSendMessage(original_content_url=image_url, preview_image_url=image_url)
]
)
else:
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
except Exception as e:
print(f"Agent Error: {e}")
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="抱歉,我現在無法處理這個請求。"))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)