|
|
import torch
|
|
|
from torch.optim import Optimizer
|
|
|
import math
|
|
|
from collections import deque
|
|
|
|
|
|
"""
|
|
|
EmoNavi v3.6.1 (251220) shadow-system v3.1 -moment v3.1 emoDrive v3.6
|
|
|
(v1.0)AMP対応完了(250725) p.data -> p 修正済み/低精度量子化への基本対応/低精度補償は別
|
|
|
(v2.0)shadow-system 微調整/3段階補正を連続的に滑らかに/派生版では以下の切替も可能
|
|
|
optimizer 指定の際に True / False で shadow を切替できる(現在 False)
|
|
|
(v3.0)emosens shadow-effect v1.0 反映した動的学習率と shadow-system 切替をデフォルト化
|
|
|
(v3.1)通常未使用の shadow 更新速度 (lerp) を倍化し信頼度で動的制御/coeff 活用(急変・微動)
|
|
|
動的学習率や感情スカラー値など TensorBoard 連携可 (現在 writer=None)/外部設定必要
|
|
|
全体の効率化や可読性を向上(emaやスカラーの多重処理を省く等、動的学習率のスケールや状態の見直し等、含む)
|
|
|
(v3.6)-Final- emoDrive v3.6 により信頼度に応じ学習率を大きく増減させることにした(emo系の完成版)
|
|
|
"""
|
|
|
|
|
|
class EmoNavi(Optimizer):
|
|
|
|
|
|
def __init__(self, params,
|
|
|
lr=1e-3,
|
|
|
eps=1e-8,
|
|
|
betas=(0.9, 0.995),
|
|
|
weight_decay=0.01,
|
|
|
use_shadow:bool=False,
|
|
|
writer=None):
|
|
|
defaults = dict(lr=lr, betas=betas, eps=eps, weight_decay=weight_decay)
|
|
|
super().__init__(params, defaults)
|
|
|
self._init_lr = lr
|
|
|
self.should_stop = False
|
|
|
self.use_shadow = use_shadow
|
|
|
self.writer = writer
|
|
|
|
|
|
|
|
|
def _update_ema(self, state, loss_val):
|
|
|
ema = state.setdefault('ema', {})
|
|
|
ema['short'] = 0.3 * loss_val + 0.7 * ema.get('short', loss_val)
|
|
|
ema['medium'] = 0.05 * loss_val + 0.95 * ema.get('medium', loss_val)
|
|
|
ema['long'] = 0.01 * loss_val + 0.99 * ema.get('long', loss_val)
|
|
|
return ema
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _compute_scalar(self, ema):
|
|
|
scale_base_l = max(ema['long'], 1e-5)
|
|
|
scale_base_m = max(ema['medium'], 1e-5)
|
|
|
diff_l = (ema['long'] - ema['short']) / scale_base_l
|
|
|
diff_m = (ema['long'] - ema['short']) / scale_base_m
|
|
|
|
|
|
if abs(diff_l) < 0.05:
|
|
|
return math.tanh(diff_l)
|
|
|
|
|
|
if abs(diff_m) * scale_base_m < abs(diff_l) * scale_base_l:
|
|
|
return math.tanh(1 * diff_m)
|
|
|
else:
|
|
|
return math.tanh(1 * diff_l)
|
|
|
|
|
|
|
|
|
def _early_scalar(self, ema):
|
|
|
scale_base_l = max(ema['long'], 1e-5)
|
|
|
diff = (ema['long'] - ema['short']) / scale_base_l
|
|
|
return math.tanh(1 * diff)
|
|
|
|
|
|
|
|
|
|
|
|
def _decide_coeff(self, scalar):
|
|
|
if abs(scalar) > 0.75:
|
|
|
return 1.0 - abs(scalar)
|
|
|
elif abs(scalar) > 0.50:
|
|
|
return 1.0 - abs(scalar)
|
|
|
elif abs(scalar) > 0.25:
|
|
|
return 1.0 - abs(scalar)
|
|
|
else:
|
|
|
return 1.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _decide_ratio(self, scalar):
|
|
|
if not self.use_shadow:
|
|
|
return 0.0
|
|
|
if abs(scalar) > 0.625:
|
|
|
return 1.0 - abs(scalar)
|
|
|
else:
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
@torch.no_grad()
|
|
|
def step(self, closure=None):
|
|
|
loss = closure() if closure is not None else None
|
|
|
loss_val = loss.item() if loss is not None else 0.0
|
|
|
|
|
|
|
|
|
ema = self._update_ema(self.state, loss_val)
|
|
|
early_scalar = self._early_scalar(ema)
|
|
|
scalar = self._compute_scalar(ema)
|
|
|
coeff = self._decide_coeff(scalar)
|
|
|
ratio = self._decide_ratio(scalar)
|
|
|
trust = math.copysign((1.0 - abs(scalar)), scalar)
|
|
|
emoDpt = 8.0 * abs(trust)
|
|
|
|
|
|
for group in self.param_groups:
|
|
|
for p in group['params']:
|
|
|
if p.grad is None:
|
|
|
continue
|
|
|
|
|
|
grad = p.grad
|
|
|
state = self.state[p]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.use_shadow :
|
|
|
if 'shadow' not in state:
|
|
|
state['shadow'] = p.clone()
|
|
|
if ratio > 0:
|
|
|
p.mul_(1-ratio).add_(state['shadow'], alpha=abs(trust))
|
|
|
else:
|
|
|
leap_ratio = 0.1 * abs(trust)
|
|
|
state['shadow'].lerp_(p, leap_ratio)
|
|
|
|
|
|
|
|
|
if 0.25 < abs(scalar) < 0.5:
|
|
|
emoDrive = emoDpt * (1.0 + 0.1 * trust)
|
|
|
elif abs(scalar) > 0.75:
|
|
|
emoDrive = coeff
|
|
|
else:
|
|
|
emoDrive = 1.0
|
|
|
|
|
|
|
|
|
|
|
|
exp_avg = state.setdefault('exp_avg', torch.zeros_like(p))
|
|
|
exp_avg_sq = state.setdefault('exp_avg_sq', torch.zeros_like(p))
|
|
|
beta1, beta2 = group['betas']
|
|
|
|
|
|
exp_avg.mul_(beta1).add_(grad, alpha=1 - beta1)
|
|
|
exp_avg_sq.mul_(beta2).addcmul_(grad, grad, value=1 - beta2)
|
|
|
denom = exp_avg_sq.sqrt().add_(group['eps'])
|
|
|
|
|
|
step_size = group['lr']
|
|
|
if group['weight_decay']:
|
|
|
p.add_(p, alpha=-group['weight_decay'] * step_size)
|
|
|
p.addcdiv_(exp_avg, denom, value=-step_size * emoDrive)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
hist = self.state.setdefault('scalar_hist', deque(maxlen=32))
|
|
|
hist.append(early_scalar)
|
|
|
|
|
|
|
|
|
|
|
|
if len(hist) >= 32:
|
|
|
avg_abs = sum(abs(s) for s in hist) / len(hist)
|
|
|
mean = sum(hist) / len(hist)
|
|
|
var = sum((s - mean)**2 for s in hist) / len(hist)
|
|
|
if avg_abs < 0.05 and var < 0.005:
|
|
|
self.should_stop = True
|
|
|
|
|
|
|
|
|
if hasattr(self, 'writer') and self.writer is not None:
|
|
|
self._step_count = getattr(self, "_step_count", 0) + 1
|
|
|
self.writer.add_scalar("emoLR/base", step_size, self._step_count)
|
|
|
self.writer.add_scalar("emoLR/Turbo", step_size * emoDrive, self._step_count)
|
|
|
self.writer.add_scalar("emostate/emoDrive", emoDrive, self._step_count)
|
|
|
self.writer.add_scalar("emostate/scalar", scalar, self._step_count)
|
|
|
|
|
|
return
|
|
|
|
|
|
"""
|
|
|
https://github.com/muooon/EmoNavi
|
|
|
An emotion-driven optimizer that feels loss and navigates accordingly.
|
|
|
Don't think. Feel. Don't stop. Keep running. Believe in what's beyond.
|
|
|
"""
|
|
|
|