| | """ |
| | Memory Management Module |
| | Handles memory cleanup, monitoring, and GPU resource management |
| | """ |
| |
|
| | import gc |
| | import os |
| | import psutil |
| | import torch |
| | import time |
| | import logging |
| | import threading |
| | from typing import Dict, Any, Optional, Callable |
| | from exceptions import MemoryError, ResourceExhaustionError |
| |
|
| | logger = logging.getLogger(__name__) |


class MemoryManager:
    """
    Comprehensive memory management for video processing applications
    """

    def __init__(self, device: torch.device, memory_limit_gb: Optional[float] = None):
        self.device = device
        self.gpu_available = device.type in ['cuda', 'mps']
        self.memory_limit_gb = memory_limit_gb
        self.cleanup_callbacks = []
        self.monitoring_active = False
        self.monitoring_thread = None
        # Lets stop_monitoring() interrupt the monitor thread's sleep promptly.
        self._stop_event = threading.Event()
        self.stats = {
            'cleanup_count': 0,
            'peak_memory_usage': 0.0,
            'total_allocated': 0.0,
            'total_freed': 0.0
        }

        self._initialize_memory_limits()
        logger.info(f"MemoryManager initialized for device: {device}")

    def _initialize_memory_limits(self):
        """Initialize memory limits based on device and system"""
        if self.device.type == 'cuda':
            try:
                device_idx = self.device.index or 0
                device_props = torch.cuda.get_device_properties(device_idx)
                total_memory_gb = device_props.total_memory / (1024**3)

                # Default to 80% of total VRAM, leaving headroom for the
                # driver and other processes.
                if self.memory_limit_gb is None:
                    self.memory_limit_gb = total_memory_gb * 0.8

                logger.info(f"CUDA memory limit set to {self.memory_limit_gb:.1f}GB "
                            f"(total: {total_memory_gb:.1f}GB)")

            except Exception as e:
                logger.warning(f"Could not get CUDA memory info: {e}")
                self.memory_limit_gb = 4.0  # conservative fallback

        elif self.device.type == 'mps':
            # MPS uses unified memory, so the limit is derived from system RAM.
            system_memory_gb = psutil.virtual_memory().total / (1024**3)
            if self.memory_limit_gb is None:
                # Default to half of system RAM for the GPU workload.
                self.memory_limit_gb = system_memory_gb * 0.5

            logger.info(f"MPS memory limit set to {self.memory_limit_gb:.1f}GB "
                        f"(system: {system_memory_gb:.1f}GB)")

        else:
            system_memory_gb = psutil.virtual_memory().total / (1024**3)
            if self.memory_limit_gb is None:
                # Default to 60% of system RAM for CPU processing.
                self.memory_limit_gb = system_memory_gb * 0.6

            logger.info(f"CPU memory limit set to {self.memory_limit_gb:.1f}GB "
                        f"(system: {system_memory_gb:.1f}GB)")

    def get_memory_usage(self) -> Dict[str, Any]:
        """Get comprehensive memory usage statistics"""
        usage = {
            'device_type': self.device.type,
            'memory_limit_gb': self.memory_limit_gb,
            'timestamp': time.time()
        }

        try:
            if self.device.type == 'cuda':
                device_idx = self.device.index or 0

                # Current allocator state.
                allocated = torch.cuda.memory_allocated(device_idx)
                reserved = torch.cuda.memory_reserved(device_idx)
                total = torch.cuda.get_device_properties(device_idx).total_memory

                usage.update({
                    'gpu_allocated_gb': allocated / (1024**3),
                    'gpu_reserved_gb': reserved / (1024**3),
                    'gpu_total_gb': total / (1024**3),
                    'gpu_utilization_percent': (allocated / total) * 100,
                    'gpu_reserved_percent': (reserved / total) * 100,
                    'gpu_free_gb': (total - reserved) / (1024**3)
                })

                # Peak values since the last reset_peak_memory_stats() call.
                max_allocated = torch.cuda.max_memory_allocated(device_idx)
                max_reserved = torch.cuda.max_memory_reserved(device_idx)
                usage.update({
                    'gpu_max_allocated_gb': max_allocated / (1024**3),
                    'gpu_max_reserved_gb': max_reserved / (1024**3)
                })

            elif self.device.type == 'mps':
                # MPS shares unified memory with the CPU, so system memory
                # statistics are the best available proxy.
                vm = psutil.virtual_memory()
                usage.update({
                    'system_memory_gb': vm.total / (1024**3),
                    'system_available_gb': vm.available / (1024**3),
                    'system_used_gb': vm.used / (1024**3),
                    'system_utilization_percent': vm.percent
                })

        except Exception as e:
            logger.warning(f"Error getting GPU memory usage: {e}")

        # System-wide memory, regardless of device type.
        try:
            vm = psutil.virtual_memory()
            swap = psutil.swap_memory()

            usage.update({
                'system_total_gb': vm.total / (1024**3),
                'system_available_gb': vm.available / (1024**3),
                'system_used_gb': vm.used / (1024**3),
                'system_percent': vm.percent,
                'swap_total_gb': swap.total / (1024**3),
                'swap_used_gb': swap.used / (1024**3),
                'swap_percent': swap.percent
            })

        except Exception as e:
            logger.warning(f"Error getting system memory usage: {e}")

        # Memory owned by this process.
        try:
            process = psutil.Process()
            memory_info = process.memory_info()
            usage.update({
                'process_rss_gb': memory_info.rss / (1024**3),
                'process_vms_gb': memory_info.vms / (1024**3),
            })

        except Exception as e:
            logger.warning(f"Error getting process memory usage: {e}")

        # Track peak usage (GPU allocation if available, else system usage).
        current_usage = usage.get('gpu_allocated_gb', usage.get('system_used_gb', 0))
        if current_usage > self.stats['peak_memory_usage']:
            self.stats['peak_memory_usage'] = current_usage

        return usage
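
    # Example (sketch): reading the most useful fields from the returned dict,
    # assuming `manager` is a MemoryManager instance.
    #
    #   usage = manager.get_memory_usage()
    #   if usage['device_type'] == 'cuda':
    #       print(f"GPU: {usage['gpu_allocated_gb']:.2f}GB allocated, "
    #             f"{usage['gpu_free_gb']:.2f}GB free")
    #   print(f"Process RSS: {usage.get('process_rss_gb', 0):.2f}GB")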

    def cleanup_basic(self):
        """Basic memory cleanup - lightweight operation"""
        try:
            gc.collect()

            if self.device.type == 'cuda':
                torch.cuda.empty_cache()

            self.stats['cleanup_count'] += 1
            logger.debug("Basic memory cleanup completed")

        except Exception as e:
            logger.warning(f"Basic memory cleanup failed: {e}")

    def cleanup_aggressive(self):
        """Aggressive memory cleanup - more thorough but slower"""
        try:
            start_time = time.time()

            # Let registered components release their own resources first.
            for callback in self.cleanup_callbacks:
                try:
                    callback()
                except Exception as e:
                    logger.warning(f"Cleanup callback failed: {e}")

            # Multiple GC passes to collect objects freed by earlier passes.
            for _ in range(3):
                gc.collect()

            if self.device.type == 'cuda':
                # Return cached blocks to the driver and wait for pending work.
                torch.cuda.empty_cache()
                torch.cuda.synchronize()

                # Reset peak statistics so subsequent measurements are fresh.
                device_idx = self.device.index or 0
                torch.cuda.reset_peak_memory_stats(device_idx)

            elif self.device.type == 'mps':
                # Newer PyTorch versions expose an MPS cache release; guard the
                # call so older versions remain supported.
                if hasattr(torch, 'mps') and hasattr(torch.mps, 'empty_cache'):
                    torch.mps.empty_cache()

            cleanup_time = time.time() - start_time
            self.stats['cleanup_count'] += 1

            logger.debug(f"Aggressive memory cleanup completed in {cleanup_time:.2f}s")

        except Exception as e:
            logger.error(f"Aggressive memory cleanup failed: {e}")
            raise MemoryError("aggressive_cleanup", str(e))

    def check_memory_pressure(self, threshold_percent: float = 85.0) -> Dict[str, Any]:
        """Check if system is under memory pressure"""
        usage = self.get_memory_usage()

        pressure_info = {
            'under_pressure': False,
            'pressure_level': 'normal',
            'recommendations': [],
            'usage_percent': 0.0
        }

        # GPU pressure is judged on allocator utilization; everything else
        # falls back to system memory utilization.
        if self.device.type == 'cuda':
            usage_percent = usage.get('gpu_utilization_percent', 0)
            pressure_info['usage_percent'] = usage_percent

            if usage_percent >= threshold_percent:
                pressure_info['under_pressure'] = True

                if usage_percent >= 95:
                    pressure_info['pressure_level'] = 'critical'
                    pressure_info['recommendations'].extend([
                        'Reduce batch size immediately',
                        'Enable gradient checkpointing',
                        'Consider switching to CPU processing'
                    ])
                else:
                    pressure_info['pressure_level'] = 'warning'
                    pressure_info['recommendations'].extend([
                        'Run aggressive memory cleanup',
                        'Reduce keyframe interval',
                        'Monitor memory usage closely'
                    ])

        else:
            usage_percent = usage.get('system_percent', 0)
            pressure_info['usage_percent'] = usage_percent

            if usage_percent >= threshold_percent:
                pressure_info['under_pressure'] = True

                if usage_percent >= 95:
                    pressure_info['pressure_level'] = 'critical'
                    pressure_info['recommendations'].extend([
                        'Free system memory immediately',
                        'Close unnecessary applications',
                        'Reduce video processing quality'
                    ])
                else:
                    pressure_info['pressure_level'] = 'warning'
                    pressure_info['recommendations'].extend([
                        'Run memory cleanup',
                        'Monitor system memory',
                        'Consider processing in smaller chunks'
                    ])

        return pressure_info

    def auto_cleanup_if_needed(self, pressure_threshold: float = 80.0) -> bool:
        """Automatically run cleanup if memory pressure is detected"""
        pressure = self.check_memory_pressure(pressure_threshold)

        if pressure['under_pressure']:
            # Escalate to aggressive cleanup only when pressure is critical.
            cleanup_method = (
                self.cleanup_aggressive
                if pressure['pressure_level'] == 'critical'
                else self.cleanup_basic
            )

            logger.info(f"Auto-cleanup triggered due to {pressure['pressure_level']} "
                        f"memory pressure ({pressure['usage_percent']:.1f}%)")

            cleanup_method()
            return True

        return False
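
    # Example (sketch): calling auto_cleanup_if_needed() between work items;
    # `frames` and `process_frame` are hypothetical stand-ins for the caller's
    # processing loop.
    #
    #   for frame in frames:
    #       process_frame(frame)
    #       manager.auto_cleanup_if_needed(pressure_threshold=80.0)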

    def register_cleanup_callback(self, callback: Callable):
        """Register a callback to run during cleanup operations"""
        self.cleanup_callbacks.append(callback)
        logger.debug("Cleanup callback registered")
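
    # Example (sketch): registering a callback that drops a hypothetical frame
    # cache so cleanup_aggressive() can actually reclaim its memory.
    #
    #   frame_cache = {}
    #   manager.register_cleanup_callback(frame_cache.clear)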

    def start_monitoring(self, interval_seconds: float = 30.0,
                         pressure_callback: Optional[Callable] = None):
        """Start background memory monitoring"""
        if self.monitoring_active:
            logger.warning("Memory monitoring already active")
            return

        self.monitoring_active = True
        self._stop_event.clear()

        def monitor_loop():
            while self.monitoring_active:
                try:
                    pressure = self.check_memory_pressure()

                    if pressure['under_pressure']:
                        logger.warning(f"Memory pressure detected: {pressure['pressure_level']} "
                                       f"({pressure['usage_percent']:.1f}%)")

                        if pressure_callback:
                            try:
                                pressure_callback(pressure)
                            except Exception as e:
                                logger.error(f"Pressure callback failed: {e}")

                        # Critical pressure triggers cleanup without waiting
                        # for the caller to react.
                        if pressure['pressure_level'] == 'critical':
                            self.cleanup_aggressive()

                    # Event-based wait so stop_monitoring() can interrupt the
                    # sleep instead of blocking for a full interval.
                    self._stop_event.wait(interval_seconds)

                except Exception as e:
                    logger.error(f"Memory monitoring error: {e}")
                    self._stop_event.wait(interval_seconds)

        self.monitoring_thread = threading.Thread(target=monitor_loop, daemon=True)
        self.monitoring_thread.start()

        logger.info(f"Memory monitoring started (interval: {interval_seconds}s)")

    def stop_monitoring(self):
        """Stop background memory monitoring"""
        if self.monitoring_active:
            self.monitoring_active = False
            self._stop_event.set()
            if self.monitoring_thread and self.monitoring_thread.is_alive():
                self.monitoring_thread.join(timeout=5.0)
            logger.info("Memory monitoring stopped")
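
    # Example (sketch): monitoring with a pressure callback; pause_ingest() is
    # a hypothetical hook in the surrounding application.
    #
    #   def on_pressure(pressure):
    #       if pressure['pressure_level'] == 'critical':
    #           pause_ingest()
    #
    #   manager.start_monitoring(interval_seconds=10.0, pressure_callback=on_pressure)
    #   ...  # do work
    #   manager.stop_monitoring()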

    def estimate_memory_requirement(self, video_width: int, video_height: int,
                                    frames_in_memory: int = 5) -> Dict[str, float]:
        """Estimate memory requirements for video processing"""

        # Assume 8-bit RGB: 3 bytes per pixel.
        bytes_per_frame = video_width * video_height * 3

        # Rough multiplier for intermediate tensors and framework overhead.
        overhead_multiplier = 3.0

        estimated_memory = {
            'frames_memory_gb': (bytes_per_frame * frames_in_memory * overhead_multiplier) / (1024**3),
            'model_memory_gb': 4.0,
            'system_overhead_gb': 2.0,
            'total_estimated_gb': 0.0
        }

        estimated_memory['total_estimated_gb'] = sum([
            estimated_memory['frames_memory_gb'],
            estimated_memory['model_memory_gb'],
            estimated_memory['system_overhead_gb']
        ])

        return estimated_memory
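
    # Worked example with the defaults for a 1920x1080 video:
    #   bytes_per_frame = 1920 * 1080 * 3 = 6,220,800 bytes
    #   frames_memory   = 6,220,800 * 5 * 3.0 / 1024**3 ≈ 0.09 GB
    #   total_estimated ≈ 0.09 + 4.0 (model) + 2.0 (system) ≈ 6.1 GB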

    def can_process_video(self, video_width: int, video_height: int,
                          frames_in_memory: int = 5) -> Dict[str, Any]:
        """Check if video can be processed with current memory"""

        estimate = self.estimate_memory_requirement(video_width, video_height, frames_in_memory)
        current_usage = self.get_memory_usage()

        # Compare against free GPU memory on CUDA, otherwise available RAM.
        if self.device.type == 'cuda':
            available_memory = current_usage.get('gpu_free_gb', 0)
        else:
            available_memory = current_usage.get('system_available_gb', 0)

        can_process = estimate['total_estimated_gb'] <= available_memory

        result = {
            'can_process': can_process,
            'estimated_memory_gb': estimate['total_estimated_gb'],
            'available_memory_gb': available_memory,
            'memory_margin_gb': available_memory - estimate['total_estimated_gb'],
            'recommendations': []
        }

        if not can_process:
            deficit = estimate['total_estimated_gb'] - available_memory
            result['recommendations'] = [
                f"Free {deficit:.1f}GB of memory",
                "Reduce video resolution",
                "Process in smaller chunks",
                "Use lower quality settings"
            ]
        elif result['memory_margin_gb'] < 1.0:
            result['recommendations'] = [
                "Memory margin is low",
                "Monitor memory usage during processing",
                "Consider reducing batch size"
            ]

        return result
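
    # Example (sketch): a pre-flight gate before processing starts, retrying
    # once after an aggressive cleanup.
    #
    #   check = manager.can_process_video(3840, 2160)
    #   if not check['can_process']:
    #       manager.cleanup_aggressive()
    #       check = manager.can_process_video(3840, 2160)
    #   if not check['can_process']:
    #       raise MemoryError("preflight", "; ".join(check['recommendations']))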

    def get_optimization_suggestions(self) -> Dict[str, Any]:
        """Get memory optimization suggestions based on current state"""
        usage = self.get_memory_usage()

        suggestions = {
            'current_usage_percent': usage.get('gpu_utilization_percent', usage.get('system_percent', 0)),
            'suggestions': [],
            'priority': 'low'
        }

        usage_percent = suggestions['current_usage_percent']

        if usage_percent >= 90:
            suggestions['priority'] = 'high'
            suggestions['suggestions'].extend([
                'Run aggressive memory cleanup immediately',
                'Reduce batch size to 1',
                'Enable gradient checkpointing if available',
                'Consider switching to CPU processing'
            ])
        elif usage_percent >= 75:
            suggestions['priority'] = 'medium'
            suggestions['suggestions'].extend([
                'Run memory cleanup regularly',
                'Monitor memory usage closely',
                'Reduce keyframe interval',
                'Use mixed precision if supported'
            ])
        elif usage_percent >= 50:
            suggestions['priority'] = 'low'
            suggestions['suggestions'].extend([
                'Current usage is acceptable',
                'Regular cleanup should be sufficient',
                'Monitor for memory leaks during long operations'
            ])
        else:
            suggestions['suggestions'] = [
                'Memory usage is optimal',
                'No immediate action required'
            ]

        return suggestions

    def get_stats(self) -> Dict[str, Any]:
        """Get memory management statistics"""
        return {
            'cleanup_count': self.stats['cleanup_count'],
            'peak_memory_usage_gb': self.stats['peak_memory_usage'],
            'monitoring_active': self.monitoring_active,
            'device_type': self.device.type,
            'memory_limit_gb': self.memory_limit_gb,
            'registered_callbacks': len(self.cleanup_callbacks)
        }

    def __del__(self):
        """Cleanup when MemoryManager is destroyed"""
        # Best-effort only: during interpreter shutdown these calls can fail,
        # so all exceptions are swallowed.
        try:
            self.stop_monitoring()
            self.cleanup_aggressive()
        except Exception:
            pass
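

# Minimal usage sketch. Assumes this runs inside the project (the `exceptions`
# module must be importable); the device selection below is an example, not a
# requirement.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    manager = MemoryManager(device)

    # Pre-flight check for a 1080p workload.
    check = manager.can_process_video(1920, 1080, frames_in_memory=5)
    print(f"Can process: {check['can_process']} "
          f"(need {check['estimated_memory_gb']:.1f}GB, "
          f"have {check['available_memory_gb']:.1f}GB)")

    # Current suggestions, then a basic cleanup and the final stats.
    print(manager.get_optimization_suggestions())
    manager.cleanup_basic()
    print(manager.get_stats())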