LTX-2-3-sync / app.py
SeaWolf-AI's picture
Update app.py
30ccdc2 verified
# A 15-second duration limit is enforced / do not use the x2 resolution (it degrades image quality)
import os
import subprocess
import sys
# ── Runtime bootstrap ────────────────────────────────────────────────────────
# Everything in this section runs at import time, before torch is imported in
# this process. Statement order matters: the environment flags must be set
# first, and the pip installs must finish before the packages are imported.

# Disable torch.compile / dynamo before any torch import
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"
# Install xformers for memory-efficient attention.
# check=False: a failed install is tolerated (the later xformers patch is
# wrapped in try/except and only logs on failure).
subprocess.run([sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"], check=False)
# Install video preprocessing dependencies (best effort, check=False)
subprocess.run([sys.executable, "-m", "pip", "install",
                "dwpose", "onnxruntime-gpu", "imageio[ffmpeg]", "scikit-image",
                "opencv-python-headless", "decord", "num2words"], check=False)
# Ensure num2words is installed (required by SmolVLMProcessor).
# This repeats the install above, but with check=True so a failure aborts startup.
subprocess.run([sys.executable, "-m", "pip", "install", "num2words"], check=True)
# Reinstall torchaudio to match the torch CUDA version on this space.
# controlnet_aux or other deps can pull in a CPU-only torchaudio that conflicts
# with the pre-installed CUDA torch, causing "undefined symbol" errors.
# The torch version is queried in a child interpreter so torch is not imported
# into this process yet.
_tv = subprocess.run([sys.executable, "-c", "import torch; print(torch.__version__)"],
                     capture_output=True, text=True)
if _tv.returncode == 0:
    _full_ver = _tv.stdout.strip()
    # Extract CUDA suffix if present (e.g. "2.7.0+cu124" -> "cu124");
    # fall back to "cu124" when the version string has no local suffix.
    _cuda_suffix = _full_ver.split("+")[-1] if "+" in _full_ver else "cu124"
    _base_ver = _full_ver.split("+")[0]
    print(f"Detected torch {_full_ver}, reinstalling matching torchaudio...")
    # --no-deps keeps pip from replacing the already-installed torch.
    subprocess.run([
        sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps",
        f"torchaudio=={_base_ver}",
        "--index-url", f"https://download.pytorch.org/whl/{_cuda_suffix}",
    ], check=False)
# Clone LTX-2 repo and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
# The checkout lives next to this file.
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
LTX_COMPATIBLE_COMMIT = "ae855f8"  # Pin to 2026-03-11 (compatible API)
# Clone and pin only when the directory does not exist yet; an existing
# checkout is reused as-is (its commit is not re-verified).
if not os.path.exists(LTX_REPO_DIR):
    print(f"Cloning {LTX_REPO_URL} at commit {LTX_COMPATIBLE_COMMIT}...")
    subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True)
    subprocess.run(["git", "-C", LTX_REPO_DIR, "checkout", LTX_COMPATIBLE_COMMIT], check=True)
# NOTE(review): the original indentation was lost; the editable install is
# assumed to run on every start (it uses --force-reinstall) — confirm.
print("Installing ltx-core and ltx-pipelines from cloned repo...")
subprocess.run(
    [sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "-e",
     os.path.join(LTX_REPO_DIR, "packages", "ltx-core"),
     "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")],
    check=True,
)
# Put the package sources at the front of sys.path so the imports below
# resolve to the cloned checkout.
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src"))
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src"))
import logging
import random
import tempfile
from pathlib import Path
import torch
torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True
import spaces
import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download
from safetensors import safe_open
from ltx_core.components.diffusion_steps import EulerDiffusionStep
from ltx_core.components.noisers import GaussianNoiser
from ltx_core.conditioning import (
ConditioningItem,
ConditioningItemAttentionStrengthWrapper,
VideoConditionByReferenceLatent,
)
from ltx_core.loader import LoraPathStrengthAndSDOps, LTXV_LORA_COMFY_RENAMING_MAP
from ltx_core.model.audio_vae import decode_audio as vae_decode_audio
from ltx_core.model.audio_vae import encode_audio as vae_encode_audio
from ltx_core.model.upsampler import upsample_video
from ltx_core.model.video_vae import TilingConfig, VideoEncoder, get_video_chunks_number
from ltx_core.model.video_vae import decode_video as vae_decode_video
from ltx_core.quantization import QuantizationPolicy
from ltx_core.types import Audio, AudioLatentShape, LatentState, VideoLatentShape, VideoPixelShape
from ltx_pipelines.utils import ModelLedger, euler_denoising_loop
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES
from ltx_pipelines.utils.helpers import (
assert_resolution,
cleanup_memory,
combined_image_conditionings,
denoise_audio_video,
denoise_video_only,
encode_prompts,
get_device,
simple_denoising_func,
)
from ltx_pipelines.utils.media_io import (
decode_audio_from_file,
encode_video,
load_video_conditioning,
)
from ltx_pipelines.utils.types import PipelineComponents
# Force-patch xformers attention into the LTX attention module.
# The module attribute `memory_efficient_attention` is overwritten with the
# xformers implementation; the before/after values are printed for debugging.
from ltx_core.model.transformer import attention as _attn_mod
print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
try:
    from xformers.ops import memory_efficient_attention as _mea
    _attn_mod.memory_efficient_attention = _mea
    print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
except Exception as e:
    # Best effort: if xformers is missing or broken, keep whatever the module
    # already had and continue.
    print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}")
# Show INFO-level messages from the root logger (the pipeline logs via logging.info).
logging.getLogger().setLevel(logging.INFO)
# ─────────────────────────────────────────────────────────────────────────────
# Video Preprocessing: Strip appearance, keep structure
# ─────────────────────────────────────────────────────────────────────────────
import imageio
import cv2
from PIL import Image
from dwpose import DwposeDetector
# Lazily-initialized module-level caches, filled on first use by
# _get_pose_processor() and _get_depth_processor() respectively.
_pose_processor = None
_depth_processor = None
def _get_pose_processor():
    """Return the shared DWPose detector, building it on the first call.

    The detector is cached in the module-level ``_pose_processor`` so that
    ``DwposeDetector.from_pretrained_default()`` runs at most once.
    """
    global _pose_processor
    if _pose_processor is not None:
        return _pose_processor
    _pose_processor = DwposeDetector.from_pretrained_default()
    print("[Preprocess] DWPose processor loaded")
    return _pose_processor
def _get_depth_processor():
    """Return the depth "processor" placeholder, initializing it on first call.

    No model is loaded here: the cached value is the sentinel string ``"cv2"``,
    signalling that depth approximation is done directly with OpenCV.
    """
    global _depth_processor
    if _depth_processor is not None:
        return _depth_processor
    # Sentinel only — the depth code path calls cv2 directly.
    _depth_processor = "cv2"
    print("[Preprocess] CV2-based depth processor loaded")
    return _depth_processor
def load_video_frames(video_path: str) -> list[np.ndarray]:
    """Decode every frame of ``video_path`` into a list.

    Args:
        video_path: Path of the video file handed to ``imageio.get_reader``.

    Returns:
        All frames yielded by the reader, in order (empty list if none).
    """
    with imageio.get_reader(video_path) as source:
        # A comprehension iterates the reader exactly like the explicit loop,
        # without asking it for a length first.
        return [decoded for decoded in source]
def write_video_mp4(frames_float_01: list[np.ndarray], fps: float, out_path: str) -> str:
    """Encode float frames in [0, 1] as a video file at ``out_path``.

    Args:
        frames_float_01: Frames scaled to [0, 1]; each is multiplied by 255
            and cast to ``uint8`` before writing.
        fps: Frame rate passed to the imageio writer.
        out_path: Destination file path.

    Returns:
        ``out_path``, unchanged.
    """
    def _to_uint8(frame):
        return (frame * 255).astype(np.uint8)

    # Convert everything up front so a bad frame fails before the writer
    # (and therefore the output file) is opened.
    quantized = list(map(_to_uint8, frames_float_01))
    with imageio.get_writer(out_path, fps=fps, macro_block_size=1) as sink:
        for image in quantized:
            sink.append_data(image)
    return out_path
def extract_first_frame(video_path: str) -> str:
    """Save the first frame of a video as a temporary PNG and return its path.

    Only the first frame is decoded; the rest of the video is never read.
    The output file is created with ``tempfile.mkstemp`` (the deprecated
    ``mktemp`` only returns a name, which is race-prone). The caller owns the
    returned file and is responsible for deleting it.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of the newly written PNG file.

    Raises:
        ValueError: If the video yields no frames.
    """
    first_frame = None
    with imageio.get_reader(video_path) as reader:
        for first_frame in reader:
            break  # stop after the first decoded frame
    if first_frame is None:
        raise ValueError("No frames in video")
    fd, out_path = tempfile.mkstemp(suffix=".png")
    os.close(fd)  # PIL reopens the path itself; only the reserved name is needed
    Image.fromarray(first_frame).save(out_path)
    return out_path
def preprocess_video_pose(frames: list[np.ndarray], width: int, height: int) -> list[np.ndarray]:
    """Render an inverted DWPose skeleton image for every input frame.

    Each frame is passed through the cached DWPose detector (body, hands and
    face enabled), resized to ``(width, height)`` with bilinear filtering,
    scaled to [0, 1] and inverted.

    The inversion (white background, dark skeleton) is deliberate: it keeps
    the Union Control model from reading the mostly-black pose canvas as a
    "dark lighting" cue. Only brightness polarity changes; the skeleton
    structure is preserved.

    Args:
        frames: Input frames; each is cast to ``uint8`` before detection.
        width: Output width in pixels.
        height: Output height in pixels.

    Returns:
        One ``float32`` RGB array in [0, 1] per input frame.
    """
    detector = _get_pose_processor()
    target_size = (width, height)

    def _inverted_skeleton(frame):
        rgb_input = Image.fromarray(frame.astype(np.uint8)).convert("RGB")
        skeleton = detector(rgb_input, include_body=True, include_hand=True, include_face=True)
        # The detector may hand back a non-PIL object; normalize to PIL.
        if not isinstance(skeleton, Image.Image):
            skeleton = Image.fromarray(np.array(skeleton).astype(np.uint8))
        skeleton = skeleton.convert("RGB").resize(target_size, Image.BILINEAR)
        unit_range = np.array(skeleton).astype(np.float32) / 255.0
        # Flip polarity so the canvas is white rather than black.
        return 1.0 - unit_range

    return [_inverted_skeleton(frame) for frame in frames]
def preprocess_video_canny(frames: list[np.ndarray], width: int, height: int,
                           low_threshold: int = 50, high_threshold: int = 100) -> list[np.ndarray]:
    """Compute a 3-channel Canny edge map for every input frame.

    Args:
        frames: RGB input frames.
        width: Output width in pixels (frames are resized before edge detection).
        height: Output height in pixels.
        low_threshold: Lower hysteresis threshold passed to ``cv2.Canny``.
        high_threshold: Upper hysteresis threshold passed to ``cv2.Canny``.

    Returns:
        One ``float32`` array in [0, 1] per input frame, with the edge map
        replicated across three channels.
    """
    target_size = (width, height)

    def _edge_map(frame):
        # Shrink first so edges are detected at the output resolution.
        small = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
        mono = cv2.cvtColor(small, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(mono, low_threshold, high_threshold)
        # Replicate the single channel into three identical channels.
        return np.stack((edges, edges, edges), axis=-1).astype(np.float32) / 255.0

    return [_edge_map(frame) for frame in frames]
def preprocess_video_depth(frames: list[np.ndarray], width: int, height: int) -> list[np.ndarray]:
    """Build depth-like maps from the Laplacian gradient magnitude of each frame.

    This is a fast approximation, not real depth estimation — for true depth,
    use MiDaS externally.

    Args:
        frames: RGB input frames.
        width: Output width in pixels.
        height: Output height in pixels.

    Returns:
        One 3-channel array per input frame, each normalized by its own
        maximum so values lie in [0, 1].
    """
    target_size = (width, height)

    def _pseudo_depth(frame):
        small = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
        mono = cv2.cvtColor(small, cv2.COLOR_RGB2GRAY).astype(np.float32)
        # The Laplacian responds to intensity discontinuities, used here as a
        # stand-in for depth discontinuities.
        lap = np.abs(cv2.Laplacian(mono, cv2.CV_32F, ksize=5))
        # Per-frame normalization; the epsilon avoids dividing by zero on
        # completely flat frames.
        lap = lap / (lap.max() + 1e-8)
        return np.stack((lap, lap, lap), axis=-1)

    return [_pseudo_depth(frame) for frame in frames]
def preprocess_conditioning_video(
    video_path: str,
    mode: str,
    width: int,
    height: int,
    num_frames: int,
    fps: float,
) -> tuple[str, str]:
    """Preprocess a video for conditioning: strip appearance, keep structure.

    The first ``num_frames`` frames are converted according to ``mode`` and
    written to a temporary MP4. The untouched first frame is written to a
    temporary PNG so it can serve as the image (appearance) conditioning.

    Both output files are created with ``tempfile.mkstemp`` rather than the
    deprecated, race-prone ``tempfile.mktemp``. The caller owns both files.

    Args:
        video_path: Path of the source video.
        mode: ``"Pose (DWPose)"``, ``"Canny Edge"`` or ``"Depth (Laplacian)"``.
            Any other value is treated as raw mode: frames are only rescaled
            to [0, 1], not resized or filtered.
        width: Target width for the processed frames (ignored in raw mode).
        height: Target height for the processed frames (ignored in raw mode).
        num_frames: Maximum number of frames to keep; must be at least 1.
        fps: Frame rate of the written conditioning video.

    Returns:
        ``(conditioning_mp4_path, first_frame_png_path)``.

    Raises:
        ValueError: If no frames could be decoded or ``num_frames`` is < 1.
    """
    frames = load_video_frames(video_path)
    if not frames:
        raise ValueError("No frames decoded from video")
    if num_frames < 1:
        # Without this guard a zero count leaves nothing to save (IndexError
        # below) and a negative count silently trims from the end instead.
        raise ValueError(f"num_frames must be >= 1, got {num_frames}")
    # Trim to num_frames
    frames = frames[:num_frames]

    # Save first frame (original appearance) for image conditioning
    png_fd, first_png = tempfile.mkstemp(suffix=".png")
    os.close(png_fd)  # PIL reopens the path itself
    Image.fromarray(frames[0]).save(first_png)

    # Process based on mode
    if mode == "Pose (DWPose)":
        processed = preprocess_video_pose(frames, width, height)
    elif mode == "Canny Edge":
        processed = preprocess_video_canny(frames, width, height)
    elif mode == "Depth (Laplacian)":
        processed = preprocess_video_depth(frames, width, height)
    else:
        # "Raw" mode — no preprocessing
        processed = [f.astype(np.float32) / 255.0 for f in frames]

    # Reserve the output name only once processing has succeeded.
    mp4_fd, cond_mp4 = tempfile.mkstemp(suffix=".mp4")
    os.close(mp4_fd)
    write_video_mp4(processed, fps=fps, out_path=cond_mp4)
    return cond_mp4, first_png
# ─────────────────────────────────────────────────────────────────────────────
# Helper: read reference downscale factor from IC-LoRA metadata
# ─────────────────────────────────────────────────────────────────────────────
def _read_lora_reference_downscale_factor(lora_path: str) -> int:
    """Read ``reference_downscale_factor`` from a LoRA safetensors header.

    Args:
        lora_path: Path of the LoRA ``.safetensors`` file.

    Returns:
        The metadata value converted to ``int``; ``1`` when the key or the
        metadata is absent, or when anything goes wrong while reading (the
        failure is logged as a warning, never raised).
    """
    try:
        with safe_open(lora_path, framework="pt") as handle:
            header = handle.metadata()
        if not header:
            # Files without metadata report a falsy value; treat as empty.
            header = {}
        return int(header.get("reference_downscale_factor", 1))
    except Exception as e:
        logging.warning(f"Failed to read metadata from LoRA file '{lora_path}': {e}")
        return 1
# ─────────────────────────────────────────────────────────────────────────────
# Unified Pipeline: Distilled + Audio + IC-LoRA Video-to-Video
# ─────────────────────────────────────────────────────────────────────────────
class LTX23UnifiedPipeline:
    """
    Unified LTX-2.3 pipeline supporting all generation modes:
      • Text-to-Video
      • Image-to-Video (first-frame conditioning)
      • Audio-to-Video (lip-sync / BGM conditioning with external audio)
      • Video-to-Video (IC-LoRA reference video conditioning)
      • Any combination of the above
    Architecture:
      - stage_1_model_ledger: transformer WITH IC-LoRA fused (used for Stage 1)
      - stage_2_model_ledger: transformer WITHOUT IC-LoRA (used for Stage 2 upsampling)
      - When no IC-LoRA is provided, both stages use the same base model.
    """

    def __init__(
        self,
        distilled_checkpoint_path: str,
        spatial_upsampler_path: str,
        gemma_root: str,
        ic_loras: list[LoraPathStrengthAndSDOps] | None = None,
        device: torch.device | None = None,
        quantization: QuantizationPolicy | None = None,
        reference_downscale_factor: int | None = None,
    ):
        """Build the model ledger(s) and resolve the reference downscale factor.

        Args:
            distilled_checkpoint_path: Checkpoint path given to every ModelLedger.
            spatial_upsampler_path: Spatial upsampler weights path.
            gemma_root: Root directory of the Gemma text encoder.
            ic_loras: IC-LoRAs for the Stage 1 ledger; ``None`` means none.
            device: Target device; defaults to ``get_device()``.
            quantization: Optional quantization policy passed to the ledgers.
            reference_downscale_factor: Explicit factor for reference videos.
                When ``None`` it is read from the IC-LoRA metadata, falling
                back to 1.

        Raises:
            ValueError: If two IC-LoRAs report different non-1 downscale factors.
        """
        self.device = device or get_device()
        self.dtype = torch.bfloat16
        ic_loras = ic_loras or []
        self.has_ic_lora = len(ic_loras) > 0
        # Stage 1: transformer with IC-LoRA (if provided)
        self.stage_1_model_ledger = ModelLedger(
            dtype=self.dtype,
            device=self.device,
            checkpoint_path=distilled_checkpoint_path,
            spatial_upsampler_path=spatial_upsampler_path,
            gemma_root_path=gemma_root,
            loras=ic_loras,
            quantization=quantization,
        )
        if self.has_ic_lora:
            # Stage 2 needs a separate transformer WITHOUT IC-LoRA
            self.stage_2_model_ledger = ModelLedger(
                dtype=self.dtype,
                device=self.device,
                checkpoint_path=distilled_checkpoint_path,
                spatial_upsampler_path=spatial_upsampler_path,
                gemma_root_path=gemma_root,
                loras=[],
                quantization=quantization,
            )
        else:
            # No IC-LoRA: share a single ledger for both stages (saves ~half VRAM)
            self.stage_2_model_ledger = self.stage_1_model_ledger
        self.pipeline_components = PipelineComponents(
            dtype=self.dtype,
            device=self.device,
        )
        # Reference downscale factor: explicit value takes priority,
        # otherwise read from IC-LoRA metadata, otherwise default to 1.
        if reference_downscale_factor is not None:
            self.reference_downscale_factor = reference_downscale_factor
        else:
            self.reference_downscale_factor = 1
            for lora in ic_loras:
                scale = _read_lora_reference_downscale_factor(lora.path)
                if scale != 1:
                    # All LoRAs that specify a factor must agree on it.
                    if self.reference_downscale_factor not in (1, scale):
                        raise ValueError(
                            f"Conflicting reference_downscale_factor: "
                            f"already {self.reference_downscale_factor}, got {scale}"
                        )
                    self.reference_downscale_factor = scale
        logging.info(f"[Pipeline] reference_downscale_factor={self.reference_downscale_factor}")

    # ── Video reference conditioning (from ICLoraPipeline) ───────────────
    def _create_ic_conditionings(
        self,
        video_conditioning: list[tuple[str, float]],
        height: int,
        width: int,
        num_frames: int,
        video_encoder: VideoEncoder,
        conditioning_strength: float = 1.0,
    ) -> list[ConditioningItem]:
        """Create IC-LoRA video reference conditioning items.

        Each reference video is loaded at the target size divided by
        ``self.reference_downscale_factor``, encoded with ``video_encoder`` and
        wrapped in a ``VideoConditionByReferenceLatent``.

        Args:
            video_conditioning: ``(path, strength)`` pairs of reference videos.
            height: Target height before the reference downscale is applied.
            width: Target width before the reference downscale is applied.
            num_frames: Frame cap passed to the video loader.
            video_encoder: Encoder used to turn the video into a latent.
            conditioning_strength: When below 1.0, each item is additionally
                wrapped with this value as its ``attention_mask``.

        Returns:
            One conditioning item per reference video.
        """
        conditionings: list[ConditioningItem] = []
        scale = self.reference_downscale_factor
        ref_height = height // scale
        ref_width = width // scale
        for video_path, strength in video_conditioning:
            video = load_video_conditioning(
                video_path=video_path,
                height=ref_height,
                width=ref_width,
                frame_cap=num_frames,
                dtype=self.dtype,
                device=self.device,
            )
            encoded_video = video_encoder(video)
            cond = VideoConditionByReferenceLatent(
                latent=encoded_video,
                downscale_factor=scale,
                strength=strength,
            )
            # Only wrap when the influence is actually reduced.
            if conditioning_strength < 1.0:
                cond = ConditioningItemAttentionStrengthWrapper(
                    cond, attention_mask=conditioning_strength
                )
            conditionings.append(cond)
        if conditionings:
            logging.info(f"[IC-LoRA] Added {len(conditionings)} video conditioning(s)")
        return conditionings

    # ── Main generation entry point ──────────────────────────────────────
    def __call__(
        self,
        prompt: str,
        seed: int,
        height: int,
        width: int,
        num_frames: int,
        frame_rate: float,
        images: list[ImageConditioningInput],
        audio_path: str | None = None,
        video_conditioning: list[tuple[str, float]] | None = None,
        tiling_config: TilingConfig | None = None,
        enhance_prompt: bool = False,
        conditioning_strength: float = 1.0,
    ):
        """
        Generate video with any combination of conditioning.

        Generation runs in two stages: Stage 1 denoises at half the requested
        width/height, then Stage 2 upsamples the latent and refines it at the
        full resolution. The prompt is always extended with
        " synchronized lipsync", plus a lighting hint when it contains none of
        the lighting keywords.

        Args:
            prompt: Text prompt.
            seed: Seed for the torch random generator.
            height: Final output height (validated by ``assert_resolution``).
            width: Final output width (validated by ``assert_resolution``).
            num_frames: Number of frames to generate.
            frame_rate: Output frame rate; also used to derive the audio duration.
            images: Image conditioning inputs; the first one (if any) is also
                passed to prompt enhancement.
            audio_path: Path to external audio file for lipsync/BGM conditioning.
            video_conditioning: List of (path, strength) tuples for IC-LoRA V2V.
            tiling_config: Optional tiling configuration for video decoding.
            enhance_prompt: Whether to run prompt enhancement on the prompt.
            conditioning_strength: Scale for IC-LoRA attention influence [0, 1].

        Returns:
            Tuple of (decoded_video_iterator, Audio).

        Raises:
            ValueError: If ``audio_path`` is given but no audio could be decoded.
        """
        assert_resolution(height=height, width=width, is_two_stage=True)
        prompt += " synchronized lipsync"
        # Ensure the prompt includes lighting context to prevent dark outputs.
        # When the prompt is minimal, the model can inherit "darkness" from the
        # pose conditioning video's latent. Adding explicit brightness cues
        # counteracts this.
        # NOTE(review): this is a substring match, so e.g. "lit" also matches
        # inside unrelated words such as "little".
        _lighting_keywords = ["bright", "light", "lit", "illuminat", "sunny", "daylight", "indoor lighting"]
        if not any(kw in prompt.lower() for kw in _lighting_keywords):
            prompt += ", well-lit, natural lighting"
        has_audio = audio_path is not None
        has_video_cond = bool(video_conditioning)
        generator = torch.Generator(device=self.device).manual_seed(seed)
        noiser = GaussianNoiser(generator=generator)
        stepper = EulerDiffusionStep()
        dtype = torch.bfloat16
        # ── Encode text prompt ───────────────────────────────────────────
        # Use stage_1 ledger for prompt encoding (has text encoder)
        (ctx_p,) = encode_prompts(
            [prompt],
            self.stage_1_model_ledger,
            enhance_first_prompt=enhance_prompt,
            enhance_prompt_image=images[0].path if len(images) > 0 else None,
        )
        video_context, audio_context = ctx_p.video_encoding, ctx_p.audio_encoding
        # ── Encode external audio (if provided) ─────────────────────────
        encoded_audio_latent = None
        decoded_audio_for_output = None
        if has_audio:
            video_duration = num_frames / frame_rate
            decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration)
            if decoded_audio is None:
                raise ValueError(f"Could not extract audio stream from {audio_path}")
            encoded_audio_latent = vae_encode_audio(
                decoded_audio, self.stage_1_model_ledger.audio_encoder()
            )
            audio_shape = AudioLatentShape.from_duration(
                batch=1, duration=video_duration, channels=8, mel_bins=16
            )
            # Force the audio latent to the frame count expected for the video
            # duration: truncate when too long, zero-pad along dim 2 when too short.
            expected_frames = audio_shape.frames
            actual_frames = encoded_audio_latent.shape[2]
            if actual_frames > expected_frames:
                encoded_audio_latent = encoded_audio_latent[:, :, :expected_frames, :]
            elif actual_frames < expected_frames:
                pad = torch.zeros(
                    encoded_audio_latent.shape[0], encoded_audio_latent.shape[1],
                    expected_frames - actual_frames, encoded_audio_latent.shape[3],
                    device=encoded_audio_latent.device, dtype=encoded_audio_latent.dtype,
                )
                encoded_audio_latent = torch.cat([encoded_audio_latent, pad], dim=2)
            # The original waveform (not a re-decoded latent) is what gets returned.
            decoded_audio_for_output = Audio(
                waveform=decoded_audio.waveform.squeeze(0),
                sampling_rate=decoded_audio.sampling_rate,
            )
        # ── Build conditionings for Stage 1 ──────────────────────────────
        # Use stage_1 video encoder (has IC-LoRA context)
        video_encoder = self.stage_1_model_ledger.video_encoder()
        # Stage 1 runs at half the requested width and height.
        stage_1_output_shape = VideoPixelShape(
            batch=1, frames=num_frames,
            width=width // 2, height=height // 2, fps=frame_rate,
        )
        # Image conditionings
        stage_1_conditionings = combined_image_conditionings(
            images=images,
            height=stage_1_output_shape.height,
            width=stage_1_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )
        # IC-LoRA video reference conditionings
        if has_video_cond:
            ic_conds = self._create_ic_conditionings(
                video_conditioning=video_conditioning,
                height=stage_1_output_shape.height,
                width=stage_1_output_shape.width,
                num_frames=num_frames,
                video_encoder=video_encoder,
                conditioning_strength=conditioning_strength,
            )
            stage_1_conditionings.extend(ic_conds)
        # ── Stage 1: Low-res generation ──────────────────────────────────
        transformer = self.stage_1_model_ledger.transformer()
        stage_1_sigmas = torch.Tensor(DISTILLED_SIGMA_VALUES).to(self.device)

        def denoising_loop(sigmas, video_state, audio_state, stepper):
            # Closure binding the Stage 1 transformer and the prompt contexts.
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=video_context,
                    audio_context=audio_context,
                    transformer=transformer,
                ),
            )

        if has_audio:
            # Audio mode: denoise video only, use external audio latent
            video_state = denoise_video_only(
                output_shape=stage_1_output_shape,
                conditionings=stage_1_conditionings,
                noiser=noiser,
                sigmas=stage_1_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                initial_audio_latent=encoded_audio_latent,
            )
            audio_state = None  # we'll use the original audio for output
        else:
            # Standard / IC-only mode: denoise both audio and video
            video_state, audio_state = denoise_audio_video(
                output_shape=stage_1_output_shape,
                conditionings=stage_1_conditionings,
                noiser=noiser,
                sigmas=stage_1_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
            )
        # NOTE(review): torch.cuda.synchronize() is called unconditionally, so
        # this path presumably requires a CUDA device — confirm.
        torch.cuda.synchronize()
        cleanup_memory()
        # ── Stage 2: Upsample + Refine ──────────────────────────────────
        upscaled_video_latent = upsample_video(
            latent=video_state.latent[:1],
            video_encoder=video_encoder,
            upsampler=self.stage_2_model_ledger.spatial_upsampler(),
        )
        torch.cuda.synchronize()
        cleanup_memory()
        # Stage 2 uses the transformer WITHOUT IC-LoRA
        transformer_s2 = self.stage_2_model_ledger.transformer()
        stage_2_sigmas = torch.Tensor(STAGE_2_DISTILLED_SIGMA_VALUES).to(self.device)

        def denoising_loop_s2(sigmas, video_state, audio_state, stepper):
            # Same as denoising_loop, but bound to the Stage 2 transformer.
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=video_context,
                    audio_context=audio_context,
                    transformer=transformer_s2,
                ),
            )

        stage_2_output_shape = VideoPixelShape(
            batch=1, frames=num_frames,
            width=width, height=height, fps=frame_rate,
        )
        # Stage 2 only re-applies the image conditionings; the IC-LoRA video
        # reference conditionings are not added here.
        stage_2_conditionings = combined_image_conditionings(
            images=images,
            height=stage_2_output_shape.height,
            width=stage_2_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )
        if has_audio:
            video_state = denoise_video_only(
                output_shape=stage_2_output_shape,
                conditionings=stage_2_conditionings,
                noiser=noiser,
                sigmas=stage_2_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop_s2,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                noise_scale=stage_2_sigmas[0],
                initial_video_latent=upscaled_video_latent,
                initial_audio_latent=encoded_audio_latent,
            )
            audio_state = None
        else:
            video_state, audio_state = denoise_audio_video(
                output_shape=stage_2_output_shape,
                conditionings=stage_2_conditionings,
                noiser=noiser,
                sigmas=stage_2_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop_s2,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                noise_scale=stage_2_sigmas[0],
                initial_video_latent=upscaled_video_latent,
                # Continue from the audio latent produced by Stage 1.
                initial_audio_latent=audio_state.latent,
            )
        torch.cuda.synchronize()
        # Drop the local references before decoding, then run memory cleanup.
        del transformer, transformer_s2, video_encoder
        cleanup_memory()
        # ── Decode ───────────────────────────────────────────────────────
        decoded_video = vae_decode_video(
            video_state.latent,
            self.stage_2_model_ledger.video_decoder(),
            tiling_config,
            generator,
        )
        if has_audio:
            # External audio: return the original waveform unchanged.
            output_audio = decoded_audio_for_output
        else:
            output_audio = vae_decode_audio(
                audio_state.latent,
                self.stage_2_model_ledger.audio_decoder(),
                self.stage_2_model_ledger.vocoder(),
            )
        return decoded_video, output_audio
# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
# Largest seed offered to the user: the maximum signed 32-bit integer.
MAX_SEED = np.iinfo(np.int32).max
DEFAULT_PROMPT = (
    "An astronaut hatches from a fragile egg on the surface of the Moon, "
    "the shell cracking and peeling apart in gentle low-gravity motion."
)
DEFAULT_FRAME_RATE = 24.0
# Resolution presets keyed by quality tier, then aspect ratio.
# Each value is unpacked by the UI helpers as (width, height).
RESOLUTIONS = {
    "high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)},
    "low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)},
}
# Available IC-LoRA models: display name -> Hugging Face repo and weight file
IC_LORA_OPTIONS = {
    "Union Control (Depth + Edge)": {
        "repo": "Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control",
        "filename": "ltx-2.3-22b-ic-lora-union-control-ref0.5.safetensors",
    },
    "Motion Track Control": {
        "repo": "Lightricks/LTX-2.3-22b-IC-LoRA-Motion-Track-Control",
        "filename": "ltx-2.3-22b-ic-lora-motion-track-control-ref0.5.safetensors",
    },
}
# Must be a key of IC_LORA_OPTIONS.
DEFAULT_IC_LORA = "Union Control (Depth + Edge)"
# ─────────────────────────────────────────────────────────────────────────────
# Download Models
# ─────────────────────────────────────────────────────────────────────────────
# Repository that provides the spatial upsampler (and the stock distilled model).
LTX_MODEL_REPO = "Lightricks/LTX-2.3"
# Despite its name, CHECKPOINT_PATH holds a Hugging Face repo id, not a file path.
# LTX 2.3 with the Union Control LoRA already fused in, because applying the
# LoRA separately breaks under quantization.
CHECKPOINT_PATH = "linoyts/ltx-2.3-22b-fused-union-control"
GEMMA_REPO = "google/gemma-3-12b-it-qat-q4_0-unquantized"
print("=" * 80)
print("Downloading LTX-2.3 distilled model + Gemma + IC-LoRA...")
print("=" * 80)
checkpoint_path = hf_hub_download(
    # Stock (non-fused) alternative, kept for reference:
    # repo_id=LTX_MODEL_REPO, filename="ltx-2.3-22b-distilled.safetensors"
    repo_id=CHECKPOINT_PATH, filename="ltx-2.3-22b-fused-union-control.safetensors"
)
spatial_upsampler_path = hf_hub_download(
    repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors"
)
# The token is read from the HF_TOKEN environment variable (None if unset).
gemma_root = snapshot_download(repo_id=GEMMA_REPO, token=os.environ.get("HF_TOKEN"))
# Download default IC-LoRA
default_lora_info = IC_LORA_OPTIONS[DEFAULT_IC_LORA]
default_ic_lora_path = hf_hub_download(
    repo_id=default_lora_info["repo"], filename=default_lora_info["filename"]
)
print(f"Checkpoint: {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
print(f"Gemma root: {gemma_root}")
print(f"IC-LoRA: {default_ic_lora_path}")
# ─────────────────────────────────────────────────────────────────────────────
# Initialize Pipeline
# ─────────────────────────────────────────────────────────────────────────────
# LoRA descriptor for the downloaded default IC-LoRA.
# It is built here but NOT passed to the pipeline below, because the LoRA is
# already fused into the checkpoint (see the commented-out argument).
ic_loras = [
    LoraPathStrengthAndSDOps(default_ic_lora_path, 1.0, LTXV_LORA_COMFY_RENAMING_MAP)
]
pipeline = LTX23UnifiedPipeline(
    distilled_checkpoint_path=checkpoint_path,
    spatial_upsampler_path=spatial_upsampler_path,
    gemma_root=gemma_root,
    # ic_loras=ic_loras,  # LoRA already fused into checkpoint
    quantization=QuantizationPolicy.fp8_cast(),
    # Union Control IC-LoRA was trained with reference videos at half resolution.
    # Set explicitly so it works both with separate LoRA and fused checkpoints.
    reference_downscale_factor=2,
)
# Preload all models for ZeroGPU tensor packing.
# Each ledger accessor is called once and then replaced on the ledger instance
# by a lambda returning the already-built object, so later calls from the
# pipeline reuse it instead of calling the original accessor again.
print("Preloading all models (including Gemma, Audio encoders)...")
# Shared ledger: preload once. Separate ledgers (IC-LoRA): preload both.
_ledger_1 = pipeline.stage_1_model_ledger
_ledger_2 = pipeline.stage_2_model_ledger
# True when the pipeline was built without IC-LoRAs (both stages share a ledger).
_shared = _ledger_1 is _ledger_2
# Stage 1 models (with IC-LoRA if loaded)
_s1_transformer = _ledger_1.transformer()
_s1_video_encoder = _ledger_1.video_encoder()
_s1_text_encoder = _ledger_1.text_encoder()
_s1_embeddings = _ledger_1.gemma_embeddings_processor()
_s1_audio_encoder = _ledger_1.audio_encoder()
_ledger_1.transformer = lambda: _s1_transformer
_ledger_1.video_encoder = lambda: _s1_video_encoder
_ledger_1.text_encoder = lambda: _s1_text_encoder
_ledger_1.gemma_embeddings_processor = lambda: _s1_embeddings
_ledger_1.audio_encoder = lambda: _s1_audio_encoder
if _shared:
    # Single ledger — also preload decoder/upsampler/vocoder on the same object
    _video_decoder = _ledger_1.video_decoder()
    _audio_decoder = _ledger_1.audio_decoder()
    _vocoder = _ledger_1.vocoder()
    _spatial_upsampler = _ledger_1.spatial_upsampler()
    _ledger_1.video_decoder = lambda: _video_decoder
    _ledger_1.audio_decoder = lambda: _audio_decoder
    _ledger_1.vocoder = lambda: _vocoder
    _ledger_1.spatial_upsampler = lambda: _spatial_upsampler
    print(" (single shared ledger β€” no IC-LoRA)")
else:
    # Stage 2 models (separate transformer without IC-LoRA)
    _s2_transformer = _ledger_2.transformer()
    _s2_video_encoder = _ledger_2.video_encoder()
    _s2_video_decoder = _ledger_2.video_decoder()
    _s2_audio_decoder = _ledger_2.audio_decoder()
    _s2_vocoder = _ledger_2.vocoder()
    _s2_spatial_upsampler = _ledger_2.spatial_upsampler()
    _s2_text_encoder = _ledger_2.text_encoder()
    _s2_embeddings = _ledger_2.gemma_embeddings_processor()
    _s2_audio_encoder = _ledger_2.audio_encoder()
    _ledger_2.transformer = lambda: _s2_transformer
    _ledger_2.video_encoder = lambda: _s2_video_encoder
    _ledger_2.video_decoder = lambda: _s2_video_decoder
    _ledger_2.audio_decoder = lambda: _s2_audio_decoder
    _ledger_2.vocoder = lambda: _s2_vocoder
    _ledger_2.spatial_upsampler = lambda: _s2_spatial_upsampler
    _ledger_2.text_encoder = lambda: _s2_text_encoder
    _ledger_2.gemma_embeddings_processor = lambda: _s2_embeddings
    _ledger_2.audio_encoder = lambda: _s2_audio_encoder
    print(" (two separate ledgers β€” IC-LoRA active)")
print("All models preloaded!")
print("=" * 80)
# ─────────────────────────────────────────────────────────────────────────────
# UI Helpers
# ─────────────────────────────────────────────────────────────────────────────
def detect_aspect_ratio(media_path) -> str:
    """Detect the closest supported aspect ratio of an image or video.

    Files with a known image extension are measured with PIL. Everything else
    is first opened as a video with PyAV, falling back to PIL if that fails.

    Args:
        media_path: Path of the media file, or ``None``.

    Returns:
        ``"16:9"``, ``"9:16"`` or ``"1:1"``, whichever is numerically closest
        to width / height. ``"16:9"`` is returned when ``media_path`` is
        ``None``, when the file cannot be read, or when a reported dimension
        is zero (which previously raised ``ZeroDivisionError``).
    """
    default_aspect = "16:9"
    if media_path is None:
        return default_aspect

    def _image_size(path):
        """Return (width, height) read with PIL, or None on any failure."""
        try:
            import PIL.Image
            with PIL.Image.open(path) as img:
                return img.size
        except Exception:
            return None

    def _video_size(path):
        """Return (width, height) of the first video stream, or None on failure."""
        try:
            import av
            with av.open(str(path)) as container:
                stream = container.streams.video[0]
                return stream.codec_context.width, stream.codec_context.height
        except Exception:
            return None

    name = str(media_path).lower()
    ext = name.rsplit(".", 1)[-1] if "." in name else ""
    if ext in ("jpg", "jpeg", "png", "bmp", "webp", "gif", "tiff"):
        size = _image_size(media_path)
    else:
        # Try as video first; fall back to treating the file as an image.
        size = _video_size(media_path)
        if size is None:
            size = _image_size(media_path)
    if size is None:
        return default_aspect

    w, h = size
    if not w or not h:
        # Some containers report 0x0 before decoding; avoid dividing by zero.
        return default_aspect
    ratio = w / h
    candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0}
    return min(candidates, key=lambda k: abs(ratio - candidates[k]))
def on_image_upload(image, video, high_res):
    """Pick the preset resolution matching the uploaded image.

    The image decides the aspect ratio; the video is used only when no image
    is present.

    Returns:
        Two ``gr.update`` objects carrying the new width and height values.
    """
    source = video if image is None else image
    aspect = detect_aspect_ratio(source)
    preset = RESOLUTIONS["high" if high_res else "low"]
    width_px, height_px = preset[aspect]
    return gr.update(value=width_px), gr.update(value=height_px)
def _get_video_duration(video_path) -> float | None:
"""Get video duration in seconds via ffprobe."""
if video_path is None:
return None
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "format=duration", "-of", "default=nw=1:nk=1",
str(video_path)],
capture_output=True, text=True,
)
return float(result.stdout.strip())
except Exception:
return None
def on_video_upload(video, image, high_res):
    """Pick the preset resolution and a duration for an uploaded video.

    The video decides the aspect ratio; the image is used only when no video
    is present. The duration is the video length capped at 15 seconds and
    rounded to one decimal, or 3.0 seconds when the length is unknown.

    Returns:
        Three ``gr.update`` objects: width, height and duration.
    """
    source = image if video is None else video
    aspect = detect_aspect_ratio(source)
    width_px, height_px = RESOLUTIONS["high" if high_res else "low"][aspect]
    # Auto-adjust duration to min(video_length, 15)
    clip_seconds = _get_video_duration(video)
    seconds = 3.0 if clip_seconds is None else round(min(clip_seconds, 15.0), 1)
    return gr.update(value=width_px), gr.update(value=height_px), gr.update(value=seconds)
def on_highres_toggle(image, video, high_res):
    """Recompute the preset resolution after the high-res toggle changes.

    The aspect ratio comes from the image when there is one, otherwise from
    the video.

    Returns:
        Two ``gr.update`` objects carrying the new width and height values.
    """
    if image is not None:
        reference = image
    else:
        reference = video
    aspect = detect_aspect_ratio(reference)
    if high_res:
        dims = RESOLUTIONS["high"][aspect]
    else:
        dims = RESOLUTIONS["low"][aspect]
    return gr.update(value=dims[0]), gr.update(value=dims[1])
# ─────────────────────────────────────────────────────────────────────────────
# Generation
# ─────────────────────────────────────────────────────────────────────────────
def _extract_audio_from_video(video_path: str) -> str | None:
"""Extract audio from video as a temp WAV file. Returns None if no audio."""
out_path = tempfile.mktemp(suffix=".wav")
try:
# Check if video has an audio stream
probe = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "a:0",
"-show_entries", "stream=codec_type", "-of", "default=nw=1:nk=1",
video_path],
capture_output=True, text=True,
)
if not probe.stdout.strip():
return None
# Extract audio
subprocess.run(
["ffmpeg", "-y", "-v", "error", "-i", video_path,
"-vn", "-ac", "2", "-ar", "48000", "-c:a", "pcm_s16le", out_path],
check=True,
)
return out_path
except (subprocess.CalledProcessError, FileNotFoundError):
return None
@spaces.GPU(duration=100)
@torch.inference_mode()
def generate_video(
    input_image,
    input_video,
    prompt: str = "",
    duration: float = 3,
    conditioning_strength: float = 0.85,
    enhance_prompt: bool = False,
    use_video_audio: bool = True,
    seed: int = 42,
    randomize_seed: bool = True,
    height: int = 512,
    width: int = 768,
    input_audio = None,
    progress=gr.Progress(track_tqdm=True),
):
    """Run the pipeline on the given conditionings and encode the result to MP4.

    Args:
        input_image: Optional path of the reference image; used as the
            frame-0 image conditioning with strength 1.0.
        input_video: Optional path of the reference video. It is preprocessed
            with the fixed "Pose (DWPose)" mode and used as video conditioning;
            when no image is given, its first frame becomes the image
            conditioning.
        prompt: Text prompt; ``None`` is treated as empty and the text is
            truncated to 500 characters.
        duration: Requested clip length in seconds.
        conditioning_strength: Forwarded to the pipeline unchanged.
        enhance_prompt: Forwarded to the pipeline unchanged.
        use_video_audio: When no ``input_audio`` is given, extract the audio
            track of ``input_video`` and use it instead.
        seed: Seed used when ``randomize_seed`` is false.
        randomize_seed: Draw a random seed in ``[0, MAX_SEED]`` instead.
        height: Output height in pixels.
        width: Output width in pixels.
        input_audio: Optional path of an audio file passed to the pipeline.
        progress: Gradio progress tracker (declared with ``track_tqdm=True``;
            not referenced in the body).

    Returns:
        ``(output_path, seed_used)``. On any error the exception is printed
        with its traceback and ``(None, seed)`` is returned instead of raising.
    """
    video_preprocess = "Pose (DWPose)"
    # Bind the seed before entering the try block so the error path can always
    # return one, even if the failure happens before the real seed is chosen.
    current_seed = seed
    try:
        torch.cuda.reset_peak_memory_stats()
        current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
        frame_rate = DEFAULT_FRAME_RATE
        num_frames = int(duration * frame_rate) + 1
        # Round the frame count up to the next value of the form 8*k + 1.
        num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1

        mode_parts = []
        if input_image is not None:
            mode_parts.append("Image")
        if input_video is not None:
            mode_parts.append(f"Video({video_preprocess})")
        if input_audio is not None:
            mode_parts.append("Audio")
        if not mode_parts:
            mode_parts.append("Text")
        mode_str = " + ".join(mode_parts)
        print(f"[{mode_str}] Generating: {height}x{width}, {num_frames} frames "
              f"({duration}s), seed={current_seed}")

        # Build image conditionings
        images = []
        if input_image is not None:
            images = [ImageConditioningInput(path=str(input_image), frame_idx=0, strength=1.0)]

        # Build video conditionings - preprocess to strip appearance
        video_conditioning = None
        if input_video is not None:
            video_path = str(input_video)
            if video_preprocess != "Raw (no preprocessing)":
                print(f"[Preprocess] Running {video_preprocess} on input video...")
                cond_mp4, first_frame_png = preprocess_conditioning_video(
                    video_path=video_path,
                    mode=video_preprocess,
                    width=int(width) // 2,  # Stage 1 operates at half res
                    height=int(height) // 2,
                    num_frames=num_frames,
                    fps=frame_rate,
                )
                video_conditioning = [(cond_mp4, 1.0)]
                # If no image was provided, use the video's first frame
                # (original appearance) as the image conditioning
                if input_image is None:
                    images = [ImageConditioningInput(
                        path=first_frame_png, frame_idx=0, strength=1.0,
                    )]
                    print(f"[Preprocess] Using video first frame as image conditioning")
            else:
                # Raw mode - pass video as-is
                video_conditioning = [(video_path, 1.0)]

            # If no audio was provided, optionally extract audio from the video.
            # NOTE(review): the extracted temp WAV is not removed here; confirm
            # whether it is safe to delete once the pipeline has returned.
            if input_audio is None and use_video_audio:
                extracted_audio = _extract_audio_from_video(video_path)
                if extracted_audio is not None:
                    input_audio = extracted_audio
                    print(f"[Preprocess] Extracted audio from input video")

        tiling_config = TilingConfig.default()
        video_chunks_number = get_video_chunks_number(num_frames, tiling_config)

        # Truncate prompt to prevent Gemma token overflow
        # (max 1024 tokens ~ 500 chars). A None prompt is treated as empty.
        prompt = (prompt or "")[:500]

        video, audio = pipeline(
            prompt=prompt,
            seed=current_seed,
            height=int(height),
            width=int(width),
            num_frames=num_frames,
            frame_rate=frame_rate,
            images=images,
            audio_path=input_audio,
            video_conditioning=video_conditioning,
            tiling_config=tiling_config,
            enhance_prompt=enhance_prompt,
            conditioning_strength=conditioning_strength,
        )

        # tempfile.mktemp is deprecated and race-prone; write into a freshly
        # created private directory instead, so the target does not pre-exist.
        output_path = os.path.join(tempfile.mkdtemp(prefix="ltx_sync_"), "output.mp4")
        encode_video(
            video=video,
            fps=frame_rate,
            audio=audio,
            output_path=output_path,
            video_chunks_number=video_chunks_number,
        )
        return str(output_path), current_seed
    except Exception as e:
        import traceback
        print(f"Error: {str(e)}\n{traceback.format_exc()}")
        return None, current_seed
# ─────────────────────────────────────────────────────────────────────────────
# SmolVLM2 β€” Auto-describe motion from reference video
# ─────────────────────────────────────────────────────────────────────────────
# Model id passed to from_pretrained() in _load_vlm().
SMOLVLM_MODEL_ID = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct"
# Module-level cache for the model and processor; filled by _load_vlm() on first use.
_vlm_model = None
_vlm_processor = None
# Instruction sent as the text part of the chat message in describe_video_motion().
# It asks for a motion/expression-only description and excludes appearance,
# background, colors and props.
MOTION_PROMPT = """\
Watch this video carefully. Describe ONLY the following:
1. The body movements and gestures (walking, dancing, waving, turning, etc.)
2. Facial expressions and head movements (smiling, nodding, looking around, etc.)
3. The rhythm, speed, and energy of the motion (slow, fast, smooth, jerky, etc.)
4. The overall mood and tone conveyed by the movement
Do NOT describe:
- What the person/subject looks like (clothing, hair, skin, age, gender)
- The background, setting, or environment
- Colors, lighting, or visual style
- Any objects or props
Write a concise, single-paragraph description focused purely on motion and expression.\
"""
def _load_vlm():
    """Return the cached ``(model, processor)`` pair, loading it on first use.

    The model is first loaded with ``flash_attention_2``; if that attempt
    raises, the reason is logged and the load is retried without an explicit
    attention implementation. Both attempts move the model to ``"cuda"``.

    Returns:
        Tuple ``(_vlm_model, _vlm_processor)``.

    Raises:
        Whatever the fallback ``from_pretrained`` / ``.to("cuda")`` call
        raises when the second attempt fails as well.
    """
    global _vlm_model, _vlm_processor
    if _vlm_model is not None:
        return _vlm_model, _vlm_processor

    from transformers import AutoProcessor, AutoModelForImageTextToText

    print(f"[SmolVLM] Loading {SMOLVLM_MODEL_ID}...")
    # NOTE(review): trust_remote_code=True permits code shipped with the model
    # repository to run; keep the model id pinned to a trusted source.
    _vlm_processor = AutoProcessor.from_pretrained(
        SMOLVLM_MODEL_ID, trust_remote_code=True
    )
    load_kwargs = {"torch_dtype": torch.bfloat16, "trust_remote_code": True}
    try:
        _vlm_model = AutoModelForImageTextToText.from_pretrained(
            SMOLVLM_MODEL_ID,
            _attn_implementation="flash_attention_2",
            **load_kwargs,
        ).to("cuda")
    except Exception as exc:
        # Report why the preferred path failed instead of swallowing it.
        print(f"[SmolVLM] flash_attention_2 load failed ({exc}); "
              "retrying with default attention")
        _vlm_model = AutoModelForImageTextToText.from_pretrained(
            SMOLVLM_MODEL_ID,
            **load_kwargs,
        ).to("cuda")
    print("[SmolVLM] Model loaded!")
    return _vlm_model, _vlm_processor
@spaces.GPU(duration=60)
@torch.inference_mode()
def describe_video_motion(video_path, auto_describe=True):
    """Produce a motion-only description of a video with SmolVLM2.

    Returns ``gr.update(value=<description>)`` on success. Returns an empty
    ``gr.update()`` when there is no video, when ``auto_describe`` is off,
    when the cleaned description is empty, or when anything raises (the
    error is printed).
    """
    if not auto_describe or video_path is None:
        return gr.update()

    try:
        model, processor = _load_vlm()

        user_turn = {
            "role": "user",
            "content": [
                {"type": "video", "path": str(video_path)},
                {"type": "text", "text": MOTION_PROMPT},
            ],
        }
        inputs = processor.apply_chat_template(
            [user_turn],
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt",
        )
        inputs = inputs.to(model.device, dtype=torch.bfloat16)

        output_ids = model.generate(**inputs, do_sample=False, max_new_tokens=200)
        decoded = processor.batch_decode(output_ids, skip_special_tokens=True)
        generated_text = decoded[0]

        # Keep only what follows the last "Assistant:" marker (the whole text
        # when the marker is absent).
        motion_desc = generated_text.rpartition("Assistant:")[2].strip()

        # Cut the text at any leftover fragment of the prompt.
        prompt_markers = (MOTION_PROMPT[:40], "Watch this video", "Do NOT describe")
        for marker in prompt_markers:
            if marker in motion_desc:
                motion_desc = motion_desc.partition(marker)[0].strip()

        if not motion_desc:
            return gr.update()
        print(f"[SmolVLM] Motion description: {motion_desc[:100]}...")
        return gr.update(value=motion_desc)
    except Exception as e:
        print(f"[SmolVLM] Error: {e}")
        return gr.update()
# ─────────────────────────────────────────────────────────────────────────────
# Gradio UI β€” LTX 2.3 Sync
# ─────────────────────────────────────────────────────────────────────────────
# Custom CSS passed to gr.Blocks below: centers the title, enlarges the
# generate button, hides the footer, and makes videos fit with object-fit: contain.
css = """
.main-title { text-align: center; margin-bottom: 0.5em; }
.generate-btn { min-height: 52px !important; font-size: 1.1em !important; }
footer { display: none !important; }
video { object-fit: contain !important; }
"""
# Citrus theme with purple primary/secondary hues and a gray neutral hue.
purple_citrus = gr.themes.Citrus(
primary_hue=gr.themes.colors.purple,
secondary_hue=gr.themes.colors.purple,
neutral_hue=gr.themes.colors.gray,
)
# Build the Gradio Blocks app: input widgets, result video, examples, and event wiring.
with gr.Blocks(title="LTX 2.3 Sync", css=css, theme=purple_citrus) as demo:
gr.Markdown("""
# LTX 2.3 Sync: Fast Character AnimationπŸ•Ί
**Fast Character Animation with LTX 2.3 Distilled**, using [Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control](https://huggingface.co/Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control) with pose estimation & custom audio inputs for precise lipsync and body movement replication ✨
""")
# Hidden state - preprocessing is always Pose.
# NOTE(review): this State is not listed in any handler's inputs below;
# generate_video assigns the same string to its own local variable.
video_preprocess = gr.State("Pose (DWPose)")
with gr.Row():
# -- Left column: inputs --
with gr.Column(scale=1):
with gr.Row():
input_image = gr.Image(
label="Character reference",
type="filepath",
)
input_video = gr.Video(
label="Motion & audio reference",
)
with gr.Row():
with gr.Column(min_width=160):
prompt = gr.Textbox(
label="Prompt (optional)",
info="tip: describe the motion, body posture, facial expressions of the ref video",
lines=2,
placeholder="the person talks to the camera, making hand gestures",
)
duration = gr.Slider(
label="Duration (s)", minimum=1.0, maximum=15.0, value=3.0, step=0.5,
)
# Hidden checkbox (visible=False); it is not wired to any event handler in this section.
auto_describe = gr.Checkbox(
label="Auto-describe motion", value=False, visible=False,
info="Use AI to describe the video's motion as a prompt",
)
generate_btn = gr.Button(
"Generate", variant="primary", size="lg", elem_classes=["generate-btn"],
)
with gr.Accordion("Advanced Settings", open=False):
enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False)
conditioning_strength = gr.Slider(
label="V2V Conditioning Strength",
info="How closely to follow the reference video's structure",
minimum=0.0, maximum=1.0, value=0.85, step=0.05,
)
high_res = gr.Checkbox(label="High Resolution (2Γ—)", value=False)
use_video_audio = gr.Checkbox(
label="Use Audio from Video", value=True,
info="Extract the audio track from the motion source video",
)
input_audio = gr.Audio(
label="Override Audio (optional β€” replaces video audio)",
type="filepath",
)
seed = gr.Slider(
label="Seed", minimum=0, maximum=MAX_SEED, value=42, step=1,
)
randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
with gr.Row():
width = gr.Number(label="Width", value=768, precision=0)
height = gr.Number(label="Height", value=512, precision=0)
# -- Right column: output --
with gr.Column(scale=1):
output_video = gr.Video(label="Result", autoplay=True, height=480)
# Each example row follows the order of `inputs` below: image, video, prompt,
# duration, conditioning strength, enhance prompt, use video audio, seed,
# randomize seed, height, width. input_audio is not part of the examples, so
# generate_video falls back to its default (None) for it.
gr.Examples(
examples=[
[
"britney-spears-toxic-2004.jpg",
"example_2.mp4",
"",
3.4,
0.85,
False,
True,
1824535108,
False,
512,
768,
],
[
"1 1.jpeg",
"1 (2).mp4",
"a man speaking while making hand gestures",
3.5,
0.9,
False,
True,
1723325627,
False,
512,
768,
],
[
"2 (1).jpeg",
"video-5.mp4",
"",
6.8,
0.9,
False,
True,
42,
True,
512,
768,
],
],
inputs=[
input_image,
input_video,
prompt,
duration,
conditioning_strength,
enhance_prompt,
use_video_audio,
seed,
randomize_seed,
height,
width,
],
# Example outputs come from generate_video; caching is enabled in "lazy" mode.
fn = generate_video,
cache_examples=True,
cache_mode="lazy",
outputs=[output_video, seed],
)
# -- Event handlers --
# Changing the image updates width/height.
input_image.change(
fn=on_image_upload,
inputs=[input_image, input_video, high_res],
outputs=[width, height],
)
# Changing the video updates width/height and the duration slider.
input_video.change(
fn=on_video_upload,
inputs=[input_video, input_image, high_res],
outputs=[width, height, duration],
)
# Toggling high resolution updates width/height.
high_res.change(
fn=on_highres_toggle,
inputs=[input_image, input_video, high_res],
outputs=[width, height],
)
# Inputs are passed positionally, so their order must match generate_video's
# parameter order. The seed that was used is written back to the seed slider.
generate_btn.click(
fn=generate_video,
inputs=[
input_image, input_video, prompt, duration,
conditioning_strength, enhance_prompt,
use_video_audio, seed, randomize_seed, height, width,input_audio
],
outputs=[output_video, seed],
)
# Launch the app only when this file is run as a script.
if __name__ == "__main__":
demo.launch()