# Hugging Face file-view header (web page residue, not part of the program):
# uploader: AbstractPhil; last commit: "Update app.py" (5731dbc, verified); file size: 22.8 kB.
"""
Lyra/Lune Flow-Matching Inference Space
Author: AbstractPhil
License: MIT
SD1.5-based flow matching with geometric crystalline architectures.
"""
import os
import torch
import gradio as gr
import numpy as np
from PIL import Image
from typing import Optional, Dict
import spaces
from diffusers import (
UNet2DConditionModel,
AutoencoderKL,
DPMSolverMultistepScheduler,
EulerDiscreteScheduler
)
from transformers import CLIPTextModel, CLIPTokenizer
from huggingface_hub import hf_hub_download
# ============================================================================
# MODEL LOADING
# ============================================================================
class FlowMatchingPipeline:
    """Text-to-image pipeline with a manual flow-matching sampler.

    Bundles a VAE, a CLIP text encoder/tokenizer, a UNet and a scheduler.
    Calling the instance generates one image, either with the hand-written
    fixed-step update (``use_flow_matching=True``) or by delegating every
    step to the scheduler.
    """

    # Number of channels of the latent tensor that gets denoised.
    LATENT_CHANNELS = 4
    # Divisor applied to the pixel height/width to size the initial latents.
    LATENT_DOWNSCALE = 8
    # Timesteps are divided by this value to obtain sigma before shifting.
    TIMESTEP_SCALE = 1000.0
    # Extra latent multiplier applied before decoding when is_lune_model is set.
    LUNE_LATENT_SCALE = 5.52

    def __init__(
        self,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        unet: UNet2DConditionModel,
        scheduler,
        device: str = "cuda"
    ):
        """Store the model components.

        Args:
            vae: Autoencoder used to decode the final latents.
            text_encoder: Model that turns token ids into embeddings.
            tokenizer: Tokenizer paired with ``text_encoder``.
            unet: Network called once per sampling step.
            scheduler: Provides the timesteps and, when flow matching is
                disabled, the input scaling and the update step.
            device: Device on which token ids, noise and timesteps are created.
        """
        self.vae = vae
        self.text_encoder = text_encoder
        self.tokenizer = tokenizer
        self.unet = unet
        self.scheduler = scheduler
        self.device = device
        # VAE scaling factor
        self.vae_scale_factor = 0.18215
        # Defined here so the attribute always exists; callers that build a
        # Lune pipeline set it to True after construction.
        self.is_lune_model = False

    def _embed_text(self, text: str):
        """Tokenize ``text`` (padded/truncated to the tokenizer maximum) and encode it."""
        tokens = self.tokenizer(
            text,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        token_ids = tokens.input_ids.to(self.device)
        with torch.no_grad():
            return self.text_encoder(token_ids)[0]

    def encode_prompt(self, prompt: str, negative_prompt: str = ""):
        """Encode text prompts to embeddings.

        Args:
            prompt: Positive prompt.
            negative_prompt: Negative prompt; may be empty.

        Returns:
            ``(prompt_embeds, negative_prompt_embeds)``.
        """
        prompt_embeds = self._embed_text(prompt)
        if negative_prompt:
            negative_prompt_embeds = self._embed_text(negative_prompt)
        else:
            # NOTE(review): an empty negative prompt yields an all-zero
            # embedding, not the encoding of the empty string. Confirm this
            # is what the checkpoints expect before changing it.
            negative_prompt_embeds = torch.zeros_like(prompt_embeds)
        return prompt_embeds, negative_prompt_embeds

    @staticmethod
    def _shifted_sigma(t, shift: float):
        """Map a scheduler timestep to the shift-adjusted sigma used by the manual sampler."""
        sigma = t.float() / FlowMatchingPipeline.TIMESTEP_SCALE
        return (shift * sigma) / (1 + (shift - 1) * sigma)

    def _decode_latents(self, latents):
        """Undo the latent scaling, run the VAE decoder and return the first image as PIL."""
        latents = latents / self.vae_scale_factor
        # Lune-specific scaling: multiply by 5.52 for Lune's latent space offset.
        # This must be applied ONLY for the Lune model, not SD1.5 Base.
        if self.is_lune_model:
            latents = latents * self.LUNE_LATENT_SCALE
        with torch.no_grad():
            image = self.vae.decode(latents).sample
        # Map from [-1, 1] to [0, 1], then to HWC uint8.
        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        image = (image * 255).round().astype("uint8")
        return Image.fromarray(image[0])

    @torch.no_grad()
    def __call__(
        self,
        prompt: str,
        negative_prompt: str = "",
        height: int = 512,
        width: int = 512,
        num_inference_steps: int = 20,
        guidance_scale: float = 7.5,
        shift: float = 2.5,
        use_flow_matching: bool = True,
        prediction_type: str = "epsilon",
        seed: Optional[int] = None,
        progress_callback=None
    ):
        """Generate image using flow matching or standard diffusion.

        Args:
            prompt: Positive prompt.
            negative_prompt: Negative prompt; empty means a zero embedding.
            height: Output height in pixels (latents use ``height // 8``).
            width: Output width in pixels (latents use ``width // 8``).
            num_inference_steps: Number of steps requested from the scheduler.
            guidance_scale: Classifier-free guidance weight; guidance is only
                applied when this is greater than 1.0.
            shift: Sigma shift for flow matching. Values <= 0 fall back to the
                scheduler's own input scaling.
            use_flow_matching: Use the manual fixed-step update instead of
                ``scheduler.step``.
            prediction_type: ``"v_prediction"`` converts the model output
                before the manual update; any other value uses it as is.
            seed: Seed for the initial noise, or ``None`` for unseeded noise.
            progress_callback: Optional ``callback(step_index, total, text)``
                invoked at the start of every step.

        Returns:
            The generated ``PIL.Image.Image``.
        """
        generator = None
        if seed is not None:
            generator = torch.Generator(device=self.device).manual_seed(seed)

        prompt_embeds, negative_prompt_embeds = self.encode_prompt(
            prompt, negative_prompt
        )

        # The guidance decision and the conditioning batch do not change
        # between steps, so build them once.
        use_cfg = guidance_scale > 1.0
        if use_cfg:
            text_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])
        else:
            text_embeds = prompt_embeds

        latents = torch.randn(
            (
                1,
                self.LATENT_CHANNELS,
                height // self.LATENT_DOWNSCALE,
                width // self.LATENT_DOWNSCALE,
            ),
            generator=generator,
            device=self.device,
            dtype=torch.float32
        )

        self.scheduler.set_timesteps(num_inference_steps, device=self.device)
        timesteps = self.scheduler.timesteps

        # Standard diffusion starts from noise scaled by init_noise_sigma;
        # the flow-matching path integrates from unscaled noise.
        if not use_flow_matching:
            latents = latents * self.scheduler.init_noise_sigma

        for i, t in enumerate(timesteps):
            if progress_callback:
                progress_callback(i, num_inference_steps, f"Step {i+1}/{num_inference_steps}")

            # Duplicate the latents so both guidance branches run in one pass.
            latent_model_input = torch.cat([latents] * 2) if use_cfg else latents

            # One shifted sigma per step serves both the input scaling and
            # the v-prediction conversion below.
            sigma_shifted = self._shifted_sigma(t, shift) if use_flow_matching else None

            if use_flow_matching and shift > 0:
                latent_model_input = latent_model_input / torch.sqrt(1 + sigma_shifted ** 2)
            else:
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

            timestep = t.expand(latent_model_input.shape[0])

            noise_pred = self.unet(
                latent_model_input,
                timestep,
                encoder_hidden_states=text_embeds,
                return_dict=False
            )[0]

            if use_cfg:
                noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            if use_flow_matching:
                if prediction_type == "v_prediction":
                    # Convert v-prediction to epsilon using the unscaled latents.
                    alpha_t = torch.sqrt(1 - sigma_shifted ** 2)
                    noise_pred = alpha_t * noise_pred + sigma_shifted * latents
                # Fixed-size step; the step size does not depend on t.
                dt = -1.0 / num_inference_steps
                latents = latents + dt * noise_pred
            else:
                latents = self.scheduler.step(
                    noise_pred, t, latents, return_dict=False
                )[0]

        return self._decode_latents(latents)
def load_lune_checkpoint(repo_id: str, filename: str, device: str = "cuda"):
    """Load Lune checkpoint from .pt file.

    Downloads ``filename`` from the Hugging Face model repo ``repo_id``,
    builds an SD1.5 UNet and overwrites its weights with the checkpoint's
    ``"student"`` state dict.

    Args:
        repo_id: Hugging Face model repository id.
        filename: Checkpoint file name inside the repository.
        device: Device the returned UNet is moved to.

    Returns:
        The UNet with the checkpoint weights applied, on ``device``.

    Raises:
        KeyError: If the checkpoint has no ``"student"`` entry.
    """
    print(f"📥 Downloading checkpoint: {repo_id}/{filename}")
    checkpoint_path = hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        repo_type="model"
    )
    print(f"✓ Downloaded to: {checkpoint_path}")

    print("📦 Loading checkpoint...")
    # SECURITY NOTE(review): torch.load unpickles the file, which can execute
    # arbitrary code. Only point this at repositories you trust; consider
    # weights_only=True if the checkpoint contents allow it.
    checkpoint = torch.load(checkpoint_path, map_location="cpu")

    # Initialize UNet with SD1.5 config
    print("🏗️ Initializing SD1.5 UNet...")
    unet = UNet2DConditionModel.from_pretrained(
        "runwayml/stable-diffusion-v1-5",
        subfolder="unet",
        torch_dtype=torch.float32
    )

    # Strip the "unet." prefix, if present, so keys line up with the bare UNet.
    prefix = "unet."
    cleaned_dict = {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in checkpoint["student"].items()
    }

    # strict=False tolerates partial checkpoints, but a silent mismatch would
    # leave the base SD1.5 weights in place, so report what did not line up.
    load_result = unet.load_state_dict(cleaned_dict, strict=False)
    if load_result.missing_keys:
        print(f"⚠️ {len(load_result.missing_keys)} UNet keys missing from checkpoint")
    if load_result.unexpected_keys:
        print(f"⚠️ {len(load_result.unexpected_keys)} checkpoint keys not used by UNet")

    step = checkpoint.get("gstep", "unknown")
    print(f"✅ Loaded checkpoint from step {step}")
    return unet.to(device)
def initialize_pipeline(model_choice: str, device: str = "cuda"):
    """Initialize the complete pipeline.

    Args:
        model_choice: Either ``"SD1.5 Base"`` or any name containing
            ``"Lune"`` (which selects the Lune checkpoint).
        device: Device all model components are moved to.

    Returns:
        A ``FlowMatchingPipeline`` whose ``is_lune_model`` flag reflects the
        selected model.

    Raises:
        ValueError: If ``model_choice`` names neither supported model.
    """
    base_repo = "runwayml/stable-diffusion-v1-5"
    clip_repo = "openai/clip-vit-large-patch14"

    is_lune = "Lune" in model_choice
    # Reject unknown names before any (slow) download or device transfer.
    if not is_lune and model_choice != "SD1.5 Base":
        raise ValueError(f"Unknown model: {model_choice}")

    print(f"🚀 Initializing {model_choice} pipeline...")

    # Components shared by both model choices.
    print("Loading VAE...")
    vae = AutoencoderKL.from_pretrained(
        base_repo,
        subfolder="vae",
        torch_dtype=torch.float32
    ).to(device)

    print("Loading text encoder...")
    text_encoder = CLIPTextModel.from_pretrained(
        clip_repo,
        torch_dtype=torch.float32
    ).to(device)
    tokenizer = CLIPTokenizer.from_pretrained(clip_repo)

    if is_lune:
        # Pinned to a known checkpoint rather than discovering the latest one.
        unet = load_lune_checkpoint(
            "AbstractPhil/sd15-flow-lune",
            "sd15_flow_lune_e34_s34000.pt",
            device,
        )
    else:
        print("Loading SD1.5 base UNet...")
        unet = UNet2DConditionModel.from_pretrained(
            base_repo,
            subfolder="unet",
            torch_dtype=torch.float32
        ).to(device)

    scheduler = EulerDiscreteScheduler.from_pretrained(
        base_repo,
        subfolder="scheduler"
    )

    pipeline = FlowMatchingPipeline(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        unet=unet,
        scheduler=scheduler,
        device=device
    )
    # Set flag for Lune-specific VAE scaling
    pipeline.is_lune_model = is_lune
    print("✅ Pipeline initialized!")
    return pipeline
# ============================================================================
# GLOBAL STATE
# ============================================================================
# Cache of the most recently built pipeline and the model name it was built
# for. Both start empty and are filled on the first inference request.
CURRENT_PIPELINE = CURRENT_MODEL = None
def get_pipeline(model_choice: str):
    """Get or create pipeline for selected model.

    The last pipeline is cached in the module globals and reused while
    ``model_choice`` stays the same. When the choice changes, the cached
    pipeline is released before the new one is built so that two full sets
    of model weights are not kept referenced at the same time.

    Args:
        model_choice: Model name forwarded to ``initialize_pipeline``.

    Returns:
        The pipeline for ``model_choice``.
    """
    global CURRENT_PIPELINE, CURRENT_MODEL

    if CURRENT_PIPELINE is not None and CURRENT_MODEL == model_choice:
        return CURRENT_PIPELINE

    # Drop the stale pipeline first. If initialization fails below, the cache
    # is left empty and the next call simply retries.
    CURRENT_PIPELINE = None
    CURRENT_MODEL = None
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    CURRENT_PIPELINE = initialize_pipeline(model_choice, device="cuda")
    CURRENT_MODEL = model_choice
    return CURRENT_PIPELINE
# ============================================================================
# INFERENCE
# ============================================================================
def estimate_duration(num_steps: int, width: int, height: int) -> int:
    """Return a rough GPU time budget, in whole seconds, for one generation.

    The estimate is 0.3 s per step, scaled by the image area relative to
    512x512, plus a flat 15 s allowance for model loading. The result is
    truncated to an integer.
    """
    seconds_per_step = 0.3
    loading_overhead = 15
    area_ratio = (width * height) / (512 * 512)
    return int(num_steps * seconds_per_step * area_ratio + loading_overhead)
@spaces.GPU(duration=lambda *args: estimate_duration(args[3], args[5], args[6]))
def generate_image(
    prompt: str,
    negative_prompt: str,
    model_choice: str,
    num_steps: int,
    cfg_scale: float,
    width: int,
    height: int,
    shift: float,
    use_flow_matching: bool,
    prediction_type: str,
    seed: int,
    randomize_seed: bool,
    progress=gr.Progress()
):
    """Generate image with ZeroGPU support.

    Args:
        prompt: Positive prompt.
        negative_prompt: Negative prompt.
        model_choice: Model name passed to ``get_pipeline``.
        num_steps: Number of inference steps.
        cfg_scale: Classifier-free guidance scale.
        width: Image width in pixels.
        height: Image height in pixels.
        shift: Flow-matching shift parameter.
        use_flow_matching: Whether to use the flow-matching update.
        prediction_type: Model prediction type forwarded to the pipeline.
        seed: Seed to use when ``randomize_seed`` is false.
        randomize_seed: Draw a fresh random seed instead of using ``seed``.
        progress: Gradio progress tracker.

    Returns:
        ``(image, seed)`` where ``seed`` is the seed actually used.
    """
    # UI sliders may deliver whole numbers as floats; the pipeline needs
    # real ints for tensor shapes and step counts.
    num_steps = int(num_steps)
    width = int(width)
    height = int(height)

    if randomize_seed:
        # Convert to a plain int so the value is accepted by
        # torch.Generator.manual_seed and serializes cleanly for the UI.
        seed = int(np.random.randint(0, 2**32 - 1))
    elif seed is not None:
        seed = int(seed)

    def progress_callback(step, total, desc):
        progress((step + 1) / total, desc=desc)

    try:
        pipeline = get_pipeline(model_choice)

        progress(0.05, desc="Starting generation...")
        image = pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            height=height,
            width=width,
            num_inference_steps=num_steps,
            guidance_scale=cfg_scale,
            shift=shift,
            use_flow_matching=use_flow_matching,
            prediction_type=prediction_type,
            seed=seed,
            progress_callback=progress_callback
        )
        progress(1.0, desc="Complete!")
        return image, seed
    except Exception as e:
        # Log for the Space console, then re-raise unchanged so the UI still
        # shows the failure.
        print(f"❌ Generation failed: {e}")
        raise
# ============================================================================
# GRADIO UI
# ============================================================================
def create_demo():
    """Build the Gradio Blocks app and return it without launching.

    The layout has a settings column (prompts, model, flow-matching and
    generation options, generate button) next to an output column (image,
    used seed, tips), followed by clickable examples. Changing the model
    resets the flow-matching toggle and prediction type to that model's
    defaults.
    """
    lune_model = "Flow-Lune (Latest)"
    base_model = "SD1.5 Base"
    default_prompt = "A serene mountain landscape at golden hour, crystal clear lake reflecting snow-capped peaks, photorealistic, 8k"

    def lune_example(text, negative, steps, cfg, shift_value, seed_value):
        """Build one example row; every example uses Lune at 512x512 with a fixed seed."""
        return [
            text, negative, lune_model, steps, cfg,
            512, 512, shift_value, True, "v_prediction",
            seed_value, False,
        ]

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
# 🌙 Lyra/Lune Flow-Matching Image Generation
**Geometric crystalline diffusion with flow matching** by [AbstractPhil](https://huggingface.co/AbstractPhil)
Generate images using SD1.5-based flow matching with pentachoron geometric structures.
Achieves high quality with dramatically reduced step counts through geometric efficiency.
""")
        with gr.Row():
            # Left column: all inputs.
            with gr.Column(scale=1):
                # The default prompt is the first example's prompt.
                prompt = gr.TextArea(
                    label="Prompt",
                    value=default_prompt,
                    lines=3
                )
                negative_prompt = gr.TextArea(
                    label="Negative Prompt",
                    placeholder="blurry, low quality, distorted...",
                    value="blurry, low quality",
                    lines=2
                )
                model_choice = gr.Dropdown(
                    label="Model",
                    choices=[lune_model, base_model],
                    value=lune_model
                )
                with gr.Accordion("Flow Matching Settings", open=True):
                    use_flow_matching = gr.Checkbox(
                        label="Enable Flow Matching",
                        value=True,
                        info="Use flow matching ODE integration"
                    )
                    shift = gr.Slider(
                        label="Shift",
                        minimum=0.0,
                        maximum=5.0,
                        value=2.5,
                        step=0.1,
                        info="Flow matching shift parameter (0=disabled, 1-3 typical)"
                    )
                    prediction_type = gr.Radio(
                        label="Prediction Type",
                        choices=["epsilon", "v_prediction"],
                        value="v_prediction",  # matches the default (Lune) model
                        info="Type of model prediction"
                    )
                with gr.Accordion("Generation Settings", open=True):
                    num_steps = gr.Slider(
                        label="Steps",
                        minimum=1,
                        maximum=50,
                        value=20,
                        step=1,
                        info="Flow matching typically needs fewer steps (15-25)"
                    )
                    cfg_scale = gr.Slider(
                        label="CFG Scale",
                        minimum=1.0,
                        maximum=20.0,
                        value=7.5,
                        step=0.5
                    )
                    with gr.Row():
                        width = gr.Slider(
                            label="Width",
                            minimum=256,
                            maximum=1024,
                            value=512,
                            step=64
                        )
                        height = gr.Slider(
                            label="Height",
                            minimum=256,
                            maximum=1024,
                            value=512,
                            step=64
                        )
                    seed = gr.Slider(
                        label="Seed",
                        minimum=0,
                        maximum=2**32 - 1,
                        value=42,
                        step=1
                    )
                    randomize_seed = gr.Checkbox(
                        label="Randomize Seed",
                        value=True
                    )
                generate_btn = gr.Button("🎨 Generate", variant="primary", size="lg")
            # Right column: results and usage notes.
            with gr.Column(scale=1):
                output_image = gr.Image(
                    label="Generated Image",
                    type="pil"
                )
                output_seed = gr.Number(
                    label="Used Seed",
                    precision=0
                )
                gr.Markdown("""
### Tips:
- **Flow matching** works best with 15-25 steps (vs 50+ for standard diffusion)
- **Shift** controls the flow trajectory (2.0-2.5 recommended for Lune)
- Lower shift = more direct path, higher shift = more exploration
- **Lune** uses v_prediction by default for optimal results
- **SD1.5 Base** uses epsilon (standard diffusion)
- Lune operates in a scaled latent space (5.52x) for geometric efficiency
### Model Info:
- **Flow-Lune**: Trained with flow matching on 500k SD1.5 distillation pairs
- **SD1.5 Base**: Standard Stable Diffusion 1.5 for comparison
[📚 Learn more about geometric deep learning](https://github.com/AbstractEyes/lattice_vocabulary)
""")

        # The examples and the generate button feed the same components, in
        # the positional order generate_image expects.
        generation_inputs = [
            prompt, negative_prompt, model_choice, num_steps, cfg_scale,
            width, height, shift, use_flow_matching, prediction_type,
            seed, randomize_seed
        ]
        generation_outputs = [output_image, output_seed]

        gr.Examples(
            examples=[
                lune_example(default_prompt, "blurry, low quality", 20, 7.5, 2.5, 42),
                lune_example(
                    "A futuristic cyberpunk city at night, neon lights, rain-slicked streets, highly detailed",
                    "low quality, blurry",
                    22, 8.0, 2.5, 123,
                ),
                lune_example(
                    "Portrait of a majestic lion, golden mane, dramatic lighting, wildlife photography",
                    "cartoon, painting",
                    18, 7.0, 2.0, 456,
                ),
            ],
            inputs=generation_inputs,
            outputs=generation_outputs,
            fn=generate_image,
            cache_examples=False
        )

        def on_model_change(model_name):
            """Reset the sampler defaults for the newly selected model."""
            # SD1.5 Base: no flow matching, epsilon. Anything else is treated
            # as Lune: flow matching on, v_prediction.
            is_base = model_name == base_model
            return {
                use_flow_matching: gr.update(value=not is_base),
                prediction_type: gr.update(
                    value="epsilon" if is_base else "v_prediction"
                ),
            }

        model_choice.change(
            fn=on_model_change,
            inputs=[model_choice],
            outputs=[use_flow_matching, prediction_type]
        )
        generate_btn.click(
            fn=generate_image,
            inputs=generation_inputs,
            outputs=generation_outputs
        )
    return demo
# ============================================================================
# LAUNCH
# ============================================================================
if __name__ == "__main__":
    # Build the UI, cap the request queue at 20 pending jobs, and serve it
    # with the API page hidden.
    demo = create_demo()
    demo.queue(max_size=20).launch(show_api=False)