Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Test script to explicitly use the largest YOLO model (YOLOv8x) for detection | |
| and verify that it's working correctly. | |
| """ | |
| import os | |
| import cv2 | |
| import logging | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import time | |
| import sys | |
# Logging configuration: INFO threshold; each record shows time, level, message
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by every function in this script
logger = logging.getLogger(__name__)
| # Import from app modules | |
| sys.path.append('.') # Add current directory to path | |
| from app.services.image_processing import ( | |
| initialize_yolo_model, detect_beach_scene, detect_water_scene, | |
| detect_plastic_bottles, detect_plastic_bottles_in_beach, | |
| detect_ships, check_for_plastic_bottle, check_for_ship, check_for_plastic_waste | |
| ) | |
def setup_test_directories() -> Path:
    """Create the results directory if it is missing and return its path.

    Returns:
        The relative path ``test_output/large_model_detection``; any missing
        parent directories are created and an existing directory is reused.
    """
    results_dir = Path("test_output/large_model_detection")
    results_dir.mkdir(exist_ok=True, parents=True)
    return results_dir
def _classify_scene(is_beach, is_water) -> str:
    """Map the beach/water flags to the scene label used in logs and results."""
    if is_beach and is_water:
        return "coastal"
    if is_beach:
        return "beach"
    if is_water:
        return "water"
    return "other"


def _ensure_yolov8x_weights(model_path: str = "yolov8x.pt",
                            min_bytes: int = 1000000) -> bool:
    """Check for the YOLOv8x weight file and trigger a download if needed.

    Args:
        model_path: Location of the weight file to check.
        min_bytes: Files smaller than this are treated as incomplete.

    Returns:
        True when the file was already usable or the download step finished
        without raising; False when the download step raised.
    """
    if not os.path.exists(model_path):
        logger.info("YOLOv8x model file not found, will download")
        need_download = True
    else:
        size = os.path.getsize(model_path)
        if size < min_bytes:
            logger.info(f"YOLOv8x model file seems incomplete ({size} bytes), will download")
            need_download = True
        else:
            logger.info(f"Found existing YOLOv8x model ({size} bytes), using it")
            need_download = False

    if not need_download:
        return True

    # NOTE(review): a too-small existing file is not removed before this step;
    # confirm the loader really re-fetches the weights in that situation.
    try:
        from ultralytics import YOLO
        logger.info("Downloading YOLOv8x model...")
        YOLO(model_path)  # This will download if not present
        logger.info(f"YOLOv8x download complete: {os.path.getsize(model_path)} bytes")
    except Exception as e:
        logger.error(f"Failed to download YOLOv8x: {e}")
        return False
    return True


def _describe_model(model) -> str:
    """Return a best-effort model type string taken from ``model.info()``.

    ``info()`` is handled for dict, tuple and attribute-style return values.
    Any exception is logged as a warning and whatever value had been
    determined so far ("unknown" by default) is returned.
    """
    model_type = "unknown"
    try:
        if hasattr(model, 'info'):
            model_info = model.info()
            logger.info(f"Model info type: {type(model_info)}")

            if isinstance(model_info, dict):
                model_type = model_info.get('model_type', 'unknown')
                logger.info(f"Using model: {model_type} (from dictionary)")
            elif isinstance(model_info, tuple):
                # For newer versions of Ultralytics that return tuples
                model_type = str(model_info[0]) if model_info else "unknown"
                logger.info(f"Using model: {model_type} (from tuple)")
            elif hasattr(model_info, 'model_type'):
                # For object-based returns
                model_type = model_info.model_type
                logger.info(f"Using model: {model_type} (from object attribute)")
            else:
                # Fallback - guess from the number of classes the model knows
                if hasattr(model, 'model') and hasattr(model.model, 'names'):
                    logger.info(f"Model has {len(model.model.names)} classes")
                    # NOTE(review): class count is only a heuristic for the
                    # "x" variant - confirm that it is reliable.
                    if len(model.model.names) > 80:
                        model_type = "x"  # Most likely YOLOv8x
                logger.info(f"Model info is not a standard format: {type(model_info)}")
        else:
            logger.info("Model does not have info() method")
    except Exception as e:
        logger.warning(f"Could not get model info: {e}")
    return model_type


def _extract_detections(result, min_confidence: float = 0.1) -> List[Dict]:
    """Convert the boxes of one inference result into plain dictionaries.

    Args:
        result: A single inference result exposing ``boxes`` and, optionally,
            ``names`` (class id -> class name).
        min_confidence: Boxes scoring below this value are dropped.

    Returns:
        One dict per kept box with keys "class", "confidence" (rounded to
        three decimals) and "bbox" as ``[x1, y1, x2, y2]`` integers.
    """
    detections = []
    for box in result.boxes:
        confidence = float(box.conf[0])
        # Skip very low confidence detections
        if confidence < min_confidence:
            continue

        x1, y1, x2, y2 = map(int, box.xyxy[0])
        class_id = int(box.cls[0])

        if hasattr(result, 'names') and class_id in result.names:
            class_name = result.names[class_id]
        else:
            class_name = f"class_{class_id}"

        detections.append({
            "class": class_name,
            "confidence": round(confidence, 3),
            "bbox": [x1, y1, x2, y2]
        })
    return detections


def _draw_detections(img: np.ndarray, detections: List[Dict], scene_type: str) -> np.ndarray:
    """Return a copy of ``img`` with a header bar and one labelled box per detection."""
    w = img.shape[1]
    img_result = img.copy()

    # Header bar across the top edge summarising the run
    header = f"Model: YOLOv8x | Scene: {scene_type} | Objects: {len(detections)}"
    cv2.rectangle(img_result, (0, 0), (w, 30), (0, 0, 0), -1)
    cv2.putText(img_result, header, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    for det in detections:
        x1, y1, x2, y2 = det["bbox"]
        class_name = det["class"]
        confidence = det["confidence"]

        # Colours are BGR tuples
        if class_name == "bottle":
            color = (0, 0, 255)  # Red
        elif class_name == "person":
            color = (255, 0, 0)  # Blue
        else:
            color = (0, 255, 0)  # Green

        cv2.rectangle(img_result, (x1, y1), (x2, y2), color, 2)

        # Filled label background sized from the text length, placed above the box
        label = f"{class_name}: {confidence:.2f}"
        cv2.rectangle(img_result, (x1, y1 - 20), (x1 + len(label) * 8, y1), color, -1)
        cv2.putText(img_result, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
    return img_result


def test_yolov8x_detection(img_path: str, output_dir: Path) -> Dict:
    """
    Test YOLOv8x detection on a given image and save the results.

    The image is classified as coastal/beach/water/other, run through the
    model returned by ``initialize_yolo_model()``, annotated with the
    detections and written to ``output_dir``. The separate bottle and ship
    detectors are then run on the same image.

    Args:
        img_path: Path to test image
        output_dir: Output directory for results

    Returns:
        Dict with keys "scene_type", "yolo_detections", "bottle_detections",
        "ship_detections", "model_info" and "output_path". An empty dict is
        returned when the image cannot be read, the weight download fails or
        the model cannot be initialised.
    """
    logger.info(f"Testing YOLOv8x detection on: {img_path}")

    # Read the image
    img = cv2.imread(img_path)
    if img is None:
        logger.error(f"Could not read image: {img_path}")
        return {}

    h, w = img.shape[:2]
    logger.info(f"Image dimensions: {w}x{h}")

    # The scene and object detectors take both the BGR image and its HSV form
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    is_beach = detect_beach_scene(img, hsv)
    is_water = detect_water_scene(img, hsv)
    scene_type = _classify_scene(is_beach, is_water)
    logger.info(f"Scene type: {scene_type}")

    # Make sure the weights are available, then initialise the model
    logger.info("Initializing YOLOv8x model...")
    if not _ensure_yolov8x_weights():
        return {}

    model = initialize_yolo_model()
    if model is None:
        logger.error("Failed to initialize YOLOv8x model")
        return {}

    model_type = _describe_model(model)

    # Time only the inference call so the logged figure excludes the
    # weight download and model initialisation above.
    logger.info("Running YOLOv8x inference...")
    inference_start = time.perf_counter()
    results = model(img_path)
    inference_time = time.perf_counter() - inference_start
    logger.info(f"Inference completed in {inference_time:.2f} seconds")

    # Compare against None explicitly: a result object holding zero
    # detections must still be processed (and logged) rather than skipped.
    result = results[0] if results else None
    detections = []
    if result is not None:
        logger.info(f"YOLOv8x detected {len(result.boxes)} objects")
        detections = _extract_detections(result)

    # Draw detections on a copy of the image and save it
    img_result = _draw_detections(img, detections, scene_type)
    output_path = output_dir / f"yolov8x_{Path(img_path).name}"
    if cv2.imwrite(str(output_path), img_result):
        logger.info(f"Result saved to: {output_path}")
    else:
        # imwrite signals failure through its return value
        logger.error(f"Failed to write result image: {output_path}")

    # Special detection for plastic and ships
    ship_detections = []
    if is_beach:
        logger.info("Using beach-specific bottle detection")
        bottle_detections = detect_plastic_bottles_in_beach(img, hsv)
    else:
        logger.info("Using standard bottle detection")
        bottle_detections = detect_plastic_bottles(img, hsv)

    if is_water:
        logger.info("Detecting ships in water scene")
        ship_detections = detect_ships(img, hsv)

    logger.info(f"Detected {len(bottle_detections)} potential plastic bottles")
    logger.info(f"Detected {len(ship_detections)} potential ships")

    return {
        "scene_type": scene_type,
        "yolo_detections": len(detections),
        "bottle_detections": len(bottle_detections),
        "ship_detections": len(ship_detections),
        "model_info": model_type,
        "output_path": str(output_path)
    }
def main():
    """Run the YOLOv8x check on a fixed list of images and log a summary.

    Images that are missing on disk are reported with a warning and skipped;
    every other image is processed and its result stored under its file name.
    """
    logger.info("Starting YOLOv8x detection test")

    # Destination for the annotated images
    out_dir = setup_test_directories()

    # Images to process, relative to the working directory
    candidates = [
        "test_files/cargo.jpg",
        "test_files/download.jpg",
        "test_files/hmm.jpg",
        "test_files/images.jpg",
        "test_files/ship.jpg",
        "test_files/sss.jpg",
        "test_files/sssss.jpg"
    ]

    # Collect one result dict per image that exists
    summary = {}
    for candidate in candidates:
        if os.path.exists(candidate):
            summary[os.path.basename(candidate)] = test_yolov8x_detection(candidate, out_dir)
        else:
            logger.warning(f"Image not found: {candidate}")

    # Report; .get() defaults cover results that came back empty
    logger.info("\n\n--- YOLOv8x Detection Results Summary ---")
    for file_name in summary:
        entry = summary[file_name]
        logger.info(f"{file_name}:")
        logger.info(f" Scene type: {entry.get('scene_type', 'unknown')}")
        logger.info(f" Model used: YOLOv8{entry.get('model_info', '')}")
        logger.info(f" YOLOv8x detections: {entry.get('yolo_detections', 0)}")
        logger.info(f" Plastic bottles: {entry.get('bottle_detections', 0)}")
        logger.info(f" Ships: {entry.get('ship_detections', 0)}")
        logger.info(f" Output: {entry.get('output_path', 'none')}")
        logger.info("---")


if __name__ == "__main__":
    main()