"""
Benchmark script for BackgroundFX Pro.
Tests performance across different configurations and hardware.
"""

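# Example invocation (the flags are defined in main() below):
#   python benchmark.py --tests image model --iterations 10 -o results.json
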
import time
import psutil
import torch
import cv2
import numpy as np
from pathlib import Path
import json
import argparse
from typing import Any, Dict, List, Optional, Tuple
import statistics
from datetime import datetime
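
# Make the repository root importable when this script is run directly
# (this assumes the api/ and models/ packages live one level above this file).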
import sys
sys.path.append(str(Path(__file__).parent.parent))

from api import ProcessingPipeline, PipelineConfig
from models import ModelRegistry, ModelLoader


class Benchmarker:
    """Performance benchmarking tool."""

    def __init__(self, output_file: Optional[str] = None):
        """Initialize benchmarker."""
        self.results = {
            'timestamp': datetime.now().isoformat(),
            'system_info': self._get_system_info(),
            'benchmarks': []
        }
        self.output_file = output_file or f"benchmark_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

    def _get_system_info(self) -> Dict[str, Any]:
        """Collect system information."""
        info = {
            'cpu': {
                'count': psutil.cpu_count(),
                'frequency': psutil.cpu_freq().current if psutil.cpu_freq() else 0,
                'model': self._get_cpu_model()
            },
            'memory': {
                'total_gb': psutil.virtual_memory().total / (1024**3),
                'available_gb': psutil.virtual_memory().available / (1024**3)
            },
            'gpu': self._get_gpu_info(),
            'python_version': sys.version,
            'torch_version': torch.__version__,
            'cuda_available': torch.cuda.is_available()
        }
        return info

    def _get_cpu_model(self) -> str:
        """Get CPU model name."""
        try:
            import platform
            return platform.processor()
        except Exception:
            return "Unknown"

    def _get_gpu_info(self) -> Dict[str, Any]:
        """Get GPU information (device 0 only)."""
        if torch.cuda.is_available():
            return {
                'available': True,
                'name': torch.cuda.get_device_name(0),
                'memory_gb': torch.cuda.get_device_properties(0).total_memory / (1024**3),
                'compute_capability': torch.cuda.get_device_capability(0)
            }
        return {'available': False}

    def benchmark_image_processing(self,
                                   sizes: Optional[List[Tuple[int, int]]] = None,
                                   qualities: Optional[List[str]] = None,
                                   num_iterations: int = 5) -> Dict[str, Any]:
        """Benchmark image processing performance."""
        print("\n=== Image Processing Benchmark ===")

        sizes = sizes or [(512, 512), (1024, 1024), (1920, 1080)]
        qualities = qualities or ['low', 'medium', 'high']

        results = {
            'test': 'image_processing',
            'iterations': num_iterations,
            'results': []
        }

        for size in sizes:
            for quality in qualities:
                print(f"Testing {size[0]}x{size[1]} @ {quality} quality...")

                # Random test image; NumPy shape is (height, width, channels),
                # so the (width, height) size is reversed. randint's upper
                # bound is exclusive, hence 256.
                image = np.random.randint(0, 256, (*size[::-1], 3), dtype=np.uint8)

                config = PipelineConfig(
                    quality_preset=quality,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False
                )

                try:
                    pipeline = ProcessingPipeline(config)

                    # Warm-up run so one-time initialization is not timed
                    pipeline.process_image(image, None)

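                    # NOTE: timings are wall-clock and assume process_image
                    # returns a CPU-side result; if the pipeline ever returned
                    # GPU tensors asynchronously, a torch.cuda.synchronize()
                    # would be needed before reading the clock.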
                    times = []
                    memory_usage = []

                    for _ in range(num_iterations):
                        start_mem = psutil.Process().memory_info().rss / (1024**2)
                        start_time = time.time()

                        pipeline.process_image(image, None)

                        elapsed = time.time() - start_time
                        end_mem = psutil.Process().memory_info().rss / (1024**2)

                        times.append(elapsed)
                        memory_usage.append(end_mem - start_mem)

                    result_data = {
                        'size': f"{size[0]}x{size[1]}",
                        'quality': quality,
                        'avg_time': statistics.mean(times),
                        'std_time': statistics.stdev(times) if len(times) > 1 else 0,
                        'min_time': min(times),
                        'max_time': max(times),
                        'fps': 1.0 / statistics.mean(times),
                        'avg_memory_mb': statistics.mean(memory_usage)
                    }

                    results['results'].append(result_data)
                    print(f"  Average: {result_data['avg_time']:.3f}s ({result_data['fps']:.1f} FPS)")

                except Exception as e:
                    print(f"  Failed: {str(e)}")
                    results['results'].append({
                        'size': f"{size[0]}x{size[1]}",
                        'quality': quality,
                        'error': str(e)
                    })

        self.results['benchmarks'].append(results)
        return results

    def benchmark_model_loading(self) -> Dict[str, Any]:
        """Benchmark model loading times."""
        print("\n=== Model Loading Benchmark ===")

        results = {
            'test': 'model_loading',
            'results': []
        }

        registry = ModelRegistry()
        loader = ModelLoader(registry, device='cuda' if torch.cuda.is_available() else 'cpu')

        models_to_test = ['rmbg-1.4', 'u2netp', 'modnet']

        for model_id in models_to_test:
            print(f"Loading {model_id}...")

            # Unload everything first so each model loads from a cold state
            loader.unload_all()

            start_time = time.time()
            start_mem = psutil.Process().memory_info().rss / (1024**2)

            try:
                loaded = loader.load_model(model_id)

                elapsed = time.time() - start_time
                end_mem = psutil.Process().memory_info().rss / (1024**2)

                if loaded:
                    result_data = {
                        'model': model_id,
                        'load_time': elapsed,
                        'memory_usage_mb': end_mem - start_mem,
                        'device': str(loaded.device)
                    }
                    print(f"  Loaded in {elapsed:.2f}s, Memory: {end_mem - start_mem:.1f}MB")
                else:
                    result_data = {
                        'model': model_id,
                        'error': 'Failed to load'
                    }
                    print("  Failed to load")

            except Exception as e:
                result_data = {
                    'model': model_id,
                    'error': str(e)
                }
                print(f"  Error: {str(e)}")

            results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def benchmark_video_processing(self,
                                   duration: int = 5,
                                   fps: int = 30,
                                   size: Tuple[int, int] = (1280, 720)) -> Dict[str, Any]:
        """Benchmark video processing performance."""
        print("\n=== Video Processing Benchmark ===")

        results = {
            'test': 'video_processing',
            'video_specs': {
                'duration': duration,
                'fps': fps,
                'size': f"{size[0]}x{size[1]}",
                'total_frames': duration * fps
            },
            'results': []
        }

        # Write a synthetic test video to a temporary directory
        import tempfile
        video_path = Path(tempfile.mkdtemp()) / "test_video.mp4"
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(str(video_path), fourcc, fps, size)

| | print(f"Creating test video: {duration}s @ {fps}fps, {size[0]}x{size[1]}") |
| | for i in range(duration * fps): |
| | frame = np.random.randint(0, 255, (*size[::-1], 3), dtype=np.uint8) |
| | |
| | x = int((i / (duration * fps)) * size[0]) |
| | cv2.rectangle(frame, (x, 100), (x + 100, 200), (0, 255, 0), -1) |
| | out.write(frame) |
| | out.release() |
| | |
| | |
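        # NOTE: the quality label below is recorded in the results but is not
        # passed to VideoProcessorAPI here, so each pass runs with the
        # processor's default settings.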
        for quality in ['low', 'medium', 'high']:
            print(f"Processing at {quality} quality...")

            from api import VideoProcessorAPI
            processor = VideoProcessorAPI()

            start_time = time.time()
            start_mem = psutil.Process().memory_info().rss / (1024**2)

            try:
                output_path = video_path.parent / f"output_{quality}.mp4"
                stats = processor.process_video(
                    str(video_path),
                    str(output_path),
                    background=None
                )

                elapsed = time.time() - start_time
                end_mem = psutil.Process().memory_info().rss / (1024**2)

                result_data = {
                    'quality': quality,
                    'total_time': elapsed,
                    'frames_processed': stats.frames_processed,
                    'processing_fps': stats.processing_fps,
                    'time_per_frame': elapsed / stats.frames_processed if stats.frames_processed > 0 else 0,
                    'memory_usage_mb': end_mem - start_mem
                }

                print(f"  Processed in {elapsed:.2f}s @ {stats.processing_fps:.1f} FPS")

            except Exception as e:
                result_data = {
                    'quality': quality,
                    'error': str(e)
                }
                print(f"  Failed: {str(e)}")

            results['results'].append(result_data)

        # Clean up the temporary input video and any generated outputs
        video_path.unlink(missing_ok=True)
        for output in video_path.parent.glob("output_*.mp4"):
            output.unlink(missing_ok=True)

        self.results['benchmarks'].append(results)
        return results

    def benchmark_batch_processing(self,
                                   batch_sizes: Optional[List[int]] = None,
                                   num_workers_list: Optional[List[int]] = None) -> Dict[str, Any]:
        """Benchmark batch processing performance."""
        print("\n=== Batch Processing Benchmark ===")

        batch_sizes = batch_sizes or [1, 5, 10, 20]
        num_workers_list = num_workers_list or [1, 2, 4, 8]

        results = {
            'test': 'batch_processing',
            'results': []
        }

        # Generate enough random test images for the largest batch
        test_images = [
            np.random.randint(0, 256, (512, 512, 3), dtype=np.uint8)
            for _ in range(max(batch_sizes))
        ]

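        # Sweep every (batch size, worker count) combination; throughput counts
        # attempted images, so compare it alongside the 'successful' count.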
        for batch_size in batch_sizes:
            for num_workers in num_workers_list:
                print(f"Testing batch_size={batch_size}, workers={num_workers}...")

                config = PipelineConfig(
                    batch_size=batch_size,
                    num_workers=num_workers,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False
                )

                try:
                    pipeline = ProcessingPipeline(config)

                    start_time = time.time()
                    results_batch = pipeline.process_batch(test_images[:batch_size])
                    elapsed = time.time() - start_time

                    successful = sum(1 for r in results_batch if r.success)

                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'total_time': elapsed,
                        'time_per_image': elapsed / batch_size,
                        'throughput': batch_size / elapsed,
                        'successful': successful
                    }

                    print(f"  {elapsed:.2f}s total, {result_data['throughput']:.1f} images/sec")

                except Exception as e:
                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'error': str(e)
                    }
                    print(f"  Failed: {str(e)}")

                results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def save_results(self):
        """Save benchmark results to file."""
        with open(self.output_file, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to: {self.output_file}")

    def print_summary(self):
        """Print benchmark summary."""
        print("\n" + "=" * 50)
        print("BENCHMARK SUMMARY")
        print("=" * 50)

        for benchmark in self.results['benchmarks']:
            print(f"\n{benchmark['test'].upper()}:")

            for result in benchmark.get('results', []):
                if 'error' in result:
                    continue
                if benchmark['test'] == 'image_processing':
                    print(f"  {result['size']} @ {result['quality']}: {result['fps']:.1f} FPS")
                elif benchmark['test'] == 'model_loading':
                    print(f"  {result['model']}: {result['load_time']:.2f}s")
                elif benchmark['test'] == 'video_processing':
                    print(f"  {result['quality']}: {result['processing_fps']:.1f} FPS")
                elif benchmark['test'] == 'batch_processing':
                    print(f"  Batch {result['batch_size']} x {result['num_workers']} workers: {result['throughput']:.1f} img/s")

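# Benchmarker can also be driven programmatically, e.g.:
#   b = Benchmarker("results.json")
#   b.benchmark_image_processing(sizes=[(640, 480)], num_iterations=3)
#   b.save_results()
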
def main():
    """Main benchmark function."""
    parser = argparse.ArgumentParser(description='BackgroundFX Pro Performance Benchmark')
    parser.add_argument('--tests', nargs='+',
                        choices=['image', 'model', 'video', 'batch', 'all'],
                        default=['all'],
                        help='Tests to run')
    parser.add_argument('--output', '-o', help='Output file for results')
    parser.add_argument('--iterations', '-i', type=int, default=5,
                        help='Number of iterations for each test')

    args = parser.parse_args()

    benchmarker = Benchmarker(args.output)

    tests_to_run = args.tests
    if 'all' in tests_to_run:
        tests_to_run = ['image', 'model', 'video', 'batch']

| | print("BackgroundFX Pro Performance Benchmark") |
| | print("="*50) |
| | print("System Information:") |
| | print(f" CPU: {benchmarker.results['system_info']['cpu']['model']}") |
| | print(f" Memory: {benchmarker.results['system_info']['memory']['total_gb']:.1f}GB") |
| | if benchmarker.results['system_info']['cuda_available']: |
| | print(f" GPU: {benchmarker.results['system_info']['gpu']['name']}") |
| | else: |
| | print(" GPU: Not available") |
| | |
    # Run the selected benchmarks
    if 'image' in tests_to_run:
        benchmarker.benchmark_image_processing(num_iterations=args.iterations)

    if 'model' in tests_to_run:
        benchmarker.benchmark_model_loading()

    if 'video' in tests_to_run:
        benchmarker.benchmark_video_processing()

    if 'batch' in tests_to_run:
        benchmarker.benchmark_batch_processing()

    benchmarker.save_results()
    benchmarker.print_summary()


if __name__ == "__main__":
    main()