import asyncio
import itertools
import time
import traceback

import cv2 as cv
import mlflow
import numpy as np
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
# NOTE(review): not referenced anywhere in the visible code; looks like an
# accidental auto-import of a private pandas helper - confirm before removing.
from pandas.core.frame import nested_data_to_arrays

from ai.contracts.detector import DetectionResults
from backend.api.routers.metrics import active_cameras, decode_duration_seconds, depth_duration_seconds, detection_duration_seconds, frame_processing_duration_seconds
from backend.contracts.camera_metadata import CameraMetadata, DetectionMetadata
from backend.utils.experiment import log_config

router = APIRouter()


def _decode_frame(frame_bytes):
    """Decode an encoded image payload into a colour image array.

    NOTE(review): ``cv.imdecode`` returns ``None`` when the payload cannot be
    decoded; callers currently do not handle that case.
    """
    return cv.imdecode(np.frombuffer(frame_bytes, np.uint8), cv.IMREAD_COLOR)


def _box_centers(boxes, frame_width):
    """Return ``(centers, x_ratios)`` for the given detection boxes.

    ``centers`` holds integer ``(x, y)`` box centres; ``x_ratios`` holds each
    centre's x coordinate divided by ``frame_width``.
    """
    centers = []
    x_ratios = []
    for box in boxes:
        xmin, ymin, xmax, ymax = box.xyxy
        xcenter = (xmax + xmin) / 2
        ycenter = (ymax + ymin) / 2
        centers.append((int(xcenter), int(ycenter)))
        x_ratios.append(xcenter / frame_width)
    return centers, x_ratios


def _record_duration(histogram, camera_id, mlflow_key, seconds, step):
    """Report one duration to its Prometheus histogram and to MLflow.

    NOTE(review): ``mlflow.log_metric`` is invoked directly from the
    coroutine (not via the executor); confirm the tracking backend is fast
    enough that this does not stall the event loop.
    """
    value = round(seconds, 3)
    histogram.labels(camera_id).observe(value)
    mlflow.log_metric(mlflow_key, value, step=step)


@router.websocket("/stream/{camera_id}")
async def websocket_detect(websocket: WebSocket, camera_id: str):
    """Receive camera frames over a WebSocket and run the AI models on them.

    For every binary message received, the frame is decoded, passed to the
    detection and safety-detection models, and (when there are detections)
    to the depth model at each box centre. The resulting metadata is stored
    in ``app.state.camera_metadata[camera_id]`` and a small JSON
    acknowledgement is sent back to the client.

    URL: ws://127.0.0.1:8000/detectors/stream/camera_id

    Args:
        websocket: The client connection; models and the logger are read
            from ``websocket.app.state``.
        camera_id: Identifier taken from the URL; used as the metrics label
            and as the key under which the metadata is stored.

    On exit (disconnect or error) the camera's stored metadata is removed,
    the ``active_cameras`` gauge is decremented and the MLflow run is ended.
    """
    # Reading websocket.app.state repeatedly was checked and is not a
    # performance concern; the local aliases are only for readability.
    state = websocket.app.state
    logger = state.logger
    detector = state.detection_model
    safety_detector = state.safety_detection_model
    depth_model = state.depth_model

    # Accept the connection from the client.
    await websocket.accept()

    # Logging and tracking.
    active_cameras.inc()
    logger.info(f"Client ID >>{camera_id}<< Connected...")

    loop = asyncio.get_running_loop()
    # One MLflow step per frame, shared by every metric of that frame.
    frame_index = itertools.count()

    try:
        # MLflow setup lives inside the try so that a failure here still
        # reaches the finally block and the gauge is decremented.
        # NOTE(review): this ends whatever run is currently active, which may
        # belong to another connected camera - confirm that is intended.
        if mlflow.active_run():
            mlflow.end_run()
        mlflow.start_run(run_name=f'camera_{camera_id}', nested=True)
        log_config()

        logger.info(f"Camera {camera_id} start sending frames...")

        # Keep receiving messages in a loop until disconnection.
        while True:
            frame_bytes = await websocket.receive_bytes()
            step = next(frame_index)

            # --- Decode -----------------------------------------------------
            frame_start = time.perf_counter()
            image_array = await loop.run_in_executor(None, _decode_frame, frame_bytes)
            decode_done = time.perf_counter()
            # The MLflow key is kept unchanged for continuity with existing
            # runs; it has always carried the decode duration.
            _record_duration(
                decode_duration_seconds, camera_id,
                "frame_processing_time", decode_done - frame_start, step,
            )

            # --- Detection + safety detection (run concurrently) ------------
            detect_start = time.perf_counter()
            detections, safety_detection = await asyncio.gather(
                loop.run_in_executor(None, detector.detect, image_array),
                loop.run_in_executor(None, safety_detector.detect, image_array),
            )
            _record_duration(
                detection_duration_seconds, camera_id,
                "detection_duration_seconds", time.perf_counter() - detect_start, step,
            )

            # --- Depth at each detection centre -----------------------------
            boxes_center, boxes_center_ratio = _box_centers(
                detections.detections, image_array.shape[1]
            )
            depth_start = time.perf_counter()
            if boxes_center:
                depth_points = await loop.run_in_executor(
                    None, depth_model.calculate_depth, image_array, boxes_center
                )
            else:
                depth_points = []
            _record_duration(
                depth_duration_seconds, camera_id,
                "depth_duration_seconds", time.perf_counter() - depth_start, step,
            )

            # --- Publish metadata -------------------------------------------
            detection_metadata = [
                DetectionMetadata(depth=depth, xRatio=x_ratio)
                for depth, x_ratio in zip(depth_points, boxes_center_ratio)
            ]
            metadata = CameraMetadata(
                camera_id=camera_id,
                is_danger=bool(safety_detection),
                detection_metadata=detection_metadata,
            )
            state.camera_metadata[camera_id] = metadata.model_dump()

            # Whole-frame duration, measured once every stage has finished.
            _record_duration(
                frame_processing_duration_seconds, camera_id,
                "frame_processing duration time", time.perf_counter() - frame_start, step,
            )
            logger.debug("Frame processed", camera_id=camera_id)

            # Note that JSONResponse doesn't work here, as it is for HTTP.
            await websocket.send_json({"status": 200, "camera_id": camera_id})

    except WebSocketDisconnect:
        logger.warning(f"Client ID >>{camera_id}<< Disconnected Normally...")
    except Exception as e:
        logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
        # The full traceback shows more detail about what went wrong.
        # TODO: switch to logger.exception once the logger supports it.
        traceback.print_exc()
        try:
            await websocket.close()
        except RuntimeError:
            # The connection was already closed; nothing left to do.
            pass
    finally:
        active_cameras.dec()
        # Drop this camera's metadata on every exit path, not only on a
        # clean disconnect, so stale entries are never left behind.
        state.camera_metadata.pop(camera_id, None)
        mlflow.end_run()


# Uncomment this when needed. It does the same over HTTP, which is
# request/response only; it could be used for testing.
# from fastapi import Request, UploadFile
# @router.post("/detect")
# async def post_detection(request: Request, file: UploadFile):
#     # Request here is being used to access the app.state.model
#     request.app.state.model.detect(file)