# ================= ZeroGPU-Optimized =================
# - Caches live in /tmp (wiped on every restart)
# - Lazy load + LRU (keep only the 1-2 most recent pipelines)
# - Lightweight default models (SD 1.5 / SD-Turbo)
# - ControlNet: Canny only (small and fast)
# - "Clear cache" button in the UI
# - Auto-retry with reduced size/steps on OOM or long stalls
# =====================================================
import os, io, json, time, gc, shutil
from typing import Dict, List, Optional, Tuple
from collections import OrderedDict

# 1) Redirect all HF caches to /tmp so disk usage never accumulates
#    across restarts (must happen before the HF libraries are imported).
os.environ["HF_HOME"] = "/tmp/hf"
os.environ["HF_HUB_CACHE"] = "/tmp/hf/hub"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf/transformers"
os.environ["DIFFUSERS_CACHE"] = "/tmp/hf/diffusers"

import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
import torch
from diffusers import (
    StableDiffusionPipeline,
    StableDiffusionImg2ImgPipeline,
    StableDiffusionInpaintPipelineLegacy,
    StableDiffusionControlNetPipeline,
    ControlNetModel,
    DPMSolverMultistepScheduler,
    EulerDiscreteScheduler,
    EulerAncestralDiscreteScheduler,
    HeunDiscreteScheduler,
)

# ---------- Optional dependency (skipped when unavailable) ----------
try:
    from rembg import remove as rembg_remove
except Exception:
    rembg_remove = None

# ---------- Runtime ----------
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32

# ---------- Model registry (small/fast, suited to ZeroGPU) ----------
# SDXL is deliberately excluded from the defaults (heavy to load) but still
# works if the user enters a custom model ID.
MODELS_TXT = [
    ("runwayml/stable-diffusion-v1-5", "SD 1.5 (base, fast)"),
    ("stabilityai/sd-turbo", "SD-Turbo (ultra-fast)"),
    ("stabilityai/stable-diffusion-2-1", "SD 2.1 (landscape)"),
]
MODEL_IMG2IMG_DEFAULT = "runwayml/stable-diffusion-v1-5"
MODEL_INPAINT_DEFAULT = "runwayml/stable-diffusion-inpainting"  # legacy inpaint (small/stable)

# ControlNet: Canny only (small and sufficient for this app).
CONTROLNETS = [
    ("lllyasviel/sd-controlnet-canny", "Canny (edges)"),
]

PRESETS = {
    "Cinematic": ", cinematic lighting, bokeh, film grain",
    "Studio": ", studio photo, softbox lighting, sharp focus",
    "Anime": ", anime style, clean lines, vibrant colors",
}
NEG_DEFAULT = "lowres, blurry, bad anatomy, extra fingers, watermark, jpeg artifacts, text"

SCHEDULERS = {
    "DPM-Solver (Karras)": DPMSolverMultistepScheduler,
    "Euler": EulerDiscreteScheduler,
    "Euler a": EulerAncestralDiscreteScheduler,
    "Heun": HeunDiscreteScheduler,
}

# ---------- Caches with LRU ----------
MAX_PIPE_CACHE = 2
PIPE_CACHE: "OrderedDict[str, object]" = OrderedDict()
CONTROL_CACHE: Dict[str, ControlNetModel] = {}


def _lru_put(key: str, pipe) -> None:
    """Insert *pipe* into the LRU cache, evicting the oldest entries
    (and freeing GPU memory) once MAX_PIPE_CACHE is exceeded."""
    PIPE_CACHE[key] = pipe
    PIPE_CACHE.move_to_end(key)
    while len(PIPE_CACHE) > MAX_PIPE_CACHE:
        old_key, old_pipe = PIPE_CACHE.popitem(last=False)
        try:
            del old_pipe
        except Exception:
            pass
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()


# ---------- Utils ----------
def set_scheduler(pipe, name: str) -> None:
    """Swap the pipeline scheduler; unknown names fall back to DPM-Solver."""
    cls = SCHEDULERS.get(name, DPMSolverMultistepScheduler)
    pipe.scheduler = cls.from_config(pipe.scheduler.config)


def seed_gen(seed) -> Optional[torch.Generator]:
    """Return a seeded torch.Generator, or None for random (seed < 0 / None)."""
    if seed is None or int(seed) < 0:
        return None
    g = torch.Generator(device=("cuda" if DEVICE == "cuda" else "cpu"))
    g.manual_seed(int(seed))
    return g


def _speed_tweaks(pipe) -> None:
    """Apply best-effort memory/stability tweaks.

    Each tweak is wrapped in its own try/except: previously one failing call
    (e.g. xformers not installed) silently skipped all remaining tweaks.
    """
    if DEVICE == "cuda":
        tweaks = (
            pipe.enable_xformers_memory_efficient_attention,
            pipe.enable_vae_tiling,
            pipe.enable_vae_slicing,
        )
    else:
        tweaks = (
            pipe.enable_sequential_cpu_offload,
            pipe.enable_attention_slicing,
        )
    for tweak in tweaks:
        try:
            tweak()
        except Exception:
            pass  # purely opportunistic; the pipeline works without them


# ---------- Lazy loaders ----------
def get_controlnet(model_id: str) -> ControlNetModel:
    """Load (or fetch from cache) a ControlNet model on DEVICE."""
    if model_id in CONTROL_CACHE:
        return CONTROL_CACHE[model_id]
    cn = ControlNetModel.from_pretrained(model_id, torch_dtype=DTYPE, use_safetensors=True)
    cn.to(DEVICE)
    CONTROL_CACHE[model_id] = cn
    return cn


def get_txt2img_pipe(model_id: str, use_control: bool, control_id: Optional[str]):
    """Return a (cached) text-to-image pipeline, optionally with ControlNet."""
    key = f"t2i|{model_id}|{'cn' if use_control else 'none'}"
    if key in PIPE_CACHE:
        PIPE_CACHE.move_to_end(key)
        return PIPE_CACHE[key]
    if use_control and control_id:
        cn = get_controlnet(control_id)
        pipe = StableDiffusionControlNetPipeline.from_pretrained(
            model_id, controlnet=cn, torch_dtype=DTYPE,
            safety_checker=None, feature_extractor=None, use_safetensors=True
        )
    else:
        pipe = StableDiffusionPipeline.from_pretrained(
            model_id, torch_dtype=DTYPE,
            safety_checker=None, feature_extractor=None, use_safetensors=True
        )
    pipe.to(DEVICE)
    _speed_tweaks(pipe)
    _lru_put(key, pipe)
    return pipe


def get_img2img_pipe(model_id: str):
    """Return a (cached) image-to-image pipeline."""
    key = f"i2i|{model_id}"
    if key in PIPE_CACHE:
        PIPE_CACHE.move_to_end(key)
        return PIPE_CACHE[key]
    pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
        model_id, torch_dtype=DTYPE,
        safety_checker=None, feature_extractor=None, use_safetensors=True
    ).to(DEVICE)
    _speed_tweaks(pipe)
    _lru_put(key, pipe)
    return pipe


def get_inpaint_pipe(model_id: str):
    """Return a (cached) legacy inpainting pipeline."""
    key = f"inpaint|{model_id}"
    if key in PIPE_CACHE:
        PIPE_CACHE.move_to_end(key)
        return PIPE_CACHE[key]
    pipe = StableDiffusionInpaintPipelineLegacy.from_pretrained(
        model_id, torch_dtype=DTYPE,
        safety_checker=None, feature_extractor=None, use_safetensors=True
    ).to(DEVICE)
    _speed_tweaks(pipe)
    _lru_put(key, pipe)
    return pipe


# ---------- Post process ----------
def remove_bg(img: Image.Image) -> Image.Image:
    """Best-effort background removal; returns *img* unchanged on any failure.

    Bug fix: rembg.remove() returns the same type it is given, so an ndarray
    input yields an ndarray — the old code wrapped that ndarray in
    Image.open(io.BytesIO(...)), which always raised, and the except clause
    silently returned the original image (background removal never worked).
    """
    if rembg_remove is None:
        return img
    try:
        out = rembg_remove(np.array(img))  # ndarray in -> RGBA ndarray out
        return Image.fromarray(out)
    except Exception:
        return img


# ---------- Auto-retry wrapper ----------
def run_with_retry(func, *, width: int, height: int, steps: int, max_time: float = 280.0):
    """Run func(w, h, s); on CUDA OOM shrink size/steps and retry (max 3 tries).

    Raises gr.Error when the total elapsed time exceeds *max_time* or when all
    attempts run out of memory. Non-OOM RuntimeErrors are re-raised as-is.
    """
    t0 = time.time()
    w, h, s = width, height, steps
    for attempt in range(3):
        try:
            if time.time() - t0 > max_time:
                raise gr.Error("งานนานเกินกำหนด โปรดลองลดขนาดภาพหรือจำนวนสเต็ป")
            return func(w, h, s)
        except RuntimeError as e:
            msg = str(e).lower()
            if "out of memory" in msg or "cuda oom" in msg or "alloc" in msg:
                # Shrink to 75% (snapped to a multiple of 64, floor 384) and
                # drop a few steps before retrying.
                w = max(384, int(w * 0.75) // 64 * 64)
                h = max(384, int(h * 0.75) // 64 * 64)
                s = max(10, s - 4)
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                continue
            raise
    raise gr.Error("หน่วยความจำไม่พอ แม้จะลดขนาดแล้ว — ลองลดพารามิเตอร์เพิ่มเติม")


# ---------- Generators ----------
def txt2img(
    model_id, custom_model, prompt, preset, negative,
    steps, cfg, width, height, scheduler, seed,
    use_control, control_choice, control_image, do_rembg
):
    """Text-to-image generation; returns (PIL image, metadata JSON string)."""
    if not prompt or not str(prompt).strip():
        raise gr.Error("กรุณากรอก prompt")
    # (custom_model or "") guards against a None textbox value (old code
    # crashed with AttributeError on .strip()).
    model = ((custom_model or "").strip() or model_id or MODELS_TXT[0][0]).strip()
    if preset and preset in PRESETS:
        prompt = prompt + PRESETS[preset]
    if not negative or not str(negative).strip():
        negative = NEG_DEFAULT
    width, height = int(width), int(height)
    use_control = bool(use_control and control_choice and control_image is not None)

    def _run(w, h, s):
        pipe = get_txt2img_pipe(model, use_control, CONTROLNETS[0][0] if use_control else None)
        set_scheduler(pipe, scheduler)
        gen = seed_gen(seed)
        if use_control:
            image = pipe(
                prompt=prompt, negative_prompt=negative, image=control_image,
                width=w, height=h, num_inference_steps=int(s),
                guidance_scale=float(cfg), generator=gen
            ).images[0]
        else:
            image = pipe(
                prompt=prompt, negative_prompt=negative,
                width=w, height=h, num_inference_steps=int(s),
                guidance_scale=float(cfg), generator=gen
            ).images[0]
        if do_rembg:
            image = remove_bg(image)
        meta = {
            "mode": "txt2img", "model": model, "control": ("canny" if use_control else None),
            "prompt": prompt, "neg": negative, "size": f"{w}x{h}",
            "steps": int(s), "cfg": float(cfg), "scheduler": scheduler, "seed": seed
        }
        return image, json.dumps(meta, ensure_ascii=False, indent=2)

    return run_with_retry(_run, width=width, height=height, steps=int(steps))


def img2img(
    model_id, custom_model, init_img, strength, prompt, preset, negative,
    steps, cfg, width, height, scheduler, seed, do_rembg
):
    """Image-to-image generation; returns (PIL image, metadata JSON string)."""
    if init_img is None:
        raise gr.Error("โปรดอัปโหลดภาพเริ่มต้น")
    model = ((custom_model or "").strip() or model_id or MODEL_IMG2IMG_DEFAULT).strip()
    if preset and preset in PRESETS:
        prompt = prompt + PRESETS[preset]
    if not negative or not str(negative).strip():
        negative = NEG_DEFAULT
    width, height = int(width), int(height)

    def _run(w, h, s):
        pipe = get_img2img_pipe(model)
        set_scheduler(pipe, scheduler)
        gen = seed_gen(seed)
        # Bug fix: on OOM retries run_with_retry shrinks (w, h), but the old
        # code never applied them, so retries could not reduce memory use.
        # The first attempt (w == width, h == height) keeps the original image.
        src = init_img if (w, h) == (width, height) else init_img.resize((w, h))
        image = pipe(
            prompt=prompt, negative_prompt=negative, image=src,
            strength=float(strength), num_inference_steps=int(s),
            guidance_scale=float(cfg), generator=gen
        ).images[0]
        if do_rembg:
            image = remove_bg(image)
        meta = {"mode": "img2img", "model": model, "prompt": prompt, "neg": negative,
                "steps": int(s), "cfg": float(cfg), "seed": seed, "strength": float(strength)}
        return image, json.dumps(meta, ensure_ascii=False, indent=2)

    return run_with_retry(_run, width=width, height=height, steps=int(steps))


def expand_canvas_for_outpaint(img: Image.Image, expand_px: int, direction: str) -> Tuple[Image.Image, Image.Image]:
    """Grow the canvas by *expand_px* in *direction* ("left"/"right"/"top",
    anything else = bottom) and return (expanded RGB image, L-mode mask with
    the new area painted white = region to generate)."""
    w, h = img.size
    if direction == "left":
        new = Image.new("RGBA", (w + expand_px, h), (0, 0, 0, 0)); new.paste(img, (expand_px, 0))
        mask = Image.new("L", (w + expand_px, h), 0); ImageDraw.Draw(mask).rectangle([0, 0, expand_px, h], fill=255)
    elif direction == "right":
        new = Image.new("RGBA", (w + expand_px, h), (0, 0, 0, 0)); new.paste(img, (0, 0))
        mask = Image.new("L", (w + expand_px, h), 0); ImageDraw.Draw(mask).rectangle([w, 0, w + expand_px, h], fill=255)
    elif direction == "top":
        new = Image.new("RGBA", (w, h + expand_px), (0, 0, 0, 0)); new.paste(img, (0, expand_px))
        mask = Image.new("L", (w, h + expand_px), 0); ImageDraw.Draw(mask).rectangle([0, 0, w, expand_px], fill=255)
    else:
        new = Image.new("RGBA", (w, h + expand_px), (0, 0, 0, 0)); new.paste(img, (0, 0))
        mask = Image.new("L", (w, h + expand_px), 0); ImageDraw.Draw(mask).rectangle([0, h, w, h + expand_px], fill=255)
    return new.convert("RGB"), mask


def inpaint_outpaint(
    model_id, custom_model, base_img, mask_img, mode, expand_px, expand_dir,
    prompt, preset, negative, steps, cfg, width, height, scheduler, seed, strength, do_rembg
):
    """Inpaint (user mask) or Outpaint (auto-expanded canvas + mask).
    Returns (PIL image, metadata JSON string)."""
    if base_img is None:
        raise gr.Error("โปรดอัปโหลดภาพฐาน")
    # Robustness fix: a missing mask in Inpaint mode used to crash deep inside
    # the pipeline; fail early with a clear message instead.
    if mode != "Outpaint" and mask_img is None:
        raise gr.Error("โปรดอัปโหลดภาพ mask")
    model = ((custom_model or "").strip() or model_id or MODEL_INPAINT_DEFAULT).strip()
    if preset and preset in PRESETS:
        prompt = prompt + PRESETS[preset]
    if not negative or not str(negative).strip():
        negative = NEG_DEFAULT
    width, height = int(width), int(height)
    if mode == "Outpaint":
        base_img, mask_img = expand_canvas_for_outpaint(base_img, int(expand_px), expand_dir)

    def _run(w, h, s):
        pipe = get_inpaint_pipe(model)
        set_scheduler(pipe, scheduler)
        gen = seed_gen(seed)
        # Bug fix: apply the shrunken retry size to both image and mask (the
        # legacy pipeline needs them the same size); first attempt unchanged.
        src, msk = base_img, mask_img
        if (w, h) != (width, height):
            src = base_img.resize((w, h))
            msk = mask_img.resize((w, h))
        image = pipe(
            prompt=prompt, negative_prompt=negative, image=src, mask_image=msk,
            strength=float(strength), num_inference_steps=int(s),
            guidance_scale=float(cfg), generator=gen
        ).images[0]
        if do_rembg:
            image = remove_bg(image)
        meta = {"mode": mode, "model": model, "prompt": prompt,
                "steps": int(s), "cfg": float(cfg), "seed": seed}
        return image, json.dumps(meta, ensure_ascii=False, indent=2)

    return run_with_retry(_run, width=width, height=height, steps=int(steps))


# ---------- Clear cache ----------
def clear_runtime_caches() -> str:
    """Remove the on-disk HF cache and drop all in-memory pipelines;
    everything is lazily re-downloaded/re-loaded on the next request."""
    cache_root = os.environ.get("HF_HOME", "/tmp/hf")
    try:
        if os.path.isdir(cache_root):
            shutil.rmtree(cache_root, ignore_errors=True)
    except Exception as e:
        print("[ClearCache] remove cache failed:", e)
    PIPE_CACHE.clear()
    CONTROL_CACHE.clear()
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return "✅ Cache cleared. Pipelines will be reloaded on demand."
# ---------- UI ----------
def build_ui():
    """Build and return the Gradio Blocks app (three tabs sharing the
    model/sampler controls defined at the top of the layout)."""
    with gr.Blocks(theme=gr.themes.Soft(), title="ZeroGPU SD Studio") as demo:
        gr.Markdown("## 🖼️ ZeroGPU SD Studio — เบา เร็ว เสถียร (CPU/ZeroGPU)")
        with gr.Row():
            model_dd = gr.Dropdown([m[0] for m in MODELS_TXT], value=MODELS_TXT[0][0], label="Base model")
            model_custom = gr.Textbox(label="Custom model ID (optional)", placeholder="เช่น stabilityai/stable-diffusion-xl-base-1.0 (หนัก)")
        preset = gr.Dropdown(list(PRESETS.keys()), value=None, label="Style Preset")
        negative = gr.Textbox(value=NEG_DEFAULT, label="Negative Prompt")
        with gr.Row():
            steps = gr.Slider(10, 40, 18, 1, label="Steps (แนะนำ ≤20 บน ZeroGPU)")
            cfg = gr.Slider(1.0, 12.0, 6.5, 0.1, label="CFG")
        with gr.Row():
            width = gr.Slider(384, 768, 512, 64, label="Width")
            height = gr.Slider(384, 768, 512, 64, label="Height")
        scheduler = gr.Dropdown(list(SCHEDULERS.keys()), value="DPM-Solver (Karras)", label="Scheduler")
        seed = gr.Number(value=-1, precision=0, label="Seed (-1=random)")

        # ControlNet (Canny only)
        with gr.Accordion("ControlNet (Canny)", open=False):
            use_control = gr.Checkbox(False, label="Enable Canny ControlNet")
            control_choice = gr.Dropdown([CONTROLNETS[0][1]], value=CONTROLNETS[0][1], label="Type")
            control_image = gr.Image(type="pil", label="Edge image")

        with gr.Row():
            do_rembg = gr.Checkbox(False, label="Remove background (ถ้ามี rembg)")

        with gr.Tab("Text → Image"):
            prompt_txt = gr.Textbox(lines=3, label="Prompt")
            btn_txt = gr.Button("🚀 Generate")
            out_img_txt = gr.Image(type="pil", label="Result")
            out_meta_txt = gr.Textbox(label="Metadata", lines=10)

        with gr.Tab("Image → Image"):
            init_img = gr.Image(type="pil", label="Init image")
            strength = gr.Slider(0.1, 1.0, 0.7, 0.05, label="Strength")
            prompt_i2i = gr.Textbox(lines=3, label="Prompt")
            btn_i2i = gr.Button("🚀 Img2Img")
            out_img_i2i = gr.Image(type="pil", label="Result")
            out_meta_i2i = gr.Textbox(label="Metadata", lines=10)

        with gr.Tab("Inpaint / Outpaint"):
            base_img = gr.Image(type="pil", label="Base image")
            mask_img = gr.Image(type="pil", label="Mask (white = edit)")
            mode_io = gr.Radio(["Inpaint", "Outpaint"], value="Inpaint", label="Mode")
            expand_px = gr.Slider(64, 512, 192, 64, label="Outpaint pixels")
            expand_dir = gr.Radio(["left", "right", "top", "bottom"], value="right", label="Outpaint direction")
            prompt_io = gr.Textbox(lines=3, label="Prompt")
            btn_io = gr.Button("🚀 Inpaint/Outpaint")
            out_img_io = gr.Image(type="pil", label="Result")
            out_meta_io = gr.Textbox(label="Metadata", lines=10)

        with gr.Row():
            btn_clear = gr.Button("🧹 Clear cache (runtime)")
            msg_clear = gr.Markdown()

        # Bindings
        btn_txt.click(
            fn=txt2img,
            inputs=[model_dd, model_custom, prompt_txt, preset, negative,
                    steps, cfg, width, height, scheduler, seed,
                    use_control, control_choice, control_image, do_rembg],
            outputs=[out_img_txt, out_meta_txt],
            api_name="txt2img"
        )
        btn_i2i.click(
            fn=img2img,
            inputs=[model_dd, model_custom, init_img, strength, prompt_i2i, preset, negative,
                    steps, cfg, width, height, scheduler, seed, do_rembg],
            outputs=[out_img_i2i, out_meta_i2i],
            api_name="img2img"
        )
        btn_io.click(
            fn=inpaint_outpaint,
            inputs=[model_dd, model_custom, base_img, mask_img, mode_io, expand_px, expand_dir,
                    prompt_io, preset, negative, steps, cfg, width, height, scheduler, seed,
                    strength, do_rembg],
            outputs=[out_img_io, out_meta_io],
            api_name="inpaint_outpaint"
        )
        btn_clear.click(fn=clear_runtime_caches, outputs=[msg_clear])

        gr.Markdown("ℹ️ โหมดนี้ออกแบบมาสำหรับ ZeroGPU/CPU: ถ้าต้องการ SDXL ให้กรอก Custom ID (จะช้าหนักขึ้น)")
    return demo


demo = build_ui()
# Reduce the chance of dropped connections on ZeroGPU: one job at a time,
# small waiting queue.
# Bug fix: Gradio 4.x removed `concurrency_count` (renamed to
# `default_concurrency_limit`), so the old call raised TypeError at startup
# on current installs. Try the modern name first, fall back for Gradio 3.x.
try:
    demo.queue(default_concurrency_limit=1, max_size=8)
except TypeError:
    demo.queue(concurrency_count=1, max_size=8)
demo.launch(share=False, show_api=False, max_threads=1, prevent_thread_lock=True)