Tuned for Angle-Based Flows

#18
Files changed (1) hide show
  1. app.py +31 -26
app.py CHANGED
@@ -1,5 +1,5 @@
1
  # ============================================================
2
- # 🚦 Stage 3 — Wrong Direction Detection (Stable + Confidence + Hysteresis + Filter)
3
  # ============================================================
4
 
5
  import os, cv2, json, tempfile, numpy as np, gradio as gr
@@ -46,7 +46,7 @@ class Track:
46
  self.frames_seen += 1
47
  return [x, y]
48
 
49
- def stable_status(self, new_status, new_conf, window=10, agree_ratio=0.6):
50
  """Debounce flicker using recent window consensus."""
51
  self.status_history.append(new_status)
52
  if len(self.status_history) > window:
@@ -75,6 +75,7 @@ def smooth_direction(points, window=5):
75
  return None
76
  return avg_vec
77
 
 
78
  # ============================================================
79
  # 🧭 Wrong-Direction Detection Core
80
  # ============================================================
@@ -92,10 +93,10 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
92
 
93
  tracks, next_id = {}, 0
94
  DELAY_FRAMES = 8
95
- MIN_FLOW_SPEED = 1.2
96
- HYST_OK = 0.55
97
- HYST_WRONG = 0.45
98
- ALPHA = 0.6 # exponential smoothing weight
99
 
100
  while True:
101
  ret, frame = cap.read()
@@ -112,7 +113,7 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
112
  dets.append([cx, cy])
113
  dets = np.array(dets)
114
 
115
- # --- Tracker update ---
116
  assigned = set()
117
  if len(dets) > 0 and len(tracks) > 0:
118
  existing = np.array([t.kf.x[:2].reshape(-1) for t in tracks.values()])
@@ -123,6 +124,7 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
123
  tid = list(tracks.keys())[r]
124
  tracks[tid].update(dets[c])
125
  assigned.add(c)
 
126
  for i, d in enumerate(dets):
127
  if i not in assigned:
128
  tracks[next_id] = Track(d, next_id)
@@ -134,32 +136,29 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
134
  pts = np.array(trk.history)
135
  if len(pts) > 1:
136
  for i in range(1, len(pts)):
137
- cv2.line(frame, tuple(np.int32(pts[i-1])), tuple(np.int32(pts[i])), (0, 0, 255), 1)
138
 
139
  motion = smooth_direction(pts)
140
- if motion is None:
141
- continue
142
- if np.linalg.norm(motion) < MIN_FLOW_SPEED:
143
  continue
144
 
145
  sims = [compute_cosine_similarity(motion, f) for f in lane_flows]
146
  best_sim = max(sims)
147
 
148
  if trk.frames_seen > DELAY_FRAMES:
149
- # Exponential moving average
150
  trk.ema_sim = ALPHA * best_sim + (1 - ALPHA) * getattr(trk, "ema_sim", best_sim)
151
 
152
- # Hysteresis classification
153
  if trk.ema_sim >= HYST_OK:
154
  new_status = "OK"
155
  elif trk.ema_sim <= HYST_WRONG:
156
  new_status = "WRONG"
157
  else:
158
- new_status = trk.status # hold previous label
159
 
160
- trk.stable_status(new_status, new_conf=trk.ema_sim, window=10, agree_ratio=0.6)
161
 
162
- # --- Filter by UI controls ---
163
  show_label = True
164
  if trk.confidence < conf_threshold:
165
  show_label = False
@@ -167,7 +166,13 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
167
  show_label = False
168
 
169
  if show_label:
170
- color = (0, 0, 255) if trk.status == "WRONG" else (0, 255, 0)
 
 
 
 
 
 
171
  label = f"ID:{tid} {trk.status} ({trk.confidence:.2f})"
172
  cv2.putText(frame, label, tuple(np.int32(pos)),
173
  cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
@@ -178,16 +183,16 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
178
  out.release()
179
  return out_path
180
 
 
181
  # ============================================================
182
  # 🎛️ Gradio Interface
183
  # ============================================================
184
  description = """
185
- ### 🚦 Stage 3 — Wrong Direction Detection (Stable + Confidence + Filter)
186
- - ✅ Cosine similarity with exponential smoothing
187
- - ✅ Hysteresis (OK≥0.55 / WRONG≤0.45) for stability
188
- - ✅ 10-frame consensus voting (flicker-free)
189
- - ✅ Confidence-based label filtering
190
- - ✅ “Show Only Wrong” toggle
191
  """
192
 
193
  demo = gr.Interface(
@@ -195,11 +200,11 @@ demo = gr.Interface(
195
  inputs=[
196
  gr.File(label="Input Video"),
197
  gr.File(label="Stage 2 Flow JSON"),
198
- gr.Checkbox(label="Show ONLY Wrong Labels Overlay", value=False),
199
- gr.Slider(0.0, 1.0, value=0.0, step=0.05, label="Confidence Level Filter (Show ≥ this value)")
200
  ],
201
  outputs=gr.Video(label="Output Video"),
202
- title="🚗 Stage 3 – Stable Wrong-Direction Detection (with Confidence Filter)",
203
  description=description
204
  )
205
 
 
1
  # ============================================================
2
+ # 🚦 Stage 3 — Wrong Direction Detection (Tuned + Confidence + Stability)
3
  # ============================================================
4
 
5
  import os, cv2, json, tempfile, numpy as np, gradio as gr
 
46
  self.frames_seen += 1
47
  return [x, y]
48
 
49
+ def stable_status(self, new_status, new_conf, window=15, agree_ratio=0.7):
50
  """Debounce flicker using recent window consensus."""
51
  self.status_history.append(new_status)
52
  if len(self.status_history) > window:
 
75
  return None
76
  return avg_vec
77
 
78
+
79
  # ============================================================
80
  # 🧭 Wrong-Direction Detection Core
81
  # ============================================================
 
93
 
94
  tracks, next_id = {}, 0
95
  DELAY_FRAMES = 8
96
+ MIN_FLOW_SPEED = 1.8
97
+ HYST_OK = 0.65
98
+ HYST_WRONG = 0.35
99
+ ALPHA = 0.75
100
 
101
  while True:
102
  ret, frame = cap.read()
 
113
  dets.append([cx, cy])
114
  dets = np.array(dets)
115
 
116
+ # --- Tracker association ---
117
  assigned = set()
118
  if len(dets) > 0 and len(tracks) > 0:
119
  existing = np.array([t.kf.x[:2].reshape(-1) for t in tracks.values()])
 
124
  tid = list(tracks.keys())[r]
125
  tracks[tid].update(dets[c])
126
  assigned.add(c)
127
+
128
  for i, d in enumerate(dets):
129
  if i not in assigned:
130
  tracks[next_id] = Track(d, next_id)
 
136
  pts = np.array(trk.history)
137
  if len(pts) > 1:
138
  for i in range(1, len(pts)):
139
+ cv2.line(frame, tuple(np.int32(pts[i-1])), tuple(np.int32(pts[i])), (100, 100, 255), 1)
140
 
141
  motion = smooth_direction(pts)
142
+ if motion is None or np.linalg.norm(motion) < MIN_FLOW_SPEED:
 
 
143
  continue
144
 
145
  sims = [compute_cosine_similarity(motion, f) for f in lane_flows]
146
  best_sim = max(sims)
147
 
148
  if trk.frames_seen > DELAY_FRAMES:
 
149
  trk.ema_sim = ALPHA * best_sim + (1 - ALPHA) * getattr(trk, "ema_sim", best_sim)
150
 
151
+ # Hysteresis decision
152
  if trk.ema_sim >= HYST_OK:
153
  new_status = "OK"
154
  elif trk.ema_sim <= HYST_WRONG:
155
  new_status = "WRONG"
156
  else:
157
+ new_status = trk.status
158
 
159
+ trk.stable_status(new_status, new_conf=trk.ema_sim, window=15, agree_ratio=0.7)
160
 
161
+ # --- Confidence-based label filtering ---
162
  show_label = True
163
  if trk.confidence < conf_threshold:
164
  show_label = False
 
166
  show_label = False
167
 
168
  if show_label:
169
+ if trk.status == "WRONG":
170
+ color = (0, 0, 255)
171
+ elif trk.confidence < 0.5:
172
+ color = (0, 255, 255)
173
+ else:
174
+ color = (0, 255, 0)
175
+
176
  label = f"ID:{tid} {trk.status} ({trk.confidence:.2f})"
177
  cv2.putText(frame, label, tuple(np.int32(pos)),
178
  cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
 
183
  out.release()
184
  return out_path
185
 
186
+
187
  # ============================================================
188
  # 🎛️ Gradio Interface
189
  # ============================================================
190
  description = """
191
+ ### 🚦 Stage 3 — Wrong Direction Detection (Tuned for Angle-Based Flows)
192
+ - ✅ Hysteresis: OK ≥ 0.65 / WRONG ≤ 0.35
193
+ - ✅ EMA smoothing α = 0.75
194
+ - ✅ 15-frame consensus for flicker-free labeling
195
+ - ✅ Confidence color coding (Green→OK, Yellow→Borderline, Red→Wrong)
 
196
  """
197
 
198
  demo = gr.Interface(
 
200
  inputs=[
201
  gr.File(label="Input Video"),
202
  gr.File(label="Stage 2 Flow JSON"),
203
+ gr.Checkbox(label="Show ONLY Wrong Labels", value=False),
204
+ gr.Slider(0.0, 1.0, value=0.0, step=0.05, label="Confidence Filter (Show ≥ this value)")
205
  ],
206
  outputs=gr.Video(label="Output Video"),
207
+ title="🚗 Stage 3 – Tuned Wrong-Direction Detection (Confidence + Stability)",
208
  description=description
209
  )
210