| import gradio as gr |
| import numpy as np, cv2, json, tempfile, os |
| from sklearn.cluster import KMeans |
|
|
| |
| |
| |
def extract_motion_vectors(data):
    """Convert per-track point lists into frame-to-frame motion vectors.

    Parameters
    ----------
    data : dict
        Mapping of track id -> list of [x, y] points (one entry per frame),
        as produced by the Stage-1 trajectories JSON.

    Returns
    -------
    np.ndarray
        Array of shape (n, 2) of displacement vectors whose magnitude
        exceeds 1 pixel. Always 2-D: shape (0, 2) when nothing qualifies,
        so downstream ``axis=1`` operations stay valid.
    """
    vectors = []
    for _track_id, pts in data.items():
        pts = np.asarray(pts)
        if len(pts) < 2:
            # A single point yields no displacement.
            continue
        diffs = np.diff(pts, axis=0)
        # Keep only displacements above 1 px to suppress tracking jitter.
        vectors.extend(d for d in diffs if np.linalg.norm(d) > 1)
    if not vectors:
        # Original returned shape (0,) here; keep a consistent (0, 2) shape.
        return np.empty((0, 2))
    return np.array(vectors)
|
|
|
|
| |
| |
| |
def learn_flows_improved(vectors, n_clusters=2, normalize=True, min_magnitude=1.5):
    """Cluster motion vectors into dominant flow directions.

    - With ``normalize=True`` (default) all vectors are reduced to unit
      directions before clustering, so speed is ignored and the grouping
      approximates cosine/angular clustering.
    - Low-magnitude (noisy) motions are excluded from fitting.
    - Every input vector is then labeled by cosine similarity to the
      nearest (unit-normalized) cluster center.

    Parameters
    ----------
    vectors : array-like, shape (n, 2)
        Motion vectors from :func:`extract_motion_vectors`.
    n_clusters : int
        Number of dominant flows to learn.
    normalize : bool
        If True, cluster unit directions; if False, cluster raw vectors.
        (The original accepted this flag but ignored it — now honored;
        the default preserves the original behavior.)
    min_magnitude : float
        Vectors shorter than this are excluded from fitting.

    Returns
    -------
    (labels, centers) or (None, None)
        ``labels`` is shape (n,), ``centers`` is (n_clusters, 2) unit
        vectors; ``(None, None)`` when there is too little data.
    """
    vectors = np.asarray(vectors)
    if len(vectors) < n_clusters:
        return None, None

    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    # Unit directions; epsilon guards against division by zero.
    dirs = vectors / (norms + 1e-6)

    # Drop slow / noisy motions before fitting.
    valid = norms[:, 0] > min_magnitude
    features = dirs[valid] if normalize else vectors[valid]
    if len(features) < n_clusters:
        return None, None

    # Fixed random_state keeps the clustering deterministic across runs.
    kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
    kmeans.fit(features)

    # Normalize centers so dot products below are cosine similarities.
    centers = kmeans.cluster_centers_
    centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)

    # Label *all* vectors (including the noisy ones) by angular similarity.
    sims = np.dot(dirs, centers.T)
    labels = np.argmax(sims, axis=1)

    return labels, centers
|
|
|
|
| |
| |
| |
def draw_flow_overlay(vectors, labels, centers, bg_img=None):
    """Render sampled motion vectors and dominant-flow arrows to a JPEG.

    Parameters
    ----------
    vectors : np.ndarray, shape (n, 2)
        Raw motion vectors (rescaled internally to fixed-length arrows).
    labels : np.ndarray, shape (n,)
        Cluster label per vector; colors alternate between two hues.
    centers : np.ndarray, shape (k, 2)
        Dominant flow directions (re-normalized defensively here).
    bg_img : str or None
        Optional path to a background frame; a dark canvas is used when
        missing or unreadable.
        NOTE(review): assumed to be a filesystem path string — confirm
        against the Gradio File component output type.

    Returns
    -------
    str
        Path to the written JPEG overlay image.
    """
    bg = None
    if bg_img and os.path.exists(bg_img):
        bg = cv2.imread(bg_img)  # may return None for unreadable files
    if bg is None:
        # Fallback: dark gray 900x600 canvas.
        bg = np.full((600, 900, 3), 40, dtype=np.uint8)

    overlay = bg.copy()
    colors = [(0, 0, 255), (255, 255, 0)]  # BGR: red, cyan

    # Display each vector as a fixed-length (10 px) direction arrow.
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    vectors = np.divide(vectors, norms + 1e-6) * 10

    # Sparse sample (1 in 15) at random positions to avoid clutter.
    for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
        if i % 15 != 0:
            continue
        start = (np.random.randint(0, overlay.shape[1]),
                 np.random.randint(0, overlay.shape[0]))
        end = (int(start[0] + vx), int(start[1] + vy))
        cv2.arrowedLine(overlay, start, end, colors[lab % 2], 1, tipLength=0.3)

    # Draw dominant-flow arrows radiating from the image center.
    h, w = overlay.shape[:2]
    scale = 300
    center_pt = (w // 2, h // 2)

    for i, c in enumerate(centers):
        c = c / (np.linalg.norm(c) + 1e-6)  # defensive re-normalization
        end = (int(center_pt[0] + c[0] * scale),
               int(center_pt[1] + c[1] * scale))
        offset = (i - 0.5) * 40  # vertical spread so arrow tails don't overlap
        start = (center_pt[0], int(center_pt[1] + offset))
        cv2.arrowedLine(overlay, start, end, (0, 255, 0), 4, tipLength=0.4)
        cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

    # Blend so arrows appear semi-transparent over the background.
    combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)

    # mkstemp + close fixes the open-handle leak of the original
    # NamedTemporaryFile(delete=False).name pattern.
    fd, out_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)
    cv2.imwrite(out_path, combined)
    return out_path
|
|
|
|
| |
| |
| |
def process_json(json_file, background=None):
    """Pipeline entry point: load trajectories, cluster flows, render overlay.

    Parameters
    ----------
    json_file : str
        Path to the Stage-1 trajectories JSON (track id -> list of points).
    background : str or None
        Optional path to a background frame for the overlay.

    Returns
    -------
    tuple
        ``(image_path, stats_dict)`` on success, or ``(None, {"error": ...})``
        on any failure (bad JSON, no vectors, too little data to cluster).
    """
    try:
        # Context manager closes the handle; the original
        # json.load(open(...)) leaked it.
        with open(json_file) as fh:
            data = json.load(fh)
    except Exception as e:
        return None, {"error": f"Invalid JSON file: {e}"}

    vectors = extract_motion_vectors(data)
    if len(vectors) == 0:
        return None, {"error": "No motion vectors found."}

    labels, centers = learn_flows_improved(vectors)
    if labels is None:
        return None, {"error": "Insufficient data for clustering."}

    # Re-normalize so the reported flow_centers are unit directions.
    centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
    img_path = draw_flow_overlay(vectors, labels, centers, background)

    stats = {
        "num_vectors": int(len(vectors)),
        "dominant_flows": int(len(centers)),
        "flow_centers": centers.tolist(),
    }
    return img_path, stats
|
|
|
|
| |
| |
| |
# UI copy shown above the interface. The original contained mojibake
# (UTF-8 emoji/dashes misdecoded as cp874, e.g. "๐งญ" for the compass
# emoji); repaired here.
description_text = """
### 🧭 Dominant Flow Learning (Stage 2 — Cosine-Based Improved)
Upload the **trajectories JSON** from Stage 1.
Optionally upload a background frame for overlay visualization.
"""


# Offer a sample JSON / frame pair as an example only when the files
# actually exist next to the app.
example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
example_bg = "frame_sample.jpg" if os.path.exists("frame_sample.jpg") else None


demo = gr.Interface(
    fn=process_json,
    inputs=[
        gr.File(label="Upload trajectories JSON"),
        gr.File(label="Optional background frame (.jpg)"),
    ],
    outputs=[
        gr.Image(label="Dominant Flow Overlay"),
        gr.JSON(label="Flow Stats"),
    ],
    title="Dominant Flow Learning — Stage 2 (Cosine-Based Improved)",
    description=description_text,
    # examples= must be None (not an empty list) when no sample exists.
    examples=[[example_json, example_bg]] if example_json else None,
)


if __name__ == "__main__":
    demo.launch()