Spaces:
Sleeping
Sleeping
| import os | |
| import torch | |
| import numpy as np | |
| import cv2 | |
| from diffusers import DiffusionPipeline, StableDiffusionPipeline | |
| from diffusers import StableDiffusionControlNetPipeline, ControlNetModel | |
| from diffusers import DPMSolverMultistepScheduler, UniPCMultistepScheduler | |
| from diffusers import AutoencoderKL | |
| from PIL import Image | |
| ################################################## | |
def make_canny_condition(image):
    """Return a 3-channel uint8 Canny edge map of `image` (PIL image or ndarray)."""
    edges = cv2.Canny(np.array(image), 100, 200)
    # Canny output is single-channel; replicate it to 3 channels so it can be
    # consumed as an RGB conditioning image.
    return np.stack([edges, edges, edges], axis=2)
def make_merged_canny(composition_image, reference_image, alpha):
    """Alpha-blend the Canny maps of two images.

    The composition image is resized to the reference image's size, both are
    turned into 3-channel Canny maps, and the maps are blended with weight
    `alpha` on the composition map and `1 - alpha` on the reference map.
    """
    def _as_array(img):
        return np.array(img) if isinstance(img, Image.Image) else img

    ref_arr = _as_array(reference_image)
    # cv2.resize expects (width, height); shape[1::-1] flips (h, w) -> (w, h).
    comp_arr = cv2.resize(_as_array(composition_image), ref_arr.shape[1::-1])
    return cv2.addWeighted(
        make_canny_condition(comp_arr), alpha,
        make_canny_condition(ref_arr), 1.0 - alpha,
        0.0,
    )
| ################################################## | |
class SDHelper:
    """Thin wrapper around diffusers text-to-image pipelines.

    Builds a Stable Diffusion pipeline (optionally with a ControlNet and/or a
    custom VAE) from a config object and exposes `forward` for generation.

    `config` is assumed to be a dict-like object with attribute access
    (e.g. EasyDict / OmegaConf — TODO confirm against caller) providing at
    least `model_id` and `scheduler`, and optionally `vae`, `controlnet_id`,
    `controlnet_conditioning_scale`, `num_images_per_prompt`,
    `num_inference_steps`.
    """

    def __init__(self, config) -> None:
        self.setup_config(config)

    def get_stable_diffusion_models(self):
        """Map short version tags to Hugging Face Hub model ids."""
        return {
            '1.5': 'runwayml/stable-diffusion-v1-5',
            '2.1': 'stabilityai/stable-diffusion-2-1',
            'xl': 'stabilityai/stable-diffusion-xl-base-1.0',
        }

    def load_model(self, module, model_id, **kwargs):
        """Load `module` (a diffusers class) from the Hub and move it to the
        best available device.

        On CUDA the fp16 weights variant is requested and xformers
        memory-efficient attention is enabled.
        NOTE(fix): the original called `torch.cuda.max_memory_allocated`,
        which only *reads* a memory statistic and has no effect; removed.
        """
        device = "cuda" if torch.cuda.is_available() else "cpu"
        if torch.cuda.is_available():
            m = module.from_pretrained(model_id, torch_dtype=torch.float16,
                                       variant="fp16", **kwargs)
            m.enable_xformers_memory_efficient_attention()
        else:
            m = module.from_pretrained(model_id, **kwargs)
        return m.to(device)

    def setup_config(self, config):
        """Build the pipeline described by `config` and store it on `self`.

        Sets `self.config`, `self.controlnet` (None when no controlnet is
        configured), `self.controlnet_conditioning_scale` (controlnet case
        only) and `self.pipe`.
        """
        self.config = config
        # Optional custom VAE.  FIX: previously the VAE was loaded but never
        # attached to any pipeline; now it is forwarded to from_pretrained.
        if config.get('vae', None) is not None:
            vae = self.load_model(AutoencoderKL, config.vae)
        else:
            vae = None
        pipe_kwargs = {} if vae is None else {'vae': vae}

        if config.get('controlnet_id', None) is not None:
            # With ControlNet.
            self.controlnet = self.load_model(ControlNetModel, config.controlnet_id)
            self.controlnet_conditioning_scale = config.get('controlnet_conditioning_scale', 1.0)
            pipe = self.load_model(StableDiffusionControlNetPipeline, config.model_id,
                                   controlnet=self.controlnet, **pipe_kwargs)
        else:
            # Plain text-to-image.
            self.controlnet = None
            if config.model_id == 'stabilityai/stable-diffusion-xl-base-1.0':
                # SDXL uses the generic DiffusionPipeline loader.
                pipe = self.load_model(DiffusionPipeline, config.model_id, **pipe_kwargs)
            else:
                # SD 1.5 / 2.1.
                pipe = self.load_model(StableDiffusionPipeline, config.model_id, **pipe_kwargs)

        # Optional scheduler override; anything else keeps the model default.
        if config.scheduler == 'DPMSolverMultistepScheduler':
            pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
        elif config.scheduler == 'UniPCMultistepScheduler':
            pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)
        self.pipe = pipe

    def forward(self, prompt, control_image=None):
        """Generate images for `prompt`; returns the pipeline's `.images` list.

        `control_image` (ndarray or PIL image) is only meaningful when a
        controlnet was configured in `setup_config`.
        FIX: `num_inference_steps` from config is now honoured in the
        no-controlnet branch as well (previous behaviour silently used the
        pipeline default; the config default of 50 matches it).
        """
        if isinstance(control_image, np.ndarray):
            control_image = Image.fromarray(control_image)
        num_images_per_prompt = self.config.get('num_images_per_prompt', 4)
        num_inference_steps = self.config.get('num_inference_steps', 50)
        if control_image is None:
            images = self.pipe(prompt,
                               num_inference_steps=num_inference_steps,
                               num_images_per_prompt=num_images_per_prompt,
                               ).images
        else:
            images = self.pipe(prompt,
                               num_inference_steps=num_inference_steps,
                               image=control_image,
                               num_images_per_prompt=num_images_per_prompt,
                               controlnet_conditioning_scale=self.controlnet_conditioning_scale,
                               ).images
        return images
| ########## | |