# Tools/src/utils.py
# (scrape residue retained as comments so the module parses)
# jebin2's picture
# Refactor: Standardize logging by replacing print statements with logger calls and adjusting verbosity levels.
# 503d4ac
"""
Utility functions
"""
from __future__ import annotations
from src.logger_config import logger, setup_logger
import sys
from pathlib import Path
import subprocess
import os
import uuid
import re
import shutil
import tempfile
from src.config import get_config_value
import json
import traceback
import cv2
import numpy as np
import imagehash
from PIL import Image
from moviepy.editor import VideoFileClip
import tempfile
import librosa
def get_temp_dir(prefix: str = "tmp_") -> Path:
    """
    Create and return a temporary directory.

    In test-automation mode a fixed directory under TEST_DATA_DIRECTORY is
    reused ("downloads" for download-flavoured prefixes, "output" otherwise);
    in normal operation a fresh mkdtemp directory is created.

    Raises:
        RuntimeError: when TEST_AUTOMATION is enabled without TEST_DATA_DIRECTORY.
    """
    if not get_config_value("test_automation"):
        return Path(tempfile.mkdtemp(prefix=prefix))
    base_dir = get_config_value("test_data_directory")
    if not base_dir:
        raise RuntimeError("TEST_DATA_DIRECTORY must be set when TEST_AUTOMATION=true")
    base_path = Path(base_dir)
    # Ensure base dir exists
    base_path.mkdir(parents=True, exist_ok=True)
    sub_dir = "downloads" if "download" in prefix else "output"
    target = base_path / sub_dir
    target.mkdir(parents=True, exist_ok=True)
    return target
def get_video_duration(path: str) -> float:
    """
    Returns the duration of a video file in seconds as a float.
    Uses ffprobe (very fast and accurate).

    Args:
        path: path to the video file.

    Raises:
        RuntimeError: if ffprobe fails or its output carries no duration,
            instead of the cryptic JSONDecodeError/KeyError the old code hit.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "format=duration",
        "-of", "json",
        path
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed for {path}:\n{result.stderr}")
    try:
        info = json.loads(result.stdout)
        return float(info["format"]["duration"])
    except (ValueError, KeyError) as e:  # JSONDecodeError is a ValueError
        raise RuntimeError(f"Could not read duration for {path}: {e}") from e
def calculate_video_durations(selected_videos, all_tts_script_segment, word_level_segment, total_duration: float) -> None:
    """
    Calculate and update duration for each video based on word-level segments.
    Uses three approaches in order of preference:
    1. Simple word count matching (if counts align exactly)
    2. Text matching with cleaning (if counts differ slightly)
    3. Fuzzy matching (if words are missing or mismatched)

    Mutates each dict in `selected_videos` by writing a "duration" key.
    Never raises: on failure, durations are split evenly as a fallback.
    """
    try:
        # Get word counts
        all_script_words = all_tts_script_segment.split()
        def clean_word(word: str) -> str:
            # Keep letters only so numbers/punctuation don't skew the counts.
            return re.sub(r'[^a-zA-Z]', '', word).lower()
        cleaned_script_words = [clean_word(w) for w in all_script_words if clean_word(w)]
        cleaned_segment_words = [clean_word(seg.get("word", "")) for seg in word_level_segment if clean_word(seg.get("word", ""))]
        logger.debug(f"πŸ“Š Original: Script={len(all_script_words)} words, Segments={len(word_level_segment)} words")
        logger.debug(f"πŸ“Š Cleaned: Script={len(cleaned_script_words)} words, Segments={len(cleaned_segment_words)} words")
        logger.debug(f"⏱️ Total audio duration: {total_duration}s (starting from 0)")
        # APPROACH 1: Exact match (original word counts)
        if len(all_script_words) == len(word_level_segment):
            logger.debug("βœ… Using APPROACH 1: Simple word count matching")
            calculate_durations_simple(selected_videos, word_level_segment, total_duration)
        # APPROACH 2: Cleaned match (cleaned word counts)
        elif len(cleaned_script_words) == len(cleaned_segment_words):
            logger.debug("βœ… Using APPROACH 2: Text matching with cleaning")
            calculate_durations_with_text_matching(selected_videos, word_level_segment, total_duration)
        # APPROACH 3: Fuzzy match
        else:
            diff = abs(len(cleaned_script_words) - len(cleaned_segment_words))
            logger.debug(f"⚠️ Word count mismatch after cleaning (diff: {diff})")
            logger.debug("πŸ” Using APPROACH 3: Fuzzy matching")
            calculate_durations_with_fuzzy_matching(selected_videos, word_level_segment, total_duration)
    except Exception as e:
        # logger.exception keeps the traceback in the log stream instead of
        # dumping it to stderr via traceback.print_exc() (logging standard).
        logger.exception(f"❌ Failed to calculate video durations: {e}")
        # Fallback: set equal durations (guard against an empty list, which
        # previously raised ZeroDivisionError inside this handler).
        if not selected_videos:
            return
        equal_duration = total_duration / len(selected_videos)
        for video in selected_videos:
            video["duration"] = round(equal_duration, 2)
def calculate_durations_simple(selected_videos, word_level_segment, total_duration: float) -> None:
    """
    APPROACH 1: Simple sequential matching when word counts align exactly.
    First video always starts at 0, last video always ends at total_duration.

    Mutates each video dict in place, writing a rounded "duration" key.
    """
    word_cursor = 0
    last_index = len(selected_videos) - 1
    for i, video in enumerate(selected_videos):
        tts_text = video.get("tts_script_segment", "").strip()
        if not tts_text:
            # Empty script → no screen time; cursor stays where it is.
            video["duration"] = 0
            continue
        segment_words = len(tts_text.split())
        # Start of this segment: the first video is pinned to t=0.
        start_time = 0.0 if i == 0 else word_level_segment[word_cursor]["start_time"]
        cursor_after = word_cursor + segment_words
        # End of this segment: last video is pinned to the full duration;
        # otherwise use the next word's start, falling back to the total
        # when the cursor runs past the available timing data.
        if i == last_index:
            end_time = total_duration
        elif cursor_after < len(word_level_segment):
            end_time = word_level_segment[cursor_after]["start_time"]
        else:
            end_time = total_duration
        video["duration"] = round(end_time - start_time, 2)
        logger.debug(f" Video {i}: [{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s | '{tts_text[:40]}...'")
        word_cursor = cursor_after
    # Sanity log: compare the summed durations against the audio length.
    total_calculated = sum(v.get("duration", 0) for v in selected_videos)
    logger.debug(f"βœ… Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)")
def calculate_durations_with_text_matching(selected_videos, word_level_segment, total_duration: float) -> None:
    """
    APPROACH 2: Text matching with cleaned/normalized text.
    Handles cases where word counts don't align due to numbers, punctuation, etc.
    First video always starts at 0, last video always ends at total_duration.

    Args:
        selected_videos: list of dicts; each gets a "duration" key written in place.
        word_level_segment: list of dicts with "word", "start_time", "end_time" keys.
        total_duration: total audio length in seconds.

    Returns:
        None. Mutates `selected_videos` in place.
    """
    def clean_word(word: str) -> str:
        """Clean a single word - remove numbers, special chars, keep only alpha"""
        return re.sub(r'[^a-zA-Z]', '', word).lower()
    # Build cleaned word list from word_level_segment
    # (entries whose cleaned form is empty are dropped entirely).
    cleaned_word_segments = []
    for seg in word_level_segment:
        word = seg.get("word", "")
        cleaned = clean_word(word)
        if cleaned:
            cleaned_word_segments.append({
                "cleaned": cleaned,
                "original": word,
                "start_time": seg.get("start_time", 0),
                "end_time": seg.get("end_time", 0)
            })
    logger.debug(f"πŸ“ Cleaned word segments: {len(cleaned_word_segments)} words")
    # Track current position in cleaned_word_segments
    current_word_index = 0
    for i, video in enumerate(selected_videos):
        tts_text = video.get("tts_script_segment", "").strip()
        if not tts_text:
            # Empty script segment contributes no screen time.
            video["duration"] = 0
            continue
        # Clean the video's script segment
        video_words = tts_text.split()
        cleaned_video_words = [clean_word(w) for w in video_words if clean_word(w)]
        if not cleaned_video_words:
            video["duration"] = 0
            continue
        word_count = len(cleaned_video_words)
        logger.debug(f" Video {i}: Looking for {word_count} cleaned words starting at index {current_word_index}")
        # Get start time (first video is always pinned to t=0)
        if i == 0:
            start_time = 0.0
        elif current_word_index < len(cleaned_word_segments):
            start_time = cleaned_word_segments[current_word_index]["start_time"]
        else:
            # Ran out of timed words: split the remaining audio evenly
            # across the remaining videos and skip timing lookups entirely.
            logger.warning(f" ⚠️ Out of word segments, using remaining time")
            remaining_videos = len(selected_videos) - i
            remaining_time = total_duration - (cleaned_word_segments[-1]["end_time"] if cleaned_word_segments else 0)
            video["duration"] = round(remaining_time / remaining_videos, 2)
            continue
        # Calculate next word index
        next_word_index = current_word_index + word_count
        # Get end time (last video is always pinned to total_duration)
        if i + 1 == len(selected_videos):
            end_time = total_duration
        else:
            if next_word_index < len(cleaned_word_segments):
                end_time = cleaned_word_segments[next_word_index]["start_time"]
            else:
                end_time = total_duration
        # Calculate duration
        video["duration"] = round(end_time - start_time, 2)
        logger.debug(f" βœ… [{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s | {word_count} words")
        # Move to next segment
        current_word_index = next_word_index
    # Verify total
    total_calculated = sum(v.get("duration", 0) for v in selected_videos)
    logger.debug(f"βœ… Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)")
def calculate_durations_with_fuzzy_matching(selected_videos, word_level_segment, total_duration: float) -> None:
    """
    APPROACH 3: Fuzzy matching with flexible word alignment.
    Handles cases where words are missing, misspelled, or slightly different.
    First video always starts at 0, last video always ends at total_duration.

    For each video's script, words are matched against the timed transcript
    within a small look-ahead window using (in order): exact match, substring
    match, then SequenceMatcher similarity >= 0.75. The last matched index
    decides where the next video's segment begins.

    Args:
        selected_videos: list of dicts; each gets a "duration" key written in place.
        word_level_segment: list of dicts with "word", "start_time", "end_time" keys.
        total_duration: total audio length in seconds.
    """
    from difflib import SequenceMatcher
    def clean_word(word: str) -> str:
        """Clean a single word - remove numbers, special chars, keep only alpha"""
        return re.sub(r'[^a-zA-Z]', '', word).lower()
    def similarity_ratio(word1: str, word2: str) -> float:
        """Calculate similarity between two words (0.0 to 1.0)"""
        if not word1 or not word2:
            return 0.0
        return SequenceMatcher(None, word1, word2).ratio()
    # Build cleaned word list from word_level_segment
    # (entries whose cleaned form is empty are dropped entirely).
    cleaned_word_segments = []
    for seg in word_level_segment:
        word = seg.get("word", "")
        cleaned = clean_word(word)
        if cleaned:
            cleaned_word_segments.append({
                "cleaned": cleaned,
                "original": word,
                "start_time": seg.get("start_time", 0),
                "end_time": seg.get("end_time", 0)
            })
    logger.debug(f"πŸ“ Cleaned word segments: {len(cleaned_word_segments)} words")
    # Track current position in cleaned_word_segments
    current_word_index = 0
    for i, video in enumerate(selected_videos):
        tts_text = video.get("tts_script_segment", "").strip()
        if not tts_text:
            video["duration"] = 0
            continue
        # Clean the video's script segment
        video_words = tts_text.split()
        cleaned_video_words = [clean_word(w) for w in video_words if clean_word(w)]
        if not cleaned_video_words:
            video["duration"] = 0
            continue
        word_count = len(cleaned_video_words)
        logger.debug(f" Video {i}: Fuzzy matching {word_count} words starting at index {current_word_index}")
        # Get start time (first video is always pinned to t=0)
        if i == 0:
            start_time = 0.0
        elif current_word_index < len(cleaned_word_segments):
            start_time = cleaned_word_segments[current_word_index]["start_time"]
        else:
            # Ran out of timed words: split the remaining audio evenly.
            logger.warning(f" ⚠️ Out of word segments")
            remaining_videos = len(selected_videos) - i
            remaining_time = total_duration - (cleaned_word_segments[-1]["end_time"] if cleaned_word_segments else 0)
            video["duration"] = round(remaining_time / remaining_videos, 2)
            continue
        # FUZZY MATCHING: Match words with flexibility
        matched_count = 0
        search_index = current_word_index
        last_matched_index = current_word_index - 1
        for video_word in cleaned_video_words:
            found = False
            # Search within a window (next 5 words to avoid jumping too far)
            search_end = min(search_index + 5, len(cleaned_word_segments))
            for j in range(search_index, search_end):
                segment_word = cleaned_word_segments[j]["cleaned"]
                # Exact match
                if video_word == segment_word:
                    matched_count += 1
                    last_matched_index = j
                    search_index = j + 1
                    found = True
                    break
                # Substring match (either direction, e.g. "run" vs "running")
                if video_word in segment_word or segment_word in video_word:
                    matched_count += 1
                    last_matched_index = j
                    search_index = j + 1
                    found = True
                    logger.debug(f" Substring match: '{video_word}' β‰ˆ '{segment_word}'")
                    break
                # Fuzzy similarity match
                similarity = similarity_ratio(video_word, segment_word)
                if similarity >= 0.75:  # 75% similarity threshold
                    matched_count += 1
                    last_matched_index = j
                    search_index = j + 1
                    found = True
                    logger.debug(f" Fuzzy match: '{video_word}' β‰ˆ '{segment_word}' (sim: {similarity:.2f})")
                    break
            if not found:
                # Unmatched words are simply skipped; the window does not advance.
                logger.debug(f" No match for '{video_word}'")
        # Determine end index
        if matched_count > 0:
            next_word_index = last_matched_index + 1
            logger.debug(f" βœ“ Matched {matched_count}/{word_count} words")
        else:
            # Nothing matched at all: estimate by advancing the cursor by the
            # script's word count, clamped to the transcript length.
            logger.warning(f" ⚠️ No matches, estimating position")
            next_word_index = min(current_word_index + word_count, len(cleaned_word_segments))
        # Get end time (last video is always pinned to total_duration)
        if i + 1 == len(selected_videos):
            end_time = total_duration
        else:
            if next_word_index < len(cleaned_word_segments):
                end_time = cleaned_word_segments[next_word_index]["start_time"]
            else:
                end_time = total_duration
        # Calculate duration
        video["duration"] = round(end_time - start_time, 2)
        logger.debug(f" βœ… [{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s")
        # Move to next segment
        current_word_index = next_word_index
    # Verify total
    total_calculated = sum(v.get("duration", 0) for v in selected_videos)
    logger.debug(f"βœ… Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)")
def is_video_loopable(video_path, frame_check_window=10, threshold=15.0):
    """
    Heuristic loopability check: compare the first and last few frames.

    Samples `frame_check_window` frames from each end, downsizes them to
    128x128 grayscale, and computes the mean absolute pixel difference
    between the start and end stacks. A low difference means the clip ends
    near where it began, so looping it should not show a visible jump.

    Args:
        video_path: path to the video file (falsy input returns False).
        frame_check_window: number of frames sampled from each end.
        threshold: mean-difference cutoff; below it the video is "loopable".

    Returns:
        bool: True when start and end frames are similar enough to loop.

    Raises:
        ValueError: if OpenCV cannot open the file.
    """
    if not video_path:
        return False
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if frame_count <= frame_check_window * 2:
        cap.release()
        return False  # Too short to judge loopability
    # Indices of the first and last `frame_check_window` frames.
    frame_indices = list(range(frame_check_window)) + \
        list(range(frame_count - frame_check_window, frame_count))
    frames = []
    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret or frame is None:
            # Frames near the end can be unreadable in some containers; skip.
            continue
        frame = cv2.resize(frame, (128, 128))
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    cap.release()
    if len(frames) < 2 * frame_check_window:
        # Missing frames would misalign the start/end stacks below.
        return False
    start_frames = np.array(frames[:frame_check_window])
    end_frames = np.array(frames[-frame_check_window:])
    diff = np.mean(np.abs(start_frames.astype(np.float32) - end_frames.astype(np.float32)))
    logger.debug(f"πŸ” Mean frame difference: {diff:.2f}")
    return diff < threshold
def is_loopable_phash(video_path, hash_diff_threshold=8):
    """
    Loopability check via perceptual hashing (pHash).

    Compares the pHash of the first frame against the last readable frame;
    a small hash distance means the two frames look alike, so the video
    should loop without an obvious seam.

    Args:
        video_path: path to the video file (falsy input returns False).
        hash_diff_threshold: maximum pHash distance still considered loopable.

    Returns:
        bool: True when first/last frames are perceptually similar.

    Raises:
        ValueError: if OpenCV cannot open the file.
    """
    if not video_path:
        return False
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if frame_count < 2:
        cap.release()
        return False
    # Read first frame
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    ret, start = cap.read()
    if not ret or start is None:
        cap.release()
        return False
    # Try to read last valid frame (with fallback): walk backwards up to
    # ~9 frames, since the reported final frame is often unreadable.
    last_frame_index = frame_count - 1
    ret, end = False, None
    while not ret and last_frame_index > frame_count - 10:
        cap.set(cv2.CAP_PROP_POS_FRAMES, last_frame_index)
        ret, end = cap.read()
        last_frame_index -= 1
    cap.release()
    if end is None or not ret:
        return False
    # OpenCV frames are BGR; PIL expects RGB before hashing.
    start_hash = imagehash.phash(Image.fromarray(cv2.cvtColor(start, cv2.COLOR_BGR2RGB)))
    end_hash = imagehash.phash(Image.fromarray(cv2.cvtColor(end, cv2.COLOR_BGR2RGB)))
    diff = abs(start_hash - end_hash)
    logger.debug(f"🧩 pHash difference: {diff}")
    return diff < hash_diff_threshold
def is_video_zoomable_tail(video_path, tail_seconds=1, sample_frames=15, motion_threshold=1.5):
    """
    Checks only the *last few seconds* of the video to see if it's already zooming.
    Returns True if mostly static (safe to add zoom), False if motion already exists.

    NOTE(review): the unconditional `return False` below disables this
    analysis entirely — the function currently always reports "not zoomable"
    and everything after it is dead code, kept for possible re-enabling.
    """
    return False
    # --- DEAD CODE below this line (see note above) ---
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps if fps > 0 else 0
    # Only analyze the last N seconds
    start_frame = max(total_frames - int(tail_seconds * fps), 0)
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    cap.release()
    if len(frames) < 2:
        return True  # too few frames β†’ assume static
    # Sample frames evenly
    step = max(len(frames) // sample_frames, 1)
    total_motion = 0
    motion_samples = 0
    for i in range(0, len(frames) - step, step):
        prev_gray = frames[i]
        gray = frames[i + step]
        # Dense optical flow between sampled frame pairs.
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, gray, None,
            pyr_scale=0.5, levels=3, winsize=15,
            iterations=3, poly_n=5, poly_sigma=1.2, flags=0
        )
        mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        total_motion += np.mean(mag)
        motion_samples += 1
    avg_motion = total_motion / motion_samples if motion_samples else 0
    logger.debug(f"πŸŽ₯ Tail motion magnitude: {avg_motion:.3f}")
    # If low optical flow in tail section β†’ it's safe to add zoom
    return avg_motion < motion_threshold
def selective_update_with_keymaps(source: dict, modified: dict, source_keys: list, modified_keys: list) -> dict:
    """
    Update 'source' dict by copying values from 'modified' dict.
    Each pair (source_key, modified_key) defines a mapping; pairs whose
    modified key is absent are skipped. The input dicts are not mutated.
    Example:
        source_keys = ["url", "description"]
        modified_keys = ["video_url", "desc_text"]
    """
    result = dict(source)
    for dest_key, src_key in zip(source_keys, modified_keys):
        if src_key in modified:
            result[dest_key] = modified[src_key]
    return result
def clean_tts_script(tts_script: str) -> str:
    """Normalize a hyphen-joined TTS script into one space-separated string.

    Splits on '-', drops empty fragments, joins with single spaces, and
    strips any trailing period characters. Falsy input yields "".
    """
    if not tts_script:
        return ""
    fragments = (chunk.strip() for chunk in tts_script.split('-'))
    joined = " ".join(chunk for chunk in fragments if chunk)
    return joined.rstrip(".")
def reverse_clip(path_or_clip) -> str:
    """Reverse both video and audio using ffmpeg.

    Args:
        path_or_clip: either a filesystem path (str) or any MoviePy clip
            exposing ``write_videofile`` (VideoFileClip, CompositeVideoClip, ...).

    Returns:
        Path to the reversed mp4 in the system temp directory.

    Raises:
        TypeError: if the argument is neither a path nor a MoviePy clip
            (previously this fell through with input_path == "" and failed
            later inside ffmpeg with a cryptic error).
        subprocess.CalledProcessError: if ffmpeg fails.
    """
    # βœ… Handle any MoviePy clip (VideoFileClip, CompositeVideoClip, etc.)
    if hasattr(path_or_clip, "write_videofile"):
        # Render the in-memory clip to a temp file so ffmpeg can read it.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_input:
            input_path = temp_input.name
            path_or_clip.write_videofile(
                input_path,
                codec="libx264",
                audio_codec="aac",
                verbose=False,
                logger=None,
                fps=25
            )
    elif isinstance(path_or_clip, str):
        input_path = path_or_clip
    else:
        raise TypeError(f"reverse_clip expects a path or a MoviePy clip, got {type(path_or_clip)!r}")
    out_path = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex[:8]}_reversed.mp4")
    subprocess.run([
        "ffmpeg", "-hide_banner", "-loglevel", "error",
        "-y", "-i", input_path,
        "-vf", "reverse",
        "-af", "areverse",
        out_path
    ], check=True)
    return out_path
def interpolate_video(input_path: str, target_duration: float = 4.0, fps: int = 60) -> str:
    """
    Smoothly extend a short video using motion interpolation.
    Works entirely on CPU (no GPU required).
    Args:
        input_path: path to input video
        target_duration: desired output length (seconds)
        fps: target output framerate (default 60)

    NOTE(review): the unconditional `return None` below disables this
    feature entirely — despite the `-> str` annotation the function
    currently always returns None, and everything after it is dead code
    kept for possible re-enabling. Callers must handle None.
    """
    return None
    # --- DEAD CODE below this line (see note above) ---
    # Get actual duration
    cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
           "-of", "default=noprint_wrappers=1:nokey=1", input_path]
    duration_str = subprocess.check_output(cmd).decode().strip()
    duration = float(duration_str)
    # Calculate how much we need to stretch
    stretch_factor = target_duration / duration
    # Output file
    base = os.path.splitext(os.path.basename(input_path))[0]
    output_path = f"/tmp/{base}_interp.mp4"
    # FFmpeg motion interpolation command
    cmd = [
        "ffmpeg",
        "-i", input_path,
        "-filter_complex",
        f"[0:v]setpts={stretch_factor}*PTS,"
        f"minterpolate='mi_mode=mci:mc_mode=aobmc:vsbmc=1:fps={fps}'[v]",
        "-map", "[v]",
        "-an",  # remove audio
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "18",
        "-y", output_path
    ]
    subprocess.run(cmd, check=True)
    return output_path
def _get_video_resolution(path: str) -> tuple[int, int]:
    """Return (width, height) of the first video stream using ffprobe.

    Raises:
        RuntimeError: if ffprobe exits non-zero.
    """
    probe = subprocess.run(
        [
            "ffprobe",
            "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=width,height",
            "-of", "json",
            path,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if probe.returncode != 0:
        raise RuntimeError(f"ffprobe failed:\n{probe.stderr.decode()}")
    stream = json.loads(probe.stdout)["streams"][0]
    return stream["width"], stream["height"]
def _get_pixel_format(path: str) -> str:
    """Return the pixel format (e.g. 'yuv420p') of the first video stream.

    Best-effort: returns "" whenever ffprobe fails, emits unparseable JSON,
    or reports no video streams (the old code raised IndexError/ValueError
    in the latter two cases, contradicting its '"" on failure' contract).
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=pix_fmt",
        "-of", "json",
        path,
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        return ""
    try:
        info = json.loads(result.stdout)
        return info["streams"][0].get("pix_fmt", "")
    except (ValueError, KeyError, IndexError):
        # No streams / malformed output → treat pixel format as unknown.
        return ""
def resize_video(input_path: str, target_width: int = 1080, target_height: int = 1920, overwrite: bool = False, force: bool = False) -> str:
    """
    Resize a video to the given resolution (default 1080x1920) using FFmpeg.
    If overwrite=True, replaces the original file safely after successful conversion.
    If force=True, re-encodes even if the resolution already matches.

    Uses crop-to-fill: scales so the smaller dimension covers the target,
    then crops the excess, enforcing the exact target aspect ratio.

    Returns:
        str: input_path when skipped or overwritten, otherwise a new /tmp path.

    Raises:
        FileNotFoundError: if input_path does not exist.
        RuntimeError: if FFmpeg exits non-zero (temp output is cleaned up).
    """
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input video not found: {input_path}")
    # πŸ” Probe resolution
    width, height = _get_video_resolution(input_path)
    pix_fmt = _get_pixel_format(input_path)
    # Check if we can skip:
    # 1. Force is False
    # 2. Dimensions match
    # 3. Pixel format is yuv420p (required for broad compatibility)
    if not force and width == target_width and height == target_height and pix_fmt == "yuv420p":
        logger.debug(
            f"Skipping resize (already {width}x{height}, {pix_fmt}): {os.path.basename(input_path)}"
        )
        return input_path
    logger.debug(
        f"Resizing/Re-encoding {os.path.basename(input_path)} "
        f"({width}x{height}, {pix_fmt}) β†’ ({target_width}x{target_height}, yuv420p)"
    )
    temp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}.mp4")
    # FFmpeg resize command (output goes to /tmp first)
    # FFmpeg command for Crop-to-Fill (Strict 9:16 enforcement)
    # scale=1080:1920:force_original_aspect_ratio=increase ensures min dim fits
    # crop=1080:1920 crops the excess
    cmd = [
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", input_path,
        "-vf", f"scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height},setsar=1",
        "-c:v", "libx264", "-crf", "18", "-preset", "slow",
        "-pix_fmt", "yuv420p",
        "-c:a", "copy",
        temp_output
    ]
    # Run FFmpeg process
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        # Remove the partial output before surfacing the error.
        if os.path.exists(temp_output):
            os.remove(temp_output)
        raise RuntimeError(f"FFmpeg failed:\n{result.stderr.decode('utf-8', errors='ignore')}")
    # Overwrite original safely if requested
    if overwrite:
        shutil.move(temp_output, input_path)
        return input_path
    return temp_output
def remove_black_padding(input_path: str, overwrite: bool = False, threshold_pct: float = 0.1) -> str:
    """
    Automatically detect and remove black padding (crop only) using FFmpeg.
    Saves to /tmp with a unique UUID filename unless overwrite=True.
    Args:
        input_path (str): Path to the input video.
        overwrite (bool): If True, safely replace the original file.
        threshold_pct (float): Only crop if black padding > threshold_pct (0.0 to 1.0).
                               0.0 = always crop if any padding detected.
    Returns:
        str: Path to the cropped video (or original if no crop needed).

    Raises:
        FileNotFoundError: if input_path does not exist.
        RuntimeError: if the FFmpeg crop pass fails.
    """
    # Skipped entirely during test automation to keep runs deterministic.
    if get_config_value("test_automation"):
        return input_path
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input video not found: {input_path}")
    # Step 1: Detect crop parameters using cropdetect
    # (limited to the first 500 frames; output is parsed from stderr).
    detect_cmd = [
        "ffmpeg", "-i", input_path, "-vf", "cropdetect=24:16:0",
        "-frames:v", "500", "-f", "null", "-"
    ]
    result = subprocess.run(detect_cmd, stderr=subprocess.PIPE, text=True)
    matches = re.findall(r"crop=\S+", result.stderr)
    if not matches:
        logger.debug("No black padding detected.")
        return input_path
    # Get most frequent crop value
    crop_value = max(set(matches), key=matches.count)
    # Parse crop string: crop=w:h:x:y
    # Example: crop=1080:1520:0:200
    try:
        match = re.search(r"crop=(\d+):(\d+):(\d+):(\d+)", crop_value)
        if match:
            c_w, c_h, _, _ = map(int, match.groups())
            # Get original resolution
            orig_w, orig_h = _get_video_resolution(input_path)
            orig_area = orig_w * orig_h
            crop_area = c_w * c_h
            padding_area = orig_area - crop_area
            padding_pct = padding_area / orig_area if orig_area > 0 else 0
            # Small padding below the threshold is not worth a re-encode.
            if padding_pct < threshold_pct:
                logger.debug(f"Skipping crop: Padding {padding_pct:.1%} < Threshold {threshold_pct:.1%}")
                return input_path
            logger.debug(f"Detected crop: {crop_value} (Padding: {padding_pct:.1%})")
    except Exception as e:
        logger.warning(f"Could not parse crop value '{crop_value}' for threshold check: {e}")
        # Proceed with cropping if parsing fails, or return?
        # Safest is to proceed as before or log and continue.
        # Let's proceed to maintain existing behavior on failure unless explicitly stopped.
        logger.debug(f"Proceeding with crop: {crop_value}")
    # Step 2: Create temp output file
    tmp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}_cropped.mp4")
    # Step 3: Run FFmpeg crop command
    crop_cmd = ["ffmpeg", "-y", "-i", input_path, "-vf", crop_value, "-c:a", "copy", tmp_output]
    crop_proc = subprocess.run(crop_cmd, stderr=subprocess.PIPE, text=True)
    if crop_proc.returncode != 0:
        raise RuntimeError(f"FFmpeg crop failed:\n{crop_proc.stderr}")
    # Step 4: Handle overwrite safely
    if overwrite:
        shutil.move(tmp_output, input_path)
        return input_path
    return tmp_output
def trim_black_frames(
    input_path: str,
    overwrite: bool = False,
    black_threshold: int = 20,
    min_frames_to_trim: int = 1,
    max_frames_to_trim: int = 30
) -> str:
    """
    Detect and remove solid black frames from the start and end of a video.
    Uses FFmpeg showinfo filter to analyze frame luminance (Y channel mean).
    A frame is considered black if its Y mean is <= black_threshold.
    Args:
        input_path: Path to the input video
        overwrite: If True, replace the original file
        black_threshold: Maximum Y luminance value to consider a frame as black (0-255)
                         Default 20 catches pure black (16) with some tolerance
        min_frames_to_trim: Minimum black frames at start/end to trigger trimming
        max_frames_to_trim: Maximum frames to check at start/end
    Returns:
        Path to the trimmed video, or original path if no trimming needed
    """
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input video not found: {input_path}")
    # Get video info (fps + duration); any probe failure is non-fatal and
    # simply leaves the video untrimmed.
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=nb_frames,r_frame_rate,duration",
        "-show_entries", "format=duration",
        "-of", "json", input_path
    ]
    probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
    if probe_result.returncode != 0:
        logger.warning(f"Failed to probe video: {input_path}")
        return input_path
    probe_data = json.loads(probe_result.stdout)
    # Get FPS (r_frame_rate comes as a "num/den" fraction, e.g. "30000/1001")
    fps_str = probe_data.get("streams", [{}])[0].get("r_frame_rate", "25/1")
    fps_parts = fps_str.split("/")
    fps = float(fps_parts[0]) / float(fps_parts[1]) if len(fps_parts) == 2 else float(fps_parts[0])
    # Get total duration (prefer the container's, fall back to the stream's)
    duration = float(probe_data.get("format", {}).get("duration", 0))
    if duration == 0:
        duration = float(probe_data.get("streams", [{}])[0].get("duration", 0))
    if duration <= 0:
        logger.warning(f"Could not determine video duration: {input_path}")
        return input_path
    # Analyze first N frames for black frames at start
    start_black_frames = _count_black_frames_at_position(
        input_path, "start", max_frames_to_trim, black_threshold, fps
    )
    # Analyze last N frames for black frames at end
    end_black_frames = _count_black_frames_at_position(
        input_path, "end", max_frames_to_trim, black_threshold, fps, duration
    )
    logger.debug(f"🎬 Black frame analysis: start={start_black_frames}, end={end_black_frames}")
    # Check if trimming is needed
    if start_black_frames < min_frames_to_trim and end_black_frames < min_frames_to_trim:
        logger.debug(f"βœ… No black frames to trim in: {os.path.basename(input_path)}")
        return input_path
    # Calculate trim times (frames → seconds via fps)
    start_trim_time = start_black_frames / fps if start_black_frames >= min_frames_to_trim else 0
    end_trim_time = end_black_frames / fps if end_black_frames >= min_frames_to_trim else 0
    # New duration after trimming
    new_duration = duration - start_trim_time - end_trim_time
    if new_duration <= 0.1:
        logger.warning(f"⚠️ Trimming would remove entire video, skipping: {input_path}")
        return input_path
    logger.debug(
        f"βœ‚οΈ Trimming black frames: {os.path.basename(input_path)} "
        f"(start: {start_trim_time:.3f}s, end: {end_trim_time:.3f}s)"
    )
    # Generate output path
    temp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}_trimmed.mp4")
    # Build FFmpeg command: seek past the leading black, keep new_duration,
    # re-encode video (trim is not keyframe-aligned) but copy audio.
    cmd = [
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-ss", str(start_trim_time),
        "-i", input_path,
        "-t", str(new_duration),
        "-c:v", "libx264", "-preset", "fast", "-crf", "18",
        "-pix_fmt", "yuv420p",
        "-c:a", "copy",
        temp_output
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error(f"FFmpeg trim failed: {result.stderr}")
        return input_path
    logger.debug(f"βœ… Trimmed video saved: {temp_output}")
    # Handle overwrite
    if overwrite:
        shutil.move(temp_output, input_path)
        return input_path
    return temp_output
def _count_black_frames_at_position(
    video_path: str,
    position: str,  # "start" or "end"
    max_frames: int,
    black_threshold: int,
    fps: float,
    duration: float = 0
) -> int:
    """
    Count consecutive black frames at the start or end of a video.
    Args:
        video_path: Path to video file
        position: "start" or "end"
        max_frames: Maximum frames to analyze
        black_threshold: Y luminance threshold for black detection
        fps: Video frame rate
        duration: Video duration (required for "end" position)
    Returns:
        Number of consecutive black frames at the specified position.
        Best-effort: returns 0 on any ffmpeg failure or timeout.
    """
    # For start: analyze first max_frames frames
    # For end: seek to near end and analyze last max_frames frames
    if position == "end" and duration > 0:
        seek_time = max(0, duration - (max_frames / fps) - 0.5)
        ss_arg = ["-ss", str(seek_time)]
    else:
        ss_arg = []
    # Use showinfo filter to get frame luminance
    cmd = [
        "ffmpeg", "-hide_banner",
        *ss_arg,
        "-i", video_path,
        "-vf", f"select='lte(n,{max_frames})',showinfo",
        "-frames:v", str(max_frames + 5),
        "-f", "null", "-"
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except subprocess.TimeoutExpired:
        # This analysis is best-effort; a hung ffmpeg must not crash the
        # caller (previously TimeoutExpired propagated out of trim_black_frames).
        logger.warning(f"showinfo analysis timed out for: {video_path}")
        return 0
    if result.returncode != 0:
        return 0
    # Parse showinfo output for mean values
    # Format: mean:[Y U V] where Y is luminance
    # A pure black frame has Y=16 in YUV (limited range)
    frame_means = []
    for line in result.stderr.split('\n'):
        match = re.search(r'mean:\[(\d+)\s+\d+\s+\d+\]', line)
        if match:
            y_mean = int(match.group(1))
            frame_means.append(y_mean)
    if not frame_means:
        return 0
    # Count consecutive black frames from the relevant edge.
    if position == "start":
        # Count from beginning
        black_count = 0
        for y_mean in frame_means:
            if y_mean <= black_threshold:
                black_count += 1
            else:
                break
        return black_count
    else:
        # Count from end (reverse)
        black_count = 0
        for y_mean in reversed(frame_means):
            if y_mean <= black_threshold:
                black_count += 1
            else:
                break
        return black_count
def ratio_1x1_to9x16(video_path, overwrite=False):
    """
    Convert a video to 9:16 (1080x1920) by center-cropping to a square,
    scaling it to 1080x1080, and padding top/bottom with solid black bars
    using FFmpeg. (Note: the filter pads with ``black``, not a blurred
    background, despite earlier descriptions.)
    Saves to /tmp with a unique UUID filename unless overwrite=True.
    Args:
        video_path (str): Path to the input video.
        overwrite (bool): If True, safely replace the original file.
    Returns:
        str: Path to the converted video.
    Raises:
        FileNotFoundError: if video_path does not exist.
        RuntimeError: if FFmpeg exits non-zero.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Input video not found: {video_path}")
    tmp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}_9x16.mp4")
    # crop to the centered square, scale to 1080x1080, pad to 1080x1920
    # with the square vertically centered.
    cmd = [
        "ffmpeg",
        "-i", video_path,
        "-vf", "crop=min(iw\\,ih):min(iw\\,ih),scale=1080:1080,pad=1080:1920:0:(1920-1080)/2:black",
        "-c:a", "copy",
        "-y",
        tmp_output
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg failed:\n{result.stderr.decode('utf-8', errors='ignore')}")
    if overwrite:
        shutil.move(tmp_output, video_path)
        return video_path
    return tmp_output
def get_best_beat_method(audio_path: str, min_interval: float = 1.0, target_beats: int = 10) -> tuple[np.ndarray, str]:
    """
    Run every beat-detection method and keep the one whose beat count lands
    closest to the target.
    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between beats in seconds
        target_beats: Desired number of beats (default 10 for 10-15 sec videos)
    Returns:
        Tuple of (beat_times, method_name), or (None, None) if every method
        fails or detects nothing.
    """
    detections = {}
    logger.debug(f"Testing all beat detection methods (target: ~{target_beats} beats)...")
    for name in ("kick", "snare", "downbeat", "general"):
        try:
            beats = get_beat_times(audio_path, beat_type=name, min_interval=min_interval)
            detections[name] = beats
            logger.debug(f"{name:12s}: {len(beats):2d} beats detected")
        except Exception as e:
            # A failing detector is recorded as empty so it is filtered below.
            logger.debug(f"{name:12s}: ERROR - {e}")
            detections[name] = np.array([])
    usable = {name: beats for name, beats in detections.items() if len(beats) > 0}
    if not usable:
        return None, None
    winner = min(usable, key=lambda name: abs(len(usable[name]) - target_beats))
    logger.debug(f"Selected: {winner} with {len(usable[winner])} beats (closest to target)")
    return usable[winner], winner
def get_kick_times(audio_path: str, min_interval: float = 1.0) -> np.ndarray:
    """
    Detect kick drum hits (low frequency emphasis).
    Kicks are the "boom" - usually the strongest low-end hits.
    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between kicks in seconds
    Returns:
        Array of kick drum timestamps in seconds
    """
    y, sr = librosa.load(audio_path)
    # Isolate the percussive component; the harmonic part is not needed
    _, y_percussive = librosa.effects.hpss(y, margin=2.0)
    # Second, stricter percussive pass to further suppress tonal content.
    # NOTE: this is NOT a low-pass filter — the bass emphasis comes from
    # fmax=200 in the onset-strength computation below.
    y_bass = librosa.effects.percussive(y_percussive, margin=4.0)
    # Onset strength computed only over low frequencies (< 200Hz)
    onset_env = librosa.onset.onset_strength(
        y=y_bass,
        sr=sr,
        aggregate=np.median,
        fmax=200,  # Focus on frequencies below 200Hz
        n_mels=128
    )
    # Detect onsets with a permissive threshold so quieter kicks are kept
    onset_frames = librosa.onset.onset_detect(
        onset_envelope=onset_env,
        sr=sr,
        backtrack=False,
        pre_max=3,
        post_max=3,
        pre_avg=3,
        post_avg=5,
        delta=0.15,  # Very low threshold to catch more kicks
        wait=8
    )
    kick_times = librosa.frames_to_time(onset_frames, sr=sr)
    logger.debug(f"Raw kick detections: {len(kick_times)}")
    # Enforce the minimum spacing between successive kicks
    return _filter_by_min_interval(kick_times, min_interval)
def get_snare_times(audio_path: str, min_interval: float = 1.0) -> np.ndarray:
    """
    Detect snare/clap hits (mid-high frequency emphasis).
    Snares are the "crack" - sharp, crisp hits.
    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between snares in seconds
    Returns:
        Array of snare hit timestamps in seconds
    """
    y, sr = librosa.load(audio_path)
    # Isolate the percussive component; the harmonic part is not needed
    _, y_percussive = librosa.effects.hpss(y, margin=2.0)
    # Onset strength restricted to the mid-high band where snares live
    onset_env = librosa.onset.onset_strength(
        y=y_percussive,
        sr=sr,
        aggregate=np.median,
        fmin=150,  # Focus on frequencies above 150Hz
        fmax=4000,
        n_mels=128
    )
    # Detect onsets with a permissive threshold
    onset_frames = librosa.onset.onset_detect(
        onset_envelope=onset_env,
        sr=sr,
        backtrack=False,
        pre_max=3,
        post_max=3,
        pre_avg=3,
        post_avg=5,
        delta=0.15,  # Very low threshold
        wait=8
    )
    snare_times = librosa.frames_to_time(onset_frames, sr=sr)
    logger.debug(f"Raw snare detections: {len(snare_times)}")
    # Enforce the minimum spacing between successive snares
    return _filter_by_min_interval(snare_times, min_interval)
def get_downbeats(audio_path: str, min_interval: float = 1.0) -> np.ndarray:
    """
    Detect downbeats - every Nth beat based on tempo.
    More reliable than frequency filtering for finding the "1" count.
    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between downbeats in seconds
    Returns:
        Array of downbeat timestamps in seconds
    """
    y, sr = librosa.load(audio_path)
    # Full beat grid first; downbeats are derived from it below
    tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr, units='frames')
    beat_times = librosa.frames_to_time(beat_frames, sr=sr)
    # librosa may return tempo as a 1-element array or a plain scalar
    tempo_val = tempo[0] if isinstance(tempo, np.ndarray) else tempo
    logger.debug(f"Detected {len(beat_times)} total beats at {tempo_val:.1f} BPM")
    if len(beat_times) == 0:
        return np.array([])
    # Assume 4/4 time (every 4th beat is the "1"); fall back to every
    # 2nd beat when the track yields very few beats
    beats_per_bar = 2 if len(beat_times) < 8 else 4
    downbeat_times = beat_times[::beats_per_bar]
    logger.debug(f"Selected {len(downbeat_times)} downbeats (every {beats_per_bar} beats)")
    # Enforce the minimum spacing between downbeats
    return _filter_by_min_interval(downbeat_times, min_interval)
def get_general_beats(audio_path: str, min_interval: float = 1.0) -> np.ndarray:
    """
    Fallback: Get general beat times (original method).
    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between beats in seconds
    Returns:
        Array of beat timestamps in seconds
    """
    signal, sample_rate = librosa.load(audio_path)
    tempo, frames = librosa.beat.beat_track(y=signal, sr=sample_rate)
    timestamps = librosa.frames_to_time(frames, sr=sample_rate)
    logger.debug(f"Tempo: {tempo} BPM")
    logger.debug(f"Beat times: {timestamps}")
    return _filter_by_min_interval(timestamps, min_interval)
def _filter_by_min_interval(times: np.ndarray, min_interval: float) -> np.ndarray:
"""Filter timestamps to ensure minimum interval between them."""
if len(times) == 0:
return times
filtered = [times[0]]
for t in times[1:]:
if t - filtered[-1] >= min_interval:
filtered.append(t)
return np.array(filtered)
def get_beat_times(audio_path: str, beat_type: str = "downbeat", min_interval: float = 1.0) -> np.ndarray:
    """
    Get beat times based on specified drum element.
    Args:
        audio_path: Path to audio file
        beat_type: One of "kick", "snare", "downbeat", or "general"
        min_interval: Minimum time between beats in seconds
    Returns:
        Array of beat timestamps in seconds
    Recommendation: Start with "downbeat" - it's the most reliable!
    """
    logger.debug(f"Detecting {beat_type} beats with min_interval={min_interval}s...")
    # Dispatch table keeps the mapping from type name to detector in one place
    detectors = {
        "kick": get_kick_times,
        "snare": get_snare_times,
        "downbeat": get_downbeats,
        "general": get_general_beats,
    }
    detector = detectors.get(beat_type)
    if detector is None:
        raise ValueError(f"Unknown beat_type: {beat_type}. Use 'kick', 'snare', 'downbeat', or 'general'")
    result = detector(audio_path, min_interval)
    logger.debug(f"Final result: {len(result)} {beat_type} beats detected")
    return result
def repeat_audio_ffmpeg(input_audio, output_audio, repeat: int):
    """
    Repeat audio multiple times, removing leading/trailing silence before repeating.
    Automatically determines the correct output format based on the file extension.
    Args:
        input_audio: Path to input audio file
        output_audio: Path to output audio file (extension determines format)
        repeat: Number of times to repeat the audio (must be >= 1)
    Returns:
        str: Path to the output file (may be modified if extension was incompatible)
    Raises:
        ValueError: If repeat < 1.
        subprocess.CalledProcessError: If either ffmpeg invocation fails.
    """
    # Guard first: repeat=0 would produce "-stream_loop -1", which tells
    # ffmpeg to loop the input forever and hangs this call.
    if repeat < 1:
        raise ValueError(f"repeat must be >= 1, got {repeat}")
    # Determine output format and codec from extension
    output_ext = os.path.splitext(output_audio)[1].lower()
    output_base = os.path.splitext(output_audio)[0]
    # Map extensions to appropriate codec and container
    format_map = {
        '.mp3': {'codec': 'libmp3lame', 'bitrate': '192k'},
        '.m4a': {'codec': 'aac', 'bitrate': '192k'},
        '.aac': {'codec': 'aac', 'bitrate': '192k'},
        '.opus': {'codec': 'libopus', 'bitrate': '128k'},
        '.ogg': {'codec': 'libvorbis', 'bitrate': '192k'},
        '.wav': {'codec': 'pcm_s16le', 'bitrate': None},
    }
    # Default to m4a if extension not recognized
    if output_ext not in format_map:
        output_ext = '.m4a'
        output_audio = output_base + output_ext
        logger.debug(f"Unknown format, defaulting to: {output_audio}")
    audio_config = format_map[output_ext]
    # Create a temporary file for the silence-trimmed audio (use same format)
    with tempfile.NamedTemporaryFile(suffix=output_ext, delete=False) as tmp:
        temp_trimmed = tmp.name
    try:
        # Step 1: Remove leading AND trailing silence from the original audio
        trim_cmd = [
            "ffmpeg", "-y",
            "-i", input_audio,
            "-af", "silenceremove=start_periods=1:start_threshold=-50dB:start_duration=0:stop_periods=-1:stop_threshold=-50dB:stop_duration=0",
            "-c:a", audio_config['codec']
        ]
        # Add bitrate if applicable (not for WAV)
        if audio_config['bitrate']:
            trim_cmd.extend(["-b:a", audio_config['bitrate']])
        trim_cmd.append(temp_trimmed)
        subprocess.run(trim_cmd, check=True, capture_output=True, text=True)
        # Step 2: Repeat the trimmed audio (stream_loop N plays N+1 times)
        repeat_cmd = [
            "ffmpeg", "-y",
            "-stream_loop", str(repeat - 1),
            "-i", temp_trimmed,
            "-c:a", audio_config['codec']
        ]
        # Add bitrate if applicable
        if audio_config['bitrate']:
            repeat_cmd.extend(["-b:a", audio_config['bitrate']])
        repeat_cmd.append(output_audio)
        subprocess.run(repeat_cmd, check=True, capture_output=True, text=True)
        logger.debug(f"Successfully repeated audio {repeat} times, output: {output_audio}")
        return output_audio
    except subprocess.CalledProcessError as e:
        logger.error(f"FFmpeg error: STDOUT={e.stdout}, STDERR={e.stderr}")
        raise
    finally:
        # Clean up temporary file
        if os.path.exists(temp_trimmed):
            os.remove(temp_trimmed)
def clean_and_drop_empty(
df: pd.DataFrame,
column: str,
extra_nulls: list[str] | None = None,
) -> pd.DataFrame:
"""
Normalize Google Sheets empty values and drop rows
where `column` is effectively empty.
Handles:
- NaN
- ""
- " "
- "nan", "None", "NULL", "N/A"
Args:
df: Input DataFrame
column: Column to validate (e.g. "VIDEO_LINK")
extra_nulls: Optional extra string values to treat as null
Returns:
Cleaned DataFrame with valid rows only
"""
if column not in df.columns:
raise KeyError(f"Column '{column}' not found in DataFrame")
null_values = ["", "nan", "none", "null", "n/a"]
if extra_nulls:
null_values.extend([v.lower() for v in extra_nulls])
df = df.copy()
df[column] = (
df[column]
.astype(str)
.str.strip()
# .str.lower()
.replace(null_values, np.nan)
)
return df.dropna(subset=[column])
def is_valid_video(path: str) -> bool:
    """Quick sanity check: the file exists and is at least 100KB (smaller files are almost certainly invalid)."""
    return os.path.exists(path) and os.path.getsize(path) >= 100 * 1024