nzjsdsk's picture
Upload 169 files
27e74f3 verified
import asyncio
import cv2
import base64
import logging
import threading
from src.application import Application
from src.constants.constants import DeviceState
from src.iot.thing import Thing
from src.iot.things.CameraVL import VL
logger = logging.getLogger("Camera")
class Camera(Thing):
def __init__(self):
super().__init__("Camera", "摄像头管理")
self.app = None
"""初始化摄像头管理器"""
if hasattr(self, '_initialized'):
return
self._initialized = True
# 加载配置
self.cap = None
self.is_running = False
self.camera_thread = None
self.result=""
from src.utils.config_manager import ConfigManager
self.config = ConfigManager.get_instance()
# 摄像头控制器
VL.ImageAnalyzer.get_instance().init(self.config.get_config('CAMERA.VLapi_key'), self.config.get_config('CAMERA.Loacl_VL_url'),self.config.get_config('CAMERA.models'))
self.VL= VL.ImageAnalyzer.get_instance()
print(f"[虚拟设备] 摄像头设备初始化完成")
self.add_property_and_method()#定义设备方法与状态属性
def add_property_and_method(self):
# 定义属性
self.add_property("power", "摄像头是否打开", lambda: self.is_running )
self.add_property("result", "识别画面的内容", lambda: self.result )
# 定义方法
self.add_method("start_camera", "打开摄像头", [],
lambda params: self.start_camera())
self.add_method("stop_camera", "关闭摄像头", [],
lambda params: self.stop_camera())
self.add_method("capture_frame_to_base64", "识别画面", [],
lambda params: self.capture_frame_to_base64())
def _camera_loop(self):
"""摄像头线程的主循环"""
camera_index = self.config.get_config('CAMERA.camera_index')
self.cap = cv2.VideoCapture(camera_index)
if not self.cap.isOpened():
logger.error("无法打开摄像头")
return
# 设置摄像头参数
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.config.get_config('CAMERA.frame_width'))
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.config.get_config('CAMERA.frame_height'))
self.cap.set(cv2.CAP_PROP_FPS, self.config.get_config('CAMERA.fps'))
self.is_running = True
while self.is_running:
ret, frame = self.cap.read()
if not ret:
logger.error("无法读取画面")
break
# 显示画面
cv2.imshow('Camera', frame)
# 按下 'q' 键退出
if cv2.waitKey(1) & 0xFF == ord('q'):
self.is_running = False
# 释放摄像头并关闭窗口
self.cap.release()
cv2.destroyAllWindows()
def start_camera(self):
"""启动摄像头线程"""
if self.camera_thread is not None and self.camera_thread.is_alive():
logger.warning("摄像头线程已在运行")
return
self.camera_thread = threading.Thread(target=self._camera_loop, daemon=True)
self.camera_thread.start()
logger.info("摄像头线程已启动")
print(f"[虚拟设备] 摄像头线程已启动")
return {"status": "success", "message": "摄像头线程已打开"}
def capture_frame_to_base64(self):
"""截取当前画面并转换为 Base64 编码"""
if not self.cap or not self.cap.isOpened():
logger.error("摄像头未打开")
return None
ret, frame = self.cap.read()
if not ret:
logger.error("无法读取画面")
return None
# 将帧转换为 JPEG 格式
_, buffer = cv2.imencode('.jpg', frame)
# 将 JPEG 图像转换为 Base64 编码
frame_base64 = base64.b64encode(buffer).decode('utf-8')
self.result=str(self.VL.analyze_image(frame_base64))
print(self.result)
# 获取应用程序实例
self.app = Application.get_instance()
logger.info("画面已经识别到啦")
print(f"[虚拟设备] 画面已经识别完成")
self.app.set_device_state(DeviceState.LISTENING)
asyncio.create_task(self.app.protocol.send_wake_word_detected("播报识别结果"))
return {"status": 'success', "message": "识别成功","result":self.result}
def stop_camera(self):
"""停止摄像头线程"""
self.is_running = False
if self.camera_thread is not None:
self.camera_thread.join() # 等待线程结束
self.camera_thread = None
logger.info("摄像头线程已停止")
print(f"[虚拟设备] 摄像头线程已停止")
return {"status": "success", "message": "摄像头线程已停止"}