ffmpegton8n / app /utils.py
jcnok's picture
Update app/utils.py
5bc4f60 verified
# app/utils.py
import asyncio
import aiohttp
import aiofiles
import os
import shutil
import json
import re
from datetime import timedelta
from typing import List, Dict
async def run_subprocess(command: str) -> str:
"""
Executa um comando de shell de forma assíncrona e retorna sua saída.
Levanta uma exceção se o comando falhar.
"""
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_message = f"Comando falhou com código {process.returncode}:\n{command}\nSTDERR: {stderr.decode()}"
print(error_message)
raise RuntimeError(error_message)
print(f"Comando executado com sucesso: {command}")
return stdout.decode().strip()
async def download_file(url: str, destination_path: str):
"""
Baixa um arquivo de uma URL para um caminho de destino.
"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
async with aiofiles.open(destination_path, 'wb') as f:
while True:
chunk = await response.content.read(8192)
if not chunk:
break
await f.write(chunk)
print(f"Download completo: {url} para {destination_path}")
async def get_audio_duration(audio_path: str) -> float:
"""
Usa ffprobe para obter a duração de um arquivo de áudio em segundos.
"""
import shlex # Import shlex here for safety with paths
command = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {shlex.quote(audio_path)}"
output = await run_subprocess(command)
try:
return float(output)
except ValueError:
raise ValueError(f"Não foi possível obter a duração do áudio em {audio_path}. Saída do ffprobe: {output}")
def clean_up_temp_dir(path: str):
"""
Remove recursivamente um diretório. Usado para limpeza.
"""
if os.path.exists(path):
shutil.rmtree(path)
print(f"Diretório temporário limpo: {path}")
# Import shlex aqui para ser usado no escape_ffmpeg_path também
import shlex
def escape_ffmpeg_path(path: str) -> str:
"""
Escapa caminhos de arquivo locais para serem usados em comandos FFmpeg no shell.
Usa shlex.quote para máxima segurança.
NÃO DEVE SER USADO PARA ESCAPAR URLs.
"""
return shlex.quote(path)
# Helper function para formatar segundos para o padrão de tempo do ASS (H:MM:SS.cs)
def format_seconds_to_ass(seconds: float) -> str:
"""
Formata segundos para o padrão de tempo do ASS (H:MM:SS.cs).
Ex: 123.45 -> 0:02:03.45
"""
td = timedelta(seconds=seconds)
total_milliseconds = int(td.total_seconds() * 1000)
hours = total_milliseconds // 3_600_000
total_milliseconds %= 3_3600_000 # Erro no original, corrigido para 3_600_000
minutes = total_milliseconds // 60_000
total_milliseconds %= 60_000
seconds_part = total_milliseconds // 1_000
centiseconds = (total_milliseconds % 1_000) // 10
return f"{hours}:{minutes:02}:{seconds_part:02}.{centiseconds:02}"
# Função para ajustar o arquivo ASS, movida para utils.py
async def adjust_ass_last_line(ass_path: str, target_duration: float):
"""
Ajusta o 'End Time' da última linha de diálogo em um arquivo .ass
para corresponder à duração total desejada.
"""
async with aiofiles.open(ass_path, 'r', encoding='utf-8') as f:
lines = await f.readlines()
dialogue_lines = []
header_lines = []
in_events_section = False
for line in lines:
if line.strip().lower() == '[events]':
in_events_section = True
header_lines.append(line)
elif in_events_section and line.strip().startswith('Format:'):
header_lines.append(line)
elif in_events_section and line.strip().startswith('Dialogue:'):
dialogue_lines.append(line)
else:
header_lines.append(line)
if not dialogue_lines:
print("Aviso: Nenhuma linha de diálogo encontrada no arquivo ASS para ajustar.")
return
last_dialogue_line = dialogue_lines[-1]
# Regex para encontrar os campos Start e End de uma linha de diálogo ASS
# As aspas nos grupos são para ser mais flexível se houver espaços antes/depois das vírgulas
ass_pattern = re.compile(r'^(Dialogue:.*?,\s*)(\d+:\d{2}:\d{2}\.\d{2})(\s*,\s*)(\d+:\d{2}:\d{2}\.\d{2})(.*)$')
match = ass_pattern.match(last_dialogue_line)
if match:
parts_before_start = match.group(1)
start_time_str = match.group(2)
separator = match.group(3)
parts_after_end = match.group(5)
new_end_time_str = format_seconds_to_ass(target_duration)
adjusted_last_line = f"{parts_before_start}{start_time_str}{separator}{new_end_time_str}{parts_after_end.strip()}\n" # Garante newline
dialogue_lines[-1] = adjusted_last_line
print(f"Última linha ASS ajustada para terminar em {new_end_time_str}")
else:
print(f"Aviso: Não foi possível parsear a última linha de diálogo ASS: {last_dialogue_line.strip()}")
# Reconstrói o arquivo completo
adjusted_ass_content = "".join(header_lines + dialogue_lines)
async with aiofiles.open(ass_path, 'w', encoding='utf-8') as f:
await f.write(adjusted_ass_content)
print(f"Arquivo ASS {ass_path} ajustado com sucesso para duração {target_duration:.2f}.")