import os import uuid import numpy as np import random import tempfile import zipfile import threading import spaces import torch import gradio as gr from PIL import Image from diffusers import QwenImageLayeredPipeline from pptx import Presentation LOG_DIR = "/tmp/local" MAX_SEED = np.iinfo(np.int32).max # Optional HF login (works in Spaces if you set HF token as secret env var "hf") from huggingface_hub import login login(token=os.environ.get("hf")) dtype = torch.bfloat16 device = "cuda" if torch.cuda.is_available() else "cpu" # ---------------------------- # Pipeline singleton (fast path) # ---------------------------- _PIPELINE = None _PIPELINE_LOCK = threading.Lock() def _enable_fast_cuda_settings(): if not torch.cuda.is_available(): return try: torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True torch.backends.cudnn.benchmark = True torch.set_float32_matmul_precision("high") try: torch.backends.cuda.enable_flash_sdp(True) torch.backends.cuda.enable_mem_efficient_sdp(True) torch.backends.cuda.enable_math_sdp(False) except Exception: pass except Exception: pass def get_pipeline(): global _PIPELINE if _PIPELINE is not None: return _PIPELINE with _PIPELINE_LOCK: if _PIPELINE is not None: return _PIPELINE _enable_fast_cuda_settings() pipe = QwenImageLayeredPipeline.from_pretrained( "Qwen/Qwen-Image-Layered", torch_dtype=dtype, ) # Fastest mode: keep weights on GPU if available if device == "cuda": pipe.to("cuda") else: pipe.to("cpu") _PIPELINE = pipe return _PIPELINE def ensure_dirname(path: str): if path and not os.path.exists(path): os.makedirs(path, exist_ok=True) def random_str(length=8): return uuid.uuid4().hex[:length] def imagelist_to_pptx(img_files): with Image.open(img_files[0]) as img: img_width_px, img_height_px = img.size def px_to_emu(px, dpi=96): inch = px / dpi emu = inch * 914400 return int(emu) prs = Presentation() prs.slide_width = px_to_emu(img_width_px) prs.slide_height = px_to_emu(img_height_px) slide = prs.slides.add_slide(prs.slide_layouts[6]) left = top = 0 for img_path in img_files: slide.shapes.add_picture( img_path, left, top, width=px_to_emu(img_width_px), height=px_to_emu(img_height_px), ) with tempfile.NamedTemporaryFile(suffix=".pptx", delete=False) as tmp: prs.save(tmp.name) return tmp.name def _clamp_int(x, default: int, lo: int, hi: int) -> int: try: v = int(x) except Exception: v = default return max(lo, min(hi, v)) def _normalize_rgba(pil: Image.Image) -> Image.Image: if pil.mode != "RGBA": pil = pil.convert("RGB").convert("RGBA") return pil def _history_choices(history): """ history: list[dict] with keys: id, parent, title, layers(list[PIL]), meta(optional) """ choices = [] by_id = {n["id"]: n for n in history} for i, node in enumerate(history): n_layers = len(node.get("layers", []) or []) parent = node.get("parent") depth = 0 pid = parent seen = set() while pid and pid in by_id and pid not in seen: seen.add(pid) depth += 1 pid = by_id[pid].get("parent") prefix = " " * min(depth, 6) label = f"{prefix}{i+1}. {node.get('title','Node')} (layers={n_layers})" choices.append((label, node["id"])) return choices def _find_node(history, node_id): for n in history: if n.get("id") == node_id: return n return None def _layers_to_temp_pngs(layers): temp_files = [] for img in layers: tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False) _normalize_rgba(img).save(tmp.name) temp_files.append(tmp.name) return temp_files def _export_zip_from_layers(layers): temp_files = _layers_to_temp_pngs(layers) with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmpzip: with zipfile.ZipFile(tmpzip.name, "w", zipfile.ZIP_DEFLATED) as zipf: for i, p in enumerate(temp_files): zipf.write(p, f"layer_{i+1}.png") return tmpzip.name def _export_pptx_from_layers(layers): temp_files = _layers_to_temp_pngs(layers) return imagelist_to_pptx(temp_files) # ---------------------------- # ZeroGPU duration helper # ---------------------------- def get_duration( input_image=None, seed=777, randomize_seed=False, prompt=None, neg_prompt=" ", true_guidance_scale=4.0, num_inference_steps=50, layer=4, cfg_norm=True, use_en_prompt=True, resolution=640, gpu_duration=1000, **kwargs, ): return _clamp_int(gpu_duration, default=1000, lo=20, hi=1500) # ---------------------------- # GPU ops # ---------------------------- @spaces.GPU(duration=get_duration) def run_decompose_gpu( input_image, seed=777, randomize_seed=False, prompt=None, neg_prompt=" ", true_guidance_scale=4.0, num_inference_steps=50, layer=4, cfg_norm=True, use_en_prompt=True, resolution=640, gpu_duration=1000, ): if randomize_seed: seed = random.randint(0, MAX_SEED) resolution = _clamp_int(resolution, default=640, lo=640, hi=1024) if resolution not in (640, 1024): resolution = 640 if isinstance(input_image, list): input_image = input_image[0] if isinstance(input_image, str): pil_image = Image.open(input_image) elif isinstance(input_image, Image.Image): pil_image = input_image elif isinstance(input_image, np.ndarray): pil_image = Image.fromarray(input_image) else: raise ValueError(f"Unsupported input_image type: {type(input_image)}") pil_image = _normalize_rgba(pil_image) pipe = get_pipeline() gen_device = "cuda" if torch.cuda.is_available() else "cpu" generator = torch.Generator(device=gen_device).manual_seed(int(seed)) inputs = { "image": pil_image, "generator": generator, "true_cfg_scale": true_guidance_scale, "prompt": prompt, "negative_prompt": neg_prompt, "num_inference_steps": num_inference_steps, "num_images_per_prompt": 1, "layers": layer, "resolution": resolution, "cfg_normalize": cfg_norm, "use_en_prompt": use_en_prompt, } with torch.inference_mode(): if torch.cuda.is_available(): with torch.autocast("cuda", dtype=torch.bfloat16): out = pipe(**inputs) else: out = pipe(**inputs) layers_out = out.images[0] layers_out = [_normalize_rgba(x) for x in layers_out] return layers_out @spaces.GPU(duration=get_duration) def run_refine_gpu( base_layers, selected_index: int, seed=777, randomize_seed=False, prompt=None, neg_prompt=" ", true_guidance_scale=4.0, num_inference_steps=50, sub_layers=3, cfg_norm=True, use_en_prompt=True, resolution=640, gpu_duration=1000, ): if not base_layers or not isinstance(base_layers, list): raise ValueError("No base layers to refine. Run Decompose first.") if randomize_seed: seed = random.randint(0, MAX_SEED) resolution = _clamp_int(resolution, default=640, lo=640, hi=1024) if resolution not in (640, 1024): resolution = 640 sub_layers = _clamp_int(sub_layers, default=3, lo=2, hi=10) idx = _clamp_int(selected_index, default=0, lo=0, hi=len(base_layers) - 1) selected_layer = _normalize_rgba(base_layers[idx]) pipe = get_pipeline() gen_device = "cuda" if torch.cuda.is_available() else "cpu" generator = torch.Generator(device=gen_device).manual_seed(int(seed)) inputs = { "image": selected_layer, "generator": generator, "true_cfg_scale": true_guidance_scale, "prompt": prompt, "negative_prompt": neg_prompt, "num_inference_steps": num_inference_steps, "num_images_per_prompt": 1, "layers": sub_layers, "resolution": resolution, "cfg_normalize": cfg_norm, "use_en_prompt": use_en_prompt, } with torch.inference_mode(): if torch.cuda.is_available(): with torch.autocast("cuda", dtype=torch.bfloat16): out = pipe(**inputs) else: out = pipe(**inputs) refined = out.images[0] refined = [_normalize_rgba(x) for x in refined] return refined # ---------------------------- # Gradio glue (history + UX) # ---------------------------- def _init_state(): return { "history": [], "active_node_id": None, "selected_layer_idx": 0, } def _set_active_node(state, node_id): state["active_node_id"] = node_id state["selected_layer_idx"] = 0 return state def _node_layers_and_picker_updates(node): layers_out = node.get("layers") or [] layer_choices = [(f"Layer {i+1}", i) for i in range(len(layers_out))] return layers_out, layer_choices def on_decompose_click( input_image, seed, randomize_seed, prompt, neg_prompt, true_guidance_scale, num_inference_steps, layer, cfg_norm, use_en_prompt, resolution, gpu_duration, state, ): if state is None: state = _init_state() layers_out = run_decompose_gpu( input_image=input_image, seed=seed, randomize_seed=randomize_seed, prompt=prompt, neg_prompt=neg_prompt, true_guidance_scale=true_guidance_scale, num_inference_steps=num_inference_steps, layer=layer, cfg_norm=cfg_norm, use_en_prompt=use_en_prompt, resolution=resolution, gpu_duration=gpu_duration, ) node_id = random_str(10) node = { "id": node_id, "parent": None, "title": "Decompose", "layers": layers_out, "meta": {"type": "decompose"}, } state["history"].append(node) _set_active_node(state, node_id) choices = _history_choices(state["history"]) _, layer_choices = _node_layers_and_picker_updates(node) return ( state, choices, node_id, # selected history node layers_out, # base gallery layers_out, # picker gallery layer_choices, # dropdown choices 0, # dropdown selected index gr.Accordion.update(open=False), [], # refined gallery cleared node.get("title", ""), ) def on_history_change(node_id, state): if state is None: state = _init_state() node = _find_node(state["history"], node_id) if not node: return ( state, [], [], [], 0, gr.Accordion.update(open=False), [], "", ) _set_active_node(state, node_id) layers_out, layer_choices = _node_layers_and_picker_updates(node) return ( state, layers_out, layers_out, layer_choices, 0, gr.Accordion.update(open=False), [], node.get("title", ""), ) def on_picker_select(evt: gr.SelectData, state): if state is None: state = _init_state() idx = int(evt.index) if evt and evt.index is not None else 0 state["selected_layer_idx"] = idx return state, idx def on_layer_dropdown_change(layer_idx, state): if state is None: state = _init_state() try: idx = int(layer_idx) except Exception: idx = 0 state["selected_layer_idx"] = idx return state def _append_refine_node(state, parent_node, selected_idx, sub_layers_value, refined_layers): new_id = random_str(10) new_node = { "id": new_id, "parent": parent_node["id"], "title": f"Refine: Layer {selected_idx+1}", "layers": refined_layers, "meta": { "type": "refine", "refine_from": parent_node["id"], "refine_layer_idx": int(selected_idx), "sub_layers": int(sub_layers_value), }, } state["history"].append(new_node) _set_active_node(state, new_id) return new_node def on_refine_click( seed, randomize_seed, prompt, neg_prompt, true_guidance_scale, num_inference_steps, cfg_norm, use_en_prompt, resolution, gpu_duration, sub_layers, state, history_node_id, layer_dropdown_idx, ): if state is None: state = _init_state() node = _find_node(state["history"], history_node_id) if not node: raise gr.Error("No active node selected. Run Decompose first.") base_layers = node.get("layers") or [] if not base_layers: raise gr.Error("Selected node has no layers to refine.") try: selected_idx = int(layer_dropdown_idx) except Exception: selected_idx = int(state.get("selected_layer_idx", 0) or 0) refined_layers = run_refine_gpu( base_layers=base_layers, selected_index=selected_idx, seed=seed, randomize_seed=randomize_seed, prompt=prompt, neg_prompt=neg_prompt, true_guidance_scale=true_guidance_scale, num_inference_steps=num_inference_steps, sub_layers=sub_layers, cfg_norm=cfg_norm, use_en_prompt=use_en_prompt, resolution=resolution, gpu_duration=gpu_duration, ) new_node = _append_refine_node( state=state, parent_node=node, selected_idx=selected_idx, sub_layers_value=sub_layers, refined_layers=refined_layers, ) choices = _history_choices(state["history"]) _, layer_choices = _node_layers_and_picker_updates(new_node) return ( state, choices, new_node["id"], refined_layers, # base gallery shows refined node refined_layers, # picker shows refined node layer_choices, 0, gr.Accordion.update(open=True), refined_layers, # refined gallery new_node.get("title", ""), ) def on_back_to_parent_click(state, history_node_id): if state is None: state = _init_state() node = _find_node(state["history"], history_node_id) if not node: raise gr.Error("Select a node in History.") parent_id = node.get("parent") if not parent_id: # already root layers_out, layer_choices = _node_layers_and_picker_updates(node) return ( state, history_node_id, layers_out, layers_out, layer_choices, 0, gr.Accordion.update(open=False), [], node.get("title", ""), ) parent = _find_node(state["history"], parent_id) if not parent: raise gr.Error("Parent not found in history (corrupted history).") _set_active_node(state, parent_id) layers_out, layer_choices = _node_layers_and_picker_updates(parent) return ( state, parent_id, layers_out, layers_out, layer_choices, 0, gr.Accordion.update(open=False), [], parent.get("title", ""), ) def on_redo_refine_click( seed, randomize_seed, prompt, neg_prompt, true_guidance_scale, num_inference_steps, cfg_norm, use_en_prompt, resolution, gpu_duration, state, history_node_id, ): if state is None: state = _init_state() node = _find_node(state["history"], history_node_id) if not node: raise gr.Error("Select a node in History.") meta = node.get("meta") or {} if meta.get("type") != "refine": raise gr.Error("Redo refine работает только для refine-узлов (не для Decompose).") parent_id = meta.get("refine_from") or node.get("parent") if not parent_id: raise gr.Error("Refine node has no parent info.") parent = _find_node(state["history"], parent_id) if not parent: raise gr.Error("Parent not found in history.") base_layers = parent.get("layers") or [] if not base_layers: raise gr.Error("Parent node has no layers.") selected_idx = int(meta.get("refine_layer_idx", 0)) sub_layers_value = int(meta.get("sub_layers", 3)) refined_layers = run_refine_gpu( base_layers=base_layers, selected_index=selected_idx, seed=seed, randomize_seed=randomize_seed, prompt=prompt, neg_prompt=neg_prompt, true_guidance_scale=true_guidance_scale, num_inference_steps=num_inference_steps, sub_layers=sub_layers_value, cfg_norm=cfg_norm, use_en_prompt=use_en_prompt, resolution=resolution, gpu_duration=gpu_duration, ) new_node = _append_refine_node( state=state, parent_node=parent, selected_idx=selected_idx, sub_layers_value=sub_layers_value, refined_layers=refined_layers, ) choices = _history_choices(state["history"]) _, layer_choices = _node_layers_and_picker_updates(new_node) return ( state, choices, new_node["id"], refined_layers, refined_layers, layer_choices, 0, gr.Accordion.update(open=True), refined_layers, new_node.get("title", ""), ) def on_rename_node_click(state, history_node_id, new_name): if state is None: state = _init_state() node = _find_node(state["history"], history_node_id) if not node: raise gr.Error("Select a node in History.") new_name = (new_name or "").strip() if not new_name: # no-op choices = _history_choices(state["history"]) return state, choices, history_node_id, node.get("title", "") node["title"] = new_name choices = _history_choices(state["history"]) return state, choices, history_node_id, node.get("title", "") def on_export_click(state, node_id, export_kind: str): if state is None: state = _init_state() node = _find_node(state["history"], node_id) if not node: raise gr.Error("Select a node in History to export.") layers = node.get("layers") or [] if not layers: raise gr.Error("Selected node has no layers to export.") if export_kind == "pptx": return _export_pptx_from_layers(layers) if export_kind == "zip": return _export_zip_from_layers(layers) raise gr.Error("Unknown export kind.") # ---------------------------- # UI # ---------------------------- ensure_dirname(LOG_DIR) examples = [ "assets/test_images/1.png", "assets/test_images/2.png", "assets/test_images/3.png", "assets/test_images/4.png", "assets/test_images/5.png", "assets/test_images/6.png", "assets/test_images/7.png", "assets/test_images/8.png", "assets/test_images/9.png", "assets/test_images/10.png", "assets/test_images/11.png", "assets/test_images/12.png", "assets/test_images/13.png", ] with gr.Blocks() as demo: state = gr.State(_init_state()) with gr.Column(elem_id="col-container"): gr.HTML( '' ) gr.Markdown( """ The text prompt is intended to describe the overall content of the input image—including elements that may be partially occluded (e.g., you may specify the text hidden behind a foreground object). It is not designed to control the semantic content of individual layers explicitly. """ ) with gr.Row(): with gr.Column(scale=1): input_image = gr.Image(label="Input Image", image_mode="RGBA") with gr.Accordion("Advanced Settings", open=False): prompt = gr.Textbox( label="Prompt (Optional)", placeholder="Please enter the prompt to descibe the image. (Optional)", value="", lines=2, ) neg_prompt = gr.Textbox( label="Negative Prompt (Optional)", placeholder="Please enter the negative prompt", value=" ", lines=2, ) seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0) randomize_seed = gr.Checkbox(label="Randomize seed", value=True) true_guidance_scale = gr.Slider( label="True guidance scale", minimum=1.0, maximum=10.0, step=0.1, value=4.0, ) num_inference_steps = gr.Slider( label="Number of inference steps", minimum=1, maximum=100, step=1, value=50, ) layer = gr.Slider( label="Layers", minimum=2, maximum=10, step=1, value=7, ) resolution = gr.Radio( label="Processing resolution", choices=[640, 1024], value=640, ) cfg_norm = gr.Checkbox(label="Whether enable CFG normalization", value=True) use_en_prompt = gr.Checkbox( label="Automatic caption language if no prompt provided, True for EN, False for ZH", value=True, ) gpu_duration = gr.Textbox( label="GPU duration override (seconds, 20..1500)", value="1000", lines=1, placeholder="e.g. 60, 120, 300, 1000, 1500", ) decompose_btn = gr.Button("Decompose!", variant="primary") with gr.Accordion("History", open=True): history_dropdown = gr.Dropdown( label="Nodes", choices=[], value=None, interactive=True, ) with gr.Row(): back_parent_btn = gr.Button("← Back to parent") redo_refine_btn = gr.Button("↺ Redo refine") branch_name = gr.Textbox( label="Branch name", value="", lines=1, placeholder="Rename selected node...", ) rename_btn = gr.Button("Rename selected node") with gr.Row(): export_pptx_btn = gr.Button("Export PPTX (selected node)") export_zip_btn = gr.Button("Export ZIP (selected node)") export_pptx_file = gr.File(label="Download PPTX") export_zip_file = gr.File(label="Download ZIP") with gr.Accordion("Refine layer", open=True): gr.Markdown("Pick a layer visually (like Photoshop), then refine it into sub-layers.") layer_picker = gr.Gallery( label="Layer Picker (click a thumbnail)", columns=8, rows=1, height="auto", format="png", show_label=True, ) layer_idx_dropdown = gr.Dropdown( label="Refine layer index", choices=[], value=0, interactive=True, ) sub_layers = gr.Slider( label="Sub-layers (Refine)", minimum=2, maximum=10, step=1, value=3, ) refine_btn = gr.Button("Refine selected layer", variant="secondary") with gr.Column(scale=2): base_gallery = gr.Gallery(label="Current node layers", columns=4, rows=1, format="png") refined_accordion = gr.Accordion("Refined layers", open=False) with refined_accordion: refined_gallery = gr.Gallery(label="Refined layers output", columns=4, rows=1, format="png") gr.Examples( examples=examples, inputs=[input_image], cache_examples=False, ) # Decompose decompose_btn.click( fn=on_decompose_click, inputs=[ input_image, seed, randomize_seed, prompt, neg_prompt, true_guidance_scale, num_inference_steps, layer, cfg_norm, use_en_prompt, resolution, gpu_duration, state, ], outputs=[ state, history_dropdown, history_dropdown, base_gallery, layer_picker, layer_idx_dropdown, layer_idx_dropdown, refined_accordion, refined_gallery, branch_name, ], ) # History change history_dropdown.change( fn=on_history_change, inputs=[history_dropdown, state], outputs=[ state, base_gallery, layer_picker, layer_idx_dropdown, layer_idx_dropdown, refined_accordion, refined_gallery, branch_name, ], ) # Picker click layer_picker.select( fn=on_picker_select, inputs=[state], outputs=[state, layer_idx_dropdown], ) # Dropdown change -> state sync layer_idx_dropdown.change( fn=on_layer_dropdown_change, inputs=[layer_idx_dropdown, state], outputs=[state], ) # Refine refine_btn.click( fn=on_refine_click, inputs=[ seed, randomize_seed, prompt, neg_prompt, true_guidance_scale, num_inference_steps, cfg_norm, use_en_prompt, resolution, gpu_duration, sub_layers, state, history_dropdown, layer_idx_dropdown, ], outputs=[ state, history_dropdown, history_dropdown, base_gallery, layer_picker, layer_idx_dropdown, layer_idx_dropdown, refined_accordion, refined_gallery, branch_name, ], ) # Back to parent back_parent_btn.click( fn=on_back_to_parent_click, inputs=[state, history_dropdown], outputs=[ state, history_dropdown, base_gallery, layer_picker, layer_idx_dropdown, layer_idx_dropdown, refined_accordion, refined_gallery, branch_name, ], ) # Redo refine (same parent/index/sub_layers as the selected refine node) redo_refine_btn.click( fn=on_redo_refine_click, inputs=[ seed, randomize_seed, prompt, neg_prompt, true_guidance_scale, num_inference_steps, cfg_norm, use_en_prompt, resolution, gpu_duration, state, history_dropdown, ], outputs=[ state, history_dropdown, history_dropdown, base_gallery, layer_picker, layer_idx_dropdown, layer_idx_dropdown, refined_accordion, refined_gallery, branch_name, ], ) # Rename selected node rename_btn.click( fn=on_rename_node_click, inputs=[state, history_dropdown, branch_name], outputs=[state, history_dropdown, history_dropdown, branch_name], ) # Export selected node export_pptx_btn.click( fn=lambda st, node_id: on_export_click(st, node_id, "pptx"), inputs=[state, history_dropdown], outputs=[export_pptx_file], ) export_zip_btn.click( fn=lambda st, node_id: on_export_click(st, node_id, "zip"), inputs=[state, history_dropdown], outputs=[export_zip_file], ) if __name__ == "__main__": demo.launch()