import functools
import json
import random
import time
from contextlib import contextmanager, nullcontext
from itertools import islice

import accelerate
import gradio as gr
import numpy as np
import spaces  # required for the @spaces.GPU decorator on ZeroGPU Spaces
import torch
import torch.nn as nn
import torchsde
from diffusers import (
    AutoencoderKL,
    StableDiffusionXLPipeline,
    DPMSolverMultistepScheduler,
)
from einops import rearrange
from huggingface_hub import login, hf_hub_download
from PIL import Image
from pytorch_lightning import seed_everything
from torch import autocast
from torchvision.utils import make_grid
from tqdm import tqdm, trange

from SVDNoiseUnet import NPNet64, NPNet128
from free_lunch_utils import register_free_upblock2d, register_free_crossattn_upblock2d
from sampler import UniPCSampler
from customed_unipc_scheduler import CustomedUniPCMultistepScheduler

device = "cuda" if torch.cuda.is_available() else "cpu"
model_repo_id = "Lykon/dreamshaper-xl-1-0"  # replace with the model you would like to use (note: the pipeline below loads a different checkpoint)
precision_scope = autocast
def extract_into_tensor(a, t, x_shape):
    """Gather per-timestep coefficients from `a` at indices `t`, reshaped to
    broadcast against a tensor of shape `x_shape`."""
    b, *_ = t.shape
    out = a.gather(-1, t)
    return out.reshape(b, *((1,) * (len(x_shape) - 1)))


def append_zero(x):
    """Append a single zero to a 1-D tensor (used to terminate sigma schedules)."""
    return torch.cat([x, x.new_zeros([1])])
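
# Minimal usage sketch (illustrative only; `alphas` and `timesteps` are
# hypothetical names, not defined in this app): gather one schedule value per
# sample and broadcast it over an image-shaped latent.
#
# alphas = torch.linspace(0.99, 0.01, 1000)                # (1000,) schedule
# timesteps = torch.tensor([10, 500])                      # one index per sample
# x = torch.randn(2, 4, 64, 64)                            # latent batch
# coeff = extract_into_tensor(alphas, timesteps, x.shape)  # shape (2, 1, 1, 1)
# scaled = coeff * x                                       # broadcasts cleanly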
def prepare_sdxl_pipeline_step_parameter(pipe: StableDiffusionXLPipeline
                                         , prompts
                                         , need_cfg
                                         , device
                                         , negative_prompt=None
                                         , W=1024
                                         , H=1024):
    """Encode prompts and build the added conditioning kwargs (pooled text
    embeddings and time ids) that the SDXL UNet expects at every step."""
    (
        prompt_embeds,
        negative_prompt_embeds,
        pooled_prompt_embeds,
        negative_pooled_prompt_embeds,
    ) = pipe.encode_prompt(
        prompt=prompts,
        negative_prompt=negative_prompt,
        device=device,
        do_classifier_free_guidance=need_cfg,
    )
    prompt_embeds = prompt_embeds.to(device)
    add_text_embeds = pooled_prompt_embeds.to(device)
    original_size = (W, H)
    crops_coords_top_left = (0, 0)
    target_size = (W, H)
    add_time_ids = list(original_size + crops_coords_top_left + target_size)
    if pipe.text_encoder_2 is None:
        text_encoder_projection_dim = int(pooled_prompt_embeds.shape[-1])
    else:
        text_encoder_projection_dim = pipe.text_encoder_2.config.projection_dim
    passed_add_embed_dim = (
        pipe.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
    )
    expected_add_embed_dim = pipe.unet.add_embedding.linear_1.in_features
    if expected_add_embed_dim != passed_add_embed_dim:
        raise ValueError(
            f"Model expects an added time embedding vector of length {expected_add_embed_dim}, "
            f"but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. "
            "Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
        )
    add_time_ids = torch.tensor([add_time_ids], dtype=prompt_embeds.dtype).to(device)
    negative_add_time_ids = add_time_ids
    if need_cfg:
        # Stack negative (unconditional) conditioning in front of the positive
        # conditioning so one batched UNet call serves classifier-free guidance.
        prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds], dim=0)
        add_text_embeds = torch.cat([negative_pooled_prompt_embeds, add_text_embeds], dim=0)
        add_time_ids = torch.cat([negative_add_time_ids, add_time_ids], dim=0)
    ret_dict = {
        "text_embeds": add_text_embeds,
        "time_ids": add_time_ids
    }
    return prompt_embeds, ret_dict
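
# Minimal usage sketch (illustrative; assumes the global `pipe` defined below
# has been loaded): with need_cfg=True the returned batch dimension doubles,
# row 0 holding the unconditional (negative) embedding and row 1 the
# conditional one, matching the torch.cat([latents] * 2) input built in the
# sampling loop below.
#
# prompt_embeds, cond_kwargs = prepare_sdxl_pipeline_step_parameter(
#     pipe, ["a cat"], need_cfg=True, device="cuda",
#     negative_prompt=["blurry"], W=1024, H=1024)
# noise_pred = pipe.unet(latent_model_input, t,
#                        encoder_hidden_states=prompt_embeds,
#                        added_cond_kwargs=cond_kwargs).sample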
# Helper to load a list-of-dicts preference JSON.
# JSON schema: [ { 'human_preference': [int], 'prompt': str, 'file_path': [str] }, ... ]
def load_preference_json(json_path: str) -> list[dict]:
    """Load records from a JSON file formatted as a list of preference dicts."""
    with open(json_path, 'r') as f:
        data = json.load(f)
    return data


# Helper to extract just the prompts from the preference JSON.
# Returns a flat list of all 'prompt' values.
def extract_prompts_from_pref_json(json_path: str) -> list[str]:
    """Load a JSON of preference records and return only the prompts."""
    records = load_preference_json(json_path)
    return [rec['prompt'] for rec in records]


# Example usage:
# prompts = extract_prompts_from_pref_json("path/to/preference.json")
# print(prompts)
def get_sigmas_karras(n, sigma_min, sigma_max, rho=7., device='cpu', need_append_zero=True):
    """Constructs the noise schedule of Karras et al. (2022)."""
    ramp = torch.linspace(0, 1, n)
    min_inv_rho = sigma_min ** (1 / rho)
    max_inv_rho = sigma_max ** (1 / rho)
    sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho
    return append_zero(sigmas).to(device) if need_append_zero else sigmas.to(device)
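
# Minimal usage sketch (illustrative only; the sigma bounds here are assumed
# values, not taken from this app):
#
# sigmas = get_sigmas_karras(10, sigma_min=0.03, sigma_max=14.6)
# `sigmas` decreases monotonically from sigma_max to sigma_min, with a trailing
# 0.0 appended (need_append_zero=True) so the final step lands on a clean sample.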
def append_dims(x, target_dims):
    """Appends dimensions to the end of a tensor until it has target_dims dimensions."""
    dims_to_append = target_dims - x.ndim
    if dims_to_append < 0:
        raise ValueError(f'input has {x.ndim} dims but target_dims is {target_dims}, which is less')
    return x[(...,) + (None,) * dims_to_append]


def chunk(it, size):
    """Yield successive tuples of length `size` from an iterable (the last may be shorter)."""
    it = iter(it)
    return iter(lambda: tuple(islice(it, size)), ())
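
# Minimal usage sketches (illustrative only):
#
# append_dims(torch.randn(2), 4).shape   # -> torch.Size([2, 1, 1, 1])
# list(chunk(range(5), 2))               # -> [(0, 1), (2, 3), (4,)]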
def convert_caption_json_to_str(caption_json):
    """Extract the 'caption' field (parameter renamed to avoid shadowing the json module)."""
    return caption_json["caption"]
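
# Minimal usage sketch (illustrative only):
# convert_caption_json_to_str({"caption": "a cat"})  # -> "a cat"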
torch_dtype = torch.float16
vae_repo_id = "madebyollin/sdxl-vae-fp16-fix"  # fp16-safe SDXL VAE
vae = AutoencoderKL.from_pretrained(vae_repo_id, torch_dtype=torch_dtype)
vae.to('cuda')
pipe = StableDiffusionXLPipeline.from_pretrained("John6666/nova-anime-xl-il-v120-sdxl", torch_dtype=torch_dtype, vae=vae)

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 1024

accelerator = accelerate.Accelerator()
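
# Note on the scheduler swap: each request first runs the quick pass with
# CustomedUniPCMultistepScheduler (configured in generate_image_with_steps),
# then restores a stock DPM-Solver++ scheduler for the 20-step reference, e.g.
# pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)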
def generate_image_with_steps(prompt, negative_prompt, seed, width, height, guidance_scale, num_inference_steps):
    """Generate one image with a manually unrolled sampling loop using the
    customized UniPC scheduler (AFS and uni-predictor tricks)."""
    scheduler = CustomedUniPCMultistepScheduler.from_config(pipe.scheduler.config
                                                            , solver_order=2 if num_inference_steps == 8 else 1
                                                            , denoise_to_zero=False
                                                            , use_afs=True)
    pipe.scheduler = scheduler
    pipe.to('cuda')
    with torch.no_grad():
        with precision_scope("cuda"):
            prompts = [prompt]
            # Seed the initial latent so the "Seed" slider actually affects the quick result.
            generator = torch.Generator(device=device).manual_seed(seed)
            latents = torch.randn(
                (1, pipe.unet.config.in_channels, height // 8, width // 8),
                generator=generator,
                device=device,
            )
            latents = latents * pipe.scheduler.init_noise_sigma
            pipe.scheduler.set_timesteps(num_inference_steps)
            register_free_upblock2d(pipe, b1=1.0, b2=1.0, s1=1.0, s2=1.0)
            register_free_crossattn_upblock2d(pipe, b1=1.0, b2=1.0, s1=1.0, s2=1.0)
            # The UI negative prompt is hidden; a fixed quality-tag negative prompt is used instead.
            negative_prompts = ['(worst quality:2), (low quality:2), (normal quality:2), bad anatomy, bad proportions, poorly drawn face, poorly drawn hands, missing fingers, extra limbs, blurry, pixelated, distorted, lowres, jpeg artifacts, watermark, signature, text, (deformed:1.5), (bad hands:1.3), overexposed, underexposed, censored, mutated, extra fingers, cloned face, bad eyes']
            use_afs = num_inference_steps < 7
            use_free_predictor = False
            # Encode the prompts once; the embeddings are reused at every step.
            prompt_embeds, cond_kwargs = prepare_sdxl_pipeline_step_parameter(pipe
                                                                              , prompts
                                                                              , need_cfg=True
                                                                              , device=pipe.device
                                                                              , negative_prompt=negative_prompts
                                                                              , W=width
                                                                              , H=height)
            idx = 0
            for t in tqdm(pipe.scheduler.timesteps):
                # A stronger FreeU setting could be re-registered partway through sampling;
                # disabled here (idx == -1 never matches) and kept for reference.
                if idx == -1:  # (6 if num_inference_steps == 8 else 4):
                    register_free_upblock2d(pipe, b1=1.2, b2=1.2, s1=0.9, s2=0.9)
                    register_free_crossattn_upblock2d(pipe, b1=1.2, b2=1.2, s1=0.9, s2=0.9)
                latent_model_input = torch.cat([latents] * 2)
                latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, timestep=t)
                if idx == 0 and use_afs:
                    # AFS (analytic first step): approximate the first UNet call with a
                    # scaled copy of its input, saving one forward pass.
                    noise_pred = latent_model_input * 0.975
                elif idx == len(pipe.scheduler.timesteps) - 1 and use_free_predictor:
                    # Let the scheduler's uni-predictor supply the final step for free.
                    noise_pred = None
                else:
                    noise_pred = pipe.unet(latent_model_input
                                           , t
                                           , encoder_hidden_states=prompt_embeds.to(device=latents.device, dtype=latents.dtype)
                                           , added_cond_kwargs=cond_kwargs).sample
                if noise_pred is not None:
                    # Classifier-free guidance: move the prediction away from the
                    # unconditional branch toward the conditional one.
                    uncond, cond = noise_pred.chunk(2)
                    noise_pred = uncond + (cond - uncond) * guidance_scale
                latents = pipe.scheduler.step(noise_pred, t, latents).prev_sample
                idx += 1
            x_samples_ddim = pipe.vae.decode(latents / pipe.vae.config.scaling_factor).sample
            x_samples_ddim = torch.clamp((x_samples_ddim + 1.0) / 2.0, min=0.0, max=1.0)
            x_sample = 255. * rearrange(x_samples_ddim[0].cpu().numpy(), 'c h w -> h w c')
            img = Image.fromarray(x_sample.astype(np.uint8))
            return img
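
# Minimal usage sketch (illustrative; requires a CUDA device and the models
# loaded above):
#
# img = generate_image_with_steps(
#     "a painting of a squirrel eating a burger", "", seed=42,
#     width=1024, height=1024, guidance_scale=5.5, num_inference_steps=8)
# img.save("quick_result.png")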
@spaces.GPU  # ZeroGPU: allocates a GPU for the duration of this call
def infer(
    prompt,
    negative_prompt,
    seed,
    randomize_seed,
    resolution,
    guidance_scale,
    num_inference_steps,
    progress=gr.Progress(track_tqdm=True),
):
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    # Parse the resolution string (e.g. "1216x832") into width and height.
    width, height = map(int, resolution.split('x'))
    # Quick result: few-step sampling with the customized UniPC scheduler.
    image_quick = generate_image_with_steps(prompt, negative_prompt, seed, width, height, guidance_scale, num_inference_steps)
    # Reference result: standard 20-step DPM-Solver++ sampling for comparison.
    pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config, final_sigmas_type="sigma_min")
    negative_prompts = ['(worst quality:2), (low quality:2), (normal quality:2), bad anatomy, bad proportions, poorly drawn face, poorly drawn hands, missing fingers, extra limbs, blurry, pixelated, distorted, lowres, jpeg artifacts, watermark, signature, text, (deformed:1.5), (bad hands:1.3), overexposed, underexposed, censored, mutated, extra fingers, cloned face, bad eyes']
    image_20_steps = pipe(prompt=[prompt]
                          , negative_prompt=negative_prompts
                          , num_inference_steps=20
                          , guidance_scale=guidance_scale
                          , height=height
                          , width=width).images[0]
    return image_quick, image_20_steps, seed
examples = [
    "Astronaut in a jungle, cold color, muted colors, detailed, 8k",
    "a painting of a virus monster playing guitar",
    "a painting of a squirrel eating a burger",
]

css = """
#col-container {
    margin: 0 auto;
    max-width: 640px;
}
"""
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Hyperparameters are all you need")
        with gr.Row():
            prompt = gr.Text(
                label="Prompt",
                show_label=False,
                max_lines=1,
                placeholder="Enter your prompt",
                container=False,
            )
            run_button = gr.Button("Run", scale=0, variant="primary")
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Our fast inference result, using AFS and the uni-predictor to get 2 free steps")
                result = gr.Image(label="Quick Result", show_label=False)
            with gr.Column():
                gr.Markdown("### Original 20-step result")
                result_20_steps = gr.Image(label="20-Step Result", show_label=False)
        with gr.Accordion("Advanced Settings", open=False):
            negative_prompt = gr.Text(
                label="Negative prompt",
                max_lines=1,
                placeholder="Enter a negative prompt",
                visible=False,
            )
            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
            )
            randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
            resolution = gr.Dropdown(
                choices=[
                    "1024x1024",
                    "1216x832",
                    "832x1216"
                ],
                value="1024x1024",
                label="Resolution",
            )
            with gr.Row():
                guidance_scale = gr.Slider(
                    label="Guidance scale",
                    minimum=0.0,
                    maximum=6.0,
                    step=0.1,
                    value=5.5,
                )
                num_inference_steps = gr.Dropdown(
                    choices=[5, 6, 7, 8],
                    value=8,
                    label="Number of inference steps",
                )
        gr.Examples(examples=examples, inputs=[prompt])
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            prompt,
            negative_prompt,
            seed,
            randomize_seed,
            resolution,
            guidance_scale,
            num_inference_steps,
        ],
        outputs=[result, result_20_steps, seed],
    )
if __name__ == "__main__":
    demo.launch()