from __future__ import annotations

import time
from typing import Dict, List, Optional

from PIL import Image

from .config import Config, Implementation
from .mosaic import MosaicGenerator
from .metrics import calculate_comprehensive_metrics, interpret_metrics


class MosaicPipeline:
    """Complete pipeline for mosaic generation with performance analysis."""

    def __init__(self, config: Config):
        self.config = config
        self.mosaic_generator = MosaicGenerator(config)
        self.results = {}

    def run_full_pipeline(self, image: Image.Image) -> Dict:
        """
        Run the complete mosaic generation pipeline.

        Args:
            image: Input PIL Image

        Returns:
            Dictionary with all results and metrics
        """
        results = {
            'input_image': image,
            'config': self.config.__dict__.copy(),
            'timing': {},
            'metrics': {},
            'outputs': {}
        }

        # Generate mosaic
        mosaic_img, stats = self.mosaic_generator.generate_mosaic(image)
        results['timing'] = stats['processing_time']
        results['outputs']['mosaic'] = mosaic_img

        # Calculate similarity metrics
        metrics_start = time.time()
        metrics = calculate_comprehensive_metrics(image, mosaic_img)
        results['metrics'] = metrics
        results['metrics_interpretation'] = interpret_metrics(metrics)
        results['timing']['metrics_calculation'] = time.time() - metrics_start

        # Store additional information
        results['outputs']['processed_image'] = self.mosaic_generator.preprocess_image(image)
        results['grid_info'] = {
            'grid_size': self.config.grid,
            'tile_size': self.config.tile_size,
            'total_tiles': self.config.grid ** 2
        }

        self.results = results
        return results

    def benchmark_implementations(self, image: Image.Image) -> Dict:
        """
        Compare vectorized vs loop-based implementations.

        Args:
            image: Input PIL Image

        Returns:
            Dictionary with performance comparison
        """
        original_impl = self.config.impl
        results = {
            'vectorized': {},
            'loop_based': {},
            'comparison': {}
        }

        # Test vectorized implementation
        self.config.impl = Implementation.VECT
        start_time = time.time()
        vec_results = self.run_full_pipeline(image)
        vec_time = time.time() - start_time
        results['vectorized'] = {
            'processing_time': vec_time,
            'metrics': vec_results['metrics'],
            'mosaic': vec_results['outputs']['mosaic']
        }

        # Test loop-based implementation
        self.config.impl = Implementation.LOOPS
        start_time = time.time()
        loop_results = self.run_full_pipeline(image)
        loop_time = time.time() - start_time
        results['loop_based'] = {
            'processing_time': loop_time,
            'metrics': loop_results['metrics'],
            'mosaic': loop_results['outputs']['mosaic']
        }

        # Calculate comparison
        speedup = loop_time / vec_time if vec_time > 0 else 0
        results['comparison'] = {
            'speedup_factor': speedup,
            'time_difference': loop_time - vec_time,
            'vectorized_faster': vec_time < loop_time
        }

        # Restore original implementation
        self.config.impl = original_impl
        return results
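
    # Example reading of benchmark_implementations() output (hypothetical
    # timings): if the loop-based run takes 2.0 s and the vectorized run
    # 0.25 s, then speedup_factor == 8.0, time_difference == 1.75 and
    # vectorized_faster is True.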

    def benchmark_grid_sizes(self, image: Image.Image, grid_sizes: List[int]) -> Dict:
        """
        Benchmark performance for different grid sizes.

        Args:
            image: Input PIL Image
            grid_sizes: List of grid sizes to test

        Returns:
            Dictionary with grid size performance results
        """
        results = {}
        original_grid = self.config.grid
        original_out_w = self.config.out_w
        original_out_h = self.config.out_h

        for grid_size in grid_sizes:
            self.config.grid = grid_size

            # Calculate output dimensions that are exact multiples of the grid size
            aspect_ratio = image.width / image.height
            if aspect_ratio > 1:  # Landscape
                self.config.out_w = (image.width // grid_size) * grid_size
                self.config.out_h = int(self.config.out_w / aspect_ratio // grid_size) * grid_size
            else:  # Portrait or square
                self.config.out_h = (image.height // grid_size) * grid_size
                self.config.out_w = int(self.config.out_h * aspect_ratio // grid_size) * grid_size

            # Time the generation
            start_time = time.time()
            pipeline_results = self.run_full_pipeline(image)
            total_time = time.time() - start_time

            mosaic = pipeline_results['outputs']['mosaic']
            results[grid_size] = {
                'processing_time': total_time,
                'output_resolution': f"{mosaic.width}x{mosaic.height}",
                'total_tiles': grid_size * grid_size,
                'tiles_per_second': (grid_size * grid_size) / total_time if total_time > 0 else 0,
                'metrics': pipeline_results['metrics']
            }

        # Restore original configuration
        self.config.grid = original_grid
        self.config.out_w = original_out_w
        self.config.out_h = original_out_h
        return results

    def analyze_performance_scaling(self, benchmark_results: Dict) -> Dict:
        """
        Analyze how performance scales with grid size.

        Args:
            benchmark_results: Results from benchmark_grid_sizes

        Returns:
            Dictionary with scaling analysis
        """
        grid_sizes = sorted(benchmark_results.keys())
        processing_times = [benchmark_results[gs]['processing_time'] for gs in grid_sizes]
        total_tiles = [benchmark_results[gs]['total_tiles'] for gs in grid_sizes]
        tiles_per_second = [benchmark_results[gs]['tiles_per_second'] for gs in grid_sizes]

        scaling_analysis = {
            'grid_sizes': grid_sizes,
            'processing_times': processing_times,
            'total_tiles': total_tiles,
            'tiles_per_second': tiles_per_second,
            'scaling_factors': {}
        }

        if len(grid_sizes) >= 2:
            # Compare the smallest and largest runs to see how processing
            # time scales with the number of tiles
            tile_ratio = total_tiles[-1] / total_tiles[0]
            time_ratio = processing_times[-1] / processing_times[0]
            scaling_analysis['scaling_factors'] = {
                'tile_increase_ratio': tile_ratio,
                'time_increase_ratio': time_ratio,
                'scaling_efficiency': tile_ratio / time_ratio if time_ratio > 0 else 0,
                'is_linear_scaling': abs(time_ratio - tile_ratio) / tile_ratio < 0.1
            }

        return scaling_analysis
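
    # Worked example for analyze_performance_scaling(): sweeping grid sizes
    # [16, 64] gives total_tiles of 256 and 4096, so tile_increase_ratio == 16.
    # If processing time also grows ~16x, scaling_efficiency is ~1.0 and
    # is_linear_scaling is True (the two ratios agree within 10%).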

    def generate_report(self, image: Image.Image, benchmark_results: Optional[Dict] = None) -> str:
        """
        Generate a comprehensive report of the mosaic generation process.

        Args:
            image: Input PIL Image
            benchmark_results: Optional results from benchmark_grid_sizes

        Returns:
            Formatted report string
        """
        # Run full pipeline if not already done
        if not self.results:
            self.run_full_pipeline(image)

        report = []
        report.append("=" * 60)
        report.append("MOSAIC GENERATION REPORT")
        report.append("=" * 60)

        # Configuration
        report.append("\nCONFIGURATION:")
        report.append(f"Grid Size: {self.config.grid}x{self.config.grid}")
        report.append(f"Tile Size: {self.config.tile_size}x{self.config.tile_size}")
        report.append(f"Output Resolution: {self.config.out_w}x{self.config.out_h}")
        report.append(f"Implementation: {self.config.impl.value}")
        report.append(f"Color Matching: {self.config.match_space.value}")
        report.append(f"Total Tiles: {self.config.grid ** 2}")

        # Processing time per stage
        report.append("\nPROCESSING TIME:")
        for stage, time_val in self.results['timing'].items():
            report.append(f"{stage.replace('_', ' ').title()}: {time_val:.3f} seconds")

        # Quality metrics
        report.append("\nQUALITY METRICS:")
        metrics = self.results['metrics']
        interpretations = self.results['metrics_interpretation']
        report.append(f"MSE: {metrics['mse']:.6f} ({interpretations['mse']})")
        report.append(f"PSNR: {metrics['psnr']:.2f} dB ({interpretations['psnr']})")
        report.append(f"SSIM: {metrics['ssim']:.4f} ({interpretations['ssim']})")
        report.append(f"RMSE: {metrics['rmse']:.6f}")
        report.append(f"MAE: {metrics['mae']:.6f}")

        # Benchmark results, if provided
        if benchmark_results:
            report.append("\nBENCHMARK RESULTS:")
            for grid_size, result in benchmark_results.items():
                report.append(f"Grid {grid_size}x{grid_size}:")
                report.append(f"  Processing Time: {result['processing_time']:.3f}s")
                report.append(f"  Tiles per Second: {result['tiles_per_second']:.1f}")
                report.append(f"  Output Resolution: {result['output_resolution']}")

        report.append("\n" + "=" * 60)
        return "\n".join(report)
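

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The Config(...) call below is an
# assumption: this module only reads config.grid, config.tile_size,
# config.out_w, config.out_h, config.impl and config.match_space, so the real
# Config may take different or additional arguments. "input.jpg" is a
# placeholder path. Run as a module (python -m <package>.pipeline) so the
# relative imports at the top of the file resolve.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    img = Image.open("input.jpg").convert("RGB")  # placeholder input image

    # Hypothetical Config call; adjust to the actual Config signature.
    config = Config(grid=32, tile_size=16, out_w=512, out_h=512,
                    impl=Implementation.VECT)
    pipeline = MosaicPipeline(config)

    # One end-to-end run, then a grid-size sweep and a scaling analysis.
    pipeline.run_full_pipeline(img)
    benchmarks = pipeline.benchmark_grid_sizes(img, grid_sizes=[8, 16, 32])
    scaling = pipeline.analyze_performance_scaling(benchmarks)

    print(pipeline.generate_report(img, benchmark_results=benchmarks))
    print("Scaling factors:", scaling['scaling_factors'])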