AZIIIIIIIIZ's picture
Upload 1039 files
d670799 verified
# Copyright (c) OpenMMLab. All rights reserved.
import torch
import torch.nn as nn
from mmaction.registry import MODELS
from mmaction.utils import SampleList
from .base import BaseRecognizer
@MODELS.register_module()
class Recognizer2D(BaseRecognizer):
"""2D recognizer model framework."""
def extract_feat(self,
inputs: torch.Tensor,
stage: str = 'neck',
data_samples: SampleList = None,
test_mode: bool = False) -> tuple:
"""Extract features of different stages.
Args:
inputs (Tensor): The input data.
stage (str): Which stage to output the feature.
Defaults to ``neck``.
data_samples (List[:obj:`ActionDataSample`]): Action data
samples, which are only needed in training. Defaults to None.
test_mode: (bool): Whether in test mode. Defaults to False.
Returns:
Tensor: The extracted features.
dict: A dict recording the kwargs for downstream
pipeline. These keys are usually included:
``num_segs``, ``fcn_test``, ``loss_aux``.
"""
# Record the kwargs required by `loss` and `predict`.
loss_predict_kwargs = dict()
num_segs = inputs.shape[1]
loss_predict_kwargs['num_segs'] = num_segs
# [N, num_crops * num_segs, C, H, W] ->
# [N * num_crops * num_segs, C, H, W]
# `num_crops` is calculated by:
# 1) `twice_sample` in `SampleFrames`
# 2) `num_sample_positions` in `DenseSampleFrames`
# 3) `ThreeCrop/TenCrop` in `test_pipeline`
# 4) `num_clips` in `SampleFrames` or its subclass if `clip_len != 1`
inputs = inputs.view((-1, ) + inputs.shape[2:])
def forward_once(batch_imgs):
# Extract features through backbone.
if (hasattr(self.backbone, 'features')
and self.backbone_from == 'torchvision'):
x = self.backbone.features(batch_imgs)
elif self.backbone_from == 'timm':
x = self.backbone.forward_features(batch_imgs)
elif self.backbone_from in ['mmcls', 'mmpretrain']:
x = self.backbone(batch_imgs)
if isinstance(x, tuple):
assert len(x) == 1
x = x[0]
else:
x = self.backbone(batch_imgs)
if self.backbone_from in ['torchvision', 'timm']:
if not self.feature_shape:
# Transformer-based feature shape: B x L x C.
if len(x.shape) == 3:
self.feature_shape = 'NLC'
# Resnet-based feature shape: B x C x Hs x Ws.
elif len(x.shape) == 4:
self.feature_shape = 'NCHW'
if self.feature_shape == 'NHWC':
x = nn.AdaptiveAvgPool2d(1)(x.permute(0, 3, 1,
2)) # B x C x 1 x 1
elif self.feature_shape == 'NCHW':
x = nn.AdaptiveAvgPool2d(1)(x) # B x C x 1 x 1
elif self.feature_shape == 'NLC':
x = nn.AdaptiveAvgPool1d(1)(x.transpose(1, 2)) # B x C x 1
x = x.reshape((x.shape[0], -1)) # B x C
x = x.reshape(x.shape + (1, 1)) # B x C x 1 x 1
return x
# Check settings of `fcn_test`.
fcn_test = False
if test_mode:
if self.test_cfg is not None and self.test_cfg.get(
'fcn_test', False):
fcn_test = True
num_segs = self.test_cfg.get('num_segs',
self.backbone.num_segments)
loss_predict_kwargs['fcn_test'] = fcn_test
# inference with batch size of `max_testing_views` if set
if self.test_cfg is not None and self.test_cfg.get(
'max_testing_views', False):
max_testing_views = self.test_cfg.get('max_testing_views')
assert isinstance(max_testing_views, int)
# backbone specify num_segments
num_segments = self.backbone.get('num_segments')
if num_segments is not None:
assert max_testing_views % num_segments == 0, \
'make sure that max_testing_views is a multiple of ' \
'num_segments, but got {max_testing_views} and '\
'{num_segments}'
total_views = inputs.shape[0]
view_ptr = 0
feats = []
while view_ptr < total_views:
batch_imgs = inputs[view_ptr:view_ptr + max_testing_views]
feat = forward_once(batch_imgs)
if self.with_neck:
feat, _ = self.neck(feat)
feats.append(feat)
view_ptr += max_testing_views
def recursively_cat(feats):
# recursively traverse feats until it's a tensor,
# then concat
out_feats = []
for e_idx, elem in enumerate(feats[0]):
batch_elem = [feat[e_idx] for feat in feats]
if not isinstance(elem, torch.Tensor):
batch_elem = recursively_cat(batch_elem)
else:
batch_elem = torch.cat(batch_elem)
out_feats.append(batch_elem)
return tuple(out_feats)
if isinstance(feats[0], tuple):
x = recursively_cat(feats)
else:
x = torch.cat(feats)
else:
x = forward_once(inputs)
else:
x = forward_once(inputs)
# Return features extracted through backbone.
if stage == 'backbone':
return x, loss_predict_kwargs
loss_aux = dict()
if self.with_neck:
# x is a tuple with multiple feature maps.
x = [
each.reshape((-1, num_segs) +
each.shape[1:]).transpose(1, 2).contiguous()
for each in x
]
x, loss_aux = self.neck(x, data_samples=data_samples)
if not fcn_test:
x = x.squeeze(2)
loss_predict_kwargs['num_segs'] = 1
elif fcn_test:
# full convolution (fcn) testing when no neck
# [N * num_crops * num_segs, C', H', W'] ->
# [N * num_crops, C', num_segs, H', W']
x = x.reshape((-1, num_segs) +
x.shape[1:]).transpose(1, 2).contiguous()
loss_predict_kwargs['loss_aux'] = loss_aux
# Return features extracted through neck.
if stage == 'neck':
return x, loss_predict_kwargs
# Return raw logits through head.
if self.with_cls_head and stage == 'head':
# [N * num_crops, num_classes]
x = self.cls_head(x, **loss_predict_kwargs)
return x, loss_predict_kwargs