Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, Form | |
| from fastapi.responses import FileResponse | |
| import tempfile | |
| import shutil | |
| import os | |
| import logging | |
| import traceback | |
| from moviepy.editor import ( | |
| VideoFileClip, | |
| concatenate_videoclips, | |
| AudioFileClip, | |
| CompositeAudioClip, | |
| concatenate_audioclips, | |
| ) | |
| import numpy as np | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Safe audio clip loader | |
| def safe_audio_clip(path): | |
| try: | |
| clip = AudioFileClip(path) | |
| if clip.reader is None or clip.duration is None or clip.duration <= 0: | |
| clip.close() | |
| raise ValueError(f"Invalid or empty audio file: {os.path.basename(path)}") | |
| return clip | |
| except Exception as e: | |
| raise ValueError(f"Failed to load audio file '{os.path.basename(path)}': {str(e)}") | |
| def merge_videos_and_audios( | |
| video_files, audio_files, orig_vol=1.0, music_vol=0.5, temp_dir=None | |
| ): | |
| try: | |
| output_path = os.path.join( | |
| temp_dir, "merged_output.mp4" if video_files else "merged_output.mp3" | |
| ) | |
| # If no videos, just concatenate audios | |
| if not video_files and audio_files: | |
| audio_clips = [] | |
| for a in audio_files: | |
| try: | |
| audio_clip = safe_audio_clip(a) | |
| audio_clips.append(audio_clip) | |
| except Exception as e: | |
| logger.warning(f"Skipping invalid audio file: {a} → {str(e)}") | |
| if not audio_clips: | |
| raise ValueError("No valid audio files to merge.") | |
| final_audio = concatenate_audioclips(audio_clips) | |
| final_audio.write_audiofile(output_path) | |
| for clip in audio_clips: | |
| clip.close() | |
| final_audio.close() | |
| return output_path | |
| # If videos are present: | |
| video_clips = [VideoFileClip(v) for v in video_files] | |
| final_video = concatenate_videoclips(video_clips, method="compose") | |
| if audio_files: | |
| audio_clips = [] | |
| for a in audio_files: | |
| try: | |
| audio_clip = safe_audio_clip(a) | |
| audio_clips.append(audio_clip) | |
| except Exception as e: | |
| logger.warning(f"Skipping invalid audio file: {a} → {str(e)}") | |
| if audio_clips: | |
| final_audio = concatenate_audioclips(audio_clips).volumex(music_vol) | |
| original_audio = ( | |
| final_video.audio.volumex(orig_vol) | |
| if final_video.audio is not None | |
| else None | |
| ) | |
| if original_audio: | |
| composite_audio = CompositeAudioClip([original_audio, final_audio]) | |
| else: | |
| composite_audio = final_audio | |
| final_video = final_video.set_audio(composite_audio) | |
| for clip in audio_clips: | |
| clip.close() | |
| else: | |
| logger.warning("No valid audio files found. Using only video audio (if present).") | |
| if final_video.audio is not None: | |
| final_video = final_video.volumex(orig_vol) | |
| else: | |
| # No audio files provided | |
| if final_video.audio is not None: | |
| final_video = final_video.volumex(orig_vol) | |
| final_video.write_videofile(output_path, codec="libx264", audio_codec="aac") | |
| for clip in video_clips: | |
| clip.close() | |
| final_video.close() | |
| return output_path | |
| except Exception as e: | |
| error_msg = f"Error during merge: {str(e)}\n{traceback.format_exc()}" | |
| logger.error(error_msg) | |
| return error_msg | |
| app = FastAPI() | |
| async def merge_endpoint( | |
| files: list[UploadFile] = File(...), | |
| orig_vol: float = Form(1.0), | |
| music_vol: float = Form(0.5), | |
| ): | |
| temp_dir = tempfile.mkdtemp() | |
| try: | |
| saved_files = [] | |
| for uploaded_file in files: | |
| file_path = os.path.join(temp_dir, uploaded_file.filename) | |
| with open(file_path, "wb") as out_file: | |
| content = await uploaded_file.read() | |
| out_file.write(content) | |
| saved_files.append(file_path) | |
| video_files = [f for f in saved_files if f.lower().endswith(".mp4")] | |
| audio_files = [ | |
| f | |
| for f in saved_files | |
| if f.lower().endswith((".mp3", ".wav", ".aac", ".m4a", ".ogg")) | |
| ] | |
| if len(saved_files) < 2: | |
| return {"error": "Please upload at least 2 files (videos or audios)."} | |
| result_path = merge_videos_and_audios( | |
| video_files, audio_files, orig_vol, music_vol, temp_dir | |
| ) | |
| if isinstance(result_path, str) and result_path.startswith("Error"): | |
| return {"error": result_path} | |
| media_type = ( | |
| "video/mp4" if result_path.lower().endswith(".mp4") else "audio/mpeg" | |
| ) | |
| filename = os.path.basename(result_path) | |
| return FileResponse(result_path, media_type=media_type, filename=filename) | |
| finally: | |
| shutil.rmtree(temp_dir) | |
| # Optional: log space public URL | |
| def log_api_url(): | |
| url = os.getenv("SPACE_PUBLIC_URL") | |
| if url: | |
| logger.info(f"API is available at: {url}/merge") | |
| else: | |
| logger.info("SPACE_PUBLIC_URL environment variable not found") | |
| log_api_url() | |