Spaces:
Sleeping
Sleeping
| import diffusers | |
| import transformers | |
| import utils.log | |
| import torch | |
| import PIL | |
| from typing import Union, Dict, Any, Optional, List, Tuple, Callable | |
| import os | |
| import re | |
class SimpleDiffusion(diffusers.DiffusionPipeline):
    """
    A unified interface for Stable Diffusion models. It exposes:
        - text2img
        - img2img
        - inpaint
        - upscale

    This class is highly inspired by the Stable-Diffusion-Mega community pipeline.
    It derives from `diffusers.DiffusionPipeline`, so the sub-models can be loaded
    through the usual `from_pretrained` mechanism; see the diffusers documentation
    of `DiffusionPipeline` for details.

    A logger is not a constructor argument; attach one with `set_logger`. Until a
    logger is set, log messages are silently skipped.

    Args:
        vae ([`AutoencoderKL`]):
            Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
        text_encoder ([`CLIPTextModel`]):
            Frozen text-encoder. Stable Diffusion uses the text portion of
            [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
            the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.
        tokenizer (`CLIPTokenizer`):
            Tokenizer of class
            [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
        unet ([`UNet2DConditionModel`]):
            Conditional U-Net architecture to denoise the encoded image latents.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        safety_checker ([`StableDiffusionSafetyChecker`]):
            Classification module that estimates whether generated images could be considered offensive or harmful.
            Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.
        feature_extractor ([`CLIPFeatureExtractor`]):
            Model that extracts features from generated images to be used as inputs for the `safety_checker`.
        prompt_generation (`str`, *optional*):
            Any truthy value enables automatic prompt expansion; a falsy value (`None`, `""`) disables it.
            NOTE(review): the value itself is not used as a model id -- the generator is always loaded from
            `Gustavosta/MagicPrompt-Stable-Diffusion` with the `gpt2` tokenizer. Confirm whether that is intended.
    """

    def __init__(
        self,
        vae: diffusers.AutoencoderKL,
        text_encoder: transformers.CLIPTextModel,
        tokenizer: transformers.CLIPTokenizer,
        unet: diffusers.UNet2DConditionModel,
        scheduler: Union[diffusers.DDIMScheduler, diffusers.PNDMScheduler, diffusers.LMSDiscreteScheduler],
        safety_checker: diffusers.pipelines.stable_diffusion.safety_checker.StableDiffusionSafetyChecker,
        feature_extractor: transformers.CLIPFeatureExtractor,
        prompt_generation: Optional[str] = "succinctly/text2image-prompt-generator",
    ):
        super().__init__()
        self._logger = None
        # register_modules stores each sub-model as an attribute and records it in
        # the pipeline config, which is what `components` reads back later.
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            scheduler=scheduler,
            safety_checker=safety_checker,
            feature_extractor=feature_extractor,
        )
        self._generated_prompts = []
        self._enable_prompt_generation = False
        if prompt_generation:
            self._enable_prompt_generation = True
            self._prompt_generator = transformers.pipeline(
                'text-generation',
                model='Gustavosta/MagicPrompt-Stable-Diffusion',
                tokenizer='gpt2',
            )

    def _log(self, message):
        """Send `message` to the attached logger at info level; do nothing when no logger is set."""
        if self._logger is not None:
            self._logger.info(message)

    def _resolve_prompt(self, prompt, **kwargs):
        """
        Return the prompt to feed to the diffusion pipeline.

        When prompt generation is enabled, the prompt is expanded with `_generate_prompt` and the
        result is logged; otherwise the prompt is returned untouched.

        Args:
            prompt (str or List[str]): The user supplied prompt(s).
            **kwargs: Options forwarded to `_generate_prompt`.
        """
        if not self._enable_prompt_generation:
            return prompt
        prompt = self._generate_prompt(prompt, **kwargs)
        self._log(f"Generated prompt: {prompt}")
        return prompt

    def _generate_prompt(self, prompt, **kwargs):
        """
        Generate an expanded prompt from a given text.

        Args:
            prompt (str or List[str]): The text to generate a prompt from. A list is expanded
                element by element.
            **kwargs: Additional keyword arguments passed to the prompt generator pipeline
                (for example `max_length` or `num_return_sequences`). Options whose value is
                `None` are not forwarded, so the generator's own defaults apply.

        Returns:
            The first generated candidate that survives `_process_prompt`, or the original
            prompt when no candidate survives. A list input yields a list of the same length.
        """
        if not isinstance(prompt, str):
            return [self._generate_prompt(single, **kwargs) for single in prompt]

        options = {key: value for key, value in kwargs.items() if value is not None}
        candidates = self._prompt_generator(prompt, **options)
        processed = self._process_prompt(prompt, candidates)
        if not processed:
            return prompt
        # _process_prompt joins every surviving candidate with newlines; the diffusion
        # pipeline needs a single prompt, so keep the first non-blank one.
        for line in processed.splitlines():
            line = line.strip()
            if line:
                return line
        return prompt

    def _process_prompt(self, original_prompt, prompt_list):
        """
        Filter and clean the raw output of the prompt generator.

        A candidate is kept when it differs from `original_prompt`, is more than four
        characters longer than it, and does not end with ":", "-" or "—". Kept candidates
        are joined with newlines, every space-free token containing a "." is removed,
        and "<" / ">" characters are stripped.

        Args:
            original_prompt (str): The text the candidates were generated from.
            prompt_list: Iterable of mappings that each hold a 'generated_text' entry.

        Returns:
            The cleaned, newline-joined candidates, or `None` when nothing is left.
        """
        # TODO : add more prompt processing
        minimum_length = len(original_prompt) + 4
        response_list = []
        for candidate in prompt_list:
            resp = candidate['generated_text'].strip()
            if resp == original_prompt or len(resp) <= minimum_length:
                continue
            if resp.endswith((":", "-", "—")):
                continue
            response_list.append(resp + '\n')
        response_end = "\n".join(response_list)
        response_end = re.sub(r'[^ ]+\.[^ ]+', '', response_end)
        response_end = response_end.replace("<", "").replace(">", "")
        if response_end != "":
            return response_end
        return None

    # Following components are required for the DiffusionPipeline class - but they exist in the StableDiffusionModel class
    def enable_attention_slicing(self, slice_size: Optional[Union[str, int]] = "auto"):
        r"""
        Enable sliced attention computation.
        Refer to the [StableDiffusionModel](https://github.com/huggingface/diffusers/blob/main/examples/community/stable_diffusion_mega.py) repo
        for more information.
        When this option is enabled, the attention module will split the input tensor in slices, to compute attention
        in several steps. This is useful to save some memory in exchange for a small speed decrease.
        Args:
            slice_size (`str` or `int`, *optional*, defaults to `"auto"`):
                When `"auto"`, halves the input to the attention heads, so attention will be computed in two steps. If
                a number is provided, uses as many slices as `attention_head_dim // slice_size`. In this case,
                `attention_head_dim` must be a multiple of `slice_size`.
        """
        if slice_size == "auto":
            # half the attention head size is usually a good trade-off between
            # speed and memory
            self._log("Attention slicing enabled!")
            slice_size = self.unet.config.attention_head_dim // 2
        self.unet.set_attention_slice(slice_size)

    def disable_attention_slicing(self):
        r"""
        Disable sliced attention computation. If `enable_attention_slicing` was previously invoked, this method will go
        back to computing attention in one step.
        """
        self._log("Attention slicing disabled!")
        # A slice size of None is handed straight to the unet by enable_attention_slicing.
        self.enable_attention_slicing(None)

    def set_logger(self, logger):
        r"""
        Set logger. This is useful to log information about the model.
        """
        self._logger = logger

    @property
    def components(self) -> Dict[str, Any]:
        """
        Mapping of registered module name to module instance.

        Built from the non-private keys of the pipeline config. It is a property because
        every delegating method unpacks it with `**self.components`.
        """
        return {k: getattr(self, k) for k in self.config.keys() if not k.startswith("_")}

    def inpaint(
        self,
        prompt: Union[str, List[str]],
        init_image: Union[torch.FloatTensor, PIL.Image.Image],
        mask_image: Union[torch.FloatTensor, PIL.Image.Image],
        strength: float = 0.8,
        num_inference_steps: Optional[int] = 50,
        guidance_scale: Optional[float] = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: Optional[float] = 0.0,
        generator: Optional[torch.Generator] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        **kwargs,
    ):
        """
        Inpaint `init_image` where `mask_image` indicates, guided by `prompt`.

        The prompt is first passed through `_resolve_prompt` (`**kwargs` are the prompt
        generator options); every other argument is forwarded unchanged to a
        `diffusers.StableDiffusionInpaintPipelineLegacy` built from `self.components`,
        and its result is returned.
        """
        prompt = self._resolve_prompt(prompt, **kwargs)
        pipeline = diffusers.StableDiffusionInpaintPipelineLegacy(**self.components)
        return pipeline(
            prompt=prompt,
            init_image=init_image,
            mask_image=mask_image,
            strength=strength,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            negative_prompt=negative_prompt,
            num_images_per_prompt=num_images_per_prompt,
            eta=eta,
            generator=generator,
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )

    def img2img(
        self,
        prompt: Union[str, List[str]],
        init_image: Union[torch.FloatTensor, PIL.Image.Image],
        strength: float = 0.8,
        num_inference_steps: Optional[int] = 50,
        guidance_scale: Optional[float] = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: Optional[float] = 0.0,
        generator: Optional[torch.Generator] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        **kwargs,
    ):
        """
        Generate images from `init_image`, guided by `prompt`.

        The prompt is first passed through `_resolve_prompt` (`**kwargs` are the prompt
        generator options); every other argument is forwarded unchanged to a
        `diffusers.StableDiffusionImg2ImgPipeline` built from `self.components`,
        and its result is returned.
        """
        prompt = self._resolve_prompt(prompt, **kwargs)
        pipeline = diffusers.StableDiffusionImg2ImgPipeline(**self.components)
        return pipeline(
            prompt=prompt,
            init_image=init_image,
            strength=strength,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            negative_prompt=negative_prompt,
            num_images_per_prompt=num_images_per_prompt,
            eta=eta,
            generator=generator,
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )

    def text2img(
        self,
        prompt: Union[str, List[str]],
        height: int = 512,
        width: int = 512,
        num_inference_steps: int = 50,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[torch.Generator] = None,
        latents: Optional[torch.FloatTensor] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        **kwargs,
    ):
        """
        Generate images from `prompt` alone.

        The prompt is first passed through `_resolve_prompt` (`**kwargs` are the prompt
        generator options); every other argument is forwarded unchanged to a
        `diffusers.StableDiffusionPipeline` built from `self.components`,
        and its result is returned.
        """
        prompt = self._resolve_prompt(prompt, **kwargs)
        pipeline = diffusers.StableDiffusionPipeline(**self.components)
        return pipeline(
            prompt=prompt,
            height=height,
            width=width,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            negative_prompt=negative_prompt,
            num_images_per_prompt=num_images_per_prompt,
            eta=eta,
            generator=generator,
            latents=latents,
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )

    def upscale(
        self,
        prompt: Union[str, List[str]],
        init_image: Union[torch.FloatTensor, PIL.Image.Image],
        num_inference_steps: Optional[int] = 75,
        guidance_scale: Optional[float] = 9.0,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: Optional[float] = 0.0,
        generator: Optional[torch.Generator] = None,
        latents: Optional[torch.FloatTensor] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        **kwargs,
    ):
        """
        Upscale an image using the StableDiffusionUpscalePipeline.

        The prompt is first passed through `_resolve_prompt` (`**kwargs` are the prompt
        generator options); `init_image` is handed to the pipeline as `image` and every
        other argument is forwarded unchanged. The pipeline result is returned.
        """
        prompt = self._resolve_prompt(prompt, **kwargs)
        # NOTE(review): StableDiffusionUpscalePipeline may expect a different set of
        # components than the one registered here -- confirm against the installed
        # diffusers version before relying on this method.
        pipeline = diffusers.StableDiffusionUpscalePipeline(**self.components)
        return pipeline(
            prompt=prompt,
            image=init_image,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            negative_prompt=negative_prompt,
            num_images_per_prompt=num_images_per_prompt,
            eta=eta,
            generator=generator,
            latents=latents,
            output_type=output_type,
            return_dict=return_dict,
            callback=callback,
            callback_steps=callback_steps,
        )

    def set_scheduler(self, scheduler: Union[diffusers.DDIMScheduler, diffusers.PNDMScheduler, diffusers.LMSDiscreteScheduler, diffusers.EulerDiscreteScheduler]):
        """
        Set the scheduler for the pipeline. This is useful for controlling the diffusion process.

        Args:
            scheduler (Union[diffusers.DDIMScheduler, diffusers.PNDMScheduler, diffusers.LMSDiscreteScheduler, diffusers.EulerDiscreteScheduler]): The scheduler to use.
        """
        # `components` is rebuilt on every access, so writing into that dict would be
        # lost; re-registering replaces the module the same way __init__ installed it.
        self.register_modules(scheduler=scheduler)