| import torch
|
| from torch.optim import Optimizer
|
| import math
|
| from collections import deque
|
|
|
| """
|
| EmoFact v3.6.3 (260130) shadow-system v3.1 -moment v3.1 emoDrive v3.6
|
| コード全体を見直し効率化を進めた/開発終了のため新機能等はない
|
| EmoFact v3.6.1 (251220) shadow-system v3.1 -moment v3.1 emoDrive v3.6
|
| (v1.0)AMP対応完了(250725) p.data -> p 修正済み/低精度量子化への基本対応/低精度補償は別
|
| (v2.0)shadow-system 微調整/3段階補正を連続的に滑らかに/派生版では以下の切替も可能
|
| optimizer 指定の際に True / False で shadow を切替できる(現在 False)
|
| (v3.0)emosens shadow-effect v1.0 反映した動的学習率と shadow-system 切替をデフォルト化
|
| (v3.1)通常未使用の shadow 更新速度 (lerp) を倍化し信頼度で動的制御/coeff 活用(急変・微動)
|
| 動的学習率や感情スカラー値など TensorBoard 連携可 (現在 writer=None)/外部設定必要
|
| 全体の効率化や可読性を向上(emaやスカラーの多重処理を省く等、動的学習率のスケールや状態の見直し等、含む)
|
| (v3.6)-Final- emoDrive v3.6 により信頼度に応じ学習率を大きく増減させた(emonavi世代の完成版)
|
| """
|
|
|
| class EmoFact(Optimizer):
|
|
|
| def __init__(self, params,
|
| lr=1e-3,
|
| eps=1e-8,
|
| betas=(0.9, 0.995),
|
| weight_decay=0.01,
|
| use_shadow:bool=False):
|
| defaults = dict(lr=lr, betas=betas, eps=eps, weight_decay=weight_decay)
|
| super().__init__(params, defaults)
|
| self._init_lr = lr
|
| self.should_stop = False
|
| self.use_shadow = use_shadow
|
|
|
|
|
| def _update_ema(self, state, loss_val):
|
| ema = state.setdefault('ema', {})
|
| ema['short'] = 0.3 * loss_val + 0.7 * ema.get('short', loss_val)
|
| ema['medium'] = 0.05 * loss_val + 0.95 * ema.get('medium', loss_val)
|
| ema['long'] = 0.01 * loss_val + 0.99 * ema.get('long', loss_val)
|
| return ema
|
|
|
|
|
|
|
|
|
|
|
|
|
| def _compute_scalar(self, ema):
|
| scale_base_l = max(ema['long'], 1e-5)
|
| scale_base_m = max(ema['medium'], 1e-5)
|
| diff_base = ema['long'] - ema['short']
|
| diff_l = diff_base / scale_base_l
|
| diff_m = diff_base / scale_base_m
|
|
|
| if abs(diff_l) < 0.05:
|
| return math.tanh(diff_l)
|
|
|
| if abs(diff_m) * scale_base_m < abs(diff_l) * scale_base_l:
|
| return math.tanh(diff_m)
|
| else:
|
| return math.tanh(diff_l)
|
|
|
|
|
| def _early_scalar(self, ema):
|
| scale_base_l = max(ema['long'], 1e-5)
|
| diff = (ema['long'] - ema['short']) / scale_base_l
|
| return math.tanh(diff)
|
|
|
|
|
|
|
| def _decide_coeff(self, scalar):
|
| if abs(scalar) > 0.75:
|
| return 1.0 - abs(scalar)
|
| elif abs(scalar) > 0.50:
|
| return 1.0 - abs(scalar)
|
| elif abs(scalar) > 0.25:
|
| return 1.0 - abs(scalar)
|
| else:
|
| return 1.0
|
|
|
|
|
|
|
|
|
|
|
|
|
| def _decide_ratio(self, scalar):
|
| if not self.use_shadow:
|
| return 0.0
|
| if abs(scalar) > 0.625:
|
| return 1.0 - abs(scalar)
|
| else:
|
| return 0.0
|
|
|
|
|
| @torch.no_grad()
|
| def step(self, closure=None):
|
| loss = closure() if closure is not None else None
|
| loss_val = loss.item() if loss is not None else 0.0
|
|
|
|
|
| ema = self._update_ema(self.state, loss_val)
|
| early_scalar = self._early_scalar(ema)
|
| scalar = self._compute_scalar(ema)
|
| coeff = self._decide_coeff(scalar)
|
| ratio = self._decide_ratio(scalar)
|
| trust = math.copysign((1.0 - abs(scalar)), scalar)
|
| emoDpt = 8.0 * abs(trust)
|
|
|
| for group in self.param_groups:
|
| for p in group['params']:
|
| if p.grad is None:
|
| continue
|
|
|
| grad = p.grad
|
| state = self.state[p]
|
|
|
|
|
|
|
|
|
|
|
|
|
| if self.use_shadow :
|
| if 'shadow' not in state:
|
| state['shadow'] = p.clone()
|
| if ratio > 0:
|
| p.mul_(1-ratio).add_(state['shadow'], alpha=abs(trust))
|
| else:
|
| leap_ratio = 0.1 * abs(trust)
|
| state['shadow'].lerp_(p, leap_ratio)
|
|
|
|
|
| if 0.25 < abs(scalar) < 0.5:
|
| emoDrive = emoDpt * (1.0 + 0.1 * trust)
|
| elif abs(scalar) > 0.75:
|
| emoDrive = coeff
|
| else:
|
| emoDrive = 1.0
|
|
|
|
|
|
|
| if grad.dim() >= 2:
|
|
|
| r_sq = torch.mean(grad * grad, dim=tuple(range(1, grad.dim())), keepdim=True).add_(group['eps'])
|
| c_sq = torch.mean(grad * grad, dim=0, keepdim=True).add_(group['eps'])
|
|
|
|
|
|
|
|
|
| beta1, beta2 = group['betas']
|
| state.setdefault('exp_avg_r', torch.zeros_like(r_sq)).mul_(beta1).add_(torch.sqrt(r_sq), alpha=1 - beta1)
|
| state.setdefault('exp_avg_c', torch.zeros_like(c_sq)).mul_(beta1).add_(torch.sqrt(c_sq), alpha=1 - beta1)
|
|
|
|
|
| denom = torch.sqrt(state['exp_avg_r'] * state['exp_avg_c']).add_(group['eps'])
|
|
|
|
|
| update_term = (grad / denom)
|
|
|
|
|
| else:
|
| beta1, beta2 = group['betas']
|
| exp_avg_sq = state.setdefault('exp_avg_sq', torch.zeros_like(p))
|
| exp_avg_sq.mul_(beta1).addcmul_(grad, grad, value=(1 - beta2))
|
| denom = exp_avg_sq.sqrt().add_(group['eps'])
|
|
|
| update_term = (grad / denom)
|
|
|
|
|
| step_size = group['lr']
|
| p.add_(p, alpha=-group['weight_decay'] * step_size)
|
| p.add_(update_term.sign_(), alpha=-step_size * emoDrive)
|
|
|
|
|
|
|
|
|
| hist = self.state.setdefault('scalar_hist', deque(maxlen=32))
|
| hist.append(scalar)
|
|
|
|
|
|
|
| if len(hist) >= 32:
|
| avg_abs = sum(abs(s) for s in hist) / len(hist)
|
| mean = sum(hist) / len(hist)
|
| var = sum((s - mean)**2 for s in hist) / len(hist)
|
| if avg_abs < 0.05 and var < 0.005:
|
| self.should_stop = True
|
| else:
|
| self.should_stop = False
|
|
|
| return
|
|
|
| """
|
| https://github.com/muooon/EmoNavi
|
| Fact is inspired by Adafactor, and emoairy,
|
| and its VRAM-friendly design is something everyone loves.
|
| """ |