""" FOFPred Diffusion Pipeline. Modified from OmniGen2 Diffusion Pipeline (By OmniGen2 Team and The HuggingFace Team). Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import inspect import os import warnings from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, Tuple, Union import numpy as np import PIL.Image import torch import torch.nn as nn import torch.nn.functional as F from diffusers.configuration_utils import register_to_config from diffusers.image_processor import ( PipelineImageInput, VaeImageProcessor, is_valid_image_imagelist, ) from diffusers.loaders.lora_base import ( # noqa LoraBaseMixin, _fetch_state_dict, ) from diffusers.loaders.lora_conversion_utils import ( _convert_non_diffusers_lumina2_lora_to_diffusers, ) from diffusers.models.autoencoders import AutoencoderKL from diffusers.models.embeddings import get_1d_rotary_pos_embed from diffusers.pipelines.pipeline_utils import DiffusionPipeline from diffusers.utils import ( USE_PEFT_BACKEND, BaseOutput, is_peft_available, is_peft_version, is_torch_version, is_torch_xla_available, is_transformers_available, is_transformers_version, logging, ) from diffusers.utils.torch_utils import randn_tensor from einops import repeat from huggingface_hub.utils import validate_hf_hub_args from transformers import Qwen2_5_VLForConditionalGeneration from .scheduler.scheduler_fofpred import FlowMatchEulerDiscreteScheduler from .transformer.transformer_fofpred import OmniGen2Transformer3DModel logger = logging.get_logger(__name__) # pylint: disable=invalid-name _LOW_CPU_MEM_USAGE_DEFAULT_LORA = False if is_torch_version(">=", "1.9.0"): if ( is_peft_available() and is_peft_version(">=", "0.13.1") and is_transformers_available() and is_transformers_version(">", "4.45.2") ): _LOW_CPU_MEM_USAGE_DEFAULT_LORA = True if is_torch_xla_available(): XLA_AVAILABLE = True else: XLA_AVAILABLE = False TRANSFORMER_NAME = "transformer" class OmniGen2ImageProcessor(VaeImageProcessor): """ Image processor for PixArt image resize and crop. Args: do_resize (`bool`, *optional*, defaults to `True`): Whether to downscale the image's (height, width) dimensions to multiples of `vae_scale_factor`. Can accept `height` and `width` arguments from [`image_processor.VaeImageProcessor.preprocess`] method. vae_scale_factor (`int`, *optional*, defaults to `8`): VAE scale factor. If `do_resize` is `True`, the image is automatically resized to multiples of this factor. resample (`str`, *optional*, defaults to `lanczos`): Resampling filter to use when resizing the image. do_normalize (`bool`, *optional*, defaults to `True`): Whether to normalize the image to [-1,1]. do_binarize (`bool`, *optional*, defaults to `False`): Whether to binarize the image to 0/1. do_convert_rgb (`bool`, *optional*, defaults to be `False`): Whether to convert the images to RGB format. do_convert_grayscale (`bool`, *optional*, defaults to be `False`): Whether to convert the images to grayscale format. """ @register_to_config def __init__( self, do_resize: bool = True, vae_scale_factor: int = 16, resample: str = "lanczos", max_pixels: Optional[int] = None, max_side_length: Optional[int] = None, do_normalize: bool = True, do_binarize: bool = False, do_convert_grayscale: bool = False, ): super().__init__( do_resize=do_resize, vae_scale_factor=vae_scale_factor, resample=resample, do_normalize=do_normalize, do_binarize=do_binarize, do_convert_grayscale=do_convert_grayscale, ) self.max_pixels = max_pixels self.max_side_length = max_side_length def get_new_height_width( self, image: Union[PIL.Image.Image, np.ndarray, torch.Tensor], height: Optional[int] = None, width: Optional[int] = None, max_pixels: Optional[int] = None, max_side_length: Optional[int] = None, ) -> Tuple[int, int]: r""" Returns the height and width of the image, downscaled to the next integer multiple of `vae_scale_factor`. Args: image (`Union[PIL.Image.Image, np.ndarray, torch.Tensor]`): The image input, which can be a PIL image, NumPy array, or PyTorch tensor. If it is a NumPy array, it should have shape `[batch, height, width]` or `[batch, height, width, channels]`. If it is a PyTorch tensor, it should have shape `[batch, channels, height, width]`. height (`Optional[int]`, *optional*, defaults to `None`): The height of the preprocessed image. If `None`, the height of the `image` input will be used. width (`Optional[int]`, *optional*, defaults to `None`): The width of the preprocessed image. If `None`, the width of the `image` input will be used. Returns: `Tuple[int, int]`: A tuple containing the height and width, both resized to the nearest integer multiple of `vae_scale_factor`. """ if height is None: if isinstance(image, PIL.Image.Image): height = image.height elif isinstance(image, torch.Tensor): height = image.shape[2] else: height = image.shape[1] if width is None: if isinstance(image, PIL.Image.Image): width = image.width elif isinstance(image, torch.Tensor): width = image.shape[3] else: width = image.shape[2] if max_side_length is None: max_side_length = self.max_side_length if max_pixels is None: max_pixels = self.max_pixels ratio = 1.0 if max_side_length is not None: if height > width: max_side_length_ratio = max_side_length / height else: max_side_length_ratio = max_side_length / width cur_pixels = height * width max_pixels_ratio = (max_pixels / cur_pixels) ** 0.5 ratio = min( max_pixels_ratio, max_side_length_ratio, 1.0 ) # do not upscale input image new_height, new_width = ( int(height * ratio) // self.config.vae_scale_factor * self.config.vae_scale_factor, int(width * ratio) // self.config.vae_scale_factor * self.config.vae_scale_factor, ) return new_height, new_width def preprocess( self, image: PipelineImageInput, height: Optional[int] = None, width: Optional[int] = None, max_pixels: Optional[int] = None, max_side_length: Optional[int] = None, resize_mode: str = "default", # "default", "fill", "crop" crops_coords: Optional[Tuple[int, int, int, int]] = None, ) -> torch.Tensor: """ Preprocess the image input. Args: image (`PipelineImageInput`): The image input, accepted formats are PIL images, NumPy arrays, PyTorch tensors; Also accept list of supported formats. height (`int`, *optional*): The height in preprocessed image. If `None`, will use the `get_default_height_width()` to get default height. width (`int`, *optional*): The width in preprocessed. If `None`, will use get_default_height_width()` to get the default width. resize_mode (`str`, *optional*, defaults to `default`): The resize mode, can be one of `default` or `fill`. If `default`, will resize the image to fit within the specified width and height, and it may not maintaining the original aspect ratio. If `fill`, will resize the image to fit within the specified width and height, maintaining the aspect ratio, and then center the image within the dimensions, filling empty with data from image. If `crop`, will resize the image to fit within the specified width and height, maintaining the aspect ratio, and then center the image within the dimensions, cropping the excess. Note that resize_mode `fill` and `crop` are only supported for PIL image input. crops_coords (`List[Tuple[int, int, int, int]]`, *optional*, defaults to `None`): The crop coordinates for each image in the batch. If `None`, will not crop the image. Returns: `torch.Tensor`: The preprocessed image. """ supported_formats = (PIL.Image.Image, np.ndarray, torch.Tensor) # Expand the missing dimension for 3-dimensional pytorch tensor or numpy array that represents grayscale image if ( self.config.do_convert_grayscale and isinstance(image, (torch.Tensor, np.ndarray)) and image.ndim == 3 ): if isinstance(image, torch.Tensor): # if image is a pytorch tensor could have 2 possible shapes: # 1. batch x height x width: we should insert the channel dimension at position 1 # 2. channel x height x width: we should insert batch dimension at position 0, # however, since both channel and batch dimension has same size 1, it is same to insert at position 1 # for simplicity, we insert a dimension of size 1 at position 1 for both cases image = image.unsqueeze(1) else: # if it is a numpy array, it could have 2 possible shapes: # 1. batch x height x width: insert channel dimension on last position # 2. height x width x channel: insert batch dimension on first position if image.shape[-1] == 1: image = np.expand_dims(image, axis=0) else: image = np.expand_dims(image, axis=-1) if ( isinstance(image, list) and isinstance(image[0], np.ndarray) and image[0].ndim == 4 ): warnings.warn( "Passing `image` as a list of 4d np.ndarray is deprecated." "Please concatenate the list along the batch dimension and pass it as a single 4d np.ndarray", FutureWarning, ) image = np.concatenate(image, axis=0) if ( isinstance(image, list) and isinstance(image[0], torch.Tensor) and image[0].ndim == 4 ): warnings.warn( "Passing `image` as a list of 4d torch.Tensor is deprecated." "Please concatenate the list along the batch dimension and pass it as a single 4d torch.Tensor", FutureWarning, ) image = torch.cat(image, axis=0) if not is_valid_image_imagelist(image): raise ValueError( f"Input is in incorrect format. Currently, we only support {', '.join(str(x) for x in supported_formats)}" ) if not isinstance(image, list): image = [image] if isinstance(image[0], PIL.Image.Image): if crops_coords is not None: image = [i.crop(crops_coords) for i in image] if self.config.do_resize: height, width = self.get_new_height_width( image[0], height, width, max_pixels, max_side_length ) image = [ self.resize(i, height, width, resize_mode=resize_mode) for i in image ] if self.config.do_convert_rgb: image = [self.convert_to_rgb(i) for i in image] elif self.config.do_convert_grayscale: image = [self.convert_to_grayscale(i) for i in image] image = self.pil_to_numpy(image) # to np image = self.numpy_to_pt(image) # to pt elif isinstance(image[0], np.ndarray): image = ( np.concatenate(image, axis=0) if image[0].ndim == 4 else np.stack(image, axis=0) ) image = self.numpy_to_pt(image) height, width = self.get_new_height_width( image, height, width, max_pixels, max_side_length ) if self.config.do_resize: image = self.resize(image, height, width) elif isinstance(image[0], torch.Tensor): image = ( torch.cat(image, axis=0) if image[0].ndim == 4 else torch.stack(image, axis=0) ) if self.config.do_convert_grayscale and image.ndim == 3: image = image.unsqueeze(1) channel = image.shape[1] # don't need any preprocess if the image is latents if channel == self.config.vae_latent_channels: return image height, width = self.get_new_height_width( image, height, width, max_pixels, max_side_length ) if self.config.do_resize: image = self.resize(image, height, width) # expected range [0,1], normalize to [-1,1] do_normalize = self.config.do_normalize if do_normalize and image.min() < 0: warnings.warn( "Passing `image` as torch tensor with value range in [-1,1] is deprecated. The expected value range for image tensor is [0,1] " f"when passing as pytorch tensor or numpy Array. You passed `image` with value range [{image.min()},{image.max()}]", FutureWarning, ) do_normalize = False if do_normalize: image = self.normalize(image) if self.config.do_binarize: image = self.binarize(image) return image @dataclass class TeaCacheParams: """ TeaCache parameters for `OmniGen2Transformer3DModel` See https://github.com/ali-vilab/TeaCache/ for a more comprehensive understanding Args: previous_residual (Optional[torch.Tensor]): The tensor difference between the output and the input of the transformer layers from the previous timestep. previous_modulated_inp (Optional[torch.Tensor]): The modulated input from the previous timestep used to indicate the change of the transformer layer's output. accumulated_rel_l1_distance (float): The accumulated relative L1 distance. is_first_or_last_step (bool): Whether the current timestep is the first or last step. """ previous_residual: Optional[torch.Tensor] = None previous_modulated_inp: Optional[torch.Tensor] = None accumulated_rel_l1_distance: float = 0 is_first_or_last_step: bool = False class OmniGen2RotaryPosEmbed(nn.Module): def __init__( self, theta: int, axes_dim: Tuple[int, int, int], axes_lens: Tuple[int, int, int] = (300, 512, 512), patch_size: int = 2, ): super().__init__() self.theta = theta self.axes_dim = axes_dim self.axes_lens = axes_lens self.patch_size = patch_size @staticmethod def get_freqs_cis( axes_dim: Tuple[int, int, int], axes_lens: Tuple[int, int, int], theta: int ) -> List[torch.Tensor]: freqs_cis = [] freqs_dtype = ( torch.float32 if torch.backends.mps.is_available() else torch.float64 ) for i, (d, e) in enumerate(zip(axes_dim, axes_lens)): emb = get_1d_rotary_pos_embed(d, e, theta=theta, freqs_dtype=freqs_dtype) freqs_cis.append(emb) return freqs_cis def _get_freqs_cis(self, freqs_cis, ids: torch.Tensor) -> torch.Tensor: device = ids.device if ids.device.type == "mps": ids = ids.to("cpu") result = [] for i in range(len(self.axes_dim)): freqs = freqs_cis[i].to(ids.device) index = ids[:, :, i : i + 1].repeat(1, 1, freqs.shape[-1]).to(torch.int64) result.append( torch.gather( freqs.unsqueeze(0).repeat(index.shape[0], 1, 1), dim=1, index=index ) ) return torch.cat(result, dim=-1).to(device) def forward( self, freqs_cis, attention_mask, l_effective_ref_img_len, l_effective_img_len, ref_img_sizes, img_sizes, device, ): batch_size = len(attention_mask) p = self.patch_size encoder_seq_len = attention_mask.shape[1] l_effective_cap_len = attention_mask.sum(dim=1).tolist() if isinstance(l_effective_img_len[0], list): # Check for t-dim case seq_lengths = [ cap_len + sum(ref_img_len) + sum(img_len) for cap_len, ref_img_len, img_len in zip( l_effective_cap_len, l_effective_ref_img_len, l_effective_img_len ) ] else: # Original case seq_lengths = [ cap_len + sum(ref_img_len) + img_len for cap_len, ref_img_len, img_len in zip( l_effective_cap_len, l_effective_ref_img_len, l_effective_img_len ) ] max_seq_len = max(seq_lengths) max_ref_img_len = max( [sum(ref_img_len) for ref_img_len in l_effective_ref_img_len] ) if isinstance(l_effective_img_len[0], list): max_img_len = max([sum(ln) for ln in l_effective_img_len]) else: max_img_len = max(l_effective_img_len) # Create position IDs position_ids = torch.zeros( batch_size, max_seq_len, 3, dtype=torch.int32, device=device ) for i, (cap_seq_len, seq_len) in enumerate( zip(l_effective_cap_len, seq_lengths) ): # add text position ids position_ids[i, :cap_seq_len] = repeat( torch.arange(cap_seq_len, dtype=torch.int32, device=device), "l -> l 3" ) pe_shift = cap_seq_len pe_shift_len = cap_seq_len if ref_img_sizes[i] is not None: for ref_img_size, ref_img_len in zip( ref_img_sizes[i], l_effective_ref_img_len[i] ): H, W = ref_img_size ref_H_tokens, ref_W_tokens = H // p, W // p assert ref_H_tokens * ref_W_tokens == ref_img_len # add image position ids row_ids = repeat( torch.arange(ref_H_tokens, dtype=torch.int32, device=device), "h -> h w", w=ref_W_tokens, ).flatten() col_ids = repeat( torch.arange(ref_W_tokens, dtype=torch.int32, device=device), "w -> h w", h=ref_H_tokens, ).flatten() position_ids[i, pe_shift_len : pe_shift_len + ref_img_len, 0] = ( pe_shift ) position_ids[i, pe_shift_len : pe_shift_len + ref_img_len, 1] = ( row_ids ) position_ids[i, pe_shift_len : pe_shift_len + ref_img_len, 2] = ( col_ids ) pe_shift += max(ref_H_tokens, ref_W_tokens) pe_shift_len += ref_img_len if isinstance(l_effective_img_len[i], list): # New case for img_size, img_len in zip(img_sizes[i], l_effective_img_len[i]): H, W = img_size H_tokens, W_tokens = H // p, W // p assert H_tokens * W_tokens == img_len row_ids = repeat( torch.arange(H_tokens, dtype=torch.int32, device=device), "h -> h w", w=W_tokens, ).flatten() col_ids = repeat( torch.arange(W_tokens, dtype=torch.int32, device=device), "w -> h w", h=H_tokens, ).flatten() end_idx = pe_shift_len + img_len position_ids[i, pe_shift_len:end_idx, 0] = pe_shift position_ids[i, pe_shift_len:end_idx, 1] = row_ids position_ids[i, pe_shift_len:end_idx, 2] = col_ids pe_shift += max(H_tokens, W_tokens) pe_shift_len = end_idx else: # Original case H, W = img_sizes[i] H_tokens, W_tokens = H // p, W // p assert H_tokens * W_tokens == l_effective_img_len[i] row_ids = repeat( torch.arange(H_tokens, dtype=torch.int32, device=device), "h -> h w", w=W_tokens, ).flatten() col_ids = repeat( torch.arange(W_tokens, dtype=torch.int32, device=device), "w -> h w", h=H_tokens, ).flatten() assert pe_shift_len + l_effective_img_len[i] == seq_len position_ids[i, pe_shift_len:seq_len, 0] = pe_shift position_ids[i, pe_shift_len:seq_len, 1] = row_ids position_ids[i, pe_shift_len:seq_len, 2] = col_ids # Get combined rotary embeddings freqs_cis = self._get_freqs_cis(freqs_cis, position_ids) # create separate rotary embeddings for captions and images cap_freqs_cis = torch.zeros( batch_size, encoder_seq_len, freqs_cis.shape[-1], device=device, dtype=freqs_cis.dtype, ) ref_img_freqs_cis = torch.zeros( batch_size, max_ref_img_len, freqs_cis.shape[-1], device=device, dtype=freqs_cis.dtype, ) img_freqs_cis = torch.zeros( batch_size, max_img_len, freqs_cis.shape[-1], device=device, dtype=freqs_cis.dtype, ) for i, (cap_seq_len, ref_img_len, img_len, seq_len) in enumerate( zip( l_effective_cap_len, l_effective_ref_img_len, l_effective_img_len, seq_lengths, ) ): cap_freqs_cis[i, :cap_seq_len] = freqs_cis[i, :cap_seq_len] ref_img_freqs_cis[i, : sum(ref_img_len)] = freqs_cis[ i, cap_seq_len : cap_seq_len + sum(ref_img_len) ] if isinstance(img_len, list): img_len = sum(img_len) img_freqs_cis[i, :img_len] = freqs_cis[ i, cap_seq_len + sum(ref_img_len) : cap_seq_len + sum(ref_img_len) + img_len, ] return ( cap_freqs_cis, ref_img_freqs_cis, img_freqs_cis, freqs_cis, l_effective_cap_len, seq_lengths, ) class OmniGen2LoraLoaderMixin(LoraBaseMixin): r""" Load LoRA layers into [`OmniGen2Transformer3DModel`]. Specific to [`FOFPredPipeline`]. """ _lora_loadable_modules = ["transformer"] transformer_name = TRANSFORMER_NAME @classmethod @validate_hf_hub_args def lora_state_dict( cls, pretrained_model_name_or_path_or_dict: Union[str, Dict[str, torch.Tensor]], **kwargs, ): r""" Return state dict for lora weights and the network alphas. We support loading A1111 formatted LoRA checkpoints in a limited capacity. This function is experimental and might change in the future. Parameters: pretrained_model_name_or_path_or_dict (`str` or `os.PathLike` or `dict`): Can be either: - A string, the *model id* (for example `google/ddpm-celebahq-256`) of a pretrained model hosted on the Hub. - A path to a *directory* (for example `./my_model_directory`) containing the model weights saved with [`ModelMixin.save_pretrained`]. - A [torch state dict](https://pytorch.org/tutorials/beginner/saving_loading_models.html#what-is-a-state-dict). cache_dir (`Union[str, os.PathLike]`, *optional*): Path to a directory where a downloaded pretrained model configuration is cached if the standard cache is not used. force_download (`bool`, *optional*, defaults to `False`): Whether or not to force the (re-)download of the model weights and configuration files, overriding the cached versions if they exist. proxies (`Dict[str, str]`, *optional*): A dictionary of proxy servers to use by protocol or endpoint, for example, `{'http': 'foo.bar:3128', 'http://hostname': 'foo.bar:4012'}`. The proxies are used on each request. local_files_only (`bool`, *optional*, defaults to `False`): Whether to only load local model weights and configuration files or not. If set to `True`, the model won't be downloaded from the Hub. token (`str` or *bool*, *optional*): The token to use as HTTP bearer authorization for remote files. If `True`, the token generated from `diffusers-cli login` (stored in `~/.huggingface`) is used. revision (`str`, *optional*, defaults to `"main"`): The specific model version to use. It can be a branch name, a tag name, a commit id, or any identifier allowed by Git. subfolder (`str`, *optional*, defaults to `""`): The subfolder location of a model file within a larger model repository on the Hub or locally. """ # Load the main state dict first which has the LoRA layers for either of # transformer and text encoder or both. cache_dir = kwargs.pop("cache_dir", None) force_download = kwargs.pop("force_download", False) proxies = kwargs.pop("proxies", None) local_files_only = kwargs.pop("local_files_only", None) token = kwargs.pop("token", None) revision = kwargs.pop("revision", None) subfolder = kwargs.pop("subfolder", None) weight_name = kwargs.pop("weight_name", None) use_safetensors = kwargs.pop("use_safetensors", None) allow_pickle = False if use_safetensors is None: use_safetensors = True allow_pickle = True user_agent = { "file_type": "attn_procs_weights", "framework": "pytorch", } state_dict = _fetch_state_dict( pretrained_model_name_or_path_or_dict=pretrained_model_name_or_path_or_dict, weight_name=weight_name, use_safetensors=use_safetensors, local_files_only=local_files_only, cache_dir=cache_dir, force_download=force_download, proxies=proxies, token=token, revision=revision, subfolder=subfolder, user_agent=user_agent, allow_pickle=allow_pickle, ) is_dora_scale_present = any("dora_scale" in k for k in state_dict) if is_dora_scale_present: warn_msg = "It seems like you are using a DoRA checkpoint that is not compatible in Diffusers at the moment. So, we are going to filter out the keys associated to 'dora_scale` from the state dict. If you think this is a mistake please open an issue https://github.com/huggingface/diffusers/issues/new." logger.warning(warn_msg) state_dict = {k: v for k, v in state_dict.items() if "dora_scale" not in k} # conversion. non_diffusers = any(k.startswith("diffusion_model.") for k in state_dict) if non_diffusers: state_dict = _convert_non_diffusers_lumina2_lora_to_diffusers(state_dict) return state_dict # Copied from diffusers.loaders.lora_pipeline.CogVideoXLoraLoaderMixin.load_lora_weights def load_lora_weights( self, pretrained_model_name_or_path_or_dict: Union[str, Dict[str, torch.Tensor]], adapter_name=None, **kwargs, ): """ Load LoRA weights specified in `pretrained_model_name_or_path_or_dict` into `self.transformer` and `self.text_encoder`. All kwargs are forwarded to `self.lora_state_dict`. See [`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`] for more details on how the state dict is loaded. See [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_into_transformer`] for more details on how the state dict is loaded into `self.transformer`. Parameters: pretrained_model_name_or_path_or_dict (`str` or `os.PathLike` or `dict`): See [`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`]. adapter_name (`str`, *optional*): Adapter name to be used for referencing the loaded adapter model. If not specified, it will use `default_{i}` where i is the total number of adapters being loaded. low_cpu_mem_usage (`bool`, *optional*): Speed up model loading by only loading the pretrained LoRA weights and not initializing the random weights. kwargs (`dict`, *optional*): See [`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`]. """ if not USE_PEFT_BACKEND: raise ValueError("PEFT backend is required for this method.") low_cpu_mem_usage = kwargs.pop( "low_cpu_mem_usage", _LOW_CPU_MEM_USAGE_DEFAULT_LORA ) if low_cpu_mem_usage and is_peft_version("<", "0.13.0"): raise ValueError( "`low_cpu_mem_usage=True` is not compatible with this `peft` version. Please update it with `pip install -U peft`." ) # if a dict is passed, copy it instead of modifying it inplace if isinstance(pretrained_model_name_or_path_or_dict, dict): pretrained_model_name_or_path_or_dict = ( pretrained_model_name_or_path_or_dict.copy() ) # First, ensure that the checkpoint is a compatible one and can be successfully loaded. state_dict = self.lora_state_dict( pretrained_model_name_or_path_or_dict, **kwargs ) is_correct_format = all("lora" in key for key in state_dict.keys()) if not is_correct_format: raise ValueError("Invalid LoRA checkpoint.") self.load_lora_into_transformer( state_dict, transformer=getattr(self, self.transformer_name) if not hasattr(self, "transformer") else self.transformer, adapter_name=adapter_name, _pipeline=self, low_cpu_mem_usage=low_cpu_mem_usage, ) @classmethod # Copied from diffusers.loaders.lora_pipeline.SD3LoraLoaderMixin.load_lora_into_transformer with SD3Transformer2DModel->Lumina2Transformer2DModel def load_lora_into_transformer( cls, state_dict, transformer, adapter_name=None, _pipeline=None, low_cpu_mem_usage=False, hotswap: bool = False, ): """ This will load the LoRA layers specified in `state_dict` into `transformer`. Parameters: state_dict (`dict`): A standard state dict containing the lora layer parameters. The keys can either be indexed directly into the unet or prefixed with an additional `unet` which can be used to distinguish between text encoder lora layers. transformer (`Lumina2Transformer2DModel`): The Transformer model to load the LoRA layers into. adapter_name (`str`, *optional*): Adapter name to be used for referencing the loaded adapter model. If not specified, it will use `default_{i}` where i is the total number of adapters being loaded. low_cpu_mem_usage (`bool`, *optional*): Speed up model loading by only loading the pretrained LoRA weights and not initializing the random weights. hotswap : (`bool`, *optional*) Defaults to `False`. Whether to substitute an existing (LoRA) adapter with the newly loaded adapter in-place. This means that, instead of loading an additional adapter, this will take the existing adapter weights and replace them with the weights of the new adapter. This can be faster and more memory efficient. However, the main advantage of hotswapping is that when the model is compiled with torch.compile, loading the new adapter does not require recompilation of the model. When using hotswapping, the passed `adapter_name` should be the name of an already loaded adapter. If the new adapter and the old adapter have different ranks and/or LoRA alphas (i.e. scaling), you need to call an additional method before loading the adapter: ```py pipeline = ... # load diffusers pipeline max_rank = ... # the highest rank among all LoRAs that you want to load # call *before* compiling and loading the LoRA adapter pipeline.enable_lora_hotswap(target_rank=max_rank) pipeline.load_lora_weights(file_name) # optionally compile the model now ``` Note that hotswapping adapters of the text encoder is not yet supported. There are some further limitations to this technique, which are documented here: https://huggingface.co/docs/peft/main/en/package_reference/hotswap """ if low_cpu_mem_usage and is_peft_version("<", "0.13.0"): raise ValueError( "`low_cpu_mem_usage=True` is not compatible with this `peft` version. Please update it with `pip install -U peft`." ) # Load the layers corresponding to transformer. logger.info(f"Loading {cls.transformer_name}.") transformer.load_lora_adapter( state_dict, network_alphas=None, adapter_name=adapter_name, _pipeline=_pipeline, low_cpu_mem_usage=low_cpu_mem_usage, hotswap=hotswap, ) @classmethod # Copied from diffusers.loaders.lora_pipeline.CogVideoXLoraLoaderMixin.save_lora_weights def save_lora_weights( cls, save_directory: Union[str, os.PathLike], transformer_lora_layers: Dict[str, Union[torch.nn.Module, torch.Tensor]] = None, is_main_process: bool = True, weight_name: str = None, save_function: Callable = None, safe_serialization: bool = True, ): r""" Save the LoRA parameters corresponding to the UNet and text encoder. Arguments: save_directory (`str` or `os.PathLike`): Directory to save LoRA parameters to. Will be created if it doesn't exist. transformer_lora_layers (`Dict[str, torch.nn.Module]` or `Dict[str, torch.Tensor]`): State dict of the LoRA layers corresponding to the `transformer`. is_main_process (`bool`, *optional*, defaults to `True`): Whether the process calling this is the main process or not. Useful during distributed training and you need to call this function on all processes. In this case, set `is_main_process=True` only on the main process to avoid race conditions. save_function (`Callable`): The function to use to save the state dictionary. Useful during distributed training when you need to replace `torch.save` with another method. Can be configured with the environment variable `DIFFUSERS_SAVE_MODE`. safe_serialization (`bool`, *optional*, defaults to `True`): Whether to save the model using `safetensors` or the traditional PyTorch way with `pickle`. """ state_dict = {} if not transformer_lora_layers: raise ValueError("You must pass `transformer_lora_layers`.") if transformer_lora_layers: state_dict.update( cls.pack_weights(transformer_lora_layers, cls.transformer_name) ) # Save the model cls.write_lora_layers( state_dict=state_dict, save_directory=save_directory, is_main_process=is_main_process, weight_name=weight_name, save_function=save_function, safe_serialization=safe_serialization, ) # Copied from diffusers.loaders.lora_pipeline.SanaLoraLoaderMixin.fuse_lora def fuse_lora( self, components: List[str] = ["transformer"], lora_scale: float = 1.0, safe_fusing: bool = False, adapter_names: Optional[List[str]] = None, **kwargs, ): r""" Fuses the LoRA parameters into the original parameters of the corresponding blocks. This is an experimental API. Args: components: (`List[str]`): List of LoRA-injectable components to fuse the LoRAs into. lora_scale (`float`, defaults to 1.0): Controls how much to influence the outputs with the LoRA parameters. safe_fusing (`bool`, defaults to `False`): Whether to check fused weights for NaN values before fusing and if values are NaN not fusing them. adapter_names (`List[str]`, *optional*): Adapter names to be used for fusing. If nothing is passed, all active adapters will be fused. Example: ```py from diffusers import DiffusionPipeline import torch pipeline = DiffusionPipeline.from_pretrained( "stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16 ).to("cuda") pipeline.load_lora_weights("nerijs/pixel-art-xl", weight_name="pixel-art-xl.safetensors", adapter_name="pixel") pipeline.fuse_lora(lora_scale=0.7) ``` """ super().fuse_lora( components=components, lora_scale=lora_scale, safe_fusing=safe_fusing, adapter_names=adapter_names, **kwargs, ) # Copied from diffusers.loaders.lora_pipeline.SanaLoraLoaderMixin.unfuse_lora def unfuse_lora(self, components: List[str] = ["transformer"], **kwargs): r""" Reverses the effect of [`pipe.fuse_lora()`](https://huggingface.co/docs/diffusers/main/en/api/loaders#diffusers.loaders.LoraBaseMixin.fuse_lora). This is an experimental API. Args: components (`List[str]`): List of LoRA-injectable components to unfuse LoRA from. unfuse_transformer (`bool`, defaults to `True`): Whether to unfuse the UNet LoRA parameters. """ super().unfuse_lora(components=components, **kwargs) def cache_init(self, num_steps: int): """ Initialization for cache. """ cache_dic = {} cache = {} cache_index = {} cache[-1] = {} cache_index[-1] = {} cache_index["layer_index"] = {} cache[-1]["layers_stream"] = {} cache_dic["cache_counter"] = 0 for j in range(len(self.transformer.layers)): cache[-1]["layers_stream"][j] = {} cache_index[-1][j] = {} cache_dic["Delta-DiT"] = False cache_dic["cache_type"] = "random" cache_dic["cache_index"] = cache_index cache_dic["cache"] = cache cache_dic["fresh_ratio_schedule"] = "ToCa" cache_dic["fresh_ratio"] = 0.0 cache_dic["fresh_threshold"] = 3 cache_dic["soft_fresh_weight"] = 0.0 cache_dic["taylor_cache"] = True cache_dic["max_order"] = 4 cache_dic["first_enhance"] = 5 current = {} current["activated_steps"] = [0] current["step"] = 0 current["num_steps"] = num_steps return cache_dic, current @dataclass class FMPipelineOutput(BaseOutput): """ Output class for OmniGen2 pipeline. Args: images (Union[List[PIL.Image.Image], np.ndarray]): List of denoised PIL images of length `batch_size` or numpy array of shape `(batch_size, height, width, num_channels)`. Contains the generated images. """ images: Union[List[PIL.Image.Image], np.ndarray] # Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps def retrieve_timesteps( scheduler, num_inference_steps: Optional[int] = None, device: Optional[Union[str, torch.device]] = None, timesteps: Optional[List[int]] = None, **kwargs, ): """ Calls the scheduler's `set_timesteps` method and retrieves timesteps from the scheduler after the call. Handles custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`. Args: scheduler (`SchedulerMixin`): The scheduler to get timesteps from. num_inference_steps (`int`): The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps` must be `None`. device (`str` or `torch.device`, *optional*): The device to which the timesteps should be moved to. If `None`, the timesteps are not moved. timesteps (`List[int]`, *optional*): Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed, `num_inference_steps` and `sigmas` must be `None`. sigmas (`List[float]`, *optional*): Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed, `num_inference_steps` and `timesteps` must be `None`. Returns: `Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the second element is the number of inference steps. """ if timesteps is not None: accepts_timesteps = "timesteps" in set( inspect.signature(scheduler.set_timesteps).parameters.keys() ) if not accepts_timesteps: raise ValueError( f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom" f" timestep schedules. Please check whether you are using the correct scheduler." ) scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs) timesteps = scheduler.timesteps num_inference_steps = len(timesteps) else: scheduler.set_timesteps(num_inference_steps, device=device, **kwargs) timesteps = scheduler.timesteps return timesteps, num_inference_steps class FOFPredPipeline(DiffusionPipeline, OmniGen2LoraLoaderMixin): """ Pipeline for text-to-image generation using OmniGen2. This pipeline implements a text-to-image generation model that uses: - Qwen2.5-VL for text encoding - A custom transformer architecture for image generation - VAE for image encoding/decoding - FlowMatchEulerDiscreteScheduler for noise scheduling Args: transformer (OmniGen2Transformer3DModel): The transformer model for image generation. vae (AutoencoderKL): The VAE model for image encoding/decoding. scheduler (FlowMatchEulerDiscreteScheduler): The scheduler for noise scheduling. text_encoder (Qwen2_5_VLModel): The text encoder model. tokenizer (Union[Qwen2Tokenizer, Qwen2TokenizerFast]): The tokenizer for text processing. """ model_cpu_offload_seq = "mllm->transformer->vae" def __init__( self, transformer: OmniGen2Transformer3DModel, vae: AutoencoderKL, scheduler: FlowMatchEulerDiscreteScheduler, mllm: Qwen2_5_VLForConditionalGeneration, processor, ) -> None: """ Initialize the OmniGen2 pipeline. Args: transformer: The transformer model for image generation. vae: The VAE model for image encoding/decoding. scheduler: The scheduler for noise scheduling. text_encoder: The text encoder model. tokenizer: The tokenizer for text processing. """ super().__init__() self.register_modules( transformer=transformer, vae=vae, scheduler=scheduler, mllm=mllm, processor=processor, ) self.vae_scale_factor = ( 2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8 ) self.image_processor = OmniGen2ImageProcessor( vae_scale_factor=self.vae_scale_factor * 2, do_resize=True ) self.default_sample_size = 128 def prepare_latents( self, batch_size: int, num_channels_latents: int, height: int, width: int, dtype: torch.dtype, device: torch.device, generator: Optional[torch.Generator], latents: Optional[torch.FloatTensor] = None, frame_count: int = 1, ) -> torch.FloatTensor: """ Prepare the initial latents for the diffusion process. Args: batch_size: The number of images to generate. num_channels_latents: The number of channels in the latent space. height: The height of the generated image. width: The width of the generated image. dtype: The data type of the latents. device: The device to place the latents on. generator: The random number generator to use. latents: Optional pre-computed latents to use instead of random initialization. frame_count: The number of frames to output. Returns: torch.FloatTensor: The prepared latents tensor. """ height = int(height) // self.vae_scale_factor width = int(width) // self.vae_scale_factor if frame_count > 1: shape = (batch_size, frame_count, num_channels_latents, height, width) else: shape = (batch_size, num_channels_latents, height, width) if latents is None: latents = randn_tensor( shape, generator=generator, device=device, dtype=dtype ) else: latents = latents.to(device) return latents def encode_vae(self, img: torch.FloatTensor) -> torch.FloatTensor: """ Encode an image into the VAE latent space. Args: img: The input image tensor to encode. Returns: torch.FloatTensor: The encoded latent representation. """ z0 = self.vae.encode(img.to(dtype=self.vae.dtype)).latent_dist.sample() if self.vae.config.shift_factor is not None: z0 = z0 - self.vae.config.shift_factor if self.vae.config.scaling_factor is not None: z0 = z0 * self.vae.config.scaling_factor z0 = z0.to(dtype=self.vae.dtype) return z0 def prepare_image( self, images: Union[List[PIL.Image.Image], PIL.Image.Image], batch_size: int, num_images_per_prompt: int, max_pixels: int, max_side_length: int, device: torch.device, dtype: torch.dtype, ) -> List[Optional[torch.FloatTensor]]: """ Prepare input images for processing by encoding them into the VAE latent space. Args: images: Single image or list of images to process. batch_size: The number of images to generate per prompt. num_images_per_prompt: The number of images to generate for each prompt. device: The device to place the encoded latents on. dtype: The data type of the encoded latents. Returns: List[Optional[torch.FloatTensor]]: List of encoded latent representations for each image. """ if batch_size == 1: images = [images] latents = [] for i, img in enumerate(images): if img is not None and len(img) > 0: ref_latents = [] for j, img_j in enumerate(img): img_j = self.image_processor.preprocess( img_j, max_pixels=max_pixels, max_side_length=max_side_length ) ref_latents.append( self.encode_vae(img_j.to(device=device)).squeeze(0) ) else: ref_latents = None for _ in range(num_images_per_prompt): latents.append(ref_latents) return latents def _get_qwen2_prompt_embeds( self, prompt: Union[str, List[str]], device: Optional[torch.device] = None, max_sequence_length: int = 256, ) -> Tuple[torch.Tensor, torch.Tensor]: """ Get prompt embeddings from the Qwen2 text encoder. Args: prompt: The prompt or list of prompts to encode. device: The device to place the embeddings on. If None, uses the pipeline's device. max_sequence_length: Maximum sequence length for tokenization. Returns: Tuple[torch.Tensor, torch.Tensor]: A tuple containing: - The prompt embeddings tensor - The attention mask tensor Raises: Warning: If the input text is truncated due to sequence length limitations. """ device = device or self._execution_device prompt = [prompt] if isinstance(prompt, str) else prompt # text_inputs = self.processor.tokenizer( # prompt, # padding="max_length", # max_length=max_sequence_length, # truncation=True, # return_tensors="pt", # ) text_inputs = self.processor.tokenizer( prompt, padding="longest", max_length=max_sequence_length, truncation=True, return_tensors="pt", ) text_input_ids = text_inputs.input_ids.to(device) untruncated_ids = self.processor.tokenizer( prompt, padding="longest", return_tensors="pt" ).input_ids.to(device) if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal( text_input_ids, untruncated_ids ): removed_text = self.processor.tokenizer.batch_decode( untruncated_ids[:, max_sequence_length - 1 : -1] ) logger.warning( "The following part of your input was truncated because Gemma can only handle sequences up to" f" {max_sequence_length} tokens: {removed_text}" ) prompt_attention_mask = text_inputs.attention_mask.to(device) prompt_embeds = self.mllm( text_input_ids, attention_mask=prompt_attention_mask, output_hidden_states=True, ).hidden_states[-1] if self.mllm is not None: dtype = self.mllm.dtype elif self.transformer is not None: dtype = self.transformer.dtype else: dtype = None prompt_embeds = prompt_embeds.to(dtype=dtype, device=device) return prompt_embeds, prompt_attention_mask def _apply_chat_template(self, prompt: str): prompt = [ { "role": "system", "content": "You are a helpful assistant that generates high-quality images based on user instructions.", }, {"role": "user", "content": prompt}, ] prompt = self.processor.tokenizer.apply_chat_template( prompt, tokenize=False, add_generation_prompt=False ) return prompt def encode_prompt( self, prompt: Union[str, List[str]], do_classifier_free_guidance: bool = True, negative_prompt: Optional[Union[str, List[str]]] = None, num_images_per_prompt: int = 1, device: Optional[torch.device] = None, prompt_embeds: Optional[torch.Tensor] = None, negative_prompt_embeds: Optional[torch.Tensor] = None, prompt_attention_mask: Optional[torch.Tensor] = None, negative_prompt_attention_mask: Optional[torch.Tensor] = None, max_sequence_length: int = 256, ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: r""" Encodes the prompt into text encoder hidden states. Args: prompt (`str` or `List[str]`, *optional*): prompt to be encoded negative_prompt (`str` or `List[str]`, *optional*): The prompt not to guide the image generation. If not defined, one has to pass `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`). For Lumina-T2I, this should be "". do_classifier_free_guidance (`bool`, *optional*, defaults to `True`): whether to use classifier free guidance or not num_images_per_prompt (`int`, *optional*, defaults to 1): number of images that should be generated per prompt device: (`torch.device`, *optional*): torch device to place the resulting embeddings on prompt_embeds (`torch.Tensor`, *optional*): Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not provided, text embeddings will be generated from `prompt` input argument. negative_prompt_embeds (`torch.Tensor`, *optional*): Pre-generated negative text embeddings. For Lumina-T2I, it's should be the embeddings of the "" string. max_sequence_length (`int`, defaults to `256`): Maximum sequence length to use for the prompt. """ device = device or self._execution_device prompt = [prompt] if isinstance(prompt, str) else prompt prompt = [self._apply_chat_template(_prompt) for _prompt in prompt] if prompt is not None: batch_size = len(prompt) else: batch_size = prompt_embeds.shape[0] if prompt_embeds is None: prompt_embeds, prompt_attention_mask = self._get_qwen2_prompt_embeds( prompt=prompt, device=device, max_sequence_length=max_sequence_length ) batch_size, seq_len, _ = prompt_embeds.shape # duplicate text embeddings and attention mask for each generation per prompt, using mps friendly method prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1) prompt_embeds = prompt_embeds.view( batch_size * num_images_per_prompt, seq_len, -1 ) prompt_attention_mask = prompt_attention_mask.repeat(num_images_per_prompt, 1) prompt_attention_mask = prompt_attention_mask.view( batch_size * num_images_per_prompt, -1 ) # Get negative embeddings for classifier free guidance if do_classifier_free_guidance and negative_prompt_embeds is None: negative_prompt = negative_prompt if negative_prompt is not None else "" # Normalize str to list negative_prompt = ( batch_size * [negative_prompt] if isinstance(negative_prompt, str) else negative_prompt ) negative_prompt = [ self._apply_chat_template(_negative_prompt) for _negative_prompt in negative_prompt ] if prompt is not None and type(prompt) is not type(negative_prompt): raise TypeError( f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !=" f" {type(prompt)}." ) elif isinstance(negative_prompt, str): negative_prompt = [negative_prompt] elif batch_size != len(negative_prompt): raise ValueError( f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:" f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches" " the batch size of `prompt`." ) negative_prompt_embeds, negative_prompt_attention_mask = ( self._get_qwen2_prompt_embeds( prompt=negative_prompt, device=device, max_sequence_length=max_sequence_length, ) ) batch_size, seq_len, _ = negative_prompt_embeds.shape # duplicate text embeddings and attention mask for each generation per prompt, using mps friendly method negative_prompt_embeds = negative_prompt_embeds.repeat( 1, num_images_per_prompt, 1 ) negative_prompt_embeds = negative_prompt_embeds.view( batch_size * num_images_per_prompt, seq_len, -1 ) negative_prompt_attention_mask = negative_prompt_attention_mask.repeat( num_images_per_prompt, 1 ) negative_prompt_attention_mask = negative_prompt_attention_mask.view( batch_size * num_images_per_prompt, -1 ) return ( prompt_embeds, prompt_attention_mask, negative_prompt_embeds, negative_prompt_attention_mask, ) @property def num_timesteps(self): return self._num_timesteps @property def text_guidance_scale(self): return self._text_guidance_scale @property def image_guidance_scale(self): return self._image_guidance_scale @property def cfg_range(self): return self._cfg_range @torch.no_grad() def __call__( self, prompt: Optional[Union[str, List[str]]] = None, negative_prompt: Optional[Union[str, List[str]]] = None, prompt_embeds: Optional[torch.FloatTensor] = None, negative_prompt_embeds: Optional[torch.FloatTensor] = None, prompt_attention_mask: Optional[torch.LongTensor] = None, negative_prompt_attention_mask: Optional[torch.LongTensor] = None, max_sequence_length: Optional[int] = None, callback_on_step_end_tensor_inputs: Optional[List[str]] = None, input_images: Optional[List[PIL.Image.Image]] = None, num_images_per_prompt: int = 1, height: Optional[int] = None, width: Optional[int] = None, max_pixels: int = 1024 * 1024, max_input_image_side_length: int = 1024, align_res: bool = True, num_inference_steps: int = 28, text_guidance_scale: float = 4.0, image_guidance_scale: float = 1.0, cfg_range: Tuple[float, float] = (0.0, 1.0), attention_kwargs: Optional[Dict[str, Any]] = None, timesteps: List[int] = None, generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, latents: Optional[torch.FloatTensor] = None, frame_count: int = 1, output_type: Optional[str] = "pil", return_dict: bool = True, verbose: bool = False, step_func=None, get_latents_text_embeds=False, ): height = height or self.default_sample_size * self.vae_scale_factor width = width or self.default_sample_size * self.vae_scale_factor self._text_guidance_scale = text_guidance_scale self._image_guidance_scale = image_guidance_scale self._cfg_range = cfg_range self._attention_kwargs = attention_kwargs # 2. Define call parameters if prompt is not None and isinstance(prompt, str): batch_size = 1 elif prompt is not None and isinstance(prompt, list): batch_size = len(prompt) else: batch_size = prompt_embeds.shape[0] device = self._execution_device # 3. Encode input prompt ( prompt_embeds, prompt_attention_mask, negative_prompt_embeds, negative_prompt_attention_mask, ) = self.encode_prompt( prompt, self.text_guidance_scale > 1.0, negative_prompt=negative_prompt, num_images_per_prompt=num_images_per_prompt, device=device, prompt_embeds=prompt_embeds, negative_prompt_embeds=negative_prompt_embeds, prompt_attention_mask=prompt_attention_mask, negative_prompt_attention_mask=negative_prompt_attention_mask, max_sequence_length=max_sequence_length, ) dtype = self.vae.dtype # 3. Prepare control image ref_latents = self.prepare_image( images=input_images, batch_size=batch_size, num_images_per_prompt=num_images_per_prompt, max_pixels=max_pixels, max_side_length=max_input_image_side_length, device=device, dtype=dtype, ) if input_images is None: input_images = [] if len(input_images) == 1 and align_res: width, height = ( ref_latents[0][0].shape[-1] * self.vae_scale_factor, ref_latents[0][0].shape[-2] * self.vae_scale_factor, ) ori_width, ori_height = width, height else: ori_width, ori_height = width, height cur_pixels = height * width ratio = (max_pixels / cur_pixels) ** 0.5 ratio = min(ratio, 1.0) height, width = ( int(height * ratio) // 16 * 16, int(width * ratio) // 16 * 16, ) if len(input_images) == 0: self._image_guidance_scale = 1 # 4. Prepare latents. latent_channels = self.transformer.config.in_channels latents = self.prepare_latents( batch_size * num_images_per_prompt, latent_channels, height, width, prompt_embeds.dtype, device, generator, latents, frame_count, ) freqs_cis = OmniGen2RotaryPosEmbed.get_freqs_cis( self.transformer.config.axes_dim_rope, self.transformer.config.axes_lens, theta=10000, ) image = self.processing( latents=latents, ref_latents=ref_latents, prompt_embeds=prompt_embeds, freqs_cis=freqs_cis, negative_prompt_embeds=negative_prompt_embeds, prompt_attention_mask=prompt_attention_mask, negative_prompt_attention_mask=negative_prompt_attention_mask, num_inference_steps=num_inference_steps, timesteps=timesteps, device=device, dtype=dtype, verbose=verbose, step_func=step_func, get_latents_text_embeds=get_latents_text_embeds, ) if get_latents_text_embeds: return image, prompt_embeds if len(image.shape) == 4: image = F.interpolate(image, size=(ori_height, ori_width), mode="bilinear") image = self.image_processor.postprocess(image, output_type=output_type) else: image = [ F.interpolate( image[:, i], size=(ori_height, ori_width), mode="bilinear" ) for i in range(image.shape[1]) ] image = [ self.image_processor.postprocess(x, output_type=output_type) for x in image ] image = torch.stack(image, dim=1) # Offload all models self.maybe_free_model_hooks() if not return_dict: return image else: return FMPipelineOutput(images=image) def processing( self, latents, ref_latents, prompt_embeds, freqs_cis, negative_prompt_embeds, prompt_attention_mask, negative_prompt_attention_mask, num_inference_steps, timesteps, device, dtype, verbose, step_func=None, get_latents_text_embeds=False, ): batch_size = latents.shape[0] timesteps, num_inference_steps = retrieve_timesteps( self.scheduler, num_inference_steps, device, timesteps, num_tokens=latents.shape[-2] * latents.shape[-1], ) num_warmup_steps = max( len(timesteps) - num_inference_steps * self.scheduler.order, 0 ) self._num_timesteps = len(timesteps) enable_taylorseer = getattr(self, "enable_taylorseer", False) if enable_taylorseer: model_pred_cache_dic, model_pred_current = cache_init( self, num_inference_steps ) model_pred_ref_cache_dic, model_pred_ref_current = cache_init( self, num_inference_steps ) model_pred_uncond_cache_dic, model_pred_uncond_current = cache_init( self, num_inference_steps ) self.transformer.enable_taylorseer = True elif self.transformer.enable_teacache: # Use different TeaCacheParams for different conditions teacache_params = TeaCacheParams() teacache_params_uncond = TeaCacheParams() teacache_params_ref = TeaCacheParams() with self.progress_bar(total=num_inference_steps) as progress_bar: for i, t in enumerate(timesteps): if enable_taylorseer: self.transformer.cache_dic = model_pred_cache_dic self.transformer.current = model_pred_current elif self.transformer.enable_teacache: teacache_params.is_first_or_last_step = ( i == 0 or i == len(timesteps) - 1 ) self.transformer.teacache_params = teacache_params model_pred = self.predict( t=t, latents=latents, prompt_embeds=prompt_embeds, freqs_cis=freqs_cis, prompt_attention_mask=prompt_attention_mask, ref_image_hidden_states=ref_latents, ) text_guidance_scale = ( self.text_guidance_scale if self.cfg_range[0] <= i / len(timesteps) <= self.cfg_range[1] else 1.0 ) image_guidance_scale = ( self.image_guidance_scale if self.cfg_range[0] <= i / len(timesteps) <= self.cfg_range[1] else 1.0 ) if text_guidance_scale > 1.0 and image_guidance_scale > 1.0: if enable_taylorseer: self.transformer.cache_dic = model_pred_ref_cache_dic self.transformer.current = model_pred_ref_current elif self.transformer.enable_teacache: teacache_params_ref.is_first_or_last_step = ( i == 0 or i == len(timesteps) - 1 ) self.transformer.teacache_params = teacache_params_ref model_pred_ref = self.predict( t=t, latents=latents, prompt_embeds=negative_prompt_embeds, freqs_cis=freqs_cis, prompt_attention_mask=negative_prompt_attention_mask, ref_image_hidden_states=ref_latents, ) if enable_taylorseer: self.transformer.cache_dic = model_pred_uncond_cache_dic self.transformer.current = model_pred_uncond_current elif self.transformer.enable_teacache: teacache_params_uncond.is_first_or_last_step = ( i == 0 or i == len(timesteps) - 1 ) self.transformer.teacache_params = teacache_params_uncond model_pred_uncond = self.predict( t=t, latents=latents, prompt_embeds=negative_prompt_embeds, freqs_cis=freqs_cis, prompt_attention_mask=negative_prompt_attention_mask, ref_image_hidden_states=None, ) model_pred = ( model_pred_uncond + image_guidance_scale * (model_pred_ref - model_pred_uncond) + text_guidance_scale * (model_pred - model_pred_ref) ) elif text_guidance_scale > 1.0: if enable_taylorseer: self.transformer.cache_dic = model_pred_uncond_cache_dic self.transformer.current = model_pred_uncond_current elif self.transformer.enable_teacache: teacache_params_uncond.is_first_or_last_step = ( i == 0 or i == len(timesteps) - 1 ) self.transformer.teacache_params = teacache_params_uncond model_pred_uncond = self.predict( t=t, latents=latents, prompt_embeds=negative_prompt_embeds, freqs_cis=freqs_cis, prompt_attention_mask=negative_prompt_attention_mask, ref_image_hidden_states=None, ) model_pred = model_pred_uncond + text_guidance_scale * ( model_pred - model_pred_uncond ) latents = self.scheduler.step( model_pred, t, latents, return_dict=False )[0] latents = latents.to(dtype=dtype) if i == len(timesteps) - 1 or ( (i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0 ): progress_bar.update() if step_func is not None: step_func(i, self._num_timesteps) if enable_taylorseer: del ( model_pred_cache_dic, model_pred_ref_cache_dic, model_pred_uncond_cache_dic, ) del model_pred_current, model_pred_ref_current, model_pred_uncond_current latents = latents.to(dtype=dtype) if get_latents_text_embeds: return latents if self.vae.config.scaling_factor is not None: latents = latents / self.vae.config.scaling_factor if self.vae.config.shift_factor is not None: latents = latents + self.vae.config.shift_factor if len(latents.shape) == 4: image = self.vae.decode(latents, return_dict=False)[0] else: image = [ self.vae.decode(latents[:, i], return_dict=False)[0] for i in range(latents.shape[1]) ] image = torch.stack(image, dim=1) return image def predict( self, t, latents, prompt_embeds, freqs_cis, prompt_attention_mask, ref_image_hidden_states, ): # broadcast to batch dimension in a way that's compatible with ONNX/Core ML timestep = t.expand(latents.shape[0]).to(latents.dtype) if len(latents.shape) == 4: batch_size, num_channels_latents, height, width = latents.shape is_temporal = False else: batch_size, num_frames, num_channels_latents, height, width = latents.shape latents = [_latents for _latents in latents] is_temporal = True optional_kwargs = {} if "ref_image_hidden_states" in set( inspect.signature(self.transformer.forward).parameters.keys() ): optional_kwargs["ref_image_hidden_states"] = ref_image_hidden_states model_pred = self.transformer( latents, timestep, prompt_embeds, freqs_cis, prompt_attention_mask, **optional_kwargs, ) if is_temporal: model_pred = torch.stack(model_pred) return model_pred