"""
Profiling infrastructure for tracking performance metrics, hot paths, and resource utilization.

This module provides:
- Timing decorators and context managers for function/block-level profiling
- Process CPU/memory sampling (when ``psutil`` is installed) and GPU memory
  sampling (when ``torch`` with CUDA is available)
- Memory usage monitoring
- Hot path identification
- JSON-serializable metric snapshots (``Profiler.get_metrics`` and friends) suitable
  for exposing through remote profiling endpoints
"""

import functools
import inspect
import itertools
import logging
import os
import threading
import time
from collections import defaultdict, deque
from dataclasses import asdict, dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

try:
    import torch

    HAS_TORCH = True
except ImportError:
    HAS_TORCH = False

try:
    import psutil

    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False

logger = logging.getLogger(__name__)

_BYTES_PER_MB = 1024 * 1024


@dataclass
class ProfileEntry:
    """Single profiling entry for a function call."""

    function_name: str
    stage: str  # e.g., "gpu", "cpu", "data_loading"
    start_time: float  # wall-clock timestamp (time.time()) at start
    end_time: float  # wall-clock timestamp at end
    duration: float  # seconds
    memory_before: Optional[float] = None  # process RSS in MB; None without psutil
    memory_after: Optional[float] = None
    gpu_memory_before: Optional[float] = None  # torch-allocated CUDA memory in MB
    gpu_memory_after: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    thread_id: int = 0  # threading.get_ident() of the thread that started the call
    call_id: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)


@dataclass
class StageStats:
    """Aggregated statistics for a pipeline stage (or, reused, for a single function)."""

    stage_name: str
    call_count: int = 0
    total_time: float = 0.0
    min_time: float = float("inf")  # stays inf until the first update()
    max_time: float = 0.0
    avg_time: float = 0.0
    total_memory: float = 0.0  # sum of per-call RSS deltas (MB)
    peak_memory: float = 0.0  # highest memory_after seen (MB)
    gpu_total_memory: float = 0.0  # sum of per-call GPU memory deltas (MB)
    gpu_peak_memory: float = 0.0  # highest gpu_memory_after seen (MB)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def update(self, entry: ProfileEntry):
        """Fold one completed entry into the running statistics."""
        self.call_count += 1
        self.total_time += entry.duration
        self.min_time = min(self.min_time, entry.duration)
        self.max_time = max(self.max_time, entry.duration)
        self.avg_time = self.total_time / self.call_count

        # Memory figures are only aggregated when both snapshots were available.
        if entry.memory_before is not None and entry.memory_after is not None:
            memory_delta = entry.memory_after - entry.memory_before
            self.total_memory += memory_delta
            self.peak_memory = max(self.peak_memory, entry.memory_after)

        if entry.gpu_memory_before is not None and entry.gpu_memory_after is not None:
            gpu_memory_delta = entry.gpu_memory_after - entry.gpu_memory_before
            self.gpu_total_memory += gpu_memory_delta
            self.gpu_peak_memory = max(self.gpu_peak_memory, entry.gpu_memory_after)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)


class Profiler:
    """
    Global profiler for tracking performance metrics across the pipeline.

    All mutable state is guarded by a per-instance lock, so one profiler can be
    shared between threads. Use :meth:`get_instance` for the process-wide
    singleton used by :func:`profile` and :func:`profile_context`.
    """

    _instance: Optional["Profiler"] = None
    # Class-level lock guarding singleton creation only; each instance shadows
    # ``_lock`` with its own lock for its data.
    _lock = threading.Lock()

    # Number of functions listed in ``hot_paths``.
    _HOT_PATH_LIMIT = 20
    # Number of system-metric samples retained in memory.
    _MAX_SYSTEM_SAMPLES = 1000
    # Number of most recent system-metric samples returned by get_metrics().
    _REPORTED_SYSTEM_SAMPLES = 100

    def __init__(self, max_entries: int = 10000, enabled: bool = True):
        """
        Args:
            max_entries: Maximum number of entries to keep in memory
            enabled: Whether profiling is enabled
        """
        self.enabled = enabled
        self.max_entries = max_entries

        # Thread-safe storage. This is a plain, non-reentrant Lock: code holding
        # it must only call ``*_locked``/static helpers, never a locking method.
        self._lock = threading.Lock()
        self.entries: deque = deque(maxlen=max_entries)
        self.stage_stats: Dict[str, StageStats] = {}
        self.function_stats: Dict[str, StageStats] = {}

        # Hot path tracking (most time-consuming operations)
        self.hot_paths: List[Dict[str, Any]] = []

        # Current active operations (for nested profiling): thread_id -> stack of call ids
        self.active_operations: Dict[int, List[str]] = defaultdict(list)
        # In-flight operations from start_operation(): call_id -> (entry, perf_counter start)
        self._pending: Dict[str, Tuple[ProfileEntry, float]] = {}
        # Suffix keeping call ids unique when two calls share a timestamp.
        # itertools.count.__next__ is atomic under CPython's GIL (the stdlib
        # threading module uses the same idiom to number thread names).
        self._call_counter = itertools.count()

        # System metrics
        self.system_metrics: List[Dict[str, Any]] = []
        self._last_system_check = 0.0
        self._system_check_interval = 1.0  # Check every second
        self._process: Any = None  # cached psutil.Process, see _get_process()

    @classmethod
    def get_instance(cls) -> "Profiler":
        """Get singleton instance."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def reset(self):
        """Reset all profiling data, including in-flight operations."""
        with self._lock:
            self.entries.clear()
            self.stage_stats.clear()
            self.function_stats.clear()
            self.hot_paths.clear()
            self.active_operations.clear()
            self._pending.clear()
            self.system_metrics.clear()

    def _new_call_id(self, function_name: str, thread_id: int, timestamp: float) -> str:
        """Build a call id that is unique within this profiler."""
        return f"{function_name}_{timestamp}_{thread_id}_{next(self._call_counter)}"

    def _get_process(self) -> Any:
        """
        Return a cached ``psutil.Process`` for the current PID, or None.

        ``Process.cpu_percent()`` measures CPU time since the previous call on the
        *same* object, so creating a new object per sample always yields 0.0.
        The PID check re-creates the handle after a fork.
        """
        if not HAS_PSUTIL:
            return None
        pid = os.getpid()
        process = self._process
        if process is None or process.pid != pid:
            try:
                process = psutil.Process(pid)
            except Exception:
                logger.debug("Unable to create psutil.Process for pid %s", pid, exc_info=True)
                return None
            self._process = process
        return process

    def _get_memory_usage(self) -> Optional[float]:
        """Get current process memory usage (RSS) in MB, or None if unavailable."""
        process = self._get_process()
        if process is None:
            return None
        try:
            return process.memory_info().rss / _BYTES_PER_MB
        except Exception:
            return None

    def _get_gpu_memory_usage(self, device: str = "cuda:0") -> Optional[float]:
        """Get memory currently allocated by torch on ``device`` in MB, or None."""
        if not HAS_TORCH or not torch.cuda.is_available():
            return None
        try:
            return torch.cuda.memory_allocated(device) / _BYTES_PER_MB
        except Exception:
            return None

    def _update_system_metrics(self):
        """Append a CPU/memory/GPU sample, at most once per ``_system_check_interval``."""
        current_time = time.time()
        # Check-and-set under the lock so concurrent callers take one sample, not several.
        with self._lock:
            if current_time - self._last_system_check < self._system_check_interval:
                return
            self._last_system_check = current_time

        metrics: Dict[str, Any] = {
            "timestamp": current_time,
            "cpu_percent": None,
            "memory_percent": None,
            "gpu_memory_used": None,
            "gpu_memory_total": None,
            "gpu_utilization": None,  # needs nvidia-ml-py; not collected
        }

        process = self._get_process()
        if process is not None:
            try:
                metrics["cpu_percent"] = process.cpu_percent()
                metrics["memory_percent"] = process.memory_percent()
            except Exception:
                logger.debug("Failed to sample process CPU/memory", exc_info=True)

        if HAS_TORCH and torch.cuda.is_available():
            try:
                # Read "used" and "total" from the same device.
                device = torch.cuda.current_device()
                metrics["gpu_memory_used"] = torch.cuda.memory_allocated(device) / _BYTES_PER_MB
                metrics["gpu_memory_total"] = (
                    torch.cuda.get_device_properties(device).total_memory / _BYTES_PER_MB
                )
            except Exception:
                logger.debug("Failed to sample GPU memory", exc_info=True)

        with self._lock:
            self.system_metrics.append(metrics)
            overflow = len(self.system_metrics) - self._MAX_SYSTEM_SAMPLES
            if overflow > 0:  # Keep the most recent samples only
                del self.system_metrics[:overflow]

    def start_operation(self, function_name: str, stage: str = "unknown", **metadata) -> str:
        """
        Start profiling an operation; finish it with :meth:`end_operation`.

        Args:
            function_name: Name recorded for the operation
            stage: Pipeline stage (e.g., "gpu", "cpu", "data_loading")
            **metadata: Additional metadata stored on the entry

        Returns:
            call_id: Unique identifier for this call ("" when profiling is disabled)
        """
        if not self.enabled:
            return ""

        thread_id = threading.get_ident()
        # Snapshot memory before starting the clock so sampling cost isn't timed.
        memory_before = self._get_memory_usage()
        gpu_memory_before = self._get_gpu_memory_usage()
        start_time = time.time()
        call_id = self._new_call_id(function_name, thread_id, start_time)
        entry = ProfileEntry(
            function_name=function_name,
            stage=stage,
            start_time=start_time,
            end_time=0.0,
            duration=0.0,
            memory_before=memory_before,
            gpu_memory_before=gpu_memory_before,
            metadata=metadata,
            thread_id=thread_id,
            call_id=call_id,
        )
        start_perf = time.perf_counter()

        with self._lock:
            self.active_operations[thread_id].append(call_id)
            self._pending[call_id] = (entry, start_perf)

        self._update_system_metrics()
        return call_id

    def end_operation(self, call_id: str):
        """
        Finish an operation started with :meth:`start_operation` and record it.

        Empty or unknown call ids are ignored. The operation may be ended from a
        different thread than the one that started it. If profiling was disabled
        in the meantime, the operation is discarded instead of recorded.
        """
        if not call_id:
            return
        end_perf = time.perf_counter()

        with self._lock:
            pending = self._pending.pop(call_id, None)
            if pending is not None:
                self._pop_active_locked(pending[0].thread_id, call_id)

        if pending is None:
            logger.debug("end_operation: unknown call_id %r", call_id)
            return
        if not self.enabled:
            return

        entry, start_perf = pending
        entry.duration = end_perf - start_perf
        entry.end_time = entry.start_time + entry.duration
        entry.memory_after = self._get_memory_usage()
        entry.gpu_memory_after = self._get_gpu_memory_usage()
        self._add_entry(entry)
        self._update_system_metrics()

    def _pop_active_locked(self, thread_id: int, call_id: str) -> None:
        """Remove ``call_id`` from a thread's active stack (caller holds the lock)."""
        stack = self.active_operations.get(thread_id)
        if stack is None:
            return
        try:
            stack.remove(call_id)
        except ValueError:
            pass
        if not stack:
            # Drop empty stacks so threads that finished don't accumulate keys.
            del self.active_operations[thread_id]

    def record(self, function_name: str, stage: str, duration: float, **metadata):
        """
        Record a completed operation.

        ``record`` runs after the operation finished, so it takes a single
        post-call memory snapshot and stores it as both ``*_before`` and
        ``*_after``; such entries contribute no memory delta. Use
        :meth:`start_operation` / :meth:`end_operation` to capture real deltas.

        Args:
            function_name: Name of the function
            stage: Pipeline stage (e.g., "gpu", "cpu", "data_loading")
            duration: Duration in seconds
            **metadata: Additional metadata
        """
        if not self.enabled:
            return

        thread_id = threading.get_ident()
        memory = self._get_memory_usage()
        gpu_memory = self._get_gpu_memory_usage()
        end_time = time.time()
        entry = ProfileEntry(
            function_name=function_name,
            stage=stage,
            start_time=end_time - duration,
            end_time=end_time,
            duration=duration,
            memory_before=memory,
            memory_after=memory,
            gpu_memory_before=gpu_memory,
            gpu_memory_after=gpu_memory,
            metadata=metadata,
            thread_id=thread_id,
            call_id=self._new_call_id(function_name, thread_id, end_time),
        )
        self._add_entry(entry)
        self._update_system_metrics()

    @staticmethod
    def _get_or_create_stats(table: Dict[str, StageStats], name: str) -> StageStats:
        """Return ``table[name]``, inserting an empty StageStats first if missing."""
        stats = table.get(name)
        if stats is None:
            stats = table[name] = StageStats(stage_name=name)
        return stats

    def _add_entry(self, entry: ProfileEntry) -> None:
        """Store a completed entry and fold it into stage, function and hot-path stats."""
        with self._lock:
            self.entries.append(entry)
            self._get_or_create_stats(self.stage_stats, entry.stage).update(entry)
            self._get_or_create_stats(self.function_stats, entry.function_name).update(entry)
            self._rebuild_hot_paths_locked()

    def _update_hot_paths(self):
        """Update hot paths list (top operations by total time)."""
        with self._lock:
            self._rebuild_hot_paths_locked()

    def _rebuild_hot_paths_locked(self) -> None:
        """
        Rebuild ``hot_paths`` from ``function_stats`` (caller holds the lock).

        Ranking uses the all-time per-function totals, so ``total_time``,
        ``call_count`` and ``avg_time`` stay mutually consistent even after old
        entries have been evicted from the bounded ``entries`` deque.
        """
        ranked = sorted(
            self.function_stats.values(), key=lambda stats: stats.total_time, reverse=True
        )
        self.hot_paths = [
            {
                "function": stats.stage_name,
                "total_time": stats.total_time,
                "call_count": stats.call_count,
                "avg_time": stats.avg_time,
            }
            for stats in ranked[: self._HOT_PATH_LIMIT]
        ]

    def get_metrics(self) -> Dict[str, Any]:
        """Get a snapshot of all profiling metrics (copies, safe to mutate)."""
        with self._lock:
            return {
                "enabled": self.enabled,
                "total_entries": len(self.entries),
                "stage_stats": {
                    stage: stats.to_dict() for stage, stats in self.stage_stats.items()
                },
                "function_stats": {
                    func: stats.to_dict() for func, stats in self.function_stats.items()
                },
                "hot_paths": [dict(path) for path in self.hot_paths],
                "system_metrics": [
                    dict(sample)
                    for sample in self.system_metrics[-self._REPORTED_SYSTEM_SAMPLES :]
                ],
            }

    def get_stage_stats(self, stage: str) -> Optional[Dict[str, Any]]:
        """Get statistics for a specific stage, or None if it was never recorded."""
        with self._lock:
            stats = self.stage_stats.get(stage)
            return stats.to_dict() if stats is not None else None

    def get_latency_breakdown(self) -> Dict[str, Any]:
        """Get latency breakdown by stage, with each stage's share of total time."""
        with self._lock:
            total_time = sum(stats.total_time for stats in self.stage_stats.values())
            breakdown = {}
            for stage, stats in self.stage_stats.items():
                percentage = (stats.total_time / total_time * 100) if total_time > 0 else 0
                breakdown[stage] = {
                    "total_time": stats.total_time,
                    "avg_time": stats.avg_time,
                    "call_count": stats.call_count,
                    "percentage": percentage,
                }
            return {
                "total_time": total_time,
                "breakdown": breakdown,
            }


def profile(stage: str = "unknown", **metadata):
    """
    Decorator for profiling functions with the global :class:`Profiler`.

    Each call is timed with ``time.perf_counter`` and recorded under
    ``func.__name__``. If the function raises an ``Exception``, the call is
    recorded with an extra ``error`` metadata entry and the exception re-raised.

    Usage:
        @profile(stage="gpu")
        def my_function():
            ...
    """

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            profiler = Profiler.get_instance()
            if not profiler.enabled:
                return func(*args, **kwargs)

            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                # Merge instead of passing ``error=`` next to **metadata, which
                # raises TypeError when the decorator metadata has an "error" key.
                profiler.record(
                    function_name=func.__name__,
                    stage=stage,
                    duration=time.perf_counter() - start,
                    **{**metadata, "error": str(exc)},
                )
                raise
            # Recorded outside the try so a failure inside record() is not
            # mistaken for (and re-recorded as) an error raised by ``func``.
            profiler.record(
                function_name=func.__name__,
                stage=stage,
                duration=time.perf_counter() - start,
                **metadata,
            )
            return result

        return wrapper

    return decorator


class _ProfileContext:
    """Context manager returned by :func:`profile_context`."""

    def __init__(self, stage: str, **metadata):
        self.stage = stage
        self.metadata = metadata
        self.profiler = Profiler.get_instance()
        self.start_time: Optional[float] = None  # wall clock (time.time()) at __enter__
        self._start_perf: Optional[float] = None  # monotonic clock used for the duration

    def __enter__(self):
        if self.profiler.enabled:
            self.start_time = time.time()
            self._start_perf = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.profiler.enabled and self._start_perf is not None:
            duration = time.perf_counter() - self._start_perf

            # The frame that invoked __exit__ is the one containing the ``with``.
            # currentframe() may return None on interpreters without frame support.
            frame = inspect.currentframe()
            caller = frame.f_back if frame is not None else None
            function_name = caller.f_code.co_name if caller is not None else "unknown"
            del frame, caller  # release frame references promptly (avoids cycles)

            record_metadata = dict(self.metadata)
            if exc_val is not None:
                record_metadata["error"] = str(exc_val)

            self.profiler.record(
                function_name=function_name,
                stage=self.stage,
                duration=duration,
                **record_metadata,
            )
        return False  # never suppress exceptions


def profile_context(stage: str = "unknown", **metadata):
    """
    Context manager for profiling code blocks with the global :class:`Profiler`.

    The block is recorded under the name of the function containing the
    ``with`` statement. If the block raises, an ``error`` metadata entry is
    added and the exception propagates.

    Usage:
        with profile_context(stage="gpu"):
            ...  # code to profile
    """
    return _ProfileContext(stage, **metadata)