Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import tempfile | |
| import os | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip, concatenate_audioclips | |
| import logging | |
| import sys | |
| import traceback | |
| import socket | |
| # Set up logging to debug issues | |
| logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)]) | |
| logger = logging.getLogger(__name__) | |
| # --- Functions --- | |
| def check_port(port): | |
| """ | |
| Check if a port is available. | |
| Returns True if the port is free, False if it's in use. | |
| """ | |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
| try: | |
| s.bind(("0.0.0.0", port)) | |
| return True | |
| except socket.error: | |
| return False | |
| def merge_videos_and_audios(video_files, audio_files=None, orig_vol=1.0, music_vol=0.5): | |
| """ | |
| Merge multiple video clips into one and optionally add a concatenated audio track. | |
| If audio_files is None, the video will retain its original audio (if any). | |
| Volumes for original video audio and background audio (if provided) can be controlled. | |
| Uses method='compose' to avoid cropping in landscape videos. | |
| Returns the path to the merged video file. | |
| """ | |
| try: | |
| logger.info(f"Starting merge with {len(video_files)} video files and {len(audio_files) if audio_files else 0} audio files") | |
| # Create a temporary output path | |
| temp_dir = tempfile.mkdtemp() | |
| output_path = os.path.join(temp_dir, "merged_output.mp4") | |
| # Load and concatenate video clips | |
| video_clips = [VideoFileClip(video) for video in video_files] | |
| final_video_clip = concatenate_videoclips(video_clips, method='compose') | |
| # Determine final video duration | |
| video_duration = final_video_clip.duration or sum(clip.duration for clip in video_clips) | |
| logger.info(f"Total video duration: {video_duration} seconds") | |
| # Handle audio (optional) | |
| if audio_files: | |
| logger.info("Processing audio files") | |
| # Load and concatenate audio clips | |
| audio_clips = [AudioFileClip(audio) for audio in audio_files] | |
| concatenated_audio = concatenate_audioclips(audio_clips) | |
| # Adjust concatenated audio duration to match video duration (trim or loop) | |
| if concatenated_audio.duration > video_duration: | |
| concatenated_audio = concatenated_audio.subclip(0, video_duration) | |
| elif concatenated_audio.duration < video_duration: | |
| # Loop the audio to match video duration | |
| concatenated_audio = concatenated_audio.fx(lambda clip: clip.loop(duration=video_duration)) | |
| # Apply volume to concatenated audio | |
| concatenated_audio = concatenated_audio.volumex(music_vol) | |
| # Get original video audio (if any) and apply volume | |
| original_audio = final_video_clip.audio.volumex(orig_vol) if final_video_clip.audio else None | |
| # Composite the audio tracks | |
| if original_audio: | |
| final_audio = CompositeAudioClip([original_audio, concatenated_audio]) | |
| else: | |
| final_audio = concatenated_audio | |
| else: | |
| logger.info("No audio files provided; using original video audio if available") | |
| # If no audio files provided, retain original video audio (if any) | |
| final_audio = final_video_clip.audio.volumex(orig_vol) if final_video_clip.audio else None | |
| # Set the audio to the final video | |
| final_video_clip = final_video_clip.set_audio(final_audio) | |
| # Write the final video | |
| logger.info(f"Writing output video to {output_path}") | |
| final_video_clip.write_videofile(output_path, codec="libx264", fps=30, audio_codec="aac", ffmpeg_params=["-preset", "fast"]) | |
| # Close resources | |
| final_video_clip.close() | |
| for clip in video_clips: | |
| clip.close() | |
| if audio_files: | |
| for clip in audio_clips: | |
| clip.close() | |
| concatenated_audio.close() | |
| logger.info("Merge completed successfully") | |
| return output_path | |
| except Exception as e: | |
| error_msg = f"Error during video and audio merging: {str(e)}\n{traceback.format_exc()}" | |
| logger.error(error_msg) | |
| return error_msg | |
| # --- Gradio App Using Blocks --- | |
| def gradio_merge_videos(video_files, audio_files, orig_vol, music_vol): | |
| """ | |
| Gradio endpoint to merge videos and optionally add audio. | |
| Args: | |
| video_files: List of video file paths | |
| audio_files: List of audio file paths (optional) | |
| orig_vol: Volume for original video audio (0.0 to 1.0) | |
| music_vol: Volume for background audio (0.0 to 1.0) | |
| Returns: | |
| Path to the merged video file or error message | |
| """ | |
| logger.info(f"Received {len(video_files)} video files and {len(audio_files) if audio_files else 0} audio files") | |
| if len(video_files) <= 1: | |
| error_msg = "Error: Please upload more than 1 video file." | |
| logger.error(error_msg) | |
| return error_msg | |
| result = merge_videos_and_audios( | |
| video_files, audio_files, orig_vol=orig_vol, music_vol=music_vol | |
| ) | |
| if isinstance(result, str) and result.startswith("Error"): | |
| logger.error(result) | |
| return result | |
| else: | |
| logger.info(f"Merge successful. Output saved at: {result}") | |
| return result # Gradio will serve this as a downloadable video | |
| # --- Main Execution --- | |
| if __name__ == "__main__": | |
| logger.info(f"Environment: {os.environ.get('HUGGINGFACE_SPACES', 'Not in HF Spaces')}") | |
| logger.info(f"Arguments: {sys.argv}") | |
| # Check ports in a wider range | |
| default_port = 7860 | |
| ports_to_try = list(range(default_port, default_port + 11)) # 7860 to 7870 | |
| selected_port = None | |
| for port in ports_to_try: | |
| logger.info(f"Checking if port {port} is available") | |
| if check_port(port): | |
| logger.info(f"Port {port} is available") | |
| selected_port = port | |
| break | |
| else: | |
| logger.warning(f"Port {port} is already in use") | |
| if selected_port is None: | |
| logger.error("No available ports found in range 7860-7870") | |
| sys.exit(1) | |
| logger.info("Launching Gradio Blocks interface") | |
| with gr.Blocks(title="Video and Audio Merger API") as app: | |
| gr.Markdown("## Video and Audio Merger API") | |
| gr.Markdown("Upload more than 1 video file to merge them into a single video. Optionally upload audio files to add as background audio.") | |
| with gr.Row(): | |
| video_input = gr.File(label="Video Files (more than 1)", type="filepath", file_count="multiple") | |
| audio_input = gr.File(label="Audio Files (Optional)", type="filepath", file_count="multiple") | |
| with gr.Row(): | |
| orig_vol_input = gr.Slider(minimum=0.0, maximum=1.0, value=1.0, step=0.05, label="Original Video Audio Volume") | |
| music_vol_input = gr.Slider(minimum=0.0, maximum=1.0, value=0.5, step=0.05, label="Background Audio Volume") | |
| output_video = gr.Video(label="Merged Video") | |
| merge_button = gr.Button("Merge Videos and Audios") | |
| merge_button.click( | |
| fn=gradio_merge_videos, | |
| inputs=[video_input, audio_input, orig_vol_input, music_vol_input], | |
| outputs=output_video | |
| ) | |
| try: | |
| logger.info(f"Attempting to launch Gradio app on port {selected_port}") | |
| # Set share=True to bypass localhost check | |
| app.launch(server_port=selected_port, share=True) | |
| logger.info(f"Gradio app launched successfully on port {app.server_port}") | |
| except Exception as e: | |
| error_msg = f"Failed to launch Gradio interface: {str(e)}\n{traceback.format_exc()}" | |
| logger.error(error_msg) | |
| raise |