|
|
|
|
|
""" |
|
|
Benchmark YOLO models for performance on Raspberry Pi 4B (or current machine). |
|
|
Measures inference time, FPS, and memory usage for different model formats. |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import time |
|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
import numpy as np |
|
|
import cv2 |
|
|
import yaml |
|
|
from ultralytics import YOLO |
|
|
import psutil |
|
|
import platform |
|
|
|
|
|
def get_system_info():
    """Collect basic hardware/OS facts for benchmark context.

    Returns:
        dict: keys 'system', 'processor', 'architecture',
        'python_version', 'cpu_count' (physical cores only) and
        'memory_gb' (total RAM in GiB).
    """
    total_ram_gb = psutil.virtual_memory().total / (1024 ** 3)
    return {
        'system': platform.system(),
        'processor': platform.processor(),
        'architecture': platform.architecture()[0],
        'python_version': platform.python_version(),
        'cpu_count': psutil.cpu_count(logical=False),
        'memory_gb': total_ram_gb,
    }
|
|
|
|
|
def load_test_images(dataset_path, max_images=50):
    """Collect up to *max_images* image paths from a YOLO-style dataset.

    The test/valid/val/train image folders are probed in that priority
    order; the first one containing .jpg/.png files wins.  If nothing is
    found, ten synthetic 640x640 noise images are written to the system
    temp directory so the benchmark can still run.

    Args:
        dataset_path: Root directory of the dataset.
        max_images: Cap on the number of image paths returned.

    Returns:
        List of image file paths (str).
    """
    import tempfile

    test_images = []

    # Typical YOLO dataset layouts, checked in priority order.
    possible_paths = [
        Path(dataset_path) / "test" / "images",
        Path(dataset_path) / "valid" / "images",
        Path(dataset_path) / "val" / "images",
        Path(dataset_path) / "train" / "images",
    ]

    for path in possible_paths:
        if path.exists():
            image_files = list(path.glob("*.jpg")) + list(path.glob("*.png"))
            if image_files:
                test_images = [str(p) for p in image_files[:max_images]]
                print(f"Found {len(test_images)} images in {path}")
                break

    if not test_images:
        # Fall back to synthetic noise frames so benchmarking can proceed.
        print("No test images found. Creating dummy images for benchmarking.")
        test_images = []
        # Fix: use the platform temp dir instead of hard-coded "/tmp"
        # (which does not exist on Windows).
        tmp_dir = tempfile.gettempdir()
        for i in range(10):
            # Fix: high=256 so the full 0-255 pixel range is possible;
            # the previous high=255 excluded pure white (randint's upper
            # bound is exclusive).
            dummy_img = np.random.randint(0, 256, (640, 640, 3), dtype=np.uint8)
            dummy_path = os.path.join(tmp_dir, f"dummy_{i}.jpg")
            cv2.imwrite(dummy_path, dummy_img)
            test_images.append(dummy_path)

    return test_images
|
|
|
|
|
def _read_or_random(img_path, img_size):
    """Load an image from disk, or synthesize a noise frame if unreadable."""
    img = cv2.imread(img_path)
    if img is None:
        img = np.random.randint(0, 255, (img_size, img_size, 3), dtype=np.uint8)
    return img


def benchmark_model(model_path, test_images, img_size=640, warmup=10, runs=100):
    """
    Benchmark a single model.

    Performs untimed warmup inferences, then `runs` timed inferences,
    recording per-run latency (perf_counter around the model call only;
    image loading is excluded) and per-run RSS delta.

    Args:
        model_path: Path to model file (.pt, .onnx, .tflite)
        test_images: List of image paths for testing
        img_size: Input image size
        warmup: Number of warmup runs
        runs: Number of benchmark runs

    Returns:
        Dictionary with benchmark results; 'success' stays False when the
        model file is missing or any step raises.
    """
    print(f"\n{'='*60}")
    print(f"Benchmarking: {model_path}")
    print(f"{'='*60}")

    results = {
        'model': os.path.basename(model_path),
        'format': Path(model_path).suffix[1:],
        'size_mb': os.path.getsize(model_path) / (1024 * 1024) if os.path.exists(model_path) else 0,
        'inference_times': [],
        'memory_usage_mb': [],
        'success': False
    }

    if not os.path.exists(model_path):
        print(f" ❌ Model not found: {model_path}")
        return results

    try:
        print(" Loading model...")
        start_load = time.time()
        model = YOLO(model_path)
        results['load_time'] = time.time() - start_load

        # Fix: hoisted out of the benchmark loop — the original created a
        # new psutil.Process object on every iteration, adding overhead
        # inside the measured section.
        process = psutil.Process(os.getpid())

        print(f" Warming up ({warmup} runs)...")
        for i in range(warmup):
            # Fix: cycle through the image list (as the timed loop does)
            # instead of clamping every out-of-range index to image 0.
            img = _read_or_random(test_images[i % len(test_images)], img_size)
            _ = model(img, verbose=False)

        print(f" Running benchmark ({runs} runs)...")
        for i in range(runs):
            img = _read_or_random(test_images[i % len(test_images)], img_size)

            mem_before = process.memory_info().rss / 1024 / 1024

            # Time only the forward pass, not image loading.
            start_time = time.perf_counter()
            _ = model(img, verbose=False)
            inference_time = time.perf_counter() - start_time

            mem_after = process.memory_info().rss / 1024 / 1024

            results['inference_times'].append(inference_time)
            results['memory_usage_mb'].append(mem_after - mem_before)

            # Lightweight progress indicator every 20 runs.
            if (i + 1) % 20 == 0:
                print(f" Completed {i+1}/{runs} runs...")

        if results['inference_times']:
            times = np.array(results['inference_times'])
            results['avg_inference_ms'] = np.mean(times) * 1000
            results['std_inference_ms'] = np.std(times) * 1000
            results['min_inference_ms'] = np.min(times) * 1000
            results['max_inference_ms'] = np.max(times) * 1000
            results['fps'] = 1.0 / np.mean(times)
            results['avg_memory_mb'] = np.mean(results['memory_usage_mb'])
            results['success'] = True

            print(" ✅ Benchmark completed:")
            print(f" Model size: {results['size_mb']:.2f} MB")
            print(f" Avg inference: {results['avg_inference_ms']:.2f} ms")
            print(f" FPS: {results['fps']:.2f}")
            print(f" Memory usage: {results['avg_memory_mb']:.2f} MB")
        else:
            print(" ❌ No inference times recorded")

    except Exception as e:
        # Benchmarking must not abort the whole run if one model fails;
        # report the failure and return the partial results dict.
        print(f" ❌ Error benchmarking {model_path}: {e}")
        import traceback
        traceback.print_exc()

    return results
|
|
|
|
|
def benchmark_all_models(models_to_test, test_images, img_size=640):
    """Benchmark every model spec whose file exists on disk.

    Args:
        models_to_test: List of dicts with 'path', 'name' and an
            optional 'description' key.
        test_images: Image paths forwarded to benchmark_model().
        img_size: Inference input size.

    Returns:
        List of per-model result dicts, each annotated with the spec's
        'name' and 'description'.
    """
    all_results = []

    for spec in models_to_test:
        path = spec['path']
        if not os.path.exists(path):
            print(f"Skipping {path} - not found")
            continue

        outcome = benchmark_model(
            model_path=path,
            test_images=test_images,
            img_size=img_size,
            warmup=10,
            runs=50,
        )

        # Carry the human-readable metadata through to the results.
        outcome['name'] = spec['name']
        outcome['description'] = spec.get('description', '')
        all_results.append(outcome)

    return all_results
|
|
|
|
|
def print_results_table(results):
    """Print a fixed-width summary table of benchmark results.

    Each entry must carry 'name', 'format', 'size_mb' and 'success';
    successful entries additionally need 'avg_inference_ms', 'fps' and
    'avg_memory_mb'.  After the table, the best model by FPS, size and
    memory is highlighted (when at least one run succeeded).
    """
    rule = "=" * 100
    print("\n" + rule)
    print("BENCHMARK RESULTS")
    print(rule)
    print(f"{'Model':<30} {'Format':<8} {'Size (MB)':<10} {'Inference (ms)':<15} {'FPS':<10} {'Memory (MB)':<12} {'Status':<10}")
    print("-" * 100)

    for entry in results:
        prefix = f"{entry['name'][:28]:<30} {entry['format']:<8} {entry['size_mb']:>9.2f} "
        if entry['success']:
            print(prefix + f"{entry['avg_inference_ms']:>14.2f} {entry['fps']:>9.2f} {entry['avg_memory_mb']:>11.2f} {'✅':<10}")
        else:
            print(prefix + f"{'N/A':>14} {'N/A':>9} {'N/A':>11} {'❌':<10}")

    print(rule)

    successful = [entry for entry in results if entry['success']]
    if successful:
        fastest = max(successful, key=lambda r: r['fps'])
        smallest = min(successful, key=lambda r: r['size_mb'])
        leanest = min(successful, key=lambda r: r['avg_memory_mb'])

        print(f"\n🏆 Best by FPS: {fastest['name']} ({fastest['fps']:.2f} FPS)")
        print(f"🏆 Best by size: {smallest['name']} ({smallest['size_mb']:.2f} MB)")
        print(f"🏆 Best by memory: {leanest['name']} ({leanest['avg_memory_mb']:.2f} MB)")
|
|
|
|
|
def save_results_to_csv(results, output_path="benchmark_results.csv"):
    """Write benchmark results to a CSV file.

    Missing numeric fields default to 0, so failed runs still produce a
    complete row.  'name', 'format' and 'success' are required keys.

    Args:
        results: List of result dicts from benchmark_model().
        output_path: Destination CSV path.
    """
    import csv

    columns = ['name', 'format', 'size_mb', 'avg_inference_ms',
               'std_inference_ms', 'min_inference_ms', 'max_inference_ms',
               'fps', 'avg_memory_mb', 'load_time', 'success']

    with open(output_path, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=columns)
        writer.writeheader()
        for entry in results:
            # Default every optional metric to 0, then overwrite the
            # mandatory fields so a missing one raises (as it should).
            row = {col: entry.get(col, 0) for col in columns}
            row['name'] = entry['name']
            row['format'] = entry['format']
            row['success'] = entry['success']
            writer.writerow(row)

    print(f"\n📊 Results saved to {output_path}")
|
|
|
|
|
def main():
    """CLI entry point: parse args, discover models, benchmark, report."""
    parser = argparse.ArgumentParser(description='Benchmark YOLO models for performance')
    parser.add_argument('--dataset', type=str, default='model/dataset_strawberry_detect_v3',
                        help='Path to dataset for test images')
    parser.add_argument('--img-size', type=int, default=640,
                        help='Input image size for inference')
    parser.add_argument('--output', type=str, default='benchmark_results.csv',
                        help='Output CSV file for results')
    parser.add_argument('--config', type=str, default='config.yaml',
                        help='Path to config file')

    args = parser.parse_args()

    # Load optional YAML config if present.
    # NOTE(review): `config` is loaded but never read afterwards in this
    # function — presumably reserved for future use; confirm or remove.
    config = {}
    if os.path.exists(args.config):
        with open(args.config, 'r') as f:
            config = yaml.safe_load(f)

    # Print system context so saved benchmark numbers can be interpreted
    # later (results differ wildly across machines).
    system_info = get_system_info()
    print("="*60)
    print("SYSTEM INFORMATION")
    print("="*60)
    for key, value in system_info.items():
        print(f"{key.replace('_', ' ').title():<20}: {value}")

    # Candidate models: stock Ultralytics weights plus project-specific
    # strawberry/ripeness models (PyTorch and ONNX exports).
    models_to_test = [
        {'name': 'YOLOv8n', 'path': 'yolov8n.pt', 'description': 'Ultralytics YOLOv8n'},
        {'name': 'YOLOv8s', 'path': 'yolov8s.pt', 'description': 'Ultralytics YOLOv8s'},
        {'name': 'YOLOv8m', 'path': 'yolov8m.pt', 'description': 'Ultralytics YOLOv8m'},
        {'name': 'Strawberry YOLOv11n', 'path': 'model/weights/strawberry_yolov11n.pt', 'description': 'Custom trained on strawberry dataset'},
        {'name': 'Strawberry YOLOv11n ONNX', 'path': 'model/weights/strawberry_yolov11n.onnx', 'description': 'ONNX export'},
        {'name': 'Ripeness YOLOv11n', 'path': 'model/weights/ripeness_detection_yolov11n.pt', 'description': 'Ripeness detection model'},
        {'name': 'Ripeness YOLOv11n ONNX', 'path': 'model/weights/ripeness_detection_yolov11n.onnx', 'description': 'ONNX export'},
    ]

    # Filter down to models that actually exist on disk; warn about the rest.
    existing_models = []
    for model in models_to_test:
        if os.path.exists(model['path']):
            existing_models.append(model)
        else:
            print(f"⚠️ Model not found: {model['path']}")

    if not existing_models:
        print("❌ No models found for benchmarking.")
        print("Please train a model first or download pretrained weights.")
        sys.exit(1)

    # Gather (or synthesize) test images for inference.
    print(f"\n📷 Loading test images from {args.dataset}...")
    test_images = load_test_images(args.dataset, max_images=50)
    print(f" Loaded {len(test_images)} test images")

    # Run the benchmark across all available models.
    print(f"\n🚀 Starting benchmarks...")
    results = benchmark_all_models(existing_models, test_images, img_size=args.img_size)

    # Display and persist the results.
    print_results_table(results)
    save_results_to_csv(results, args.output)

    # Static deployment guidance for the target platform.
    print(f"\n💡 RECOMMENDATIONS FOR RASPBERRY PI 4B:")
    print(f" 1. For fastest inference: Choose model with highest FPS")
    print(f" 2. For memory-constrained environments: Choose smallest model")
    print(f" 3. For best accuracy/speed tradeoff: Consider YOLOv8s")
    print(f" 4. For edge deployment: Convert to TFLite INT8 for ~2-3x speedup")

    # Heuristic host check: ARM machine (or a system string containing
    # "raspberry") is treated as the deployment target.
    if 'arm' in platform.machine().lower() or 'raspberry' in platform.system().lower():
        print(f"\n🎯 Running on Raspberry Pi - results are accurate for deployment.")
    else:
        print(f"\n⚠️ Not running on Raspberry Pi - results are for reference only.")
        print(f" Actual Raspberry Pi performance may be 2-5x slower.")


if __name__ == '__main__':
    main()