| """ |
| this code is base on https://github.com/hikvision-research/opera/blob/main/opera/models/utils/transformer.py |
| """ |
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math
import warnings

import numpy as np
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle import ParamAttr

from ppdet.core.workspace import register
from ..layers import MultiHeadAttention, _convert_attention_mask
from .utils import _get_clones
from ..initializer import linear_init_, normal_, constant_, xavier_uniform_

__all__ = [
    'PETRTransformer', 'MultiScaleDeformablePoseAttention',
    'PETR_TransformerDecoderLayer', 'PETR_TransformerDecoder',
    'PETR_DeformableDetrTransformerDecoder',
    'PETR_DeformableTransformerDecoder', 'TransformerEncoderLayer',
    'TransformerEncoder', 'MSDeformableAttention'
]

def masked_fill(x, mask, value):
    y = paddle.full(x.shape, value, x.dtype)
    return paddle.where(mask, y, x)

def inverse_sigmoid(x, eps=1e-5):
    """Inverse function of sigmoid.

    Args:
        x (Tensor): The tensor to apply the inverse to.
        eps (float): EPS to avoid numerical overflow. Defaults to 1e-5.
    Returns:
        Tensor: The inverse of sigmoid applied to x, with the same shape
            as the input.
    """
    x = x.clip(min=0, max=1)
    x1 = x.clip(min=eps)
    x2 = (1 - x).clip(min=eps)
    return paddle.log(x1 / x2)
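
# Illustrative round-trip check (for reference only, not executed on import):
#   x = paddle.to_tensor([0.25, 0.5, 0.75])
#   paddle.allclose(F.sigmoid(inverse_sigmoid(x)), x, atol=1e-4)  # evaluates to True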


@register
class TransformerEncoderLayer(nn.Layer):
    __inject__ = ['attn']

    def __init__(self,
                 d_model,
                 attn=None,
                 nhead=8,
                 dim_feedforward=2048,
                 dropout=0.1,
                 activation="relu",
                 attn_dropout=None,
                 act_dropout=None,
                 normalize_before=False):
        super(TransformerEncoderLayer, self).__init__()
        attn_dropout = dropout if attn_dropout is None else attn_dropout
        act_dropout = dropout if act_dropout is None else act_dropout
        self.normalize_before = normalize_before
        self.embed_dims = d_model

        if attn is None:
            self.self_attn = MultiHeadAttention(d_model, nhead, attn_dropout)
        else:
            self.self_attn = attn

        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(act_dropout, mode="upscale_in_train")
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout, mode="upscale_in_train")
        self.dropout2 = nn.Dropout(dropout, mode="upscale_in_train")
        self.activation = getattr(F, activation)
        self._reset_parameters()

    def _reset_parameters(self):
        linear_init_(self.linear1)
        linear_init_(self.linear2)

    @staticmethod
    def with_pos_embed(tensor, pos_embed):
        return tensor if pos_embed is None else tensor + pos_embed

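    # Depending on `normalize_before`, the layer runs pre-norm (LayerNorm is
    # applied before attention / FFN) or post-norm (LayerNorm after the
    # residual additions); both paths share the same parameters.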
    def forward(self, src, src_mask=None, pos_embed=None, **kwargs):
        residual = src
        if self.normalize_before:
            src = self.norm1(src)
        q = k = self.with_pos_embed(src, pos_embed)
        src = self.self_attn(q, k, value=src, attn_mask=src_mask, **kwargs)

        src = residual + self.dropout1(src)
        if not self.normalize_before:
            src = self.norm1(src)

        residual = src
        if self.normalize_before:
            src = self.norm2(src)
        src = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = residual + self.dropout2(src)
        if not self.normalize_before:
            src = self.norm2(src)
        return src

@register
class TransformerEncoder(nn.Layer):
    __inject__ = ['encoder_layer']

    def __init__(self, encoder_layer, num_layers, norm=None):
        super(TransformerEncoder, self).__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm
        self.embed_dims = encoder_layer.embed_dims

    def forward(self, src, src_mask=None, pos_embed=None, **kwargs):
        output = src
        for layer in self.layers:
            output = layer(
                output, src_mask=src_mask, pos_embed=pos_embed, **kwargs)

        if self.norm is not None:
            output = self.norm(output)

        return output

@register
class MSDeformableAttention(nn.Layer):
    def __init__(self,
                 embed_dim=256,
                 num_heads=8,
                 num_levels=4,
                 num_points=4,
                 lr_mult=0.1):
        """
        Multi-Scale Deformable Attention Module
        """
        super(MSDeformableAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.num_levels = num_levels
        self.num_points = num_points
        self.total_points = num_heads * num_levels * num_points

        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == self.embed_dim, "embed_dim must be divisible by num_heads"

        self.sampling_offsets = nn.Linear(
            embed_dim,
            self.total_points * 2,
            weight_attr=ParamAttr(learning_rate=lr_mult),
            bias_attr=ParamAttr(learning_rate=lr_mult))

        self.attention_weights = nn.Linear(embed_dim, self.total_points)
        self.value_proj = nn.Linear(embed_dim, embed_dim)
        self.output_proj = nn.Linear(embed_dim, embed_dim)
        try:
            # prefer the compiled CUDA op when it is available
            from deformable_detr_ops import ms_deformable_attn
            print("use deformable_detr_ops in ms_deformable_attn")
        except Exception:
            # fall back to the pure-Paddle implementation
            from .utils import deformable_attention_core_func as ms_deformable_attn
        self.ms_deformable_attn_core = ms_deformable_attn

        self._reset_parameters()

    def _reset_parameters(self):
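        # Deformable-DETR-style initialization: the offset weights start at
        # zero and the bias points each head along a distinct direction on a
        # unit grid, scaled by the sampling-point index.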
        constant_(self.sampling_offsets.weight)
        thetas = paddle.arange(
            self.num_heads,
            dtype=paddle.float32) * (2.0 * math.pi / self.num_heads)
        grid_init = paddle.stack([thetas.cos(), thetas.sin()], -1)
        grid_init = grid_init / grid_init.abs().max(-1, keepdim=True)
        grid_init = grid_init.reshape([self.num_heads, 1, 1, 2]).tile(
            [1, self.num_levels, self.num_points, 1])
        scaling = paddle.arange(
            1, self.num_points + 1,
            dtype=paddle.float32).reshape([1, 1, -1, 1])
        grid_init *= scaling
        self.sampling_offsets.bias.set_value(grid_init.flatten())

        constant_(self.attention_weights.weight)
        constant_(self.attention_weights.bias)

        xavier_uniform_(self.value_proj.weight)
        constant_(self.value_proj.bias)
        xavier_uniform_(self.output_proj.weight)
        constant_(self.output_proj.bias)

    def forward(self,
                query,
                key,
                value,
                reference_points,
                value_spatial_shapes,
                value_level_start_index,
                attn_mask=None,
                **kwargs):
        """
        Args:
            query (Tensor): [bs, query_length, C]
            reference_points (Tensor): [bs, query_length, n_levels, 2], range in [0, 1], top-left (0,0),
                bottom-right (1, 1), including padding area
            value (Tensor): [bs, value_length, C]
            value_spatial_shapes (Tensor): [n_levels, 2], [(H_0, W_0), (H_1, W_1), ..., (H_{L-1}, W_{L-1})]
            value_level_start_index (Tensor(int64)): [n_levels], [0, H_0*W_0, H_0*W_0+H_1*W_1, ...]
            attn_mask (Tensor): [bs, value_length], True for non-padding elements, False for padding elements

        Returns:
            output (Tensor): [bs, Length_{query}, C]
        """
        bs, Len_q = query.shape[:2]
        Len_v = value.shape[1]
        assert int(value_spatial_shapes.prod(1).sum()) == Len_v

        value = self.value_proj(value)
        if attn_mask is not None:
            attn_mask = attn_mask.astype(value.dtype).unsqueeze(-1)
            value *= attn_mask
        value = value.reshape([bs, Len_v, self.num_heads, self.head_dim])

        sampling_offsets = self.sampling_offsets(query).reshape(
            [bs, Len_q, self.num_heads, self.num_levels, self.num_points, 2])
        attention_weights = self.attention_weights(query).reshape(
            [bs, Len_q, self.num_heads, self.num_levels * self.num_points])
        attention_weights = F.softmax(attention_weights).reshape(
            [bs, Len_q, self.num_heads, self.num_levels, self.num_points])

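        # 2-d reference points are (cx, cy) normalized to [0, 1], so offsets
        # are normalized by each level's (W, H); 4-d reference points are
        # (cx, cy, w, h) boxes, so offsets are scaled by half the box size.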
        if reference_points.shape[-1] == 2:
            offset_normalizer = value_spatial_shapes.flip([1]).reshape(
                [1, 1, 1, self.num_levels, 1, 2])
            sampling_locations = reference_points.reshape([
                bs, Len_q, 1, self.num_levels, 1, 2
            ]) + sampling_offsets / offset_normalizer
        elif reference_points.shape[-1] == 4:
            sampling_locations = (
                reference_points[:, :, None, :, None, :2] + sampling_offsets /
                self.num_points * reference_points[:, :, None, :, None, 2:] *
                0.5)
        else:
            raise ValueError(
                "Last dim of reference_points must be 2 or 4, but get {} instead.".
                format(reference_points.shape[-1]))

        output = self.ms_deformable_attn_core(
            value, value_spatial_shapes, value_level_start_index,
            sampling_locations, attention_weights)
        output = self.output_proj(output)

        return output

@register
class MultiScaleDeformablePoseAttention(nn.Layer):
    """An attention module used in PETR. `End-to-End Multi-Person
    Pose Estimation with Transformers`.

    Args:
        embed_dims (int): The embedding dimension of Attention.
            Default: 256.
        num_heads (int): Parallel attention heads. Default: 8.
        num_levels (int): The number of feature maps used in
            Attention. Default: 4.
        num_points (int): The number of sampling points for
            each query in each head. Default: 17.
        im2col_step (int): The step used in image_to_column.
            Default: 64.
        dropout (float): A Dropout layer on `inp_residual`.
            Default: 0.1.
        init_cfg (obj:`mmcv.ConfigDict`): The Config for initialization.
            Default: None.
    """

    def __init__(self,
                 embed_dims=256,
                 num_heads=8,
                 num_levels=4,
                 num_points=17,
                 im2col_step=64,
                 dropout=0.1,
                 norm_cfg=None,
                 init_cfg=None,
                 batch_first=False,
                 lr_mult=0.1):
        super().__init__()
        if embed_dims % num_heads != 0:
            raise ValueError(f'embed_dims must be divisible by num_heads, '
                             f'but got {embed_dims} and {num_heads}')
        dim_per_head = embed_dims // num_heads
        self.norm_cfg = norm_cfg
        self.init_cfg = init_cfg
        self.dropout = nn.Dropout(dropout)
        self.batch_first = batch_first

        def _is_power_of_2(n):
            if (not isinstance(n, int)) or (n < 0):
                raise ValueError(
                    'invalid input for _is_power_of_2: {} (type: {})'.format(
                        n, type(n)))
            return (n & (n - 1) == 0) and n != 0

        if not _is_power_of_2(dim_per_head):
            warnings.warn("You'd better set embed_dims in "
                          'MultiScaleDeformAttention to make '
                          'the dimension of each attention head a power of 2 '
                          'which is more efficient in our CUDA implementation.')

        self.im2col_step = im2col_step
        self.embed_dims = embed_dims
        self.num_levels = num_levels
        self.num_heads = num_heads
        self.num_points = num_points
        self.sampling_offsets = nn.Linear(
            embed_dims,
            num_heads * num_levels * num_points * 2,
            weight_attr=ParamAttr(learning_rate=lr_mult),
            bias_attr=ParamAttr(learning_rate=lr_mult))
        self.attention_weights = nn.Linear(embed_dims,
                                           num_heads * num_levels * num_points)
        self.value_proj = nn.Linear(embed_dims, embed_dims)
        self.output_proj = nn.Linear(embed_dims, embed_dims)

        try:
            # prefer the compiled CUDA op when it is available
            from deformable_detr_ops import ms_deformable_attn
        except Exception:
            # fall back to the pure-Paddle implementation
            from .utils import deformable_attention_core_func as ms_deformable_attn
        self.ms_deformable_attn_core = ms_deformable_attn

        self.init_weights()

    def init_weights(self):
        """Default initialization for Parameters of Module."""
        constant_(self.sampling_offsets.weight)
        constant_(self.sampling_offsets.bias)
        constant_(self.attention_weights.weight)
        constant_(self.attention_weights.bias)
        xavier_uniform_(self.value_proj.weight)
        constant_(self.value_proj.bias)
        xavier_uniform_(self.output_proj.weight)
        constant_(self.output_proj.bias)

    def forward(self,
                query,
                key,
                value,
                residual=None,
                attn_mask=None,
                reference_points=None,
                value_spatial_shapes=None,
                value_level_start_index=None,
                **kwargs):
        """Forward Function of MultiScaleDeformAttention.

        Args:
            query (Tensor): Query of Transformer with shape
                (num_query, bs, embed_dims).
            key (Tensor): The key tensor with shape (num_key, bs, embed_dims).
            value (Tensor): The value tensor with shape
                (num_key, bs, embed_dims).
            residual (Tensor): The tensor used for addition, with the
                same shape as `x`. Default None. If None, `x` will be used.
            reference_points (Tensor): The normalized reference points with
                shape (bs, num_query, num_levels, K*2), all elements are in
                range [0, 1], top-left (0,0), bottom-right (1, 1), including
                padding area.
            attn_mask (Tensor): ByteTensor for `query`, with
                shape [bs, num_key].
            value_spatial_shapes (Tensor): Spatial shape of features in
                different levels. With shape (num_levels, 2),
                last dimension represents (h, w).
            value_level_start_index (Tensor): The start index of each level.
                A tensor has shape (num_levels) and can be represented
                as [0, h_0*w_0, h_0*w_0+h_1*w_1, ...].

        Returns:
            Tensor: forwarded results with shape [num_query, bs, embed_dims].
        """

        if key is None:
            key = query
        if value is None:
            value = key

        bs, num_query, _ = query.shape
        bs, num_key, _ = value.shape
        assert (value_spatial_shapes[:, 0].numpy() *
                value_spatial_shapes[:, 1].numpy()).sum() == num_key

        value = self.value_proj(value)
        if attn_mask is not None:
            # zero out padded positions (attn_mask is non-zero for valid elements)
            value *= attn_mask.astype(value.dtype).unsqueeze(-1)
        value = value.reshape([bs, num_key, self.num_heads, -1])
        sampling_offsets = self.sampling_offsets(query).reshape([
            bs, num_query, self.num_heads, self.num_levels, self.num_points, 2
        ])
        attention_weights = self.attention_weights(query).reshape(
            [bs, num_query, self.num_heads, self.num_levels * self.num_points])
        attention_weights = F.softmax(attention_weights, axis=-1)

        attention_weights = attention_weights.reshape(
            [bs, num_query, self.num_heads, self.num_levels, self.num_points])
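        # reference_points carries K normalized keypoint coordinates per query
        # (last dim = 2K); the tight box (w, h) around those keypoints scales
        # the learned offsets so every head samples near the predicted pose.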
        if reference_points.shape[-1] == self.num_points * 2:
            reference_points_reshape = reference_points.reshape(
                (bs, num_query, self.num_levels, -1, 2)).unsqueeze(2)
            x1 = reference_points[:, :, :, 0::2].min(axis=-1, keepdim=True)
            y1 = reference_points[:, :, :, 1::2].min(axis=-1, keepdim=True)
            x2 = reference_points[:, :, :, 0::2].max(axis=-1, keepdim=True)
            y2 = reference_points[:, :, :, 1::2].max(axis=-1, keepdim=True)
            w = paddle.clip(x2 - x1, min=1e-4)
            h = paddle.clip(y2 - y1, min=1e-4)
            wh = paddle.concat([w, h], axis=-1)[:, :, None, :, None, :]

            sampling_locations = reference_points_reshape \
                + sampling_offsets * wh * 0.5
        else:
            raise ValueError(
                f'Last dim of reference_points must be'
                f' 2K, but get {reference_points.shape[-1]} instead.')

        output = self.ms_deformable_attn_core(
            value, value_spatial_shapes, value_level_start_index,
            sampling_locations, attention_weights)

        output = self.output_proj(output)
        return output

@register
class PETR_TransformerDecoderLayer(nn.Layer):
    __inject__ = ['self_attn', 'cross_attn']

    def __init__(self,
                 d_model,
                 nhead=8,
                 self_attn=None,
                 cross_attn=None,
                 dim_feedforward=2048,
                 dropout=0.1,
                 activation="relu",
                 attn_dropout=None,
                 act_dropout=None,
                 normalize_before=False):
        super(PETR_TransformerDecoderLayer, self).__init__()
        attn_dropout = dropout if attn_dropout is None else attn_dropout
        act_dropout = dropout if act_dropout is None else act_dropout
        self.normalize_before = normalize_before

        if self_attn is None:
            self.self_attn = MultiHeadAttention(d_model, nhead, attn_dropout)
        else:
            self.self_attn = self_attn
        if cross_attn is None:
            self.cross_attn = MultiHeadAttention(d_model, nhead, attn_dropout)
        else:
            self.cross_attn = cross_attn

        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(act_dropout, mode="upscale_in_train")
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout, mode="upscale_in_train")
        self.dropout2 = nn.Dropout(dropout, mode="upscale_in_train")
        self.dropout3 = nn.Dropout(dropout, mode="upscale_in_train")
        self.activation = getattr(F, activation)
        self._reset_parameters()

    def _reset_parameters(self):
        linear_init_(self.linear1)
        linear_init_(self.linear2)

    @staticmethod
    def with_pos_embed(tensor, pos_embed):
        return tensor if pos_embed is None else tensor + pos_embed

    def forward(self,
                tgt,
                memory,
                tgt_mask=None,
                memory_mask=None,
                pos_embed=None,
                query_pos_embed=None,
                **kwargs):
        tgt_mask = _convert_attention_mask(tgt_mask, tgt.dtype)

        residual = tgt
        if self.normalize_before:
            tgt = self.norm1(tgt)
        q = k = self.with_pos_embed(tgt, query_pos_embed)
        tgt = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask)
        tgt = residual + self.dropout1(tgt)
        if not self.normalize_before:
            tgt = self.norm1(tgt)

        residual = tgt
        if self.normalize_before:
            tgt = self.norm2(tgt)
        q = self.with_pos_embed(tgt, query_pos_embed)
        key_tmp = tgt
        tgt = self.cross_attn(
            q, key=key_tmp, value=memory, attn_mask=memory_mask, **kwargs)
        tgt = residual + self.dropout2(tgt)
        if not self.normalize_before:
            tgt = self.norm2(tgt)

        residual = tgt
        if self.normalize_before:
            tgt = self.norm3(tgt)
        tgt = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = residual + self.dropout3(tgt)
        if not self.normalize_before:
            tgt = self.norm3(tgt)
        return tgt

@register
class PETR_TransformerDecoder(nn.Layer):
    """Implements the decoder in PETR transformer.

    Args:
        return_intermediate (bool): Whether to return intermediate outputs.
        coder_norm_cfg (dict): Config of last normalization layer. Default:
            `LN`.
    """
    __inject__ = ['decoder_layer']

    def __init__(self,
                 decoder_layer,
                 num_layers,
                 norm=None,
                 return_intermediate=False,
                 num_keypoints=17,
                 **kwargs):
        super(PETR_TransformerDecoder, self).__init__()
        self.layers = _get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm
        self.return_intermediate = return_intermediate
        self.num_keypoints = num_keypoints

    def forward(self,
                query,
                *args,
                reference_points=None,
                valid_ratios=None,
                kpt_branches=None,
                **kwargs):
        """Forward function for `TransformerDecoder`.

        Args:
            query (Tensor): Input query with shape (num_query, bs, embed_dims).
            reference_points (Tensor): The reference points of offset,
                has shape (bs, num_query, K*2).
            valid_ratios (Tensor): The ratios of valid points on the feature
                map, has shape (bs, num_levels, 2).
            kpt_branches (obj:`nn.LayerList`): Used for refining the
                regression results. Only would be passed when `with_box_refine`
                is True, otherwise would be passed a `None`.

        Returns:
            tuple (Tensor): Results with shape [1, num_query, bs, embed_dims] when
                return_intermediate is `False`, otherwise it has shape
                [num_layers, num_query, bs, embed_dims] and
                [num_layers, bs, num_query, K*2].
        """
        output = query
        intermediate = []
        intermediate_reference_points = []
        for lid, layer in enumerate(self.layers):
            if reference_points.shape[-1] == self.num_keypoints * 2:
                reference_points_input = \
                    reference_points[:, :, None] * \
                    valid_ratios.tile((1, 1, self.num_keypoints))[:, None]
            else:
                assert reference_points.shape[-1] == 2
                reference_points_input = reference_points[:, :, None] * \
                    valid_ratios[:, None]
            output = layer(
                output,
                *args,
                reference_points=reference_points_input,
                **kwargs)

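            # Iterative refinement: the keypoint branch predicts a delta in
            # logit space, which is added to inverse_sigmoid(reference_points)
            # and squashed back to [0, 1]; detach() stops gradients between
            # decoder layers.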
            if kpt_branches is not None:
                tmp = kpt_branches[lid](output)
                if reference_points.shape[-1] == self.num_keypoints * 2:
                    new_reference_points = tmp + inverse_sigmoid(
                        reference_points)
                    new_reference_points = F.sigmoid(new_reference_points)
                else:
                    raise NotImplementedError
                reference_points = new_reference_points.detach()

            if self.return_intermediate:
                intermediate.append(output)
                intermediate_reference_points.append(reference_points)

        if self.return_intermediate:
            return paddle.stack(intermediate), paddle.stack(
                intermediate_reference_points)

        return output, reference_points

@register
class PETR_DeformableTransformerDecoder(nn.Layer):
    __inject__ = ['decoder_layer']

    def __init__(self, decoder_layer, num_layers, return_intermediate=False):
        super(PETR_DeformableTransformerDecoder, self).__init__()
        self.layers = _get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers
        self.return_intermediate = return_intermediate

    def forward(self,
                tgt,
                reference_points,
                memory,
                memory_spatial_shapes,
                memory_mask=None,
                query_pos_embed=None):
        output = tgt
        intermediate = []
        for lid, layer in enumerate(self.layers):
            output = layer(output, reference_points, memory,
                           memory_spatial_shapes, memory_mask, query_pos_embed)

            if self.return_intermediate:
                intermediate.append(output)

        if self.return_intermediate:
            return paddle.stack(intermediate)

        return output.unsqueeze(0)

@register
class PETR_DeformableDetrTransformerDecoder(PETR_DeformableTransformerDecoder):
    """Implements the decoder in DETR transformer.

    Args:
        return_intermediate (bool): Whether to return intermediate outputs.
        coder_norm_cfg (dict): Config of last normalization layer. Default:
            `LN`.
    """

    def __init__(self, *args, return_intermediate=False, **kwargs):
        super(PETR_DeformableDetrTransformerDecoder, self).__init__(*args,
                                                                    **kwargs)
        self.return_intermediate = return_intermediate

    def forward(self,
                query,
                *args,
                reference_points=None,
                valid_ratios=None,
                reg_branches=None,
                **kwargs):
        """Forward function for `TransformerDecoder`.

        Args:
            query (Tensor): Input query with shape
                `(num_query, bs, embed_dims)`.
            reference_points (Tensor): The reference
                points of offset, has shape
                (bs, num_query, 4) when as_two_stage,
                otherwise has shape (bs, num_query, 2).
            valid_ratios (Tensor): The ratios of valid
                points on the feature map, has shape
                (bs, num_levels, 2).
            reg_branches (obj:`nn.LayerList`): Used for
                refining the regression results. Only would
                be passed when with_box_refine is True,
                otherwise would be passed a `None`.

        Returns:
            Tensor: Results with shape [1, num_query, bs, embed_dims] when
                return_intermediate is `False`, otherwise it has shape
                [num_layers, num_query, bs, embed_dims].
        """
        output = query
        intermediate = []
        intermediate_reference_points = []
        for lid, layer in enumerate(self.layers):
            if reference_points.shape[-1] == 4:
                reference_points_input = reference_points[:, :, None] * \
                    paddle.concat([valid_ratios, valid_ratios], -1)[:, None]
            else:
                assert reference_points.shape[-1] == 2
                reference_points_input = reference_points[:, :, None] * \
                    valid_ratios[:, None]
            output = layer(
                output,
                *args,
                reference_points=reference_points_input,
                **kwargs)

            if reg_branches is not None:
                tmp = reg_branches[lid](output)
                if reference_points.shape[-1] == 4:
                    new_reference_points = tmp + inverse_sigmoid(
                        reference_points)
                    new_reference_points = F.sigmoid(new_reference_points)
                else:
                    assert reference_points.shape[-1] == 2
                    new_reference_points = tmp
                    new_reference_points[..., :2] = tmp[
                        ..., :2] + inverse_sigmoid(reference_points)
                    new_reference_points = F.sigmoid(new_reference_points)
                reference_points = new_reference_points.detach()

            if self.return_intermediate:
                intermediate.append(output)
                intermediate_reference_points.append(reference_points)

        if self.return_intermediate:
            return paddle.stack(intermediate), paddle.stack(
                intermediate_reference_points)

        return output, reference_points

@register
class PETRTransformer(nn.Layer):
    """Implements the PETR transformer.

    Args:
        as_two_stage (bool): Generate query from encoder features.
            Default: False.
        num_feature_levels (int): Number of feature maps from FPN.
            Default: 4.
        two_stage_num_proposals (int): Number of proposals when set
            `as_two_stage` as True. Default: 300.
    """
    __inject__ = ["encoder", "decoder", "hm_encoder", "refine_decoder"]

    def __init__(self,
                 encoder="",
                 decoder="",
                 hm_encoder="",
                 refine_decoder="",
                 as_two_stage=True,
                 num_feature_levels=4,
                 two_stage_num_proposals=300,
                 num_keypoints=17,
                 **kwargs):
        super(PETRTransformer, self).__init__(**kwargs)
        self.as_two_stage = as_two_stage
        self.num_feature_levels = num_feature_levels
        self.two_stage_num_proposals = two_stage_num_proposals
        self.num_keypoints = num_keypoints
        self.encoder = encoder
        self.decoder = decoder
        self.embed_dims = self.encoder.embed_dims
        self.hm_encoder = hm_encoder
        self.refine_decoder = refine_decoder
        self.init_layers()
        self.init_weights()

    def init_layers(self):
        """Initialize layers of the DeformableDetrTransformer."""
        self.level_embeds = paddle.create_parameter(
            (self.num_feature_levels, self.embed_dims), dtype="float32")

        if self.as_two_stage:
            self.enc_output = nn.Linear(self.embed_dims, self.embed_dims)
            self.enc_output_norm = nn.LayerNorm(self.embed_dims)
            self.refine_query_embedding = nn.Embedding(self.num_keypoints,
                                                       self.embed_dims * 2)
        else:
            self.reference_points = nn.Linear(self.embed_dims,
                                              2 * self.num_keypoints)

    def init_weights(self):
        """Initialize the transformer weights."""
        for p in self.parameters():
            if p.rank() > 1:
                xavier_uniform_(p)
                if hasattr(p, 'bias') and p.bias is not None:
                    constant_(p.bias)
        for m in self.sublayers():
            if isinstance(m, MSDeformableAttention):
                m._reset_parameters()
        for m in self.sublayers():
            if isinstance(m, MultiScaleDeformablePoseAttention):
                m.init_weights()
        if not self.as_two_stage:
            xavier_uniform_(self.reference_points.weight)
            constant_(self.reference_points.bias)
        normal_(self.level_embeds)
        normal_(self.refine_query_embedding.weight)

    def gen_encoder_output_proposals(self, memory, memory_padding_mask,
                                     spatial_shapes):
        """Generate proposals from encoded memory.

        Args:
            memory (Tensor): The output of encoder, has shape
                (bs, num_key, embed_dim). num_key is equal to the number of
                points on feature maps from all levels.
            memory_padding_mask (Tensor): Padding mask for memory,
                has shape (bs, num_key).
            spatial_shapes (Tensor): The shape of all feature maps,
                has shape (num_level, 2).

        Returns:
            tuple: A tuple of feature map and bbox prediction.

                - output_memory (Tensor): The input of decoder, has shape
                  (bs, num_key, embed_dim). num_key is equal to the number of
                  points on feature maps from all levels.
                - output_proposals (Tensor): The normalized proposal
                  after an inverse sigmoid, has shape (bs, num_keys, 4).
        """

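        # Each feature-map location becomes one proposal: its (x, y) center in
        # normalized coordinates, mapped through an inverse sigmoid. Padded
        # locations and near-border proposals are masked out with +inf so they
        # can never be selected.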
        N, S, C = memory.shape
        proposals = []
        _cur = 0
        for lvl, (H, W) in enumerate(spatial_shapes):
            mask_flatten_ = memory_padding_mask[:, _cur:(_cur + H * W)].reshape(
                [N, H, W, 1])
            valid_H = paddle.sum(mask_flatten_[:, :, 0, 0], 1)
            valid_W = paddle.sum(mask_flatten_[:, 0, :, 0], 1)

            grid_y, grid_x = paddle.meshgrid(
                paddle.linspace(
                    0, H - 1, H, dtype="float32"),
                paddle.linspace(
                    0, W - 1, W, dtype="float32"))
            grid = paddle.concat([grid_x.unsqueeze(-1), grid_y.unsqueeze(-1)],
                                 -1)

            scale = paddle.concat(
                [valid_W.unsqueeze(-1),
                 valid_H.unsqueeze(-1)], 1).reshape([N, 1, 1, 2])
            grid = (grid.unsqueeze(0).expand((N, -1, -1, -1)) + 0.5) / scale
            proposal = grid.reshape([N, -1, 2])
            proposals.append(proposal)
            _cur += (H * W)
        output_proposals = paddle.concat(proposals, 1)
        output_proposals_valid = ((output_proposals > 0.01) &
                                  (output_proposals < 0.99)).all(
                                      -1, keepdim=True).astype("bool")
        output_proposals = paddle.log(output_proposals / (1 - output_proposals))
        output_proposals = masked_fill(
            output_proposals, ~memory_padding_mask.astype("bool").unsqueeze(-1),
            float('inf'))
        output_proposals = masked_fill(output_proposals,
                                       ~output_proposals_valid, float('inf'))

        output_memory = memory
        output_memory = masked_fill(
            output_memory, ~memory_padding_mask.astype("bool").unsqueeze(-1),
            float(0))
        output_memory = masked_fill(output_memory, ~output_proposals_valid,
                                    float(0))
        output_memory = self.enc_output_norm(self.enc_output(output_memory))
        return output_memory, output_proposals

    @staticmethod
    def get_reference_points(spatial_shapes, valid_ratios):
        """Get the reference points used in decoder.

        Args:
            spatial_shapes (Tensor): The shape of all feature maps,
                has shape (num_level, 2).
            valid_ratios (Tensor): The ratios of valid points on the
                feature map, has shape (bs, num_levels, 2).

        Returns:
            Tensor: reference points used in decoder, has
                shape (bs, num_keys, num_levels, 2).
        """
        reference_points_list = []
        for lvl, (H, W) in enumerate(spatial_shapes):
            ref_y, ref_x = paddle.meshgrid(
                paddle.linspace(
                    0.5, H - 0.5, H, dtype="float32"),
                paddle.linspace(
                    0.5, W - 0.5, W, dtype="float32"))
            ref_y = ref_y.reshape(
                (-1, ))[None] / (valid_ratios[:, None, lvl, 1] * H)
            ref_x = ref_x.reshape(
                (-1, ))[None] / (valid_ratios[:, None, lvl, 0] * W)
            ref = paddle.stack((ref_x, ref_y), -1)
            reference_points_list.append(ref)
        reference_points = paddle.concat(reference_points_list, 1)
        reference_points = reference_points[:, :, None] * valid_ratios[:, None]
        return reference_points

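    # get_valid_ratio assumes the masks mark valid (non-padded) pixels with
    # non-zero values, as they do elsewhere in this file: summing the first
    # column/row counts the valid height/width, so the returned ratios give
    # the fraction of each feature map covered by real image content.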
    def get_valid_ratio(self, mask):
        """Get the valid ratios of feature maps of all levels."""
        _, H, W = mask.shape
        valid_H = paddle.sum(mask[:, :, 0].astype('float'), 1)
        valid_W = paddle.sum(mask[:, 0, :].astype('float'), 1)
        valid_ratio_h = valid_H.astype('float') / H
        valid_ratio_w = valid_W.astype('float') / W
        valid_ratio = paddle.stack([valid_ratio_w, valid_ratio_h], -1)
        return valid_ratio

    def get_proposal_pos_embed(self,
                               proposals,
                               num_pos_feats=128,
                               temperature=10000):
        """Get the position embedding of proposal."""
        scale = 2 * math.pi
        dim_t = paddle.arange(num_pos_feats, dtype="float32")
        dim_t = temperature**(2 * (dim_t // 2) / num_pos_feats)

        proposals = F.sigmoid(proposals) * scale

        pos = proposals[:, :, :, None] / dim_t

        pos = paddle.stack(
            (pos[:, :, :, 0::2].sin(), pos[:, :, :, 1::2].cos()),
            axis=4).flatten(2)
        return pos

    def forward(self,
                mlvl_feats,
                mlvl_masks,
                query_embed,
                mlvl_pos_embeds,
                kpt_branches=None,
                cls_branches=None):
        """Forward function for `Transformer`.

        Args:
            mlvl_feats (list(Tensor)): Input queries from different levels.
                Each element has shape [bs, embed_dims, h, w].
            mlvl_masks (list(Tensor)): The key_padding_mask from different
                levels used for encoder and decoder, each element has shape
                [bs, h, w].
            query_embed (Tensor): The query embedding for decoder,
                with shape [num_query, c].
            mlvl_pos_embeds (list(Tensor)): The positional encoding
                of feats from different levels, has the shape
                [bs, embed_dims, h, w].
            kpt_branches (obj:`nn.LayerList`): Keypoint regression heads for
                feature maps from each decoder layer. Only would be passed
                when `with_box_refine` is True. Default to None.
            cls_branches (obj:`nn.LayerList`): Classification heads for
                feature maps from each decoder layer. Only would be passed
                when `as_two_stage` is True. Default to None.

        Returns:
            tuple[Tensor]: results of decoder containing the following tensors.

                - inter_states: Outputs from decoder. If
                  `return_intermediate_dec` is True output has shape
                  (num_dec_layers, bs, num_query, embed_dims), else has
                  shape (1, bs, num_query, embed_dims).
                - init_reference_out: The initial value of reference
                  points, has shape (bs, num_queries, 4).
                - inter_references_out: The internal value of reference
                  points in decoder, has shape
                  (num_dec_layers, bs, num_query, embed_dims).
                - enc_outputs_class: The classification score of proposals
                  generated from encoder's feature maps, has shape
                  (batch, h*w, num_classes).
                  Only would be returned when `as_two_stage` is True,
                  otherwise None.
                - enc_outputs_kpt_unact: The regression results generated from
                  encoder's feature maps, has shape (batch, h*w, K*2).
                  Only would be returned when `as_two_stage` is True,
                  otherwise None.
        """
        assert self.as_two_stage or query_embed is not None

        feat_flatten = []
        mask_flatten = []
        lvl_pos_embed_flatten = []
        spatial_shapes = []
        for lvl, (feat, mask, pos_embed
                  ) in enumerate(zip(mlvl_feats, mlvl_masks, mlvl_pos_embeds)):
            bs, c, h, w = feat.shape
            spatial_shape = (h, w)
            spatial_shapes.append(spatial_shape)
            feat = feat.flatten(2).transpose((0, 2, 1))
            mask = mask.flatten(1)
            pos_embed = pos_embed.flatten(2).transpose((0, 2, 1))
            lvl_pos_embed = pos_embed + self.level_embeds[lvl].reshape(
                [1, 1, -1])
            lvl_pos_embed_flatten.append(lvl_pos_embed)
            feat_flatten.append(feat)
            mask_flatten.append(mask)
        feat_flatten = paddle.concat(feat_flatten, 1)
        mask_flatten = paddle.concat(mask_flatten, 1)
        lvl_pos_embed_flatten = paddle.concat(lvl_pos_embed_flatten, 1)
        spatial_shapes_cumsum = paddle.to_tensor(
            np.array(spatial_shapes).prod(1).cumsum(0))
        spatial_shapes = paddle.to_tensor(spatial_shapes, dtype="int64")
        level_start_index = paddle.concat((paddle.zeros(
            (1, ), dtype=spatial_shapes.dtype), spatial_shapes_cumsum[:-1]))
        valid_ratios = paddle.stack(
            [self.get_valid_ratio(m) for m in mlvl_masks], 1)

        reference_points = \
            self.get_reference_points(spatial_shapes, valid_ratios)

        memory = self.encoder(
            src=feat_flatten,
            pos_embed=lvl_pos_embed_flatten,
            src_mask=mask_flatten,
            value_spatial_shapes=spatial_shapes,
            reference_points=reference_points,
            value_level_start_index=level_start_index,
            valid_ratios=valid_ratios)

        bs, _, c = memory.shape

        hm_proto = None
        if self.training:
            hm_memory = paddle.slice(
                memory,
                starts=level_start_index[0],
                ends=level_start_index[1],
                axes=[1])
            hm_pos_embed = paddle.slice(
                lvl_pos_embed_flatten,
                starts=level_start_index[0],
                ends=level_start_index[1],
                axes=[1])
            hm_mask = paddle.slice(
                mask_flatten,
                starts=level_start_index[0],
                ends=level_start_index[1],
                axes=[1])
            hm_reference_points = paddle.slice(
                reference_points,
                starts=level_start_index[0],
                ends=level_start_index[1],
                axes=[1])[:, :, :1, :]

            hm_memory = self.hm_encoder(
                src=hm_memory,
                pos_embed=hm_pos_embed,
                src_mask=hm_mask,
                value_spatial_shapes=spatial_shapes[[0]],
                reference_points=hm_reference_points,
                value_level_start_index=level_start_index[0],
                valid_ratios=valid_ratios[:, :1, :])
            hm_memory = hm_memory.reshape((bs, spatial_shapes[0, 0],
                                           spatial_shapes[0, 1], -1))
            hm_proto = (hm_memory, mlvl_masks[0])

        if self.as_two_stage:
            output_memory, output_proposals = \
                self.gen_encoder_output_proposals(
                    memory, mask_flatten, spatial_shapes)
            enc_outputs_class = cls_branches[self.decoder.num_layers](
                output_memory)
            enc_outputs_kpt_unact = \
                kpt_branches[self.decoder.num_layers](output_memory)
            enc_outputs_kpt_unact[..., 0::2] += output_proposals[..., 0:1]
            enc_outputs_kpt_unact[..., 1::2] += output_proposals[..., 1:2]

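            # Two-stage selection: keep the top-k proposals by foreground
            # score and use their (detached) keypoint predictions, passed
            # through a sigmoid, as initial reference points for the decoder.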
            topk = self.two_stage_num_proposals
            topk_proposals = paddle.topk(
                enc_outputs_class[..., 0], topk, axis=1)[1].unsqueeze(-1)

            topk_kpts_unact = paddle.take_along_axis(enc_outputs_kpt_unact,
                                                     topk_proposals, 1)
            topk_kpts_unact = topk_kpts_unact.detach()

            reference_points = F.sigmoid(topk_kpts_unact)
            init_reference_out = reference_points

            query_pos, query = paddle.split(
                query_embed, query_embed.shape[1] // c, axis=1)
            query_pos = query_pos.unsqueeze(0).expand((bs, -1, -1))
            query = query.unsqueeze(0).expand((bs, -1, -1))
        else:
            query_pos, query = paddle.split(
                query_embed, query_embed.shape[1] // c, axis=1)
            query_pos = query_pos.unsqueeze(0).expand((bs, -1, -1))
            query = query.unsqueeze(0).expand((bs, -1, -1))
            reference_points = F.sigmoid(self.reference_points(query_pos))
            init_reference_out = reference_points

        inter_states, inter_references = self.decoder(
            query=query,
            memory=memory,
            query_pos_embed=query_pos,
            memory_mask=mask_flatten,
            reference_points=reference_points,
            value_spatial_shapes=spatial_shapes,
            value_level_start_index=level_start_index,
            valid_ratios=valid_ratios,
            kpt_branches=kpt_branches)

        inter_references_out = inter_references
        if self.as_two_stage:
            return inter_states, init_reference_out, \
                inter_references_out, enc_outputs_class, \
                enc_outputs_kpt_unact, hm_proto, memory
        return inter_states, init_reference_out, \
            inter_references_out, None, None, None, None, None, hm_proto

    def forward_refine(self,
                       mlvl_masks,
                       memory,
                       reference_points_pose,
                       img_inds,
                       kpt_branches=None,
                       **kwargs):
        mask_flatten = []
        spatial_shapes = []
        for lvl, mask in enumerate(mlvl_masks):
            bs, h, w = mask.shape
            spatial_shape = (h, w)
            spatial_shapes.append(spatial_shape)
            mask = mask.flatten(1)
            mask_flatten.append(mask)
        mask_flatten = paddle.concat(mask_flatten, 1)
        spatial_shapes_cumsum = paddle.to_tensor(
            np.array(
                spatial_shapes, dtype='int64').prod(1).cumsum(0))
        spatial_shapes = paddle.to_tensor(spatial_shapes, dtype="int64")
        level_start_index = paddle.concat((paddle.zeros(
            (1, ), dtype=spatial_shapes.dtype), spatial_shapes_cumsum[:-1]))
        valid_ratios = paddle.stack(
            [self.get_valid_ratio(m) for m in mlvl_masks], 1)

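        # Each detected pose gets its own set of refinement queries: the
        # learned keypoint query embeddings are split into (positional,
        # content) halves and tiled once per pose, while the encoder memory
        # and masks are gathered by the pose's image index.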
        refine_query_embedding = self.refine_query_embedding.weight
        query_pos, query = paddle.split(refine_query_embedding, 2, axis=1)
        pos_num = reference_points_pose.shape[0]
        query_pos = query_pos.unsqueeze(0).expand((pos_num, -1, -1))
        query = query.unsqueeze(0).expand((pos_num, -1, -1))
        reference_points = reference_points_pose.reshape(
            (pos_num, reference_points_pose.shape[1] // 2, 2))
        pos_memory = memory[img_inds]
        mask_flatten = mask_flatten[img_inds]
        valid_ratios = valid_ratios[img_inds]
        if img_inds.size == 1:
            pos_memory = pos_memory.unsqueeze(0)
            mask_flatten = mask_flatten.unsqueeze(0)
            valid_ratios = valid_ratios.unsqueeze(0)
        inter_states, inter_references = self.refine_decoder(
            query=query,
            memory=pos_memory,
            query_pos_embed=query_pos,
            memory_mask=mask_flatten,
            reference_points=reference_points,
            value_spatial_shapes=spatial_shapes,
            value_level_start_index=level_start_index,
            valid_ratios=valid_ratios,
            reg_branches=kpt_branches,
            **kwargs)

        init_reference_out = reference_points
        return inter_states, init_reference_out, inter_references