Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import numpy as np | |
| from PIL import Image | |
| from typing import Dict, List, Tuple, Optional | |
| import time | |
| from .config import Config, Implementation | |
| from .mosaic import MosaicGenerator | |
| from .metrics import calculate_comprehensive_metrics, interpret_metrics | |
| from .utils import pil_to_np, np_to_pil | |
| class MosaicPipeline: | |
| """Complete pipeline for mosaic generation with performance analysis.""" | |
| def __init__(self, config: Config): | |
| self.config = config | |
| self.mosaic_generator = MosaicGenerator(config) | |
| self.results = {} | |
| def run_full_pipeline(self, image: Image.Image) -> Dict: | |
| """ | |
| Run the complete mosaic generation pipeline. | |
| Args: | |
| image: Input PIL Image | |
| Returns: | |
| Dictionary with all results and metrics | |
| """ | |
| results = { | |
| 'input_image': image, | |
| 'config': self.config.__dict__.copy(), | |
| 'timing': {}, | |
| 'metrics': {}, | |
| 'outputs': {} | |
| } | |
| # Generate mosaic | |
| start_time = time.time() | |
| mosaic_img, stats = self.mosaic_generator.generate_mosaic(image) | |
| results['timing'] = stats['processing_time'] | |
| results['outputs']['mosaic'] = mosaic_img | |
| # Calculate similarity metrics | |
| metrics_start = time.time() | |
| metrics = calculate_comprehensive_metrics(image, mosaic_img) | |
| results['metrics'] = metrics | |
| results['metrics_interpretation'] = interpret_metrics(metrics) | |
| results['timing']['metrics_calculation'] = time.time() - metrics_start | |
| # Store additional information | |
| results['outputs']['processed_image'] = self.mosaic_generator.preprocess_image(image) | |
| results['grid_info'] = { | |
| 'grid_size': self.config.grid, | |
| 'tile_size': self.config.tile_size, | |
| 'total_tiles': self.config.grid ** 2 | |
| } | |
| self.results = results | |
| return results | |
| def benchmark_implementations(self, image: Image.Image) -> Dict: | |
| """ | |
| Compare vectorized vs loop-based implementations. | |
| Args: | |
| image: Input PIL Image | |
| Returns: | |
| Dictionary with performance comparison | |
| """ | |
| original_impl = self.config.impl | |
| results = { | |
| 'vectorized': {}, | |
| 'loop_based': {}, | |
| 'comparison': {} | |
| } | |
| # Test vectorized implementation | |
| self.config.impl = Implementation.VECT | |
| start_time = time.time() | |
| vec_results = self.run_full_pipeline(image) | |
| vec_time = time.time() - start_time | |
| results['vectorized'] = { | |
| 'processing_time': vec_time, | |
| 'metrics': vec_results['metrics'], | |
| 'mosaic': vec_results['outputs']['mosaic'] | |
| } | |
| # Test loop-based implementation | |
| self.config.impl = Implementation.LOOPS | |
| start_time = time.time() | |
| loop_results = self.run_full_pipeline(image) | |
| loop_time = time.time() - start_time | |
| results['loop_based'] = { | |
| 'processing_time': loop_time, | |
| 'metrics': loop_results['metrics'], | |
| 'mosaic': loop_results['outputs']['mosaic'] | |
| } | |
| # Calculate comparison | |
| speedup = loop_time / vec_time if vec_time > 0 else 0 | |
| results['comparison'] = { | |
| 'speedup_factor': speedup, | |
| 'time_difference': loop_time - vec_time, | |
| 'vectorized_faster': vec_time < loop_time | |
| } | |
| # Restore original implementation | |
| self.config.impl = original_impl | |
| return results | |
| def benchmark_grid_sizes(self, image: Image.Image, grid_sizes: List[int]) -> Dict: | |
| """ | |
| Benchmark performance for different grid sizes. | |
| Args: | |
| image: Input PIL Image | |
| grid_sizes: List of grid sizes to test | |
| Returns: | |
| Dictionary with grid size performance results | |
| """ | |
| results = {} | |
| original_grid = self.config.grid | |
| original_out_w = self.config.out_w | |
| original_out_h = self.config.out_h | |
| for grid_size in grid_sizes: | |
| self.config.grid = grid_size | |
| # Calculate appropriate output dimensions | |
| aspect_ratio = image.width / image.height | |
| if aspect_ratio > 1: | |
| # Landscape | |
| self.config.out_w = (image.width // grid_size) * grid_size | |
| self.config.out_h = int(self.config.out_w / aspect_ratio // grid_size) * grid_size | |
| else: | |
| # Portrait | |
| self.config.out_h = (image.height // grid_size) * grid_size | |
| self.config.out_w = int(self.config.out_h * aspect_ratio // grid_size) * grid_size | |
| # Time the generation | |
| start_time = time.time() | |
| pipeline_results = self.run_full_pipeline(image) | |
| total_time = time.time() - start_time | |
| results[grid_size] = { | |
| 'processing_time': total_time, | |
| 'output_resolution': f"{pipeline_results['outputs']['mosaic'].width}x{pipeline_results['outputs']['mosaic'].height}", | |
| 'total_tiles': grid_size * grid_size, | |
| 'tiles_per_second': (grid_size * grid_size) / total_time if total_time > 0 else 0, | |
| 'metrics': pipeline_results['metrics'] | |
| } | |
| # Restore original configuration | |
| self.config.grid = original_grid | |
| self.config.out_w = original_out_w | |
| self.config.out_h = original_out_h | |
| return results | |
| def analyze_performance_scaling(self, benchmark_results: Dict) -> Dict: | |
| """ | |
| Analyze how performance scales with grid size. | |
| Args: | |
| benchmark_results: Results from benchmark_grid_sizes | |
| Returns: | |
| Dictionary with scaling analysis | |
| """ | |
| grid_sizes = sorted(benchmark_results.keys()) | |
| processing_times = [benchmark_results[gs]['processing_time'] for gs in grid_sizes] | |
| total_tiles = [benchmark_results[gs]['total_tiles'] for gs in grid_sizes] | |
| tiles_per_second = [benchmark_results[gs]['tiles_per_second'] for gs in grid_sizes] | |
| # Calculate scaling factors | |
| scaling_analysis = { | |
| 'grid_sizes': grid_sizes, | |
| 'processing_times': processing_times, | |
| 'total_tiles': total_tiles, | |
| 'tiles_per_second': tiles_per_second, | |
| 'scaling_factors': {} | |
| } | |
| if len(grid_sizes) >= 2: | |
| # Calculate how processing time scales with number of tiles | |
| tile_ratio = total_tiles[-1] / total_tiles[0] | |
| time_ratio = processing_times[-1] / processing_times[0] | |
| scaling_analysis['scaling_factors'] = { | |
| 'tile_increase_ratio': tile_ratio, | |
| 'time_increase_ratio': time_ratio, | |
| 'scaling_efficiency': tile_ratio / time_ratio if time_ratio > 0 else 0, | |
| 'is_linear_scaling': abs(time_ratio - tile_ratio) / tile_ratio < 0.1 | |
| } | |
| return scaling_analysis | |
| def generate_report(self, image: Image.Image, benchmark_results: Optional[Dict] = None) -> str: | |
| """ | |
| Generate a comprehensive report of the mosaic generation process. | |
| Args: | |
| image: Input PIL Image | |
| benchmark_results: Optional benchmark results | |
| Returns: | |
| Formatted report string | |
| """ | |
| # Run full pipeline if not already done | |
| if not self.results: | |
| self.run_full_pipeline(image) | |
| report = [] | |
| report.append("=" * 60) | |
| report.append("MOSAIC GENERATION REPORT") | |
| report.append("=" * 60) | |
| # Configuration | |
| report.append("\nCONFIGURATION:") | |
| report.append(f"Grid Size: {self.config.grid}x{self.config.grid}") | |
| report.append(f"Tile Size: {self.config.tile_size}x{self.config.tile_size}") | |
| report.append(f"Output Resolution: {self.config.out_w}x{self.config.out_h}") | |
| report.append(f"Implementation: {self.config.impl.value}") | |
| report.append(f"Color Matching: {self.config.match_space.value}") | |
| report.append(f"Total Tiles: {self.config.grid ** 2}") | |
| # Processing Time | |
| report.append("\nPROCESSING TIME:") | |
| for stage, time_val in self.results['timing'].items(): | |
| report.append(f"{stage.replace('_', ' ').title()}: {time_val:.3f} seconds") | |
| # Quality Metrics | |
| report.append("\nQUALITY METRICS:") | |
| metrics = self.results['metrics'] | |
| interpretations = self.results['metrics_interpretation'] | |
| report.append(f"MSE: {metrics['mse']:.6f} ({interpretations['mse']})") | |
| report.append(f"PSNR: {metrics['psnr']:.2f} dB ({interpretations['psnr']})") | |
| report.append(f"SSIM: {metrics['ssim']:.4f} ({interpretations['ssim']})") | |
| report.append(f"RMSE: {metrics['rmse']:.6f}") | |
| report.append(f"MAE: {metrics['mae']:.6f}") | |
| # Benchmark Results | |
| if benchmark_results: | |
| report.append("\nBENCHMARK RESULTS:") | |
| for grid_size, result in benchmark_results.items(): | |
| report.append(f"Grid {grid_size}x{grid_size}:") | |
| report.append(f" Processing Time: {result['processing_time']:.3f}s") | |
| report.append(f" Tiles per Second: {result['tiles_per_second']:.1f}") | |
| report.append(f" Output Resolution: {result['output_resolution']}") | |
| report.append("\n" + "=" * 60) | |
| return "\n".join(report) | |