| import gradio as gr |
| import numpy as np, cv2, json, tempfile, os |
| from sklearn.cluster import KMeans |
|
|
| |
| |
| |
| def extract_motion_vectors(data): |
| vectors = [] |
| for k, pts in data.items(): |
| pts = np.array(pts) |
| if len(pts) < 2: |
| continue |
| diffs = np.diff(pts, axis=0) |
| for d in diffs: |
| if np.linalg.norm(d) > 1: |
| vectors.append(d) |
| return np.array(vectors) |
|
|
|
|
| |
| |
| |
| def cluster_by_angle(vectors, n_clusters=2): |
| """Cluster motion directions using circular (angle-space) logic.""" |
| if len(vectors) < n_clusters: |
| return None, None |
|
|
| |
| angles = np.degrees(np.arctan2(vectors[:, 1], vectors[:, 0])) |
| angles = angles.reshape(-1, 1) |
|
|
| |
| kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42) |
| kmeans.fit(angles) |
| centers = kmeans.cluster_centers_.flatten() |
|
|
| |
| centers_rad = np.radians(centers) |
| flow_vectors = np.column_stack((np.cos(centers_rad), np.sin(centers_rad))) |
|
|
| |
| if len(flow_vectors) >= 2: |
| sim = np.dot(flow_vectors[0], flow_vectors[1]) |
| if sim > -0.8: |
| flow_vectors[1] = -flow_vectors[0] |
|
|
| |
| def angle_distance(a, b): |
| d = np.abs(a - b) |
| return np.minimum(d, 360 - d) |
|
|
| labels = np.zeros(len(angles), dtype=int) |
| for i, ang in enumerate(angles.flatten()): |
| d0 = angle_distance(ang, centers[0]) |
| d1 = angle_distance(ang, centers[1]) if n_clusters > 1 else 999 |
| labels[i] = 0 if d0 < d1 else 1 |
|
|
| return labels, flow_vectors |
|
|
|
|
| |
| |
| |
| def estimate_road_angle(centers): |
| """Return average flow direction in degrees (0° = right).""" |
| if centers is None or len(centers) == 0: |
| return 0.0 |
| dominant = np.mean(centers, axis=0) |
| angle = np.degrees(np.arctan2(dominant[1], dominant[0])) |
| return float(angle % 360) |
|
|
|
|
| |
| |
| |
| def draw_flow_overlay(vectors, labels, centers, bg_img=None, |
| drive_zone=None, entry_zones=None): |
| if bg_img and os.path.exists(bg_img): |
| bg = cv2.imread(bg_img) |
| if bg is None: |
| bg = np.ones((600, 900, 3), dtype=np.uint8) * 40 |
| else: |
| bg = np.ones((600, 900, 3), dtype=np.uint8) * 40 |
|
|
| overlay = bg.copy() |
| colors = [(0, 0, 255), (255, 255, 0)] |
|
|
| norms = np.linalg.norm(vectors, axis=1, keepdims=True) |
| vectors = np.divide(vectors, norms + 1e-6) * 10 |
|
|
| for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)): |
| if i % 15 != 0: |
| continue |
| start = (np.random.randint(0, overlay.shape[1]), |
| np.random.randint(0, overlay.shape[0])) |
| end = (int(start[0] + vx), int(start[1] + vy)) |
| cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3) |
|
|
| h, w = overlay.shape[:2] |
| scale = 300 |
| center_pt = (w // 2, h // 2) |
|
|
| for i, c in enumerate(centers): |
| c = c / (np.linalg.norm(c) + 1e-6) |
| end = (int(center_pt[0] + c[0] * scale), |
| int(center_pt[1] + c[1] * scale)) |
| offset = (i - 0.5) * 40 |
| start = (center_pt[0], int(center_pt[1] + offset)) |
| cv2.arrowedLine(overlay, start, end, (0, 255, 0), 4, tipLength=0.4) |
| cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]), |
| cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) |
|
|
| if drive_zone is not None: |
| cv2.polylines(overlay, [np.array(drive_zone, np.int32)], True, (0, 255, 255), 2) |
| cv2.putText(overlay, "Drive Zone", tuple(np.array(drive_zone[0], int)), |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) |
| if entry_zones: |
| for ez in entry_zones: |
| cv2.polylines(overlay, [np.array(ez, np.int32)], True, (0, 0, 255), 2) |
| cv2.putText(overlay, "Entry Gate", tuple(np.array(ez[0], int)), |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) |
|
|
| combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0) |
| out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name |
| cv2.imwrite(out_path, combined) |
| return out_path |
|
|
|
|
| |
| |
| |
| def process_json(json_file, background=None): |
| try: |
| data = json.load(open(json_file)) |
| except Exception as e: |
| return None, {"error": f"Invalid JSON file: {e}"} |
|
|
| vectors = extract_motion_vectors(data) |
| if len(vectors) == 0: |
| return None, {"error": "No motion vectors found."} |
|
|
| labels, centers = cluster_by_angle(vectors, n_clusters=2) |
| if labels is None: |
| return None, {"error": "Insufficient data for clustering."} |
|
|
| road_angle = estimate_road_angle(centers) |
|
|
| drive_zone = [[100, 100], [800, 100], [800, 500], [100, 500]] |
| entry_zones = [[[50, 100], [100, 100], [100, 500], [50, 500]]] |
|
|
| img_path = draw_flow_overlay(vectors, labels, centers, |
| background, drive_zone, entry_zones) |
|
|
| stats = { |
| "num_vectors": int(len(vectors)), |
| "dominant_flows": int(len(centers)), |
| "flow_centers": centers.tolist(), |
| "road_angle_deg": road_angle, |
| "drive_zone": drive_zone, |
| "entry_zones": entry_zones |
| } |
| return img_path, stats |
|
|
|
|
| |
| |
| |
| description_text = """ |
| ### 🧭 Dominant Flow Learning (Stage 2 — Angle-Based) |
| Clusters vehicle motion **by direction angle** on a circular scale, |
| giving cleaner opposite flows even on curved or diagonal roads. |
| """ |
|
|
| example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None |
| example_bg = "frame_sample.jpg" if os.path.exists("frame_sample.jpg") else None |
|
|
| demo = gr.Interface( |
| fn=process_json, |
| inputs=[ |
| gr.File(label="Upload trajectories JSON"), |
| gr.File(label="Optional background frame (.jpg)") |
| ], |
| outputs=[ |
| gr.Image(label="Dominant Flow Overlay"), |
| gr.JSON(label="Flow Stats (Stage 2 Output)") |
| ], |
| title="🚗 Dominant Flow Learning – Stage 2 (Angle-Based)", |
| description=description_text, |
| examples=[[example_json, example_bg]] if example_json else None, |
| ) |
|
|
| if __name__ == "__main__": |
| demo.launch() |
|
|