|
|
import queue |
|
|
import sqlite3 |
|
|
import threading |
|
|
from typing import Dict |
|
|
|
|
|
import cv2 |
|
|
import easyocr |
|
|
import torch.nn as nn |
|
|
from LicensePlateDetector import LicensePlateDetector |
|
|
from LicensePlateReader import LicensePlateReader |
|
|
from LicensePlateRecognizer import LicensePlateRecognizer |
|
|
from ultralytics import YOLO |
|
|
|
|
|
|
|
|
class LicensePlateAuthorizer(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
detector_model: str = "./license_plate_192.pt", |
|
|
database_path: str = None, |
|
|
confidence_threshold: int = 5, |
|
|
queue_size: int = 10, |
|
|
): |
|
|
super(LicensePlateAuthorizer, self).__init__() |
|
|
|
|
|
detector = LicensePlateDetector(YOLO(detector_model)) |
|
|
reader = LicensePlateReader(easyocr.Reader(["en"], gpu=True)) |
|
|
self.recognizer = LicensePlateRecognizer(detector, reader) |
|
|
|
|
|
self.database_path = database_path |
|
|
self.confidence_threshold = confidence_threshold |
|
|
self.ocr_result_counts: Dict[str, int] = {} |
|
|
self.current_license_plate_text = None |
|
|
|
|
|
self.input_queue = queue.Queue(maxsize=queue_size) |
|
|
self.output_queue = queue.Queue(maxsize=queue_size) |
|
|
self.processing_thread = threading.Thread( |
|
|
target=self._process_queue, daemon=True |
|
|
) |
|
|
self.processing_thread.start() |
|
|
|
|
|
def forward(self, frame): |
|
|
self.input_queue.put(frame) |
|
|
result = self.output_queue.get() |
|
|
if result is None: |
|
|
return None |
|
|
return result[0] |
|
|
|
|
|
def _process_queue(self): |
|
|
while True: |
|
|
frame = self.input_queue.get() |
|
|
try: |
|
|
result = self._process_single_frame(frame) |
|
|
self.output_queue.put(result) |
|
|
except Exception as e: |
|
|
print(f"Error in LicensePlateAuthorizer: {e}") |
|
|
self.output_queue.put(None) |
|
|
finally: |
|
|
self.input_queue.task_done() |
|
|
|
|
|
def _process_single_frame(self, frame): |
|
|
license_plate_text, bbox = self.recognizer.forward(frame) |
|
|
|
|
|
if license_plate_text is None: |
|
|
return frame, None, None, False |
|
|
|
|
|
authorized = self.verify_license_plate_authorization(license_plate_text) |
|
|
frame = self.annotate_frame(frame, bbox, authorized) |
|
|
return frame, license_plate_text, bbox, authorized |
|
|
|
|
|
def verify_license_plate_authorization(self, license_plate_text: str) -> bool: |
|
|
if license_plate_text == "": |
|
|
return False |
|
|
|
|
|
if self.current_license_plate_text != license_plate_text: |
|
|
self.current_license_plate_text = license_plate_text |
|
|
self.ocr_result_counts = {} |
|
|
|
|
|
self.ocr_result_counts[license_plate_text] = ( |
|
|
self.ocr_result_counts.get(license_plate_text, 0) + 1 |
|
|
) |
|
|
|
|
|
if self.ocr_result_counts[license_plate_text] >= self.confidence_threshold: |
|
|
if self.database_path is not None: |
|
|
authorized = self.check_authorization_from_database(license_plate_text) |
|
|
return authorized |
|
|
return False |
|
|
|
|
|
def check_authorization_from_database(self, license_plate: str) -> bool: |
|
|
try: |
|
|
conn = sqlite3.connect(self.database_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
query = "SELECT EXISTS(SELECT 1 FROM license_plates WHERE plate = ?)" |
|
|
cursor.execute(query, (license_plate,)) |
|
|
exists = cursor.fetchone()[0] |
|
|
|
|
|
conn.close() |
|
|
return bool(exists) |
|
|
except Exception as e: |
|
|
print(f"Database error: {e}") |
|
|
return False |
|
|
|
|
|
def annotate_frame(self, frame, bbox, authorized): |
|
|
if bbox is not None: |
|
|
color = (0, 255, 0) if authorized else (0, 0, 255) |
|
|
cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) |
|
|
status = "Authorized" if authorized else "Unauthorized" |
|
|
cv2.putText( |
|
|
frame, |
|
|
f"Status: {status}", |
|
|
(bbox[0], bbox[1] - 10), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, |
|
|
0.9, |
|
|
color, |
|
|
2, |
|
|
) |
|
|
return frame |
|
|
|
|
|
def save_license_plate(self, license_plate: str) -> None: |
|
|
conn = sqlite3.connect(self.database_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute( |
|
|
""" |
|
|
CREATE TABLE IF NOT EXISTS license_plates ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
plate TEXT UNIQUE |
|
|
)""" |
|
|
) |
|
|
|
|
|
try: |
|
|
cursor.execute( |
|
|
"INSERT INTO license_plates (plate) VALUES (?)", (license_plate,) |
|
|
) |
|
|
conn.commit() |
|
|
print(f"License plate '{license_plate}' added to database.") |
|
|
except sqlite3.IntegrityError: |
|
|
print( |
|
|
f"License plate '{license_plate}' already exists in the database, skipping." |
|
|
) |
|
|
|
|
|
conn.close() |
|
|
|
|
|
def clear_database(self) -> None: |
|
|
conn = sqlite3.connect(self.database_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute("DELETE FROM license_plates") |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|