| | import sys |
| | import os |
| | import time |
| | import psutil |
| | import numpy as np |
| | import cv2 |
| | from unittest.mock import MagicMock |
| | import importlib |
| |
|
| | |
# Probe for an importable ModelScope; USE_MOCK records whether the stand-in
# defined further down has to be installed instead.
try:
    from modelscope.pipelines import pipeline
    from modelscope.utils.constant import Tasks
except ImportError:
    USE_MOCK = True
    print("ModelScope not found or broken. Using Mock.")
else:
    USE_MOCK = False
    print("Real ModelScope found.")
| |
|
if USE_MOCK:
    # Build a stand-in package tree out of MagicMock objects so that
    # `import modelscope...` statements executed later resolve to it.
    mock_modelscope = MagicMock()
    mock_modelscope.pipelines = MagicMock()
    mock_modelscope.utils = MagicMock()
    mock_modelscope.utils.constant = MagicMock()
    mock_modelscope.outputs = MagicMock()

    # Pin the constants to plain strings instead of auto-generated MagicMock
    # attributes, so they can be used as ordinary values and dict keys.
    mock_modelscope.utils.constant.Tasks.image_colorization = "image-colorization"
    mock_modelscope.outputs.OutputKeys.OUTPUT_IMG = "output_img"

    class MockPipeline:
        """Fake pipeline object that sleeps by image size and tints one channel."""

        def __init__(self, task, model):
            """Remember the task and model identifiers and announce the mock."""
            self.task = task
            self.model = model
            print(f"Initialized MockPipeline for {model}")

        def __call__(self, image):
            """Return a dict holding a modified copy of *image*.

            Args:
                image: NumPy array shaped (H, W) or (H, W, C). A 2-D or
                    single-channel array is expanded to three identical
                    channels first, so grayscale input is accepted.

            Returns:
                ``{"output_img": array}`` where the array is a copy of the
                (possibly expanded) input with channel index 2 scaled by 1.5
                and clipped to [0, 255]. The input itself is not modified.
            """
            if image.ndim == 2:
                image = image[:, :, np.newaxis]
            if image.shape[2] == 1:
                image = np.repeat(image, 3, axis=2)

            h, w = image.shape[:2]
            pixels = h * w
            # Simulated latency: 0.1 seconds per million pixels.
            sleep_time = (pixels / 1_000_000) * 0.1
            time.sleep(sleep_time)

            output = image.copy()
            output[:, :, 2] = np.clip(output[:, :, 2] * 1.5, 0, 255)

            return {mock_modelscope.outputs.OutputKeys.OUTPUT_IMG: output}

    def mock_pipeline_func(task, model):
        """Factory installed as ``modelscope.pipelines.pipeline``."""
        return MockPipeline(task, model)

    mock_modelscope.pipelines.pipeline = mock_pipeline_func

    # Register every dotted path so both `import modelscope.x` and
    # `from modelscope.x import y` find the mock in the module cache.
    sys.modules["modelscope"] = mock_modelscope
    sys.modules["modelscope.pipelines"] = mock_modelscope.pipelines
    sys.modules["modelscope.utils"] = mock_modelscope.utils
    sys.modules["modelscope.utils.constant"] = mock_modelscope.utils.constant
    sys.modules["modelscope.outputs"] = mock_modelscope.outputs
| |
|
| | |
| | import app |
| | import gradio as gr |
| | from skimage.metrics import peak_signal_noise_ratio as psnr |
| | from skimage.metrics import structural_similarity as ssim |
| |
|
def measure_memory():
    """Return the current process's resident set size, converted from bytes to MiB."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / 1024 / 1024
| |
|
class MockProgress:
    """Callable that accepts any arguments and does nothing.

    Passed as the ``progress`` argument of ``app.process_image`` in this script.
    """

    def __call__(self, *args, **kwargs):
        """Ignore every positional and keyword argument."""
        return None
| |
|
def benchmark_image(name, input_path, gt_path):
    """Run ``app.process_image`` on one image and score the result.

    Args:
        name: Label used in the printed progress and result messages.
        input_path: Path handed to ``app.process_image`` as its first argument.
        gt_path: Path of the reference image the output is compared against.

    Returns:
        A dict with keys ``"time"`` (elapsed seconds), ``"mem_delta"``
        (difference of ``measure_memory()`` after and before the call),
        ``"psnr"`` and ``"ssim"``. Returns ``None`` when processing raises
        or when either the output or the reference image cannot be read.
    """
    print(f"Benchmarking {name}...")

    mem_before = measure_memory()
    # perf_counter is monotonic, so the interval cannot be skewed by
    # wall-clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    try:
        # Only the output path is used below; the first element is ignored.
        _gallery, output_path = app.process_image(
            input_path,
            1.0,
            1.0,
            False,
            "PNG",
            "Balanced (512px)",
            progress=MockProgress(),
        )
    except Exception as e:
        print(f"Failed to process {name}: {e}")
        return None

    elapsed = time.perf_counter() - start_time
    # Before/after difference of measure_memory(), not a sampled peak.
    mem_delta = measure_memory() - mem_before

    # cv2.imread reports a missing or undecodable file by returning None
    # rather than raising, so both results are checked before use.
    output = cv2.imread(output_path) if output_path else None
    if output is None:
        print(f"Failed to process {name}: could not read output image {output_path}")
        return None

    gt = cv2.imread(gt_path)
    if gt is None:
        print(f"Failed to process {name}: could not read ground truth image {gt_path}")
        return None

    if output.shape != gt.shape:
        # cv2.resize takes (width, height), hence shape[1] before shape[0].
        gt = cv2.resize(gt, (output.shape[1], output.shape[0]))

    try:
        score_psnr = psnr(gt, output)
        score_ssim = ssim(gt, output, channel_axis=2)
    except Exception as e:
        # Best effort: keep the timing and memory figures, report zero scores.
        print(f"Metrics failed: {e}")
        score_psnr = 0
        score_ssim = 0

    print(f"Results for {name}:")
    print(f" Time: {elapsed:.4f} s")
    print(f" Memory Peak Delta: {mem_delta:.2f} MB")
    print(f" PSNR: {score_psnr:.2f}")
    print(f" SSIM: {score_ssim:.4f}")

    return {
        "time": elapsed,
        "mem_delta": mem_delta,
        "psnr": score_psnr,
        "ssim": score_ssim,
    }
| |
|
def main():
    """Benchmark each test case whose input file exists and print a summary table."""
    test_cases = (
        ("128", "test_data/128_gray.jpg", "test_data/128_gt.jpg"),
        ("512", "test_data/512_gray.jpg", "test_data/512_gt.jpg"),
        ("1080p", "test_data/1080p_gray.jpg", "test_data/1080p_gt.jpg"),
    )

    results = {}
    for label, input_path, gt_path in test_cases:
        if not os.path.exists(input_path):
            print(f"Skipping {label}, input not found.")
            continue
        results[label] = benchmark_image(label, input_path, gt_path)

    print("\nSummary:")
    print("Resolution | Time (s) | RAM Delta (MB) | PSNR | SSIM")
    print("--- | --- | --- | --- | ---")
    for label, metrics in results.items():
        # Failed runs are stored as None and get no row.
        if not metrics:
            continue
        row = (
            f"{label} | {metrics['time']:.4f} | {metrics['mem_delta']:.2f} | "
            f"{metrics['psnr']:.2f} | {metrics['ssim']:.4f}"
        )
        print(row)


# Run the benchmarks only when executed as a script, not on import.
if __name__ == "__main__":
    main()
| |
|