#!/usr/bin/env python3 """ Test script to explicitly use the largest YOLO model (YOLOv8x) for detection and verify that it's working correctly. """ import os import cv2 import logging import numpy as np from pathlib import Path from typing import Dict, List, Optional, Tuple, Union import time import sys # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Import from app modules sys.path.append('.') # Add current directory to path from app.services.image_processing import ( initialize_yolo_model, detect_beach_scene, detect_water_scene, detect_plastic_bottles, detect_plastic_bottles_in_beach, detect_ships, check_for_plastic_bottle, check_for_ship, check_for_plastic_waste ) def setup_test_directories(): """Set up output directories for test results""" output_dir = Path("test_output/large_model_detection") output_dir.mkdir(parents=True, exist_ok=True) return output_dir def test_yolov8x_detection(img_path: str, output_dir: Path) -> Dict: """ Test YOLOv8x detection on a given image and save the results. Args: img_path: Path to test image output_dir: Output directory for results Returns: Dict with test results """ logger.info(f"Testing YOLOv8x detection on: {img_path}") # Read the image img = cv2.imread(img_path) if img is None: logger.error(f"Could not read image: {img_path}") return {} # Get image dimensions h, w = img.shape[:2] logger.info(f"Image dimensions: {w}x{h}") # Convert to HSV hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) # Detect scene type is_beach = detect_beach_scene(img, hsv) is_water = detect_water_scene(img, hsv) # Determine scene type if is_beach and is_water: scene_type = "coastal" elif is_beach: scene_type = "beach" elif is_water: scene_type = "water" else: scene_type = "other" logger.info(f"Scene type: {scene_type}") # Initialize YOLOv8x model explicitly start_time = time.time() logger.info("Initializing YOLOv8x model...") # Check if YOLOv8x exists and is a valid size model_path = "yolov8x.pt" need_download = False if not os.path.exists(model_path): logger.info("YOLOv8x model file not found, will download") need_download = True elif os.path.getsize(model_path) < 1000000: logger.info(f"YOLOv8x model file seems incomplete ({os.path.getsize(model_path)} bytes), will download") need_download = True else: logger.info(f"Found existing YOLOv8x model ({os.path.getsize(model_path)} bytes), using it") # Only download if needed if need_download: try: from ultralytics import YOLO logger.info("Downloading YOLOv8x model...") model = YOLO("yolov8x.pt") # This will download if not present logger.info(f"YOLOv8x download complete: {os.path.getsize(model_path)} bytes") except Exception as e: logger.error(f"Failed to download YOLOv8x: {e}") return {} # Initialize the model model = initialize_yolo_model() if model is None: logger.error("Failed to initialize YOLOv8x model") return {} # Get model info with improved handling for different return types model_type = "unknown" try: if hasattr(model, 'info'): model_info = model.info() logger.info(f"Model info type: {type(model_info)}") if isinstance(model_info, dict): model_type = model_info.get('model_type', 'unknown') logger.info(f"Using model: {model_type} (from dictionary)") elif isinstance(model_info, tuple): # For newer versions of Ultralytics that return tuples model_type = str(model_info[0]) if model_info and len(model_info) > 0 else "unknown" logger.info(f"Using model: {model_type} (from tuple)") elif hasattr(model_info, 'model_type'): # For object-based returns model_type = model_info.model_type logger.info(f"Using model: {model_type} (from object attribute)") else: # Fallback - extract from model path if hasattr(model, 'model') and hasattr(model.model, 'names'): logger.info(f"Model has {len(model.model.names)} classes") if len(model.model.names) > 80: model_type = "x" # Most likely YOLOv8x logger.info(f"Model info is not a standard format: {type(model_info)}") else: logger.info("Model does not have info() method") except Exception as e: logger.warning(f"Could not get model info: {e}") # Run inference logger.info("Running YOLOv8x inference...") results = model(img_path) inference_time = time.time() - start_time logger.info(f"Inference completed in {inference_time:.2f} seconds") # Process results result = results[0] if results and len(results) > 0 else None detections = [] if result: # Extract boxes, confidences, and class IDs logger.info(f"YOLOv8x detected {len(result.boxes)} objects") for box in result.boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) confidence = float(box.conf[0]) class_id = int(box.cls[0]) # Skip very low confidence detections if confidence < 0.1: continue # Get class name if hasattr(result, 'names') and class_id in result.names: class_name = result.names[class_id] else: class_name = f"class_{class_id}" # Add detection detections.append({ "class": class_name, "confidence": round(confidence, 3), "bbox": [x1, y1, x2, y2] }) # Draw detections on the image img_result = img.copy() # Add header with model info header = f"Model: YOLOv8x | Scene: {scene_type} | Objects: {len(detections)}" cv2.rectangle(img_result, (0, 0), (w, 30), (0, 0, 0), -1) cv2.putText(img_result, header, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Draw all detections for det in detections: x1, y1, x2, y2 = det["bbox"] class_name = det["class"] confidence = det["confidence"] # Choose color based on class if class_name == "bottle": color = (0, 0, 255) # Red elif class_name == "person": color = (255, 0, 0) # Blue else: color = (0, 255, 0) # Green # Draw bounding box cv2.rectangle(img_result, (x1, y1), (x2, y2), color, 2) # Add label label = f"{class_name}: {confidence:.2f}" cv2.rectangle(img_result, (x1, y1 - 20), (x1 + len(label) * 8, y1), color, -1) cv2.putText(img_result, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) # Save result img_name = Path(img_path).name output_path = output_dir / f"yolov8x_{img_name}" cv2.imwrite(str(output_path), img_result) logger.info(f"Result saved to: {output_path}") # Special detection for plastic and ships bottle_detections = [] ship_detections = [] if is_beach: logger.info("Using beach-specific bottle detection") bottle_detections = detect_plastic_bottles_in_beach(img, hsv) else: logger.info("Using standard bottle detection") bottle_detections = detect_plastic_bottles(img, hsv) if is_water: logger.info("Detecting ships in water scene") ship_detections = detect_ships(img, hsv) logger.info(f"Detected {len(bottle_detections)} potential plastic bottles") logger.info(f"Detected {len(ship_detections)} potential ships") # Return results return { "scene_type": scene_type, "yolo_detections": len(detections), "bottle_detections": len(bottle_detections), "ship_detections": len(ship_detections), "model_info": model_type, "output_path": str(output_path) } def main(): """Main function""" logger.info("Starting YOLOv8x detection test") # Set up test directories output_dir = setup_test_directories() # List of test images test_images = [ "test_files/cargo.jpg", "test_files/download.jpg", "test_files/hmm.jpg", "test_files/images.jpg", "test_files/ship.jpg", "test_files/sss.jpg", "test_files/sssss.jpg" ] # Run test on each image results = {} for img_path in test_images: if not os.path.exists(img_path): logger.warning(f"Image not found: {img_path}") continue result = test_yolov8x_detection(img_path, output_dir) results[os.path.basename(img_path)] = result # Print summary logger.info("\n\n--- YOLOv8x Detection Results Summary ---") for img_name, result in results.items(): logger.info(f"{img_name}:") logger.info(f" Scene type: {result.get('scene_type', 'unknown')}") logger.info(f" Model used: YOLOv8{result.get('model_info', '')}") logger.info(f" YOLOv8x detections: {result.get('yolo_detections', 0)}") logger.info(f" Plastic bottles: {result.get('bottle_detections', 0)}") logger.info(f" Ships: {result.get('ship_detections', 0)}") logger.info(f" Output: {result.get('output_path', 'none')}") logger.info("---") if __name__ == "__main__": main()