# Source: ffmpeg-space / app.py (jcnok, "Update app.py", commit 708fe13, verified)
# -*- coding: utf-8 -*-
"""
FFmpeg API Service for n8n Video Automation.
This FastAPI application provides a single, highly-optimized endpoint
to orchestrate a full video production pipeline from raw assets.
Version: 9.0.0 (Ultimate - All-In-One Endpoint)
"""
# --- Core Imports ---
import asyncio
import json
import os
import shlex
import shutil
import uuid
from typing import Dict, List, Tuple

# --- Library Imports ---
import aiohttp
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi import Request
from fastapi.responses import FileResponse, JSONResponse
from starlette.background import BackgroundTask
# ==============================================================================
# 1. APPLICATION CONFIGURATION & SETUP
# ==============================================================================
# Root directory under which every per-request working file or folder is created.
TEMP_DIR = "/tmp/vid_automator"

# Headers attached to the HTTP sessions used for asset downloads, so that
# outgoing requests carry a desktop-browser User-Agent.
BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}

# The FastAPI application object that the route decorators below attach to.
app = FastAPI(
    version="9.0.0",
    title="Vid Automator API",
    description="A single, powerful endpoint to create complete videos from raw assets.",
)
# ==============================================================================
# 2. LIFECYCLE & HELPER FUNCTIONS
# ==============================================================================
@app.on_event("startup")
async def startup_event():
    """Create the shared temporary working directory when the app starts."""
    # exist_ok avoids a failure when the directory survives from an earlier run.
    os.makedirs(name=TEMP_DIR, exist_ok=True)
def cleanup_directory(directory_path: str):
    """Remove a working directory tree on a best-effort basis.

    Args:
        directory_path: Path of the directory to delete. A path that does not
            exist is skipped without logging.

    Deletion errors are ignored (``ignore_errors=True``) so cleanup never
    raises; the log line reports whether the path is actually gone.
    """
    if not os.path.exists(directory_path):
        return
    shutil.rmtree(directory_path, ignore_errors=True)
    # rmtree swallowed any error, so verify the outcome before claiming success.
    if os.path.exists(directory_path):
        print(f"--- CLEANUP: WARNING could not fully remove {directory_path} ---")
    else:
        print(f"--- CLEANUP: Successfully removed {directory_path} ---")
async def run_subprocess(command: str) -> Tuple[str, str]:
    """Run a shell command asynchronously and return its decoded output.

    Args:
        command: Full command line, executed through the system shell.
            Callers are responsible for quoting every interpolated value
            (e.g. with ``shlex.quote``) before passing the string in.

    Returns:
        A ``(stdout, stderr)`` tuple of decoded text. Bytes that cannot be
        decoded are dropped rather than raising.

    Raises:
        RuntimeError: If the command exits with a non-zero status; the message
            carries the command's stripped stderr output.
    """
    print(f"--- EXECUTING COMMAND ---\n{command}\n-------------------------")
    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    raw_stdout, raw_stderr = await process.communicate()
    # Decode once; both the error path and the return value use the same text.
    stdout_text = raw_stdout.decode(errors='ignore')
    stderr_text = raw_stderr.decode(errors='ignore')
    if process.returncode != 0:
        error_message = stderr_text.strip()
        print(f"ERROR: Command failed.\nStderr: {error_message}")
        raise RuntimeError(f"FFmpeg command failed: {error_message}")
    print("SUCCESS: Command executed.")
    return stdout_text, stderr_text
async def download_asset(session: aiohttp.ClientSession, url: str, path: str, asset_type: str):
    """Download ``url`` to ``path``, streaming the response body to disk.

    Args:
        session: Open aiohttp client session used to issue the GET request.
        url: Address of the asset to fetch.
        path: Local file path the body is written to (overwritten if present).
        asset_type: Human-readable label used only in log messages.

    Raises:
        Exception: Any error raised while requesting or writing the asset
            (including a non-2xx status via ``raise_for_status``) is logged
            and re-raised unchanged.
    """
    print(f"Downloading {asset_type} from {url}...")
    try:
        # Generous total timeout so that larger downloads can complete.
        timeout = aiohttp.ClientTimeout(total=120)
        async with session.get(url, timeout=timeout) as resp:
            resp.raise_for_status()
            with open(path, "wb") as f:
                # Write chunk by chunk instead of buffering the whole body in
                # memory; assets such as narration audio can be large.
                async for chunk in resp.content.iter_chunked(64 * 1024):
                    f.write(chunk)
        print(f"SUCCESS: Downloaded {asset_type}.")
    except Exception as e:
        print(f"ERROR downloading {asset_type} from {url}: {e}")
        raise
# ==============================================================================
# OPTIMIZED ENDPOINT: RECEIVES AN AUDIO URL AND RETURNS THE DURATION
# ==============================================================================
@app.post("/get-duration-from-url/")
async def get_duration_from_url(payload: Dict):
    """Return the duration of a remote audio file in whole seconds.

    The audio is downloaded to a temporary file, probed with ``ffprobe`` and
    the temporary file is removed again whether or not probing succeeded.
    Accepting a URL means the client does not have to upload binary data.

    Args:
        payload: JSON object containing the key ``"audio_url"``.
            Example: ``{"audio_url": "http://example.com/audio.mp3"}``

    Returns:
        JSONResponse: ``{"duration_seconds": <int>}`` with the duration
        rounded to the nearest integer. Example: ``{"duration_seconds": 150}``

    Raises:
        HTTPException: 422 if ``audio_url`` is missing, empty or not a string;
            400 if the download fails or ffprobe's output cannot be parsed;
            500 for any other failure (including a failing ffprobe process).
    """
    # STEP 1: validate the incoming payload.
    audio_url = payload.get("audio_url")
    if not audio_url or not isinstance(audio_url, str):
        raise HTTPException(
            status_code=422,  # 422 Unprocessable Entity: well-formed but invalid body
            detail="Payload must contain a non-empty 'audio_url' key."
        )

    # STEP 2: choose a unique temporary file for this request.
    unique_id = str(uuid.uuid4())
    temp_audio_path = os.path.join(TEMP_DIR, f"{unique_id}_audio_to_probe.mp3")

    try:
        # STEP 3: download the audio file asynchronously.
        async with aiohttp.ClientSession(headers=BROWSER_HEADERS) as session:
            await download_asset(session, audio_url, temp_audio_path, "audio for probing")

        # STEP 4: run ffprobe on the downloaded file.
        quoted_path = shlex.quote(temp_audio_path)
        command = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {quoted_path}"
        stdout, _ = await run_subprocess(command)

        duration_str = stdout.strip()
        if not duration_str:
            raise ValueError("FFprobe did not return a valid duration.")

        # Round to the nearest whole second.
        duration_rounded = round(float(duration_str))

        # STEP 5: return the JSON response.
        return JSONResponse(
            content={"duration_seconds": duration_rounded}
        )
    except (ValueError, IOError, aiohttp.ClientError, asyncio.TimeoutError) as e:
        # Download failures (aiohttp client errors, timeouts) and ffprobe
        # parsing problems are attributable to the supplied URL.
        raise HTTPException(status_code=400, detail=f"Error processing audio URL: {e}") from e
    except Exception as e:
        # Everything else, including a failing ffprobe subprocess.
        raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}") from e
    finally:
        # STEP 6: always remove the temporary file, even after an error.
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
            print(f"--- CLEANUP: Removed temporary probe file {temp_audio_path} ---")
# ==============================================================================
# 3. ALL-IN-ONE SUPER ENDPOINT
# ==============================================================================
@app.post("/generate-complete-video/")
async def generate_complete_video_robust(request: Request):
    """Build a complete video from a multipart/form-data request.

    Form fields read from the request:
        image_urls_json: JSON array of image URLs (required, non-empty).
        narration_url: URL of the narration audio (required).
        subtitle_file: Uploaded subtitle file, burned in with the ``ass``
            filter (required).
        music_files*: One or more uploaded background-music files; every
            part whose field name starts with ``music_files`` is used
            (required).
        music_volume: Volume factor applied to the music (default ``0.2``).
        output_filename: Name of the returned file (default
            ``final_video.mp4``); only its final path component is used.

    Pipeline: save/download assets, merge music, mix it under the narration,
    measure the narration, cross-fade the images into a video, mux the audio,
    then burn in the subtitles.

    Returns:
        FileResponse: The finished MP4, with an ``X-Video-Duration-Seconds``
        header. The working directory is removed after the response is sent.

    Raises:
        HTTPException: 422 for missing or malformed form fields; 500 for any
            other failure. The working directory is removed in both cases.
    """
    unique_id = str(uuid.uuid4())
    temp_processing_dir = os.path.join(TEMP_DIR, unique_id)
    os.makedirs(temp_processing_dir)

    try:
        # STEP 1: read and validate the multipart form.
        print("--- STEP 1/8: PROCESSING MULTIPART FORM DATA ---")
        form_data = await request.form()

        image_urls_json = form_data.get("image_urls_json")
        narration_url = form_data.get("narration_url")
        subtitle_file = form_data.get("subtitle_file")
        # multi_items() yields every (key, value) pair, so parts that repeat
        # the same "music_files" field name are all kept; items() would only
        # return one value per key.
        music_files = [
            value for key, value in form_data.multi_items()
            if key.startswith("music_files")
        ]

        if not all([image_urls_json, narration_url, subtitle_file, music_files]):
            raise HTTPException(status_code=422, detail="Missing one or more required fields: image_urls_json, narration_url, subtitle_file, music_files.")

        # A field sent as plain text arrives as str and has no file object.
        if not all(hasattr(upload, "file") for upload in [subtitle_file, *music_files]):
            raise HTTPException(status_code=422, detail="subtitle_file and music_files must be uploaded as files.")

        try:
            music_volume = float(form_data.get("music_volume", 0.2))
        except (TypeError, ValueError) as exc:
            raise HTTPException(status_code=422, detail="music_volume must be a number.") from exc

        try:
            image_urls = json.loads(image_urls_json)
        except (TypeError, ValueError) as exc:
            raise HTTPException(status_code=422, detail="image_urls_json must be valid JSON.") from exc
        if not isinstance(image_urls, list) or not image_urls:
            raise HTTPException(status_code=422, detail="image_urls_json must be a non-empty JSON array of URLs.")

        # Keep only the final path component: os.path.join would otherwise let
        # an absolute or "../" name place the output outside the working dir.
        requested_name = form_data.get("output_filename", "final_video.mp4")
        output_filename = os.path.basename(requested_name) if isinstance(requested_name, str) else ""
        if output_filename in ("", ".", ".."):
            output_filename = "final_video.mp4"

        # STEP 2: save uploaded files and download remote assets.
        print("--- STEP 2/8: ACQUIRING ALL ASSETS ---")
        narration_path = os.path.join(temp_processing_dir, "narration.mp3")
        subtitle_path = os.path.join(temp_processing_dir, "subtitle.ass")
        with open(subtitle_path, "wb") as buffer:
            shutil.copyfileobj(subtitle_file.file, buffer)

        music_paths = []
        for i, music_file in enumerate(music_files):
            path = os.path.join(temp_processing_dir, f"music_{i}.mp3")
            with open(path, "wb") as buffer:
                shutil.copyfileobj(music_file.file, buffer)
            music_paths.append(path)

        async with aiohttp.ClientSession(headers=BROWSER_HEADERS) as session:
            await download_asset(session, narration_url, narration_path, "narration audio")
            local_image_paths = []
            for i, url in enumerate(image_urls):
                local_path = os.path.join(temp_processing_dir, f"image_{i:02d}.jpg")
                await download_asset(session, url, local_path, f"image {i+1}")
                local_image_paths.append(local_path)
        print("SUCCESS: All assets acquired.")

        # From here on the pipeline only operates on local files.

        # STEP 3: concatenate the background music tracks.
        print("--- STEP 3/8: MERGING BACKGROUND MUSIC ---")
        merged_music_path = os.path.join(temp_processing_dir, "bg_music_merged.mp3")
        if len(music_paths) > 1:
            input_args_music = "".join([f"-i {shlex.quote(p)} " for p in music_paths])
            filter_inputs = "".join([f"[{i}:a:0]" for i in range(len(music_paths))])
            concat_filter = f"\"{filter_inputs}concat=n={len(music_paths)}:v=0:a=1[outa]\""
            cmd_merge_music = f"ffmpeg {input_args_music} -filter_complex {concat_filter} -map \"[outa]\" -y {shlex.quote(merged_music_path)}"
            await run_subprocess(cmd_merge_music)
        else:
            shutil.copy(music_paths[0], merged_music_path)
        print("SUCCESS: Background music prepared.")

        # STEP 4: mix narration with the (attenuated) music.
        print("--- STEP 4/8: MIXING FINAL AUDIO ---")
        final_audio_path = os.path.join(temp_processing_dir, "final_audio.mp3")
        cmd_mix = (f"ffmpeg -i {shlex.quote(narration_path)} -i {shlex.quote(merged_music_path)} "
                   f"-filter_complex \"[1:a]volume={music_volume}[bg];[0:a][bg]amix=inputs=2:duration=first\" "
                   f"-y {shlex.quote(final_audio_path)}")
        await run_subprocess(cmd_mix)
        print("SUCCESS: Final audio track mixed.")

        # STEP 5: measure the narration, which drives the video length.
        print("--- STEP 5/8: GETTING NARRATION DURATION ---")
        cmd_ffprobe = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {shlex.quote(narration_path)}"
        stdout, _ = await run_subprocess(cmd_ffprobe)
        total_duration = float(stdout.strip())
        print(f"SUCCESS: Narration duration is {total_duration} seconds.")

        # STEP 6: build the slideshow with cross-fade transitions.
        print("--- STEP 6/8: CREATING VIDEO WITH TRANSITIONS ---")
        transition_duration, num_images = 1.0, len(local_image_paths)
        if num_images == 1:
            # A single image has nothing to fade into: show it for the whole
            # narration (this also avoids dividing by num_images - 1 == 0).
            display_duration = total_duration
            input_duration = total_duration
        else:
            display_duration = (total_duration - transition_duration) / (num_images - 1)
            if display_duration <= 0:
                raise HTTPException(status_code=422, detail="Narration is too short for the requested image transitions.")
            input_duration = display_duration + transition_duration

        input_args = "".join([f"-loop 1 -t {input_duration} -i {shlex.quote(p)} " for p in local_image_paths])

        # Chain one xfade per image boundary; each stage consumes the previous
        # stage's output label and the next input's video stream.
        filter_complex_chain, last_stream = "", "0:v"
        for i in range(1, num_images):
            cumulative_offset = i * display_duration
            filter_complex_chain += f"[{last_stream}][{i}:v]xfade=transition=fade:duration={transition_duration}:offset={cumulative_offset}[v{i}];"
            last_stream = f"v{i}"

        video_with_transitions_path = os.path.join(temp_processing_dir, "video_with_transitions.mp4")
        final_filter_chain = f"{filter_complex_chain}[{last_stream}]format=yuv420p,fps=25[final_v]"
        cmd_video_transitions = f"ffmpeg {input_args} -filter_complex \"{final_filter_chain}\" -map \"[final_v]\" -an -y {shlex.quote(video_with_transitions_path)}"
        await run_subprocess(cmd_video_transitions)
        print("SUCCESS: Video with transitions created.")

        # STEP 7: mux the video with the final audio (-shortest trims the longer one).
        print("--- STEP 7/8: COMBINING VIDEO AND FINAL AUDIO ---")
        video_with_audio_path = os.path.join(temp_processing_dir, "video_with_audio.mp4")
        cmd_combine = (f"ffmpeg -i {shlex.quote(video_with_transitions_path)} -i {shlex.quote(final_audio_path)} -c:v copy -c:a aac -shortest -y {shlex.quote(video_with_audio_path)}")
        await run_subprocess(cmd_combine)
        print("SUCCESS: Video and final audio combined.")

        # STEP 8: burn the subtitles into the picture.
        print("--- STEP 8/8: BURNING SUBTITLES ---")
        final_video_path = os.path.join(temp_processing_dir, output_filename)
        escaped_subtitle_path = subtitle_path.replace("'", "'\\''")
        cmd_subtitles = (f"ffmpeg -i {shlex.quote(video_with_audio_path)} -vf \"ass='{escaped_subtitle_path}'\" -c:v libx264 -preset ultrafast -c:a copy -y {shlex.quote(final_video_path)}")
        await run_subprocess(cmd_subtitles)

        print("--- FINAL VIDEO CREATED SUCCESSFULLY ---")
        custom_headers = {"X-Video-Duration-Seconds": str(round(total_duration, 2))}
        # Defer cleanup until the response body has been sent.
        cleanup_task = BackgroundTask(cleanup_directory, directory_path=temp_processing_dir)
        return FileResponse(
            path=final_video_path, filename=output_filename, media_type="video/mp4",
            background=cleanup_task, headers=custom_headers
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. 422 validation) keep their status code.
        cleanup_directory(directory_path=temp_processing_dir)
        raise
    except Exception as e:
        cleanup_directory(directory_path=temp_processing_dir)
        raise HTTPException(status_code=500, detail=f"An internal server error occurred: {str(e)}") from e
# (You can remove the legacy endpoints such as /merge-audio-files/, /mix-audio-tracks/, etc.)