import asyncio
import itertools
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from pandas.core.frame import nested_data_to_arrays
from ai.contracts.detector import DetectionResults
from backend.api.routers.metrics import active_cameras, decode_duration_seconds, depth_duration_seconds, detection_duration_seconds, frame_processing_duration_seconds
from backend.contracts.camera_metadata import CameraMetadata, DetectionMetadata
import traceback
import mlflow
from backend.utils.experiment import log_config
import cv2 as cv
import numpy as np
import time
# Router that the WebSocket endpoint below registers itself on via `@router.websocket`.
# NOTE(review): the endpoint docstring suggests it is mounted under "/detectors" — confirm in the app setup.
router = APIRouter()
@router.websocket("/stream/{camera_id}")
async def websocket_detect(websocket: WebSocket, camera_id: str):
    """
    WebSocket stream: take each frame, pass it to the AI models, and save the
    result under the camera id provided in the URL.

    URL here is: ws://127.0.0.1:8000/detectors/stream/camera_id

    For every binary message received the handler:
      1. decodes the bytes into an image,
      2. runs the detection and the safety models concurrently,
      3. asks the depth model for a depth at the centre of every detected box,
      4. stores ``CameraMetadata(...).model_dump()`` in
         ``app.state.camera_metadata[camera_id]``,
      5. replies with ``{"status": 200, "camera_id": camera_id}``.

    Steps 1-3 run in the event loop's default executor so the loop is not
    blocked while the models work. A frame that cannot be decoded is answered
    with ``{"status": 400, ...}`` and skipped instead of being fed to the models.

    Args:
        websocket: The client connection; models and logger are read from
            ``websocket.app.state``.
        camera_id: Identifier taken from the URL path; used as the metrics
            label, the MLflow run name suffix and the metadata key.

    On disconnect or error the camera's metadata entry is removed, the
    ``active_cameras`` gauge is decremented and the MLflow run is ended.
    """
    # Read app.state once; the attribute lookups are cheap, this is only for readability.
    state = websocket.app.state
    logger = state.logger
    detector = state.detection_model
    safety_detector = state.safety_detection_model
    depth_model = state.depth_model

    # Accepting the connection from the client
    await websocket.accept()

    # Logging and tracking action
    active_cameras.inc()
    logger.info(f"Client ID >>{camera_id}<< Connected...")

    loop = asyncio.get_running_loop()
    # One step per frame, so every MLflow metric of a frame shares the same step.
    frame_counter = itertools.count()

    def decode_frame(raw: bytes):
        """Decode an encoded image buffer; return None when it is empty or invalid."""
        if not raw:
            # cv.imdecode raises on an empty buffer; treat it like any other bad frame.
            return None
        return cv.imdecode(np.frombuffer(raw, np.uint8), cv.IMREAD_COLOR)

    try:
        # Run setup lives inside the try so a failure here still reaches the
        # cleanup in `finally` (otherwise the active_cameras gauge would leak).
        # NOTE(review): ending whatever run is active may close a run that belongs
        # to another connected camera — confirm how runs should be scoped.
        if mlflow.active_run():
            mlflow.end_run()
        mlflow.start_run(run_name=f'camera_{camera_id}', nested=True)
        log_config()

        logger.info(f"Camera {camera_id} start sending frames...")

        # Keep receiving messages in a loop until disconnection.
        while True:
            frame_bytes = await websocket.receive_bytes()
            step = next(frame_counter)

            # --- Decode -------------------------------------------------
            frame_start = time.perf_counter()
            image_array = await loop.run_in_executor(None, decode_frame, frame_bytes)
            decode_elapsed = round(time.perf_counter() - frame_start, 3)
            decode_duration_seconds.labels(camera_id).observe(decode_elapsed)
            # NOTE(review): this key has always carried the decode time despite its
            # name; kept unchanged so existing MLflow charts keep working.
            # NOTE(review): mlflow.log_metric is a synchronous call made on the event loop.
            mlflow.log_metric("frame_processing_time", decode_elapsed, step)

            if image_array is None:
                logger.warning(f"Camera {camera_id} sent a frame that could not be decoded; skipping it.")
                # Still answer, in case the client waits for a reply before sending the next frame.
                await websocket.send_json({"status": 400, "camera_id": camera_id, "detail": "invalid frame"})
                continue

            # --- Detection + safety (concurrently) ------------------------
            detection_start = time.perf_counter()
            detections, safety_detection = await asyncio.gather(
                loop.run_in_executor(None, detector.detect, image_array),
                loop.run_in_executor(None, safety_detector.detect, image_array),
            )
            detection_elapsed = round(time.perf_counter() - detection_start, 3)
            detection_duration_seconds.labels(camera_id).observe(detection_elapsed)
            mlflow.log_metric("detection_duration_seconds", detection_elapsed, step)

            # --- Box centres ----------------------------------------------
            frame_width = image_array.shape[1]
            boxes_center = []
            boxes_center_ratio = []
            for box in detections.detections:
                xmin, ymin, xmax, ymax = box.xyxy
                xcenter = (xmax + xmin) / 2
                ycenter = (ymax + ymin) / 2
                boxes_center.append((int(xcenter), int(ycenter)))
                # Horizontal position of the centre as a fraction of the frame width.
                boxes_center_ratio.append(xcenter / frame_width)

            # --- Depth ----------------------------------------------------
            depth_start = time.perf_counter()
            if boxes_center:
                depth_points = await loop.run_in_executor(
                    None, depth_model.calculate_depth, image_array, boxes_center
                )
            else:
                # Nothing detected: skip the depth model entirely.
                depth_points = []
            depth_elapsed = round(time.perf_counter() - depth_start, 3)
            depth_duration_seconds.labels(camera_id).observe(depth_elapsed)
            mlflow.log_metric("depth_duration_seconds", depth_elapsed, step)

            # --- Publish metadata -----------------------------------------
            detection_metadata = [
                DetectionMetadata(depth=depth, xRatio=x_ratio)
                for depth, x_ratio in zip(depth_points, boxes_center_ratio)
            ]
            # NOTE(review): is_danger relies on the truthiness of the safety model's
            # result object — confirm it is falsy when nothing unsafe was found.
            metadata = CameraMetadata(
                camera_id=camera_id,
                is_danger=bool(safety_detection),
                detection_metadata=detection_metadata,
            )
            state.camera_metadata[camera_id] = metadata.model_dump()

            # --- Whole-frame timing (decode through depth) ------------------
            frame_elapsed = round(time.perf_counter() - frame_start, 3)
            frame_processing_duration_seconds.labels(camera_id).observe(frame_elapsed)
            mlflow.log_metric("frame_processing duration time", frame_elapsed, step)
            logger.debug("Frame processed", camera_id=camera_id)

            # Note that JSONResponse doesn't work here, as it is for HTTP
            await websocket.send_json({"status": 200, "camera_id": camera_id})

    except WebSocketDisconnect:
        logger.warning(f"Client ID >>{camera_id}<< Disconnected Normally...")
    except Exception as e:
        logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
        # Full traceback gives far more detail than the message alone.
        # TODO: switch to logger.exception once the logger supports it.
        traceback.print_exc()
        try:
            await websocket.close()
        except RuntimeError as close_error:
            # The connection may already be closed; closing again is not an error worth raising.
            logger.debug(f"Client ID >>{camera_id}<< close skipped: {close_error}")
    finally:
        # Drop the camera's metadata on every exit path so no stale entry survives an error.
        state.camera_metadata.pop(camera_id, None)
        active_cameras.dec()
        mlflow.end_run()
# Uncomment this when needed. It does the same thing over plain HTTP (request/response only) and can be useful for testing.
# from fastapi import Request, UploadFile
# @router.post("/detect")
# async def post_detection(request: Request, file: UploadFile):
#     # Request is used here to access app.state.model
#     request.app.state.model.detect(file)