| | import torch |
| | from typing import Optional, Tuple, Union |
| | from diffusers import UNet2DConditionModel |
| | from diffusers.models.attention_processor import Attention |
| | from diffusers.models.unets.unet_2d_condition import UNet2DConditionOutput |
| |
|
| |
|
def switch_multiview_processor(model, enable_filter=lambda x: True):
    """Re-evaluate the ``enabled`` flag of every multiview processor under *model*.

    Every descendant of ``model`` (``model`` itself is not inspected) is
    visited. For each ``Attention`` module whose current processor is a
    ``multiviewAttnProc``, ``processor.enabled`` is overwritten with
    ``enable_filter("<dotted path>.processor")``, where the dotted path is
    built from the ``named_children`` names starting at ``model``'s direct
    children.

    Args:
        model: Module whose descendants are scanned.
        enable_filter: Callable taking the processor's dotted name and
            returning the new value for ``enabled``. Defaults to enabling all.
    """

    def _toggle(path: str, node: torch.nn.Module):
        # Descend first so children are handled before the node itself.
        for child_name, child in node.named_children():
            _toggle(f"{path}.{child_name}", child)

        if not isinstance(node, Attention):
            return

        current = node.get_processor()
        # Attention layers that were never wrapped are left untouched.
        if isinstance(current, multiviewAttnProc):
            current.enabled = enable_filter(f"{path}.processor")

    for top_name, top_module in model.named_children():
        _toggle(top_name, top_module)
| |
|
| |
|
def add_multiview_processor(model: torch.nn.Module, enable_filter=lambda x: True, **kwargs):
    """Wrap the processor of every ``Attention`` under *model* in a ``multiviewAttnProc``.

    Descendants of ``model`` are walked through ``named_children``. Each
    ``Attention`` module found has its current processor replaced by a
    ``multiviewAttnProc`` that chains to the previous one. A child is not
    descended into when the string ``child_name + parent_path`` (joined
    without a separator) contains ``"ref_unet"``.

    Args:
        model: Module whose descendants are scanned (``model`` itself is not
            wrapped).
        enable_filter: Callable receiving ``"<dotted path>.processor"`` and
            returning the initial ``enabled`` flag of the new processor.
        **kwargs: Forwarded unchanged to ``multiviewAttnProc``.

    Returns:
        A ``torch.nn.ModuleDict`` holding every created processor, keyed by
        its dotted name with ``"."`` replaced by ``"__"``.
    """
    wrapped = torch.nn.ModuleDict()

    def _wrap(path: str, node: torch.nn.Module):
        # Children are visited before the node's own processor is replaced,
        # so the newly installed processor is never walked by this pass.
        for child_name, child in node.named_children():
            if "ref_unet" in (child_name + path):
                continue
            _wrap(f"{path}.{child_name}", child)

        if not isinstance(node, Attention):
            return

        proc_name = f"{path}.processor"
        wrapper = multiviewAttnProc(
            chained_proc=node.get_processor(),
            enabled=enable_filter(proc_name),
            name=proc_name,
            hidden_states_dim=node.inner_dim,
            **kwargs,
        )
        node.set_processor(wrapper)
        # Dots are replaced by "__" to form the ModuleDict key.
        wrapped[proc_name.replace(".", "__")] = wrapper

    for top_name, top_module in model.named_children():
        _wrap(top_name, top_module)

    return wrapped
| |
|
| |
|
class multiviewAttnProc(torch.nn.Module):
    """Attention-processor wrapper that lets groups of views attend jointly.

    When ``enabled`` is true, every ``views`` consecutive batch entries of
    ``hidden_states`` are folded into one sequence of ``views * tokens``
    tokens, the wrapped processor is run on that folded tensor, and the
    result is unfolded back to the original ``(batch, tokens, channels)``
    shape. When ``enabled`` is false, the call is forwarded unchanged.

    Args:
        chained_proc: The processor to delegate to.
        enabled: Whether view folding is applied.
        name: Identifier stored on the instance; not read by this class.
        hidden_states_dim: Stored on the instance; not read by this class.
        chain_pos: Stored on the instance; not read by this class.
        num_modalities: Stored on the instance; not read by this class.
        views: Number of consecutive batch entries folded together.
        base_img_size: Stored on the instance; not read by this class.
    """

    def __init__(
        self,
        chained_proc,
        enabled=False,
        name=None,
        hidden_states_dim=None,
        chain_pos="parralle",
        num_modalities=1,
        views=4,
        base_img_size=64,
    ) -> None:
        super().__init__()
        self.enabled = enabled
        self.chained_proc = chained_proc
        self.name = name
        self.hidden_states_dim = hidden_states_dim
        self.num_modalities = num_modalities
        self.views = views
        self.base_img_size = base_img_size
        self.chain_pos = chain_pos
        self.diff_joint_attn = True

    def __call__(
        self,
        attn: Attention,
        hidden_states: torch.FloatTensor,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        **kwargs
    ) -> torch.Tensor:
        """Run the chained processor, folding views together when enabled.

        Args:
            attn: Attention module, passed through to the chained processor.
            hidden_states: 3-D tensor ``(batch, tokens, channels)`` when the
                processor is enabled; ``batch`` must be a multiple of
                ``self.views``.
            encoder_hidden_states: Passed through unchanged (it is not
                regrouped by view).
            attention_mask: Passed through unchanged.
            **kwargs: Passed through to the chained processor.

        Returns:
            The chained processor's output; when enabled it is reshaped back
            to ``(batch, tokens, channels)``.

        Raises:
            ValueError: If enabled and ``batch`` is not divisible by
                ``self.views``.
        """
        if not self.enabled:
            return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)

        B, L, C = hidden_states.shape
        mv = self.views
        if B % mv != 0:
            raise ValueError(
                f"hidden_states batch size {B} is not divisible by views={mv}"
            )

        # Explicit target shapes (no -1) keep the reshape well-defined even
        # for an empty batch, where an inferred dimension would be ambiguous.
        hidden_states = hidden_states.reshape(B // mv, mv * L, C)
        hidden_states = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
        return hidden_states.reshape(B, L, C)
| |
|
| |
|
| |
|
class UnifieldWrappedUNet(UNet2DConditionModel):
    """``UNet2DConditionModel`` whose attention layers are wrapped for multiview attention.

    The constructor forwards every standard UNet argument to the parent
    class, then wraps each attention processor with ``multiviewAttnProc``
    via ``add_multiview_processor``. Only processors whose dotted name ends
    with ``f"{multiview_attn_position}.processor"`` are enabled.

    Wrapper-specific args (not forwarded to the parent constructor):
        multiview_attn_position: Name suffix selecting which attention
            layers get multiview attention enabled.
        n_views: Forwarded to the processors as ``views``.
        num_modalities: Forwarded to the processors as ``num_modalities``.
        latent_size: Forwarded to the processors as ``base_img_size``.
        multiview_chain_pose: Forwarded to the processors as ``chain_pos``.
        **kwargs: Accepted and ignored.
    """

    def __init__(
        self,
        sample_size: Optional[int] = None,
        in_channels: int = 4,
        out_channels: int = 4,
        center_input_sample: bool = False,
        flip_sin_to_cos: bool = True,
        freq_shift: int = 0,
        down_block_types: Tuple[str] = (
            "CrossAttnDownBlock2D",
            "CrossAttnDownBlock2D",
            "CrossAttnDownBlock2D",
            "DownBlock2D",
        ),
        mid_block_type: Optional[str] = "UNetMidBlock2DCrossAttn",
        up_block_types: Tuple[str] = ("UpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D"),
        only_cross_attention: Union[bool, Tuple[bool]] = False,
        block_out_channels: Tuple[int] = (320, 640, 1280, 1280),
        layers_per_block: Union[int, Tuple[int]] = 2,
        downsample_padding: int = 1,
        mid_block_scale_factor: float = 1,
        dropout: float = 0.0,
        act_fn: str = "silu",
        norm_num_groups: Optional[int] = 32,
        norm_eps: float = 1e-5,
        cross_attention_dim: Union[int, Tuple[int]] = 1280,
        transformer_layers_per_block: Union[int, Tuple[int], Tuple[Tuple]] = 1,
        reverse_transformer_layers_per_block: Optional[Tuple[Tuple[int]]] = None,
        encoder_hid_dim: Optional[int] = None,
        encoder_hid_dim_type: Optional[str] = None,
        attention_head_dim: Union[int, Tuple[int]] = 8,
        num_attention_heads: Optional[Union[int, Tuple[int]]] = None,
        dual_cross_attention: bool = False,
        use_linear_projection: bool = False,
        class_embed_type: Optional[str] = None,
        addition_embed_type: Optional[str] = None,
        addition_time_embed_dim: Optional[int] = None,
        num_class_embeds: Optional[int] = None,
        upcast_attention: bool = False,
        resnet_time_scale_shift: str = "default",
        resnet_skip_time_act: bool = False,
        resnet_out_scale_factor: float = 1.0,
        time_embedding_type: str = "positional",
        time_embedding_dim: Optional[int] = None,
        time_embedding_act_fn: Optional[str] = None,
        timestep_post_act: Optional[str] = None,
        time_cond_proj_dim: Optional[int] = None,
        conv_in_kernel: int = 3,
        conv_out_kernel: int = 3,
        projection_class_embeddings_input_dim: Optional[int] = None,
        attention_type: str = "default",
        class_embeddings_concat: bool = False,
        mid_block_only_cross_attention: Optional[bool] = None,
        cross_attention_norm: Optional[str] = None,
        addition_embed_type_num_heads: int = 64,
        multiview_attn_position: str = "attn1",
        n_views: int = 4,
        num_modalities: int = 1,
        latent_size: int = 64,
        multiview_chain_pose: str = "parralle",
        **kwargs
    ):
        # NOTE: this must stay the first statement. ``locals()`` is used to
        # forward every constructor argument, so any local bound before this
        # call would be passed to the parent as an unexpected keyword.
        super().__init__(**{
            k: v for k, v in locals().items() if k not in
            ["self", "kwargs", "__class__", "multiview_attn_position", "n_views", "num_modalities", "latent_size", "multiview_chain_pose"]
        })

        def is_multiview_target(name):
            # Selects processors by the suffix of their dotted name.
            return name.endswith(f"{multiview_attn_position}.processor")

        add_multiview_processor(
            model=self,
            enable_filter=is_multiview_target,
            num_modalities=num_modalities,
            base_img_size=latent_size,
            chain_pos=multiview_chain_pose,
            views=n_views,
        )

        # Re-applies the same filter to the processors installed just above.
        switch_multiview_processor(self, enable_filter=is_multiview_target)

    def __call__(
        self,
        sample: torch.Tensor,
        timestep: Union[torch.Tensor, float, int],
        encoder_hidden_states: torch.Tensor,
        condition_latens: torch.Tensor = None,
        class_labels: Optional[torch.Tensor] = None,
    ) -> Union[UNet2DConditionOutput, Tuple]:
        """Concatenate the condition latents onto ``sample`` and run ``forward``.

        Args:
            sample: Input tensor; concatenated with ``condition_latens``
                along dim 1.
            timestep: Passed through to ``forward``.
            encoder_hidden_states: Passed through to ``forward``.
            condition_latens: Optional tensor concatenated to ``sample``
                along dim 1. When ``None``, ``sample`` is used as-is.
            class_labels: Passed through to ``forward`` as a keyword.

        Returns:
            Whatever ``self.forward`` returns.
        """
        # The declared default is None; concatenating None would raise, so
        # the concatenation is applied only when condition latents are given.
        if condition_latens is not None:
            sample = torch.cat([sample, condition_latens], dim=1)
        # forward() is invoked directly, so torch.nn.Module.__call__ (and any
        # hooks it would run) is bypassed — kept as in the original.
        return self.forward(
            sample, timestep, encoder_hidden_states, class_labels=class_labels,
        )