| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
|
|
| import os |
| import subprocess |
| from services.file_management import download_file |
|
|
| STORAGE_PATH = "/tmp/" |
|
|
| def get_duration(file_path): |
| cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path] |
| result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) |
| return float(result.stdout) |
|
|
| def process_audio_mixing(video_url, audio_url, video_vol, audio_vol, output_length, job_id, webhook_url=None): |
| video_path = download_file(video_url, STORAGE_PATH) |
| audio_path = download_file(audio_url, STORAGE_PATH) |
| output_path = os.path.join(STORAGE_PATH, f"{job_id}.mp4") |
|
|
| video_duration = get_duration(video_path) |
| audio_duration = get_duration(audio_path) |
|
|
| |
| output_duration = video_duration if output_length == 'video' else audio_duration |
|
|
| |
| cmd = ['ffmpeg', '-y'] |
|
|
| |
| cmd.extend(['-i', video_path]) |
|
|
| |
| cmd.extend(['-i', audio_path]) |
|
|
| |
| if output_length == 'audio' and audio_duration > video_duration: |
| cmd.extend(['-stream_loop', '-1']) |
|
|
| |
| audio_filter = f'[1:a]volume={audio_vol/100}' |
| if output_length == 'video': |
| audio_filter += f',atrim=duration={video_duration}' |
| audio_filter += '[a]' |
| cmd.extend(['-filter_complex', audio_filter]) |
|
|
| |
| cmd.extend(['-map', '0:v']) |
| cmd.extend(['-map', '[a]']) |
|
|
| if output_length == 'audio' and audio_duration > video_duration: |
| cmd.extend(['-c:v', 'libx264']) |
| else: |
| cmd.extend(['-c:v', 'copy']) |
|
|
| cmd.extend(['-c:a', 'aac']) |
| |
| |
| cmd.extend(['-t', str(output_duration)]) |
|
|
| cmd.append(output_path) |
|
|
| |
| subprocess.run(cmd, check=True) |
|
|
| |
| os.remove(video_path) |
| os.remove(audio_path) |
|
|
| return output_path |