"""
HaramGuard — OperationsAgent
==============================
AISA Layer     : Execution (Decision)
Design Pattern : Event-Driven

Responsibilities:
  - Fire ONLY when risk level changes (event-driven, not every frame)
  - Map risk score to priority: P0 (immediate) / P1 (5 min) / P2 (routine)
  - Return a Decision with empty actions — CoordinatorAgent fills them dynamically
  - Apply Guardrail: P0 rate-limiting (max 1 per 5 min per zone)
  - Persist every decision to DB

Actions are NO LONGER hardcoded.
CoordinatorAgent (LLM) decides which specific gates and actions to take
based on live crowd data for every decision.
"""

from datetime import datetime
from typing import Optional

from core.models import RiskResult, Decision
from core.database import HajjFlowDB
import config


class OperationsAgent:
    """Turn risk-level transitions into prioritised, persisted Decisions.

    The agent only assigns a priority and stores the decision; the
    ``actions`` list and ``justification`` are left empty for a later
    stage (the CoordinatorAgent, per the module docstring) to fill in.
    """

    # ── Guardrail: cooldown between P0 alerts per zone ────────────────
    RATE_LIMIT_SEC = 300  # 5 minutes

    def __init__(self, db: HajjFlowDB):
        """Store the DB handle and record when this agent was created.

        Args:
            db: Database wrapper; this class calls ``get_last_p0_time``
                and ``save_decision`` on it.
        """
        self.db = db
        self.name = 'OperationsAgent'
        self.aisa_layer = 'Execution (Decision)'
        # Reference point for the rate limiter: P0 records older than this
        # instant are treated as belonging to a previous run.
        self._boot_time = datetime.now()
        print('🚨 [OperationsAgent] Ready — event-driven | actions decided by CoordinatorAgent (LLM)')

    # ── Private helpers ───────────────────────────────────────────────

    def _get_priority(self, risk_score: float, risk_level: str = None) -> str:
        """Translate a risk score (and optional level) into 'P0' / 'P1' / 'P2'.

        Thresholds come from ``config.OPS_P0_SCORE`` and
        ``config.OPS_P1_SCORE``. A ``risk_level`` of ``'HIGH'`` always
        yields ``'P0'`` without looking at the score, which sidesteps
        floating-point edge cases right at the threshold.

        Args:
            risk_score: Numeric risk score compared against the thresholds.
            risk_level: Optional level label; only ``'HIGH'`` is special-cased.

        Returns:
            ``'P0'``, ``'P1'`` or ``'P2'``.
        """
        # NOTE(review): the original inline notes gave 0.65 / 0.35 for these
        # two settings — confirm against the config module.
        immediate_cutoff = config.OPS_P0_SCORE
        elevated_cutoff = config.OPS_P1_SCORE

        # Short-circuit keeps the score comparison from running for HIGH.
        if risk_level == 'HIGH' or risk_score >= immediate_cutoff:
            return 'P0'
        return 'P1' if risk_score >= elevated_cutoff else 'P2'

    def _is_rate_limited(self, context: str, priority: str) -> bool:
        """Guardrail: report whether a P0 for ``context`` must be suppressed.

        Only ``'P0'`` is ever limited. A previous P0 counts only if it was
        recorded after this agent was constructed and less than
        ``RATE_LIMIT_SEC`` seconds ago.

        Args:
            context: Zone identifier passed to ``db.get_last_p0_time``.
            priority: Priority about to be emitted.

        Returns:
            ``True`` when the alert should be dropped, ``False`` otherwise.
        """
        if priority != 'P0':
            return False

        previous = self.db.get_last_p0_time(context)
        # No earlier P0, or one that predates this agent instance (stale
        # record from an earlier run): nothing to throttle.
        if not previous or previous < self._boot_time:
            return False

        elapsed = (datetime.now() - previous).total_seconds()
        if elapsed >= self.RATE_LIMIT_SEC:
            return False

        print(
            f' [GR-OPS] P0 rate-limited for {context} '
            f'({int(elapsed)}s ago — cooldown {self.RATE_LIMIT_SEC}s)'
        )
        return True

    # ── Public API ────────────────────────────────────────────────────

    def process(self, rr: RiskResult, context: str = 'Mecca_Main_Area') -> Optional[Decision]:
        """Emit and persist a Decision when the risk level has changed.

        Args:
            rr: Risk result; this method reads ``level_changed``,
                ``risk_score``, ``risk_level`` and ``frame_id``.
            context: Zone identifier stored on the decision and used as
                the rate-limit key.

        Returns:
            The saved ``Decision`` (with empty ``actions`` and
            ``justification``), or ``None`` when the level did not change
            or the P0 guardrail suppressed the alert.
        """
        # Event-driven: unchanged risk level means nothing to do.
        if not rr.level_changed:
            return None

        priority = self._get_priority(rr.risk_score, rr.risk_level)
        if self._is_rate_limited(context, priority):
            return None

        # Actions and justification stay empty on purpose; they are meant
        # to be populated downstream (decision.actions,
        # decision.justification, decision.selected_gates).
        decision_fields = {
            'frame_id': rr.frame_id,
            'timestamp': datetime.now().isoformat(),
            'context': context,
            'priority': priority,
            'actions': [],
            'risk_score': rr.risk_score,
            'risk_level': rr.risk_level,
            'justification': '',
        }
        decision = Decision(**decision_fields)
        self.db.save_decision(decision)

        summary = (
            f'🚨 [OperationsAgent] {priority} @ frame {rr.frame_id} | '
            f'risk={rr.risk_level}({rr.risk_score:.3f}) → CoordinatorAgent will decide actions'
        )
        print(summary)
        return decision