HaramGuard / backend /agents /operations_agent.py
adeem6's picture
Update backend/agents/operations_agent.py (#5)
550e3cf
"""
HaramGuard β€” OperationsAgent
==============================
AISA Layer : Execution (Decision)
Design Pattern : Event-Driven
Responsibilities:
- Fire ONLY when risk level changes (event-driven, not every frame)
- Map risk score to priority: P0 (immediate) / P1 (5 min) / P2 (routine)
- Return a Decision with empty actions β€” CoordinatorAgent fills them dynamically
- Apply Guardrail: P0 rate-limiting (max 1 per 5 min per zone)
- Persist every decision to DB
Actions are NO LONGER hardcoded. CoordinatorAgent (LLM) decides which specific
gates and actions to take based on live crowd data for every decision.
"""
from datetime import datetime
from typing import Optional
from core.models import RiskResult, Decision
from core.database import HajjFlowDB
import config
class OperationsAgent:
# ── Guardrail: cooldown between P0 alerts per zone ────────────────
RATE_LIMIT_SEC = 300 # 5 minutes
def __init__(self, db: HajjFlowDB):
self.name = 'OperationsAgent'
self.aisa_layer = 'Execution (Decision)'
self.db = db
self._boot_time = datetime.now()
print('🚨 [OperationsAgent] Ready β€” event-driven | actions decided by CoordinatorAgent (LLM)')
# ── Private helpers ───────────────────────────────────────────────
def _get_priority(self, risk_score: float, risk_level: str = None) -> str:
"""
Map risk score to priority: P0 (immediate) / P1 (5 min) / P2 (routine)
Uses config thresholds for alignment with RiskAgent.
risk_level takes precedence for HIGH to handle floating-point edge cases.
"""
p0_threshold = config.OPS_P0_SCORE # 0.65
p1_threshold = config.OPS_P1_SCORE # 0.35
if risk_level == 'HIGH':
return 'P0'
if risk_score >= p0_threshold:
return 'P0'
if risk_score >= p1_threshold:
return 'P1'
return 'P2'
def _is_rate_limited(self, context: str, priority: str) -> bool:
"""Guardrail: prevent P0 spam for the same zone."""
if priority != 'P0':
return False
last = self.db.get_last_p0_time(context)
if last:
# Ignore stale P0 records from previous runs; rate-limit only within current process lifetime.
if last < self._boot_time:
return False
delta = datetime.now() - last
if delta.total_seconds() < self.RATE_LIMIT_SEC:
print(
f' [GR-OPS] P0 rate-limited for {context} '
f'({int(delta.total_seconds())}s ago β€” cooldown {self.RATE_LIMIT_SEC}s)'
)
return True
return False
# ── Public API ────────────────────────────────────────────────────
def process(self, rr: RiskResult,
context: str = 'Mecca_Main_Area') -> Optional[Decision]:
"""
Evaluate risk and emit a Decision only on level transitions.
Returns Decision with empty actions β€” pipeline fills them via CoordinatorAgent.
"""
if not rr.level_changed:
return None
priority = self._get_priority(rr.risk_score, rr.risk_level)
if self._is_rate_limited(context, priority):
return None
# Actions and justification are intentionally empty here.
# CoordinatorAgent (LLM) will dynamically decide gates and actions
# and populate decision.actions, decision.justification, decision.selected_gates.
decision = Decision(
frame_id = rr.frame_id,
timestamp = datetime.now().isoformat(),
context = context,
priority = priority,
actions = [],
risk_score = rr.risk_score,
risk_level = rr.risk_level,
justification = '',
)
self.db.save_decision(decision)
print(
f'🚨 [OperationsAgent] {priority} @ frame {rr.frame_id} | '
f'risk={rr.risk_level}({rr.risk_score:.3f}) β†’ CoordinatorAgent will decide actions'
)
return decision