Spaces:
Sleeping
Sleeping
| from api.routers.metrics import depth_duration_seconds | |
| from api.routers.metrics import detection_duration_seconds | |
| from api.routers.metrics import decode_duration_seconds | |
| from utils.profiling import profile_step | |
| from domain.detection_box_center import calculate_detection_box_center | |
| import asyncio | |
| from contracts.camera_metadata import DetectionMetadata | |
| from contracts.camera_metadata import CameraMetadata | |
| import cv2 as cv | |
| import numpy as np | |
| import json | |
class ProcessingPipeline:
    """Process one camera frame end to end and publish the result to Redis.

    For each frame the pipeline decodes the image, runs the object detector
    and the safety detector on it, estimates depth at the centre of every
    detection box (only when there are boxes), and then publishes the
    resulting ``CameraMetadata`` to a pub/sub channel and to a short-lived
    per-camera key.

    Collaborators, as used by this class:
        detector: exposes ``detect(frame)``; the result has ``.detections``.
        depth_model: exposes ``calculate_depth(frame, boxes_center)``.
        safety_detector: exposes ``detect(frame)``; the truthiness of the
            result becomes the ``is_danger`` flag.
        redis: async client whose ``publish`` and ``setex`` are awaited.
        config: exposes ``experiment``, forwarded to ``profile_step``.
    """

    # Pub/sub channel that receives one JSON message per processed frame.
    DASHBOARD_CHANNEL = "dashboard_stream"

    # Lifetime, in seconds, of the per-camera "latest" key. The key expires
    # so that a disconnected camera stops showing stale data.
    LATEST_TTL_SECONDS = 10

    def __init__(self, detector, depth_model, safety_detector, redis, config):
        """Store the collaborators; no work is done at construction time."""
        self.detector = detector
        self.depth_model = depth_model
        self.safety_detector = safety_detector
        self.redis = redis
        self.config = config

    def _decode_frame(self, fb):
        """Decode an encoded image buffer into a colour image array.

        Args:
            fb: Bytes-like object holding an encoded image.

        Returns:
            The image decoded by ``cv.imdecode`` with ``cv.IMREAD_COLOR``.

        Raises:
            ValueError: If OpenCV cannot decode the buffer. ``cv.imdecode``
                signals that by returning ``None``, which would otherwise
                be passed on to the detectors.
        """
        frame = cv.imdecode(np.frombuffer(fb, np.uint8), cv.IMREAD_COLOR)
        if frame is None:
            raise ValueError(
                "could not decode frame: invalid or unsupported image data"
            )
        return frame

    def _camera_metadata(
        self, camera_id, safety_detection, depth_points, boxes_center_ratio
    ) -> CameraMetadata:
        """Assemble the ``CameraMetadata`` record for one processed frame.

        Args:
            camera_id: Identifier stored on the record.
            safety_detection: Safety detector result; only its truthiness
                is used.
            depth_points: One depth value per detection.
            boxes_center_ratio: One horizontal ratio per detection.

        Returns:
            A ``CameraMetadata`` with one ``DetectionMetadata`` per
            (depth, ratio) pair. ``zip`` stops at the shorter input, so
            surplus items in the longer one are dropped.
        """
        detection_metadata = [
            DetectionMetadata(depth=depth, xRatio=x_ratio)
            for depth, x_ratio in zip(depth_points, boxes_center_ratio)
        ]
        return CameraMetadata(
            camera_id=camera_id,
            is_danger=bool(safety_detection),
            detection_metadata=detection_metadata,
        )

    async def run(self, camera_id: str, frame_bytes, frame_count):
        """Process one encoded frame and publish its metadata.

        Blocking work (decode, detection, depth) is handed to the event
        loop's default executor so the loop itself is not blocked.

        Args:
            camera_id: Identifier of the source camera; used in the
                profiling calls, the published payload and the Redis key.
            frame_bytes: Encoded image bytes for a single frame.
            frame_count: Frame counter forwarded to ``profile_step``.

        Returns:
            ``{"status": 200, "camera_id": camera_id}``. A plain dict is
            returned because JSONResponse is an HTTP construct and does
            not apply here.

        Raises:
            ValueError: If ``frame_bytes`` cannot be decoded as an image.
        """
        loop = asyncio.get_running_loop()
        experiment = self.config.experiment

        # NOTE(review): the step label is "frame_processing_time" while the
        # metric is decode_duration_seconds; the other steps use the metric
        # name as the label. Left unchanged in case log consumers rely on it.
        with profile_step(
            "frame_processing_time",
            decode_duration_seconds,
            camera_id,
            frame_count,
            experiment=experiment,
        ):
            frame = await loop.run_in_executor(
                None, self._decode_frame, frame_bytes
            )

        with profile_step(
            "detection_duration_seconds",
            detection_duration_seconds,
            camera_id,
            frame_count,
            experiment=experiment,
        ):
            # Submit both detectors before awaiting either of them.
            detection_task = loop.run_in_executor(
                None, self.detector.detect, frame
            )
            safety_task = loop.run_in_executor(
                None, self.safety_detector.detect, frame
            )
            detections, safety_detection = await asyncio.gather(
                detection_task, safety_task
            )

        # frame.shape[1] is the number of columns, i.e. the image width.
        boxes_center, boxes_center_ratio = calculate_detection_box_center(
            detections.detections, frame.shape[1]
        )

        # Depth estimation is skipped entirely when nothing was detected.
        depth_points = []
        if boxes_center:
            with profile_step(
                "depth_duration_seconds",
                depth_duration_seconds,
                camera_id,
                frame_count,
                experiment=experiment,
            ):
                depth_points = await loop.run_in_executor(
                    None, self.depth_model.calculate_depth, frame, boxes_center
                )

        metadata = self._camera_metadata(
            camera_id, safety_detection, depth_points, boxes_center_ratio
        )

        # Use model_dump(mode="json") here, not model_dump_json(): the
        # latter returns an already-encoded string, which json.dumps would
        # then encode a second time as a JSON string literal.
        await self.redis.publish(
            self.DASHBOARD_CHANNEL,
            json.dumps({camera_id: metadata.model_dump(mode="json")}),
        )

        # Without an expiry Redis would keep serving the last frame of a
        # disconnected camera, so the per-camera key is written with a TTL.
        await self.redis.setex(
            f"camera:{camera_id}:latest",
            self.LATEST_TTL_SECONDS,
            metadata.model_dump_json(),
        )

        return {"status": 200, "camera_id": camera_id}