Spaces:
Running
Running
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| import json | |
| import os | |
| import logging | |
| import traceback | |
| import tempfile | |
| from typing import Optional, Tuple | |
# Configure logging BEFORE the optional imports below.  A module-level
# logging call (such as logging.warning) on an unconfigured root logger
# installs a default handler implicitly; a later basicConfig() call would then
# do nothing and INFO messages would be lost whenever an import failed.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Feature flags: each one flips to True only if its optional module imports.
QUEUE_MONITOR_AVAILABLE = False
LLM_ANALYZER_AVAILABLE = False
UTILS_AVAILABLE = False

try:
    from queue_monitor import QueueMonitor
    QUEUE_MONITOR_AVAILABLE = True
except ImportError as e:
    logger.warning(f"QueueMonitor import error: {e}. Video/image processing will be disabled.")

try:
    from llm_analyzer import LogAnalyzer
    LLM_ANALYZER_AVAILABLE = True
except ImportError as e:
    logger.warning(f"LogAnalyzer import error: {e}. LLM analysis will be disabled.")

try:
    from utils import (
        is_valid_youtube_url,
        download_youtube_video,
        get_youtube_info,
        YT_DOWNLOADER_AVAILABLE,
    )
    UTILS_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Utils import error: {e}. YouTube download will be disabled.")
    # Keep the name defined so later checks do not raise NameError.
    YT_DOWNLOADER_AVAILABLE = False

# Lazily created singletons; see initialize_monitor / initialize_analyzer.
monitor = None
analyzer = None

# Four-point polygon handed to QueueMonitor.setup_zones.
# NOTE(review): presumably pixel coordinates of a rectangle sized for roughly
# 1280x720 frames -- confirm against QueueMonitor.
DEFAULT_ZONE = np.array([[100, 100], [1100, 100], [1100, 600], [100, 600]])

# Example clip and its download cache state (updated by preload_example_video).
EXAMPLE_VIDEO_URL = "https://youtu.be/5rkwqp6nnr4?si=itvwJ-oSR0S8xSZQ"
EXAMPLE_VIDEO_CACHED = False
EXAMPLE_VIDEO_PATH: Optional[str] = None
def initialize_monitor(confidence: float = 0.3):
    """Return the shared QueueMonitor, building it when needed.

    A new monitor is built on the first call and again whenever ``confidence``
    differs from the value the cached monitor was built with, so the
    threshold chosen by the caller is always the one in effect.  (Previously
    the argument was ignored after the first call.)

    Args:
        confidence: Detection confidence threshold passed to ``QueueMonitor``.

    Returns:
        The configured monitor, or ``None`` if the QueueMonitor module is
        unavailable or construction/zone setup failed.
    """
    global monitor
    if not QUEUE_MONITOR_AVAILABLE:
        logger.error("QueueMonitor not available. Please check imports.")
        return None
    try:
        # The confidence of the cached instance is remembered on the function
        # object so no extra module-level global is required.
        cached_confidence = getattr(initialize_monitor, "_confidence", None)
        if monitor is None or cached_confidence != confidence:
            logger.info("Initializing QueueMonitor...")
            candidate = QueueMonitor(confidence=confidence, fps=30.0)
            candidate.setup_zones([DEFAULT_ZONE])
            # Publish only after zone setup succeeded, so a half-configured
            # monitor is never cached and handed out by later calls.
            monitor = candidate
            initialize_monitor._confidence = confidence
            logger.info("QueueMonitor initialized successfully")
        return monitor
    except Exception as e:
        logger.error(f"Failed to initialize monitor: {e}")
        return None
def initialize_analyzer():
    """Return the shared LogAnalyzer, constructing it on first use.

    The Hugging Face token is read from the ``HF_TOKEN`` environment variable
    and forwarded to ``LogAnalyzer``.

    Returns:
        The analyzer instance, or ``None`` when the LogAnalyzer module is not
        importable or construction raised.
    """
    global analyzer
    if LLM_ANALYZER_AVAILABLE:
        try:
            if analyzer is None:
                logger.info("Initializing LogAnalyzer...")
                analyzer = LogAnalyzer(hf_token=os.getenv("HF_TOKEN"))
                logger.info("LogAnalyzer initialized successfully")
            return analyzer
        except Exception as exc:
            logger.error(f"Failed to initialize analyzer: {exc}")
            return None
    logger.error("LogAnalyzer not available. Please check imports.")
    return None
def validate_video_file(video_path: Optional[str]) -> Tuple[bool, str]:
    """Check that ``video_path`` points at a video OpenCV can open.

    Args:
        video_path: Filesystem path of the video, or ``None``/empty.

    Returns:
        ``(True, description)`` when the file opens and reports a positive
        frame count and frame rate, otherwise ``(False, reason)``.
    """
    if not video_path:
        return False, "No video file provided"
    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}"

    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return False, "Cannot open video file. Unsupported format or corrupted file."
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
    except Exception as e:
        return False, f"Error validating video: {str(e)}"
    finally:
        # Release on every path, including "not opened" and exceptions.
        if cap is not None:
            cap.release()

    if frame_count <= 0:
        return False, "Video file appears to be empty"
    # "not fps > 0" also rejects NaN, which "fps <= 0" would let through.
    if not fps > 0:
        return False, "Invalid frame rate detected"
    return True, f"Valid video: {frame_count} frames, {fps:.2f} fps"
def _summarize_zone_stats(frame_stats) -> dict:
    """Reduce one frame's per-zone statistics to the fields shown in the UI."""
    summary = {}
    # Positional indexing is kept deliberately: only len() and [i] are known
    # to be supported by the container that process_frame returns.
    for zone_idx in range(len(frame_stats)):
        zone_data = frame_stats[zone_idx]
        summary[f"zone_{zone_idx}"] = {
            "current_count": zone_data.get("count", 0),
            "avg_time_seconds": zone_data.get("avg_time_seconds", 0.0),
            "max_time_seconds": zone_data.get("max_time_seconds", 0.0),
            "total_visits": zone_data.get("total_visits", 0),
        }
    return summary


def process_video(video_path: Optional[str], confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Run zone tracking over the start of a video and report the result.

    At most ``max_frames`` frames are read.  Frames whose processing raises
    are logged and skipped.  The returned image and statistics come from the
    most recent successfully processed frame, so they reflect everything the
    monitor saw; only that one frame is kept in memory.

    Args:
        video_path: Path of the video file, or ``None``.
        confidence: Detection confidence threshold for the monitor.
        max_frames: Upper bound on the number of frames read from the video.

    Returns:
        ``(rgb_frame, stats_json, status)``.  On any failure the frame is
        ``None``, the JSON is empty and ``status`` carries the error text.
    """
    try:
        if video_path is None:
            return None, "", "Error: No video file provided"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "", "Error: QueueMonitor module not available. Please check installation."
        is_valid, validation_msg = validate_video_file(video_path)
        if not is_valid:
            return None, "", f"Validation Error: {validation_msg}"
        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "", "Error: Failed to initialize QueueMonitor"

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return None, "", "Error: Cannot open video file"

            latest_frame = None
            latest_stats = None
            processed = 0
            frames_read = 0
            while frames_read < max_frames:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_idx = frames_read
                # Count every frame read, so failing frames cannot push the
                # amount of work past max_frames.
                frames_read += 1
                try:
                    annotated, stats = monitor_instance.process_frame(frame)
                    rgb_frame = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
                except Exception as e:
                    logger.warning(f"Error processing frame {frame_idx}: {e}")
                    continue
                latest_frame, latest_stats = rgb_frame, stats
                processed += 1

            if processed == 0:
                return None, "", "Error: No frames were successfully processed"

            stats_json = json.dumps(_summarize_zone_stats(latest_stats), indent=2)
            return latest_frame, stats_json, f"Successfully processed {processed} frames"
        except Exception as e:
            logger.error(f"Error during video processing: {e}")
            return None, "", f"Processing Error: {str(e)}"
        finally:
            cap.release()
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return None, "", error_msg
def process_image(image: Optional[np.ndarray], confidence: float = 0.3) -> Tuple[Optional[np.ndarray], str]:
    """Run zone detection on a single image.

    The UI supplies the image as an RGB array, while the rest of this file
    feeds the monitor BGR frames from OpenCV and converts its output with
    BGR->RGB.  The input is therefore converted to BGR first; without that the
    red and blue channels of the returned image would be swapped.

    Args:
        image: Image array from the UI (RGB, RGBA or single-channel).
        confidence: Detection confidence threshold for the monitor.

    Returns:
        ``(annotated_rgb_image, stats_json)`` on success, otherwise
        ``(None, error_message)``.
    """
    try:
        if image is None:
            return None, "Error: No image provided"
        if not isinstance(image, np.ndarray):
            return None, "Error: Invalid image format"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "Error: QueueMonitor module not available. Please check installation."
        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "Error: Failed to initialize QueueMonitor"
        try:
            # Bring the input into the BGR layout used for video frames.
            if image.ndim == 3 and image.shape[2] == 3:
                frame = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            elif image.ndim == 3 and image.shape[2] == 4:
                frame = cv2.cvtColor(image, cv2.COLOR_RGBA2BGR)
            elif image.ndim == 2:
                frame = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
            else:
                return None, "Error: Invalid image format"

            annotated, stats = monitor_instance.process_frame(frame)
            result_image = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
            stats_json = json.dumps(stats, indent=2)
            return result_image, stats_json
        except Exception as e:
            logger.error(f"Error processing image: {e}")
            return None, f"Processing Error: {str(e)}"
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        logger.error(error_msg)
        return None, error_msg
def analyze_logs(log_json: str) -> str:
    """Parse a JSON log document and return the analyzer's report on it.

    Args:
        log_json: Text of a JSON object describing queue performance.

    Returns:
        The analysis produced by the LogAnalyzer, or an ``Error: ...`` /
        ``Analysis Error: ...`` message describing why none was produced.
    """
    try:
        # Guard clauses, cheapest first.
        if not log_json or not log_json.strip():
            return "Error: No log data provided"
        if not LLM_ANALYZER_AVAILABLE:
            return "Error: LogAnalyzer module not available. Please check installation."

        try:
            parsed = json.loads(log_json)
        except json.JSONDecodeError as decode_err:
            return f"Error: Invalid JSON format - {str(decode_err)}"
        if not isinstance(parsed, dict):
            return "Error: Log data must be a JSON object"

        llm = initialize_analyzer()
        if llm is None:
            return "Error: LLM analyzer failed to initialize. Please check model availability."

        try:
            return llm.analyze_logs(parsed)
        except Exception as analysis_err:
            logger.error(f"Error during log analysis: {analysis_err}")
            return f"Analysis Error: {str(analysis_err)}"
    except Exception as unexpected:
        failure = f"Unexpected error: {str(unexpected)}\n{traceback.format_exc()}"
        logger.error(failure)
        return failure
def get_sample_log() -> str:
    """Return a fixed example queue log as 2-space-indented JSON text."""
    return json.dumps(
        dict(
            date="2026-01-24",
            branch="SBI Jabalpur",
            avg_wait_time_sec=420,
            max_wait_time_sec=980,
            customers_served=134,
            counter_1_avg_service=180,
            counter_2_avg_service=310,
            peak_hour="12:00-13:00",
            queue_overflow_events=5,
        ),
        indent=2,
    )
def process_youtube_url(youtube_url: str, confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Download a YouTube video, run ``process_video`` on it, then delete it.

    Args:
        youtube_url: URL of the video; surrounding whitespace is ignored.
        confidence: Detection confidence threshold for the monitor.
        max_frames: Forwarded to ``process_video``.

    Returns:
        The ``(frame, stats_json, status)`` tuple from ``process_video``, or
        ``(None, "", error_message)`` if the download step fails.
    """
    try:
        if not UTILS_AVAILABLE or not YT_DOWNLOADER_AVAILABLE:
            return None, "", "Error: YouTube download not available. Install pytube: pip install pytube"
        if not youtube_url or not youtube_url.strip():
            return None, "", "Error: No YouTube URL provided"
        # Validate and download the trimmed URL so pasted whitespace is harmless.
        youtube_url = youtube_url.strip()
        if not is_valid_youtube_url(youtube_url):
            return None, "", "Error: Invalid YouTube URL format"

        logger.info(f"Downloading YouTube video: {youtube_url}")
        success, message, video_path = download_youtube_video(youtube_url)
        if not success or video_path is None:
            return None, "", f"YouTube Download Error: {message}"

        try:
            return process_video(video_path, confidence, max_frames)
        finally:
            # Single cleanup path for both success and failure.
            try:
                os.remove(video_path)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.warning(f"Could not delete temporary file {video_path}: {e}")
    except Exception as e:
        error_msg = f"Unexpected error processing YouTube video: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return None, "", error_msg
def stream_youtube_realtime(youtube_url: str, confidence: float = 0.3) -> Tuple[Optional[np.ndarray], str]:
    """Download a YouTube video and analyse its first frame.

    Despite the name, the video is downloaded in full and only the first
    frame is read and processed.  The capture handle and the downloaded file
    are released on every exit path.

    Args:
        youtube_url: URL of the video; surrounding whitespace is ignored.
        confidence: Detection confidence threshold for the monitor.

    Returns:
        ``(annotated_rgb_frame, stats_json)`` on success, otherwise
        ``(None, error_message)``.
    """
    try:
        if not UTILS_AVAILABLE or not YT_DOWNLOADER_AVAILABLE:
            return None, "Error: YouTube streaming not available. Install pytube: pip install pytube"
        if not youtube_url or not youtube_url.strip():
            return None, "Error: No YouTube URL provided"
        youtube_url = youtube_url.strip()
        if not is_valid_youtube_url(youtube_url):
            return None, "Error: Invalid YouTube URL format"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "Error: QueueMonitor module not available"
        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "Error: Failed to initialize QueueMonitor"

        success, message, video_path = download_youtube_video(youtube_url)
        if not success or video_path is None:
            return None, f"YouTube Download Error: {message}"

        cap = None
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                return None, "Error: Cannot open downloaded video"
            ret, frame = cap.read()
            if not ret:
                return None, "Error: Could not read frame from video"
            annotated, stats = monitor_instance.process_frame(frame)
            result_image = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
            stats_json = json.dumps(stats, indent=2)
            return result_image, stats_json
        finally:
            # Release the handle before deleting the file it points at.
            if cap is not None:
                cap.release()
            try:
                os.remove(video_path)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.warning(f"Could not delete temporary file: {e}")
    except Exception as e:
        error_msg = f"Streaming error: {str(e)}"
        logger.error(error_msg)
        return None, error_msg
def download_example_video() -> Tuple[str, str]:
    """Describe the bundled example video without downloading anything.

    Returns:
        ``(info_json, status)`` where ``info_json`` lists the example URL and
        supported formats; ``("", error_message)`` if building it fails.
    """
    try:
        payload = json.dumps(
            {
                "status": "Example video available",
                "url": EXAMPLE_VIDEO_URL,
                "note": "Click 'Preload Example Video' to download and cache",
                "supported_formats": ["mp4", "avi", "mov"],
                "example_url": EXAMPLE_VIDEO_URL,
            },
            indent=2,
        )
        return payload, "Example information retrieved"
    except Exception as exc:
        failure = f"Error getting example info: {str(exc)}"
        logger.error(failure)
        return "", failure
def _example_error_payload(error_text: str) -> str:
    """Build the JSON error report shown for the example video."""
    return json.dumps(
        {"status": "error", "url": EXAMPLE_VIDEO_URL, "error": error_text},
        indent=2,
    )


def _example_ready_payload(status: str, video_path: str, message: str) -> Tuple[str, float]:
    """Build the JSON report for an available example file; also return its size in MB."""
    size_mb = os.path.getsize(video_path) / (1024 * 1024)
    payload = json.dumps(
        {
            "status": status,
            "url": EXAMPLE_VIDEO_URL,
            "file_path": video_path,
            "file_size_mb": round(size_mb, 2),
            "message": message,
        },
        indent=2,
    )
    return payload, size_mb


def preload_example_video() -> Tuple[str, str]:
    """Download the example video once and remember where it was saved.

    The download location is stored in the module globals
    ``EXAMPLE_VIDEO_PATH`` / ``EXAMPLE_VIDEO_CACHED``; a later call reuses the
    file if it still exists.

    Returns:
        ``(info_json, status)``.  ``info_json`` has ``"status"`` equal to
        ``"success"``, ``"cached"`` or ``"error"``.
    """
    global EXAMPLE_VIDEO_CACHED, EXAMPLE_VIDEO_PATH
    try:
        if not UTILS_AVAILABLE or not YT_DOWNLOADER_AVAILABLE:
            unavailable = "YouTube download not available. Install pytube: pip install pytube"
            return _example_error_payload(unavailable), f"Error: {unavailable}"

        if EXAMPLE_VIDEO_CACHED and EXAMPLE_VIDEO_PATH and os.path.exists(EXAMPLE_VIDEO_PATH):
            payload, size_mb = _example_ready_payload(
                "cached",
                EXAMPLE_VIDEO_PATH,
                "Example video already cached and ready to process",
            )
            return payload, f"Example video already cached ({size_mb:.2f} MB). Ready to process!"

        logger.info(f"Preloading example video: {EXAMPLE_VIDEO_URL}")
        success, message, video_path = download_youtube_video(EXAMPLE_VIDEO_URL)
        if not success or video_path is None:
            return _example_error_payload(message), f"Preload Error: {message}"

        EXAMPLE_VIDEO_CACHED = True
        EXAMPLE_VIDEO_PATH = video_path
        payload, size_mb = _example_ready_payload(
            "success", video_path, "Example video successfully preloaded"
        )
        return payload, f"Successfully preloaded example video ({size_mb:.2f} MB). Ready to process!"
    except Exception as e:
        error_msg = f"Error preloading example video: {str(e)}"
        logger.error(error_msg)
        return _example_error_payload(error_msg), error_msg


def process_example_video(confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Process the example video, downloading it first if it is not cached.

    Args:
        confidence: Detection confidence threshold for the monitor.
        max_frames: Forwarded to ``process_video``.

    Returns:
        The ``(frame, stats_json, status)`` tuple from ``process_video``, or
        ``(None, "", error_message)`` if the example file cannot be obtained.
    """
    try:
        if not EXAMPLE_VIDEO_CACHED or EXAMPLE_VIDEO_PATH is None or not os.path.exists(EXAMPLE_VIDEO_PATH):
            preload_info, preload_msg = preload_example_video()
            status_text = preload_msg.lower()
            try:
                preload_data = json.loads(preload_info)
            except (TypeError, ValueError):
                preload_data = None

            if isinstance(preload_data, dict):
                failed = preload_data.get("status") == "error" or "error" in status_text
            else:
                # The report was not parseable; fall back to the status text.
                failed = "error" in status_text or "not available" in status_text
            if failed:
                return None, "", f"Error: {preload_msg}. Please preload the example video first."

        # preload_example_video records the path in the module global on success.
        if EXAMPLE_VIDEO_PATH is None or not os.path.exists(EXAMPLE_VIDEO_PATH):
            return None, "", "Error: Example video not found. Please preload it first."
        return process_video(EXAMPLE_VIDEO_PATH, confidence, max_frames)
    except Exception as e:
        error_msg = f"Error processing example video: {str(e)}"
        logger.error(error_msg)
        return None, "", error_msg
def _confidence_slider():
    """Create the detection-confidence slider used on each processing tab."""
    return gr.Slider(minimum=0.1, maximum=1.0, value=0.3, step=0.05, label="Detection Confidence Threshold")


def _max_frames_slider():
    """Create the frame-budget slider used on the video and YouTube tabs."""
    return gr.Slider(minimum=10, maximum=200, value=100, step=10, label="Max Frames to Process")


def _zone_stats_view():
    """Create the JSON viewer that displays per-zone statistics."""
    return gr.Code(label="Zone Statistics (JSON)", language="json", lines=10)


# Whole UI: one tab per workflow, each wiring its widgets to a handler above.
with gr.Blocks(title="AI Queue Management - Time in Zone Tracking", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# 🎯 AI Queue Management System
## Real-time Zone Tracking with Time-in-Zone Analytics
This application combines computer vision (YOLOv8 + Supervision) for real-time tracking and LLM analysis for business insights.
""")

    # --- Uploaded video ----------------------------------------------------
    with gr.Tab("📹 Video Processing"):
        gr.Markdown("### Upload and process CCTV footage with zone-based tracking")
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Upload Video", sources=["upload"])
                confidence_slider = _confidence_slider()
                max_frames_slider = _max_frames_slider()
                process_video_btn = gr.Button("Process Video", variant="primary")
            with gr.Column():
                video_output = gr.Image(label="Processed Frame with Zone Tracking")
                video_status = gr.Textbox(label="Status", interactive=False)
                video_stats = _zone_stats_view()
        process_video_btn.click(
            fn=process_video,
            inputs=[video_input, confidence_slider, max_frames_slider],
            outputs=[video_output, video_stats, video_status],
        )

    # --- YouTube source ----------------------------------------------------
    with gr.Tab("🎥 YouTube Processing"):
        gr.Markdown("### Process YouTube videos with real-time detection (Optional)")
        if not YT_DOWNLOADER_AVAILABLE:
            gr.Markdown("⚠️ **YouTube download not available**. Install pytube: `pip install pytube`")
        with gr.Row():
            with gr.Column():
                youtube_url_input = gr.Textbox(
                    label="YouTube URL",
                    placeholder="https://www.youtube.com/watch?v=...",
                    lines=1,
                )
                yt_confidence = _confidence_slider()
                yt_max_frames = _max_frames_slider()
                with gr.Row():
                    process_yt_btn = gr.Button("Download & Process", variant="primary")
                    stream_yt_btn = gr.Button("Real-time Stream", variant="secondary")
            with gr.Column():
                yt_output = gr.Image(label="Processed Frame")
                yt_status = gr.Textbox(label="Status", interactive=False)
                yt_stats = _zone_stats_view()
        process_yt_btn.click(
            fn=process_youtube_url,
            inputs=[youtube_url_input, yt_confidence, yt_max_frames],
            outputs=[yt_output, yt_stats, yt_status],
        )
        stream_yt_btn.click(
            fn=stream_youtube_realtime,
            inputs=[youtube_url_input, yt_confidence],
            outputs=[yt_output, yt_stats],
        )
        # The example controls reuse this tab's sliders and output widgets.
        with gr.Accordion("📥 Example Video", open=True):
            gr.Markdown(f"""
**Example Video URL:** `{EXAMPLE_VIDEO_URL}`
Click "Preload Example" to download and cache the example video, then use "Process Example" to analyze it.
""")
            with gr.Row():
                preload_example_btn = gr.Button("Preload Example Video", variant="secondary")
                process_example_btn = gr.Button("Process Example Video", variant="primary")
            example_info = gr.Code(
                label="Example Information",
                language="json",
                lines=3,
                value=json.dumps({
                    "example_url": EXAMPLE_VIDEO_URL,
                    "status": "Not preloaded yet"
                }, indent=2),
            )
            preload_example_btn.click(
                fn=preload_example_video,
                outputs=[example_info, yt_status],
            )
            process_example_btn.click(
                fn=process_example_video,
                inputs=[yt_confidence, yt_max_frames],
                outputs=[yt_output, yt_stats, yt_status],
            )

    # --- Single image ------------------------------------------------------
    with gr.Tab("🖼️ Image Processing"):
        gr.Markdown("### Process single images with zone detection")
        with gr.Row():
            with gr.Column():
                image_input = gr.Image(label="Upload Image", type="numpy")
                image_confidence = _confidence_slider()
                process_image_btn = gr.Button("Process Image", variant="primary")
            with gr.Column():
                image_output = gr.Image(label="Processed Image with Zone Tracking")
                image_stats = _zone_stats_view()
        process_image_btn.click(
            fn=process_image,
            inputs=[image_input, image_confidence],
            outputs=[image_output, image_stats],
        )

    # --- LLM log analysis --------------------------------------------------
    with gr.Tab("🤖 AI Log Analysis"):
        gr.Markdown("### Analyze queue performance logs using AI")
        with gr.Row():
            with gr.Column():
                log_input = gr.Textbox(
                    label="Queue Log Data (JSON)",
                    value=get_sample_log(),
                    lines=15,
                    placeholder="Enter your queue log data in JSON format...",
                )
                analyze_btn = gr.Button("Generate AI Insights", variant="primary")
            with gr.Column():
                analysis_output = gr.Markdown(label="AI Recommendations & Insights")
        analyze_btn.click(
            fn=analyze_logs,
            inputs=log_input,
            outputs=analysis_output,
        )

    # --- Static documentation ---------------------------------------------
    with gr.Tab("ℹ️ About & Use Cases"):
        gr.Markdown("""
## 📋 System Overview
This AI-powered queue management system provides:
- **Real-time Object Tracking**: YOLOv8 detection with ByteTrack tracking
- **Time-in-Zone Analytics**: Precise measurement of dwell time in defined zones
- **AI-Powered Insights**: LLM analysis of performance logs
## 🎯 Use Cases
- **Retail Analytics**: Track customer movement and dwell time in product sections
- **Bank Branch Efficiency**: Monitor counter service times and optimize staffing
- **Airport Security**: Predict wait times and manage security lane staffing
- **Hospital ER**: Ensure patients are seen within target wait times
- **Smart Parking**: Monitor parking bay occupancy and turnover rates
- **Safety Monitoring**: Alert security if someone enters or lingers in restricted areas
## 🔧 Technical Details
- **Detection Model**: YOLOv8 (Ultralytics)
- **Tracking**: ByteTrack (Supervision)
- **Time Tracking**: Supervision TimeInZone
- **LLM**: Qwen-2.5-1.5B-Instruct
## ⚠️ Error Handling
The application includes comprehensive error handling for:
- Invalid video/image formats
- Model loading failures
- Zone configuration errors
- JSON parsing errors
- Processing exceptions
""")
if __name__ == "__main__":
    # Listen on every network interface; the port comes from the PORT
    # environment variable and falls back to 7860 when it is unset.
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", 7860)), share=False)