License_plate / app.py
iamisha's picture
Upload 6 files
dd47838 verified
Raw
History Blame Contribute Delete
12.8 kB
import cv2
import tempfile
import streamlit as st
from ultralytics import YOLO, solutions
import os
import time
from collections import defaultdict
def main():
st.title('Licence Plate Recognition and Vehicle Counting')
st.sidebar.title('Settings')
st.markdown(
"""
<style>
[data-testid="stSidebar"][aria-expanded="true"] > div:first-child{width: 350px;}
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child{width: 350px; margin-left: -400px;}
</style>
""",
unsafe_allow_html=True,
)
enable_GPU = st.sidebar.checkbox('Enable GPU')
custom_classes = st.sidebar.checkbox('Use Custom Classes')
assigned_class_id = []
names = ['car', 'motorcycle', 'bus', 'train', 'truck', 'bike']
if custom_classes:
assigned_class = st.sidebar.multiselect('Select The Custom Classes', list(names), default='bike')
for each in assigned_class:
assigned_class_id.append(names.index(each))
media_type = st.sidebar.radio("Choose media type", ('Video', 'Image'))
# Load the YOLO models
license_plate_model = YOLO('best24.pt')
character_model = YOLO('best.pt') # replace with your character segmentation model
def process_image(image):
results = license_plate_model.predict(image)
return results
if media_type == 'Video':
video_file_buffer = st.sidebar.file_uploader("Upload a video", type=["mp4", "mov", 'avi', 'asf', 'm4v'])
DEMO_VIDEO = 'demo0.mp4'
tffile = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
if not video_file_buffer:
if os.path.exists(DEMO_VIDEO):
tffile.name = DEMO_VIDEO
vid = cv2.VideoCapture(DEMO_VIDEO)
else:
st.error(f"Demo video file not found: {DEMO_VIDEO}. Please upload a video.")
return
else:
tffile.write(video_file_buffer.read())
tffile.close() # Ensure the file is closed before using it
vid = cv2.VideoCapture(tffile.name)
dem_vid = open(tffile.name, 'rb')
demo_bytes = dem_vid.read()
dem_vid.close() # Ensure the file is closed after reading
stframe = st.empty()
st.info('Input Video')
# st.video(demo_bytes)
st.sidebar.markdown('---')
confidence = st.sidebar.slider('Confidence', min_value=0.0, max_value=1.0, value=0.25)
st.sidebar.markdown('---')
cap = cv2.VideoCapture(tffile.name)
if not cap.isOpened():
st.error(f"Error reading video file: {tffile.name}")
return
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))
output_width = w
output_height = h
# Radio button to choose between vertical and horizontal line
line_orientation = st.sidebar.radio('Line Orientation', ('Horizontal', 'Vertical'))
# Slider for dynamic shift amount
if line_orientation == 'Horizontal':
vertical_shift = st.sidebar.slider('Shift Amount (Vertical Shift)', min_value=0, max_value=h, value=250, step=50)
line_points = [(0, vertical_shift), (w, vertical_shift)]
else:
horizontal_shift = st.sidebar.slider('Shift Amount (Horizontal Shift)', min_value=0, max_value=w, value=200, step=50)
line_points = [(horizontal_shift, 0), (horizontal_shift, h)]
st.sidebar.markdown('---')
# Display the initial frame with the line
ret, frame = cap.read()
if ret:
cv2.line(frame, line_points[0], line_points[1], (255, 0, 0), 2)
st.image(frame, channels="BGR")
kpi1, kpi2, kpi3 = st.columns(3)
with kpi1:
st.markdown("**Frame Rate**")
kpi1_text = st.markdown(fps)
with kpi2:
st.markdown("**Height**")
kpi2_text = st.markdown(h)
with kpi3:
st.markdown("**Width**")
kpi3_text = st.markdown(w)
if st.button('Process Video'):
classes_to_count = [0,1,2]
output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.avi').name
video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (output_width, output_height))
# Init Object Counter
counter = solutions.ObjectCounter(
view_img=False,
reg_pts=line_points,
names=license_plate_model.names,
draw_tracks=True,
line_thickness=2,
line_dist_thresh=50
)
while cap.isOpened():
success, im0 = cap.read()
if not success:
print("Video frame is empty or video processing has been successfully completed.")
break
tracks = license_plate_model.track(im0, persist=True, show=False, classes=classes_to_count)
im0 = counter.start_counting(im0, tracks)
video_writer.write(im0)
cap.release()
video_writer.release()
cv2.destroyAllWindows()
# Provide a download link for the processed video
if os.path.exists(output_video_path):
with open(output_video_path, 'rb') as f:
st.download_button(
label="Download Processed Video",
data=f,
file_name="processed_video.avi",
mime="video/avi"
)
else:
st.error(f"Processed video file not found: {output_video_path}")
time.sleep(1) # Wait for a second before attempting to delete the file
def is_file_in_use(filepath):
try:
os.rename(filepath, filepath)
return False
except OSError:
return True
if not is_file_in_use(tffile.name):
try:
os.remove(tffile.name)
except Exception as e:
st.write(f"Error deleting temporary video file: {e}")
if not is_file_in_use(output_video_path):
try:
os.remove(output_video_path)
except Exception as e:
st.write(f"Error deleting output video file: {e}")
else:
image_file_buffer = st.sidebar.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])
if image_file_buffer is not None:
tffile = tempfile.NamedTemporaryFile(delete=False)
tffile.write(image_file_buffer.read())
tffile.close() # Ensure the file is closed before using it
img = cv2.imread(tffile.name)
st.text('Input Image')
st.image(img, use_column_width=True)
if st.button('Process Image'):
results = process_image(img)
license_plate_class_index = 1 # Update this if needed for your model
detected_classes = [] # To store detected classes from character segmentation
# Filter results for license plate class
for result in results:
filtered_boxes = [box for box in result.boxes if int(box.cls.item()) == license_plate_class_index]
for box in filtered_boxes:
try:
cls = int(box.cls.item())
conf = float(box.conf.item())
label = f"{license_plate_model.names[cls]} {conf:.2f}"
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
# Extract the license plate region, resize it, and pass it to the character model
license_plate_region = img[y1:y2, x1:x2]
zoomed_license_plate = cv2.resize(license_plate_region, (224, 224)) # Adjust size as needed
char_results = character_model.predict(zoomed_license_plate)
# Draw character boxes on the original image
char_boxes = []
for char_result in char_results:
for char_box in char_result.boxes:
try:
char_cls = int(char_box.cls.item())
char_conf = float(char_box.conf.item())
char_label = f"{character_model.names[char_cls]} {char_conf:.2f}"
char_x1, char_y1, char_x2, char_y2 = map(int, char_box.xyxy[0].tolist())
# Adjust coordinates relative to the original image
orig_char_x1 = x1 + char_x1 * ((x2 - x1) / 224)
orig_char_y1 = y1 + char_y1 * ((y2 - y1) / 224)
orig_char_x2 = x1 + char_x2 * ((x2 - x1) / 224)
orig_char_y2 = y1 + char_y2 * ((y2 - y1) / 224)
cv2.rectangle(img, (int(orig_char_x1), int(orig_char_y1)), (int(orig_char_x2), int(orig_char_y2)), (0, 255, 0), 2)
cv2.putText(img, char_label, (int(orig_char_x1), int(orig_char_y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
detected_classes.append((character_model.names[char_cls], orig_char_x1, orig_char_y1))
char_boxes.append(((character_model.names[char_cls], orig_char_x1, orig_char_y1), (int(orig_char_x1), int(orig_char_y1), int(orig_char_x2), int(orig_char_y2))))
except Exception as e:
st.write(f"Error processing character box: {e}")
st.write(f"Character box data: {char_box}")
except Exception as e:
st.write(f"Error processing license plate box: {e}")
st.write(f"License plate box data: {box}")
st.image(img, caption='Processed Image', use_column_width=True)
# Group detected classes by their y-coordinates
grouped_classes = defaultdict(list)
for cls, x, y in detected_classes:
grouped_classes[int(y // 20)].append((cls, x)) # Adjust the divisor (20) as needed based on character height
# Sort each group by x-coordinate
for key in grouped_classes:
grouped_classes[key].sort(key=lambda x: x[1])
# Print detected classes in order for each row
st.sidebar.markdown("### Detected Character Classes (Ordered by Rows)")
row_counter = 1
for key in sorted(grouped_classes.keys()):
st.sidebar.write(f"Row {row_counter}:")
c = ""
for cls, _ in grouped_classes[key]:
c += cls
st.sidebar.write(c)
row_counter += 1
# Check if any detected class belongs to Bagmati State
if any("BA" in cls or "Bagmati" in cls for cls, _, _ in detected_classes):
st.sidebar.write("It belongs to Bagmati State")
time.sleep(1) # Wait for a second before attempting to delete the file
def is_file_in_use(filepath):
try:
os.rename(filepath, filepath)
return False
except OSError:
return True
if not is_file_in_use(tffile.name):
try:
os.remove(tffile.name)
except Exception as e:
st.write(f"Error deleting temporary image file: {e}")
if __name__ == '__main__':
main()