| | import torch |
| | from .sd_unet import Timesteps, ResnetBlock, AttentionBlock, PushBlock, DownSampler |
| | from .sdxl_unet import SDXLUNet |
| | from .tiler import TileWorker |
| | from .sd_controlnet import ControlNetConditioningLayer |
| | from collections import OrderedDict |
| |
|
| |
|
| |
|
| | class QuickGELU(torch.nn.Module): |
| |
|
| | def forward(self, x: torch.Tensor): |
| | return x * torch.sigmoid(1.702 * x) |
| |
|
| |
|
| |
|
| | class ResidualAttentionBlock(torch.nn.Module): |
| |
|
| | def __init__(self, d_model: int, n_head: int, attn_mask: torch.Tensor = None): |
| | super().__init__() |
| |
|
| | self.attn = torch.nn.MultiheadAttention(d_model, n_head) |
| | self.ln_1 = torch.nn.LayerNorm(d_model) |
| | self.mlp = torch.nn.Sequential(OrderedDict([ |
| | ("c_fc", torch.nn.Linear(d_model, d_model * 4)), |
| | ("gelu", QuickGELU()), |
| | ("c_proj", torch.nn.Linear(d_model * 4, d_model)) |
| | ])) |
| | self.ln_2 = torch.nn.LayerNorm(d_model) |
| | self.attn_mask = attn_mask |
| |
|
| | def attention(self, x: torch.Tensor): |
| | self.attn_mask = self.attn_mask.to(dtype=x.dtype, device=x.device) if self.attn_mask is not None else None |
| | return self.attn(x, x, x, need_weights=False, attn_mask=self.attn_mask)[0] |
| |
|
| | def forward(self, x: torch.Tensor): |
| | x = x + self.attention(self.ln_1(x)) |
| | x = x + self.mlp(self.ln_2(x)) |
| | return x |
| |
|
| |
|
| |
|
| | class SDXLControlNetUnion(torch.nn.Module): |
| | def __init__(self, global_pool=False): |
| | super().__init__() |
| | self.time_proj = Timesteps(320) |
| | self.time_embedding = torch.nn.Sequential( |
| | torch.nn.Linear(320, 1280), |
| | torch.nn.SiLU(), |
| | torch.nn.Linear(1280, 1280) |
| | ) |
| | self.add_time_proj = Timesteps(256) |
| | self.add_time_embedding = torch.nn.Sequential( |
| | torch.nn.Linear(2816, 1280), |
| | torch.nn.SiLU(), |
| | torch.nn.Linear(1280, 1280) |
| | ) |
| | self.control_type_proj = Timesteps(256) |
| | self.control_type_embedding = torch.nn.Sequential( |
| | torch.nn.Linear(256 * 8, 1280), |
| | torch.nn.SiLU(), |
| | torch.nn.Linear(1280, 1280) |
| | ) |
| | self.conv_in = torch.nn.Conv2d(4, 320, kernel_size=3, padding=1) |
| |
|
| | self.controlnet_conv_in = ControlNetConditioningLayer(channels=(3, 16, 32, 96, 256, 320)) |
| | self.controlnet_transformer = ResidualAttentionBlock(320, 8) |
| | self.task_embedding = torch.nn.Parameter(torch.randn(8, 320)) |
| | self.spatial_ch_projs = torch.nn.Linear(320, 320) |
| |
|
| | self.blocks = torch.nn.ModuleList([ |
| | |
| | ResnetBlock(320, 320, 1280), |
| | PushBlock(), |
| | ResnetBlock(320, 320, 1280), |
| | PushBlock(), |
| | DownSampler(320), |
| | PushBlock(), |
| | |
| | ResnetBlock(320, 640, 1280), |
| | AttentionBlock(10, 64, 640, 2, 2048), |
| | PushBlock(), |
| | ResnetBlock(640, 640, 1280), |
| | AttentionBlock(10, 64, 640, 2, 2048), |
| | PushBlock(), |
| | DownSampler(640), |
| | PushBlock(), |
| | |
| | ResnetBlock(640, 1280, 1280), |
| | AttentionBlock(20, 64, 1280, 10, 2048), |
| | PushBlock(), |
| | ResnetBlock(1280, 1280, 1280), |
| | AttentionBlock(20, 64, 1280, 10, 2048), |
| | PushBlock(), |
| | |
| | ResnetBlock(1280, 1280, 1280), |
| | AttentionBlock(20, 64, 1280, 10, 2048), |
| | ResnetBlock(1280, 1280, 1280), |
| | PushBlock() |
| | ]) |
| |
|
| | self.controlnet_blocks = torch.nn.ModuleList([ |
| | torch.nn.Conv2d(320, 320, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(320, 320, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(320, 320, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(320, 320, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(640, 640, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(640, 640, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(640, 640, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(1280, 1280, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(1280, 1280, kernel_size=(1, 1)), |
| | torch.nn.Conv2d(1280, 1280, kernel_size=(1, 1)), |
| | ]) |
| |
|
| | self.global_pool = global_pool |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | self.task_id = { |
| | "openpose": 0, |
| | "depth": 1, |
| | "softedge": 2, |
| | "canny": 3, |
| | "lineart": 3, |
| | "lineart_anime": 3, |
| | "tile": 6, |
| | "inpaint": 7 |
| | } |
| |
|
| |
|
| | def fuse_condition_to_input(self, hidden_states, task_id, conditioning): |
| | controlnet_cond = self.controlnet_conv_in(conditioning) |
| | feat_seq = torch.mean(controlnet_cond, dim=(2, 3)) |
| | feat_seq = feat_seq + self.task_embedding[task_id] |
| | x = torch.stack([feat_seq, torch.mean(hidden_states, dim=(2, 3))], dim=1) |
| | x = self.controlnet_transformer(x) |
| |
|
| | alpha = self.spatial_ch_projs(x[:,0]).unsqueeze(-1).unsqueeze(-1) |
| | controlnet_cond_fuser = controlnet_cond + alpha |
| |
|
| | hidden_states = hidden_states + controlnet_cond_fuser |
| | return hidden_states |
| | |
| |
|
| | def forward( |
| | self, |
| | sample, timestep, encoder_hidden_states, |
| | conditioning, processor_id, add_time_id, add_text_embeds, |
| | tiled=False, tile_size=64, tile_stride=32, |
| | unet:SDXLUNet=None, |
| | **kwargs |
| | ): |
| | task_id = self.task_id[processor_id] |
| |
|
| | |
| | t_emb = self.time_proj(timestep).to(sample.dtype) |
| | t_emb = self.time_embedding(t_emb) |
| | |
| | time_embeds = self.add_time_proj(add_time_id) |
| | time_embeds = time_embeds.reshape((add_text_embeds.shape[0], -1)) |
| | add_embeds = torch.concat([add_text_embeds, time_embeds], dim=-1) |
| | add_embeds = add_embeds.to(sample.dtype) |
| | if unet is not None and unet.is_kolors: |
| | add_embeds = unet.add_time_embedding(add_embeds) |
| | else: |
| | add_embeds = self.add_time_embedding(add_embeds) |
| |
|
| | control_type = torch.zeros((sample.shape[0], 8), dtype=sample.dtype, device=sample.device) |
| | control_type[:, task_id] = 1 |
| | control_embeds = self.control_type_proj(control_type.flatten()) |
| | control_embeds = control_embeds.reshape((sample.shape[0], -1)) |
| | control_embeds = control_embeds.to(sample.dtype) |
| | control_embeds = self.control_type_embedding(control_embeds) |
| | time_emb = t_emb + add_embeds + control_embeds |
| |
|
| | |
| | height, width = sample.shape[2], sample.shape[3] |
| | hidden_states = self.conv_in(sample) |
| | hidden_states = self.fuse_condition_to_input(hidden_states, task_id, conditioning) |
| | text_emb = encoder_hidden_states |
| | if unet is not None and unet.is_kolors: |
| | text_emb = unet.text_intermediate_proj(text_emb) |
| | res_stack = [hidden_states] |
| |
|
| | |
| | for i, block in enumerate(self.blocks): |
| | if tiled and not isinstance(block, PushBlock): |
| | _, _, inter_height, _ = hidden_states.shape |
| | resize_scale = inter_height / height |
| | hidden_states = TileWorker().tiled_forward( |
| | lambda x: block(x, time_emb, text_emb, res_stack)[0], |
| | hidden_states, |
| | int(tile_size * resize_scale), |
| | int(tile_stride * resize_scale), |
| | tile_device=hidden_states.device, |
| | tile_dtype=hidden_states.dtype |
| | ) |
| | else: |
| | hidden_states, _, _, _ = block(hidden_states, time_emb, text_emb, res_stack) |
| |
|
| | |
| | controlnet_res_stack = [block(res) for block, res in zip(self.controlnet_blocks, res_stack)] |
| |
|
| | |
| | if self.global_pool: |
| | controlnet_res_stack = [res.mean(dim=(2, 3), keepdim=True) for res in controlnet_res_stack] |
| |
|
| | return controlnet_res_stack |
| |
|
| | @staticmethod |
| | def state_dict_converter(): |
| | return SDXLControlNetUnionStateDictConverter() |
| |
|
| |
|
| |
|
| | class SDXLControlNetUnionStateDictConverter: |
| | def __init__(self): |
| | pass |
| |
|
| | def from_diffusers(self, state_dict): |
| | |
| | block_types = [ |
| | "ResnetBlock", "PushBlock", "ResnetBlock", "PushBlock", "DownSampler", "PushBlock", |
| | "ResnetBlock", "AttentionBlock", "PushBlock", "ResnetBlock", "AttentionBlock", "PushBlock", "DownSampler", "PushBlock", |
| | "ResnetBlock", "AttentionBlock", "PushBlock", "ResnetBlock", "AttentionBlock", "PushBlock", |
| | "ResnetBlock", "AttentionBlock", "ResnetBlock", "PushBlock" |
| | ] |
| |
|
| | |
| | controlnet_rename_dict = { |
| | "controlnet_cond_embedding.conv_in.weight": "controlnet_conv_in.blocks.0.weight", |
| | "controlnet_cond_embedding.conv_in.bias": "controlnet_conv_in.blocks.0.bias", |
| | "controlnet_cond_embedding.blocks.0.weight": "controlnet_conv_in.blocks.2.weight", |
| | "controlnet_cond_embedding.blocks.0.bias": "controlnet_conv_in.blocks.2.bias", |
| | "controlnet_cond_embedding.blocks.1.weight": "controlnet_conv_in.blocks.4.weight", |
| | "controlnet_cond_embedding.blocks.1.bias": "controlnet_conv_in.blocks.4.bias", |
| | "controlnet_cond_embedding.blocks.2.weight": "controlnet_conv_in.blocks.6.weight", |
| | "controlnet_cond_embedding.blocks.2.bias": "controlnet_conv_in.blocks.6.bias", |
| | "controlnet_cond_embedding.blocks.3.weight": "controlnet_conv_in.blocks.8.weight", |
| | "controlnet_cond_embedding.blocks.3.bias": "controlnet_conv_in.blocks.8.bias", |
| | "controlnet_cond_embedding.blocks.4.weight": "controlnet_conv_in.blocks.10.weight", |
| | "controlnet_cond_embedding.blocks.4.bias": "controlnet_conv_in.blocks.10.bias", |
| | "controlnet_cond_embedding.blocks.5.weight": "controlnet_conv_in.blocks.12.weight", |
| | "controlnet_cond_embedding.blocks.5.bias": "controlnet_conv_in.blocks.12.bias", |
| | "controlnet_cond_embedding.conv_out.weight": "controlnet_conv_in.blocks.14.weight", |
| | "controlnet_cond_embedding.conv_out.bias": "controlnet_conv_in.blocks.14.bias", |
| | "control_add_embedding.linear_1.weight": "control_type_embedding.0.weight", |
| | "control_add_embedding.linear_1.bias": "control_type_embedding.0.bias", |
| | "control_add_embedding.linear_2.weight": "control_type_embedding.2.weight", |
| | "control_add_embedding.linear_2.bias": "control_type_embedding.2.bias", |
| | } |
| |
|
| | |
| | name_list = sorted([name for name in state_dict]) |
| | rename_dict = {} |
| | block_id = {"ResnetBlock": -1, "AttentionBlock": -1, "DownSampler": -1, "UpSampler": -1} |
| | last_block_type_with_id = {"ResnetBlock": "", "AttentionBlock": "", "DownSampler": "", "UpSampler": ""} |
| | for name in name_list: |
| | names = name.split(".") |
| | if names[0] in ["conv_in", "conv_norm_out", "conv_out", "task_embedding", "spatial_ch_projs"]: |
| | pass |
| | elif name in controlnet_rename_dict: |
| | names = controlnet_rename_dict[name].split(".") |
| | elif names[0] == "controlnet_down_blocks": |
| | names[0] = "controlnet_blocks" |
| | elif names[0] == "controlnet_mid_block": |
| | names = ["controlnet_blocks", "9", names[-1]] |
| | elif names[0] in ["time_embedding", "add_embedding"]: |
| | if names[0] == "add_embedding": |
| | names[0] = "add_time_embedding" |
| | names[1] = {"linear_1": "0", "linear_2": "2"}[names[1]] |
| | elif names[0] == "control_add_embedding": |
| | names[0] = "control_type_embedding" |
| | elif names[0] == "transformer_layes": |
| | names[0] = "controlnet_transformer" |
| | names.pop(1) |
| | elif names[0] in ["down_blocks", "mid_block", "up_blocks"]: |
| | if names[0] == "mid_block": |
| | names.insert(1, "0") |
| | block_type = {"resnets": "ResnetBlock", "attentions": "AttentionBlock", "downsamplers": "DownSampler", "upsamplers": "UpSampler"}[names[2]] |
| | block_type_with_id = ".".join(names[:4]) |
| | if block_type_with_id != last_block_type_with_id[block_type]: |
| | block_id[block_type] += 1 |
| | last_block_type_with_id[block_type] = block_type_with_id |
| | while block_id[block_type] < len(block_types) and block_types[block_id[block_type]] != block_type: |
| | block_id[block_type] += 1 |
| | block_type_with_id = ".".join(names[:4]) |
| | names = ["blocks", str(block_id[block_type])] + names[4:] |
| | if "ff" in names: |
| | ff_index = names.index("ff") |
| | component = ".".join(names[ff_index:ff_index+3]) |
| | component = {"ff.net.0": "act_fn", "ff.net.2": "ff"}[component] |
| | names = names[:ff_index] + [component] + names[ff_index+3:] |
| | if "to_out" in names: |
| | names.pop(names.index("to_out") + 1) |
| | else: |
| | print(name, state_dict[name].shape) |
| | |
| | rename_dict[name] = ".".join(names) |
| |
|
| | |
| | state_dict_ = {} |
| | for name, param in state_dict.items(): |
| | if name not in rename_dict: |
| | continue |
| | if ".proj_in." in name or ".proj_out." in name: |
| | param = param.squeeze() |
| | state_dict_[rename_dict[name]] = param |
| | return state_dict_ |
| | |
| | def from_civitai(self, state_dict): |
| | return self.from_diffusers(state_dict) |