# tools/beatsyncer/beatsyncer_tool.py
import os
import random
import librosa
import numpy as np
from moviepy import (AudioFileClip, ColorClip, CompositeVideoClip, ImageClip,
                     TextClip, VideoFileClip, afx, vfx)

class BeatSyncer:
AUDIO_EXTENSIONS = {'.mp3', '.wav', '.m4a', '.aac', '.ogg', '.flac'}
VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv'}
def __init__(self, audio_filename):
# Construct the full path to the audio file relative to the script's directory
script_dir = os.path.dirname(os.path.abspath(__file__))
input_path = os.path.join(script_dir, audio_filename)
# Convert to audio if needed
if self._is_video_file(input_path):
print(f"Converting video file '{input_path}' to audio...")
audio_path = self._convert_to_audio(input_path)
self.audio_path = audio_path
elif self._is_audio_file(input_path):
self.audio_path = input_path
else:
raise ValueError(f"Unsupported file format. File must be one of: {self.AUDIO_EXTENSIONS.union(self.VIDEO_EXTENSIONS)}")
self.y = None # Audio samples
self.sr = None # Sample rate
self.beat_times = None
self.beat_drop_times = None
def _is_audio_file(self, filepath):
"""Check if the file is an audio file based on its extension."""
return os.path.splitext(filepath)[1].lower() in self.AUDIO_EXTENSIONS
def _is_video_file(self, filepath):
"""Check if the file is a video file based on its extension."""
return os.path.splitext(filepath)[1].lower() in self.VIDEO_EXTENSIONS
def _convert_to_audio(self, video_path):
"""Convert a video file to audio and return the path to the audio file."""
try:
# Create tmp directory in project root if it doesn't exist
script_dir = os.path.dirname(os.path.abspath(__file__))
tmp_dir = os.path.join(script_dir, 'tmp')
os.makedirs(tmp_dir, exist_ok=True)
# Create audio filename in tmp directory
base_name = os.path.basename(video_path)
audio_name = os.path.splitext(base_name)[0] + '.mp3'
audio_path = os.path.join(tmp_dir, audio_name)
            # Convert video to audio; fail clearly if the video has no audio track
            video = VideoFileClip(video_path)
            if video.audio is None:
                video.close()
                raise ValueError(f"Video file '{video_path}' has no audio track.")
            video.audio.write_audiofile(audio_path)
            video.close()
return audio_path
except Exception as e:
raise ValueError(f"Error converting video to audio: {str(e)}")
def load_audio(self, duration_minutes=None):
"""
Load the audio file using librosa, limiting to duration_minutes if provided.
"""
try:
self.sr = 44100 # Set a standard sample rate
if duration_minutes is not None:
duration_seconds = duration_minutes * 60
else:
duration_seconds = None # Load full audio
self.y, _ = librosa.load(self.audio_path, sr=self.sr, duration=duration_seconds)
if self.y is None or len(self.y) == 0:
raise ValueError("Audio data is empty.")
duration = len(self.y) / self.sr
print(f"Loaded audio file '{self.audio_path}' with duration {duration:.2f} seconds.")
except Exception as e:
print(f"An error occurred while loading the audio file: {e}")
raise
def detect_beat_drops(self, sensitivity='medium'):
"""
Improved beat detection using onset_strength_multi and adjusted parameters.
Args:
sensitivity (str): Detection sensitivity level ('low', 'medium', or 'high').
- low: Less sensitive, detects only major beat drops
- medium: Balanced sensitivity (default)
- high: More sensitive, detects subtle beat changes
"""
print("Detecting beat drops...")
if self.y is None or len(self.y) == 0:
raise ValueError("Audio data is empty. Cannot perform beat detection.")
# Map sensitivity levels to parameters
sensitivity_params = {
'low': {
'hop_length': 512, # Larger hop length for less granular detection
'std_multiplier': 1.0, # Higher threshold for beat detection
'min_interval': 0.05 # Longer minimum interval between beats
},
'medium': {
'hop_length': 256, # Default hop length
'std_multiplier': 0.5, # Default threshold
'min_interval': 0.02 # Default minimum interval
},
'high': {
'hop_length': 128, # Smaller hop length for more granular detection
'std_multiplier': 0.25, # Lower threshold for beat detection
'min_interval': 0.01 # Shorter minimum interval between beats
}
}
# Validate and get sensitivity parameters
sensitivity = sensitivity.lower()
if sensitivity not in sensitivity_params:
raise ValueError(f"Invalid sensitivity level. Must be one of: {list(sensitivity_params.keys())}")
params = sensitivity_params[sensitivity]
        # Parameters for onset detection and beat tracking
        hop_length = params['hop_length']
        params.update({
            "detrend": True,    # Remove the DC (constant) component from the onset envelope
            "tightness": 100    # How tightly beats must track the estimated tempo
        })
# Compute the onset envelope using onset_strength_multi with adjusted parameters
onset_env_multi = librosa.onset.onset_strength_multi(
y=self.y,
sr=self.sr,
hop_length=params["hop_length"],
aggregate=np.median,
lag=1,
max_size=1,
detrend=params["detrend"],
center=True,
)
        # Average across channels (onset_strength_multi returns one envelope per channel)
        onset_env = onset_env_multi.mean(axis=0)
# Beat tracking with adjusted parameters
tempo, beats = librosa.beat.beat_track(
onset_envelope=onset_env,
sr=self.sr,
hop_length=params["hop_length"],
tightness=params["tightness"],
units='time'
)
# Convert tempo to scalar if it's an array
tempo = float(np.mean(tempo)) if isinstance(tempo, np.ndarray) else float(tempo)
        # Halve the tempo if it is unusually high (>= 180 BPM), which usually
        # indicates double-time detection
        if tempo >= 180:
            tempo /= 2
print(f"Adjusted tempo: {tempo:.2f} BPM")
else:
print(f"Detected tempo: {tempo:.2f} BPM")
self.beat_times = beats
if len(self.beat_times) == 0:
raise ValueError("No beats detected in the audio.")
# Compute the difference in onset envelope with sensitivity-based threshold
onset_diff = np.diff(onset_env)
threshold = np.mean(onset_diff) + params['std_multiplier'] * np.std(onset_diff)
beat_drop_indices = np.where(onset_diff > threshold)[0] + 1
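        # Note: np.diff shortens the array by one, so the +1 above maps each
        # rise back to the frame where it lands. With hypothetical numbers:
        # at 'medium' sensitivity (std_multiplier 0.5), a diff stream with
        # mean 0.1 and std 0.2 keeps only frame-to-frame rises above 0.2.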
# Convert beat frames to times
onset_times = librosa.frames_to_time(
np.arange(len(onset_env)), sr=self.sr, hop_length=hop_length
)
# Map beat drop indices to times
beat_drop_times = onset_times[beat_drop_indices]
        # Remove duplicate beat times
        self.beat_drop_times = np.unique(beat_drop_times)
        if len(self.beat_drop_times) == 0:
            raise ValueError("No beat drops detected above the onset threshold.")
        # Remove beat drops that are too close together based on sensitivity
        min_interval = params['min_interval']
        filtered_beat_drop_times = [self.beat_drop_times[0]]
        for bd_time in self.beat_drop_times[1:]:
            if bd_time - filtered_beat_drop_times[-1] >= min_interval:
                filtered_beat_drop_times.append(bd_time)
        self.beat_drop_times = np.array(filtered_beat_drop_times)
print(f"Detected {len(self.beat_drop_times)} beat drops with {sensitivity} sensitivity.")
def _get_media_files(self, media_dir):
"""
Get list of media files from the directory.
Returns a shuffled list of absolute paths to media files.
"""
supported_extensions = ('.mp4', '.avi', '.mov', '.png', '.jpg', '.jpeg', '.gif', '.mkv')
media_files = []
# Walk through directory
for root, _, files in os.walk(media_dir):
for file in files:
if file.lower().endswith(supported_extensions):
full_path = os.path.join(root, file)
media_files.append(full_path)
if not media_files:
raise ValueError(f"No supported media files found in directory: {media_dir}")
# Shuffle the media files list
random.shuffle(media_files)
print(f"Found and shuffled {len(media_files)} media files")
return media_files
def _calculate_text_duration(self, text):
"""
Calculate the duration a text should be displayed based on its word count.
Args:
text (str): The text to be displayed
Returns:
float: Duration in seconds
"""
# Count words (split by whitespace)
word_count = len(text.split())
# Base duration calculation:
# - Minimum duration: 2 seconds
# - Add 0.5 seconds per word
# - Maximum duration: 8 seconds
duration = min(max(2, word_count * 0.5), 8)
return duration
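    # Worked examples of the clamp above: 1 word -> max(2, 0.5) = 2 s;
    # 10 words -> 5 s; 20 or more words -> capped at 8 s.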
def _wrap_text(self, text, max_chars_per_line=30):
"""
Wrap text to ensure it fits nicely on screen.
Args:
text (str): Text to wrap
max_chars_per_line (int): Maximum characters per line
Returns:
            str: Wrapped text joined with newlines
"""
words = text.split()
lines = []
current_line = []
current_length = 0
for word in words:
word_length = len(word)
if current_length + word_length + len(current_line) <= max_chars_per_line:
current_line.append(word)
current_length += word_length
else:
lines.append(' '.join(current_line))
current_line = [word]
current_length = word_length
if current_line:
lines.append(' '.join(current_line))
        # Join lines with single newlines; extra spacing is handled by the
        # `interline` argument of TextClip rather than here
        return '\n'.join(lines)
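    # For example (hand-computed): _wrap_text("the quick brown fox jumps over
    # the lazy dog", 15) -> "the quick brown\nfox jumps over\nthe lazy dog"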
def sync_with_media_directory(self, media_dir, output_path, duration_minutes=None, video_size=None, dark_overlay=0.3,
text_overlays=None):
"""
Create a video by syncing media files with beat drops.
Args:
media_dir: Directory containing images and videos
output_path: Path where the output video will be saved
            duration_minutes: Desired total duration in minutes (defaults to the full audio length)
            video_size: Tuple of (width, height) for the output video (required)
dark_overlay: Opacity of dark overlay (0.0 to 1.0, where 1.0 is completely black)
text_overlays: List of text strings or tuples (text, duration_seconds).
If only text is provided, duration will be calculated based on word count.
"""
print("Creating video from media files...")
media_files = self._get_media_files(media_dir)
if not media_files:
raise ValueError(f"No media files found in directory: {media_dir}")
# Prepare clips with dark theme
clips = []
# Set desired_total_duration
if duration_minutes is not None:
desired_total_duration = duration_minutes * 60 # Convert minutes to seconds
else:
desired_total_duration = len(self.y) / self.sr # Use the length of the audio
# Limit the beat_drop_times to the desired duration
beat_drop_times = self.beat_drop_times[self.beat_drop_times <= desired_total_duration]
if len(beat_drop_times) == 0:
raise ValueError("No beat drops detected within the desired duration.")
# Include the start (0) and end times to cover the entire duration
beat_times_full = np.concatenate(([0], beat_drop_times, [desired_total_duration]))
num_media = len(media_files)
media_index = 0
synced_clips = []
        # Duration (seconds) of each filler clip before the first beat drop
        initial_clip_duration = 2
        base_scale = 1.2     # Upscale factor so clips overfill the frame before cropping
        zoom_factor = 0.009  # Zoom amount for the (currently disabled) zoom effects
# Loop over intervals between beat times to cover entire duration
for i in range(len(beat_times_full) - 1):
start_time = beat_times_full[i]
end_time = beat_times_full[i + 1]
interval_duration = end_time - start_time
if i == 0 and interval_duration > initial_clip_duration:
# Handle the initial blank space with multiple clips
num_initial_clips = int(np.ceil(interval_duration / initial_clip_duration))
for j in range(num_initial_clips):
clip_start_time = start_time + j * initial_clip_duration
clip_end_time = min(clip_start_time + initial_clip_duration, end_time)
clip_duration = clip_end_time - clip_start_time
media_file = media_files[media_index % num_media]
media_index += 1
if media_file.lower().endswith(('.mp4', '.avi', '.mov')):
# It's a video file
clip = VideoFileClip(media_file).subclipped(0, clip_duration).resized(base_scale)
else:
# It's an image file
clip = ImageClip(media_file).with_duration(clip_duration).resized(base_scale)
                    # Crop the clip to the target video size (center crop)
clip = clip.cropped(width=video_size[0], height=video_size[1], x_center=clip.w / 2,
y_center=clip.h / 2)
# Set the start time of the clip
clip = clip.with_start(clip_start_time)
                    # Zoom effects are intentionally disabled; a time-varying
                    # vfx.Resize based on zoom_factor could be re-enabled here.
                    # Randomly fade in clips after the first (MoviePy 2.x effects API)
                    if synced_clips:
                        transition_type = random.choice(['crossfadein', 'fadein', None])
                        if transition_type == 'crossfadein':
                            clip = clip.with_effects([vfx.CrossFadeIn(0.1)])
                        elif transition_type == 'fadein':
                            clip = clip.with_effects([vfx.FadeIn(0.1)])
                    synced_clips.append(clip)
else:
# Handle regular intervals between beat drops
media_file = media_files[media_index % num_media]
media_index += 1
if media_file.lower().endswith(('.mp4', '.avi', '.mov')):
# It's a video file
clip = VideoFileClip(media_file).subclipped(0, interval_duration).resized(base_scale)
else:
# It's an image file
clip = ImageClip(media_file).with_duration(interval_duration).resized(base_scale)
                # Crop the clip to the target video size (center crop)
clip = clip.cropped(width=video_size[0], height=video_size[1], x_center=clip.w / 2, y_center=clip.h / 2)
# Set the start time of the clip
clip = clip.with_start(start_time)
                # Zoom effects are intentionally disabled for these clips as well
# If this is the last clip, apply fadeout
if i == len(beat_times_full) - 2:
fade_duration = min(2, clip.duration / 2)
clip = clip.with_effects([vfx.FadeOut(fade_duration)])
synced_clips.append(clip)
# Create the final video
final_clip = CompositeVideoClip(synced_clips, size=video_size)
# Create a list of all clips for final composition
all_clips = [final_clip]
# Add dark overlay if enabled
if dark_overlay > 0:
# Create a black ColorClip with the same size as the video
black_clip = ColorClip(size=video_size, color=(0, 0, 0))
black_clip = black_clip.with_duration(final_clip.duration)
black_clip = black_clip.with_opacity(dark_overlay)
all_clips.append(black_clip)
# Add text overlays if provided
if text_overlays:
current_time = 0
for text_item in text_overlays:
# Handle both string and tuple inputs
if isinstance(text_item, tuple):
text, duration = text_item
else:
text = text_item
duration = self._calculate_text_duration(text)
# Wrap text if needed
wrapped_text = self._wrap_text(text)
print(f"Adding text: '{wrapped_text}' with duration: {duration:.1f}s")
            # Create the text clip in white; the 'label' method handles
            # multiline text better. On MoviePy 2.x, `font` may need to be a
            # path to a .ttf/.otf file rather than a family name.
            txt_clip = TextClip(text=wrapped_text, font_size=40, color='white',
                                font='Arial Black', method='label', interline=-1)
# Center the text
txt_clip = txt_clip.with_position(('center', 'center'))
# Set the duration and start time
txt_clip = txt_clip.with_duration(duration)
txt_clip = txt_clip.with_start(current_time)
# Add fade in/out effects
txt_clip = txt_clip.with_effects([vfx.FadeIn(0.5), vfx.FadeOut(0.5)])
all_clips.append(txt_clip)
current_time += duration
# Create final composition with all layers
final_clip = CompositeVideoClip(all_clips, size=video_size)
# Set the audio of the final clip
audio_clip = AudioFileClip(self.audio_path).subclipped(0, desired_total_duration)
# Apply audio fadeout at the end
fade_duration = min(2, audio_clip.duration / 2)
audio_clip = audio_clip.with_effects([afx.AudioFadeOut(fade_duration)])
final_clip = final_clip.with_audio(audio_clip)
final_clip = final_clip.with_duration(desired_total_duration)
# Write the output video file
final_clip.write_videofile(
output_path, fps=30, threads=32, audio_codec="aac", preset='ultrafast'
)
print(f"Output video saved to '{output_path}'")