Angle-Based Flow Learning

#8
Files changed (1) hide show
  1. app.py +40 -43
app.py CHANGED
@@ -19,48 +19,51 @@ def extract_motion_vectors(data):
19
 
20
 
21
  # ============================================================
22
- # 🧮 2. Dominant Flow Clustering (Cosine-based + Auto-Fix)
23
  # ============================================================
24
- def learn_flows_improved(vectors, n_clusters=2):
25
- """Cosine-based clustering of normalized motion directions."""
26
  if len(vectors) < n_clusters:
27
  return None, None
28
 
29
- # --- Normalize & filter weak motions ---
30
- norms = np.linalg.norm(vectors, axis=1, keepdims=True)
31
- dirs = vectors / (norms + 1e-6)
32
- valid = (norms[:, 0] > 1.5)
33
- dirs = dirs[valid]
34
- if len(dirs) < n_clusters:
35
- return None, None
36
 
37
- # --- Cluster ---
38
  kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
39
- kmeans.fit(dirs)
40
- centers = kmeans.cluster_centers_
41
- centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
42
-
43
- # --- ✅ Auto-fix: ensure at least one pair of flows are truly opposite ---
44
- if len(centers) >= 2:
45
- sim = np.dot(centers[0], centers[1])
46
- if sim > -0.8:
47
- # If not opposite enough, flip the smaller-magnitude cluster
48
- if np.linalg.norm(centers[0]) < np.linalg.norm(centers[1]):
49
- centers[0] = -centers[0]
50
- else:
51
- centers[1] = -centers[1]
52
-
53
- # --- Assign labels ---
54
- sims = np.dot(vectors / (np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-6), centers.T)
55
- labels = np.argmax(sims, axis=1)
56
- return labels, centers
 
 
 
 
 
 
 
57
 
58
 
59
  # ============================================================
60
  # 🧭 3. Estimate Road Angle from Dominant Flow
61
  # ============================================================
62
  def estimate_road_angle(centers):
63
- """Return average flow direction in degrees (0° = horizontal right)."""
64
  if centers is None or len(centers) == 0:
65
  return 0.0
66
  dominant = np.mean(centers, axis=0)
@@ -81,12 +84,11 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
81
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
82
 
83
  overlay = bg.copy()
84
- colors = [(0, 0, 255), (255, 255, 0), (0, 255, 255), (255, 0, 255)]
85
 
86
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
87
  vectors = np.divide(vectors, norms + 1e-6) * 10
88
 
89
- # --- Sampled arrows for visual flow density ---
90
  for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
91
  if i % 15 != 0:
92
  continue
@@ -95,7 +97,6 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
95
  end = (int(start[0] + vx), int(start[1] + vy))
96
  cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3)
97
 
98
- # --- Draw dominant flow arrows ---
99
  h, w = overlay.shape[:2]
100
  scale = 300
101
  center_pt = (w // 2, h // 2)
@@ -110,7 +111,6 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
110
  cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
111
  cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
112
 
113
- # --- Optional zones overlay ---
114
  if drive_zone is not None:
115
  cv2.polylines(overlay, [np.array(drive_zone, np.int32)], True, (0, 255, 255), 2)
116
  cv2.putText(overlay, "Drive Zone", tuple(np.array(drive_zone[0], int)),
@@ -140,17 +140,14 @@ def process_json(json_file, background=None):
140
  if len(vectors) == 0:
141
  return None, {"error": "No motion vectors found."}
142
 
143
- # --- Use 2 clusters normally, can bump to 3 if road has multiple flows ---
144
- labels, centers = learn_flows_improved(vectors, n_clusters=2)
145
  if labels is None:
146
  return None, {"error": "Insufficient data for clustering."}
147
 
148
  road_angle = estimate_road_angle(centers)
149
 
150
  drive_zone = [[100, 100], [800, 100], [800, 500], [100, 500]]
151
- entry_zones = [
152
- [[50, 100], [100, 100], [100, 500], [50, 500]]
153
- ]
154
 
155
  img_path = draw_flow_overlay(vectors, labels, centers,
156
  background, drive_zone, entry_zones)
@@ -170,9 +167,9 @@ def process_json(json_file, background=None):
170
  # 🖥️ 6. Gradio Interface
171
  # ============================================================
172
  description_text = """
173
- ### 🧭 Dominant Flow Learning (Stage 2 — Angle + Zone-Aware + Auto-Fix)
174
- Uploads the **trajectories JSON** from Stage 1 and optionally a background frame.
175
- Outputs dominant flow directions (auto-corrected if not opposite), estimated road angle, and zone polygons for Stage 3.
176
  """
177
 
178
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
@@ -188,7 +185,7 @@ demo = gr.Interface(
188
  gr.Image(label="Dominant Flow Overlay"),
189
  gr.JSON(label="Flow Stats (Stage 2 Output)")
190
  ],
191
- title="🚗 Dominant Flow Learning – Stage 2 (Auto-Fix for Opposite Flows)",
192
  description=description_text,
193
  examples=[[example_json, example_bg]] if example_json else None,
194
  )
 
19
 
20
 
21
  # ============================================================
22
+ # 🧮 2. Direction-Specific (Angle-Based) Clustering
23
  # ============================================================
24
+ def cluster_by_angle(vectors, n_clusters=2):
25
+ """Cluster motion directions using circular (angle-space) logic."""
26
  if len(vectors) < n_clusters:
27
  return None, None
28
 
29
+ # --- Convert to angles (−180° 180°) ---
30
+ angles = np.degrees(np.arctan2(vectors[:, 1], vectors[:, 0]))
31
+ angles = angles.reshape(-1, 1)
 
 
 
 
32
 
33
+ # --- Run clustering in angle space ---
34
  kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
35
+ kmeans.fit(angles)
36
+ centers = kmeans.cluster_centers_.flatten()
37
+
38
+ # --- Convert centers back to unit direction vectors ---
39
+ centers_rad = np.radians(centers)
40
+ flow_vectors = np.column_stack((np.cos(centers_rad), np.sin(centers_rad)))
41
+
42
+ # --- Ensure flows are sufficiently opposite (auto-flip if needed) ---
43
+ if len(flow_vectors) >= 2:
44
+ sim = np.dot(flow_vectors[0], flow_vectors[1])
45
+ if sim > -0.8: # not opposite enough
46
+ flow_vectors[1] = -flow_vectors[0]
47
+
48
+ # --- Assign labels based on closest angular distance ---
49
+ def angle_distance(a, b):
50
+ d = np.abs(a - b)
51
+ return np.minimum(d, 360 - d)
52
+
53
+ labels = np.zeros(len(angles), dtype=int)
54
+ for i, ang in enumerate(angles.flatten()):
55
+ d0 = angle_distance(ang, centers[0])
56
+ d1 = angle_distance(ang, centers[1]) if n_clusters > 1 else 999
57
+ labels[i] = 0 if d0 < d1 else 1
58
+
59
+ return labels, flow_vectors
60
 
61
 
62
  # ============================================================
63
  # 🧭 3. Estimate Road Angle from Dominant Flow
64
  # ============================================================
65
  def estimate_road_angle(centers):
66
+ """Return average flow direction in degrees (0° = right)."""
67
  if centers is None or len(centers) == 0:
68
  return 0.0
69
  dominant = np.mean(centers, axis=0)
 
84
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
85
 
86
  overlay = bg.copy()
87
+ colors = [(0, 0, 255), (255, 255, 0)]
88
 
89
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
90
  vectors = np.divide(vectors, norms + 1e-6) * 10
91
 
 
92
  for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
93
  if i % 15 != 0:
94
  continue
 
97
  end = (int(start[0] + vx), int(start[1] + vy))
98
  cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3)
99
 
 
100
  h, w = overlay.shape[:2]
101
  scale = 300
102
  center_pt = (w // 2, h // 2)
 
111
  cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
112
  cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
113
 
 
114
  if drive_zone is not None:
115
  cv2.polylines(overlay, [np.array(drive_zone, np.int32)], True, (0, 255, 255), 2)
116
  cv2.putText(overlay, "Drive Zone", tuple(np.array(drive_zone[0], int)),
 
140
  if len(vectors) == 0:
141
  return None, {"error": "No motion vectors found."}
142
 
143
+ labels, centers = cluster_by_angle(vectors, n_clusters=2)
 
144
  if labels is None:
145
  return None, {"error": "Insufficient data for clustering."}
146
 
147
  road_angle = estimate_road_angle(centers)
148
 
149
  drive_zone = [[100, 100], [800, 100], [800, 500], [100, 500]]
150
+ entry_zones = [[[50, 100], [100, 100], [100, 500], [50, 500]]]
 
 
151
 
152
  img_path = draw_flow_overlay(vectors, labels, centers,
153
  background, drive_zone, entry_zones)
 
167
  # 🖥️ 6. Gradio Interface
168
  # ============================================================
169
  description_text = """
170
+ ### 🧭 Dominant Flow Learning (Stage 2 — Angle-Based)
171
+ Clusters vehicle motion **by direction angle** on a circular scale,
172
+ giving cleaner opposite flows even on curved or diagonal roads.
173
  """
174
 
175
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
 
185
  gr.Image(label="Dominant Flow Overlay"),
186
  gr.JSON(label="Flow Stats (Stage 2 Output)")
187
  ],
188
+ title="🚗 Dominant Flow Learning – Stage 2 (Angle-Based)",
189
  description=description_text,
190
  examples=[[example_json, example_bg]] if example_json else None,
191
  )