# Listing metadata from the file viewer: file size 4,680 bytes; 0e868b4
import sys
import os
import time
import psutil
import numpy as np
import cv2
from unittest.mock import MagicMock
import importlib
# --- Fall back to a mocked ModelScope when the real package cannot be imported ---
try:
    from modelscope.pipelines import pipeline
    from modelscope.utils.constant import Tasks
    print("Real ModelScope found.")
    USE_MOCK = False
except ImportError:
    print("ModelScope not found or broken. Using Mock.")
    USE_MOCK = True

if USE_MOCK:
    # Stand-in package tree: every submodule imported below is a MagicMock.
    mock_modelscope = MagicMock()
    mock_modelscope.pipelines = MagicMock()
    mock_modelscope.utils = MagicMock()
    mock_modelscope.utils.constant = MagicMock()
    mock_modelscope.outputs = MagicMock()

    # Pin the two constants to plain strings instead of auto-generated mocks.
    mock_modelscope.utils.constant.Tasks.image_colorization = "image-colorization"
    mock_modelscope.outputs.OutputKeys.OUTPUT_IMG = "output_img"

    class MockPipeline:
        """Fake pipeline: sleeps proportionally to image size, then tints the image."""

        def __init__(self, task, model):
            self.task = task
            self.model = model
            print(f"Initialized MockPipeline for {model}")

        def __call__(self, image):
            """Return a dict holding a red-boosted copy of ``image``.

            ``image`` must expose a three-element ``shape``; the input array
            itself is left unmodified.
            """
            # Simulated inference cost: 0.1 s per megapixel.
            height, width, _channels = image.shape
            pixel_count = height * width
            time.sleep((pixel_count / 1_000_000) * 0.1)

            # Boost channel index 2 (red, assuming BGR ordering) by 50%,
            # saturating at 255.
            tinted = image.copy()
            tinted[..., 2] = np.clip(tinted[..., 2] * 1.5, 0, 255)
            # The key is looked up at call time, mirroring OutputKeys.OUTPUT_IMG.
            return {mock_modelscope.outputs.OutputKeys.OUTPUT_IMG: tinted}

    def mock_pipeline_func(task, model):
        """Factory registered as the mocked ``modelscope.pipelines.pipeline``."""
        return MockPipeline(task, model)

    mock_modelscope.pipelines.pipeline = mock_pipeline_func

    # Register the mocks so later ``import modelscope...`` statements resolve to them.
    sys.modules.update(
        {
            "modelscope": mock_modelscope,
            "modelscope.pipelines": mock_modelscope.pipelines,
            "modelscope.utils": mock_modelscope.utils,
            "modelscope.utils.constant": mock_modelscope.utils.constant,
            "modelscope.outputs": mock_modelscope.outputs,
        }
    )
# Now import app
import app
import gradio as gr
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
def measure_memory():
    """Return the resident set size (RSS) of the current process in MB."""
    bytes_per_mb = 1024 * 1024
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / bytes_per_mb
class MockProgress:
    """No-op progress callback: accepts any arguments and does nothing."""

    def __call__(self, *args, **kwargs):
        """Ignore every progress update and return ``None``."""
        return None
def benchmark_image(name, input_path, gt_path):
    """Run one image through ``app.process_image`` and report time, memory and quality.

    Args:
        name: Label used in the printed report.
        input_path: Path of the image handed to ``app.process_image``.
        gt_path: Path of the ground-truth image the output is scored against.

    Returns:
        A dict with the keys ``"time"`` (seconds), ``"mem_delta"`` (MB),
        ``"psnr"`` and ``"ssim"``, or ``None`` when processing itself raised.
        PSNR and SSIM are reported as 0 when they could not be computed,
        including when either image file could not be read.
    """
    print(f"Benchmarking {name}...")

    # Baseline RSS, taken right before the timed call.
    mem_before = measure_memory()
    # perf_counter is monotonic, so the interval cannot be skewed by
    # system clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    try:
        # Quality preset: Balanced (512px). The other positional arguments are
        # passed through exactly as before; their meaning is defined by
        # app.process_image, which is not visible here.
        _gallery, output_path = app.process_image(
            input_path, 1.0, 1.0, False, "PNG", "Balanced (512px)",
            progress=MockProgress(),
        )
    except Exception as e:
        print(f"Failed to process {name}: {e}")
        return None

    elapsed = time.perf_counter() - start_time
    # NOTE: this is RSS after minus RSS before, not a true peak, even though
    # the report labels it "Memory Peak Delta".
    mem_delta = measure_memory() - mem_before

    # Load the produced image and the ground truth for the quality metrics.
    output = cv2.imread(output_path)
    gt = cv2.imread(gt_path)

    score_psnr = 0
    score_ssim = 0
    if output is None or gt is None:
        # cv2.imread signals a missing or unreadable file by returning None
        # rather than raising; treat that like any other metrics failure
        # instead of crashing on ``.shape``.
        unreadable = output_path if output is None else gt_path
        print(f"Metrics failed: could not read image {unreadable}")
    else:
        # Both metrics need equal shapes; cv2.resize takes (width, height).
        if output.shape != gt.shape:
            gt = cv2.resize(gt, (output.shape[1], output.shape[0]))
        try:
            score_psnr = psnr(gt, output)
            score_ssim = ssim(gt, output, channel_axis=2)
        except Exception as e:
            print(f"Metrics failed: {e}")
            # Reset both so a partial result is never reported.
            score_psnr = 0
            score_ssim = 0

    print(f"Results for {name}:")
    print(f" Time: {elapsed:.4f} s")
    print(f" Memory Peak Delta: {mem_delta:.2f} MB")
    print(f" PSNR: {score_psnr:.2f}")
    print(f" SSIM: {score_ssim:.4f}")

    return {
        "time": elapsed,
        "mem_delta": mem_delta,
        "psnr": score_psnr,
        "ssim": score_ssim,
    }
def main():
    """Benchmark every test case whose input file exists, then print a summary table."""
    # (label, grayscale input path, ground-truth path)
    cases = (
        ("128", "test_data/128_gray.jpg", "test_data/128_gt.jpg"),
        ("512", "test_data/512_gray.jpg", "test_data/512_gt.jpg"),
        ("1080p", "test_data/1080p_gray.jpg", "test_data/1080p_gt.jpg"),
    )

    collected = {}
    for label, input_file, truth_file in cases:
        if not os.path.exists(input_file):
            print(f"Skipping {label}, input not found.")
            continue
        collected[label] = benchmark_image(label, input_file, truth_file)

    print("\nSummary:")
    print("Resolution | Time (s) | RAM Delta (MB) | PSNR | SSIM")
    print("--- | --- | --- | --- | ---")
    for label, stats in collected.items():
        # Failed benchmarks are stored as None and left out of the table.
        if not stats:
            continue
        print(f"{label} | {stats['time']:.4f} | {stats['mem_delta']:.2f} | {stats['psnr']:.2f} | {stats['ssim']:.4f}")


if __name__ == "__main__":
    main()
|