# Source: Hugging Face Space by nishanth-saka — "Angle-Based Flow Learning"
# (revision 21e12bf, verified; original file size 7.31 kB)
import gradio as gr
import numpy as np, cv2, json, tempfile, os
from sklearn.cluster import KMeans
# ============================================================
# 🧩 1. Compute motion vectors from trajectory JSON
# ============================================================
def extract_motion_vectors(data, min_step=1.0):
    """Collect per-frame displacement vectors from a trajectory dict.

    Parameters
    ----------
    data : dict
        Maps a track id to a sequence of (x, y) points.
    min_step : float, optional
        Minimum displacement magnitude to keep; shorter steps are treated
        as jitter/static noise and dropped. Default 1.0 matches the
        original hard-coded threshold.

    Returns
    -------
    np.ndarray
        Array of shape (N, 2). An empty result has shape (0, 2) so that
        downstream column indexing (``vectors[:, 1]``) stays valid.
    """
    vectors = []
    for _, pts in data.items():
        pts = np.asarray(pts)
        if len(pts) < 2:
            continue  # need at least two points to form a displacement
        for d in np.diff(pts, axis=0):
            if np.linalg.norm(d) > min_step:
                vectors.append(d)
    if not vectors:
        return np.empty((0, 2))
    return np.array(vectors)
# ============================================================
# 🧮 2. Direction-Specific (Angle-Based) Clustering
# ============================================================
def cluster_by_angle(vectors, n_clusters=2):
    """Cluster motion directions using circular (angle-space) logic.

    Parameters
    ----------
    vectors : np.ndarray
        (N, 2) array of displacement vectors.
    n_clusters : int, optional
        Number of direction clusters (default 2 for two-way traffic).

    Returns
    -------
    tuple
        ``(labels, flow_vectors)`` where ``labels`` is an int array of
        length N and ``flow_vectors`` holds one unit direction vector per
        cluster. Returns ``(None, None)`` when there are fewer vectors
        than clusters.

    NOTE(review): KMeans runs on raw degrees in (-180, 180], so a flow
    straddling the +/-180 boundary can be split across clusters; the
    opposite-flow auto-flip below partially compensates in the common
    two-cluster case.
    """
    if len(vectors) < n_clusters:
        return None, None
    # --- Convert displacements to headings in degrees (−180° → 180°) ---
    angles = np.degrees(np.arctan2(vectors[:, 1], vectors[:, 0]))
    # --- Run clustering in angle space ---
    kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
    kmeans.fit(angles.reshape(-1, 1))
    centers = kmeans.cluster_centers_.flatten()
    # --- Convert angular centers back to unit direction vectors ---
    centers_rad = np.radians(centers)
    flow_vectors = np.column_stack((np.cos(centers_rad), np.sin(centers_rad)))
    # --- Ensure the first two flows are (near-)opposite: roads normally
    # carry two-way traffic, so if cosine similarity is not strongly
    # negative we mirror flow 0 to manufacture the opposite flow. ---
    if len(flow_vectors) >= 2:
        if np.dot(flow_vectors[0], flow_vectors[1]) > -0.8:
            flow_vectors[1] = -flow_vectors[0]
    # --- Re-assign labels by circular (wrap-around) angular distance to
    # EVERY center. The original compared only centers 0 and 1, which
    # mislabeled points whenever n_clusters > 2. ---
    diff = np.abs(angles[:, None] - centers[None, :])
    circ_dist = np.minimum(diff, 360 - diff)
    labels = np.argmin(circ_dist, axis=1).astype(int)
    return labels, flow_vectors
# ============================================================
# 🧭 3. Estimate Road Angle from Dominant Flow
# ============================================================
def estimate_road_angle(centers):
    """Return the mean flow heading in degrees (0° points right, +x).

    Falls back to 0.0 when no flow centers are available. Note that for
    two exactly opposite unit flows the mean vector is near zero, so the
    resulting heading is essentially arbitrary in that case.
    """
    if centers is None:
        return 0.0
    if len(centers) == 0:
        return 0.0
    mean_x, mean_y = np.mean(centers, axis=0)
    heading = np.degrees(np.arctan2(mean_y, mean_x))
    return float(heading % 360)
# ============================================================
# 🎨 4. Visualization Utility
# ============================================================
def draw_flow_overlay(vectors, labels, centers, bg_img=None,
                      drive_zone=None, entry_zones=None):
    """Render motion vectors, dominant-flow arrows, and zones to a JPEG.

    Parameters
    ----------
    vectors : np.ndarray
        (N, 2) displacement vectors; normalized to fixed length for drawing.
    labels : sequence of int
        Per-vector cluster id; selects the small-arrow color.
    centers : iterable
        2-D flow direction vectors — one large green arrow is drawn per flow.
    bg_img : str, optional
        Path to a background frame; a dark canvas is used when missing or
        unreadable.
    drive_zone : list, optional
        Polygon ([[x, y], ...]) outlined in yellow.
    entry_zones : list, optional
        List of polygons outlined in red.

    Returns
    -------
    str
        Path of the JPEG written to a temp file.

    NOTE: sample arrows are drawn at random positions (only their direction
    is meaningful), so the output is not pixel-deterministic.
    """
    # Background: user-supplied frame if it loads, otherwise dark gray.
    bg = None
    if bg_img and os.path.exists(bg_img):
        bg = cv2.imread(bg_img)
    if bg is None:
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    overlay = bg.copy()
    colors = [(0, 0, 255), (255, 255, 0)]  # BGR: red / cyan per cluster
    # Normalize every vector to a fixed 10-px drawing length.
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    vectors = np.divide(vectors, norms + 1e-6) * 10
    for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
        if i % 15 != 0:  # subsample so the plot stays readable
            continue
        start = (np.random.randint(0, overlay.shape[1]),
                 np.random.randint(0, overlay.shape[0]))
        end = (int(start[0] + vx), int(start[1] + vy))
        cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3)
    # Large green arrows for the dominant flows, fanned out from the image
    # center with a small vertical offset per flow so they don't overlap.
    h, w = overlay.shape[:2]
    scale = 300
    center_pt = (w // 2, h // 2)
    for i, c in enumerate(centers):
        c = c / (np.linalg.norm(c) + 1e-6)
        end = (int(center_pt[0] + c[0] * scale),
               int(center_pt[1] + c[1] * scale))
        offset = (i - 0.5) * 40
        start = (center_pt[0], int(center_pt[1] + offset))
        cv2.arrowedLine(overlay, start, end, (0, 255, 0), 4, tipLength=0.4)
        cv2.putText(overlay, f"Flow {i+1}", (end[0] + 10, end[1]),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    if drive_zone is not None:
        cv2.polylines(overlay, [np.array(drive_zone, np.int32)], True, (0, 255, 255), 2)
        cv2.putText(overlay, "Drive Zone", tuple(np.array(drive_zone[0], int)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
    if entry_zones:
        for ez in entry_zones:
            cv2.polylines(overlay, [np.array(ez, np.int32)], True, (0, 0, 255), 2)
            cv2.putText(overlay, "Entry Gate", tuple(np.array(ez[0], int)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
    combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
    # mkstemp + close instead of NamedTemporaryFile(delete=False): the
    # original left the fd open while cv2 wrote the same path, leaking a
    # handle (and failing on Windows, where the open handle blocks writes).
    fd, out_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)
    cv2.imwrite(out_path, combined)
    return out_path
# ============================================================
# 🚀 5. Combined Pipeline
# ============================================================
def process_json(json_file, background=None):
    """Full Stage-2 pipeline: trajectory JSON → flow overlay + stats.

    Parameters
    ----------
    json_file : str or file-like
        Path to a trajectory JSON, or an object exposing ``.name`` (some
        Gradio versions pass a tempfile wrapper for ``gr.File`` inputs).
    background : str or file-like, optional
        Optional background frame image (same path/wrapper handling).

    Returns
    -------
    tuple
        ``(image_path, stats_dict)``; on failure ``image_path`` is None
        and the dict carries an ``"error"`` message.
    """
    # Gradio may hand us either a filepath string or a tempfile wrapper;
    # normalize both inputs to plain paths.
    json_path = getattr(json_file, "name", json_file)
    bg_path = getattr(background, "name", background)
    try:
        # Context manager closes the handle (the original leaked it via
        # json.load(open(...))).
        with open(json_path) as f:
            data = json.load(f)
    except Exception as e:
        return None, {"error": f"Invalid JSON file: {e}"}
    vectors = extract_motion_vectors(data)
    if len(vectors) == 0:
        return None, {"error": "No motion vectors found."}
    labels, centers = cluster_by_angle(vectors, n_clusters=2)
    if labels is None:
        return None, {"error": "Insufficient data for clustering."}
    road_angle = estimate_road_angle(centers)
    # Placeholder zones (fixed pixel coordinates) until they are learned.
    drive_zone = [[100, 100], [800, 100], [800, 500], [100, 500]]
    entry_zones = [[[50, 100], [100, 100], [100, 500], [50, 500]]]
    img_path = draw_flow_overlay(vectors, labels, centers,
                                 bg_path, drive_zone, entry_zones)
    stats = {
        "num_vectors": int(len(vectors)),
        "dominant_flows": int(len(centers)),
        "flow_centers": centers.tolist(),
        "road_angle_deg": road_angle,
        "drive_zone": drive_zone,
        "entry_zones": entry_zones
    }
    return img_path, stats
# ============================================================
# 🖥️ 6. Gradio Interface
# ============================================================
# Markdown shown above the Gradio interface (rendered, so ** is bold).
description_text = """
### 🧭 Dominant Flow Learning (Stage 2 — Angle-Based)
Clusters vehicle motion **by direction angle** on a circular scale,
giving cleaner opposite flows even on curved or diagonal roads.
"""
# Pre-wired examples are shown only when the sample assets ship alongside
# the app; otherwise the examples row is omitted entirely.
example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
example_bg = "frame_sample.jpg" if os.path.exists("frame_sample.jpg") else None
# UI wiring: required trajectories JSON + optional background frame in;
# annotated overlay image + raw stats dict out.
demo = gr.Interface(
    fn=process_json,
    inputs=[
        gr.File(label="Upload trajectories JSON"),
        gr.File(label="Optional background frame (.jpg)")
    ],
    outputs=[
        gr.Image(label="Dominant Flow Overlay"),
        gr.JSON(label="Flow Stats (Stage 2 Output)")
    ],
    title="🚗 Dominant Flow Learning – Stage 2 (Angle-Based)",
    description=description_text,
    examples=[[example_json, example_bg]] if example_json else None,
)
# Launch only when run as a script (Spaces also execute this entry point).
if __name__ == "__main__":
    demo.launch()