Spaces:
Runtime error
Runtime error
| import matplotlib | |
| matplotlib.use('Agg') | |
| import streamlit as st | |
| import cv2 | |
| import numpy as np | |
| from yolov5 import YOLOv5 | |
| from sort.sort import Sort | |
| import tempfile | |
| import shutil | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips, ImageSequenceClip | |
| import os | |
# Module-level detector and SORT tracker instances, created once at import.
model_path = "yolov5s.pt"  # must point at the YOLOv5 weights file
model = YOLOv5(model_path, device="cpu")
tracker = Sort()
def _car_detections(frame_rgb):
    """Return an (N, 5) float array of car boxes found in one RGB frame.

    Each row is ``[xmin, ymin, xmax, ymax, confidence]``. N is 0 when the
    detector reports no cars, which is the shape SORT expects for an empty
    frame.
    """
    preds = model.predict(frame_rgb).pandas().xyxy[0]
    cars = preds.loc[
        preds['name'] == 'car',
        ['xmin', 'ymin', 'xmax', 'ymax', 'confidence'],
    ]
    return cars.to_numpy(dtype=float)


def process_video(uploaded_file):
    """Detect and track cars in an uploaded video and write an annotated copy.

    Every frame is run through the YOLOv5 detector, car boxes are passed to a
    SORT tracker, and the tracked boxes, their IDs and a running count of
    unique IDs are drawn on the frame. The annotated frames are re-encoded
    to ``processed_video.mp4`` at the source frame rate. A Streamlit progress
    bar is updated while frames are processed.

    Args:
        uploaded_file: Object exposing ``getbuffer()`` with the raw bytes of
            the uploaded video (as returned by ``st.file_uploader``).

    Returns:
        Path of the annotated output video (``'processed_video.mp4'``).

    Raises:
        ValueError: If no frame could be read from the uploaded video.
    """
    output_video_path = 'processed_video.mp4'

    # The uploaded file and every intermediate frame live in one scratch
    # directory so a single rmtree cleans up everything, even on failure.
    temp_dir = tempfile.mkdtemp()
    video_clip = None
    annotated_clip = None
    try:
        temp_file_path = os.path.join(temp_dir, "temp_video.mp4")
        with open(temp_file_path, "wb") as temp_file:
            temp_file.write(uploaded_file.getbuffer())

        # Use moviepy to read the video file
        video_clip = VideoFileClip(temp_file_path)
        fps = video_clip.fps
        # Only an estimate for the progress bar; never trusted as the real
        # frame count. Clamped to >= 1 to avoid dividing by zero.
        estimated_frames = max(int(fps * video_clip.duration), 1)

        # Fresh tracker per video so tracks from an earlier upload cannot be
        # matched against this one.
        video_tracker = Sort()
        unique_cars = set()
        frame_files = []
        progress_bar = st.progress(0)

        for frame_idx, frame_rgb in enumerate(video_clip.iter_frames()):
            # st.progress rejects values above 100, and the real frame count
            # can exceed the estimate.
            percent = int((frame_idx + 1) / estimated_frames * 100)
            progress_bar.progress(min(percent, 100))

            # The detector gets the RGB frame as delivered by moviepy. SORT
            # must be updated on every frame, including frames with no
            # detections, so that stale tracks age out.
            tracks = video_tracker.update(_car_detections(frame_rgb))

            # OpenCV drawing and imwrite work in BGR channel order.
            frame = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
            for track in tracks:
                car_id = int(track[4])
                unique_cars.add(car_id)
                xmin, ymin, xmax, ymax = map(int, track[:4])
                cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (255, 0, 0), 2)
                cv2.putText(frame, f'ID: {car_id}', (xmin, ymin - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)
            cv2.putText(frame, f'Unique Cars: {len(unique_cars)}', (10, 35),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.25, (0, 255, 0), 2)

            # Written while still BGR: imwrite stores BGR input as a correct
            # image, which is then read back as RGB for the output clip.
            frame_path = os.path.join(temp_dir, f"{frame_idx:04d}.jpg")
            cv2.imwrite(frame_path, frame)
            frame_files.append(frame_path)

        if not frame_files:
            raise ValueError("The uploaded video contains no readable frames.")

        # Build the clip from the files actually written rather than from the
        # estimated frame count.
        annotated_clip = ImageSequenceClip(frame_files, fps=fps)
        # Use libx264 codec for compatibility
        annotated_clip.write_videofile(output_video_path, codec='libx264')
    finally:
        # Release reader/writer handles before deleting the files they use.
        for clip in (annotated_clip, video_clip):
            if clip is not None:
                clip.close()
        shutil.rmtree(temp_dir, ignore_errors=True)

    return output_video_path
def main():
    """Render the Streamlit page for uploading, processing and downloading a video.

    The processed video path and the name of the upload it was produced from
    are kept in ``st.session_state`` so that Streamlit reruns do not
    reprocess the same file. A video is (re)processed when there is no stored
    result or when the uploaded file name differs from the stored one.
    """
    # Initialize session state variables if they don't exist
    for key in ('output_video_path', 'uploaded_file_name'):
        if key not in st.session_state:
            st.session_state[key] = None

    st.sidebar.image("logo.jpg", use_column_width=True)
    uploaded_file = st.sidebar.file_uploader("Upload a video", type=['mp4'])
    st.title("Car Detection and Tracking")

    if uploaded_file is None:
        return

    # Process the video only if it hasn't been processed yet or a new file is uploaded
    is_new_upload = st.session_state.uploaded_file_name != uploaded_file.name
    if st.session_state.output_video_path is None or is_new_upload:
        # Drop the old result first and record the file name only after
        # processing succeeds, so a failed run is retried on the next rerun
        # instead of showing the previous upload's video.
        st.session_state.output_video_path = None
        st.session_state.output_video_path = process_video(uploaded_file)
        st.session_state.uploaded_file_name = uploaded_file.name

    # Display the processed video
    st.video(st.session_state.output_video_path)

    # Provide a download link for the processed video
    with open(st.session_state.output_video_path, "rb") as file:
        st.download_button("Download Processed Video", file, file_name="processed_video.mp4")


if __name__ == "__main__":
    main()