File size: 14,141 Bytes
d6afa35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
"""
Smart Public Queue Traffic Analyzer & Decision Assistant
Final Year Engineering Project
Uses Computer Vision (OpenCV HOG+SVM) for Queue Analysis
"""

import gradio as gr
import cv2
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from PIL import Image
import io
import tempfile
import os


class QueueAnalyzer:
    """Core queue analysis engine using OpenCV HOG + SVM"""
    
    def __init__(self):
        """Set up the people detector and the per-queue service-time table."""
        # Stock OpenCV HOG descriptor loaded with its default people-detector SVM
        people_hog = cv2.HOGDescriptor()
        people_hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
        self.hog = people_hog

        # Minutes of service assumed per person, keyed by queue type
        self.service_times = dict([
            ("College Office", 2),
            ("Hospital", 5),
            ("Railway Counter", 3),
            ("Supermarket", 1),
        ])
    
    def detect_people(self, image):
        """
        Detect people in image using HOG+SVM
        Returns: list of bounding boxes and count
        """
        # Resize for better performance on CPU
        height, width = image.shape[:2]
        scale = 1.0
        if width > 800:
            scale = 800 / width
            image = cv2.resize(image, None, fx=scale, fy=scale)
        
        # Detect people
        # Parameters tuned for CPU performance
        boxes, weights = self.hog.detectMultiScale(
            image,
            winStride=(8, 8),
            padding=(4, 4),
            scale=1.05,
            useMeanshiftGrouping=False
        )
        
        # Scale boxes back if image was resized
        if scale != 1.0:
            boxes = [[int(x/scale), int(y/scale), int(w/scale), int(h/scale)] 
                     for x, y, w, h in boxes]
        
        return boxes, len(boxes)
    
    def annotate_image(self, image, boxes, count, wait_time, decision):
        """Draw bounding boxes and a summary banner on a copy of ``image``.

        Args:
            image: Frame to annotate; it is copied, not modified.
            boxes: Iterable of ``(x, y, w, h)`` detection rectangles.
            count: People count printed in the banner.
            wait_time: Estimated wait in minutes; also selects the colour of
                the decision line.
            decision: Decision label. Non-ASCII characters (such as an emoji
                prefix) are removed before drawing.

        Returns:
            The annotated copy of ``image``.
        """
        annotated = image.copy()

        # Draw bounding boxes
        for (x, y, w, h) in boxes:
            cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Darken a strip across the top so the text stays readable
        overlay_height = 120
        overlay = annotated.copy()
        cv2.rectangle(overlay, (0, 0), (annotated.shape[1], overlay_height),
                      (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.7, annotated, 0.3, 0, annotated)

        # cv2.putText replaces symbols its font cannot render with question
        # marks, so keep only the ASCII part of the label.
        decision_label = str(decision).encode("ascii", "ignore").decode("ascii").strip()

        # Add text information
        font = cv2.FONT_HERSHEY_SIMPLEX
        cv2.putText(annotated, f"People Count: {count}", (10, 30),
                    font, 1, (255, 255, 255), 2)
        cv2.putText(annotated, f"Wait Time: {wait_time:.0f} min", (10, 70),
                    font, 1, (255, 255, 255), 2)

        # Color-coded decision
        decision_color = self._get_decision_color(wait_time)
        cv2.putText(annotated, f"Decision: {decision_label}", (10, 110),
                    font, 1, decision_color, 2)

        return annotated
    
    def _get_decision_color(self, wait_time):
        """Get color for decision based on wait time"""
        if wait_time < 10:
            return (0, 255, 0)  # Green
        elif wait_time <= 20:
            return (0, 255, 255)  # Yellow
        else:
            return (0, 0, 255)  # Red
    
    def calculate_wait_time(self, queue_size, queue_type):
        """Calculate estimated waiting time"""
        service_time = self.service_times.get(queue_type, 2)
        return queue_size * service_time
    
    def make_decision(self, wait_time):
        """Generate decision recommendation"""
        if wait_time < 10:
            return "🟒 Go Now", "success"
        elif wait_time <= 20:
            return "🟑 Moderate Wait", "warning"
        else:
            return "πŸ”΄ Come Later", "error"
    
    def process_image(self, image_path, queue_type, show_analytics):
        """Process single image"""
        # Read image
        image = cv2.imread(image_path)
        if image is None:
            return None, "Error: Could not read image", None
        
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        # Detect people
        boxes, count = self.detect_people(image)
        
        # Calculate metrics
        wait_time = self.calculate_wait_time(count, queue_type)
        decision_text, decision_type = self.make_decision(wait_time)
        
        # Annotate image
        annotated = self.annotate_image(image, boxes, count, wait_time, decision_text)
        
        # Prepare metrics
        metrics = f"""
### πŸ“Š Queue Analysis Results

**πŸ‘₯ People Count:** {count}

**⏱ Estimated Waiting Time:** {wait_time:.0f} minutes

**🎯 Decision:** {decision_text}
"""
        
        # Generate analytics chart if requested
        chart = None
        if show_analytics and count > 0:
            chart = self._create_simple_bar_chart(count)
        
        return annotated, metrics, chart
    
    def process_video(self, video_path, queue_type, show_analytics):
        """Process video by sampling frames"""
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return None, "Error: Could not read video", None
        
        frame_counts = []
        frame_indices = []
        last_annotated = None
        
        frame_idx = 0
        sample_interval = 10  # Process every 10th frame for CPU efficiency
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            # Sample frames
            if frame_idx % sample_interval == 0:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                boxes, count = self.detect_people(frame_rgb)
                
                frame_counts.append(count)
                frame_indices.append(frame_idx)
                
                # Keep last frame for annotation
                if len(frame_counts) > 0:
                    wait_time = self.calculate_wait_time(count, queue_type)
                    decision_text, _ = self.make_decision(wait_time)
                    last_annotated = self.annotate_image(frame_rgb, boxes, count, 
                                                         wait_time, decision_text)
            
            frame_idx += 1
        
        cap.release()
        
        if len(frame_counts) == 0:
            return None, "Error: No frames could be processed", None
        
        # Calculate statistics
        avg_count = np.mean(frame_counts)
        max_count = np.max(frame_counts)
        
        # Use average count for decision
        wait_time = self.calculate_wait_time(avg_count, queue_type)
        decision_text, decision_type = self.make_decision(wait_time)
        
        # Prepare metrics
        metrics = f"""
### πŸ“Š Queue Analysis Results (Video)

**πŸ‘₯ Average People Count:** {avg_count:.1f}

**πŸ‘₯ Maximum People Count:** {max_count}

**πŸ“Ή Frames Analyzed:** {len(frame_counts)}

**⏱ Estimated Waiting Time:** {wait_time:.0f} minutes

**🎯 Decision:** {decision_text}
"""
        
        # Generate analytics charts
        chart = None
        if show_analytics:
            chart = self._create_video_analytics(frame_indices, frame_counts, 
                                                 avg_count, max_count)
        
        return last_annotated, metrics, chart
    
    def _create_simple_bar_chart(self, count):
        """Render a single-bar chart of the detected count as a PIL image."""
        figure = Figure(figsize=(8, 4))
        axes = figure.add_subplot(1, 1, 1)

        axes.bar(['Detected People'], [count], color='#2196F3', width=0.4)
        axes.set_title('People Detection Result', fontsize=14, fontweight='bold')
        axes.set_ylabel('Count', fontsize=12)
        axes.grid(axis='y', alpha=0.3)

        # Render to an in-memory PNG and hand it back as a PIL image
        png_buffer = io.BytesIO()
        figure.savefig(png_buffer, format='png', bbox_inches='tight', dpi=100)
        png_buffer.seek(0)
        chart_image = Image.open(png_buffer)
        plt.close(figure)

        return chart_image
    
    def _create_video_analytics(self, frame_indices, frame_counts, avg_count, max_count):
        """Render the video charts (count trend and summary bars) as a PIL image."""
        figure = Figure(figsize=(14, 5))

        # Left panel: detected count per sampled frame, with the average marked
        trend_ax = figure.add_subplot(1, 2, 1)
        trend_ax.plot(frame_indices, frame_counts, marker='o', linewidth=2,
                      markersize=4, color='#2196F3', label='Detected People')
        trend_ax.axhline(y=avg_count, color='#FF9800', linestyle='--',
                         linewidth=2, label=f'Average: {avg_count:.1f}')
        trend_ax.set_xlabel('Frame Index', fontsize=11)
        trend_ax.set_ylabel('People Count', fontsize=11)
        trend_ax.set_title('People Count Over Time', fontsize=13, fontweight='bold')
        trend_ax.legend()
        trend_ax.grid(True, alpha=0.3)

        # Right panel: average vs. maximum
        stats_ax = figure.add_subplot(1, 2, 2)
        stat_names = ['Average', 'Maximum']
        stat_values = [avg_count, max_count]
        stat_colors = ['#4CAF50', '#F44336']

        stat_bars = stats_ax.bar(stat_names, stat_values, color=stat_colors, width=0.5)
        stats_ax.set_ylabel('People Count', fontsize=11)
        stats_ax.set_title('Queue Statistics', fontsize=13, fontweight='bold')
        stats_ax.grid(axis='y', alpha=0.3)

        # Print each value just above its bar
        for rect, stat in zip(stat_bars, stat_values):
            label_x = rect.get_x() + rect.get_width()/2.
            stats_ax.text(label_x, rect.get_height(),
                          f'{stat:.1f}',
                          ha='center', va='bottom', fontsize=10, fontweight='bold')

        figure.tight_layout()

        # Render to an in-memory PNG and hand it back as a PIL image
        png_buffer = io.BytesIO()
        figure.savefig(png_buffer, format='png', bbox_inches='tight', dpi=100)
        png_buffer.seek(0)
        chart_image = Image.open(png_buffer)
        plt.close(figure)

        return chart_image


def analyze_queue(file, queue_type, show_analytics):
    """Main analysis function called by Gradio.

    Args:
        file: The uploaded file, either a path (``str`` / ``os.PathLike``) or
            an object exposing the path as ``.name``; ``None`` when nothing
            was uploaded.
        queue_type: Queue type name used for the wait-time estimate.
        show_analytics: Whether to produce the analytics chart.

    Returns:
        ``(annotated_image, metrics_markdown, chart)``. On any problem the
        first and last items are ``None`` and the middle item is a message.
    """
    if file is None:
        return None, "⚠️ Please upload an image or video file.", None

    # The upload component is configured with type="filepath", which passes a
    # plain path string; file wrappers carry the path in ``.name``. Accept both.
    if isinstance(file, (str, os.PathLike)):
        file_path = os.fspath(file)
    else:
        file_path = file.name

    # Determine file type
    file_ext = os.path.splitext(file_path)[1].lower()
    is_image = file_ext in ('.jpg', '.jpeg', '.png', '.bmp', '.webp')
    is_video = file_ext in ('.mp4', '.avi', '.mov', '.mkv', '.webm')

    if not (is_image or is_video):
        return None, "❌ Unsupported file format. Please upload an image or video.", None

    try:
        analyzer = QueueAnalyzer()
        if is_image:
            # Process as image
            return analyzer.process_image(file_path, queue_type, show_analytics)

        # Process as video
        return analyzer.process_video(file_path, queue_type, show_analytics)

    except Exception as e:
        # UI boundary: report the failure in the metrics panel instead of raising
        return None, f"❌ Error processing file: {str(e)}", None


# Build Gradio Interface
def create_interface():
    """Create the Gradio Blocks interface.

    Lays out an input column (file upload, queue type, analytics toggle,
    analyze button) next to an output column (annotated image, metrics
    Markdown, chart image) and wires the button to ``analyze_queue``.

    Returns:
        The constructed ``gr.Blocks`` app (not yet launched).
    """

    with gr.Blocks(theme=gr.themes.Soft(), title="Smart Queue Analyzer") as app:

        # Header
        gr.Markdown("""
        # 🎯 Smart Public Queue Traffic Analyzer
        ### AI-Powered Decision Assistant Using Computer Vision
        
        Upload an image or video of a public queue to get instant analysis and recommendations.
        """)

        with gr.Row():
            # Input Section
            with gr.Column(scale=1):
                gr.Markdown("### 📤 Input")

                file_input = gr.File(
                    label="Upload Image or Video",
                    file_types=["image", "video"],
                    type="filepath"
                )

                queue_type = gr.Dropdown(
                    choices=["College Office", "Hospital", "Railway Counter", "Supermarket"],
                    value="College Office",
                    label="Queue Type",
                    info="Select the type of queue for accurate wait time estimation"
                )

                show_analytics = gr.Checkbox(
                    label="Show Analytics Charts",
                    value=True,
                    info="Display detailed visualization (for video: trend analysis)"
                )

                analyze_btn = gr.Button(
                    "🔍 Analyze Queue",
                    variant="primary",
                    size="lg"
                )

                gr.Markdown("""
                ---
                **Supported Formats:**
                - Images: JPG, PNG, BMP, WEBP
                - Videos: MP4, AVI, MOV, MKV, WEBM
                
                **Decision Guide:**
                - 🟢 **Go Now**: < 10 min wait
                - 🟡 **Moderate Wait**: 10-20 min
                - 🔴 **Come Later**: > 20 min
                """)

            # Output Section
            with gr.Column(scale=2):
                gr.Markdown("### 📊 Analysis Results")

                output_image = gr.Image(
                    label="Annotated Output",
                    type="numpy",
                    height=400
                )

                output_metrics = gr.Markdown(
                    value="*Analysis results will appear here*"
                )

                output_chart = gr.Image(
                    label="Analytics Visualization",
                    type="pil",
                    visible=True
                )

        # Footer
        gr.Markdown("""
        ---
        **Technology Stack:** OpenCV HOG+SVM | Gradio | Python  
        **Project Type:** Final Year Engineering Project  
        **Detection Method:** Histogram of Oriented Gradients (HOG) with SVM Classifier  
        **Deployment:** Optimized for CPU-only environments (Hugging Face Spaces compatible)
        """)

        # Event Handler: the three outputs match analyze_queue's return tuple
        analyze_btn.click(
            fn=analyze_queue,
            inputs=[file_input, queue_type, show_analytics],
            outputs=[output_image, output_metrics, output_chart]
        )

    return app


# Launch Application
if __name__ == "__main__":
    # Build the Blocks UI and start the Gradio server only when this file is
    # executed as a script, not when it is imported.
    app = create_interface()
    app.launch()