|
|
from __future__ import annotations |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
from typing import Dict, List, Tuple, Optional |
|
|
import time |
|
|
from .config import Config, Implementation |
|
|
from .mosaic import MosaicGenerator |
|
|
from .metrics import calculate_comprehensive_metrics, interpret_metrics |
|
|
from .utils import pil_to_np, np_to_pil |
|
|
|
|
|
|
|
|
class MosaicPipeline:
    """Complete pipeline for mosaic generation with performance analysis.

    Wraps a ``MosaicGenerator`` built from a shared ``Config`` object and adds
    quality metrics, benchmarking helpers and a plain-text report on top.

    The benchmarking methods temporarily modify ``self.config`` in place and
    restore the original values before returning, even when a run fails.
    Every call to ``run_full_pipeline`` (including those made by the
    benchmarks) overwrites ``self.results``.
    """

    def __init__(self, config: Config):
        """Store the configuration and build the mosaic generator from it.

        Args:
            config: Configuration shared by this pipeline and its generator.
        """
        self.config = config
        self.mosaic_generator = MosaicGenerator(config)
        # Result dictionary of the most recent run_full_pipeline call;
        # empty until the pipeline has been run at least once.
        self.results: Dict = {}

    def _timed_run(self, image: Image.Image) -> Tuple[float, Dict]:
        """Run the full pipeline once and return ``(elapsed_seconds, results)``."""
        # perf_counter is monotonic and high resolution, unlike time.time().
        started = time.perf_counter()
        run_results = self.run_full_pipeline(image)
        return time.perf_counter() - started, run_results

    def run_full_pipeline(self, image: Image.Image) -> Dict:
        """
        Run the complete mosaic generation pipeline.

        Calls ``self.config.validate()`` first, then generates the mosaic,
        computes and interprets the quality metrics, and stores the
        preprocessed input alongside basic grid information.

        Args:
            image: Input PIL Image

        Returns:
            Dictionary with all results and metrics. Keys: ``input_image``,
            ``config`` (shallow copy of the config attributes), ``timing``,
            ``metrics``, ``metrics_interpretation``, ``outputs`` (``mosaic``
            and ``processed_image``) and ``grid_info``. The same dictionary
            is also stored on ``self.results``.
        """
        self.config.validate()

        results = {
            'input_image': image,
            'config': self.config.__dict__.copy(),
            'timing': {},
            'metrics': {},
            'outputs': {},
        }

        # Mosaic generation; the generator reports its own stage timings.
        mosaic_img, stats = self.mosaic_generator.generate_mosaic(image)
        # Copy the timings so adding our own entry below does not mutate
        # the dictionary owned by the generator's stats.
        results['timing'] = dict(stats['processing_time'])
        results['outputs']['mosaic'] = mosaic_img

        # Quality metrics of the mosaic against the original image.
        metrics_start = time.perf_counter()
        metrics = calculate_comprehensive_metrics(image, mosaic_img)
        results['metrics'] = metrics
        results['metrics_interpretation'] = interpret_metrics(metrics)
        results['timing']['metrics_calculation'] = time.perf_counter() - metrics_start

        # Additional outputs and grid description.
        results['outputs']['processed_image'] = self.mosaic_generator.preprocess_image(image)
        results['grid_info'] = {
            'grid_size': self.config.grid,
            'tile_size': self.config.tile_size,
            'total_tiles': self.config.grid ** 2,
        }

        self.results = results
        return results

    def benchmark_implementations(self, image: Image.Image) -> Dict:
        """
        Compare vectorized vs loop-based implementations.

        Runs the full pipeline once with ``Implementation.VECT`` and once with
        ``Implementation.LOOPS`` by switching ``self.config.impl``. The
        original implementation setting is restored afterwards, including
        when one of the runs raises.

        Args:
            image: Input PIL Image

        Returns:
            Dictionary with performance comparison. ``vectorized`` and
            ``loop_based`` each hold ``processing_time``, ``metrics`` and
            ``mosaic``; ``comparison`` holds ``speedup_factor`` (loop time
            divided by vectorized time, 0 if the latter is not positive),
            ``time_difference`` and ``vectorized_faster``.
        """
        original_impl = self.config.impl
        try:
            self.config.impl = Implementation.VECT
            vec_time, vec_results = self._timed_run(image)

            self.config.impl = Implementation.LOOPS
            loop_time, loop_results = self._timed_run(image)
        finally:
            # Never leave the shared config switched to a benchmark setting.
            self.config.impl = original_impl

        return {
            'vectorized': {
                'processing_time': vec_time,
                'metrics': vec_results['metrics'],
                'mosaic': vec_results['outputs']['mosaic'],
            },
            'loop_based': {
                'processing_time': loop_time,
                'metrics': loop_results['metrics'],
                'mosaic': loop_results['outputs']['mosaic'],
            },
            'comparison': {
                'speedup_factor': loop_time / vec_time if vec_time > 0 else 0,
                'time_difference': loop_time - vec_time,
                'vectorized_faster': vec_time < loop_time,
            },
        }

    def benchmark_grid_sizes(self, image: Image.Image, grid_sizes: List[int]) -> Dict:
        """
        Benchmark performance for different grid sizes.

        For each grid size, ``self.config.grid``, ``out_w`` and ``out_h`` are
        set so that the output dimensions are multiples of the grid size
        derived from the image's own dimensions and aspect ratio, and the
        full pipeline is run and timed. The original ``grid``, ``out_w`` and
        ``out_h`` are restored afterwards, including when a run raises.

        Args:
            image: Input PIL Image
            grid_sizes: List of grid sizes to test

        Returns:
            Dictionary with grid size performance results, keyed by grid
            size. Each entry holds ``processing_time``, ``output_resolution``
            (``"WxH"`` of the produced mosaic), ``total_tiles``,
            ``tiles_per_second`` (0 if the elapsed time is not positive) and
            ``metrics``.
        """
        results = {}
        original_grid = self.config.grid
        original_out_w = self.config.out_w
        original_out_h = self.config.out_h

        # Loop invariant: depends only on the input image.
        aspect_ratio = image.width / image.height

        try:
            for grid_size in grid_sizes:
                self.config.grid = grid_size

                if aspect_ratio > 1:
                    # Wider than tall: snap the width down to a multiple of
                    # the grid size, then derive a grid-aligned height.
                    self.config.out_w = (image.width // grid_size) * grid_size
                    self.config.out_h = int(self.config.out_w / aspect_ratio // grid_size) * grid_size
                else:
                    # Square or taller than wide: height drives the width.
                    self.config.out_h = (image.height // grid_size) * grid_size
                    self.config.out_w = int(self.config.out_h * aspect_ratio // grid_size) * grid_size

                total_time, pipeline_results = self._timed_run(image)
                mosaic = pipeline_results['outputs']['mosaic']
                tile_count = grid_size * grid_size

                results[grid_size] = {
                    'processing_time': total_time,
                    'output_resolution': f"{mosaic.width}x{mosaic.height}",
                    'total_tiles': tile_count,
                    'tiles_per_second': tile_count / total_time if total_time > 0 else 0,
                    'metrics': pipeline_results['metrics'],
                }
        finally:
            # Restore the caller's configuration even if a run failed.
            self.config.grid = original_grid
            self.config.out_w = original_out_w
            self.config.out_h = original_out_h

        return results

    def analyze_performance_scaling(self, benchmark_results: Dict) -> Dict:
        """
        Analyze how performance scales with grid size.

        Compares the smallest and the largest benchmarked grid size. Ratios
        whose denominator is not positive are reported as 0 instead of
        raising ``ZeroDivisionError``, matching the convention used by the
        benchmark methods.

        Args:
            benchmark_results: Results from benchmark_grid_sizes

        Returns:
            Dictionary with scaling analysis: the sorted ``grid_sizes`` and
            the matching ``processing_times``, ``total_tiles`` and
            ``tiles_per_second`` lists, plus ``scaling_factors``, which is
            empty when fewer than two grid sizes are present and otherwise
            holds ``tile_increase_ratio``, ``time_increase_ratio``,
            ``scaling_efficiency`` and ``is_linear_scaling`` (time ratio
            within 10% of the tile ratio).
        """
        grid_sizes = sorted(benchmark_results)
        processing_times = [benchmark_results[gs]['processing_time'] for gs in grid_sizes]
        total_tiles = [benchmark_results[gs]['total_tiles'] for gs in grid_sizes]
        tiles_per_second = [benchmark_results[gs]['tiles_per_second'] for gs in grid_sizes]

        scaling_analysis = {
            'grid_sizes': grid_sizes,
            'processing_times': processing_times,
            'total_tiles': total_tiles,
            'tiles_per_second': tiles_per_second,
            'scaling_factors': {},
        }

        if len(grid_sizes) >= 2:
            first_tiles, last_tiles = total_tiles[0], total_tiles[-1]
            first_time, last_time = processing_times[0], processing_times[-1]

            # A very fast first run can measure as 0 seconds; guard every
            # division rather than crash the analysis.
            tile_ratio = last_tiles / first_tiles if first_tiles > 0 else 0
            time_ratio = last_time / first_time if first_time > 0 else 0

            scaling_analysis['scaling_factors'] = {
                'tile_increase_ratio': tile_ratio,
                'time_increase_ratio': time_ratio,
                'scaling_efficiency': tile_ratio / time_ratio if time_ratio > 0 else 0,
                'is_linear_scaling': (
                    tile_ratio > 0
                    and abs(time_ratio - tile_ratio) / tile_ratio < 0.1
                ),
            }

        return scaling_analysis

    def generate_report(self, image: Image.Image, benchmark_results: Optional[Dict] = None) -> str:
        """
        Generate a comprehensive report of the mosaic generation process.

        If no pipeline results are stored yet, the full pipeline is run on
        ``image`` first; otherwise the stored results are reported as-is and
        ``image`` is not used. The configuration section always reflects the
        current values of ``self.config``.

        Args:
            image: Input PIL Image
            benchmark_results: Optional benchmark results

        Returns:
            Formatted report string
        """
        # Reuse the results of a previous run when available.
        if not self.results:
            self.run_full_pipeline(image)

        rule = "=" * 60
        report = [rule, "MOSAIC GENERATION REPORT", rule]

        # Configuration section.
        report.extend([
            "\nCONFIGURATION:",
            f"Grid Size: {self.config.grid}x{self.config.grid}",
            f"Tile Size: {self.config.tile_size}x{self.config.tile_size}",
            f"Output Resolution: {self.config.out_w}x{self.config.out_h}",
            f"Implementation: {self.config.impl.value}",
            f"Color Matching: {self.config.match_space.value}",
            f"Total Tiles: {self.config.grid ** 2}",
        ])

        # Per-stage timings recorded by the last pipeline run.
        report.append("\nPROCESSING TIME:")
        report.extend(
            f"{stage.replace('_', ' ').title()}: {time_val:.3f} seconds"
            for stage, time_val in self.results['timing'].items()
        )

        # Quality metrics with their textual interpretation.
        metrics = self.results['metrics']
        interpretations = self.results['metrics_interpretation']
        report.extend([
            "\nQUALITY METRICS:",
            f"MSE: {metrics['mse']:.6f} ({interpretations['mse']})",
            f"PSNR: {metrics['psnr']:.2f} dB ({interpretations['psnr']})",
            f"SSIM: {metrics['ssim']:.4f} ({interpretations['ssim']})",
            f"RMSE: {metrics['rmse']:.6f}",
            f"MAE: {metrics['mae']:.6f}",
        ])

        # Optional per-grid-size benchmark summary.
        if benchmark_results:
            report.append("\nBENCHMARK RESULTS:")
            for grid_size, result in benchmark_results.items():
                report.extend([
                    f"Grid {grid_size}x{grid_size}:",
                    f"  Processing Time: {result['processing_time']:.3f}s",
                    f"  Tiles per Second: {result['tiles_per_second']:.1f}",
                    f"  Output Resolution: {result['output_resolution']}",
                ])

        report.append("\n" + rule)

        return "\n".join(report)
|
|
|