Spaces:
Sleeping
Sleeping
| """ | |
| Converts a dataset of mp4 videos into a dataset of video frames | |
| I.e. a directory of mp4 files becomes a directory of directories of frames | |
| This speeds up loading during training because we do not need | |
| """ | |
| import os | |
| from typing import List | |
| import argparse | |
| from pathlib import Path | |
| from multiprocessing import Pool | |
| from collections import Counter | |
| import numpy as np | |
| from PIL import Image | |
| import torchvision.transforms.functional as TVF | |
| from moviepy.editor import VideoFileClip | |
| from tqdm import tqdm | |
| def convert_videos_into_dataset(video_path: os.PathLike, target_dir: os.PathLike, num_chunks: int, chunk_size: int, start_frame: int, target_size: int, force_fps: int): | |
| assert (num_chunks is None) or (chunk_size is None), "Cant use both num_chunks and chunk_size" | |
| os.makedirs(target_dir, exist_ok=True) | |
| clip = VideoFileClip(video_path) | |
| fps = clip.fps if force_fps is None else force_fps | |
| num_frames_total = int(np.floor(clip.duration * fps)) - start_frame | |
| if num_chunks is None: | |
| num_chunks = num_frames_total // chunk_size | |
| else: | |
| chunk_size = num_frames_total // num_chunks | |
| num_frames_to_save = chunk_size * num_chunks | |
| print(f'Processing the video at {fps} fps. {num_frames_total} frames in total. We have {num_chunks} videos of {chunk_size} frames each.') | |
| current_chunk_idx = 0 | |
| frame_idx = -start_frame | |
| curr_chunk_dir = os.path.join(target_dir, f'{current_chunk_idx:06d}') | |
| for frame in tqdm(clip.iter_frames(fps=fps), total=num_frames_total + start_frame): | |
| if frame_idx >= 0: | |
| os.makedirs(curr_chunk_dir, exist_ok=True) | |
| frame = Image.fromarray(frame) | |
| frame = TVF.center_crop(frame, output_size=min(frame.size)) | |
| frame = TVF.resize(frame, size=target_size, interpolation=Image.LANCZOS) | |
| frame.save(os.path.join(curr_chunk_dir, f'{frame_idx % chunk_size:06d}.jpg'), q=95) | |
| frame_idx += 1 | |
| if frame_idx % chunk_size == 0 and frame_idx > 0: | |
| current_chunk_idx += 1 | |
| curr_chunk_dir = os.path.join(target_dir, f'{current_chunk_idx:06d}') | |
| if frame_idx == num_frames_to_save: | |
| # Stop here so not to have a partially-filled chunk | |
| break | |
| chunk_sizes = [len(os.listdir(d)) for d in listdir_full_paths(target_dir)] | |
| assert len(set(chunk_sizes)) == 1, f"Bad chunk sizes: {set(chunk_sizes)}" | |
| print('Finished successfully!') | |
| def listdir_full_paths(d) -> List[os.PathLike]: | |
| return sorted([os.path.join(d, x) for x in os.listdir(d)]) | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description='Convert a long video into a dataset of frame dirs') | |
| parser.add_argument('-s', '--source_video_path', type=str, help='Path to the source video') | |
| parser.add_argument('-t', '--target_dir', type=str, help='Where to save the new dataset') | |
| parser.add_argument('-n', '--num_chunks', type=int, help='How many samples should there be in the dataset?') | |
| parser.add_argument('-cs', '--chunk_size', type=int, help='Each video length. Should be used separately from num_chunks') | |
| parser.add_argument('-sf', '--start_frame', type=int, default=0, help='Start frame idx. Should we skip several frames?') | |
| parser.add_argument('--target_size', type=int, default=128, help='What size should we resize to?') | |
| parser.add_argument('--force_fps', type=int, help='What fps should we run videos with?') | |
| args = parser.parse_args() | |
| convert_videos_into_dataset( | |
| video_path=args.source_video_path, | |
| target_dir=args.target_dir, | |
| num_chunks=args.num_chunks, | |
| chunk_size=args.chunk_size, | |
| start_frame=args.start_frame, | |
| target_size=args.target_size, | |
| force_fps=args.force_fps, | |
| ) | |