nishanth-saka commited on
Commit
d4c291c
·
verified ·
1 Parent(s): 2e7d5ff
Files changed (1) hide show
  1. app.py +52 -91
app.py CHANGED
@@ -13,67 +13,54 @@ def extract_motion_vectors(data):
13
  continue
14
  diffs = np.diff(pts, axis=0)
15
  for d in diffs:
16
- if np.linalg.norm(d) > 1: # ignore jitter/static
17
  vectors.append(d)
18
  return np.array(vectors)
19
 
20
 
21
  # ============================================================
22
- # 🧮 2. Direction-Specific (Angle-Based) Clustering
23
  # ============================================================
24
- def cluster_by_angle(vectors, n_clusters=2):
25
- """Cluster motion directions using circular (angle-space) logic."""
 
 
 
 
 
26
  if len(vectors) < n_clusters:
27
  return None, None
28
 
29
- # Convert to angles (−180° → 180°)
30
- angles = np.degrees(np.arctan2(vectors[:, 1], vectors[:, 0])).reshape(-1, 1)
31
-
32
- kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
33
- kmeans.fit(angles)
34
- centers = kmeans.cluster_centers_.flatten()
35
-
36
- # Convert cluster centers → unit vectors
37
- centers_rad = np.radians(centers)
38
- flow_vectors = np.column_stack((np.cos(centers_rad), np.sin(centers_rad)))
39
-
40
- # Ensure flows are opposite (auto-flip if needed)
41
- if len(flow_vectors) >= 2:
42
- sim = np.dot(flow_vectors[0], flow_vectors[1])
43
- if sim > -0.8:
44
- flow_vectors[1] = -flow_vectors[0]
45
 
46
- # Assign labels by smallest angular distance
47
- def angle_distance(a, b):
48
- d = np.abs(a - b)
49
- return np.minimum(d, 360 - d)
 
50
 
51
- labels = np.zeros(len(angles), dtype=int)
52
- for i, ang in enumerate(angles.flatten()):
53
- d0 = angle_distance(ang, centers[0])
54
- d1 = angle_distance(ang, centers[1]) if n_clusters > 1 else 999
55
- labels[i] = 0 if d0 < d1 else 1
56
 
57
- return labels, flow_vectors
 
58
 
 
 
 
59
 
60
- # ============================================================
61
- # 🧭 3. Estimate Road Angle from Dominant Flow
62
- # ============================================================
63
- def estimate_road_angle(centers):
64
- if centers is None or len(centers) == 0:
65
- return 0.0
66
- dominant = np.mean(centers, axis=0)
67
- angle = np.degrees(np.arctan2(dominant[1], dominant[0]))
68
- return float(angle % 360)
69
 
70
 
71
  # ============================================================
72
- # 🎨 4. Visualization Utility
73
  # ============================================================
74
- def draw_flow_overlay(vectors, labels, centers, bg_img=None,
75
- drive_zone=None, entry_zones=None):
76
- # Load background or fallback canvas
77
  if bg_img and os.path.exists(bg_img):
78
  bg = cv2.imread(bg_img)
79
  if bg is None:
@@ -81,23 +68,27 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
81
  else:
82
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
83
 
84
- h, w = bg.shape[:2]
85
  overlay = bg.copy()
86
- colors = [(0, 0, 255), (255, 255, 0)]
87
 
88
- # Draw sample motion vectors
89
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
90
  vectors = np.divide(vectors, norms + 1e-6) * 10
 
 
91
  for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
92
  if i % 15 != 0:
93
  continue
94
- start = (np.random.randint(0, w), np.random.randint(0, h))
 
95
  end = (int(start[0] + vx), int(start[1] + vy))
96
- cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3)
97
 
98
- # Draw dominant flow arrows
 
99
  scale = 300
100
  center_pt = (w // 2, h // 2)
 
101
  for i, c in enumerate(centers):
102
  c = c / (np.linalg.norm(c) + 1e-6)
103
  end = (int(center_pt[0] + c[0] * scale),
@@ -108,17 +99,6 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
108
  cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
109
  cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
110
 
111
- # Draw zones if provided
112
- if drive_zone is not None:
113
- cv2.polylines(overlay, [np.array(drive_zone, np.int32)], True, (0, 255, 255), 2)
114
- cv2.putText(overlay, "Drive Zone", tuple(np.array(drive_zone[0], int)),
115
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
116
- if entry_zones:
117
- for ez in entry_zones:
118
- cv2.polylines(overlay, [np.array(ez, np.int32)], True, (0, 0, 255), 2)
119
- cv2.putText(overlay, "Entry Gate", tuple(np.array(ez[0], int)),
120
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
121
-
122
  combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
123
  out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
124
  cv2.imwrite(out_path, combined)
@@ -126,7 +106,7 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
126
 
127
 
128
  # ============================================================
129
- # 🚀 5. Combined Pipeline (With Dynamic Zones)
130
  # ============================================================
131
  def process_json(json_file, background=None):
132
  try:
@@ -138,47 +118,28 @@ def process_json(json_file, background=None):
138
  if len(vectors) == 0:
139
  return None, {"error": "No motion vectors found."}
140
 
141
- labels, centers = cluster_by_angle(vectors, n_clusters=2)
142
  if labels is None:
143
  return None, {"error": "Insufficient data for clustering."}
144
 
145
- road_angle = estimate_road_angle(centers)
146
-
147
- # --- determine frame size for zones ---
148
- if background and os.path.exists(background):
149
- bg_img = cv2.imread(background)
150
- h, w = bg_img.shape[:2]
151
- else:
152
- # fallback for unknown resolution
153
- w, h = 1280, 720
154
-
155
- # --- dynamic zones based on frame width ---
156
- drive_zone = [[100, 100], [w - 100, 100], [w - 100, 500], [100, 500]]
157
- entry_zones = [
158
- [[w - 100, 100], [w - 50, 100], [w - 50, 500], [w - 100, 500]] # right-edge entry
159
- ]
160
-
161
- img_path = draw_flow_overlay(vectors, labels, centers,
162
- background, drive_zone, entry_zones)
163
 
164
  stats = {
165
  "num_vectors": int(len(vectors)),
166
  "dominant_flows": int(len(centers)),
167
- "flow_centers": centers.tolist(),
168
- "road_angle_deg": road_angle,
169
- "drive_zone": drive_zone,
170
- "entry_zones": entry_zones
171
  }
172
  return img_path, stats
173
 
174
 
175
  # ============================================================
176
- # 🖥️ 6. Gradio Interface
177
  # ============================================================
178
  description_text = """
179
- ### 🧭 Dominant Flow Learning (Stage 2 — Angle-Based + Dynamic Zones)
180
- Clusters vehicle motion **by direction angle** on a circular scale
181
- and automatically places the Drive Zone and Entry Zone using frame width.
182
  """
183
 
184
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
@@ -192,12 +153,12 @@ demo = gr.Interface(
192
  ],
193
  outputs=[
194
  gr.Image(label="Dominant Flow Overlay"),
195
- gr.JSON(label="Flow Stats (Stage 2 Output)")
196
  ],
197
- title="🚗 Dominant Flow Learning – Stage 2 (Dynamic Zones)",
198
  description=description_text,
199
  examples=[[example_json, example_bg]] if example_json else None,
200
  )
201
 
202
  if __name__ == "__main__":
203
- demo.launch()
 
13
  continue
14
  diffs = np.diff(pts, axis=0)
15
  for d in diffs:
16
+ if np.linalg.norm(d) > 1: # ignore jitter / static points
17
  vectors.append(d)
18
  return np.array(vectors)
19
 
20
 
21
  # ============================================================
22
+ # 🧮 2. Improved Dominant Flow Clustering (Cosine-based)
23
  # ============================================================
24
+ def learn_flows_improved(vectors, n_clusters=2, normalize=True):
25
+ """
26
+ Improved dominant-flow clustering:
27
+ - Normalizes all vectors to unit direction (ignores speed)
28
+ - Clusters by angular orientation (cosine distance)
29
+ - Ignores low-magnitude / noisy motions
30
+ """
31
  if len(vectors) < n_clusters:
32
  return None, None
33
 
34
+ # (1) Normalize to direction only
35
+ norms = np.linalg.norm(vectors, axis=1, keepdims=True)
36
+ dirs = vectors / (norms + 1e-6)
 
 
 
 
 
 
 
 
 
 
 
 
 
37
 
38
+ # (2) Filter out tiny motions
39
+ valid = (norms[:, 0] > 1.5)
40
+ dirs = dirs[valid]
41
+ if len(dirs) < n_clusters:
42
+ return None, None
43
 
44
+ # (3) KMeans on direction vectors (≈ cosine distance)
45
+ kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
46
+ kmeans.fit(dirs)
47
+ centers = kmeans.cluster_centers_
 
48
 
49
+ # (4) Normalize cluster centers again
50
+ centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
51
 
52
+ # (5) Re-assign all original vectors to nearest angular center
53
+ sims = np.dot(vectors / (np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-6), centers.T)
54
+ labels = np.argmax(sims, axis=1)
55
 
56
+ return labels, centers
 
 
 
 
 
 
 
 
57
 
58
 
59
  # ============================================================
60
+ # 🎨 3. Visualization Utility (Option A — Scaled-up Arrows)
61
  # ============================================================
62
+ def draw_flow_overlay(vectors, labels, centers, bg_img=None):
63
+ # background
 
64
  if bg_img and os.path.exists(bg_img):
65
  bg = cv2.imread(bg_img)
66
  if bg is None:
 
68
  else:
69
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
70
 
 
71
  overlay = bg.copy()
72
+ colors = [(0, 0, 255), (255, 255, 0)] # red & yellow
73
 
74
+ # normalize arrow lengths for small samples
75
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
76
  vectors = np.divide(vectors, norms + 1e-6) * 10
77
+
78
+ # draw mini-arrows for field visualization
79
  for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
80
  if i % 15 != 0:
81
  continue
82
+ start = (np.random.randint(0, overlay.shape[1]),
83
+ np.random.randint(0, overlay.shape[0]))
84
  end = (int(start[0] + vx), int(start[1] + vy))
85
+ cv2.arrowedLine(overlay, start, end, colors[lab % 2], 1, tipLength=0.3)
86
 
87
+ # --- main dominant arrows ---
88
+ h, w = overlay.shape[:2]
89
  scale = 300
90
  center_pt = (w // 2, h // 2)
91
+
92
  for i, c in enumerate(centers):
93
  c = c / (np.linalg.norm(c) + 1e-6)
94
  end = (int(center_pt[0] + c[0] * scale),
 
99
  cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
100
  cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
101
 
 
 
 
 
 
 
 
 
 
 
 
102
  combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
103
  out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
104
  cv2.imwrite(out_path, combined)
 
106
 
107
 
108
  # ============================================================
109
+ # 🚀 4. Combined Pipeline
110
  # ============================================================
111
  def process_json(json_file, background=None):
112
  try:
 
118
  if len(vectors) == 0:
119
  return None, {"error": "No motion vectors found."}
120
 
121
+ labels, centers = learn_flows_improved(vectors)
122
  if labels is None:
123
  return None, {"error": "Insufficient data for clustering."}
124
 
125
+ centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
126
+ img_path = draw_flow_overlay(vectors, labels, centers, background)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
127
 
128
  stats = {
129
  "num_vectors": int(len(vectors)),
130
  "dominant_flows": int(len(centers)),
131
+ "flow_centers": centers.tolist()
 
 
 
132
  }
133
  return img_path, stats
134
 
135
 
136
  # ============================================================
137
+ # 🖥️ 5. Gradio Interface
138
  # ============================================================
139
  description_text = """
140
+ ### 🧭 Dominant Flow Learning (Stage 2 — Cosine-Based Improved)
141
+ Upload the **trajectories JSON** from Stage 1.
142
+ Optionally upload a background frame for overlay visualization.
143
  """
144
 
145
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
 
153
  ],
154
  outputs=[
155
  gr.Image(label="Dominant Flow Overlay"),
156
+ gr.JSON(label="Flow Stats")
157
  ],
158
+ title="🚗 Dominant Flow Learning – Stage 2 (Cosine-Based Improved)",
159
  description=description_text,
160
  examples=[[example_json, example_bg]] if example_json else None,
161
  )
162
 
163
  if __name__ == "__main__":
164
+ demo.launch()