|
|
|
|
|
""" |
|
|
Benchmark script for mosaic generation performance analysis. |
|
|
""" |
|
|
|
|
|
import time |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import matplotlib.pyplot as plt |
|
|
from typing import Dict, List |
|
|
import argparse |
|
|
import os |
|
|
|
|
|
from src.config import Config, Implementation |
|
|
from src.pipeline import MosaicPipeline |
|
|
from src.utils import pil_to_np, np_to_pil |
|
|
|
|
|
|
|
|
def create_test_image(width: int = 512, height: int = 512) -> Image.Image:
    """Create a synthetic RGB test image for benchmarking.

    The image is a three-channel linear gradient (red left-to-right,
    green top-to-bottom, blue diagonal) with a solid orange disc in the
    center, so it contains both smooth regions and a hard edge.

    Args:
        width: Output image width in pixels.
        height: Output image height in pixels.

    Returns:
        PIL image built via np_to_pil from a float32 array in [0, 1].
    """
    # Vectorized gradients: the original per-pixel Python loops cost
    # width*height interpreter iterations (twice); broadcasting computes
    # the same values in native code.
    xs = np.arange(width, dtype=np.float32)[np.newaxis, :]   # shape (1, W)
    ys = np.arange(height, dtype=np.float32)[:, np.newaxis]  # shape (H, 1)

    img_array = np.empty((height, width, 3), dtype=np.float32)
    img_array[..., 0] = xs / width                 # red: horizontal ramp
    img_array[..., 1] = ys / height                # green: vertical ramp
    img_array[..., 2] = (xs + ys) / (width + height)  # blue: diagonal ramp

    # Solid orange disc centered in the image, radius = quarter of the
    # shorter side (same geometry as the original loop version).
    center_x, center_y = width // 2, height // 2
    radius = min(width, height) // 4
    dist = np.sqrt((xs - center_x) ** 2 + (ys - center_y) ** 2)
    img_array[dist < radius] = [1.0, 0.5, 0.2]

    return np_to_pil(img_array)
|
|
|
|
|
|
|
|
def benchmark_grid_sizes(pipeline: MosaicPipeline, test_image: Image.Image,
                         grid_sizes: List[int]) -> Dict:
    """Run the full pipeline once per grid size and collect timing/quality stats.

    Args:
        pipeline: Pipeline whose config is mutated in place for each run.
        test_image: Input image fed to every run.
        grid_sizes: Grid sizes to benchmark (one full pipeline run each).

    Returns:
        Mapping of grid size -> dict with processing_time, total_tiles,
        tiles_per_second, mse, ssim and output_resolution.
    """
    print("Benchmarking grid sizes...")
    results = {}

    for grid_size in grid_sizes:
        print(f"Testing grid size {grid_size}x{grid_size}...")

        # Snap the output resolution down to an exact multiple of the grid.
        pipeline.config.grid = grid_size
        pipeline.config.out_w = (test_image.width // grid_size) * grid_size
        pipeline.config.out_h = (test_image.height // grid_size) * grid_size

        started = time.time()
        run = pipeline.run_full_pipeline(test_image)
        elapsed = time.time() - started

        tile_count = grid_size * grid_size
        mosaic = run['outputs']['mosaic']
        entry = {
            'processing_time': elapsed,
            'total_tiles': tile_count,
            'tiles_per_second': tile_count / elapsed,
            'mse': run['metrics']['mse'],
            'ssim': run['metrics']['ssim'],
            'output_resolution': f"{mosaic.width}x{mosaic.height}",
        }
        results[grid_size] = entry

        print(f" Processing time: {elapsed:.3f}s")
        print(f" Tiles per second: {entry['tiles_per_second']:.1f}")

    return results
|
|
|
|
|
|
|
|
def benchmark_implementations(pipeline: MosaicPipeline, test_image: Image.Image) -> Dict:
    """Time the vectorized and loop-based backends on the same input.

    Args:
        pipeline: Pipeline whose config.impl is switched per run.
        test_image: Image processed by both backends.

    Returns:
        Dict with 'vectorized' and 'loop_based' entries (processing_time,
        mse, ssim) plus a 'comparison' entry (speedup_factor,
        vectorized_faster).
    """
    print("Benchmarking implementations...")

    results = {}
    elapsed = {}

    # Run each backend once, vectorized first, collecting wall-clock time
    # and the quality metrics reported by the pipeline.
    for key, backend, label in (('vectorized', Implementation.VECT, 'vectorized'),
                                ('loop_based', Implementation.LOOPS, 'loop-based')):
        print(f"Testing {label} implementation...")
        pipeline.config.impl = backend
        started = time.time()
        run = pipeline.run_full_pipeline(test_image)
        elapsed[key] = time.time() - started
        results[key] = {
            'processing_time': elapsed[key],
            'mse': run['metrics']['mse'],
            'ssim': run['metrics']['ssim'],
        }

    vec_time = elapsed['vectorized']
    loop_time = elapsed['loop_based']
    # Guard against a zero timer reading before dividing.
    speedup = loop_time / vec_time if vec_time > 0 else 0
    results['comparison'] = {
        'speedup_factor': speedup,
        'vectorized_faster': vec_time < loop_time,
    }

    print(f"Vectorized: {vec_time:.3f}s")
    print(f"Loop-based: {loop_time:.3f}s")
    print(f"Speedup factor: {speedup:.2f}x")

    return results
|
|
|
|
|
|
|
|
def plot_benchmark_results(grid_results: Dict, impl_results: Dict, output_dir: str = "images"):
    """Create plots of benchmark results.

    Writes three PNGs into output_dir (created if missing):
    processing_time_analysis.png, quality_metrics_analysis.png and
    implementation_comparison.png.

    Args:
        grid_results: Mapping grid size -> stats dict, as produced by
            benchmark_grid_sizes (keys used: 'processing_time',
            'total_tiles', 'mse', 'ssim').
        impl_results: Dict with 'vectorized' / 'loop_based' entries, as
            produced by benchmark_implementations.
        output_dir: Directory the PNG files are written to.
    """
    os.makedirs(output_dir, exist_ok=True)
    grid_sizes = sorted(grid_results.keys())
    # One helper per output file keeps each figure's pyplot state isolated.
    _plot_timing(grid_results, grid_sizes, output_dir)
    _plot_quality(grid_results, grid_sizes, output_dir)
    _plot_implementation_comparison(impl_results, output_dir)


def _plot_timing(grid_results: Dict, grid_sizes: List[int], output_dir: str) -> None:
    """Plot processing time vs grid size and vs total tile count."""
    processing_times = [grid_results[gs]['processing_time'] for gs in grid_sizes]
    total_tiles = [grid_results[gs]['total_tiles'] for gs in grid_sizes]

    plt.figure(figsize=(10, 6))

    plt.subplot(1, 2, 1)
    plt.plot(grid_sizes, processing_times, 'bo-', linewidth=2, markersize=8)
    plt.xlabel('Grid Size')
    plt.ylabel('Processing Time (seconds)')
    plt.title('Processing Time vs Grid Size')
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 2, 2)
    plt.plot(total_tiles, processing_times, 'ro-', linewidth=2, markersize=8)
    plt.xlabel('Total Number of Tiles')
    plt.ylabel('Processing Time (seconds)')
    plt.title('Processing Time vs Number of Tiles')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(f"{output_dir}/processing_time_analysis.png", dpi=300, bbox_inches='tight')
    plt.close()


def _plot_quality(grid_results: Dict, grid_sizes: List[int], output_dir: str) -> None:
    """Plot MSE (log scale) and SSIM against grid size."""
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    mse_values = [grid_results[gs]['mse'] for gs in grid_sizes]
    plt.plot(grid_sizes, mse_values, 'go-', linewidth=2, markersize=8)
    plt.xlabel('Grid Size')
    plt.ylabel('MSE')
    plt.title('Mean Squared Error vs Grid Size')
    plt.grid(True, alpha=0.3)
    # MSE spans orders of magnitude across grid sizes; log scale keeps
    # the small-error end readable.
    plt.yscale('log')

    plt.subplot(1, 2, 2)
    ssim_values = [grid_results[gs]['ssim'] for gs in grid_sizes]
    plt.plot(grid_sizes, ssim_values, 'mo-', linewidth=2, markersize=8)
    plt.xlabel('Grid Size')
    plt.ylabel('SSIM')
    plt.title('Structural Similarity vs Grid Size')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(f"{output_dir}/quality_metrics_analysis.png", dpi=300, bbox_inches='tight')
    plt.close()


def _plot_implementation_comparison(impl_results: Dict, output_dir: str) -> None:
    """Bar chart comparing vectorized vs loop-based processing time."""
    plt.figure(figsize=(8, 6))
    impl_names = ['Vectorized', 'Loop-based']
    impl_times = [
        impl_results['vectorized']['processing_time'],
        impl_results['loop_based']['processing_time']
    ]

    bars = plt.bar(impl_names, impl_times, color=['skyblue', 'lightcoral'])
    plt.ylabel('Processing Time (seconds)')
    plt.title('Implementation Performance Comparison')
    plt.grid(True, alpha=0.3, axis='y')

    # Annotate each bar with its exact timing just above the bar top.
    for bar, time_val in zip(bars, impl_times):
        plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                 f'{time_val:.3f}s', ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig(f"{output_dir}/implementation_comparison.png", dpi=300, bbox_inches='tight')
    plt.close()
|
|
|
|
|
|
|
|
def generate_benchmark_report(grid_results: Dict, impl_results: Dict, output_file: str = "benchmark_report.txt"):
    """Write a plain-text report summarizing grid-size and implementation results.

    Args:
        grid_results: Mapping grid size -> stats dict from benchmark_grid_sizes.
        impl_results: Dict from benchmark_implementations ('vectorized',
            'loop_based', 'comparison' entries).
        output_file: Path of the text report to (over)write.
    """
    lines = []
    add = lines.append

    add("MOSAIC GENERATION BENCHMARK REPORT\n")
    add("=" * 50 + "\n\n")

    # Per-grid-size timing and quality section.
    add("GRID SIZE PERFORMANCE ANALYSIS\n")
    add("-" * 30 + "\n")
    ordered_sizes = sorted(grid_results.keys())
    for size in ordered_sizes:
        data = grid_results[size]
        add(f"Grid {size}x{size}:\n")
        add(f" Processing Time: {data['processing_time']:.3f}s\n")
        add(f" Total Tiles: {data['total_tiles']}\n")
        add(f" Tiles per Second: {data['tiles_per_second']:.1f}\n")
        add(f" MSE: {data['mse']:.6f}\n")
        add(f" SSIM: {data['ssim']:.4f}\n")
        add(f" Output Resolution: {data['output_resolution']}\n\n")

    # Scaling section: compare the smallest vs the largest tested grid.
    if len(ordered_sizes) >= 2:
        smallest = grid_results[ordered_sizes[0]]
        largest = grid_results[ordered_sizes[-1]]

        tile_ratio = largest['total_tiles'] / smallest['total_tiles']
        time_ratio = largest['processing_time'] / smallest['processing_time']

        add("SCALING ANALYSIS\n")
        add("-" * 20 + "\n")
        add(f"Tile increase ratio: {tile_ratio:.2f}x\n")
        add(f"Time increase ratio: {time_ratio:.2f}x\n")
        add(f"Scaling efficiency: {tile_ratio/time_ratio:.2f}\n")
        add(f"Linear scaling: {'Yes' if abs(time_ratio - tile_ratio) / tile_ratio < 0.1 else 'No'}\n\n")

    add("IMPLEMENTATION COMPARISON\n")
    add("-" * 25 + "\n")
    add(f"Vectorized processing time: {impl_results['vectorized']['processing_time']:.3f}s\n")
    add(f"Loop-based processing time: {impl_results['loop_based']['processing_time']:.3f}s\n")
    add(f"Speedup factor: {impl_results['comparison']['speedup_factor']:.2f}x\n")
    add(f"Vectorized is faster: {'Yes' if impl_results['comparison']['vectorized_faster'] else 'No'}\n\n")

    add("QUALITY COMPARISON\n")
    add("-" * 18 + "\n")
    add(f"Vectorized MSE: {impl_results['vectorized']['mse']:.6f}\n")
    add(f"Loop-based MSE: {impl_results['loop_based']['mse']:.6f}\n")
    add(f"Vectorized SSIM: {impl_results['vectorized']['ssim']:.4f}\n")
    add(f"Loop-based SSIM: {impl_results['loop_based']['ssim']:.4f}\n")

    with open(output_file, 'w') as f:
        f.writelines(lines)
|
|
|
|
|
|
|
|
def main():
    """Entry point: parse CLI args, run both benchmarks, emit plots and report."""
    parser = argparse.ArgumentParser(description='Benchmark mosaic generation performance')
    parser.add_argument('--grid-sizes', nargs='+', type=int, default=[16, 32, 48, 64],
                        help='Grid sizes to test (default: 16 32 48 64)')
    parser.add_argument('--output-dir', default='images', help='Output directory for plots')
    parser.add_argument('--test-image', help='Path to test image (optional)')
    args = parser.parse_args()

    print("Starting mosaic generation benchmark...")

    # Prefer a user-supplied image when it exists; otherwise synthesize one.
    if args.test_image and os.path.exists(args.test_image):
        test_image = Image.open(args.test_image)
        print(f"Using test image: {args.test_image}")
    else:
        test_image = create_test_image()
        print("Using generated test image")

    pipeline = MosaicPipeline(Config(grid=32))

    separator = "\n" + "=" * 50
    print(separator)
    grid_results = benchmark_grid_sizes(pipeline, test_image, args.grid_sizes)

    print(separator)
    impl_results = benchmark_implementations(pipeline, test_image)

    print("\nGenerating plots and report...")
    plot_benchmark_results(grid_results, impl_results, args.output_dir)
    generate_benchmark_report(grid_results, impl_results)

    print("\nBenchmark complete!")
    print(f"Plots saved to: {args.output_dir}/")
    print("Report saved to: benchmark_report.txt")


if __name__ == "__main__":
    main()
|
|
|