#15초 제한 설정되어있음/ 해상도 x2 사용금지(화질저하) import os import subprocess import sys # Disable torch.compile / dynamo before any torch import os.environ["TORCH_COMPILE_DISABLE"] = "1" os.environ["TORCHDYNAMO_DISABLE"] = "1" # Install xformers for memory-efficient attention subprocess.run([sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"], check=False) # Install video preprocessing dependencies subprocess.run([sys.executable, "-m", "pip", "install", "dwpose", "onnxruntime-gpu", "imageio[ffmpeg]", "scikit-image", "opencv-python-headless", "decord", "num2words"], check=False) # Ensure num2words is installed (required by SmolVLMProcessor) subprocess.run([sys.executable, "-m", "pip", "install", "num2words"], check=True) # Reinstall torchaudio to match the torch CUDA version on this space. # controlnet_aux or other deps can pull in a CPU-only torchaudio that conflicts # with the pre-installed CUDA torch, causing "undefined symbol" errors. _tv = subprocess.run([sys.executable, "-c", "import torch; print(torch.__version__)"], capture_output=True, text=True) if _tv.returncode == 0: _full_ver = _tv.stdout.strip() # Extract CUDA suffix if present (e.g. "2.7.0+cu124" -> "cu124") _cuda_suffix = _full_ver.split("+")[-1] if "+" in _full_ver else "cu124" _base_ver = _full_ver.split("+")[0] print(f"Detected torch {_full_ver}, reinstalling matching torchaudio...") subprocess.run([ sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", f"torchaudio=={_base_ver}", "--index-url", f"https://download.pytorch.org/whl/{_cuda_suffix}", ], check=False) # Clone LTX-2 repo and install packages LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git" LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2") LTX_COMPATIBLE_COMMIT = "ae855f8" # Pin to 2026-03-11 (compatible API) if not os.path.exists(LTX_REPO_DIR): print(f"Cloning {LTX_REPO_URL} at commit {LTX_COMPATIBLE_COMMIT}...") subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True) subprocess.run(["git", "-C", LTX_REPO_DIR, "checkout", LTX_COMPATIBLE_COMMIT], check=True) print("Installing ltx-core and ltx-pipelines from cloned repo...") subprocess.run( [sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-core"), "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")], check=True, ) sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src")) sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src")) import logging import random import tempfile from pathlib import Path import torch torch._dynamo.config.suppress_errors = True torch._dynamo.config.disable = True import spaces import gradio as gr import numpy as np from huggingface_hub import hf_hub_download, snapshot_download from safetensors import safe_open from ltx_core.components.diffusion_steps import EulerDiffusionStep from ltx_core.components.noisers import GaussianNoiser from ltx_core.conditioning import ( ConditioningItem, ConditioningItemAttentionStrengthWrapper, VideoConditionByReferenceLatent, ) from ltx_core.loader import LoraPathStrengthAndSDOps, LTXV_LORA_COMFY_RENAMING_MAP from ltx_core.model.audio_vae import decode_audio as vae_decode_audio from ltx_core.model.audio_vae import encode_audio as vae_encode_audio from ltx_core.model.upsampler import upsample_video from ltx_core.model.video_vae import TilingConfig, VideoEncoder, get_video_chunks_number from ltx_core.model.video_vae import decode_video as vae_decode_video from ltx_core.quantization import QuantizationPolicy from ltx_core.types import Audio, AudioLatentShape, LatentState, VideoLatentShape, VideoPixelShape from ltx_pipelines.utils import ModelLedger, euler_denoising_loop from ltx_pipelines.utils.args import ImageConditioningInput from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES from ltx_pipelines.utils.helpers import ( assert_resolution, cleanup_memory, combined_image_conditionings, denoise_audio_video, denoise_video_only, encode_prompts, get_device, simple_denoising_func, ) from ltx_pipelines.utils.media_io import ( decode_audio_from_file, encode_video, load_video_conditioning, ) from ltx_pipelines.utils.types import PipelineComponents # Force-patch xformers attention into the LTX attention module. from ltx_core.model.transformer import attention as _attn_mod print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}") try: from xformers.ops import memory_efficient_attention as _mea _attn_mod.memory_efficient_attention = _mea print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}") except Exception as e: print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}") logging.getLogger().setLevel(logging.INFO) # ───────────────────────────────────────────────────────────────────────────── # Video Preprocessing: Strip appearance, keep structure # ───────────────────────────────────────────────────────────────────────────── import imageio import cv2 from PIL import Image from dwpose import DwposeDetector _pose_processor = None _depth_processor = None def _get_pose_processor(): global _pose_processor if _pose_processor is None: _pose_processor = DwposeDetector.from_pretrained_default() print("[Preprocess] DWPose processor loaded") return _pose_processor def _get_depth_processor(): """Placeholder — uses simple Laplacian edge-based depth approximation via OpenCV.""" global _depth_processor if _depth_processor is None: _depth_processor = "cv2" # sentinel — we use cv2 directly print("[Preprocess] CV2-based depth processor loaded") return _depth_processor def load_video_frames(video_path: str) -> list[np.ndarray]: """Load video frames as list of HWC uint8 numpy arrays.""" frames = [] with imageio.get_reader(video_path) as reader: for frame in reader: frames.append(frame) return frames def write_video_mp4(frames_float_01: list[np.ndarray], fps: float, out_path: str) -> str: """Write float [0,1] frames to mp4.""" frames_uint8 = [(f * 255).astype(np.uint8) for f in frames_float_01] with imageio.get_writer(out_path, fps=fps, macro_block_size=1) as writer: for fr in frames_uint8: writer.append_data(fr) return out_path def extract_first_frame(video_path: str) -> str: """Extract first frame as a temp PNG file, return path.""" frames = load_video_frames(video_path) if not frames: raise ValueError("No frames in video") out_path = tempfile.mktemp(suffix=".png") Image.fromarray(frames[0]).save(out_path) return out_path def preprocess_video_pose(frames: list[np.ndarray], width: int, height: int) -> list[np.ndarray]: """Extract DWPose skeletons from each frame. Returns float [0,1] frames. NOTE: We invert the pose image (white background, dark skeleton) so that the Union Control model does not interpret the predominantly-black canvas as a "dark lighting" cue. The control signal (skeleton structure) is preserved; only the brightness polarity changes. """ processor = _get_pose_processor() result = [] for frame in frames: pil = Image.fromarray(frame.astype(np.uint8)).convert("RGB") pose_img = processor(pil, include_body=True, include_hand=True, include_face=True) if not isinstance(pose_img, Image.Image): pose_img = Image.fromarray(np.array(pose_img).astype(np.uint8)) pose_img = pose_img.convert("RGB").resize((width, height), Image.BILINEAR) arr = np.array(pose_img).astype(np.float32) / 255.0 # Invert: white bg + dark skeleton — prevents model from reading # the black canvas as a lighting condition arr = 1.0 - arr result.append(arr) return result def preprocess_video_canny(frames: list[np.ndarray], width: int, height: int, low_threshold: int = 50, high_threshold: int = 100) -> list[np.ndarray]: """Extract Canny edges from each frame. Returns float [0,1] frames.""" result = [] for frame in frames: # Resize first resized = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA) gray = cv2.cvtColor(resized, cv2.COLOR_RGB2GRAY) edges = cv2.Canny(gray, low_threshold, high_threshold) # Convert single-channel to 3-channel edges_3ch = np.stack([edges, edges, edges], axis=-1) result.append(edges_3ch.astype(np.float32) / 255.0) return result def preprocess_video_depth(frames: list[np.ndarray], width: int, height: int) -> list[np.ndarray]: """Estimate depth-like maps from each frame using Laplacian gradient magnitude. This is a fast approximation — for true depth, use MiDaS externally.""" result = [] for frame in frames: resized = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA) gray = cv2.cvtColor(resized, cv2.COLOR_RGB2GRAY).astype(np.float32) # Laplacian gives edge/gradient info that approximates depth discontinuities lap = np.abs(cv2.Laplacian(gray, cv2.CV_32F, ksize=5)) # Normalize to [0, 1] lap = lap / (lap.max() + 1e-8) depth_3ch = np.stack([lap, lap, lap], axis=-1) result.append(depth_3ch) return result def preprocess_conditioning_video( video_path: str, mode: str, width: int, height: int, num_frames: int, fps: float, ) -> tuple[str, str]: """ Preprocess a video for conditioning. Strips appearance, keeps structure. Returns: (conditioning_mp4_path, first_frame_png_path) """ frames = load_video_frames(video_path) if not frames: raise ValueError("No frames decoded from video") # Trim to num_frames frames = frames[:num_frames] # Save first frame (original appearance) for image conditioning first_png = tempfile.mktemp(suffix=".png") Image.fromarray(frames[0]).save(first_png) # Process based on mode if mode == "Pose (DWPose)": processed = preprocess_video_pose(frames, width, height) elif mode == "Canny Edge": processed = preprocess_video_canny(frames, width, height) elif mode == "Depth (Laplacian)": processed = preprocess_video_depth(frames, width, height) else: # "Raw" mode — no preprocessing processed = [f.astype(np.float32) / 255.0 for f in frames] cond_mp4 = tempfile.mktemp(suffix=".mp4") write_video_mp4(processed, fps=fps, out_path=cond_mp4) return cond_mp4, first_png # ───────────────────────────────────────────────────────────────────────────── # Helper: read reference downscale factor from IC-LoRA metadata # ───────────────────────────────────────────────────────────────────────────── def _read_lora_reference_downscale_factor(lora_path: str) -> int: try: with safe_open(lora_path, framework="pt") as f: metadata = f.metadata() or {} return int(metadata.get("reference_downscale_factor", 1)) except Exception as e: logging.warning(f"Failed to read metadata from LoRA file '{lora_path}': {e}") return 1 # ───────────────────────────────────────────────────────────────────────────── # Unified Pipeline: Distilled + Audio + IC-LoRA Video-to-Video # ───────────────────────────────────────────────────────────────────────────── class LTX23UnifiedPipeline: """ Unified LTX-2.3 pipeline supporting all generation modes: • Text-to-Video • Image-to-Video (first-frame conditioning) • Audio-to-Video (lip-sync / BGM conditioning with external audio) • Video-to-Video (IC-LoRA reference video conditioning) • Any combination of the above Architecture: - stage_1_model_ledger: transformer WITH IC-LoRA fused (used for Stage 1) - stage_2_model_ledger: transformer WITHOUT IC-LoRA (used for Stage 2 upsampling) - When no IC-LoRA is provided, both stages use the same base model. """ def __init__( self, distilled_checkpoint_path: str, spatial_upsampler_path: str, gemma_root: str, ic_loras: list[LoraPathStrengthAndSDOps] | None = None, device: torch.device | None = None, quantization: QuantizationPolicy | None = None, reference_downscale_factor: int | None = None, ): self.device = device or get_device() self.dtype = torch.bfloat16 ic_loras = ic_loras or [] self.has_ic_lora = len(ic_loras) > 0 # Stage 1: transformer with IC-LoRA (if provided) self.stage_1_model_ledger = ModelLedger( dtype=self.dtype, device=self.device, checkpoint_path=distilled_checkpoint_path, spatial_upsampler_path=spatial_upsampler_path, gemma_root_path=gemma_root, loras=ic_loras, quantization=quantization, ) if self.has_ic_lora: # Stage 2 needs a separate transformer WITHOUT IC-LoRA self.stage_2_model_ledger = ModelLedger( dtype=self.dtype, device=self.device, checkpoint_path=distilled_checkpoint_path, spatial_upsampler_path=spatial_upsampler_path, gemma_root_path=gemma_root, loras=[], quantization=quantization, ) else: # No IC-LoRA: share a single ledger for both stages (saves ~half VRAM) self.stage_2_model_ledger = self.stage_1_model_ledger self.pipeline_components = PipelineComponents( dtype=self.dtype, device=self.device, ) # Reference downscale factor: explicit value takes priority, # otherwise read from IC-LoRA metadata, otherwise default to 1. if reference_downscale_factor is not None: self.reference_downscale_factor = reference_downscale_factor else: self.reference_downscale_factor = 1 for lora in ic_loras: scale = _read_lora_reference_downscale_factor(lora.path) if scale != 1: if self.reference_downscale_factor not in (1, scale): raise ValueError( f"Conflicting reference_downscale_factor: " f"already {self.reference_downscale_factor}, got {scale}" ) self.reference_downscale_factor = scale logging.info(f"[Pipeline] reference_downscale_factor={self.reference_downscale_factor}") # ── Video reference conditioning (from ICLoraPipeline) ─────────────── def _create_ic_conditionings( self, video_conditioning: list[tuple[str, float]], height: int, width: int, num_frames: int, video_encoder: VideoEncoder, conditioning_strength: float = 1.0, ) -> list[ConditioningItem]: """Create IC-LoRA video reference conditioning items.""" conditionings: list[ConditioningItem] = [] scale = self.reference_downscale_factor ref_height = height // scale ref_width = width // scale for video_path, strength in video_conditioning: video = load_video_conditioning( video_path=video_path, height=ref_height, width=ref_width, frame_cap=num_frames, dtype=self.dtype, device=self.device, ) encoded_video = video_encoder(video) cond = VideoConditionByReferenceLatent( latent=encoded_video, downscale_factor=scale, strength=strength, ) if conditioning_strength < 1.0: cond = ConditioningItemAttentionStrengthWrapper( cond, attention_mask=conditioning_strength ) conditionings.append(cond) if conditionings: logging.info(f"[IC-LoRA] Added {len(conditionings)} video conditioning(s)") return conditionings # ── Main generation entry point ────────────────────────────────────── def __call__( self, prompt: str, seed: int, height: int, width: int, num_frames: int, frame_rate: float, images: list[ImageConditioningInput], audio_path: str | None = None, video_conditioning: list[tuple[str, float]] | None = None, tiling_config: TilingConfig | None = None, enhance_prompt: bool = False, conditioning_strength: float = 1.0, ): """ Generate video with any combination of conditioning. Args: audio_path: Path to external audio file for lipsync/BGM conditioning. video_conditioning: List of (path, strength) tuples for IC-LoRA V2V. conditioning_strength: Scale for IC-LoRA attention influence [0, 1]. Returns: Tuple of (decoded_video_iterator, Audio). """ assert_resolution(height=height, width=width, is_two_stage=True) prompt += " synchronized lipsync" # Ensure the prompt includes lighting context to prevent dark outputs. # When the prompt is minimal, the model can inherit "darkness" from the # pose conditioning video's latent. Adding explicit brightness cues # counteracts this. _lighting_keywords = ["bright", "light", "lit", "illuminat", "sunny", "daylight", "indoor lighting"] if not any(kw in prompt.lower() for kw in _lighting_keywords): prompt += ", well-lit, natural lighting" has_audio = audio_path is not None has_video_cond = bool(video_conditioning) generator = torch.Generator(device=self.device).manual_seed(seed) noiser = GaussianNoiser(generator=generator) stepper = EulerDiffusionStep() dtype = torch.bfloat16 # ── Encode text prompt ─────────────────────────────────────────── # Use stage_1 ledger for prompt encoding (has text encoder) (ctx_p,) = encode_prompts( [prompt], self.stage_1_model_ledger, enhance_first_prompt=enhance_prompt, enhance_prompt_image=images[0].path if len(images) > 0 else None, ) video_context, audio_context = ctx_p.video_encoding, ctx_p.audio_encoding # ── Encode external audio (if provided) ───────────────────────── encoded_audio_latent = None decoded_audio_for_output = None if has_audio: video_duration = num_frames / frame_rate decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration) if decoded_audio is None: raise ValueError(f"Could not extract audio stream from {audio_path}") encoded_audio_latent = vae_encode_audio( decoded_audio, self.stage_1_model_ledger.audio_encoder() ) audio_shape = AudioLatentShape.from_duration( batch=1, duration=video_duration, channels=8, mel_bins=16 ) expected_frames = audio_shape.frames actual_frames = encoded_audio_latent.shape[2] if actual_frames > expected_frames: encoded_audio_latent = encoded_audio_latent[:, :, :expected_frames, :] elif actual_frames < expected_frames: pad = torch.zeros( encoded_audio_latent.shape[0], encoded_audio_latent.shape[1], expected_frames - actual_frames, encoded_audio_latent.shape[3], device=encoded_audio_latent.device, dtype=encoded_audio_latent.dtype, ) encoded_audio_latent = torch.cat([encoded_audio_latent, pad], dim=2) decoded_audio_for_output = Audio( waveform=decoded_audio.waveform.squeeze(0), sampling_rate=decoded_audio.sampling_rate, ) # ── Build conditionings for Stage 1 ────────────────────────────── # Use stage_1 video encoder (has IC-LoRA context) video_encoder = self.stage_1_model_ledger.video_encoder() stage_1_output_shape = VideoPixelShape( batch=1, frames=num_frames, width=width // 2, height=height // 2, fps=frame_rate, ) # Image conditionings stage_1_conditionings = combined_image_conditionings( images=images, height=stage_1_output_shape.height, width=stage_1_output_shape.width, video_encoder=video_encoder, dtype=dtype, device=self.device, ) # IC-LoRA video reference conditionings if has_video_cond: ic_conds = self._create_ic_conditionings( video_conditioning=video_conditioning, height=stage_1_output_shape.height, width=stage_1_output_shape.width, num_frames=num_frames, video_encoder=video_encoder, conditioning_strength=conditioning_strength, ) stage_1_conditionings.extend(ic_conds) # ── Stage 1: Low-res generation ────────────────────────────────── transformer = self.stage_1_model_ledger.transformer() stage_1_sigmas = torch.Tensor(DISTILLED_SIGMA_VALUES).to(self.device) def denoising_loop(sigmas, video_state, audio_state, stepper): return euler_denoising_loop( sigmas=sigmas, video_state=video_state, audio_state=audio_state, stepper=stepper, denoise_fn=simple_denoising_func( video_context=video_context, audio_context=audio_context, transformer=transformer, ), ) if has_audio: # Audio mode: denoise video only, use external audio latent video_state = denoise_video_only( output_shape=stage_1_output_shape, conditionings=stage_1_conditionings, noiser=noiser, sigmas=stage_1_sigmas, stepper=stepper, denoising_loop_fn=denoising_loop, components=self.pipeline_components, dtype=dtype, device=self.device, initial_audio_latent=encoded_audio_latent, ) audio_state = None # we'll use the original audio for output else: # Standard / IC-only mode: denoise both audio and video video_state, audio_state = denoise_audio_video( output_shape=stage_1_output_shape, conditionings=stage_1_conditionings, noiser=noiser, sigmas=stage_1_sigmas, stepper=stepper, denoising_loop_fn=denoising_loop, components=self.pipeline_components, dtype=dtype, device=self.device, ) torch.cuda.synchronize() cleanup_memory() # ── Stage 2: Upsample + Refine ────────────────────────────────── upscaled_video_latent = upsample_video( latent=video_state.latent[:1], video_encoder=video_encoder, upsampler=self.stage_2_model_ledger.spatial_upsampler(), ) torch.cuda.synchronize() cleanup_memory() # Stage 2 uses the transformer WITHOUT IC-LoRA transformer_s2 = self.stage_2_model_ledger.transformer() stage_2_sigmas = torch.Tensor(STAGE_2_DISTILLED_SIGMA_VALUES).to(self.device) def denoising_loop_s2(sigmas, video_state, audio_state, stepper): return euler_denoising_loop( sigmas=sigmas, video_state=video_state, audio_state=audio_state, stepper=stepper, denoise_fn=simple_denoising_func( video_context=video_context, audio_context=audio_context, transformer=transformer_s2, ), ) stage_2_output_shape = VideoPixelShape( batch=1, frames=num_frames, width=width, height=height, fps=frame_rate, ) stage_2_conditionings = combined_image_conditionings( images=images, height=stage_2_output_shape.height, width=stage_2_output_shape.width, video_encoder=video_encoder, dtype=dtype, device=self.device, ) if has_audio: video_state = denoise_video_only( output_shape=stage_2_output_shape, conditionings=stage_2_conditionings, noiser=noiser, sigmas=stage_2_sigmas, stepper=stepper, denoising_loop_fn=denoising_loop_s2, components=self.pipeline_components, dtype=dtype, device=self.device, noise_scale=stage_2_sigmas[0], initial_video_latent=upscaled_video_latent, initial_audio_latent=encoded_audio_latent, ) audio_state = None else: video_state, audio_state = denoise_audio_video( output_shape=stage_2_output_shape, conditionings=stage_2_conditionings, noiser=noiser, sigmas=stage_2_sigmas, stepper=stepper, denoising_loop_fn=denoising_loop_s2, components=self.pipeline_components, dtype=dtype, device=self.device, noise_scale=stage_2_sigmas[0], initial_video_latent=upscaled_video_latent, initial_audio_latent=audio_state.latent, ) torch.cuda.synchronize() del transformer, transformer_s2, video_encoder cleanup_memory() # ── Decode ─────────────────────────────────────────────────────── decoded_video = vae_decode_video( video_state.latent, self.stage_2_model_ledger.video_decoder(), tiling_config, generator, ) if has_audio: output_audio = decoded_audio_for_output else: output_audio = vae_decode_audio( audio_state.latent, self.stage_2_model_ledger.audio_decoder(), self.stage_2_model_ledger.vocoder(), ) return decoded_video, output_audio # ───────────────────────────────────────────────────────────────────────────── # Constants # ───────────────────────────────────────────────────────────────────────────── MAX_SEED = np.iinfo(np.int32).max DEFAULT_PROMPT = ( "An astronaut hatches from a fragile egg on the surface of the Moon, " "the shell cracking and peeling apart in gentle low-gravity motion." ) DEFAULT_FRAME_RATE = 24.0 RESOLUTIONS = { "high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)}, "low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)}, } # Available IC-LoRA models IC_LORA_OPTIONS = { "Union Control (Depth + Edge)": { "repo": "Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control", "filename": "ltx-2.3-22b-ic-lora-union-control-ref0.5.safetensors", }, "Motion Track Control": { "repo": "Lightricks/LTX-2.3-22b-IC-LoRA-Motion-Track-Control", "filename": "ltx-2.3-22b-ic-lora-motion-track-control-ref0.5.safetensors", }, } DEFAULT_IC_LORA = "Union Control (Depth + Edge)" # ───────────────────────────────────────────────────────────────────────────── # Download Models # ───────────────────────────────────────────────────────────────────────────── LTX_MODEL_REPO = "Lightricks/LTX-2.3" CHECKPOINT_PATH = "linoyts/ltx-2.3-22b-fused-union-control" #ltx 2.3 with fused union control lora because it breaks on quantization otherwise GEMMA_REPO = "google/gemma-3-12b-it-qat-q4_0-unquantized" print("=" * 80) print("Downloading LTX-2.3 distilled model + Gemma + IC-LoRA...") print("=" * 80) checkpoint_path = hf_hub_download( # repo_id=LTX_MODEL_REPO, filename="ltx-2.3-22b-distilled.safetensors" repo_id=CHECKPOINT_PATH, filename="ltx-2.3-22b-fused-union-control.safetensors" ) spatial_upsampler_path = hf_hub_download( repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors" ) gemma_root = snapshot_download(repo_id=GEMMA_REPO, token=os.environ.get("HF_TOKEN")) # Download default IC-LoRA default_lora_info = IC_LORA_OPTIONS[DEFAULT_IC_LORA] default_ic_lora_path = hf_hub_download( repo_id=default_lora_info["repo"], filename=default_lora_info["filename"] ) print(f"Checkpoint: {checkpoint_path}") print(f"Spatial upsampler: {spatial_upsampler_path}") print(f"Gemma root: {gemma_root}") print(f"IC-LoRA: {default_ic_lora_path}") # ───────────────────────────────────────────────────────────────────────────── # Initialize Pipeline # ───────────────────────────────────────────────────────────────────────────── ic_loras = [ LoraPathStrengthAndSDOps(default_ic_lora_path, 1.0, LTXV_LORA_COMFY_RENAMING_MAP) ] pipeline = LTX23UnifiedPipeline( distilled_checkpoint_path=checkpoint_path, spatial_upsampler_path=spatial_upsampler_path, gemma_root=gemma_root, # ic_loras=ic_loras, # LoRA already fused into checkpoint quantization=QuantizationPolicy.fp8_cast(), # Union Control IC-LoRA was trained with reference videos at half resolution. # Set explicitly so it works both with separate LoRA and fused checkpoints. reference_downscale_factor=2, ) # Preload all models for ZeroGPU tensor packing. print("Preloading all models (including Gemma, Audio encoders)...") # Shared ledger: preload once. Separate ledgers (IC-LoRA): preload both. _ledger_1 = pipeline.stage_1_model_ledger _ledger_2 = pipeline.stage_2_model_ledger _shared = _ledger_1 is _ledger_2 # Stage 1 models (with IC-LoRA if loaded) _s1_transformer = _ledger_1.transformer() _s1_video_encoder = _ledger_1.video_encoder() _s1_text_encoder = _ledger_1.text_encoder() _s1_embeddings = _ledger_1.gemma_embeddings_processor() _s1_audio_encoder = _ledger_1.audio_encoder() _ledger_1.transformer = lambda: _s1_transformer _ledger_1.video_encoder = lambda: _s1_video_encoder _ledger_1.text_encoder = lambda: _s1_text_encoder _ledger_1.gemma_embeddings_processor = lambda: _s1_embeddings _ledger_1.audio_encoder = lambda: _s1_audio_encoder if _shared: # Single ledger — also preload decoder/upsampler/vocoder on the same object _video_decoder = _ledger_1.video_decoder() _audio_decoder = _ledger_1.audio_decoder() _vocoder = _ledger_1.vocoder() _spatial_upsampler = _ledger_1.spatial_upsampler() _ledger_1.video_decoder = lambda: _video_decoder _ledger_1.audio_decoder = lambda: _audio_decoder _ledger_1.vocoder = lambda: _vocoder _ledger_1.spatial_upsampler = lambda: _spatial_upsampler print(" (single shared ledger — no IC-LoRA)") else: # Stage 2 models (separate transformer without IC-LoRA) _s2_transformer = _ledger_2.transformer() _s2_video_encoder = _ledger_2.video_encoder() _s2_video_decoder = _ledger_2.video_decoder() _s2_audio_decoder = _ledger_2.audio_decoder() _s2_vocoder = _ledger_2.vocoder() _s2_spatial_upsampler = _ledger_2.spatial_upsampler() _s2_text_encoder = _ledger_2.text_encoder() _s2_embeddings = _ledger_2.gemma_embeddings_processor() _s2_audio_encoder = _ledger_2.audio_encoder() _ledger_2.transformer = lambda: _s2_transformer _ledger_2.video_encoder = lambda: _s2_video_encoder _ledger_2.video_decoder = lambda: _s2_video_decoder _ledger_2.audio_decoder = lambda: _s2_audio_decoder _ledger_2.vocoder = lambda: _s2_vocoder _ledger_2.spatial_upsampler = lambda: _s2_spatial_upsampler _ledger_2.text_encoder = lambda: _s2_text_encoder _ledger_2.gemma_embeddings_processor = lambda: _s2_embeddings _ledger_2.audio_encoder = lambda: _s2_audio_encoder print(" (two separate ledgers — IC-LoRA active)") print("All models preloaded!") print("=" * 80) # ───────────────────────────────────────────────────────────────────────────── # UI Helpers # ───────────────────────────────────────────────────────────────────────────── def detect_aspect_ratio(media_path) -> str: """Detect the closest aspect ratio from an image or video.""" if media_path is None: return "16:9" ext = str(media_path).lower().rsplit(".", 1)[-1] if "." in str(media_path) else "" # Try as image first if ext in ("jpg", "jpeg", "png", "bmp", "webp", "gif", "tiff"): import PIL.Image try: with PIL.Image.open(media_path) as img: w, h = img.size except Exception: return "16:9" else: # Try as video try: import av with av.open(str(media_path)) as container: stream = container.streams.video[0] w, h = stream.codec_context.width, stream.codec_context.height except Exception: # Fallback: try as image anyway import PIL.Image try: with PIL.Image.open(media_path) as img: w, h = img.size except Exception: return "16:9" ratio = w / h candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0} return min(candidates, key=lambda k: abs(ratio - candidates[k])) def on_image_upload(image, video, high_res): """Auto-set resolution when image is uploaded.""" media = image if image is not None else video aspect = detect_aspect_ratio(media) tier = "high" if high_res else "low" w, h = RESOLUTIONS[tier][aspect] return gr.update(value=w), gr.update(value=h) def _get_video_duration(video_path) -> float | None: """Get video duration in seconds via ffprobe.""" if video_path is None: return None try: result = subprocess.run( ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", str(video_path)], capture_output=True, text=True, ) return float(result.stdout.strip()) except Exception: return None def on_video_upload(video, image, high_res): """Auto-set resolution and duration when video is uploaded.""" media = video if video is not None else image aspect = detect_aspect_ratio(media) tier = "high" if high_res else "low" w, h = RESOLUTIONS[tier][aspect] # Auto-adjust duration to min(video_length, 10) vid_dur = _get_video_duration(video) if vid_dur is not None: dur = round(min(vid_dur, 15.0), 1) else: dur = 3.0 return gr.update(value=w), gr.update(value=h), gr.update(value=dur) def on_highres_toggle(image, video, high_res): """Update resolution when high-res toggle changes.""" media = image if image is not None else video aspect = detect_aspect_ratio(media) tier = "high" if high_res else "low" w, h = RESOLUTIONS[tier][aspect] return gr.update(value=w), gr.update(value=h) # ───────────────────────────────────────────────────────────────────────────── # Generation # ───────────────────────────────────────────────────────────────────────────── def _extract_audio_from_video(video_path: str) -> str | None: """Extract audio from video as a temp WAV file. Returns None if no audio.""" out_path = tempfile.mktemp(suffix=".wav") try: # Check if video has an audio stream probe = subprocess.run( ["ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=codec_type", "-of", "default=nw=1:nk=1", video_path], capture_output=True, text=True, ) if not probe.stdout.strip(): return None # Extract audio subprocess.run( ["ffmpeg", "-y", "-v", "error", "-i", video_path, "-vn", "-ac", "2", "-ar", "48000", "-c:a", "pcm_s16le", out_path], check=True, ) return out_path except (subprocess.CalledProcessError, FileNotFoundError): return None @spaces.GPU(duration=100) @torch.inference_mode() def generate_video( input_image, input_video, prompt: str = "", duration: float = 3, conditioning_strength: float = 0.85, enhance_prompt: bool = False, use_video_audio: bool = True, seed: int = 42, randomize_seed: bool = True, height: int = 512, width: int = 768, input_audio = None, progress=gr.Progress(track_tqdm=True), ): video_preprocess="Pose (DWPose)" try: torch.cuda.reset_peak_memory_stats() current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed) frame_rate = DEFAULT_FRAME_RATE num_frames = int(duration * frame_rate) + 1 num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1 mode_parts = [] if input_image is not None: mode_parts.append("Image") if input_video is not None: mode_parts.append(f"Video({video_preprocess})") if input_audio is not None: mode_parts.append("Audio") if not mode_parts: mode_parts.append("Text") mode_str = " + ".join(mode_parts) print(f"[{mode_str}] Generating: {height}x{width}, {num_frames} frames " f"({duration}s), seed={current_seed}") # Build image conditionings images = [] if input_image is not None: images = [ImageConditioningInput(path=str(input_image), frame_idx=0, strength=1.0)] # Build video conditionings — preprocess to strip appearance video_conditioning = None if input_video is not None: video_path = str(input_video) if video_preprocess != "Raw (no preprocessing)": print(f"[Preprocess] Running {video_preprocess} on input video...") cond_mp4, first_frame_png = preprocess_conditioning_video( video_path=video_path, mode=video_preprocess, width=int(width) // 2, # Stage 1 operates at half res height=int(height) // 2, num_frames=num_frames, fps=frame_rate, ) video_conditioning = [(cond_mp4, 1.0)] # If no image was provided, use the video's first frame # (original appearance) as the image conditioning if input_image is None: images = [ImageConditioningInput( path=first_frame_png, frame_idx=0, strength=1.0, )] print(f"[Preprocess] Using video first frame as image conditioning") else: # Raw mode — pass video as-is video_conditioning = [(video_path, 1.0)] # If no audio was provided, optionally extract audio from the video if input_audio is None and use_video_audio: extracted_audio = _extract_audio_from_video(video_path) if extracted_audio is not None: input_audio = extracted_audio print(f"[Preprocess] Extracted audio from input video") tiling_config = TilingConfig.default() video_chunks_number = get_video_chunks_number(num_frames, tiling_config) # Truncate prompt to prevent Gemma token overflow (max 1024 tokens ≈ 500 chars) if len(prompt) > 500: prompt = prompt[:500] video, audio = pipeline( prompt=prompt, seed=current_seed, height=int(height), width=int(width), num_frames=num_frames, frame_rate=frame_rate, images=images, audio_path=input_audio, video_conditioning=video_conditioning, tiling_config=tiling_config, enhance_prompt=enhance_prompt, conditioning_strength=conditioning_strength, ) output_path = tempfile.mktemp(suffix=".mp4") encode_video( video=video, fps=frame_rate, audio=audio, output_path=output_path, video_chunks_number=video_chunks_number, ) return str(output_path), current_seed except Exception as e: import traceback print(f"Error: {str(e)}\n{traceback.format_exc()}") return None, current_seed # ───────────────────────────────────────────────────────────────────────────── # SmolVLM2 — Auto-describe motion from reference video # ───────────────────────────────────────────────────────────────────────────── SMOLVLM_MODEL_ID = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct" _vlm_model = None _vlm_processor = None MOTION_PROMPT = """\ Watch this video carefully. Describe ONLY the following: 1. The body movements and gestures (walking, dancing, waving, turning, etc.) 2. Facial expressions and head movements (smiling, nodding, looking around, etc.) 3. The rhythm, speed, and energy of the motion (slow, fast, smooth, jerky, etc.) 4. The overall mood and tone conveyed by the movement Do NOT describe: - What the person/subject looks like (clothing, hair, skin, age, gender) - The background, setting, or environment - Colors, lighting, or visual style - Any objects or props Write a concise, single-paragraph description focused purely on motion and expression.\ """ def _load_vlm(): global _vlm_model, _vlm_processor if _vlm_model is None: from transformers import AutoProcessor, AutoModelForImageTextToText print(f"[SmolVLM] Loading {SMOLVLM_MODEL_ID}...") _vlm_processor = AutoProcessor.from_pretrained( SMOLVLM_MODEL_ID, trust_remote_code=True ) try: _vlm_model = AutoModelForImageTextToText.from_pretrained( SMOLVLM_MODEL_ID, torch_dtype=torch.bfloat16, trust_remote_code=True, _attn_implementation="flash_attention_2", ).to("cuda") except Exception: _vlm_model = AutoModelForImageTextToText.from_pretrained( SMOLVLM_MODEL_ID, torch_dtype=torch.bfloat16, trust_remote_code=True, ).to("cuda") print("[SmolVLM] Model loaded!") return _vlm_model, _vlm_processor @spaces.GPU(duration=60) @torch.inference_mode() def describe_video_motion(video_path, auto_describe=True): """Use SmolVLM2 to generate a motion-only description of a video.""" if video_path is None or not auto_describe: return gr.update() try: model, processor = _load_vlm() messages = [ { "role": "user", "content": [ {"type": "video", "path": str(video_path)}, {"type": "text", "text": MOTION_PROMPT}, ], }, ] inputs = processor.apply_chat_template( messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt", ).to(model.device, dtype=torch.bfloat16) generated_ids = model.generate(**inputs, do_sample=False, max_new_tokens=200) generated_text = processor.batch_decode( generated_ids, skip_special_tokens=True )[0] # Extract only the assistant's response (after the prompt) if "Assistant:" in generated_text: motion_desc = generated_text.split("Assistant:")[-1].strip() else: motion_desc = generated_text.strip() # Clean up any leftover prompt fragments for marker in [MOTION_PROMPT[:40], "Watch this video", "Do NOT describe"]: if marker in motion_desc: motion_desc = motion_desc.split(marker)[0].strip() if motion_desc: print(f"[SmolVLM] Motion description: {motion_desc[:100]}...") return gr.update(value=motion_desc) else: return gr.update() except Exception as e: print(f"[SmolVLM] Error: {e}") return gr.update() # ───────────────────────────────────────────────────────────────────────────── # Gradio UI — LTX 2.3 Sync # ───────────────────────────────────────────────────────────────────────────── css = """ .main-title { text-align: center; margin-bottom: 0.5em; } .generate-btn { min-height: 52px !important; font-size: 1.1em !important; } footer { display: none !important; } video { object-fit: contain !important; } """ purple_citrus = gr.themes.Citrus( primary_hue=gr.themes.colors.purple, secondary_hue=gr.themes.colors.purple, neutral_hue=gr.themes.colors.gray, ) with gr.Blocks(title="LTX 2.3 Sync", css=css, theme=purple_citrus) as demo: gr.Markdown(""" # LTX 2.3 Sync: Fast Character Animation🕺 **Fast Character Animation with LTX 2.3 Distilled**, using [Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control](https://huggingface.co/Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control) with pose estimation & custom audio inputs for precise lipsync and body movement replication ✨ """) # Hidden state — preprocessing is always Pose video_preprocess = gr.State("Pose (DWPose)") with gr.Row(): # ── Left column: inputs ────────────────────────────────────── with gr.Column(scale=1): with gr.Row(): input_image = gr.Image( label="Character reference", type="filepath", ) input_video = gr.Video( label="Motion & audio reference", ) with gr.Row(): with gr.Column(min_width=160): prompt = gr.Textbox( label="Prompt (optional)", info="tip: describe the motion, body posture, facial expressions of the ref video", lines=2, placeholder="the person talks to the camera, making hand gestures", ) duration = gr.Slider( label="Duration (s)", minimum=1.0, maximum=15.0, value=3.0, step=0.5, ) auto_describe = gr.Checkbox( label="Auto-describe motion", value=False, visible=False, info="Use AI to describe the video's motion as a prompt", ) generate_btn = gr.Button( "Generate", variant="primary", size="lg", elem_classes=["generate-btn"], ) with gr.Accordion("Advanced Settings", open=False): enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False) conditioning_strength = gr.Slider( label="V2V Conditioning Strength", info="How closely to follow the reference video's structure", minimum=0.0, maximum=1.0, value=0.85, step=0.05, ) high_res = gr.Checkbox(label="High Resolution (2×)", value=False) use_video_audio = gr.Checkbox( label="Use Audio from Video", value=True, info="Extract the audio track from the motion source video", ) input_audio = gr.Audio( label="Override Audio (optional — replaces video audio)", type="filepath", ) seed = gr.Slider( label="Seed", minimum=0, maximum=MAX_SEED, value=42, step=1, ) randomize_seed = gr.Checkbox(label="Randomize Seed", value=True) with gr.Row(): width = gr.Number(label="Width", value=768, precision=0) height = gr.Number(label="Height", value=512, precision=0) # ── Right column: output ───────────────────────────────────── with gr.Column(scale=1): output_video = gr.Video(label="Result", autoplay=True, height=480) gr.Examples( examples=[ [ "britney-spears-toxic-2004.jpg", "example_2.mp4", "", 3.4, 0.85, False, True, 1824535108, False, 512, 768, ], [ "1 1.jpeg", "1 (2).mp4", "a man speaking while making hand gestures", 3.5, 0.9, False, True, 1723325627, False, 512, 768, ], [ "2 (1).jpeg", "video-5.mp4", "", 6.8, 0.9, False, True, 42, True, 512, 768, ], ], inputs=[ input_image, input_video, prompt, duration, conditioning_strength, enhance_prompt, use_video_audio, seed, randomize_seed, height, width, ], fn = generate_video, cache_examples=True, cache_mode="lazy", outputs=[output_video, seed], ) # ── Event handlers ─────────────────────────────────────────────────── input_image.change( fn=on_image_upload, inputs=[input_image, input_video, high_res], outputs=[width, height], ) input_video.change( fn=on_video_upload, inputs=[input_video, input_image, high_res], outputs=[width, height, duration], ) high_res.change( fn=on_highres_toggle, inputs=[input_image, input_video, high_res], outputs=[width, height], ) generate_btn.click( fn=generate_video, inputs=[ input_image, input_video, prompt, duration, conditioning_strength, enhance_prompt, use_video_audio, seed, randomize_seed, height, width,input_audio ], outputs=[output_video, seed], ) if __name__ == "__main__": demo.launch()