Spaces:
Runtime error
Runtime error
| import os | |
| import subprocess | |
| import cv2 | |
| from imageio_ffmpeg import get_ffmpeg_exe | |
| from mmengine.logging import print_log | |
| from moviepy.editor import VideoFileClip | |
| from scenedetect import FrameTimecode | |
| def iterate_files(folder_path): | |
| for root, dirs, files in os.walk(folder_path): | |
| # root contains the current directory path | |
| # dirs contains the list of subdirectories in the current directory | |
| # files contains the list of files in the current directory | |
| # Process files in the current directory | |
| for file in files: | |
| file_path = os.path.join(root, file) | |
| # print("File:", file_path) | |
| yield file_path | |
| # Process subdirectories and recursively call the function | |
| for subdir in dirs: | |
| subdir_path = os.path.join(root, subdir) | |
| # print("Subdirectory:", subdir_path) | |
| iterate_files(subdir_path) | |
| def iterate_folders(folder_path): | |
| for root, dirs, files in os.walk(folder_path): | |
| for subdir in dirs: | |
| subdir_path = os.path.join(root, subdir) | |
| yield subdir_path | |
| # print("Subdirectory:", subdir_path) | |
| iterate_folders(subdir_path) | |
| def clone_folder_structure(root_src, root_dst, verbose=False): | |
| src_path_list = iterate_folders(root_src) | |
| src_relpath_list = [os.path.relpath(x, root_src) for x in src_path_list] | |
| os.makedirs(root_dst, exist_ok=True) | |
| dst_path_list = [os.path.join(root_dst, x) for x in src_relpath_list] | |
| for folder_path in dst_path_list: | |
| os.makedirs(folder_path, exist_ok=True) | |
| if verbose: | |
| print(f"Create folder: '{folder_path}'") | |
| def count_files(root, suffix=".mp4"): | |
| files_list = iterate_files(root) | |
| cnt = len([x for x in files_list if x.endswith(suffix)]) | |
| return cnt | |
| def check_mp4_integrity(file_path, verbose=True, logger=None): | |
| try: | |
| VideoFileClip(file_path) | |
| if verbose: | |
| print_log(f"The MP4 file '{file_path}' is intact.", logger=logger) | |
| return True | |
| except Exception as e: | |
| if verbose: | |
| print_log(f"Error: {e}", logger=logger) | |
| print_log(f"The MP4 file '{file_path}' is not intact.", logger=logger) | |
| return False | |
| def count_frames(video_path): | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| print(f"Error: Could not open video file '{video_path}'") | |
| return | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| print(f"Total frames in the video '{video_path}': {total_frames}") | |
| cap.release() | |
| def split_video( | |
| sample_path, | |
| scene_list, | |
| save_dir, | |
| target_fps=30, | |
| min_seconds=1, | |
| max_seconds=10, | |
| shorter_size=512, | |
| verbose=False, | |
| logger=None, | |
| ): | |
| FFMPEG_PATH = get_ffmpeg_exe() | |
| save_path_list = [] | |
| for idx, scene in enumerate(scene_list): | |
| s, t = scene # FrameTimecode | |
| fps = s.framerate | |
| max_duration = FrameTimecode(timecode="00:00:00", fps=fps) | |
| max_duration.frame_num = round(fps * max_seconds) | |
| duration = min(max_duration, t - s) | |
| if duration.get_frames() < round(min_seconds * fps): | |
| continue | |
| # save path | |
| fname = os.path.basename(sample_path) | |
| fname_wo_ext = os.path.splitext(fname)[0] | |
| # TODO: fname pattern | |
| save_path = os.path.join(save_dir, f"{fname_wo_ext}_scene-{idx}.mp4") | |
| # ffmpeg cmd | |
| cmd = [FFMPEG_PATH] | |
| # Only show ffmpeg output for the first call, which will display any | |
| # errors if it fails, and then break the loop. We only show error messages | |
| # for the remaining calls. | |
| # cmd += ['-v', 'error'] | |
| # input path | |
| cmd += ["-i", sample_path] | |
| # clip to cut | |
| cmd += ["-nostdin", "-y", "-ss", str(s.get_seconds()), "-t", str(duration.get_seconds())] | |
| # target fps | |
| # cmd += ['-vf', 'select=mod(n\,2)'] | |
| cmd += ["-r", f"{target_fps}"] | |
| # aspect ratio | |
| cmd += ["-vf", f"scale='if(gt(iw,ih),-2,{shorter_size})':'if(gt(iw,ih),{shorter_size},-2)'"] | |
| # cmd += ['-vf', f"scale='if(gt(iw,ih),{shorter_size},trunc(ow/a/2)*2)':-2"] | |
| cmd += ["-map", "0", save_path] | |
| proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) | |
| stdout, stderr = proc.communicate() | |
| if verbose: | |
| stdout = stdout.decode("utf-8") | |
| print_log(stdout, logger=logger) | |
| save_path_list.append(sample_path) | |
| print_log(f"Video clip saved to '{save_path}'", logger=logger) | |
| return save_path_list | |