""" Lyra/Lune Flow-Matching Inference Space Author: AbstractPhil License: MIT SD1.5-based flow matching with geometric crystalline architectures. """ import os import torch import gradio as gr import numpy as np from PIL import Image from typing import Optional, Dict import spaces from diffusers import ( UNet2DConditionModel, AutoencoderKL, DPMSolverMultistepScheduler, EulerDiscreteScheduler ) from transformers import CLIPTextModel, CLIPTokenizer from huggingface_hub import hf_hub_download # ============================================================================ # MODEL LOADING # ============================================================================ class FlowMatchingPipeline: """Custom pipeline for flow-matching inference.""" def __init__( self, vae: AutoencoderKL, text_encoder: CLIPTextModel, tokenizer: CLIPTokenizer, unet: UNet2DConditionModel, scheduler, device: str = "cuda" ): self.vae = vae self.text_encoder = text_encoder self.tokenizer = tokenizer self.unet = unet self.scheduler = scheduler self.device = device # VAE scaling factor self.vae_scale_factor = 0.18215 def encode_prompt(self, prompt: str, negative_prompt: str = ""): """Encode text prompts to embeddings.""" # Positive prompt text_inputs = self.tokenizer( prompt, padding="max_length", max_length=self.tokenizer.model_max_length, truncation=True, return_tensors="pt", ) text_input_ids = text_inputs.input_ids.to(self.device) with torch.no_grad(): prompt_embeds = self.text_encoder(text_input_ids)[0] # Negative prompt if negative_prompt: uncond_inputs = self.tokenizer( negative_prompt, padding="max_length", max_length=self.tokenizer.model_max_length, truncation=True, return_tensors="pt", ) uncond_input_ids = uncond_inputs.input_ids.to(self.device) with torch.no_grad(): negative_prompt_embeds = self.text_encoder(uncond_input_ids)[0] else: negative_prompt_embeds = torch.zeros_like(prompt_embeds) return prompt_embeds, negative_prompt_embeds @torch.no_grad() def __call__( self, prompt: str, negative_prompt: str = "", height: int = 512, width: int = 512, num_inference_steps: int = 20, guidance_scale: float = 7.5, shift: float = 2.5, use_flow_matching: bool = True, prediction_type: str = "epsilon", seed: Optional[int] = None, progress_callback=None ): """Generate image using flow matching or standard diffusion.""" # Set seed if seed is not None: generator = torch.Generator(device=self.device).manual_seed(seed) else: generator = None # Encode prompts prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt, negative_prompt ) # Prepare latents latent_channels = 4 latent_height = height // 8 latent_width = width // 8 latents = torch.randn( (1, latent_channels, latent_height, latent_width), generator=generator, device=self.device, dtype=torch.float32 ) # Set timesteps self.scheduler.set_timesteps(num_inference_steps, device=self.device) timesteps = self.scheduler.timesteps # Scale initial latents by scheduler's init_noise_sigma for standard diffusion # Flow matching uses unscaled latents and custom ODE integration if not use_flow_matching: latents = latents * self.scheduler.init_noise_sigma # Denoising loop for i, t in enumerate(timesteps): if progress_callback: progress_callback(i, num_inference_steps, f"Step {i+1}/{num_inference_steps}") # Expand latents for classifier-free guidance latent_model_input = torch.cat([latents] * 2) if guidance_scale > 1.0 else latents # For standard diffusion, let scheduler handle scaling # For flow matching, apply custom shift-based scaling if use_flow_matching and shift > 0: # Compute sigma from timestep with shift sigma = t.float() / 1000.0 sigma_shifted = (shift * sigma) / (1 + (shift - 1) * sigma) # Scale latent input for flow matching scaling = torch.sqrt(1 + sigma_shifted ** 2) latent_model_input = latent_model_input / scaling else: # For standard diffusion, scale by scheduler latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) # Prepare timestep timestep = t.expand(latent_model_input.shape[0]) # Predict noise/velocity text_embeds = torch.cat([negative_prompt_embeds, prompt_embeds]) if guidance_scale > 1.0 else prompt_embeds noise_pred = self.unet( latent_model_input, timestep, encoder_hidden_states=text_embeds, return_dict=False )[0] # Classifier-free guidance if guidance_scale > 1.0: noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) # Flow matching step if use_flow_matching: # Manual flow matching update sigma = t.float() / 1000.0 sigma_shifted = (shift * sigma) / (1 + (shift - 1) * sigma) if prediction_type == "v_prediction": # Convert v-prediction to epsilon v_pred = noise_pred alpha_t = torch.sqrt(1 - sigma_shifted ** 2) sigma_t = sigma_shifted noise_pred = alpha_t * v_pred + sigma_t * latents # Compute next latent dt = -1.0 / num_inference_steps latents = latents + dt * noise_pred else: # Standard scheduler step latents = self.scheduler.step( noise_pred, t, latents, return_dict=False )[0] # Decode latents with model-specific scaling latents = latents / self.vae_scale_factor # Lune-specific scaling: multiply by 5.52 for Lune's latent space offset # This must be applied ONLY for Lune model, not SD1.5 Base if hasattr(self, 'is_lune_model') and self.is_lune_model: latents = latents * 5.52 with torch.no_grad(): image = self.vae.decode(latents).sample # Convert to PIL image = (image / 2 + 0.5).clamp(0, 1) image = image.cpu().permute(0, 2, 3, 1).float().numpy() image = (image * 255).round().astype("uint8") image = Image.fromarray(image[0]) return image def load_lune_checkpoint(repo_id: str, filename: str, device: str = "cuda"): """Load Lune checkpoint from .pt file.""" print(f"📥 Downloading checkpoint: {repo_id}/{filename}") checkpoint_path = hf_hub_download( repo_id=repo_id, filename=filename, repo_type="model" ) print(f"✓ Downloaded to: {checkpoint_path}") print(f"📦 Loading checkpoint...") checkpoint = torch.load(checkpoint_path, map_location="cpu") # Initialize UNet with SD1.5 config print(f"🏗️ Initializing SD1.5 UNet...") unet = UNet2DConditionModel.from_pretrained( "runwayml/stable-diffusion-v1-5", subfolder="unet", torch_dtype=torch.float32 ) # Load student weights student_state_dict = checkpoint["student"] # Strip "unet." prefix if present cleaned_dict = {} for key, value in student_state_dict.items(): if key.startswith("unet."): cleaned_dict[key[5:]] = value else: cleaned_dict[key] = value # Load weights unet.load_state_dict(cleaned_dict, strict=False) step = checkpoint.get("gstep", "unknown") print(f"✅ Loaded checkpoint from step {step}") return unet.to(device) def initialize_pipeline(model_choice: str, device: str = "cuda"): """Initialize the complete pipeline.""" print(f"🚀 Initializing {model_choice} pipeline...") is_lune = "Lune" in model_choice # Load base components print("Loading VAE...") vae = AutoencoderKL.from_pretrained( "runwayml/stable-diffusion-v1-5", subfolder="vae", torch_dtype=torch.float32 ).to(device) print("Loading text encoder...") text_encoder = CLIPTextModel.from_pretrained( "openai/clip-vit-large-patch14", torch_dtype=torch.float32 ).to(device) tokenizer = CLIPTokenizer.from_pretrained( "openai/clip-vit-large-patch14" ) # Load UNet based on model choice if is_lune: # Load latest checkpoint from repo repo_id = "AbstractPhil/sd15-flow-lune" # Find latest checkpoint - for now use a known one filename = "sd15_flow_lune_e34_s34000.pt" unet = load_lune_checkpoint(repo_id, filename, device) elif model_choice == "SD1.5 Base": print("Loading SD1.5 base UNet...") unet = UNet2DConditionModel.from_pretrained( "runwayml/stable-diffusion-v1-5", subfolder="unet", torch_dtype=torch.float32 ).to(device) else: raise ValueError(f"Unknown model: {model_choice}") # Initialize scheduler scheduler = EulerDiscreteScheduler.from_pretrained( "runwayml/stable-diffusion-v1-5", subfolder="scheduler" ) print("✅ Pipeline initialized!") pipeline = FlowMatchingPipeline( vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, unet=unet, scheduler=scheduler, device=device ) # Set flag for Lune-specific VAE scaling pipeline.is_lune_model = is_lune return pipeline # ============================================================================ # GLOBAL STATE # ============================================================================ # Initialize with None, will load on first inference CURRENT_PIPELINE = None CURRENT_MODEL = None def get_pipeline(model_choice: str): """Get or create pipeline for selected model.""" global CURRENT_PIPELINE, CURRENT_MODEL if CURRENT_PIPELINE is None or CURRENT_MODEL != model_choice: CURRENT_PIPELINE = initialize_pipeline(model_choice, device="cuda") CURRENT_MODEL = model_choice return CURRENT_PIPELINE # ============================================================================ # INFERENCE # ============================================================================ def estimate_duration(num_steps: int, width: int, height: int) -> int: """Estimate GPU duration based on generation parameters.""" # Base time per step (seconds) base_time_per_step = 0.3 # Resolution scaling resolution_factor = (width * height) / (512 * 512) # Total estimate estimated = num_steps * base_time_per_step * resolution_factor # Add 15 seconds for model loading overhead return int(estimated + 15) @spaces.GPU(duration=lambda *args: estimate_duration(args[3], args[5], args[6])) def generate_image( prompt: str, negative_prompt: str, model_choice: str, num_steps: int, cfg_scale: float, width: int, height: int, shift: float, use_flow_matching: bool, prediction_type: str, seed: int, randomize_seed: bool, progress=gr.Progress() ): """Generate image with ZeroGPU support.""" # Randomize seed if requested if randomize_seed: seed = np.random.randint(0, 2**32 - 1) # Progress tracking def progress_callback(step, total, desc): progress((step + 1) / total, desc=desc) try: # Get pipeline pipeline = get_pipeline(model_choice) # Generate progress(0.05, desc="Starting generation...") image = pipeline( prompt=prompt, negative_prompt=negative_prompt, height=height, width=width, num_inference_steps=num_steps, guidance_scale=cfg_scale, shift=shift, use_flow_matching=use_flow_matching, prediction_type=prediction_type, seed=seed, progress_callback=progress_callback ) progress(1.0, desc="Complete!") return image, seed except Exception as e: print(f"❌ Generation failed: {e}") raise e # ============================================================================ # GRADIO UI # ============================================================================ def create_demo(): """Create Gradio interface.""" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🌙 Lyra/Lune Flow-Matching Image Generation **Geometric crystalline diffusion with flow matching** by [AbstractPhil](https://huggingface.co/AbstractPhil) Generate images using SD1.5-based flow matching with pentachoron geometric structures. Achieves high quality with dramatically reduced step counts through geometric efficiency. """) with gr.Row(): with gr.Column(scale=1): # Prompt - default to first example prompt = gr.TextArea( label="Prompt", value="A serene mountain landscape at golden hour, crystal clear lake reflecting snow-capped peaks, photorealistic, 8k", lines=3 ) negative_prompt = gr.TextArea( label="Negative Prompt", placeholder="blurry, low quality, distorted...", value="blurry, low quality", lines=2 ) # Model selection model_choice = gr.Dropdown( label="Model", choices=[ "Flow-Lune (Latest)", "SD1.5 Base" ], value="Flow-Lune (Latest)" ) # Flow matching settings with gr.Accordion("Flow Matching Settings", open=True): use_flow_matching = gr.Checkbox( label="Enable Flow Matching", value=True, info="Use flow matching ODE integration" ) shift = gr.Slider( label="Shift", minimum=0.0, maximum=5.0, value=2.5, step=0.1, info="Flow matching shift parameter (0=disabled, 1-3 typical)" ) prediction_type = gr.Radio( label="Prediction Type", choices=["epsilon", "v_prediction"], value="v_prediction", # Default to v_prediction for Lune info="Type of model prediction" ) # Generation settings with gr.Accordion("Generation Settings", open=True): num_steps = gr.Slider( label="Steps", minimum=1, maximum=50, value=20, step=1, info="Flow matching typically needs fewer steps (15-25)" ) cfg_scale = gr.Slider( label="CFG Scale", minimum=1.0, maximum=20.0, value=7.5, step=0.5 ) with gr.Row(): width = gr.Slider( label="Width", minimum=256, maximum=1024, value=512, step=64 ) height = gr.Slider( label="Height", minimum=256, maximum=1024, value=512, step=64 ) seed = gr.Slider( label="Seed", minimum=0, maximum=2**32 - 1, value=42, step=1 ) randomize_seed = gr.Checkbox( label="Randomize Seed", value=True ) generate_btn = gr.Button("🎨 Generate", variant="primary", size="lg") with gr.Column(scale=1): output_image = gr.Image( label="Generated Image", type="pil" ) output_seed = gr.Number( label="Used Seed", precision=0 ) gr.Markdown(""" ### Tips: - **Flow matching** works best with 15-25 steps (vs 50+ for standard diffusion) - **Shift** controls the flow trajectory (2.0-2.5 recommended for Lune) - Lower shift = more direct path, higher shift = more exploration - **Lune** uses v_prediction by default for optimal results - **SD1.5 Base** uses epsilon (standard diffusion) - Lune operates in a scaled latent space (5.52x) for geometric efficiency ### Model Info: - **Flow-Lune**: Trained with flow matching on 500k SD1.5 distillation pairs - **SD1.5 Base**: Standard Stable Diffusion 1.5 for comparison [📚 Learn more about geometric deep learning](https://github.com/AbstractEyes/lattice_vocabulary) """) # Examples gr.Examples( examples=[ [ "A serene mountain landscape at golden hour, crystal clear lake reflecting snow-capped peaks, photorealistic, 8k", "blurry, low quality", "Flow-Lune (Latest)", 20, 7.5, 512, 512, 2.5, True, "v_prediction", 42, False ], [ "A futuristic cyberpunk city at night, neon lights, rain-slicked streets, highly detailed", "low quality, blurry", "Flow-Lune (Latest)", 22, 8.0, 512, 512, 2.5, True, "v_prediction", 123, False ], [ "Portrait of a majestic lion, golden mane, dramatic lighting, wildlife photography", "cartoon, painting", "Flow-Lune (Latest)", 18, 7.0, 512, 512, 2.0, True, "v_prediction", 456, False ] ], inputs=[ prompt, negative_prompt, model_choice, num_steps, cfg_scale, width, height, shift, use_flow_matching, prediction_type, seed, randomize_seed ], outputs=[output_image, output_seed], fn=generate_image, cache_examples=False ) # Event handlers # Update settings when model changes def on_model_change(model_name): """Update default settings based on model selection.""" if model_name == "SD1.5 Base": # SD1.5: disable flow matching, use epsilon return { use_flow_matching: gr.update(value=False), prediction_type: gr.update(value="epsilon") } else: # Lune: enable flow matching, use v_prediction return { use_flow_matching: gr.update(value=True), prediction_type: gr.update(value="v_prediction") } model_choice.change( fn=on_model_change, inputs=[model_choice], outputs=[use_flow_matching, prediction_type] ) generate_btn.click( fn=generate_image, inputs=[ prompt, negative_prompt, model_choice, num_steps, cfg_scale, width, height, shift, use_flow_matching, prediction_type, seed, randomize_seed ], outputs=[output_image, output_seed] ) return demo # ============================================================================ # LAUNCH # ============================================================================ if __name__ == "__main__": demo = create_demo() demo.queue(max_size=20) demo.launch(show_api=False)