moose
Remove temporary dimension-flow debug prints
ad7b834
Raw
History Blame Contribute Delete
14.9 kB
import gradio as gr
import numpy as np
import random
import torch
import spaces
from PIL import Image
from diffusers import FlowMatchEulerDiscreteScheduler
from optimization import optimize_pipeline_
from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline
from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel
from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3
import math
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
import os
import time # Added for history update delay
import threading
from gradio_client import Client, handle_file
import tempfile
from PIL import Image
import os
import gradio as gr
def turn_into_video(input_image, output_images, prompt, progress=gr.Progress(track_tqdm=True)):
    """Generate a transition video from the input image to the first output image.

    Both frames are written to temporary PNG files, sent to the
    ``multimodalart/wan-2-2-first-last-frame`` Space through ``gradio_client``,
    and the temporary files are removed again once the remote call returns
    or fails.

    Args:
        input_image: Start frame: a PIL image, a file path, or a gallery-style
            ``(image, caption)`` pair.
        output_images: Sequence of generated images in any of the same forms;
            only the first entry is used as the end frame.
        prompt: Text forwarded to the remote Space. When empty,
            ``"smooth cinematic transition"`` is sent instead.
        progress: Gradio progress tracker, updated at each stage.

    Returns:
        The ``'video'`` entry of the first value returned by the remote call.

    Raises:
        gr.Error: If either image is missing or has an unsupported type.
    """
    if not input_image or not output_images:
        raise gr.Error("Please generate an output image first.")

    progress(0.02, desc="Preparing images...")

    def extract_pil(img_entry):
        """Return a PIL image for a PIL image, a path, or an (image, caption) pair."""
        # Gallery values can be (image_or_path, caption) pairs; unwrap the
        # first element so both PIL images and file paths are accepted.
        if isinstance(img_entry, (list, tuple)) and img_entry:
            return extract_pil(img_entry[0])
        if isinstance(img_entry, Image.Image):
            return img_entry
        if isinstance(img_entry, str):
            return Image.open(img_entry)
        raise gr.Error(f"Unsupported image format: {type(img_entry)}")

    start_img = extract_pil(input_image)
    end_img = extract_pil(output_images[0])

    progress(0.10, desc="Saving temp files...")
    temp_paths = []
    try:
        for frame in (start_img, end_img):
            # Reserve a unique name, close the handle, then let PIL write to
            # the path so the file is never open twice at the same time.
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
                temp_paths.append(tmp.name)
            frame.save(temp_paths[-1])
        start_path, end_path = temp_paths

        progress(0.20, desc="Connecting to Wan space...")
        client = Client("multimodalart/wan-2-2-first-last-frame")

        progress(0.35, desc="Generating video...")
        video_path, _seed = client.predict(
            start_image_pil=handle_file(start_path),
            end_image_pil=handle_file(end_path),
            prompt=prompt or "smooth cinematic transition",
            api_name="/generate_video"
        )
    finally:
        # The files were created with delete=False, so remove them explicitly.
        for path in temp_paths:
            try:
                os.remove(path)
            except OSError:
                # Cleanup is best-effort; a leftover temp file must not mask
                # the real outcome of the request.
                pass

    progress(0.95, desc="Finalizing...")
    print(video_path)
    return video_path['video']
def update_history(new_images, history):
    """Return the history gallery contents with the new images added in front.

    Each new image is placed ahead of the existing entries, so the last item
    of ``new_images`` ends up first. The result is capped at 20 entries when
    images were added. The ``history`` argument itself is never modified.

    Args:
        new_images: Images produced by the latest run; may be ``None`` or empty.
        history: Current history contents; may be ``None``.

    Returns:
        A new list holding the updated history.
    """
    time.sleep(0.5)  # Small delay to ensure images are ready

    # Work on a copy so the caller's list is left untouched.
    previous = list(history) if history else []
    if new_images is None or len(new_images) == 0:
        return previous

    # Reversing reproduces the effect of inserting each image at index 0 in turn.
    newest_first = list(new_images)[::-1]
    return (newest_first + previous)[:20]  # Keep only last 20 images
def use_history_as_input(evt: gr.SelectData):
    """Sets the selected history image into the Image 1 slot.

    Args:
        evt: Selection event from the history gallery; ``evt.value`` carries
            the selected item.

    Returns:
        A ``gr.update`` with the selected image as value, or an empty
        ``gr.update()`` when nothing usable was selected.
    """
    selected = evt.value
    if selected is None:
        return gr.update()

    # NOTE(review): the payload shape depends on the installed Gradio version.
    # A plain path is passed through unchanged; a dict of the form
    # {"image": {"path": ...}, "caption": ...} or an (image, caption) pair is
    # unwrapped first. Confirm against the Gradio release pinned for this Space.
    if isinstance(selected, dict):
        image = selected.get("image", selected)
        if isinstance(image, dict):
            image = image.get("path") or image.get("url")
        selected = image
    elif isinstance(selected, (list, tuple)) and selected:
        selected = selected[0]

    if selected is None:
        return gr.update()
    # gr.Image with type='filepath' accepts a path directly.
    return gr.update(value=selected)
# --- Model Loading ---
# Everything in this section runs at import time: the weights are fetched and
# the pipeline is built once, then shared by every call to infer().
# bfloat16 is used for both the transformer and the rest of the pipeline.
dtype = torch.bfloat16
# Device for the assembled pipeline and for the seed generator in infer().
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load Qwen-Image-Edit-2511 with Phr00t's v18 accelerated transformer (4-step inference)
# NOTE(review): the transformer is loaded with device_map='cuda' unconditionally,
# while `device` can fall back to "cpu" — confirm a CPU-only start is not expected to work.
pipe = QwenImageEditPlusPipeline.from_pretrained(
"Qwen/Qwen-Image-Edit-2511",
transformer=QwenImageTransformer2DModel.from_pretrained(
"Sneak-Moose/Qwen-Rapid-AIO-v18-NSFW-diffusers",
subfolder='transformer',
torch_dtype=dtype,
device_map='cuda'
),
torch_dtype=dtype
).to(device)
# Load next-scene LoRA for cinematic progression
# Note: This LoRA was trained on 2509, may need testing with 2511/v18
# TODO: Re-enable after testing base 2511/v18 works correctly
# pipe.load_lora_weights(
# "lovis93/next-scene-qwen-image-lora-2509",
# weight_name="next-scene_lora-v2-3000.safetensors",
# adapter_name="next-scene"
# )
# pipe.set_adapters(["next-scene"], adapter_weights=[1.])
# pipe.fuse_lora(adapter_names=["next-scene"], lora_scale=1.)
# pipe.unload_lora_weights()
# Apply the same optimizations from the first version:
# rebind the transformer instance to the local QwenImageTransformer2DModel class,
# then install the FA3 attention processor on it.
pipe.transformer.__class__ = QwenImageTransformer2DModel
pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3())
# --- Ahead-of-time compilation ---
# Note: optimize_pipeline_ handles text encoder offloading internally to save memory during torch.export
# DISABLED 2026-05-12: HF build pipeline force-pins spaces==0.49.3 which has a regression in
# zero.torch.patching._move() — NVML assert during worker_init kills AOTI compile at startup.
# Restore once HF bumps the pipeline to spaces==0.50.0+.
# optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt")
# --- UI Constants and Helpers ---
# Largest int32 value; upper bound for both the seed slider and randomized seeds.
MAX_SEED = np.iinfo(np.int32).max
def use_output_as_input(output_images):
    """Promote the first generated image to the Image 1 input.

    Args:
        output_images: Current contents of the result gallery; entries are
            either filepath strings or (filepath, label) pairs.

    Returns:
        A ``gr.update`` carrying the first image's path, or an empty
        ``gr.update()`` when the gallery has no images.
    """
    if output_images:
        head = output_images[0]
        # A (filepath, label) pair contributes only its filepath.
        if isinstance(head, (list, tuple)):
            head = head[0]
        return gr.update(value=head)
    return gr.update()
# --- Diagnostics: fire-and-forget upload of images, prompt and parameters. ---
def _emit_diagnostics(input_images, output_images, prompt, params):
    """Upload one generation's images, prompt and parameters to a remote endpoint.

    The endpoint URL and token are read from the ``QUALITY_ENHANCEMENT_URL``
    and ``QUALITY_ENHANCEMENT_TOKEN`` environment variables; if either is
    unset nothing is sent. Every non-``None`` image is PNG-encoded and posted
    as a multipart file together with the prompt text and the JSON-encoded
    parameters. Best-effort: any failure is logged at debug level and
    otherwise ignored.

    NOTE(review): this transmits the users' actual input images, generated
    images and raw prompt text, not just aggregate usage statistics. That is
    user content, so confirm it is disclosed to users (or made opt-in) before
    configuring the endpoint.

    Args:
        input_images: PIL images given to the pipeline; may be ``None``/empty.
        output_images: PIL images produced by the pipeline; may be ``None``/empty.
        prompt: Prompt text; ``None`` is sent as an empty string.
        params: JSON-serialisable dict of generation settings.
    """
    import io
    import json
    import logging

    url = os.environ.get("QUALITY_ENHANCEMENT_URL", "")
    token = os.environ.get("QUALITY_ENHANCEMENT_TOKEN", "")
    if not url or not token:
        return

    def _enc(img):
        """Return the image encoded as PNG bytes."""
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        return buf.getvalue()

    try:
        # Imported lazily so a missing `requests` package only matters when
        # the upload is actually configured.
        import requests

        files = [
            ("images[]", (f"input_{idx}.png", _enc(img), "image/png"))
            for idx, img in enumerate(input_images or [])
            if img is not None
        ]
        files.extend(
            ("output_images[]", (f"output_{idx}.png", _enc(img), "image/png"))
            for idx, img in enumerate(output_images or [])
            if img is not None
        )
        if not files:
            return

        requests.post(
            url,
            headers={"X-Debug-Token": token},
            data={"prompt": prompt or "", "params": json.dumps(params)},
            files=files,
            timeout=20,
        )
    except Exception:
        # Never let a diagnostics failure surface to the user, but leave a
        # trace for operators instead of swallowing it silently.
        logging.getLogger(__name__).debug("Diagnostics upload failed", exc_info=True)
# --- Main Inference Function (with hardcoded negative prompt) ---
def _load_input_images(sources):
    """Convert the optional image inputs into RGB PIL images.

    ``None`` entries are skipped. File paths and objects exposing a ``name``
    attribute are opened from disk; PIL images are converted directly. An
    input that cannot be loaded is skipped with a printed warning rather than
    aborting the request.
    """
    loaded = []
    for src in sources:
        if src is None:
            continue
        try:
            if isinstance(src, Image.Image):
                loaded.append(src.convert("RGB"))
                continue
            path = src if isinstance(src, str) else getattr(src, "name", None)
            if path is None:
                print(f"Skipping unsupported image input of type {type(src).__name__}")
                continue
            # convert() returns an independent image, so the file handle can
            # be closed as soon as the conversion is done.
            with Image.open(path) as opened:
                loaded.append(opened.convert("RGB"))
        except Exception as exc:
            print(f"Skipping input image that could not be loaded: {exc}")
    return loaded


@spaces.GPU(duration=60)
def infer(
    image_1,
    image_2,
    prompt,
    seed=42,
    randomize_seed=False,
    true_guidance_scale=1.0,
    num_inference_steps=4,
    height=None,
    width=None,
    num_images_per_prompt=1,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Generates an image using the local Qwen-Image diffusers pipeline.

    Args:
        image_1: First optional input image (path, PIL image, or object with
            a ``name`` attribute).
        image_2: Second optional input image, same accepted forms.
        prompt: Edit instruction passed to the pipeline unchanged.
        seed: Seed for the torch generator; replaced by a random value when
            ``randomize_seed`` is true.
        randomize_seed: Draw a fresh seed in ``[0, MAX_SEED]``.
        true_guidance_scale: Forwarded to the pipeline as ``true_cfg_scale``.
        num_inference_steps: Number of steps forwarded to the pipeline.
        height: Output height. When height and width are both 256 they are
            treated as "not set" and ``None`` is passed for both (256 is the
            slider minimum; presumably what an untouched slider reports).
        width: Output width; see ``height``.
        num_images_per_prompt: Number of images to generate.
        progress: Gradio progress tracker (tracks tqdm output).

    Returns:
        A 4-tuple: the list of saved output image paths, the seed actually
        used, and two ``gr.update(visible=True)`` values for the components
        wired as the third and fourth outputs.
    """
    # Hardcode the negative prompt as requested
    negative_prompt = " "

    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    # Numeric inputs can arrive as floats; manual_seed and the step count need ints.
    seed = int(seed)
    num_inference_steps = int(num_inference_steps)

    # Set up the generator for reproducibility
    generator = torch.Generator(device=device).manual_seed(seed)

    # Load input images into PIL Images — two optional slots.
    pil_images = _load_input_images((image_1, image_2))

    if height == 256 and width == 256:
        height, width = None, None

    print(f"Calling pipeline with prompt: '{prompt}'")
    print(f"Negative Prompt: '{negative_prompt}'")
    print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}")

    # Generate the image
    images_pil = pipe(
        image=pil_images if len(pil_images) > 0 else None,
        prompt=prompt,
        height=height,
        width=width,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        generator=generator,
        true_cfg_scale=true_guidance_scale,
        num_images_per_prompt=num_images_per_prompt,
    ).images

    # Hand the input images, generated images, prompt and settings to
    # _emit_diagnostics on a daemon thread so that a slow or failing upload
    # can neither block nor fail generation.
    try:
        threading.Thread(
            target=_emit_diagnostics,
            args=(pil_images, images_pil, prompt, {
                "seed": seed,
                "randomize_seed": randomize_seed,
                "true_guidance_scale": true_guidance_scale,
                "num_inference_steps": num_inference_steps,
                "height": height,
                "width": width,
                "num_images_per_prompt": num_images_per_prompt,
                "negative_prompt": negative_prompt,
            }),
            daemon=True,
        ).start()
    except Exception as exc:
        print(f"Could not start diagnostics thread: {exc}")

    # Save images to temporary files for proper serving
    output_paths = []
    os.makedirs("outputs", exist_ok=True)
    for idx, img in enumerate(images_pil):
        # Seed, index and a millisecond timestamp keep file names unique.
        output_path = f"outputs/output_{seed}_{idx}_{int(time.time()*1000)}.png"
        img.save(output_path)
        output_paths.append(output_path)

    # Return image paths, seed, and make button visible
    return output_paths, seed, gr.update(visible=True), gr.update(visible=True)
# --- UI Layout ---
# Custom stylesheet handed to gr.Blocks(css=...):
#   #col-container  centres the main column and caps its width at 1024px
#   #logo-title     centres the logo/title block and sizes the logo image
#   #edit_text      pulls that element upwards; no component with this
#                   elem_id is visible in this file — TODO confirm it is still needed
css = """
#col-container {
margin: 0 auto;
max-width: 1024px;
}
#logo-title {
text-align: center;
}
#logo-title img {
width: 400px;
}
#edit_text{margin-top: -62px !important}
"""
# Gradio UI: inputs and settings on the left, results on the right, followed
# by a (currently hidden) history row and gallery, then the event wiring.
# NOTE(review): the container nesting below was reconstructed from component
# order — confirm it matches the intended layout.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.HTML("""
<div id="logo-title">
<img src="https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen-Image/qwen_image_edit_logo.png" alt="Qwen-Image Edit Logo" width="400" style="display: block; margin: 0 auto;">
<h2 style="font-style: italic;color: #5b47d1;margin-top: -27px !important;margin-left: 96px">Rapid Edit ⚡</h2>
</div>
""")
        gr.Markdown("""
This demo uses [Qwen-Image-Edit-2511](https://huggingface.co/Qwen/Qwen-Image-Edit-2511) with [Phr00t's Rapid-AIO v18](https://huggingface.co/Phr00t/Qwen-Image-Edit-Rapid-AIO) accelerated transformer + [AoT compilation & FA3](https://huggingface.co/blog/zerogpu-aoti) for fast 4-step inference.
Upload an image and enter your prompt to edit it. The model will use your prompt exactly as provided.
""")
        with gr.Row():
            # Left column: input images, prompt, run button, advanced settings.
            with gr.Column():
                with gr.Row():
                    image_1 = gr.Image(label="Image 1", type="filepath", interactive=True)
                    image_2 = gr.Image(label="Image 2 (optional)", type="filepath", interactive=True)
                prompt = gr.Text(
                    label="Prompt 🪄",
                    show_label=True,
                    placeholder="Enter your prompt here...",
                )
                run_button = gr.Button("Edit!", variant="primary")
                with gr.Accordion("Advanced Settings", open=False):
                    seed = gr.Slider(
                        label="Seed",
                        minimum=0,
                        maximum=MAX_SEED,
                        step=1,
                        value=0,
                    )
                    randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
                    with gr.Row():
                        true_guidance_scale = gr.Slider(
                            label="True guidance scale",
                            minimum=1.0,
                            maximum=10.0,
                            step=0.1,
                            value=1.0
                        )
                        num_inference_steps = gr.Slider(
                            label="Number of inference steps",
                            minimum=1,
                            maximum=40,
                            step=1,
                            value=4,
                        )
                        # Height and width are created without a value;
                        # infer() treats 256x256 as "not set".
                        height = gr.Slider(
                            label="Height",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
                        width = gr.Slider(
                            label="Width",
                            minimum=256,
                            maximum=2048,
                            step=8,
                            value=None,
                        )
            # Right column: results and follow-up actions (hidden until infer()
            # returns visibility updates for the two buttons).
            with gr.Column():
                result = gr.Gallery(label="Result", show_label=False, type="filepath")
                with gr.Row():
                    use_output_btn = gr.Button("↗️ Use as input", variant="secondary", size="sm", visible=False)
                    turn_video_btn = gr.Button("🎬 Turn into Video", variant="secondary", size="sm", visible=False)
                output_video = gr.Video(label="Generated Video", autoplay=True, visible=False)
        # History controls and gallery are created hidden.
        with gr.Row(visible=False):
            gr.Markdown("### 📜 History")
            clear_history_button = gr.Button("🗑️ Clear History", size="sm", variant="stop")
        history_gallery = gr.Gallery(
            label="Click any image to use as input",
            interactive=False,
            show_label=True,
            visible=False
        )

    # Run inference on button click or prompt submit. The history is updated
    # only after a successful run: with .then() a failed run would re-insert
    # the previous result images into the history.
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            image_1,
            image_2,
            prompt,
            seed,
            randomize_seed,
            true_guidance_scale,
            num_inference_steps,
            height,
            width,
        ],
        outputs=[result, seed, use_output_btn, turn_video_btn],
    ).success(
        fn=update_history,
        inputs=[result, history_gallery],
        outputs=history_gallery,
    )

    # Add the new event handler for the "Use Output as Input" button
    use_output_btn.click(
        fn=use_output_as_input,
        inputs=[result],
        outputs=[image_1]
    )

    # History gallery event handlers
    history_gallery.select(
        fn=use_history_as_input,
        inputs=None,
        outputs=[image_1],
    )
    clear_history_button.click(
        fn=lambda: [],
        inputs=None,
        outputs=history_gallery,
    )

    # Reveal the video player first, then run the (slow) remote generation.
    turn_video_btn.click(
        fn=lambda: gr.update(visible=True),
        inputs=None,
        outputs=[output_video],
    ).then(
        fn=turn_into_video,
        inputs=[image_1, result, prompt],
        outputs=[output_video],
    )

if __name__ == "__main__":
    demo.launch()