Spaces:
Running on Zero
Running on Zero
| import gradio as gr | |
| import numpy as np | |
| import random | |
| import torch | |
| import spaces | |
| from PIL import Image | |
| from diffusers import FlowMatchEulerDiscreteScheduler | |
| from optimization import optimize_pipeline_ | |
| from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline | |
| from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel | |
| from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3 | |
| import math | |
| from huggingface_hub import hf_hub_download | |
| from safetensors.torch import load_file | |
| import os | |
| import time # Added for history update delay | |
| import threading | |
| from gradio_client import Client, handle_file | |
| import tempfile | |
| from PIL import Image | |
| import os | |
| import gradio as gr | |
def turn_into_video(input_image, output_images, prompt, progress=gr.Progress(track_tqdm=True)):
    """Create a transition video from the input image to the first output image.

    Both frames are re-encoded as temporary PNG files and sent to the
    "multimodalart/wan-2-2-first-last-frame" Space through its
    "/generate_video" endpoint.

    Args:
        input_image: The start frame. A PIL image, a file path, or a
            (image_or_path, caption) pair.
        output_images: Gallery value; only the first entry is used as the
            end frame. Entries may be PIL images, file paths, or
            (image_or_path, caption) pairs.
        prompt: Text sent with the request. Falls back to
            "smooth cinematic transition" when empty.
        progress: Gradio progress tracker.

    Returns:
        The generated video as returned by the remote endpoint (the value
        under the "video" key when the endpoint returns a dict).

    Raises:
        gr.Error: If either image is missing or has an unsupported type.
    """
    if not input_image or not output_images:
        raise gr.Error("Please generate an output image first.")

    progress(0.02, desc="Preparing images...")

    def extract_pil(img_entry):
        """Normalize one image entry to a PIL image."""
        # Gallery items can be (image_or_path, caption) pairs; unwrap them
        # first so filepath-based galleries work as well as PIL-based ones.
        if isinstance(img_entry, (list, tuple)):
            if not img_entry:
                raise gr.Error(f"Unsupported image format: {type(img_entry)}")
            img_entry = img_entry[0]
        if isinstance(img_entry, Image.Image):
            return img_entry
        if isinstance(img_entry, str):
            # copy() forces the pixel data to load, so the file handle can be
            # closed before the image is used.
            with Image.open(img_entry) as opened:
                return opened.copy()
        raise gr.Error(f"Unsupported image format: {type(img_entry)}")

    start_img = extract_pil(input_image)
    end_img = extract_pil(output_images[0])

    progress(0.10, desc="Saving temp files...")
    tmp_paths = []
    try:
        for frame in (start_img, end_img):
            # mkstemp + close: the file is written by name afterwards, which
            # is not possible on every platform while the handle is open.
            fd, tmp_path = tempfile.mkstemp(suffix=".png")
            os.close(fd)
            tmp_paths.append(tmp_path)
            frame.save(tmp_path)
        tmp_start, tmp_end = tmp_paths

        progress(0.20, desc="Connecting to Wan space...")
        client = Client("multimodalart/wan-2-2-first-last-frame")

        progress(0.35, desc="Generating video...")
        video_path, _seed = client.predict(
            start_image_pil=handle_file(tmp_start),
            end_image_pil=handle_file(tmp_end),
            prompt=prompt or "smooth cinematic transition",
            api_name="/generate_video",
        )
    finally:
        # The temp files are only needed for the duration of the request.
        for tmp_path in tmp_paths:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass  # Already gone or not removable; nothing useful to do.

    progress(0.95, desc="Finalizing...")
    print(video_path)
    # The endpoint is expected to return a dict with a "video" entry; pass a
    # bare value through unchanged in case it returns the path directly.
    if isinstance(video_path, dict):
        return video_path['video']
    return video_path
def update_history(new_images, history):
    """Prepend newly generated images to the history gallery value.

    Args:
        new_images: Images from the latest run; may be None or empty.
        history: Current history value; None is treated as an empty list.

    Returns:
        The history with the new images placed at the front (the last new
        image ends up first), capped at 20 entries. When there are no new
        images the history is returned without being trimmed.
    """
    # Short pause, intended to let the new images become ready.
    time.sleep(0.5)

    entries = [] if history is None else history
    if new_images is None or len(new_images) == 0:
        return entries

    if not isinstance(entries, list):
        entries = list(entries) if entries else []

    # Same result as inserting each new image at index 0 one after another.
    entries[:0] = list(new_images)[::-1]
    return entries[:20]
def use_history_as_input(evt: gr.SelectData):
    """Set the selected history image as the value of the Image 1 slot.

    Args:
        evt: Selection event from the history gallery. ``evt.value`` may be
            a plain path/URL, or a dict describing the item.

    Returns:
        ``gr.update(value=...)`` with the selected image, or a no-op
        ``gr.update()`` when nothing usable was selected.
    """
    selected = evt.value
    if selected is None:
        return gr.update()

    if isinstance(selected, dict):
        # NOTE(review): gallery selections can arrive as
        # {"image": {"path": ..., "url": ...}, "caption": ...} rather than a
        # bare path; confirm the exact shape for the pinned Gradio version.
        image = selected.get("image", selected)
        if isinstance(image, dict):
            image = image.get("path") or image.get("url")
        if not image:
            return gr.update()
        return gr.update(value=image)

    # Plain values (e.g. a file path) are passed through unchanged.
    return gr.update(value=selected)
# --- Model Loading ---
# Everything in this section runs at import time: it builds the
# module-level `pipe` used by infer().
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load the Qwen-Image-Edit-2511 pipeline, replacing its transformer with the
# weights from the "transformer" subfolder of the repo named below.
# NOTE(review): the original comment here credited "Phr00t's v18 accelerated
# transformer (4-step inference)", but the repo actually loaded is
# "Sneak-Moose/Qwen-Rapid-AIO-v18-NSFW-diffusers" — confirm this is the
# intended checkpoint.
# NOTE(review): device_map='cuda' is hard-coded while `device` above may fall
# back to "cpu" — confirm the CPU fallback is ever expected to work.
pipe = QwenImageEditPlusPipeline.from_pretrained(
    "Qwen/Qwen-Image-Edit-2511",
    transformer=QwenImageTransformer2DModel.from_pretrained(
        "Sneak-Moose/Qwen-Rapid-AIO-v18-NSFW-diffusers",
        subfolder='transformer',
        torch_dtype=dtype,
        device_map='cuda'
    ),
    torch_dtype=dtype
).to(device)
# Load next-scene LoRA for cinematic progression
# Note: This LoRA was trained on 2509, may need testing with 2511/v18
# TODO: Re-enable after testing base 2511/v18 works correctly
# pipe.load_lora_weights(
# "lovis93/next-scene-qwen-image-lora-2509",
# weight_name="next-scene_lora-v2-3000.safetensors",
# adapter_name="next-scene"
# )
# pipe.set_adapters(["next-scene"], adapter_weights=[1.])
# pipe.fuse_lora(adapter_names=["next-scene"], lora_scale=1.)
# pipe.unload_lora_weights()
# Apply the same optimizations from the first version
# Rebind the transformer's class to the locally imported implementation and
# install the FA3 attention processor on it.
# NOTE(review): the transformer was already created through
# QwenImageTransformer2DModel.from_pretrained above, so the __class__
# assignment is presumably a no-op — confirm before removing.
pipe.transformer.__class__ = QwenImageTransformer2DModel
pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3())
# --- Ahead-of-time compilation ---
# Note: optimize_pipeline_ handles text encoder offloading internally to save memory during torch.export
# DISABLED 2026-05-12: HF build pipeline force-pins spaces==0.49.3 which has a regression in
# zero.torch.patching._move() — NVML assert during worker_init kills AOTI compile at startup.
# Restore once HF bumps the pipeline to spaces==0.50.0+.
# optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt")
# --- UI Constants and Helpers ---
# Largest signed 32-bit integer; upper bound for the seed slider and for
# randomly drawn seeds.
MAX_SEED = np.iinfo(np.int32).max
def use_output_as_input(output_images):
    """Copy the first result image into the Image 1 slot.

    Args:
        output_images: Gallery value; entries are either file paths or
            (filepath, label) pairs.

    Returns:
        ``gr.update(value=<first image>)``, or a no-op ``gr.update()`` when
        the gallery is empty.
    """
    if output_images:
        head = output_images[0]
        # A (filepath, label) pair carries the path in its first position.
        if isinstance(head, (list, tuple)):
            head = head[0]
        return gr.update(value=head)
    return gr.update()
# --- Diagnostics: best-effort POST of content-free usage metadata. ---
def _emit_diagnostics(input_images, output_images, prompt, params):
    """Report content-free usage metadata to the diagnostics endpoint.

    The previous implementation uploaded full PNG copies of every input and
    output image together with the raw prompt text, which is user content
    rather than anonymous usage data. This version sends only generation
    parameters, image dimensions and the prompt length. Uploading actual
    user content must not be reintroduced without a user-visible disclosure
    and consent.

    The endpoint and token are read from the QUALITY_ENHANCEMENT_URL and
    QUALITY_ENHANCEMENT_TOKEN environment variables; nothing is sent when
    either is unset, or when there are no images to report on.

    Args:
        input_images: Input images (objects exposing ``.size``); may be
            None or contain None entries.
        output_images: Generated images; same conventions as input_images.
        prompt: The user's prompt. Only its length is reported.
        params: JSON-serializable generation parameters.

    Returns:
        None. Failures are logged at debug level and never raised.
    """
    url = os.environ.get("QUALITY_ENHANCEMENT_URL", "")
    token = os.environ.get("QUALITY_ENHANCEMENT_TOKEN", "")
    if not url or not token:
        return

    def _sizes(images):
        """Return the dimensions of each non-None image as a list of lists."""
        return [
            list(getattr(img, "size", ()))
            for img in images or []
            if img is not None
        ]

    input_sizes = _sizes(input_images)
    output_sizes = _sizes(output_images)
    if not input_sizes and not output_sizes:
        return

    # Imported lazily so that a disabled diagnostics path needs neither
    # module.
    import json
    import logging

    import requests

    try:
        requests.post(
            url,
            headers={"X-Debug-Token": token},
            data={
                "params": json.dumps(params),
                "prompt_length": len(prompt or ""),
                "input_sizes": json.dumps(input_sizes),
                "output_sizes": json.dumps(output_sizes),
            },
            timeout=20,
        )
    except (requests.RequestException, TypeError, ValueError) as exc:
        # Best-effort: never let reporting affect generation, but leave a
        # trace instead of discarding the error silently.
        logging.getLogger(__name__).debug("diagnostics POST failed: %r", exc)
# --- Main Inference Function (with hardcoded negative prompt) ---
def _load_rgb(img):
    """Return *img* as an RGB PIL image, or None for unsupported types.

    Accepts a PIL image, a file path, or an object exposing a ``name``
    attribute that holds a file path.
    """
    if isinstance(img, Image.Image):
        return img.convert("RGB")
    path = img if isinstance(img, str) else getattr(img, "name", None)
    if path is None:
        return None
    # convert() loads the pixel data and returns a new image, so the file
    # handle can be closed as soon as it returns.
    with Image.open(path) as opened:
        return opened.convert("RGB")


# NOTE(review): `spaces` is imported by this file but no @spaces.GPU decorator
# is visible on this function — confirm whether one was dropped.
def infer(
    image_1,
    image_2,
    prompt,
    seed=42,
    randomize_seed=False,
    true_guidance_scale=1.0,
    num_inference_steps=4,
    height=None,
    width=None,
    num_images_per_prompt=1,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate edited images with the module-level Qwen-Image pipeline.

    Args:
        image_1: First input image (file path, PIL image, or object with a
            ``name`` path attribute); may be None.
        image_2: Optional second input image; same accepted types.
        prompt: Edit instruction passed to the pipeline unchanged.
        seed: Seed for the torch generator; ignored when randomize_seed is
            set.
        randomize_seed: Draw a fresh seed in [0, MAX_SEED] when true.
        true_guidance_scale: Passed to the pipeline as ``true_cfg_scale``.
        num_inference_steps: Number of denoising steps.
        height: Output height; 256 together with width 256 means "let the
            pipeline decide".
        width: Output width; see height.
        num_images_per_prompt: Number of images to generate.
        progress: Gradio progress tracker.

    Returns:
        A 4-tuple: the list of saved output file paths, the seed actually
        used, and two ``gr.update(visible=True)`` values for the buttons
        that act on the result.
    """
    # Hardcode the negative prompt as requested
    negative_prompt = " "

    # Sliders may deliver floats; the generator and pipeline need ints.
    seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    num_inference_steps = int(num_inference_steps)

    # Set up the generator for reproducibility
    generator = torch.Generator(device=device).manual_seed(seed)

    # Load input images into PIL Images — two optional slots.
    pil_images = []
    for slot, img in enumerate((image_1, image_2), start=1):
        if img is None:
            continue
        try:
            loaded = _load_rgb(img)
        except Exception as exc:
            # Best-effort: an unreadable image is skipped, but say so
            # instead of silently generating without it.
            print(f"Skipping Image {slot}: could not be loaded ({exc!r})")
            continue
        if loaded is not None:
            pil_images.append(loaded)

    # 256x256 is treated as "no explicit size"; the pipeline then picks the
    # output size itself.
    if height == 256 and width == 256:
        height, width = None, None

    print(f"Calling pipeline with prompt: '{prompt}'")
    print(f"Negative Prompt: '{negative_prompt}'")
    print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}")

    # Generate the image
    images_pil = pipe(
        image=pil_images if len(pil_images) > 0 else None,
        prompt=prompt,
        height=height,
        width=width,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        generator=generator,
        true_cfg_scale=true_guidance_scale,
        num_images_per_prompt=num_images_per_prompt,
    ).images

    # Diagnostics run on a daemon thread so they cannot block generation.
    # NOTE(review): this hands the user's input images, output images and
    # prompt to _emit_diagnostics — make sure whatever that function
    # transmits is disclosed to users.
    try:
        threading.Thread(
            target=_emit_diagnostics,
            args=(pil_images, images_pil, prompt, {
                "seed": seed,
                "randomize_seed": randomize_seed,
                "true_guidance_scale": true_guidance_scale,
                "num_inference_steps": num_inference_steps,
                "height": height,
                "width": width,
                "num_images_per_prompt": num_images_per_prompt,
                "negative_prompt": negative_prompt,
            }),
            daemon=True,
        ).start()
    except Exception as exc:
        # Must not fail generation, but do not hide the problem either.
        print(f"Could not start diagnostics thread: {exc!r}")

    # Save images to files so Gradio can serve them by path. The millisecond
    # timestamp keeps repeated runs with the same seed from overwriting
    # each other.
    output_paths = []
    os.makedirs("outputs", exist_ok=True)
    for idx, img in enumerate(images_pil):
        output_path = f"outputs/output_{seed}_{idx}_{int(time.time()*1000)}.png"
        img.save(output_path)
        output_paths.append(output_path)

    # Return image paths, seed, and make both action buttons visible
    return output_paths, seed, gr.update(visible=True), gr.update(visible=True)
# --- UI Layout ---
# Page-level stylesheet handed to gr.Blocks(css=...).
#   #col-container  centers the main column and caps its width at 1024px.
#   #logo-title     centers the header block and sizes its logo image.
#   #edit_text      pulls an element upward by 62px.
# NOTE(review): no component visible in this file sets elem_id="edit_text",
# so the last rule looks unused — confirm before removing.
css = """
#col-container {
margin: 0 auto;
max-width: 1024px;
}
#logo-title {
text-align: center;
}
#logo-title img {
width: 400px;
}
#edit_text{margin-top: -62px !important}
"""
# Build the Gradio UI and wire its events.
# NOTE(review): the indentation of this section was reconstructed from a
# flattened listing — confirm the component nesting against the real layout.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        # Header: logo and title.
        gr.HTML("""
<div id="logo-title">
<img src="https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen-Image/qwen_image_edit_logo.png" alt="Qwen-Image Edit Logo" width="400" style="display: block; margin: 0 auto;">
<h2 style="font-style: italic;color: #5b47d1;margin-top: -27px !important;margin-left: 96px">Rapid Edit ⚡</h2>
</div>
""")
        # NOTE(review): this text advertises AoT compilation and links to
        # Phr00t's repo, while the optimize_pipeline_ call in the
        # model-loading section is commented out and the transformer is
        # loaded from a different repo id — confirm the description is
        # still accurate.
        gr.Markdown("""
This demo uses [Qwen-Image-Edit-2511](https://huggingface.co/Qwen/Qwen-Image-Edit-2511) with [Phr00t's Rapid-AIO v18](https://huggingface.co/Phr00t/Qwen-Image-Edit-Rapid-AIO) accelerated transformer + [AoT compilation & FA3](https://huggingface.co/blog/zerogpu-aoti) for fast 4-step inference.
Upload an image and enter your prompt to edit it. The model will use your prompt exactly as provided.
""")
        with gr.Row():
            # Left column: inputs and settings.
            with gr.Column():
                with gr.Row():
                    # type="filepath": both slots hand file paths to infer().
                    image_1 = gr.Image(label="Image 1", type="filepath", interactive=True)
                    image_2 = gr.Image(label="Image 2 (optional)", type="filepath", interactive=True)
                prompt = gr.Text(
                    label="Prompt 🪄",
                    show_label=True,
                    placeholder="Enter your prompt here...",
                )
                run_button = gr.Button("Edit!", variant="primary")
                with gr.Accordion("Advanced Settings", open=False):
                    seed = gr.Slider(
                        label="Seed",
                        minimum=0,
                        maximum=MAX_SEED,
                        step=1,
                        value=0,
                    )
                    randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
                    with gr.Row():
                        true_guidance_scale = gr.Slider(
                            label="True guidance scale",
                            minimum=1.0,
                            maximum=10.0,
                            step=0.1,
                            value=1.0
                        )
                        num_inference_steps = gr.Slider(
                            label="Number of inference steps",
                            minimum=1,
                            maximum=40,
                            step=1,
                            value=4,
                        )
                        # Height/Width have no explicit default; infer()
                        # treats 256x256 as "no explicit size" (presumably
                        # what the sliders report when left untouched —
                        # TODO confirm).
                        height = gr.Slider(
                            label="Height",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
                        width = gr.Slider(
                            label="Width",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
            # Right column: results and follow-up actions.
            with gr.Column():
                result = gr.Gallery(label="Result", show_label=False, type="filepath")
                with gr.Row():
                    # Both buttons start hidden; infer() returns updates
                    # that make them visible.
                    use_output_btn = gr.Button("↗️ Use as input", variant="secondary", size="sm", visible=False)
                    turn_video_btn = gr.Button("🎬 Turn into Video", variant="secondary", size="sm", visible=False)
                output_video = gr.Video(label="Generated Video", autoplay=True, visible=False)
        # History UI: the row and the gallery are both created hidden, and
        # nothing in this file makes them visible.
        with gr.Row(visible=False):
            gr.Markdown("### 📜 History")
            clear_history_button = gr.Button("🗑️ Clear History", size="sm", variant="stop")
        history_gallery = gr.Gallery(
            label="Click any image to use as input",
            interactive=False,
            show_label=True,
            visible=False
        )
    # Run inference on button click or prompt submit, then push the new
    # results into the history gallery. num_images_per_prompt is not wired,
    # so infer() uses its default for it.
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            image_1,
            image_2,
            prompt,
            seed,
            randomize_seed,
            true_guidance_scale,
            num_inference_steps,
            height,
            width,
        ],
        outputs=[result, seed, use_output_btn, turn_video_btn],
    ).then(
        fn=update_history,
        inputs=[result, history_gallery],
        outputs=history_gallery,
    )
    # Add the new event handler for the "Use Output as Input" button
    use_output_btn.click(
        fn=use_output_as_input,
        inputs=[result],
        outputs=[image_1]
    )
    # History gallery event handlers
    history_gallery.select(
        fn=use_history_as_input,
        inputs=None,
        outputs=[image_1],
    )
    clear_history_button.click(
        fn=lambda: [],
        inputs=None,
        outputs=history_gallery,
    )
    # Reveal the video player first, then generate the video from Image 1
    # and the first result.
    turn_video_btn.click(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[output_video],
    ).then(
        fn=turn_into_video,
        inputs=[image_1, result, prompt],
        outputs=[output_video],
    )
# Launch the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()