Linebotpic / main.py
alanchen1115's picture
Update main.py
a17c006 verified
import os
import io
from collections import defaultdict
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException
from fastapi.staticfiles import StaticFiles
from google import genai
from google.genai import types
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import (
MessageEvent,
TextMessage,
TextSendMessage,
ImageSendMessage,
ImageMessage,
)
import PIL.Image
import uvicorn
# LangChain 相關匯入
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
# ==========================
# 環境設定與工具函式
# ==========================
# 設置 Google AI API 金鑰
google_api = os.environ["GOOGLE_API_KEY"]
genai_client = genai.Client(api_key=google_api)
# 設置 Line Bot 的 API 金鑰和秘密金鑰
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
# 使用字典模擬用戶訊息歷史存儲
user_message_history = defaultdict(list)
# 建立 FastAPI 應用程式
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
# 設定 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def get_image_url_from_line(message_id):
"""
從 Line 訊息 ID 獲取圖片內容並儲存到暫存檔案。
"""
try:
message_content = line_bot_api.get_message_content(message_id)
file_path = f"/tmp/{message_id}.png"
with open(file_path, "wb") as f:
for chunk in message_content.iter_content():
f.write(chunk)
print(f"✅ 圖片成功儲存到:{file_path}")
return file_path
except Exception as e:
print(f"❌ 圖片取得失敗:{e}")
return None
def store_user_message(user_id, message_type, message_content):
"""
儲存用戶的訊息。
"""
user_message_history[user_id].append(
{"type": message_type, "content": message_content}
)
def get_previous_message(user_id):
"""
獲取用戶的上一則訊息。
"""
if user_id in user_message_history and len(user_message_history[user_id]) > 0:
return user_message_history[user_id][-1]
return {"type": "text", "content": "No message!"}
# ==========================
# LangChain 工具定義
# ==========================
@tool
def generate_and_upload_image(prompt: str) -> str:
"""
這個工具可以根據文字提示生成圖片,並將其上傳到伺服器。
Args:
prompt: 用於生成圖片的文字提示。
Returns:
回傳生成圖片的 URL。
"""
try:
response = genai_client.models.generate_content(
model="gemini-2.0-flash-preview-image-generation",
contents=prompt,
config=types.GenerateContentConfig(response_modalities=['Text', 'Image'])
)
image_binary = None
for part in response.candidates[0].content.parts:
if part.inline_data is not None:
image_binary = part.inline_data.data
break
if image_binary:
image = PIL.Image.open(io.BytesIO(image_binary))
# 隨機生成一個檔案名以避免衝突
file_name = f"static/{os.urandom(16).hex()}.png"
image.save(file_name, format="PNG")
image_url = os.path.join(os.getenv("HF_SPACE"), file_name)
return image_url
return "圖片生成失敗。"
except Exception as e:
return f"圖片生成與上傳失敗: {e}"
@tool
def analyze_image_with_text(image_path: str, user_text: str) -> str:
"""
這個工具可以根據圖片和文字提示來回答問題。
Args:
image_path: 圖片在本地端儲存的路徑。
user_text: 針對圖片提出的文字問題。
Returns:
模型針對圖片和文字提示給出的回應。
"""
try:
if not os.path.exists(image_path):
return "圖片路徑無效,無法進行分析。"
img_user = PIL.Image.open(image_path)
response = genai_client.models.generate_content(
model="gemini-2.5-flash",
contents=[img_user, user_text]
)
if (response.text != None):
out = response.text
else:
out = "Gemini沒答案!請換個說法!"
except Exception as e:
# 處理錯誤
out = f"Gemini執行出錯: {e}"
return out
# ==========================
# LangChain 代理人設定
# ==========================
# 結合所有工具
tools = [generate_and_upload_image, analyze_image_with_text]
# 建立 LLM 模型實例
llm = ChatGoogleGenerativeAI(google_api_key=google_api, model="gemini-2.5-flash", temperature=0.2)
# 建立提示模板
prompt_template = ChatPromptTemplate([
("system", "你是一個強大的圖像生成與問答助理,可以根據用戶的請求使用提供的工具。當你執行 generate_and_upload_image 工具\
成功後會獲得一個 URL,然後你回答的 output 要包含有這個 URL 的完整資訊。如果工具有產生錯誤訊息請解讀並回應。"),
("user", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
# 建立代理人
agent = create_tool_calling_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# ==========================
# FastAPI 路由
# ==========================
@app.get("/")
def root():
return {"title": "Line Bot"}
@app.post("/webhook")
async def webhook(
request: Request,
background_tasks: BackgroundTasks,
x_line_signature=Header(None),
):
body = await request.body()
try:
background_tasks.add_task(
line_handler.handle, body.decode("utf-8"), x_line_signature
)
except InvalidSignatureError:
raise HTTPException(status_code=400, detail="Invalid signature")
return "ok"
@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
def handle_message(event):
user_id = event.source.user_id
# 處理圖片上傳
if event.message.type == "image":
image_path = get_image_url_from_line(event.message.id)
if image_path:
store_user_message(user_id, "image", image_path)
line_bot_api.reply_message(
event.reply_token, TextSendMessage(text="圖片已接收成功囉,幫我輸入你想詢問的問題喔~")
)
else:
line_bot_api.reply_message(
event.reply_token, TextSendMessage(text="沒有接收到圖片~")
)
# 處理文字訊息
elif event.message.type == "text":
user_text = event.message.text
previous_message = get_previous_message(user_id)
print(previous_message)
# 根據上一則訊息類型,動態傳遞給代理人
if previous_message["type"] == "image":
image_path = previous_message["content"]
agent_input = {
"input": f"請根據這張圖片回答問題。圖片的路徑是 {image_path},我的問題是:{user_text}"
}
# 清除上一則圖片訊息,避免重複觸發
user_message_history[user_id].pop()
else:
agent_input = {"input": user_text}
try:
# 運行代理人
response = agent_executor.invoke(agent_input)
out = response["output"]
if 'https' in out:
img_tmp = 'https'+out.split('https')[1]
image_url = img_tmp.split('png')[0]+'png'
line_bot_api.push_message(
event.source.user_id,
[
TextSendMessage(text="✨ 這是我為你生成的圖片喔~"),
ImageSendMessage(original_content_url=image_url, preview_image_url=image_url)
]
)
else:
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
except Exception as e:
print(f"代理人執行出錯: {e}")
out = f"代理人執行出錯!錯誤訊息:{e}"
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
if __name__ == "__main__":
uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)