Spaces:
Runtime error
Runtime error
| import os | |
| import subprocess | |
| import zipfile | |
| import urllib.request | |
| import threading | |
| import queue | |
| import uuid | |
| import sys | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from huggingface_hub import hf_hub_download | |
| import gradio as gr | |
def _pick_cache_root():
    """Return the directory under which binaries and models are cached.

    The ``CACHE_DIR`` environment variable takes precedence whenever it is
    present (even if empty); otherwise ``~/.cache/z-imggen`` is used.
    """
    try:
        return os.environ["CACHE_DIR"]
    except KeyError:
        return os.path.join(Path.home(), ".cache", "z-imggen")
# Directory layout, resolved once at import time: downloaded binaries and
# model weights live under the cache root, generated images in ./outputs.
CACHE_ROOT = _pick_cache_root()
MODELS_DIR, BIN_DIR = (
    os.path.join(CACHE_ROOT, sub) for sub in ("models", "bin")
)
OUTPUT_DIR = "outputs"
for d in (MODELS_DIR, BIN_DIR, OUTPUT_DIR):
    os.makedirs(d, exist_ok=True)

IS_WIN = sys.platform.startswith("win")

# Release archive of stable-diffusion.cpp to fetch: the Windows CUDA zip on
# Windows, the Ubuntu 24.04 x86_64 zip everywhere else.
if IS_WIN:
    _SD_ARCHIVE = "sd-master-fd35047-bin-win-cuda.zip"
else:
    _SD_ARCHIVE = "sd-master-fd35047-bin-Linux-Ubuntu-24.04-x86_64.zip"
SD_URL = (
    "https://github.com/leejet/stable-diffusion.cpp/releases/download/"
    "master-564-fd35047/"
    + _SD_ARCHIVE
)
# Local file name the archive is saved under before extraction.
SD_ZIP = os.path.join(BIN_DIR, "sd.zip")
def _model_entry(repo, file):
    """Describe one model: local path under MODELS_DIR, hub repo, file name."""
    return {
        "path": os.path.join(MODELS_DIR, file),
        "repo": repo,
        "file": file,
    }


# Model files required by sd-cli, keyed by the role they play on the
# command line. "file" is the path inside the Hugging Face repo and is
# mirrored below MODELS_DIR.
MODELS = {
    "diffusion": _model_entry(
        "unsloth/Z-Image-Turbo-GGUF",
        "z-image-turbo-Q2_K.gguf",
    ),
    "llm": _model_entry(
        "unsloth/Qwen3-4B-Instruct-2507-GGUF",
        "Qwen3-4B-Instruct-2507-Q4_K_M.gguf",
    ),
    "vae": _model_entry(
        "Comfy-Org/z_image_turbo",
        "split_files/vae/ae.safetensors",
    ),
}
# Negative prompt used when the caller leaves the field empty.
DEFAULT_NEG = "blurry, low quality, distorted, watermark, ugly"

# Worker threads handed to sd-cli: at most 8, falling back to 4 when the
# CPU count cannot be determined.
N_THREADS = min(os.cpu_count() or 4, 8)

# Set by cancel() and polled by generate() to stop a running job.
_cancel = threading.Event()
def _ts():
    """Return the current local time formatted as ``HH:MM:SS``."""
    now = datetime.now()
    return f"{now:%H:%M:%S}"
def _download(url, path):
    """Download *url* to *path* unless *path* already exists.

    The payload is written to ``<path>.part`` first and only renamed to
    *path* once the transfer has completed. An interrupted or failed
    download therefore never leaves a truncated file at *path*, which the
    existence check would otherwise treat as a finished download forever.

    Args:
        url: Location to fetch (anything ``urllib.request.urlretrieve``
            accepts).
        path: Destination file path.

    Raises:
        Whatever ``urllib.request.urlretrieve`` or ``os.replace`` raises;
        the temporary file is removed before the error propagates.
    """
    if os.path.exists(path):
        return

    partial = os.fspath(path) + ".part"
    try:
        urllib.request.urlretrieve(url, partial)
        # Same-directory rename: the final name only ever refers to a
        # complete file.
        os.replace(partial, path)
    except BaseException:
        # Clean up on any failure (including Ctrl-C), then re-raise.
        try:
            os.remove(partial)
        except OSError:
            pass
        raise
def _find_sd_cli(exe_name):
    """Return ``(exe_path, containing_dir)`` for *exe_name* under BIN_DIR, or None."""
    for root, _, files in os.walk(BIN_DIR):
        if exe_name in files:
            return os.path.join(root, exe_name), root
    return None


def setup_sd():
    """Locate the sd-cli executable, downloading and extracting it if needed.

    BIN_DIR is searched first. When the binary is missing, the release
    archive at SD_URL is downloaded to SD_ZIP, extracted into BIN_DIR, and
    the search is repeated.

    Returns:
        A ``(exe_path, exe_dir)`` tuple; ``exe_dir`` is the directory that
        contains the executable.

    Raises:
        RuntimeError: If the archive is corrupt (it is deleted so the next
            call downloads it again) or does not contain sd-cli.
    """
    exe_name = "sd-cli.exe" if IS_WIN else "sd-cli"

    found = _find_sd_cli(exe_name)
    if found is None:
        _download(SD_URL, SD_ZIP)
        try:
            with zipfile.ZipFile(SD_ZIP, "r") as archive:
                archive.extractall(BIN_DIR)
        except zipfile.BadZipFile as err:
            # _download skips files that already exist, so a broken archive
            # would be reused on every later call unless it is removed here.
            try:
                os.remove(SD_ZIP)
            except OSError:
                pass
            raise RuntimeError(f"sd-cli archive is corrupt: {err}") from err
        found = _find_sd_cli(exe_name)
        if found is None:
            raise RuntimeError("sd-cli not found")

    path, root = found
    # Also repair a cached binary whose exec bit is missing, e.g. after a
    # run that was interrupted between extraction and chmod.
    if not IS_WIN and not os.access(path, os.X_OK):
        os.chmod(path, 0o755)
    return path, root
def ensure_models():
    """Make sure every model listed in MODELS is present locally.

    Any entry whose ``path`` does not exist is fetched from its Hugging Face
    repo into MODELS_DIR.

    Returns:
        The local paths as a ``(diffusion, llm, vae)`` tuple.
    """
    for spec in MODELS.values():
        if os.path.exists(spec["path"]):
            continue
        # NOTE(review): local_dir_use_symlinks appears to be deprecated in
        # recent huggingface_hub releases - confirm against the pinned version.
        hf_hub_download(
            repo_id=spec["repo"],
            filename=spec["file"],
            local_dir=MODELS_DIR,
            local_dir_use_symlinks=False,
        )

    resolved = {name: spec["path"] for name, spec in MODELS.items()}
    return resolved["diffusion"], resolved["llm"], resolved["vae"]
def generate(prompt, negative, steps, cfg, w, h, sampler, seed):
    """Run sd-cli for one prompt and stream progress to the UI.

    This is a generator. Every item it yields is a
    ``(image_path_or_None, log_text, status)`` tuple, where ``status`` is one
    of ``"idle"``, ``"setup"``, ``"running"``, ``"cancelled"``, ``"error"``
    or ``"done"``. Only the final ``"done"`` item carries an image path.

    Args:
        prompt: Positive prompt; an empty/blank prompt aborts immediately.
        negative: Negative prompt; DEFAULT_NEG is used when it is empty.
        steps: Sampling steps, capped at 8.
        cfg: CFG scale, capped at 1.5.
        w: Image width in pixels.
        h: Image height in pixels.
        sampler: Sampling method name passed to ``--sampling-method``.
        seed: RNG seed; ``-1`` (or ``None``) lets sd-cli choose one.
    """
    _cancel.clear()
    logs = []

    def log(x):
        logs.append(f"[{_ts()}] {x}")

    def flush():
        return "\n".join(logs)

    if not (prompt or "").strip():
        yield None, "empty prompt", "idle"
        return

    steps = min(int(steps), 8)
    cfg = min(float(cfg), 1.5)
    # UI number widgets can deliver floats (e.g. 42.0); the CLI expects
    # plain integers, so normalise before building the command line.
    width, height = int(w), int(h)
    seed = -1 if seed is None else int(seed)

    log("setup sd-cli")
    yield None, flush(), "setup"
    try:
        sd, lib = setup_sd()
        diffusion, llm, vae = ensure_models()
    except Exception as e:
        yield None, str(e), "error"
        return

    out = os.path.join(OUTPUT_DIR, f"{uuid.uuid4().hex}.png")
    cmd = [
        sd,
        "--diffusion-model", diffusion,
        "--vae", vae,
        "--llm", llm,
        "--cfg-scale", str(cfg),
        "--steps", str(steps),
        "-W", str(width),
        "-H", str(height),
        "--threads", str(N_THREADS),
        "--sampling-method", sampler,
        "--clip-on-cpu",
        "--vae-on-cpu",
        "--vae-tiling",
        "--negative-prompt", negative or DEFAULT_NEG,
        "-p", prompt,
        "-o", out,
    ]
    if seed != -1:
        cmd += ["--seed", str(seed)]

    env = os.environ.copy()
    # Put the binary's directory first so its bundled shared libraries are
    # found. Empty entries are dropped: a trailing ":" would make the
    # dynamic loader search the current working directory as well.
    env["LD_LIBRARY_PATH"] = ":".join(
        part for part in (lib, env.get("LD_LIBRARY_PATH", "")) if part
    )

    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            env=env,
        )
    except OSError as e:
        log(f"failed to start sd-cli: {e}")
        yield None, flush(), "error"
        return

    q = queue.Queue()

    def reader():
        # Forward output lines to the queue; None marks end of stream.
        for line in proc.stdout:
            q.put(line)
        q.put(None)

    threading.Thread(target=reader, daemon=True).start()

    try:
        while True:
            if _cancel.is_set():
                proc.kill()
                yield None, flush(), "cancelled"
                return
            try:
                # Short timeout so the cancel flag is polled regularly even
                # while sd-cli is silent.
                item = q.get(timeout=2)
            except queue.Empty:
                yield None, flush(), "running"
                continue
            if item is None:
                break
            log(item.strip())
            yield None, flush(), "running"
        returncode = proc.wait()
    finally:
        # Reached on normal completion, on cancel, and when the generator is
        # closed early by the caller: never leave sd-cli running or unreaped.
        if proc.poll() is None:
            proc.kill()
        proc.wait()

    if returncode != 0:
        log(f"sd-cli exited with code {returncode}")
    if not os.path.exists(out):
        yield None, flush(), "error"
        return
    yield out, flush(), "done"
def cancel():
    """Raise the shared cancel flag and return the text to show in the UI."""
    _cancel.set()
    message = "cancelling"
    return message
def ui():
    """Build and return the Gradio Blocks app.

    ``generate`` yields ``(image, log_text, status)`` tuples, so it is wired
    to three distinct outputs. Listing the log textbox twice made the status
    string overwrite the log on every update; the status now has its own
    component, which the Cancel button also writes to.
    """
    with gr.Blocks() as app:
        p = gr.Textbox(lines=4)
        n = gr.Textbox(value=DEFAULT_NEG)
        steps = gr.Slider(2, 8, value=8)
        cfg = gr.Slider(0.5, 1.5, value=1.0)
        w = gr.Slider(256, 1024, value=512, step=64)
        h = gr.Slider(256, 1024, value=512, step=64)
        sampler = gr.Dropdown(["euler", "heun", "dpm++2m"], value="euler")
        seed = gr.Number(value=-1)
        out = gr.Image()
        log = gr.Textbox(lines=10, label="Log")
        status = gr.Textbox(label="Status", interactive=False)
        btn = gr.Button("Generate")
        stop = gr.Button("Cancel")
        evt = btn.click(
            generate,
            [p, n, steps, cfg, w, h, sampler, seed],
            [out, log, status],
        )
        # Sets the cancel flag and also asks Gradio to stop the running event.
        stop.click(cancel, None, status, cancels=[evt])
    return app
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT (default 7860).
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", 7860)),
        share=False,
        show_error=True,
        inbrowser=False,
        prevent_thread_lock=False,
        quiet=False,
    )
    app = ui()
    # Allow at most 4 requests to wait in the queue.
    queued_app = app.queue(max_size=4)
    queued_app.launch(**launch_options)