Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Benchmark script for mosaic generation performance analysis. | |
| """ | |
| import time | |
| import numpy as np | |
| from PIL import Image | |
| import matplotlib.pyplot as plt | |
| from typing import Dict, List | |
| import argparse | |
| import os | |
| from src.config import Config, Implementation | |
| from src.pipeline import MosaicPipeline | |
| from src.utils import pil_to_np, np_to_pil | |
def create_test_image(width: int = 512, height: int = 512) -> Image.Image:
    """Create a synthetic RGB test image for benchmarking.

    The image is three linear colour ramps (red along x, green along y,
    blue along the diagonal) with a solid orange disc drawn over the
    centre of the frame.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        The ``(height, width, 3)`` float32 array, with values in ``[0, 1]``,
        converted by ``np_to_pil``.
    """
    # A row vector of x coordinates and a column vector of y coordinates
    # broadcast to the full (height, width) grid, so no per-pixel Python
    # loop is needed.
    xs = np.arange(width)[np.newaxis, :]
    ys = np.arange(height)[:, np.newaxis]

    img_array = np.zeros((height, width, 3), dtype=np.float32)
    img_array[..., 0] = xs / width                      # red: left -> right
    img_array[..., 1] = ys / height                     # green: top -> bottom
    img_array[..., 2] = (xs + ys) / (width + height)    # blue: diagonal

    # Overlay a disc centred in the frame.
    center_x, center_y = width // 2, height // 2
    radius = min(width, height) // 4
    # Comparing squared integer distances selects the same pixels as
    # sqrt(d2) < radius while staying in exact integer arithmetic.
    inside_disc = (xs - center_x) ** 2 + (ys - center_y) ** 2 < radius ** 2
    img_array[inside_disc] = [1.0, 0.5, 0.2]  # orange

    return np_to_pil(img_array)
def benchmark_grid_sizes(pipeline: MosaicPipeline, test_image: Image.Image,
                         grid_sizes: List[int]) -> Dict:
    """Benchmark the full pipeline across different grid sizes.

    For every grid size the pipeline configuration is updated in place
    (``grid``, ``out_w``, ``out_h``) and one full pipeline run is timed.
    The configuration is left at the last grid size tested.

    Args:
        pipeline: Pipeline whose ``config`` is mutated and whose
            ``run_full_pipeline`` is timed.
        test_image: Input image; its ``width`` and ``height`` are used to
            derive an output size that is a multiple of the grid size.
        grid_sizes: Grid sizes to test; each must be positive.

    Returns:
        A dict keyed by grid size. Each value holds ``processing_time``
        (seconds), ``total_tiles`` (``grid_size ** 2``),
        ``tiles_per_second``, ``mse``, ``ssim`` and ``output_resolution``
        (``"WxH"`` of the produced mosaic).

    Raises:
        ValueError: If any grid size is zero or negative.
    """
    # Validate up front so a bad value does not abort the benchmark after
    # earlier (possibly slow) runs have already completed.
    invalid = [size for size in grid_sizes if size <= 0]
    if invalid:
        raise ValueError(f"Grid sizes must be positive, got: {invalid}")

    print("Benchmarking grid sizes...")
    results = {}
    for grid_size in grid_sizes:
        print(f"Testing grid size {grid_size}x{grid_size}...")

        # Snap the output size down to a whole number of grid cells.
        pipeline.config.grid = grid_size
        pipeline.config.out_w = (test_image.width // grid_size) * grid_size
        pipeline.config.out_h = (test_image.height // grid_size) * grid_size

        # perf_counter is monotonic and high resolution, unlike time.time(),
        # which can jump when the system clock is adjusted.
        start_time = time.perf_counter()
        pipeline_results = pipeline.run_full_pipeline(test_image)
        total_time = time.perf_counter() - start_time

        metrics = pipeline_results['metrics']
        mosaic = pipeline_results['outputs']['mosaic']
        total_tiles = grid_size * grid_size
        # Guard against a zero elapsed time on very coarse clocks.
        tiles_per_second = total_tiles / total_time if total_time > 0 else float("inf")

        results[grid_size] = {
            'processing_time': total_time,
            'total_tiles': total_tiles,
            'tiles_per_second': tiles_per_second,
            'mse': metrics['mse'],
            'ssim': metrics['ssim'],
            'output_resolution': f"{mosaic.width}x{mosaic.height}",
        }
        print(f" Processing time: {total_time:.3f}s")
        print(f" Tiles per second: {tiles_per_second:.1f}")
    return results
def benchmark_implementations(pipeline: MosaicPipeline, test_image: Image.Image) -> Dict:
    """Compare the vectorized and loop-based implementations.

    Runs the full pipeline once with ``Implementation.VECT`` and once with
    ``Implementation.LOOPS``, timing each run. ``pipeline.config.impl`` is
    mutated in place and is left set to ``Implementation.LOOPS``.

    Args:
        pipeline: Pipeline whose ``config.impl`` is switched between runs.
        test_image: Input image passed to ``run_full_pipeline``.

    Returns:
        A dict with ``vectorized`` and ``loop_based`` entries (each holding
        ``processing_time``, ``mse`` and ``ssim``) and a ``comparison``
        entry holding ``speedup_factor`` (loop time / vectorized time, or 0
        when the vectorized time is not positive) and ``vectorized_faster``.
    """
    print("Benchmarking implementations...")
    results = {}

    print("Testing vectorized implementation...")
    vec_time, results['vectorized'] = _time_implementation(
        pipeline, test_image, Implementation.VECT)

    print("Testing loop-based implementation...")
    loop_time, results['loop_based'] = _time_implementation(
        pipeline, test_image, Implementation.LOOPS)

    speedup = loop_time / vec_time if vec_time > 0 else 0
    results['comparison'] = {
        'speedup_factor': speedup,
        'vectorized_faster': vec_time < loop_time,
    }

    print(f"Vectorized: {vec_time:.3f}s")
    print(f"Loop-based: {loop_time:.3f}s")
    print(f"Speedup factor: {speedup:.2f}x")
    return results


def _time_implementation(pipeline, test_image, impl):
    """Run the pipeline once with *impl* selected; return (seconds, summary)."""
    pipeline.config.impl = impl
    # perf_counter is monotonic and high resolution, which makes it the
    # appropriate clock for measuring elapsed time.
    start_time = time.perf_counter()
    run_results = pipeline.run_full_pipeline(test_image)
    elapsed = time.perf_counter() - start_time
    summary = {
        'processing_time': elapsed,
        'mse': run_results['metrics']['mse'],
        'ssim': run_results['metrics']['ssim'],
    }
    return elapsed, summary
def plot_benchmark_results(grid_results: Dict, impl_results: Dict, output_dir: str = "images"):
    """Render the benchmark figures as PNG files inside *output_dir*.

    Three files are written (the directory is created if needed):
    ``processing_time_analysis.png``, ``quality_metrics_analysis.png`` and
    ``implementation_comparison.png``.

    Args:
        grid_results: Per-grid-size results; each value must provide
            ``processing_time``, ``total_tiles``, ``mse`` and ``ssim``.
        impl_results: Must provide ``processing_time`` under both the
            ``vectorized`` and ``loop_based`` keys.
        output_dir: Directory that receives the PNG files.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Figure 1: processing time against grid size and against tile count.
    fig, (ax_by_size, ax_by_tiles) = plt.subplots(1, 2, figsize=(10, 6))
    sizes = sorted(grid_results.keys())
    times = [grid_results[size]['processing_time'] for size in sizes]
    tile_counts = [grid_results[size]['total_tiles'] for size in sizes]

    ax_by_size.plot(sizes, times, 'bo-', linewidth=2, markersize=8)
    ax_by_size.set_xlabel('Grid Size')
    ax_by_size.set_ylabel('Processing Time (seconds)')
    ax_by_size.set_title('Processing Time vs Grid Size')
    ax_by_size.grid(True, alpha=0.3)

    ax_by_tiles.plot(tile_counts, times, 'ro-', linewidth=2, markersize=8)
    ax_by_tiles.set_xlabel('Total Number of Tiles')
    ax_by_tiles.set_ylabel('Processing Time (seconds)')
    ax_by_tiles.set_title('Processing Time vs Number of Tiles')
    ax_by_tiles.grid(True, alpha=0.3)

    fig.tight_layout()
    fig.savefig(f"{output_dir}/processing_time_analysis.png", dpi=300, bbox_inches='tight')
    plt.close(fig)

    # Figure 2: quality metrics against grid size.
    fig, (ax_mse, ax_ssim) = plt.subplots(1, 2, figsize=(12, 5))

    mse_series = [grid_results[size]['mse'] for size in sizes]
    ax_mse.plot(sizes, mse_series, 'go-', linewidth=2, markersize=8)
    ax_mse.set_xlabel('Grid Size')
    ax_mse.set_ylabel('MSE')
    ax_mse.set_title('Mean Squared Error vs Grid Size')
    ax_mse.grid(True, alpha=0.3)
    ax_mse.set_yscale('log')

    ssim_series = [grid_results[size]['ssim'] for size in sizes]
    ax_ssim.plot(sizes, ssim_series, 'mo-', linewidth=2, markersize=8)
    ax_ssim.set_xlabel('Grid Size')
    ax_ssim.set_ylabel('SSIM')
    ax_ssim.set_title('Structural Similarity vs Grid Size')
    ax_ssim.grid(True, alpha=0.3)

    fig.tight_layout()
    fig.savefig(f"{output_dir}/quality_metrics_analysis.png", dpi=300, bbox_inches='tight')
    plt.close(fig)

    # Figure 3: bar chart comparing the two implementations.
    fig, ax_impl = plt.subplots(figsize=(8, 6))
    labels = ['Vectorized', 'Loop-based']
    durations = [
        impl_results[key]['processing_time'] for key in ('vectorized', 'loop_based')
    ]
    bars = ax_impl.bar(labels, durations, color=['skyblue', 'lightcoral'])
    ax_impl.set_ylabel('Processing Time (seconds)')
    ax_impl.set_title('Implementation Performance Comparison')
    ax_impl.grid(True, alpha=0.3, axis='y')

    # Annotate each bar with its time, just above the top edge.
    for bar, seconds in zip(bars, durations):
        label_x = bar.get_x() + bar.get_width() / 2
        label_y = bar.get_height() + 0.01
        ax_impl.text(label_x, label_y, f'{seconds:.3f}s', ha='center', va='bottom')

    fig.tight_layout()
    fig.savefig(f"{output_dir}/implementation_comparison.png", dpi=300, bbox_inches='tight')
    plt.close(fig)
def generate_benchmark_report(grid_results: Dict, impl_results: Dict, output_file: str = "benchmark_report.txt"):
    """Write a plain-text benchmark report to *output_file*.

    The report lists per-grid-size results, a scaling analysis comparing
    the smallest and largest grid size (only when at least two sizes are
    present), and the implementation timing and quality comparison.

    Ratios whose denominator is zero (for example a baseline processing
    time of 0) are reported as ``n/a`` instead of raising
    ``ZeroDivisionError``.

    Args:
        grid_results: Per-grid-size results; each value must provide
            ``processing_time``, ``total_tiles``, ``tiles_per_second``,
            ``mse``, ``ssim`` and ``output_resolution``.
        impl_results: Must provide ``vectorized`` and ``loop_based``
            entries (``processing_time``, ``mse``, ``ssim``) and a
            ``comparison`` entry (``speedup_factor``, ``vectorized_faster``).
        output_file: Path of the report file; overwritten if it exists.

    Raises:
        KeyError: If a required key is missing. The report is assembled
            before the file is opened, so no partial file is left behind.
    """
    lines = [
        "MOSAIC GENERATION BENCHMARK REPORT\n",
        "=" * 50 + "\n\n",
        "GRID SIZE PERFORMANCE ANALYSIS\n",
        "-" * 30 + "\n",
    ]

    grid_sizes = sorted(grid_results)
    for grid_size in grid_sizes:
        result = grid_results[grid_size]
        lines += [
            f"Grid {grid_size}x{grid_size}:\n",
            f" Processing Time: {result['processing_time']:.3f}s\n",
            f" Total Tiles: {result['total_tiles']}\n",
            f" Tiles per Second: {result['tiles_per_second']:.1f}\n",
            f" MSE: {result['mse']:.6f}\n",
            f" SSIM: {result['ssim']:.4f}\n",
            f" Output Resolution: {result['output_resolution']}\n\n",
        ]

    # Scaling analysis: smallest grid size versus largest grid size.
    if len(grid_sizes) >= 2:
        first_result = grid_results[grid_sizes[0]]
        last_result = grid_results[grid_sizes[-1]]
        tile_ratio = _safe_ratio(last_result['total_tiles'], first_result['total_tiles'])
        time_ratio = _safe_ratio(last_result['processing_time'], first_result['processing_time'])

        if tile_ratio is None or time_ratio is None:
            efficiency = None
        else:
            efficiency = _safe_ratio(tile_ratio, time_ratio)

        if tile_ratio is None or time_ratio is None or tile_ratio == 0:
            linear = "n/a"
        else:
            # "Linear" means time grew within 10% of the growth in tiles.
            linear = "Yes" if abs(time_ratio - tile_ratio) / tile_ratio < 0.1 else "No"

        lines += [
            "SCALING ANALYSIS\n",
            "-" * 20 + "\n",
            f"Tile increase ratio: {_format_ratio(tile_ratio, 'x')}\n",
            f"Time increase ratio: {_format_ratio(time_ratio, 'x')}\n",
            f"Scaling efficiency: {_format_ratio(efficiency)}\n",
            f"Linear scaling: {linear}\n\n",
        ]

    vectorized = impl_results['vectorized']
    loop_based = impl_results['loop_based']
    comparison = impl_results['comparison']
    lines += [
        "IMPLEMENTATION COMPARISON\n",
        "-" * 25 + "\n",
        f"Vectorized processing time: {vectorized['processing_time']:.3f}s\n",
        f"Loop-based processing time: {loop_based['processing_time']:.3f}s\n",
        f"Speedup factor: {comparison['speedup_factor']:.2f}x\n",
        f"Vectorized is faster: {'Yes' if comparison['vectorized_faster'] else 'No'}\n\n",
        "QUALITY COMPARISON\n",
        "-" * 18 + "\n",
        f"Vectorized MSE: {vectorized['mse']:.6f}\n",
        f"Loop-based MSE: {loop_based['mse']:.6f}\n",
        f"Vectorized SSIM: {vectorized['ssim']:.4f}\n",
        f"Loop-based SSIM: {loop_based['ssim']:.4f}\n",
    ]

    # Open the file only once the content is complete, so a failure while
    # formatting cannot leave a truncated report on disk.
    with open(output_file, 'w', encoding='utf-8') as f:
        f.writelines(lines)


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or None when the denominator is zero."""
    if denominator == 0:
        return None
    return numerator / denominator


def _format_ratio(value, suffix=""):
    """Format a ratio with two decimals plus *suffix*, or 'n/a' for None."""
    if value is None:
        return "n/a"
    return f"{value:.2f}{suffix}"
def main():
    """Parse command-line options, run both benchmarks and write the outputs.

    Options:
        --grid-sizes: Grid sizes to benchmark (default: 16 32 48 64).
        --output-dir: Directory for the generated plots (default: images).
        --test-image: Optional path to an input image. When omitted, or
            when the path does not exist, a generated test image is used.
    """
    parser = argparse.ArgumentParser(description='Benchmark mosaic generation performance')
    parser.add_argument('--grid-sizes', nargs='+', type=int, default=[16, 32, 48, 64],
                        help='Grid sizes to test (default: 16 32 48 64)')
    parser.add_argument('--output-dir', default='images', help='Output directory for plots')
    parser.add_argument('--test-image', help='Path to test image (optional)')
    args = parser.parse_args()

    print("Starting mosaic generation benchmark...")

    # Pick the input image, falling back to the synthetic one.
    if args.test_image and os.path.exists(args.test_image):
        test_image = Image.open(args.test_image)
        print(f"Using test image: {args.test_image}")
    else:
        if args.test_image:
            # Make the fallback visible instead of silently benchmarking
            # a different image than the one the user asked for.
            print(f"Warning: test image not found: {args.test_image}")
        test_image = create_test_image()
        print("Using generated test image")

    # One pipeline instance is shared by both benchmarks.
    config = Config(grid=32)  # Default grid size
    pipeline = MosaicPipeline(config)

    print("\n" + "=" * 50)
    grid_results = benchmark_grid_sizes(pipeline, test_image, args.grid_sizes)
    print("\n" + "=" * 50)
    impl_results = benchmark_implementations(pipeline, test_image)

    print("\nGenerating plots and report...")
    # Name the report path once so the file written and the message printed
    # below cannot drift apart.
    report_path = "benchmark_report.txt"
    plot_benchmark_results(grid_results, impl_results, args.output_dir)
    generate_benchmark_report(grid_results, impl_results, report_path)

    print("\nBenchmark complete!")
    print(f"Plots saved to: {args.output_dir}/")
    print(f"Report saved to: {report_path}")


if __name__ == "__main__":
    main()