# ImageMosaicGenerator / logic / perfMetric.py
# Author: meryadri β€” commit 388efcc ("code improvements")
"""
Performance metrics and timing utilities for the mosaic generator.
"""
import cProfile
import functools
import io
import os
import pstats
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
from skimage.metrics import structural_similarity as ssim

try:
    from line_profiler import LineProfiler
except ImportError:  # pragma: no cover - optional dependency
    LineProfiler = None
def mse(img1: np.ndarray, img2: np.ndarray) -> float:
    """Compute the mean squared error (MSE) between two images.

    Args:
        img1 (np.ndarray): Reference image.
        img2 (np.ndarray): Comparison image.

    Returns:
        float: Mean of the squared per-pixel differences.
    """
    # Work in float32 so uint8 subtraction cannot wrap around.
    diff = img1.astype(np.float32) - img2.astype(np.float32)
    return float(np.mean(np.square(diff)))
def ssim_metric(img1: np.ndarray, img2: np.ndarray) -> float:
    """Return the structural similarity index (SSIM) for two images.

    Args:
        img1 (np.ndarray): Reference image, 3-channel BGR or 2-D grayscale.
        img2 (np.ndarray): Comparison image with the same layout as ``img1``.

    Returns:
        float: SSIM score; 1.0 means identical images.
    """
    # Accept grayscale input directly: cv2.cvtColor(BGR2GRAY) raises on
    # 2-D arrays, so only convert genuine color images.
    img1_gray = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) if img1.ndim == 3 else img1
    img2_gray = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) if img2.ndim == 3 else img2
    ssim_value = ssim(img1_gray, img2_gray)
    # Some skimage configurations return (score, extra) tuples.
    if isinstance(ssim_value, tuple):
        return float(ssim_value[0])
    return float(ssim_value)
def timed(func):
    """Decorator that returns both the function output and elapsed time.

    Args:
        func (Callable): Function to wrap.

    Returns:
        Callable: Wrapper returning ``(result, seconds)``.
    """
    @functools.wraps(func)  # keep __name__/__doc__ intact for profiling output
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and higher-resolution than time.time,
        # so interval measurements cannot go negative on clock adjustments.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        return result, elapsed
    return wrapper
def create_performance_report(
    actual_grid_size: int,
    cell_size: int,
    n_colors: int,
    actual_image_size: int,
    preprocess_time: float,
    seg_time: float,
    tile_load_time: float,
    mosaic_time: float,
    metrics_time: float,
    total_time: float,
    mse_val: float,
    ssim_val: float,
) -> str:
    """Generate a formatted performance report string for display.

    Args:
        actual_grid_size (int): Effective grid dimension (cells per side).
        cell_size (int): Pixel size of one mosaic cell.
        n_colors (int): Number of quantized colors.
        actual_image_size (int): Effective square image size in pixels.
        preprocess_time (float): Seconds spent preprocessing.
        seg_time (float): Seconds spent on grid segmentation.
        tile_load_time (float): Seconds spent loading tiles.
        mosaic_time (float): Seconds spent mapping tiles.
        metrics_time (float): Seconds spent computing metrics.
        total_time (float): Total elapsed seconds.
        mse_val (float): MSE of mosaic vs. source.
        ssim_val (float): SSIM of mosaic vs. source.

    Returns:
        str: Multi-line, emoji-annotated report for UI display.
    """
    # Guard against ZeroDivisionError when the run finished too fast to
    # measure: percentages and cells/second degrade to 0 instead of crashing.
    safe_total = total_time if total_time > 0 else float('inf')
    performance_info = f"""
πŸ“Š PERFORMANCE METRICS
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
πŸ”§ CONFIGURATION
β€’ Grid Size: {actual_grid_size}Γ—{actual_grid_size} ({actual_grid_size**2:,} cells)
β€’ Cell Size: {cell_size}Γ—{cell_size} pixels
β€’ Colors: {n_colors} quantized colors
β€’ Image Size: {actual_image_size}Γ—{actual_image_size} pixels
⏱️ TIMING BREAKDOWN
β€’ Preprocessing: {preprocess_time:.3f}s ({preprocess_time/safe_total*100:.1f}%)
β€’ Grid Segmentation: {seg_time:.3f}s ({seg_time/safe_total*100:.1f}%)
β€’ Tile Loading: {tile_load_time:.3f}s ({tile_load_time/safe_total*100:.1f}%)
β€’ Tile Mapping: {mosaic_time:.3f}s ({mosaic_time/safe_total*100:.1f}%)
β€’ Metrics Calculation: {metrics_time:.3f}s ({metrics_time/safe_total*100:.1f}%)
β€’ TOTAL TIME: {total_time:.3f}s
πŸ“ˆ SIMILARITY METRICS
β€’ MSE: {mse_val:.2f} (Lower = Better, 0 = Perfect)
β€’ SSIM: {ssim_val:.3f} (Higher = Better, 1 = Perfect)
🎯 QUALITY ASSESSMENT
β€’ MSE Quality: {'Excellent' if mse_val < 100 else 'Good' if mse_val < 500 else 'Fair' if mse_val < 1000 else 'Poor'}
β€’ SSIM Quality: {'Excellent' if ssim_val > 0.9 else 'Good' if ssim_val > 0.8 else 'Fair' if ssim_val > 0.6 else 'Poor'}
⚑ PERFORMANCE ANALYSIS
β€’ Cells/second: {(actual_grid_size**2)/safe_total:.0f}
β€’ Efficiency: {'High' if total_time < 2 else 'Medium' if total_time < 5 else 'Low'}
β€’ Memory Usage: ~{actual_image_size*actual_image_size*3/1024/1024:.1f}MB
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
    return performance_info
def run_performance_benchmark(
    preprocess_func,
    segment_func,
    load_tiles_func,
    map_tiles_func,
    image: Image.Image,
    n_colors: int = 16,
    image_sizes=None,
    grid_sizes=None,
):
    """Run comprehensive performance benchmark across combinations of grid and image sizes.

    Each (image size, grid size) pair is executed once under cProfile; timing,
    quality metrics (MSE/SSIM), and a per-run profile summary are collected.

    Args:
        preprocess_func (dict): Dict containing ``resize`` and ``quantize`` callables.
        segment_func (Callable): Grid segmentation function.
        load_tiles_func (Callable): Tile loader.
        map_tiles_func (Callable): Tile mapping function.
        image (PIL.Image.Image): Source image.
        n_colors (int, optional): Number of quantized colors. Defaults to 16.
        image_sizes (list[int] | None): Sizes to test.
        grid_sizes (list[int] | None): Grid sizes to test.

    Returns:
        tuple[str, str]: Tuple containing chart path and textual report.
    """
    if image is None:
        return None, "Please upload an image first!"
    image_sizes = image_sizes or [400]
    grid_sizes = grid_sizes or [8, 16, 24, 32, 48, 64, 80, 96, 128]
    results = []
    overall_profile_stats = None
    line_profile_report = None
    line_profile_config = None
    # Center-crop to a square so every tested size is uniform in both axes.
    width, height = image.size
    if width != height:
        min_size = min(width, height)
        left = (width - min_size) // 2
        top = (height - min_size) // 2
        image = image.crop((left, top, left + min_size, top + min_size))
    for img_size in image_sizes:
        resized_img = preprocess_func['resize'](image.copy(), img_size)
        quantized_img, color_centers = preprocess_func['quantize'](resized_img, n_colors)
        resized_np = np.array(quantized_img)
        # Convert RGB (PIL) to BGR (OpenCV convention) for downstream metrics.
        if resized_np.shape[-1] == 3:
            resized_np = cv2.cvtColor(resized_np, cv2.COLOR_RGB2BGR)
        for grid_size in grid_sizes:
            profiler = cProfile.Profile()
            profiler.enable()
            start_time = time.time()
            # Integer division means the effective grid/image size may be
            # smaller than requested; crop the image to the effective size.
            cell_size = max(1, img_size // grid_size)
            actual_grid_size = max(1, img_size // cell_size)
            actual_image_size = actual_grid_size * cell_size
            resized_np_cropped = resized_np[:actual_image_size, :actual_image_size]
            grid_labels, seg_time = timed(segment_func)(resized_np_cropped, actual_grid_size, color_centers)
            start_tile = time.time()
            # NOTE(review): tile directory 'data/tiles' is resolved relative
            # to the current working directory — confirm callers run from repo root.
            tiles, tile_colors = load_tiles_func('data/tiles', n_colors, cell_size)
            tile_time = time.time() - start_tile
            mosaic_img, mosaic_time = timed(map_tiles_func)(grid_labels, tiles, cell_size, color_centers, tile_colors)
            total_time = time.time() - start_time
            profiler.disable()
            mse_val = mse(resized_np_cropped, mosaic_img)
            ssim_val = ssim_metric(resized_np_cropped, mosaic_img)
            profile_stats = pstats.Stats(profiler).strip_dirs()
            top_funcs = summarize_top_functions(profile_stats)
            # Merge every run's stats into one aggregate profile.
            if overall_profile_stats is None:
                overall_profile_stats = profile_stats
            else:
                overall_profile_stats.add(profile_stats)
            config_label = f"{img_size}px | {grid_size}x{grid_size}"
            results.append({
                'Image Size (px)': img_size,
                'Grid Size (requested)': grid_size,
                'Actual Grid Size': f"{actual_grid_size}x{actual_grid_size}",
                'Grid Cells': actual_grid_size**2,
                'Cell Size (px)': cell_size,
                'Segmentation (s)': seg_time,
                'Tile Loading (s)': tile_time,
                'Tile Mapping (s)': mosaic_time,
                'Total Time (s)': total_time,
                'Cells/sec': actual_grid_size**2 / total_time,
                'MSE': mse_val,
                'SSIM': ssim_val,
                'Configuration': config_label,
                'Profile Summary': top_funcs,
            })
    profile_report = format_overall_profile_report(overall_profile_stats)
    # NOTE(review): line_profile_report is still None at this point, so this
    # branch always runs; the line profiler samples only the smallest
    # (image size, grid size) configuration.
    if line_profile_report is None:
        sample_image_size = min(image_sizes)
        sample_grid_size = min(grid_sizes)
        line_profile_report = run_line_profile_analysis(
            preprocess_func,
            segment_func,
            load_tiles_func,
            map_tiles_func,
            image,
            n_colors,
            sample_image_size,
            sample_grid_size
        )
        line_profile_config = (sample_image_size, sample_grid_size)
    return create_benchmark_plots(results, profile_report, line_profile_report, line_profile_config)
def summarize_top_functions(stats_obj, limit=5):
    """Summarize the most expensive functions from ``pstats`` output.

    Args:
        stats_obj (pstats.Stats | None): Profile statistics object.
        limit (int, optional): Maximum rows to include. Defaults to 5.

    Returns:
        str: Human readable summary, one function per line.
    """
    if stats_obj is None:
        return "No profiling data"
    # pstats values are (primitive calls, calls, tottime, cumtime, callers);
    # rank by cumulative time (index 3), most expensive first.
    ranked = sorted(stats_obj.stats.items(), key=lambda entry: entry[1][3], reverse=True)
    summary_lines = []
    for func_key, timing in ranked[:limit]:
        filename, line, funcname = func_key
        cumtime = timing[3]
        summary_lines.append(f"{funcname} ({os.path.basename(filename)}:{line}) β€” {cumtime:.4f}s")
    return "\n".join(summary_lines)
def format_overall_profile_report(stats_obj, limit=10):
    """Return formatted ``pstats`` output for the aggregate benchmark run.

    Note that ``stats_obj`` is modified in place: it is stripped of
    directory prefixes, re-sorted, and its output stream is redirected.

    Args:
        stats_obj (pstats.Stats | None): Combined stats.
        limit (int, optional): Maximum number of rows. Defaults to 10.

    Returns:
        str: Printable statistics table, or ``""`` when no stats exist.
    """
    if stats_obj is None:
        return ""
    buffer = io.StringIO()
    # Sort by cumulative time, then capture print_stats into the buffer
    # instead of stdout.
    stats_obj.strip_dirs().sort_stats('cumulative')
    stats_obj.stream = buffer
    stats_obj.print_stats(limit)
    return buffer.getvalue()
def run_line_profile_analysis(
    preprocess_func,
    segment_func,
    load_tiles_func,
    map_tiles_func,
    image: Image.Image,
    n_colors: int,
    img_size: int,
    grid_size: int,
):
    """Execute ``line_profiler`` for a sample configuration.

    Args:
        preprocess_func (dict): Dict containing preprocess callables.
        segment_func (Callable): Grid segmentation function.
        load_tiles_func (Callable): Tile loader.
        map_tiles_func (Callable): Tile mapper.
        image (PIL.Image.Image): Base image.
        n_colors (int): Number of quantized colors.
        img_size (int): Image size for profiling scenario.
        grid_size (int): Grid size for profiling scenario.

    Returns:
        str: Line-profiler text output, or an install hint when the
        optional ``line_profiler`` dependency is missing.
    """
    if LineProfiler is None:
        return "line_profiler not installed. Run `pip install line_profiler` to enable."
    # Center-crop to a square, mirroring run_performance_benchmark.
    square_img = image
    if image.size[0] != image.size[1]:
        min_size = min(image.size)
        left = (image.size[0] - min_size) // 2
        top = (image.size[1] - min_size) // 2
        square_img = image.crop((left, top, left + min_size, top + min_size))
    lp = LineProfiler()
    # Register each pipeline stage so its body gets line-level timings.
    lp.add_function(preprocess_func['resize'])
    lp.add_function(segment_func)
    lp.add_function(load_tiles_func)
    lp.add_function(map_tiles_func)
    def profiled_pipeline():
        # One end-to-end mosaic run at the sample configuration; results
        # are discarded — only the profiler's timings matter here.
        resized_img = preprocess_func['resize'](square_img.copy(), img_size)
        quantized_img, color_centers = preprocess_func['quantize'](resized_img, n_colors)
        resized_np = np.array(quantized_img)
        if resized_np.shape[-1] == 3:
            resized_np = cv2.cvtColor(resized_np, cv2.COLOR_RGB2BGR)
        cell_size = max(1, img_size // grid_size)
        actual_grid_size = max(1, img_size // cell_size)
        actual_image_size = actual_grid_size * cell_size
        resized_np_cropped = resized_np[:actual_image_size, :actual_image_size]
        seg_result = segment_func(resized_np_cropped, actual_grid_size, color_centers)
        # segment_func may return (labels, ...) or just the labels array.
        grid_labels = seg_result[0] if isinstance(seg_result, (tuple, list)) else seg_result
        # NOTE(review): 'data/tiles' is CWD-relative — confirm run location.
        tiles, tile_colors = load_tiles_func('data/tiles', n_colors, cell_size)
        map_tiles_func(grid_labels, tiles, cell_size, color_centers, tile_colors)
    profiled_run = lp(profiled_pipeline)
    profiled_run()
    stream = io.StringIO()
    lp.print_stats(stream=stream)
    return stream.getvalue()
def create_benchmark_plots(results, profile_report=None, line_profile_report=None, line_profile_config=None):
    """Create performance benchmark visualization plots.

    Saves a 2x2 figure ('performance_benchmark.png' in the CWD) showing total
    time, per-operation breakdown, MSE, and SSIM, then builds a text summary.

    Args:
        results (list[dict]): Measurements from ``run_performance_benchmark``.
        profile_report (str | None): Aggregate cProfile table.
        line_profile_report (str | None): Line-profiler snippet.
        line_profile_config (tuple[int, int] | None): Image/grid pair used for profiling.

    Returns:
        tuple[str, str]: Path to the saved plot image and textual summary.
    """
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 10))
    cells = [r['Grid Cells'] for r in results]
    total_times = [r['Total Time (s)'] for r in results]
    seg_times = [r['Segmentation (s)'] for r in results]
    mapping_times = [r['Tile Mapping (s)'] for r in results]
    mse_vals = [r['MSE'] for r in results]
    ssim_vals = [r['SSIM'] for r in results]
    config_labels = [r['Configuration'] for r in results]
    image_sizes = [r['Image Size (px)'] for r in results]
    unique_sizes = sorted(set(image_sizes))
    color_map = plt.get_cmap('viridis', len(unique_sizes))

    def _plot_by_size(ax, y_values, ylabel, title, legend_loc=None):
        """Draw one line per image size: cell count (log x) vs y_values."""
        for size in unique_sizes:
            indices = [i for i, s in enumerate(image_sizes) if s == size]
            ax.plot(
                [cells[i] for i in indices],
                [y_values[i] for i in indices],
                marker='o',
                linewidth=2,
                markersize=6,
                color=color_map(unique_sizes.index(size)),
                label=f'{size}px'
            )
        ax.set_xlabel('Number of Cells')
        ax.set_ylabel(ylabel)
        ax.set_title(title)
        ax.grid(True, alpha=0.3)
        ax.set_xscale('log')
        if legend_loc:
            ax.legend(title='Image Size (px)', loc=legend_loc)
        else:
            ax.legend(title='Image Size (px)')

    # Plot 1: Total Time vs Grid Size
    _plot_by_size(ax1, total_times, 'Total Time (seconds)', 'Performance Scaling')
    # Plot 2: Time Breakdown (grouped bars: segmentation vs tile mapping)
    width = 0.35
    x = np.arange(len(results))
    ax2.bar(x - width/2, seg_times, width, label='Segmentation', alpha=0.8)
    ax2.bar(x + width/2, mapping_times, width, label='Tile Mapping', alpha=0.8)
    ax2.set_xlabel('Grid Configuration')
    ax2.set_ylabel('Time (seconds)')
    ax2.set_title('Time Breakdown by Operation')
    ax2.set_xticks(x)
    ax2.set_xticklabels(config_labels, rotation=45)
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    # Plot 3: MSE vs Grid Size
    _plot_by_size(ax3, mse_vals, 'MSE (Lower = Better)', 'Image Quality (MSE)', legend_loc='upper right')
    # Plot 4: SSIM vs Grid Size
    _plot_by_size(ax4, ssim_vals, 'SSIM (Higher = Better)', 'Image Quality (SSIM)', legend_loc='lower right')
    plt.tight_layout()
    plt.savefig('performance_benchmark.png', dpi=150, bbox_inches='tight')
    plt.close()
    # Create summary report
    df = pd.DataFrame(results)
    summary = create_benchmark_summary(
        results,
        mse_vals,
        ssim_vals,
        df,
        profile_report,
        line_profile_report,
        line_profile_config
    )
    return 'performance_benchmark.png', summary
def create_benchmark_summary(
    results,
    mse_vals,
    ssim_vals,
    df,
    profile_report=None,
    line_profile_report=None,
    line_profile_config=None
):
    """Create a textual benchmark summary with profiling data.

    Args:
        results (list[dict]): Raw measurement entries.
        mse_vals (list[float]): MSE metrics per configuration.
        ssim_vals (list[float]): SSIM metrics per configuration.
        df (pandas.DataFrame): Tabular version of ``results``.
        profile_report (str | None): Aggregate cProfile output.
        line_profile_report (str | None): Line-profiler output.
        line_profile_config (tuple[int, int] | None): Profiling configuration detail.

    Returns:
        str: Markdown-formatted summary text.
    """
    # Headline picks: fastest overall, best SSIM, and the best
    # speed/quality trade-off (time weighted by SSIM shortfall).
    fastest_run = min(results, key=lambda r: r['Total Time (s)'])
    top_quality = max(results, key=lambda r: r['SSIM'])
    balanced = min(results, key=lambda r: r['Total Time (s)'] * (1 - r['SSIM']))
    time_pivot = df.pivot_table(
        index='Image Size (px)',
        columns='Grid Size (requested)',
        values='Total Time (s)',
        aggfunc='mean'
    )
    time_pivot = time_pivot.sort_index().sort_index(axis=1)
    display_columns = [
        'Image Size (px)',
        'Grid Size (requested)',
        'Actual Grid Size',
        'Cell Size (px)',
        'Segmentation (s)',
        'Tile Loading (s)',
        'Tile Mapping (s)',
        'Total Time (s)',
        'MSE',
        'SSIM'
    ]
    detail_frame = df[display_columns]
    # Indent each per-run profile summary under its configuration header.
    per_config_lines = []
    for entry in results:
        detail = entry.get('Profile Summary', 'No profiling data').replace('\n', '\n ')
        per_config_lines.append(
            f" β€’ {entry['Image Size (px)']}px | grid {entry['Grid Size (requested)']}:\n " + detail
        )
    per_config_profile = "\n".join(per_config_lines)
    if line_profile_config:
        line_profile_config_str = f"{line_profile_config[0]}px image, grid {line_profile_config[1]}"
    else:
        line_profile_config_str = "n/a"
    report_text = f"""
πŸ”¬ PERFORMANCE BENCHMARK RESULTS
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
πŸ“Š PERFORMANCE HIGHLIGHTS:
β€’ Fastest Run: {fastest_run['Image Size (px)']}px image, grid {fastest_run['Grid Size (requested)']} (Actual {fastest_run['Actual Grid Size']}) β€” {fastest_run['Total Time (s)']:.3f}s
β€’ Highest Quality: {top_quality['Image Size (px)']}px image, grid {top_quality['Grid Size (requested)']} β€” SSIM {top_quality['SSIM']:.3f}
β€’ Best Balance: {balanced['Image Size (px)']}px image, grid {balanced['Grid Size (requested)']} β€” {balanced['Total Time (s)']:.3f}s / SSIM {balanced['SSIM']:.3f}
πŸ“ˆ GLOBAL TRENDS:
β€’ Processing efficiency: {np.mean([r['Cells/sec'] for r in results]):.0f} cells/sec on average
β€’ MSE range: {min(mse_vals):.1f}–{max(mse_vals):.1f}
β€’ SSIM range: {min(ssim_vals):.3f}–{max(ssim_vals):.3f}
πŸ“‹ TOTAL TIME (s) BY IMAGE & GRID
{time_pivot.to_string(float_format='%.3f')}
πŸ“‘ DETAILED MEASUREMENTS
{detail_frame.to_string(index=False, float_format='%.3f')}
🧠 FUNCTION-LEVEL PROFILING (aggregate top functions)
{profile_report if profile_report else 'No profiling data collected'}
🧩 PER-CONFIGURATION TOP FUNCTIONS
{per_config_profile}
🧬 LINE PROFILER (sample run: {line_profile_config_str})
{line_profile_report if line_profile_report else 'Line profiler unavailable'}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
    return report_text