Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| import logging | |
| import os | |
| import tempfile | |
| import uuid | |
| from typing import Optional, List, Dict, Tuple, Union | |
| import io | |
| import requests | |
| import asyncio | |
| import numpy as np | |
| import cloudinary | |
| import cloudinary.uploader | |
| import sys | |
| # Functions for enhanced plastic detection | |
| def detect_beach_scene(img, hsv=None): | |
| """ | |
| Detect if an image contains a beach or water scene. | |
| Args: | |
| img: OpenCV image in BGR format | |
| hsv: Pre-computed HSV image (optional) | |
| Returns: | |
| Boolean indicating if beach/water is present | |
| """ | |
| if hsv is None: | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| # Check for beach sand colors | |
| sand_mask = cv2.inRange(hsv, np.array([10, 20, 120]), np.array([40, 80, 255])) | |
| # Check for water/ocean colors | |
| water_mask = cv2.inRange(hsv, np.array([80, 40, 40]), np.array([140, 255, 255])) | |
| # Check for sky blue | |
| sky_mask = cv2.inRange(hsv, np.array([90, 30, 170]), np.array([130, 90, 255])) | |
| # Calculate ratios | |
| h, w = img.shape[:2] | |
| total_pixels = h * w | |
| sand_ratio = np.sum(sand_mask > 0) / total_pixels | |
| water_ratio = np.sum(water_mask > 0) / total_pixels | |
| sky_ratio = np.sum(sky_mask > 0) / total_pixels | |
| # Return True if significant beach/water features are present | |
| return (sand_ratio > 0.15) or (water_ratio > 0.15) or (sand_ratio + water_ratio + sky_ratio > 0.4) | |
| def detect_plastic_bottles(img, hsv=None): | |
| """ | |
| Specialized detection for plastic bottles in beach/water scenes. | |
| Args: | |
| img: OpenCV image in BGR format | |
| hsv: Pre-computed HSV image (optional) | |
| Returns: | |
| List of detected regions with bounding boxes and confidence scores | |
| """ | |
| if hsv is None: | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| # Create masks for different types of plastic bottles | |
| clear_bottle_mask = cv2.inRange(hsv, np.array([0, 0, 120]), np.array([180, 60, 255])) | |
| blue_bottle_mask = cv2.inRange(hsv, np.array([90, 50, 50]), np.array([130, 255, 255])) | |
| # Combine masks | |
| combined_mask = cv2.bitwise_or(clear_bottle_mask, blue_bottle_mask) | |
| # Apply morphological operations to clean up mask | |
| kernel = np.ones((5, 5), np.uint8) | |
| combined_mask = cv2.morphologyEx(combined_mask, cv2.MORPH_CLOSE, kernel) | |
| combined_mask = cv2.morphologyEx(combined_mask, cv2.MORPH_OPEN, kernel) | |
| # Find contours | |
| contours, _ = cv2.findContours(combined_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # Filter and process contours | |
| plastic_regions = [] | |
| for contour in contours: | |
| area = cv2.contourArea(contour) | |
| if area < 200: | |
| continue # Skip small regions | |
| x, y, w, h = cv2.boundingRect(contour) | |
| # Skip if aspect ratio doesn't match typical bottles (bottles are taller than wide) | |
| aspect_ratio = w / h if h > 0 else 0 | |
| if not (0.2 < aspect_ratio < 0.8) and h > 30: | |
| continue | |
| # Get region for additional analysis | |
| roi = img[y:y+h, x:x+w] | |
| if roi.size == 0: | |
| continue | |
| # Check shape characteristics | |
| confidence = 0.65 # Base confidence | |
| # If shape is very bottle-like, increase confidence | |
| if 0.25 < aspect_ratio < 0.5 and h > 50: | |
| confidence = 0.85 | |
| plastic_regions.append({ | |
| "bbox": [x, y, x+w, y+h], | |
| "confidence": confidence, | |
| "class": "plastic bottle" | |
| }) | |
| return plastic_regions | |
| def check_for_plastic_bottle(roi, roi_hsv=None): | |
| """ | |
| Check if an image region contains a plastic bottle based on color and shape. | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| roi_hsv: Pre-computed HSV region (optional) | |
| Returns: | |
| Boolean indicating if region likely contains a plastic bottle | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| # Skip invalid ROIs | |
| if h == 0 or w == 0: | |
| return False | |
| # Check aspect ratio (bottles are typically taller than wide) | |
| aspect_ratio = w / h | |
| if not (0.2 < aspect_ratio < 0.8): | |
| return False | |
| # Check for clear plastic areas | |
| clear_mask = cv2.inRange(roi_hsv, np.array([0, 0, 120]), np.array([180, 60, 255])) | |
| clear_ratio = np.sum(clear_mask > 0) / (h * w) | |
| # Check for blue bottle cap areas | |
| blue_mask = cv2.inRange(roi_hsv, np.array([90, 50, 50]), np.array([130, 255, 255])) | |
| blue_ratio = np.sum(blue_mask > 0) / (h * w) | |
| # Check for typical bottle colors | |
| plastic_colors_present = (clear_ratio > 0.4) or (blue_ratio > 0.1) | |
| # Convert to grayscale for edge/shape analysis | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| # Look for edges that could indicate bottle shape | |
| edges = cv2.Canny(gray, 50, 150) | |
| # Check for vertical edges typical in bottles | |
| vertical_edge_count = np.sum(edges > 0) / (h * w) | |
| has_bottle_edges = vertical_edge_count > 0.05 | |
| # Combine checks | |
| return plastic_colors_present and has_bottle_edges | |
| def check_for_plastic_waste(roi, roi_hsv=None): | |
| """ | |
| Check if an image region contains plastic waste based on color and texture. | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| roi_hsv: Pre-computed HSV region (optional) | |
| Returns: | |
| Boolean indicating if region likely contains plastic waste | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| # Skip invalid ROIs | |
| if h == 0 or w == 0: | |
| return False | |
| # Check for plastic-like colors | |
| plastic_mask = cv2.inRange(roi_hsv, np.array([0, 0, 100]), np.array([180, 100, 255])) | |
| plastic_ratio = np.sum(plastic_mask > 0) / (h * w) | |
| # Check for bright colors often found in plastic waste | |
| bright_mask = cv2.inRange(roi_hsv, np.array([0, 50, 150]), np.array([180, 255, 255])) | |
| bright_ratio = np.sum(bright_mask > 0) / (h * w) | |
| # Convert to grayscale for texture analysis | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| # Calculate texture uniformity (plastics often have uniform texture) | |
| std_dev = np.std(gray) | |
| uniform_texture = std_dev < 40 | |
| # Apply combined criteria | |
| is_plastic = (plastic_ratio > 0.3 or bright_ratio > 0.2) and uniform_texture | |
| return is_plastic | |
| def check_for_ship(roi, roi_hsv=None): | |
| """ | |
| Check if an image region contains a ship based on color and shape. | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| roi_hsv: Pre-computed HSV region (optional) | |
| Returns: | |
| Boolean indicating if region likely contains a ship | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| # Skip invalid ROIs | |
| if h == 0 or w == 0: | |
| return False | |
| # Ships typically have a horizontal profile | |
| aspect_ratio = w / h | |
| if aspect_ratio < 1.0: # If taller than wide, probably not a ship | |
| return False | |
| # Convert to grayscale for edge detection | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| # Look for strong horizontal lines (ship deck) | |
| edges = cv2.Canny(gray, 50, 150) | |
| # Find horizontal lines using HoughLines | |
| lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=50, minLineLength=w/4, maxLineGap=20) | |
| horizontal_lines = 0 | |
| if lines is not None: | |
| for line in lines: | |
| x1, y1, x2, y2 = line[0] | |
| angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi) | |
| # Horizontal lines have angles close to 0 or 180 degrees | |
| if angle < 20 or angle > 160: | |
| horizontal_lines += 1 | |
| # Check for metal/ship hull colors | |
| # Ships often have white, gray, black, or blue colors | |
| white_mask = cv2.inRange(roi_hsv, np.array([0, 0, 150]), np.array([180, 30, 255])) | |
| gray_mask = cv2.inRange(roi_hsv, np.array([0, 0, 50]), np.array([180, 30, 150])) | |
| blue_mask = cv2.inRange(roi_hsv, np.array([90, 50, 50]), np.array([130, 255, 255])) | |
| white_ratio = np.sum(white_mask > 0) / (h * w) | |
| gray_ratio = np.sum(gray_mask > 0) / (h * w) | |
| blue_ratio = np.sum(blue_mask > 0) / (h * w) | |
| ship_color_present = (white_ratio + gray_ratio + blue_ratio) > 0.3 | |
| # Combine all criteria - need horizontal lines and ship colors | |
| return horizontal_lines >= 2 and ship_color_present | |
| def detect_general_waste(roi, roi_hsv=None): | |
| """ | |
| General-purpose waste detection for beach and water scenes. | |
| Detects various types of waste including plastics, metal, glass, etc. | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| roi_hsv: Pre-computed HSV region (optional) | |
| Returns: | |
| Tuple of (is_waste, waste_type, confidence) | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| # Skip invalid ROIs | |
| if h == 0 or w == 0: | |
| return False, None, 0.0 | |
| # Convert to grayscale for texture analysis | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| # Calculate texture metrics | |
| std_dev = np.std(gray) | |
| # Detect plastic waste | |
| if check_for_plastic_waste(roi, roi_hsv): | |
| return True, "plastic waste", 0.7 | |
| # Detect plastic bottles specifically | |
| if check_for_plastic_bottle(roi, roi_hsv): | |
| return True, "plastic bottle", 0.85 | |
| # Check for other common waste colors and textures | |
| # Bright unnatural colors | |
| bright_mask = cv2.inRange(roi_hsv, np.array([0, 100, 150]), np.array([180, 255, 255])) | |
| bright_ratio = np.sum(bright_mask > 0) / (h * w) | |
| # Metallic/reflective surfaces | |
| metal_mask = cv2.inRange(roi_hsv, np.array([0, 0, 150]), np.array([180, 40, 220])) | |
| metal_ratio = np.sum(metal_mask > 0) / (h * w) | |
| # Detect regular shape with unnatural color (likely man-made) | |
| edges = cv2.Canny(gray, 50, 150) | |
| edge_ratio = np.sum(edges > 0) / (h * w) | |
| has_straight_edges = False | |
| lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=50, minLineLength=20, maxLineGap=10) | |
| if lines is not None and len(lines) > 2: | |
| has_straight_edges = True | |
| # If it has bright unnatural colors and straight edges, likely waste | |
| if bright_ratio > 0.3 and has_straight_edges: | |
| return True, "colored waste", 0.65 | |
| # If it has metallic appearance and straight edges, likely metal waste | |
| if metal_ratio > 0.3 and has_straight_edges: | |
| return True, "metal waste", 0.6 | |
| # If it has uniform texture and straight edges, could be general waste | |
| if std_dev < 35 and has_straight_edges: | |
| return True, "general waste", 0.5 | |
| # Not waste | |
| return False, None, 0.0 | |
| # Initialize logger first | |
| logger = logging.getLogger(__name__) | |
| # Apply the torchvision circular import fix BEFORE any other imports | |
| # This is critical to prevent the "torchvision::nms does not exist" error | |
| try: | |
| # Pre-emptively patch the _meta_registrations module to avoid the circular import | |
| import types | |
| sys.modules['torchvision._meta_registrations'] = types.ModuleType('torchvision._meta_registrations') | |
| sys.modules['torchvision._meta_registrations'].__dict__['register_meta'] = lambda x: lambda y: y | |
| # Now safely import torchvision | |
| import torchvision | |
| import torchvision.ops | |
| logger.info(f"Successfully pre-patched torchvision") | |
| except Exception as e: | |
| logger.warning(f"Failed to pre-patch torchvision: {e}") | |
| # Import our fallback detection module | |
| try: | |
| from . import fallback_detection | |
| HAS_FALLBACK = True | |
| logger.info("Fallback detection module loaded successfully") | |
| except ImportError: | |
| HAS_FALLBACK = False | |
| logger.warning("Fallback detection module not available") | |
| # Initialize logger first | |
| logger = logging.getLogger(__name__) | |
| # Configure environment variables before importing torch | |
| os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" | |
| os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128" | |
| # Only import cv2 if available - it might not be in all environments | |
| try: | |
| import cv2 | |
| HAS_CV2 = True | |
| except ImportError: | |
| HAS_CV2 = False | |
| logger.warning("OpenCV (cv2) not available - image processing will be limited") | |
| # First try to import torch to check compatibility | |
| try: | |
| import torch | |
| HAS_TORCH = True | |
| # Force CPU mode if needed | |
| if not torch.cuda.is_available(): | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "-1" | |
| logger.info("CUDA not available, using CPU for inference") | |
| # Check torch version | |
| torch_version = torch.__version__ | |
| logger.info(f"PyTorch version: {torch_version}") | |
| # We already imported torchvision at the top of the file | |
| # Just log the version if available | |
| if 'torchvision' in sys.modules: | |
| logger.info(f"TorchVision version: {torchvision.__version__}") | |
| except ImportError: | |
| HAS_TORCH = False | |
| logger.warning("PyTorch not available - YOLO detection will not work") | |
| # Now try to import YOLO | |
| try: | |
| from ultralytics import YOLO | |
| HAS_YOLO = True | |
| logger.info("Ultralytics YOLO loaded successfully") | |
| except ImportError: | |
| HAS_YOLO = False | |
| logger.warning("Ultralytics YOLO not available - object detection disabled") | |
| # The YOLO model - will be loaded on first use | |
| yolo_model = None | |
| # Custom confidence thresholds | |
| PLASTIC_BOTTLE_CONF_THRESHOLD = 0.01 # Very low threshold to catch all potential bottles | |
| GENERAL_CONF_THRESHOLD = 0.25 # Regular threshold for other objects | |
| # Marine pollution related classes in COCO dataset (for standard YOLOv8) | |
| # These are the indexes we'll filter for when using the standard YOLO model | |
| POLLUTION_RELATED_CLASSES = { | |
| # Primary target - plastic bottles (highest priority) | |
| 39: "plastic bottle", # COCO bottle class - primary target | |
| 40: "glass bottle", # wine glass - also bottles | |
| 41: "plastic cup", # cup - similar to bottles | |
| 44: "plastic bottle", # spoon - often misclassified bottles | |
| # Objects commonly misclassified as bottles or vice versa (high priority) | |
| 1: "possible plastic bottle", # bicycle (sometimes confused with bottles on beaches) | |
| 2: "possible plastic bottle", # car (frequently misclassified bottles on beaches) | |
| 3: "possible plastic waste", # motorcycle (can be confused with debris) | |
| 4: "possible plastic bottle", # airplane (often misidentified with debris/bottles) | |
| 5: "possible plastic bottle", # bus (large plastic items) | |
| 9: "possible plastic bottle", # traffic light (frequently misclassified bottles) | |
| 10: "possible plastic bottle", # fire hydrant (often confused with bottles) | |
| 11: "possible plastic bottle", # stop sign (confused with bottles) | |
| 13: "possible plastic bottle", # bench (often confused with beach debris) | |
| # Vessels and maritime objects (medium-high priority) | |
| 8: "ship", # boat/ship | |
| 9: "ship", # traffic light (sometimes confused with boats) | |
| 90: "ship", # boat | |
| 37: "ship", # sports ball (confused with buoys/small boats) | |
| # General waste and pollution categories (medium priority) | |
| 0: "general waste", # person (can be mistaken for debris at a distance) | |
| 6: "general waste", # train | |
| 7: "general waste", # truck | |
| 15: "marine animal", # bird (can be affected by pollution) | |
| 16: "marine animal", # cat | |
| 17: "marine animal", # dog | |
| 18: "marine animal", # horse | |
| 19: "marine animal", # sheep | |
| 20: "marine animal", # cow | |
| 21: "marine animal", # elephant | |
| 22: "marine animal", # bear | |
| 23: "marine animal", # zebra | |
| 24: "marine animal", # giraffe | |
| 25: "general waste", # backpack | |
| 26: "general waste", # umbrella | |
| 27: "marine debris", # backpack (often washed up on beaches) | |
| 28: "plastic waste", # umbrella (can be beach debris) | |
| 31: "plastic waste", # handbag | |
| 32: "plastic waste", # tie | |
| 33: "plastic waste", # suitcase | |
| # Other plastic/trash items (medium-low priority) | |
| 42: "plastic waste", # fork | |
| 43: "plastic waste", # knife | |
| 45: "plastic waste", # bowl | |
| 46: "plastic waste", # banana (misidentified waste) | |
| 47: "plastic waste", # apple (misidentified waste) | |
| 48: "plastic waste", # sandwich (often packaging) | |
| 49: "plastic waste", # orange (misidentified waste) | |
| 50: "plastic waste", # broccoli | |
| 51: "plastic waste", # carrot | |
| 67: "plastic bag", # plastic bag | |
| 73: "electronic waste",# laptop | |
| 74: "electronic waste",# mouse | |
| 75: "electronic waste",# remote | |
| 76: "electronic waste",# keyboard | |
| 77: "electronic waste",# cell phone | |
| 84: "trash bin", # trash bin | |
| 86: "paper waste" # paper | |
| } | |
| def custom_nms(boxes, scores, iou_threshold=0.5): | |
| """ | |
| Custom implementation of Non-Maximum Suppression. | |
| This is a fallback for when torchvision's NMS operator fails. | |
| Args: | |
| boxes: Bounding boxes in format [x1, y1, x2, y2] | |
| scores: Confidence scores for each box | |
| iou_threshold: IoU threshold for considering boxes as duplicates | |
| Returns: | |
| List of indices of boxes to keep | |
| """ | |
| if len(boxes) == 0: | |
| return [] | |
| # Convert to numpy if they're torch tensors | |
| if HAS_TORCH and isinstance(boxes, torch.Tensor): | |
| boxes = boxes.cpu().numpy() | |
| if HAS_TORCH and isinstance(scores, torch.Tensor): | |
| scores = scores.cpu().numpy() | |
| # Get coordinates and areas | |
| x1 = boxes[:, 0] | |
| y1 = boxes[:, 1] | |
| x2 = boxes[:, 2] | |
| y2 = boxes[:, 3] | |
| area = (x2 - x1) * (y2 - y1) | |
| # Sort by confidence score | |
| indices = np.argsort(scores)[::-1] | |
| keep = [] | |
| while indices.size > 0: | |
| # Pick the box with highest score | |
| i = indices[0] | |
| keep.append(i) | |
| if indices.size == 1: | |
| break | |
| # Calculate IoU of the picked box with the rest | |
| xx1 = np.maximum(x1[i], x1[indices[1:]]) | |
| yy1 = np.maximum(y1[i], y1[indices[1:]]) | |
| xx2 = np.minimum(x2[i], x2[indices[1:]]) | |
| yy2 = np.minimum(y2[i], y2[indices[1:]]) | |
| w = np.maximum(0.0, xx2 - xx1) | |
| h = np.maximum(0.0, yy2 - yy1) | |
| intersection = w * h | |
| # Calculate IoU | |
| iou = intersection / (area[i] + area[indices[1:]] - intersection) | |
| # Keep boxes with IoU less than threshold | |
| indices = indices[1:][iou < iou_threshold] | |
| return keep | |
| def initialize_yolo_model(force_cpu=False): | |
| """ | |
| Initialize YOLO model with appropriate settings based on environment. | |
| Returns the model or None if initialization fails. | |
| Args: | |
| force_cpu: If True, will force CPU inference regardless of CUDA availability | |
| """ | |
| if not HAS_YOLO or not HAS_CV2: | |
| logger.warning("Cannot initialize YOLO: dependencies missing") | |
| return None | |
| try: | |
| # Set environment variables for compatibility | |
| if force_cpu or not torch.cuda.is_available(): | |
| logger.info("Setting YOLO to use CPU mode") | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "-1" | |
| # We've already patched torchvision at the module level, | |
| # but let's double check that the patch is still in place | |
| if 'torchvision._meta_registrations' not in sys.modules: | |
| logger.warning("Torchvision patch not found, reapplying...") | |
| try: | |
| import types | |
| sys.modules['torchvision._meta_registrations'] = types.ModuleType('torchvision._meta_registrations') | |
| sys.modules['torchvision._meta_registrations'].__dict__['register_meta'] = lambda x: lambda y: y | |
| except Exception as import_err: | |
| logger.warning(f"Failed to reapply torchvision patch: {import_err}") | |
| # Configure PyTorch for specific versions | |
| if HAS_TORCH and hasattr(torch, '__version__'): | |
| torch_version = torch.__version__ | |
| # Apply fixes for known version issues | |
| if torch_version.startswith(('1.13', '2.0', '2.1')): | |
| logger.info(f"Applying compatibility fixes for PyTorch {torch_version}") | |
| # Patch for torchvision::nms issue in some versions | |
| if "PYTHONPATH" not in os.environ: | |
| os.environ["PYTHONPATH"] = "" | |
| # Check if custom model exists | |
| if os.path.exists("models/marine_pollution_yolov8.pt"): | |
| # Load with very low confidence threshold to catch all potential bottles | |
| model = YOLO("models/marine_pollution_yolov8.pt") | |
| logger.info("Loaded custom marine pollution YOLO model") | |
| else: | |
| # ALWAYS use YOLOv8x model for deployment - no fallbacks | |
| logger.info("Using YOLOv8x (largest/most accurate model) for production deployment...") | |
| # Only use YOLOv8x for deployment - no fallbacks to smaller models | |
| model_name = "yolov8x.pt" | |
| model_size = "extra large" | |
| # Force the model to be loaded using ultralytics' auto-download | |
| model = None | |
| model_loaded = False | |
| # Only try to load YOLOv8x - this is simpler and ensures we're always using the best model | |
| try: | |
| # Attempt to load the model if it exists or download it if not | |
| logger.info(f"Attempting to load {model_name} ({model_size})...") | |
| # Import YOLO | |
| from ultralytics import YOLO | |
| # Check if model already exists, no need to re-download | |
| model_exists = os.path.exists(model_name) | |
| if model_exists: | |
| logger.info(f"Found existing {model_name}, using it without redownloading") | |
| else: | |
| logger.info(f"Model {model_name} not found, will download it automatically") | |
| # Load the model - this will trigger the download only if the file doesn't exist | |
| model = YOLO(model_name) | |
| # Verify that the model was loaded successfully | |
| if hasattr(model, 'model') and model.model is not None: | |
| logger.info(f"SUCCESS! Loaded {model_name} ({model_size} model)") | |
| model_loaded = True | |
| else: | |
| logger.warning(f"Model {model_name} loaded but verification failed") | |
| except Exception as e: | |
| logger.error(f"Failed to load YOLOv8x model: {str(e)}") | |
| logger.error("This is critical for proper detection. Please check your internet connection and retry.") | |
| # If model failed to load, raise an exception - we need YOLOv8x for proper detection | |
| if not model_loaded: | |
| error_message = "Failed to load YOLOv8x model. This is critical for proper marine pollution detection." | |
| logger.error(error_message) | |
| raise RuntimeError(error_message + " Please check your internet connection and try again.") | |
| # Configure model parameters for marine pollution detection | |
| # Optimize settings based on model size | |
| try: | |
| # Get model info to adjust parameters based on model size | |
| model_type = "" | |
| if hasattr(model, 'info'): | |
| model_info = model.info() | |
| # Handle different return types from model.info() | |
| if isinstance(model_info, dict): | |
| model_type = model_info.get('model_type', '') | |
| elif isinstance(model_info, tuple): | |
| # For newer versions of Ultralytics that return tuples | |
| model_type = str(model_info[0]) if model_info and len(model_info) > 0 else "" | |
| elif hasattr(model_info, 'model_type'): | |
| # For object-based returns | |
| model_type = model_info.model_type | |
| logger.info(f"Configuring model (type: {model_type}) with optimal settings for marine pollution detection") | |
| # Adjust confidence threshold based on model size | |
| # Larger models are more accurate so can use lower confidence threshold | |
| # Safely determine model type from any identifier string | |
| model_type_str = str(model_type).lower() | |
| if 'x' in model_type_str: # YOLOv8x (extra large) | |
| # For the largest model, we can use very low confidence | |
| # as it's much more accurate with fewer false positives | |
| model.conf = 0.15 | |
| model.iou = 0.30 | |
| logger.info("Using optimized parameters for extra large model") | |
| elif 'l' in model_type_str: # YOLOv8l (large) | |
| model.conf = 0.18 | |
| model.iou = 0.32 | |
| logger.info("Using optimized parameters for large model") | |
| elif 'm' in model_type: # YOLOv8m (medium) | |
| model.conf = 0.20 | |
| model.iou = 0.35 | |
| logger.info("Using optimized parameters for medium model") | |
| else: # YOLOv8s or YOLOv8n (small/nano) | |
| model.conf = 0.25 | |
| model.iou = 0.40 | |
| logger.info("Using optimized parameters for small model") | |
| # Common settings for all model sizes | |
| model.verbose = True # Enable detailed logging | |
| model.agnostic_nms = True # Apply class-agnostic NMS for better multi-class detection | |
| model.max_det = 150 # Increase max detections to catch more small objects | |
| # Set fuse=True to optimize model speed without sacrificing accuracy | |
| if hasattr(model, 'fuse'): | |
| model.fuse = True | |
| # Configure for classes that might be plastic debris or marine pollution | |
| # These are COCO classes that could be marine pollution: | |
| # 39: bottle, 41: cup, 44: spoon, 73: laptop, etc. | |
| logger.info(f"YOLO {model_type} model configured successfully with optimized parameters") | |
| # Print model properties to verify configuration | |
| logger.info(f"Model configuration - confidence: {model.conf}, iou threshold: {model.iou}, max detections: {model.max_det}") | |
| except Exception as config_err: | |
| logger.warning(f"Could not configure YOLO parameters: {config_err} - using default settings") | |
| # Fallback to basic configuration | |
| try: | |
| model.conf = 0.25 | |
| model.iou = 0.45 | |
| except: | |
| pass | |
| # Ensure model is in evaluation mode | |
| try: | |
| model.model.eval() | |
| except Exception as e: | |
| logger.warning(f"Could not explicitly set model to eval mode: {e}") | |
| # Test model by running a simple inference to check for NMS errors | |
| try: | |
| # Create a small test image | |
| test_img = np.zeros((100, 100, 3), dtype=np.uint8) | |
| temp_path = tempfile.mktemp(suffix='.jpg') | |
| cv2.imwrite(temp_path, test_img) | |
| # Test inference | |
| logger.info("Testing model with dummy image") | |
| _ = model(temp_path) | |
| os.unlink(temp_path) | |
| logger.info("Model test successful") | |
| except RuntimeError as e: | |
| error_msg = str(e) | |
| if "torchvision::nms" in error_msg: | |
| # NMS operator error detected | |
| logger.warning("NMS operator error detected during test. Will apply fallback solution.") | |
| # If this was already in CPU mode and still failed, we need a different approach | |
| if force_cpu: | |
| logger.error("Model failed even in CPU mode. Manual implementation will be used.") | |
| # We'll continue but use the custom NMS function instead when needed | |
| else: | |
| # Try again with CPU mode forced | |
| logger.info("Retrying with CPU mode forced") | |
| os.unlink(temp_path) | |
| return initialize_yolo_model(force_cpu=True) | |
| elif "Couldn't load custom C++ ops" in error_msg: | |
| # Version incompatibility detected | |
| logger.warning(f"PyTorch/Torchvision version incompatibility detected: {error_msg}") | |
| os.unlink(temp_path) | |
| logger.info("Will use fallback detection methods due to incompatible versions") | |
| return None | |
| else: | |
| raise | |
| except AttributeError as e: | |
| # Handle torchvision circular import errors | |
| if "has no attribute 'extension'" in str(e): | |
| logger.warning(f"Torchvision circular import detected: {e}") | |
| os.unlink(temp_path) | |
| logger.info("Will use fallback detection methods") | |
| return None | |
| else: | |
| raise | |
| except Exception as e: | |
| logger.warning(f"Model test threw exception: {e}") | |
| os.unlink(temp_path) | |
| return model | |
| except Exception as e: | |
| logger.error(f"Failed to initialize YOLO model: {str(e)}") | |
| return None | |
| async def detect_objects_in_image(image_url: str) -> Optional[Dict]: | |
| """ | |
| Detect objects in an image using YOLO model and return detection results. | |
| If successful, returns a dictionary with detection results and annotated image URL. | |
| If failed, returns None or falls back to color-based detection. | |
| """ | |
| if not HAS_CV2: | |
| logger.warning("Object detection disabled: OpenCV not available") | |
| return None | |
| global yolo_model | |
| temp_path = None | |
| try: | |
| # Download the image | |
| image_data = await download_image(image_url) | |
| if not image_data: | |
| logger.error("Failed to download image for object detection") | |
| return None | |
| # Create a temporary file for the image | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file: | |
| temp_path = temp_file.name | |
| temp_file.write(image_data) | |
| # First check if YOLO and PyTorch are available | |
| if not HAS_YOLO or not HAS_TORCH: | |
| logger.warning("YOLO or PyTorch not available - using fallback detection") | |
| if HAS_FALLBACK: | |
| logger.info("Using color-based fallback detection method") | |
| return await run_fallback_detection(temp_path) | |
| return None | |
| # Load YOLO model if not already loaded | |
| if yolo_model is None: | |
| logger.info("Initializing YOLO model for object detection") | |
| yolo_model = initialize_yolo_model() | |
| if yolo_model is None: | |
| logger.warning("Failed to initialize YOLO model - using fallback") | |
| if HAS_FALLBACK: | |
| logger.info("Using color-based fallback detection method") | |
| return await run_fallback_detection(temp_path) | |
| return None | |
| # Run inference with error handling and potential retry | |
| logger.info(f"Running YOLO inference on image: {temp_path}") | |
| try: | |
| # Try with default settings | |
| results = yolo_model(temp_path) | |
| except (RuntimeError, AttributeError) as e: | |
| # Handle both NMS operator errors and torchvision circular import errors | |
| error_msg = str(e) | |
| logger.warning(f"YOLO inference error detected: {error_msg}") | |
| # Check for torchvision circular import issue | |
| if "has no attribute 'extension'" in error_msg: | |
| logger.warning("Torchvision circular import detected - using fallback detection") | |
| return await run_fallback_detection(temp_path) | |
| # Check for custom C++ ops loading error (version incompatibility) | |
| if "Couldn't load custom C++ ops" in error_msg: | |
| logger.warning("PyTorch/Torchvision version incompatibility detected - using fallback detection") | |
| return await run_fallback_detection(temp_path) | |
| # Check for NMS operator error | |
| if "torchvision::nms does not exist" in error_msg: | |
| logger.warning("NMS operator error detected - trying workarounds") | |
| # Try to fix circular import issues with torchvision | |
| try: | |
| # First try direct import to fix circular import | |
| import torchvision.ops | |
| import torchvision.models | |
| try: | |
| import torchvision.extension | |
| except ImportError: | |
| # Mock the extension module to avoid circular import | |
| logger.info("Creating mock extension module for torchvision") | |
| sys.modules['torchvision.extension'] = type('', (), {})() | |
| except Exception as import_err: | |
| logger.warning(f"Couldn't resolve torchvision imports: {import_err}") | |
| # Try to reload model with forced CPU mode | |
| try: | |
| # Force CPU mode | |
| # We can access yolo_model directly since it's already declared global at module level | |
| yolo_model = None # Force model reload | |
| yolo_model = initialize_yolo_model(force_cpu=True) | |
| if yolo_model is None: | |
| logger.warning("Failed to reinitialize YOLO model - using fallback detection") | |
| return await run_fallback_detection(temp_path) | |
| # Try inference with reloaded model | |
| logger.info("Retrying with reloaded model in CPU mode") | |
| results = yolo_model(temp_path) | |
| except Exception as e2: | |
| logger.warning(f"CPU mode fallback failed: {str(e2)} - using fallback detection") | |
| return await run_fallback_detection(temp_path) | |
| else: | |
| # For any other error, use the fallback | |
| logger.error(f"Unknown YOLO error: {error_msg} - using fallback detection") | |
| return await run_fallback_detection(temp_path) | |
| # Process results | |
| detections = [] | |
| if results and len(results) > 0: | |
| result = results[0] # Get the first result | |
| # Convert the image to BGR (OpenCV format) | |
| img = cv2.imread(temp_path) | |
| if img is None: | |
| logger.error(f"Failed to read image at {temp_path}") | |
| return None | |
| # Convert to HSV for additional checks | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| img_height, img_width = img.shape[:2] | |
| # Check if this is a beach/water scene | |
| is_beach_scene = detect_beach_scene(img, hsv) | |
| is_water_scene = detect_water_scene(img, hsv) | |
| if is_beach_scene: | |
| logger.info("Beach scene detected - optimizing for beach plastic detection") | |
| if is_water_scene: | |
| logger.info("Water scene detected - optimizing for marine pollution detection") | |
| # STEP 1: Run specialized detection routines first | |
| specialized_detections = [] | |
| # Custom plastic bottle detection | |
| plastic_bottle_regions = [] | |
| if is_beach_scene: | |
| # More aggressive bottle detection for beach scenes | |
| plastic_bottle_regions = detect_plastic_bottles_in_beach(img, hsv) | |
| else: | |
| # Standard bottle detection for all scenes | |
| plastic_bottle_regions = detect_plastic_bottles(img, hsv) | |
| # Add plastic bottle detections | |
| if plastic_bottle_regions: | |
| logger.info(f"Specialized detector found {len(plastic_bottle_regions)} potential plastic bottles") | |
| # Add these detections with high confidence | |
| for region in plastic_bottle_regions: | |
| specialized_detections.append({ | |
| "class": "plastic bottle", | |
| "confidence": region.get("confidence", 0.9), | |
| "bbox": region["bbox"], | |
| "method": "specialized_bottle_detector" | |
| }) | |
| # Ship detection for water scenes | |
| if is_water_scene: | |
| ship_regions = detect_ships(img, hsv) | |
| if ship_regions: | |
| logger.info(f"Specialized detector found {len(ship_regions)} potential ships") | |
| # Add these detections with high confidence | |
| for region in ship_regions: | |
| specialized_detections.append({ | |
| "class": "ship", | |
| "confidence": region.get("confidence", 0.85), | |
| "bbox": region["bbox"], | |
| "method": "specialized_ship_detector" | |
| }) | |
| # Add specialized detections to our main detections list | |
| detections.extend(specialized_detections) | |
| # STEP 2: Process YOLO detections with enhanced classification | |
| # List of problematic classes that are often confused with plastic waste | |
| problematic_classes = ["airplane", "car", "boat", "traffic light", "truck", "bus", "person", "bench", | |
| "backpack", "handbag", "bottle", "cup", "bowl", "chair", "sofa", "box"] | |
| marine_waste_classes = ["bottle", "cup", "plastic", "waste", "debris", "bag", "trash", "container", | |
| "box", "package", "carton", "wrapper"] | |
| ship_classes = ["boat", "ship", "yacht", "vessel", "speedboat", "sailboat", "barge", "tanker"] | |
| # Potentially pollution-related classes from COCO dataset | |
| pollution_coco_ids = [39, 41, 43, 44, 65, 67, 72, 73, 76] # bottle, cup, knife, spoon, remote, cellphone, etc. | |
| # Use extremely low confidence threshold for beach/water scenes | |
| min_confidence = 0.01 if (is_beach_scene or is_water_scene) else GENERAL_CONF_THRESHOLD | |
| # Get all boxes from the results | |
| logger.info(f"Processing {len(result.boxes)} YOLO detections") | |
| # Create a list to track suspicious ROIs for detailed analysis | |
| suspicious_regions = [] | |
| for box in result.boxes: | |
| x1, y1, x2, y2 = map(int, box.xyxy[0]) | |
| confidence = float(box.conf[0]) | |
| class_id = int(box.cls[0]) | |
| # Use even lower confidence threshold for bigger models | |
| # Larger models are more accurate so we can trust lower confidence predictions | |
| try: | |
| if yolo_model is not None and hasattr(yolo_model, 'model') and hasattr(yolo_model.model, 'yaml'): | |
| # Try to get model size from the model name | |
| model_name = str(yolo_model.model.yaml.get('yaml_file', '')) | |
| if 'yolov8x' in model_name.lower(): | |
| min_confidence = 0.003 # Accept even lower confidence detections | |
| elif 'yolov8l' in model_name.lower(): | |
| min_confidence = 0.004 | |
| except Exception: | |
| pass # Use default min_confidence if we can't determine model size | |
| # Skip only extremely low confidence detections | |
| if confidence < min_confidence: | |
| continue | |
| # Add location and size-based confidence boost | |
| # Objects in certain regions are more likely to be relevant | |
| # Calculate relative position and size | |
| img_height, img_width = img.shape[:2] | |
| rel_width = (x2 - x1) / img_width | |
| rel_height = (y2 - y1) / img_height | |
| rel_area = rel_width * rel_height | |
| rel_y_pos = (y1 + y2) / 2 / img_height # Vertical center position | |
| # Boost confidence for objects of appropriate size in water scenes | |
| # Small to medium objects in the water are more likely to be floating debris | |
| if is_water_scene and 0.01 < rel_area < 0.2: | |
| confidence = min(0.99, confidence * 1.25) # 25% boost | |
| # Get class name | |
| if hasattr(result, 'names') and class_id in result.names: | |
| class_name = result.names[class_id] | |
| elif class_id in POLLUTION_RELATED_CLASSES: | |
| class_name = POLLUTION_RELATED_CLASSES[class_id] | |
| else: | |
| class_name = f"class_{class_id}" | |
| # Boost confidence for ships and boats in water scenes | |
| if is_water_scene and any(ship_class in class_name.lower() for ship_class in ship_classes): | |
| confidence = min(0.95, confidence * 1.5) # Boost confidence by 50% | |
| # Boost confidence for waste in beach scenes | |
| if is_beach_scene and any(waste_class in class_name.lower() for waste_class in marine_waste_classes): | |
| confidence = min(0.95, confidence * 1.5) # Boost confidence by 50% | |
| # MAJOR CHANGE: Extremely aggressive reclassification in beach/water scenes | |
| # For beach/water scenes, any object detection might actually be a plastic bottle | |
| if is_beach_scene or is_water_scene: | |
| # Extract ROI for analysis | |
| roi = img[max(0, y1):min(img_height, y2), max(0, x1):min(img_width, x2)] | |
| if roi.size == 0: | |
| continue | |
| # Convert ROI to HSV for plastic detection | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| # First check if this might be a ship in water scenes | |
| is_ship = is_water_scene and check_for_ship(roi, roi_hsv) | |
| # Check for plastic bottle characteristics regardless of class | |
| is_plastic_bottle = check_for_plastic_bottle(roi, roi_hsv) | |
| # Check object shape | |
| object_shape = analyze_object_shape(roi) | |
| # Check for general waste | |
| is_waste, waste_type, waste_confidence = detect_general_waste(roi, roi_hsv) | |
| # Hierarchical classification | |
| if is_ship and is_water_scene: | |
| # Reclassify to ship with high confidence | |
| class_name = "ship" | |
| confidence = 0.9 | |
| logger.info(f"Reclassified {class_id} as ship") | |
| elif class_name.lower() == "airplane" or is_plastic_bottle or object_shape == "bottle-like": | |
| # Reclassify to plastic bottle with high confidence | |
| class_name = "plastic bottle" | |
| confidence = 0.95 | |
| logger.info(f"Reclassified {class_id} as plastic bottle") | |
| elif check_for_plastic_waste(roi, roi_hsv): | |
| # Reclassify to general plastic waste | |
| class_name = "plastic waste" | |
| confidence = 0.85 | |
| logger.info(f"Reclassified {class_id} as general plastic waste") | |
| elif is_waste and waste_confidence > confidence: | |
| # Use the general waste detector result | |
| class_name = waste_type | |
| confidence = waste_confidence | |
| logger.info(f"Reclassified {class_id} as {waste_type}") | |
| # Handle class 39 (bottle) -> always plastic bottle in beach scene | |
| if class_id == 39 or "bottle" in class_name.lower(): | |
| class_name = "plastic bottle" | |
| confidence = 0.98 # Very high confidence | |
| # Context-specific confidence boost for beach scenes | |
| if "plastic" in class_name.lower(): | |
| confidence = min(0.99, confidence * 1.5) # Big confidence boost | |
| # For non-beach scenes, still do smart processing | |
| else: | |
| # Extract ROI for analysis | |
| roi = img[max(0, y1):min(img_height, y2), max(0, x1):min(img_width, x2)] | |
| if roi.size > 0: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| # Check specifically for problematic classes | |
| if class_name.lower() in problematic_classes: | |
| if check_for_plastic_bottle(roi, roi_hsv): | |
| class_name = "plastic bottle" | |
| confidence = 0.8 | |
| elif check_for_plastic_waste(roi, roi_hsv): | |
| class_name = "plastic waste" | |
| confidence = 0.7 | |
| # Skip if not a pollution-related class after all the checks | |
| if not (class_name.lower() in ["plastic bottle", "plastic waste", "bottle"] or | |
| "plastic" in class_name.lower() or | |
| "bottle" in class_name.lower()): | |
| continue | |
| # Add to detections list | |
| detections.append({ | |
| "class": class_name, | |
| "confidence": round(confidence, 3), | |
| "bbox": [x1, y1, x2, y2] | |
| }) | |
| # STEP 3: Merge overlapping detections and remove duplicates | |
| if len(detections) > 1: | |
| detections = merge_overlapping_detections(detections) | |
| # STEP 4: Draw all detections on the image with enhanced visualization | |
| # Add scene information at the top of the image (much smaller text) | |
| scene_info = [] | |
| if is_beach_scene: | |
| scene_info.append("Beach") | |
| if is_water_scene: | |
| scene_info.append("Water") | |
| # Simplified header - just scene and object count, with smaller text | |
| scene_type = ' + '.join(scene_info) if scene_info else 'Unknown' | |
| header_text = f"Scene: {scene_type} | Objects: {len(detections)}" | |
| # Use a semi-transparent overlay instead of solid black | |
| overlay = img.copy() | |
| cv2.rectangle(overlay, (5, 5), (5 + len(header_text) * 4 + 10, 20), (0, 0, 0), -1) | |
| cv2.addWeighted(overlay, 0.6, img, 0.4, 0, img) | |
| # Much smaller text with thinner font | |
| cv2.putText(img, header_text, (10, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) | |
| # Use a color mapping for different object types | |
| color_map = { | |
| "plastic bottle": (0, 0, 255), # Red for bottles | |
| "plastic waste": (0, 165, 255), # Orange for general waste | |
| "ship": (255, 0, 0), # Blue for ships | |
| "bottle": (0, 0, 255), # Red for bottles | |
| "waste": (0, 165, 255), # Orange for waste | |
| "debris": (0, 165, 255) # Orange for debris | |
| } | |
| # Define default color and get the model type if available | |
| default_color = (0, 255, 0) # Default green | |
| for det in detections: | |
| x1, y1, x2, y2 = det["bbox"] | |
| class_name = det["class"] | |
| confidence = det["confidence"] | |
| method = det.get("method", "yolo") | |
| # Get color for this detection type | |
| color = color_map.get(class_name.lower(), default_color) | |
| # Adjust thickness based on confidence and detection method | |
| base_thickness = 2 | |
| if confidence > 0.7: | |
| base_thickness += 1 | |
| if method == "specialized_bottle_detector" or method == "specialized_ship_detector": | |
| base_thickness += 1 | |
| # Draw a semi-transparent filled rectangle for the detection area | |
| overlay = img.copy() | |
| cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1) # Filled rectangle | |
| cv2.addWeighted(overlay, 0.2, img, 0.8, 0, img) # 20% opacity | |
| # Draw the border with appropriate thickness | |
| cv2.rectangle(img, (x1, y1), (x2, y2), color, base_thickness) | |
| # Create background for text | |
| label = f"{class_name}: {confidence:.2f}" | |
| (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) | |
| cv2.rectangle(img, (x1, y1 - 25), (x1 + text_width, y1), color, -1) | |
| # Add label with confidence and detection method | |
| cv2.putText(img, label, (x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) | |
| # Remove duplicate detections (if plastic bottle is detected multiple ways) | |
| if len(detections) > 1: | |
| filtered_detections = [] | |
| boxes = [] | |
| for det in detections: | |
| bbox = det["bbox"] | |
| boxes.append([bbox[0], bbox[1], bbox[2], bbox[3]]) | |
| # Convert to numpy arrays for NMS | |
| boxes = np.array(boxes).astype(np.float32) | |
| scores = np.array([det["confidence"] for det in detections]).astype(np.float32) | |
| try: | |
| # Try to use torchvision NMS if available | |
| if HAS_TORCH and hasattr(torchvision, "ops"): | |
| try: | |
| import torch | |
| boxes_tensor = torch.from_numpy(boxes) | |
| scores_tensor = torch.from_numpy(scores) | |
| keep_indices = torchvision.ops.nms(boxes_tensor, scores_tensor, iou_threshold=0.4).cpu().numpy() | |
| except Exception: | |
| # Fall back to custom NMS | |
| keep_indices = custom_nms(boxes, scores, iou_threshold=0.4) | |
| else: | |
| # Use custom NMS implementation | |
| keep_indices = custom_nms(boxes, scores, iou_threshold=0.4) | |
| # Keep only non-overlapping detections | |
| filtered_detections = [detections[i] for i in keep_indices] | |
| detections = filtered_detections | |
| except Exception as e: | |
| logger.warning(f"NMS failed: {e} - using all detections") | |
| # Save the annotated image | |
| annotated_image_path = f"{temp_path}_annotated.jpg" | |
| cv2.imwrite(annotated_image_path, img) | |
| # Upload the annotated image to Cloudinary | |
| annotated_image_url = await upload_to_cloudinary(annotated_image_path) | |
| # Clean up | |
| try: | |
| os.unlink(annotated_image_path) | |
| except Exception as e: | |
| logger.error(f"Failed to delete temporary annotated image: {e}") | |
| # Record scene type in the response | |
| scene_type = None | |
| if is_beach_scene and is_water_scene: | |
| scene_type = "coastal" | |
| elif is_beach_scene: | |
| scene_type = "beach" | |
| elif is_water_scene: | |
| scene_type = "water" | |
| # Add method information to each detection | |
| for det in detections: | |
| if "method" not in det: | |
| det["method"] = "yolo" | |
| # Get model information for the response | |
| model_info = {} | |
| if yolo_model is not None: | |
| try: | |
| # Handle different return types from model.info() | |
| info_result = yolo_model.info() if hasattr(yolo_model, 'info') else None | |
| # Determine model type | |
| model_type = "unknown" | |
| if isinstance(info_result, dict): | |
| model_type = info_result.get('model_type', 'unknown') | |
| elif isinstance(info_result, tuple) and len(info_result) > 0: | |
| # New versions return tuple: try to extract model info from tuple | |
| model_type = str(info_result[0]) if info_result else 'unknown' | |
| # Try to get model name from file path or model itself | |
| model_name = "YOLOv8" | |
| if hasattr(yolo_model, 'model') and hasattr(yolo_model.model, 'yaml'): | |
| yaml_file = yolo_model.model.yaml.get('yaml_file', '') | |
| if 'yolov8x' in str(yaml_file).lower(): | |
| model_name = "YOLOv8x" | |
| elif 'yolov8l' in str(yaml_file).lower(): | |
| model_name = "YOLOv8l" | |
| elif 'yolov8m' in str(yaml_file).lower(): | |
| model_name = "YOLOv8m" | |
| model_info = { | |
| "model_type": model_type, | |
| "model_name": model_name, | |
| "framework": "YOLOv8", | |
| } | |
| logger.info(f"Using {model_name} model for detection") | |
| except Exception as e: | |
| logger.warning(f"Could not get model info: {e}") | |
| model_info = {"model_type": "unknown", "model_name": "YOLO", "framework": "YOLOv8"} | |
| # Return the results with model information | |
| return { | |
| "detections": detections, | |
| "annotated_image_url": annotated_image_url, | |
| "detection_count": len(detections), | |
| "scene_type": scene_type, | |
| "model_info": model_info # Include model information in the response | |
| } | |
| return {"detections": [], "detection_count": 0, "annotated_image_url": None} | |
| except Exception as e: | |
| logger.error(f"Object detection failed: {e}", exc_info=True) | |
| return None | |
| finally: | |
| # Clean up the temporary file | |
| if temp_path and os.path.exists(temp_path): | |
| try: | |
| os.unlink(temp_path) | |
| logger.info(f"Deleted temporary file: {temp_path}") | |
| except Exception as e: | |
| logger.error(f"Failed to delete temporary file: {e}") | |
| async def download_image(url: str) -> Optional[bytes]: | |
| """Download an image from a URL and return its bytes""" | |
| try: | |
| # Use requests to download the image | |
| response = requests.get(url, timeout=10) | |
| response.raise_for_status() | |
| return response.content | |
| except Exception as e: | |
| logger.error(f"Failed to download image: {e}") | |
| return None | |
| async def run_fallback_detection(image_path: str) -> Dict: | |
| """ | |
| Run the fallback detection when YOLO is not available or fails. | |
| Args: | |
| image_path: Path to the image file | |
| Returns: | |
| Dictionary with detection results | |
| """ | |
| try: | |
| # Use the fallback detection module | |
| if not HAS_FALLBACK: | |
| logger.error("Fallback detection module not available") | |
| return {"detections": [], "detection_count": 0, "annotated_image_url": None} | |
| # Run fallback detection | |
| results = fallback_detection.fallback_detect_objects(image_path) | |
| # If we have a path to an annotated image, upload it | |
| if "annotated_image_path" in results and results["annotated_image_path"]: | |
| try: | |
| annotated_image_url = await upload_to_cloudinary(results["annotated_image_path"]) | |
| results["annotated_image_url"] = annotated_image_url | |
| # Clean up the temporary annotated file | |
| os.unlink(results["annotated_image_path"]) | |
| except Exception as e: | |
| logger.error(f"Failed to upload fallback annotated image: {str(e)}") | |
| logger.info(f"Fallback detection found {results.get('detection_count', 0)} possible objects") | |
| return results | |
| except Exception as e: | |
| logger.error(f"Fallback detection failed: {str(e)}", exc_info=True) | |
| return {"detections": [], "detection_count": 0, "annotated_image_url": None} | |
| def is_beach_scene(img): | |
| """ | |
| Detect if an image shows a beach scene (sand, water, horizon line) | |
| Args: | |
| img: OpenCV image in BGR format | |
| Returns: | |
| Boolean indicating if the image is likely a beach scene | |
| """ | |
| try: | |
| # Convert to HSV for better color segmentation | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| h, w = img.shape[:2] | |
| # Define color ranges for sand/beach | |
| sand_lower = np.array([15, 20, 100]) | |
| sand_upper = np.array([35, 180, 255]) | |
| # Define color ranges for water (blue/green tones) | |
| water_lower = np.array([80, 30, 30]) | |
| water_upper = np.array([140, 255, 255]) | |
| # Create masks for sand and water | |
| sand_mask = cv2.inRange(hsv, sand_lower, sand_upper) | |
| water_mask = cv2.inRange(hsv, water_lower, water_upper) | |
| # Calculate the percentage of sand and water pixels | |
| sand_ratio = np.sum(sand_mask > 0) / (h * w) | |
| water_ratio = np.sum(water_mask > 0) / (h * w) | |
| # Check for horizon line using edge detection | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| edges = cv2.Canny(gray, 50, 150) | |
| # Apply Hough Line Transform to detect straight horizontal lines | |
| lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100, minLineLength=w//3, maxLineGap=20) | |
| has_horizon = False | |
| if lines is not None: | |
| for line in lines: | |
| x1, y1, x2, y2 = line[0] | |
| angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi) | |
| # Look for horizontal lines (+/- 10 degrees) | |
| if angle < 10 or angle > 170: | |
| # Check if it's in the middle third of the image (typical horizon position) | |
| y_pos = (y1 + y2) / 2 | |
| if h/4 < y_pos < 3*h/4: | |
| has_horizon = True | |
| break | |
| # Consider it a beach if we have significant sand or water AND | |
| # either have both elements OR have a horizon line | |
| return ((sand_ratio > 0.15 or water_ratio > 0.2) and | |
| (sand_ratio + water_ratio > 0.3 or has_horizon)) | |
| except Exception as e: | |
| logger.error(f"Error in beach scene detection: {e}") | |
| return False | |
| def is_water_scene(img): | |
| """ | |
| Detect if an image shows a water scene (ocean, lake, river) | |
| Args: | |
| img: OpenCV image in BGR format | |
| Returns: | |
| Boolean indicating if the image is likely a water scene | |
| """ | |
| try: | |
| # Convert to HSV for better color segmentation | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| h, w = img.shape[:2] | |
| # Define color ranges for water (blue/green tones) | |
| blue_water_lower = np.array([80, 30, 30]) | |
| blue_water_upper = np.array([140, 255, 255]) | |
| # Define color ranges for darker water | |
| dark_water_lower = np.array([80, 10, 10]) | |
| dark_water_upper = np.array([140, 180, 180]) | |
| # Define color ranges for greenish water | |
| green_water_lower = np.array([40, 30, 30]) | |
| green_water_upper = np.array([90, 180, 200]) | |
| # Create masks for different water colors | |
| blue_water_mask = cv2.inRange(hsv, blue_water_lower, blue_water_upper) | |
| dark_water_mask = cv2.inRange(hsv, dark_water_lower, dark_water_upper) | |
| green_water_mask = cv2.inRange(hsv, green_water_lower, green_water_upper) | |
| # Combine masks | |
| water_mask = cv2.bitwise_or(blue_water_mask, dark_water_mask) | |
| water_mask = cv2.bitwise_or(water_mask, green_water_mask) | |
| # Calculate the percentage of water pixels | |
| water_ratio = np.sum(water_mask > 0) / (h * w) | |
| # Check for horizon line using edge detection | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| edges = cv2.Canny(gray, 50, 150) | |
| # Apply Hough Line Transform to detect straight horizontal lines | |
| lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100, minLineLength=w//3, maxLineGap=20) | |
| has_horizon = False | |
| if lines is not None: | |
| for line in lines: | |
| x1, y1, x2, y2 = line[0] | |
| angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi) | |
| # Look for horizontal lines (+/- 10 degrees) | |
| if angle < 10 or angle > 170: | |
| # Check if it's in the middle third of the image (typical horizon position) | |
| y_pos = (y1 + y2) / 2 | |
| if h/4 < y_pos < 3*h/4: | |
| has_horizon = True | |
| break | |
| # It's a water scene if significant portion is water-colored or has horizon with some water | |
| return water_ratio > 0.3 or (water_ratio > 0.15 and has_horizon) | |
| except Exception as e: | |
| logger.error(f"Error in water scene detection: {e}") | |
| return False | |
| def analyze_object_shape(roi): | |
| """ | |
| Analyze the shape of an object to determine if it looks like a bottle, ship, etc. | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| Returns: | |
| String indicating the likely shape category | |
| """ | |
| try: | |
| # Convert to grayscale | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| # Apply threshold to get binary image | |
| _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) | |
| # Find contours | |
| contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # No contours found | |
| if not contours: | |
| return "unknown" | |
| # Use the largest contour | |
| contour = max(contours, key=cv2.contourArea) | |
| # Calculate shape metrics | |
| area = cv2.contourArea(contour) | |
| perimeter = cv2.arcLength(contour, True) | |
| x, y, w, h = cv2.boundingRect(contour) | |
| # Skip if area is too small | |
| if area < 100: | |
| return "unknown" | |
| # Calculate aspect ratio | |
| aspect_ratio = float(w) / h if h > 0 else 0 | |
| # Calculate circularity | |
| circularity = 4 * np.pi * area / (perimeter * perimeter) if perimeter > 0 else 0 | |
| # Calculate extent (ratio of contour area to bounding rectangle area) | |
| extent = float(area) / (w * h) if w * h > 0 else 0 | |
| # Identify shape based on metrics | |
| if 0.2 < aspect_ratio < 0.7 and circularity < 0.8 and extent > 0.4: | |
| return "bottle-like" | |
| elif aspect_ratio > 3 and circularity < 0.3: | |
| return "elongated" # could be floating debris | |
| elif aspect_ratio < 0.3 and circularity < 0.3: | |
| return "tall-thin" # could be standing bottle | |
| elif 0.85 < circularity and extent > 0.7: | |
| return "circular" # could be bottle cap or small debris | |
| elif aspect_ratio > 2 and extent > 0.6: | |
| return "ship-like" # horizontally elongated with high fill ratio | |
| else: | |
| return "irregular" | |
| except Exception as e: | |
| logger.error(f"Error in shape analysis: {e}") | |
| return "unknown" | |
| def check_for_plastic_bottle(roi, roi_hsv=None): | |
| """ | |
| Check if a region of interest contains a plastic bottle based on color and texture | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| roi_hsv: Pre-computed HSV region (optional) | |
| Returns: | |
| Boolean indicating if a plastic bottle was detected | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| # Skip invalid ROIs | |
| if h == 0 or w == 0: | |
| return False | |
| # Look for clear/translucent plastic colors (broader range) | |
| clear_plastic_mask = cv2.inRange( | |
| roi_hsv, | |
| np.array([0, 0, 120]), # Lower threshold to catch more plastic | |
| np.array([180, 80, 255]) # Higher saturation tolerance | |
| ) | |
| clear_ratio = np.sum(clear_plastic_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Look for blue plastic colors (common in water bottles) | |
| blue_plastic_mask = cv2.inRange( | |
| roi_hsv, | |
| np.array([85, 40, 40]), # Wider blue range | |
| np.array([135, 255, 255]) | |
| ) | |
| blue_ratio = np.sum(blue_plastic_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Look for colored plastic (expanded colors) | |
| colored_plastic_mask = cv2.inRange( | |
| roi_hsv, | |
| np.array([0, 50, 100]), # Catch any colored plastics | |
| np.array([180, 255, 255]) | |
| ) | |
| colored_ratio = np.sum(colored_plastic_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Look for blue plastic cap colors | |
| blue_cap_mask = cv2.inRange( | |
| roi_hsv, | |
| np.array([90, 80, 80]), | |
| np.array([140, 255, 255]) | |
| ) | |
| blue_cap_ratio = np.sum(blue_cap_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Check object shape | |
| bottle_shape = analyze_object_shape(roi) | |
| # Calculate aspect ratio directly (bottles are typically taller than wide) | |
| aspect_ratio = w / h if h > 0 else 0 | |
| direct_bottle_shape = 0.1 < aspect_ratio < 0.9 # Very permissive aspect ratio | |
| # Check for uniform texture (plastic bottles tend to have uniform regions) | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| std_dev = np.std(gray) | |
| uniform_texture = std_dev < 60 # More permissive texture threshold | |
| # Combination of factors to determine if it's a bottle - MUCH more permissive now | |
| is_bottle_shape = bottle_shape in ["bottle-like", "tall-thin"] or direct_bottle_shape | |
| has_plastic_colors = clear_ratio > 0.2 or blue_ratio > 0.2 or colored_ratio > 0.3 | |
| has_bottle_cap = blue_cap_ratio > 0.03 | |
| # More permissive combination | |
| return (is_bottle_shape and has_plastic_colors) or \ | |
| (has_plastic_colors and has_bottle_cap) or \ | |
| (is_bottle_shape and uniform_texture and (clear_ratio > 0.1 or blue_ratio > 0.1)) | |
| def check_for_plastic_waste(roi, roi_hsv=None): | |
| """ | |
| Check if a region of interest contains plastic waste based on color and texture | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| roi_hsv: Pre-computed HSV region (optional) | |
| Returns: | |
| Boolean indicating if plastic waste was detected | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| # Skip invalid ROIs | |
| if h == 0 or w == 0: | |
| return False | |
| # Look for plastic-like colors - much broader range | |
| plastic_colors_mask = cv2.inRange( | |
| roi_hsv, | |
| np.array([0, 0, 80]), # Lower threshold to catch more varied plastics | |
| np.array([180, 120, 255]) # Higher saturation tolerance | |
| ) | |
| plastic_ratio = np.sum(plastic_colors_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Look for bright colored plastics (packaging, etc.) | |
| bright_plastic_mask = cv2.inRange( | |
| roi_hsv, | |
| np.array([0, 80, 120]), # More permissive for colored plastics | |
| np.array([180, 255, 255]) | |
| ) | |
| bright_ratio = np.sum(bright_plastic_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Check for white/gray plastic specifically | |
| white_plastic_mask = cv2.inRange( | |
| roi_hsv, | |
| np.array([0, 0, 120]), | |
| np.array([180, 50, 255]) | |
| ) | |
| white_ratio = np.sum(white_plastic_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Get standard deviation of hue and saturation (plastics often have uniform color) | |
| h_std = np.std(roi_hsv[:,:,0]) | |
| s_std = np.std(roi_hsv[:,:,1]) | |
| v_std = np.std(roi_hsv[:,:,2]) | |
| # Look for unnatural colors (not common in natural scenes) | |
| # For synthetic materials like plastic waste | |
| unnatural_mask = np.zeros_like(roi_hsv[:,:,0]) | |
| # Neon colors | |
| neon_mask = cv2.inRange(roi_hsv, np.array([0, 150, 150]), np.array([180, 255, 255])) | |
| unnatural_mask = cv2.bitwise_or(unnatural_mask, neon_mask) | |
| # Light blue (uncommon in nature) | |
| light_blue_mask = cv2.inRange(roi_hsv, np.array([90, 50, 200]), np.array([110, 150, 255])) | |
| unnatural_mask = cv2.bitwise_or(unnatural_mask, light_blue_mask) | |
| # Bright red/orange (uncommon in nature) | |
| bright_red_mask = cv2.inRange(roi_hsv, np.array([0, 150, 150]), np.array([20, 255, 255])) | |
| unnatural_mask = cv2.bitwise_or(unnatural_mask, bright_red_mask) | |
| unnatural_ratio = np.sum(unnatural_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Convert to grayscale for edge detection | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| edges = cv2.Canny(gray, 50, 150) | |
| edge_ratio = np.sum(edges > 0) / (roi.shape[0] * roi.shape[1]) | |
| # Check if it has plastic-like colors and uniform appearance - more permissive | |
| has_plastic_colors = plastic_ratio > 0.25 or bright_ratio > 0.2 or white_ratio > 0.3 or unnatural_ratio > 0.1 | |
| has_uniform_appearance = h_std < 45 and s_std < 70 | |
| # Additional check for man-made objects: uniform regions with defined edges | |
| has_defined_edges = 0.01 < edge_ratio < 0.3 and v_std < 50 | |
| # More permissive criteria - any of these combinations could indicate plastic waste | |
| return (has_plastic_colors and has_uniform_appearance) or \ | |
| (has_plastic_colors and has_defined_edges) or \ | |
| (unnatural_ratio > 0.15) or \ | |
| (white_ratio > 0.4 and edge_ratio > 0.01) | |
| def check_for_ship(roi, roi_hsv=None): | |
| """ | |
| Check if a region of interest contains a ship based on shape and color | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| roi_hsv: Pre-computed HSV region (optional) | |
| Returns: | |
| Boolean indicating if a ship was detected | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| # Skip invalid ROIs | |
| if h == 0 or w == 0: | |
| return False | |
| # Ship needs to have enough size | |
| if h < 20 or w < 20: | |
| return False | |
| # Check aspect ratio first - ships are typically wider than tall | |
| aspect_ratio = w / h | |
| if aspect_ratio < 1.2: # Ship must be wider than tall | |
| return False | |
| # Convert to grayscale for line detection | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| # Get edges | |
| edges = cv2.Canny(gray, 50, 150) | |
| # Look for horizontal lines (characteristic of ships) | |
| lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=40, minLineLength=w//3, maxLineGap=10) | |
| horizontal_lines = 0 | |
| if lines is not None: | |
| for line in lines: | |
| x1, y1, x2, y2 = line[0] | |
| angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi) | |
| # Count horizontal lines (stricter: +/- 5 degrees) | |
| if angle < 5 or angle > 175: | |
| # Only count lines with significant length | |
| if abs(x2 - x1) > w//3: | |
| horizontal_lines += 1 | |
| # Require more horizontal lines | |
| if horizontal_lines < 3: | |
| return False | |
| # Look for ship colors (white, gray, dark) | |
| white_mask = cv2.inRange(roi_hsv, np.array([0, 0, 180]), np.array([180, 30, 255])) | |
| gray_mask = cv2.inRange(roi_hsv, np.array([0, 0, 80]), np.array([180, 30, 150])) | |
| blue_mask = cv2.inRange(roi_hsv, np.array([90, 50, 50]), np.array([130, 255, 255])) | |
| white_ratio = np.sum(white_mask > 0) / (h * w) | |
| gray_ratio = np.sum(gray_mask > 0) / (h * w) | |
| blue_ratio = np.sum(blue_mask > 0) / (h * w) | |
| # Require higher color presence | |
| ship_color_present = (white_ratio + gray_ratio + blue_ratio) > 0.4 | |
| # Check object shape | |
| shape = analyze_object_shape(roi) | |
| ship_shape = shape == "ship-like" # Only use ship-like, not elongated which is too broad | |
| # Check for presence of water at the bottom of the region (ships are on water) | |
| if h > 30: | |
| bottom_roi = roi[int(h*2/3):h, :] | |
| if bottom_roi.size > 0: | |
| bottom_hsv = cv2.cvtColor(bottom_roi, cv2.COLOR_BGR2HSV) | |
| water_mask = cv2.inRange(bottom_hsv, np.array([80, 30, 30]), np.array([150, 255, 255])) | |
| water_ratio = np.sum(water_mask > 0) / (bottom_roi.shape[0] * bottom_roi.shape[1]) | |
| has_water = water_ratio > 0.3 | |
| else: | |
| has_water = False | |
| else: | |
| has_water = False | |
| # Combine all criteria - much more strict now | |
| return (horizontal_lines >= 3 and ship_color_present and aspect_ratio > 1.5) or (ship_shape and ship_color_present and has_water) | |
| async def upload_to_cloudinary(image_path: str) -> Optional[str]: | |
| """Upload an image to Cloudinary and return its URL""" | |
| try: | |
| # Check if Cloudinary is configured | |
| from ..config import get_settings | |
| settings = get_settings() | |
| if not settings.cloudinary_cloud_name or not settings.cloudinary_api_key or not settings.cloudinary_api_secret: | |
| logger.warning("Cloudinary not configured - using local storage for annotated image") | |
| # Save to local uploads folder instead | |
| from pathlib import Path | |
| upload_dir = Path("app/uploads") | |
| upload_dir.mkdir(exist_ok=True) | |
| filename = f"{uuid.uuid4().hex}.jpg" | |
| local_path = upload_dir / filename | |
| import shutil | |
| shutil.copy(image_path, local_path) | |
| # Return a local file URL | |
| return f"/uploads/{filename}" | |
| # Cloudinary is configured, proceed with upload | |
| upload_result = cloudinary.uploader.upload( | |
| image_path, | |
| folder="marine_guard_annotated", | |
| resource_type="auto" | |
| ) | |
| return upload_result["secure_url"] | |
| except Exception as e: | |
| logger.error(f"Failed to upload annotated image to Cloudinary: {e}") | |
| try: | |
| # Fallback to local storage | |
| from pathlib import Path | |
| upload_dir = Path("app/uploads") | |
| upload_dir.mkdir(exist_ok=True) | |
| filename = f"{uuid.uuid4().hex}_fallback.jpg" | |
| local_path = upload_dir / filename | |
| import shutil | |
| shutil.copy(image_path, local_path) | |
| logger.info(f"Saved annotated image locally as fallback: {local_path}") | |
| return f"/uploads/{filename}" | |
| except Exception as e2: | |
| logger.error(f"Local storage fallback also failed: {e2}") | |
| return None | |
| # -------------------- Helper Functions for Marine Pollution Detection -------------------- | |
| def detect_beach_scene(img, hsv): | |
| """ | |
| Detect if the image shows a beach scene | |
| Args: | |
| img: OpenCV image in BGR format | |
| hsv: HSV format of the same image | |
| Returns: | |
| True if beach scene detected, False otherwise | |
| """ | |
| # Detect sand/beach colors | |
| sand_mask = cv2.inRange( | |
| hsv, | |
| np.array([15, 0, 150]), # Light sand colors - broader range | |
| np.array([40, 80, 255]) | |
| ) | |
| # Check for presence of blue sky | |
| sky_mask = cv2.inRange( | |
| hsv, | |
| np.array([90, 50, 180]), # Blue sky | |
| np.array([130, 255, 255]) | |
| ) | |
| # Calculate ratio of sand and sky pixels | |
| sand_ratio = np.sum(sand_mask) / (hsv.shape[0] * hsv.shape[1] * 255) | |
| sky_ratio = np.sum(sky_mask) / (hsv.shape[0] * hsv.shape[1] * 255) | |
| # Return True if significant sand is detected (suggesting beach) | |
| return sand_ratio > 0.15 or (sand_ratio > 0.1 and sky_ratio > 0.2) | |
| def detect_water_scene(img, hsv): | |
| """ | |
| Detect if the image shows a water body (sea, ocean, lake) | |
| Args: | |
| img: OpenCV image in BGR format | |
| hsv: HSV format of the same image | |
| Returns: | |
| True if water scene detected, False otherwise | |
| """ | |
| # Detect water colors (blue/green tones) | |
| blue_water_mask = cv2.inRange( | |
| hsv, | |
| np.array([80, 30, 30]), # Broader range for water colors | |
| np.array([150, 255, 255]) | |
| ) | |
| # Define color ranges for darker water | |
| dark_water_mask = cv2.inRange(hsv, np.array([80, 10, 10]), np.array([140, 180, 180])) | |
| # Define color ranges for greenish water | |
| green_water_mask = cv2.inRange(hsv, np.array([40, 30, 30]), np.array([90, 180, 200])) | |
| # Combine masks | |
| water_mask = cv2.bitwise_or(blue_water_mask, dark_water_mask) | |
| water_mask = cv2.bitwise_or(water_mask, green_water_mask) | |
| # Calculate ratio of water pixels | |
| water_ratio = np.sum(water_mask) / (hsv.shape[0] * hsv.shape[1] * 255) | |
| # Check for horizon line using edge detection | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| edges = cv2.Canny(gray, 50, 150) | |
| # Apply Hough Line Transform to detect straight horizontal lines | |
| lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100, | |
| minLineLength=img.shape[1]//3, maxLineGap=20) | |
| has_horizon = False | |
| if lines is not None: | |
| for line in lines: | |
| x1, y1, x2, y2 = line[0] | |
| angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi) | |
| # Look for horizontal lines (+/- 10 degrees) | |
| if angle < 10 or angle > 170: | |
| # Check if it's in the middle third of the image (typical horizon position) | |
| y_pos = (y1 + y2) / 2 | |
| if img.shape[0]/4 < y_pos < 3*img.shape[0]/4: | |
| has_horizon = True | |
| break | |
| # Return True if significant water is detected or has horizon with some water | |
| return water_ratio > 0.25 or (water_ratio > 0.15 and has_horizon) | |
| def check_for_plastic_bottle(roi, roi_hsv=None): | |
| """ | |
| Check if an image region contains a plastic bottle | |
| Args: | |
| roi: Image region to analyze | |
| roi_hsv: HSV version of the roi (optional) | |
| Returns: | |
| True if likely plastic bottle, False otherwise | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| if h == 0 or w == 0: | |
| return False | |
| # Check bottle aspect ratio (usually taller than wide) | |
| aspect_ratio = w / h | |
| # Check for transparent/translucent plastic | |
| clear_mask = cv2.inRange(roi_hsv, np.array([0, 0, 150]), np.array([180, 60, 255])) | |
| clear_ratio = np.sum(clear_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Check for blue plastic (common for bottles) | |
| blue_mask = cv2.inRange(roi_hsv, np.array([90, 40, 100]), np.array([130, 255, 255])) | |
| blue_ratio = np.sum(blue_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Check for white plastic cap or label | |
| white_mask = cv2.inRange(roi_hsv, np.array([0, 0, 200]), np.array([180, 30, 255])) | |
| white_ratio = np.sum(white_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Bottle-like if it has right shape and color characteristics | |
| return ((0.2 < aspect_ratio < 0.8) and # Bottle shape | |
| (clear_ratio > 0.3 or blue_ratio > 0.3 or white_ratio > 0.2)) # Bottle colors | |
| def check_for_plastic_waste(roi, roi_hsv=None): | |
| """ | |
| Check if an image region contains plastic waste (broader than just bottles) | |
| Args: | |
| roi: Image region to analyze | |
| roi_hsv: HSV version of the roi (optional) | |
| Returns: | |
| True if likely plastic waste, False otherwise | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| # Check for common plastic colors | |
| plastic_mask = np.zeros_like(roi_hsv[:,:,0]) | |
| # Clear/white plastic | |
| clear_mask = cv2.inRange(roi_hsv, np.array([0, 0, 150]), np.array([180, 60, 255])) | |
| plastic_mask = cv2.bitwise_or(plastic_mask, clear_mask) | |
| # Blue plastic | |
| blue_mask = cv2.inRange(roi_hsv, np.array([90, 40, 100]), np.array([130, 255, 255])) | |
| plastic_mask = cv2.bitwise_or(plastic_mask, blue_mask) | |
| # Green plastic | |
| green_mask = cv2.inRange(roi_hsv, np.array([40, 40, 100]), np.array([80, 255, 255])) | |
| plastic_mask = cv2.bitwise_or(plastic_mask, green_mask) | |
| # Calculate ratio of plastic-like pixels | |
| plastic_ratio = np.sum(plastic_mask) / (roi_hsv.shape[0] * roi_hsv.shape[1] * 255) | |
| # Check if region has uniform texture (common for plastic) | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| texture_uniformity = np.std(gray) | |
| # Return True if significant plastic-like colors and texture | |
| return plastic_ratio > 0.4 or (plastic_ratio > 0.25 and texture_uniformity < 50) | |
| def detect_plastic_bottles(img, hsv=None): | |
| """ | |
| Specialized detector for plastic bottles using color and shape analysis | |
| Args: | |
| img: OpenCV image in BGR format | |
| hsv: HSV format of the same image (optional) | |
| Returns: | |
| List of detected plastic bottle regions with bounding boxes and confidence | |
| """ | |
| if hsv is None: | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| # Create a combined mask for common bottle colors | |
| bottle_mask = np.zeros_like(hsv[:,:,0]) | |
| # Clear/translucent plastic | |
| clear_mask = cv2.inRange(hsv, np.array([0, 0, 140]), np.array([180, 60, 255])) | |
| bottle_mask = cv2.bitwise_or(bottle_mask, clear_mask) | |
| # Blue plastic | |
| blue_mask = cv2.inRange(hsv, np.array([90, 40, 100]), np.array([130, 255, 255])) | |
| bottle_mask = cv2.bitwise_or(bottle_mask, blue_mask) | |
| # Green plastic | |
| green_mask = cv2.inRange(hsv, np.array([40, 40, 100]), np.array([80, 255, 255])) | |
| bottle_mask = cv2.bitwise_or(bottle_mask, green_mask) | |
| # Apply morphological operations to clean up the mask | |
| kernel = np.ones((5, 5), np.uint8) | |
| bottle_mask = cv2.morphologyEx(bottle_mask, cv2.MORPH_CLOSE, kernel) | |
| bottle_mask = cv2.morphologyEx(bottle_mask, cv2.MORPH_OPEN, kernel) | |
| # Find contours in the bottle mask | |
| contours, _ = cv2.findContours(bottle_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # Filter contours to find bottle-shaped objects | |
| detections = [] | |
| for contour in contours: | |
| area = cv2.contourArea(contour) | |
| if area < 200: # Skip small contours | |
| continue | |
| # Get bounding rectangle | |
| x, y, w, h = cv2.boundingRect(contour) | |
| # Skip if too small | |
| if w < 20 or h < 30: | |
| continue | |
| # Calculate aspect ratio | |
| aspect_ratio = float(w) / h if h > 0 else 0 | |
| # Check if shape matches bottle profile (usually taller than wide) | |
| if 0.2 < aspect_ratio < 0.8: | |
| # Extract ROI for additional checks | |
| roi = img[y:y+h, x:x+w] | |
| roi_hsv = hsv[y:y+h, x:x+w] | |
| # Check for bottle characteristics | |
| if check_for_plastic_bottle(roi, roi_hsv): | |
| detections.append({ | |
| "bbox": [x, y, x+w, y+h], | |
| "confidence": 0.85, | |
| "class": "plastic bottle" | |
| }) | |
| return detections | |
| def box_overlap(box1, box2): | |
| """ | |
| Calculate IoU (Intersection over Union) between two boxes | |
| Args: | |
| box1, box2: Boxes in format [x1, y1, x2, y2] | |
| Returns: | |
| IoU value between 0 and 1 | |
| """ | |
| # Calculate intersection | |
| x_left = max(box1[0], box2[0]) | |
| y_top = max(box1[1], box2[1]) | |
| x_right = min(box1[2], box2[2]) | |
| y_bottom = min(box1[3], box2[3]) | |
| if x_right < x_left or y_bottom < y_top: | |
| return 0.0 # No intersection | |
| intersection = (x_right - x_left) * (y_bottom - y_top) | |
| # Calculate areas | |
| box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) | |
| box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) | |
| # Calculate IoU | |
| union = box1_area + box2_area - intersection | |
| return intersection / union if union > 0 else 0 | |
| def merge_overlapping_detections(detections, iou_threshold=0.5): | |
| """ | |
| Merge overlapping detections, keeping the one with higher confidence | |
| Args: | |
| detections: List of detection dictionaries | |
| iou_threshold: Threshold for overlap detection | |
| Returns: | |
| List of merged detections | |
| """ | |
| if not detections: | |
| return [] | |
| # Sort by confidence (descending) | |
| sorted_detections = sorted(detections, key=lambda x: x["confidence"], reverse=True) | |
| merged = [] | |
| for det in sorted_detections: | |
| should_add = True | |
| # Check if it overlaps with any detection already in merged list | |
| for m in merged: | |
| overlap = box_overlap(det["bbox"], m["bbox"]) | |
| # If significant overlap and same/similar class, don't add | |
| if overlap > iou_threshold: | |
| if ("bottle" in det["class"].lower() and "bottle" in m["class"].lower()) or \ | |
| ("plastic" in det["class"].lower() and "plastic" in m["class"].lower()): | |
| should_add = False | |
| break | |
| if should_add: | |
| merged.append(det) | |
| return merged | |
| def analyze_object_shape(roi): | |
| """ | |
| Analyze the shape of an object to determine if it resembles a bottle | |
| Args: | |
| roi: Region of interest (image crop) | |
| Returns: | |
| String indicating the shape type | |
| """ | |
| if roi is None or roi.size == 0: | |
| return "unknown" | |
| # Convert to grayscale | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| # Apply threshold | |
| _, thresh = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY) | |
| # Find contours | |
| contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # If no contours found, return unknown | |
| if not contours: | |
| return "unknown" | |
| # Get largest contour | |
| largest_contour = max(contours, key=cv2.contourArea) | |
| # Calculate aspect ratio | |
| x, y, w, h = cv2.boundingRect(largest_contour) | |
| aspect_ratio = w / h if h > 0 else 0 | |
| # Calculate circularity | |
| area = cv2.contourArea(largest_contour) | |
| perimeter = cv2.arcLength(largest_contour, True) | |
| circularity = 4 * np.pi * area / (perimeter * perimeter) if perimeter > 0 else 0 | |
| # Bottle characteristics: typically taller than wide and not very circular | |
| if 0.2 < aspect_ratio < 0.7 and 0.4 < circularity < 0.75: | |
| return "bottle-like" | |
| # Irregular plastic waste | |
| elif circularity < 0.6: | |
| return "irregular" | |
| # Round objects | |
| elif circularity > 0.8: | |
| return "circular" | |
| else: | |
| return "unknown" | |
| # Special detection functions for different object types | |
| def detect_plastic_bottles(img, hsv=None): | |
| """ | |
| Specialized function to detect plastic bottles based on color and shape | |
| Args: | |
| img: OpenCV image in BGR format | |
| hsv: Optional pre-computed HSV image | |
| Returns: | |
| List of dictionaries with bbox and confidence for detected plastic bottles | |
| """ | |
| if hsv is None: | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| h, w = img.shape[:2] | |
| detections = [] | |
| # Apply color thresholding for typical plastic bottle colors | |
| # 1. Clear/transparent plastic | |
| clear_plastic_mask = cv2.inRange(hsv, np.array([0, 0, 140]), np.array([180, 70, 255])) | |
| # 2. Blue bottle caps | |
| blue_cap_mask = cv2.inRange(hsv, np.array([100, 100, 100]), np.array([130, 255, 255])) | |
| # 3. Blue plastic bottles | |
| blue_bottle_mask = cv2.inRange(hsv, np.array([90, 50, 50]), np.array([130, 255, 255])) | |
| # Combine masks | |
| combined_mask = cv2.bitwise_or(clear_plastic_mask, blue_cap_mask) | |
| combined_mask = cv2.bitwise_or(combined_mask, blue_bottle_mask) | |
| # Apply morphological operations to clean up the mask | |
| kernel = np.ones((5, 5), np.uint8) | |
| mask_cleaned = cv2.morphologyEx(combined_mask, cv2.MORPH_OPEN, kernel) | |
| mask_cleaned = cv2.morphologyEx(mask_cleaned, cv2.MORPH_CLOSE, kernel) | |
| # Find contours | |
| contours, _ = cv2.findContours(mask_cleaned, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # Process contours | |
| for contour in contours: | |
| # Filter by size | |
| area = cv2.contourArea(contour) | |
| if area < (h * w * 0.005): # Skip very small objects (less than 0.5% of image) | |
| continue | |
| # Get bounding box | |
| x, y, w_box, h_box = cv2.boundingRect(contour) | |
| # Calculate aspect ratio - bottles are usually taller than wide | |
| aspect_ratio = float(w_box) / h_box if h_box > 0 else 0 | |
| # Bottle shape criteria | |
| is_bottle_shape = 0.2 < aspect_ratio < 0.8 | |
| # Calculate confidence based on multiple factors | |
| confidence = 0.6 # Base confidence | |
| # Extract ROI for more detailed analysis | |
| roi = img[y:y+h_box, x:x+w_box] | |
| if roi.size > 0: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| # Check if ROI has bottle characteristics | |
| if check_for_plastic_bottle(roi, roi_hsv): | |
| confidence += 0.25 | |
| # Check for blue cap at the top of the potential bottle | |
| top_region = roi[:max(1, h_box//4), :] | |
| if top_region.size > 0: | |
| top_hsv = cv2.cvtColor(top_region, cv2.COLOR_BGR2HSV) | |
| blue_cap_mask = cv2.inRange(top_hsv, np.array([100, 100, 100]), np.array([130, 255, 255])) | |
| blue_cap_ratio = np.sum(blue_cap_mask > 0) / (top_region.shape[0] * top_region.shape[1]) | |
| if blue_cap_ratio > 0.1: | |
| confidence += 0.15 | |
| # Add to detections if confidence is high enough | |
| if is_bottle_shape and confidence > 0.65: | |
| detections.append({ | |
| "bbox": [x, y, x + w_box, y + h_box], | |
| "confidence": min(0.98, confidence) | |
| }) | |
| return detections | |
| def detect_plastic_bottles_in_beach(img, hsv=None): | |
| """ | |
| Specialized function to detect plastic bottles in beach scenes - more aggressive | |
| Args: | |
| img: OpenCV image in BGR format | |
| hsv: Optional pre-computed HSV image | |
| Returns: | |
| List of dictionaries with bbox and confidence for detected plastic bottles | |
| """ | |
| # Start with standard bottle detection | |
| detections = detect_plastic_bottles(img, hsv) | |
| # Use more aggressive detection for beach scenes | |
| if hsv is None: | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| h, w = img.shape[:2] | |
| # For beach scenes, we'll be extremely aggressive and look for any potential plastic | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| # Use adaptive thresholding to better detect plastic in variable lighting | |
| adaptive_thresh = cv2.adaptiveThreshold( | |
| gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2 | |
| ) | |
| # Use multiple Canny edge detection settings to catch different kinds of plastic edges | |
| edges1 = cv2.Canny(gray, 20, 100) # More sensitive | |
| edges2 = cv2.Canny(gray, 50, 150) # Standard | |
| edges = cv2.bitwise_or(edges1, edges2) | |
| # Dilate edges to connect boundaries | |
| kernel = np.ones((5, 5), np.uint8) | |
| dilated_edges = cv2.dilate(edges, kernel, iterations=1) | |
| # Find contours | |
| contours, _ = cv2.findContours(dilated_edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # Process contours | |
| for contour in contours: | |
| # Filter by size - much more permissive | |
| area = cv2.contourArea(contour) | |
| perimeter = cv2.arcLength(contour, True) | |
| # Only skip extremely small or extremely large objects | |
| if area < (h * w * 0.002) or area > (h * w * 0.7): | |
| continue | |
| # Calculate shape metrics | |
| if perimeter > 0: | |
| circularity = 4 * np.pi * area / (perimeter * perimeter) | |
| # Get bounding box | |
| x, y, w_box, h_box = cv2.boundingRect(contour) | |
| # Calculate aspect ratio | |
| aspect_ratio = float(w_box) / h_box if h_box > 0 else 0 | |
| # Much more permissive bottle shape criteria | |
| is_bottle_shape = h_box > 20 and ( | |
| # Traditional bottle shape | |
| ((0.1 < aspect_ratio < 1.2) and (circularity < 1.0)) or | |
| # Flattened/crushed bottle | |
| ((0.5 < aspect_ratio < 2.0) and (circularity < 0.8)) | |
| ) | |
| # Continue processing even if shape doesn't match bottle - for plastic waste detection | |
| if is_bottle_shape or (area > (h * w * 0.005)): # Process larger objects even if shape doesn't match | |
| # Extract ROI for detailed analysis | |
| roi = img[max(0, y-5):min(h, y+h_box+5), max(0, x-5):min(w, x+w_box+5)] | |
| if roi.size == 0: | |
| continue | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| # Expanded color range for plastic detection | |
| plastic_colors = [ | |
| # Clear plastic | |
| (np.array([0, 0, 80]), np.array([180, 70, 255])), | |
| # White/gray plastic | |
| (np.array([0, 0, 150]), np.array([180, 40, 255])), | |
| # Colored plastic (common in bottles) | |
| (np.array([0, 40, 100]), np.array([180, 255, 255])), | |
| # Blue plastic specifically (common in bottles) | |
| (np.array([90, 50, 100]), np.array([130, 255, 255])), | |
| ] | |
| # Check all plastic color ranges | |
| has_plastic_colors = False | |
| for low, high in plastic_colors: | |
| plastic_mask = cv2.inRange(roi_hsv, low, high) | |
| plastic_ratio = np.sum(plastic_mask > 0) / (roi.shape[0] * roi.shape[1]) | |
| if plastic_ratio > 0.15: # Lower threshold for plastic detection | |
| has_plastic_colors = True | |
| break | |
| # Calculate texture metrics | |
| gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| blur = cv2.GaussianBlur(gray_roi, (5, 5), 0) | |
| std_dev = np.std(blur) | |
| # Look for colored caps - not just blue but any bright color | |
| has_bottle_cap = False | |
| if h_box > 15: | |
| # Check both top and bottom for caps (for bottles lying on their sides) | |
| top_roi = roi[:max(1, roi.shape[0]//4), :] | |
| bottom_roi = roi[min(roi.shape[0], roi.shape[0]*3//4):, :] | |
| # Check both regions for bright colors that could be caps | |
| for cap_roi in [top_roi, bottom_roi]: | |
| if cap_roi.size > 0: | |
| cap_hsv = cv2.cvtColor(cap_roi, cv2.COLOR_BGR2HSV) | |
| # Check for various cap colors - blue, red, green, white | |
| cap_masks = [ | |
| cv2.inRange(cap_hsv, np.array([90, 80, 80]), np.array([140, 255, 255])), # Blue | |
| cv2.inRange(cap_hsv, np.array([0, 80, 80]), np.array([20, 255, 255])), # Red | |
| cv2.inRange(cap_hsv, np.array([35, 80, 80]), np.array([85, 255, 255])), # Green | |
| cv2.inRange(cap_hsv, np.array([0, 0, 180]), np.array([180, 40, 255])) # White | |
| ] | |
| for cap_mask in cap_masks: | |
| cap_ratio = np.sum(cap_mask > 0) / (cap_roi.shape[0] * cap_roi.shape[1]) | |
| if cap_ratio > 0.08: # Lower threshold for cap detection | |
| has_bottle_cap = True | |
| break | |
| if has_bottle_cap: | |
| break | |
| # Look for plastic waste specifically | |
| is_plastic_waste = check_for_plastic_waste(roi, roi_hsv) | |
| # Check with our specialized bottle detector | |
| is_bottle = check_for_plastic_bottle(roi, roi_hsv) | |
| # Calculate confidence - much more permissive criteria | |
| base_confidence = 0.4 # Start with a lower base confidence | |
| if has_plastic_colors: | |
| base_confidence += 0.15 | |
| if has_bottle_cap: | |
| base_confidence += 0.15 | |
| if is_bottle_shape: | |
| base_confidence += 0.15 | |
| if is_bottle: | |
| base_confidence += 0.2 | |
| if is_plastic_waste: | |
| base_confidence += 0.15 | |
| if std_dev < 50: # Uniform texture is common in plastic | |
| base_confidence += 0.1 | |
| # For beach scenes, be much more aggressive with detection confidence threshold | |
| if base_confidence > 0.5: # Lower threshold for beach scenes | |
| # Check if this detection overlaps with existing ones | |
| bbox = [x, y, x + w_box, y + h_box] | |
| is_duplicate = False | |
| for det in detections: | |
| existing_bbox = det["bbox"] | |
| # Calculate IoU | |
| x1 = max(bbox[0], existing_bbox[0]) | |
| y1 = max(bbox[1], existing_bbox[1]) | |
| x2 = min(bbox[2], existing_bbox[2]) | |
| y2 = min(bbox[3], existing_bbox[3]) | |
| if x2 > x1 and y2 > y1: | |
| intersection = (x2 - x1) * (y2 - y1) | |
| area1 = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) | |
| area2 = (existing_bbox[2] - existing_bbox[0]) * (existing_bbox[3] - existing_bbox[1]) | |
| union = area1 + area2 - intersection | |
| iou = intersection / union if union > 0 else 0 | |
| if iou > 0.3: # If overlapping significantly | |
| is_duplicate = True | |
| # Update the existing detection if this one has higher confidence | |
| if base_confidence > det["confidence"]: | |
| det["confidence"] = base_confidence | |
| break | |
| if not is_duplicate: | |
| detections.append({ | |
| "bbox": bbox, | |
| "confidence": base_confidence | |
| }) | |
| return detections | |
| def detect_ships(img, hsv=None): | |
| """ | |
| Specialized function to detect ships based on color, shape and context. | |
| Now with extremely conservative criteria to avoid false positives. | |
| Args: | |
| img: OpenCV image in BGR format | |
| hsv: Optional pre-computed HSV image | |
| Returns: | |
| List of dictionaries with bbox and confidence for detected ships | |
| """ | |
| if hsv is None: | |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
| h, w = img.shape[:2] | |
| detections = [] | |
| # Return empty if the image is too small - can't reliably detect ships | |
| if h < 100 or w < 100: | |
| return [] | |
| # Convert to grayscale for edge detection | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| # Apply edge detection - more conservative parameters | |
| edges = cv2.Canny(gray, 80, 200) # Higher thresholds | |
| # Apply Hough Line Transform with stricter parameters | |
| # Require longer lines (1/4 of image width) and higher threshold | |
| lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100, | |
| minLineLength=w//4, maxLineGap=15) | |
| # No lines found, definitely no ships | |
| if lines is None or len(lines) < 3: # Require at least 3 lines | |
| return [] | |
| # Count horizontal lines and their positions - be more strict about horizontality | |
| horizontal_lines = [] | |
| for line in lines: | |
| x1, y1, x2, y2 = line[0] | |
| angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi) | |
| # Horizontal lines (+/- 5 degrees) - stricter angle | |
| if angle < 5 or angle > 175: | |
| # Calculate line length | |
| length = np.sqrt((x2 - x1)**2 + (y2 - y1)**2) | |
| # Only include lines that are significant in length (at least 1/4 of width) | |
| if length > w / 4: | |
| horizontal_lines.append((x1, y1, x2, y2)) | |
| # Require more horizontal lines | |
| if len(horizontal_lines) < 3: | |
| # Not enough significant horizontal lines for ship detection | |
| return [] | |
| # Find clusters of horizontal lines that might represent ships - more conservative | |
| ship_candidates = [] | |
| for i, (x1, y1, x2, y2) in enumerate(horizontal_lines): | |
| # Start a new candidate with this line | |
| y_min = min(y1, y2) | |
| y_max = max(y1, y2) | |
| x_min = min(x1, x2) | |
| x_max = max(x1, x2) | |
| # Look for nearby horizontal lines | |
| related_lines = [i] | |
| for j, (x1_other, y1_other, x2_other, y2_other) in enumerate(horizontal_lines): | |
| if i == j: | |
| continue | |
| y_min_other = min(y1_other, y2_other) | |
| y_max_other = max(y1_other, y2_other) | |
| # Check if this line is near our candidate (vertically) | |
| # Use a more conservative distance threshold | |
| vertical_distance = min(abs(y_min - y_max_other), abs(y_max - y_min_other)) | |
| if vertical_distance < h * 0.1: # Within 10% of image height | |
| # Update bounding box | |
| y_min = min(y_min, y_min_other) | |
| y_max = max(y_max, y_max_other) | |
| x_min = min(x_min, min(x1_other, x2_other)) | |
| x_max = max(x_max, max(x1_other, x2_other)) | |
| related_lines.append(j) | |
| # Calculate bounding box aspect ratio (ships are typically wider than tall) | |
| width = x_max - x_min | |
| height = y_max - y_min | |
| aspect_ratio = width / height if height > 0 else 0 | |
| # Skip if aspect ratio is not appropriate for ships | |
| if aspect_ratio < 1.5: # More conservative | |
| continue | |
| # Check if there's water present at the bottom of the candidate | |
| # Ships should be on water | |
| if y_max < h: | |
| water_region = img[y_max:min(h, y_max + 20), x_min:x_max] | |
| if water_region.size > 0: | |
| water_hsv = cv2.cvtColor(water_region, cv2.COLOR_BGR2HSV) | |
| water_mask = cv2.inRange(water_hsv, np.array([90, 40, 40]), np.array([140, 255, 255])) | |
| water_ratio = np.sum(water_mask > 0) / (water_region.shape[0] * water_region.shape[1]) | |
| # Skip if no water detected below the object | |
| if water_ratio < 0.3: | |
| continue | |
| # Add some padding to the bounding box | |
| y_padding = int(h * 0.03) | |
| x_padding = int(w * 0.03) | |
| y_min = max(0, y_min - y_padding) | |
| y_max = min(h, y_max + y_padding) | |
| x_min = max(0, x_min - x_padding) | |
| x_max = min(w, x_max + x_padding) | |
| # Only add if we have multiple related lines AND they span a significant width | |
| if len(related_lines) >= 3 and (x_max - x_min) > w / 4: | |
| ship_candidates.append({ | |
| "bbox": [x_min, y_min, x_max, y_max], | |
| "related_lines": related_lines, | |
| "aspect_ratio": aspect_ratio | |
| }) | |
| # Further verify ship candidates - much stricter criteria | |
| for candidate in ship_candidates: | |
| bbox = candidate["bbox"] | |
| x_min, y_min, x_max, y_max = bbox | |
| # Extract ROI | |
| roi = img[y_min:y_max, x_min:x_max] | |
| if roi.size == 0: | |
| continue | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| # Only accept candidates with good aspect ratio | |
| if candidate["aspect_ratio"] < 1.5: | |
| continue | |
| # Check if this is a large region - ships are usually significant | |
| region_size_ratio = ((y_max - y_min) * (x_max - x_min)) / (h * w) | |
| if region_size_ratio < 0.05: # Skip very small regions | |
| continue | |
| # Check for plastic bottles or waste - if found, this is likely NOT a ship | |
| if check_for_plastic_bottle(roi, roi_hsv) or check_for_plastic_waste(roi, roi_hsv): | |
| continue | |
| # Finally, check if it meets stricter ship criteria | |
| if check_for_ship(roi, roi_hsv): | |
| # More conservative confidence scoring | |
| confidence = 0.6 + (0.05 * min(3, len(candidate["related_lines"]))) | |
| confidence += 0.1 if candidate["aspect_ratio"] > 2 else 0 # Bonus for wide ships | |
| # If we pass all these strict checks, it's very likely a ship | |
| detections.append({ | |
| "bbox": bbox, | |
| "confidence": min(0.9, confidence) # Cap confidence slightly lower | |
| }) | |
| # Apply non-max suppression to remove overlapping detections | |
| if len(detections) > 1: | |
| # Extract boxes and confidences | |
| boxes = np.array([d["bbox"] for d in detections]) | |
| confidences = np.array([d["confidence"] for d in detections]) | |
| # Convert boxes from [x1, y1, x2, y2] to [x, y, w, h] | |
| boxes_nms = np.zeros((len(boxes), 4)) | |
| boxes_nms[:, 0] = boxes[:, 0] | |
| boxes_nms[:, 1] = boxes[:, 1] | |
| boxes_nms[:, 2] = boxes[:, 2] - boxes[:, 0] | |
| boxes_nms[:, 3] = boxes[:, 3] - boxes[:, 1] | |
| # Apply NMS with low IoU threshold to keep distinct ships | |
| indices = cv2.dnn.NMSBoxes(boxes_nms.tolist(), confidences.tolist(), 0.6, 0.4) | |
| if isinstance(indices, list) and len(indices) > 0: | |
| filtered_detections = [detections[i] for i in indices] | |
| elif len(indices) > 0: | |
| # OpenCV 4.x returns a 2D array | |
| try: | |
| filtered_detections = [detections[i[0]] for i in indices] | |
| except: | |
| filtered_detections = [detections[i] for i in indices.flatten()] | |
| else: | |
| filtered_detections = [] | |
| # Limit to a maximum of 3 ship detections per image to further reduce false positives | |
| return filtered_detections[:3] | |
| return detections | |
| def detect_general_waste(roi, roi_hsv=None): | |
| """ | |
| General-purpose waste detection for beach and water scenes. | |
| Detects various types of waste including plastics, metal, glass, etc. | |
| Args: | |
| roi: Region of interest (cropped image) in BGR format | |
| roi_hsv: Pre-computed HSV region (optional) | |
| Returns: | |
| Tuple of (is_waste, waste_type, confidence) | |
| """ | |
| if roi_hsv is None: | |
| roi_hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) | |
| h, w = roi.shape[:2] | |
| # Skip invalid ROIs | |
| if h == 0 or w == 0: | |
| return False, None, 0.0 | |
| # Convert to grayscale for texture analysis | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| # Calculate texture metrics | |
| std_dev = np.std(gray) | |
| # Detect plastic waste | |
| if check_for_plastic_waste(roi, roi_hsv): | |
| return True, "plastic waste", 0.7 | |
| # Detect plastic bottles specifically | |
| if check_for_plastic_bottle(roi, roi_hsv): | |
| return True, "plastic bottle", 0.85 | |
| # Check for other common waste colors and textures | |
| # Bright unnatural colors | |
| bright_mask = cv2.inRange(roi_hsv, np.array([0, 100, 150]), np.array([180, 255, 255])) | |
| bright_ratio = np.sum(bright_mask > 0) / (h * w) | |
| # Metallic/reflective surfaces | |
| metal_mask = cv2.inRange(roi_hsv, np.array([0, 0, 150]), np.array([180, 40, 220])) | |
| metal_ratio = np.sum(metal_mask > 0) / (h * w) | |
| # Detect regular shape with unnatural color (likely man-made) | |
| edges = cv2.Canny(gray, 50, 150) | |
| edge_ratio = np.sum(edges > 0) / (h * w) | |
| has_straight_edges = False | |
| lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=50, minLineLength=20, maxLineGap=10) | |
| if lines is not None and len(lines) > 2: | |
| has_straight_edges = True | |
| # If it has bright unnatural colors and straight edges, likely waste | |
| if bright_ratio > 0.3 and has_straight_edges: | |
| return True, "colored waste", 0.65 | |
| # If it has metallic appearance and straight edges, likely metal waste | |
| if metal_ratio > 0.3 and has_straight_edges: | |
| return True, "metal waste", 0.6 | |
| # If it has uniform texture and straight edges, could be general waste | |
| if std_dev < 35 and has_straight_edges: | |
| return True, "general waste", 0.5 | |
| # Not waste | |
| return False, None, 0.0 | |
| # Apply one final torchvision patch to ensure we avoid the circular import issue | |
| # This will run when the module is imported and ensure the patch is applied | |
| try: | |
| # Make sure torchvision._meta_registrations is properly patched | |
| if 'torchvision._meta_registrations' not in sys.modules or not hasattr(sys.modules['torchvision._meta_registrations'], 'register_meta'): | |
| import types | |
| sys.modules['torchvision._meta_registrations'] = types.ModuleType('torchvision._meta_registrations') | |
| sys.modules['torchvision._meta_registrations'].__dict__['register_meta'] = lambda x: lambda y: y | |
| logger.info("Applied final torchvision patch") | |
| # Apply specific patch for torchvision::nms operator issue | |
| if HAS_TORCH: | |
| # Check if we need to mock torch._C._dispatch_has_kernel_for_dispatch_key | |
| if hasattr(torch, '_C') and hasattr(torch._C, '_dispatch_has_kernel_for_dispatch_key'): | |
| original_func = torch._C._dispatch_has_kernel_for_dispatch_key | |
| # Patch the function to handle the problematic case | |
| def patched_dispatch_check(qualname, key): | |
| if qualname == "torchvision::nms" and key == "Meta": | |
| logger.info("Intercepted check for torchvision::nms Meta dispatcher") | |
| return True | |
| return original_func(qualname, key) | |
| torch._C._dispatch_has_kernel_for_dispatch_key = patched_dispatch_check | |
| logger.info("Applied torch dispatch check patch") | |
| except Exception as e: | |
| logger.warning(f"Final torchvision patching failed (non-critical): {e}") |