Spaces:
Running
Running
| import os | |
| import random | |
| import threading | |
| import gradio as gr | |
| import torch | |
| from diffusers import Flux2KleinPipeline, Flux2Transformer2DModel, GGUFQuantizationConfig | |
| from huggingface_hub import hf_hub_download | |
| from PIL import Image | |
# Hugging Face repositories: the GGUF-quantized transformer weights and the
# base repo passed as `config=` / `from_pretrained` source when loading.
GGUF_REPO = "unsloth/FLUX.2-klein-base-4B-GGUF"
BASE_REPO = "black-forest-labs/FLUX.2-klein-base-4B"

# Runtime knobs, each overridable through an environment variable of the
# same name.
GGUF_FILE = os.environ.get("GGUF_FILE", "flux-2-klein-base-4b-Q2_K.gguf")
MAX_SIDE = int(os.environ.get("MAX_SIDE", "768"))
DEFAULT_STEPS = int(os.environ.get("DEFAULT_STEPS", "8"))
DEFAULT_GUIDANCE = float(os.environ.get("DEFAULT_GUIDANCE", "3.5"))

# Lazily initialised pipeline state shared by all requests:
#   _pipe        - the loaded pipeline, or None until the first successful load
#   _pipe_error  - message of the first load failure, or None
#   _lock        - serialises the one-time load in get_pipe()
_pipe = None
_pipe_error = None
_lock = threading.Lock()
def _prepare_image(img: Image.Image) -> Image.Image:
    """Return an RGB copy of *img* resized to pipeline-friendly dimensions.

    The longest side is scaled down to at most ``MAX_SIDE`` (images already
    within the limit are not scaled). Each side is then rounded down to a
    multiple of 32 and floored at 256 pixels. The two sides are snapped
    independently, so the aspect ratio may shift slightly.
    """
    rgb = img.convert("RGB")
    width, height = rgb.size

    longest = max(width, height)
    if longest > MAX_SIDE:
        scale = MAX_SIDE / longest
        width, height = int(width * scale), int(height * scale)

    # Snap each side down to a multiple of 32, but never below 256.
    width = max(256, width - width % 32)
    height = max(256, height - height % 32)

    return rgb.resize((width, height), Image.Resampling.LANCZOS)
def get_pipe() -> Flux2KleinPipeline:
    """Return the shared FLUX.2 klein pipeline, loading it on first use.

    The first call downloads the GGUF transformer file, builds the
    transformer and the pipeline in float32, moves the pipeline to the CPU
    and enables attention slicing (plus VAE slicing when available). The
    result is cached in the module-level ``_pipe``.

    A failed load is cached as well: the first failure re-raises the original
    exception, and every later call raises ``RuntimeError`` carrying the
    recorded message instead of retrying the load.

    Returns:
        The loaded ``Flux2KleinPipeline``.

    Raises:
        RuntimeError: if a previous load attempt already failed.
        Exception: whatever the download or model construction raised on the
            first failed attempt.
    """
    global _pipe, _pipe_error

    # Fast path: once the pipeline exists no locking is needed.
    if _pipe is not None:
        return _pipe

    with _lock:
        # Re-check BOTH cached outcomes while holding the lock. A thread that
        # was waiting here while another thread's load failed must not retry
        # the load; otherwise `_pipe` and `_pipe_error` could end up set at
        # the same time.
        if _pipe is not None:
            return _pipe
        if _pipe_error is not None:
            raise RuntimeError(_pipe_error)

        try:
            hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")

            # Pass the token to the download too, so the GGUF fetch uses the
            # same credentials as the model loads below.
            gguf_path = hf_hub_download(
                repo_id=GGUF_REPO,
                filename=GGUF_FILE,
                token=hf_token,
            )

            qconfig = GGUFQuantizationConfig(compute_dtype=torch.float32)
            transformer = Flux2Transformer2DModel.from_single_file(
                gguf_path,
                config=BASE_REPO,
                subfolder="transformer",
                token=hf_token,
                quantization_config=qconfig,
                torch_dtype=torch.float32,
            )

            pipe = Flux2KleinPipeline.from_pretrained(
                BASE_REPO,
                transformer=transformer,
                token=hf_token,
                torch_dtype=torch.float32,
            )
            pipe = pipe.to("cpu")
            pipe.set_progress_bar_config(disable=True)
            pipe.enable_attention_slicing()
            if hasattr(pipe, "enable_vae_slicing"):
                pipe.enable_vae_slicing()

            _pipe = pipe
        except Exception as e:
            # Include the exception type so the cached message is never
            # empty, even when str(e) is "".
            _pipe_error = f"{type(e).__name__}: {e}"
            raise

    return _pipe
def run_edit(
    image: Image.Image,
    prompt: str,
    steps: int = DEFAULT_STEPS,
    guidance: float = DEFAULT_GUIDANCE,
    seed: int = -1,
):
    """Run the pipeline for one request and return the first output image.

    Args:
        image: Optional source image. When given it is normalised with
            ``_prepare_image`` and its size is used as the output size; when
            ``None`` the output is 768x768.
        prompt: Instruction text; must contain non-whitespace characters.
        steps: Number of inference steps (at least 1 is always used).
            ``None`` falls back to ``DEFAULT_STEPS``.
        guidance: Guidance scale. ``None`` falls back to
            ``DEFAULT_GUIDANCE``.
        seed: Seed for the CPU generator. ``None`` or any negative value
            selects a random seed; non-integer numbers are truncated.

    Returns:
        The first image of the pipeline output.

    Raises:
        gr.Error: if the prompt is empty, the pipeline cannot be loaded, or
            generation fails.
    """
    if not prompt or not prompt.strip():
        raise gr.Error("Escreve uma instrucao.")

    try:
        pipe = get_pipe()
    except Exception as exc:
        # Chain explicitly so the real load failure stays in the traceback.
        raise gr.Error("Falha ao carregar FLUX.2-klein-base-4B-GGUF no CPU Basic.") from exc

    src = _prepare_image(image) if image is not None else None
    if src is not None:
        width, height = src.size
    else:
        width, height = 768, 768

    # The UI number field can deliver None (cleared field) or a float;
    # torch's manual_seed needs an int, so normalise before comparing.
    seed = -1 if seed is None else int(seed)
    if seed < 0:
        seed = random.randint(0, 2**31 - 1)
    generator = torch.Generator(device="cpu").manual_seed(seed)

    try:
        out = pipe(
            prompt=prompt.strip(),
            image=src,
            height=height,
            width=width,
            num_inference_steps=max(1, int(steps if steps is not None else DEFAULT_STEPS)),
            guidance_scale=float(guidance if guidance is not None else DEFAULT_GUIDANCE),
            generator=generator,
        ).images[0]
        return out
    except Exception as exc:
        raise gr.Error("Falha na geracao. Tenta imagem menor e menos steps.") from exc
# Gradio UI: optional input image and result side by side, an instruction box,
# a row of generation controls, and a button wired to run_edit().
# NOTE(review): the source had lost its indentation; the nesting of the
# `with` blocks below is reconstructed - confirm against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# FLUX.2-klein-base-4B-GGUF local (CPU Basic)")
    gr.Markdown("Modelo: unsloth/FLUX.2-klein-base-4B-GGUF (Q2_K por default).")

    with gr.Row():
        inp = gr.Image(type="pil", label="Imagem (opcional)")
        out = gr.Image(type="pil", label="Resultado")

    prompt = gr.Textbox(lines=3, label="Instrucao")

    with gr.Row():
        steps = gr.Slider(minimum=1, maximum=20, value=DEFAULT_STEPS, step=1, label="Steps")
        guidance = gr.Slider(minimum=1.0, maximum=8.0, value=DEFAULT_GUIDANCE, step=0.1, label="Guidance")
        # precision=0 makes the component hand an int to run_edit; the seed
        # is passed to torch.Generator.manual_seed, which needs an integer.
        seed = gr.Number(value=-1, precision=0, label="Seed (-1 aleatorio)")

    run_btn = gr.Button("Gerar")
    run_btn.click(run_edit, inputs=[inp, prompt, steps, guidance, seed], outputs=out)


if __name__ == "__main__":
    # Queue with a default concurrency limit of 1, and surface errors in the UI.
    demo.queue(default_concurrency_limit=1).launch(show_error=True)