"""
HaramGuard — ReflectionAgent
==============================
AISA Layer     : Reasoning + Governance
Design Pattern : Reflection (Self-Critique & Correction)

Responsibilities:
  - Observe every RiskResult produced by RiskAgent
  - Critique the assessment for four systematic biases:
      1. Chronic LOW under-reporting
         (a full window of BIAS_WINDOW consecutive LOW results while the
         average crowd in that window exceeds CROWD_LOW_THRESH)
      2. Rising trend ignored
         (trend=rising but risk=LOW with more than
         RISING_TREND_MIN_PERSONS persons)
      3. Count-risk mismatch
         (more than HIGH_CROWD_THRESH persons but risk=LOW -> MEDIUM;
         CRITICAL_CROWD_THRESH or more persons but risk=LOW -> HIGH)
      4. Over-estimation
         (HIGH risk but fewer than config.REFLECTION_OVER_EST_THRESH persons)
  - Correct: compute an overriding risk level / score when bias is found
  - Log: keep every reflection in an in-memory log for evaluation /
    auditability (the pipeline is expected to persist it to the DB)

Flow per frame:
  Observe → Critique → Correct → Log
"""

import numpy as np
from collections import deque

from core.models import FrameResult, RiskResult
import config


class ReflectionAgent:
    """Self-critique layer that audits RiskAgent output for known biases.

    The agent never mutates the objects it is given; ``reflect`` returns a
    dict describing the (possibly corrected) assessment and the caller is
    responsible for applying it.
    """

    # Thresholds sourced from config (trailing values are the original
    # author's notes on the configured defaults).
    BIAS_WINDOW = config.REFLECTION_BIAS_WINDOW              # 10
    CROWD_LOW_THRESH = config.REFLECTION_CROWD_LOW_THRESH    # 15
    HIGH_CROWD_THRESH = config.REFLECTION_HIGH_CROWD_THRESH  # 50
    # 70+ persons always HIGH (single-camera calibration)
    CRITICAL_CROWD_THRESH = 70

    # Previously inline magic numbers; values are unchanged.
    RISING_TREND_MIN_PERSONS = 20  # Bias 2 fires only above this head count
    MEDIUM_SCORE_FLOOR = 0.30      # floor applied by Bias 1 and Bias 3 (MEDIUM)
    RISING_SCORE_FLOOR = 0.25      # floor applied by Bias 2
    HIGH_SCORE_FLOOR = 0.85        # floor applied by Bias 3 (critical)
    OVER_EST_SCORE_CAP = 0.50      # ceiling applied by Bias 4

    def __init__(self):
        """Create an agent with an empty history window and reflection log."""
        self.name = 'ReflectionAgent'
        self.aisa_layer = 'Reasoning (Reflection)'
        # Sliding window of the most recent assessments; the deque drops
        # the oldest entry automatically once BIAS_WINDOW is reached.
        self._history = deque(maxlen=self.BIAS_WINDOW)
        # NOTE(review): this list gains one entry per reflected frame and is
        # never trimmed, so it grows without bound on long-running streams.
        # get_summary() relies on the full log — confirm the intended
        # retention policy before bounding it.
        self._reflection_log = []
        print('🪞 [ReflectionAgent] Ready — 4 bias detectors active')

    def reflect(self, rr: RiskResult, fr: FrameResult) -> dict:
        """Run the four-step reflection loop for one frame.

        Step 1 — Observe  : record the latest assessment in the history window
        Step 2 — Critique : check for each known bias pattern
        Step 3 — Correct  : compute corrected_level / corrected_score
        Step 4 — Log      : append the reflection to the internal log

        Args:
            rr: Risk assessment for the frame. Reads ``risk_level``,
                ``risk_score``, ``trend`` and ``frame_id``.
            fr: Frame analysis result. Reads ``person_count``.

        Returns:
            A dict with keys ``frame_id``, ``original_level``,
            ``original_score``, ``corrected_level``, ``corrected_score``
            (rounded to 4 decimals), ``bias_detected``, ``critique``
            (all findings joined with ``' | '``) and ``person_count``.
            ``rr`` itself is not modified; the caller applies the
            correction before passing the result downstream.
        """
        # ── Step 1: Observe ──────────────────────────────────────────
        self._history.append({
            'risk_level': rr.risk_level,
            'risk_score': rr.risk_score,
            'person_count': fr.person_count,
            'trend': rr.trend,
        })

        critique = []
        bias_detected = False
        corrected_level = rr.risk_level
        corrected_score = rr.risk_score

        # ── Steps 2 + 3: Critique and Correct ────────────────────────
        # Every detector tests the ORIGINAL rr.risk_level, so the upward
        # checks (1-3, LOW only) and the downward check (4, HIGH only)
        # can never both fire for the same frame.

        # Bias 1: Chronic LOW under-reporting — only once the window is full.
        if len(self._history) >= self.BIAS_WINDOW:
            all_low = all(h['risk_level'] == 'LOW' for h in self._history)
            avg_crowd = np.mean([h['person_count'] for h in self._history])
            if all_low and avg_crowd > self.CROWD_LOW_THRESH:
                bias_detected = True
                corrected_level = 'MEDIUM'
                corrected_score = max(rr.risk_score, self.MEDIUM_SCORE_FLOOR)
                critique.append(
                    f'CHRONIC_LOW_BIAS: {self.BIAS_WINDOW} consecutive LOW '
                    f'with avg crowd={avg_crowd:.0f} persons. Upgraded to MEDIUM.'
                )

        # Bias 2: Rising trend ignored.
        if (rr.trend == 'rising' and rr.risk_level == 'LOW'
                and fr.person_count > self.RISING_TREND_MIN_PERSONS):
            bias_detected = True
            corrected_level = 'MEDIUM'
            corrected_score = max(corrected_score, self.RISING_SCORE_FLOOR)
            critique.append(
                f'RISING_TREND_IGNORED: trend=rising, persons={fr.person_count}, '
                f'but risk=LOW. Upgraded to MEDIUM.'
            )

        # Bias 3: Count-risk mismatch. The critical tier is tested first so
        # a very large crowd escalates straight to HIGH rather than MEDIUM.
        if fr.person_count >= self.CRITICAL_CROWD_THRESH and rr.risk_level == 'LOW':
            bias_detected = True
            corrected_level = 'HIGH'
            corrected_score = max(corrected_score, self.HIGH_SCORE_FLOOR)
            critique.append(
                f'CRITICAL_COUNT_RISK_MISMATCH: {fr.person_count} persons but risk=LOW. '
                f'This is a severe inconsistency. '
                f'Upgraded to HIGH (corrected_score={corrected_score:.3f}).'
            )
        elif fr.person_count > self.HIGH_CROWD_THRESH and rr.risk_level == 'LOW':
            bias_detected = True
            corrected_level = 'MEDIUM'
            corrected_score = max(corrected_score, self.MEDIUM_SCORE_FLOOR)
            critique.append(
                f'COUNT_RISK_MISMATCH: {fr.person_count} persons but risk=LOW. '
                f'Thresholds may need recalibration. Upgraded to MEDIUM.'
            )

        # Bias 4: Over-estimation — HIGH risk with very few persons.
        # The threshold is read from config at call time, as before.
        if rr.risk_level == 'HIGH' and fr.person_count < config.REFLECTION_OVER_EST_THRESH:
            bias_detected = True
            corrected_level = 'MEDIUM'
            corrected_score = min(corrected_score, self.OVER_EST_SCORE_CAP)
            critique.append(
                f'OVER_ESTIMATION: HIGH risk but only {fr.person_count} persons. '
                f'Downgraded to MEDIUM — possible empty-frame or detection artifact.'
            )

        if not critique:
            critique.append('OK: assessment consistent, no bias detected.')

        # ── Step 4: Log ──────────────────────────────────────────────
        reflection = {
            'frame_id': rr.frame_id,
            'original_level': rr.risk_level,
            'original_score': rr.risk_score,
            'corrected_level': corrected_level,
            'corrected_score': round(corrected_score, 4),
            'bias_detected': bias_detected,
            'critique': ' | '.join(critique),
            'person_count': fr.person_count,
        }
        self._reflection_log.append(reflection)

        if bias_detected:
            print(
                f' 🪞 [ReflectionAgent] Frame {rr.frame_id}: '
                f'{rr.risk_level}({rr.risk_score:.3f}) → '
                f'{corrected_level}({corrected_score:.3f})'
            )
            # Only the first finding is echoed, truncated to 90 characters.
            print(f' {critique[0][:90]}')

        return reflection

    def get_summary(self) -> dict:
        """Return aggregate statistics over every reflection logged so far.

        Returns:
            A dict with:
              - ``total_reflections``: number of logged reflections
              - ``bias_events``: reflections where a bias was detected
              - ``bias_rate_pct``: ``bias_events`` as a percentage of the
                total, rounded to one decimal (0 when nothing is logged)
              - ``corrections``: count per ``'<original>-><corrected>'``
                level transition. The three transitions ``reflect`` can
                produce are always present, even when their count is 0.
        """
        total = len(self._reflection_log)
        biased = 0
        # Pre-seed the known transitions so consumers can rely on the keys.
        corrections = {'LOW->MEDIUM': 0, 'LOW->HIGH': 0, 'HIGH->MEDIUM': 0}
        for entry in self._reflection_log:
            if entry['bias_detected']:
                biased += 1
            before = entry['original_level']
            after = entry['corrected_level']
            if before != after:
                key = f'{before}->{after}'
                corrections[key] = corrections.get(key, 0) + 1

        return {
            'total_reflections': total,
            'bias_events': biased,
            'bias_rate_pct': round(biased / total * 100, 1) if total > 0 else 0,
            'corrections': corrections,
        }