File size: 5,450 Bytes
a7a9f00
2bad8b6
a3c7146
 
 
a7a9f00
c467f81
d17416c
 
 
 
 
 
 
c467f81
 
 
 
 
 
2bad8b6
b95ac21
2bad8b6
 
 
 
 
 
 
 
b95ac21
d17416c
 
 
c467f81
d17416c
 
 
2bad8b6
c467f81
 
2bad8b6
 
 
 
 
 
 
 
 
 
 
c467f81
 
 
 
 
 
b95ac21
c467f81
2bad8b6
c467f81
2bad8b6
c467f81
2bad8b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c467f81
2bad8b6
 
 
b95ac21
c467f81
2bad8b6
d17416c
 
b95ac21
c467f81
2bad8b6
c467f81
 
 
2bad8b6
c467f81
b95ac21
c467f81
 
 
 
a3c7146
 
 
 
 
 
 
 
 
 
 
 
a7a9f00
c467f81
 
a7a9f00
a3c7146
c467f81
b95ac21
c467f81
2bad8b6
 
 
 
 
b95ac21
a7a9f00
2bad8b6
b95ac21
d17416c
 
 
b95ac21
c467f81
2bad8b6
b95ac21
2bad8b6
 
 
c467f81
2bad8b6
c467f81
b95ac21
a3c7146
c467f81
 
2bad8b6
c467f81
 
 
 
 
 
 
b95ac21
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import FileResponse
import tempfile
import shutil
import os
import logging
import traceback
from moviepy.editor import (
    VideoFileClip,
    concatenate_videoclips,
    AudioFileClip,
    CompositeAudioClip,
    concatenate_audioclips,
)
import numpy as np

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Safe audio clip loader
def safe_audio_clip(path):
    try:
        clip = AudioFileClip(path)
        if clip.reader is None or clip.duration is None or clip.duration <= 0:
            clip.close()
            raise ValueError(f"Invalid or empty audio file: {os.path.basename(path)}")
        return clip
    except Exception as e:
        raise ValueError(f"Failed to load audio file '{os.path.basename(path)}': {str(e)}")

def merge_videos_and_audios(
    video_files, audio_files, orig_vol=1.0, music_vol=0.5, temp_dir=None
):
    try:
        output_path = os.path.join(
            temp_dir, "merged_output.mp4" if video_files else "merged_output.mp3"
        )

        # If no videos, just concatenate audios
        if not video_files and audio_files:
            audio_clips = []
            for a in audio_files:
                try:
                    audio_clip = safe_audio_clip(a)
                    audio_clips.append(audio_clip)
                except Exception as e:
                    logger.warning(f"Skipping invalid audio file: {a}{str(e)}")

            if not audio_clips:
                raise ValueError("No valid audio files to merge.")

            final_audio = concatenate_audioclips(audio_clips)
            final_audio.write_audiofile(output_path)
            for clip in audio_clips:
                clip.close()
            final_audio.close()
            return output_path

        # If videos are present:
        video_clips = [VideoFileClip(v) for v in video_files]
        final_video = concatenate_videoclips(video_clips, method="compose")

        if audio_files:
            audio_clips = []
            for a in audio_files:
                try:
                    audio_clip = safe_audio_clip(a)
                    audio_clips.append(audio_clip)
                except Exception as e:
                    logger.warning(f"Skipping invalid audio file: {a}{str(e)}")

            if audio_clips:
                final_audio = concatenate_audioclips(audio_clips).volumex(music_vol)

                original_audio = (
                    final_video.audio.volumex(orig_vol)
                    if final_video.audio is not None
                    else None
                )

                if original_audio:
                    composite_audio = CompositeAudioClip([original_audio, final_audio])
                else:
                    composite_audio = final_audio

                final_video = final_video.set_audio(composite_audio)

                for clip in audio_clips:
                    clip.close()
            else:
                logger.warning("No valid audio files found. Using only video audio (if present).")
                if final_video.audio is not None:
                    final_video = final_video.volumex(orig_vol)

        else:
            # No audio files provided
            if final_video.audio is not None:
                final_video = final_video.volumex(orig_vol)

        final_video.write_videofile(output_path, codec="libx264", audio_codec="aac")

        for clip in video_clips:
            clip.close()
        final_video.close()

        return output_path

    except Exception as e:
        error_msg = f"Error during merge: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return error_msg

app = FastAPI()

@app.post("/merge")
async def merge_endpoint(
    files: list[UploadFile] = File(...),
    orig_vol: float = Form(1.0),
    music_vol: float = Form(0.5),
):
    temp_dir = tempfile.mkdtemp()
    try:
        saved_files = []
        for uploaded_file in files:
            file_path = os.path.join(temp_dir, uploaded_file.filename)
            with open(file_path, "wb") as out_file:
                content = await uploaded_file.read()
                out_file.write(content)
            saved_files.append(file_path)

        video_files = [f for f in saved_files if f.lower().endswith(".mp4")]
        audio_files = [
            f
            for f in saved_files
            if f.lower().endswith((".mp3", ".wav", ".aac", ".m4a", ".ogg"))
        ]

        if len(saved_files) < 2:
            return {"error": "Please upload at least 2 files (videos or audios)."}

        result_path = merge_videos_and_audios(
            video_files, audio_files, orig_vol, music_vol, temp_dir
        )

        if isinstance(result_path, str) and result_path.startswith("Error"):
            return {"error": result_path}

        media_type = (
            "video/mp4" if result_path.lower().endswith(".mp4") else "audio/mpeg"
        )
        filename = os.path.basename(result_path)

        return FileResponse(result_path, media_type=media_type, filename=filename)

    finally:
        shutil.rmtree(temp_dir)

# Optional: log space public URL
def log_api_url():
    url = os.getenv("SPACE_PUBLIC_URL")
    if url:
        logger.info(f"API is available at: {url}/merge")
    else:
        logger.info("SPACE_PUBLIC_URL environment variable not found")

log_api_url()