| | import gradio as gr |
| | import numpy as np, cv2, json, tempfile, os |
| | from sklearn.cluster import KMeans |
| |
|
| |
|
| | |
| | |
| | |
def extract_motion_vectors(data):
    """Collect per-step displacement vectors from a dict of trajectories.

    ``data`` maps a track id to a sequence of ``[x, y]`` points. For every
    track with at least two points, consecutive-point differences are taken
    and only steps longer than 1 pixel are kept (drops jitter/static tracks).
    Returns an ``(N, 2)`` ndarray (shape ``(0,)`` when nothing qualifies).
    """
    collected = []
    for track in data.values():
        points = np.array(track)
        if len(points) >= 2:
            steps = np.diff(points, axis=0)
            # Keep only displacements with magnitude > 1 px.
            collected.extend(step for step in steps if np.linalg.norm(step) > 1)
    return np.array(collected)
| |
|
| |
|
| | |
| | |
| | |
def learn_flows_auto(vectors, normalize=True, max_clusters=2):
    """
    Automatically chooses 1 or 2 flow clusters depending on angular spread.
    Also returns cluster membership counts to rank dominance.

    Parameters
    ----------
    vectors : (N, 2) ndarray of displacement vectors.
    normalize : unused; kept for backward interface compatibility.
    max_clusters : cluster count used when headings are spread out.

    Returns
    -------
    (labels, centers, counts) sorted so cluster 0 is the most populated
    flow, or (None, None, None) when there is too little data.
    """
    if len(vectors) < 3:
        return None, None, None

    # Unit direction vectors; discard near-static motion (norm <= 1.5 px).
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    dirs = vectors / (norms + 1e-6)
    valid = norms[:, 0] > 1.5
    dirs = dirs[valid]
    if len(dirs) < 3:
        return None, None, None

    # Headings in degrees, mapped into [0, 360).
    angles = np.degrees(np.arctan2(dirs[:, 1], dirs[:, 0]))
    angles = (angles + 360) % 360

    # BUGFIX: np.ptp(angles) is wrong at the 0/360 wrap-around — a single
    # one-way flow heading near 0 deg (angles ~1 and ~359) looked like
    # ~358 deg of spread and was forcibly split into two clusters.
    # The circular range is 360 minus the largest empty gap between
    # sorted headings (including the gap across the wrap).
    ordered = np.sort(angles)
    gaps = np.diff(ordered)
    wrap_gap = 360.0 - ordered[-1] + ordered[0]
    spread = 360.0 - max(gaps.max(), wrap_gap)
    n_clusters = 1 if spread < 15 else max_clusters

    kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
    kmeans.fit(dirs)
    labels = kmeans.labels_
    centers = kmeans.cluster_centers_

    # Re-project cluster centers onto the unit circle (mean of unit
    # vectors is generally shorter than unit length).
    centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)

    # Rank clusters by population so Flow 0 is the dominant direction.
    # minlength keeps counts aligned with centers even if a label is unused.
    counts = np.bincount(labels, minlength=n_clusters)
    order = np.argsort(-counts)
    centers = centers[order]

    # Relabel members so label 0 == most populated cluster.
    remap = {old: new for new, old in enumerate(order)}
    labels = np.array([remap[l] for l in labels])

    return labels, centers, counts[order]
| |
|
| |
|
| | |
| | |
| | |
def draw_flow_overlay(vectors, labels, centers, counts, bg_img=None):
    """Render the per-vector arrows and dominant-flow arrows onto an image.

    ``bg_img`` is an optional path to a background frame; a dark gray
    canvas is used when it is missing or unreadable. Returns the path of
    the JPEG written to a temp file.
    """
    # Background: provided frame if it loads, else a 600x900 dark canvas.
    bg = None
    if bg_img and os.path.exists(bg_img):
        bg = cv2.imread(bg_img)  # may return None on unreadable files
    if bg is None:
        bg = np.full((600, 900, 3), 40, dtype=np.uint8)

    overlay = bg.copy()
    colors = [(0, 0, 255), (255, 255, 0), (255, 0, 255)]  # BGR, per cluster

    # Scale every vector to a fixed 10 px display length.
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    vectors = np.divide(vectors, norms + 1e-6) * 10

    # Subsample (every 15th vector) and draw at random positions — only
    # the arrow *direction* is meaningful, placement is decorative.
    for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
        if i % 15 != 0:
            continue
        start = (np.random.randint(0, overlay.shape[1]),
                 np.random.randint(0, overlay.shape[0]))
        end = (int(start[0] + vx), int(start[1] + vy))
        cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3)

    # Big green arrows from the image center, one per dominant flow.
    h, w = overlay.shape[:2]
    scale = 300
    center_pt = (w // 2, h // 2)

    for i, (c, count) in enumerate(zip(centers, counts)):
        c = c / (np.linalg.norm(c) + 1e-6)
        end = (int(center_pt[0] + c[0] * scale),
               int(center_pt[1] + c[1] * scale))
        # Vertical stagger so two opposing arrows don't overlap exactly.
        offset = (i - 0.5) * 40
        start = (center_pt[0], int(center_pt[1] + offset))
        cv2.arrowedLine(overlay, start, end, (0, 255, 0), 4, tipLength=0.4)
        cv2.putText(
            overlay,
            f"Flow {i+1} ({count} vecs)",
            (end[0] + 10, end[1]),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 255, 0),
            2,
        )

    combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
    # BUGFIX: NamedTemporaryFile(delete=False).name leaked an open file
    # descriptor each call (and holds the file open on Windows, which
    # blocks cv2.imwrite). mkstemp + close releases the handle.
    fd, out_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)
    cv2.imwrite(out_path, combined)
    return out_path
| |
|
| |
|
| | |
| | |
| | |
def process_json(json_file, background=None):
    """Gradio entry point: trajectories JSON -> (overlay image path, stats).

    ``json_file`` is a filesystem path to the uploaded JSON; ``background``
    is an optional path to a frame image. On any failure returns
    ``(None, {"error": ...})`` so the UI can display the message.
    """
    try:
        # BUGFIX: json.load(open(...)) leaked the file handle; use a
        # context manager so the file is always closed.
        with open(json_file) as fh:
            data = json.load(fh)
    except Exception as e:
        return None, {"error": f"Invalid JSON file: {e}"}

    vectors = extract_motion_vectors(data)
    if len(vectors) == 0:
        return None, {"error": "No motion vectors found."}

    labels, centers, counts = learn_flows_auto(vectors)
    if labels is None:
        return None, {"error": "Insufficient data for clustering."}

    # centers are already unit-length from learn_flows_auto; this renorm
    # is an idempotent safety net kept for robustness.
    centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
    img_path = draw_flow_overlay(vectors, labels, centers, counts, background)

    stats = {
        "num_vectors": int(len(vectors)),
        "dominant_flows": int(len(centers)),
        "flow_counts": counts.tolist(),
        "flow_centers": centers.tolist(),
    }
    return img_path, stats
| |
|
| |
|
| | |
| | |
| | |
# Markdown blurb rendered above the Gradio interface.
description_text = """
### 🧭 Dominant Flow Learning (Stage 2 – Auto + Dominance)
Automatically detects if traffic is **one-way** or **two-way**
and orders flows by **vehicle count** so Flow 1 is the true dominant direction.
"""

# Optional bundled example files — wired into the UI only when they sit
# next to this script at launch time.
example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
example_bg = "frame_sample.jpg" if os.path.exists("frame_sample.jpg") else None

# UI wiring: two file inputs -> (overlay image, stats JSON).
# NOTE(review): process_json treats both inputs as filesystem paths;
# gr.File is presumed to deliver a path string — confirm against the
# installed gradio version's File component behavior.
demo = gr.Interface(
    fn=process_json,
    inputs=[
        gr.File(label="Upload trajectories JSON"),
        gr.File(label="Optional background frame (.jpg)")
    ],
    outputs=[
        gr.Image(label="Dominant Flow Overlay"),
        gr.JSON(label="Flow Stats")
    ],
    title="🚗 Dominant Flow Learning – Stage 2 (Auto + Dominance)",
    description=description_text,
    # Only advertise the example row when the sample JSON actually exists.
    examples=[[example_json, example_bg]] if example_json else None,
)

if __name__ == "__main__":
    demo.launch()
| |
|