| import os |
# Ask OpenCV's FFmpeg capture backend to carry RTSP over TCP.
# setdefault keeps TCP as the default while letting an operator who already
# exported OPENCV_FFMPEG_CAPTURE_OPTIONS (other transport, extra options)
# keep their value instead of having it silently overwritten.
os.environ.setdefault("OPENCV_FFMPEG_CAPTURE_OPTIONS", "rtsp_transport;tcp")
|
|
| import cv2 |
| import json |
| import time |
| import tempfile |
| import numpy as np |
| from tqdm import tqdm |
|
|
| import torch |
| import gradio as gr |
| from ultralytics import YOLO |
|
|
| |
| |
| |
import ultralytics.nn.tasks as ultralytics_tasks

# Allow-list Ultralytics' DetectionModel for torch's safe-globals mechanism
# before the weights are loaded below. The call is guarded because
# `add_safe_globals` is not available in every PyTorch release; without the
# guard the whole module fails to import with AttributeError on such builds.
if hasattr(torch.serialization, "add_safe_globals"):
    torch.serialization.add_safe_globals([ultralytics_tasks.DetectionModel])

# Weights file handed to YOLO(); the model is created once at import time
# and shared by every request.
MODEL_PATH = "yolov8n.pt"
model = YOLO(MODEL_PATH)

# Class ids passed to the tracker's `classes=` filter.
# NOTE(review): with the stock COCO-trained weights these should map to
# car (2), motorcycle (3), bus (5) and truck (7) - confirm if MODEL_PATH
# is ever pointed at custom weights.
VEHICLE_CLASSES = [2, 3, 5, 7]
|
|
|
|
| |
| |
| |
def _probe_stream_fps(rtsp_url, fallback_fps=25):
    """Open the stream once and return the frame rate it reports.

    Raises:
        RuntimeError: If OpenCV cannot open the stream.
    """
    probe = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
    try:
        if not probe.isOpened():
            raise RuntimeError("Unable to open RTSP stream")
        fps = probe.get(cv2.CAP_PROP_FPS)
    finally:
        # Release the capture on the failure path as well.
        probe.release()
    # Use the fallback when the reported rate is missing or not positive.
    return fps if fps and fps > 0 else fallback_fps


def _draw_trajectories(frame, trajectories):
    """Overlay every recorded track path on *frame* as a thin green polyline."""
    for points in trajectories.values():
        if len(points) < 2:
            continue
        # Truncate to integer pixel coordinates, then draw the whole path in
        # one call instead of one cv2.line call per segment.
        path = np.asarray(points, dtype=np.float64).astype(np.int32)
        cv2.polylines(frame, [path.reshape(-1, 1, 2)], False, (0, 255, 0), 1)


def process_rtsp(rtsp_url, duration_sec=60):
    """Track vehicles on an RTSP stream and record an annotated clip.

    The stream is probed for its frame rate, then fed to the module-level
    YOLO model with ByteTrack. Each processed frame gets a green box per
    tracked object plus the path of every track seen so far, and is written
    to a temporary ``.mp4`` at the probed frame rate.

    Args:
        rtsp_url: Stream URL passed to OpenCV and to ``model.track``.
        duration_sec: Stop once this many seconds have elapsed. The limit is
            checked each time a frame arrives.

    Returns:
        A ``(video_path, trajectories, summary)`` tuple:
        ``video_path`` is the path of the temporary ``.mp4`` (not deleted
        automatically); ``trajectories`` maps each track id (as a string) to
        its list of ``(cx, cy)`` box centres; ``summary`` holds
        ``duration_sec``, ``frames_processed``, ``tracks`` and
        ``avg_fps_est``.

    Raises:
        RuntimeError: If the stream cannot be opened, or if the output video
            writer cannot be opened.
    """
    fps = _probe_stream_fps(rtsp_url)

    # Only the path is needed. Closing the handle straight away avoids leaking
    # it and lets VideoWriter reopen the file by name (an open
    # NamedTemporaryFile cannot be reopened on Windows).
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
        out_path = tmp.name

    trajectories = {}
    frame_count = 0
    writer = None  # created from the first frame, see below
    start_time = time.monotonic()

    stream = model.track(
        source=rtsp_url,
        tracker="bytetrack.yaml",
        stream=True,
        persist=True,
        conf=0.3,
        iou=0.5,
        classes=VEHICLE_CLASSES,
        verbose=False,
    )

    try:
        with tqdm(desc="RTSP Processing", unit="frame") as pbar:
            for result in stream:
                if time.monotonic() - start_time > duration_sec:
                    break

                frame = result.orig_img.copy()
                frame_count += 1

                if writer is None:
                    # Size the writer from the frames actually delivered
                    # rather than from probe metadata, which can be zero or
                    # disagree with what the tracker yields.
                    height, width = frame.shape[:2]
                    writer = cv2.VideoWriter(
                        out_path,
                        cv2.VideoWriter_fourcc(*"mp4v"),
                        fps,
                        (width, height),
                    )
                    if not writer.isOpened():
                        raise RuntimeError(
                            "Unable to open video writer for tracked output"
                        )

                boxes = result.boxes
                # boxes.id is None when nothing is being tracked in this frame.
                if boxes is not None and boxes.id is not None:
                    ids = boxes.id.int().cpu().numpy()
                    xyxy = boxes.xyxy.cpu().numpy()

                    for tid, box in zip(ids, xyxy):
                        x1, y1, x2, y2 = map(int, box)
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                        # Box centre is the trajectory sample for this frame.
                        cx = (x1 + x2) / 2
                        cy = (y1 + y2) / 2
                        trajectories.setdefault(int(tid), []).append(
                            (float(cx), float(cy))
                        )

                _draw_trajectories(frame, trajectories)

                writer.write(frame)
                pbar.update(1)
    finally:
        # Finalise the file even when the loop exits through an exception.
        if writer is not None:
            writer.release()

    elapsed = time.monotonic() - start_time
    summary = {
        "duration_sec": round(elapsed, 1),
        "frames_processed": frame_count,
        "tracks": len(trajectories),
        # Runs shorter than one second are averaged over a full second.
        "avg_fps_est": round(frame_count / max(1, elapsed), 2),
    }

    # JSON object keys must be strings.
    trajectories_json = {str(k): v for k, v in trajectories.items()}

    return out_path, trajectories_json, summary
|
|
|
|
| |
| |
| |
def run(rtsp_url, duration):
    """Gradio callback: process the stream and return the three UI outputs.

    Forwards ``rtsp_url`` and ``duration`` to ``process_rtsp`` and hands back
    its result as ``(video, trajectories, summary)``, in the order the
    interface's output components expect.
    """
    outputs = process_rtsp(rtsp_url, duration)
    # Unpack explicitly so anything other than a 3-item result fails here.
    video_path, trajectory_data, summary_stats = outputs
    return video_path, trajectory_data, summary_stats
|
|
|
|
| |
| |
| |
# Markdown rendered beneath the page title.
description = """
### Dominant Flow Tracker (RTSP - Stage 1)

- RTSP live CCTV / NVR input
- YOLOv8 + ByteTrack
- Bounding boxes only (no labels)
- Trajectory export for dominant flow analysis
"""

# Two inputs (stream URL, capture length) feed `run`; its three return values
# map, in order, onto the video player and the two JSON panels.
demo = gr.Interface(
    fn=run,
    inputs=[
        gr.Textbox(
            label="RTSP URL",
            placeholder="rtsp://user:pass@ip:554/Streaming/Channels/101",
        ),
        gr.Slider(10, 300, value=60, step=10, label="Capture Duration (seconds)"),
    ],
    outputs=[
        gr.Video(label="Tracked Output"),
        gr.JSON(label="Trajectories"),
        gr.JSON(label="Summary"),
    ],
    title="Dominant Flow Tracker - RTSP",
    description=description,
)


if __name__ == "__main__":
    demo.launch()
|
|