File size: 6,711 Bytes
7fa9d90 ee36c8e 7fa9d90 ee36c8e 7fa9d90 ee36c8e 7fa9d90 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | import subprocess
import logging
from pathlib import Path
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class FFmpegUtils:
    """Utilities for audio and video processing with FFmpeg.

    Every method shells out to the ``ffmpeg`` / ``ffprobe`` executables via
    ``subprocess.run``, so both must be discoverable on ``PATH``.
    """

    @staticmethod
    def _stderr_text(error: subprocess.CalledProcessError) -> str:
        """Return the captured stderr of a failed command as printable text.

        Args:
            error: The exception raised by ``subprocess.run(..., check=True)``

        Returns:
            The stderr output as ``str`` (empty when nothing was captured)
        """
        raw = error.stderr
        if raw is None:
            return ""
        if isinstance(raw, bytes):
            # The tool may echo file names or metadata that are not valid
            # UTF-8; a decode error here must never mask the real failure.
            return raw.decode("utf-8", errors="replace")
        return str(raw)

    @staticmethod
    def _run_ffmpeg(cmd: list, failure_message: str) -> None:
        """Run an FFmpeg command, logging its stderr if it exits non-zero.

        Args:
            cmd: Full argument vector, starting with the executable name
            failure_message: Log prefix used when the command fails

        Raises:
            subprocess.CalledProcessError: If the command exits non-zero
            OSError: If the executable cannot be started
        """
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            logger.error("%s: %s", failure_message, FFmpegUtils._stderr_text(e))
            raise

    @staticmethod
    def _convert_audio(
        audio_data: bytes,
        output_path: Path,
        codec_args: list,
        failure_message: str,
    ) -> None:
        """Feed in-memory audio to FFmpeg through a uniquely named temp file.

        The temp file is created next to ``output_path`` and is always removed
        afterwards, whether or not the conversion succeeds.

        Args:
            audio_data: Raw audio bytes to use as FFmpeg input
            output_path: Where FFmpeg writes its result
            codec_args: FFmpeg options placed between the input and the output
            failure_message: Log prefix used when FFmpeg fails

        Raises:
            subprocess.CalledProcessError: If FFmpeg exits non-zero
            OSError: If the temp file cannot be written or FFmpeg cannot start
        """
        # Local import so the file's top-level import block stays untouched.
        import tempfile

        temp_input = None
        try:
            # A generated name cannot collide with (and later delete) an
            # unrelated "temp_*" file that already lives in the directory.
            with tempfile.NamedTemporaryFile(
                prefix="temp_",
                suffix=".wav",
                dir=output_path.parent,
                delete=False,
            ) as handle:
                temp_input = Path(handle.name)
                handle.write(audio_data)
            # The handle is closed here, so FFmpeg can open the file itself.
            FFmpegUtils._run_ffmpeg(
                ["ffmpeg", "-i", str(temp_input), *codec_args, "-y", str(output_path)],
                failure_message,
            )
        finally:
            # Clean up temp file
            if temp_input is not None:
                try:
                    temp_input.unlink()
                except FileNotFoundError:
                    pass

    @staticmethod
    def save_audio_as_wav(audio_data: bytes, output_path: Path):
        """
        Save audio data as WAV file (normalized for Whisper)

        Args:
            audio_data: Raw audio bytes (WAV format from TTS)
            output_path: Where to save the normalized WAV

        Raises:
            subprocess.CalledProcessError: If FFmpeg exits non-zero
            OSError: If the temp file cannot be written or FFmpeg cannot start
        """
        logger.debug("Saving normalized WAV to %s", output_path)
        # Normalize audio for Whisper (16kHz, mono, 16-bit PCM)
        FFmpegUtils._convert_audio(
            audio_data,
            output_path,
            [
                "-ar", "16000",  # 16kHz sample rate
                "-ac", "1",  # Mono
                "-sample_fmt", "s16",  # 16-bit PCM
            ],
            f"Failed to save normalized WAV {output_path}",
        )
        logger.debug("Saved normalized WAV: %s", output_path)

    @staticmethod
    def save_audio_as_mp3(audio_data: bytes, output_path: Path):
        """
        Convert audio data to MP3

        Args:
            audio_data: Raw audio bytes (WAV format from TTS)
            output_path: Where to save the MP3

        Raises:
            subprocess.CalledProcessError: If FFmpeg exits non-zero
            OSError: If the temp file cannot be written or FFmpeg cannot start
        """
        logger.debug("Converting to MP3: %s", output_path)
        FFmpegUtils._convert_audio(
            audio_data,
            output_path,
            [
                "-codec:a", "libmp3lame",
                "-qscale:a", "2",  # High quality
            ],
            f"Failed to convert to MP3 {output_path}",
        )
        logger.debug("Saved MP3: %s", output_path)

    @staticmethod
    def get_video_duration(file_path: Path) -> float:
        """
        Get duration of video file in seconds using ffprobe

        This is best-effort: failures are logged and reported as 0.0 instead
        of being raised.

        Args:
            file_path: Path to video file

        Returns:
            Duration in seconds, or 0.0 if ffprobe cannot be run, exits
            non-zero, or prints something that is not a number
        """
        cmd = [
            "ffprobe",
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            str(file_path),
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return float(result.stdout.strip())
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            # OSError: ffprobe missing / not executable.
            # SubprocessError: non-zero exit status.
            # ValueError: empty or non-numeric output, or undecodable output.
            logger.error("Failed to get video duration for %s: %s", file_path, e)
            return 0.0

    @staticmethod
    def normalize_video(input_path: Path, output_path: Path):
        """
        Normalize video to standard format (H.264, 30fps, AAC) to fix seeking/black screen issues.

        Args:
            input_path: Path to source video
            output_path: Path to save normalized video

        Raises:
            subprocess.CalledProcessError: If FFmpeg exits non-zero
            Exception: Any other failure is logged and re-raised unchanged
        """
        logger.debug("Normalizing video: %s -> %s", input_path, output_path)
        cmd = [
            "ffmpeg",
            "-i", str(input_path),
            "-c:v", "libx264",
            "-preset", "fast",
            "-r", "30",
            "-c:a", "aac",
            "-pix_fmt", "yuv420p",
            "-y",
            str(output_path),
        ]
        try:
            FFmpegUtils._run_ffmpeg(cmd, f"Failed to normalize video {input_path}")
        except subprocess.CalledProcessError:
            # Already logged together with FFmpeg's stderr.
            raise
        except Exception as e:
            logger.error("Error normalizing video %s: %s", input_path, e)
            raise
        logger.debug("Normalized video saved to %s", output_path)

    @staticmethod
    def cut_video(input_path: Path, output_path: Path, start_time: float, duration: float):
        """
        Cut a segment from a video file using FFmpeg.

        Uses stream copy (no re-encoding) for fast cutting.
        Audio is removed since TTS is used separately.
        NOTE(review): with stream copy the cut points presumably depend on the
        source's keyframes - confirm if frame-accurate cuts are required.

        Args:
            input_path: Source video
            output_path: Destination for the segment
            start_time: Start time in seconds
            duration: Duration of the segment in seconds

        Raises:
            subprocess.CalledProcessError: If FFmpeg exits non-zero
        """
        cmd = [
            "ffmpeg",
            "-ss", str(start_time),  # Seek to start (before -i for fast seeking)
            "-i", str(input_path),
            "-t", str(duration),
            "-c:v", "copy",  # Stream copy - no re-encode
            "-an",  # Remove audio (TTS is used)
            "-y",
            str(output_path),
        ]
        FFmpegUtils._run_ffmpeg(cmd, f"Failed to cut video {input_path}")

    @staticmethod
    def image_to_video(input_path: Path, output_path: Path, duration: float):
        """
        Convert image to video of specific duration

        The image is scaled to fit inside 1080x1920 while keeping its aspect
        ratio, then padded and centered to exactly 1080x1920.

        Args:
            input_path: Path to source image (jpg, png, etc.)
            output_path: Path to save the output video
            duration: Duration of the video in seconds

        Raises:
            subprocess.CalledProcessError: If FFmpeg exits non-zero
        """
        cmd = [
            "ffmpeg",
            "-loop", "1",
            "-i", str(input_path),
            "-t", str(duration),
            "-c:v", "libx264",
            "-pix_fmt", "yuv420p",
            "-vf", "scale=1080:1920:force_original_aspect_ratio=decrease,pad=1080:1920:(ow-iw)/2:(oh-ih)/2",
            "-r", "30",
            "-y",
            str(output_path),
        ]
        FFmpegUtils._run_ffmpeg(cmd, "Failed to convert image to video")
        logger.debug("Created video from image: %s", output_path)
|