""" Utility functions """ from __future__ import annotations from src.logger_config import logger, setup_logger import sys from pathlib import Path import subprocess import os import uuid import re import shutil import tempfile from src.config import get_config_value import json import traceback import cv2 import numpy as np import imagehash from PIL import Image from moviepy.editor import VideoFileClip import tempfile import librosa def get_temp_dir(prefix: str = "tmp_") -> Path: """ Creates a temp directory. Uses fixed path during test automation if configured. """ if get_config_value("test_automation"): base_dir = get_config_value("test_data_directory") if not base_dir: raise RuntimeError("TEST_DATA_DIRECTORY must be set when TEST_AUTOMATION=true") # Ensure base dir exists Path(base_dir).mkdir(parents=True, exist_ok=True) sub_dir = "output" if "download" in prefix: sub_dir = "downloads" path = Path(base_dir) / sub_dir path.mkdir(parents=True, exist_ok=True) return path return Path(tempfile.mkdtemp(prefix=prefix)) def get_video_duration(path: str) -> float: """ Returns the duration of a video file in seconds as a float. Uses ffprobe (very fast and accurate). """ cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "format=duration", "-of", "json", path ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) info = json.loads(result.stdout) return float(info["format"]["duration"]) def calculate_video_durations(selected_videos, all_tts_script_segment, word_level_segment, total_duration: float) -> None: """ Calculate and update duration for each video based on word-level segments. Uses three approaches in order of preference: 1. Simple word count matching (if counts align exactly) 2. Text matching with cleaning (if counts differ slightly) 3. Fuzzy matching (if words are missing or mismatched) """ try: # Get word counts all_script_words = all_tts_script_segment.split() def clean_word(word: str) -> str: return re.sub(r'[^a-zA-Z]', '', word).lower() cleaned_script_words = [clean_word(w) for w in all_script_words if clean_word(w)] cleaned_segment_words = [clean_word(seg.get("word", "")) for seg in word_level_segment if clean_word(seg.get("word", ""))] logger.debug(f"📊 Original: Script={len(all_script_words)} words, Segments={len(word_level_segment)} words") logger.debug(f"📊 Cleaned: Script={len(cleaned_script_words)} words, Segments={len(cleaned_segment_words)} words") logger.debug(f"⏱️ Total audio duration: {total_duration}s (starting from 0)") # APPROACH 1: Exact match (original word counts) if len(all_script_words) == len(word_level_segment): logger.debug("✅ Using APPROACH 1: Simple word count matching") calculate_durations_simple(selected_videos, word_level_segment, total_duration) # APPROACH 2: Cleaned match (cleaned word counts) elif len(cleaned_script_words) == len(cleaned_segment_words): logger.debug("✅ Using APPROACH 2: Text matching with cleaning") calculate_durations_with_text_matching(selected_videos, word_level_segment, total_duration) # APPROACH 3: Fuzzy match else: diff = abs(len(cleaned_script_words) - len(cleaned_segment_words)) logger.debug(f"⚠️ Word count mismatch after cleaning (diff: {diff})") logger.debug("🔍 Using APPROACH 3: Fuzzy matching") calculate_durations_with_fuzzy_matching(selected_videos, word_level_segment, total_duration) except Exception as e: logger.error(f"❌ Failed to calculate video durations: {e}") traceback.print_exc() # Fallback: set equal durations equal_duration = total_duration / len(selected_videos) for video in selected_videos: video["duration"] = round(equal_duration, 2) def calculate_durations_simple(selected_videos, word_level_segment, total_duration: float) -> None: """ APPROACH 1: Simple sequential matching when word counts align exactly. First video always starts at 0, last video always ends at total_duration. """ current_word_index = 0 for i, video in enumerate(selected_videos): tts_text = video.get("tts_script_segment", "").strip() if not tts_text: video["duration"] = 0 continue word_count = len(tts_text.split()) # Get start time for this segment if i == 0: start_time = 0.0 else: start_time = word_level_segment[current_word_index]["start_time"] # Calculate next word index next_word_index = current_word_index + word_count # Get end time if i + 1 == len(selected_videos): end_time = total_duration else: if next_word_index < len(word_level_segment): end_time = word_level_segment[next_word_index]["start_time"] else: end_time = total_duration # Calculate duration video["duration"] = round(end_time - start_time, 2) logger.debug(f" Video {i}: [{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s | '{tts_text[:40]}...'") # Move to next segment current_word_index = next_word_index # Verify total total_calculated = sum(v.get("duration", 0) for v in selected_videos) logger.debug(f"✅ Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)") def calculate_durations_with_text_matching(selected_videos, word_level_segment, total_duration: float) -> None: """ APPROACH 2: Text matching with cleaned/normalized text. Handles cases where word counts don't align due to numbers, punctuation, etc. First video always starts at 0, last video always ends at total_duration. """ def clean_word(word: str) -> str: """Clean a single word - remove numbers, special chars, keep only alpha""" return re.sub(r'[^a-zA-Z]', '', word).lower() # Build cleaned word list from word_level_segment cleaned_word_segments = [] for seg in word_level_segment: word = seg.get("word", "") cleaned = clean_word(word) if cleaned: cleaned_word_segments.append({ "cleaned": cleaned, "original": word, "start_time": seg.get("start_time", 0), "end_time": seg.get("end_time", 0) }) logger.debug(f"📝 Cleaned word segments: {len(cleaned_word_segments)} words") # Track current position in cleaned_word_segments current_word_index = 0 for i, video in enumerate(selected_videos): tts_text = video.get("tts_script_segment", "").strip() if not tts_text: video["duration"] = 0 continue # Clean the video's script segment video_words = tts_text.split() cleaned_video_words = [clean_word(w) for w in video_words if clean_word(w)] if not cleaned_video_words: video["duration"] = 0 continue word_count = len(cleaned_video_words) logger.debug(f" Video {i}: Looking for {word_count} cleaned words starting at index {current_word_index}") # Get start time if i == 0: start_time = 0.0 elif current_word_index < len(cleaned_word_segments): start_time = cleaned_word_segments[current_word_index]["start_time"] else: logger.warning(f" ⚠️ Out of word segments, using remaining time") remaining_videos = len(selected_videos) - i remaining_time = total_duration - (cleaned_word_segments[-1]["end_time"] if cleaned_word_segments else 0) video["duration"] = round(remaining_time / remaining_videos, 2) continue # Calculate next word index next_word_index = current_word_index + word_count # Get end time if i + 1 == len(selected_videos): end_time = total_duration else: if next_word_index < len(cleaned_word_segments): end_time = cleaned_word_segments[next_word_index]["start_time"] else: end_time = total_duration # Calculate duration video["duration"] = round(end_time - start_time, 2) logger.debug(f" ✅ [{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s | {word_count} words") # Move to next segment current_word_index = next_word_index # Verify total total_calculated = sum(v.get("duration", 0) for v in selected_videos) logger.debug(f"✅ Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)") def calculate_durations_with_fuzzy_matching(selected_videos, word_level_segment, total_duration: float) -> None: """ APPROACH 3: Fuzzy matching with flexible word alignment. Handles cases where words are missing, misspelled, or slightly different. First video always starts at 0, last video always ends at total_duration. """ from difflib import SequenceMatcher def clean_word(word: str) -> str: """Clean a single word - remove numbers, special chars, keep only alpha""" return re.sub(r'[^a-zA-Z]', '', word).lower() def similarity_ratio(word1: str, word2: str) -> float: """Calculate similarity between two words (0.0 to 1.0)""" if not word1 or not word2: return 0.0 return SequenceMatcher(None, word1, word2).ratio() # Build cleaned word list from word_level_segment cleaned_word_segments = [] for seg in word_level_segment: word = seg.get("word", "") cleaned = clean_word(word) if cleaned: cleaned_word_segments.append({ "cleaned": cleaned, "original": word, "start_time": seg.get("start_time", 0), "end_time": seg.get("end_time", 0) }) logger.debug(f"📝 Cleaned word segments: {len(cleaned_word_segments)} words") # Track current position in cleaned_word_segments current_word_index = 0 for i, video in enumerate(selected_videos): tts_text = video.get("tts_script_segment", "").strip() if not tts_text: video["duration"] = 0 continue # Clean the video's script segment video_words = tts_text.split() cleaned_video_words = [clean_word(w) for w in video_words if clean_word(w)] if not cleaned_video_words: video["duration"] = 0 continue word_count = len(cleaned_video_words) logger.debug(f" Video {i}: Fuzzy matching {word_count} words starting at index {current_word_index}") # Get start time if i == 0: start_time = 0.0 elif current_word_index < len(cleaned_word_segments): start_time = cleaned_word_segments[current_word_index]["start_time"] else: logger.warning(f" ⚠️ Out of word segments") remaining_videos = len(selected_videos) - i remaining_time = total_duration - (cleaned_word_segments[-1]["end_time"] if cleaned_word_segments else 0) video["duration"] = round(remaining_time / remaining_videos, 2) continue # FUZZY MATCHING: Match words with flexibility matched_count = 0 search_index = current_word_index last_matched_index = current_word_index - 1 for video_word in cleaned_video_words: found = False # Search within a window (next 5 words to avoid jumping too far) search_end = min(search_index + 5, len(cleaned_word_segments)) for j in range(search_index, search_end): segment_word = cleaned_word_segments[j]["cleaned"] # Exact match if video_word == segment_word: matched_count += 1 last_matched_index = j search_index = j + 1 found = True break # Substring match if video_word in segment_word or segment_word in video_word: matched_count += 1 last_matched_index = j search_index = j + 1 found = True logger.debug(f" Substring match: '{video_word}' ≈ '{segment_word}'") break # Fuzzy similarity match similarity = similarity_ratio(video_word, segment_word) if similarity >= 0.75: # 75% similarity threshold matched_count += 1 last_matched_index = j search_index = j + 1 found = True logger.debug(f" Fuzzy match: '{video_word}' ≈ '{segment_word}' (sim: {similarity:.2f})") break if not found: logger.debug(f" No match for '{video_word}'") # Determine end index if matched_count > 0: next_word_index = last_matched_index + 1 logger.debug(f" ✓ Matched {matched_count}/{word_count} words") else: logger.warning(f" ⚠️ No matches, estimating position") next_word_index = min(current_word_index + word_count, len(cleaned_word_segments)) # Get end time if i + 1 == len(selected_videos): end_time = total_duration else: if next_word_index < len(cleaned_word_segments): end_time = cleaned_word_segments[next_word_index]["start_time"] else: end_time = total_duration # Calculate duration video["duration"] = round(end_time - start_time, 2) logger.debug(f" ✅ [{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s") # Move to next segment current_word_index = next_word_index # Verify total total_calculated = sum(v.get("duration", 0) for v in selected_videos) logger.debug(f"✅ Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)") def is_video_loopable(video_path, frame_check_window=10, threshold=15.0): if not video_path: return False cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video: {video_path}") frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_count <= frame_check_window * 2: cap.release() return False # Too short to judge loopability frame_indices = list(range(frame_check_window)) + \ list(range(frame_count - frame_check_window, frame_count)) frames = [] for idx in frame_indices: cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ret, frame = cap.read() if not ret or frame is None: continue frame = cv2.resize(frame, (128, 128)) frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)) cap.release() if len(frames) < 2 * frame_check_window: return False start_frames = np.array(frames[:frame_check_window]) end_frames = np.array(frames[-frame_check_window:]) diff = np.mean(np.abs(start_frames.astype(np.float32) - end_frames.astype(np.float32))) logger.debug(f"🔍 Mean frame difference: {diff:.2f}") return diff < threshold def is_loopable_phash(video_path, hash_diff_threshold=8): if not video_path: return False cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video: {video_path}") frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_count < 2: cap.release() return False # Read first frame cap.set(cv2.CAP_PROP_POS_FRAMES, 0) ret, start = cap.read() if not ret or start is None: cap.release() return False # Try to read last valid frame (with fallback) last_frame_index = frame_count - 1 ret, end = False, None while not ret and last_frame_index > frame_count - 10: cap.set(cv2.CAP_PROP_POS_FRAMES, last_frame_index) ret, end = cap.read() last_frame_index -= 1 cap.release() if end is None or not ret: return False start_hash = imagehash.phash(Image.fromarray(cv2.cvtColor(start, cv2.COLOR_BGR2RGB))) end_hash = imagehash.phash(Image.fromarray(cv2.cvtColor(end, cv2.COLOR_BGR2RGB))) diff = abs(start_hash - end_hash) logger.debug(f"🧩 pHash difference: {diff}") return diff < hash_diff_threshold def is_video_zoomable_tail(video_path, tail_seconds=1, sample_frames=15, motion_threshold=1.5): """ Checks only the *last few seconds* of the video to see if it's already zooming. Returns True if mostly static (safe to add zoom), False if motion already exists. """ return False cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video: {video_path}") fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps if fps > 0 else 0 # Only analyze the last N seconds start_frame = max(total_frames - int(tail_seconds * fps), 0) cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame) frames = [] while True: ret, frame = cap.read() if not ret: break frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)) cap.release() if len(frames) < 2: return True # too few frames → assume static # Sample frames evenly step = max(len(frames) // sample_frames, 1) total_motion = 0 motion_samples = 0 for i in range(0, len(frames) - step, step): prev_gray = frames[i] gray = frames[i + step] flow = cv2.calcOpticalFlowFarneback( prev_gray, gray, None, pyr_scale=0.5, levels=3, winsize=15, iterations=3, poly_n=5, poly_sigma=1.2, flags=0 ) mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1]) total_motion += np.mean(mag) motion_samples += 1 avg_motion = total_motion / motion_samples if motion_samples else 0 logger.debug(f"🎥 Tail motion magnitude: {avg_motion:.3f}") # If low optical flow in tail section → it's safe to add zoom return avg_motion < motion_threshold def selective_update_with_keymaps(source: dict, modified: dict, source_keys: list, modified_keys: list) -> dict: """ Update 'source' dict by copying values from 'modified' dict. Each pair (source_key, modified_key) defines a mapping. Example: source_keys = ["url", "description"] modified_keys = ["video_url", "desc_text"] """ updated = source.copy() for s_key, m_key in zip(source_keys, modified_keys): if m_key in modified: updated[s_key] = modified[m_key] return updated def clean_tts_script(tts_script: str) -> str: """Split and clean TTS script joined by '-'.""" if tts_script: # Split by hyphen and strip spaces parts = [part.strip() for part in tts_script.split('-') if part.strip()] return " ".join(parts).rstrip(".") return "" def reverse_clip(path_or_clip) -> str: input_path = "" # ✅ Handle any MoviePy clip (VideoFileClip, CompositeVideoClip, etc.) if hasattr(path_or_clip, "write_videofile"): with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_input: input_path = temp_input.name path_or_clip.write_videofile( input_path, codec="libx264", audio_codec="aac", verbose=False, logger=None, fps=25 ) elif isinstance(path_or_clip, str): input_path = path_or_clip """Reverse both video and audio using ffmpeg.""" out_path = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex[:8]}_reversed.mp4") subprocess.run([ "ffmpeg", "-hide_banner", "-loglevel", "error", "-y", "-i", input_path, "-vf", "reverse", "-af", "areverse", out_path ], check=True) return out_path def interpolate_video(input_path: str, target_duration: float = 4.0, fps: int = 60) -> str: """ Smoothly extend a short video using motion interpolation. Works entirely on CPU (no GPU required). Args: input_path: path to input video target_duration: desired output length (seconds) fps: target output framerate (default 60) """ return None # Get actual duration cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", input_path] duration_str = subprocess.check_output(cmd).decode().strip() duration = float(duration_str) # Calculate how much we need to stretch stretch_factor = target_duration / duration # Output file base = os.path.splitext(os.path.basename(input_path))[0] output_path = f"/tmp/{base}_interp.mp4" # FFmpeg motion interpolation command cmd = [ "ffmpeg", "-i", input_path, "-filter_complex", f"[0:v]setpts={stretch_factor}*PTS," f"minterpolate='mi_mode=mci:mc_mode=aobmc:vsbmc=1:fps={fps}'[v]", "-map", "[v]", "-an", # remove audio "-c:v", "libx264", "-preset", "fast", "-crf", "18", "-y", output_path ] subprocess.run(cmd, check=True) return output_path def _get_video_resolution(path: str) -> tuple[int, int]: cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "json", path, ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode != 0: raise RuntimeError(f"ffprobe failed:\n{result.stderr.decode()}") info = json.loads(result.stdout) stream = info["streams"][0] return stream["width"], stream["height"] def _get_pixel_format(path: str) -> str: cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=pix_fmt", "-of", "json", path, ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode != 0: return "" info = json.loads(result.stdout) return info["streams"][0].get("pix_fmt", "") def resize_video(input_path: str, target_width: int = 1080, target_height: int = 1920, overwrite: bool = False, force: bool = False) -> str: """ Resize a video to the given resolution (default 1080x1920) using FFmpeg. If overwrite=True, replaces the original file safely after successful conversion. If force=True, re-encodes even if the resolution already matches. """ if not os.path.exists(input_path): raise FileNotFoundError(f"Input video not found: {input_path}") # 🔍 Probe resolution width, height = _get_video_resolution(input_path) pix_fmt = _get_pixel_format(input_path) # Check if we can skip: # 1. Force is False # 2. Dimensions match # 3. Pixel format is yuv420p (required for broad compatibility) if not force and width == target_width and height == target_height and pix_fmt == "yuv420p": logger.debug( f"Skipping resize (already {width}x{height}, {pix_fmt}): {os.path.basename(input_path)}" ) return input_path logger.debug( f"Resizing/Re-encoding {os.path.basename(input_path)} " f"({width}x{height}, {pix_fmt}) → ({target_width}x{target_height}, yuv420p)" ) temp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}.mp4") # FFmpeg resize command (output goes to /tmp first) # FFmpeg command for Crop-to-Fill (Strict 9:16 enforcement) # scale=1080:1920:force_original_aspect_ratio=increase ensures min dim fits # crop=1080:1920 crops the excess cmd = [ "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", input_path, "-vf", f"scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height},setsar=1", "-c:v", "libx264", "-crf", "18", "-preset", "slow", "-pix_fmt", "yuv420p", "-c:a", "copy", temp_output ] # Run FFmpeg process result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode != 0: if os.path.exists(temp_output): os.remove(temp_output) raise RuntimeError(f"FFmpeg failed:\n{result.stderr.decode('utf-8', errors='ignore')}") # Overwrite original safely if requested if overwrite: shutil.move(temp_output, input_path) return input_path return temp_output def remove_black_padding(input_path: str, overwrite: bool = False, threshold_pct: float = 0.1) -> str: """ Automatically detect and remove black padding (crop only) using FFmpeg. Saves to /tmp with a unique UUID filename unless overwrite=True. Args: input_path (str): Path to the input video. overwrite (bool): If True, safely replace the original file. threshold_pct (float): Only crop if black padding > threshold_pct (0.0 to 1.0). 0.0 = always crop if any padding detected. Returns: str: Path to the cropped video (or original if no crop needed). """ if get_config_value("test_automation"): return input_path if not os.path.exists(input_path): raise FileNotFoundError(f"Input video not found: {input_path}") # Step 1: Detect crop parameters using cropdetect detect_cmd = [ "ffmpeg", "-i", input_path, "-vf", "cropdetect=24:16:0", "-frames:v", "500", "-f", "null", "-" ] result = subprocess.run(detect_cmd, stderr=subprocess.PIPE, text=True) matches = re.findall(r"crop=\S+", result.stderr) if not matches: logger.debug("No black padding detected.") return input_path # Get most frequent crop value crop_value = max(set(matches), key=matches.count) # Parse crop string: crop=w:h:x:y # Example: crop=1080:1520:0:200 try: match = re.search(r"crop=(\d+):(\d+):(\d+):(\d+)", crop_value) if match: c_w, c_h, _, _ = map(int, match.groups()) # Get original resolution orig_w, orig_h = _get_video_resolution(input_path) orig_area = orig_w * orig_h crop_area = c_w * c_h padding_area = orig_area - crop_area padding_pct = padding_area / orig_area if orig_area > 0 else 0 if padding_pct < threshold_pct: logger.debug(f"Skipping crop: Padding {padding_pct:.1%} < Threshold {threshold_pct:.1%}") return input_path logger.debug(f"Detected crop: {crop_value} (Padding: {padding_pct:.1%})") except Exception as e: logger.warning(f"Could not parse crop value '{crop_value}' for threshold check: {e}") # Proceed with cropping if parsing fails, or return? # Safest is to proceed as before or log and continue. # Let's proceed to maintain existing behavior on failure unless explicitly stopped. logger.debug(f"Proceeding with crop: {crop_value}") # Step 2: Create temp output file tmp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}_cropped.mp4") # Step 3: Run FFmpeg crop command crop_cmd = ["ffmpeg", "-y", "-i", input_path, "-vf", crop_value, "-c:a", "copy", tmp_output] crop_proc = subprocess.run(crop_cmd, stderr=subprocess.PIPE, text=True) if crop_proc.returncode != 0: raise RuntimeError(f"FFmpeg crop failed:\n{crop_proc.stderr}") # Step 4: Handle overwrite safely if overwrite: shutil.move(tmp_output, input_path) return input_path return tmp_output def trim_black_frames( input_path: str, overwrite: bool = False, black_threshold: int = 20, min_frames_to_trim: int = 1, max_frames_to_trim: int = 30 ) -> str: """ Detect and remove solid black frames from the start and end of a video. Uses FFmpeg showinfo filter to analyze frame luminance (Y channel mean). A frame is considered black if its Y mean is <= black_threshold. Args: input_path: Path to the input video overwrite: If True, replace the original file black_threshold: Maximum Y luminance value to consider a frame as black (0-255) Default 20 catches pure black (16) with some tolerance min_frames_to_trim: Minimum black frames at start/end to trigger trimming max_frames_to_trim: Maximum frames to check at start/end Returns: Path to the trimmed video, or original path if no trimming needed """ if not os.path.exists(input_path): raise FileNotFoundError(f"Input video not found: {input_path}") # Get video info probe_cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=nb_frames,r_frame_rate,duration", "-show_entries", "format=duration", "-of", "json", input_path ] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) if probe_result.returncode != 0: logger.warning(f"Failed to probe video: {input_path}") return input_path probe_data = json.loads(probe_result.stdout) # Get FPS fps_str = probe_data.get("streams", [{}])[0].get("r_frame_rate", "25/1") fps_parts = fps_str.split("/") fps = float(fps_parts[0]) / float(fps_parts[1]) if len(fps_parts) == 2 else float(fps_parts[0]) # Get total duration duration = float(probe_data.get("format", {}).get("duration", 0)) if duration == 0: duration = float(probe_data.get("streams", [{}])[0].get("duration", 0)) if duration <= 0: logger.warning(f"Could not determine video duration: {input_path}") return input_path # Analyze first N frames for black frames at start start_black_frames = _count_black_frames_at_position( input_path, "start", max_frames_to_trim, black_threshold, fps ) # Analyze last N frames for black frames at end end_black_frames = _count_black_frames_at_position( input_path, "end", max_frames_to_trim, black_threshold, fps, duration ) logger.debug(f"🎬 Black frame analysis: start={start_black_frames}, end={end_black_frames}") # Check if trimming is needed if start_black_frames < min_frames_to_trim and end_black_frames < min_frames_to_trim: logger.debug(f"✅ No black frames to trim in: {os.path.basename(input_path)}") return input_path # Calculate trim times start_trim_time = start_black_frames / fps if start_black_frames >= min_frames_to_trim else 0 end_trim_time = end_black_frames / fps if end_black_frames >= min_frames_to_trim else 0 # New duration after trimming new_duration = duration - start_trim_time - end_trim_time if new_duration <= 0.1: logger.warning(f"⚠️ Trimming would remove entire video, skipping: {input_path}") return input_path logger.debug( f"✂️ Trimming black frames: {os.path.basename(input_path)} " f"(start: {start_trim_time:.3f}s, end: {end_trim_time:.3f}s)" ) # Generate output path temp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}_trimmed.mp4") # Build FFmpeg command cmd = [ "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-ss", str(start_trim_time), "-i", input_path, "-t", str(new_duration), "-c:v", "libx264", "-preset", "fast", "-crf", "18", "-pix_fmt", "yuv420p", "-c:a", "copy", temp_output ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: logger.error(f"FFmpeg trim failed: {result.stderr}") return input_path logger.debug(f"✅ Trimmed video saved: {temp_output}") # Handle overwrite if overwrite: shutil.move(temp_output, input_path) return input_path return temp_output def _count_black_frames_at_position( video_path: str, position: str, # "start" or "end" max_frames: int, black_threshold: int, fps: float, duration: float = 0 ) -> int: """ Count consecutive black frames at the start or end of a video. Args: video_path: Path to video file position: "start" or "end" max_frames: Maximum frames to analyze black_threshold: Y luminance threshold for black detection fps: Video frame rate duration: Video duration (required for "end" position) Returns: Number of consecutive black frames at the specified position """ # For start: analyze first max_frames frames # For end: seek to near end and analyze last max_frames frames if position == "end" and duration > 0: seek_time = max(0, duration - (max_frames / fps) - 0.5) ss_arg = ["-ss", str(seek_time)] else: ss_arg = [] # Use showinfo filter to get frame luminance cmd = [ "ffmpeg", "-hide_banner", *ss_arg, "-i", video_path, "-vf", f"select='lte(n,{max_frames})',showinfo", "-frames:v", str(max_frames + 5), "-f", "null", "-" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode != 0: return 0 # Parse showinfo output for mean values # Format: mean:[Y U V] where Y is luminance # A pure black frame has Y=16 in YUV (limited range) frame_means = [] for line in result.stderr.split('\n'): match = re.search(r'mean:\[(\d+)\s+\d+\s+\d+\]', line) if match: y_mean = int(match.group(1)) frame_means.append(y_mean) if not frame_means: return 0 # Count consecutive black frames if position == "start": # Count from beginning black_count = 0 for y_mean in frame_means: if y_mean <= black_threshold: black_count += 1 else: break return black_count else: # Count from end (reverse) black_count = 0 for y_mean in reversed(frame_means): if y_mean <= black_threshold: black_count += 1 else: break return black_count def ratio_1x1_to9x16(video_path, overwrite=False): """ Convert a 1:1 video to 9:16 by adding blurred padding using FFmpeg. Saves to /tmp with a unique UUID filename unless overwrite=True. Args: video_path (str): Path to the input video. overwrite (bool): If True, safely replace the original file. Returns: str: Path to the converted video. """ if not os.path.exists(video_path): raise FileNotFoundError(f"Input video not found: {video_path}") tmp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}_9x16.mp4") cmd = [ "ffmpeg", "-i", video_path, "-vf", "crop=min(iw\\,ih):min(iw\\,ih),scale=1080:1080,pad=1080:1920:0:(1920-1080)/2:black", "-c:a", "copy", "-y", tmp_output ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode != 0: raise RuntimeError(f"FFmpeg failed:\n{result.stderr.decode('utf-8', errors='ignore')}") if overwrite: shutil.move(tmp_output, video_path) return video_path return tmp_output def get_best_beat_method(audio_path: str, min_interval: float = 1.0, target_beats: int = 10) -> tuple[np.ndarray, str]: """ Try all beat detection methods and return the one with closest to target number of beats. Args: audio_path: Path to audio file min_interval: Minimum time between beats in seconds target_beats: Desired number of beats (default 10 for 10-15 sec videos) Returns: Tuple of (beat_times, method_name) """ methods = ["kick", "snare", "downbeat", "general"] results = {} logger.debug(f"Testing all beat detection methods (target: ~{target_beats} beats)...") for method in methods: try: beat_times = get_beat_times(audio_path, beat_type=method, min_interval=min_interval) results[method] = beat_times logger.debug(f"{method:12s}: {len(beat_times):2d} beats detected") except Exception as e: logger.debug(f"{method:12s}: ERROR - {e}") results[method] = np.array([]) # Filter out empty results valid_results = {k: v for k, v in results.items() if len(v) > 0} if not valid_results: return None, None # Find the method closest to target best_method = min(valid_results.keys(), key=lambda k: abs(len(valid_results[k]) - target_beats)) best_beats = valid_results[best_method] logger.debug(f"Selected: {best_method} with {len(best_beats)} beats (closest to target)") return best_beats, best_method def get_kick_times(audio_path: str, min_interval: float = 1.0) -> np.ndarray: """ Detect kick drum hits (low frequency emphasis). Kicks are the "boom" - usually the strongest low-end hits. Args: audio_path: Path to audio file min_interval: Minimum time between kicks in seconds Returns: Array of kick drum timestamps in seconds """ y, sr = librosa.load(audio_path) # Use percussive component separation y_harmonic, y_percussive = librosa.effects.hpss(y, margin=2.0) # Apply low-pass filter to focus on bass frequencies y_bass = librosa.effects.percussive(y_percussive, margin=4.0) # Get onset strength emphasizing low frequencies onset_env = librosa.onset.onset_strength( y=y_bass, sr=sr, aggregate=np.median, fmax=200, # Focus on frequencies below 200Hz n_mels=128 ) # Detect onsets with lower threshold for kicks onset_frames = librosa.onset.onset_detect( onset_envelope=onset_env, sr=sr, backtrack=False, pre_max=3, post_max=3, pre_avg=3, post_avg=5, delta=0.15, # Very low threshold to catch more kicks wait=8 ) kick_times = librosa.frames_to_time(onset_frames, sr=sr) logger.debug(f"Raw kick detections: {len(kick_times)}") # Filter to minimum interval return _filter_by_min_interval(kick_times, min_interval) def get_snare_times(audio_path: str, min_interval: float = 1.0) -> np.ndarray: """ Detect snare/clap hits (mid-high frequency emphasis). Snares are the "crack" - sharp, crisp hits. Args: audio_path: Path to audio file min_interval: Minimum time between snares in seconds Returns: Array of snare hit timestamps in seconds """ y, sr = librosa.load(audio_path) # Use percussive component y_harmonic, y_percussive = librosa.effects.hpss(y, margin=2.0) # Get onset strength emphasizing mid-high frequencies onset_env = librosa.onset.onset_strength( y=y_percussive, sr=sr, aggregate=np.median, fmin=150, # Focus on frequencies above 150Hz fmax=4000, n_mels=128 ) # Detect onsets onset_frames = librosa.onset.onset_detect( onset_envelope=onset_env, sr=sr, backtrack=False, pre_max=3, post_max=3, pre_avg=3, post_avg=5, delta=0.15, # Very low threshold wait=8 ) snare_times = librosa.frames_to_time(onset_frames, sr=sr) logger.debug(f"Raw snare detections: {len(snare_times)}") # Filter to minimum interval return _filter_by_min_interval(snare_times, min_interval) def get_downbeats(audio_path: str, min_interval: float = 1.0) -> np.ndarray: """ Detect downbeats - every Nth beat based on tempo. More reliable than frequency filtering for finding the "1" count. Args: audio_path: Path to audio file min_interval: Minimum time between downbeats in seconds Returns: Array of downbeat timestamps in seconds """ y, sr = librosa.load(audio_path) # Get all beats first tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr, units='frames') beat_times = librosa.frames_to_time(beat_frames, sr=sr) # Handle tempo being array or scalar tempo_val = tempo[0] if isinstance(tempo, np.ndarray) else tempo logger.debug(f"Detected {len(beat_times)} total beats at {tempo_val:.1f} BPM") if len(beat_times) == 0: return np.array([]) # Most music is in 4/4 time, so take every 4th beat as downbeat # Or every 2nd beat for half-time feel beats_per_bar = 4 # If we have very few beats, use every 2nd if len(beat_times) < 8: beats_per_bar = 2 # Select every Nth beat downbeat_indices = np.arange(0, len(beat_times), beats_per_bar) downbeat_times = beat_times[downbeat_indices] logger.debug(f"Selected {len(downbeat_times)} downbeats (every {beats_per_bar} beats)") # Filter to minimum interval return _filter_by_min_interval(downbeat_times, min_interval) def get_general_beats(audio_path: str, min_interval: float = 1.0) -> np.ndarray: """ Fallback: Get general beat times (original method). Args: audio_path: Path to audio file min_interval: Minimum time between beats in seconds Returns: Array of beat timestamps in seconds """ y, sr = librosa.load(audio_path) tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr) beat_times = librosa.frames_to_time(beat_frames, sr=sr) logger.debug(f"Tempo: {tempo} BPM") logger.debug(f"Beat times: {beat_times}") return _filter_by_min_interval(beat_times, min_interval) def _filter_by_min_interval(times: np.ndarray, min_interval: float) -> np.ndarray: """Filter timestamps to ensure minimum interval between them.""" if len(times) == 0: return times filtered = [times[0]] for t in times[1:]: if t - filtered[-1] >= min_interval: filtered.append(t) return np.array(filtered) def get_beat_times(audio_path: str, beat_type: str = "downbeat", min_interval: float = 1.0) -> np.ndarray: """ Get beat times based on specified drum element. Args: audio_path: Path to audio file beat_type: One of "kick", "snare", "downbeat", or "general" min_interval: Minimum time between beats in seconds Returns: Array of beat timestamps in seconds Recommendation: Start with "downbeat" - it's the most reliable! """ logger.debug(f"Detecting {beat_type} beats with min_interval={min_interval}s...") if beat_type == "kick": result = get_kick_times(audio_path, min_interval) elif beat_type == "snare": result = get_snare_times(audio_path, min_interval) elif beat_type == "downbeat": result = get_downbeats(audio_path, min_interval) elif beat_type == "general": result = get_general_beats(audio_path, min_interval) else: raise ValueError(f"Unknown beat_type: {beat_type}. Use 'kick', 'snare', 'downbeat', or 'general'") logger.debug(f"Final result: {len(result)} {beat_type} beats detected") return result def repeat_audio_ffmpeg(input_audio, output_audio, repeat: int): """ Repeat audio multiple times, removing leading/trailing silence before repeating. Automatically determines the correct output format based on the file extension. Args: input_audio: Path to input audio file output_audio: Path to output audio file (extension determines format) repeat: Number of times to repeat the audio Returns: str: Path to the output file (may be modified if extension was incompatible) """ # Determine output format and codec from extension output_ext = os.path.splitext(output_audio)[1].lower() output_base = os.path.splitext(output_audio)[0] # Map extensions to appropriate codec and container format_map = { '.mp3': {'codec': 'libmp3lame', 'bitrate': '192k'}, '.m4a': {'codec': 'aac', 'bitrate': '192k'}, '.aac': {'codec': 'aac', 'bitrate': '192k'}, '.opus': {'codec': 'libopus', 'bitrate': '128k'}, '.ogg': {'codec': 'libvorbis', 'bitrate': '192k'}, '.wav': {'codec': 'pcm_s16le', 'bitrate': None}, } # Default to m4a if extension not recognized or if using aac with mp3 if output_ext not in format_map: output_ext = '.m4a' output_audio = output_base + output_ext logger.debug(f"Unknown format, defaulting to: {output_audio}") audio_config = format_map[output_ext] # Create a temporary file for the silence-trimmed audio (use same format) with tempfile.NamedTemporaryFile(suffix=output_ext, delete=False) as tmp: temp_trimmed = tmp.name try: # Step 1: Remove leading AND trailing silence from the original audio trim_cmd = [ "ffmpeg", "-y", "-i", input_audio, "-af", "silenceremove=start_periods=1:start_threshold=-50dB:start_duration=0:stop_periods=-1:stop_threshold=-50dB:stop_duration=0", "-c:a", audio_config['codec'] ] # Add bitrate if applicable (not for WAV) if audio_config['bitrate']: trim_cmd.extend(["-b:a", audio_config['bitrate']]) trim_cmd.append(temp_trimmed) result = subprocess.run(trim_cmd, check=True, capture_output=True, text=True) # Step 2: Repeat the trimmed audio repeat_cmd = [ "ffmpeg", "-y", "-stream_loop", str(repeat - 1), "-i", temp_trimmed, "-c:a", audio_config['codec'] ] # Add bitrate if applicable if audio_config['bitrate']: repeat_cmd.extend(["-b:a", audio_config['bitrate']]) repeat_cmd.append(output_audio) result = subprocess.run(repeat_cmd, check=True, capture_output=True, text=True) logger.debug(f"Successfully repeated audio {repeat} times, output: {output_audio}") return output_audio except subprocess.CalledProcessError as e: logger.error(f"FFmpeg error: STDOUT={e.stdout}, STDERR={e.stderr}") raise finally: # Clean up temporary file if os.path.exists(temp_trimmed): os.remove(temp_trimmed) def clean_and_drop_empty( df: pd.DataFrame, column: str, extra_nulls: list[str] | None = None, ) -> pd.DataFrame: """ Normalize Google Sheets empty values and drop rows where `column` is effectively empty. Handles: - NaN - "" - " " - "nan", "None", "NULL", "N/A" Args: df: Input DataFrame column: Column to validate (e.g. "VIDEO_LINK") extra_nulls: Optional extra string values to treat as null Returns: Cleaned DataFrame with valid rows only """ if column not in df.columns: raise KeyError(f"Column '{column}' not found in DataFrame") null_values = ["", "nan", "none", "null", "n/a"] if extra_nulls: null_values.extend([v.lower() for v in extra_nulls]) df = df.copy() df[column] = ( df[column] .astype(str) .str.strip() # .str.lower() .replace(null_values, np.nan) ) return df.dropna(subset=[column]) def is_valid_video(path: str) -> bool: if not os.path.exists(path): return False if os.path.getsize(path) < 100 * 1024: # <100KB = almost certainly invalid return False return True