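"""Gradio Space for LTX-Video (Lightricks): image/video-conditioned video generation.

Loads LTXConditionPipeline plus an optional latent upsampler, generates a clip at a
reduced resolution, upsamples and refines the latents, and exports the result as MP4.
"""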
import os
import tempfile
import random
import torch
from functools import lru_cache
import gradio as gr
from diffusers import LTXConditionPipeline, LTXLatentUpsamplePipeline
from diffusers.pipelines.ltx.pipeline_ltx_condition import LTXVideoCondition
from diffusers.utils import export_to_video, load_image, load_video


MODEL_MAP = {
    "13B (distilled)": "Lightricks/LTX-Video-0.9.8-13B-distilled",
    "Latest": "Lightricks/LTX-Video",
}

HF_TOKEN = os.environ.get("HF_TOKEN")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


@lru_cache(maxsize=4)
def load_pipes(repo_id: str, torch_dtype_str: str = "bfloat16"):
    """Load and cache the base pipeline and, if available, its latent upsampler."""
    dtype = getattr(torch, torch_dtype_str, torch.bfloat16)

    pipe = LTXConditionPipeline.from_pretrained(
        repo_id,
        torch_dtype=dtype,
        use_safetensors=True,
        token=HF_TOKEN,
        device_map="balanced",
        offload_folder="./offload",
    )

    # Guess the spatial-upscaler repo id from the base repo id; if no matching
    # repo exists, fall back to single-pass generation (up = None).
    up_id = repo_id.replace("LTX-Video-", "ltxv-spatial-upscaler-")
    try:
        up = LTXLatentUpsamplePipeline.from_pretrained(
            up_id,
            vae=pipe.vae,
            torch_dtype=dtype,
            use_safetensors=True,
            token=HF_TOKEN,
            device_map="balanced",
            offload_folder="./offload",
        )
    except Exception:
        up = None
    return pipe, up


def sanitize_size(h, w):
    """Clamp the requested resolution and snap it to multiples of 32 (the LTX VAE's spatial compression)."""
    h, w = int(h), int(w)
    h = max(64, min(1080, h))
    w = max(64, min(2048, w))
    h -= h % 32
    w -= w % 32
    return h, w


def generate(prompt, conditioning_file, height, width, num_frames, steps, seed, model_choice):
    if not prompt:
        return None, "Please enter a prompt."

    repo_id = MODEL_MAP.get(model_choice, list(MODEL_MAP.values())[0])
    torch_dtype = "bfloat16" if DEVICE == "cuda" else "float32"

    pipe, up = load_pipes(repo_id, torch_dtype_str=torch_dtype)

    height, width = sanitize_size(height, width)
    # LTX's temporal compression yields clips of 8*k + 1 frames; snap the request to a supported count.
    num_frames = max(9, ((int(num_frames) - 1) // 8) * 8 + 1)
    steps = int(steps)

    generator = torch.Generator(device=DEVICE).manual_seed(int(seed) if seed else random.randint(0, 2**31 - 1))

    conditions = []
    if conditioning_file is not None:
        # gr.File may pass a file path (newer Gradio) or a file-like object (older Gradio).
        if isinstance(conditioning_file, str):
            cond_path = conditioning_file
        else:
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(conditioning_file.name)[1])
            tmp.write(conditioning_file.read())
            tmp.flush()
            tmp.close()
            cond_path = tmp.name
        try:
            # Treat the upload as an image first: wrap it into a one-frame video.
            img = load_image(cond_path)
            video = load_video(export_to_video([img]))
        except Exception:
            # Not an image, so load it as a video instead.
            video = load_video(cond_path)
        conditions.append((video, 0))

    ltx_conditions = []
    for vid, frame_idx in conditions:
        ltx_conditions.append(LTXVideoCondition(video=vid, frame_index=frame_idx))

    negative_prompt = "worst quality, inconsistent motion, blurry, jittery, distorted"
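
    # Multi-scale generation (as in the LTX-Video reference examples): rendering at a
    # reduced resolution first and then upsampling/refining the latents is much cheaper
    # in memory than a single full-resolution pass.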
    # Stage 1: base generation at ~2/3 of the target size, snapped to multiples of 32.
    downscale = 2 / 3
    down_h, down_w = int(height * downscale), int(width * downscale)
    down_h -= down_h % 32
    down_w -= down_w % 32
    latents = pipe(
        conditions=ltx_conditions or None,
        prompt=prompt,
        negative_prompt=negative_prompt,
        width=down_w,
        height=down_h,
        num_frames=num_frames,
        num_inference_steps=steps,
        generator=generator,
        output_type="latent",
    ).frames

    # Stage 2: 2x latent upsampling, if the upsampler pipeline could be loaded.
    if up is not None:
        upscaled_latents = up(latents=latents, output_type="latent").frames
        up_h, up_w = down_h * 2, down_w * 2
    else:
        upscaled_latents = latents
        up_h, up_w = down_h, down_w

    # Stage 3: short refinement pass over the latents at their own resolution, then decode.
    denoise_strength = 0.4
    final_frames = pipe(
        conditions=ltx_conditions or None,
        prompt=prompt,
        negative_prompt=negative_prompt,
        width=up_w,
        height=up_h,
        num_frames=num_frames,
        denoise_strength=denoise_strength,
        num_inference_steps=max(5, steps // 3),
        latents=upscaled_latents,
        decode_timestep=0.05,
        image_cond_noise_scale=0.025,
        generator=generator,
        output_type="pil",
    ).frames[0]

    # Resize the decoded frames to the resolution the user asked for.
    final_frames = [f.resize((width, height)) for f in final_frames]

    out_path = os.path.join(tempfile.gettempdir(), f"ltx_out_{random.randint(0, 999999)}.mp4")
    export_to_video(final_frames, out_path, fps=24)

    return out_path, "Done"
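

# Gradio UI: prompt, conditioning upload and model choice on the left;
# resolution, frame count, steps and seed on the right.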
with gr.Blocks(title="LTX-Video — Image/Video → Video") as demo:
    gr.Markdown(
        "# LTX-Video (Lightricks) — improved memory Space\n"
        "Upload an image or a short video to condition on, write an English prompt and press Generate. "
        "GPU highly recommended."
    )

    with gr.Row():
        with gr.Column(scale=3):
            prompt = gr.Textbox(label="Prompt (English)", lines=4, placeholder="A cute penguin reads a book by the sea...")
            conditioning = gr.File(label="Conditioning file (image or short video)")
            model_choice = gr.Dropdown(list(MODEL_MAP.keys()), value=list(MODEL_MAP.keys())[0], label="Model variant")
        with gr.Column(scale=1):
            height = gr.Number(label="Height", value=480)
            width = gr.Number(label="Width", value=832)
            num_frames = gr.Number(label="Num frames", value=16)
            steps = gr.Number(label="Inference steps", value=20)
            seed = gr.Number(label="Seed (0 = random)", value=0)
            generate_btn = gr.Button("Generate")

    out_video = gr.Video(label="Generated video")
    status = gr.Textbox(label="Status", interactive=False)

    generate_btn.click(
        fn=generate,
        inputs=[prompt, conditioning, height, width, num_frames, steps, seed, model_choice],
        outputs=[out_video, status],
    )


if __name__ == "__main__":
    os.makedirs("./offload", exist_ok=True)
    demo.launch()