MTFL_UCF_Crime / utils /video_loader.py
erkutt's picture
Upload open source code of MTFL model
28e129b verified
import logging
import os
import sys
import numpy as np
from torch.utils import data
from torchvision.datasets.video_utils import VideoClips
class VideoIter(data.Dataset):
def __init__(self,
clip_length,
frame_stride,
proc_video=None,
dataset_path=None,
video_transform=None,
use_splits=False,
return_label=False,):
super().__init__()
# video clip properties
self.frames_stride = frame_stride
self.total_clip_length_in_frames = clip_length * frame_stride
self.video_transform = video_transform
# IO
self.dataset_path = dataset_path
self.video_list = self._get_video_list(dataset_path=self.dataset_path, proc_video=proc_video, use_splits=use_splits)
self.return_label = return_label
# data loading
self.video_clips = VideoClips(video_paths=self.video_list,
clip_length_in_frames=self.total_clip_length_in_frames,
frames_between_clips=int(self.total_clip_length_in_frames), # between start frame
)
@property
def video_count(self):
return len(self.video_list)
def getitem_from_raw_video(self, idx):
"""
Load and preprocess a video clip from the dataset.
Args:
idx (int): Index of the video clip to load.
Returns:
tuple: A tuple containing the loaded and preprocessed video clip, clip index, directory, and filename.
"""
video, _, _, _ = self.video_clips.get_clip(idx)
video_idx, clip_idx = self.video_clips.get_clip_location(idx)
video_path = self.video_clips.video_paths[video_idx]
in_clip_frames = list(range(0, self.total_clip_length_in_frames, self.frames_stride))
video = video[in_clip_frames]
if self.video_transform is not None:
video = self.video_transform(video)
dir, file = video_path.split(os.sep)[-2:]
file = file.split('.')[0]
if self.return_label:
label = 0 if "Normal" in video_path else 1
return video, label, clip_idx, dir, file
return video, clip_idx, dir, file
def __len__(self):
return len(self.video_clips)
def __getitem__(self, index):
succ = False
while not succ:
try:
batch = self.getitem_from_raw_video(index)
succ = True
except Exception as e:
index = np.random.choice(range(0, self.__len__()))
trace_back = sys.exc_info()[2]
line = trace_back.tb_lineno
logging.warning(f"VideoIter:: ERROR (line number {line}) !! (Force using another index:\n{index})\n{e}")
return batch
def _get_video_list(self, dataset_path, proc_video, use_splits):
"""
Get a list of video files from the dataset path, considering the specified processing options.
Args:
dataset_path (str): Path to the dataset directory.
proc_video (list or None): List of processed video filenames or None.
use_splits (bool): Whether to use splits for data selection.
Returns:
list: List of video file paths.
"""
assert os.path.exists(dataset_path), "VideoIter:: failed to locate: `{}'".format(dataset_path)
with open("/media/DataDrive/yiling/annotation/recognition/splits/VAD/splits_video_list.txt", "r") as file:
splits_files = [line.strip() for line in file.readlines()]
vid_list = []
for path, subdirs, files in os.walk(dataset_path):
for name in files:
file_path = os.path.join(path, name)
parent_dir = os.path.basename(os.path.dirname(file_path))
relative_path = os.path.join(parent_dir, name)
if use_splits and relative_path not in splits_files:
continue
if ('mp4' not in name and 'avi' not in name) or relative_path.split(".")[0] in proc_video:
continue
vid_list.append(os.path.join(path, name))
logging.info(f"Found {len(vid_list)} unprocessed video files in {dataset_path}")
return vid_list