nishanth-saka commited on
Commit
954bd8b
·
verified ·
1 Parent(s): c1ceeb5

Angle estimation (#6)

Browse files

- Angle estimation (985c62c076c5799ace45d138fde3b9b897cf1679)

Files changed (1) hide show
  1. app.py +53 -35
app.py CHANGED
@@ -13,54 +13,53 @@ def extract_motion_vectors(data):
13
  continue
14
  diffs = np.diff(pts, axis=0)
15
  for d in diffs:
16
- if np.linalg.norm(d) > 1: # ignore jitter / static points
17
  vectors.append(d)
18
  return np.array(vectors)
19
 
20
 
21
  # ============================================================
22
- # 🧮 2. Improved Dominant Flow Clustering (Cosine-based)
23
  # ============================================================
24
- def learn_flows_improved(vectors, n_clusters=2, normalize=True):
25
- """
26
- Improved dominant-flow clustering:
27
- - Normalizes all vectors to unit direction (ignores speed)
28
- - Clusters by angular orientation (cosine distance)
29
- - Ignores low-magnitude / noisy motions
30
- """
31
  if len(vectors) < n_clusters:
32
  return None, None
33
 
34
- # (1) Normalize to direction only
35
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
36
  dirs = vectors / (norms + 1e-6)
37
-
38
- # (2) Filter out tiny motions
39
  valid = (norms[:, 0] > 1.5)
40
  dirs = dirs[valid]
41
  if len(dirs) < n_clusters:
42
  return None, None
43
 
44
- # (3) KMeans on direction vectors (≈ cosine distance)
45
  kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
46
  kmeans.fit(dirs)
47
  centers = kmeans.cluster_centers_
48
-
49
- # (4) Normalize cluster centers again
50
  centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
51
 
52
- # (5) Re-assign all original vectors to nearest angular center
53
  sims = np.dot(vectors / (np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-6), centers.T)
54
  labels = np.argmax(sims, axis=1)
55
-
56
  return labels, centers
57
 
58
 
59
  # ============================================================
60
- # 🎨 3. Visualization Utility (Option A Scaled-up Arrows)
 
 
 
 
 
 
 
 
 
 
61
  # ============================================================
62
- def draw_flow_overlay(vectors, labels, centers, bg_img=None):
63
- # background
 
 
64
  if bg_img and os.path.exists(bg_img):
65
  bg = cv2.imread(bg_img)
66
  if bg is None:
@@ -69,13 +68,11 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None):
69
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
70
 
71
  overlay = bg.copy()
72
- colors = [(0, 0, 255), (255, 255, 0)] # red & yellow
73
 
74
- # normalize arrow lengths for small samples
75
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
76
  vectors = np.divide(vectors, norms + 1e-6) * 10
77
 
78
- # draw mini-arrows for field visualization
79
  for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
80
  if i % 15 != 0:
81
  continue
@@ -84,7 +81,6 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None):
84
  end = (int(start[0] + vx), int(start[1] + vy))
85
  cv2.arrowedLine(overlay, start, end, colors[lab % 2], 1, tipLength=0.3)
86
 
87
- # --- main dominant arrows ---
88
  h, w = overlay.shape[:2]
89
  scale = 300
90
  center_pt = (w // 2, h // 2)
@@ -99,6 +95,17 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None):
99
  cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
100
  cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
101
 
 
 
 
 
 
 
 
 
 
 
 
102
  combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
103
  out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
104
  cv2.imwrite(out_path, combined)
@@ -106,7 +113,7 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None):
106
 
107
 
108
  # ============================================================
109
- # 🚀 4. Combined Pipeline
110
  # ============================================================
111
  def process_json(json_file, background=None):
112
  try:
@@ -122,24 +129,35 @@ def process_json(json_file, background=None):
122
  if labels is None:
123
  return None, {"error": "Insufficient data for clustering."}
124
 
125
- centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
126
- img_path = draw_flow_overlay(vectors, labels, centers, background)
 
 
 
 
 
 
 
 
127
 
128
  stats = {
129
  "num_vectors": int(len(vectors)),
130
  "dominant_flows": int(len(centers)),
131
- "flow_centers": centers.tolist()
 
 
 
132
  }
133
  return img_path, stats
134
 
135
 
136
  # ============================================================
137
- # 🖥️ 5. Gradio Interface
138
  # ============================================================
139
  description_text = """
140
- ### 🧭 Dominant Flow Learning (Stage 2 — Cosine-Based Improved)
141
- Upload the **trajectories JSON** from Stage 1.
142
- Optionally upload a background frame for overlay visualization.
143
  """
144
 
145
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
@@ -153,12 +171,12 @@ demo = gr.Interface(
153
  ],
154
  outputs=[
155
  gr.Image(label="Dominant Flow Overlay"),
156
- gr.JSON(label="Flow Stats")
157
  ],
158
- title="🚗 Dominant Flow Learning – Stage 2 (Cosine-Based Improved)",
159
  description=description_text,
160
  examples=[[example_json, example_bg]] if example_json else None,
161
  )
162
 
163
  if __name__ == "__main__":
164
- demo.launch()
 
13
  continue
14
  diffs = np.diff(pts, axis=0)
15
  for d in diffs:
16
+ if np.linalg.norm(d) > 1: # ignore jitter/static
17
  vectors.append(d)
18
  return np.array(vectors)
19
 
20
 
21
  # ============================================================
22
+ # 🧮 2. Dominant Flow Clustering (Cosine-based)
23
  # ============================================================
24
+ def learn_flows_improved(vectors, n_clusters=2):
25
+ """Cosine-based clustering of normalized motion directions."""
 
 
 
 
 
26
  if len(vectors) < n_clusters:
27
  return None, None
28
 
 
29
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
30
  dirs = vectors / (norms + 1e-6)
 
 
31
  valid = (norms[:, 0] > 1.5)
32
  dirs = dirs[valid]
33
  if len(dirs) < n_clusters:
34
  return None, None
35
 
 
36
  kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
37
  kmeans.fit(dirs)
38
  centers = kmeans.cluster_centers_
 
 
39
  centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
40
 
 
41
  sims = np.dot(vectors / (np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-6), centers.T)
42
  labels = np.argmax(sims, axis=1)
 
43
  return labels, centers
44
 
45
 
46
  # ============================================================
47
+ # 🧭 3. Estimate Road Angle from Dominant Flow
48
+ # ============================================================
49
+ def estimate_road_angle(centers):
50
+ """Return average flow direction in degrees (0° = horizontal right)."""
51
+ if centers is None or len(centers) == 0:
52
+ return 0.0
53
+ dominant = np.mean(centers, axis=0)
54
+ angle = np.degrees(np.arctan2(dominant[1], dominant[0]))
55
+ return float(angle % 360)
56
+
57
+
58
  # ============================================================
59
+ # 🎨 4. Visualization Utility
60
+ # ============================================================
61
+ def draw_flow_overlay(vectors, labels, centers, bg_img=None,
62
+ drive_zone=None, entry_zones=None):
63
  if bg_img and os.path.exists(bg_img):
64
  bg = cv2.imread(bg_img)
65
  if bg is None:
 
68
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
69
 
70
  overlay = bg.copy()
71
+ colors = [(0, 0, 255), (255, 255, 0)]
72
 
 
73
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
74
  vectors = np.divide(vectors, norms + 1e-6) * 10
75
 
 
76
  for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
77
  if i % 15 != 0:
78
  continue
 
81
  end = (int(start[0] + vx), int(start[1] + vy))
82
  cv2.arrowedLine(overlay, start, end, colors[lab % 2], 1, tipLength=0.3)
83
 
 
84
  h, w = overlay.shape[:2]
85
  scale = 300
86
  center_pt = (w // 2, h // 2)
 
95
  cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
96
  cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
97
 
98
+ # --- Optional zones overlay ---
99
+ if drive_zone is not None:
100
+ cv2.polylines(overlay, [np.array(drive_zone, np.int32)], True, (0, 255, 255), 2)
101
+ cv2.putText(overlay, "Drive Zone", tuple(np.array(drive_zone[0], int)),
102
+ cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
103
+ if entry_zones:
104
+ for ez in entry_zones:
105
+ cv2.polylines(overlay, [np.array(ez, np.int32)], True, (0, 0, 255), 2)
106
+ cv2.putText(overlay, "Entry Gate", tuple(np.array(ez[0], int)),
107
+ cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
108
+
109
  combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
110
  out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
111
  cv2.imwrite(out_path, combined)
 
113
 
114
 
115
  # ============================================================
116
+ # 🚀 5. Combined Pipeline
117
  # ============================================================
118
  def process_json(json_file, background=None):
119
  try:
 
129
  if labels is None:
130
  return None, {"error": "Insufficient data for clustering."}
131
 
132
+ road_angle = estimate_road_angle(centers)
133
+
134
+ # Optionally define default polygons (can be user-drawn later)
135
+ drive_zone = [[100, 100], [800, 100], [800, 500], [100, 500]]
136
+ entry_zones = [
137
+ [[50, 100], [100, 100], [100, 500], [50, 500]] # left edge example
138
+ ]
139
+
140
+ img_path = draw_flow_overlay(vectors, labels, centers,
141
+ background, drive_zone, entry_zones)
142
 
143
  stats = {
144
  "num_vectors": int(len(vectors)),
145
  "dominant_flows": int(len(centers)),
146
+ "flow_centers": centers.tolist(),
147
+ "road_angle_deg": road_angle,
148
+ "drive_zone": drive_zone,
149
+ "entry_zones": entry_zones
150
  }
151
  return img_path, stats
152
 
153
 
154
  # ============================================================
155
+ # 🖥️ 6. Gradio Interface
156
  # ============================================================
157
  description_text = """
158
+ ### 🧭 Dominant Flow Learning (Stage 2 — Angle + Zone-Aware)
159
+ Uploads the **trajectories JSON** from Stage 1 and optionally a background frame.
160
+ Outputs dominant flow directions, estimated road angle, and zone polygons for Stage 3.
161
  """
162
 
163
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
 
171
  ],
172
  outputs=[
173
  gr.Image(label="Dominant Flow Overlay"),
174
+ gr.JSON(label="Flow Stats (Stage 2 Output)")
175
  ],
176
+ title="🚗 Dominant Flow Learning – Stage 2 (Angle + Zone-Aware)",
177
  description=description_text,
178
  examples=[[example_json, example_bg]] if example_json else None,
179
  )
180
 
181
  if __name__ == "__main__":
182
+ demo.launch()