import gradio as gr import numpy as np, cv2, json, tempfile, os from sklearn.cluster import KMeans # ============================================================ # 🧩 1. Compute motion vectors from trajectory JSON # ============================================================ def extract_motion_vectors(data): vectors = [] for k, pts in data.items(): pts = np.array(pts) if len(pts) < 2: continue diffs = np.diff(pts, axis=0) for d in diffs: if np.linalg.norm(d) > 1: # ignore jitter / static vectors.append(d) return np.array(vectors) # ============================================================ # 🧮 2. Auto-Adaptive Dominant Flow Clustering (with dominance sort) # ============================================================ def learn_flows_auto(vectors, normalize=True, max_clusters=2): """ Automatically chooses 1 or 2 flow clusters depending on angular spread. Also returns cluster membership counts to rank dominance. """ if len(vectors) < 3: return None, None, None # Normalize norms = np.linalg.norm(vectors, axis=1, keepdims=True) dirs = vectors / (norms + 1e-6) valid = (norms[:, 0] > 1.5) dirs = dirs[valid] if len(dirs) < 3: return None, None, None # Angular spread check angles = np.degrees(np.arctan2(dirs[:, 1], dirs[:, 0])) angles = (angles + 360) % 360 spread = np.ptp(angles) n_clusters = 1 if spread < 15 else max_clusters # Cluster kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42) kmeans.fit(dirs) labels = kmeans.labels_ centers = kmeans.cluster_centers_ # Normalize centers again centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6) # --- Dominance sorting by cluster size --- counts = np.bincount(labels) order = np.argsort(-counts) # descending centers = centers[order] # remap labels to new dominance order remap = {old: new for new, old in enumerate(order)} labels = np.array([remap[l] for l in labels]) return labels, centers, counts[order] # ============================================================ # 🎨 3. Visualization Utility # ============================================================ def draw_flow_overlay(vectors, labels, centers, counts, bg_img=None): if bg_img and os.path.exists(bg_img): bg = cv2.imread(bg_img) if bg is None: bg = np.ones((600, 900, 3), dtype=np.uint8) * 40 else: bg = np.ones((600, 900, 3), dtype=np.uint8) * 40 overlay = bg.copy() colors = [(0, 0, 255), (255, 255, 0), (255, 0, 255)] # normalize arrow lengths norms = np.linalg.norm(vectors, axis=1, keepdims=True) vectors = np.divide(vectors, norms + 1e-6) * 10 # small random field arrows for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)): if i % 15 != 0: continue start = (np.random.randint(0, overlay.shape[1]), np.random.randint(0, overlay.shape[0])) end = (int(start[0] + vx), int(start[1] + vy)) cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3) # --- Main dominant arrows --- h, w = overlay.shape[:2] scale = 300 center_pt = (w // 2, h // 2) for i, (c, count) in enumerate(zip(centers, counts)): c = c / (np.linalg.norm(c) + 1e-6) end = (int(center_pt[0] + c[0] * scale), int(center_pt[1] + c[1] * scale)) offset = (i - 0.5) * 40 start = (center_pt[0], int(center_pt[1] + offset)) cv2.arrowedLine(overlay, start, end, (0, 255, 0), 4, tipLength=0.4) cv2.putText( overlay, f"Flow {i+1} ({count} vecs)", (end[0] + 10, end[1]), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, ) combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0) out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name cv2.imwrite(out_path, combined) return out_path # ============================================================ # 🚀 4. Combined Pipeline # ============================================================ def process_json(json_file, background=None): try: data = json.load(open(json_file)) except Exception as e: return None, {"error": f"Invalid JSON file: {e}"} vectors = extract_motion_vectors(data) if len(vectors) == 0: return None, {"error": "No motion vectors found."} labels, centers, counts = learn_flows_auto(vectors) if labels is None: return None, {"error": "Insufficient data for clustering."} centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6) img_path = draw_flow_overlay(vectors, labels, centers, counts, background) stats = { "num_vectors": int(len(vectors)), "dominant_flows": int(len(centers)), "flow_counts": counts.tolist(), "flow_centers": centers.tolist(), } return img_path, stats # ============================================================ # 🖥️ 5. Gradio Interface # ============================================================ description_text = """ ### 🧭 Dominant Flow Learning (Stage 2 – Auto + Dominance) Automatically detects if traffic is **one-way** or **two-way** and orders flows by **vehicle count** so Flow 1 is the true dominant direction. """ example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None example_bg = "frame_sample.jpg" if os.path.exists("frame_sample.jpg") else None demo = gr.Interface( fn=process_json, inputs=[ gr.File(label="Upload trajectories JSON"), gr.File(label="Optional background frame (.jpg)") ], outputs=[ gr.Image(label="Dominant Flow Overlay"), gr.JSON(label="Flow Stats") ], title="🚗 Dominant Flow Learning – Stage 2 (Auto + Dominance)", description=description_text, examples=[[example_json, example_bg]] if example_json else None, ) if __name__ == "__main__": demo.launch()