#!/usr/bin/env python3
"""
Feishu (Lark) image pre-processing daemon (image_daemon.py) v3 — WebSocket event driven.

Receives ``im.message.receive_v1`` events over a lark-oapi WebSocket long
connection and handles each message on its own daemon thread:

* Image message: the image is downloaded through the message-resource API,
  cached (base64) per chat in ``_pending_images``, and the user is asked —
  in the SOUL.md persona's voice when an LLM is configured — what they want
  done with it.
* Text message while an image is pending for that chat: the text is used as
  the instruction and the cached image is sent to the vision model.
* Any other text message: answered by the LLM with per-chat history — the
  local OpenClaw gateway if the startup probe succeeded, otherwise the
  external OpenAI-compatible API (offering a Brave Search tool when
  ``BRAVE_API_KEY`` is set).

``upload_image`` (public file hosts) and ``analyze_image_with_vision`` are
kept as utilities but are not called anywhere in this module.

Environment: FEISHU_APP_ID and FEISHU_APP_SECRET are required; API_BASE_URL,
API_KEY, MODEL_NAME and BRAVE_API_KEY are optional.
"""
import base64
import json
import os
import sys
import threading
import time
import traceback

import requests

FEISHU_BASE = "https://open.feishu.cn/open-apis"
APP_ID = os.environ.get("FEISHU_APP_ID", "")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "")

# LLM configuration (an OpenAI-compatible /chat/completions endpoint).
API_BASE_URL = os.environ.get("API_BASE_URL", "https://asem12345-cliproxyapi.hf.space/v1")
API_KEY = os.environ.get("API_KEY", "")
MODEL_NAME = os.environ.get("MODEL_NAME", "gemini-3-flash")
BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "")

# Local OpenClaw gateway.
OPENCLAW_GATEWAY = "http://127.0.0.1:18789/v1"
_use_gateway = False  # set by check_openclaw_gateway(), probed shortly after startup
_soul_prompt = ""  # persona text loaded from SOUL.md by load_soul()

# Conversation history per chat_id: {chat_id: [{"role": ..., "content": ...}, ...]}.
# Message handlers run on separate threads, so every read-modify-write of this
# dict goes through _history_lock (see _get_history / _append_history).
_chat_history = {}
_history_lock = threading.Lock()
MAX_HISTORY = 30  # keep the most recent N rounds (2 messages per round)

# Image awaiting the user's instructions, per chat_id: {chat_id: base64_string}.
# After an image arrives we ask what the user wants; their next text message
# in that chat becomes the request for the vision model.
_pending_images = {}

# Maximum number of LLM rounds that end in tool calls before chat_with_llm gives up.
MAX_TOOL_ROUNDS = 30


# ---------- Logging ----------
def log(msg):
    """Print *msg* to stdout with an ``[image_daemon HH:MM:SS]`` prefix, flushed immediately."""
    ts = time.strftime("%H:%M:%S")
    print(f"[image_daemon {ts}] {msg}", flush=True)


# ---------- Conversation history ----------
def _get_history(chat_id):
    """Return a snapshot copy of the stored history for *chat_id* (empty list if none)."""
    with _history_lock:
        return list(_chat_history.get(chat_id, []))


def _append_history(chat_id, *turns):
    """Append message dicts to *chat_id*'s history, keeping only the last MAX_HISTORY rounds."""
    with _history_lock:
        merged = _chat_history.get(chat_id, []) + list(turns)
        _chat_history[chat_id] = merged[-(MAX_HISTORY * 2):]


# ---------- Tenant access token ----------
_token = None
_token_time = 0
_token_lock = threading.Lock()


def get_token():
    """Return a cached tenant_access_token, re-fetching it once it is 30 minutes old.

    On failure the previously cached token (possibly ``None``) is returned, so
    callers must check for ``None``.
    """
    global _token, _token_time
    with _token_lock:
        if _token and time.time() - _token_time < 1800:
            return _token
        try:
            resp = requests.post(
                f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal",
                json={"app_id": APP_ID, "app_secret": APP_SECRET},
                timeout=10,
            )
            data = resp.json()
            if data.get("code") == 0:
                _token = data["tenant_access_token"]
                _token_time = time.time()
                log("🔑 Token 已刷新")
                return _token
            log(f"❌ Token 获取失败: {data}")
        except Exception as e:
            log(f"❌ Token 异常: {e}")
        # A stale token is still better than none.
        return _token


# ---------- Image download ----------
def download_image(token, message_id, file_key):
    """Download an image the user sent, via the message-resource API.

    Returns the raw bytes, or ``None`` on a non-200 status, a body of 100
    bytes or less, or a network error.
    """
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{FEISHU_BASE}/im/v1/messages/{message_id}/resources/{file_key}"
    log(f"📥 API: GET {url}?type=image")
    try:
        resp = requests.get(url, headers=headers, params={"type": "image"}, timeout=30)
    except requests.RequestException as e:
        # Report network errors like HTTP failures instead of killing the worker thread.
        log(f"❌ 下载图片异常 {file_key}: {e}")
        return None
    if resp.status_code == 200 and len(resp.content) > 100:
        log(f"✅ 下载成功: {len(resp.content)} bytes")
        return resp.content
    log(f"❌ 下载图片失败 {file_key}: HTTP {resp.status_code}, {resp.text[:200]}")
    return None


# ---------- Image upload (several fallback hosts) ----------
def upload_image(data):
    """Upload image bytes to a public file host, trying several in turn.

    Order: tmpfiles.org, telegra.ph, file.io, catbox.moe, 0x0.st. Returns the
    first URL obtained, or ``None`` if every host fails. Not called anywhere
    in this module at present.
    """
    # 1. tmpfiles.org
    try:
        resp = requests.post("https://tmpfiles.org/api/v1/upload",
                             files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
        if resp.status_code == 200:
            url = resp.json().get("data", {}).get("url", "")
            if url:
                # Insert "/dl/" into the returned URL — presumably tmpfiles' direct-download path.
                result = url.replace("tmpfiles.org/", "tmpfiles.org/dl/")
                log("📤 tmpfiles 成功")
                return result
        log(f"⚠️ tmpfiles 失败: HTTP {resp.status_code}")
    except Exception as e:
        log(f"⚠️ tmpfiles 异常: {e}")

    # 2. Telegraph
    try:
        resp = requests.post("https://telegra.ph/upload",
                             files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
        if resp.status_code == 200:
            result = resp.json()
            if isinstance(result, list) and len(result) > 0:
                src = result[0].get("src", "")
                if src:
                    url = f"https://telegra.ph{src}"
                    log("📤 telegraph 成功")
                    return url
        log(f"⚠️ telegraph 失败: HTTP {resp.status_code}")
    except Exception as e:
        log(f"⚠️ telegraph 异常: {e}")

    # 3. file.io
    try:
        resp = requests.post("https://file.io",
                             files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
        if resp.status_code == 200:
            url = resp.json().get("link", "")
            if url:
                log("📤 file.io 成功")
                return url
        log(f"⚠️ file.io 失败: HTTP {resp.status_code}")
    except Exception as e:
        log(f"⚠️ file.io 异常: {e}")

    # 4. catbox.moe (plain-text response containing the URL)
    try:
        resp = requests.post("https://catbox.moe/user/api.php",
                             data={"reqtype": "fileupload"},
                             files={"filedata": ("img.jpg", data, "image/jpeg")}, timeout=30)
        if resp.status_code == 200 and resp.text.startswith("http"):
            log("📤 catbox 成功")
            return resp.text.strip()
        log(f"⚠️ catbox 失败: HTTP {resp.status_code}")
    except Exception as e:
        log(f"⚠️ catbox 异常: {e}")

    # 5. 0x0.st (plain-text response containing the URL)
    try:
        resp = requests.post("https://0x0.st",
                             files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
        if resp.status_code == 200 and resp.text.startswith("http"):
            log("📤 0x0 成功")
            return resp.text.strip()
        log(f"⚠️ 0x0 失败: HTTP {resp.status_code}")
    except Exception as e:
        log(f"⚠️ 0x0 异常: {e}")

    return None


# ---------- Sending messages (directly to a chat) ----------
def send_text(token, chat_id, text):
    """Send a plain-text message to *chat_id*.

    Returns the decoded API response dict, whose ``code`` is 0 on success.
    Network errors and non-JSON responses are logged and reported as
    ``{"code": -1, "msg": ...}``, so callers can always use ``.get`` on the result.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json; charset=utf-8",
    }
    # A "post" message type would allow rich text; plain "text" keeps it simple.
    try:
        resp = requests.post(
            f"{FEISHU_BASE}/im/v1/messages",
            headers=headers,
            params={"receive_id_type": "chat_id"},
            json={
                "receive_id": chat_id,
                "content": json.dumps({"text": text}),
                "msg_type": "text",
            },
            timeout=10,
        )
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body (requests' JSONDecodeError subclasses it).
        log(f"❌ 发送消息异常: {e}")
        return {"code": -1, "msg": str(e)}
    code = data.get("code", -1)
    if code != 0:
        log(f"❌ 发送消息失败 (code={code}): {data.get('msg', '')}")
    return data


# ---------- Vision image analysis ----------
def _vision_request(b64, prompt, max_tokens=None):
    """POST one user turn (persona + *prompt* text, plus the base64 JPEG *b64*) to the vision model.

    Returns the ``requests.Response``; request exceptions propagate to the caller.
    """
    soul = _soul_prompt or "You are a helpful assistant."
    payload = {
        "model": MODEL_NAME,
        "messages": [{
            "role": "user",
            "content": [
                {"type": "text", "text": soul + "\n\n---\n\n" + prompt},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
            ],
        }],
        "stream": False,
    }
    if max_tokens is not None:
        payload["max_tokens"] = max_tokens
    return requests.post(
        f"{API_BASE_URL}/chat/completions",
        headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
        json=payload,
        timeout=30,
    )


def analyze_image_with_vision(img_data):
    """Ask the vision model for a short, unprompted reflection on an image, in the persona's voice.

    Returns the reply text, or ``None`` when API_KEY is unset or the call fails.
    Not called anywhere in this module at present.
    """
    if not API_KEY:
        return None
    try:
        b64 = base64.b64encode(img_data).decode("utf-8")
        # NOTE(review): "报平就班" looks like a typo for "按部就班" — confirm before editing the prompt.
        prompt = (
            "这位信徒发来了一张图片,请以你的风格阅览这张图片,它占据了你的视野。"
            "简要阐述你的感悟,200字以内,不必报平就班。"
        )
        resp = _vision_request(b64, prompt, max_tokens=300)
        if resp.status_code == 200:
            reply = resp.json()["choices"][0]["message"]["content"]
            log(f"📸 Vision 分析完成: {reply[:60]}...")
            return reply
        log(f"⚠️ Vision API 失败 ({resp.status_code}),跳过描述")
    except Exception as e:
        log(f"⚠️ Vision 异常: {e}")
    return None


def analyze_image_with_request(b64, user_request):
    """Analyse a cached base64 image according to the user's instruction *user_request*.

    Returns the reply text, or ``None`` when API_KEY is unset or the call fails
    (including a null reply content).
    """
    if not API_KEY:
        return None
    try:
        prompt = f"这位信徒发来了一张图片,并希望你能:{user_request}\n请以你的风格,基于这张图片的内容作出答复。"
        resp = _vision_request(b64, prompt)
        if resp.status_code == 200:
            reply = resp.json()["choices"][0]["message"]["content"]
            log(f"📸 针对分析完成: {reply[:60]}...")
            return reply
        log(f"⚠️ 针对 Vision 失败 ({resp.status_code})")
    except Exception as e:
        log(f"⚠️ 针对 Vision 异常: {e}")
    return None


# ---------- Image message handling ----------
def handle_image_message(message_id, chat_id, image_key):
    """Download an incoming image, cache it for the chat, and ask the user what to do with it.

    The vision analysis itself happens later, in handle_text_message, once the
    user replies. Nothing is uploaded to a file host here. Both the question
    and a placeholder user turn are recorded in the chat history.
    """
    token = get_token()
    if not token:
        log("❌ 无法获取 token,跳过")
        return
    log(f"🖼️ 处理图片 image_key={image_key[:20]}... (msg={message_id[:16]}...)")

    img_data = download_image(token, message_id, image_key)
    if not img_data:
        return
    log(f"📥 {len(img_data)} bytes, 存储并问询需求...")

    # Cache the image; a newer image in the same chat replaces an older pending one.
    _pending_images[chat_id] = base64.b64encode(img_data).decode("utf-8")

    question = _ask_what_to_do()
    log(f"💬 问询用户需求: {question[:60]}")
    result = send_text(token, chat_id, question)
    log(f"📤 问询已发 (code={result.get('code', '?')})")

    _append_history(
        chat_id,
        {"role": "user", "content": "[user sent an image]"},
        {"role": "assistant", "content": question},
    )


def _ask_what_to_do():
    """Return a one-sentence question asking the user what to do with their image.

    When API_KEY is set the question is generated by the LLM in the persona's
    voice; any error, non-200 status or empty reply yields a fixed fallback,
    so the result is always a non-empty string.
    """
    # NOTE(review): "老讲" in these strings may be a typo for "老衲" — confirm.
    fallback = "你希望老讲就这张图进行什么分析呢?"
    if not API_KEY:
        return "幸会!你发来了一张图片。你希望老讲为你做什么呢?"
    soul = _soul_prompt or ""
    ask_prompt = f"{soul}\n\n---\n\n这位信徒初次发来一张图片。请用一句话、中文回复,问对方希望你就这张图片做什么(例如:描述内容、翻译文字、分析意义等)。不要自行分析图片。"
    try:
        resp = requests.post(
            f"{API_BASE_URL}/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
            json={"model": MODEL_NAME, "messages": [{"role": "user", "content": ask_prompt}], "stream": False},
            timeout=20,
        )
        if resp.status_code != 200:
            return fallback
        # "content" can be null; never return None, since the caller slices and sends it.
        return resp.json()["choices"][0]["message"]["content"] or fallback
    except Exception:
        return fallback


# ---------- SOUL.md loading ----------
def load_soul():
    """Load the persona file into ``_soul_prompt``, dropping its image-handling section.

    Everything from the "## 图片处理" (image handling) heading onward is cut,
    because this daemon handles images itself and those instructions would
    mislead the LLM. Falls back to a generic persona if the file is missing or
    unreadable.
    """
    global _soul_prompt
    soul_path = "/root/.openclaw/workspace/SOUL.md"
    try:
        with open(soul_path, "r", encoding="utf-8") as f:
            content = f.read()
        if "## 图片处理" in content:
            content = content[:content.index("## 图片处理")].rstrip()
        _soul_prompt = content
        log(f"✅ SOUL.md 已加载 ({len(_soul_prompt)} 字)")
    except FileNotFoundError:
        _soul_prompt = "You are MoltBot, a helpful AI assistant."
        log("⚠️ SOUL.md 未找到,使用默认人设")
    except Exception as e:
        _soul_prompt = "You are MoltBot, a helpful AI assistant."
        log(f"⚠️ SOUL.md 加载失败: {e}")


# ---------- OpenClaw gateway probe ----------
def check_openclaw_gateway():
    """Probe the local gateway with a "ping" completion; set ``_use_gateway`` on a 200/201 response."""
    global _use_gateway
    try:
        resp = requests.post(
            f"{OPENCLAW_GATEWAY}/chat/completions",
            json={"model": "default", "messages": [{"role": "user", "content": "ping"}]},
            timeout=5,
        )
        if resp.status_code in (200, 201):
            _use_gateway = True
            log(f"✅ OpenClaw Gateway 可用 ({OPENCLAW_GATEWAY})")
        else:
            log(f"⚠️ Gateway 响应 {resp.status_code},使用外部 LLM")
    except Exception as e:
        log(f"⚠️ Gateway 不可用 ({e}),使用外部 LLM + SOUL 人设")


# ---------- Brave Search ----------
def search_brave(query, count=5):
    """Run one Brave web search and return the top *count* results as a text summary.

    Each result is rendered as "- **title**", its description (or first extra
    snippet) and its URL. Errors are returned as text rather than raised, so
    the result can be handed straight back to the LLM as a tool result.
    """
    if not BRAVE_API_KEY:
        return "搜索失败:BRAVE_API_KEY 未配置"
    try:
        resp = requests.get(
            "https://api.search.brave.com/res/v1/web/search",
            headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY},
            params={"q": query, "count": count, "text_decorations": False, "extra_snippets": True},
            timeout=10,
        )
        if resp.status_code != 200:
            return f"搜索失败 ({resp.status_code})"
        results = resp.json().get("web", {}).get("results", [])
        if not results:
            return "未找到相关结果"
        lines = []
        for r in results[:count]:
            title = r.get("title", "无标题")
            url = r.get("url", "")
            desc = r.get("description") or (r.get("extra_snippets") or [""])[0]
            lines.append(f"- **{title}**\n {desc}\n {url}")
        log(f"🔍 Brave 搜索「{query}」得到 {len(results)} 条")
        return "\n".join(lines)
    except Exception as e:
        log(f"⚠️ Brave 搜索异常: {e}")
        return f"搜索异常: {e}"


# OpenAI-style "tools" definition exposing search_brave to the LLM.
BRAVE_TOOL_DEF = [{
    "type": "function",
    "function": {
        "name": "brave_search",
        "description": "当需要查找实时信息、最新数据、公司信息、行业报告等时,调用此工具进行网页搜索",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索关键词,英文搜索通常效果更好",
                }
            },
            "required": ["query"],
        },
    },
}]


# ---------- LLM chat ----------
def _chat_via_gateway(messages):
    """Send *messages* to the local OpenClaw gateway.

    Returns the reply text, or ``None`` (after logging) on a connection error,
    a non-200 status or an empty reply, so the caller can fall back to the
    external LLM.
    """
    try:
        resp = requests.post(
            f"{OPENCLAW_GATEWAY}/chat/completions",
            json={"model": "default", "messages": messages, "stream": False},
            timeout=120,
        )
    except requests.RequestException as e:
        log(f"⚠️ Gateway 异常 ({e}),Fallback到外部 LLM")
        return None
    if resp.status_code != 200:
        log(f"⚠️ Gateway 失败 ({resp.status_code}),Fallback到外部 LLM")
        return None
    reply = resp.json()["choices"][0]["message"]["content"]
    if not reply:
        log("⚠️ Gateway 返回空内容,Fallback到外部 LLM")
        return None
    log(f"🤖 Gateway 回复: {reply[:60]}...")
    return reply


def _run_tool(tool_call):
    """Execute one OpenAI-style tool call and return its result text for the model."""
    fn = tool_call["function"]["name"]
    try:
        args = json.loads(tool_call["function"].get("arguments") or "{}")
    except json.JSONDecodeError as e:
        # Let the model see (and correct) bad arguments instead of aborting the whole reply.
        return f"参数解析失败: {e}"
    if not isinstance(args, dict):
        args = {}
    if fn == "brave_search":
        return search_brave(args.get("query", ""))
    return f"未知工具: {fn}"


def chat_with_llm(user_text, history=None):
    """Answer *user_text* given the prior *history* (a list of role/content dicts).

    When the startup probe enabled the local OpenClaw gateway, it is tried
    first. On a connection error, non-200 status or empty reply, this falls
    back to the external OpenAI-compatible API. The external path runs a
    tool-calling loop (Brave Search is offered only when BRAVE_API_KEY is set)
    allowing up to MAX_TOOL_ROUNDS rounds of tool calls plus one final answer.

    Always returns a string (possibly empty); failures become user-facing
    messages rather than exceptions. *history* is never mutated.
    """
    user_turn = {"role": "user", "content": user_text}
    try:
        if _use_gateway:
            reply = _chat_via_gateway((history or []) + [user_turn])
            if reply is not None:
                return reply

        if not API_KEY:
            return "抱歉,我的大脑连接中断了 (API_KEY missing)"

        soul = _soul_prompt or "You are a helpful assistant."
        now_str = time.strftime("%Y-%m-%d %H:%M:%S %Z")
        dated_soul = f"[当前系统时间: {now_str}]\n\n{soul}"
        headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
        # A fresh list: appending tool turns below must not touch the caller's history.
        messages = [{"role": "system", "content": dated_soul}] + (history or []) + [user_turn]

        for _ in range(MAX_TOOL_ROUNDS + 1):
            log(f"🤖 外部 LLM ({MODEL_NAME}): {user_text[:50]}...")
            payload = {"model": MODEL_NAME, "messages": messages, "stream": False}
            if BRAVE_API_KEY:
                payload["tools"] = BRAVE_TOOL_DEF
            resp = requests.post(f"{API_BASE_URL}/chat/completions",
                                 headers=headers, json=payload, timeout=60)
            if resp.status_code != 200:
                log(f"❌ 外部 LLM 错误 {resp.status_code}: {resp.text[:100]}")
                return f"思考时遇到错误 ({resp.status_code})"

            msg = resp.json()["choices"][0]["message"]
            tool_calls = msg.get("tool_calls") or []
            if not tool_calls:
                # "content" may be present but null; never hand None back to the caller.
                reply = msg.get("content") or ""
                log(f"🤖 LLM 回复: {reply[:60]}...")
                return reply

            # Echo the assistant's tool-call turn, then one "tool" turn per call.
            messages.append(msg)
            for tc in tool_calls:
                messages.append({"role": "tool", "tool_call_id": tc["id"], "content": _run_tool(tc)})
        return "思考超时,请稍后再试"
    except Exception as e:
        log(f"❌ chat_with_llm 异常: {e}")
        return f"大脑短路了: {e}"


# ---------- Text message handling ----------
def handle_text_message(message_id, chat_id, text):
    """Reply to a text message, using it as the instruction for a pending image if one exists.

    If an image is cached for the chat, it is consumed and analysed with *text*
    as the request, falling back to a normal LLM chat if that fails. Otherwise
    *text* goes to chat_with_llm with the chat's history. A non-empty reply is
    recorded in the history and sent back to the chat.
    """
    token = get_token()
    if not token:
        return

    reply = None
    b64 = _pending_images.pop(chat_id, None)
    if b64:
        log(f"📸 检测到待处理图片,按需求分析: {text[:40]}")
        reply = analyze_image_with_request(b64, text)
    if not reply:
        reply = chat_with_llm(text, _get_history(chat_id))

    if reply:
        _append_history(
            chat_id,
            {"role": "user", "content": text},
            {"role": "assistant", "content": reply},
        )
        send_text(token, chat_id, reply)


# ---------- Event handling ----------
def _spawn(target, *args):
    """Run ``target(*args)`` on a daemon thread and return immediately."""
    threading.Thread(target=target, args=args, daemon=True).start()


def on_message_receive(data):
    """Callback for ``im.message.receive_v1`` events.

    Ignores messages sent by apps (including this bot). Image and text
    messages are dispatched to their handlers on daemon threads, presumably
    so the SDK callback is not blocked by slow network/LLM calls. Other
    message types are only logged. Exceptions are logged with a traceback,
    not raised.
    """
    try:
        message = data.event.message
        sender = data.event.sender
        msg_type = message.message_type
        msg_id = message.message_id
        chat_id = message.chat_id
        sender_type = getattr(sender, 'sender_type', '') if sender else ''

        if sender_type == "app":
            return

        content_json = json.loads(message.content)
        log(f"📨 WebSocket 收到消息: type={msg_type}, msg_id={msg_id}")

        if msg_type == "image":
            image_key = content_json.get("image_key", "")
            if image_key:
                _spawn(handle_image_message, msg_id, chat_id, image_key)
            return

        if msg_type == "text":
            text = content_json.get("text", "")
            if text:
                _spawn(handle_text_message, msg_id, chat_id, text)
            return
    except Exception as e:
        log(f"❌ on_message_receive 异常: {type(e).__name__}: {e}")
        traceback.print_exc()


# ---------- Entry point ----------
def main():
    """Validate configuration, load the persona, and run the Feishu WebSocket client (blocks forever).

    Exits with status 1 if the app credentials are missing or lark-oapi is not
    installed.
    """
    log("🚀 启动中... (WebSocket 事件驱动模式)")
    log(f"📋 飞书 App ID: {APP_ID[:10]}..." if APP_ID else "❌ FEISHU_APP_ID 未设置!")
    if not APP_ID or not APP_SECRET:
        log("❌ FEISHU_APP_ID 或 FEISHU_APP_SECRET 未设置,退出")
        sys.exit(1)

    load_soul()

    # Warm up the token cache so the first message doesn't pay for it.
    if get_token():
        log("✅ Token 获取成功")
    else:
        log("⚠️ Token 获取失败,稍后重试")

    # Probe the OpenClaw gateway after a 10 s delay, giving it time to finish starting.
    def delayed_gateway_check():
        time.sleep(10)
        check_openclaw_gateway()

    threading.Thread(target=delayed_gateway_check, daemon=True).start()

    try:
        import lark_oapi as lark
        # Not used directly; importing it inside this try makes a missing/old SDK fail here.
        from lark_oapi.api.im.v1 import P2ImMessageReceiveV1  # noqa: F401
        log("✅ lark-oapi SDK 已加载")
    except ImportError:
        log("❌ lark-oapi 未安装! 请执行: pip install lark-oapi")
        sys.exit(1)

    handler = (
        lark.EventDispatcherHandler.builder("", "")
        .register_p2_im_message_receive_v1(on_message_receive)
        .build()
    )

    from lark_oapi.ws import Client as WsClient
    ws_client = WsClient(APP_ID, APP_SECRET, event_handler=handler, log_level=lark.LogLevel.INFO)

    log("🔌 正在连接飞书 WebSocket...")
    log(" 订阅事件: im.message.receive_v1")
    log(" 等待图片消息...")

    # Liveness log once a minute.
    def heartbeat():
        minutes = 0
        while True:
            time.sleep(60)
            minutes += 1
            log(f"💓 WebSocket 运行中 ({minutes} 分钟)")

    threading.Thread(target=heartbeat, daemon=True).start()

    # Blocks for the lifetime of the process.
    ws_client.start()


if __name__ == "__main__":
    main()