Spaces:
No application file
No application file
| import torch | |
| import gc | |
| import logging | |
| from typing import Dict, Any, Optional | |
| import psutil | |
| import os | |
| class GPUOptimizer: | |
| """Serviço para otimizações de GPU e gerenciamento de memória""" | |
| def __init__(self): | |
| self.logger = logging.getLogger(__name__) | |
| self.device = self._get_optimal_device() | |
| self.memory_stats = {} | |
| def _get_optimal_device(self) -> str: | |
| """Determina o melhor dispositivo disponível""" | |
| if torch.cuda.is_available(): | |
| # Seleciona a GPU com mais memória livre | |
| gpu_count = torch.cuda.device_count() | |
| if gpu_count > 0: | |
| best_gpu = 0 | |
| max_free_memory = 0 | |
| for i in range(gpu_count): | |
| torch.cuda.set_device(i) | |
| free_memory = torch.cuda.get_device_properties(i).total_memory - torch.cuda.memory_allocated(i) | |
| if free_memory > max_free_memory: | |
| max_free_memory = free_memory | |
| best_gpu = i | |
| return f"cuda:{best_gpu}" | |
| self.logger.warning("CUDA não disponível, usando CPU") | |
| return "cpu" | |
| def setup_memory_efficient_training(self, model, config: Dict[str, Any]): | |
| """Configura o modelo para treinamento eficiente em memória""" | |
| optimizations_applied = [] | |
| # 1. Gradient Checkpointing | |
| if config.get('use_gradient_checkpointing', True): | |
| if hasattr(model, 'gradient_checkpointing_enable'): | |
| model.gradient_checkpointing_enable() | |
| optimizations_applied.append("gradient_checkpointing") | |
| self.logger.info("Gradient checkpointing habilitado") | |
| # 2. Mixed Precision | |
| mixed_precision = config.get('mixed_precision', 'fp16') | |
| if mixed_precision in ['fp16', 'bf16']: | |
| optimizations_applied.append(f"mixed_precision_{mixed_precision}") | |
| self.logger.info(f"Mixed precision configurado: {mixed_precision}") | |
| # 3. Model Quantization (se suportado) | |
| if config.get('use_8bit_quantization', False): | |
| optimizations_applied.append("8bit_quantization") | |
| self.logger.info("Quantização 8-bit habilitada") | |
| return optimizations_applied | |
| def get_8bit_optimizer(self, optimizer_class, model_parameters, **kwargs): | |
| """Retorna um otimizador 8-bit para economia de memória""" | |
| try: | |
| import bitsandbytes as bnb | |
| # Mapeia otimizadores padrão para versões 8-bit | |
| optimizer_mapping = { | |
| 'AdamW': bnb.optim.AdamW8bit, | |
| 'Adam': bnb.optim.Adam8bit, | |
| 'SGD': bnb.optim.SGD8bit, | |
| } | |
| optimizer_name = optimizer_class.__name__ | |
| if optimizer_name in optimizer_mapping: | |
| optimizer_8bit = optimizer_mapping[optimizer_name] | |
| self.logger.info(f"Usando otimizador 8-bit: {optimizer_name}") | |
| return optimizer_8bit(model_parameters, **kwargs) | |
| else: | |
| self.logger.warning(f"Otimizador 8-bit não disponível para {optimizer_name}, usando versão padrão") | |
| return optimizer_class(model_parameters, **kwargs) | |
| except ImportError: | |
| self.logger.warning("bitsandbytes não disponível, usando otimizador padrão") | |
| return optimizer_class(model_parameters, **kwargs) | |
| def optimize_batch_size(self, base_batch_size: int, model_size_mb: float) -> int: | |
| """Otimiza o tamanho do batch baseado na memória disponível""" | |
| if self.device == "cpu": | |
| # Para CPU, limita baseado na RAM disponível | |
| available_ram_gb = psutil.virtual_memory().available / (1024**3) | |
| # Heurística: 1GB de RAM por item do batch para modelos grandes | |
| max_batch_size = max(1, int(available_ram_gb / 2)) | |
| return min(base_batch_size, max_batch_size) | |
| # Para GPU | |
| if torch.cuda.is_available(): | |
| device_idx = int(self.device.split(':')[1]) if ':' in self.device else 0 | |
| total_memory = torch.cuda.get_device_properties(device_idx).total_memory | |
| allocated_memory = torch.cuda.memory_allocated(device_idx) | |
| free_memory = total_memory - allocated_memory | |
| # Heurística: reserva 20% da memória livre para overhead | |
| usable_memory = free_memory * 0.8 | |
| # Estima quantos itens do batch cabem na memória | |
| # Assume que cada item do batch usa aproximadamente model_size_mb * 3 (forward + backward + gradients) | |
| memory_per_item = model_size_mb * 3 * 1024 * 1024 # Converte para bytes | |
| max_batch_size = max(1, int(usable_memory / memory_per_item)) | |
| optimized_batch_size = min(base_batch_size, max_batch_size) | |
| self.logger.info(f"Batch size otimizado: {base_batch_size} -> {optimized_batch_size}") | |
| return optimized_batch_size | |
| return base_batch_size | |
| def clear_memory_cache(self): | |
| """Limpa cache de memória GPU e força garbage collection""" | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| torch.cuda.synchronize() | |
| # Force garbage collection | |
| gc.collect() | |
| self.logger.info("Cache de memória limpo") | |
| def get_memory_usage(self) -> Dict[str, Any]: | |
| """Retorna estatísticas de uso de memória""" | |
| stats = { | |
| 'device': self.device, | |
| 'cpu_memory': { | |
| 'total_gb': psutil.virtual_memory().total / (1024**3), | |
| 'available_gb': psutil.virtual_memory().available / (1024**3), | |
| 'used_percent': psutil.virtual_memory().percent | |
| } | |
| } | |
| if torch.cuda.is_available() and self.device.startswith('cuda'): | |
| device_idx = int(self.device.split(':')[1]) if ':' in self.device else 0 | |
| stats['gpu_memory'] = { | |
| 'device_name': torch.cuda.get_device_name(device_idx), | |
| 'total_mb': torch.cuda.get_device_properties(device_idx).total_memory / (1024**2), | |
| 'allocated_mb': torch.cuda.memory_allocated(device_idx) / (1024**2), | |
| 'reserved_mb': torch.cuda.memory_reserved(device_idx) / (1024**2), | |
| 'free_mb': (torch.cuda.get_device_properties(device_idx).total_memory - | |
| torch.cuda.memory_reserved(device_idx)) / (1024**2) | |
| } | |
| else: | |
| stats['gpu_memory'] = { | |
| 'device_name': 'CPU', | |
| 'total_mb': 0, | |
| 'allocated_mb': 0, | |
| 'reserved_mb': 0, | |
| 'free_mb': 0 | |
| } | |
| return stats | |
| def estimate_training_memory(self, model_params: int, batch_size: int, sequence_length: int = 512) -> Dict[str, float]: | |
| """Estima o uso de memória para treinamento""" | |
| # Estimativas baseadas em heurísticas comuns | |
| # Memória do modelo (parâmetros + gradientes + estados do otimizador) | |
| param_memory_mb = model_params * 4 / (1024**2) # 4 bytes por parâmetro (float32) | |
| gradient_memory_mb = param_memory_mb # Gradientes têm o mesmo tamanho dos parâmetros | |
| optimizer_memory_mb = param_memory_mb * 2 # Adam mantém momentum e variance | |
| # Memória de ativações (depende do batch size e sequence length) | |
| activation_memory_mb = batch_size * sequence_length * 768 * 4 / (1024**2) # Estimativa para transformer | |
| total_memory_mb = param_memory_mb + gradient_memory_mb + optimizer_memory_mb + activation_memory_mb | |
| return { | |
| 'model_params_mb': param_memory_mb, | |
| 'gradients_mb': gradient_memory_mb, | |
| 'optimizer_states_mb': optimizer_memory_mb, | |
| 'activations_mb': activation_memory_mb, | |
| 'total_estimated_mb': total_memory_mb, | |
| 'total_estimated_gb': total_memory_mb / 1024 | |
| } | |
| def suggest_optimizations(self, current_config: Dict[str, Any]) -> Dict[str, Any]: | |
| """Sugere otimizações baseadas no hardware disponível""" | |
| suggestions = {} | |
| memory_stats = self.get_memory_usage() | |
| # Sugestões baseadas na memória GPU disponível | |
| if 'gpu_memory' in memory_stats: | |
| gpu_memory_gb = memory_stats['gpu_memory']['total_mb'] / 1024 | |
| if gpu_memory_gb < 4: # GPU com pouca memória | |
| suggestions.update({ | |
| 'use_8bit_optimizer': True, | |
| 'use_gradient_checkpointing': True, | |
| 'mixed_precision': 'fp16', | |
| 'batch_size': 1, | |
| 'suggested_rank': 4, # Rank baixo para LoRA | |
| 'reason': 'GPU com pouca memória detectada' | |
| }) | |
| elif gpu_memory_gb < 8: # GPU média | |
| suggestions.update({ | |
| 'use_8bit_optimizer': True, | |
| 'use_gradient_checkpointing': True, | |
| 'mixed_precision': 'fp16', | |
| 'batch_size': 2, | |
| 'suggested_rank': 8, | |
| 'reason': 'GPU com memória média detectada' | |
| }) | |
| else: # GPU com bastante memória | |
| suggestions.update({ | |
| 'use_8bit_optimizer': False, | |
| 'use_gradient_checkpointing': False, | |
| 'mixed_precision': 'fp16', | |
| 'batch_size': 4, | |
| 'suggested_rank': 16, | |
| 'reason': 'GPU com memória suficiente detectada' | |
| }) | |
| else: # CPU only | |
| suggestions.update({ | |
| 'use_8bit_optimizer': True, | |
| 'use_gradient_checkpointing': True, | |
| 'mixed_precision': 'fp32', # CPU não suporta fp16 eficientemente | |
| 'batch_size': 1, | |
| 'suggested_rank': 4, | |
| 'reason': 'Treinamento em CPU detectado' | |
| }) | |
| return suggestions | |
| def monitor_memory_during_training(self) -> Dict[str, Any]: | |
| """Monitora o uso de memória durante o treinamento""" | |
| current_stats = self.get_memory_usage() | |
| # Detecta vazamentos de memória ou uso excessivo | |
| warnings = [] | |
| if 'gpu_memory' in current_stats: | |
| gpu_usage_percent = (current_stats['gpu_memory']['allocated_mb'] / | |
| current_stats['gpu_memory']['total_mb']) * 100 | |
| if gpu_usage_percent > 90: | |
| warnings.append("Uso de GPU muito alto (>90%), considere reduzir batch size") | |
| elif gpu_usage_percent > 80: | |
| warnings.append("Uso de GPU alto (>80%), monitore para possível OOM") | |
| cpu_usage_percent = current_stats['cpu_memory']['used_percent'] | |
| if cpu_usage_percent > 90: | |
| warnings.append("Uso de RAM muito alto (>90%)") | |
| return { | |
| 'memory_stats': current_stats, | |
| 'warnings': warnings, | |
| 'timestamp': torch.cuda.Event(enable_timing=True) if torch.cuda.is_available() else None | |
| } | |