SissiFeng commited on
Commit
afe17aa
·
verified ·
1 Parent(s): e09cf4f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +103 -76
app.py CHANGED
@@ -254,57 +254,84 @@ def get_test_image(image_name=None):
254
  return None
255
 
256
 
257
- def capture_image(url=None, use_test_image=False, test_image_name=None):
258
- global rpi_client, latest_data
259
-
260
- if rpi_client is None:
261
- create_client("rpi")
262
-
263
- serial = DEFAULT_SERIAL
264
- request_topic = f"bambu_a1_mini/request/{serial}"
265
- response_topic = f"bambu_a1_mini/response/{serial}"
266
-
267
- logger.info(f"Subscribing to {response_topic}")
268
- rpi_client.subscribe(response_topic)
269
-
270
- rpi_client.publish(
271
- request_topic,
272
- json.dumps(
273
- {
274
- "command": "capture_image",
275
- }
276
- ),
277
- )
278
-
279
- latest_data["image_url"] = "N/A"
280
- timeout = 45
281
- while latest_data["image_url"] == "N/A" and timeout > 0:
282
- # print("timeout", timeout)
283
- time.sleep(1)
284
- timeout -= 1
285
-
286
- url = latest_data["image_url"]
 
 
 
 
 
 
 
 
287
 
288
- if use_test_image:
289
- logger.info("Using test image instead of URL")
290
- test_img = get_test_image(test_image_name)
291
- if test_img:
292
- return test_img
293
- else:
294
- logger.warning("Failed to get specified test image, trying URL")
295
 
296
- if url != "N/A":
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
297
  try:
298
- logger.info(f"Capturing image from URL: {url}")
299
- response = requests.get(url, timeout=10)
300
- if response.status_code == 200:
301
- return Image.open(io.BytesIO(response.content))
 
 
 
 
302
  else:
303
- logger.error(f"Failed to get image from URL: {response.status_code}")
 
 
304
  except Exception as e:
305
- logger.error(f"Error capturing image from URL: {e}")
306
- else:
307
- raise Exception("url is 'N/A'")
 
 
 
308
 
309
 
310
  def health_check():
@@ -341,32 +368,6 @@ def update_print_status():
341
  )
342
 
343
 
344
- def handle_auto_capture(message):
345
- """处理自动拍照请求"""
346
- try:
347
- logger.info(f"收到自动拍照请求: {message}")
348
-
349
- # 提取信息
350
- print_job = message.get("print_job", "unknown_job")
351
- timestamp = message.get("timestamp", time.strftime("%Y-%m-%d %H:%M:%S"))
352
-
353
- # 记录到最新数据���
354
- latest_data["auto_capture_requested"] = True
355
- latest_data["last_capture_job"] = print_job
356
- latest_data["last_capture_time"] = timestamp
357
-
358
- # 执行拍照操作 - 假设capture_image函数已存在
359
- image, status = capture_image()
360
-
361
- # 记录拍照结果
362
- logger.info(f"自动拍照结果: {status}")
363
-
364
- return image, status
365
- except Exception as e:
366
- logger.error(f"处理自动拍照请求时出错: {e}")
367
- return None, f"自动拍照失败: {str(e)}"
368
-
369
-
370
  demo = gr.Blocks(title="Bambu A1 Mini Print Control")
371
 
372
  with demo:
@@ -672,20 +673,46 @@ with demo:
672
  # 添加自动拍照状态显示
673
  with gr.Row():
674
  auto_capture_status = gr.Textbox(label="Auto Capture Status", value="Waiting")
 
675
 
676
- # 添加自动刷新,更新自动拍照状态
 
 
 
 
 
 
 
677
  def update_capture_status():
678
  auto_status = "Triggered" if latest_data.get("auto_capture_requested", False) else "Waiting"
679
  last_time = latest_data.get("last_capture_time", "None")
680
  last_job = latest_data.get("last_capture_job", "None")
681
  return f"{auto_status} (Last: {last_job} at {last_time})"
682
 
683
- demo.load(
 
684
  fn=update_capture_status,
685
  inputs=[],
686
- outputs=[auto_capture_status],
687
- every=5 # 每5秒刷新一次
688
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
689
 
690
  if __name__ == "__main__":
691
  logger.info("Starting Bambu A1 Mini Print Control application")
 
254
  return None
255
 
256
 
257
+ def capture_image():
258
+ """捕获图像"""
259
+ try:
260
+ logger.info("Capturing image...")
261
+
262
+ # 尝试使用摄像头捕获图像
263
+ try:
264
+ cap = cv2.VideoCapture(0) # 使用默认摄像头
265
+ ret, frame = cap.read()
266
+ cap.release()
267
+
268
+ if ret:
269
+ # 转换为RGB(OpenCV使用BGR)
270
+ frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
271
+ logger.info(f"Image captured successfully: {frame_rgb.shape}")
272
+ return frame_rgb
273
+ else:
274
+ logger.error("Failed to capture image from camera")
275
+ except Exception as e:
276
+ logger.error(f"Error accessing camera: {e}")
277
+
278
+ # 如果摄像头捕获失败,使用测试图像
279
+ test_image = get_test_image()
280
+ if test_image is not None:
281
+ logger.info("Using test image instead")
282
+ return np.array(test_image)
283
+
284
+ # 如果测试图像也不可用,创建一个错误图像
285
+ error_img = np.zeros((480, 640, 3), dtype=np.uint8)
286
+ cv2.putText(error_img, "Camera Error", (50, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
287
+ return error_img
288
+
289
+ except Exception as e:
290
+ logger.error(f"Error in capture_image: {e}")
291
+ # 返回错误图像
292
+ error_img = np.zeros((480, 640, 3), dtype=np.uint8)
293
+ cv2.putText(error_img, f"Error: {str(e)}", (50, 240), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
294
+ return error_img
295
 
 
 
 
 
 
 
 
296
 
297
+ def handle_auto_capture(message):
298
+ """处理自动拍照请求"""
299
+ try:
300
+ logger.info(f"收到自动拍照请求: {message}")
301
+
302
+ # 提取信息
303
+ print_job = message.get("print_job", "unknown_job")
304
+ timestamp = message.get("timestamp", time.strftime("%Y-%m-%d %H:%M:%S"))
305
+
306
+ # 记录到最新数据中
307
+ latest_data["auto_capture_requested"] = True
308
+ latest_data["last_capture_job"] = print_job
309
+ latest_data["last_capture_time"] = timestamp
310
+
311
+ # 执行拍照操作
312
+ image = capture_image()
313
+
314
+ # 保存图像
315
  try:
316
+ save_dir = os.path.join(os.path.dirname(__file__), "captured_images")
317
+ os.makedirs(save_dir, exist_ok=True)
318
+
319
+ filename = f"{print_job}_{timestamp.replace(' ', '_').replace(':', '-')}.jpg"
320
+ filepath = os.path.join(save_dir, filename)
321
+
322
+ if isinstance(image, np.ndarray):
323
+ cv2.imwrite(filepath, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
324
  else:
325
+ image.save(filepath)
326
+
327
+ logger.info(f"Saved captured image to {filepath}")
328
  except Exception as e:
329
+ logger.error(f"Error saving captured image: {e}")
330
+
331
+ return image
332
+ except Exception as e:
333
+ logger.error(f"处理自动拍照请求时出错: {e}")
334
+ return None
335
 
336
 
337
  def health_check():
 
368
  )
369
 
370
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
371
  demo = gr.Blocks(title="Bambu A1 Mini Print Control")
372
 
373
  with demo:
 
673
  # 添加自动拍照状态显示
674
  with gr.Row():
675
  auto_capture_status = gr.Textbox(label="Auto Capture Status", value="Waiting")
676
+ refresh_status_btn = gr.Button("Refresh Status")
677
 
678
+ # 添加手动拍照按钮和显示区域
679
+ with gr.Row():
680
+ capture_btn = gr.Button("Capture Image")
681
+
682
+ with gr.Row():
683
+ captured_image = gr.Image(label="Captured Image", type="numpy")
684
+
685
+ # 定义更新状态函数
686
  def update_capture_status():
687
  auto_status = "Triggered" if latest_data.get("auto_capture_requested", False) else "Waiting"
688
  last_time = latest_data.get("last_capture_time", "None")
689
  last_job = latest_data.get("last_capture_job", "None")
690
  return f"{auto_status} (Last: {last_job} at {last_time})"
691
 
692
+ # 连接刷新按钮
693
+ refresh_status_btn.click(
694
  fn=update_capture_status,
695
  inputs=[],
696
+ outputs=[auto_capture_status]
 
697
  )
698
+
699
+ # 连接拍照按钮
700
+ capture_btn.click(
701
+ fn=capture_image,
702
+ inputs=[],
703
+ outputs=[captured_image]
704
+ )
705
+
706
+ # 使用JavaScript实现自动刷新
707
+ auto_refresh_js = """
708
+ function autoRefresh() {
709
+ document.querySelector('#refresh-status-btn').click();
710
+ setTimeout(autoRefresh, 5000); // 每5秒刷新一次
711
+ }
712
+ setTimeout(autoRefresh, 5000); // 启动自动刷新
713
+ """
714
+
715
+ gr.HTML(f"<script>{auto_refresh_js}</script>")
716
 
717
  if __name__ == "__main__":
718
  logger.info("Starting Bambu A1 Mini Print Control application")