# testrender / app.py
# sam12345324's picture
# Rename app (3).py to app.py
# 0fb853c verified
import gradio as gr
import tempfile
import os
import re
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip, concatenate_audioclips
import numpy as np
import logging
import sys
import traceback
import socket
import shutil
# Send INFO-and-above log records to stdout to help debug issues.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(level=logging.INFO, handlers=[_stdout_handler])
# Module-level logger shared by the functions below.
logger = logging.getLogger(__name__)
# --- Functions ---
def check_port(port):
    """
    Report whether a TCP port can currently be bound on all IPv4 interfaces.

    Args:
        port: Port number to probe.
    Returns:
        True if binding ("0.0.0.0", port) succeeds, False if the bind raises
        a socket error (for example because the port is already in use).
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("0.0.0.0", port))
    except OSError:  # socket.error is an alias of OSError
        return False
    else:
        return True
    finally:
        # Release the probe socket whatever the outcome.
        probe.close()
def sort_files_by_index(file_list, prefix_pattern):
    r"""
    Sort files based on the numerical index in their filename.

    The docstring is a raw string so the regex example below does not trigger
    invalid-escape-sequence warnings.

    Args:
        file_list: List of file paths (e.g., ['file2.mp4', 'file1.mp4', 'file3.mp4']).
            May be None or empty.
        prefix_pattern: Regex pattern whose first group captures the number
            (e.g., r'file(\d+)\.mp4'). It is matched against the start of each
            path's base name.
    Returns:
        New list of paths sorted ascending by the captured number. Paths whose
        base name does not match the pattern are placed after all matching
        ones. Returns [] when file_list is None or empty.
    """
    if not file_list:
        return []

    def extract_index(filename):
        match = re.match(prefix_pattern, os.path.basename(filename))
        # Non-matching names get +inf so they sort after every real index.
        return int(match.group(1)) if match else float('inf')

    sorted_files = sorted(file_list, key=extract_index)
    logger.info(f"Sorted files: {sorted_files}")
    return sorted_files
def get_indices_string(file_list, prefix_pattern):
    """
    Concatenate the numeric indices captured from each filename.

    Args:
        file_list: List of file paths; may be None or empty.
        prefix_pattern: Regex whose first group captures the number; it is
            matched against the start of each path's base name.
    Returns:
        The captured groups joined without a separator, in input order
        (e.g., '123' for ['file1.mp4', 'file2.mp4', 'file3.mp4']).
        Names that do not match contribute nothing; '' for None/empty input.
    """
    if not file_list:
        return ""
    candidates = (
        re.match(prefix_pattern, os.path.basename(path)) for path in file_list
    )
    return "".join(found.group(1) for found in candidates if found)
def split_files_by_extension(all_files):
    """
    Partition file paths into videos and audios by (case-insensitive) extension.

    Args:
        all_files: List of file paths (e.g., ['file1.mp4', 'audio1.mp3']).
    Returns:
        Tuple (video_files, audio_files): paths ending in .mp4, and paths
        ending in .mp3 or .wav, each in input order. Any other path is
        logged as unsupported and left out of both lists.
    """
    videos, audios = [], []
    for file_path in all_files:
        lowered_name = os.path.basename(file_path).lower()
        if lowered_name.endswith('.mp4'):
            videos.append(file_path)
            continue
        if lowered_name.endswith('.mp3') or lowered_name.endswith('.wav'):
            audios.append(file_path)
            continue
        logger.warning(f"Unsupported file extension for {file_path}; skipping")
    return videos, audios
def trim_silence(audio_clip, threshold=0.005, fps=44100):
    """
    Trim silence from the start and end of an audio clip.

    Args:
        audio_clip: Clip object providing to_soundarray(fps=...),
            subclip(start, end) and a duration attribute
            (e.g. a moviepy AudioFileClip).
        threshold: Amplitude threshold below which a sample is considered silent.
        fps: Sample rate in Hz, used both to decode the samples and to convert
            sample indices back to seconds. Defaults to 44100.
    Returns:
        The trimmed sub-clip. The original clip is returned unchanged when it
        is entirely silent, when the computed range is empty, or when any
        error occurs during analysis (the error is logged, not raised).
    """
    try:
        # Decode the whole clip into a numpy array at the requested rate.
        samples = audio_clip.to_soundarray(fps=fps)
        if len(samples.shape) > 1:
            # 2-D input: collapse axis 1 (assumed to be the channel axis)
            # into one RMS value per sample.
            amplitudes = np.sqrt(np.mean(samples**2, axis=1))
        else:
            # 1-D input: the amplitude is the absolute sample value.
            amplitudes = np.abs(samples)

        non_silent = amplitudes > threshold
        if not np.any(non_silent):
            logger.warning("Audio clip is completely silent; returning original clip")
            return audio_clip

        # First loud sample, and one past the last loud sample.
        start_idx = np.argmax(non_silent)
        end_idx = len(non_silent) - np.argmax(non_silent[::-1])

        # Convert sample indices to seconds with the same rate used to decode.
        start_time = start_idx / fps
        end_time = end_idx / fps

        if end_time <= start_time:
            logger.warning("Trimmed duration is zero or negative; returning original clip")
            return audio_clip

        trimmed_audio = audio_clip.subclip(start_time, end_time)
        logger.info(f"Trimmed audio from {start_time:.2f}s to {end_time:.2f}s (original duration: {audio_clip.duration:.2f}s)")
        return trimmed_audio
    except Exception as e:
        # Best effort: a failed analysis must not abort the merge.
        logger.error(f"Error trimming silence: {str(e)}")
        return audio_clip
def _close_clips(clips):
    """Close each clip in *clips*, logging instead of raising if a close fails."""
    for clip in clips:
        try:
            clip.close()
        except Exception as e:
            logger.warning(f"Failed to close clip: {str(e)}")


def _load_trimmed_audio_clips(audio_files, opened_clips):
    """Load, normalise and silence-trim each audio file; return the clips with non-zero duration."""
    audio_clips = []
    for audio in audio_files:
        source_clip = AudioFileClip(audio)
        # Registered immediately so the caller can close it even if a later step fails.
        opened_clips.append(source_clip)
        clip = source_clip.set_fps(44100)  # Normalize sample rate
        # NOTE(review): confirm the installed moviepy exposes `to_stereo` on audio
        # clips; if it does not, mono input will raise here and abort the merge.
        clip = clip.to_stereo() if clip.nchannels == 1 else clip  # Convert mono to stereo
        logger.info(f"Original audio duration for {audio}: {clip.duration:.2f}s, channels: {clip.nchannels}")
        trimmed_clip = trim_silence(clip)
        opened_clips.append(trimmed_clip)
        if trimmed_clip.duration > 0:
            audio_clips.append(trimmed_clip)
        else:
            logger.warning(f"Skipping audio file {audio} as it has zero duration after trimming")
    for i, clip in enumerate(audio_clips):
        logger.info(f"Trimmed audio {i+1} duration: {clip.duration:.2f}s, channels: {clip.nchannels}")
    return audio_clips


def _concatenate_audio(audio_clips):
    """Concatenate *audio_clips* and warn when the result's duration differs from the sum of its parts."""
    logger.info(f"Attempting to concatenate {len(audio_clips)} audio clips")
    combined = concatenate_audioclips(audio_clips)
    logger.info(f"Concatenated audio duration: {combined.duration:.2f}s, channels: {combined.nchannels}")
    expected_duration = sum(clip.duration for clip in audio_clips)
    if abs(combined.duration - expected_duration) > 0.1:
        logger.warning(f"Concatenated duration ({combined.duration:.2f}s) does not match expected duration ({expected_duration:.2f}s)")
    return combined


def merge_videos_and_audios(video_files=None, audio_files=None, orig_vol=1.0, music_vol=0.5, temp_dir=None):
    """
    Merge multiple video clips and/or audio clips based on inputs provided.

    - If only video_files: merge the videos, retaining their original audio.
    - If only audio_files: merge the audio files into a single .mp3 file.
    - If both: merge the videos and overlay the concatenated audio (cut or
      looped to the video length) on top of the original video audio.
    Files are sorted by the numerical index in their filenames
    (e.g., file1.mp4, file2.mp4 / audio1.mp3, audio2.wav); matching is
    case-insensitive.

    The merged file is written to a newly created temporary directory that is
    kept on success, so the returned path remains valid after this function
    returns. That directory is removed again if the merge fails.

    Args:
        video_files: List of video file paths (optional)
        audio_files: List of audio file paths (optional)
        orig_vol: Volume for original video audio (0.0 to 1.0)
        music_vol: Volume for background audio (0.0 to 1.0)
        temp_dir: Directory holding the caller's staged input files (optional).
            If given, it is deleted before returning, on success or failure.
    Returns:
        Path to the merged file (video or audio), or a string starting with
        "Error" describing what went wrong.
    """
    opened_clips = []   # everything that may hold a resource; closed in `finally`
    output_dir = None   # where the merged file is written
    succeeded = False
    try:
        # Sort files by numerical index. (?i) mirrors the case-insensitive
        # filename validation applied to uploads.
        video_pattern = r'(?i)file(\d+)\.mp4'
        audio_pattern = r'(?i)audio(\d+)\.(mp3|wav)'
        video_files = sort_files_by_index(video_files, video_pattern)
        audio_files = sort_files_by_index(audio_files, audio_pattern)
        # Get indices for output naming
        video_indices = get_indices_string(video_files, video_pattern)
        audio_indices = get_indices_string(audio_files, audio_pattern)

        # Ensure at least two files are provided (videos, audios, or combination)
        video_count = len(video_files)
        audio_count = len(audio_files)
        total_files = video_count + audio_count
        logger.info(f"Starting merge with {video_count} video files and {audio_count} audio files")
        if total_files < 2:
            error_msg = "Error: Please upload at least 2 files total (videos, audios, or a combination)."
            logger.error(error_msg)
            return error_msg

        # The result must outlive the cleanup of temp_dir below, so it gets
        # its own directory instead of being written next to the inputs.
        output_dir = tempfile.mkdtemp()

        # Case 1: Audio only
        if audio_count >= 2 and video_count == 0:
            output_filename = f"combined_audio_{audio_indices}.mp3"
            output_path = os.path.join(output_dir, output_filename)
            logger.info("Merging audio files only")
            audio_clips = _load_trimmed_audio_clips(audio_files, opened_clips)
            if len(audio_clips) < 2:
                error_msg = "Error: Fewer than 2 audio clips available after trimming (clips may be silent or too short)."
                logger.error(error_msg)
                return error_msg
            final_audio_clip = _concatenate_audio(audio_clips)
            opened_clips.append(final_audio_clip)
            logger.info(f"Writing output audio to {output_path}")
            final_audio_clip.write_audiofile(output_path, codec="mp3")
            logger.info("Audio merge completed successfully")
            succeeded = True
            return output_path

        # Case 2: Video only or Video with Audio
        if audio_indices:
            output_filename = f"combined_video_{video_indices}_with_audio_{audio_indices}.mp4"
        else:
            output_filename = f"combined_video_{video_indices}.mp4"
        output_path = os.path.join(output_dir, output_filename)

        # Load the videos one by one so already-opened clips are tracked
        # (and closed) even if a later file fails to open.
        video_clips = []
        for video in video_files:
            video_clip = VideoFileClip(video)
            opened_clips.append(video_clip)
            video_clips.append(video_clip)
        final_video_clip = concatenate_videoclips(video_clips, method='compose')
        opened_clips.append(final_video_clip)

        # Determine final video duration
        video_duration = final_video_clip.duration or sum(clip.duration for clip in video_clips)
        logger.info(f"Total video duration: {video_duration:.2f}s")

        # Original soundtrack of the concatenated video (if any), volume-adjusted.
        original_audio = final_video_clip.audio.volumex(orig_vol) if final_video_clip.audio else None

        if audio_files:
            logger.info("Processing audio files")
            audio_clips = _load_trimmed_audio_clips(audio_files, opened_clips)
            if not audio_clips:
                logger.warning("No valid audio clips after trimming; using original video audio only")
                final_audio = original_audio
            else:
                concatenated_audio = _concatenate_audio(audio_clips)
                opened_clips.append(concatenated_audio)
                # Adjust concatenated audio duration to match video duration (trim or loop)
                if concatenated_audio.duration > video_duration:
                    concatenated_audio = concatenated_audio.subclip(0, video_duration)
                    logger.info(f"Trimmed concatenated audio to match video duration: {concatenated_audio.duration:.2f}s")
                elif concatenated_audio.duration < video_duration:
                    # moviepy's `loop` effect is for video clips; `audio_loop`
                    # is the audio effect that repeats a clip up to `duration`.
                    from moviepy.audio.fx.all import audio_loop
                    concatenated_audio = audio_loop(concatenated_audio, duration=video_duration)
                    logger.info(f"Looped concatenated audio to match video duration: {concatenated_audio.duration:.2f}s")
                # Apply volume to concatenated audio
                concatenated_audio = concatenated_audio.volumex(music_vol)
                # Composite the audio tracks
                if original_audio:
                    final_audio = CompositeAudioClip([original_audio, concatenated_audio])
                else:
                    final_audio = concatenated_audio
        else:
            logger.info("No audio files provided; using original video audio if available")
            final_audio = original_audio

        # Set the audio to the final video
        final_video_clip = final_video_clip.set_audio(final_audio)
        opened_clips.append(final_video_clip)

        # Write the final video
        logger.info(f"Writing output video to {output_path}")
        final_video_clip.write_videofile(output_path, codec="libx264", fps=30, audio_codec="aac", ffmpeg_params=["-preset", "fast"])
        logger.info("Video merge completed successfully")
        succeeded = True
        return output_path
    except Exception as e:
        error_msg = f"Error during merging: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg
    finally:
        # Close clips first (most recently created first) so no input file is
        # still open when its directory is deleted.
        _close_clips(reversed(opened_clips))
        # Drop the output directory only when no usable result was produced.
        if output_dir and not succeeded:
            shutil.rmtree(output_dir, ignore_errors=True)
        # Clean up the caller-provided staging directory.
        if temp_dir and os.path.exists(temp_dir):
            try:
                shutil.rmtree(temp_dir)
                logger.info(f"Cleaned up temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Failed to clean up temporary directory {temp_dir}: {str(e)}")
# --- Gradio App Using Blocks ---
def _guess_media_extension(binary):
    """Return '.mp4', '.wav' or '.mp3' from the payload's leading bytes, or None if unrecognised."""
    if not binary:
        return None
    head = bytes(binary[:12])
    if head[4:8] == b'ftyp':  # ISO base media (MP4) file-type box
        return ".mp4"
    if head[:4] == b'RIFF' and head[8:12] == b'WAVE':  # RIFF/WAVE container
        return ".wav"
    if head[:3] == b'ID3':  # MP3 starting with an ID3v2 tag
        return ".mp3"
    if len(head) >= 2 and head[0] == 0xFF and (head[1] & 0xE0) == 0xE0:  # MPEG audio frame sync
        return ".mp3"
    return None


def _fallback_file_names(file_binaries):
    """Build names in the fileN.mp4 / audioN.ext scheme for uploads that arrive without names."""
    names = []
    video_no = 0
    audio_no = 0
    for idx, binary in enumerate(file_binaries):
        ext = _guess_media_extension(binary)
        if ext is None:
            # Unrecognised content: fall back to the alternating video/audio guess.
            ext = ".mp4" if idx % 2 == 0 else ".mp3"
        if ext == ".mp4":
            video_no += 1
            names.append(f"file{video_no}.mp4")
        else:
            audio_no += 1
            names.append(f"audio{audio_no}{ext}")
    return names


def gradio_merge_files(file_binaries, orig_vol, music_vol, file_names=None):
    """
    Gradio endpoint to merge videos and/or audio from binary file uploads.

    Each upload is written to a temporary directory under its (validated)
    filename, the files are split into videos and audios by extension, and
    merge_videos_and_audios does the actual merge.

    When file_names is None, names are generated in the expected
    fileN.mp4 / audioN.ext scheme so they pass the filename validation below;
    the media type is inferred from each upload's leading bytes, falling back
    to alternating video/audio by position when the content is unrecognised.

    Args:
        file_binaries: List of binary data (bytes objects)
        orig_vol: Volume for original video audio (0.0 to 1.0)
        music_vol: Volume for background audio (0.0 to 1.0)
        file_names: List of original filenames, one per entry of
            file_binaries (passed separately for API calls)
    Returns:
        A 2-tuple:
        - (video_path, None) when a video was produced,
        - (None, audio_path) when an .mp3 was produced,
        - (error_message, None) on any failure.
        NOTE(review): the error text is returned in the first slot, which the
        UI wires to the video output; confirm how that is surfaced to users.
    """
    temp_dir = None
    try:
        logger.info(f"Received {len(file_binaries) if file_binaries else 0} binary files")
        if not file_binaries or len(file_binaries) < 2:
            error_msg = "Error: Please upload at least 2 files."
            logger.error(error_msg)
            return error_msg, None

        # Binary uploads carry no filenames; API callers pass them separately,
        # otherwise names are derived here.
        if file_names is None:
            file_names = _fallback_file_names(file_binaries)
            logger.warning("No file names provided; using temporary names for UI testing. For API calls, pass file_names.")
        if len(file_names) != len(file_binaries):
            error_msg = f"Error: Mismatch between file binaries ({len(file_binaries)}) and file names ({len(file_names)})."
            logger.error(error_msg)
            return error_msg, None

        # Create the staging directory only once the inputs look usable.
        temp_dir = tempfile.mkdtemp()
        all_files = []
        # Save each binary file to the temporary directory
        for binary, filename in zip(file_binaries, file_names):
            if binary is None:
                logger.warning(f"Skipping None binary data for file {filename}")
                continue
            # Validate filename; basename() drops any directory components.
            original_filename = os.path.basename(filename)
            if not re.match(r'file\d+\.mp4', original_filename, re.IGNORECASE) and \
                    not re.match(r'audio\d+\.(mp3|wav)', original_filename, re.IGNORECASE):
                logger.warning(f"Filename {original_filename} does not match expected pattern; skipping")
                continue
            temp_file_path = os.path.join(temp_dir, original_filename)
            with open(temp_file_path, 'wb') as temp_file:
                temp_file.write(binary)
            all_files.append(temp_file_path)
            logger.info(f"Saved uploaded file to {temp_file_path}")

        if len(all_files) < 2:
            error_msg = "Error: Fewer than 2 valid files after filtering."
            logger.error(error_msg)
            return error_msg, None

        # Split files into videos and audios based on extensions
        video_files, audio_files = split_files_by_extension(all_files)
        logger.info(f"Identified {len(video_files)} video files: {video_files}")
        logger.info(f"Identified {len(audio_files)} audio files: {audio_files}")

        result = merge_videos_and_audios(
            video_files=video_files,
            audio_files=audio_files,
            orig_vol=orig_vol,
            music_vol=music_vol,
            temp_dir=temp_dir
        )
        if isinstance(result, str) and result.startswith("Error"):
            logger.error(result)
            return result, None

        logger.info(f"Merge successful. Output saved at: {result}")
        # Return appropriate output based on file type
        if result.endswith(".mp3"):
            return None, result  # Audio output
        return result, None  # Video output
    except Exception as e:
        error_msg = f"Error processing files: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg, None
    finally:
        # merge_videos_and_audios removes temp_dir when it runs; this covers
        # the paths that return or fail before the merge is reached.
        if temp_dir and os.path.isdir(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)
# --- Main Execution ---
if __name__ == "__main__":
    logger.info(f"Environment: {os.environ.get('HUGGINGFACE_SPACES', 'Not in HF Spaces')}")
    logger.info(f"Arguments: {sys.argv}")

    # Probe ports 7860..7870 in order and keep the first one that can be bound;
    # the for/else exits the process when none of them is free.
    default_port = 7860
    for port in range(default_port, default_port + 11):
        logger.info(f"Checking if port {port} is available")
        if not check_port(port):
            logger.warning(f"Port {port} is already in use")
            continue
        logger.info(f"Port {port} is available")
        selected_port = port
        break
    else:
        logger.error("No available ports found in range 7860-7870")
        sys.exit(1)

    logger.info("Launching Gradio Blocks interface")
    # Both volume sliders share the same range and step.
    volume_slider_options = dict(minimum=0.0, maximum=1.0, step=0.05)
    with gr.Blocks(title="Video and Audio Merger API") as app:
        gr.Markdown("## Video and Audio Merger API")
        gr.Markdown("Upload at least 2 files total (videos, audios, or a combination) to merge them.")
        gr.Markdown("For API usage, send binary files via multipart/form-data. Name videos as file1.mp4, file2.mp4, etc., and audios as audio1.mp3, audio2.mp3, etc.")
        with gr.Row():
            file_input = gr.File(
                label="Upload Files (Videos: .mp4, Audios: .mp3/.wav)",
                type="binary",
                file_count="multiple",
            )
        with gr.Row():
            orig_vol_input = gr.Slider(value=1.0, label="Original Video Audio Volume", **volume_slider_options)
            music_vol_input = gr.Slider(value=0.5, label="Background Audio Volume", **volume_slider_options)
        output_video = gr.Video(label="Merged Video (if videos provided)")
        output_audio = gr.Audio(label="Merged Audio (if only audios provided)")
        merge_button = gr.Button("Merge Files")
        # The click handler receives the uploads and both volumes; its two
        # return values fill the video and audio outputs respectively.
        click_inputs = [file_input, orig_vol_input, music_vol_input]
        click_outputs = [output_video, output_audio]
        merge_button.click(fn=gradio_merge_files, inputs=click_inputs, outputs=click_outputs)

    try:
        logger.info(f"Attempting to launch Gradio app on port {selected_port}")
        app.queue(api_open=True)
        app.launch(server_port=selected_port, share=True)
        logger.info(f"Gradio app launched successfully on port {app.server_port}")
    except Exception as e:
        # Log the full traceback, then let the failure propagate.
        error_msg = f"Failed to launch Gradio interface: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        raise