File size: 5,958 Bytes
e19b795
 
 
84dbb52
e19b795
84dbb52
c303abd
e19b795
84dbb52
 
e19b795
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c303abd
e19b795
 
 
 
 
 
 
 
84dbb52
 
 
 
 
e19b795
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c303abd
 
 
e19b795
c303abd
84dbb52
e19b795
 
 
c303abd
e19b795
c303abd
84dbb52
c303abd
 
 
 
84dbb52
c303abd
e19b795
c1c755a
e19b795
 
 
 
 
 
c1c755a
84dbb52
 
 
 
e19b795
c303abd
c1c755a
e19b795
 
 
 
 
 
 
 
 
 
 
c303abd
 
e19b795
c303abd
e19b795
 
c303abd
e19b795
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import asyncio
import itertools
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from pandas.core.frame import nested_data_to_arrays
from ai.contracts.detector import DetectionResults
from backend.api.routers.metrics import active_cameras, decode_duration_seconds, depth_duration_seconds, detection_duration_seconds, frame_processing_duration_seconds
from backend.contracts.camera_metadata import CameraMetadata, DetectionMetadata
import traceback
import mlflow
from backend.utils.experiment import log_config

import cv2 as cv
import numpy as np
import time

# Router for the detector endpoints; mounted by the app (mount prefix not visible here).
router = APIRouter()

@router.websocket("/stream/{camera_id}")
async def websocket_detect(websocket: WebSocket, camera_id: str):
    """
    WebSocket endpoint that receives encoded frames from a camera, runs the
    detection, safety and depth models on each frame, and stores the
    resulting metadata in app.state under the camera id.

    URL: ws://127.0.0.1:8000/detectors/stream/{camera_id}

    Args:
        websocket: client connection; models and shared state are read
            from ``websocket.app.state``.
        camera_id: identifier used for metric labels, the MLflow run name
            and the shared ``camera_metadata`` store.
    """
    # Reading app.state is cheap, but binding locals here keeps attribute
    # lookups out of the per-frame hot loop.
    state = websocket.app.state
    logger = state.logger
    detector = state.detection_model
    safety_detector = state.safety_detection_model
    depth_model = state.depth_model

    # Accept the connection from the client.
    await websocket.accept()

    # Logging and tracking.
    active_cameras.inc()
    logger.info(f"Client ID >>{camera_id}<< Connected...")

    loop = asyncio.get_running_loop()
    step_counter = itertools.count()

    # One MLflow run per connection; end any leftover active run first so
    # start_run does not fail.
    if mlflow.active_run():
        mlflow.end_run()
    mlflow.start_run(run_name=f'camera_{camera_id}', nested=True)
    log_config()

    try:
        logger.info(f"Camera {camera_id} start sending frames...")

        # Helpers run in the default executor: the CPU-heavy model/decode
        # work must not block the event loop.
        def decode_frame(data: bytes):
            # Decode the compressed frame bytes into a BGR ndarray.
            return cv.imdecode(np.frombuffer(data, np.uint8), cv.IMREAD_COLOR)

        def run_detection(frame) -> DetectionResults:
            return detector.detect(frame)

        def run_safety(frame) -> DetectionResults:
            return safety_detector.detect(frame)

        def run_depth(frame, points):
            return depth_model.calculate_depth(frame, points)

        # Keep receiving frames in a loop until disconnection.
        while True:
            frame_bytes = await websocket.receive_bytes()

            # One MLflow step per frame so all per-frame metrics align.
            step = next(step_counter)
            t0 = time.time()

            image_array = await loop.run_in_executor(None, decode_frame, frame_bytes)
            t_decode = time.time()
            decode_duration_seconds.labels(camera_id).observe(round(t_decode - t0, 3))
            mlflow.log_metric("decode_duration_seconds", round(t_decode - t0, 3), step)

            # Object detection and safety detection are independent — run in parallel.
            detection_task = loop.run_in_executor(None, run_detection, image_array)
            safety_task = loop.run_in_executor(None, run_safety, image_array)
            detections, safety_detection = await asyncio.gather(detection_task, safety_task)
            t_detect = time.time()
            # Per-stage delta (previously measured from t0, which folded the
            # decode time into the detection metric).
            detection_duration_seconds.labels(camera_id).observe(round(t_detect - t_decode, 3))
            mlflow.log_metric("detection_duration_seconds", round(t_detect - t_decode, 3), step)

            # Box centers in pixels (input to depth) and as a width ratio
            # (published in the metadata).
            boxes_center = []
            boxes_center_ratio = []
            for box in detections.detections:
                xmin, ymin, xmax, ymax = box.xyxy
                xcenter = (xmax + xmin) / 2
                ycenter = (ymax + ymin) / 2
                boxes_center.append((int(xcenter), int(ycenter)))
                boxes_center_ratio.append(xcenter / image_array.shape[1])

            # Depth is only computed when something was detected.
            depth_points = (
                await loop.run_in_executor(None, run_depth, image_array, boxes_center)
                if boxes_center else []
            )
            t_depth = time.time()
            depth_duration_seconds.labels(camera_id).observe(round(t_depth - t_detect, 3))
            mlflow.log_metric("depth_duration_seconds", round(t_depth - t_detect, 3), step)

            # Total wall-clock time for the whole frame.
            total = round(time.time() - t0, 3)
            frame_processing_duration_seconds.labels(camera_id).observe(total)
            mlflow.log_metric("frame_processing_duration_seconds", total, step)
            logger.debug("Frame processed", camera_id=camera_id)

            detection_metadata = [
                DetectionMetadata(depth=depth, xRatio=xRatio)
                for depth, xRatio in zip(depth_points, boxes_center_ratio)
            ]
            metadata = CameraMetadata(
                camera_id=camera_id,
                is_danger=bool(safety_detection),
                detection_metadata=detection_metadata,
            )
            state.camera_metadata[camera_id] = metadata.model_dump()

            # Note that JSONResponse doesn't work here, as it is for HTTP.
            await websocket.send_json({"status": 200, "camera_id": camera_id})

    except WebSocketDisconnect:
        # Normal client disconnect: drop this camera's cached metadata.
        logger.warning(f"Client ID >>{camera_id}<< Disconnected Normally...")
        state.camera_metadata.pop(camera_id, None)

    except Exception:
        # logger.exception records the full traceback with the message.
        logger.exception(f"Error in websocket, Client ID: >>{camera_id}<<")
        await websocket.close()

    finally:
        active_cameras.dec()
        mlflow.end_run()

# Uncomment this when needed. It is the same pipeline but over HTTP (request/response only); useful for testing.
# from fastapi import Request, UploadFile
# @router.post("/detect")
# async def post_detection(request: Request, file: UploadFile):
#     # Request here is being used to access the app.state.model

#     request.app.state.model.detect(file)