|
|
from ultralytics import YOLO |
|
|
import torch.nn as nn |
|
|
import torch |
|
|
import numpy as np |
|
|
import queue |
|
|
import threading |
|
|
import cv2 |
|
|
|
|
|
|
|
|
class LicensePlateDetector(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
model: YOLO | nn.Module | object, |
|
|
img_size: list = [192, 192], |
|
|
queue_size: int = 10, |
|
|
): |
|
|
super(LicensePlateDetector, self).__init__() |
|
|
self.model = model |
|
|
|
|
|
if torch.backends.mps.is_available(): |
|
|
mps_device = torch.device("mps") |
|
|
self.model.to(mps_device) |
|
|
|
|
|
self.img_size = img_size |
|
|
self.input_queue = queue.Queue(maxsize=queue_size) |
|
|
self.output_queue = queue.Queue(maxsize=queue_size) |
|
|
self.processing_thread = threading.Thread( |
|
|
target=self._process_queue, daemon=True |
|
|
) |
|
|
self.processing_thread.start() |
|
|
|
|
|
def forward(self, input): |
|
|
self.input_queue.put(input) |
|
|
return self.output_queue.get() |
|
|
|
|
|
def _process_queue(self): |
|
|
while True: |
|
|
input_frame = self.input_queue.get() |
|
|
try: |
|
|
result = self._process_single_frame(input_frame) |
|
|
self.output_queue.put(result) |
|
|
except Exception as e: |
|
|
self.output_queue.put(None) |
|
|
print(f"Error in LicensePlateDetector: {e}") |
|
|
finally: |
|
|
self.input_queue.task_done() |
|
|
|
|
|
def _process_single_frame(self, input): |
|
|
detected_license_plates: list = self.predict(input) |
|
|
left_license_plate_half, right_license_plate_half, bbox = self.post_process( |
|
|
detected_license_plates |
|
|
) |
|
|
|
|
|
return left_license_plate_half, right_license_plate_half, bbox |
|
|
|
|
|
def predict(self, frame): |
|
|
license_plate_detections: list = self.model.predict(frame, imgsz=self.img_size) |
|
|
return license_plate_detections |
|
|
|
|
|
def post_process(self, detections): |
|
|
UPPER_LEFT_Y, UPPER_LEFT_X, BOTTOM_RIGHT_Y, BOTTOM_RIGHT_X = 1, 0, 3, 2 |
|
|
|
|
|
for detection in detections: |
|
|
if len(detection.boxes.xyxy) == 0: |
|
|
continue |
|
|
bbox = ( |
|
|
detection.boxes.xyxy[0].cpu() |
|
|
if detection.boxes.xyxy[0] is not None |
|
|
else np.empty((0, 4)) |
|
|
) |
|
|
bbox = bbox.numpy().astype(int) |
|
|
|
|
|
DISTANCE_X = np.abs(bbox[UPPER_LEFT_X] - bbox[BOTTOM_RIGHT_X]) |
|
|
RIGHT_SHIFT_PERCENTAGE = np.round(DISTANCE_X * 0.15).astype(int) |
|
|
LEFT_SHIFT_PERCENTAGE = np.round(DISTANCE_X * 0.01).astype(int) |
|
|
|
|
|
midpoint_y = (bbox[UPPER_LEFT_Y] + bbox[BOTTOM_RIGHT_Y]) // 2 |
|
|
midpoint_x = (bbox[UPPER_LEFT_X] + bbox[BOTTOM_RIGHT_X]) // 2 |
|
|
|
|
|
left_half = detection.orig_img[ |
|
|
midpoint_y : bbox[BOTTOM_RIGHT_Y], |
|
|
bbox[UPPER_LEFT_X] : midpoint_x - LEFT_SHIFT_PERCENTAGE, |
|
|
] |
|
|
right_half = detection.orig_img[ |
|
|
midpoint_y : bbox[BOTTOM_RIGHT_Y], |
|
|
midpoint_x : bbox[BOTTOM_RIGHT_X] - RIGHT_SHIFT_PERCENTAGE, |
|
|
] |
|
|
|
|
|
return left_half, right_half, bbox |
|
|
|
|
|
raise Exception("No License Plate") |
|
|
|
|
|
def annotate_frame(self, frame, bbox): |
|
|
if bbox is not None: |
|
|
cv2.rectangle( |
|
|
frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 0, 255), 2 |
|
|
) |
|
|
return frame |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|