""" Object Detection with model_api - Gradio Application Copyright (C) 2025 """ import gradio as gr import numpy as np from pathlib import Path from PIL import Image import time import os from typing import Tuple, List import asyncio import warnings import cv2 import uuid from model_api.models import Model from model_api.visualizer import Visualizer warnings.filterwarnings("ignore", message=".*Invalid file descriptor.*") if hasattr(asyncio, 'set_event_loop_policy'): try: asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) except Exception: pass # Global variables for model caching current_model = None current_model_name = None visualizer = Visualizer() # Global variable for video streaming control streaming = False def get_available_models(): """ Scan the models folder for .xml files and return list of model names. Returns: list: List of model names (without .xml extension) """ models_dir = Path("models") xml_files = list(models_dir.glob("*.xml")) model_names = [f.stem for f in xml_files] return sorted(model_names) def load_model(model_name: str, device: str = "CPU", confidence_threshold: float = 0.3): """ Load OpenVINO model using model_api. Args: model_name: Name of the model (without .xml extension) device: Inference device (CPU, GPU, etc.) confidence_threshold: Confidence threshold for predictions Returns: Model instance from model_api """ global current_model, current_model_name # Always reload model to apply new confidence threshold model_path = Path("models") / f"{model_name}.xml" if not model_path.exists(): raise FileNotFoundError(f"Model not found: {model_path}") print(f"Loading model: {model_name} with confidence threshold: {confidence_threshold}") # Set configuration based on model type configuration = {} if "YOLO" in model_name.upper(): # YOLO models use confidence_threshold and iou_threshold configuration["confidence_threshold"] = confidence_threshold configuration["iou_threshold"] = 0.5 # Standard IoU threshold for NMS else: # Other detection models typically use CONFIDENCE_THRESHOLD configuration["confidence_threshold"] = confidence_threshold model = Model.create_model(str(model_path), device=device, configuration=configuration) model.get_performance_metrics().reset() current_model = model current_model_name = model_name print(f"Model {model_name} loaded successfully") return model def run_inference( image: np.ndarray, model_name: str, confidence_threshold: float ) -> Tuple[Image.Image, str]: """ Perform inference and return visualized result with metrics. Args: image: Input image as numpy array model_name: Name of the model to use confidence_threshold: Confidence threshold for filtering predictions Returns: Tuple of (visualized_image, metrics_text) """ # Input validation if image is None: return None, "⚠️ Please upload an image first." if model_name is None or model_name == "No models available": return None, "⚠️ No model selected or available." try: model = load_model(model_name, confidence_threshold=confidence_threshold) # Run inference result = model(image) # Get performance metrics metrics = model.get_performance_metrics() inference_time = metrics.get_inference_time() preprocess_time = metrics.get_preprocess_time() postprocess_time = metrics.get_postprocess_time() fps = metrics.get_fps() # Format metrics text metrics_text = f"""πŸ”„ Preprocessing: {preprocess_time.mean()*1000:.2f} ms βš™οΈ Inference: {inference_time.mean()*1000:.2f} ms πŸ“Š Postprocessing: {postprocess_time.mean()*1000:.2f} ms ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ ⏱️ Total Time: {(preprocess_time.mean() + inference_time.mean() + postprocess_time.mean())*1000:.2f} ms 🎯 FPS: {fps:.2f} πŸ“ˆ Total Frames: {inference_time.count} """ # Visualize results using model_api's visualizer print(f"Visualizing results with confidence threshold: {confidence_threshold}") visualized_image = visualizer.render(image, result) return visualized_image, metrics_text except Exception as e: error_msg = f"Error during inference: {str(e)}" return None, error_msg def run_video_inference( video_path: str, model_name: str, confidence_threshold: float ): """ Process video and return complete result with inference. Args: video_path: Path to input video file model_name: Name of the model to use confidence_threshold: Confidence threshold for filtering predictions Returns: Tuple of (output_video_path, metrics_text, start_btn_state, stop_btn_state) """ global streaming streaming = True if video_path is None: return None, "⚠️ Please upload a video first.", gr.update(interactive=True), gr.update(interactive=False) if model_name is None or model_name == "No models available": return None, "⚠️ No model selected or available.", gr.update(interactive=True), gr.update(interactive=False) try: # Load model model = load_model(model_name, confidence_threshold=confidence_threshold) # Open video cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return None, "⚠️ Error: Could not open video file.", gr.update(interactive=True), gr.update(interactive=False) # Get video properties video_codec = cv2.VideoWriter_fourcc(*"mp4v") fps = int(cap.get(cv2.CAP_PROP_FPS)) desired_fps = fps if fps > 0 else 30 # Read first frame to get dimensions ret, frame = cap.read() if not ret or frame is None: return None, "⚠️ Error: Could not read video frames.", gr.update(interactive=True), gr.update(interactive=False) # Process first frame to get output dimensions result = model(frame) result_image = visualizer.render(frame, result) height, width = result_image.shape[:2] # Create output video writer output_video_name = f"/tmp/output_{uuid.uuid4()}.mp4" output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height)) # Write first frame output_video.write(result_image) n_frames = 1 # Process remaining frames while streaming: ret, frame = cap.read() if not ret or frame is None: break # Run inference result = model(frame) result_image = visualizer.render(frame, result) output_video.write(result_image) n_frames += 1 # Release resources output_video.release() cap.release() # Get final metrics metrics = model.get_performance_metrics() inference_time = metrics.get_inference_time() preprocess_time = metrics.get_preprocess_time() postprocess_time = metrics.get_postprocess_time() fps_metric = metrics.get_fps() final_metrics = f"""βœ… Video Processing Complete! πŸ”„ Preprocessing: {preprocess_time.mean()*1000:.2f} ms βš™οΈ Inference: {inference_time.mean()*1000:.2f} ms πŸ“Š Postprocessing: {postprocess_time.mean()*1000:.2f} ms ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ ⏱️ Total Time: {(preprocess_time.mean() + inference_time.mean() + postprocess_time.mean())*1000:.2f} ms 🎯 Average FPS: {fps_metric:.2f} πŸ“ˆ Total Frames: {n_frames} """ # Verify final file exists before returning if os.path.exists(output_video_name) and os.path.getsize(output_video_name) > 0: return output_video_name, final_metrics, gr.update(interactive=True), gr.update(interactive=False) else: return None, final_metrics + "\n⚠️ Final video file not available.", gr.update(interactive=True), gr.update(interactive=False) except Exception as e: error_msg = f"Error during video inference: {str(e)}" return None, error_msg, gr.update(interactive=True), gr.update(interactive=False) def stop_video_inference(): """Stop video processing.""" global streaming streaming = False return "⏹️ Video processing stopped.", gr.update(interactive=True), gr.update(interactive=False) def run_webcam_inference( frame: np.ndarray, model_name: str, confidence_threshold: float ) -> Tuple[Image.Image, str]: """ Process webcam stream - runs inference on captured camera frame. Args: frame: Input frame from webcam as numpy array model_name: Name of the model to use confidence_threshold: Confidence threshold for filtering predictions Returns: Tuple of (visualized_image, metrics_text) """ if frame is None: return None, "⚠️ No frame received from webcam." if model_name is None or model_name == "No models available": return None, "⚠️ No model selected or available." try: # Load or use cached model model = load_model(model_name, confidence_threshold=confidence_threshold) # Run inference result = model(frame) # Visualize results visualized_image = visualizer.render(frame, result) # Get performance metrics metrics = model.get_performance_metrics() inference_time = metrics.get_inference_time() preprocess_time = metrics.get_preprocess_time() postprocess_time = metrics.get_postprocess_time() fps = metrics.get_fps() # Format metrics text metrics_text = f"""πŸ”„ Preprocessing: {preprocess_time.mean()*1000:.2f} ms βš™οΈ Inference: {inference_time.mean()*1000:.2f} ms πŸ“Š Postprocessing: {postprocess_time.mean()*1000:.2f} ms ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ ⏱️ Total Time: {(preprocess_time.mean() + inference_time.mean() + postprocess_time.mean())*1000:.2f} ms 🎯 FPS: {fps:.2f} πŸ“ˆ Total Frames: {inference_time.count} """ return visualized_image, metrics_text except Exception as e: error_msg = f"Error during webcam inference: {str(e)}" return None, error_msg def enable_video_buttons(video): """Enable start button when video is uploaded.""" if video is not None: return gr.update(interactive=True), gr.update(interactive=False) else: return gr.update(interactive=False), gr.update(interactive=False) def format_results(result, confidence_threshold: float) -> str: """ Format model results (classification or detection) as text. Args: result: Result object from model_api confidence_threshold: Confidence threshold for filtering Returns: Formatted results text """ # Check if it's a classification result if hasattr(result, 'top_labels') and result.top_labels: results_text = "πŸ” Classification Results:\n" results_text += "━" * 50 + "\n" filtered_labels = [ label for label in result.top_labels if label.confidence >= confidence_threshold ] if filtered_labels: for i, label in enumerate(filtered_labels, 1): results_text += f"{i}. {label.name}: {label.confidence:.3f}\n" else: results_text += f"No predictions above confidence threshold {confidence_threshold:.2f}\n" # Check if it's a detection result elif hasattr(result, 'segmentedObjects') and result.segmentedObjects: results_text = "πŸ” Detected Objects:\n" results_text += "━" * 50 + "\n" # Filter by confidence filtered_objects = [ obj for obj in result.segmentedObjects if obj.score >= confidence_threshold ] if filtered_objects: from collections import Counter label_counts = Counter(obj.str_label for obj in filtered_objects) for i, obj in enumerate(filtered_objects, 1): x1, y1 = int(obj.xmin), int(obj.ymin) x2, y2 = int(obj.xmax), int(obj.ymax) results_text += f"{i}. {obj.str_label}: {obj.score:.3f} @ [{x1}, {y1}, {x2}, {y2}]\n" results_text += "\nπŸ“Š Summary:\n" for label, count in label_counts.most_common(): results_text += f" β€’ {label}: {count}\n" else: results_text += f"No detections above confidence threshold {confidence_threshold:.2f}\n" else: results_text = "No results available\n" return results_text def create_gradio_interface(): """ Create and configure the Gradio interface. Returns: gr.Blocks: Configured Gradio interface """ available_models = get_available_models() if not available_models: print("Warning: No models found in models/ folder") available_models = ["No models available"] with gr.Blocks(title="OpenVINO with model_api") as demo: gr.Markdown("# 🎯 OpenVINO with model_api") gr.Markdown("Experience high-performance object detection powered by **OpenVINOβ„’** and **model_api**. See real-time inference with detailed performance metrics.") with gr.Tabs() as tabs: with gr.TabItem("πŸ“Έ Image Inference"): with gr.Row(): with gr.Column(scale=1): input_image = gr.Image( label="Input Image", type="numpy" ) model_dropdown = gr.Dropdown( choices=available_models, value=available_models[0] if available_models else None, label="Select Model", info="Choose a model from the models/ folder" ) confidence_slider = gr.Slider( minimum=0.0, maximum=1.0, value=0.3, step=0.05, label="Confidence Threshold", info="Minimum confidence for displaying predictions" ) classify_btn = gr.Button("πŸš€ Run Inference", variant="primary") with gr.Column(scale=1): output_image = gr.Image( label="Detection Result", type="pil", show_label=False ) metrics_output = gr.Textbox( label="Performance Metrics", lines=8, max_lines=15 ) # Connect the button to the inference function classify_btn.click( fn=run_inference, inputs=[input_image, model_dropdown, confidence_slider], outputs=[output_image, metrics_output] ) # Examples section gr.Markdown("---") gr.Markdown("## πŸ“Έ Example Images") gr.Examples( examples=[ ["examples/vehicles.png", "YOLO-11-N" if "YOLO-11-N" in available_models else available_models[0], 0.5], ["examples/dog.jpg", "YOLO-11-S" if "YOLO-11-S" in available_models else available_models[0], 0.6], ["examples/people-walking.png", "YOLO-11-M" if "YOLO-11-M" in available_models else available_models[0], 0.3], ["examples/zidane.jpg", "resnet50" if "resnet50" in available_models else available_models[0], 0.5], ], inputs=[input_image, model_dropdown, confidence_slider], outputs=[output_image, metrics_output], fn=run_inference, cache_examples=True ) with gr.TabItem("πŸŽ₯ Video Inference"): with gr.Row(): with gr.Column(scale=1): input_video = gr.Video( label="Input Video" ) video_model_dropdown = gr.Dropdown( choices=available_models, value=available_models[0] if available_models else None, label="Select Model", info="Choose a model from the models/ folder" ) video_confidence_slider = gr.Slider( minimum=0.0, maximum=1.0, value=0.3, step=0.05, label="Confidence Threshold", info="Minimum confidence for displaying predictions" ) with gr.Row(): video_start_btn = gr.Button("▢️ Start Processing", variant="primary", interactive=False) video_stop_btn = gr.Button("⏹️ Stop", variant="stop", interactive=False) with gr.Column(scale=1): output_video = gr.Video( label="Processed Video", autoplay=True, show_label=False ) video_metrics_output = gr.Textbox( label="Performance Metrics", lines=8, max_lines=15 ) # Enable start button when video is uploaded input_video.change( fn=enable_video_buttons, inputs=[input_video], outputs=[video_start_btn, video_stop_btn] ) # Connect video buttons - when clicked, start is disabled and stop is enabled def start_processing_wrapper(video, model, conf): # First disable start and enable stop yield None, "πŸ”„ Starting video processing...", gr.update(interactive=False), gr.update(interactive=True) # Then run the actual processing result = run_video_inference(video, model, conf) yield result video_start_btn.click( fn=start_processing_wrapper, inputs=[input_video, video_model_dropdown, video_confidence_slider], outputs=[output_video, video_metrics_output, video_start_btn, video_stop_btn] ) video_stop_btn.click( fn=stop_video_inference, inputs=None, outputs=[video_metrics_output, video_start_btn, video_stop_btn] ) # Video Examples section gr.Markdown("---") gr.Markdown("## 🎬 Example Videos") gr.Examples( examples=[ ["examples/doggo.mp4", "YOLO-11-S" if "YOLO-11-S" in available_models else available_models[0], 0.4], ["examples/basketball.mp4", "YOLO-11-N" if "YOLO-11-N" in available_models else available_models[0], 0.3], ], inputs=[input_video, video_model_dropdown, video_confidence_slider], outputs=[output_video, video_metrics_output], fn=run_video_inference, cache_examples=True ) with gr.TabItem("πŸ“Ή Live Inference"): gr.Markdown("### Real-time inference using your webcam") gr.Markdown("⚠️ **Note:** Allow browser access to your webcam when prompted.") with gr.Row(): with gr.Column(scale=1): webcam_input = gr.Image( sources=["webcam"], label="Webcam", type="numpy", streaming=True, show_label=False ) webcam_model_dropdown = gr.Dropdown( choices=available_models, value=available_models[0] if available_models else None, label="Select Model", info="Choose a model from the models/ folder" ) webcam_confidence_slider = gr.Slider( minimum=0.0, maximum=1.0, value=0.3, step=0.05, label="Confidence Threshold", info="Minimum confidence for displaying predictions" ) with gr.Column(scale=1): webcam_output = gr.Image( label="Detection Result", type="pil", show_label=False ) webcam_metrics_output = gr.Textbox( label="Performance Metrics", lines=8, max_lines=15 ) # Set up streaming from webcam webcam_input.stream( fn=run_webcam_inference, inputs=[webcam_input, webcam_model_dropdown, webcam_confidence_slider], outputs=[webcam_output, webcam_metrics_output], time_limit=60, stream_every=0.1, concurrency_limit=16 ) return demo if __name__ == "__main__": demo = create_gradio_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True, # Disable experimental Server-Side Rendering to avoid asyncio fd cleanup errors # observed on some hosting environments (e.g., HF Spaces with Python 3.10). # Falling back to the classic Gradio frontend keeps the event loop lifecycle # straightforward and prevents "Invalid file descriptor: -1" warnings at shutdown. ssr_mode=False, )