HaramGuard / backend /agents /risk_agent.py
adeem6's picture
Update backend/agents/risk_agent.py (#2)
86c514c
"""
HaramGuard β€” RiskAgent
========================
AISA Layer : Cognitive Agent Layer
Design Pattern : Clip Segmentation + Sliding K-Window Density
A) Clip segmentation
- Boundary if |persons[t] - persons[t-1]| >= P_JUMP
OR |density_score[t] - density_score[t-1]| >= D_JUMP
- Glitch filter: boundary must persist >= MIN_LEN frames; otherwise merge back.
B) Sliding-window density (updates EVERY frame)
- Keep a deque of the last K=17 frames within the current clip.
- N_est = count of UNIQUE track IDs across those K frames (union).
- density_pct = N_est / N_REF * 100 (N_REF = RISK_HIGH_THRESHOLD from config, capped at 100%)
C) Risk score 0–1.0 based on density_pct
- risk_score = density_pct / 100 (0.0–1.0 for downstream compatibility)
- risk_level: density_pct <= 20 β†’ LOW, <= 80 β†’ MEDIUM, > 80 β†’ HIGH
Level stabilization: a new level must hold for STABLE_FRAMES consecutive
frames before it is confirmed and level_changed fires.
"""
import numpy as np
from collections import deque
from core.models import FrameResult, RiskResult
import config
class RiskAgent:
P_JUMP = getattr(config, 'CLIP_P_JUMP', 40) # person count jump
D_JUMP = getattr(config, 'CLIP_D_JUMP', 0.4) # density_score jump
MIN_LEN = getattr(config, 'CLIP_MIN_LEN', 10) # min frames to confirm clip
K_WINDOW = getattr(config, 'CLIP_K_WINDOW', 17) # frames for unique-ID union
N_REF = config.RISK_HIGH_THRESHOLD # density_pct = N_est / N_REF * 100
LOW_CEIL = 20.0 # density_pct <= 20 β†’ LOW
MED_CEIL = 80.0 # density_pct <= 80 β†’ MEDIUM, > 80 β†’ HIGH
STABLE_FRAMES = 5
EMA_ALPHA = config.RISK_EMA_ALPHA
def __init__(self):
self.name = 'RiskAgent'
self.aisa_layer = 'Cognitive Agent Layer'
self._clip_id = 0
self._boundary_buf = 0 # consecutive boundary-candidate frames
self._in_boundary = False # currently in a boundary glitch zone
self._prev_count = 0
self._prev_density_sc = 0.0
self._k_deque = deque(maxlen=self.K_WINDOW) # last K FrameResults in clip
self._n_est = 0
self._density_pct = 0.0
self._prev_level = 'LOW'
self._candidate = None
self._candidate_count = 0
self._peak_ema = 0.0
self._occ_ema = 0.0
self._window = deque(maxlen=config.RISK_WINDOW_SIZE)
print(
f'⚠️ [RiskAgent] Ready β€” Clip segmentation + sliding K-window({self.K_WINDOW}) '
f'(P_JUMP={self.P_JUMP}, D_JUMP={self.D_JUMP}, '
f'MIN_LEN={self.MIN_LEN}, LOW<={self.LOW_CEIL}%, MED<={self.MED_CEIL}%)'
)
def _label(self, density_pct: float) -> str:
if density_pct > self.MED_CEIL:
return 'HIGH'
if density_pct > self.LOW_CEIL:
return 'MEDIUM'
return 'LOW'
def _compute_trend(self) -> str:
if len(self._window) < 6:
return 'stable'
counts = [f.person_count for f in self._window]
half = len(counts) // 2
delta = np.mean(counts[half:]) - np.mean(counts[:half])
if delta > 3: return 'rising'
if delta < -3: return 'falling'
return 'stable'
def _is_boundary(self, fr: FrameResult) -> bool:
"""Check if this frame is a clip boundary candidate."""
p_jump = abs(fr.person_count - self._prev_count) >= self.P_JUMP
d_jump = abs(fr.density_score - self._prev_density_sc) >= self.D_JUMP
return p_jump or d_jump
def _reset_clip(self, frame_id: int, reason: str):
"""Reset all clip state for a new clip segment."""
self._clip_id += 1
self._k_deque.clear()
self._n_est = 0
self._density_pct = 0.0
self._prev_level = 'LOW'
self._candidate = None
self._candidate_count = 0
self._peak_ema = 0.0
self._occ_ema = 0.0
print(f'🎬 [RiskAgent] Clip {self._clip_id} started at frame {frame_id} ({reason})')
def _update_density(self):
"""
Recompute N_est and density_pct from the current sliding K-window.
Called every frame β€” the deque always holds the latest K frames.
"""
all_ids = set()
for f in self._k_deque:
for tid in f.track_ids:
all_ids.add(tid)
self._n_est = len(all_ids)
self._density_pct = round(min(self._n_est / self.N_REF * 100, 100.0), 1)
def process_frame(self, fr: FrameResult) -> RiskResult:
self._window.append(fr)
# ── A) Clip boundary detection with glitch filtering ─────
if fr.frame_id > 1 and self._is_boundary(fr):
self._boundary_buf += 1
if self._boundary_buf >= self.MIN_LEN:
# Confirmed boundary β€” start new clip
self._reset_clip(fr.frame_id, f'boundary held {self._boundary_buf} frames')
self._boundary_buf = 0
self._in_boundary = False
else:
self._in_boundary = True
else:
if self._in_boundary and self._boundary_buf < self.MIN_LEN:
# Glitch β€” boundary didn't persist, merge back
self._boundary_buf = 0
self._in_boundary = False
self._boundary_buf = 0
self._prev_count = fr.person_count
self._prev_density_sc = fr.density_score
self._k_deque.append(fr)
self._update_density()
density_pct = self._density_pct
risk_score = round(density_pct / 100.0, 4) # 0.0–1.0
raw_level = self._label(density_pct)
# Suppress level_changed until K-window is full (warmup period).
# Prevents false P0/P1 triggers in the first K frames of each clip.
k_ready = len(self._k_deque) >= self.K_WINDOW
if raw_level != self._prev_level:
if raw_level == self._candidate:
self._candidate_count += 1
else:
self._candidate = raw_level
self._candidate_count = 1
if k_ready and self._candidate_count >= self.STABLE_FRAMES:
risk_level = raw_level
level_changed = True
self._prev_level = raw_level
self._candidate = None
self._candidate_count = 0
print(
f'πŸ“Š [RiskAgent] Level confirmed: {risk_level} '
f'(density_pct={density_pct:.1f}%, N_est={self._n_est}, '
f'frame={fr.frame_id})'
)
else:
risk_level = self._prev_level
level_changed = False
else:
risk_level = self._prev_level
level_changed = False
self._candidate = None
self._candidate_count = 0
self._peak_ema = (
self.EMA_ALPHA * float(fr.person_count) +
(1.0 - self.EMA_ALPHA) * self._peak_ema
)
trend = self._compute_trend()
counts = [f.person_count for f in self._window]
return RiskResult(
frame_id = fr.frame_id,
timestamp = fr.timestamp,
risk_score = risk_score,
risk_level = risk_level,
trend = trend,
level_changed = level_changed,
window_avg = round(float(np.mean(counts)), 1),
window_max = int(max(counts)),
density_ema = round(self._peak_ema, 1),
density_pct = density_pct,
)