Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| from ultralytics import YOLO | |
| import tempfile | |
| import os | |
| from PIL import Image | |
| import time | |
| import torch | |
| import psutil | |
| import spaces # Required for Zero GPU | |
| from huggingface_hub import hf_hub_download | |
| from transformers import ( | |
| AutoImageProcessor, | |
| AutoModelForObjectDetection | |
| ) | |
| import supervision as sv | |
# --- Model weights ----------------------------------------------------------
# YOLO checkpoints are downloaded from a private Hugging Face repo; access
# requires the HF_TOKEN secret (configured in the Space's settings).
# Models 2 and 3 (best_ppe.pt / best_ppe2.pt) were removed from the UI and
# are no longer downloaded.
model_path1 = hf_hub_download(
    repo_id="limitedonly41/safety_best",
    filename="safety_best.pt",
    token=os.environ.get("HF_TOKEN")  # Set in HF Secrets
)
model1 = YOLO(model_path1)

model_path_4 = hf_hub_download(
    repo_id="limitedonly41/safety_best",
    filename="best_6_cls.pt",
    token=os.environ.get("HF_TOKEN")  # Set in HF Secrets
)
model4 = YOLO(model_path_4)

model_path_5 = hf_hub_download(
    repo_id="limitedonly41/safety_best",
    filename="best_ppe_big.pt",
    token=os.environ.get("HF_TOKEN")  # Set in HF Secrets
)
model5 = YOLO(model_path_5)

# --- Model 6: Hugging Face Transformers object-detection model --------------
CHECKPOINT = "limitedonly41/ppe_rt_det"

# Model 6 is loaded lazily by load_model6() on first use; until then both
# globals stay None.
model6 = None
processor6 = None
def load_model6():
    """Lazily load the transformers detection model and its image processor.

    Populates the module-level ``model6`` and ``processor6`` globals on first
    call; subsequent calls are no-ops once both are set.  On failure the
    error is logged and both globals are reset to ``None`` so a later call
    can retry cleanly.
    """
    global model6, processor6
    if model6 is None or processor6 is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        try:
            print("Loading Model 6...")
            token = os.environ.get("HF_TOKEN")  # Set in HF Secrets
            model6 = AutoModelForObjectDetection.from_pretrained(
                CHECKPOINT, token=token
            ).to(device)
            # BUG FIX: the processor lives in the same (token-gated) repo as
            # the model weights, so it needs the auth token as well.
            processor6 = AutoImageProcessor.from_pretrained(CHECKPOINT, token=token)
            print("Model 6 loaded successfully.")
        except Exception as e:
            print("Failed to load Model 6:", str(e))
            # Don't leave a half-initialized pair behind (e.g. model loaded
            # but processor failed) — callers check these for None.
            model6 = None
            processor6 = None
# Move the eagerly-loaded YOLO models to the GPU when one is available.
# (Model 6 handles its own device placement inside load_model6.)
if torch.cuda.is_available():
    model1.to('cuda')
    model4.to('cuda')
    model5.to('cuda')
def get_gpu_info():
    """Return a short label describing the compute device in use."""
    if not torch.cuda.is_available():
        return "π» Using CPU"
    device_name = torch.cuda.get_device_name(0)
    total_gb = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3
    return f"π GPU: {device_name} ({total_gb:.1f}GB)"
def select_model(model_name):
    """Map a dropdown label to its YOLO model; unknown labels fall back to model1."""
    known_models = {
        "YOLOv11_my_v1": model1,
        "Model 4": model4,
        "YOLOv11_my_v5": model5,
    }
    return known_models.get(model_name, model1)
def predict_image(image, model_choice):
    """Run the selected detector on a single image.

    Returns ``(annotated RGB image, summary text)`` on success, or
    ``(None, message)`` when no image was supplied or inference failed.
    """
    if image is None:
        return None, "Please upload an image"
    try:
        # The transformers-based detector has its own pipeline.
        if model_choice == "ppe_rt_det":
            return predict_with_model6(image)

        yolo_model = select_model(model_choice)
        result = yolo_model(image)[0]
        # Ultralytics plots in BGR; convert to RGB for display in Gradio.
        annotated = cv2.cvtColor(result.plot(), cv2.COLOR_BGR2RGB)

        gpu_info = get_gpu_info()
        boxes = result.boxes
        if boxes is None or len(boxes) == 0:
            return annotated, f"π― Detection Results - {gpu_info}\n\nNo objects detected"

        confidences = boxes.conf.cpu().numpy()
        labels = [yolo_model.names[int(c)] for c in boxes.cls.cpu().numpy()]
        summary_parts = [
            f"π― Detection Results - {gpu_info}\n\n",
            f"Found {len(boxes)} objects:\n",
        ]
        summary_parts.extend(
            f"β’ {label}: {conf:.2f}\n" for label, conf in zip(labels, confidences)
        )
        return annotated, "".join(summary_parts)
    except Exception as e:
        return None, f"Error: {str(e)}"
def predict_with_model6(image):
    """Run the transformers PPE detector ("ppe_rt_det") on a PIL image.

    Parameters
    ----------
    image : PIL.Image.Image
        Input image (``.size`` is read, so a PIL image is expected).

    Returns
    -------
    tuple
        ``(annotated PIL image, summary text)`` on success — the annotated
        image is thumbnailed to fit 600x600 — or ``(None, message)`` on
        missing input or any failure.
    """
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if image is None:
        return None, "No image provided."
    try:
        # Lazily load the model on first use (not loaded at startup).
        if model6 is None:
            load_model6()
            if model6 is None:
                return None, "Failed to load Model 6."
        w, h = image.size
        inputs = processor6(image, return_tensors="pt").to(DEVICE)
        with torch.no_grad():
            outputs = model6(**inputs)
        # target_sizes expects (height, width); threshold=0.1 keeps
        # low-confidence detections.
        results = processor6.post_process_object_detection(
            outputs, target_sizes=[(h, w)], threshold=0.1
        )
        detections = sv.Detections.from_transformers(results[0])
        labels = [
            model6.config.id2label[class_id] for class_id in detections.class_id
        ]
        # Annotate a copy of the input with boxes and class labels
        # (supervision annotators draw in place on the numpy array).
        annotated_image = np.array(image).copy()
        annotated_image = sv.BoxAnnotator().annotate(annotated_image, detections)
        annotated_image = sv.LabelAnnotator().annotate(
            annotated_image, detections, labels=labels
        )
        annotated_img_pil = Image.fromarray(annotated_image)
        annotated_img_pil.thumbnail((600, 600))  # Resize for display
        # Build the textual summary. Confidence is intentionally omitted
        # from the per-detection lines (conf is unpacked but unused).
        num_detections = len(detections)
        gpu_info = get_gpu_info()
        detection_info = f"π― Detection Results - {gpu_info}\n\n"
        detection_info += f"Found {num_detections} objects:\n"
        for class_id, conf, label in zip(detections.class_id, detections.confidence, labels):
            detection_info += f"β’ {label} (ID {class_id})\n"
        return annotated_img_pil, detection_info
    except Exception as e:
        import traceback
        return None, f"Error in Model 6: {str(e)}\n{traceback.format_exc()}"
def predict_video(video_path, model_choice, progress=gr.Progress()):
    """Run the selected detector on every frame of a video.

    Writes an annotated .mp4 to a temporary file and returns
    ``(output_path, summary_text)``, or ``(None, message)`` on error.
    """
    if video_path is None:
        return None, "Please upload a video"
    try:
        cap = cv2.VideoCapture(video_path)
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if frame_count == 0 or fps == 0:
            cap.release()
            return None, "Error: Could not read video properties"
        temp_output = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        output_path = temp_output.name
        temp_output.close()
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        detection_summary = {"total_frames": 0, "frames_with_detections": 0, "total_detections": 0}
        frame_num = 0
        start_time = time.time()
        progress(0, desc="Processing video...")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # BUG FIX: the dropdown value for the transformers model is
                # "ppe_rt_det" (as predict_image checks); the old comparison
                # against "Model 6" never matched, so that branch was dead.
                if model_choice == "ppe_rt_det":
                    frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    result_img_pil, info = predict_with_model6(frame_pil)
                    if result_img_pil is None:
                        # Inference failed on this frame: keep the raw frame
                        # instead of crashing on np.array(None).
                        annotated_frame = frame
                        num_detections = 0
                    else:
                        annotated_frame = cv2.cvtColor(np.array(result_img_pil), cv2.COLOR_RGB2BGR)
                        # predict_with_model6 thumbnails its output to <=600px;
                        # VideoWriter silently drops frames whose size differs
                        # from (width, height), so resize back if needed.
                        if annotated_frame.shape[1] != width or annotated_frame.shape[0] != height:
                            annotated_frame = cv2.resize(annotated_frame, (width, height))
                        num_detections = info.count("β’")  # one bullet per detection
                else:
                    current_model = select_model(model_choice)
                    results = current_model(frame)
                    annotated_frame = results[0].plot()
                    boxes = results[0].boxes
                    num_detections = len(boxes) if boxes is not None else 0
                out.write(annotated_frame)
                detection_summary["total_frames"] += 1
                if num_detections > 0:
                    detection_summary["frames_with_detections"] += 1
                    detection_summary["total_detections"] += num_detections
                frame_num += 1
                if frame_count > 0:
                    progress(frame_num / frame_count, desc=f"Frame {frame_num}/{frame_count}")
        finally:
            # Release capture and writer even if a frame raises mid-loop.
            cap.release()
            out.release()
        # Guard against division by zero (e.g. reported frame_count > 0 but
        # zero frames actually decoded, or a sub-resolution clock).
        processing_time = max(time.time() - start_time, 1e-6)
        total = max(detection_summary["total_frames"], 1)
        gpu_info = get_gpu_info()
        summary_text = f"""π¬ Video Processing Complete! - {gpu_info}
π Summary:
β’ Total frames: {detection_summary['total_frames']}
β’ Frames with detections: {detection_summary['frames_with_detections']}
β’ Total detections: {detection_summary['total_detections']}
β’ Detection rate: {detection_summary['frames_with_detections']/total*100:.1f}%
β’ Processing time: {processing_time:.1f} seconds
β’ FPS: {detection_summary['total_frames']/processing_time:.1f}
"""
        return output_path, summary_text
    except Exception as e:
        return None, f"Error processing video: {str(e)}"
# --- Gradio interface -------------------------------------------------------
with gr.Blocks(
    title="YOLO Object Detection - GPU Accelerated",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px !important;
    }
    .gpu-info {
        background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 10px;
        border-radius: 8px;
        text-align: center;
        margin: 10px 0;
        font-weight: bold;
    }
    """
) as demo:
    # GPU status indicator shown in the page header.
    gpu_status = get_gpu_info()
    gr.HTML(f"""
    <div style="text-align: center;">
        <h1>π― YOLO Object Detection</h1>
        <p>Upload images or videos to detect objects using a trained YOLO model</p>
        <div class="gpu-info">{gpu_status}</div>
    </div>
    """)
    with gr.Tabs():
        # Model selector shared by the image and video tabs.
        model_selector = gr.Dropdown(
            choices=["YOLOv11_my_v1", "YOLOv11_my_v5", "ppe_rt_det"],
            value="YOLOv11_my_v1",
            label="Choose model"
        )
        # Image tab
        with gr.Tab("π· Image "):
            gr.Markdown("### Upload an image to detect objects")
            with gr.Row():
                with gr.Column(scale=1):
                    image_input = gr.Image(
                        label="Upload Image",
                        type="pil",
                        height=400
                    )
                    image_button = gr.Button(
                        "π Detect Objects (GPU)",
                        variant="primary",
                        size="lg",
                        scale=1,
                    )
                with gr.Column(scale=1):
                    image_output = gr.Image(
                        label="Detection Results",
                        height=400
                    )
                    image_info = gr.Textbox(
                        label="Detection Info",
                        lines=8,
                        max_lines=10,
                        show_copy_button=True
                    )
        # Video tab
        with gr.Tab("π¬ Video "):
            gr.Markdown("### Upload a video to detect objects frame by frame")
            with gr.Row():
                with gr.Column(scale=1):
                    video_input = gr.Video(
                        label="Upload Video",
                        height=400
                    )
                    video_button = gr.Button(
                        "π― Process Video (GPU)",
                        variant="primary",
                        size="lg",
                        scale=1,
                    )
                with gr.Column(scale=1):
                    video_output = gr.Video(
                        label="Processed Video",
                        height=400
                    )
                    video_info = gr.Textbox(
                        label="Processing Summary",
                        lines=8,
                        max_lines=10,
                        show_copy_button=True
                    )
    # Usage instructions below the tabs.
    with gr.Accordion("π Usage Instructions", open=False):
        gr.Markdown("""
        ### Image Detection:
        - **Supported formats:** JPG, PNG, WEBP, BMP
        - **Output:** Annotated image with bounding boxes and confidence scores
        - **Info panel:** Lists all detected objects with confidence levels
        - **Processing:** GPU-accelerated inference for fast results
        ### Video Detection:
        - **Supported formats:** MP4, AVI, MOV, MKV
        - **Processing:** Frame-by-frame detection with GPU acceleration
        - **Output:** Annotated video with detection statistics
        - **Summary:** Comprehensive processing report with performance metrics
        ### GPU Features:
        - Automatic GPU detection and utilization
        - Real-time processing status with GPU indicator
        - Performance metrics showing processing speed
        - Optimized memory usage for large files
        ### Tips:
        - GPU acceleration significantly reduces processing time
        - For best results, use clear, well-lit images/videos
        - The model confidence threshold is optimized for balanced precision/recall
        """)
    # Button events: both handlers receive the shared model selection.
    image_button.click(
        fn=predict_image,
        inputs=[image_input, model_selector],
        outputs=[image_output, image_info],
        show_progress=True
    )
    video_button.click(
        fn=predict_video,
        inputs=[video_input, model_selector],
        outputs=[video_output, video_info],
        show_progress=True
    )
    # Footer (BUG FIX: "Builtby" -> "Built by" in the user-facing text).
    gr.HTML("""
    <div style="text-align: center; margin-top: 20px; padding: 10px; border-top: 1px solid #ddd;">
        <p>Built by using <a href="https://gradio.app/" target="_blank">Gradio</a> | GPU Accelerated</p>
    </div>
    """)
# Launch the interface only when executed as a script.
if __name__ == "__main__":
    demo.launch()