import torch from torch.optim import Optimizer class SnooC(Optimizer): """ Fixed SnooC Optimizer """ @torch.no_grad() def __init__(self, optimizer, lr: float = 0.67, momentum: float = 0.67, k: int = 20) -> None: self.optimizer = optimizer self.lr = lr self.momentum = momentum self.k = k self.current_step = 0 self.model_params = None self.outer_buf = None self.outer_optimizer = None if self.optimizer.param_groups: self.param_groups = self.optimizer.param_groups @torch.no_grad() def _initialize_outer_optimizer(self): # Исправленная логика сбора параметров params = [] for pg in self.optimizer.param_groups: for param in pg['params']: # Собираем только тензоры, требующие градиента (обычно это то, что нужно оптимизировать) if isinstance(param, torch.Tensor) and param.requires_grad: params.append(param) if not params: return self.model_params = list(params) self.outer_buf = [p.clone() for p in self.model_params] # Инициализируем внешний оптимизатор только если есть параметры self.outer_optimizer = torch.optim.SGD( self.model_params, lr=self.lr, momentum=self.momentum, nesterov=True, # fused=True может вызывать ошибки на некоторых версиях torch/hw, # можно поставить False, если будет падать дальше, но пока оставим True fused=True, ) self.param_groups = self.optimizer.param_groups @torch.no_grad() def step(self, closure=None): if self.outer_optimizer is None or self.current_step == 0: if self.optimizer.param_groups: self._initialize_outer_optimizer() # Если после попытки инициализации параметры все еще None, # значит оптимизировать нечего, просто делаем шаг базового оптимизатора if self.model_params is None: return self.optimizer.step(closure) loss = self.optimizer.step(closure) # Добавляем проверку на None здесь на всякий случай if self.model_params is not None and self.current_step % self.k == 0: for p_new, p_old in zip(self.model_params, self.outer_buf): if p_new.grad is None: continue # Защита от отсутствующих градиентов p_new.grad = p_old.data - p_new.data p_new.copy_(p_old, non_blocking=True) self.outer_optimizer.step() for p_new, p_old in zip(self.model_params, self.outer_buf): p_old.copy_(p_new, non_blocking=True) self.current_step += 1 return loss def zero_grad(self, set_to_none: bool = False): self.optimizer.zero_grad(set_to_none=set_to_none) def state_dict(self): return self.optimizer.state_dict() def load_state_dict(self, state_dict): self.optimizer.load_state_dict(state_dict)