Stable + Confidence + Hysteresis

#16
by nishanth-saka - opened
Files changed (1) hide show
  1. app.py +29 -19
app.py CHANGED
@@ -1,5 +1,5 @@
1
  # ============================================================
2
- # 🚦 Stage 3 β€” Wrong Direction Detection (Improved + Stable + Confidence)
3
  # ============================================================
4
 
5
  import os, cv2, json, tempfile, numpy as np, gradio as gr
@@ -32,8 +32,9 @@ class Track:
32
  self.history = []
33
  self.frames_seen = 0
34
  self.status = "OK"
35
- self.status_history = [] # πŸ”Έ maintain recent decision history
36
  self.confidence = 1.0
 
37
 
38
  def update(self, bbox):
39
  self.kf.predict()
@@ -45,12 +46,11 @@ class Track:
45
  self.frames_seen += 1
46
  return [x, y]
47
 
48
- def stable_status(self, new_status, new_conf, window=6, agree_ratio=0.7):
49
  """Debounce flicker using recent window consensus."""
50
  self.status_history.append(new_status)
51
  if len(self.status_history) > window:
52
  self.status_history.pop(0)
53
- # majority consensus
54
  if self.status_history.count(new_status) >= int(agree_ratio * len(self.status_history)):
55
  self.status = new_status
56
  self.confidence = new_conf
@@ -58,7 +58,7 @@ class Track:
58
 
59
 
60
  # ============================================================
61
- # βš™οΈ Utilities
62
  # ============================================================
63
  def compute_cosine_similarity(v1, v2):
64
  v1 = v1 / (np.linalg.norm(v1) + 1e-6)
@@ -91,9 +91,12 @@ def process_video(video_file, stage2_json, show_only_wrong=False):
91
  out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
92
 
93
  tracks, next_id = {}, 0
94
- SIM_THRESH = 0.5 # cosine similarity threshold
95
- DELAY_FRAMES = 8 # wait N frames before flagging
96
- MIN_FLOW_SPEED = 1.2 # ignore jitter
 
 
 
97
 
98
  while True:
99
  ret, frame = cap.read()
@@ -140,15 +143,23 @@ def process_video(video_file, stage2_json, show_only_wrong=False):
140
  if np.linalg.norm(motion) < MIN_FLOW_SPEED:
141
  continue
142
 
143
- # cosine similarity vs lane flows
144
  sims = [compute_cosine_similarity(motion, f) for f in lane_flows]
145
  best_sim = max(sims)
146
 
147
  if trk.frames_seen > DELAY_FRAMES:
148
- new_status = "WRONG" if best_sim < SIM_THRESH else "OK"
149
- trk.stable_status(new_status, new_conf=best_sim)
 
 
 
 
 
 
 
 
 
 
150
 
151
- # draw if toggle allows
152
  if (not show_only_wrong) or (trk.status == "WRONG"):
153
  color = (0, 0, 255) if trk.status == "WRONG" else (0, 255, 0)
154
  label = f"ID:{tid} {trk.status} ({trk.confidence:.2f})"
@@ -165,13 +176,12 @@ def process_video(video_file, stage2_json, show_only_wrong=False):
165
  # πŸŽ›οΈ Gradio Interface
166
  # ============================================================
167
  description = """
168
- ### 🚦 Stage 3 β€” Wrong Direction Detection (Improved + Stable + Confidence)
169
- - βœ… Cosine similarity instead of raw angle
170
- - βœ… Lane-wise flow support for curved roads
171
- - βœ… Temporal smoothing & delayed classification
172
- - βœ… Flicker-free labels using consensus window
173
- - βœ… Confidence score display (cosine similarity)
174
- - βœ… Toggle to show only WRONG overlays
175
  """
176
 
177
  demo = gr.Interface(
 
1
  # ============================================================
2
+ # 🚦 Stage 3 β€” Wrong Direction Detection (Stable + Confidence + Hysteresis)
3
  # ============================================================
4
 
5
  import os, cv2, json, tempfile, numpy as np, gradio as gr
 
32
  self.history = []
33
  self.frames_seen = 0
34
  self.status = "OK"
35
+ self.status_history = []
36
  self.confidence = 1.0
37
+ self.ema_sim = 1.0 # for exponential smoothing
38
 
39
  def update(self, bbox):
40
  self.kf.predict()
 
46
  self.frames_seen += 1
47
  return [x, y]
48
 
49
+ def stable_status(self, new_status, new_conf, window=10, agree_ratio=0.6):
50
  """Debounce flicker using recent window consensus."""
51
  self.status_history.append(new_status)
52
  if len(self.status_history) > window:
53
  self.status_history.pop(0)
 
54
  if self.status_history.count(new_status) >= int(agree_ratio * len(self.status_history)):
55
  self.status = new_status
56
  self.confidence = new_conf
 
58
 
59
 
60
  # ============================================================
61
+ # βš™οΈ Utility Functions
62
  # ============================================================
63
  def compute_cosine_similarity(v1, v2):
64
  v1 = v1 / (np.linalg.norm(v1) + 1e-6)
 
91
  out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
92
 
93
  tracks, next_id = {}, 0
94
+ SIM_THRESH = 0.5 # base reference
95
+ DELAY_FRAMES = 8
96
+ MIN_FLOW_SPEED = 1.2
97
+ HYST_OK = 0.55
98
+ HYST_WRONG = 0.45
99
+ ALPHA = 0.6 # exponential smoothing weight
100
 
101
  while True:
102
  ret, frame = cap.read()
 
143
  if np.linalg.norm(motion) < MIN_FLOW_SPEED:
144
  continue
145
 
 
146
  sims = [compute_cosine_similarity(motion, f) for f in lane_flows]
147
  best_sim = max(sims)
148
 
149
  if trk.frames_seen > DELAY_FRAMES:
150
+ # Exponential moving average
151
+ trk.ema_sim = ALPHA * best_sim + (1 - ALPHA) * getattr(trk, "ema_sim", best_sim)
152
+
153
+ # Hysteresis classification
154
+ if trk.ema_sim >= HYST_OK:
155
+ new_status = "OK"
156
+ elif trk.ema_sim <= HYST_WRONG:
157
+ new_status = "WRONG"
158
+ else:
159
+ new_status = trk.status # hold previous label
160
+
161
+ trk.stable_status(new_status, new_conf=trk.ema_sim, window=10, agree_ratio=0.6)
162
 
 
163
  if (not show_only_wrong) or (trk.status == "WRONG"):
164
  color = (0, 0, 255) if trk.status == "WRONG" else (0, 255, 0)
165
  label = f"ID:{tid} {trk.status} ({trk.confidence:.2f})"
 
176
  # πŸŽ›οΈ Gradio Interface
177
  # ============================================================
178
  description = """
179
+ ### 🚦 Stage 3 β€” Wrong Direction Detection (Stable + Confidence + Hysteresis)
180
+ - βœ… Cosine similarity with exponential smoothing
181
+ - βœ… Hysteresis (OKβ‰₯0.55 / WRONG≀0.45) for stability
182
+ - βœ… 10-frame consensus voting (flicker-free)
183
+ - βœ… Confidence score beside each ID
184
+ - βœ… Optional β€œShow Only Wrong Labels” toggle
 
185
  """
186
 
187
  demo = gr.Interface(