# Safety_CV / app.py — Hugging Face Space entry point (author: limitedonly41)
import gradio as gr
import cv2
import numpy as np
from ultralytics import YOLO
import tempfile
import os
from PIL import Image
import time
import torch
import psutil
import spaces # Required for Zero GPU
from huggingface_hub import hf_hub_download
from transformers import (
AutoImageProcessor,
AutoModelForObjectDetection
)
import supervision as sv
# --- Download and load YOLO models from the private safety_best repo ---
# The repo is private, so HF_TOKEN must be set in the Space secrets.
def _download_yolo_weights(filename):
    """Download one YOLO weight file from limitedonly41/safety_best and return its local path."""
    return hf_hub_download(
        repo_id="limitedonly41/safety_best",
        filename=filename,
        token=os.environ.get("HF_TOKEN"),  # Set in HF Secrets
    )

model_path1 = _download_yolo_weights("safety_best.pt")
model1 = YOLO(model_path1)

# NOTE: models 2 and 3 (best_ppe.pt / best_ppe2.pt) are intentionally disabled.

model_path_4 = _download_yolo_weights("best_6_cls.pt")
model4 = YOLO(model_path_4)

model_path_5 = _download_yolo_weights("best_ppe_big.pt")
model5 = YOLO(model_path_5)
# --- Model 6: Hugging Face Transformers object-detection checkpoint ---
CHECKPOINT = "limitedonly41/ppe_rt_det"

# Lazily-initialized globals; filled in by load_model6() on first use.
model6 = None
processor6 = None


def load_model6():
    """Load the transformers detection model and processor into module globals.

    Idempotent: returns immediately if both globals are already populated.
    On failure the globals stay None; callers detect this via `model6 is None`.
    """
    global model6, processor6
    if model6 is not None and processor6 is not None:
        return  # already loaded
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    try:
        print("Loading Model 6...")
        model6 = AutoModelForObjectDetection.from_pretrained(
            CHECKPOINT, token=os.environ.get("HF_TOKEN")
        ).to(device)
        processor6 = AutoImageProcessor.from_pretrained(CHECKPOINT)
        print("Model 6 loaded successfully.")
    except Exception as e:
        # Best-effort load: report but do not crash the Space at import time.
        print("Failed to load Model 6:", str(e))
# Move the YOLO models to GPU once at import time, if CUDA is available.
# (Model 6 is moved to its device inside load_model6 instead.)
if torch.cuda.is_available():
    model1.to('cuda')
    # model2.to('cuda')
    # model3.to('cuda')
    model4.to('cuda')
    model5.to('cuda')
def get_gpu_info():
    """Return a short human-readable description of the inference device."""
    if not torch.cuda.is_available():
        return "💻 Using CPU"
    device_name = torch.cuda.get_device_name(0)
    total_gb = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3
    return f"🚀 GPU: {device_name} ({total_gb:.1f}GB)"
def select_model(model_name):
    """Map a dropdown label to the corresponding loaded YOLO model.

    Unknown labels fall back to model1.
    """
    registry = {
        "YOLOv11_my_v1": model1,
        # "Model 2": model2,
        # "Model 3": model3,
        "Model 4": model4,
        "YOLOv11_my_v5": model5,
    }
    return registry.get(model_name, model1)
@spaces.GPU
def predict_image(image, model_choice):
    """Run object detection on a single image.

    Args:
        image: PIL image from the Gradio input (None if nothing uploaded).
        model_choice: dropdown label; "ppe_rt_det" routes to the transformers
            model, anything else to a YOLO model via select_model().

    Returns:
        Tuple of (annotated RGB image or None, human-readable summary string).
        Errors are returned as text rather than raised, so the UI stays alive.
    """
    if image is None:
        return None, "Please upload an image"
    try:
        # The transformers checkpoint has its own inference path.
        if model_choice == "ppe_rt_det":
            return predict_with_model6(image)
        # Otherwise, use YOLO models
        current_model = select_model(model_choice)
        results = current_model(image)
        # Ultralytics .plot() returns BGR; convert for Gradio's RGB display.
        annotated_image = cv2.cvtColor(results[0].plot(), cv2.COLOR_BGR2RGB)
        detections = results[0].boxes
        gpu_info = get_gpu_info()
        if detections is not None and len(detections) > 0:
            confidence_scores = detections.conf.cpu().numpy()
            classes = detections.cls.cpu().numpy()
            class_names = [current_model.names[int(cls)] for cls in classes]
            detection_info = f"🎯 Detection Results - {gpu_info}\n\n"
            detection_info += f"Found {len(detections)} objects:\n"
            for cls_name, conf in zip(class_names, confidence_scores):
                detection_info += f"• {cls_name}: {conf:.2f}\n"
        else:
            detection_info = f"🎯 Detection Results - {gpu_info}\n\nNo objects detected"
        return annotated_image, detection_info
    except Exception as e:
        # Surface the error to the UI instead of crashing the request.
        return None, f"Error: {str(e)}"
@spaces.GPU
def predict_with_model6(image):
    """Run detection with the transformers model (Model 6) on a PIL image.

    Lazily loads the model on first call via load_model6(). Uses a fixed
    confidence threshold of 0.1 and annotates boxes/labels with supervision.

    Returns:
        Tuple of (annotated PIL image resized to fit 600x600, summary string),
        or (None, error message) on failure.
    """
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if image is None:
        return None, "No image provided."
    try:
        # Ensure model is loaded (load_model6 leaves model6 as None on failure)
        if model6 is None:
            load_model6()
        if model6 is None:
            return None, "Failed to load Model 6."
        w, h = image.size
        inputs = processor6(image, return_tensors="pt").to(DEVICE)
        with torch.no_grad():
            outputs = model6(**inputs)
        # post_process_object_detection expects target sizes as (height, width)
        results = processor6.post_process_object_detection(
            outputs, target_sizes=[(h, w)], threshold=0.1
        )
        detections = sv.Detections.from_transformers(results[0])
        # Map class ids to human-readable labels from the model config
        labels = [
            model6.config.id2label[class_id] for class_id in detections.class_id
        ]
        # Annotate a copy of the input with boxes, then labels
        annotated_image = np.array(image).copy()
        annotated_image = sv.BoxAnnotator().annotate(annotated_image, detections)
        annotated_image = sv.LabelAnnotator().annotate(
            annotated_image, detections, labels=labels
        )
        annotated_img_pil = Image.fromarray(annotated_image)
        annotated_img_pil.thumbnail((600, 600))  # Resize in place for display
        # Build the one-bullet-per-detection summary text
        num_detections = len(detections)
        gpu_info = get_gpu_info()
        detection_info = f"🎯 Detection Results - {gpu_info}\n\n"
        detection_info += f"Found {num_detections} objects:\n"
        for class_id, conf, label in zip(detections.class_id, detections.confidence, labels):
            # detection_info += f"• {label} (ID {class_id}): {conf:.2f}\n"
            detection_info += f"• {label} (ID {class_id})\n"
        return annotated_img_pil, detection_info
    except Exception as e:
        import traceback
        # Return the traceback as text so it is visible in the Gradio UI
        return None, f"Error in Model 6: {str(e)}\n{traceback.format_exc()}"
@spaces.GPU
def predict_video(video_path, model_choice, progress=gr.Progress()):
    """Run frame-by-frame detection on a video and write an annotated copy.

    Args:
        video_path: path to the uploaded video (None if nothing uploaded).
        model_choice: dropdown label; "ppe_rt_det" uses the transformers model,
            anything else a YOLO model via select_model().
        progress: Gradio progress tracker for the UI.

    Returns:
        Tuple of (path to the annotated mp4 or None, processing summary string).
    """
    if video_path is None:
        return None, "Please upload a video"
    try:
        cap = cv2.VideoCapture(video_path)
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if frame_count == 0 or fps == 0:
            cap.release()
            return None, "Error: Could not read video properties"
        # Write the annotated video to a temp file Gradio can serve.
        temp_output = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        output_path = temp_output.name
        temp_output.close()
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        detection_summary = {"total_frames": 0, "frames_with_detections": 0, "total_detections": 0}
        frame_num = 0
        start_time = time.time()
        progress(0, desc="Processing video...")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # BUGFIX: the dropdown label is "ppe_rt_det" (not "Model 6"),
                # matching predict_image — otherwise the transformers model was
                # never used for video.
                if model_choice == "ppe_rt_det":
                    frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    result_img_pil, info_text = predict_with_model6(frame_pil)
                    if result_img_pil is None:
                        # Inference failed on this frame: keep the raw frame.
                        annotated_frame = frame
                        num_detections = 0
                    else:
                        annotated_frame = cv2.cvtColor(np.array(result_img_pil), cv2.COLOR_RGB2BGR)
                        # predict_with_model6 thumbnails its output to <=600px;
                        # restore the VideoWriter's expected dimensions.
                        annotated_frame = cv2.resize(annotated_frame, (width, height))
                        num_detections = info_text.count("•")  # one bullet per detection
                else:
                    current_model = select_model(model_choice)
                    results = current_model(frame)
                    annotated_frame = results[0].plot()
                    boxes = results[0].boxes
                    num_detections = len(boxes) if boxes is not None else 0
                out.write(annotated_frame)
                detection_summary["total_frames"] += 1
                if num_detections > 0:
                    detection_summary["frames_with_detections"] += 1
                    detection_summary["total_detections"] += num_detections
                frame_num += 1
                progress(frame_num / frame_count, desc=f"Frame {frame_num}/{frame_count}")
        finally:
            # Always release codec handles, even if a frame fails mid-stream.
            cap.release()
            out.release()
        processing_time = time.time() - start_time
        gpu_info = get_gpu_info()
        total_frames = detection_summary['total_frames']
        # Guard degenerate cases (empty stream / near-zero elapsed time)
        detection_rate = (detection_summary['frames_with_detections'] / total_frames * 100) if total_frames else 0.0
        effective_fps = (total_frames / processing_time) if processing_time > 0 else 0.0
        summary_text = f"""🎬 Video Processing Complete! - {gpu_info}
📊 Summary:
• Total frames: {total_frames}
• Frames with detections: {detection_summary['frames_with_detections']}
• Total detections: {detection_summary['total_detections']}
• Detection rate: {detection_rate:.1f}%
• Processing time: {processing_time:.1f} seconds
• FPS: {effective_fps:.1f}
"""
        return output_path, summary_text
    except Exception as e:
        return None, f"Error processing video: {str(e)}"
# ---------------------------------------------------------------------------
# Gradio interface: one Blocks app with an Image tab and a Video tab sharing
# a single model-selector dropdown.
# ---------------------------------------------------------------------------
with gr.Blocks(
    title="YOLO Object Detection - GPU Accelerated",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px !important;
    }
    .gpu-info {
        background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 10px;
        border-radius: 8px;
        text-align: center;
        margin: 10px 0;
        font-weight: bold;
    }
    """
) as demo:
    # GPU Status indicator (computed once when the app is built)
    gpu_status = get_gpu_info()
    gr.HTML(f"""
    <div style="text-align: center;">
        <h1>🎯 YOLO Object Detection</h1>
        <p>Upload images or videos to detect objects using a trained YOLO model</p>
        <div class="gpu-info">{gpu_status}</div>
    </div>
    """)
    with gr.Tabs():
        # Model selector common for image and video tabs
        model_selector = gr.Dropdown(
            choices=["YOLOv11_my_v1", "YOLOv11_my_v5", "ppe_rt_det"],
            value="YOLOv11_my_v1",
            label="Choose model"
        )
        # Image Tab: single-image detection (predict_image)
        with gr.Tab("📷 Image "):
            gr.Markdown("### Upload an image to detect objects")
            with gr.Row():
                with gr.Column(scale=1):
                    image_input = gr.Image(
                        label="Upload Image",
                        type="pil",
                        height=400
                    )
                    image_button = gr.Button(
                        "🔍 Detect Objects (GPU)",
                        variant="primary",
                        size="lg",
                        scale=1,
                    )
                with gr.Column(scale=1):
                    image_output = gr.Image(
                        label="Detection Results",
                        height=400
                    )
                    image_info = gr.Textbox(
                        label="Detection Info",
                        lines=8,
                        max_lines=10,
                        show_copy_button=True
                    )
        # Video Tab: frame-by-frame detection (predict_video)
        with gr.Tab("🎬 Video "):
            gr.Markdown("### Upload a video to detect objects frame by frame")
            with gr.Row():
                with gr.Column(scale=1):
                    video_input = gr.Video(
                        label="Upload Video",
                        height=400
                    )
                    video_button = gr.Button(
                        "🎯 Process Video (GPU)",
                        variant="primary",
                        size="lg",
                        scale=1,
                    )
                    # gr.HTML("""
                    # <div style="background-color: #f0f0f0; padding: 10px; border-radius: 5px; margin-top: 10px;">
                    # <strong>⚠️ Note:</strong> Video processing uses GPU acceleration for faster inference.
                    # The progress bar shows current processing status.
                    # </div>
                    # """)
                with gr.Column(scale=1):
                    video_output = gr.Video(
                        label="Processed Video",
                        height=400
                    )
                    video_info = gr.Textbox(
                        label="Processing Summary",
                        lines=8,
                        max_lines=10,
                        show_copy_button=True
                    )
    # Usage instructions below tabs (collapsed by default)
    with gr.Accordion("📝 Usage Instructions", open=False):
        gr.Markdown("""
        ### Image Detection:
        - **Supported formats:** JPG, PNG, WEBP, BMP
        - **Output:** Annotated image with bounding boxes and confidence scores
        - **Info panel:** Lists all detected objects with confidence levels
        - **Processing:** GPU-accelerated inference for fast results
        ### Video Detection:
        - **Supported formats:** MP4, AVI, MOV, MKV
        - **Processing:** Frame-by-frame detection with GPU acceleration
        - **Output:** Annotated video with detection statistics
        - **Summary:** Comprehensive processing report with performance metrics
        ### GPU Features:
        - Automatic GPU detection and utilization
        - Real-time processing status with GPU indicator
        - Performance metrics showing processing speed
        - Optimized memory usage for large files
        ### Tips:
        - GPU acceleration significantly reduces processing time
        - For best results, use clear, well-lit images/videos
        - The model confidence threshold is optimized for balanced precision/recall
        """)
    # Button events: both handlers receive the shared model_selector value
    image_button.click(
        fn=predict_image,
        inputs=[image_input, model_selector],
        outputs=[image_output, image_info],
        show_progress=True
    )
    video_button.click(
        fn=predict_video,
        inputs=[video_input, model_selector],
        outputs=[video_output, video_info],
        show_progress=True
    )
# Footer
gr.HTML("""
<div style="text-align: center; margin-top: 20px; padding: 10px; border-top: 1px solid #ddd;">
<p>Builtby using <a href="https://gradio.app/" target="_blank">Gradio</a> | GPU Accelerated</p>
</div>
""")
# Launch the interface when run as a script (HF Spaces executes app.py directly)
if __name__ == "__main__":
    demo.launch()