import gradio as gr
import numpy as np
import random
import torch
import spaces
from PIL import Image
from diffusers import FlowMatchEulerDiscreteScheduler, QwenImageEditPlusPipeline
# from optimization import optimize_pipeline_
# from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline
# from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel
# from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3
import math
import os
# --- Environment Variables for Model, LoRA and Prompts ---
BASE_MODEL = os.environ.get("BASE_MODEL", "Qwen/Qwen-Image-Edit-2511")
LIGHTNING_LORA_REPO = os.environ.get("LIGHTNING_LORA_REPO", "lightx2v/Qwen-Image-Edit-2511-Lightning")
LIGHTNING_LORA_WEIGHT = os.environ.get("LIGHTNING_LORA_WEIGHT", "Qwen-Image-Edit-2511-Lightning-4steps-V1.0-bf16.safetensors")
STAGE1_LORA_REPO = os.environ.get("STAGE1_LORA_REPO", "default/stage1-lora")
STAGE1_LORA_WEIGHT = os.environ.get("STAGE1_LORA_WEIGHT", "stage1.safetensors")
STAGE2_LORA_REPO = os.environ.get("STAGE2_LORA_REPO", "default/stage2-lora")
STAGE2_LORA_WEIGHT = os.environ.get("STAGE2_LORA_WEIGHT", "stage2.safetensors")
STAGE1_WEIGHT_DEFAULT = float(os.environ.get("STAGE1_WEIGHT_DEFAULT", "1.0"))
STAGE2_WEIGHT_DEFAULT = float(os.environ.get("STAGE2_WEIGHT_DEFAULT", "1.0"))
STAGE1_PROMPT = os.environ.get("STAGE1_PROMPT", "Convert anime character to base body structure")
STAGE2_PROMPT = os.environ.get("STAGE2_PROMPT", "Convert base body to clear guide body with structure lines")
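# NOTE: "default/stage1-lora" and "default/stage2-lora" are placeholder repo IDs;
# set STAGE1_LORA_REPO / STAGE2_LORA_REPO (e.g. as Space secrets) to real Hub
# repositories before running.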

# --- Model Loading ---
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"
# Scheduler configuration for Lightning
scheduler_config = {
    "base_image_seq_len": 256,
    "base_shift": math.log(3),
    "invert_sigmas": False,
    "max_image_seq_len": 8192,
    "max_shift": math.log(3),
    "num_train_timesteps": 1000,
    "shift": 1.0,
    "shift_terminal": None,
    "stochastic_sampling": False,
    "time_shift_type": "exponential",
    "use_beta_sigmas": False,
    "use_dynamic_shifting": True,
    "use_exponential_sigmas": False,
    "use_karras_sigmas": False,
}
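# NOTE: with base_shift == max_shift == math.log(3), the dynamic exponential
# time shift is effectively constant; this appears to mirror the scheduler
# settings distributed with the lightx2v Lightning LoRAs for few-step sampling.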
# Initialize scheduler with Lightning config
scheduler = FlowMatchEulerDiscreteScheduler.from_config(scheduler_config)

# Load a single shared pipeline
pipe = QwenImageEditPlusPipeline.from_pretrained(
    BASE_MODEL,
    scheduler=scheduler,
    torch_dtype=dtype,
).to(device)
# Load all LoRAs but don't fuse them yet
# 4-step Lightning LoRA
pipe.load_lora_weights(
    LIGHTNING_LORA_REPO,
    weight_name=LIGHTNING_LORA_WEIGHT,
    adapter_name="lightning",
)
# Stage 1 LoRA
pipe.load_lora_weights(STAGE1_LORA_REPO, weight_name=STAGE1_LORA_WEIGHT, adapter_name="stage1")
# Stage 2 LoRA
pipe.load_lora_weights(STAGE2_LORA_REPO, weight_name=STAGE2_LORA_WEIGHT, adapter_name="stage2")
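# Keeping the adapters unfused lets each request activate a different subset
# with pipe.set_adapters(...) instead of reloading weights for every stage.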
# # Apply the same optimizations from the first version
# pipe.transformer.__class__ = QwenImageTransformer2DModel
# pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3())
# # --- Ahead-of-time compilation ---
# optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt")
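# NOTE: the optimization path above (FA3 attention processor + ahead-of-time
# compilation) stays disabled because it depends on the local `optimization`
# and `qwenimage` modules whose imports are commented out at the top.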

# --- UI Constants ---
MAX_SEED = np.iinfo(np.int32).max

# --- Main Inference Functions (split into two stages) ---
# Assumption: the otherwise-unused `spaces` import is meant for ZeroGPU, where
# GPU-bound functions must be wrapped with @spaces.GPU to get a device allocated.
@spaces.GPU
def infer_stage2(
    image,
    seed=42,
    randomize_seed=False,
    true_guidance_scale=1.0,
    num_inference_steps=4,
    height=None,
    width=None,
    progress=gr.Progress(track_tqdm=True),
):
| """ | |
| Run stage2-only inference. | |
| Returns: | |
| (stage2_only_image, image, seed, true_guidance_scale, num_inference_steps, height, width) | |
| """ | |
    # Hardcode the negative prompt
    negative_prompt = " "
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    # Set up the generator for reproducibility
    generator = torch.Generator(device=device).manual_seed(seed)
    # Load the input into a PIL image
    pil_image = None
    if image is not None:
        if isinstance(image, Image.Image):
            pil_image = image.convert("RGB")
        elif isinstance(image, str):
            pil_image = Image.open(image).convert("RGB")
    # The size sliders bottom out at 256, so 256x256 is treated as "auto":
    # pass None and let the pipeline derive the output size from the input.
    if height == 256 and width == 256:
        height, width = None, None
    # Stage2-only generation
    print("Generating with Stage2 LoRA only...")
    print(f"Prompt: '{STAGE2_PROMPT}'")
    print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}")
    print("LoRA Weights - Stage2: 1.0")
    pipe.set_adapters(["lightning", "stage2"], adapter_weights=[1.0, 1.0])
    stage2_images = pipe(
        image=[pil_image] if pil_image is not None else None,
        prompt=STAGE2_PROMPT,
        height=height,
        width=width,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        generator=generator,
        true_cfg_scale=true_guidance_scale,
        num_images_per_prompt=1,
    ).images
    stage2_only_image = stage2_images[0] if stage2_images else None
    return stage2_only_image, image, seed, true_guidance_scale, num_inference_steps, height, width

@spaces.GPU
def infer_combined(
    image,
    seed,
    true_guidance_scale,
    num_inference_steps,
    height,
    width,
    stage1_weight,
    stage2_weight,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Run inference with all three LoRAs combined.

    Returns:
        result_image
    """
    # Hardcode the negative prompt
    negative_prompt = " "
    # Set up the generator for reproducibility (reuses the seed chosen in stage 2)
    generator = torch.Generator(device=device).manual_seed(seed)
    # Load the input into a PIL image
    pil_image = None
    if image is not None:
        if isinstance(image, Image.Image):
            pil_image = image.convert("RGB")
        elif isinstance(image, str):
            pil_image = Image.open(image).convert("RGB")
    # As in infer_stage2, 256x256 means "auto"
    if height == 256 and width == 256:
        height, width = None, None
    # --- Combined generation ---
    print("Generating with combined LoRAs...")
    print(f"Prompt: '{STAGE1_PROMPT}'")
    print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}")
    print(f"LoRA Weights - Lightning: 1.0, Stage1: {stage1_weight}, Stage2: {stage2_weight}")
    # Set all adapters with custom weights
    pipe.set_adapters(["lightning", "stage1", "stage2"], adapter_weights=[1.0, stage1_weight, stage2_weight])
    result_images = pipe(
        image=[pil_image] if pil_image is not None else None,
        prompt=STAGE1_PROMPT,
        height=height,
        width=width,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        generator=generator,
        true_cfg_scale=true_guidance_scale,
        num_images_per_prompt=1,
    ).images
    # Alpha-blend the generated image over the input (75% generated, 25% input)
    if result_images and pil_image is not None:
        generated_image = result_images[0]
        # Resize the input to match the generated image size if they differ
        if pil_image.size != generated_image.size:
            pil_image = pil_image.resize(generated_image.size, Image.Resampling.LANCZOS)
        blended_image = Image.blend(pil_image, generated_image, alpha=0.75)
        return blended_image
    # No input image to blend with: return the first result as-is
    return result_images[0] if result_images else None

# --- Examples and UI Layout ---
examples = []
css = """
#col-container {
    margin: 0 auto;
    max-width: 900px;
}
#logo-title {
    text-align: center;
}
"""

with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.HTML("""
        <div id="logo-title">
            <h1>🎨✨ Qwen Image Edit 2511 - Visualize Body Structure Lines</h1>
            <h3 style="color: #5b47d1;">Anime Character Converter with Combined LoRAs</h3>
            <p>Author: <a href="https://x.com/Yeq6X" target="_blank" rel="noopener">X @Yeq6X</a></p>
        </div>
        """)
        # Hidden state components to pass data between stages
        state_image = gr.State()
        state_seed = gr.State()
        state_guidance = gr.State()
        state_steps = gr.State()
        state_height = gr.State()
        state_width = gr.State()

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📥 Input")
                input_image = gr.Image(
                    label="Input Image",
                    show_label=False,
                    type="pil",
                    interactive=True,
                    elem_id="input-image",
                    height=380,
                )
                run_button = gr.Button("🚀 Generate", variant="primary", size="lg")
| gr.HTML(""" | |
| <script> | |
| (function () { | |
| function bindDrop() { | |
| var root = document.getElementById("input-image"); | |
| if (!root || root.dataset.dropBound === "1") return; | |
| function prevent(e) { | |
| e.preventDefault(); | |
| e.stopPropagation(); | |
| } | |
| function findInput() { | |
| return root.querySelector('input[type="file"]') || root.querySelector("input"); | |
| } | |
| function onDrop(e) { | |
| prevent(e); | |
| var files = e.dataTransfer && e.dataTransfer.files; | |
| if (!files || files.length === 0) return; | |
| var input = findInput(); | |
| if (!input) return; | |
| var dt = new DataTransfer(); | |
| dt.items.add(files[0]); | |
| input.files = dt.files; | |
| input.dispatchEvent(new Event("change", { bubbles: true })); | |
| } | |
| root.addEventListener("dragenter", prevent, true); | |
| root.addEventListener("dragover", prevent, true); | |
| root.addEventListener("drop", onDrop, true); | |
| root.dataset.dropBound = "1"; | |
| } | |
| var observer = new MutationObserver(function () { | |
| bindDrop(); | |
| }); | |
| observer.observe(document.body, { childList: true, subtree: true }); | |
| window.addEventListener("load", function () { | |
| bindDrop(); | |
| }); | |
| setTimeout(bindDrop, 1000); | |
| })(); | |
| </script> | |
| """) | |
            with gr.Column(scale=2):
                with gr.Column(scale=1):
                    gr.Markdown("### 🧪 Result1")
                    stage2_result = gr.Image(label="Result1", show_label=False, type="pil", interactive=False, height=350)
                with gr.Column(scale=1):
                    gr.Markdown("### 📤 Result2")
                    result = gr.Image(label="Result2", show_label=False, type="pil", interactive=False, height=350)
        with gr.Accordion("Advanced Settings", open=False, visible=False):
            with gr.Row():
                seed = gr.Slider(
                    label="Seed",
                    minimum=0,
                    maximum=MAX_SEED,
                    step=1,
                    value=0,
                )
                randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
            gr.Markdown("### LoRA Weights")
            with gr.Row():
                stage1_weight = gr.Slider(
                    label="Stage1 LoRA Weight",
                    minimum=0.0,
                    maximum=2.0,
                    step=0.1,
                    value=STAGE1_WEIGHT_DEFAULT,
                )
                stage2_weight = gr.Slider(
                    label="Stage2 LoRA Weight",
                    minimum=0.0,
                    maximum=2.0,
                    step=0.1,
                    value=STAGE2_WEIGHT_DEFAULT,
                )
            gr.Markdown("### Generation Settings")
            with gr.Row():
                true_guidance_scale = gr.Slider(
                    label="True guidance scale",
                    minimum=1.0,
                    maximum=10.0,
                    step=0.1,
                    value=1.0,
                )
                num_inference_steps = gr.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=40,
                    step=1,
                    value=4,
                )
            with gr.Row():
                height = gr.Slider(
                    label="Height",
                    minimum=256,
                    maximum=2048,
                    step=8,
                    # value=None falls back to the slider minimum (256), which
                    # the inference functions interpret as "auto"
                    value=None,
                )
                width = gr.Slider(
                    label="Width",
                    minimum=256,
                    maximum=2048,
                    step=8,
                    value=None,
                )

    # Chain the two inference stages using .then()
    stage2_event = run_button.click(
        fn=infer_stage2,
        inputs=[
            input_image,
            seed,
            randomize_seed,
            true_guidance_scale,
            num_inference_steps,
            height,
            width,
        ],
        outputs=[stage2_result, state_image, state_seed, state_guidance, state_steps, state_height, state_width],
    )
    stage2_event.then(
        fn=infer_combined,
        inputs=[
            state_image,
            state_seed,
            state_guidance,
            state_steps,
            state_height,
            state_width,
            stage1_weight,
            stage2_weight,
        ],
        outputs=[result],
    )

if __name__ == "__main__":
    # css is applied via gr.Blocks(css=...) above; launch() takes no css argument.
    # mcp_server=True also exposes the endpoints as MCP tools (needs the
    # gradio[mcp] extra installed).
    demo.queue().launch(mcp_server=True)