# Source: Hugging Face Space by nishanth-saka — commit 9e65b10 ("revert"), file size 5.91 kB.
# (The original web-page header lines were scrape artifacts and are preserved here as a comment.)
import gradio as gr
import numpy as np, cv2, json, tempfile, os
from sklearn.cluster import KMeans
# ============================================================
# ๐Ÿงฉ 1. Compute motion vectors from trajectory JSON
# ============================================================
def extract_motion_vectors(data):
    """Turn per-track point lists into frame-to-frame displacement vectors.

    Each value in *data* is a sequence of (x, y) points for one track.
    Tracks with fewer than two points contribute nothing, and displacements
    whose magnitude is <= 1 pixel are discarded as jitter / static points.
    Returns an ndarray of the surviving displacement vectors.
    """
    motion = []
    for track_pts in data.values():
        track = np.array(track_pts)
        if len(track) < 2:
            continue
        # Consecutive differences: equivalent to np.diff(track, axis=0).
        steps = track[1:] - track[:-1]
        motion.extend(step for step in steps if np.linalg.norm(step) > 1)
    return np.array(motion)
# ============================================================
# ๐Ÿงฎ 2. Improved Dominant Flow Clustering (Cosine-based)
# ============================================================
def learn_flows_improved(vectors, n_clusters=2, normalize=True):
    """Cluster motion vectors into dominant flow directions.

    Speed is deliberately discarded: vectors are reduced to unit directions
    so KMeans on the unit sphere approximates cosine-distance clustering.
    Vectors with magnitude <= 1.5 are excluded from fitting as noise, but
    every input vector is assigned to its most cosine-similar center.

    Returns (labels, centers), or (None, None) when there is not enough
    usable data for *n_clusters* clusters.
    """
    if len(vectors) < n_clusters:
        return None, None

    magnitudes = np.linalg.norm(vectors, axis=1, keepdims=True)
    unit_dirs = vectors / (magnitudes + 1e-6)

    # Fit only on motions above the noise floor.
    strong = unit_dirs[magnitudes[:, 0] > 1.5]
    if len(strong) < n_clusters:
        return None, None

    model = KMeans(n_clusters=n_clusters, n_init=20, random_state=42)
    model.fit(strong)

    # Re-project cluster centers back onto the unit sphere.
    centers = model.cluster_centers_
    centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)

    # Assign ALL original vectors (weak ones included) by cosine similarity.
    cos_sim = unit_dirs @ centers.T
    labels = np.argmax(cos_sim, axis=1)
    return labels, centers
# ============================================================
# ๐ŸŽจ 3. Visualization Utility (Option A โ€” Scaled-up Arrows)
# ============================================================
def draw_flow_overlay(vectors, labels, centers, bg_img=None):
    """Render a sparse motion-vector field plus dominant flow arrows.

    Draws onto *bg_img* when it is a readable image path, otherwise onto a
    dark 900x600 canvas; writes the blended result to a temporary JPEG and
    returns that file's path. Mini-arrow positions are random each call.
    """
    # Background: supplied frame if it loads, else a flat dark canvas.
    canvas = None
    if bg_img and os.path.exists(bg_img):
        canvas = cv2.imread(bg_img)
    if canvas is None:
        canvas = np.full((600, 900, 3), 40, dtype=np.uint8)
    overlay = canvas.copy()
    palette = [(0, 0, 255), (255, 255, 0)]  # BGR: red & yellow per cluster

    # Normalize every vector to a fixed 10px length for the arrow field.
    lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    scaled = np.divide(vectors, lengths + 1e-6) * 10

    # Sparse field: draw roughly 1 in 15 vectors at random positions.
    for idx, ((dx, dy), cluster) in enumerate(zip(scaled, labels)):
        if idx % 15:
            continue
        x0 = np.random.randint(0, overlay.shape[1])
        y0 = np.random.randint(0, overlay.shape[0])
        cv2.arrowedLine(overlay, (x0, y0), (int(x0 + dx), int(y0 + dy)),
                        palette[cluster % 2], 1, tipLength=0.3)

    # Large green arrows for each dominant flow, anchored near the center
    # with a small vertical offset so multiple arrows don't overlap.
    h, w = overlay.shape[:2]
    arrow_len = 300
    cx, cy = w // 2, h // 2
    for idx, center in enumerate(centers):
        direction = center / (np.linalg.norm(center) + 1e-6)
        tip = (int(cx + direction[0] * arrow_len),
               int(cy + direction[1] * arrow_len))
        tail = (cx, int(cy + (idx - 0.5) * 40))
        cv2.arrowedLine(overlay, tail, tip, (0, 255, 0), 4, tipLength=0.4)
        cv2.putText(overlay, f"Flow {idx + 1}", (tip[0] + 10, tip[1]),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

    blended = cv2.addWeighted(canvas, 0.6, overlay, 0.4, 0)
    out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
    cv2.imwrite(out_path, blended)
    return out_path
# ============================================================
# ๐Ÿš€ 4. Combined Pipeline
# ============================================================
def process_json(json_file, background=None):
    """Full pipeline: trajectories JSON -> clustered flows -> overlay image.

    Parameters
    ----------
    json_file : str or file-like
        Path to (or Gradio upload object for) the Stage-1 trajectories JSON.
    background : str or file-like, optional
        Optional background frame for the overlay.

    Returns
    -------
    (str, dict)
        Path to the rendered overlay image and a stats dict, or
        (None, {"error": ...}) on any failure.
    """
    # Gradio may hand us plain str paths or file objects exposing .name;
    # accept both (str has no .name attribute, so getattr falls through).
    json_path = getattr(json_file, "name", json_file)
    bg_path = getattr(background, "name", background)
    try:
        # Context manager: the original json.load(open(...)) leaked the handle.
        with open(json_path) as fh:
            data = json.load(fh)
    except Exception as e:
        return None, {"error": f"Invalid JSON file: {e}"}
    vectors = extract_motion_vectors(data)
    if len(vectors) == 0:
        return None, {"error": "No motion vectors found."}
    labels, centers = learn_flows_improved(vectors)
    if labels is None:
        return None, {"error": "Insufficient data for clustering."}
    # Defensive re-normalization before drawing (centers should already be unit).
    centers = centers / (np.linalg.norm(centers, axis=1, keepdims=True) + 1e-6)
    img_path = draw_flow_overlay(vectors, labels, centers, bg_path)
    stats = {
        "num_vectors": int(len(vectors)),
        "dominant_flows": int(len(centers)),
        "flow_centers": centers.tolist(),
    }
    return img_path, stats
# ============================================================
# ๐Ÿ–ฅ๏ธ 5. Gradio Interface
# ============================================================
# Markdown copy shown above the Gradio inputs (emoji bytes preserved as-is).
description_text = """
### ๐Ÿงญ Dominant Flow Learning (Stage 2 โ€” Cosine-Based Improved)
Upload the **trajectories JSON** from Stage 1.
Optionally upload a background frame for overlay visualization.
"""
# Optional bundled examples: wired up only when the files ship with the app.
example_json = "trajectories_sample.json" if os.path.exists("trajectories_sample.json") else None
example_bg = "frame_sample.jpg" if os.path.exists("frame_sample.jpg") else None
# Gradio UI wiring: two file inputs -> (overlay image, stats JSON).
demo = gr.Interface(
    fn=process_json,
    inputs=[
        gr.File(label="Upload trajectories JSON"),
        gr.File(label="Optional background frame (.jpg)")
    ],
    outputs=[
        gr.Image(label="Dominant Flow Overlay"),
        gr.JSON(label="Flow Stats")
    ],
    title="๐Ÿš— Dominant Flow Learning โ€“ Stage 2 (Cosine-Based Improved)",
    description=description_text,
    # Examples row only appears when the sample JSON exists on disk.
    examples=[[example_json, example_bg]] if example_json else None,
)
if __name__ == "__main__":
    demo.launch()