|
|
import logging |
|
|
import os |
|
|
import sys |
|
|
|
|
|
import numpy as np |
|
|
from torch.utils import data |
|
|
from torchvision.datasets.video_utils import VideoClips |
|
|
|
|
|
|
|
|
class VideoIter(data.Dataset):
    """Iterate fixed-length clips sampled from every video under a directory.

    Each item is one clip of ``clip_length`` frames, taken every
    ``frame_stride`` raw frames, from one of the ``.mp4``/``.avi`` files
    discovered under ``dataset_path``.

    Args:
        clip_length (int): Number of frames returned per clip (after striding).
        frame_stride (int): Temporal stride between the frames kept in a clip.
        proc_video (list or None): Relative names (``"<parent_dir>/<stem>"``,
            extension stripped) of videos already processed and to be skipped.
            ``None`` means nothing has been processed yet.
        dataset_path (str): Root directory walked recursively for video files.
        video_transform (callable or None): Optional transform applied to each
            clip before it is returned.
        use_splits (bool): If True, keep only videos listed in the split file.
        return_label (bool): If True, ``__getitem__`` also returns a binary
            label (0 when ``"Normal"`` appears in the video path, else 1).
    """

    # Cap for __getitem__ retries so a fully broken dataset cannot hang a
    # DataLoader worker forever.
    MAX_RETRIES = 10

    def __init__(self,
                 clip_length,
                 frame_stride,
                 proc_video=None,
                 dataset_path=None,
                 video_transform=None,
                 use_splits=False,
                 return_label=False,):
        super().__init__()

        self.frames_stride = frame_stride
        # Raw frames spanned by one clip before the stride is applied.
        self.total_clip_length_in_frames = clip_length * frame_stride
        self.video_transform = video_transform

        self.dataset_path = dataset_path
        self.video_list = self._get_video_list(dataset_path=self.dataset_path,
                                               proc_video=proc_video,
                                               use_splits=use_splits)
        self.return_label = return_label

        # Non-overlapping clips: the gap between clip starts equals the raw
        # clip span.
        self.video_clips = VideoClips(video_paths=self.video_list,
                                      clip_length_in_frames=self.total_clip_length_in_frames,
                                      frames_between_clips=int(self.total_clip_length_in_frames),
                                      )

    @property
    def video_count(self):
        """int: Number of source videos (not clips) in the dataset."""
        return len(self.video_list)

    def getitem_from_raw_video(self, idx):
        """Load and preprocess the ``idx``-th clip from the dataset.

        Args:
            idx (int): Global clip index (across all videos).

        Returns:
            tuple: ``(video, clip_idx, dir_name, file_stem)`` — or, when
            ``self.return_label`` is True,
            ``(video, label, clip_idx, dir_name, file_stem)``.
        """
        video, _, _, _ = self.video_clips.get_clip(idx)
        video_idx, clip_idx = self.video_clips.get_clip_location(idx)
        video_path = self.video_clips.video_paths[video_idx]

        # Keep every `frames_stride`-th frame of the raw clip.
        in_clip_frames = list(range(0, self.total_clip_length_in_frames, self.frames_stride))
        video = video[in_clip_frames]
        if self.video_transform is not None:
            video = self.video_transform(video)

        # Named to avoid shadowing the builtins `dir` and `file`.
        dir_name, file_name = video_path.split(os.sep)[-2:]
        file_stem = file_name.split('.')[0]

        if self.return_label:
            # Dataset convention: any path not containing "Normal" is anomalous.
            label = 0 if "Normal" in video_path else 1
            return video, label, clip_idx, dir_name, file_stem

        return video, clip_idx, dir_name, file_stem

    def __len__(self):
        return len(self.video_clips)

    def __getitem__(self, index):
        # Corrupt videos are common in large collections: on failure, retry
        # with a random index instead of crashing the worker. Unlike the
        # unbounded retry loop this replaces, the attempt count is capped so
        # a completely broken dataset raises instead of spinning forever.
        last_error = None
        for _ in range(self.MAX_RETRIES):
            try:
                return self.getitem_from_raw_video(index)
            except Exception as e:
                last_error = e
                line = e.__traceback__.tb_lineno
                index = np.random.choice(range(0, len(self)))
                logging.warning(f"VideoIter:: ERROR (line number {line}) !! (Force using another index:\n{index})\n{e}")
        raise RuntimeError(
            f"VideoIter:: failed to load a clip after {self.MAX_RETRIES} attempts"
        ) from last_error

    def _get_video_list(self, dataset_path, proc_video, use_splits):
        """Collect the video files to iterate over.

        Args:
            dataset_path (str): Root directory to walk recursively.
            proc_video (list or None): Already-processed relative names
                (``"<parent_dir>/<stem>"``) to skip; ``None`` skips nothing.
            use_splits (bool): Whether to restrict to the split-file list.

        Returns:
            list: Paths of the unprocessed ``.mp4``/``.avi`` files found.
        """
        assert os.path.exists(dataset_path), "VideoIter:: failed to locate: `{}'".format(dataset_path)

        # Treat a missing `proc_video` as "nothing processed yet"; previously
        # the default (None) crashed with a TypeError on the `in` test below.
        proc_video = set(proc_video) if proc_video is not None else set()

        splits_files = set()
        if use_splits:
            # NOTE(review): hard-coded, machine-specific path — should come
            # from configuration. Read only when splits are actually used so
            # other machines are not forced to have this file.
            with open("/media/DataDrive/yiling/annotation/recognition/splits/VAD/splits_video_list.txt", "r") as split_file:
                splits_files = {line.strip() for line in split_file}

        vid_list = []
        for path, subdirs, files in os.walk(dataset_path):
            for name in files:
                file_path = os.path.join(path, name)
                parent_dir = os.path.basename(os.path.dirname(file_path))
                relative_path = os.path.join(parent_dir, name)
                if use_splits and relative_path not in splits_files:
                    continue
                # Match by extension, not substring, so e.g. "mp4_notes.txt"
                # is not accidentally picked up.
                if not name.lower().endswith(('.mp4', '.avi')):
                    continue
                # splitext strips only the final extension, so multi-dot
                # names ("a.b.mp4") are matched correctly against proc_video.
                if os.path.splitext(relative_path)[0] in proc_video:
                    continue
                vid_list.append(file_path)

        logging.info(f"Found {len(vid_list)} unprocessed video files in {dataset_path}")

        return vid_list
|
|
|
|
|
|