# adapted from
# https://github.com/openai/improved-diffusion/blob/main/improved_diffusion/gaussian_diffusion.py
# and
# https://github.com/lucidrains/denoising-diffusion-pytorch/blob/7706bdfc6f527f58d33f84b7b522e61e6e3164b3/denoising_diffusion_pytorch/denoising_diffusion_pytorch.py
# and
# https://github.com/openai/guided-diffusion/blob/0ba878e517b276c45d1195eb29f6f5f72659a05b/guided_diffusion/nn.py
#
# thanks!

import functools
import math
from inspect import isfunction

import torch
import torch.distributed as dist
import torch.nn as nn
# Imported explicitly so that the `torch.utils.checkpoint.checkpoint` lookup
# below does not depend on torch loading the submodule implicitly.
import torch.utils.checkpoint
from torch import nn

from utils.common_utils import instantiate_from_config


def gather_data(data, return_np=True):
    """
    Gather `data` from multiple processes into one list.

    :param data: the tensor contributed by the calling process.
    :param return_np: if True, move every gathered tensor to the CPU and
                      convert it to a NumPy array.
    :return: a list with `dist.get_world_size()` entries.
    """
    data_list = [torch.zeros_like(data) for _ in range(dist.get_world_size())]
    dist.all_gather(data_list, data)  # gather not supported with NCCL
    if return_np:
        data_list = [gathered.cpu().numpy() for gathered in data_list]
    return data_list


def autocast(f):
    """
    Decorator that runs `f` inside a `torch.cuda.amp.autocast` context.

    The context is always enabled; its dtype and cache setting are read from
    the current global autocast state at call time.

    :param f: the callable to wrap.
    :return: a wrapper with the same call signature (and metadata) as `f`.
    """

    @functools.wraps(f)  # keep __name__/__doc__ of the wrapped callable
    def do_autocast(*args, **kwargs):
        with torch.cuda.amp.autocast(
            enabled=True,
            dtype=torch.get_autocast_gpu_dtype(),
            cache_enabled=torch.is_autocast_cache_enabled(),
        ):
            return f(*args, **kwargs)

    return do_autocast


def extract_into_tensor(a, t, x_shape):
    """
    Gather the entries of `a` selected by the indices `t` along the last dim.

    :param a: the tensor to read values from.
    :param t: an index tensor; its first dimension is used as the batch size.
    :param x_shape: the shape the result should broadcast against.
    :return: the gathered values reshaped to `(b, 1, ..., 1)` with
             `len(x_shape)` dimensions.
    """
    b, *_ = t.shape
    out = a.gather(-1, t)
    return out.reshape(b, *((1,) * (len(x_shape) - 1)))


def noise_like(shape, device, repeat=False):
    """
    Draw standard-normal noise of the given shape.

    :param shape: the shape of the returned tensor.
    :param device: the device to create the noise on.
    :param repeat: if True, draw a single sample of shape `(1, *shape[1:])`
                   and repeat it `shape[0]` times along the first dimension.
    :return: a tensor of shape `shape`.
    """
    if repeat:
        single = torch.randn((1, *shape[1:]), device=device)
        return single.repeat(shape[0], *((1,) * (len(shape) - 1)))
    return torch.randn(shape, device=device)


def exists(val):
    """Return True if `val` is not None."""
    return val is not None


def default(val, d):
    """
    Return `val` unless it is None, otherwise fall back to `d`.

    :param val: the preferred value.
    :param d: the fallback; if it is a plain Python function it is called
              (with no arguments) and its result is returned.
    """
    if exists(val):
        return val
    return d() if isfunction(d) else d


def identity(*args, **kwargs):
    """Return a new `nn.Identity` module, ignoring all arguments."""
    return nn.Identity()


def uniq(arr):
    """Return a keys view of the distinct elements of `arr`, in first-seen order."""
    return dict.fromkeys(arr, True).keys()


def mean_flat(tensor):
    """
    Take the mean over all non-batch dimensions.
    """
    return tensor.mean(dim=list(range(1, len(tensor.shape))))


def ismap(x):
    """Return True if `x` is a 4-D tensor whose second dimension is larger than 3."""
    if not isinstance(x, torch.Tensor):
        return False
    return (len(x.shape) == 4) and (x.shape[1] > 3)


def isimage(x):
    """Return True if `x` is a 4-D tensor whose second dimension is 3 or 1."""
    if not isinstance(x, torch.Tensor):
        return False
    return (len(x.shape) == 4) and (x.shape[1] == 3 or x.shape[1] == 1)


def max_neg_value(t):
    """Return the most negative finite value representable in `t`'s dtype."""
    return -torch.finfo(t.dtype).max


def shape_to_str(x):
    """Format the shape of `x` as its dimensions joined by "x" (e.g. "2x3x4")."""
    return "x".join(str(dim) for dim in x.shape)


def init_(tensor):
    """
    Fill `tensor` in place with uniform values in `[-std, std]`, where
    `std = 1 / sqrt(tensor.shape[-1])`, and return it.
    """
    dim = tensor.shape[-1]
    std = 1 / math.sqrt(dim)
    tensor.uniform_(-std, std)
    return tensor


ckpt = torch.utils.checkpoint.checkpoint


def checkpoint(func, inputs, params, flag):
    """
    Evaluate a function without caching intermediate activations, allowing for
    reduced memory at the expense of extra compute in the backward pass.

    :param func: the function to evaluate.
    :param inputs: the argument sequence to pass to `func`.
    :param params: a sequence of parameters `func` depends on but does not
                   explicitly take as arguments. Not used by this
                   implementation; kept for interface compatibility.
    :param flag: if False, disable gradient checkpointing.
    """
    if flag:
        return ckpt(func, *inputs)
    return func(*inputs)


def disabled_train(self, mode=True):
    """Overwrite model.train with this function to make sure train/eval mode
    does not change anymore."""
    return self


def zero_module(module):
    """
    Zero out the parameters of a module and return it.
    """
    for p in module.parameters():
        p.detach().zero_()
    return module


def scale_module(module, scale):
    """
    Scale the parameters of a module and return it.
    """
    for p in module.parameters():
        p.detach().mul_(scale)
    return module


def conv_nd(dims, *args, **kwargs):
    """
    Create a 1D, 2D, or 3D convolution module.

    :raises ValueError: if `dims` is not 1, 2 or 3.
    """
    if dims == 1:
        return nn.Conv1d(*args, **kwargs)
    elif dims == 2:
        return nn.Conv2d(*args, **kwargs)
    elif dims == 3:
        return nn.Conv3d(*args, **kwargs)
    raise ValueError(f"unsupported dimensions: {dims}")


def linear(*args, **kwargs):
    """
    Create a linear module.
    """
    return nn.Linear(*args, **kwargs)


def avg_pool_nd(dims, *args, **kwargs):
    """
    Create a 1D, 2D, or 3D average pooling module.

    :raises ValueError: if `dims` is not 1, 2 or 3.
    """
    if dims == 1:
        return nn.AvgPool1d(*args, **kwargs)
    elif dims == 2:
        return nn.AvgPool2d(*args, **kwargs)
    elif dims == 3:
        return nn.AvgPool3d(*args, **kwargs)
    raise ValueError(f"unsupported dimensions: {dims}")


def nonlinearity(type="silu"):
    """
    Create an activation module by name.

    :param type: "silu" or "leaky_relu".
    :return: an nn.Module implementing the activation.
    :raises ValueError: if `type` is not one of the supported names (instead
                        of silently returning None).
    """
    if type == "silu":
        return nn.SiLU()
    elif type == "leaky_relu":
        return nn.LeakyReLU()
    raise ValueError(f"unsupported nonlinearity type: {type}")


class GroupNormSpecific(nn.GroupNorm):
    """
    GroupNorm whose output always has the dtype of its input.

    float16 / bfloat16 inputs are normalized as-is; any other input is cast
    with `.float()` before normalization and cast back afterwards.
    """

    def forward(self, x):
        if x.dtype == torch.float16 or x.dtype == torch.bfloat16:
            return super().forward(x).type(x.dtype)
        else:
            return super().forward(x.float()).type(x.dtype)


def normalization(channels, num_groups=32):
    """
    Make a standard normalization layer.

    :param channels: number of input channels.
    :param num_groups: number of groups to split the channels into.
    :return: an nn.Module for normalization.
    """
    return GroupNormSpecific(num_groups, channels)


class HybridConditioner(nn.Module):
    """
    Pair of conditioners, one for concat conditioning and one for
    cross-attention conditioning, each built with `instantiate_from_config`.
    """

    def __init__(self, c_concat_config, c_crossattn_config):
        """
        :param c_concat_config: config passed to `instantiate_from_config` to
                                build the concat conditioner.
        :param c_crossattn_config: config passed to `instantiate_from_config`
                                   to build the cross-attention conditioner.
        """
        super().__init__()
        self.concat_conditioner = instantiate_from_config(c_concat_config)
        self.crossattn_conditioner = instantiate_from_config(c_crossattn_config)

    def forward(self, c_concat, c_crossattn):
        """
        Apply each conditioner to its input.

        :return: a dict with keys "c_concat" and "c_crossattn", each mapping
                 to a one-element list holding the corresponding output.
        """
        c_concat = self.concat_conditioner(c_concat)
        c_crossattn = self.crossattn_conditioner(c_crossattn)
        return {"c_concat": [c_concat], "c_crossattn": [c_crossattn]}