| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import os |
| import subprocess |
| import logging |
| import random |
| import time |
| import shutil |
| from typing import List, Optional, Tuple |
|
|
| import imageio |
| import numpy as np |
| import torch |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class VideoToolError(Exception): |
| """Exceção personalizada para erros originados do VideoEncodeTool.""" |
| pass |
| |
|
|
| class VideoEncodeTool: |
| def __init__(self, frame_log_every=8): |
| self.frame_log_every = frame_log_every |
| |
| """ |
| Um especialista para lidar com tarefas de codificação e manipulação de vídeo. |
| """ |
|
|
|
|
| @torch.no_grad() |
| def save_video_from_tensor(self, pixel_5d: torch.Tensor, path: str, fps: int = 24, progress_callback=None): |
| """ |
| Espera pixel_5d em [0,1], shape (B,C,T,H,W). |
| Escreve MP4 incremental, convertendo cada frame para (H,W,C) uint8. |
| """ |
| |
| device = "cuda" if pixel_5d.is_cuda else "cpu" |
| B, C, T, H, W = pixel_5d.shape |
| if B != 1: |
| |
| raise ValueError(f"Esperado B=1, recebido B={B}") |
|
|
| |
| with imageio.get_writer(path, fps=int(fps), codec="libx264", quality=8) as writer: |
| for i in range(T): |
| frame_chw = pixel_5d[0, :, i] |
| frame_hwc_u8 = (frame_chw.permute(1, 2, 0) |
| .clamp(0, 1) |
| .mul(255) |
| .to(torch.uint8) |
| .cpu() |
| .numpy()) |
| writer.append_data(frame_hwc_u8) |
| if progress_callback: |
| progress_callback(i + 1, T) |
| if i % self.frame_log_every == 0: |
| print(f"[DEBUG] [Encoder] frame {i}/{T} gravado ({H}x{W}@{fps}fps)") |
| |
|
|
| def extract_first_frame(self, video_path: str, output_image_path: str) -> str: |
| """ |
| Extrai o primeiro frame de um arquivo de vídeo e o salva como uma imagem. |
| """ |
| logger.info(f"Extraindo primeiro frame de '{os.path.basename(video_path)}'...") |
| cmd = ['ffmpeg', '-y', '-v', 'error', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] |
| try: |
| subprocess.run(cmd, check=True, capture_output=True, text=True) |
| return output_image_path |
| except subprocess.CalledProcessError as e: |
| logger.error(f"FFmpeg (extract_first_frame) falhou: {e.stderr}") |
| raise VideoToolError(f"Falha ao extrair o primeiro frame de {video_path}") |
|
|
| def extract_last_frame(self, video_path: str, output_image_path: str) -> str: |
| """ |
| Extrai o último frame de um arquivo de vídeo e o salva como uma imagem. |
| """ |
| logger.info(f"Extraindo último frame de '{os.path.basename(video_path)}'...") |
| cmd = ['ffmpeg', '-y', '-v', 'error', '-sseof', '-0.1', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] |
| try: |
| subprocess.run(cmd, check=True, capture_output=True, text=True) |
| return output_image_path |
| except subprocess.CalledProcessError as e: |
| logger.error(f"FFmpeg (extract_last_frame) falhou: {e.stderr}") |
| raise VideoToolError(f"Falha ao extrair o último frame de {video_path}") |
|
|
| def create_transition_bridge(self, start_image_path: str, end_image_path: str, |
| duration: float, fps: int, target_resolution: Tuple[int, int], |
| workspace_dir: str, effect: Optional[str] = None) -> str: |
| """ |
| Cria um clipe de vídeo curto que transiciona entre duas imagens estáticas. |
| """ |
| output_path = os.path.join(workspace_dir, f"bridge_{int(time.time())}_{random.randint(100, 999)}.mp4") |
| width, height = target_resolution |
| fade_effects = ["fade", "wipeleft", "wiperight", "wipeup", "wipedown", "dissolve", "fadeblack", "fadewhite", "radial", "rectcrop", "circleopen", "circleclose", "horzopen", "horzclose"] |
| selected_effect = effect if effect and effect.strip() else random.choice(fade_effects) |
| transition_duration = max(0.1, duration) |
| cmd = (f"ffmpeg -y -v error -loop 1 -t {transition_duration} -i \"{start_image_path}\" -loop 1 -t {transition_duration} -i \"{end_image_path}\" " |
| f"-filter_complex \"[0:v]scale={width}:{height},setsar=1[v0];[1:v]scale={width}:{height},setsar=1[v1];" |
| f"[v0][v1]xfade=transition={selected_effect}:duration={transition_duration}:offset=0[out]\" " |
| f"-map \"[out]\" -c:v libx264 -r {fps} -pix_fmt yuv420p \"{output_path}\"") |
| logger.info(f"Criando ponte de transição com efeito '{selected_effect}'...") |
| try: |
| subprocess.run(cmd, shell=True, check=True, text=True) |
| except subprocess.CalledProcessError as e: |
| raise VideoToolError(f"Falha ao criar vídeo de transição: {e.stderr}") |
| return output_path |
|
|
| |
|
|
|
|
|
|
|
|
| def concatenate_videos(self, video_paths: List[str], output_path: str, workspace_dir: str, start:int= 0, overlap:int=3) -> str: |
| """ |
| Concatena múltiplos vídeos MP4, removendo exatamente o último frame |
| de cada vídeo (exceto o último), salvando os cortes e recriando a lista a cada execução. |
| """ |
| if not video_paths: |
| raise VideoToolError("Nenhum fragmento de vídeo fornecido para concatenação.") |
|
|
| |
| if len(video_paths) == 1: |
| shutil.copy(video_paths[0], output_path) |
| print(f"[Concat] Apenas um clipe fornecido. Copiado para '{output_path}'.") |
| return output_path |
|
|
| |
| trimmed_dir = os.path.join(workspace_dir, "trimmed_parts") |
| os.makedirs(trimmed_dir, exist_ok=True) |
|
|
| |
| for old in os.listdir(trimmed_dir): |
| try: |
| os.remove(os.path.join(trimmed_dir, old)) |
| except Exception: |
| pass |
|
|
| processed_videos = [] |
|
|
| try: |
| for i, base in enumerate(video_paths): |
| abs_base = os.path.abspath(base) |
| base_name = os.path.basename(abs_base) |
| video_podado = os.path.join(trimmed_dir, f"cut_{i}_{base_name}") |
|
|
| |
| probe_cmd = [ |
| "ffprobe", "-v", "error", |
| "-select_streams", "v:0", |
| "-count_frames", |
| "-show_entries", "stream=nb_read_frames", |
| "-of", "default=nokey=1:noprint_wrappers=1", |
| abs_base |
| ] |
| result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) |
| total_frames = int(result.stdout.strip()) |
| print(f"[Trim] {base_name} → total_frames={total_frames}") |
|
|
| |
| start_frame = start |
| end_frame = total_frames if i == len(video_paths) - 1 else total_frames - overlap |
|
|
| if i < len(video_paths) - 1: |
| |
| cmd_fim = ( |
| f'ffmpeg -y -hide_banner -loglevel error -i "{abs_base}" ' |
| f'-vf "trim=start_frame={start_frame}:end_frame={end_frame},setpts=PTS-STARTPTS" ' |
| f'-an "{video_podado}"' |
| ) |
| print(f"[CmdTrim] {cmd_fim}") |
| subprocess.run(cmd_fim, shell=True, check=True) |
| print(f"[TrimOK] {base_name}: corte {end_frame}/{total_frames} frames → {os.path.basename(video_podado)}") |
| processed_videos.append(video_podado) |
| else: |
| processed_videos.append(abs_base) |
| print(f"[Keep] Último vídeo sem corte: {base_name}") |
|
|
| |
| list_file_path = os.path.join(workspace_dir, "concat_list.txt") |
| if os.path.exists(list_file_path): |
| os.remove(list_file_path) |
| with open(list_file_path, "w", encoding="utf-8") as f: |
| for p in processed_videos: |
| f.write(f"file '{os.path.abspath(p)}'\n") |
|
|
| |
| cmd_concat = ( |
| f'ffmpeg -y -hide_banner -loglevel error -f concat -safe 0 ' |
| f'-i "{list_file_path}" -c copy "{output_path}"' |
| ) |
| print(f"[Concat] Executando concatenação final:\n{cmd_concat}") |
| subprocess.run(cmd_concat, shell=True, check=True) |
| print("[ConcatOK] Concatenação concluída com sucesso.") |
| return output_path |
|
|
| except subprocess.CalledProcessError as e: |
| logger.error(f"[ConcatERR] Erro FFmpeg: {e}") |
| raise VideoToolError("Falha durante concatenação de vídeos.") |
|
|
| def concatenate_videos2(self, video_paths: List[str], output_path: str, workspace_dir: str) -> str: |
| """ |
| Concatena múltiplos clipes de vídeo em um único arquivo sem re-codificar. |
| """ |
| if not video_paths: |
| raise VideoToolError("Nenhum fragmento de vídeo fornecido para concatenação.") |
| |
| if len(video_paths) == 1: |
| shutil.copy(video_paths[0], output_path) |
| logger.info(f"Apenas um clipe fornecido. Copiado para '{output_path}'.") |
| return output_path |
| |
| list_file_path = os.path.join(workspace_dir, f"concat_list_{int(time.time())}.txt") |
| try: |
| with open(list_file_path, 'w', encoding='utf-8') as f: |
| for path in video_paths: |
| |
| f.write(f"file '{os.path.abspath(path)}'\n") |
| |
| cmd_list = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file_path, '-c', 'copy', output_path] |
| logger.info(f"Concatenando {len(video_paths)} clipes para '{os.path.basename(output_path)}'...") |
| subprocess.run(cmd_list, check=True, capture_output=True, text=True) |
| logger.info("Concatenação FFmpeg bem-sucedida.") |
| return output_path |
| except subprocess.CalledProcessError as e: |
| logger.error(f"Falha ao montar o vídeo final com FFmpeg: {e.stderr}") |
| raise VideoToolError(f"Falha ao montar o vídeo final com FFmpeg.") |
| finally: |
| if os.path.exists(list_file_path): |
| os.remove(list_file_path) |
|
|
| |
| video_encode_tool_singleton = VideoEncodeTool() |