|
|
import queue |
|
|
import re |
|
|
import threading |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
from typing import Dict, Optional |
|
|
|
|
|
import cv2 |
|
|
import easyocr |
|
|
import numpy as np |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
|
|
|
|
|
|
class LicensePlateReader(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
model, |
|
|
char_to_num_mappings: Optional[Dict[str, str]] = None, |
|
|
num_to_char_mappings: Optional[Dict[str, str]] = None, |
|
|
confidence: float = 0.30, |
|
|
queue_size: int = 10, |
|
|
): |
|
|
""" |
|
|
Initialize the LicensePlateReader with the given model and mappings. |
|
|
Args:. |
|
|
model OCR model for reading text. |
|
|
char_to_num_mappings Mappings from characters to numbers. |
|
|
num_to_char_mappings Mappings from numbers to characters. |
|
|
confidence threshold for accepting OCR results. |
|
|
""" |
|
|
super(LicensePlateReader, self).__init__() |
|
|
|
|
|
|
|
|
self.char_to_num_mappings = char_to_num_mappings or { |
|
|
"L": "4", |
|
|
"D": "0", |
|
|
"S": "5", |
|
|
"Z": "2", |
|
|
"B": "8", |
|
|
"C": "0", |
|
|
} |
|
|
self.num_to_char_mappings = num_to_char_mappings or { |
|
|
"2": "Z", |
|
|
"4": "A", |
|
|
"6": "G", |
|
|
"5": "S", |
|
|
"0": "D", |
|
|
"7": "T", |
|
|
"8": "B", |
|
|
} |
|
|
self.model = model |
|
|
self.confidence = confidence |
|
|
|
|
|
self.input_queue = queue.Queue(maxsize=queue_size) |
|
|
self.output_queue = queue.Queue(maxsize=queue_size) |
|
|
self.executor = ThreadPoolExecutor(max_workers=2) |
|
|
self.processing_thread = threading.Thread( |
|
|
target=self._process_queue, daemon=True |
|
|
) |
|
|
self.processing_thread.start() |
|
|
|
|
|
def forward(self, numbers_side: np.ndarray, letters_side: np.ndarray) -> str: |
|
|
self.input_queue.put((numbers_side, letters_side)) |
|
|
return self.output_queue.get() |
|
|
|
|
|
def _process_queue(self): |
|
|
while True: |
|
|
numbers_side, letters_side = self.input_queue.get() |
|
|
result = self._process_single_plate(numbers_side, letters_side) |
|
|
self.output_queue.put(result) |
|
|
self.input_queue.task_done() |
|
|
|
|
|
def _process_single_plate( |
|
|
self, numbers_side: np.ndarray, letters_side: np.ndarray |
|
|
) -> str: |
|
|
future_preprocessed_numbers = self.executor.submit( |
|
|
self._pre_process, numbers_side |
|
|
) |
|
|
future_preprocessed_letters = self.executor.submit( |
|
|
self._pre_process, letters_side |
|
|
) |
|
|
|
|
|
preprocessed_numbers_side = future_preprocessed_numbers.result() |
|
|
preprocessed_letters_side = future_preprocessed_letters.result() |
|
|
|
|
|
future_extracted_numbers = self.executor.submit( |
|
|
self.predict, preprocessed_numbers_side |
|
|
) |
|
|
future_extracted_letters = self.executor.submit( |
|
|
self.predict, preprocessed_letters_side |
|
|
) |
|
|
|
|
|
extracted_numbers_side = future_extracted_numbers.result() |
|
|
extracted_letters_side = future_extracted_letters.result() |
|
|
|
|
|
future_postprocessed_numbers = self.executor.submit( |
|
|
self._post_process, extracted_numbers_side, True |
|
|
) |
|
|
future_postprocessed_letters = self.executor.submit( |
|
|
self._post_process, extracted_letters_side, False |
|
|
) |
|
|
|
|
|
postprocessed_numbers_side = future_postprocessed_numbers.result() |
|
|
postprocessed_letters_side = future_postprocessed_letters.result() |
|
|
|
|
|
return postprocessed_numbers_side + "" + postprocessed_letters_side |
|
|
|
|
|
def _pre_process(self, frame: np.ndarray) -> np.ndarray: |
|
|
""" |
|
|
Preprocess the input frame by blurring, grayscaling, and thresholding. |
|
|
Args: |
|
|
frame Input image frame. |
|
|
Returns: |
|
|
Preprocessed binary image. |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
greyscaled_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
|
|
|
_, binary_frame = cv2.threshold( |
|
|
greyscaled_frame, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU |
|
|
) |
|
|
return binary_frame |
|
|
|
|
|
def predict(self, frame: np.ndarray) -> str: |
|
|
""" |
|
|
Predict text from the preprocessed frame using the OCR model. |
|
|
Args: |
|
|
frame Preprocessed image frame. |
|
|
Returns: |
|
|
Extracted text if confidence is above threshold, else an empty string. |
|
|
""" |
|
|
|
|
|
extraction = self.model.readtext(frame) |
|
|
|
|
|
|
|
|
for _, text, confidence in extraction: |
|
|
if confidence > self.confidence: |
|
|
return extraction[-1][1] |
|
|
|
|
|
|
|
|
return "" |
|
|
|
|
|
def _post_process(self, extracted_text: str, is_numbers: bool) -> str: |
|
|
if not extracted_text: |
|
|
return "" |
|
|
|
|
|
if is_numbers: |
|
|
result = extracted_text.strip() |
|
|
result = "".join( |
|
|
self.char_to_num_mappings.get(char, char) for char in result |
|
|
) |
|
|
|
|
|
result = "".join(re.findall(r"\b([0-9]{1,4})\b", result)) |
|
|
|
|
|
|
|
|
return result |
|
|
else: |
|
|
result = extracted_text.strip().upper() |
|
|
result = "".join( |
|
|
self.num_to_char_mappings.get(char, char) for char in result |
|
|
) |
|
|
result = "".join(re.findall(r"[A-Z]{3}", result)) |
|
|
if len(result) != 3: |
|
|
return "" |
|
|
return result |
|
|
|
|
|
def annotate_frame(self, frame, bbox, extracted_text): |
|
|
if bbox is not None: |
|
|
color = (0, 255, 0) if extracted_text else (0, 0, 255) |
|
|
label = "No Extraction" if not extracted_text else extracted_text |
|
|
cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) |
|
|
cv2.putText( |
|
|
frame, |
|
|
f"{label}", |
|
|
(bbox[0], bbox[1] - 10), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, |
|
|
0.9, |
|
|
color, |
|
|
2, |
|
|
) |
|
|
return frame |
|
|
|