linoyts's picture
linoyts HF Staff
Update app.py
85c5116 verified
import os
import subprocess
import sys
# Disable torch.compile / dynamo before any torch import
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"
# Install xformers for memory-efficient attention
subprocess.run([sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"], check=False)
# Clone LTX-2 repo and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
if not os.path.exists(LTX_REPO_DIR):
print(f"Cloning {LTX_REPO_URL}...")
subprocess.run(["git", "clone", "--depth", "1", LTX_REPO_URL, LTX_REPO_DIR], check=True)
print("Installing ltx-core and ltx-pipelines from cloned repo...")
subprocess.run(
[sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "-e",
os.path.join(LTX_REPO_DIR, "packages", "ltx-core"),
"-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")],
check=True,
)
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src"))
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src"))
import logging
import random
import tempfile
from pathlib import Path
import torch
torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True
import spaces
import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download
from ltx_core.model.video_vae import TilingConfig, get_video_chunks_number
from ltx_core.quantization import QuantizationPolicy
from ltx_pipelines.distilled import DistilledPipeline
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.media_io import encode_video
# Force-patch xformers attention into the LTX attention module.
from ltx_core.model.transformer import attention as _attn_mod
print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
try:
from xformers.ops import memory_efficient_attention as _mea
_attn_mod.memory_efficient_attention = _mea
print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
except Exception as e:
print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}")
logging.getLogger().setLevel(logging.INFO)
MAX_SEED = np.iinfo(np.int32).max
DEFAULT_PROMPT = (
"An astronaut hatches from a fragile egg on the surface of the Moon, "
"the shell cracking and peeling apart in gentle low-gravity motion. "
"Fine lunar dust lifts and drifts outward with each movement, floating "
"in slow arcs before settling back onto the ground."
)
DEFAULT_FRAME_RATE = 24.0
# Resolution presets: (width, height)
RESOLUTIONS = {
"high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)},
"low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)},
}
# Model repos
LTX_MODEL_REPO = "diffusers-internal-dev/ltx-23"
GEMMA_REPO = "google/gemma-3-12b-it-qat-q4_0-unquantized"
# Download model checkpoints
print("=" * 80)
print("Downloading LTX-2.3 distilled model + Gemma...")
print("=" * 80)
checkpoint_path = hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-22b-distilled.safetensors")
spatial_upsampler_path = hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors")
gemma_root = snapshot_download(repo_id=GEMMA_REPO)
print(f"Checkpoint: {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
print(f"Gemma root: {gemma_root}")
# Initialize pipeline WITH text encoder
pipeline = DistilledPipeline(
distilled_checkpoint_path=checkpoint_path,
spatial_upsampler_path=spatial_upsampler_path,
gemma_root=gemma_root,
loras=[],
quantization=QuantizationPolicy.fp8_cast(),
)
# Preload all models for ZeroGPU tensor packing.
print("Preloading all models (including Gemma)...")
ledger = pipeline.model_ledger
_transformer = ledger.transformer()
_video_encoder = ledger.video_encoder()
_video_decoder = ledger.video_decoder()
_audio_decoder = ledger.audio_decoder()
_vocoder = ledger.vocoder()
_spatial_upsampler = ledger.spatial_upsampler()
_text_encoder = ledger.text_encoder()
_embeddings_processor = ledger.gemma_embeddings_processor()
ledger.transformer = lambda: _transformer
ledger.video_encoder = lambda: _video_encoder
ledger.video_decoder = lambda: _video_decoder
ledger.audio_decoder = lambda: _audio_decoder
ledger.vocoder = lambda: _vocoder
ledger.spatial_upsampler = lambda: _spatial_upsampler
ledger.text_encoder = lambda: _text_encoder
ledger.gemma_embeddings_processor = lambda: _embeddings_processor
print("All models preloaded (including Gemma text encoder)!")
print("=" * 80)
print("Pipeline ready!")
print("=" * 80)
def log_memory(tag: str):
if torch.cuda.is_available():
allocated = torch.cuda.memory_allocated() / 1024**3
peak = torch.cuda.max_memory_allocated() / 1024**3
free, total = torch.cuda.mem_get_info()
print(f"[VRAM {tag}] allocated={allocated:.2f}GB peak={peak:.2f}GB free={free / 1024**3:.2f}GB total={total / 1024**3:.2f}GB")
def detect_aspect_ratio(image) -> str:
"""Detect the closest aspect ratio (16:9, 9:16, or 1:1) from an image."""
if image is None:
return "16:9"
if hasattr(image, "size"):
w, h = image.size
elif hasattr(image, "shape"):
h, w = image.shape[:2]
else:
return "16:9"
ratio = w / h
candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0}
return min(candidates, key=lambda k: abs(ratio - candidates[k]))
def on_image_upload(first_image, last_image, high_res):
"""Auto-set resolution when an image is uploaded."""
# Use first image for aspect ratio detection, fall back to last image
ref_image = first_image if first_image is not None else last_image
aspect = detect_aspect_ratio(ref_image)
tier = "high" if high_res else "low"
w, h = RESOLUTIONS[tier][aspect]
return gr.update(value=w), gr.update(value=h)
def on_highres_toggle(first_image, last_image, high_res):
"""Update resolution when high-res toggle changes."""
ref_image = first_image if first_image is not None else last_image
aspect = detect_aspect_ratio(ref_image)
tier = "high" if high_res else "low"
w, h = RESOLUTIONS[tier][aspect]
return gr.update(value=w), gr.update(value=h)
@spaces.GPU(duration=75)
@torch.inference_mode()
def generate_video(
first_image,
last_image,
prompt: str,
duration: float,
enhance_prompt: bool = True,
seed: int = 42,
randomize_seed: bool = True,
height: int = 1024,
width: int = 1536,
progress=gr.Progress(track_tqdm=True),
):
try:
torch.cuda.reset_peak_memory_stats()
log_memory("start")
current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
frame_rate = DEFAULT_FRAME_RATE
num_frames = int(duration * frame_rate) + 1
num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1
print(f"Generating: {height}x{width}, {num_frames} frames ({duration}s), seed={current_seed}")
images = []
output_dir = Path("outputs")
output_dir.mkdir(exist_ok=True)
if first_image is not None:
temp_first_path = output_dir / f"temp_first_{current_seed}.jpg"
if hasattr(first_image, "save"):
first_image.save(temp_first_path)
else:
temp_first_path = Path(first_image)
images.append(ImageConditioningInput(path=str(temp_first_path), frame_idx=0, strength=1.0))
if last_image is not None:
temp_last_path = output_dir / f"temp_last_{current_seed}.jpg"
if hasattr(last_image, "save"):
last_image.save(temp_last_path)
else:
temp_last_path = Path(last_image)
images.append(ImageConditioningInput(path=str(temp_last_path), frame_idx=num_frames - 1, strength=1.0))
tiling_config = TilingConfig.default()
video_chunks_number = get_video_chunks_number(num_frames, tiling_config)
log_memory("before pipeline call")
video, audio = pipeline(
prompt=prompt,
seed=current_seed,
height=int(height),
width=int(width),
num_frames=num_frames,
frame_rate=frame_rate,
images=images,
tiling_config=tiling_config,
enhance_prompt=enhance_prompt,
)
log_memory("after pipeline call")
output_path = tempfile.mktemp(suffix=".mp4")
encode_video(
video=video,
fps=frame_rate,
audio=audio,
output_path=output_path,
video_chunks_number=video_chunks_number,
)
log_memory("after encode_video")
return str(output_path), current_seed
except Exception as e:
import traceback
log_memory("on error")
print(f"Error: {str(e)}\n{traceback.format_exc()}")
return None, current_seed
with gr.Blocks(title="LTX-2.3 Distilled") as demo:
gr.Markdown("# LTX-2.3 F2LF: Fast Audio-Video Generation with Frame Conditioning")
gr.Markdown(
"Fast and high quality video + audio generation with first and last frame conditioing "
"[[model]](https://huggingface.co/Lightricks/LTX-2.3) "
"[[code]](https://github.com/Lightricks/LTX-2)"
)
with gr.Row():
with gr.Column():
with gr.Row():
first_image = gr.Image(label="First Frame (Optional)", type="pil")
last_image = gr.Image(label="Last Frame (Optional)", type="pil")
prompt = gr.Textbox(
label="Prompt",
info="for best results - make it as elaborate as possible",
value="Make this image come alive with cinematic motion, smooth animation",
lines=3,
placeholder="Describe the motion and animation you want...",
)
with gr.Row():
duration = gr.Slider(label="Duration (seconds)", minimum=1.0, maximum=10.0, value=3.0, step=0.1)
with gr.Column():
enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False)
high_res = gr.Checkbox(label="High Resolution", value=True)
generate_btn = gr.Button("Generate Video", variant="primary", size="lg")
with gr.Accordion("Advanced Settings", open=False):
seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, value=10, step=1)
randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
with gr.Row():
width = gr.Number(label="Width", value=1536, precision=0)
height = gr.Number(label="Height", value=1024, precision=0)
with gr.Column():
output_video = gr.Video(label="Generated Video", autoplay=True)
gr.Examples(
examples=[
[
None, # first_image
"pinkknit.jpg", # last_image
"The camera falls downward through darkness as if dropped into a tunnel. "
"As it slows, five friends wearing pink knitted hats and sunglasses lean "
"over and look down toward the camera with curious expressions. The lens "
"has a strong fisheye effect, creating a circular frame around them. They "
"crowd together closely, forming a symmetrical cluster while staring "
"directly into the lens.",
3.0, # duration
False, # enhance_prompt
42, # seed
True, # randomize_seed
1024,
1024
],
],
inputs=[
first_image, last_image, prompt, duration,
enhance_prompt, seed, randomize_seed, height, width,
],
)
# Auto-detect aspect ratio from uploaded image and set resolution
first_image.change(
fn=on_image_upload,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
last_image.change(
fn=on_image_upload,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
# Update resolution when high-res toggle changes
high_res.change(
fn=on_highres_toggle,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
generate_btn.click(
fn=generate_video,
inputs=[
first_image, last_image, prompt, duration, enhance_prompt,
seed, randomize_seed, height, width,
],
outputs=[output_video, seed],
)
css = """
.fillable{max-width: 1200px !important}
"""
if __name__ == "__main__":
demo.launch(theme=gr.themes.Citrus(), css=css)