Spaces:
Sleeping
Sleeping
| """ | |
| Performance metrics and timing utilities for the mosaic generator. | |
| """ | |
| import numpy as np | |
| import cv2 | |
| from skimage.metrics import structural_similarity as ssim | |
| import time | |
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| from PIL import Image | |
| import cProfile | |
| import pstats | |
| import io | |
| import os | |
| try: | |
| from line_profiler import LineProfiler | |
| except ImportError: # pragma: no cover - optional dependency | |
| LineProfiler = None | |
def mse(img1: np.ndarray, img2: np.ndarray) -> float:
    """Return the mean squared error between two images.

    Args:
        img1 (np.ndarray): Reference image.
        img2 (np.ndarray): Comparison image.

    Returns:
        float: Mean squared error value (0 means identical images).
    """
    # Work in float32 so uint8 inputs do not wrap around on subtraction.
    diff = img1.astype(np.float32) - img2.astype(np.float32)
    return float(np.mean(np.square(diff)))
def ssim_metric(img1: np.ndarray, img2: np.ndarray) -> float:
    """Return the structural similarity index (SSIM) for two images.

    Args:
        img1 (np.ndarray): Reference image, BGR (H, W, 3) or already 2-D grayscale.
        img2 (np.ndarray): Comparison image, same layout as ``img1``.

    Returns:
        float: SSIM score; 1.0 means perceptually identical.
    """
    # Accept images that are already single-channel: cv2.cvtColor raises on
    # 2-D input, so only convert when a color image is actually supplied.
    img1_gray = img1 if img1.ndim == 2 else cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    img2_gray = img2 if img2.ndim == 2 else cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
    ssim_value = ssim(img1_gray, img2_gray)
    # Some skimage call signatures return (score, ...) tuples; normalize to
    # a plain float either way.
    if isinstance(ssim_value, tuple):
        return float(ssim_value[0])
    return float(ssim_value)
def timed(func):
    """Decorator that returns both the function output and elapsed seconds.

    Args:
        func (Callable): Function to wrap.

    Returns:
        Callable: Wrapper returning ``(result, seconds)``.
    """
    # wraps() preserves the wrapped function's name/docstring so profiler
    # reports show the real function instead of 'wrapper'.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution; time.time can jump
        # backwards on wall-clock adjustments and yield negative intervals.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        return result, elapsed
    return wrapper
def create_performance_report(
    actual_grid_size: int,
    cell_size: int,
    n_colors: int,
    actual_image_size: int,
    preprocess_time: float,
    seg_time: float,
    tile_load_time: float,
    mosaic_time: float,
    metrics_time: float,
    total_time: float,
    mse_val: float,
    ssim_val: float,
) -> str:
    """Generate a formatted performance report string for display.

    Args:
        actual_grid_size (int): Effective grid dimension (cells per side).
        cell_size (int): Cell edge length in pixels.
        n_colors (int): Number of quantized colors.
        actual_image_size (int): Effective square image edge in pixels.
        preprocess_time (float): Seconds spent preprocessing.
        seg_time (float): Seconds spent on grid segmentation.
        tile_load_time (float): Seconds spent loading tiles.
        mosaic_time (float): Seconds spent mapping tiles.
        metrics_time (float): Seconds spent computing metrics.
        total_time (float): Total elapsed seconds.
        mse_val (float): Mean squared error of the mosaic vs. the source.
        ssim_val (float): SSIM of the mosaic vs. the source.

    Returns:
        str: Multi-line, human-readable report.
    """
    def _pct(stage_time: float) -> float:
        # Guard against ZeroDivisionError when a run is so fast (or mocked)
        # that total_time rounds to exactly zero.
        return stage_time / total_time * 100 if total_time > 0 else 0.0

    cells_per_second = actual_grid_size ** 2 / total_time if total_time > 0 else 0.0
    performance_info = f"""
π PERFORMANCE METRICS
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
π§ CONFIGURATION
β’ Grid Size: {actual_grid_size}Γ{actual_grid_size} ({actual_grid_size**2:,} cells)
β’ Cell Size: {cell_size}Γ{cell_size} pixels
β’ Colors: {n_colors} quantized colors
β’ Image Size: {actual_image_size}Γ{actual_image_size} pixels
β±οΈ TIMING BREAKDOWN
β’ Preprocessing: {preprocess_time:.3f}s ({_pct(preprocess_time):.1f}%)
β’ Grid Segmentation: {seg_time:.3f}s ({_pct(seg_time):.1f}%)
β’ Tile Loading: {tile_load_time:.3f}s ({_pct(tile_load_time):.1f}%)
β’ Tile Mapping: {mosaic_time:.3f}s ({_pct(mosaic_time):.1f}%)
β’ Metrics Calculation: {metrics_time:.3f}s ({_pct(metrics_time):.1f}%)
β’ TOTAL TIME: {total_time:.3f}s
π SIMILARITY METRICS
β’ MSE: {mse_val:.2f} (Lower = Better, 0 = Perfect)
β’ SSIM: {ssim_val:.3f} (Higher = Better, 1 = Perfect)
π― QUALITY ASSESSMENT
β’ MSE Quality: {'Excellent' if mse_val < 100 else 'Good' if mse_val < 500 else 'Fair' if mse_val < 1000 else 'Poor'}
β’ SSIM Quality: {'Excellent' if ssim_val > 0.9 else 'Good' if ssim_val > 0.8 else 'Fair' if ssim_val > 0.6 else 'Poor'}
β‘ PERFORMANCE ANALYSIS
β’ Cells/second: {cells_per_second:.0f}
β’ Efficiency: {'High' if total_time < 2 else 'Medium' if total_time < 5 else 'Low'}
β’ Memory Usage: ~{actual_image_size*actual_image_size*3/1024/1024:.1f}MB
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
    return performance_info
def run_performance_benchmark(
    preprocess_func,
    segment_func,
    load_tiles_func,
    map_tiles_func,
    image: Image.Image,
    n_colors: int = 16,
    image_sizes=None,
    grid_sizes=None,
):
    """Run comprehensive performance benchmark across combinations of grid and image sizes.

    Each (image size, grid size) pair is timed and cProfile'd individually;
    per-run stats are merged into one aggregate ``pstats.Stats`` and a single
    line-profiler pass is run on the smallest configuration.

    Args:
        preprocess_func (dict): Dict containing ``resize`` and ``quantize`` callables.
        segment_func (Callable): Grid segmentation function.
        load_tiles_func (Callable): Tile loader.
        map_tiles_func (Callable): Tile mapping function.
        image (PIL.Image.Image): Source image.
        n_colors (int, optional): Number of quantized colors. Defaults to 16.
        image_sizes (list[int] | None): Sizes to test.
        grid_sizes (list[int] | None): Grid sizes to test.

    Returns:
        tuple[str, str]: Tuple containing chart path and textual report
        (delegated to ``create_benchmark_plots``), or ``(None, message)``
        when no image was supplied.
    """
    if image is None:
        return None, "Please upload an image first!"
    # Fall back to the default sweep when the caller passes None (or empty).
    image_sizes = image_sizes or [400]
    grid_sizes = grid_sizes or [8, 16, 24, 32, 48, 64, 80, 96, 128]
    results = []
    overall_profile_stats = None
    line_profile_report = None
    line_profile_config = None
    # Center-crop to a square so every grid configuration tiles evenly.
    width, height = image.size
    if width != height:
        min_size = min(width, height)
        left = (width - min_size) // 2
        top = (height - min_size) // 2
        image = image.crop((left, top, left + min_size, top + min_size))
    for img_size in image_sizes:
        resized_img = preprocess_func['resize'](image.copy(), img_size)
        quantized_img, color_centers = preprocess_func['quantize'](resized_img, n_colors)
        resized_np = np.array(quantized_img)
        # PIL yields RGB; the OpenCV-based pipeline below expects BGR order.
        if resized_np.shape[-1] == 3:
            resized_np = cv2.cvtColor(resized_np, cv2.COLOR_RGB2BGR)
        for grid_size in grid_sizes:
            # Profile each configuration separately; merged below.
            profiler = cProfile.Profile()
            profiler.enable()
            start_time = time.time()
            # Snap the requested grid to a size that divides the image evenly.
            cell_size = max(1, img_size // grid_size)
            actual_grid_size = max(1, img_size // cell_size)
            actual_image_size = actual_grid_size * cell_size
            resized_np_cropped = resized_np[:actual_image_size, :actual_image_size]
            grid_labels, seg_time = timed(segment_func)(resized_np_cropped, actual_grid_size, color_centers)
            start_tile = time.time()
            # NOTE(review): tile directory is hard-coded relative to the CWD
            # ('data/tiles') — confirm it exists in the deployment layout.
            tiles, tile_colors = load_tiles_func('data/tiles', n_colors, cell_size)
            tile_time = time.time() - start_tile
            mosaic_img, mosaic_time = timed(map_tiles_func)(grid_labels, tiles, cell_size, color_centers, tile_colors)
            total_time = time.time() - start_time
            profiler.disable()
            # Quality metrics computed outside the profiled window.
            mse_val = mse(resized_np_cropped, mosaic_img)
            ssim_val = ssim_metric(resized_np_cropped, mosaic_img)
            profile_stats = pstats.Stats(profiler).strip_dirs()
            top_funcs = summarize_top_functions(profile_stats)
            # Accumulate per-run stats into one aggregate Stats object.
            if overall_profile_stats is None:
                overall_profile_stats = profile_stats
            else:
                overall_profile_stats.add(profile_stats)
            config_label = f"{img_size}px | {grid_size}x{grid_size}"
            results.append({
                'Image Size (px)': img_size,
                'Grid Size (requested)': grid_size,
                'Actual Grid Size': f"{actual_grid_size}x{actual_grid_size}",
                'Grid Cells': actual_grid_size**2,
                'Cell Size (px)': cell_size,
                'Segmentation (s)': seg_time,
                'Tile Loading (s)': tile_time,
                'Tile Mapping (s)': mosaic_time,
                'Total Time (s)': total_time,
                'Cells/sec': actual_grid_size**2 / total_time,
                'MSE': mse_val,
                'SSIM': ssim_val,
                'Configuration': config_label,
                'Profile Summary': top_funcs,
            })
    profile_report = format_overall_profile_report(overall_profile_stats)
    # Line-profiling is expensive: run it once, on the smallest configuration.
    if line_profile_report is None:
        sample_image_size = min(image_sizes)
        sample_grid_size = min(grid_sizes)
        line_profile_report = run_line_profile_analysis(
            preprocess_func,
            segment_func,
            load_tiles_func,
            map_tiles_func,
            image,
            n_colors,
            sample_image_size,
            sample_grid_size
        )
        line_profile_config = (sample_image_size, sample_grid_size)
    return create_benchmark_plots(results, profile_report, line_profile_report, line_profile_config)
def summarize_top_functions(stats_obj, limit=5):
    """Summarize the most expensive functions from ``pstats`` output.

    Args:
        stats_obj (pstats.Stats | None): Profile statistics object.
        limit (int, optional): Maximum rows to include. Defaults to 5.

    Returns:
        str: Human readable summary, one function per line.
    """
    if stats_obj is None:
        return "No profiling data"
    # stats maps (file, line, func) -> (cc, nc, tottime, cumtime, callers);
    # rank entries by cumulative time, most expensive first.
    ranked = sorted(
        stats_obj.stats.items(),
        key=lambda entry: entry[1][3],
        reverse=True,
    )
    summary_lines = []
    for (filename, lineno, funcname), stat_tuple in ranked[:limit]:
        cumtime = stat_tuple[3]
        location = f"{funcname} ({os.path.basename(filename)}:{lineno})"
        summary_lines.append(f"{location} β {cumtime:.4f}s")
    return "\n".join(summary_lines)
def format_overall_profile_report(stats_obj, limit=10):
    """Return formatted ``pstats`` output for the aggregate benchmark run.

    Args:
        stats_obj (pstats.Stats | None): Combined stats.
        limit (int, optional): Maximum number of rows. Defaults to 10.

    Returns:
        str: Printable statistics table, or an empty string without stats.
    """
    if stats_obj is None:
        return ""
    # Redirect pstats' output into an in-memory buffer instead of stdout.
    buffer = io.StringIO()
    stats_obj.stream = buffer
    stats_obj.strip_dirs().sort_stats('cumulative').print_stats(limit)
    return buffer.getvalue()
def run_line_profile_analysis(
    preprocess_func,
    segment_func,
    load_tiles_func,
    map_tiles_func,
    image: Image.Image,
    n_colors: int,
    img_size: int,
    grid_size: int,
):
    """Execute ``line_profiler`` for a sample configuration.

    Args:
        preprocess_func (dict): Dict containing preprocess callables.
        segment_func (Callable): Grid segmentation function.
        load_tiles_func (Callable): Tile loader.
        map_tiles_func (Callable): Tile mapper.
        image (PIL.Image.Image): Base image.
        n_colors (int): Number of quantized colors.
        img_size (int): Image size for profiling scenario.
        grid_size (int): Grid size for profiling scenario.

    Returns:
        str: Line-profiler text output, or an install hint when the optional
        dependency is missing.
    """
    # Optional dependency: the module-level import sets LineProfiler to None
    # when line_profiler is not installed.
    if LineProfiler is None:
        return "line_profiler not installed. Run `pip install line_profiler` to enable."
    # Center-crop to a square, mirroring run_performance_benchmark.
    square_img = image
    if image.size[0] != image.size[1]:
        min_size = min(image.size)
        left = (image.size[0] - min_size) // 2
        top = (image.size[1] - min_size) // 2
        square_img = image.crop((left, top, left + min_size, top + min_size))
    lp = LineProfiler()
    # Register the pipeline stages whose lines should be timed.
    lp.add_function(preprocess_func['resize'])
    lp.add_function(segment_func)
    lp.add_function(load_tiles_func)
    lp.add_function(map_tiles_func)
    def profiled_pipeline():
        # One end-to-end mosaic run at the sample configuration.
        resized_img = preprocess_func['resize'](square_img.copy(), img_size)
        quantized_img, color_centers = preprocess_func['quantize'](resized_img, n_colors)
        resized_np = np.array(quantized_img)
        # PIL yields RGB; convert to BGR for the OpenCV-based stages.
        if resized_np.shape[-1] == 3:
            resized_np = cv2.cvtColor(resized_np, cv2.COLOR_RGB2BGR)
        cell_size = max(1, img_size // grid_size)
        actual_grid_size = max(1, img_size // cell_size)
        actual_image_size = actual_grid_size * cell_size
        resized_np_cropped = resized_np[:actual_image_size, :actual_image_size]
        seg_result = segment_func(resized_np_cropped, actual_grid_size, color_centers)
        # Some segmenters return (labels, extras); keep only the labels.
        grid_labels = seg_result[0] if isinstance(seg_result, (tuple, list)) else seg_result
        tiles, tile_colors = load_tiles_func('data/tiles', n_colors, cell_size)
        map_tiles_func(grid_labels, tiles, cell_size, color_centers, tile_colors)
    # Calling lp(...) wraps the pipeline so its registered functions are traced.
    profiled_run = lp(profiled_pipeline)
    profiled_run()
    stream = io.StringIO()
    lp.print_stats(stream=stream)
    return stream.getvalue()
def create_benchmark_plots(results, profile_report=None, line_profile_report=None, line_profile_config=None):
    """Create performance benchmark visualization plots.

    Saves a 2x2 figure (scaling, time breakdown, MSE, SSIM) to
    ``performance_benchmark.png`` in the current working directory.

    Args:
        results (list[dict]): Measurements from ``run_performance_benchmark``.
        profile_report (str | None): Aggregate cProfile table.
        line_profile_report (str | None): Line-profiler snippet.
        line_profile_config (tuple[int, int] | None): Image/grid pair used for profiling.

    Returns:
        tuple[str, str]: Path to the saved plot image and textual summary.
    """
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 10))
    # Unpack the measurement dicts into parallel lists for plotting.
    cells = [r['Grid Cells'] for r in results]
    total_times = [r['Total Time (s)'] for r in results]
    seg_times = [r['Segmentation (s)'] for r in results]
    mapping_times = [r['Tile Mapping (s)'] for r in results]
    mse_vals = [r['MSE'] for r in results]
    ssim_vals = [r['SSIM'] for r in results]
    config_labels = [r['Configuration'] for r in results]
    image_sizes = [r['Image Size (px)'] for r in results]
    # One distinct viridis color per tested image size.
    unique_sizes = sorted(set(image_sizes))
    color_map = plt.get_cmap('viridis', len(unique_sizes))
    # NOTE(review): 'colors' appears unused below — candidate for removal.
    colors = [color_map(unique_sizes.index(size)) for size in image_sizes]
    # Plot 1: Total Time vs Grid Size
    for size in unique_sizes:
        indices = [i for i, s in enumerate(image_sizes) if s == size]
        ax1.plot(
            [cells[i] for i in indices],
            [total_times[i] for i in indices],
            marker='o',
            linewidth=2,
            markersize=6,
            color=color_map(unique_sizes.index(size)),
            label=f'{size}px'
        )
    ax1.set_xlabel('Number of Cells')
    ax1.set_ylabel('Total Time (seconds)')
    ax1.set_title('Performance Scaling')
    ax1.grid(True, alpha=0.3)
    ax1.set_xscale('log')
    ax1.legend(title='Image Size (px)')
    # Plot 2: Time Breakdown (grouped bars: segmentation vs tile mapping)
    width = 0.35
    x = np.arange(len(results))
    ax2.bar(x - width/2, seg_times, width, label='Segmentation', alpha=0.8)
    ax2.bar(x + width/2, mapping_times, width, label='Tile Mapping', alpha=0.8)
    ax2.set_xlabel('Grid Configuration')
    ax2.set_ylabel('Time (seconds)')
    ax2.set_title('Time Breakdown by Operation')
    ax2.set_xticks(x)
    ax2.set_xticklabels(config_labels, rotation=45)
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    # Plot 3: MSE vs Grid Size
    for size in unique_sizes:
        indices = [i for i, s in enumerate(image_sizes) if s == size]
        ax3.plot(
            [cells[i] for i in indices],
            [mse_vals[i] for i in indices],
            marker='o',
            linewidth=2,
            markersize=6,
            color=color_map(unique_sizes.index(size)),
            label=f'{size}px'
        )
    ax3.set_xlabel('Number of Cells')
    ax3.set_ylabel('MSE (Lower = Better)')
    ax3.set_title('Image Quality (MSE)')
    ax3.grid(True, alpha=0.3)
    ax3.set_xscale('log')
    ax3.legend(title='Image Size (px)', loc='upper right')
    # Plot 4: SSIM vs Grid Size
    for size in unique_sizes:
        indices = [i for i, s in enumerate(image_sizes) if s == size]
        ax4.plot(
            [cells[i] for i in indices],
            [ssim_vals[i] for i in indices],
            marker='o',
            linewidth=2,
            markersize=6,
            color=color_map(unique_sizes.index(size)),
            label=f'{size}px'
        )
    ax4.set_xlabel('Number of Cells')
    ax4.set_ylabel('SSIM (Higher = Better)')
    ax4.set_title('Image Quality (SSIM)')
    ax4.grid(True, alpha=0.3)
    ax4.set_xscale('log')
    ax4.legend(title='Image Size (px)', loc='lower right')
    plt.tight_layout()
    # Persist the figure; the path is returned to the caller for display.
    plt.savefig('performance_benchmark.png', dpi=150, bbox_inches='tight')
    plt.close()
    # Create summary report
    df = pd.DataFrame(results)
    summary = create_benchmark_summary(
        results,
        mse_vals,
        ssim_vals,
        df,
        profile_report,
        line_profile_report,
        line_profile_config
    )
    return 'performance_benchmark.png', summary
def create_benchmark_summary(
    results,
    mse_vals,
    ssim_vals,
    df,
    profile_report=None,
    line_profile_report=None,
    line_profile_config=None
):
    """Create a textual benchmark summary with profiling data.

    Args:
        results (list[dict]): Raw measurement entries.
        mse_vals (list[float]): MSE metrics per configuration.
        ssim_vals (list[float]): SSIM metrics per configuration.
        df (pandas.DataFrame): Tabular version of ``results``.
        profile_report (str | None): Aggregate cProfile output.
        line_profile_report (str | None): Line-profiler output.
        line_profile_config (tuple[int, int] | None): Profiling configuration detail.

    Returns:
        str: Markdown-formatted summary text.
    """
    # Headline configurations: fastest run, best SSIM, and the best
    # time/quality trade-off (minimizing time weighted by SSIM shortfall).
    fastest = min(results, key=lambda x: x['Total Time (s)'])
    best_quality = max(results, key=lambda x: x['SSIM'])
    best_balance = min(results, key=lambda x: x['Total Time (s)'] * (1 - x['SSIM']))
    # Image-size x grid-size matrix of mean total times.
    pivot = df.pivot_table(
        index='Image Size (px)',
        columns='Grid Size (requested)',
        values='Total Time (s)',
        aggfunc='mean'
    ).sort_index().sort_index(axis=1)
    # Column subset shown in the detailed-measurements table.
    df_display = df[
        [
            'Image Size (px)',
            'Grid Size (requested)',
            'Actual Grid Size',
            'Cell Size (px)',
            'Segmentation (s)',
            'Tile Loading (s)',
            'Tile Mapping (s)',
            'Total Time (s)',
            'MSE',
            'SSIM'
        ]
    ]
    # Indent each per-config profile summary under its configuration header.
    per_config_profile = "\n".join(
        [
            f" β’ {r['Image Size (px)']}px | grid {r['Grid Size (requested)']}:\n "
            + r.get('Profile Summary', 'No profiling data').replace('\n', '\n ')
            for r in results
        ]
    )
    line_profile_config_str = (
        f"{line_profile_config[0]}px image, grid {line_profile_config[1]}"
        if line_profile_config else "n/a"
    )
    summary = f"""
π¬ PERFORMANCE BENCHMARK RESULTS
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
π PERFORMANCE HIGHLIGHTS:
β’ Fastest Run: {fastest['Image Size (px)']}px image, grid {fastest['Grid Size (requested)']} (Actual {fastest['Actual Grid Size']}) β {fastest['Total Time (s)']:.3f}s
β’ Highest Quality: {best_quality['Image Size (px)']}px image, grid {best_quality['Grid Size (requested)']} β SSIM {best_quality['SSIM']:.3f}
β’ Best Balance: {best_balance['Image Size (px)']}px image, grid {best_balance['Grid Size (requested)']} β {best_balance['Total Time (s)']:.3f}s / SSIM {best_balance['SSIM']:.3f}
π GLOBAL TRENDS:
β’ Processing efficiency: {np.mean([r['Cells/sec'] for r in results]):.0f} cells/sec on average
β’ MSE range: {min(mse_vals):.1f}β{max(mse_vals):.1f}
β’ SSIM range: {min(ssim_vals):.3f}β{max(ssim_vals):.3f}
π TOTAL TIME (s) BY IMAGE & GRID
{pivot.to_string(float_format='%.3f')}
π DETAILED MEASUREMENTS
{df_display.to_string(index=False, float_format='%.3f')}
π§ FUNCTION-LEVEL PROFILING (aggregate top functions)
{profile_report if profile_report else 'No profiling data collected'}
π§© PER-CONFIGURATION TOP FUNCTIONS
{per_config_profile}
𧬠LINE PROFILER (sample run: {line_profile_config_str})
{line_profile_report if line_profile_report else 'Line profiler unavailable'}
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
    return summary