# Source: HaramGuard / backend / agents / reflection_agent.py
# Provenance (scraped page header): uploaded by adeem6,
# commit bcec515 "Update backend/agents/reflection_agent.py (#3)"
"""
HaramGuard β€” ReflectionAgent
==============================
AISA Layer : Reasoning + Governance
Design Pattern : Reflection (Self-Critique & Correction)
Responsibilities:
- Observe every RiskResult produced by RiskAgent
- Critique the assessment for four systematic biases:
1. Chronic LOW under-reporting (BIAS_WINDOW consecutive LOW frames with large avg crowd)
2. Rising trend ignored (trend=rising but risk=LOW with n>20 persons)
3. Count-risk mismatch (50+ persons but risk=LOW; 70+ escalates to HIGH)
4. Over-estimation (HIGH risk but only <15 persons detected)
- Correct: override risk_level and risk_score when bias is found
- Log: persist every reflection to DB for evaluation / auditability
Flow per frame:
Observe β†’ Critique β†’ Correct β†’ Log
"""
import numpy as np
from collections import deque
from core.models import FrameResult, RiskResult
import config
class ReflectionAgent:
    """
    Reflection (self-critique & correction) agent — AISA Reasoning layer.

    Observes every RiskResult produced by RiskAgent, critiques it for four
    systematic biases, corrects ``risk_level`` / ``risk_score`` when a bias
    is found, and keeps an internal audit log of every reflection.
    """

    # Calibration constants (expected values noted from config comments).
    BIAS_WINDOW = config.REFLECTION_BIAS_WINDOW            # 10 frames
    CROWD_LOW_THRESH = config.REFLECTION_CROWD_LOW_THRESH  # 15 persons
    HIGH_CROWD_THRESH = config.REFLECTION_HIGH_CROWD_THRESH  # 50 persons
    CRITICAL_CROWD_THRESH = 70  # 70+ persons always HIGH (single-camera calibration)

    def __init__(self):
        self.name = 'ReflectionAgent'
        self.aisa_layer = 'Reasoning (Reflection)'
        # Sliding window of the latest assessments — input to the
        # chronic-LOW detector (Bias 1). deque(maxlen=...) drops the
        # oldest entry automatically.
        self._history = deque(maxlen=self.BIAS_WINDOW)
        # Full audit trail of every reflection dict ever produced.
        self._reflection_log = []
        print('πŸͺž [ReflectionAgent] Ready β€” 4 bias detectors active')

    def reflect(self, rr: RiskResult, fr: FrameResult) -> dict:
        """
        Run the four-step reflection loop for one frame.

        Step 1 — Observe : record the latest assessment in the history window
        Step 2 — Critique: check each of the four known bias patterns
        Step 3 — Correct : compute corrected_level / corrected_score
        Step 4 — Log    : append to the internal log (pipeline persists to DB)

        Parameters
        ----------
        rr : RiskResult   — assessment produced by RiskAgent for this frame
        fr : FrameResult  — detection result (person_count) for this frame

        Returns
        -------
        dict with original vs corrected level/score, ``bias_detected`` flag
        and a human-readable ``critique`` string. The pipeline applies the
        corrections to ``rr`` before passing it to OperationsAgent.
        """
        # -- Step 1: Observe -------------------------------------------------
        self._history.append({
            'risk_level': rr.risk_level,
            'risk_score': rr.risk_score,
            'person_count': fr.person_count,
            'trend': rr.trend,
        })

        critique = []
        bias_detected = False
        corrected_level = rr.risk_level
        corrected_score = rr.risk_score

        # -- Step 2+3: Critique & Correct ------------------------------------

        # Bias 1: Chronic LOW under-reporting — a full window of LOW while
        # the average crowd stays above CROWD_LOW_THRESH.
        if len(self._history) >= self.BIAS_WINDOW:
            all_low = all(h['risk_level'] == 'LOW' for h in self._history)
            avg_crowd = np.mean([h['person_count'] for h in self._history])
            if all_low and avg_crowd > self.CROWD_LOW_THRESH:
                bias_detected = True
                corrected_level = 'MEDIUM'
                # Fold into corrected_score (== rr.risk_score at this point)
                # so all detectors accumulate consistently.
                corrected_score = max(corrected_score, 0.30)  # push into MEDIUM zone (>0.20)
                critique.append(
                    f'CHRONIC_LOW_BIAS: {self.BIAS_WINDOW} consecutive LOW '
                    f'with avg crowd={avg_crowd:.0f} persons. Upgraded to MEDIUM.'
                )

        # Bias 2: Rising trend ignored — LOW despite a rising trend and a
        # crowd of more than 20 persons.
        if rr.trend == 'rising' and rr.risk_level == 'LOW' and fr.person_count > 20:
            bias_detected = True
            corrected_level = 'MEDIUM'
            corrected_score = max(corrected_score, 0.25)  # push into MEDIUM zone (>0.20)
            critique.append(
                f'RISING_TREND_IGNORED: trend=rising, persons={fr.person_count}, '
                f'but risk=LOW. Upgraded to MEDIUM.'
            )

        # Bias 3: Count-risk mismatch — tiered: 70+ persons with LOW is a
        # critical inconsistency (→ HIGH); more than 50 with LOW → MEDIUM.
        if fr.person_count >= self.CRITICAL_CROWD_THRESH and rr.risk_level == 'LOW':
            bias_detected = True
            corrected_level = 'HIGH'
            corrected_score = max(corrected_score, 0.85)  # push into HIGH zone (>0.80)
            critique.append(
                f'CRITICAL_COUNT_RISK_MISMATCH: {fr.person_count} persons but risk=LOW. '
                f'This is a severe inconsistency. '
                f'Upgraded to HIGH (corrected_score={corrected_score:.3f}).'
            )
        elif fr.person_count > self.HIGH_CROWD_THRESH and rr.risk_level == 'LOW':
            bias_detected = True
            corrected_level = 'MEDIUM'
            corrected_score = max(corrected_score, 0.30)  # push into MEDIUM zone (>0.20)
            critique.append(
                f'COUNT_RISK_MISMATCH: {fr.person_count} persons but risk=LOW. '
                f'Thresholds may need recalibration. Upgraded to MEDIUM.'
            )

        # Bias 4: Over-estimation — HIGH with very few persons is downgraded.
        # Checked against the ORIGINAL level, so it cannot cancel an upgrade
        # made above (those only fire when the original level was LOW).
        if rr.risk_level == 'HIGH' and fr.person_count < config.REFLECTION_OVER_EST_THRESH:
            bias_detected = True
            corrected_level = 'MEDIUM'
            corrected_score = min(corrected_score, 0.50)  # pull down to MEDIUM zone
            critique.append(
                f'OVER_ESTIMATION: HIGH risk but only {fr.person_count} persons. '
                f'Downgraded to MEDIUM β€” possible empty-frame or detection artifact.'
            )

        if not critique:
            critique.append('OK: assessment consistent, no bias detected.')

        # -- Step 4: Log -----------------------------------------------------
        reflection = {
            'frame_id': rr.frame_id,
            'original_level': rr.risk_level,
            'original_score': rr.risk_score,
            'corrected_level': corrected_level,
            'corrected_score': round(corrected_score, 4),
            'bias_detected': bias_detected,
            'critique': ' | '.join(critique),
            'person_count': fr.person_count,
        }
        self._reflection_log.append(reflection)

        if bias_detected:
            print(
                f' πŸͺž [ReflectionAgent] Frame {rr.frame_id}: '
                f'{rr.risk_level}({rr.risk_score:.3f}) β†’ '
                f'{corrected_level}({corrected_score:.3f})'
            )
            print(f' {critique[0][:90]}')
        return reflection

    def get_summary(self) -> dict:
        """
        Summary stats — used by dashboard and evaluation section.

        Returns totals, the bias rate, and a breakdown of every correction
        class the detectors can produce (previously only LOW->MEDIUM was
        counted, silently dropping the LOW->HIGH and HIGH->MEDIUM events
        emitted by Bias 3 (critical tier) and Bias 4).
        """
        total = len(self._reflection_log)
        biased = sum(1 for r in self._reflection_log if r['bias_detected'])

        def _count(orig: str, corr: str) -> int:
            # Number of logged reflections that moved orig -> corr.
            return sum(1 for r in self._reflection_log
                       if r['original_level'] == orig
                       and r['corrected_level'] == corr)

        return {
            'total_reflections': total,
            'bias_events': biased,
            'bias_rate_pct': round(biased / total * 100, 1) if total > 0 else 0,
            'corrections': {
                'LOW->MEDIUM': _count('LOW', 'MEDIUM'),
                'LOW->HIGH': _count('LOW', 'HIGH'),      # Bias 3, critical tier
                'HIGH->MEDIUM': _count('HIGH', 'MEDIUM'),  # Bias 4 downgrade
            },
        }