| """ | |
| Advanced CPU Optimizer for training on CPU-only systems | |
| Optimized for maximum performance on limited hardware | |
| """ | |
| import os | |
| import logging | |
| import threading | |
| from typing import Dict, Any, Optional, List | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import DataLoader | |
| import numpy as np | |
| from .memory_manager import AdvancedMemoryManager | |
| logger = logging.getLogger(__name__) | |


class CPUOptimizer:
    """
    Advanced CPU optimization for training and inference.
    """

    def __init__(self, memory_manager: AdvancedMemoryManager):
        """
        Initialize the CPU optimizer.

        Args:
            memory_manager: Memory manager instance
        """
        self.memory_manager = memory_manager
        self.cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
        self.optimizations_applied = []

        # Apply initial optimizations
        self._apply_global_optimizations()

        logger.info(f"CPU Optimizer initialized for {self.cpu_count} cores")

    def _apply_global_optimizations(self):
        """Apply process-wide CPU optimizations."""
        # Set an optimal intra-op thread count for PyTorch
        optimal_threads = min(self.cpu_count, 8)  # Cap at 8 for stability
        torch.set_num_threads(optimal_threads)
        self.optimizations_applied.append(f"PyTorch threads: {optimal_threads}")

        # Set the thread count for inter-op parallelism; this can only be set
        # once, before any parallel work has started, so tolerate a late call
        try:
            torch.set_num_interop_threads(max(1, min(self.cpu_count // 2, 4)))
            self.optimizations_applied.append("Inter-op parallelism configured")
        except RuntimeError as e:
            logger.warning(f"Could not configure inter-op parallelism: {e}")

        # Enable Intel Extension for PyTorch optimizations if installed
        try:
            import intel_extension_for_pytorch as ipex  # noqa: F401
            self.optimizations_applied.append("Intel Extension for PyTorch enabled")
        except ImportError:
            logger.warning("Intel Extension for PyTorch not available")

        # Set environment variables for common math libraries (these only
        # affect libraries that read them after this point)
        os.environ['OMP_NUM_THREADS'] = str(optimal_threads)
        os.environ['MKL_NUM_THREADS'] = str(optimal_threads)
        os.environ['NUMEXPR_NUM_THREADS'] = str(optimal_threads)
        os.environ['OPENBLAS_NUM_THREADS'] = str(optimal_threads)
        self.optimizations_applied.append("Environment variables optimized")

        # Enable CPU-specific backends; torch.backends.mkl only exposes
        # is_available(), so oneDNN (MKLDNN) is the one with a toggle
        torch.backends.mkldnn.enabled = True
        if torch.backends.mkl.is_available():
            self.optimizations_applied.append("MKL and MKLDNN enabled")

        logger.info(f"Applied optimizations: {', '.join(self.optimizations_applied)}")

    def optimize_model(self, model: nn.Module,
                       use_jit: bool = True,
                       use_channels_last: bool = True) -> nn.Module:
        """
        Optimize a model for CPU inference/training.

        Args:
            model: PyTorch model to optimize
            use_jit: Whether to use TorchScript JIT compilation
            use_channels_last: Whether to use channels-last memory format

        Returns:
            Optimized model
        """
        with self.memory_manager.memory_context("optimize_model"):
            logger.info("Optimizing model for CPU")

            # Move the model to CPU
            model = model.cpu()

            # Switch to evaluation mode for optimization
            was_training = model.training
            model.eval()

            try:
                # Apply Intel Extension optimizations if available
                try:
                    import intel_extension_for_pytorch as ipex
                    model = ipex.optimize(model, dtype=torch.float32)
                    logger.info("Applied Intel Extension optimizations")
                except ImportError:
                    pass

                # Apply channels-last memory format for convolutional models
                if use_channels_last and self._has_conv_layers(model):
                    model = model.to(memory_format=torch.channels_last)
                    logger.info("Applied channels-last memory format")

                # Apply TorchScript JIT compilation
                if use_jit:
                    try:
                        # Create a dummy input for tracing
                        dummy_input = self._create_dummy_input(model)
                        if dummy_input is not None:
                            model = torch.jit.trace(model, dummy_input)
                            logger.info("Applied TorchScript JIT compilation")
                    except Exception as e:
                        logger.warning(f"JIT compilation failed: {e}")

                # Restore training mode if needed
                if was_training:
                    model.train()

                return model

            except Exception as e:
                logger.error(f"Model optimization failed: {e}")
                return model
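
    # Usage sketch (`mm` is an AdvancedMemoryManager instance and `net` any
    # model; both names are illustrative):
    #
    #     opt = CPUOptimizer(mm)
    #     net = opt.optimize_model(net, use_jit=True, use_channels_last=True)
    #     with torch.no_grad():
    #         out = net(torch.randn(1, 3, 224, 224))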
| def _has_conv_layers(self, model: nn.Module) -> bool: | |
| """Check if model has convolutional layers""" | |
| for module in model.modules(): | |
| if isinstance(module, (nn.Conv1d, nn.Conv2d, nn.Conv3d)): | |
| return True | |
| return False | |
| def _create_dummy_input(self, model: nn.Module) -> Optional[torch.Tensor]: | |
| """Create dummy input for model tracing""" | |
| try: | |
| # Try to infer input shape from model | |
| for name, param in model.named_parameters(): | |
| if 'embedding' in name.lower() and param.dim() == 2: | |
| # Text model - create token input | |
| vocab_size = param.shape[0] | |
| return torch.randint(0, min(vocab_size, 1000), (1, 32)) | |
| elif 'conv' in name.lower() and param.dim() == 4: | |
| # Vision model - create image input | |
| channels = param.shape[1] | |
| return torch.randn(1, channels, 224, 224) | |
| # Default fallback | |
| return torch.randn(1, 512) | |
| except Exception: | |
| return None | |
| def optimize_dataloader(self, dataloader: DataLoader) -> DataLoader: | |
| """ | |
| Optimize DataLoader for CPU training | |
| Args: | |
| dataloader: Original DataLoader | |
| Returns: | |
| Optimized DataLoader | |
| """ | |
| # Calculate optimal number of workers | |
| optimal_workers = min(self.cpu_count // 2, 4) | |
| # Create new DataLoader with optimized settings | |
| optimized_loader = DataLoader( | |
| dataloader.dataset, | |
| batch_size=dataloader.batch_size, | |
| shuffle=dataloader.drop_last if hasattr(dataloader, 'drop_last') else False, | |
| num_workers=optimal_workers, | |
| pin_memory=False, # Not needed for CPU | |
| persistent_workers=True if optimal_workers > 0 else False, | |
| prefetch_factor=2 if optimal_workers > 0 else 2, | |
| ) | |
| logger.info(f"Optimized DataLoader with {optimal_workers} workers") | |
| return optimized_loader | |
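
    # Usage sketch: wrap an existing loader before training (`dataset` is an
    # illustrative map-style dataset):
    #
    #     loader = DataLoader(dataset, batch_size=16, shuffle=True)
    #     loader = opt.optimize_dataloader(loader)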

    def optimize_optimizer(self, optimizer: optim.Optimizer,
                           model: nn.Module) -> optim.Optimizer:
        """
        Optimize optimizer settings for CPU training.

        Args:
            optimizer: PyTorch optimizer
            model: Model being optimized

        Returns:
            Optimized optimizer
        """
        # Ensure every parameter group has a default weight decay
        for param_group in optimizer.param_groups:
            if 'weight_decay' not in param_group:
                param_group['weight_decay'] = 0.01

        logger.info("Applied optimizer optimizations")
        return optimizer
| def enable_mixed_precision(self) -> bool: | |
| """ | |
| Enable mixed precision training for CPU (if supported) | |
| Returns: | |
| Whether mixed precision was enabled | |
| """ | |
| try: | |
| # Check if CPU supports mixed precision | |
| if torch.cpu.amp.autocast is not None: | |
| logger.info("CPU mixed precision available") | |
| return True | |
| except AttributeError: | |
| pass | |
| logger.warning("CPU mixed precision not available") | |
| return False | |
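
    # Usage sketch: when available, wrap the forward pass in CPU autocast
    # (bfloat16 is the CPU autocast dtype; `model` and `batch` are
    # illustrative names):
    #
    #     if opt.enable_mixed_precision():
    #         with torch.amp.autocast("cpu", dtype=torch.bfloat16):
    #             output = model(batch)
    #     else:
    #         output = model(batch)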

    def optimize_batch_size(self, base_batch_size: int,
                            model_size_mb: float) -> int:
        """
        Calculate an optimal batch size based on available memory.

        Args:
            base_batch_size: Base batch size to start from
            model_size_mb: Model size in MB

        Returns:
            Optimized batch size
        """
        memory_info = self.memory_manager.get_memory_info()
        available_memory_mb = memory_info['system_memory_available_gb'] * 1024

        # Reserve memory for the model plus a fixed overhead
        usable_memory_mb = available_memory_mb - model_size_mb - 2000  # 2 GB overhead

        # Estimate memory per sample (rough approximation)
        memory_per_sample_mb = model_size_mb * 0.1  # 10% of model size per sample

        if memory_per_sample_mb > 0:
            max_batch_size = int(usable_memory_mb / memory_per_sample_mb)
            optimal_batch_size = min(base_batch_size, max_batch_size, 32)  # Cap at 32
        else:
            optimal_batch_size = min(base_batch_size, 8)  # Conservative fallback

        optimal_batch_size = max(1, optimal_batch_size)  # At least 1

        logger.info(f"Optimized batch size: {optimal_batch_size} (was {base_batch_size})")
        return optimal_batch_size
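
    # Worked example of the heuristic above: with 8 GB available, a 500 MB
    # model and the fixed 2 GB overhead, usable memory is
    # 8192 - 500 - 2000 = 5692 MB. At 50 MB per sample (10% of model size)
    # that allows ~113 samples, so the result is capped at 32 (or at
    # base_batch_size if that is smaller).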
| def get_performance_recommendations(self, model: nn.Module) -> List[str]: | |
| """ | |
| Get performance recommendations for the current setup | |
| Args: | |
| model: Model to analyze | |
| Returns: | |
| List of recommendations | |
| """ | |
| recommendations = [] | |
| # Check model size | |
| param_count = sum(p.numel() for p in model.parameters()) | |
| model_size_mb = param_count * 4 / (1024**2) # Assume float32 | |
| if model_size_mb > 2000: # > 2GB | |
| recommendations.append("Consider using model sharding for large models") | |
| recommendations.append("Use gradient checkpointing to reduce memory usage") | |
| # Check CPU utilization | |
| if self.cpu_count > 8: | |
| recommendations.append("Consider using distributed training across CPU cores") | |
| # Check memory | |
| memory_info = self.memory_manager.get_memory_info() | |
| if memory_info['system_memory_percent'] > 80: | |
| recommendations.append("Reduce batch size to lower memory usage") | |
| recommendations.append("Enable gradient accumulation instead of large batches") | |
| # Check for optimization opportunities | |
| if not any('Intel Extension' in opt for opt in self.optimizations_applied): | |
| recommendations.append("Install Intel Extension for PyTorch for better CPU performance") | |
| return recommendations | |

    def benchmark_performance(self, model: nn.Module,
                              input_shape: tuple,
                              num_iterations: int = 100) -> Dict[str, float]:
        """
        Benchmark model inference performance.

        Args:
            model: Model to benchmark
            input_shape: Input tensor shape
            num_iterations: Number of iterations to run

        Returns:
            Performance metrics
        """
        model.eval()
        dummy_input = torch.randn(*input_shape)

        # Warmup
        with torch.no_grad():
            for _ in range(10):
                _ = model(dummy_input)

        # Benchmark with a monotonic high-resolution clock
        start_time = time.perf_counter()
        with torch.no_grad():
            for _ in range(num_iterations):
                _ = model(dummy_input)
        end_time = time.perf_counter()

        total_time = end_time - start_time
        avg_time_per_inference = total_time / num_iterations
        throughput = 1.0 / avg_time_per_inference

        return {
            'total_time_seconds': total_time,
            'avg_time_per_inference_ms': avg_time_per_inference * 1000,
            'throughput_inferences_per_second': throughput,
            'iterations': num_iterations,
        }
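

# Minimal end-to-end sketch. It assumes AdvancedMemoryManager can be built
# without arguments and exposes the memory_context()/get_memory_info() API
# used above; adjust to the real constructor. Because this module uses a
# relative import, run it as part of its package, e.g.
# python -m <package>.cpu_optimizer.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    optimizer = CPUOptimizer(AdvancedMemoryManager())  # assumed no-arg constructor

    # Tiny illustrative model; _create_dummy_input falls back to (1, 512)
    net = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 10))
    net = optimizer.optimize_model(net, use_jit=True)

    for tip in optimizer.get_performance_recommendations(net):
        logger.info(f"Recommendation: {tip}")

    metrics = optimizer.benchmark_performance(net, (1, 512), num_iterations=50)
    logger.info(f"Benchmark: {metrics}")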