Improved Dominant Flow Learning (Cosine-based)

#3
by nishanth-saka - opened
Files changed (1) hide show
  1. app.py +71 -51
app.py CHANGED
@@ -2,9 +2,9 @@ import gradio as gr
2
  import numpy as np, cv2, json, tempfile, os
3
  from sklearn.cluster import KMeans
4
 
5
- # ------------------------------------------------------------
6
  # ๐Ÿงฉ 1. Compute motion vectors from trajectory JSON
7
- # ------------------------------------------------------------
8
  def extract_motion_vectors(data):
9
  vectors = []
10
  for k, pts in data.items():
@@ -18,74 +18,96 @@ def extract_motion_vectors(data):
18
  return np.array(vectors)
19
 
20
 
21
- # ------------------------------------------------------------
22
- # ๐Ÿงฎ 2. Dominant flow clustering (KMeans โ†’ 2 clusters)
23
- # ------------------------------------------------------------
24
- def learn_flows(vectors, n_clusters=2):
 
 
 
 
 
 
25
  if len(vectors) < n_clusters:
26
  return None, None
27
- kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=42)
28
- kmeans.fit(vectors)
 
 
 
 
 
 
 
 
 
 
 
 
29
  centers = kmeans.cluster_centers_
30
- return kmeans.labels_, centers
 
 
 
 
 
 
 
 
31
 
32
 
33
- # ------------------------------------------------------------
34
- # ๐ŸŽจ 3. Visualization utility (Option A โ€” Simple scale-up)
35
- # ------------------------------------------------------------
36
  def draw_flow_overlay(vectors, labels, centers, bg_img=None):
37
- # background setup
38
  if bg_img and os.path.exists(bg_img):
39
  bg = cv2.imread(bg_img)
40
  if bg is None:
41
- bg = np.ones((600,900,3),dtype=np.uint8)*40
42
  else:
43
- bg = np.ones((600,900,3),dtype=np.uint8)*40
44
 
45
  overlay = bg.copy()
46
- colors = [(0,0,255),(255,255,0)] # red & yellow for lanes
47
 
48
- # Normalize arrow lengths (for motion field)
49
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
50
  vectors = np.divide(vectors, norms + 1e-6) * 10
51
 
52
- # Draw sampled small flow arrows
53
- for i, ((vx,vy), lab) in enumerate(zip(vectors, labels)):
54
  if i % 15 != 0:
55
  continue
56
  start = (np.random.randint(0, overlay.shape[1]),
57
  np.random.randint(0, overlay.shape[0]))
58
- end = (int(start[0]+vx), int(start[1]+vy))
59
- cv2.arrowedLine(overlay, start, end, colors[lab%2], 1, tipLength=0.3)
60
 
61
- # --- โœ… Option A: Simple fixed scale-up for visible arrows ---
62
  h, w = overlay.shape[:2]
63
- scale = 300 # fixed pixel length (~300 px)
64
- center_pt = (w//2, h//2)
65
 
66
  for i, c in enumerate(centers):
67
- # normalize center first
68
  c = c / (np.linalg.norm(c) + 1e-6)
69
- end = (int(center_pt[0] + c[0]*scale),
70
- int(center_pt[1] + c[1]*scale))
71
- offset = (i - 0.5) * 40 # small vertical offset
72
  start = (center_pt[0], int(center_pt[1] + offset))
73
- cv2.arrowedLine(overlay, start, end, (0,255,0), 4, tipLength=0.4)
74
- cv2.putText(overlay, f"Flow {i+1}", (end[0]+10, end[1]),
75
- cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)
76
-
77
- # Blend overlay with background
78
- alpha = 0.6
79
- combined = cv2.addWeighted(bg, alpha, overlay, 1-alpha, 0)
80
 
 
81
  out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
82
  cv2.imwrite(out_path, combined)
83
  return out_path
84
 
85
 
86
- # ------------------------------------------------------------
87
- # ๐Ÿš€ 4. Combined pipeline
88
- # ------------------------------------------------------------
89
  def process_json(json_file, background=None):
90
  try:
91
  data = json.load(open(json_file))
@@ -93,18 +115,16 @@ def process_json(json_file, background=None):
93
  return None, {"error": f"Invalid JSON file: {e}"}
94
 
95
  vectors = extract_motion_vectors(data)
96
- if len(vectors)==0:
97
- return None, {"error":"No motion vectors found."}
98
 
99
- labels, centers = learn_flows(vectors)
100
  if labels is None:
101
- return None, {"error":"Insufficient data for clustering."}
102
 
103
- # normalize flow centers before saving
104
- centers = np.array(centers)
105
  centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
106
-
107
  img_path = draw_flow_overlay(vectors, labels, centers, background)
 
108
  stats = {
109
  "num_vectors": int(len(vectors)),
110
  "dominant_flows": int(len(centers)),
@@ -113,13 +133,13 @@ def process_json(json_file, background=None):
113
  return img_path, stats
114
 
115
 
116
- # ------------------------------------------------------------
117
- # ๐Ÿ–ฅ๏ธ Gradio Interface
118
- # ------------------------------------------------------------
119
  description_text = """
120
- ### ๐Ÿงญ Dominant Flow Learning (Stage 2 โ€” Scaled Arrows)
121
  Upload the **trajectories JSON** from Stage 1.
122
- Optionally upload a background road frame image for overlay visualization.
123
  """
124
 
125
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
@@ -135,7 +155,7 @@ demo = gr.Interface(
135
  gr.Image(label="Dominant Flow Overlay"),
136
  gr.JSON(label="Flow Stats")
137
  ],
138
- title="๐Ÿš— Dominant Flow Learning โ€“ Stage 2 (Option A Scale-Up)",
139
  description=description_text,
140
  examples=[[example_json, example_bg]] if example_json else None,
141
  )
 
2
  import numpy as np, cv2, json, tempfile, os
3
  from sklearn.cluster import KMeans
4
 
5
+ # ============================================================
6
  # ๐Ÿงฉ 1. Compute motion vectors from trajectory JSON
7
+ # ============================================================
8
  def extract_motion_vectors(data):
9
  vectors = []
10
  for k, pts in data.items():
 
18
  return np.array(vectors)
19
 
20
 
21
+ # ============================================================
22
+ # ๐Ÿงฎ 2. Improved Dominant Flow Clustering (Cosine-based)
23
+ # ============================================================
24
+ def learn_flows_improved(vectors, n_clusters=2, normalize=True):
25
+ """
26
+ Improved dominant-flow clustering:
27
+ - Normalizes all vectors to unit direction (ignores speed)
28
+ - Clusters by angular orientation (cosine distance)
29
+ - Ignores low-magnitude / noisy motions
30
+ """
31
  if len(vectors) < n_clusters:
32
  return None, None
33
+
34
+ # (1) Normalize to direction only
35
+ norms = np.linalg.norm(vectors, axis=1, keepdims=True)
36
+ dirs = vectors / (norms + 1e-6)
37
+
38
+ # (2) Filter out tiny motions
39
+ valid = (norms[:, 0] > 1.5)
40
+ dirs = dirs[valid]
41
+ if len(dirs) < n_clusters:
42
+ return None, None
43
+
44
+ # (3) KMeans on direction vectors (โ‰ˆ cosine distance)
45
+ kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
46
+ kmeans.fit(dirs)
47
  centers = kmeans.cluster_centers_
48
+
49
+ # (4) Normalize cluster centers again
50
+ centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
51
+
52
+ # (5) Re-assign all original vectors to nearest angular center
53
+ sims = np.dot(vectors / (np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-6), centers.T)
54
+ labels = np.argmax(sims, axis=1)
55
+
56
+ return labels, centers
57
 
58
 
59
+ # ============================================================
60
+ # ๐ŸŽจ 3. Visualization Utility (Option A โ€” Scaled-up Arrows)
61
+ # ============================================================
62
  def draw_flow_overlay(vectors, labels, centers, bg_img=None):
63
+ # background
64
  if bg_img and os.path.exists(bg_img):
65
  bg = cv2.imread(bg_img)
66
  if bg is None:
67
+ bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
68
  else:
69
+ bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
70
 
71
  overlay = bg.copy()
72
+ colors = [(0, 0, 255), (255, 255, 0)] # red & yellow
73
 
74
+ # normalize arrow lengths for small samples
75
  norms = np.linalg.norm(vectors, axis=1, keepdims=True)
76
  vectors = np.divide(vectors, norms + 1e-6) * 10
77
 
78
+ # draw mini-arrows for field visualization
79
+ for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
80
  if i % 15 != 0:
81
  continue
82
  start = (np.random.randint(0, overlay.shape[1]),
83
  np.random.randint(0, overlay.shape[0]))
84
+ end = (int(start[0] + vx), int(start[1] + vy))
85
+ cv2.arrowedLine(overlay, start, end, colors[lab % 2], 1, tipLength=0.3)
86
 
87
+ # --- main dominant arrows ---
88
  h, w = overlay.shape[:2]
89
+ scale = 300
90
+ center_pt = (w // 2, h // 2)
91
 
92
  for i, c in enumerate(centers):
 
93
  c = c / (np.linalg.norm(c) + 1e-6)
94
+ end = (int(center_pt[0] + c[0] * scale),
95
+ int(center_pt[1] + c[1] * scale))
96
+ offset = (i - 0.5) * 40
97
  start = (center_pt[0], int(center_pt[1] + offset))
98
+ cv2.arrowedLine(overlay, start, end, (0, 255, 0), 4, tipLength=0.4)
99
+ cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
100
+ cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
 
 
 
 
101
 
102
+ combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
103
  out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
104
  cv2.imwrite(out_path, combined)
105
  return out_path
106
 
107
 
108
+ # ============================================================
109
+ # ๐Ÿš€ 4. Combined Pipeline
110
+ # ============================================================
111
  def process_json(json_file, background=None):
112
  try:
113
  data = json.load(open(json_file))
 
115
  return None, {"error": f"Invalid JSON file: {e}"}
116
 
117
  vectors = extract_motion_vectors(data)
118
+ if len(vectors) == 0:
119
+ return None, {"error": "No motion vectors found."}
120
 
121
+ labels, centers = learn_flows_improved(vectors)
122
  if labels is None:
123
+ return None, {"error": "Insufficient data for clustering."}
124
 
 
 
125
  centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
 
126
  img_path = draw_flow_overlay(vectors, labels, centers, background)
127
+
128
  stats = {
129
  "num_vectors": int(len(vectors)),
130
  "dominant_flows": int(len(centers)),
 
133
  return img_path, stats
134
 
135
 
136
+ # ============================================================
137
+ # ๐Ÿ–ฅ๏ธ 5. Gradio Interface
138
+ # ============================================================
139
  description_text = """
140
+ ### ๐Ÿงญ Dominant Flow Learning (Stage 2 โ€” Cosine-Based Improved)
141
  Upload the **trajectories JSON** from Stage 1.
142
+ Optionally upload a background frame for overlay visualization.
143
  """
144
 
145
  example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
 
155
  gr.Image(label="Dominant Flow Overlay"),
156
  gr.JSON(label="Flow Stats")
157
  ],
158
+ title="๐Ÿš— Dominant Flow Learning โ€“ Stage 2 (Cosine-Based Improved)",
159
  description=description_text,
160
  examples=[[example_json, example_bg]] if example_json else None,
161
  )