"""Benchmark the colorization app on a fixed set of test images.

For each test case the script runs ``app.process_image`` once, records the
elapsed time and the change in resident memory, and compares the produced
image against a ground-truth image using PSNR and SSIM.

If ModelScope cannot be imported, a lightweight mock is injected into
``sys.modules`` *before* ``app`` is imported so the benchmark can still run.
"""
import sys
import os
import time
import psutil
import numpy as np
import cv2
from unittest.mock import MagicMock
import importlib

# --- Mocking ModelScope if unavailable ---
try:
    from modelscope.pipelines import pipeline
    from modelscope.utils.constant import Tasks
    print("Real ModelScope found.")
    USE_MOCK = False
except ImportError:
    print("ModelScope not found or broken. Using Mock.")
    USE_MOCK = True

if USE_MOCK:
    # Create mocks for the package and the submodules this script injects.
    mock_modelscope = MagicMock()
    mock_modelscope.pipelines = MagicMock()
    mock_modelscope.utils = MagicMock()
    mock_modelscope.utils.constant = MagicMock()
    mock_modelscope.outputs = MagicMock()

    # Setup constants
    mock_modelscope.utils.constant.Tasks.image_colorization = "image-colorization"
    mock_modelscope.outputs.OutputKeys.OUTPUT_IMG = "output_img"

    class MockPipeline:
        """Stand-in for a ModelScope pipeline with a size-dependent delay."""

        def __init__(self, task, model):
            self.task = task
            self.model = model
            print(f"Initialized MockPipeline for {model}")

        def __call__(self, image):
            """Sleep proportionally to the pixel count and return a tinted copy.

            Args:
                image: Array whose first two dimensions are height and width.
                    NOTE(review): assumes the app passes a NumPy array rather
                    than a file path - confirm against ``app``.

            Returns:
                A dict mapping the mocked ``OUTPUT_IMG`` key to the result.
            """
            # Simulate inference time: 0.1s per 1MP.
            # shape[:2] also works for 2-D (single-channel) arrays, where the
            # original three-way unpack raised ValueError.
            height, width = image.shape[:2]
            time.sleep((height * width / 1_000_000) * 0.1)

            # Simulate output (just tint the image red).
            if image.ndim == 2:
                # Expand single-channel input to three channels so the tint
                # below has a channel axis to index.
                output = np.stack([image] * 3, axis=-1)
            else:
                output = image.copy()
            # Increase Red (BGR)
            output[:, :, 2] = np.clip(output[:, :, 2] * 1.5, 0, 255)
            return {mock_modelscope.outputs.OutputKeys.OUTPUT_IMG: output}

    def mock_pipeline_func(task, model):
        """Factory that replaces ``modelscope.pipelines.pipeline``."""
        return MockPipeline(task, model)

    mock_modelscope.pipelines.pipeline = mock_pipeline_func

    # Inject into sys.modules so that the later ``import app`` resolves its
    # modelscope imports to the mocks.
    sys.modules["modelscope"] = mock_modelscope
    sys.modules["modelscope.pipelines"] = mock_modelscope.pipelines
    sys.modules["modelscope.utils"] = mock_modelscope.utils
    sys.modules["modelscope.utils.constant"] = mock_modelscope.utils.constant
    sys.modules["modelscope.outputs"] = mock_modelscope.outputs

# Now import app (must happen after the mock injection above).
import app
import gradio as gr
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim


def measure_memory() -> float:
    """Return the resident set size of the current process in MB."""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # MB


class MockProgress:
    """No-op callable passed as the ``progress`` argument of the app."""

    def __call__(self, *args, **kwargs):
        pass


def benchmark_image(name, input_path, gt_path):
    """Run the app on one image and report time, memory and quality.

    Args:
        name: Label used in the printed report.
        input_path: Path of the image handed to ``app.process_image``.
        gt_path: Path of the ground-truth image used for PSNR/SSIM.

    Returns:
        A dict with keys ``time``, ``mem_delta``, ``psnr`` and ``ssim``, or
        ``None`` when processing fails or the output image cannot be read.
        If only the metrics cannot be computed (including an unreadable
        ground-truth image), both scores are reported as 0.
    """
    print(f"Benchmarking {name}...")

    # Measure baseline memory
    mem_before = measure_memory()
    # perf_counter is monotonic and high-resolution, unlike time.time(),
    # which can jump if the system clock is adjusted mid-run.
    start_time = time.perf_counter()

    # Run pipeline
    try:
        # Quality: Balanced (512px)
        (_gallery, output_path) = app.process_image(
            input_path, 1.0, 1.0, False, "PNG", "Balanced (512px)",
            progress=MockProgress(),
        )
    except Exception as e:
        print(f"Failed to process {name}: {e}")
        return None

    elapsed = time.perf_counter() - start_time
    # NOTE: this is the RSS difference after vs. before the call, not a true
    # peak; memory released before the call returns is not captured.
    mem_delta = measure_memory() - mem_before

    # Load output and GT for metrics. cv2.imread signals failure by returning
    # None rather than raising, so both results are checked explicitly.
    output = cv2.imread(output_path) if output_path else None
    if output is None:
        print(f"Failed to process {name}: could not read output image {output_path!r}")
        return None
    gt = cv2.imread(gt_path)

    # Metrics
    try:
        if gt is None:
            raise FileNotFoundError(f"could not read ground truth image {gt_path!r}")
        # Resize GT to match output if needed
        if output.shape != gt.shape:
            gt = cv2.resize(gt, (output.shape[1], output.shape[0]))
        score_psnr = psnr(gt, output)
        score_ssim = ssim(gt, output, channel_axis=2)
    except Exception as e:
        # Best effort: timing and memory are still worth reporting.
        print(f"Metrics failed: {e}")
        score_psnr = 0
        score_ssim = 0

    print(f"Results for {name}:")
    print(f" Time: {elapsed:.4f} s")
    print(f" Memory Peak Delta: {mem_delta:.2f} MB")
    print(f" PSNR: {score_psnr:.2f}")
    print(f" SSIM: {score_ssim:.4f}")

    return {
        "time": elapsed,
        "mem_delta": mem_delta,
        "psnr": score_psnr,
        "ssim": score_ssim,
    }


def main():
    """Benchmark every test case whose input exists and print a summary."""
    test_cases = [
        ("128", "test_data/128_gray.jpg", "test_data/128_gt.jpg"),
        ("512", "test_data/512_gray.jpg", "test_data/512_gt.jpg"),
        ("1080p", "test_data/1080p_gray.jpg", "test_data/1080p_gt.jpg"),
    ]

    results = {}
    for name, inp, gt in test_cases:
        if os.path.exists(inp):
            results[name] = benchmark_image(name, inp, gt)
        else:
            print(f"Skipping {name}, input not found.")

    print("\nSummary:")
    print("Resolution | Time (s) | RAM Delta (MB) | PSNR | SSIM")
    print("--- | --- | --- | --- | ---")
    for name, res in results.items():
        # Failed runs are stored as None and omitted from the table.
        if res:
            print(f"{name} | {res['time']:.4f} | {res['mem_delta']:.2f} | {res['psnr']:.2f} | {res['ssim']:.4f}")


if __name__ == "__main__":
    main()