Angle-Based + Dynamic Zones

#9
by nishanth-saka - opened
Files changed (1) hide show
  1. app.py +30 -21
app.py CHANGED
@@ -26,26 +26,24 @@ def cluster_by_angle(vectors, n_clusters=2):
26
  if len(vectors) < n_clusters:
27
  return None, None
28
 
29
- # --- Convert to angles (βˆ’180Β° β†’ 180Β°) ---
30
- angles = np.degrees(np.arctan2(vectors[:, 1], vectors[:, 0]))
31
- angles = angles.reshape(-1, 1)
32
 
33
- # --- Run clustering in angle space ---
34
  kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
35
  kmeans.fit(angles)
36
  centers = kmeans.cluster_centers_.flatten()
37
 
38
- # --- Convert centers back to unit direction vectors ---
39
  centers_rad = np.radians(centers)
40
  flow_vectors = np.column_stack((np.cos(centers_rad), np.sin(centers_rad)))
41
 
42
- # --- Ensure flows are sufficiently opposite (auto-flip if needed) ---
43
  if len(flow_vectors) >= 2:
44
  sim = np.dot(flow_vectors[0], flow_vectors[1])
45
- if sim > -0.8: # not opposite enough
46
  flow_vectors[1] = -flow_vectors[0]
47
 
48
- # --- Assign labels based on closest angular distance ---
49
  def angle_distance(a, b):
50
  d = np.abs(a - b)
51
  return np.minimum(d, 360 - d)
@@ -63,7 +61,6 @@ def cluster_by_angle(vectors, n_clusters=2):
63
  # 🧭 3. Estimate Road Angle from Dominant Flow
64
  # ============================================================
65
  def estimate_road_angle(centers):
66
- """Return average flow direction in degrees (0Β° = right)."""
67
  if centers is None or len(centers) == 0:
68
  return 0.0
69
  dominant = np.mean(centers, axis=0)
@@ -76,6 +73,7 @@ def estimate_road_angle(centers):
76
  # ============================================================
77
  def draw_flow_overlay(vectors, labels, centers, bg_img=None,
78
  drive_zone=None, entry_zones=None):
 
79
  if bg_img and os.path.exists(bg_img):
80
  bg = cv2.imread(bg_img)
81
  if bg is None:
@@ -83,24 +81,23 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
83
  else:
84
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
85
 
 
86
  overlay = bg.copy()
87
  colors = [(0, 0, 255), (255, 255, 0)]
88
 
 
89
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
90
  vectors = np.divide(vectors, norms + 1e-6) * 10
91
-
92
  for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
93
  if i % 15 != 0:
94
  continue
95
- start = (np.random.randint(0, overlay.shape[1]),
96
- np.random.randint(0, overlay.shape[0]))
97
  end = (int(start[0] + vx), int(start[1] + vy))
98
  cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3)
99
 
100
- h, w = overlay.shape[:2]
101
  scale = 300
102
  center_pt = (w // 2, h // 2)
103
-
104
  for i, c in enumerate(centers):
105
  c = c / (np.linalg.norm(c) + 1e-6)
106
  end = (int(center_pt[0] + c[0] * scale),
@@ -111,6 +108,7 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
111
  cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
112
  cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
113
 
 
114
  if drive_zone is not None:
115
  cv2.polylines(overlay, [np.array(drive_zone, np.int32)], True, (0, 255, 255), 2)
116
  cv2.putText(overlay, "Drive Zone", tuple(np.array(drive_zone[0], int)),
@@ -128,7 +126,7 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
128
 
129
 
130
  # ============================================================
131
- # πŸš€ 5. Combined Pipeline
132
  # ============================================================
133
  def process_json(json_file, background=None):
134
  try:
@@ -146,8 +144,19 @@ def process_json(json_file, background=None):
146
 
147
  road_angle = estimate_road_angle(centers)
148
 
149
- drive_zone = [[100, 100], [800, 100], [800, 500], [100, 500]]
150
- entry_zones = [[[50, 100], [100, 100], [100, 500], [50, 500]]]
 
 
 
 
 
 
 
 
 
 
 
151
 
152
  img_path = draw_flow_overlay(vectors, labels, centers,
153
  background, drive_zone, entry_zones)
@@ -167,9 +176,9 @@ def process_json(json_file, background=None):
167
  # πŸ–₯️ 6. Gradio Interface
168
  # ============================================================
169
  description_text = """
170
- ### 🧭 Dominant Flow Learning (Stage 2 β€” Angle-Based)
171
- Clusters vehicle motion **by direction angle** on a circular scale,
172
- giving cleaner opposite flows even on curved or diagonal roads.
173
  """
174
 
175
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
@@ -185,7 +194,7 @@ demo = gr.Interface(
185
  gr.Image(label="Dominant Flow Overlay"),
186
  gr.JSON(label="Flow Stats (Stage 2 Output)")
187
  ],
188
- title="πŸš— Dominant Flow Learning – Stage 2 (Angle-Based)",
189
  description=description_text,
190
  examples=[[example_json, example_bg]] if example_json else None,
191
  )
 
26
  if len(vectors) < n_clusters:
27
  return None, None
28
 
29
+ # Convert to angles (βˆ’180Β° β†’ 180Β°)
30
+ angles = np.degrees(np.arctan2(vectors[:, 1], vectors[:, 0])).reshape(-1, 1)
 
31
 
 
32
  kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
33
  kmeans.fit(angles)
34
  centers = kmeans.cluster_centers_.flatten()
35
 
36
+ # Convert cluster centers β†’ unit vectors
37
  centers_rad = np.radians(centers)
38
  flow_vectors = np.column_stack((np.cos(centers_rad), np.sin(centers_rad)))
39
 
40
+ # Ensure flows are opposite (auto-flip if needed)
41
  if len(flow_vectors) >= 2:
42
  sim = np.dot(flow_vectors[0], flow_vectors[1])
43
+ if sim > -0.8:
44
  flow_vectors[1] = -flow_vectors[0]
45
 
46
+ # Assign labels by smallest angular distance
47
  def angle_distance(a, b):
48
  d = np.abs(a - b)
49
  return np.minimum(d, 360 - d)
 
61
  # 🧭 3. Estimate Road Angle from Dominant Flow
62
  # ============================================================
63
  def estimate_road_angle(centers):
 
64
  if centers is None or len(centers) == 0:
65
  return 0.0
66
  dominant = np.mean(centers, axis=0)
 
73
  # ============================================================
74
  def draw_flow_overlay(vectors, labels, centers, bg_img=None,
75
  drive_zone=None, entry_zones=None):
76
+ # Load background or fallback canvas
77
  if bg_img and os.path.exists(bg_img):
78
  bg = cv2.imread(bg_img)
79
  if bg is None:
 
81
  else:
82
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
83
 
84
+ h, w = bg.shape[:2]
85
  overlay = bg.copy()
86
  colors = [(0, 0, 255), (255, 255, 0)]
87
 
88
+ # Draw sample motion vectors
89
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
90
  vectors = np.divide(vectors, norms + 1e-6) * 10
 
91
  for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
92
  if i % 15 != 0:
93
  continue
94
+ start = (np.random.randint(0, w), np.random.randint(0, h))
 
95
  end = (int(start[0] + vx), int(start[1] + vy))
96
  cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3)
97
 
98
+ # Draw dominant flow arrows
99
  scale = 300
100
  center_pt = (w // 2, h // 2)
 
101
  for i, c in enumerate(centers):
102
  c = c / (np.linalg.norm(c) + 1e-6)
103
  end = (int(center_pt[0] + c[0] * scale),
 
108
  cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
109
  cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
110
 
111
+ # Draw zones if provided
112
  if drive_zone is not None:
113
  cv2.polylines(overlay, [np.array(drive_zone, np.int32)], True, (0, 255, 255), 2)
114
  cv2.putText(overlay, "Drive Zone", tuple(np.array(drive_zone[0], int)),
 
126
 
127
 
128
  # ============================================================
129
+ # πŸš€ 5. Combined Pipeline (With Dynamic Zones)
130
  # ============================================================
131
  def process_json(json_file, background=None):
132
  try:
 
144
 
145
  road_angle = estimate_road_angle(centers)
146
 
147
+ # --- determine frame size for zones ---
148
+ if background and os.path.exists(background):
149
+ bg_img = cv2.imread(background)
150
+ h, w = bg_img.shape[:2]
151
+ else:
152
+ # fallback for unknown resolution
153
+ w, h = 1280, 720
154
+
155
+ # --- dynamic zones based on frame width ---
156
+ drive_zone = [[100, 100], [w - 100, 100], [w - 100, 500], [100, 500]]
157
+ entry_zones = [
158
+ [[w - 100, 100], [w - 50, 100], [w - 50, 500], [w - 100, 500]] # right-edge entry
159
+ ]
160
 
161
  img_path = draw_flow_overlay(vectors, labels, centers,
162
  background, drive_zone, entry_zones)
 
176
  # πŸ–₯️ 6. Gradio Interface
177
  # ============================================================
178
  description_text = """
179
+ ### 🧭 Dominant Flow Learning (Stage 2 β€” Angle-Based + Dynamic Zones)
180
+ Clusters vehicle motion **by direction angle** on a circular scale
181
+ and automatically places the Drive Zone and Entry Zone using frame width.
182
  """
183
 
184
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
 
194
  gr.Image(label="Dominant Flow Overlay"),
195
  gr.JSON(label="Flow Stats (Stage 2 Output)")
196
  ],
197
+ title="πŸš— Dominant Flow Learning – Stage 2 (Dynamic Zones)",
198
  description=description_text,
199
  examples=[[example_json, example_bg]] if example_json else None,
200
  )