File size: 4,412 Bytes
bd5a354
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from ultralytics import YOLO
import torch.nn as nn
import torch
import numpy as np
import queue
import threading
import cv2


class LicensePlateDetector(nn.Module):
    def __init__(
        self,
        model: YOLO | nn.Module | object,
        img_size: list = [192, 192],
        queue_size: int = 10,
    ):
        super(LicensePlateDetector, self).__init__()
        self.model = model

        if torch.backends.mps.is_available():
            mps_device = torch.device("mps")
            self.model.to(mps_device)
            
        self.img_size = img_size
        self.input_queue = queue.Queue(maxsize=queue_size)
        self.output_queue = queue.Queue(maxsize=queue_size)
        self.processing_thread = threading.Thread(
            target=self._process_queue, daemon=True
        )
        self.processing_thread.start()

    def forward(self, input):
        self.input_queue.put(input)
        return self.output_queue.get()

    def _process_queue(self):
        while True:
            input_frame = self.input_queue.get()
            try:
                result = self._process_single_frame(input_frame)
                self.output_queue.put(result)
            except Exception as e:
                self.output_queue.put(None)  # Indicate error
                print(f"Error in LicensePlateDetector: {e}")
            finally:
                self.input_queue.task_done()

    def _process_single_frame(self, input):
        detected_license_plates: list = self.predict(input)
        left_license_plate_half, right_license_plate_half, bbox = self.post_process(
            detected_license_plates
        )
        # self.annotate_frame(input, bbox)
        return left_license_plate_half, right_license_plate_half, bbox

    def predict(self, frame):
        license_plate_detections: list = self.model.predict(frame, imgsz=self.img_size)
        return license_plate_detections

    def post_process(self, detections):
        UPPER_LEFT_Y, UPPER_LEFT_X, BOTTOM_RIGHT_Y, BOTTOM_RIGHT_X = 1, 0, 3, 2

        for detection in detections:
            if len(detection.boxes.xyxy) == 0:
                continue
            bbox = (
                detection.boxes.xyxy[0].cpu()
                if detection.boxes.xyxy[0] is not None
                else np.empty((0, 4))
            )
            bbox = bbox.numpy().astype(int)

            DISTANCE_X = np.abs(bbox[UPPER_LEFT_X] - bbox[BOTTOM_RIGHT_X])
            RIGHT_SHIFT_PERCENTAGE = np.round(DISTANCE_X * 0.15).astype(int)
            LEFT_SHIFT_PERCENTAGE = np.round(DISTANCE_X * 0.01).astype(int)

            midpoint_y = (bbox[UPPER_LEFT_Y] + bbox[BOTTOM_RIGHT_Y]) // 2
            midpoint_x = (bbox[UPPER_LEFT_X] + bbox[BOTTOM_RIGHT_X]) // 2

            left_half = detection.orig_img[
                midpoint_y : bbox[BOTTOM_RIGHT_Y],
                bbox[UPPER_LEFT_X] : midpoint_x - LEFT_SHIFT_PERCENTAGE,
            ]
            right_half = detection.orig_img[
                midpoint_y : bbox[BOTTOM_RIGHT_Y],
                midpoint_x : bbox[BOTTOM_RIGHT_X] - RIGHT_SHIFT_PERCENTAGE,
            ]

            return left_half, right_half, bbox

        raise Exception("No License Plate")

    def annotate_frame(self, frame, bbox):
        if bbox is not None:
            cv2.rectangle(
                frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 0, 255), 2
            )  # Red bounding box
        return frame


## Training, Validation, and Testing may be like this:

# transform = transforms.Compose([
#     transforms.Resize((192, 192)),
#     transforms.ToTensor()
# ])

# train_dataset = LicensePlateDetectorDataset(root_dir, subset='train', transform=transform)
# valid_dataset = LicensePlateDetectorDataset(root_dir, subset='valid', transform=transform)
# test_dataset = LicensePlateDetectorDataset(root_dir, subset='test', transform=transform)

# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)
# test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# yolo_model_path = 'path/to/yolo_model.pt'
# yolo_model = YOLO(yolo_model_path)

# detector = LicensePlateDetector(yolo_model)

# detector.train(data_path=data_path, num_epochs=100)

# # Evaluate on validation set
# detector.validate(detector, valid_loader)

# # Evaluate on test set
# detector.test(detector, test_loader)