| | |
| | """ |
| | 飞书图片预处理守护进程 (image_daemon.py) v3 — WebSocket 事件驱动 |
| | 通过 lark-oapi SDK 的 WebSocket 长连接实时接收消息事件, |
| | 检测图片消息后下载、上传到图床、回复 URL。 |
| | """ |
| | import os, sys, json, time, requests, threading, base64 |
| |
|
| | FEISHU_BASE = "https://open.feishu.cn/open-apis" |
| | APP_ID = os.environ.get("FEISHU_APP_ID", "") |
| | APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "") |
| |
|
| | |
| | API_BASE_URL = os.environ.get("API_BASE_URL", "https://asem12345-cliproxyapi.hf.space/v1") |
| | API_KEY = os.environ.get("API_KEY", "") |
| | MODEL_NAME = os.environ.get("MODEL_NAME", "gemini-3-flash") |
| | BRAVE_API_KEY = os.environ.get("BRAVE_API_KEY", "") |
| |
|
| | |
| | OPENCLAW_GATEWAY = "http://127.0.0.1:18789/v1" |
| | _use_gateway = False |
| | _soul_prompt = "" |
| |
|
| | |
| | _chat_history = {} |
| | MAX_HISTORY = 30 |
| |
|
| | |
| | _pending_images = {} |
| |
|
| | |
| | def log(msg): |
| | ts = time.strftime("%H:%M:%S") |
| | print(f"[image_daemon {ts}] {msg}", flush=True) |
| |
|
| | |
| | _token = None |
| | _token_time = 0 |
| | _token_lock = threading.Lock() |
| |
|
| | def get_token(): |
| | """获取 tenant_access_token,30分钟自动刷新""" |
| | global _token, _token_time |
| | with _token_lock: |
| | if _token and time.time() - _token_time < 1800: |
| | return _token |
| | try: |
| | resp = requests.post(f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal", |
| | json={"app_id": APP_ID, "app_secret": APP_SECRET}, timeout=10) |
| | data = resp.json() |
| | if data.get("code") == 0: |
| | _token = data["tenant_access_token"] |
| | _token_time = time.time() |
| | log("🔑 Token 已刷新") |
| | return _token |
| | log(f"❌ Token 获取失败: {data}") |
| | except Exception as e: |
| | log(f"❌ Token 异常: {e}") |
| | return _token |
| |
|
| | |
| | def download_image(token, message_id, file_key): |
| | """通过消息资源 API 下载用户发送的图片""" |
| | headers = {"Authorization": f"Bearer {token}"} |
| | url = f"{FEISHU_BASE}/im/v1/messages/{message_id}/resources/{file_key}" |
| | log(f"📥 API: GET {url}?type=image") |
| | resp = requests.get(url, headers=headers, params={"type": "image"}, timeout=30) |
| | if resp.status_code == 200 and len(resp.content) > 100: |
| | log(f"✅ 下载成功: {len(resp.content)} bytes") |
| | return resp.content |
| | log(f"❌ 下载图片失败 {file_key}: HTTP {resp.status_code}, {resp.text[:200]}") |
| | return None |
| |
|
| | |
| | def upload_image(data): |
| | """上传图片到图床,多重 fallback""" |
| | |
| | try: |
| | resp = requests.post("https://tmpfiles.org/api/v1/upload", |
| | files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30) |
| | if resp.status_code == 200: |
| | url = resp.json().get("data", {}).get("url", "") |
| | if url: |
| | result = url.replace("tmpfiles.org/", "tmpfiles.org/dl/") |
| | log(f"📤 tmpfiles 成功") |
| | return result |
| | log(f"⚠️ tmpfiles 失败: HTTP {resp.status_code}") |
| | except Exception as e: |
| | log(f"⚠️ tmpfiles 异常: {e}") |
| |
|
| | |
| | try: |
| | resp = requests.post("https://telegra.ph/upload", |
| | files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30) |
| | if resp.status_code == 200: |
| | result = resp.json() |
| | if isinstance(result, list) and len(result) > 0: |
| | src = result[0].get("src", "") |
| | if src: |
| | url = f"https://telegra.ph{src}" |
| | log(f"📤 telegraph 成功") |
| | return url |
| | log(f"⚠️ telegraph 失败: HTTP {resp.status_code}") |
| | except Exception as e: |
| | log(f"⚠️ telegraph 异常: {e}") |
| |
|
| | |
| | try: |
| | resp = requests.post("https://file.io", |
| | files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30) |
| | if resp.status_code == 200: |
| | url = resp.json().get("link", "") |
| | if url: |
| | log(f"📤 file.io 成功") |
| | return url |
| | log(f"⚠️ file.io 失败: HTTP {resp.status_code}") |
| | except Exception as e: |
| | log(f"⚠️ file.io 异常: {e}") |
| |
|
| | |
| | try: |
| | resp = requests.post("https://catbox.moe/user/api.php", |
| | data={"reqtype": "fileupload"}, |
| | files={"filedata": ("img.jpg", data, "image/jpeg")}, timeout=30) |
| | if resp.status_code == 200 and resp.text.startswith("http"): |
| | log(f"📤 catbox 成功") |
| | return resp.text.strip() |
| | log(f"⚠️ catbox 失败: HTTP {resp.status_code}") |
| | except Exception as e: |
| | log(f"⚠️ catbox 异常: {e}") |
| |
|
| | |
| | try: |
| | resp = requests.post("https://0x0.st", |
| | files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30) |
| | if resp.status_code == 200 and resp.text.startswith("http"): |
| | log(f"📤 0x0 成功") |
| | return resp.text.strip() |
| | log(f"⚠️ 0x0 失败: HTTP {resp.status_code}") |
| | except Exception as e: |
| | log(f"⚠️ 0x0 异常: {e}") |
| |
|
| | return None |
| |
|
| | |
| | def send_text(token, chat_id, text): |
| | headers = { |
| | "Authorization": f"Bearer {token}", |
| | "Content-Type": "application/json; charset=utf-8" |
| | } |
| | |
| | resp = requests.post(f"{FEISHU_BASE}/im/v1/messages", |
| | headers=headers, |
| | params={"receive_id_type": "chat_id"}, |
| | json={ |
| | "receive_id": chat_id, |
| | "content": json.dumps({"text": text}), |
| | "msg_type": "text" |
| | }, timeout=10) |
| | data = resp.json() |
| | code = data.get("code", -1) |
| | if code != 0: |
| | log(f"❌ 发送消息失败 (code={code}): {data.get('msg', '')}") |
| | return data |
| |
|
| | |
| | def analyze_image_with_vision(img_data): |
| | """将图片 base64 传给 LLM,由大师以人行佛教视角描述""" |
| | if not API_KEY: |
| | return None |
| | try: |
| | b64 = base64.b64encode(img_data).decode("utf-8") |
| | soul = _soul_prompt or "You are a helpful assistant." |
| | prompt = ( |
| | "这位信徒发来了一张图片,请以你的风格阅览这张图片,它占据了你的视野。" |
| | "简要阐述你的感悟,200字以内,不必报平就班。" |
| | ) |
| | payload = { |
| | "model": MODEL_NAME, |
| | "messages": [{ |
| | "role": "user", |
| | "content": [ |
| | {"type": "text", "text": soul + "\n\n---\n\n" + prompt}, |
| | {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}} |
| | ] |
| | }], |
| | "max_tokens": 300, |
| | "stream": False |
| | } |
| | resp = requests.post( |
| | f"{API_BASE_URL}/chat/completions", |
| | headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}, |
| | json=payload, timeout=30 |
| | ) |
| | if resp.status_code == 200: |
| | reply = resp.json()["choices"][0]["message"]["content"] |
| | log(f"📸 Vision 分析完成: {reply[:60]}...") |
| | return reply |
| | log(f"⚠️ Vision API 失败 ({resp.status_code}),跳过描述") |
| | except Exception as e: |
| | log(f"⚠️ Vision 异常: {e}") |
| | return None |
| |
|
| | def analyze_image_with_request(b64, user_request): |
| | """根据用户需求对缓存的图片做针对性分析""" |
| | if not API_KEY: |
| | return None |
| | try: |
| | soul = _soul_prompt or "You are a helpful assistant." |
| | prompt = f"这位信徒发来了一张图片,并希望你能:{user_request}\n请以你的风格,基于这张图片的内容作出答复。" |
| | resp = requests.post( |
| | f"{API_BASE_URL}/chat/completions", |
| | headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}, |
| | json={ |
| | "model": MODEL_NAME, |
| | "messages": [{ |
| | "role": "user", |
| | "content": [ |
| | {"type": "text", "text": soul + "\n\n---\n\n" + prompt}, |
| | {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}} |
| | ] |
| | }], |
| | "stream": False |
| | }, |
| | timeout=30 |
| | ) |
| | if resp.status_code == 200: |
| | reply = resp.json()["choices"][0]["message"]["content"] |
| | log(f"📸 针对分析完成: {reply[:60]}...") |
| | return reply |
| | log(f"⚠️ 针对 Vision 失败 ({resp.status_code})") |
| | except Exception as e: |
| | log(f"⚠️ 针对 Vision 异常: {e}") |
| | return None |
| |
|
| | |
| | def handle_image_message(message_id, chat_id, image_key): |
| | """下载 → Vision分析 → 上传 → 发送""" |
| | token = get_token() |
| | if not token: |
| | log("❌ 无法获取 token,跳过") |
| | return |
| |
|
| | log(f"🖼️ 处理图片 image_key={image_key[:20]}... (msg={message_id[:16]}...)") |
| |
|
| | |
| | img_data = download_image(token, message_id, image_key) |
| | if not img_data: |
| | return |
| |
|
| | log(f"📥 {len(img_data)} bytes, 存储并问询需求...") |
| |
|
| | |
| | b64 = base64.b64encode(img_data).decode("utf-8") |
| | _pending_images[chat_id] = b64 |
| |
|
| | |
| | soul = _soul_prompt or "" |
| | ask_prompt = f"{soul}\n\n---\n\n这位信徒初次发来一张图片。请用一句话、中文回复,问对方希望你就这张图片做什么(例如:描述内容、翻译文字、分析意义等)。不要自行分析图片。" |
| | if not API_KEY: |
| | question = "幸会!你发来了一张图片。你希望老讲为你做什么呢?" |
| | else: |
| | try: |
| | resp = requests.post( |
| | f"{API_BASE_URL}/chat/completions", |
| | headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}, |
| | json={"model": MODEL_NAME, "messages": [{"role": "user", "content": ask_prompt}], "stream": False}, |
| | timeout=20 |
| | ) |
| | question = resp.json()["choices"][0]["message"]["content"] if resp.status_code == 200 else "你希望老讲就这张图进行什么分析呢?" |
| | except Exception: |
| | question = "你希望老讲就这张图进行什么分析呢?" |
| | log(f"💬 问询用户需求: {question[:60]}") |
| | result = send_text(token, chat_id, question) |
| | log(f"📤 问询已发 (code={result.get('code', '?')})") |
| | |
| | history = _chat_history.get(chat_id, []) |
| | _chat_history[chat_id] = (history + [ |
| | {"role": "user", "content": "[user sent an image]"}, |
| | {"role": "assistant", "content": question} |
| | ])[-(MAX_HISTORY * 2):] |
| |
|
| | |
| | def load_soul(): |
| | """读取人设文件,去除图片处理指令(由 Daemon 接管)""" |
| | global _soul_prompt |
| | soul_path = "/root/.openclaw/workspace/SOUL.md" |
| | try: |
| | with open(soul_path, "r", encoding="utf-8") as f: |
| | content = f.read() |
| | |
| | if "## 图片处理" in content: |
| | content = content[:content.index("## 图片处理")].rstrip() |
| | _soul_prompt = content |
| | log(f"✅ SOUL.md 已加载 ({len(_soul_prompt)} 字)") |
| | except FileNotFoundError: |
| | _soul_prompt = "You are MoltBot, a helpful AI assistant." |
| | log("⚠️ SOUL.md 未找到,使用默认人设") |
| | except Exception as e: |
| | _soul_prompt = "You are MoltBot, a helpful AI assistant." |
| | log(f"⚠️ SOUL.md 加载失败: {e}") |
| |
|
| | |
| | def check_openclaw_gateway(): |
| | """探测本地 Gateway 是否可用""" |
| | global _use_gateway |
| | try: |
| | resp = requests.post( |
| | f"{OPENCLAW_GATEWAY}/chat/completions", |
| | json={"model": "default", "messages": [{"role": "user", "content": "ping"}]}, |
| | timeout=5 |
| | ) |
| | if resp.status_code in (200, 201): |
| | _use_gateway = True |
| | log(f"✅ OpenClaw Gateway 可用 ({OPENCLAW_GATEWAY})") |
| | else: |
| | log(f"⚠️ Gateway 响应 {resp.status_code},使用外部 LLM") |
| | except Exception as e: |
| | log(f"⚠️ Gateway 不可用 ({e}),使用外部 LLM + SOUL 人设") |
| |
|
| | |
| | def search_brave(query, count=5): |
| | """Brave Search API 一次搜索。返回整理得到的摘要字符串。""" |
| | if not BRAVE_API_KEY: |
| | return "搜索失败:BRAVE_API_KEY 未配置" |
| | try: |
| | resp = requests.get( |
| | "https://api.search.brave.com/res/v1/web/search", |
| | headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY}, |
| | params={"q": query, "count": count, "text_decorations": False, "extra_snippets": True}, |
| | timeout=10 |
| | ) |
| | if resp.status_code != 200: |
| | return f"搜索失败 ({resp.status_code})" |
| | results = resp.json().get("web", {}).get("results", []) |
| | if not results: |
| | return "未找到相关结果" |
| | lines = [] |
| | for r in results[:count]: |
| | title = r.get("title", "无标题") |
| | url = r.get("url", "") |
| | desc = r.get("description") or (r.get("extra_snippets") or [""])[0] |
| | lines.append(f"- **{title}**\n {desc}\n {url}") |
| | log(f"🔍 Brave 搜索「{query}」得到 {len(results)} 条") |
| | return "\n".join(lines) |
| | except Exception as e: |
| | log(f"⚠️ Brave 搜索异常: {e}") |
| | return f"搜索异常: {e}" |
| |
|
| | |
| | BRAVE_TOOL_DEF = [{ |
| | "type": "function", |
| | "function": { |
| | "name": "brave_search", |
| | "description": "当需要查找实时信息、最新数据、公司信息、行业报告等时,调用此工具进行网页搜索", |
| | "parameters": { |
| | "type": "object", |
| | "properties": { |
| | "query": { |
| | "type": "string", |
| | "description": "搜索关键词,英文搜索通常效果更好" |
| | } |
| | }, |
| | "required": ["query"] |
| | } |
| | } |
| | }] |
| |
|
| | |
| | def chat_with_llm(user_text, history=None): |
| | """带 Brave Search 工具的 ReAct 循环(最多搜 3 次)""" |
| | try: |
| | if _use_gateway: |
| | resp = requests.post( |
| | f"{OPENCLAW_GATEWAY}/chat/completions", |
| | json={"model": "default", "messages": (history or []) + [{"role": "user", "content": user_text}], "stream": False}, |
| | timeout=120 |
| | ) |
| | if resp.status_code == 200: |
| | reply = resp.json()["choices"][0]["message"]["content"] |
| | log(f"🤖 Gateway 回复: {reply[:60]}...") |
| | return reply |
| | log(f"⚠️ Gateway 失败 ({resp.status_code}),Fallback到外部 LLM") |
| |
|
| | if not API_KEY: |
| | return "抱歉,我的大脑连接中断了 (API_KEY missing)" |
| |
|
| | import json as _json |
| | soul = _soul_prompt or "You are a helpful assistant." |
| | now_str = time.strftime("%Y-%m-%d %H:%M:%S %Z") |
| | dated_soul = f"[当前系统时间: {now_str}]\n\n{soul}" |
| | headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"} |
| | messages = [{"role": "system", "content": dated_soul}] + (history or []) + [{"role": "user", "content": user_text}] |
| |
|
| |
|
| | for _ in range(31): |
| | log(f"🤖 外部 LLM ({MODEL_NAME}): {user_text[:50]}...") |
| | payload = { |
| | "model": MODEL_NAME, |
| | "messages": messages, |
| | "stream": False |
| | } |
| | if BRAVE_API_KEY: |
| | payload["tools"] = BRAVE_TOOL_DEF |
| | resp = requests.post(f"{API_BASE_URL}/chat/completions", headers=headers, json=payload, timeout=60) |
| | if resp.status_code != 200: |
| | log(f"❌ 外部 LLM 错误 {resp.status_code}: {resp.text[:100]}") |
| | return f"思考时遇到错误 ({resp.status_code})" |
| |
|
| | msg = resp.json()["choices"][0]["message"] |
| | tool_calls = msg.get("tool_calls") or [] |
| |
|
| | if not tool_calls: |
| | reply = msg.get("content", "") |
| | log(f"🤖 LLM 回复: {reply[:60]}...") |
| | return reply |
| |
|
| | |
| | messages.append(msg) |
| | for tc in tool_calls: |
| | fn = tc["function"]["name"] |
| | args = _json.loads(tc["function"]["arguments"]) |
| | if fn == "brave_search": |
| | result = search_brave(args.get("query", "")) |
| | else: |
| | result = f"未知工具: {fn}" |
| | messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result}) |
| |
|
| | return "思考超时,请稍后再试" |
| | except Exception as e: |
| | log(f"❌ chat_with_llm 异常: {e}") |
| | return f"大脑短路了: {e}" |
| |
|
| |
|
| | |
| | def handle_text_message(message_id, chat_id, text): |
| | """LLM (带历史) -> 发送,如有待处理图片则做针对 Vision""" |
| | token = get_token() |
| | if not token: |
| | return |
| |
|
| | |
| | b64 = _pending_images.pop(chat_id, None) |
| | if b64: |
| | log(f"📸 检测到待处理图片,按需求分析: {text[:40]}") |
| | reply = analyze_image_with_request(b64, text) |
| | if not reply: |
| | reply = chat_with_llm(text, _chat_history.get(chat_id, [])) |
| | else: |
| | history = _chat_history.get(chat_id, []) |
| | reply = chat_with_llm(text, history) |
| |
|
| | if reply: |
| | history = _chat_history.get(chat_id, []) |
| | history = history + [ |
| | {"role": "user", "content": text}, |
| | {"role": "assistant", "content": reply} |
| | ] |
| | _chat_history[chat_id] = history[-(MAX_HISTORY * 2):] |
| | send_text(token, chat_id, reply) |
| |
|
| | |
| | def on_message_receive(data): |
| | """im.message.receive_v1 事件回调""" |
| | try: |
| | event = data.event |
| | message = event.message |
| | sender = event.sender |
| | |
| | msg_type = message.message_type |
| | msg_id = message.message_id |
| | chat_id = message.chat_id |
| | sender_type = getattr(sender, 'sender_type', '') if sender else '' |
| |
|
| | |
| | if sender_type == "app": |
| | return |
| | |
| | content_json = json.loads(message.content) |
| |
|
| | |
| | log(f"📨 WebSocket 收到消息: type={msg_type}, msg_id={msg_id}") |
| |
|
| | |
| | if msg_type == "image": |
| | image_key = content_json.get("image_key", "") |
| | if image_key: |
| | t = threading.Thread(target=handle_image_message, args=(msg_id, chat_id, image_key)) |
| | t.daemon = True |
| | t.start() |
| | return |
| |
|
| | |
| | if msg_type == "text": |
| | text = content_json.get("text", "") |
| | if text: |
| | t = threading.Thread(target=handle_text_message, args=(msg_id, chat_id, text)) |
| | t.daemon = True |
| | t.start() |
| | return |
| |
|
| | except Exception as e: |
| | log(f"❌ on_message_receive 异常: {type(e).__name__}: {e}") |
| | import traceback |
| | traceback.print_exc() |
| |
|
| | |
| | def main(): |
| | log("🚀 启动中... (WebSocket 事件驱动模式)") |
| | log(f"📋 飞书 App ID: {APP_ID[:10]}..." if APP_ID else "❌ FEISHU_APP_ID 未设置!") |
| |
|
| | if not APP_ID or not APP_SECRET: |
| | log("❌ FEISHU_APP_ID 或 FEISHU_APP_SECRET 未设置,退出") |
| | sys.exit(1) |
| |
|
| | |
| | load_soul() |
| |
|
| | |
| | token = get_token() |
| | if token: |
| | log("✅ Token 获取成功") |
| | else: |
| | log("⚠️ Token 获取失败,稍后重试") |
| |
|
| | |
| | def delayed_gateway_check(): |
| | time.sleep(10) |
| | check_openclaw_gateway() |
| | threading.Thread(target=delayed_gateway_check, daemon=True).start() |
| |
|
| | |
| | try: |
| | import lark_oapi as lark |
| | from lark_oapi.api.im.v1 import P2ImMessageReceiveV1 |
| | log("✅ lark-oapi SDK 已加载") |
| | except ImportError: |
| | log("❌ lark-oapi 未安装! 请执行: pip install lark-oapi") |
| | sys.exit(1) |
| |
|
| | |
| | handler = lark.EventDispatcherHandler.builder("", "") \ |
| | .register_p2_im_message_receive_v1(on_message_receive) \ |
| | .build() |
| |
|
| | |
| | from lark_oapi.ws import Client as WsClient |
| | ws_client = WsClient(APP_ID, APP_SECRET, event_handler=handler, log_level=lark.LogLevel.INFO) |
| |
|
| | log("🔌 正在连接飞书 WebSocket...") |
| | log(" 订阅事件: im.message.receive_v1") |
| | log(" 等待图片消息...") |
| |
|
| | |
| | def heartbeat(): |
| | count = 0 |
| | while True: |
| | time.sleep(60) |
| | count += 1 |
| | log(f"� WebSocket 运行中 ({count} 分钟)") |
| | hb = threading.Thread(target=heartbeat, daemon=True) |
| | hb.start() |
| |
|
| | |
| | ws_client.start() |
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|