Spaces:
Running
Running
| """ | |
| HaramGuard β OperationsAgent | |
| ============================== | |
| AISA Layer : Execution (Decision) | |
| Design Pattern : Event-Driven | |
| Responsibilities: | |
| - Fire ONLY when risk level changes (event-driven, not every frame) | |
| - Map risk score to priority: P0 (immediate) / P1 (5 min) / P2 (routine) | |
| - Return a Decision with empty actions β CoordinatorAgent fills them dynamically | |
| - Apply Guardrail: P0 rate-limiting (max 1 per 5 min per zone) | |
| - Persist every decision to DB | |
| Actions are NO LONGER hardcoded. CoordinatorAgent (LLM) decides which specific | |
| gates and actions to take based on live crowd data for every decision. | |
| """ | |
| from datetime import datetime | |
| from typing import Optional | |
| from core.models import RiskResult, Decision | |
| from core.database import HajjFlowDB | |
| import config | |
| class OperationsAgent: | |
| # ββ Guardrail: cooldown between P0 alerts per zone ββββββββββββββββ | |
| RATE_LIMIT_SEC = 300 # 5 minutes | |
| def __init__(self, db: HajjFlowDB): | |
| self.name = 'OperationsAgent' | |
| self.aisa_layer = 'Execution (Decision)' | |
| self.db = db | |
| self._boot_time = datetime.now() | |
| print('π¨ [OperationsAgent] Ready β event-driven | actions decided by CoordinatorAgent (LLM)') | |
| # ββ Private helpers βββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _get_priority(self, risk_score: float, risk_level: str = None) -> str: | |
| """ | |
| Map risk score to priority: P0 (immediate) / P1 (5 min) / P2 (routine) | |
| Uses config thresholds for alignment with RiskAgent. | |
| risk_level takes precedence for HIGH to handle floating-point edge cases. | |
| """ | |
| p0_threshold = config.OPS_P0_SCORE # 0.65 | |
| p1_threshold = config.OPS_P1_SCORE # 0.35 | |
| if risk_level == 'HIGH': | |
| return 'P0' | |
| if risk_score >= p0_threshold: | |
| return 'P0' | |
| if risk_score >= p1_threshold: | |
| return 'P1' | |
| return 'P2' | |
| def _is_rate_limited(self, context: str, priority: str) -> bool: | |
| """Guardrail: prevent P0 spam for the same zone.""" | |
| if priority != 'P0': | |
| return False | |
| last = self.db.get_last_p0_time(context) | |
| if last: | |
| # Ignore stale P0 records from previous runs; rate-limit only within current process lifetime. | |
| if last < self._boot_time: | |
| return False | |
| delta = datetime.now() - last | |
| if delta.total_seconds() < self.RATE_LIMIT_SEC: | |
| print( | |
| f' [GR-OPS] P0 rate-limited for {context} ' | |
| f'({int(delta.total_seconds())}s ago β cooldown {self.RATE_LIMIT_SEC}s)' | |
| ) | |
| return True | |
| return False | |
| # ββ Public API ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def process(self, rr: RiskResult, | |
| context: str = 'Mecca_Main_Area') -> Optional[Decision]: | |
| """ | |
| Evaluate risk and emit a Decision only on level transitions. | |
| Returns Decision with empty actions β pipeline fills them via CoordinatorAgent. | |
| """ | |
| if not rr.level_changed: | |
| return None | |
| priority = self._get_priority(rr.risk_score, rr.risk_level) | |
| if self._is_rate_limited(context, priority): | |
| return None | |
| # Actions and justification are intentionally empty here. | |
| # CoordinatorAgent (LLM) will dynamically decide gates and actions | |
| # and populate decision.actions, decision.justification, decision.selected_gates. | |
| decision = Decision( | |
| frame_id = rr.frame_id, | |
| timestamp = datetime.now().isoformat(), | |
| context = context, | |
| priority = priority, | |
| actions = [], | |
| risk_score = rr.risk_score, | |
| risk_level = rr.risk_level, | |
| justification = '', | |
| ) | |
| self.db.save_decision(decision) | |
| print( | |
| f'π¨ [OperationsAgent] {priority} @ frame {rr.frame_id} | ' | |
| f'risk={rr.risk_level}({rr.risk_score:.3f}) β CoordinatorAgent will decide actions' | |
| ) | |
| return decision | |