# -*- coding: utf-8 -*- """ FFmpeg API Service for n8n Video Automation. This FastAPI application provides a single, highly-optimized endpoint to orchestrate a full video production pipeline from raw assets. Version: 9.0.0 (Ultimate - All-In-One Endpoint) """ # --- Core Imports --- import asyncio import json import os import shutil import shlex import uuid from typing import Dict, List # --- Library Imports --- import aiohttp from fastapi import FastAPI, UploadFile, File, Form, HTTPException from fastapi.responses import FileResponse, JSONResponse from fastapi import Request from starlette.background import BackgroundTask # ============================================================================== # 1. APPLICATION CONFIGURATION & SETUP # ============================================================================== app = FastAPI( title="Vid Automator API", description="A single, powerful endpoint to create complete videos from raw assets.", version="9.0.0", ) TEMP_DIR = "/tmp/vid_automator" BROWSER_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } # ============================================================================== # 2. LIFECYCLE & HELPER FUNCTIONS # ============================================================================== @app.on_event("startup") async def startup_event(): os.makedirs(TEMP_DIR, exist_ok=True) def cleanup_directory(directory_path: str): if os.path.exists(directory_path): shutil.rmtree(directory_path, ignore_errors=True) print(f"--- CLEANUP: Successfully removed {directory_path} ---") async def run_subprocess(command: str) -> (str, str): print(f"--- EXECUTING COMMAND ---\n{command}\n-------------------------") process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: error_message = stderr.decode(errors='ignore').strip() print(f"ERROR: Command failed.\nStderr: {error_message}") raise RuntimeError(f"FFmpeg command failed: {error_message}") print("SUCCESS: Command executed.") return stdout.decode(errors='ignore'), stderr.decode(errors='ignore') async def download_asset(session: aiohttp.ClientSession, url: str, path: str, asset_type: str): print(f"Downloading {asset_type} from {url}...") try: async with session.get(url, timeout=120) as resp: # Aumentado timeout para downloads maiores resp.raise_for_status() with open(path, "wb") as f: f.write(await resp.read()) print(f"SUCCESS: Downloaded {asset_type}.") except Exception as e: print(f"ERROR downloading {asset_type} from {url}: {e}") raise # ============================================================================== # ENDPOINT OTIMIZADO: RECEBE UMA URL DE ÁUDIO E RETORNA A DURAÇÃO # ============================================================================== @app.post("/get-duration-from-url/") async def get_duration_from_url(payload: Dict): """ Accepts a JSON payload with an 'audio_url', downloads the audio file, calculates its duration using FFprobe, and returns the duration in rounded seconds. This endpoint is designed to be highly efficient for n8n workflows, avoiding the need for the client to handle binary file uploads. Args: payload (Dict): A JSON object containing the key "audio_url". Example: {"audio_url": "http://example.com/audio.mp3"} Returns: JSONResponse: A JSON object with the duration in rounded seconds. Example: {"duration_seconds": 150} """ # ETAPA 1: Validação do Payload de Entrada audio_url = payload.get("audio_url") if not audio_url: raise HTTPException( status_code=422, # 422 Unprocessable Entity é mais semântico aqui detail="Payload must contain a non-empty 'audio_url' key." ) # ETAPA 2: Setup do Ambiente Temporário unique_id = str(uuid.uuid4()) temp_audio_path = os.path.join(TEMP_DIR, f"{unique_id}_audio_to_probe.mp3") try: # ETAPA 3: Download Assíncrono do Arquivo de Áudio async with aiohttp.ClientSession(headers=BROWSER_HEADERS) as session: await download_asset(session, audio_url, temp_audio_path, "audio for probing") # ETAPA 4: Executar o FFprobe no Arquivo Baixado # O comando é o mesmo, apenas o caminho do arquivo mudou quoted_path = shlex.quote(temp_audio_path) command = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {quoted_path}" stdout, _ = await run_subprocess(command) duration_str = stdout.strip() if not duration_str: raise ValueError("FFprobe did not return a valid duration.") duration_float = float(duration_str) # Arredonda o valor para o número inteiro mais próximo duration_rounded = round(duration_float) # ETAPA 5: Retornar a Resposta JSON return JSONResponse( content={"duration_seconds": duration_rounded} ) except (ValueError, IOError) as e: # Captura erros de download ou de parsing do FFprobe raise HTTPException(status_code=400, detail=f"Error processing audio URL: {e}") except Exception as e: # Captura todos os outros erros, incluindo falhas no subprocesso raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}") finally: # ETAPA 6: Garantir a Limpeza do Arquivo Temporário # Este bloco é executado sempre, mesmo se ocorrerem erros if os.path.exists(temp_audio_path): os.remove(temp_audio_path) print(f"--- CLEANUP: Removed temporary probe file {temp_audio_path} ---") # ============================================================================== # 3. SUPER ENDPOINT "TUDO-EM-UM" # ============================================================================== @app.post("/generate-complete-video/") async def generate_complete_video_robust(request: Request): """ Orchestrates the entire video production pipeline by dynamically processing a multipart/form-data request. This is the most robust method for handling complex uploads from clients like n8n. """ unique_id = str(uuid.uuid4()) temp_processing_dir = os.path.join(TEMP_DIR, unique_id) os.makedirs(temp_processing_dir) try: # ETAPA 1: Processar o formulário multipart manualmente print("--- STEP 1/8: PROCESSING MULTIPART FORM DATA ---") form_data = await request.form() # Extrair campos de texto image_urls_json = form_data.get("image_urls_json") narration_url = form_data.get("narration_url") music_volume = float(form_data.get("music_volume", 0.2)) output_filename = form_data.get("output_filename", "final_video.mp4") # Extrair arquivos binários subtitle_file = form_data.get("subtitle_file") # Coleta todos os arquivos cujo nome de campo começa com "music_files" music_files = [file for key, file in form_data.items() if key.startswith("music_files")] # Validação robusta if not all([image_urls_json, narration_url, subtitle_file, music_files]): raise HTTPException(status_code=422, detail="Missing one or more required fields: image_urls_json, narration_url, subtitle_file, music_files.") # ETAPA 2: Salvar e baixar todos os ativos print("--- STEP 2/8: ACQUIRING ALL ASSETS ---") narration_path = os.path.join(temp_processing_dir, "narration.mp3") subtitle_path = os.path.join(temp_processing_dir, "subtitle.ass") with open(subtitle_path, "wb") as buffer: shutil.copyfileobj(subtitle_file.file, buffer) music_paths = [] for i, music_file in enumerate(music_files): path = os.path.join(temp_processing_dir, f"music_{i}.mp3") with open(path, "wb") as buffer: shutil.copyfileobj(music_file.file, buffer) music_paths.append(path) async with aiohttp.ClientSession(headers=BROWSER_HEADERS) as session: await download_asset(session, narration_url, narration_path, "narration audio") image_urls = json.loads(image_urls_json) local_image_paths = [] for i, url in enumerate(image_urls): local_path = os.path.join(temp_processing_dir, f"image_{i:02d}.jpg") await download_asset(session, url, local_path, f"image {i+1}") local_image_paths.append(local_path) print("SUCCESS: All assets acquired.") # O restante do pipeline (ETAPAS 3 a 8) permanece o mesmo, pois já opera em arquivos locais. # ... (código das etapas 3 a 8 sem alterações) # ETAPA 3: Unir músicas de fundo print("--- STEP 3/8: MERGING BACKGROUND MUSIC ---") merged_music_path = os.path.join(temp_processing_dir, "bg_music_merged.mp3") if len(music_paths) > 1: input_args_music = "".join([f"-i {shlex.quote(p)} " for p in music_paths]) filter_inputs = "".join([f"[{i}:a:0]" for i in range(len(music_paths))]) concat_filter = f"\"{filter_inputs}concat=n={len(music_paths)}:v=0:a=1[outa]\"" cmd_merge_music = f"ffmpeg {input_args_music} -filter_complex {concat_filter} -map \"[outa]\" -y {shlex.quote(merged_music_path)}" await run_subprocess(cmd_merge_music) else: shutil.copy(music_paths[0], merged_music_path) print("SUCCESS: Background music prepared.") # ETAPA 4: Mixar narração e música print("--- STEP 4/8: MIXING FINAL AUDIO ---") final_audio_path = os.path.join(temp_processing_dir, "final_audio.mp3") cmd_mix = (f"ffmpeg -i {shlex.quote(narration_path)} -i {shlex.quote(merged_music_path)} " f"-filter_complex \"[1:a]volume={music_volume}[bg];[0:a][bg]amix=inputs=2:duration=first\" " f"-y {shlex.quote(final_audio_path)}") await run_subprocess(cmd_mix) print("SUCCESS: Final audio track mixed.") # ETAPA 5: Obter duração da narração print("--- STEP 5/8: GETTING NARRATION DURATION ---") cmd_ffprobe = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {shlex.quote(narration_path)}" stdout, _ = await run_subprocess(cmd_ffprobe) total_duration = float(stdout.strip()) print(f"SUCCESS: Narration duration is {total_duration} seconds.") # ETAPA 6: Criar vídeo com transições print("--- STEP 6/8: CREATING VIDEO WITH TRANSITIONS ---") transition_duration, num_images = 1.0, len(local_image_paths) display_duration = (total_duration - transition_duration) / (num_images - 1) input_args = "".join([f"-loop 1 -t {display_duration + transition_duration} -i {shlex.quote(p)} " for p in local_image_paths]) filter_complex_chain, last_stream = "", "0:v" for i in range(num_images - 1): cumulative_offset = (i + 1) * display_duration filter_part = f"[{last_stream}][{i + 1}:v]xfade=transition=fade:duration={transition_duration}:offset={cumulative_offset}[v{i + 1}];" filter_complex_chain += filter_part last_stream = f"v{i + 1}" video_with_transitions_path = os.path.join(temp_processing_dir, "video_with_transitions.mp4") final_filter_chain = f"{filter_complex_chain}[{last_stream}]format=yuv420p,fps=25[final_v]" cmd_video_transitions = f"ffmpeg {input_args} -filter_complex \"{final_filter_chain}\" -map \"[final_v]\" -an -y {shlex.quote(video_with_transitions_path)}" await run_subprocess(cmd_video_transitions) print("SUCCESS: Video with transitions created.") # ETAPA 7: Unir vídeo ao áudio final print("--- STEP 7/8: COMBINING VIDEO AND FINAL AUDIO ---") video_with_audio_path = os.path.join(temp_processing_dir, "video_with_audio.mp4") cmd_combine = (f"ffmpeg -i {shlex.quote(video_with_transitions_path)} -i {shlex.quote(final_audio_path)} -c:v copy -c:a aac -shortest -y {shlex.quote(video_with_audio_path)}") await run_subprocess(cmd_combine) print("SUCCESS: Video and final audio combined.") # ETAPA 8: Queimar legendas print("--- STEP 8/8: BURNING SUBTITLES ---") final_video_path = os.path.join(temp_processing_dir, output_filename) escaped_subtitle_path = subtitle_path.replace("'", "'\\''") cmd_subtitles = (f"ffmpeg -i {shlex.quote(video_with_audio_path)} -vf \"ass='{escaped_subtitle_path}'\" -c:v libx264 -preset ultrafast -c:a copy -y {shlex.quote(final_video_path)}") await run_subprocess(cmd_subtitles) print("--- FINAL VIDEO CREATED SUCCESSFULLY ---") custom_headers = {"X-Video-Duration-Seconds": str(round(total_duration, 2))} cleanup_task = BackgroundTask(cleanup_directory, directory_path=temp_processing_dir) return FileResponse( path=final_video_path, filename=output_filename, media_type="video/mp4", background=cleanup_task, headers=custom_headers ) except Exception as e: cleanup_directory(directory_path=temp_processing_dir) raise HTTPException(status_code=500, detail=f"An internal server error occurred: {str(e)}") # (Você pode remover os endpoints antigos como /merge-audio-files/, /mix-audio-tracks/, etc.)