alanchen1115 commited on
Commit
8e6ac56
·
verified ·
1 Parent(s): 789916f

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +206 -127
main.py CHANGED
@@ -1,38 +1,50 @@
 
 
 
 
1
  from fastapi.middleware.cors import CORSMiddleware
2
- from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status
3
  from fastapi.staticfiles import StaticFiles
4
  from google import genai
 
5
  from linebot import LineBotApi, WebhookHandler
6
  from linebot.exceptions import InvalidSignatureError
7
- from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, AudioMessage, ImageMessage
8
- import json, os
9
- import io
 
 
 
 
10
  import PIL.Image
11
- from Image_text_generation import Image_text_Generator
12
- from Uploading_images_file import get_image_url, store_user_message, analyze_with_gemini, get_previous_message
 
 
 
 
 
13
 
14
- #==========================
15
- # API 金鑰
16
- #==========================
17
 
18
- # 設定 Google AI API 金鑰
19
- client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))
 
20
 
21
- # 設定生成文字的參數
22
- generation_config = genai.types.GenerateContentConfig(max_output_tokens=256, temperature=0.5, top_p=0.5, top_k=16)
23
 
24
- # 設定 Line Bot 的 API 金鑰和秘密金鑰
25
  line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
26
  line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
27
 
28
- # 設定是否正在與使用者交談
29
- working_status = os.getenv("DEFALUT_TALKING", default = "true").lower() == "true"
30
 
31
  # 建立 FastAPI 應用程式
32
  app = FastAPI()
33
  app.mount("/static", StaticFiles(directory="static"), name="static")
34
 
35
- # 設定 CORS,允許跨域請求
36
  app.add_middleware(
37
  CORSMiddleware,
38
  allow_origins=["*"],
@@ -41,141 +53,208 @@ app.add_middleware(
41
  allow_headers=["*"],
42
  )
43
 
44
- # 處理根路徑請求
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  @app.get("/")
46
  def root():
47
  return {"title": "Line Bot"}
48
 
49
- # 處理 Line Webhook 請求
50
  @app.post("/webhook")
51
  async def webhook(
52
  request: Request,
53
  background_tasks: BackgroundTasks,
54
  x_line_signature=Header(None),
55
  ):
56
- # 取得請求內容
57
  body = await request.body()
58
  try:
59
- # 將處理 Line 事件的任務加入背景工作
60
  background_tasks.add_task(
61
  line_handler.handle, body.decode("utf-8"), x_line_signature
62
  )
63
  except InvalidSignatureError:
64
- # 處理無效的簽章錯誤
65
  raise HTTPException(status_code=400, detail="Invalid signature")
66
  return "ok"
67
-
68
 
69
- #==========================
70
- # 主程式(圖片與文字)
71
- #==========================
72
- # 建立 chat_sessions 字典
73
- chat_sessions = {}
74
  @line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
75
- def handle_image_message(event):
76
  user_id = event.source.user_id
77
- user_text = event.message.text if event.message.type == "text" else None
78
- previous_message = get_previous_message(user_id)
79
-
80
- if event.message.type != "text" and event.message.type != "image":
81
- line_bot_api.reply_message(event.reply_token, TextSendMessage(text="請輸入文字或圖片~"))
82
- return
83
- elif user_text == "再見":
84
- line_bot_api.reply_message(event.reply_token, TextSendMessage(text="Bye!"))
85
- return
86
-
87
- # ========
88
- # 生成圖片
89
- # ========
90
- elif user_text and user_text.startswith("生成圖片"):
91
- prompt = user_text.replace("生成圖片", "").strip()
92
-
93
- # 先立即回覆避免token過期
94
- line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片生成中~ 請稍候.....✨"))
95
- image_generator = Image_text_Generator(user_id)
96
- # 生成圖片
97
- image_binary = image_generator.generate_image_with_gemini(prompt)
98
-
99
- if image_binary:
100
- image_url = os.path.join(os.getenv("HF_SPACE"),image_generator.upload_image_to_tmp(image_binary)) ### os.getenv("HF_SPACE") => https://xxxxx.hf.space
101
-
102
- if image_url:
103
- # 使用 push message 發送圖片,避免 reply token 超時
104
- line_bot_api.push_message(
105
- event.source.user_id,
106
- [
107
- TextSendMessage(text="✨ 這是我為你生成的圖片喔~"),
108
- ImageSendMessage(original_content_url=image_url, preview_image_url=image_url)
109
- ]
110
- )
111
- else:
112
- line_bot_api.push_message(
113
- event.source.user_id,
114
- TextSendMessage(text="⚠️ 圖片上傳失敗,請稍後再試~")
115
- )
116
  else:
117
- line_bot_api.push_message(
118
- event.source.user_id,
119
- TextSendMessage(text="⚠️ 圖片生成失敗,請稍後再試~")
120
  )
121
  return
122
-
123
- # ========
124
- # 純文字
125
- # ========
126
- elif event.message.type == "text" and previous_message["type"] != "image":
127
- try:
128
- user_id = event.source.user_id
129
- chat = chat_sessions.get(user_id) or client.chats.create(model="gemini-2.0-flash", config=generation_config)
130
- chat_sessions[user_id] = chat
131
- # 取得使用者輸入的文字
132
- user_input = event.message.text
133
- response = chat.send_message(user_input)
134
- if (response.text != None):
135
- out = response.text
136
- else:
137
- out = "Gemini沒答案!請換個說法!"
138
- except:
139
- # 處理錯誤
140
- out = "Gemini執行出錯!請換個說法!"
141
-
142
- elif previous_message and previous_message["type"] == "image" and event.message.type == "text":
143
- image_path = previous_message["content"]
144
  user_text = event.message.text
145
- store_user_message(user_id, "text", user_text)
146
- try:
147
- if not os.path.exists(image_path):
148
- raise FileNotFoundError(f"圖片路徑無效:{image_path}")
149
- previous_img = PIL.Image.open(image_path)
150
- user_id = event.source.user_id
151
- chat = chat_sessions.get(user_id) or client.chats.create(model="gemini-2.0-flash", config=generation_config)
152
- chat_sessions[user_id] = chat
153
- # 取得使用者輸入的文字
154
- user_input = event.message.text
155
- response = chat.send_message([previous_img, user_input])
156
- if (response.text != None):
157
- out = response.text
158
- else:
159
- out = "Gemini沒答案!請換個說法!"
160
- except:
161
- # 處理錯誤
162
- out = "Gemini執行出錯!請換個說法!"
163
-
164
- # ========
165
- # 上傳圖片
166
- # ========
167
- elif event.message.type == "image":
168
- image_path = get_image_url(event.message.id)
169
- if image_path:
170
- store_user_message(user_id, "image", image_path)
171
- line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片已接收成功囉,幫我輸入你想詢問的問題喔~"))
172
  else:
173
- line_bot_api.reply_message(event.reply_token, TextSendMessage(text="沒有接收到圖片~"))
174
- return
175
-
176
- line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
177
-
 
 
 
 
 
 
 
178
 
179
  if __name__ == "__main__":
180
- # 啟動 FastAPI 應用程式
181
- uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
 
1
+ import os
2
+ import io
3
+ import tempfile
4
+ from collections import defaultdict
5
  from fastapi.middleware.cors import CORSMiddleware
6
+ from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException
7
  from fastapi.staticfiles import StaticFiles
8
  from google import genai
9
+ from google.genai import types
10
  from linebot import LineBotApi, WebhookHandler
11
  from linebot.exceptions import InvalidSignatureError
12
+ from linebot.models import (
13
+ MessageEvent,
14
+ TextMessage,
15
+ TextSendMessage,
16
+ ImageSendMessage,
17
+ ImageMessage,
18
+ )
19
  import PIL.Image
20
+ import uvicorn
21
+
22
+ # LangChain 相關匯入
23
+ from langchain_core.prompts import ChatPromptTemplate
24
+ from langchain_core.tools import tool
25
+ from langchain_google_genai import ChatGoogleGenerativeAI
26
+ from langchain.agents import create_tool_calling_agent, AgentExecutor
27
 
 
 
 
28
 
29
+ # ==========================
30
+ # 環境設定與工具函式
31
+ # ==========================
32
 
33
+ # 設置 Google AI API 金鑰
34
+ os.environ["GOOGLE_API_KEY"] = os.getenv("GOOGLE_API_KEY")
35
 
36
+ # 設置 Line Bot 的 API 金鑰和秘密金鑰
37
  line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
38
  line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
39
 
40
+ # 使用字典模擬用戶訊息歷史存儲
41
+ user_message_history = defaultdict(list)
42
 
43
  # 建立 FastAPI 應用程式
44
  app = FastAPI()
45
  app.mount("/static", StaticFiles(directory="static"), name="static")
46
 
47
+ # 設定 CORS
48
  app.add_middleware(
49
  CORSMiddleware,
50
  allow_origins=["*"],
 
53
  allow_headers=["*"],
54
  )
55
 
56
+ def get_image_url_from_line(message_id):
57
+ """
58
+ 從 Line 訊息 ID 獲取圖片內容並儲存到暫存檔案。
59
+ """
60
+ try:
61
+ message_content = line_bot_api.get_message_content(message_id)
62
+ file_path = f"/tmp/{message_id}.png"
63
+ with open(file_path, "wb") as f:
64
+ for chunk in message_content.iter_content():
65
+ f.write(chunk)
66
+ print(f"✅ 圖片成功儲存到:{file_path}")
67
+ return file_path
68
+ except Exception as e:
69
+ print(f"❌ 圖片取得失敗:{e}")
70
+ return None
71
+
72
+ def store_user_message(user_id, message_type, message_content):
73
+ """
74
+ 儲存用戶的訊息。
75
+ """
76
+ user_message_history[user_id].append(
77
+ {"type": message_type, "content": message_content}
78
+ )
79
+
80
+ def get_previous_message(user_id):
81
+ """
82
+ 獲取用戶的上一則訊息。
83
+ """
84
+ if user_id in user_message_history and len(user_message_history[user_id]) > 0:
85
+ return user_message_history[user_id][-1]
86
+ return {"type": "text", "content": "No message!"}
87
+
88
+
89
+ # ==========================
90
+ # LangChain 工具定義
91
+ # ==========================
92
+
93
+ @tool
94
+ def generate_and_upload_image(prompt: str) -> str:
95
+ """
96
+ 這個工具可以根據文字提示生成圖片,並將其上傳到伺服器。
97
+
98
+ Args:
99
+ prompt: 用於生成圖片的文字提示。
100
+
101
+ Returns:
102
+ 生成圖片的 URL。
103
+ """
104
+ try:
105
+ genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
106
+ response = genai_client.models.generate_content(
107
+ model="gemini-2.0-flash-exp",
108
+ contents=prompt,
109
+ config=types.GenerateContentConfig(response_modalities=['Text', 'Image'])
110
+ )
111
+
112
+ image_binary = None
113
+ for part in response.candidates[0].content.parts:
114
+ if part.inline_data is not None:
115
+ image_binary = part.inline_data.data
116
+ break
117
+
118
+ if image_binary:
119
+ image = PIL.Image.open(io.BytesIO(image_binary))
120
+ # 隨機生成一個檔案名以避免衝突
121
+ file_name = f"static/{os.urandom(16).hex()}.png"
122
+ image.save(file_name, format="PNG")
123
+
124
+ image_url = os.path.join(os.getenv("HF_SPACE"), file_name)
125
+ return image_url
126
+
127
+ return "圖片生成失敗。"
128
+ except Exception as e:
129
+ return f"圖片生成與上傳失敗: {e}"
130
+
131
+ @tool
132
+ def analyze_image_with_text(image_path: str, user_text: str) -> str:
133
+ """
134
+ 這個工具可以根據一張圖片和一個文字提示來回答問題。
135
+
136
+ Args:
137
+ image_path: 圖片在本地端儲存的路徑。
138
+ user_text: 針對圖片提出的文字問題。
139
+
140
+ Returns:
141
+ 模型針對圖片和文字提示給出的回應。
142
+ """
143
+ try:
144
+ if not os.path.exists(image_path):
145
+ return "圖片路徑無效,無法進行分析。"
146
+
147
+ img_user = PIL.Image.open(image_path)
148
+
149
+ # 建立一個新的聊天模型實例,以確保狀態獨立
150
+ chat_model = ChatGoogleGenerativeAI(
151
+ model="gemini-1.5-pro",
152
+ temperature=0.5,
153
+ top_p=0.5,
154
+ top_k=16
155
+ )
156
+
157
+ response = chat_model.invoke([
158
+ {"role": "user", "content": [
159
+ {"type": "image_url", "image_url": {"url": f"file://{image_path}"}},
160
+ {"type": "text", "text": user_text}
161
+ ]}
162
+ ])
163
+
164
+ return response.content
165
+ except Exception as e:
166
+ return f"圖片分析失敗: {e}"
167
+
168
+
169
+ # ==========================
170
+ # LangChain 代理人設定
171
+ # ==========================
172
+
173
+ # 結合所有工具
174
+ tools = [generate_and_upload_image, analyze_image_with_text]
175
+
176
+ # 建立 LLM 模型實例
177
+ llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", temperature=0.5)
178
+
179
+ # 建立提示模板
180
+ prompt_template = ChatPromptTemplate.from_messages([
181
+ ("system", "你是一個強大的助理,可以根據用戶的請求使用提供的工具。"),
182
+ ("user", "{input}"),
183
+ ("placeholder", "{agent_scratchpad}"),
184
+ ])
185
+
186
+ # 建立代理人
187
+ agent = create_tool_calling_agent(llm, tools, prompt_template)
188
+ agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
189
+
190
+
191
+ # ==========================
192
+ # FastAPI 路由
193
+ # ==========================
194
+
195
  @app.get("/")
196
  def root():
197
  return {"title": "Line Bot"}
198
 
 
199
  @app.post("/webhook")
200
  async def webhook(
201
  request: Request,
202
  background_tasks: BackgroundTasks,
203
  x_line_signature=Header(None),
204
  ):
 
205
  body = await request.body()
206
  try:
 
207
  background_tasks.add_task(
208
  line_handler.handle, body.decode("utf-8"), x_line_signature
209
  )
210
  except InvalidSignatureError:
 
211
  raise HTTPException(status_code=400, detail="Invalid signature")
212
  return "ok"
 
213
 
 
 
 
 
 
214
  @line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
215
+ def handle_message(event):
216
  user_id = event.source.user_id
217
+
218
+ # 處理圖片上傳
219
+ if event.message.type == "image":
220
+ image_path = get_image_url_from_line(event.message.id)
221
+ if image_path:
222
+ store_user_message(user_id, "image", image_path)
223
+ line_bot_api.reply_message(
224
+ event.reply_token, TextSendMessage(text="圖片已接收成功囉,幫我輸入你想詢問的問題喔~")
225
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
226
  else:
227
+ line_bot_api.reply_message(
228
+ event.reply_token, TextSendMessage(text="沒有接收到圖片~")
 
229
  )
230
  return
231
+
232
+ # 處理文字訊息
233
+ if event.message.type == "text":
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
  user_text = event.message.text
235
+ previous_message = get_previous_message(user_id)
236
+
237
+ # 根據上一則訊息類型,動態傳遞給代理人
238
+ if previous_message["type"] == "image":
239
+ image_path = previous_message["content"]
240
+ agent_input = {
241
+ "input": f"這是一張圖片在 {image_path},請根據圖片和我的問題回答:{user_text}"
242
+ }
243
+ # 清除上一則圖片訊息,避免重複觸發
244
+ user_message_history[user_id].pop()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
245
  else:
246
+ agent_input = {"input": user_text}
247
+
248
+ try:
249
+ # 運行代理人
250
+ response = agent_executor.invoke(agent_input)
251
+ out = response["output"]
252
+
253
+ except Exception as e:
254
+ print(f"代理人執行出錯: {e}")
255
+ out = f"代理人執行出錯!請換個說法或稍後再試!錯誤訊息:{e}"
256
+
257
+ line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
258
 
259
  if __name__ == "__main__":
260
+ uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)