e1250 commited on
Commit
e19b795
·
0 Parent(s):

most of the functions are tested and ready, still just fixes and connect them together

Browse files
api/routers/__pycache__/camera_stream.cpython-311.pyc ADDED
Binary file (6.17 kB). View file
 
api/routers/__pycache__/dashboard_stream.cpython-311.pyc ADDED
Binary file (2.22 kB). View file
 
api/routers/__pycache__/health.cpython-311.pyc ADDED
Binary file (2 kB). View file
 
api/routers/__pycache__/metrics.cpython-311.pyc ADDED
Binary file (803 Bytes). View file
 
api/routers/camera_stream.py ADDED
@@ -0,0 +1,124 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import itertools
3
+ from fastapi import APIRouter, WebSocket, WebSocketDisconnect
4
+ from ai.contracts.detector import DetectionResults
5
+ from backend.api.routers.metrics import active_cameras, frame_processing_duration_seconds
6
+ from backend.contracts.camera_metadata import CameraMetadata
7
+ import traceback
8
+
9
+ import cv2 as cv
10
+ import numpy as np
11
+ import time
12
+
13
+ router = APIRouter()
14
+
15
+ @router.websocket("/stream/{camera_id}")
16
+ async def websocket_detect(websocket: WebSocket, camera_id:str):
17
+ """
18
+ WebSocket stream takes the frame pass it to the ai models, save it under the camera id provided in the url.
19
+
20
+ url here is: ws://127.0.0.1:8000/detectors/stream/camera_id
21
+ """
22
+ # Yes, I asked the same questions, is using webscoket.app.state many times here is consuming. after checking, it is not performance consuming.
23
+ state = websocket.app.state
24
+ logger = state.logger
25
+ detector = state.detection_model
26
+ safety_detector = state.safety_detection_model
27
+ depth_model = state.depth_model
28
+
29
+ # Accepting the connection from the client
30
+ await websocket.accept()
31
+
32
+ # Logging and tracking action
33
+ active_cameras.inc()
34
+ logger.info(f"Client ID >>{camera_id}<< Connected...")
35
+
36
+ loop = asyncio.get_running_loop()
37
+
38
+ try:
39
+ # What are the info you aim to collect from the camera?
40
+ # How many frames received by second.
41
+ # Frame processing time.
42
+ # Average processing time for logger.
43
+ # Model processing time.
44
+
45
+ # frame_count = itertools.count()
46
+
47
+ logger.info(f"Camera {camera_id} start sending frames...")
48
+
49
+ def decode_frame():
50
+ # Decode image
51
+ return cv.imdecode(np.frombuffer(frame_bytes, np.uint8), cv.IMREAD_COLOR)
52
+
53
+ def run_detection(frame) -> DetectionResults:
54
+ return detector.detect(frame)
55
+
56
+ def run_safety(frame) -> DetectionResults:
57
+ return safety_detector.detect(frame)
58
+
59
+ def run_depth(frame, points):
60
+ return depth_model.calculate_depth(frame, points)
61
+
62
+ # Keep receiving messages in a loop until disconnection.
63
+ while True:
64
+
65
+ # Profiling
66
+ time_start = time.time()
67
+
68
+ frame_bytes = await websocket.receive_bytes()
69
+
70
+ image_array = await loop.run_in_executor(None, decode_frame)
71
+
72
+ detection_task = loop.run_in_executor(None, run_detection, image_array)
73
+ safety_task = loop.run_in_executor(None, run_safety, image_array)
74
+ detections, safety_detection = await asyncio.gather(detection_task, safety_task)
75
+
76
+ boxes_center = []
77
+ for box in detections.detections:
78
+ print(type(box))
79
+ xmin, ymin, xmax, ymax = box.xyxy
80
+ xcenter = (xmax + xmin) / 2
81
+ ycenter = (ymax + ymin) / 2
82
+ boxes_center.append((int(xcenter), int(ycenter)))
83
+
84
+ # print(depth_model)
85
+ depth_points = await loop.run_in_executor(None, run_depth, image_array, boxes_center) if boxes_center else []
86
+ # depth_points = []
87
+ # print(depth_points)
88
+ # print(type(depth_points))
89
+ # print(boxes_center)
90
+
91
+ metadata = CameraMetadata(camera_id=camera_id, is_danger = True if safety_detection else False, depth_points=depth_points)
92
+ state.camera_metadata[camera_id] = metadata.model_dump()
93
+
94
+ # Profiling
95
+ duration = time.time() - time_start
96
+ frame_processing_duration_seconds.labels(camera_id).observe(round(duration, 3))
97
+ logger.debug("Frame processed", camera_id=camera_id)
98
+
99
+ # Note that JSONResponse doesn't work here, as it is for HTTP
100
+ await websocket.send_json({"status": 200, "camera_id": camera_id})
101
+
102
+
103
+ except WebSocketDisconnect:
104
+ logger.warn(f"Client ID >>{camera_id}<< Disconnected Normally...")
105
+ traceback.print_exc() # This one is actually really better, it shows more details about the issue happened.
106
+ # Also work on and create the logger.exception, as it directly controls printing more details about the issue happened.
107
+ state.camera_metadata.pop(camera_id, None)
108
+
109
+ except Exception as e:
110
+ logger.error(f"Error in websocker, Client ID: >>{camera_id}<<: {e}")
111
+ traceback.print_exc()
112
+ await websocket.close()
113
+ finally:
114
+ active_cameras.dec()
115
+
116
+
117
+
118
+ # Uncomment this when needed, It is the same but using HTTP, which is Request Response only. could be used for testing.
119
+ # from fastapi import Request, UploadFile
120
+ # @router.post("/detect")
121
+ # async def post_detection(request: Request, file: UploadFile):
122
+ # # Request here is being used to access the app.state.model
123
+
124
+ # request.app.state.model.detect(file)
api/routers/dashboard_stream.py ADDED
@@ -0,0 +1,40 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, WebSocket, WebSocketDisconnect
2
+ import asyncio
3
+
4
+ from backend.api.routers.metrics import active_dashboards
5
+
6
+ router = APIRouter()
7
+
8
+ @router.websocket("/stream")
9
+ async def dashboard_websocket(websocket: WebSocket):
10
+ """
11
+ WebScoket sending updates to the dashboard.
12
+
13
+ url: ws://127.0.0.1:8000/dashboard/stream
14
+ """
15
+ state = websocket.app.state
16
+ logger = state.logger
17
+
18
+ # Accept the client connection.
19
+ await websocket.accept()
20
+
21
+ # Logging and tracking
22
+ active_dashboards.inc()
23
+ logger.info("Dashboard Connected...")
24
+
25
+ try:
26
+ while True:
27
+ logger.debug("Sending updates to Dashboard...")
28
+ cameras_metadata = state.camera_metadata
29
+ await websocket.send_json(cameras_metadata)
30
+
31
+ # Sending data to the dashboard every 1.5 seconds.
32
+ await asyncio.sleep(state.settings.intervals.realtime_updates_every)
33
+
34
+ except WebSocketDisconnect:
35
+ logger.warn("Dashboard Disconnected Normally...")
36
+
37
+ except Exception as e:
38
+ logger.error(f"Dashboard Error: {e}")
39
+ finally:
40
+ active_dashboards.dec()
api/routers/health.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Very simple and important file, uesd to check the api health, if it return 200 everything is great, otherwise, there is an issue.
2
+ # This file is being used mostly in HTTP and not websockets.
3
+ # Health check is being used for example by docker, to check is dependencies are working fine, if not, he might restart.
4
+
5
+ from http import HTTPStatus
6
+ from datetime import datetime
7
+ from fastapi import APIRouter, Response
8
+
9
+ from backend.api.routers.metrics import active_cameras
10
+
11
+ router = APIRouter()
12
+
13
+ @router.get("/")
14
+ @router.get("/live")
15
+ async def live_check(response: Response):
16
+ """
17
+ Prove that the process is running, No logic requried here.
18
+ Confirming that the server is not dead.
19
+ It is fails, container killed and restarted..
20
+ Has to be very cheap.
21
+ """
22
+ response.status_code = HTTPStatus.OK
23
+ # TODO you can add also some prometheus info here.
24
+ return {
25
+ "status": "live",
26
+ "active_cameras": active_cameras._value.get(),
27
+ "timestamp": datetime.now().isoformat()
28
+ }
29
+
30
+ @router.get("/ready")
31
+ async def ready_check(response: Response):
32
+ """
33
+ Checck if parts work here, ex. are data readable.
34
+ Are data readable here.
35
+ Also can this instance accept traffic right now, or send them to another healthy instance.
36
+ """
37
+ # 1. Check database ping
38
+ # 2. Check Redis or cache ping
39
+ # 3. Queue connection or length
40
+
41
+ response.status_code = HTTPStatus.OK
42
+ # response.status_code = HTTPStatus.SERVICE_UNAVAILABLE
43
+ return {
44
+ "status": "ready",
45
+ "timestamp": datetime.now().isoformat(), # Sending the time also is a good practise
46
+ "version": "1.0.0",
47
+ }
api/routers/metrics.py ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from prometheus_client import Counter, Histogram, Gauge, make_asgi_app
2
+
3
+ metrics_asgi_app = make_asgi_app()
4
+
5
+ active_cameras = Gauge(
6
+ "active_camera_connections",
7
+ "Number of Currently Connected camera websockets"
8
+ )
9
+
10
+ active_dashboards = Gauge(
11
+ "active_dashboards",
12
+ "Number of active dashboards which fetching data"
13
+ )
14
+
15
+ frame_processing_duration_seconds = Histogram(
16
+ "frame_processing_duration_seconds",
17
+ "Time to process one frame",
18
+ ["camera_id"]
19
+ )
20
+
contracts/__pycache__/camera_metadata.cpython-311.pyc ADDED
Binary file (725 Bytes). View file
 
contracts/camera_metadata.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ from typing import List
2
+ from pydantic import BaseModel
3
+
4
+
5
+ class CameraMetadata(BaseModel):
6
+ camera_id: str
7
+ is_danger: bool = False
8
+ depth_points: List = []
main.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI
2
+ # from prometheus_client import metrics
3
+ from ai.depth.depth_anything import DepthAnything
4
+ from ai.detectors.yolo_detector import YOLO_Detector
5
+ from app.config import AppConfig
6
+ from backend.api.routers.metrics import metrics_asgi_app
7
+ from infra.system_metrics import log_system_metrics
8
+ from backend.api.routers import camera_stream
9
+ from backend.api.routers import dashboard_stream
10
+ from backend.api.routers import health
11
+ from contextlib import asynccontextmanager
12
+ from infra.logger_structlog import StructLogger
13
+ import asyncio
14
+
15
+ @asynccontextmanager
16
+ async def lifespan(app: FastAPI):
17
+ """
18
+ This is on_event("startup") new alternative, Make sure you load models here.
19
+ """
20
+
21
+ settings = AppConfig()
22
+ logger = StructLogger(settings=settings)
23
+
24
+
25
+ logger.info("Starting Server.... ")
26
+ asyncio.create_task(log_system_metrics(
27
+ logger,
28
+ logger_interval_sec=settings.intervals.system_metrics_seconds))
29
+
30
+ # Using this way to can store data. it is acts as a dict which holds instances
31
+ app.state.detection_model = YOLO_Detector(settings.yolo.model_path)
32
+ app.state.safety_detection_model = YOLO_Detector(settings.security_detector.model_path)
33
+ app.state.depth_model = DepthAnything(encoder=settings.depth.encoder, depth_model_path=settings.depth.model_path, DEVICE="cuda")
34
+
35
+ app.state.logger = logger
36
+ app.state.settings = settings
37
+ # Each camera should have its tracker to be able to work fine.
38
+ # app.state.camera_trackers = {}
39
+ app.state.camera_metadata = {}
40
+ app.state.dashboard_clients = set()
41
+ yield
42
+
43
+ logger.warn("Shutting down the server....")
44
+ # You can remove connections and release gpu here .
45
+
46
+ app = FastAPI(
47
+ title="Tracking System Backend",
48
+ description="real-time frame processing API",
49
+ version="0.1.0",
50
+ lifespan=lifespan
51
+ )
52
+
53
+ # Routes
54
+ app.mount("/metrics", metrics_asgi_app) # Starting Prometheus server attached to my server.
55
+ app.include_router(camera_stream.router, prefix="/detectors")
56
+ app.include_router(dashboard_stream.router, prefix="/dashboard")
57
+ app.include_router(health.router, prefix="/health")
58
+
59
+ @app.get("/")
60
+ async def root():
61
+ return {"status": "Real-Time tracker backend is running..."}