import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
from torch.nn.modules.module import Module

from ..cnn import UPSAMPLE_LAYERS, normal_init, xavier_init
from ..utils import ext_loader

ext_module = ext_loader.load_ext('_ext', [
    'carafe_naive_forward', 'carafe_naive_backward', 'carafe_forward',
    'carafe_backward'
])
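# The symbols above come from the compiled extension; the autograd Functions
# below only validate shapes and dispatch to these kernels.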


class CARAFENaiveFunction(Function):

    @staticmethod
    def symbolic(g, features, masks, kernel_size, group_size, scale_factor):
        return g.op(
            'mmcv::MMCVCARAFENaive',
            features,
            masks,
            kernel_size_i=kernel_size,
            group_size_i=group_size,
            scale_factor_f=scale_factor)

    @staticmethod
    def forward(ctx, features, masks, kernel_size, group_size, scale_factor):
        # masks must carry one kernel_size x kernel_size kernel per group at
        # every upsampled location, and kernel_size must be odd
        assert scale_factor >= 1
        assert masks.size(1) == kernel_size * kernel_size * group_size
        assert masks.size(-1) == features.size(-1) * scale_factor
        assert masks.size(-2) == features.size(-2) * scale_factor
        assert features.size(1) % group_size == 0
        assert (kernel_size - 1) % 2 == 0 and kernel_size >= 1
        ctx.kernel_size = kernel_size
        ctx.group_size = group_size
        ctx.scale_factor = scale_factor
        ctx.feature_size = features.size()
        ctx.mask_size = masks.size()

        n, c, h, w = features.size()
        output = features.new_zeros((n, c, h * scale_factor,
                                     w * scale_factor))
        ext_module.carafe_naive_forward(
            features,
            masks,
            output,
            kernel_size=kernel_size,
            group_size=group_size,
            scale_factor=scale_factor)

        if features.requires_grad or masks.requires_grad:
            ctx.save_for_backward(features, masks)
        return output

    @staticmethod
    def backward(ctx, grad_output):
        assert grad_output.is_cuda

        features, masks = ctx.saved_tensors
        kernel_size = ctx.kernel_size
        group_size = ctx.group_size
        scale_factor = ctx.scale_factor

        grad_input = torch.zeros_like(features)
        grad_masks = torch.zeros_like(masks)
        ext_module.carafe_naive_backward(
            grad_output.contiguous(),
            features,
            masks,
            grad_input,
            grad_masks,
            kernel_size=kernel_size,
            group_size=group_size,
            scale_factor=scale_factor)

        return grad_input, grad_masks, None, None, None


carafe_naive = CARAFENaiveFunction.apply
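# Functional interface; an illustrative call (shapes follow the assertions in
# CARAFENaiveFunction.forward) with kernel_size=5, group_size=1,
# scale_factor=2:
#   out = carafe_naive(feat, masks, 5, 1, 2)
#   # feat: (n, c, h, w), masks: (n, 25, 2h, 2w) -> out: (n, c, 2h, 2w)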


class CARAFENaive(Module):
    """Module wrapper of the naive CARAFE op (see CARAFE below)."""

    def __init__(self, kernel_size, group_size, scale_factor):
        super(CARAFENaive, self).__init__()

        assert isinstance(kernel_size, int) and isinstance(
            group_size, int) and isinstance(scale_factor, int)
        self.kernel_size = kernel_size
        self.group_size = group_size
        self.scale_factor = scale_factor

    def forward(self, features, masks):
        return carafe_naive(features, masks, self.kernel_size,
                            self.group_size, self.scale_factor)


class CARAFEFunction(Function):

    @staticmethod
    def symbolic(g, features, masks, kernel_size, group_size, scale_factor):
        return g.op(
            'mmcv::MMCVCARAFE',
            features,
            masks,
            kernel_size_i=kernel_size,
            group_size_i=group_size,
            scale_factor_f=scale_factor)

    @staticmethod
    def forward(ctx, features, masks, kernel_size, group_size, scale_factor):
        assert scale_factor >= 1
        assert masks.size(1) == kernel_size * kernel_size * group_size
        assert masks.size(-1) == features.size(-1) * scale_factor
        assert masks.size(-2) == features.size(-2) * scale_factor
        assert features.size(1) % group_size == 0
        assert (kernel_size - 1) % 2 == 0 and kernel_size >= 1
        ctx.kernel_size = kernel_size
        ctx.group_size = group_size
        ctx.scale_factor = scale_factor
        ctx.feature_size = features.size()
        ctx.mask_size = masks.size()

        n, c, h, w = features.size()
        output = features.new_zeros((n, c, h * scale_factor,
                                     w * scale_factor))
        # intermediate buffers consumed by the optimized kernel; rfeatures is
        # also saved for the backward pass
        routput = features.new_zeros(output.size(), requires_grad=False)
        rfeatures = features.new_zeros(features.size(), requires_grad=False)
        rmasks = masks.new_zeros(masks.size(), requires_grad=False)
        ext_module.carafe_forward(
            features,
            masks,
            rfeatures,
            routput,
            rmasks,
            output,
            kernel_size=kernel_size,
            group_size=group_size,
            scale_factor=scale_factor)

        if features.requires_grad or masks.requires_grad:
            ctx.save_for_backward(features, masks, rfeatures)
        return output

    @staticmethod
    def backward(ctx, grad_output):
        assert grad_output.is_cuda

        features, masks, rfeatures = ctx.saved_tensors
        kernel_size = ctx.kernel_size
        group_size = ctx.group_size
        scale_factor = ctx.scale_factor

        rgrad_output = torch.zeros_like(grad_output, requires_grad=False)
        rgrad_input_hs = torch.zeros_like(grad_output, requires_grad=False)
        rgrad_input = torch.zeros_like(features, requires_grad=False)
        rgrad_masks = torch.zeros_like(masks, requires_grad=False)
        grad_input = torch.zeros_like(features, requires_grad=False)
        grad_masks = torch.zeros_like(masks, requires_grad=False)
        ext_module.carafe_backward(
            grad_output.contiguous(),
            rfeatures,
            masks,
            rgrad_output,
            rgrad_input_hs,
            rgrad_input,
            rgrad_masks,
            grad_input,
            grad_masks,
            kernel_size=kernel_size,
            group_size=group_size,
            scale_factor=scale_factor)
        return grad_input, grad_masks, None, None, None


carafe = CARAFEFunction.apply
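# Optimized CARAFE op; same functional interface as carafe_naive above, e.g.:
#   out = carafe(feat, masks, 5, 1, 2)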


class CARAFE(Module):
    """CARAFE: Content-Aware ReAssembly of FEatures

    Please refer to https://arxiv.org/abs/1905.02188 for more details.

    Args:
        kernel_size (int): reassemble kernel size
        group_size (int): reassemble group size
        scale_factor (int): upsample ratio

    Returns:
        upsampled feature map
    """

    def __init__(self, kernel_size, group_size, scale_factor):
        super(CARAFE, self).__init__()

        assert isinstance(kernel_size, int) and isinstance(
            group_size, int) and isinstance(scale_factor, int)
        self.kernel_size = kernel_size
        self.group_size = group_size
        self.scale_factor = scale_factor

    def forward(self, features, masks):
        return carafe(features, masks, self.kernel_size, self.group_size,
                      self.scale_factor)
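

# Illustrative usage sketch (hypothetical shapes; in practice the masks are
# predicted and normalized by CARAFEPack below):
#   up = CARAFE(kernel_size=5, group_size=1, scale_factor=2)
#   feat = torch.rand(2, 64, 32, 32, device='cuda')
#   masks = torch.rand(2, 25, 64, 64, device='cuda').softmax(dim=1)
#   out = up(feat, masks)  # (2, 64, 64, 64)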


@UPSAMPLE_LAYERS.register_module(name='carafe')
class CARAFEPack(nn.Module):
    """A unified package of CARAFE upsampler that contains: 1) channel
    compressor 2) content encoder 3) CARAFE op.

    Official implementation of ICCV 2019 paper
    CARAFE: Content-Aware ReAssembly of FEatures
    Please refer to https://arxiv.org/abs/1905.02188 for more details.

    Args:
        channels (int): input feature channels
        scale_factor (int): upsample ratio
        up_kernel (int): kernel size of CARAFE op
        up_group (int): group size of CARAFE op
        encoder_kernel (int): kernel size of content encoder
        encoder_dilation (int): dilation of content encoder
        compressed_channels (int): output channels of channels compressor

    Returns:
        upsampled feature map
    """

    def __init__(self,
                 channels,
                 scale_factor,
                 up_kernel=5,
                 up_group=1,
                 encoder_kernel=3,
                 encoder_dilation=1,
                 compressed_channels=64):
        super(CARAFEPack, self).__init__()
        self.channels = channels
        self.scale_factor = scale_factor
        self.up_kernel = up_kernel
        self.up_group = up_group
        self.encoder_kernel = encoder_kernel
        self.encoder_dilation = encoder_dilation
        self.compressed_channels = compressed_channels
        self.channel_compressor = nn.Conv2d(channels,
                                            self.compressed_channels, 1)
        # the encoder predicts up_kernel^2 * up_group reassembly kernels for
        # each of the scale_factor^2 sub-positions of every input location;
        # kernel_normalizer later rearranges them with pixel_shuffle
        self.content_encoder = nn.Conv2d(
            self.compressed_channels,
            self.up_kernel * self.up_kernel * self.up_group *
            self.scale_factor * self.scale_factor,
            self.encoder_kernel,
            padding=int((self.encoder_kernel - 1) * self.encoder_dilation / 2),
            dilation=self.encoder_dilation,
            groups=1)
        self.init_weights()

    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                xavier_init(m, distribution='uniform')
        normal_init(self.content_encoder, std=0.001)

    def kernel_normalizer(self, mask):
        mask = F.pixel_shuffle(mask, self.scale_factor)
        n, mask_c, h, w = mask.size()
        # use float division explicitly, to avoid inconsistency when
        # exporting to onnx
        mask_channel = int(mask_c / float(self.up_kernel**2))
        mask = mask.view(n, mask_channel, -1, h, w)

        # softmax over the up_kernel^2 weights, so every reassembly kernel
        # sums to one
        mask = F.softmax(mask, dim=2, dtype=mask.dtype)
        mask = mask.view(n, mask_c, h, w).contiguous()

        return mask
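
    # e.g. with up_kernel=5, up_group=1, scale_factor=2 the encoder emits
    # 100 channels; pixel_shuffle(2) turns (n, 100, h, w) into
    # (n, 25, 2h, 2w), and softmax normalizes each group of 25 weights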

    def feature_reassemble(self, x, mask):
        x = carafe(x, mask, self.up_kernel, self.up_group, self.scale_factor)
        return x

    def forward(self, x):
        # compress channels, predict per-location kernels, normalize them,
        # then reassemble the input features at the upsampled resolution
        compressed_x = self.channel_compressor(x)
        mask = self.content_encoder(compressed_x)
        mask = self.kernel_normalizer(mask)

        x = self.feature_reassemble(x, mask)
        return x
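

if __name__ == '__main__':
    # Minimal smoke test (a sketch, assuming the extension was compiled with
    # CUDA support; the ops above are CUDA-only, so CPU-only machines skip it)
    if torch.cuda.is_available():
        pack = CARAFEPack(channels=16, scale_factor=2).cuda()
        x = torch.rand(2, 16, 8, 8, device='cuda')
        y = pack(x)
        assert y.shape == (2, 16, 16, 16)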
|