"""
FOFPred Diffusion Pipeline.
Modified from OmniGen2 Diffusion Pipeline (By OmniGen2 Team and The HuggingFace Team).
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import inspect
import os
import warnings
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import PIL.Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from diffusers.configuration_utils import register_to_config
from diffusers.image_processor import (
PipelineImageInput,
VaeImageProcessor,
is_valid_image_imagelist,
)
from diffusers.loaders.lora_base import ( # noqa
LoraBaseMixin,
_fetch_state_dict,
)
from diffusers.loaders.lora_conversion_utils import (
_convert_non_diffusers_lumina2_lora_to_diffusers,
)
from diffusers.models.autoencoders import AutoencoderKL
from diffusers.models.embeddings import get_1d_rotary_pos_embed
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.utils import (
USE_PEFT_BACKEND,
BaseOutput,
is_peft_available,
is_peft_version,
is_torch_version,
is_torch_xla_available,
is_transformers_available,
is_transformers_version,
logging,
)
from diffusers.utils.torch_utils import randn_tensor
from einops import repeat
from huggingface_hub.utils import validate_hf_hub_args
from transformers import Qwen2_5_VLForConditionalGeneration
from .scheduler.scheduler_fofpred import FlowMatchEulerDiscreteScheduler
from .transformer.transformer_fofpred import OmniGen2Transformer3DModel
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
_LOW_CPU_MEM_USAGE_DEFAULT_LORA = False
if is_torch_version(">=", "1.9.0"):
if (
is_peft_available()
and is_peft_version(">=", "0.13.1")
and is_transformers_available()
and is_transformers_version(">", "4.45.2")
):
_LOW_CPU_MEM_USAGE_DEFAULT_LORA = True
if is_torch_xla_available():
XLA_AVAILABLE = True
else:
XLA_AVAILABLE = False
TRANSFORMER_NAME = "transformer"
class OmniGen2ImageProcessor(VaeImageProcessor):
"""
Image processor for PixArt image resize and crop.
Args:
do_resize (`bool`, *optional*, defaults to `True`):
Whether to downscale the image's (height, width) dimensions to multiples of `vae_scale_factor`. Can accept
`height` and `width` arguments from [`image_processor.VaeImageProcessor.preprocess`] method.
vae_scale_factor (`int`, *optional*, defaults to `8`):
VAE scale factor. If `do_resize` is `True`, the image is automatically resized to multiples of this factor.
resample (`str`, *optional*, defaults to `lanczos`):
Resampling filter to use when resizing the image.
do_normalize (`bool`, *optional*, defaults to `True`):
Whether to normalize the image to [-1,1].
do_binarize (`bool`, *optional*, defaults to `False`):
Whether to binarize the image to 0/1.
do_convert_rgb (`bool`, *optional*, defaults to be `False`):
Whether to convert the images to RGB format.
do_convert_grayscale (`bool`, *optional*, defaults to be `False`):
Whether to convert the images to grayscale format.
"""
@register_to_config
def __init__(
self,
do_resize: bool = True,
vae_scale_factor: int = 16,
resample: str = "lanczos",
max_pixels: Optional[int] = None,
max_side_length: Optional[int] = None,
do_normalize: bool = True,
do_binarize: bool = False,
do_convert_grayscale: bool = False,
):
super().__init__(
do_resize=do_resize,
vae_scale_factor=vae_scale_factor,
resample=resample,
do_normalize=do_normalize,
do_binarize=do_binarize,
do_convert_grayscale=do_convert_grayscale,
)
self.max_pixels = max_pixels
self.max_side_length = max_side_length
def get_new_height_width(
self,
image: Union[PIL.Image.Image, np.ndarray, torch.Tensor],
height: Optional[int] = None,
width: Optional[int] = None,
max_pixels: Optional[int] = None,
max_side_length: Optional[int] = None,
) -> Tuple[int, int]:
r"""
Returns the height and width of the image, downscaled to the next integer multiple of `vae_scale_factor`.
Args:
image (`Union[PIL.Image.Image, np.ndarray, torch.Tensor]`):
The image input, which can be a PIL image, NumPy array, or PyTorch tensor. If it is a NumPy array, it
should have shape `[batch, height, width]` or `[batch, height, width, channels]`. If it is a PyTorch
tensor, it should have shape `[batch, channels, height, width]`.
height (`Optional[int]`, *optional*, defaults to `None`):
The height of the preprocessed image. If `None`, the height of the `image` input will be used.
width (`Optional[int]`, *optional*, defaults to `None`):
The width of the preprocessed image. If `None`, the width of the `image` input will be used.
Returns:
`Tuple[int, int]`:
A tuple containing the height and width, both resized to the nearest integer multiple of
`vae_scale_factor`.
"""
if height is None:
if isinstance(image, PIL.Image.Image):
height = image.height
elif isinstance(image, torch.Tensor):
height = image.shape[2]
else:
height = image.shape[1]
if width is None:
if isinstance(image, PIL.Image.Image):
width = image.width
elif isinstance(image, torch.Tensor):
width = image.shape[3]
else:
width = image.shape[2]
if max_side_length is None:
max_side_length = self.max_side_length
if max_pixels is None:
max_pixels = self.max_pixels
ratio = 1.0
if max_side_length is not None:
if height > width:
max_side_length_ratio = max_side_length / height
else:
max_side_length_ratio = max_side_length / width
cur_pixels = height * width
max_pixels_ratio = (max_pixels / cur_pixels) ** 0.5
ratio = min(
max_pixels_ratio, max_side_length_ratio, 1.0
) # do not upscale input image
new_height, new_width = (
int(height * ratio)
// self.config.vae_scale_factor
* self.config.vae_scale_factor,
int(width * ratio)
// self.config.vae_scale_factor
* self.config.vae_scale_factor,
)
return new_height, new_width
def preprocess(
self,
image: PipelineImageInput,
height: Optional[int] = None,
width: Optional[int] = None,
max_pixels: Optional[int] = None,
max_side_length: Optional[int] = None,
resize_mode: str = "default", # "default", "fill", "crop"
crops_coords: Optional[Tuple[int, int, int, int]] = None,
) -> torch.Tensor:
"""
Preprocess the image input.
Args:
image (`PipelineImageInput`):
The image input, accepted formats are PIL images, NumPy arrays, PyTorch tensors; Also accept list of
supported formats.
height (`int`, *optional*):
The height in preprocessed image. If `None`, will use the `get_default_height_width()` to get default
height.
width (`int`, *optional*):
The width in preprocessed. If `None`, will use get_default_height_width()` to get the default width.
resize_mode (`str`, *optional*, defaults to `default`):
The resize mode, can be one of `default` or `fill`. If `default`, will resize the image to fit within
the specified width and height, and it may not maintaining the original aspect ratio. If `fill`, will
resize the image to fit within the specified width and height, maintaining the aspect ratio, and then
center the image within the dimensions, filling empty with data from image. If `crop`, will resize the
image to fit within the specified width and height, maintaining the aspect ratio, and then center the
image within the dimensions, cropping the excess. Note that resize_mode `fill` and `crop` are only
supported for PIL image input.
crops_coords (`List[Tuple[int, int, int, int]]`, *optional*, defaults to `None`):
The crop coordinates for each image in the batch. If `None`, will not crop the image.
Returns:
`torch.Tensor`:
The preprocessed image.
"""
supported_formats = (PIL.Image.Image, np.ndarray, torch.Tensor)
# Expand the missing dimension for 3-dimensional pytorch tensor or numpy array that represents grayscale image
if (
self.config.do_convert_grayscale
and isinstance(image, (torch.Tensor, np.ndarray))
and image.ndim == 3
):
if isinstance(image, torch.Tensor):
# if image is a pytorch tensor could have 2 possible shapes:
# 1. batch x height x width: we should insert the channel dimension at position 1
# 2. channel x height x width: we should insert batch dimension at position 0,
# however, since both channel and batch dimension has same size 1, it is same to insert at position 1
# for simplicity, we insert a dimension of size 1 at position 1 for both cases
image = image.unsqueeze(1)
else:
# if it is a numpy array, it could have 2 possible shapes:
# 1. batch x height x width: insert channel dimension on last position
# 2. height x width x channel: insert batch dimension on first position
if image.shape[-1] == 1:
image = np.expand_dims(image, axis=0)
else:
image = np.expand_dims(image, axis=-1)
if (
isinstance(image, list)
and isinstance(image[0], np.ndarray)
and image[0].ndim == 4
):
warnings.warn(
"Passing `image` as a list of 4d np.ndarray is deprecated."
"Please concatenate the list along the batch dimension and pass it as a single 4d np.ndarray",
FutureWarning,
)
image = np.concatenate(image, axis=0)
if (
isinstance(image, list)
and isinstance(image[0], torch.Tensor)
and image[0].ndim == 4
):
warnings.warn(
"Passing `image` as a list of 4d torch.Tensor is deprecated."
"Please concatenate the list along the batch dimension and pass it as a single 4d torch.Tensor",
FutureWarning,
)
image = torch.cat(image, axis=0)
if not is_valid_image_imagelist(image):
raise ValueError(
f"Input is in incorrect format. Currently, we only support {', '.join(str(x) for x in supported_formats)}"
)
if not isinstance(image, list):
image = [image]
if isinstance(image[0], PIL.Image.Image):
if crops_coords is not None:
image = [i.crop(crops_coords) for i in image]
if self.config.do_resize:
height, width = self.get_new_height_width(
image[0], height, width, max_pixels, max_side_length
)
image = [
self.resize(i, height, width, resize_mode=resize_mode)
for i in image
]
if self.config.do_convert_rgb:
image = [self.convert_to_rgb(i) for i in image]
elif self.config.do_convert_grayscale:
image = [self.convert_to_grayscale(i) for i in image]
image = self.pil_to_numpy(image) # to np
image = self.numpy_to_pt(image) # to pt
elif isinstance(image[0], np.ndarray):
image = (
np.concatenate(image, axis=0)
if image[0].ndim == 4
else np.stack(image, axis=0)
)
image = self.numpy_to_pt(image)
height, width = self.get_new_height_width(
image, height, width, max_pixels, max_side_length
)
if self.config.do_resize:
image = self.resize(image, height, width)
elif isinstance(image[0], torch.Tensor):
image = (
torch.cat(image, axis=0)
if image[0].ndim == 4
else torch.stack(image, axis=0)
)
if self.config.do_convert_grayscale and image.ndim == 3:
image = image.unsqueeze(1)
channel = image.shape[1]
# don't need any preprocess if the image is latents
if channel == self.config.vae_latent_channels:
return image
height, width = self.get_new_height_width(
image, height, width, max_pixels, max_side_length
)
if self.config.do_resize:
image = self.resize(image, height, width)
# expected range [0,1], normalize to [-1,1]
do_normalize = self.config.do_normalize
if do_normalize and image.min() < 0:
warnings.warn(
"Passing `image` as torch tensor with value range in [-1,1] is deprecated. The expected value range for image tensor is [0,1] "
f"when passing as pytorch tensor or numpy Array. You passed `image` with value range [{image.min()},{image.max()}]",
FutureWarning,
)
do_normalize = False
if do_normalize:
image = self.normalize(image)
if self.config.do_binarize:
image = self.binarize(image)
return image
@dataclass
class TeaCacheParams:
"""
TeaCache parameters for `OmniGen2Transformer3DModel`
See https://github.com/ali-vilab/TeaCache/ for a more comprehensive understanding
Args:
previous_residual (Optional[torch.Tensor]):
The tensor difference between the output and the input of the transformer layers from the previous timestep.
previous_modulated_inp (Optional[torch.Tensor]):
The modulated input from the previous timestep used to indicate the change of the transformer layer's output.
accumulated_rel_l1_distance (float):
The accumulated relative L1 distance.
is_first_or_last_step (bool):
Whether the current timestep is the first or last step.
"""
previous_residual: Optional[torch.Tensor] = None
previous_modulated_inp: Optional[torch.Tensor] = None
accumulated_rel_l1_distance: float = 0
is_first_or_last_step: bool = False
class OmniGen2RotaryPosEmbed(nn.Module):
def __init__(
self,
theta: int,
axes_dim: Tuple[int, int, int],
axes_lens: Tuple[int, int, int] = (300, 512, 512),
patch_size: int = 2,
):
super().__init__()
self.theta = theta
self.axes_dim = axes_dim
self.axes_lens = axes_lens
self.patch_size = patch_size
@staticmethod
def get_freqs_cis(
axes_dim: Tuple[int, int, int], axes_lens: Tuple[int, int, int], theta: int
) -> List[torch.Tensor]:
freqs_cis = []
freqs_dtype = (
torch.float32 if torch.backends.mps.is_available() else torch.float64
)
for i, (d, e) in enumerate(zip(axes_dim, axes_lens)):
emb = get_1d_rotary_pos_embed(d, e, theta=theta, freqs_dtype=freqs_dtype)
freqs_cis.append(emb)
return freqs_cis
def _get_freqs_cis(self, freqs_cis, ids: torch.Tensor) -> torch.Tensor:
device = ids.device
if ids.device.type == "mps":
ids = ids.to("cpu")
result = []
for i in range(len(self.axes_dim)):
freqs = freqs_cis[i].to(ids.device)
index = ids[:, :, i : i + 1].repeat(1, 1, freqs.shape[-1]).to(torch.int64)
result.append(
torch.gather(
freqs.unsqueeze(0).repeat(index.shape[0], 1, 1), dim=1, index=index
)
)
return torch.cat(result, dim=-1).to(device)
def forward(
self,
freqs_cis,
attention_mask,
l_effective_ref_img_len,
l_effective_img_len,
ref_img_sizes,
img_sizes,
device,
):
batch_size = len(attention_mask)
p = self.patch_size
encoder_seq_len = attention_mask.shape[1]
l_effective_cap_len = attention_mask.sum(dim=1).tolist()
if isinstance(l_effective_img_len[0], list): # Check for t-dim case
seq_lengths = [
cap_len + sum(ref_img_len) + sum(img_len)
for cap_len, ref_img_len, img_len in zip(
l_effective_cap_len, l_effective_ref_img_len, l_effective_img_len
)
]
else: # Original case
seq_lengths = [
cap_len + sum(ref_img_len) + img_len
for cap_len, ref_img_len, img_len in zip(
l_effective_cap_len, l_effective_ref_img_len, l_effective_img_len
)
]
max_seq_len = max(seq_lengths)
max_ref_img_len = max(
[sum(ref_img_len) for ref_img_len in l_effective_ref_img_len]
)
if isinstance(l_effective_img_len[0], list):
max_img_len = max([sum(ln) for ln in l_effective_img_len])
else:
max_img_len = max(l_effective_img_len)
# Create position IDs
position_ids = torch.zeros(
batch_size, max_seq_len, 3, dtype=torch.int32, device=device
)
for i, (cap_seq_len, seq_len) in enumerate(
zip(l_effective_cap_len, seq_lengths)
):
# add text position ids
position_ids[i, :cap_seq_len] = repeat(
torch.arange(cap_seq_len, dtype=torch.int32, device=device), "l -> l 3"
)
pe_shift = cap_seq_len
pe_shift_len = cap_seq_len
if ref_img_sizes[i] is not None:
for ref_img_size, ref_img_len in zip(
ref_img_sizes[i], l_effective_ref_img_len[i]
):
H, W = ref_img_size
ref_H_tokens, ref_W_tokens = H // p, W // p
assert ref_H_tokens * ref_W_tokens == ref_img_len
# add image position ids
row_ids = repeat(
torch.arange(ref_H_tokens, dtype=torch.int32, device=device),
"h -> h w",
w=ref_W_tokens,
).flatten()
col_ids = repeat(
torch.arange(ref_W_tokens, dtype=torch.int32, device=device),
"w -> h w",
h=ref_H_tokens,
).flatten()
position_ids[i, pe_shift_len : pe_shift_len + ref_img_len, 0] = (
pe_shift
)
position_ids[i, pe_shift_len : pe_shift_len + ref_img_len, 1] = (
row_ids
)
position_ids[i, pe_shift_len : pe_shift_len + ref_img_len, 2] = (
col_ids
)
pe_shift += max(ref_H_tokens, ref_W_tokens)
pe_shift_len += ref_img_len
if isinstance(l_effective_img_len[i], list): # New case
for img_size, img_len in zip(img_sizes[i], l_effective_img_len[i]):
H, W = img_size
H_tokens, W_tokens = H // p, W // p
assert H_tokens * W_tokens == img_len
row_ids = repeat(
torch.arange(H_tokens, dtype=torch.int32, device=device),
"h -> h w",
w=W_tokens,
).flatten()
col_ids = repeat(
torch.arange(W_tokens, dtype=torch.int32, device=device),
"w -> h w",
h=H_tokens,
).flatten()
end_idx = pe_shift_len + img_len
position_ids[i, pe_shift_len:end_idx, 0] = pe_shift
position_ids[i, pe_shift_len:end_idx, 1] = row_ids
position_ids[i, pe_shift_len:end_idx, 2] = col_ids
pe_shift += max(H_tokens, W_tokens)
pe_shift_len = end_idx
else: # Original case
H, W = img_sizes[i]
H_tokens, W_tokens = H // p, W // p
assert H_tokens * W_tokens == l_effective_img_len[i]
row_ids = repeat(
torch.arange(H_tokens, dtype=torch.int32, device=device),
"h -> h w",
w=W_tokens,
).flatten()
col_ids = repeat(
torch.arange(W_tokens, dtype=torch.int32, device=device),
"w -> h w",
h=H_tokens,
).flatten()
assert pe_shift_len + l_effective_img_len[i] == seq_len
position_ids[i, pe_shift_len:seq_len, 0] = pe_shift
position_ids[i, pe_shift_len:seq_len, 1] = row_ids
position_ids[i, pe_shift_len:seq_len, 2] = col_ids
# Get combined rotary embeddings
freqs_cis = self._get_freqs_cis(freqs_cis, position_ids)
# create separate rotary embeddings for captions and images
cap_freqs_cis = torch.zeros(
batch_size,
encoder_seq_len,
freqs_cis.shape[-1],
device=device,
dtype=freqs_cis.dtype,
)
ref_img_freqs_cis = torch.zeros(
batch_size,
max_ref_img_len,
freqs_cis.shape[-1],
device=device,
dtype=freqs_cis.dtype,
)
img_freqs_cis = torch.zeros(
batch_size,
max_img_len,
freqs_cis.shape[-1],
device=device,
dtype=freqs_cis.dtype,
)
for i, (cap_seq_len, ref_img_len, img_len, seq_len) in enumerate(
zip(
l_effective_cap_len,
l_effective_ref_img_len,
l_effective_img_len,
seq_lengths,
)
):
cap_freqs_cis[i, :cap_seq_len] = freqs_cis[i, :cap_seq_len]
ref_img_freqs_cis[i, : sum(ref_img_len)] = freqs_cis[
i, cap_seq_len : cap_seq_len + sum(ref_img_len)
]
if isinstance(img_len, list):
img_len = sum(img_len)
img_freqs_cis[i, :img_len] = freqs_cis[
i,
cap_seq_len + sum(ref_img_len) : cap_seq_len
+ sum(ref_img_len)
+ img_len,
]
return (
cap_freqs_cis,
ref_img_freqs_cis,
img_freqs_cis,
freqs_cis,
l_effective_cap_len,
seq_lengths,
)
class OmniGen2LoraLoaderMixin(LoraBaseMixin):
r"""
Load LoRA layers into [`OmniGen2Transformer3DModel`]. Specific to [`FOFPredPipeline`].
"""
_lora_loadable_modules = ["transformer"]
transformer_name = TRANSFORMER_NAME
@classmethod
@validate_hf_hub_args
def lora_state_dict(
cls,
pretrained_model_name_or_path_or_dict: Union[str, Dict[str, torch.Tensor]],
**kwargs,
):
r"""
Return state dict for lora weights and the network alphas.
We support loading A1111 formatted LoRA checkpoints in a limited capacity.
This function is experimental and might change in the future.
Parameters:
pretrained_model_name_or_path_or_dict (`str` or `os.PathLike` or `dict`):
Can be either:
- A string, the *model id* (for example `google/ddpm-celebahq-256`) of a pretrained model hosted on
the Hub.
- A path to a *directory* (for example `./my_model_directory`) containing the model weights saved
with [`ModelMixin.save_pretrained`].
- A [torch state
dict](https://pytorch.org/tutorials/beginner/saving_loading_models.html#what-is-a-state-dict).
cache_dir (`Union[str, os.PathLike]`, *optional*):
Path to a directory where a downloaded pretrained model configuration is cached if the standard cache
is not used.
force_download (`bool`, *optional*, defaults to `False`):
Whether or not to force the (re-)download of the model weights and configuration files, overriding the
cached versions if they exist.
proxies (`Dict[str, str]`, *optional*):
A dictionary of proxy servers to use by protocol or endpoint, for example, `{'http': 'foo.bar:3128',
'http://hostname': 'foo.bar:4012'}`. The proxies are used on each request.
local_files_only (`bool`, *optional*, defaults to `False`):
Whether to only load local model weights and configuration files or not. If set to `True`, the model
won't be downloaded from the Hub.
token (`str` or *bool*, *optional*):
The token to use as HTTP bearer authorization for remote files. If `True`, the token generated from
`diffusers-cli login` (stored in `~/.huggingface`) is used.
revision (`str`, *optional*, defaults to `"main"`):
The specific model version to use. It can be a branch name, a tag name, a commit id, or any identifier
allowed by Git.
subfolder (`str`, *optional*, defaults to `""`):
The subfolder location of a model file within a larger model repository on the Hub or locally.
"""
# Load the main state dict first which has the LoRA layers for either of
# transformer and text encoder or both.
cache_dir = kwargs.pop("cache_dir", None)
force_download = kwargs.pop("force_download", False)
proxies = kwargs.pop("proxies", None)
local_files_only = kwargs.pop("local_files_only", None)
token = kwargs.pop("token", None)
revision = kwargs.pop("revision", None)
subfolder = kwargs.pop("subfolder", None)
weight_name = kwargs.pop("weight_name", None)
use_safetensors = kwargs.pop("use_safetensors", None)
allow_pickle = False
if use_safetensors is None:
use_safetensors = True
allow_pickle = True
user_agent = {
"file_type": "attn_procs_weights",
"framework": "pytorch",
}
state_dict = _fetch_state_dict(
pretrained_model_name_or_path_or_dict=pretrained_model_name_or_path_or_dict,
weight_name=weight_name,
use_safetensors=use_safetensors,
local_files_only=local_files_only,
cache_dir=cache_dir,
force_download=force_download,
proxies=proxies,
token=token,
revision=revision,
subfolder=subfolder,
user_agent=user_agent,
allow_pickle=allow_pickle,
)
is_dora_scale_present = any("dora_scale" in k for k in state_dict)
if is_dora_scale_present:
warn_msg = "It seems like you are using a DoRA checkpoint that is not compatible in Diffusers at the moment. So, we are going to filter out the keys associated to 'dora_scale` from the state dict. If you think this is a mistake please open an issue https://github.com/huggingface/diffusers/issues/new."
logger.warning(warn_msg)
state_dict = {k: v for k, v in state_dict.items() if "dora_scale" not in k}
# conversion.
non_diffusers = any(k.startswith("diffusion_model.") for k in state_dict)
if non_diffusers:
state_dict = _convert_non_diffusers_lumina2_lora_to_diffusers(state_dict)
return state_dict
# Copied from diffusers.loaders.lora_pipeline.CogVideoXLoraLoaderMixin.load_lora_weights
def load_lora_weights(
self,
pretrained_model_name_or_path_or_dict: Union[str, Dict[str, torch.Tensor]],
adapter_name=None,
**kwargs,
):
"""
Load LoRA weights specified in `pretrained_model_name_or_path_or_dict` into `self.transformer` and
`self.text_encoder`. All kwargs are forwarded to `self.lora_state_dict`. See
[`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`] for more details on how the state dict is loaded.
See [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_into_transformer`] for more details on how the state
dict is loaded into `self.transformer`.
Parameters:
pretrained_model_name_or_path_or_dict (`str` or `os.PathLike` or `dict`):
See [`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`].
adapter_name (`str`, *optional*):
Adapter name to be used for referencing the loaded adapter model. If not specified, it will use
`default_{i}` where i is the total number of adapters being loaded.
low_cpu_mem_usage (`bool`, *optional*):
Speed up model loading by only loading the pretrained LoRA weights and not initializing the random
weights.
kwargs (`dict`, *optional*):
See [`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`].
"""
if not USE_PEFT_BACKEND:
raise ValueError("PEFT backend is required for this method.")
low_cpu_mem_usage = kwargs.pop(
"low_cpu_mem_usage", _LOW_CPU_MEM_USAGE_DEFAULT_LORA
)
if low_cpu_mem_usage and is_peft_version("<", "0.13.0"):
raise ValueError(
"`low_cpu_mem_usage=True` is not compatible with this `peft` version. Please update it with `pip install -U peft`."
)
# if a dict is passed, copy it instead of modifying it inplace
if isinstance(pretrained_model_name_or_path_or_dict, dict):
pretrained_model_name_or_path_or_dict = (
pretrained_model_name_or_path_or_dict.copy()
)
# First, ensure that the checkpoint is a compatible one and can be successfully loaded.
state_dict = self.lora_state_dict(
pretrained_model_name_or_path_or_dict, **kwargs
)
is_correct_format = all("lora" in key for key in state_dict.keys())
if not is_correct_format:
raise ValueError("Invalid LoRA checkpoint.")
self.load_lora_into_transformer(
state_dict,
transformer=getattr(self, self.transformer_name)
if not hasattr(self, "transformer")
else self.transformer,
adapter_name=adapter_name,
_pipeline=self,
low_cpu_mem_usage=low_cpu_mem_usage,
)
@classmethod
# Copied from diffusers.loaders.lora_pipeline.SD3LoraLoaderMixin.load_lora_into_transformer with SD3Transformer2DModel->Lumina2Transformer2DModel
def load_lora_into_transformer(
cls,
state_dict,
transformer,
adapter_name=None,
_pipeline=None,
low_cpu_mem_usage=False,
hotswap: bool = False,
):
"""
This will load the LoRA layers specified in `state_dict` into `transformer`.
Parameters:
state_dict (`dict`):
A standard state dict containing the lora layer parameters. The keys can either be indexed directly
into the unet or prefixed with an additional `unet` which can be used to distinguish between text
encoder lora layers.
transformer (`Lumina2Transformer2DModel`):
The Transformer model to load the LoRA layers into.
adapter_name (`str`, *optional*):
Adapter name to be used for referencing the loaded adapter model. If not specified, it will use
`default_{i}` where i is the total number of adapters being loaded.
low_cpu_mem_usage (`bool`, *optional*):
Speed up model loading by only loading the pretrained LoRA weights and not initializing the random
weights.
hotswap : (`bool`, *optional*)
Defaults to `False`. Whether to substitute an existing (LoRA) adapter with the newly loaded adapter
in-place. This means that, instead of loading an additional adapter, this will take the existing
adapter weights and replace them with the weights of the new adapter. This can be faster and more
memory efficient. However, the main advantage of hotswapping is that when the model is compiled with
torch.compile, loading the new adapter does not require recompilation of the model. When using
hotswapping, the passed `adapter_name` should be the name of an already loaded adapter.
If the new adapter and the old adapter have different ranks and/or LoRA alphas (i.e. scaling), you need
to call an additional method before loading the adapter:
```py
pipeline = ... # load diffusers pipeline
max_rank = ... # the highest rank among all LoRAs that you want to load
# call *before* compiling and loading the LoRA adapter
pipeline.enable_lora_hotswap(target_rank=max_rank)
pipeline.load_lora_weights(file_name)
# optionally compile the model now
```
Note that hotswapping adapters of the text encoder is not yet supported. There are some further
limitations to this technique, which are documented here:
https://huggingface.co/docs/peft/main/en/package_reference/hotswap
"""
if low_cpu_mem_usage and is_peft_version("<", "0.13.0"):
raise ValueError(
"`low_cpu_mem_usage=True` is not compatible with this `peft` version. Please update it with `pip install -U peft`."
)
# Load the layers corresponding to transformer.
logger.info(f"Loading {cls.transformer_name}.")
transformer.load_lora_adapter(
state_dict,
network_alphas=None,
adapter_name=adapter_name,
_pipeline=_pipeline,
low_cpu_mem_usage=low_cpu_mem_usage,
hotswap=hotswap,
)
@classmethod
# Copied from diffusers.loaders.lora_pipeline.CogVideoXLoraLoaderMixin.save_lora_weights
def save_lora_weights(
cls,
save_directory: Union[str, os.PathLike],
transformer_lora_layers: Dict[str, Union[torch.nn.Module, torch.Tensor]] = None,
is_main_process: bool = True,
weight_name: str = None,
save_function: Callable = None,
safe_serialization: bool = True,
):
r"""
Save the LoRA parameters corresponding to the UNet and text encoder.
Arguments:
save_directory (`str` or `os.PathLike`):
Directory to save LoRA parameters to. Will be created if it doesn't exist.
transformer_lora_layers (`Dict[str, torch.nn.Module]` or `Dict[str, torch.Tensor]`):
State dict of the LoRA layers corresponding to the `transformer`.
is_main_process (`bool`, *optional*, defaults to `True`):
Whether the process calling this is the main process or not. Useful during distributed training and you
need to call this function on all processes. In this case, set `is_main_process=True` only on the main
process to avoid race conditions.
save_function (`Callable`):
The function to use to save the state dictionary. Useful during distributed training when you need to
replace `torch.save` with another method. Can be configured with the environment variable
`DIFFUSERS_SAVE_MODE`.
safe_serialization (`bool`, *optional*, defaults to `True`):
Whether to save the model using `safetensors` or the traditional PyTorch way with `pickle`.
"""
state_dict = {}
if not transformer_lora_layers:
raise ValueError("You must pass `transformer_lora_layers`.")
if transformer_lora_layers:
state_dict.update(
cls.pack_weights(transformer_lora_layers, cls.transformer_name)
)
# Save the model
cls.write_lora_layers(
state_dict=state_dict,
save_directory=save_directory,
is_main_process=is_main_process,
weight_name=weight_name,
save_function=save_function,
safe_serialization=safe_serialization,
)
# Copied from diffusers.loaders.lora_pipeline.SanaLoraLoaderMixin.fuse_lora
def fuse_lora(
self,
components: List[str] = ["transformer"],
lora_scale: float = 1.0,
safe_fusing: bool = False,
adapter_names: Optional[List[str]] = None,
**kwargs,
):
r"""
Fuses the LoRA parameters into the original parameters of the corresponding blocks.
This is an experimental API.
Args:
components: (`List[str]`): List of LoRA-injectable components to fuse the LoRAs into.
lora_scale (`float`, defaults to 1.0):
Controls how much to influence the outputs with the LoRA parameters.
safe_fusing (`bool`, defaults to `False`):
Whether to check fused weights for NaN values before fusing and if values are NaN not fusing them.
adapter_names (`List[str]`, *optional*):
Adapter names to be used for fusing. If nothing is passed, all active adapters will be fused.
Example:
```py
from diffusers import DiffusionPipeline
import torch
pipeline = DiffusionPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16
).to("cuda")
pipeline.load_lora_weights("nerijs/pixel-art-xl", weight_name="pixel-art-xl.safetensors", adapter_name="pixel")
pipeline.fuse_lora(lora_scale=0.7)
```
"""
super().fuse_lora(
components=components,
lora_scale=lora_scale,
safe_fusing=safe_fusing,
adapter_names=adapter_names,
**kwargs,
)
# Copied from diffusers.loaders.lora_pipeline.SanaLoraLoaderMixin.unfuse_lora
def unfuse_lora(self, components: List[str] = ["transformer"], **kwargs):
r"""
Reverses the effect of
[`pipe.fuse_lora()`](https://huggingface.co/docs/diffusers/main/en/api/loaders#diffusers.loaders.LoraBaseMixin.fuse_lora).
This is an experimental API.
Args:
components (`List[str]`): List of LoRA-injectable components to unfuse LoRA from.
unfuse_transformer (`bool`, defaults to `True`): Whether to unfuse the UNet LoRA parameters.
"""
super().unfuse_lora(components=components, **kwargs)
def cache_init(self, num_steps: int):
"""
Initialization for cache.
"""
cache_dic = {}
cache = {}
cache_index = {}
cache[-1] = {}
cache_index[-1] = {}
cache_index["layer_index"] = {}
cache[-1]["layers_stream"] = {}
cache_dic["cache_counter"] = 0
for j in range(len(self.transformer.layers)):
cache[-1]["layers_stream"][j] = {}
cache_index[-1][j] = {}
cache_dic["Delta-DiT"] = False
cache_dic["cache_type"] = "random"
cache_dic["cache_index"] = cache_index
cache_dic["cache"] = cache
cache_dic["fresh_ratio_schedule"] = "ToCa"
cache_dic["fresh_ratio"] = 0.0
cache_dic["fresh_threshold"] = 3
cache_dic["soft_fresh_weight"] = 0.0
cache_dic["taylor_cache"] = True
cache_dic["max_order"] = 4
cache_dic["first_enhance"] = 5
current = {}
current["activated_steps"] = [0]
current["step"] = 0
current["num_steps"] = num_steps
return cache_dic, current
@dataclass
class FMPipelineOutput(BaseOutput):
"""
Output class for OmniGen2 pipeline.
Args:
images (Union[List[PIL.Image.Image], np.ndarray]):
List of denoised PIL images of length `batch_size` or numpy array of shape
`(batch_size, height, width, num_channels)`. Contains the generated images.
"""
images: Union[List[PIL.Image.Image], np.ndarray]
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
**kwargs,
):
"""
Calls the scheduler's `set_timesteps` method and retrieves timesteps from the scheduler after the call. Handles
custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`.
Args:
scheduler (`SchedulerMixin`):
The scheduler to get timesteps from.
num_inference_steps (`int`):
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*):
The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*):
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*):
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
Returns:
`Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps.
"""
if timesteps is not None:
accepts_timesteps = "timesteps" in set(
inspect.signature(scheduler.set_timesteps).parameters.keys()
)
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
timesteps = scheduler.timesteps
num_inference_steps = len(timesteps)
else:
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
timesteps = scheduler.timesteps
return timesteps, num_inference_steps
class FOFPredPipeline(DiffusionPipeline, OmniGen2LoraLoaderMixin):
"""
Pipeline for text-to-image generation using OmniGen2.
This pipeline implements a text-to-image generation model that uses:
- Qwen2.5-VL for text encoding
- A custom transformer architecture for image generation
- VAE for image encoding/decoding
- FlowMatchEulerDiscreteScheduler for noise scheduling
Args:
transformer (OmniGen2Transformer3DModel): The transformer model for image generation.
vae (AutoencoderKL): The VAE model for image encoding/decoding.
scheduler (FlowMatchEulerDiscreteScheduler): The scheduler for noise scheduling.
text_encoder (Qwen2_5_VLModel): The text encoder model.
tokenizer (Union[Qwen2Tokenizer, Qwen2TokenizerFast]): The tokenizer for text processing.
"""
model_cpu_offload_seq = "mllm->transformer->vae"
def __init__(
self,
transformer: OmniGen2Transformer3DModel,
vae: AutoencoderKL,
scheduler: FlowMatchEulerDiscreteScheduler,
mllm: Qwen2_5_VLForConditionalGeneration,
processor,
) -> None:
"""
Initialize the OmniGen2 pipeline.
Args:
transformer: The transformer model for image generation.
vae: The VAE model for image encoding/decoding.
scheduler: The scheduler for noise scheduling.
text_encoder: The text encoder model.
tokenizer: The tokenizer for text processing.
"""
super().__init__()
self.register_modules(
transformer=transformer,
vae=vae,
scheduler=scheduler,
mllm=mllm,
processor=processor,
)
self.vae_scale_factor = (
2 ** (len(self.vae.config.block_out_channels) - 1)
if hasattr(self, "vae") and self.vae is not None
else 8
)
self.image_processor = OmniGen2ImageProcessor(
vae_scale_factor=self.vae_scale_factor * 2, do_resize=True
)
self.default_sample_size = 128
def prepare_latents(
self,
batch_size: int,
num_channels_latents: int,
height: int,
width: int,
dtype: torch.dtype,
device: torch.device,
generator: Optional[torch.Generator],
latents: Optional[torch.FloatTensor] = None,
frame_count: int = 1,
) -> torch.FloatTensor:
"""
Prepare the initial latents for the diffusion process.
Args:
batch_size: The number of images to generate.
num_channels_latents: The number of channels in the latent space.
height: The height of the generated image.
width: The width of the generated image.
dtype: The data type of the latents.
device: The device to place the latents on.
generator: The random number generator to use.
latents: Optional pre-computed latents to use instead of random initialization.
frame_count: The number of frames to output.
Returns:
torch.FloatTensor: The prepared latents tensor.
"""
height = int(height) // self.vae_scale_factor
width = int(width) // self.vae_scale_factor
if frame_count > 1:
shape = (batch_size, frame_count, num_channels_latents, height, width)
else:
shape = (batch_size, num_channels_latents, height, width)
if latents is None:
latents = randn_tensor(
shape, generator=generator, device=device, dtype=dtype
)
else:
latents = latents.to(device)
return latents
def encode_vae(self, img: torch.FloatTensor) -> torch.FloatTensor:
"""
Encode an image into the VAE latent space.
Args:
img: The input image tensor to encode.
Returns:
torch.FloatTensor: The encoded latent representation.
"""
z0 = self.vae.encode(img.to(dtype=self.vae.dtype)).latent_dist.sample()
if self.vae.config.shift_factor is not None:
z0 = z0 - self.vae.config.shift_factor
if self.vae.config.scaling_factor is not None:
z0 = z0 * self.vae.config.scaling_factor
z0 = z0.to(dtype=self.vae.dtype)
return z0
def prepare_image(
self,
images: Union[List[PIL.Image.Image], PIL.Image.Image],
batch_size: int,
num_images_per_prompt: int,
max_pixels: int,
max_side_length: int,
device: torch.device,
dtype: torch.dtype,
) -> List[Optional[torch.FloatTensor]]:
"""
Prepare input images for processing by encoding them into the VAE latent space.
Args:
images: Single image or list of images to process.
batch_size: The number of images to generate per prompt.
num_images_per_prompt: The number of images to generate for each prompt.
device: The device to place the encoded latents on.
dtype: The data type of the encoded latents.
Returns:
List[Optional[torch.FloatTensor]]: List of encoded latent representations for each image.
"""
if batch_size == 1:
images = [images]
latents = []
for i, img in enumerate(images):
if img is not None and len(img) > 0:
ref_latents = []
for j, img_j in enumerate(img):
img_j = self.image_processor.preprocess(
img_j, max_pixels=max_pixels, max_side_length=max_side_length
)
ref_latents.append(
self.encode_vae(img_j.to(device=device)).squeeze(0)
)
else:
ref_latents = None
for _ in range(num_images_per_prompt):
latents.append(ref_latents)
return latents
def _get_qwen2_prompt_embeds(
self,
prompt: Union[str, List[str]],
device: Optional[torch.device] = None,
max_sequence_length: int = 256,
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Get prompt embeddings from the Qwen2 text encoder.
Args:
prompt: The prompt or list of prompts to encode.
device: The device to place the embeddings on. If None, uses the pipeline's device.
max_sequence_length: Maximum sequence length for tokenization.
Returns:
Tuple[torch.Tensor, torch.Tensor]: A tuple containing:
- The prompt embeddings tensor
- The attention mask tensor
Raises:
Warning: If the input text is truncated due to sequence length limitations.
"""
device = device or self._execution_device
prompt = [prompt] if isinstance(prompt, str) else prompt
# text_inputs = self.processor.tokenizer(
# prompt,
# padding="max_length",
# max_length=max_sequence_length,
# truncation=True,
# return_tensors="pt",
# )
text_inputs = self.processor.tokenizer(
prompt,
padding="longest",
max_length=max_sequence_length,
truncation=True,
return_tensors="pt",
)
text_input_ids = text_inputs.input_ids.to(device)
untruncated_ids = self.processor.tokenizer(
prompt, padding="longest", return_tensors="pt"
).input_ids.to(device)
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(
text_input_ids, untruncated_ids
):
removed_text = self.processor.tokenizer.batch_decode(
untruncated_ids[:, max_sequence_length - 1 : -1]
)
logger.warning(
"The following part of your input was truncated because Gemma can only handle sequences up to"
f" {max_sequence_length} tokens: {removed_text}"
)
prompt_attention_mask = text_inputs.attention_mask.to(device)
prompt_embeds = self.mllm(
text_input_ids,
attention_mask=prompt_attention_mask,
output_hidden_states=True,
).hidden_states[-1]
if self.mllm is not None:
dtype = self.mllm.dtype
elif self.transformer is not None:
dtype = self.transformer.dtype
else:
dtype = None
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
return prompt_embeds, prompt_attention_mask
def _apply_chat_template(self, prompt: str):
prompt = [
{
"role": "system",
"content": "You are a helpful assistant that generates high-quality images based on user instructions.",
},
{"role": "user", "content": prompt},
]
prompt = self.processor.tokenizer.apply_chat_template(
prompt, tokenize=False, add_generation_prompt=False
)
return prompt
def encode_prompt(
self,
prompt: Union[str, List[str]],
do_classifier_free_guidance: bool = True,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: int = 1,
device: Optional[torch.device] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
prompt_attention_mask: Optional[torch.Tensor] = None,
negative_prompt_attention_mask: Optional[torch.Tensor] = None,
max_sequence_length: int = 256,
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
r"""
Encodes the prompt into text encoder hidden states.
Args:
prompt (`str` or `List[str]`, *optional*):
prompt to be encoded
negative_prompt (`str` or `List[str]`, *optional*):
The prompt not to guide the image generation. If not defined, one has to pass `negative_prompt_embeds`
instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`). For
Lumina-T2I, this should be "".
do_classifier_free_guidance (`bool`, *optional*, defaults to `True`):
whether to use classifier free guidance or not
num_images_per_prompt (`int`, *optional*, defaults to 1):
number of images that should be generated per prompt
device: (`torch.device`, *optional*):
torch device to place the resulting embeddings on
prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
negative_prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated negative text embeddings. For Lumina-T2I, it's should be the embeddings of the "" string.
max_sequence_length (`int`, defaults to `256`):
Maximum sequence length to use for the prompt.
"""
device = device or self._execution_device
prompt = [prompt] if isinstance(prompt, str) else prompt
prompt = [self._apply_chat_template(_prompt) for _prompt in prompt]
if prompt is not None:
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
if prompt_embeds is None:
prompt_embeds, prompt_attention_mask = self._get_qwen2_prompt_embeds(
prompt=prompt, device=device, max_sequence_length=max_sequence_length
)
batch_size, seq_len, _ = prompt_embeds.shape
# duplicate text embeddings and attention mask for each generation per prompt, using mps friendly method
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
prompt_embeds = prompt_embeds.view(
batch_size * num_images_per_prompt, seq_len, -1
)
prompt_attention_mask = prompt_attention_mask.repeat(num_images_per_prompt, 1)
prompt_attention_mask = prompt_attention_mask.view(
batch_size * num_images_per_prompt, -1
)
# Get negative embeddings for classifier free guidance
if do_classifier_free_guidance and negative_prompt_embeds is None:
negative_prompt = negative_prompt if negative_prompt is not None else ""
# Normalize str to list
negative_prompt = (
batch_size * [negative_prompt]
if isinstance(negative_prompt, str)
else negative_prompt
)
negative_prompt = [
self._apply_chat_template(_negative_prompt)
for _negative_prompt in negative_prompt
]
if prompt is not None and type(prompt) is not type(negative_prompt):
raise TypeError(
f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
f" {type(prompt)}."
)
elif isinstance(negative_prompt, str):
negative_prompt = [negative_prompt]
elif batch_size != len(negative_prompt):
raise ValueError(
f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
" the batch size of `prompt`."
)
negative_prompt_embeds, negative_prompt_attention_mask = (
self._get_qwen2_prompt_embeds(
prompt=negative_prompt,
device=device,
max_sequence_length=max_sequence_length,
)
)
batch_size, seq_len, _ = negative_prompt_embeds.shape
# duplicate text embeddings and attention mask for each generation per prompt, using mps friendly method
negative_prompt_embeds = negative_prompt_embeds.repeat(
1, num_images_per_prompt, 1
)
negative_prompt_embeds = negative_prompt_embeds.view(
batch_size * num_images_per_prompt, seq_len, -1
)
negative_prompt_attention_mask = negative_prompt_attention_mask.repeat(
num_images_per_prompt, 1
)
negative_prompt_attention_mask = negative_prompt_attention_mask.view(
batch_size * num_images_per_prompt, -1
)
return (
prompt_embeds,
prompt_attention_mask,
negative_prompt_embeds,
negative_prompt_attention_mask,
)
@property
def num_timesteps(self):
return self._num_timesteps
@property
def text_guidance_scale(self):
return self._text_guidance_scale
@property
def image_guidance_scale(self):
return self._image_guidance_scale
@property
def cfg_range(self):
return self._cfg_range
@torch.no_grad()
def __call__(
self,
prompt: Optional[Union[str, List[str]]] = None,
negative_prompt: Optional[Union[str, List[str]]] = None,
prompt_embeds: Optional[torch.FloatTensor] = None,
negative_prompt_embeds: Optional[torch.FloatTensor] = None,
prompt_attention_mask: Optional[torch.LongTensor] = None,
negative_prompt_attention_mask: Optional[torch.LongTensor] = None,
max_sequence_length: Optional[int] = None,
callback_on_step_end_tensor_inputs: Optional[List[str]] = None,
input_images: Optional[List[PIL.Image.Image]] = None,
num_images_per_prompt: int = 1,
height: Optional[int] = None,
width: Optional[int] = None,
max_pixels: int = 1024 * 1024,
max_input_image_side_length: int = 1024,
align_res: bool = True,
num_inference_steps: int = 28,
text_guidance_scale: float = 4.0,
image_guidance_scale: float = 1.0,
cfg_range: Tuple[float, float] = (0.0, 1.0),
attention_kwargs: Optional[Dict[str, Any]] = None,
timesteps: List[int] = None,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.FloatTensor] = None,
frame_count: int = 1,
output_type: Optional[str] = "pil",
return_dict: bool = True,
verbose: bool = False,
step_func=None,
get_latents_text_embeds=False,
):
height = height or self.default_sample_size * self.vae_scale_factor
width = width or self.default_sample_size * self.vae_scale_factor
self._text_guidance_scale = text_guidance_scale
self._image_guidance_scale = image_guidance_scale
self._cfg_range = cfg_range
self._attention_kwargs = attention_kwargs
# 2. Define call parameters
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
device = self._execution_device
# 3. Encode input prompt
(
prompt_embeds,
prompt_attention_mask,
negative_prompt_embeds,
negative_prompt_attention_mask,
) = self.encode_prompt(
prompt,
self.text_guidance_scale > 1.0,
negative_prompt=negative_prompt,
num_images_per_prompt=num_images_per_prompt,
device=device,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
prompt_attention_mask=prompt_attention_mask,
negative_prompt_attention_mask=negative_prompt_attention_mask,
max_sequence_length=max_sequence_length,
)
dtype = self.vae.dtype
# 3. Prepare control image
ref_latents = self.prepare_image(
images=input_images,
batch_size=batch_size,
num_images_per_prompt=num_images_per_prompt,
max_pixels=max_pixels,
max_side_length=max_input_image_side_length,
device=device,
dtype=dtype,
)
if input_images is None:
input_images = []
if len(input_images) == 1 and align_res:
width, height = (
ref_latents[0][0].shape[-1] * self.vae_scale_factor,
ref_latents[0][0].shape[-2] * self.vae_scale_factor,
)
ori_width, ori_height = width, height
else:
ori_width, ori_height = width, height
cur_pixels = height * width
ratio = (max_pixels / cur_pixels) ** 0.5
ratio = min(ratio, 1.0)
height, width = (
int(height * ratio) // 16 * 16,
int(width * ratio) // 16 * 16,
)
if len(input_images) == 0:
self._image_guidance_scale = 1
# 4. Prepare latents.
latent_channels = self.transformer.config.in_channels
latents = self.prepare_latents(
batch_size * num_images_per_prompt,
latent_channels,
height,
width,
prompt_embeds.dtype,
device,
generator,
latents,
frame_count,
)
freqs_cis = OmniGen2RotaryPosEmbed.get_freqs_cis(
self.transformer.config.axes_dim_rope,
self.transformer.config.axes_lens,
theta=10000,
)
image = self.processing(
latents=latents,
ref_latents=ref_latents,
prompt_embeds=prompt_embeds,
freqs_cis=freqs_cis,
negative_prompt_embeds=negative_prompt_embeds,
prompt_attention_mask=prompt_attention_mask,
negative_prompt_attention_mask=negative_prompt_attention_mask,
num_inference_steps=num_inference_steps,
timesteps=timesteps,
device=device,
dtype=dtype,
verbose=verbose,
step_func=step_func,
get_latents_text_embeds=get_latents_text_embeds,
)
if get_latents_text_embeds:
return image, prompt_embeds
if len(image.shape) == 4:
image = F.interpolate(image, size=(ori_height, ori_width), mode="bilinear")
image = self.image_processor.postprocess(image, output_type=output_type)
else:
image = [
F.interpolate(
image[:, i], size=(ori_height, ori_width), mode="bilinear"
)
for i in range(image.shape[1])
]
image = [
self.image_processor.postprocess(x, output_type=output_type)
for x in image
]
image = torch.stack(image, dim=1)
# Offload all models
self.maybe_free_model_hooks()
if not return_dict:
return image
else:
return FMPipelineOutput(images=image)
def processing(
self,
latents,
ref_latents,
prompt_embeds,
freqs_cis,
negative_prompt_embeds,
prompt_attention_mask,
negative_prompt_attention_mask,
num_inference_steps,
timesteps,
device,
dtype,
verbose,
step_func=None,
get_latents_text_embeds=False,
):
batch_size = latents.shape[0]
timesteps, num_inference_steps = retrieve_timesteps(
self.scheduler,
num_inference_steps,
device,
timesteps,
num_tokens=latents.shape[-2] * latents.shape[-1],
)
num_warmup_steps = max(
len(timesteps) - num_inference_steps * self.scheduler.order, 0
)
self._num_timesteps = len(timesteps)
enable_taylorseer = getattr(self, "enable_taylorseer", False)
if enable_taylorseer:
model_pred_cache_dic, model_pred_current = cache_init(
self, num_inference_steps
)
model_pred_ref_cache_dic, model_pred_ref_current = cache_init(
self, num_inference_steps
)
model_pred_uncond_cache_dic, model_pred_uncond_current = cache_init(
self, num_inference_steps
)
self.transformer.enable_taylorseer = True
elif self.transformer.enable_teacache:
# Use different TeaCacheParams for different conditions
teacache_params = TeaCacheParams()
teacache_params_uncond = TeaCacheParams()
teacache_params_ref = TeaCacheParams()
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
if enable_taylorseer:
self.transformer.cache_dic = model_pred_cache_dic
self.transformer.current = model_pred_current
elif self.transformer.enable_teacache:
teacache_params.is_first_or_last_step = (
i == 0 or i == len(timesteps) - 1
)
self.transformer.teacache_params = teacache_params
model_pred = self.predict(
t=t,
latents=latents,
prompt_embeds=prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=prompt_attention_mask,
ref_image_hidden_states=ref_latents,
)
text_guidance_scale = (
self.text_guidance_scale
if self.cfg_range[0] <= i / len(timesteps) <= self.cfg_range[1]
else 1.0
)
image_guidance_scale = (
self.image_guidance_scale
if self.cfg_range[0] <= i / len(timesteps) <= self.cfg_range[1]
else 1.0
)
if text_guidance_scale > 1.0 and image_guidance_scale > 1.0:
if enable_taylorseer:
self.transformer.cache_dic = model_pred_ref_cache_dic
self.transformer.current = model_pred_ref_current
elif self.transformer.enable_teacache:
teacache_params_ref.is_first_or_last_step = (
i == 0 or i == len(timesteps) - 1
)
self.transformer.teacache_params = teacache_params_ref
model_pred_ref = self.predict(
t=t,
latents=latents,
prompt_embeds=negative_prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=negative_prompt_attention_mask,
ref_image_hidden_states=ref_latents,
)
if enable_taylorseer:
self.transformer.cache_dic = model_pred_uncond_cache_dic
self.transformer.current = model_pred_uncond_current
elif self.transformer.enable_teacache:
teacache_params_uncond.is_first_or_last_step = (
i == 0 or i == len(timesteps) - 1
)
self.transformer.teacache_params = teacache_params_uncond
model_pred_uncond = self.predict(
t=t,
latents=latents,
prompt_embeds=negative_prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=negative_prompt_attention_mask,
ref_image_hidden_states=None,
)
model_pred = (
model_pred_uncond
+ image_guidance_scale * (model_pred_ref - model_pred_uncond)
+ text_guidance_scale * (model_pred - model_pred_ref)
)
elif text_guidance_scale > 1.0:
if enable_taylorseer:
self.transformer.cache_dic = model_pred_uncond_cache_dic
self.transformer.current = model_pred_uncond_current
elif self.transformer.enable_teacache:
teacache_params_uncond.is_first_or_last_step = (
i == 0 or i == len(timesteps) - 1
)
self.transformer.teacache_params = teacache_params_uncond
model_pred_uncond = self.predict(
t=t,
latents=latents,
prompt_embeds=negative_prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=negative_prompt_attention_mask,
ref_image_hidden_states=None,
)
model_pred = model_pred_uncond + text_guidance_scale * (
model_pred - model_pred_uncond
)
latents = self.scheduler.step(
model_pred, t, latents, return_dict=False
)[0]
latents = latents.to(dtype=dtype)
if i == len(timesteps) - 1 or (
(i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0
):
progress_bar.update()
if step_func is not None:
step_func(i, self._num_timesteps)
if enable_taylorseer:
del (
model_pred_cache_dic,
model_pred_ref_cache_dic,
model_pred_uncond_cache_dic,
)
del model_pred_current, model_pred_ref_current, model_pred_uncond_current
latents = latents.to(dtype=dtype)
if get_latents_text_embeds:
return latents
if self.vae.config.scaling_factor is not None:
latents = latents / self.vae.config.scaling_factor
if self.vae.config.shift_factor is not None:
latents = latents + self.vae.config.shift_factor
if len(latents.shape) == 4:
image = self.vae.decode(latents, return_dict=False)[0]
else:
image = [
self.vae.decode(latents[:, i], return_dict=False)[0]
for i in range(latents.shape[1])
]
image = torch.stack(image, dim=1)
return image
def predict(
self,
t,
latents,
prompt_embeds,
freqs_cis,
prompt_attention_mask,
ref_image_hidden_states,
):
# broadcast to batch dimension in a way that's compatible with ONNX/Core ML
timestep = t.expand(latents.shape[0]).to(latents.dtype)
if len(latents.shape) == 4:
batch_size, num_channels_latents, height, width = latents.shape
is_temporal = False
else:
batch_size, num_frames, num_channels_latents, height, width = latents.shape
latents = [_latents for _latents in latents]
is_temporal = True
optional_kwargs = {}
if "ref_image_hidden_states" in set(
inspect.signature(self.transformer.forward).parameters.keys()
):
optional_kwargs["ref_image_hidden_states"] = ref_image_hidden_states
model_pred = self.transformer(
latents,
timestep,
prompt_embeds,
freqs_cis,
prompt_attention_mask,
**optional_kwargs,
)
if is_temporal:
model_pred = torch.stack(model_pred)
return model_pred