import gradio as gr
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
import os
import time
from collections import deque
from typing import Optional, Tuple, Generator
import threading
from queue import Queue
# ═════════════════════════════════════════════════════════════════════════════
# Configuration
# ═════════════════════════════════════════════════════════════════════════════
class Config:
"""Configuration for the detection system"""
MODEL_PATH = "best (3).pt"
# Performance settings
DEFAULT_CAMERA_WIDTH = 640
DEFAULT_CAMERA_HEIGHT = 480
DEFAULT_FPS = 30
# Detection settings
DEFAULT_CONF_THRESHOLD = 0.25
DEFAULT_IOU_THRESHOLD = 0.45
DEFAULT_IMGSZ = 640
# CLAHE settings
CLAHE_CLIP_LIMIT = 2.0
CLAHE_TILE_GRID = (8, 8)
# Live detection settings
FRAME_BUFFER_SIZE = 2 # Number of frames to buffer
MAX_STREAM_TIME = 3600 # 1 hour
# ═════════════════════════════════════════════════════════════════════════════
# Load YOLO Model
# ═════════════════════════════════════════════════════════════════════════════
try:
model = YOLO(Config.MODEL_PATH)
print(f"✅ Model loaded: {Config.MODEL_PATH}")
print(f"📊 Classes: {model.names}")
print(f"🔢 Number of classes: {len(model.names)}")
except Exception as e:
print(f"❌ Error loading model: {e}")
model = None
# ═════════════════════════════════════════════════════════════════════════════
# CLAHE Preprocessing
# ═════════════════════════════════════════════════════════════════════════════
class CLAHEProcessor:
"""Optimized CLAHE processor with caching"""
def __init__(self, clip_limit: float = Config.CLAHE_CLIP_LIMIT,
tile_grid: tuple = Config.CLAHE_TILE_GRID):
self.clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid)
def process(self, image: np.ndarray) -> np.ndarray:
"""Apply CLAHE preprocessing for shadow recovery."""
if image.dtype != np.uint8:
image = np.clip(image, 0, 255).astype(np.uint8)
# LAB color space - only enhance L channel
bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
# CLAHE on L channel
l_enhanced = self.clahe.apply(l)
# Merge back
lab_enhanced = cv2.merge([l_enhanced, a, b])
bgr_enhanced = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR)
rgb_enhanced = cv2.cvtColor(bgr_enhanced, cv2.COLOR_BGR2RGB)
return rgb_enhanced
# Global CLAHE processor
clahe_processor = CLAHEProcessor()
# ═════════════════════════════════════════════════════════════════════════════
# FPS Counter
# ═════════════════════════════════════════════════════════════════════════════
class FPSCounter:
"""Track FPS with moving average"""
def __init__(self, buffer_size=30):
self.frame_times = deque(maxlen=buffer_size)
self.last_time = time.time()
self.lock = threading.Lock()
def update(self):
with self.lock:
current_time = time.time()
self.frame_times.append(current_time - self.last_time)
self.last_time = current_time
def get_fps(self) -> float:
with self.lock:
if len(self.frame_times) == 0:
return 0.0
return 1.0 / (sum(self.frame_times) / len(self.frame_times))
# ═════════════════════════════════════════════════════════════════════════════
# Detection Statistics
# ═════════════════════════════════════════════════════════════════════════════
class DetectionStats:
"""Track detection statistics over time"""
def __init__(self, window_size=100):
self.total_frames = 0
self.total_detections = 0
self.class_history = deque(maxlen=window_size)
self.confidence_history = deque(maxlen=window_size)
self.lock = threading.Lock()
def update(self, class_counts: dict, confidences: list):
with self.lock:
self.total_frames += 1
frame_detections = sum(class_counts.values())
self.total_detections += frame_detections
self.class_history.append(class_counts)
if confidences:
avg_conf = sum(confidences) / len(confidences)
self.confidence_history.append(avg_conf)
def get_average_detections_per_frame(self) -> float:
with self.lock:
if self.total_frames == 0:
return 0.0
return self.total_detections / self.total_frames
def get_most_common_class(self) -> Optional[str]:
with self.lock:
all_classes = {}
for frame_counts in self.class_history:
for class_name, count in frame_counts.items():
all_classes[class_name] = all_classes.get(class_name, 0) + count
if not all_classes:
return None
return max(all_classes, key=all_classes.get)
# ═════════════════════════════════════════════════════════════════════════════
# Single Image Detection
# ═════════════════════════════════════════════════════════════════════════════
def detect_engine_parts(image, conf_threshold=0.25, apply_clahe_preprocessing=True):
"""
Detect engine parts with YOLO (single image).
"""
if model is None:
return image, "❌ Model not loaded. Please check the model file."
if isinstance(image, Image.Image):
image = np.array(image)
if image is None:
return None, "❌ No image provided"
# Apply CLAHE preprocessing
if apply_clahe_preprocessing:
image = clahe_processor.process(image)
# YOLO inference
results = model.predict(
source=image,
conf=conf_threshold,
iou=Config.DEFAULT_IOU_THRESHOLD,
imgsz=Config.DEFAULT_IMGSZ,
verbose=False,
)
# Get annotated image
annotated = results[0].plot()
annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
# Extract detection info
boxes = results[0].boxes
if len(boxes) == 0:
summary = f"**No detections** (threshold: {conf_threshold:.0%})"
else:
summary = f"**Detected {len(boxes)} object(s):**\n\n"
# Group by class
class_counts = {}
for box in boxes:
class_id = int(box.cls[0])
class_name = model.names[class_id]
class_counts[class_name] = class_counts.get(class_name, 0) + 1
# Summary by class
for class_name, count in sorted(class_counts.items()):
summary += f"• **{class_name}**: {count}\n"
summary += f"\n**Details:**\n"
# Individual detections
for i, box in enumerate(boxes, 1):
conf = float(box.conf[0])
class_id = int(box.cls[0])
class_name = model.names[class_id]
summary += f"{i}. **{class_name}** — {conf:.2%} confidence\n"
return annotated_rgb, summary
# ═════════════════════════════════════════════════════════════════════════════
# Visualization
# ═════════════════════════════════════════════════════════════════════════════
def draw_info_overlay(frame: np.ndarray, fps: float, detection_counts: dict,
total_detections: int, conf_threshold: float,
avg_confidence: Optional[float] = None) -> np.ndarray:
"""Draw comprehensive info overlay on frame"""
overlay = frame.copy()
height, width = frame.shape[:2]
# Calculate overlay height based on number of classes
num_classes = len(detection_counts)
overlay_height = max(180, 120 + num_classes * 25)
# Semi-transparent background
cv2.rectangle(overlay, (10, 10), (380, overlay_height), (0, 0, 0), -1)
cv2.addWeighted(overlay, 0.7, frame, 0.3, 0, frame)
# Header
cv2.putText(frame, "LIVE DETECTION", (20, 35),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# FPS
fps_color = (0, 255, 0) if fps > 20 else (255, 165, 0) if fps > 10 else (0, 0, 255)
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 65),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, fps_color, 2)
# Total detections
cv2.putText(frame, f"Objects: {total_detections}", (150, 65),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# Confidence threshold
cv2.putText(frame, f"Threshold: {conf_threshold:.0%}", (20, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)
# Average confidence
if avg_confidence is not None:
cv2.putText(frame, f"Avg Conf: {avg_confidence:.0%}", (200, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)
# Separator line
cv2.line(frame, (20, 100), (360, 100), (100, 100, 100), 1)
# Class counts
if detection_counts:
cv2.putText(frame, "Detected Parts:", (20, 125),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)
y_offset = 150
for class_name, count in sorted(detection_counts.items()):
# Color code by count
color = (0, 255, 0) if count >= 3 else (255, 255, 0) if count >= 2 else (255, 165, 0)
text = f" {class_name}: {count}"
cv2.putText(frame, text, (20, y_offset),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
y_offset += 25
else:
cv2.putText(frame, "No detections", (20, 125),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (150, 150, 150), 1)
return frame
# ═════════════════════════════════════════════════════════════════════════════
# Live Detection Engine
# ═════════════════════════════════════════════════════════════════════════════
class LiveDetectionEngine:
"""Optimized live detection with threading"""
def __init__(self):
self.running = False
self.frame_queue = Queue(maxsize=2)
self.result_queue = Queue(maxsize=2)
self.detection_thread = None
self.stats = DetectionStats()
def process_frame(self, frame: np.ndarray, conf_threshold: float,
apply_clahe: bool) -> Tuple[np.ndarray, dict]:
"""Process a single frame"""
if model is None:
return frame, {}
# Apply CLAHE
if apply_clahe:
processed_frame = clahe_processor.process(frame)
else:
processed_frame = frame
# YOLO inference
results = model.predict(
source=processed_frame,
conf=conf_threshold,
iou=Config.DEFAULT_IOU_THRESHOLD,
imgsz=Config.DEFAULT_IMGSZ,
verbose=False,
)
# Get annotated image
annotated = results[0].plot()
annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
# Extract detection info
boxes = results[0].boxes
class_counts = {}
confidences = []
for box in boxes:
class_id = int(box.cls[0])
class_name = model.names[class_id]
class_counts[class_name] = class_counts.get(class_name, 0) + 1
confidences.append(float(box.conf[0]))
# Update stats
self.stats.update(class_counts, confidences)
detection_info = {
'class_counts': class_counts,
'total': len(boxes),
'confidences': confidences,
'avg_confidence': sum(confidences) / len(confidences) if confidences else None
}
return annotated_rgb, detection_info
# Global detection engine
detection_engine = LiveDetectionEngine()
# ═════════════════════════════════════════════════════════════════════════════
# Live Camera Detection
# ═════════════════════════════════════════════════════════════════════════════
def live_detection_stream(conf_threshold: float = 0.25,
apply_clahe: bool = True,
process_every_n_frames: int = 1,
show_advanced_stats: bool = False) -> Generator:
"""
Generator for live camera detection.
"""
fps_counter = FPSCounter()
frame_count = 0
last_detection_info = {}
last_annotated = None
# Open webcam
cap = cv2.VideoCapture(0)
# Set camera properties
cap.set(cv2.CAP_PROP_FRAME_WIDTH, Config.DEFAULT_CAMERA_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, Config.DEFAULT_CAMERA_HEIGHT)
cap.set(cv2.CAP_PROP_FPS, Config.DEFAULT_FPS)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Minimize buffer for real-time
if not cap.isOpened():
error_frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(error_frame, "❌ Camera not available", (120, 240),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
cv2.putText(error_frame, "Please check camera permissions", (100, 280),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
yield error_frame
return
print("🎥 Live detection started")
try:
while True:
ret, frame = cap.read()
if not ret:
print("⚠️ Failed to read frame")
break
# Convert BGR to RGB
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_count += 1
# Process every Nth frame
if frame_count % process_every_n_frames == 0:
annotated_frame, detection_info = detection_engine.process_frame(
frame_rgb, conf_threshold, apply_clahe
)
last_annotated = annotated_frame
last_detection_info = detection_info
else:
# Reuse last detection
annotated_frame = last_annotated if last_annotated is not None else frame_rgb
detection_info = last_detection_info
# Update FPS
fps_counter.update()
fps = fps_counter.get_fps()
# Draw overlay
final_frame = draw_info_overlay(
annotated_frame,
fps,
detection_info.get('class_counts', {}),
detection_info.get('total', 0),
conf_threshold,
detection_info.get('avg_confidence')
)
# Add advanced stats if enabled
if show_advanced_stats:
avg_det = detection_engine.stats.get_average_detections_per_frame()
most_common = detection_engine.stats.get_most_common_class()
y_pos = final_frame.shape[0] - 60
cv2.putText(final_frame, f"Avg Det/Frame: {avg_det:.1f}", (10, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (150, 150, 150), 1)
if most_common:
cv2.putText(final_frame, f"Most Common: {most_common}", (10, y_pos + 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (150, 150, 150), 1)
yield final_frame
finally:
cap.release()
print("🛑 Live detection stopped")
# ═════════════════════════════════════════════════════════════════════════════
# Gradio Interface
# ═════════════════════════════════════════════════════════════════════════════
CSS = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;600;700&display=swap');
body {
font-family: 'Inter', sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
}
.gr-button-primary {
background: linear-gradient(90deg, #4CAF50 0%, #45a049 100%) !important;
border: none !important;
font-weight: 600 !important;
}
.gr-button-secondary {
background: linear-gradient(90deg, #2196F3 0%, #1976D2 100%) !important;
border: none !important;
}
.gradio-container {
max-width: 1400px !important;
}
"""
with gr.Blocks(css=CSS, title="Engine Part Detector - Live", theme=gr.themes.Soft()) as demo:
gr.HTML("""
⚙️ Engine Part Detection System
YOLOv8 with Live Camera Feed & CLAHE Shadow Recovery
""")
with gr.Tabs():
# ═════════════════════════════════════════════════════════════════
# TAB 1: Live Camera Detection
# ═════════════════════════════════════════════════════════════════
with gr.Tab("🎥 Live Camera Detection", id="live"):
gr.Markdown("""
### 🔴 Real-time engine part detection from your camera
**Instructions:** Adjust settings below and the camera will start automatically. The feed shows FPS, detection counts, and confidence scores in real-time.
""")
with gr.Row():
with gr.Column(scale=2):
# Camera feed
live_video = gr.Image(
label="Live Camera Feed with Detections",
streaming=True,
show_label=True,
height=540
)
with gr.Column(scale=1):
gr.Markdown("### ⚙️ Detection Settings")
live_conf_slider = gr.Slider(
minimum=0.1,
maximum=0.9,
value=0.25,
step=0.05,
label="🎯 Confidence Threshold",
info="Lower = more sensitive (may detect false positives)"
)
live_clahe_checkbox = gr.Checkbox(
value=True,
label="✨ CLAHE Shadow Recovery",
info="Enhances details in shadowed areas (recommended)"
)
frame_skip_slider = gr.Slider(
minimum=1,
maximum=5,
value=1,
step=1,
label="⚡ Frame Skip (Performance)",
info="Process every Nth frame (1=all, 3=every 3rd)"
)
advanced_stats_checkbox = gr.Checkbox(
value=False,
label="📊 Show Advanced Statistics",
info="Display average detections and most common class"
)
gr.Markdown("---")
gr.Markdown("### 📈 Performance Guide")
gr.Markdown("""
**Target FPS:** 20-30 fps (green)
- **<10 fps (red):** Increase frame skip to 2-3
- **10-20 fps (orange):** Optimal for most use cases
- **>20 fps (green):** Excellent performance
**Tips:**
- Frame skip 2-3 = ~2x faster
- Disable CLAHE = ~10% faster
- Lower camera resolution = faster
""")
gr.Markdown("### 🎨 Info Overlay Legend")
gr.Markdown("""
- **FPS:** Frames processed per second
- **Objects:** Total parts detected in frame
- **Threshold:** Current confidence cutoff
- **Avg Conf:** Average confidence of detections
- **Part Counts:** Color-coded by quantity
- 🟢 Green: 3+ detected
- 🟡 Yellow: 2 detected
- 🟠 Orange: 1 detected
""")
# Start live detection
live_video.stream(
fn=live_detection_stream,
inputs=[live_conf_slider, live_clahe_checkbox, frame_skip_slider, advanced_stats_checkbox],
outputs=live_video,
stream_every=0.05, # 20 FPS max update rate
time_limit=Config.MAX_STREAM_TIME,
)
# ═════════════════════════════════════════════════════════════════
# TAB 2: Single Image Detection
# ═════════════════════════════════════════════════════════════════
with gr.Tab("📷 Single Image Detection", id="single"):
gr.Markdown("### Upload an image or capture from webcam for batch detection")
with gr.Row():
with gr.Column(scale=1):
input_image = gr.Image(
sources=["upload", "webcam"],
type="numpy",
label="Upload or Capture Image"
)
conf_slider = gr.Slider(
minimum=0.1,
maximum=0.9,
value=0.25,
step=0.05,
label="🎯 Confidence Threshold"
)
clahe_checkbox = gr.Checkbox(
value=True,
label="✨ Apply CLAHE Preprocessing"
)
detect_btn = gr.Button("🔍 Detect Parts", variant="primary", size="lg")
with gr.Column(scale=1):
output_image = gr.Image(label="Detection Results")
output_text = gr.Markdown(label="Detection Summary")
detect_btn.click(
fn=detect_engine_parts,
inputs=[input_image, conf_slider, clahe_checkbox],
outputs=[output_image, output_text],
)
# ═════════════════════════════════════════════════════════════════════
# Help & Info Section
# ═════════════════════════════════════════════════════════════════════
gr.HTML("""
ℹ️ How to Use This System
🎥 Live Camera Mode
- Switch to "Live Camera Detection" tab
- Allow camera access when prompted
- Adjust confidence threshold (0.25 recommended)
- Enable CLAHE for better shadow handling
- Use frame skip for performance optimization
- Watch real-time detections with FPS overlay
📷 Single Image Mode
- Switch to "Single Image Detection" tab
- Upload image or capture from webcam
- Adjust confidence threshold if needed
- Enable CLAHE for shadowed images
- Click "Detect Parts" button
- View detailed results and summary
💡 Pro Tips:
- Lighting: Ensure good lighting for best results. CLAHE helps with shadows but bright, even lighting is ideal.
- Distance: Keep parts 30-100cm from camera for optimal detection.
- Performance: If FPS is low, increase frame skip to 2 or 3.
- Accuracy: Lower confidence threshold (0.15-0.20) for difficult cases, raise it (0.30-0.40) to reduce false positives.
⚠️ Troubleshooting:
- Camera not working: Check browser permissions (chrome://settings/content/camera)
- Low FPS: Increase frame skip, reduce camera resolution, or disable CLAHE
- No detections: Lower confidence threshold or improve lighting
- Too many false positives: Increase confidence threshold
Model: YOLOv8 | Classes: Based on your training data | Performance: 15-30 FPS typical
""")
# ═════════════════════════════════════════════════════════════════════════════
# Launch
# ═════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
print("\n" + "="*60)
print("🚀 Starting Engine Part Detection System")
print("="*60)
print(f"📦 Model: {Config.MODEL_PATH}")
print(f"🎥 Camera Resolution: {Config.DEFAULT_CAMERA_WIDTH}x{Config.DEFAULT_CAMERA_HEIGHT}")
print(f"🎯 Default Confidence: {Config.DEFAULT_CONF_THRESHOLD}")
print(f"⚡ CLAHE Enabled: Yes")
print("="*60 + "\n")
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
)
# import gradio as gr
# import cv2
# import numpy as np
# from PIL import Image
# from ultralytics import YOLO
# import os
# import time
# from collections import deque
# from typing import Optional, Tuple
# import threading
# # ═════════════════════════════════════════════════════════════════════════════
# # Load YOLO Model
# # ═════════════════════════════════════════════════════════════════════════════
# MODEL_PATH = "best (1).pt"
# try:
# model = YOLO(MODEL_PATH)
# print(f"✅ Model loaded: {MODEL_PATH}")
# print(f"📊 Classes: {model.names}")
# except Exception as e:
# print(f"❌ Error loading model: {e}")
# model = None
# # ═════════════════════════════════════════════════════════════════════════════
# # CLAHE Preprocessing (Shadow Recovery)
# # ═════════════════════════════════════════════════════════════════════════════
# def apply_clahe(image: np.ndarray, clip_limit: float = 2.0) -> np.ndarray:
# """Apply CLAHE preprocessing for shadow recovery."""
# if image.dtype != np.uint8:
# image = np.clip(image, 0, 255).astype(np.uint8)
# # LAB color space - only enhance L channel
# bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
# lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB)
# l, a, b = cv2.split(lab)
# # CLAHE on L channel
# clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8))
# l_enhanced = clahe.apply(l)
# # Merge back
# lab_enhanced = cv2.merge([l_enhanced, a, b])
# bgr_enhanced = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR)
# rgb_enhanced = cv2.cvtColor(bgr_enhanced, cv2.COLOR_BGR2RGB)
# return rgb_enhanced
# # ═════════════════════════════════════════════════════════════════════════════
# # FPS Counter
# # ═════════════════════════════════════════════════════════════════════════════
# class FPSCounter:
# """Track FPS with moving average"""
# def __init__(self, buffer_size=30):
# self.frame_times = deque(maxlen=buffer_size)
# self.last_time = time.time()
# def update(self):
# current_time = time.time()
# self.frame_times.append(current_time - self.last_time)
# self.last_time = current_time
# def get_fps(self) -> float:
# if len(self.frame_times) == 0:
# return 0.0
# return 1.0 / (sum(self.frame_times) / len(self.frame_times))
# # ═════════════════════════════════════════════════════════════════════════════
# # Detection Functions
# # ═════════════════════════════════════════════════════════════════════════════
# def detect_engine_parts(image, conf_threshold=0.25, apply_clahe_preprocessing=True):
# """
# Detect engine parts with YOLO (single image).
# Args:
# image: Input image (PIL or numpy)
# conf_threshold: Confidence threshold (0-1)
# apply_clahe_preprocessing: Whether to apply CLAHE before detection
# Returns:
# annotated_image: Image with bounding boxes
# results_text: Detection summary
# """
# if model is None:
# return image, "❌ Model not loaded. Please check the model file."
# # Convert to numpy array
# if isinstance(image, Image.Image):
# image = np.array(image)
# if image is None:
# return None, "❌ No image provided"
# # Apply CLAHE preprocessing
# if apply_clahe_preprocessing:
# image = apply_clahe(image)
# # YOLO inference
# results = model.predict(
# source=image,
# conf=conf_threshold,
# iou=0.45,
# imgsz=640,
# verbose=False,
# )
# # Get annotated image
# annotated = results[0].plot() # BGR format
# annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
# # Extract detection info
# boxes = results[0].boxes
# if len(boxes) == 0:
# summary = f"**No detections** (threshold: {conf_threshold:.0%})"
# else:
# summary = f"**Detected {len(boxes)} object(s):**\n\n"
# # Group by class
# class_counts = {}
# for box in boxes:
# class_id = int(box.cls[0])
# class_name = model.names[class_id]
# class_counts[class_name] = class_counts.get(class_name, 0) + 1
# # Summary by class
# for class_name, count in sorted(class_counts.items()):
# summary += f"• **{class_name}**: {count}\n"
# summary += f"\n**Details:**\n"
# # Individual detections
# for i, box in enumerate(boxes, 1):
# x1, y1, x2, y2 = box.xyxy[0].tolist()
# conf = float(box.conf[0])
# class_id = int(box.cls[0])
# class_name = model.names[class_id]
# summary += f"{i}. **{class_name}** — {conf:.2%} confidence\n"
# return annotated_rgb, summary
# def draw_info_overlay(frame: np.ndarray, fps: float, detection_counts: dict,
# total_detections: int, conf_threshold: float) -> np.ndarray:
# """Draw FPS and detection info overlay on frame"""
# overlay = frame.copy()
# height, width = frame.shape[:2]
# # Semi-transparent background for text
# cv2.rectangle(overlay, (10, 10), (350, 150), (0, 0, 0), -1)
# cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)
# # FPS
# cv2.putText(frame, f"FPS: {fps:.1f}", (20, 35),
# cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
# # Total detections
# cv2.putText(frame, f"Total: {total_detections}", (20, 60),
# cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# # Confidence threshold
# cv2.putText(frame, f"Conf: {conf_threshold:.0%}", (20, 85),
# cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# # Class counts
# y_offset = 110
# for class_name, count in sorted(detection_counts.items()):
# text = f"{class_name}: {count}"
# cv2.putText(frame, text, (20, y_offset),
# cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 200, 0), 2)
# y_offset += 25
# return frame
# def process_video_frame(frame: np.ndarray, conf_threshold: float,
# apply_clahe: bool, fps_counter: FPSCounter) -> Tuple[np.ndarray, dict]:
# """
# Process a single video frame for detection.
# Returns:
# annotated_frame: Frame with detections drawn
# detection_info: Dictionary with detection counts
# """
# if model is None:
# return frame, {}
# # Apply CLAHE preprocessing
# if apply_clahe:
# processed_frame = apply_clahe(frame)
# else:
# processed_frame = frame
# # YOLO inference
# results = model.predict(
# source=processed_frame,
# conf=conf_threshold,
# iou=0.45,
# imgsz=640,
# verbose=False,
# stream=False,
# )
# # Get annotated image
# annotated = results[0].plot() # BGR format
# annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)
# # Count detections by class
# boxes = results[0].boxes
# class_counts = {}
# total_count = 0
# for box in boxes:
# class_id = int(box.cls[0])
# class_name = model.names[class_id]
# class_counts[class_name] = class_counts.get(class_name, 0) + 1
# total_count += 1
# # Update FPS
# fps_counter.update()
# fps = fps_counter.get_fps()
# # Draw overlay
# annotated_rgb = draw_info_overlay(
# annotated_rgb,
# fps,
# class_counts,
# total_count,
# conf_threshold
# )
# detection_info = {
# 'class_counts': class_counts,
# 'total': total_count,
# 'fps': fps
# }
# return annotated_rgb, detection_info
# # ═════════════════════════════════════════════════════════════════════════════
# # Live Camera Detection (Generator Function for Gradio)
# # ═════════════════════════════════════════════════════════════════════════════
# def live_detection_stream(conf_threshold: float = 0.25,
# apply_clahe: bool = True,
# process_every_n_frames: int = 1):
# """
# Generator function for live camera detection with Gradio.
# Args:
# conf_threshold: Detection confidence threshold
# apply_clahe: Whether to apply CLAHE preprocessing
# process_every_n_frames: Process every Nth frame (1 = all frames, 2 = every other frame)
# Yields:
# annotated_frame: Frame with detections and info overlay
# """
# fps_counter = FPSCounter()
# frame_count = 0
# last_detections = {}
# last_annotated = None
# # Open webcam
# cap = cv2.VideoCapture(0)
# # Set camera properties for better performance
# cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
# cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
# cap.set(cv2.CAP_PROP_FPS, 30)
# if not cap.isOpened():
# # Return error frame
# error_frame = np.zeros((480, 640, 3), dtype=np.uint8)
# cv2.putText(error_frame, "Camera not available", (150, 240),
# cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
# yield error_frame
# return
# try:
# while True:
# ret, frame = cap.read()
# if not ret:
# break
# # Convert BGR to RGB
# frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# frame_count += 1
# # Process every Nth frame for performance
# if frame_count % process_every_n_frames == 0:
# annotated_frame, detection_info = process_video_frame(
# frame_rgb,
# conf_threshold,
# apply_clahe,
# fps_counter
# )
# last_annotated = annotated_frame
# last_detections = detection_info
# else:
# # Reuse last detection but update FPS
# if last_annotated is not None:
# fps_counter.update()
# annotated_frame = draw_info_overlay(
# frame_rgb,
# fps_counter.get_fps(),
# last_detections.get('class_counts', {}),
# last_detections.get('total', 0),
# conf_threshold
# )
# else:
# annotated_frame = frame_rgb
# yield annotated_frame
# finally:
# cap.release()
# # ═════════════════════════════════════════════════════════════════════════════
# # Gradio Interface
# # ═════════════════════════════════════════════════════════════════════════════
# CSS = """
# @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;600&display=swap');
# body { font-family: 'Inter', sans-serif; }
# .gr-button-primary { background: linear-gradient(90deg, #4CAF50 0%, #45a049 100%) !important; }
# .gr-button-secondary { background: linear-gradient(90deg, #2196F3 0%, #1976D2 100%) !important; }
# """
# with gr.Blocks(css=CSS, title="Engine Part Detector") as demo:
# gr.HTML("""
#
#
⚙️ Engine Part Detection System
#
# YOLOv8 object detection with CLAHE shadow recovery preprocessing
#
#
# """)
# with gr.Tabs():
# # ═════════════════════════════════════════════════════════════════
# # TAB 1: Single Image Detection
# # ═════════════════════════════════════════════════════════════════
# with gr.Tab("📷 Single Image Detection"):
# gr.Markdown("### Upload an image or capture from webcam for one-time detection")
# with gr.Row():
# with gr.Column(scale=1):
# input_image = gr.Image(
# sources=["upload", "webcam"],
# type="numpy",
# label="Upload Engine Part Image"
# )
# with gr.Row():
# conf_slider = gr.Slider(
# minimum=0.1,
# maximum=0.9,
# value=0.25,
# step=0.05,
# label="Confidence Threshold",
# info="Lower = more detections (may include false positives)"
# )
# clahe_checkbox = gr.Checkbox(
# value=True,
# label="Apply CLAHE Preprocessing",
# info="Recovers details in shadowed areas (recommended)"
# )
# detect_btn = gr.Button("🔍 Detect Parts", variant="primary", size="lg")
# with gr.Column(scale=1):
# output_image = gr.Image(label="Detection Results")
# output_text = gr.Markdown(label="Summary")
# # Wire up the button
# detect_btn.click(
# fn=detect_engine_parts,
# inputs=[input_image, conf_slider, clahe_checkbox],
# outputs=[output_image, output_text],
# )
# # ═════════════════════════════════════════════════════════════════
# # TAB 2: Live Camera Detection
# # ═════════════════════════════════════════════════════════════════
# with gr.Tab("🎥 Live Camera Detection"):
# gr.Markdown("""
# ### Real-time engine part detection from your camera
# **Note:** Click "Start Detection" to begin, and "Stop" to end the stream.
# """)
# with gr.Row():
# with gr.Column(scale=1):
# # Camera feed
# live_video = gr.Image(
# label="Live Camera Feed",
# streaming=True,
# show_label=True,
# height=480
# )
# with gr.Column(scale=1):
# gr.Markdown("### ⚙️ Detection Settings")
# live_conf_slider = gr.Slider(
# minimum=0.1,
# maximum=0.9,
# value=0.25,
# step=0.05,
# label="Confidence Threshold",
# info="Adjust sensitivity"
# )
# live_clahe_checkbox = gr.Checkbox(
# value=True,
# label="Apply CLAHE Preprocessing",
# info="Better results in shadows"
# )
# frame_skip_slider = gr.Slider(
# minimum=1,
# maximum=5,
# value=1,
# step=1,
# label="Process Every N Frames",
# info="Higher = faster but less smooth (1 = process all frames)"
# )
# gr.Markdown("### 📊 Performance Tips")
# gr.Markdown("""
# - **Process Every N Frames**: Set to 2-3 for better performance
# - **Confidence**: Lower threshold = more detections
# - **CLAHE**: Slight performance cost but better quality
# """)
# gr.Markdown("### 📈 Info Overlay")
# gr.Markdown("""
# The video shows:
# - **FPS**: Frames per second
# - **Total**: Total objects detected
# - **Conf**: Current confidence threshold
# - **Class counts**: Number of each part type
# """)
# # Start live detection
# live_video.stream(
# fn=live_detection_stream,
# inputs=[live_conf_slider, live_clahe_checkbox, frame_skip_slider],
# outputs=live_video,
# stream_every=0.1, # Update interval in seconds
# time_limit=3600, # 1 hour max
# )
# # ═════════════════════════════════════════════════════════════════════
# # Info Section (Shared)
# # ═════════════════════════════════════════════════════════════════════
# gr.HTML("""
#
#
ℹ️ How to Use
#
📷 Single Image Mode:
#
# - Upload an image of an engine part or use your webcam to capture
# - Adjust confidence threshold (default 0.25 works well)
# - Enable CLAHE preprocessing for better results on shadowed images
# - Click "Detect Parts" to run detection
#
#
🎥 Live Camera Mode:
#
# - Switch to the "Live Camera Detection" tab
# - Adjust settings (confidence, CLAHE, frame skip)
# - The camera will start automatically showing real-time detections
# - View live FPS and detection counts on the video overlay
# - Close the tab or adjust settings to stop
#
#
Supported Classes: Depends on your model training (e.g., bearing_saddle, piston, defect, crack, corrosion)
#
Performance: Live mode processes at 15-30 FPS depending on your hardware. Use "Process Every N Frames" to optimize speed.
#
# """)
# # ═════════════════════════════════════════════════════════════════════════════
# # Launch
# # ═════════════════════════════════════════════════════════════════════════════
# if __name__ == "__main__":
# demo.launch(
# server_name="0.0.0.0",
# server_port=7860,
# share=False,
# )