"""Bootstrap for the video-generation Space.

This first part of the module sets process-wide environment flags, installs
the pinned LTX-2 packages, patches the attention backend, downloads the model
weights, builds the pipeline and defines the small UI helper callbacks.

Statement order matters here: environment variables are set before ``torch``
is imported, and the LTX packages are installed before they are imported.
"""
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

# Enable fast downloads
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
os.environ["HF_XET_HIGH_PERFORMANCE"] = "1"

# Disable torch.compile / dynamo before any torch import
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"

# Install xformers for memory-efficient attention.
# Best effort (check=False): an SDPA fallback is installed further down when
# xformers cannot be imported.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"],
    check=False,
)

# Clone LTX-2 repo at a pinned compatible commit and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
LTX_COMMIT = "ae855f8538843825f9015a419cf4ba5edaf5eec2"

# Always start from a fresh clone so the checkout below lands on a clean tree.
if os.path.exists(LTX_REPO_DIR):
    print(f"Removing existing repo at {LTX_REPO_DIR}...")
    subprocess.run(["rm", "-rf", LTX_REPO_DIR], check=True)

print(f"Cloning {LTX_REPO_URL}...")
subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True)

print(f"Checking out commit {LTX_COMMIT}...")
subprocess.run(["git", "-C", LTX_REPO_DIR, "checkout", LTX_COMMIT], check=True)

print("Installing ltx-core and ltx-pipelines from pinned repo commit...")
subprocess.run(
    [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--force-reinstall",
        "--no-deps",
        "-e",
        os.path.join(LTX_REPO_DIR, "packages", "ltx-core"),
        "-e",
        os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines"),
    ],
    check=True,
)

# Make the freshly cloned sources importable in this already-running process.
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src"))
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src"))

import logging
import random
import tempfile
from pathlib import Path

import torch

torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True

# Critical workaround: Replace inference_mode with no_grad
# Avoids "inference tensor" failures in spatial upsampler and VAE decoder
torch.inference_mode = torch.no_grad

import spaces
import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download

from ltx_pipelines.distilled import DistilledPipeline
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.media_io import encode_video

# Patch attention backend into the LTX attention module.
import torch.nn.functional as F
from ltx_core.model.transformer import attention as _attn_mod


def _sdpa_as_mea(query, key, value, attn_bias=None, scale=None, **kwargs):
    """Drop-in stand-in for xformers ``memory_efficient_attention`` using SDPA.

    Args:
        query, key, value: tensors laid out as (B, S, H, D), the xformers
            convention.
        attn_bias: optional mask/bias. A ``torch.Tensor`` is forwarded to
            SDPA as ``attn_mask``; any other value is ignored.
            NOTE(review): assumes a tensor bias is broadcastable to
            (B, H, S_q, S_k) - confirm against the LTX caller.
        scale: optional softmax scale, forwarded unchanged.
        **kwargs: accepted for signature compatibility and ignored.

    Returns:
        The attention output laid out as (B, S, H, D).
    """
    # xformers memory_efficient_attention: (B, S, H, D) -> (B, S, H, D)
    # torch SDPA: (B, H, S, D) -> (B, H, S, D)
    q, k, v = query.transpose(1, 2), key.transpose(1, 2), value.transpose(1, 2)

    attn_mask = None
    if isinstance(attn_bias, torch.Tensor):
        attn_mask = attn_bias
        # Align a floating-point (additive) mask with the query dtype before
        # handing it to SDPA.
        if attn_mask.is_floating_point() and attn_mask.dtype != q.dtype:
            attn_mask = attn_mask.to(q.dtype)

    out = F.scaled_dot_product_attention(q, k, v, attn_mask=attn_mask, scale=scale)
    return out.transpose(1, 2)


_cap = torch.cuda.get_device_capability() if torch.cuda.is_available() else (0, 0)
_use_xformers = False

# xformers is only attempted below compute capability 12.0; anything at or
# above that goes straight to the SDPA fallback.
if _cap < (12, 0):
    try:
        from xformers.ops import memory_efficient_attention as _mea

        _attn_mod.memory_efficient_attention = _mea
        _use_xformers = True
        print("[ATTN] Using xformers memory_efficient_attention")
    except Exception as e:
        print(f"[ATTN] xformers unavailable ({e}), falling back to SDPA")

if not _use_xformers:
    _attn_mod.memory_efficient_attention = _sdpa_as_mea
    print(f"[ATTN] Using SDPA fallback (sm_{_cap[0]}{_cap[1]})")

logging.getLogger().setLevel(logging.INFO)

MAX_SEED = np.iinfo(np.int32).max
DEFAULT_PROMPT = (
    "An astronaut hatches from a fragile egg on the surface of the Moon, "
    "the shell cracking and peeling apart in gentle low-gravity motion. "
    "Fine lunar dust lifts and drifts outward with each movement, floating "
    "in slow arcs before settling back onto the ground."
)
DEFAULT_FRAME_RATE = 24.0

# Resolution presets: (width, height)
RESOLUTIONS = {
    "high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)},
    "low": {"16:9": (512, 320), "9:16": (320, 512), "1:1": (512, 512)},
}

# Model repos
CHECKPOINT_REPO = "SulphurAI/Sulphur-2-base"
LTX_MODEL_REPO = "Lightricks/LTX-2.3"
GEMMA_REPO = "Lightricks/gemma-3-12b-it-qat-q4_0-unquantized"

# Download model checkpoints in parallel for speed
print("=" * 80)
print("Downloading Element-16 (pre-distilled) + Gemma (parallel)...")
print("=" * 80)


def download_checkpoint():
    """Fetch the distilled checkpoint file and return its local path."""
    # Use pre-distilled checkpoint - no LoRA needed
    return hf_hub_download(repo_id=CHECKPOINT_REPO, filename="sulphur_distil_bf16.safetensors")


def download_upsampler():
    """Fetch the spatial upscaler weights and return their local path."""
    return hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors")


def download_gemma():
    """Fetch the Gemma repository snapshot and return its local directory."""
    return snapshot_download(repo_id=GEMMA_REPO)


with ThreadPoolExecutor(max_workers=3) as executor:
    future_checkpoint = executor.submit(download_checkpoint)
    future_upsampler = executor.submit(download_upsampler)
    future_gemma = executor.submit(download_gemma)
    # .result() re-raises any download error in the main thread.
    checkpoint_path = future_checkpoint.result()
    spatial_upsampler_path = future_upsampler.result()
    gemma_root = future_gemma.result()

print(f"Checkpoint: {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
print(f"Gemma root: {gemma_root}")

# Initialize pipeline with pre-distilled checkpoint (no LoRA needed)
pipeline = DistilledPipeline(
    distilled_checkpoint_path=checkpoint_path,
    spatial_upsampler_path=spatial_upsampler_path,
    gemma_root=gemma_root,
    loras=(),
)

# Preload all models for ZeroGPU tensor packing
print("Preloading all pipeline components via model_ledger...")
# DistilledPipeline uses model_ledger similar to other pipelines
ledger = pipeline.model_ledger
_transformer = ledger.transformer()
_video_encoder = ledger.video_encoder()
_video_decoder = ledger.video_decoder()
_spatial_upsampler = ledger.spatial_upsampler()
_text_encoder = ledger.text_encoder()
_embeddings_processor = ledger.gemma_embeddings_processor()
_audio_encoder = ledger.audio_encoder()
_audio_decoder = ledger.audio_decoder()
_vocoder = ledger.vocoder()

# Replace ledger methods with lambdas returning preloaded instances
ledger.transformer = lambda: _transformer
ledger.video_encoder = lambda: _video_encoder
ledger.video_decoder = lambda: _video_decoder
ledger.spatial_upsampler = lambda: _spatial_upsampler
ledger.text_encoder = lambda: _text_encoder
ledger.gemma_embeddings_processor = lambda: _embeddings_processor
ledger.audio_encoder = lambda: _audio_encoder
ledger.audio_decoder = lambda: _audio_decoder
ledger.vocoder = lambda: _vocoder

print("All models preloaded!")
print("=" * 80)
print("Pipeline ready!")
print("=" * 80)


def log_memory(tag: str):
    """Print current, peak, free and total CUDA memory in GB, labelled with *tag*.

    Does nothing when CUDA is unavailable.
    """
    if torch.cuda.is_available():
        allocated = torch.cuda.memory_allocated() / 1024**3
        peak = torch.cuda.max_memory_allocated() / 1024**3
        free, total = torch.cuda.mem_get_info()
        print(f"[VRAM {tag}] allocated={allocated:.2f}GB peak={peak:.2f}GB free={free / 1024**3:.2f}GB total={total / 1024**3:.2f}GB")


def detect_aspect_ratio(image) -> str:
    """Return the preset aspect ratio ("16:9", "9:16" or "1:1") closest to *image*.

    Args:
        image: ``None``, an object exposing ``shape`` as (height, width, ...),
            or an object exposing ``size`` as (width, height).

    Returns:
        The nearest preset key. Falls back to "16:9" when the dimensions
        cannot be determined or are not positive.
    """
    if image is None:
        return "16:9"

    shape = getattr(image, "shape", None)
    if shape is not None:
        # Checked before ``size``: array types expose ``size`` as an element
        # count rather than a (width, height) pair.
        if len(shape) < 2:
            return "16:9"
        h, w = shape[:2]
    elif hasattr(image, "size"):
        w, h = image.size
    else:
        return "16:9"

    if w <= 0 or h <= 0:
        return "16:9"

    ratio = w / h
    candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0}
    return min(candidates, key=lambda k: abs(ratio - candidates[k]))


def _resolution_updates(first_image, last_image, high_res):
    """Build the width/height updates for the current images and quality tier."""
    ref_image = first_image if first_image is not None else last_image
    aspect = detect_aspect_ratio(ref_image)
    tier = "high" if high_res else "low"
    w, h = RESOLUTIONS[tier][aspect]
    return gr.update(value=w), gr.update(value=h)


def on_image_upload(first_image, last_image, high_res):
    """Return (width, height) updates matching the reference image.

    The first image is the reference when present, otherwise the last image.
    """
    return _resolution_updates(first_image, last_image, high_res)


def on_highres_toggle(first_image, last_image, high_res):
    """Return (width, height) updates for the selected resolution tier."""
    return _resolution_updates(first_image, last_image, high_res)
DEFAULT_NEGATIVE_PROMPT = "色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走, blurry, glasses, deformed, subtitles, text, captions, worst quality, low quality, inconsistent motion, jittery, distorted"


def remove_music_demucs(input_video_path: str, output_video_path: str) -> bool:
    """Remove background music from video using Demucs, keeping only vocals.

    The audio track is extracted with ffmpeg, separated with the ``htdemucs``
    model on CUDA, and the vocals stem is muxed back onto the original video
    stream.

    Args:
        input_video_path: video whose audio track should be cleaned.
        output_video_path: destination for the re-muxed video.

    Returns:
        True when the output was written, False on any failure (ffmpeg
        error, missing dependency, runtime error). Never raises.
    """
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            workdir = Path(tmpdir)

            # Extract audio from video
            audio_in = workdir / "audio.wav"
            extract_cmd = [
                'ffmpeg', '-y', '-i', input_video_path,
                '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2',
                str(audio_in),
            ]
            result = subprocess.run(extract_cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"[demucs] Failed to extract audio: {result.stderr[-200:]}")
                return False

            print("[demucs] Running music separation...")
            # Imported lazily so the app still starts without these packages.
            import soundfile as sf
            from demucs.pretrained import get_model
            from demucs.apply import apply_model

            # Load model (cached after first run)
            model = get_model('htdemucs')
            model.to('cuda')
            model.eval()

            # Load audio
            data, sr = sf.read(str(audio_in))
            wav = torch.from_numpy(data.T).float()
            if wav.dim() == 1:
                wav = wav.unsqueeze(0)

            # Resample if needed
            if sr != model.samplerate:
                import torchaudio
                wav = torchaudio.functional.resample(wav, sr, model.samplerate)

            wav = wav.unsqueeze(0).to('cuda')

            # Separate sources
            with torch.no_grad():
                sources = apply_model(model, wav, overlap=0.25, progress=False)

            # Locate the vocals stem by name; fall back to index 3 when the
            # model does not list its sources.
            stem_names = list(getattr(model, "sources", None) or [])
            vocals_idx = stem_names.index("vocals") if "vocals" in stem_names else 3
            vocals = sources[0, vocals_idx].cpu()

            # Save vocals
            audio_out = workdir / "vocals.wav"
            sf.write(str(audio_out), vocals.numpy().T, model.samplerate)

            print("[demucs] Merging vocals back with video...")
            merge_cmd = [
                'ffmpeg', '-y',
                '-i', input_video_path,
                '-i', str(audio_out),
                '-c:v', 'copy',
                '-map', '0:v:0',
                '-map', '1:a:0',
                '-c:a', 'aac', '-b:a', '128k',
                '-shortest',
                output_video_path,
            ]
            result = subprocess.run(merge_cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"[demucs] Failed to merge: {result.stderr[-200:]}")
                return False

            print("[demucs] Successfully removed music")
            return True
    except Exception as e:
        # Best-effort post-processing: report the problem and let the caller
        # keep the unprocessed video.
        print(f"[demucs] Error: {e}")
        import traceback
        traceback.print_exc()
        return False


def apply_gaussian_blur(video_tensor: torch.Tensor, blur_amount: int) -> torch.Tensor:
    """Apply Gaussian blur to video tensor. Video shape: [frames, H, W, C]

    Args:
        video_tensor: frames laid out as [frames, H, W, C].
        blur_amount: blur radius; values <= 0 return the input unchanged.

    Returns:
        The blurred tensor in the same [frames, H, W, C] layout.
    """
    if blur_amount <= 0:
        return video_tensor

    from torchvision.transforms.functional import gaussian_blur

    # 2 * n + 1 is always odd and, for n >= 1, at least 3.
    kernel_size = blur_amount * 2 + 1
    sigma = blur_amount / 2.0

    # gaussian_blur expects [batch, C, H, W]; permute there and back.
    channels_first = video_tensor.permute(0, 3, 1, 2)
    blurred = gaussian_blur(channels_first, kernel_size=[kernel_size, kernel_size], sigma=[sigma, sigma])
    return blurred.permute(0, 2, 3, 1)


def _new_temp_video_path() -> str:
    """Reserve a unique ``.mp4`` path in the temp directory and return it."""
    # mkstemp creates the file atomically, unlike the deprecated mktemp.
    fd, path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    return path


def _discard_file(path) -> None:
    """Delete *path* if it exists, ignoring filesystem errors."""
    try:
        os.remove(path)
    except OSError:
        pass


def _stage_conditioning_image(image, output_dir: Path, label: str, seed: int, frame_idx: int):
    """Turn an uploaded image into an ``ImageConditioningInput`` for *frame_idx*.

    Objects with a ``save`` method are written to
    ``output_dir / temp_<label>_<seed>.jpg``; anything else is treated as a
    path to an existing file.
    """
    if hasattr(image, "save"):
        image_path = output_dir / f"temp_{label}_{seed}.jpg"
        image.save(image_path)
    else:
        image_path = Path(image)
    return ImageConditioningInput(path=str(image_path), frame_idx=frame_idx, strength=1.0)


@spaces.GPU(duration=90)
@torch.inference_mode()
def generate_video(
    first_image,
    last_image,
    prompt: str,
    duration: float,
    enhance_prompt: bool = True,
    seed: int = 42,
    randomize_seed: bool = True,
    height: int = 320,
    width: int = 512,
    negative_prompt: str = DEFAULT_NEGATIVE_PROMPT,
    blur_amount: int = 0,
    remove_music: bool = False,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate a video (with audio) and return ``(video_path, seed_used)``.

    Args:
        first_image: optional image conditioning frame 0.
        last_image: optional image conditioning the final frame.
        prompt: text prompt passed to the pipeline.
        duration: clip length in seconds.
        enhance_prompt: forwarded to the pipeline.
        seed: seed used when ``randomize_seed`` is False.
        randomize_seed: draw a random seed in ``[0, MAX_SEED]`` instead.
        height: output height in pixels.
        width: output width in pixels.
        negative_prompt: accepted for UI compatibility; not forwarded to the
            pipeline call below.
        blur_amount: Gaussian blur radius applied after generation (0 = off).
        remove_music: run Demucs vocal isolation on the encoded video.
        progress: Gradio progress tracker.

    Returns:
        ``(path, seed)`` on success, ``(None, seed)`` when anything fails.
        Errors are logged rather than raised.
    """
    # Bound before the try block so the error path can always report a seed.
    current_seed = seed
    try:
        torch.cuda.reset_peak_memory_stats()
        log_memory("start")

        current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
        # A cleared Number field arrives as None; treat that as "no blur".
        blur_amount = int(blur_amount or 0)

        frame_rate = DEFAULT_FRAME_RATE
        num_frames = int(duration * frame_rate) + 1
        # Round up to the next frame count of the form 8 * k + 1.
        num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1

        print(f"Generating: {height}x{width}, {num_frames} frames ({duration}s), seed={current_seed}")

        images = []
        output_dir = Path("outputs")
        output_dir.mkdir(exist_ok=True)

        if first_image is not None:
            images.append(
                _stage_conditioning_image(first_image, output_dir, "first", current_seed, 0)
            )
        if last_image is not None:
            images.append(
                _stage_conditioning_image(last_image, output_dir, "last", current_seed, num_frames - 1)
            )

        from ltx_core.model.video_vae import TilingConfig, get_video_chunks_number

        tiling_config = TilingConfig.default()
        video_chunks_number = get_video_chunks_number(num_frames, tiling_config)

        log_memory("before pipeline call")

        # Run inference - DistilledPipeline has simpler API
        video_frames_iter, audio = pipeline(
            prompt=prompt,
            seed=current_seed,
            height=int(height),
            width=int(width),
            num_frames=num_frames,
            frame_rate=frame_rate,
            images=images,
            enhance_prompt=enhance_prompt,
        )

        # Collect video frames
        frames = list(video_frames_iter)
        video_tensor = torch.cat(frames, dim=0) if len(frames) > 1 else frames[0]

        log_memory("after pipeline call")

        # Apply Gaussian blur if requested (for censoring/teaser effect)
        if blur_amount > 0:
            print(f"Applying Gaussian blur (amount={blur_amount})...")
            video_tensor = apply_gaussian_blur(video_tensor, blur_amount)
            log_memory("after blur")

        output_path = _new_temp_video_path()
        encode_video(
            video=video_tensor,
            fps=frame_rate,
            audio=audio,
            output_path=output_path,
            video_chunks_number=video_chunks_number,
        )

        log_memory("after encode_video")

        # Remove background music if requested
        if remove_music:
            print("Removing background music with Demucs...")
            processed_path = _new_temp_video_path()
            if remove_music_demucs(output_path, processed_path):
                # The unprocessed encode is no longer needed.
                _discard_file(output_path)
                output_path = processed_path
                log_memory("after demucs")
            else:
                _discard_file(processed_path)
                print("Warning: Music removal failed, using original video")

        return str(output_path), current_seed

    except Exception as e:
        # Top-level UI boundary: log and return an empty result.
        import traceback
        log_memory("on error")
        print(f"Error: {str(e)}\n{traceback.format_exc()}")
        return None, current_seed


# NOTE(review): the layout nesting below was reconstructed from a
# whitespace-collapsed source - confirm against the intended UI.
with gr.Blocks(title="Element-16 Video", delete_cache=(3600, 7200)) as demo:  # cleanup: check every 1h, delete files >2h old
    gr.Markdown("# Element-16: Fast Video Generation with Frame Conditioning")
    gr.Markdown(
        "High quality video + audio generation with first and last frame conditioning. "
        "Optimized fp8 model for faster inference. "
        "[[code]](https://github.com/Lightricks/LTX-2)"
    )

    with gr.Row():
        with gr.Column():
            with gr.Row():
                first_image = gr.Image(label="First Frame (Optional)", type="pil")
                last_image = gr.Image(label="Last Frame (Optional)", type="pil")
            prompt = gr.Textbox(
                label="Prompt",
                info="for best results - make it as elaborate as possible",
                value="Make this image come alive with cinematic motion, smooth animation",
                lines=3,
                placeholder="Describe the motion and animation you want...",
            )
            duration = gr.Slider(label="Duration (seconds)", minimum=1.0, maximum=10.0, value=3.0, step=0.1)

            generate_btn = gr.Button("Generate Video", variant="primary", size="lg")

            with gr.Accordion("Advanced Settings", open=False):
                seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, value=10, step=1)
                randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
                with gr.Row():
                    width = gr.Number(label="Width", value=512, precision=0)
                    height = gr.Number(label="Height", value=320, precision=0)
                with gr.Row():
                    enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False)
                    high_res = gr.Checkbox(label="High Resolution", value=False)
                with gr.Row():
                    blur_amount = gr.Number(label="Blur (0=off, 36=heavy)", value=0, precision=0)
                    remove_music = gr.Checkbox(label="Remove Music", value=False)
                negative_prompt = gr.Textbox(
                    label="Negative Prompt",
                    value=DEFAULT_NEGATIVE_PROMPT,
                    lines=3,
                    placeholder="What to avoid in the generated video...",
                )

        with gr.Column():
            output_video = gr.Video(label="Generated Video", autoplay=True)

    gr.Examples(
        examples=[
            [
                None,
                "pinkknit.jpg",
                "The camera falls downward through darkness as if dropped into a tunnel. "
                "As it slows, five friends wearing pink knitted hats and sunglasses lean "
                "over and look down toward the camera with curious expressions. The lens "
                "has a strong fisheye effect, creating a circular frame around them. They "
                "crowd together closely, forming a symmetrical cluster while staring "
                "directly into the lens.",
                3.0,
                False,
                42,
                True,
                1024,
                1024,
            ],
        ],
        inputs=[
            first_image, last_image, prompt, duration,
            enhance_prompt, seed, randomize_seed, height, width,
        ],
    )

    # Uploading either frame, or toggling the quality tier, refreshes the
    # width/height fields.
    first_image.change(
        fn=on_image_upload,
        inputs=[first_image, last_image, high_res],
        outputs=[width, height],
    )

    last_image.change(
        fn=on_image_upload,
        inputs=[first_image, last_image, high_res],
        outputs=[width, height],
    )

    high_res.change(
        fn=on_highres_toggle,
        inputs=[first_image, last_image, high_res],
        outputs=[width, height],
    )

    generate_btn.click(
        fn=generate_video,
        inputs=[
            first_image, last_image, prompt, duration, enhance_prompt,
            seed, randomize_seed, height, width, negative_prompt, blur_amount, remove_music,
        ],
        outputs=[output_video, seed],
    )


css = """
.fillable{max-width: 1200px !important}
"""

if __name__ == "__main__":
    demo.launch(theme=gr.themes.Citrus(), css=css)