Files changed (1) hide show
  1. app.py +27 -32
app.py CHANGED
@@ -1,5 +1,5 @@
1
  # ============================================================
2
- # 🚦 Stage 3 — Wrong Direction Detection (Tuned + Confidence + Stability)
3
  # ============================================================
4
 
5
  import os, cv2, json, tempfile, numpy as np, gradio as gr
@@ -46,7 +46,7 @@ class Track:
46
  self.frames_seen += 1
47
  return [x, y]
48
 
49
- def stable_status(self, new_status, new_conf, window=15, agree_ratio=0.7):
50
  """Debounce flicker using recent window consensus."""
51
  self.status_history.append(new_status)
52
  if len(self.status_history) > window:
@@ -75,7 +75,6 @@ def smooth_direction(points, window=5):
75
  return None
76
  return avg_vec
77
 
78
-
79
  # ============================================================
80
  # 🧭 Wrong-Direction Detection Core
81
  # ============================================================
@@ -93,10 +92,10 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
93
 
94
  tracks, next_id = {}, 0
95
  DELAY_FRAMES = 8
96
- MIN_FLOW_SPEED = 1.8
97
- HYST_OK = 0.65
98
- HYST_WRONG = 0.35
99
- ALPHA = 0.75
100
 
101
  while True:
102
  ret, frame = cap.read()
@@ -113,7 +112,7 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
113
  dets.append([cx, cy])
114
  dets = np.array(dets)
115
 
116
- # --- Tracker association ---
117
  assigned = set()
118
  if len(dets) > 0 and len(tracks) > 0:
119
  existing = np.array([t.kf.x[:2].reshape(-1) for t in tracks.values()])
@@ -124,7 +123,6 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
124
  tid = list(tracks.keys())[r]
125
  tracks[tid].update(dets[c])
126
  assigned.add(c)
127
-
128
  for i, d in enumerate(dets):
129
  if i not in assigned:
130
  tracks[next_id] = Track(d, next_id)
@@ -136,29 +134,32 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
136
  pts = np.array(trk.history)
137
  if len(pts) > 1:
138
  for i in range(1, len(pts)):
139
- cv2.line(frame, tuple(np.int32(pts[i-1])), tuple(np.int32(pts[i])), (100, 100, 255), 1)
140
 
141
  motion = smooth_direction(pts)
142
- if motion is None or np.linalg.norm(motion) < MIN_FLOW_SPEED:
 
 
143
  continue
144
 
145
  sims = [compute_cosine_similarity(motion, f) for f in lane_flows]
146
  best_sim = max(sims)
147
 
148
  if trk.frames_seen > DELAY_FRAMES:
 
149
  trk.ema_sim = ALPHA * best_sim + (1 - ALPHA) * getattr(trk, "ema_sim", best_sim)
150
 
151
- # Hysteresis decision
152
  if trk.ema_sim >= HYST_OK:
153
  new_status = "OK"
154
  elif trk.ema_sim <= HYST_WRONG:
155
  new_status = "WRONG"
156
  else:
157
- new_status = trk.status
158
 
159
- trk.stable_status(new_status, new_conf=trk.ema_sim, window=15, agree_ratio=0.7)
160
 
161
- # --- Confidence-based label filtering ---
162
  show_label = True
163
  if trk.confidence < conf_threshold:
164
  show_label = False
@@ -166,13 +167,7 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
166
  show_label = False
167
 
168
  if show_label:
169
- if trk.status == "WRONG":
170
- color = (0, 0, 255)
171
- elif trk.confidence < 0.5:
172
- color = (0, 255, 255)
173
- else:
174
- color = (0, 255, 0)
175
-
176
  label = f"ID:{tid} {trk.status} ({trk.confidence:.2f})"
177
  cv2.putText(frame, label, tuple(np.int32(pos)),
178
  cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
@@ -183,16 +178,16 @@ def process_video(video_file, stage2_json, show_only_wrong=False, conf_threshold
183
  out.release()
184
  return out_path
185
 
186
-
187
  # ============================================================
188
  # 🎛️ Gradio Interface
189
  # ============================================================
190
  description = """
191
- ### 🚦 Stage 3 — Wrong Direction Detection (Tuned for Angle-Based Flows)
192
- - ✅ Hysteresis: OK 0.65 / WRONG ≤ 0.35
193
- - ✅ EMA smoothing α = 0.75
194
- - ✅ 15-frame consensus for flicker-free labeling
195
- - ✅ Confidence color coding (Green→OK, Yellow→Borderline, Red→Wrong)
 
196
  """
197
 
198
  demo = gr.Interface(
@@ -200,13 +195,13 @@ demo = gr.Interface(
200
  inputs=[
201
  gr.File(label="Input Video"),
202
  gr.File(label="Stage 2 Flow JSON"),
203
- gr.Checkbox(label="Show ONLY Wrong Labels", value=False),
204
- gr.Slider(0.0, 1.0, value=0.0, step=0.05, label="Confidence Filter (Show ≥ this value)")
205
  ],
206
  outputs=gr.Video(label="Output Video"),
207
- title="🚗 Stage 3 – Tuned Wrong-Direction Detection (Confidence + Stability)",
208
  description=description
209
  )
210
 
211
  if __name__ == "__main__":
212
- demo.launch()
 
1
  # ============================================================
2
+ # 🚦 Stage 3 — Wrong Direction Detection (Stable + Confidence + Hysteresis + Filter)
3
  # ============================================================
4
 
5
  import os, cv2, json, tempfile, numpy as np, gradio as gr
 
46
  self.frames_seen += 1
47
  return [x, y]
48
 
49
+ def stable_status(self, new_status, new_conf, window=10, agree_ratio=0.6):
50
  """Debounce flicker using recent window consensus."""
51
  self.status_history.append(new_status)
52
  if len(self.status_history) > window:
 
75
  return None
76
  return avg_vec
77
 
 
78
  # ============================================================
79
  # 🧭 Wrong-Direction Detection Core
80
  # ============================================================
 
92
 
93
  tracks, next_id = {}, 0
94
  DELAY_FRAMES = 8
95
+ MIN_FLOW_SPEED = 1.2
96
+ HYST_OK = 0.55
97
+ HYST_WRONG = 0.45
98
+ ALPHA = 0.6 # exponential smoothing weight
99
 
100
  while True:
101
  ret, frame = cap.read()
 
112
  dets.append([cx, cy])
113
  dets = np.array(dets)
114
 
115
+ # --- Tracker update ---
116
  assigned = set()
117
  if len(dets) > 0 and len(tracks) > 0:
118
  existing = np.array([t.kf.x[:2].reshape(-1) for t in tracks.values()])
 
123
  tid = list(tracks.keys())[r]
124
  tracks[tid].update(dets[c])
125
  assigned.add(c)
 
126
  for i, d in enumerate(dets):
127
  if i not in assigned:
128
  tracks[next_id] = Track(d, next_id)
 
134
  pts = np.array(trk.history)
135
  if len(pts) > 1:
136
  for i in range(1, len(pts)):
137
+ cv2.line(frame, tuple(np.int32(pts[i-1])), tuple(np.int32(pts[i])), (0, 0, 255), 1)
138
 
139
  motion = smooth_direction(pts)
140
+ if motion is None:
141
+ continue
142
+ if np.linalg.norm(motion) < MIN_FLOW_SPEED:
143
  continue
144
 
145
  sims = [compute_cosine_similarity(motion, f) for f in lane_flows]
146
  best_sim = max(sims)
147
 
148
  if trk.frames_seen > DELAY_FRAMES:
149
+ # Exponential moving average
150
  trk.ema_sim = ALPHA * best_sim + (1 - ALPHA) * getattr(trk, "ema_sim", best_sim)
151
 
152
+ # Hysteresis classification
153
  if trk.ema_sim >= HYST_OK:
154
  new_status = "OK"
155
  elif trk.ema_sim <= HYST_WRONG:
156
  new_status = "WRONG"
157
  else:
158
+ new_status = trk.status # hold previous label
159
 
160
+ trk.stable_status(new_status, new_conf=trk.ema_sim, window=10, agree_ratio=0.6)
161
 
162
+ # --- Filter by UI controls ---
163
  show_label = True
164
  if trk.confidence < conf_threshold:
165
  show_label = False
 
167
  show_label = False
168
 
169
  if show_label:
170
+ color = (0, 0, 255) if trk.status == "WRONG" else (0, 255, 0)
 
 
 
 
 
 
171
  label = f"ID:{tid} {trk.status} ({trk.confidence:.2f})"
172
  cv2.putText(frame, label, tuple(np.int32(pos)),
173
  cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
 
178
  out.release()
179
  return out_path
180
 
 
181
  # ============================================================
182
  # 🎛️ Gradio Interface
183
  # ============================================================
184
  description = """
185
+ ### 🚦 Stage 3 — Wrong Direction Detection (Stable + Confidence + Filter)
186
+ - ✅ Cosine similarity with exponential smoothing
187
+ - ✅ Hysteresis (OK≥0.55 / WRONG≤0.45) for stability
188
+ - ✅ 10-frame consensus voting (flicker-free)
189
+ - ✅ Confidence-based label filtering
190
+ - ✅ “Show Only Wrong” toggle
191
  """
192
 
193
  demo = gr.Interface(
 
195
  inputs=[
196
  gr.File(label="Input Video"),
197
  gr.File(label="Stage 2 Flow JSON"),
198
+ gr.Checkbox(label="Show ONLY Wrong Labels Overlay", value=False),
199
+ gr.Slider(0.0, 1.0, value=0.0, step=0.05, label="Confidence Level Filter (Show ≥ this value)")
200
  ],
201
  outputs=gr.Video(label="Output Video"),
202
+ title="🚗 Stage 3 – Stable Wrong-Direction Detection (with Confidence Filter)",
203
  description=description
204
  )
205
 
206
  if __name__ == "__main__":
207
+ demo.launch()