nishanth-saka's picture
BBX only
8754ca5 verified
import os
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
import cv2
import json
import time
import tempfile
import numpy as np
from tqdm import tqdm
import torch
import gradio as gr
from ultralytics import YOLO
# -------------------------------
# PyTorch safe load (HF fix)
# -------------------------------
import ultralytics.nn.tasks as ultralytics_tasks
torch.serialization.add_safe_globals([ultralytics_tasks.DetectionModel])
# -------------------------------
# Model init
# -------------------------------
MODEL_PATH = "yolov8n.pt"
model = YOLO(MODEL_PATH)
# Vehicle classes from COCO
VEHICLE_CLASSES = [2, 3, 5, 7] # car, motorcycle, bus, truck
# -------------------------------
# RTSP Processor
# -------------------------------
def process_rtsp(rtsp_url, duration_sec=60):
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
raise RuntimeError("❌ Unable to open RTSP stream")
fps = cap.get(cv2.CAP_PROP_FPS)
fps = fps if fps and fps > 0 else 25
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# Output video
out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
writer = cv2.VideoWriter(
out_file.name,
cv2.VideoWriter_fourcc(*"mp4v"),
fps,
(w, h),
)
trajectories = {}
start_time = time.time()
frame_count = 0
stream = model.track(
source=rtsp_url,
tracker="bytetrack.yaml",
stream=True,
persist=True,
conf=0.3,
iou=0.5,
classes=VEHICLE_CLASSES,
verbose=False,
)
pbar = tqdm(desc="RTSP Processing", unit="frame")
for result in stream:
if time.time() - start_time > duration_sec:
break
frame = result.orig_img.copy()
frame_count += 1
boxes = result.boxes
if boxes is not None and boxes.id is not None:
ids = boxes.id.int().cpu().numpy()
xyxy = boxes.xyxy.cpu().numpy()
for tid, box in zip(ids, xyxy):
x1, y1, x2, y2 = map(int, box)
# Draw bounding box ONLY
cv2.rectangle(
frame,
(x1, y1),
(x2, y2),
(0, 255, 0),
2,
)
# Update trajectory
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
trajectories.setdefault(int(tid), []).append((float(cx), float(cy)))
# Draw trajectories
for tid, pts in trajectories.items():
if len(pts) < 2:
continue
for i in range(1, len(pts)):
cv2.line(
frame,
(int(pts[i - 1][0]), int(pts[i - 1][1])),
(int(pts[i][0]), int(pts[i][1])),
(0, 255, 0),
1,
)
writer.write(frame)
pbar.update(1)
writer.release()
pbar.close()
summary = {
"duration_sec": round(time.time() - start_time, 1),
"frames_processed": frame_count,
"tracks": len(trajectories),
"avg_fps_est": round(frame_count / max(1, time.time() - start_time), 2),
}
# JSON-safe keys for Gradio
trajectories_json = {str(k): v for k, v in trajectories.items()}
return out_file.name, trajectories_json, summary
# -------------------------------
# Gradio wrapper
# -------------------------------
def run(rtsp_url, duration):
video, traj, stats = process_rtsp(rtsp_url, duration)
return video, traj, stats
# -------------------------------
# UI
# -------------------------------
description = """
### 🚦 Dominant Flow Tracker (RTSP – Stage 1)
- RTSP live CCTV / NVR input
- YOLOv8 + ByteTrack
- Bounding boxes only (no labels)
- Trajectory export for dominant flow analysis
"""
demo = gr.Interface(
fn=run,
inputs=[
gr.Textbox(
label="RTSP URL",
placeholder="rtsp://user:pass@ip:554/Streaming/Channels/101",
),
gr.Slider(10, 300, value=60, step=10, label="Capture Duration (seconds)"),
],
outputs=[
gr.Video(label="Tracked Output"),
gr.JSON(label="Trajectories"),
gr.JSON(label="Summary"),
],
title="πŸš— Dominant Flow Tracker – RTSP",
description=description,
)
if __name__ == "__main__":
demo.launch()