# tracking_system_backend/api/routers/camera_stream.py
# Author: e1250 — commit 84dbb52: "fix: fixing bugs, and make sure mlflow works fine"
import asyncio
import itertools
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from pandas.core.frame import nested_data_to_arrays
from ai.contracts.detector import DetectionResults
from backend.api.routers.metrics import active_cameras, decode_duration_seconds, depth_duration_seconds, detection_duration_seconds, frame_processing_duration_seconds
from backend.contracts.camera_metadata import CameraMetadata, DetectionMetadata
import traceback
import mlflow
from backend.utils.experiment import log_config
import cv2 as cv
import numpy as np
import time
router = APIRouter()
@router.websocket("/stream/{camera_id}")
async def websocket_detect(websocket: WebSocket, camera_id: str):
    """
    WebSocket stream takes the frame, passes it to the AI models, and saves the
    resulting metadata under the camera id provided in the url.
    url here is: ws://127.0.0.1:8000/detectors/stream/camera_id

    Per frame: decode -> object detection + safety detection (concurrently)
    -> depth estimation on the detected box centers -> publish metadata.
    """
    # Repeated websocket.app.state lookups are cheap, but binding once keeps
    # the hot loop tidy.
    state = websocket.app.state
    logger = state.logger
    detector = state.detection_model
    safety_detector = state.safety_detection_model
    depth_model = state.depth_model

    # Accepting the connection from the client.
    await websocket.accept()

    # Logging and tracking action.
    active_cameras.inc()
    logger.info(f"Client ID >>{camera_id}<< Connected...")

    loop = asyncio.get_running_loop()
    step_counter = itertools.count()

    # One MLflow run per camera connection; end any stale active run first so
    # this connection does not accidentally nest under it.
    if mlflow.active_run():
        mlflow.end_run()
    mlflow.start_run(run_name=f'camera_{camera_id}', nested=True)
    log_config()

    # Worker functions run in the default executor so CPU-bound codec/model
    # work does not block the event loop.
    def decode_frame(frame_bytes: bytes):
        # Decode the raw image bytes into a BGR ndarray.
        return cv.imdecode(np.frombuffer(frame_bytes, np.uint8), cv.IMREAD_COLOR)

    def run_detection(frame) -> DetectionResults:
        return detector.detect(frame)

    def run_safety(frame) -> DetectionResults:
        return safety_detector.detect(frame)

    def run_depth(frame, points):
        return depth_model.calculate_depth(frame, points)

    try:
        logger.info(f"Camera {camera_id} start sending frames...")
        # Keep receiving messages in a loop until disconnection.
        while True:
            frame_bytes = await websocket.receive_bytes()
            # One MLflow step per frame so all metrics line up on the x-axis
            # (previously each log_metric advanced the counter independently).
            step = next(step_counter)

            # Profiling: decode stage.
            t0 = time.time()
            image_array = await loop.run_in_executor(None, decode_frame, frame_bytes)
            decode_elapsed = round(time.time() - t0, 3)
            decode_duration_seconds.labels(camera_id).observe(decode_elapsed)
            mlflow.log_metric("decode_duration_seconds", decode_elapsed, step)

            # Profiling: both detectors run concurrently on the thread pool.
            # Timed from t1, not t0, so decode time is not double-counted.
            t1 = time.time()
            detections, safety_detection = await asyncio.gather(
                loop.run_in_executor(None, run_detection, image_array),
                loop.run_in_executor(None, run_safety, image_array),
            )
            detection_elapsed = round(time.time() - t1, 3)
            detection_duration_seconds.labels(camera_id).observe(detection_elapsed)
            mlflow.log_metric("detection_duration_seconds", detection_elapsed, step)

            logger.debug("Frame processed", camera_id=camera_id)

            # Box centers (pixels) feed the depth model; the x-ratio (0..1)
            # locates the object horizontally within the frame.
            boxes_center = []
            boxes_center_ratio = []
            frame_width = image_array.shape[1]
            for box in detections.detections:
                xmin, ymin, xmax, ymax = box.xyxy
                xcenter = (xmax + xmin) / 2
                ycenter = (ymax + ymin) / 2
                boxes_center.append((int(xcenter), int(ycenter)))
                boxes_center_ratio.append(xcenter / frame_width)

            # Profiling: depth stage only (timed from t2, not t0).
            t2 = time.time()
            depth_points = (
                await loop.run_in_executor(None, run_depth, image_array, boxes_center)
                if boxes_center else []
            )
            depth_elapsed = round(time.time() - t2, 3)
            depth_duration_seconds.labels(camera_id).observe(depth_elapsed)
            mlflow.log_metric("depth_duration_seconds", depth_elapsed, step)

            # Total end-to-end time for this frame, observed AFTER depth so the
            # depth stage is included in the total.
            total_elapsed = round(time.time() - t0, 3)
            frame_processing_duration_seconds.labels(camera_id).observe(total_elapsed)
            mlflow.log_metric("frame_processing_duration_seconds", total_elapsed, step)

            detection_metadata = [
                DetectionMetadata(depth=depth, xRatio=xRatio)
                for depth, xRatio in zip(depth_points, boxes_center_ratio)
            ]
            metadata = CameraMetadata(
                camera_id=camera_id,
                is_danger=bool(safety_detection),
                detection_metadata=detection_metadata,
            )
            state.camera_metadata[camera_id] = metadata.model_dump()

            # Note that JSONResponse doesn't work here, as it is for HTTP.
            await websocket.send_json({"status": 200, "camera_id": camera_id})
    except WebSocketDisconnect:
        # `logger.warn` is deprecated in the stdlib logging API; use warning().
        logger.warning(f"Client ID >>{camera_id}<< Disconnected Normally...")
    except Exception as e:
        logger.error(f"Error in websocket, Client ID: >>{camera_id}<<: {e}")
        # Full traceback to stderr for debugging; consider logger.exception()
        # so the traceback goes through the structured logger too.
        traceback.print_exc()
        try:
            await websocket.close()
        except Exception:
            # The socket may already be gone; closing is best-effort.
            pass
    finally:
        # Clean up on EVERY exit path (previously metadata leaked when a
        # generic Exception — not a WebSocketDisconnect — ended the loop).
        state.camera_metadata.pop(camera_id, None)
        active_cameras.dec()
        mlflow.end_run()
# Uncomment this when needed. It is the same pipeline over HTTP (request/response only); could be used for testing.
# from fastapi import Request, UploadFile
# @router.post("/detect")
# async def post_detection(request: Request, file: UploadFile):
# # Request here is being used to access the app.state.model
# request.app.state.model.detect(file)