test213 / benchmark.py
dkescape's picture
Upload 10 files
0e868b4 verified
import sys
import os
import time
import psutil
import numpy as np
import cv2
from unittest.mock import MagicMock
import importlib
# --- Mocking ModelScope if unavailable ---
# Probe for the real package first; the imported names are not used directly
# in this block, the import only establishes whether ModelScope is usable.
try:
    from modelscope.pipelines import pipeline
    from modelscope.utils.constant import Tasks
    print("Real ModelScope found.")
    USE_MOCK = False
except ImportError:
    print("ModelScope not found or broken. Using Mock.")
    USE_MOCK = True

if USE_MOCK:
    # Build a fake ``modelscope`` package tree out of MagicMock objects.
    mock_modelscope = MagicMock()
    mock_modelscope.pipelines = MagicMock()
    mock_modelscope.utils = MagicMock()
    mock_modelscope.utils.constant = MagicMock()
    mock_modelscope.outputs = MagicMock()

    # Constants that importing code looks up on the fake package.
    mock_modelscope.utils.constant.Tasks.image_colorization = "image-colorization"
    mock_modelscope.outputs.OutputKeys.OUTPUT_IMG = "output_img"

    class MockPipeline:
        """Stand-in for a ModelScope pipeline that fakes colorization.

        Calling an instance sleeps proportionally to the image size and
        returns a red-tinted copy of the input.
        """

        def __init__(self, task, model):
            """Remember the requested task and model and announce creation."""
            self.task = task
            self.model = model
            print(f"Initialized MockPipeline for {model}")

        def __call__(self, image, **kwargs):
            """Return ``{OUTPUT_IMG: tinted_copy}`` for a NumPy image.

            Args:
                image: NumPy array, either (H, W) grayscale or (H, W, C)
                    with at least three channels (channel 2 is treated as
                    red, i.e. BGR order).
                **kwargs: Accepted and ignored so call sites written for
                    the real pipeline keep working.

            Returns:
                Dict with the mocked ``OutputKeys.OUTPUT_IMG`` key mapped to
                a new array; the input array is never modified.
            """
            if image.ndim == 2:
                # Grayscale input has no channel axis: replicate it into
                # three channels so the tint below has a red plane to edit.
                # np.repeat already returns a fresh array.
                output = np.repeat(image[:, :, np.newaxis], 3, axis=2)
            else:
                output = image.copy()

            # Simulate inference time: 0.1s per 1MP
            h, w = output.shape[:2]
            pixels = h * w
            sleep_time = (pixels / 1_000_000) * 0.1
            time.sleep(sleep_time)

            # Simulate output (just tint the image red). The multiplication
            # yields floats, so clip before storing back into the array.
            output[:, :, 2] = np.clip(output[:, :, 2] * 1.5, 0, 255)  # Increase Red (BGR)
            return {mock_modelscope.outputs.OutputKeys.OUTPUT_IMG: output}

    def mock_pipeline_func(task, model, **kwargs):
        """Mocked ``pipeline()`` factory; extra keyword options are ignored."""
        return MockPipeline(task, model)

    mock_modelscope.pipelines.pipeline = mock_pipeline_func

    # Inject into sys.modules so later ``import modelscope...`` statements
    # resolve to the mocks instead of failing.
    sys.modules["modelscope"] = mock_modelscope
    sys.modules["modelscope.pipelines"] = mock_modelscope.pipelines
    sys.modules["modelscope.utils"] = mock_modelscope.utils
    sys.modules["modelscope.utils.constant"] = mock_modelscope.utils.constant
    sys.modules["modelscope.outputs"] = mock_modelscope.outputs
# Now import app
import app
import gradio as gr
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
def measure_memory():
    """Return the current process's resident set size (RSS) in megabytes."""
    bytes_per_mb = 1024 * 1024
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / bytes_per_mb  # MB
class MockProgress:
    """Callable that accepts any arguments and does nothing.

    Passed as the ``progress`` argument when the benchmark calls the app, so
    progress updates are silently discarded.
    """

    def __call__(self, *args, **kwargs):
        """Ignore every positional and keyword argument; return None."""
        return None
def benchmark_image(name, input_path, gt_path):
    """Benchmark one image through ``app.process_image`` and print the results.

    Measures elapsed time and the change in process RSS around the call, then
    compares the written output against a ground-truth image using PSNR and
    SSIM.

    Args:
        name: Label used in the printed log lines.
        input_path: Path of the image passed to ``app.process_image``.
        gt_path: Path of the ground-truth image used for the quality metrics.

    Returns:
        A dict with keys ``"time"`` (seconds), ``"mem_delta"`` (MB),
        ``"psnr"`` and ``"ssim"``, or ``None`` if processing raised. When the
        metrics cannot be computed (including an unreadable output or
        ground-truth file) both scores are reported as 0.
    """
    print(f"Benchmarking {name}...")

    # Measure baseline memory
    mem_before = measure_memory()
    # perf_counter is monotonic and high resolution, unlike time.time(),
    # which can jump when the system clock is adjusted.
    start_time = time.perf_counter()

    # Run pipeline
    try:
        # Quality: Balanced (512px)
        # NOTE(review): the meaning of the positional arguments is defined by
        # app.process_image, which is not visible here - confirm against app.
        _gallery, output_path = app.process_image(
            input_path, 1.0, 1.0, False, "PNG", "Balanced (512px)",
            progress=MockProgress(),
        )
    except Exception as e:
        print(f"Failed to process {name}: {e}")
        return None

    elapsed = time.perf_counter() - start_time
    # This is the RSS difference before/after the call, not a sampled peak.
    mem_delta = measure_memory() - mem_before

    # Metrics
    try:
        # Load output and GT for metrics. cv2.imread signals a missing or
        # unreadable file by returning None rather than raising, so check
        # explicitly instead of crashing on ``.shape`` below.
        output = cv2.imread(output_path)
        if output is None:
            raise ValueError(f"could not read output image: {output_path}")
        gt = cv2.imread(gt_path)
        if gt is None:
            raise ValueError(f"could not read ground-truth image: {gt_path}")

        # Resize GT to match output if needed (cv2.resize takes (width, height)).
        if output.shape != gt.shape:
            gt = cv2.resize(gt, (output.shape[1], output.shape[0]))

        score_psnr = psnr(gt, output)
        score_ssim = ssim(gt, output, channel_axis=2)
    except Exception as e:
        print(f"Metrics failed: {e}")
        score_psnr = 0
        score_ssim = 0

    print(f"Results for {name}:")
    print(f" Time: {elapsed:.4f} s")
    print(f" Memory Peak Delta: {mem_delta:.2f} MB")
    print(f" PSNR: {score_psnr:.2f}")
    print(f" SSIM: {score_ssim:.4f}")

    return {
        "time": elapsed,
        "mem_delta": mem_delta,
        "psnr": score_psnr,
        "ssim": score_ssim,
    }
def main():
    """Benchmark every available test image and print a summary table.

    Cases whose input file does not exist are skipped with a message. Cases
    for which ``benchmark_image`` returns a falsy result are left out of the
    summary.
    """
    # (label, grayscale input path, ground-truth path)
    cases = (
        ("128", "test_data/128_gray.jpg", "test_data/128_gt.jpg"),
        ("512", "test_data/512_gray.jpg", "test_data/512_gt.jpg"),
        ("1080p", "test_data/1080p_gray.jpg", "test_data/1080p_gt.jpg"),
    )

    collected = {}
    for label, gray_path, truth_path in cases:
        if not os.path.exists(gray_path):
            print(f"Skipping {label}, input not found.")
            continue
        collected[label] = benchmark_image(label, gray_path, truth_path)

    print("\nSummary:")
    print("Resolution | Time (s) | RAM Delta (MB) | PSNR | SSIM")
    print("--- | --- | --- | --- | ---")
    for label, stats in collected.items():
        if not stats:
            # Failed runs are stored as None and have nothing to report.
            continue
        print(f"{label} | {stats['time']:.4f} | {stats['mem_delta']:.2f} | {stats['psnr']:.2f} | {stats['ssim']:.4f}")


if __name__ == "__main__":
    main()