""" HaramGuard — CoordinatorAgent ================================ AISA Layer : Reasoning + Governance Design Pattern : ReAct (Reason → Act → Observe) + Output Guardrails ReAct Pattern Implementation (Required by Capstone Rubric): This agent implements the ReAct (Reasoning-Acting-Observing) design pattern as explicitly required by the capstone project rubric. The ReAct loop enables iterative self-correction through: 1. REASON: Analyze the situation and compose a structured prompt - Input: RiskResult, Decision, recent frames, feedback (if any) - Output: Contextualized prompt for LLM 2. ACT: Execute the action (call LLM to generate action plan) - Model: Groq API (GPT-OSS-120B or similar) - Output: Raw JSON plan from LLM 3. OBSERVE: Validate the output using guardrails - Run 5 validation checks (GR-C1 through GR-C5) - If issues found: generate feedback and loop back to REASON - If valid: return the plan The loop continues up to MAX_REACT_ITERS (3) times, ensuring the agent can self-correct errors in its reasoning or output format. Responsibilities: - Called on ALL decisions (P0/P1/P2) — not just critical emergencies - Implements a ReAct loop (max 3 iterations): Reason : analyse the crowd situation and compose a prompt Act : call LLM (Groq) to generate a structured action plan Observe : run 6 guardrails to validate the output → if validation fails, feed back issues and Reason again - Guardrails: GR-C1: Required fields check GR-C2: Valid threat level (CRITICAL/HIGH/MEDIUM/LOW only) GR-C3: Confidence score in [0, 1] GR-C4: Consistency check (low risk score ≠ CRITICAL threat) GR-C5: Arabic alert fallback if empty GR-C6: selected_gates must be a non-empty list """ import json import numpy as np from typing import Optional, Tuple from groq import Groq from openai import OpenAI from core.models import RiskResult, Decision class CoordinatorAgent: REQUIRED_FIELDS = { 'threat_level', 'executive_summary', 'selected_gates', 'immediate_actions', 'actions_justification', 'arabic_alert', 'confidence_score' } VALID_THREATS = {'CRITICAL', 'HIGH', 'MEDIUM', 'LOW'} MAX_REACT_ITERS = 3 # ReAct: maximum reasoning iterations # Real gate list — injected into every LLM prompt so the agent can choose HARAM_GATES = [ 'باب الملك عبدالعزيز', # South, main entrance, highest traffic 'باب الملك فهد', # North, large capacity 'باب السلام', # East, historic, medium traffic 'باب الفتح', # West, medium capacity 'باب العمرة', # West, Umrah pilgrims 'باب الملك عبدالله', # South-West, high capacity 'باب الصفا', # East, leads to Safa-Marwa 'باب علي', # North-East, smaller gate 'باب الزيادة', # North, overflow gate 'باب المروة', # East, leads to Marwa ] def __init__(self, groq_api_key: str): self.name = 'CoordinatorAgent' self.aisa_layer = 'Reasoning + Governance (ReAct)' self._groq_client = Groq(api_key=groq_api_key) self._active_backend = 'groq' self._active_model = 'llama-3.3-70b-versatile' print(f'🧠 [CoordinatorAgent] Ready — backend=groq model={self._active_model} | ReAct loop') # ── Guardrails ──────────────────────────────────────────────────── def _validate(self, plan: dict, risk_score: float) -> Tuple[dict, list]: """ Validate and sanitise model output. Returns (corrected_plan, list_of_issues). Issues list is empty when the plan is fully valid. """ issues = [] if not isinstance(plan, dict): plan = {} issues.append('GR_C1_invalid_json_object') # GR-C1: Required fields for field_name in self.REQUIRED_FIELDS: if field_name not in plan: plan[field_name] = 'N/A' issues.append(f'GR_C1_missing:{field_name}') # GR-C2: Valid threat level tl = str(plan.get('threat_level', '')).upper() if tl not in self.VALID_THREATS: issues.append(f'GR_C2_invalid_threat:{tl}->HIGH') plan['threat_level'] = 'HIGH' else: plan['threat_level'] = tl # GR-C3: Confidence in [0, 1] cs = plan.get('confidence_score', 0) if not isinstance(cs, (int, float)) or not (0 <= cs <= 1): issues.append(f'GR_C3_invalid_confidence:{cs}->0.5') plan['confidence_score'] = 0.5 # GR-C4: Consistency — threat_level must match risk_score thresholds # risk > 0.80 (density_pct > 80%) → HIGH # risk > 0.20 (density_pct > 20%) → MEDIUM # risk <= 0.20 (density_pct <= 20%) → LOW expected = 'HIGH' if risk_score > 0.80 else 'MEDIUM' if risk_score > 0.20 else 'LOW' tl_current = plan['threat_level'] if tl_current != expected: issues.append(f'GR_C4_threat_corrected:{tl_current}->{expected}') plan['threat_level'] = expected # GR-C5: Arabic alert fallback if not str(plan.get('arabic_alert', '')).strip(): plan['arabic_alert'] = ( 'تنبيه أمني: يُرجى مراقبة نقاط التجمع واتخاذ الإجراءات الوقائية اللازمة.' ) issues.append('GR_C5_arabic_fallback') # GR-C1 extra: immediate_actions must be a non-empty list ia = plan.get('immediate_actions', []) if not isinstance(ia, list) or not ia: plan['immediate_actions'] = ['زيادة المراقبة الميدانية', 'إرسال وحدات إلى نقطة الازدحام'] issues.append('GR_C1_immediate_actions_fixed') # GR-C6: selected_gates — count enforced by threat level # LOW/P2 → 0 gates (no action needed) # MEDIUM → exactly 1 gate # HIGH → exactly 2 gates # CRITICAL→ exactly 2 gates (same as HIGH) tl_now = plan.get('threat_level', 'LOW') sg = plan.get('selected_gates', []) if not isinstance(sg, list): sg = [] if tl_now == 'LOW': plan['selected_gates'] = [] # no action, no gates shown elif tl_now == 'MEDIUM': if not sg: plan['selected_gates'] = ['باب الملك عبدالعزيز'] issues.append('GR_C6_medium_fallback') else: plan['selected_gates'] = sg[:1] # cap at 1 else: # HIGH or CRITICAL if len(sg) < 2: fallback = ['باب الملك عبدالعزيز', 'باب السلام'] plan['selected_gates'] = (sg + fallback)[:2] issues.append('GR_C6_high_padded') else: plan['selected_gates'] = sg[:2] # cap at 2 plan['_guardrail_issues'] = issues return plan, issues # ── ReAct helpers ───────────────────────────────────────────────── def _build_prompt( self, rr: RiskResult, decision: Decision, recent_frames: list, feedback: str = '' ) -> str: """ Reason step: compose the model prompt. If feedback is provided (from a previous failed Observe step), include it so the model can correct itself. """ avg_p = np.mean([f.person_count for f in recent_frames]) if recent_frames else 0 cur_count = recent_frames[-1].person_count if recent_frames else 0 gates_list = '\n'.join( f'{i+1}. {g}' for i, g in enumerate(self.HARAM_GATES) ) base = ( 'أنت منسق نظام حارس الحرم لإدارة سلامة الحشود في المسجد الحرام.\n' 'مهمتك: إنتاج خطة تشغيلية واضحة وموجزة للمشغلين بناءً على بيانات الحشود الحالية.\n\n' f'مستوى الخطر : {rr.risk_level} (الدرجة: {rr.risk_score:.3f})\n' f'الأولوية : {decision.priority} (P0=طارئ حرج، P1=تحذير وقائي، P2=مراقبة روتينية)\n' f'اتجاه الحشود : {rr.trend}\n' f'العدد الحالي : {cur_count} شخص | المتوسط (آخر 30 إطار): {avg_p:.0f} | الذروة: {rr.window_max}\n\n' 'البوابات المتاحة في المسجد الحرام — اختر الأنسب منها:\n' f'{gates_list}\n\n' 'إرشادات اختيار البوابات:\n' ' P0 (طارئ حرج): افتح بوابات الإخلاء عالية السعة + أغلق المداخل المزدحمة\n' ' P1 (تحذير) : فعّل اللافتات الإرشادية نحو البوابات الأقل ازدحاماً\n' ' P2 (روتيني) : راقب البوابات الأكثر حركة فقط\n\n' ) if feedback: base += ( 'تصحيح مطلوب — الإجابة السابقة بها مشاكل:\n' f'{feedback}\n' 'صحح جميع المشاكل وأعد الإجابة.\n\n' ) base += ( 'قواعد الإخراج الصارمة:\n' '- أجب فقط بـ JSON خام. بدون markdown. بدون backticks. بدون أي نص قبله أو بعده.\n' '- جميع حقول النص يجب أن تكون باللغة العربية الفصحى الرسمية حصراً. لا تستخدم العامية أو اللهجات المحلية مطلقاً. استخدم أسلوباً مهنياً رسمياً يليق بإدارة الحرم المكي.\n' '- ممنوع استخدام صيغة الأمر المباشر (راقبوا، تأكدوا، افتحوا). استخدم دائماً صيغة يُرجى / يُطلب / يُوصى. مثال: يُرجى مراقبة... وليس راقبوا...\n' '- لا تستخدم علامات الاقتباس المزدوجة (") داخل قيم النص العربي.\n' '- لا تضع أسطراً جديدة (\\n) داخل قيم النصوص في JSON.\n\n' 'قواعد selected_gates — إلزامية بحسب مستوى الخطر:\n' ' * LOW (P2 روتيني) : قائمة فارغة [] — لا حاجة لأي إجراء على البوابات.\n' ' * MEDIUM (P1 تحذير): بوابة واحدة فقط — الأنسب للتوجيه الوقائي.\n' ' * HIGH/CRITICAL (P0): بوابتان فقط — بوابة إخلاء + بوابة تحويل.\n' ' * استخدم الأسماء العربية الدقيقة من القائمة أعلاه حرفاً بحرف.\n' ' * ممنوع منعاً باتاً إدراج أكثر من بوابتين في أي حال — هذا خطأ فادح.\n\n' '- immediate_actions: 3 إلى 5 إجراءات عربية قصيرة تذكر البوابات المختارة بالاسم.\n' '- actions_justification: أقل من 40 كلمة. اشرح لماذا هذه البوابات بالذات.\n' '- arabic_alert: أقل من 15 كلمة. توجيه رسمي بالفصحى موجّه لموظفي الأمن والمشغلين حصراً. استخدم صيغة يُرجى/يُطلب فقط. مثال: يُرجى توجيه الحشود نحو باب الملك فهد ومراقبة نقاط التجمع.\n' '- executive_summary: أقل من 20 كلمة. ملخص الموقف للقيادة.\n\n' 'أعد بالضبط هذا JSON (بدون حقول إضافية، بدون نص إضافي):\n' '{\n' ' "threat_level": "HIGH",\n' ' "executive_summary": "...",\n' ' "selected_gates": ["باب ..."],\n' ' "immediate_actions": ["..."],\n' ' "actions_justification": "...",\n' ' "arabic_alert": "...",\n' ' "confidence_score": 0.85\n' '}' ) return base def _llm_call(self, prompt: str) -> Optional[dict]: """Act step: call Groq LLM, parse JSON response.""" raw_text = '' try: resp = self._groq_client.chat.completions.create( model=self._active_model, messages=[{'role': 'user', 'content': prompt}], max_tokens=1200, temperature=0.2, ) raw_text = (resp.choices[0].message.content or '').strip() # ── Parse JSON ──────────────────────────────────────────── try: return json.loads(raw_text) except json.JSONDecodeError: pass # Brace-counting extractor (handles truncated responses) start = raw_text.find('{') if start != -1: fragment = raw_text[start:] depth = 0 best_end = -1 for i, ch in enumerate(fragment): if ch == '{': depth += 1 elif ch == '}': depth -= 1 if depth == 0: best_end = i break if best_end != -1: return json.loads(fragment[:best_end + 1]) raise json.JSONDecodeError('No valid JSON block found', raw_text, 0) except json.JSONDecodeError as e: print(f' [CoordinatorAgent] JSON parse error: {e}') print(f' [CoordinatorAgent] Raw LLM response (first 600 chars):\n{raw_text[:600]}') return {} except Exception as e: print(f' [CoordinatorAgent] API error: {e}') return None # ── Public API ──────────────────────────────────────────────────── def call(self, rr: RiskResult, decision: Decision, recent_frames: list) -> Optional[dict]: """ ReAct loop: Reason → Act → Observe, up to MAX_REACT_ITERS times. Returns validated plan dict or None on unrecoverable error. """ print(f'\n🧠 [CoordinatorAgent] {decision.priority} @ frame {rr.frame_id} → ReAct loop starting...') feedback = '' best_plan = None for iteration in range(1, self.MAX_REACT_ITERS + 1): print(f' ↺ ReAct iteration {iteration}/{self.MAX_REACT_ITERS}') # ── Reason ────────────────────────────────────────────── prompt = self._build_prompt(rr, decision, recent_frames, feedback) # ── Act ───────────────────────────────────────────────── raw = self._llm_call(prompt) if raw is None: print(' [CoordinatorAgent] API unavailable — aborting ReAct') return best_plan # return previous best (may be None) # ── Observe ───────────────────────────────────────────── plan, issues = self._validate(raw, rr.risk_score) best_plan = plan if not issues: print(f' ✅ [CoordinatorAgent] Plan validated on iteration {iteration}') plan['_react_iterations'] = iteration plan['_llm_model'] = f'{self._active_backend}/{self._active_model}' return plan feedback = '; '.join(issues) print(f' ⚠️ Issues found ({len(issues)}): {feedback}') print(' [CoordinatorAgent] Max ReAct iterations reached — returning best effort') if best_plan: best_plan['_react_iterations'] = self.MAX_REACT_ITERS best_plan['_llm_model'] = f'{self._active_backend}/{self._active_model}' return best_plan