Spaces:
Running
Running
| """FLUX.2 Klein 4B - Free CPU Space with dynamic LoRA search from HuggingFace Hub""" | |
| import os, time, gc, shutil | |
| from pathlib import Path | |
| from PIL import Image | |
| import requests as req | |
| # --------------------------------------------------------------------------- | |
| # Thread config (cgroup-aware) | |
| # --------------------------------------------------------------------------- | |
def get_cpu_count() -> int:
    """Best-effort count of CPUs actually available to this container.

    Checks cgroup v2 (cpu.max), then cgroup v1 (cfs quota/period), and
    finally falls back to os.cpu_count(). Always returns at least 1.
    """
    # cgroup v2: one file holding "quota period"; quota is "max" when unlimited
    try:
        with open("/sys/fs/cgroup/cpu.max") as fh:
            quota, period = fh.read().strip().split()
        if quota != "max":
            return max(1, int(quota) // int(period))
    except Exception:
        pass
    # cgroup v1: quota/period live in separate files; quota <= 0 means unlimited
    try:
        with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as fh:
            quota_us = int(fh.read().strip())
        with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as fh:
            period_us = int(fh.read().strip())
        if quota_us > 0:
            return max(1, quota_us // period_us)
    except Exception:
        pass
    # No cgroup limits visible: trust the host count (or assume 2).
    return max(1, os.cpu_count() or 2)

N_THREADS = get_cpu_count()
# Cap the BLAS/OpenMP thread pools to the real CPU budget (setdefault so an
# explicit env override wins).
for var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"):
    os.environ.setdefault(var, str(N_THREADS))
print(f"[init] CPU threads: {N_THREADS}")
| # --------------------------------------------------------------------------- | |
| # Model resolution | |
| # --------------------------------------------------------------------------- | |
# Root of the HF hub cache; HF_HOME (if set) is used as the scan root.
HF_CACHE = Path(os.environ.get("HF_HOME", Path.home() / ".cache" / "huggingface" / "hub"))

def find_model(filename: str) -> str:
    """Locate a model file: cwd first, then ./models, then the HF hub cache.

    Args:
        filename: Bare file name to look for (no directories).

    Returns:
        Path to the first match, as a string.

    Raises:
        FileNotFoundError: if the file is nowhere to be found. BUG FIX: the
        original message was the literal text "(unknown)" (an f-string with
        no placeholder) and never named the missing file.
    """
    for d in [Path("."), Path("models")]:
        candidate = d / filename
        if candidate.exists():
            return str(candidate)
    # Fall back to scanning the hub cache recursively; take the first hit.
    for p in HF_CACHE.rglob(filename):
        return str(p)
    raise FileNotFoundError(f"Not found: {filename}")
| # --------------------------------------------------------------------------- | |
| # Load base models | |
| # --------------------------------------------------------------------------- | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| from stable_diffusion_cpp import StableDiffusion | |
# File names of the three GGUF/safetensors artifacts the engine needs.
DIFFUSION_FILE = "flux-2-klein-4b-Q4_K_M.gguf"
LLM_FILE = "qwen3-4b-abl-q4_0.gguf"
VAE_FILE = "flux2-vae.safetensors"

print("[init] Locating models...")
diffusion_path = find_model(DIFFUSION_FILE)
vae_path = find_model(VAE_FILE)
# The text encoder is pulled from the Hub the first time the Space boots.
try:
    llm_path = find_model(LLM_FILE)
except FileNotFoundError:
    print("[init] Downloading uncensored text encoder...")
    llm_path = hf_hub_download(
        repo_id="WeReCooking/flux2-klein-4B-uncensored-text-encoder",
        filename=LLM_FILE,
    )
for tag, resolved in (("Diffusion", diffusion_path), ("LLM", llm_path), ("VAE", vae_path)):
    print(f"[init] {tag}: {resolved}")
| # --------------------------------------------------------------------------- | |
| # LoRA management | |
| # --------------------------------------------------------------------------- | |
# Downloaded LoRA adapters are copied into /tmp so sd.cpp can scan one flat
# directory. DOWNLOADED_LORAS maps "repo_id/filename" labels to the
# sanitized names used in <lora:NAME:w> prompt tags.
LORA_DIR = "/tmp/loras"
os.makedirs(LORA_DIR, exist_ok=True)
DOWNLOADED_LORAS: dict[str, str] = {}
def fetch_all_loras(query: str = "") -> list[str]:
    """Search the HuggingFace Hub for FLUX.2 Klein 4B LoRA repos.

    Args:
        query: Extra search terms appended to the base "klein 4b" query.

    Returns:
        Matching repo ids sorted by downloads (most first), capped at 50;
        an empty list on any network/API error (best-effort by design).
    """
    search = f"klein 4b {query}".strip()
    try:
        r = req.get(
            "https://huggingface.co/api/models",
            params={
                "search": search, "filter": "lora",
                "sort": "downloads", "direction": "-1", "limit": 50,
            },
            timeout=10,
        )
        r.raise_for_status()
        # Keep repos tagged as LoRAs or with "lora" in the repo id.
        # (Fix: dropped the redundant `results if results else []` tail.)
        return [
            m.get("id", "")
            for m in r.json()
            if "lora" in m.get("tags", []) or "lora" in m.get("id", "").lower()
        ]
    except Exception as e:
        # Degrade to an empty catalog rather than crashing the UI.
        print(f"[lora] Search error: {e}")
        return []
def download_lora(repo_id: str) -> tuple[str, str]:
    """Download the most LoRA-looking .safetensors from a repo into LORA_DIR.

    Args:
        repo_id: HF repo id selected in the UI; placeholder values starting
            with "(" are rejected.

    Returns:
        (label, status): label is "repo_id/filename" on success or "" on
        failure; status is a human-readable message for the UI.
    """
    if not repo_id or repo_id.startswith("("):
        return "", "Select a LoRA first"
    try:
        files = list_repo_files(repo_id)
        sf_files = [f for f in files if f.endswith(".safetensors")]
        if not sf_files:
            return "", f"No .safetensors found in {repo_id}"
        # Prefer files that look like adapters; otherwise take the first.
        target = next(
            (f for f in sf_files if "lora" in f.lower() or "adapter" in f.lower()),
            sf_files[0],
        )
        label = f"{repo_id}/{target}"
        # sd.cpp matches <lora:NAME:w> prompt tags against file stems, so
        # sanitize the label into a safe, extension-free name.
        lora_name = label.replace("/", "_").replace("-", "_").replace(".", "_")
        lora_name = lora_name.rsplit("_safetensors", 1)[0]
        lora_dst = os.path.join(LORA_DIR, f"{lora_name}.safetensors")
        # Robustness fix: only report "cached" when the file actually still
        # exists (/tmp can be cleared under us); otherwise re-download.
        if label in DOWNLOADED_LORAS and os.path.exists(lora_dst):
            size_mb = os.path.getsize(lora_dst) / 1024**2
            return label, f"Already cached ({size_mb:.0f} MB)"
        print(f"[lora] Downloading {repo_id}/{target}...")
        src = hf_hub_download(repo_id=repo_id, filename=target)
        shutil.copy2(src, lora_dst)
        size_mb = os.path.getsize(lora_dst) / 1024**2
        DOWNLOADED_LORAS[label] = lora_name
        print(f"[lora] Downloaded: {label} ({size_mb:.0f} MB)")
        return label, f"Downloaded: {label} ({size_mb:.0f} MB)"
    except Exception as e:
        return "", f"Failed: {e}"
| # --------------------------------------------------------------------------- | |
| # Engine | |
| # --------------------------------------------------------------------------- | |
# Engine singleton. "lora_state" records the set of LoRA files present at
# load time so we only pay the (very slow) reload when that set changes.
SD_ENGINE = {"instance": None, "lora_state": None}

def _reload_engine():
    """(Re)build the StableDiffusion engine if the LoRA set changed.

    No-op when an engine is already loaded with the current LoRA files.
    """
    lora_files = set(os.listdir(LORA_DIR)) if os.path.exists(LORA_DIR) else set()
    state_key = frozenset(lora_files)
    if SD_ENGINE["instance"] is not None and SD_ENGINE["lora_state"] == state_key:
        return
    # Fix: release the old instance *before* constructing the new one, so we
    # never hold two multi-GB model sets in RAM at once (critical on a free
    # CPU Space with tight memory limits).
    if SD_ENGINE["instance"] is not None:
        SD_ENGINE["instance"] = None
        gc.collect()
    print(f"[engine] Loading (loras: {len(lora_files)})...")
    t0 = time.time()
    kwargs = dict(
        diffusion_model_path=diffusion_path, llm_path=llm_path, vae_path=vae_path,
        diffusion_flash_attn=True, n_threads=N_THREADS, verbose=True,
    )
    if lora_files:
        kwargs["lora_model_dir"] = LORA_DIR
    SD_ENGINE["instance"] = StableDiffusion(**kwargs)
    SD_ENGINE["lora_state"] = state_key
    print(f"[engine] Loaded in {time.time()-t0:.1f}s")

def get_engine():
    """Return the lazily-initialized engine singleton."""
    if SD_ENGINE["instance"] is None:
        _reload_engine()
    return SD_ENGINE["instance"]
# Warm-load the engine and the LoRA catalog at startup so the first user
# request does not pay the multi-minute model load.
_reload_engine()
print("[init] Fetching Klein 4B LoRA catalog...")
INITIAL_LORAS = fetch_all_loras("")
print(f"[init] Found {len(INITIAL_LORAS)} LoRAs")
| # --------------------------------------------------------------------------- | |
| # Inference | |
| # --------------------------------------------------------------------------- | |
# Supported "WIDTHxHEIGHT" output sizes; kept modest for CPU inference.
RESOLUTIONS = ["512x512", "768x768", "1024x1024", "1024x768", "768x1024", "1024x576", "576x1024"]

def parse_res(s):
    """Parse a "WxH" resolution string into an (int width, int height) pair."""
    width, height = (int(part) for part in s.split("x"))
    return width, height
def generate(prompt, ref_image, resolution, steps, seed, lora_strength, active_loras, progress=None):
    """Run text-to-image generation, or image editing when ref_image is set.

    Args:
        prompt: User prompt text.
        ref_image: Optional PIL image; its presence switches to edit mode.
        resolution: "WxH" string from the dropdown.
        steps, seed: Sampler settings; any negative seed is coerced to -1
            (random).
        lora_strength: Weight applied to every active LoRA tag.
        active_loras: Labels of downloaded LoRAs to activate.
        progress: Accepted for Gradio compatibility; unused.

    Returns:
        (PIL image or None, status string). Every exception is caught and
        reported through the status string so the UI never crashes.
    """
    try:
        gc.collect()  # reclaim RAM before a multi-GB inference pass
        sd = get_engine()
        w, h = parse_res(resolution)
        steps = int(steps)
        seed = int(seed) if int(seed) >= 0 else -1
        # Prepend one <lora:name:weight> tag per active adapter; sd.cpp
        # parses these tags back out of the prompt text.
        actual_prompt = prompt
        lora_tags = []
        for label in active_loras or []:
            lora_name = DOWNLOADED_LORAS.get(label)
            if lora_name:
                actual_prompt = f'<lora:{lora_name}:{lora_strength:.2f}> {actual_prompt}'
                lora_tags.append(label.split("/")[-1])
        is_edit = ref_image is not None
        mode = "edit" if is_edit else "gen"
        print(f"[{mode}] {w}x{h} steps={steps} seed={seed} loras={lora_tags}")
        t0 = time.time()
        kwargs = dict(prompt=actual_prompt, width=w, height=h, sample_steps=steps, cfg_scale=1.0, seed=seed)
        if is_edit:
            kwargs["ref_images"] = [ref_image]
        images = sd.generate_image(**kwargs)
        elapsed = time.time() - t0
        lora_info = f" +{len(lora_tags)} LoRA(s)" if lora_tags else ""
        edit_info = " [edit]" if is_edit else ""
        status = f"{elapsed:.1f}s | {w}x{h}, {steps} steps, seed {seed}{lora_info}{edit_info}"
        print(f"[{mode}] {status}")
        return (images[0] if images else None), status
    except Exception as e:
        import traceback; traceback.print_exc()
        return None, f"Error: {e}"
| # --------------------------------------------------------------------------- | |
| # Gradio UI | |
| # --------------------------------------------------------------------------- | |
import gradio as gr

with gr.Blocks(theme="NoCrypt/miku", title="FLUX.2 Klein 4B CPU") as demo:
    gr.Markdown(
        "# FLUX.2 Klein 4B / Free CPU\n"
        "Type a prompt to generate. Upload a reference image to edit it instead. "
        "Expect **15-30 min** per image at 512x512 on free CPU."
    )
    with gr.Row():
        # Left column: every input control.
        with gr.Column(scale=1):
            prompt = gr.Textbox(label="Prompt", lines=3, placeholder="Describe what to generate or edit...")
            ref_image = gr.Image(label="Reference Image (optional, for editing)", type="pil")
            resolution = gr.Dropdown(choices=RESOLUTIONS, value="512x512", label="Resolution")
            with gr.Row():
                steps = gr.Slider(2, 8, value=4, step=1, label="Steps", scale=1)
                seed = gr.Number(value=-1, label="Seed", precision=0, scale=1)
                lora_strength = gr.Slider(0.1, 1.5, value=0.8, step=0.05, label="LoRA str", scale=1)
            with gr.Accordion("LoRA (search Klein 4B LoRAs on HuggingFace)", open=False):
                # Searchable dropdown doubles as a free-text search box.
                lora_search = gr.Dropdown(
                    choices=INITIAL_LORAS, value=None,
                    label="Search LoRA repos (type to filter, select to download)",
                    filterable=True, allow_custom_value=True, interactive=True,
                )
                lora_status = gr.Textbox(label="Status", interactive=False, value="No LoRA active")
                active_loras = gr.Dropdown(
                    choices=[], value=[], multiselect=True, interactive=True,
                    label="Active LoRAs (click X to remove)",
                )
            gen_btn = gr.Button("Generate / Edit", variant="primary", size="lg")
        # Right column: outputs.
        with gr.Column(scale=1):
            output_image = gr.Image(label="Output", type="pil")
            status_text = gr.Textbox(label="Status", interactive=False)

    def on_search_type(query):
        """Live-search the Hub as the user types; keep the startup catalog
        when the query is empty, already a known repo, or yields nothing."""
        if not query or query in INITIAL_LORAS:
            return gr.update(choices=INITIAL_LORAS)
        results = fetch_all_loras(query)
        return gr.update(choices=results if results else INITIAL_LORAS)

    def on_lora_select(repo_id, current_active):
        """Download the selected LoRA, activate it, and clear the search box."""
        if not repo_id or repo_id.startswith("("):
            return current_active or [], "Select a LoRA", gr.update()
        label, status_msg = download_lora(repo_id)
        if not label:
            return current_active or [], status_msg, gr.update()
        _reload_engine()  # engine must be rebuilt to see the new LoRA file
        active = list(current_active) if current_active else []
        if label not in active:
            active.append(label)
        all_downloaded = list(DOWNLOADED_LORAS.keys())
        return gr.update(choices=all_downloaded, value=active), status_msg, gr.update(value=None)

    # Wiring: typing filters the catalog, selecting downloads + activates,
    # and the main button drives generation/editing.
    lora_search.input(fn=on_search_type, inputs=[lora_search], outputs=[lora_search])
    lora_search.select(fn=on_lora_select, inputs=[lora_search, active_loras], outputs=[active_loras, lora_status, lora_search])
    gen_btn.click(fn=generate, inputs=[prompt, ref_image, resolution, steps, seed, lora_strength, active_loras], outputs=[output_image, status_text])
    gr.Markdown("---\nsd.cpp Q4_K_M | Uncensored encoder | "
                "[BFL](https://bfl.ai/models/flux-2-klein) | [sd.cpp](https://github.com/leejet/stable-diffusion.cpp) | "
                "[Browse LoRAs](https://huggingface.co/models?search=klein+4b&filter=lora)")

demo.queue().launch(ssr_mode=False, show_error=True)