Aduc / api /ltx_server_refactored.py
caarleexx's picture
Update api/ltx_server_refactored.py
7b2197f verified
raw
history blame
41.9 kB
# ltx_server_clean_refactor.py — VideoService (Modular Version with Simple Overlap Chunking)
# ==============================================================================
# 0. CONFIGURAÇÃO DE AMBIENTE E IMPORTAÇÕES
# ==============================================================================
import os
import sys
import gc
import yaml
import time
import json
import random
import shutil
import warnings
import tempfile
import traceback
import subprocess
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
import cv2
# --- GATILHO DE OTIMIZAÇÃO DE MEMÓRIA ---
# Se "1", "true", ou "yes" (ignorando maiúsculas/minúsculas), ativa o descarregamento de modelos.
# Por padrão, fica ativado para segurança em ambientes com VRAM limitada.
ENABLE_MEMORY_OPTIMIZATION = os.getenv("ADUC_MEMORY_OPTIMIZATION", "1").lower() in ["1", "true", "yes"]
# --- Configurações de Logging e Avisos ---
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
from huggingface_hub import logging as hf_logging
hf_logging.set_verbosity_error()
# --- Importações de Bibliotecas de ML/Processamento ---
import torch
import torch.nn.functional as F
import numpy as np
from PIL import Image
from einops import rearrange
from huggingface_hub import hf_hub_download
from safetensors import safe_open
from managers.vae_manager import vae_manager_singleton
from tools.video_encode_tool import video_encode_tool_singleton
# --- Constantes Globais ---
LTXV_DEBUG = True # Mude para False para desativar logs de debug
LTXV_FRAME_LOG_EVERY = 8
DEPS_DIR = Path("/data")
LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video"
RESULTS_DIR = Path("/app/output")
DEFAULT_FPS = 24.0
# ==============================================================================
# 1. SETUP E FUNÇÕES AUXILIARES DE AMBIENTE
# ==============================================================================
def _run_setup_script():
"""Executa o script setup.py se o repositório LTX-Video não existir."""
setup_script_path = "setup.py"
if not os.path.exists(setup_script_path):
print("[DEBUG] 'setup.py' não encontrado. Pulando clonagem de dependências.")
return
print(f"[DEBUG] Repositório não encontrado em {LTX_VIDEO_REPO_DIR}. Executando setup.py...")
try:
subprocess.run([sys.executable, setup_script_path], check=True, capture_output=True, text=True)
print("[DEBUG] Script 'setup.py' concluído com sucesso.")
except subprocess.CalledProcessError as e:
print(f"[ERROR] Falha ao executar 'setup.py' (código {e.returncode}).\nOutput:\n{e.stdout}\n{e.stderr}")
sys.exit(1)
def add_deps_to_path(repo_path: Path):
"""Adiciona o diretório do repositório ao sys.path para importações locais."""
resolved_path = str(repo_path.resolve())
if resolved_path not in sys.path:
sys.path.insert(0, resolved_path)
if LTXV_DEBUG:
print(f"[DEBUG] Adicionado ao sys.path: {resolved_path}")
# --- Execução da configuração inicial ---
if not LTX_VIDEO_REPO_DIR.exists():
_run_setup_script()
add_deps_to_path(LTX_VIDEO_REPO_DIR)
# --- Importações Dependentes do Path Adicionado ---
from ltx_video.models.autoencoders.vae_encode import un_normalize_latents, normalize_latents
from ltx_video.pipelines.pipeline_ltx_video import adain_filter_latent
from ltx_video.models.autoencoders.latent_upsampler import LatentUpsampler
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline
from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer
from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder
from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier
from ltx_video.models.transformers.transformer3d import Transformer3DModel
from ltx_video.schedulers.rf import RectifiedFlowScheduler
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
import ltx_video.pipelines.crf_compressor as crf_compressor
def create_latent_upsampler(latent_upsampler_model_path: str, device: str):
latent_upsampler = LatentUpsampler.from_pretrained(latent_upsampler_model_path)
latent_upsampler.to(device)
latent_upsampler.eval()
return latent_upsampler
def create_ltx_video_pipeline(
ckpt_path: str,
precision: str,
text_encoder_model_name_or_path: str,
sampler: Optional[str] = None,
device: Optional[str] = None,
enhance_prompt: bool = False,
prompt_enhancer_image_caption_model_name_or_path: Optional[str] = None,
prompt_enhancer_llm_model_name_or_path: Optional[str] = None,
) -> LTXVideoPipeline:
ckpt_path = Path(ckpt_path)
assert os.path.exists(
ckpt_path
), f"Ckpt path provided (--ckpt_path) {ckpt_path} does not exist"
with safe_open(ckpt_path, framework="pt") as f:
metadata = f.metadata()
config_str = metadata.get("config")
configs = json.loads(config_str)
allowed_inference_steps = configs.get("allowed_inference_steps", None)
vae = CausalVideoAutoencoder.from_pretrained(ckpt_path)
transformer = Transformer3DModel.from_pretrained(ckpt_path)
# Use constructor if sampler is specified, otherwise use from_pretrained
if sampler == "from_checkpoint" or not sampler:
scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path)
else:
scheduler = RectifiedFlowScheduler(
sampler=("Uniform" if sampler.lower() == "uniform" else "LinearQuadratic")
)
text_encoder = T5EncoderModel.from_pretrained(
text_encoder_model_name_or_path, subfolder="text_encoder"
)
patchifier = SymmetricPatchifier(patch_size=1)
tokenizer = T5Tokenizer.from_pretrained(
text_encoder_model_name_or_path, subfolder="tokenizer"
)
transformer = transformer.to(device)
vae = vae.to(device)
text_encoder = text_encoder.to(device)
if enhance_prompt:
prompt_enhancer_image_caption_model = AutoModelForCausalLM.from_pretrained(
prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True
)
prompt_enhancer_image_caption_processor = AutoProcessor.from_pretrained(
prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True
)
prompt_enhancer_llm_model = AutoModelForCausalLM.from_pretrained(
prompt_enhancer_llm_model_name_or_path,
torch_dtype="bfloat16",
)
prompt_enhancer_llm_tokenizer = AutoTokenizer.from_pretrained(
prompt_enhancer_llm_model_name_or_path,
)
else:
prompt_enhancer_image_caption_model = None
prompt_enhancer_image_caption_processor = None
prompt_enhancer_llm_model = None
prompt_enhancer_llm_tokenizer = None
vae = vae.to(torch.bfloat16)
if precision == "bfloat16" and transformer.dtype != torch.bfloat16:
transformer = transformer.to(torch.bfloat16)
text_encoder = text_encoder.to(torch.bfloat16)
# Use submodels for the pipeline
submodel_dict = {
"transformer": transformer,
"patchifier": patchifier,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"scheduler": scheduler,
"vae": vae,
"prompt_enhancer_image_caption_model": prompt_enhancer_image_caption_model,
"prompt_enhancer_image_caption_processor": prompt_enhancer_image_caption_processor,
"prompt_enhancer_llm_model": prompt_enhancer_llm_model,
"prompt_enhancer_llm_tokenizer": prompt_enhancer_llm_tokenizer,
"allowed_inference_steps": allowed_inference_steps,
}
pipeline = LTXVideoPipeline(**submodel_dict)
pipeline = pipeline.to(device)
return pipeline
# ==============================================================================
# 2. FUNÇÕES AUXILIARES DE PROCESSAMENTO
# ==============================================================================
def calculate_padding(orig_h: int, orig_w: int, target_h: int, target_w: int) -> Tuple[int, int, int, int]:
"""Calcula o preenchimento para centralizar uma imagem em uma nova dimensão."""
pad_h = target_h - orig_h
pad_w = target_w - orig_w
pad_top = pad_h // 2
pad_bottom = pad_h - pad_top
pad_left = pad_w // 2
pad_right = pad_w - pad_left
return (pad_left, pad_right, pad_top, pad_bottom)
def log_tensor_info(tensor: torch.Tensor, name: str = "Tensor"):
"""Exibe informações detalhadas sobre um tensor para depuração."""
if not isinstance(tensor, torch.Tensor):
print(f"\n[INFO] '{name}' não é um tensor.")
return
print(f"\n--- Tensor Info: {name} ---")
print(f" - Shape: {tuple(tensor.shape)}")
print(f" - Dtype: {tensor.dtype}")
print(f" - Device: {tensor.device}")
if tensor.numel() > 0:
try:
print(f" - Stats: Min={tensor.min().item():.4f}, Max={tensor.max().item():.4f}, Mean={tensor.mean().item():.4f}")
except RuntimeError:
print(" - Stats: Não foi possível calcular (ex: tensores bool).")
print("-" * 30)
# ==============================================================================
# 3. CLASSE PRINCIPAL DO SERVIÇO DE VÍDEO
# ==============================================================================
class VideoService:
"""
Serviço encapsulado para gerar vídeos usando a pipeline LTX-Video.
Gerencia o carregamento de modelos, pré-processamento, geração em múltiplos
passos (baixa resolução, upscale com denoise) e pós-processamento.
"""
def __init__(self):
"""Inicializa o serviço, carregando configurações e modelos."""
t0 = time.perf_counter()
print("[INFO] Inicializando VideoService...")
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.config = self._load_config("ltxv-13b-0.9.8-distilled-fp8.yaml")
self.pipeline, self.latent_upsampler = self._load_models_from_hub()
self._move_models_to_device()
self.runtime_autocast_dtype = self._get_precision_dtype()
vae_manager_singleton.attach_pipeline(
self.pipeline,
device=self.device,
autocast_dtype=self.runtime_autocast_dtype
)
self._tmp_dirs = set()
RESULTS_DIR.mkdir(exist_ok=True)
print(f"[INFO] VideoService pronto. Tempo de inicialização: {time.perf_counter()-t0:.2f}s")
# --------------------------------------------------------------------------
# --- Métodos de Gerenciamento de Memória (com Gatilho) ---
# --------------------------------------------------------------------------
def _set_generation_environment(self):
"""Prepara a GPU para tarefas de geração (Transformer)."""
if not ENABLE_MEMORY_OPTIMIZATION:
if not next(self.pipeline.transformer.parameters()).is_cuda: self.pipeline.transformer.to(self.device)
if not next(self.pipeline.text_encoder.parameters()).is_cuda: self.pipeline.text_encoder.to(self.device)
if not next(self.pipeline.vae.parameters()).is_cuda: self.pipeline.vae.to(self.device)
return
print("\n [VRAM Manager] Otimização ATIVA. Configurando ambiente de GERAÇÃO...")
if next(self.pipeline.vae.parameters()).is_cuda:
self.pipeline.vae.to('cpu')
print(" - Modelo VAE movido para a CPU.")
if not next(self.pipeline.transformer.parameters()).is_cuda:
self.pipeline.transformer.to(self.device)
print(" - Modelo Transformer carregado na GPU.")
if not next(self.pipeline.text_encoder.parameters()).is_cuda:
self.pipeline.text_encoder.to(self.device)
print(" - Modelo Text Encoder carregado na GPU.")
torch.cuda.empty_cache()
print(" [VRAM Manager] Ambiente de GERAÇÃO pronto.\n")
def _set_decode_environment(self):
"""Prepara a GPU para tarefas de decodificação (VAE)."""
if not ENABLE_MEMORY_OPTIMIZATION:
return
print("\n [VRAM Manager] Otimização ATIVA. Configurando ambiente de DECODIFICAÇÃO...")
if next(self.pipeline.transformer.parameters()).is_cuda:
self.pipeline.transformer.to('cpu')
print(" - Modelo Transformer movido para a CPU.")
if next(self.pipeline.text_encoder.parameters()).is_cuda:
self.pipeline.text_encoder.to('cpu')
print(" - Modelo Text Encoder movido para a CPU.")
if not next(self.pipeline.vae.parameters()).is_cuda:
self.pipeline.vae.to(self.device)
print(" - Modelo VAE carregado na GPU.")
torch.cuda.empty_cache()
print(" [VRAM Manager] Ambiente de DECODIFICAÇÃO pronto.\n")
# --------------------------------------------------------------------------
# --- Métodos Públicos (API do Serviço) ---
# --------------------------------------------------------------------------
def _load_image_to_tensor_with_resize_and_crop(
self,
image_input: Union[str, Image.Image],
target_height: int = 512,
target_width: int = 768,
just_crop: bool = False,
) -> torch.Tensor:
"""Load and process an image into a tensor.
Args:
image_input: Either a file path (str) or a PIL Image object
target_height: Desired height of output tensor
target_width: Desired width of output tensor
just_crop: If True, only crop the image to the target size without resizing
"""
if isinstance(image_input, str):
image = Image.open(image_input).convert("RGB")
elif isinstance(image_input, Image.Image):
image = image_input
else:
raise ValueError("image_input must be either a file path or a PIL Image object")
input_width, input_height = image.size
aspect_ratio_target = target_width / target_height
aspect_ratio_frame = input_width / input_height
if aspect_ratio_frame > aspect_ratio_target:
new_width = int(input_height * aspect_ratio_target)
new_height = input_height
x_start = (input_width - new_width) // 2
y_start = 0
else:
new_width = input_width
new_height = int(input_width / aspect_ratio_target)
x_start = 0
y_start = (input_height - new_height) // 2
image = image.crop((x_start, y_start, x_start + new_width, y_start + new_height))
if not just_crop:
image = image.resize((target_width, target_height))
image = np.array(image)
image = cv2.GaussianBlur(image, (3, 3), 0)
frame_tensor = torch.from_numpy(image).float()
frame_tensor = crf_compressor.compress(frame_tensor / 255.0) * 255.0
frame_tensor = frame_tensor.permute(2, 0, 1)
frame_tensor = (frame_tensor / 127.5) - 1.0
# Create 5D tensor: (batch_size=1, channels=3, num_frames=1, height, width)
return frame_tensor.unsqueeze(0).unsqueeze(2)
def generate_low_resolution1(self, prompt: str, negative_prompt: str, height: int, width: int, duration_secs: float, guidance_scale: float, seed: Optional[int] = None, conditioning_items: Optional[List[ConditioningItem]] = None) -> Tuple[str, str, int]:
"""
Gera um vídeo de baixa resolução e retorna os caminhos para o vídeo e os latentes.
"""
used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed)
#self._seed_everething(used_seed)
actual_num_frames = max(9, int(round((round(duration_secs * DEFAULT_FPS) - 1) / 8.0) * 8 + 1))
downscaled_height, downscaled_width = self._calculate_downscaled_dims(height, width)
first_pass_kwargs = {
"prompt": prompt, "negative_prompt": negative_prompt, "height": downscaled_height,
"width": downscaled_width, "num_frames": actual_num_frames, "frame_rate": int(DEFAULT_FPS),
"generator": torch.Generator(device=self.device).manual_seed(used_seed),
"output_type": "latent", "conditioning_items": conditioning_items,
"guidance_scale": float(guidance_scale), **(self.config.get("first_pass", {}))
}
temp_dir = tempfile.mkdtemp(prefix="ltxv_low_")
self._register_tmp_dir(temp_dir)
try:
with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
latents = self.pipeline(**first_pass_kwargs).images
#pixel_tensor = vae_manager_singleton.decode(latents.clone(), decode_timestep=float(self.config.get("decode_timestep", 0.05)))
#video_path = self._save_video_from_tensor(pixel_tensor, "low_res_video", used_seed, temp_dir)
latents_path = self._save_latents_to_disk(latents, "latents_low_res", used_seed)
log_tensor_info(latents, "first_pass_lat" )
self._finalize()
final_video_path, final_latents_path = self.generate_upscale_denoise(
latents_path=latents_path,
prompt=prompt,
negative_prompt=negative_prompt,
guidance_scale=guidance_scale,
seed=used_seed
)
print(f"[SUCCESS] PASSO 2 concluído. Vídeo final em: {final_video_path}")
return final_video_path, final_latents_path, used_seed
except Exception as e:
print(f"[ERROR] Falha na geração de baixa resolução: {e}")
traceback.print_exc()
raise
finally:
self._finalize()
def _prepare_conditioning_tensor(self, filepath, height, width, padding_values):
print(f"[DEBUG] Carregando condicionamento: {filepath}")
tensor = self._load_image_to_tensor_with_resize_and_crop(filepath, height, width)
tensor = torch.nn.functional.pad(tensor, padding_values)
out = tensor.to(self.device, dtype=self.runtime_autocast_dtype) if self.device == "cuda" else tensor.to(self.device)
print(f"[DEBUG] Cond shape={tuple(out.shape)} dtype={out.dtype} device={out.device}")
return out
def generate_upscale_denoise(self, latents_path: str, prompt: str, negative_prompt: str, guidance_scale: float, seed: Optional[int] = None) -> Tuple[str, str]:
"""
Aplica upscale, AdaIN e Denoise em latentes de baixa resolução usando um processo de chunking.
"""
used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed)
#seed_everything(used_seed)
temp_dir = tempfile.mkdtemp(prefix="ltxv_up_")
self._register_tmp_dir(temp_dir)
try:
latents_low = torch.load(latents_path).to(self.device)
with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
upsampled_latents = self._upsample_and_filter_latents(latents_low)
del latents_low; torch.cuda.empty_cache()
chunks = self._split_latents_with_overlap(upsampled_latents)
refined_chunks = []
for chunk in chunks:
if chunk.shape[2] <= 1: continue # Pula chunks inválidos
second_pass_height = chunk.shape[3] * self.pipeline.vae_scale_factor
second_pass_width = chunk.shape[4] * self.pipeline.vae_scale_factor
second_pass_kwargs = {
"prompt": prompt, "negative_prompt": negative_prompt, "height": second_pass_height,
"width": second_pass_width, "num_frames": chunk.shape[2], "latents": chunk,
"guidance_scale": float(guidance_scale), "output_type": "latent",
"generator": torch.Generator(device=self.device).manual_seed(used_seed),
**(self.config.get("second_pass", {}))
}
refined_chunk = self.pipeline(**second_pass_kwargs).images
refined_chunks.append(refined_chunk)
log_tensor_info(refined_chunk, "refined_chunk" )
final_latents = self._merge_chunks_with_overlap(refined_chunks)
if LTXV_DEBUG:
log_tensor_info(final_latents, "Latentes Upscaled/Refinados Finais")
latents_path = self._save_latents_to_disk(final_latents, "latents_refined", used_seed)
pixel_tensor = vae_manager_singleton.decode(final_latents, decode_timestep=float(self.config.get("decode_timestep", 0.05)))
video_path = self._save_video_from_tensor(pixel_tensor, "refined_video", used_seed, temp_dir)
return video_path, latents_path
except Exception as e:
print(f"[ERROR] Falha no processo de upscale e denoise: {e}")
traceback.print_exc()
raise
finally:
self._finalize()
def generate_low_resolution(
self,
prompt: str,
negative_prompt: str,
height: int,
width: int,
duration_secs: float,
guidance_scale: float,
seed: Optional[int] = None,
image_filepaths: Optional[List[str]] = None
) -> Tuple[str, str, int]:
"""
ETAPA 1: Gera um vídeo e latentes em resolução base a partir de um prompt e
condicionamentos opcionais.
"""
print("[INFO] Iniciando ETAPA 1: Geração de Baixa Resolução...")
# --- Configuração de Seed e Diretórios ---
used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed)
#seed_everything(used_seed)
print(f" - Usando Seed: {used_seed}")
temp_dir = tempfile.mkdtemp(prefix="ltxv_low_")
self._register_tmp_dir(temp_dir)
results_dir = "/app/output"
os.makedirs(results_dir, exist_ok=True)
# --- Cálculo de Dimensões e Frames ---
actual_num_frames = int(round(duration_secs * DEFAULT_FPS))
downscaled_height, downscaled_width = self._calculate_downscaled_dims(height, width)
height_padded = ((downscaled_height - 1) // 32 + 1) * 32
width_padded = ((downscaled_width - 1) // 32 + 1) * 32
padding_values = calculate_padding(downscaled_height, downscaled_width, height_padded, width_padded)
conditioning_items = []
for filepath in image_filepaths:
cond_tensor = self._prepare_conditioning_tensor(filepath, downscaled_height, downscaled_width, padding_values)
conditioning_items.append(ConditioningItem(cond_tensor, 0, 1.0))
print(f" - Frames: {actual_num_frames}, Duração: {duration_secs}s")
print(f" - Dimensões de Saída: {downscaled_height}x{downscaled_width}")
# --- Execução da Pipeline ---
with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
first_pass_kwargs = {
"prompt": prompt,
"negative_prompt": negative_prompt,
"height": downscaled_height,
"width": downscaled_width,
"num_frames": (actual_num_frames//8)+1,
"frame_rate": int(DEFAULT_FPS),
"generator": torch.Generator(device=self.device).manual_seed(used_seed),
"output_type": "latent",
"conditioning_items": conditioning_items,
"guidance_scale": float(guidance_scale),
**(self.config.get("first_pass", {}))
}
print(" - Enviando para a pipeline LTX...")
latents = self.pipeline(**first_pass_kwargs).images
print(f" - Latentes gerados com shape: {latents.shape}")
#_upsample_and_filter_latents
latents = self._upsample_and_filter_latents(latents)
print(f" - Latentes com upscaler: {latents.shape}")
# Decodifica os latentes para pixels para criar o vídeo de preview
#pixel_tensor = vae_manager_singleton.decode(latents.clone(), decode_timestep=float(self.config.get("decode_timestep", 0.05)))
# Salva os artefatos de saída (vídeo e tensor de latentes)
#video_path = self._save_video_from_tensor(pixel_tensor, "low_res_video", used_seed, temp_dir)
tensor_path = self._save_latents_to_disk(latents, "latents_low_res", used_seed)
self._finalize()
final_video_path, final_latents_path = self.refine_texture_only(
latents_path=tensor_path,
prompt=prompt,
negative_prompt=negative_prompt,
guidance_scale=guidance_scale,
seed=used_seed,
conditioning_items=conditioning_items,
)
# --- Limpeza ---
self._finalize()
print("[SUCCESS] ETAPA 1 Concluída.")
return final_video_path, final_latents_path, used_seed
def refine_texture_only(
self,
latents_path: str,
prompt: str,
negative_prompt: str,
guidance_scale: float,
seed: Optional[int] = None,
conditioning_items: Optional[List[ConditioningItem]] = None
) -> Tuple[str, str]:
"""
Refina e decodifica latentes com gerenciamento explícito de modelos
na GPU para máxima performance e robustez ("hot-swap").
"""
print("\n======================================================================")
print("====== [INFO] Iniciando ETAPA 2: Refinamento e Decodificação com Hot-Swap ======")
print("======================================================================\n")
temp_dir = tempfile.mkdtemp(prefix="ltxv_refine_")
self._register_tmp_dir(temp_dir)
used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed)
# --- FASE 1: GERAÇÃO DE LATENTES (TRABALHO DO TRANSFORMER) ---
print("[LOG] FASE 1: Geração de Latentes (Transformer na GPU)")
self._set_generation_environment()
latents_to_refine = torch.load(latents_path).to(self.device)
print(f" [LOG] Latentes carregados para a GPU. Shape: {latents_to_refine.shape}")
with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
refine_height = latents_to_refine.shape[3] * self.pipeline.vae_scale_factor
refine_width = latents_to_refine.shape[4] * self.pipeline.vae_scale_factor
second_pass_kwargs = {
"prompt": prompt, "negative_prompt": negative_prompt, "height": refine_height, "width": refine_width,
"frame_rate": int(DEFAULT_FPS), "num_frames": latents_to_refine.shape[2],
"latents": latents_to_refine, "guidance_scale": float(guidance_scale), "output_type": "latent",
"generator": torch.Generator(device=self.device).manual_seed(used_seed),
"conditioning_items": conditioning_items, **(self.config.get("second_pass", {}))
}
print(" [LOG] Enviando para a pipeline de refinamento (Transformer)...")
final_latents = self.pipeline(**second_pass_kwargs).images
print(f" [LOG] [SUCESSO] Latentes refinados. Shape: {final_latents.shape}")
print(" [LOG] Geração de latentes concluída. Movendo resultado para a CPU.")
final_latents_cpu = final_latents.cpu()
del final_latents, latents_to_refine
# --- FASE 2: DECODIFICAÇÃO EM CHUNKS (TRABALHO DO VAE) ---
print("\n[LOG] FASE 2: Decodificação de Latentes (VAE na GPU)")
self._set_decode_environment()
total_latents = final_latents_cpu.shape[2]
if total_latents <= 9:
print(f" [LOG] Detecção: Vídeo curto ({total_latents} latentes). Usando decodificação direta.")
pixel_tensor = vae_manager_singleton.decode(
final_latents_cpu.to(self.device),
decode_timestep=float(self.config.get("decode_timestep", 0.05))
).cpu()
else:
print(f" [LOG] Detecção: Vídeo longo ({total_latents} latentes). Ativando modo de janela deslizante.")
sobreposicao = 2
tamanho_base = (total_latents - 1) // 3
pontos_de_corte = [
(0, tamanho_base + sobreposicao),
(tamanho_base - sobreposicao, (2 * tamanho_base) + sobreposicao),
((2 * tamanho_base) - sobreposicao, total_latents)
]
pixel_chunks_list = []
for i, (start, end) in enumerate(pontos_de_corte):
latent_chunk = final_latents_cpu[:, :, start:end, :, :]
print(f" -> Decodificando Grupo {i+1} (latentes {start} a {end-1}), shape: {latent_chunk.shape}")
pixel_chunk = vae_manager_singleton.decode(
latent_chunk.to(self.device),
decode_timestep=float(self.config.get("decode_timestep", 0.05))
)
pixel_chunks_list.append(pixel_chunk.cpu())
torch.cuda.empty_cache()
print(" [LOG] Costurando os vídeos decodificados...")
frames_p1 = tamanho_base * 8
parte1 = pixel_chunks_list[0][:, :, :frames_p1, :, :]
descarte_inicio_p2 = sobreposicao * 8
frames_p2 = tamanho_base * 8
parte2 = pixel_chunks_list[1][:, :, descarte_inicio_p2 : descarte_inicio_p2 + frames_p2, :, :]
descarte_inicio_p3 = sobreposicao * 8
parte3 = pixel_chunks_list[2][:, :, descarte_inicio_p3:, :, :]
pixel_tensor = torch.cat([parte1, parte2, parte3], dim=2)
print(f"\n[LOG] [SUCESSO] Tensor de pixels final montado na CPU com shape: {pixel_tensor.shape}")
# --- FASE 3: SALVAMENTO E RESTAURAÇÃO DO AMBIENTE ---
print("\n[LOG] FASE 3: Salvamento e Restauração do Ambiente da GPU")
video_path_out = self._save_video_from_tensor(pixel_tensor, "refined_video_final", used_seed, temp_dir)
latents_path_out = self._save_latents_to_disk(final_latents_cpu, "latents_refined_final", used_seed)
print(" [LOG] Tarefa concluída. Restaurando ambiente de GERAÇÃO na GPU para a próxima execução...")
self._set_generation_environment()
print(" [LOG] Liberando tensores finais da memória da CPU.")
del pixel_tensor, final_latents_cpu
self._finalize()
print("\n======================================================================")
print("============ [SUCCESS] ETAPA 2 Concluída com Sucesso =============")
print("======================================================================\n")
return video_path_out, latents_path_out
def encode_latents_to_mp4(self, latents_path: str, fps: int = int(DEFAULT_FPS)) -> str:
"""Decodifica um tensor de latentes salvo e o salva como um vídeo MP4."""
latents = torch.load(latents_path)
temp_dir = tempfile.mkdtemp(prefix="ltxv_enc_")
self._register_tmp_dir(temp_dir)
seed = random.randint(0, 99999) # Seed apenas para nome do arquivo
try:
chunks = self._split_latents_with_overlap(latents)
pixel_chunks = []
with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
for chunk in chunks:
if chunk.shape[2] == 0: continue
pixel_chunk = vae_manager_singleton.decode(chunk.to(self.device), decode_timestep=float(self.config.get("decode_timestep", 0.05)))
pixel_chunks.append(pixel_chunk)
final_pixel_tensor = self._merge_chunks_with_overlap(pixel_chunks)
final_video_path = self._save_video_from_tensor(final_pixel_tensor, f"final_video_{seed}", seed, temp_dir, fps=fps)
return final_video_path
except Exception as e:
print(f"[ERROR] Falha ao encodar latentes para MP4: {e}")
traceback.print_exc()
raise
finally:
self._finalize()
# --------------------------------------------------------------------------
# --- Métodos Internos e Auxiliares ---
# --------------------------------------------------------------------------
def _finalize(self):
"""Limpa a memória da GPU e os diretórios temporários."""
if LTXV_DEBUG:
print("[DEBUG] Finalize: iniciando limpeza...")
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
# Limpa todos os diretórios temporários registrados
for d in list(self._tmp_dirs):
shutil.rmtree(d, ignore_errors=True)
self._tmp_dirs.remove(d)
if LTXV_DEBUG:
print(f"[DEBUG] Diretório temporário removido: {d}")
def _load_config(self, config_filename: str) -> Dict:
"""Carrega o arquivo de configuração YAML."""
config_path = LTX_VIDEO_REPO_DIR / "configs" / config_filename
print(f"[INFO] Carregando configuração de: {config_path}")
with open(config_path, "r") as file:
return yaml.safe_load(file)
def _load_models_from_hub(self):
"""Baixa e cria as instâncias da pipeline e do upsampler."""
t0 = time.perf_counter()
LTX_REPO = "Lightricks/LTX-Video"
print("[INFO] Baixando checkpoint principal...")
self.config["checkpoint_path"] = hf_hub_download(
repo_id=LTX_REPO, filename=self.config["checkpoint_path"],
token=os.getenv("HF_TOKEN")
)
print(f"[INFO] Checkpoint principal em: {self.config['checkpoint_path']}")
print("[INFO] Construindo pipeline...")
pipeline = create_ltx_video_pipeline(
ckpt_path=self.config["checkpoint_path"],
precision=self.config["precision"],
text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"],
sampler=self.config["sampler"],
device="cpu", # Carrega em CPU primeiro
enhance_prompt=False
)
print("[INFO] Pipeline construída.")
latent_upsampler = None
if self.config.get("spatial_upscaler_model_path"):
print("[INFO] Baixando upscaler espacial...")
self.config["spatial_upscaler_model_path"] = hf_hub_download(
repo_id=LTX_REPO, filename=self.config["spatial_upscaler_model_path"],
token=os.getenv("HF_TOKEN")
)
print(f"[INFO] Upscaler em: {self.config['spatial_upscaler_model_path']}")
print("[INFO] Construindo latent_upsampler...")
latent_upsampler = create_latent_upsampler(self.config["spatial_upscaler_model_path"], device="cpu")
print("[INFO] Latent upsampler construído.")
print(f"[INFO] Carregamento de modelos concluído em {time.perf_counter()-t0:.2f}s")
return pipeline, latent_upsampler
def _move_models_to_device(self):
"""Move os modelos carregados para o dispositivo de computação (GPU/CPU)."""
print(f"[INFO] Movendo modelos para o dispositivo: {self.device}")
self.pipeline.to(self.device)
if self.latent_upsampler:
self.latent_upsampler.to(self.device)
def _get_precision_dtype(self) -> torch.dtype:
"""Determina o dtype para autocast com base na configuração de precisão."""
prec = str(self.config.get("precision", "")).lower()
if prec in ["float8_e4m3fn", "bfloat16"]:
return torch.bfloat16
elif prec == "mixed_precision":
return torch.float16
return torch.float32
@torch.no_grad()
def _upsample_and_filter_latents(self, latents: torch.Tensor) -> torch.Tensor:
"""Aplica o upsample espacial e o filtro AdaIN aos latentes."""
if not self.latent_upsampler:
raise ValueError("Latent Upsampler não está carregado para a operação de upscale.")
latents_unnormalized = un_normalize_latents(latents, self.pipeline.vae, vae_per_channel_normalize=True)
upsampled_latents_unnormalized = self.latent_upsampler(latents_unnormalized)
upsampled_latents_normalized = normalize_latents(upsampled_latents_unnormalized, self.pipeline.vae, vae_per_channel_normalize=True)
# Filtro AdaIN para manter consistência de cor/estilo com o vídeo de baixa resolução
return adain_filter_latent(latents=upsampled_latents_normalized, reference_latents=latents)
def _prepare_conditioning_tensor_from_path(self, filepath: str, height: int, width: int, padding: Tuple) -> torch.Tensor:
"""Carrega uma imagem, redimensiona, aplica padding e move para o dispositivo."""
tensor = self._load_image_to_tensor_with_resize_and_crop(filepath, height, width)
tensor = F.pad(tensor, padding)
return tensor.to(self.device, dtype=self.runtime_autocast_dtype)
def _calculate_downscaled_dims(self, height: int, width: int) -> Tuple[int, int]:
"""Calcula as dimensões para o primeiro passo (baixa resolução)."""
height_padded = ((height - 1) // 8 + 1) * 8
width_padded = ((width - 1) // 8 + 1) * 8
downscale_factor = self.config.get("downscale_factor", 0.6666666)
vae_scale_factor = self.pipeline.vae_scale_factor
target_w = int(width_padded * downscale_factor)
downscaled_width = target_w - (target_w % vae_scale_factor)
target_h = int(height_padded * downscale_factor)
downscaled_height = target_h - (target_h % vae_scale_factor)
return downscaled_height, downscaled_width
def _split_latents_with_overlap(self, latents: torch.Tensor, overlap: int = 1) -> List[torch.Tensor]:
"""Divide um tensor de latentes em dois chunks com sobreposição."""
total_frames = latents.shape[2]
if total_frames <= overlap:
return [latents]
mid_point = max(overlap, total_frames // 2)
chunk1 = latents[:, :, :mid_point, :, :]
# O segundo chunk começa 'overlap' frames antes para criar a sobreposição
chunk2 = latents[:, :, mid_point - overlap:, :, :]
return [c for c in [chunk1, chunk2] if c.shape[2] > 0]
def _merge_chunks_with_overlap(self, chunks: List[torch.Tensor], overlap: int = 1) -> torch.Tensor:
"""Junta uma lista de chunks, removendo a sobreposição."""
if not chunks:
return torch.empty(0)
if len(chunks) == 1:
return chunks[0]
# Pega o primeiro chunk sem o frame de sobreposição final
merged_list = [chunks[0][:, :, :-overlap, :, :]]
# Adiciona os chunks restantes
merged_list.extend(chunks[1:])
return torch.cat(merged_list, dim=2)
def _save_latents_to_disk(self, latents_tensor: torch.Tensor, base_filename: str, seed: int) -> str:
"""Salva um tensor de latentes em um arquivo .pt."""
latents_cpu = latents_tensor.detach().to("cpu")
tensor_path = RESULTS_DIR / f"{base_filename}_{seed}.pt"
torch.save(latents_cpu, tensor_path)
if LTXV_DEBUG:
print(f"[DEBUG] Latentes salvos em: {tensor_path}")
return str(tensor_path)
def _save_video_from_tensor(self, pixel_tensor: torch.Tensor, base_filename: str, seed: int, temp_dir: str, fps: int = int(DEFAULT_FPS)) -> str:
"""Salva um tensor de pixels como um arquivo de vídeo MP4."""
temp_path = os.path.join(temp_dir, f"{base_filename}_{seed}.mp4")
video_encode_tool_singleton.save_video_from_tensor(pixel_tensor, temp_path, fps=fps)
final_path = RESULTS_DIR / f"{base_filename}_{seed}.mp4"
shutil.move(temp_path, final_path)
print(f"[INFO] Vídeo final salvo em: {final_path}")
return str(final_path)
def _seed_everething(self, seed: int):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
if torch.backends.mps.is_available():
torch.mps.manual_seed(seed)
def _register_tmp_dir(self, dir_path: str):
"""Registra um diretório temporário para limpeza posterior."""
if dir_path and os.path.isdir(dir_path):
self._tmp_dirs.add(dir_path)
if LTXV_DEBUG:
print(f"[DEBUG] Diretório temporário registrado: {dir_path}")
# ==============================================================================
# 4. INSTANCIAÇÃO E PONTO DE ENTRADA (Exemplo)
# ==============================================================================
print("Criando instância do VideoService. O carregamento do modelo começará agora...")
video_generation_service = VideoService()
print("Instância do VideoService pronta para uso.")