import torch
import gradio as gr
import cv2, os, numpy as np, tempfile, time, json
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment
from tqdm import tqdm
from sklearn.cluster import KMeans
# --- 🔧 PyTorch 2.6 safe-load fix ---
# torch.load defaults to weights_only=True as of PyTorch 2.6, so Ultralytics'
# DetectionModel class must be allow-listed before loading the checkpoint.
import ultralytics.nn.tasks as ultralytics_tasks
torch.serialization.add_safe_globals([ultralytics_tasks.DetectionModel])
# -----------------------------------
from ultralytics import YOLO
# ---------------------------------------------------------
# ⚙️ INIT
# ---------------------------------------------------------
MODEL_PATH = "yolov8n.pt"
model = YOLO(MODEL_PATH)
VEHICLE_CLASSES = [2, 3, 5, 7] # car, motorcycle, bus, truck
# ---------------------------------------------------------
# 🔁 SIMPLE KALMAN TRACKER
# ---------------------------------------------------------
class Track:
def __init__(self, bbox, track_id):
self.id = track_id
self.kf = KalmanFilter(dim_x=4, dim_z=2)
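        # State x = [cx, cy, vx, vy] with a constant-velocity model (dt = 1 frame);
        # only the centroid (cx, cy) is measured, so H selects the first two states.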
self.kf.F = np.array([[1,0,1,0],
[0,1,0,1],
[0,0,1,0],
[0,0,0,1]])
self.kf.H = np.array([[1,0,0,0],
[0,1,0,0]])
self.kf.P *= 1000.0
self.kf.R *= 10.0
self.kf.x[:2] = np.array(self.get_centroid(bbox)).reshape(2,1)
self.trace = []
self.vel_history = []
def get_centroid(self, bbox):
x1,y1,x2,y2 = bbox
return [(x1+x2)/2,(y1+y2)/2]
def predict(self):
self.kf.predict()
return self.kf.x[:2].reshape(2)
def update(self, bbox):
z = np.array(self.get_centroid(bbox)).reshape(2,1)
self.kf.update(z)
cx, cy = self.kf.x[:2].reshape(2)
# Save smoothed velocity
vx, vy = self.kf.x[2], self.kf.x[3]
self.vel_history.append([float(vx), float(vy)])
self.trace.append((float(cx), float(cy)))
return (cx, cy)
# ---------------------------------------------------------
# 🧠 AUTO-DETECT DOMINANT FLOW
# ---------------------------------------------------------
def compute_dominant_direction(all_velocities):
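    """Estimate the dominant traffic direction as a unit vector.

    Near-zero velocity samples are discarded, the rest are normalized and
    clustered with KMeans (k=2); the mean direction of the larger cluster is
    returned. Falls back to [0, -1] (straight up in image coordinates) when
    too few samples are available.
    """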
if len(all_velocities) < 20:
return np.array([0, -1]) # fallback (upwards)
V = np.array(all_velocities)
# Filter out tiny noise
mags = np.linalg.norm(V, axis=1)
V = V[mags > 0.5]
if len(V) < 10:
return np.array([0, -1])
# Normalize velocities
Vn = V / (np.linalg.norm(V, axis=1, keepdims=True) + 1e-6)
    # Cluster with KMeans (two opposing flows are typical on most roads)
kmeans = KMeans(n_clusters=2, n_init=10)
labels = kmeans.fit_predict(Vn)
# Largest cluster = dominant flow
counts = np.bincount(labels)
dominant_cluster = np.argmax(counts)
dominant_vec = Vn[labels == dominant_cluster].mean(axis=0)
dominant_vec /= (np.linalg.norm(dominant_vec) + 1e-6)
return dominant_vec
# ---------------------------------------------------------
# 🎥 MAIN PROCESSOR
# ---------------------------------------------------------
def process_video(video_path):
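    """Track vehicles frame by frame, flag wrong-way motion, and write an
    annotated copy of the video.

    Returns a tuple of (annotated_video_path, trajectories_json_path).
    """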
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 25
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
temp_out = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
out = cv2.VideoWriter(temp_out.name, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
tracks = []
next_id = 0
trajectories = {}
all_velocities = []
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
pbar = tqdm(total=total_frames if total_frames>0 else 100, desc="Processing")
frame_count = 0
dominant_vector = None
while True:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# --- YOLO DETECTION ---
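        # Run YOLOv8 on the frame and keep vehicle-class boxes with
        # confidence > 0.3 as [x1, y1, x2, y2] arrays.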
results = model(frame, verbose=False)[0]
detections = []
for box in results.boxes:
cls = int(box.cls)
if cls in VEHICLE_CLASSES and box.conf > 0.3:
detections.append(box.xyxy[0].cpu().numpy())
# --- PREDICT EXISTING TRACKS ---
predicted = [trk.predict() for trk in tracks]
predicted = np.array(predicted) if predicted else np.empty((0,2))
# --- ASSIGN DETECTIONS ---
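        # Hungarian assignment (linear_sum_assignment) on a cost matrix of
        # Euclidean distances (in pixels) between each track's predicted centroid
        # and each detection's centroid; matches farther than 80 px are rejected.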
assigned = set()
if len(predicted) > 0 and len(detections) > 0:
cost = np.zeros((len(predicted), len(detections)))
for i, trk in enumerate(predicted):
for j, det in enumerate(detections):
cx, cy = ( (det[0]+det[2])/2 , (det[1]+det[3])/2 )
cost[i,j] = np.linalg.norm(trk - np.array([cx,cy]))
row_ind, col_ind = linear_sum_assignment(cost)
for r, c in zip(row_ind, col_ind):
if cost[r, c] < 80:
assigned.add(c)
tracks[r].update(detections[c])
# --- NEW TRACKS ---
for j, det in enumerate(detections):
if j not in assigned:
trk = Track(det, next_id)
next_id += 1
trk.update(det)
tracks.append(trk)
# --- COLLECT VELOCITIES FOR DOMINANT FLOW ---
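        # Velocity samples are pooled over roughly the first 4 seconds and used
        # to estimate the dominant flow; compute_dominant_direction's upward
        # fallback is used while too few samples are available.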
if frame_count < int(fps * 4): # first 4 seconds for learning
for trk in tracks:
if len(trk.vel_history) > 1:
all_velocities.append(trk.vel_history[-1])
# Compute dominant flow once enough samples are available
if frame_count == int(fps * 4):
dominant_vector = compute_dominant_direction(all_velocities)
else:
# Fallback if video too short
if dominant_vector is None:
dominant_vector = compute_dominant_direction(all_velocities)
# --- DRAW OUTPUT ---
for trk in tracks:
if len(trk.trace) < 2:
continue
x, y = map(int, trk.trace[-1])
# compute smoothed motion direction
if len(trk.vel_history) >= 1:
vx, vy = trk.vel_history[-1]
mv = np.array([vx, vy])
else:
mv = np.array([0, 0])
mv_norm = mv / (np.linalg.norm(mv) + 1e-6)
# cosine similarity with dominant direction
if dominant_vector is not None:
cos_sim = float(np.dot(mv_norm, dominant_vector))
else:
cos_sim = 1.0
# wrong-way logic
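            # cos_sim < -0.3 -> against the dominant flow (red, "WRONG")
            # -0.3..0.1      -> ambiguous / crossing traffic (orange, "?")
            # otherwise      -> with the flow (green)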
if cos_sim < -0.3:
color = (0, 0, 255)
label = f"ID:{trk.id} WRONG"
elif cos_sim < 0.1:
color = (0, 140, 255)
label = f"ID:{trk.id} ?"
else:
color = (0, 255, 0)
label = f"ID:{trk.id}"
# draw ID + path
cv2.circle(frame, (x, y), 4, color, -1)
cv2.putText(frame, label, (x-10, y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
for i in range(1, len(trk.trace)):
cv2.line(frame,
(int(trk.trace[i-1][0]), int(trk.trace[i-1][1])),
(int(trk.trace[i][0]), int(trk.trace[i][1])),
color, 1)
trajectories[trk.id] = trk.trace
out.write(frame)
pbar.update(1)
cap.release()
out.release()
pbar.close()
# Save trajectories JSON
traj_json = tempfile.NamedTemporaryFile(delete=False, suffix=".json")
with open(traj_json.name, "w") as f:
json.dump(trajectories, f)
return temp_out.name, traj_json.name
# ---------------------------------------------------------
# 📀 WRAPPER FOR GRADIO
# ---------------------------------------------------------
def run_app(video_file):
    # Copy the uploaded file to a temp path. Depending on the Gradio version,
    # video_file may arrive as a filepath string or as a dict with a "name" key.
temp_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
if isinstance(video_file, dict) and "name" in video_file:
src_path = video_file["name"]
else:
src_path = video_file
with open(src_path, "rb") as src, open(temp_path, "wb") as dst:
dst.write(src.read())
start = time.time()
out_path, json_path = process_video(temp_path)
end = time.time()
    with open(json_path) as f:
        trajectories = json.load(f)
    src_cap = cv2.VideoCapture(temp_path)
    src_fps = round(src_cap.get(cv2.CAP_PROP_FPS), 2)
    src_cap.release()
    summary = {
        "total_time_sec": round(end - start, 1),
        "num_tracks": len(trajectories),
        "source_fps": src_fps  # FPS of the uploaded video, not processing throughput
    }
    return out_path, trajectories, summary
# ---------------------------------------------------------
# 🖥️ INTERFACE
# ---------------------------------------------------------
description_text = """
### 🚦 Dominant Flow Tracker (Stage 1)
Now with **auto-learned wrong-way detection**
- YOLOv8 detection + Kalman tracking
- Automatic dominant-direction estimation
- Wrong-way vehicles annotated in red
"""
demo = gr.Interface(
fn=run_app,
inputs=gr.Video(label="Upload Video (.mp4)"),
outputs=[
gr.Video(label="Tracked Output (Wrong-Way Highlighted)"),
gr.JSON(label="Trajectories"),
gr.JSON(label="Summary Stats")
],
    title="🚗 Stage-1 Auto Wrong-Way Tracker",
description=description_text
)
if __name__ == "__main__":
demo.launch()