| import json, os |
| import gradio as gr |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status |
| import google.generativeai as genai |
| import base64 |
| from collections import defaultdict |
| from linebot import LineBotApi, WebhookHandler |
| from linebot.exceptions import InvalidSignatureError |
| from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, AudioMessage, ImageMessage |
| import PIL.Image |
| import tempfile |
|
|
| |
# Configure the Gemini SDK. The key is mandatory: a missing GOOGLE_API_KEY
# raises KeyError as soon as this module is imported.
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

# Sampling settings passed to the chat model created below.
generation_config = genai.types.GenerationConfig(
    max_output_tokens=1000,
    temperature=0.2,
    top_p=0.5,
    top_k=16,
)

# Chat model used for text and image questions. The system instruction tells
# the model to answer mainly in Traditional Chinese, to switch to English when
# asked in English, and to act as a lively, optimistic assistant.
model = genai.GenerativeModel(
    "gemini-2.0-flash-exp",
    system_instruction="主要用繁體中文回答,但如果用戶使用詢問英文問題,就用英文回應。你現在是個專業助理,職稱為OPEN小助理,個性活潑、樂觀,願意回答所有問題",
    generation_config=generation_config,
)

# LINE Messaging API client and webhook handler; both environment variables
# are mandatory (KeyError at import time when missing).
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])

# Whether the bot answers plain text messages. The historical variable name
# "DEFALUT_TALKING" is a misspelling, so the correctly spelled
# "DEFAULT_TALKING" is honoured first and the legacy spelling is kept as a
# fallback so existing deployments keep working. Defaults to enabled.
_talking_flag = os.getenv("DEFAULT_TALKING")
if _talking_flag is None:
    _talking_flag = os.getenv("DEFALUT_TALKING", default="true")
working_status = _talking_flag.lower() == "true"

app = FastAPI()

# NOTE(review): this allows every origin, method and header together with
# credentials. Confirm that such a permissive CORS policy is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
@app.get("/")
def root():
    """Return the static payload served at the root path."""
    payload = {"title": "Line Bot"}
    return payload
|
|
| |
@app.post("/webhook")
async def webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    x_line_signature=Header(None),
):
    """Receive a LINE webhook call and queue its events for processing.

    The signature is checked before anything is scheduled. Scheduling a
    background task never raises ``InvalidSignatureError`` itself, so
    validating only inside the task would answer "ok" to forged requests.

    Args:
        request: Incoming request; its raw body is the signed payload.
        background_tasks: FastAPI task queue that runs the LINE handler after
            the response has been sent.
        x_line_signature: Value of the ``X-Line-Signature`` header, or
            ``None`` when the header is absent.

    Returns:
        The string ``"ok"`` once the payload has been accepted.

    Raises:
        HTTPException: 400 when the body is not valid UTF-8, or when the
            signature is missing or does not match the body.
    """
    raw_body = await request.body()
    try:
        body = raw_body.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="Invalid request body")

    # Reject a missing header first: only a real signature string is handed
    # to the SDK validator.
    signature_ok = bool(x_line_signature) and (
        line_handler.parser.signature_validator.validate(body, x_line_signature)
    )
    if not signature_ok:
        raise HTTPException(status_code=400, detail="Invalid signature")

    background_tasks.add_task(line_handler.handle, body, x_line_signature)
    return "ok"
| |
| |
| |
| |
def upload_image_to_imgur_with_token(image_path, access_token):
    """Upload an image file to Imgur using an OAuth2 access token.

    Args:
        image_path: Path of the image file to read and upload.
        access_token: Imgur OAuth2 access token, sent as a Bearer credential.

    Returns:
        The ``link`` field of Imgur's JSON response when the request returns
        HTTP 200; ``None`` when the token is missing, the file cannot be
        read, the request fails, or Imgur answers with another status.
    """
    # Without a token the header would literally read "Bearer None"; skip the
    # pointless request and report the failure right away.
    if not access_token:
        print("Imgur 上傳失敗:", "missing access token")
        return None

    try:
        with open(image_path, "rb") as img_file:
            image_binary = img_file.read()

        # The module has no top-level httpx import, so the name is brought
        # into scope here. Without it every upload died with a NameError that
        # the broad handler below silently turned into None.
        import httpx

        headers = {"Authorization": f"Bearer {access_token}"}
        data = {
            "image": base64.b64encode(image_binary).decode("utf-8"),
            "type": "base64",
        }

        response = httpx.post(
            "https://api.imgur.com/3/image", headers=headers, data=data
        )
        if response.status_code == 200:
            return response.json()["data"]["link"]

        print("Imgur 上傳失敗:", response.text)
        return None
    except Exception as e:
        # Best effort by design: callers treat None as "upload failed".
        print("圖片上傳例外:", e)
        return None
|
|
| |
| |
| |
| |
def get_image_url(message_id):
    """Save the content of a LINE message to a local file.

    Despite the name, the return value is a local file path, not a URL.

    Args:
        message_id: Identifier of the LINE message whose content is fetched.

    Returns:
        The path of the written file, or ``None`` when fetching or writing
        fails (the error is printed).
    """
    try:
        content = line_bot_api.get_message_content(message_id)
        destination = f"/tmp/{message_id}.png"
        with open(destination, "wb") as out_file:
            # Write the content chunk by chunk as the SDK yields it.
            out_file.writelines(content.iter_content())
    except Exception as e:
        print(f"Error getting image: {e}")
        return None
    else:
        return destination
|
|
| |
# Per-user message log: maps a user id to the list of records stored so far.
user_message_history = defaultdict(list)


def store_user_message(user_id, message_type, message_content):
    """Append one message record to the user's in-memory history.

    Args:
        user_id: Key identifying the user whose history is extended.
        message_type: Value stored under the record's ``"type"`` key.
        message_content: Value stored under the record's ``"content"`` key.
    """
    record = {"type": message_type, "content": message_content}
    user_message_history[user_id].append(record)
| |
def analyze_with_gemini(image_path, user_text, chat=None):
    """Ask Gemini a question about an image and return its answer.

    Args:
        image_path: Path of the image file to send along with the question.
        user_text: The user's question about the image.
        chat: Optional chat session to send the message through. When
            omitted, a fresh session is started from the module-level
            ``model``. Previously the function used a name ``chat`` that was
            never defined in its scope, so it could only return an error.

    Returns:
        The response text, or a string starting with ``"發生錯誤: "`` that
        describes the failure. Exceptions are never propagated.
    """
    try:
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"圖片路徑無效:{image_path}")

        session = chat if chat is not None else model.start_chat(history=[])

        # Open the image in a context manager so its file handle is released
        # once the message has been sent.
        with PIL.Image.open(image_path) as organ:
            response = session.send_message([user_text, organ])

        return response.text
    except Exception as e:
        return f"發生錯誤: {e}"
| |
|
|
def get_previous_message(user_id):
    """Return the most recent record stored for a user.

    Args:
        user_id: Key identifying the user in ``user_message_history``.

    Returns:
        The last record in the user's history, or ``None`` when the user has
        no stored records.
    """
    # ``.get`` is used instead of indexing so that looking up an unknown user
    # does not create an empty history entry.
    history = user_message_history.get(user_id)
    return history[-1] if history else None
|
|
| |
| |
| |
| |
# One Gemini chat session per LINE user id, kept for the life of the process.
chat_sessions = {}

# A text message starting with this prefix requests image generation.
_IMAGE_PROMPT_PREFIX = "請幫我生成圖片"


def _reply_text(reply_token, text):
    """Reply to ``reply_token`` with a single plain-text message."""
    line_bot_api.reply_message(reply_token, TextSendMessage(text=text))


def _generate_image_url(prompt):
    """Generate an image for ``prompt`` and return its hosted URL, or None."""
    image_model = genai.GenerativeModel("gemini-2.0-flash-exp-image-generation")
    response = image_model.generate_content(prompt)

    # Take the first part that actually carries inline data, rather than
    # assuming it is always the first part of the response.
    image_data = None
    for part in response.parts:
        blob = getattr(part, "inline_data", None)
        if blob is not None and blob.data:
            image_data = blob.data
            break
    if not image_data:
        return None

    # The uploader expects a file path, so the generated payload is written
    # to a temporary file first.
    # NOTE(review): assumes inline_data.data holds the raw image bytes - confirm.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_file:
        tmp_file.write(image_data)
        tmp_path = tmp_file.name
    try:
        return upload_image_to_imgur_with_token(
            tmp_path, os.environ.get("IMGUR_ACCESS_TOKEN")
        )
    finally:
        os.remove(tmp_path)


@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
def handle_image_message(event):
    """Handle an incoming LINE image or text message.

    Behaviour by message kind:

    * Image: the content is saved locally, remembered as the user's latest
      message, and the user is asked to type a question about it.
    * Text starting with the image-generation prefix: an image is generated,
      uploaded, and sent back; any failure produces a failure reply.
    * Text right after an image: the text and the saved image are sent to the
      user's chat session together.
    * Any other text: "再見" gets a fixed goodbye; otherwise the text is sent
      to the chat session when ``working_status`` is enabled.

    Args:
        event: The LINE ``MessageEvent`` delivered by the webhook handler.
    """
    user_id = event.source.user_id
    chat = chat_sessions.get(user_id) or model.start_chat(history=[])
    chat_sessions[user_id] = chat

    message = event.message
    reply_token = event.reply_token

    if message.type == "image":
        image_path = get_image_url(message.id)
        if image_path:
            store_user_message(user_id, "image", image_path)
            _reply_text(reply_token, "圖片已接收成功囉,幫我輸入你想詢問的問題喔~")
        else:
            _reply_text(reply_token, "沒有接收到圖片~")
        return

    if message.type != "text":
        _reply_text(reply_token, "請輸入文字或圖片~")
        return

    user_text = message.text

    if user_text and user_text.startswith(_IMAGE_PROMPT_PREFIX):
        prompt = user_text.replace(_IMAGE_PROMPT_PREFIX, "").strip()
        try:
            image_url = _generate_image_url(prompt)
        except Exception as e:
            # Without this, a generation error left the user with no reply.
            print(f"Image generation failed: {e}")
            image_url = None

        if image_url:
            line_bot_api.reply_message(
                reply_token,
                [
                    TextSendMessage(text="這是我為你生成的圖片喔~ ✨"),
                    ImageSendMessage(
                        original_content_url=image_url,
                        preview_image_url=image_url,
                    ),
                ],
            )
        else:
            _reply_text(reply_token, "圖片生成失敗,請稍後再試~")
        return

    previous_message = get_previous_message(user_id)
    if previous_message and previous_message["type"] == "image":
        # The text is a question about the image the user sent just before.
        image_path = previous_message["content"]
        store_user_message(user_id, "text", user_text)
        try:
            if not os.path.exists(image_path):
                raise FileNotFoundError(f"圖片路徑無效:{image_path}")
            # Close the image file once the message has been sent.
            with PIL.Image.open(image_path) as organ:
                completion = chat.send_message([user_text, organ])
            out = completion.text or "我不太懂什麼意思也~"
        except Exception as e:
            out = f"發生錯誤: {e}"
    else:
        if user_text == "再見":
            _reply_text(reply_token, "Bye!")
            return
        if not working_status:
            # Chat replies are disabled, so there is nothing to send. The
            # original fell through here with ``out`` unbound.
            return
        try:
            store_user_message(user_id, "text", user_text)
            completion = chat.send_message(user_text)
            out = completion.text if completion.text else "我不太懂什麼意思也~"
        except Exception as e:
            # Log the cause instead of hiding it behind a bare except.
            print(f"Chat request failed: {e}")
            out = "執行出錯!請換個說法!"

    _reply_text(reply_token, out)
|
|
|
|
if __name__ == "__main__":
    # The file has no top-level uvicorn import, so running it as a script
    # raised NameError. It is only needed here, so import it locally.
    import uvicorn

    # Serve the app on all interfaces, port 7860, with auto-reload enabled.
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |