import shutil
import time
from pathlib import Path

import cv2
import gradio as gr
import PIL.Image
import torch
from diffusers import (
    DiffusionPipeline,  # type: ignore
    QwenImageEditPlusPipeline,  # type: ignore
)

from nunchaku import NunchakuQwenImageTransformer2DModel
from nunchaku.utils import get_gpu_memory, get_precision

from kofi import SCRIPT

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

HEADER = "# [Nunchaku Qwen-Image-Edit-2509](https://huggingface.co/nunchaku-tech/nunchaku-qwen-image-edit-2509)"

RANK = 128
PRECISION = get_precision() if DEVICE == "cuda" else "fp4"
TRANSFORMER_ID = (
    "nunchaku-tech/nunchaku-qwen-image-edit-2509/"
    f"svdq-{PRECISION}_r{RANK}-qwen-image-edit-2509.safetensors"
)
PIPELINE_ID = "Qwen/Qwen-Image-Edit-2509"
IMAGE_SIZE = 1024

OUTPUT_DIR = Path(__file__).parent / "output"
IMAGES_DIR = OUTPUT_DIR / "images"
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
VIDEO_PATH = OUTPUT_DIR / "video.mp4"


class Model:
    def __init__(self):
        self.progress = gr.Progress()
        self.num_inference_steps = 50
        self.current_inference_step = 0

        # Load the quantized transformer on the CPU first; compute() decides
        # how much of it to move onto the GPU based on available VRAM.
        transformer = NunchakuQwenImageTransformer2DModel.from_pretrained(
            TRANSFORMER_ID,
            device="cpu",
        )
        pipeline = QwenImageEditPlusPipeline.from_pretrained(
            PIPELINE_ID,
            transformer=transformer,
            torch_dtype=torch.bfloat16,
            device=DEVICE,
        )
        self.transformer = transformer
        self.pipeline = pipeline

    def compute(
        self,
        images: list[PIL.Image.Image],
        prompt: str,
        negative_prompt: str = " ",
        true_cfg_scale: float = 4.0,
        num_inference_steps: int = 40,
        num_blocks_on_gpu: int = 10,
        seed: int | None = None,
        image_width: int = IMAGE_SIZE,
        image_height: int = IMAGE_SIZE,
    ) -> PIL.Image.Image:
        if DEVICE == "cuda":
            # Optimize memory usage: with more than 18 GB of VRAM, plain
            # model CPU offload is enough; otherwise keep only a few
            # transformer blocks on the GPU and offload the rest.
            if get_gpu_memory() > 18:
                self.pipeline.enable_model_cpu_offload()
            else:
                # Increase num_blocks_on_gpu if you have more VRAM.
                self.transformer.set_offload(
                    True,
                    use_pin_memory=False,
                    num_blocks_on_gpu=num_blocks_on_gpu,
                )
                self.pipeline._exclude_from_cpu_offload.append("transformer")
                self.pipeline.enable_sequential_cpu_offload()

        self.num_inference_steps = num_inference_steps
        self.current_inference_step = 0
        self.progress((self.current_inference_step, self.num_inference_steps))
        self.image_width = image_width
        self.image_height = image_height

        # Clear the step previews from the previous run.
        shutil.rmtree(IMAGES_DIR, ignore_errors=True)
        IMAGES_DIR.mkdir(parents=True, exist_ok=True)

        # Validate inputs
        if not images:
            raise gr.Error("No images provided. Please upload at least one image.")
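
        # Ensure all images are valid PIL Images
        processed_images = []
        for i, img in enumerate(images):
            if img is None:
                raise gr.Error(f"Image {i + 1} is invalid or could not be loaded.")
            processed_images.append(img)

        seed = seed if seed is not None else int(time.time())
        inputs = dict(
            image=processed_images,
            prompt=prompt,
            negative_prompt=negative_prompt,
            true_cfg_scale=true_cfg_scale,
            num_inference_steps=num_inference_steps,
            width=self.image_width,
            height=self.image_height,
            generator=torch.manual_seed(seed),
            callback_on_step_end=self.callback,
            # output_type="latent"
        )
        output = self.pipeline(**inputs)
        output_image = output.images[0]

        # Create a video from the saved per-step preview images.
        if image_files := sorted(IMAGES_DIR.glob("step_*.png")):
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            fps = 10  # Adjust frame rate as needed
            video_writer = cv2.VideoWriter(
                str(VIDEO_PATH.absolute()),
                fourcc,
                fps,
                (self.image_width, self.image_height),
            )
            for img_path in image_files:
                img = cv2.imread(str(img_path))
                video_writer.write(img)
            video_writer.release()

        return output_image

    def callback(
        self,
        pipeline: DiffusionPipeline,
        step: int,
        timestep: int,
        callback_kwargs: dict,
    ):
        latents = callback_kwargs.get("latents", None)
        if latents is not None:
            # Decode the intermediate latents into a preview frame: unpack,
            # denormalize with the VAE's latent statistics, then decode and
            # keep the first (and only) video frame.
            latents = pipeline._unpack_latents(
                latents, self.image_height, self.image_width, pipeline.vae_scale_factor
            )
            latents = latents.to(pipeline.vae.dtype)
            latents_mean = (
                torch.tensor(pipeline.vae.config.latents_mean)
                .view(1, pipeline.vae.config.z_dim, 1, 1, 1)
                .to(latents.device, latents.dtype)
            )
            latents_std = 1.0 / torch.tensor(pipeline.vae.config.latents_std).view(
                1, pipeline.vae.config.z_dim, 1, 1, 1
            ).to(latents.device, latents.dtype)
            latents = latents / latents_std + latents_mean
            image = pipeline.vae.decode(latents, return_dict=False)[0][:, :, 0]
            image = pipeline.image_processor.postprocess(image, output_type="pil")
            image = image[0]
            image.save(IMAGES_DIR / f"step_{step:03d}.png")

        self.current_inference_step += 1
        self.progress((self.current_inference_step, self.num_inference_steps))
        return {}


# For reference, a minimal headless sketch of driving Model without the UI
# below. This is an assumption, not part of the app: the file names are
# placeholders, and gr.Progress() may require a live Gradio event context,
# in which case the progress calls in compute() would need to be stubbed out.
#
#   model = Model()
#   image = PIL.Image.open("input.png").convert("RGB")
#   edited = model.compute([image], prompt="Replace the sky with a sunset")
#   edited.save("edited.png")
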
with gr.Blocks(js=SCRIPT) as demo:
    title = gr.Markdown(HEADER)
    with gr.Row():
        with gr.Column():
            gr.Markdown("## Input Images")
            image_inputs = gr.Gallery(
                label="Input Images",
                show_label=True,
                elem_id="gallery",
                columns=3,
                rows=2,
                object_fit="contain",
                height="auto",
                type="pil",
                allow_preview=True,
                interactive=True,
            )
        with gr.Column():
            gr.Markdown("## Output Image")
            image_output = gr.Image(
                label="Output Image",
                format="png",
            )
            with gr.Row():
                download_image_button = gr.DownloadButton(
                    label="Download Image",
                    visible=False,
                )
                download_video_button = gr.DownloadButton(
                    label="Download Video",
                    visible=False,
                )
    with gr.Row():
        with gr.Column():
            gr.Markdown("## Prompts")
            prompt = gr.Textbox(label="Prompt:", lines=3)
            negative_prompt = gr.Textbox(label="Negative Prompt:", lines=3)
        with gr.Column():
            gr.Markdown("## Settings")
            true_cfg_scale = gr.Slider(
                0,
                20,
                value=4.0,
                step=0.1,
                interactive=True,
                label="True CFG scale:",
            )
            num_inference_steps = gr.Slider(
                1,
                300,
                value=50,
                step=1,
                interactive=True,
                label="Number of denoising steps:",
            )
            num_blocks_on_gpu = gr.Slider(
                1,
                100,
                value=10,
                step=1,
                interactive=True,
                label="Number of blocks on GPU:",
            )
            seed = gr.Number(label="Seed:", value=None)
    with gr.Row():
        run_button = gr.Button("Run")
    with gr.Row():
        if DEVICE != "cuda":
            gr.Markdown(
                "⚠️ **CUDA not available.** This application requires a "
                "CUDA-compatible GPU to function properly. You can duplicate "
                "this space with a CUDA-enabled runtime."
            )
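
    # The per-step previews written by Model.callback can also be stitched
    # into a video offline. A sketch assuming imageio plus its ffmpeg backend
    # is installed (the app itself uses the cv2 writer in Model.compute):
    #
    #   import imageio
    #   frames = [imageio.imread(p) for p in sorted(IMAGES_DIR.glob("step_*.png"))]
    #   imageio.mimsave(VIDEO_PATH, frames, fps=10)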
    with gr.Row():
        gr.HTML("")

    def preprocess_image(
        image: PIL.Image.Image,
        image_size: int = IMAGE_SIZE,
    ) -> PIL.Image.Image:
        image = image.convert("RGB")
        width, height = image.size
        # Scale the long side to image_size, keeping the aspect ratio.
        if width > height:
            new_width = image_size
            new_height = int(height * (image_size / width))
        else:
            new_height = image_size
            new_width = int(width * (image_size / height))
        content = image.resize((new_width, new_height), PIL.Image.LANCZOS)
        # Letterbox onto a square white canvas.
        image = PIL.Image.new("RGB", (image_size, image_size), (255, 255, 255))
        paste_x = (image_size - new_width) // 2
        paste_y = (image_size - new_height) // 2
        image.paste(content, (paste_x, paste_y))
        return image
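
    # Worked example of the letterbox above: a 1600x900 input scales to
    # 1024x576 (long side pinned to IMAGE_SIZE), then is pasted at (0, 224)
    # on a white 1024x1024 canvas, so the aspect ratio is preserved rather
    # than stretched.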

    def process_images(
        images,
        prompt,
        negative_prompt,
        true_cfg_scale,
        num_inference_steps,
        num_blocks_on_gpu,
        seed,
    ):
        if DEVICE == "cuda":
            torch.cuda.empty_cache()

        # Gallery entries are (image, caption) tuples; take the first PIL
        # image from each entry and preprocess it.
        pil_images = []
        for contents in images:
            for content in contents:
                if isinstance(content, PIL.Image.Image):
                    content = preprocess_image(content, image_size=IMAGE_SIZE)
                    pil_images.append(content)
                    break

        try:
            model = Model()
            try:
                output_image = model.compute(
                    pil_images,
                    prompt,
                    negative_prompt,
                    true_cfg_scale,
                    num_inference_steps,
                    num_blocks_on_gpu,
                    seed,
                )
            except Exception:
                # Free VRAM before propagating the failure; without the
                # re-raise, output_image below would be undefined.
                if DEVICE == "cuda":
                    torch.cuda.empty_cache()
                raise

            # Save the output image for download
            timestamp = int(time.time())
            output_image_path = IMAGES_DIR / f"output_{timestamp}.png"
            output_image.save(output_image_path)

            # Check if video exists
            video_exists = VIDEO_PATH.exists()
            return (
                output_image,
                gr.update(visible=True, value=str(output_image_path)),
                gr.update(
                    visible=video_exists,
                    value=str(VIDEO_PATH) if video_exists else None,
                ),
            )
        except Exception as e:
            print(f"Error processing images: {e}")
            raise gr.Error(f"Failed to process images: {e}")

    # Connect the button to the editing function
    run_button.click(
        fn=process_images,
        inputs=[
            image_inputs,
            prompt,
            negative_prompt,
            true_cfg_scale,
            num_inference_steps,
            num_blocks_on_gpu,
            seed,
        ],
        outputs=[
            image_output,
            download_image_button,
            download_video_button,
        ],
    )


if __name__ == "__main__":
    demo.launch(
        # Allow serving everything under OUTPUT_DIR so the video download
        # works regardless of the working directory (a relative
        # "output/video.mp4" only matched when launched from this file's
        # directory).
        allowed_paths=[str(OUTPUT_DIR)],
        share=False,
    )
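
# Assumed launch steps, not prescribed by the source: with gradio, diffusers,
# opencv-python, and nunchaku installed (the right nunchaku build depends on
# your torch/CUDA version; see its documentation), run the script directly,
# e.g. `python app.py` if it is saved as app.py, and open the printed URL.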