ideogram4 / diffusers_src /src /diffusers /models /autoencoders /autoencoder_kl_kvae.py
multimodalart's picture
multimodalart HF Staff
Embed diffusers PR source; install locally
b8c861f verified
raw
history blame
30.1 kB
# Copyright 2025 The Kandinsky Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Tuple, Union
import torch
import torch.nn as nn
import torch.nn.functional as F
from ...configuration_utils import ConfigMixin, register_to_config
from ...utils.accelerate_utils import apply_forward_hook
from ..activations import get_activation
from ..modeling_outputs import AutoencoderKLOutput
from ..modeling_utils import ModelMixin
from .vae import AutoencoderMixin, DecoderOutput, DiagonalGaussianDistribution
class KVAEResnetBlock2D(nn.Module):
r"""
A Resnet block with optional guidance.
Parameters:
in_channels (`int`): The number of channels in the input.
out_channels (`int`, *optional*, default to `None`):
The number of output channels for the first conv2d layer. If None, same as `in_channels`.
conv_shortcut (`bool`, *optional*, default to `False`):
If `True` and `in_channels` not equal to `out_channels`, add a 3x3 nn.conv2d layer for skip-connection.
temb_channels (`int`, *optional*, default to `512`): The number of channels in timestep embedding.
zq_ch (`int`, *optional*, default to `None`): Guidance channels for normalization.
add_conv (`bool`, *optional*, default to `False`):
If `True` add conv2d layer for normalization.
normalization (`nn.Module`, *optional*, default to `None`): The normalization layer.
act_fn (`str`, *optional*, default to `"swish"`): The activation function to use.
"""
def __init__(
self,
*,
in_channels: int,
out_channels: Optional[int] = None,
conv_shortcut: bool = False,
temb_channels: int = 512,
zq_ch: Optional[int] = None,
add_conv: bool = False,
act_fn: str = "swish",
):
super().__init__()
self.in_channels = in_channels
out_channels = in_channels if out_channels is None else out_channels
self.out_channels = out_channels
self.use_conv_shortcut = conv_shortcut
self.nonlinearity = get_activation(act_fn)
if zq_ch is None:
self.norm1 = nn.GroupNorm(num_channels=in_channels, num_groups=32, eps=1e-6, affine=True)
else:
self.norm1 = KVAEDecoderSpatialNorm2D(in_channels, zq_channels=zq_ch, add_conv=add_conv)
self.conv1 = nn.Conv2d(
in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=(1, 1), padding_mode="replicate"
)
if temb_channels > 0:
self.temb_proj = torch.nn.Linear(temb_channels, out_channels)
if zq_ch is None:
self.norm2 = nn.GroupNorm(num_channels=out_channels, num_groups=32, eps=1e-6, affine=True)
else:
self.norm2 = KVAEDecoderSpatialNorm2D(out_channels, zq_channels=zq_ch, add_conv=add_conv)
self.conv2 = nn.Conv2d(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
padding=(1, 1),
padding_mode="replicate",
)
if self.in_channels != self.out_channels:
if self.use_conv_shortcut:
self.conv_shortcut = nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
padding=(1, 1),
padding_mode="replicate",
)
else:
self.nin_shortcut = nn.Conv2d(
in_channels,
out_channels,
kernel_size=1,
stride=1,
padding=0,
)
def forward(self, x: torch.Tensor, temb: torch.Tensor, zq: torch.Tensor = None) -> torch.Tensor:
h = x
if zq is None:
h = self.norm1(h)
else:
h = self.norm1(h, zq)
h = self.nonlinearity(h)
h = self.conv1(h)
if temb is not None:
h = h + self.temb_proj(self.nonlinearity(temb))[:, :, None, None, None]
if zq is None:
h = self.norm2(h)
else:
h = self.norm2(h, zq)
h = self.nonlinearity(h)
h = self.conv2(h)
if self.in_channels != self.out_channels:
if self.use_conv_shortcut:
x = self.conv_shortcut(x)
else:
x = self.nin_shortcut(x)
return x + h
class KVAEPXSDownsample(nn.Module):
def __init__(self, in_channels: int, factor: int = 2):
r"""
A Downsampling module.
Args:
in_channels (`int`): The number of channels in the input.
factor (`int`, *optional*, default to `2`): The downsampling factor.
"""
super().__init__()
self.factor = factor
self.unshuffle = nn.PixelUnshuffle(self.factor)
self.spatial_conv = nn.Conv2d(
in_channels, in_channels, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), padding_mode="reflect"
)
self.linear = nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: (bchw)
pxs_interm = self.unshuffle(x)
b, c, h, w = pxs_interm.shape
pxs_interm_view = pxs_interm.view(b, c // self.factor**2, self.factor**2, h, w)
pxs_out = torch.mean(pxs_interm_view, dim=2)
conv_out = self.spatial_conv(x)
# adding it all together
out = conv_out + pxs_out
return self.linear(out)
class KVAEPXSUpsample(nn.Module):
def __init__(self, in_channels: int, factor: int = 2):
r"""
An Upsampling module.
Args:
in_channels (`int`): The number of channels in the input.
factor (`int`, *optional*, default to `2`): The upsampling factor.
"""
super().__init__()
self.factor = factor
self.shuffle = nn.PixelShuffle(self.factor)
self.spatial_conv = nn.Conv2d(
in_channels, in_channels, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), padding_mode="reflect"
)
self.linear = nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1)
def forward(self, x: torch.Tensor) -> torch.Tensor:
repeated = x.repeat_interleave(self.factor**2, dim=1)
pxs_interm = self.shuffle(repeated)
image_like_ups = F.interpolate(x, scale_factor=2, mode="nearest")
conv_out = self.spatial_conv(image_like_ups)
# adding it all together
out = conv_out + pxs_interm
return self.linear(out)
class KVAEDecoderSpatialNorm2D(nn.Module):
r"""
A 2D normalization module for decoder.
Args:
in_channels (`int`): The number of channels in the input.
zq_channels (`int`): The number of channels in the guidance.
add_conv (`bool`, *optional*, default to `false`):
If `True` add conv2d 3x3 layer for guidance in the beginning.
"""
def __init__(
self,
in_channels: int,
zq_channels: int,
add_conv: bool = False,
):
super().__init__()
self.norm_layer = nn.GroupNorm(num_channels=in_channels, num_groups=32, eps=1e-6, affine=True)
self.add_conv = add_conv
if add_conv:
self.conv = nn.Conv2d(
in_channels=zq_channels,
out_channels=zq_channels,
kernel_size=3,
padding=(1, 1),
padding_mode="replicate",
)
self.conv_y = nn.Conv2d(
in_channels=zq_channels,
out_channels=in_channels,
kernel_size=1,
)
self.conv_b = nn.Conv2d(
in_channels=zq_channels,
out_channels=in_channels,
kernel_size=1,
)
def forward(self, f: torch.Tensor, zq: torch.Tensor) -> torch.Tensor:
f_first = f
f_first_size = f_first.shape[2:]
zq = F.interpolate(zq, size=f_first_size, mode="nearest")
if self.add_conv:
zq = self.conv(zq)
norm_f = self.norm_layer(f)
new_f = norm_f * self.conv_y(zq) + self.conv_b(zq)
return new_f
class KVAEEncoder2D(nn.Module):
r"""
A 2D encoder module.
Args:
ch (`int`): The base number of channels in multiresolution blocks.
ch_mult (`Tuple[int, ...]`, *optional*, default to `(1, 2, 4, 8)`):
The channel multipliers in multiresolution blocks.
num_res_blocks (`int`): The number of Resnet blocks.
in_channels (`int`): The number of channels in the input.
z_channels (`int`): The number of output channels.
double_z (`bool`, *optional*, defaults to `True`):
Whether to double the number of output channels for the last block.
act_fn (`str`, *optional*, default to `"swish"`): The activation function to use.
"""
def __init__(
self,
*,
ch: int,
ch_mult: Tuple[int, ...] = (1, 2, 4, 8),
num_res_blocks: int,
in_channels: int,
z_channels: int,
double_z: bool = True,
act_fn: str = "swish",
):
super().__init__()
self.ch = ch
self.temb_ch = 0
self.num_resolutions = len(ch_mult)
if isinstance(num_res_blocks, int):
self.num_res_blocks = [num_res_blocks] * self.num_resolutions
else:
self.num_res_blocks = num_res_blocks
self.nonlinearity = get_activation(act_fn)
self.in_channels = in_channels
self.conv_in = nn.Conv2d(
in_channels=in_channels,
out_channels=self.ch,
kernel_size=3,
padding=(1, 1),
)
in_ch_mult = (1,) + tuple(ch_mult)
self.down = nn.ModuleList()
for i_level in range(self.num_resolutions):
block = nn.ModuleList()
attn = nn.ModuleList()
block_in = ch * in_ch_mult[i_level]
block_out = ch * ch_mult[i_level]
for i_block in range(self.num_res_blocks[i_level]):
block.append(
KVAEResnetBlock2D(
in_channels=block_in,
out_channels=block_out,
temb_channels=self.temb_ch,
)
)
block_in = block_out
down = nn.Module()
down.block = block
down.attn = attn
if i_level < self.num_resolutions - 1:
down.downsample = KVAEPXSDownsample(in_channels=block_in) # mb: bad out channels
self.down.append(down)
# middle
self.mid = nn.Module()
self.mid.block_1 = KVAEResnetBlock2D(
in_channels=block_in,
out_channels=block_in,
temb_channels=self.temb_ch,
)
self.mid.block_2 = KVAEResnetBlock2D(
in_channels=block_in,
out_channels=block_in,
temb_channels=self.temb_ch,
)
# end
self.norm_out = nn.GroupNorm(num_channels=block_in, num_groups=32, eps=1e-6, affine=True)
self.conv_out = nn.Conv2d(
in_channels=block_in,
out_channels=2 * z_channels if double_z else z_channels,
kernel_size=3,
padding=(1, 1),
)
self.gradient_checkpointing = False
def forward(self, x: torch.Tensor) -> torch.Tensor:
# timestep embedding
temb = None
# downsampling
h = self.conv_in(x)
for i_level in range(self.num_resolutions):
for i_block in range(self.num_res_blocks[i_level]):
if torch.is_grad_enabled() and self.gradient_checkpointing:
h = self._gradient_checkpointing_func(self.down[i_level].block[i_block], h, temb)
else:
h = self.down[i_level].block[i_block](h, temb)
if len(self.down[i_level].attn) > 0:
h = self.down[i_level].attn[i_block](h)
if i_level != self.num_resolutions - 1:
h = self.down[i_level].downsample(h)
# middle
if torch.is_grad_enabled() and self.gradient_checkpointing:
h = self._gradient_checkpointing_func(self.mid.block_1, h, temb)
h = self._gradient_checkpointing_func(self.mid.block_2, h, temb)
else:
h = self.mid.block_1(h, temb)
h = self.mid.block_2(h, temb)
# end
h = self.norm_out(h)
h = self.nonlinearity(h)
h = self.conv_out(h)
return h
class KVAEDecoder2D(nn.Module):
r"""
A 2D decoder module.
Args:
ch (`int`): The base number of channels in multiresolution blocks.
out_ch (`int`): The number of output channels.
ch_mult (`Tuple[int, ...]`, *optional*, default to `(1, 2, 4, 8)`):
The channel multipliers in multiresolution blocks.
num_res_blocks (`int`): The number of Resnet blocks.
in_channels (`int`): The number of channels in the input.
z_channels (`int`): The number of input channels.
give_pre_end (`bool`, *optional*, default to `false`):
If `True` exit the forward pass early and return the penultimate feature map.
zq_ch (`bool`, *optional*, default to `None`): The number of channels in the guidance.
add_conv (`bool`, *optional*, default to `false`): If `True` add conv2d layer for Resnet normalization layer.
act_fn (`str`, *optional*, default to `"swish"`): The activation function to use.
"""
def __init__(
self,
*,
ch: int,
out_ch: int,
ch_mult: Tuple[int, ...] = (1, 2, 4, 8),
num_res_blocks: int,
in_channels: int,
z_channels: int,
give_pre_end: bool = False,
zq_ch: Optional[int] = None,
add_conv: bool = False,
act_fn: str = "swish",
):
super().__init__()
self.ch = ch
self.temb_ch = 0
self.num_resolutions = len(ch_mult)
self.num_res_blocks = num_res_blocks
self.in_channels = in_channels
self.give_pre_end = give_pre_end
self.nonlinearity = get_activation(act_fn)
if zq_ch is None:
zq_ch = z_channels
# compute in_ch_mult, block_in and curr_res at lowest res
block_in = ch * ch_mult[self.num_resolutions - 1]
self.conv_in = nn.Conv2d(
in_channels=z_channels, out_channels=block_in, kernel_size=3, padding=(1, 1), padding_mode="replicate"
)
# middle
self.mid = nn.Module()
self.mid.block_1 = KVAEResnetBlock2D(
in_channels=block_in,
out_channels=block_in,
temb_channels=self.temb_ch,
zq_ch=zq_ch,
add_conv=add_conv,
)
self.mid.block_2 = KVAEResnetBlock2D(
in_channels=block_in,
out_channels=block_in,
temb_channels=self.temb_ch,
zq_ch=zq_ch,
add_conv=add_conv,
)
# upsampling
self.up = nn.ModuleList()
for i_level in reversed(range(self.num_resolutions)):
block = nn.ModuleList()
attn = nn.ModuleList()
block_out = ch * ch_mult[i_level]
for i_block in range(self.num_res_blocks + 1):
block.append(
KVAEResnetBlock2D(
in_channels=block_in,
out_channels=block_out,
temb_channels=self.temb_ch,
zq_ch=zq_ch,
add_conv=add_conv,
)
)
block_in = block_out
up = nn.Module()
up.block = block
up.attn = attn
if i_level != 0:
up.upsample = KVAEPXSUpsample(in_channels=block_in)
self.up.insert(0, up)
self.norm_out = KVAEDecoderSpatialNorm2D(block_in, zq_ch, add_conv=add_conv) # , gather=gather_norm)
self.conv_out = nn.Conv2d(
in_channels=block_in, out_channels=out_ch, kernel_size=3, padding=(1, 1), padding_mode="replicate"
)
self.gradient_checkpointing = False
def forward(self, z: torch.Tensor) -> torch.Tensor:
self.last_z_shape = z.shape
# timestep embedding
temb = None
# z to block_in
zq = z
h = self.conv_in(z)
# middle
if torch.is_grad_enabled() and self.gradient_checkpointing:
h = self._gradient_checkpointing_func(self.mid.block_1, h, temb, zq)
h = self._gradient_checkpointing_func(self.mid.block_2, h, temb, zq)
else:
h = self.mid.block_1(h, temb, zq)
h = self.mid.block_2(h, temb, zq)
# upsampling
for i_level in reversed(range(self.num_resolutions)):
for i_block in range(self.num_res_blocks + 1):
if torch.is_grad_enabled() and self.gradient_checkpointing:
h = self._gradient_checkpointing_func(self.up[i_level].block[i_block], h, temb, zq)
else:
h = self.up[i_level].block[i_block](h, temb, zq)
if len(self.up[i_level].attn) > 0:
h = self.up[i_level].attn[i_block](h, zq)
if i_level != 0:
h = self.up[i_level].upsample(h)
# end
if self.give_pre_end:
return h
h = self.norm_out(h, zq)
h = self.nonlinearity(h)
h = self.conv_out(h)
return h
class AutoencoderKLKVAE(ModelMixin, AutoencoderMixin, ConfigMixin):
r"""
A VAE model with KL loss for encoding images into latents and decoding latent representations into images.
This model inherits from [`ModelMixin`]. Check the superclass documentation for its generic methods implemented for
all models (such as downloading or saving).
Parameters:
in_channels (int, *optional*, defaults to 3): Number of channels in the input image.
channels (int, *optional*, defaults to 128): The base number of channels in multiresolution blocks.
num_enc_blocks (int, *optional*, defaults to 2):
The number of Resnet blocks in encoder multiresolution layers.
num_dec_blocks (int, *optional*, defaults to 2):
The number of Resnet blocks in decoder multiresolution layers.
z_channels (int, *optional*, defaults to 16): Number of channels in the latent space.
double_z (`bool`, *optional*, defaults to `True`):
Whether to double the number of output channels of encoder.
ch_mult (`Tuple[int, ...]`, *optional*, default to `(1, 2, 4, 8)`):
The channel multipliers in multiresolution blocks.
sample_size (`int`, *optional*, defaults to `1024`): Sample input size.
"""
_supports_gradient_checkpointing = True
@register_to_config
def __init__(
self,
in_channels: int = 3,
channels: int = 128,
num_enc_blocks: int = 2,
num_dec_blocks: int = 2,
z_channels: int = 16,
double_z: bool = True,
ch_mult: Tuple[int, ...] = (1, 2, 4, 8),
sample_size: int = 1024,
):
super().__init__()
# pass init params to Encoder
self.encoder = KVAEEncoder2D(
in_channels=in_channels,
ch=channels,
ch_mult=ch_mult,
num_res_blocks=num_enc_blocks,
z_channels=z_channels,
double_z=double_z,
)
# pass init params to Decoder
self.decoder = KVAEDecoder2D(
out_ch=in_channels,
ch=channels,
ch_mult=ch_mult,
num_res_blocks=num_dec_blocks,
in_channels=None,
z_channels=z_channels,
)
self.use_slicing = False
self.use_tiling = False
# only relevant if vae tiling is enabled
self.tile_sample_min_size = self.config.sample_size
sample_size = (
self.config.sample_size[0]
if isinstance(self.config.sample_size, (list, tuple))
else self.config.sample_size
)
self.tile_latent_min_size = int(sample_size / (2 ** (len(self.config.ch_mult) - 1)))
self.tile_overlap_factor = 0.25
def _encode(self, x: torch.Tensor) -> torch.Tensor:
batch_size, num_channels, height, width = x.shape
if self.use_tiling and (width > self.tile_sample_min_size or height > self.tile_sample_min_size):
return self._tiled_encode(x)
enc = self.encoder(x)
return enc
@apply_forward_hook
def encode(
self, x: torch.Tensor, return_dict: bool = True
) -> Union[AutoencoderKLOutput, Tuple[DiagonalGaussianDistribution]]:
"""
Encode a batch of images into latents.
Args:
x (`torch.Tensor`): Input batch of images.
return_dict (`bool`, *optional*, defaults to `True`):
Whether to return a [`~models.autoencoder_kl.AutoencoderKLOutput`] instead of a plain tuple.
Returns:
The latent representations of the encoded images. If `return_dict` is True, a
[`~models.autoencoder_kl.AutoencoderKLOutput`] is returned, otherwise a plain `tuple` is returned.
"""
if self.use_slicing and x.shape[0] > 1:
encoded_slices = [self._encode(x_slice) for x_slice in x.split(1)]
h = torch.cat(encoded_slices)
else:
h = self._encode(x)
posterior = DiagonalGaussianDistribution(h)
if not return_dict:
return (posterior,)
return AutoencoderKLOutput(latent_dist=posterior)
def _decode(self, z: torch.Tensor, return_dict: bool = True) -> Union[DecoderOutput, torch.Tensor]:
if self.use_tiling and (z.shape[-1] > self.tile_latent_min_size or z.shape[-2] > self.tile_latent_min_size):
return self.tiled_decode(z, return_dict=return_dict)
dec = self.decoder(z)
if not return_dict:
return (dec,)
return DecoderOutput(sample=dec)
@apply_forward_hook
def decode(
self, z: torch.FloatTensor, return_dict: bool = True, generator=None
) -> Union[DecoderOutput, torch.FloatTensor]:
"""
Decode a batch of images.
Args:
z (`torch.Tensor`): Input batch of latent vectors.
return_dict (`bool`, *optional*, defaults to `True`):
Whether to return a [`~models.vae.DecoderOutput`] instead of a plain tuple.
Returns:
[`~models.vae.DecoderOutput`] or `tuple`:
If return_dict is True, a [`~models.vae.DecoderOutput`] is returned, otherwise a plain `tuple` is
returned.
"""
if self.use_slicing and z.shape[0] > 1:
decoded_slices = [self._decode(z_slice).sample for z_slice in z.split(1)]
decoded = torch.cat(decoded_slices)
else:
decoded = self._decode(z).sample
if not return_dict:
return (decoded,)
return DecoderOutput(sample=decoded)
def blend_v(self, a: torch.Tensor, b: torch.Tensor, blend_extent: int) -> torch.Tensor:
blend_extent = min(a.shape[2], b.shape[2], blend_extent)
for y in range(blend_extent):
b[:, :, y, :] = a[:, :, -blend_extent + y, :] * (1 - y / blend_extent) + b[:, :, y, :] * (y / blend_extent)
return b
def blend_h(self, a: torch.Tensor, b: torch.Tensor, blend_extent: int) -> torch.Tensor:
blend_extent = min(a.shape[3], b.shape[3], blend_extent)
for x in range(blend_extent):
b[:, :, :, x] = a[:, :, :, -blend_extent + x] * (1 - x / blend_extent) + b[:, :, :, x] * (x / blend_extent)
return b
def _tiled_encode(self, x: torch.Tensor) -> torch.Tensor:
r"""Encode a batch of images using a tiled encoder.
When this option is enabled, the VAE will split the input tensor into tiles to compute encoding in several
steps. This is useful to keep memory use constant regardless of image size. The end result of tiled encoding is
different from non-tiled encoding because each tile uses a different encoder. To avoid tiling artifacts, the
tiles overlap and are blended together to form a smooth output. You may still see tile-sized changes in the
output, but they should be much less noticeable.
Args:
x (`torch.Tensor`): Input batch of images.
Returns:
`torch.Tensor`:
The latent representation of the encoded videos.
"""
overlap_size = int(self.tile_sample_min_size * (1 - self.tile_overlap_factor))
blend_extent = int(self.tile_latent_min_size * self.tile_overlap_factor)
row_limit = self.tile_latent_min_size - blend_extent
# Split the image into 512x512 tiles and encode them separately.
rows = []
for i in range(0, x.shape[2], overlap_size):
row = []
for j in range(0, x.shape[3], overlap_size):
tile = x[:, :, i : i + self.tile_sample_min_size, j : j + self.tile_sample_min_size]
tile = self.encoder(tile)
row.append(tile)
rows.append(row)
result_rows = []
for i, row in enumerate(rows):
result_row = []
for j, tile in enumerate(row):
# blend the above tile and the left tile
# to the current tile and add the current tile to the result row
if i > 0:
tile = self.blend_v(rows[i - 1][j], tile, blend_extent)
if j > 0:
tile = self.blend_h(row[j - 1], tile, blend_extent)
result_row.append(tile[:, :, :row_limit, :row_limit])
result_rows.append(torch.cat(result_row, dim=3))
enc = torch.cat(result_rows, dim=2)
return enc
def tiled_decode(self, z: torch.Tensor, return_dict: bool = True) -> Union[DecoderOutput, torch.Tensor]:
r"""
Decode a batch of images using a tiled decoder.
Args:
z (`torch.Tensor`): Input batch of latent vectors.
return_dict (`bool`, *optional*, defaults to `True`):
Whether or not to return a [`~models.vae.DecoderOutput`] instead of a plain tuple.
Returns:
[`~models.vae.DecoderOutput`] or `tuple`:
If return_dict is True, a [`~models.vae.DecoderOutput`] is returned, otherwise a plain `tuple` is
returned.
"""
overlap_size = int(self.tile_latent_min_size * (1 - self.tile_overlap_factor))
blend_extent = int(self.tile_sample_min_size * self.tile_overlap_factor)
row_limit = self.tile_sample_min_size - blend_extent
# Split z into overlapping 64x64 tiles and decode them separately.
# The tiles have an overlap to avoid seams between tiles.
rows = []
for i in range(0, z.shape[2], overlap_size):
row = []
for j in range(0, z.shape[3], overlap_size):
tile = z[:, :, i : i + self.tile_latent_min_size, j : j + self.tile_latent_min_size]
decoded = self.decoder(tile)
row.append(decoded)
rows.append(row)
result_rows = []
for i, row in enumerate(rows):
result_row = []
for j, tile in enumerate(row):
# blend the above tile and the left tile
# to the current tile and add the current tile to the result row
if i > 0:
tile = self.blend_v(rows[i - 1][j], tile, blend_extent)
if j > 0:
tile = self.blend_h(row[j - 1], tile, blend_extent)
result_row.append(tile[:, :, :row_limit, :row_limit])
result_rows.append(torch.cat(result_row, dim=3))
dec = torch.cat(result_rows, dim=2)
if not return_dict:
return (dec,)
return DecoderOutput(sample=dec)
def forward(
self,
sample: torch.Tensor,
sample_posterior: bool = False,
return_dict: bool = True,
generator: Optional[torch.Generator] = None,
) -> Union[DecoderOutput, torch.Tensor]:
r"""
Args:
sample (`torch.Tensor`): Input sample.
sample_posterior (`bool`, *optional*, defaults to `False`):
Whether to sample from the posterior.
return_dict (`bool`, *optional*, defaults to `True`):
Whether or not to return a [`DecoderOutput`] instead of a plain tuple.
generator (`torch.Generator`, *optional*):
A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make sampling
deterministic.
Returns:
[`~models.vae.DecoderOutput`] or `tuple`:
If `return_dict` is True, a [`~models.vae.DecoderOutput`] is returned, otherwise a plain `tuple` is
returned.
"""
x = sample
posterior = self.encode(x).latent_dist
if sample_posterior:
z = posterior.sample(generator=generator)
else:
z = posterior.mode()
dec = self.decode(z).sample
if not return_dict:
return (dec,)
return DecoderOutput(sample=dec)