import math

import torch
from torch.optim import Optimizer

"""
AMP support completed (2025-07): the p.data -> p migration has been applied.
"""


class EmoNavi(Optimizer):
    def __init__(self, params, lr=1e-3, betas=(0.9, 0.999),
                 eps=1e-8, weight_decay=0.01):
        defaults = dict(lr=lr, betas=betas, eps=eps, weight_decay=weight_decay)
        super().__init__(params, defaults)
        self._init_lr = lr
        self.should_stop = False

    def _update_ema(self, state, loss_val):
        # Track a fast (short) and a slow (long) exponential moving average of the loss.
        ema = state.setdefault('ema', {})
        ema['short'] = 0.3 * loss_val + 0.7 * ema.get('short', loss_val)
        ema['long'] = 0.01 * loss_val + 0.99 * ema.get('long', loss_val)
        return ema

    def _compute_scalar(self, ema):
        # Squash the short/long EMA gap into [-1, 1]; positive means the recent
        # loss sits above its long-term trend.
        diff = ema['short'] - ema['long']
        return math.tanh(5 * diff)

    def _decide_ratio(self, scalar):
        # Map the emotion scalar to a blend ratio toward the shadow weights.
        if scalar > 0.6:
            return 0.7 + 0.2 * scalar
        elif scalar < -0.6:
            return 0.1
        elif abs(scalar) > 0.3:
            return 0.3
        return 0.0

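    # Illustrative walk-through of the emotion pipeline (values are made up, not
    # from the original): with a short EMA of 1.0 and a long EMA of 0.8,
    # diff = 0.2, scalar = tanh(5 * 0.2) ~= 0.76 > 0.6, so the blend ratio is
    # 0.7 + 0.2 * 0.76 ~= 0.85.
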
    @torch.no_grad()
    def step(self, closure=None):
        # The closure re-runs forward/backward, so re-enable grad inside the
        # surrounding no_grad context (mirrors torch.optim's closure handling).
        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()
        loss_val = loss.item() if loss is not None else 0.0

        # Fallback so the scalar history below is defined even if a group has
        # no parameters with gradients.
        scalar = 0.0

        for group in self.param_groups:
            for p in group['params']:
                if p.grad is None:
                    continue

                grad = p.grad
                state = self.state[p]

                # Emotion signal: compare short- vs long-term loss EMAs.
                ema = self._update_ema(state, loss_val)
                scalar = self._compute_scalar(ema)
                ratio = self._decide_ratio(scalar)

                # Shadow weights: blend the parameter toward a slowly updated
                # copy when the emotion scalar asks for it, then refresh the copy.
                if ratio > 0:
                    if 'shadow' not in state:
                        state['shadow'] = p.clone()
                    else:
                        p.mul_(1 - ratio).add_(state['shadow'], alpha=ratio)
                        state['shadow'].lerp_(p, 0.05)

                # Adam-style first/second moment updates (no bias correction).
                exp_avg = state.setdefault('exp_avg', torch.zeros_like(p))
                exp_avg_sq = state.setdefault('exp_avg_sq', torch.zeros_like(p))
                beta1, beta2 = group['betas']
                exp_avg.mul_(beta1).add_(grad, alpha=1 - beta1)
                exp_avg_sq.mul_(beta2).addcmul_(grad, grad, value=1 - beta2)
                denom = exp_avg_sq.sqrt().add_(group['eps'])

                step_size = group['lr']
                if group['weight_decay']:
                    # Decoupled weight decay, applied directly to the parameter.
                    p.add_(p, alpha=-group['weight_decay'] * step_size)
                p.addcdiv_(exp_avg, denom, value=-step_size)

            # Record the most recent emotion scalar seen in this group,
            # keeping at most 32 entries.
            hist = self.state.setdefault('scalar_hist', [])
            hist.append(scalar)
            if len(hist) >= 33:
                hist.pop(0)

        # Convergence heuristic: if the emotion scalar has stayed small and
        # nearly flat over the last 32 records, signal that training can stop.
        buf = self.state.get('scalar_hist', [])
        if len(buf) >= 32:
            mean = sum(buf) / len(buf)
            avg_abs = sum(abs(s) for s in buf) / len(buf)
            var = sum((s - mean) ** 2 for s in buf) / len(buf)
            if avg_abs < 0.05 and var < 0.005:
                self.should_stop = True

        return loss


"""
|
|
|
https://github.com/muooon/EmoNavi
|
|
|
An emotion-driven optimizer that feels loss and navigates accordingly.
|
|
|
Don't think. Feel. Don't stop. Keep running. Believe in what's beyond.
|
|
|
"""