# Source: IR_expeiment/PART2/PromptIR/utils/schedulers.py
# Uploaded by hugaagg using huggingface_hub (commit 2ecc7ab, verified).
import math
from collections import Counter
from torch.optim.lr_scheduler import _LRScheduler
import torch
import warnings
from typing import List
from torch import nn
from torch.optim import Adam, Optimizer
class MultiStepRestartLR(_LRScheduler):
    """Step-decay learning rate schedule with optional restarts.

    At a restart iteration the rate of every parameter group is reset to
    ``initial_lr * restart_weight``. At a milestone iteration the current
    rate is multiplied by ``gamma`` once per occurrence of that milestone
    in ``milestones``. On every other iteration the rate is left unchanged.
    A restart takes precedence when an iteration is both a restart and a
    milestone.

    Args:
        optimizer (torch.nn.optimizer): Torch optimizer.
        milestones (list): Iterations that will decrease learning rate.
        gamma (float): Decrease ratio. Default: 0.1.
        restarts (list): Restart iterations. Default: [0].
        restart_weights (list): Restart weights at each restart iteration.
            Default: [1].
        last_epoch (int): Used in _LRScheduler. Default: -1.
    """

    def __init__(self,
                 optimizer,
                 milestones,
                 gamma=0.1,
                 restarts=(0, ),
                 restart_weights=(1, ),
                 last_epoch=-1):
        # A Counter lets a milestone listed k times decay the rate by gamma**k.
        self.milestones = Counter(milestones)
        self.gamma = gamma
        self.restarts = restarts
        self.restart_weights = restart_weights
        assert len(self.restarts) == len(
            self.restart_weights), 'restarts and their weights do not match.'
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return the learning rate of every parameter group for this step."""
        step = self.last_epoch
        groups = self.optimizer.param_groups

        # Restart: start again from the initial rate, scaled by its weight.
        if step in self.restarts:
            scale = self.restart_weights[self.restarts.index(step)]
            return [g['initial_lr'] * scale for g in groups]

        # Milestone: decay the *current* rate (chainable form).
        if step in self.milestones:
            decay = self.gamma**self.milestones[step]
            return [g['lr'] * decay for g in groups]

        # Nothing scheduled for this step: keep the current rate.
        return [g['lr'] for g in groups]
class LinearLR(_LRScheduler):
    """Linearly decay the learning rate from its initial value down to zero.

    At iteration ``t`` every parameter group uses
    ``initial_lr * max(0, 1 - t / total_iter)``. The rate therefore reaches
    zero at ``t == total_iter`` and stays at zero if the scheduler is
    stepped further, instead of turning negative.

    Args:
        optimizer (torch.nn.optimizer): Torch optimizer.
        total_iter (int): Number of iterations over which the rate decays
            to zero. Must be positive.
        last_epoch (int): Used in _LRScheduler. Default: -1.

    Raises:
        ValueError: If ``total_iter`` is not positive.
    """

    def __init__(self,
                 optimizer,
                 total_iter,
                 last_epoch=-1):
        # Checked before the base-class constructor, which already performs
        # the first step and would otherwise divide by zero in get_lr().
        if total_iter <= 0:
            raise ValueError(
                'total_iter must be positive, got {}.'.format(total_iter))
        self.total_iter = total_iter
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return the learning rate of every parameter group for this step."""
        progress = self.last_epoch / self.total_iter
        # Clamp so that stepping past total_iter never yields a negative rate.
        weight = max(0.0, 1 - progress)
        return [
            weight * group['initial_lr']
            for group in self.optimizer.param_groups
        ]
class VibrateLR(_LRScheduler):
    """Triangle-wave ("vibrating") learning rate under a decaying envelope.

    The multiplier applied to each group's ``initial_lr`` is
    ``envelope * wave`` where, with ``process = last_epoch / total_iter``:

    * ``envelope`` is ``1 - process * 8 / 3`` while ``process < 3/8``,
      ``0.2`` while ``process < 5/8`` and ``0.1`` afterwards.
    * ``wave`` is a triangle wave with a period of ``total_iter // 80``
      iterations: it rises from 0 towards 1 during the first half period
      and falls back during the second half.

    During the very first half period the multiplier is floored at 0.1.

    Args:
        optimizer (torch.nn.optimizer): Torch optimizer.
        total_iter (int): Total number of training iterations. Must be at
            least 160 so that one wave spans at least two iterations.
        last_epoch (int): Used in _LRScheduler. Default: -1.

    Raises:
        ValueError: If ``total_iter`` is smaller than 160.
    """

    def __init__(self,
                 optimizer,
                 total_iter,
                 last_epoch=-1):
        # get_lr() divides by (total_iter // 80) and by half of it; both are
        # zero for small totals, so fail with a clear message up front.
        if total_iter // 80 < 2:
            raise ValueError(
                'total_iter must be at least 160, got {}.'.format(total_iter))
        self.total_iter = total_iter
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return the learning rate of every parameter group for this step."""
        process = self.last_epoch / self.total_iter

        # Slowly varying envelope of the oscillation.
        if process < 3 / 8:
            envelope = 1 - process * 8 / 3
        elif process < 5 / 8:
            envelope = 0.2
        else:
            envelope = 0.1

        # Triangle wave: up during the first half period, down afterwards.
        period = self.total_iter // 80
        half_period = period // 2
        phase = self.last_epoch % period
        wave = phase / half_period
        if phase >= half_period:
            wave = 2 - wave

        weight = envelope * wave
        # Keep the rate away from zero during the first rising edge.
        if self.last_epoch < half_period:
            weight = max(0.1, weight)
        return [
            weight * group['initial_lr']
            for group in self.optimizer.param_groups
        ]
def get_position_from_periods(iteration, cumulative_period):
    """Locate the cycle that an iteration falls into.

    Returns the index of the first entry of ``cumulative_period`` that is
    greater than or equal to ``iteration``.

    For example, with cumulative_period = [100, 200, 300, 400]:
        iteration == 50  -> 0
        iteration == 210 -> 2
        iteration == 300 -> 2

    Args:
        iteration (int): Current iteration.
        cumulative_period (list[int]): Cumulative period list.

    Returns:
        int: The position of the right-closest number in the period list,
        or None when ``iteration`` is larger than every entry.
    """
    matches = (
        index for index, boundary in enumerate(cumulative_period)
        if iteration <= boundary
    )
    return next(matches, None)
class CosineAnnealingRestartLR(_LRScheduler):
    """Cosine annealing learning rate schedule with warm restarts.

    An example of config:
        periods = [10, 10, 10, 10]
        restart_weights = [1, 0.5, 0.5, 0.5]
        eta_min=1e-7

    It has four cycles, each has 10 iterations. At 10th, 20th, 30th, the
    scheduler will restart with the weights in restart_weights.

    Args:
        optimizer (torch.nn.optimizer): Torch optimizer.
        periods (list): Period for each cosine annealing cycle.
        restart_weights (list): Restart weights at each restart iteration.
            Default: [1].
        eta_min (float): The minimum lr. Default: 0.
        last_epoch (int): Used in _LRScheduler. Default: -1.
    """

    def __init__(self,
                 optimizer,
                 periods,
                 restart_weights=(1, ),
                 eta_min=0,
                 last_epoch=-1):
        self.periods = periods
        self.restart_weights = restart_weights
        self.eta_min = eta_min
        assert (len(self.periods) == len(self.restart_weights)
                ), 'periods and restart_weights should have the same length.'

        # Iteration at which each cycle ends (running sum of the periods).
        boundaries = []
        elapsed = 0
        for length in self.periods:
            elapsed = elapsed + length
            boundaries.append(elapsed)
        self.cumulative_period = boundaries

        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return the learning rate of every parameter group for this step."""
        cycle = get_position_from_periods(self.last_epoch,
                                          self.cumulative_period)
        scale = self.restart_weights[cycle]
        cycle_start = self.cumulative_period[cycle - 1] if cycle > 0 else 0
        cycle_length = self.periods[cycle]

        # Goes from 2 at the start of the cycle to 0 at its end.
        cosine_term = 1 + math.cos(math.pi * (
            (self.last_epoch - cycle_start) / cycle_length))

        return [
            self.eta_min + scale * 0.5 * (base_lr - self.eta_min) * cosine_term
            for base_lr in self.base_lrs
        ]
class CosineAnnealingRestartCyclicLR(_LRScheduler):
    """Cosine annealing with restarts and a per-cycle minimum learning rate.

    An example of config:
        periods = [10, 10, 10, 10]
        restart_weights = [1, 0.5, 0.5, 0.5]
        eta_mins = [1e-4, 1e-5, 1e-6, 1e-7]

    It has four cycles, each has 10 iterations. At 10th, 20th, 30th, the
    scheduler will restart with the weights in restart_weights, and each
    cycle anneals towards its own entry of eta_mins.

    Args:
        optimizer (torch.nn.optimizer): Torch optimizer.
        periods (list): Period for each cosine annealing cycle.
        restart_weights (list): Restart weights at each restart iteration.
            Default: [1].
        eta_mins (list): The minimum lr of each cycle; needs one entry per
            period. Default: [0].
        last_epoch (int): Used in _LRScheduler. Default: -1.

    Raises:
        AssertionError: If ``restart_weights`` does not have the same length
            as ``periods`` or ``eta_mins`` has fewer entries than ``periods``.
    """

    def __init__(self,
                 optimizer,
                 periods,
                 restart_weights=(1, ),
                 eta_mins=(0, ),
                 last_epoch=-1):
        self.periods = periods
        self.restart_weights = restart_weights
        self.eta_mins = eta_mins
        assert (len(self.periods) == len(self.restart_weights)
                ), 'periods and restart_weights should have the same length.'
        # get_lr() indexes eta_mins by cycle; without this check a short
        # eta_mins only fails with an IndexError once training reaches the
        # first cycle that has no entry.
        assert (len(self.eta_mins) >= len(self.periods)
                ), 'eta_mins should provide a value for every period.'

        # Iteration at which each cycle ends (running sum of the periods).
        boundaries = []
        elapsed = 0
        for length in self.periods:
            elapsed = elapsed + length
            boundaries.append(elapsed)
        self.cumulative_period = boundaries

        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return the learning rate of every parameter group for this step."""
        idx = get_position_from_periods(self.last_epoch,
                                        self.cumulative_period)
        current_weight = self.restart_weights[idx]
        nearest_restart = 0 if idx == 0 else self.cumulative_period[idx - 1]
        current_period = self.periods[idx]
        eta_min = self.eta_mins[idx]

        # Goes from 2 at the start of the cycle to 0 at its end.
        cosine_term = 1 + math.cos(math.pi * (
            (self.last_epoch - nearest_restart) / current_period))

        return [
            eta_min + current_weight * 0.5 * (base_lr - eta_min) * cosine_term
            for base_lr in self.base_lrs
        ]
class LinearWarmupCosineAnnealingLR(_LRScheduler):
    """Sets the learning rate of each parameter group to follow a linear warmup schedule between warmup_start_lr
    and base_lr followed by a cosine annealing schedule between base_lr and eta_min.

    .. warning::
        It is recommended to call :func:`.step()` for :class:`LinearWarmupCosineAnnealingLR`
        after each iteration as calling it after each epoch will keep the starting lr at
        warmup_start_lr for the first epoch which is 0 in most cases.

    .. warning::
        passing epoch to :func:`.step()` is being deprecated and comes with an EPOCH_DEPRECATION_WARNING.
        It calls the :func:`_get_closed_form_lr()` method for this scheduler instead of
        :func:`get_lr()`. Though this does not change the behavior of the scheduler, when passing
        epoch param to :func:`.step()`, the user should call the :func:`.step()` function before calling
        train and validation methods.

    Example:
        >>> layer = nn.Linear(10, 1)
        >>> optimizer = Adam(layer.parameters(), lr=0.02)
        >>> scheduler = LinearWarmupCosineAnnealingLR(optimizer, warmup_epochs=10, max_epochs=40)
        >>> #
        >>> # the default case
        >>> for epoch in range(40):
        ...     # train(...)
        ...     # validate(...)
        ...     scheduler.step()
        >>> #
        >>> # passing epoch param case
        >>> for epoch in range(40):
        ...     scheduler.step(epoch)
        ...     # train(...)
        ...     # validate(...)
    """

    def __init__(
        self,
        optimizer: Optimizer,
        warmup_epochs: int,
        max_epochs: int,
        warmup_start_lr: float = 0.0,
        eta_min: float = 0.0,
        last_epoch: int = -1,
    ) -> None:
        """
        Args:
            optimizer (Optimizer): Wrapped optimizer.
            warmup_epochs (int): Maximum number of iterations for linear warmup
            max_epochs (int): Maximum number of iterations
            warmup_start_lr (float): Learning rate to start the linear warmup. Default: 0.
            eta_min (float): Minimum learning rate. Default: 0.
            last_epoch (int): The index of last epoch. Default: -1.
        """
        self.warmup_epochs = warmup_epochs
        self.max_epochs = max_epochs
        self.warmup_start_lr = warmup_start_lr
        self.eta_min = eta_min

        super().__init__(optimizer, last_epoch)

    def get_lr(self) -> List[float]:
        """Compute learning rate using chainable form of the scheduler."""
        if not self._get_lr_called_within_step:
            warnings.warn(
                "To get the last learning rate computed by the scheduler, " "please use `get_last_lr()`.",
                UserWarning,
            )

        # First step: start the warmup from warmup_start_lr.
        if self.last_epoch == 0:
            return [self.warmup_start_lr] * len(self.base_lrs)

        # Warmup: add one constant increment to the current rate, so that
        # base_lr is reached at last_epoch == warmup_epochs - 1.
        if self.last_epoch < self.warmup_epochs:
            return [
                group["lr"] + (base_lr - self.warmup_start_lr) / (self.warmup_epochs - 1)
                for base_lr, group in zip(self.base_lrs, self.optimizer.param_groups)
            ]

        # Start of the cosine phase.
        if self.last_epoch == self.warmup_epochs:
            return self.base_lrs

        # One step past the bottom of the cosine (and every full cosine
        # period after that) the ratio used below would divide by zero,
        # so the rate is raised additively instead.
        if (self.last_epoch - 1 - self.max_epochs) % (2 * (self.max_epochs - self.warmup_epochs)) == 0:
            return [
                group["lr"]
                + (base_lr - self.eta_min) * (1 - math.cos(math.pi / (self.max_epochs - self.warmup_epochs))) / 2
                for base_lr, group in zip(self.base_lrs, self.optimizer.param_groups)
            ]

        # Regular cosine step: rescale the distance to eta_min by the ratio
        # between this step's and the previous step's cosine terms.
        return [
            (1 + math.cos(math.pi * (self.last_epoch - self.warmup_epochs) / (self.max_epochs - self.warmup_epochs)))
            / (
                1
                + math.cos(
                    math.pi * (self.last_epoch - self.warmup_epochs - 1) / (self.max_epochs - self.warmup_epochs)
                )
            )
            * (group["lr"] - self.eta_min)
            + self.eta_min
            for group in self.optimizer.param_groups
        ]

    def _get_closed_form_lr(self) -> List[float]:
        """Called when epoch is passed as a param to the `step` function of the scheduler."""
        if self.last_epoch < self.warmup_epochs:
            # With warmup_epochs == 1 the only warmup step is last_epoch == 0;
            # guard the divisor so it yields warmup_start_lr (as get_lr does)
            # instead of raising ZeroDivisionError.
            warmup_span = max(1, self.warmup_epochs - 1)
            return [
                self.warmup_start_lr + self.last_epoch * (base_lr - self.warmup_start_lr) / warmup_span
                for base_lr in self.base_lrs
            ]

        return [
            self.eta_min
            + 0.5
            * (base_lr - self.eta_min)
            * (1 + math.cos(math.pi * (self.last_epoch - self.warmup_epochs) / (self.max_epochs - self.warmup_epochs)))
            for base_lr in self.base_lrs
        ]
# warmup + decay as a function
def linear_warmup_decay(warmup_steps, total_steps, cosine=True, linear=False):
    """Build a step -> multiplier function: linear warmup, then optional decay.

    The returned function ramps linearly from 0 to 1 over ``warmup_steps``.
    After the warmup it follows a cosine decay (``cosine=True``), a linear
    decay (``linear=True``) towards 0 at ``total_steps``, or stays at 1.0
    when neither is requested.

    Args:
        warmup_steps (int): Number of warmup steps.
        total_steps (int): Step at which the decay reaches 0.
        cosine (bool): Use cosine decay after warmup. Default: True.
        linear (bool): Use linear decay after warmup. Default: False.

    Returns:
        Callable taking the current step and returning the multiplier.

    Raises:
        AssertionError: If both ``cosine`` and ``linear`` are set.
    """
    assert not (linear and cosine)

    def fn(step):
        # Warmup phase: fraction of the warmup completed so far.
        if step < warmup_steps:
            return float(step) / float(max(1, warmup_steps))

        if cosine or linear:
            # Fraction of the decay phase completed so far.
            progress = float(step - warmup_steps) / float(max(1, total_steps - warmup_steps))
            if cosine:
                return 0.5 * (1.0 + math.cos(math.pi * progress))
            return 1.0 - progress

        # No decay requested: hold the multiplier at 1.
        return 1.0

    return fn