File size: 5,106 Bytes
bd5a354 bdfea3b bd5a354 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import queue
import sqlite3
import threading
from typing import Dict
import cv2
import easyocr
import torch.nn as nn
from LicensePlateDetector import LicensePlateDetector
from LicensePlateReader import LicensePlateReader
from LicensePlateRecognizer import LicensePlateRecognizer
from ultralytics import YOLO
class LicensePlateAuthorizer(nn.Module):
def __init__(
self,
detector_model: str = "./license_plate_192.pt",
database_path: str = None,
confidence_threshold: int = 5,
queue_size: int = 10,
):
super(LicensePlateAuthorizer, self).__init__()
detector = LicensePlateDetector(YOLO(detector_model))
reader = LicensePlateReader(easyocr.Reader(["en"], gpu=True))
self.recognizer = LicensePlateRecognizer(detector, reader)
self.database_path = database_path
self.confidence_threshold = confidence_threshold
self.ocr_result_counts: Dict[str, int] = {}
self.current_license_plate_text = None
self.input_queue = queue.Queue(maxsize=queue_size)
self.output_queue = queue.Queue(maxsize=queue_size)
self.processing_thread = threading.Thread(
target=self._process_queue, daemon=True
)
self.processing_thread.start()
def forward(self, frame):
self.input_queue.put(frame)
result = self.output_queue.get()
if result is None:
return None
return result[0]
def _process_queue(self):
while True:
frame = self.input_queue.get()
try:
result = self._process_single_frame(frame)
self.output_queue.put(result)
except Exception as e:
print(f"Error in LicensePlateAuthorizer: {e}")
self.output_queue.put(None)
finally:
self.input_queue.task_done()
def _process_single_frame(self, frame):
license_plate_text, bbox = self.recognizer.forward(frame)
if license_plate_text is None:
return frame, None, None, False
authorized = self.verify_license_plate_authorization(license_plate_text)
frame = self.annotate_frame(frame, bbox, authorized)
return frame, license_plate_text, bbox, authorized
def verify_license_plate_authorization(self, license_plate_text: str) -> bool:
if license_plate_text == "":
return False
if self.current_license_plate_text != license_plate_text:
self.current_license_plate_text = license_plate_text
self.ocr_result_counts = {}
self.ocr_result_counts[license_plate_text] = (
self.ocr_result_counts.get(license_plate_text, 0) + 1
)
if self.ocr_result_counts[license_plate_text] >= self.confidence_threshold:
if self.database_path is not None:
authorized = self.check_authorization_from_database(license_plate_text)
return authorized
return False
def check_authorization_from_database(self, license_plate: str) -> bool:
try:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
query = "SELECT EXISTS(SELECT 1 FROM license_plates WHERE plate = ?)"
cursor.execute(query, (license_plate,))
exists = cursor.fetchone()[0]
conn.close()
return bool(exists)
except Exception as e:
print(f"Database error: {e}")
return False
def annotate_frame(self, frame, bbox, authorized):
if bbox is not None:
color = (0, 255, 0) if authorized else (0, 0, 255)
cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
status = "Authorized" if authorized else "Unauthorized"
cv2.putText(
frame,
f"Status: {status}",
(bbox[0], bbox[1] - 10), # Top right above the bounding box
cv2.FONT_HERSHEY_SIMPLEX,
0.9,
color,
2,
)
return frame
def save_license_plate(self, license_plate: str) -> None:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS license_plates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
plate TEXT UNIQUE
)"""
)
try:
cursor.execute(
"INSERT INTO license_plates (plate) VALUES (?)", (license_plate,)
)
conn.commit()
print(f"License plate '{license_plate}' added to database.")
except sqlite3.IntegrityError:
print(
f"License plate '{license_plate}' already exists in the database, skipping."
)
conn.close()
def clear_database(self) -> None:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM license_plates")
conn.commit()
conn.close()
|