import gradio as gr
import numpy as np
import random
import json
import spaces  # [uncomment to use ZeroGPU]
from diffusers import (
    AutoencoderKL,
    StableDiffusionXLPipeline,
    DPMSolverMultistepScheduler,
)
from huggingface_hub import login, hf_hub_download
from PIL import Image
from SVDNoiseUnet import NPNet64, NPNet128
import functools
from free_lunch_utils import register_free_upblock2d, register_free_crossattn_upblock2d
import torch
import torch.nn as nn
from einops import rearrange
from torchvision.utils import make_grid
import time
from pytorch_lightning import seed_everything
from torch import autocast
from contextlib import contextmanager, nullcontext
import accelerate
import torchsde
from tqdm import tqdm, trange
from itertools import islice
from sampler import UniPCSampler
from customed_unipc_scheduler import CustomedUniPCMultistepScheduler

device = "cuda" if torch.cuda.is_available() else "cpu"
model_repo_id = "Lykon/dreamshaper-xl-1-0"  # Replace with the model you would like to use (unused below; the pipeline is loaded explicitly)

# Run UNet forward passes under autocast for mixed-precision inference.
precision_scope = autocast
def extract_into_tensor(a, t, x_shape):
    """Gather per-timestep values from `a` at indices `t`, reshaped so they
    broadcast over a tensor of shape `x_shape`."""
    b, *_ = t.shape
    out = a.gather(-1, t)
    return out.reshape(b, *((1,) * (len(x_shape) - 1)))


def append_zero(x):
    """Append a single zero to a 1-D tensor (used to terminate sigma schedules)."""
    return torch.cat([x, x.new_zeros([1])])
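
# Quick illustration (hypothetical tensors): pick one schedule value per sample
# and broadcast it over a (B, C, H, W) batch:
#   alphas = torch.linspace(1.0, 0.0, 1000)                  # (1000,) schedule
#   t = torch.tensor([10, 500])                              # (B,) timestep indices
#   a_t = extract_into_tensor(alphas, t, (2, 4, 128, 128))   # -> shape (2, 1, 1, 1)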
def prepare_sdxl_pipeline_step_parameter(
    pipe: StableDiffusionXLPipeline,
    prompts,
    need_cfg,
    device,
    negative_prompt=None,
    W=1024,
    H=1024,
):
    """Encode prompts and build SDXL's added-conditioning kwargs: pooled text
    embeddings plus the original/crop/target size time ids."""
    (
        prompt_embeds,
        negative_prompt_embeds,
        pooled_prompt_embeds,
        negative_pooled_prompt_embeds,
    ) = pipe.encode_prompt(
        prompt=prompts,
        negative_prompt=negative_prompt,
        device=device,
        do_classifier_free_guidance=need_cfg,
    )
    prompt_embeds = prompt_embeds.to(device)
    add_text_embeds = pooled_prompt_embeds.to(device)
    original_size = (W, H)
    crops_coords_top_left = (0, 0)
    target_size = (W, H)
    add_time_ids = list(original_size + crops_coords_top_left + target_size)
    if pipe.text_encoder_2 is None:
        text_encoder_projection_dim = int(pooled_prompt_embeds.shape[-1])
    else:
        text_encoder_projection_dim = pipe.text_encoder_2.config.projection_dim
    passed_add_embed_dim = (
        pipe.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
    )
    expected_add_embed_dim = pipe.unet.add_embedding.linear_1.in_features
    if expected_add_embed_dim != passed_add_embed_dim:
        raise ValueError(
            f"Model expects an added time embedding vector of length {expected_add_embed_dim}, "
            f"but a vector of {passed_add_embed_dim} was created. The model has an incorrect "
            "config. Please check `unet.config.time_embedding_type` and "
            "`text_encoder_2.config.projection_dim`."
        )
    add_time_ids = torch.tensor([add_time_ids], dtype=prompt_embeds.dtype).to(device)
    negative_add_time_ids = add_time_ids
    if need_cfg:
        # Stack negative (unconditional) embeddings before positive ones so a
        # single batched UNet call covers both CFG branches.
        prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds], dim=0)
        add_text_embeds = torch.cat([negative_pooled_prompt_embeds, add_text_embeds], dim=0)
        add_time_ids = torch.cat([negative_add_time_ids, add_time_ids], dim=0)
    ret_dict = {
        "text_embeds": add_text_embeds,
        "time_ids": add_time_ids,
    }
    return prompt_embeds, ret_dict
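
# Usage sketch (mirrors the denoising loop further below): with need_cfg=True the
# returned prompt_embeds holds [negative; positive] along the batch dim, so the
# latents are duplicated to match before the UNet call:
#   prompt_embeds, cond_kwargs = prepare_sdxl_pipeline_step_parameter(
#       pipe, ["a cat"], need_cfg=True, device=device)
#   noise = pipe.unet(torch.cat([latents] * 2), t,
#                     encoder_hidden_states=prompt_embeds,
#                     added_cond_kwargs=cond_kwargs).sample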
# New helper to load a list-of-dicts preference JSON
# JSON schema: [ { 'human_preference': [int], 'prompt': str, 'file_path': [str] }, ... ]
def load_preference_json(json_path: str) -> list[dict]:
    """Load records from a JSON file formatted as a list of preference dicts."""
    with open(json_path, 'r') as f:
        data = json.load(f)
    return data


# New helper to extract just the prompts from the preference JSON
# Returns a flat list of all 'prompt' values
def extract_prompts_from_pref_json(json_path: str) -> list[str]:
    """Load a JSON of preference records and return only the prompts."""
    records = load_preference_json(json_path)
    return [rec['prompt'] for rec in records]


# Example usage:
# prompts = extract_prompts_from_pref_json("path/to/preference.json")
# print(prompts)
def get_sigmas_karras(n, sigma_min, sigma_max, rho=7., device='cpu', need_append_zero=True):
    """Constructs the noise schedule of Karras et al. (2022)."""
    ramp = torch.linspace(0, 1, n)
    min_inv_rho = sigma_min ** (1 / rho)
    max_inv_rho = sigma_max ** (1 / rho)
    sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho
    return append_zero(sigmas).to(device) if need_append_zero else sigmas.to(device)
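
# The schedule interpolates in sigma^(1/rho) space:
#   sigma_i = (sigma_max^(1/rho) + i/(n-1) * (sigma_min^(1/rho) - sigma_max^(1/rho)))^rho
# For example (illustrative values), get_sigmas_karras(10, 0.03, 14.6) returns 10
# sigmas decaying from 14.6 to 0.03, followed by a trailing 0.0.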
def append_dims(x, target_dims):
    """Appends dimensions to the end of a tensor until it has target_dims dimensions."""
    dims_to_append = target_dims - x.ndim
    if dims_to_append < 0:
        raise ValueError(f'input has {x.ndim} dims but target_dims is {target_dims}, which is less')
    return x[(...,) + (None,) * dims_to_append]
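
# e.g. append_dims(torch.ones(2), 4).shape == (2, 1, 1, 1), handy for broadcasting
# per-sample sigmas over image batches.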
def chunk(it, size):
    """Yield successive `size`-length tuples from an iterable (the last may be shorter)."""
    it = iter(it)
    return iter(lambda: tuple(islice(it, size)), ())
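
# e.g. list(chunk(range(5), 2)) -> [(0, 1), (2, 3), (4,)]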
def convert_caption_json_to_str(record):
    """Extract the 'caption' field from a caption record (renamed parameter to
    avoid shadowing the json module)."""
    return record["caption"]
torch_dtype = torch.float16
repo_id = "madebyollin/sdxl-vae-fp16-fix"  # fp16-safe SDXL VAE (the stock SDXL VAE overflows in float16)
vae = AutoencoderKL.from_pretrained(repo_id, torch_dtype=torch_dtype)
vae.to(device)
pipe = StableDiffusionXLPipeline.from_pretrained(
    "John6666/nova-anime-xl-il-v120-sdxl", torch_dtype=torch_dtype, vae=vae
)

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 1024

accelerator = accelerate.Accelerator()
def generate_image_with_steps(prompt, negative_prompt, seed, width, height, guidance_scale, num_inference_steps):
    """Helper function to generate an image with a specific number of steps."""
    scheduler = CustomedUniPCMultistepScheduler.from_config(
        pipe.scheduler.config,
        solver_order=2 if num_inference_steps == 8 else 1,
        denoise_to_zero=False,
        use_afs=True,
    )
    pipe.scheduler = scheduler
    pipe.to(device)
    seed_everything(seed)  # make the initial latents reproducible; `seed` was previously unused
    with torch.no_grad():
        with precision_scope("cuda"):
            prompts = [prompt]
            latents = torch.randn(
                (1, pipe.unet.config.in_channels, height // 8, width // 8),
                device=device,
            )
            latents = latents * pipe.scheduler.init_noise_sigma
            pipe.scheduler.set_timesteps(num_inference_steps)
            # FreeU registered with neutral parameters (b=1.0, s=1.0), i.e. effectively disabled.
            register_free_upblock2d(pipe, b1=1.0, b2=1.0, s1=1.0, s2=1.0)
            register_free_crossattn_upblock2d(pipe, b1=1.0, b2=1.0, s1=1.0, s2=1.0)
            # The UI negative_prompt argument is currently ignored in favor of this
            # fixed quality-tag negative prompt.
            negative_prompts = ['(worst quality:2), (low quality:2), (normal quality:2), bad anatomy, bad proportions, poorly drawn face, poorly drawn hands, missing fingers, extra limbs, blurry, pixelated, distorted, lowres, jpeg artifacts, watermark, signature, text, (deformed:1.5), (bad hands:1.3), overexposed, underexposed, censored, mutated, extra fingers, cloned face, bad eyes']
            use_afs = num_inference_steps < 7
            use_free_predictor = False
            # The text embeddings do not depend on the timestep, so prepare them
            # once instead of re-encoding inside the loop.
            prompt_embeds, cond_kwargs = prepare_sdxl_pipeline_step_parameter(
                pipe,
                prompts,
                need_cfg=True,
                device=pipe.device,
                negative_prompt=negative_prompts,
                W=width,
                H=height,
            )
            idx = 0
            for t in tqdm(pipe.scheduler.timesteps):
                # Disabled experiment: re-register FreeU with non-neutral scaling on
                # a later step (idx == -1 never matches, so this branch is skipped).
                if idx == -1:  # (6 if num_inference_steps == 8 else 4)
                    register_free_upblock2d(pipe, b1=1.2, b2=1.2, s1=0.9, s2=0.9)
                    register_free_crossattn_upblock2d(pipe, b1=1.2, b2=1.2, s1=0.9, s2=0.9)
                latent_model_input = torch.cat([latents] * 2)
                latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, timestep=t)
                if idx == 0 and use_afs:
                    # Analytical first step (AFS): approximate the first model output
                    # directly from the scaled input latents, saving one UNet call.
                    noise_pred = latent_model_input * 0.975
                elif idx == len(pipe.scheduler.timesteps) - 1 and use_free_predictor:
                    # Let the scheduler's uni-predictor handle the last step without a UNet call.
                    noise_pred = None
                else:
                    noise_pred = pipe.unet(
                        latent_model_input,
                        t,
                        encoder_hidden_states=prompt_embeds.to(device=latents.device, dtype=latents.dtype),
                        added_cond_kwargs=cond_kwargs,
                    ).sample
                if noise_pred is not None:
                    # Classifier-free guidance: push the prediction away from the
                    # unconditional branch toward the conditional one.
                    uncond, cond = noise_pred.chunk(2)
                    noise_pred = uncond + (cond - uncond) * guidance_scale
                latents = pipe.scheduler.step(noise_pred, t, latents).prev_sample
                idx += 1
            x_samples_ddim = pipe.vae.decode(latents / pipe.vae.config.scaling_factor).sample
            x_samples_ddim = torch.clamp((x_samples_ddim + 1.0) / 2.0, min=0.0, max=1.0)
            # Batch size is 1, so return the single decoded sample as a PIL image.
            x_sample = 255. * rearrange(x_samples_ddim[0].cpu().numpy(), 'c h w -> h w c')
            return Image.fromarray(x_sample.astype(np.uint8))
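
# Standalone usage sketch (outside the Gradio app; argument values are illustrative):
#   img = generate_image_with_steps("a castle at dusk", "", seed=42,
#                                   width=1024, height=1024,
#                                   guidance_scale=5.5, num_inference_steps=8)
#   img.save("out.png")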
@spaces.GPU  # [uncomment to use ZeroGPU]
def infer(
    prompt,
    negative_prompt,
    seed,
    randomize_seed,
    resolution,
    guidance_scale,
    num_inference_steps,
    progress=gr.Progress(track_tqdm=True),
):
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    # Parse the resolution string (e.g. "1024x1024") into width and height
    width, height = map(int, resolution.split('x'))
    # Generate the quick image with the selected number of steps
    image_quick = generate_image_with_steps(prompt, negative_prompt, seed, width, height, guidance_scale, num_inference_steps)
    pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config, final_sigmas_type="sigma_min")
    # Generate the reference image with 20 steps using DPMSolverMultistepScheduler
    negative_prompts = ['(worst quality:2), (low quality:2), (normal quality:2), bad anatomy, bad proportions, poorly drawn face, poorly drawn hands, missing fingers, extra limbs, blurry, pixelated, distorted, lowres, jpeg artifacts, watermark, signature, text, (deformed:1.5), (bad hands:1.3), overexposed, underexposed, censored, mutated, extra fingers, cloned face, bad eyes']
    image_20_steps = pipe(
        prompt=[prompt],
        negative_prompt=negative_prompts,
        num_inference_steps=20,
        guidance_scale=guidance_scale,
        height=height,
        width=width,
    ).images[0]
    return image_quick, image_20_steps, seed
examples = [
    "Astronaut in a jungle, cold color, muted colors, detailed, 8k",
    "a painting of a virus monster playing guitar",
    "a painting of a squirrel eating a burger",
]

css = """
#col-container {
    margin: 0 auto;
    max-width: 640px;
}
"""
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Hyperparameters are all you need")
        with gr.Row():
            prompt = gr.Text(
                label="Prompt",
                show_label=False,
                max_lines=1,
                placeholder="Enter your prompt",
                container=False,
            )
            run_button = gr.Button("Run", scale=0, variant="primary")
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Our fast-inference result, using AFS and the uni-predictor to get 2 steps for free")
                result = gr.Image(label="Quick Result", show_label=False)
            with gr.Column():
                gr.Markdown("### Original 20-step result")
                result_20_steps = gr.Image(label="20 Steps Result", show_label=False)
        with gr.Accordion("Advanced Settings", open=False):
            negative_prompt = gr.Text(
                label="Negative prompt",
                max_lines=1,
                placeholder="Enter a negative prompt",
                visible=False,
            )
            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
            )
            randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
            resolution = gr.Dropdown(
                choices=[
                    "1024x1024",
                    "1216x832",
                    "832x1216",
                ],
                value="1024x1024",
                label="Resolution",
            )
            with gr.Row():
                guidance_scale = gr.Slider(
                    label="Guidance scale",
                    minimum=0.0,
                    maximum=6.0,
                    step=0.1,
                    value=5.5,  # Replace with defaults that work for your model
                )
                num_inference_steps = gr.Dropdown(
                    choices=[5, 6, 7, 8],
                    value=8,
                    label="Number of inference steps",
                )
        gr.Examples(examples=examples, inputs=[prompt])
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            prompt,
            negative_prompt,
            seed,
            randomize_seed,
            resolution,
            guidance_scale,
            num_inference_steps,
        ],
        outputs=[result, result_20_steps, seed],
    )

if __name__ == "__main__":
    demo.launch()