"""FastAPI service that merges uploaded video and audio files with MoviePy."""

import logging
import os
import shutil
import tempfile
import traceback

import numpy as np
from fastapi import BackgroundTasks, FastAPI, File, Form, UploadFile
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import FileResponse
from moviepy.editor import (
    VideoFileClip,
    concatenate_videoclips,
    AudioFileClip,
    CompositeAudioClip,
    concatenate_audioclips,
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Uploads are copied to disk in pieces of this size instead of being read
# into memory in one go.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


# Safe audio clip loader
def safe_audio_clip(path):
    """Open *path* as an ``AudioFileClip`` and verify that it is usable.

    Args:
        path: Filesystem path of the audio file.

    Returns:
        The opened ``AudioFileClip``; the caller is responsible for closing it.

    Raises:
        ValueError: If the file cannot be opened, or if it has no reader or a
            missing/non-positive duration.
    """
    try:
        clip = AudioFileClip(path)
        if clip.reader is None or clip.duration is None or clip.duration <= 0:
            clip.close()
            raise ValueError(f"Invalid or empty audio file: {os.path.basename(path)}")
        return clip
    except Exception as e:
        # Every failure (including the ValueError above) is reported under one
        # uniform message; the original exception is kept as the cause.
        raise ValueError(
            f"Failed to load audio file '{os.path.basename(path)}': {str(e)}"
        ) from e


def _load_audio_clips(audio_files, opened):
    """Open every loadable audio file, logging and skipping the invalid ones.

    Each opened clip is also appended to *opened* so the caller can close it
    even if a later step fails.
    """
    clips = []
    for a in audio_files or []:
        try:
            clip = safe_audio_clip(a)
        except Exception as e:
            logger.warning(f"Skipping invalid audio file: {a} → {str(e)}")
            continue
        opened.append(clip)
        clips.append(clip)
    return clips


def _close_clips(clips):
    """Close clips best-effort; a failing close must not mask the real result."""
    for clip in clips:
        try:
            clip.close()
        except Exception as e:
            logger.warning(f"Failed to close clip: {str(e)}")


def merge_videos_and_audios(
    video_files, audio_files, orig_vol=1.0, music_vol=0.5, temp_dir=None
):
    """Concatenate videos and/or audio files into a single output file.

    With at least one video, the videos are concatenated and the audio files
    (concatenated, scaled by *music_vol*) are mixed over the videos' own audio
    (scaled by *orig_vol*); the result is written as ``merged_output.mp4``.
    Without videos, the audio files are concatenated into
    ``merged_output.mp3``. Audio files that fail to load are logged and
    skipped.

    Args:
        video_files: Paths of the videos to concatenate, in order.
        audio_files: Paths of the audio files to concatenate, in order.
        orig_vol: Volume factor applied to the videos' own audio.
        music_vol: Volume factor applied to the added audio.
        temp_dir: Directory that receives the output. When ``None`` a new
            temporary directory is created; the caller then owns its cleanup.

    Returns:
        The output path on success. On failure, a string starting with
        ``"Error during merge:"`` that includes the traceback (no exception is
        raised; callers detect failure via that prefix).
    """
    opened = []  # every clip that must be closed, success or failure
    try:
        if not video_files and not audio_files:
            raise ValueError("No supported video or audio files were provided.")

        if temp_dir is None:
            temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(
            temp_dir, "merged_output.mp4" if video_files else "merged_output.mp3"
        )

        # If no videos, just concatenate audios
        if not video_files:
            audio_clips = _load_audio_clips(audio_files, opened)
            if not audio_clips:
                raise ValueError("No valid audio files to merge.")
            final_audio = concatenate_audioclips(audio_clips)
            opened.append(final_audio)
            final_audio.write_audiofile(output_path)
            return output_path

        # If videos are present:
        video_clips = []
        for v in video_files:
            video_clip = VideoFileClip(v)
            opened.append(video_clip)
            video_clips.append(video_clip)
        final_video = concatenate_videoclips(video_clips, method="compose")

        audio_clips = _load_audio_clips(audio_files, opened)
        if audio_clips:
            music = concatenate_audioclips(audio_clips).volumex(music_vol)
            if final_video.audio is not None:
                mixed_audio = CompositeAudioClip(
                    [final_video.audio.volumex(orig_vol), music]
                )
            else:
                mixed_audio = music
            final_video = final_video.set_audio(mixed_audio)
        else:
            if audio_files:
                logger.warning(
                    "No valid audio files found. Using only video audio (if present)."
                )
            if final_video.audio is not None:
                final_video = final_video.volumex(orig_vol)
        opened.append(final_video)

        # The source audio clips must stay open until the file is written:
        # the mixed track reads from their readers while rendering.
        # The intermediate audio file is placed in temp_dir because MoviePy's
        # default puts it in the working directory under a name derived from
        # the output name, which is identical for every request.
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            temp_audiofile=os.path.join(temp_dir, "merged_output_audio.m4a"),
        )
        return output_path

    except Exception as e:
        error_msg = f"Error during merge: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg
    finally:
        _close_clips(opened)


def _safe_upload_name(index, filename):
    """Build a collision-free, directory-less file name for an upload.

    The client-supplied name is reduced to its final path component so it
    cannot escape the temp directory, and prefixed with the upload's index so
    that duplicate names (or a name equal to the output file) do not
    overwrite each other.
    """
    base = os.path.basename((filename or "").replace("\\", "/"))
    return f"{index}_{base or 'upload'}"


app = FastAPI()


@app.post("/merge")
async def merge_endpoint(
    files: list[UploadFile] = File(...),
    orig_vol: float = Form(1.0),
    music_vol: float = Form(0.5),
):
    """Merge the uploaded media files and return the result as a download.

    Files ending in ``.mp4`` are treated as videos; ``.mp3``, ``.wav``,
    ``.aac``, ``.m4a`` and ``.ogg`` as audio. Anything else is ignored.

    Args:
        files: The uploaded files (at least two).
        orig_vol: Volume factor for the videos' own audio.
        music_vol: Volume factor for the added audio.

    Returns:
        A ``FileResponse`` with the merged file, or ``{"error": ...}`` when
        fewer than two files were uploaded or the merge failed.
    """
    if len(files) < 2:
        return {"error": "Please upload at least 2 files (videos or audios)."}

    temp_dir = tempfile.mkdtemp()
    response_owns_temp_dir = False
    try:
        saved_files = []
        for index, uploaded_file in enumerate(files):
            file_path = os.path.join(
                temp_dir, _safe_upload_name(index, uploaded_file.filename)
            )
            with open(file_path, "wb") as out_file:
                while chunk := await uploaded_file.read(_UPLOAD_CHUNK_SIZE):
                    out_file.write(chunk)
            saved_files.append(file_path)

        video_files = [f for f in saved_files if f.lower().endswith(".mp4")]
        audio_files = [
            f
            for f in saved_files
            if f.lower().endswith((".mp3", ".wav", ".aac", ".m4a", ".ogg"))
        ]

        # Encoding is blocking and can take a long time; run it in a worker
        # thread so the event loop keeps serving other requests.
        result_path = await run_in_threadpool(
            merge_videos_and_audios,
            video_files,
            audio_files,
            orig_vol,
            music_vol,
            temp_dir,
        )

        if isinstance(result_path, str) and result_path.startswith("Error"):
            return {"error": result_path}

        media_type = (
            "video/mp4" if result_path.lower().endswith(".mp4") else "audio/mpeg"
        )
        filename = os.path.basename(result_path)

        # FileResponse reads the file only after this function has returned,
        # so the temp directory must be removed after the response was sent,
        # not in the `finally` below.
        cleanup = BackgroundTasks()
        cleanup.add_task(shutil.rmtree, temp_dir, ignore_errors=True)
        response = FileResponse(
            result_path, media_type=media_type, filename=filename, background=cleanup
        )
        response_owns_temp_dir = True
        return response
    finally:
        if not response_owns_temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)


# Optional: log space public URL
def log_api_url():
    """Log the public ``/merge`` URL taken from ``SPACE_PUBLIC_URL``, if set."""
    url = os.getenv("SPACE_PUBLIC_URL")
    if url:
        logger.info(f"API is available at: {url}/merge")
    else:
        logger.info("SPACE_PUBLIC_URL environment variable not found")


log_api_url()