| | |
| | import os |
| | import cv2 |
| | import sys |
| | import time |
| | import numpy as np |
| | from collections import defaultdict, deque |
| |
|
| | |
| | sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'ByteTrack')) |
| |
|
| | from utils import * |
| | import speed_config |
| | from tqdm import tqdm |
| | from ultralytics import YOLO |
| | import supervision as sv |
| | from yolox.tracker.byte_tracker import BYTETracker, STrack |
| |
|
| |
|
class vehicle_tracker_and_counter:
    """Detect, track, count and speed-label vehicles in a video file.

    Each frame of the source video is run through a YOLO model, the
    detections are filtered to ``self.CLASS_ID``, associated with ByteTrack
    tracks, annotated (boxes, labels, counting line, inference FPS) and
    written to the target video.
    """

    def __init__(self,
                 source_video_path="assets/vehicle-counting.mp4",
                 target_video_path="assets/vehicle-counting-result.mp4",
                 use_tensorrt=False):
        """Load the model and build every per-video helper object.

        Args:
            source_video_path: Path of the video to read frames from.
            target_video_path: Path the annotated video is written to.
            use_tensorrt: When true, run inference with ``yolov8x.engine``,
                exporting it from the PyTorch weights first if loading fails.
        """
        self.model_name = "yolov8x.pt"
        # The PyTorch model is always loaded: it supplies the class-name
        # table and is the source for the engine export below.
        self.yolo = YOLO(self.model_name)

        if use_tensorrt:
            try:
                self.model = YOLO('yolov8x.engine')
            except Exception:
                # Engine could not be loaded (typically it has not been
                # exported yet): build it, then retry. ``Exception`` rather
                # than a bare ``except`` so Ctrl-C / SystemExit still
                # propagate.
                self.yolo.export(format='engine')
                self.model = YOLO('yolov8x.engine')
        else:
            self.model = self.yolo
            # Layer fusion is a PyTorch-model operation, so it is applied
            # only on this branch and never to an exported engine.
            self.model.fuse()

        self.CLASS_NAMES_DICT = self.yolo.model.names
        # Class ids kept after detection; presumably the COCO vehicle
        # classes -- verify against CLASS_NAMES_DICT.
        self.CLASS_ID = [2, 3, 5, 7]

        # Counting line in pixel coordinates. NOTE(review): the hard-coded
        # 3840 looks like it assumes a 3840-pixel-wide frame -- confirm.
        self.line_start = sv.Point(50, 1500)
        self.line_end = sv.Point(3840-50, 1500)

        self.byte_tracker = BYTETracker(BYTETrackerArgs())

        self.source_video_path = source_video_path
        self.target_video_path = target_video_path

        self.video_info = sv.VideoInfo.from_video_path(self.source_video_path)

        self.view_transformer = ViewTransformer(speed_config.SOURCE, speed_config.TARGET)
        # Per-tracker history of transformed y coordinates, capped at
        # ``video_info.fps`` samples.
        self.coordinates = defaultdict(lambda: deque(maxlen=self.video_info.fps))

        # NOTE: the frame generator is created once here, so ``run`` can
        # consume the video only a single time per instance.
        self.generator = sv.get_video_frames_generator(self.source_video_path)

        self.line_counter = sv.LineZone(start=self.line_start, end=self.line_end)

        thickness = sv.calculate_optimal_line_thickness(resolution_wh=self.video_info.resolution_wh)
        text_scale = sv.calculate_optimal_text_scale(resolution_wh=self.video_info.resolution_wh)
        self.box_annotator = sv.BoxAnnotator(thickness=thickness)
        self.label_annotator = sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness)
        self.line_annotator = sv.LineZoneAnnotator(thickness=thickness)

    def run(self):
        """Process every frame of the source video and write the result."""
        with sv.VideoSink(self.target_video_path, self.video_info) as sink:
            for frame in tqdm(self.generator, total=self.video_info.total_frames):
                # Time only the model call; perf_counter is monotonic and
                # has a finer resolution than time.time().
                started = time.perf_counter()
                results = self.model(frame)
                fps = self._inference_fps(time.perf_counter() - started)
                cv2.putText(frame, f'FPS: {fps}', (20, 100),
                            cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 0, 255), 3)

                detections = self._track(frame, results[0])
                labels = self._build_labels(detections)

                self.line_counter.trigger(detections=detections)

                frame = self.box_annotator.annotate(scene=frame.copy(), detections=detections)
                frame = self.label_annotator.annotate(scene=frame, detections=detections, labels=labels)
                frame = self.line_annotator.annotate(frame=frame, line_counter=self.line_counter)
                sink.write_frame(frame)

    @staticmethod
    def _inference_fps(elapsed):
        """Return ``1 / elapsed`` rounded to 2 decimals, or 0.0 if not measurable."""
        if elapsed <= 0:
            # Clock did not advance between the two readings; avoid
            # dividing by zero.
            return 0.0
        return round(1.0 / elapsed, 2)

    def _track(self, frame, result):
        """Turn one model result into detections that carry a tracker id.

        Detections whose class is not in ``self.CLASS_ID`` and detections
        that were not matched to a track are dropped.
        """
        detections = sv.Detections(
            xyxy=result.boxes.xyxy.cpu().numpy(),
            confidence=result.boxes.conf.cpu().numpy(),
            class_id=result.boxes.cls.cpu().numpy().astype(int),
        )
        detections = detections[np.isin(detections.class_id, self.CLASS_ID)]

        tracks = self.byte_tracker.update(
            output_results=detections2boxes(detections=detections),
            img_info=frame.shape,
            img_size=frame.shape,
        )
        tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
        detections.tracker_id = np.array(tracker_id)

        # Unmatched detections are marked with None by the matcher.
        matched = np.array([tid is not None for tid in detections.tracker_id], dtype=bool)
        return detections[matched]

    def _build_labels(self, detections):
        """Return one label string per detection, updating speed histories."""
        boxes = detections.xyxy
        if len(boxes) > 0:
            # Bottom-centre of every box: (mean of x1 and x2, y2).
            anchors = np.column_stack(((boxes[:, 0] + boxes[:, 2]) / 2, boxes[:, 3]))
            transformed_points = self.view_transformer.transform_points(anchors)
        else:
            transformed_points = np.array([])

        tracker_ids = detections.tracker_id
        labels = []
        for idx, (confidence, class_id) in enumerate(zip(detections.confidence, detections.class_id)):
            tracker_id = tracker_ids[idx] if tracker_ids is not None else None
            class_name = self.CLASS_NAMES_DICT[class_id]

            if tracker_id is not None and idx < len(transformed_points):
                _, y = transformed_points[idx]
                speed_text = self._speed_text(tracker_id, y)
                labels.append(f"#{tracker_id} {class_name} {confidence:0.2f}{speed_text}")
            else:
                labels.append(f"{class_name} {confidence:0.2f}")
        return labels

    def _speed_text(self, tracker_id, y):
        """Record ``y`` for ``tracker_id`` and return its speed label suffix.

        Returns an empty string until the history holds at least
        ``video_info.fps / 2`` samples.
        """
        history = self.coordinates[tracker_id]
        history.append(y)

        fps = self.video_info.fps
        if len(history) < fps / 2:
            return ""

        # Displacement between the oldest and newest stored sample.
        distance = abs(history[0] - history[-1])
        # Each stored sample is counted as one frame period.
        time_elapsed = len(history) / fps
        # The 3.6 factor converts m/s to km/h; assumes the transformed
        # coordinates are in metres -- TODO confirm against speed_config.
        speed = distance / time_elapsed * 3.6
        return f" {int(speed)} km/h"
| |
|
if __name__ == '__main__':
    # Script entry point: run the pipeline on the bundled sample video
    # using the PyTorch model (TensorRT disabled).
    vehicle_tracker_and_counter(
        source_video_path="assets/vehicle-counting.mp4",
        target_video_path="assets/vehicle-counting-result.mp4",
        use_tensorrt=False,
    ).run()
| |
|