Spaces:
Sleeping
Sleeping
| # app/utils.py | |
| import asyncio | |
| import aiohttp | |
| import aiofiles | |
| import os | |
| import shutil | |
| import json | |
| import re | |
| from datetime import timedelta | |
| from typing import List, Dict | |
| async def run_subprocess(command: str) -> str: | |
| """ | |
| Executa um comando de shell de forma assíncrona e retorna sua saída. | |
| Levanta uma exceção se o comando falhar. | |
| """ | |
| process = await asyncio.create_subprocess_shell( | |
| command, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE | |
| ) | |
| stdout, stderr = await process.communicate() | |
| if process.returncode != 0: | |
| error_message = f"Comando falhou com código {process.returncode}:\n{command}\nSTDERR: {stderr.decode()}" | |
| print(error_message) | |
| raise RuntimeError(error_message) | |
| print(f"Comando executado com sucesso: {command}") | |
| return stdout.decode().strip() | |
| async def download_file(url: str, destination_path: str): | |
| """ | |
| Baixa um arquivo de uma URL para um caminho de destino. | |
| """ | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url) as response: | |
| response.raise_for_status() | |
| async with aiofiles.open(destination_path, 'wb') as f: | |
| while True: | |
| chunk = await response.content.read(8192) | |
| if not chunk: | |
| break | |
| await f.write(chunk) | |
| print(f"Download completo: {url} para {destination_path}") | |
| async def get_audio_duration(audio_path: str) -> float: | |
| """ | |
| Usa ffprobe para obter a duração de um arquivo de áudio em segundos. | |
| """ | |
| import shlex # Import shlex here for safety with paths | |
| command = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {shlex.quote(audio_path)}" | |
| output = await run_subprocess(command) | |
| try: | |
| return float(output) | |
| except ValueError: | |
| raise ValueError(f"Não foi possível obter a duração do áudio em {audio_path}. Saída do ffprobe: {output}") | |
| def clean_up_temp_dir(path: str): | |
| """ | |
| Remove recursivamente um diretório. Usado para limpeza. | |
| """ | |
| if os.path.exists(path): | |
| shutil.rmtree(path) | |
| print(f"Diretório temporário limpo: {path}") | |
| # Import shlex aqui para ser usado no escape_ffmpeg_path também | |
| import shlex | |
| def escape_ffmpeg_path(path: str) -> str: | |
| """ | |
| Escapa caminhos de arquivo locais para serem usados em comandos FFmpeg no shell. | |
| Usa shlex.quote para máxima segurança. | |
| NÃO DEVE SER USADO PARA ESCAPAR URLs. | |
| """ | |
| return shlex.quote(path) | |
| # Helper function para formatar segundos para o padrão de tempo do ASS (H:MM:SS.cs) | |
| def format_seconds_to_ass(seconds: float) -> str: | |
| """ | |
| Formata segundos para o padrão de tempo do ASS (H:MM:SS.cs). | |
| Ex: 123.45 -> 0:02:03.45 | |
| """ | |
| td = timedelta(seconds=seconds) | |
| total_milliseconds = int(td.total_seconds() * 1000) | |
| hours = total_milliseconds // 3_600_000 | |
| total_milliseconds %= 3_3600_000 # Erro no original, corrigido para 3_600_000 | |
| minutes = total_milliseconds // 60_000 | |
| total_milliseconds %= 60_000 | |
| seconds_part = total_milliseconds // 1_000 | |
| centiseconds = (total_milliseconds % 1_000) // 10 | |
| return f"{hours}:{minutes:02}:{seconds_part:02}.{centiseconds:02}" | |
| # Função para ajustar o arquivo ASS, movida para utils.py | |
| async def adjust_ass_last_line(ass_path: str, target_duration: float): | |
| """ | |
| Ajusta o 'End Time' da última linha de diálogo em um arquivo .ass | |
| para corresponder à duração total desejada. | |
| """ | |
| async with aiofiles.open(ass_path, 'r', encoding='utf-8') as f: | |
| lines = await f.readlines() | |
| dialogue_lines = [] | |
| header_lines = [] | |
| in_events_section = False | |
| for line in lines: | |
| if line.strip().lower() == '[events]': | |
| in_events_section = True | |
| header_lines.append(line) | |
| elif in_events_section and line.strip().startswith('Format:'): | |
| header_lines.append(line) | |
| elif in_events_section and line.strip().startswith('Dialogue:'): | |
| dialogue_lines.append(line) | |
| else: | |
| header_lines.append(line) | |
| if not dialogue_lines: | |
| print("Aviso: Nenhuma linha de diálogo encontrada no arquivo ASS para ajustar.") | |
| return | |
| last_dialogue_line = dialogue_lines[-1] | |
| # Regex para encontrar os campos Start e End de uma linha de diálogo ASS | |
| # As aspas nos grupos são para ser mais flexível se houver espaços antes/depois das vírgulas | |
| ass_pattern = re.compile(r'^(Dialogue:.*?,\s*)(\d+:\d{2}:\d{2}\.\d{2})(\s*,\s*)(\d+:\d{2}:\d{2}\.\d{2})(.*)$') | |
| match = ass_pattern.match(last_dialogue_line) | |
| if match: | |
| parts_before_start = match.group(1) | |
| start_time_str = match.group(2) | |
| separator = match.group(3) | |
| parts_after_end = match.group(5) | |
| new_end_time_str = format_seconds_to_ass(target_duration) | |
| adjusted_last_line = f"{parts_before_start}{start_time_str}{separator}{new_end_time_str}{parts_after_end.strip()}\n" # Garante newline | |
| dialogue_lines[-1] = adjusted_last_line | |
| print(f"Última linha ASS ajustada para terminar em {new_end_time_str}") | |
| else: | |
| print(f"Aviso: Não foi possível parsear a última linha de diálogo ASS: {last_dialogue_line.strip()}") | |
| # Reconstrói o arquivo completo | |
| adjusted_ass_content = "".join(header_lines + dialogue_lines) | |
| async with aiofiles.open(ass_path, 'w', encoding='utf-8') as f: | |
| await f.write(adjusted_ass_content) | |
| print(f"Arquivo ASS {ass_path} ajustado com sucesso para duração {target_duration:.2f}.") |