# ltx_server_clean_refactor.py — VideoService (Modular Version with Simple Overlap Chunking) # ============================================================================== # 0. CONFIGURAÇÃO DE AMBIENTE E IMPORTAÇÕES # ============================================================================== import os import sys import gc import yaml import time import json import random import shutil import warnings import tempfile import traceback import subprocess from pathlib import Path from typing import List, Dict, Optional, Tuple, Union import cv2 # --- GATILHO DE OTIMIZAÇÃO DE MEMÓRIA --- # Se "1", "true", ou "yes" (ignorando maiúsculas/minúsculas), ativa o descarregamento de modelos. # Por padrão, fica ativado para segurança em ambientes com VRAM limitada. ENABLE_MEMORY_OPTIMIZATION = os.getenv("ADUC_MEMORY_OPTIMIZATION", "1").lower() in ["1", "true", "yes"] # --- Configurações de Logging e Avisos --- warnings.filterwarnings("ignore", category=UserWarning) warnings.filterwarnings("ignore", category=FutureWarning) from huggingface_hub import logging as hf_logging hf_logging.set_verbosity_error() # --- Importações de Bibliotecas de ML/Processamento --- import torch import torch.nn.functional as F import numpy as np from PIL import Image from einops import rearrange from huggingface_hub import hf_hub_download from safetensors import safe_open from managers.vae_manager import vae_manager_singleton from tools.video_encode_tool import video_encode_tool_singleton # --- Constantes Globais --- LTXV_DEBUG = True # Mude para False para desativar logs de debug LTXV_FRAME_LOG_EVERY = 8 DEPS_DIR = Path("/data") LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video" RESULTS_DIR = Path("/app/output") DEFAULT_FPS = 24.0 # ============================================================================== # 1. SETUP E FUNÇÕES AUXILIARES DE AMBIENTE # ============================================================================== def _run_setup_script(): """Executa o script setup.py se o repositório LTX-Video não existir.""" setup_script_path = "setup.py" if not os.path.exists(setup_script_path): print("[DEBUG] 'setup.py' não encontrado. Pulando clonagem de dependências.") return print(f"[DEBUG] Repositório não encontrado em {LTX_VIDEO_REPO_DIR}. Executando setup.py...") try: subprocess.run([sys.executable, setup_script_path], check=True, capture_output=True, text=True) print("[DEBUG] Script 'setup.py' concluído com sucesso.") except subprocess.CalledProcessError as e: print(f"[ERROR] Falha ao executar 'setup.py' (código {e.returncode}).\nOutput:\n{e.stdout}\n{e.stderr}") sys.exit(1) def add_deps_to_path(repo_path: Path): """Adiciona o diretório do repositório ao sys.path para importações locais.""" resolved_path = str(repo_path.resolve()) if resolved_path not in sys.path: sys.path.insert(0, resolved_path) if LTXV_DEBUG: print(f"[DEBUG] Adicionado ao sys.path: {resolved_path}") # --- Execução da configuração inicial --- if not LTX_VIDEO_REPO_DIR.exists(): _run_setup_script() add_deps_to_path(LTX_VIDEO_REPO_DIR) # --- Importações Dependentes do Path Adicionado --- from ltx_video.models.autoencoders.vae_encode import un_normalize_latents, normalize_latents from ltx_video.pipelines.pipeline_ltx_video import adain_filter_latent from ltx_video.models.autoencoders.latent_upsampler import LatentUpsampler from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier from ltx_video.models.transformers.transformer3d import Transformer3DModel from ltx_video.schedulers.rf import RectifiedFlowScheduler from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy import ltx_video.pipelines.crf_compressor as crf_compressor def create_latent_upsampler(latent_upsampler_model_path: str, device: str): latent_upsampler = LatentUpsampler.from_pretrained(latent_upsampler_model_path) latent_upsampler.to(device) latent_upsampler.eval() return latent_upsampler def create_ltx_video_pipeline( ckpt_path: str, precision: str, text_encoder_model_name_or_path: str, sampler: Optional[str] = None, device: Optional[str] = None, enhance_prompt: bool = False, prompt_enhancer_image_caption_model_name_or_path: Optional[str] = None, prompt_enhancer_llm_model_name_or_path: Optional[str] = None, ) -> LTXVideoPipeline: ckpt_path = Path(ckpt_path) assert os.path.exists( ckpt_path ), f"Ckpt path provided (--ckpt_path) {ckpt_path} does not exist" with safe_open(ckpt_path, framework="pt") as f: metadata = f.metadata() config_str = metadata.get("config") configs = json.loads(config_str) allowed_inference_steps = configs.get("allowed_inference_steps", None) vae = CausalVideoAutoencoder.from_pretrained(ckpt_path) transformer = Transformer3DModel.from_pretrained(ckpt_path) # Use constructor if sampler is specified, otherwise use from_pretrained if sampler == "from_checkpoint" or not sampler: scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path) else: scheduler = RectifiedFlowScheduler( sampler=("Uniform" if sampler.lower() == "uniform" else "LinearQuadratic") ) text_encoder = T5EncoderModel.from_pretrained( text_encoder_model_name_or_path, subfolder="text_encoder" ) patchifier = SymmetricPatchifier(patch_size=1) tokenizer = T5Tokenizer.from_pretrained( text_encoder_model_name_or_path, subfolder="tokenizer" ) transformer = transformer.to(device) vae = vae.to(device) text_encoder = text_encoder.to(device) if enhance_prompt: prompt_enhancer_image_caption_model = AutoModelForCausalLM.from_pretrained( prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True ) prompt_enhancer_image_caption_processor = AutoProcessor.from_pretrained( prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True ) prompt_enhancer_llm_model = AutoModelForCausalLM.from_pretrained( prompt_enhancer_llm_model_name_or_path, torch_dtype="bfloat16", ) prompt_enhancer_llm_tokenizer = AutoTokenizer.from_pretrained( prompt_enhancer_llm_model_name_or_path, ) else: prompt_enhancer_image_caption_model = None prompt_enhancer_image_caption_processor = None prompt_enhancer_llm_model = None prompt_enhancer_llm_tokenizer = None vae = vae.to(torch.bfloat16) if precision == "bfloat16" and transformer.dtype != torch.bfloat16: transformer = transformer.to(torch.bfloat16) text_encoder = text_encoder.to(torch.bfloat16) # Use submodels for the pipeline submodel_dict = { "transformer": transformer, "patchifier": patchifier, "text_encoder": text_encoder, "tokenizer": tokenizer, "scheduler": scheduler, "vae": vae, "prompt_enhancer_image_caption_model": prompt_enhancer_image_caption_model, "prompt_enhancer_image_caption_processor": prompt_enhancer_image_caption_processor, "prompt_enhancer_llm_model": prompt_enhancer_llm_model, "prompt_enhancer_llm_tokenizer": prompt_enhancer_llm_tokenizer, "allowed_inference_steps": allowed_inference_steps, } pipeline = LTXVideoPipeline(**submodel_dict) pipeline = pipeline.to(device) return pipeline # ============================================================================== # 2. FUNÇÕES AUXILIARES DE PROCESSAMENTO # ============================================================================== def calculate_padding(orig_h: int, orig_w: int, target_h: int, target_w: int) -> Tuple[int, int, int, int]: """Calcula o preenchimento para centralizar uma imagem em uma nova dimensão.""" pad_h = target_h - orig_h pad_w = target_w - orig_w pad_top = pad_h // 2 pad_bottom = pad_h - pad_top pad_left = pad_w // 2 pad_right = pad_w - pad_left return (pad_left, pad_right, pad_top, pad_bottom) def log_tensor_info(tensor: torch.Tensor, name: str = "Tensor"): """Exibe informações detalhadas sobre um tensor para depuração.""" if not isinstance(tensor, torch.Tensor): print(f"\n[INFO] '{name}' não é um tensor.") return print(f"\n--- Tensor Info: {name} ---") print(f" - Shape: {tuple(tensor.shape)}") print(f" - Dtype: {tensor.dtype}") print(f" - Device: {tensor.device}") if tensor.numel() > 0: try: print(f" - Stats: Min={tensor.min().item():.4f}, Max={tensor.max().item():.4f}, Mean={tensor.mean().item():.4f}") except RuntimeError: print(" - Stats: Não foi possível calcular (ex: tensores bool).") print("-" * 30) # ============================================================================== # 3. CLASSE PRINCIPAL DO SERVIÇO DE VÍDEO # ============================================================================== class VideoService: """ Serviço encapsulado para gerar vídeos usando a pipeline LTX-Video. Gerencia o carregamento de modelos, pré-processamento, geração em múltiplos passos (baixa resolução, upscale com denoise) e pós-processamento. """ def __init__(self): """Inicializa o serviço, carregando configurações e modelos.""" t0 = time.perf_counter() print("[INFO] Inicializando VideoService...") self.device = "cuda" if torch.cuda.is_available() else "cpu" self.config = self._load_config("ltxv-13b-0.9.8-distilled-fp8.yaml") self.pipeline, self.latent_upsampler = self._load_models_from_hub() self._move_models_to_device() self.runtime_autocast_dtype = self._get_precision_dtype() vae_manager_singleton.attach_pipeline( self.pipeline, device=self.device, autocast_dtype=self.runtime_autocast_dtype ) self._tmp_dirs = set() RESULTS_DIR.mkdir(exist_ok=True) print(f"[INFO] VideoService pronto. Tempo de inicialização: {time.perf_counter()-t0:.2f}s") # -------------------------------------------------------------------------- # --- Métodos de Gerenciamento de Memória (com Gatilho) --- # -------------------------------------------------------------------------- def _set_generation_environment(self): """Prepara a GPU para tarefas de geração (Transformer).""" if not ENABLE_MEMORY_OPTIMIZATION: if not next(self.pipeline.transformer.parameters()).is_cuda: self.pipeline.transformer.to(self.device) if not next(self.pipeline.text_encoder.parameters()).is_cuda: self.pipeline.text_encoder.to(self.device) if not next(self.pipeline.vae.parameters()).is_cuda: self.pipeline.vae.to(self.device) return print("\n [VRAM Manager] Otimização ATIVA. Configurando ambiente de GERAÇÃO...") if next(self.pipeline.vae.parameters()).is_cuda: self.pipeline.vae.to('cpu') print(" - Modelo VAE movido para a CPU.") if not next(self.pipeline.transformer.parameters()).is_cuda: self.pipeline.transformer.to(self.device) print(" - Modelo Transformer carregado na GPU.") if not next(self.pipeline.text_encoder.parameters()).is_cuda: self.pipeline.text_encoder.to(self.device) print(" - Modelo Text Encoder carregado na GPU.") torch.cuda.empty_cache() print(" [VRAM Manager] Ambiente de GERAÇÃO pronto.\n") def _set_decode_environment(self): """Prepara a GPU para tarefas de decodificação (VAE).""" if not ENABLE_MEMORY_OPTIMIZATION: return print("\n [VRAM Manager] Otimização ATIVA. Configurando ambiente de DECODIFICAÇÃO...") if next(self.pipeline.transformer.parameters()).is_cuda: self.pipeline.transformer.to('cpu') print(" - Modelo Transformer movido para a CPU.") if next(self.pipeline.text_encoder.parameters()).is_cuda: self.pipeline.text_encoder.to('cpu') print(" - Modelo Text Encoder movido para a CPU.") if not next(self.pipeline.vae.parameters()).is_cuda: self.pipeline.vae.to(self.device) print(" - Modelo VAE carregado na GPU.") torch.cuda.empty_cache() print(" [VRAM Manager] Ambiente de DECODIFICAÇÃO pronto.\n") # -------------------------------------------------------------------------- # --- Métodos Públicos (API do Serviço) --- # -------------------------------------------------------------------------- def _load_image_to_tensor_with_resize_and_crop( self, image_input: Union[str, Image.Image], target_height: int = 512, target_width: int = 768, just_crop: bool = False, ) -> torch.Tensor: """Load and process an image into a tensor. Args: image_input: Either a file path (str) or a PIL Image object target_height: Desired height of output tensor target_width: Desired width of output tensor just_crop: If True, only crop the image to the target size without resizing """ if isinstance(image_input, str): image = Image.open(image_input).convert("RGB") elif isinstance(image_input, Image.Image): image = image_input else: raise ValueError("image_input must be either a file path or a PIL Image object") input_width, input_height = image.size aspect_ratio_target = target_width / target_height aspect_ratio_frame = input_width / input_height if aspect_ratio_frame > aspect_ratio_target: new_width = int(input_height * aspect_ratio_target) new_height = input_height x_start = (input_width - new_width) // 2 y_start = 0 else: new_width = input_width new_height = int(input_width / aspect_ratio_target) x_start = 0 y_start = (input_height - new_height) // 2 image = image.crop((x_start, y_start, x_start + new_width, y_start + new_height)) if not just_crop: image = image.resize((target_width, target_height)) image = np.array(image) image = cv2.GaussianBlur(image, (3, 3), 0) frame_tensor = torch.from_numpy(image).float() frame_tensor = crf_compressor.compress(frame_tensor / 255.0) * 255.0 frame_tensor = frame_tensor.permute(2, 0, 1) frame_tensor = (frame_tensor / 127.5) - 1.0 # Create 5D tensor: (batch_size=1, channels=3, num_frames=1, height, width) return frame_tensor.unsqueeze(0).unsqueeze(2) def generate_low_resolution1(self, prompt: str, negative_prompt: str, height: int, width: int, duration_secs: float, guidance_scale: float, seed: Optional[int] = None, conditioning_items: Optional[List[ConditioningItem]] = None) -> Tuple[str, str, int]: """ Gera um vídeo de baixa resolução e retorna os caminhos para o vídeo e os latentes. """ used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed) #self._seed_everething(used_seed) actual_num_frames = max(9, int(round((round(duration_secs * DEFAULT_FPS) - 1) / 8.0) * 8 + 1)) downscaled_height, downscaled_width = self._calculate_downscaled_dims(height, width) first_pass_kwargs = { "prompt": prompt, "negative_prompt": negative_prompt, "height": downscaled_height, "width": downscaled_width, "num_frames": actual_num_frames, "frame_rate": int(DEFAULT_FPS), "generator": torch.Generator(device=self.device).manual_seed(used_seed), "output_type": "latent", "conditioning_items": conditioning_items, "guidance_scale": float(guidance_scale), **(self.config.get("first_pass", {})) } temp_dir = tempfile.mkdtemp(prefix="ltxv_low_") self._register_tmp_dir(temp_dir) try: with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')): latents = self.pipeline(**first_pass_kwargs).images #pixel_tensor = vae_manager_singleton.decode(latents.clone(), decode_timestep=float(self.config.get("decode_timestep", 0.05))) #video_path = self._save_video_from_tensor(pixel_tensor, "low_res_video", used_seed, temp_dir) latents_path = self._save_latents_to_disk(latents, "latents_low_res", used_seed) log_tensor_info(latents, "first_pass_lat" ) self._finalize() final_video_path, final_latents_path = self.generate_upscale_denoise( latents_path=latents_path, prompt=prompt, negative_prompt=negative_prompt, guidance_scale=guidance_scale, seed=used_seed ) print(f"[SUCCESS] PASSO 2 concluído. Vídeo final em: {final_video_path}") return final_video_path, final_latents_path, used_seed except Exception as e: print(f"[ERROR] Falha na geração de baixa resolução: {e}") traceback.print_exc() raise finally: self._finalize() def _prepare_conditioning_tensor(self, filepath, height, width, padding_values): print(f"[DEBUG] Carregando condicionamento: {filepath}") tensor = self._load_image_to_tensor_with_resize_and_crop(filepath, height, width) tensor = torch.nn.functional.pad(tensor, padding_values) out = tensor.to(self.device, dtype=self.runtime_autocast_dtype) if self.device == "cuda" else tensor.to(self.device) print(f"[DEBUG] Cond shape={tuple(out.shape)} dtype={out.dtype} device={out.device}") return out def generate_upscale_denoise(self, latents_path: str, prompt: str, negative_prompt: str, guidance_scale: float, seed: Optional[int] = None) -> Tuple[str, str]: """ Aplica upscale, AdaIN e Denoise em latentes de baixa resolução usando um processo de chunking. """ used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed) #seed_everything(used_seed) temp_dir = tempfile.mkdtemp(prefix="ltxv_up_") self._register_tmp_dir(temp_dir) try: latents_low = torch.load(latents_path).to(self.device) with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')): upsampled_latents = self._upsample_and_filter_latents(latents_low) del latents_low; torch.cuda.empty_cache() chunks = self._split_latents_with_overlap(upsampled_latents) refined_chunks = [] for chunk in chunks: if chunk.shape[2] <= 1: continue # Pula chunks inválidos second_pass_height = chunk.shape[3] * self.pipeline.vae_scale_factor second_pass_width = chunk.shape[4] * self.pipeline.vae_scale_factor second_pass_kwargs = { "prompt": prompt, "negative_prompt": negative_prompt, "height": second_pass_height, "width": second_pass_width, "num_frames": chunk.shape[2], "latents": chunk, "guidance_scale": float(guidance_scale), "output_type": "latent", "generator": torch.Generator(device=self.device).manual_seed(used_seed), **(self.config.get("second_pass", {})) } refined_chunk = self.pipeline(**second_pass_kwargs).images refined_chunks.append(refined_chunk) log_tensor_info(refined_chunk, "refined_chunk" ) final_latents = self._merge_chunks_with_overlap(refined_chunks) if LTXV_DEBUG: log_tensor_info(final_latents, "Latentes Upscaled/Refinados Finais") latents_path = self._save_latents_to_disk(final_latents, "latents_refined", used_seed) pixel_tensor = vae_manager_singleton.decode(final_latents, decode_timestep=float(self.config.get("decode_timestep", 0.05))) video_path = self._save_video_from_tensor(pixel_tensor, "refined_video", used_seed, temp_dir) return video_path, latents_path except Exception as e: print(f"[ERROR] Falha no processo de upscale e denoise: {e}") traceback.print_exc() raise finally: self._finalize() def generate_low_resolution( self, prompt: str, negative_prompt: str, height: int, width: int, duration_secs: float, guidance_scale: float, seed: Optional[int] = None, image_filepaths: Optional[List[str]] = None ) -> Tuple[str, str, int]: """ ETAPA 1: Gera um vídeo e latentes em resolução base a partir de um prompt e condicionamentos opcionais. """ print("[INFO] Iniciando ETAPA 1: Geração de Baixa Resolução...") # --- Configuração de Seed e Diretórios --- used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed) #seed_everything(used_seed) print(f" - Usando Seed: {used_seed}") temp_dir = tempfile.mkdtemp(prefix="ltxv_low_") self._register_tmp_dir(temp_dir) results_dir = "/app/output" os.makedirs(results_dir, exist_ok=True) # --- Cálculo de Dimensões e Frames --- actual_num_frames = int(round(duration_secs * DEFAULT_FPS)) downscaled_height, downscaled_width = self._calculate_downscaled_dims(height, width) height_padded = ((downscaled_height - 1) // 32 + 1) * 32 width_padded = ((downscaled_width - 1) // 32 + 1) * 32 padding_values = calculate_padding(downscaled_height, downscaled_width, height_padded, width_padded) conditioning_items = [] for filepath in image_filepaths: cond_tensor = self._prepare_conditioning_tensor(filepath, downscaled_height, downscaled_width, padding_values) conditioning_items.append(ConditioningItem(cond_tensor, 0, 1.0)) print(f" - Frames: {actual_num_frames}, Duração: {duration_secs}s") print(f" - Dimensões de Saída: {downscaled_height}x{downscaled_width}") # --- Execução da Pipeline --- with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')): first_pass_kwargs = { "prompt": prompt, "negative_prompt": negative_prompt, "height": downscaled_height, "width": downscaled_width, "num_frames": (actual_num_frames//8)+1, "frame_rate": int(DEFAULT_FPS), "generator": torch.Generator(device=self.device).manual_seed(used_seed), "output_type": "latent", "conditioning_items": conditioning_items, "guidance_scale": float(guidance_scale), **(self.config.get("first_pass", {})) } print(" - Enviando para a pipeline LTX...") latents = self.pipeline(**first_pass_kwargs).images print(f" - Latentes gerados com shape: {latents.shape}") #_upsample_and_filter_latents latents = self._upsample_and_filter_latents(latents) print(f" - Latentes com upscaler: {latents.shape}") # Decodifica os latentes para pixels para criar o vídeo de preview #pixel_tensor = vae_manager_singleton.decode(latents.clone(), decode_timestep=float(self.config.get("decode_timestep", 0.05))) # Salva os artefatos de saída (vídeo e tensor de latentes) #video_path = self._save_video_from_tensor(pixel_tensor, "low_res_video", used_seed, temp_dir) tensor_path = self._save_latents_to_disk(latents, "latents_low_res", used_seed) self._finalize() final_video_path, final_latents_path = self.refine_texture_only( latents_path=tensor_path, prompt=prompt, negative_prompt=negative_prompt, guidance_scale=guidance_scale, seed=used_seed, conditioning_items=conditioning_items, ) # --- Limpeza --- self._finalize() print("[SUCCESS] ETAPA 1 Concluída.") return final_video_path, final_latents_path, used_seed def refine_texture_only( self, latents_path: str, prompt: str, negative_prompt: str, guidance_scale: float, seed: Optional[int] = None, conditioning_items: Optional[List[ConditioningItem]] = None ) -> Tuple[str, str]: """ Refina e decodifica latentes com gerenciamento explícito de modelos na GPU para máxima performance e robustez ("hot-swap"). """ print("\n======================================================================") print("====== [INFO] Iniciando ETAPA 2: Refinamento e Decodificação com Hot-Swap ======") print("======================================================================\n") temp_dir = tempfile.mkdtemp(prefix="ltxv_refine_") self._register_tmp_dir(temp_dir) used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed) # --- FASE 1: GERAÇÃO DE LATENTES (TRABALHO DO TRANSFORMER) --- print("[LOG] FASE 1: Geração de Latentes (Transformer na GPU)") self._set_generation_environment() latents_to_refine = torch.load(latents_path).to(self.device) print(f" [LOG] Latentes carregados para a GPU. Shape: {latents_to_refine.shape}") with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')): refine_height = latents_to_refine.shape[3] * self.pipeline.vae_scale_factor refine_width = latents_to_refine.shape[4] * self.pipeline.vae_scale_factor second_pass_kwargs = { "prompt": prompt, "negative_prompt": negative_prompt, "height": refine_height, "width": refine_width, "frame_rate": int(DEFAULT_FPS), "num_frames": latents_to_refine.shape[2], "latents": latents_to_refine, "guidance_scale": float(guidance_scale), "output_type": "latent", "generator": torch.Generator(device=self.device).manual_seed(used_seed), "conditioning_items": conditioning_items, **(self.config.get("second_pass", {})) } print(" [LOG] Enviando para a pipeline de refinamento (Transformer)...") final_latents = self.pipeline(**second_pass_kwargs).images print(f" [LOG] [SUCESSO] Latentes refinados. Shape: {final_latents.shape}") print(" [LOG] Geração de latentes concluída. Movendo resultado para a CPU.") final_latents_cpu = final_latents.cpu() del final_latents, latents_to_refine # --- FASE 2: DECODIFICAÇÃO EM CHUNKS (TRABALHO DO VAE) --- print("\n[LOG] FASE 2: Decodificação de Latentes (VAE na GPU)") self._set_decode_environment() total_latents = final_latents_cpu.shape[2] if total_latents <= 9: print(f" [LOG] Detecção: Vídeo curto ({total_latents} latentes). Usando decodificação direta.") pixel_tensor = vae_manager_singleton.decode( final_latents_cpu.to(self.device), decode_timestep=float(self.config.get("decode_timestep", 0.05)) ).cpu() else: print(f" [LOG] Detecção: Vídeo longo ({total_latents} latentes). Ativando modo de janela deslizante.") sobreposicao = 2 tamanho_base = (total_latents - 1) // 3 pontos_de_corte = [ (0, tamanho_base + sobreposicao), (tamanho_base - sobreposicao, (2 * tamanho_base) + sobreposicao), ((2 * tamanho_base) - sobreposicao, total_latents) ] pixel_chunks_list = [] for i, (start, end) in enumerate(pontos_de_corte): latent_chunk = final_latents_cpu[:, :, start:end, :, :] print(f" -> Decodificando Grupo {i+1} (latentes {start} a {end-1}), shape: {latent_chunk.shape}") pixel_chunk = vae_manager_singleton.decode( latent_chunk.to(self.device), decode_timestep=float(self.config.get("decode_timestep", 0.05)) ) pixel_chunks_list.append(pixel_chunk.cpu()) torch.cuda.empty_cache() print(" [LOG] Costurando os vídeos decodificados...") frames_p1 = tamanho_base * 8 parte1 = pixel_chunks_list[0][:, :, :frames_p1, :, :] descarte_inicio_p2 = sobreposicao * 8 frames_p2 = tamanho_base * 8 parte2 = pixel_chunks_list[1][:, :, descarte_inicio_p2 : descarte_inicio_p2 + frames_p2, :, :] descarte_inicio_p3 = sobreposicao * 8 parte3 = pixel_chunks_list[2][:, :, descarte_inicio_p3:, :, :] pixel_tensor = torch.cat([parte1, parte2, parte3], dim=2) print(f"\n[LOG] [SUCESSO] Tensor de pixels final montado na CPU com shape: {pixel_tensor.shape}") # --- FASE 3: SALVAMENTO E RESTAURAÇÃO DO AMBIENTE --- print("\n[LOG] FASE 3: Salvamento e Restauração do Ambiente da GPU") video_path_out = self._save_video_from_tensor(pixel_tensor, "refined_video_final", used_seed, temp_dir) latents_path_out = self._save_latents_to_disk(final_latents_cpu, "latents_refined_final", used_seed) print(" [LOG] Tarefa concluída. Restaurando ambiente de GERAÇÃO na GPU para a próxima execução...") self._set_generation_environment() print(" [LOG] Liberando tensores finais da memória da CPU.") del pixel_tensor, final_latents_cpu self._finalize() print("\n======================================================================") print("============ [SUCCESS] ETAPA 2 Concluída com Sucesso =============") print("======================================================================\n") return video_path_out, latents_path_out def encode_latents_to_mp4(self, latents_path: str, fps: int = int(DEFAULT_FPS)) -> str: """Decodifica um tensor de latentes salvo e o salva como um vídeo MP4.""" latents = torch.load(latents_path) temp_dir = tempfile.mkdtemp(prefix="ltxv_enc_") self._register_tmp_dir(temp_dir) seed = random.randint(0, 99999) # Seed apenas para nome do arquivo try: chunks = self._split_latents_with_overlap(latents) pixel_chunks = [] with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')): for chunk in chunks: if chunk.shape[2] == 0: continue pixel_chunk = vae_manager_singleton.decode(chunk.to(self.device), decode_timestep=float(self.config.get("decode_timestep", 0.05))) pixel_chunks.append(pixel_chunk) final_pixel_tensor = self._merge_chunks_with_overlap(pixel_chunks) final_video_path = self._save_video_from_tensor(final_pixel_tensor, f"final_video_{seed}", seed, temp_dir, fps=fps) return final_video_path except Exception as e: print(f"[ERROR] Falha ao encodar latentes para MP4: {e}") traceback.print_exc() raise finally: self._finalize() # -------------------------------------------------------------------------- # --- Métodos Internos e Auxiliares --- # -------------------------------------------------------------------------- def _finalize(self): """Limpa a memória da GPU e os diretórios temporários.""" if LTXV_DEBUG: print("[DEBUG] Finalize: iniciando limpeza...") gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.ipc_collect() # Limpa todos os diretórios temporários registrados for d in list(self._tmp_dirs): shutil.rmtree(d, ignore_errors=True) self._tmp_dirs.remove(d) if LTXV_DEBUG: print(f"[DEBUG] Diretório temporário removido: {d}") def _load_config(self, config_filename: str) -> Dict: """Carrega o arquivo de configuração YAML.""" config_path = LTX_VIDEO_REPO_DIR / "configs" / config_filename print(f"[INFO] Carregando configuração de: {config_path}") with open(config_path, "r") as file: return yaml.safe_load(file) def _load_models_from_hub(self): """Baixa e cria as instâncias da pipeline e do upsampler.""" t0 = time.perf_counter() LTX_REPO = "Lightricks/LTX-Video" print("[INFO] Baixando checkpoint principal...") self.config["checkpoint_path"] = hf_hub_download( repo_id=LTX_REPO, filename=self.config["checkpoint_path"], token=os.getenv("HF_TOKEN") ) print(f"[INFO] Checkpoint principal em: {self.config['checkpoint_path']}") print("[INFO] Construindo pipeline...") pipeline = create_ltx_video_pipeline( ckpt_path=self.config["checkpoint_path"], precision=self.config["precision"], text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"], sampler=self.config["sampler"], device="cpu", # Carrega em CPU primeiro enhance_prompt=False ) print("[INFO] Pipeline construída.") latent_upsampler = None if self.config.get("spatial_upscaler_model_path"): print("[INFO] Baixando upscaler espacial...") self.config["spatial_upscaler_model_path"] = hf_hub_download( repo_id=LTX_REPO, filename=self.config["spatial_upscaler_model_path"], token=os.getenv("HF_TOKEN") ) print(f"[INFO] Upscaler em: {self.config['spatial_upscaler_model_path']}") print("[INFO] Construindo latent_upsampler...") latent_upsampler = create_latent_upsampler(self.config["spatial_upscaler_model_path"], device="cpu") print("[INFO] Latent upsampler construído.") print(f"[INFO] Carregamento de modelos concluído em {time.perf_counter()-t0:.2f}s") return pipeline, latent_upsampler def _move_models_to_device(self): """Move os modelos carregados para o dispositivo de computação (GPU/CPU).""" print(f"[INFO] Movendo modelos para o dispositivo: {self.device}") self.pipeline.to(self.device) if self.latent_upsampler: self.latent_upsampler.to(self.device) def _get_precision_dtype(self) -> torch.dtype: """Determina o dtype para autocast com base na configuração de precisão.""" prec = str(self.config.get("precision", "")).lower() if prec in ["float8_e4m3fn", "bfloat16"]: return torch.bfloat16 elif prec == "mixed_precision": return torch.float16 return torch.float32 @torch.no_grad() def _upsample_and_filter_latents(self, latents: torch.Tensor) -> torch.Tensor: """Aplica o upsample espacial e o filtro AdaIN aos latentes.""" if not self.latent_upsampler: raise ValueError("Latent Upsampler não está carregado para a operação de upscale.") latents_unnormalized = un_normalize_latents(latents, self.pipeline.vae, vae_per_channel_normalize=True) upsampled_latents_unnormalized = self.latent_upsampler(latents_unnormalized) upsampled_latents_normalized = normalize_latents(upsampled_latents_unnormalized, self.pipeline.vae, vae_per_channel_normalize=True) # Filtro AdaIN para manter consistência de cor/estilo com o vídeo de baixa resolução return adain_filter_latent(latents=upsampled_latents_normalized, reference_latents=latents) def _prepare_conditioning_tensor_from_path(self, filepath: str, height: int, width: int, padding: Tuple) -> torch.Tensor: """Carrega uma imagem, redimensiona, aplica padding e move para o dispositivo.""" tensor = self._load_image_to_tensor_with_resize_and_crop(filepath, height, width) tensor = F.pad(tensor, padding) return tensor.to(self.device, dtype=self.runtime_autocast_dtype) def _calculate_downscaled_dims(self, height: int, width: int) -> Tuple[int, int]: """Calcula as dimensões para o primeiro passo (baixa resolução).""" height_padded = ((height - 1) // 8 + 1) * 8 width_padded = ((width - 1) // 8 + 1) * 8 downscale_factor = self.config.get("downscale_factor", 0.6666666) vae_scale_factor = self.pipeline.vae_scale_factor target_w = int(width_padded * downscale_factor) downscaled_width = target_w - (target_w % vae_scale_factor) target_h = int(height_padded * downscale_factor) downscaled_height = target_h - (target_h % vae_scale_factor) return downscaled_height, downscaled_width def _split_latents_with_overlap(self, latents: torch.Tensor, overlap: int = 1) -> List[torch.Tensor]: """Divide um tensor de latentes em dois chunks com sobreposição.""" total_frames = latents.shape[2] if total_frames <= overlap: return [latents] mid_point = max(overlap, total_frames // 2) chunk1 = latents[:, :, :mid_point, :, :] # O segundo chunk começa 'overlap' frames antes para criar a sobreposição chunk2 = latents[:, :, mid_point - overlap:, :, :] return [c for c in [chunk1, chunk2] if c.shape[2] > 0] def _merge_chunks_with_overlap(self, chunks: List[torch.Tensor], overlap: int = 1) -> torch.Tensor: """Junta uma lista de chunks, removendo a sobreposição.""" if not chunks: return torch.empty(0) if len(chunks) == 1: return chunks[0] # Pega o primeiro chunk sem o frame de sobreposição final merged_list = [chunks[0][:, :, :-overlap, :, :]] # Adiciona os chunks restantes merged_list.extend(chunks[1:]) return torch.cat(merged_list, dim=2) def _save_latents_to_disk(self, latents_tensor: torch.Tensor, base_filename: str, seed: int) -> str: """Salva um tensor de latentes em um arquivo .pt.""" latents_cpu = latents_tensor.detach().to("cpu") tensor_path = RESULTS_DIR / f"{base_filename}_{seed}.pt" torch.save(latents_cpu, tensor_path) if LTXV_DEBUG: print(f"[DEBUG] Latentes salvos em: {tensor_path}") return str(tensor_path) def _save_video_from_tensor(self, pixel_tensor: torch.Tensor, base_filename: str, seed: int, temp_dir: str, fps: int = int(DEFAULT_FPS)) -> str: """Salva um tensor de pixels como um arquivo de vídeo MP4.""" temp_path = os.path.join(temp_dir, f"{base_filename}_{seed}.mp4") video_encode_tool_singleton.save_video_from_tensor(pixel_tensor, temp_path, fps=fps) final_path = RESULTS_DIR / f"{base_filename}_{seed}.mp4" shutil.move(temp_path, final_path) print(f"[INFO] Vídeo final salvo em: {final_path}") return str(final_path) def _seed_everething(self, seed: int): random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed(seed) if torch.backends.mps.is_available(): torch.mps.manual_seed(seed) def _register_tmp_dir(self, dir_path: str): """Registra um diretório temporário para limpeza posterior.""" if dir_path and os.path.isdir(dir_path): self._tmp_dirs.add(dir_path) if LTXV_DEBUG: print(f"[DEBUG] Diretório temporário registrado: {dir_path}") # ============================================================================== # 4. INSTANCIAÇÃO E PONTO DE ENTRADA (Exemplo) # ============================================================================== print("Criando instância do VideoService. O carregamento do modelo começará agora...") video_generation_service = VideoService() print("Instância do VideoService pronta para uso.")