Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| import tempfile | |
| from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks | |
| from fastapi.responses import FileResponse | |
| from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip, concatenate_videoclips | |
| app = FastAPI() | |
def merge_videos_and_audios(video_paths, audio_paths, output_path):
    """Concatenate videos, overlay the given audio tracks, and write an MP4.

    The video files are loaded and concatenated in the order given. Every
    audio file that can be loaded is added to a single ``CompositeAudioClip``
    which replaces the audio of the concatenated video. Audio files that fail
    to load are skipped with a printed warning. The result is encoded with
    ``libx264`` video and ``aac`` audio.

    Args:
        video_paths: Paths of the video files to concatenate; must contain at
            least one entry.
        audio_paths: Paths of audio files to attach. May be empty or ``None``,
            in which case the concatenated video keeps its own audio.
        output_path: Destination path of the encoded video file.

    Raises:
        ValueError: If ``video_paths`` is empty.
        Exception: Whatever MoviePy raises while loading a video,
            concatenating, or encoding is propagated unchanged.
    """
    if not video_paths:
        # Fail early with a clear message instead of an obscure error from
        # concatenating an empty clip list.
        raise ValueError("At least one video file is required")

    video_clips = []
    audio_clips = []
    final_video = None
    try:
        # Load clips one at a time so that those already opened are still
        # closed in the ``finally`` block if a later file fails to load.
        for video_path in video_paths:
            video_clips.append(VideoFileClip(video_path))

        # Audio is best-effort: an unreadable file is reported and skipped.
        for audio_path in audio_paths or []:
            try:
                audio_clips.append(AudioFileClip(audio_path))
            except Exception as e:
                print(f"Warning: Could not load audio file {audio_path}: {e}")

        final_video = concatenate_videoclips(video_clips)

        # Only replace the audio when at least one audio clip was loaded.
        if audio_clips:
            final_video = final_video.set_audio(CompositeAudioClip(audio_clips))

        final_video.write_videofile(output_path, codec="libx264", audio_codec="aac")
    finally:
        # Release clip resources on both the success and the failure path.
        if final_video is not None:
            final_video.close()
        for clip in video_clips:
            clip.close()
        for clip in audio_clips:
            clip.close()
# Uploads are copied to disk in chunks of this many bytes instead of being
# read into memory in one piece.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


async def _store_upload(upload, dest_dir, kind, index):
    """Stream one uploaded file into ``dest_dir`` and return the path written."""
    # The filename is supplied by the client. Keep only its last path
    # component so the write cannot land outside ``dest_dir``.
    client_name = os.path.basename((upload.filename or "").replace("\\", "/"))
    # The kind/index prefix keeps uploads with identical (or empty) names from
    # overwriting each other; the original name stays at the end so the file
    # extension is preserved.
    safe_name = f"{kind}_{index}_{client_name or 'upload'}"
    dest_path = os.path.join(dest_dir, safe_name)
    with open(dest_path, "wb") as out:
        while chunk := await upload.read(_UPLOAD_CHUNK_SIZE):
            out.write(chunk)
    return dest_path


# NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible above
# this coroutine in the source as seen here — confirm how it is registered.
async def merge_endpoint(
    video_files: list[UploadFile] = File(...),
    audio_files: list[UploadFile] = File(None),
    background_tasks: BackgroundTasks = None,
):
    """Merge uploaded videos (and optional audio tracks) into one MP4.

    The uploads are saved into a fresh temporary directory, merged with
    ``merge_videos_and_audios`` into ``output/merged_output.mp4`` (relative to
    the current working directory), and the merged file is returned.

    Args:
        video_files: Uploaded video files, concatenated in upload order.
        audio_files: Optional uploaded audio files to attach to the result.
        background_tasks: Accepted for interface compatibility. The temporary
            directory is now removed before returning, so no task is queued.

    Returns:
        A ``FileResponse`` for the merged video on success, or a dict of the
        form ``{"error": <message>}`` if anything fails.
    """
    temp_dir = tempfile.mkdtemp()
    try:
        video_paths = []
        for index, video_file in enumerate(video_files):
            video_paths.append(
                await _store_upload(video_file, temp_dir, "video", index)
            )

        audio_paths = []
        for index, audio_file in enumerate(audio_files or []):
            audio_paths.append(
                await _store_upload(audio_file, temp_dir, "audio", index)
            )

        # NOTE(review): the output path is fixed, so overlapping requests
        # write to the same file — confirm whether that is acceptable.
        output_dir = os.path.abspath("output")
        os.makedirs(output_dir, exist_ok=True)
        output_file = os.path.join(output_dir, "merged_output.mp4")

        merge_videos_and_audios(video_paths, audio_paths, output_file)

        return FileResponse(output_file, media_type="video/mp4", filename="merged_output.mp4")
    except Exception as e:
        # NOTE(review): a plain dict is returned here, so the failure is not
        # signalled through an explicit error status — kept for compatibility.
        return {"error": str(e)}
    finally:
        # The merged file lives outside temp_dir, so the uploads can be removed
        # right away on every path. ignore_errors keeps a cleanup failure from
        # replacing the real result or error.
        shutil.rmtree(temp_dir, ignore_errors=True)