| import decord |
| import PIL.Image |
| import PIL |
| import numpy as np |
|
|
|
|
| def load_frame(video_path, index=0): |
| vr = decord.VideoReader(video_path, num_threads=1) |
| frame = PIL.Image.fromarray(vr[index].asnumpy()) |
| return frame |
|
|
|
|
| def load_first_and_final_frames(video_path): |
| vr = decord.VideoReader(video_path, num_threads=1) |
| frame_l = PIL.Image.fromarray(vr[0].asnumpy()) |
| frame_r = PIL.Image.fromarray(vr[-1].asnumpy()) |
| return [frame_l, frame_r] |
|
|
|
|
| def load_frames_linspace(video_path, st=None, et=None, n=8, num_threads=1, reverse=False, **vr_args): |
| decord.bridge.set_bridge('native') |
|
|
| try: |
| vr = decord.VideoReader(video_path, num_threads=num_threads, **vr_args) |
| except Exception as e: |
| print("Error loading video:", e, "for video:", video_path) |
| |
| return [PIL.Image.new("RGB", (480, 256)) for _ in range(n)] |
|
|
| fps = vr.get_avg_fps() |
| if st is None: |
| sf = 0 |
| else: |
| sf = max(int(st * fps), 0) |
| if et is None: |
| ef = len(vr) - 1 |
| else: |
| ef = min(int(et * fps), len(vr) - 1) |
| ef = max(ef, sf) |
| if n == -1: |
| indices = np.arange(sf, ef + 1) |
| else: |
| indices = np.linspace(sf, ef, n, endpoint=True).astype(int) |
|
|
| try: |
| frames = [PIL.Image.fromarray(vr[i].asnumpy()) for i in indices] |
| except Exception as e: |
| print("Error loading frames:", e, "for video:", video_path) |
| |
| frames = [PIL.Image.new("RGB", (480, 256)) for _ in range(n)] |
| |
| if reverse: |
| frames = frames[::-1] |
|
|
| |
| del vr |
|
|
| return frames |
|
|
|
|
| def load_frames_linspace_with_first_and_last(video_path, n=8): |
| """Loads n frames from a video, including the first and last frames.""" |
| assert n > 1, "n should be greater than 1" |
| vr = decord.VideoReader(video_path, num_threads=1) |
| indices = np.linspace(0, len(vr) - 1, n - 2).astype(int) |
| frames = [PIL.Image.fromarray(vr[0].asnumpy())] |
| frames += [PIL.Image.fromarray(x) for x in vr.get_batch(indices).asnumpy()] |
| frames += [PIL.Image.fromarray(vr[-1].asnumpy())] |
| return frames |
|
|
|
|
| def get_duration(path, return_fps=False): |
| vr = decord.VideoReader(path, num_threads=1) |
| if not return_fps: |
| return len(vr) / vr.get_avg_fps() |
| else: |
| return len(vr) / vr.get_avg_fps(), vr.get_avg_fps() |
|
|
|
|
| def load_frames_at_timestamps(video_path, timestamps): |
| """ |
| Loads frames at given timestamps from a video. |
| |
| Args: |
| video_path (str): Path to the video file. |
| timestamps (list): List of timestamps at which to load frames. |
| """ |
| vr = decord.VideoReader(video_path, num_threads=1) |
| duration = len(vr) / vr.get_avg_fps() |
| assert max(timestamps) <= duration, \ |
| "Timestamps should be within the duration of the video." |
| indices = [int(t * vr.get_avg_fps()) for t in timestamps] |
| frames = [PIL.Image.fromarray(vr[i].asnumpy()) for i in indices] |
| return frames |
|
|
|
|
|
|
| import ffmpeg |
| import os |
| from pathlib import Path |
|
|
| def cut_video(video_path, start_time, end_time, save_path): |
| """ |
| Cut a video clip from a source video file. |
| |
| Args: |
| video_path (str): Path to the input video file |
| start_time (str or float): Start time in seconds (float) or time format (str like "00:01:30") |
| end_time (str or float): End time in seconds (float) or time format (str like "00:02:45") |
| save_path (str): Path where the cut video will be saved |
| |
| Returns: |
| bool: True if successful, False otherwise |
| |
| Raises: |
| FileNotFoundError: If input video file doesn't exist |
| Exception: For other FFmpeg-related errors |
| """ |
| |
| |
| if not os.path.exists(video_path): |
| raise FileNotFoundError(f"Input video file not found: {video_path}") |
| |
| |
| output_dir = os.path.dirname(save_path) |
| if output_dir and not os.path.exists(output_dir): |
| os.makedirs(output_dir) |
| |
| try: |
| |
| if isinstance(start_time, (int, float)) and isinstance(end_time, (int, float)): |
| duration = end_time - start_time |
| else: |
| |
| duration = None |
| |
| |
| input_stream = ffmpeg.input(video_path) |
| |
| if duration is not None: |
| |
| output_stream = input_stream.video.filter('trim', start=start_time, duration=duration) |
| audio_stream = input_stream.audio.filter('atrim', start=start_time, duration=duration) |
| else: |
| |
| output_stream = input_stream.video.filter('trim', start=start_time, end=end_time) |
| audio_stream = input_stream.audio.filter('atrim', start=start_time, end=end_time) |
| |
| |
| output = ffmpeg.output( |
| output_stream, |
| audio_stream, |
| save_path, |
| vcodec='copy', |
| acodec='copy' |
| ) |
| |
| |
| ffmpeg.run(output, overwrite_output=True, quiet=True) |
| |
| print(f"Video successfully cut and saved to: {save_path}") |
| return True |
| |
| except ffmpeg.Error as e: |
| print(f"FFmpeg error occurred: {e}") |
| return False |
| except Exception as e: |
| print(f"An error occurred: {e}") |
| return False |
|
|
| |
| import subprocess |
|
|
| def cut_video_subprocess(video_path, start_time, end_time, save_path): |
| """ |
| Cut a video clip using subprocess to call FFmpeg directly. |
| |
| Args: |
| video_path (str): Path to the input video file |
| start_time (str): Start time (e.g., "00:01:30" or "90") |
| end_time (str): End time (e.g., "00:02:45" or "165") |
| save_path (str): Path where the cut video will be saved |
| |
| Returns: |
| bool: True if successful, False otherwise |
| """ |
| |
| if not os.path.exists(video_path): |
| raise FileNotFoundError(f"Input video file not found: {video_path}") |
| |
| |
| output_dir = os.path.dirname(save_path) |
| if output_dir and not os.path.exists(output_dir): |
| os.makedirs(output_dir) |
| |
| try: |
| |
| cmd = [ |
| 'ffmpeg', |
| '-i', video_path, |
| '-ss', str(start_time), |
| '-to', str(end_time), |
| '-c', 'copy', |
| '-y', |
| save_path |
| ] |
| |
| |
| result = subprocess.run(cmd, capture_output=True, text=True) |
| |
| if result.returncode == 0: |
| print(f"Video successfully cut and saved to: {save_path}") |
| return True |
| else: |
| print(f"FFmpeg error: {result.stderr}") |
| return False |
| |
| except Exception as e: |
| print(f"An error occurred: {e}") |
| return False |
|
|
| |
| if __name__ == "__main__": |
| |
| cut_video( |
| video_path="sample_data/folding_paper.mp4", |
| start_time=3, |
| end_time=9, |
| save_path="output_clip.mp4" |
| ) |
|
|