raster2seq / models /deformable_transformer.py
anas
Initial deployment of Raster2Seq floor plan vectorization API
fadb92b
# ------------------------------------------------------------------------------------
# Original RoomFormer implementation (https://github.com/ywyue/RoomFormer.git)
# ------------------------------------------------------------------------------------
import copy
import math
import torch
import torch.nn.functional as F
from torch import nn
from torch.nn.init import normal_
from models.ops.modules import MSDeformAttn
from util.misc import inverse_sigmoid
class MLP(nn.Module):
"""Very simple multi-layer perceptron (also called FFN)"""
def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
super().__init__()
self.num_layers = num_layers
h = [hidden_dim] * (num_layers - 1)
self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim]))
def forward(self, x):
for i, layer in enumerate(self.layers):
x = F.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
return x
class DeformableTransformer(nn.Module):
def __init__(
self,
d_model=256,
nhead=8,
num_encoder_layers=6,
num_decoder_layers=6,
dim_feedforward=1024,
dropout=0.1,
activation="relu",
poly_refine=True,
return_intermediate_dec=False,
aux_loss=False,
num_feature_levels=4,
dec_n_points=4,
enc_n_points=4,
query_pos_type="none",
):
super().__init__()
self.d_model = d_model
self.nhead = nhead
encoder_layer = DeformableTransformerEncoderLayer(
d_model, dim_feedforward, dropout, activation, num_feature_levels, nhead, enc_n_points
)
self.encoder = DeformableTransformerEncoder(encoder_layer, num_encoder_layers)
decoder_layer = DeformableTransformerDecoderLayer(
d_model, dim_feedforward, dropout, activation, num_feature_levels, nhead, dec_n_points
)
self.decoder = DeformableTransformerDecoder(
decoder_layer, num_decoder_layers, poly_refine, return_intermediate_dec, aux_loss, query_pos_type
)
self.level_embed = nn.Parameter(torch.Tensor(num_feature_levels, d_model))
if query_pos_type == "sine":
self.decoder.pos_trans = nn.Linear(d_model, d_model)
self.decoder.pos_trans_norm = nn.LayerNorm(d_model)
self._reset_parameters()
def _reset_parameters(self):
for p in self.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
for m in self.modules():
if isinstance(m, MSDeformAttn):
m._reset_parameters()
normal_(self.level_embed)
def get_valid_ratio(self, mask):
_, H, W = mask.shape
valid_H = torch.sum(~mask[:, :, 0], 1)
valid_W = torch.sum(~mask[:, 0, :], 1)
valid_ratio_h = valid_H.float() / H
valid_ratio_w = valid_W.float() / W
valid_ratio = torch.stack([valid_ratio_w, valid_ratio_h], -1)
return valid_ratio
def forward(self, srcs, masks, pos_embeds, query_embed=None, tgt=None, tgt_masks=None):
assert query_embed is not None
# prepare input for encoder
src_flatten = []
mask_flatten = []
lvl_pos_embed_flatten = []
spatial_shapes = []
for lvl, (src, mask, pos_embed) in enumerate(zip(srcs, masks, pos_embeds)):
bs, c, h, w = src.shape
spatial_shape = (h, w)
spatial_shapes.append(spatial_shape)
src = src.flatten(2).transpose(1, 2)
mask = mask.flatten(1)
pos_embed = pos_embed.flatten(2).transpose(1, 2)
lvl_pos_embed = pos_embed + self.level_embed[lvl].view(1, 1, -1)
lvl_pos_embed_flatten.append(lvl_pos_embed)
src_flatten.append(src)
mask_flatten.append(mask)
src_flatten = torch.cat(src_flatten, 1)
mask_flatten = torch.cat(mask_flatten, 1)
lvl_pos_embed_flatten = torch.cat(lvl_pos_embed_flatten, 1)
spatial_shapes = torch.as_tensor(spatial_shapes, dtype=torch.long, device=src_flatten.device)
level_start_index = torch.cat((spatial_shapes.new_zeros((1,)), spatial_shapes.prod(1).cumsum(0)[:-1]))
valid_ratios = torch.stack([self.get_valid_ratio(m) for m in masks], 1)
# encoder
memory = self.encoder(
src_flatten, spatial_shapes, level_start_index, valid_ratios, lvl_pos_embed_flatten, mask_flatten
)
# prepare input for decoder
bs, _, c = memory.shape
query_embed = query_embed.unsqueeze(0).expand(bs, -1, -1)
tgt = tgt.unsqueeze(0).expand(bs, -1, -1)
reference_points = query_embed.sigmoid()
init_reference_out = reference_points
# decoder
hs, inter_references, inter_classes = self.decoder(
tgt,
reference_points,
memory,
src_flatten,
spatial_shapes,
level_start_index,
valid_ratios,
query_embed,
mask_flatten,
tgt_masks,
)
return hs, init_reference_out, inter_references, inter_classes
class DeformableTransformerEncoderLayer(nn.Module):
def __init__(self, d_model=256, d_ffn=1024, dropout=0.1, activation="relu", n_levels=4, n_heads=8, n_points=4):
super().__init__()
# self attention
self.self_attn = MSDeformAttn(d_model, n_levels, n_heads, n_points)
self.dropout1 = nn.Dropout(dropout)
self.norm1 = nn.LayerNorm(d_model)
# ffn
self.linear1 = nn.Linear(d_model, d_ffn)
self.activation = _get_activation_fn(activation)
self.dropout2 = nn.Dropout(dropout)
self.linear2 = nn.Linear(d_ffn, d_model)
self.dropout3 = nn.Dropout(dropout)
self.norm2 = nn.LayerNorm(d_model)
@staticmethod
def with_pos_embed(tensor, pos):
return tensor if pos is None else tensor + pos
def forward_ffn(self, src):
src2 = self.linear2(self.dropout2(self.activation(self.linear1(src))))
src = src + self.dropout3(src2)
src = self.norm2(src)
return src
def forward(self, src, pos, reference_points, spatial_shapes, level_start_index, padding_mask=None):
# self attention
src2 = self.self_attn(
self.with_pos_embed(src, pos), reference_points, src, spatial_shapes, level_start_index, padding_mask
)
src = src + self.dropout1(src2)
src = self.norm1(src)
# ffn
src = self.forward_ffn(src)
return src
class DeformableTransformerEncoder(nn.Module):
def __init__(self, encoder_layer, num_layers):
super().__init__()
self.layers = _get_clones(encoder_layer, num_layers)
self.num_layers = num_layers
@staticmethod
def get_reference_points(spatial_shapes, valid_ratios, device):
reference_points_list = []
for lvl, (H_, W_) in enumerate(spatial_shapes):
ref_y, ref_x = torch.meshgrid(
torch.linspace(0.5, H_ - 0.5, H_, dtype=torch.float32, device=device),
torch.linspace(0.5, W_ - 0.5, W_, dtype=torch.float32, device=device),
)
ref_y = ref_y.reshape(-1)[None] / (valid_ratios[:, None, lvl, 1] * H_)
ref_x = ref_x.reshape(-1)[None] / (valid_ratios[:, None, lvl, 0] * W_)
ref = torch.stack((ref_x, ref_y), -1)
reference_points_list.append(ref)
reference_points = torch.cat(reference_points_list, 1)
reference_points = reference_points[:, :, None] * valid_ratios[:, None]
return reference_points
def forward(self, src, spatial_shapes, level_start_index, valid_ratios, pos=None, padding_mask=None):
output = src
reference_points = self.get_reference_points(spatial_shapes, valid_ratios, device=src.device)
for _, layer in enumerate(self.layers):
output = layer(output, pos, reference_points, spatial_shapes, level_start_index, padding_mask)
return output
class DeformableTransformerDecoderLayer(nn.Module):
def __init__(self, d_model=256, d_ffn=1024, dropout=0.1, activation="relu", n_levels=4, n_heads=8, n_points=4):
super().__init__()
# cross attention
self.cross_attn = MSDeformAttn(d_model, n_levels, n_heads, n_points)
self.dropout1 = nn.Dropout(dropout)
self.norm1 = nn.LayerNorm(d_model)
# self attention
self.self_attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout)
self.dropout2 = nn.Dropout(dropout)
self.norm2 = nn.LayerNorm(d_model)
# ffn
self.linear1 = nn.Linear(d_model, d_ffn)
self.activation = _get_activation_fn(activation)
self.dropout3 = nn.Dropout(dropout)
self.linear2 = nn.Linear(d_ffn, d_model)
self.dropout4 = nn.Dropout(dropout)
self.norm3 = nn.LayerNorm(d_model)
@staticmethod
def with_pos_embed(tensor, pos):
return tensor if pos is None else tensor + pos
def forward_ffn(self, tgt):
tgt2 = self.linear2(self.dropout3(self.activation(self.linear1(tgt))))
tgt = tgt + self.dropout4(tgt2)
tgt = self.norm3(tgt)
return tgt
def forward(
self,
tgt,
query_pos,
reference_points,
src,
src_spatial_shapes,
level_start_index,
src_padding_mask=None,
tgt_masks=None,
):
# self attention
q = k = self.with_pos_embed(tgt, query_pos)
tgt2 = self.self_attn(q.transpose(0, 1), k.transpose(0, 1), tgt.transpose(0, 1), attn_mask=tgt_masks)[
0
].transpose(0, 1)
tgt = tgt + self.dropout2(tgt2)
tgt = self.norm2(tgt)
# cross attention
tgt2 = self.cross_attn(
self.with_pos_embed(tgt, query_pos),
reference_points,
src,
src_spatial_shapes,
level_start_index,
src_padding_mask,
)
tgt = tgt + self.dropout1(tgt2)
tgt = self.norm1(tgt)
# ffn
tgt = self.forward_ffn(tgt)
return tgt
class DeformableTransformerDecoder(nn.Module):
def __init__(
self,
decoder_layer,
num_layers,
poly_refine=True,
return_intermediate=False,
aux_loss=False,
query_pos_type="none",
):
super().__init__()
self.layers = _get_clones(decoder_layer, num_layers)
self.num_layers = num_layers
self.poly_refine = poly_refine
self.return_intermediate = return_intermediate
self.aux_loss = aux_loss
self.query_pos_type = query_pos_type
self.coords_embed = None
self.class_embed = None
self.pos_trans = None
self.pos_trans_norm = None
def get_query_pos_embed(self, ref_points):
num_pos_feats = 128
temperature = 10000
scale = 2 * math.pi
dim_t = torch.arange(num_pos_feats, dtype=torch.float32, device=ref_points.device)
dim_t = temperature ** (2 * (dim_t // 2) / num_pos_feats) # [128]
# N, L, 2
ref_points = ref_points * scale
# N, L, 2, 128
pos = ref_points[:, :, :, None] / dim_t
# N, L, 256
pos = torch.stack((pos[:, :, :, 0::2].sin(), pos[:, :, :, 1::2].cos()), dim=4).flatten(2)
return pos
def forward(
self,
tgt,
reference_points,
src,
src_flatten,
src_spatial_shapes,
src_level_start_index,
src_valid_ratios,
query_pos=None,
src_padding_mask=None,
tgt_masks=None,
):
output = tgt # [10, 800, 256]
intermediate = []
intermediate_reference_points = []
intermediate_classes = []
point_classes = torch.zeros(output.shape[:2]).unsqueeze(-1).to(output.device)
for lid, layer in enumerate(self.layers):
assert reference_points.shape[-1] == 2
reference_points_input = reference_points[:, :, None] * src_valid_ratios[:, None]
if self.query_pos_type == "sine":
query_pos = self.pos_trans_norm(self.pos_trans(self.get_query_pos_embed(reference_points)))
elif self.query_pos_type == "none":
query_pos = None
output = layer(
output,
query_pos,
reference_points_input,
src,
src_spatial_shapes,
src_level_start_index,
src_padding_mask,
tgt_masks,
)
# iterative polygon refinement
if self.poly_refine:
offset = self.coords_embed[lid](output)
assert reference_points.shape[-1] == 2
new_reference_points = offset
new_reference_points = offset + inverse_sigmoid(reference_points)
new_reference_points = new_reference_points.sigmoid()
reference_points = new_reference_points
# if not using iterative polygon refinement, just output the reference points decoded from the last layer
elif lid == len(self.layers) - 1:
offset = self.coords_embed[-1](output)
assert reference_points.shape[-1] == 2
new_reference_points = offset
new_reference_points = offset + inverse_sigmoid(reference_points)
new_reference_points = new_reference_points.sigmoid()
reference_points = new_reference_points
# If aux loss supervision, we predict classes label from each layer and supervise loss
if self.aux_loss:
point_classes = self.class_embed[lid](output)
# Otherwise, we only predict class label from the last layer
elif lid == len(self.layers) - 1:
point_classes = self.class_embed[-1](output)
if self.return_intermediate:
intermediate.append(output)
intermediate_reference_points.append(reference_points)
intermediate_classes.append(point_classes)
if self.return_intermediate:
return (
torch.stack(intermediate),
torch.stack(intermediate_reference_points),
torch.stack(intermediate_classes),
)
return output, reference_points, point_classes
def _get_clones(module, N):
return nn.ModuleList([copy.deepcopy(module) for i in range(N)])
def _get_activation_fn(activation):
"""Return an activation function given a string"""
if activation == "relu":
return F.relu
if activation == "gelu":
return F.gelu
if activation == "glu":
return F.glu
raise RuntimeError(f"activation should be relu/gelu, not {activation}.")
def build_deforamble_transformer(args):
return DeformableTransformer(
d_model=args.hidden_dim,
nhead=args.nheads,
num_encoder_layers=args.enc_layers,
num_decoder_layers=args.dec_layers,
dim_feedforward=args.dim_feedforward,
dropout=args.dropout,
activation="relu",
poly_refine=args.with_poly_refine,
return_intermediate_dec=True,
aux_loss=args.aux_loss,
num_feature_levels=args.num_feature_levels,
dec_n_points=args.dec_n_points,
enc_n_points=args.enc_n_points,
query_pos_type=args.query_pos_type,
)