|
|
import shutil |
|
|
import time |
|
|
from pathlib import Path |
|
|
|
|
|
import cv2 |
|
|
import gradio as gr |
|
|
import PIL.Image |
|
|
import torch |
|
|
from diffusers import ( |
|
|
DiffusionPipeline, |
|
|
QwenImageEditPlusPipeline, |
|
|
) |
|
|
from nunchaku import NunchakuQwenImageTransformer2DModel |
|
|
from nunchaku.utils import get_gpu_memory, get_precision |
|
|
|
|
|
from kofi import SCRIPT |
|
|
|
|
|
# "cuda" when torch reports an available CUDA device, otherwise "cpu".
DEVICE = "cpu" if not torch.cuda.is_available() else "cuda"

# Markdown heading shown at the top of the demo page.
HEADER = (
    "# [Nunchaku Qwen-Image-Edit-2509]"
    "(https://huggingface.co/nunchaku-tech/nunchaku-qwen-image-edit-2509)"
)

# Pieces of the transformer checkpoint path; the precision is only queried
# from nunchaku when running on CUDA and defaults to "fp4" otherwise.
RANK = 128
PRECISION = "fp4" if DEVICE != "cuda" else get_precision()
TRANSFORMER_ID = (
    "nunchaku-tech/nunchaku-qwen-image-edit-2509/"
    f"svdq-{PRECISION}_r{RANK}-qwen-image-edit-2509.safetensors"
)
PIPELINE_ID = "Qwen/Qwen-Image-Edit-2509"

# Default edge length (pixels) used for both input padding and output size.
IMAGE_SIZE = 1024

# Output locations, all placed next to this file. The images directory is
# created at import time so later writes can assume it exists.
OUTPUT_DIR = Path(__file__).with_name("output")
IMAGES_DIR = OUTPUT_DIR.joinpath("images")
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
VIDEO_PATH = OUTPUT_DIR.joinpath("video.mp4")
|
|
|
|
|
|
|
|
class Model:
    """Hold the Nunchaku transformer and Qwen image-edit pipeline and run edits.

    ``compute`` performs one edit. ``callback`` is passed to the pipeline as
    ``callback_on_step_end``; it saves a preview PNG per denoising step into
    ``IMAGES_DIR`` and advances the Gradio progress bar. After the run,
    ``compute`` stitches those previews into ``VIDEO_PATH``.
    """

    def __init__(self):
        """Load the transformer and the pipeline and reset the progress state."""
        self.progress = gr.Progress()
        self.num_inference_steps = 50
        self.current_inference_step = 0
        # Defined up front so ``callback`` never reads a missing attribute,
        # even if it is invoked before ``compute`` has stored the real size.
        self.image_width = IMAGE_SIZE
        self.image_height = IMAGE_SIZE

        transformer = NunchakuQwenImageTransformer2DModel.from_pretrained(
            TRANSFORMER_ID,
            device="cpu",
        )

        # NOTE(review): ``device`` is forwarded to the pipeline loader as-is;
        # confirm that diffusers' ``from_pretrained`` actually honours it.
        pipeline = QwenImageEditPlusPipeline.from_pretrained(
            PIPELINE_ID,
            transformer=transformer,
            torch_dtype=torch.bfloat16,
            device=DEVICE,
        )

        self.transformer = transformer
        self.pipeline = pipeline

    def compute(
        self,
        images: list[PIL.Image.Image],
        prompt: str,
        negative_prompt: str = " ",
        true_cfg_scale: float = 4.0,
        num_inference_steps: int = 40,
        num_blocks_on_gpu: int = 10,
        seed: int | None = None,
        image_width: int = IMAGE_SIZE,
        image_height: int = IMAGE_SIZE,
    ) -> PIL.Image.Image:
        """Run the edit pipeline on ``images`` and return the first output image.

        Args:
            images: Input images; must be non-empty and contain no ``None``.
            prompt: Edit instruction passed to the pipeline.
            negative_prompt: Negative prompt passed to the pipeline.
            true_cfg_scale: Forwarded to the pipeline unchanged.
            num_inference_steps: Number of denoising steps; also the total
                reported to the progress bar.
            num_blocks_on_gpu: Forwarded to ``transformer.set_offload`` when
                the reported GPU memory is not above 18.
            seed: Seed for ``torch.manual_seed``; the current Unix time is
                used when ``None``. Non-integer numbers are truncated.
            image_width: Output width, also used to decode step previews.
            image_height: Output height, also used to decode step previews.

        Returns:
            The first image of the pipeline output.

        Raises:
            gr.Error: If ``images`` is empty or contains ``None``.
        """
        # Validate first: a bad request must not install offload hooks or
        # wipe the previous run's files before being rejected.
        if not images:
            raise gr.Error("No images provided. Please upload at least one image.")
        for index, candidate in enumerate(images):
            if candidate is None:
                raise gr.Error(f"Image {index + 1} is invalid or could not be loaded.")
        processed_images = list(images)

        if DEVICE == "cuda":
            if get_gpu_memory() > 18:
                self.pipeline.enable_model_cpu_offload()
            else:
                # The transformer does its own block-wise offload, so it is
                # excluded from the pipeline's sequential CPU offload.
                self.transformer.set_offload(
                    True,
                    use_pin_memory=False,
                    num_blocks_on_gpu=num_blocks_on_gpu,
                )
                self.pipeline._exclude_from_cpu_offload.append("transformer")
                self.pipeline.enable_sequential_cpu_offload()

        self.num_inference_steps = num_inference_steps
        self.current_inference_step = 0
        self.progress((self.current_inference_step, self.num_inference_steps))

        self.image_width = image_width
        self.image_height = image_height

        # Start from a clean slate so previews from an earlier run cannot leak
        # into this run's video.
        shutil.rmtree(IMAGES_DIR, ignore_errors=True)
        IMAGES_DIR.mkdir(parents=True, exist_ok=True)
        try:
            # Drop a stale video so callers checking VIDEO_PATH.exists() only
            # ever see the video produced by this run.
            VIDEO_PATH.unlink(missing_ok=True)
        except OSError:
            # Best effort: if the old file cannot be removed it is overwritten
            # below when the new video is written.
            pass

        # UI number widgets may deliver floats; the seed must be an integer.
        seed = int(seed) if seed is not None else int(time.time())

        output = self.pipeline(
            image=processed_images,
            prompt=prompt,
            negative_prompt=negative_prompt,
            true_cfg_scale=true_cfg_scale,
            num_inference_steps=num_inference_steps,
            width=self.image_width,
            height=self.image_height,
            generator=torch.manual_seed(seed),
            callback_on_step_end=self.callback,
        )
        output_image = output.images[0]

        # Zero-padded step numbers make lexicographic order the step order.
        frame_paths = sorted(IMAGES_DIR.glob("step_*.png"))
        if frame_paths:
            self._write_video(frame_paths)

        return output_image

    def _write_video(self, frame_paths: list[Path], fps: int = 10) -> None:
        """Encode the preview frames at ``frame_paths`` into ``VIDEO_PATH``."""
        frame_size = (self.image_width, self.image_height)
        video_writer = cv2.VideoWriter(
            str(VIDEO_PATH.absolute()),
            cv2.VideoWriter_fourcc(*"mp4v"),
            fps,
            frame_size,
        )
        try:
            for frame_path in frame_paths:
                frame = cv2.imread(str(frame_path))
                if frame is None:
                    # cv2.imread returns None for unreadable files; skip the
                    # frame instead of handing None to the writer.
                    continue
                # The writer expects frames of the size it was opened with.
                if (frame.shape[1], frame.shape[0]) != frame_size:
                    frame = cv2.resize(frame, frame_size)
                video_writer.write(frame)
        finally:
            # Always finalise the file, even if a frame fails to encode.
            video_writer.release()

    def callback(
        self,
        pipeline: DiffusionPipeline,
        step: int,
        timestep: int,
        callback_kwargs: dict,
    ):
        """Save a decoded preview of the current step and advance the progress.

        Args:
            pipeline: The running pipeline; its VAE and image processor are
                used to decode the latents.
            step: Step index, used in the preview file name.
            timestep: Unused.
            callback_kwargs: Expected to carry ``"latents"``; when absent, no
                preview is written.

        Returns:
            An empty dict, i.e. no tensors are overridden in the pipeline.
        """
        latents = callback_kwargs.get("latents", None)

        if latents is not None:
            latents = pipeline._unpack_latents(
                latents, self.image_height, self.image_width, pipeline.vae_scale_factor
            )
            latents = latents.to(pipeline.vae.dtype)
            vae_config = pipeline.vae.config
            stats_shape = (1, vae_config.z_dim, 1, 1, 1)
            latents_mean = (
                torch.tensor(vae_config.latents_mean)
                .view(*stats_shape)
                .to(latents.device, latents.dtype)
            )
            # Holds the reciprocal of the configured std, so the division
            # below multiplies the latents by the std.
            latents_std = 1.0 / torch.tensor(vae_config.latents_std).view(
                *stats_shape
            ).to(latents.device, latents.dtype)
            latents = latents / latents_std + latents_mean
            # Index 0 on the third axis — presumably the single frame of an
            # image-only decode; TODO confirm against the VAE's output layout.
            image = pipeline.vae.decode(latents, return_dict=False)[0][:, :, 0]
            image = pipeline.image_processor.postprocess(image, output_type="pil")
            image = image[0]

            image.save(IMAGES_DIR / f"step_{step:03d}.png")

        self.current_inference_step += 1
        self.progress((self.current_inference_step, self.num_inference_steps))

        return {}
|
|
|
|
|
|
|
|
# Demo layout plus the handlers wired to it. The handlers are defined inside
# the Blocks context so the click binding at the end is registered on ``demo``.
with gr.Blocks(js=SCRIPT) as demo:
    title = gr.Markdown(HEADER)

    with gr.Row():
        with gr.Column():
            gr.Markdown("## Input Images")

            image_inputs = gr.Gallery(
                label="Input Images",
                show_label=True,
                elem_id="gallery",
                columns=3,
                rows=2,
                object_fit="contain",
                height="auto",
                type="pil",
                allow_preview=True,
                interactive=True,
            )

        with gr.Column():
            gr.Markdown("## Output Image")

            image_output = gr.Image(
                label="Output Image",
                format="png",
            )

            with gr.Row():
                # Both buttons stay hidden until a run has produced a file.
                download_image_button = gr.DownloadButton(
                    label="Download Image",
                    visible=False,
                )

                download_video_button = gr.DownloadButton(
                    label="Download Video",
                    visible=False,
                )

    with gr.Row():
        with gr.Column():
            gr.Markdown("## Prompts")

            prompt = gr.Textbox(label="Prompt:", lines=3)
            negative_prompt = gr.Textbox(label="Negative Prompt:", lines=3)

        with gr.Column():
            gr.Markdown("## Settings")

            true_cfg_scale = gr.Slider(
                0,
                20,
                value=4.0,
                step=0.1,
                interactive=True,
                label="True CFG scale:",
            )

            num_inference_steps = gr.Slider(
                1,
                300,
                value=50,
                step=1,
                interactive=True,
                label="Number of denoising steps:",
            )

            num_blocks_on_gpu = gr.Slider(
                1,
                100,
                value=10,
                step=1,
                interactive=True,
                label="Number of blocks on GPU:",
            )

            seed = gr.Number(label="Seed:", value=None)

    with gr.Row():
        run_button = gr.Button("Run")

    with gr.Row():
        if DEVICE != "cuda":
            gr.Markdown(
                "⚠️ **CUDA not available.** This application requires a CUDA-compatible GPU to function properly. You can duplicate this space with a CUDA-enabled runtime."
            )

    with gr.Row():
        gr.HTML('<div id="kofi" style="text-align: center;"></div>')

    def preprocess_image(
        image: PIL.Image.Image,
        image_size: int = IMAGE_SIZE,
    ) -> PIL.Image.Image:
        """Fit ``image`` into a white ``image_size`` x ``image_size`` square.

        The image is converted to RGB, scaled so its longer side equals
        ``image_size`` while keeping the aspect ratio, and centred on a white
        canvas.

        Args:
            image: Source image.
            image_size: Edge length of the square result, in pixels.

        Returns:
            A new RGB image of size ``(image_size, image_size)``.
        """
        image = image.convert("RGB")
        width, height = image.size
        # Clamp the shorter side to 1 pixel: for extreme aspect ratios the
        # truncating int() would yield 0, which resize() rejects.
        if width > height:
            new_width = image_size
            new_height = max(1, int(height * (image_size / width)))
        else:
            new_height = image_size
            new_width = max(1, int(width * (image_size / height)))
        content = image.resize((new_width, new_height), PIL.Image.LANCZOS)
        image = PIL.Image.new("RGB", (image_size, image_size), (255, 255, 255))
        paste_x = (image_size - new_width) // 2
        paste_y = (image_size - new_height) // 2
        image.paste(content, (paste_x, paste_y))
        return image

    def process_images(
        images,
        prompt,
        negative_prompt,
        true_cfg_scale,
        num_inference_steps,
        num_blocks_on_gpu,
        seed,
    ):
        """Handle a click on "Run": edit the gallery images and publish results.

        Args:
            images: Gallery value; each entry is iterated and its first PIL
                image is used. May be ``None`` when nothing was uploaded.
            prompt: Edit prompt.
            negative_prompt: Negative prompt.
            true_cfg_scale: Forwarded to ``Model.compute``.
            num_inference_steps: Forwarded to ``Model.compute``.
            num_blocks_on_gpu: Forwarded to ``Model.compute``.
            seed: Forwarded to ``Model.compute``.

        Returns:
            A tuple of the output image, an update for the image download
            button, and an update for the video download button.

        Raises:
            gr.Error: With the original message for validation errors raised
                by ``Model.compute``, or "Failed to process images: ..." for
                any other failure.
        """
        if DEVICE == "cuda":
            torch.cuda.empty_cache()

        pil_images = []
        # ``images or []`` keeps an empty/None gallery from raising here, so
        # the request reaches the "No images provided" check in compute().
        for contents in images or []:
            for content in contents:
                if isinstance(content, PIL.Image.Image):
                    pil_images.append(preprocess_image(content, image_size=IMAGE_SIZE))
                    break

        try:
            # A fresh Model is built for every request.
            model = Model()

            try:
                output_image = model.compute(
                    pil_images,
                    prompt,
                    negative_prompt,
                    true_cfg_scale,
                    num_inference_steps,
                    num_blocks_on_gpu,
                    seed,
                )
            except Exception:
                if DEVICE == "cuda":
                    torch.cuda.empty_cache()
                # Re-raise: swallowing the error here would fall through to an
                # unbound ``output_image`` and hide the real failure.
                raise

            timestamp = int(time.time())
            output_image_path = IMAGES_DIR / f"output_{timestamp}.png"
            output_image.save(output_image_path)

            video_exists = VIDEO_PATH.exists()

            return (
                output_image,
                gr.update(visible=True, value=str(output_image_path)),
                gr.update(
                    visible=video_exists,
                    value=str(VIDEO_PATH) if video_exists else None,
                ),
            )
        except gr.Error:
            # Already a user-facing message; show it unchanged.
            raise
        except Exception as e:
            print(f"Error processing images: {e}")
            raise gr.Error(f"Failed to process images: {str(e)}") from e

    run_button.click(
        fn=process_images,
        inputs=[
            image_inputs,
            prompt,
            negative_prompt,
            true_cfg_scale,
            num_inference_steps,
            num_blocks_on_gpu,
            seed,
        ],
        outputs=[
            image_output,
            download_image_button,
            download_video_button,
        ],
    )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow the whole output directory by absolute path: generated files are
    # written relative to this file (OUTPUT_DIR), not the working directory,
    # and both the output image and the video are offered for download.
    demo.launch(
        allowed_paths=[str(OUTPUT_DIR.resolve())],
        share=False,
    )
|
|
|