Spaces:
Sleeping
Sleeping
# Utility functions for loading and running diffusers Stable Diffusion models
| import diffusers | |
| import transformers | |
| import torch | |
| from typing import Union | |
| import os | |
| import warnings | |
| import numpy as np | |
| from PIL import Image | |
| import tqdm | |
| from copy import deepcopy | |
| import matplotlib.pyplot as plt | |
def build_generator(
    device : torch.device,
    seed : int,
):
    """
    Create a ``torch.Generator`` on `device` and seed it.

    Arguments:
        device: device the generator is created on
        seed: value handed to ``manual_seed``
    Returns:
        the seeded generator
    """
    rng = torch.Generator(device)
    # manual_seed seeds in place (and returns the same generator object).
    rng.manual_seed(seed)
    return rng
def load_stablediffusion_model(
    model_id : Union[str, os.PathLike],
    device : torch.device,
):
    """
    Load a complete diffusion pipeline from a model id.

    The ``fp16`` revision of the weights is loaded with ``torch.float16``,
    the pipeline's scheduler is replaced by a ``DPMSolverMultistepScheduler``
    built from the original scheduler's config, and the pipeline is moved to
    `device`. If that move fails, a warning is emitted and the pipeline is
    moved to the CPU instead.

    Arguments:
        model_id: hub id or local path passed to ``from_pretrained``
        device: device the pipeline should be moved to
    Returns:
        the loaded pipeline only (no generator is created here)
    """
    pipe = diffusers.DiffusionPipeline.from_pretrained(
        model_id,
        revision="fp16",
        torch_dtype=torch.float16,
        use_auth_token=True,
    )
    pipe.scheduler = diffusers.DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
    try:
        pipe = pipe.to(device)
    except Exception:
        # Deliberate best-effort fallback. `Exception` (rather than a bare
        # `except:`) keeps KeyboardInterrupt / SystemExit propagating.
        warnings.warn(
            f'Could not load model to device:{device}. Using CPU instead.'
        )
        # NOTE(review): the weights stay in float16 on this fallback path;
        # half-precision inference on CPU may not be supported - confirm.
        pipe = pipe.to('cpu')
    return pipe
def visualize_image_grid(
    imgs : "list[Image.Image]",
    rows : int,
    cols : int):
    """
    Paste equally sized images into one `rows` x `cols` grid image.

    Images are placed left-to-right, top-to-bottom in the order given. The
    tile size is taken from the first image; all others are assumed to have
    the same size.

    Arguments:
        imgs: images to arrange; each must expose ``.size`` and be accepted
            by ``Image.paste`` (i.e. PIL images)
        rows: number of grid rows
        cols: number of grid columns
    Returns:
        a new RGB image of size (cols * width, rows * height)
    Raises:
        ValueError: if ``len(imgs) != rows * cols`` or `imgs` is empty
    """
    # Explicit raises instead of `assert`: asserts are stripped under `-O`.
    if len(imgs) != rows * cols:
        raise ValueError(
            f'Expected rows*cols = {rows * cols} images, got {len(imgs)}.'
        )
    if len(imgs) == 0:
        raise ValueError('At least one image is required to build a grid.')
    # create grid
    tile_w, tile_h = imgs[0].size # assuming each image is the same size
    grid = Image.new('RGB', size=(cols * tile_w, rows * tile_h))
    for i, img in enumerate(imgs):
        row, col = divmod(i, cols)
        grid.paste(img, box=(col * tile_w, row * tile_h))
    return grid
def build_pipeline(
    autoencoder : Union[str, os.PathLike] = "CompVis/stable-diffusion-v1-4",
    tokenizer : Union[str, os.PathLike] = "openai/clip-vit-large-patch14",
    text_encoder : Union[str, os.PathLike] = "openai/clip-vit-large-patch14",
    unet : Union[str, os.PathLike] = "CompVis/stable-diffusion-v1-4",
    device : torch.device = torch.device('cuda'),
):
    """
    Assemble the parts of a StableDiffusion pipeline by loading each
    component separately.

    Arguments:
        autoencoder: path/id of the model whose 'vae' subfolder is loaded
        tokenizer: path/id the CLIP tokenizer is loaded from
        text_encoder: path/id the CLIP text model is loaded from
        unet: path/id of the model whose 'unet' subfolder is loaded
        device: device the vae, text encoder and unet are moved to
    Returns:
        tuple (vae, tokenizer, text_encoder, unet)
    """
    # VAE: maps between image space and latent space
    vae_model = diffusers.AutoencoderKL.from_pretrained(autoencoder, subfolder='vae')
    # Tokenizer and text model: turn the prompt into conditioning embeddings
    clip_tokenizer = transformers.CLIPTokenizer.from_pretrained(tokenizer)
    clip_text_model = transformers.CLIPTextModel.from_pretrained(text_encoder)
    # UNet: the conditional denoiser
    unet_model = diffusers.UNet2DConditionModel.from_pretrained(unet, subfolder='unet')
    # Only the torch modules are moved; the tokenizer is returned as loaded.
    return (
        vae_model.to(device),
        clip_tokenizer,
        clip_text_model.to(device),
        unet_model.to(device),
    )
| #TODO : Add negative prompting | |
def _encode_prompts(tokenizer, text_encoder, prompts, device):
    """Tokenize `prompts` (padded/truncated to the tokenizer's max length) and return the text encoder's first output."""
    tokens = tokenizer(
        prompts,
        padding = 'max_length',
        truncation = True,
        max_length = tokenizer.model_max_length,
        return_tensors = 'pt',
    )
    # Inference only: avoid building an autograd graph for the text encoder.
    with torch.no_grad():
        return text_encoder(tokens.input_ids.to(device))[0]


def _decode_latents(vae, latents):
    """Decode `latents` with the VAE and return the batch as a list of PIL images."""
    # Use the VAE's own scaling factor when its config exposes one; 0.18215
    # is the value this function previously hard-coded.
    scaling_factor = getattr(getattr(vae, 'config', None), 'scaling_factor', 0.18215)
    with torch.no_grad():
        decoded = vae.decode(1 / scaling_factor * latents).sample
    decoded = (decoded / 2 + 0.5).clamp(0, 1)
    pixels = decoded.detach().cpu().permute(0, 2, 3, 1).numpy() # bxhxwxc
    pixels = (pixels * 255).round().astype("uint8")
    return [Image.fromarray(p) for p in pixels]


def custom_stablediffusion_inference(
    vae,
    tokenizer,
    text_encoder,
    unet,
    noise_scheduler,
    prompt : list,
    device : torch.device,
    num_inference_steps = 100,
    image_size = (512,512),
    guidance_scale = 8,
    seed = 42,
    return_image_step = 5,
):
    """
    Run a text-conditioned denoising loop with classifier-free guidance and
    yield decoded images along the way.

    This is a generator. It yields a PIL image every `return_image_step`
    denoising steps (starting with step 0) and, after the loop, one more
    image decoded from the final latents. Only the image of the first prompt
    in the batch is yielded.

    Arguments:
        vae: autoencoder used to decode latents into images
        tokenizer: tokenizer for the prompts
        text_encoder: model producing the conditioning embeddings
        unet: conditional noise-prediction model
        noise_scheduler: scheduler providing `set_timesteps`, `timesteps`,
            `init_noise_sigma`, `scale_model_input` and `step`
        prompt: a prompt string or a list of prompt strings
        device: device the embeddings and latents are placed on
        num_inference_steps: number of scheduler steps
        image_size: (height, width) in pixels; latents are 1/8 of this size
        guidance_scale: classifier-free guidance weight
        seed: seed for the initial noise, or None for unseeded noise. A
            non-None seed is applied with `torch.manual_seed`, which reseeds
            torch's global default generator.
        return_image_step: interval (in steps) between intermediate images;
            0 or None disables intermediate images
    Yields:
        PIL images: the intermediate previews, then the final result
    """
    if isinstance(prompt,str):
        prompt = [prompt]
    batch_size = len(prompt)

    # Conditional embeddings for the prompts, plus unconditional embeddings
    # (empty prompts) needed for classifier-free guidance.
    text_embeddings = _encode_prompts(tokenizer, text_encoder, prompt, device)
    uncond_embeddings = _encode_prompts(tokenizer, text_encoder, [""] * batch_size, device)
    # Order matters: the noise prediction is later split as (uncond, text).
    text_embeddings = torch.cat([uncond_embeddings, text_embeddings])

    # Initial noise: one latent per prompt, so the doubled latent batch fed
    # to the UNet matches the 2 * batch_size text embeddings.
    latents = torch.randn(
        (batch_size, unet.config.in_channels, image_size[0] // 8, image_size[1] // 8),
        generator=torch.manual_seed(seed) if seed is not None else None
    )
    latents = latents.to(device)

    # Initialize the scheduler and scale the noise to its starting sigma
    noise_scheduler.set_timesteps(num_inference_steps)
    latents = latents * noise_scheduler.init_noise_sigma

    preview = None
    preview_is_current = False
    # tqdm wraps the timesteps (not enumerate) so the bar knows its total.
    for i, t in enumerate(tqdm.tqdm(noise_scheduler.timesteps)):
        # Duplicate the latents so one UNet pass yields both the
        # unconditional and the text-conditioned prediction.
        latent_model_input = torch.cat([latents] * 2)
        latent_model_input = noise_scheduler.scale_model_input(latent_model_input, timestep=t)
        with torch.no_grad():
            # Get the noise prediction from the UNet
            noise_pred = unet(latent_model_input, t, encoder_hidden_states = text_embeddings).sample
        # Perform guidance from the text embeddings
        noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
        noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
        # Compute the previous noisy sample x_t -> x_t-1
        latents = noise_scheduler.step(noise_pred, t, latents).prev_sample
        preview_is_current = False
        # Periodically decode the current latents with the VAE
        if return_image_step and i % return_image_step == 0:
            preview = _decode_latents(vae, latents)[0]
            preview_is_current = True
            yield preview

    # Always finish with an image of the final latents; re-decode unless the
    # last loop iteration already produced exactly that image.
    if not preview_is_current:
        preview = _decode_latents(vae, latents)[0]
    yield preview
if __name__ == "__main__":
    # Demo: generate an image on the CPU and save every yielded frame.
    device = torch.device("cpu")
    model_id = "stabilityai/stable-diffusion-2-1"
    tokenizer_id = "laion/CLIP-ViT-H-14-laion2B-s32B-b79K"
    # Alternative scheduler:
    #noise_scheduler = diffusers.LMSDiscreteScheduler(beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear", num_train_timesteps=1000)
    noise_scheduler = diffusers.DPMSolverMultistepScheduler.from_pretrained(model_id,subfolder="scheduler")
    # Adjacent string literals instead of backslash continuations inside the
    # string, so words at the line breaks stay separated by a single space.
    prompt = (
        "A Hyperrealistic photograph of Italian architectural modern home in Italy, lens flares, "
        "cinematic, hdri, matte painting, concept art, celestial, soft render, highly detailed, octane "
        "render, architectural HD, HQ, 4k, 8k"
    )
    vae, tokenizer, text_encoder, unet = build_pipeline(
        autoencoder = model_id,
        tokenizer=tokenizer_id,
        text_encoder=tokenizer_id,
        unet=model_id,
        device=device,
    )
    # seed = None: the initial noise is not seeded, so runs are not reproducible.
    image_iter = custom_stablediffusion_inference(vae, tokenizer, text_encoder, unet, noise_scheduler, prompt = prompt, device=device, seed = None)
    for i, image in enumerate(image_iter):
        image.save(f"step_{i}.png")