# HaramGuard / backend/pipeline.py
# Provenance: last change "Update backend/pipeline.py (#7)" by adeem6, commit 0b37ca0
"""
HaramGuard β€” RealTimePipeline
================================
Orchestrates all agents frame-by-frame.
`pipeline.state` is the single shared dict read by the Streamlit dashboard.
Agent execution order per frame:
1. PerceptionAgent β†’ detect & track persons
2. RiskAgent β†’ compute risk score
3. ReflectionAgent β†’ self-critique & correct if biased
4. OperationsAgent β†’ emit decision on level change (event-driven)
5. CoordinatorAgent β†’ LLM decides gates + actions for ALL priorities (P0/P1/P2)
"""
import time
import cv2
import numpy as np
import threading
from collections import deque
import config
from core.database import HajjFlowDB
from agents.perception_agent import PerceptionAgent
from agents.risk_agent import RiskAgent
from agents.reflection_agent import ReflectionAgent
from agents.operations_agent import OperationsAgent
from agents.coordinator_agent import CoordinatorAgent
class RealTimePipeline:
    """Per-frame orchestrator that runs every agent and publishes results.

    One instance owns the five agents plus the ``HajjFlowDB`` store.
    ``self.state`` is the single shared dict the Streamlit dashboard reads;
    it is updated synchronously in :meth:`process_one_frame` and
    asynchronously by the coordinator worker thread started in
    :meth:`_launch_coordinator`.
    """

    def __init__(self,
                 video_path: str,
                 groq_api_key: str,
                 anthropic_key: str = None,
                 model_path: str = 'yolov10n.pt',
                 db_path: str = 'outputs/hajjflow_rt.db'):
        """Build the agent stack, the DB handle, and the shared state dict.

        Args:
            video_path: Path of the video file to process.
            groq_api_key: API key handed to ``CoordinatorAgent`` for LLM calls.
            anthropic_key: Accepted but currently NOT forwarded — see the
                NOTE(review) below.
            model_path: YOLO weights used by ``PerceptionAgent`` in live mode.
            db_path: SQLite file backing ``HajjFlowDB``.
        """
        self.video_path = video_path
        self.db = HajjFlowDB(db_path)

        # ── Instantiate agents ────────────────────────────────────────
        # Cached mode: read pre-computed detections; Live mode: run YOLO.
        cached_path = getattr(config, 'CACHED_DETECTIONS_PATH', None)
        # NOTE(review): the constructor's `anthropic_key` parameter is not
        # forwarded — PerceptionAgent always receives anthropic_key=None.
        # Confirm whether this is a deliberate kill-switch or an oversight.
        self.perception = PerceptionAgent(model_path, anthropic_key=None, cached_path=cached_path)
        self.risk = RiskAgent()
        self.reflection = ReflectionAgent()
        self.operations = OperationsAgent(self.db)
        self.coordinator = CoordinatorAgent(groq_api_key)

        # Last risk level AFTER reflection correction; lets us detect
        # "effective" level changes the raw RiskAgent output may miss.
        self._last_effective_risk_level = 'LOW'
        # Rolling window of the last 30 perception results, handed to the
        # coordinator as visual context.
        self._frame_buffer = deque(maxlen=30)

        # ── Shared state — read by Streamlit dashboard ─────────────────
        self.state = {
            # PerceptionAgent outputs
            'frame_id': 0,
            'annotated': None,        # np.ndarray BGR
            'person_count': 0,
            'density_score': 0.0,
            'fps': 0.0,
            # RiskAgent outputs (after ReflectionAgent correction)
            'risk_score': 0.0,
            'risk_level': 'LOW',
            'trend': 'stable',
            'risk_history': [],       # list[dict] for chart
            # OperationsAgent outputs
            'latest_decision': None,  # Decision dataclass or None
            'decisions_log': [],      # last 10 decisions from DB
            # CoordinatorAgent outputs (coordinator runs for all priorities,
            # P0/P1/P2; these keys are filled by the background thread)
            'arabic_alert': '',
            'coordinator_plan': None,
            # ReflectionAgent outputs
            'reflection_summary': {},
            'last_reflection': None,
        }
        print('πŸš€ [Pipeline] Initialized β€” all agents ready')

    # ── Video capture helper ──────────────────────────────────────────
    def get_video_capture(self) -> cv2.VideoCapture:
        """Open ``self.video_path`` and return the OpenCV capture handle.

        Raises:
            RuntimeError: if OpenCV cannot open the file.
        """
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            raise RuntimeError(f'Cannot open video: {self.video_path}')
        return cap

    # ── Per-frame processing ──────────────────────────────────────────
    def process_one_frame(self, frame: np.ndarray) -> None:
        """
        Run all agents on a single frame and update self.state.
        Called from the Streamlit loop or batch runner.
        """
        t0 = time.time()

        # Agent 1 — Perception: detect & track persons.
        fr = self.perception.process_frame(frame)
        self._frame_buffer.append(fr)

        # Agent 2 — Risk scoring.
        rr = self.risk.process_frame(fr)
        original_level_changed = rr.level_changed

        # Agent 3 — Reflection (self-critique). When bias is flagged, the
        # risk result `rr` is corrected IN PLACE before anything downstream
        # sees it.
        reflection = self.reflection.reflect(rr, fr)
        if reflection['bias_detected']:
            rr.risk_level = reflection['corrected_level']
            rr.risk_score = reflection['corrected_score']
        # Ensure event-driven decisions use effective risk after reflection:
        # a level change introduced by the correction itself must still
        # register as a change for OperationsAgent, hence the OR with the
        # raw flag.
        effective_level_changed = rr.risk_level != self._last_effective_risk_level
        rr.level_changed = bool(original_level_changed or effective_level_changed)
        self._last_effective_risk_level = rr.risk_level
        self.db.save_reflection(reflection)

        # Persist a risk event only every 30th frame to limit DB writes.
        if fr.frame_id % 30 == 0:
            self.db.save_risk_event(rr)

        # Agent 4 — Operations (event-driven: may return None when the
        # level did not change).
        decision = self.operations.process(rr, context='Mecca_Main_Area')

        # Agent 5 — Coordinator (all priorities: P0/P1/P2).
        # LLM runs in background thread so it doesn't block frame processing.
        if decision:
            self._launch_coordinator(rr, decision, list(self._frame_buffer))

        # ── Update shared state ───────────────────────────────────────
        # fps reflects this frame's synchronous pipeline time only (the
        # coordinator call is async); 1e-9 guards against division by zero.
        fps = round(1.0 / (time.time() - t0 + 1e-9), 1)
        self.state.update({
            'frame_id': fr.frame_id,
            'annotated': fr.annotated,
            'person_count': fr.person_count,
            'density_score': fr.density_score,
            'density_ema': rr.density_ema,
            'density_pct': rr.density_pct,
            'fps': fps,
            'risk_score': rr.risk_score,
            'risk_level': rr.risk_level,
            'trend': rr.trend,
            'risk_history': self.db.get_risk_history(60),
            'reflection_summary': self.reflection.get_summary(),
            'last_reflection': reflection,
        })
        if decision:
            self.state['latest_decision'] = decision
            self.state['decisions_log'] = self.db.get_recent_decisions(10)

    def _launch_coordinator(self, rr, decision, frame_buffer) -> None:
        """Run CoordinatorAgent LLM call in a background thread.

        On success the plan is folded back into ``decision`` (actions,
        justification, gates), persisted, and published into ``self.state``.

        NOTE(review): ``self.state`` is mutated here from a daemon worker
        thread while the dashboard reads it. Individual key assignments are
        atomic under CPython's GIL, but a reader can observe a partially
        applied plan (e.g. new plan with stale decision) — confirm this is
        acceptable for the dashboard.
        """
        def _run():
            try:
                plan = self.coordinator.call(rr, decision, frame_buffer)
                if plan:
                    # Fold the LLM plan back into the triggering decision.
                    decision.actions = plan.get('immediate_actions', [])
                    decision.justification = plan.get('actions_justification', '')
                    decision.selected_gates = plan.get('selected_gates', [])
                    self.db.save_coordinator_plan(rr.frame_id, plan)
                    self.state['coordinator_plan'] = plan
                    self.state['arabic_alert'] = plan.get('arabic_alert', '')
                    self.state['latest_decision'] = decision
                    self.state['decisions_log'] = self.db.get_recent_decisions(10)
            except Exception as exc:
                # Best-effort boundary: a failed LLM call must not kill the
                # frame-processing pipeline.
                print(f'[Pipeline] Coordinator error: {exc}')
        threading.Thread(target=_run, daemon=True).start()