hazelhh committed on
Commit
be612ee
·
verified ·
1 Parent(s): 552dc60

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +146 -219
main.py CHANGED
@@ -1,51 +1,36 @@
1
- import os
2
- import io
3
- import tempfile
4
- from collections import defaultdict
5
  from fastapi.middleware.cors import CORSMiddleware
6
- from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException
7
- from fastapi.staticfiles import StaticFiles
8
- from google import genai
9
- from google.genai import types
10
  from linebot import LineBotApi, WebhookHandler
11
  from linebot.exceptions import InvalidSignatureError
12
- from linebot.models import (
13
- MessageEvent,
14
- TextMessage,
15
- TextSendMessage,
16
- ImageSendMessage,
17
- ImageMessage,
18
- )
19
- import PIL.Image
20
  import uvicorn
21
 
22
- # LangChain 相關匯入
23
- from langchain_core.prompts import ChatPromptTemplate
24
- from langchain_core.tools import tool
25
- from langchain_google_genai import ChatGoogleGenerativeAI
26
- from langchain.agents import AgentExecutor, create_tool_calling_agent
27
 
28
-
29
- # ==========================
30
- # 環境設定與工具函式
31
- # ==========================
32
-
33
- # 設置 Google AI API 金鑰
34
- google_api = os.environ["GOOGLE_API_KEY"]
35
- genai_client = genai.Client(api_key=google_api)
36
-
37
- # 設置 Line Bot 的 API 金鑰和秘密金鑰
38
  line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
39
  line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
40
 
41
- # 使用字典模擬用戶訊息歷史存儲
42
- user_message_history = defaultdict(list)
 
 
 
 
 
 
 
 
43
 
44
- # 建立 FastAPI 應用程式
45
  app = FastAPI()
46
- app.mount("/static", StaticFiles(directory="static"), name="static")
47
 
48
- # 設定 CORS
49
  app.add_middleware(
50
  CORSMiddleware,
51
  allow_origins=["*"],
@@ -54,141 +39,9 @@ app.add_middleware(
54
  allow_headers=["*"],
55
  )
56
 
57
- def get_image_url_from_line(message_id):
58
- """
59
- 從 Line 訊息 ID 獲取圖片內容並儲存到暫存檔案。
60
- """
61
- try:
62
- message_content = line_bot_api.get_message_content(message_id)
63
- file_path = f"/tmp/{message_id}.png"
64
- with open(file_path, "wb") as f:
65
- for chunk in message_content.iter_content():
66
- f.write(chunk)
67
- print(f"✅ 圖片成功儲存到:{file_path}")
68
- return file_path
69
- except Exception as e:
70
- print(f"❌ 圖片取得失敗:{e}")
71
- return None
72
-
73
- def store_user_message(user_id, message_type, message_content):
74
- """
75
- 儲存用戶的訊息。
76
- """
77
- user_message_history[user_id].append(
78
- {"type": message_type, "content": message_content}
79
- )
80
-
81
- def get_previous_message(user_id):
82
- """
83
- 獲取用戶的上一則訊息。
84
- """
85
- if user_id in user_message_history and len(user_message_history[user_id]) > 0:
86
- return user_message_history[user_id][-1]
87
- return {"type": "text", "content": "No message!"}
88
-
89
-
90
- # ==========================
91
- # LangChain 工具定義
92
- # ==========================
93
-
94
- @tool
95
- def generate_and_upload_image(prompt: str) -> str:
96
- """
97
- 這個工具可以根據文字提示生成圖片,並將其上傳到伺服器。
98
-
99
- Args:
100
- prompt: 用於生成圖片的文字提示。
101
-
102
- Returns:
103
- 生成圖片的 URL。
104
- """
105
- try:
106
- response = genai_client.models.generate_content(
107
- model="gemini-2.0-flash-preview-image-generation",
108
- contents=prompt,
109
- config=types.GenerateContentConfig(response_modalities=['Text', 'Image'])
110
- )
111
-
112
- image_binary = None
113
- for part in response.candidates[0].content.parts:
114
- if part.inline_data is not None:
115
- image_binary = part.inline_data.data
116
- break
117
-
118
- if image_binary:
119
- image = PIL.Image.open(io.BytesIO(image_binary))
120
- # 隨機生成一個檔案名以避免衝突
121
- file_name = f"static/{os.urandom(16).hex()}.png"
122
- image.save(file_name, format="PNG")
123
-
124
- image_url = os.path.join(os.getenv("HF_SPACE"), file_name)
125
- return image_url
126
-
127
- return "圖片生成失敗。"
128
- except Exception as e:
129
- return f"圖片生成與上傳失敗: {e}"
130
-
131
- @tool
132
- def analyze_image_with_text(image_path: str, user_text: str) -> str:
133
- """
134
- 這個工具可以根據圖片和文字提示來回答問題。
135
-
136
- Args:
137
- image_path: 圖片在本地端儲存的路徑。
138
- user_text: 針對圖片提出的文字問題。
139
-
140
- Returns:
141
- 模型針對圖片和文字提示給出的回應。
142
- """
143
- try:
144
- if not os.path.exists(image_path):
145
- return "圖片路徑無效,無法進行分析。"
146
-
147
- img_user = PIL.Image.open(image_path)
148
- response = genai_client.models.generate_content(
149
- model="gemini-2.5-flash",
150
- config=types.GenerateContentConfig(response_mime_type="application/json"),
151
- contents=[img_user, user_text]
152
- )
153
- if (response.text != None):
154
- out = response.text
155
- else:
156
- out = "Gemini沒答案!請換個說法!"
157
- except:
158
- # 處理錯誤
159
- out = "Gemini執行出錯!請換個說法!"
160
-
161
- return out
162
-
163
-
164
- # ==========================
165
- # LangChain 代理人設定
166
- # ==========================
167
-
168
- # 結合所有工具
169
- tools = [generate_and_upload_image, analyze_image_with_text]
170
-
171
- # 建立 LLM 模型實例
172
- llm = ChatGoogleGenerativeAI(google_api_key=google_api, model="gemini-2.5-flash", temperature=0.2)
173
-
174
- # 建立提示模板
175
- prompt_template = ChatPromptTemplate([
176
- ("system", "你是一個強大的助理,可以根據用戶的請求使用提供的工具。"),
177
- ("user", "{input}"),
178
- ("placeholder", "{agent_scratchpad}"),
179
- ])
180
-
181
- # 建立代理人
182
- agent = create_tool_calling_agent(llm, tools, prompt_template)
183
- agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
184
-
185
- # ==========================
186
- # FastAPI 路由
187
- # ==========================
188
-
189
  @app.get("/")
190
  def root():
191
- return {"title": "Line Bot"}
192
 
193
  @app.post("/webhook")
194
  async def webhook(
@@ -198,67 +51,141 @@ async def webhook(
198
  ):
199
  body = await request.body()
200
  try:
201
- background_tasks.add_task(
202
- line_handler.handle, body.decode("utf-8"), x_line_signature
203
- )
204
  except InvalidSignatureError:
205
  raise HTTPException(status_code=400, detail="Invalid signature")
206
  return "ok"
207
 
208
- @line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
209
- def handle_message(event):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
210
  user_id = event.source.user_id
 
211
 
212
- # 處理圖片上傳
213
- if event.message.type == "image":
214
- image_path = get_image_url_from_line(event.message.id)
215
- print(image_path)
216
- if image_path:
217
- store_user_message(user_id, "image", image_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218
  line_bot_api.reply_message(
219
- event.reply_token, TextSendMessage(text="圖片已接收成功囉,幫我輸入你想詢問的問題喔~")
 
220
  )
221
  else:
222
  line_bot_api.reply_message(
223
- event.reply_token, TextSendMessage(text="沒有接收到圖片~")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  )
225
-
226
- # 處理文字訊息
227
- elif event.message.type == "text":
228
- user_text = event.message.text
229
- previous_message = get_previous_message(user_id)
230
- print(previous_message)
231
 
232
- # 根據上一則訊息類型,動態傳遞給代理人
233
- if previous_message["type"] == "image":
234
- image_path = previous_message["content"]
235
- agent_input = {
236
- "input": f"請根據這張圖片回答問題。圖片的路徑是 {image_path},我的問題是:{user_text}"
237
- }
238
- # 清除上一則圖片訊息,避免重複觸發
239
- user_message_history[user_id].pop()
240
- else:
241
- agent_input = {"input": user_text}
242
- try:
243
- # 運行代理人
244
- response = agent_executor.invoke(agent_input)
245
- out = response["output"]
246
- if 'https' in out:
247
- img_tmp = 'https'+out.split('https')[1]
248
- image_url = img_tmp.split('png')[0]+'png'
249
- line_bot_api.push_message(
250
- event.source.user_id,
251
- [
252
- TextSendMessage(text="✨ 這是我為你生成的圖片喔~"),
253
- ImageSendMessage(original_content_url=image_url, preview_image_url=image_url)
254
- ]
255
- )
256
- else:
257
- line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
258
- except Exception as e:
259
- print(f"代理人執行出錯: {e}")
260
- out = f"代理人執行出錯!錯誤訊息:{e}"
261
- line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
262
 
263
  if __name__ == "__main__":
264
- uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)
 
# Standard library
import base64
import json
import os
from collections import defaultdict

# Third-party
import requests
import uvicorn
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageMessage

# Fail fast if any required credential is missing from the environment.
if not all(k in os.environ for k in ("CHANNEL_ACCESS_TOKEN", "CHANNEL_SECRET", "GOOGLE_API_KEY")):
    raise ValueError("Missing environment variables. Please set CHANNEL_ACCESS_TOKEN, CHANNEL_SECRET, and GOOGLE_API_KEY.")

# LINE Messaging API clients.
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])

# Per-user upload state: one bucket of base64-encoded images per garment
# type, plus which type ("upper" or "lower") the user is currently uploading.
user_states = defaultdict(lambda: {
    "upper_body_images": [],
    "lower_body_images": [],
    "current_mode": None,
})

# Images required per bucket before outfit advice is generated.
MAX_IMAGES = 6
GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent"
GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
 
32
  app = FastAPI()
 
33
 
 
34
  app.add_middleware(
35
  CORSMiddleware,
36
  allow_origins=["*"],
 
39
  allow_headers=["*"],
40
  )
41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
@app.get("/")
def root():
    """Health-check endpoint; returns the service title."""
    payload = {"title": "Line Bot 穿搭建議"}
    return payload
46
  @app.post("/webhook")
47
  async def webhook(
 
51
  ):
52
  body = await request.body()
53
  try:
54
+ background_tasks.add_task(line_handler.handle, body.decode("utf-8"), x_line_signature)
 
 
55
  except InvalidSignatureError:
56
  raise HTTPException(status_code=400, detail="Invalid signature")
57
  return "ok"
58
 
59
+ def get_base64_image(message_id: str):
60
+ message_content = line_bot_api.get_message_content(message_id)
61
+ image_bytes = message_content.content
62
+ return base64.b64encode(image_bytes).decode('utf-8')
63
+
64
+ def get_gemini_response(prompt: str, images: list):
65
+ payload = {
66
+ "contents": [
67
+ {
68
+ "parts": [
69
+ {"text": prompt}
70
+ ] + [
71
+ {"inlineData": {"mimeType": "image/jpeg", "data": img}} for img in images
72
+ ]
73
+ }
74
+ ]
75
+ }
76
+
77
+ response = requests.post(f"{GEMINI_API_URL}?key={GOOGLE_API_KEY}", json=payload)
78
+ if response.status_code == 200:
79
+ return response.json()['candidates'][0]['content']['parts'][0]['text']
80
+ else:
81
+ return f"Gemini API 請求失敗:{response.status_code}, {response.text}"
82
+
83
+ @line_handler.add(MessageEvent, message=TextMessage)
84
+ def handle_text_message(event):
85
  user_id = event.source.user_id
86
+ text = event.message.text.lower()
87
 
88
+ if text == "上衣":
89
+ user_states[user_id]["current_mode"] = "upper"
90
+ line_bot_api.reply_message(
91
+ event.reply_token,
92
+ TextSendMessage(text=f"請上傳三件上衣圖片,您已上傳 {len(user_states[user_id]['upper_body_images'])}/{MAX_IMAGES} 張。")
93
+ )
94
+ elif text == "褲子":
95
+ user_states[user_id]["current_mode"] = "lower"
96
+ line_bot_api.reply_message(
97
+ event.reply_token,
98
+ TextSendMessage(text=f"請上傳三件褲子/裙子圖片,您已上傳 {len(user_states[user_id]['lower_body_images'])}/{MAX_IMAGES} 張。")
99
+ )
100
+ elif text == "重置":
101
+ user_states[user_id] = defaultdict(lambda: {"upper_body_images": [], "lower_body_images": [], "current_mode": None})[user_id]
102
+ line_bot_api.reply_message(
103
+ event.reply_token,
104
+ TextSendMessage(text="狀態已重置。請傳送「上衣」或「褲子」來開始上傳。")
105
+ )
106
+ else:
107
+ line_bot_api.reply_message(
108
+ event.reply_token,
109
+ TextSendMessage(text="請先傳送「上衣」或「褲子」來選擇要上傳的衣服類型。")
110
+ )
111
+
def _build_outfit_prompt() -> str:
    """Compose the Traditional-Chinese styling prompt sent to Gemini."""
    return (
        f"我提供了 {MAX_IMAGES} 張上衣圖片和 {MAX_IMAGES} 張下半身圖片(共 {MAX_IMAGES*2} 張)。"
        "請根據這些衣服,為我推薦三種不同場合的穿搭建議,並盡可能將不同的上衣和下衣進行搭配。"
        "請考慮以下場合:1. 約會,2. 結婚典禮,3. 工作。\n\n"
        "對於每種場合,請提供一個簡短的段落說明,解釋為何這個搭配適合該場合,並詳細描述你推薦的上衣與下衣組合。請以繁體中文回答。"
    )


@line_handler.add(MessageEvent, message=ImageMessage)
def handle_image_message(event):
    """Collect clothing images per user; once both buckets hold MAX_IMAGES
    images, ask Gemini for outfit advice and push the answer to the user.

    Side effects: mutates ``user_states[user_id]`` and sends LINE messages.
    """
    user_id = event.source.user_id
    reply_token = event.reply_token
    state = user_states[user_id]
    mode = state["current_mode"]

    if not mode:
        line_bot_api.reply_message(
            reply_token,
            TextSendMessage(text="請先傳送「上衣」或「褲子」來選擇要上傳的衣服類型。")
        )
        return

    replied = False  # tracks whether reply_token has been consumed
    try:
        base64_img = get_base64_image(event.message.id)

        # Unified bucket handling (removes the duplicated upper/lower branches
        # of the original implementation).
        if mode == "upper":
            bucket_key = "upper_body_images"
            full_msg = "上衣數量已滿,請傳送「褲子」來上傳褲子圖片。"
        else:  # mode == "lower"
            bucket_key = "lower_body_images"
            full_msg = "褲子數量已滿,請傳送「上衣」來上傳上衣圖片。"

        if len(state[bucket_key]) >= MAX_IMAGES:
            line_bot_api.reply_message(reply_token, TextSendMessage(text=full_msg))
            return
        state[bucket_key].append(base64_img)

        upper_count = len(state["upper_body_images"])
        lower_count = len(state["lower_body_images"])

        if upper_count < MAX_IMAGES or lower_count < MAX_IMAGES:
            line_bot_api.reply_message(
                reply_token,
                TextSendMessage(text=f"已接收。上衣: {upper_count}/{MAX_IMAGES},褲子/裙子: {lower_count}/{MAX_IMAGES}。")
            )
            replied = True
        else:
            line_bot_api.reply_message(
                reply_token,
                TextSendMessage(text="已收到所有圖片,正在為您分析並產生穿搭建議... 請稍候。")
            )
            replied = True

            all_images = state["upper_body_images"] + state["lower_body_images"]
            response_text = get_gemini_response(_build_outfit_prompt(), all_images)

            # push_message, not reply_message: the reply token was already
            # consumed by the acknowledgement above.
            line_bot_api.push_message(user_id, TextSendMessage(text=response_text))

            # Reset state so the user can start a fresh round.
            state["upper_body_images"] = []
            state["lower_body_images"] = []
            state["current_mode"] = None

    except Exception as e:
        # BUGFIX: the original always replied with reply_token here, which
        # raises a second error if the token was already consumed above
        # (e.g. when push_message fails after the acknowledgement reply).
        error_message = TextSendMessage(text=f"圖片處理失敗,請稍後再試。錯誤:{e}")
        if replied:
            line_bot_api.push_message(user_id, error_message)
        else:
            line_bot_api.reply_message(reply_token, error_message)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
189
 
190
if __name__ == "__main__":
    # Local entry point; the PORT environment variable overrides the default.
    port = int(os.environ.get("PORT", 8080))
    uvicorn.run(app, host="0.0.0.0", port=port)