detect / LicensePlateAuthorizer.py
wesam0099's picture
Update LicensePlateAuthorizer.py
bdfea3b verified
import queue
import sqlite3
import threading
from typing import Dict
import cv2
import easyocr
import torch.nn as nn
from LicensePlateDetector import LicensePlateDetector
from LicensePlateReader import LicensePlateReader
from LicensePlateRecognizer import LicensePlateRecognizer
from ultralytics import YOLO
class LicensePlateAuthorizer(nn.Module):
def __init__(
self,
detector_model: str = "./license_plate_192.pt",
database_path: str = None,
confidence_threshold: int = 5,
queue_size: int = 10,
):
super(LicensePlateAuthorizer, self).__init__()
detector = LicensePlateDetector(YOLO(detector_model))
reader = LicensePlateReader(easyocr.Reader(["en"], gpu=True))
self.recognizer = LicensePlateRecognizer(detector, reader)
self.database_path = database_path
self.confidence_threshold = confidence_threshold
self.ocr_result_counts: Dict[str, int] = {}
self.current_license_plate_text = None
self.input_queue = queue.Queue(maxsize=queue_size)
self.output_queue = queue.Queue(maxsize=queue_size)
self.processing_thread = threading.Thread(
target=self._process_queue, daemon=True
)
self.processing_thread.start()
def forward(self, frame):
self.input_queue.put(frame)
result = self.output_queue.get()
if result is None:
return None
return result[0]
def _process_queue(self):
while True:
frame = self.input_queue.get()
try:
result = self._process_single_frame(frame)
self.output_queue.put(result)
except Exception as e:
print(f"Error in LicensePlateAuthorizer: {e}")
self.output_queue.put(None)
finally:
self.input_queue.task_done()
def _process_single_frame(self, frame):
license_plate_text, bbox = self.recognizer.forward(frame)
if license_plate_text is None:
return frame, None, None, False
authorized = self.verify_license_plate_authorization(license_plate_text)
frame = self.annotate_frame(frame, bbox, authorized)
return frame, license_plate_text, bbox, authorized
def verify_license_plate_authorization(self, license_plate_text: str) -> bool:
if license_plate_text == "":
return False
if self.current_license_plate_text != license_plate_text:
self.current_license_plate_text = license_plate_text
self.ocr_result_counts = {}
self.ocr_result_counts[license_plate_text] = (
self.ocr_result_counts.get(license_plate_text, 0) + 1
)
if self.ocr_result_counts[license_plate_text] >= self.confidence_threshold:
if self.database_path is not None:
authorized = self.check_authorization_from_database(license_plate_text)
return authorized
return False
def check_authorization_from_database(self, license_plate: str) -> bool:
try:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
query = "SELECT EXISTS(SELECT 1 FROM license_plates WHERE plate = ?)"
cursor.execute(query, (license_plate,))
exists = cursor.fetchone()[0]
conn.close()
return bool(exists)
except Exception as e:
print(f"Database error: {e}")
return False
def annotate_frame(self, frame, bbox, authorized):
if bbox is not None:
color = (0, 255, 0) if authorized else (0, 0, 255)
cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
status = "Authorized" if authorized else "Unauthorized"
cv2.putText(
frame,
f"Status: {status}",
(bbox[0], bbox[1] - 10), # Top right above the bounding box
cv2.FONT_HERSHEY_SIMPLEX,
0.9,
color,
2,
)
return frame
def save_license_plate(self, license_plate: str) -> None:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS license_plates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
plate TEXT UNIQUE
)"""
)
try:
cursor.execute(
"INSERT INTO license_plates (plate) VALUES (?)", (license_plate,)
)
conn.commit()
print(f"License plate '{license_plate}' added to database.")
except sqlite3.IntegrityError:
print(
f"License plate '{license_plate}' already exists in the database, skipping."
)
conn.close()
def clear_database(self) -> None:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM license_plates")
conn.commit()
conn.close()