| | |
| | """ |
| | Implementation of RegNet models from :paper:`dds` and :paper:`scaling`. |
| | |
| | This code is adapted from https://github.com/facebookresearch/pycls with minimal modifications. |
| | Some code duplication exists between RegNet and ResNets (e.g., ResStem) in order to simplify |
| | model loading. |
| | """ |
| |
|
| | import numpy as np |
| | from torch import nn |
| |
|
| | from annotator.oneformer.detectron2.layers import CNNBlockBase, ShapeSpec, get_norm |
| |
|
| | from .backbone import Backbone |
| |
|
# Names exported by ``from <module> import *``.
__all__ = [
    "AnyNet",
    "RegNet",
    "ResStem",
    "SimpleStem",
    "VanillaBlock",
    "ResBasicBlock",
    "ResBottleneckBlock",
]
| |
|
| |
|
def conv2d(w_in, w_out, k, *, stride=1, groups=1, bias=False):
    """Build a 2D convolution whose padding keeps an odd-sized kernel centered.

    Args:
        w_in (int): number of input channels.
        w_out (int): number of output channels.
        k (int): square kernel size; must be odd.
        stride (int): convolution stride.
        groups (int): number of convolution groups.
        bias (bool): whether the layer carries a bias term.

    Returns:
        nn.Conv2d: the configured layer, padded by ``(k - 1) // 2`` on each side.
    """
    assert k % 2 == 1, "Only odd size kernels supported to avoid padding issues."
    half_kernel = (k - 1) // 2
    return nn.Conv2d(
        w_in,
        w_out,
        k,
        stride=stride,
        padding=half_kernel,
        groups=groups,
        bias=bias,
    )
| |
|
| |
|
def gap2d():
    """Build a global average pooling layer that reduces every feature map to 1x1."""
    output_size = (1, 1)
    return nn.AdaptiveAvgPool2d(output_size)
| |
|
| |
|
def pool2d(k, *, stride=1):
    """Build a 2D max pooling layer for an odd-sized window.

    Args:
        k (int): square pooling window size; must be odd.
        stride (int): pooling stride.

    Returns:
        nn.MaxPool2d: the configured layer, padded by ``(k - 1) // 2`` on each side.
    """
    assert k % 2 == 1, "Only odd size kernels supported to avoid padding issues."
    half_window = (k - 1) // 2
    return nn.MaxPool2d(k, stride=stride, padding=half_window)
| |
|
| |
|
def init_weights(m):
    """Perform ResNet-style weight initialization on a single module, in place.

    Written to be passed to ``nn.Module.apply``. Modules that are not ``nn.Conv2d``,
    ``nn.BatchNorm2d`` or ``nn.Linear`` are left untouched.

    Args:
        m (nn.Module): the module to initialize.
    """
    if isinstance(m, nn.Conv2d):
        # Zero-mean normal with variance 2 / fan_out; a conv bias, if present, is not modified.
        fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
        m.weight.data.normal_(mean=0.0, std=np.sqrt(2.0 / fan_out))
    elif isinstance(m, nn.BatchNorm2d):
        # With affine=False the layer has weight/bias set to None; nothing to initialize then.
        if m.weight is not None:
            m.weight.data.fill_(1.0)
        if m.bias is not None:
            m.bias.data.zero_()
    elif isinstance(m, nn.Linear):
        m.weight.data.normal_(mean=0.0, std=0.01)
        # A linear layer built with bias=False has bias set to None.
        if m.bias is not None:
            m.bias.data.zero_()
| |
|
| |
|
class ResStem(CNNBlockBase):
    """ResNet stem for ImageNet: 7x7 stride-2 conv, norm, activation, 3x3 stride-2 max pool."""

    def __init__(self, w_in, w_out, norm, activation_class):
        """
        Args:
            w_in (int): number of input channels.
            w_out (int): number of output channels.
            norm (str or callable): normalization spec, forwarded to ``get_norm``.
            activation_class (callable): zero-argument callable returning an activation module.
        """
        # The conv and the pool each use stride 2, hence the 4 passed to the base class.
        super().__init__(w_in, w_out, 4)
        self.conv = conv2d(w_in, w_out, 7, stride=2)
        self.bn = get_norm(norm, w_out)
        self.af = activation_class()
        self.pool = pool2d(3, stride=2)

    def forward(self, x):
        """Run ``x`` through the child modules in registration order."""
        out = x
        for module in self.children():
            out = module(out)
        return out
| |
|
| |
|
class SimpleStem(CNNBlockBase):
    """Simple stem for ImageNet: 3x3 stride-2 conv, norm, activation."""

    def __init__(self, w_in, w_out, norm, activation_class):
        """
        Args:
            w_in (int): number of input channels.
            w_out (int): number of output channels.
            norm (str or callable): normalization spec, forwarded to ``get_norm``.
            activation_class (callable): zero-argument callable returning an activation module.
        """
        # Only the conv is strided, hence the 2 passed to the base class.
        super().__init__(w_in, w_out, 2)
        self.conv = conv2d(w_in, w_out, 3, stride=2)
        self.bn = get_norm(norm, w_out)
        self.af = activation_class()

    def forward(self, x):
        """Run ``x`` through the child modules in registration order."""
        out = x
        for module in self.children():
            out = module(out)
        return out
| |
|
| |
|
class SE(nn.Module):
    """Squeeze-and-Excitation (SE) block: AvgPool, FC, Act, FC, Sigmoid."""

    def __init__(self, w_in, w_se, activation_class):
        """
        Args:
            w_in (int): number of channels of the input (and of the returned tensor).
            w_se (int): number of channels inside the excitation bottleneck.
            activation_class (callable): zero-argument callable returning an activation module.
        """
        super().__init__()
        self.avg_pool = gap2d()
        # The two "FC" layers are 1x1 convolutions with bias, applied to the pooled 1x1 map.
        squeeze = conv2d(w_in, w_se, 1, bias=True)
        activation = activation_class()
        excite = conv2d(w_se, w_in, 1, bias=True)
        self.f_ex = nn.Sequential(squeeze, activation, excite, nn.Sigmoid())

    def forward(self, x):
        """Scale ``x`` by the gate computed from its globally pooled features."""
        gate = self.f_ex(self.avg_pool(x))
        return x * gate
| |
|
| |
|
class VanillaBlock(CNNBlockBase):
    """Vanilla block: [3x3 conv, BN, Relu] x2."""

    def __init__(self, w_in, w_out, stride, norm, activation_class, _params):
        """
        Args:
            w_in (int): number of input channels.
            w_out (int): number of output channels.
            stride (int): stride of the first convolution.
            norm (str or callable): normalization spec, forwarded to ``get_norm``.
            activation_class (callable): zero-argument callable returning an activation module.
            _params (dict): block-specific parameters; unused by this block.
        """
        super().__init__(w_in, w_out, stride)
        # First unit: changes the channel count and applies the stride.
        self.a = conv2d(w_in, w_out, 3, stride=stride)
        self.a_bn = get_norm(norm, w_out)
        self.a_af = activation_class()
        # Second unit: keeps both the channel count and the resolution.
        self.b = conv2d(w_out, w_out, 3)
        self.b_bn = get_norm(norm, w_out)
        self.b_af = activation_class()

    def forward(self, x):
        """Run ``x`` through the child modules in registration order."""
        out = x
        for module in self.children():
            out = module(out)
        return out
| |
|
| |
|
class BasicTransform(nn.Module):
    """Basic transformation: 3x3 conv, BN, activation, 3x3 conv, BN (no final activation)."""

    def __init__(self, w_in, w_out, stride, norm, activation_class, _params):
        """
        Args:
            w_in (int): number of input channels.
            w_out (int): number of output channels.
            stride (int): stride of the first convolution.
            norm (str or callable): normalization spec, forwarded to ``get_norm``.
            activation_class (callable): zero-argument callable returning an activation module.
            _params (dict): block-specific parameters; unused by this transform.
        """
        super().__init__()
        self.a = conv2d(w_in, w_out, 3, stride=stride)
        self.a_bn = get_norm(norm, w_out)
        self.a_af = activation_class()
        self.b = conv2d(w_out, w_out, 3)
        self.b_bn = get_norm(norm, w_out)
        # Tag the last norm layer of the transform. detectron2's get_norm returns None
        # when normalization is disabled (NOTE(review): confirm for this vendored copy),
        # in which case there is no layer to tag.
        if self.b_bn is not None:
            self.b_bn.final_bn = True

    def forward(self, x):
        """Run ``x`` through the child modules in registration order."""
        for layer in self.children():
            x = layer(x)
        return x
| |
|
| |
|
class ResBasicBlock(CNNBlockBase):
    """Residual basic block: x + f(x), f = basic transform."""

    def __init__(self, w_in, w_out, stride, norm, activation_class, params):
        """
        Args:
            w_in (int): number of input channels.
            w_out (int): number of output channels.
            stride (int): stride applied by the block.
            norm (str or callable): normalization spec, forwarded to ``get_norm``.
            activation_class (callable): zero-argument callable returning an activation module.
            params (dict): block-specific parameters, forwarded to ``BasicTransform``.
        """
        super().__init__(w_in, w_out, stride)
        self.proj, self.bn = None, None
        # A 1x1 projection is needed on the shortcut only when the shape changes.
        if (w_in != w_out) or (stride != 1):
            self.proj = conv2d(w_in, w_out, 1, stride=stride)
            self.bn = get_norm(norm, w_out)
        self.f = BasicTransform(w_in, w_out, stride, norm, activation_class, params)
        self.af = activation_class()

    def forward(self, x):
        """Return ``af(shortcut(x) + f(x))``."""
        if self.proj is None:
            shortcut = x
        else:
            shortcut = self.proj(x)
            # detectron2's get_norm returns None when normalization is disabled
            # (NOTE(review): confirm for this vendored copy); skip the norm in that case.
            if self.bn is not None:
                shortcut = self.bn(shortcut)
        return self.af(shortcut + self.f(x))
| |
|
| |
|
class BottleneckTransform(nn.Module):
    """Bottleneck transformation: 1x1, 3x3 [+SE], 1x1."""

    def __init__(self, w_in, w_out, stride, norm, activation_class, params):
        """
        Args:
            w_in (int): number of input channels.
            w_out (int): number of output channels.
            stride (int): stride of the 3x3 convolution.
            norm (str or callable): normalization spec, forwarded to ``get_norm``.
            activation_class (callable): zero-argument callable returning an activation module.
            params (dict): must provide ``"bot_mul"`` (bottleneck width as a multiple of
                ``w_out``), ``"se_r"`` (SE width as a multiple of ``w_in``; a value that
                rounds to 0 disables SE) and ``"group_w"`` (channels per group in the
                3x3 convolution).
        """
        super().__init__()
        w_b = int(round(w_out * params["bot_mul"]))
        w_se = int(round(w_in * params["se_r"]))
        groups = w_b // params["group_w"]
        self.a = conv2d(w_in, w_b, 1)
        self.a_bn = get_norm(norm, w_b)
        self.a_af = activation_class()
        self.b = conv2d(w_b, w_b, 3, stride=stride, groups=groups)
        self.b_bn = get_norm(norm, w_b)
        self.b_af = activation_class()
        # None is not registered as a child module, so forward() skips SE when disabled.
        self.se = SE(w_b, w_se, activation_class) if w_se else None
        self.c = conv2d(w_b, w_out, 1)
        self.c_bn = get_norm(norm, w_out)
        # Tag the last norm layer of the transform. detectron2's get_norm returns None
        # when normalization is disabled (NOTE(review): confirm for this vendored copy),
        # in which case there is no layer to tag.
        if self.c_bn is not None:
            self.c_bn.final_bn = True

    def forward(self, x):
        """Run ``x`` through the child modules in registration order."""
        for layer in self.children():
            x = layer(x)
        return x
| |
|
| |
|
class ResBottleneckBlock(CNNBlockBase):
    """Residual bottleneck block: x + f(x), f = bottleneck transform."""

    def __init__(self, w_in, w_out, stride, norm, activation_class, params):
        """
        Args:
            w_in (int): number of input channels.
            w_out (int): number of output channels.
            stride (int): stride applied by the block.
            norm (str or callable): normalization spec, forwarded to ``get_norm``.
            activation_class (callable): zero-argument callable returning an activation module.
            params (dict): block-specific parameters, forwarded to ``BottleneckTransform``.
        """
        super().__init__(w_in, w_out, stride)
        self.proj, self.bn = None, None
        # A 1x1 projection is needed on the shortcut only when the shape changes.
        if (w_in != w_out) or (stride != 1):
            self.proj = conv2d(w_in, w_out, 1, stride=stride)
            self.bn = get_norm(norm, w_out)
        self.f = BottleneckTransform(w_in, w_out, stride, norm, activation_class, params)
        self.af = activation_class()

    def forward(self, x):
        """Return ``af(shortcut(x) + f(x))``."""
        if self.proj is None:
            shortcut = x
        else:
            shortcut = self.proj(x)
            # detectron2's get_norm returns None when normalization is disabled
            # (NOTE(review): confirm for this vendored copy); skip the norm in that case.
            if self.bn is not None:
                shortcut = self.bn(shortcut)
        return self.af(shortcut + self.f(x))
| |
|
| |
|
class AnyStage(nn.Module):
    """AnyNet stage (sequence of blocks w/ the same output shape)."""

    def __init__(self, w_in, w_out, stride, d, block_class, norm, activation_class, params):
        """
        Args:
            w_in (int): number of input channels of the stage.
            w_out (int): number of output channels of every block.
            stride (int): stride of the first block; later blocks use stride 1.
            d (int): number of blocks in the stage.
            block_class (callable): block constructor taking (w_in, w_out, stride, norm,
                activation_class, params).
            norm (str or callable): normalization spec passed to each block.
            activation_class (callable): zero-argument callable returning an activation module.
            params (dict): block-specific parameters passed to each block.
        """
        super().__init__()
        block_w_in, block_stride = w_in, stride
        for idx in range(d):
            block = block_class(block_w_in, w_out, block_stride, norm, activation_class, params)
            # Blocks are registered as "b1", "b2", ...
            self.add_module(f"b{idx + 1}", block)
            # Every block after the first maps w_out -> w_out at stride 1.
            block_w_in, block_stride = w_out, 1

    def forward(self, x):
        """Run ``x`` through the blocks in registration order."""
        out = x
        for blk in self.children():
            out = blk(out)
        return out
| |
|
| |
|
class AnyNet(Backbone):
    """AnyNet model. See :paper:`dds`."""

    def __init__(
        self,
        *,
        stem_class,
        stem_width,
        block_class,
        depths,
        widths,
        group_widths,
        strides,
        bottleneck_ratios,
        se_ratio,
        activation_class,
        freeze_at=0,
        norm="BN",
        out_features=None,
    ):
        """
        Args:
            stem_class (callable): A callable taking 4 arguments (channels in, channels out,
                normalization, callable returning an activation function) that returns another
                callable implementing the stem module.
            stem_width (int): The number of output channels that the stem produces.
            block_class (callable): A callable taking 6 arguments (channels in, channels out,
                stride, normalization, callable returning an activation function, a dict of
                block-specific parameters) that returns another callable implementing the repeated
                block module.
            depths (list[int]): Number of blocks in each stage.
            widths (list[int]): For each stage, the number of output channels of each block.
            group_widths (list[int]): For each stage, the number of channels per group in group
                convolution, if the block uses group convolution.
            strides (list[int]): The stride that each network stage applies to its input.
            bottleneck_ratios (list[float]): For each stage, the ratio of the number of bottleneck
                channels to the number of block input channels (or, equivalently, output channels),
                if the block uses a bottleneck.
            se_ratio (float): The ratio of the number of channels used inside the squeeze-excitation
                (SE) module to its number of input channels, if the block uses SE.
            activation_class (callable): A callable taking no arguments that returns another
                callable implementing an activation function.
            freeze_at (int): The number of stages at the beginning to freeze.
                see :meth:`freeze` for detailed explanation.
            norm (str or callable): normalization for all conv layers.
                See :func:`layers.get_norm` for supported format.
            out_features (list[str]): name of the layers whose outputs should
                be returned in forward. RegNet's use "stem" and "s1", "s2", etc for the stages after
                the stem. If None, will return the output of the last layer (the stem itself
                when no stages are configured).
        """
        super().__init__()
        # The stem always receives a 3-channel input.
        self.stem = stem_class(3, stem_width, norm, activation_class)

        current_stride = self.stem.stride
        self._out_feature_strides = {"stem": current_stride}
        self._out_feature_channels = {"stem": self.stem.out_channels}
        self.stages_and_names = []
        prev_w = stem_width
        # Name of the most recently built layer; stays "stem" when there are no stages, so
        # the default `out_features` below is always well defined.
        last_name = "stem"

        for i, (d, w, s, b, g) in enumerate(
            zip(depths, widths, strides, bottleneck_ratios, group_widths)
        ):
            params = {"bot_mul": b, "group_w": g, "se_r": se_ratio}
            stage = AnyStage(prev_w, w, s, d, block_class, norm, activation_class, params)
            name = "s{}".format(i + 1)
            self.add_module(name, stage)
            self.stages_and_names.append((stage, name))
            # Cumulative stride: stride so far times the product of this stage's block strides.
            self._out_feature_strides[name] = current_stride = int(
                current_stride * np.prod([k.stride for k in stage.children()])
            )
            self._out_feature_channels[name] = list(stage.children())[-1].out_channels
            prev_w = w
            last_name = name

        self.apply(init_weights)

        if out_features is None:
            out_features = [last_name]
        self._out_features = out_features
        assert len(self._out_features)
        children = [x[0] for x in self.named_children()]
        for out_feature in self._out_features:
            assert out_feature in children, "Available children: {} does not include {}".format(
                ", ".join(children), out_feature
            )
        self.freeze(freeze_at)

    def forward(self, x):
        """
        Args:
            x: Tensor of shape (N,C,H,W). H, W must be a multiple of ``self.size_divisibility``.

        Returns:
            dict[str->Tensor]: names and the corresponding features
        """
        assert x.dim() == 4, f"Model takes an input of shape (N, C, H, W). Got {x.shape} instead!"
        outputs = {}
        x = self.stem(x)
        if "stem" in self._out_features:
            outputs["stem"] = x
        for stage, name in self.stages_and_names:
            x = stage(x)
            if name in self._out_features:
                outputs[name] = x
        return outputs

    def output_shape(self):
        """
        Returns:
            dict[str->ShapeSpec]: channels and stride of every feature named in
            ``out_features``.
        """
        return {
            name: ShapeSpec(
                channels=self._out_feature_channels[name], stride=self._out_feature_strides[name]
            )
            for name in self._out_features
        }

    def freeze(self, freeze_at=0):
        """
        Freeze the first several stages of the model. Commonly used in fine-tuning.

        Layers that produce the same feature map spatial size are defined as one
        "stage" by :paper:`FPN`.

        Args:
            freeze_at (int): number of stages to freeze.
                `1` means freezing the stem. `2` means freezing the stem and
                one residual stage, etc.

        Returns:
            nn.Module: this model itself
        """
        if freeze_at >= 1:
            self.stem.freeze()
        # Stage indices start at 2 because the stem counts as stage 1.
        for idx, (stage, _) in enumerate(self.stages_and_names, start=2):
            if freeze_at >= idx:
                for block in stage.children():
                    block.freeze()
        return self
| |
|
| |
|
def adjust_block_compatibility(ws, bs, gs):
    """Adjust per-stage widths and group widths so that they are mutually compatible.

    The returned values satisfy ``w * b % g == 0`` for every stage, i.e. the bottleneck
    width is divisible by the group width.

    Args:
        ws (list[int]): per-stage block widths.
        bs (list[float]): per-stage bottleneck ratios; values above 1 must be whole numbers.
        gs (list[int]): per-stage group widths.

    Returns:
        tuple[list[int], list[float], list[int]]: the adjusted widths, the bottleneck
        ratios (unchanged) and the adjusted group widths.
    """
    assert len(ws) == len(bs) == len(gs)
    assert all(w > 0 and b > 0 and g > 0 for w, b, g in zip(ws, bs, gs))
    # The lcm below is only defined for integers, so a ratio above 1 must be integral.
    assert all(b < 1 or b % 1 == 0 for b in bs)
    # Bottleneck width of each stage (at least 1).
    vs = [int(max(1, w * b)) for w, b in zip(ws, bs)]
    # A group cannot be wider than the bottleneck itself.
    gs = [int(min(g, v)) for g, v in zip(gs, vs)]
    # np.lcm rejects float inputs, so an integral float ratio such as 2.0 is cast to int.
    ms = [int(np.lcm(g, int(b))) if b > 1 else g for g, b in zip(gs, bs)]
    # Round the bottleneck width to the nearest positive multiple of m.
    vs = [max(m, int(round(v / m) * m)) for v, m in zip(vs, ms)]
    ws = [int(v / b) for v, b in zip(vs, bs)]
    assert all(w * b % g == 0 for w, b, g in zip(ws, bs, gs))
    return ws, bs, gs
| |
|
| |
|
def generate_regnet_parameters(w_a, w_0, w_m, d, q=8):
    """Generate per-stage widths and depths from RegNet parameters.

    Args:
        w_a (float): slope of the continuous per-block width ramp; must be >= 0.
        w_0 (int): width of the first block; must be positive and a multiple of ``q``.
        w_m (float): base of the geometric width quantization; must be > 1.
        d (int): total number of blocks.
        q (int): widths are rounded to a multiple of this value.

    Returns:
        tuple: ``(ws, ds, num_stages, total_stages, ws_all, ws_cont)`` where ``ws`` are the
        unique quantized widths (one per stage), ``ds`` the number of blocks with each of
        those widths, ``num_stages`` is ``len(ws)``, ``total_stages`` is the largest
        quantization exponent plus one, ``ws_all`` the quantized width of every block and
        ``ws_cont`` the continuous width of every block.
    """
    assert w_a >= 0 and w_0 > 0 and w_m > 1 and w_0 % q == 0
    # Continuous per-block widths: a linear ramp starting at w_0 with slope w_a.
    block_indices = np.arange(d)
    ws_cont = block_indices * w_a + w_0
    # Quantize: snap each width to w_0 times the nearest integer power of w_m ...
    exponents = np.round(np.log(ws_cont / w_0) / np.log(w_m))
    ws_quantized = w_0 * np.power(w_m, exponents)
    # ... and then round to a multiple of q.
    ws_quantized = np.round(np.divide(ws_quantized, q)).astype(int) * q
    # One stage per distinct width; the counts give the depth of each stage.
    stage_widths, stage_depths = np.unique(ws_quantized, return_counts=True)
    num_stages = len(stage_widths)
    total_stages = exponents.max() + 1
    # Convert the numpy arrays to plain lists before returning.
    return (
        stage_widths.tolist(),
        stage_depths.tolist(),
        num_stages,
        total_stages,
        ws_quantized.tolist(),
        ws_cont.tolist(),
    )
| |
|
| |
|
class RegNet(AnyNet):
    """RegNet model. See :paper:`dds`."""

    def __init__(
        self,
        *,
        stem_class,
        stem_width,
        block_class,
        depth,
        w_a,
        w_0,
        w_m,
        group_width,
        stride=2,
        bottleneck_ratio=1.0,
        se_ratio=0.0,
        activation_class=None,
        freeze_at=0,
        norm="BN",
        out_features=None,
    ):
        """
        Build a RegNet from the parameterization described in :paper:`dds` Section 3.3.

        Args:
            See :class:`AnyNet` for arguments that are not listed here.
            depth (int): Total number of blocks in the RegNet.
            w_a (float): Factor by which block width would increase prior to quantizing block widths
                by stage. See :paper:`dds` Section 3.3.
            w_0 (int): Initial block width. See :paper:`dds` Section 3.3.
            w_m (float): Parameter controlling block width quantization.
                See :paper:`dds` Section 3.3.
            group_width (int): Number of channels per group in group convolution, if the block uses
                group convolution.
            bottleneck_ratio (float): The ratio of the number of bottleneck channels to the number
                of block input channels (or, equivalently, output channels), if the block uses a
                bottleneck.
            stride (int): The stride that each network stage applies to its input.
            activation_class (callable): A callable taking no arguments that returns an
                activation module. If None, ``nn.ReLU(inplace=True)`` is used.
        """
        # Only the per-stage widths and depths are needed from the generator.
        stage_widths, stage_depths = generate_regnet_parameters(w_a, w_0, w_m, depth)[:2]
        num_stages = len(stage_widths)
        # Every stage shares the same stride, bottleneck ratio and group width.
        stage_strides = [stride] * num_stages
        stage_ratios = [bottleneck_ratio] * num_stages
        stage_groups = [group_width] * num_stages
        stage_widths, stage_ratios, stage_groups = adjust_block_compatibility(
            stage_widths, stage_ratios, stage_groups
        )

        def default_activation_class():
            return nn.ReLU(inplace=True)

        if activation_class is None:
            activation_class = default_activation_class

        super().__init__(
            stem_class=stem_class,
            stem_width=stem_width,
            block_class=block_class,
            depths=stage_depths,
            widths=stage_widths,
            strides=stage_strides,
            group_widths=stage_groups,
            bottleneck_ratios=stage_ratios,
            se_ratio=se_ratio,
            activation_class=activation_class,
            freeze_at=freeze_at,
            norm=norm,
            out_features=out_features,
        )
| |
|