Element-8-Video / app.py
Vicente Alvarez
Use Noto Sans font for universal language support
9b74663
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
# Enable fast downloads
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
os.environ["HF_XET_HIGH_PERFORMANCE"] = "1"
# Disable torch.compile / dynamo before any torch import
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"
# Install xformers for memory-efficient attention
subprocess.run([sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"], check=False)
# Clone LTX-2 repo at a pinned compatible commit and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
LTX_COMMIT = "ae855f8538843825f9015a419cf4ba5edaf5eec2"
if os.path.exists(LTX_REPO_DIR):
print(f"Removing existing repo at {LTX_REPO_DIR}...")
subprocess.run(["rm", "-rf", LTX_REPO_DIR], check=True)
print(f"Cloning {LTX_REPO_URL}...")
subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True)
print(f"Checking out commit {LTX_COMMIT}...")
subprocess.run(["git", "-C", LTX_REPO_DIR, "checkout", LTX_COMMIT], check=True)
print("Installing ltx-core and ltx-pipelines from pinned repo commit...")
subprocess.run(
[
sys.executable, "-m", "pip", "install",
"--force-reinstall", "--no-deps",
"-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-core"),
"-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines"),
],
check=True,
)
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src"))
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src"))
import logging
import random
import tempfile
from pathlib import Path
import torch
torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True
# Critical workaround: Replace inference_mode with no_grad
# Avoids "inference tensor" failures in spatial upsampler and VAE decoder
torch.inference_mode = torch.no_grad
import spaces
import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download
from ltx_pipelines.distilled import DistilledPipeline
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.media_io import encode_video
# Patch attention backend into the LTX attention module.
import torch.nn.functional as F
from ltx_core.model.transformer import attention as _attn_mod
def _sdpa_as_mea(query, key, value, attn_bias=None, scale=None, **kwargs):
# xformers memory_efficient_attention: (B, S, H, D) -> (B, S, H, D)
# torch SDPA: (B, H, S, D) -> (B, H, S, D)
q, k, v = query.transpose(1, 2), key.transpose(1, 2), value.transpose(1, 2)
return F.scaled_dot_product_attention(q, k, v, scale=scale).transpose(1, 2)
_cap = torch.cuda.get_device_capability() if torch.cuda.is_available() else (0, 0)
_use_xformers = False
if _cap < (12, 0):
try:
from xformers.ops import memory_efficient_attention as _mea
_attn_mod.memory_efficient_attention = _mea
_use_xformers = True
print(f"[ATTN] Using xformers memory_efficient_attention")
except Exception as e:
print(f"[ATTN] xformers unavailable ({e}), falling back to SDPA")
if not _use_xformers:
_attn_mod.memory_efficient_attention = _sdpa_as_mea
print(f"[ATTN] Using SDPA fallback (sm_{_cap[0]}{_cap[1]})")
logging.getLogger().setLevel(logging.INFO)
MAX_SEED = np.iinfo(np.int32).max
DEFAULT_PROMPT = (
"An astronaut hatches from a fragile egg on the surface of the Moon, "
"the shell cracking and peeling apart in gentle low-gravity motion. "
"Fine lunar dust lifts and drifts outward with each movement, floating "
"in slow arcs before settling back onto the ground."
)
DEFAULT_FRAME_RATE = 24.0
# Resolution presets: (width, height)
RESOLUTIONS = {
"high": {"16:9": (1024, 640), "9:16": (640, 1024), "1:1": (1024, 1024)},
"low": {"16:9": (512, 320), "9:16": (320, 512), "1:1": (512, 512)},
}
# Model repos
LTX_MODEL_REPO = "Lightricks/LTX-2.3"
GEMMA_REPO = "Lightricks/gemma-3-12b-it-qat-q4_0-unquantized"
# Download model checkpoints in parallel for speed
print("=" * 80)
print("Downloading Element-8 (pre-distilled LTX) + Gemma (parallel)...")
print("=" * 80)
def download_checkpoint():
# Use pre-distilled LTX checkpoint - no LoRA needed
return hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-22b-distilled.safetensors")
def download_upsampler():
return hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors")
def download_gemma():
return snapshot_download(repo_id=GEMMA_REPO)
with ThreadPoolExecutor(max_workers=3) as executor:
future_checkpoint = executor.submit(download_checkpoint)
future_upsampler = executor.submit(download_upsampler)
future_gemma = executor.submit(download_gemma)
checkpoint_path = future_checkpoint.result()
spatial_upsampler_path = future_upsampler.result()
gemma_root = future_gemma.result()
print(f"Checkpoint: {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
print(f"Gemma root: {gemma_root}")
# Initialize pipeline with pre-distilled checkpoint (no LoRA needed)
pipeline = DistilledPipeline(
distilled_checkpoint_path=checkpoint_path,
spatial_upsampler_path=spatial_upsampler_path,
gemma_root=gemma_root,
loras=(),
)
# Preload all models for ZeroGPU tensor packing
print("Preloading all pipeline components via model_ledger...")
# DistilledPipeline uses model_ledger similar to other pipelines
ledger = pipeline.model_ledger
_transformer = ledger.transformer()
_video_encoder = ledger.video_encoder()
_video_decoder = ledger.video_decoder()
_spatial_upsampler = ledger.spatial_upsampler()
_text_encoder = ledger.text_encoder()
_embeddings_processor = ledger.gemma_embeddings_processor()
_audio_encoder = ledger.audio_encoder()
_audio_decoder = ledger.audio_decoder()
_vocoder = ledger.vocoder()
# Replace ledger methods with lambdas returning preloaded instances
ledger.transformer = lambda: _transformer
ledger.video_encoder = lambda: _video_encoder
ledger.video_decoder = lambda: _video_decoder
ledger.spatial_upsampler = lambda: _spatial_upsampler
ledger.text_encoder = lambda: _text_encoder
ledger.gemma_embeddings_processor = lambda: _embeddings_processor
ledger.audio_encoder = lambda: _audio_encoder
ledger.audio_decoder = lambda: _audio_decoder
ledger.vocoder = lambda: _vocoder
print("All models preloaded!")
print("=" * 80)
print("Pipeline ready!")
print("=" * 80)
def log_memory(tag: str):
if torch.cuda.is_available():
allocated = torch.cuda.memory_allocated() / 1024**3
peak = torch.cuda.max_memory_allocated() / 1024**3
free, total = torch.cuda.mem_get_info()
print(f"[VRAM {tag}] allocated={allocated:.2f}GB peak={peak:.2f}GB free={free / 1024**3:.2f}GB total={total / 1024**3:.2f}GB")
def detect_aspect_ratio(image) -> str:
if image is None:
return "16:9"
if hasattr(image, "size"):
w, h = image.size
elif hasattr(image, "shape"):
h, w = image.shape[:2]
else:
return "16:9"
ratio = w / h
candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0}
return min(candidates, key=lambda k: abs(ratio - candidates[k]))
def on_image_upload(first_image, last_image, high_res):
ref_image = first_image if first_image is not None else last_image
aspect = detect_aspect_ratio(ref_image)
tier = "high" if high_res else "low"
w, h = RESOLUTIONS[tier][aspect]
return gr.update(value=w), gr.update(value=h)
def on_highres_toggle(first_image, last_image, high_res):
ref_image = first_image if first_image is not None else last_image
aspect = detect_aspect_ratio(ref_image)
tier = "high" if high_res else "low"
w, h = RESOLUTIONS[tier][aspect]
return gr.update(value=w), gr.update(value=h)
DEFAULT_NEGATIVE_PROMPT = "色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走, blurry, glasses, deformed, subtitles, text, captions, worst quality, low quality, inconsistent motion, jittery, distorted"
def remove_music_demucs(input_video_path: str, output_video_path: str) -> bool:
"""Remove background music from video using Demucs, keeping only vocals."""
import subprocess
import tempfile
from pathlib import Path
try:
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
# Extract audio from video
audio_in = tmpdir / "audio.wav"
extract_cmd = [
'ffmpeg', '-y', '-i', input_video_path,
'-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2',
str(audio_in)
]
result = subprocess.run(extract_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"[demucs] Failed to extract audio: {result.stderr[-200:]}")
return False
print(f"[demucs] Running music separation...")
import soundfile as sf
from demucs.pretrained import get_model
from demucs.apply import apply_model
# Load model (cached after first run)
model = get_model('htdemucs')
model.to('cuda')
model.eval()
# Load audio
data, sr = sf.read(str(audio_in))
wav = torch.from_numpy(data.T).float()
if wav.dim() == 1:
wav = wav.unsqueeze(0)
# Resample if needed
if sr != model.samplerate:
import torchaudio
wav = torchaudio.functional.resample(wav, sr, model.samplerate)
wav = wav.unsqueeze(0).to('cuda')
# Separate sources
with torch.no_grad():
sources = apply_model(model, wav, overlap=0.25, progress=False)
# Keep only vocals (index 3)
vocals = sources[0, 3].cpu()
# Save vocals
audio_out = tmpdir / "vocals.wav"
audio_np = vocals.numpy().T
sf.write(str(audio_out), audio_np, model.samplerate)
print(f"[demucs] Merging vocals back with video...")
merge_cmd = [
'ffmpeg', '-y',
'-i', input_video_path,
'-i', str(audio_out),
'-c:v', 'copy',
'-map', '0:v:0', '-map', '1:a:0',
'-c:a', 'aac', '-b:a', '128k',
'-shortest',
output_video_path
]
result = subprocess.run(merge_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"[demucs] Failed to merge: {result.stderr[-200:]}")
return False
print(f"[demucs] Successfully removed music")
return True
except Exception as e:
print(f"[demucs] Error: {e}")
import traceback
traceback.print_exc()
return False
def apply_gaussian_blur(video_tensor: torch.Tensor, blur_amount: int) -> torch.Tensor:
"""Apply Gaussian blur to video tensor. Video shape: [frames, H, W, C]"""
if blur_amount <= 0:
return video_tensor
from torchvision.transforms.functional import gaussian_blur
# Ensure kernel size is odd and at least 3
kernel_size = blur_amount * 2 + 1
sigma = blur_amount / 2.0
# Video tensor is [frames, H, W, C], but gaussian_blur expects [batch, C, H, W]
# Permute to [frames, C, H, W]
video_tensor = video_tensor.permute(0, 3, 1, 2)
blurred = gaussian_blur(video_tensor, kernel_size=[kernel_size, kernel_size], sigma=[sigma, sigma])
# Permute back to [frames, H, W, C]
blurred = blurred.permute(0, 2, 3, 1)
return blurred
def loop_clips_with_audio_track(clip_paths: list[str], audio_path: str) -> str:
"""Loop video clips to match audio duration. CPU work - free."""
import subprocess
from pydub import AudioSegment
try:
# Get audio duration
audio = AudioSegment.from_file(audio_path)
audio_duration = len(audio) / 1000.0 # Convert to seconds
# Get total clips duration
clips_duration = 0.0
for clip in clip_paths:
probe = subprocess.run([
'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1', clip
], capture_output=True, text=True, check=True)
clips_duration += float(probe.stdout.strip())
# Calculate loop count
loop_count = int(audio_duration / clips_duration) + 1
print(f"[loop] Audio: {audio_duration:.2f}s, Clips: {clips_duration:.2f}s, Loops: {loop_count}")
# Create concat file with loops
concat_file = tempfile.mktemp(suffix=".txt")
with open(concat_file, 'w') as f:
for _ in range(loop_count):
for clip in clip_paths:
f.write(f"file '{clip}'\n")
# Concat videos
concat_video = tempfile.mktemp(suffix=".mp4")
result = subprocess.run([
'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', concat_file,
'-c', 'copy', concat_video
], capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"Concat failed: {result.stderr[-200:]}")
# Replace audio and trim to audio duration
final_video = tempfile.mktemp(suffix=".mp4")
result = subprocess.run([
'ffmpeg', '-y',
'-i', concat_video,
'-i', audio_path,
'-map', '0:v:0', '-map', '1:a:0',
'-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k',
'-t', str(audio_duration),
'-shortest',
final_video
], capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"Audio merge failed: {result.stderr[-200:]}")
print(f"[loop] Created looped video: {audio_duration:.2f}s")
return final_video
except Exception as e:
print(f"[loop] Error: {e}")
import traceback
traceback.print_exc()
return clip_paths[0] if clip_paths else None
def transcribe_with_whisper_gpu(video_path: str, model_size: str = "small") -> list[dict]:
"""Transcribe video audio with Whisper on GPU (already inside GPU context). Returns segments with timestamps."""
import whisper
try:
print(f"[whisper] Loading {model_size} model on GPU...")
model = whisper.load_model(model_size).to('cuda')
print(f"[whisper] Transcribing audio on GPU...")
result = model.transcribe(video_path, word_timestamps=True, fp16=True)
print(f"[whisper] Transcription complete: {len(result['segments'])} segments")
return result['segments']
except Exception as e:
print(f"[whisper] Error: {e}")
import traceback
traceback.print_exc()
return []
def create_beautiful_ass_subtitles(segments: list[dict], output_path: str, video_width: int, video_height: int):
"""Create elegant animated ASS subtitles with universal language support."""
# Download Noto Sans - supports all languages (Latin, CJK, Tamil, etc.)
import urllib.request
font_url = "https://github.com/google/fonts/raw/main/ofl/notosans/NotoSans-SemiBold.ttf"
font_path = "/tmp/NotoSans-SemiBold.ttf"
font_name = "Noto Sans SemiBold"
try:
if not os.path.exists(font_path):
urllib.request.urlretrieve(font_url, font_path)
except:
font_name = "Arial" # Fallback
# ASS subtitle header with beautiful styling
ass_content = f"""[Script Info]
Title: Elegant Subtitles
ScriptType: v4.00+
WrapStyle: 0
PlayResX: {video_width}
PlayResY: {video_height}
ScaledBorderAndShadow: yes
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font_name},{int(video_height * 0.05)},&H00FFFFFF,&H000000FF,&H00000000,&H80000000,0,0,0,0,100,100,0,0,1,2,1,5,10,10,{int(video_height * 0.42)},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
# Add each segment with fade animation
for seg in segments:
start_time = format_ass_time(seg['start'])
end_time = format_ass_time(seg['end'])
text = seg['text'].strip()
# Add fade in/out animation
fade_duration = 200 # ms
animated_text = f"{{\\fad({fade_duration},{fade_duration})}}{text}"
ass_content += f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{animated_text}\n"
with open(output_path, 'w', encoding='utf-8') as f:
f.write(ass_content)
print(f"[subtitles] Created ASS file with {len(segments)} segments")
def format_ass_time(seconds: float) -> str:
"""Convert seconds to ASS timestamp format (h:mm:ss.cc)."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
centisecs = int((seconds % 1) * 100)
return f"{hours}:{minutes:02d}:{secs:02d}.{centisecs:02d}"
def burn_subtitles_and_watermark(video_path: str, output_path: str, subtitle_path: str = None, watermark_path: str = None):
"""Burn subtitles and/or watermark into video using FFmpeg. CPU work - free."""
import subprocess
import tempfile
try:
current_video = video_path
# Step 1: Apply watermark if needed (first pass)
if watermark_path and os.path.exists(watermark_path):
print(f"[burn] Pass 1: Overlaying watermark...")
# Get video duration for looping watermark
probe = subprocess.run([
'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1', current_video
], capture_output=True, text=True, check=True)
video_duration = float(probe.stdout.strip())
temp_watermarked = tempfile.mktemp(suffix=".mp4")
cmd = [
'ffmpeg', '-y', '-i', current_video,
'-loop', '1', '-t', str(video_duration), '-i', watermark_path,
'-filter_complex', '[1:v][0:v]scale2ref[ovr][base];[base][ovr]overlay=0:0[vout]',
'-map', '[vout]', '-map', '0:a?',
'-c:a', 'copy', '-pix_fmt', 'yuv420p',
temp_watermarked
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"Watermark pass failed: {result.stderr[-200:]}")
current_video = temp_watermarked
print(f"[burn] Watermark applied")
# Step 2: Apply subtitles if needed (second pass)
if subtitle_path and os.path.exists(subtitle_path):
print(f"[burn] Pass 2: Burning subtitles from {subtitle_path}...")
# Escape the subtitle path for FFmpeg (replace : with \\: and \ with /)
subtitle_path_escaped = subtitle_path.replace('\\', '/').replace(':', '\\:')
cmd = [
'ffmpeg', '-y', '-i', current_video,
'-vf', f"subtitles='{subtitle_path_escaped}':force_style='FontName=Noto Sans SemiBold'",
'-c:a', 'copy', '-pix_fmt', 'yuv420p',
output_path
]
print(f"[burn] FFmpeg command: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"[burn] Subtitle stderr: {result.stderr}")
raise Exception(f"Subtitle pass failed: {result.stderr[-500:]}")
print(f"[burn] Subtitles burned successfully")
elif current_video != video_path:
# Only watermark was applied, move temp file to output
import shutil
shutil.move(current_video, output_path)
else:
# Nothing to do, just copy
import shutil
shutil.copy2(video_path, output_path)
print(f"[burn] Successfully burned subtitles/watermark")
return True
except Exception as e:
print(f"[burn] Error: {e}")
import traceback
traceback.print_exc()
return False
@spaces.GPU(duration=90)
@torch.inference_mode()
def generate_video(
first_image,
last_image,
prompts: list[str],
duration: float,
enhance_prompt: bool = True,
seed: int = 42,
randomize_seed: bool = True,
height: int = 320,
width: int = 512,
negative_prompt: str = DEFAULT_NEGATIVE_PROMPT,
blur_amount: int = 0,
remove_music: bool = False,
add_subtitles: bool = False,
audio_track = None,
progress=gr.Progress(track_tqdm=True),
):
try:
torch.cuda.reset_peak_memory_stats()
log_memory("start")
base_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
generated_clips = []
# Generate multiple clips in one GPU session (one per prompt)
for clip_idx, prompt in enumerate(prompts):
current_seed = base_seed + clip_idx
print(f"[GPU] Generating clip {clip_idx + 1}/{len(prompts)}, prompt: {prompt[:50]}..., seed={current_seed}")
frame_rate = DEFAULT_FRAME_RATE
num_frames = int(duration * frame_rate) + 1
num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1
print(f"Generating: {height}x{width}, {num_frames} frames ({duration}s), seed={current_seed}")
images = []
output_dir = Path("outputs")
output_dir.mkdir(exist_ok=True)
if first_image is not None:
temp_first_path = output_dir / f"temp_first_{current_seed}.jpg"
if hasattr(first_image, "save"):
first_image.save(temp_first_path)
else:
temp_first_path = Path(first_image)
images.append(ImageConditioningInput(path=str(temp_first_path), frame_idx=0, strength=1.0))
if last_image is not None:
temp_last_path = output_dir / f"temp_last_{current_seed}.jpg"
if hasattr(last_image, "save"):
last_image.save(temp_last_path)
else:
temp_last_path = Path(last_image)
images.append(ImageConditioningInput(path=str(temp_last_path), frame_idx=num_frames - 1, strength=1.0))
from ltx_core.model.video_vae import TilingConfig, get_video_chunks_number
tiling_config = TilingConfig.default()
video_chunks_number = get_video_chunks_number(num_frames, tiling_config)
log_memory("before pipeline call")
# Run inference - DistilledPipeline has simpler API
video_frames_iter, audio = pipeline(
prompt=prompt,
seed=current_seed,
height=int(height),
width=int(width),
num_frames=num_frames,
frame_rate=frame_rate,
images=images,
enhance_prompt=enhance_prompt,
)
# Collect video frames
frames = [frame for frame in video_frames_iter]
video_tensor = torch.cat(frames, dim=0) if len(frames) > 1 else frames[0]
log_memory("after pipeline call")
# Apply Gaussian blur if requested (for censoring/teaser effect)
if blur_amount > 0:
print(f"Applying Gaussian blur (amount={blur_amount})...")
video_tensor = apply_gaussian_blur(video_tensor, blur_amount)
log_memory("after blur")
output_path = tempfile.mktemp(suffix=".mp4")
encode_video(
video=video_tensor,
fps=frame_rate,
audio=audio,
output_path=output_path,
video_chunks_number=video_chunks_number,
)
log_memory("after encode_video")
# Remove background music if requested
if remove_music:
print(f"Removing background music with Demucs...")
processed_path = tempfile.mktemp(suffix=".mp4")
success = remove_music_demucs(output_path, processed_path)
if success:
output_path = processed_path
log_memory("after demucs")
else:
print(f"Warning: Music removal failed, using original video")
generated_clips.append(str(output_path))
# Transcribe with Whisper if requested (still within GPU context)
subtitle_segments = []
if add_subtitles and audio_track:
print("[GPU] Transcribing audio track with Whisper...")
# Transcribe the audio track file, not the generated video (which has no audio yet)
subtitle_segments = transcribe_with_whisper_gpu(audio_track, model_size="small")
log_memory("after whisper")
elif add_subtitles and not audio_track:
print("[GPU] Warning: Subtitles requested but no audio track provided - skipping transcription")
# Return all generated clips and subtitle segments
return generated_clips, subtitle_segments, base_seed
except Exception as e:
import traceback
log_memory("on error")
print(f"Error: {str(e)}\n{traceback.format_exc()}")
return [], [], base_seed
def full_generation_process(
first_image,
last_image,
prompt1: str,
prompt2: str,
prompt3: str,
duration: float,
enhance_prompt: bool,
seed: int,
randomize_seed: bool,
height: int,
width: int,
negative_prompt: str,
blur_amount: int,
remove_music: bool,
add_subtitles: bool,
watermark,
audio_track,
progress=gr.Progress(track_tqdm=True),
):
"""Main entry point: generates clips (GPU) then optionally loops with audio (CPU)."""
# Collect non-empty prompts
prompts = [p.strip() for p in [prompt1, prompt2, prompt3] if p and p.strip()]
if not prompts:
return None, seed
print(f"Generating {len(prompts)} clip(s)")
# Phase 1: Generate clips + transcribe (GPU time counted)
clips, subtitle_segments, final_seed = generate_video(
first_image, last_image, prompts, duration, enhance_prompt,
seed, randomize_seed, height, width, negative_prompt,
blur_amount, remove_music, add_subtitles, audio_track, progress
)
if not clips:
return None, final_seed
# Phase 2: CPU work (free) - loop clips with audio if provided
if audio_track and len(clips) > 1:
print("[CPU] Looping clips to match audio duration...")
final_video = loop_clips_with_audio_track(clips, audio_track)
elif len(clips) == 1:
final_video = clips[0]
else:
final_video = clips[0]
# Phase 3: CPU work (free) - add subtitles and/or watermark
if add_subtitles or watermark:
print("[CPU] Adding subtitles/watermark...")
# Use subtitle segments from GPU transcription
subtitle_file = None
if add_subtitles and subtitle_segments:
subtitle_file = tempfile.mktemp(suffix=".ass")
create_beautiful_ass_subtitles(subtitle_segments, subtitle_file, int(width), int(height))
print(f"[subtitles] Created subtitle file: {subtitle_file}, exists: {os.path.exists(subtitle_file)}")
if os.path.exists(subtitle_file):
with open(subtitle_file, 'r') as f:
print(f"[subtitles] File size: {len(f.read())} bytes")
# Burn subtitles and/or watermark
output_with_extras = tempfile.mktemp(suffix=".mp4")
success = burn_subtitles_and_watermark(final_video, output_with_extras, subtitle_file, watermark)
if success:
final_video = output_with_extras
return final_video, final_seed
with gr.Blocks(title="Element-8 Video", delete_cache=(3600, 7200)) as demo: # cleanup: check every 1h, delete files >2h old
gr.Markdown("# Element-8: Fast Video Generation with Frame Conditioning")
gr.Markdown(
"High quality video + audio generation with first and last frame conditioning. "
"Pre-distilled LTX model for fast inference. "
"[[code]](https://github.com/Lightricks/LTX-2)"
)
with gr.Row():
with gr.Column():
with gr.Row():
first_image = gr.Image(label="First Frame (Optional)", type="pil")
last_image = gr.Image(label="Last Frame (Optional)", type="pil")
prompt1 = gr.Textbox(
label="Prompt 1",
value="Make this image come alive with cinematic motion, smooth animation",
lines=2,
placeholder="First prompt (required)",
)
prompt2 = gr.Textbox(
label="Prompt 2 (Optional)",
value="",
lines=2,
placeholder="Second prompt (leave empty if not needed)",
)
prompt3 = gr.Textbox(
label="Prompt 3 (Optional)",
value="",
lines=2,
placeholder="Third prompt (leave empty if not needed)",
)
duration = gr.Slider(label="Duration (seconds)", minimum=1.0, maximum=10.0, value=3.0, step=0.1)
audio_track = gr.Audio(label="Audio Track (Optional) - loops clips to match duration", type="filepath", sources=["upload"])
generate_btn = gr.Button("Generate Video", variant="primary", size="lg")
with gr.Accordion("Advanced Settings", open=False):
seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, value=10, step=1)
randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
with gr.Row():
width = gr.Number(label="Width", value=512, precision=0)
height = gr.Number(label="Height", value=320, precision=0)
with gr.Row():
enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False)
high_res = gr.Checkbox(label="High Resolution", value=False)
with gr.Row():
blur_amount = gr.Number(label="Blur (0=off, 36=heavy)", value=0, precision=0)
remove_music = gr.Checkbox(label="Remove Music", value=False)
with gr.Row():
add_subtitles = gr.Checkbox(label="Add Subtitles (Whisper)", value=False)
watermark = gr.File(label="Watermark PNG (full-video size, position in your editor)", file_types=[".png"])
negative_prompt = gr.Textbox(
label="Negative Prompt",
value=DEFAULT_NEGATIVE_PROMPT,
lines=3,
placeholder="What to avoid in the generated video...",
)
with gr.Column():
output_video = gr.Video(label="Generated Video", autoplay=True)
gr.Examples(
examples=[
[
None,
"pinkknit.jpg",
"The camera falls downward through darkness as if dropped into a tunnel. "
"As it slows, five friends wearing pink knitted hats and sunglasses lean "
"over and look down toward the camera with curious expressions. The lens "
"has a strong fisheye effect, creating a circular frame around them. They "
"crowd together closely, forming a symmetrical cluster while staring "
"directly into the lens.",
"",
"",
3.0,
False,
42,
True,
1024,
1024,
],
],
inputs=[
first_image, last_image, prompt1, prompt2, prompt3, duration,
enhance_prompt, seed, randomize_seed, height, width,
],
)
first_image.change(
fn=on_image_upload,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
last_image.change(
fn=on_image_upload,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
high_res.change(
fn=on_highres_toggle,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
generate_btn.click(
fn=full_generation_process,
inputs=[
first_image, last_image, prompt1, prompt2, prompt3, duration, enhance_prompt,
seed, randomize_seed, height, width, negative_prompt, blur_amount, remove_music,
add_subtitles, watermark, audio_track,
],
outputs=[output_video, seed],
)
css = """
.fillable{max-width: 1200px !important}
"""
if __name__ == "__main__":
demo.launch(theme=gr.themes.Citrus(), css=css)