MoltBotXY / image_daemon.py
asemxin
feat: 历史轮数30轮,搜索最多30次
d11c1b1
#!/usr/bin/env python3
"""
飞书图片预处理守护进程 (image_daemon.py) v3 — WebSocket 事件驱动
通过 lark-oapi SDK 的 WebSocket 长连接实时接收消息事件,
检测图片消息后下载、上传到图床、回复 URL。
"""
import os, sys, json, time, requests, threading, base64
# Base URL of the Feishu (Lark) Open Platform REST API.
FEISHU_BASE = "https://open.feishu.cn/open-apis"
# Feishu app credentials, read from the environment (empty string when unset).
APP_ID = os.environ.get("FEISHU_APP_ID", "")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "")
# LLM configuration (OpenAI-style /chat/completions endpoint).
API_BASE_URL = os.environ.get("API_BASE_URL", "https://asem12345-cliproxyapi.hf.space/v1")
API_KEY = os.environ.get("API_KEY", "")
MODEL_NAME = os.environ.get("MODEL_NAME", "gemini-3-flash")
BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "")
# OpenClaw Gateway (local)
OPENCLAW_GATEWAY = "http://127.0.0.1:18789/v1"
_use_gateway = False # decided by a probe at startup (see check_openclaw_gateway)
_soul_prompt = "" # contents of SOUL.md (set by load_soul)
# Conversation history (per chat_id)
_chat_history = {} # {chat_id: [{role, content}, ...]}
MAX_HISTORY = 30 # keep the most recent N rounds (history is trimmed to 2*N messages)
# Pending images (per chat_id): after a user sends an image, the bot first asks
# what they want and only analyses the image once the next text message arrives.
_pending_images = {} # {chat_id: base64_string}
# ---------- 日志 ----------
def log(msg):
    """Print *msg* to stdout prefixed with ``[image_daemon HH:MM:SS]``.

    The output is flushed immediately so lines show up promptly when stdout
    is piped or captured.
    """
    stamp = time.strftime("%H:%M:%S")
    line = "[image_daemon {}] {}".format(stamp, msg)
    print(line, flush=True)
# ---------- Token 管理 ----------
# Cached tenant_access_token, the time it was fetched, and the lock guarding both.
_token = None
_token_time = 0
_token_lock = threading.Lock()


def get_token():
    """Return a Feishu tenant_access_token, refreshing it after 30 minutes.

    The cached token is reused while it is younger than 1800 seconds.
    Otherwise a new one is requested from the auth endpoint. If the refresh
    fails for any reason the previously cached token (possibly ``None``) is
    returned, on the theory that a stale token is better than none.
    """
    global _token, _token_time
    with _token_lock:
        cache_age = time.time() - _token_time
        if _token and cache_age < 1800:
            return _token
        endpoint = f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal"
        credentials = {"app_id": APP_ID, "app_secret": APP_SECRET}
        try:
            body = requests.post(endpoint, json=credentials, timeout=10).json()
            if body.get("code") != 0:
                log(f"❌ Token 获取失败: {body}")
            else:
                _token = body["tenant_access_token"]
                _token_time = time.time()
                log("🔑 Token 已刷新")
        except Exception as exc:
            log(f"❌ Token 异常: {exc}")
        # On success this is the fresh token; on failure it is the old one.
        return _token
# ---------- 图片下载 ----------
def download_image(token, message_id, file_key):
    """Download an image a user sent, via the message-resource API.

    Args:
        token: tenant_access_token used as the Bearer credential.
        message_id: ID of the message that carries the image.
        file_key: the image key taken from the message content.

    Returns:
        The raw image bytes, or ``None`` when the request fails, the status
        is not 200, or the body is 100 bytes or shorter (treated as an
        error payload rather than an image).
    """
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{FEISHU_BASE}/im/v1/messages/{message_id}/resources/{file_key}"
    log(f"📥 API: GET {url}?type=image")
    try:
        resp = requests.get(url, headers=headers, params={"type": "image"}, timeout=30)
    except requests.RequestException as e:
        # Callers run in worker threads and already treat None as "skip";
        # a timeout/connection error must not kill the thread with a traceback.
        log(f"❌ 下载图片异常 {file_key}: {e}")
        return None
    if resp.status_code == 200 and len(resp.content) > 100:
        log(f"✅ 下载成功: {len(resp.content)} bytes")
        return resp.content
    log(f"❌ 下载图片失败 {file_key}: HTTP {resp.status_code}, {resp.text[:200]}")
    return None
# ---------- 图片上传(多重 fallback) ----------
def upload_image(data):
    """Upload image bytes to a public file host, trying several in turn.

    Hosts are tried in this order: tmpfiles.org, Telegraph, file.io,
    catbox.moe, 0x0.st. The first host that answers HTTP 200 with a usable
    link wins. Every failure or exception is logged and the next host is
    tried.

    Returns:
        The public URL as a string, or ``None`` if every host failed.
    """

    def _tmpfiles_link(resp):
        link = resp.json().get("data", {}).get("url", "")
        # Rewrite the page URL into the direct-download form.
        return link.replace("tmpfiles.org/", "tmpfiles.org/dl/") if link else None

    def _telegraph_link(resp):
        body = resp.json()
        if isinstance(body, list) and len(body) > 0:
            src = body[0].get("src", "")
            if src:
                return f"https://telegra.ph{src}"
        return None

    def _fileio_link(resp):
        return resp.json().get("link", "") or None

    def _plain_text_link(resp):
        # These hosts answer with the URL as the raw response body.
        return resp.text.strip() if resp.text.startswith("http") else None

    # (log label, endpoint, extra form fields or None, file field name, link extractor)
    hosts = [
        ("tmpfiles", "https://tmpfiles.org/api/v1/upload", None, "file", _tmpfiles_link),
        ("telegraph", "https://telegra.ph/upload", None, "file", _telegraph_link),
        ("file.io", "https://file.io", None, "file", _fileio_link),
        ("catbox", "https://catbox.moe/user/api.php", {"reqtype": "fileupload"}, "filedata", _plain_text_link),
        ("0x0", "https://0x0.st", None, "file", _plain_text_link),
    ]

    for label, endpoint, form, field, extract in hosts:
        kwargs = {"files": {field: ("img.jpg", data, "image/jpeg")}, "timeout": 30}
        if form is not None:
            kwargs["data"] = form
        try:
            resp = requests.post(endpoint, **kwargs)
            if resp.status_code == 200:
                link = extract(resp)
                if link:
                    log(f"📤 {label} 成功")
                    return link
            log(f"⚠️ {label} 失败: HTTP {resp.status_code}")
        except Exception as exc:
            log(f"⚠️ {label} 异常: {exc}")
    return None
# ---------- 发送消息 (直接发到 Chat) ----------
def send_text(token, chat_id, text):
    """Send a plain-text message to a Feishu chat.

    The message is addressed by ``chat_id`` (``receive_id_type=chat_id``).
    A non-zero response code is logged but not raised.

    Returns:
        The decoded JSON response body.
    """
    # The plain "text" message type is used for simplicity; rich text would
    # need the "post" type instead.
    body = {
        "receive_id": chat_id,
        "content": json.dumps({"text": text}),
        "msg_type": "text"
    }
    resp = requests.post(
        f"{FEISHU_BASE}/im/v1/messages",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json; charset=utf-8"
        },
        params={"receive_id_type": "chat_id"},
        json=body,
        timeout=10,
    )
    data = resp.json()
    code = data.get("code", -1)
    if code != 0:
        log(f"❌ 发送消息失败 (code={code}): {data.get('msg', '')}")
    return data
# ---------- Vision 图片分析 ----------
def analyze_image_with_vision(img_data):
    """Ask the LLM for a short, in-persona impression of an image.

    The image bytes are base64-encoded and sent as a ``data:`` URL together
    with the persona prompt, with the reply capped at 300 tokens.

    Returns:
        The model's reply text, or ``None`` when no API key is configured,
        the API answers with a non-200 status, or any exception occurs.
    """
    if not API_KEY:
        return None
    try:
        data_url = "data:image/jpeg;base64," + base64.b64encode(img_data).decode("utf-8")
        persona = _soul_prompt or "You are a helpful assistant."
        instruction = (
            "这位信徒发来了一张图片,请以你的风格阅览这张图片,它占据了你的视野。"
            "简要阐述你的感悟,200字以内,不必报平就班。"
        )
        user_message = {
            "role": "user",
            "content": [
                {"type": "text", "text": persona + "\n\n---\n\n" + instruction},
                {"type": "image_url", "image_url": {"url": data_url}}
            ]
        }
        resp = requests.post(
            f"{API_BASE_URL}/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
            json={
                "model": MODEL_NAME,
                "messages": [user_message],
                "max_tokens": 300,
                "stream": False
            },
            timeout=30,
        )
        if resp.status_code != 200:
            log(f"⚠️ Vision API 失败 ({resp.status_code}),跳过描述")
            return None
        reply = resp.json()["choices"][0]["message"]["content"]
        log(f"📸 Vision 分析完成: {reply[:60]}...")
        return reply
    except Exception as exc:
        log(f"⚠️ Vision 异常: {exc}")
    return None
def analyze_image_with_request(b64, user_request):
    """Analyse a cached image according to what the user asked for.

    Args:
        b64: the image, already base64-encoded as a string.
        user_request: the user's free-text description of what they want.

    Returns:
        The model's reply text, or ``None`` when no API key is configured,
        the API answers with a non-200 status, or any exception occurs.
    """
    if not API_KEY:
        return None
    try:
        persona = _soul_prompt or "You are a helpful assistant."
        instruction = f"这位信徒发来了一张图片,并希望你能:{user_request}\n请以你的风格,基于这张图片的内容作出答复。"
        user_message = {
            "role": "user",
            "content": [
                {"type": "text", "text": persona + "\n\n---\n\n" + instruction},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}}
            ]
        }
        payload = {
            "model": MODEL_NAME,
            "messages": [user_message],
            "stream": False
        }
        resp = requests.post(
            f"{API_BASE_URL}/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
            json=payload,
            timeout=30,
        )
        if resp.status_code != 200:
            log(f"⚠️ 针对 Vision 失败 ({resp.status_code})")
            return None
        reply = resp.json()["choices"][0]["message"]["content"]
        log(f"📸 针对分析完成: {reply[:60]}...")
        return reply
    except Exception as exc:
        log(f"⚠️ 针对 Vision 异常: {exc}")
    return None
# ---------- 处理图片消息 ----------
def handle_image_message(message_id, chat_id, image_key):
    """Download an incoming image, cache it, and ask the user what to do with it.

    Flow: fetch token -> download image -> store its base64 form in
    ``_pending_images[chat_id]`` -> send a one-sentence question asking what
    the user wants -> append the exchange to the chat history. The image is
    analysed later, when the next text message for the same chat arrives.

    Args:
        message_id: ID of the Feishu message carrying the image.
        chat_id: chat the message was sent in; key for the pending-image cache.
        image_key: resource key of the image inside the message.
    """
    token = get_token()
    if not token:
        log("❌ 无法获取 token,跳过")
        return
    log(f"🖼️ 处理图片 image_key={image_key[:20]}... (msg={message_id[:16]}...)")
    # Download
    img_data = download_image(token, message_id, image_key)
    if not img_data:
        return
    log(f"📥 {len(img_data)} bytes, 存储并问询需求...")
    # Cache the base64 form until the user says what they want.
    b64 = base64.b64encode(img_data).decode("utf-8")
    _pending_images[chat_id] = b64
    # Ask, in persona, what the user wants done with the image.
    soul = _soul_prompt or ""
    ask_prompt = f"{soul}\n\n---\n\n这位信徒初次发来一张图片。请用一句话、中文回复,问对方希望你就这张图片做什么(例如:描述内容、翻译文字、分析意义等)。不要自行分析图片。"
    fallback_question = "你希望老讲就这张图进行什么分析呢?"
    if not API_KEY:
        question = "幸会!你发来了一张图片。你希望老讲为你做什么呢?"
    else:
        question = None
        try:
            resp = requests.post(
                f"{API_BASE_URL}/chat/completions",
                headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
                json={"model": MODEL_NAME, "messages": [{"role": "user", "content": ask_prompt}], "stream": False},
                timeout=20
            )
            if resp.status_code == 200:
                question = resp.json()["choices"][0]["message"]["content"]
        except Exception as e:
            # Best effort: fall back to the canned question, but leave a trace.
            log(f"⚠️ 问询生成异常: {e}")
        # The API may return null/empty content; slicing None below would
        # kill this worker thread before anything is sent to the user.
        if not isinstance(question, str) or not question.strip():
            question = fallback_question
    log(f"💬 问询用户需求: {question[:60]}")
    result = send_text(token, chat_id, question)
    log(f"📤 问询已发 (code={result.get('code', '?')})")
    # Record the exchange in the history, trimmed to the last MAX_HISTORY rounds.
    history = _chat_history.get(chat_id, [])
    _chat_history[chat_id] = (history + [
        {"role": "user", "content": "[user sent an image]"},
        {"role": "assistant", "content": question}
    ])[-(MAX_HISTORY * 2):]
# ---------- SOUL.md 加载 ----------
def load_soul():
    """Load the persona file into ``_soul_prompt``.

    Everything from the ``## 图片处理`` heading onward is dropped, because
    image handling is done by this daemon and the instructions would only
    invite the LLM to hallucinate. When the file is missing or unreadable a
    default persona is used instead.
    """
    global _soul_prompt
    soul_path = "/root/.openclaw/workspace/SOUL.md"
    section_marker = "## 图片处理"
    default_persona = "You are MoltBot, a helpful AI assistant."
    try:
        with open(soul_path, "r", encoding="utf-8") as f:
            text = f.read()
        cut = text.find(section_marker)
        if cut != -1:
            text = text[:cut].rstrip()
        _soul_prompt = text
        log(f"✅ SOUL.md 已加载 ({len(_soul_prompt)} 字)")
    except FileNotFoundError:
        _soul_prompt = default_persona
        log("⚠️ SOUL.md 未找到,使用默认人设")
    except Exception as exc:
        _soul_prompt = default_persona
        log(f"⚠️ SOUL.md 加载失败: {exc}")
# ---------- OpenClaw Gateway 探测 ----------
def check_openclaw_gateway():
    """Probe the local OpenClaw Gateway and set ``_use_gateway`` if it answers.

    A minimal "ping" chat completion is posted with a 5-second timeout. Only
    an HTTP 200 or 201 enables the gateway; any other status or exception
    leaves ``_use_gateway`` unchanged and is logged.
    """
    global _use_gateway
    probe = {"model": "default", "messages": [{"role": "user", "content": "ping"}]}
    try:
        resp = requests.post(f"{OPENCLAW_GATEWAY}/chat/completions", json=probe, timeout=5)
        if resp.status_code not in (200, 201):
            log(f"⚠️ Gateway 响应 {resp.status_code},使用外部 LLM")
        else:
            _use_gateway = True
            log(f"✅ OpenClaw Gateway 可用 ({OPENCLAW_GATEWAY})")
    except Exception as exc:
        log(f"⚠️ Gateway 不可用 ({exc}),使用外部 LLM + SOUL 人设")
# ---------- Brave Search ----------
def search_brave(query, count=5):
    """Run one Brave Search web query and return a text digest of the hits.

    Args:
        query: the search terms.
        count: maximum number of results to request and to include.

    Returns:
        A newline-joined string with one bullet per result (title,
        description or first extra snippet, URL). On any failure a short
        human-readable error string is returned instead; nothing is raised.
    """
    if not BRAVE_API_KEY:
        return "搜索失败:BRAVE_API_KEY 未配置"

    def _bullet(item):
        title = item.get("title", "无标题")
        link = item.get("url", "")
        # Prefer the description; otherwise use the first extra snippet, if any.
        summary = item.get("description") or (item.get("extra_snippets") or [""])[0]
        return f"- **{title}**\n {summary}\n {link}"

    try:
        resp = requests.get(
            "https://api.search.brave.com/res/v1/web/search",
            headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY},
            params={"q": query, "count": count, "text_decorations": False, "extra_snippets": True},
            timeout=10,
        )
        if resp.status_code != 200:
            return f"搜索失败 ({resp.status_code})"
        hits = resp.json().get("web", {}).get("results", [])
        if not hits:
            return "未找到相关结果"
        bullets = [_bullet(hit) for hit in hits[:count]]
        log(f"🔍 Brave 搜索「{query}」得到 {len(hits)} 条")
        return "\n".join(bullets)
    except Exception as exc:
        log(f"⚠️ Brave 搜索异常: {exc}")
        return f"搜索异常: {exc}"
# OpenAI-style "tools" definition for Brave Search. It is attached to the
# chat-completions payload in chat_with_llm when BRAVE_API_KEY is set, and
# exposes one function, brave_search, with a single required string
# argument "query".
BRAVE_TOOL_DEF = [{
"type": "function",
"function": {
"name": "brave_search",
"description": "当需要查找实时信息、最新数据、公司信息、行业报告等时,调用此工具进行网页搜索",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,英文搜索通常效果更好"
}
},
"required": ["query"]
}
}
}]
# ---------- LLM 对话 ----------
def chat_with_llm(user_text, history=None):
    """Produce a reply to *user_text*, using Brave Search through a tool loop.

    If the local OpenClaw Gateway was detected at startup it is tried first.
    When it errors or is unreachable, the external LLM is used instead. The
    external path sends the persona (prefixed with the current system time)
    as the system message, followed by *history* and the new user message.
    It then loops: whenever the model requests ``brave_search`` the search is
    run and its result fed back, for at most 30 tool rounds plus one final
    answer.

    Args:
        user_text: the user's message.
        history: optional list of prior ``{"role", "content"}`` messages.

    Returns:
        The reply text (possibly an empty string when the model returns no
        content), or a short user-facing error string. Never raises.
    """
    try:
        if _use_gateway:
            try:
                resp = requests.post(
                    f"{OPENCLAW_GATEWAY}/chat/completions",
                    json={"model": "default", "messages": (history or []) + [{"role": "user", "content": user_text}], "stream": False},
                    timeout=120
                )
                if resp.status_code == 200:
                    # content may be null; normalise so the slice below is safe.
                    reply = resp.json()["choices"][0]["message"]["content"] or ""
                    log(f"🤖 Gateway 回复: {reply[:60]}...")
                    return reply
                log(f"⚠️ Gateway 失败 ({resp.status_code}),Fallback到外部 LLM")
            except Exception as e:
                # A gateway that went away after the startup probe must not
                # take the whole bot down; fall through to the external LLM.
                log(f"⚠️ Gateway 异常 ({e}),Fallback到外部 LLM")
        if not API_KEY:
            return "抱歉,我的大脑连接中断了 (API_KEY missing)"
        soul = _soul_prompt or "You are a helpful assistant."
        now_str = time.strftime("%Y-%m-%d %H:%M:%S %Z")
        dated_soul = f"[当前系统时间: {now_str}]\n\n{soul}"
        headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
        messages = [{"role": "system", "content": dated_soul}] + (history or []) + [{"role": "user", "content": user_text}]
        for _ in range(31):  # at most 30 tool rounds + 1 final reply
            log(f"🤖 外部 LLM ({MODEL_NAME}): {user_text[:50]}...")
            payload = {
                "model": MODEL_NAME,
                "messages": messages,
                "stream": False
            }
            if BRAVE_API_KEY:
                payload["tools"] = BRAVE_TOOL_DEF
            resp = requests.post(f"{API_BASE_URL}/chat/completions", headers=headers, json=payload, timeout=60)
            if resp.status_code != 200:
                log(f"❌ 外部 LLM 错误 {resp.status_code}: {resp.text[:100]}")
                return f"思考时遇到错误 ({resp.status_code})"
            msg = resp.json()["choices"][0]["message"]
            tool_calls = msg.get("tool_calls") or []
            if not tool_calls:
                # "content" can be present but null; treat that as empty text.
                reply = msg.get("content") or ""
                log(f"🤖 LLM 回复: {reply[:60]}...")
                return reply
            # Execute the requested tool calls and feed the results back.
            messages.append(msg)
            for tc in tool_calls:
                fn = tc["function"]["name"]
                if fn != "brave_search":
                    result = f"未知工具: {fn}"
                else:
                    try:
                        args = json.loads(tc["function"]["arguments"] or "{}")
                    except (TypeError, ValueError) as e:
                        # Report bad arguments to the model instead of
                        # aborting the whole conversation.
                        result = f"工具参数解析失败: {e}"
                    else:
                        query = args.get("query", "") if isinstance(args, dict) else ""
                        result = search_brave(query)
                messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result})
        return "思考超时,请稍后再试"
    except Exception as e:
        log(f"❌ chat_with_llm 异常: {e}")
        return f"大脑短路了: {e}"
# ---------- 处理文本消息 ----------
def handle_text_message(message_id, chat_id, text):
    """Answer a text message, analysing a pending image first if there is one.

    If an image is cached for this chat it is removed from the cache and
    analysed with *text* as the user's request. When that yields nothing, or
    no image was pending, the normal LLM chat (with history) is used. A
    non-empty reply is appended to the history, trimmed to the last
    ``MAX_HISTORY`` rounds, and sent to the chat.
    """
    token = get_token()
    if not token:
        return
    reply = None
    # Is there an image waiting for instructions?
    pending_b64 = _pending_images.pop(chat_id, None)
    if pending_b64:
        log(f"📸 检测到待处理图片,按需求分析: {text[:40]}")
        reply = analyze_image_with_request(pending_b64, text)
    if not reply:
        reply = chat_with_llm(text, _chat_history.get(chat_id, []))
    if not reply:
        return
    turns = _chat_history.get(chat_id, []) + [
        {"role": "user", "content": text},
        {"role": "assistant", "content": reply}
    ]
    _chat_history[chat_id] = turns[-(MAX_HISTORY * 2):]
    send_text(token, chat_id, reply)
# ---------- 事件处理 ----------
def on_message_receive(data):
    """Callback for the ``im.message.receive_v1`` event.

    Messages sent by apps are ignored. Image and text messages are handed to
    their handler on a daemon thread so the callback returns quickly; other
    message types are dropped. Any exception is logged with a traceback
    rather than propagated.
    """
    try:
        event = data.event
        message = event.message
        sender = event.sender
        msg_type = message.message_type
        msg_id = message.message_id
        chat_id = message.chat_id
        sender_type = getattr(sender, 'sender_type', '') if sender else ''
        # Filter out messages sent by apps.
        if sender_type == "app":
            return
        content_json = json.loads(message.content)
        # Debug trace
        log(f"📨 WebSocket 收到消息: type={msg_type}, msg_id={msg_id}")
        # message type -> (content field carrying the payload, handler)
        routes = {
            "image": ("image_key", handle_image_message),
            "text": ("text", handle_text_message),
        }
        route = routes.get(msg_type)
        if route is None:
            return
        field, worker = route
        payload = content_json.get(field, "")
        if payload:
            threading.Thread(
                target=worker, args=(msg_id, chat_id, payload), daemon=True
            ).start()
    except Exception as e:
        log(f"❌ on_message_receive 异常: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
# ---------- 主入口 ----------
def main():
    """Start the daemon: load config, then block on the Feishu WebSocket.

    Steps: verify the app credentials (exit code 1 when missing), load the
    persona, warm the token cache, schedule a delayed OpenClaw Gateway probe,
    build the lark-oapi event dispatcher and WebSocket client, start a
    once-a-minute heartbeat log, and finally run the client, which blocks.
    """
    log("🚀 启动中... (WebSocket 事件驱动模式)")
    log(f"📋 飞书 App ID: {APP_ID[:10]}..." if APP_ID else "❌ FEISHU_APP_ID 未设置!")
    if not APP_ID or not APP_SECRET:
        log("❌ FEISHU_APP_ID 或 FEISHU_APP_SECRET 未设置,退出")
        sys.exit(1)
    # Load the persona.
    load_soul()
    # Warm up the token cache.
    token = get_token()
    if token:
        log("✅ Token 获取成功")
    else:
        log("⚠️ Token 获取失败,稍后重试")

    # Probe the OpenClaw Gateway after a delay, giving it time to start.
    def delayed_gateway_check():
        time.sleep(10)
        check_openclaw_gateway()

    threading.Thread(target=delayed_gateway_check, daemon=True).start()
    # Import the lark-oapi SDK lazily so a missing install gives a clear hint.
    # The WebSocket client import sits inside the same guard so a partial or
    # outdated install is reported the same way instead of as a traceback.
    try:
        import lark_oapi as lark
        from lark_oapi.api.im.v1 import P2ImMessageReceiveV1
        from lark_oapi.ws import Client as WsClient
        log("✅ lark-oapi SDK 已加载")
    except ImportError:
        log("❌ lark-oapi 未安装! 请执行: pip install lark-oapi")
        sys.exit(1)
    # Build the event dispatcher.
    handler = lark.EventDispatcherHandler.builder("", "") \
        .register_p2_im_message_receive_v1(on_message_receive) \
        .build()
    # Build the WebSocket client.
    ws_client = WsClient(APP_ID, APP_SECRET, event_handler=handler, log_level=lark.LogLevel.INFO)
    log("🔌 正在连接飞书 WebSocket...")
    log(" 订阅事件: im.message.receive_v1")
    log(" 等待图片消息...")

    # Heartbeat: one log line per minute so liveness is visible in the logs.
    def heartbeat():
        count = 0
        while True:
            time.sleep(60)
            count += 1
            # The original literal held a U+FFFD replacement character
            # (a lost emoji); 💓 is used in its place.
            log(f"💓 WebSocket 运行中 ({count} 分钟)")

    hb = threading.Thread(target=heartbeat, daemon=True)
    hb.start()
    # Run the WebSocket client (blocks).
    ws_client.start()


if __name__ == "__main__":
    main()