# Import Libraries
import os
import cv2
import sys
import time
import numpy as np
from collections import defaultdict, deque
# Add ByteTrack to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'ByteTrack'))
from utils import *
import speed_config
from tqdm import tqdm
from ultralytics import YOLO
import supervision as sv
from yolox.tracker.byte_tracker import BYTETracker, STrack
class vehicle_tracker_and_counter:
    """Detect, track, count and speed-estimate vehicles in a video file.

    Every frame is passed through a YOLOv8 detector, the detections are
    associated across frames with ``BYTETracker``, tracked boxes are fed to a
    line-crossing counter, and a per-track speed is derived from
    view-transformed positions. Annotated frames are written to
    ``target_video_path``.

    NOTE(review): ``BYTETrackerArgs``, ``ViewTransformer``,
    ``detections2boxes`` and ``match_detections_with_tracks`` are not defined
    in this file; presumably they come from ``from utils import *`` --
    confirm against ``utils``.
    """

    def __init__(self,
                 source_video_path="assets/vehicle-counting.mp4",
                 target_video_path="assets/vehicle-counting-result.mp4",
                 use_tensorrt=False):
        """Load the detector and prepare tracker, counter and annotators.

        Args:
            source_video_path: Path of the video to analyse.
            target_video_path: Path the annotated video is written to.
            use_tensorrt: When true, run inference through a TensorRT engine,
                exporting it from the PyTorch weights if it cannot be loaded.
        """
        # YOLOv8 Object Detector
        self.model_name = "yolov8x.pt"
        self.yolo = YOLO(self.model_name)
        if use_tensorrt:
            # Same file name the export step produces ('yolov8x.engine').
            engine_path = os.path.splitext(self.model_name)[0] + '.engine'
            try:
                # Try to load the engine if it has already been exported
                self.model = YOLO(engine_path)
            except Exception:
                # Not a bare ``except`` so Ctrl-C / SystemExit still propagate.
                # Export the engine, then load the exported TensorRT model.
                self.yolo.export(format='engine')
                self.model = YOLO(engine_path)
        else:
            self.model = self.yolo
            # Layer fusion only applies to the PyTorch model, so it is not
            # attempted on the exported engine.
            self.model.fuse()
        self.CLASS_NAMES_DICT = self.yolo.model.names
        # Class ids kept after detection.
        # NOTE(review): presumably COCO car/motorcycle/bus/truck -- confirm.
        self.CLASS_ID = [2, 3, 5, 7]

        # Line for counter.
        # NOTE(review): coordinates look tuned for 3840-pixel-wide footage.
        self.line_start = sv.Point(50, 1500)
        self.line_end = sv.Point(3840 - 50, 1500)

        # BYTETracker Object Tracker
        self.byte_tracker = BYTETracker(BYTETrackerArgs())

        # Video input and output path
        self.source_video_path = source_video_path
        self.target_video_path = target_video_path

        # Create VideoInfo instance
        self.video_info = sv.VideoInfo.from_video_path(self.source_video_path)

        # Speed calculation setup: per-tracker history of transformed y
        # positions, capped at ``fps`` samples.
        self.view_transformer = ViewTransformer(speed_config.SOURCE, speed_config.TARGET)
        self.coordinates = defaultdict(lambda: deque(maxlen=self.video_info.fps))

        # Create frame generator (consumed by ``run``)
        self.generator = sv.get_video_frames_generator(self.source_video_path)

        # Create LineCounter instance
        self.line_counter = sv.LineZone(start=self.line_start, end=self.line_end)

        # Create instances of BoxAnnotator, LabelAnnotator and LineZoneAnnotator
        resolution_wh = self.video_info.resolution_wh
        thickness = sv.calculate_optimal_line_thickness(resolution_wh=resolution_wh)
        text_scale = sv.calculate_optimal_text_scale(resolution_wh=resolution_wh)
        self.box_annotator = sv.BoxAnnotator(thickness=thickness)
        self.label_annotator = sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness)
        self.line_annotator = sv.LineZoneAnnotator(thickness=thickness)

    @staticmethod
    def _bottom_center_points(xyxy):
        """Return the bottom-centre ``(x, y)`` point of each ``x1, y1, x2, y2`` box.

        Accepts any array-like of boxes; an empty input yields a ``(0, 2)``
        array.
        """
        boxes = np.asarray(xyxy).reshape(-1, 4)
        center_x = (boxes[:, 0] + boxes[:, 2]) / 2
        return np.column_stack((center_x, boxes[:, 3]))

    @staticmethod
    def _estimate_speed_kmh(positions, fps):
        """Estimate speed from per-frame positions, or ``None`` if too few.

        Args:
            positions: Sequence of positions along the travel axis, one per
                frame, oldest first. The caller's m/s -> km/h conversion
                implies metres -- TODO confirm against ``speed_config``.
            fps: Frame rate of the video the positions were sampled from.

        Returns:
            Speed in km/h, or ``None`` when fewer than two samples or less
            than half a second of samples are available.
        """
        count = len(positions)
        # Need at least 0.5 seconds of data (and two samples to span time).
        if count < 2 or count < fps / 2:
            return None
        distance = abs(positions[-1] - positions[0])
        # ``count`` samples span ``count - 1`` frame intervals.
        time_elapsed = (count - 1) / fps
        return distance / time_elapsed * 3.6  # Convert m/s to km/h

    def _track(self, frame, results):
        """Convert model output to detections and keep only tracked vehicles."""
        boxes = results[0].boxes
        detections = sv.Detections(
            xyxy=boxes.xyxy.cpu().numpy(),
            confidence=boxes.conf.cpu().numpy(),
            class_id=boxes.cls.cpu().numpy().astype(int),
        )
        # filtering out detections with unwanted classes
        detections = detections[np.isin(detections.class_id, self.CLASS_ID)]

        # tracking detections
        tracks = self.byte_tracker.update(
            output_results=detections2boxes(detections=detections),
            img_info=frame.shape,
            img_size=frame.shape,
        )
        tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
        detections.tracker_id = np.array(tracker_id)

        # filtering out detections without trackers
        tracked = np.array([tid is not None for tid in detections.tracker_id], dtype=bool)
        return detections[tracked]

    def _build_labels(self, detections):
        """Build one annotation label per detection, updating speed history."""
        points = self._bottom_center_points(detections.xyxy)
        # Transform to real-world coordinates
        if len(points) > 0:
            transformed_points = self.view_transformer.transform_points(points)
        else:
            transformed_points = np.empty((0, 2))

        labels = []
        for idx, (confidence, class_id) in enumerate(zip(detections.confidence, detections.class_id)):
            class_name = self.CLASS_NAMES_DICT[class_id]
            tracker_id = detections.tracker_id[idx] if detections.tracker_id is not None else None
            if tracker_id is None or idx >= len(transformed_points):
                labels.append(f"{class_name} {confidence:0.2f}")
                continue

            _, y = transformed_points[idx]
            history = self.coordinates[tracker_id]
            history.append(y)
            speed = self._estimate_speed_kmh(history, self.video_info.fps)
            speed_text = "" if speed is None else f" {int(speed)} km/h"
            labels.append(f"#{tracker_id} {class_name} {confidence:0.2f}{speed_text}")
        return labels

    def run(self):
        """Process every frame of the source video and write the annotated result.

        Consumes ``self.generator`` created in ``__init__``, so a second call
        on the same instance has no frames left to process.
        """
        # Open target video file
        with sv.VideoSink(self.target_video_path, self.video_info) as sink:
            # loop over video frames
            for frame in tqdm(self.generator, total=self.video_info.total_frames):
                # model prediction on single frame, timed for the FPS overlay
                started = time.perf_counter()
                results = self.model(frame)
                elapsed = time.perf_counter() - started
                # Guard against a zero interval on coarse clocks.
                fps = np.round(1 / elapsed, 2) if elapsed > 0 else 0.0
                cv2.putText(frame, f'FPS: {fps}', (20, 100), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 0, 255), 3)

                detections = self._track(frame, results)

                # format custom labels with speed
                labels = self._build_labels(detections)

                # updating line counter
                self.line_counter.trigger(detections=detections)

                # annotate and write frame
                frame = self.box_annotator.annotate(scene=frame.copy(), detections=detections)
                frame = self.label_annotator.annotate(scene=frame, detections=detections, labels=labels)
                frame = self.line_annotator.annotate(frame=frame, line_counter=self.line_counter)
                sink.write_frame(frame)
if __name__ == '__main__':
    # Script entry point: run the pipeline on the sample clip with the
    # plain PyTorch model (no TensorRT export).
    input_video, output_video = (
        "assets/vehicle-counting.mp4",
        "assets/vehicle-counting-result.mp4",
    )
    pipeline = vehicle_tracker_and_counter(
        source_video_path=input_video,
        target_video_path=output_video,
        use_tensorrt=False,
    )
    pipeline.run()
|