| | """ |
| | Device Management Module |
| | Handles hardware detection, optimization, and device switching |
| | """ |
| |
|
| | import torch |
| | import logging |
| | import platform |
| | import subprocess |
| | from typing import Optional, Dict, Any, List |
| | from exceptions import DeviceError |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | class DeviceManager: |
| | """ |
| | Manages device detection, validation, and optimization for video processing |
| | """ |
| | |
| | def __init__(self): |
| | self._optimal_device = None |
| | self._device_info = {} |
| | self._cuda_tested = False |
| | self._mps_tested = False |
| | self._initialize_device_info() |
| | |
| | def _initialize_device_info(self): |
| | """Initialize comprehensive device information""" |
| | self._device_info = { |
| | 'platform': platform.system(), |
| | 'python_version': platform.python_version(), |
| | 'pytorch_version': torch.__version__, |
| | 'cuda_available': torch.cuda.is_available(), |
| | 'cuda_version': torch.version.cuda if torch.cuda.is_available() else None, |
| | 'mps_available': self._check_mps_availability(), |
| | 'cpu_count': torch.get_num_threads(), |
| | } |
| | |
| | if self._device_info['cuda_available']: |
| | self._device_info.update(self._get_cuda_info()) |
| | |
| | if self._device_info['mps_available']: |
| | self._device_info.update(self._get_mps_info()) |
| | |
| | logger.debug(f"Device info initialized: {self._device_info}") |
| | |
| | def _check_mps_availability(self) -> bool: |
| | """Check if Metal Performance Shaders (MPS) is available on macOS""" |
| | try: |
| | if platform.system() == 'Darwin': |
| | return hasattr(torch.backends, 'mps') and torch.backends.mps.is_available() |
| | except Exception: |
| | pass |
| | return False |
| | |
| | def _get_cuda_info(self) -> Dict[str, Any]: |
| | """Get detailed CUDA information""" |
| | cuda_info = {} |
| | try: |
| | if torch.cuda.is_available(): |
| | cuda_info.update({ |
| | 'cuda_device_count': torch.cuda.device_count(), |
| | 'cuda_current_device': torch.cuda.current_device(), |
| | 'cuda_devices': [] |
| | }) |
| | |
| | for i in range(torch.cuda.device_count()): |
| | device_props = torch.cuda.get_device_properties(i) |
| | device_info = { |
| | 'index': i, |
| | 'name': device_props.name, |
| | 'memory_total_gb': device_props.total_memory / (1024**3), |
| | 'memory_total_mb': device_props.total_memory / (1024**2), |
| | 'multiprocessor_count': device_props.multiprocessor_count, |
| | 'compute_capability': f"{device_props.major}.{device_props.minor}" |
| | } |
| | |
| | |
| | try: |
| | memory_allocated = torch.cuda.memory_allocated(i) / (1024**3) |
| | memory_reserved = torch.cuda.memory_reserved(i) / (1024**3) |
| | device_info.update({ |
| | 'memory_allocated_gb': memory_allocated, |
| | 'memory_reserved_gb': memory_reserved, |
| | 'memory_free_gb': device_info['memory_total_gb'] - memory_reserved |
| | }) |
| | except Exception as e: |
| | logger.warning(f"Could not get memory info for CUDA device {i}: {e}") |
| | |
| | cuda_info['cuda_devices'].append(device_info) |
| | |
| | except Exception as e: |
| | logger.error(f"Error getting CUDA info: {e}") |
| | |
| | return cuda_info |
| | |
| | def _get_mps_info(self) -> Dict[str, Any]: |
| | """Get Metal Performance Shaders information""" |
| | mps_info = {} |
| | try: |
| | if self._device_info['mps_available']: |
| | |
| | try: |
| | result = subprocess.run(['sysctl', 'hw.memsize'], |
| | capture_output=True, text=True, timeout=5) |
| | if result.returncode == 0: |
| | memory_bytes = int(result.stdout.split(':')[1].strip()) |
| | mps_info['mps_system_memory_gb'] = memory_bytes / (1024**3) |
| | except Exception as e: |
| | logger.warning(f"Could not get system memory info: {e}") |
| | |
| | mps_info['mps_device'] = 'Apple Silicon GPU' |
| | |
| | except Exception as e: |
| | logger.error(f"Error getting MPS info: {e}") |
| | |
| | return mps_info |
| | |
| | def get_optimal_device(self) -> torch.device: |
| | """ |
| | Get the optimal device for video processing with comprehensive testing |
| | """ |
| | if self._optimal_device is not None: |
| | return self._optimal_device |
| | |
| | logger.info("Determining optimal device for video processing...") |
| | |
| | |
| | if self._device_info['cuda_available'] and not self._cuda_tested: |
| | cuda_device = self._test_cuda_device() |
| | if cuda_device is not None: |
| | self._optimal_device = cuda_device |
| | logger.info(f"Selected CUDA device: {self._get_device_name(cuda_device)}") |
| | return self._optimal_device |
| | |
| | |
| | if self._device_info['mps_available'] and not self._mps_tested: |
| | mps_device = self._test_mps_device() |
| | if mps_device is not None: |
| | self._optimal_device = mps_device |
| | logger.info(f"Selected MPS device: {self._get_device_name(mps_device)}") |
| | return self._optimal_device |
| | |
| | |
| | self._optimal_device = torch.device("cpu") |
| | logger.info("Using CPU device (no suitable GPU found or GPU tests failed)") |
| | return self._optimal_device |
| | |
| | def _test_cuda_device(self) -> Optional[torch.device]: |
| | """Test CUDA device functionality""" |
| | self._cuda_tested = True |
| | |
| | try: |
| | |
| | best_device_idx = 0 |
| | best_memory = 0 |
| | |
| | for device_info in self._device_info.get('cuda_devices', []): |
| | if device_info['memory_free_gb'] > best_memory: |
| | best_memory = device_info['memory_free_gb'] |
| | best_device_idx = device_info['index'] |
| | |
| | device = torch.device(f"cuda:{best_device_idx}") |
| | |
| | |
| | test_tensor = torch.tensor([1.0], device=device) |
| | result = test_tensor * 2 |
| | |
| | |
| | large_tensor = torch.randn(1000, 1000, device=device) |
| | del large_tensor, test_tensor, result |
| | torch.cuda.empty_cache() |
| | torch.cuda.synchronize() |
| | |
| | logger.info(f"CUDA device {best_device_idx} passed functionality tests") |
| | return device |
| | |
| | except Exception as e: |
| | logger.warning(f"CUDA device test failed: {e}") |
| | return None |
| | |
| | def _test_mps_device(self) -> Optional[torch.device]: |
| | """Test MPS device functionality""" |
| | self._mps_tested = True |
| | |
| | try: |
| | device = torch.device("mps") |
| | |
| | |
| | test_tensor = torch.tensor([1.0], device=device) |
| | result = test_tensor * 2 |
| | |
| | |
| | large_tensor = torch.randn(1000, 1000, device=device) |
| | del large_tensor, test_tensor, result |
| | |
| | |
| | logger.info("MPS device passed functionality tests") |
| | return device |
| | |
| | except Exception as e: |
| | logger.warning(f"MPS device test failed: {e}") |
| | return None |
| | |
| | def _get_device_name(self, device: torch.device) -> str: |
| | """Get human-readable device name""" |
| | if device.type == 'cuda': |
| | if self._device_info.get('cuda_devices'): |
| | device_idx = device.index or 0 |
| | for cuda_device in self._device_info['cuda_devices']: |
| | if cuda_device['index'] == device_idx: |
| | return cuda_device['name'] |
| | return f"CUDA Device {device.index or 0}" |
| | elif device.type == 'mps': |
| | return "Apple Silicon GPU (MPS)" |
| | else: |
| | return "CPU" |
| | |
| | def get_device_capabilities(self, device: Optional[torch.device] = None) -> Dict[str, Any]: |
| | """Get capabilities of the specified device""" |
| | if device is None: |
| | device = self.get_optimal_device() |
| | |
| | capabilities = { |
| | 'device_type': device.type, |
| | 'device_name': self._get_device_name(device), |
| | 'supports_mixed_precision': False, |
| | 'recommended_batch_size': 1, |
| | 'memory_efficiency': 'medium' |
| | } |
| | |
| | if device.type == 'cuda': |
| | device_idx = device.index or 0 |
| | for cuda_device in self._device_info.get('cuda_devices', []): |
| | if cuda_device['index'] == device_idx: |
| | |
| | compute_version = float(cuda_device.get('compute_capability', '0.0')) |
| | capabilities['supports_mixed_precision'] = compute_version >= 7.0 |
| | |
| | |
| | memory_gb = cuda_device.get('memory_free_gb', 0) |
| | if memory_gb >= 24: |
| | capabilities['recommended_batch_size'] = 4 |
| | capabilities['memory_efficiency'] = 'high' |
| | elif memory_gb >= 12: |
| | capabilities['recommended_batch_size'] = 2 |
| | capabilities['memory_efficiency'] = 'high' |
| | elif memory_gb >= 6: |
| | capabilities['recommended_batch_size'] = 1 |
| | capabilities['memory_efficiency'] = 'medium' |
| | else: |
| | capabilities['memory_efficiency'] = 'low' |
| | |
| | capabilities['memory_available_gb'] = memory_gb |
| | break |
| | |
| | elif device.type == 'mps': |
| | capabilities['supports_mixed_precision'] = True |
| | capabilities['memory_efficiency'] = 'high' |
| | system_memory = self._device_info.get('mps_system_memory_gb', 8) |
| | if system_memory >= 16: |
| | capabilities['recommended_batch_size'] = 2 |
| | capabilities['memory_available_gb'] = system_memory * 0.7 |
| | |
| | else: |
| | capabilities['memory_efficiency'] = 'low' |
| | capabilities['supports_mixed_precision'] = False |
| | |
| | return capabilities |
| | |
| | def switch_device(self, device_type: str) -> torch.device: |
| | """ |
| | Switch to a specific device type |
| | |
| | Args: |
| | device_type: 'cuda', 'mps', or 'cpu' |
| | """ |
| | try: |
| | if device_type.lower() == 'cuda': |
| | if not self._device_info['cuda_available']: |
| | raise DeviceError('cuda', 'CUDA not available on this system') |
| | |
| | device = self._test_cuda_device() |
| | if device is None: |
| | raise DeviceError('cuda', 'CUDA device failed functionality tests') |
| | |
| | elif device_type.lower() == 'mps': |
| | if not self._device_info['mps_available']: |
| | raise DeviceError('mps', 'MPS not available on this system') |
| | |
| | device = self._test_mps_device() |
| | if device is None: |
| | raise DeviceError('mps', 'MPS device failed functionality tests') |
| | |
| | elif device_type.lower() == 'cpu': |
| | device = torch.device('cpu') |
| | |
| | else: |
| | raise DeviceError('unknown', f'Unknown device type: {device_type}') |
| | |
| | self._optimal_device = device |
| | logger.info(f"Switched to device: {self._get_device_name(device)}") |
| | return device |
| | |
| | except DeviceError: |
| | raise |
| | except Exception as e: |
| | raise DeviceError(device_type, f"Failed to switch to {device_type}: {str(e)}") |
| | |
| | def get_available_devices(self) -> List[str]: |
| | """Get list of available device types""" |
| | devices = ['cpu'] |
| | |
| | if self._device_info['cuda_available']: |
| | devices.append('cuda') |
| | |
| | if self._device_info['mps_available']: |
| | devices.append('mps') |
| | |
| | return devices |
| | |
| | def get_device_status(self) -> Dict[str, Any]: |
| | """Get comprehensive device status""" |
| | current_device = self.get_optimal_device() |
| | |
| | status = { |
| | 'current_device': str(current_device), |
| | 'current_device_name': self._get_device_name(current_device), |
| | 'available_devices': self.get_available_devices(), |
| | 'device_info': self._device_info.copy(), |
| | 'capabilities': self.get_device_capabilities(current_device) |
| | } |
| | |
| | |
| | if current_device.type == 'cuda': |
| | try: |
| | device_idx = current_device.index or 0 |
| | status['current_memory_usage'] = { |
| | 'allocated_gb': torch.cuda.memory_allocated(device_idx) / (1024**3), |
| | 'reserved_gb': torch.cuda.memory_reserved(device_idx) / (1024**3), |
| | 'max_allocated_gb': torch.cuda.max_memory_allocated(device_idx) / (1024**3), |
| | 'max_reserved_gb': torch.cuda.max_memory_reserved(device_idx) / (1024**3) |
| | } |
| | except Exception as e: |
| | logger.warning(f"Could not get current memory usage: {e}") |
| | |
| | return status |
| | |
| | def optimize_for_processing(self) -> Dict[str, Any]: |
| | """Optimize device settings for video processing""" |
| | device = self.get_optimal_device() |
| | optimizations = { |
| | 'device': str(device), |
| | 'optimizations_applied': [] |
| | } |
| | |
| | try: |
| | if device.type == 'cuda': |
| | |
| | torch.backends.cudnn.benchmark = True |
| | optimizations['optimizations_applied'].append('cudnn_benchmark') |
| | |
| | |
| | |
| | |
| | |
| | |
| | optimizations['optimizations_applied'].append('cuda_memory_strategy') |
| | |
| | elif device.type == 'mps': |
| | |
| | optimizations['optimizations_applied'].append('mps_optimized') |
| | |
| | else: |
| | |
| | torch.set_num_threads(min(torch.get_num_threads(), 8)) |
| | optimizations['optimizations_applied'].append('cpu_thread_optimization') |
| | |
| | logger.info(f"Applied optimizations for {device}: {optimizations['optimizations_applied']}") |
| | |
| | except Exception as e: |
| | logger.warning(f"Some optimizations failed: {e}") |
| | optimizations['optimization_errors'] = str(e) |
| | |
| | return optimizations |
| | |
| | def cleanup_device_memory(self): |
| | """Clean up device memory""" |
| | device = self.get_optimal_device() |
| | |
| | if device.type == 'cuda': |
| | try: |
| | torch.cuda.empty_cache() |
| | torch.cuda.synchronize() |
| | logger.debug("CUDA memory cache cleared") |
| | except Exception as e: |
| | logger.warning(f"CUDA memory cleanup failed: {e}") |
| | |
| | elif device.type == 'mps': |
| | try: |
| | |
| | |
| | import gc |
| | gc.collect() |
| | logger.debug("MPS memory cleanup completed") |
| | except Exception as e: |
| | logger.warning(f"MPS memory cleanup failed: {e}") |
| | |
| | else: |
| | try: |
| | import gc |
| | gc.collect() |
| | logger.debug("CPU memory cleanup completed") |
| | except Exception as e: |
| | logger.warning(f"CPU memory cleanup failed: {e}") |