""" HaramGuard — RealTimePipeline ================================ Orchestrates all agents frame-by-frame. `pipeline.state` is the single shared dict read by the Streamlit dashboard. Agent execution order per frame: 1. PerceptionAgent → detect & track persons 2. RiskAgent → compute risk score 3. ReflectionAgent → self-critique & correct if biased 4. OperationsAgent → emit decision on level change (event-driven) 5. CoordinatorAgent → LLM decides gates + actions for ALL priorities (P0/P1/P2) """ import time import cv2 import numpy as np import threading from collections import deque import config from core.database import HajjFlowDB from agents.perception_agent import PerceptionAgent from agents.risk_agent import RiskAgent from agents.reflection_agent import ReflectionAgent from agents.operations_agent import OperationsAgent from agents.coordinator_agent import CoordinatorAgent class RealTimePipeline: def __init__(self, video_path: str, groq_api_key: str, anthropic_key: str = None, model_path: str = 'yolov10n.pt', db_path: str = 'outputs/hajjflow_rt.db'): self.video_path = video_path self.db = HajjFlowDB(db_path) # ── Instantiate agents ──────────────────────────────────────── # Cached mode: read pre-computed detections; Live mode: run YOLO cached_path = getattr(config, 'CACHED_DETECTIONS_PATH', None) self.perception = PerceptionAgent(model_path, anthropic_key=None, cached_path=cached_path) self.risk = RiskAgent() self.reflection = ReflectionAgent() self.operations = OperationsAgent(self.db) self.coordinator = CoordinatorAgent(groq_api_key) self._last_effective_risk_level = 'LOW' self._frame_buffer = deque(maxlen=30) # ── Shared state — read by Streamlit dashboard ───────────────── self.state = { # PerceptionAgent outputs 'frame_id': 0, 'annotated': None, # np.ndarray BGR 'person_count': 0, 'density_score': 0.0, 'fps': 0.0, # RiskAgent outputs (after ReflectionAgent correction) 'risk_score': 0.0, 'risk_level': 'LOW', 'trend': 'stable', 'risk_history': [], # list[dict] for chart # OperationsAgent outputs 'latest_decision': None, # Decision dataclass or None 'decisions_log': [], # last 10 decisions from DB # CoordinatorAgent outputs (P0 only) 'arabic_alert': '', 'coordinator_plan': None, # ReflectionAgent outputs 'reflection_summary': {}, 'last_reflection': None, } print('🚀 [Pipeline] Initialized — all agents ready') # ── Video capture helper ────────────────────────────────────────── def get_video_capture(self) -> cv2.VideoCapture: cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): raise RuntimeError(f'Cannot open video: {self.video_path}') return cap # ── Per-frame processing ────────────────────────────────────────── def process_one_frame(self, frame: np.ndarray) -> None: """ Run all agents on a single frame and update self.state. Called from the Streamlit loop or batch runner. """ t0 = time.time() # Agent 1 — Perception fr = self.perception.process_frame(frame) self._frame_buffer.append(fr) # Agent 2 — Risk scoring rr = self.risk.process_frame(fr) original_level_changed = rr.level_changed # Agent 3 — Reflection (self-critique) reflection = self.reflection.reflect(rr, fr) if reflection['bias_detected']: rr.risk_level = reflection['corrected_level'] rr.risk_score = reflection['corrected_score'] # Ensure event-driven decisions use effective risk after reflection. effective_level_changed = rr.risk_level != self._last_effective_risk_level rr.level_changed = bool(original_level_changed or effective_level_changed) self._last_effective_risk_level = rr.risk_level self.db.save_reflection(reflection) # Save risk event every 30 frames if fr.frame_id % 30 == 0: self.db.save_risk_event(rr) # Agent 4 — Operations (event-driven) decision = self.operations.process(rr, context='Mecca_Main_Area') # Agent 5 — Coordinator (all priorities: P0/P1/P2) # LLM runs in background thread so it doesn't block frame processing. if decision: self._launch_coordinator(rr, decision, list(self._frame_buffer)) # ── Update shared state ─────────────────────────────────────── fps = round(1.0 / (time.time() - t0 + 1e-9), 1) self.state.update({ 'frame_id': fr.frame_id, 'annotated': fr.annotated, 'person_count': fr.person_count, 'density_score': fr.density_score, 'density_ema': rr.density_ema, 'density_pct': rr.density_pct, 'fps': fps, 'risk_score': rr.risk_score, 'risk_level': rr.risk_level, 'trend': rr.trend, 'risk_history': self.db.get_risk_history(60), 'reflection_summary': self.reflection.get_summary(), 'last_reflection': reflection, }) if decision: self.state['latest_decision'] = decision self.state['decisions_log'] = self.db.get_recent_decisions(10) def _launch_coordinator(self, rr, decision, frame_buffer): """Run CoordinatorAgent LLM call in a background thread.""" def _run(): try: plan = self.coordinator.call(rr, decision, frame_buffer) if plan: decision.actions = plan.get('immediate_actions', []) decision.justification = plan.get('actions_justification', '') decision.selected_gates = plan.get('selected_gates', []) self.db.save_coordinator_plan(rr.frame_id, plan) self.state['coordinator_plan'] = plan self.state['arabic_alert'] = plan.get('arabic_alert', '') self.state['latest_decision'] = decision self.state['decisions_log'] = self.db.get_recent_decisions(10) except Exception as exc: print(f'[Pipeline] Coordinator error: {exc}') threading.Thread(target=_run, daemon=True).start()