|
|
import json
import os
import sys
import threading
from typing import Optional

import ffmpeg
import typer
from loguru import logger
from PIL import Image
from tqdm import tqdm
|
|
|
|
|
# Logging setup: drop every handler registered on the loguru logger so far,
# then attach a single stderr sink with a "timestamp | level | message" layout.
logger.remove()
logger.add(
    sys.stderr,
    format="<d>{time:YYYY-MM-DD ddd HH:mm:ss}</d> | <lvl>{level}</lvl> | <lvl>{message}</lvl>",
)

# Typer application object; the functions decorated with @app.command()
# further down register themselves on it as CLI sub-commands.
app = typer.Typer(pretty_exceptions_show_locals=False)
|
|
|
|
|
|
|
|
def parse_frame_name(fname: str):
    """Split a frame file name of the form ``<frame_type>_<frame_index>.<ext>``.

    The directory part and the extension are discarded. The remaining name is
    split on its LAST underscore, so a frame type that itself contains
    underscores (e.g. ``my_video_12.jpg``) is parsed correctly.

    Args:
        fname (str): File name or path of the frame.

    Returns:
        tuple: ``(frame_type, frame_index)`` where ``frame_index`` is an int.

    Raises:
        ValueError: If the name contains no underscore, or the part after the
            last underscore is not an integer.
    """
    stem, _ext = os.path.splitext(os.path.basename(fname))
    # rpartition keeps everything before the final "_" together as the type.
    frame_type, sep, frame_index = stem.rpartition("_")
    if not sep:
        raise ValueError(
            f"Frame name {fname!r} is not of the form '<type>_<index>'"
        )
    return frame_type, int(frame_index)
|
|
|
|
|
|
|
|
def get_fps_ffmpeg(video_path: str):
    """Return the frame rate of the first video stream reported by ffprobe.

    Args:
        video_path (str): Path to the video file.

    Returns:
        float: ``r_frame_rate`` evaluated as numerator / denominator, or
        ``0.0`` when the reported denominator is zero (the same convention
        ``get_video_metadata`` uses for an unknown rate).

    Raises:
        ValueError: If the file contains no video stream.
    """
    probe = ffmpeg.probe(video_path)
    video_stream = next(
        (s for s in probe["streams"] if s["codec_type"] == "video"), None
    )
    if video_stream is None:
        raise ValueError("No video stream found")

    # ffprobe reports the rate as a "num/den" rational string.
    num, denom = map(int, video_stream["r_frame_rate"].split("/"))
    # A zero denominator (e.g. "0/0") means the rate is unknown; report 0
    # instead of raising ZeroDivisionError.
    return num / denom if denom else 0.0
|
|
|
|
|
|
|
|
@app.command()
def get_video_metadata(video_path: str, bverbose: bool = True):
    """
    Extract comprehensive metadata from a video file using ffprobe.

    Args:
        video_path (str): Path to the video file
        bverbose (bool): When True, log the extracted metadata as JSON.

    Returns:
        dict: Dictionary containing video metadata including:
            - width, height: Video dimensions (0 when not reported)
            - duration: Duration in seconds; the video stream's value, or the
              container-level value when the stream does not report one
            - fps: Frames per second (0 when the rate is unknown)
            - video_codec: Video codec name
            - video_bitrate: Video bitrate (0 when not reported)
            - audio_codec: Audio codec name, "none" when there is no audio
            - audio_bitrate: Audio bitrate (0 when not reported)
            - format_name: Container format
            - file_size: File size in bytes
            - total_streams: Number of streams in the file
        ``None`` is returned (and the error logged) when probing or parsing
        fails, including when the file has no video stream.
    """
    try:
        probe = ffmpeg.probe(video_path)
        streams = probe["streams"]
        format_info = probe.get("format", {})

        video_stream = next(
            (s for s in streams if s["codec_type"] == "video"),
            None,
        )
        if video_stream is None:
            raise ValueError("No video stream found")

        width = int(video_stream.get("width", 0))
        height = int(video_stream.get("height", 0))

        # Some containers only report the duration at format level, not on the
        # individual stream; fall back to it so callers that derive a frame
        # count from the duration do not silently get 0.
        duration = float(video_stream.get("duration") or 0)
        if duration <= 0:
            duration = float(format_info.get("duration") or 0)

        # r_frame_rate is a "num/den" rational string.
        r_frame_rate = video_stream.get("r_frame_rate", "0/1")
        num, denom = map(int, r_frame_rate.split("/"))
        fps = num / denom if denom != 0 else 0

        codec = video_stream.get("codec_name", "unknown")
        bitrate = int(video_stream.get("bit_rate") or 0)

        format_name = format_info.get("format_name", "unknown")
        file_size = int(format_info.get("size", 0))

        audio_stream = next(
            (s for s in streams if s["codec_type"] == "audio"),
            None,
        )
        if audio_stream:
            audio_codec = audio_stream.get("codec_name", "none")
            audio_bitrate = int(audio_stream.get("bit_rate") or 0)
        else:
            audio_codec = "none"
            audio_bitrate = 0

        metadata = {
            "width": width,
            "height": height,
            "duration": duration,
            "fps": fps,
            "video_codec": codec,
            "video_bitrate": bitrate,
            "audio_codec": audio_codec,
            "audio_bitrate": audio_bitrate,
            "format_name": format_name,
            "file_size": file_size,
            "total_streams": len(streams),
        }
        if bverbose:
            logger.info(f"Video metadata extracted: {json.dumps(metadata, indent=4)}")
        return metadata

    except Exception as e:
        # Deliberate best-effort boundary: callers receive None on any failure.
        # ffmpeg.Error carries ffprobe's stderr, which holds the actual cause.
        stderr = getattr(e, "stderr", None)
        detail = (
            stderr.decode(errors="replace").strip()
            if isinstance(stderr, bytes)
            else ""
        )
        message = f"Failed to extract video metadata: {e}"
        if detail:
            message += f"\n{detail}"
        logger.error(message)
        return None
|
|
|
|
|
|
|
|
@app.command()
def extract_frames(
    input_path: str,
    fps: int = 8,
    max_short_edge: int = 1080,
    write_timestamp: bool = True,
    write_frame_num: bool = True,
    output_dir: Optional[str] = None,
):
    """
    Extract frames from a video file using FFmpeg.

    Frames are decoded as raw RGB24 through a pipe and held in memory as PIL
    images. At most ``int(duration * fps)`` frames are read.

    Args:
        input_path (str): Path to the input video file.
        fps (int): Frames per second to extract. Capped to the source frame
            rate when that rate is known.
        max_short_edge (int): Maximum length of the shorter edge of the
            extracted frames. Frames are only ever scaled down; a falsy value
            keeps the source size.
        write_timestamp (bool): Whether to burn the timestamp into each frame.
        write_frame_num (bool): Whether to also burn the frame number into
            each frame (only used when ``write_timestamp`` is True).
        output_dir (str): Existing directory to save the frames into as
            ``<video name>_<index>.jpg``. Nothing is saved when omitted.

    Returns:
        List of PIL Images

    Raises:
        AssertionError: If ``output_dir`` is given but is not a directory.
        ValueError: If the metadata cannot be read, the video dimensions are
            not positive, or ``fps`` is not positive.
    """
    if output_dir:
        assert os.path.isdir(
            output_dir
        ), f"Output directory {output_dir} does not exist"

    vmeta = get_video_metadata(input_path, bverbose=False)
    if vmeta is None:
        raise ValueError(f"Could not read video metadata from {input_path}")
    org_w, org_h = vmeta["width"], vmeta["height"]
    if min(org_w, org_h) <= 0:
        raise ValueError(f"Invalid video dimensions {org_w}x{org_h} for {input_path}")
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps}")

    max_short_edge = int(max_short_edge) if max_short_edge else min(org_w, org_h)
    long_edge = int((max(org_h, org_w) / min(org_h, org_w)) * max_short_edge)
    long_edge += long_edge % 2  # round the long edge up to an even number
    duration = vmeta["duration"]
    org_fps = vmeta["fps"]
    # A source fps of 0 means "unknown"; capping to it would produce an
    # invalid "fps=0" filter, so only cap against a known rate.
    if org_fps > 0 and fps > org_fps:
        logger.debug(
            f"requested fps({fps}) exceeded source fps({org_fps}): fps will be capped to source fps({org_fps})"
        )
        fps = org_fps

    total_frames = int(duration * fps)
    if total_frames == 0:
        logger.warning(
            f"Estimated frame count is 0 (duration={duration}s); no frames will be read"
        )

    # Only ever scale down, never up.
    add_scale_filter = max_short_edge < min(org_w, org_h)
    w = max_short_edge if org_w < org_h else long_edge
    h = max_short_edge if org_w > org_h else long_edge
    logger.debug(f"Video dimensions: {org_w}x{org_h}")
    if add_scale_filter:
        logger.debug(f"\tscaling video to {w}x{h}")

    drawtext_filter_text = (
        r"text='Timestamp\:%{pts\:hms} \|Frame Number\: %{frame_num}'"
        if write_frame_num
        else r"text='Timestamp\:%{pts\:hms}'"
    )
    drawtext_filter = (
        f",drawtext={drawtext_filter_text}: x=(w-tw)/2: y=h-(2*lh): fontcolor=white: fontsize=20: box=1: boxcolor=0x00000099: boxborderw=5"
        if write_timestamp
        else ""
    )
    scale_filter = f",scale='{w}:{h}'" if add_scale_filter else ""
    filter_chain = f"fps={fps}{drawtext_filter}{scale_filter}"

    # Size of the frames ffmpeg emits. The byte count is derived from the very
    # same (width, height) pair handed to PIL so the two can never disagree.
    frame_dims = (w, h) if add_scale_filter else (org_w, org_h)
    frame_size = frame_dims[0] * frame_dims[1] * 3  # RGB24: 3 bytes per pixel

    logger.info(f"running ffmpeg with filter:\n{filter_chain}")
    process = (
        ffmpeg.input(input_path)
        .output("pipe:", vf=filter_chain, format="rawvideo", pix_fmt="rgb24")
        # Keep stderr small: errors only, no periodic progress line.
        .global_args("-loglevel", "error", "-nostats")
        .run_async(pipe_stdout=True, pipe_stderr=True)
    )

    # Drain stderr concurrently. If it is piped but never read, ffmpeg blocks
    # once the pipe buffer fills and the stdout reads below would hang.
    stderr_chunks = []
    stderr_reader = threading.Thread(
        target=lambda: stderr_chunks.append(process.stderr.read()),
        daemon=True,
    )
    stderr_reader.start()

    frames = []
    try:
        for _ in tqdm(range(total_frames), desc="Extracting frames with FFMPEG"):
            in_bytes = process.stdout.read(frame_size)
            if len(in_bytes) < frame_size:
                # EOF or truncated trailing frame: ffmpeg has no more output.
                break
            frames.append(Image.frombytes("RGB", frame_dims, in_bytes))
    finally:
        # Always release the pipes and reap the child, even on error.
        process.stdout.close()
        process.wait()
        stderr_reader.join()
        process.stderr.close()

    if total_frames > 0 and not frames:
        ffmpeg_errors = b"".join(stderr_chunks).decode(errors="replace").strip()
        logger.error(f"ffmpeg produced no frames for {input_path}: {ffmpeg_errors}")

    if output_dir:
        vname, _ = os.path.splitext(os.path.basename(input_path))
        for i, im in enumerate(tqdm(frames, desc=f"Saving frames to {output_dir}")):
            output_path = os.path.join(output_dir, f"{vname}_{i}.jpg")
            im.save(output_path)

    return frames
|
|
|
|
|
|
|
|
def extract_specific_frames(
    input_path: str,
    timestamps_or_frames: list,
    max_short_edge: int = 1080,
):
    """
    Extract specific frames from a video file using FFmpeg at given timestamps or frame numbers.

    Each target is decoded by a separate ffmpeg invocation that seeks to the
    target time and emits a single raw RGB24 frame.

    Args:
        input_path (str): Path to the input video file.
        timestamps_or_frames (list): Targets to extract. A ``float`` entry is
            treated as a timestamp in seconds; any other entry is treated as a
            frame number and converted to a time with the source frame rate.
        max_short_edge (int): Maximum length of the shorter edge of the
            extracted frames. Frames are only ever scaled down; a falsy value
            keeps the source size.

    Returns:
        List of PIL Images. A target whose extraction fails contributes
        ``None``; a target that lies beyond the video duration is skipped and
        contributes no entry at all, so the list may be shorter than the input.

    Raises:
        ValueError: If the metadata cannot be read or the video dimensions are
            not positive.
    """
    vmeta = get_video_metadata(input_path, bverbose=False)
    if vmeta is None:
        raise ValueError(f"Could not read video metadata from {input_path}")
    org_w, org_h = vmeta["width"], vmeta["height"]
    if min(org_w, org_h) <= 0:
        raise ValueError(f"Invalid video dimensions {org_w}x{org_h} for {input_path}")

    max_short_edge = int(max_short_edge) if max_short_edge else min(org_w, org_h)
    long_edge = int((max(org_h, org_w) / min(org_h, org_w)) * max_short_edge)
    long_edge += long_edge % 2  # round the long edge up to an even number
    duration = vmeta["duration"]
    org_fps = vmeta["fps"]

    # Only ever scale down, never up.
    add_scale_filter = max_short_edge < min(org_w, org_h)
    w = max_short_edge if org_w < org_h else long_edge
    h = max_short_edge if org_w > org_h else long_edge
    logger.debug(f"Video dimensions: {org_w}x{org_h}")
    if add_scale_filter:
        logger.debug(f"\tscaling video to {w}x{h}")
    scale_filter = f",scale='{w}:{h}'" if add_scale_filter else ""

    # Loop invariants: identical for every target, so compute them once.
    filter_chain = f"fps={org_fps}{scale_filter}"
    frame_dims = (w, h) if add_scale_filter else (org_w, org_h)
    frame_size = frame_dims[0] * frame_dims[1] * 3  # RGB24: 3 bytes per pixel

    frames = []
    for target in tqdm(timestamps_or_frames, desc="Extracting specific frames"):
        use_timestamps = isinstance(target, float)
        target_kind = "timestamp" if use_timestamps else "frame"
        try:
            if use_timestamps:
                seek_time = float(target)
                if seek_time > duration:
                    logger.warning(
                        f"Timestamp {seek_time}s exceeds video duration {duration}s, skipping"
                    )
                    continue
            else:
                # Frame number -> seconds using the source frame rate.
                seek_time = float(target) / org_fps
                if seek_time > duration:
                    logger.warning(f"Frame {target} exceeds video duration, skipping")
                    continue

            logger.debug(f"Extracting frame at {seek_time}s")
            process = (
                ffmpeg.input(input_path, ss=seek_time)
                .output(
                    "pipe:",
                    vf=filter_chain,
                    format="rawvideo",
                    pix_fmt="rgb24",
                    frames=1,
                )
                .run_async(pipe_stdout=True, pipe_stderr=True)
            )
            # communicate() drains stdout and stderr together, closes both
            # pipes and reaps the child, so no pipe is leaked and an unread
            # stderr can never stall ffmpeg. Only one frame is emitted
            # (frames=1), so reading stdout to EOF is bounded.
            out_bytes, err_bytes = process.communicate()
            in_bytes = out_bytes[:frame_size]

            if len(in_bytes) >= frame_size:
                frames.append(Image.frombytes("RGB", frame_dims, in_bytes))
            else:
                logger.warning(f"Failed to extract frame at {target_kind} {target}")
                logger.debug(
                    f"ffmpeg stderr: {err_bytes.decode(errors='replace').strip()}"
                )
                # Placeholder for the failed extraction.
                frames.append(None)

        except Exception as e:
            # Best-effort per target: log, record a placeholder, move on.
            logger.error(f"Error extracting frame at {target_kind} {target}: {e}")
            frames.append(None)

    logger.info(
        f"Successfully extracted {len([f for f in frames if f is not None])} out of {len(timestamps_or_frames)} requested frames"
    )

    return frames
|
|
|
|
|
|
|
|
@app.command()
def extract_audio(
    video_path: str,
    output_dir: str = "/tmp/miro/clip_cognition/audio/",
    overwrite: bool = False,
):
    """Extract the first audio stream of a video file and encode it as MP3.

    The result is written to ``<output_dir>/<video name>/<video name>.mp3``;
    when ``output_dir`` is empty the video's own directory is used as the
    base. Note that the audio IS re-encoded (``acodec`` is set to mp3).

    background ref: https://www.baeldung.com/linux/ffmpeg-audio-from-video#1-extracting-audio-without-re-encoding

    Args:
        video_path (str): Path to the input video file.
        output_dir (str): Base directory for the extracted audio.
        overwrite (bool): Replace an existing output file instead of failing.

    Returns:
        str | None: Path of the written MP3 file, or ``None`` (with the reason
        logged) when the metadata cannot be read, the video has no audio, the
        output already exists and ``overwrite`` is False, or ffmpeg fails.
    """
    vmeta = get_video_metadata(video_path, bverbose=False)
    if vmeta is None:
        # Follow this function's log-and-return-None error convention rather
        # than crashing with AttributeError on None.
        logger.error(f"Could not read video metadata from {video_path}")
        return None
    if vmeta.get("audio_codec") == "none":
        logger.error(f"No audio found in {video_path}")
        return None

    output_dir = output_dir if output_dir else os.path.dirname(video_path)
    vname, _vext = os.path.splitext(os.path.basename(video_path))
    output_dir = os.path.join(output_dir, vname)
    output_fname = os.path.join(output_dir, vname + ".mp3")

    if os.path.isfile(output_fname):
        if not overwrite:
            logger.error("overwrite is false and data already exists!")
            return None
        os.remove(output_fname)
        logger.warning(f"removed existing data: {output_fname}")
    os.makedirs(output_dir, exist_ok=True)

    stream = ffmpeg.input(video_path)
    # Select only the FIRST audio stream: an MP3 file holds exactly one audio
    # stream, so mapping every audio stream ("0:a") makes ffmpeg fail on
    # inputs that carry several audio tracks.
    config_dict = {"map": "0:a:0", "acodec": "mp3"}
    stream = ffmpeg.output(stream, output_fname, **config_dict)

    try:
        ffmpeg.run(stream, capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        stderr_text = e.stderr.decode(errors="replace") if e.stderr else str(e)
        logger.error(f"Error executing FFmpeg command: {stderr_text}")
        return None

    logger.success(f"audio extracted to {output_fname}")
    return output_fname
|
|
|
|
|
|
|
|
# Script entry point: when executed directly (not imported), hand control to
# the Typer application so the registered commands are available on the CLI.
if __name__ == "__main__":
    app()
|
|
|