# NOTE: the hosting page's status banner ("Spaces: Sleeping") that preceded
# this file in the scraped copy is not part of the program.
"""
Smart Public Queue Traffic Analyzer & Decision Assistant
Final Year Engineering Project
Uses Computer Vision (OpenCV HOG+SVM) for Queue Analysis
"""
import io
import logging
import os
import tempfile

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.figure import Figure
from PIL import Image
class QueueAnalyzer:
    """Core queue analysis engine using OpenCV HOG + SVM.

    The ``process_*`` entry points convert OpenCV's BGR frames to RGB before
    any drawing happens, so every colour used by this class is an RGB triple.
    """

    # Default service time per person (minutes) for each supported queue type.
    DEFAULT_SERVICE_TIMES = {
        "College Office": 2,
        "Hospital": 5,
        "Railway Counter": 3,
        "Supermarket": 1,
    }
    # Minutes per person assumed when the queue type is not in the table.
    FALLBACK_SERVICE_TIME = 2

    # Decision thresholds (minutes), shared by the text and colour helpers so
    # the two can never disagree.
    SHORT_WAIT_LIMIT = 10   # strictly below this -> "Go Now"
    MODERATE_WAIT_LIMIT = 20  # up to and including this -> "Moderate Wait"

    # RGB colours (the annotated image is RGB, not OpenCV's native BGR).
    COLOR_GREEN = (0, 255, 0)
    COLOR_YELLOW = (255, 255, 0)
    COLOR_RED = (255, 0, 0)
    COLOR_WHITE = (255, 255, 255)

    def __init__(self):
        # HOG descriptor loaded with OpenCV's bundled pedestrian SVM.
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
        # Per-instance copy so callers may tweak rules without touching the
        # class-level defaults.
        self.service_times = dict(self.DEFAULT_SERVICE_TIMES)

    def detect_people(self, image, max_width=800):
        """Detect people in an image using HOG + SVM.

        Args:
            image: Image array (height x width [x channels]).
            max_width: Images wider than this are downscaled before detection
                to keep CPU cost bounded; boxes are mapped back afterwards.

        Returns:
            ``(boxes, count)`` where ``boxes`` is a list of ``(x, y, w, h)``
            tuples of plain ints in the coordinates of the *input* image and
            ``count`` is ``len(boxes)``.
        """
        width = image.shape[1]
        scale = 1.0
        if max_width and width > max_width:
            scale = max_width / width
            image = cv2.resize(image, None, fx=scale, fy=scale)

        # Parameters tuned for CPU performance.
        raw_boxes, _weights = self.hog.detectMultiScale(
            image,
            winStride=(8, 8),
            padding=(4, 4),
            scale=1.05,
            useMeanshiftGrouping=False,
        )

        # Always normalise to a list of int tuples (detectMultiScale returns
        # an ndarray, or an empty tuple when nothing is found) and undo the
        # resize so boxes line up with the original image.
        boxes = [
            tuple(int(value / scale) for value in box)
            for box in raw_boxes
        ]
        return boxes, len(boxes)

    @staticmethod
    def _ascii_only(text):
        """Strip non-ASCII characters; OpenCV's Hershey fonts cannot draw them."""
        return text.encode("ascii", "ignore").decode("ascii").strip()

    def annotate_image(self, image, boxes, count, wait_time, decision):
        """Draw bounding boxes and an information banner on a copy of ``image``.

        Args:
            image: RGB image array; it is not modified.
            boxes: Iterable of ``(x, y, w, h)`` detections.
            count: Number of people to display.
            wait_time: Estimated wait in minutes.
            decision: Decision text; emoji/non-ASCII glyphs are dropped before
                drawing because ``cv2.putText`` cannot render them.

        Returns:
            The annotated copy of ``image``.
        """
        annotated = image.copy()

        for (x, y, w, h) in boxes:
            cv2.rectangle(
                annotated,
                (int(x), int(y)),
                (int(x + w), int(y + h)),
                self.COLOR_GREEN,
                2,
            )

        # Semi-transparent black banner across the top for readable text.
        banner_height = 120
        banner = annotated.copy()
        cv2.rectangle(
            banner, (0, 0), (annotated.shape[1], banner_height), (0, 0, 0), -1
        )
        cv2.addWeighted(banner, 0.7, annotated, 0.3, 0, annotated)

        font = cv2.FONT_HERSHEY_SIMPLEX
        cv2.putText(annotated, f"People Count: {count}", (10, 30),
                    font, 1, self.COLOR_WHITE, 2)
        cv2.putText(annotated, f"Wait Time: {wait_time:.0f} min", (10, 70),
                    font, 1, self.COLOR_WHITE, 2)
        # Colour-coded decision line.
        cv2.putText(annotated, f"Decision: {self._ascii_only(decision)}",
                    (10, 110), font, 1,
                    self._get_decision_color(wait_time), 2)
        return annotated

    def _get_decision_color(self, wait_time):
        """Return the RGB colour matching the decision for ``wait_time``."""
        if wait_time < self.SHORT_WAIT_LIMIT:
            return self.COLOR_GREEN
        if wait_time <= self.MODERATE_WAIT_LIMIT:
            return self.COLOR_YELLOW
        return self.COLOR_RED

    def calculate_wait_time(self, queue_size, queue_type):
        """Return the estimated wait in minutes.

        Unknown queue types fall back to ``FALLBACK_SERVICE_TIME`` minutes
        per person.
        """
        per_person = self.service_times.get(
            queue_type, self.FALLBACK_SERVICE_TIME
        )
        return queue_size * per_person

    def make_decision(self, wait_time):
        """Return ``(decision_text, level)`` for a wait time in minutes.

        ``level`` is one of ``"success"``, ``"warning"`` or ``"error"``.
        """
        if wait_time < self.SHORT_WAIT_LIMIT:
            return "🟢 Go Now", "success"
        if wait_time <= self.MODERATE_WAIT_LIMIT:
            return "🟡 Moderate Wait", "warning"
        return "🔴 Come Later", "error"

    def process_image(self, image_path, queue_type, show_analytics):
        """Analyze a single image file.

        Returns:
            ``(annotated_image, metrics_markdown, chart)``. ``chart`` is
            ``None`` unless ``show_analytics`` is set and at least one person
            was detected. On read failure the image and chart are ``None``
            and the message describes the error.
        """
        image = cv2.imread(image_path)
        if image is None:
            return None, "Error: Could not read image", None
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        boxes, count = self.detect_people(image)
        wait_time = self.calculate_wait_time(count, queue_type)
        decision_text, _level = self.make_decision(wait_time)
        annotated = self.annotate_image(
            image, boxes, count, wait_time, decision_text
        )

        # Blank lines between entries so Markdown renders one per paragraph.
        metrics = "\n\n".join([
            "### 📊 Queue Analysis Results",
            f"**👥 People Count:** {count}",
            f"**⏱ Estimated Waiting Time:** {wait_time:.0f} minutes",
            f"**🎯 Decision:** {decision_text}",
        ])

        chart = None
        if show_analytics and count > 0:
            chart = self._create_simple_bar_chart(count)
        return annotated, metrics, chart

    def process_video(self, video_path, queue_type, show_analytics,
                      sample_interval=10):
        """Analyze a video by running detection on sampled frames.

        Args:
            video_path: Path of the video file.
            queue_type: Key into the service-time table.
            show_analytics: Whether to build the trend/statistics chart.
            sample_interval: Analyze every N-th frame (values below 1 are
                treated as 1). Larger values trade accuracy for CPU time.

        Returns:
            ``(annotated_last_sampled_frame, metrics_markdown, chart)``. The
            annotation shows the last sampled frame's own count, while the
            metrics and final decision use the average over sampled frames.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return None, "Error: Could not read video", None

        step = max(1, int(sample_interval))
        frame_counts = []
        frame_indices = []
        last_sample = None  # (rgb_frame, boxes, count) of the latest sample
        frame_idx = 0
        try:
            while True:
                ok, frame = cap.read()
                if not ok:
                    break
                if frame_idx % step == 0:
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    boxes, count = self.detect_people(frame_rgb)
                    frame_counts.append(count)
                    frame_indices.append(frame_idx)
                    last_sample = (frame_rgb, boxes, count)
                frame_idx += 1
        finally:
            # Release the decoder even if detection raises mid-stream.
            cap.release()

        if not frame_counts:
            return None, "Error: No frames could be processed", None

        # Only the final sample is shown, so annotate once after the loop
        # instead of on every sampled frame.
        frame_rgb, boxes, count = last_sample
        frame_wait = self.calculate_wait_time(count, queue_type)
        frame_decision, _ = self.make_decision(frame_wait)
        last_annotated = self.annotate_image(
            frame_rgb, boxes, count, frame_wait, frame_decision
        )

        avg_count = float(np.mean(frame_counts))
        max_count = int(max(frame_counts))

        # The overall decision is based on the average queue length.
        wait_time = self.calculate_wait_time(avg_count, queue_type)
        decision_text, _level = self.make_decision(wait_time)

        metrics = "\n\n".join([
            "### 📊 Queue Analysis Results (Video)",
            f"**👥 Average People Count:** {avg_count:.1f}",
            f"**👥 Maximum People Count:** {max_count}",
            f"**📹 Frames Analyzed:** {len(frame_counts)}",
            f"**⏱ Estimated Waiting Time:** {wait_time:.0f} minutes",
            f"**🎯 Decision:** {decision_text}",
        ])

        chart = None
        if show_analytics:
            chart = self._create_video_analytics(
                frame_indices, frame_counts, avg_count, max_count
            )
        return last_annotated, metrics, chart

    @staticmethod
    def _figure_to_image(fig):
        """Render a Matplotlib figure to an in-memory PNG and open it with PIL.

        The figure is built with ``Figure()`` directly and is never
        registered with pyplot, so no ``plt.close`` is needed; it is freed
        by normal garbage collection.
        """
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight', dpi=100)
        buf.seek(0)
        return Image.open(buf)

    def _create_simple_bar_chart(self, count):
        """Create a single-bar chart of the detected people count."""
        fig = Figure(figsize=(8, 4))
        ax = fig.add_subplot(111)
        ax.bar(['Detected People'], [count], color='#2196F3', width=0.4)
        ax.set_ylabel('Count', fontsize=12)
        ax.set_title('People Detection Result', fontsize=14, fontweight='bold')
        ax.grid(axis='y', alpha=0.3)
        return self._figure_to_image(fig)

    def _create_video_analytics(self, frame_indices, frame_counts,
                                avg_count, max_count):
        """Create the two-panel chart for video analysis.

        Left panel: people count per sampled frame with the average as a
        dashed line. Right panel: average vs. maximum as labelled bars.
        """
        fig = Figure(figsize=(14, 5))

        # Line chart: people count over frames.
        ax1 = fig.add_subplot(121)
        ax1.plot(frame_indices, frame_counts, marker='o', linewidth=2,
                 markersize=4, color='#2196F3', label='Detected People')
        ax1.axhline(y=avg_count, color='#FF9800', linestyle='--',
                    linewidth=2, label=f'Average: {avg_count:.1f}')
        ax1.set_xlabel('Frame Index', fontsize=11)
        ax1.set_ylabel('People Count', fontsize=11)
        ax1.set_title('People Count Over Time', fontsize=13, fontweight='bold')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Bar chart: summary statistics.
        ax2 = fig.add_subplot(122)
        labels = ['Average', 'Maximum']
        values = [avg_count, max_count]
        bars = ax2.bar(labels, values, color=['#4CAF50', '#F44336'], width=0.5)
        ax2.set_ylabel('People Count', fontsize=11)
        ax2.set_title('Queue Statistics', fontsize=13, fontweight='bold')
        ax2.grid(axis='y', alpha=0.3)

        # Value label centred on top of each bar.
        for bar, value in zip(bars, values):
            ax2.text(bar.get_x() + bar.get_width() / 2., bar.get_height(),
                     f'{value:.1f}',
                     ha='center', va='bottom', fontsize=10, fontweight='bold')

        fig.tight_layout()
        return self._figure_to_image(fig)
def analyze_queue(file, queue_type, show_analytics):
    """Main analysis function called by Gradio.

    Args:
        file: The uploaded file. With ``gr.File(type="filepath")`` this is a
            plain path string; objects exposing a ``.name`` path attribute
            (older Gradio temp-file wrappers) are accepted as well.
        queue_type: Queue type label used to look up the service time.
        show_analytics: Whether to generate the analytics chart.

    Returns:
        ``(annotated_image, metrics_markdown, chart)``. On any failure the
        image and chart are ``None`` and the message explains the problem.
    """
    if file is None:
        return None, "⚠️ Please upload an image or video file.", None

    # Normalise the upload to a filesystem path (str path or .name wrapper).
    if isinstance(file, (str, os.PathLike)):
        path = os.fspath(file)
    else:
        path = file.name
    file_ext = os.path.splitext(path)[1].lower()

    image_exts = ('.jpg', '.jpeg', '.png', '.bmp', '.webp')
    video_exts = ('.mp4', '.avi', '.mov', '.mkv', '.webm')

    # Reject unsupported files before paying for detector construction.
    if file_ext not in image_exts and file_ext not in video_exts:
        return None, "❌ Unsupported file format. Please upload an image or video.", None

    try:
        analyzer = QueueAnalyzer()
        if file_ext in image_exts:
            return analyzer.process_image(path, queue_type, show_analytics)
        return analyzer.process_video(path, queue_type, show_analytics)
    except Exception as e:
        # UI boundary: report to the user, but keep the traceback in the logs.
        logging.getLogger(__name__).exception(
            "Queue analysis failed for %s", path
        )
        return None, f"❌ Error processing file: {e}", None
# Build Gradio Interface
def create_interface():
    """Create the Gradio Blocks interface for the queue analyzer.

    Layout: a header, an input column (file upload, queue type, analytics
    toggle, analyze button, usage notes), an output column (annotated image,
    metrics Markdown, analytics chart) and a footer. The analyze button is
    wired to ``analyze_queue``.

    Returns:
        The configured ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="Smart Queue Analyzer") as app:
        # Header
        gr.Markdown("""
        # 🎯 Smart Public Queue Traffic Analyzer
        ### AI-Powered Decision Assistant Using Computer Vision

        Upload an image or video of a public queue to get instant analysis and recommendations.
        """)

        with gr.Row():
            # Input Section
            with gr.Column(scale=1):
                gr.Markdown("### 📤 Input")
                # type="filepath" means the callback receives a path string.
                file_input = gr.File(
                    label="Upload Image or Video",
                    file_types=["image", "video"],
                    type="filepath",
                )
                queue_type = gr.Dropdown(
                    choices=["College Office", "Hospital", "Railway Counter", "Supermarket"],
                    value="College Office",
                    label="Queue Type",
                    info="Select the type of queue for accurate wait time estimation",
                )
                show_analytics = gr.Checkbox(
                    label="Show Analytics Charts",
                    value=True,
                    info="Display detailed visualization (for video: trend analysis)",
                )
                analyze_btn = gr.Button(
                    "🔍 Analyze Queue",
                    variant="primary",
                    size="lg",
                )
                gr.Markdown("""
                ---

                **Supported Formats:**

                - Images: JPG, PNG, BMP, WEBP
                - Videos: MP4, AVI, MOV, MKV, WEBM

                **Decision Guide:**

                - 🟢 **Go Now**: < 10 min wait
                - 🟡 **Moderate Wait**: 10-20 min
                - 🔴 **Come Later**: > 20 min
                """)

            # Output Section
            with gr.Column(scale=2):
                gr.Markdown("### 📊 Analysis Results")
                output_image = gr.Image(
                    label="Annotated Output",
                    type="numpy",
                    height=400,
                )
                output_metrics = gr.Markdown(
                    value="*Analysis results will appear here*"
                )
                output_chart = gr.Image(
                    label="Analytics Visualization",
                    type="pil",
                    visible=True,
                )

        # Footer
        gr.Markdown("""
        ---

        **Technology Stack:** OpenCV HOG+SVM | Gradio | Python

        **Project Type:** Final Year Engineering Project

        **Detection Method:** Histogram of Oriented Gradients (HOG) with SVM Classifier

        **Deployment:** Optimized for CPU-only environments (Hugging Face Spaces compatible)
        """)

        # Event Handler
        analyze_btn.click(
            fn=analyze_queue,
            inputs=[file_input, queue_type, show_analytics],
            outputs=[output_image, output_metrics, output_chart],
        )

    return app
# Launch Application
if __name__ == "__main__":
    # Build the UI and start the Gradio server only when this file is run
    # as a script, never as a side effect of importing it.
    app = create_interface()
    app.launch()