Safetensors
English
llava
video-retrieval
text-to-video-search
multimodal-embedding
TARA/shared/utils/video.py
bpiyush's picture
Update TARA to latest Tarsier2 checkpoint and runnable demo.
7daf628
import decord
import PIL.Image
import PIL
import numpy as np
def load_frame(video_path, index=0):
    """Decode a single frame of a video and return it as a PIL image.

    Args:
        video_path (str): Path to the video file.
        index (int): Index of the frame to decode. Defaults to 0.

    Returns:
        PIL.Image.Image: The decoded frame.
    """
    reader = decord.VideoReader(video_path, num_threads=1)
    pixels = reader[index].asnumpy()
    return PIL.Image.fromarray(pixels)
def load_first_and_final_frames(video_path):
    """Return the first and the last frame of a video as PIL images.

    Args:
        video_path (str): Path to the video file.

    Returns:
        list: Two-element list ``[first_frame, last_frame]`` of PIL images.
    """
    reader = decord.VideoReader(video_path, num_threads=1)
    # Index 0 is the first frame, index -1 the last one.
    return [PIL.Image.fromarray(reader[pos].asnumpy()) for pos in (0, -1)]
def load_frames_linspace(video_path, st=None, et=None, n=8, num_threads=1, reverse=False, **vr_args):
    """Load frames sampled at evenly spaced positions from a video segment.

    Decoding problems are reported with ``print`` and answered with blank
    480x256 RGB placeholder images instead of an exception.

    Args:
        video_path (str): Path to the video file.
        st (float, optional): Segment start in seconds. ``None`` starts at
            the first frame.
        et (float, optional): Segment end in seconds. ``None`` ends at the
            last frame.
        n (int): Number of frames to sample. ``-1`` loads every frame of
            the segment.
        num_threads (int): Forwarded to ``decord.VideoReader``.
        reverse (bool): If True, return the frames in reverse order.
        **vr_args: Extra keyword arguments forwarded to
            ``decord.VideoReader``.

    Returns:
        list: PIL images. If the video cannot be opened, ``n`` blank frames
        (an empty list when ``n`` is -1). If decoding the selected frames
        fails, one blank frame per requested index.
    """
    # Use decord's native bridge; the code below calls .asnumpy() on frames.
    decord.bridge.set_bridge('native')
    try:
        vr = decord.VideoReader(video_path, num_threads=num_threads, **vr_args)
    except Exception as e:
        print("Error loading video:", e, "for video:", video_path)
        # Return blank frames
        return [PIL.Image.new("RGB", (480, 256)) for _ in range(n)]

    fps = vr.get_avg_fps()
    last_index = len(vr) - 1

    # Convert the [st, et] window from seconds to frame indices.
    sf = 0 if st is None else max(int(st * fps), 0)
    ef = last_index if et is None else min(int(et * fps), last_index)
    ef = max(ef, sf)

    if n == -1:
        indices = np.arange(sf, ef + 1)
    else:
        indices = np.linspace(sf, ef, n, endpoint=True).astype(int)

    try:
        frames = [PIL.Image.fromarray(vr[i].asnumpy()) for i in indices]
    except Exception as e:
        print("Error loading frames:", e, "for video:", video_path)
        # Return one blank frame per requested index. Using len(indices)
        # rather than n keeps the count right when n == -1 (range(-1) is
        # empty, which previously produced no placeholders at all).
        frames = [PIL.Image.new("RGB", (480, 256)) for _ in range(len(indices))]

    if reverse:
        frames = frames[::-1]

    # Drop the reference to the video reader
    del vr
    return frames
def load_frames_linspace_with_first_and_last(video_path, n=8):
    """Loads n evenly spaced frames from a video, including the first and last frames.

    Args:
        video_path (str): Path to the video file.
        n (int): Number of frames to return; must be greater than 1.

    Returns:
        list: ``n`` PIL images; the first element is frame 0 and the last
        element is the final frame of the video.

    Raises:
        AssertionError: If ``n`` is not greater than 1.
    """
    assert n > 1, "n should be greater than 1"
    vr = decord.VideoReader(video_path, num_threads=1)
    # np.linspace includes both endpoints, so sampling n positions already
    # yields the first and last frame exactly once each. Sampling n - 2
    # positions and then adding the endpoints again duplicated both frames
    # and asked get_batch for an empty index array when n == 2.
    indices = np.linspace(0, len(vr) - 1, n).astype(int)
    return [PIL.Image.fromarray(x) for x in vr.get_batch(indices).asnumpy()]
def get_duration(path, return_fps=False):
    """Compute the duration of a video from its frame count and average FPS.

    Args:
        path (str): Path to the video file.
        return_fps (bool): If True, also return the average FPS.

    Returns:
        float or tuple: The duration (frame count divided by average FPS),
        or ``(duration, avg_fps)`` when ``return_fps`` is True.
    """
    reader = decord.VideoReader(path, num_threads=1)
    seconds = len(reader) / reader.get_avg_fps()
    if return_fps:
        return seconds, reader.get_avg_fps()
    return seconds
def load_frames_at_timestamps(video_path, timestamps):
    """
    Loads frames at given timestamps from a video.

    Args:
        video_path (str): Path to the video file.
        timestamps (list): List of timestamps (in seconds) at which to load frames.

    Returns:
        list: One PIL image per timestamp, in the same order. An empty
        list when ``timestamps`` is empty.

    Raises:
        AssertionError: If any timestamp exceeds the video duration.
    """
    # Nothing to decode; also avoids max() failing on an empty sequence.
    if len(timestamps) == 0:
        return []

    vr = decord.VideoReader(video_path, num_threads=1)
    fps = vr.get_avg_fps()
    last_index = len(vr) - 1
    duration = len(vr) / fps
    assert max(timestamps) <= duration, \
        "Timestamps should be within the duration of the video."

    # A timestamp equal to the duration maps to index len(vr), one past the
    # end, so clamp to the last valid frame index.
    indices = [min(int(t * fps), last_index) for t in timestamps]
    frames = [PIL.Image.fromarray(vr[i].asnumpy()) for i in indices]
    return frames
import ffmpeg
import os
from pathlib import Path
def cut_video(video_path, start_time, end_time, save_path):
    """
    Cut a video clip from a source video file without re-encoding.

    The clip is produced with FFmpeg's ``-ss``/``-to`` options and stream
    copy (``-c copy``), the same command shape used by
    ``cut_video_subprocess``. Filters such as ``trim``/``atrim`` are not
    used because FFmpeg cannot combine filtering with stream copy.
    NOTE(review): with stream copy the cut points may not be frame-exact —
    confirm against the FFmpeg documentation if precision matters.

    Args:
        video_path (str): Path to the input video file
        start_time (str or float): Start time in seconds (float) or time format (str like "00:01:30")
        end_time (str or float): End time in seconds (float) or time format (str like "00:02:45")
        save_path (str): Path where the cut video will be saved

    Returns:
        bool: True if successful, False if FFmpeg (or anything else inside
        the cutting step) failed; the error is printed.

    Raises:
        FileNotFoundError: If input video file doesn't exist
    """
    # Check if input file exists
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Input video file not found: {video_path}")

    # Create output directory if it doesn't exist
    output_dir = os.path.dirname(save_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    try:
        # Numeric and string times are both passed straight to FFmpeg.
        # Default stream mapping is used, so inputs without an audio track
        # are handled as well.
        output = ffmpeg.output(
            ffmpeg.input(video_path),
            save_path,
            ss=start_time,
            to=end_time,
            c='copy',  # Copy streams without re-encoding
        )
        # Run the FFmpeg command (overwrite output file if it exists)
        ffmpeg.run(output, overwrite_output=True, quiet=True)
    except ffmpeg.Error as e:
        # quiet=True captures stderr; show it because str(e) alone carries
        # no diagnostic detail.
        detail = e.stderr.decode(errors="replace") if e.stderr else e
        print(f"FFmpeg error occurred: {detail}")
        return False
    except Exception as e:
        print(f"An error occurred: {e}")
        return False
    else:
        print(f"Video successfully cut and saved to: {save_path}")
        return True
# Alternative simpler version using subprocess (if ffmpeg-python is not available)
import subprocess
def cut_video_subprocess(video_path, start_time, end_time, save_path):
    """
    Cut a clip out of a video by invoking the ``ffmpeg`` executable.

    Args:
        video_path (str): Path to the input video file
        start_time (str): Start time (e.g., "00:01:30" or "90")
        end_time (str): End time (e.g., "00:02:45" or "165")
        save_path (str): Path where the cut video will be saved

    Returns:
        bool: True if ffmpeg exited with code 0, False otherwise

    Raises:
        FileNotFoundError: If the input video file does not exist
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Input video file not found: {video_path}")

    # Make sure the destination directory is there before ffmpeg writes to it
    parent_dir = os.path.dirname(save_path)
    if parent_dir and not os.path.exists(parent_dir):
        os.makedirs(parent_dir)

    try:
        # Stream-copy the [start_time, end_time] range; '-y' overwrites output
        time_window = ['-ss', str(start_time), '-to', str(end_time)]
        ffmpeg_args = ['ffmpeg', '-i', video_path] + time_window + ['-c', 'copy', '-y', save_path]

        completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)
        if completed.returncode != 0:
            print(f"FFmpeg error: {completed.stderr}")
            return False

        print(f"Video successfully cut and saved to: {save_path}")
        return True
    except Exception as e:
        print(f"An error occurred: {e}")
        return False
# Example usage
if __name__ == "__main__":
    # Example 1: times given as numbers of seconds
    example_args = dict(
        video_path="sample_data/folding_paper.mp4",
        start_time=3,  # clip starts at 3 seconds
        end_time=9,  # clip ends at 9 seconds
        save_path="output_clip.mp4",
    )
    cut_video(**example_args)