nishanth-saka's picture
Auto lane detection (#12)
1d74031 verified
import gradio as gr
import numpy as np, cv2, json, tempfile, os
from sklearn.cluster import KMeans
# ============================================================
# 🧩 1. Compute motion vectors from trajectory JSON
# ============================================================
def extract_motion_vectors(data):
vectors = []
for k, pts in data.items():
pts = np.array(pts)
if len(pts) < 2:
continue
diffs = np.diff(pts, axis=0)
for d in diffs:
if np.linalg.norm(d) > 1: # ignore jitter / static
vectors.append(d)
return np.array(vectors)
# ============================================================
# 🧮 2. Auto-Adaptive Dominant Flow Clustering (with dominance sort)
# ============================================================
def learn_flows_auto(vectors, normalize=True, max_clusters=2):
"""
Automatically chooses 1 or 2 flow clusters depending on angular spread.
Also returns cluster membership counts to rank dominance.
"""
if len(vectors) < 3:
return None, None, None
# Normalize
norms = np.linalg.norm(vectors, axis=1, keepdims=True)
dirs = vectors / (norms + 1e-6)
valid = (norms[:, 0] > 1.5)
dirs = dirs[valid]
if len(dirs) < 3:
return None, None, None
# Angular spread check
angles = np.degrees(np.arctan2(dirs[:, 1], dirs[:, 0]))
angles = (angles + 360) % 360
spread = np.ptp(angles)
n_clusters = 1 if spread < 15 else max_clusters
# Cluster
kmeans = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
kmeans.fit(dirs)
labels = kmeans.labels_
centers = kmeans.cluster_centers_
# Normalize centers again
centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
# --- Dominance sorting by cluster size ---
counts = np.bincount(labels)
order = np.argsort(-counts) # descending
centers = centers[order]
# remap labels to new dominance order
remap = {old: new for new, old in enumerate(order)}
labels = np.array([remap[l] for l in labels])
return labels, centers, counts[order]
# ============================================================
# 🎨 3. Visualization Utility
# ============================================================
def draw_flow_overlay(vectors, labels, centers, counts, bg_img=None):
if bg_img and os.path.exists(bg_img):
bg = cv2.imread(bg_img)
if bg is None:
bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
else:
bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
overlay = bg.copy()
colors = [(0, 0, 255), (255, 255, 0), (255, 0, 255)]
# normalize arrow lengths
norms = np.linalg.norm(vectors, axis=1, keepdims=True)
vectors = np.divide(vectors, norms + 1e-6) * 10
# small random field arrows
for i, ((vx, vy), lab) in enumerate(zip(vectors, labels)):
if i % 15 != 0:
continue
start = (np.random.randint(0, overlay.shape[1]),
np.random.randint(0, overlay.shape[0]))
end = (int(start[0] + vx), int(start[1] + vy))
cv2.arrowedLine(overlay, start, end, colors[lab % len(colors)], 1, tipLength=0.3)
# --- Main dominant arrows ---
h, w = overlay.shape[:2]
scale = 300
center_pt = (w // 2, h // 2)
for i, (c, count) in enumerate(zip(centers, counts)):
c = c / (np.linalg.norm(c) + 1e-6)
end = (int(center_pt[0] + c[0] * scale),
int(center_pt[1] + c[1] * scale))
offset = (i - 0.5) * 40
start = (center_pt[0], int(center_pt[1] + offset))
cv2.arrowedLine(overlay, start, end, (0, 255, 0), 4, tipLength=0.4)
cv2.putText(
overlay,
f"Flow {i+1} ({count} vecs)",
(end[0] + 10, end[1]),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(0, 255, 0),
2,
)
combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
cv2.imwrite(out_path, combined)
return out_path
# ============================================================
# 🚀 4. Combined Pipeline
# ============================================================
def process_json(json_file, background=None):
try:
data = json.load(open(json_file))
except Exception as e:
return None, {"error": f"Invalid JSON file: {e}"}
vectors = extract_motion_vectors(data)
if len(vectors) == 0:
return None, {"error": "No motion vectors found."}
labels, centers, counts = learn_flows_auto(vectors)
if labels is None:
return None, {"error": "Insufficient data for clustering."}
centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
img_path = draw_flow_overlay(vectors, labels, centers, counts, background)
stats = {
"num_vectors": int(len(vectors)),
"dominant_flows": int(len(centers)),
"flow_counts": counts.tolist(),
"flow_centers": centers.tolist(),
}
return img_path, stats
# ============================================================
# 🖥️ 5. Gradio Interface
# ============================================================
description_text = """
### 🧭 Dominant Flow Learning (Stage 2 – Auto + Dominance)
Automatically detects if traffic is **one-way** or **two-way**
and orders flows by **vehicle count** so Flow 1 is the true dominant direction.
"""
example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
example_bg = "frame_sample.jpg" if os.path.exists("frame_sample.jpg") else None
demo = gr.Interface(
fn=process_json,
inputs=[
gr.File(label="Upload trajectories JSON"),
gr.File(label="Optional background frame (.jpg)")
],
outputs=[
gr.Image(label="Dominant Flow Overlay"),
gr.JSON(label="Flow Stats")
],
title="🚗 Dominant Flow Learning – Stage 2 (Auto + Dominance)",
description=description_text,
examples=[[example_json, example_bg]] if example_json else None,
)
if __name__ == "__main__":
demo.launch()