"""Gradio app: zone-based queue tracking on video/images plus LLM log analysis.

The project modules (``queue_monitor``, ``llm_analyzer``, ``utils``) are
imported optionally. When one of them is missing, the matching feature
returns an error message to the UI instead of crashing the app at import
time.
"""

import gradio as gr
import cv2
import numpy as np
import json
import os
import logging
import traceback
import tempfile
from typing import Optional, Tuple

# Configure logging BEFORE the optional imports below: a module-level
# logging call made while the root logger has no handlers installs a default
# handler implicitly, after which basicConfig() is a silent no-op and the
# INFO level requested here would never take effect.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

QUEUE_MONITOR_AVAILABLE = False
LLM_ANALYZER_AVAILABLE = False
UTILS_AVAILABLE = False

try:
    from queue_monitor import QueueMonitor
    QUEUE_MONITOR_AVAILABLE = True
except ImportError as e:
    logger.warning(f"QueueMonitor import error: {e}. Video/image processing will be disabled.")

try:
    from llm_analyzer import LogAnalyzer
    LLM_ANALYZER_AVAILABLE = True
except ImportError as e:
    logger.warning(f"LogAnalyzer import error: {e}. LLM analysis will be disabled.")

try:
    from utils import (
        is_valid_youtube_url,
        download_youtube_video,
        get_youtube_info,
        YT_DOWNLOADER_AVAILABLE
    )
    UTILS_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Utils import error: {e}. YouTube download will be disabled.")
    YT_DOWNLOADER_AVAILABLE = False

# Lazily created singletons (see initialize_monitor / initialize_analyzer).
monitor = None
analyzer = None
# Confidence threshold the cached ``monitor`` was built with.
_monitor_confidence: Optional[float] = None

# Single rectangular zone given as four (x, y) corner points in pixels.
DEFAULT_ZONE = np.array([[100, 100], [1100, 100], [1100, 600], [100, 600]])

EXAMPLE_VIDEO_URL = "https://youtu.be/5rkwqp6nnr4?si=itvwJ-oSR0S8xSZQ"
EXAMPLE_VIDEO_CACHED = False
EXAMPLE_VIDEO_PATH: Optional[str] = None


def _json_default(value):
    """``json.dumps`` fallback that converts NumPy scalars and arrays.

    Anything else is rejected with the same ``TypeError`` the encoder would
    raise on its own, so unknown objects are never silently stringified.
    """
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _remove_temp_file(path: Optional[str]) -> None:
    """Best-effort deletion of a downloaded file; failures are only logged."""
    if not path or not os.path.exists(path):
        return
    try:
        os.remove(path)
    except OSError as e:
        logger.warning(f"Could not delete temporary file {path}: {e}")


def _rgb_to_bgr(image: np.ndarray) -> np.ndarray:
    """Convert a Gradio image (RGB, RGBA or grayscale) to 3-channel BGR."""
    if image.ndim == 2:
        return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    if image.ndim == 3 and image.shape[2] == 4:
        return cv2.cvtColor(image, cv2.COLOR_RGBA2BGR)
    return cv2.cvtColor(image, cv2.COLOR_RGB2BGR)


def initialize_monitor(confidence: float = 0.3):
    """Return the shared QueueMonitor, (re)building it when needed.

    The monitor is rebuilt whenever ``confidence`` differs from the value the
    cached instance was created with; otherwise the UI sliders would have no
    effect after the first request.

    Returns:
        The monitor instance, or ``None`` when the module is unavailable or
        construction failed (the failure is logged).
    """
    global monitor, _monitor_confidence
    if not QUEUE_MONITOR_AVAILABLE:
        logger.error("QueueMonitor not available. Please check imports.")
        return None
    try:
        if monitor is None or _monitor_confidence != confidence:
            logger.info("Initializing QueueMonitor...")
            new_monitor = QueueMonitor(confidence=confidence, fps=30.0)
            new_monitor.setup_zones([DEFAULT_ZONE])
            # Publish only after setup succeeded so a half-configured
            # instance is never cached.
            monitor = new_monitor
            _monitor_confidence = confidence
            logger.info("QueueMonitor initialized successfully")
        return monitor
    except Exception as e:
        logger.error(f"Failed to initialize monitor: {e}")
        return None


def initialize_analyzer():
    """Return the shared LogAnalyzer, creating it on first use.

    The ``HF_TOKEN`` environment variable, if set, is passed to the analyzer.

    Returns:
        The analyzer instance, or ``None`` when the module is unavailable or
        construction failed (the failure is logged).
    """
    global analyzer
    if not LLM_ANALYZER_AVAILABLE:
        logger.error("LogAnalyzer not available. Please check imports.")
        return None
    try:
        if analyzer is None:
            logger.info("Initializing LogAnalyzer...")
            hf_token = os.getenv("HF_TOKEN")
            analyzer = LogAnalyzer(hf_token=hf_token)
            logger.info("LogAnalyzer initialized successfully")
        return analyzer
    except Exception as e:
        logger.error(f"Failed to initialize analyzer: {e}")
        return None


def validate_video_file(video_path: Optional[str]) -> Tuple[bool, str]:
    """Check that ``video_path`` exists and OpenCV can read it.

    Returns:
        ``(True, description)`` for a usable video, otherwise
        ``(False, reason)``.
    """
    if video_path is None:
        return False, "No video file provided"
    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}"

    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return False, "Cannot open video file. Unsupported format or corrupted file."
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
    except Exception as e:
        return False, f"Error validating video: {str(e)}"
    finally:
        # Release on every path, including the error ones.
        if cap is not None:
            cap.release()

    if frame_count <= 0:
        return False, "Video file appears to be empty"
    if fps <= 0:
        return False, "Invalid frame rate detected"
    return True, f"Valid video: {frame_count} frames, {fps:.2f} fps"


def process_video(video_path: Optional[str], confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Run zone tracking over the first ``max_frames`` frames of a video.

    Args:
        video_path: Path of the video file to read.
        confidence: Detection confidence threshold for the monitor.
        max_frames: Upper bound on the number of frames read from the file.

    Returns:
        ``(frame, stats_json, status)``. ``frame`` is the last successfully
        annotated frame in RGB and ``stats_json`` summarises the per-zone
        stats reported for that frame. On failure ``frame`` is ``None``,
        ``stats_json`` is empty and ``status`` carries the error text.
    """
    try:
        if video_path is None:
            return None, "", "Error: No video file provided"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "", "Error: QueueMonitor module not available. Please check installation."

        is_valid, validation_msg = validate_video_file(video_path)
        if not is_valid:
            return None, "", f"Validation Error: {validation_msg}"

        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "", "Error: Failed to initialize QueueMonitor"

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return None, "", "Error: Cannot open video file"

            frames_read = 0
            frames_ok = 0
            last_annotated = None
            last_stats = None
            # Bound the number of frames READ so a processor that keeps
            # failing cannot force a decode of the entire file.
            while frames_read < max_frames:
                ret, frame = cap.read()
                if not ret:
                    break
                frames_read += 1
                try:
                    annotated, stats = monitor_instance.process_frame(frame)
                except Exception as e:
                    logger.warning(f"Error processing frame {frames_read - 1}: {e}")
                    continue
                # Only the most recent result is shown, so keep just that one
                # instead of buffering every annotated frame.
                last_annotated = annotated
                last_stats = stats
                frames_ok += 1

            if frames_ok == 0:
                return None, "", "Error: No frames were successfully processed"

            summary_stats = {}
            # Indexed access is kept on purpose: only ``len()`` and integer
            # indexing of the stats container are assumed here.
            for zone_idx in range(len(last_stats)):
                zone_data = last_stats[zone_idx]
                summary_stats[f"zone_{zone_idx}"] = {
                    "current_count": zone_data.get("count", 0),
                    "avg_time_seconds": zone_data.get("avg_time_seconds", 0.0),
                    "max_time_seconds": zone_data.get("max_time_seconds", 0.0),
                    "total_visits": zone_data.get("total_visits", 0)
                }

            stats_json = json.dumps(summary_stats, indent=2, default=_json_default)
            result_frame = cv2.cvtColor(last_annotated, cv2.COLOR_BGR2RGB)
            return result_frame, stats_json, f"Successfully processed {frames_ok} frames"
        except Exception as e:
            logger.error(f"Error during video processing: {e}")
            return None, "", f"Processing Error: {str(e)}"
        finally:
            cap.release()
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return None, "", error_msg


def process_image(image: Optional[np.ndarray], confidence: float = 0.3) -> Tuple[Optional[np.ndarray], str]:
    """Run zone detection on a single image supplied by the Gradio UI.

    The UI image component is declared with ``type="numpy"`` and therefore
    delivers RGB data, while the rest of this file feeds OpenCV (BGR) frames
    to the monitor and converts its output with ``COLOR_BGR2RGB``. The input
    is converted to BGR first so the returned image keeps its true colours.

    Returns:
        ``(annotated_rgb_image, stats_json)``, or ``(None, error_text)``.
    """
    try:
        if image is None:
            return None, "Error: No image provided"
        if not isinstance(image, np.ndarray):
            return None, "Error: Invalid image format"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "Error: QueueMonitor module not available. Please check installation."

        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "Error: Failed to initialize QueueMonitor"

        try:
            annotated, stats = monitor_instance.process_frame(_rgb_to_bgr(image))
            result_image = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
            stats_json = json.dumps(stats, indent=2, default=_json_default)
            return result_image, stats_json
        except Exception as e:
            logger.error(f"Error processing image: {e}")
            return None, f"Processing Error: {str(e)}"
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        logger.error(error_msg)
        return None, error_msg


def analyze_logs(log_json: str) -> str:
    """Parse a JSON log object and return the LLM analysis text.

    Returns:
        The analyzer's output, or a string starting with ``Error:`` /
        ``Analysis Error:`` / ``Unexpected error:`` describing the failure.
    """
    try:
        if not log_json or log_json.strip() == "":
            return "Error: No log data provided"
        if not LLM_ANALYZER_AVAILABLE:
            return "Error: LogAnalyzer module not available. Please check installation."

        try:
            log_data = json.loads(log_json)
        except json.JSONDecodeError as e:
            return f"Error: Invalid JSON format - {str(e)}"
        if not isinstance(log_data, dict):
            return "Error: Log data must be a JSON object"

        analyzer_instance = initialize_analyzer()
        if analyzer_instance is None:
            return "Error: LLM analyzer failed to initialize. Please check model availability."

        try:
            return analyzer_instance.analyze_logs(log_data)
        except Exception as e:
            logger.error(f"Error during log analysis: {e}")
            return f"Analysis Error: {str(e)}"
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg


def get_sample_log() -> str:
    """Return the sample queue log (pretty-printed JSON) shown in the UI."""
    sample_log = {
        "date": "2026-01-24",
        "branch": "SBI Jabalpur",
        "avg_wait_time_sec": 420,
        "max_wait_time_sec": 980,
        "customers_served": 134,
        "counter_1_avg_service": 180,
        "counter_2_avg_service": 310,
        "peak_hour": "12:00-13:00",
        "queue_overflow_events": 5
    }
    return json.dumps(sample_log, indent=2)


def process_youtube_url(youtube_url: str, confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Download a YouTube video, run ``process_video`` on it, then delete it.

    Returns:
        The ``(frame, stats_json, status)`` tuple of ``process_video``, or
        ``(None, "", error_text)`` when validation or the download fails.
    """
    try:
        if not UTILS_AVAILABLE or not YT_DOWNLOADER_AVAILABLE:
            return None, "", "Error: YouTube download not available. Install pytube: pip install pytube"
        if not youtube_url or not youtube_url.strip():
            return None, "", "Error: No YouTube URL provided"
        if not is_valid_youtube_url(youtube_url):
            return None, "", "Error: Invalid YouTube URL format"

        logger.info(f"Downloading YouTube video: {youtube_url}")
        success, message, video_path = download_youtube_video(youtube_url)
        if not success or video_path is None:
            return None, "", f"YouTube Download Error: {message}"

        try:
            return process_video(video_path, confidence, max_frames)
        finally:
            # Runs on success and on failure alike.
            _remove_temp_file(video_path)
    except Exception as e:
        error_msg = f"Unexpected error processing YouTube video: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return None, "", error_msg


def stream_youtube_realtime(youtube_url: str, confidence: float = 0.3) -> Tuple[Optional[np.ndarray], str]:
    """Download a YouTube video and process only its first frame.

    Despite the name, nothing is streamed: the video is downloaded in full,
    its first frame is annotated, and the downloaded file is removed.

    Returns:
        ``(annotated_rgb_frame, stats_json)``, or ``(None, error_text)``.
    """
    try:
        if not UTILS_AVAILABLE or not YT_DOWNLOADER_AVAILABLE:
            return None, "Error: YouTube streaming not available. Install pytube: pip install pytube"
        if not youtube_url or not youtube_url.strip():
            return None, "Error: No YouTube URL provided"
        if not is_valid_youtube_url(youtube_url):
            return None, "Error: Invalid YouTube URL format"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "Error: QueueMonitor module not available"

        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "Error: Failed to initialize QueueMonitor"

        success, message, video_path = download_youtube_video(youtube_url)
        if not success or video_path is None:
            return None, f"YouTube Download Error: {message}"

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return None, "Error: Cannot open downloaded video"
            ret, frame = cap.read()
            if not ret:
                return None, "Error: Could not read frame from video"
            annotated, stats = monitor_instance.process_frame(frame)
            result_image = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
            stats_json = json.dumps(stats, indent=2, default=_json_default)
            return result_image, stats_json
        finally:
            # Release the handle before deleting the file it points at.
            cap.release()
            _remove_temp_file(video_path)
    except Exception as e:
        error_msg = f"Streaming error: {str(e)}"
        logger.error(error_msg)
        return None, error_msg


def download_example_video() -> Tuple[str, str]:
    """Return static information about the example video; downloads nothing.

    Returns:
        ``(info_json, status_message)``.
    """
    try:
        example_info = {
            "status": "Example video available",
            "url": EXAMPLE_VIDEO_URL,
            "note": "Click 'Preload Example Video' to download and cache",
            "supported_formats": ["mp4", "avi", "mov"],
            "example_url": EXAMPLE_VIDEO_URL
        }
        return json.dumps(example_info, indent=2), "Example information retrieved"
    except Exception as e:
        error_msg = f"Error getting example info: {str(e)}"
        logger.error(error_msg)
        return "", error_msg


def preload_example_video() -> Tuple[str, str]:
    """Download the example video once and remember its path.

    Updates the module-level ``EXAMPLE_VIDEO_CACHED`` / ``EXAMPLE_VIDEO_PATH``
    on success and reuses the existing file on later calls.

    Returns:
        ``(info_json, status_message)``. ``info_json`` always contains a
        ``"status"`` key: ``"success"``, ``"cached"`` or ``"error"``.
    """
    global EXAMPLE_VIDEO_CACHED, EXAMPLE_VIDEO_PATH
    try:
        if not UTILS_AVAILABLE or not YT_DOWNLOADER_AVAILABLE:
            error_info = {
                "status": "error",
                "url": EXAMPLE_VIDEO_URL,
                "error": "YouTube download not available. Install pytube: pip install pytube"
            }
            return json.dumps(error_info, indent=2), "Error: YouTube download not available. Install pytube: pip install pytube"

        if EXAMPLE_VIDEO_CACHED and EXAMPLE_VIDEO_PATH and os.path.exists(EXAMPLE_VIDEO_PATH):
            file_size = os.path.getsize(EXAMPLE_VIDEO_PATH) / (1024 * 1024)
            info = {
                "status": "cached",
                "url": EXAMPLE_VIDEO_URL,
                "file_path": EXAMPLE_VIDEO_PATH,
                "file_size_mb": round(file_size, 2),
                "message": "Example video already cached and ready to process"
            }
            return json.dumps(info, indent=2), f"Example video already cached ({file_size:.2f} MB). Ready to process!"

        logger.info(f"Preloading example video: {EXAMPLE_VIDEO_URL}")
        success, message, video_path = download_youtube_video(EXAMPLE_VIDEO_URL)
        if not success or video_path is None:
            error_info = {
                "status": "error",
                "url": EXAMPLE_VIDEO_URL,
                "error": message
            }
            return json.dumps(error_info, indent=2), f"Preload Error: {message}"

        EXAMPLE_VIDEO_CACHED = True
        EXAMPLE_VIDEO_PATH = video_path
        file_size = os.path.getsize(video_path) / (1024 * 1024)
        info = {
            "status": "success",
            "url": EXAMPLE_VIDEO_URL,
            "file_path": video_path,
            "file_size_mb": round(file_size, 2),
            "message": "Example video successfully preloaded"
        }
        return json.dumps(info, indent=2), f"Successfully preloaded example video ({file_size:.2f} MB). Ready to process!"
    except Exception as e:
        error_msg = f"Error preloading example video: {str(e)}"
        logger.error(error_msg)
        error_info = {
            "status": "error",
            "url": EXAMPLE_VIDEO_URL,
            "error": error_msg
        }
        return json.dumps(error_info, indent=2), error_msg


def process_example_video(confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Process the example video, downloading it first when not yet cached.

    Returns:
        The ``(frame, stats_json, status)`` tuple of ``process_video``, or
        ``(None, "", error_text)`` when the video cannot be obtained.
    """
    global EXAMPLE_VIDEO_PATH
    try:
        is_cached = (
            EXAMPLE_VIDEO_CACHED
            and EXAMPLE_VIDEO_PATH is not None
            and os.path.exists(EXAMPLE_VIDEO_PATH)
        )
        if not is_cached:
            preload_info, preload_msg = preload_example_video()
            try:
                preload_data = json.loads(preload_info)
            except json.JSONDecodeError:
                preload_data = {}
            if not isinstance(preload_data, dict):
                preload_data = {}

            msg_lower = preload_msg.lower()
            preload_failed = (
                preload_data.get("status") == "error"
                or "error" in msg_lower
                or "not available" in msg_lower
            )
            if preload_failed:
                return None, "", f"Error: {preload_msg}. Please preload the example video first."
            EXAMPLE_VIDEO_PATH = preload_data.get("file_path") or EXAMPLE_VIDEO_PATH

        if EXAMPLE_VIDEO_PATH is None or not os.path.exists(EXAMPLE_VIDEO_PATH):
            return None, "", "Error: Example video not found. Please preload it first."
        return process_video(EXAMPLE_VIDEO_PATH, confidence, max_frames)
    except Exception as e:
        error_msg = f"Error processing example video: {str(e)}"
        logger.error(error_msg)
        return None, "", error_msg


# ---------------------------------------------------------------------------
# UI definition
# ---------------------------------------------------------------------------
with gr.Blocks(title="AI Queue Management - Time in Zone Tracking", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎯 AI Queue Management System
    ## Real-time Zone Tracking with Time-in-Zone Analytics

    This application combines computer vision (YOLOv8 + Supervision) for real-time tracking and LLM analysis for business insights.
    """)

    with gr.Tab("📹 Video Processing"):
        gr.Markdown("### Upload and process CCTV footage with zone-based tracking")
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Upload Video", sources=["upload"])
                confidence_slider = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.3,
                    step=0.05,
                    label="Detection Confidence Threshold"
                )
                max_frames_slider = gr.Slider(
                    minimum=10,
                    maximum=200,
                    value=100,
                    step=10,
                    label="Max Frames to Process"
                )
                process_video_btn = gr.Button("Process Video", variant="primary")
            with gr.Column():
                video_output = gr.Image(label="Processed Frame with Zone Tracking")
                video_status = gr.Textbox(label="Status", interactive=False)
                video_stats = gr.Code(
                    label="Zone Statistics (JSON)",
                    language="json",
                    lines=10
                )
        process_video_btn.click(
            fn=process_video,
            inputs=[video_input, confidence_slider, max_frames_slider],
            outputs=[video_output, video_stats, video_status]
        )

    with gr.Tab("🎥 YouTube Processing"):
        gr.Markdown("### Process YouTube videos with real-time detection (Optional)")
        if not YT_DOWNLOADER_AVAILABLE:
            gr.Markdown("⚠️ **YouTube download not available**. Install pytube: `pip install pytube`")
        with gr.Row():
            with gr.Column():
                youtube_url_input = gr.Textbox(
                    label="YouTube URL",
                    placeholder="https://www.youtube.com/watch?v=...",
                    lines=1
                )
                yt_confidence = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.3,
                    step=0.05,
                    label="Detection Confidence Threshold"
                )
                yt_max_frames = gr.Slider(
                    minimum=10,
                    maximum=200,
                    value=100,
                    step=10,
                    label="Max Frames to Process"
                )
                with gr.Row():
                    process_yt_btn = gr.Button("Download & Process", variant="primary")
                    stream_yt_btn = gr.Button("Real-time Stream", variant="secondary")
            with gr.Column():
                yt_output = gr.Image(label="Processed Frame")
                yt_status = gr.Textbox(label="Status", interactive=False)
                yt_stats = gr.Code(
                    label="Zone Statistics (JSON)",
                    language="json",
                    lines=10
                )
        process_yt_btn.click(
            fn=process_youtube_url,
            inputs=[youtube_url_input, yt_confidence, yt_max_frames],
            outputs=[yt_output, yt_stats, yt_status]
        )
        stream_yt_btn.click(
            fn=stream_youtube_realtime,
            inputs=[youtube_url_input, yt_confidence],
            outputs=[yt_output, yt_stats]
        )

        with gr.Accordion("📥 Example Video", open=True):
            gr.Markdown(f"""
            **Example Video URL:** `{EXAMPLE_VIDEO_URL}`

            Click "Preload Example" to download and cache the example video, then use "Process Example" to analyze it.
            """)
            with gr.Row():
                preload_example_btn = gr.Button("Preload Example Video", variant="secondary")
                process_example_btn = gr.Button("Process Example Video", variant="primary")
            example_info = gr.Code(
                label="Example Information",
                language="json",
                lines=3,
                value=json.dumps({
                    "example_url": EXAMPLE_VIDEO_URL,
                    "status": "Not preloaded yet"
                }, indent=2)
            )
            preload_example_btn.click(
                fn=preload_example_video,
                outputs=[example_info, yt_status]
            )
            process_example_btn.click(
                fn=process_example_video,
                inputs=[yt_confidence, yt_max_frames],
                outputs=[yt_output, yt_stats, yt_status]
            )

    with gr.Tab("🖼️ Image Processing"):
        gr.Markdown("### Process single images with zone detection")
        with gr.Row():
            with gr.Column():
                image_input = gr.Image(label="Upload Image", type="numpy")
                image_confidence = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.3,
                    step=0.05,
                    label="Detection Confidence Threshold"
                )
                process_image_btn = gr.Button("Process Image", variant="primary")
            with gr.Column():
                image_output = gr.Image(label="Processed Image with Zone Tracking")
                image_stats = gr.Code(
                    label="Zone Statistics (JSON)",
                    language="json",
                    lines=10
                )
        process_image_btn.click(
            fn=process_image,
            inputs=[image_input, image_confidence],
            outputs=[image_output, image_stats]
        )

    with gr.Tab("🤖 AI Log Analysis"):
        gr.Markdown("### Analyze queue performance logs using AI")
        with gr.Row():
            with gr.Column():
                log_input = gr.Textbox(
                    label="Queue Log Data (JSON)",
                    value=get_sample_log(),
                    lines=15,
                    placeholder="Enter your queue log data in JSON format..."
                )
                analyze_btn = gr.Button("Generate AI Insights", variant="primary")
            with gr.Column():
                analysis_output = gr.Markdown(label="AI Recommendations & Insights")
        analyze_btn.click(
            fn=analyze_logs,
            inputs=log_input,
            outputs=analysis_output
        )

    with gr.Tab("ℹ️ About & Use Cases"):
        gr.Markdown("""
        ## 📋 System Overview

        This AI-powered queue management system provides:
        - **Real-time Object Tracking**: YOLOv8 detection with ByteTrack tracking
        - **Time-in-Zone Analytics**: Precise measurement of dwell time in defined zones
        - **AI-Powered Insights**: LLM analysis of performance logs

        ## 🎯 Use Cases

        - **Retail Analytics**: Track customer movement and dwell time in product sections
        - **Bank Branch Efficiency**: Monitor counter service times and optimize staffing
        - **Airport Security**: Predict wait times and manage security lane staffing
        - **Hospital ER**: Ensure patients are seen within target wait times
        - **Smart Parking**: Monitor parking bay occupancy and turnover rates
        - **Safety Monitoring**: Alert security if someone enters or lingers in restricted areas

        ## 🔧 Technical Details

        - **Detection Model**: YOLOv8 (Ultralytics)
        - **Tracking**: ByteTrack (Supervision)
        - **Time Tracking**: Supervision TimeInZone
        - **LLM**: Qwen-2.5-1.5B-Instruct

        ## ⚠️ Error Handling

        The application includes comprehensive error handling for:
        - Invalid video/image formats
        - Model loading failures
        - Zone configuration errors
        - JSON parsing errors
        - Processing exceptions
        """)


if __name__ == "__main__":
    port = int(os.getenv("PORT", 7860))
    demo.launch(
        server_name="0.0.0.0",
        server_port=port,
        share=False
    )