""" HaramGuard — RiskAgent ======================== AISA Layer : Cognitive Agent Layer Design Pattern : Clip Segmentation + Sliding K-Window Density A) Clip segmentation - Boundary if |persons[t] - persons[t-1]| >= P_JUMP OR |density_score[t] - density_score[t-1]| >= D_JUMP - Glitch filter: boundary must persist >= MIN_LEN frames; otherwise merge back. B) Sliding-window density (updates EVERY frame) - Keep a deque of the last K=17 frames within the current clip. - N_est = count of UNIQUE track IDs across those K frames (union). - density_pct = N_est / N_REF * 100 (N_REF = RISK_HIGH_THRESHOLD from config, capped at 100%) C) Risk score 0–1.0 based on density_pct - risk_score = density_pct / 100 (0.0–1.0 for downstream compatibility) - risk_level: density_pct <= 20 → LOW, <= 80 → MEDIUM, > 80 → HIGH Level stabilization: a new level must hold for STABLE_FRAMES consecutive frames before it is confirmed and level_changed fires. """ import numpy as np from collections import deque from core.models import FrameResult, RiskResult import config class RiskAgent: P_JUMP = getattr(config, 'CLIP_P_JUMP', 40) # person count jump D_JUMP = getattr(config, 'CLIP_D_JUMP', 0.4) # density_score jump MIN_LEN = getattr(config, 'CLIP_MIN_LEN', 10) # min frames to confirm clip K_WINDOW = getattr(config, 'CLIP_K_WINDOW', 17) # frames for unique-ID union N_REF = config.RISK_HIGH_THRESHOLD # density_pct = N_est / N_REF * 100 LOW_CEIL = 20.0 # density_pct <= 20 → LOW MED_CEIL = 80.0 # density_pct <= 80 → MEDIUM, > 80 → HIGH STABLE_FRAMES = 5 EMA_ALPHA = config.RISK_EMA_ALPHA def __init__(self): self.name = 'RiskAgent' self.aisa_layer = 'Cognitive Agent Layer' self._clip_id = 0 self._boundary_buf = 0 # consecutive boundary-candidate frames self._in_boundary = False # currently in a boundary glitch zone self._prev_count = 0 self._prev_density_sc = 0.0 self._k_deque = deque(maxlen=self.K_WINDOW) # last K FrameResults in clip self._n_est = 0 self._density_pct = 0.0 self._prev_level = 'LOW' self._candidate = None self._candidate_count = 0 self._peak_ema = 0.0 self._occ_ema = 0.0 self._window = deque(maxlen=config.RISK_WINDOW_SIZE) print( f'⚠️ [RiskAgent] Ready — Clip segmentation + sliding K-window({self.K_WINDOW}) ' f'(P_JUMP={self.P_JUMP}, D_JUMP={self.D_JUMP}, ' f'MIN_LEN={self.MIN_LEN}, LOW<={self.LOW_CEIL}%, MED<={self.MED_CEIL}%)' ) def _label(self, density_pct: float) -> str: if density_pct > self.MED_CEIL: return 'HIGH' if density_pct > self.LOW_CEIL: return 'MEDIUM' return 'LOW' def _compute_trend(self) -> str: if len(self._window) < 6: return 'stable' counts = [f.person_count for f in self._window] half = len(counts) // 2 delta = np.mean(counts[half:]) - np.mean(counts[:half]) if delta > 3: return 'rising' if delta < -3: return 'falling' return 'stable' def _is_boundary(self, fr: FrameResult) -> bool: """Check if this frame is a clip boundary candidate.""" p_jump = abs(fr.person_count - self._prev_count) >= self.P_JUMP d_jump = abs(fr.density_score - self._prev_density_sc) >= self.D_JUMP return p_jump or d_jump def _reset_clip(self, frame_id: int, reason: str): """Reset all clip state for a new clip segment.""" self._clip_id += 1 self._k_deque.clear() self._n_est = 0 self._density_pct = 0.0 self._prev_level = 'LOW' self._candidate = None self._candidate_count = 0 self._peak_ema = 0.0 self._occ_ema = 0.0 print(f'🎬 [RiskAgent] Clip {self._clip_id} started at frame {frame_id} ({reason})') def _update_density(self): """ Recompute N_est and density_pct from the current sliding K-window. Called every frame — the deque always holds the latest K frames. """ all_ids = set() for f in self._k_deque: for tid in f.track_ids: all_ids.add(tid) self._n_est = len(all_ids) self._density_pct = round(min(self._n_est / self.N_REF * 100, 100.0), 1) def process_frame(self, fr: FrameResult) -> RiskResult: self._window.append(fr) # ── A) Clip boundary detection with glitch filtering ───── if fr.frame_id > 1 and self._is_boundary(fr): self._boundary_buf += 1 if self._boundary_buf >= self.MIN_LEN: # Confirmed boundary — start new clip self._reset_clip(fr.frame_id, f'boundary held {self._boundary_buf} frames') self._boundary_buf = 0 self._in_boundary = False else: self._in_boundary = True else: if self._in_boundary and self._boundary_buf < self.MIN_LEN: # Glitch — boundary didn't persist, merge back self._boundary_buf = 0 self._in_boundary = False self._boundary_buf = 0 self._prev_count = fr.person_count self._prev_density_sc = fr.density_score self._k_deque.append(fr) self._update_density() density_pct = self._density_pct risk_score = round(density_pct / 100.0, 4) # 0.0–1.0 raw_level = self._label(density_pct) # Suppress level_changed until K-window is full (warmup period). # Prevents false P0/P1 triggers in the first K frames of each clip. k_ready = len(self._k_deque) >= self.K_WINDOW if raw_level != self._prev_level: if raw_level == self._candidate: self._candidate_count += 1 else: self._candidate = raw_level self._candidate_count = 1 if k_ready and self._candidate_count >= self.STABLE_FRAMES: risk_level = raw_level level_changed = True self._prev_level = raw_level self._candidate = None self._candidate_count = 0 print( f'📊 [RiskAgent] Level confirmed: {risk_level} ' f'(density_pct={density_pct:.1f}%, N_est={self._n_est}, ' f'frame={fr.frame_id})' ) else: risk_level = self._prev_level level_changed = False else: risk_level = self._prev_level level_changed = False self._candidate = None self._candidate_count = 0 self._peak_ema = ( self.EMA_ALPHA * float(fr.person_count) + (1.0 - self.EMA_ALPHA) * self._peak_ema ) trend = self._compute_trend() counts = [f.person_count for f in self._window] return RiskResult( frame_id = fr.frame_id, timestamp = fr.timestamp, risk_score = risk_score, risk_level = risk_level, trend = trend, level_changed = level_changed, window_avg = round(float(np.mean(counts)), 1), window_max = int(max(counts)), density_ema = round(self._peak_ema, 1), density_pct = density_pct, )