Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI | |
| # from prometheus_client import metrics | |
| from ai.depth.depth_anything import DepthAnything | |
| from ai.detectors.yolo_detector import YOLO_Detector | |
| from app.config import AppConfig | |
| from backend.api.routers.metrics import metrics_asgi_app | |
| from infra.system_metrics import log_system_metrics | |
| from backend.api.routers import camera_stream | |
| from backend.api.routers import dashboard_stream | |
| from backend.api.routers import health | |
| from contextlib import asynccontextmanager | |
| from infra.logger_structlog import StructLogger | |
| import asyncio | |
| async def lifespan(app: FastAPI): | |
| """ | |
| This is on_event("startup") new alternative, Make sure you load models here. | |
| """ | |
| settings = AppConfig() | |
| logger = StructLogger(settings=settings) | |
| logger.info("Starting Server.... ") | |
| asyncio.create_task(log_system_metrics( | |
| logger, | |
| logger_interval_sec=settings.intervals.system_metrics_seconds)) | |
| # Using this way to can store data. it is acts as a dict which holds instances | |
| app.state.detection_model = YOLO_Detector(settings.yolo.model_path) | |
| app.state.depth_model = DepthAnything(encoder=settings.depth.encoder, depth_model_path=settings.depth.model_path, DEVICE="cuda") | |
| # safety_detection_path = hg_hub_download( | |
| # repo_id="e1250/safety_detection", | |
| # filename="yolo_smoke_fire.pt", | |
| # ) | |
| app.state.safety_detection_model = YOLO_Detector(settings.security_detector.model_path) | |
| app.state.logger = logger | |
| app.state.settings = settings | |
| # Each camera should have its tracker to be able to work fine. | |
| # app.state.camera_trackers = {} | |
| app.state.camera_metadata = {} | |
| app.state.dashboard_clients = set() | |
| yield | |
| logger.warn("Shutting down the server....") | |
| # You can remove connections and release gpu here . | |
| app = FastAPI( | |
| title="Tracking System Backend", | |
| description="real-time frame processing API", | |
| version="0.1.0", | |
| lifespan=lifespan | |
| ) | |
| # Routes | |
| app.mount("/metrics", metrics_asgi_app) # Starting Prometheus server attached to my server. | |
| app.include_router(camera_stream.router, prefix="/detectors") | |
| app.include_router(dashboard_stream.router, prefix="/dashboard") | |
| app.include_router(health.router, prefix="/health") | |
| async def root(): | |
| return {"status": "Real-Time tracker backend is running..."} |