"""Gradio app that merges uploaded video and/or audio files with moviepy.

Videos are expected to be named ``file<N>.mp4`` and audio tracks
``audio<N>.mp3`` / ``audio<N>.wav``; ``<N>`` determines the merge order.
"""

import logging
import os
import re
import shutil
import socket
import sys
import tempfile
import traceback

import gradio as gr
import numpy as np
from moviepy.editor import (
    AudioFileClip,
    CompositeAudioClip,
    VideoFileClip,
    concatenate_audioclips,
    concatenate_videoclips,
)

# Set up logging to debug issues
logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__)

# Sample rate every audio clip is normalised to before trimming/merging.
AUDIO_SAMPLE_RATE = 44100

# Single source of truth for accepted upload names. Case-insensitive and
# anchored so validation, sorting and output naming always agree.
VIDEO_NAME_PATTERN = re.compile(r'file(\d+)\.mp4\Z', re.IGNORECASE)
AUDIO_NAME_PATTERN = re.compile(r'audio(\d+)\.(mp3|wav)\Z', re.IGNORECASE)


class _MergeError(Exception):
    """Expected, user-facing merge failure; the message is returned verbatim."""


# --- Functions ---

def check_port(port):
    """
    Check if a port is available.

    Args:
        port: TCP port number to probe on all interfaces.

    Returns:
        True if the port could be bound (free), False if binding failed.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind(("0.0.0.0", port))
        except OSError:
            return False
        return True


def sort_files_by_index(file_list, prefix_pattern):
    """
    Sort files based on the numerical index in their filename.

    Args:
        file_list: List of file paths (e.g., ['file2.mp4', 'file1.mp4']); may be None.
        prefix_pattern: Regex (string or compiled) whose first group captures
            the index (e.g., r'file(\\d+)\\.mp4').

    Returns:
        New list sorted by index; names that do not match go to the end.
        An empty list is returned for empty/None input.
    """
    if not file_list:
        return []

    def extract_index(filename):
        match = re.match(prefix_pattern, os.path.basename(filename))
        if match:
            return int(match.group(1))
        return float('inf')  # Invalid filenames go to the end

    sorted_files = sorted(file_list, key=extract_index)
    logger.info(f"Sorted files: {sorted_files}")
    return sorted_files


def get_indices_string(file_list, prefix_pattern):
    """
    Extract the numerical indices from filenames and return them concatenated.

    Args:
        file_list: List of file paths; may be None.
        prefix_pattern: Regex (string or compiled) whose first group captures the index.

    Returns:
        String of indices in list order (e.g., '123' for
        ['file1.mp4', 'file2.mp4', 'file3.mp4']); non-matching names are ignored.
    """
    if not file_list:
        return ""
    matches = (re.match(prefix_pattern, os.path.basename(name)) for name in file_list)
    return "".join(match.group(1) for match in matches if match)


def split_files_by_extension(all_files):
    """
    Split a list of files into videos (.mp4) and audios (.mp3, .wav).

    Args:
        all_files: List of file paths.

    Returns:
        Tuple of (video_files, audio_files); files with any other extension
        are logged and skipped. The extension check is case-insensitive.
    """
    video_files = []
    audio_files = []
    for file_path in all_files:
        filename = os.path.basename(file_path).lower()
        if filename.endswith('.mp4'):
            video_files.append(file_path)
        elif filename.endswith(('.mp3', '.wav')):
            audio_files.append(file_path)
        else:
            logger.warning(f"Unsupported file extension for {file_path}; skipping")
    return video_files, audio_files


def trim_silence(audio_clip, threshold=0.005, fps=AUDIO_SAMPLE_RATE):
    """
    Trim silence from the start and end of an audio clip.

    Args:
        audio_clip: Clip object providing to_soundarray(), subclip() and duration.
        threshold: Amplitude below which a sample is considered silent.
        fps: Sample rate used to render the clip and convert indices to seconds.

    Returns:
        The trimmed clip, or the original clip when it is entirely silent,
        when trimming would leave nothing, or when analysis fails (best effort).
    """
    try:
        samples = audio_clip.to_soundarray(fps=fps)

        # Per-sample amplitude: RMS across channels, or absolute value for mono.
        if samples.ndim > 1:
            amplitudes = np.sqrt(np.mean(samples ** 2, axis=1))
        else:
            amplitudes = np.abs(samples)

        non_silent = amplitudes > threshold
        if not np.any(non_silent):
            logger.warning("Audio clip is completely silent; returning original clip")
            return audio_clip

        # argmax on a boolean array yields the first True; reversing gives the last.
        start_idx = np.argmax(non_silent)
        end_idx = len(non_silent) - np.argmax(non_silent[::-1])

        start_time = start_idx / fps
        end_time = end_idx / fps

        if end_time <= start_time:
            logger.warning("Trimmed duration is zero or negative; returning original clip")
            return audio_clip

        trimmed_audio = audio_clip.subclip(start_time, end_time)
        logger.info(f"Trimmed audio from {start_time:.2f}s to {end_time:.2f}s (original duration: {audio_clip.duration:.2f}s)")
        return trimmed_audio
    except Exception as e:
        # Deliberate best effort: a clip that cannot be analysed is used untrimmed.
        logger.error(f"Error trimming silence: {str(e)}")
        return audio_clip


def _remove_dir(path):
    """Delete a directory tree, logging (not raising) on failure."""
    if not path or not os.path.exists(path):
        return
    try:
        shutil.rmtree(path)
        logger.info(f"Cleaned up temporary directory: {path}")
    except Exception as e:
        logger.warning(f"Failed to clean up temporary directory {path}: {str(e)}")


def _close_clips(clips):
    """Close clips in reverse creation order, ignoring individual failures."""
    for clip in reversed(clips):
        try:
            clip.close()
        except Exception as e:
            logger.warning(f"Failed to close clip: {str(e)}")


def _load_trimmed_audio_clips(audio_files, opened_clips):
    """Load each audio file, normalise it and trim silence; return usable clips.

    Every clip that holds a file reader is appended to ``opened_clips`` so the
    caller can close it even when a later step raises.
    """
    trimmed_clips = []
    for audio in audio_files:
        source = AudioFileClip(audio)
        opened_clips.append(source)
        clip = source.set_fps(AUDIO_SAMPLE_RATE)  # Normalize sample rate
        # NOTE(review): to_stereo() may not exist on moviepy audio clips; only
        # call it when available so mono input does not abort the whole merge.
        if clip.nchannels == 1 and hasattr(clip, "to_stereo"):
            clip = clip.to_stereo()
        logger.info(f"Original audio duration for {audio}: {clip.duration:.2f}s, channels: {clip.nchannels}")
        trimmed_clip = trim_silence(clip)
        if trimmed_clip.duration > 0:
            trimmed_clips.append(trimmed_clip)
        else:
            logger.warning(f"Skipping audio file {audio} as it has zero duration after trimming")

    for i, clip in enumerate(trimmed_clips):
        logger.info(f"Trimmed audio {i+1} duration: {clip.duration:.2f}s, channels: {clip.nchannels}")
    return trimmed_clips


def _concatenate_audio(audio_clips):
    """Concatenate audio clips and warn if the resulting duration looks wrong."""
    logger.info(f"Attempting to concatenate {len(audio_clips)} audio clips")
    combined = concatenate_audioclips(audio_clips)
    logger.info(f"Concatenated audio duration: {combined.duration:.2f}s, channels: {combined.nchannels}")

    expected_duration = sum(clip.duration for clip in audio_clips)
    if abs(combined.duration - expected_duration) > 0.1:
        logger.warning(f"Concatenated duration ({combined.duration:.2f}s) does not match expected duration ({expected_duration:.2f}s)")
    return combined


def _fit_audio_to_duration(audio, target_duration):
    """Trim or loop ``audio`` so that it lasts exactly ``target_duration`` seconds."""
    if audio.duration > target_duration:
        audio = audio.subclip(0, target_duration)
        logger.info(f"Trimmed concatenated audio to match video duration: {audio.duration:.2f}s")
    elif audio.duration < target_duration:
        # Repeat enough whole copies to cover the target, then cut the excess.
        repeats = int(target_duration / audio.duration) + 1
        audio = concatenate_audioclips([audio] * repeats).subclip(0, target_duration)
        logger.info(f"Looped concatenated audio to match video duration: {audio.duration:.2f}s")
    return audio


def _merge_audio_only(audio_files, audio_indices, output_dir, opened_clips):
    """Concatenate the audio files into one MP3 and return its path."""
    output_path = os.path.join(output_dir, f"combined_audio_{audio_indices}.mp3")
    logger.info("Merging audio files only")

    audio_clips = _load_trimmed_audio_clips(audio_files, opened_clips)
    if len(audio_clips) < 2:
        raise _MergeError("Error: Fewer than 2 audio clips available after trimming (clips may be silent or too short).")

    final_audio_clip = _concatenate_audio(audio_clips)
    opened_clips.append(final_audio_clip)

    logger.info(f"Writing output audio to {output_path}")
    final_audio_clip.write_audiofile(output_path, codec="mp3")
    logger.info("Audio merge completed successfully")
    return output_path


def _merge_videos(video_files, audio_files, video_indices, audio_indices,
                  orig_vol, music_vol, output_dir, opened_clips):
    """Concatenate the videos, optionally mix in background audio, return the MP4 path."""
    if audio_indices:
        output_filename = f"combined_video_{video_indices}_with_audio_{audio_indices}.mp4"
    else:
        output_filename = f"combined_video_{video_indices}.mp4"
    output_path = os.path.join(output_dir, output_filename)

    video_clips = []
    for video in video_files:
        clip = VideoFileClip(video)
        opened_clips.append(clip)  # register immediately so a later failure cannot leak it
        video_clips.append(clip)
    final_video_clip = concatenate_videoclips(video_clips, method='compose')
    opened_clips.append(final_video_clip)

    video_duration = final_video_clip.duration or sum(clip.duration for clip in video_clips)
    logger.info(f"Total video duration: {video_duration:.2f}s")

    original_audio = final_video_clip.audio.volumex(orig_vol) if final_video_clip.audio else None

    if audio_files:
        logger.info("Processing audio files")
        audio_clips = _load_trimmed_audio_clips(audio_files, opened_clips)
        if not audio_clips:
            logger.warning("No valid audio clips after trimming; using original video audio only")
            final_audio = original_audio
        else:
            background = _fit_audio_to_duration(_concatenate_audio(audio_clips), video_duration)
            background = background.volumex(music_vol)
            if original_audio:
                final_audio = CompositeAudioClip([original_audio, background])
            else:
                final_audio = background
    else:
        logger.info("No audio files provided; using original video audio if available")
        final_audio = original_audio

    final_video_clip = final_video_clip.set_audio(final_audio)
    opened_clips.append(final_video_clip)

    logger.info(f"Writing output video to {output_path}")
    final_video_clip.write_videofile(output_path, codec="libx264", fps=30, audio_codec="aac", ffmpeg_params=["-preset", "fast"])
    logger.info("Video merge completed successfully")
    return output_path


def merge_videos_and_audios(video_files=None, audio_files=None, orig_vol=1.0, music_vol=0.5, temp_dir=None):
    """
    Merge multiple video clips and/or audio clips based on the inputs provided.

    - If only video_files: merge videos, retaining their original audio.
    - If only audio_files: merge audio files into a single MP3.
    - If both: merge videos and overlay the concatenated audio, trimmed or
      looped to the video duration.
    Files are ordered by the numerical index in their filenames
    (e.g., file1.mp4, file2.mp4 / audio1.mp3, audio2.wav).

    Args:
        video_files: List of video file paths (optional)
        audio_files: List of audio file paths (optional)
        orig_vol: Volume multiplier for the original video audio
        music_vol: Volume multiplier for the background audio
        temp_dir: Directory holding the *input* files; it is deleted before
            returning, whether or not the merge succeeds (optional)

    Returns:
        Path to the merged file (video or audio) on success, otherwise a
        string starting with "Error". The output is written to its own fresh
        temporary directory so that cleaning up ``temp_dir`` cannot delete it;
        the caller owns that output file.
    """
    opened_clips = []
    output_dir = None
    succeeded = False
    try:
        video_files = sort_files_by_index(video_files, VIDEO_NAME_PATTERN)
        audio_files = sort_files_by_index(audio_files, AUDIO_NAME_PATTERN)

        # Indices are embedded in the output filename.
        video_indices = get_indices_string(video_files, VIDEO_NAME_PATTERN)
        audio_indices = get_indices_string(audio_files, AUDIO_NAME_PATTERN)

        video_count = len(video_files)
        audio_count = len(audio_files)
        logger.info(f"Starting merge with {video_count} video files and {audio_count} audio files")

        if video_count + audio_count < 2:
            raise _MergeError("Error: Please upload at least 2 files total (videos, audios, or a combination).")

        # Never write into temp_dir: it is removed in the finally block below.
        output_dir = tempfile.mkdtemp(prefix="merged_")

        if video_count == 0:
            result = _merge_audio_only(audio_files, audio_indices, output_dir, opened_clips)
        else:
            result = _merge_videos(video_files, audio_files, video_indices, audio_indices,
                                   orig_vol, music_vol, output_dir, opened_clips)
        succeeded = True
        return result

    except _MergeError as e:
        error_msg = str(e)
        logger.error(error_msg)
        return error_msg
    except Exception as e:
        error_msg = f"Error during merging: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg
    finally:
        # Release file handles before deleting the files they point at.
        _close_clips(opened_clips)
        if not succeeded:
            _remove_dir(output_dir)
        _remove_dir(temp_dir)


# --- Gradio App Using Blocks ---

def _guess_extension(binary):
    """Guess '.mp4', '.wav' or '.mp3' from leading magic bytes; None if unknown."""
    head = bytes(binary[:12]) if binary else b""
    if head[4:8] == b"ftyp":
        return ".mp4"
    if head[:4] == b"RIFF" and head[8:12] == b"WAVE":
        return ".wav"
    if head[:3] == b"ID3":
        return ".mp3"
    # Bare MPEG audio frame: 11 sync bits set.
    if len(head) >= 2 and head[0] == 0xFF and (head[1] & 0xE0) == 0xE0:
        return ".mp3"
    return None


def _infer_file_names(file_binaries):
    """Build file<N>.mp4 / audio<N>.<ext> names for uploads that came without names.

    The type is sniffed from the content; when that fails the historical
    fallback applies (even positions are treated as video, odd as audio).
    """
    names = []
    video_n = 0
    audio_n = 0
    for idx, binary in enumerate(file_binaries):
        if binary is None:
            names.append(f"missing_{idx}")  # placeholder; None payloads are skipped later
            continue
        ext = _guess_extension(binary) or (".mp4" if idx % 2 == 0 else ".mp3")
        if ext == ".mp4":
            video_n += 1
            names.append(f"file{video_n}{ext}")
        else:
            audio_n += 1
            names.append(f"audio{audio_n}{ext}")
    return names


def gradio_merge_files(file_binaries, orig_vol, music_vol, file_names=None):
    """
    Gradio endpoint to merge videos and/or audio from binary file uploads.

    Args:
        file_binaries: List of binary data (bytes objects)
        orig_vol: Volume for original video audio (0.0 to 1.0)
        music_vol: Volume for background audio (0.0 to 1.0)
        file_names: List of original filenames (passed separately for API
            calls). When omitted, names are inferred from the file contents.

    Returns:
        Tuple (video_output, audio_output): (path, None) for a merged video,
        (None, path) for merged audio, or (error message, None) on failure.
    """
    # NOTE(review): on failure the error text is returned in the slot bound to
    # the Video output, as before; confirm whether the UI should instead
    # surface it through a dedicated component or gr.Error.
    temp_dir = None
    handed_off = False  # True once merge_videos_and_audios owns temp_dir cleanup
    try:
        logger.info(f"Received {len(file_binaries) if file_binaries else 0} binary files")

        if not file_binaries or len(file_binaries) < 2:
            error_msg = "Error: Please upload at least 2 files."
            logger.error(error_msg)
            return error_msg, None

        if file_names is None:
            # The UI delivers raw bytes without names; derive names that match
            # the expected file<N>/audio<N> patterns so they pass validation.
            file_names = _infer_file_names(file_binaries)
            logger.warning("No file names provided; using temporary names for UI testing. For API calls, pass file_names.")

        if len(file_names) != len(file_binaries):
            error_msg = f"Error: Mismatch between file binaries ({len(file_binaries)}) and file names ({len(file_names)})."
            logger.error(error_msg)
            return error_msg, None

        temp_dir = tempfile.mkdtemp()
        all_files = []

        for binary, filename in zip(file_binaries, file_names):
            if binary is None:
                logger.warning(f"Skipping None binary data for file {filename}")
                continue

            # basename() strips any directory part so uploads stay inside temp_dir.
            original_filename = os.path.basename(filename)
            if not (VIDEO_NAME_PATTERN.match(original_filename)
                    or AUDIO_NAME_PATTERN.match(original_filename)):
                logger.warning(f"Filename {original_filename} does not match expected pattern; skipping")
                continue

            temp_file_path = os.path.join(temp_dir, original_filename)
            with open(temp_file_path, 'wb') as temp_file:
                temp_file.write(binary)
            all_files.append(temp_file_path)
            logger.info(f"Saved uploaded file to {temp_file_path}")

        if len(all_files) < 2:
            error_msg = "Error: Fewer than 2 valid files after filtering."
            logger.error(error_msg)
            return error_msg, None

        video_files, audio_files = split_files_by_extension(all_files)
        logger.info(f"Identified {len(video_files)} video files: {video_files}")
        logger.info(f"Identified {len(audio_files)} audio files: {audio_files}")

        handed_off = True
        result = merge_videos_and_audios(
            video_files=video_files,
            audio_files=audio_files,
            orig_vol=orig_vol,
            music_vol=music_vol,
            temp_dir=temp_dir
        )

        if isinstance(result, str) and result.startswith("Error"):
            logger.error(result)
            return result, None

        logger.info(f"Merge successful. Output saved at: {result}")
        if result.endswith(".mp3"):
            return None, result  # Audio output
        return result, None  # Video output

    except Exception as e:
        error_msg = f"Error processing files: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg, None
    finally:
        # merge_videos_and_audios removes temp_dir itself; only clean up here
        # when we bailed out before handing the directory over.
        if temp_dir and not handed_off:
            _remove_dir(temp_dir)


# --- Main Execution ---

def _find_free_port(default_port=7860, span=11):
    """Return the first free port in [default_port, default_port + span), or None."""
    for port in range(default_port, default_port + span):
        logger.info(f"Checking if port {port} is available")
        if check_port(port):
            logger.info(f"Port {port} is available")
            return port
        logger.warning(f"Port {port} is already in use")
    return None


def _build_app():
    """Construct the Gradio Blocks interface and wire the merge button."""
    with gr.Blocks(title="Video and Audio Merger API") as app:
        gr.Markdown("## Video and Audio Merger API")
        gr.Markdown("Upload at least 2 files total (videos, audios, or a combination) to merge them.")
        gr.Markdown("For API usage, send binary files via multipart/form-data. Name videos as file1.mp4, file2.mp4, etc., and audios as audio1.mp3, audio2.mp3, etc.")

        with gr.Row():
            file_input = gr.File(label="Upload Files (Videos: .mp4, Audios: .mp3/.wav)", type="binary", file_count="multiple")

        with gr.Row():
            orig_vol_input = gr.Slider(minimum=0.0, maximum=1.0, value=1.0, step=0.05, label="Original Video Audio Volume")
            music_vol_input = gr.Slider(minimum=0.0, maximum=1.0, value=0.5, step=0.05, label="Background Audio Volume")

        output_video = gr.Video(label="Merged Video (if videos provided)")
        output_audio = gr.Audio(label="Merged Audio (if only audios provided)")

        merge_button = gr.Button("Merge Files")
        merge_button.click(
            fn=gradio_merge_files,
            inputs=[file_input, orig_vol_input, music_vol_input],
            outputs=[output_video, output_audio]
        )
    return app


def main():
    """Pick a free port in 7860-7870 and launch the Gradio app on it."""
    logger.info(f"Environment: {os.environ.get('HUGGINGFACE_SPACES', 'Not in HF Spaces')}")
    logger.info(f"Arguments: {sys.argv}")

    selected_port = _find_free_port()
    if selected_port is None:
        logger.error("No available ports found in range 7860-7870")
        sys.exit(1)

    logger.info("Launching Gradio Blocks interface")
    app = _build_app()

    try:
        logger.info(f"Attempting to launch Gradio app on port {selected_port}")
        app.queue(api_open=True)
        app.launch(server_port=selected_port, share=True)
        logger.info(f"Gradio app launched successfully on port {app.server_port}")
    except Exception as e:
        error_msg = f"Failed to launch Gradio interface: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        raise


if __name__ == "__main__":
    main()