# YOLO / app.py
# wesam0099's picture
# Update app.py
# 42d050d verified
import streamlit as st
import os
import tempfile
from PIL import Image
import numpy as np
import requests
from io import BytesIO
import cv2
from concurrent.futures import ThreadPoolExecutor
from pytube import YouTube
from ultralytics import YOLO
def load_yolov8_model(model_name='yolov8s.pt'):
    """Build a YOLO model object from the given weights identifier.

    Args:
        model_name: Value passed unchanged to the ``YOLO`` constructor.

    Returns:
        The constructed ``YOLO`` instance, or ``None`` if construction raised.
        On failure the exception text is shown through ``st.error``.
    """
    try:
        # Construct and hand back in one step; any failure drops to the handler.
        return YOLO(model_name)
    except Exception as e:
        st.error(f"Error loading model: {e}")
    # Only reachable after a reported failure.
    return None
def detect_objects(image, model):
    """Run ``model`` on ``image`` and draw each detected box and label onto it.

    The drawing calls write directly into ``image``, so the caller's array is
    modified in place and the same object is returned.

    Args:
        image: Image array accepted by both ``model`` and the OpenCV drawing
            functions (callers in this file pass NumPy arrays).
        model: Callable detector exposing a ``names`` mapping from class index
            to label.

    Returns:
        The annotated ``image``, or ``None`` if any step raised; the error is
        then shown through ``st.error``.
    """
    green = (0, 255, 0)
    font = cv2.FONT_HERSHEY_SIMPLEX
    try:
        # Only the first result is used: one input image yields one result.
        first_result = model(image)[0]
        for box in first_result.boxes:
            left, top, right, bottom = (int(value) for value in box.xyxy[0])
            label = model.names[int(box.cls)]
            confidence = box.conf.item()
            cv2.rectangle(image, (left, top), (right, bottom), green, 2)
            # Caption is placed 10 px above the box's top-left corner.
            cv2.putText(image, f'{label} {confidence:.2f}', (left, top - 10), font, 0.5, green, 2)
    except Exception as e:
        st.error(f"Error during object detection: {e}")
        return None
    return image
def process_image_with_yolov8(model_name, image=None, url=None):
    """Load a YOLO model and return an annotated copy of one image.

    The image comes either from ``image`` or, when ``url`` is given, from an
    HTTP download (``url`` takes precedence and replaces ``image``).

    Args:
        model_name: Weights identifier forwarded to ``load_yolov8_model``.
        image: A PIL image (or anything ``np.array`` can turn into an RGB
            image array). Ignored when ``url`` is truthy.
        url: Optional address of an image to download.

    Returns:
        An RGB NumPy array with boxes and labels drawn on it, or ``None`` on
        any failure. Failures are reported through ``st.error``.
    """
    model = load_yolov8_model(model_name)
    if model is None:
        return None
    if url:
        try:
            # A timeout keeps an unresponsive host from hanging the app.
            response = requests.get(url, timeout=10)
            image = Image.open(BytesIO(response.content))
        except Exception as e:
            st.error(f"Error loading image from URL: {e}")
            return None
    if image is None:
        st.error("Error processing image: no image provided")
        return None
    try:
        # Normalise RGBA / palette / grayscale inputs to three RGB channels so
        # both the detector and the colour conversion below get what they expect.
        if isinstance(image, Image.Image):
            image = image.convert("RGB")
        rgb_pixels = np.array(image)
        # Ultralytics interprets NumPy inputs as BGR (OpenCV order), so convert
        # before inference instead of feeding channel-swapped pixels.
        bgr_pixels = cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
        annotated = detect_objects(bgr_pixels, model)
        if annotated is None:
            # detect_objects has already reported the error.
            return None
        # Back to RGB, which is what the caller displays.
        return cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
    except Exception as e:
        st.error(f"Error processing image: {e}")
        return None
def download_youtube_video(youtube_url):
    """Download a YouTube video as MP4 into the system temp directory.

    A 720p MP4 stream is preferred; if none exists, the first MP4 stream of
    any resolution is used instead.

    Args:
        youtube_url: Address of the video, passed to ``YouTube``.

    Returns:
        Path of the downloaded file (``downloaded_video.mp4`` inside
        ``tempfile.gettempdir()``), or ``None`` on failure. Failures are
        reported through ``st.error``.
    """
    download_dir = tempfile.gettempdir()
    file_name = "downloaded_video.mp4"
    try:
        yt = YouTube(youtube_url)
        stream = yt.streams.filter(file_extension='mp4', res="720p").first()
        if not stream:
            stream = yt.streams.filter(file_extension='mp4').first()
        if not stream:
            # Without this check the failure surfaces as an opaque
            # "'NoneType' object has no attribute 'download'".
            st.error("Failed to retrieve video from YouTube. Error: no MP4 stream available")
            return None
        stream.download(output_path=download_dir, filename=file_name)
        return os.path.join(download_dir, file_name)
    except Exception as e:
        st.error(f"Failed to retrieve video from YouTube. Error: {e}")
        return None
def process_video(input_video_path, output_video_path, model):
    """Annotate every frame of a video and write the result as an MP4 file.

    Frames are read, annotated and written one at a time. This keeps memory
    use constant regardless of video length and avoids calling a single model
    instance from several threads at once, which Ultralytics documents as
    unsafe.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path the annotated video is written to.
        model: Detector passed to ``detect_objects`` for each frame.

    Returns:
        ``output_video_path`` on success, or ``None`` if the model is missing,
        the input or output cannot be opened, or detection fails on a frame.
        Failures are reported through ``st.error``.
    """
    if model is None:
        st.error("Error: No detection model available for video processing")
        return None
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        cap.release()
        st.error(f"Error: Cannot open video file {input_video_path}")
        return None
    out = None
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Some containers report no frame rate; a writer opened with a
            # non-positive rate is unusable, so fall back to a common default.
            fps = 30.0
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            st.error(f"Error: Cannot open output video file {output_video_path}")
            return None
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            annotated = detect_objects(frame, model)
            if annotated is None:
                # detect_objects already reported the error; stop rather than
                # pass None to the writer or repeat the error for every frame.
                return None
            out.write(annotated)
    finally:
        # Release native handles even when an exception interrupts processing.
        cap.release()
        if out is not None:
            out.release()
    return output_video_path
def app():
    """Render the Streamlit page for YOLOv8 detection on images and videos.

    The user picks a model and a mode. In image mode an uploaded file or an
    image URL is annotated and displayed. In video mode an uploaded MP4 (which
    takes precedence) or a YouTube URL is annotated frame by frame and the
    resulting file is shown in a video player.
    """
    st.title("YOLOv8 Object Detection on Images and Videos")
    model_choice = st.radio("Select Model", ["yolov8s.pt", "yolov8m.pt"])
    option = st.selectbox("Choose an option", ["Image Detection", "Video Detection"])
    if option == "Image Detection":
        input_choice = st.radio("Select Input Method", ["Upload", "URL"])
        if input_choice == "Upload":
            input_image = st.file_uploader("Upload Image", type=["jpg", "jpeg", "png"])
            if input_image is not None:
                image = Image.open(input_image)
                output_image = process_image_with_yolov8(model_choice, image=image)
                if output_image is not None:
                    st.image(output_image, caption="Processed Image", use_column_width=True)
        elif input_choice == "URL":
            input_url = st.text_input("Image URL", placeholder="Enter URL of the image")
            if input_url:
                output_image = process_image_with_yolov8(model_choice, url=input_url)
                if output_image is not None:
                    st.image(output_image, caption="Processed Image", use_column_width=True)
    elif option == "Video Detection":
        local_video = st.file_uploader("Upload Local Video", type=["mp4"])
        video_url = st.text_input("YouTube Video URL")
        input_path = None  # Stays None when the user has supplied neither source.
        if local_video:
            # The file name is supplied by the client: keep only its final
            # component so it cannot point outside the temp directory.
            safe_name = os.path.basename(local_video.name) or "uploaded_video.mp4"
            input_path = os.path.join(tempfile.gettempdir(), safe_name)
            with open(input_path, "wb") as f:
                f.write(local_video.getbuffer())
        elif video_url:
            input_path = download_youtube_video(video_url)
        if input_path:
            output_path = os.path.join(tempfile.gettempdir(), "processed_video.mp4")
            model = load_yolov8_model(model_choice)
            # A failed load has already been reported; skip processing rather
            # than hand a missing model to the frame loop.
            if model is not None:
                result = process_video(input_path, output_path, model)
                if result:
                    st.video(result)
# Script entry point: build the page only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    app()