e1250 committed on
Commit
c303abd
·
1 Parent(s): c1c755a

feat: adjusting monitoring and profiling, adding mlflow

Browse files
README.md CHANGED
@@ -3,4 +3,6 @@ title: Tracking System Backend
3
  sdk: docker
4
  app_port: 7960
5
  pinned: false
6
- ---
 
 
 
3
  sdk: docker
4
  app_port: 7960
5
  pinned: false
6
+ ---
7
+
8
+ This project is part of a bigger project providing a Real-Time Indoor Tracking system.
api/routers/camera_stream.py CHANGED
@@ -3,7 +3,7 @@ import itertools
3
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect
4
  from ai.contracts.detector import DetectionResults
5
  from backend.api.routers.metrics import active_cameras, frame_processing_duration_seconds
6
- from backend.contracts.camera_metadata import CameraMetadata
7
  import traceback
8
 
9
  import cv2 as cv
@@ -26,6 +26,7 @@ async def websocket_detect(websocket: WebSocket, camera_id:str):
26
  safety_detector = state.safety_detection_model
27
  depth_model = state.depth_model
28
 
 
29
  # Accepting the connection from the client
30
  await websocket.accept()
31
 
@@ -34,6 +35,7 @@ async def websocket_detect(websocket: WebSocket, camera_id:str):
34
  logger.info(f"Client ID >>{camera_id}<< Connected...")
35
 
36
  loop = asyncio.get_running_loop()
 
37
 
38
  try:
39
  # What are the info you aim to collect from the camera?
@@ -62,17 +64,32 @@ async def websocket_detect(websocket: WebSocket, camera_id:str):
62
  # Keep receiving messages in a loop until disconnection.
63
  while True:
64
 
65
- # Profiling
66
- time_start = time.time()
67
-
68
  frame_bytes = await websocket.receive_bytes()
69
 
 
 
 
70
  image_array = await loop.run_in_executor(None, decode_frame)
 
 
71
 
72
  detection_task = loop.run_in_executor(None, run_detection, image_array)
73
  safety_task = loop.run_in_executor(None, run_safety, image_array)
 
74
  detections, safety_detection = await asyncio.gather(detection_task, safety_task)
75
-
 
 
 
 
 
 
 
 
 
 
 
 
76
  boxes_center = []
77
  boxes_center_ratio = []
78
  for box in detections.detections:
@@ -82,34 +99,27 @@ async def websocket_detect(websocket: WebSocket, camera_id:str):
82
  ycenter = (ymax + ymin) / 2
83
  boxes_center.append((int(xcenter), int(ycenter)))
84
  boxes_center_ratio.append(xcenter / image_array.shape[1])
85
-
86
- depth_points = await loop.run_in_executor(None, run_depth, image_array, boxes_center) if boxes_center else []
87
 
88
- detection_metadata = [{"depth": depth, "xRatio": xRatio} for depth, xRatio in zip(depth_points, boxes_center_ratio)]
89
  metadata = CameraMetadata(camera_id=camera_id, is_danger = True if safety_detection else False, detection_metadata=detection_metadata)
90
- print(metadata)
91
  state.camera_metadata[camera_id] = metadata.model_dump()
92
 
93
- # Profiling
94
- duration = time.time() - time_start
95
- frame_processing_duration_seconds.labels(camera_id).observe(round(duration, 3))
96
- logger.debug("Frame processed", camera_id=camera_id)
97
-
98
  # Note that JSONResponse doesn't work here, as it is for HTTP
99
  await websocket.send_json({"status": 200, "camera_id": camera_id})
100
 
101
  except WebSocketDisconnect:
102
  logger.warn(f"Client ID >>{camera_id}<< Disconnected Normally...")
103
- traceback.print_exc() # This one is actually really better, it shows more details about the issue happened.
104
- # Also work on and create the logger.exception, as it directly controls printing more details about the issue happened.
105
  state.camera_metadata.pop(camera_id, None)
106
 
107
  except Exception as e:
108
  logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
109
- traceback.print_exc()
 
110
  await websocket.close()
 
111
  finally:
112
  active_cameras.dec()
 
113
 
114
 
115
  # Uncomment this when needed, It is the same but using HTTP, which is Request Response only. could be used for testing.
 
3
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect
4
  from ai.contracts.detector import DetectionResults
5
  from backend.api.routers.metrics import active_cameras, frame_processing_duration_seconds
6
+ from backend.contracts.camera_metadata import CameraMetadata, DetectionMetadata
7
  import traceback
8
 
9
  import cv2 as cv
 
26
  safety_detector = state.safety_detection_model
27
  depth_model = state.depth_model
28
 
29
+
30
  # Accepting the connection from the client
31
  await websocket.accept()
32
 
 
35
  logger.info(f"Client ID >>{camera_id}<< Connected...")
36
 
37
  loop = asyncio.get_running_loop()
38
+ run = mlflow.start_run(run_name=f'camera_{camera_id}')
39
 
40
  try:
41
  # What are the info you aim to collect from the camera?
 
64
  # Keep receiving messages in a loop until disconnection.
65
  while True:
66
 
 
 
 
67
  frame_bytes = await websocket.receive_bytes()
68
 
69
+ # Profiling
70
+ t0 = time.time()
71
+
72
  image_array = await loop.run_in_executor(None, decode_frame)
73
+ decode_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
74
+ mlflow.log_metric("frame_processing_time", round(time.time() - t0, 3))
75
 
76
  detection_task = loop.run_in_executor(None, run_detection, image_array)
77
  safety_task = loop.run_in_executor(None, run_safety, image_array)
78
+
79
  detections, safety_detection = await asyncio.gather(detection_task, safety_task)
80
+ detection_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
81
+ mlflow.log_metric("detection_duration_seconds", round(time.time() - t0, 3))
82
+
83
+ depth_points = await loop.run_in_executor(None, run_depth, image_array, boxes_center) if boxes_center else []
84
+ depth_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
85
+ mlflow.log_metric("depth_duration_seconds", round(time.time() - t0, 3))
86
+
87
+ # Profiling
88
+ frame_processing_duration_seconds.labels(camera_id).observe(round(time.time() - t0, 3))
89
+ logger.debug("Frame processed", camera_id=camera_id)
90
+ mlflow.log_metric("frame_processing duration time", round(time.time() - t0, 3))
91
+
92
+
93
  boxes_center = []
94
  boxes_center_ratio = []
95
  for box in detections.detections:
 
99
  ycenter = (ymax + ymin) / 2
100
  boxes_center.append((int(xcenter), int(ycenter)))
101
  boxes_center_ratio.append(xcenter / image_array.shape[1])
 
 
102
 
103
+ detection_metadata = [DetectionMetadata(depth=depth, xRatio=xRatio) for depth, xRatio in zip(depth_points, boxes_center_ratio)]
104
  metadata = CameraMetadata(camera_id=camera_id, is_danger = True if safety_detection else False, detection_metadata=detection_metadata)
 
105
  state.camera_metadata[camera_id] = metadata.model_dump()
106
 
 
 
 
 
 
107
  # Note that JSONResponse doesn't work here, as it is for HTTP
108
  await websocket.send_json({"status": 200, "camera_id": camera_id})
109
 
110
  except WebSocketDisconnect:
111
  logger.warn(f"Client ID >>{camera_id}<< Disconnected Normally...")
 
 
112
  state.camera_metadata.pop(camera_id, None)
113
 
114
  except Exception as e:
115
  logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
116
+ traceback.print_exc() # This one is actually really better, it shows more details about the issue happened.
117
+ # Also work on and create the logger.exception, as it directly controls printing more details about the issue happened.
118
  await websocket.close()
119
+
120
  finally:
121
  active_cameras.dec()
122
+ mlflow.end_run()
123
 
124
 
125
  # Uncomment this when needed, It is the same but using HTTP, which is Request Response only. could be used for testing.
api/routers/dashboard_stream.py CHANGED
@@ -1,6 +1,7 @@
1
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect
2
  from backend.api.routers.metrics import active_dashboards
3
  import asyncio
 
4
 
5
  router = APIRouter()
6
 
@@ -35,5 +36,7 @@ async def dashboard_websocket(websocket: WebSocket):
35
 
36
  except Exception as e:
37
  logger.error(f"Dashboard Error: {e}")
 
 
38
  finally:
39
  active_dashboards.dec()
 
1
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect
2
  from backend.api.routers.metrics import active_dashboards
3
  import asyncio
4
+ import traceback
5
 
6
  router = APIRouter()
7
 
 
36
 
37
  except Exception as e:
38
  logger.error(f"Dashboard Error: {e}")
39
+ traceback.print_exc()
40
+
41
  finally:
42
  active_dashboards.dec()
api/routers/metrics.py CHANGED
@@ -1,7 +1,11 @@
 
 
 
1
  from prometheus_client import Counter, Histogram, Gauge, make_asgi_app
2
 
3
  metrics_asgi_app = make_asgi_app()
4
 
 
5
  active_cameras = Gauge(
6
  "active_camera_connections",
7
  "Number of Currently Connected camera websockets"
@@ -18,3 +22,24 @@ frame_processing_duration_seconds = Histogram(
18
  ["camera_id"]
19
  )
20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Prometheus is for real-time system health.
2
+ # Grafana visualize the output of Prometheus
3
+ # This is considered as Monitoring
4
  from prometheus_client import Counter, Histogram, Gauge, make_asgi_app
5
 
6
  metrics_asgi_app = make_asgi_app()
7
 
8
+
9
  active_cameras = Gauge(
10
  "active_camera_connections",
11
  "Number of Currently Connected camera websockets"
 
22
  ["camera_id"]
23
  )
24
 
25
+ decode_duration_seconds = Histogram(
26
+ "decode_duration_seconds",
27
+ "Time to decode one image",
28
+ ["camera_id"]
29
+ )
30
+ detection_duration_seconds = Histogram(
31
+ "detection_duration_seconds",
32
+ "Time to detect",
33
+ ["camera_id"]
34
+ )
35
+ depth_duration_seconds = Histogram(
36
+ "depth_duration_seconds",
37
+ "Time to calculate the depth",
38
+ ["camera_id"]
39
+ )
40
+
41
+
42
+ cpu_usage = Gauge("cpu_usage_percent", "CPU usage %")
43
+ mem_usage = Gauge("mem_usage_percent", "mem usage %")
44
+
45
+ active_workers = Gauge("active_workers", "Active threads")
contracts/camera_metadata.py CHANGED
@@ -1,10 +1,11 @@
1
  from typing import List
2
  from pydantic import BaseModel
3
 
 
 
 
4
 
5
  class CameraMetadata(BaseModel):
6
  camera_id: str
7
  is_danger: bool = False
8
- detection_metadata: List
9
- # depth_points: List = []
10
- # box_detections_ratio: List = []
 
1
  from typing import List
2
  from pydantic import BaseModel
3
 
4
+ class DetectionMetadata(BaseModel):
5
+ depth: float
6
+ xRatio: float
7
 
8
  class CameraMetadata(BaseModel):
9
  camera_id: str
10
  is_danger: bool = False
11
+ detection_metadata: List
 
 
main.py CHANGED
@@ -11,6 +11,8 @@ from backend.api.routers import health
11
  from contextlib import asynccontextmanager
12
  from infra.logger_structlog import StructLogger
13
  import asyncio
 
 
14
 
15
  @asynccontextmanager
16
  async def lifespan(app: FastAPI):
@@ -47,7 +49,10 @@ async def lifespan(app: FastAPI):
47
 
48
  logger.warn("Shutting down the server....")
49
  # You can remove connections and release gpu here .
50
-
 
 
 
51
  app = FastAPI(
52
  title="Tracking System Backend",
53
  description="real-time frame processing API",
 
11
  from contextlib import asynccontextmanager
12
  from infra.logger_structlog import StructLogger
13
  import asyncio
14
+ import mlflow
15
+ from backend.utils.experiment import log_config
16
 
17
  @asynccontextmanager
18
  async def lifespan(app: FastAPI):
 
49
 
50
  logger.warn("Shutting down the server....")
51
  # You can remove connections and release gpu here .
52
+
53
+ mlflow.set_experiment("realtime-detection-system")
54
+ log_config()
55
+
56
  app = FastAPI(
57
  title="Tracking System Backend",
58
  description="real-time frame processing API",
utils/experiment.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import mlflow
2
+
3
+ def start_run(camera_id: str):
4
+ return mlflow.start_run(run_name=f"camera_{camera_id}")
5
+
6
+ def log_config():
7
+ mlflow.log_param("detector", "yolov26_n")
8
+ mlflow.log_param("safety_model", "custom YOLO26_n")
9
+ mlflow.log_param("depth_model", "depthAnything_n")
10
+
11
+ def log_metrics(metrics:dict):
12
+ for k, v in metrics.items():
13
+ mlflow.log_metric(k, v)