from ultralytics import YOLO import torch.nn as nn import torch import numpy as np import queue import threading import cv2 class LicensePlateDetector(nn.Module): def __init__( self, model: YOLO | nn.Module | object, img_size: list = [192, 192], queue_size: int = 10, ): super(LicensePlateDetector, self).__init__() self.model = model if torch.backends.mps.is_available(): mps_device = torch.device("mps") self.model.to(mps_device) self.img_size = img_size self.input_queue = queue.Queue(maxsize=queue_size) self.output_queue = queue.Queue(maxsize=queue_size) self.processing_thread = threading.Thread( target=self._process_queue, daemon=True ) self.processing_thread.start() def forward(self, input): self.input_queue.put(input) return self.output_queue.get() def _process_queue(self): while True: input_frame = self.input_queue.get() try: result = self._process_single_frame(input_frame) self.output_queue.put(result) except Exception as e: self.output_queue.put(None) # Indicate error print(f"Error in LicensePlateDetector: {e}") finally: self.input_queue.task_done() def _process_single_frame(self, input): detected_license_plates: list = self.predict(input) left_license_plate_half, right_license_plate_half, bbox = self.post_process( detected_license_plates ) # self.annotate_frame(input, bbox) return left_license_plate_half, right_license_plate_half, bbox def predict(self, frame): license_plate_detections: list = self.model.predict(frame, imgsz=self.img_size) return license_plate_detections def post_process(self, detections): UPPER_LEFT_Y, UPPER_LEFT_X, BOTTOM_RIGHT_Y, BOTTOM_RIGHT_X = 1, 0, 3, 2 for detection in detections: if len(detection.boxes.xyxy) == 0: continue bbox = ( detection.boxes.xyxy[0].cpu() if detection.boxes.xyxy[0] is not None else np.empty((0, 4)) ) bbox = bbox.numpy().astype(int) DISTANCE_X = np.abs(bbox[UPPER_LEFT_X] - bbox[BOTTOM_RIGHT_X]) RIGHT_SHIFT_PERCENTAGE = np.round(DISTANCE_X * 0.15).astype(int) LEFT_SHIFT_PERCENTAGE = np.round(DISTANCE_X * 0.01).astype(int) midpoint_y = (bbox[UPPER_LEFT_Y] + bbox[BOTTOM_RIGHT_Y]) // 2 midpoint_x = (bbox[UPPER_LEFT_X] + bbox[BOTTOM_RIGHT_X]) // 2 left_half = detection.orig_img[ midpoint_y : bbox[BOTTOM_RIGHT_Y], bbox[UPPER_LEFT_X] : midpoint_x - LEFT_SHIFT_PERCENTAGE, ] right_half = detection.orig_img[ midpoint_y : bbox[BOTTOM_RIGHT_Y], midpoint_x : bbox[BOTTOM_RIGHT_X] - RIGHT_SHIFT_PERCENTAGE, ] return left_half, right_half, bbox raise Exception("No License Plate") def annotate_frame(self, frame, bbox): if bbox is not None: cv2.rectangle( frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 0, 255), 2 ) # Red bounding box return frame ## Training, Validation, and Testing may be like this: # transform = transforms.Compose([ # transforms.Resize((192, 192)), # transforms.ToTensor() # ]) # train_dataset = LicensePlateDetectorDataset(root_dir, subset='train', transform=transform) # valid_dataset = LicensePlateDetectorDataset(root_dir, subset='valid', transform=transform) # test_dataset = LicensePlateDetectorDataset(root_dir, subset='test', transform=transform) # train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) # valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False) # test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False) # yolo_model_path = 'path/to/yolo_model.pt' # yolo_model = YOLO(yolo_model_path) # detector = LicensePlateDetector(yolo_model) # detector.train(data_path=data_path, num_epochs=100) # # Evaluate on validation set # detector.validate(detector, valid_loader) # # Evaluate on test set # detector.test(detector, test_loader)