Spaces:
Running
Running
| """ | |
| HaramGuard β RealTimePipeline | |
| ================================ | |
| Orchestrates all agents frame-by-frame. | |
| `pipeline.state` is the single shared dict read by the Streamlit dashboard. | |
| Agent execution order per frame: | |
| 1. PerceptionAgent β detect & track persons | |
| 2. RiskAgent β compute risk score | |
| 3. ReflectionAgent β self-critique & correct if biased | |
| 4. OperationsAgent β emit decision on level change (event-driven) | |
| 5. CoordinatorAgent β LLM decides gates + actions for ALL priorities (P0/P1/P2) | |
| """ | |
| import time | |
| import cv2 | |
| import numpy as np | |
| import threading | |
| from collections import deque | |
| import config | |
| from core.database import HajjFlowDB | |
| from agents.perception_agent import PerceptionAgent | |
| from agents.risk_agent import RiskAgent | |
| from agents.reflection_agent import ReflectionAgent | |
| from agents.operations_agent import OperationsAgent | |
| from agents.coordinator_agent import CoordinatorAgent | |
| class RealTimePipeline: | |
| def __init__(self, | |
| video_path: str, | |
| groq_api_key: str, | |
| anthropic_key: str = None, | |
| model_path: str = 'yolov10n.pt', | |
| db_path: str = 'outputs/hajjflow_rt.db'): | |
| self.video_path = video_path | |
| self.db = HajjFlowDB(db_path) | |
| # ββ Instantiate agents ββββββββββββββββββββββββββββββββββββββββ | |
| # Cached mode: read pre-computed detections; Live mode: run YOLO | |
| cached_path = getattr(config, 'CACHED_DETECTIONS_PATH', None) | |
| self.perception = PerceptionAgent(model_path, anthropic_key=None, cached_path=cached_path) | |
| self.risk = RiskAgent() | |
| self.reflection = ReflectionAgent() | |
| self.operations = OperationsAgent(self.db) | |
| self.coordinator = CoordinatorAgent(groq_api_key) | |
| self._last_effective_risk_level = 'LOW' | |
| self._frame_buffer = deque(maxlen=30) | |
| # ββ Shared state β read by Streamlit dashboard βββββββββββββββββ | |
| self.state = { | |
| # PerceptionAgent outputs | |
| 'frame_id': 0, | |
| 'annotated': None, # np.ndarray BGR | |
| 'person_count': 0, | |
| 'density_score': 0.0, | |
| 'fps': 0.0, | |
| # RiskAgent outputs (after ReflectionAgent correction) | |
| 'risk_score': 0.0, | |
| 'risk_level': 'LOW', | |
| 'trend': 'stable', | |
| 'risk_history': [], # list[dict] for chart | |
| # OperationsAgent outputs | |
| 'latest_decision': None, # Decision dataclass or None | |
| 'decisions_log': [], # last 10 decisions from DB | |
| # CoordinatorAgent outputs (P0 only) | |
| 'arabic_alert': '', | |
| 'coordinator_plan': None, | |
| # ReflectionAgent outputs | |
| 'reflection_summary': {}, | |
| 'last_reflection': None, | |
| } | |
| print('π [Pipeline] Initialized β all agents ready') | |
| # ββ Video capture helper ββββββββββββββββββββββββββββββββββββββββββ | |
| def get_video_capture(self) -> cv2.VideoCapture: | |
| cap = cv2.VideoCapture(self.video_path) | |
| if not cap.isOpened(): | |
| raise RuntimeError(f'Cannot open video: {self.video_path}') | |
| return cap | |
| # ββ Per-frame processing ββββββββββββββββββββββββββββββββββββββββββ | |
| def process_one_frame(self, frame: np.ndarray) -> None: | |
| """ | |
| Run all agents on a single frame and update self.state. | |
| Called from the Streamlit loop or batch runner. | |
| """ | |
| t0 = time.time() | |
| # Agent 1 β Perception | |
| fr = self.perception.process_frame(frame) | |
| self._frame_buffer.append(fr) | |
| # Agent 2 β Risk scoring | |
| rr = self.risk.process_frame(fr) | |
| original_level_changed = rr.level_changed | |
| # Agent 3 β Reflection (self-critique) | |
| reflection = self.reflection.reflect(rr, fr) | |
| if reflection['bias_detected']: | |
| rr.risk_level = reflection['corrected_level'] | |
| rr.risk_score = reflection['corrected_score'] | |
| # Ensure event-driven decisions use effective risk after reflection. | |
| effective_level_changed = rr.risk_level != self._last_effective_risk_level | |
| rr.level_changed = bool(original_level_changed or effective_level_changed) | |
| self._last_effective_risk_level = rr.risk_level | |
| self.db.save_reflection(reflection) | |
| # Save risk event every 30 frames | |
| if fr.frame_id % 30 == 0: | |
| self.db.save_risk_event(rr) | |
| # Agent 4 β Operations (event-driven) | |
| decision = self.operations.process(rr, context='Mecca_Main_Area') | |
| # Agent 5 β Coordinator (all priorities: P0/P1/P2) | |
| # LLM runs in background thread so it doesn't block frame processing. | |
| if decision: | |
| self._launch_coordinator(rr, decision, list(self._frame_buffer)) | |
| # ββ Update shared state βββββββββββββββββββββββββββββββββββββββ | |
| fps = round(1.0 / (time.time() - t0 + 1e-9), 1) | |
| self.state.update({ | |
| 'frame_id': fr.frame_id, | |
| 'annotated': fr.annotated, | |
| 'person_count': fr.person_count, | |
| 'density_score': fr.density_score, | |
| 'density_ema': rr.density_ema, | |
| 'density_pct': rr.density_pct, | |
| 'fps': fps, | |
| 'risk_score': rr.risk_score, | |
| 'risk_level': rr.risk_level, | |
| 'trend': rr.trend, | |
| 'risk_history': self.db.get_risk_history(60), | |
| 'reflection_summary': self.reflection.get_summary(), | |
| 'last_reflection': reflection, | |
| }) | |
| if decision: | |
| self.state['latest_decision'] = decision | |
| self.state['decisions_log'] = self.db.get_recent_decisions(10) | |
| def _launch_coordinator(self, rr, decision, frame_buffer): | |
| """Run CoordinatorAgent LLM call in a background thread.""" | |
| def _run(): | |
| try: | |
| plan = self.coordinator.call(rr, decision, frame_buffer) | |
| if plan: | |
| decision.actions = plan.get('immediate_actions', []) | |
| decision.justification = plan.get('actions_justification', '') | |
| decision.selected_gates = plan.get('selected_gates', []) | |
| self.db.save_coordinator_plan(rr.frame_id, plan) | |
| self.state['coordinator_plan'] = plan | |
| self.state['arabic_alert'] = plan.get('arabic_alert', '') | |
| self.state['latest_decision'] = decision | |
| self.state['decisions_log'] = self.db.get_recent_decisions(10) | |
| except Exception as exc: | |
| print(f'[Pipeline] Coordinator error: {exc}') | |
| threading.Thread(target=_run, daemon=True).start() | |