# Agent
# Initial commit: AI Queue Management System
# ad1bda5
import gradio as gr
import cv2
import numpy as np
import json
import os
import logging
import traceback
import tempfile
from typing import Optional, Tuple
# Configure logging BEFORE the optional imports below. The import fallbacks log
# a warning; if the root logger had no handler at that point, the logging module
# would implicitly install a default (WARNING-level) configuration and a later
# basicConfig(level=INFO) call would be silently ignored.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Feature flags: each is flipped to True only when its optional module imports.
QUEUE_MONITOR_AVAILABLE = False
LLM_ANALYZER_AVAILABLE = False
UTILS_AVAILABLE = False

try:
    from queue_monitor import QueueMonitor
    QUEUE_MONITOR_AVAILABLE = True
except ImportError as e:
    logger.warning(f"QueueMonitor import error: {e}. Video/image processing will be disabled.")

try:
    from llm_analyzer import LogAnalyzer
    LLM_ANALYZER_AVAILABLE = True
except ImportError as e:
    logger.warning(f"LogAnalyzer import error: {e}. LLM analysis will be disabled.")

try:
    from utils import (
        is_valid_youtube_url,
        download_youtube_video,
        get_youtube_info,
        YT_DOWNLOADER_AVAILABLE,
    )
    UTILS_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Utils import error: {e}. YouTube download will be disabled.")
    # Keep the name defined so later availability checks never hit a NameError.
    YT_DOWNLOADER_AVAILABLE = False

# Lazily created singletons, populated by initialize_monitor() and
# initialize_analyzer() on first use.
monitor = None
analyzer = None

# Default tracking zone: a single rectangle given as four (x, y) corner points.
DEFAULT_ZONE = np.array([[100, 100], [1100, 100], [1100, 600], [100, 600]])

# Example video bookkeeping: the URL offered in the UI plus the cache state
# maintained by preload_example_video().
EXAMPLE_VIDEO_URL = "https://youtu.be/5rkwqp6nnr4?si=itvwJ-oSR0S8xSZQ"
EXAMPLE_VIDEO_CACHED = False
EXAMPLE_VIDEO_PATH: Optional[str] = None
# Confidence threshold the cached monitor was built with (None until first build).
_monitor_confidence: Optional[float] = None


def initialize_monitor(confidence: float = 0.3):
    """Return the shared QueueMonitor, building it when needed.

    The monitor is cached in the module-level ``monitor`` global. It is
    (re)built when no instance exists yet or when ``confidence`` differs from
    the value the cached instance was created with, so that the confidence
    chosen by the caller is actually honoured on every call.

    Args:
        confidence: Detection confidence passed to ``QueueMonitor``.

    Returns:
        The configured monitor, or ``None`` when the QueueMonitor module is
        unavailable or construction/zone setup fails (the failure is logged).
    """
    global monitor, _monitor_confidence
    if not QUEUE_MONITOR_AVAILABLE:
        logger.error("QueueMonitor not available. Please check imports.")
        return None
    try:
        if monitor is None or _monitor_confidence != confidence:
            logger.info("Initializing QueueMonitor...")
            # Build into a local first: if setup_zones() raises, the global
            # cache must not be left holding a monitor without zones.
            candidate = QueueMonitor(confidence=confidence, fps=30.0)
            candidate.setup_zones([DEFAULT_ZONE])
            monitor = candidate
            _monitor_confidence = confidence
            logger.info("QueueMonitor initialized successfully")
        return monitor
    except Exception as e:
        logger.error(f"Failed to initialize monitor: {e}")
        return None
def initialize_analyzer():
    """Return the shared LogAnalyzer, creating it on first use.

    The instance is cached in the module-level ``analyzer`` global and is
    constructed with the ``HF_TOKEN`` environment variable as its token.

    Returns:
        The analyzer instance, or ``None`` when the LogAnalyzer module is
        unavailable or construction raises (the failure is logged).
    """
    global analyzer
    if not LLM_ANALYZER_AVAILABLE:
        logger.error("LogAnalyzer not available. Please check imports.")
        return None
    if analyzer is not None:
        # Already built by an earlier call; reuse it.
        return analyzer
    try:
        logger.info("Initializing LogAnalyzer...")
        analyzer = LogAnalyzer(hf_token=os.getenv("HF_TOKEN"))
        logger.info("LogAnalyzer initialized successfully")
    except Exception as exc:
        logger.error(f"Failed to initialize analyzer: {exc}")
        return None
    return analyzer
def validate_video_file(video_path: Optional[str]) -> Tuple[bool, str]:
    """Check that ``video_path`` points to a video OpenCV can open.

    Args:
        video_path: Filesystem path of the video, or ``None``.

    Returns:
        ``(True, description)`` when the file opens and reports a non-zero
        frame count and a positive frame rate; otherwise ``(False, reason)``.
        No exception escapes: errors raised while probing are reported in the
        message.
    """
    if video_path is None:
        return False, "No video file provided"
    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}"

    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return False, "Cannot open video file. Unsupported format or corrupted file."
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
    except Exception as e:
        return False, f"Error validating video: {str(e)}"
    finally:
        # Release on every path, including the early "cannot open" return and
        # exceptions raised while querying properties.
        if cap is not None:
            cap.release()

    if frame_count == 0:
        return False, "Video file appears to be empty"
    if fps <= 0:
        return False, "Invalid frame rate detected"
    return True, f"Valid video: {frame_count} frames, {fps:.2f} fps"
def process_video(video_path: Optional[str], confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Run zone tracking over the first frames of a video file.

    At most ``max_frames`` frames are read from the file. Each one is passed
    to the shared monitor; frames whose processing raises are logged and
    skipped. Only the most recent successfully processed frame and its
    statistics are kept, so memory use does not grow with ``max_frames``.

    Args:
        video_path: Path of the video to process, or ``None``.
        confidence: Detection confidence forwarded to ``initialize_monitor``.
        max_frames: Upper bound on the number of frames read from the file.

    Returns:
        ``(frame, stats_json, status)``. On success ``frame`` is the last
        successfully processed frame converted to RGB, ``stats_json`` is a JSON
        object with one ``zone_<i>`` entry built from that frame's statistics,
        and ``status`` reports how many frames were processed. On failure
        ``frame`` is ``None``, ``stats_json`` is empty and ``status`` carries
        the error message.
    """
    try:
        if video_path is None:
            return None, "", "Error: No video file provided"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "", "Error: QueueMonitor module not available. Please check installation."
        is_valid, validation_msg = validate_video_file(video_path)
        if not is_valid:
            return None, "", f"Validation Error: {validation_msg}"
        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "", "Error: Failed to initialize QueueMonitor"

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            cap.release()
            return None, "", "Error: Cannot open video file"

        last_frame = None
        last_stats = None
        processed_count = 0
        try:
            # Bound the number of frames READ (not just the successes) so a
            # monitor that fails on every frame cannot walk the whole file.
            for frame_idx in range(int(max_frames)):
                ret, frame = cap.read()
                if not ret:
                    break
                try:
                    annotated, stats = monitor_instance.process_frame(frame)
                    rgb_frame = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
                except Exception as e:
                    logger.warning(f"Error processing frame {frame_idx}: {e}")
                    continue
                # Keep only the newest result; earlier frames are never shown.
                last_frame, last_stats = rgb_frame, stats
                processed_count += 1

            if processed_count == 0:
                return None, "", "Error: No frames were successfully processed"

            # Summarise from the LAST processed frame: the reported values
            # should reflect the whole processed clip rather than its first
            # frame.
            # NOTE(review): assumes stats is an index-addressable sequence of
            # per-zone dicts, as the original indexing implied - confirm
            # against QueueMonitor.process_frame.
            summary_stats = {}
            if last_stats:
                for zone_idx in range(len(last_stats)):
                    zone_data = last_stats[zone_idx]
                    summary_stats[f"zone_{zone_idx}"] = {
                        "current_count": zone_data.get("count", 0),
                        "avg_time_seconds": zone_data.get("avg_time_seconds", 0.0),
                        "max_time_seconds": zone_data.get("max_time_seconds", 0.0),
                        "total_visits": zone_data.get("total_visits", 0),
                    }
            stats_json = json.dumps(summary_stats, indent=2)
            return last_frame, stats_json, f"Successfully processed {processed_count} frames"
        except Exception as e:
            logger.error(f"Error during video processing: {e}")
            return None, "", f"Processing Error: {str(e)}"
        finally:
            cap.release()
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return None, "", error_msg
def process_image(image: Optional[np.ndarray], confidence: float = 0.3) -> Tuple[Optional[np.ndarray], str]:
    """Run zone detection on a single image.

    Args:
        image: Image as a NumPy array in RGB channel order (this is what the
            ``gr.Image(type="numpy")`` input in this file supplies), or ``None``.
        confidence: Detection confidence forwarded to ``initialize_monitor``.

    Returns:
        ``(annotated_rgb, stats_json)`` on success, or ``(None, message)``
        when validation, monitor initialisation or processing fails.
    """
    try:
        if image is None:
            return None, "Error: No image provided"
        if not isinstance(image, np.ndarray):
            return None, "Error: Invalid image format"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "Error: QueueMonitor module not available. Please check installation."
        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "Error: Failed to initialize QueueMonitor"
        try:
            # The video path feeds the monitor BGR frames straight from OpenCV
            # and converts its output BGR->RGB. Convert the RGB input to BGR
            # first so the same round trip does not swap red and blue here.
            bgr_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            annotated, stats = monitor_instance.process_frame(bgr_image)
            result_image = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
            stats_json = json.dumps(stats, indent=2)
            return result_image, stats_json
        except Exception as e:
            logger.error(f"Error processing image: {e}")
            return None, f"Processing Error: {str(e)}"
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        logger.error(error_msg)
        return None, error_msg
def analyze_logs(log_json: str) -> str:
    """Produce an LLM analysis for a JSON-encoded queue log.

    Args:
        log_json: Text expected to contain a JSON object.

    Returns:
        The analyzer's output on success; otherwise a human-readable error
        string (empty input, analyzer unavailable, invalid JSON, non-object
        JSON, analyzer initialisation failure, or an analysis exception).
    """
    try:
        # Reject None, empty and whitespace-only input up front.
        if not log_json or not log_json.strip():
            return "Error: No log data provided"
        if not LLM_ANALYZER_AVAILABLE:
            return "Error: LogAnalyzer module not available. Please check installation."

        try:
            parsed = json.loads(log_json)
        except json.JSONDecodeError as exc:
            return f"Error: Invalid JSON format - {str(exc)}"
        if not isinstance(parsed, dict):
            return "Error: Log data must be a JSON object"

        llm = initialize_analyzer()
        if llm is None:
            return "Error: LLM analyzer failed to initialize. Please check model availability."

        try:
            return llm.analyze_logs(parsed)
        except Exception as exc:
            logger.error(f"Error during log analysis: {exc}")
            return f"Analysis Error: {str(exc)}"
    except Exception as exc:
        failure = f"Unexpected error: {str(exc)}\n{traceback.format_exc()}"
        logger.error(failure)
        return failure
def get_sample_log() -> str:
    """Return a sample queue log as an indented JSON string.

    The text is used to pre-fill the log input box of the analysis tab.
    """
    # Ordered (key, value) pairs; dict() preserves this order in the output.
    fields = (
        ("date", "2026-01-24"),
        ("branch", "SBI Jabalpur"),
        ("avg_wait_time_sec", 420),
        ("max_wait_time_sec", 980),
        ("customers_served", 134),
        ("counter_1_avg_service", 180),
        ("counter_2_avg_service", 310),
        ("peak_hour", "12:00-13:00"),
        ("queue_overflow_events", 5),
    )
    return json.dumps(dict(fields), indent=2)
def process_youtube_url(youtube_url: str, confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Download a YouTube video and run it through ``process_video``.

    The downloaded file is treated as temporary and is removed once
    processing has finished, whether it succeeded or raised.

    Args:
        youtube_url: URL of the video to download.
        confidence: Detection confidence forwarded to ``process_video``.
        max_frames: Frame limit forwarded to ``process_video``.

    Returns:
        The ``(frame, stats_json, status)`` tuple from ``process_video``, or
        ``(None, "", message)`` when the downloader is unavailable, the URL is
        missing/invalid, the download fails, or an unexpected error occurs.
    """
    try:
        if not UTILS_AVAILABLE or not YT_DOWNLOADER_AVAILABLE:
            return None, "", "Error: YouTube download not available. Install pytube: pip install pytube"
        if not youtube_url or not youtube_url.strip():
            return None, "", "Error: No YouTube URL provided"
        if not is_valid_youtube_url(youtube_url):
            return None, "", "Error: Invalid YouTube URL format"

        logger.info(f"Downloading YouTube video: {youtube_url}")
        success, message, video_path = download_youtube_video(youtube_url)
        if not success or video_path is None:
            return None, "", f"YouTube Download Error: {message}"

        try:
            return process_video(video_path, confidence, max_frames)
        finally:
            # Single cleanup path for both success and failure. A file that
            # cannot be deleted is logged instead of being silently ignored,
            # and never replaces the processing result.
            try:
                if os.path.exists(video_path):
                    os.remove(video_path)
            except OSError as e:
                logger.warning(f"Could not delete temporary file {video_path}: {e}")
    except Exception as e:
        error_msg = f"Unexpected error processing YouTube video: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return None, "", error_msg
def stream_youtube_realtime(youtube_url: str, confidence: float = 0.3) -> Tuple[Optional[np.ndarray], str]:
    """Download a YouTube video and process its first frame.

    Despite the name, the video is downloaded in full via
    ``download_youtube_video``; only the first readable frame is analysed and
    the downloaded file is removed afterwards.

    Args:
        youtube_url: URL of the video to download.
        confidence: Detection confidence forwarded to ``initialize_monitor``.

    Returns:
        ``(annotated_rgb, stats_json)`` on success, or ``(None, message)`` on
        any failure.
    """
    try:
        if not UTILS_AVAILABLE or not YT_DOWNLOADER_AVAILABLE:
            return None, "Error: YouTube streaming not available. Install pytube: pip install pytube"
        if not youtube_url or not youtube_url.strip():
            return None, "Error: No YouTube URL provided"
        if not is_valid_youtube_url(youtube_url):
            return None, "Error: Invalid YouTube URL format"
        if not QUEUE_MONITOR_AVAILABLE:
            return None, "Error: QueueMonitor module not available"
        monitor_instance = initialize_monitor(confidence)
        if monitor_instance is None:
            return None, "Error: Failed to initialize QueueMonitor"

        success, message, video_path = download_youtube_video(youtube_url)
        if not success or video_path is None:
            return None, f"YouTube Download Error: {message}"

        cap = None
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                return None, "Error: Cannot open downloaded video"
            ret, frame = cap.read()
            if not ret:
                return None, "Error: Could not read frame from video"
            annotated, stats = monitor_instance.process_frame(frame)
            result_image = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
            stats_json = json.dumps(stats, indent=2)
            return result_image, stats_json
        finally:
            # One cleanup path for every exit above: release the capture and
            # delete the download. A deletion failure is only logged so it
            # cannot replace the message being returned.
            if cap is not None:
                cap.release()
            try:
                if os.path.exists(video_path):
                    os.remove(video_path)
            except OSError as e:
                logger.warning(f"Could not delete temporary file: {e}")
    except Exception as e:
        error_msg = f"Streaming error: {str(e)}"
        logger.error(error_msg)
        return None, error_msg
def download_example_video() -> Tuple[str, str]:
    """Describe the bundled example video without downloading anything.

    Returns:
        ``(info_json, status)`` where ``info_json`` is an indented JSON
        document naming the example URL and the supported formats. On an
        unexpected error the JSON part is empty and ``status`` holds the
        message.
    """
    try:
        payload = json.dumps(
            {
                "status": "Example video available",
                "url": EXAMPLE_VIDEO_URL,
                "note": "Click 'Preload Example Video' to download and cache",
                "supported_formats": ["mp4", "avi", "mov"],
                "example_url": EXAMPLE_VIDEO_URL,
            },
            indent=2,
        )
    except Exception as exc:
        failure = f"Error getting example info: {str(exc)}"
        logger.error(failure)
        return "", failure
    return payload, "Example information retrieved"
# NOTE(review): a second definition of preload_example_video appears later in
# this file and replaces this one at import time.
def preload_example_video() -> Tuple[str, str]:
    """Download the example video once and remember where it was stored.

    Updates the ``EXAMPLE_VIDEO_CACHED`` and ``EXAMPLE_VIDEO_PATH`` globals
    after a successful download. A still-existing cached file is reused
    instead of downloading again.

    Returns:
        ``(info_json, status)``: a JSON description of the outcome and a
        matching human-readable status line.
    """
    global EXAMPLE_VIDEO_CACHED, EXAMPLE_VIDEO_PATH
    bytes_per_mb = 1024 * 1024
    try:
        downloader_ready = UTILS_AVAILABLE and YT_DOWNLOADER_AVAILABLE
        if not downloader_ready:
            return json.dumps({"error": "YouTube download not available"}, indent=2), "Error: YouTube download not available. Install pytube: pip install pytube"

        # Fast path: the file from an earlier preload is still on disk.
        if EXAMPLE_VIDEO_CACHED and EXAMPLE_VIDEO_PATH and os.path.exists(EXAMPLE_VIDEO_PATH):
            size_mb = os.path.getsize(EXAMPLE_VIDEO_PATH) / bytes_per_mb
            cached_info = {
                "status": "cached",
                "url": EXAMPLE_VIDEO_URL,
                "file_path": EXAMPLE_VIDEO_PATH,
                "file_size_mb": round(size_mb, 2),
                "message": "Example video already cached and ready to process"
            }
            return json.dumps(cached_info, indent=2), f"Example video already cached ({size_mb:.2f} MB). Ready to process!"

        logger.info(f"Preloading example video: {EXAMPLE_VIDEO_URL}")
        ok, detail, downloaded_path = download_youtube_video(EXAMPLE_VIDEO_URL)
        if not ok or downloaded_path is None:
            failed_info = {
                "status": "error",
                "url": EXAMPLE_VIDEO_URL,
                "error": detail
            }
            return json.dumps(failed_info, indent=2), f"Preload Error: {detail}"

        EXAMPLE_VIDEO_CACHED = True
        EXAMPLE_VIDEO_PATH = downloaded_path
        size_mb = os.path.getsize(downloaded_path) / bytes_per_mb
        fresh_info = {
            "status": "success",
            "url": EXAMPLE_VIDEO_URL,
            "file_path": downloaded_path,
            "file_size_mb": round(size_mb, 2),
            "message": "Example video successfully preloaded"
        }
        return json.dumps(fresh_info, indent=2), f"Successfully preloaded example video ({size_mb:.2f} MB). Ready to process!"
    except Exception as exc:
        failure = f"Error preloading example video: {str(exc)}"
        logger.error(failure)
        failed_info = {
            "status": "error",
            "url": EXAMPLE_VIDEO_URL,
            "error": failure
        }
        return json.dumps(failed_info, indent=2), failure
# NOTE(review): a second definition of process_example_video appears later in
# this file and replaces this one at import time.
def process_example_video(confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Process the example video, preloading it first when it is not cached.

    Args:
        confidence: Detection confidence forwarded to ``process_video``.
        max_frames: Frame limit forwarded to ``process_video``.

    Returns:
        The ``(frame, stats_json, status)`` tuple from ``process_video``, or
        ``(None, "", message)`` when preloading fails or the file is missing.
    """
    global EXAMPLE_VIDEO_PATH
    try:
        if not EXAMPLE_VIDEO_CACHED or EXAMPLE_VIDEO_PATH is None or not os.path.exists(EXAMPLE_VIDEO_PATH):
            preload_info, preload_msg = preload_example_video()

            # Decide success from the parsed payload rather than by searching
            # the raw JSON text for "error", which would also match e.g. a
            # file path that happens to contain that word.
            try:
                preload_data = json.loads(preload_info)
            except (TypeError, ValueError):
                preload_data = None

            if isinstance(preload_data, dict):
                failed = preload_data.get("status") == "error" or "error" in preload_data
            else:
                # Payload unreadable: fall back to the status line.
                lowered = preload_msg.lower()
                failed = "error" in lowered or "not available" in lowered

            if failed:
                return None, "", f"Error: {preload_msg}. Please preload the example video first."
            if isinstance(preload_data, dict):
                EXAMPLE_VIDEO_PATH = preload_data.get("file_path")

        if EXAMPLE_VIDEO_PATH is None or not os.path.exists(EXAMPLE_VIDEO_PATH):
            return None, "", "Error: Example video not found. Please preload it first."
        return process_video(EXAMPLE_VIDEO_PATH, confidence, max_frames)
    except Exception as e:
        error_msg = f"Error processing example video: {str(e)}"
        logger.error(error_msg)
        return None, "", error_msg
def preload_example_video() -> Tuple[str, str]:
    """Download and cache the example video, reusing an existing download.

    On a successful download the ``EXAMPLE_VIDEO_CACHED`` and
    ``EXAMPLE_VIDEO_PATH`` globals are updated.

    Returns:
        ``(info_json, status)``. ``info_json`` is an indented JSON object
        whose ``status`` field is ``"cached"``, ``"success"`` or ``"error"``;
        ``status`` is the matching human-readable line.
    """
    global EXAMPLE_VIDEO_CACHED, EXAMPLE_VIDEO_PATH
    bytes_per_mb = 1024 * 1024
    try:
        downloader_ready = UTILS_AVAILABLE and YT_DOWNLOADER_AVAILABLE
        if not downloader_ready:
            unavailable = {
                "status": "error",
                "url": EXAMPLE_VIDEO_URL,
                "error": "YouTube download not available. Install pytube: pip install pytube"
            }
            return json.dumps(unavailable, indent=2), "Error: YouTube download not available. Install pytube: pip install pytube"

        # Fast path: the file from an earlier preload is still on disk.
        if EXAMPLE_VIDEO_CACHED and EXAMPLE_VIDEO_PATH and os.path.exists(EXAMPLE_VIDEO_PATH):
            size_mb = os.path.getsize(EXAMPLE_VIDEO_PATH) / bytes_per_mb
            cached_info = {
                "status": "cached",
                "url": EXAMPLE_VIDEO_URL,
                "file_path": EXAMPLE_VIDEO_PATH,
                "file_size_mb": round(size_mb, 2),
                "message": "Example video already cached and ready to process"
            }
            return json.dumps(cached_info, indent=2), f"Example video already cached ({size_mb:.2f} MB). Ready to process!"

        logger.info(f"Preloading example video: {EXAMPLE_VIDEO_URL}")
        ok, detail, downloaded_path = download_youtube_video(EXAMPLE_VIDEO_URL)
        if not ok or downloaded_path is None:
            failed_info = {
                "status": "error",
                "url": EXAMPLE_VIDEO_URL,
                "error": detail
            }
            return json.dumps(failed_info, indent=2), f"Preload Error: {detail}"

        EXAMPLE_VIDEO_CACHED = True
        EXAMPLE_VIDEO_PATH = downloaded_path
        size_mb = os.path.getsize(downloaded_path) / bytes_per_mb
        fresh_info = {
            "status": "success",
            "url": EXAMPLE_VIDEO_URL,
            "file_path": downloaded_path,
            "file_size_mb": round(size_mb, 2),
            "message": "Example video successfully preloaded"
        }
        return json.dumps(fresh_info, indent=2), f"Successfully preloaded example video ({size_mb:.2f} MB). Ready to process!"
    except Exception as exc:
        failure = f"Error preloading example video: {str(exc)}"
        logger.error(failure)
        failed_info = {
            "status": "error",
            "url": EXAMPLE_VIDEO_URL,
            "error": failure
        }
        return json.dumps(failed_info, indent=2), failure
def process_example_video(confidence: float = 0.3, max_frames: int = 100) -> Tuple[Optional[np.ndarray], str, str]:
    """Process the example video, preloading it first when it is not cached.

    Args:
        confidence: Detection confidence forwarded to ``process_video``.
        max_frames: Frame limit forwarded to ``process_video``.

    Returns:
        The ``(frame, stats_json, status)`` tuple from ``process_video``, or
        ``(None, "", message)`` when preloading fails or the file is missing.
    """
    global EXAMPLE_VIDEO_PATH
    try:
        if not EXAMPLE_VIDEO_CACHED or EXAMPLE_VIDEO_PATH is None or not os.path.exists(EXAMPLE_VIDEO_PATH):
            preload_info, preload_msg = preload_example_video()
            try:
                preload_data = json.loads(preload_info)
                if preload_data.get("status") == "error" or "error" in preload_msg.lower():
                    return None, "", f"Error: {preload_msg}. Please preload the example video first."
                EXAMPLE_VIDEO_PATH = preload_data.get("file_path")
            except (TypeError, ValueError, AttributeError):
                # Payload was not a JSON object (json.JSONDecodeError is a
                # ValueError): fall back to inspecting the status line. Only
                # parsing-related errors are caught so unrelated failures
                # still reach the outer handler.
                if "error" in preload_msg.lower() or "not available" in preload_msg.lower():
                    return None, "", f"Error: {preload_msg}. Please preload the example video first."
        if EXAMPLE_VIDEO_PATH is None or not os.path.exists(EXAMPLE_VIDEO_PATH):
            return None, "", "Error: Example video not found. Please preload it first."
        return process_video(EXAMPLE_VIDEO_PATH, confidence, max_frames)
    except Exception as e:
        error_msg = f"Error processing example video: {str(e)}"
        logger.error(error_msg)
        return None, "", error_msg
# Markdown shown at the top of the app.
_INTRO_MD = """
# 🎯 AI Queue Management System
## Real-time Zone Tracking with Time-in-Zone Analytics
This application combines computer vision (YOLOv8 + Supervision) for real-time tracking and LLM analysis for business insights.
"""

# Markdown shown inside the "Example Video" accordion.
_EXAMPLE_MD = f"""
**Example Video URL:** `{EXAMPLE_VIDEO_URL}`
Click "Preload Example" to download and cache the example video, then use "Process Example" to analyze it.
"""

# Markdown shown on the "About & Use Cases" tab.
_ABOUT_MD = """
## 📋 System Overview
This AI-powered queue management system provides:
- **Real-time Object Tracking**: YOLOv8 detection with ByteTrack tracking
- **Time-in-Zone Analytics**: Precise measurement of dwell time in defined zones
- **AI-Powered Insights**: LLM analysis of performance logs
## 🎯 Use Cases
- **Retail Analytics**: Track customer movement and dwell time in product sections
- **Bank Branch Efficiency**: Monitor counter service times and optimize staffing
- **Airport Security**: Predict wait times and manage security lane staffing
- **Hospital ER**: Ensure patients are seen within target wait times
- **Smart Parking**: Monitor parking bay occupancy and turnover rates
- **Safety Monitoring**: Alert security if someone enters or lingers in restricted areas
## 🔧 Technical Details
- **Detection Model**: YOLOv8 (Ultralytics)
- **Tracking**: ByteTrack (Supervision)
- **Time Tracking**: Supervision TimeInZone
- **LLM**: Qwen-2.5-1.5B-Instruct
## ⚠️ Error Handling
The application includes comprehensive error handling for:
- Invalid video/image formats
- Model loading failures
- Zone configuration errors
- JSON parsing errors
- Processing exceptions
"""


def _confidence_slider():
    """Create the detection-confidence slider used by the processing tabs."""
    return gr.Slider(
        minimum=0.1,
        maximum=1.0,
        value=0.3,
        step=0.05,
        label="Detection Confidence Threshold",
    )


def _max_frames_slider():
    """Create the frame-limit slider used by the video and YouTube tabs."""
    return gr.Slider(
        minimum=10,
        maximum=200,
        value=100,
        step=10,
        label="Max Frames to Process",
    )


def _zone_stats_code():
    """Create the read-out that displays zone statistics as JSON."""
    return gr.Code(label="Zone Statistics (JSON)", language="json", lines=10)


# Components are created in the same order as before; the factories above
# only remove the repeated keyword lists.
# NOTE(review): the original nesting of rows/columns was reconstructed from
# statement order - confirm the layout against the running app.
with gr.Blocks(title="AI Queue Management - Time in Zone Tracking", theme=gr.themes.Soft()) as demo:
    gr.Markdown(_INTRO_MD)

    with gr.Tab("📹 Video Processing"):
        gr.Markdown("### Upload and process CCTV footage with zone-based tracking")
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Upload Video", sources=["upload"])
                confidence_slider = _confidence_slider()
                max_frames_slider = _max_frames_slider()
                process_video_btn = gr.Button("Process Video", variant="primary")
            with gr.Column():
                video_output = gr.Image(label="Processed Frame with Zone Tracking")
                video_status = gr.Textbox(label="Status", interactive=False)
                video_stats = _zone_stats_code()
        process_video_btn.click(
            fn=process_video,
            inputs=[video_input, confidence_slider, max_frames_slider],
            outputs=[video_output, video_stats, video_status],
        )

    with gr.Tab("🎥 YouTube Processing"):
        gr.Markdown("### Process YouTube videos with real-time detection (Optional)")
        if not YT_DOWNLOADER_AVAILABLE:
            gr.Markdown("⚠️ **YouTube download not available**. Install pytube: `pip install pytube`")
        with gr.Row():
            with gr.Column():
                youtube_url_input = gr.Textbox(
                    label="YouTube URL",
                    placeholder="https://www.youtube.com/watch?v=...",
                    lines=1,
                )
                yt_confidence = _confidence_slider()
                yt_max_frames = _max_frames_slider()
                with gr.Row():
                    process_yt_btn = gr.Button("Download & Process", variant="primary")
                    stream_yt_btn = gr.Button("Real-time Stream", variant="secondary")
            with gr.Column():
                yt_output = gr.Image(label="Processed Frame")
                yt_status = gr.Textbox(label="Status", interactive=False)
                yt_stats = _zone_stats_code()
        process_yt_btn.click(
            fn=process_youtube_url,
            inputs=[youtube_url_input, yt_confidence, yt_max_frames],
            outputs=[yt_output, yt_stats, yt_status],
        )
        stream_yt_btn.click(
            fn=stream_youtube_realtime,
            inputs=[youtube_url_input, yt_confidence],
            outputs=[yt_output, yt_stats],
        )
        with gr.Accordion("📥 Example Video", open=True):
            gr.Markdown(_EXAMPLE_MD)
            with gr.Row():
                preload_example_btn = gr.Button("Preload Example Video", variant="secondary")
                process_example_btn = gr.Button("Process Example Video", variant="primary")
            example_info = gr.Code(
                label="Example Information",
                language="json",
                lines=3,
                value=json.dumps({
                    "example_url": EXAMPLE_VIDEO_URL,
                    "status": "Not preloaded yet"
                }, indent=2),
            )
            preload_example_btn.click(
                fn=preload_example_video,
                outputs=[example_info, yt_status],
            )
            process_example_btn.click(
                fn=process_example_video,
                inputs=[yt_confidence, yt_max_frames],
                outputs=[yt_output, yt_stats, yt_status],
            )

    with gr.Tab("🖼️ Image Processing"):
        gr.Markdown("### Process single images with zone detection")
        with gr.Row():
            with gr.Column():
                image_input = gr.Image(label="Upload Image", type="numpy")
                image_confidence = _confidence_slider()
                process_image_btn = gr.Button("Process Image", variant="primary")
            with gr.Column():
                image_output = gr.Image(label="Processed Image with Zone Tracking")
                image_stats = _zone_stats_code()
        process_image_btn.click(
            fn=process_image,
            inputs=[image_input, image_confidence],
            outputs=[image_output, image_stats],
        )

    with gr.Tab("🤖 AI Log Analysis"):
        gr.Markdown("### Analyze queue performance logs using AI")
        with gr.Row():
            with gr.Column():
                log_input = gr.Textbox(
                    label="Queue Log Data (JSON)",
                    value=get_sample_log(),
                    lines=15,
                    placeholder="Enter your queue log data in JSON format...",
                )
                analyze_btn = gr.Button("Generate AI Insights", variant="primary")
            with gr.Column():
                analysis_output = gr.Markdown(label="AI Recommendations & Insights")
        analyze_btn.click(
            fn=analyze_logs,
            inputs=log_input,
            outputs=analysis_output,
        )

    with gr.Tab("ℹ️ About & Use Cases"):
        gr.Markdown(_ABOUT_MD)
if __name__ == "__main__":
    # Listen on every interface; the PORT environment variable overrides the
    # default port 7860.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": int(os.getenv("PORT", 7860)),
        "share": False,
    }
    demo.launch(**launch_options)