Spaces:
Paused
Paused
# NOTE: a 15-second generation limit is configured on this Space; do not use x2 resolution (causes quality degradation).
# ── Startup bootstrap: runtime pip installs & repo checkout ──────────────────
# This module is a Spaces entry point; dependencies that are not baked into the
# image are installed at import time.  Statement order matters here: the
# environment variables must be set before torch is imported anywhere.
import os
import subprocess
import sys

# Disable torch.compile / dynamo before any torch import
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"

# Install xformers for memory-efficient attention (best-effort: check=False)
subprocess.run([sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"], check=False)

# Install video preprocessing dependencies (best-effort)
subprocess.run([sys.executable, "-m", "pip", "install",
                "dwpose", "onnxruntime-gpu", "imageio[ffmpeg]", "scikit-image",
                "opencv-python-headless", "decord", "num2words"], check=False)

# Ensure num2words is installed (required by SmolVLMProcessor).
# check=True so a failure here aborts startup loudly.
subprocess.run([sys.executable, "-m", "pip", "install", "num2words"], check=True)

# Reinstall torchaudio to match the torch CUDA version on this space.
# controlnet_aux or other deps can pull in a CPU-only torchaudio that conflicts
# with the pre-installed CUDA torch, causing "undefined symbol" errors.
_tv = subprocess.run([sys.executable, "-c", "import torch; print(torch.__version__)"],
                     capture_output=True, text=True)
if _tv.returncode == 0:
    _full_ver = _tv.stdout.strip()
    # Extract CUDA suffix if present (e.g. "2.7.0+cu124" -> "cu124")
    # NOTE(review): falls back to "cu124" when torch has no "+" suffix —
    # confirm that matches this Space's base image.
    _cuda_suffix = _full_ver.split("+")[-1] if "+" in _full_ver else "cu124"
    _base_ver = _full_ver.split("+")[0]
    print(f"Detected torch {_full_ver}, reinstalling matching torchaudio...")
    subprocess.run([
        sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps",
        f"torchaudio=={_base_ver}",
        "--index-url", f"https://download.pytorch.org/whl/{_cuda_suffix}",
    ], check=False)

# Clone LTX-2 repo and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
LTX_COMPATIBLE_COMMIT = "ae855f8"  # Pin to 2026-03-11 (compatible API)
if not os.path.exists(LTX_REPO_DIR):
    print(f"Cloning {LTX_REPO_URL} at commit {LTX_COMPATIBLE_COMMIT}...")
    subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True)
    subprocess.run(["git", "-C", LTX_REPO_DIR, "checkout", LTX_COMPATIBLE_COMMIT], check=True)
print("Installing ltx-core and ltx-pipelines from cloned repo...")
subprocess.run(
    [sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "-e",
     os.path.join(LTX_REPO_DIR, "packages", "ltx-core"),
     "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")],
    check=True,
)
# Make the cloned packages importable even if the editable install is stale.
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src"))
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src"))
import logging
import random
import tempfile
from pathlib import Path

import torch

# Belt-and-braces: disable dynamo again now that torch is imported
# (the env vars at the top of the file cover the pre-import path).
torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True

import spaces
import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download
from safetensors import safe_open

from ltx_core.components.diffusion_steps import EulerDiffusionStep
from ltx_core.components.noisers import GaussianNoiser
from ltx_core.conditioning import (
    ConditioningItem,
    ConditioningItemAttentionStrengthWrapper,
    VideoConditionByReferenceLatent,
)
from ltx_core.loader import LoraPathStrengthAndSDOps, LTXV_LORA_COMFY_RENAMING_MAP
from ltx_core.model.audio_vae import decode_audio as vae_decode_audio
from ltx_core.model.audio_vae import encode_audio as vae_encode_audio
from ltx_core.model.upsampler import upsample_video
from ltx_core.model.video_vae import TilingConfig, VideoEncoder, get_video_chunks_number
from ltx_core.model.video_vae import decode_video as vae_decode_video
from ltx_core.quantization import QuantizationPolicy
from ltx_core.types import Audio, AudioLatentShape, LatentState, VideoLatentShape, VideoPixelShape
from ltx_pipelines.utils import ModelLedger, euler_denoising_loop
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES
from ltx_pipelines.utils.helpers import (
    assert_resolution,
    cleanup_memory,
    combined_image_conditionings,
    denoise_audio_video,
    denoise_video_only,
    encode_prompts,
    get_device,
    simple_denoising_func,
)
from ltx_pipelines.utils.media_io import (
    decode_audio_from_file,
    encode_video,
    load_video_conditioning,
)
from ltx_pipelines.utils.types import PipelineComponents

# Force-patch xformers attention into the LTX attention module.
from ltx_core.model.transformer import attention as _attn_mod
print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
try:
    from xformers.ops import memory_efficient_attention as _mea
    _attn_mod.memory_efficient_attention = _mea
    print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
except Exception as e:
    # Best-effort: fall back to whatever attention impl the module shipped with.
    print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}")

logging.getLogger().setLevel(logging.INFO)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Video Preprocessing: Strip appearance, keep structure | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Third-party media deps installed at startup (see pip calls at top of file).
import imageio
import cv2
from PIL import Image
from dwpose import DwposeDetector

# Lazily-created singletons; see _get_pose_processor / _get_depth_processor.
_pose_processor = None
_depth_processor = None
def _get_pose_processor():
    """Return the module-wide DWPose detector, creating it on first use."""
    global _pose_processor
    if _pose_processor is not None:
        return _pose_processor
    _pose_processor = DwposeDetector.from_pretrained_default()
    print("[Preprocess] DWPose processor loaded")
    return _pose_processor
def _get_depth_processor():
    """Lazy accessor for the depth backend marker.

    No real depth model is loaded: the string "cv2" is a sentinel meaning
    depth maps are approximated directly with OpenCV Laplacian edges
    (see preprocess_video_depth). For true depth, use MiDaS externally.
    """
    global _depth_processor
    if _depth_processor is not None:
        return _depth_processor
    _depth_processor = "cv2"  # sentinel — OpenCV is used directly
    print("[Preprocess] CV2-based depth processor loaded")
    return _depth_processor
def load_video_frames(video_path: str) -> list[np.ndarray]:
    """Decode every frame of *video_path* into HWC uint8 numpy arrays."""
    with imageio.get_reader(video_path) as reader:
        return [frame for frame in reader]
def write_video_mp4(frames_float_01: list[np.ndarray], fps: float, out_path: str) -> str:
    """Write float frames (expected range [0, 1]) to an mp4 file.

    Args:
        frames_float_01: HWC float frames nominally in [0, 1].
        fps: Output frame rate.
        out_path: Destination file path.

    Returns:
        out_path, for call-chaining convenience.
    """
    with imageio.get_writer(out_path, fps=fps, macro_block_size=1) as writer:
        for frame in frames_float_01:
            # Clip before casting: values outside [0, 1] would otherwise wrap
            # around during the uint8 conversion and corrupt the frame.
            writer.append_data((np.clip(frame, 0.0, 1.0) * 255).astype(np.uint8))
    return out_path
def extract_first_frame(video_path: str) -> str:
    """Extract the first frame of a video to a temp PNG and return its path.

    Raises:
        ValueError: if the video contains no decodable frames.
    """
    # Decode only the first frame instead of materializing the whole video.
    first = None
    with imageio.get_reader(video_path) as reader:
        for frame in reader:
            first = frame
            break
    if first is None:
        raise ValueError("No frames in video")
    # mkstemp instead of the deprecated, race-prone tempfile.mktemp.
    fd, out_path = tempfile.mkstemp(suffix=".png")
    os.close(fd)
    Image.fromarray(first).save(out_path)
    return out_path
def preprocess_video_pose(frames: list[np.ndarray], width: int, height: int) -> list[np.ndarray]:
    """Extract DWPose skeletons from each frame. Returns float [0,1] frames.

    The pose render is inverted (white background, dark skeleton) so the
    Union Control model does not read the predominantly-black canvas as a
    "dark lighting" cue; the skeleton structure itself is unchanged —
    only the brightness polarity flips.
    """
    detector = _get_pose_processor()
    out: list[np.ndarray] = []
    for raw in frames:
        src = Image.fromarray(raw.astype(np.uint8)).convert("RGB")
        rendered = detector(src, include_body=True, include_hand=True, include_face=True)
        if not isinstance(rendered, Image.Image):
            rendered = Image.fromarray(np.array(rendered).astype(np.uint8))
        rendered = rendered.convert("RGB").resize((width, height), Image.BILINEAR)
        # Normalize to [0, 1] then flip polarity (see docstring).
        out.append(1.0 - np.array(rendered).astype(np.float32) / 255.0)
    return out
def preprocess_video_canny(frames: list[np.ndarray], width: int, height: int,
                           low_threshold: int = 50, high_threshold: int = 100) -> list[np.ndarray]:
    """Extract Canny edge maps from each frame. Returns float [0,1] frames."""
    out: list[np.ndarray] = []
    for src in frames:
        # Resize first, then edge-detect on the grayscale image.
        small = cv2.resize(src, (width, height), interpolation=cv2.INTER_AREA)
        mono = cv2.cvtColor(small, cv2.COLOR_RGB2GRAY)
        edge = cv2.Canny(mono, low_threshold, high_threshold).astype(np.float32) / 255.0
        # Replicate the single channel so downstream code sees an RGB frame.
        out.append(np.stack((edge, edge, edge), axis=-1))
    return out
def preprocess_video_depth(frames: list[np.ndarray], width: int, height: int) -> list[np.ndarray]:
    """Approximate depth-like maps via Laplacian gradient magnitude.

    This is a fast stand-in for a real monocular depth model (use MiDaS
    externally for true depth): strong intensity gradients approximate
    depth discontinuities. Returns float [0, 1] frames.
    """
    out: list[np.ndarray] = []
    for src in frames:
        small = cv2.resize(src, (width, height), interpolation=cv2.INTER_AREA)
        mono = cv2.cvtColor(small, cv2.COLOR_RGB2GRAY).astype(np.float32)
        grad = np.abs(cv2.Laplacian(mono, cv2.CV_32F, ksize=5))
        grad = grad / (grad.max() + 1e-8)  # normalize into [0, 1]
        out.append(np.stack((grad, grad, grad), axis=-1))
    return out
def preprocess_conditioning_video(
    video_path: str,
    mode: str,
    width: int,
    height: int,
    num_frames: int,
    fps: float,
) -> tuple[str, str]:
    """
    Preprocess a video for conditioning: strip appearance, keep structure.

    Args:
        video_path: Source video to condition on.
        mode: UI mode string selecting the preprocessing backend.
        width: Target conditioning width.
        height: Target conditioning height.
        num_frames: Maximum number of frames kept from the source.
        fps: Frame rate for the written conditioning clip.

    Returns:
        (conditioning_mp4_path, first_frame_png_path)

    Raises:
        ValueError: if no frames could be decoded from the video.
    """
    frames = load_video_frames(video_path)
    if not frames:
        raise ValueError("No frames decoded from video")
    # Trim to num_frames
    frames = frames[:num_frames]
    # Save first frame (original appearance) for image conditioning.
    # mkstemp instead of the deprecated, race-prone tempfile.mktemp.
    fd, first_png = tempfile.mkstemp(suffix=".png")
    os.close(fd)
    Image.fromarray(frames[0]).save(first_png)
    # Process based on mode
    if mode == "Pose (DWPose)":
        processed = preprocess_video_pose(frames, width, height)
    elif mode == "Canny Edge":
        processed = preprocess_video_canny(frames, width, height)
    elif mode == "Depth (Laplacian)":
        processed = preprocess_video_depth(frames, width, height)
    else:
        # "Raw" mode — pass frames through unchanged (as float [0, 1])
        processed = [f.astype(np.float32) / 255.0 for f in frames]
    fd, cond_mp4 = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    write_video_mp4(processed, fps=fps, out_path=cond_mp4)
    return cond_mp4, first_png
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Helper: read reference downscale factor from IC-LoRA metadata | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _read_lora_reference_downscale_factor(lora_path: str) -> int:
    """Read `reference_downscale_factor` from a safetensors LoRA's metadata.

    Falls back to 1 when the file is unreadable, the key is absent,
    or the stored value is not an integer.
    """
    try:
        with safe_open(lora_path, framework="pt") as handle:
            meta = handle.metadata() or {}
            return int(meta.get("reference_downscale_factor", 1))
    except Exception as e:
        logging.warning(f"Failed to read metadata from LoRA file '{lora_path}': {e}")
        return 1
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Unified Pipeline: Distilled + Audio + IC-LoRA Video-to-Video | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class LTX23UnifiedPipeline:
    """
    Unified LTX-2.3 pipeline supporting all generation modes:
      • Text-to-Video
      • Image-to-Video (first-frame conditioning)
      • Audio-to-Video (lip-sync / BGM conditioning with external audio)
      • Video-to-Video (IC-LoRA reference video conditioning)
      • Any combination of the above

    Architecture:
      - stage_1_model_ledger: transformer WITH IC-LoRA fused (used for Stage 1)
      - stage_2_model_ledger: transformer WITHOUT IC-LoRA (used for Stage 2 upsampling)
      - When no IC-LoRA is provided, both stages use the same base model.
    """

    def __init__(
        self,
        distilled_checkpoint_path: str,
        spatial_upsampler_path: str,
        gemma_root: str,
        ic_loras: list[LoraPathStrengthAndSDOps] | None = None,
        device: torch.device | None = None,
        quantization: QuantizationPolicy | None = None,
        reference_downscale_factor: int | None = None,
    ):
        """Build the one- or two-ledger model setup.

        Args:
            distilled_checkpoint_path: Path to the distilled transformer checkpoint.
            spatial_upsampler_path: Path to the x2 spatial upsampler weights.
            gemma_root: Local snapshot directory of the Gemma text encoder.
            ic_loras: Optional IC-LoRA weights fused into the Stage-1 model only.
            device: Target device; auto-detected via get_device() when None.
            quantization: Optional quantization policy passed to both ledgers.
            reference_downscale_factor: Explicit reference-video downscale;
                when None it is read from IC-LoRA metadata (default 1).

        Raises:
            ValueError: when multiple IC-LoRAs declare conflicting
                reference_downscale_factor values in their metadata.
        """
        self.device = device or get_device()
        self.dtype = torch.bfloat16
        ic_loras = ic_loras or []
        self.has_ic_lora = len(ic_loras) > 0
        # Stage 1: transformer with IC-LoRA (if provided)
        self.stage_1_model_ledger = ModelLedger(
            dtype=self.dtype,
            device=self.device,
            checkpoint_path=distilled_checkpoint_path,
            spatial_upsampler_path=spatial_upsampler_path,
            gemma_root_path=gemma_root,
            loras=ic_loras,
            quantization=quantization,
        )
        if self.has_ic_lora:
            # Stage 2 needs a separate transformer WITHOUT IC-LoRA
            self.stage_2_model_ledger = ModelLedger(
                dtype=self.dtype,
                device=self.device,
                checkpoint_path=distilled_checkpoint_path,
                spatial_upsampler_path=spatial_upsampler_path,
                gemma_root_path=gemma_root,
                loras=[],
                quantization=quantization,
            )
        else:
            # No IC-LoRA: share a single ledger for both stages (saves ~half VRAM)
            self.stage_2_model_ledger = self.stage_1_model_ledger
        self.pipeline_components = PipelineComponents(
            dtype=self.dtype,
            device=self.device,
        )
        # Reference downscale factor: explicit value takes priority,
        # otherwise read from IC-LoRA metadata, otherwise default to 1.
        if reference_downscale_factor is not None:
            self.reference_downscale_factor = reference_downscale_factor
        else:
            self.reference_downscale_factor = 1
            for lora in ic_loras:
                scale = _read_lora_reference_downscale_factor(lora.path)
                if scale != 1:
                    if self.reference_downscale_factor not in (1, scale):
                        raise ValueError(
                            f"Conflicting reference_downscale_factor: "
                            f"already {self.reference_downscale_factor}, got {scale}"
                        )
                    self.reference_downscale_factor = scale
        logging.info(f"[Pipeline] reference_downscale_factor={self.reference_downscale_factor}")

    # ── Video reference conditioning (from ICLoraPipeline) ───────────────
    def _create_ic_conditionings(
        self,
        video_conditioning: list[tuple[str, float]],
        height: int,
        width: int,
        num_frames: int,
        video_encoder: VideoEncoder,
        conditioning_strength: float = 1.0,
    ) -> list[ConditioningItem]:
        """Create IC-LoRA video reference conditioning items.

        Args:
            video_conditioning: (video_path, strength) pairs to encode.
            height: Target output height (before reference downscale).
            width: Target output width (before reference downscale).
            num_frames: Frame cap applied when loading each reference video.
            video_encoder: VAE encoder used to latent-encode references.
            conditioning_strength: Attention-level influence scale in [0, 1];
                values < 1.0 wrap the item in an attention-strength wrapper.
        """
        conditionings: list[ConditioningItem] = []
        scale = self.reference_downscale_factor
        # Reference videos are loaded at a reduced resolution, matching the
        # downscale factor the IC-LoRA was trained with.
        ref_height = height // scale
        ref_width = width // scale
        for video_path, strength in video_conditioning:
            video = load_video_conditioning(
                video_path=video_path,
                height=ref_height,
                width=ref_width,
                frame_cap=num_frames,
                dtype=self.dtype,
                device=self.device,
            )
            encoded_video = video_encoder(video)
            cond = VideoConditionByReferenceLatent(
                latent=encoded_video,
                downscale_factor=scale,
                strength=strength,
            )
            if conditioning_strength < 1.0:
                cond = ConditioningItemAttentionStrengthWrapper(
                    cond, attention_mask=conditioning_strength
                )
            conditionings.append(cond)
        if conditionings:
            logging.info(f"[IC-LoRA] Added {len(conditionings)} video conditioning(s)")
        return conditionings

    # ── Main generation entry point ──────────────────────────────────────
    def __call__(
        self,
        prompt: str,
        seed: int,
        height: int,
        width: int,
        num_frames: int,
        frame_rate: float,
        images: list[ImageConditioningInput],
        audio_path: str | None = None,
        video_conditioning: list[tuple[str, float]] | None = None,
        tiling_config: TilingConfig | None = None,
        enhance_prompt: bool = False,
        conditioning_strength: float = 1.0,
    ):
        """
        Generate video with any combination of conditioning.

        Args:
            audio_path: Path to external audio file for lipsync/BGM conditioning.
            video_conditioning: List of (path, strength) tuples for IC-LoRA V2V.
            conditioning_strength: Scale for IC-LoRA attention influence [0, 1].

        Returns:
            Tuple of (decoded_video_iterator, Audio).
        """
        assert_resolution(height=height, width=width, is_two_stage=True)
        # NOTE(review): appended unconditionally, even without audio input —
        # presumably nudges the model toward synchronized mouth motion;
        # confirm this is intended for non-lipsync runs too.
        prompt += " synchronized lipsync"
        # Ensure the prompt includes lighting context to prevent dark outputs.
        # When the prompt is minimal, the model can inherit "darkness" from the
        # pose conditioning video's latent. Adding explicit brightness cues
        # counteracts this.
        _lighting_keywords = ["bright", "light", "lit", "illuminat", "sunny", "daylight", "indoor lighting"]
        if not any(kw in prompt.lower() for kw in _lighting_keywords):
            prompt += ", well-lit, natural lighting"
        has_audio = audio_path is not None
        has_video_cond = bool(video_conditioning)
        generator = torch.Generator(device=self.device).manual_seed(seed)
        noiser = GaussianNoiser(generator=generator)
        stepper = EulerDiffusionStep()
        dtype = torch.bfloat16
        # ── Encode text prompt ───────────────────────────────────────────
        # Use stage_1 ledger for prompt encoding (has text encoder)
        (ctx_p,) = encode_prompts(
            [prompt],
            self.stage_1_model_ledger,
            enhance_first_prompt=enhance_prompt,
            enhance_prompt_image=images[0].path if len(images) > 0 else None,
        )
        video_context, audio_context = ctx_p.video_encoding, ctx_p.audio_encoding
        # ── Encode external audio (if provided) ──────────────────────────
        encoded_audio_latent = None
        decoded_audio_for_output = None
        if has_audio:
            video_duration = num_frames / frame_rate
            decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration)
            if decoded_audio is None:
                raise ValueError(f"Could not extract audio stream from {audio_path}")
            encoded_audio_latent = vae_encode_audio(
                decoded_audio, self.stage_1_model_ledger.audio_encoder()
            )
            audio_shape = AudioLatentShape.from_duration(
                batch=1, duration=video_duration, channels=8, mel_bins=16
            )
            # Trim or zero-pad the encoded latent along dim 2 so its frame
            # count matches what the denoiser expects for this duration.
            expected_frames = audio_shape.frames
            actual_frames = encoded_audio_latent.shape[2]
            if actual_frames > expected_frames:
                encoded_audio_latent = encoded_audio_latent[:, :, :expected_frames, :]
            elif actual_frames < expected_frames:
                pad = torch.zeros(
                    encoded_audio_latent.shape[0], encoded_audio_latent.shape[1],
                    expected_frames - actual_frames, encoded_audio_latent.shape[3],
                    device=encoded_audio_latent.device, dtype=encoded_audio_latent.dtype,
                )
                encoded_audio_latent = torch.cat([encoded_audio_latent, pad], dim=2)
            # Keep the original (decoded) audio to mux into the final output.
            decoded_audio_for_output = Audio(
                waveform=decoded_audio.waveform.squeeze(0),
                sampling_rate=decoded_audio.sampling_rate,
            )
        # ── Build conditionings for Stage 1 ──────────────────────────────
        # Use stage_1 video encoder (has IC-LoRA context)
        video_encoder = self.stage_1_model_ledger.video_encoder()
        # Stage 1 renders at half resolution; Stage 2 upsamples x2.
        stage_1_output_shape = VideoPixelShape(
            batch=1, frames=num_frames,
            width=width // 2, height=height // 2, fps=frame_rate,
        )
        # Image conditionings
        stage_1_conditionings = combined_image_conditionings(
            images=images,
            height=stage_1_output_shape.height,
            width=stage_1_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )
        # IC-LoRA video reference conditionings
        if has_video_cond:
            ic_conds = self._create_ic_conditionings(
                video_conditioning=video_conditioning,
                height=stage_1_output_shape.height,
                width=stage_1_output_shape.width,
                num_frames=num_frames,
                video_encoder=video_encoder,
                conditioning_strength=conditioning_strength,
            )
            stage_1_conditionings.extend(ic_conds)
        # ── Stage 1: Low-res generation ──────────────────────────────────
        transformer = self.stage_1_model_ledger.transformer()
        stage_1_sigmas = torch.Tensor(DISTILLED_SIGMA_VALUES).to(self.device)

        def denoising_loop(sigmas, video_state, audio_state, stepper):
            # Stage-1 Euler loop bound to the (possibly LoRA-fused) transformer.
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=video_context,
                    audio_context=audio_context,
                    transformer=transformer,
                ),
            )

        if has_audio:
            # Audio mode: denoise video only, use external audio latent
            video_state = denoise_video_only(
                output_shape=stage_1_output_shape,
                conditionings=stage_1_conditionings,
                noiser=noiser,
                sigmas=stage_1_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                initial_audio_latent=encoded_audio_latent,
            )
            audio_state = None  # we'll use the original audio for output
        else:
            # Standard / IC-only mode: denoise both audio and video
            video_state, audio_state = denoise_audio_video(
                output_shape=stage_1_output_shape,
                conditionings=stage_1_conditionings,
                noiser=noiser,
                sigmas=stage_1_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
            )
        torch.cuda.synchronize()
        cleanup_memory()
        # ── Stage 2: Upsample + Refine ───────────────────────────────────
        upscaled_video_latent = upsample_video(
            latent=video_state.latent[:1],
            video_encoder=video_encoder,
            upsampler=self.stage_2_model_ledger.spatial_upsampler(),
        )
        torch.cuda.synchronize()
        cleanup_memory()
        # Stage 2 uses the transformer WITHOUT IC-LoRA
        transformer_s2 = self.stage_2_model_ledger.transformer()
        stage_2_sigmas = torch.Tensor(STAGE_2_DISTILLED_SIGMA_VALUES).to(self.device)

        def denoising_loop_s2(sigmas, video_state, audio_state, stepper):
            # Stage-2 Euler loop bound to the LoRA-free transformer.
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=video_context,
                    audio_context=audio_context,
                    transformer=transformer_s2,
                ),
            )

        stage_2_output_shape = VideoPixelShape(
            batch=1, frames=num_frames,
            width=width, height=height, fps=frame_rate,
        )
        stage_2_conditionings = combined_image_conditionings(
            images=images,
            height=stage_2_output_shape.height,
            width=stage_2_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )
        if has_audio:
            video_state = denoise_video_only(
                output_shape=stage_2_output_shape,
                conditionings=stage_2_conditionings,
                noiser=noiser,
                sigmas=stage_2_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop_s2,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                noise_scale=stage_2_sigmas[0],
                initial_video_latent=upscaled_video_latent,
                initial_audio_latent=encoded_audio_latent,
            )
            audio_state = None
        else:
            # Stage 1's audio latent seeds Stage 2's audio refinement.
            video_state, audio_state = denoise_audio_video(
                output_shape=stage_2_output_shape,
                conditionings=stage_2_conditionings,
                noiser=noiser,
                sigmas=stage_2_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop_s2,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                noise_scale=stage_2_sigmas[0],
                initial_video_latent=upscaled_video_latent,
                initial_audio_latent=audio_state.latent,
            )
        torch.cuda.synchronize()
        del transformer, transformer_s2, video_encoder
        cleanup_memory()
        # ── Decode ───────────────────────────────────────────────────────
        decoded_video = vae_decode_video(
            video_state.latent,
            self.stage_2_model_ledger.video_decoder(),
            tiling_config,
            generator,
        )
        if has_audio:
            output_audio = decoded_audio_for_output
        else:
            output_audio = vae_decode_audio(
                audio_state.latent,
                self.stage_2_model_ledger.audio_decoder(),
                self.stage_2_model_ledger.vocoder(),
            )
        return decoded_video, output_audio
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Constants | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Largest value accepted by the seed widget / randomizer.
MAX_SEED = np.iinfo(np.int32).max
# Example prompt pre-filled in the UI.
DEFAULT_PROMPT = (
    "An astronaut hatches from a fragile egg on the surface of the Moon, "
    "the shell cracking and peeling apart in gentle low-gravity motion."
)
DEFAULT_FRAME_RATE = 24.0
# (width, height) presets per quality tier and aspect ratio.
RESOLUTIONS = {
    "high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)},
    "low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)},
}
# Available IC-LoRA models: HF Hub repo id + weight filename.
IC_LORA_OPTIONS = {
    "Union Control (Depth + Edge)": {
        "repo": "Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control",
        "filename": "ltx-2.3-22b-ic-lora-union-control-ref0.5.safetensors",
    },
    "Motion Track Control": {
        "repo": "Lightricks/LTX-2.3-22b-IC-LoRA-Motion-Track-Control",
        "filename": "ltx-2.3-22b-ic-lora-motion-track-control-ref0.5.safetensors",
    },
}
DEFAULT_IC_LORA = "Union Control (Depth + Edge)"
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Download Models | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
LTX_MODEL_REPO = "Lightricks/LTX-2.3"
# Checkpoint with the Union Control LoRA pre-fused: fusing the LoRA at load
# time breaks under quantization, so a pre-fused checkpoint is used instead.
CHECKPOINT_PATH = "linoyts/ltx-2.3-22b-fused-union-control"
GEMMA_REPO = "google/gemma-3-12b-it-qat-q4_0-unquantized"

print("=" * 80)
print("Downloading LTX-2.3 distilled model + Gemma + IC-LoRA...")
print("=" * 80)

checkpoint_path = hf_hub_download(
    # repo_id=LTX_MODEL_REPO, filename="ltx-2.3-22b-distilled.safetensors"
    repo_id=CHECKPOINT_PATH, filename="ltx-2.3-22b-fused-union-control.safetensors"
)
spatial_upsampler_path = hf_hub_download(
    repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors"
)
# Gemma is a gated repo: HF_TOKEN must be provided via Space secrets.
gemma_root = snapshot_download(repo_id=GEMMA_REPO, token=os.environ.get("HF_TOKEN"))
# Download default IC-LoRA
default_lora_info = IC_LORA_OPTIONS[DEFAULT_IC_LORA]
default_ic_lora_path = hf_hub_download(
    repo_id=default_lora_info["repo"], filename=default_lora_info["filename"]
)
print(f"Checkpoint: {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
print(f"Gemma root: {gemma_root}")
print(f"IC-LoRA: {default_ic_lora_path}")

# ─────────────────────────────────────────────────────────────────────────────
# Initialize Pipeline
# ─────────────────────────────────────────────────────────────────────────────
ic_loras = [
    LoraPathStrengthAndSDOps(default_ic_lora_path, 1.0, LTXV_LORA_COMFY_RENAMING_MAP)
]
pipeline = LTX23UnifiedPipeline(
    distilled_checkpoint_path=checkpoint_path,
    spatial_upsampler_path=spatial_upsampler_path,
    gemma_root=gemma_root,
    # ic_loras=ic_loras,  # LoRA already fused into checkpoint
    quantization=QuantizationPolicy.fp8_cast(),
    # Union Control IC-LoRA was trained with reference videos at half resolution.
    # Set explicitly so it works both with separate LoRA and fused checkpoints.
    reference_downscale_factor=2,
)
# Preload all models for ZeroGPU tensor packing.
print("Preloading all models (including Gemma, Audio encoders)...")
# Shared ledger: preload once. Separate ledgers (IC-LoRA): preload both.
_ledger_1 = pipeline.stage_1_model_ledger
_ledger_2 = pipeline.stage_2_model_ledger
_shared = _ledger_1 is _ledger_2
# Stage 1 models (with IC-LoRA if loaded).  Each factory method is invoked
# once, then the method is replaced with a closure returning the cached
# instance so every later call inside the pipeline reuses the preloaded model.
_s1_transformer = _ledger_1.transformer()
_s1_video_encoder = _ledger_1.video_encoder()
_s1_text_encoder = _ledger_1.text_encoder()
_s1_embeddings = _ledger_1.gemma_embeddings_processor()
_s1_audio_encoder = _ledger_1.audio_encoder()
_ledger_1.transformer = lambda: _s1_transformer
_ledger_1.video_encoder = lambda: _s1_video_encoder
_ledger_1.text_encoder = lambda: _s1_text_encoder
_ledger_1.gemma_embeddings_processor = lambda: _s1_embeddings
_ledger_1.audio_encoder = lambda: _s1_audio_encoder
if _shared:
    # Single ledger — also preload decoder/upsampler/vocoder on the same object
    _video_decoder = _ledger_1.video_decoder()
    _audio_decoder = _ledger_1.audio_decoder()
    _vocoder = _ledger_1.vocoder()
    _spatial_upsampler = _ledger_1.spatial_upsampler()
    _ledger_1.video_decoder = lambda: _video_decoder
    _ledger_1.audio_decoder = lambda: _audio_decoder
    _ledger_1.vocoder = lambda: _vocoder
    _ledger_1.spatial_upsampler = lambda: _spatial_upsampler
    print("  (single shared ledger — no IC-LoRA)")
else:
    # Stage 2 models (separate transformer without IC-LoRA)
    _s2_transformer = _ledger_2.transformer()
    _s2_video_encoder = _ledger_2.video_encoder()
    _s2_video_decoder = _ledger_2.video_decoder()
    _s2_audio_decoder = _ledger_2.audio_decoder()
    _s2_vocoder = _ledger_2.vocoder()
    _s2_spatial_upsampler = _ledger_2.spatial_upsampler()
    _s2_text_encoder = _ledger_2.text_encoder()
    _s2_embeddings = _ledger_2.gemma_embeddings_processor()
    _s2_audio_encoder = _ledger_2.audio_encoder()
    _ledger_2.transformer = lambda: _s2_transformer
    _ledger_2.video_encoder = lambda: _s2_video_encoder
    _ledger_2.video_decoder = lambda: _s2_video_decoder
    _ledger_2.audio_decoder = lambda: _s2_audio_decoder
    _ledger_2.vocoder = lambda: _s2_vocoder
    _ledger_2.spatial_upsampler = lambda: _s2_spatial_upsampler
    _ledger_2.text_encoder = lambda: _s2_text_encoder
    _ledger_2.gemma_embeddings_processor = lambda: _s2_embeddings
    _ledger_2.audio_encoder = lambda: _s2_audio_encoder
    print("  (two separate ledgers — IC-LoRA active)")
print("All models preloaded!")
print("=" * 80)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # UI Helpers | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def detect_aspect_ratio(media_path) -> str:
    """Return the closest supported aspect ratio for an image or video.

    Picks whichever of "16:9", "9:16" or "1:1" is nearest to the file's
    width/height ratio; defaults to "16:9" for None or unreadable files.
    """
    if media_path is None:
        return "16:9"
    path_str = str(media_path)
    ext = path_str.lower().rsplit(".", 1)[-1] if "." in path_str else ""
    if ext in ("jpg", "jpeg", "png", "bmp", "webp", "gif", "tiff"):
        # Known image extension: read dimensions with Pillow.
        import PIL.Image
        try:
            with PIL.Image.open(media_path) as img:
                w, h = img.size
        except Exception:
            return "16:9"
    else:
        # Otherwise probe it as a video container first.
        try:
            import av
            with av.open(path_str) as container:
                vstream = container.streams.video[0]
                w = vstream.codec_context.width
                h = vstream.codec_context.height
        except Exception:
            # Not a readable video — fall back to treating it as an image.
            import PIL.Image
            try:
                with PIL.Image.open(media_path) as img:
                    w, h = img.size
            except Exception:
                return "16:9"
    targets = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0}
    measured = w / h
    return min(targets, key=lambda name: abs(measured - targets[name]))
def on_image_upload(image, video, high_res):
    """Auto-set the width/height fields when an image is uploaded.

    Prefers the image for aspect detection, falling back to the video.
    """
    source = video if image is None else image
    tier_table = RESOLUTIONS["high" if high_res else "low"]
    new_w, new_h = tier_table[detect_aspect_ratio(source)]
    return gr.update(value=new_w), gr.update(value=new_h)
| def _get_video_duration(video_path) -> float | None: | |
| """Get video duration in seconds via ffprobe.""" | |
| if video_path is None: | |
| return None | |
| try: | |
| result = subprocess.run( | |
| ["ffprobe", "-v", "error", "-select_streams", "v:0", | |
| "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", | |
| str(video_path)], | |
| capture_output=True, text=True, | |
| ) | |
| return float(result.stdout.strip()) | |
| except Exception: | |
| return None | |
def on_video_upload(video, image, high_res):
    """Auto-set resolution and duration when a video is uploaded.

    Prefers the video for aspect detection, falling back to the image.
    """
    source = video if video is not None else image
    tier_table = RESOLUTIONS["high" if high_res else "low"]
    new_w, new_h = tier_table[detect_aspect_ratio(source)]
    # Match the duration slider to the clip length, capped at 15 s;
    # default to 3 s when the length cannot be probed.
    clip_len = _get_video_duration(video)
    new_dur = 3.0 if clip_len is None else round(min(clip_len, 15.0), 1)
    return gr.update(value=new_w), gr.update(value=new_h), gr.update(value=new_dur)
def on_highres_toggle(image, video, high_res):
    """Recompute width/height when the high-resolution toggle flips."""
    source = image if image is not None else video
    aspect_key = detect_aspect_ratio(source)
    if high_res:
        new_w, new_h = RESOLUTIONS["high"][aspect_key]
    else:
        new_w, new_h = RESOLUTIONS["low"][aspect_key]
    return gr.update(value=new_w), gr.update(value=new_h)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Generation | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _extract_audio_from_video(video_path: str) -> str | None: | |
| """Extract audio from video as a temp WAV file. Returns None if no audio.""" | |
| out_path = tempfile.mktemp(suffix=".wav") | |
| try: | |
| # Check if video has an audio stream | |
| probe = subprocess.run( | |
| ["ffprobe", "-v", "error", "-select_streams", "a:0", | |
| "-show_entries", "stream=codec_type", "-of", "default=nw=1:nk=1", | |
| video_path], | |
| capture_output=True, text=True, | |
| ) | |
| if not probe.stdout.strip(): | |
| return None | |
| # Extract audio | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-v", "error", "-i", video_path, | |
| "-vn", "-ac", "2", "-ar", "48000", "-c:a", "pcm_s16le", out_path], | |
| check=True, | |
| ) | |
| return out_path | |
| except (subprocess.CalledProcessError, FileNotFoundError): | |
| return None | |
def generate_video(
    input_image,
    input_video,
    prompt: str = "",
    duration: float = 3,
    conditioning_strength: float = 0.85,
    enhance_prompt: bool = False,
    use_video_audio: bool = True,
    seed: int = 42,
    randomize_seed: bool = True,
    height: int = 512,
    width: int = 768,
    input_audio=None,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate a video with the LTX pipeline from any mix of image,
    video, audio and text conditioning.

    Returns:
        (output_mp4_path, seed_used) on success, (None, seed_used) on
        failure (the error is logged, not raised, so the UI stays alive).
    """
    video_preprocess = "Pose (DWPose)"
    # Resolve the seed BEFORE entering the try-block so the except handler
    # can always reference it. Previously it was assigned inside the try,
    # so any failure before that line raised NameError in the handler.
    current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    try:
        torch.cuda.reset_peak_memory_stats()
        frame_rate = DEFAULT_FRAME_RATE
        # Snap the frame count up to the model's required 8k+1 form.
        num_frames = int(duration * frame_rate) + 1
        num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1

        # Human-readable summary of which conditioning modes are active.
        mode_parts = []
        if input_image is not None:
            mode_parts.append("Image")
        if input_video is not None:
            mode_parts.append(f"Video({video_preprocess})")
        if input_audio is not None:
            mode_parts.append("Audio")
        if not mode_parts:
            mode_parts.append("Text")
        mode_str = " + ".join(mode_parts)
        print(f"[{mode_str}] Generating: {height}x{width}, {num_frames} frames "
              f"({duration}s), seed={current_seed}")

        # Build image conditionings.
        images = []
        if input_image is not None:
            images = [ImageConditioningInput(path=str(input_image), frame_idx=0, strength=1.0)]

        # Build video conditionings — preprocess to strip appearance.
        video_conditioning = None
        if input_video is not None:
            video_path = str(input_video)
            if video_preprocess != "Raw (no preprocessing)":
                print(f"[Preprocess] Running {video_preprocess} on input video...")
                cond_mp4, first_frame_png = preprocess_conditioning_video(
                    video_path=video_path,
                    mode=video_preprocess,
                    width=int(width) // 2,  # Stage 1 operates at half res
                    height=int(height) // 2,
                    num_frames=num_frames,
                    fps=frame_rate,
                )
                video_conditioning = [(cond_mp4, 1.0)]
                # If no image was provided, use the video's first frame
                # (original appearance) as the image conditioning.
                if input_image is None:
                    images = [ImageConditioningInput(
                        path=first_frame_png, frame_idx=0, strength=1.0,
                    )]
                    print(f"[Preprocess] Using video first frame as image conditioning")
            else:
                # Raw mode — pass the video as-is.
                video_conditioning = [(video_path, 1.0)]
            # If no override audio was provided, optionally reuse the
            # video's own audio track (e.g. for lipsync).
            if input_audio is None and use_video_audio:
                extracted_audio = _extract_audio_from_video(video_path)
                if extracted_audio is not None:
                    input_audio = extracted_audio
                    print(f"[Preprocess] Extracted audio from input video")

        tiling_config = TilingConfig.default()
        video_chunks_number = get_video_chunks_number(num_frames, tiling_config)

        # Truncate prompt to prevent Gemma token overflow (max 1024 tokens ~ 500 chars).
        if len(prompt) > 500:
            prompt = prompt[:500]

        video, audio = pipeline(
            prompt=prompt,
            seed=current_seed,
            height=int(height),
            width=int(width),
            num_frames=num_frames,
            frame_rate=frame_rate,
            images=images,
            audio_path=input_audio,
            video_conditioning=video_conditioning,
            tiling_config=tiling_config,
            enhance_prompt=enhance_prompt,
            conditioning_strength=conditioning_strength,
        )

        # NOTE(review): mktemp is deprecated/racy, but encode_video's
        # behavior on a pre-existing output file is unknown from here —
        # confirm before switching to NamedTemporaryFile.
        output_path = tempfile.mktemp(suffix=".mp4")
        encode_video(
            video=video,
            fps=frame_rate,
            audio=audio,
            output_path=output_path,
            video_chunks_number=video_chunks_number,
        )
        return str(output_path), current_seed
    except Exception as e:
        import traceback
        print(f"Error: {str(e)}\n{traceback.format_exc()}")
        return None, current_seed
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SmolVLM2 β Auto-describe motion from reference video | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Vision-language model used to auto-describe motion in reference videos.
SMOLVLM_MODEL_ID = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct"
# Lazily-initialized singletons, populated on first use by _load_vlm().
_vlm_model = None
_vlm_processor = None
# Instruction sent to SmolVLM2: describe motion and expression only —
# never appearance or setting — so the answer can be used as a prompt.
MOTION_PROMPT = """\
Watch this video carefully. Describe ONLY the following:
1. The body movements and gestures (walking, dancing, waving, turning, etc.)
2. Facial expressions and head movements (smiling, nodding, looking around, etc.)
3. The rhythm, speed, and energy of the motion (slow, fast, smooth, jerky, etc.)
4. The overall mood and tone conveyed by the movement
Do NOT describe:
- What the person/subject looks like (clothing, hair, skin, age, gender)
- The background, setting, or environment
- Colors, lighting, or visual style
- Any objects or props
Write a concise, single-paragraph description focused purely on motion and expression.\
"""
def _load_vlm():
    """Load SmolVLM2 once, cache it in module globals, and return it.

    Returns:
        (model, processor) — the cached pair on every call after the first.
    """
    global _vlm_model, _vlm_processor
    if _vlm_model is not None:
        return _vlm_model, _vlm_processor

    from transformers import AutoProcessor, AutoModelForImageTextToText

    print(f"[SmolVLM] Loading {SMOLVLM_MODEL_ID}...")
    _vlm_processor = AutoProcessor.from_pretrained(
        SMOLVLM_MODEL_ID, trust_remote_code=True
    )
    shared_kwargs = dict(torch_dtype=torch.bfloat16, trust_remote_code=True)
    try:
        # Prefer flash-attention when the wheel is usable on this GPU.
        _vlm_model = AutoModelForImageTextToText.from_pretrained(
            SMOLVLM_MODEL_ID,
            _attn_implementation="flash_attention_2",
            **shared_kwargs,
        ).to("cuda")
    except Exception:
        # Fall back to the default attention implementation.
        _vlm_model = AutoModelForImageTextToText.from_pretrained(
            SMOLVLM_MODEL_ID,
            **shared_kwargs,
        ).to("cuda")
    print("[SmolVLM] Model loaded!")
    return _vlm_model, _vlm_processor
def describe_video_motion(video_path, auto_describe=True):
    """Use SmolVLM2 to produce a motion-only description of a video.

    Returns a gr.update carrying the description, or an empty gr.update
    when disabled, when no video is given, or on any model failure.
    """
    if video_path is None or not auto_describe:
        return gr.update()
    try:
        model, processor = _load_vlm()
        chat = [
            {
                "role": "user",
                "content": [
                    {"type": "video", "path": str(video_path)},
                    {"type": "text", "text": MOTION_PROMPT},
                ],
            },
        ]
        batch = processor.apply_chat_template(
            chat,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt",
        ).to(model.device, dtype=torch.bfloat16)
        out_ids = model.generate(**batch, do_sample=False, max_new_tokens=200)
        decoded = processor.batch_decode(out_ids, skip_special_tokens=True)[0]
        # Keep only the assistant turn when the template echoes the prompt.
        if "Assistant:" in decoded:
            motion_desc = decoded.split("Assistant:")[-1].strip()
        else:
            motion_desc = decoded.strip()
        # Drop any echoed fragments of the instruction itself.
        for marker in (MOTION_PROMPT[:40], "Watch this video", "Do NOT describe"):
            if marker in motion_desc:
                motion_desc = motion_desc.split(marker)[0].strip()
        if not motion_desc:
            return gr.update()
        print(f"[SmolVLM] Motion description: {motion_desc[:100]}...")
        return gr.update(value=motion_desc)
    except Exception as e:
        print(f"[SmolVLM] Error: {e}")
        return gr.update()
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Gradio UI β LTX 2.3 Sync | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Global stylesheet tweaks: centered title, taller generate button,
# hidden Gradio footer, letterboxed (not cropped) video players.
css = """
.main-title { text-align: center; margin-bottom: 0.5em; }
.generate-btn { min-height: 52px !important; font-size: 1.1em !important; }
footer { display: none !important; }
video { object-fit: contain !important; }
"""
# Purple-accented variant of the built-in Citrus theme.
purple_citrus = gr.themes.Citrus(
    primary_hue=gr.themes.colors.purple,
    secondary_hue=gr.themes.colors.purple,
    neutral_hue=gr.themes.colors.gray,
)
# Build the Gradio UI. Components created inside this context are wired to
# the handlers registered at the bottom; layout order = on-screen order.
with gr.Blocks(title="LTX 2.3 Sync", css=css, theme=purple_citrus) as demo:
    gr.Markdown("""
    # LTX 2.3 Sync: Fast Character AnimationπΊ
    **Fast Character Animation with LTX 2.3 Distilled**, using [Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control](https://huggingface.co/Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control) with pose estimation & custom audio inputs for precise lipsync and body movement replication β¨
    """)
    # Hidden state — preprocessing is always Pose
    video_preprocess = gr.State("Pose (DWPose)")
    with gr.Row():
        # -- Left column: inputs ------------------------------------------
        with gr.Column(scale=1):
            with gr.Row():
                input_image = gr.Image(
                    label="Character reference",
                    type="filepath",
                )
                input_video = gr.Video(
                    label="Motion & audio reference",
                )
            with gr.Row():
                with gr.Column(min_width=160):
                    prompt = gr.Textbox(
                        label="Prompt (optional)",
                        info="tip: describe the motion, body posture, facial expressions of the ref video",
                        lines=2,
                        placeholder="the person talks to the camera, making hand gestures",
                    )
                    duration = gr.Slider(
                        label="Duration (s)", minimum=1.0, maximum=15.0, value=3.0, step=0.5,
                    )
                    # Hidden checkbox: auto-describe is currently disabled in
                    # the UI but kept so handler wiring stays valid.
                    auto_describe = gr.Checkbox(
                        label="Auto-describe motion", value=False, visible=False,
                        info="Use AI to describe the video's motion as a prompt",
                    )
            generate_btn = gr.Button(
                "Generate", variant="primary", size="lg", elem_classes=["generate-btn"],
            )
            with gr.Accordion("Advanced Settings", open=False):
                enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False)
                conditioning_strength = gr.Slider(
                    label="V2V Conditioning Strength",
                    info="How closely to follow the reference video's structure",
                    minimum=0.0, maximum=1.0, value=0.85, step=0.05,
                )
                high_res = gr.Checkbox(label="High Resolution (2Γ)", value=False)
                use_video_audio = gr.Checkbox(
                    label="Use Audio from Video", value=True,
                    info="Extract the audio track from the motion source video",
                )
                input_audio = gr.Audio(
                    label="Override Audio (optional β replaces video audio)",
                    type="filepath",
                )
                seed = gr.Slider(
                    label="Seed", minimum=0, maximum=MAX_SEED, value=42, step=1,
                )
                randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
                with gr.Row():
                    width = gr.Number(label="Width", value=768, precision=0)
                    height = gr.Number(label="Height", value=512, precision=0)
        # -- Right column: output -----------------------------------------
        with gr.Column(scale=1):
            output_video = gr.Video(label="Result", autoplay=True, height=480)
    # Clickable example rows; each row maps positionally onto `inputs`
    # (input_audio is omitted and defaults to None in generate_video).
    gr.Examples(
        examples=[
            [
                "britney-spears-toxic-2004.jpg",
                "example_2.mp4",
                "",
                3.4,
                0.85,
                False,
                True,
                1824535108,
                False,
                512,
                768,
            ],
            [
                "1 1.jpeg",
                "1 (2).mp4",
                "a man speaking while making hand gestures",
                3.5,
                0.9,
                False,
                True,
                1723325627,
                False,
                512,
                768,
            ],
            [
                "2 (1).jpeg",
                "video-5.mp4",
                "",
                6.8,
                0.9,
                False,
                True,
                42,
                True,
                512,
                768,
            ],
        ],
        inputs=[
            input_image,
            input_video,
            prompt,
            duration,
            conditioning_strength,
            enhance_prompt,
            use_video_audio,
            seed,
            randomize_seed,
            height,
            width,
        ],
        fn = generate_video,
        cache_examples=True,
        cache_mode="lazy",
        outputs=[output_video, seed],
    )
    # -- Event handlers ----------------------------------------------------
    # Uploading media auto-fills width/height (and duration for videos);
    # the high-res toggle recomputes the resolution pair.
    input_image.change(
        fn=on_image_upload,
        inputs=[input_image, input_video, high_res],
        outputs=[width, height],
    )
    input_video.change(
        fn=on_video_upload,
        inputs=[input_video, input_image, high_res],
        outputs=[width, height, duration],
    )
    high_res.change(
        fn=on_highres_toggle,
        inputs=[input_image, input_video, high_res],
        outputs=[width, height],
    )
    # Argument order must match generate_video's signature exactly.
    generate_btn.click(
        fn=generate_video,
        inputs=[
            input_image, input_video, prompt, duration,
            conditioning_strength, enhance_prompt,
            use_video_audio, seed, randomize_seed, height, width, input_audio
        ],
        outputs=[output_video, seed],
    )
if __name__ == "__main__":
    demo.launch()