Spaces:
Build error
Build error
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| from ultralytics import solutions | |
| from tqdm import tqdm | |
| import tempfile | |
| import os | |
# Module-level state shared by the Gradio callbacks in this file.
# Clicked (x, y) image coordinates; at most two are kept (line start, line end).
points: list = []
# First decoded frame of the loaded video, as returned by OpenCV (BGR order).
first_frame = None
# Location of the loaded video, handed to cv2.VideoCapture when processing.
video_path = None
def extract_first_frame(video):
    """Load the first frame of an uploaded video and reset the selection state.

    All module-level state from a previous upload (``points``, ``first_frame``,
    ``video_path``) is cleared up front and only re-populated once a frame has
    actually been decoded. This keeps the other callbacks from acting on a
    stale frame or on a path that could not be read.

    Args:
        video: Value supplied by the ``gr.Video`` input; it is passed straight
            to ``cv2.VideoCapture`` (presumably a file path). ``None`` means
            nothing has been uploaded.

    Returns:
        tuple: ``(image, status)`` where ``image`` is the first frame converted
        to RGB for display (or ``None`` on failure) and ``status`` is a
        user-facing message.
    """
    global first_frame, video_path, points

    # Forget everything tied to the previous upload before doing any work.
    points = []
    first_frame = None
    video_path = None

    if video is None:
        return None, "Please upload a video first"

    cap = cv2.VideoCapture(video)
    try:
        if not cap.isOpened():
            return None, "Error: Could not open video file"
        ret, frame = cap.read()
    finally:
        # Release the capture on every path, including the early return above.
        cap.release()

    if not ret or frame is None:
        return None, "Error: Could not read video frame"

    # Commit the new state only after the video proved readable.
    video_path = video
    first_frame = frame.copy()

    # Convert BGR to RGB for display
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return frame_rgb, "Video loaded! Click on the image to select 2 points for the counting line."
def select_points(evt: gr.SelectData, image):
    """Register a click on the preview image and redraw the selection overlay.

    Args:
        evt: Gradio select event; ``evt.index[0]`` and ``evt.index[1]`` are
            used as the x and y coordinates of the click.
        image: The image currently shown in the preview; returned unchanged
            when the click is ignored.

    Returns:
        tuple: ``(image, status)`` - the redrawn RGB preview (or the incoming
        image when nothing changed) and a user-facing status message.
    """
    if first_frame is None:
        return image, "Please upload a video first"
    if len(points) >= 2:
        return image, "Two points already selected. Click 'Reset Points' to start over."

    # Store the click as an (x, y) tuple.
    points.append((evt.index[0], evt.index[1]))

    # Always redraw on a fresh copy so the stored frame stays unmarked.
    canvas = first_frame.copy()
    for label, (px, py) in zip(("Start", "End"), points):
        cv2.circle(canvas, (px, py), 8, (0, 255, 0), -1)
        cv2.putText(
            canvas,
            label,
            (px + 10, py),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (255, 255, 255),
            2,
        )

    both_selected = len(points) == 2
    if both_selected:
        # Connect the two points to preview the counting line.
        cv2.line(canvas, points[0], points[1], (0, 255, 0), 3)

    # The stored frame is BGR; the preview expects RGB.
    preview = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)

    message = f"Points selected: {len(points)}/2"
    if both_selected:
        message += " - Ready to process! Click 'Start Processing'"
    return preview, message
def reset_points():
    """Discard the selected points and show the unmarked first frame again.

    Returns:
        tuple: ``(image, status)`` - the stored first frame converted to RGB,
        or ``None`` when no video has been loaded, plus a status message.
    """
    global points

    points = []

    if first_frame is not None:
        # Hand back the original frame, free of any drawn markers.
        clean_rgb = cv2.cvtColor(first_frame, cv2.COLOR_BGR2RGB)
        return clean_rgb, "Points reset. Click on the image to select new points."

    return None, "No video loaded"
def process_video(progress=gr.Progress()):
    """Run the object counter over the loaded video and write an annotated copy.

    Reads the module-level ``video_path`` and ``points`` set by the other
    callbacks. Frames are read until the capture reports end-of-stream, so the
    result does not depend on the container's reported frame count, which may
    be zero or inaccurate.

    Args:
        progress: Gradio progress tracker, called once per processed frame.

    Returns:
        tuple: ``(output_path, status)``. ``output_path`` is the path of the
        annotated ``.mp4`` file, or ``None`` when validation fails, the output
        file cannot be created, or no frame could be read. ``status`` is a
        user-facing message.
    """
    if video_path is None:
        return None, "Please upload a video first"
    if len(points) != 2:
        return None, f"Please select exactly 2 points. Currently selected: {len(points)}"

    # Create region points in the format expected by ObjectCounter
    region_points = [points[0], points[1]]

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None, "Error: Could not open video file"

    video_writer = None
    output_path = None
    counter = None
    processed = 0
    completed = False
    try:
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Keep the fractional frame rate (e.g. 29.97) so playback speed is
        # preserved. `not (fps > 0)` also catches NaN; fall back to 30 then.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not (fps > 0):
            fps = 30.0

        # Only used for progress reporting; 0 means "unknown".
        total_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)

        # Reserve a unique output name and close the handle right away so the
        # video writer can open the file itself.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
            output_path = tmp.name

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        video_writer = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
        if not video_writer.isOpened():
            return None, "Error: Could not create output video file"

        counter = solutions.ObjectCounter(
            show=False,  # Don't display during processing
            region=region_points,
            model="yolo11n.pt",
            classes=[2, 3],  # car and motorcycle
            show_in=True,
            line_width=2,
        )

        progress(0, desc="Starting processing...")
        with tqdm(total=total_frames or None, desc="Processing video") as bar:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                results = counter(frame)
                video_writer.write(results.plot_im)

                processed += 1
                bar.update(1)
                if total_frames:
                    # The reported count may be too low; never exceed 100 %.
                    shown_total = max(total_frames, processed)
                    progress(
                        min(processed / total_frames, 1.0),
                        desc=f"Processing frame {processed}/{shown_total}",
                    )
                else:
                    # Total unknown: report completed steps only.
                    progress((processed, None), desc=f"Processing frame {processed}")

        completed = processed > 0
    finally:
        cap.release()
        if video_writer is not None:
            video_writer.release()
        if not completed and output_path is not None:
            # Best-effort removal of an unusable output file; a failure to
            # delete it must not mask the real outcome.
            try:
                os.remove(output_path)
            except OSError:
                pass

    if not completed:
        return None, "Error: Could not read any frames from the video"

    status = "✅ Processing complete!\n"
    status += f"Total frames processed: {processed}\n"
    # Report the counter's totals when the installed version exposes them.
    in_count = getattr(counter, "in_count", None)
    out_count = getattr(counter, "out_count", None)
    if in_count is not None and out_count is not None:
        status += f"Vehicles counted - in: {in_count}, out: {out_count}\n"
    status += f"Output saved to: {output_path}"
    return output_path, status
# Static Markdown shown in the UI.
_HEADER_MD = (
    "\n"
    "<h1><center>🚗 Vehicle Counter</center></h1>\n"
    "Upload a video, select a counting line by clicking two points, and process the video to count vehicles.\n"
)
_INSTRUCTIONS_MD = (
    "\n"
    "1. Upload a video file\n"
    "2. Click 'Load Video' to extract first frame\n"
    "3. Click on the image to select **Start** point\n"
    "4. Click again to select **End** point\n"
    "5. Click 'Start Processing' to count vehicles\n"
)

# Assemble the Gradio interface: controls on the left, frame preview and
# status on the right, processed video underneath.
with gr.Blocks(title="Vehicle Counter", theme=gr.themes.Soft()) as app:
    gr.Markdown(_HEADER_MD)

    with gr.Row():
        with gr.Column(scale=1):
            video_input = gr.Video(label="Upload Video")
            load_btn = gr.Button("Load Video", variant="primary")

            gr.Markdown("### Instructions")
            gr.Markdown(_INSTRUCTIONS_MD)

            reset_btn = gr.Button("Reset Points", variant="secondary")
            process_btn = gr.Button("Start Processing", variant="primary", size="lg")

        with gr.Column(scale=2):
            frame_display = gr.Image(label="Video Frame - Click to select points", type="numpy")
            status_text = gr.Textbox(label="Status", lines=3, interactive=False)

    gr.Markdown("---")

    with gr.Row():
        output_video = gr.Video(label="Processed Video with Counts")

    # Wire the callbacks. The three preview-related handlers all update the
    # frame preview and the status box.
    preview_outputs = [frame_display, status_text]

    load_btn.click(extract_first_frame, inputs=[video_input], outputs=preview_outputs)
    frame_display.select(select_points, inputs=[frame_display], outputs=preview_outputs)
    reset_btn.click(reset_points, inputs=[], outputs=preview_outputs)
    process_btn.click(process_video, inputs=[], outputs=[output_video, status_text])

# Start the server only when the file is run as a script.
if __name__ == "__main__":
    app.launch()