# ChickSense — utils/video/stream_processor.py
# Initial commit for ChickSense Space (88f1cd0)
import numpy as np
import time
from typing import Dict, List, Optional
import threading
import torch
from sklearn.cluster import DBSCAN
# -- Local Imports --
from config import APP_CONFIG, TUNING, INACTIVITY_CFG, DENSITY_DBSCAN_CFG
from .frame_reader import FrameReader
from .tracker import MOTTracker
class StreamProcessor:
    """Background worker that runs detection, tracking and behaviour analysis
    on a single video stream.

    Per-frame pipeline (see ``_run``):
      1. YOLO inference every ``detection_interval`` frames (tracking reuses
         the last detections in between),
      2. multi-object tracking via ``MOTTracker``,
      3. per-track inactivity flagging from an EMA of normalized speed,
      4. DBSCAN clustering of track centers to flag dense groups.

    The latest result is published under a lock; consumers poll
    :meth:`get_latest` for a defensive copy.
    """

    def __init__(self,
                 video_url: str,
                 model,
                 device: str = "cpu",
                 half: bool = False,
                 infer_lock=None):
        """
        Args:
            video_url: source URL handed to ``FrameReader``.
            model: loaded YOLO-style model exposing ``predict``.
            device: "cpu" or "cuda".
            half: request FP16 inference (only honoured on CUDA).
            infer_lock: optional lock serialising inference when one model
                is shared between several StreamProcessor instances.
        """
        self.video_url = video_url
        self.model = model
        self.device = device
        # please refer to "config.py" for each tuning value definition
        self.detection_interval = max(1, int(TUNING["DETECTION_INTERVAL_FRAMES"]))
        self.imgsz = int(TUNING["YOLO_IMG_SIZE"])
        self.half = bool(half and device == "cuda")  # FP16 requires CUDA
        self._infer_lock = infer_lock
        self.frame_reader = FrameReader(video_url)
        self.tracker = MOTTracker(tracker_type="bytetrack", device=device)
        self._running = False
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._lock = threading.Lock()        # guards _latest_payload
        self._latest_payload = None
        self._frame_idx = 0
        self._last_dets = None               # last (N, 6) detection array
        self.id_state: Dict[int, Dict] = {}  # per-track inactivity state

    def start(self):
        """Start the frame reader and the worker thread (idempotent)."""
        if self._running:
            return
        self._running = True
        self.frame_reader.start()
        self._thread.start()

    def stop(self):
        """Stop the reader and wait for the worker thread to exit."""
        self._running = False
        self.frame_reader.stop()
        # FIX: join() on a never-started thread raises RuntimeError.
        if self._thread.is_alive():
            self._thread.join()

    def get_latest(self) -> Optional[Dict]:
        """Return a copy of the most recent result, or None before the first
        processed frame. The frame is copied so callers may draw on it."""
        with self._lock:
            if self._latest_payload is None:
                return None
            return {
                "frame": self._latest_payload["frame"].copy(),
                "tracks": list(self._latest_payload["tracks"]),
                "timestamp": self._latest_payload["timestamp"],
                "frame_idx": self._latest_payload["frame_idx"],
                "stats": dict(self._latest_payload.get("stats", {}))
            }

    def _size_norm(self, box: List[float]) -> float:
        """Diagonal length of an [x1, y1, x2, y2] box, each side floored at 1 px."""
        x1, y1, x2, y2 = box
        w = max(1.0, x2 - x1)
        h = max(1.0, y2 - y1)
        return (w * w + h * h) ** 0.5

    def _center(self, box: List[float]) -> tuple[float, float]:
        """Center point (cx, cy) of an [x1, y1, x2, y2] box."""
        x1, y1, x2, y2 = box
        return ((x1 + x2) * 0.5, (y1 + y2) * 0.5)

    # -- Inactivity Logic --
    def _update_inactivity(self, tracks: List[Dict], now: float):
        """Annotate each track dict with an ``inactive`` flag (in place).

        Normalized speed (displacement per second divided by box diagonal)
        is smoothed with an EMA; hysteresis plus a dwell time flips the
        flag. Thresholds come from INACTIVITY_CFG (see "config.py").
        """
        current_ids = {t["id"] for t in tracks}
        for t in tracks:
            tid = t["id"]
            cx, cy = self._center(t["box"])
            diag = self._size_norm(t["box"])
            st = self.id_state.get(tid)
            if st is None:
                # First sighting: initialise state, treat as active.
                st = {"pos": (cx, cy), "t": now, "ema_v": 0.0,
                      "inactive": False, "since": None, "last_seen": now}
                self.id_state[tid] = st
                t["inactive"] = False
                continue
            dt = max(1e-3, now - st["t"])
            dx = cx - st["pos"][0]
            dy = cy - st["pos"][1]
            # Speed normalised by the box diagonal -> roughly scale invariant.
            v_norm = ((dx * dx + dy * dy) ** 0.5 / dt) / max(1.0, diag)
            alpha = INACTIVITY_CFG["EMA_ALPHA"]
            ema_v = alpha * v_norm + (1.0 - alpha) * st["ema_v"]
            entry = INACTIVITY_CFG["ENTER_THRESH_NORM_SPEED"]
            exit_ = INACTIVITY_CFG["EXIT_THRESH_NORM_SPEED"]
            dwell = INACTIVITY_CFG["MIN_DURATION_S"]
            if st["inactive"]:
                # Leave "inactive" only after moving fast enough for dwell s.
                if ema_v > exit_:
                    st["since"] = st.get("since") or now
                    if (now - st["since"]) >= dwell:
                        st["inactive"] = False
                        st["since"] = None
                else:
                    st["since"] = None
            else:
                # Enter "inactive" only after staying slow for dwell s.
                if ema_v < entry:
                    st["since"] = st.get("since") or now
                    if (now - st["since"]) >= dwell:
                        st["inactive"] = True
                        st["since"] = None
                else:
                    st["since"] = None
            st.update(pos=(cx, cy), t=now, ema_v=ema_v, last_seen=now)
            t["inactive"] = st["inactive"]
        # Drop state for IDs unseen longer than MAX_UNSEEN_GAP_S.
        stale = [
            tid for tid, st in list(self.id_state.items())
            if tid not in current_ids
            and (now - st.get("last_seen", now)) > INACTIVITY_CFG["MAX_UNSEEN_GAP_S"]
        ]
        for tid in stale:
            self.id_state.pop(tid, None)

    # -- Density Logic --
    def _compute_density_dbscan(self, tracks: List[Dict]) -> tuple[set, int]:
        """Cluster track centers with DBSCAN.

        Returns:
            (dense_ids, n_clusters): IDs belonging to any cluster (label -1
            is noise and is excluded) and the number of clusters found.
            Parameters come from DENSITY_DBSCAN_CFG (see "config.py").
        """
        # FIX: was annotated "-> set" while actually returning a 2-tuple.
        if not tracks:
            return set(), 0
        centers = np.array([self._center(t["box"]) for t in tracks], dtype=np.float32)
        # DBSCAN's min_samples counts the point itself, hence the +1.
        min_samples = max(1, DENSITY_DBSCAN_CFG["MIN_NEIGHBORS"] + 1)
        labels = DBSCAN(eps=DENSITY_DBSCAN_CFG["EPS_PX"],
                        min_samples=min_samples).fit_predict(centers)
        n_clusters = len({lbl for lbl in labels if lbl != -1})
        dense_ids = {t["id"] for t, lbl in zip(tracks, labels) if lbl != -1}
        return dense_ids, n_clusters

    def _detect(self, frame) -> np.ndarray:
        """Run one inference pass; return an (N, 6) float32 array of
        [x1, y1, x2, y2, conf, cls] rows (empty when nothing is found)."""
        with torch.no_grad():
            res = self.model.predict(
                frame,
                imgsz=self.imgsz,
                device=self.device,
                half=self.half,
                verbose=False
            )[0]
        boxes = res.boxes
        if boxes is not None and len(boxes) > 0:
            return np.concatenate([
                boxes.xyxy.cpu().numpy(),
                boxes.conf.cpu().numpy()[:, None],
                boxes.cls.cpu().numpy()[:, None]
            ], axis=1).astype("float32")
        return np.empty((0, 6), dtype="float32")

    def _run(self):
        """ Detect Object from The Frame and add its metadata (Inactivity / Density) """
        while self._running:
            frame = self.frame_reader.read()
            if frame is None:
                time.sleep(0.01)  # reader has no new frame yet
                continue
            self._frame_idx += 1
            # FIX: the original `frame_idx % interval == 1` never fired when
            # interval == 1; this form detects on frames 1, 1+k, 1+2k, ...
            if (self._frame_idx - 1) % self.detection_interval == 0:
                try:
                    # FIX: honour the shared inference lock (it was accepted
                    # in __init__ but never used).
                    if self._infer_lock is not None:
                        with self._infer_lock:
                            dets = self._detect(frame)
                    else:
                        dets = self._detect(frame)
                    self._last_dets = dets
                except Exception as e:
                    print(f"[StreamProcessor] detection error: {e}")
                    self._last_dets = None
            # Reuse last known detections so tracking is kept between runs.
            dets = self._last_dets
            if dets is None:
                # FIX: pass an explicit "no detections" array instead of None,
                # which the tracker cannot be assumed to accept.
                dets = np.empty((0, 6), dtype="float32")
            # Update tracker
            try:
                tracks = self.tracker.update(dets, frame)
            except Exception as e:
                print(f"[StreamProcessor] tracking error: {e}")
                tracks = []
            # Update metadata
            now = time.time()
            self._update_inactivity(tracks, now)
            dense_ids, n_clusters = self._compute_density_dbscan(tracks)
            for t in tracks:
                t["dense"] = (t["id"] in dense_ids)
            stats = {
                "detected": len(tracks),
                "inactive": sum(1 for t in tracks if t.get("inactive")),
                "dense_clusters": n_clusters
            }
            # Store latest result
            with self._lock:
                self._latest_payload = {
                    "frame": frame,
                    "tracks": tracks,
                    "timestamp": now,
                    "frame_idx": self._frame_idx,
                    "stats": stats
                }
class StreamRegistry:
    """Reference-counted pool of StreamProcessor instances, keyed by URL.

    ``get`` creates and starts a processor on first request for a URL and
    bumps its reference count; ``release`` decrements the count and stops
    the processor once nobody is using it anymore.
    """

    def __init__(self):
        self._by_url: Dict[str, StreamProcessor] = {}   # url -> running processor
        self._ref_count: Dict[str, int] = {}            # url -> active consumers
        self._lock = threading.Lock()                   # guards both dicts

    def get(self, url: str, model, device="cpu", half=False) -> StreamProcessor:
        """Return the processor for *url*, creating and starting one on first use."""
        with self._lock:
            if url not in self._by_url:
                print(f"[Registry] Creating new stream processor for: {url}")
                fresh = StreamProcessor(url, model=model, device=device, half=half)
                fresh.start()
                self._by_url[url] = fresh
                self._ref_count[url] = 0
            self._ref_count[url] = self._ref_count[url] + 1
            print(f"[Registry] URL {url} ref count is now {self._ref_count[url]}")
            return self._by_url[url]

    def release(self, url: str):
        """Drop one reference to *url*; stop and remove the processor at zero."""
        with self._lock:
            if url not in self._by_url:
                return
            self._ref_count[url] -= 1
            print(f"[Registry] URL {url} ref count is now {self._ref_count[url]}")
            if self._ref_count[url] > 0:
                return
            print(f"[Registry] Stopping and removing processor for {url}")
            try:
                self._by_url[url].stop()
            finally:
                # Remove bookkeeping even if stop() raised.
                self._by_url.pop(url, None)
                self._ref_count.pop(url, None)