|
|
""" |
|
|
Utility functions |
|
|
""" |
|
|
from __future__ import annotations |
|
|
|
|
|
from src.logger_config import logger, setup_logger |
|
|
import sys |
|
|
from pathlib import Path |
|
|
import subprocess |
|
|
import os |
|
|
import uuid |
|
|
import re |
|
|
import shutil |
|
|
import tempfile |
|
|
from src.config import get_config_value |
|
|
import json |
|
|
import traceback |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import imagehash |
|
|
from PIL import Image |
|
|
from moviepy.editor import VideoFileClip |
|
|
import tempfile |
|
|
import librosa |
|
|
|
|
|
def get_temp_dir(prefix: str = "tmp_") -> Path:
    """
    Create and return a temporary directory as a Path.

    When the TEST_AUTOMATION config flag is set, a fixed directory under
    TEST_DATA_DIRECTORY is used instead of a random one ("downloads" when
    the prefix mentions downloads, otherwise "output") so test runs are
    reproducible. Otherwise a fresh system temp directory is created.

    Args:
        prefix: Name prefix for the random temp dir (also used to pick
            the fixed sub-directory in test-automation mode).

    Raises:
        RuntimeError: If test automation is on but TEST_DATA_DIRECTORY
            is not configured.
    """
    if not get_config_value("test_automation"):
        return Path(tempfile.mkdtemp(prefix=prefix))

    base_dir = get_config_value("test_data_directory")
    if not base_dir:
        raise RuntimeError("TEST_DATA_DIRECTORY must be set when TEST_AUTOMATION=true")

    base_path = Path(base_dir)
    base_path.mkdir(parents=True, exist_ok=True)

    # Downloads get their own fixed folder; everything else shares "output".
    sub_dir = "downloads" if "download" in prefix else "output"

    target = base_path / sub_dir
    target.mkdir(parents=True, exist_ok=True)
    return target
|
|
|
|
|
def get_video_duration(path: str) -> float:
    """
    Return the duration of a video file in seconds as a float.

    Uses ffprobe (very fast and accurate).

    Args:
        path: Path to the video file.

    Returns:
        Duration in seconds.

    Raises:
        RuntimeError: If ffprobe exits non-zero (missing or corrupt file).
        KeyError: If ffprobe output has no format/duration field.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "format=duration",
        "-of", "json",
        path
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    # BUG FIX: previously a failed ffprobe run (empty stdout) fell through to
    # json.loads("") and raised a cryptic JSONDecodeError. Fail loudly instead.
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed for {path}:\n{result.stderr}")
    info = json.loads(result.stdout)
    return float(info["format"]["duration"])
|
|
|
|
|
def calculate_video_durations(selected_videos, all_tts_script_segment, word_level_segment, total_duration: float) -> None:
    """
    Calculate and update duration for each video based on word-level segments.

    Mutates each dict in ``selected_videos`` in place, setting a
    ``"duration"`` key in seconds (rounded to 2 decimals).

    Uses three approaches in order of preference:
    1. Simple word count matching (if counts align exactly)
    2. Text matching with cleaning (if counts differ slightly)
    3. Fuzzy matching (if words are missing or mismatched)

    On any unexpected error, falls back to splitting total_duration
    equally across all videos.

    Args:
        selected_videos: List of dicts; each is expected to carry a
            "tts_script_segment" string (assumption based on the helper
            functions — TODO confirm against callers).
        all_tts_script_segment: Full TTS script text covering all videos.
        word_level_segment: List of dicts with per-word timing; the
            helpers read "word", "start_time" and "end_time" keys.
        total_duration: Total audio duration in seconds.
    """
    try:
        all_script_words = all_tts_script_segment.split()

        def clean_word(word: str) -> str:
            # Keep only ASCII letters (lowercased) so punctuation and
            # numbers do not break word-count alignment between the
            # script text and the recognized word segments.
            return re.sub(r'[^a-zA-Z]', '', word).lower()

        cleaned_script_words = [clean_word(w) for w in all_script_words if clean_word(w)]
        cleaned_segment_words = [clean_word(seg.get("word", "")) for seg in word_level_segment if clean_word(seg.get("word", ""))]

        logger.debug(f"π Original: Script={len(all_script_words)} words, Segments={len(word_level_segment)} words")
        logger.debug(f"π Cleaned: Script={len(cleaned_script_words)} words, Segments={len(cleaned_segment_words)} words")
        logger.debug(f"β±οΈ Total audio duration: {total_duration}s (starting from 0)")

        # APPROACH 1: raw word counts already line up one-to-one.
        if len(all_script_words) == len(word_level_segment):
            logger.debug("β
Using APPROACH 1: Simple word count matching")
            calculate_durations_simple(selected_videos, word_level_segment, total_duration)

        # APPROACH 2: counts line up only after cleaning (punctuation/numbers).
        elif len(cleaned_script_words) == len(cleaned_segment_words):
            logger.debug("β
Using APPROACH 2: Text matching with cleaning")
            calculate_durations_with_text_matching(selected_videos, word_level_segment, total_duration)

        # APPROACH 3: counts disagree even after cleaning — fuzzy-align.
        else:
            diff = abs(len(cleaned_script_words) - len(cleaned_segment_words))
            logger.debug(f"β οΈ Word count mismatch after cleaning (diff: {diff})")
            logger.debug("π Using APPROACH 3: Fuzzy matching")
            calculate_durations_with_fuzzy_matching(selected_videos, word_level_segment, total_duration)

    except Exception as e:
        logger.error(f"β Failed to calculate video durations: {e}")
        traceback.print_exc()

        # Fallback: distribute the total duration equally so every video
        # still receives a usable "duration" value.
        equal_duration = total_duration / len(selected_videos)
        for video in selected_videos:
            video["duration"] = round(equal_duration, 2)
|
|
|
|
|
|
|
|
def calculate_durations_simple(selected_videos, word_level_segment, total_duration: float) -> None:
    """
    APPROACH 1: Simple sequential matching when word counts align exactly.

    Walks through ``word_level_segment`` in order, consuming one timed word
    per script word, and sets ``video["duration"]`` (seconds, 2 decimals)
    on each entry of ``selected_videos`` in place.

    First video always starts at 0, last video always ends at total_duration.
    """
    # Index of the next unconsumed timed word.
    current_word_index = 0

    for i, video in enumerate(selected_videos):
        tts_text = video.get("tts_script_segment", "").strip()

        # Videos without script text get zero duration and consume no words.
        if not tts_text:
            video["duration"] = 0
            continue

        word_count = len(tts_text.split())

        # Anchor the first video at 0 regardless of the first word's timestamp.
        if i == 0:
            start_time = 0.0
        else:
            start_time = word_level_segment[current_word_index]["start_time"]

        next_word_index = current_word_index + word_count

        # Anchor the last video at the full audio duration; otherwise end at
        # the start of the next video's first word (or total_duration if we
        # ran out of timed words).
        if i + 1 == len(selected_videos):
            end_time = total_duration
        else:
            if next_word_index < len(word_level_segment):
                end_time = word_level_segment[next_word_index]["start_time"]
            else:
                end_time = total_duration

        video["duration"] = round(end_time - start_time, 2)
        logger.debug(f"  Video {i}: [{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s | '{tts_text[:40]}...'")

        current_word_index = next_word_index

    total_calculated = sum(v.get("duration", 0) for v in selected_videos)
    logger.debug(f"β
Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)")
|
|
|
|
|
|
|
|
def calculate_durations_with_text_matching(selected_videos, word_level_segment, total_duration: float) -> None:
    """
    APPROACH 2: Text matching with cleaned/normalized text.

    Handles cases where word counts don't align due to numbers, punctuation,
    etc. Sets ``video["duration"]`` in place on each selected video.
    First video always starts at 0, last video always ends at total_duration.
    """
    def clean_word(word: str) -> str:
        """Clean a single word - remove numbers, special chars, keep only alpha"""
        return re.sub(r'[^a-zA-Z]', '', word).lower()

    # Build a parallel list of timed words that survive cleaning, keeping
    # their original timing so indices line up with cleaned script words.
    cleaned_word_segments = []
    for seg in word_level_segment:
        word = seg.get("word", "")
        cleaned = clean_word(word)
        if cleaned:
            cleaned_word_segments.append({
                "cleaned": cleaned,
                "original": word,
                "start_time": seg.get("start_time", 0),
                "end_time": seg.get("end_time", 0)
            })

    logger.debug(f"π Cleaned word segments: {len(cleaned_word_segments)} words")

    # Index of the next unconsumed cleaned timed word.
    current_word_index = 0

    for i, video in enumerate(selected_videos):
        tts_text = video.get("tts_script_segment", "").strip()

        if not tts_text:
            video["duration"] = 0
            continue

        video_words = tts_text.split()
        cleaned_video_words = [clean_word(w) for w in video_words if clean_word(w)]

        # Script segment contained only punctuation/numbers: nothing to time.
        if not cleaned_video_words:
            video["duration"] = 0
            continue

        word_count = len(cleaned_video_words)
        logger.debug(f"  Video {i}: Looking for {word_count} cleaned words starting at index {current_word_index}")

        if i == 0:
            start_time = 0.0
        elif current_word_index < len(cleaned_word_segments):
            start_time = cleaned_word_segments[current_word_index]["start_time"]
        else:
            # Ran out of timed words: split whatever audio time remains
            # evenly across the videos that are left.
            logger.warning(f"  β οΈ Out of word segments, using remaining time")
            remaining_videos = len(selected_videos) - i
            remaining_time = total_duration - (cleaned_word_segments[-1]["end_time"] if cleaned_word_segments else 0)
            video["duration"] = round(remaining_time / remaining_videos, 2)
            continue

        next_word_index = current_word_index + word_count

        # Last video is pinned to total_duration; otherwise end where the
        # next video's first word begins (or at total_duration if exhausted).
        if i + 1 == len(selected_videos):
            end_time = total_duration
        else:
            if next_word_index < len(cleaned_word_segments):
                end_time = cleaned_word_segments[next_word_index]["start_time"]
            else:
                end_time = total_duration

        video["duration"] = round(end_time - start_time, 2)
        logger.debug(f"  β
[{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s | {word_count} words")

        current_word_index = next_word_index

    total_calculated = sum(v.get("duration", 0) for v in selected_videos)
    logger.debug(f"β
Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)")
|
|
|
|
|
|
|
|
def calculate_durations_with_fuzzy_matching(selected_videos, word_level_segment, total_duration: float) -> None:
    """
    APPROACH 3: Fuzzy matching with flexible word alignment.

    Handles cases where words are missing, misspelled, or slightly different.
    For each script word, scans a small look-ahead window of timed words and
    accepts (in order) an exact match, a substring match, or a
    SequenceMatcher similarity >= 0.75. Sets ``video["duration"]`` in place.
    First video always starts at 0, last video always ends at total_duration.
    """
    from difflib import SequenceMatcher

    def clean_word(word: str) -> str:
        """Clean a single word - remove numbers, special chars, keep only alpha"""
        return re.sub(r'[^a-zA-Z]', '', word).lower()

    def similarity_ratio(word1: str, word2: str) -> float:
        """Calculate similarity between two words (0.0 to 1.0)"""
        if not word1 or not word2:
            return 0.0
        return SequenceMatcher(None, word1, word2).ratio()

    # Timed words that survive cleaning, with their original timing kept.
    cleaned_word_segments = []
    for seg in word_level_segment:
        word = seg.get("word", "")
        cleaned = clean_word(word)
        if cleaned:
            cleaned_word_segments.append({
                "cleaned": cleaned,
                "original": word,
                "start_time": seg.get("start_time", 0),
                "end_time": seg.get("end_time", 0)
            })

    logger.debug(f"π Cleaned word segments: {len(cleaned_word_segments)} words")

    # Index of the next unconsumed timed word.
    current_word_index = 0

    for i, video in enumerate(selected_videos):
        tts_text = video.get("tts_script_segment", "").strip()

        if not tts_text:
            video["duration"] = 0
            continue

        video_words = tts_text.split()
        cleaned_video_words = [clean_word(w) for w in video_words if clean_word(w)]

        if not cleaned_video_words:
            video["duration"] = 0
            continue

        word_count = len(cleaned_video_words)
        logger.debug(f"  Video {i}: Fuzzy matching {word_count} words starting at index {current_word_index}")

        if i == 0:
            start_time = 0.0
        elif current_word_index < len(cleaned_word_segments):
            start_time = cleaned_word_segments[current_word_index]["start_time"]
        else:
            # Ran out of timed words: spread the remaining audio time
            # evenly over the videos that are left.
            logger.warning(f"  β οΈ Out of word segments")
            remaining_videos = len(selected_videos) - i
            remaining_time = total_duration - (cleaned_word_segments[-1]["end_time"] if cleaned_word_segments else 0)
            video["duration"] = round(remaining_time / remaining_videos, 2)
            continue

        # Fuzzy-align this video's words against the timed words, advancing
        # search_index past each match so alignment stays monotonic.
        matched_count = 0
        search_index = current_word_index
        last_matched_index = current_word_index - 1

        for video_word in cleaned_video_words:
            found = False

            # Only look a few words ahead to tolerate small insertions
            # without drifting far out of sync.
            search_end = min(search_index + 5, len(cleaned_word_segments))

            for j in range(search_index, search_end):
                segment_word = cleaned_word_segments[j]["cleaned"]

                # Tier 1: exact match.
                if video_word == segment_word:
                    matched_count += 1
                    last_matched_index = j
                    search_index = j + 1
                    found = True
                    break

                # Tier 2: one word contains the other.
                if video_word in segment_word or segment_word in video_word:
                    matched_count += 1
                    last_matched_index = j
                    search_index = j + 1
                    found = True
                    logger.debug(f"    Substring match: '{video_word}' β '{segment_word}'")
                    break

                # Tier 3: high character-level similarity.
                similarity = similarity_ratio(video_word, segment_word)
                if similarity >= 0.75:
                    matched_count += 1
                    last_matched_index = j
                    search_index = j + 1
                    found = True
                    logger.debug(f"    Fuzzy match: '{video_word}' β '{segment_word}' (sim: {similarity:.2f})")
                    break

            if not found:
                logger.debug(f"    No match for '{video_word}'")

        # Advance to just past the last match, or estimate the position by
        # word count when nothing matched at all.
        if matched_count > 0:
            next_word_index = last_matched_index + 1
            logger.debug(f"    β Matched {matched_count}/{word_count} words")
        else:
            logger.warning(f"    β οΈ No matches, estimating position")
            next_word_index = min(current_word_index + word_count, len(cleaned_word_segments))

        # Last video ends at total_duration; otherwise at the next video's
        # first word (or total_duration if timed words are exhausted).
        if i + 1 == len(selected_videos):
            end_time = total_duration
        else:
            if next_word_index < len(cleaned_word_segments):
                end_time = cleaned_word_segments[next_word_index]["start_time"]
            else:
                end_time = total_duration

        video["duration"] = round(end_time - start_time, 2)
        logger.debug(f"  β
[{start_time:.2f}s - {end_time:.2f}s] = {video['duration']}s")

        current_word_index = next_word_index

    total_calculated = sum(v.get("duration", 0) for v in selected_videos)
    logger.debug(f"β
Total calculated duration: {total_calculated:.2f}s (expected: {total_duration:.2f}s)")
|
|
|
|
|
def is_video_loopable(video_path, frame_check_window=10, threshold=15.0):
    """
    Heuristically decide whether a video loops seamlessly.

    Compares the first and last `frame_check_window` frames (converted to
    128x128 grayscale); the clip counts as loopable when the mean absolute
    pixel difference between the two windows is below `threshold`.

    Args:
        video_path: Path to the video file (falsy values return False).
        frame_check_window: Number of frames sampled at each end.
        threshold: Maximum mean pixel difference for a "loopable" verdict.

    Raises:
        ValueError: If the video cannot be opened.
    """
    if not video_path:
        return False

    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")

    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    # Too short for two distinct head/tail windows.
    if total_frames <= frame_check_window * 2:
        capture.release()
        return False

    head_indices = range(frame_check_window)
    tail_indices = range(total_frames - frame_check_window, total_frames)

    gray_frames = []
    for frame_idx in [*head_indices, *tail_indices]:
        capture.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ok, img = capture.read()
        if not ok or img is None:
            continue
        small = cv2.resize(img, (128, 128))
        gray_frames.append(cv2.cvtColor(small, cv2.COLOR_BGR2GRAY))

    capture.release()

    # Some frames failed to decode — can't compare full windows.
    if len(gray_frames) < 2 * frame_check_window:
        return False

    head_stack = np.array(gray_frames[:frame_check_window]).astype(np.float32)
    tail_stack = np.array(gray_frames[-frame_check_window:]).astype(np.float32)

    diff = np.mean(np.abs(head_stack - tail_stack))
    logger.debug(f"π Mean frame difference: {diff:.2f}")

    return diff < threshold
|
|
|
|
|
|
|
|
def is_loopable_phash(video_path, hash_diff_threshold=8):
    """
    Decide loopability by perceptual hash: compares the pHash of the first
    frame against the last readable frame and returns True when the hash
    distance is below `hash_diff_threshold`.

    Args:
        video_path: Path to the video file (falsy values return False).
        hash_diff_threshold: Maximum pHash distance for a loopable verdict.

    Raises:
        ValueError: If the video cannot be opened.
    """
    if not video_path:
        return False

    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")

    total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    if total < 2:
        capture.release()
        return False

    capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
    ok, first_frame = capture.read()
    if not ok or first_frame is None:
        capture.release()
        return False

    # Walk backwards from the nominal last frame until one decodes; the
    # reported frame count is often off by a few frames, so try up to ~10.
    frame_idx = total - 1
    ok, last_frame = False, None
    while not ok and frame_idx > total - 10:
        capture.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ok, last_frame = capture.read()
        frame_idx -= 1

    capture.release()

    if last_frame is None or not ok:
        return False

    def phash_of(frame):
        # pHash operates on PIL images; OpenCV frames are BGR.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return imagehash.phash(Image.fromarray(rgb))

    distance = abs(phash_of(first_frame) - phash_of(last_frame))
    logger.debug(f"π§© pHash difference: {distance}")

    return distance < hash_diff_threshold
|
|
|
|
|
def is_video_zoomable_tail(video_path, tail_seconds=1, sample_frames=15, motion_threshold=1.5):
    """
    Checks only the *last few seconds* of the video to see if it's already zooming.
    Returns True if mostly static (safe to add zoom), False if motion already exists.

    NOTE(review): the unconditional `return False` below disables this check
    entirely — everything after it is dead code. Presumably a deliberate
    kill-switch; confirm the intent before removing it or re-enabling the
    optical-flow analysis.
    """
    return False

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps if fps > 0 else 0

    # Seek to the start of the tail window (last `tail_seconds` seconds).
    start_frame = max(total_frames - int(tail_seconds * fps), 0)
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    cap.release()

    # Not enough frames to measure motion — treat as static (zoomable).
    if len(frames) < 2:
        return True

    # Sample frame pairs across the tail and average optical-flow magnitude.
    step = max(len(frames) // sample_frames, 1)
    total_motion = 0
    motion_samples = 0

    for i in range(0, len(frames) - step, step):
        prev_gray = frames[i]
        gray = frames[i + step]
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, gray, None,
            pyr_scale=0.5, levels=3, winsize=15,
            iterations=3, poly_n=5, poly_sigma=1.2, flags=0
        )
        mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        total_motion += np.mean(mag)
        motion_samples += 1

    avg_motion = total_motion / motion_samples if motion_samples else 0
    logger.debug(f"π₯ Tail motion magnitude: {avg_motion:.3f}")

    return avg_motion < motion_threshold
|
|
|
|
|
def selective_update_with_keymaps(source: dict, modified: dict, source_keys: list, modified_keys: list) -> dict:
    """
    Return a copy of `source` with selected values pulled from `modified`.

    Each (source_key, modified_key) pair maps a destination key in the
    returned dict to a lookup key in `modified`. Pairs whose modified_key
    is missing from `modified` are skipped. Neither input is mutated.

    Example:
        source_keys = ["url", "description"]
        modified_keys = ["video_url", "desc_text"]
    """
    result = dict(source)

    for dest_key, lookup_key in zip(source_keys, modified_keys):
        if lookup_key not in modified:
            continue
        result[dest_key] = modified[lookup_key]

    return result
|
|
|
|
|
def clean_tts_script(tts_script: str) -> str:
    """
    Normalize a TTS script whose parts were joined with '-'.

    Splits on '-', strips whitespace, drops empty parts, rejoins with
    single spaces, and removes trailing periods. Falsy input yields "".
    """
    if not tts_script:
        return ""

    stripped = (chunk.strip() for chunk in tts_script.split('-'))
    joined = " ".join(piece for piece in stripped if piece)
    return joined.rstrip(".")
|
|
|
|
|
def reverse_clip(path_or_clip) -> str:
    """
    Reverse both video and audio of a clip using ffmpeg.

    Args:
        path_or_clip: Either a file path (str) or a MoviePy-style clip
            object (anything exposing ``write_videofile``).

    Returns:
        Path to the reversed .mp4 written to the system temp directory.

    Raises:
        TypeError: If path_or_clip is neither a path nor a clip object.
        subprocess.CalledProcessError: If ffmpeg fails.
    """
    if hasattr(path_or_clip, "write_videofile"):
        # Materialize the in-memory clip to a temp file so ffmpeg can read it.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_input:
            input_path = temp_input.name
            path_or_clip.write_videofile(
                input_path,
                codec="libx264",
                audio_codec="aac",
                verbose=False,
                logger=None,
                fps=25
            )
    elif isinstance(path_or_clip, str):
        input_path = path_or_clip
    else:
        # BUG FIX: previously fell through with input_path == "" and failed
        # later inside ffmpeg with a confusing error message.
        raise TypeError(f"Unsupported input type: {type(path_or_clip)!r}")

    out_path = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex[:8]}_reversed.mp4")

    # Reverse the video stream and the audio stream together.
    subprocess.run([
        "ffmpeg", "-hide_banner", "-loglevel", "error",
        "-y", "-i", input_path,
        "-vf", "reverse",
        "-af", "areverse",
        out_path
    ], check=True)

    return out_path
|
|
|
|
|
def interpolate_video(input_path: str, target_duration: float = 4.0, fps: int = 60) -> str:
    """
    Smoothly extend a short video using motion interpolation.
    Works entirely on CPU (no GPU required).

    NOTE(review): the unconditional `return None` below disables this
    function — everything after it is dead code, and the declared `str`
    return type is not honored (callers receive None). Presumably a
    deliberate kill-switch; confirm before removing or re-enabling.

    Args:
        input_path: path to input video
        target_duration: desired output length (seconds)
        fps: target output framerate (default 60)
    """
    return None

    # Probe the current duration to compute the stretch factor.
    cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
           "-of", "default=noprint_wrappers=1:nokey=1", input_path]
    duration_str = subprocess.check_output(cmd).decode().strip()
    duration = float(duration_str)

    # PTS multiplier: >1 slows the clip down to the target length.
    stretch_factor = target_duration / duration

    base = os.path.splitext(os.path.basename(input_path))[0]
    output_path = f"/tmp/{base}_interp.mp4"

    # minterpolate synthesizes intermediate frames (motion-compensated)
    # so the slowed clip stays smooth; audio is dropped (-an).
    cmd = [
        "ffmpeg",
        "-i", input_path,
        "-filter_complex",
        f"[0:v]setpts={stretch_factor}*PTS,"
        f"minterpolate='mi_mode=mci:mc_mode=aobmc:vsbmc=1:fps={fps}'[v]",
        "-map", "[v]",
        "-an",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "18",
        "-y", output_path
    ]

    subprocess.run(cmd, check=True)
    return output_path
|
|
|
|
|
def _get_video_resolution(path: str) -> tuple[int, int]:
    """
    Return (width, height) of the first video stream via ffprobe.

    Raises:
        RuntimeError: If ffprobe exits non-zero.
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "json",
        path,
    ]
    probe = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if probe.returncode != 0:
        raise RuntimeError(f"ffprobe failed:\n{probe.stderr.decode()}")

    stream = json.loads(probe.stdout)["streams"][0]
    return stream["width"], stream["height"]
|
|
|
|
|
def _get_pixel_format(path: str) -> str:
    """
    Return the pixel format (e.g. 'yuv420p') of the first video stream,
    or an empty string if ffprobe fails or reports none.
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=pix_fmt",
        "-of", "json",
        path,
    ]
    probe = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Best-effort helper: callers treat "" as "unknown".
    if probe.returncode != 0:
        return ""

    return json.loads(probe.stdout)["streams"][0].get("pix_fmt", "")
|
|
|
|
|
def resize_video(input_path: str, target_width: int = 1080, target_height: int = 1920, overwrite: bool = False, force: bool = False) -> str:
    """
    Resize a video to the given resolution (default 1080x1920) using FFmpeg.

    Scales up preserving aspect ratio, center-crops to the exact target
    size, and normalizes the pixel format to yuv420p.

    Args:
        input_path: Path to the source video.
        target_width: Desired output width.
        target_height: Desired output height.
        overwrite: If True, replaces the original file safely after
            successful conversion.
        force: If True, re-encodes even if resolution/format already match.

    Returns:
        Path to the resized video (the original path when skipped or
        when overwrite=True).

    Raises:
        FileNotFoundError: If input_path does not exist.
        RuntimeError: If FFmpeg fails.
    """
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input video not found: {input_path}")

    width, height = _get_video_resolution(input_path)
    pix_fmt = _get_pixel_format(input_path)

    # Skip re-encoding when the file already matches the target exactly.
    already_ok = width == target_width and height == target_height and pix_fmt == "yuv420p"
    if already_ok and not force:
        logger.debug(
            f"Skipping resize (already {width}x{height}, {pix_fmt}): {os.path.basename(input_path)}"
        )
        return input_path

    logger.debug(
        f"Resizing/Re-encoding {os.path.basename(input_path)} "
        f"({width}x{height}, {pix_fmt}) β ({target_width}x{target_height}, yuv420p)"
    )

    temp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}.mp4")

    # Scale up to cover the target, then crop the overflow dimension.
    video_filter = (
        f"scale={target_width}:{target_height}:force_original_aspect_ratio=increase,"
        f"crop={target_width}:{target_height},setsar=1"
    )
    encode_cmd = [
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", input_path,
        "-vf", video_filter,
        "-c:v", "libx264", "-crf", "18", "-preset", "slow",
        "-pix_fmt", "yuv420p",
        "-c:a", "copy",
        temp_output
    ]

    proc = subprocess.run(encode_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        # Remove any partial output before surfacing the error.
        if os.path.exists(temp_output):
            os.remove(temp_output)
        raise RuntimeError(f"FFmpeg failed:\n{proc.stderr.decode('utf-8', errors='ignore')}")

    if overwrite:
        shutil.move(temp_output, input_path)
        return input_path

    return temp_output
|
|
|
|
|
def remove_black_padding(input_path: str, overwrite: bool = False, threshold_pct: float = 0.1) -> str:
    """
    Automatically detect and remove black padding (crop only) using FFmpeg.
    Saves to /tmp with a unique UUID filename unless overwrite=True.

    Args:
        input_path (str): Path to the input video.
        overwrite (bool): If True, safely replace the original file.
        threshold_pct (float): Only crop if black padding > threshold_pct (0.0 to 1.0).
                               0.0 = always crop if any padding detected.

    Returns:
        str: Path to the cropped video (or original if no crop needed).

    Raises:
        FileNotFoundError: If input_path does not exist.
        RuntimeError: If the FFmpeg crop pass fails.
    """
    # Skipped entirely during test automation for determinism.
    if get_config_value("test_automation"):
        return input_path
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input video not found: {input_path}")

    # Run cropdetect over the first 500 frames; FFmpeg writes candidate
    # crop=W:H:X:Y strings to stderr.
    detect_cmd = [
        "ffmpeg", "-i", input_path, "-vf", "cropdetect=24:16:0",
        "-frames:v", "500", "-f", "null", "-"
    ]
    detect = subprocess.run(detect_cmd, stderr=subprocess.PIPE, text=True)
    candidates = re.findall(r"crop=\S+", detect.stderr)

    if not candidates:
        logger.debug("No black padding detected.")
        return input_path

    # The most frequently suggested crop wins.
    crop_value = max(set(candidates), key=candidates.count)

    # Compare the cropped area to the original area; only crop if the
    # padding fraction exceeds the threshold.
    try:
        parsed = re.search(r"crop=(\d+):(\d+):(\d+):(\d+)", crop_value)
        if parsed:
            c_w, c_h, _, _ = map(int, parsed.groups())
            orig_w, orig_h = _get_video_resolution(input_path)

            orig_area = orig_w * orig_h
            crop_area = c_w * c_h
            padding_area = orig_area - crop_area
            padding_pct = padding_area / orig_area if orig_area > 0 else 0

            if padding_pct < threshold_pct:
                logger.debug(f"Skipping crop: Padding {padding_pct:.1%} < Threshold {threshold_pct:.1%}")
                return input_path

            logger.debug(f"Detected crop: {crop_value} (Padding: {padding_pct:.1%})")
    except Exception as e:
        # Unparseable crop value: fall through and crop anyway (best effort).
        logger.warning(f"Could not parse crop value '{crop_value}' for threshold check: {e}")

    logger.debug(f"Proceeding with crop: {crop_value}")

    tmp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}_cropped.mp4")

    crop_cmd = ["ffmpeg", "-y", "-i", input_path, "-vf", crop_value, "-c:a", "copy", tmp_output]
    crop_proc = subprocess.run(crop_cmd, stderr=subprocess.PIPE, text=True)

    if crop_proc.returncode != 0:
        raise RuntimeError(f"FFmpeg crop failed:\n{crop_proc.stderr}")

    if overwrite:
        shutil.move(tmp_output, input_path)
        return input_path

    return tmp_output
|
|
|
|
|
def trim_black_frames(
    input_path: str,
    overwrite: bool = False,
    black_threshold: int = 20,
    min_frames_to_trim: int = 1,
    max_frames_to_trim: int = 30
) -> str:
    """
    Detect and remove solid black frames from the start and end of a video.

    Uses FFmpeg showinfo filter to analyze frame luminance (Y channel mean).
    A frame is considered black if its Y mean is <= black_threshold.

    Args:
        input_path: Path to the input video
        overwrite: If True, replace the original file
        black_threshold: Maximum Y luminance value to consider a frame as black (0-255)
                         Default 20 catches pure black (16) with some tolerance
        min_frames_to_trim: Minimum black frames at start/end to trigger trimming
        max_frames_to_trim: Maximum frames to check at start/end

    Returns:
        Path to the trimmed video, or original path if no trimming needed

    Raises:
        FileNotFoundError: If input_path does not exist.
    """
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input video not found: {input_path}")

    # Probe the frame rate and duration needed to convert frame counts
    # into trim times. Failures fall back to returning the original path.
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=nb_frames,r_frame_rate,duration",
        "-show_entries", "format=duration",
        "-of", "json", input_path
    ]
    probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)

    if probe_result.returncode != 0:
        logger.warning(f"Failed to probe video: {input_path}")
        return input_path

    probe_data = json.loads(probe_result.stdout)

    # r_frame_rate is usually a fraction string like "30000/1001".
    fps_str = probe_data.get("streams", [{}])[0].get("r_frame_rate", "25/1")
    fps_parts = fps_str.split("/")
    fps = float(fps_parts[0]) / float(fps_parts[1]) if len(fps_parts) == 2 else float(fps_parts[0])

    # Prefer the container duration; fall back to the stream duration.
    duration = float(probe_data.get("format", {}).get("duration", 0))
    if duration == 0:
        duration = float(probe_data.get("streams", [{}])[0].get("duration", 0))

    if duration <= 0:
        logger.warning(f"Could not determine video duration: {input_path}")
        return input_path

    # Count consecutive black frames at each end.
    start_black_frames = _count_black_frames_at_position(
        input_path, "start", max_frames_to_trim, black_threshold, fps
    )

    end_black_frames = _count_black_frames_at_position(
        input_path, "end", max_frames_to_trim, black_threshold, fps, duration
    )

    logger.debug(f"π¬ Black frame analysis: start={start_black_frames}, end={end_black_frames}")

    if start_black_frames < min_frames_to_trim and end_black_frames < min_frames_to_trim:
        logger.debug(f"β
No black frames to trim in: {os.path.basename(input_path)}")
        return input_path

    # Only trim an end if its black run reaches the minimum.
    start_trim_time = start_black_frames / fps if start_black_frames >= min_frames_to_trim else 0
    end_trim_time = end_black_frames / fps if end_black_frames >= min_frames_to_trim else 0

    new_duration = duration - start_trim_time - end_trim_time

    # Guard against trimming away (essentially) the whole clip.
    if new_duration <= 0.1:
        logger.warning(f"β οΈ Trimming would remove entire video, skipping: {input_path}")
        return input_path

    logger.debug(
        f"βοΈ Trimming black frames: {os.path.basename(input_path)} "
        f"(start: {start_trim_time:.3f}s, end: {end_trim_time:.3f}s)"
    )

    temp_output = os.path.join("/tmp", f"{uuid.uuid4().hex}_trimmed.mp4")

    # Re-encode the kept window; -ss before -i seeks, -t limits length.
    cmd = [
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-ss", str(start_trim_time),
        "-i", input_path,
        "-t", str(new_duration),
        "-c:v", "libx264", "-preset", "fast", "-crf", "18",
        "-pix_fmt", "yuv420p",
        "-c:a", "copy",
        temp_output
    ]

    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        logger.error(f"FFmpeg trim failed: {result.stderr}")
        return input_path

    logger.debug(f"β
Trimmed video saved: {temp_output}")

    if overwrite:
        shutil.move(temp_output, input_path)
        return input_path

    return temp_output
|
|
|
|
|
|
|
|
def _count_black_frames_at_position(
    video_path: str,
    position: str,
    max_frames: int,
    black_threshold: int,
    fps: float,
    duration: float = 0
) -> int:
    """
    Count consecutive black frames at the start or end of a video.

    Runs FFmpeg's showinfo filter over up to `max_frames` frames and parses
    each frame's Y (luminance) channel mean from stderr; a frame counts as
    black when that mean is <= black_threshold.

    Args:
        video_path: Path to video file
        position: "start" or "end"
        max_frames: Maximum frames to analyze
        black_threshold: Y luminance threshold for black detection (0-255)
        fps: Video frame rate
        duration: Video duration in seconds (required for "end" position)

    Returns:
        Number of consecutive black frames at the specified position;
        0 on any FFmpeg failure or timeout (best-effort by design).
    """
    # For the tail, seek near the end so only the needed frames are decoded.
    if position == "end" and duration > 0:
        seek_time = max(0, duration - (max_frames / fps) - 0.5)
        ss_arg = ["-ss", str(seek_time)]
    else:
        ss_arg = []

    cmd = [
        "ffmpeg", "-hide_banner",
        *ss_arg,
        "-i", video_path,
        "-vf", f"select='lte(n,{max_frames})',showinfo",
        "-frames:v", str(max_frames + 5),
        "-f", "null", "-"
    ]

    # BUG FIX: subprocess.TimeoutExpired was previously unhandled and
    # crashed the caller; treat a hung probe as "no black frames found",
    # matching the existing behavior for non-zero exit codes.
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except subprocess.TimeoutExpired:
        return 0

    if result.returncode != 0:
        return 0

    # showinfo logs lines like "... mean:[16 128 128] ..."; the first value
    # is the Y (luminance) channel mean.
    frame_means = []
    for line in result.stderr.split('\n'):
        match = re.search(r'mean:\[(\d+)\s+\d+\s+\d+\]', line)
        if match:
            frame_means.append(int(match.group(1)))

    if not frame_means:
        return 0

    # Count the run of black frames from the relevant edge: forward from
    # the first frame for "start", backward from the last for "end".
    ordered = frame_means if position == "start" else list(reversed(frame_means))
    black_count = 0
    for y_mean in ordered:
        if y_mean > black_threshold:
            break
        black_count += 1
    return black_count
|
|
|
|
|
|
|
|
def ratio_1x1_to9x16(video_path, overwrite=False):
    """
    Convert a 1:1 video to 9:16 by center-cropping to a square and adding
    black padding above and below using FFmpeg.
    Saves to the system temp directory with a unique UUID filename unless
    overwrite=True.

    Args:
        video_path (str): Path to the input video.
        overwrite (bool): If True, safely replace the original file.

    Returns:
        str: Path to the converted video.

    Raises:
        FileNotFoundError: If the input video does not exist.
        RuntimeError: If FFmpeg exits with a non-zero status.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Input video not found: {video_path}")

    # Use the platform temp dir instead of a hard-coded "/tmp" so this also
    # works on Windows / non-standard environments.
    tmp_output = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex}_9x16.mp4")

    # Crop to the largest centered square, scale to 1080x1080, then pad
    # vertically to 1080x1920 with black bars; audio is copied untouched.
    cmd = [
        "ffmpeg",
        "-i", video_path,
        "-vf", "crop=min(iw\\,ih):min(iw\\,ih),scale=1080:1080,pad=1080:1920:0:(1920-1080)/2:black",
        "-c:a", "copy",
        "-y",
        tmp_output
    ]

    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg failed:\n{result.stderr.decode('utf-8', errors='ignore')}")

    if overwrite:
        # Replace the original in place; shutil.move handles cross-device moves.
        shutil.move(tmp_output, video_path)
        return video_path

    return tmp_output
|
|
|
|
|
def get_best_beat_method(audio_path: str, min_interval: float = 1.0, target_beats: int = 10) -> tuple[np.ndarray | None, str | None]:
    """
    Try all beat detection methods and return the one with closest to target number of beats.

    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between beats in seconds
        target_beats: Desired number of beats (default 10 for 10-15 sec videos)

    Returns:
        Tuple of (beat_times, method_name), or (None, None) when every
        method either raised or detected zero beats.
    """
    methods = ["kick", "snare", "downbeat", "general"]
    results: dict[str, np.ndarray] = {}

    logger.debug(f"Testing all beat detection methods (target: ~{target_beats} beats)...")

    for method in methods:
        try:
            beat_times = get_beat_times(audio_path, beat_type=method, min_interval=min_interval)
            results[method] = beat_times
            logger.debug(f"{method:12s}: {len(beat_times):2d} beats detected")
        except Exception as e:
            # A failing detector must not abort the comparison; record it as
            # "no beats" and keep trying the remaining methods.
            logger.debug(f"{method:12s}: ERROR - {e}")
            results[method] = np.array([])

    valid_results = {k: v for k, v in results.items() if len(v) > 0}

    if not valid_results:
        return None, None

    # Pick the method whose beat count is nearest to the target.
    best_method = min(valid_results.keys(), key=lambda k: abs(len(valid_results[k]) - target_beats))
    best_beats = valid_results[best_method]

    logger.debug(f"Selected: {best_method} with {len(best_beats)} beats (closest to target)")

    return best_beats, best_method
|
|
|
|
|
|
|
|
def get_kick_times(audio_path: str, min_interval: float = 1.0) -> np.ndarray:
    """
    Detect kick drum hits (low frequency emphasis).
    Kicks are the "boom" - usually the strongest low-end hits.

    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between kicks in seconds

    Returns:
        Array of kick drum timestamps in seconds
    """
    y, sr = librosa.load(audio_path)

    # Harmonic/percussive separation; only the percussive part is needed,
    # so the harmonic component is discarded (it was previously bound unused).
    _, y_percussive = librosa.effects.hpss(y, margin=2.0)

    # Second, stricter percussive pass to further isolate sharp transients.
    y_bass = librosa.effects.percussive(y_percussive, margin=4.0)

    # Onset strength restricted to low frequencies (fmax=200 Hz) to bias
    # detection toward kick-drum energy.
    onset_env = librosa.onset.onset_strength(
        y=y_bass,
        sr=sr,
        aggregate=np.median,
        fmax=200,
        n_mels=128
    )

    # Peak-pick onsets; wait=8 frames spaces raw detections apart.
    onset_frames = librosa.onset.onset_detect(
        onset_envelope=onset_env,
        sr=sr,
        backtrack=False,
        pre_max=3,
        post_max=3,
        pre_avg=3,
        post_avg=5,
        delta=0.15,
        wait=8
    )

    kick_times = librosa.frames_to_time(onset_frames, sr=sr)

    logger.debug(f"Raw kick detections: {len(kick_times)}")

    # Enforce the caller-supplied minimum spacing between kept timestamps.
    return _filter_by_min_interval(kick_times, min_interval)
|
|
|
|
|
|
|
|
def get_snare_times(audio_path: str, min_interval: float = 1.0) -> np.ndarray:
    """
    Detect snare/clap hits (mid-high frequency emphasis).
    Snares are the "crack" - sharp, crisp hits.

    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between snares in seconds

    Returns:
        Array of snare hit timestamps in seconds
    """
    y, sr = librosa.load(audio_path)

    # Harmonic/percussive separation; only the percussive part is needed,
    # so the harmonic component is discarded (it was previously bound unused).
    _, y_percussive = librosa.effects.hpss(y, margin=2.0)

    # Onset strength restricted to the 150-4000 Hz band to bias detection
    # toward snare/clap energy.
    onset_env = librosa.onset.onset_strength(
        y=y_percussive,
        sr=sr,
        aggregate=np.median,
        fmin=150,
        fmax=4000,
        n_mels=128
    )

    # Peak-pick onsets; wait=8 frames spaces raw detections apart.
    onset_frames = librosa.onset.onset_detect(
        onset_envelope=onset_env,
        sr=sr,
        backtrack=False,
        pre_max=3,
        post_max=3,
        pre_avg=3,
        post_avg=5,
        delta=0.15,
        wait=8
    )

    snare_times = librosa.frames_to_time(onset_frames, sr=sr)

    logger.debug(f"Raw snare detections: {len(snare_times)}")

    # Enforce the caller-supplied minimum spacing between kept timestamps.
    return _filter_by_min_interval(snare_times, min_interval)
|
|
|
|
|
|
|
|
def get_downbeats(audio_path: str, min_interval: float = 1.0) -> np.ndarray:
    """
    Detect downbeats - every Nth beat based on tempo.
    More reliable than frequency filtering for finding the "1" count.

    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between downbeats in seconds

    Returns:
        Array of downbeat timestamps in seconds
    """
    y, sr = librosa.load(audio_path)

    # Full beat grid from librosa's tempo tracker.
    tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr, units='frames')
    beat_times = librosa.frames_to_time(beat_frames, sr=sr)

    # Newer librosa returns tempo as an array; unwrap for logging.
    tempo_val = tempo[0] if isinstance(tempo, np.ndarray) else tempo
    logger.debug(f"Detected {len(beat_times)} total beats at {tempo_val:.1f} BPM")

    if len(beat_times) == 0:
        return np.array([])

    # Assume 4/4 bars; fall back to every 2nd beat for very short tracks.
    beats_per_bar = 2 if len(beat_times) < 8 else 4

    # Every beats_per_bar-th beat, starting from the first, is a downbeat.
    downbeat_times = beat_times[::beats_per_bar]

    logger.debug(f"Selected {len(downbeat_times)} downbeats (every {beats_per_bar} beats)")

    return _filter_by_min_interval(downbeat_times, min_interval)
|
|
|
|
|
|
|
|
def get_general_beats(audio_path: str, min_interval: float = 1.0) -> np.ndarray:
    """
    Fallback: Get general beat times (original method).

    Args:
        audio_path: Path to audio file
        min_interval: Minimum time between beats in seconds

    Returns:
        Array of beat timestamps in seconds
    """
    # Plain librosa beat tracking, no frequency-band emphasis.
    signal, sample_rate = librosa.load(audio_path)
    tempo, frames = librosa.beat.beat_track(y=signal, sr=sample_rate)
    beat_times = librosa.frames_to_time(frames, sr=sample_rate)

    logger.debug(f"Tempo: {tempo} BPM")
    logger.debug(f"Beat times: {beat_times}")

    return _filter_by_min_interval(beat_times, min_interval)
|
|
|
|
|
|
|
|
def _filter_by_min_interval(times: np.ndarray, min_interval: float) -> np.ndarray: |
|
|
"""Filter timestamps to ensure minimum interval between them.""" |
|
|
if len(times) == 0: |
|
|
return times |
|
|
|
|
|
filtered = [times[0]] |
|
|
for t in times[1:]: |
|
|
if t - filtered[-1] >= min_interval: |
|
|
filtered.append(t) |
|
|
|
|
|
return np.array(filtered) |
|
|
|
|
|
|
|
|
def get_beat_times(audio_path: str, beat_type: str = "downbeat", min_interval: float = 1.0) -> np.ndarray:
    """
    Get beat times based on specified drum element.

    Args:
        audio_path: Path to audio file
        beat_type: One of "kick", "snare", "downbeat", or "general"
        min_interval: Minimum time between beats in seconds

    Returns:
        Array of beat timestamps in seconds

    Recommendation: Start with "downbeat" - it's the most reliable!
    """
    logger.debug(f"Detecting {beat_type} beats with min_interval={min_interval}s...")

    # Dispatch table in place of an if/elif chain.
    detectors = {
        "kick": get_kick_times,
        "snare": get_snare_times,
        "downbeat": get_downbeats,
        "general": get_general_beats,
    }
    if beat_type not in detectors:
        raise ValueError(f"Unknown beat_type: {beat_type}. Use 'kick', 'snare', 'downbeat', or 'general'")

    result = detectors[beat_type](audio_path, min_interval)

    logger.debug(f"Final result: {len(result)} {beat_type} beats detected")

    return result
|
|
|
|
|
def repeat_audio_ffmpeg(input_audio, output_audio, repeat: int):
    """
    Repeat audio multiple times, removing leading/trailing silence before repeating.
    Automatically determines the correct output format based on the file extension.

    Args:
        input_audio: Path to input audio file
        output_audio: Path to output audio file (extension determines format)
        repeat: Number of times to repeat the audio (must be >= 1)

    Returns:
        str: Path to the output file (may be modified if extension was incompatible)

    Raises:
        ValueError: If repeat < 1.
        subprocess.CalledProcessError: If either FFmpeg invocation fails.
    """
    # Guard: repeat=0 would produce "-stream_loop -1", which makes FFmpeg
    # loop the input forever and the call never returns.
    if repeat < 1:
        raise ValueError(f"repeat must be >= 1, got {repeat}")

    output_ext = os.path.splitext(output_audio)[1].lower()
    output_base = os.path.splitext(output_audio)[0]

    # Extension -> encoder settings; a None bitrate means lossless PCM.
    format_map = {
        '.mp3': {'codec': 'libmp3lame', 'bitrate': '192k'},
        '.m4a': {'codec': 'aac', 'bitrate': '192k'},
        '.aac': {'codec': 'aac', 'bitrate': '192k'},
        '.opus': {'codec': 'libopus', 'bitrate': '128k'},
        '.ogg': {'codec': 'libvorbis', 'bitrate': '192k'},
        '.wav': {'codec': 'pcm_s16le', 'bitrate': None},
    }

    # Unknown extensions fall back to .m4a; the returned path reflects this.
    if output_ext not in format_map:
        output_ext = '.m4a'
        output_audio = output_base + output_ext
        logger.debug(f"Unknown format, defaulting to: {output_audio}")

    audio_config = format_map[output_ext]

    # Intermediate file that will hold the silence-trimmed audio.
    with tempfile.NamedTemporaryFile(suffix=output_ext, delete=False) as tmp:
        temp_trimmed = tmp.name

    try:
        # Pass 1: strip leading and trailing silence (below -50dB).
        trim_cmd = [
            "ffmpeg", "-y",
            "-i", input_audio,
            "-af", "silenceremove=start_periods=1:start_threshold=-50dB:start_duration=0:stop_periods=-1:stop_threshold=-50dB:stop_duration=0",
            "-c:a", audio_config['codec']
        ]
        if audio_config['bitrate']:
            trim_cmd.extend(["-b:a", audio_config['bitrate']])
        trim_cmd.append(temp_trimmed)

        # check=True raises CalledProcessError on failure, so the return
        # value does not need to be inspected.
        subprocess.run(trim_cmd, check=True, capture_output=True, text=True)

        # Pass 2: loop the trimmed audio; -stream_loop N plays it N+1 times.
        repeat_cmd = [
            "ffmpeg", "-y",
            "-stream_loop", str(repeat - 1),
            "-i", temp_trimmed,
            "-c:a", audio_config['codec']
        ]
        if audio_config['bitrate']:
            repeat_cmd.extend(["-b:a", audio_config['bitrate']])
        repeat_cmd.append(output_audio)

        subprocess.run(repeat_cmd, check=True, capture_output=True, text=True)

        logger.debug(f"Successfully repeated audio {repeat} times, output: {output_audio}")

        return output_audio

    except subprocess.CalledProcessError as e:
        logger.error(f"FFmpeg error: STDOUT={e.stdout}, STDERR={e.stderr}")
        raise

    finally:
        # Always clean up the intermediate trimmed file.
        if os.path.exists(temp_trimmed):
            os.remove(temp_trimmed)
|
|
|
|
|
def clean_and_drop_empty(
    df: pd.DataFrame,
    column: str,
    extra_nulls: list[str] | None = None,
) -> pd.DataFrame:
    """
    Normalize Google Sheets empty values and drop rows
    where `column` is effectively empty.

    Handles (case-insensitively):
    - NaN
    - ""
    - "   "
    - "nan", "None", "NULL", "N/A"

    Args:
        df: Input DataFrame
        column: Column to validate (e.g. "VIDEO_LINK")
        extra_nulls: Optional extra string values to treat as null

    Returns:
        Cleaned DataFrame with valid rows only

    Raises:
        KeyError: If `column` is not present in `df`.
    """
    # pandas is not imported at module level in this file; import locally so
    # the function is self-contained (the `pd.DataFrame` annotations are lazy
    # thanks to `from __future__ import annotations`).
    import pandas as pd  # noqa: F401

    if column not in df.columns:
        raise KeyError(f"Column '{column}' not found in DataFrame")

    null_values = ["", "nan", "none", "null", "n/a"]
    if extra_nulls:
        null_values.extend([v.lower() for v in extra_nulls])

    df = df.copy()

    # astype(str) turns real NaN into the string "nan", which is then caught
    # by the null list below.
    stripped = df[column].astype(str).str.strip()

    # Compare case-insensitively so "NULL", "None", "N/A" etc. are caught;
    # the previous exact-match replace only handled lowercase spellings.
    df[column] = stripped.mask(stripped.str.lower().isin(null_values), np.nan)

    return df.dropna(subset=[column])
|
|
|
|
|
def is_valid_video(path: str) -> bool:
    """Heuristic validity check: the file exists and is at least 100 KiB."""
    min_size_bytes = 100 * 1024
    return os.path.exists(path) and os.path.getsize(path) >= min_size_bytes