wesam0099 commited on
Commit
bd5a354
·
verified ·
1 Parent(s): acbb1f1

Upload 12 files

Browse files
DetectorReaderTEST.ipynb ADDED
The diff for this file is too large to render. See raw diff
 
LicensePlateAuthorizer.py ADDED
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import queue
import sqlite3
import threading
from typing import Dict, Optional

import cv2
import easyocr
import torch.nn as nn
from LicensePlateDetector import LicensePlateDetector
from LicensePlateReader import LicensePlateReader
from LicensePlateRecognizer import LicensePlateRecognizer
from ultralytics import YOLO
13
+
14
+
15
class LicensePlateAuthorizer(nn.Module):
    """Recognizes a license plate in a frame and checks it against a SQLite
    database of authorized plates.

    Frames are handed to a daemon worker thread through bounded queues, so
    ``forward`` blocks until the matching result is available.
    """

    def __init__(
        self,
        detector_model: str = "./models/license_plate_192.pt",
        database_path: Optional[str] = None,
        confidence_threshold: int = 5,
        queue_size: int = 10,
    ):
        """
        Args:
            detector_model: Path to the YOLO weights for plate detection.
            database_path: SQLite file with the ``license_plates`` table; when
                None, every plate is treated as unauthorized.
            confidence_threshold: Number of identical OCR readings required
                before the database is consulted.
            queue_size: Maximum number of frames buffered for processing.
        """
        super(LicensePlateAuthorizer, self).__init__()

        detector = LicensePlateDetector(YOLO(detector_model))
        reader = LicensePlateReader(easyocr.Reader(["en"], gpu=True))
        self.recognizer = LicensePlateRecognizer(detector, reader)

        self.database_path = database_path
        self.confidence_threshold = confidence_threshold
        # Vote counts per OCR string for the plate currently being tracked.
        self.ocr_result_counts: Dict[str, int] = {}
        self.current_license_plate_text = None

        self.input_queue = queue.Queue(maxsize=queue_size)
        self.output_queue = queue.Queue(maxsize=queue_size)
        self.processing_thread = threading.Thread(
            target=self._process_queue, daemon=True
        )
        self.processing_thread.start()

    def forward(self, frame):
        """Process one frame; return the annotated frame, or None on error."""
        self.input_queue.put(frame)
        result = self.output_queue.get()
        if result is None:
            return None
        return result[0]

    def _process_queue(self):
        # Worker loop: every input frame yields exactly one queued result, so
        # forward() can never deadlock waiting for output.
        while True:
            frame = self.input_queue.get()
            try:
                result = self._process_single_frame(frame)
                self.output_queue.put(result)
            except Exception as e:
                print(f"Error in LicensePlateAuthorizer: {e}")
                self.output_queue.put(None)
            finally:
                self.input_queue.task_done()

    def _process_single_frame(self, frame):
        """Recognize, authorize, and annotate a single frame.

        Returns:
            Tuple ``(frame, plate_text, bbox, authorized)``; ``plate_text``
            and ``bbox`` are None and ``authorized`` is False when no plate
            was recognized.
        """
        license_plate_text, bbox = self.recognizer.forward(frame)

        if license_plate_text is None:
            return frame, None, None, False

        authorized = self.verify_license_plate_authorization(license_plate_text)
        frame = self.annotate_frame(frame, bbox, authorized)
        return frame, license_plate_text, bbox, authorized

    def verify_license_plate_authorization(self, license_plate_text: str) -> bool:
        """Return True once the same plate text has been read at least
        ``confidence_threshold`` times and exists in the database."""
        if license_plate_text == "":
            return False

        # Seeing a different plate resets the vote counts.
        if self.current_license_plate_text != license_plate_text:
            self.current_license_plate_text = license_plate_text
            self.ocr_result_counts = {}

        self.ocr_result_counts[license_plate_text] = (
            self.ocr_result_counts.get(license_plate_text, 0) + 1
        )

        if self.ocr_result_counts[license_plate_text] >= self.confidence_threshold:
            if self.database_path is not None:
                return self.check_authorization_from_database(license_plate_text)
        return False

    def check_authorization_from_database(self, license_plate: str) -> bool:
        """Return True if ``license_plate`` exists in the license_plates table."""
        try:
            conn = sqlite3.connect(self.database_path)
            try:
                cursor = conn.cursor()
                query = "SELECT EXISTS(SELECT 1 FROM license_plates WHERE plate = ?)"
                cursor.execute(query, (license_plate,))
                exists = cursor.fetchone()[0]
            finally:
                # Close even when the query raises (the original leaked the
                # connection on any error before conn.close()).
                conn.close()
            return bool(exists)
        except Exception as e:
            print(f"Database error: {e}")
            return False

    def annotate_frame(self, frame, bbox, authorized):
        """Draw the bounding box and authorization status onto ``frame``."""
        if bbox is not None:
            # Green for authorized, red otherwise (BGR).
            color = (0, 255, 0) if authorized else (0, 0, 255)
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
            status = "Authorized" if authorized else "Unauthorized"
            cv2.putText(
                frame,
                f"Status: {status}",
                (bbox[0], bbox[1] - 10),  # Just above the bounding box
                cv2.FONT_HERSHEY_SIMPLEX,
                0.9,
                color,
                2,
            )
        return frame

    def save_license_plate(self, license_plate: str) -> None:
        """Insert ``license_plate`` into the database, creating the table on
        first use; duplicates are skipped (UNIQUE constraint)."""
        conn = sqlite3.connect(self.database_path)
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS license_plates (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    plate TEXT UNIQUE
                )"""
            )
            try:
                cursor.execute(
                    "INSERT INTO license_plates (plate) VALUES (?)", (license_plate,)
                )
                conn.commit()
                print(f"License plate '{license_plate}' added to database.")
            except sqlite3.IntegrityError:
                print(
                    f"License plate '{license_plate}' already exists in the database, skipping."
                )
        finally:
            # Guarantee the connection is closed on any error path.
            conn.close()

    def clear_database(self) -> None:
        """Delete every stored plate from the license_plates table."""
        conn = sqlite3.connect(self.database_path)
        try:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM license_plates")
            conn.commit()
        finally:
            conn.close()
LicensePlateDetector.py ADDED
@@ -0,0 +1,127 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from ultralytics import YOLO
2
+ import torch.nn as nn
3
+ import torch
4
+ import numpy as np
5
+ import queue
6
+ import threading
7
+ import cv2
8
+
9
+
10
class LicensePlateDetector(nn.Module):
    """Detects a license plate with a YOLO-style model and returns the crop
    split into left/right halves (numbers side / letters side) for OCR.

    Frames are processed on a daemon worker thread fed through bounded queues.
    """

    def __init__(
        self,
        # String annotation so the class is importable even when ultralytics
        # is absent (annotations are otherwise evaluated at definition time).
        model: "YOLO | nn.Module | object",
        img_size=None,
        queue_size: int = 10,
    ):
        """
        Args:
            model: Detection model exposing ``predict`` (typically YOLO).
            img_size: Inference image size forwarded to the model; defaults to
                [192, 192]. A fresh list is built per instance to avoid the
                shared mutable-default pitfall of the original signature.
            queue_size: Maximum number of buffered frames.
        """
        super(LicensePlateDetector, self).__init__()
        self.model = model

        # Prefer the Apple-silicon GPU when available.
        if torch.backends.mps.is_available():
            mps_device = torch.device("mps")
            self.model.to(mps_device)

        self.img_size = [192, 192] if img_size is None else img_size
        self.input_queue = queue.Queue(maxsize=queue_size)
        self.output_queue = queue.Queue(maxsize=queue_size)
        self.processing_thread = threading.Thread(
            target=self._process_queue, daemon=True
        )
        self.processing_thread.start()

    def forward(self, input):
        """Submit a frame; block for (left_half, right_half, bbox), or None
        when processing failed."""
        self.input_queue.put(input)
        return self.output_queue.get()

    def _process_queue(self):
        # Worker loop: one queued result per input so forward() never blocks
        # forever, even when detection raises.
        while True:
            input_frame = self.input_queue.get()
            try:
                result = self._process_single_frame(input_frame)
                self.output_queue.put(result)
            except Exception as e:
                self.output_queue.put(None)  # Indicate error
                print(f"Error in LicensePlateDetector: {e}")
            finally:
                self.input_queue.task_done()

    def _process_single_frame(self, input):
        """Detect the plate in one frame and split the crop into halves."""
        detected_license_plates: list = self.predict(input)
        left_license_plate_half, right_license_plate_half, bbox = self.post_process(
            detected_license_plates
        )
        return left_license_plate_half, right_license_plate_half, bbox

    def predict(self, frame):
        """Run the detection model on ``frame`` at the configured image size."""
        license_plate_detections: list = self.model.predict(frame, imgsz=self.img_size)
        return license_plate_detections

    def post_process(self, detections):
        """Crop the first detected plate out of its source image.

        Returns:
            (left_half, right_half, bbox) with bbox as [x1, y1, x2, y2].
        Raises:
            Exception: when no detection contains a box.
        """
        # Index names into an xyxy box: [x1, y1, x2, y2].
        UPPER_LEFT_Y, UPPER_LEFT_X, BOTTOM_RIGHT_Y, BOTTOM_RIGHT_X = 1, 0, 3, 2

        for detection in detections:
            if len(detection.boxes.xyxy) == 0:
                continue
            bbox = (
                detection.boxes.xyxy[0].cpu()
                if detection.boxes.xyxy[0] is not None
                else np.empty((0, 4))
            )
            bbox = bbox.numpy().astype(int)

            # Horizontal trims (fractions of plate width) applied to each half
            # to cut border artifacts off the crops.
            DISTANCE_X = np.abs(bbox[UPPER_LEFT_X] - bbox[BOTTOM_RIGHT_X])
            RIGHT_SHIFT_PERCENTAGE = np.round(DISTANCE_X * 0.15).astype(int)
            LEFT_SHIFT_PERCENTAGE = np.round(DISTANCE_X * 0.01).astype(int)

            midpoint_y = (bbox[UPPER_LEFT_Y] + bbox[BOTTOM_RIGHT_Y]) // 2
            midpoint_x = (bbox[UPPER_LEFT_X] + bbox[BOTTOM_RIGHT_X]) // 2

            # Only the lower vertical half (midpoint_y down) of the plate is
            # kept — presumably where the characters sit on the target plate
            # layout; TODO confirm against the plates this model is trained on.
            left_half = detection.orig_img[
                midpoint_y : bbox[BOTTOM_RIGHT_Y],
                bbox[UPPER_LEFT_X] : midpoint_x - LEFT_SHIFT_PERCENTAGE,
            ]
            right_half = detection.orig_img[
                midpoint_y : bbox[BOTTOM_RIGHT_Y],
                midpoint_x : bbox[BOTTOM_RIGHT_X] - RIGHT_SHIFT_PERCENTAGE,
            ]

            return left_half, right_half, bbox

        raise Exception("No License Plate")

    def annotate_frame(self, frame, bbox):
        """Draw a red bounding box for the detection onto ``frame``."""
        if bbox is not None:
            cv2.rectangle(
                frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 0, 255), 2
            )  # Red bounding box
        return frame
99
+
100
+
101
+ ## Training, Validation, and Testing may be like this:
102
+
103
+ # transform = transforms.Compose([
104
+ # transforms.Resize((192, 192)),
105
+ # transforms.ToTensor()
106
+ # ])
107
+
108
+ # train_dataset = LicensePlateDetectorDataset(root_dir, subset='train', transform=transform)
109
+ # valid_dataset = LicensePlateDetectorDataset(root_dir, subset='valid', transform=transform)
110
+ # test_dataset = LicensePlateDetectorDataset(root_dir, subset='test', transform=transform)
111
+
112
+ # train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
113
+ # valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)
114
+ # test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
115
+
116
+ # yolo_model_path = 'path/to/yolo_model.pt'
117
+ # yolo_model = YOLO(yolo_model_path)
118
+
119
+ # detector = LicensePlateDetector(yolo_model)
120
+
121
+ # detector.train(data_path=data_path, num_epochs=100)
122
+
123
+ # # Evaluate on validation set
124
+ # detector.validate(detector, valid_loader)
125
+
126
+ # # Evaluate on test set
127
+ # detector.test(detector, test_loader)
LicensePlateDetectorDataset.py ADDED
@@ -0,0 +1,170 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from PIL import Image
2
+ import os
3
+ from torchvision import transforms
4
+ from torch.utils.data import Dataset
5
+ import os
6
+ import cv2
7
+ import numpy as np
8
+ import albumentations as A
9
+ from tqdm import tqdm # tqdm provides a nice progress bar
10
+
11
+ # TODO: Load Files From Google Drive
12
+
13
+
14
+
15
class LicensePlateDetectorDataset(Dataset):
    """Dataset of license-plate images plus YOLO-style label files, with a set
    of offline augmentation helpers (zoom, grayscale, fog, rain, noise, blur).

    Expects ``root_dir/<subset>/images`` and ``root_dir/<subset>/labels``.
    """

    ZOOM_AUGMENTATION = "zoom"
    GRAYSCALE_AUGMENTATION = "grayscale"
    FOG_AUGMENTATION = "foggy"
    RAIN_AUGMENTATION = "rainy"
    SALT_PEPPER_AUGMENTATION = "saltandpepper"
    BLUR_AUGMENTATION = "blur"

    # Default augmentation -> positional options; copied per instance in
    # __init__ so callers can never mutate a shared default (the original used
    # a mutable default argument).
    DEFAULT_AUGMENTATIONS = {
        ZOOM_AUGMENTATION: [0.5],
        GRAYSCALE_AUGMENTATION: [],
        FOG_AUGMENTATION: [],
        RAIN_AUGMENTATION: [],
        SALT_PEPPER_AUGMENTATION: [0.01, 0.01],
        BLUR_AUGMENTATION: [5],
    }

    def __init__(self, root_dir, subset: str = 'train',
                 transform: "transforms.Compose" = None, augmentations=None):
        """
        Args:
            root_dir: Dataset root containing train/valid/test subfolders.
            subset: Which subfolder to load ('train', 'valid', or 'test').
            transform: Optional torchvision transform applied in __getitem__.
            augmentations: Mapping of augmentation name -> positional options
                for its apply_* method; defaults to DEFAULT_AUGMENTATIONS.
        """
        self.root_dir = os.path.join(root_dir, subset)
        self.folder_subset = subset
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Dispatch table: augmentation name -> implementation.
        self.augmentations_functions = {
            LicensePlateDetectorDataset.ZOOM_AUGMENTATION: self.apply_zoom,
            LicensePlateDetectorDataset.GRAYSCALE_AUGMENTATION: self.apply_grayscale,
            LicensePlateDetectorDataset.FOG_AUGMENTATION: self.apply_fog,
            LicensePlateDetectorDataset.RAIN_AUGMENTATION: self.apply_rain,
            LicensePlateDetectorDataset.SALT_PEPPER_AUGMENTATION: self.apply_salt_and_pepper,
            LicensePlateDetectorDataset.BLUR_AUGMENTATION: self.apply_gaussian_blur,
        }

        self.augmentations = (
            dict(LicensePlateDetectorDataset.DEFAULT_AUGMENTATIONS)
            if augmentations is None
            else augmentations
        )

        image_dir = os.path.join(self.root_dir, 'images')
        label_dir = os.path.join(self.root_dir, 'labels')

        # Count total number of images for the progress bar.
        total_files = sum(1 for file in os.listdir(image_dir) if file.endswith(('.png', '.jpg', '.jpeg')))

        progress_bar = tqdm(total=total_files, desc='Initializing Files', unit='image')

        for img_file in os.listdir(image_dir):
            if img_file.endswith(('.png', '.jpg', '.jpeg')):
                self.image_paths.append(os.path.join(image_dir, img_file))
                progress_bar.update(1)

                print("Image:", img_file)

                # splitext handles every accepted extension (the original
                # replace('.jpg', '.txt') produced wrong label paths for
                # .png and .jpeg images).
                label_file = os.path.join(
                    label_dir, os.path.splitext(img_file)[0] + '.txt'
                )
                self.labels.append(label_file)
                print("Label:", label_file)

        progress_bar.close()

    def __len__(self):
        """Number of images discovered in the subset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return (image, first label line) for the given index."""
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label_path = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        with open(label_path, 'r') as f:
            label = f.readline().strip()

        return image, label

    def process_images(self, output_folder: str, images_limit=-1):
        """Apply every configured augmentation to the subset's images and save
        results under ``output_folder/<augmentation_name>/``.

        Args:
            output_folder: Destination root for augmented images.
            images_limit: Stop after this many source images (-1 = all).
        """
        os.makedirs(output_folder, exist_ok=True)
        count = 0  # Number of source images processed so far.

        # Progress total: all images, or the requested limit.
        total_files = sum(1 for file in os.listdir(os.path.join(self.root_dir, "images")) if file.endswith(('.png', '.jpg', '.jpeg'))) if images_limit == -1 else images_limit

        progress_bar = tqdm(total=total_files, desc='Applying Augmentation to Images', unit='image')

        for filename in os.listdir(os.path.join(self.root_dir, "images")):
            if filename.endswith((".jpg", ".jpeg", ".png")):
                image_path = os.path.join(self.root_dir, "images", filename)
                image = cv2.imread(image_path)
                for augmentation_name, augmentation_options in self.augmentations.items():
                    processed_image = self.augmentations_functions[augmentation_name](image, *augmentation_options)
                    augmentation_folder = os.path.join(output_folder, augmentation_name)
                    os.makedirs(augmentation_folder, exist_ok=True)  # Ensure augmentation_folder exists
                    output_path = os.path.join(augmentation_folder, filename)
                    cv2.imwrite(output_path, processed_image)

                print("Image Processed with all augmentations: ", os.path.join(output_folder, augmentation_name, filename))
                progress_bar.update(1)

                count += 1
                if count >= images_limit and images_limit != -1:  # -1 means loop until finished
                    print("Reached Limit Specified", images_limit)
                    break

        progress_bar.close()

    def apply_zoom(self, image: np.ndarray, scale_factor: float) -> np.ndarray:
        """Zoom into the image center by ``scale_factor`` (rotation of 0)."""
        height, width = image.shape[:2]
        center = (width // 2, height // 2)
        matrix = cv2.getRotationMatrix2D(center, 0, scale_factor)
        zoomed_image = cv2.warpAffine(image, matrix, (width, height))
        return zoomed_image

    def apply_grayscale(self, image: np.ndarray) -> np.ndarray:
        """Convert a BGR image to grayscale."""
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    def apply_gaussian_blur(self, image: np.ndarray, kernel_size: int) -> np.ndarray:
        """Apply a square Gaussian blur of ``kernel_size`` to the image."""
        return cv2.GaussianBlur(image, (kernel_size, kernel_size), 0)

    def apply_salt_and_pepper(self, image: np.ndarray, salt_prob: float, pepper_prob: float) -> np.ndarray:
        """Return a copy of the image with random salt (255) and pepper (0)
        noise at the given per-pixel probabilities."""
        noisy_image = np.copy(image)
        row, col, _ = noisy_image.shape
        salt_pixels = np.random.rand(row, col) < salt_prob
        noisy_image[salt_pixels] = 255
        pepper_pixels = np.random.rand(row, col) < pepper_prob
        noisy_image[pepper_pixels] = 0
        return noisy_image

    def apply_fog(self, image: np.ndarray) -> np.ndarray:
        """Apply a random fog effect (albumentations) to the image."""
        transform = A.Compose([A.RandomFog(fog_coef_lower=0.3, fog_coef_upper=0.5, alpha_coef=0.3, p=1)])
        transformed = transform(image=image)
        return transformed['image']

    def apply_rain(self, image: np.ndarray) -> np.ndarray:
        """Apply a random rain effect (albumentations) to the image."""
        transform = A.Compose([A.RandomRain(brightness_coefficient=0.9, drop_width=1, blur_value=5, p=1)])
        transformed = transform(image=image)
        return transformed['image']

    def auto_annotate(self):
        # TODO: not implemented yet.
        pass
156
+ ## Example Usage:
157
+ # transform = transforms.Compose([
158
+ # transforms.Resize((192, 192)),
159
+ # transforms.ToTensor()
160
+ # ])
161
+ # augmentations = {
162
+ # LicensePlateDetectorDataset.ZOOM_AUGMENTATION: [15],
163
+ # LicensePlateDetectorDataset.GRAYSCALE_AUGMENTATION: [],
164
+ # LicensePlateDetectorDataset.FOG_AUGMENTATION: [],
165
+ # LicensePlateDetectorDataset.RAIN_AUGMENTATION: [],
166
+ # LicensePlateDetectorDataset.SALT_PEPPER_AUGMENTATION: [0,0],
167
+ # LicensePlateDetectorDataset.BLUR_AUGMENTATION: [15],
168
+ # }
169
+ # lpdd = LicensePlateDetectorDataset("./License Plate YOLOv8/", subset="train", transform=transform, augmentations=augmentations)
170
+ # lpdd.process_images("./License Plate YOLOv8/augmented_images", images_limit=20) # remove images_limit or make it -1 to loop over all images
LicensePlateReader.py ADDED
@@ -0,0 +1,185 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import queue
2
+ import re
3
+ import threading
4
+ from concurrent.futures import ThreadPoolExecutor
5
+ from typing import Dict, Optional
6
+
7
+ import cv2
8
+ import easyocr
9
+ import numpy as np
10
+ import torch
11
+ import torch.nn as nn
12
+
13
+
14
class LicensePlateReader(nn.Module):
    """Reads a split license plate (numbers half + letters half) with an OCR
    model, then normalizes common OCR confusions.

    Work runs on a daemon worker thread; the two halves are preprocessed and
    OCR'd in parallel on a small thread pool.
    """

    def __init__(
        self,
        model,
        char_to_num_mappings: Optional[Dict[str, str]] = None,
        num_to_char_mappings: Optional[Dict[str, str]] = None,
        confidence: float = 0.30,
        queue_size: int = 10,
    ):
        """
        Args:
            model: OCR model exposing ``readtext`` (e.g. an easyocr Reader).
            char_to_num_mappings: Letter -> digit fixes for the numbers half.
            num_to_char_mappings: Digit -> letter fixes for the letters half.
            confidence: Minimum OCR confidence for accepting a reading.
            queue_size: Maximum number of buffered plate-half pairs.
        """
        super(LicensePlateReader, self).__init__()

        # Common OCR confusions on the digits half of the plate.
        self.char_to_num_mappings = char_to_num_mappings or {
            "L": "4",
            "D": "0",
            "S": "5",
            "Z": "2",
            "B": "8",
            "C": "0",
        }
        # Common OCR confusions on the letters half of the plate.
        self.num_to_char_mappings = num_to_char_mappings or {
            "2": "Z",
            "4": "A",
            "6": "G",
            "5": "S",
            "0": "D",
            "7": "T",
            "8": "B",
        }
        self.model = model
        self.confidence = confidence

        self.input_queue = queue.Queue(maxsize=queue_size)
        self.output_queue = queue.Queue(maxsize=queue_size)
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.processing_thread = threading.Thread(
            target=self._process_queue, daemon=True
        )
        self.processing_thread.start()

    def forward(self, numbers_side: np.ndarray, letters_side: np.ndarray) -> str:
        """Submit the two plate halves and block for the combined text."""
        self.input_queue.put((numbers_side, letters_side))
        return self.output_queue.get()

    def _process_queue(self):
        # Worker loop. The original had no error handling here, so any OCR
        # exception killed this thread and left forward() blocked forever;
        # now every input yields exactly one output ("" on failure), matching
        # the sibling pipeline classes.
        while True:
            numbers_side, letters_side = self.input_queue.get()
            try:
                result = self._process_single_plate(numbers_side, letters_side)
                self.output_queue.put(result)
            except Exception as e:
                print(f"Error in LicensePlateReader: {e}")
                self.output_queue.put("")
            finally:
                self.input_queue.task_done()

    def _process_single_plate(
        self, numbers_side: np.ndarray, letters_side: np.ndarray
    ) -> str:
        """Preprocess, OCR, and post-process both halves in parallel, then
        concatenate numbers + letters."""
        future_preprocessed_numbers = self.executor.submit(
            self._pre_process, numbers_side
        )
        future_preprocessed_letters = self.executor.submit(
            self._pre_process, letters_side
        )

        preprocessed_numbers_side = future_preprocessed_numbers.result()
        preprocessed_letters_side = future_preprocessed_letters.result()

        future_extracted_numbers = self.executor.submit(
            self.predict, preprocessed_numbers_side
        )
        future_extracted_letters = self.executor.submit(
            self.predict, preprocessed_letters_side
        )

        extracted_numbers_side = future_extracted_numbers.result()
        extracted_letters_side = future_extracted_letters.result()

        future_postprocessed_numbers = self.executor.submit(
            self._post_process, extracted_numbers_side, True
        )
        future_postprocessed_letters = self.executor.submit(
            self._post_process, extracted_letters_side, False
        )

        postprocessed_numbers_side = future_postprocessed_numbers.result()
        postprocessed_letters_side = future_postprocessed_letters.result()

        return postprocessed_numbers_side + postprocessed_letters_side

    def _pre_process(self, frame: np.ndarray) -> np.ndarray:
        """Grayscale and Otsu-threshold the frame into an inverted binary
        image for OCR.

        Args:
            frame: Input BGR image.
        Returns:
            Preprocessed binary image.
        """
        greyscaled_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Otsu picks the threshold automatically; INV gives dark-on-light text.
        _, binary_frame = cv2.threshold(
            greyscaled_frame, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )
        return binary_frame

    def predict(self, frame: np.ndarray) -> str:
        """Run OCR on the preprocessed frame.

        Returns:
            The first reading whose confidence exceeds the threshold, or ""
            when no reading is confident enough.
        """
        extraction = self.model.readtext(frame)

        for _, text, confidence in extraction:
            if confidence > self.confidence:
                # Return the reading that actually met the threshold; the
                # original returned extraction[-1][1] (always the LAST
                # reading) no matter which entry passed the check.
                return text

        return ""

    def _post_process(self, extracted_text: str, is_numbers: bool) -> str:
        """Normalize an OCR reading.

        Numbers half: map confusable letters to digits and keep 1-4 digit
        groups. Letters half: uppercase, map confusable digits to letters,
        and require exactly three A-Z characters.
        """
        if not extracted_text:
            return ""

        if is_numbers:
            result = extracted_text.strip()
            result = "".join(
                self.char_to_num_mappings.get(char, char) for char in result
            )
            result = "".join(re.findall(r"\b([0-9]{1,4})\b", result))
            return result
        else:
            result = extracted_text.strip().upper()
            result = "".join(
                self.num_to_char_mappings.get(char, char) for char in result
            )
            result = "".join(re.findall(r"[A-Z]{3}", result))
            if len(result) != 3:
                return ""
            return result

    def annotate_frame(self, frame, bbox, extracted_text):
        """Draw the bounding box and extracted text (or "No Extraction")."""
        if bbox is not None:
            color = (0, 255, 0) if extracted_text else (0, 0, 255)
            label = "No Extraction" if not extracted_text else extracted_text
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
            cv2.putText(
                frame,
                f"{label}",
                (bbox[0], bbox[1] - 10),  # Just above the bounding box
                cv2.FONT_HERSHEY_SIMPLEX,
                0.9,
                color,
                2,
            )
        return frame
LicensePlateRecognizer.py ADDED
@@ -0,0 +1,64 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import queue
2
+ import threading
3
+ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
4
+
5
+ import torch.nn as nn
6
+
7
+ if TYPE_CHECKING:
8
+ from LicensePlateDetector import LicensePlateDetector
9
+ from LicensePlateReader import LicensePlateReader
10
+
11
+
12
class LicensePlateRecognizer(nn.Module):
    """Glue stage: runs the detector on a frame, then feeds the two plate
    halves to the reader, yielding (plate_text, bbox)."""

    def __init__(
        self,
        license_detector: "LicensePlateDetector",
        license_reader: "LicensePlateReader",
        queue_size: int = 10,
    ):
        """
        Args:
            license_detector: Stage that finds the plate and splits the crop.
            license_reader: Stage that OCRs the two plate halves.
            queue_size: Maximum number of frames buffered for processing.
        """
        super().__init__()
        self.license_detector = license_detector
        self.license_reader = license_reader
        self.confidence = 0.80

        self.input_queue = queue.Queue(maxsize=queue_size)
        self.output_queue = queue.Queue(maxsize=queue_size)
        self.processing_thread = threading.Thread(
            target=self._process_queue, daemon=True
        )
        self.processing_thread.start()

    def forward(self, frame):
        """Submit a frame and block for its (text, bbox) result; both entries
        are None when recognition failed."""
        self.input_queue.put(frame)
        outcome = self.output_queue.get()
        return (None, None) if outcome == (None, None) else outcome

    def _process_queue(self):
        # Consume frames forever; every input yields exactly one output so
        # forward() never blocks indefinitely.
        while True:
            frame = self.input_queue.get()
            try:
                self.output_queue.put(self._process_single_frame(frame))
            except Exception as e:
                print(f"Error in LicensePlateRecognizer: {e}")
                self.output_queue.put((None, None))
            finally:
                self.input_queue.task_done()

    def _process_single_frame(self, frame):
        """Detect the plate halves, OCR them, and return (text, bbox)."""
        detection = self.license_detector.forward(frame)
        if detection is None:
            raise Exception("No license plate detected")

        numbers_half, letters_half, bbox = detection

        plate_text = self.license_reader.forward(numbers_half, letters_half)
        if not plate_text:
            raise Exception("Failed to read license plate")

        return plate_text, bbox
client_secrets.json ADDED
@@ -0,0 +1 @@
 
 
1
+ {"installed":{"client_id":"1072143795553-0fudoqtd2p40nvfdpltvnaf06h03rdnq.apps.googleusercontent.com","project_id":"dunes-aero-backup-service","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"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","redirect_uris":["http://localhost"]}}
hello_world.py ADDED
@@ -0,0 +1,40 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # from LicensePlateDetector import LicensePlateDetector
2
+ # from LicensePlateReader import LicensePlateReader
3
+ # from LicensePlateRecognizer import LicensePlateRecognizer
4
+ # from LicensePlateAuthorizer import LicensePlateAuthorizer
5
+
6
+ # from ultralytics import YOLO
7
+ # import cv2
8
+ # import easyocr
9
+ # import numpy as np
10
+ # import re
11
+ # from utils import *
12
+ # from fastapi import FastAPI, Response
13
+ # from fastapi.responses import StreamingResponse
14
+ # import uvicorn
15
+ # import io
16
+ # from starlette.responses import HTMLResponse
17
+
18
+ # lpd = LicensePlateDetector(YOLO("license_plate_192.pt")) #
19
+ # lpr = LicensePlateReader(easyocr.Reader(['en'], gpu=False))
20
+ # lpz = LicensePlateRecognizer(lpd, lpr)
21
# lpa = LicensePlateAuthorizer(database_path="./database.db")
22
+
23
+ # cap = cv2.VideoCapture(0)
24
+ # color = (255,0,0)
25
+ # while True:
26
+ # rat , frame = video.read()
27
+ # if rat:
28
+ # # frame = cv2.flip(frame, 1) # if camera reverses image
29
+ # extracted_plate , bbox = lpz.forward(frame)
30
+ # # Authoriztion
31
+ # if lpa.verify_license_plate_authorization(extracted_plate):
32
+ # color = (0,255,0)
33
+ # frame = cv2.rectangle(frame, (bbox[0],bbox[1]),(bbox[2],bbox[3]), color , 2)
34
+ # # print(extracted_plate)
35
+ # # _, frame = cv2.imencode('.jpeg', frame) # for JupyterNotebook
36
+
37
+ # cap.release()
38
+
39
+
40
+ print("Hello World from DALPA, Dunes Aero License Plate Authorizor.")
license_plate_192.pt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:dcc21d67e4a65dd84aacd5fd4687ecc5b867350afb046835551d6e1d572ab734
3
+ size 22470894
main.py ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import numpy as np
2
+ import cv2 as cv
3
+
4
+ from LicensePlateAuthorizer import LicensePlateAuthorizer
5
+
6
def main() -> None:
    """Capture webcam frames and display them annotated with license-plate
    authorization status until 'q' is pressed or the stream ends."""
    cap = cv.VideoCapture(0)
    lpa = LicensePlateAuthorizer()

    if not cap.isOpened():
        print("Cannot open camera")
        return

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break

            processed_frame = lpa.forward(frame)
            if processed_frame is None:
                print("Processing error or no license plate detected. Skipping frame.")
                continue

            cv.imshow('frame', processed_frame)
            if cv.waitKey(1) == ord('q'):
                break
    finally:
        # Release the camera and close windows even if processing raises
        # (the original leaked the capture device on any exception).
        cap.release()
        cv.destroyAllWindows()


# Guard so importing this module no longer starts the camera loop.
if __name__ == "__main__":
    main()
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
streamlit
torch
torchvision
opencv-python-headless
easyocr
ultralytics
albumentations
tqdm
Pillow
pydrive
test.py ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
# Interactive Google Drive authentication smoke test (PyDrive).
# NOTE(review): requires client_secrets.json in the working directory and
# needs network access plus user interaction — not suitable for CI.
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive

gauth = GoogleAuth()
# Spins up a local webserver and opens the browser to complete the OAuth flow.
gauth.LocalWebserverAuth()

drive = GoogleDrive(gauth)