|
|
|
|
|
import copy
|
|
|
import os.path as osp
|
|
|
import platform
|
|
|
|
|
|
import mmcv
|
|
|
import numpy as np
|
|
|
import pytest
|
|
|
import torch
|
|
|
from mmengine.testing import assert_dict_has_keys
|
|
|
from numpy.testing import assert_array_almost_equal
|
|
|
|
|
|
from mmaction.datasets.transforms import (DecordDecode, DecordInit,
|
|
|
GenerateLocalizationLabels,
|
|
|
LoadAudioFeature, LoadHVULabel,
|
|
|
LoadLocalizationFeature,
|
|
|
LoadProposals, LoadRGBFromFile,
|
|
|
OpenCVDecode, OpenCVInit, PIMSDecode,
|
|
|
PIMSInit, PyAVDecode,
|
|
|
PyAVDecodeMotionVector, PyAVInit)
|
|
|
|
|
|
from mmaction.datasets.transforms import RawFrameDecode
|
|
|
|
|
|
|
|
|
class BaseTestLoading:
    """Common fixtures shared by the loading/decoding transform tests.

    ``setup_class`` stores sample-data paths and template ``results`` dicts
    as class attributes so that subclasses can copy and feed them to the
    transforms under test.
    """

    @classmethod
    def setup_class(cls):
        """Populate the class with sample paths and template result dicts."""
        # All sample assets are resolved relative to this test file.
        root = osp.normpath(osp.join(osp.dirname(__file__), '../../data'))

        def under_root(name):
            return osp.join(root, name)

        cls.data_prefix = root
        cls.img_path = under_root('test.jpg')
        cls.video_path = under_root('test.mp4')
        cls.wav_path = under_root('test.wav')
        cls.audio_spec_path = under_root('test.npy')
        cls.img_dir = under_root('imgs')
        cls.raw_feature_dir = under_root('activitynet_features')
        cls.bsp_feature_dir = under_root('bsp_features')
        cls.proposals_dir = under_root('proposals')

        cls.total_frames = 5
        cls.filename_tmpl = 'img_{:05}.jpg'
        cls.flow_filename_tmpl = '{}_{:05d}.jpg'

        # The audio templates reuse the frame count read from the video.
        video_total_frames = len(mmcv.VideoReader(cls.video_path))
        cls.audio_total_frames = video_total_frames

        cls.video_results = {
            'filename': cls.video_path,
            'label': 1,
            'total_frames': video_total_frames,
            'start_index': 0,
        }
        cls.audio_results = {
            'audios': np.random.randn(1280, ),
            'audio_path': cls.wav_path,
            'total_frames': cls.audio_total_frames,
            'label': 1,
            'start_index': 0,
        }
        cls.audio_feature_results = {
            'audios': np.random.randn(128, 80),
            'audio_path': cls.audio_spec_path,
            'total_frames': cls.audio_total_frames,
            'label': 1,
            'start_index': 0,
        }
        cls.frame_results = {
            'frame_dir': cls.img_dir,
            'total_frames': cls.total_frames,
            'filename_tmpl': cls.filename_tmpl,
            'start_index': 1,
            'modality': 'RGB',
            'offset': 0,
            'label': 1,
        }
        cls.flow_frame_results = {
            'frame_dir': cls.img_dir,
            'total_frames': cls.total_frames,
            'filename_tmpl': cls.flow_filename_tmpl,
            'modality': 'Flow',
            'offset': 0,
            'label': 1,
        }
        cls.action_results = {
            'video_name': 'v_test1',
            'data_prefix': cls.raw_feature_dir,
            'temporal_scale': 5,
            'boundary_ratio': 0.1,
            'duration_second': 10,
            'duration_frame': 10,
            'feature_frame': 8,
            'annotations': [{
                'segment': [3.0, 5.0],
                'label': 'Rock climbing'
            }],
            'feature_path': osp.join(cls.raw_feature_dir, 'v_test1.csv'),
        }

        cls.ava_results = {
            'fps': 30,
            'timestamp': 902,
            'timestamp_start': 840,
            'shot_info': (0, 27000),
        }

        # Two HVU samples over the same category layout; each one leaves a
        # different category without any label.
        hvu_categories = ['action', 'object', 'scene', 'concept']
        hvu_category_nums = [2, 5, 3, 2]
        cls.hvu_label_example1 = {
            'categories': list(hvu_categories),
            'category_nums': list(hvu_category_nums),
            'label': {
                'action': [0],
                'object': [2, 3],
                'scene': [0, 1]
            },
        }
        cls.hvu_label_example2 = {
            'categories': list(hvu_categories),
            'category_nums': list(hvu_category_nums),
            'label': {
                'action': [1],
                'scene': [1, 2],
                'concept': [1]
            },
        }
|
|
|
|
|
|
|
|
|
class TestDecode(BaseTestLoading):
    """Tests for the video and raw-frame init/decode transforms.

    The ``_``-prefixed helpers hold the setup and assertion sequences that
    are repeated across cases; pytest does not collect them as tests.
    """

    def _open_video(self, init_transform, frame_inds):
        """Copy the video template, set ``frame_inds`` and open a reader.

        Args:
            init_transform: Init transform instance (e.g. ``PyAVInit()``)
                that is called on the copied results dict.
            frame_inds (np.ndarray): Frame indices to request; the tests
                use both 1-D arrays and column-shaped 2-D arrays.

        Returns:
            dict: The copied results dict, with ``video_reader`` taken from
            the init transform's output.
        """
        video_result = copy.deepcopy(self.video_results)
        video_result['frame_inds'] = frame_inds
        init_result = init_transform(video_result)
        video_result['video_reader'] = init_result['video_reader']
        return video_result

    @staticmethod
    def _check_video_decode(decode_transform, video_result):
        """Run ``decode_transform`` and assert the expected keys and shapes.

        Args:
            decode_transform: Decode transform instance under test.
            video_result (dict): Results dict holding ``frame_inds`` and an
                opened ``video_reader``.

        Returns:
            dict: The output of ``decode_transform``.
        """
        decode_result = decode_transform(video_result)
        assert assert_dict_has_keys(
            decode_result, ['frame_inds', 'imgs', 'original_shape'])
        assert decode_result['original_shape'] == (256, 340)
        # One decoded image per requested index.
        assert np.shape(decode_result['imgs']) == (len(
            video_result['frame_inds']), 256, 340, 3)
        return decode_result

    @staticmethod
    def _check_rawframe_decode(template,
                               frame_inds,
                               num_channels,
                               extra_inputs=None,
                               **decode_kwargs):
        """Decode raw frames from disk and assert keys and shapes.

        Args:
            template (dict): Results template; it is deep-copied first.
            frame_inds (np.ndarray): Frame indices to load.
            num_channels (int): Expected size of the last ``imgs`` axis.
            extra_inputs (dict | None): Extra entries merged into the
                inputs before decoding. Defaults to None.
            **decode_kwargs: Extra keyword arguments for
                ``RawFrameDecode`` besides ``io_backend='disk'``.

        Returns:
            RawFrameDecode: The transform instance that was exercised.
        """
        inputs = copy.deepcopy(template)
        inputs['frame_inds'] = frame_inds
        if extra_inputs:
            inputs.update(extra_inputs)
        frame_selector = RawFrameDecode(io_backend='disk', **decode_kwargs)
        results = frame_selector(inputs)
        assert assert_dict_has_keys(
            results, ['frame_inds', 'imgs', 'original_shape', 'modality'])
        assert np.shape(results['imgs']) == (len(inputs['frame_inds']), 240,
                                             320, num_channels)
        assert results['original_shape'] == (240, 320)
        return frame_selector

    def test_pyav_init(self):
        """``PyAVInit`` adds a reader and the frame count of the video."""
        video_result = copy.deepcopy(self.video_results)
        pyav_init = PyAVInit()
        pyav_init_result = pyav_init(video_result)
        assert assert_dict_has_keys(pyav_init_result,
                                    ['video_reader', 'total_frames'])
        assert pyav_init_result['total_frames'] == 300
        assert repr(
            pyav_init) == f'{pyav_init.__class__.__name__}(io_backend=disk)'

    def test_pyav_decode(self):
        """``PyAVDecode`` over index shapes, threading flags and modes."""
        total = self.total_frames

        # column-shaped indices starting at 0
        video_result = self._open_video(
            PyAVInit(),
            np.arange(0, total, 2)[:, np.newaxis])
        pyav_decode = PyAVDecode()
        self._check_video_decode(pyav_decode, video_result)
        assert repr(pyav_decode) == (f'{pyav_decode.__class__.__name__}('
                                     f'multi_thread={False}, mode=accurate)')

        # 1-D indices starting at 0
        video_result = self._open_video(PyAVInit(), np.arange(0, total, 5))
        self._check_video_decode(PyAVDecode(), video_result)

        # 1-D indices starting at 0, multi-threaded
        video_result = self._open_video(PyAVInit(), np.arange(0, total, 5))
        pyav_decode = PyAVDecode(multi_thread=True)
        self._check_video_decode(pyav_decode, video_result)
        assert repr(pyav_decode) == (f'{pyav_decode.__class__.__name__}('
                                     f'multi_thread={True}, mode=accurate)')

        # column-shaped indices starting at 1
        video_result = self._open_video(
            PyAVInit(),
            np.arange(1, total, 2)[:, np.newaxis])
        self._check_video_decode(PyAVDecode(), video_result)

        # 1-D indices starting at 1
        video_result = self._open_video(PyAVInit(), np.arange(1, total, 5))
        self._check_video_decode(PyAVDecode(), video_result)

        # 1-D indices starting at 1, multi-threaded
        video_result = self._open_video(PyAVInit(), np.arange(1, total, 5))
        self._check_video_decode(PyAVDecode(multi_thread=True), video_result)

        # 'efficient' mode: the reader is expected to be cleared afterwards
        video_result = self._open_video(PyAVInit(), np.arange(1, total, 5))
        pyav_decode = PyAVDecode(multi_thread=True, mode='efficient')
        pyav_decode_result = self._check_video_decode(pyav_decode,
                                                      video_result)
        assert pyav_decode_result['video_reader'] is None

        assert (repr(pyav_decode) == pyav_decode.__class__.__name__ +
                f'(multi_thread={True}, mode=efficient)')

    def test_pims_init(self):
        """``PIMSInit`` in default and 'efficient' mode."""
        target_keys = ['video_reader', 'total_frames']
        video_result = copy.deepcopy(self.video_results)

        pims_init = PIMSInit()
        pims_init_result = pims_init(video_result)
        assert assert_dict_has_keys(pims_init_result, target_keys)
        assert pims_init_result['total_frames'] == 300

        # The second init runs on the same (already initialised) dict.
        pims_init = PIMSInit(mode='efficient')
        pims_init_result = pims_init(video_result)
        assert assert_dict_has_keys(pims_init_result, target_keys)
        assert pims_init_result['total_frames'] == 300

        assert repr(pims_init) == (f'{pims_init.__class__.__name__}'
                                   f'(io_backend=disk, mode=efficient)')

    def test_pims_decode(self):
        """``PIMSDecode`` applied directly to the output of ``PIMSInit``."""
        target_keys = ['frame_inds', 'imgs', 'original_shape']

        video_result = copy.deepcopy(self.video_results)
        video_result['frame_inds'] = np.arange(0, self.total_frames,
                                               2)[:, np.newaxis]
        pims_init_result = PIMSInit()(video_result)

        pims_decode_result = PIMSDecode()(pims_init_result)
        assert assert_dict_has_keys(pims_decode_result, target_keys)
        assert pims_decode_result['original_shape'] == (256, 340)
        assert np.shape(pims_decode_result['imgs']) == (len(
            video_result['frame_inds']), 256, 340, 3)

    def test_decord_init(self):
        """``DecordInit`` adds a reader, the frame count and average fps."""
        video_result = copy.deepcopy(self.video_results)
        decord_init = DecordInit()
        decord_init_result = decord_init(video_result)
        assert assert_dict_has_keys(
            decord_init_result, ['video_reader', 'total_frames', 'avg_fps'])
        assert decord_init_result['total_frames'] == len(
            decord_init_result['video_reader'])
        assert decord_init_result['avg_fps'] == 30

        assert repr(decord_init) == (f'{decord_init.__class__.__name__}('
                                     f'io_backend=disk, '
                                     f'num_threads=1)')

    def test_decord_decode(self):
        """``DecordDecode`` over index shapes in both decoding modes."""
        total = self.total_frames

        # column-shaped indices starting at 0
        video_result = self._open_video(
            DecordInit(),
            np.arange(0, total, 3)[:, np.newaxis])
        self._check_video_decode(DecordDecode(), video_result)

        # 1-D indices starting at 0
        video_result = self._open_video(DecordInit(), np.arange(0, total, 3))
        self._check_video_decode(DecordDecode(), video_result)

        # column-shaped indices starting at 0, 'efficient' mode
        video_result = self._open_video(
            DecordInit(),
            np.arange(0, total, 3)[:, np.newaxis])
        self._check_video_decode(DecordDecode(mode='efficient'), video_result)

        # 1-D indices starting at 1, 'efficient' mode
        video_result = self._open_video(DecordInit(), np.arange(1, total, 3))
        decord_decode = DecordDecode(mode='efficient')
        self._check_video_decode(decord_decode, video_result)
        assert repr(decord_decode) == (f'{decord_decode.__class__.__name__}('
                                       f'mode=efficient)')

    def test_opencv_init(self):
        """``OpenCVInit`` adds a new path, a reader and the frame count."""
        video_result = copy.deepcopy(self.video_results)
        opencv_init = OpenCVInit()
        opencv_init_result = opencv_init(video_result)
        assert assert_dict_has_keys(
            opencv_init_result, ['new_path', 'video_reader', 'total_frames'])
        assert opencv_init_result['total_frames'] == len(
            opencv_init_result['video_reader'])
        assert repr(opencv_init) == (f'{opencv_init.__class__.__name__}('
                                     f'io_backend=disk)')

    def test_opencv_decode(self):
        """``OpenCVDecode`` over column-shaped and 1-D index inputs."""
        total = self.total_frames

        # column-shaped indices starting at 0
        video_result = self._open_video(
            OpenCVInit(),
            np.arange(0, total, 2)[:, np.newaxis])
        self._check_video_decode(OpenCVDecode(), video_result)

        # column-shaped indices starting at 1
        video_result = self._open_video(
            OpenCVInit(),
            np.arange(1, total, 2)[:, np.newaxis])
        self._check_video_decode(OpenCVDecode(), video_result)

        # 1-D indices starting at 0. This case used to be prepared and then
        # discarded without being decoded, so nothing was verified for it.
        video_result = self._open_video(OpenCVInit(), np.arange(0, total, 3))
        self._check_video_decode(OpenCVDecode(), video_result)

        # 1-D indices starting at 1
        video_result = self._open_video(OpenCVInit(), np.arange(1, total, 3))
        self._check_video_decode(OpenCVDecode(), video_result)

    def test_rawframe_decode(self):
        """``RawFrameDecode`` on RGB and Flow frame folders."""
        total = self.total_frames
        rgb = self.frame_results
        flow = self.flow_frame_results

        # Cases built from ``arange(0, ...)`` are shifted by one before
        # decoding (presumably because the sample frame files are numbered
        # from 1 -- ``frame_results`` declares ``start_index=1``).

        # RGB, column-shaped indices, with boxes and proposals present
        self._check_rawframe_decode(
            rgb,
            np.arange(0, total, 2)[:, np.newaxis] + 1,
            3,
            extra_inputs=dict(
                gt_bboxes=np.array([[0, 0, 1, 1]]),
                proposals=np.array([[0, 0, 1, 1]])))

        # RGB, column-shaped indices starting at 1
        self._check_rawframe_decode(rgb,
                                    np.arange(1, total, 2)[:, np.newaxis], 3)

        # RGB, 1-D indices holding a single frame
        self._check_rawframe_decode(rgb, np.arange(0, total, 5) + 1, 3)
        self._check_rawframe_decode(rgb, np.arange(1, total, 5), 3)

        # RGB, 1-D indices holding several frames
        self._check_rawframe_decode(rgb, np.arange(0, total, 2) + 1, 3)
        self._check_rawframe_decode(rgb, np.arange(1, total, 2), 3)

        # Flow: the decoded images are expected to have two channels
        self._check_rawframe_decode(flow, np.arange(0, total, 2) + 1, 2)
        self._check_rawframe_decode(flow, np.arange(1, total, 2), 2)

        # NOTE(review): this unconditional return makes the turbojpeg cases
        # below unreachable. It looks deliberate (presumably turbojpeg is
        # not available where these tests run) -- confirm before removing.
        return

        if platform.system() != 'Windows':
            # RGB, 1-D single-frame indices, turbojpeg decoding backend
            self._check_rawframe_decode(
                rgb,
                np.arange(0, total, 5) + 1,
                3,
                decoding_backend='turbojpeg')

            frame_selector = self._check_rawframe_decode(
                rgb, np.arange(1, total, 5), 3, decoding_backend='turbojpeg')
            assert repr(frame_selector) == (
                f'{frame_selector.__class__.__name__}(io_backend=disk, '
                f'decoding_backend=turbojpeg)')

    def test_pyav_decode_motion_vector(self):
        """``PyAVDecodeMotionVector`` yields a ``motion_vectors`` entry."""
        target_keys = ['motion_vectors']

        # Same 32 indices, first column-shaped and then 1-D.
        index_variants = (
            np.arange(0, 32, 1)[:, np.newaxis],
            np.arange(0, 32, 1),
        )
        for frame_inds in index_variants:
            pyav_init = PyAVInit()
            pyav = PyAVDecodeMotionVector()
            results = {
                'filename': self.video_path,
                'frame_inds': frame_inds
            }
            results = pyav_init(results)
            results = pyav(results)
            assert assert_dict_has_keys(results, target_keys)
|
|
|
|
|
|
|
|
|
class TestLoad(BaseTestLoading):
    """Tests for the label, feature and proposal loading transforms."""

    def test_load_hvu_label(self):
        """``LoadHVULabel`` builds label, mask and category-mask tensors."""
        sample_a = copy.deepcopy(self.hvu_label_example1)
        sample_b = copy.deepcopy(self.hvu_label_example2)
        # Sizes are read before the loader is handed the sample dict.
        num_categories = len(sample_a['categories'])
        num_tags = sum(sample_a['category_nums'])

        loader = LoadHVULabel()
        loader_name = loader.__class__.__name__
        assert repr(loader) == (f'{loader_name}('
                                f'hvu_initialized={False})')

        # The repr is expected to flip to initialised after the first call.
        output_a = loader(sample_a)
        assert repr(loader) == (f'{loader_name}('
                                f'hvu_initialized={True})')

        # First sample: labels in action/object/scene, none in concept.
        want_label = torch.zeros(num_tags)
        want_label[[0, 4, 5, 7, 8]] = 1.
        want_mask = torch.zeros(num_tags)
        want_mask[:10] = 1.
        want_category_mask = torch.zeros(num_categories)
        want_category_mask[:3] = 1.

        assert torch.eq(want_label, output_a['label']).all()
        assert torch.eq(want_mask, output_a['mask']).all()
        assert torch.eq(want_category_mask, output_a['category_mask']).all()

        # Second sample: labels in action/scene/concept, none in object.
        output_b = loader(sample_b)
        want_label = torch.zeros(num_tags)
        want_label[[1, 8, 9, 11]] = 1.
        want_mask = torch.zeros(num_tags)
        want_mask[:2] = 1.
        want_mask[7:] = 1.
        want_category_mask = torch.zeros(num_categories)
        want_category_mask[[0, 2, 3]] = 1.

        assert torch.eq(want_label, output_b['label']).all()
        assert torch.eq(want_mask, output_b['mask']).all()
        assert torch.eq(want_category_mask, output_b['category_mask']).all()

    def test_load_localization_feature(self):
        """``LoadLocalizationFeature`` reads the sample feature file."""
        expected_keys = ['raw_feature']
        sample = copy.deepcopy(self.action_results)

        # An unsupported positional argument must be rejected.
        with pytest.raises(TypeError):
            LoadLocalizationFeature('unsupport_ext')

        transform = LoadLocalizationFeature()
        output = transform(sample)
        assert assert_dict_has_keys(output, expected_keys)
        assert output['raw_feature'].shape == (400, 5)
        assert repr(transform) == f'{transform.__class__.__name__}'

    def test_load_proposals(self):
        """``LoadProposals`` returns the top-k proposals and features."""
        expected_keys = [
            'bsp_feature', 'tmin', 'tmax', 'tmin_score', 'tmax_score',
            'reference_temporal_iou'
        ]
        sample = copy.deepcopy(self.action_results)

        # Unsupported proposal extension.
        with pytest.raises(NotImplementedError):
            LoadProposals(5, self.proposals_dir, self.bsp_feature_dir,
                          'unsupport_ext')

        # Unsupported feature extension.
        with pytest.raises(NotImplementedError):
            LoadProposals(5, self.proposals_dir, self.bsp_feature_dir, '.csv',
                          'unsupport_ext')

        transform = LoadProposals(5, self.proposals_dir, self.bsp_feature_dir)
        output = transform(sample)
        assert assert_dict_has_keys(output, expected_keys)
        assert output['bsp_feature'].shape[0] == 5

        # Every per-proposal field holds five values matching the fixture.
        expected_values = {
            'tmin': np.arange(0.1, 0.6, 0.1),
            'tmax': np.arange(0.2, 0.7, 0.1),
            'tmin_score': np.arange(0.95, 0.90, -0.01),
            'tmax_score': np.arange(0.96, 0.91, -0.01),
            'reference_temporal_iou': np.arange(0.85, 0.80, -0.01),
        }
        for field, values in expected_values.items():
            assert output[field].shape == (5, )
            assert_array_almost_equal(output[field], values, decimal=4)

        assert repr(transform) == (
            f'{transform.__class__.__name__}('
            f'top_k={5}, '
            f'pgm_proposals_dir={self.proposals_dir}, '
            f'pgm_features_dir={self.bsp_feature_dir}, '
            f'proposal_ext=.csv, '
            f'feature_ext=.npy)')

    def test_load_audio_feature(self):
        """``LoadAudioFeature`` with an existing and a missing file."""
        expected_keys = ['audios']

        # Path pointing at the sample spectrogram file.
        sample = copy.deepcopy(self.audio_feature_results)
        transform = LoadAudioFeature()
        output = transform(sample)
        assert assert_dict_has_keys(output, expected_keys)

        # Path that does not exist: a (640, 80) array is still expected.
        sample = copy.deepcopy(self.audio_feature_results)
        sample['audio_path'] = 'foo/foo/bar.npy'
        transform = LoadAudioFeature()
        output = transform(sample)
        assert output['audios'].shape == (640, 80)
        assert assert_dict_has_keys(output, expected_keys)
        assert repr(transform) == (f'{transform.__class__.__name__}('
                                   f'pad_method=zero)')
|
|
|
|
|
|
|
|
|
class TestLocalization(BaseTestLoading):
    """Tests for the temporal-localization label generation transform."""

    def test_generate_localization_label(self):
        """``GenerateLocalizationLabels`` emits the expected ``gt_bbox``."""
        sample = copy.deepcopy(self.action_results)
        sample['raw_feature'] = np.random.randn(400, 5)

        expected_keys = ['gt_bbox']
        transform = GenerateLocalizationLabels()
        output = transform(sample)
        assert assert_dict_has_keys(output, expected_keys)

        # Expected box for the fixture's single [3.0, 5.0] segment.
        assert_array_almost_equal(
            output['gt_bbox'], [[0.375, 0.625]], decimal=4)
|
|
|
|
|
|
|
|
|
class TestLoadImageFromFile:
    """Tests for ``LoadRGBFromFile``."""

    def test_load_img(self):
        """Load the sample image, then exercise float output and bad paths."""
        data_prefix = osp.join(osp.dirname(__file__), '../../data')
        sample_path = osp.join(data_prefix, 'test.jpg')

        # Default settings.
        loader = LoadRGBFromFile()
        results = {'img_path': sample_path}
        results = loader(copy.deepcopy(results))
        assert results['img_path'] == sample_path
        assert results['img'].shape == (240, 320, 3)
        assert results['img'].dtype == np.uint8
        assert results['img_shape'] == (240, 320)
        assert results['ori_shape'] == (240, 320)
        expected_repr = (
            loader.__class__.__name__ +
            "(ignore_empty=False, to_float32=False, color_type='color', " +
            "imdecode_backend='cv2', io_backend='disk')")
        assert repr(loader) == expected_repr

        # to_float32=True, applied to a copy of the previous output dict.
        loader = LoadRGBFromFile(to_float32=True)
        results = loader(copy.deepcopy(results))
        assert results['img'].dtype == np.float32

        # Missing file: raises unless ignore_empty is set, then yields None.
        results['img_path'] = osp.join(data_prefix, 'fake.jpg')
        loader = LoadRGBFromFile(ignore_empty=False)
        with pytest.raises(FileNotFoundError):
            loader(copy.deepcopy(results))
        loader = LoadRGBFromFile(ignore_empty=True)
        assert loader(copy.deepcopy(results)) is None
|
|
|
|