| import queue | |
| import threading | |
| from typing import TYPE_CHECKING, Dict, List, Optional, Tuple | |
| import torch.nn as nn | |
| if TYPE_CHECKING: | |
| from LicensePlateDetector import LicensePlateDetector | |
| from LicensePlateReader import LicensePlateReader | |
class LicensePlateRecognizer(nn.Module):
    """Run a license-plate detector followed by a reader on single frames.

    Frames handed to :meth:`forward` are pushed onto ``input_queue``; a
    daemon worker thread started in ``__init__`` pops them, runs the
    detector and the reader, and pushes the outcome onto ``output_queue``.
    :meth:`forward` blocks until that outcome is available.

    Args:
        license_detector: Object whose ``forward(frame)`` returns ``None``
            or a 3-item sequence ``(left_half, right_half, bbox)``.
        license_reader: Object whose ``forward(left_half, right_half)``
            returns the extracted license (a falsy value means failure).
        queue_size: ``maxsize`` of both the input and the output queue.
    """

    def __init__(
        self,
        license_detector: "LicensePlateDetector",
        license_reader: "LicensePlateReader",
        queue_size: int = 10,
    ):
        super().__init__()
        self.license_detector = license_detector
        self.license_reader = license_reader
        # NOTE(review): not read anywhere in this class; presumably a
        # threshold consumed elsewhere -- confirm before relying on it.
        self.confidence = 0.80
        self.input_queue = queue.Queue(maxsize=queue_size)
        self.output_queue = queue.Queue(maxsize=queue_size)
        # Serialises the put/get pair in forward(): both queues are shared by
        # every caller, so without it a caller could pick up the result that
        # belongs to another caller's frame.
        self._forward_lock = threading.Lock()
        self.processing_thread = threading.Thread(
            target=self._process_queue, daemon=True
        )
        self.processing_thread.start()

    def forward(self, frame) -> Tuple[Optional[object], Optional[object]]:
        """Process one frame and block until its result is ready.

        Returns:
            ``(extracted_license, bbox)`` on success, or ``(None, None)``
            when detection or reading failed for this frame.
        """
        with self._forward_lock:
            self.input_queue.put(frame)
            # The worker always emits exactly one item per frame (a result or
            # the ``(None, None)`` failure marker), so this get() pairs with
            # the put() above.
            return self.output_queue.get()

    def _process_queue(self) -> None:
        """Worker loop: consume frames forever and publish one outcome each."""
        while True:
            frame = self.input_queue.get()
            try:
                result = self._process_single_frame(frame)
                self.output_queue.put(result)
            except Exception as e:
                # Any failure is reported and turned into the failure marker
                # so the caller waiting in forward() is always released.
                print(f"Error in LicensePlateRecognizer: {e}")
                self.output_queue.put((None, None))
            finally:
                self.input_queue.task_done()

    def _process_single_frame(self, frame):
        """Detect and read the plate in ``frame``.

        Returns:
            ``(extracted_license, bbox)``.

        Raises:
            Exception: If the detector returns ``None`` or the reader returns
                a falsy value.
        """
        detector_result = self.license_detector.forward(frame)
        if detector_result is None:
            raise Exception("No license plate detected")

        left_license_plate_half, right_license_plate_half, bbox = detector_result
        extracted_license = self.license_reader.forward(
            left_license_plate_half, right_license_plate_half
        )
        if not extracted_license:
            raise Exception("Failed to read license plate")
        return extracted_license, bbox