Spaces:
Running on Zero
Running on Zero
| import os | |
| import subprocess | |
| import sys | |
| # Disable torch.compile / dynamo before any torch import | |
| os.environ["TORCH_COMPILE_DISABLE"] = "1" | |
| os.environ["TORCHDYNAMO_DISABLE"] = "1" | |
| # Install xformers for memory-efficient attention | |
| subprocess.run([sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"], check=False) | |
| # Clone LTX-2 repo and install packages | |
| LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git" | |
| LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2") | |
| LTX_COMMIT = "ae855f8538843825f9015a419cf4ba5edaf5eec2" # known working commit with decode_video | |
| if not os.path.exists(LTX_REPO_DIR): | |
| print(f"Cloning {LTX_REPO_URL}...") | |
| subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True) | |
| subprocess.run(["git", "checkout", LTX_COMMIT], cwd=LTX_REPO_DIR, check=True) | |
| print("Installing ltx-core and ltx-pipelines from cloned repo...") | |
| subprocess.run( | |
| [sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "-e", | |
| os.path.join(LTX_REPO_DIR, "packages", "ltx-core"), | |
| "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")], | |
| check=True, | |
| ) | |
| sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src")) | |
| sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src")) | |
| import logging | |
| import random | |
| import tempfile | |
| from pathlib import Path | |
| import gc | |
| import hashlib | |
| import torch | |
| torch._dynamo.config.suppress_errors = True | |
| torch._dynamo.config.disable = True | |
| import spaces | |
| import gradio as gr | |
| import numpy as np | |
| from huggingface_hub import hf_hub_download, snapshot_download | |
| from safetensors.torch import load_file, save_file | |
| from safetensors import safe_open | |
| import json | |
| import requests | |
| from ltx_core.components.diffusion_steps import EulerDiffusionStep | |
| from ltx_core.components.noisers import GaussianNoiser | |
| from ltx_core.model.audio_vae import encode_audio as vae_encode_audio | |
| from ltx_core.model.upsampler import upsample_video | |
| from ltx_core.model.video_vae import TilingConfig, get_video_chunks_number, decode_video as vae_decode_video | |
| from ltx_core.quantization import QuantizationPolicy | |
| from ltx_core.types import Audio, AudioLatentShape, VideoPixelShape | |
| from ltx_pipelines.distilled import DistilledPipeline | |
| from ltx_pipelines.utils import euler_denoising_loop | |
| from ltx_pipelines.utils.args import ImageConditioningInput | |
| from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES | |
| from ltx_pipelines.utils.helpers import ( | |
| cleanup_memory, | |
| combined_image_conditionings, | |
| denoise_audio_video, | |
| denoise_video_only, | |
| encode_prompts, | |
| simple_denoising_func, | |
| ) | |
| from ltx_pipelines.utils.media_io import decode_audio_from_file, encode_video | |
| from ltx_core.loader.primitives import LoraPathStrengthAndSDOps | |
| from ltx_core.loader.sd_ops import LTXV_LORA_COMFY_RENAMING_MAP | |
| # Force-patch xformers attention into the LTX attention module. | |
| from ltx_core.model.transformer import attention as _attn_mod | |
| print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}") | |
| try: | |
| from xformers.ops import memory_efficient_attention as _mea | |
| _attn_mod.memory_efficient_attention = _mea | |
| print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}") | |
| except Exception as e: | |
| print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}") | |
| logging.getLogger().setLevel(logging.INFO) | |
| MAX_SEED = np.iinfo(np.int32).max | |
| DEFAULT_PROMPT = ( | |
| "An astronaut hatches from a fragile egg on the surface of the Moon, " | |
| "the shell cracking and peeling apart in gentle low-gravity motion. " | |
| "Fine lunar dust lifts and drifts outward with each movement, floating " | |
| "in slow arcs before settling back onto the ground." | |
| ) | |
| DEFAULT_FRAME_RATE = 24.0 | |
| # Resolution presets: (width, height) | |
| RESOLUTIONS = { | |
| "high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)}, | |
| "low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)}, | |
| } | |
| class LTX23DistilledA2VPipeline(DistilledPipeline): | |
| """DistilledPipeline: single stage, full resolution, 8 steps, with optional audio.""" | |
| def __call__( | |
| self, | |
| prompt: str, | |
| seed: int, | |
| height: int, | |
| width: int, | |
| num_frames: int, | |
| frame_rate: float, | |
| images: list[ImageConditioningInput], | |
| audio_path: str | None = None, | |
| tiling_config: TilingConfig | None = None, | |
| enhance_prompt: bool = False, | |
| ): | |
| print(prompt) | |
| generator = torch.Generator(device=self.device).manual_seed(seed) | |
| noiser = GaussianNoiser(generator=generator) | |
| stepper = EulerDiffusionStep() | |
| dtype = torch.bfloat16 | |
| (ctx_p,) = encode_prompts( | |
| [prompt], | |
| self.model_ledger, | |
| enhance_first_prompt=enhance_prompt, | |
| enhance_prompt_image=images[0].path if len(images) > 0 else None, | |
| ) | |
| video_context, audio_context = ctx_p.video_encoding, ctx_p.audio_encoding | |
| # Audio encoding — only runs if audio is provided | |
| encoded_audio_latent = None | |
| original_audio = None | |
| if audio_path is not None: | |
| video_duration = num_frames / frame_rate | |
| decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration) | |
| if decoded_audio is None: | |
| raise ValueError(f"Could not extract audio stream from {audio_path}") | |
| encoded_audio_latent = vae_encode_audio(decoded_audio, self.model_ledger.audio_encoder()) | |
| audio_shape = AudioLatentShape.from_duration(batch=1, duration=video_duration, channels=8, mel_bins=16) | |
| expected_frames = audio_shape.frames | |
| actual_frames = encoded_audio_latent.shape[2] | |
| if actual_frames > expected_frames: | |
| encoded_audio_latent = encoded_audio_latent[:, :, :expected_frames, :] | |
| elif actual_frames < expected_frames: | |
| pad = torch.zeros( | |
| encoded_audio_latent.shape[0], | |
| encoded_audio_latent.shape[1], | |
| expected_frames - actual_frames, | |
| encoded_audio_latent.shape[3], | |
| device=encoded_audio_latent.device, | |
| dtype=encoded_audio_latent.dtype, | |
| ) | |
| encoded_audio_latent = torch.cat([encoded_audio_latent, pad], dim=2) | |
| original_audio = Audio( | |
| waveform=decoded_audio.waveform.squeeze(0), | |
| sampling_rate=decoded_audio.sampling_rate, | |
| ) | |
| video_encoder = self.model_ledger.video_encoder() | |
| transformer = self.model_ledger.transformer() | |
| sigmas = torch.tensor(DISTILLED_SIGMA_VALUES, device=self.device) | |
| def denoising_loop(sigmas, video_state, audio_state, stepper): | |
| return euler_denoising_loop( | |
| sigmas=sigmas, | |
| video_state=video_state, | |
| audio_state=audio_state, | |
| stepper=stepper, | |
| denoise_fn=simple_denoising_func( | |
| video_context=video_context, | |
| audio_context=audio_context, | |
| transformer=transformer, | |
| ), | |
| ) | |
| output_shape = VideoPixelShape( | |
| batch=1, | |
| frames=num_frames, | |
| width=width, | |
| height=height, | |
| fps=frame_rate, | |
| ) | |
| conditionings = combined_image_conditionings( | |
| images=images, | |
| height=output_shape.height, | |
| width=output_shape.width, | |
| video_encoder=video_encoder, | |
| dtype=dtype, | |
| device=self.device, | |
| ) | |
| video_state, audio_state = denoise_audio_video( | |
| output_shape=output_shape, | |
| conditionings=conditionings, | |
| noiser=noiser, | |
| sigmas=sigmas, | |
| stepper=stepper, | |
| denoising_loop_fn=denoising_loop, | |
| components=self.pipeline_components, | |
| dtype=dtype, | |
| device=self.device, | |
| initial_audio_latent=encoded_audio_latent, | |
| ) | |
| torch.cuda.synchronize() | |
| del transformer | |
| del video_encoder | |
| cleanup_memory() | |
| decoded_video = vae_decode_video( | |
| video_state.latent, | |
| self.model_ledger.video_decoder(), | |
| tiling_config, | |
| generator, | |
| ) | |
| # If audio was provided as input, return it as-is (higher fidelity than decoded) | |
| # If no audio input, decode the generated audio latent from the denoising | |
| if original_audio is not None: | |
| return decoded_video, original_audio | |
| else: | |
| from ltx_core.model.audio_vae import decode_audio as vae_decode_audio | |
| generated_audio = vae_decode_audio( | |
| audio_state.latent, | |
| self.model_ledger.audio_decoder(), | |
| self.model_ledger.vocoder(), | |
| ) | |
| return decoded_video, generated_audio | |
| # Model repos | |
| LTX_MODEL_REPO = "Lightricks/LTX-2.3" | |
| GEMMA_REPO ="Lightricks/gemma-3-12b-it-qat-q4_0-unquantized" | |
| GEMMA_ABLITERATED_REPO = "Sikaworld1990/gemma-3-12b-it-abliterated-sikaworld-high-fidelity-edition-Ltx-2" | |
| GEMMA_ABLITERATED_FILE = "gemma-3-12b-it-abliterated-sikaworld-high-fidelity-edition.safetensors" | |
| # Download model checkpoints | |
| print("=" * 80) | |
| print("Downloading LTX-2.3 distilled model + Gemma...") | |
| print("=" * 80) | |
| # LoRA cache directory and currently-applied key | |
| LORA_CACHE_DIR = Path("lora_cache") | |
| LORA_CACHE_DIR.mkdir(exist_ok=True) | |
| current_lora_key: str | None = None | |
| weights_dir = Path("weights") | |
| weights_dir.mkdir(exist_ok=True) | |
| checkpoint_path = hf_hub_download( | |
| repo_id=LTX_MODEL_REPO, | |
| filename="ltx-2.3-22b-distilled-1.1.safetensors", | |
| local_dir=str(weights_dir), | |
| local_dir_use_symlinks=False, | |
| ) | |
| spatial_upsampler_path = hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors") | |
| print("[Gemma] Setting up abliterated Gemma text encoder...") | |
| MERGED_WEIGHTS = "/tmp/abliterated_gemma_merged.safetensors" | |
| gemma_root = "/tmp/abliterated_gemma" | |
| os.makedirs(gemma_root, exist_ok=True) | |
| gemma_official_dir = snapshot_download( | |
| repo_id=GEMMA_REPO, | |
| ignore_patterns=["*.safetensors", "*.safetensors.index.json"], | |
| ) | |
| for fname in os.listdir(gemma_official_dir): | |
| src = os.path.join(gemma_official_dir, fname) | |
| dst = os.path.join(gemma_root, fname) | |
| if os.path.isfile(src) and not fname.endswith(".safetensors") and fname != "model.safetensors.index.json": | |
| if not os.path.exists(dst): | |
| os.symlink(src, dst) | |
| if os.path.exists(MERGED_WEIGHTS): | |
| print("[Gemma] Using cached merged weights") | |
| else: | |
| abliterated_weights_path = hf_hub_download( | |
| repo_id=GEMMA_ABLITERATED_REPO, | |
| filename=GEMMA_ABLITERATED_FILE, | |
| ) | |
| index_path = hf_hub_download( | |
| repo_id=GEMMA_REPO, | |
| filename="model.safetensors.index.json" | |
| ) | |
| with open(index_path) as f: | |
| weight_index = json.load(f) | |
| vision_keys = {} | |
| for key, shard in weight_index["weight_map"].items(): | |
| if "vision_tower" in key or "multi_modal_projector" in key: | |
| vision_keys[key] = shard | |
| needed_shards = set(vision_keys.values()) | |
| shard_paths = {} | |
| for shard_name in needed_shards: | |
| shard_paths[shard_name] = hf_hub_download( | |
| repo_id=GEMMA_REPO, | |
| filename=shard_name | |
| ) | |
| _fp8_types = {torch.float8_e4m3fn, torch.float8_e5m2} | |
| raw = load_file(abliterated_weights_path) | |
| merged = {} | |
| for key, tensor in raw.items(): | |
| t = tensor.to(torch.bfloat16) if tensor.dtype in _fp8_types else tensor | |
| merged[f"language_model.{key}"] = t | |
| del raw | |
| for key, shard_name in vision_keys.items(): | |
| with safe_open(shard_paths[shard_name], framework="pt") as f: | |
| merged[key] = f.get_tensor(key) | |
| save_file(merged, MERGED_WEIGHTS) | |
| del merged | |
| gc.collect() | |
| weight_link = os.path.join(gemma_root, "model.safetensors") | |
| if os.path.exists(weight_link): | |
| os.remove(weight_link) | |
| os.symlink(MERGED_WEIGHTS, weight_link) | |
| print(f"[Gemma] Root ready: {gemma_root}") | |
| # ---- Insert block (LoRA downloads) between lines 268 and 269 ---- | |
| # LoRA repo + download the requested LoRA adapters | |
| LORA_REPO = "dagloop5/LoRA" | |
| print("=" * 80) | |
| print("Downloading LoRA adapters from dagloop5/LoRA...") | |
| print("=" * 80) | |
| pose_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX2_3_NSFW_furry_concat_v2.safetensors") | |
| general_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX2.3_reasoning_I2V_V3.safetensors") | |
| motion_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="motion_helper.safetensors") | |
| dreamlay_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="DR34ML4Y_LTXXX_PREVIEW_RC1.safetensors") # m15510n4ry, bl0wj0b, d0ubl3_bj, d0gg1e, c0wg1rl | |
| mself_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="Furry Hyper Masturbation - LTX-2 I2V v1.safetensors") # Hyperfap | |
| dramatic_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX-2.3 - Orgasm.safetensors") # "[He | She] is having am orgasm." (am or an?) | |
| fluid_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="cr3ampi3_animation_i2v_ltx2_v1.0.safetensors") # cr3ampi3 animation., missionary animation, doggystyle bouncy animation, double penetration animation | |
| liquid_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="liquid_wet_dr1pp_ltx2_v1.0_scaled.safetensors") # wet dr1pp | |
| demopose_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="clapping-cheeks-audio-v001-alpha.safetensors") | |
| voice_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="hentai_voice_ltx23.safetensors") | |
| realism_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="FurryenhancerLTX2.3V1.215.safetensors") | |
| transition_lora_path = hf_hub_download(repo_id="valiantcat/LTX-2.3-Transition-LORA", filename="ltx2.3-transition.safetensors") | |
| print(f"Pose LoRA: {pose_lora_path}") | |
| print(f"General LoRA: {general_lora_path}") | |
| print(f"Motion LoRA: {motion_lora_path}") | |
| print(f"Dreamlay LoRA: {dreamlay_lora_path}") | |
| print(f"Mself LoRA: {mself_lora_path}") | |
| print(f"Dramatic LoRA: {dramatic_lora_path}") | |
| print(f"Fluid LoRA: {fluid_lora_path}") | |
| print(f"Liquid LoRA: {liquid_lora_path}") | |
| print(f"Demopose LoRA: {demopose_lora_path}") | |
| print(f"Voice LoRA: {voice_lora_path}") | |
| print(f"Realism LoRA: {realism_lora_path}") | |
| print(f"Transition LoRA: {transition_lora_path}") | |
| # ---------------------------------------------------------------- | |
| print(f"Spatial upsampler: {spatial_upsampler_path}") | |
| print(f"Checkpoint: {checkpoint_path}") | |
| # Initialize pipeline WITH text encoder and optional audio support | |
| # ---- Replace block (pipeline init) lines 275-281 ---- | |
| pipeline = LTX23DistilledA2VPipeline( | |
| distilled_checkpoint_path=checkpoint_path, | |
| spatial_upsampler_path=spatial_upsampler_path, | |
| gemma_root=gemma_root, | |
| loras=[], | |
| quantization=QuantizationPolicy.fp8_cast(), # keep FP8 quantization unchanged | |
| ) | |
| # ---------------------------------------------------------------- | |
| # Currently applied LoRA deltas — stored so they can be undone before re-applying | |
| _applied_lora_deltas: dict[str, torch.Tensor] = {} | |
| _applied_lora_config: list[tuple[str, float]] = [] | |
| def _load_and_rename_lora_tensors(lora_path: str) -> dict[str, torch.Tensor]: | |
| """Load LoRA tensors from disk and apply ComfyUI→LTX key renaming.""" | |
| tensors = {} | |
| with safe_open(lora_path, framework="pt", device="cpu") as f: | |
| for key in f.keys(): | |
| tensors[key] = f.get_tensor(key) | |
| renamed = {} | |
| for key, tensor in tensors.items(): | |
| new_key = key | |
| for old_substr, new_substr in LTXV_LORA_COMFY_RENAMING_MAP.items(): | |
| new_key = new_key.replace(old_substr, new_substr) | |
| renamed[new_key] = tensor | |
| return renamed | |
| def _compute_lora_deltas(lora_path: str, strength: float) -> dict[str, torch.Tensor]: | |
| """Compute weight delta tensors for a single LoRA at given strength.""" | |
| tensors = _load_and_rename_lora_tensors(lora_path) | |
| deltas = {} | |
| # Collect all base keys that have a down component | |
| base_keys = set() | |
| for key in tensors: | |
| for suffix in [".lora_down.weight", ".lora_A.weight"]: | |
| if key.endswith(suffix): | |
| base_keys.add(key[: -len(suffix)]) | |
| for base in base_keys: | |
| down = tensors.get(base + ".lora_down.weight") or tensors.get(base + ".lora_A.weight") | |
| up = tensors.get(base + ".lora_up.weight") or tensors.get(base + ".lora_B.weight") | |
| if down is None or up is None: | |
| continue | |
| alpha_val = tensors.get(base + ".alpha") | |
| scale = (alpha_val.item() / down.shape[0]) if alpha_val is not None else 1.0 | |
| down_f = down.float() | |
| up_f = up.float() | |
| if down_f.dim() == 2 and up_f.dim() == 2: | |
| delta = up_f @ down_f | |
| elif down_f.dim() == 4: | |
| delta = (up_f.flatten(1) @ down_f.flatten(1)).view( | |
| up_f.shape[0], down_f.shape[1], *up_f.shape[2:] | |
| ) | |
| else: | |
| print(f"[LoRA] Skipping {base}: unexpected dims down={down_f.dim()} up={up_f.dim()}") | |
| continue | |
| deltas[base + ".weight"] = (delta * strength * scale).to(torch.bfloat16) | |
| return deltas | |
| def apply_loras_to_transformer( | |
| pose_strength, general_strength, motion_strength, dreamlay_strength, | |
| mself_strength, dramatic_strength, fluid_strength, liquid_strength, | |
| demopose_strength, voice_strength, realism_strength, transition_strength, | |
| ): | |
| global _applied_lora_deltas, _applied_lora_config | |
| lora_configs = [ | |
| (pose_lora_path, round(float(pose_strength), 2)), | |
| (general_lora_path, round(float(general_strength), 2)), | |
| (motion_lora_path, round(float(motion_strength), 2)), | |
| (dreamlay_lora_path, round(float(dreamlay_strength), 2)), | |
| (mself_lora_path, round(float(mself_strength), 2)), | |
| (dramatic_lora_path, round(float(dramatic_strength), 2)), | |
| (fluid_lora_path, round(float(fluid_strength), 2)), | |
| (liquid_lora_path, round(float(liquid_strength), 2)), | |
| (demopose_lora_path, round(float(demopose_strength), 2)), | |
| (voice_lora_path, round(float(voice_strength), 2)), | |
| (realism_lora_path, round(float(realism_strength), 2)), | |
| (transition_lora_path, round(float(transition_strength), 2)), | |
| ] | |
| # Skip if config hasn't changed since last application | |
| if lora_configs == _applied_lora_config: | |
| print("[LoRA] Config unchanged, skipping re-application.") | |
| return | |
| # Undo previously applied deltas | |
| if _applied_lora_deltas: | |
| print(f"[LoRA] Undoing {len(_applied_lora_deltas)} previously applied delta(s)...") | |
| with torch.no_grad(): | |
| for name, param in _transformer.named_parameters(): | |
| if name in _applied_lora_deltas: | |
| param.data -= _applied_lora_deltas[name].to( | |
| device=param.device, dtype=param.dtype | |
| ) | |
| _applied_lora_deltas = {} | |
| gc.collect() | |
| active = [(p, s) for p, s in lora_configs if p is not None and s != 0.0] | |
| if not active: | |
| print("[LoRA] No active LoRAs.") | |
| _applied_lora_config = lora_configs | |
| return | |
| print(f"[LoRA] Computing deltas for {len(active)} active LoRA(s)...") | |
| combined_deltas: dict[str, torch.Tensor] = {} | |
| for lora_path, strength in active: | |
| try: | |
| deltas = _compute_lora_deltas(lora_path, strength) | |
| for key, delta in deltas.items(): | |
| if key in combined_deltas: | |
| combined_deltas[key] = combined_deltas[key] + delta | |
| else: | |
| combined_deltas[key] = delta | |
| print(f"[LoRA] {Path(lora_path).name}: {len(deltas)} delta(s) at strength {strength}") | |
| except Exception as e: | |
| import traceback | |
| print(f"[LoRA] Failed on {lora_path}: {e}\n{traceback.format_exc()}") | |
| applied_count = 0 | |
| with torch.no_grad(): | |
| for name, param in _transformer.named_parameters(): | |
| if name in combined_deltas: | |
| param.data += combined_deltas[name].to( | |
| device=param.device, dtype=param.dtype | |
| ) | |
| applied_count += 1 | |
| _applied_lora_deltas = combined_deltas | |
| _applied_lora_config = lora_configs | |
| print(f"[LoRA] Applied {applied_count} weight delta(s) to live transformer.") | |
| gc.collect() | |
| # ---- REPLACE PRELOAD BLOCK START ---- | |
| # Preload all models for ZeroGPU tensor packing. | |
| print("Preloading all models (including Gemma and audio components)...") | |
| ledger = pipeline.model_ledger | |
| # Save the original factory methods so we can rebuild individual components later. | |
| # These are bound callables on ledger that will call the builder when invoked. | |
| _orig_transformer_factory = ledger.transformer | |
| _orig_video_encoder_factory = ledger.video_encoder | |
| _orig_video_decoder_factory = ledger.video_decoder | |
| _orig_audio_encoder_factory = ledger.audio_encoder | |
| _orig_audio_decoder_factory = ledger.audio_decoder | |
| _orig_vocoder_factory = ledger.vocoder | |
| _orig_spatial_upsampler_factory = ledger.spatial_upsampler | |
| _orig_text_encoder_factory = ledger.text_encoder | |
| _orig_gemma_embeddings_factory = ledger.gemma_embeddings_processor | |
| # Call the original factories once to create the cached instances we will serve by default. | |
| _transformer = _orig_transformer_factory() | |
| _video_encoder = _orig_video_encoder_factory() | |
| _video_decoder = _orig_video_decoder_factory() | |
| _audio_encoder = _orig_audio_encoder_factory() | |
| _audio_decoder = _orig_audio_decoder_factory() | |
| _vocoder = _orig_vocoder_factory() | |
| _spatial_upsampler = _orig_spatial_upsampler_factory() | |
| _text_encoder = _orig_text_encoder_factory() | |
| _embeddings_processor = _orig_gemma_embeddings_factory() | |
| # Replace ledger methods with lightweight lambdas that return the cached instances. | |
| # We keep the original factories above so we can call them later to rebuild components. | |
| ledger.transformer = lambda: _transformer | |
| ledger.video_encoder = lambda: _video_encoder | |
| ledger.video_decoder = lambda: _video_decoder | |
| ledger.audio_encoder = lambda: _audio_encoder | |
| ledger.audio_decoder = lambda: _audio_decoder | |
| ledger.vocoder = lambda: _vocoder | |
| ledger.spatial_upsampler = lambda: _spatial_upsampler | |
| ledger.text_encoder = lambda: _text_encoder | |
| ledger.gemma_embeddings_processor = lambda: _embeddings_processor | |
| print("All models preloaded (including Gemma text encoder and audio encoder)!") | |
| # ---- REPLACE PRELOAD BLOCK END ---- | |
| print("=" * 80) | |
| print("Pipeline ready!") | |
| print("=" * 80) | |
| def log_memory(tag: str): | |
| if torch.cuda.is_available(): | |
| allocated = torch.cuda.memory_allocated() / 1024**3 | |
| peak = torch.cuda.max_memory_allocated() / 1024**3 | |
| free, total = torch.cuda.mem_get_info() | |
| print(f"[VRAM {tag}] allocated={allocated:.2f}GB peak={peak:.2f}GB free={free / 1024**3:.2f}GB total={total / 1024**3:.2f}GB") | |
| def detect_aspect_ratio(image) -> str: | |
| if image is None: | |
| return "16:9" | |
| if hasattr(image, "size"): | |
| w, h = image.size | |
| elif hasattr(image, "shape"): | |
| h, w = image.shape[:2] | |
| else: | |
| return "16:9" | |
| ratio = w / h | |
| candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0} | |
| return min(candidates, key=lambda k: abs(ratio - candidates[k])) | |
| def on_image_upload(first_image, last_image, high_res): | |
| ref_image = first_image if first_image is not None else last_image | |
| aspect = detect_aspect_ratio(ref_image) | |
| tier = "high" if high_res else "low" | |
| w, h = RESOLUTIONS[tier][aspect] | |
| return gr.update(value=w), gr.update(value=h) | |
| def on_highres_toggle(first_image, last_image, high_res): | |
| ref_image = first_image if first_image is not None else last_image | |
| aspect = detect_aspect_ratio(ref_image) | |
| tier = "high" if high_res else "low" | |
| w, h = RESOLUTIONS[tier][aspect] | |
| return gr.update(value=w), gr.update(value=h) | |
| def get_gpu_duration( | |
| first_image, | |
| last_image, | |
| input_audio, | |
| prompt: str, | |
| duration: float, | |
| gpu_duration: float, | |
| enhance_prompt: bool = True, | |
| seed: int = 42, | |
| randomize_seed: bool = True, | |
| height: int = 1024, | |
| width: int = 1536, | |
| pose_strength: float = 0.0, | |
| general_strength: float = 0.0, | |
| motion_strength: float = 0.0, | |
| dreamlay_strength: float = 0.0, | |
| mself_strength: float = 0.0, | |
| dramatic_strength: float = 0.0, | |
| fluid_strength: float = 0.0, | |
| liquid_strength: float = 0.0, | |
| demopose_strength: float = 0.0, | |
| voice_strength: float = 0.0, | |
| realism_strength: float = 0.0, | |
| transition_strength: float = 0.0, | |
| progress=None, | |
| ): | |
| return int(gpu_duration) | |
| def generate_video( | |
| first_image, | |
| last_image, | |
| input_audio, | |
| prompt: str, | |
| duration: float, | |
| gpu_duration: float, | |
| enhance_prompt: bool = True, | |
| seed: int = 42, | |
| randomize_seed: bool = True, | |
| height: int = 1024, | |
| width: int = 1536, | |
| pose_strength: float = 0.0, | |
| general_strength: float = 0.0, | |
| motion_strength: float = 0.0, | |
| dreamlay_strength: float = 0.0, | |
| mself_strength: float = 0.0, | |
| dramatic_strength: float = 0.0, | |
| fluid_strength: float = 0.0, | |
| liquid_strength: float = 0.0, | |
| demopose_strength: float = 0.0, | |
| voice_strength: float = 0.0, | |
| realism_strength: float = 0.0, | |
| transition_strength: float = 0.0, | |
| progress=gr.Progress(track_tqdm=True), | |
| ): | |
| try: | |
| torch.cuda.reset_peak_memory_stats() | |
| log_memory("start") | |
| current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed) | |
| frame_rate = DEFAULT_FRAME_RATE | |
| num_frames = int(duration * frame_rate) + 1 | |
| num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1 | |
| print(f"Generating: {height}x{width}, {num_frames} frames ({duration}s), seed={current_seed}") | |
| images = [] | |
| output_dir = Path("outputs") | |
| output_dir.mkdir(exist_ok=True) | |
| if first_image is not None: | |
| temp_first_path = output_dir / f"temp_first_{current_seed}.jpg" | |
| if hasattr(first_image, "save"): | |
| first_image.save(temp_first_path) | |
| else: | |
| temp_first_path = Path(first_image) | |
| images.append(ImageConditioningInput(path=str(temp_first_path), frame_idx=0, strength=1.0)) | |
| if last_image is not None: | |
| temp_last_path = output_dir / f"temp_last_{current_seed}.jpg" | |
| if hasattr(last_image, "save"): | |
| last_image.save(temp_last_path) | |
| else: | |
| temp_last_path = Path(last_image) | |
| images.append(ImageConditioningInput(path=str(temp_last_path), frame_idx=num_frames - 1, strength=1.0)) | |
| tiling_config = TilingConfig.default() | |
| video_chunks_number = get_video_chunks_number(num_frames, tiling_config) | |
| log_memory("before pipeline call") | |
| apply_loras_to_transformer( | |
| pose_strength, general_strength, motion_strength, dreamlay_strength, | |
| mself_strength, dramatic_strength, fluid_strength, liquid_strength, | |
| demopose_strength, voice_strength, realism_strength, transition_strength, | |
| ) | |
| video, audio = pipeline( | |
| prompt=prompt, | |
| seed=current_seed, | |
| height=int(height), | |
| width=int(width), | |
| num_frames=num_frames, | |
| frame_rate=frame_rate, | |
| images=images, | |
| audio_path=input_audio, | |
| tiling_config=tiling_config, | |
| enhance_prompt=enhance_prompt, | |
| ) | |
| log_memory("after pipeline call") | |
| output_path = tempfile.mktemp(suffix=".mp4") | |
| encode_video( | |
| video=video, | |
| fps=frame_rate, | |
| audio=audio, | |
| output_path=output_path, | |
| video_chunks_number=video_chunks_number, | |
| ) | |
| log_memory("after encode_video") | |
| return str(output_path), current_seed | |
| except Exception as e: | |
| import traceback | |
| log_memory("on error") | |
| print(f"Error: {str(e)}\n{traceback.format_exc()}") | |
| return None, current_seed | |
| with gr.Blocks(title="LTX-2.3 Distilled") as demo: | |
| gr.Markdown("# LTX-2.3 F2LF with Fast Audio-Video Generation with Frame Conditioning") | |
| with gr.Row(): | |
| with gr.Column(): | |
| with gr.Row(): | |
| first_image = gr.Image(label="First Frame (Optional)", type="pil") | |
| last_image = gr.Image(label="Last Frame (Optional)", type="pil") | |
| input_audio = gr.Audio(label="Audio Input (Optional)", type="filepath") | |
| prompt = gr.Textbox( | |
| label="Prompt", | |
| info="for best results - make it as elaborate as possible", | |
| value="Make this image come alive with cinematic motion, smooth animation", | |
| lines=3, | |
| placeholder="Describe the motion and animation you want...", | |
| ) | |
| duration = gr.Slider(label="Duration (seconds)", minimum=1.0, maximum=30.0, value=10.0, step=0.1) | |
| generate_btn = gr.Button("Generate Video", variant="primary", size="lg") | |
| with gr.Accordion("Advanced Settings", open=False): | |
| seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, value=10, step=1) | |
| randomize_seed = gr.Checkbox(label="Randomize Seed", value=True) | |
| with gr.Row(): | |
| width = gr.Number(label="Width", value=1536, precision=0) | |
| height = gr.Number(label="Height", value=1024, precision=0) | |
| with gr.Row(): | |
| enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False) | |
| high_res = gr.Checkbox(label="High Resolution", value=True) | |
| with gr.Column(): | |
| gr.Markdown("### LoRA adapter strengths (set to 0 to disable; slow and WIP)") | |
| pose_strength = gr.Slider( | |
| label="Anthro Enhancer strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| general_strength = gr.Slider( | |
| label="Reasoning Enhancer strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| motion_strength = gr.Slider( | |
| label="Anthro Posing Helper strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| dreamlay_strength = gr.Slider( | |
| label="Dreamlay strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| mself_strength = gr.Slider( | |
| label="Mself strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| dramatic_strength = gr.Slider( | |
| label="Dramatic strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| fluid_strength = gr.Slider( | |
| label="Fluid Helper strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| liquid_strength = gr.Slider( | |
| label="Liquid Helper strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| demopose_strength = gr.Slider( | |
| label="Audio Helper strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| voice_strength = gr.Slider( | |
| label="Voice Helper strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| realism_strength = gr.Slider( | |
| label="Anthro Realism strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| transition_strength = gr.Slider( | |
| label="Transition strength", | |
| minimum=0.0, maximum=2.0, value=0.0, step=0.01 | |
| ) | |
| with gr.Column(): | |
| output_video = gr.Video(label="Generated Video", autoplay=False) | |
| gpu_duration = gr.Slider( | |
| label="ZeroGPU duration (seconds; 10 second Img2Vid with 1024x1024 and LoRAs = ~70)", | |
| minimum=30.0, | |
| maximum=240.0, | |
| value=75.0, | |
| step=1.0, | |
| ) | |
| gr.Examples( | |
| examples=[ | |
| [ | |
| None, | |
| "pinkknit.jpg", | |
| None, | |
| "The camera falls downward through darkness as if dropped into a tunnel. " | |
| "As it slows, five friends wearing pink knitted hats and sunglasses lean " | |
| "over and look down toward the camera with curious expressions. The lens " | |
| "has a strong fisheye effect, creating a circular frame around them. They " | |
| "crowd together closely, forming a symmetrical cluster while staring " | |
| "directly into the lens.", | |
| 3.0, | |
| 80.0, | |
| False, | |
| 42, | |
| True, | |
| 1024, | |
| 1024, | |
| 0.0, # pose_strength (example) | |
| 0.0, # general_strength (example) | |
| 0.0, # motion_strength (example) | |
| 0.0, | |
| 0.0, | |
| 0.0, | |
| 0.0, | |
| 0.0, | |
| 0.0, | |
| 0.0, | |
| 0.0, | |
| 0.0, | |
| ], | |
| ], | |
| inputs=[ | |
| first_image, last_image, input_audio, prompt, duration, gpu_duration, | |
| enhance_prompt, seed, randomize_seed, height, width, | |
| pose_strength, general_strength, motion_strength, dreamlay_strength, mself_strength, dramatic_strength, fluid_strength, liquid_strength, demopose_strength, voice_strength, realism_strength, transition_strength, | |
| ], | |
| ) | |
| first_image.change( | |
| fn=on_image_upload, | |
| inputs=[first_image, last_image, high_res], | |
| outputs=[width, height], | |
| ) | |
| last_image.change( | |
| fn=on_image_upload, | |
| inputs=[first_image, last_image, high_res], | |
| outputs=[width, height], | |
| ) | |
| high_res.change( | |
| fn=on_highres_toggle, | |
| inputs=[first_image, last_image, high_res], | |
| outputs=[width, height], | |
| ) | |
| generate_btn.click( | |
| fn=generate_video, | |
| inputs=[ | |
| first_image, last_image, input_audio, prompt, duration, gpu_duration, enhance_prompt, | |
| seed, randomize_seed, height, width, | |
| pose_strength, general_strength, motion_strength, dreamlay_strength, mself_strength, dramatic_strength, fluid_strength, liquid_strength, demopose_strength, voice_strength, realism_strength, transition_strength, | |
| ], | |
| outputs=[output_video, seed], | |
| ) | |
| css = """ | |
| .fillable{max-width: 1200px !important} | |
| """ | |
| if __name__ == "__main__": | |
| demo.launch(theme=gr.themes.Citrus(), css=css) | |