|
|
|
|
|
import math |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import numpy as np |
|
|
from einops import rearrange |
|
|
import torch.nn.functional as F |
|
|
|
|
|
|
|
|
def get_timestep_embedding(timesteps, embedding_dim):
    """
    Build sinusoidal timestep embeddings.

    This matches the implementation in Denoising Diffusion Probabilistic
    Models (taken from Fairseq). It matches the implementation in
    tensor2tensor, but differs slightly from the description in Section 3.5
    of "Attention Is All You Need".

    Args:
        timesteps: 1-D tensor of timesteps; it is cast to float internally.
        embedding_dim: width of the returned embedding.

    Returns:
        Float tensor of shape ``(len(timesteps), embedding_dim)``. The first
        ``embedding_dim // 2`` columns are sines, the next
        ``embedding_dim // 2`` are cosines, and an odd ``embedding_dim`` gets
        one trailing zero column.

    Raises:
        AssertionError: if ``timesteps`` is not 1-D.
    """
    assert len(timesteps.shape) == 1

    half_dim = embedding_dim // 2
    # With a single frequency (embedding_dim of 2 or 3) the original divisor
    # ``half_dim - 1`` is zero. The only frequency index is 0, so the scale is
    # irrelevant there; clamp the divisor to avoid ZeroDivisionError.
    log_step = math.log(10000) / max(half_dim - 1, 1)
    freqs = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -log_step)
    freqs = freqs.to(device=timesteps.device)

    # Outer product: one row per timestep, one column per frequency.
    angles = timesteps.float()[:, None] * freqs[None, :]
    emb = torch.cat([torch.sin(angles), torch.cos(angles)], dim=1)
    if embedding_dim % 2 == 1:
        # Zero-pad one column on the right so the width equals embedding_dim.
        emb = torch.nn.functional.pad(emb, (0, 1, 0, 0))
    return emb
|
|
|
|
|
|
|
|
def nonlinearity(x):
    """Swish activation: multiply ``x`` element-wise by ``sigmoid(x)``."""
    gate = torch.sigmoid(x)
    return x * gate
|
|
|
|
|
|
|
|
def Normalize(in_channels):
    """Return an affine 32-group ``GroupNorm`` (eps=1e-6) over ``in_channels`` channels."""
    return nn.GroupNorm(32, in_channels, eps=1e-6, affine=True)
|
|
|
|
|
|
|
|
class Upsample(nn.Module):
    """Nearest-neighbour 2x upsampling, optionally followed by a 3x3 convolution."""

    def __init__(self, in_channels, with_conv):
        super().__init__()
        self.with_conv = with_conv
        if with_conv:
            # Channel-preserving, size-preserving convolution applied after upsampling.
            self.conv = nn.Conv2d(in_channels, in_channels,
                                  kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        upsampled = F.interpolate(x, scale_factor=2.0, mode="nearest")
        if not self.with_conv:
            return upsampled
        return self.conv(upsampled)
|
|
|
|
|
|
|
|
class Downsample(nn.Module):
    """Halve the spatial size with a strided 3x3 conv or 2x2 average pooling."""

    def __init__(self, in_channels, with_conv):
        super().__init__()
        self.with_conv = with_conv
        if with_conv:
            # The conv itself is unpadded; forward() pads one zero row/column
            # on the bottom/right only before calling it.
            self.conv = nn.Conv2d(in_channels, in_channels,
                                  kernel_size=3, stride=2, padding=0)

    def forward(self, x):
        if not self.with_conv:
            return F.avg_pool2d(x, kernel_size=2, stride=2)
        # (left, right, top, bottom) = (0, 1, 0, 1): asymmetric zero padding.
        padded = F.pad(x, (0, 1, 0, 1), mode="constant", value=0)
        return self.conv(padded)
|
|
|
|
|
|
|
|
class ResnetBlock(nn.Module):
    """Pre-activation residual block: (normalize -> swish -> 3x3 conv) twice.

    An optional timestep embedding is projected and added between the two
    convolutions. When ``in_channels != out_channels`` the skip branch is a
    bias-free convolution (3x3 if ``conv_shortcut`` else 1x1) that is fed the
    block's own output ``h`` -- not the input ``x`` -- and summed with ``h``.
    """

    def __init__(self, *, in_channels, out_channels=None, conv_shortcut=False,
                 dropout, temb_channels=512):
        super().__init__()
        if out_channels is None:
            out_channels = in_channels
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.use_conv_shortcut = conv_shortcut

        # Submodules are created in a fixed order so that state_dict layout and
        # random initialisation stay the same.
        self.norm1 = Normalize(in_channels)
        self.conv1 = nn.Conv2d(in_channels, out_channels,
                               kernel_size=3, stride=1, padding=1, bias=False)
        if temb_channels > 0:
            # Only present when a timestep embedding width is configured.
            self.temb_proj = nn.Linear(temb_channels, out_channels)
        self.norm2 = Normalize(out_channels)
        self.dropout = nn.Dropout(dropout)
        self.conv2 = nn.Conv2d(out_channels, out_channels,
                               kernel_size=3, stride=1, padding=1, bias=False)

        if in_channels != out_channels:
            # Both shortcut variants map out_channels -> out_channels because
            # forward() applies them to h.
            if conv_shortcut:
                self.conv_shortcut = nn.Conv2d(out_channels, out_channels,
                                               kernel_size=3, stride=1,
                                               padding=1, bias=False)
            else:
                self.nin_shortcut = nn.Conv2d(out_channels, out_channels,
                                              kernel_size=1, stride=1,
                                              padding=0, bias=False)

    def forward(self, x, temb):
        h = self.conv1(nonlinearity(self.norm1(x)))

        if temb is not None:
            # Broadcast the projected embedding over the two spatial axes.
            h = h + self.temb_proj(nonlinearity(temb))[:, :, None, None]

        h = self.conv2(self.dropout(nonlinearity(self.norm2(h))))

        if self.in_channels == self.out_channels:
            return x + h

        shortcut = self.conv_shortcut if self.use_conv_shortcut else self.nin_shortcut
        return shortcut(h) + h
|
|
|
|
|
|
|
|
class Encoder(nn.Module):
    """Convolutional encoder: input conv, residual stages with downsampling,
    two middle residual blocks, then normalize -> swish -> 1x1 output conv.

    Args (keyword-only):
        ch: base channel count.
        out_ch: accepted for signature compatibility; not used here.
        ch_mult: per-resolution channel multipliers; its length sets the
            number of resolution levels.
        num_res_blocks: residual blocks per resolution level.
        attn_resolutions: accepted for signature compatibility; not used here.
        dropout: dropout probability passed to every ResnetBlock.
        resamp_with_conv: passed to Downsample (strided conv vs. avg pool).
        in_channels: channels of the input tensor.
        resolution: stored on the instance; not otherwise used here.
        z_channels: latent channels produced by ``conv_out``.
        double_z: if True, ``conv_out`` emits ``2 * z_channels`` channels.
        **ignore_kwargs: extra keyword arguments are ignored.
    """

    def __init__(self, *, ch, out_ch, ch_mult=(1,2,4,8), num_res_blocks,
                 attn_resolutions, dropout=0.0, resamp_with_conv=False, in_channels,
                 resolution, z_channels, double_z=True, **ignore_kwargs):
        super().__init__()
        self.ch = ch
        self.temb_ch = 0  # no timestep embedding is used in this encoder
        self.num_resolutions = len(ch_mult)
        self.num_res_blocks = num_res_blocks
        self.resolution = resolution
        self.in_channels = in_channels

        # Input projection.
        self.conv_in = torch.nn.Conv2d(in_channels,
                                       self.ch,
                                       kernel_size=3,
                                       stride=1,
                                       padding=1,
                                       bias=False)

        # Downsampling path: level i maps ch*in_ch_mult[i] -> ch*ch_mult[i].
        in_ch_mult = (1,) + tuple(ch_mult)
        self.down = nn.ModuleList()
        for i_level in range(self.num_resolutions):
            block = nn.ModuleList()
            block_in = ch * in_ch_mult[i_level]
            block_out = ch * ch_mult[i_level]
            for _ in range(self.num_res_blocks):
                block.append(ResnetBlock(in_channels=block_in,
                                         out_channels=block_out,
                                         temb_channels=self.temb_ch,
                                         dropout=dropout))
                block_in = block_out
            down = nn.Module()
            down.block = block
            # Every level except the last one halves the resolution.
            if i_level != self.num_resolutions - 1:
                down.downsample = Downsample(block_in, resamp_with_conv)
            self.down.append(down)

        # Middle blocks at the lowest resolution.
        self.mid = nn.Module()
        self.mid.block_1 = ResnetBlock(in_channels=block_in,
                                       out_channels=block_in,
                                       temb_channels=self.temb_ch,
                                       dropout=dropout)
        self.mid.block_2 = ResnetBlock(in_channels=block_in,
                                       out_channels=block_in,
                                       temb_channels=self.temb_ch,
                                       dropout=dropout)

        # Output head.
        self.norm_out = Normalize(block_in)
        self.conv_out = torch.nn.Conv2d(block_in,
                                        2 * z_channels if double_z else z_channels,
                                        kernel_size=1,
                                        stride=1,
                                        padding=0)

    def forward(self, x):
        """Encode ``x`` and return the output of ``conv_out``."""
        temb = None  # ResnetBlock skips the embedding branch when temb is None

        # Only the most recent activation is ever consumed, so a single running
        # tensor is kept instead of a list holding every intermediate result.
        h = self.conv_in(x)
        for i_level in range(self.num_resolutions):
            stage = self.down[i_level]
            for i_block in range(self.num_res_blocks):
                h = stage.block[i_block](h, temb)
            if i_level != self.num_resolutions - 1:
                h = stage.downsample(h)

        # Middle.
        h = self.mid.block_1(h, temb)
        h = self.mid.block_2(h, temb)

        # Output head.
        h = self.norm_out(h)
        h = nonlinearity(h)
        h = self.conv_out(h)
        return h
|
|
|
|
|
|
|
|
class Decoder(nn.Module):
    """Convolutional decoder: input conv, two middle residual blocks, residual
    stages with upsampling, then normalize -> swish -> 3x3 output conv.

    ``attn_resolutions`` and any extra keyword arguments are accepted but not
    used. With ``give_pre_end=True`` the forward pass returns the features
    before the output head.
    """

    def __init__(self, *, ch, out_ch, ch_mult=(1,2,4,8), num_res_blocks,
                 attn_resolutions, dropout=0.0, resamp_with_conv=True, in_channels,
                 resolution, z_channels, give_pre_end=False, **ignorekwargs):
        super().__init__()
        self.ch = ch
        self.temb_ch = 0
        self.num_resolutions = len(ch_mult)
        self.num_res_blocks = num_res_blocks
        self.resolution = resolution
        self.in_channels = in_channels
        self.give_pre_end = give_pre_end

        def make_res_block(c_in, c_out):
            # Every residual block in the decoder shares these settings.
            return ResnetBlock(in_channels=c_in,
                               out_channels=c_out,
                               temb_channels=self.temb_ch,
                               dropout=dropout)

        # Channel count and spatial size at the lowest resolution.
        top_level = self.num_resolutions - 1
        block_in = ch * ch_mult[top_level]
        latent_res = resolution // 2 ** top_level
        self.z_shape = (1, z_channels, latent_res, latent_res)
        print("Working with z of shape {} = {} dimensions.".format(
            self.z_shape, np.prod(self.z_shape)))

        # Latent -> feature projection.
        self.conv_in = nn.Conv2d(z_channels, block_in,
                                 kernel_size=3, stride=1, padding=1)

        # Middle blocks.
        self.mid = nn.Module()
        self.mid.block_1 = make_res_block(block_in, block_in)
        self.mid.block_2 = make_res_block(block_in, block_in)

        # Upsampling path, built from the lowest resolution upwards; inserting
        # at index 0 leaves self.up indexed by resolution level.
        self.up = nn.ModuleList()
        for i_level in range(top_level, -1, -1):
            block_out = ch * ch_mult[i_level]
            blocks = nn.ModuleList()
            for _ in range(self.num_res_blocks):
                blocks.append(make_res_block(block_in, block_out))
                block_in = block_out
            stage = nn.Module()
            stage.block = blocks
            if i_level != 0:
                stage.upsample = Upsample(block_in, resamp_with_conv)
            self.up.insert(0, stage)

        # Output head.
        self.norm_out = Normalize(block_in)
        self.conv_out = nn.Conv2d(block_in, out_ch,
                                  kernel_size=3, stride=1, padding=1)

    def forward(self, z):
        # Remember the shape of the most recent latent.
        self.last_z_shape = z.shape

        temb = None  # no timestep embedding

        h = self.conv_in(z)
        h = self.mid.block_2(self.mid.block_1(h, temb), temb)

        for i_level in range(self.num_resolutions - 1, -1, -1):
            stage = self.up[i_level]
            for i_block in range(self.num_res_blocks):
                h = stage.block[i_block](h, temb)
            if i_level != 0:
                h = stage.upsample(h)

        if self.give_pre_end:
            return h

        return self.conv_out(nonlinearity(self.norm_out(h)))
|
|
|
|
|
def init_weights_zero(m):
    """Fill the weight (and bias, if any) of an ``nn.Conv2d`` with zeros.

    Modules of any other type are left untouched, which makes this usable
    with ``nn.Module.apply``.
    """
    if not isinstance(m, nn.Conv2d):
        return
    for param in (m.weight, m.bias):
        if param is not None:
            nn.init.constant_(param, 0)
|
|
|
|
|
def init_weights_kaiming(m):
    """Kaiming-normal initialise an ``nn.Conv2d`` weight and zero its bias.

    Modules of any other type are left untouched.
    """
    if not isinstance(m, nn.Conv2d):
        return
    nn.init.kaiming_normal_(m.weight)
    bias = m.bias
    if bias is not None:
        nn.init.constant_(bias, 0)
|
|
|
|
|
|
|
|
|
|
|
class PromptGenBlock(nn.Module):
    """Learnable spatial prompt, resized to the input's spatial size.

    The prompt is a parameter of shape
    ``(1, prompt_len, prompt_dim, prompt_size, prompt_size)`` initialised with
    uniform random values. ``forward`` only reads the *shape* of its input:
    the returned prompt does not depend on the input values.

    Args:
        prompt_dim: channel count of the prompt (and of the 3x3 conv).
        prompt_size: spatial side length of the stored prompt.
        prompt_len: number of prompt components that are summed together.
    """

    def __init__(self, prompt_dim=128, prompt_size=96, prompt_len=1):
        super(PromptGenBlock, self).__init__()
        self.prompt_param = nn.Parameter(torch.rand(1, prompt_len, prompt_dim, prompt_size, prompt_size))
        self.conv3x3 = nn.Conv2d(prompt_dim, prompt_dim, kernel_size=3, stride=1, padding=1, bias=False)

    def init_weights_zero(self, m):
        """Zero the weight and bias of ``m`` if it is an ``nn.Conv2d``.

        Not called by this class itself; kept for callers that use it.
        """
        if isinstance(m, nn.Conv2d):
            nn.init.constant_(m.weight, 0)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return a prompt of shape ``(B, prompt_dim, H, W)`` for a 4-D ``x``."""
        batch, _, height, width = x.shape
        # The prompt is identical for every sample, so sum / resize / convolve
        # it once at batch size 1 and replicate afterwards, instead of
        # replicating first and repeating the same work ``batch`` times.
        prompt = torch.sum(self.prompt_param, dim=1)
        prompt = F.interpolate(prompt, (height, width), mode="bilinear")
        prompt = self.conv3x3(prompt)
        # repeat() (rather than expand()) keeps the result an ordinary
        # contiguous tensor, as before.
        return prompt.repeat(batch, 1, 1, 1)
|
|
|
|
|
class Attention(nn.Module):
    """Prompt-conditioned scale-and-shift modulation of a feature map.

    Despite the name, no attention weights are computed: the prompt is passed
    through a shared 3x3 conv, two further 3x3 convs predict ``gama`` and
    ``beta``, and the output is ``x * (1 + gama) + beta``. Both predictor
    convs are zero-initialised, so a freshly constructed module returns ``x``
    unchanged.

    Args:
        dim: channel count of ``x`` (output channels of the predictors).
        hidden_dim: channel count after the shared conv.
        bias: whether the three convolutions use a bias term.
        prompt_dim: channel count of the incoming prompt.
    """

    def __init__(self, dim, hidden_dim, bias, prompt_dim=192):
        super(Attention, self).__init__()
        self.shared_mlp = nn.Sequential(
            nn.Conv2d(prompt_dim, hidden_dim, kernel_size=3, stride=1, padding=1, bias=bias)
        )
        self.mlp_gama = nn.Conv2d(hidden_dim, dim, kernel_size=3, stride=1, padding=1, bias=bias)
        self.mlp_beta = nn.Conv2d(hidden_dim, dim, kernel_size=3, stride=1, padding=1, bias=bias)

        # Use the class's own initialiser so the module does not depend on a
        # same-named module-level function being present.
        self.mlp_gama.apply(self.init_weights_zero)
        self.mlp_beta.apply(self.init_weights_zero)

    def init_weights_zero(self, m):
        """Zero the weight and bias of ``m`` if it is an ``nn.Conv2d``."""
        if isinstance(m, nn.Conv2d):
            nn.init.constant_(m.weight, 0)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def forward(self, x, prompt):
        """Modulate 4-D ``x`` with ``prompt``; the result has the shape of ``x``.

        ``prompt`` is expanded along the batch axis to the batch size of
        ``x``, so its batch size must be 1 or equal to that of ``x``.
        """
        batch, _, _, _ = x.shape
        prompt = self.shared_mlp(prompt)
        prompt = prompt.expand(batch, -1, -1, -1)

        gama = self.mlp_gama(prompt)
        beta = self.mlp_beta(prompt)

        return x * (1 + gama) + beta
|
|
|
|
|
class DepthwiseSeparableConv(nn.Module):
    """Bias-free depthwise convolution followed by a bias-free 1x1 pointwise convolution."""

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super(DepthwiseSeparableConv, self).__init__()

        # Depthwise stage: one filter per input channel (groups == in_channels).
        self.depthwise_conv = nn.Conv2d(in_channels, in_channels,
                                        kernel_size=kernel_size,
                                        stride=stride,
                                        padding=padding,
                                        groups=in_channels,
                                        bias=False)

        # Pointwise stage: 1x1 conv that mixes channels and sets the output width.
        self.pointwise_conv = nn.Conv2d(in_channels, out_channels,
                                        kernel_size=1, stride=1, padding=0,
                                        bias=False)

    def forward(self, x):
        return self.pointwise_conv(self.depthwise_conv(x))
|
|
|
|
|
class SFT(nn.Module):
    """Scale-and-shift modulation of ``x`` driven by a prompt feature map.

    The prompt goes through a shared 1x1 conv + ReLU; two depthwise-separable
    convs then predict ``gama`` and ``beta``, and the output is
    ``x * (1 + gama) + beta``.
    """

    def __init__(self, x_dim, prompt_dim=192, ks=3, nhidden=128):
        super(SFT, self).__init__()
        # Half the kernel size, so odd kernels keep the spatial size.
        half_kernel = ks // 2
        self.mlp_shared = nn.Sequential(
            nn.Conv2d(prompt_dim, nhidden, kernel_size=1),
            nn.ReLU(),
        )
        self.mlp_gama = DepthwiseSeparableConv(nhidden, x_dim,
                                               kernel_size=ks, padding=half_kernel)
        self.mlp_beta = DepthwiseSeparableConv(nhidden, x_dim,
                                               kernel_size=ks, padding=half_kernel)

    def forward(self, x, prompt):
        hidden = self.mlp_shared(prompt)
        scale = self.mlp_gama(hidden)
        shift = self.mlp_beta(hidden)
        return x * (1 + scale) + shift
|
|
|
|
|
def init_weights_gama(m):
    """Initialise an ``nn.Conv2d``; other module types are left untouched.

    A depthwise conv (``groups == in_channels == out_channels``) gets a
    weight that is zero everywhere except 1 at the kernel centre; any other
    conv gets an all-ones weight. A bias, if present, is zeroed.
    """
    if not isinstance(m, nn.Conv2d):
        return

    nn.init.constant_(m.weight, 0)
    # The first kernel dimension determines the centre index for both axes.
    mid = m.kernel_size[0] // 2
    is_depthwise = m.groups == m.in_channels == m.out_channels
    if is_depthwise:
        nn.init.constant_(m.weight[:, :, mid, mid], 1)
    else:
        nn.init.constant_(m.weight, 1)

    if m.bias is not None:
        nn.init.constant_(m.bias, 0)
|
|
|
|
|
class Decoder_w_Prompt(nn.Module):
    """Convolutional decoder whose upsampling stages are modulated by learned prompts.

    Structure: input conv, two middle residual blocks, then for each
    resolution level (lowest resolution first) ``num_res_blocks`` residual
    blocks, an optional prompt modulation, and a 2x upsample (all levels
    except level 0), followed by normalize -> swish -> 3x3 output conv.

    Prompt settings exist for levels 1-4 only. A level without settings
    skips the modulation step.

    Args (keyword-only):
        ch: base channel count.
        out_ch: channels of the decoded output.
        ch_mult: per-level channel multipliers; its length sets the number of
            resolution levels.
        num_res_blocks: residual blocks per level.
        attn_resolutions: accepted for signature compatibility; not used here.
        dropout: dropout probability passed to every ResnetBlock.
        resamp_with_conv: passed to Upsample (conv after interpolation).
        in_channels: stored on the instance; not otherwise used here.
        resolution: output resolution, used to derive ``z_shape``.
        z_channels: channels of the latent input.
        give_pre_end: if True, ``forward`` returns the features before the
            output head.
        **ignorekwargs: extra keyword arguments are ignored.
    """

    # Per-level prompt settings: level -> (prompt_dim, prompt_size, hidden_dim).
    _PROMPT_SPECS = {
        1: (64, 16, 64),
        2: (128, 16, 128),
        3: (128, 32, 128),
        4: (256, 32, 192),
    }

    def __init__(self, *, ch, out_ch, ch_mult=(1,2,4,8), num_res_blocks,
                 attn_resolutions, dropout=0.0, resamp_with_conv=True, in_channels,
                 resolution, z_channels, give_pre_end=False, **ignorekwargs):
        super().__init__()
        self.ch = ch
        self.temb_ch = 0  # no timestep embedding is used in this decoder
        self.num_resolutions = len(ch_mult)
        self.num_res_blocks = num_res_blocks
        self.resolution = resolution
        self.in_channels = in_channels
        self.give_pre_end = give_pre_end

        # Channel count and spatial size at the lowest resolution.
        block_in = ch * ch_mult[self.num_resolutions - 1]
        curr_res = resolution // 2 ** (self.num_resolutions - 1)
        self.z_shape = (1, z_channels, curr_res, curr_res)
        print("Working with z of shape {} = {} dimensions.".format(
            self.z_shape, np.prod(self.z_shape)))

        # Latent -> feature projection.
        self.conv_in = torch.nn.Conv2d(z_channels,
                                       block_in,
                                       kernel_size=3,
                                       stride=1,
                                       padding=1)

        # Middle blocks.
        self.mid = nn.Module()
        self.mid.block_1 = ResnetBlock(in_channels=block_in,
                                       out_channels=block_in,
                                       temb_channels=self.temb_ch,
                                       dropout=dropout)
        self.mid.block_2 = ResnetBlock(in_channels=block_in,
                                       out_channels=block_in,
                                       temb_channels=self.temb_ch,
                                       dropout=dropout)

        # Upsampling path, built from the lowest resolution upwards; inserting
        # at index 0 leaves self.up indexed by resolution level.
        self.up = nn.ModuleList()
        for i_level in reversed(range(self.num_resolutions)):
            block = nn.ModuleList()
            block_out = ch * ch_mult[i_level]
            for _ in range(self.num_res_blocks):
                block.append(ResnetBlock(in_channels=block_in,
                                         out_channels=block_out,
                                         temb_channels=self.temb_ch,
                                         dropout=dropout))
                block_in = block_out
            up = nn.Module()
            up.block = block
            if i_level != 0:
                spec = self._PROMPT_SPECS.get(i_level)
                if spec is not None:
                    prompt_dim, prompt_size, hidden_dim = spec
                    up.prompt = PromptGenBlock(prompt_dim=prompt_dim,
                                               prompt_size=prompt_size)
                    # The modulation must match the feature map it scales, so
                    # its width is taken from block_in (the channel count at
                    # this point) instead of a hard-coded number.
                    up.prompt_attn = Attention(dim=block_in,
                                               hidden_dim=hidden_dim,
                                               bias=True,
                                               prompt_dim=prompt_dim)
                up.upsample = Upsample(block_in, resamp_with_conv)
            self.up.insert(0, up)

        # Output head.
        self.norm_out = Normalize(block_in)
        self.conv_out = torch.nn.Conv2d(block_in,
                                        out_ch,
                                        kernel_size=3,
                                        stride=1,
                                        padding=1)

    def forward(self, z):
        """Decode latent ``z``; returns pre-head features if ``give_pre_end``."""
        # Remember the shape of the most recent latent.
        self.last_z_shape = z.shape

        temb = None  # ResnetBlock skips the embedding branch when temb is None

        h = self.conv_in(z)

        # Middle.
        h = self.mid.block_1(h, temb)
        h = self.mid.block_2(h, temb)

        # Upsampling, lowest resolution first.
        for i_level in reversed(range(self.num_resolutions)):
            level = self.up[i_level]
            for i_block in range(self.num_res_blocks):
                h = level.block[i_block](h, temb)
            if i_level != 0:
                # Levels without prompt settings have no prompt modules.
                if hasattr(level, "prompt"):
                    prompt = level.prompt(h)
                    h = level.prompt_attn(h, prompt)
                h = level.upsample(h)

        if self.give_pre_end:
            return h

        h = self.norm_out(h)
        h = nonlinearity(h)
        h = self.conv_out(h)
        return h