Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import time | |
| from typing import Dict, List, Optional | |
| import threading | |
| import torch | |
| from sklearn.cluster import DBSCAN | |
| # -- Local Imports -- | |
| from config import APP_CONFIG, TUNING, INACTIVITY_CFG, DENSITY_DBSCAN_CFG | |
| from .frame_reader import FrameReader | |
| from .tracker import MOTTracker | |
class StreamProcessor:
    """Run detection, tracking and per-track metadata for one video stream.

    A background thread pulls frames from a ``FrameReader``, runs the
    detector on every ``detection_interval``-th frame, feeds the most recent
    detections to the tracker on every frame, and tags each track dict with
    ``"inactive"`` and ``"dense"`` flags. The newest result is kept so that
    ``get_latest`` can hand out a snapshot at any time.
    """

    def __init__(self,
                 video_url: str,
                 model,
                 device: str = "cpu",
                 half: bool = False,
                 infer_lock=None):
        """Set up the reader, the tracker and the worker state.

        Args:
            video_url: Source passed to ``FrameReader``.
            model: Detector exposing ``predict(frame, imgsz=..., device=...,
                half=..., verbose=...)`` whose first result has a ``boxes``
                attribute (presumably an Ultralytics YOLO model -- confirm
                against the caller).
            device: Device string forwarded to the model and the tracker.
            half: Request half precision; only honoured when ``device`` is
                exactly ``"cuda"``.
            infer_lock: Optional context manager (e.g. ``threading.Lock``)
                held around each ``model.predict`` call so that several
                processors can share one model.
        """
        # -- Initialization --
        self.video_url = video_url
        self.model = model
        self.device = device
        self.detection_interval = max(1, int(TUNING["DETECTION_INTERVAL_FRAMES"]))
        self.imgsz = int(TUNING["YOLO_IMG_SIZE"])
        self.half = bool(half and device == "cuda")
        self._infer_lock = infer_lock
        self.frame_reader = FrameReader(video_url)
        self.tracker = MOTTracker(tracker_type="bytetrack", device=device)
        self._running = False
        # The worker thread is created in start(): a Thread object can only be
        # started once, so building it here would make a stop()/start() cycle
        # raise RuntimeError.
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        self._latest_payload = None
        self._frame_idx = 0
        self._last_dets = None
        self.id_state: Dict[int, Dict] = {}

    def start(self):
        """Start the frame reader and the worker thread (no-op if running)."""
        if self._running:
            return
        self._running = True
        self.frame_reader.start()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop the frame reader and wait for the worker thread to finish.

        Safe to call when the processor was never started.
        """
        self._running = False
        self.frame_reader.stop()
        worker = self._thread
        # Joining a thread that was never started raises RuntimeError, so only
        # join a worker that actually exists and is still running.
        if worker is not None and worker.is_alive():
            worker.join()
        self._thread = None

    def get_latest(self) -> Optional[Dict]:
        """Return a snapshot of the newest processed frame, or ``None``.

        The frame is copied and the track list and stats dict are
        shallow-copied, so the caller can modify the returned containers
        without touching the stored payload.
        """
        with self._lock:
            payload = self._latest_payload
            if payload is None:
                return None
            return {
                "frame": payload["frame"].copy(),
                "tracks": list(payload["tracks"]),
                "timestamp": payload["timestamp"],
                "frame_idx": payload["frame_idx"],
                "stats": dict(payload.get("stats", {})),
            }

    def _size_norm(self, box: List[float]) -> float:
        """Return the diagonal of an ``(x1, y1, x2, y2)`` box.

        Width and height are each clamped to at least 1.0.
        """
        x1, y1, x2, y2 = box
        w = max(1.0, x2 - x1)
        h = max(1.0, y2 - y1)
        return (w * w + h * h) ** 0.5

    # -- Inactivity Logic --
    def _update_inactivity(self, tracks: List[Dict], now: float):
        """Update per-ID motion state and set ``t["inactive"]`` on each track.

        For every track the centre displacement since the previous update is
        divided by the elapsed time and by the box diagonal, then smoothed
        with an exponential moving average. The inactive flag flips only
        after the smoothed speed has stayed beyond the relevant threshold
        (enter threshold while active, exit threshold while inactive) for at
        least ``MIN_DURATION_S``. State of IDs that were not seen for longer
        than ``MAX_UNSEEN_GAP_S`` is dropped.

        Args:
            tracks: Track dicts with at least ``"id"`` and ``"box"`` keys;
                mutated in place.
            now: Current timestamp in seconds.
        """
        current_ids = {t["id"] for t in tracks}
        for t in tracks:
            tid = t["id"]
            cx, cy = self._center(t["box"])
            diag = self._size_norm(t["box"])
            st = self.id_state.get(tid)
            if st is None:
                # First sighting: no previous position, so no speed yet.
                # please refer to "config.py" for each definition
                self.id_state[tid] = {
                    "pos": (cx, cy),
                    "t": now,
                    "ema_v": 0.0,
                    "inactive": False,
                    "since": None,
                    "last_seen": now,
                }
                t["inactive"] = False
                continue

            dt = max(1e-3, now - st["t"])  # guard against a zero time delta
            dx = cx - st["pos"][0]
            dy = cy - st["pos"][1]
            v_norm = ((dx * dx + dy * dy) ** 0.5 / dt) / max(1.0, diag)
            alpha = INACTIVITY_CFG["EMA_ALPHA"]
            ema_v = alpha * v_norm + (1.0 - alpha) * st["ema_v"]
            entry = INACTIVITY_CFG["ENTER_THRESH_NORM_SPEED"]
            exit_ = INACTIVITY_CFG["EXIT_THRESH_NORM_SPEED"]
            dwell = INACTIVITY_CFG["MIN_DURATION_S"]

            # Hysteresis: the condition that would flip the current state.
            if st["inactive"]:
                wants_flip = ema_v > exit_
            else:
                wants_flip = ema_v < entry

            if wants_flip:
                # "since" marks when the flip condition first held. An explicit
                # None check keeps a legitimate timestamp of 0.0 intact.
                if st["since"] is None:
                    st["since"] = now
                if (now - st["since"]) >= dwell:
                    st["inactive"] = not st["inactive"]
                    st["since"] = None
            else:
                st["since"] = None

            st.update(pos=(cx, cy), t=now, ema_v=ema_v, last_seen=now)
            t["inactive"] = st["inactive"]

        # ensure old unseen IDs are removed
        stale = [
            tid for tid, st in self.id_state.items()
            if tid not in current_ids
            and (now - st.get("last_seen", now)) > INACTIVITY_CFG["MAX_UNSEEN_GAP_S"]
        ]
        for tid in stale:
            self.id_state.pop(tid, None)

    def _center(self, box: List[float]) -> tuple[float, float]:
        """Return the centre point of an ``(x1, y1, x2, y2)`` box."""
        x1, y1, x2, y2 = box
        return ((x1 + x2) * 0.5, (y1 + y2) * 0.5)

    # -- Density Logic --
    def _compute_density_dbscan(self, tracks: List[Dict]) -> tuple[set, int]:
        """Cluster track centres with DBSCAN.

        Args:
            tracks: Track dicts with at least ``"id"`` and ``"box"`` keys.

        Returns:
            ``(dense_ids, n_clusters)``: the IDs of tracks that were assigned
            to a cluster (label other than -1) and the number of distinct
            clusters. ``(set(), 0)`` when there are no tracks.
        """
        if not tracks:
            return set(), 0
        centers = np.array([self._center(t["box"]) for t in tracks], dtype=np.float32)
        # please refer to "config.py" for each definition
        min_samples = max(1, DENSITY_DBSCAN_CFG["MIN_NEIGHBORS"] + 1)
        labels = DBSCAN(
            eps=DENSITY_DBSCAN_CFG["EPS_PX"],
            min_samples=min_samples,
        ).fit_predict(centers)
        # count the clusters that were found (-1 is noise)
        n_clusters = len({lbl for lbl in labels if lbl != -1})
        dense_ids = {t["id"] for t, lbl in zip(tracks, labels) if lbl != -1}
        return dense_ids, n_clusters

    def _is_detection_frame(self) -> bool:
        """Tell whether the detector should run on the current frame.

        True for frames 1, 1 + interval, 1 + 2 * interval, ... The
        ``(idx - 1) % interval == 0`` form also works for an interval of 1,
        where ``idx % 1 == 1`` can never be true.
        """
        return (self._frame_idx - 1) % self.detection_interval == 0

    def _predict(self, frame):
        """Run the model on one frame and return an ``(N, 6)`` float32 array.

        Columns are ``x1, y1, x2, y2, conf, cls``; the array has zero rows
        when the model reports no boxes.
        """
        with torch.no_grad():
            res = self.model.predict(
                frame,
                imgsz=self.imgsz,
                device=self.device,
                half=self.half,
                verbose=False,
            )[0]
            boxes = res.boxes
            if boxes is not None and len(boxes) > 0:
                return np.concatenate([
                    boxes.xyxy.cpu().numpy(),
                    boxes.conf.cpu().numpy()[:, None],
                    boxes.cls.cpu().numpy()[:, None],
                ], axis=1).astype("float32")
            return np.empty((0, 6), dtype="float32")

    def _detect(self, frame):
        """Run ``_predict``, holding ``infer_lock`` when one was supplied."""
        lock = self._infer_lock
        if lock is None:
            return self._predict(frame)
        with lock:
            return self._predict(frame)

    def _run(self):
        """ Detect Object from The Frame and add its metadata (Inactivity / Density) """
        while self._running:
            frame = self.frame_reader.read()
            if frame is None:
                # No frame available yet: back off briefly instead of spinning.
                time.sleep(0.01)
                continue
            self._frame_idx += 1

            # run the detector only on the configured interval
            if self._is_detection_frame():
                try:
                    self._last_dets = self._detect(frame)
                except Exception as e:
                    print(f"[StreamProcessor] detection error: {e}")
                    self._last_dets = None

            # Between detection frames the last known detections are reused so
            # that tracking continues.
            # NOTE(review): after a detection error this is None and is passed
            # to the tracker as-is -- confirm MOTTracker.update accepts None.
            dets = self._last_dets

            # Update tracker
            try:
                tracks = self.tracker.update(dets, frame)
            except Exception as e:
                print(f"[StreamProcessor] tracking error: {e}")
                tracks = []

            # Update metadata
            now = time.time()
            self._update_inactivity(tracks, now)
            dense_ids, n_clusters = self._compute_density_dbscan(tracks)
            for t in tracks:
                t["dense"] = (t["id"] in dense_ids)
            stats = {
                "detected": len(tracks),
                "inactive": sum(1 for t in tracks if t.get("inactive")),
                "dense_clusters": n_clusters,
            }

            # Store latest result
            with self._lock:
                self._latest_payload = {
                    "frame": frame,
                    "tracks": tracks,
                    "timestamp": now,
                    "frame_idx": self._frame_idx,
                    "stats": stats,
                }
class StreamRegistry:
    """Share one ``StreamProcessor`` per video URL, with reference counting.

    ``get`` creates and starts a processor the first time a URL is requested
    and increments its reference count on every call; ``release`` decrements
    the count and stops the processor once it reaches zero.
    """

    def __init__(self):
        self._by_url: Dict[str, "StreamProcessor"] = {}
        self._ref_count: Dict[str, int] = {}
        self._lock = threading.Lock()

    def get(self, url: str, model, device="cpu", half=False) -> "StreamProcessor":
        """Return the processor for ``url``, creating and starting it if needed.

        Args:
            url: Video URL used as the registry key.
            model: Detector passed to a newly created ``StreamProcessor``.
            device: Device string passed to a newly created processor.
            half: Half-precision flag passed to a newly created processor.

        Returns:
            The shared processor; its reference count has been incremented.
            ``model``, ``device`` and ``half`` are ignored when a processor
            for ``url`` already exists.
        """
        with self._lock:
            sp = self._by_url.get(url)
            if sp is None:
                print(f"[Registry] Creating new stream processor for: {url}")
                sp = StreamProcessor(url, model=model, device=device, half=half)
                sp.start()
                self._by_url[url] = sp
                self._ref_count[url] = 0
            self._ref_count[url] += 1
            print(f"[Registry] URL {url} ref count is now {self._ref_count[url]}")
            return sp

    def release(self, url: str):
        """Drop one reference to ``url`` and stop its processor at zero.

        Unknown URLs are ignored. The processor is unregistered while the
        registry lock is held, but ``stop()`` runs after the lock is
        released, so a slow shutdown does not block ``get``/``release`` calls
        for other streams.

        NOTE(review): a ``get`` for the same URL arriving during the shutdown
        can therefore start a new processor before the old one has fully
        stopped -- confirm the video source tolerates that short overlap.
        """
        with self._lock:
            if url not in self._by_url:
                return
            self._ref_count[url] -= 1
            print(f"[Registry] URL {url} ref count is now {self._ref_count[url]}")
            if self._ref_count[url] > 0:
                return
            print(f"[Registry] Stopping and removing processor for {url}")
            # Unregister first so the entry is gone even if stop() raises.
            sp = self._by_url.pop(url)
            self._ref_count.pop(url, None)
        sp.stop()