| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | from typing import Any, Dict, List, Optional, Tuple, Union |
| |
|
| | import torch |
| | import torch.nn as nn |
| | import torch.utils.checkpoint |
| | from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection |
| |
|
| | from diffusers.configuration_utils import register_to_config |
| | from diffusers.image_processor import VaeImageProcessor |
| | from diffusers.models.autoencoders import AutoencoderKL |
| | from diffusers.models.unets.unet_2d_condition import UNet2DConditionModel, UNet2DConditionOutput |
| | from diffusers.pipelines.stable_diffusion import StableDiffusionPipeline |
| | from diffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker |
| | from diffusers.schedulers import KarrasDiffusionSchedulers |
| | from diffusers.utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers |
| |
|
| |
|
# Module-level logger, created through the `logging` helper imported from `diffusers.utils`.
logger = logging.get_logger(__name__)
| |
|
| |
|
class UNet2DConditionModelHighResFix(UNet2DConditionModel):
    r"""
    A conditional 2D UNet model that applies Kohya fix proposed for high resolution image generation.

    This model inherits from [`UNet2DConditionModel`]. Check the superclass documentation for learning about all the parameters.

    Parameters:
        high_res_fix (`List[Dict]`, *optional*, defaults to `[{'timestep': 600, 'scale_factor': 0.5, 'block_num': 1}]`):
            Enables Kohya fix for high resolution generation. Each entry rescales the activation map by
            `scale_factor` right after the down block with index `block_num`, for as long as the current timestep
            is greater than `timestep`.
    """

    _supports_gradient_checkpointing = True

    @register_to_config
    def __init__(self, high_res_fix: List[Dict] = [{"timestep": 600, "scale_factor": 0.5, "block_num": 1}], **kwargs):
        """
        Build the underlying UNet from `kwargs` and store the high-res-fix schedule on the config.

        Args:
            high_res_fix (`List[Dict]`, *optional*):
                Entries with the keys `timestep`, `scale_factor` and `block_num`. A falsy value (`None` or an
                empty list) leaves the registered config value untouched, which disables the fix in `forward`.
            **kwargs:
                Forwarded unchanged to `UNet2DConditionModel.__init__`.
        """
        super().__init__(**kwargs)
        if high_res_fix:
            # Order the entries from the highest to the lowest timestep threshold. `sorted` returns a new
            # list, so neither the caller's list nor the shared default argument is ever mutated.
            self.config.high_res_fix = sorted(high_res_fix, key=lambda x: x["timestep"], reverse=True)

    @classmethod
    def _resize(cls, sample, target=None, scale_factor=1, mode="bicubic"):
        """
        Resize the last two dimensions of `sample` with `torch.nn.functional.interpolate`.

        Args:
            sample (`torch.Tensor`): Tensor to resize.
            target (`torch.Tensor`, *optional*):
                When given, `sample` is resized to `target.shape[-2:]` (only if the sizes differ) and
                `scale_factor` is ignored.
            scale_factor (`float`, *optional*, defaults to 1):
                Used only when `target` is `None`; a value of 1 leaves `sample` unchanged.
            mode (`str`, *optional*, defaults to `"bicubic"`): Interpolation mode passed to `interpolate`.

        Returns:
            `torch.Tensor`: The (possibly resized) tensor, cast back to the dtype `sample` had on entry.
        """
        dtype = sample.dtype
        if dtype == torch.bfloat16:
            # Interpolate in float32 and cast back at the end.
            # NOTE(review): presumably because the interpolation kernel lacks bfloat16 support - confirm.
            sample = sample.to(torch.float32)

        if target is not None:
            if sample.shape[-2:] != target.shape[-2:]:
                sample = nn.functional.interpolate(sample, size=target.shape[-2:], mode=mode, align_corners=False)
        elif scale_factor != 1:
            sample = nn.functional.interpolate(sample, scale_factor=scale_factor, mode=mode, align_corners=False)

        return sample.to(dtype)

    def forward(
        self,
        sample: torch.FloatTensor,
        timestep: Union[torch.Tensor, float, int],
        encoder_hidden_states: torch.Tensor,
        class_labels: Optional[torch.Tensor] = None,
        timestep_cond: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None,
        down_block_additional_residuals: Optional[Tuple[torch.Tensor]] = None,
        mid_block_additional_residual: Optional[torch.Tensor] = None,
        down_intrablock_additional_residuals: Optional[Tuple[torch.Tensor]] = None,
        encoder_attention_mask: Optional[torch.Tensor] = None,
        return_dict: bool = True,
    ) -> Union[UNet2DConditionOutput, Tuple]:
        r"""
        The [`UNet2DConditionModel`] forward method.

        Args:
            sample (`torch.FloatTensor`):
                The noisy input tensor with the following shape `(batch, channel, height, width)`.
            timestep (`torch.FloatTensor` or `float` or `int`): The number of timesteps to denoise an input.
            encoder_hidden_states (`torch.FloatTensor`):
                The encoder hidden states with shape `(batch, sequence_length, feature_dim)`.
            class_labels (`torch.Tensor`, *optional*, defaults to `None`):
                Optional class labels for conditioning. Their embeddings will be summed with the timestep embeddings.
            timestep_cond: (`torch.Tensor`, *optional*, defaults to `None`):
                Conditional embeddings for timestep. If provided, the embeddings will be summed with the samples passed
                through the `self.time_embedding` layer to obtain the timestep embeddings.
            attention_mask (`torch.Tensor`, *optional*, defaults to `None`):
                An attention mask of shape `(batch, key_tokens)` is applied to `encoder_hidden_states`. If `1` the mask
                is kept, otherwise if `0` it is discarded. Mask will be converted into a bias, which adds large
                negative values to the attention scores corresponding to "discard" tokens.
            cross_attention_kwargs (`dict`, *optional*):
                A kwargs dictionary that if specified is passed along to the `AttentionProcessor` as defined under
                `self.processor` in
                [diffusers.models.attention_processor](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
            added_cond_kwargs: (`dict`, *optional*):
                A kwargs dictionary containing additional embeddings that if specified are added to the embeddings that
                are passed along to the UNet blocks.
            down_block_additional_residuals: (`tuple` of `torch.Tensor`, *optional*):
                A tuple of tensors that if specified are added to the residuals of down unet blocks.
            mid_block_additional_residual: (`torch.Tensor`, *optional*):
                A tensor that if specified is added to the residual of the middle unet block.
            down_intrablock_additional_residuals (`tuple` of `torch.Tensor`, *optional*):
                additional residuals to be added within UNet down blocks, for example from T2I-Adapter side model(s)
            encoder_attention_mask (`torch.Tensor`):
                A cross-attention mask of shape `(batch, sequence_length)` is applied to `encoder_hidden_states`. If
                `True` the mask is kept, otherwise if `False` it is discarded. Mask will be converted into a bias,
                which adds large negative values to the attention scores corresponding to "discard" tokens.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] instead of a plain
                tuple.

        Returns:
            [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] or `tuple`:
                If `return_dict` is True, an [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] is returned,
                otherwise a `tuple` is returned where the first element is the sample tensor.
        """
        # The spatial size of `sample` is checked against 2 ** `self.num_upsamplers`. When a dimension is
        # not a multiple of that factor, the expected output size is handed to the up blocks explicitly
        # (see `upsample_size` in the up path below).
        default_overall_up_factor = 2**self.num_upsamplers

        # `upsample_size` is only forwarded when the check below finds a non-multiple dimension.
        forward_upsample_size = False
        upsample_size = None

        for dim in sample.shape[-2:]:
            if dim % default_overall_up_factor != 0:
                # Force the interpolation output size in the up blocks.
                forward_upsample_size = True
                break

        # Turn `attention_mask` (documented as `(batch, key_tokens)`, 1 = keep, 0 = discard) into an
        # additive bias (keep -> 0, discard -> -10000.0) and insert a singleton dimension at index 1.
        if attention_mask is not None:
            attention_mask = (1 - attention_mask.to(sample.dtype)) * -10000.0
            attention_mask = attention_mask.unsqueeze(1)

        # `encoder_attention_mask` is converted to a bias in exactly the same way.
        if encoder_attention_mask is not None:
            encoder_attention_mask = (1 - encoder_attention_mask.to(sample.dtype)) * -10000.0
            encoder_attention_mask = encoder_attention_mask.unsqueeze(1)

        # 0. center input if configured
        if self.config.center_input_sample:
            sample = 2 * sample - 1.0

        # 1. time, class and augmentation embeddings
        t_emb = self.get_time_embed(sample=sample, timestep=timestep)
        emb = self.time_embedding(t_emb, timestep_cond)

        class_emb = self.get_class_embed(sample=sample, class_labels=class_labels)
        if class_emb is not None:
            if self.config.class_embeddings_concat:
                emb = torch.cat([emb, class_emb], dim=-1)
            else:
                emb = emb + class_emb

        aug_emb = self.get_aug_embed(
            emb=emb, encoder_hidden_states=encoder_hidden_states, added_cond_kwargs=added_cond_kwargs
        )
        if self.config.addition_embed_type == "image_hint":
            # For this embed type `get_aug_embed` returns a pair; the hint is concatenated to the input
            # along the channel dimension.
            aug_emb, hint = aug_emb
            sample = torch.cat([sample, hint], dim=1)

        emb = emb + aug_emb if aug_emb is not None else emb

        if self.time_embed_act is not None:
            emb = self.time_embed_act(emb)

        encoder_hidden_states = self.process_encoder_hidden_states(
            encoder_hidden_states=encoder_hidden_states, added_cond_kwargs=added_cond_kwargs
        )

        # 2. pre-process
        sample = self.conv_in(sample)

        # 2.5 GLIGEN: replace the raw `gligen` arguments by the output of the position net.
        if cross_attention_kwargs is not None and cross_attention_kwargs.get("gligen", None) is not None:
            cross_attention_kwargs = cross_attention_kwargs.copy()
            gligen_args = cross_attention_kwargs.pop("gligen")
            cross_attention_kwargs["gligen"] = {"objs": self.position_net(**gligen_args)}

        # 3. down
        # `scale` is popped from a copy: it is consumed here as the LoRA scale, is not forwarded to the
        # blocks, and the caller's dictionary is left untouched.
        if cross_attention_kwargs is not None:
            cross_attention_kwargs = cross_attention_kwargs.copy()
            lora_scale = cross_attention_kwargs.pop("scale", 1.0)
        else:
            lora_scale = 1.0

        if USE_PEFT_BACKEND:
            # Apply `lora_scale` to the LoRA layers; undone again after the post-processing step.
            scale_lora_layers(self, lora_scale)

        is_controlnet = mid_block_additional_residual is not None and down_block_additional_residuals is not None
        # T2I-Adapters pass their residuals through `down_intrablock_additional_residuals`.
        is_adapter = down_intrablock_additional_residuals is not None
        # Backward compatibility: `down_block_additional_residuals` without a mid-block residual is
        # treated as (deprecated) adapter input.
        if not is_adapter and mid_block_additional_residual is None and down_block_additional_residuals is not None:
            deprecate(
                "T2I should not use down_block_additional_residuals",
                "1.3.0",
                # Implicit concatenation keeps source indentation out of the emitted message.
                "Passing intrablock residual connections with `down_block_additional_residuals` is deprecated "
                "and will be removed in diffusers 1.3.0. `down_block_additional_residuals` should only be used "
                "for ControlNet. Please make sure use `down_intrablock_additional_residuals` instead. ",
                standard_warn=False,
            )
            down_intrablock_additional_residuals = down_block_additional_residuals
            is_adapter = True

        down_block_res_samples = (sample,)
        for down_i, downsample_block in enumerate(self.down_blocks):
            if hasattr(downsample_block, "has_cross_attention") and downsample_block.has_cross_attention:
                # Cross-attention down blocks receive the next adapter residual as a keyword argument.
                additional_residuals = {}
                if is_adapter and len(down_intrablock_additional_residuals) > 0:
                    additional_residuals["additional_residuals"] = down_intrablock_additional_residuals.pop(0)

                sample, res_samples = downsample_block(
                    hidden_states=sample,
                    temb=emb,
                    encoder_hidden_states=encoder_hidden_states,
                    attention_mask=attention_mask,
                    cross_attention_kwargs=cross_attention_kwargs,
                    encoder_attention_mask=encoder_attention_mask,
                    **additional_residuals,
                )

            else:
                sample, res_samples = downsample_block(hidden_states=sample, temb=emb)
                if is_adapter and len(down_intrablock_additional_residuals) > 0:
                    sample += down_intrablock_additional_residuals.pop(0)

            down_block_res_samples += res_samples

            # Kohya high-res fix: after the configured down block, rescale the activations while the
            # current timestep is above the entry's threshold. Only the first matching entry is applied
            # per block (entries are ordered by descending timestep in `__init__`).
            if self.config.high_res_fix:
                for high_res_fix in self.config.high_res_fix:
                    if timestep > high_res_fix["timestep"] and down_i == high_res_fix["block_num"]:
                        sample = self.__class__._resize(sample, scale_factor=high_res_fix["scale_factor"])
                        break

        if is_controlnet:
            new_down_block_res_samples = ()

            for down_block_res_sample, down_block_additional_residual in zip(
                down_block_res_samples, down_block_additional_residuals
            ):
                down_block_res_sample = down_block_res_sample + down_block_additional_residual
                new_down_block_res_samples = new_down_block_res_samples + (down_block_res_sample,)

            down_block_res_samples = new_down_block_res_samples

        # 4. mid
        if self.mid_block is not None:
            if hasattr(self.mid_block, "has_cross_attention") and self.mid_block.has_cross_attention:
                sample = self.mid_block(
                    sample,
                    emb,
                    encoder_hidden_states=encoder_hidden_states,
                    attention_mask=attention_mask,
                    cross_attention_kwargs=cross_attention_kwargs,
                    encoder_attention_mask=encoder_attention_mask,
                )
            else:
                sample = self.mid_block(sample, emb)

            # A remaining adapter residual is added here only when its shape matches the mid-block output.
            if (
                is_adapter
                and len(down_intrablock_additional_residuals) > 0
                and sample.shape == down_intrablock_additional_residuals[0].shape
            ):
                sample += down_intrablock_additional_residuals.pop(0)

        if is_controlnet:
            sample = sample + mid_block_additional_residual

        # 5. up
        for i, upsample_block in enumerate(self.up_blocks):
            is_final_block = i == len(self.up_blocks) - 1

            # Take the skip connections for this block off the end of the stack.
            res_samples = down_block_res_samples[-len(upsample_block.resnets) :]
            down_block_res_samples = down_block_res_samples[: -len(upsample_block.resnets)]

            # Kohya high-res fix, up path: when the down path rescaled the activations, bring `sample`
            # and the remaining skip connections to the spatial size of the first skip connection.
            if self.config.high_res_fix is not None:
                if res_samples[0].shape[-2:] != sample.shape[-2:]:
                    sample = self.__class__._resize(sample, target=res_samples[0])
                    res_samples_up_sampled = (res_samples[0],)
                    for res_sample in res_samples[1:]:
                        res_samples_up_sampled += (self.__class__._resize(res_sample, target=res_samples[0]),)
                    res_samples = res_samples_up_sampled

            # For every block but the last, forward the size of the next skip connection when needed.
            if not is_final_block and forward_upsample_size:
                upsample_size = down_block_res_samples[-1].shape[2:]

            if hasattr(upsample_block, "has_cross_attention") and upsample_block.has_cross_attention:
                sample = upsample_block(
                    hidden_states=sample,
                    temb=emb,
                    res_hidden_states_tuple=res_samples,
                    encoder_hidden_states=encoder_hidden_states,
                    cross_attention_kwargs=cross_attention_kwargs,
                    upsample_size=upsample_size,
                    attention_mask=attention_mask,
                    encoder_attention_mask=encoder_attention_mask,
                )
            else:
                sample = upsample_block(
                    hidden_states=sample,
                    temb=emb,
                    res_hidden_states_tuple=res_samples,
                    upsample_size=upsample_size,
                )

        # 6. post-process
        if self.conv_norm_out:
            sample = self.conv_norm_out(sample)
            sample = self.conv_act(sample)
        sample = self.conv_out(sample)

        if USE_PEFT_BACKEND:
            # Undo the LoRA scaling applied before the down path.
            unscale_lora_layers(self, lora_scale)

        if not return_dict:
            return (sample,)

        return UNet2DConditionOutput(sample=sample)

    @classmethod
    def from_unet(cls, unet: UNet2DConditionModel, high_res_fix: list):
        """
        Create a high-res-fix UNet that mirrors an existing `UNet2DConditionModel`.

        Args:
            unet (`UNet2DConditionModel`): Source model whose config and weights are copied.
            high_res_fix (`list`): Schedule stored in the new model's config (see the class docstring).

        Returns:
            `UNet2DConditionModelHighResFix`: A new model with the same weights, dtype and train/eval mode
            as `unet`.
        """
        config = dict(unet.config)
        config["high_res_fix"] = high_res_fix
        unet_high_res = cls(**config)
        unet_high_res.load_state_dict(unet.state_dict())
        unet_high_res.to(unet.dtype)
        # A freshly constructed module starts in training mode; mirror the source model instead so an
        # eval-mode UNet does not silently turn into a training-mode copy.
        unet_high_res.train(unet.training)
        return unet_high_res
| |
|
| |
|
# Usage example for the pipeline defined in this file: it loads it through `custom_pipeline` and
# requests a non-square, high-resolution image. The constant is not referenced in the visible part
# of this file.
EXAMPLE_DOC_STRING = """
    Examples:
        ```py
        >>> import torch
        >>> from diffusers import DiffusionPipeline

        >>> pipe = DiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4",
                                                     custom_pipeline="kohya_hires_fix",
                                                     torch_dtype=torch.float16,
                                                     high_res_fix=[{'timestep': 600,
                                                                    'scale_factor': 0.5,
                                                                    'block_num': 1}])
        >>> pipe = pipe.to("cuda")

        >>> prompt = "a photo of an astronaut riding a horse on mars"
        >>> image = pipe(prompt, height=1000, width=1600).images[0]
        ```
"""
| |
|
| |
|
class StableDiffusionHighResFixPipeline(StableDiffusionPipeline):
    r"""
    Text-to-image Stable Diffusion pipeline whose UNet is swapped for [`UNet2DConditionModelHighResFix`], i.e. the
    Kohya fix for high resolution generation.

    The pipeline subclasses [`StableDiffusionPipeline`]; see the superclass documentation for the generic methods.

    Loading methods inherited from the superclass:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
        - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
        - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
        - [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters

    Args:
        vae ([`AutoencoderKL`]):
            Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
        text_encoder ([`~transformers.CLIPTextModel`]):
            Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
        tokenizer ([`~transformers.CLIPTokenizer`]):
            A `CLIPTokenizer` to tokenize text.
        unet ([`UNet2DConditionModel`]):
            A `UNet2DConditionModel` to denoise the encoded image latents. It is converted with
            `UNet2DConditionModelHighResFix.from_unet` during construction.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        safety_checker ([`StableDiffusionSafetyChecker`]):
            Classification module that estimates whether generated images could be considered offensive or harmful.
            Please refer to the [model card](https://huggingface.co/stable-diffusion-v1-5/stable-diffusion-v1-5) for more details
            about a model's potential harms.
        feature_extractor ([`~transformers.CLIPImageProcessor`]):
            A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
        high_res_fix (`List[Dict]`, *optional*, defaults to `[{'timestep': 600, 'scale_factor': 0.5, 'block_num': 1}]`):
            Schedule handed to `UNet2DConditionModelHighResFix.from_unet`; each entry carries the keys `timestep`,
            `scale_factor` and `block_num`.
    """

    model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
    _optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
    _exclude_from_cpu_offload = ["safety_checker"]
    _callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]

    def __init__(
        self,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        unet: UNet2DConditionModel,
        scheduler: KarrasDiffusionSchedulers,
        safety_checker: StableDiffusionSafetyChecker,
        feature_extractor: CLIPImageProcessor,
        image_encoder: CLIPVisionModelWithProjection = None,
        requires_safety_checker: bool = True,
        high_res_fix: List[Dict] = [{"timestep": 600, "scale_factor": 0.5, "block_num": 1}],
    ):
        # Gather the components once; the key order matches the keyword order used for registration.
        components = {
            "vae": vae,
            "text_encoder": text_encoder,
            "tokenizer": tokenizer,
            "unet": unet,
            "scheduler": scheduler,
            "safety_checker": safety_checker,
            "feature_extractor": feature_extractor,
            "image_encoder": image_encoder,
        }
        super().__init__(requires_safety_checker=requires_safety_checker, **components)

        # Swap in the high-res-fix variant of the UNet and register all modules again so the pipeline
        # holds the converted model.
        components["unet"] = UNet2DConditionModelHighResFix.from_unet(unet=unet, high_res_fix=high_res_fix)
        self.register_modules(**components)

        # Derive the VAE scale factor from the VAE config when a VAE is present; otherwise use 8.
        if getattr(self, "vae", None):
            self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        else:
            self.vae_scale_factor = 8
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
        self.register_to_config(requires_safety_checker=requires_safety_checker)
| |
|