FOFPred / pipeline_fofpred.py
kahnchana's picture
Add pipeline code for trust_remote_code
0cb69f0 verified
raw
history blame
34.7 kB
"""
FOFPred Diffusion Pipeline.
Modified from OmniGen2 Diffusion Pipeline (By OmniGen2 Team and The HuggingFace Team).
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import inspect
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
import PIL.Image
import torch
import torch.nn.functional as F
from diffusers.models.autoencoders import AutoencoderKL
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.schedulers import FlowMatchEulerDiscreteScheduler
from diffusers.utils import (
BaseOutput,
is_torch_xla_available,
logging,
)
from diffusers.utils.torch_utils import randn_tensor
from transformers import Qwen2_5_VLForConditionalGeneration
from fofpred.pipelines.image_processor import OmniGen2ImageProcessor
from fofpred.utils.teacache_util import TeaCacheParams
from ...models.transformers import OmniGen2Transformer3DModel
from ...models.transformers.repo import OmniGen2RotaryPosEmbed
from ..lora_pipeline import OmniGen2LoraLoaderMixin
# Record once, at import time, whether torch_xla can be used in this environment.
XLA_AVAILABLE = True if is_torch_xla_available() else False
from ...cache_functions import cache_init
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
@dataclass
class FMPipelineOutput(BaseOutput):
    """
    Container for the images produced by the pipeline's `__call__`.

    Args:
        images (Union[List[PIL.Image.Image], np.ndarray]):
            The generated images, either as a list of `batch_size` denoised PIL images or as a numpy array of
            shape `(batch_size, height, width, num_channels)`.
    """

    images: Union[List[PIL.Image.Image], np.ndarray]
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps
def retrieve_timesteps(
    scheduler,
    num_inference_steps: Optional[int] = None,
    device: Optional[Union[str, torch.device]] = None,
    timesteps: Optional[List[int]] = None,
    **kwargs,
):
    """
    Configure the scheduler's timestep schedule and return it.

    Args:
        scheduler (`SchedulerMixin`):
            The scheduler whose `set_timesteps` method is called.
        num_inference_steps (`int`, *optional*):
            Number of diffusion steps. Only used when `timesteps` is `None`.
        device (`str` or `torch.device`, *optional*):
            Forwarded to `scheduler.set_timesteps` as `device`.
        timesteps (`List[int]`, *optional*):
            Custom timestep schedule. When given, it is forwarded to `scheduler.set_timesteps` instead of
            `num_inference_steps`, and the scheduler must accept a `timesteps` argument.
        **kwargs:
            Forwarded unchanged to `scheduler.set_timesteps`.

    Returns:
        `Tuple[torch.Tensor, int]`: `scheduler.timesteps` after configuration, and the number of inference steps
        (the length of the schedule when custom `timesteps` were given, otherwise `num_inference_steps`).

    Raises:
        ValueError: If custom `timesteps` are given but `scheduler.set_timesteps` has no `timesteps` parameter.
    """
    # Default path: let the scheduler derive its own spacing from the step count.
    if timesteps is None:
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        return scheduler.timesteps, num_inference_steps

    # Custom schedule: only valid for schedulers that expose a `timesteps` argument.
    supported_params = inspect.signature(scheduler.set_timesteps).parameters
    if "timesteps" not in supported_params:
        raise ValueError(
            f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
            f" timestep schedules. Please check whether you are using the correct scheduler."
        )
    scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
    schedule = scheduler.timesteps
    return schedule, len(schedule)
class FOFPredPipeline(DiffusionPipeline, OmniGen2LoraLoaderMixin):
    """
    Pipeline for text-to-image generation using OmniGen2.
    This pipeline implements a text-to-image generation model that uses:
    - Qwen2.5-VL for text encoding
    - A custom transformer architecture for image generation
    - VAE for image encoding/decoding
    - FlowMatchEulerDiscreteScheduler for noise scheduling
    Args:
        transformer (OmniGen2Transformer3DModel): The transformer model for image generation.
        vae (AutoencoderKL): The VAE model for image encoding/decoding.
        scheduler (FlowMatchEulerDiscreteScheduler): The scheduler for noise scheduling.
        mllm (Qwen2_5_VLForConditionalGeneration): The Qwen2.5-VL model used to encode prompts.
        processor: The processor whose `tokenizer` applies the chat template and tokenizes prompts.
    """
model_cpu_offload_seq = "mllm->transformer->vae"
def __init__(
    self,
    transformer: OmniGen2Transformer3DModel,
    vae: AutoencoderKL,
    scheduler: FlowMatchEulerDiscreteScheduler,
    mllm: Qwen2_5_VLForConditionalGeneration,
    processor,
) -> None:
    """
    Register the pipeline components and derive the image-processing settings.

    Args:
        transformer: The transformer model for image generation.
        vae: The VAE model for image encoding/decoding.
        scheduler: The scheduler for noise scheduling.
        mllm: The Qwen2.5-VL model used to encode prompts.
        processor: The processor whose `tokenizer` is used for prompts.
    """
    super().__init__()
    self.register_modules(
        transformer=transformer,
        vae=vae,
        scheduler=scheduler,
        mllm=mllm,
        processor=processor,
    )

    # One factor of two per VAE block transition; fall back to 8 when no VAE is registered.
    vae_registered = hasattr(self, "vae") and self.vae is not None
    if vae_registered:
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
    else:
        self.vae_scale_factor = 8

    # The image processor is configured with twice the VAE scale factor.
    self.image_processor = OmniGen2ImageProcessor(
        vae_scale_factor=self.vae_scale_factor * 2, do_resize=True
    )
    self.default_sample_size = 128
def prepare_latents(
    self,
    batch_size: int,
    num_channels_latents: int,
    height: int,
    width: int,
    dtype: torch.dtype,
    device: torch.device,
    generator: Optional[torch.Generator],
    latents: Optional[torch.FloatTensor] = None,
    frame_count: int = 1,
) -> torch.FloatTensor:
    """
    Build the starting latents for the denoising loop.

    Args:
        batch_size: Size of the leading (batch) dimension.
        num_channels_latents: Number of latent channels.
        height: Output height in pixels; divided by `self.vae_scale_factor`.
        width: Output width in pixels; divided by `self.vae_scale_factor`.
        dtype: Data type of freshly sampled latents.
        device: Device for the latents.
        generator: Random generator forwarded to `randn_tensor`.
        latents: Optional pre-computed latents. When given they are only moved to `device`
            (their dtype and shape are left untouched).
        frame_count: Number of frames. A frame axis is inserted after the batch axis only
            when this is greater than 1.

    Returns:
        torch.FloatTensor: The provided latents on `device`, or random latents of shape
        `(batch, [frames,] channels, height // scale, width // scale)`.
    """
    latent_height = int(height) // self.vae_scale_factor
    latent_width = int(width) // self.vae_scale_factor

    # Single-frame generation keeps the 4-D layout; multi-frame adds a frame axis.
    frame_axis = (frame_count,) if frame_count > 1 else ()
    shape = (batch_size, *frame_axis, num_channels_latents, latent_height, latent_width)

    if latents is not None:
        return latents.to(device)
    return randn_tensor(shape, generator=generator, device=device, dtype=dtype)
def encode_vae(self, img: torch.FloatTensor) -> torch.FloatTensor:
    """
    Encode an image tensor into the VAE latent space.

    The image is cast to the VAE dtype, a latent is sampled from the encoder's
    `latent_dist`, and the VAE config's `shift_factor` / `scaling_factor` are
    applied when they are not `None` (shift first, then scale).

    Args:
        img: The input image tensor to encode.

    Returns:
        torch.FloatTensor: The normalized latent, in the VAE dtype.
    """
    vae = self.vae
    latent = vae.encode(img.to(dtype=vae.dtype)).latent_dist.sample()

    shift = vae.config.shift_factor
    if shift is not None:
        latent = latent - shift

    scale = vae.config.scaling_factor
    if scale is not None:
        latent = latent * scale

    return latent.to(dtype=vae.dtype)
def prepare_image(
    self,
    images: Union[List[PIL.Image.Image], PIL.Image.Image],
    batch_size: int,
    num_images_per_prompt: int,
    max_pixels: int,
    max_side_length: int,
    device: torch.device,
    dtype: torch.dtype,
) -> List[Optional[torch.FloatTensor]]:
    """
    Encode the reference images of every prompt into VAE latents.

    Args:
        images: Reference images. For `batch_size == 1` this is the image (or list of
            images) of the single prompt; otherwise it holds one entry per prompt, each
            entry being an image, a list of images, or `None`. `None` means that no
            prompt has reference images.
        batch_size: The number of prompts.
        num_images_per_prompt: How many times each prompt's entry is repeated in the output.
        max_pixels: Forwarded to `self.image_processor.preprocess`.
        max_side_length: Forwarded to `self.image_processor.preprocess`.
        device: The device the preprocessed images are moved to before encoding.
        dtype: Unused; kept for interface compatibility.

    Returns:
        A list with `num_images_per_prompt` consecutive entries per prompt. Each entry is
        either `None` (no reference images) or the list of encoded latents for that prompt,
        each with its leading dimension squeezed.
    """
    if images is None:
        # Previously this crashed for batch_size > 1; emit one empty slot per prompt instead.
        images = [None] * batch_size
    elif batch_size == 1:
        images = [images]

    latents = []
    for refs in images:
        # Accept a bare image (anything without a length) in place of a one-element list.
        if refs is not None and not hasattr(refs, "__len__"):
            refs = [refs]

        if refs is not None and len(refs) > 0:
            ref_latents = [
                self.encode_vae(
                    self.image_processor.preprocess(
                        ref, max_pixels=max_pixels, max_side_length=max_side_length
                    ).to(device=device)
                ).squeeze(0)
                for ref in refs
            ]
        else:
            ref_latents = None

        # Every generated sample of a prompt shares the same reference latents.
        latents.extend([ref_latents] * num_images_per_prompt)
    return latents
def _get_qwen2_prompt_embeds(
    self,
    prompt: Union[str, List[str]],
    device: Optional[torch.device] = None,
    max_sequence_length: int = 256,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Get prompt embeddings from the Qwen2.5-VL encoder (`self.mllm`).

    Args:
        prompt: The prompt or list of prompts to encode.
        device: The device to place the embeddings on. If None, uses the pipeline's
            execution device.
        max_sequence_length: Maximum sequence length passed to the tokenizer as
            `max_length` (with truncation enabled).

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: A tuple containing:
            - The last hidden state of the encoder, cast to the encoder's dtype
            - The tokenizer attention mask

    Note:
        A warning is logged (nothing is raised) when the tokenizer truncated the input.
    """
    device = device or self._execution_device
    prompt = [prompt] if isinstance(prompt, str) else prompt
    tokenizer = self.processor.tokenizer

    text_inputs = tokenizer(
        prompt,
        padding="longest",
        max_length=max_sequence_length,
        truncation=True,
        return_tensors="pt",
    )
    text_input_ids = text_inputs.input_ids.to(device)

    # Tokenize again without truncation purely to detect (and report) what was cut off.
    untruncated_ids = tokenizer(
        prompt, padding="longest", return_tensors="pt"
    ).input_ids.to(device)

    if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(
        text_input_ids, untruncated_ids
    ):
        # `max_sequence_length` may be None (the tokenizer then applies its own limit);
        # fall back to the actual truncated length so the report itself cannot fail.
        limit = (
            max_sequence_length
            if max_sequence_length is not None
            else text_input_ids.shape[-1]
        )
        removed_text = tokenizer.batch_decode(untruncated_ids[:, limit - 1 : -1])
        logger.warning(
            "The following part of your input was truncated because Qwen2.5-VL can only handle sequences up to"
            f" {limit} tokens: {removed_text}"
        )

    prompt_attention_mask = text_inputs.attention_mask.to(device)
    prompt_embeds = self.mllm(
        text_input_ids,
        attention_mask=prompt_attention_mask,
        output_hidden_states=True,
    ).hidden_states[-1]

    # `self.mllm` has just been called, so it cannot be None here; use its dtype directly.
    prompt_embeds = prompt_embeds.to(dtype=self.mllm.dtype, device=device)
    return prompt_embeds, prompt_attention_mask
def _apply_chat_template(self, prompt: str):
    """
    Wrap a user prompt in the pipeline's fixed system/user conversation and pass it
    through the tokenizer's chat template (without tokenizing and without a
    generation prompt).

    Args:
        prompt: The raw user instruction.

    Returns:
        Whatever `tokenizer.apply_chat_template` returns for the two-turn conversation.
    """
    system_turn = {
        "role": "system",
        "content": "You are a helpful assistant that generates high-quality images based on user instructions.",
    }
    user_turn = {"role": "user", "content": prompt}
    return self.processor.tokenizer.apply_chat_template(
        [system_turn, user_turn], tokenize=False, add_generation_prompt=False
    )
def encode_prompt(
    self,
    prompt: Union[str, List[str]],
    do_classifier_free_guidance: bool = True,
    negative_prompt: Optional[Union[str, List[str]]] = None,
    num_images_per_prompt: int = 1,
    device: Optional[torch.device] = None,
    prompt_embeds: Optional[torch.Tensor] = None,
    negative_prompt_embeds: Optional[torch.Tensor] = None,
    prompt_attention_mask: Optional[torch.Tensor] = None,
    negative_prompt_attention_mask: Optional[torch.Tensor] = None,
    max_sequence_length: int = 256,
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    r"""
    Encodes the prompt into text encoder hidden states.

    Args:
        prompt (`str` or `List[str]`, *optional*):
            Prompt(s) to be encoded. May be `None` when `prompt_embeds` is given.
        do_classifier_free_guidance (`bool`, *optional*, defaults to `True`):
            Whether negative embeddings should be produced as well.
        negative_prompt (`str` or `List[str]`, *optional*):
            The prompt not to guide the image generation. Defaults to `""`. Only used when
            classifier-free guidance is on and `negative_prompt_embeds` is not given.
        num_images_per_prompt (`int`, *optional*, defaults to 1):
            Number of images generated per prompt; embeddings and masks are repeated this many
            times, keeping the copies of one prompt adjacent.
        device (`torch.device`, *optional*):
            Device for the resulting embeddings. Defaults to the pipeline's execution device.
        prompt_embeds (`torch.Tensor`, *optional*):
            Pre-generated text embeddings. Requires `prompt_attention_mask`.
        negative_prompt_embeds (`torch.Tensor`, *optional*):
            Pre-generated negative text embeddings. They are returned as passed.
        prompt_attention_mask (`torch.Tensor`, *optional*):
            Attention mask matching `prompt_embeds`.
        negative_prompt_attention_mask (`torch.Tensor`, *optional*):
            Attention mask matching `negative_prompt_embeds`.
        max_sequence_length (`int`, defaults to `256`):
            Maximum sequence length to use for the prompt.

    Returns:
        `(prompt_embeds, prompt_attention_mask, negative_prompt_embeds, negative_prompt_attention_mask)`.
        The negative entries are whatever was passed in (possibly `None`) when no negative
        embeddings are computed here.

    Raises:
        ValueError: If neither `prompt` nor `prompt_embeds` is given, if `prompt_embeds` comes
            without `prompt_attention_mask`, or if the number of negative prompts does not
            match the number of prompts.
    """
    device = device or self._execution_device

    if prompt is not None:
        prompt = [prompt] if isinstance(prompt, str) else prompt
        prompt = [self._apply_chat_template(_prompt) for _prompt in prompt]
    elif prompt_embeds is None:
        raise ValueError("Either `prompt` or `prompt_embeds` must be provided.")

    if prompt_embeds is None:
        prompt_embeds, prompt_attention_mask = self._get_qwen2_prompt_embeds(
            prompt=prompt, device=device, max_sequence_length=max_sequence_length
        )
    elif prompt_attention_mask is None:
        raise ValueError(
            "`prompt_attention_mask` must be provided together with `prompt_embeds`."
        )

    batch_size, seq_len, _ = prompt_embeds.shape
    # Duplicate embeddings and mask per generated image (mps friendly repeat + view).
    # Both use the same pattern so row k of the mask always belongs to row k of the
    # embeddings: [p0, p0, ..., p1, p1, ...].
    prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
    prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
    prompt_attention_mask = prompt_attention_mask.repeat(1, num_images_per_prompt)
    prompt_attention_mask = prompt_attention_mask.view(
        batch_size * num_images_per_prompt, -1
    )

    # Get negative embeddings for classifier free guidance.
    # NOTE: negative embeddings supplied by the caller are returned untouched (not repeated).
    if do_classifier_free_guidance and negative_prompt_embeds is None:
        if negative_prompt is None:
            negative_prompt = ""
        if isinstance(negative_prompt, str):
            negative_prompt = batch_size * [negative_prompt]
        negative_prompt = [
            self._apply_chat_template(_negative_prompt)
            for _negative_prompt in negative_prompt
        ]
        if batch_size != len(negative_prompt):
            raise ValueError(
                f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
                f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
                " the batch size of `prompt`."
            )

        negative_prompt_embeds, negative_prompt_attention_mask = (
            self._get_qwen2_prompt_embeds(
                prompt=negative_prompt,
                device=device,
                max_sequence_length=max_sequence_length,
            )
        )

        neg_batch_size, neg_seq_len, _ = negative_prompt_embeds.shape
        negative_prompt_embeds = negative_prompt_embeds.repeat(
            1, num_images_per_prompt, 1
        )
        negative_prompt_embeds = negative_prompt_embeds.view(
            neg_batch_size * num_images_per_prompt, neg_seq_len, -1
        )
        negative_prompt_attention_mask = negative_prompt_attention_mask.repeat(
            1, num_images_per_prompt
        )
        negative_prompt_attention_mask = negative_prompt_attention_mask.view(
            neg_batch_size * num_images_per_prompt, -1
        )

    return (
        prompt_embeds,
        prompt_attention_mask,
        negative_prompt_embeds,
        negative_prompt_attention_mask,
    )
@property
def num_timesteps(self):
    """Length of the timestep schedule recorded by the most recent `processing` run."""
    return self._num_timesteps


@property
def text_guidance_scale(self):
    """Text guidance scale stored by the most recent `__call__`."""
    return self._text_guidance_scale


@property
def image_guidance_scale(self):
    """Image guidance scale stored by the most recent `__call__` (set to 1 when no input images are given)."""
    return self._image_guidance_scale


@property
def cfg_range(self):
    """The `cfg_range` pair stored by the most recent `__call__`."""
    return self._cfg_range
@torch.no_grad()
def __call__(
    self,
    prompt: Optional[Union[str, List[str]]] = None,
    negative_prompt: Optional[Union[str, List[str]]] = None,
    prompt_embeds: Optional[torch.FloatTensor] = None,
    negative_prompt_embeds: Optional[torch.FloatTensor] = None,
    prompt_attention_mask: Optional[torch.LongTensor] = None,
    negative_prompt_attention_mask: Optional[torch.LongTensor] = None,
    max_sequence_length: Optional[int] = None,
    callback_on_step_end_tensor_inputs: Optional[List[str]] = None,
    input_images: Optional[List[PIL.Image.Image]] = None,
    num_images_per_prompt: int = 1,
    height: Optional[int] = None,
    width: Optional[int] = None,
    max_pixels: int = 1024 * 1024,
    max_input_image_side_length: int = 1024,
    align_res: bool = True,
    num_inference_steps: int = 28,
    text_guidance_scale: float = 4.0,
    image_guidance_scale: float = 1.0,
    cfg_range: Tuple[float, float] = (0.0, 1.0),
    attention_kwargs: Optional[Dict[str, Any]] = None,
    timesteps: List[int] = None,
    generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
    latents: Optional[torch.FloatTensor] = None,
    frame_count: int = 1,
    output_type: Optional[str] = "pil",
    return_dict: bool = True,
    verbose: bool = False,
    step_func=None,
    get_latents_text_embeds=False,
):
    """
    Run the full generation: encode the prompt, encode reference images, denoise, decode.

    Args:
        prompt: Prompt string or list of prompts. May be omitted when `prompt_embeds` is given.
        negative_prompt: Negative prompt(s) for classifier-free guidance.
        prompt_embeds / negative_prompt_embeds: Pre-computed text embeddings.
        prompt_attention_mask / negative_prompt_attention_mask: Masks for those embeddings.
        max_sequence_length: Forwarded to `encode_prompt`.
        callback_on_step_end_tensor_inputs: Accepted but not used by this method.
        input_images: Reference images, forwarded to `prepare_image`.
        num_images_per_prompt: Number of samples generated per prompt.
        height / width: Requested output size; default to
            `default_sample_size * vae_scale_factor`.
        max_pixels: Pixel budget. Without resolution alignment the generation size is scaled
            down to fit it and rounded down to a multiple of 16.
        max_input_image_side_length: Forwarded to `prepare_image` as `max_side_length`.
        align_res: With exactly one entry in `input_images`, take the generation size from
            the first encoded reference latent instead of `height` / `width`.
        num_inference_steps: Number of denoising steps.
        text_guidance_scale: Text guidance scale; negative embeddings are computed only when
            it is greater than 1.
        image_guidance_scale: Image guidance scale; forced to 1 when there are no input images.
        cfg_range: Fraction-of-schedule interval in which guidance is applied.
        attention_kwargs: Stored on `self._attention_kwargs`; not otherwise used here.
        timesteps: Optional custom timestep schedule.
        generator: Random generator(s) for the initial latents.
        latents: Optional pre-computed initial latents.
        frame_count: Number of frames to generate; values above 1 add a frame axis.
        output_type: Forwarded to `self.image_processor.postprocess`.
        return_dict: Return an `FMPipelineOutput` instead of the bare images.
        verbose: Forwarded to `processing`.
        step_func: Optional callback forwarded to `processing`.
        get_latents_text_embeds: Return `(latents, prompt_embeds)` right after denoising,
            skipping VAE decoding and post-processing.

    Returns:
        `FMPipelineOutput` (or the bare images when `return_dict` is False), or the
        `(latents, prompt_embeds)` tuple when `get_latents_text_embeds` is set. For
        multi-frame output the per-frame results are stacked along axis 1 when they are
        tensors or numpy arrays, and regrouped into one list of frames per sample otherwise.

    Raises:
        ValueError: If neither a usable `prompt` nor `prompt_embeds` is provided.
    """
    height = height or self.default_sample_size * self.vae_scale_factor
    width = width or self.default_sample_size * self.vae_scale_factor

    self._text_guidance_scale = text_guidance_scale
    self._image_guidance_scale = image_guidance_scale
    self._cfg_range = cfg_range
    self._attention_kwargs = attention_kwargs

    # 1. Define call parameters
    if isinstance(prompt, str):
        batch_size = 1
    elif isinstance(prompt, list):
        batch_size = len(prompt)
    elif prompt_embeds is not None:
        batch_size = prompt_embeds.shape[0]
    else:
        raise ValueError(
            "Either `prompt` (a string or a list of strings) or `prompt_embeds` must be provided."
        )

    device = self._execution_device

    # 2. Encode input prompt
    (
        prompt_embeds,
        prompt_attention_mask,
        negative_prompt_embeds,
        negative_prompt_attention_mask,
    ) = self.encode_prompt(
        prompt,
        self.text_guidance_scale > 1.0,
        negative_prompt=negative_prompt,
        num_images_per_prompt=num_images_per_prompt,
        device=device,
        prompt_embeds=prompt_embeds,
        negative_prompt_embeds=negative_prompt_embeds,
        prompt_attention_mask=prompt_attention_mask,
        negative_prompt_attention_mask=negative_prompt_attention_mask,
        max_sequence_length=max_sequence_length,
    )

    dtype = self.vae.dtype

    # 3. Prepare reference images
    ref_latents = self.prepare_image(
        images=input_images,
        batch_size=batch_size,
        num_images_per_prompt=num_images_per_prompt,
        max_pixels=max_pixels,
        max_side_length=max_input_image_side_length,
        device=device,
        dtype=dtype,
    )

    if input_images is None:
        input_images = []

    if len(input_images) == 1 and align_res:
        # Follow the resolution of the (already resized) reference image.
        width, height = (
            ref_latents[0][0].shape[-1] * self.vae_scale_factor,
            ref_latents[0][0].shape[-2] * self.vae_scale_factor,
        )
        ori_width, ori_height = width, height
    else:
        ori_width, ori_height = width, height
        # Shrink (never enlarge) to the pixel budget and snap to multiples of 16.
        cur_pixels = height * width
        ratio = min((max_pixels / cur_pixels) ** 0.5, 1.0)
        height, width = (
            int(height * ratio) // 16 * 16,
            int(width * ratio) // 16 * 16,
        )

    if len(input_images) == 0:
        self._image_guidance_scale = 1

    # 4. Prepare latents
    latent_channels = self.transformer.config.in_channels
    latents = self.prepare_latents(
        batch_size * num_images_per_prompt,
        latent_channels,
        height,
        width,
        prompt_embeds.dtype,
        device,
        generator,
        latents,
        frame_count,
    )

    freqs_cis = OmniGen2RotaryPosEmbed.get_freqs_cis(
        self.transformer.config.axes_dim_rope,
        self.transformer.config.axes_lens,
        theta=10000,
    )

    # 5. Denoise (and decode unless only latents are requested)
    image = self.processing(
        latents=latents,
        ref_latents=ref_latents,
        prompt_embeds=prompt_embeds,
        freqs_cis=freqs_cis,
        negative_prompt_embeds=negative_prompt_embeds,
        prompt_attention_mask=prompt_attention_mask,
        negative_prompt_attention_mask=negative_prompt_attention_mask,
        num_inference_steps=num_inference_steps,
        timesteps=timesteps,
        device=device,
        dtype=dtype,
        verbose=verbose,
        step_func=step_func,
        get_latents_text_embeds=get_latents_text_embeds,
    )

    if get_latents_text_embeds:
        # Release offload hooks on this exit path too.
        self.maybe_free_model_hooks()
        return image, prompt_embeds

    # 6. Resize back to the requested size and post-process
    if len(image.shape) == 4:
        image = F.interpolate(image, size=(ori_height, ori_width), mode="bilinear")
        image = self.image_processor.postprocess(image, output_type=output_type)
    else:
        frames = [
            self.image_processor.postprocess(
                F.interpolate(
                    image[:, i], size=(ori_height, ori_width), mode="bilinear"
                ),
                output_type=output_type,
            )
            for i in range(image.shape[1])
        ]
        if all(isinstance(frame, torch.Tensor) for frame in frames):
            image = torch.stack(frames, dim=1)
        elif all(isinstance(frame, np.ndarray) for frame in frames):
            image = np.stack(frames, axis=1)
        else:
            # Non-array output (e.g. PIL) cannot be stacked. Assuming each frame result is
            # a per-sample sequence, regroup into one list of frames per sample.
            image = [list(sample_frames) for sample_frames in zip(*frames)]

    # Offload all models
    self.maybe_free_model_hooks()

    if not return_dict:
        return image
    return FMPipelineOutput(images=image)
def processing(
    self,
    latents,
    ref_latents,
    prompt_embeds,
    freqs_cis,
    negative_prompt_embeds,
    prompt_attention_mask,
    negative_prompt_attention_mask,
    num_inference_steps,
    timesteps,
    device,
    dtype,
    verbose,
    step_func=None,
    get_latents_text_embeds=False,
):
    """
    Run the denoising loop and, unless only latents are requested, decode with the VAE.

    Each step makes a conditional prediction. Inside `self.cfg_range` (measured as
    `step_index / len(timesteps)`) it is combined with additional predictions:
    negative-text-with-references and negative-text-without-references when both guidance
    scales exceed 1, or only the latter when just the text scale exceeds 1. Each prediction
    kind uses its own TaylorSeer cache or TeaCache parameter object when one of those
    accelerations is enabled.

    Args:
        latents: Initial latents (4-D, or 5-D with a frame axis at dim 1).
        ref_latents: Reference-image latents passed to the transformer.
        prompt_embeds / prompt_attention_mask: Conditional text inputs.
        negative_prompt_embeds / negative_prompt_attention_mask: Negative text inputs.
        freqs_cis: Rotary position embedding data passed to the transformer.
        num_inference_steps: Number of denoising steps.
        timesteps: Optional custom timestep schedule.
        device: Device forwarded to the scheduler setup.
        dtype: Dtype the latents are cast to after every scheduler step.
        verbose: Unused in this method.
        step_func: Optional callback invoked as `step_func(step_index, total_timesteps)`.
        get_latents_text_embeds: Return the final latents without decoding.

    Returns:
        The final latents, or the decoded images (frames stacked along dim 1 for 5-D latents).
    """
    timesteps, num_inference_steps = retrieve_timesteps(
        self.scheduler,
        num_inference_steps,
        device,
        timesteps,
        num_tokens=latents.shape[-2] * latents.shape[-1],
    )
    total_steps = len(timesteps)
    num_warmup_steps = max(total_steps - num_inference_steps * self.scheduler.order, 0)
    self._num_timesteps = total_steps

    # One independent cache / parameter object per prediction kind.
    enable_taylorseer = getattr(self, "enable_taylorseer", False)
    taylor_caches = {}
    teacache_params = {}
    if enable_taylorseer:
        taylor_caches = {
            "cond": cache_init(self, num_inference_steps),
            "ref": cache_init(self, num_inference_steps),
            "uncond": cache_init(self, num_inference_steps),
        }
        self.transformer.enable_taylorseer = True
    elif self.transformer.enable_teacache:
        # Use different TeaCacheParams for different conditions
        teacache_params = {
            "cond": TeaCacheParams(),
            "uncond": TeaCacheParams(),
            "ref": TeaCacheParams(),
        }

    def select_cache(branch, step_index):
        # Point the transformer at the cache state belonging to `branch`.
        if enable_taylorseer:
            self.transformer.cache_dic, self.transformer.current = taylor_caches[branch]
        elif self.transformer.enable_teacache:
            params = teacache_params[branch]
            params.is_first_or_last_step = step_index in (0, total_steps - 1)
            self.transformer.teacache_params = params

    def predict_negative(step_t, current_latents, refs):
        # Prediction with the negative text conditioning.
        return self.predict(
            t=step_t,
            latents=current_latents,
            prompt_embeds=negative_prompt_embeds,
            freqs_cis=freqs_cis,
            prompt_attention_mask=negative_prompt_attention_mask,
            ref_image_hidden_states=refs,
        )

    with self.progress_bar(total=num_inference_steps) as progress_bar:
        for i, t in enumerate(timesteps):
            select_cache("cond", i)
            model_pred = self.predict(
                t=t,
                latents=latents,
                prompt_embeds=prompt_embeds,
                freqs_cis=freqs_cis,
                prompt_attention_mask=prompt_attention_mask,
                ref_image_hidden_states=ref_latents,
            )

            # Guidance is only active inside the configured fraction of the schedule.
            in_cfg_window = self.cfg_range[0] <= i / total_steps <= self.cfg_range[1]
            text_guidance_scale = self.text_guidance_scale if in_cfg_window else 1.0
            image_guidance_scale = self.image_guidance_scale if in_cfg_window else 1.0

            if text_guidance_scale > 1.0 and image_guidance_scale > 1.0:
                select_cache("ref", i)
                model_pred_ref = predict_negative(t, latents, ref_latents)

                select_cache("uncond", i)
                model_pred_uncond = predict_negative(t, latents, None)

                model_pred = (
                    model_pred_uncond
                    + image_guidance_scale * (model_pred_ref - model_pred_uncond)
                    + text_guidance_scale * (model_pred - model_pred_ref)
                )
            elif text_guidance_scale > 1.0:
                select_cache("uncond", i)
                model_pred_uncond = predict_negative(t, latents, None)

                model_pred = model_pred_uncond + text_guidance_scale * (
                    model_pred - model_pred_uncond
                )

            latents = self.scheduler.step(model_pred, t, latents, return_dict=False)[0]
            latents = latents.to(dtype=dtype)

            if i == total_steps - 1 or (
                (i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0
            ):
                progress_bar.update()

            if step_func is not None:
                step_func(i, self._num_timesteps)

    if enable_taylorseer:
        # Drop the local references to the caches before decoding.
        taylor_caches.clear()

    latents = latents.to(dtype=dtype)
    if get_latents_text_embeds:
        return latents

    # Undo the latent normalization applied at encode time (scale first, then shift).
    scaling_factor = self.vae.config.scaling_factor
    if scaling_factor is not None:
        latents = latents / scaling_factor
    shift_factor = self.vae.config.shift_factor
    if shift_factor is not None:
        latents = latents + shift_factor

    if len(latents.shape) == 4:
        return self.vae.decode(latents, return_dict=False)[0]

    # Frame axis present: decode frame by frame and re-stack along dim 1.
    decoded_frames = [
        self.vae.decode(latents[:, frame], return_dict=False)[0]
        for frame in range(latents.shape[1])
    ]
    return torch.stack(decoded_frames, dim=1)
def predict(
    self,
    t,
    latents,
    prompt_embeds,
    freqs_cis,
    prompt_attention_mask,
    ref_image_hidden_states,
):
    """
    Run one transformer forward pass for the current timestep.

    4-D latents are passed to the transformer as a tensor. Any other input must be 5-D; it
    is split along dim 0 into a list, and the transformer's output is stacked back into
    a tensor. `ref_image_hidden_states` is only passed when the transformer's `forward`
    declares a parameter of that name.

    Args:
        t: Current timestep tensor, expanded to the batch size.
        latents: Current latents.
        prompt_embeds: Text embeddings.
        freqs_cis: Rotary position embedding data.
        prompt_attention_mask: Attention mask for `prompt_embeds`.
        ref_image_hidden_states: Reference-image latents, or `None`.

    Returns:
        The transformer prediction.
    """
    # broadcast to batch dimension in a way that's compatible with ONNX/Core ML
    timestep = t.expand(latents.shape[0]).to(latents.dtype)

    is_temporal = len(latents.shape) != 4
    if is_temporal:
        # The unpacking doubles as a check that the latents are exactly 5-D.
        _batch, _frames, _channels, _height, _width = latents.shape
        latents = list(latents)

    forward_params = inspect.signature(self.transformer.forward).parameters
    optional_kwargs = (
        {"ref_image_hidden_states": ref_image_hidden_states}
        if "ref_image_hidden_states" in forward_params
        else {}
    )

    model_pred = self.transformer(
        latents,
        timestep,
        prompt_embeds,
        freqs_cis,
        prompt_attention_mask,
        **optional_kwargs,
    )
    return torch.stack(model_pred) if is_temporal else model_pred