Spaces:
Sleeping
Sleeping
| import os | |
| import gc | |
| import time | |
| import random | |
| import torch | |
| import gradio as gr | |
# =====================================================
# EXTREME CPU + RAM CONTROL - ULTIMATE OPTIMIZATION
# =====================================================
CPU_THREADS = 1  # Minimum safe value for HF Spaces
MAX_RESOLUTION = 512  # upper bound for the requested width/height, in pixels
MAX_STEPS = 4  # upper bound for the number of inference steps per request
CACHE_DIR = "./hf_cache"  # single on-disk cache shared by every download below

# Process-wide environment: no visible CUDA devices, no hub telemetry, no
# hf_transfer downloader, single-threaded OpenMP/MKL/tokenizers, and the
# Hugging Face caches redirected to CACHE_DIR.
os.environ.update(
    {
        "CUDA_VISIBLE_DEVICES": "",
        "HF_HUB_DISABLE_TELEMETRY": "1",
        "HF_HUB_ENABLE_HF_TRANSFER": "0",
        "OMP_NUM_THREADS": str(CPU_THREADS),
        "MKL_NUM_THREADS": str(CPU_THREADS),
        "TOKENIZERS_PARALLELISM": "false",
        "TRANSFORMERS_CACHE": CACHE_DIR,
        "HF_DATASETS_CACHE": CACHE_DIR,
    }
)

torch.set_num_threads(CPU_THREADS)
torch.set_grad_enabled(False)  # inference only; never build autograd graphs
# torch only accepts "highest", "high" or "medium" here. The previous value
# ("lowest") is not a valid setting, so the request for reduced precision was
# never applied; "medium" is the lowest precision torch offers.
torch.set_float32_matmul_precision("medium")

DEVICE = "cpu"
DTYPE = torch.float16  # CRITICAL: Use float16 to save 50% memory
# NOTE(review): float16 kernels on CPU can be slow or missing depending on the
# torch build; torch.bfloat16 may be the safer half-precision choice - confirm.

os.makedirs(CACHE_DIR, exist_ok=True)
print("β‘ Z-Image Turbo ULTRA CPU - EXTREME MODE (HF Spaces 16GB)")
# =====================================================
# MINIMAL IMPORTS
# =====================================================
| try: | |
| from huggingface_hub import hf_hub_download | |
| from diffusers import ( | |
| ZImagePipeline, | |
| ZImageTransformer2DModel, | |
| GGUFQuantizationConfig, | |
| AutoencoderKL, | |
| FlowMatchEulerDiscreteScheduler | |
| ) | |
| from transformers import ( | |
| AutoTokenizer, | |
| CLIPTextModel, | |
| BertModel, | |
| BertTokenizer | |
| ) | |
| except ImportError as e: | |
| print(f"β οΈ Import error (models may not load): {e}") | |
# =====================================================
# GLOBAL PIPELINE STATE (Lazy Loading)
# =====================================================
# Plain boolean guard: load_pipeline() sets it to True while it is building the
# pipeline and resets it to False when it finishes (successfully or not).
_pipe_lock = False

# Cached pipeline instance. Stays None until load_pipeline() has built it on
# the first request; later calls reuse the same object.
pipe = None
# =====================================================
# LIGHTWEIGHT TEXT ENCODER LOADER
# =====================================================
def load_text_encoder_lightweight():
    """Load a small tokenizer/text-encoder pair, preferring CLIP over BERT-tiny.

    The CLIP ViT-B/32 tokenizer and text model are tried first. If anything
    goes wrong while importing or loading them, the ``prajjwal1/bert-tiny``
    tokenizer and model are loaded instead. Model weights are loaded with
    ``DTYPE`` and everything is cached under ``CACHE_DIR``.

    Returns:
        A ``(tokenizer, text_encoder)`` tuple.

    Raises:
        Exception: whatever the BERT-tiny fallback raised, re-raised unchanged
            after both attempts have failed.
    """
    # NOTE(review): the pipeline these encoders are passed to is loaded from
    # "Tongyi-MAI/Z-Image-Turbo"; presumably it expects that repository's own
    # text encoder, so CLIP/BERT embeddings may not be compatible - confirm.
    print("π Loading lightweight text encoder...")

    # Options shared by both model loads.
    model_options = {
        "torch_dtype": DTYPE,
        "low_cpu_mem_usage": True,
        "cache_dir": CACHE_DIR,
    }

    try:
        # Try tiny CLIP first
        from transformers import CLIPTokenizer, CLIPTextModel

        clip_repo = "openai/clip-vit-base-patch32"
        clip_tokenizer = CLIPTokenizer.from_pretrained(
            clip_repo,
            cache_dir=CACHE_DIR,
            local_files_only=False,
        )
        clip_encoder = CLIPTextModel.from_pretrained(
            clip_repo,
            local_files_only=False,
            **model_options,
        )
        return clip_tokenizer, clip_encoder
    except Exception as clip_error:
        print(f"β οΈ CLIP failed: {clip_error}, using fallback...")

        # Fallback: Use BERT-tiny (much smaller)
        from transformers import AutoTokenizer, AutoModel

        bert_repo = "prajjwal1/bert-tiny"
        try:
            bert_tokenizer = AutoTokenizer.from_pretrained(
                bert_repo,
                cache_dir=CACHE_DIR,
            )
            bert_encoder = AutoModel.from_pretrained(bert_repo, **model_options)
            return bert_tokenizer, bert_encoder
        except Exception as bert_error:
            print(f"β Both encoders failed: {bert_error}")
            raise
# =====================================================
# LAZY-LOADED PIPELINE WITH MEMORY CONTROL
# =====================================================
def load_pipeline():
    """Build the Z-Image pipeline on first use and return the cached instance.

    The scheduler and VAE come from ``Tongyi-MAI/Z-Image-Turbo``, the text
    encoder from ``load_text_encoder_lightweight()``, and the transformer from
    the ``z-image-turbo-Q2_K.gguf`` file of ``unsloth/Z-Image-Turbo-GGUF``.
    The assembled pipeline is moved to ``DEVICE``, configured for low memory
    use and stored in the module-level ``pipe``.

    Returns:
        The loaded pipeline (the same object on every later call).

    Raises:
        gr.Error: if another load is already in progress, or if any step of
            the load fails (the original exception is chained as the cause).
    """
    global pipe, _pipe_lock

    if pipe is not None:
        return pipe
    if _pipe_lock:
        raise gr.Error("Pipeline already loading. Please wait...")

    _pipe_lock = True
    try:
        base_repo = "Tongyi-MAI/Z-Image-Turbo"

        print("β‘ Loading scheduler...")
        # Schedulers hold no weights, so `low_cpu_mem_usage` is not passed here.
        scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained(
            base_repo,
            subfolder="scheduler",
            cache_dir=CACHE_DIR,
        )

        print("β‘ Loading VAE (memory-optimized)...")
        vae_options = {
            "subfolder": "vae",
            "torch_dtype": DTYPE,
            "low_cpu_mem_usage": True,
            "cache_dir": CACHE_DIR,
        }
        try:
            # Prefer the smaller fp16 weight files when the repo ships them.
            vae = AutoencoderKL.from_pretrained(
                base_repo, variant="fp16", **vae_options
            )
        except (OSError, ValueError):
            # No fp16 variant published: load the default weights instead;
            # `torch_dtype` still selects DTYPE for the loaded model.
            vae = AutoencoderKL.from_pretrained(base_repo, **vae_options)

        print("β‘ Loading text encoder (lightweight)...")
        tokenizer, text_encoder = load_text_encoder_lightweight()

        print("β‘ Loading transformer (GGUF quantized)...")
        # `resume_download` is deprecated in huggingface_hub (interrupted
        # downloads resume automatically), so it is no longer passed.
        gguf_path = hf_hub_download(
            repo_id="unsloth/Z-Image-Turbo-GGUF",
            filename="z-image-turbo-Q2_K.gguf",
            cache_dir=CACHE_DIR,
            local_files_only=False,
        )
        transformer = ZImageTransformer2DModel.from_single_file(
            gguf_path,
            quantization_config=GGUFQuantizationConfig(compute_dtype=DTYPE),
            torch_dtype=DTYPE,
            low_cpu_mem_usage=True,
        )

        # Build into a local first: the global is only published once every
        # configuration step below has succeeded, so a failure can never leave
        # a half-configured pipeline cached for later requests.
        new_pipe = ZImagePipeline(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            transformer=transformer,
            scheduler=scheduler,
        ).to(DEVICE)

        # EXTREME memory optimization
        new_pipe.enable_attention_slicing()
        # Slicing/tiling are enabled on the VAE itself rather than through the
        # pipeline-level wrappers, which not every pipeline class provides.
        new_pipe.vae.enable_slicing()
        new_pipe.vae.enable_tiling()
        new_pipe.set_progress_bar_config(disable=True)

        # Explicitly set to eval mode
        new_pipe.vae.eval()
        new_pipe.text_encoder.eval()
        new_pipe.transformer.eval()

        pipe = new_pipe
        print("β Pipeline loaded successfully")
        return pipe
    except Exception as e:
        print(f"β Pipeline load failed: {e}")
        raise gr.Error(f"Failed to load model: {str(e)}") from e
    finally:
        # Always release the guard so a failed load can be retried.
        _pipe_lock = False
# =====================================================
# ULTRA-OPTIMIZED GENERATION
# =====================================================
def generate(prompt, width, height, steps, seed, progress=gr.Progress()):
    """Generate one image for *prompt* and return it with the seed that was used.

    Args:
        prompt: Text description of the image; must not be empty or blank.
        width: Requested width; clamped to 256..MAX_RESOLUTION and rounded
            down to a multiple of 64.
        height: Requested height; clamped and rounded like ``width``.
        steps: Requested inference steps; clamped to 1..MAX_STEPS.
        seed: Seed for the random generator. ``None``, an empty string or a
            negative value selects a random seed.
        progress: Gradio progress tracker, updated after every step.

    Returns:
        A ``(image, seed)`` tuple: the first generated PIL image and the
        integer seed actually used.

    Raises:
        gr.Error: for an empty prompt, a pipeline load failure, an
            out-of-memory condition or any other generation failure.
    """
    if not prompt or not prompt.strip():
        raise gr.Error("β Prompt is required")

    # HARD safety limits for HF Spaces
    width = max(256, min(int(width), MAX_RESOLUTION))
    height = max(256, min(int(height), MAX_RESOLUTION))
    steps = max(1, min(int(steps), MAX_STEPS))

    # Reduce to multiple of 64
    width = (width // 64) * 64
    height = (height // 64) * 64

    # A cleared Number field arrives as None (or ""), which must be checked
    # before any numeric comparison: `None < 0` raises TypeError.
    if seed is None or seed == "" or int(seed) < 0:
        seed = random.randint(0, 2**31 - 1)
    else:
        seed = int(seed)

    # Pre-generation cleanup
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    try:
        # Load pipeline on first use
        pipeline = load_pipeline()
        generator = torch.Generator(device=DEVICE).manual_seed(seed)
        start_time = time.time()

        def on_step_end(_pipeline, step, timestep, callback_kwargs):
            """Report progress and a running ETA after each denoising step."""
            done = step + 1
            elapsed = time.time() - start_time
            # Average time per finished step, also valid for the first step.
            remaining = (elapsed / done) * max(steps - done, 0)
            progress(
                min(done / steps, 1.0),
                desc=f"Step {done}/{steps} | ETA: {remaining:.1f}s",
            )
            # The step-end callback protocol expects the kwargs dict back.
            return callback_kwargs

        print(f"π¨ Generating {width}x{height} in {steps} steps...")
        result = pipeline(
            prompt=prompt,
            negative_prompt=None,
            width=width,
            height=height,
            num_inference_steps=steps,
            # The Turbo checkpoint is meant to run without classifier-free
            # guidance; 0.0 disables it.
            guidance_scale=0.0,
            generator=generator,
            # Current diffusers pipelines take `callback_on_step_end` instead
            # of the legacy `callback` / `callback_steps` pair.
            callback_on_step_end=on_step_end,
            output_type="pil",
        )
        image = result.images[0]

        # Post-generation cleanup
        del result
        gc.collect()
        return image, seed
    except gr.Error:
        # Already user-facing (e.g. raised by load_pipeline); do not re-wrap.
        gc.collect()
        raise
    except (torch.cuda.OutOfMemoryError, MemoryError) as exc:
        gc.collect()
        raise gr.Error("β Out of memory! Try smaller size or fewer steps") from exc
    except Exception as e:
        gc.collect()
        raise gr.Error(f"β Generation error: {str(e)}") from e
# =====================================================
# MINIMAL GRADIO UI
# =====================================================
# Page layout: a header, an input column (prompt, size, steps, seed, button)
# next to an output column (image and the seed that was used), then usage notes.
with gr.Blocks(title="Z-Image Turbo CPU") as demo:
    gr.Markdown("""
    # β‘ Z-Image Turbo β CPU ULTRA MODE
    **HF Spaces Optimized | 16GB RAM | No GPU**
    β οΈ Slow generation expected on CPU. Start with 256x256 and low steps.
    """)

    with gr.Row():
        # Left: every input control.
        with gr.Column(scale=2):
            prompt = gr.Textbox(
                label="Prompt",
                placeholder="Describe what you want...",
                lines=3,
            )
            with gr.Row():
                width = gr.Slider(
                    minimum=256, maximum=512, value=256, step=64, label="Width"
                )
                height = gr.Slider(
                    minimum=256, maximum=512, value=256, step=64, label="Height"
                )
            with gr.Row():
                steps = gr.Slider(
                    minimum=1, maximum=4, value=2, step=1, label="Steps"
                )
                seed = gr.Number(
                    value=-1,
                    precision=0,
                    label="Seed (-1=random)",
                )
            btn = gr.Button("π Generate", variant="primary", scale=2)

        # Right: the generated image and the seed that produced it.
        with gr.Column(scale=1):
            output = gr.Image(label="Output")
            used_seed = gr.Number(label="Seed Used", interactive=False)

    # Wire the button to generate(); its two return values fill the outputs.
    btn.click(
        fn=generate,
        inputs=[prompt, width, height, steps, seed],
        outputs=[output, used_seed],
    )

    gr.Markdown("""
    ### β‘ Performance Tips
    - Start with **256x256** resolution
    - Use **1-2 steps** for fast results
    - Each step takes ~30-60s on CPU
    - Results improve with more steps
    - Negative seeds auto-randomize
    ### πΎ Memory Strategy
    - Models loaded on first request only
    - Aggressive garbage collection after each run
    - float16 reduces memory by 50%
    - VAE tiling saves additional ~2GB
    """)
# Run one generation at a time and keep at most two requests waiting.
# Gradio 4+ removed `concurrency_count` in favour of
# `default_concurrency_limit`; fall back to the old keyword on Gradio 3.x,
# where the new one is rejected with TypeError.
try:
    demo.queue(default_concurrency_limit=1, max_size=2)
except TypeError:
    demo.queue(concurrency_count=1, max_size=2)

if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)