| import torch |
| import gradio as gr |
| import cv2, os, numpy as np, tempfile, time, json |
| from filterpy.kalman import KalmanFilter |
| from scipy.optimize import linear_sum_assignment |
| from tqdm import tqdm |
| from sklearn.cluster import KMeans |
|
|
| |
# Workaround for PyTorch >= 2.6, where torch.load defaults to weights_only=True:
# allow-list the Ultralytics DetectionModel class so YOLO checkpoints can be
# deserialized without disabling the weights_only safety check globally.
import ultralytics.nn.tasks as ultralytics_tasks
torch.serialization.add_safe_globals([ultralytics_tasks.DetectionModel])

from ultralytics import YOLO
|
|
|
|
| |
| |
| |
# Path to the YOLOv8 detection weights (the "nano" variant: fastest, least accurate).
MODEL_PATH = "yolov8n.pt"
# Shared detector instance, loaded once at import time.
model = YOLO(MODEL_PATH)

# COCO class ids kept as "vehicles" — presumably 2=car, 3=motorcycle, 5=bus,
# 7=truck (standard COCO mapping used by YOLOv8); verify against the model's
# class names if the weights change.
VEHICLE_CLASSES = [2, 3, 5, 7]
|
|
|
|
| |
| |
| |
class Track:
    """One tracked vehicle.

    Wraps a constant-velocity Kalman filter over the bounding-box centroid
    and accumulates the smoothed positions (`trace`) and per-update
    velocities (`vel_history`).
    """

    def __init__(self, bbox, track_id):
        self.id = track_id
        # State vector is [cx, cy, vx, vy]; only the centroid is observed.
        self.kf = KalmanFilter(dim_x=4, dim_z=2)
        # Constant-velocity transition: position advances by velocity each step.
        self.kf.F = np.array([
            [1, 0, 1, 0],
            [0, 1, 0, 1],
            [0, 0, 1, 0],
            [0, 0, 0, 1],
        ])
        # Measurement picks out the (cx, cy) components of the state.
        self.kf.H = np.array([
            [1, 0, 0, 0],
            [0, 1, 0, 0],
        ])
        self.kf.P *= 1000.0  # large initial state uncertainty
        self.kf.R *= 10.0    # measurement noise

        # Seed the position with the first detection's centroid.
        self.kf.x[:2] = np.array(self.get_centroid(bbox)).reshape(2, 1)
        self.trace = []        # (cx, cy) tuples, one per update()
        self.vel_history = []  # [vx, vy] pairs, one per update()

    def get_centroid(self, bbox):
        """Return the [cx, cy] centre of an (x1, y1, x2, y2) box."""
        x1, y1, x2, y2 = bbox
        return [(x1 + x2) / 2, (y1 + y2) / 2]

    def predict(self):
        """Advance the filter one step and return the predicted centroid."""
        self.kf.predict()
        return self.kf.x[:2].reshape(2)

    def update(self, bbox):
        """Fold a matched detection into the filter.

        Returns the smoothed (cx, cy) centroid and records both the
        centroid and the current velocity estimate.
        """
        measurement = np.array(self.get_centroid(bbox)).reshape(2, 1)
        self.kf.update(measurement)

        cx, cy = self.kf.x[:2].reshape(2)
        vx, vy = self.kf.x[2], self.kf.x[3]

        self.vel_history.append([float(vx), float(vy)])
        self.trace.append((float(cx), float(cy)))
        return (cx, cy)
|
|
|
|
| |
| |
| |
def compute_dominant_direction(all_velocities):
    """Estimate the dominant unit direction of traffic flow.

    Parameters
    ----------
    all_velocities : sequence of [vx, vy]
        Velocity samples pooled from all tracks during warm-up.

    Returns
    -------
    np.ndarray of shape (2,)
        Unit vector of the dominant flow direction. Falls back to (0, -1)
        (screen-up) when there are too few reliable samples.
    """
    # Not enough evidence yet: assume traffic moves "up" the frame.
    if len(all_velocities) < 20:
        return np.array([0, -1])

    V = np.asarray(all_velocities, dtype=float)

    # Discard near-stationary samples; they carry no direction information.
    speeds = np.linalg.norm(V, axis=1)
    V = V[speeds > 0.5]
    if len(V) < 10:
        return np.array([0, -1])

    # Normalize to unit vectors so clustering captures direction, not speed.
    Vn = V / (np.linalg.norm(V, axis=1, keepdims=True) + 1e-6)

    # Two clusters: typically the two driving directions of a road.
    # random_state pins the k-means init so identical input yields an
    # identical direction (the original was nondeterministic across runs).
    kmeans = KMeans(n_clusters=2, n_init=10, random_state=0)
    labels = kmeans.fit_predict(Vn)

    # The better-populated cluster is taken as the dominant flow.
    counts = np.bincount(labels)
    dominant_cluster = np.argmax(counts)

    # Mean direction of the dominant cluster, re-normalized to unit length.
    dominant_vec = Vn[labels == dominant_cluster].mean(axis=0)
    dominant_vec /= (np.linalg.norm(dominant_vec) + 1e-6)

    return dominant_vec
|
|
|
|
| |
| |
| |
def process_video(video_path):
    """Run detection + tracking over a video and render an annotated copy.

    Per-frame pipeline:
      1. YOLO detects vehicles (classes in VEHICLE_CLASSES, conf > 0.3).
      2. Every live track predicts its next centroid (Kalman).
      3. Hungarian assignment matches detections to predictions, gated at
         80 px; unmatched detections spawn new tracks.
      4. During the first ~4 seconds per-track velocities are pooled and a
         dominant flow direction is learned; afterwards each track is
         colour-coded by how well its motion agrees with that direction
         (green = with flow, orange = ambiguous, red = wrong way).

    Returns
    -------
    (str, str)
        Path of the annotated .mp4 and path of a JSON file mapping
        track id -> list of (cx, cy) trace points.
    """
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS) or 25  # some containers report 0
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Reserve an output path, then close the handle immediately: only the
    # VideoWriter should hold the file open (the original leaked the open
    # NamedTemporaryFile handle, which also breaks on Windows).
    temp_out = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    out_video_path = temp_out.name
    temp_out.close()
    out = cv2.VideoWriter(out_video_path, cv2.VideoWriter_fourcc(*"mp4v"),
                          fps, (w, h))

    tracks = []
    next_id = 0
    trajectories = {}    # track id -> trace, dumped to JSON at the end
    all_velocities = []  # pooled warm-up velocities for flow estimation

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    pbar = tqdm(total=total_frames if total_frames > 0 else 100,
                desc="Processing")

    frame_count = 0
    dominant_vector = None
    warmup_frames = int(fps * 4)  # learn the flow from the first ~4 seconds

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        # --- 1. Detect vehicles ------------------------------------------
        results = model(frame, verbose=False)[0]
        detections = []
        for box in results.boxes:
            cls = int(box.cls)
            if cls in VEHICLE_CLASSES and box.conf > 0.3:
                detections.append(box.xyxy[0].cpu().numpy())

        # --- 2. Predict every live track forward -------------------------
        predicted = [trk.predict() for trk in tracks]
        predicted = np.array(predicted) if predicted else np.empty((0, 2))

        # --- 3. Match detections to tracks (Hungarian, 80 px gate) -------
        assigned = set()
        if len(predicted) > 0 and len(detections) > 0:
            cost = np.zeros((len(predicted), len(detections)))
            for i, trk in enumerate(predicted):
                for j, det in enumerate(detections):
                    cx, cy = ((det[0] + det[2]) / 2, (det[1] + det[3]) / 2)
                    cost[i, j] = np.linalg.norm(trk - np.array([cx, cy]))

            row_ind, col_ind = linear_sum_assignment(cost)
            for r, c in zip(row_ind, col_ind):
                if cost[r, c] < 80:
                    assigned.add(c)
                    tracks[r].update(detections[c])

        # Unmatched detections start new tracks.
        for j, det in enumerate(detections):
            if j not in assigned:
                trk = Track(det, next_id)
                next_id += 1
                trk.update(det)
                tracks.append(trk)
        # NOTE(review): tracks are never retired, so lost vehicles keep
        # predicting forever and their traces keep being drawn; consider an
        # age/miss counter if long videos are expected.

        # --- 4. Learn the dominant flow direction during warm-up ---------
        if frame_count < warmup_frames:
            for trk in tracks:
                if len(trk.vel_history) > 1:
                    all_velocities.append(trk.vel_history[-1])

        if frame_count == warmup_frames:
            # End of warm-up: estimate the flow from the pooled velocities.
            dominant_vector = compute_dominant_direction(all_velocities)
        elif dominant_vector is None:
            # Hit on frame 1: yields the (0, -1) default until enough
            # samples exist, so early frames are annotated conservatively.
            dominant_vector = compute_dominant_direction(all_velocities)

        # --- 5. Annotate every track with at least two trace points ------
        for trk in tracks:
            if len(trk.trace) < 2:
                continue

            x, y = map(int, trk.trace[-1])

            # Latest velocity estimate for this track (zero if none yet).
            if len(trk.vel_history) >= 1:
                vx, vy = trk.vel_history[-1]
                mv = np.array([vx, vy])
            else:
                mv = np.array([0, 0])

            mv_norm = mv / (np.linalg.norm(mv) + 1e-6)

            # Cosine similarity between this track's motion and the flow.
            if dominant_vector is not None:
                cos_sim = float(np.dot(mv_norm, dominant_vector))
            else:
                cos_sim = 1.0

            # Colour code by agreement with the dominant direction.
            if cos_sim < -0.3:
                color = (0, 0, 255)        # red: against the flow
                label = f"ID:{trk.id} WRONG"
            elif cos_sim < 0.1:
                color = (0, 140, 255)      # orange: ambiguous
                label = f"ID:{trk.id} ?"
            else:
                color = (0, 255, 0)        # green: with the flow
                label = f"ID:{trk.id}"

            cv2.circle(frame, (x, y), 4, color, -1)
            cv2.putText(frame, label, (x - 10, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            # Draw the full trace as a polyline.
            for i in range(1, len(trk.trace)):
                cv2.line(frame,
                         (int(trk.trace[i - 1][0]), int(trk.trace[i - 1][1])),
                         (int(trk.trace[i][0]), int(trk.trace[i][1])),
                         color, 1)

            trajectories[trk.id] = trk.trace

        out.write(frame)
        pbar.update(1)

    cap.release()
    out.release()
    pbar.close()

    # Persist trajectories. mkstemp + with-block so no handle is leaked
    # (the original kept the NamedTemporaryFile open for the process
    # lifetime alongside the explicit open()).
    fd, json_path = tempfile.mkstemp(suffix=".json")
    os.close(fd)
    with open(json_path, "w") as f:
        json.dump(trajectories, f)

    return out_video_path, json_path
|
|
|
|
|
|
| |
| |
| |
def run_app(video_file):
    """Gradio entry point.

    Copies the upload to a private temp file, runs `process_video`, and
    returns (annotated video path, trajectories dict, summary stats).
    Accepts either a plain file path or the dict form some Gradio
    versions pass ({"name": path}).
    """
    # Reserve a private copy path; close the handle right away so only our
    # explicit open() below touches the file (the original leaked it).
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    temp_path = tmp.name
    tmp.close()

    if isinstance(video_file, dict) and "name" in video_file:
        src_path = video_file["name"]
    else:
        src_path = video_file
    with open(src_path, "rb") as src, open(temp_path, "wb") as dst:
        dst.write(src.read())

    start = time.time()
    out_path, json_path = process_video(temp_path)
    elapsed = time.time() - start

    # Parse the trajectories exactly once (the original ran json.load on a
    # freshly opened file twice and leaked both handles).
    with open(json_path) as f:
        trajectories = json.load(f)

    # Probe the source FPS for the summary, then release the capture.
    cap = cv2.VideoCapture(temp_path)
    src_fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()

    summary = {
        "total_time_sec": round(elapsed, 1),
        "num_tracks": len(trajectories),
        "avg_fps": round(src_fps, 2),
    }

    return out_path, trajectories, summary
|
|
|
|
| |
| |
| |
# Markdown blurb rendered above the Gradio interface.
description_text = """
### π¦ Dominant Flow Tracker (Stage 1)
Now with **Auto-Learn Wrong-Way Detection**
- YOLOv8 + Kalman Tracking
- Auto-dominant direction estimation
- Wrong-Way annotation (RED)
"""

# Single-function Gradio app: upload a video, receive the annotated video,
# the raw per-track trajectories, and run statistics.
demo = gr.Interface(
    fn=run_app,
    inputs=gr.Video(label="Upload Video (.mp4)"),
    outputs=[
        gr.Video(label="Tracked Output (Wrong-Way Highlighted)"),
        gr.JSON(label="Trajectories"),
        gr.JSON(label="Summary Stats")
    ],
    title="π Stage-1 Auto Wrong-Way Tracker",
    description=description_text
)

# Launch the web UI only when run as a script (not on import).
if __name__ == "__main__":
    demo.launch()
|
|