Spaces:
Running
Running
| """ | |
| HaramGuard — RiskAgent | |
| ======================== | |
| AISA Layer : Cognitive Agent Layer | |
| Design Pattern : Clip Segmentation + Sliding K-Window Density | |
| A) Clip segmentation | |
| - Boundary if |persons[t] - persons[t-1]| >= P_JUMP | |
| OR |density_score[t] - density_score[t-1]| >= D_JUMP | |
| - Glitch filter: boundary must persist >= MIN_LEN frames; otherwise merge back. | |
| B) Sliding-window density (updates EVERY frame) | |
| - Keep a deque of the last K=17 frames within the current clip. | |
| - N_est = count of UNIQUE track IDs across those K frames (union). | |
| - density_pct = min(N_est / N_REF * 100, 100) (N_REF = RISK_HIGH_THRESHOLD from config) | |
| C) Risk score 0–1.0 based on density_pct | |
| - risk_score = density_pct / 100 (0.0–1.0 for downstream compatibility) | |
| - risk_level: density_pct <= 20 → LOW, <= 80 → MEDIUM, > 80 → HIGH | |
| Level stabilization: a new level must hold for STABLE_FRAMES consecutive | |
| frames before it is confirmed and level_changed fires. | |
| """ | |
| import numpy as np | |
| from collections import deque | |
| from core.models import FrameResult, RiskResult | |
| import config | |
class RiskAgent:
    """Convert per-frame detection results into a stabilized crowd-risk level.

    The agent works in three stages on every call to :meth:`process_frame`:

    1. Clip segmentation: a jump in person count or density score relative to
       the last stable frame opens a boundary candidate. If the jump persists
       for ``MIN_LEN`` consecutive frames the clip state is reset; if the
       signal returns to the reference first, the candidate is dropped as a
       glitch.
    2. Sliding-window density: the number of unique track IDs over the last
       ``K_WINDOW`` frames of the current clip, expressed as a percentage of
       ``N_REF`` and capped at 100.
    3. Level stabilization: a new LOW/MEDIUM/HIGH label must be observed for
       ``STABLE_FRAMES`` consecutive frames (and the K-window must be full)
       before it is confirmed and ``level_changed`` is reported.
    """

    # Clip segmentation thresholds; each may be overridden from ``config``.
    P_JUMP = getattr(config, 'CLIP_P_JUMP', 40)      # person-count jump
    D_JUMP = getattr(config, 'CLIP_D_JUMP', 0.4)     # density_score jump
    MIN_LEN = getattr(config, 'CLIP_MIN_LEN', 10)    # frames a jump must persist
    K_WINDOW = getattr(config, 'CLIP_K_WINDOW', 17)  # frames in the unique-ID union

    # Reference head count: density_pct = N_est / N_REF * 100.
    N_REF = config.RISK_HIGH_THRESHOLD

    LOW_CEIL = 20.0   # density_pct <= LOW_CEIL -> LOW
    MED_CEIL = 80.0   # density_pct <= MED_CEIL -> MEDIUM, above -> HIGH
    STABLE_FRAMES = 5  # consecutive frames needed to confirm a level change
    EMA_ALPHA = config.RISK_EMA_ALPHA

    def __init__(self):
        """Initialise all clip, density and stabilization state and log readiness."""
        self.name = 'RiskAgent'
        self.aisa_layer = 'Cognitive Agent Layer'

        # Clip segmentation state.
        self._clip_id = 0
        self._boundary_buf = 0       # consecutive boundary-candidate frames
        self._in_boundary = False    # True while a boundary candidate is pending
        self._prev_count = 0         # reference person count for jump detection
        self._prev_density_sc = 0.0  # reference density score for jump detection

        # Sliding K-window density state.
        self._k_deque = deque(maxlen=self.K_WINDOW)  # last K frames of the clip
        self._n_est = 0
        self._density_pct = 0.0

        # Level stabilization state.
        self._prev_level = 'LOW'
        self._candidate = None
        self._candidate_count = 0

        # Smoothed signals and the window used for trend/avg/max statistics.
        self._peak_ema = 0.0
        self._occ_ema = 0.0
        self._window = deque(maxlen=config.RISK_WINDOW_SIZE)

        print(
            f'β οΈ [RiskAgent] Ready β Clip segmentation + sliding K-window({self.K_WINDOW}) '
            f'(P_JUMP={self.P_JUMP}, D_JUMP={self.D_JUMP}, '
            f'MIN_LEN={self.MIN_LEN}, LOW<={self.LOW_CEIL}%, MED<={self.MED_CEIL}%)'
        )

    def _label(self, density_pct: float) -> str:
        """Map a density percentage to 'LOW', 'MEDIUM' or 'HIGH'."""
        if density_pct > self.MED_CEIL:
            return 'HIGH'
        if density_pct > self.LOW_CEIL:
            return 'MEDIUM'
        return 'LOW'

    def _compute_trend(self) -> str:
        """Compare mean person count of the newer window half against the older half.

        Returns 'stable' until at least 6 frames are buffered; otherwise
        'rising' / 'falling' when the means differ by more than 3 persons.
        """
        if len(self._window) < 6:
            return 'stable'
        counts = [f.person_count for f in self._window]
        half = len(counts) // 2
        delta = np.mean(counts[half:]) - np.mean(counts[:half])
        if delta > 3:
            return 'rising'
        if delta < -3:
            return 'falling'
        return 'stable'

    def _is_boundary(self, fr: FrameResult) -> bool:
        """Return True if ``fr`` jumps away from the stored reference frame values."""
        p_jump = abs(fr.person_count - self._prev_count) >= self.P_JUMP
        d_jump = abs(fr.density_score - self._prev_density_sc) >= self.D_JUMP
        return p_jump or d_jump

    def _reset_clip(self, frame_id: int, reason: str) -> None:
        """Start a new clip: clear the K-window, density and stabilization state."""
        self._clip_id += 1
        self._k_deque.clear()
        self._n_est = 0
        self._density_pct = 0.0
        self._prev_level = 'LOW'
        self._candidate = None
        self._candidate_count = 0
        self._peak_ema = 0.0
        self._occ_ema = 0.0
        print(f'π¬ [RiskAgent] Clip {self._clip_id} started at frame {frame_id} ({reason})')

    def _update_density(self) -> None:
        """Recompute ``_n_est`` and ``_density_pct`` from the current K-window.

        ``_n_est`` is the number of distinct track IDs across all buffered
        frames; ``_density_pct`` is that count relative to ``N_REF``, capped
        at 100 and rounded to one decimal.
        """
        unique_ids = set()
        for frame in self._k_deque:
            unique_ids.update(frame.track_ids)
        self._n_est = len(unique_ids)

        if self.N_REF > 0:
            pct = min(self._n_est / self.N_REF * 100, 100.0)
        else:
            # A non-positive reference cannot be divided by; treat any
            # presence as saturated instead of raising ZeroDivisionError.
            pct = 100.0 if self._n_est else 0.0
        self._density_pct = round(pct, 1)

    def _track_boundary(self, fr: FrameResult) -> None:
        """Advance the clip-boundary state machine by one frame.

        While a boundary candidate is pending, the pre-jump reference values
        are held so that later frames are still compared against the old
        scene. Only then can a jump "persist" for ``MIN_LEN`` frames. The
        reference is refreshed when the candidate is confirmed (new clip) or
        dismissed as a glitch, and on every ordinary frame.
        """
        if fr.frame_id > 1 and self._is_boundary(fr):
            self._boundary_buf += 1
            if self._boundary_buf < self.MIN_LEN:
                # Still only a candidate: keep the pre-jump reference.
                self._in_boundary = True
                return
            # Jump persisted long enough: confirmed boundary, start new clip.
            self._reset_clip(fr.frame_id, f'boundary held {self._boundary_buf} frames')

        # Confirmed boundary, glitch that merged back, or ordinary frame:
        # clear the candidate and adopt this frame as the new reference.
        self._boundary_buf = 0
        self._in_boundary = False
        self._prev_count = fr.person_count
        self._prev_density_sc = fr.density_score

    def _stabilize_level(self, raw_level: str, k_ready: bool,
                         density_pct: float, frame_id: int):
        """Apply hysteresis to the raw level.

        Returns a ``(risk_level, level_changed)`` tuple. A change is confirmed
        only when the same new level has been seen ``STABLE_FRAMES`` times in
        a row and the K-window is full (``k_ready``).
        """
        if raw_level == self._prev_level:
            # Back at the confirmed level: drop any pending candidate.
            self._candidate = None
            self._candidate_count = 0
            return self._prev_level, False

        if raw_level == self._candidate:
            self._candidate_count += 1
        else:
            self._candidate = raw_level
            self._candidate_count = 1

        if not (k_ready and self._candidate_count >= self.STABLE_FRAMES):
            return self._prev_level, False

        self._prev_level = raw_level
        self._candidate = None
        self._candidate_count = 0
        print(
            f'π [RiskAgent] Level confirmed: {raw_level} '
            f'(density_pct={density_pct:.1f}%, N_est={self._n_est}, '
            f'frame={frame_id})'
        )
        return raw_level, True

    def process_frame(self, fr: FrameResult) -> RiskResult:
        """Process one frame and return the current risk assessment.

        Reads ``frame_id``, ``timestamp``, ``person_count``, ``density_score``
        and ``track_ids`` from ``fr``. The returned ``RiskResult`` carries the
        density-based risk score (0.0-1.0), the stabilized risk level, the
        person-count trend, window average/maximum and the person-count EMA.
        """
        self._window.append(fr)

        # A) Clip boundary detection with glitch filtering.
        self._track_boundary(fr)

        # B) Sliding-window density, refreshed on every frame.
        self._k_deque.append(fr)
        self._update_density()
        density_pct = self._density_pct

        # C) Risk score and stabilized level.
        risk_score = round(density_pct / 100.0, 4)
        raw_level = self._label(density_pct)
        # Level changes are suppressed until the K-window is full, so the
        # warm-up frames of each clip cannot trigger a change.
        k_ready = len(self._k_deque) >= self.K_WINDOW
        risk_level, level_changed = self._stabilize_level(
            raw_level, k_ready, density_pct, fr.frame_id
        )

        self._peak_ema = (
            self.EMA_ALPHA * float(fr.person_count) +
            (1.0 - self.EMA_ALPHA) * self._peak_ema
        )

        trend = self._compute_trend()
        counts = [f.person_count for f in self._window]
        return RiskResult(
            frame_id=fr.frame_id,
            timestamp=fr.timestamp,
            risk_score=risk_score,
            risk_level=risk_level,
            trend=trend,
            level_changed=level_changed,
            window_avg=round(float(np.mean(counts)), 1),
            window_max=int(max(counts)),
            density_ema=round(self._peak_ema, 1),
            density_pct=density_pct,
        )