| | """ |
| | Profiling infrastructure for tracking performance metrics, hot paths, and resource utilization. |
| | |
| | This module provides: |
| | - Timing decorators for function-level profiling |
| | - GPU/CPU utilization tracking |
| | - Memory usage monitoring |
| | - Hot path identification |
| | - API endpoints for remote profiling access |
| | """ |
| |
|
| | import functools |
| | import logging |
| | import threading |
| | import time |
| | from collections import defaultdict, deque |
| | from dataclasses import asdict, dataclass, field |
| | from typing import Any, Callable, Dict, List, Optional |
| |
|
| | try: |
| | import torch |
| |
|
| | HAS_TORCH = True |
| | except ImportError: |
| | HAS_TORCH = False |
| |
|
| | try: |
| | import psutil |
| |
|
| | HAS_PSUTIL = True |
| | except ImportError: |
| | HAS_PSUTIL = False |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
@dataclass
class ProfileEntry:
    """Single profiling entry for a function call.

    Captures wall-clock timing plus optional process/GPU memory snapshots
    (both in MB) taken around the call. Serialized via ``to_dict()``.
    """

    function_name: str  # name of the profiled function
    stage: str  # pipeline stage label (e.g. "gpu", "cpu", "data_loading")
    start_time: float  # epoch seconds when the call started
    end_time: float  # epoch seconds when the call finished
    duration: float  # elapsed seconds (end_time - start_time)
    memory_before: Optional[float] = None  # process RSS in MB; None if psutil unavailable
    memory_after: Optional[float] = None  # process RSS in MB; None if psutil unavailable
    gpu_memory_before: Optional[float] = None  # torch.cuda allocated MB; None if unavailable
    gpu_memory_after: Optional[float] = None  # torch.cuda allocated MB; None if unavailable
    metadata: Dict[str, Any] = field(default_factory=dict)  # caller-supplied extras
    thread_id: int = 0  # threading.get_ident() of the recording thread
    call_id: str = ""  # unique id, formatted "<function>_<timestamp>_<thread>"

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)
|
| |
|
@dataclass
class StageStats:
    """Aggregated timing and memory statistics for one pipeline stage."""

    stage_name: str
    call_count: int = 0
    total_time: float = 0.0
    min_time: float = float("inf")
    max_time: float = 0.0
    avg_time: float = 0.0
    total_memory: float = 0.0
    peak_memory: float = 0.0
    gpu_total_memory: float = 0.0
    gpu_peak_memory: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

    def update(self, entry: "ProfileEntry"):
        """Fold one profiling entry into the running aggregates."""
        elapsed = entry.duration
        self.call_count += 1
        self.total_time += elapsed
        if elapsed < self.min_time:
            self.min_time = elapsed
        if elapsed > self.max_time:
            self.max_time = elapsed
        self.avg_time = self.total_time / self.call_count

        # Host-memory aggregates are updated only when both snapshots exist.
        if None not in (entry.memory_before, entry.memory_after):
            self.total_memory += entry.memory_after - entry.memory_before
            if entry.memory_after > self.peak_memory:
                self.peak_memory = entry.memory_after

        # Likewise for the GPU-memory snapshots.
        if None not in (entry.gpu_memory_before, entry.gpu_memory_after):
            self.gpu_total_memory += entry.gpu_memory_after - entry.gpu_memory_before
            if entry.gpu_memory_after > self.gpu_peak_memory:
                self.gpu_peak_memory = entry.gpu_memory_after

    def to_dict(self) -> Dict[str, Any]:
        """Serialize every field to a plain dictionary."""
        return asdict(self)
|
| |
|
class Profiler:
    """
    Global profiler for tracking performance metrics across the pipeline.

    Thread-safe and designed for use in multi-threaded environments.
    Completed operations are aggregated per stage and per function, and a
    bounded deque retains the most recent raw entries.
    """

    _instance: Optional["Profiler"] = None
    _lock = threading.Lock()  # class-level lock guarding singleton creation only

    def __init__(self, max_entries: int = 10000, enabled: bool = True):
        """
        Args:
            max_entries: Maximum number of raw entries to keep in memory
                (oldest entries are evicted first).
            enabled: Whether profiling is enabled.
        """
        self.enabled = enabled
        self.max_entries = max_entries

        # Instance lock guarding all mutable profiling state below. It is a
        # non-reentrant Lock, so internal helpers that assume the lock is
        # already held are suffixed "_locked".
        self._lock = threading.Lock()
        self.entries: deque = deque(maxlen=max_entries)
        self.stage_stats: Dict[str, StageStats] = {}
        self.function_stats: Dict[str, StageStats] = {}

        # Top functions by cumulative time; refreshed on every record.
        self.hot_paths: List[Dict[str, Any]] = []

        # call_ids currently in flight, grouped by the thread that started them.
        self.active_operations: Dict[int, List[str]] = defaultdict(list)

        # In-flight entries keyed by call_id so end_operation() can finish
        # them. FIX: previously the entry was only stashed inside its own
        # metadata (a circular reference) and was never retrievable.
        self._pending: Dict[str, "ProfileEntry"] = {}

        # Rolling window of system-level snapshots (CPU/RAM/GPU).
        self.system_metrics: List[Dict[str, Any]] = []
        self._last_system_check = 0.0
        self._system_check_interval = 1.0  # seconds between system snapshots

    @classmethod
    def get_instance(cls) -> "Profiler":
        """Get the process-wide singleton (double-checked locking)."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def reset(self):
        """Reset all profiling data, including in-flight operations."""
        with self._lock:
            self.entries.clear()
            self.stage_stats.clear()
            self.function_stats.clear()
            self.hot_paths.clear()
            self.active_operations.clear()
            self._pending.clear()  # FIX: drop in-flight entries as well
            self.system_metrics.clear()

    def _get_memory_usage(self) -> Optional[float]:
        """Get current process RSS in MB, or None when unavailable."""
        if not HAS_PSUTIL:
            return None
        try:
            process = psutil.Process()
            return process.memory_info().rss / 1024 / 1024
        except Exception:
            return None

    def _get_gpu_memory_usage(self, device: str = "cuda:0") -> Optional[float]:
        """Get torch-allocated GPU memory in MB, or None when unavailable."""
        if not HAS_TORCH or not torch.cuda.is_available():
            return None
        try:
            return torch.cuda.memory_allocated(device) / 1024 / 1024
        except Exception:
            return None

    def _update_system_metrics(self):
        """Append a system-level snapshot, rate-limited to one per interval."""
        current_time = time.time()
        if current_time - self._last_system_check < self._system_check_interval:
            return

        self._last_system_check = current_time

        metrics = {
            "timestamp": current_time,
            "cpu_percent": None,
            "memory_percent": None,
            "gpu_memory_used": None,
            "gpu_memory_total": None,
            "gpu_utilization": None,  # reserved; not populated by this method
        }

        if HAS_PSUTIL:
            try:
                process = psutil.Process()
                metrics["cpu_percent"] = process.cpu_percent()
                metrics["memory_percent"] = process.memory_percent()
            except Exception:
                pass  # best-effort: fields stay None on failure

        if HAS_TORCH and torch.cuda.is_available():
            try:
                metrics["gpu_memory_used"] = torch.cuda.memory_allocated() / 1024 / 1024
                metrics["gpu_memory_total"] = (
                    torch.cuda.get_device_properties(0).total_memory / 1024 / 1024
                )
            except Exception:
                pass  # best-effort: fields stay None on failure

        with self._lock:
            self.system_metrics.append(metrics)
            if len(self.system_metrics) > 1000:  # bound the rolling window
                self.system_metrics.pop(0)

    def start_operation(self, function_name: str, stage: str = "unknown", **metadata) -> str:
        """
        Start profiling an operation; pair with end_operation(call_id).

        Args:
            function_name: Name of the operation being profiled.
            stage: Pipeline stage label.
            **metadata: Additional metadata stored on the entry.

        Returns:
            call_id: Unique identifier for this call ("" when disabled).
        """
        if not self.enabled:
            return ""

        thread_id = threading.get_ident()
        call_id = f"{function_name}_{time.time()}_{thread_id}"

        entry = ProfileEntry(
            function_name=function_name,
            stage=stage,
            start_time=time.time(),
            end_time=0.0,  # filled in by end_operation()
            duration=0.0,
            memory_before=self._get_memory_usage(),
            gpu_memory_before=self._get_gpu_memory_usage(),
            metadata=metadata,
            thread_id=thread_id,
            call_id=call_id,
        )

        with self._lock:
            self.active_operations[thread_id].append(call_id)
            self._pending[call_id] = entry

        self._update_system_metrics()

        return call_id

    def end_operation(self, call_id: str):
        """
        End profiling an operation and record the completed entry.

        FIX: previously this method located the call_id and then did
        nothing, so started operations were never recorded and leaked in
        active_operations forever.
        """
        if not self.enabled or not call_id:
            return

        # Snapshot outside the lock: these may touch psutil/torch.
        end_time = time.time()
        memory_after = self._get_memory_usage()
        gpu_memory_after = self._get_gpu_memory_usage()

        with self._lock:
            entry = self._pending.pop(call_id, None)
            if entry is None:
                return  # unknown or already-ended call_id

            # Remove from the *starting* thread's in-flight list (the entry
            # remembers its thread, so ending from another thread is safe).
            ops = self.active_operations.get(entry.thread_id)
            if ops is not None and call_id in ops:
                ops.remove(call_id)
                if not ops:
                    del self.active_operations[entry.thread_id]

            entry.end_time = end_time
            entry.duration = end_time - entry.start_time
            entry.memory_after = memory_after
            entry.gpu_memory_after = gpu_memory_after
            self._record_entry_locked(entry)

        self._update_system_metrics()

    def record(self, function_name: str, stage: str, duration: float, **metadata):
        """
        Record a completed operation.

        Args:
            function_name: Name of the function
            stage: Pipeline stage (e.g., "gpu", "cpu", "data_loading")
            duration: Duration in seconds
            **metadata: Additional metadata
        """
        if not self.enabled:
            return

        now = time.time()  # FIX: single timestamp keeps end_time == start_time + duration
        thread_id = threading.get_ident()
        # The operation already finished, so before/after snapshots collapse
        # to one current reading each.
        memory = self._get_memory_usage()
        gpu_memory = self._get_gpu_memory_usage()
        entry = ProfileEntry(
            function_name=function_name,
            stage=stage,
            start_time=now - duration,
            end_time=now,
            duration=duration,
            memory_before=memory,
            memory_after=memory,
            gpu_memory_before=gpu_memory,
            gpu_memory_after=gpu_memory,
            metadata=metadata,
            thread_id=thread_id,
            call_id=f"{function_name}_{now}_{thread_id}",
        )

        with self._lock:
            self._record_entry_locked(entry)

        self._update_system_metrics()

    def _record_entry_locked(self, entry: "ProfileEntry"):
        """Append *entry* and refresh all aggregates. Caller holds self._lock."""
        self.entries.append(entry)

        # Per-stage aggregates.
        if entry.stage not in self.stage_stats:
            self.stage_stats[entry.stage] = StageStats(stage_name=entry.stage)
        self.stage_stats[entry.stage].update(entry)

        # Per-function aggregates.
        if entry.function_name not in self.function_stats:
            self.function_stats[entry.function_name] = StageStats(
                stage_name=entry.function_name
            )
        self.function_stats[entry.function_name].update(entry)

        self._recompute_hot_paths_locked()

    def _update_hot_paths(self):
        """Update hot paths list (top operations by total time)."""
        with self._lock:
            self._recompute_hot_paths_locked()

    def _recompute_hot_paths_locked(self):
        """
        Rebuild the top-20 hot-path list. Caller holds self._lock.

        FIX: derives totals from the cumulative function_stats instead of
        rescanning every retained entry (O(n) on each record). This also
        keeps total_time consistent with the reported call_count/avg_time
        once old entries are evicted from the bounded deque.
        """
        ranked = sorted(
            self.function_stats.items(),
            key=lambda item: item[1].total_time,
            reverse=True,
        )
        self.hot_paths = [
            {
                "function": name,
                "total_time": stats.total_time,
                "call_count": stats.call_count,
                "avg_time": stats.avg_time,
            }
            for name, stats in ranked[:20]
        ]

    def get_metrics(self) -> Dict[str, Any]:
        """Get all profiling metrics as a JSON-serializable snapshot."""
        with self._lock:
            return {
                "enabled": self.enabled,
                "total_entries": len(self.entries),
                "stage_stats": {
                    stage: stats.to_dict() for stage, stats in self.stage_stats.items()
                },
                "function_stats": {
                    func: stats.to_dict() for func, stats in self.function_stats.items()
                },
                # FIX: return copies so callers cannot mutate internal state.
                "hot_paths": list(self.hot_paths),
                "system_metrics": list(self.system_metrics[-100:]),
            }

    def get_stage_stats(self, stage: str) -> Optional[Dict[str, Any]]:
        """Get statistics for a specific stage, or None if never recorded."""
        with self._lock:
            if stage in self.stage_stats:
                return self.stage_stats[stage].to_dict()
            return None

    def get_latency_breakdown(self) -> Dict[str, Any]:
        """Get latency breakdown by stage (each stage's share of total time)."""
        with self._lock:
            breakdown = {}
            total_time = sum(stats.total_time for stats in self.stage_stats.values())

            for stage, stats in self.stage_stats.items():
                # Guard against division by zero when nothing was recorded.
                percentage = (stats.total_time / total_time * 100) if total_time > 0 else 0
                breakdown[stage] = {
                    "total_time": stats.total_time,
                    "avg_time": stats.avg_time,
                    "call_count": stats.call_count,
                    "percentage": percentage,
                }

            return {
                "total_time": total_time,
                "breakdown": breakdown,
            }
|
| |
|
def profile(stage: str = "unknown", **metadata):
    """
    Decorator for profiling functions.

    Usage:
        @profile(stage="gpu")
        def my_function():
            ...
    """

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            profiler = Profiler.get_instance()
            if not profiler.enabled:
                # Profiling disabled: call straight through with no overhead.
                return func(*args, **kwargs)

            extra: Dict[str, Any] = {}
            started = time.time()
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                # Attach the error message, then let the exception propagate.
                extra["error"] = str(exc)
                raise
            finally:
                # Runs on both the success and the exception path, so the
                # call is recorded exactly once either way.
                profiler.record(
                    function_name=func.__name__,
                    stage=stage,
                    duration=time.time() - started,
                    **extra,
                    **metadata,
                )

        return wrapper

    return decorator
| |
|
| |
|
def profile_context(stage: str = "unknown", **metadata):
    """
    Context manager for profiling code blocks.

    Usage:
        with profile_context(stage="gpu"):
            # code to profile

    The entry is recorded under the name of the function containing the
    ``with`` statement. FIX: exceptions raised inside the block are now
    recorded as ``error`` metadata, matching the @profile decorator
    (previously they were silently dropped from the metrics).

    Args:
        stage: Pipeline stage label for the recorded entry.
        **metadata: Additional metadata recorded with the entry.

    Returns:
        A context-manager instance; exceptions are never suppressed.
    """

    class ProfileContext:
        def __init__(self, stage: str, **metadata):
            self.stage = stage
            self.metadata = metadata
            self.profiler = Profiler.get_instance()
            self.start_time = None  # set in __enter__ only when enabled

        def __enter__(self):
            if self.profiler.enabled:
                self.start_time = time.time()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            if self.profiler.enabled and self.start_time is not None:
                duration = time.time() - self.start_time

                import inspect

                # f_back of this frame is the frame executing the `with`
                # statement, i.e. the enclosing function.
                frame = inspect.currentframe().f_back
                function_name = frame.f_code.co_name if frame else "unknown"
                del frame  # break the frame reference cycle promptly

                extra = dict(self.metadata)
                if exc_type is not None:
                    # Record the failure, consistent with @profile;
                    # caller-supplied "error" metadata takes precedence.
                    extra.setdefault("error", str(exc_val))

                self.profiler.record(
                    function_name=function_name,
                    stage=self.stage,
                    duration=duration,
                    **extra,
                )
            return False  # never suppress the exception

    return ProfileContext(stage, **metadata)
| |
|