nishanth-saka committed on
Commit
67f3b81
·
verified ·
1 Parent(s): 1cd4c84

Angle-based check

Browse files
Files changed (1) hide show
  1. app.py +112 -132
app.py CHANGED
@@ -1,103 +1,111 @@
1
- import gradio as gr
2
- import numpy as np, cv2, json, tempfile, os
3
- from sklearn.cluster import KMeans
4
-
5
  # ============================================================
6
- # 🧩 1. Compute motion vectors from trajectory JSON
7
  # ============================================================
8
- def extract_motion_vectors(data):
9
- vectors = []
10
- for k, pts in data.items():
11
- pts = np.array(pts)
12
- if len(pts) < 2:
13
- continue
14
- diffs = np.diff(pts, axis=0)
15
- for d in diffs:
16
- if np.linalg.norm(d) > 1: # ignore jitter / static points
17
- vectors.append(d)
18
- return np.array(vectors)
19
 
20
-
21
- # ============================================================
22
- # 🧮 2. Improved Dominant Flow Clustering (Cosine-based)
23
- # ============================================================
24
- def learn_flows_improved(vectors, n_clusters=2, normalize=True):
25
- """
26
- Improved dominant-flow clustering:
27
- - Normalizes all vectors to unit direction (ignores speed)
28
- - Clusters by angular orientation (cosine distance)
29
- - Ignores low-magnitude / noisy motions
 
 
 
 
 
 
 
 
 
 
30
  """
31
- if len(vectors) < n_clusters:
32
- return None, None
33
-
34
- # (1) Normalize to direction only
35
- norms = np.linalg.norm(vectors, axis=1, keepdims=True)
36
- dirs = vectors / (norms + 1e-6)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
 
38
- # (2) Filter out tiny motions
39
- valid = (norms[:, 0] > 1.5)
40
- dirs = dirs[valid]
41
- if len(dirs) < n_clusters:
42
- return None, None
43
-
44
- # (3) KMeans on direction vectors (≈ cosine distance)
45
- kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
46
- kmeans.fit(dirs)
47
- centers = kmeans.cluster_centers_
48
-
49
- # (4) Normalize cluster centers again
50
- centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
51
-
52
- # (5) Re-assign all original vectors to nearest angular center
53
- sims = np.dot(vectors / (np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-6), centers.T)
54
- labels = np.argmax(sims, axis=1)
55
-
56
- return labels, centers
57
-
58
-
59
- # ============================================================
60
- # 🎨 3. Visualization Utility (Option A — Scaled-up Arrows)
61
- # ============================================================
62
- def draw_flow_overlay(vectors, labels, centers, bg_img=None):
63
- # background
64
  if bg_img and os.path.exists(bg_img):
65
  bg = cv2.imread(bg_img)
66
- if bg is None:
67
- bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
68
  else:
69
  bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
 
70
 
71
  overlay = bg.copy()
72
- colors = [(0, 0, 255), (255, 255, 0)] # red & yellow
73
 
74
- # normalize arrow lengths for small samples
75
- norms = np.linalg.norm(vectors, axis=1, keepdims=True)
76
- vectors = np.divide(vectors, norms + 1e-6) * 10
 
 
 
77
 
78
- # draw mini-arrows for field visualization
79
- for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
80
- if i % 15 != 0:
81
  continue
82
- start = (np.random.randint(0, overlay.shape[1]),
83
- np.random.randint(0, overlay.shape[0]))
84
- end = (int(start[0] + vx), int(start[1] + vy))
85
- cv2.arrowedLine(overlay, start, end, colors[lab % 2], 1, tipLength=0.3)
86
-
87
- # --- main dominant arrows ---
88
- h, w = overlay.shape[:2]
89
- scale = 300
90
- center_pt = (w // 2, h // 2)
91
-
92
- for i, c in enumerate(centers):
93
- c = c / (np.linalg.norm(c) + 1e-6)
94
- end = (int(center_pt[0] + c[0] * scale),
95
- int(center_pt[1] + c[1] * scale))
96
- offset = (i - 0.5) * 40
97
- start = (center_pt[0], int(center_pt[1] + offset))
98
- cv2.arrowedLine(overlay, start, end, (0, 255, 0), 4, tipLength=0.4)
99
- cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
100
- cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
101
 
102
  combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
103
  out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
@@ -105,59 +113,31 @@ def draw_flow_overlay(vectors, labels, centers, bg_img=None):
105
  return out_path
106
 
107
 
108
- # ============================================================
109
- # 🚀 4. Combined Pipeline
110
- # ============================================================
111
- def process_json(json_file, background=None):
112
- try:
113
- data = json.load(open(json_file))
114
- except Exception as e:
115
- return None, {"error": f"Invalid JSON file: {e}"}
116
-
117
- vectors = extract_motion_vectors(data)
118
- if len(vectors) == 0:
119
- return None, {"error": "No motion vectors found."}
120
-
121
- labels, centers = learn_flows_improved(vectors)
122
- if labels is None:
123
- return None, {"error": "Insufficient data for clustering."}
124
-
125
- centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
126
- img_path = draw_flow_overlay(vectors, labels, centers, background)
127
-
128
- stats = {
129
- "num_vectors": int(len(vectors)),
130
- "dominant_flows": int(len(centers)),
131
- "flow_centers": centers.tolist()
132
- }
133
- return img_path, stats
134
-
135
-
136
- # ============================================================
137
- # 🖥️ 5. Gradio Interface
138
- # ============================================================
139
  description_text = """
140
- ### 🧭 Dominant Flow Learning (Stage 2Cosine-Based Improved)
141
- Upload the **trajectories JSON** from Stage 1.
142
- Optionally upload a background frame for overlay visualization.
 
 
 
 
 
 
143
  """
144
 
145
- example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
146
- example_bg = "frame_sample.jpg" if os.path.exists("frame_sample.jpg") else None
147
-
148
  demo = gr.Interface(
149
- fn=process_json,
150
  inputs=[
151
- gr.File(label="Upload trajectories JSON"),
 
152
  gr.File(label="Optional background frame (.jpg)")
153
  ],
154
- outputs=[
155
- gr.Image(label="Dominant Flow Overlay"),
156
- gr.JSON(label="Flow Stats")
157
- ],
158
- title="🚗 Dominant Flow Learning – Stage 2 (Cosine-Based Improved)",
159
- description=description_text,
160
- examples=[[example_json, example_bg]] if example_json else None,
161
  )
162
 
163
  if __name__ == "__main__":
 
 
 
 
 
1
  # ============================================================
2
+ # 🚦 Stage 3 Wrong-Direction Detection (Angle + Temporal + Zones)
3
  # ============================================================
 
 
 
 
 
 
 
 
 
 
 
4
 
5
+ import gradio as gr
6
+ import numpy as np, cv2, json, os, tempfile
7
+ from collections import defaultdict
8
+
9
# ------------------------------------------------------------
# ⚙️ CONFIG
# ------------------------------------------------------------
ANGLE_THRESHOLD = 60     # degrees; deviation above this => wrong direction
SMOOTH_FRAMES = 5        # number of trailing frames for temporal smoothing
ENTRY_ZONE_RATIO = 0.15  # top 15% of the frame = entry gate (skip labeling)
15
+
16
# ------------------------------------------------------------
# 1️⃣ Load flow model (from Stage 2 JSON or dict)
# ------------------------------------------------------------
def load_flow_model(flow_model_json):
    """Load the zone-wise dominant-flow centers produced by Stage 2.

    Expected JSON keys:
    {
        "zones": N,
        "zone_flow_centers": [[[dx,dy], ...], ...]
    }

    Returns a list with one np.ndarray of flow-center vectors per zone.
    """
    # Use a context manager: the original json.load(open(...)) leaked
    # the file handle until garbage collection.
    with open(flow_model_json) as f:
        model = json.load(f)
    return [np.array(zone) for zone in model["zone_flow_centers"]]
29
+
30
# ------------------------------------------------------------
# 2️⃣ Extract trajectories
# ------------------------------------------------------------
def extract_trajectories(json_file):
    """Load Stage-1 trajectories, keeping tracks with more than 2 points.

    Returns {track_id: np.ndarray of shape (T, 2)}; shorter tracks are
    dropped because a smoothed direction needs at least two displacements.
    """
    # Context manager replaces the original leaked json.load(open(...)).
    with open(json_file) as f:
        data = json.load(f)
    return {tid: np.array(pts) for tid, pts in data.items() if len(pts) > 2}
37
+
38
# ------------------------------------------------------------
# 3️⃣ Utility: compute smoothed direction
# ------------------------------------------------------------
def smooth_direction(pts, window=SMOOTH_FRAMES):
    """Unit direction vector averaged over the last `window` points.

    Returns [0, 0] when fewer than two points are available.
    """
    if len(pts) < 2:
        return np.array([0, 0])
    recent = pts[-window:]
    mean_step = np.mean(np.diff(recent, axis=0), axis=0)
    # epsilon avoids division by zero for a perfectly static track
    return mean_step / (np.linalg.norm(mean_step) + 1e-6)
48
+
49
# ------------------------------------------------------------
# 4️⃣ Compute angle between two unit vectors
# ------------------------------------------------------------
def angle_between(v1, v2):
    """Return the angle in degrees between vectors v1 and v2 (magnitude-free)."""
    u1 = v1 / (np.linalg.norm(v1) + 1e-6)
    u2 = v2 / (np.linalg.norm(v2) + 1e-6)
    # clip guards arccos against tiny floating-point overshoot beyond [-1, 1]
    cos_sim = np.clip(np.dot(u1, u2), -1, 1)
    return np.degrees(np.arccos(cos_sim))
57
+
58
# ------------------------------------------------------------
# 5️⃣ Determine zone index for a y-coordinate
# ------------------------------------------------------------
def get_zone_idx(y, frame_h, n_zones):
    """Map a y-coordinate to a horizontal band index in [0, n_zones - 1]."""
    band_height = frame_h / n_zones
    raw_idx = y // band_height
    # clip keeps y == frame_h (or beyond) inside the last zone
    return int(np.clip(raw_idx, 0, n_zones - 1))
64
+
65
# ------------------------------------------------------------
# 6️⃣ Main Logic
# ------------------------------------------------------------
def classify_wrong_direction(traj_json, flow_model_json, bg_img=None):
    """Label each track OK/WRONG against its zone's dominant flow.

    traj_json       : path to the Stage-1 trajectories JSON
    flow_model_json : path to the Stage-2 zone-wise flow-model JSON
    bg_img          : optional path to a background frame for the overlay

    A track is WRONG when the angle between its smoothed direction and the
    best-matching dominant flow of its current zone exceeds ANGLE_THRESHOLD.
    Tracks still inside the top ENTRY_ZONE_RATIO band are skipped.

    Returns the path of the rendered overlay image (.jpg).
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)

    # Background: uploaded frame, or a dark canvas as fallback.
    bg = None
    if bg_img and os.path.exists(bg_img):
        bg = cv2.imread(bg_img)
    if bg is None:
        # Covers both "no path given" and "imread failed" (imread returns
        # None for unreadable files; without this guard bg.shape crashes).
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    h, w = bg.shape[:2]

    overlay = bg.copy()
    font = cv2.FONT_HERSHEY_SIMPLEX

    for tid, pts in tracks.items():
        if len(pts) < 3:
            continue
        cur_pt = pts[-1]
        y = cur_pt[1]
        zone_idx = get_zone_idx(y, h, len(centers_by_zone))

        # Skip entry region (top of frame) to avoid false positives.
        if y < h * ENTRY_ZONE_RATIO:
            continue

        # Current smoothed direction of this track.
        v = smooth_direction(pts)

        # Compare with the zone's dominant flows; keep the best match.
        centers = centers_by_zone[zone_idx]
        if len(centers) == 0:
            continue  # no flow model for this zone; min() would raise
        best_angle = min(angle_between(v, c) for c in centers)
        label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
        color = (0, 255, 0) if label == "OK" else (0, 0, 255)

        # Draw trajectory polyline, current position, and label.
        for p1, p2 in zip(pts[:-1], pts[1:]):
            cv2.line(overlay, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
        cv2.circle(overlay, tuple(cur_pt.astype(int)), 5, color, -1)
        cv2.putText(overlay, f"ID:{tid} {label}",
                    (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
                    font, 0.6, color, 2)

    combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
    out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
    cv2.imwrite(out_path, combined)  # persist the overlay before returning its path
    return out_path
114
 
115
 
116
# ------------------------------------------------------------
# 🖥️ Gradio Interface
# ------------------------------------------------------------
description_text = """
### 🚦 Wrong-Direction Detection (Stage 3 — Angle + Temporal + Zone-Aware)
1. Upload the **trajectories JSON** from Stage 1.
2. Upload the **flow model JSON** from Stage 2 (zone-wise).
3. Optionally add a background road frame for overlay.

**Logic:**
- Compares each vehicle’s smoothed direction vector with the dominant flow of its zone.
- Ignores top-entry region to avoid false positives.
- Flags vehicles as WRONG if angular difference > 60°.
"""

demo = gr.Interface(
    fn=classify_wrong_direction,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg)")
    ],
    outputs=gr.Image(label="Wrong-Direction Overlay"),
    title="🚗 Wrong-Direction Detection – Stage 3 (Angle + Temporal + Zones)",
    description=description_text
)
142
 
143
  if __name__ == "__main__":