Spaces:
Running
Running
Update backend/pipeline.py
#7
by
munals - opened
- backend/pipeline.py +31 -14
backend/pipeline.py
CHANGED
|
@@ -15,8 +15,10 @@ Agent execution order per frame:
|
|
| 15 |
import time
|
| 16 |
import cv2
|
| 17 |
import numpy as np
|
|
|
|
| 18 |
from collections import deque
|
| 19 |
|
|
|
|
| 20 |
from core.database import HajjFlowDB
|
| 21 |
from agents.perception_agent import PerceptionAgent
|
| 22 |
from agents.risk_agent import RiskAgent
|
|
@@ -38,12 +40,14 @@ class RealTimePipeline:
|
|
| 38 |
self.db = HajjFlowDB(db_path)
|
| 39 |
|
| 40 |
# ββ Instantiate agents ββββββββββββββββββββββββββββββββββββββββ
|
| 41 |
-
#
|
| 42 |
-
|
|
|
|
| 43 |
self.risk = RiskAgent()
|
| 44 |
self.reflection = ReflectionAgent()
|
| 45 |
self.operations = OperationsAgent(self.db)
|
| 46 |
self.coordinator = CoordinatorAgent(groq_api_key)
|
|
|
|
| 47 |
|
| 48 |
self._frame_buffer = deque(maxlen=30)
|
| 49 |
|
|
@@ -99,12 +103,17 @@ class RealTimePipeline:
|
|
| 99 |
|
| 100 |
# Agent 2 β Risk scoring
|
| 101 |
rr = self.risk.process_frame(fr)
|
|
|
|
| 102 |
|
| 103 |
# Agent 3 β Reflection (self-critique)
|
| 104 |
reflection = self.reflection.reflect(rr, fr)
|
| 105 |
if reflection['bias_detected']:
|
| 106 |
rr.risk_level = reflection['corrected_level']
|
| 107 |
rr.risk_score = reflection['corrected_score']
|
|
|
|
|
|
|
|
|
|
|
|
|
| 108 |
self.db.save_reflection(reflection)
|
| 109 |
|
| 110 |
# Save risk event every 30 frames
|
|
@@ -115,16 +124,9 @@ class RealTimePipeline:
|
|
| 115 |
decision = self.operations.process(rr, context='Mecca_Main_Area')
|
| 116 |
|
| 117 |
# Agent 5 β Coordinator (all priorities: P0/P1/P2)
|
| 118 |
-
# LLM
|
| 119 |
-
plan = None
|
| 120 |
if decision:
|
| 121 |
-
|
| 122 |
-
if plan:
|
| 123 |
-
# Fill decision with LLM-chosen gates and actions
|
| 124 |
-
decision.actions = plan.get('immediate_actions', [])
|
| 125 |
-
decision.justification = plan.get('actions_justification', '')
|
| 126 |
-
decision.selected_gates = plan.get('selected_gates', [])
|
| 127 |
-
self.db.save_coordinator_plan(fr.frame_id, plan)
|
| 128 |
|
| 129 |
# ββ Update shared state βββββββββββββββββββββββββββββββββββββββ
|
| 130 |
fps = round(1.0 / (time.time() - t0 + 1e-9), 1)
|
|
@@ -148,6 +150,21 @@ class RealTimePipeline:
|
|
| 148 |
self.state['latest_decision'] = decision
|
| 149 |
self.state['decisions_log'] = self.db.get_recent_decisions(10)
|
| 150 |
|
| 151 |
-
|
| 152 |
-
|
| 153 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 15 |
import time
|
| 16 |
import cv2
|
| 17 |
import numpy as np
|
| 18 |
+
import threading
|
| 19 |
from collections import deque
|
| 20 |
|
| 21 |
+
import config
|
| 22 |
from core.database import HajjFlowDB
|
| 23 |
from agents.perception_agent import PerceptionAgent
|
| 24 |
from agents.risk_agent import RiskAgent
|
|
|
|
| 40 |
self.db = HajjFlowDB(db_path)
|
| 41 |
|
| 42 |
# ββ Instantiate agents ββββββββββββββββββββββββββββββββββββββββ
|
| 43 |
+
# Cached mode: read pre-computed detections; Live mode: run YOLO
|
| 44 |
+
cached_path = getattr(config, 'CACHED_DETECTIONS_PATH', None)
|
| 45 |
+
self.perception = PerceptionAgent(model_path, anthropic_key=None, cached_path=cached_path)
|
| 46 |
self.risk = RiskAgent()
|
| 47 |
self.reflection = ReflectionAgent()
|
| 48 |
self.operations = OperationsAgent(self.db)
|
| 49 |
self.coordinator = CoordinatorAgent(groq_api_key)
|
| 50 |
+
self._last_effective_risk_level = 'LOW'
|
| 51 |
|
| 52 |
self._frame_buffer = deque(maxlen=30)
|
| 53 |
|
|
|
|
| 103 |
|
| 104 |
# Agent 2 β Risk scoring
|
| 105 |
rr = self.risk.process_frame(fr)
|
| 106 |
+
original_level_changed = rr.level_changed
|
| 107 |
|
| 108 |
# Agent 3 β Reflection (self-critique)
|
| 109 |
reflection = self.reflection.reflect(rr, fr)
|
| 110 |
if reflection['bias_detected']:
|
| 111 |
rr.risk_level = reflection['corrected_level']
|
| 112 |
rr.risk_score = reflection['corrected_score']
|
| 113 |
+
# Ensure event-driven decisions use effective risk after reflection.
|
| 114 |
+
effective_level_changed = rr.risk_level != self._last_effective_risk_level
|
| 115 |
+
rr.level_changed = bool(original_level_changed or effective_level_changed)
|
| 116 |
+
self._last_effective_risk_level = rr.risk_level
|
| 117 |
self.db.save_reflection(reflection)
|
| 118 |
|
| 119 |
# Save risk event every 30 frames
|
|
|
|
| 124 |
decision = self.operations.process(rr, context='Mecca_Main_Area')
|
| 125 |
|
| 126 |
# Agent 5 β Coordinator (all priorities: P0/P1/P2)
|
| 127 |
+
# LLM runs in background thread so it doesn't block frame processing.
|
|
|
|
| 128 |
if decision:
|
| 129 |
+
self._launch_coordinator(rr, decision, list(self._frame_buffer))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 130 |
|
| 131 |
# ββ Update shared state βββββββββββββββββββββββββββββββββββββββ
|
| 132 |
fps = round(1.0 / (time.time() - t0 + 1e-9), 1)
|
|
|
|
| 150 |
self.state['latest_decision'] = decision
|
| 151 |
self.state['decisions_log'] = self.db.get_recent_decisions(10)
|
| 152 |
|
| 153 |
+
def _launch_coordinator(self, rr, decision, frame_buffer):
|
| 154 |
+
"""Run CoordinatorAgent LLM call in a background thread."""
|
| 155 |
+
def _run():
|
| 156 |
+
try:
|
| 157 |
+
plan = self.coordinator.call(rr, decision, frame_buffer)
|
| 158 |
+
if plan:
|
| 159 |
+
decision.actions = plan.get('immediate_actions', [])
|
| 160 |
+
decision.justification = plan.get('actions_justification', '')
|
| 161 |
+
decision.selected_gates = plan.get('selected_gates', [])
|
| 162 |
+
self.db.save_coordinator_plan(rr.frame_id, plan)
|
| 163 |
+
self.state['coordinator_plan'] = plan
|
| 164 |
+
self.state['arabic_alert'] = plan.get('arabic_alert', '')
|
| 165 |
+
self.state['latest_decision'] = decision
|
| 166 |
+
self.state['decisions_log'] = self.db.get_recent_decisions(10)
|
| 167 |
+
except Exception as exc:
|
| 168 |
+
print(f'[Pipeline] Coordinator error: {exc}')
|
| 169 |
+
|
| 170 |
+
threading.Thread(target=_run, daemon=True).start()
|