File size: 7,607 Bytes
de1e300
 
8ffc1e2
de1e300
 
 
 
 
 
6c1cb43
 
4a5e3f1
 
6c1cb43
 
de1e300
 
 
6c1cb43
8e6ac56
d390efa
6c1cb43
de1e300
 
 
6c1cb43
9a5e624
7951177
9a5e624
7951177
8e6ac56
 
 
de1e300
a412fae
 
de1e300
 
98dc21e
8e6ac56
55b1be9
de1e300
 
 
b59435f
98dc21e
55b1be9
 
de1e300
 
 
 
55b1be9
 
8e6ac56
 
 
eec3bfc
8e6ac56
 
 
 
 
 
 
 
 
de1e300
8e6ac56
 
 
 
 
 
 
 
 
 
 
 
de1e300
8e6ac56
8ffc1e2
8e6ac56
4176f0e
de1e300
 
8e6ac56
 
 
 
de1e300
8e6ac56
 
 
 
 
de1e300
8e6ac56
 
de1e300
eec3bfc
de1e300
8e6ac56
de1e300
8e6ac56
 
 
de1e300
8e6ac56
 
de1e300
a6fab3b
8ffc1e2
251fb1a
4176f0e
de1e300
552dc60
de1e300
cc84f2e
de1e300
8e6ac56
 
 
 
 
8ffc1e2
4176f0e
8e6ac56
de1e300
82a6007
de1e300
 
d201588
42558dd
d201588
b110f31
8e6ac56
 
 
 
 
55b1be9
6482098
de1e300
2cf7795
6482098
de1e300
0a52d39
3a2e91a
de1e300
3a2e91a
de1e300
0385c62
8ff7523
b92f1fc
8e6ac56
fbc391f
8e6ac56
8ffc1e2
eec3bfc
8e6ac56
 
 
8ffc1e2
8e6ac56
8ffc1e2
eec3bfc
de1e300
8e6ac56
de1e300
8e6ac56
 
8ffc1e2
eec3bfc
e4477cb
8e6ac56
de1e300
8e6ac56
 
 
1da4fc3
8ffc1e2
eec3bfc
8ffc1e2
de1e300
eec3bfc
 
 
 
8ffc1e2
eec3bfc
 
 
 
 
de1e300
eec3bfc
8ffc1e2
4123c19
3bf6983
eec3bfc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import os
import io
import re
import PIL.Image
import uvicorn
from collections import defaultdict
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

# Google GenAI SDK
from google import genai
from google.genai import types

# Line Bot
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import (
    MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, ImageMessage,
)

# LangChain imports (修正後)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
# 直接使用標準路徑,不再使用 try-except
# 從具體路徑匯入 AgentExecutor
from langchain.agents.agent import AgentExecutor
from langchain.agents import create_tool_calling_agent

# ==========================
#  環境設定與工具函式
# ==========================
# Credentials come from the environment. os.environ.get() yields None for a
# missing variable, so a misconfigured deployment surfaces inside the SDK
# constructors below rather than here.
google_api = os.environ.get("GOOGLE_API_KEY")
genai_client = genai.Client(api_key=google_api)

line_bot_api = LineBotApi(os.environ.get("CHANNEL_ACCESS_TOKEN"))
line_handler = WebhookHandler(os.environ.get("CHANNEL_SECRET"))

# In-memory, per-process log of messages keyed by LINE user id. Each entry is
# a {"type": ..., "content": ...} dict appended by store_user_message().
user_message_history = defaultdict(list)
app = FastAPI()

# Downloaded and generated images are written to ./static and served from
# /static. exist_ok=True replaces the former exists()-then-makedirs() pair,
# which could fail if the directory appeared between the check and the create.
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")

# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive CORS policy; confirm it is really needed for this service.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

def get_image_url_from_line(message_id):
    """Download the content of a LINE message into the static directory.

    Despite the name, the value returned is a local file path
    (``static/<message_id>.png``), not a URL.

    Args:
        message_id: Identifier of the LINE message whose content is fetched.

    Returns:
        The path of the written file, or ``None`` if anything went wrong
        (the error is printed, not raised).
    """
    try:
        content = line_bot_api.get_message_content(message_id)
        destination = f"static/{message_id}.png"
        with open(destination, "wb") as out_file:
            # Stream the body to disk chunk by chunk.
            out_file.writelines(content.iter_content())
    except Exception as e:
        print(f"❌ 圖片取得失敗:{e}")
        return None
    else:
        return destination

def store_user_message(user_id, message_type, message_content):
    """Append one message record to the in-memory history of ``user_id``.

    Args:
        user_id: Key under which the record is stored.
        message_type: Value stored under the ``"type"`` key.
        message_content: Value stored under the ``"content"`` key.
    """
    entry = {"type": message_type, "content": message_content}
    user_message_history[user_id].append(entry)

def get_previous_message(user_id):
    """Return the most recent stored message for ``user_id``.

    Args:
        user_id: Key to look up in the in-memory history.

    Returns:
        The last stored record, or a placeholder text record when the user
        has no stored messages.
    """
    # .get() does not trigger the defaultdict factory, so looking up an
    # unknown user leaves the history untouched.
    history = user_message_history.get(user_id)
    if not history:
        return {"type": "text", "content": "No message!"}
    return history[-1]

# ==========================
#  LangChain 工具定義
# ==========================

@tool
def generate_and_upload_image(prompt: str) -> str:
    """根據文字提示生成圖片。"""
    # NOTE: the docstring above is intentionally left unchanged; the @tool
    # decorator uses it as the tool description presented to the model.
    #
    # Generates an image from `prompt` with the Gemini image model, stores it
    # as a PNG under ./static with a random name and returns its public URL
    # (base taken from the HF_SPACE environment variable, defaulting to
    # http://localhost:7860). On any failure a human-readable error string is
    # returned instead of raising, so the agent can relay it to the user.
    try:
        response = genai_client.models.generate_content(
            model="gemini-2.5-flash-image",
            contents=prompt,
            config=types.GenerateContentConfig(response_modalities=['IMAGE'])
        )

        # A blocked or empty response may carry no candidates, no content or
        # no parts; treat all of those as "no data" rather than letting an
        # IndexError/TypeError text leak into the reply.
        candidates = response.candidates or []
        content = candidates[0].content if candidates else None
        parts = (content.parts if content else None) or []

        # First part that carries inline binary data, if any.
        image_binary = next(
            (part.inline_data.data for part in parts if part.inline_data),
            None,
        )
        if not image_binary:
            return "圖片生成失敗:模型未回傳數據。"

        file_name = f"static/{os.urandom(8).hex()}.png"
        # Re-encode through PIL so the stored file is always a real PNG; the
        # context manager releases the decoder as soon as the file is saved.
        with PIL.Image.open(io.BytesIO(image_binary)) as image:
            image.save(file_name, format="PNG")

        base_url = os.getenv("HF_SPACE", "http://localhost:7860").rstrip("/")
        return f"{base_url}/{file_name}"
    except Exception as e:
        return f"圖片生成失敗: {e}"

@tool
def analyze_image_with_text(image_path: str, user_text: str) -> str:
    """根據圖片路徑和文字提問進行分析。"""
    # NOTE: the docstring above is intentionally left unchanged; the @tool
    # decorator uses it as the tool description presented to the model.
    #
    # Opens the image at `image_path`, sends it together with `user_text` to
    # the Gemini model and returns the model's text answer. Every failure is
    # reported as a returned string rather than an exception.
    #
    # NOTE(review): `image_path` is chosen by the agent from user-influenced
    # text; consider restricting it to the static/ directory.
    try:
        if not os.path.exists(image_path):
            return "錯誤:找不到該圖片檔案。"
        # The context manager closes the underlying file handle once the
        # request has been sent; previously the handle was never closed.
        with PIL.Image.open(image_path) as img_user:
            response = genai_client.models.generate_content(
                model="gemini-2.5-flash",
                contents=[img_user, user_text]
            )
        return response.text if response.text else "Gemini 沒答案!"
    except Exception as e:
        return f"分析出錯: {e}"

# ==========================
#  LangChain 代理人設定
# ==========================
# Tools the agent is allowed to call.
tools = [generate_and_upload_image, analyze_image_with_text]
# Model choice: an earlier note states that gemini-3-flash-preview is usually
# not available in the production API, so gemini-2.5-flash is used here.
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.4)

# Prompt layout: a fixed system instruction (explain the reasoning steps
# before using a tool; give the URL directly when an image was generated),
# the user's input, then the "agent_scratchpad" placeholder.
# NOTE(review): the scratchpad is presumably filled by the agent with its
# intermediate tool calls/results -- confirm against the LangChain docs.
prompt_template = ChatPromptTemplate.from_messages([
    ("system", "你是一個強大的助理。在使用工具之前,請務必先說明你的思考步驟。如果生成了圖片,請直接給出 URL。"),
    ("user", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# Build the tool-calling agent and the executor used by handle_message().
agent = create_tool_calling_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=False, handle_parsing_errors=True)

# ==========================
#  FastAPI 路由
# ==========================

@app.get("/")
def root():
    # Liveness probe: always answers with a fixed status payload.
    status_payload = {"status": "running"}
    return status_payload

@app.post("/webhook")
async def webhook(request: Request, background_tasks: BackgroundTasks, x_line_signature=Header(None)):
    # LINE webhook entry point.
    #
    # Validates the X-Line-Signature header against the raw request body and
    # answers 400 when it is missing or wrong. Valid requests are handed to
    # line_handler.handle() as a background task so the HTTP response is not
    # delayed by the agent, and "ok" is returned immediately.
    #
    # The check must happen here: add_task() only schedules the handler, so
    # an InvalidSignatureError raised later inside the task can no longer be
    # turned into an HTTP 400.
    body = await request.body()
    try:
        body_text = body.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(status_code=400) from None

    # Reuse the SDK's own validator (built from CHANNEL_SECRET by the handler).
    validator = line_handler.parser.signature_validator
    if not x_line_signature or not validator.validate(body_text, x_line_signature):
        raise HTTPException(status_code=400)

    background_tasks.add_task(line_handler.handle, body_text, x_line_signature)
    return "ok"

@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
def handle_message(event):
    """Handle an incoming LINE image or text message.

    * Image message: the image is downloaded and remembered as the user's
      latest message, and the user is asked what to do with it. If the
      download fails the user is told so instead of receiving no reply.
    * Text message: if the user's latest stored message is an image, the
      text is treated as a question about that image (and the image record
      is consumed); otherwise the text is sent to the agent as-is. When the
      agent's answer contains an image URL, the image is sent back;
      otherwise the answer text is sent.

    Args:
        event: The LINE ``MessageEvent`` being handled.
    """
    user_id = event.source.user_id

    # 1. Image message
    if isinstance(event.message, ImageMessage):
        image_path = get_image_url_from_line(event.message.id)
        if image_path:
            store_user_message(user_id, "image", image_path)
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="收到圖片了!請問你想對這張圖做什麼分析?"))
        else:
            # Previously a failed download produced no reply at all.
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="抱歉,圖片接收失敗,請再傳一次。"))

    # 2. Text message
    elif isinstance(event.message, TextMessage):
        user_text = event.message.text
        previous_message = get_previous_message(user_id)

        if previous_message["type"] == "image":
            image_path = previous_message["content"]
            agent_input = {"input": f"請分析這張圖片 {image_path},問題是:{user_text}"}
            # Consume the image so the next text is not tied to it again.
            user_message_history[user_id].pop()
        else:
            agent_input = {"input": user_text}

        try:
            response = agent_executor.invoke(agent_input)
            out = response["output"]

            image_url = _extract_image_url(out)

            if image_url:
                line_bot_api.reply_message(
                    event.reply_token,
                    [
                        TextSendMessage(text="這是我為你生成的圖片:"),
                        ImageSendMessage(original_content_url=image_url, preview_image_url=image_url)
                    ]
                )
            else:
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
        except Exception as e:
            print(f"Agent Error: {e}")
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="抱歉,我現在無法處理這個請求。"))


# Characters that often directly follow a URL in prose or markdown (closing
# brackets, sentence punctuation, including full-width forms) but are not
# part of the URL itself.
_URL_TRAILING_PUNCTUATION = ".,;:!?)]}'。,;:!?)】」』"


def _extract_image_url(text):
    """Return the first URL in ``text`` that mentions .png/.jpg, else None."""
    for candidate in re.findall(r'https?://[^\s<>"]+|www\.[^\s<>"]+', text):
        # The pattern is greedy up to whitespace, so trim punctuation that
        # was glued to the end of the URL (e.g. "(…/a.png)" or "…/a.png。").
        url = candidate.rstrip(_URL_TRAILING_PUNCTUATION)
        if any(ext in url.lower() for ext in ('.png', '.jpg')):
            return url
    return None

if __name__ == "__main__":
    # Direct execution: serve the app on all interfaces, port 7860.
    uvicorn.run(
        app,
        port=7860,
        host="0.0.0.0",
    )