|
|
from collections import OrderedDict |
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
import pdb |
|
|
|
|
|
from einops import rearrange |
|
|
|
|
|
from basicsr.utils import get_root_logger |
|
|
from basicsr.utils.registry import MODEL_REGISTRY |
|
|
from basicsr.archs import build_network |
|
|
from basicsr.losses import build_loss |
|
|
from basicsr.archs.arch_util import flow_warp, resize_flow |
|
|
|
|
|
from .video_recurrent_model import VideoRecurrentModel |
|
|
|
|
|
|
|
|
@MODEL_REGISTRY.register()
class KEEPModel(VideoRecurrentModel):
    """KEEP model: training logic on top of ``VideoRecurrentModel``.

    The class configures the optional loss terms from ``opt['train']``
    (codebook-feature, cross-entropy, temporal, pixel and perceptual), builds
    the generator optimizer, and runs one optimization step per call to
    :meth:`optimize_parameters`.
    """

    def init_training_settings(self):
        """Prepare EMA, loss criteria, optimizers and schedulers for training.

        Reads the following keys from ``self.opt['train']``: ``ema_decay``,
        ``use_hq_feat_loss``, ``feat_loss_weight``, ``cross_entropy_loss``,
        ``entropy_loss_weight``, ``pixel_opt``, ``perceptual_opt``,
        ``temporal_opt`` and ``temporal_warp_type``.

        Raises:
            AssertionError: If latent GT codes are required (feature loss or
                cross-entropy loss enabled) but ``opt['network_vqgan']`` is
                missing.
        """
        self.net_g.train()
        train_opt = self.opt['train']
        logger = get_root_logger()

        # ---- Exponential moving average copy of the generator -------------
        self.ema_decay = train_opt.get('ema_decay', 0)
        if self.ema_decay > 0:
            logger.info(
                f'Use Exponential Moving Average with decay: {self.ema_decay}')
            self.net_g_ema = build_network(
                self.opt['network_g']).to(self.device)
            load_path = self.opt['path'].get('pretrain_network_g', None)
            if load_path is not None:
                self.load_network(
                    self.net_g_ema, load_path,
                    self.opt['path'].get('strict_load_g', True), 'params_ema')
            else:
                # No checkpoint: run one EMA update with decay 0
                # (presumably copies net_g weights into net_g_ema — confirm
                # against model_ema in the base class).
                self.model_ema(0)
            self.net_g_ema.eval()

        # ---- Latent-code supervised losses ----------------------------------
        self.hq_feat_loss = train_opt.get('use_hq_feat_loss', False)
        self.feat_loss_weight = train_opt.get('feat_loss_weight', 1.0)
        self.cross_entropy_loss = train_opt.get('cross_entropy_loss', False)
        self.entropy_loss_weight = train_opt.get('entropy_loss_weight', 0.5)

        # Both the feature loss and the cross-entropy loss consume
        # ``self.idx_gt`` in optimize_parameters, so the GT codes must be
        # generated whenever either one is enabled (previously only the
        # cross-entropy flag turned this on, which made a feature-loss-only
        # configuration crash on the first iteration).
        self.generate_idx_gt = bool(
            self.hq_feat_loss or self.cross_entropy_loss)
        if self.generate_idx_gt:
            assert self.opt.get('network_vqgan', None) is not None, \
                'Shoule have network_vqgan config or pre-calculated latent code.'
            self.hq_vqgan_fix = build_network(
                self.opt['network_vqgan']).to(self.device)
            # The VQGAN is only used to produce targets: keep it frozen.
            self.hq_vqgan_fix.eval()
            for param in self.hq_vqgan_fix.parameters():
                param.requires_grad = False
        logger.info(f'Need to generate latent GT code: {self.generate_idx_gt}')

        # ---- Image-space criteria -----------------------------------------
        if train_opt.get('pixel_opt'):
            self.cri_pix = build_loss(train_opt['pixel_opt']).to(self.device)
        else:
            self.cri_pix = None

        if train_opt.get('perceptual_opt'):
            self.perceptual_type = train_opt['perceptual_opt']['type']
            self.cri_perceptual = build_loss(
                train_opt['perceptual_opt']).to(self.device)
        else:
            self.cri_perceptual = None

        if train_opt.get('temporal_opt'):
            # One of 'GT', 'HR' or 'Diff'; validated when the loss is computed.
            self.temporal_type = train_opt.get('temporal_warp_type', 'GT')
            self.cri_temporal = build_loss(
                train_opt['temporal_opt']).to(self.device)
        else:
            self.cri_temporal = None

        self.setup_optimizers()
        self.setup_schedulers()

    def setup_optimizers(self):
        """Build the generator optimizer over all trainable ``net_g`` params.

        Every parameter name is logged, split into the optimized group
        (``requires_grad`` is true) and the frozen group. The optimizer type
        and its keyword arguments come from ``opt['train']['optim_g']``.
        """
        train_opt = self.opt['train']
        logger = get_root_logger()

        optim_params_g, optim_names, freezed_names = [], [], []
        for name, param in self.net_g.named_parameters():
            if param.requires_grad:
                optim_params_g.append(param)
                optim_names.append(name)
            else:
                freezed_names.append(name)

        logger.warning('--------------- Optimizing Params ---------------.')
        for name in optim_names:
            logger.warning(f'Params {name} will be optimized.')
        logger.warning('--------------- Freezing Params ---------------.')
        for name in freezed_names:
            logger.warning(f'Params {name} will be freezed.')

        # Pop 'type' from a copy so the shared options dict is left intact
        # and this method can safely be called more than once.
        optim_cfg = dict(train_opt['optim_g'])
        optim_type = optim_cfg.pop('type')
        self.optimizer_g = self.get_optimizer(
            optim_type, optim_params_g, **optim_cfg)
        self.optimizers.append(self.optimizer_g)

    def _unwrapped_net_g(self):
        """Return ``net_g`` itself, stripping a (Distributed)DataParallel wrapper."""
        net = self.net_g
        parallel_types = (torch.nn.DataParallel,
                          torch.nn.parallel.DistributedDataParallel)
        if isinstance(net, parallel_types):
            return net.module
        return net

    @staticmethod
    def _frame_flows(net_g, frames):
        """Return ``net_g.get_flow(frames)`` detached, batch and frame axes merged."""
        flows = net_g.get_flow(frames).detach()
        return rearrange(flows, "b f c h w -> (b f) c h w")

    @staticmethod
    def _flow_at(flows, height, width):
        """Resize ``flows`` to ``(height, width)`` and move channels last for ``flow_warp``."""
        resized = resize_flow(flows, 'shape', [height, width])
        return rearrange(resized, "b c h w -> b h w c")

    def _temporal_loss(self, gen_feat_dict):
        """Accumulate the temporal criterion over every entry of ``gen_feat_dict``.

        Depending on ``self.temporal_type``:

        * ``'GT'``   - flows come from ``self.gt``; the warped previous
          feature is compared with the current feature.
        * ``'HR'``   - same, but flows come from ``self.output``.
        * ``'Diff'`` - the previous feature is warped with both the GT flows
          and the output flows and the two warps are compared.

        Raises:
            AssertionError: If ``gen_feat_dict`` is empty.
            ValueError: If ``self.temporal_type`` is not one of the above.
        """
        assert len(
            gen_feat_dict) != 0, "Empty features for temporal regularization."

        mode = self.temporal_type
        if mode not in ('GT', 'HR', 'Diff'):
            raise ValueError(
                f'Unsupported temporal mode: {self.temporal_type}.')

        net_g = self._unwrapped_net_g()
        gt_flows = hr_flows = None
        # Flows act as fixed warping fields: no gradient flows through them.
        with torch.no_grad():
            if mode in ('GT', 'Diff'):
                gt_flows = self._frame_flows(net_g, self.gt)
            if mode in ('HR', 'Diff'):
                hr_flows = self._frame_flows(net_g, self.output)

        l_temporal = 0
        for feat in gen_feat_dict.values():
            _, _, feat_c, feat_h, feat_w = feat.shape
            # ``reshape`` rather than ``view``: slicing the frame axis makes
            # the tensor non-contiguous, and ``view`` cannot merge the batch
            # and frame axes of such a slice when the batch size exceeds 1.
            prev_feat = feat[:, :-1, ...].reshape(-1, feat_c, feat_h, feat_w)

            if mode == 'Diff':
                gt_warp_feat = flow_warp(
                    prev_feat, self._flow_at(gt_flows, feat_h, feat_w))
                hr_warp_feat = flow_warp(
                    prev_feat, self._flow_at(hr_flows, feat_h, feat_w))
                l_temporal += self.cri_temporal(gt_warp_feat, hr_warp_feat)
            else:
                flows = gt_flows if mode == 'GT' else hr_flows
                curr_feat = feat[:, 1:, ...].reshape(
                    -1, feat_c, feat_h, feat_w)
                warp_feat = flow_warp(
                    prev_feat, self._flow_at(flows, feat_h, feat_w))
                l_temporal += self.cri_temporal(curr_feat, warp_feat)
        return l_temporal

    def optimize_parameters(self, current_iter):
        """Run one generator optimization step on the current batch.

        Uses ``self.lq`` and ``self.gt`` (both expected to be set before this
        call), sums every enabled loss term, back-propagates, steps
        ``self.optimizer_g``, updates the EMA network when enabled, and
        stores the reduced per-term losses in ``self.log_dict``.

        Args:
            current_iter: Current training iteration; not used by this method.

        Raises:
            RuntimeError: If no loss term produced a tensor, so there is
                nothing to back-propagate.
        """
        self.optimizer_g.zero_grad()

        # ``self.gt`` is unpacked as 5-D: (batch, frames, channels, H, W).
        b, f, c, h, w = self.gt.shape

        if self.generate_idx_gt:
            # Target codebook indices from the frozen VQGAN, one row per frame.
            with torch.no_grad():
                x = self.hq_vqgan_fix.encoder(self.gt.reshape(-1, c, h, w))
                _, _, quant_stats = self.hq_vqgan_fix.quantize(x)
                min_encoding_indices = quant_stats['min_encoding_indices']
                self.idx_gt = min_encoding_indices.reshape(b * f, -1)

        if self.hq_feat_loss or self.cross_entropy_loss:
            self.output, logits, lq_feat, gen_feat_dict = self.net_g(
                self.lq, detach_16=True, early_feat=True)
        else:
            self.output, gen_feat_dict = self.net_g(
                self.lq, detach_16=True, early_feat=False)
        if len(gen_feat_dict) == 0:
            # Fall back to the network output as the only temporal feature.
            gen_feat_dict['HR'] = self.output

        l_g_total = 0
        loss_dict = OrderedDict()

        # ---- Codebook feature loss -----------------------------------------
        if self.hq_feat_loss:
            code_h = lq_feat.shape[-1]
            # 256 is the hard-coded last entry of the requested shape —
            # presumably the codebook embedding size; TODO confirm.
            quant_feat_gt = self._unwrapped_net_g().quantize.get_codebook_feat(
                self.idx_gt, shape=[b * f, code_h, code_h, 256])
            l_feat_encoder = torch.mean(
                (quant_feat_gt.detach() - lq_feat) ** 2) * self.feat_loss_weight
            l_g_total += l_feat_encoder
            loss_dict['l_feat_encoder'] = l_feat_encoder

        # ---- Cross-entropy loss on predicted code indices -----------------
        if self.cross_entropy_loss:
            # F.cross_entropy expects the class axis at position 1.
            cross_entropy_loss = F.cross_entropy(
                logits.permute(0, 2, 1), self.idx_gt) * self.entropy_loss_weight
            l_g_total += cross_entropy_loss
            loss_dict['l_cross_entropy'] = cross_entropy_loss

        # ---- Temporal consistency loss -------------------------------------
        if self.cri_temporal:
            l_temporal = self._temporal_loss(gen_feat_dict)
            l_g_total += l_temporal
            loss_dict['l_temporal'] = l_temporal

        # ---- Pixel loss -------------------------------------------------------
        if self.cri_pix:
            l_pix = self.cri_pix(self.output, self.gt)
            l_g_total += l_pix
            loss_dict['l_pix'] = l_pix

        # ---- Perceptual loss --------------------------------------------------
        if self.cri_perceptual:
            # The criterion is fed 4-D input: frames are folded into the batch.
            output_frames = self.output.reshape(-1, c, h, w)
            gt_frames = self.gt.reshape(-1, c, h, w)
            if self.perceptual_type == 'PerceptualLoss':
                l_percep, l_style = self.cri_perceptual(
                    output_frames, gt_frames)
                if l_percep is not None:
                    l_g_total += l_percep
                    loss_dict['l_percep'] = l_percep
                if l_style is not None:
                    l_g_total += l_style
                    loss_dict['l_style'] = l_style
            elif self.perceptual_type == 'LPIPSLoss':
                l_percep = self.cri_perceptual(output_frames, gt_frames)
                l_g_total += l_percep
                loss_dict['l_percep'] = l_percep
            # Any other perceptual type contributes nothing, as before.

        if not torch.is_tensor(l_g_total):
            # Without this guard the failure would be an opaque
            # "'int' object has no attribute 'backward'".
            raise RuntimeError(
                'No loss term is enabled; nothing to back-propagate. '
                'Enable at least one loss in the train options.')

        l_g_total.backward()
        self.optimizer_g.step()

        if self.ema_decay > 0:
            self.model_ema(decay=self.ema_decay)

        self.log_dict = self.reduce_loss_dict(loss_dict)
|
|
|