import shutil
import time
from pathlib import Path
import cv2
import gradio as gr
import PIL.Image
import torch
from diffusers import (
DiffusionPipeline, # type: ignore
QwenImageEditPlusPipeline, # type: ignore
)
from nunchaku import NunchakuQwenImageTransformer2DModel
from nunchaku.utils import get_gpu_memory, get_precision
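# Local helper that injects the Ko-fi widget rendered into the #kofi div below.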
from kofi import SCRIPT
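# Select the SVDQuant checkpoint (precision and rank) matching the detected GPU.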
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
HEADER = "# [Nunchaku Qwen-Image-Edit-2509](https://huggingface.co/nunchaku-tech/nunchaku-qwen-image-edit-2509)"
RANK = 128
PRECISION = get_precision() if DEVICE == "cuda" else "fp4"
TRANSFORMER_ID = f"nunchaku-tech/nunchaku-qwen-image-edit-2509/svdq-{PRECISION}_r{RANK}-qwen-image-edit-2509.safetensors"
PIPELINE_ID = "Qwen/Qwen-Image-Edit-2509"
IMAGE_SIZE = 1024
OUTPUT_DIR = Path(__file__).parent / "output"
IMAGES_DIR = OUTPUT_DIR / "images"
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
VIDEO_PATH = OUTPUT_DIR / "video.mp4"
class Model:
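    """Wraps the Nunchaku-quantized Qwen-Image-Edit-2509 pipeline and tracks progress."""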
def __init__(self):
self.progress = gr.Progress()
self.num_inference_steps = 50
self.current_inference_step = 0
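        # Load the quantized transformer on CPU; compute() decides GPU placement via offloading.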
transformer = NunchakuQwenImageTransformer2DModel.from_pretrained(
TRANSFORMER_ID,
device="cpu",
)
pipeline = QwenImageEditPlusPipeline.from_pretrained(
PIPELINE_ID,
transformer=transformer,
torch_dtype=torch.bfloat16,
device=DEVICE,
)
self.transformer = transformer
self.pipeline = pipeline
def compute(
self,
images: list[PIL.Image.Image],
prompt: str,
        negative_prompt: str = " ",  # a single space, as in upstream Qwen-Image examples
true_cfg_scale: float = 4.0,
num_inference_steps: int = 40,
num_blocks_on_gpu: int = 10,
seed: int | None = None,
image_width: int = IMAGE_SIZE,
image_height: int = IMAGE_SIZE,
) -> PIL.Image.Image:
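        """Run the edit pipeline and return the final image, saving per-step previews."""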
if DEVICE == "cuda":
            # Memory strategy: with more than 18 GB of GPU memory, standard
            # model CPU offload is enough; otherwise stream transformer blocks.
            if get_gpu_memory() > 18:
self.pipeline.enable_model_cpu_offload()
else:
self.transformer.set_offload(
True,
use_pin_memory=False,
num_blocks_on_gpu=num_blocks_on_gpu,
) # increase num_blocks_on_gpu if you have more VRAM
self.pipeline._exclude_from_cpu_offload.append("transformer")
self.pipeline.enable_sequential_cpu_offload()
self.num_inference_steps = num_inference_steps
self.current_inference_step = 0
self.progress((self.current_inference_step, self.num_inference_steps))
self.image_width = image_width
self.image_height = image_height
shutil.rmtree(IMAGES_DIR, ignore_errors=True)
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
# Validate inputs
if not images:
raise gr.Error("No images provided. Please upload at least one image.")
# Ensure all images are valid PIL Images
processed_images = []
for i, img in enumerate(images):
if img is None:
raise gr.Error(f"Image {i + 1} is invalid or could not be loaded.")
processed_images.append(img)
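        # Fall back to a time-based seed so unseeded runs are not identical.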
seed = seed if seed is not None else int(time.time())
inputs = dict(
image=processed_images,
prompt=prompt,
negative_prompt=negative_prompt,
true_cfg_scale=true_cfg_scale,
num_inference_steps=num_inference_steps,
width=self.image_width,
height=self.image_height,
generator=torch.manual_seed(seed),
callback_on_step_end=self.callback,
# output_type="latent"
)
output = self.pipeline(**inputs)
output_image = output.images[0]
# Create video from saved images
if image_files := sorted(IMAGES_DIR.glob("step_*.png")):
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
fps = 10 # Adjust frame rate as needed
video_writer = cv2.VideoWriter(
str(VIDEO_PATH.absolute()),
fourcc,
fps,
(self.image_width, self.image_height),
)
for img_path in image_files:
img = cv2.imread(str(img_path))
video_writer.write(img)
video_writer.release()
return output_image
def callback(
self,
pipeline: DiffusionPipeline,
step: int,
timestep: int,
callback_kwargs: dict,
):
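        """Decode the current latents to a preview PNG and advance the progress bar."""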
latents = callback_kwargs.get("latents", None)
if latents is not None:
# print(f"Latents shape: {latents.shape}, dtype: {latents.dtype}")
latents = pipeline._unpack_latents(
latents, self.image_height, self.image_width, pipeline.vae_scale_factor
)
latents = latents.to(pipeline.vae.dtype)
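            # Undo the VAE latent normalization, mirroring the pipeline's own decode step.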
latents_mean = (
torch.tensor(pipeline.vae.config.latents_mean)
.view(1, pipeline.vae.config.z_dim, 1, 1, 1)
.to(latents.device, latents.dtype)
)
latents_std = 1.0 / torch.tensor(pipeline.vae.config.latents_std).view(
1, pipeline.vae.config.z_dim, 1, 1, 1
).to(latents.device, latents.dtype)
latents = latents / latents_std + latents_mean
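            # The VAE decodes 5D video tensors (B, C, T, H, W); keep the single frame.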
image = pipeline.vae.decode(latents, return_dict=False)[0][:, :, 0]
image = pipeline.image_processor.postprocess(image, output_type="pil")
image = image[0]
image.save(IMAGES_DIR / f"step_{step:03d}.png")
self.current_inference_step += 1
self.progress((self.current_inference_step, self.num_inference_steps))
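        # Return no tensor overrides; the pipeline keeps its own latents.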
return {}
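# Build the Gradio UI: gallery input, output preview, prompts, and generation settings.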
with gr.Blocks(js=SCRIPT) as demo:
title = gr.Markdown(HEADER)
with gr.Row():
with gr.Column():
gr.Markdown("## Input Images")
image_inputs = gr.Gallery(
label="Input Images",
show_label=True,
elem_id="gallery",
columns=3,
rows=2,
object_fit="contain",
height="auto",
type="pil",
allow_preview=True,
interactive=True,
)
with gr.Column():
gr.Markdown("## Output Image")
image_output = gr.Image(
label="Output Image",
format="png",
)
with gr.Row():
download_image_button = gr.DownloadButton(
label="Download Image",
visible=False,
)
download_video_button = gr.DownloadButton(
label="Download Video",
visible=False,
)
with gr.Row():
with gr.Column():
gr.Markdown("## Prompts")
prompt = gr.Textbox(label="Prompt:", lines=3)
negative_prompt = gr.Textbox(label="Negative Prompt:", lines=3)
with gr.Column():
gr.Markdown("## Settings")
true_cfg_scale = gr.Slider(
0,
20,
value=4.0,
step=0.1,
interactive=True,
label="True CFG scale:",
)
num_inference_steps = gr.Slider(
1,
300,
value=50,
step=1,
interactive=True,
label="Number of denoising steps:",
)
num_blocks_on_gpu = gr.Slider(
1,
100,
value=10,
step=1,
interactive=True,
label="Number of blocks on GPU:",
)
            seed = gr.Number(label="Seed:", value=None, precision=0)  # precision=0 returns an int
with gr.Row():
run_button = gr.Button("Run")
with gr.Row():
if DEVICE != "cuda":
gr.Markdown(
"⚠️ **CUDA not available.** This application requires a CUDA-compatible GPU to function properly. You can duplicate this space with a CUDA-enabled runtime."
)
with gr.Row():
gr.HTML('<div id="kofi" style="text-align: center;"></div>')
def preprocess_image(
image: PIL.Image.Image,
image_size: int = IMAGE_SIZE,
) -> PIL.Image.Image:
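        """Letterbox the image onto a white image_size x image_size canvas, keeping aspect ratio."""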
image = image.convert("RGB")
width, height = image.size
if width > height:
new_width = image_size
new_height = int(height * (image_size / width))
else:
new_height = image_size
new_width = int(width * (image_size / height))
content = image.resize((new_width, new_height), PIL.Image.LANCZOS)
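        # Center the resized content on a white square canvas.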
image = PIL.Image.new("RGB", (image_size, image_size), (255, 255, 255))
paste_x = (image_size - new_width) // 2
paste_y = (image_size - new_height) // 2
image.paste(content, (paste_x, paste_y))
return image
def process_images(
images,
prompt,
negative_prompt,
true_cfg_scale,
num_inference_steps,
num_blocks_on_gpu,
seed,
):
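        """Gradio handler: preprocess the gallery images, run the model, and expose downloads."""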
if DEVICE == "cuda":
torch.cuda.empty_cache()
        pil_images = []
        for item in images:
            # Gallery entries with type="pil" are (image, caption) tuples.
            content = item[0] if isinstance(item, (tuple, list)) else item
            if isinstance(content, PIL.Image.Image):
                content = preprocess_image(content, image_size=IMAGE_SIZE)
            pil_images.append(content)
try:
model = Model()
try:
output_image = model.compute(
pil_images,
prompt,
negative_prompt,
true_cfg_scale,
num_inference_steps,
num_blocks_on_gpu,
seed,
)
            except Exception:
                # Free VRAM before surfacing the error to the outer handler.
                if DEVICE == "cuda":
                    torch.cuda.empty_cache()
                raise
# Save the output image for download
timestamp = int(time.time())
output_image_path = IMAGES_DIR / f"output_{timestamp}.png"
output_image.save(output_image_path)
# Check if video exists
video_exists = VIDEO_PATH.exists()
return (
output_image,
gr.update(visible=True, value=str(output_image_path)),
gr.update(
visible=video_exists,
value=str(VIDEO_PATH) if video_exists else None,
),
)
        except Exception as e:
            print(f"Error processing images: {e}")
            raise gr.Error(f"Failed to process images: {e}") from e
    # Run process_images when the button is clicked.
run_button.click(
fn=process_images,
inputs=[
image_inputs,
prompt,
negative_prompt,
true_cfg_scale,
num_inference_steps,
num_blocks_on_gpu,
seed,
],
outputs=[
image_output,
download_image_button,
download_video_button,
],
)
if __name__ == "__main__":
demo.launch(
        allowed_paths=[str(OUTPUT_DIR)],  # serve the generated images and progress video
share=False,
)