Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import tempfile | |
| import os | |
| import re | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip, concatenate_audioclips | |
| import numpy as np | |
| import logging | |
| import sys | |
| import traceback | |
| import socket | |
| import shutil | |
# Configure root logging once at import time: INFO level, with a single
# StreamHandler that writes to sys.stdout.
logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)])
# Module-level logger shared by the functions defined in this file.
logger = logging.getLogger(__name__)
| # --- Functions --- | |
def check_port(port):
    """
    Check if a port is available.

    The check tries to bind a TCP socket to the port on all interfaces
    ("0.0.0.0") and releases it again immediately.

    Args:
        port: TCP port number to probe
    Returns:
        True if the port could be bound, False if it is in use, cannot be
        bound for any other OS-level reason, or is outside the 0-65535 range.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("0.0.0.0", port))
            return True
    except (OSError, OverflowError):
        # OSError: port in use / not permitted (socket.error is an alias).
        # OverflowError: bind() rejects ports outside 0-65535 with this type,
        # which is not an OSError and would otherwise escape to the caller.
        return False
def sort_files_by_index(file_list, prefix_pattern):
    """
    Order file paths by the number embedded in their base name.

    Args:
        file_list: List of file paths (e.g., ['file2.mp4', 'file1.mp4', 'file3.mp4'])
        prefix_pattern: Regex whose first group captures the number
            (e.g., r'file(\d+)\.mp4'); it is matched against the base name only
    Returns:
        New list sorted by captured number. Paths whose base name does not
        match the pattern are placed after all matching ones. An empty or
        None input yields an empty list.
    """
    if not file_list:
        return []

    def index_key(path):
        found = re.match(prefix_pattern, os.path.basename(path))
        # Non-matching names get an infinite key so they sort last.
        return int(found.group(1)) if found else float('inf')

    ordered = list(file_list)
    ordered.sort(key=index_key)
    logger.info(f"Sorted files: {ordered}")
    return ordered
def get_indices_string(file_list, prefix_pattern):
    """
    Concatenate the numbers captured from each file's base name.

    Args:
        file_list: List of file paths
        prefix_pattern: Regex whose first group captures the number
    Returns:
        The captured numbers joined without a separator, in input order
        (e.g., '123' for ['file1.mp4', 'file2.mp4', 'file3.mp4']). Names that
        do not match are skipped; an empty or None input yields "".
    """
    if not file_list:
        return ""
    matches = (re.match(prefix_pattern, os.path.basename(path)) for path in file_list)
    return "".join(found.group(1) for found in matches if found)
def split_files_by_extension(all_files):
    """
    Partition file paths into videos and audios by file extension.

    The extension check is case-insensitive and looks at the base name only.

    Args:
        all_files: List of file paths (e.g., ['file1.mp4', 'audio1.mp3', 'file2.mp4', 'audio2.mp3'])
    Returns:
        Tuple of (video_files, audio_files): '.mp4' paths and '.mp3'/'.wav'
        paths respectively, each in input order. Any other extension is
        logged as a warning and left out of both lists.
    """
    videos, audios = [], []
    # Checked in order; the first bucket whose suffixes match receives the path.
    buckets = (
        (('.mp4',), videos),
        (('.mp3', '.wav'), audios),
    )
    for file_path in all_files:
        lowered = os.path.basename(file_path).lower()
        for suffixes, bucket in buckets:
            if lowered.endswith(suffixes):
                bucket.append(file_path)
                break
        else:
            logger.warning(f"Unsupported file extension for {file_path}; skipping")
    return videos, audios
def trim_silence(audio_clip, threshold=0.005):
    """
    Cut leading and trailing silence off an audio clip.

    The clip is rendered to samples at 44100 Hz; a sample counts as silent
    when its amplitude (RMS across channels for multi-channel data, absolute
    value otherwise) is at or below the threshold.

    Args:
        audio_clip: AudioFileClip object
        threshold: Amplitude threshold below which audio is considered silent
    Returns:
        The sub-clip between the first and last non-silent sample. The
        original clip is returned unchanged when it is entirely silent, when
        the trimmed span would be empty, or when any error occurs.
    """
    sample_rate = 44100
    try:
        frames = audio_clip.to_soundarray(fps=sample_rate)
        power = frames**2
        if len(frames.shape) > 1:
            # Multi-channel: collapse channels into one RMS value per sample.
            level = np.sqrt(np.mean(power, axis=1))
        else:
            level = np.sqrt(power)

        audible = level > threshold
        audible_positions = np.flatnonzero(audible)
        if audible_positions.size == 0:
            logger.warning("Audio clip is completely silent; returning original clip")
            return audio_clip

        # End index is exclusive: one past the last audible sample.
        start_time = audible_positions[0] / sample_rate
        end_time = (audible_positions[-1] + 1) / sample_rate
        if end_time <= start_time:
            logger.warning("Trimmed duration is zero or negative; returning original clip")
            return audio_clip

        trimmed_audio = audio_clip.subclip(start_time, end_time)
        logger.info(f"Trimmed audio from {start_time:.2f}s to {end_time:.2f}s (original duration: {audio_clip.duration:.2f}s)")
        return trimmed_audio
    except Exception as e:
        # Best effort: a failed trim must not abort the merge.
        logger.error(f"Error trimming silence: {str(e)}")
        return audio_clip
# Upload naming conventions. Matching is case-insensitive because the upload
# validation in gradio_merge_files accepts names regardless of case; a
# case-sensitive pattern here would sort such files last and drop their index
# from the output name.
_VIDEO_NAME_PATTERN = r'(?i)file(\d+)\.mp4'
_AUDIO_NAME_PATTERN = r'(?i)audio(\d+)\.(mp3|wav)'
_AUDIO_SAMPLE_RATE = 44100


def _close_quietly(clips):
    """Close every clip in *clips*; failures are logged, never raised."""
    for clip in clips:
        try:
            clip.close()
        except Exception as e:
            # Cleanup is best effort and must not mask the real result.
            logger.warning(f"Failed to close clip: {str(e)}")


def _load_trimmed_audio_clips(audio_files, opened_clips):
    """
    Open each audio file, normalise it and trim surrounding silence.

    Args:
        audio_files: List of audio file paths, already in playback order
        opened_clips: List that receives every clip created here so the
            caller can close them all, including clips that end up skipped
    Returns:
        List of trimmed clips whose duration is greater than zero
    """
    usable_clips = []
    for audio in audio_files:
        clip = AudioFileClip(audio).set_fps(_AUDIO_SAMPLE_RATE)  # Normalize sample rate
        opened_clips.append(clip)
        # NOTE(review): to_stereo does not appear to be part of the MoviePy 1.x
        # AudioClip API; calling it unconditionally would make every mono
        # upload fail with AttributeError. Use it only when it exists and
        # otherwise keep the mono clip - confirm downstream mixing is fine.
        if clip.nchannels == 1 and hasattr(clip, "to_stereo"):
            clip = clip.to_stereo()
            opened_clips.append(clip)
        logger.info(f"Original audio duration for {audio}: {clip.duration:.2f}s, channels: {clip.nchannels}")
        trimmed_clip = trim_silence(clip)
        if trimmed_clip is not clip:
            opened_clips.append(trimmed_clip)
        if trimmed_clip.duration > 0:
            usable_clips.append(trimmed_clip)
        else:
            logger.warning(f"Skipping audio file {audio} as it has zero duration after trimming")
    for i, clip in enumerate(usable_clips):
        logger.info(f"Trimmed audio {i+1} duration: {clip.duration:.2f}s, channels: {clip.nchannels}")
    return usable_clips


def _concatenate_audio(audio_clips, opened_clips):
    """Concatenate audio clips, warning if the total duration looks wrong."""
    logger.info(f"Attempting to concatenate {len(audio_clips)} audio clips")
    combined = concatenate_audioclips(audio_clips)
    opened_clips.append(combined)
    logger.info(f"Concatenated audio duration: {combined.duration:.2f}s, channels: {combined.nchannels}")
    expected_duration = sum(clip.duration for clip in audio_clips)
    if abs(combined.duration - expected_duration) > 0.1:
        logger.warning(f"Concatenated duration ({combined.duration:.2f}s) does not match expected duration ({expected_duration:.2f}s)")
    return combined


def _merge_audio_only(audio_files, output_path, opened_clips):
    """
    Concatenate the audio files into one MP3 at output_path.

    Returns:
        True on success, False when fewer than 2 clips remain after trimming.
    """
    logger.info("Merging audio files only")
    audio_clips = _load_trimmed_audio_clips(audio_files, opened_clips)
    if len(audio_clips) < 2:
        return False
    final_audio_clip = _concatenate_audio(audio_clips, opened_clips)
    logger.info(f"Writing output audio to {output_path}")
    final_audio_clip.write_audiofile(output_path, codec="mp3")
    logger.info("Audio merge completed successfully")
    return True


def _merge_video(video_files, audio_files, orig_vol, music_vol, output_path, opened_clips):
    """Concatenate the videos, optionally mix in background audio, write an MP4."""
    video_clips = []
    for video in video_files:
        clip = VideoFileClip(video)
        opened_clips.append(clip)
        video_clips.append(clip)
    final_video_clip = concatenate_videoclips(video_clips, method='compose')
    opened_clips.append(final_video_clip)

    video_duration = final_video_clip.duration or sum(clip.duration for clip in video_clips)
    logger.info(f"Total video duration: {video_duration:.2f}s")

    background_audio = None
    if audio_files:
        logger.info("Processing audio files")
        audio_clips = _load_trimmed_audio_clips(audio_files, opened_clips)
        if not audio_clips:
            logger.warning("No valid audio clips after trimming; using original video audio only")
        else:
            background_audio = _concatenate_audio(audio_clips, opened_clips)
            # Fit the background track to the video: cut it if too long,
            # repeat it if too short.
            if background_audio.duration > video_duration:
                background_audio = background_audio.subclip(0, video_duration)
                logger.info(f"Trimmed concatenated audio to match video duration: {background_audio.duration:.2f}s")
            elif background_audio.duration < video_duration:
                # moviepy.editor attaches `loop` to VideoClip only; the audio
                # counterpart it attaches to AudioClip is `audio_loop`.
                background_audio = background_audio.audio_loop(duration=video_duration)
                logger.info(f"Looped concatenated audio to match video duration: {background_audio.duration:.2f}s")
            background_audio = background_audio.volumex(music_vol)
    else:
        logger.info("No audio files provided; using original video audio if available")

    original_audio = final_video_clip.audio.volumex(orig_vol) if final_video_clip.audio else None
    if background_audio is None:
        final_audio = original_audio
    elif original_audio is not None:
        final_audio = CompositeAudioClip([original_audio, background_audio])
    else:
        final_audio = background_audio

    final_video_clip = final_video_clip.set_audio(final_audio)
    opened_clips.append(final_video_clip)

    logger.info(f"Writing output video to {output_path}")
    final_video_clip.write_videofile(output_path, codec="libx264", fps=30, audio_codec="aac", ffmpeg_params=["-preset", "fast"])
    logger.info("Video merge completed successfully")


def merge_videos_and_audios(video_files=None, audio_files=None, orig_vol=1.0, music_vol=0.5, temp_dir=None):
    """
    Merge multiple video clips and/or audio clips based on inputs provided.
    - If only video_files: Merge videos, retaining their original audio.
    - If only audio_files: Merge audio files into a single audio file.
    - If both: Merge videos and overlay the concatenated audio.
    Files are sorted by numerical index in their filenames (e.g., file1.mp4, file2.mp4).

    The merged file is written to a freshly created temporary directory that
    this function does NOT delete on success; the caller owns it. Writing the
    output into temp_dir (as before) meant the cleanup below removed the
    result before the caller could read it.

    Args:
        video_files: List of video file paths (optional)
        audio_files: List of audio file paths (optional)
        orig_vol: Volume for original video audio (0.0 to 1.0)
        music_vol: Volume for background audio (0.0 to 1.0)
        temp_dir: Directory holding the input files; it is removed before
            returning, whether the merge succeeded or not (optional)
    Returns:
        Path to the merged file (video or audio), or a string starting with
        "Error" describing what went wrong. Exceptions are not propagated.
    """
    opened_clips = []
    output_dir = None
    succeeded = False
    try:
        # Sort files by numerical index
        video_files = sort_files_by_index(video_files, _VIDEO_NAME_PATTERN)
        audio_files = sort_files_by_index(audio_files, _AUDIO_NAME_PATTERN)
        # Get indices for output naming
        video_indices = get_indices_string(video_files, _VIDEO_NAME_PATTERN)
        audio_indices = get_indices_string(audio_files, _AUDIO_NAME_PATTERN)

        # Ensure at least two files are provided (videos, audios, or combination)
        video_count = len(video_files)
        audio_count = len(audio_files)
        logger.info(f"Starting merge with {video_count} video files and {audio_count} audio files")
        if video_count + audio_count < 2:
            error_msg = "Error: Please upload at least 2 files total (videos, audios, or a combination)."
            logger.error(error_msg)
            return error_msg

        # Dedicated output location, kept separate from the inputs in temp_dir.
        output_dir = tempfile.mkdtemp(prefix="merged_output_")

        # Case 1: Audio only (video_count == 0 implies audio_count >= 2 here)
        if video_count == 0:
            output_path = os.path.join(output_dir, f"combined_audio_{audio_indices}.mp3")
            if not _merge_audio_only(audio_files, output_path, opened_clips):
                error_msg = "Error: Fewer than 2 audio clips available after trimming (clips may be silent or too short)."
                logger.error(error_msg)
                return error_msg
            succeeded = True
            return output_path

        # Case 2: Video only or Video with Audio
        if audio_indices:
            output_filename = f"combined_video_{video_indices}_with_audio_{audio_indices}.mp4"
        else:
            output_filename = f"combined_video_{video_indices}.mp4"
        output_path = os.path.join(output_dir, output_filename)
        _merge_video(video_files, audio_files, orig_vol, music_vol, output_path, opened_clips)
        succeeded = True
        return output_path
    except Exception as e:
        error_msg = f"Error during merging: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg
    finally:
        # Release readers first so the input files can be deleted afterwards;
        # derived clips are closed before the clips they were built from.
        _close_quietly(reversed(opened_clips))
        # A failed merge leaves nothing worth keeping in the output directory.
        if output_dir and not succeeded:
            shutil.rmtree(output_dir, ignore_errors=True)
        # Remove the caller-provided input directory.
        if temp_dir and os.path.exists(temp_dir):
            try:
                shutil.rmtree(temp_dir)
                logger.info(f"Cleaned up temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Failed to clean up temporary directory {temp_dir}: {str(e)}")
| # --- Gradio App Using Blocks --- | |
def _sniff_media_extension(binary):
    """
    Guess a media extension from the first bytes of an upload.

    Returns:
        '.mp4' (ISO media 'ftyp' box), '.wav' (RIFF/WAVE header), '.mp3'
        (ID3 tag or MPEG frame sync), or None when nothing is recognised.
    """
    header = bytes(binary[:12])
    if header[4:8] == b'ftyp':
        return '.mp4'
    if header[:4] == b'RIFF' and header[8:12] == b'WAVE':
        return '.wav'
    if header[:3] == b'ID3':
        return '.mp3'
    if len(header) >= 2 and header[0] == 0xFF and (header[1] & 0xE0) == 0xE0:
        return '.mp3'
    return None


def _fallback_file_names(file_binaries):
    """
    Build names for uploads that arrived without any (the UI path).

    Names follow the file<N>.mp4 / audio<N>.<ext> convention that the
    validation and sorting code require, numbered in upload order. The type
    comes from the file header; if that is inconclusive the previous guess is
    kept (even position -> video, odd position -> mp3 audio).
    """
    names = []
    video_no = 0
    audio_no = 0
    for idx, binary in enumerate(file_binaries):
        ext = _sniff_media_extension(binary) if binary else None
        if ext is None:
            ext = ".mp4" if idx % 2 == 0 else ".mp3"
        if ext == ".mp4":
            video_no += 1
            names.append(f"file{video_no}.mp4")
        else:
            audio_no += 1
            names.append(f"audio{audio_no}{ext}")
    return names


def gradio_merge_files(file_binaries, orig_vol, music_vol, file_names=None):
    """
    Gradio endpoint to merge videos and/or audio from binary file uploads.

    Args:
        file_binaries: List of binary data (bytes objects)
        orig_vol: Volume for original video audio (0.0 to 1.0)
        music_vol: Volume for background audio (0.0 to 1.0)
        file_names: List of original filenames (passed separately for API
            calls). When omitted, names are derived from the file contents.
    Returns:
        Tuple (video_output, audio_output):
        - (path, None) when a video was produced
        - (None, path) when only audio was merged (an .mp3 result)
        - (error message, None) on any failure; exceptions are not propagated
    """
    temp_dir = None
    # Becomes True once merge_videos_and_audios has been handed temp_dir and
    # is therefore responsible for deleting it.
    cleanup_delegated = False
    try:
        logger.info(f"Received {len(file_binaries) if file_binaries else 0} binary files")
        if not file_binaries or len(file_binaries) < 2:
            error_msg = "Error: Please upload at least 2 files."
            logger.error(error_msg)
            return error_msg, None

        if file_names is None:
            # The UI supplies bytes only. The previous placeholder names
            # ("temp_file_N.ext") could never pass the validation below, so
            # every UI request was rejected.
            file_names = _fallback_file_names(file_binaries)
            logger.warning("No file names provided; inferring names from file contents. For API calls, pass file_names.")
        if len(file_names) != len(file_binaries):
            error_msg = f"Error: Mismatch between file binaries ({len(file_binaries)}) and file names ({len(file_names)})."
            logger.error(error_msg)
            return error_msg, None

        # Create a temporary directory to store uploaded files
        temp_dir = tempfile.mkdtemp()
        all_files = []
        for binary, filename in zip(file_binaries, file_names):
            if binary is None:
                logger.warning(f"Skipping None binary data for file {filename}")
                continue
            # basename() discards any directory part a client may have sent.
            original_filename = os.path.basename(filename)
            # fullmatch: the whole name must fit, so "file1.mp4.exe" is rejected.
            if not re.fullmatch(r'file\d+\.mp4', original_filename, re.IGNORECASE) and \
                    not re.fullmatch(r'audio\d+\.(mp3|wav)', original_filename, re.IGNORECASE):
                logger.warning(f"Filename {original_filename} does not match expected pattern; skipping")
                continue
            temp_file_path = os.path.join(temp_dir, original_filename)
            with open(temp_file_path, 'wb') as temp_file:
                temp_file.write(binary)
            all_files.append(temp_file_path)
            logger.info(f"Saved uploaded file to {temp_file_path}")

        if len(all_files) < 2:
            error_msg = "Error: Fewer than 2 valid files after filtering."
            logger.error(error_msg)
            return error_msg, None

        # Split files into videos and audios based on extensions
        video_files, audio_files = split_files_by_extension(all_files)
        logger.info(f"Identified {len(video_files)} video files: {video_files}")
        logger.info(f"Identified {len(audio_files)} audio files: {audio_files}")

        cleanup_delegated = True
        result = merge_videos_and_audios(
            video_files=video_files,
            audio_files=audio_files,
            orig_vol=orig_vol,
            music_vol=music_vol,
            temp_dir=temp_dir
        )
        if isinstance(result, str) and result.startswith("Error"):
            logger.error(result)
            return result, None

        logger.info(f"Merge successful. Output saved at: {result}")
        # Return appropriate output based on file type
        if result.endswith(".mp3"):
            return None, result  # Audio output
        return result, None  # Video output
    except Exception as e:
        error_msg = f"Error processing files: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg, None
    finally:
        # merge_videos_and_audios removes temp_dir once it has been called;
        # every earlier exit has to clean up here or the uploads are leaked.
        if temp_dir and not cleanup_delegated:
            shutil.rmtree(temp_dir, ignore_errors=True)
| # --- Main Execution --- | |
if __name__ == "__main__":
    logger.info(f"Environment: {os.environ.get('HUGGINGFACE_SPACES', 'Not in HF Spaces')}")
    logger.info(f"Arguments: {sys.argv}")

    # Probe 7860 to 7870 and take the first port that can be bound.
    default_port = 7860
    selected_port = None
    for port in range(default_port, default_port + 11):
        logger.info(f"Checking if port {port} is available")
        if not check_port(port):
            logger.warning(f"Port {port} is already in use")
            continue
        logger.info(f"Port {port} is available")
        selected_port = port
        break
    else:
        # Loop finished without a break: nothing in the range was free.
        logger.error("No available ports found in range 7860-7870")
        sys.exit(1)

    logger.info("Launching Gradio Blocks interface")
    with gr.Blocks(title="Video and Audio Merger API") as app:
        gr.Markdown("## Video and Audio Merger API")
        gr.Markdown("Upload at least 2 files total (videos, audios, or a combination) to merge them.")
        gr.Markdown("For API usage, send binary files via multipart/form-data. Name videos as file1.mp4, file2.mp4, etc., and audios as audio1.mp3, audio2.mp3, etc.")
        with gr.Row():
            file_input = gr.File(
                label="Upload Files (Videos: .mp4, Audios: .mp3/.wav)",
                type="binary",
                file_count="multiple",
            )
        with gr.Row():
            orig_vol_input = gr.Slider(
                minimum=0.0, maximum=1.0, value=1.0, step=0.05,
                label="Original Video Audio Volume",
            )
            music_vol_input = gr.Slider(
                minimum=0.0, maximum=1.0, value=0.5, step=0.05,
                label="Background Audio Volume",
            )
        output_video = gr.Video(label="Merged Video (if videos provided)")
        output_audio = gr.Audio(label="Merged Audio (if only audios provided)")
        merge_button = gr.Button("Merge Files")
        # The handler receives the uploads and both volumes; file_names keeps
        # its default because no component is wired to it.
        merge_button.click(
            fn=gradio_merge_files,
            inputs=[file_input, orig_vol_input, music_vol_input],
            outputs=[output_video, output_audio],
        )

    try:
        logger.info(f"Attempting to launch Gradio app on port {selected_port}")
        app.queue(api_open=True)
        app.launch(server_port=selected_port, share=True)
        logger.info(f"Gradio app launched successfully on port {app.server_port}")
    except Exception as e:
        error_msg = f"Failed to launch Gradio interface: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        raise