import gradio as gr import cv2 import numpy as np from PIL import Image from ultralytics import YOLO import os import time from collections import deque from typing import Optional, Tuple, Generator import threading from queue import Queue # ═════════════════════════════════════════════════════════════════════════════ # Configuration # ═════════════════════════════════════════════════════════════════════════════ class Config: """Configuration for the detection system""" MODEL_PATH = "best (3).pt" # Performance settings DEFAULT_CAMERA_WIDTH = 640 DEFAULT_CAMERA_HEIGHT = 480 DEFAULT_FPS = 30 # Detection settings DEFAULT_CONF_THRESHOLD = 0.25 DEFAULT_IOU_THRESHOLD = 0.45 DEFAULT_IMGSZ = 640 # CLAHE settings CLAHE_CLIP_LIMIT = 2.0 CLAHE_TILE_GRID = (8, 8) # Live detection settings FRAME_BUFFER_SIZE = 2 # Number of frames to buffer MAX_STREAM_TIME = 3600 # 1 hour # ═════════════════════════════════════════════════════════════════════════════ # Load YOLO Model # ═════════════════════════════════════════════════════════════════════════════ try: model = YOLO(Config.MODEL_PATH) print(f"✅ Model loaded: {Config.MODEL_PATH}") print(f"📊 Classes: {model.names}") print(f"🔢 Number of classes: {len(model.names)}") except Exception as e: print(f"❌ Error loading model: {e}") model = None # ═════════════════════════════════════════════════════════════════════════════ # CLAHE Preprocessing # ═════════════════════════════════════════════════════════════════════════════ class CLAHEProcessor: """Optimized CLAHE processor with caching""" def __init__(self, clip_limit: float = Config.CLAHE_CLIP_LIMIT, tile_grid: tuple = Config.CLAHE_TILE_GRID): self.clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid) def process(self, image: np.ndarray) -> np.ndarray: """Apply CLAHE preprocessing for shadow recovery.""" if image.dtype != np.uint8: image = np.clip(image, 0, 255).astype(np.uint8) # LAB color space - only enhance L channel bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB) l, a, b = cv2.split(lab) # CLAHE on L channel l_enhanced = self.clahe.apply(l) # Merge back lab_enhanced = cv2.merge([l_enhanced, a, b]) bgr_enhanced = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR) rgb_enhanced = cv2.cvtColor(bgr_enhanced, cv2.COLOR_BGR2RGB) return rgb_enhanced # Global CLAHE processor clahe_processor = CLAHEProcessor() # ═════════════════════════════════════════════════════════════════════════════ # FPS Counter # ═════════════════════════════════════════════════════════════════════════════ class FPSCounter: """Track FPS with moving average""" def __init__(self, buffer_size=30): self.frame_times = deque(maxlen=buffer_size) self.last_time = time.time() self.lock = threading.Lock() def update(self): with self.lock: current_time = time.time() self.frame_times.append(current_time - self.last_time) self.last_time = current_time def get_fps(self) -> float: with self.lock: if len(self.frame_times) == 0: return 0.0 return 1.0 / (sum(self.frame_times) / len(self.frame_times)) # ═════════════════════════════════════════════════════════════════════════════ # Detection Statistics # ═════════════════════════════════════════════════════════════════════════════ class DetectionStats: """Track detection statistics over time""" def __init__(self, window_size=100): self.total_frames = 0 self.total_detections = 0 self.class_history = deque(maxlen=window_size) self.confidence_history = deque(maxlen=window_size) self.lock = threading.Lock() def update(self, class_counts: dict, confidences: list): with self.lock: self.total_frames += 1 frame_detections = sum(class_counts.values()) self.total_detections += frame_detections self.class_history.append(class_counts) if confidences: avg_conf = sum(confidences) / len(confidences) self.confidence_history.append(avg_conf) def get_average_detections_per_frame(self) -> float: with self.lock: if self.total_frames == 0: return 0.0 return self.total_detections / self.total_frames def get_most_common_class(self) -> Optional[str]: with self.lock: all_classes = {} for frame_counts in self.class_history: for class_name, count in frame_counts.items(): all_classes[class_name] = all_classes.get(class_name, 0) + count if not all_classes: return None return max(all_classes, key=all_classes.get) # ═════════════════════════════════════════════════════════════════════════════ # Single Image Detection # ═════════════════════════════════════════════════════════════════════════════ def detect_engine_parts(image, conf_threshold=0.25, apply_clahe_preprocessing=True): """ Detect engine parts with YOLO (single image). """ if model is None: return image, "❌ Model not loaded. Please check the model file." if isinstance(image, Image.Image): image = np.array(image) if image is None: return None, "❌ No image provided" # Apply CLAHE preprocessing if apply_clahe_preprocessing: image = clahe_processor.process(image) # YOLO inference results = model.predict( source=image, conf=conf_threshold, iou=Config.DEFAULT_IOU_THRESHOLD, imgsz=Config.DEFAULT_IMGSZ, verbose=False, ) # Get annotated image annotated = results[0].plot() annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB) # Extract detection info boxes = results[0].boxes if len(boxes) == 0: summary = f"**No detections** (threshold: {conf_threshold:.0%})" else: summary = f"**Detected {len(boxes)} object(s):**\n\n" # Group by class class_counts = {} for box in boxes: class_id = int(box.cls[0]) class_name = model.names[class_id] class_counts[class_name] = class_counts.get(class_name, 0) + 1 # Summary by class for class_name, count in sorted(class_counts.items()): summary += f"• **{class_name}**: {count}\n" summary += f"\n**Details:**\n" # Individual detections for i, box in enumerate(boxes, 1): conf = float(box.conf[0]) class_id = int(box.cls[0]) class_name = model.names[class_id] summary += f"{i}. **{class_name}** — {conf:.2%} confidence\n" return annotated_rgb, summary # ═════════════════════════════════════════════════════════════════════════════ # Visualization # ═════════════════════════════════════════════════════════════════════════════ def draw_info_overlay(frame: np.ndarray, fps: float, detection_counts: dict, total_detections: int, conf_threshold: float, avg_confidence: Optional[float] = None) -> np.ndarray: """Draw comprehensive info overlay on frame""" overlay = frame.copy() height, width = frame.shape[:2] # Calculate overlay height based on number of classes num_classes = len(detection_counts) overlay_height = max(180, 120 + num_classes * 25) # Semi-transparent background cv2.rectangle(overlay, (10, 10), (380, overlay_height), (0, 0, 0), -1) cv2.addWeighted(overlay, 0.7, frame, 0.3, 0, frame) # Header cv2.putText(frame, "LIVE DETECTION", (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) # FPS fps_color = (0, 255, 0) if fps > 20 else (255, 165, 0) if fps > 10 else (0, 0, 255) cv2.putText(frame, f"FPS: {fps:.1f}", (20, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.6, fps_color, 2) # Total detections cv2.putText(frame, f"Objects: {total_detections}", (150, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Confidence threshold cv2.putText(frame, f"Threshold: {conf_threshold:.0%}", (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1) # Average confidence if avg_confidence is not None: cv2.putText(frame, f"Avg Conf: {avg_confidence:.0%}", (200, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1) # Separator line cv2.line(frame, (20, 100), (360, 100), (100, 100, 100), 1) # Class counts if detection_counts: cv2.putText(frame, "Detected Parts:", (20, 125), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1) y_offset = 150 for class_name, count in sorted(detection_counts.items()): # Color code by count color = (0, 255, 0) if count >= 3 else (255, 255, 0) if count >= 2 else (255, 165, 0) text = f" {class_name}: {count}" cv2.putText(frame, text, (20, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) y_offset += 25 else: cv2.putText(frame, "No detections", (20, 125), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (150, 150, 150), 1) return frame # ═════════════════════════════════════════════════════════════════════════════ # Live Detection Engine # ═════════════════════════════════════════════════════════════════════════════ class LiveDetectionEngine: """Optimized live detection with threading""" def __init__(self): self.running = False self.frame_queue = Queue(maxsize=2) self.result_queue = Queue(maxsize=2) self.detection_thread = None self.stats = DetectionStats() def process_frame(self, frame: np.ndarray, conf_threshold: float, apply_clahe: bool) -> Tuple[np.ndarray, dict]: """Process a single frame""" if model is None: return frame, {} # Apply CLAHE if apply_clahe: processed_frame = clahe_processor.process(frame) else: processed_frame = frame # YOLO inference results = model.predict( source=processed_frame, conf=conf_threshold, iou=Config.DEFAULT_IOU_THRESHOLD, imgsz=Config.DEFAULT_IMGSZ, verbose=False, ) # Get annotated image annotated = results[0].plot() annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB) # Extract detection info boxes = results[0].boxes class_counts = {} confidences = [] for box in boxes: class_id = int(box.cls[0]) class_name = model.names[class_id] class_counts[class_name] = class_counts.get(class_name, 0) + 1 confidences.append(float(box.conf[0])) # Update stats self.stats.update(class_counts, confidences) detection_info = { 'class_counts': class_counts, 'total': len(boxes), 'confidences': confidences, 'avg_confidence': sum(confidences) / len(confidences) if confidences else None } return annotated_rgb, detection_info # Global detection engine detection_engine = LiveDetectionEngine() # ═════════════════════════════════════════════════════════════════════════════ # Live Camera Detection # ═════════════════════════════════════════════════════════════════════════════ def live_detection_stream(conf_threshold: float = 0.25, apply_clahe: bool = True, process_every_n_frames: int = 1, show_advanced_stats: bool = False) -> Generator: """ Generator for live camera detection. """ fps_counter = FPSCounter() frame_count = 0 last_detection_info = {} last_annotated = None # Open webcam cap = cv2.VideoCapture(0) # Set camera properties cap.set(cv2.CAP_PROP_FRAME_WIDTH, Config.DEFAULT_CAMERA_WIDTH) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, Config.DEFAULT_CAMERA_HEIGHT) cap.set(cv2.CAP_PROP_FPS, Config.DEFAULT_FPS) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Minimize buffer for real-time if not cap.isOpened(): error_frame = np.zeros((480, 640, 3), dtype=np.uint8) cv2.putText(error_frame, "❌ Camera not available", (120, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) cv2.putText(error_frame, "Please check camera permissions", (100, 280), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1) yield error_frame return print("🎥 Live detection started") try: while True: ret, frame = cap.read() if not ret: print("⚠️ Failed to read frame") break # Convert BGR to RGB frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame_count += 1 # Process every Nth frame if frame_count % process_every_n_frames == 0: annotated_frame, detection_info = detection_engine.process_frame( frame_rgb, conf_threshold, apply_clahe ) last_annotated = annotated_frame last_detection_info = detection_info else: # Reuse last detection annotated_frame = last_annotated if last_annotated is not None else frame_rgb detection_info = last_detection_info # Update FPS fps_counter.update() fps = fps_counter.get_fps() # Draw overlay final_frame = draw_info_overlay( annotated_frame, fps, detection_info.get('class_counts', {}), detection_info.get('total', 0), conf_threshold, detection_info.get('avg_confidence') ) # Add advanced stats if enabled if show_advanced_stats: avg_det = detection_engine.stats.get_average_detections_per_frame() most_common = detection_engine.stats.get_most_common_class() y_pos = final_frame.shape[0] - 60 cv2.putText(final_frame, f"Avg Det/Frame: {avg_det:.1f}", (10, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (150, 150, 150), 1) if most_common: cv2.putText(final_frame, f"Most Common: {most_common}", (10, y_pos + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (150, 150, 150), 1) yield final_frame finally: cap.release() print("🛑 Live detection stopped") # ═════════════════════════════════════════════════════════════════════════════ # Gradio Interface # ═════════════════════════════════════════════════════════════════════════════ CSS = """ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;600;700&display=swap'); body { font-family: 'Inter', sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); } .gr-button-primary { background: linear-gradient(90deg, #4CAF50 0%, #45a049 100%) !important; border: none !important; font-weight: 600 !important; } .gr-button-secondary { background: linear-gradient(90deg, #2196F3 0%, #1976D2 100%) !important; border: none !important; } .gradio-container { max-width: 1400px !important; } """ with gr.Blocks(css=CSS, title="Engine Part Detector - Live", theme=gr.themes.Soft()) as demo: gr.HTML("""

⚙️ Engine Part Detection System

YOLOv8 with Live Camera Feed & CLAHE Shadow Recovery

""") with gr.Tabs(): # ═════════════════════════════════════════════════════════════════ # TAB 1: Live Camera Detection # ═════════════════════════════════════════════════════════════════ with gr.Tab("🎥 Live Camera Detection", id="live"): gr.Markdown(""" ### 🔴 Real-time engine part detection from your camera **Instructions:** Adjust settings below and the camera will start automatically. The feed shows FPS, detection counts, and confidence scores in real-time. """) with gr.Row(): with gr.Column(scale=2): # Camera feed live_video = gr.Image( label="Live Camera Feed with Detections", streaming=True, show_label=True, height=540 ) with gr.Column(scale=1): gr.Markdown("### ⚙️ Detection Settings") live_conf_slider = gr.Slider( minimum=0.1, maximum=0.9, value=0.25, step=0.05, label="🎯 Confidence Threshold", info="Lower = more sensitive (may detect false positives)" ) live_clahe_checkbox = gr.Checkbox( value=True, label="✨ CLAHE Shadow Recovery", info="Enhances details in shadowed areas (recommended)" ) frame_skip_slider = gr.Slider( minimum=1, maximum=5, value=1, step=1, label="⚡ Frame Skip (Performance)", info="Process every Nth frame (1=all, 3=every 3rd)" ) advanced_stats_checkbox = gr.Checkbox( value=False, label="📊 Show Advanced Statistics", info="Display average detections and most common class" ) gr.Markdown("---") gr.Markdown("### 📈 Performance Guide") gr.Markdown(""" **Target FPS:** 20-30 fps (green) - **<10 fps (red):** Increase frame skip to 2-3 - **10-20 fps (orange):** Optimal for most use cases - **>20 fps (green):** Excellent performance **Tips:** - Frame skip 2-3 = ~2x faster - Disable CLAHE = ~10% faster - Lower camera resolution = faster """) gr.Markdown("### 🎨 Info Overlay Legend") gr.Markdown(""" - **FPS:** Frames processed per second - **Objects:** Total parts detected in frame - **Threshold:** Current confidence cutoff - **Avg Conf:** Average confidence of detections - **Part Counts:** Color-coded by quantity - 🟢 Green: 3+ detected - 🟡 Yellow: 2 detected - 🟠 Orange: 1 detected """) # Start live detection live_video.stream( fn=live_detection_stream, inputs=[live_conf_slider, live_clahe_checkbox, frame_skip_slider, advanced_stats_checkbox], outputs=live_video, stream_every=0.05, # 20 FPS max update rate time_limit=Config.MAX_STREAM_TIME, ) # ═════════════════════════════════════════════════════════════════ # TAB 2: Single Image Detection # ═════════════════════════════════════════════════════════════════ with gr.Tab("📷 Single Image Detection", id="single"): gr.Markdown("### Upload an image or capture from webcam for batch detection") with gr.Row(): with gr.Column(scale=1): input_image = gr.Image( sources=["upload", "webcam"], type="numpy", label="Upload or Capture Image" ) conf_slider = gr.Slider( minimum=0.1, maximum=0.9, value=0.25, step=0.05, label="🎯 Confidence Threshold" ) clahe_checkbox = gr.Checkbox( value=True, label="✨ Apply CLAHE Preprocessing" ) detect_btn = gr.Button("🔍 Detect Parts", variant="primary", size="lg") with gr.Column(scale=1): output_image = gr.Image(label="Detection Results") output_text = gr.Markdown(label="Detection Summary") detect_btn.click( fn=detect_engine_parts, inputs=[input_image, conf_slider, clahe_checkbox], outputs=[output_image, output_text], ) # ═════════════════════════════════════════════════════════════════════ # Help & Info Section # ═════════════════════════════════════════════════════════════════════ gr.HTML("""

ℹ️ How to Use This System

🎥 Live Camera Mode

  1. Switch to "Live Camera Detection" tab
  2. Allow camera access when prompted
  3. Adjust confidence threshold (0.25 recommended)
  4. Enable CLAHE for better shadow handling
  5. Use frame skip for performance optimization
  6. Watch real-time detections with FPS overlay

📷 Single Image Mode

  1. Switch to "Single Image Detection" tab
  2. Upload image or capture from webcam
  3. Adjust confidence threshold if needed
  4. Enable CLAHE for shadowed images
  5. Click "Detect Parts" button
  6. View detailed results and summary

💡 Pro Tips:

⚠️ Troubleshooting:

Model: YOLOv8 | Classes: Based on your training data | Performance: 15-30 FPS typical

""") # ═════════════════════════════════════════════════════════════════════════════ # Launch # ═════════════════════════════════════════════════════════════════════════════ if __name__ == "__main__": print("\n" + "="*60) print("🚀 Starting Engine Part Detection System") print("="*60) print(f"📦 Model: {Config.MODEL_PATH}") print(f"🎥 Camera Resolution: {Config.DEFAULT_CAMERA_WIDTH}x{Config.DEFAULT_CAMERA_HEIGHT}") print(f"🎯 Default Confidence: {Config.DEFAULT_CONF_THRESHOLD}") print(f"⚡ CLAHE Enabled: Yes") print("="*60 + "\n") demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True, ) # import gradio as gr # import cv2 # import numpy as np # from PIL import Image # from ultralytics import YOLO # import os # import time # from collections import deque # from typing import Optional, Tuple # import threading # # ═════════════════════════════════════════════════════════════════════════════ # # Load YOLO Model # # ═════════════════════════════════════════════════════════════════════════════ # MODEL_PATH = "best (1).pt" # try: # model = YOLO(MODEL_PATH) # print(f"✅ Model loaded: {MODEL_PATH}") # print(f"📊 Classes: {model.names}") # except Exception as e: # print(f"❌ Error loading model: {e}") # model = None # # ═════════════════════════════════════════════════════════════════════════════ # # CLAHE Preprocessing (Shadow Recovery) # # ═════════════════════════════════════════════════════════════════════════════ # def apply_clahe(image: np.ndarray, clip_limit: float = 2.0) -> np.ndarray: # """Apply CLAHE preprocessing for shadow recovery.""" # if image.dtype != np.uint8: # image = np.clip(image, 0, 255).astype(np.uint8) # # LAB color space - only enhance L channel # bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB) # l, a, b = cv2.split(lab) # # CLAHE on L channel # clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8)) # l_enhanced = clahe.apply(l) # # Merge back # lab_enhanced = cv2.merge([l_enhanced, a, b]) # bgr_enhanced = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR) # rgb_enhanced = cv2.cvtColor(bgr_enhanced, cv2.COLOR_BGR2RGB) # return rgb_enhanced # # ═════════════════════════════════════════════════════════════════════════════ # # FPS Counter # # ═════════════════════════════════════════════════════════════════════════════ # class FPSCounter: # """Track FPS with moving average""" # def __init__(self, buffer_size=30): # self.frame_times = deque(maxlen=buffer_size) # self.last_time = time.time() # def update(self): # current_time = time.time() # self.frame_times.append(current_time - self.last_time) # self.last_time = current_time # def get_fps(self) -> float: # if len(self.frame_times) == 0: # return 0.0 # return 1.0 / (sum(self.frame_times) / len(self.frame_times)) # # ═════════════════════════════════════════════════════════════════════════════ # # Detection Functions # # ═════════════════════════════════════════════════════════════════════════════ # def detect_engine_parts(image, conf_threshold=0.25, apply_clahe_preprocessing=True): # """ # Detect engine parts with YOLO (single image). # Args: # image: Input image (PIL or numpy) # conf_threshold: Confidence threshold (0-1) # apply_clahe_preprocessing: Whether to apply CLAHE before detection # Returns: # annotated_image: Image with bounding boxes # results_text: Detection summary # """ # if model is None: # return image, "❌ Model not loaded. Please check the model file." # # Convert to numpy array # if isinstance(image, Image.Image): # image = np.array(image) # if image is None: # return None, "❌ No image provided" # # Apply CLAHE preprocessing # if apply_clahe_preprocessing: # image = apply_clahe(image) # # YOLO inference # results = model.predict( # source=image, # conf=conf_threshold, # iou=0.45, # imgsz=640, # verbose=False, # ) # # Get annotated image # annotated = results[0].plot() # BGR format # annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB) # # Extract detection info # boxes = results[0].boxes # if len(boxes) == 0: # summary = f"**No detections** (threshold: {conf_threshold:.0%})" # else: # summary = f"**Detected {len(boxes)} object(s):**\n\n" # # Group by class # class_counts = {} # for box in boxes: # class_id = int(box.cls[0]) # class_name = model.names[class_id] # class_counts[class_name] = class_counts.get(class_name, 0) + 1 # # Summary by class # for class_name, count in sorted(class_counts.items()): # summary += f"• **{class_name}**: {count}\n" # summary += f"\n**Details:**\n" # # Individual detections # for i, box in enumerate(boxes, 1): # x1, y1, x2, y2 = box.xyxy[0].tolist() # conf = float(box.conf[0]) # class_id = int(box.cls[0]) # class_name = model.names[class_id] # summary += f"{i}. **{class_name}** — {conf:.2%} confidence\n" # return annotated_rgb, summary # def draw_info_overlay(frame: np.ndarray, fps: float, detection_counts: dict, # total_detections: int, conf_threshold: float) -> np.ndarray: # """Draw FPS and detection info overlay on frame""" # overlay = frame.copy() # height, width = frame.shape[:2] # # Semi-transparent background for text # cv2.rectangle(overlay, (10, 10), (350, 150), (0, 0, 0), -1) # cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame) # # FPS # cv2.putText(frame, f"FPS: {fps:.1f}", (20, 35), # cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) # # Total detections # cv2.putText(frame, f"Total: {total_detections}", (20, 60), # cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # # Confidence threshold # cv2.putText(frame, f"Conf: {conf_threshold:.0%}", (20, 85), # cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # # Class counts # y_offset = 110 # for class_name, count in sorted(detection_counts.items()): # text = f"{class_name}: {count}" # cv2.putText(frame, text, (20, y_offset), # cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 200, 0), 2) # y_offset += 25 # return frame # def process_video_frame(frame: np.ndarray, conf_threshold: float, # apply_clahe: bool, fps_counter: FPSCounter) -> Tuple[np.ndarray, dict]: # """ # Process a single video frame for detection. # Returns: # annotated_frame: Frame with detections drawn # detection_info: Dictionary with detection counts # """ # if model is None: # return frame, {} # # Apply CLAHE preprocessing # if apply_clahe: # processed_frame = apply_clahe(frame) # else: # processed_frame = frame # # YOLO inference # results = model.predict( # source=processed_frame, # conf=conf_threshold, # iou=0.45, # imgsz=640, # verbose=False, # stream=False, # ) # # Get annotated image # annotated = results[0].plot() # BGR format # annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB) # # Count detections by class # boxes = results[0].boxes # class_counts = {} # total_count = 0 # for box in boxes: # class_id = int(box.cls[0]) # class_name = model.names[class_id] # class_counts[class_name] = class_counts.get(class_name, 0) + 1 # total_count += 1 # # Update FPS # fps_counter.update() # fps = fps_counter.get_fps() # # Draw overlay # annotated_rgb = draw_info_overlay( # annotated_rgb, # fps, # class_counts, # total_count, # conf_threshold # ) # detection_info = { # 'class_counts': class_counts, # 'total': total_count, # 'fps': fps # } # return annotated_rgb, detection_info # # ═════════════════════════════════════════════════════════════════════════════ # # Live Camera Detection (Generator Function for Gradio) # # ═════════════════════════════════════════════════════════════════════════════ # def live_detection_stream(conf_threshold: float = 0.25, # apply_clahe: bool = True, # process_every_n_frames: int = 1): # """ # Generator function for live camera detection with Gradio. # Args: # conf_threshold: Detection confidence threshold # apply_clahe: Whether to apply CLAHE preprocessing # process_every_n_frames: Process every Nth frame (1 = all frames, 2 = every other frame) # Yields: # annotated_frame: Frame with detections and info overlay # """ # fps_counter = FPSCounter() # frame_count = 0 # last_detections = {} # last_annotated = None # # Open webcam # cap = cv2.VideoCapture(0) # # Set camera properties for better performance # cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) # cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) # cap.set(cv2.CAP_PROP_FPS, 30) # if not cap.isOpened(): # # Return error frame # error_frame = np.zeros((480, 640, 3), dtype=np.uint8) # cv2.putText(error_frame, "Camera not available", (150, 240), # cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) # yield error_frame # return # try: # while True: # ret, frame = cap.read() # if not ret: # break # # Convert BGR to RGB # frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # frame_count += 1 # # Process every Nth frame for performance # if frame_count % process_every_n_frames == 0: # annotated_frame, detection_info = process_video_frame( # frame_rgb, # conf_threshold, # apply_clahe, # fps_counter # ) # last_annotated = annotated_frame # last_detections = detection_info # else: # # Reuse last detection but update FPS # if last_annotated is not None: # fps_counter.update() # annotated_frame = draw_info_overlay( # frame_rgb, # fps_counter.get_fps(), # last_detections.get('class_counts', {}), # last_detections.get('total', 0), # conf_threshold # ) # else: # annotated_frame = frame_rgb # yield annotated_frame # finally: # cap.release() # # ═════════════════════════════════════════════════════════════════════════════ # # Gradio Interface # # ═════════════════════════════════════════════════════════════════════════════ # CSS = """ # @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;600&display=swap'); # body { font-family: 'Inter', sans-serif; } # .gr-button-primary { background: linear-gradient(90deg, #4CAF50 0%, #45a049 100%) !important; } # .gr-button-secondary { background: linear-gradient(90deg, #2196F3 0%, #1976D2 100%) !important; } # """ # with gr.Blocks(css=CSS, title="Engine Part Detector") as demo: # gr.HTML(""" #
#

⚙️ Engine Part Detection System

#

# YOLOv8 object detection with CLAHE shadow recovery preprocessing #

#
# """) # with gr.Tabs(): # # ═════════════════════════════════════════════════════════════════ # # TAB 1: Single Image Detection # # ═════════════════════════════════════════════════════════════════ # with gr.Tab("📷 Single Image Detection"): # gr.Markdown("### Upload an image or capture from webcam for one-time detection") # with gr.Row(): # with gr.Column(scale=1): # input_image = gr.Image( # sources=["upload", "webcam"], # type="numpy", # label="Upload Engine Part Image" # ) # with gr.Row(): # conf_slider = gr.Slider( # minimum=0.1, # maximum=0.9, # value=0.25, # step=0.05, # label="Confidence Threshold", # info="Lower = more detections (may include false positives)" # ) # clahe_checkbox = gr.Checkbox( # value=True, # label="Apply CLAHE Preprocessing", # info="Recovers details in shadowed areas (recommended)" # ) # detect_btn = gr.Button("🔍 Detect Parts", variant="primary", size="lg") # with gr.Column(scale=1): # output_image = gr.Image(label="Detection Results") # output_text = gr.Markdown(label="Summary") # # Wire up the button # detect_btn.click( # fn=detect_engine_parts, # inputs=[input_image, conf_slider, clahe_checkbox], # outputs=[output_image, output_text], # ) # # ═════════════════════════════════════════════════════════════════ # # TAB 2: Live Camera Detection # # ═════════════════════════════════════════════════════════════════ # with gr.Tab("🎥 Live Camera Detection"): # gr.Markdown(""" # ### Real-time engine part detection from your camera # **Note:** Click "Start Detection" to begin, and "Stop" to end the stream. # """) # with gr.Row(): # with gr.Column(scale=1): # # Camera feed # live_video = gr.Image( # label="Live Camera Feed", # streaming=True, # show_label=True, # height=480 # ) # with gr.Column(scale=1): # gr.Markdown("### ⚙️ Detection Settings") # live_conf_slider = gr.Slider( # minimum=0.1, # maximum=0.9, # value=0.25, # step=0.05, # label="Confidence Threshold", # info="Adjust sensitivity" # ) # live_clahe_checkbox = gr.Checkbox( # value=True, # label="Apply CLAHE Preprocessing", # info="Better results in shadows" # ) # frame_skip_slider = gr.Slider( # minimum=1, # maximum=5, # value=1, # step=1, # label="Process Every N Frames", # info="Higher = faster but less smooth (1 = process all frames)" # ) # gr.Markdown("### 📊 Performance Tips") # gr.Markdown(""" # - **Process Every N Frames**: Set to 2-3 for better performance # - **Confidence**: Lower threshold = more detections # - **CLAHE**: Slight performance cost but better quality # """) # gr.Markdown("### 📈 Info Overlay") # gr.Markdown(""" # The video shows: # - **FPS**: Frames per second # - **Total**: Total objects detected # - **Conf**: Current confidence threshold # - **Class counts**: Number of each part type # """) # # Start live detection # live_video.stream( # fn=live_detection_stream, # inputs=[live_conf_slider, live_clahe_checkbox, frame_skip_slider], # outputs=live_video, # stream_every=0.1, # Update interval in seconds # time_limit=3600, # 1 hour max # ) # # ═════════════════════════════════════════════════════════════════════ # # Info Section (Shared) # # ═════════════════════════════════════════════════════════════════════ # gr.HTML(""" #
#

ℹ️ How to Use

#

📷 Single Image Mode:

#
    #
  1. Upload an image of an engine part or use your webcam to capture
  2. #
  3. Adjust confidence threshold (default 0.25 works well)
  4. #
  5. Enable CLAHE preprocessing for better results on shadowed images
  6. #
  7. Click "Detect Parts" to run detection
  8. #
#

🎥 Live Camera Mode:

#
    #
  1. Switch to the "Live Camera Detection" tab
  2. #
  3. Adjust settings (confidence, CLAHE, frame skip)
  4. #
  5. The camera will start automatically showing real-time detections
  6. #
  7. View live FPS and detection counts on the video overlay
  8. #
  9. Close the tab or adjust settings to stop
  10. #
#

Supported Classes: Depends on your model training (e.g., bearing_saddle, piston, defect, crack, corrosion)

#

Performance: Live mode processes at 15-30 FPS depending on your hardware. Use "Process Every N Frames" to optimize speed.

#
# """) # # ═════════════════════════════════════════════════════════════════════════════ # # Launch # # ═════════════════════════════════════════════════════════════════════════════ # if __name__ == "__main__": # demo.launch( # server_name="0.0.0.0", # server_port=7860, # share=False, # )