| | import os |
| | import subprocess |
| |
|
| | import cv2 |
| | from imageio_ffmpeg import get_ffmpeg_exe |
| | from mmengine.logging import print_log |
| | from moviepy.editor import VideoFileClip |
| | from scenedetect import FrameTimecode |
| |
|
| |
|
| | def iterate_files(folder_path): |
| | for root, dirs, files in os.walk(folder_path): |
| | |
| | |
| | |
| |
|
| | |
| | for file in files: |
| | file_path = os.path.join(root, file) |
| | |
| | yield file_path |
| |
|
| | |
| | for subdir in dirs: |
| | subdir_path = os.path.join(root, subdir) |
| | |
| | iterate_files(subdir_path) |
| |
|
| |
|
| | def iterate_folders(folder_path): |
| | for root, dirs, files in os.walk(folder_path): |
| | for subdir in dirs: |
| | subdir_path = os.path.join(root, subdir) |
| | yield subdir_path |
| | |
| | iterate_folders(subdir_path) |
| |
|
| |
|
| | def clone_folder_structure(root_src, root_dst, verbose=False): |
| | src_path_list = iterate_folders(root_src) |
| | src_relpath_list = [os.path.relpath(x, root_src) for x in src_path_list] |
| |
|
| | os.makedirs(root_dst, exist_ok=True) |
| | dst_path_list = [os.path.join(root_dst, x) for x in src_relpath_list] |
| | for folder_path in dst_path_list: |
| | os.makedirs(folder_path, exist_ok=True) |
| | if verbose: |
| | print(f"Create folder: '{folder_path}'") |
| |
|
| |
|
| | def count_files(root, suffix=".mp4"): |
| | files_list = iterate_files(root) |
| | cnt = len([x for x in files_list if x.endswith(suffix)]) |
| | return cnt |
| |
|
| |
|
| | def check_mp4_integrity(file_path, verbose=True, logger=None): |
| | try: |
| | VideoFileClip(file_path) |
| | if verbose: |
| | print_log(f"The MP4 file '{file_path}' is intact.", logger=logger) |
| | return True |
| | except Exception as e: |
| | if verbose: |
| | print_log(f"Error: {e}", logger=logger) |
| | print_log(f"The MP4 file '{file_path}' is not intact.", logger=logger) |
| | return False |
| |
|
| |
|
| | def count_frames(video_path): |
| | cap = cv2.VideoCapture(video_path) |
| |
|
| | if not cap.isOpened(): |
| | print(f"Error: Could not open video file '{video_path}'") |
| | return |
| |
|
| | total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| | print(f"Total frames in the video '{video_path}': {total_frames}") |
| |
|
| | cap.release() |
| |
|
| |
|
| | def split_video( |
| | sample_path, |
| | scene_list, |
| | save_dir, |
| | target_fps=30, |
| | min_seconds=1, |
| | max_seconds=10, |
| | shorter_size=512, |
| | verbose=False, |
| | logger=None, |
| | ): |
| | FFMPEG_PATH = get_ffmpeg_exe() |
| |
|
| | save_path_list = [] |
| | for idx, scene in enumerate(scene_list): |
| | s, t = scene |
| | fps = s.framerate |
| | max_duration = FrameTimecode(timecode="00:00:00", fps=fps) |
| | max_duration.frame_num = round(fps * max_seconds) |
| | duration = min(max_duration, t - s) |
| | if duration.get_frames() < round(min_seconds * fps): |
| | continue |
| |
|
| | |
| | fname = os.path.basename(sample_path) |
| | fname_wo_ext = os.path.splitext(fname)[0] |
| | |
| | save_path = os.path.join(save_dir, f"{fname_wo_ext}_scene-{idx}.mp4") |
| |
|
| | |
| | cmd = [FFMPEG_PATH] |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | cmd += ["-i", sample_path] |
| |
|
| | |
| | cmd += ["-nostdin", "-y", "-ss", str(s.get_seconds()), "-t", str(duration.get_seconds())] |
| |
|
| | |
| | |
| | cmd += ["-r", f"{target_fps}"] |
| |
|
| | |
| | cmd += ["-vf", f"scale='if(gt(iw,ih),-2,{shorter_size})':'if(gt(iw,ih),{shorter_size},-2)'"] |
| | |
| |
|
| | cmd += ["-map", "0", save_path] |
| |
|
| | proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
| | stdout, stderr = proc.communicate() |
| | if verbose: |
| | stdout = stdout.decode("utf-8") |
| | print_log(stdout, logger=logger) |
| |
|
| | save_path_list.append(sample_path) |
| | print_log(f"Video clip saved to '{save_path}'", logger=logger) |
| |
|
| | return save_path_list |
| |
|