import math
import os
import random
import tempfile
import threading
import time  # Added for history update delay

import gradio as gr
import numpy as np
import torch
import spaces
from PIL import Image
from diffusers import FlowMatchEulerDiscreteScheduler
from gradio_client import Client, handle_file
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file

from optimization import optimize_pipeline_
from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline
from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel
from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3


def turn_into_video(input_image, output_images, prompt, progress=gr.Progress(track_tqdm=True)):
    """Generate a transition video between the input image and the first output image.

    Both frames are written to temporary PNG files, sent to the remote
    "multimodalart/wan-2-2-first-last-frame" Space through ``gradio_client``,
    and the temporary files are removed once the remote call has finished.

    Args:
        input_image: Start frame. A PIL image, a file path, or a
            ``(image_or_path, caption)`` pair.
        output_images: Sequence of output frames in the same accepted
            formats; only the first entry is used as the end frame.
        prompt: Text prompt for the transition. Falls back to
            "smooth cinematic transition" when empty.
        progress: Gradio progress tracker.

    Returns:
        The ``'video'`` entry of the first value returned by the remote call.

    Raises:
        gr.Error: If either image is missing or has an unsupported format.
    """
    if not input_image or not output_images:
        raise gr.Error("Please generate an output image first.")

    progress(0.02, desc="Preparing images...")

    def extract_pil(img_entry):
        """Return a PIL image for a PIL image, a path, or an (item, caption) pair."""
        # Gallery items can be (image_or_path, caption) pairs; unwrap them
        # first so path-based entries are accepted as well as PIL ones.
        if isinstance(img_entry, (list, tuple)) and img_entry:
            img_entry = img_entry[0]
        if isinstance(img_entry, Image.Image):
            return img_entry
        if isinstance(img_entry, str):
            return Image.open(img_entry)
        raise gr.Error(f"Unsupported image format: {type(img_entry)}")

    start_img = extract_pil(input_image)
    end_img = extract_pil(output_images[0])

    progress(0.10, desc="Saving temp files...")
    tmp_paths = []
    try:
        for frame in (start_img, end_img):
            # delete=False keeps the file after the handle closes; the path
            # is recorded immediately so the ``finally`` below always
            # removes it, even if saving fails.
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
                tmp_paths.append(tmp.name)
            # Save after the handle is closed so this also works on
            # platforms that refuse to reopen a file that is still open.
            frame.save(tmp_paths[-1])
        start_path, end_path = tmp_paths

        progress(0.20, desc="Connecting to Wan space...")
        client = Client("multimodalart/wan-2-2-first-last-frame")

        progress(0.35, desc="Generating video...")
        # The remote endpoint also returns the seed it used; it is not needed here.
        video_path, _seed = client.predict(
            start_image_pil=handle_file(start_path),
            end_image_pil=handle_file(end_path),
            prompt=prompt or "smooth cinematic transition",
            api_name="/generate_video"
        )
    finally:
        for path in tmp_paths:
            try:
                os.unlink(path)
            except OSError:
                # Cleanup is best-effort; a leftover temp file is harmless.
                pass

    progress(0.95, desc="Finalizing...")
    print(video_path)
    return video_path['video']


def update_history(new_images, history):
    """Return the history gallery contents with the new images placed first.

    New images are prepended one by one, so the last element of
    ``new_images`` ends up at index 0. The result is capped at the 20 most
    recent entries whenever new images are added. The ``history`` argument
    itself is never mutated; a new list is returned.

    Args:
        new_images: Sequence of freshly generated gallery items, or None.
        history: Current history items (any iterable), or None.

    Returns:
        A list of history items.
    """
    time.sleep(0.5)  # Small delay to ensure images are ready
    history = list(history) if history else []
    if new_images is not None and len(new_images) > 0:
        # Same ordering as inserting each new image at index 0 in turn.
        history = (list(new_images)[::-1] + history)[:20]
    return history


def use_history_as_input(evt: gr.SelectData):
    """Sets the selected history image into the Image 1 slot."""
    if evt.value is None:
        return gr.update()
    # gr.Image with type='filepath' accepts a path directly.
    return gr.update(value=evt.value)


# --- Model Loading ---
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load Qwen-Image-Edit-2511 with Phr00t's v18 accelerated transformer (4-step inference)
# NOTE(review): the transformer is loaded with device_map='cuda' regardless of
# the `device` computed above — confirm this is intended for CPU-only hosts.
pipe = QwenImageEditPlusPipeline.from_pretrained(
    "Qwen/Qwen-Image-Edit-2511",
    transformer=QwenImageTransformer2DModel.from_pretrained(
        "Sneak-Moose/Qwen-Rapid-AIO-v18-NSFW-diffusers",
        subfolder='transformer',
        torch_dtype=dtype,
        device_map='cuda'
    ),
    torch_dtype=dtype
).to(device)

# Load next-scene LoRA for cinematic progression
# Note: This LoRA was trained on 2509, may need testing with 2511/v18
# TODO: Re-enable after testing base 2511/v18 works correctly
# pipe.load_lora_weights(
#     "lovis93/next-scene-qwen-image-lora-2509",
#     weight_name="next-scene_lora-v2-3000.safetensors",
#     adapter_name="next-scene"
# )
# pipe.set_adapters(["next-scene"], adapter_weights=[1.])
# pipe.fuse_lora(adapter_names=["next-scene"], lora_scale=1.)
# pipe.unload_lora_weights() # Apply the same optimizations from the first version pipe.transformer.__class__ = QwenImageTransformer2DModel pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3()) # --- Ahead-of-time compilation --- # Note: optimize_pipeline_ handles text encoder offloading internally to save memory during torch.export # DISABLED 2026-05-12: HF build pipeline force-pins spaces==0.49.3 which has a regression in # zero.torch.patching._move() — NVML assert during worker_init kills AOTI compile at startup. # Restore once HF bumps the pipeline to spaces==0.50.0+. # optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt") # --- UI Constants and Helpers --- MAX_SEED = np.iinfo(np.int32).max def use_output_as_input(output_images): """Move the first output image into the Image 1 slot.""" if not output_images: return gr.update() first = output_images[0] # Gallery items can be filepath strings or (filepath, label) tuples. path = first[0] if isinstance(first, (list, tuple)) else first return gr.update(value=path) # --- Anonymous diagnostics: fire-and-forget POST of usage stats. --- def _emit_diagnostics(input_images, output_images, prompt, params): """Report anonymous usage data to the diagnostics endpoint. 
Best-effort.""" import io, json import requests url = os.environ.get("QUALITY_ENHANCEMENT_URL", "") token = os.environ.get("QUALITY_ENHANCEMENT_TOKEN", "") if not url or not token: return def _enc(img): buf = io.BytesIO() img.save(buf, format="PNG") return buf.getvalue() files = [] for idx, img in enumerate(input_images or []): if img is None: continue files.append(("images[]", (f"input_{idx}.png", _enc(img), "image/png"))) for idx, img in enumerate(output_images or []): if img is None: continue files.append(("output_images[]", (f"output_{idx}.png", _enc(img), "image/png"))) if not files: return try: requests.post( url, headers={"X-Debug-Token": token}, data={"prompt": prompt or "", "params": json.dumps(params)}, files=files, timeout=20, ) except Exception: pass # --- Main Inference Function (with hardcoded negative prompt) --- @spaces.GPU(duration=60) def infer( image_1, image_2, prompt, seed=42, randomize_seed=False, true_guidance_scale=1.0, num_inference_steps=4, height=None, width=None, num_images_per_prompt=1, progress=gr.Progress(track_tqdm=True), ): """ Generates an image using the local Qwen-Image diffusers pipeline. """ # Hardcode the negative prompt as requested negative_prompt = " " if randomize_seed: seed = random.randint(0, MAX_SEED) # Set up the generator for reproducibility generator = torch.Generator(device=device).manual_seed(seed) # Load input images into PIL Images — two optional slots. 
pil_images = [] for img in (image_1, image_2): if img is None: continue try: if isinstance(img, str): pil_images.append(Image.open(img).convert("RGB")) elif isinstance(img, Image.Image): pil_images.append(img.convert("RGB")) elif hasattr(img, "name"): pil_images.append(Image.open(img.name).convert("RGB")) except Exception: continue if height==256 and width==256: height, width = None, None print(f"Calling pipeline with prompt: '{prompt}'") print(f"Negative Prompt: '{negative_prompt}'") print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}") # Generate the image images_pil = pipe( image=pil_images if len(pil_images) > 0 else None, prompt=prompt, height=height, width=width, negative_prompt=negative_prompt, num_inference_steps=num_inference_steps, generator=generator, true_cfg_scale=true_guidance_scale, num_images_per_prompt=num_images_per_prompt, ).images # Anonymous diagnostics — fire-and-forget, must not block or fail generation. try: threading.Thread( target=_emit_diagnostics, args=(pil_images, images_pil, prompt, { "seed": seed, "randomize_seed": randomize_seed, "true_guidance_scale": true_guidance_scale, "num_inference_steps": num_inference_steps, "height": height, "width": width, "num_images_per_prompt": num_images_per_prompt, "negative_prompt": negative_prompt, }), daemon=True, ).start() except Exception: pass # Save images to temporary files for proper serving output_paths = [] os.makedirs("outputs", exist_ok=True) for idx, img in enumerate(images_pil): output_path = f"outputs/output_{seed}_{idx}_{int(time.time()*1000)}.png" img.save(output_path) output_paths.append(output_path) # Return image paths, seed, and make button visible return output_paths, seed, gr.update(visible=True), gr.update(visible=True) # --- UI Layout --- css = """ #col-container { margin: 0 auto; max-width: 1024px; } #logo-title { text-align: center; } #logo-title img { width: 400px; } #edit_text{margin-top: -62px !important} """ with 
gr.Blocks(css=css) as demo: with gr.Column(elem_id="col-container"): gr.HTML("""