| | from typing import Optional |
| |
|
| | from dataclasses import dataclass, field |
| | from diffusers.models import AutoencoderKL, UNet2DConditionModel |
| |
|
| |
|
| | import torch |
| | from torch import nn |
| |
|
| |
|
| | from dataclasses import dataclass |
| |
|
| |
|
| |
|
@dataclass
class BaseModelConfig:
    """Empty base dataclass that model configuration classes derive from."""
| |
|
| |
|
| | from diffusers import AutoencoderKL, UNet2DConditionModel |
| | from trainer.noise_schedulers.scheduling_ddpm_zerosnr import DDPMScheduler |
| |
|
| | from transformers import CLIPTextModel, CLIPTokenizer |
| | from diffusers.training_utils import EMAModel |
| |
|
| | from diffusers.utils import logging |
| |
|
| | from diffusers.utils.hub_utils import PushToHubMixin |
| |
|
| | from diffusers.models.modeling_utils import ModelMixin |
| |
|
| | from diffusers.configuration_utils import ConfigMixin |
| |
|
| | logger = logging.get_logger(__name__) |
| |
|
| | |
| | from peft import get_peft_model |
| |
|
| | from layers import PositionalEncodingPermute1D |
| | from einops import rearrange, repeat |
| |
|
| | from typing import Optional |
| | from omegaconf import II |
| |
|
| |
|
@dataclass
class LoraConfig:
    """Default LoRA hyper-parameters, tagged with the ``peft.LoraConfig`` target path.

    The field names mirror keyword arguments of ``peft.LoraConfig``; the
    ``_target_`` string names the class this configuration refers to.
    """

    # Dotted path of the class this configuration describes.
    _target_: str = "peft.LoraConfig"
    # Rank of the low-rank update matrices.
    r: int = 8
    # Scaling numerator applied to the low-rank update.
    lora_alpha: int = 32
    # Names of the sub-modules to adapt; a fresh list is built per instance
    # so instances never share (and accidentally mutate) one default.
    target_modules: list = field(
        default_factory=lambda: ["to_q", "to_v", "query", "value"]
    )
    # Dropout probability applied on the LoRA branch.
    lora_dropout: float = 0.0
    # Which bias terms are trained ("none" by default).
    bias: str = "none"
| |
|
| |
|
@dataclass
class SDModelConfig(BaseModelConfig):
    """Configuration consumed by ``SDModel``.

    Field order, names, annotations and defaults are part of the public
    interface (positional construction and config files rely on them).
    """

    # --- model identity -------------------------------------------------
    # Dotted path of the model class this configuration describes.
    _target_: str = "trainer.models.sd_model.SDModel"
    # Checkpoint from which scheduler, text encoder, tokenizer, VAE and UNet
    # are loaded.
    pretrained_model_name_or_path: str = "runwayml/stable-diffusion-v1-5"

    # --- training behaviour ---------------------------------------------
    # Base probability used when randomly dropping text / image conditioning.
    conditioning_dropout_prob: float = 0.05
    # Keep an exponential moving average of the UNet parameters.
    use_ema: bool = True
    # When True the text encoder is run on the input ids as given instead of
    # being run per step and concatenated afterwards.
    concat_all_steps: bool = False

    # --- positional encodings -------------------------------------------
    # Encoding added to the text conditioning ("sinusoidal", None or "None").
    positional_encoding_type: Optional[str] = "sinusoidal"
    # Overwritten by ``SDModel.init_pos`` from ``text_sequence_length``.
    positional_encoding_length: Optional[int] = None
    # Encoding added to the image latents ("sinusoidal" or None).
    image_positional_encoding_type: Optional[str] = None
    # Overwritten by ``SDModel.init_image_pos`` from ``sequence_length``.
    image_positional_encoding_length: Optional[int] = None
    # Compute one encoding per step and repeat it over that step's tokens.
    broadcast_positional_encoding: bool = True
    # Number of image steps in a sequence.
    sequence_length: Optional[int] = 6
    # Number of text steps in a sequence.
    text_sequence_length: Optional[int] = 7

    # --- optional features ----------------------------------------------
    # Wrap the UNet with LoRA adapters.
    use_lora: bool = False
    # Forwarded to the noise scheduler as ``zero_snr``.
    zero_snr: bool = True
| | |
| | |
| | |
| | |
| | |
class SDModel(ModelMixin, ConfigMixin, PushToHubMixin):
    """Stable Diffusion model conditioned on a source image and step-wise text.

    Target images of a sequence are encoded by the VAE and stacked along the
    height axis of the latent. The UNet input convolution is widened to 8
    channels so the noisy latents can be concatenated channel-wise with the
    latent of the conditioning image.
    """

    def __init__(self, cfg: Optional[SDModelConfig] = None) -> None:
        """Load all pretrained sub-models and prepare them for training.

        Args:
            cfg: Model configuration. A default ``SDModelConfig`` is used
                when ``None`` is passed.
        """
        super().__init__()
        # Always bind ``self.cfg``: previously the default config was built
        # but never stored, so ``SDModel()`` failed on the next line.
        self.cfg = SDModelConfig() if cfg is None else cfg
        model_path = self.cfg.pretrained_model_name_or_path

        self.noise_scheduler = DDPMScheduler.from_pretrained(
            model_path,
            subfolder="scheduler",
            zero_snr=self.cfg.zero_snr,
        )
        self.text_encoder = CLIPTextModel.from_pretrained(
            model_path, subfolder="text_encoder",
        )
        self.tokenizer = CLIPTokenizer.from_pretrained(
            model_path, subfolder="tokenizer"
        )
        self.vae = AutoencoderKL.from_pretrained(model_path, subfolder="vae")
        self.unet = UNet2DConditionModel.from_pretrained(
            model_path, subfolder="unet"
        )

        # Widen the input convolution to 8 channels: noisy latents plus the
        # conditioning-image latents concatenated in ``forward``.
        in_channels = 8
        out_channels = self.unet.conv_in.out_channels
        self.unet.register_to_config(in_channels=in_channels)

        with torch.no_grad():
            new_conv_in = nn.Conv2d(
                in_channels,
                out_channels,
                self.unet.conv_in.kernel_size,
                self.unet.conv_in.stride,
                self.unet.conv_in.padding,
            )
            # Zero the new weights, then restore the pretrained weights on
            # the first 4 input channels so the extra channels start out
            # contributing nothing.
            new_conv_in.weight.zero_()
            new_conv_in.weight[:, :4, :, :].copy_(self.unet.conv_in.weight)
            new_conv_in.bias.copy_(self.unet.conv_in.bias)
            self.unet.conv_in = new_conv_in

        self.init_pos()
        self.init_image_pos()

        if self.cfg.use_lora:
            # ``get_peft_model`` needs peft's own config object. The
            # ``LoraConfig`` name at module level of this file is a plain
            # dataclass, so import the real class under a distinct name.
            from peft import LoraConfig as PeftLoraConfig

            lora_config = PeftLoraConfig(
                r=8,
                lora_alpha=32,
                target_modules=["to_q", "to_v", "query", "value"],
                lora_dropout=0.0,
                bias="none",
            )
            self.unet = get_peft_model(self.unet, lora_config)
            # The widened input convolution is new and must stay trainable.
            self.unet.conv_in.requires_grad_(True)
            self.unet.print_trainable_parameters()
            print(self.unet)

        # Only the UNet is trained.
        self.vae.requires_grad_(False)
        self.text_encoder.requires_grad_(False)

        if self.cfg.use_ema:
            self.ema_unet = EMAModel(
                self.unet.parameters(),
                model_cls=UNet2DConditionModel,
                model_config=self.unet.config,
            )

        # Optional ``torch.Generator`` used for conditioning dropout.
        self.generator = None

    def init_pos(self):
        """Create ``self.unet.pos``, the positional encoding for text steps.

        Also writes ``cfg.positional_encoding_length``: the number of text
        steps, multiplied by 77 (tokens per step) when the encoding is not
        broadcast over the tokens of a step.

        Raises:
            ValueError: If ``cfg.positional_encoding_type`` is not
                ``'sinusoidal'``, ``None`` or ``'None'``.
        """
        self.cfg.positional_encoding_length = self.cfg.text_sequence_length
        if not self.cfg.broadcast_positional_encoding:
            self.cfg.positional_encoding_length *= 77

        # Independent of the length computation above. This used to be an
        # ``elif`` of the broadcast check, so the encoding module was never
        # created when broadcasting was disabled.
        pos_type = self.cfg.positional_encoding_type
        if pos_type == 'sinusoidal':
            self.unet.pos = PositionalEncodingPermute1D(self.cfg.positional_encoding_length)
        elif pos_type is None or pos_type == 'None':
            self.unet.pos = nn.Identity()
        else:
            raise ValueError(f'Unknown positional encoding type {self.cfg.positional_encoding_type}')

    def init_image_pos(self):
        """Create ``self.unet.image_pos``, the positional encoding for image steps.

        Also writes ``cfg.image_positional_encoding_length`` from
        ``cfg.sequence_length``.

        Raises:
            ValueError: If ``cfg.image_positional_encoding_type`` is neither
                ``'sinusoidal'`` nor ``None``.
        """
        self.cfg.image_positional_encoding_length = self.cfg.sequence_length
        image_pos_type = self.cfg.image_positional_encoding_type
        if image_pos_type == 'sinusoidal':
            self.unet.image_pos = PositionalEncodingPermute1D(self.cfg.image_positional_encoding_length)
        elif image_pos_type is None:
            self.unet.image_pos = nn.Identity()
        else:
            raise ValueError(f'Unknown image positional encoding type {self.cfg.image_positional_encoding_type}')

    def tokenize_captions(self, captions):
        """Tokenize ``captions`` to fixed-length id tensors.

        Captions are padded / truncated to the tokenizer's maximum length.

        Returns:
            The ``input_ids`` tensor produced by the tokenizer.
        """
        inputs = self.tokenizer(
            captions,
            max_length=self.tokenizer.model_max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return inputs.input_ids

    def forward(self, batch):
        """Run one noising / denoising training step.

        Args:
            batch: Mapping with ``"input_ids"`` (token ids, one row per text
                step), ``"original_pixel_values"`` (conditioning image) and
                ``"edited_pixel_values"`` (target images laid out as
                ``b s c h w``).

        Returns:
            Tuple ``(model_pred, target)``: the UNet prediction and the
            regression target selected by the scheduler's prediction type.
        """
        condition_image = batch["original_pixel_values"]
        input_ids = batch["input_ids"].to(self.text_encoder.device)

        edited_images = batch["edited_pixel_values"]
        output_seq_length = edited_images.shape[1]

        # Encode every target frame independently, then stack the frames of
        # one sample along the latent height axis.
        edited_images = rearrange(edited_images, 'b s c h w -> (b s) c h w')
        latents = self.vae.encode(edited_images).latent_dist.sample()
        latents = latents * self.vae.config.scaling_factor
        latents = rearrange(latents, '(b s) c h w -> b c (s h) w', s=output_seq_length)

        noise = torch.randn_like(latents)
        bsz = latents.shape[0]

        # One random timestep per sample.
        timesteps = torch.randint(0, self.noise_scheduler.config.num_train_timesteps, (bsz,), device=latents.device)
        timesteps = timesteps.long()

        noisy_latents = self.noise_scheduler.add_noise(latents, noise, timesteps)

        if self.cfg.image_positional_encoding_type is not None:
            # The encoding belongs on the UNet input. The result used to be
            # written to ``latents``, which left the UNet input unencoded
            # and corrupted the clean latents used for the loss target.
            noisy_latents = self.apply_image_positional_encoding(noisy_latents, output_seq_length)

        # A single un-batched sample arrives as (steps, tokens).
        if len(input_ids.shape) == 2:
            input_ids = input_ids.unsqueeze(0)

        encoder_hidden_states = self.input_ids_to_text_condition(input_ids)
        # ``init_pos`` treats both None and the string 'None' as "no
        # encoding"; mirror that here so the Identity output is not added.
        if self.cfg.positional_encoding_type not in (None, 'None'):
            encoder_hidden_states = self.apply_step_positional_encoding(encoder_hidden_states)

        # The conditioning image uses the distribution mode (no sampling).
        original_image_embeds = self.vae.encode(condition_image).latent_dist.mode()

        if self.cfg.conditioning_dropout_prob is not None:
            encoder_hidden_states, original_image_embeds = self.apply_conditioning_dropout(
                encoder_hidden_states, original_image_embeds
            )

        # Tile the conditioning latent so it lines up with every stacked step.
        original_image_embeds = repeat(original_image_embeds, 'b c h w -> b c (s h) w', s=output_seq_length)

        concatenated_noisy_latents = torch.cat([noisy_latents, original_image_embeds], dim=1)

        target = self.get_loss_target(latents, noise, timesteps)

        model_pred = self.unet(concatenated_noisy_latents, timesteps, encoder_hidden_states).sample
        return model_pred, target

    def get_loss_target(self, latents, noise, timesteps):
        """Return the regression target for the scheduler's prediction type.

        Returns:
            ``noise`` for ``"epsilon"``, the scheduler velocity for
            ``"v_prediction"``.

        Raises:
            ValueError: For any other prediction type.
        """
        prediction_type = self.noise_scheduler.config.prediction_type
        if prediction_type == "epsilon":
            return noise
        if prediction_type == "v_prediction":
            return self.noise_scheduler.get_velocity(latents, noise, timesteps)
        raise ValueError(f"Unknown prediction type {self.noise_scheduler.config.prediction_type}")

    def apply_conditioning_dropout(self, encoder_hidden_states, original_image_embeds):
        """Randomly drop text and / or image conditioning per sample.

        With ``p = cfg.conditioning_dropout_prob`` and one uniform draw ``u``
        per sample:

        * ``u < 2p``       -> text is replaced by the null conditioning
        * ``p <= u < 3p``  -> the image latent is zeroed

        so both are dropped when ``p <= u < 2p``.

        Returns:
            Tuple ``(encoder_hidden_states, original_image_embeds)`` after
            dropout.
        """
        bsz = original_image_embeds.shape[0]
        random_p = torch.rand(bsz, device=encoder_hidden_states.device, generator=self.generator)
        drop_prob = self.cfg.conditioning_dropout_prob

        # Text dropout.
        prompt_mask = (random_p < 2 * drop_prob).reshape(bsz, 1, 1)
        null_conditioning = self.get_null_conditioning()
        encoder_hidden_states = torch.where(prompt_mask, null_conditioning, encoder_hidden_states)

        # Image dropout: the mask is 0 inside [p, 3p) and 1 elsewhere.
        image_mask_dtype = original_image_embeds.dtype
        image_mask = 1 - (
            (random_p >= drop_prob).to(image_mask_dtype)
            * (random_p < 3 * drop_prob).to(image_mask_dtype)
        )
        image_mask = image_mask.reshape(bsz, 1, 1, 1)

        original_image_embeds = image_mask * original_image_embeds
        return encoder_hidden_states, original_image_embeds

    def get_null_conditioning(self):
        """Encode the empty prompt, used as the "dropped text" conditioning.

        Unless ``cfg.concat_all_steps`` is set, the encoding is repeated
        ``cfg.text_sequence_length`` times along the token axis.
        """
        null_token = self.tokenize_captions([""]).to(self.text_encoder.device)
        null_conditioning = self.text_encoder(null_token)[0]
        if not self.cfg.concat_all_steps:
            null_conditioning = repeat(null_conditioning, 'b t l -> b (s t) l', s=self.cfg.text_sequence_length)
        return null_conditioning

    def input_ids_to_text_condition(self, input_ids):
        """Turn token ids into text-encoder hidden states.

        With ``cfg.concat_all_steps`` the ids are passed to the text encoder
        as given. Otherwise ``input_ids`` is treated as ``b s t``: each step
        is encoded on its own and the per-step outputs are concatenated
        along the token axis.
        """
        if self.cfg.concat_all_steps:
            return self.text_encoder(input_ids)[0]

        input_ids = rearrange(input_ids, 'b s t->(b s) t')
        encoder_hidden_states = self.text_encoder(input_ids)[0]
        encoder_hidden_states = rearrange(
            encoder_hidden_states, '(b s) t d->b (s t) d', s=self.cfg.text_sequence_length
        )
        return encoder_hidden_states

    def apply_step_positional_encoding(self, encoder_hidden_states):
        """Add the step positional encoding to the text hidden states.

        When ``cfg.broadcast_positional_encoding`` is set, the output of
        ``self.unet.pos`` is repeated 77 times along its second axis before
        being added.
        """
        positional_encoding = self.unet.pos(encoder_hidden_states)
        if self.cfg.broadcast_positional_encoding:
            positional_encoding = repeat(positional_encoding, 'b s d -> b (s t) d', t=77)
        return positional_encoding + encoder_hidden_states

    def apply_image_positional_encoding(self, latents, output_seq_length):
        """Add the image-step positional encoding to height-stacked latents.

        ``latents`` (``b c (s h) w``) is flattened to one vector per step,
        the output of ``self.unet.image_pos`` is added, and the original
        layout is restored.
        """
        original_latents_shape = latents.shape
        # Height of a single step inside the stacked latent.
        h = original_latents_shape[2] // output_seq_length
        latents = rearrange(latents, 'b c (s h) w -> b s (c h w)', s=output_seq_length)
        image_pos = self.unet.image_pos(latents)
        latents = latents + image_pos
        latents = rearrange(
            latents,
            'b s (c h w) -> b c (s h) w',
            s=output_seq_length,
            c=original_latents_shape[1],
            h=h,
            w=original_latents_shape[3],
        )
        return latents

    def instantiate_pipeline(self):
        """Placeholder; currently does nothing and returns ``None``."""