# (removed non-Python upload-page residue: "Tipsoft72's picture / Update app.py / 415a153 verified")
# app.py (L40S-friendly, ZeroGPU-compatible)
# FINAL "anti-hang" build (Gradio-compatible):
# - AOT OFF by default (set ENABLE_AOT=1 to enable it)
# - On L40S: full pipeline on CUDA + fp16 VAE on CUDA
# - On ZeroGPU: conservative strategy (transformers on CUDA) + VAE/text_encoder on CPU
# - OOM fix: retries on CUDA lowering FRAMES -> RES -> STEPS. NO CPU fallback with fp16/bf16.
# - Anti-duplicate UI: disables Generate while a job is running
# - queue() without kwargs (old/new gradio compatibility)
import os
os.environ.setdefault("PYTORCH_ALLOC_CONF", "expandable_segments:True")
os.environ["TOKENIZERS_PARALLELISM"] = "true"
import shutil
import subprocess
import copy
import random
import tempfile
import warnings
import gc
import uuid
import time
from tqdm import tqdm
import cv2
import numpy as np
import torch
from torch.nn import functional as F
from PIL import Image
import gradio as gr
from diffusers import (
FlowMatchEulerDiscreteScheduler,
SASolverScheduler,
DEISMultistepScheduler,
DPMSolverMultistepInverseScheduler,
UniPCMultistepScheduler,
DPMSolverMultistepScheduler,
DPMSolverSinglestepScheduler,
)
from diffusers.pipelines.wan.pipeline_wan_i2v import WanImageToVideoPipeline
from diffusers.utils.export_utils import export_to_video
from torchao.quantization import (
quantize_,
Float8DynamicActivationFloat8WeightConfig,
Int8WeightOnlyConfig,
)
# AOT optional
import aoti
try:
import spaces
except Exception:
spaces = None
warnings.filterwarnings("ignore")
# Runtime environment flags, resolved once at import time.
IS_ZERO_GPU = bool(os.getenv("SPACES_ZERO_GPU"))  # running on HF Spaces ZeroGPU?
CUDA_OK = torch.cuda.is_available()  # CUDA present at import time
# AOT control: OFF by default to avoid instability/hangs (set ENABLE_AOT=1 to opt in).
ENABLE_AOT = os.getenv("ENABLE_AOT", "0") == "1"
# Helpers CUDA
def clear_vram():
    """Run the Python garbage collector and, when CUDA is present, flush the
    CUDA caching allocator."""
    gc.collect()
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()
def cuda_mem_str():
    """Return a human-readable snapshot of free/total CUDA memory (in GB)."""
    if not torch.cuda.is_available():
        return "CUDA not available"
    free_bytes, total_bytes = torch.cuda.mem_get_info()
    gigabyte = 1e9
    return f"CUDA mem free={free_bytes / gigabyte:.2f}GB / total={total_bytes / gigabyte:.2f}GB"
# ZeroGPU scratch cleanup (optional): clear leftover offload files from prior runs.
if IS_ZERO_GPU:
    print("ZeroGPU detected: clearing zerogpu-offload scratch.")
    # shell=True with a constant command string (no user input involved).
    subprocess.run("rm -rf /data-nvme/zerogpu-offload/*", env={}, shell=True)
# -----------------------
# Frame extraction (JS + Python)
# -----------------------
# Client-side JS: read the current playback time of the generated video element
# so the server can extract that exact frame.
get_timestamp_js = """
function() {
    const video = document.querySelector('#generated-video video');
    if (video) return video.currentTime;
    return 0;
}
"""
def extract_frame(video_path, timestamp):
    """Return the RGB frame of *video_path* nearest to *timestamp* (seconds).

    Returns None when the path is empty/None, the file cannot be opened, or
    the frame cannot be decoded. The frame index is clamped into the valid
    range. The capture handle is always released, even on error.
    """
    if not video_path:
        return None
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()  # release the handle even when opening failed
        return None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            # Some containers report 0 fps; fall back so the timestamp still
            # maps to a sensible frame index instead of always frame 0.
            fps = 30.0
        target_frame_num = int(float(timestamp) * fps)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if target_frame_num >= total_frames:
            target_frame_num = max(0, total_frames - 1)
        cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame_num)
        ret, frame = cap.read()
    finally:
        cap.release()
    if ret:
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return None
# -----------------------
# RIFE (Frame Interpolation)
# -----------------------
# Download and unpack the RIFE v4.26 weights once; later runs reuse the zip.
if not os.path.exists("RIFEv4.26_0921.zip"):
    print("Downloading RIFE Model...")
    subprocess.run(
        [
            "wget", "-q",
            "https://huggingface.co/r3gm/RIFE/resolve/main/RIFEv4.26_0921.zip",
            "-O", "RIFEv4.26_0921.zip",
        ],
        check=True,
    )
    subprocess.run(["unzip", "-o", "RIFEv4.26_0921.zip"], check=True)
# Import is only valid after the zip above has been extracted (creates train_log/).
from train_log.RIFE_HDv3 import Model  # noqa: E402
# RIFE starts on CPU; rife_to() moves it on demand and tracks its placement.
rife_device = torch.device("cpu")
rife_model = Model()
# NOTE(review): second load_model arg appears to be a checkpoint/rank selector
# in RIFE's API -- confirm against the RIFE repo before changing.
rife_model.load_model("train_log", -1)
rife_model.eval()
def rife_to(device: torch.device):
    """Move the RIFE model to *device*, tracking the current placement in the
    module-level ``rife_device`` so redundant transfers are skipped.

    Some RIFE builds lack a top-level ``.to``; in that case only the inner
    ``flownet`` submodule is moved.
    """
    global rife_device
    if rife_device != device:
        try:
            rife_model.to(device)
        except Exception:
            flownet = getattr(rife_model, "flownet", None)
            if flownet is not None:
                rife_model.flownet = flownet.to(device)
        rife_device = device
@torch.no_grad()
def interpolate_bits(frames_np, multiplier=2, scale=1.0):
    """Temporally upsample a frame sequence with RIFE by *multiplier*.

    frames_np: list of HxWxC arrays or a (T, H, W, C) ndarray.
    multiplier: output-to-input frame-count ratio; < 2 is a no-op.
    scale: RIFE inference scale; also controls spatial padding granularity.
    Returns a list of float32 HxWxC numpy frames. On CUDA OOM the whole
    interpolation restarts on CPU.
    """
    if isinstance(frames_np, list):
        T = len(frames_np)
        H, W, C = frames_np[0].shape
    else:
        T, H, W, C = frames_np.shape
    if multiplier < 2:
        # Nothing to interpolate; normalize ndarray input to a list.
        return list(frames_np) if isinstance(frames_np, np.ndarray) else frames_np
    n_interp = multiplier - 1  # frames synthesized between each original pair
    # Pad H/W up to a multiple of tmp (RIFE's spatial requirement, scale-dependent).
    tmp = max(128, int(128 / scale))
    ph = ((H - 1) // tmp + 1) * tmp
    pw = ((W - 1) // tmp + 1) * tmp
    padding = (0, pw - W, 0, ph - H)
    # ZeroGPU: keep interpolation on CPU (GPU quota is reserved for diffusion).
    interp_device = torch.device("cuda") if torch.cuda.is_available() and (not IS_ZERO_GPU) else torch.device("cpu")
    try:
        rife_to(interp_device)
        if interp_device.type == "cuda" and hasattr(rife_model, "flownet"):
            rife_model.flownet = rife_model.flownet.half()  # fp16 on GPU for speed/memory
    except Exception:
        # Any transfer failure: fall back to CPU interpolation.
        interp_device = torch.device("cpu")
        rife_to(interp_device)
    def to_tensor(frame_np):
        # HWC numpy -> padded half-precision 1xCxHxW tensor on interp_device.
        t = torch.from_numpy(frame_np).to(interp_device)
        t = t.permute(2, 0, 1).unsqueeze(0)
        return F.pad(t, padding).half()
    def from_tensor(tensor):
        # Crop the padding back off and return a float32 HWC numpy frame.
        t = tensor[0, :, :H, :W].permute(1, 2, 0)
        return t.float().cpu().numpy()
    def make_inference(I0, I1, n):
        # RIFE >= 3.9 accepts arbitrary timesteps directly; older versions
        # recurse on midpoints to build n in-between frames.
        if rife_model.version >= 3.9:
            return [rife_model.inference(I0, I1, (i + 1) / (n + 1), scale) for i in range(n)]
        middle = rife_model.inference(I0, I1, scale)
        if n == 1:
            return [middle]
        first_half = make_inference(I0, middle, n=n // 2)
        second_half = make_inference(middle, I1, n=n // 2)
        if n % 2:
            return [*first_half, middle, *second_half]
        return [*first_half, *second_half]
    output_frames = []
    I1 = to_tensor(frames_np[0])
    try:
        with tqdm(total=T - 1, desc="Interpolating", unit="frame") as pbar:
            for i in range(T - 1):
                I0 = I1
                output_frames.append(from_tensor(I0))
                I1 = to_tensor(frames_np[i + 1])
                mids = make_inference(I0, I1, n_interp)
                for mid in mids:
                    output_frames.append(from_tensor(mid))
                pbar.update(1)
        output_frames.append(from_tensor(I1))  # trailing original frame
    except torch.cuda.OutOfMemoryError:
        # GPU OOM mid-interpolation: free VRAM and redo everything on CPU.
        print("RIFE CUDA OOM: falling back to CPU interpolation.")
        clear_vram()
        rife_to(torch.device("cpu"))
        return interpolate_bits(frames_np, multiplier=multiplier, scale=scale)
    # Park RIFE back on CPU so diffusion gets the VRAM back.
    rife_to(torch.device("cpu"))
    clear_vram()
    return output_frames
# -----------------------
# WAN (I2V)
# -----------------------
CACHE_DIR = os.path.expanduser("~/.cache/huggingface/")
# Spatial limits (pixels); final dims are snapped to MULTIPLE_OF by resize_image().
MAX_DIM = 832
MIN_DIM = 480
SQUARE_DIM = 640  # output size used for square inputs
MULTIPLE_OF = 16
MAX_SEED = np.iinfo(np.int32).max
FIXED_FPS = 16  # native fps the model generates at
# Frame-count limits; the UI duration bounds below are derived from them.
MIN_FRAMES_MODEL = 8
MAX_FRAMES_MODEL = 160
MIN_DURATION = round(MIN_FRAMES_MODEL / FIXED_FPS, 1)
MAX_DURATION = round(MAX_FRAMES_MODEL / FIXED_FPS, 1)
# UI scheduler name -> diffusers scheduler class.
SCHEDULER_MAP = {
    "FlowMatchEulerDiscrete": FlowMatchEulerDiscreteScheduler,
    "SASolver": SASolverScheduler,
    "DEISMultistep": DEISMultistepScheduler,
    "DPMSolverMultistepInverse": DPMSolverMultistepInverseScheduler,
    "UniPCMultistep": UniPCMultistepScheduler,
    "DPMSolverMultistep": DPMSolverMultistepScheduler,
    "DPMSolverSinglestep": DPMSolverSinglestepScheduler,
}
MODEL_REPO = "TestOrganizationPleaseIgnore/WAMU_v2_WAN2.2_I2V_LIGHTNING"
AOT_PATH = "zerogpu-aoti/Wan2"  # repo path holding the AOT-compiled blocks
AOT_VARIANT = "fp8da"
# Optionally wipe the HF cache on boot (frees disk on constrained hosts).
CLEAR_HF_CACHE = os.getenv("CLEAR_HF_CACHE", "0") == "1"
if CLEAR_HF_CACHE and os.path.exists(CACHE_DIR):
    shutil.rmtree(CACHE_DIR)
    print("Deleted Hugging Face cache (CLEAR_HF_CACHE=1).")
else:
    print("HF cache preserved.")
# -----------------------
# PIPE SETUP
# -----------------------
torch.backends.cudnn.enabled = True
torch.backends.cudnn.benchmark = True  # autotune conv algorithms (shapes are mostly fixed)
print("Loading pipeline (initial).")
pipe = WanImageToVideoPipeline.from_pretrained(
    MODEL_REPO,
    torch_dtype=torch.bfloat16,
)
try:
    pipe.set_progress_bar_config(disable=False)
except Exception:
    pass  # progress bar is cosmetic; older diffusers may lack this method
# Keep a pristine scheduler copy so run_inference can restore it after swaps.
original_scheduler = copy.deepcopy(pipe.scheduler)
print("Quantizing...")
# fp8 dynamic-activation quantization on both transformer stages (major VRAM saver).
quantize_(pipe.transformer, Float8DynamicActivationFloat8WeightConfig())
quantize_(pipe.transformer_2, Float8DynamicActivationFloat8WeightConfig())
if IS_ZERO_GPU or (not torch.cuda.is_available()):
    # Constrained environments also get an int8 weight-only text encoder.
    quantize_(pipe.text_encoder, Int8WeightOnlyConfig())
else:
    print("L40S: skipping text_encoder int8 quantization (keep bf16/fp16).")
def try_load_aot():
    """Load ahead-of-time compiled transformer blocks when ENABLE_AOT is set.

    Failures are non-fatal: the pipeline keeps running without AOT.
    """
    if not ENABLE_AOT:
        print("AOT disabled (ENABLE_AOT=0).")
        return
    try:
        print("AOT enabled: loading aoti blocks...")
        for stage in (pipe.transformer, pipe.transformer_2):
            aoti.aoti_blocks_load(stage, AOT_PATH, variant=AOT_VARIANT)
        print("AOT load OK.")
    except Exception as e:
        print(f"AOT load failed -> continuing without AOT. Reason: {e}")
# Device placement strategy (differs per environment).
if torch.cuda.is_available():
    if IS_ZERO_GPU:
        # ZeroGPU: only the transformers live on CUDA; VAE and text encoder
        # stay on CPU to fit the quota.
        pipe.transformer.to("cuda")
        pipe.transformer_2.to("cuda")
        try_load_aot()
        try:
            pipe.text_encoder.to("cpu")
        except Exception:
            pass
        try:
            pipe.vae.to("cpu")
        except Exception:
            pass
        print("ZeroGPU mode: transformers on CUDA; VAE/text encoder on CPU.")
    else:
        # Dedicated GPU (e.g. L40S): whole pipeline on CUDA, VAE in fp16.
        print("CUDA available (non-ZeroGPU): moving full pipeline to CUDA.")
        pipe.to("cuda")
        try:
            pipe.vae.to(device="cuda", dtype=torch.float16)
        except Exception as e:
            print(f"Warning: could not set VAE fp16 on CUDA: {e}")
        try:
            pipe.text_encoder.to("cuda")
        except Exception as e:
            print(f"Warning: could not move text_encoder to CUDA: {e}")
        try_load_aot()
        # Log where each major submodule ended up (debug aid).
        print("Pipeline devices check:")
        for name in ["text_encoder", "transformer", "transformer_2", "vae"]:
            try:
                mod = getattr(pipe, name)
                dev = next(mod.parameters()).device
                print(f" - {name}: {dev}")
            except Exception:
                pass
else:
    print("CPU-only mode: everything on CPU.")
    try_load_aot()
clear_vram()
# Default prompts. The negative prompt is the standard WAN Chinese quality/artifact
# term list; it is a runtime string fed to the model and intentionally untranslated.
default_prompt_i2v = "make this image come alive, cinematic motion, smooth animation"
default_negative_prompt = (
    "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, "
    "丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, "
    "杂乱的背景, 三条腿, 背景人很多, 倒着走"
)
def resize_image(image: Image.Image) -> Image.Image:
    """Resize (and center-crop when needed) an image to model-friendly dims.

    Squares become SQUARE_DIM x SQUARE_DIM. Otherwise the longer side targets
    MAX_DIM; extreme aspect ratios are center-cropped first, and the final
    dimensions are snapped to multiples of MULTIPLE_OF and clamped into
    [MIN_DIM, MAX_DIM].
    """
    w, h = image.size
    if w == h:
        return image.resize((SQUARE_DIM, SQUARE_DIM), Image.LANCZOS)
    ratio = w / h
    max_ratio = MAX_DIM / MIN_DIM
    min_ratio = MIN_DIM / MAX_DIM
    src = image
    if ratio > max_ratio:
        # Too wide: crop width down to the widest allowed aspect ratio.
        target_w, target_h = MAX_DIM, MIN_DIM
        crop_w = int(round(h * max_ratio))
        x0 = (w - crop_w) // 2
        src = image.crop((x0, 0, x0 + crop_w, h))
    elif ratio < min_ratio:
        # Too tall: crop height down to the tallest allowed aspect ratio.
        target_w, target_h = MIN_DIM, MAX_DIM
        crop_h = int(round(w / min_ratio))
        y0 = (h - crop_h) // 2
        src = image.crop((0, y0, w, y0 + crop_h))
    elif w > h:
        target_w = MAX_DIM
        target_h = int(round(target_w / ratio))
    else:
        target_h = MAX_DIM
        target_w = int(round(target_h * ratio))
    # Snap to the model's spatial multiple, then clamp into the legal range.
    snapped_w = round(target_w / MULTIPLE_OF) * MULTIPLE_OF
    snapped_h = round(target_h / MULTIPLE_OF) * MULTIPLE_OF
    final_w = max(MIN_DIM, min(MAX_DIM, snapped_w))
    final_h = max(MIN_DIM, min(MAX_DIM, snapped_h))
    return src.resize((final_w, final_h), Image.LANCZOS)
def resize_and_crop_to_match(target_image, reference_image):
    """Aspect-fill *target_image* to *reference_image*'s exact size.

    Scales the target so it covers the reference dimensions, then center-crops
    to exactly (ref_width, ref_height).
    """
    ref_w, ref_h = reference_image.size
    src_w, src_h = target_image.size
    # "Cover" scale: both dimensions end up at least as large as the reference.
    scale = max(ref_w / src_w, ref_h / src_h)
    scaled_w = int(src_w * scale)
    scaled_h = int(src_h * scale)
    scaled = target_image.resize((scaled_w, scaled_h), Image.Resampling.LANCZOS)
    x0 = (scaled_w - ref_w) // 2
    y0 = (scaled_h - ref_h) // 2
    return scaled.crop((x0, y0, x0 + ref_w, y0 + ref_h))
def get_num_frames(duration_seconds: float, *, fps=None, min_frames=None, max_frames=None):
    """Map a duration in seconds to a valid model frame count.

    The count is clamped to [min_frames, max_frames] and aligned to the model
    constraint (frames - 1) % 4 == 0 (i.e. frames ≡ 1 mod 4).

    Args:
        duration_seconds: requested clip length in seconds.
        fps: generation fps; defaults to the module-level FIXED_FPS.
        min_frames / max_frames: clamp bounds; default to the module-level
            MIN_FRAMES_MODEL / MAX_FRAMES_MODEL. Keyword-only so existing
            single-argument callers are unaffected.
    Returns:
        An int frame count satisfying the clamp and alignment constraints.
    """
    # Resolve defaults lazily so the module-level constants stay the source of truth.
    fps = FIXED_FPS if fps is None else int(fps)
    min_frames = MIN_FRAMES_MODEL if min_frames is None else int(min_frames)
    max_frames = MAX_FRAMES_MODEL if max_frames is None else int(max_frames)
    base = 1 + int(np.clip(int(round(duration_seconds * fps)), min_frames, max_frames))
    # Constraint: (frames - 1) % 4 == 0 => frames ≡ 1 (mod 4).
    if (base - 1) % 4 != 0:
        base = base + (4 - ((base - 1) % 4))  # round up to the next aligned count
    base = int(np.clip(base, min_frames, max_frames))
    if (base - 1) % 4 != 0:
        base = base - ((base - 1) % 4)  # clamping may break alignment; round down
    base = int(np.clip(base, min_frames, max_frames))
    return base
def get_inference_duration(
    resized_image,
    processed_last_image,
    prompt,
    steps,
    negative_prompt,
    num_frames,
    guidance_scale,
    guidance_scale_2,
    current_seed,
    scheduler_name,
    flow_shift,
    frame_multiplier,
    quality,
    duration_seconds,
    progress,
):
    """Estimate the GPU seconds this job needs (used as the spaces.GPU duration).

    The estimate scales with pixel volume (frames * width * height) relative to
    a reference workload, grows superlinearly (power 1.5), and is padded for
    CFG (guidance_scale > 1) and for interpolation work. The signature mirrors
    run_inference because spaces.GPU forwards the same arguments.
    """
    BASE_FRAMES_HEIGHT_WIDTH = 81 * 832 * 624
    BASE_STEP_DURATION = 15
    width, height = resized_image.size
    workload = num_frames * width * height / BASE_FRAMES_HEIGHT_WIDTH
    per_step = BASE_STEP_DURATION * workload ** 1.5
    estimate = int(steps) * per_step
    if guidance_scale > 1:
        estimate *= 1.8  # classifier-free guidance roughly doubles transformer work
    interp_factor = frame_multiplier // FIXED_FPS
    if interp_factor > 1:
        extra_frames = (num_frames * interp_factor) - num_frames
        estimate += extra_frames * 0.02  # small per-frame RIFE interpolation cost
    return 10 + estimate  # +10s fixed overhead (encode/export/housekeeping)
def maybe_gpu_decorator(fn):
    """Wrap *fn* with spaces.GPU on ZeroGPU; return it untouched elsewhere."""
    if not (IS_ZERO_GPU and spaces is not None):
        return fn
    # Duration is estimated per-call from the actual job arguments.
    return spaces.GPU(duration=get_inference_duration)(fn)
def _make_generator_for(device: torch.device, seed: int):
try:
g = torch.Generator(device=device)
except Exception:
g = torch.Generator(device=str(device))
return g.manual_seed(int(seed))
def _strong_cleanup():
    """Synchronize CUDA (best effort), then aggressively free Python/CUDA memory."""
    try:
        if torch.cuda.is_available():
            torch.cuda.synchronize()  # flush pending kernels before measuring/freeing
    except Exception:
        pass  # sync is best-effort; cleanup itself must never raise
    clear_vram()
    gc.collect()
@maybe_gpu_decorator
def run_inference(
    resized_image,
    processed_last_image,
    prompt,
    steps,
    negative_prompt,
    num_frames,
    guidance_scale,
    guidance_scale_2,
    current_seed,
    scheduler_name,
    flow_shift,
    frame_multiplier,
    quality,
    duration_seconds,
    progress=gr.Progress(track_tqdm=True),
):
    """Run the WAN I2V pipeline and export the result to an mp4.

    Swaps the scheduler when requested, calls pipe() with a progress callback,
    retries on CUDA OOM with progressively fewer frames / lower resolution /
    fewer steps (never a CPU fallback), optionally RIFE-interpolates the
    output, and returns (video_path, task_name). The original scheduler is
    always restored in the finally block.
    """
    scheduler_class = SCHEDULER_MAP.get(scheduler_name)
    if scheduler_class is None:
        raise gr.Error(f"Unknown scheduler: {scheduler_name}")
    # Rebuild the scheduler only when the class or flow shift actually changed.
    # NOTE(review): the string default "shift" in config.get looks suspicious --
    # for schedulers without a flow_shift key the comparison is always true,
    # forcing a rebuild every call. Confirm whether that is intended.
    if scheduler_class.__name__ != pipe.scheduler.config._class_name or flow_shift != pipe.scheduler.config.get("flow_shift", "shift"):
        config = copy.deepcopy(original_scheduler.config)
        if scheduler_class == FlowMatchEulerDiscreteScheduler:
            config["shift"] = flow_shift  # FlowMatch names this parameter "shift"
        else:
            config["flow_shift"] = flow_shift
        pipe.scheduler = scheduler_class.from_config(config)
    clear_vram()
    task_name = str(uuid.uuid4())[:8]  # short id for log correlation
    print(f"Task: {task_name}, {duration_seconds}, {resized_image.size}, FM={frame_multiplier}")
    if torch.cuda.is_available():
        print(f"[{task_name}] {cuda_mem_str()}")
    # Choose the device the RNG generator should live on.
    if torch.cuda.is_available() and (not IS_ZERO_GPU):
        exec_device = torch.device("cuda")
    else:
        try:
            exec_device = pipe._execution_device
        except Exception:
            exec_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    def _progress_cb(step_idx: int, total_steps: int):
        # Forward diffusion progress to the Gradio progress bar (best effort).
        try:
            progress((step_idx + 1) / max(1, total_steps), desc=f"Diffusion {step_idx+1}/{total_steps}")
        except Exception:
            pass
    def _call_pipe(generator, _steps, _frames, _img: Image.Image):
        # Build the pipe() kwargs and wire whichever progress-callback API this
        # diffusers version supports (callback_on_step_end first, then the
        # legacy callback/callback_steps pair).
        kwargs = dict(
            image=_img,
            last_image=processed_last_image,
            prompt=prompt,
            negative_prompt=negative_prompt,
            height=_img.height,
            width=_img.width,
            num_frames=int(_frames),
            guidance_scale=float(guidance_scale),
            guidance_scale_2=float(guidance_scale_2),
            num_inference_steps=int(_steps),
            generator=generator,
            output_type="np",
        )
        try:
            def _on_step_end(pipe_self, i, t, callback_kwargs):
                _progress_cb(i, int(_steps))
                return callback_kwargs
            kwargs["callback_on_step_end"] = _on_step_end
        except Exception:
            pass
        if "callback_on_step_end" not in kwargs:
            # Legacy diffusers callback API.
            try:
                kwargs["callback_steps"] = 1
                def _cb(i, t, latents):
                    _progress_cb(i, int(_steps))
                kwargs["callback"] = _cb
            except Exception:
                pass
        print(f"[{task_name}] calling pipe() now... exec_device={exec_device} steps={_steps} frames={_frames} size=({_img.width},{_img.height})")
        return pipe(**kwargs)
    def _downshift(attempt: int, base_steps: int, base_frames: int, base_img: Image.Image):
        # OOM mitigation ladder: each attempt drops frames first, shrinks
        # resolution from attempt >= 2, and always shaves a few steps.
        steps_i = int(base_steps)
        frames_i = int(base_frames)
        drop = 24 * attempt
        new_frames = max(MIN_FRAMES_MODEL, frames_i - drop)
        if (new_frames - 1) % 4 != 0:
            new_frames = new_frames - ((new_frames - 1) % 4)  # keep (frames-1)%4==0
        new_frames = max(MIN_FRAMES_MODEL, new_frames)
        new_img = base_img
        if attempt >= 2:
            scale = 0.88 ** (attempt - 1)
            new_w = max(MIN_DIM, int((base_img.width * scale) // MULTIPLE_OF) * MULTIPLE_OF)
            new_h = max(MIN_DIM, int((base_img.height * scale) // MULTIPLE_OF) * MULTIPLE_OF)
            new_w = min(MAX_DIM, new_w)
            new_h = min(MAX_DIM, new_h)
            new_img = base_img.resize((new_w, new_h), Image.LANCZOS)
        new_steps = max(2, steps_i - (3 * attempt))
        return new_steps, new_frames, new_img
    try:
        gen = _make_generator_for(exec_device, current_seed)
        t0 = time.time()
        result = _call_pipe(gen, int(steps), int(num_frames), resized_image)
        print(f"[{task_name}] pipe() finished in {time.time()-t0:.1f}s")
    except ValueError as e:
        msg = str(e)
        if "Cannot generate a cpu tensor from a generator of type cuda" in msg:
            # Generator/pipeline device mismatch: retry once with a CPU generator.
            print(f"[{task_name}] Generator mismatch detected. Retrying with CPU generator.")
            gen_cpu = _make_generator_for(torch.device("cpu"), current_seed)
            t0 = time.time()
            result = _call_pipe(gen_cpu, int(steps), int(num_frames), resized_image)
            print(f"[{task_name}] pipe() finished in {time.time()-t0:.1f}s")
        else:
            print(f"[{task_name}] PIPE ERROR: {repr(e)}")
            raise gr.Error(f"Pipe failed: {type(e).__name__}: {e}")
    except NotImplementedError as e:
        # Typically conv3d unsupported for the VAE's device/dtype combination.
        print(f"[{task_name}] NotImplementedError: {e}")
        raise gr.Error(
            "VAE/conv3d backend not supported on this device/dtype. "
            "Ensure VAE is on CUDA fp16 (L40S) or run with GPU."
        )
    except torch.cuda.OutOfMemoryError:
        # OOM recovery: stay on CUDA and progressively downshift. Deliberately
        # no CPU fallback -- fp16/bf16 on CPU would fail or be unusably slow.
        print(f"[{task_name}] CUDA OOM at base settings. Retrying on CUDA with downshift...")
        _strong_cleanup()
        last_err = None
        for attempt in [1, 2, 3]:
            try:
                ds_steps, ds_frames, ds_img = _downshift(attempt, int(steps), int(num_frames), resized_image)
                gen2 = _make_generator_for(exec_device, current_seed)
                t0 = time.time()
                result = _call_pipe(gen2, ds_steps, ds_frames, ds_img)
                print(f"[{task_name}] retry#{attempt} OK in {time.time()-t0:.1f}s")
                # Remember the settings that actually succeeded.
                resized_image = ds_img
                num_frames = ds_frames
                steps = ds_steps
                break
            except torch.cuda.OutOfMemoryError as e2:
                last_err = e2
                print(f"[{task_name}] retry#{attempt} still OOM -> cleaning and trying smaller...")
                _strong_cleanup()
        else:
            # All retries exhausted.
            raise gr.Error(
                "CUDA OOM even after aggressive retries. "
                "Try: shorter duration, smaller resolution, lower steps, or FPS=16."
            ) from last_err
    finally:
        # Always restore the pristine scheduler for the next request.
        pipe.scheduler = original_scheduler
    raw_frames_np = result.frames[0]
    # Optional RIFE interpolation to reach the requested output fps.
    frame_factor = int(frame_multiplier) // FIXED_FPS
    if frame_factor > 1:
        final_frames = interpolate_bits(raw_frames_np, multiplier=int(frame_factor))
    else:
        final_frames = list(raw_frames_np)
    final_fps = FIXED_FPS * int(frame_factor)
    # delete=False: Gradio serves the file after this function returns.
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        video_path = tmpfile.name
    with tqdm(total=3, desc="Rendering Media", unit="clip") as pbar:
        pbar.update(2)
        export_to_video(final_frames, video_path, fps=final_fps, quality=int(quality))
        pbar.update(1)
    # Drop large intermediates before returning.
    try:
        del raw_frames_np
    except Exception:
        pass
    try:
        del final_frames
    except Exception:
        pass
    try:
        del result
    except Exception:
        pass
    _strong_cleanup()
    return video_path, task_name
def generate_video(
    input_image,
    last_image,
    prompt,
    steps=6,
    negative_prompt=default_negative_prompt,
    duration_seconds=3.5,
    guidance_scale=1,
    guidance_scale_2=1,
    seed=42,
    randomize_seed=False,
    quality=6,
    scheduler="UniPCMultistep",
    flow_shift=3.0,
    frame_multiplier=16,
    video_component=True,
    progress=gr.Progress(track_tqdm=True),
):
    """Gradio entry point: validate inputs, preprocess images, run inference.

    Returns (video for the player or None, video file path, seed used).
    """
    if input_image is None:
        raise gr.Error("Please upload an input image.")
    frame_count = get_num_frames(duration_seconds)
    chosen_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    prepared_image = resize_image(input_image)
    # The optional last frame is aspect-filled to match the first frame's size.
    prepared_last = resize_and_crop_to_match(last_image, prepared_image) if last_image else None
    video_path, task_n = run_inference(
        prepared_image,
        prepared_last,
        prompt,
        steps,
        negative_prompt,
        frame_count,
        guidance_scale,
        guidance_scale_2,
        chosen_seed,
        scheduler,
        flow_shift,
        frame_multiplier,
        quality,
        duration_seconds,
        progress,
    )
    print(f"Done: {task_n}")
    player_value = video_path if video_component else None
    return player_value, video_path, chosen_seed
CSS = """
#hidden-timestamp {
opacity: 0;
height: 0px;
width: 0px;
margin: 0px;
padding: 0px;
overflow: hidden;
position: absolute;
pointer-events: none;
}
"""
# Gradio UI definition and event wiring.
with gr.Blocks(theme=gr.themes.Soft(), css=CSS, delete_cache=(3600, 10800)) as demo:
    gr.Markdown("## WAMU V2 - Wan 2.2 I2V (14B)")
    gr.Markdown("Stable build (AOT OFF by default). Enable with ENABLE_AOT=1.")
    with gr.Row():
        with gr.Column():
            # Left column: inputs and generation controls.
            input_image_component = gr.Image(type="pil", label="Input Image", sources=["upload", "clipboard"])
            prompt_input = gr.Textbox(label="Prompt", value=default_prompt_i2v)
            duration_seconds_input = gr.Slider(
                minimum=MIN_DURATION,
                maximum=MAX_DURATION,
                step=0.1,
                value=3.5,
                label="Duration (seconds)",
                info=f"Frames are clamped to {MIN_FRAMES_MODEL}-{MAX_FRAMES_MODEL} at {FIXED_FPS}fps and aligned to (frames-1)%4==0.",
            )
            frame_multi = gr.Dropdown(
                choices=[FIXED_FPS, FIXED_FPS * 2, FIXED_FPS * 4],
                value=FIXED_FPS,
                label="Video Fluidity (Frames per Second)",
                info="Extra frames generated via interpolation.",
            )
            with gr.Accordion("Advanced Settings", open=False):
                last_image_component = gr.Image(type="pil", label="Last Image (Optional)", sources=["upload", "clipboard"])
                negative_prompt_input = gr.Textbox(
                    label="Negative Prompt",
                    value=default_negative_prompt,
                    info="Used if any Guidance Scale > 1.",
                    lines=3,
                )
                quality_slider = gr.Slider(minimum=1, maximum=10, step=1, value=6, label="Video Quality")
                seed_input = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=42, interactive=True)
                randomize_seed_checkbox = gr.Checkbox(label="Randomize seed", value=True, interactive=True)
                steps_slider = gr.Slider(minimum=1, maximum=30, step=1, value=6, label="Inference Steps")
                guidance_scale_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale")
                guidance_scale_2_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale 2")
                scheduler_dropdown = gr.Dropdown(label="Scheduler", choices=list(SCHEDULER_MAP.keys()), value="UniPCMultistep")
                flow_shift_slider = gr.Slider(minimum=0.5, maximum=15.0, step=0.1, value=3.0, label="Flow Shift")
                play_result_video = gr.Checkbox(label="Display result", value=True, interactive=True)
            generate_button = gr.Button("Generate Video", variant="primary")
        with gr.Column():
            # Right column: output video, frame-grab helper, and download link.
            video_output = gr.Video(
                label="Generated Video",
                autoplay=True,
                sources=["upload"],
                show_download_button=True,
                show_share_button=True,
                interactive=False,
                elem_id="generated-video",
            )
            with gr.Row():
                grab_frame_btn = gr.Button("📸 Use Current Frame as Input", variant="secondary")
            # Hidden via CSS; the JS snippet writes the current playback time here.
            timestamp_box = gr.Number(value=0, label="Timestamp", visible=True, elem_id="hidden-timestamp")
            file_output = gr.File(label="Download Video")
    # Argument order must match generate_video's signature.
    ui_inputs = [
        input_image_component,
        last_image_component,
        prompt_input,
        steps_slider,
        negative_prompt_input,
        duration_seconds_input,
        guidance_scale_input,
        guidance_scale_2_input,
        seed_input,
        randomize_seed_checkbox,
        quality_slider,
        scheduler_dropdown,
        flow_shift_slider,
        frame_multi,
        play_result_video,
    ]
    def _disable_btn():
        # Grey out Generate while a job runs (prevents duplicate submissions).
        return gr.update(interactive=False)
    def _enable_btn():
        return gr.update(interactive=True)
    # Disable on click, re-enable when generation finishes.
    generate_button.click(_disable_btn, inputs=None, outputs=generate_button, queue=False)
    evt = generate_button.click(fn=generate_video, inputs=ui_inputs, outputs=[video_output, file_output, seed_input], queue=True)
    evt.then(_enable_btn, inputs=None, outputs=generate_button, queue=False)
    # Frame grab: JS reads the video's current time, then Python extracts that frame.
    grab_frame_btn.click(fn=None, inputs=None, outputs=[timestamp_box], js=get_timestamp_js)
    timestamp_box.change(fn=extract_frame, inputs=[video_output, timestamp_box], outputs=[input_image_component])
if __name__ == "__main__":
    # IMPORTANT: call queue() without kwargs to avoid crashing on old gradio versions.
    demo.queue().launch(
        mcp_server=True,
        ssr_mode=False,
        show_error=True,
    )