from typing import Optional import math import re import numpy as np import safetensors.torch import torch import torch.nn as nn import torch.nn.functional as F from torchvision.ops import deform_conv2d # --------------------------------------------------------------------------- # DINOv3 ViT-H/16+ # --------------------------------------------------------------------------- def _rotate_half(x: torch.Tensor) -> torch.Tensor: x1, x2 = x.chunk(2, dim=-1) return torch.cat((-x2, x1), dim=-1) class DinoV3PatchEmbed(nn.Module): def __init__(self, patch_size=16, in_chans=3, embed_dim=1280): super().__init__() self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size, bias=True) def forward(self, x): return self.proj(x).flatten(2).transpose(1, 2) class DinoV3RotaryEmbedding2D(nn.Module): def __init__(self, dim: int, base: float = 100.0): super().__init__() inv_freq = 1.0 / (base ** torch.arange(0, 1, 4.0 / dim, dtype=torch.float32)) self.register_buffer("inv_freq", inv_freq, persistent=False) def forward(self, height: int, width: int, device: torch.device, dtype: torch.dtype): coords_h = torch.arange(0.5, height, dtype=torch.float32, device=device) / height coords_w = torch.arange(0.5, width, dtype=torch.float32, device=device) / width coords = torch.stack(torch.meshgrid(coords_h, coords_w, indexing="ij"), dim=-1) coords = (2.0 * coords - 1.0).flatten(0, 1) angles = (2 * math.pi * coords[:, :, None] * self.inv_freq[None, None, :]).flatten(1, 2).tile(2) cos = angles.cos().unsqueeze(0).unsqueeze(0) sin = angles.sin().unsqueeze(0).unsqueeze(0) return cos.to(dtype=dtype), sin.to(dtype=dtype) class DinoV3Attention(nn.Module): def __init__(self, dim: int, num_heads: int, qkv_bias: tuple = (True, False, True)): super().__init__() self.num_heads = num_heads self.head_dim = dim // num_heads q_bias, k_bias, v_bias = qkv_bias self.q_proj = nn.Linear(dim, dim, bias=q_bias) self.k_proj = nn.Linear(dim, dim, bias=k_bias) self.v_proj = nn.Linear(dim, dim, bias=v_bias) self.o_proj = nn.Linear(dim, dim, bias=True) def forward(self, x: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor, num_prefix_tokens: int = 0) -> torch.Tensor: B, N, C = x.shape q = self.q_proj(x).reshape(B, N, self.num_heads, self.head_dim).transpose(1, 2) k = self.k_proj(x).reshape(B, N, self.num_heads, self.head_dim).transpose(1, 2) v = self.v_proj(x).reshape(B, N, self.num_heads, self.head_dim).transpose(1, 2) if num_prefix_tokens > 0: q_pre, q_pat = q.split((num_prefix_tokens, N - num_prefix_tokens), dim=-2) k_pre, k_pat = k.split((num_prefix_tokens, N - num_prefix_tokens), dim=-2) q = torch.cat((q_pre, q_pat * cos + _rotate_half(q_pat) * sin), dim=-2) k = torch.cat((k_pre, k_pat * cos + _rotate_half(k_pat) * sin), dim=-2) else: q = q * cos + _rotate_half(q) * sin k = k * cos + _rotate_half(k) * sin out = F.scaled_dot_product_attention(q, k, v) return self.o_proj(out.transpose(1, 2).reshape(B, N, C)) class DinoV3MLP(nn.Module): def __init__(self, dim: int, hidden_dim: int, bias: bool = True): super().__init__() self.gate_proj = nn.Linear(dim, hidden_dim, bias=bias) self.up_proj = nn.Linear(dim, hidden_dim, bias=bias) self.down_proj = nn.Linear(hidden_dim, dim, bias=bias) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.down_proj(F.silu(self.gate_proj(x)) * self.up_proj(x)) class DinoV3Block(nn.Module): def __init__(self, dim: int, num_heads: int, mlp_ratio: float = 4.0, qkv_bias: tuple = (True, False, True), layerscale_init: float = 1.0, mlp_bias: bool = True, eps: float = 1e-5): super().__init__() self.norm1 = nn.LayerNorm(dim, eps=eps) self.attn = DinoV3Attention(dim, num_heads, qkv_bias=qkv_bias) self.ls1 = nn.Parameter(torch.ones(dim) * layerscale_init) self.norm2 = nn.LayerNorm(dim, eps=eps) self.mlp = DinoV3MLP(dim, int(dim * mlp_ratio), bias=mlp_bias) self.ls2 = nn.Parameter(torch.ones(dim) * layerscale_init) def forward(self, x: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor, num_prefix_tokens: int = 0) -> torch.Tensor: x = x + self.ls1 * self.attn(self.norm1(x), cos, sin, num_prefix_tokens=num_prefix_tokens) x = x + self.ls2 * self.mlp(self.norm2(x)) return x class DinoV3ViT(nn.Module): def __init__(self, hidden_size: int = 1280, num_heads: int = 20, num_layers: int = 32, patch_size: int = 16, num_register_tokens: int = 4, intermediate_size: int = 5120, layerscale_init: float = 1.0, query_bias: bool = True, key_bias: bool = False, value_bias: bool = True, mlp_bias: bool = True, rope_theta: float = 100.0, layer_norm_eps: float = 1e-5): super().__init__() self.patch_size = patch_size self.num_register_tokens = num_register_tokens self.patch_embed = DinoV3PatchEmbed(patch_size=patch_size, embed_dim=hidden_size) self.cls_token = nn.Parameter(torch.zeros(1, 1, hidden_size)) self.register_tokens = nn.Parameter(torch.zeros(1, num_register_tokens, hidden_size)) self.rope = DinoV3RotaryEmbedding2D(dim=hidden_size // num_heads, base=rope_theta) qkv_bias = (query_bias, key_bias, value_bias) self.blocks = nn.ModuleList([ DinoV3Block(hidden_size, num_heads, mlp_ratio=intermediate_size / hidden_size, qkv_bias=qkv_bias, layerscale_init=layerscale_init, mlp_bias=mlp_bias, eps=layer_norm_eps) for _ in range(num_layers) ]) self.norm = nn.LayerNorm(hidden_size, eps=layer_norm_eps) @property def device(self) -> torch.device: return self.cls_token.device def forward(self, pixel_values: torch.Tensor) -> torch.Tensor: B, _, H, W = pixel_values.shape x = self.patch_embed(pixel_values) hp, wp = H // self.patch_size, W // self.patch_size cos, sin = self.rope(hp, wp, x.device, x.dtype) x = torch.cat([self.cls_token.expand(B, -1, -1), self.register_tokens.expand(B, -1, -1), x], dim=1) num_prefix = 1 + self.num_register_tokens for block in self.blocks: x = block(x, cos, sin, num_prefix_tokens=num_prefix) return self.norm(x) def load_safetensors(self, path: str) -> None: state_dict = safetensors.torch.load_file(path) our_sd = self.state_dict() loaded = {} for hf_key in state_dict: k = (hf_key .replace("embeddings.patch_embeddings.", "patch_embed.proj.") .replace("embeddings.cls_token", "cls_token") .replace("embeddings.mask_token", "mask_token") .replace("embeddings.register_tokens", "register_tokens")) m = re.match(r"layer\.(\d+)\.(.+)", k) if m: rest = m.group(2) for proj in ["q_proj", "k_proj", "v_proj", "o_proj"]: rest = rest.replace(f"attention.{proj}", f"attn.{proj}") rest = (rest.replace("layer_scale1.lambda1", "ls1") .replace("layer_scale2.lambda1", "ls2")) k = f"blocks.{m.group(1)}.{rest}" if k in our_sd: assert state_dict[hf_key].shape == our_sd[k].shape, \ f"Shape mismatch {k}: {state_dict[hf_key].shape} vs {our_sd[k].shape}" loaded[k] = state_dict[hf_key] check_sd = {k: v for k, v in our_sd.items() if k != "mask_token"} missing = set(check_sd) - set(loaded) unexpected = set(loaded) - set(check_sd) if missing: raise KeyError(f"[DINOv3] Missing keys: {missing}") if unexpected: raise KeyError(f"[DINOv3] Unexpected keys: {unexpected}") self.load_state_dict(loaded, strict=True) # --------------------------------------------------------------------------- # Flux2 VAE Encoder # --------------------------------------------------------------------------- class Flux2ResnetBlock(nn.Module): def __init__(self, in_channels, out_channels, use_shortcut=False): super().__init__() self.norm1 = nn.GroupNorm(32, in_channels, eps=1e-6) self.conv1 = nn.Conv2d(in_channels, out_channels, 3, 1, 1) self.norm2 = nn.GroupNorm(32, out_channels, eps=1e-6) self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1) self.conv_shortcut = nn.Conv2d(in_channels, out_channels, 1, 1, 0) if use_shortcut else None def forward(self, x): h = F.silu(self.norm1(x)) h = F.silu(self.norm2(self.conv1(h))) h = self.conv2(h) return h + (self.conv_shortcut(x) if self.conv_shortcut is not None else x) class Flux2Downsampler(nn.Module): def __init__(self, channels): super().__init__() self.conv = nn.Conv2d(channels, channels, 3, 2, 0) def forward(self, x): return self.conv(F.pad(x, (0, 1, 0, 1))) class Flux2Attention(nn.Module): def __init__(self, channels): super().__init__() self.group_norm = nn.GroupNorm(32, channels, eps=1e-6) self.to_q = nn.Linear(channels, channels) self.to_k = nn.Linear(channels, channels) self.to_v = nn.Linear(channels, channels) self.to_out = nn.ModuleList([nn.Linear(channels, channels), nn.Identity()]) def forward(self, x): B, C, H, W = x.shape h = self.group_norm(x).reshape(B, C, H * W).transpose(1, 2) q = self.to_q(h).reshape(B, -1, 1, C).permute(0, 2, 1, 3) k = self.to_k(h).reshape(B, -1, 1, C).permute(0, 2, 1, 3) v = self.to_v(h).reshape(B, -1, 1, C).permute(0, 2, 1, 3) out = F.scaled_dot_product_attention(q, k, v) out = self.to_out[0](out.permute(0, 2, 1, 3).reshape(B, -1, C)) return x + out.transpose(1, 2).reshape(B, C, H, W) class Flux2Encoder(nn.Module): def __init__(self): super().__init__() self.conv_in = nn.Conv2d(3, 128, 3, 1, 1) self.down_0_resnets = nn.ModuleList([Flux2ResnetBlock(128, 128), Flux2ResnetBlock(128, 128)]) self.down_0_sampler = Flux2Downsampler(128) self.down_1_resnets = nn.ModuleList([Flux2ResnetBlock(128, 256, use_shortcut=True), Flux2ResnetBlock(256, 256)]) self.down_1_sampler = Flux2Downsampler(256) self.down_2_resnets = nn.ModuleList([Flux2ResnetBlock(256, 512, use_shortcut=True), Flux2ResnetBlock(512, 512)]) self.down_2_sampler = Flux2Downsampler(512) self.down_3_resnets = nn.ModuleList([Flux2ResnetBlock(512, 512), Flux2ResnetBlock(512, 512)]) self.mid_attn = Flux2Attention(512) self.mid_resnets = nn.ModuleList([Flux2ResnetBlock(512, 512), Flux2ResnetBlock(512, 512)]) self.conv_norm_out = nn.GroupNorm(32, 512, eps=1e-6) self.conv_out = nn.Conv2d(512, 64, 3, 1, 1) def forward(self, x): x = self.conv_in(x) for r in self.down_0_resnets: x = r(x) x = self.down_0_sampler(x) for r in self.down_1_resnets: x = r(x) x = self.down_1_sampler(x) for r in self.down_2_resnets: x = r(x) x = self.down_2_sampler(x) for r in self.down_3_resnets: x = r(x) x = self.mid_resnets[0](x) x = self.mid_attn(x) x = self.mid_resnets[1](x) return self.conv_out(F.silu(self.conv_norm_out(x))) class Flux2VAEEncoder(nn.Module): def __init__(self): super().__init__() self.encoder = Flux2Encoder() self.quant_conv = nn.Conv2d(64, 64, 1, 1, 0) self.bn = nn.BatchNorm1d(128, eps=1e-5, momentum=0.1, affine=False, track_running_stats=True) def load_safetensors(self, path: str): sd = safetensors.torch.load_file(path) remapped = {} for k, v in sd.items(): # Skip the decoder half of a full Flux2-VAE ckpt — we only need the encoder. if k.startswith(("decoder.", "post_quant_conv.")): continue # Comfy / diffusers-style naming → our flattened naming. m = re.match(r"encoder\.down_blocks\.(\d+)\.resnets\.(\d+)\.(.+)", k) if m: remapped[f"encoder.down_{m.group(1)}_resnets.{m.group(2)}.{m.group(3)}"] = v continue m = re.match(r"encoder\.down_blocks\.(\d+)\.downsamplers\.0\.(.+)", k) if m: remapped[f"encoder.down_{m.group(1)}_sampler.{m.group(2)}"] = v continue m = re.match(r"encoder\.mid_block\.resnets\.(\d+)\.(.+)", k) if m: remapped[f"encoder.mid_resnets.{m.group(1)}.{m.group(2)}"] = v continue m = re.match(r"encoder\.mid_block\.attentions\.0\.(.+)", k) if m: remapped[f"encoder.mid_attn.{m.group(1)}"] = v continue remapped[k] = v missing, unexpected = self.load_state_dict(remapped, strict=False) if missing: raise KeyError(f"[VAE] Missing keys: {missing}") if unexpected: raise KeyError(f"[VAE] Unexpected keys: {unexpected}") def encode(self, images, deterministic: bool = True, generator: torch.Generator = None): moments = self.quant_conv(self.encoder(images)) mean, logvar = moments.chunk(2, dim=1) if deterministic: latents = mean else: noise = torch.randn(mean.shape, dtype=mean.dtype, device=mean.device, generator=generator) latents = mean + torch.exp(0.5 * logvar) * noise B, C, H, W = latents.shape latents = latents.view(B, C, H // 2, 2, W // 2, 2).permute(0, 1, 3, 5, 2, 4) latents = latents.reshape(B, C * 4, H // 2, W // 2) bn_mean = self.bn.running_mean.view(1, -1, 1, 1).to(latents.device, latents.dtype) bn_std = torch.sqrt(self.bn.running_var.view(1, -1, 1, 1) + self.bn.eps).to(latents.device, latents.dtype) return ((latents - bn_mean) / bn_std).to(torch.float32).flatten(2).transpose(1, 2).contiguous() return rgba # --------------------------------------------------------------------------- # BiRefNet background removal (Swin-L + ASPP-deformable decoder) # --------------------------------------------------------------------------- # -- timm-style helpers, inlined to avoid a timm dependency -------------------- def _trunc_normal_(tensor: torch.Tensor, mean: float = 0.0, std: float = 1.0, a: float = -2.0, b: float = 2.0) -> torch.Tensor: # Initialization helper — only used at __init__ time, the released ckpt # overwrites everything in load_safetensors so the exact distribution here # is unimportant. with torch.no_grad(): tensor.normal_(mean, std).clamp_(mean + a * std, mean + b * std) return tensor # -- Swin Transformer (Swin-Large preset) ------------------------------------- class _SwinMlp(nn.Module): def __init__(self, in_features, hidden_features=None, out_features=None): super().__init__() hidden_features = hidden_features or in_features out_features = out_features or in_features self.fc1 = nn.Linear(in_features, hidden_features) self.act = nn.GELU() self.fc2 = nn.Linear(hidden_features, out_features) def forward(self, x): return self.fc2(self.act(self.fc1(x))) def _window_partition(x, window_size): B, H, W, C = x.shape x = x.view(B, H // window_size, window_size, W // window_size, window_size, C) return x.permute(0, 1, 3, 2, 4, 5).contiguous().view(-1, window_size, window_size, C) def _window_reverse(windows, window_size, H, W): B = int(windows.shape[0] / (H * W / window_size / window_size)) x = windows.view(B, H // window_size, W // window_size, window_size, window_size, -1) return x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, H, W, -1) class _WindowAttention(nn.Module): def __init__(self, dim, window_size, num_heads): super().__init__() self.dim = dim self.window_size = window_size # (Wh, Ww) self.num_heads = num_heads head_dim = dim // num_heads self.scale = head_dim ** -0.5 self.relative_position_bias_table = nn.Parameter( torch.zeros((2 * window_size[0] - 1) * (2 * window_size[1] - 1), num_heads)) coords_h = torch.arange(window_size[0]) coords_w = torch.arange(window_size[1]) coords = torch.stack(torch.meshgrid([coords_h, coords_w], indexing="ij")) coords_flatten = torch.flatten(coords, 1) relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :] relative_coords = relative_coords.permute(1, 2, 0).contiguous() relative_coords[:, :, 0] += window_size[0] - 1 relative_coords[:, :, 1] += window_size[1] - 1 relative_coords[:, :, 0] *= 2 * window_size[1] - 1 self.register_buffer("relative_position_index", relative_coords.sum(-1)) self.qkv = nn.Linear(dim, dim * 3, bias=True) self.proj = nn.Linear(dim, dim) _trunc_normal_(self.relative_position_bias_table, std=0.02) def forward(self, x, mask=None): B_, N, C = x.shape qkv = self.qkv(x).reshape(B_, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4) q, k, v = qkv[0], qkv[1], qkv[2] q = q * self.scale attn = q @ k.transpose(-2, -1) bias = self.relative_position_bias_table[self.relative_position_index.view(-1)].view( self.window_size[0] * self.window_size[1], self.window_size[0] * self.window_size[1], -1) attn = attn + bias.permute(2, 0, 1).contiguous().unsqueeze(0) if mask is not None: nW = mask.shape[0] attn = attn.view(B_ // nW, nW, self.num_heads, N, N) + mask.unsqueeze(1).unsqueeze(0) attn = attn.view(-1, self.num_heads, N, N) attn = attn.softmax(dim=-1) x = (attn @ v).transpose(1, 2).reshape(B_, N, C) return self.proj(x) class _SwinBlock(nn.Module): def __init__(self, dim, num_heads, window_size, shift_size, mlp_ratio=4.0): super().__init__() self.dim = dim self.window_size = window_size self.shift_size = shift_size self.norm1 = nn.LayerNorm(dim) self.attn = _WindowAttention(dim, (window_size, window_size), num_heads) self.norm2 = nn.LayerNorm(dim) self.mlp = _SwinMlp(dim, int(dim * mlp_ratio)) self.H = None self.W = None def forward(self, x, mask_matrix): B, L, C = x.shape H, W = self.H, self.W shortcut = x x = self.norm1(x).view(B, H, W, C) pad_r = (self.window_size - W % self.window_size) % self.window_size pad_b = (self.window_size - H % self.window_size) % self.window_size x = F.pad(x, (0, 0, 0, pad_r, 0, pad_b)) _, Hp, Wp, _ = x.shape if self.shift_size > 0: shifted_x = torch.roll(x, shifts=(-self.shift_size, -self.shift_size), dims=(1, 2)) attn_mask = mask_matrix else: shifted_x = x attn_mask = None x_windows = _window_partition(shifted_x, self.window_size).view( -1, self.window_size * self.window_size, C) attn_windows = self.attn(x_windows, mask=attn_mask).view( -1, self.window_size, self.window_size, C) shifted_x = _window_reverse(attn_windows, self.window_size, Hp, Wp) if self.shift_size > 0: x = torch.roll(shifted_x, shifts=(self.shift_size, self.shift_size), dims=(1, 2)) else: x = shifted_x if pad_r > 0 or pad_b > 0: x = x[:, :H, :W, :].contiguous() x = x.view(B, H * W, C) x = shortcut + x x = x + self.mlp(self.norm2(x)) return x class _PatchMerging(nn.Module): def __init__(self, dim): super().__init__() self.dim = dim self.reduction = nn.Linear(4 * dim, 2 * dim, bias=False) self.norm = nn.LayerNorm(4 * dim) def forward(self, x, H, W): B, L, C = x.shape x = x.view(B, H, W, C) if H % 2 == 1 or W % 2 == 1: x = F.pad(x, (0, 0, 0, W % 2, 0, H % 2)) x0 = x[:, 0::2, 0::2, :] x1 = x[:, 1::2, 0::2, :] x2 = x[:, 0::2, 1::2, :] x3 = x[:, 1::2, 1::2, :] x = torch.cat([x0, x1, x2, x3], -1).view(B, -1, 4 * C) return self.reduction(self.norm(x)) class _SwinBasicLayer(nn.Module): def __init__(self, dim, depth, num_heads, window_size, mlp_ratio=4.0, downsample=True): super().__init__() self.window_size = window_size self.shift_size = window_size // 2 self.depth = depth self.blocks = nn.ModuleList([ _SwinBlock(dim=dim, num_heads=num_heads, window_size=window_size, shift_size=0 if (i % 2 == 0) else window_size // 2, mlp_ratio=mlp_ratio) for i in range(depth) ]) self.downsample = _PatchMerging(dim) if downsample else None def forward(self, x, H, W): Hp = int(math.ceil(H / self.window_size)) * self.window_size Wp = int(math.ceil(W / self.window_size)) * self.window_size img_mask = torch.zeros((1, Hp, Wp, 1), device=x.device) h_slices = (slice(0, -self.window_size), slice(-self.window_size, -self.shift_size), slice(-self.shift_size, None)) w_slices = (slice(0, -self.window_size), slice(-self.window_size, -self.shift_size), slice(-self.shift_size, None)) cnt = 0 for h in h_slices: for w in w_slices: img_mask[:, h, w, :] = cnt cnt += 1 mask_windows = _window_partition(img_mask, self.window_size).view(-1, self.window_size ** 2) attn_mask = mask_windows.unsqueeze(1) - mask_windows.unsqueeze(2) attn_mask = attn_mask.masked_fill(attn_mask != 0, float(-100.0)) \ .masked_fill(attn_mask == 0, float(0.0)).to(x.dtype) for blk in self.blocks: blk.H, blk.W = H, W x = blk(x, attn_mask) if self.downsample is not None: x_down = self.downsample(x, H, W) Wh, Ww = (H + 1) // 2, (W + 1) // 2 return x, H, W, x_down, Wh, Ww return x, H, W, x, H, W class _SwinPatchEmbed(nn.Module): def __init__(self, patch_size=4, in_channels=3, embed_dim=192): super().__init__() self.patch_size = (patch_size, patch_size) self.proj = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size) self.norm = nn.LayerNorm(embed_dim) self.embed_dim = embed_dim def forward(self, x): _, _, H, W = x.shape if W % self.patch_size[1] != 0: x = F.pad(x, (0, self.patch_size[1] - W % self.patch_size[1])) if H % self.patch_size[0] != 0: x = F.pad(x, (0, 0, 0, self.patch_size[0] - H % self.patch_size[0])) x = self.proj(x) Wh, Ww = x.size(2), x.size(3) x = x.flatten(2).transpose(1, 2) x = self.norm(x) return x.transpose(1, 2).view(-1, self.embed_dim, Wh, Ww) class _SwinLarge(nn.Module): """Swin-Large backbone matching the BiRefNet HF release. embed_dim=192, depths=[2,2,18,2], num_heads=[6,12,24,48], window_size=12. """ def __init__(self): super().__init__() embed_dim = 192 depths = [2, 2, 18, 2] num_heads = [6, 12, 24, 48] window_size = 12 self.num_layers = len(depths) self.embed_dim = embed_dim self.patch_embed = _SwinPatchEmbed(patch_size=4, in_channels=3, embed_dim=embed_dim) self.layers = nn.ModuleList([ _SwinBasicLayer( dim=int(embed_dim * 2 ** i), depth=depths[i], num_heads=num_heads[i], window_size=window_size, downsample=(i < self.num_layers - 1), ) for i in range(self.num_layers) ]) num_features = [int(embed_dim * 2 ** i) for i in range(self.num_layers)] self.num_features = num_features for i in range(self.num_layers): self.add_module(f"norm{i}", nn.LayerNorm(num_features[i])) def forward(self, x): x = self.patch_embed(x) Wh, Ww = x.size(2), x.size(3) x = x.flatten(2).transpose(1, 2) outs = [] for i in range(self.num_layers): x_out, H, W, x, Wh, Ww = self.layers[i](x, Wh, Ww) norm_layer = getattr(self, f"norm{i}") x_out = norm_layer(x_out) out = x_out.view(-1, H, W, self.num_features[i]).permute(0, 3, 1, 2).contiguous() outs.append(out) return tuple(outs) # -- ASPP-Deformable ----------------------------------------------------------- class _DeformableConv2d(nn.Module): def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False): super().__init__() if isinstance(kernel_size, int): kernel_size = (kernel_size, kernel_size) self.stride = (stride, stride) if isinstance(stride, int) else stride self.padding = padding self.offset_conv = nn.Conv2d(in_channels, 2 * kernel_size[0] * kernel_size[1], kernel_size=kernel_size, stride=stride, padding=padding, bias=True) self.modulator_conv = nn.Conv2d(in_channels, 1 * kernel_size[0] * kernel_size[1], kernel_size=kernel_size, stride=stride, padding=padding, bias=True) self.regular_conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding, bias=bias) def forward(self, x): offset = self.offset_conv(x) modulator = 2.0 * torch.sigmoid(self.modulator_conv(x)) return deform_conv2d( input=x, offset=offset, weight=self.regular_conv.weight, bias=self.regular_conv.bias, padding=self.padding, mask=modulator, stride=self.stride, ) class _ASPPModuleDeformable(nn.Module): def __init__(self, in_channels, planes, kernel_size, padding): super().__init__() self.atrous_conv = _DeformableConv2d(in_channels, planes, kernel_size=kernel_size, stride=1, padding=padding, bias=False) self.bn = nn.BatchNorm2d(planes) self.relu = nn.ReLU(inplace=True) def forward(self, x): return self.relu(self.bn(self.atrous_conv(x))) class _ASPPDeformable(nn.Module): def __init__(self, in_channels, out_channels=None, parallel_block_sizes=(1, 3, 7)): super().__init__() if out_channels is None: out_channels = in_channels inter = 256 self.aspp1 = _ASPPModuleDeformable(in_channels, inter, 1, padding=0) self.aspp_deforms = nn.ModuleList([ _ASPPModuleDeformable(in_channels, inter, k, padding=k // 2) for k in parallel_block_sizes ]) self.global_avg_pool = nn.Sequential( nn.AdaptiveAvgPool2d((1, 1)), nn.Conv2d(in_channels, inter, 1, stride=1, bias=False), nn.BatchNorm2d(inter), nn.ReLU(inplace=True), ) self.conv1 = nn.Conv2d(inter * (2 + len(self.aspp_deforms)), out_channels, 1, bias=False) self.bn1 = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU(inplace=True) def forward(self, x): x1 = self.aspp1(x) x_aspp_deforms = [m(x) for m in self.aspp_deforms] x5 = self.global_avg_pool(x) x5 = F.interpolate(x5, size=x1.size()[2:], mode='bilinear', align_corners=True) y = torch.cat((x1, *x_aspp_deforms, x5), dim=1) return self.relu(self.bn1(self.conv1(y))) # -- Decoder blocks ------------------------------------------------------------ class _BasicDecBlk(nn.Module): def __init__(self, in_channels, out_channels, inter_channels=64): super().__init__() self.conv_in = nn.Conv2d(in_channels, inter_channels, 3, 1, padding=1) self.bn_in = nn.BatchNorm2d(inter_channels) self.relu_in = nn.ReLU(inplace=True) self.dec_att = _ASPPDeformable(in_channels=inter_channels) self.conv_out = nn.Conv2d(inter_channels, out_channels, 3, 1, padding=1) self.bn_out = nn.BatchNorm2d(out_channels) def forward(self, x): x = self.relu_in(self.bn_in(self.conv_in(x))) x = self.dec_att(x) x = self.bn_out(self.conv_out(x)) return x class _BasicLatBlk(nn.Module): def __init__(self, in_channels, out_channels): super().__init__() self.conv = nn.Conv2d(in_channels, out_channels, 1, 1, 0) def forward(self, x): return self.conv(x) class _SimpleConvs(nn.Module): def __init__(self, in_channels, out_channels, inter_channels=64): super().__init__() self.conv1 = nn.Conv2d(in_channels, inter_channels, 3, 1, 1) self.conv_out = nn.Conv2d(inter_channels, out_channels, 3, 1, 1) def forward(self, x): return self.conv_out(self.conv1(x)) # -- Image → patch-stack helper ----------------------------------------------- def _image2patches(image, patch_ref): """`einops` rearrange 'b c (hg h) (wg w) -> b (c hg wg) h w' replacement. Splits `image` into hg×wg non-overlapping patches and stacks them along the channel axis. `hg`/`wg` are inferred from image and patch_ref sizes. """ b, c, h_full, w_full = image.shape hg, wg = h_full // patch_ref.shape[-2], w_full // patch_ref.shape[-1] h, w = h_full // hg, w_full // wg # (b, c, hg*h, wg*w) -> (b, c, hg, h, wg, w) -> (b, c, hg, wg, h, w) -> (b, c*hg*wg, h, w) return image.view(b, c, hg, h, wg, w).permute(0, 1, 2, 4, 3, 5).reshape(b, c * hg * wg, h, w) # -- Decoder + top-level BiRefNet --------------------------------------------- class _BiRefNetDecoder(nn.Module): def __init__(self, channels=(3072, 1536, 768, 384)): super().__init__() c = channels # high-to-low resolution channel counts # input-modulator blocks (one per resolution; channels are # `3 * patch_grid**2`, see _image2patches docstring). self.ipt_blk5 = _SimpleConvs(2 ** 10 * 3, c[0] // 8, inter_channels=64) self.ipt_blk4 = _SimpleConvs(2 ** 8 * 3, c[0] // 8, inter_channels=64) self.ipt_blk3 = _SimpleConvs(2 ** 6 * 3, c[1] // 8, inter_channels=64) self.ipt_blk2 = _SimpleConvs(2 ** 4 * 3, c[2] // 8, inter_channels=64) self.ipt_blk1 = _SimpleConvs(2 ** 0 * 3, c[3] // 8, inter_channels=64) self.decoder_block4 = _BasicDecBlk(c[0] + c[0] // 8, c[1]) self.decoder_block3 = _BasicDecBlk(c[1] + c[0] // 8, c[2]) self.decoder_block2 = _BasicDecBlk(c[2] + c[1] // 8, c[3]) self.decoder_block1 = _BasicDecBlk(c[3] + c[2] // 8, c[3] // 2) self.conv_out1 = nn.Sequential(nn.Conv2d(c[3] // 2 + c[3] // 8, 1, 1, 1, 0)) self.lateral_block4 = _BasicLatBlk(c[1], c[1]) self.lateral_block3 = _BasicLatBlk(c[2], c[2]) self.lateral_block2 = _BasicLatBlk(c[3], c[3]) # multi-scale supervision heads (training only — kept for state_dict # parity with the released checkpoint; not consumed at inference). self.conv_ms_spvn_4 = nn.Conv2d(c[1], 1, 1, 1, 0) self.conv_ms_spvn_3 = nn.Conv2d(c[2], 1, 1, 1, 0) self.conv_ms_spvn_2 = nn.Conv2d(c[3], 1, 1, 1, 0) # gradient-decoder-triggering (gdt) attention: used at inference to # gate p4/p3/p2. _N = 16 def _gdt_branch(in_c): return nn.Sequential(nn.Conv2d(in_c, _N, 3, 1, 1), nn.BatchNorm2d(_N), nn.ReLU(inplace=True)) self.gdt_convs_4 = _gdt_branch(c[1]) self.gdt_convs_3 = _gdt_branch(c[2]) self.gdt_convs_2 = _gdt_branch(c[3]) def _head_1x1(): return nn.Sequential(nn.Conv2d(_N, 1, 1, 1, 0)) # multi-scale supervision heads on the gdt branch (training only) self.gdt_convs_pred_4 = _head_1x1() self.gdt_convs_pred_3 = _head_1x1() self.gdt_convs_pred_2 = _head_1x1() # attention heads self.gdt_convs_attn_4 = _head_1x1() self.gdt_convs_attn_3 = _head_1x1() self.gdt_convs_attn_2 = _head_1x1() def forward(self, x, x1, x2, x3, x4): x4 = torch.cat((x4, self.ipt_blk5(_image2patches(x, x4))), 1) p4 = self.decoder_block4(x4) p4 = p4 * self.gdt_convs_attn_4(self.gdt_convs_4(p4)).sigmoid() _p4 = F.interpolate(p4, size=x3.shape[2:], mode='bilinear', align_corners=True) _p3 = _p4 + self.lateral_block4(x3) _p3 = torch.cat((_p3, self.ipt_blk4(_image2patches(x, _p3))), 1) p3 = self.decoder_block3(_p3) p3 = p3 * self.gdt_convs_attn_3(self.gdt_convs_3(p3)).sigmoid() _p3 = F.interpolate(p3, size=x2.shape[2:], mode='bilinear', align_corners=True) _p2 = _p3 + self.lateral_block3(x2) _p2 = torch.cat((_p2, self.ipt_blk3(_image2patches(x, _p2))), 1) p2 = self.decoder_block2(_p2) p2 = p2 * self.gdt_convs_attn_2(self.gdt_convs_2(p2)).sigmoid() _p2 = F.interpolate(p2, size=x1.shape[2:], mode='bilinear', align_corners=True) _p1 = _p2 + self.lateral_block2(x1) _p1 = torch.cat((_p1, self.ipt_blk2(_image2patches(x, _p1))), 1) _p1 = self.decoder_block1(_p1) _p1 = F.interpolate(_p1, size=x.shape[2:], mode='bilinear', align_corners=True) _p1 = torch.cat((_p1, self.ipt_blk1(_image2patches(x, _p1))), 1) return self.conv_out1(_p1) class BiRefNet(nn.Module): """BiRefNet (ZhengPeng7/BiRefNet) with Swin-L backbone, multi-scale input concatenation, ASPP-deformable squeeze block, and the 4-level input-modulating decoder used in the v1 release. `forward(x)` returns a single 1-channel alpha map in `[0, 1]` (post-sigmoid). `remove_background(pil_img)` is the PIL helper used by the pipeline — accepts a PIL RGB image and returns an RGBA copy with the predicted matte in the alpha channel. """ INPUT_SIZE = (1024, 1024) # backbone channel counts post mul_scl_ipt='cat' (doubled from raw Swin-L) _CHANNELS = (3072, 1536, 768, 384) # ImageNet normalization used by the BiRefNet recipe _NORM_MEAN = (0.485, 0.456, 0.406) _NORM_STD = (0.229, 0.224, 0.225) def __init__(self): super().__init__() self.bb = _SwinLarge() cxt = list(self._CHANNELS[1:][::-1][-3:]) # = [384, 768, 1536] self.squeeze_module = nn.Sequential( _BasicDecBlk(self._CHANNELS[0] + sum(cxt), self._CHANNELS[0]) ) self.decoder = _BiRefNetDecoder(channels=self._CHANNELS) @property def device(self): return next(self.parameters()).device @property def dtype(self): return next(self.parameters()).dtype def _forward_enc(self, x): x1, x2, x3, x4 = self.bb(x) # mul_scl_ipt='cat': re-run backbone at half resolution, concat features B, C, H, W = x.shape x1_, x2_, x3_, x4_ = self.bb(F.interpolate(x, size=(H // 2, W // 2), mode='bilinear', align_corners=True)) x1 = torch.cat([x1, F.interpolate(x1_, size=x1.shape[2:], mode='bilinear', align_corners=True)], 1) x2 = torch.cat([x2, F.interpolate(x2_, size=x2.shape[2:], mode='bilinear', align_corners=True)], 1) x3 = torch.cat([x3, F.interpolate(x3_, size=x3.shape[2:], mode='bilinear', align_corners=True)], 1) x4 = torch.cat([x4, F.interpolate(x4_, size=x4.shape[2:], mode='bilinear', align_corners=True)], 1) # cxt: upsample x1/x2/x3 to x4 spatial and concat for the squeeze input x4 = torch.cat([ F.interpolate(x1, size=x4.shape[2:], mode='bilinear', align_corners=True), F.interpolate(x2, size=x4.shape[2:], mode='bilinear', align_corners=True), F.interpolate(x3, size=x4.shape[2:], mode='bilinear', align_corners=True), x4, ], 1) return x1, x2, x3, x4 def forward(self, x): x1, x2, x3, x4 = self._forward_enc(x) x4 = self.squeeze_module(x4) logits = self.decoder(x, x1, x2, x3, x4) return torch.sigmoid(logits) def load_safetensors(self, path: str) -> None: sd = safetensors.torch.load_file(path) # The decoder's gdt_convs_pred_* / conv_ms_spvn_* heads are training-only # but are kept as submodules for state_dict parity. strict=True works. missing, unexpected = self.load_state_dict(sd, strict=False) if unexpected: raise KeyError(f"[birefnet] unexpected keys (e.g. {unexpected[:3]})") if missing: raise KeyError(f"[birefnet] missing keys (e.g. {missing[:3]})") @torch.no_grad() def remove_background(self, image) -> "Image.Image": from PIL import Image if image.mode != "RGB": image = image.convert("RGB") W, H = image.size arr = np.array(image, dtype=np.float32) / 255.0 t = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0) t = F.interpolate(t, size=self.INPUT_SIZE, mode='bilinear', align_corners=True) mean = torch.tensor(self._NORM_MEAN).view(1, 3, 1, 1) std = torch.tensor(self._NORM_STD).view(1, 3, 1, 1) t = ((t - mean) / std).to(device=self.device, dtype=self.dtype) alpha = self.forward(t) alpha = F.interpolate(alpha.float(), size=(H, W), mode='bilinear', align_corners=True)[0, 0] a = (alpha.clamp(0, 1) * 255).to(torch.uint8).cpu().numpy() rgba = image.copy() rgba.putalpha(Image.fromarray(a, mode="L")) return rgba # --------------------------------------------------------------------------- # Shared transformer helpers # --------------------------------------------------------------------------- class LayerNorm32(nn.LayerNorm): def forward(self, x): origin_dtype = x.dtype return F.layer_norm( x.float(), self.normalized_shape, self.weight.float() if self.weight is not None else None, self.bias.float() if self.bias is not None else None, self.eps, ).to(origin_dtype) class MultiHeadRMSNorm(nn.Module): def __init__(self, dim, heads): super().__init__() self.scale = dim ** 0.5 self.gamma = nn.Parameter(torch.ones(heads, dim)) def forward(self, x): origin_dtype = x.dtype return (F.normalize(x.float(), dim=-1) * self.gamma.float() * self.scale).to(origin_dtype) def apply_rotary_emb(hidden_states, freqs): x_rotated = torch.view_as_complex(hidden_states.float().reshape(*hidden_states.shape[:-1], -1, 2)) x_rotated = x_rotated * freqs x_out = torch.view_as_real(x_rotated).reshape(*x_rotated.shape[:-1], -1) return x_out.type_as(hidden_states) def clamp_mul(x, f): f_t = f.tanh() return x * f_t + x.detach() * (f - f_t) def scaled_dot_product_attention(qkv=None, q=None, k=None, v=None, kv=None): if qkv is not None: q, k, v = qkv.unbind(dim=2) elif kv is not None: k, v = kv.unbind(dim=2) q, k, v = q.permute(0, 2, 1, 3), k.permute(0, 2, 1, 3), v.permute(0, 2, 1, 3) return F.scaled_dot_product_attention(q, k, v).permute(0, 2, 1, 3) # --------------------------------------------------------------------------- # Positional embeddings # --------------------------------------------------------------------------- class RePo3DRotaryEmbedding(nn.Module): def __init__(self, model_channels, num_heads, head_dim, repo_hidden_ratio=0.125, max_freq=16.0): super().__init__() self.num_heads = num_heads self.head_dim = head_dim repo_hidden_size = int(model_channels * repo_hidden_ratio) self.norm = LayerNorm32(model_channels) self.gate_map = nn.Linear(model_channels, repo_hidden_size, bias=False) self.content_map = nn.Linear(model_channels, repo_hidden_size, bias=False) self.act = nn.SiLU() self.final_map = nn.Linear(repo_hidden_size, 3 * num_heads, bias=False) self.dim_0 = 2 * (head_dim // 6) self.dim_1 = 2 * (head_dim // 6) self.dim_2 = head_dim - self.dim_0 - self.dim_1 dims = [self.dim_0, self.dim_1, self.dim_2] freqs_list = [] for d in dims: freq_dim = d // 2 freqs_list.append(torch.linspace(1.0, float(max_freq), steps=freq_dim, dtype=torch.float32)) self.freqs_0 = nn.Parameter(freqs_list[0]) self.freqs_1 = nn.Parameter(freqs_list[1]) self.freqs_2 = nn.Parameter(freqs_list[2]) def forward(self, hidden_states): h = self.norm(hidden_states) feat = self.act(self.gate_map(h)) * self.content_map(h) out = self.final_map(feat) B, L, _ = out.shape delta_pos = out.reshape(B, L, self.num_heads, 3) ang_0 = clamp_mul(delta_pos[..., 0].unsqueeze(-1), self.freqs_0) * torch.pi ang_1 = clamp_mul(delta_pos[..., 1].unsqueeze(-1), self.freqs_1) * torch.pi ang_2 = clamp_mul(delta_pos[..., 2].unsqueeze(-1), self.freqs_2) * torch.pi ang = torch.cat([ang_0, ang_1, ang_2], dim=-1).float() # fp32 needed for torch.polar → complex64 return torch.polar(torch.ones_like(ang), ang).type(torch.complex64) class PcdAbsolutePositionEmbedder(nn.Module): def __init__(self, channels: int, in_channels: int = 3, max_res: int = 16): super().__init__() self.channels = channels self.in_channels = in_channels self.max_res = max_res self.freq_dim = channels // in_channels // 2 def _freqs(self, device): freqs_2exp = torch.arange(self.max_res, dtype=torch.float32, device=device) res_dim = max(0, self.freq_dim - self.max_res) freqs_res = (torch.arange(res_dim, dtype=torch.float32, device=device) / max(res_dim, 1) * self.max_res if res_dim > 0 else torch.empty(0, device=device)) freqs = torch.cat([freqs_2exp, freqs_res], dim=0)[:self.freq_dim] return torch.pow(2.0, freqs) def forward(self, x: torch.Tensor) -> torch.Tensor: orig_dtype = x.dtype x = x.float() *dims, D = x.shape out = torch.outer(x.reshape(-1), self._freqs(x.device)) * 2 * torch.pi out = torch.cat([out.sin(), out.cos()], dim=-1).reshape(*dims, -1) if out.shape[-1] < self.channels: out = torch.cat([out, torch.zeros(*dims, self.channels - out.shape[-1], device=out.device, dtype=out.dtype)], dim=-1) return out.to(orig_dtype) class PcdAbsolutePositionEmbedderV2(nn.Module): def __init__(self, channels: int, in_channels: int = 3, max_res: int = 10): super().__init__() self.channels = channels self.in_channels = in_channels self.max_res = max_res self.freq_dim = channels // in_channels // 2 def _freqs(self, device): logs = torch.linspace(0.0, float(self.max_res), steps=self.freq_dim, dtype=torch.float32, device=device) return torch.pow(2.0, logs) def forward(self, x: torch.Tensor) -> torch.Tensor: orig_dtype = x.dtype x = x.float() N, D = x.shape ang = x.unsqueeze(-1) * self._freqs(x.device) * torch.pi embed = torch.cat([torch.sin(ang), torch.cos(ang)], dim=-1).reshape(N, -1) if embed.shape[1] < self.channels: embed = torch.cat([embed, torch.zeros(N, self.channels - embed.shape[1], device=embed.device, dtype=embed.dtype)], dim=-1) return embed.to(orig_dtype) # --------------------------------------------------------------------------- # Transformer building blocks # --------------------------------------------------------------------------- class FeedForwardNet(nn.Module): def __init__(self, channels, mlp_ratio=4.0, channels_out=None): super().__init__() self.mlp = nn.Sequential( nn.Linear(channels, int(channels * mlp_ratio)), nn.GELU(approximate="tanh"), nn.Linear(int(channels * mlp_ratio), channels if channels_out is None else channels_out), ) def forward(self, x): return self.mlp(x) class MLP(nn.Module): def __init__(self, channels: int, inner_channels: int, channels_out: Optional[int] = None, mlp_layer_num: int = 2): super().__init__() layers = [] for i in range(mlp_layer_num - 1): layers.append(nn.Linear(channels if i == 0 else inner_channels, inner_channels)) layers.append(nn.GELU(approximate="tanh")) layers.append(nn.Linear(inner_channels, channels if channels_out is None else channels_out)) self.mlp = nn.Sequential(*layers) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.mlp(x) class RopeMultiHeadAttention(nn.Module): def __init__(self, channels, num_heads, ctx_channels=None, type="self", attn_mode="full", qkv_bias=True, qk_rms_norm=False, use_rope=False): super().__init__() self.channels = channels self.num_heads = num_heads self.head_dim = channels // num_heads self.ctx_channels = ctx_channels if ctx_channels is not None else channels self._type = type self.qk_rms_norm = qk_rms_norm self.use_rope = use_rope if self._type == "self": self.qkv = nn.Linear(channels, channels * 3, bias=qkv_bias) else: self.q = nn.Linear(channels, channels, bias=qkv_bias) self.kv = nn.Linear(self.ctx_channels, channels * 2, bias=qkv_bias) if self.qk_rms_norm: self.q_norm = MultiHeadRMSNorm(self.head_dim, num_heads) self.k_norm = MultiHeadRMSNorm(self.head_dim, num_heads) self.out = nn.Linear(channels, channels) def forward(self, x, context=None, rope_emb=None): B, L, C = x.shape if self._type == "self": qkv = self.qkv(x).reshape(B, L, 3, self.num_heads, self.head_dim) q, k, v = qkv.unbind(2) if self.use_rope: q = apply_rotary_emb(q, rope_emb) k = apply_rotary_emb(k, rope_emb) else: q = self.q(x).reshape(B, L, self.num_heads, self.head_dim) if context is None: raise ValueError("Context must be provided for cross attention") kv = self.kv(context).reshape(B, context.shape[1], 2, self.num_heads, self.head_dim) k, v = kv.unbind(2) if self.qk_rms_norm: q = self.q_norm(q) k = self.k_norm(k) h = scaled_dot_product_attention(q=q, k=k, v=v) return self.out(h.reshape(B, L, C)) class MultiHeadAttention(nn.Module): def __init__(self, channels, num_heads, ctx_channels=None, type="self", attn_mode="full", qkv_bias=True, qk_rms_norm=False): super().__init__() assert channels % num_heads == 0 assert type in ["self", "cross"] assert attn_mode == "full" self.channels = channels self.head_dim = channels // num_heads self.ctx_channels = ctx_channels if ctx_channels is not None else channels self.num_heads = num_heads self._type = type self.qk_rms_norm = qk_rms_norm if self._type == "self": self.to_qkv = nn.Linear(channels, channels * 3, bias=qkv_bias) else: self.to_q = nn.Linear(channels, channels, bias=qkv_bias) self.to_kv = nn.Linear(self.ctx_channels, channels * 2, bias=qkv_bias) if self.qk_rms_norm: self.q_rms_norm = MultiHeadRMSNorm(self.head_dim, num_heads) self.k_rms_norm = MultiHeadRMSNorm(self.head_dim, num_heads) self.to_out = nn.Linear(channels, channels) def forward(self, x, context=None): B, L, C = x.shape if self._type == "self": qkv = self.to_qkv(x).reshape(B, L, 3, self.num_heads, -1) if self.qk_rms_norm: q, k, v = qkv.unbind(dim=2) q = self.q_rms_norm(q) k = self.k_rms_norm(k) qkv = torch.stack([q, k, v], dim=2) h = scaled_dot_product_attention(qkv=qkv) else: Lkv = context.shape[1] q = self.to_q(x).reshape(B, L, self.num_heads, -1) kv = self.to_kv(context).reshape(B, Lkv, 2, self.num_heads, -1) if self.qk_rms_norm: q = self.q_rms_norm(q) k, v = kv.unbind(dim=2) k = self.k_rms_norm(k) h = scaled_dot_product_attention(q=q, k=k, v=v) else: h = scaled_dot_product_attention(q=q, kv=kv) return self.to_out(h.reshape(B, L, -1)) class UnifiedTransformerBlock(nn.Module): def __init__(self, channels, num_heads, mlp_ratio=4.0, attn_mode="full", use_checkpoint=False, use_rope=False, qk_rms_norm=False, qkv_bias=True, modulation=True, share_mod=False, use_shift_table=False): super().__init__() self.modulation = modulation self.share_mod = share_mod self.norm1 = LayerNorm32(channels, elementwise_affine=not modulation, eps=1e-6) self.norm2 = LayerNorm32(channels, elementwise_affine=not modulation, eps=1e-6) self.attn = RopeMultiHeadAttention(channels, num_heads=num_heads, type="self", attn_mode=attn_mode, qkv_bias=qkv_bias, use_rope=use_rope, qk_rms_norm=qk_rms_norm) self.mlp = FeedForwardNet(channels, mlp_ratio=mlp_ratio) if modulation: if not share_mod: self.adaLN_modulation = nn.Sequential(nn.SiLU(), nn.Linear(channels, 6 * channels, bias=True)) self.shift_table = nn.Parameter(torch.randn(1, 6 * channels) / channels ** 0.5) if use_shift_table else None def forward(self, x, mod=None, rotary_emb=None): if self.modulation: if not self.share_mod: mod = self.adaLN_modulation(mod) if hasattr(self, 'shift_table') and self.shift_table is not None: mod = mod + self.shift_table.type(mod.dtype) shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = mod.chunk(6, dim=1) h = self.norm1(x) h = h * (1 + scale_msa.unsqueeze(1)) + shift_msa.unsqueeze(1) h = self.attn(h, rope_emb=rotary_emb) x = x + h * gate_msa.unsqueeze(1) h = self.norm2(x) h = h * (1 + scale_mlp.unsqueeze(1)) + shift_mlp.unsqueeze(1) x = x + self.mlp(h) * gate_mlp.unsqueeze(1) else: x = x + self.attn(self.norm1(x), rope_emb=rotary_emb) x = x + self.mlp(self.norm2(x)) return x # --------------------------------------------------------------------------- # Quasi-random sampling utilities # --------------------------------------------------------------------------- PRIMES = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53] def radical_inverse(base, n): val = 0 inv_base = 1.0 / base inv_base_n = inv_base while n > 0: digit = n % base val += digit * inv_base_n n //= base inv_base_n *= inv_base return val def halton_sequence(dim, n): return [radical_inverse(PRIMES[dim], n) for dim in range(dim)] def hammersley_sequence(dim, n, num_samples): return [n / num_samples] + halton_sequence(dim - 1, n) @torch.no_grad() def sample_probs(probs, counts, algo="systematic"): batch_shape = counts.shape B = counts.numel() P = probs.size(-1) device = probs.device probs = probs.view(B, P) counts = counts.view(B) probs = probs.to(torch.float32).clamp_min_(0) row_sums = probs.sum(1, keepdim=True) zero_mask = row_sums.eq(0) probs = probs / row_sums.clamp_min_(1) if zero_mask.any(): probs = probs.clone() probs[zero_mask.expand_as(probs)] = 1.0 / P counts = counts.to(device=device, dtype=torch.long) out = torch.zeros(B, P, dtype=torch.long, device=device) cdf = probs.cumsum(dim=1).clamp(max=1.0 - 1e-12) unique_n, inv = counts.unique(sorted=False, return_inverse=True) for i, n in enumerate(unique_n.tolist()): if n == 0: continue rows = (inv == i).nonzero(as_tuple=False).squeeze(1) r = rows.numel() U0 = torch.rand(r, 1, device=device) / float(n) grid = torch.arange(n, device=device, dtype=torch.float32)[None, :] / float(n) us = (U0 + grid).clamp(max=1.0 - 1e-12) cdf_rows = cdf.index_select(0, rows) idx = torch.searchsorted(cdf_rows, us).clamp_max(probs.size(1) - 1) buf = torch.zeros(r, P, dtype=torch.float32, device=device) buf.scatter_add_(1, idx, torch.ones_like(idx, dtype=buf.dtype)) out.index_copy_(0, rows, buf.to(torch.long)) return out.view(*batch_shape, P) # --------------------------------------------------------------------------- # VAE decoders # --------------------------------------------------------------------------- class LevelEmbedder(nn.Module): def __init__(self, hidden_size, frequency_embedding_size=256, max_period=1024): super().__init__() self.mlp = nn.Sequential( nn.Linear(frequency_embedding_size, hidden_size, bias=True), nn.SiLU(), nn.Linear(hidden_size, hidden_size, bias=True), ) self.frequency_embedding_size = frequency_embedding_size self.max_period = max_period @staticmethod def level_embedding(t, dim, max_period=1024): half = dim // 2 freqs = torch.exp(-np.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half).to(device=t.device) args = t[:, None].float() * freqs[None] * 2 * torch.pi embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1) if dim % 2: embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1) return embedding def forward(self, t): emb = self.level_embedding(t, self.frequency_embedding_size, self.max_period) return self.mlp(emb.to(self.mlp[0].weight.dtype)) class ModulatedTransformerCrossOnlyBlock(nn.Module): def __init__(self, channels, ctx_channels, num_heads, mlp_ratio=4.0, share_mod=False, qk_rms_norm_cross=True, qkv_bias=True): super().__init__() self.share_mod = share_mod self.norm1 = LayerNorm32(channels, elementwise_affine=False, eps=1e-6) self.norm2 = LayerNorm32(channels, elementwise_affine=False, eps=1e-6) self.cross_attn = MultiHeadAttention(channels, ctx_channels=ctx_channels, num_heads=num_heads, type="cross", attn_mode="full", qkv_bias=qkv_bias, qk_rms_norm=qk_rms_norm_cross) self.mlp = FeedForwardNet(channels, mlp_ratio=mlp_ratio) if not share_mod: self.adaLN_modulation = nn.Sequential(nn.SiLU(), nn.Linear(channels, 6 * channels, bias=True)) def forward(self, x, mod, context): if self.share_mod: shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = mod.chunk(6, dim=1) else: shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = self.adaLN_modulation(mod).chunk(6, dim=1) h = self.norm1(x) * (1 + scale_msa.unsqueeze(1)) + shift_msa.unsqueeze(1) x = x + self.cross_attn(h, context) * gate_msa.unsqueeze(1) h = self.norm2(x) * (1 + scale_mlp.unsqueeze(1)) + shift_mlp.unsqueeze(1) x = x + self.mlp(h) * gate_mlp.unsqueeze(1) return x class ModulatedCrossOnlyTransformerBase(nn.Module): def __init__(self, in_channels, model_channels, cond_channels, num_blocks, num_heads=None, num_head_channels=64, mlp_ratio=4.0, share_mod=False, additional_level_embed=False, qk_rms_norm_cross=True): super().__init__() self.model_channels = model_channels self.cond_channels = cond_channels self.num_blocks = num_blocks self.num_heads = num_heads or model_channels // num_head_channels self.mlp_ratio = mlp_ratio self.share_mod = share_mod self.qk_rms_norm_cross = qk_rms_norm_cross self.input_layer = nn.Linear(in_channels, model_channels) self.l_embedder = LevelEmbedder(model_channels) self.l_embedder2 = LevelEmbedder(model_channels, max_period=100) if additional_level_embed else None if share_mod: self.adaLN_modulation = nn.Sequential( nn.SiLU(), nn.Linear(model_channels, 6 * model_channels, bias=True)) if cond_channels is not None: self.blocks = nn.ModuleList([ ModulatedTransformerCrossOnlyBlock( model_channels, ctx_channels=cond_channels, num_heads=self.num_heads, mlp_ratio=self.mlp_ratio, qk_rms_norm_cross=self.qk_rms_norm_cross, share_mod=self.share_mod) for _ in range(num_blocks) ]) @property def dtype(self) -> torch.dtype: return next(self.parameters()).dtype @property def device(self) -> torch.device: return next(self.parameters()).device def forward(self, x, l, cond, l2=None): h = self.input_layer(x) l_emb = self.l_embedder(l) if self.l_embedder2 is not None and l2 is not None: l_emb = l_emb + self.l_embedder2(l2) if self.share_mod: l_emb = self.adaLN_modulation(l_emb) for block in self.blocks: h = block(h, l_emb, cond) return h class OctreeProbabilityFixedlenDecoder(ModulatedCrossOnlyTransformerBase): def __init__(self, model_channels, cond_channels, num_blocks, num_heads=None, num_head_channels=64, mlp_ratio=4.0, share_mod=False, additional_level_embed=False, qk_rms_norm_cross=True, *, no_norm=False): super().__init__( in_channels=model_channels, model_channels=model_channels, cond_channels=cond_channels, num_blocks=num_blocks, num_heads=num_heads, num_head_channels=num_head_channels, mlp_ratio=mlp_ratio, share_mod=share_mod, additional_level_embed=additional_level_embed, qk_rms_norm_cross=qk_rms_norm_cross, ) self.out_proj = nn.Linear(self.model_channels, 8) self.no_norm = no_norm self.in_proj = nn.Linear(3, self.model_channels) self.pos_embedder = PcdAbsolutePositionEmbedderV2(channels=model_channels, in_channels=3) def forward(self, x, l, cond, l2=None): d = self.dtype B, L, C = x.shape h = self.in_proj(x.to(d)) + self.pos_embedder(x.reshape(-1, 3)).reshape(B, L, -1).to(d) if l2 is not None: l2 = torch.log2(l2) h = super().forward(h, l, cond.to(d), l2) h = F.layer_norm(h.float(), h.shape[-1:]).to(d) if not self.no_norm else h / (1 + 2 * self.num_blocks) ** 0.5 logits = self.out_proj(h) return {"logits": logits, "probs": torch.softmax(logits, dim=-1)} @staticmethod def sample(model, cond, num_points, level, temperature=1.0, algo="systematic"): B = cond.shape[0] device = cond.device child_offset = torch.tensor([[i, j, k] for k in [0, 1] for j in [0, 1] for i in [0, 1]], dtype=torch.long, device=device) prev_coords_int = torch.zeros(B, 1, 3, dtype=torch.long, device=device) prev_counts = torch.full((B, 1), num_points, dtype=torch.long, device=device) prev_log_probs = torch.zeros(B, 1, dtype=torch.float32, device=device) batch_indices_range = torch.arange(B, device=device).unsqueeze(1) num_tensor = torch.full((B,), num_points, dtype=torch.long, device=device) for lv in range(1, level + 1): res_p = 1 << (lv - 1) res = 1 << lv parent_coords_norm = (prev_coords_int.to(torch.float32) + 0.5) / res_p res_tensor = torch.full((B,), res, dtype=torch.long, device=device) pred_logits = model(parent_coords_norm, res_tensor, cond, num_tensor)["logits"] / temperature pred_probs = torch.softmax(pred_logits, dim=-1) pred_log_probs = torch.log_softmax(pred_logits, dim=-1) sampled = sample_probs(pred_probs, prev_counts, algo=algo).flatten(1, 2) pred_log_probs = pred_log_probs.flatten(1, 2) prev_log_probs_expanded = prev_log_probs.repeat_interleave(8, dim=1) child_coords_int = (prev_coords_int[:, :, None, :] * 2 + child_offset[None, None, :, :]).flatten(1, 2) mask = sampled > 0 max_valid = mask.sum(dim=1).max().item() scatter_indices = mask.cumsum(dim=1) - 1 valid_scatter_indices = scatter_indices[mask] valid_batch_indices = batch_indices_range.expand_as(mask)[mask] next_prev_coords_int = torch.zeros(B, max_valid, 3, dtype=child_coords_int.dtype, device=device) next_prev_coords_int[valid_batch_indices, valid_scatter_indices] = child_coords_int[mask] next_prev_counts = torch.zeros(B, max_valid, dtype=sampled.dtype, device=device) next_prev_counts[valid_batch_indices, valid_scatter_indices] = sampled[mask] next_prev_log_probs = torch.zeros(B, max_valid, dtype=prev_log_probs.dtype, device=device) next_prev_log_probs[valid_batch_indices, valid_scatter_indices] = (prev_log_probs_expanded + pred_log_probs)[mask] prev_coords_int = next_prev_coords_int prev_counts = next_prev_counts prev_log_probs = next_prev_log_probs res = 1 << level prev_log_probs = torch.repeat_interleave(prev_log_probs.flatten(0, 1), prev_counts.flatten(0, 1), dim=0).reshape(B, num_points) coords_int = torch.repeat_interleave(prev_coords_int.flatten(0, 1), prev_counts.flatten(0, 1), dim=0).reshape(B, num_points, -1) coords_norm = (coords_int.to(torch.float32) + torch.rand_like(coords_int, dtype=torch.float32)) / res return {"points": coords_norm, "log_probs": prev_log_probs} class TransformerCrossBlock(nn.Module): def __init__(self, channels, ctx_channels, num_heads, mlp_ratio=4.0, attn_mode="full", qk_rms_norm=True, qk_rms_norm_cross=True, qkv_bias=True): super().__init__() self.norm1 = LayerNorm32(channels, elementwise_affine=False, eps=1e-6) self.norm2 = LayerNorm32(channels, elementwise_affine=True, eps=1e-6) self.norm3 = LayerNorm32(channels, elementwise_affine=False, eps=1e-6) self.self_attn = MultiHeadAttention(channels, num_heads=num_heads, type="self", attn_mode=attn_mode, qkv_bias=qkv_bias, qk_rms_norm=qk_rms_norm) self.cross_attn = MultiHeadAttention(channels, ctx_channels=ctx_channels, num_heads=num_heads, type="cross", attn_mode="full", qkv_bias=qkv_bias, qk_rms_norm=qk_rms_norm_cross) self.mlp = FeedForwardNet(channels, mlp_ratio=mlp_ratio) def forward(self, x, context): x = x + self.self_attn(self.norm1(x)) x = x + self.cross_attn(self.norm2(x), context) x = x + self.mlp(self.norm3(x)) return x class TransformerBase(nn.Module): def __init__(self, in_channels, model_channels, cond_channels, num_blocks, num_heads=None, num_head_channels=64, mlp_ratio=4.0, attn_mode="full", window_num=None, qk_rms_norm=True, qk_rms_norm_cross=True): super().__init__() self.model_channels = model_channels self.cond_channels = cond_channels self.num_blocks = num_blocks self.num_heads = num_heads or model_channels // num_head_channels self.mlp_ratio = mlp_ratio self.input_layer = nn.Linear(in_channels, model_channels) if cond_channels is not None: self.blocks = nn.ModuleList([ TransformerCrossBlock(model_channels, ctx_channels=cond_channels, num_heads=self.num_heads, mlp_ratio=self.mlp_ratio, attn_mode="full", qk_rms_norm=qk_rms_norm, qk_rms_norm_cross=qk_rms_norm_cross) for _ in range(num_blocks) ]) @property def dtype(self) -> torch.dtype: return next(self.parameters()).dtype def forward(self, x, cond=None, l=None, cond2=None): h = self.input_layer(x) for block in self.blocks: h = block(h, cond) return h class FixedlenDecoder(TransformerBase): def __init__(self, in_channels, model_channels, cond_channels, num_blocks, num_heads=None, num_head_channels=64, mlp_ratio=4.0, attn_mode="full", window_num=None, qk_rms_norm=True, qk_rms_norm_cross=True): super().__init__(in_channels=model_channels, model_channels=model_channels, cond_channels=cond_channels, num_blocks=num_blocks, num_heads=num_heads, num_head_channels=num_head_channels, mlp_ratio=mlp_ratio, attn_mode=attn_mode, window_num=window_num, qk_rms_norm=qk_rms_norm, qk_rms_norm_cross=qk_rms_norm_cross) self.in_proj = nn.Linear(in_channels, model_channels) self.pos_embedder = PcdAbsolutePositionEmbedderV2(channels=model_channels, in_channels=3) def forward(self, x=None, cond=None): pcd = x["points"] d = self.dtype B, L, C = pcd.shape h = self.in_proj(pcd.to(d)) + self.pos_embedder(pcd.reshape(-1, 3)).reshape(B, L, -1).to(d) return super().forward(h, cond.to(d)) class ElasticGaussianFixedlenDecoder(FixedlenDecoder): def __init__(self, in_channels, model_channels, cond_channels, num_blocks, num_heads=None, num_head_channels=64, mlp_ratio=4.0, attn_mode="full", window_num=None, *, no_norm=False, representation_config=None, use_learned_offset_scale=True, use_per_offset=True, qk_rms_norm=True, qk_rms_norm_cross=True): self.rep_config = representation_config self.use_learned_offset_scale = use_learned_offset_scale self.use_per_offset = use_per_offset self.out_channels = self._calc_layout() super().__init__(in_channels=in_channels, model_channels=model_channels, cond_channels=cond_channels, num_blocks=num_blocks, num_heads=num_heads, num_head_channels=num_head_channels, mlp_ratio=mlp_ratio, attn_mode=attn_mode, window_num=window_num, qk_rms_norm=qk_rms_norm, qk_rms_norm_cross=qk_rms_norm_cross) self.out_proj = nn.Linear(model_channels, self.out_channels) self.no_norm = no_norm self._build_perturbation() def _calc_layout(self): ng = self.rep_config['num_gaussians'] self.layout = { '_xyz': {'shape': (ng, 3), 'size': ng * 3}, '_features_dc': {'shape': (ng, 1, 3), 'size': ng * 3}, '_scaling': {'shape': (ng, 3), 'size': ng * 3}, '_rotation': {'shape': (ng, 4), 'size': ng * 4}, '_opacity': {'shape': (ng, 1), 'size': ng}, } if self.use_learned_offset_scale and self.use_per_offset: self.layout['_offset_scale'] = {'shape': (ng, 1), 'size': ng} start = 0 for k, v in self.layout.items(): v['range'] = (start, start + v['size']) start += v['size'] return start def _build_perturbation(self): ng = self.rep_config['num_gaussians'] perturbation = torch.tensor([hammersley_sequence(3, i, ng) for i in range(ng)]).float() perturbation = torch.atanh((perturbation * 2 - 1) / self.rep_config['perturbe_size']) self.register_buffer('points_offset_perturbation', perturbation) if self.use_learned_offset_scale: base = torch.tensor(self.rep_config['offset_scale']) self.register_buffer('base_offset_scale', torch.log(torch.exp(base) - 1.0)) def _get_offset(self, h): B = h.shape[0] if self.use_learned_offset_scale: r = self.layout['_offset_scale']['range'] _offset_scale = F.softplus( h[:, :, r[0]:r[1]].reshape(B, -1, *self.layout['_offset_scale']['shape']) + self.base_offset_scale) r = self.layout['_xyz']['range'] offset = h[:, :, r[0]:r[1]].reshape(B, -1, *self.layout['_xyz']['shape']) offset = offset * self.rep_config['lr']['_xyz'] if self.rep_config['perturb_offset']: offset = offset + self.points_offset_perturbation offset = torch.tanh(offset) * 0.5 * self.rep_config['perturbe_size'] offset = offset * (_offset_scale if self.use_learned_offset_scale else self.rep_config['offset_scale']) return offset def forward(self, x=None, cond=None): h = super().forward(x, cond) h = F.layer_norm(h.float(), h.shape[-1:]).to(h.dtype) if not self.no_norm else h / (1 + 3 * self.num_blocks) ** 0.5 return {"features": self.out_proj(h)} # --------------------------------------------------------------------------- # Flow matching denoiser # --------------------------------------------------------------------------- class TimestepEmbedder(nn.Module): def __init__(self, hidden_size, frequency_embedding_size=256): super().__init__() self.mlp = nn.Sequential( nn.Linear(frequency_embedding_size, hidden_size, bias=True), nn.SiLU(), nn.Linear(hidden_size, hidden_size, bias=True), ) self.frequency_embedding_size = frequency_embedding_size @staticmethod def timestep_embedding(t, dim, max_period=10000): half = dim // 2 freqs = torch.exp(-np.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half).to(device=t.device) args = t[:, None].float() * freqs[None] embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1) if dim % 2: embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1) return embedding def forward(self, t): emb = self.timestep_embedding(t, self.frequency_embedding_size) return self.mlp(emb.to(self.mlp[0].weight.dtype)) class LatentSeqMMFlowModel(nn.Module): def __init__(self, q_token_length, in_channels, model_channels, cond_channels, out_channels, num_blocks, num_refiner_blocks=2, num_heads=None, num_head_channels=64, cam_channels=None, cond2_channels=None, mlp_ratio=4, share_mod=True, qk_rms_norm=False, use_shift_table=False): super().__init__() self.q_token_length = q_token_length self.in_channels = in_channels self.cam_channels = cam_channels self.model_channels = model_channels self.cond_channels = cond_channels self.cond2_channels = cond2_channels self.out_channels = out_channels self.num_blocks = num_blocks self.num_refiner_blocks = num_refiner_blocks self.num_heads = num_heads or model_channels // num_head_channels self.mlp_ratio = mlp_ratio self.share_mod = share_mod self.qk_rms_norm = qk_rms_norm self.use_shift_table = use_shift_table self.t_embedder = TimestepEmbedder(model_channels) if share_mod: self.adaLN_modulation = nn.Sequential( nn.SiLU(), nn.Linear(model_channels, 6 * model_channels, bias=True)) self.input_layer = nn.Linear(in_channels, model_channels) self.cond_embedder = nn.Linear(cond_channels, model_channels) self.cond_embedder2 = nn.Linear(cond2_channels, model_channels) if cond2_channels is not None else None sobol_seq = torch.quasirandom.SobolEngine(dimension=3, scramble=True, seed=123).draw(q_token_length) self.pos_pe = sobol_seq.unsqueeze(0) self.pos_embedder = PcdAbsolutePositionEmbedder(model_channels) self.noise_repo_layers = nn.ModuleList([ RePo3DRotaryEmbedding(model_channels, num_heads=self.num_heads, head_dim=num_head_channels) for _ in range(num_refiner_blocks)]) self.context_repo_layers = nn.ModuleList([ RePo3DRotaryEmbedding(model_channels, num_heads=self.num_heads, head_dim=num_head_channels) for _ in range(num_refiner_blocks)]) self.repo_layers = nn.ModuleList([ RePo3DRotaryEmbedding(model_channels, num_heads=self.num_heads, head_dim=num_head_channels) for _ in range(num_blocks)]) block_kwargs = dict(num_heads=self.num_heads, mlp_ratio=self.mlp_ratio, attn_mode='full', use_rope=True, qk_rms_norm=self.qk_rms_norm, use_shift_table=self.use_shift_table) self.noise_refiner = nn.ModuleList([ UnifiedTransformerBlock(model_channels, modulation=True, share_mod=self.share_mod, **block_kwargs) for _ in range(num_refiner_blocks)]) self.context_refiner = nn.ModuleList([ UnifiedTransformerBlock(model_channels, modulation=False, **block_kwargs) for _ in range(num_refiner_blocks)]) if self.cam_channels is not None: self.cam_refiner = MLP(self.cam_channels, model_channels, model_channels, mlp_layer_num=num_refiner_blocks) self.blocks = nn.ModuleList([ UnifiedTransformerBlock(model_channels, modulation=True, share_mod=self.share_mod, **block_kwargs) for _ in range(num_blocks)]) self.shift_table = nn.Parameter(torch.randn(1, 2, model_channels) / model_channels**0.5) if use_shift_table else None self.out_layer = nn.Linear(model_channels, out_channels) if cam_channels is not None: self.cam_out_layer = nn.Linear(model_channels, cam_channels) @property def dtype(self) -> torch.dtype: return next(self.parameters()).dtype @property def device(self) -> torch.device: return next(self.parameters()).device def load_safetensors(self, path: str) -> None: self.load_state_dict(safetensors.torch.load_file(path), strict=True) def forward(self, x_t, t, cond): d = self.dtype z = x_t['latent'].to(d) feat1 = cond['feature1'].to(d) feat2 = cond['feature2'].to(d) if self.cond_embedder2 is not None else None self.pos_pe = self.pos_pe.to(z.device) h_x = self.input_layer(z) h_cond = self.cond_embedder(feat1) if feat2 is not None: h_cond = h_cond + self.cond_embedder2(feat2) t_emb = self.t_embedder(t) t_mod = self.adaLN_modulation(t_emb) if self.share_mod else t_emb h_x = h_x + self.pos_embedder(self.pos_pe).to(d) for i, block in enumerate(self.noise_refiner): h_x = block(h_x, mod=t_mod, rotary_emb=self.noise_repo_layers[i](h_x)) for i, block in enumerate(self.context_refiner): h_cond = block(h_cond, mod=None, rotary_emb=self.context_repo_layers[i](h_cond)) if self.cam_channels is not None: cam = x_t.get('camera').to(d) h_cam = self.cam_refiner(cam) h = torch.cat([h_x, h_cond], dim=1) if self.cam_channels is not None: h = torch.cat([h, h_cam], dim=1) for i, block in enumerate(self.blocks): h = block(h, mod=t_mod, rotary_emb=self.repo_layers[i](h)) h_x = F.layer_norm(h[:, :z.shape[1]].float(), h.shape[-1:]).type(d) if self.cam_channels is not None: h_cam = F.layer_norm(h[:, -cam.shape[1]:].float(), h.shape[-1:]).type(d) if self.use_shift_table: shift, scale = (self.shift_table + t_emb.unsqueeze(1)).chunk(2, dim=1) h_x = h_x * (1 + scale) + shift if self.cam_channels is not None: h_cam = h_cam * (1 + scale) + shift out = {'latent': self.out_layer(h_x)} if self.cam_channels is not None: out['camera'] = self.cam_out_layer(h_cam) return out # --------------------------------------------------------------------------- # OctreeGaussianDecoder # --------------------------------------------------------------------------- class OctreeGaussianDecoder(nn.Module): _MAX_VOXEL_LEVEL = 8 def __init__(self, octree_args: dict, gs_args: dict): super().__init__() self.octree = OctreeProbabilityFixedlenDecoder(**octree_args) self.gs = ElasticGaussianFixedlenDecoder(**gs_args) def load_safetensors(self, path: str) -> None: self.load_state_dict(safetensors.torch.load_file(path), strict=True) @property def gaussians_per_point(self) -> int: return self.gs.rep_config['num_gaussians'] @torch.no_grad() def decode(self, latent: torch.Tensor, num_gaussians: int): from triposplat import _build_gaussians # local import: avoid model.py ↔ triposplat.py cycle num_decoder_tokens = max(1, num_gaussians // self.gaussians_per_point) points_pred = OctreeProbabilityFixedlenDecoder.sample( self.octree, latent, num_points=num_decoder_tokens, level=self._MAX_VOXEL_LEVEL, temperature=1.0, algo='systematic', ) pred = self.gs(x=points_pred, cond=latent) return _build_gaussians(self.gs, points_pred, pred)[0]