munals committed on
Commit
8d2f3a0
·
verified ·
1 Parent(s): b274069

Update backend/pipeline.py

Browse files
Files changed (1) hide show
  1. backend/pipeline.py +31 -14
backend/pipeline.py CHANGED
@@ -15,8 +15,10 @@ Agent execution order per frame:
15
  import time
16
  import cv2
17
  import numpy as np
 
18
  from collections import deque
19
 
 
20
  from core.database import HajjFlowDB
21
  from agents.perception_agent import PerceptionAgent
22
  from agents.risk_agent import RiskAgent
@@ -38,12 +40,14 @@ class RealTimePipeline:
38
  self.db = HajjFlowDB(db_path)
39
 
40
  # ── Instantiate agents ────────────────────────────────────────
41
- # YOLO-only mode β€” VisionCountAgent (Claude Vision) disabled
42
- self.perception = PerceptionAgent(model_path, anthropic_key=None)
 
43
  self.risk = RiskAgent()
44
  self.reflection = ReflectionAgent()
45
  self.operations = OperationsAgent(self.db)
46
  self.coordinator = CoordinatorAgent(groq_api_key)
 
47
 
48
  self._frame_buffer = deque(maxlen=30)
49
 
@@ -99,12 +103,17 @@ class RealTimePipeline:
99
 
100
  # Agent 2 β€” Risk scoring
101
  rr = self.risk.process_frame(fr)
 
102
 
103
  # Agent 3 β€” Reflection (self-critique)
104
  reflection = self.reflection.reflect(rr, fr)
105
  if reflection['bias_detected']:
106
  rr.risk_level = reflection['corrected_level']
107
  rr.risk_score = reflection['corrected_score']
 
 
 
 
108
  self.db.save_reflection(reflection)
109
 
110
  # Save risk event every 30 frames
@@ -115,16 +124,9 @@ class RealTimePipeline:
115
  decision = self.operations.process(rr, context='Mecca_Main_Area')
116
 
117
  # Agent 5 β€” Coordinator (all priorities: P0/P1/P2)
118
- # LLM dynamically decides which gates to act on and what actions to take.
119
- plan = None
120
  if decision:
121
- plan = self.coordinator.call(rr, decision, list(self._frame_buffer))
122
- if plan:
123
- # Fill decision with LLM-chosen gates and actions
124
- decision.actions = plan.get('immediate_actions', [])
125
- decision.justification = plan.get('actions_justification', '')
126
- decision.selected_gates = plan.get('selected_gates', [])
127
- self.db.save_coordinator_plan(fr.frame_id, plan)
128
 
129
  # ── Update shared state ───────────────────────────────────────
130
  fps = round(1.0 / (time.time() - t0 + 1e-9), 1)
@@ -148,6 +150,21 @@ class RealTimePipeline:
148
  self.state['latest_decision'] = decision
149
  self.state['decisions_log'] = self.db.get_recent_decisions(10)
150
 
151
- if plan:
152
- self.state['coordinator_plan'] = plan
153
- self.state['arabic_alert'] = plan.get('arabic_alert', '')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  import time
16
  import cv2
17
  import numpy as np
18
+ import threading
19
  from collections import deque
20
 
21
+ import config
22
  from core.database import HajjFlowDB
23
  from agents.perception_agent import PerceptionAgent
24
  from agents.risk_agent import RiskAgent
 
40
  self.db = HajjFlowDB(db_path)
41
 
42
  # ── Instantiate agents ────────────────────────────────────────
43
+ # Cached mode: read pre-computed detections; Live mode: run YOLO
44
+ cached_path = getattr(config, 'CACHED_DETECTIONS_PATH', None)
45
+ self.perception = PerceptionAgent(model_path, anthropic_key=None, cached_path=cached_path)
46
  self.risk = RiskAgent()
47
  self.reflection = ReflectionAgent()
48
  self.operations = OperationsAgent(self.db)
49
  self.coordinator = CoordinatorAgent(groq_api_key)
50
+ self._last_effective_risk_level = 'LOW'
51
 
52
  self._frame_buffer = deque(maxlen=30)
53
 
 
103
 
104
  # Agent 2 β€” Risk scoring
105
  rr = self.risk.process_frame(fr)
106
+ original_level_changed = rr.level_changed
107
 
108
  # Agent 3 β€” Reflection (self-critique)
109
  reflection = self.reflection.reflect(rr, fr)
110
  if reflection['bias_detected']:
111
  rr.risk_level = reflection['corrected_level']
112
  rr.risk_score = reflection['corrected_score']
113
+ # Ensure event-driven decisions use effective risk after reflection.
114
+ effective_level_changed = rr.risk_level != self._last_effective_risk_level
115
+ rr.level_changed = bool(original_level_changed or effective_level_changed)
116
+ self._last_effective_risk_level = rr.risk_level
117
  self.db.save_reflection(reflection)
118
 
119
  # Save risk event every 30 frames
 
124
  decision = self.operations.process(rr, context='Mecca_Main_Area')
125
 
126
  # Agent 5 β€” Coordinator (all priorities: P0/P1/P2)
127
+ # LLM runs in background thread so it doesn't block frame processing.
 
128
  if decision:
129
+ self._launch_coordinator(rr, decision, list(self._frame_buffer))
 
 
 
 
 
 
130
 
131
  # ── Update shared state ───────────────────────────────────────
132
  fps = round(1.0 / (time.time() - t0 + 1e-9), 1)
 
150
  self.state['latest_decision'] = decision
151
  self.state['decisions_log'] = self.db.get_recent_decisions(10)
152
 
153
+ def _launch_coordinator(self, rr, decision, frame_buffer):
154
+ """Run CoordinatorAgent LLM call in a background thread."""
155
+ def _run():
156
+ try:
157
+ plan = self.coordinator.call(rr, decision, frame_buffer)
158
+ if plan:
159
+ decision.actions = plan.get('immediate_actions', [])
160
+ decision.justification = plan.get('actions_justification', '')
161
+ decision.selected_gates = plan.get('selected_gates', [])
162
+ self.db.save_coordinator_plan(rr.frame_id, plan)
163
+ self.state['coordinator_plan'] = plan
164
+ self.state['arabic_alert'] = plan.get('arabic_alert', '')
165
+ self.state['latest_decision'] = decision
166
+ self.state['decisions_log'] = self.db.get_recent_decisions(10)
167
+ except Exception as exc:
168
+ print(f'[Pipeline] Coordinator error: {exc}')
169
+
170
+ threading.Thread(target=_run, daemon=True).start()