asemxin commited on
Commit
f7fadc5
·
1 Parent(s): 1a2d4fe

feat: image_daemon 改用飞书 WebSocket 事件订阅,实时接收图片消息

Browse files
Files changed (2) hide show
  1. Dockerfile +1 -1
  2. image_daemon.py +150 -215
Dockerfile CHANGED
@@ -6,7 +6,7 @@ RUN apt-get update && apt-get install -y \
6
  && rm -rf /var/lib/apt/lists/*
7
 
8
  # Python 依赖
9
- RUN pip3 install flask psutil requests --break-system-packages
10
 
11
  # 安装 OpenClaw
12
  RUN npm install -g openclaw@latest
 
6
  && rm -rf /var/lib/apt/lists/*
7
 
8
  # Python 依赖
9
+ RUN pip3 install flask psutil requests lark-oapi --break-system-packages
10
 
11
  # 安装 OpenClaw
12
  RUN npm install -g openclaw@latest
image_daemon.py CHANGED
@@ -1,115 +1,48 @@
1
  #!/usr/bin/env python3
2
  """
3
- 飞书图片预处理守护进程 (image_daemon.py) v2
4
- 后台运行,自动检测飞书聊天中新图片消息,下载并上传到图床,
5
- 然后以文本消息回复图片 URL,让 OpenClaw agent 直接看到可访问的 URL
6
  """
7
- import os, sys, json, time, requests
8
 
9
  FEISHU_BASE = "https://open.feishu.cn/open-apis"
10
- POLL_INTERVAL = int(os.environ.get("IMAGE_POLL_INTERVAL", "5"))
11
- processed_file = "/tmp/image_daemon_processed.json"
12
 
 
13
  def log(msg):
14
  ts = time.strftime("%H:%M:%S")
15
  print(f"[image_daemon {ts}] {msg}", flush=True)
16
 
17
- def load_processed():
18
- try:
19
- with open(processed_file) as f:
20
- return set(json.load(f))
21
- except:
22
- return set()
23
-
24
- def save_processed(s):
25
- with open(processed_file, "w") as f:
26
- json.dump(list(s)[-500:], f)
27
-
28
- def get_tenant_token():
29
- app_id = os.environ.get("FEISHU_APP_ID")
30
- app_secret = os.environ.get("FEISHU_APP_SECRET")
31
- if not app_id or not app_secret:
32
- log(f" 环境变量缺失: FEISHU_APP_ID={'set' if app_id else 'MISSING'}, FEISHU_APP_SECRET={'set' if app_secret else 'MISSING'}")
33
- return None
34
- resp = requests.post(f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal",
35
- json={"app_id": app_id, "app_secret": app_secret}, timeout=10)
36
- data = resp.json()
37
- if data.get("code") != 0:
38
- log(f"❌ Token 获取失败: {data}")
39
- return None
40
- return data["tenant_access_token"]
41
-
42
- def get_bot_chats(token):
43
- """获取 bot 参与的所有聊天"""
44
- try:
45
- headers = {"Authorization": f"Bearer {token}"}
46
- resp = requests.get(f"{FEISHU_BASE}/im/v1/chats",
47
- headers=headers, params={"page_size": 20}, timeout=10)
48
- data = resp.json()
49
- code = data.get("code", -1)
50
- if code != 0:
51
- msg = data.get("msg", "unknown")
52
- log(f"❌ 获取聊天列表失败 (code={code}): {msg}")
53
- log(f" 需要权限: im:chat / im:chat:readonly")
54
- return []
55
- items = data.get("data", {}).get("items", [])
56
- return [c["chat_id"] for c in items]
57
- except Exception as e:
58
- log(f"❌ get_bot_chats 异常: {type(e).__name__}: {e}")
59
- return []
60
-
61
- def get_recent_messages(token, chat_id, limit=50):
62
- """获取聊天中最近的消息(不用 start_time,飞书 API 对新消息有索引延迟)"""
63
- try:
64
- headers = {"Authorization": f"Bearer {token}"}
65
- resp = requests.get(f"{FEISHU_BASE}/im/v1/messages",
66
- headers=headers,
67
- params={
68
- "container_id_type": "chat",
69
- "container_id": chat_id,
70
- "sort_type": "ByCreateTimeDesc",
71
- "page_size": limit
72
- }, timeout=10)
73
- data = resp.json()
74
- code = data.get("code", -1)
75
- if code != 0:
76
- msg = data.get("msg", "unknown")
77
- log(f"❌ 获取消息失败 chat={chat_id} (code={code}): {msg}")
78
- return []
79
- return data.get("data", {}).get("items", [])
80
- except Exception as e:
81
- log(f"❌ get_recent_messages 异常: {type(e).__name__}: {e}")
82
- return []
83
-
84
- def extract_image_keys(msg):
85
- """从消息中提取所有 image_key"""
86
- msg_type = msg.get("msg_type", "")
87
- body_str = msg.get("body", {}).get("content", "{}")
88
- try:
89
- content = json.loads(body_str)
90
- except:
91
- return []
92
-
93
- keys = []
94
- if msg_type == "image":
95
- k = content.get("image_key", "")
96
- if k:
97
- keys.append(k)
98
- elif msg_type == "post":
99
- for lang in ["zh_cn", "en_us"]:
100
- lang_content = content.get(lang, {})
101
- if isinstance(lang_content, dict):
102
- for row in lang_content.get("content", []):
103
- if isinstance(row, list):
104
- for elem in row:
105
- if isinstance(elem, dict) and elem.get("tag") == "img":
106
- k = elem.get("image_key", "")
107
- if k:
108
- keys.append(k)
109
- return keys
110
 
 
111
  def download_image(token, message_id, file_key):
112
- """通过 获取消息中的资源文件 API 下载用户发送的图片"""
113
  headers = {"Authorization": f"Bearer {token}"}
114
  url = f"{FEISHU_BASE}/im/v1/messages/{message_id}/resources/{file_key}"
115
  log(f"📥 API: GET {url}?type=image")
@@ -118,12 +51,12 @@ def download_image(token, message_id, file_key):
118
  log(f"✅ 下载成功: {len(resp.content)} bytes")
119
  return resp.content
120
  log(f"❌ 下载图片失败 {file_key}: HTTP {resp.status_code}, {resp.text[:200]}")
121
- log(f" 需要权限: im:resource")
122
  return None
123
 
 
124
  def upload_image(data):
125
- """上传图片到图床,按可用性排序,多重 fallback"""
126
- # 1. tmpfiles.org — 当前最稳定
127
  try:
128
  resp = requests.post("https://tmpfiles.org/api/v1/upload",
129
  files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
@@ -137,9 +70,8 @@ def upload_image(data):
137
  except Exception as e:
138
  log(f"⚠️ tmpfiles 异常: {e}")
139
 
140
- # 2. Telegraph (Telegra.ph) — 无需 API key
141
  try:
142
- # Telegraph 的图片上传接口
143
  resp = requests.post("https://telegra.ph/upload",
144
  files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
145
  if resp.status_code == 200:
@@ -154,20 +86,20 @@ def upload_image(data):
154
  except Exception as e:
155
  log(f"⚠️ telegraph 异常: {e}")
156
 
157
- # 3. file.io — 一次性链接但可用性高
158
  try:
159
  resp = requests.post("https://file.io",
160
  files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
161
  if resp.status_code == 200:
162
  url = resp.json().get("link", "")
163
  if url:
164
- log(f"📤 file.io 成功 (注意: 链接仅可下载一次)")
165
  return url
166
  log(f"⚠️ file.io 失败: HTTP {resp.status_code}")
167
  except Exception as e:
168
  log(f"⚠️ file.io 异常: {e}")
169
 
170
- # 4. catbox.moe — 近期返回 412,保留作为后备
171
  try:
172
  resp = requests.post("https://catbox.moe/user/api.php",
173
  data={"reqtype": "fileupload"},
@@ -179,7 +111,7 @@ def upload_image(data):
179
  except Exception as e:
180
  log(f"⚠️ catbox 异常: {e}")
181
 
182
- # 5. 0x0.st — 近期返回 403,保留作为最后手段
183
  try:
184
  resp = requests.post("https://0x0.st",
185
  files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
@@ -190,10 +122,10 @@ def upload_image(data):
190
  except Exception as e:
191
  log(f"⚠️ 0x0 异常: {e}")
192
 
193
-
194
  return None
195
 
196
- def reply_text(token, chat_id, msg_id, text):
 
197
  headers = {
198
  "Authorization": f"Bearer {token}",
199
  "Content-Type": "application/json; charset=utf-8"
@@ -208,122 +140,125 @@ def reply_text(token, chat_id, msg_id, text):
208
  code = data.get("code", -1)
209
  if code != 0:
210
  log(f"❌ 回复消息失败 (code={code}): {data.get('msg', '')}")
211
- log(f" 需要权限: im:message / im:message:create_as_bot")
212
  return data
213
 
214
- def process_message(token, chat_id, msg):
215
- msg_id = msg.get("message_id", "")
216
- sender_type = msg.get("sender", {}).get("sender_type", "")
217
- if sender_type == "app":
 
 
218
  return
219
 
220
- image_keys = extract_image_keys(msg)
221
- if not image_keys:
222
- return
223
 
224
- log(f"🖼️ 发现 {len(image_keys)} 张图片 (msg_id={msg_id[:16]}...)")
225
- urls = []
226
- for key in image_keys[:3]:
227
- log(f"📥 下载图片 {key} (msg={msg_id[:16]}...)")
228
- data = download_image(token, msg_id, key)
229
- if data:
230
- log(f"📥 {len(data)} bytes, 上传中...")
231
- url = upload_image(data)
232
- if url:
233
- urls.append(url)
234
- log(f"✅ {url}")
235
- else:
236
- path = f"/tmp/{key}.jpg"
237
- with open(path, "wb") as f:
238
- f.write(data)
239
- log(f"⚠️ 图��全部失败,本地保存: {path}")
240
-
241
- if urls:
242
- url_text = "\n".join(urls)
243
- reply = f"[系统] 图片已自动处理,可通过以下链接查看:\n{url_text}"
244
- result = reply_text(token, chat_id, msg_id, reply)
245
- log(f"📤 已回复 (code={result.get('code', '?')})")
246
-
247
- def main():
248
- log("🚀 启动中...")
249
-
250
- # 环境诊断
251
- app_id = os.environ.get("FEISHU_APP_ID", "")
252
- log(f"📋 飞书 App ID: {app_id[:10]}..." if app_id else "❌ FEISHU_APP_ID 未设置!")
253
-
254
- processed = load_processed()
255
- token = None
256
- token_time = 0
257
- cycle = 0
258
-
259
- while True:
260
- try:
261
- cycle += 1
262
- # 每 30 分钟刷新一次 token
263
- if not token or time.time() - token_time > 1800:
264
- token = get_tenant_token()
265
- if not token:
266
- log("⏳ Token 获取失败,60 秒后重试...")
267
- time.sleep(60)
268
- continue
269
- token_time = time.time()
270
- log("🔑 Token 已刷新")
271
-
272
- chats = get_bot_chats(token)
273
-
274
- # 每轮打印心跳(每 20 轮详细,其余简短)
275
- if cycle == 1 or cycle % 20 == 0:
276
- log(f"💓 轮询中... {len(chats)} 个聊天, 已处理 {len(processed)} 条 (第 {cycle} 轮)")
277
- else:
278
- log(f"💓 #{cycle} ok, {len(chats)} chats")
279
-
280
- if not chats and cycle == 1:
281
- log("⚠️ 未找到任何聊天!")
282
-
283
- new_imgs = 0
284
- for chat_id in chats:
285
- messages = get_recent_messages(token, chat_id)
286
-
287
- # 首轮打印消息类型统计
288
- if cycle == 1:
289
- type_counts = {}
290
- for m in messages:
291
- t = m.get("msg_type", "unknown")
292
- type_counts[t] = type_counts.get(t, 0) + 1
293
- log(f"📊 聊天 {chat_id[:12]}... 共 {len(messages)} 条")
294
- log(f" 消息类型分布: {type_counts}")
295
-
296
- new_in_chat = 0
297
- for msg in messages:
298
- msg_id = msg.get("message_id", "")
299
- if msg_id in processed:
300
- continue
301
- processed.add(msg_id)
302
- new_in_chat += 1
303
 
304
- msg_type = msg.get("msg_type", "")
305
- sender_type = msg.get("sender", {}).get("sender_type", "")
306
- body_preview = str(msg.get("body", {}).get("content", ""))[:100]
307
 
308
- log(f"📨 新消息: type={msg_type}, sender={sender_type}, preview={body_preview}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
309
 
310
- # 处理非文本消息(图片、富文本等)
311
- if msg_type != "text" and sender_type != "app":
312
- process_message(token, chat_id, msg)
313
- new_imgs += 1
314
 
315
- if new_in_chat > 0 or cycle % 20 == 0:
316
- log(f"📊 chat={chat_id[:12]}... 拉取 {len(messages)} 条, 新 {new_in_chat} 条, 已知 {len(processed)} 条")
 
 
317
 
318
- if new_imgs > 0:
319
- save_processed(processed)
 
320
 
321
- except Exception as e:
322
- log(f"❌ 错误: {type(e).__name__}: {e}")
323
- import traceback
324
- traceback.print_exc()
 
 
325
 
326
- time.sleep(POLL_INTERVAL)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
327
 
328
  if __name__ == "__main__":
329
  main()
 
1
  #!/usr/bin/env python3
2
  """
3
+ 飞书图片预处理守护进程 (image_daemon.py) v3 — WebSocket 事件驱动
4
+ 通过 lark-oapi SDK WebSocket 长连接实时接收消息事件
5
+ 检测图片消息后下载、上传到图床、回复 URL。
6
  """
7
+ import os, sys, json, time, requests, threading
8
 
9
  FEISHU_BASE = "https://open.feishu.cn/open-apis"
10
+ APP_ID = os.environ.get("FEISHU_APP_ID", "")
11
+ APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "")
12
 
13
+ # ---------- 日志 ----------
14
  def log(msg):
15
  ts = time.strftime("%H:%M:%S")
16
  print(f"[image_daemon {ts}] {msg}", flush=True)
17
 
18
+ # ---------- Token 管理 ----------
19
+ _token = None
20
+ _token_time = 0
21
+ _token_lock = threading.Lock()
22
+
23
+ def get_token():
24
+ """获取 tenant_access_token,30分钟自动刷新"""
25
+ global _token, _token_time
26
+ with _token_lock:
27
+ if _token and time.time() - _token_time < 1800:
28
+ return _token
29
+ try:
30
+ resp = requests.post(f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal",
31
+ json={"app_id": APP_ID, "app_secret": APP_SECRET}, timeout=10)
32
+ data = resp.json()
33
+ if data.get("code") == 0:
34
+ _token = data["tenant_access_token"]
35
+ _token_time = time.time()
36
+ log("🔑 Token 已刷新")
37
+ return _token
38
+ log(f" Token 获取失败: {data}")
39
+ except Exception as e:
40
+ log(f"❌ Token 异常: {e}")
41
+ return _token # 返回旧的,总比没有好
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
 
43
+ # ---------- 图片下载 ----------
44
  def download_image(token, message_id, file_key):
45
+ """通过消息资源 API 下载用户发送的图片"""
46
  headers = {"Authorization": f"Bearer {token}"}
47
  url = f"{FEISHU_BASE}/im/v1/messages/{message_id}/resources/{file_key}"
48
  log(f"📥 API: GET {url}?type=image")
 
51
  log(f"✅ 下载成功: {len(resp.content)} bytes")
52
  return resp.content
53
  log(f"❌ 下载图片失败 {file_key}: HTTP {resp.status_code}, {resp.text[:200]}")
 
54
  return None
55
 
56
+ # ---------- 图片上传(多重 fallback) ----------
57
  def upload_image(data):
58
+ """上传图片到图床,多重 fallback"""
59
+ # 1. tmpfiles.org
60
  try:
61
  resp = requests.post("https://tmpfiles.org/api/v1/upload",
62
  files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
 
70
  except Exception as e:
71
  log(f"⚠️ tmpfiles 异常: {e}")
72
 
73
+ # 2. Telegraph
74
  try:
 
75
  resp = requests.post("https://telegra.ph/upload",
76
  files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
77
  if resp.status_code == 200:
 
86
  except Exception as e:
87
  log(f"⚠️ telegraph 异常: {e}")
88
 
89
+ # 3. file.io
90
  try:
91
  resp = requests.post("https://file.io",
92
  files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
93
  if resp.status_code == 200:
94
  url = resp.json().get("link", "")
95
  if url:
96
+ log(f"📤 file.io 成功")
97
  return url
98
  log(f"⚠️ file.io 失败: HTTP {resp.status_code}")
99
  except Exception as e:
100
  log(f"⚠️ file.io 异常: {e}")
101
 
102
+ # 4. catbox.moe
103
  try:
104
  resp = requests.post("https://catbox.moe/user/api.php",
105
  data={"reqtype": "fileupload"},
 
111
  except Exception as e:
112
  log(f"⚠️ catbox 异常: {e}")
113
 
114
+ # 5. 0x0.st
115
  try:
116
  resp = requests.post("https://0x0.st",
117
  files={"file": ("img.jpg", data, "image/jpeg")}, timeout=30)
 
122
  except Exception as e:
123
  log(f"⚠️ 0x0 异常: {e}")
124
 
 
125
  return None
126
 
127
+ # ---------- 回复消息 ----------
128
+ def reply_text(token, msg_id, text):
129
  headers = {
130
  "Authorization": f"Bearer {token}",
131
  "Content-Type": "application/json; charset=utf-8"
 
140
  code = data.get("code", -1)
141
  if code != 0:
142
  log(f"❌ 回复消息失败 (code={code}): {data.get('msg', '')}")
 
143
  return data
144
 
145
+ # ---------- 处理图片消息 ----------
146
+ def handle_image_message(message_id, chat_id, image_key):
147
+ """下载 → 上传 → 回复"""
148
+ token = get_token()
149
+ if not token:
150
+ log("❌ 无法获取 token,跳过")
151
  return
152
 
153
+ log(f"🖼️ 处理图片 image_key={image_key[:20]}... (msg={message_id[:16]}...)")
 
 
154
 
155
+ # 下载
156
+ img_data = download_image(token, message_id, image_key)
157
+ if not img_data:
158
+ return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
 
160
+ log(f"📥 {len(img_data)} bytes, 上传中...")
 
 
161
 
162
+ # 上传
163
+ url = upload_image(img_data)
164
+ if url:
165
+ log(f"✅ {url}")
166
+ reply = f"[系统] 图片已自动处理,可通过以下链接查看:\n{url}"
167
+ result = reply_text(token, message_id, reply)
168
+ log(f"📤 已回复 (code={result.get('code', '?')})")
169
+ else:
170
+ path = f"/tmp/{image_key[:30]}.jpg"
171
+ with open(path, "wb") as f:
172
+ f.write(img_data)
173
+ log(f"⚠️ 图床全部失败,本地保存: {path}")
174
+
175
+ # ---------- 事件处理 ----------
176
+ def on_message_receive(data):
177
+ """im.message.receive_v1 事件回调"""
178
+ try:
179
+ event = data.event
180
+ message = event.message
181
+ sender = event.sender
182
+
183
+ msg_id = message.message_id
184
+ chat_id = message.chat_id
185
+ msg_type = message.message_type
186
+ sender_type = getattr(sender, 'sender_type', '') if sender else ''
187
+
188
+ # 只处理用户发的图片
189
+ if sender_type == "app":
190
+ return
191
+ if msg_type != "image":
192
+ return
193
+
194
+ content = json.loads(message.content)
195
+ image_key = content.get("image_key", "")
196
+ if not image_key:
197
+ log(f"⚠️ 图片消息但无 image_key: {message.content}")
198
+ return
199
+
200
+ log(f"📨 实时收到图片消息: msg_id={msg_id[:16]}..., image_key={image_key[:20]}...")
201
+
202
+ # 在新线程中处理,避免阻塞事件循环
203
+ t = threading.Thread(target=handle_image_message, args=(msg_id, chat_id, image_key))
204
+ t.daemon = True
205
+ t.start()
206
 
207
+ except Exception as e:
208
+ log(f" on_message_receive 异常: {type(e).__name__}: {e}")
209
+ import traceback
210
+ traceback.print_exc()
211
 
212
+ # ---------- 主入口 ----------
213
+ def main():
214
+ log("🚀 启动中... (WebSocket 事件驱动模式)")
215
+ log(f"📋 飞书 App ID: {APP_ID[:10]}..." if APP_ID else "❌ FEISHU_APP_ID 未设置!")
216
 
217
+ if not APP_ID or not APP_SECRET:
218
+ log("❌ FEISHU_APP_ID 或 FEISHU_APP_SECRET 未设置,退出")
219
+ sys.exit(1)
220
 
221
+ # 预热 token
222
+ token = get_token()
223
+ if token:
224
+ log("✅ Token 获取成功")
225
+ else:
226
+ log("⚠️ Token 获取失败,稍后重试")
227
 
228
+ # 初始化 lark-oapi WebSocket 客户端
229
+ try:
230
+ import lark_oapi as lark
231
+ from lark_oapi.api.im.v1 import P2ImMessageReceiveV1
232
+ log("✅ lark-oapi SDK 已加载")
233
+ except ImportError:
234
+ log("❌ lark-oapi 未安装! 请执行: pip install lark-oapi")
235
+ sys.exit(1)
236
+
237
+ # 构建事件处理器
238
+ handler = lark.EventDispatcherHandler.builder("", "") \
239
+ .register_p2_im_message_receive_v1(on_message_receive) \
240
+ .build()
241
+
242
+ # 构建 WebSocket 客户端
243
+ from lark_oapi.ws import Client as WsClient
244
+ ws_client = WsClient(APP_ID, APP_SECRET, event_handler=handler, log_level=lark.LogLevel.INFO)
245
+
246
+ log("🔌 正在连接飞书 WebSocket...")
247
+ log(" 订阅事件: im.message.receive_v1")
248
+ log(" 等待图片消息...")
249
+
250
+ # 启动心跳线程
251
+ def heartbeat():
252
+ count = 0
253
+ while True:
254
+ time.sleep(60)
255
+ count += 1
256
+ log(f"� WebSocket 运行中 ({count} 分钟)")
257
+ hb = threading.Thread(target=heartbeat, daemon=True)
258
+ hb.start()
259
+
260
+ # 启动 WebSocket(阻塞)
261
+ ws_client.start()
262
 
263
  if __name__ == "__main__":
264
  main()