| |
| |
| |
| |
| |
| |
| |
| |
|
|
import os

# Must run before torch / tokenizers are imported for the settings to take effect:
# expandable segments reduce CUDA allocator fragmentation on long-running apps.
os.environ.setdefault("PYTORCH_ALLOC_CONF", "expandable_segments:True")
os.environ["TOKENIZERS_PARALLELISM"] = "true"
|
|
| import shutil |
| import subprocess |
| import copy |
| import random |
| import tempfile |
| import warnings |
| import gc |
| import uuid |
| import time |
| from tqdm import tqdm |
|
|
| import cv2 |
| import numpy as np |
| import torch |
| from torch.nn import functional as F |
| from PIL import Image |
|
|
| import gradio as gr |
| from diffusers import ( |
| FlowMatchEulerDiscreteScheduler, |
| SASolverScheduler, |
| DEISMultistepScheduler, |
| DPMSolverMultistepInverseScheduler, |
| UniPCMultistepScheduler, |
| DPMSolverMultistepScheduler, |
| DPMSolverSinglestepScheduler, |
| ) |
| from diffusers.pipelines.wan.pipeline_wan_i2v import WanImageToVideoPipeline |
| from diffusers.utils.export_utils import export_to_video |
|
|
| from torchao.quantization import ( |
| quantize_, |
| Float8DynamicActivationFloat8WeightConfig, |
| Int8WeightOnlyConfig, |
| ) |
|
|
| |
| import aoti |
|
|
# `spaces` exists only on Hugging Face Spaces; degrade gracefully elsewhere.
try:
    import spaces
except Exception:
    spaces = None

warnings.filterwarnings("ignore")

# Runtime environment flags.
IS_ZERO_GPU = bool(os.getenv("SPACES_ZERO_GPU"))  # set on HF ZeroGPU Spaces
CUDA_OK = torch.cuda.is_available()

# Ahead-of-time compiled transformer blocks are opt-in via ENABLE_AOT=1.
ENABLE_AOT = os.getenv("ENABLE_AOT", "0") == "1"
|
|
| |
def clear_vram():
    """Run the garbage collector and release cached CUDA allocations.

    Safe to call on CPU-only hosts: the CUDA step is skipped entirely.
    """
    gc.collect()
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()
|
|
def cuda_mem_str():
    """Return a human-readable snapshot of free/total CUDA memory in GB."""
    if not torch.cuda.is_available():
        return "CUDA not available"
    free_bytes, total_bytes = torch.cuda.mem_get_info()
    return f"CUDA mem free={free_bytes/1e9:.2f}GB / total={total_bytes/1e9:.2f}GB"
|
|
| |
# On ZeroGPU Spaces, clear the shared offload scratch left over from prior runs.
# NOTE(review): shell=True is used with a fixed command string and an empty env
# (no untrusted input reaches the shell), so this is acceptable here.
if IS_ZERO_GPU:
    print("ZeroGPU detected: clearing zerogpu-offload scratch.")
    subprocess.run("rm -rf /data-nvme/zerogpu-offload/*", env={}, shell=True)
|
|
| |
| |
| |
| get_timestamp_js = """ |
| function() { |
| const video = document.querySelector('#generated-video video'); |
| if (video) return video.currentTime; |
| return 0; |
| } |
| """ |
|
|
def extract_frame(video_path, timestamp):
    """Grab the frame nearest *timestamp* (seconds) from a video file.

    Returns the frame as an RGB numpy array, or None when the path is
    empty/None, the file cannot be opened, or the read fails.
    """
    if not video_path:
        return None
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        return None
    fps = capture.get(cv2.CAP_PROP_FPS)
    frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_idx = int(float(timestamp) * fps)
    if frame_idx >= frame_total:
        # Clamp timestamps past the end to the final frame.
        frame_idx = max(0, frame_total - 1)
    capture.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
    ok, bgr = capture.read()
    capture.release()
    return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB) if ok else None
|
|
| |
| |
| |
| if not os.path.exists("RIFEv4.26_0921.zip"): |
| print("Downloading RIFE Model...") |
| subprocess.run( |
| [ |
| "wget", "-q", |
| "https://huggingface.co/r3gm/RIFE/resolve/main/RIFEv4.26_0921.zip", |
| "-O", "RIFEv4.26_0921.zip", |
| ], |
| check=True, |
| ) |
| subprocess.run(["unzip", "-o", "RIFEv4.26_0921.zip"], check=True) |
|
|
| from train_log.RIFE_HDv3 import Model |
|
|
| rife_device = torch.device("cpu") |
| rife_model = Model() |
| rife_model.load_model("train_log", -1) |
| rife_model.eval() |
|
|
def rife_to(device: torch.device):
    """Move the RIFE model to *device*, tracking placement in `rife_device`.

    Some RIFE builds don't support a whole-model ``.to()``; in that case
    only the ``flownet`` submodule (the part used for inference) is moved.
    """
    global rife_device
    if rife_device != device:
        try:
            rife_model.to(device)
        except Exception:
            flownet = getattr(rife_model, "flownet", None)
            if flownet is not None:
                rife_model.flownet = flownet.to(device)
        rife_device = device
|
|
@torch.no_grad()
def interpolate_bits(frames_np, multiplier=2, scale=1.0, _force_cpu=False):
    """Temporally interpolate frames with RIFE.

    Parameters
    ----------
    frames_np : np.ndarray of shape (T, H, W, C) or list of (H, W, C) arrays
        Input frames (0-255 value range assumed by RIFE — TODO confirm caller contract).
    multiplier : int
        Output-frame multiplier; values < 2 are a no-op passthrough.
    scale : float
        RIFE inference scale (also controls the required padding granularity).
    _force_cpu : bool, keyword-only by convention
        Internal flag: when True, never select CUDA even if available. Used by
        the OOM fallback so the retry stays on CPU. Bug fix: the previous code
        recursed with the original arguments, re-selected CUDA on the retry,
        and could recurse indefinitely while OOMing.

    Returns
    -------
    list of float32 numpy frames, or the input unchanged when multiplier < 2
    and the input is already a list.
    """
    if isinstance(frames_np, list):
        T = len(frames_np)
        H, W, C = frames_np[0].shape
    else:
        T, H, W, C = frames_np.shape

    # Passthrough: nothing to interpolate.
    if multiplier < 2:
        return list(frames_np) if isinstance(frames_np, np.ndarray) else frames_np

    n_interp = multiplier - 1

    # RIFE requires spatial dims padded up to a multiple of 128/scale.
    tmp = max(128, int(128 / scale))
    ph = ((H - 1) // tmp + 1) * tmp
    pw = ((W - 1) // tmp + 1) * tmp
    padding = (0, pw - W, 0, ph - H)

    # Device selection: CUDA only off-ZeroGPU and when not pinned to CPU by a retry.
    use_cuda = torch.cuda.is_available() and (not IS_ZERO_GPU) and (not _force_cpu)
    interp_device = torch.device("cuda") if use_cuda else torch.device("cpu")
    try:
        rife_to(interp_device)
        if interp_device.type == "cuda" and hasattr(rife_model, "flownet"):
            rife_model.flownet = rife_model.flownet.half()
    except Exception:
        interp_device = torch.device("cpu")
        rife_to(interp_device)

    def to_tensor(frame_np):
        # HWC uint8 -> padded NCHW half on the interpolation device.
        # NOTE(review): .half() is applied on the CPU path too, while the
        # flownet is only halved on CUDA — confirm RIFE handles the mix.
        t = torch.from_numpy(frame_np).to(interp_device)
        t = t.permute(2, 0, 1).unsqueeze(0)
        return F.pad(t, padding).half()

    def from_tensor(tensor):
        # Crop the padding back off and return HWC float32 numpy.
        t = tensor[0, :, :H, :W].permute(1, 2, 0)
        return t.float().cpu().numpy()

    def make_inference(I0, I1, n):
        # Newer RIFE (>= 3.9) accepts arbitrary timesteps; older versions bisect.
        if rife_model.version >= 3.9:
            return [rife_model.inference(I0, I1, (i + 1) / (n + 1), scale) for i in range(n)]
        middle = rife_model.inference(I0, I1, scale)
        if n == 1:
            return [middle]
        first_half = make_inference(I0, middle, n=n // 2)
        second_half = make_inference(middle, I1, n=n // 2)
        if n % 2:
            return [*first_half, middle, *second_half]
        return [*first_half, *second_half]

    output_frames = []
    I1 = to_tensor(frames_np[0])

    try:
        with tqdm(total=T - 1, desc="Interpolating", unit="frame") as pbar:
            for i in range(T - 1):
                I0 = I1
                output_frames.append(from_tensor(I0))
                I1 = to_tensor(frames_np[i + 1])
                mids = make_inference(I0, I1, n_interp)
                for mid in mids:
                    output_frames.append(from_tensor(mid))
                pbar.update(1)
        output_frames.append(from_tensor(I1))
    except torch.cuda.OutOfMemoryError:
        # Retry once on CPU; _force_cpu pins the retry there so it cannot
        # bounce back to CUDA and recurse forever.
        print("RIFE CUDA OOM: falling back to CPU interpolation.")
        clear_vram()
        rife_to(torch.device("cpu"))
        return interpolate_bits(frames_np, multiplier=multiplier, scale=scale, _force_cpu=True)

    # Park the model back on CPU and release VRAM for the diffusion pipeline.
    rife_to(torch.device("cpu"))
    clear_vram()
    return output_frames
|
|
| |
| |
| |
| CACHE_DIR = os.path.expanduser("~/.cache/huggingface/") |
|
|
| MAX_DIM = 832 |
| MIN_DIM = 480 |
| SQUARE_DIM = 640 |
| MULTIPLE_OF = 16 |
| MAX_SEED = np.iinfo(np.int32).max |
|
|
| FIXED_FPS = 16 |
| MIN_FRAMES_MODEL = 8 |
| MAX_FRAMES_MODEL = 160 |
|
|
| MIN_DURATION = round(MIN_FRAMES_MODEL / FIXED_FPS, 1) |
| MAX_DURATION = round(MAX_FRAMES_MODEL / FIXED_FPS, 1) |
|
|
| SCHEDULER_MAP = { |
| "FlowMatchEulerDiscrete": FlowMatchEulerDiscreteScheduler, |
| "SASolver": SASolverScheduler, |
| "DEISMultistep": DEISMultistepScheduler, |
| "DPMSolverMultistepInverse": DPMSolverMultistepInverseScheduler, |
| "UniPCMultistep": UniPCMultistepScheduler, |
| "DPMSolverMultistep": DPMSolverMultistepScheduler, |
| "DPMSolverSinglestep": DPMSolverSinglestepScheduler, |
| } |
|
|
| MODEL_REPO = "TestOrganizationPleaseIgnore/WAMU_v2_WAN2.2_I2V_LIGHTNING" |
| AOT_PATH = "zerogpu-aoti/Wan2" |
| AOT_VARIANT = "fp8da" |
|
|
| CLEAR_HF_CACHE = os.getenv("CLEAR_HF_CACHE", "0") == "1" |
| if CLEAR_HF_CACHE and os.path.exists(CACHE_DIR): |
| shutil.rmtree(CACHE_DIR) |
| print("Deleted Hugging Face cache (CLEAR_HF_CACHE=1).") |
| else: |
| print("HF cache preserved.") |
|
|
| |
| |
| |
torch.backends.cudnn.enabled = True
torch.backends.cudnn.benchmark = True  # autotune conv kernels; input sizes are mostly stable here

print("Loading pipeline (initial).")
pipe = WanImageToVideoPipeline.from_pretrained(
    MODEL_REPO,
    torch_dtype=torch.bfloat16,
)

try:
    pipe.set_progress_bar_config(disable=False)
except Exception:
    pass

# Keep a pristine scheduler so run_inference can rebuild/restore per request.
original_scheduler = copy.deepcopy(pipe.scheduler)

# Quantize both DiT stages: fp8 dynamic activations + fp8 weights.
print("Quantizing...")
quantize_(pipe.transformer, Float8DynamicActivationFloat8WeightConfig())
quantize_(pipe.transformer_2, Float8DynamicActivationFloat8WeightConfig())

# On ZeroGPU (or CPU-only) memory is tight: int8-quantize the text encoder too.
if IS_ZERO_GPU or (not torch.cuda.is_available()):
    quantize_(pipe.text_encoder, Int8WeightOnlyConfig())
else:
    print("L40S: skipping text_encoder int8 quantization (keep bf16/fp16).")
|
|
def try_load_aot():
    """Best-effort load of ahead-of-time compiled transformer blocks.

    Gated on ENABLE_AOT; any failure is logged and the app continues on the
    eager (non-AOT) path.
    """
    if not ENABLE_AOT:
        print("AOT disabled (ENABLE_AOT=0).")
        return
    try:
        print("AOT enabled: loading aoti blocks...")
        for stage in (pipe.transformer, pipe.transformer_2):
            aoti.aoti_blocks_load(stage, AOT_PATH, variant=AOT_VARIANT)
        print("AOT load OK.")
    except Exception as e:
        print(f"AOT load failed -> continuing without AOT. Reason: {e}")
|
|
# --- Device placement strategy ---
# ZeroGPU: only the transformers live on CUDA (VAE/text encoder stay on CPU).
# Dedicated GPU: everything on CUDA, VAE forced to fp16.
if torch.cuda.is_available():
    if IS_ZERO_GPU:
        pipe.transformer.to("cuda")
        pipe.transformer_2.to("cuda")
        try_load_aot()

        # Keep the memory-heavy non-transformer parts off the GPU.
        try:
            pipe.text_encoder.to("cpu")
        except Exception:
            pass
        try:
            pipe.vae.to("cpu")
        except Exception:
            pass

        print("ZeroGPU mode: transformers on CUDA; VAE/text encoder on CPU.")
    else:
        print("CUDA available (non-ZeroGPU): moving full pipeline to CUDA.")
        pipe.to("cuda")

        # fp16 VAE on CUDA; bf16/CPU VAE can hit unsupported conv3d paths.
        try:
            pipe.vae.to(device="cuda", dtype=torch.float16)
        except Exception as e:
            print(f"Warning: could not set VAE fp16 on CUDA: {e}")

        try:
            pipe.text_encoder.to("cuda")
        except Exception as e:
            print(f"Warning: could not move text_encoder to CUDA: {e}")

        try_load_aot()

        # Log where each sub-module actually ended up.
        print("Pipeline devices check:")
        for name in ["text_encoder", "transformer", "transformer_2", "vae"]:
            try:
                mod = getattr(pipe, name)
                dev = next(mod.parameters()).device
                print(f" - {name}: {dev}")
            except Exception:
                pass
else:
    print("CPU-only mode: everything on CPU.")
    try_load_aot()

clear_vram()
|
|
| default_prompt_i2v = "make this image come alive, cinematic motion, smooth animation" |
| default_negative_prompt = ( |
| "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, " |
| "丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, " |
| "杂乱的背景, 三条腿, 背景人很多, 倒着走" |
| ) |
|
|
| def resize_image(image: Image.Image) -> Image.Image: |
| width, height = image.size |
| if width == height: |
| return image.resize((SQUARE_DIM, SQUARE_DIM), Image.LANCZOS) |
|
|
| aspect_ratio = width / height |
| MAX_ASPECT_RATIO = MAX_DIM / MIN_DIM |
| MIN_ASPECT_RATIO = MIN_DIM / MAX_DIM |
|
|
| image_to_resize = image |
| if aspect_ratio > MAX_ASPECT_RATIO: |
| target_w, target_h = MAX_DIM, MIN_DIM |
| crop_width = int(round(height * MAX_ASPECT_RATIO)) |
| left = (width - crop_width) // 2 |
| image_to_resize = image.crop((left, 0, left + crop_width, height)) |
| elif aspect_ratio < MIN_ASPECT_RATIO: |
| target_w, target_h = MIN_DIM, MAX_DIM |
| crop_height = int(round(width / MIN_ASPECT_RATIO)) |
| top = (height - crop_height) // 2 |
| image_to_resize = image.crop((0, top, width, top + crop_height)) |
| else: |
| if width > height: |
| target_w = MAX_DIM |
| target_h = int(round(target_w / aspect_ratio)) |
| else: |
| target_h = MAX_DIM |
| target_w = int(round(target_h * aspect_ratio)) |
|
|
| final_w = round(target_w / MULTIPLE_OF) * MULTIPLE_OF |
| final_h = round(target_h / MULTIPLE_OF) * MULTIPLE_OF |
| final_w = max(MIN_DIM, min(MAX_DIM, final_w)) |
| final_h = max(MIN_DIM, min(MAX_DIM, final_h)) |
| return image_to_resize.resize((final_w, final_h), Image.LANCZOS) |
|
|
| def resize_and_crop_to_match(target_image, reference_image): |
| ref_width, ref_height = reference_image.size |
| target_width, target_height = target_image.size |
| scale = max(ref_width / target_width, ref_height / target_height) |
| new_width, new_height = int(target_width * scale), int(target_height * scale) |
| resized = target_image.resize((new_width, new_height), Image.Resampling.LANCZOS) |
| left, top = (new_width - ref_width) // 2, (new_height - ref_height) // 2 |
| return resized.crop((left, top, left + ref_width, top + ref_height)) |
|
|
def get_num_frames(duration_seconds: float):
    """Convert a duration to a model frame count.

    Clamps to [MIN_FRAMES_MODEL, MAX_FRAMES_MODEL] and aligns so that
    (frames - 1) is a multiple of 4, as the model requires.
    """
    frames = int(np.clip(int(round(duration_seconds * FIXED_FPS)), MIN_FRAMES_MODEL, MAX_FRAMES_MODEL)) + 1
    # Round up to the next (frames - 1) % 4 == 0 boundary...
    rem = (frames - 1) % 4
    if rem:
        frames += 4 - rem
    # ...then re-clamp and, if clamping broke alignment, round down instead.
    frames = int(np.clip(frames, MIN_FRAMES_MODEL, MAX_FRAMES_MODEL))
    rem = (frames - 1) % 4
    if rem:
        frames -= rem
    return int(np.clip(frames, MIN_FRAMES_MODEL, MAX_FRAMES_MODEL))
|
|
def get_inference_duration(
    resized_image,
    processed_last_image,
    prompt,
    steps,
    negative_prompt,
    num_frames,
    guidance_scale,
    guidance_scale_2,
    current_seed,
    scheduler_name,
    flow_shift,
    frame_multiplier,
    quality,
    duration_seconds,
    progress,
):
    """Estimate wall-clock seconds for a job (used as the spaces.GPU budget).

    Scales a baseline per-step cost by pixel volume, inflates for CFG
    (guidance > 1), and adds a small per-frame cost for interpolation output.
    """
    BASE_FRAMES_HEIGHT_WIDTH = 81 * 832 * 624
    BASE_STEP_DURATION = 15
    width, height = resized_image.size
    factor = num_frames * width * height / BASE_FRAMES_HEIGHT_WIDTH
    gen_time = int(steps) * (BASE_STEP_DURATION * factor ** 1.5)
    if guidance_scale > 1:
        gen_time *= 1.8
    interp_mult = frame_multiplier // FIXED_FPS
    if interp_mult > 1:
        extra_frames = (num_frames * interp_mult) - num_frames
        gen_time += extra_frames * 0.02
    return 10 + gen_time
|
|
def maybe_gpu_decorator(fn):
    """Wrap *fn* with spaces.GPU scheduling on ZeroGPU; identity elsewhere."""
    if not (IS_ZERO_GPU and spaces is not None):
        return fn
    return spaces.GPU(duration=get_inference_duration)(fn)
|
|
| def _make_generator_for(device: torch.device, seed: int): |
| try: |
| g = torch.Generator(device=device) |
| except Exception: |
| g = torch.Generator(device=str(device)) |
| return g.manual_seed(int(seed)) |
|
|
def _strong_cleanup():
    """Synchronize CUDA (if present), then release caches and collect garbage."""
    try:
        # Short-circuit keeps this a no-op on CPU-only hosts.
        torch.cuda.is_available() and torch.cuda.synchronize()
    except Exception:
        pass
    clear_vram()
    gc.collect()
|
|
@maybe_gpu_decorator
def run_inference(
    resized_image,
    processed_last_image,
    prompt,
    steps,
    negative_prompt,
    num_frames,
    guidance_scale,
    guidance_scale_2,
    current_seed,
    scheduler_name,
    flow_shift,
    frame_multiplier,
    quality,
    duration_seconds,
    progress=gr.Progress(track_tqdm=True),
):
    """Run the Wan I2V pipeline and export the result to an mp4.

    Returns (video_path, task_name). Handles per-request scheduler swapping,
    generator device mismatches, NotImplementedError from unsupported VAE
    backends, and CUDA OOM via progressive downshift retries. The scheduler
    is restored to the shared `original_scheduler` in the `finally` block.
    """
    scheduler_class = SCHEDULER_MAP.get(scheduler_name)
    if scheduler_class is None:
        raise gr.Error(f"Unknown scheduler: {scheduler_name}")

    # Rebuild the scheduler when the class or flow shift changed.
    # NOTE(review): the fallback default here is the *string* "shift", so when
    # the config lacks a "flow_shift" key the float comparison is always True
    # and the scheduler is rebuilt every call. Harmless but likely unintended
    # — confirm the intended default (probably config.get("shift")).
    if scheduler_class.__name__ != pipe.scheduler.config._class_name or flow_shift != pipe.scheduler.config.get("flow_shift", "shift"):
        config = copy.deepcopy(original_scheduler.config)
        # FlowMatchEuler names this knob "shift"; the others use "flow_shift".
        if scheduler_class == FlowMatchEulerDiscreteScheduler:
            config["shift"] = flow_shift
        else:
            config["flow_shift"] = flow_shift
        pipe.scheduler = scheduler_class.from_config(config)

    clear_vram()

    # Short per-request id for log correlation.
    task_name = str(uuid.uuid4())[:8]
    print(f"Task: {task_name}, {duration_seconds}, {resized_image.size}, FM={frame_multiplier}")
    if torch.cuda.is_available():
        print(f"[{task_name}] {cuda_mem_str()}")

    # Pick the device the RNG generator should live on.
    if torch.cuda.is_available() and (not IS_ZERO_GPU):
        exec_device = torch.device("cuda")
    else:
        try:
            exec_device = pipe._execution_device
        except Exception:
            exec_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _progress_cb(step_idx: int, total_steps: int):
        # Forward diffusion progress to the Gradio bar; never let UI errors propagate.
        try:
            progress((step_idx + 1) / max(1, total_steps), desc=f"Diffusion {step_idx+1}/{total_steps}")
        except Exception:
            pass

    def _call_pipe(generator, _steps, _frames, _img: Image.Image):
        # One pipeline invocation with the given settings and (possibly downshifted) image.
        kwargs = dict(
            image=_img,
            last_image=processed_last_image,
            prompt=prompt,
            negative_prompt=negative_prompt,
            height=_img.height,
            width=_img.width,
            num_frames=int(_frames),
            guidance_scale=float(guidance_scale),
            guidance_scale_2=float(guidance_scale_2),
            num_inference_steps=int(_steps),
            generator=generator,
            output_type="np",
        )

        # Prefer the modern diffusers callback API...
        try:
            def _on_step_end(pipe_self, i, t, callback_kwargs):
                _progress_cb(i, int(_steps))
                return callback_kwargs
            kwargs["callback_on_step_end"] = _on_step_end
        except Exception:
            pass

        # ...fall back to the legacy callback/callback_steps API if needed.
        if "callback_on_step_end" not in kwargs:
            try:
                kwargs["callback_steps"] = 1
                def _cb(i, t, latents):
                    _progress_cb(i, int(_steps))
                kwargs["callback"] = _cb
            except Exception:
                pass

        print(f"[{task_name}] calling pipe() now... exec_device={exec_device} steps={_steps} frames={_frames} size=({_img.width},{_img.height})")
        return pipe(**kwargs)

    def _downshift(attempt: int, base_steps: int, base_frames: int, base_img: Image.Image):
        # Progressively cheaper settings for OOM retries: drop 24 frames per
        # attempt (re-aligned to (frames-1)%4==0), shrink the image from
        # attempt 2 onward, and shave steps.
        steps_i = int(base_steps)
        frames_i = int(base_frames)

        drop = 24 * attempt
        new_frames = max(MIN_FRAMES_MODEL, frames_i - drop)
        if (new_frames - 1) % 4 != 0:
            new_frames = new_frames - ((new_frames - 1) % 4)
        new_frames = max(MIN_FRAMES_MODEL, new_frames)

        new_img = base_img
        if attempt >= 2:
            scale = 0.88 ** (attempt - 1)
            new_w = max(MIN_DIM, int((base_img.width * scale) // MULTIPLE_OF) * MULTIPLE_OF)
            new_h = max(MIN_DIM, int((base_img.height * scale) // MULTIPLE_OF) * MULTIPLE_OF)
            new_w = min(MAX_DIM, new_w)
            new_h = min(MAX_DIM, new_h)
            new_img = base_img.resize((new_w, new_h), Image.LANCZOS)

        new_steps = max(2, steps_i - (3 * attempt))
        return new_steps, new_frames, new_img

    try:
        gen = _make_generator_for(exec_device, current_seed)
        t0 = time.time()
        result = _call_pipe(gen, int(steps), int(num_frames), resized_image)
        print(f"[{task_name}] pipe() finished in {time.time()-t0:.1f}s")

    except ValueError as e:
        # Some torch/diffusers combos refuse a CUDA generator for CPU tensors;
        # detect that specific message and retry once with a CPU generator.
        msg = str(e)
        if "Cannot generate a cpu tensor from a generator of type cuda" in msg:
            print(f"[{task_name}] Generator mismatch detected. Retrying with CPU generator.")
            gen_cpu = _make_generator_for(torch.device("cpu"), current_seed)
            t0 = time.time()
            result = _call_pipe(gen_cpu, int(steps), int(num_frames), resized_image)
            print(f"[{task_name}] pipe() finished in {time.time()-t0:.1f}s")
        else:
            print(f"[{task_name}] PIPE ERROR: {repr(e)}")
            raise gr.Error(f"Pipe failed: {type(e).__name__}: {e}")

    except NotImplementedError as e:
        # Typically conv3d on an unsupported device/dtype combination.
        print(f"[{task_name}] NotImplementedError: {e}")
        raise gr.Error(
            "VAE/conv3d backend not supported on this device/dtype. "
            "Ensure VAE is on CUDA fp16 (L40S) or run with GPU."
        )

    except torch.cuda.OutOfMemoryError:
        print(f"[{task_name}] CUDA OOM at base settings. Retrying on CUDA with downshift...")
        _strong_cleanup()

        last_err = None
        for attempt in [1, 2, 3]:
            try:
                ds_steps, ds_frames, ds_img = _downshift(attempt, int(steps), int(num_frames), resized_image)
                gen2 = _make_generator_for(exec_device, current_seed)
                t0 = time.time()
                result = _call_pipe(gen2, ds_steps, ds_frames, ds_img)
                print(f"[{task_name}] retry#{attempt} OK in {time.time()-t0:.1f}s")
                # Keep the settings that actually succeeded for post-processing.
                resized_image = ds_img
                num_frames = ds_frames
                steps = ds_steps
                break
            except torch.cuda.OutOfMemoryError as e2:
                last_err = e2
                print(f"[{task_name}] retry#{attempt} still OOM -> cleaning and trying smaller...")
                _strong_cleanup()
        else:
            # for/else: all three retries OOMed.
            raise gr.Error(
                "CUDA OOM even after aggressive retries. "
                "Try: shorter duration, smaller resolution, lower steps, or FPS=16."
            ) from last_err

    finally:
        # Always restore the shared baseline scheduler instance.
        pipe.scheduler = original_scheduler

    raw_frames_np = result.frames[0]

    # Optional RIFE interpolation when the requested fps exceeds the native 16.
    frame_factor = int(frame_multiplier) // FIXED_FPS
    if frame_factor > 1:
        final_frames = interpolate_bits(raw_frames_np, multiplier=int(frame_factor))
    else:
        final_frames = list(raw_frames_np)

    final_fps = FIXED_FPS * int(frame_factor)

    # Allocate a persistent output path (file survives the context manager).
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        video_path = tmpfile.name

    with tqdm(total=3, desc="Rendering Media", unit="clip") as pbar:
        pbar.update(2)
        export_to_video(final_frames, video_path, fps=final_fps, quality=int(quality))
        pbar.update(1)

    # Drop large intermediates before cleanup to lower peak memory.
    try:
        del raw_frames_np
    except Exception:
        pass
    try:
        del final_frames
    except Exception:
        pass
    try:
        del result
    except Exception:
        pass

    _strong_cleanup()
    return video_path, task_name
|
|
def generate_video(
    input_image,
    last_image,
    prompt,
    steps=6,
    negative_prompt=default_negative_prompt,
    duration_seconds=3.5,
    guidance_scale=1,
    guidance_scale_2=1,
    seed=42,
    randomize_seed=False,
    quality=6,
    scheduler="UniPCMultistep",
    flow_shift=3.0,
    frame_multiplier=16,
    video_component=True,
    progress=gr.Progress(track_tqdm=True),
):
    """Gradio entry point: validate inputs, preprocess images, run inference.

    Returns (video for the player or None, file path, seed actually used).
    """
    if input_image is None:
        raise gr.Error("Please upload an input image.")

    # Resolve the seed and the model frame count for the requested duration.
    if randomize_seed:
        chosen_seed = random.randint(0, MAX_SEED)
    else:
        chosen_seed = int(seed)
    frame_count = get_num_frames(duration_seconds)

    # Fit the start frame to the model's resolution envelope; match the
    # optional end frame to it.
    start_frame = resize_image(input_image)
    end_frame = resize_and_crop_to_match(last_image, start_frame) if last_image else None

    rendered_path, job_id = run_inference(
        start_frame,
        end_frame,
        prompt,
        steps,
        negative_prompt,
        frame_count,
        guidance_scale,
        guidance_scale_2,
        chosen_seed,
        scheduler,
        flow_shift,
        frame_multiplier,
        quality,
        duration_seconds,
        progress,
    )
    print(f"Done: {job_id}")
    player_value = rendered_path if video_component else None
    return player_value, rendered_path, chosen_seed
|
|
| CSS = """ |
| #hidden-timestamp { |
| opacity: 0; |
| height: 0px; |
| width: 0px; |
| margin: 0px; |
| padding: 0px; |
| overflow: hidden; |
| position: absolute; |
| pointer-events: none; |
| } |
| """ |
|
|
| with gr.Blocks(theme=gr.themes.Soft(), css=CSS, delete_cache=(3600, 10800)) as demo: |
| gr.Markdown("## WAMU V2 - Wan 2.2 I2V (14B)") |
| gr.Markdown("Stable build (AOT OFF by default). Enable with ENABLE_AOT=1.") |
|
|
| with gr.Row(): |
| with gr.Column(): |
| input_image_component = gr.Image(type="pil", label="Input Image", sources=["upload", "clipboard"]) |
| prompt_input = gr.Textbox(label="Prompt", value=default_prompt_i2v) |
| duration_seconds_input = gr.Slider( |
| minimum=MIN_DURATION, |
| maximum=MAX_DURATION, |
| step=0.1, |
| value=3.5, |
| label="Duration (seconds)", |
| info=f"Frames are clamped to {MIN_FRAMES_MODEL}-{MAX_FRAMES_MODEL} at {FIXED_FPS}fps and aligned to (frames-1)%4==0.", |
| ) |
| frame_multi = gr.Dropdown( |
| choices=[FIXED_FPS, FIXED_FPS * 2, FIXED_FPS * 4], |
| value=FIXED_FPS, |
| label="Video Fluidity (Frames per Second)", |
| info="Extra frames generated via interpolation.", |
| ) |
|
|
| with gr.Accordion("Advanced Settings", open=False): |
| last_image_component = gr.Image(type="pil", label="Last Image (Optional)", sources=["upload", "clipboard"]) |
| negative_prompt_input = gr.Textbox( |
| label="Negative Prompt", |
| value=default_negative_prompt, |
| info="Used if any Guidance Scale > 1.", |
| lines=3, |
| ) |
| quality_slider = gr.Slider(minimum=1, maximum=10, step=1, value=6, label="Video Quality") |
| seed_input = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=42, interactive=True) |
| randomize_seed_checkbox = gr.Checkbox(label="Randomize seed", value=True, interactive=True) |
| steps_slider = gr.Slider(minimum=1, maximum=30, step=1, value=6, label="Inference Steps") |
| guidance_scale_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale") |
| guidance_scale_2_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale 2") |
| scheduler_dropdown = gr.Dropdown(label="Scheduler", choices=list(SCHEDULER_MAP.keys()), value="UniPCMultistep") |
| flow_shift_slider = gr.Slider(minimum=0.5, maximum=15.0, step=0.1, value=3.0, label="Flow Shift") |
| play_result_video = gr.Checkbox(label="Display result", value=True, interactive=True) |
|
|
| generate_button = gr.Button("Generate Video", variant="primary") |
|
|
| with gr.Column(): |
| video_output = gr.Video( |
| label="Generated Video", |
| autoplay=True, |
| sources=["upload"], |
| show_download_button=True, |
| show_share_button=True, |
| interactive=False, |
| elem_id="generated-video", |
| ) |
|
|
| with gr.Row(): |
| grab_frame_btn = gr.Button("📸 Use Current Frame as Input", variant="secondary") |
| timestamp_box = gr.Number(value=0, label="Timestamp", visible=True, elem_id="hidden-timestamp") |
|
|
| file_output = gr.File(label="Download Video") |
|
|
| ui_inputs = [ |
| input_image_component, |
| last_image_component, |
| prompt_input, |
| steps_slider, |
| negative_prompt_input, |
| duration_seconds_input, |
| guidance_scale_input, |
| guidance_scale_2_input, |
| seed_input, |
| randomize_seed_checkbox, |
| quality_slider, |
| scheduler_dropdown, |
| flow_shift_slider, |
| frame_multi, |
| play_result_video, |
| ] |
|
|
| def _disable_btn(): |
| return gr.update(interactive=False) |
|
|
| def _enable_btn(): |
| return gr.update(interactive=True) |
|
|
| |
| generate_button.click(_disable_btn, inputs=None, outputs=generate_button, queue=False) |
| evt = generate_button.click(fn=generate_video, inputs=ui_inputs, outputs=[video_output, file_output, seed_input], queue=True) |
| evt.then(_enable_btn, inputs=None, outputs=generate_button, queue=False) |
|
|
| grab_frame_btn.click(fn=None, inputs=None, outputs=[timestamp_box], js=get_timestamp_js) |
| timestamp_box.change(fn=extract_frame, inputs=[video_output, timestamp_box], outputs=[input_image_component]) |
|
|
| if __name__ == "__main__": |
| |
| demo.queue().launch( |
| mcp_server=True, |
| ssr_mode=False, |
| show_error=True, |
| ) |
|
|