| | """ |
| | Performance monitoring utilities for tracking inference time, throughput, and memory usage. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import functools |
| | import time |
| | from contextlib import contextmanager |
| | from dataclasses import dataclass, field |
| | from typing import Any, Callable, Dict, Optional |
| |
|
| | import psutil |
| | import torch |
| |
|
| |
|
@dataclass
class PerformanceMetrics:
    """Performance metrics for a single recorded operation.

    One instance captures a single measurement: the wall-clock duration,
    the memory delta, the derived throughput, and caller-supplied context.
    """

    # Wall-clock duration of the operation, in seconds.
    inference_time: float = 0.0
    # Memory consumed during the operation, in megabytes
    # (CPU RSS delta or GPU delta, whichever the recorder supplied).
    memory_used_mb: float = 0.0
    # Items processed per second (batch_size / inference_time; 0.0 if time is 0).
    throughput: float = 0.0
    # Number of items processed in this operation.
    batch_size: int = 1
    # Device identifier the operation ran on, e.g. "cpu" or "cuda:0".
    device: str = "cpu"
    # Arbitrary caller-supplied context attached to this measurement.
    metadata: Dict[str, Any] = field(default_factory=dict)
|
| |
|
@dataclass
class PerformanceStats:
    """Aggregated performance statistics for one named operation.

    Running totals and min/max/avg values folded in by
    PerformanceMonitor.record() across all calls of the same operation.
    """

    # Number of measurements folded into these statistics.
    total_calls: int = 0
    # Sum of all recorded durations, in seconds.
    total_time: float = 0.0
    # Sum of all recorded memory deltas, in megabytes.
    total_memory: float = 0.0
    # min_time/min_memory start at +inf so the first recorded value wins.
    min_time: float = float("inf")
    max_time: float = 0.0
    avg_time: float = 0.0
    min_memory: float = float("inf")
    max_memory: float = 0.0
    avg_memory: float = 0.0
    # Average throughput (items/second) derived from the latest batch size.
    avg_throughput: float = 0.0
|
| |
|
class PerformanceMonitor:
    """Collect per-call performance metrics and per-operation aggregates."""

    def __init__(self):
        # Chronological log of every individual measurement.
        self.metrics: list[PerformanceMetrics] = []
        # Aggregated statistics, keyed by operation name.
        self._stats: Dict[str, PerformanceStats] = {}

    def record(
        self,
        inference_time: float,
        memory_used_mb: float = 0.0,
        batch_size: int = 1,
        device: str = "cpu",
        metadata: Optional[Dict[str, Any]] = None,
        operation_name: str = "operation",
    ) -> PerformanceMetrics:
        """Store one measurement and fold it into *operation_name*'s aggregates.

        Returns the PerformanceMetrics instance that was appended.
        """
        # Guard against division by zero for zero-length timings.
        rate = batch_size / inference_time if inference_time > 0 else 0.0

        entry = PerformanceMetrics(
            inference_time=inference_time,
            memory_used_mb=memory_used_mb,
            throughput=rate,
            batch_size=batch_size,
            device=device,
            metadata=metadata or {},
        )
        self.metrics.append(entry)

        agg = self._stats.setdefault(operation_name, PerformanceStats())
        agg.total_calls += 1
        agg.total_time += inference_time
        agg.total_memory += memory_used_mb
        agg.min_time = min(agg.min_time, inference_time)
        agg.max_time = max(agg.max_time, inference_time)
        agg.min_memory = min(agg.min_memory, memory_used_mb)
        agg.max_memory = max(agg.max_memory, memory_used_mb)
        agg.avg_time = agg.total_time / agg.total_calls
        agg.avg_memory = agg.total_memory / agg.total_calls
        # NOTE(review): this pairs the *latest* batch_size with the average
        # time, so it is only meaningful when batch_size is constant across
        # calls of the same operation — confirm with callers.
        agg.avg_throughput = batch_size / agg.avg_time if agg.avg_time > 0 else 0.0

        return entry

    def get_stats(self, operation_name: Optional[str] = None) -> Dict[str, PerformanceStats]:
        """Return aggregates for one operation, or a copy of all when name is falsy.

        An unknown name yields a fresh (all-zero) PerformanceStats.
        """
        if not operation_name:
            return dict(self._stats)
        return {operation_name: self._stats.get(operation_name, PerformanceStats())}

    def get_summary(self) -> Dict[str, Any]:
        """Return a plain-dict summary of every tracked operation."""
        return {
            name: {
                "total_calls": agg.total_calls,
                "avg_time_seconds": agg.avg_time,
                "min_time_seconds": agg.min_time,
                "max_time_seconds": agg.max_time,
                "avg_memory_mb": agg.avg_memory,
                "min_memory_mb": agg.min_memory,
                "max_memory_mb": agg.max_memory,
                "avg_throughput": agg.avg_throughput,
            }
            for name, agg in self._stats.items()
        }

    def reset(self) -> None:
        """Drop all recorded metrics and aggregated statistics."""
        self.metrics.clear()
        self._stats.clear()
| |
|
| |
|
def get_memory_usage_mb(process: Optional[psutil.Process] = None) -> float:
    """Return the resident set size of *process*, in megabytes.

    Defaults to the current process when *process* is None. Returns 0.0
    if the memory info cannot be read (e.g. the process has exited or
    access is denied).
    """
    proc = psutil.Process() if process is None else process
    try:
        rss_bytes = proc.memory_info().rss
    except Exception:
        return 0.0
    return rss_bytes / (1024 * 1024)
| |
|
| |
|
def get_gpu_memory_mb(device: str = "cuda:0") -> float:
    """Return the memory currently allocated by torch on *device*, in MB.

    Returns 0.0 for non-CUDA device strings, when CUDA is unavailable,
    or when the query fails for any reason.
    """
    try:
        if not (torch.cuda.is_available() and device.startswith("cuda")):
            return 0.0
        # "cuda" with no ordinal defaults to device 0.
        ordinal = int(device.split(":")[1]) if ":" in device else 0
        return torch.cuda.memory_allocated(ordinal) / (1024 * 1024)
    except Exception:
        return 0.0
| |
|
| |
|
@contextmanager
def measure_performance(
    monitor: PerformanceMonitor,
    operation_name: str = "operation",
    batch_size: int = 1,
    device: str = "cpu",
    metadata: Optional[Dict[str, Any]] = None,
):
    """Context manager that times a code block and records it on *monitor*.

    Measures the elapsed time and the change in process RSS (and, for CUDA
    devices, the change in torch-allocated GPU memory), then records one
    metric entry under *operation_name*. Recording happens in ``finally``,
    so a raising body is still recorded and the exception propagates.

    Args:
        monitor: The PerformanceMonitor that receives the measurement.
        operation_name: Key under which aggregate stats are accumulated.
        batch_size: Number of items processed; used for throughput.
        device: Device string; anything starting with "cuda" enables GPU
            memory tracking.
        metadata: Optional extra context stored with the metric.
    """
    process = psutil.Process()
    memory_before = get_memory_usage_mb(process)

    if device.startswith("cuda"):
        gpu_memory_before = get_gpu_memory_mb(device)
    else:
        gpu_memory_before = 0.0

    # Use perf_counter, not time.time(): perf_counter is monotonic and
    # high-resolution, whereas wall-clock time can jump (NTP adjustments,
    # DST) and corrupt elapsed-time measurements.
    start_time = time.perf_counter()

    try:
        yield
    finally:
        inference_time = time.perf_counter() - start_time

        memory_after = get_memory_usage_mb(process)
        memory_used = memory_after - memory_before

        if device.startswith("cuda"):
            # NOTE(review): CUDA kernels run asynchronously; without a
            # torch.cuda.synchronize() before/after the block, the time and
            # memory deltas may under-report in-flight GPU work — confirm
            # that callers synchronize when precise GPU timing matters.
            gpu_memory_after = get_gpu_memory_mb(device)
            gpu_memory_used = gpu_memory_after - gpu_memory_before
            # Report whichever delta (CPU or GPU) is larger.
            memory_used = max(memory_used, gpu_memory_used)

        monitor.record(
            inference_time=inference_time,
            # Clamp negative deltas (e.g. GC or allocator frees mid-block).
            memory_used_mb=max(memory_used, 0.0),
            batch_size=batch_size,
            device=device,
            metadata=metadata or {},
            operation_name=operation_name,
        )
| |
|
| |
|
def monitor_performance(
    operation_name: Optional[str] = None,
    batch_size: int = 1,
    device: str = "cpu",
    monitor: Optional[PerformanceMonitor] = None,
):
    """Decorator factory that records every call of the wrapped function.

    When *monitor* is None a dedicated PerformanceMonitor is created; it is
    exposed on the wrapped function as ``_monitor`` so callers can inspect
    the collected statistics afterwards. The operation name defaults to the
    wrapped function's name.
    """
    active_monitor = PerformanceMonitor() if monitor is None else monitor

    def decorator(func: Callable) -> Callable:
        # Falsy operation_name (None or "") falls back to the function name.
        label = operation_name or func.__name__

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with measure_performance(
                active_monitor,
                operation_name=label,
                batch_size=batch_size,
                device=device,
            ):
                return func(*args, **kwargs)

        # Expose the monitor for post-hoc inspection of recorded stats.
        wrapper._monitor = active_monitor
        return wrapper

    return decorator
| |
|