Spaces:
Build error
Build error
| from transformers import PretrainedConfig, PreTrainedModel | |
| from typing import List | |
| import copy | |
| import math | |
| import warnings | |
| from dataclasses import dataclass | |
| from functools import partial | |
| import sys | |
| from typing import Any, Callable, List, Optional, Sequence, Tuple, Union | |
| import torch | |
| from torch import Tensor, nn | |
| from torchvision.models._utils import _make_divisible | |
| from torchvision.ops import StochasticDepth | |
| sys.path.insert(1, "../") | |
| from utils.vision_modifications import Conv2dNormActivation, SqueezeExcitation | |
| class _MBConvConfig: | |
| expand_ratio: float | |
| kernel: int | |
| stride: int | |
| input_channels: int | |
| out_channels: int | |
| num_layers: int | |
| block: Callable[..., nn.Module] | |
| def adjust_channels( | |
| channels: int, width_mult: float, min_value: Optional[int] = None | |
| ) -> int: | |
| return _make_divisible(channels * width_mult, 8, min_value) | |
| class MBConvConfig(_MBConvConfig): | |
| # Stores information listed at Table 1 of the EfficientNet paper & Table 4 of the EfficientNetV2 paper | |
| def __init__( | |
| self, | |
| expand_ratio: float, | |
| kernel: int, | |
| stride: int, | |
| input_channels: int, | |
| out_channels: int, | |
| num_layers: int, | |
| width_mult: float = 1.0, | |
| depth_mult: float = 1.0, | |
| block: Optional[Callable[..., nn.Module]] = None, | |
| ) -> None: | |
| input_channels = self.adjust_channels(input_channels, width_mult) | |
| out_channels = self.adjust_channels(out_channels, width_mult) | |
| num_layers = self.adjust_depth(num_layers, depth_mult) | |
| if block is None: | |
| block = MBConv | |
| super().__init__( | |
| expand_ratio, | |
| kernel, | |
| stride, | |
| input_channels, | |
| out_channels, | |
| num_layers, | |
| block, | |
| ) | |
| def adjust_depth(num_layers: int, depth_mult: float): | |
| return int(math.ceil(num_layers * depth_mult)) | |
| class FusedMBConvConfig(_MBConvConfig): | |
| # Stores information listed at Table 4 of the EfficientNetV2 paper | |
| def __init__( | |
| self, | |
| expand_ratio: float, | |
| kernel: int, | |
| stride: int, | |
| input_channels: int, | |
| out_channels: int, | |
| num_layers: int, | |
| block: Optional[Callable[..., nn.Module]] = None, | |
| ) -> None: | |
| if block is None: | |
| block = FusedMBConv | |
| super().__init__( | |
| expand_ratio, | |
| kernel, | |
| stride, | |
| input_channels, | |
| out_channels, | |
| num_layers, | |
| block, | |
| ) | |
| class MBConv(nn.Module): | |
| def __init__( | |
| self, | |
| cnf: MBConvConfig, | |
| stochastic_depth_prob: float, | |
| norm_layer: Callable[..., nn.Module], | |
| se_layer: Callable[..., nn.Module] = SqueezeExcitation, | |
| ) -> None: | |
| super().__init__() | |
| if not (1 <= cnf.stride <= 2): | |
| raise ValueError("illegal stride value") | |
| self.use_res_connect = ( | |
| cnf.stride == 1 and cnf.input_channels == cnf.out_channels | |
| ) | |
| layers: List[nn.Module] = [] | |
| activation_layer = nn.SiLU | |
| # expand | |
| expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio) | |
| if expanded_channels != cnf.input_channels: | |
| layers.append( | |
| Conv2dNormActivation( | |
| cnf.input_channels, | |
| expanded_channels, | |
| kernel_size=1, | |
| norm_layer=norm_layer, | |
| activation_layer=activation_layer, | |
| ) | |
| ) | |
| # depthwise | |
| layers.append( | |
| Conv2dNormActivation( | |
| expanded_channels, | |
| expanded_channels, | |
| kernel_size=cnf.kernel, | |
| stride=cnf.stride, | |
| groups=expanded_channels, | |
| norm_layer=norm_layer, | |
| activation_layer=activation_layer, | |
| ) | |
| ) | |
| # squeeze and excitation | |
| squeeze_channels = max(1, cnf.input_channels // 4) | |
| layers.append( | |
| se_layer( | |
| expanded_channels, | |
| squeeze_channels, | |
| activation=partial(nn.SiLU, inplace=True), | |
| ) | |
| ) | |
| # project | |
| layers.append( | |
| Conv2dNormActivation( | |
| expanded_channels, | |
| cnf.out_channels, | |
| kernel_size=1, | |
| norm_layer=norm_layer, | |
| activation_layer=None, | |
| ) | |
| ) | |
| self.block = nn.Sequential(*layers) | |
| self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row") | |
| self.out_channels = cnf.out_channels | |
| def forward(self, input: Tensor) -> Tensor: | |
| result = self.block(input) | |
| if self.use_res_connect: | |
| result = self.stochastic_depth(result) | |
| result += input | |
| return result | |
| class FusedMBConv(nn.Module): | |
| def __init__( | |
| self, | |
| cnf: FusedMBConvConfig, | |
| stochastic_depth_prob: float, | |
| norm_layer: Callable[..., nn.Module], | |
| ) -> None: | |
| super().__init__() | |
| if not (1 <= cnf.stride <= 2): | |
| raise ValueError("illegal stride value") | |
| self.use_res_connect = ( | |
| cnf.stride == 1 and cnf.input_channels == cnf.out_channels | |
| ) | |
| layers: List[nn.Module] = [] | |
| activation_layer = nn.SiLU | |
| expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio) | |
| if expanded_channels != cnf.input_channels: | |
| # fused expand | |
| layers.append( | |
| Conv2dNormActivation( | |
| cnf.input_channels, | |
| expanded_channels, | |
| kernel_size=cnf.kernel, | |
| stride=cnf.stride, | |
| norm_layer=norm_layer, | |
| activation_layer=activation_layer, | |
| ) | |
| ) | |
| # project | |
| layers.append( | |
| Conv2dNormActivation( | |
| expanded_channels, | |
| cnf.out_channels, | |
| kernel_size=1, | |
| norm_layer=norm_layer, | |
| activation_layer=None, | |
| ) | |
| ) | |
| else: | |
| layers.append( | |
| Conv2dNormActivation( | |
| cnf.input_channels, | |
| cnf.out_channels, | |
| kernel_size=cnf.kernel, | |
| stride=cnf.stride, | |
| norm_layer=norm_layer, | |
| activation_layer=activation_layer, | |
| ) | |
| ) | |
| self.block = nn.Sequential(*layers) | |
| self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row") | |
| self.out_channels = cnf.out_channels | |
| def forward(self, input: Tensor) -> Tensor: | |
| result = self.block(input) | |
| if self.use_res_connect: | |
| result = self.stochastic_depth(result) | |
| result += input | |
| return result | |
| class EfficientNetConfig(PretrainedConfig): | |
| # model_type = "efficientnet" | |
| model_type = "efficientnet_61_planet_detection" | |
| def __init__( | |
| self, | |
| # inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]], | |
| dropout: float=0.25, | |
| num_channels: int = 61, | |
| stochastic_depth_prob: float = 0.2, | |
| num_classes: int = 2, | |
| norm_layer: Optional[Callable[..., nn.Module]] = None, | |
| # last_channel: Optional[int] = None, | |
| size: str='v2_s', | |
| width_mult: float = 1.0, | |
| depth_mult: float = 1.0, | |
| **kwargs: Any, | |
| ) -> None: | |
| """ | |
| EfficientNet V1 and V2 main class | |
| Args: | |
| inverted_residual_setting (Sequence[Union[MBConvConfig, FusedMBConvConfig]]): Network structure | |
| dropout (float): The droupout probability | |
| stochastic_depth_prob (float): The stochastic depth probability | |
| num_classes (int): Number of classes | |
| norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use | |
| last_channel (int): The number of channels on the penultimate layer | |
| """ | |
| # self.model = EfficientNet( | |
| # dropout=dropout, | |
| # num_channels=num_channels, | |
| # num_classes=num_classes, | |
| # size=size, | |
| # stochastic_depth_prob=stochastic_depth_prob, | |
| # width_mult=width_mult, | |
| # depth_mult=depth_mult, | |
| # ) | |
| # | |
| self.dropout=dropout | |
| self.num_channels=num_channels | |
| self.num_classes=num_classes | |
| self.size=size | |
| self.stochastic_depth_prob=stochastic_depth_prob | |
| self.width_mult=width_mult | |
| self.depth_mult=depth_mult | |
| super().__init__(**kwargs) | |
| class EfficientNetPreTrained(PreTrainedModel): | |
| config_class = EfficientNetConfig | |
| def __init__( | |
| self, | |
| config | |
| ): | |
| super().__init__(config) | |
| self.model = EfficientNet(dropout=config.dropout, | |
| num_channels=config.num_channels, | |
| num_classes=config.num_classes, | |
| size=config.size, | |
| stochastic_depth_prob=config.stochastic_depth_prob, | |
| width_mult=config.width_mult, | |
| depth_mult=config.depth_mult,) | |
| def forward(self, tensor): | |
| return self.model.forward(tensor) | |
| def _init_weights(self, module): | |
| # initialize weights before loading | |
| # not all will have weights and biases | |
| try: | |
| module.weight.data.normal_(mean=0.0) | |
| except Exception as e: | |
| # print('weight 1', e) | |
| try: | |
| module.weight.data.fill_(1.0) | |
| except Exception as e: | |
| # print('weight 2', e) | |
| _ = None | |
| try: | |
| module.bias.data.zero_() | |
| except AttributeError as e: | |
| # print('bias', e) | |
| _ = None | |
| class EfficientNet(nn.Module): | |
| def __init__( | |
| self, | |
| # inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]], | |
| dropout: float=0.25, | |
| num_channels: int = 61, | |
| stochastic_depth_prob: float = 0.2, | |
| num_classes: int = 2, | |
| norm_layer: Optional[Callable[..., nn.Module]] = None, | |
| # last_channel: Optional[int] = None, | |
| size: str='v2_s', | |
| width_mult: float = 1.0, | |
| depth_mult: float = 1.0, | |
| **kwargs: Any, | |
| ) -> None: | |
| """ | |
| EfficientNet V1 and V2 main class | |
| Args: | |
| inverted_residual_setting (Sequence[Union[MBConvConfig, FusedMBConvConfig]]): Network structure | |
| dropout (float): The droupout probability | |
| stochastic_depth_prob (float): The stochastic depth probability | |
| num_classes (int): Number of classes | |
| norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use | |
| last_channel (int): The number of channels on the penultimate layer | |
| """ | |
| super().__init__() | |
| # _log_api_usage_once(self) | |
| inverted_residual_setting, last_channel = _efficientnet_conf( | |
| "efficientnet_%s" % (size), width_mult=width_mult, depth_mult=depth_mult | |
| ) | |
| if not inverted_residual_setting: | |
| raise ValueError("The inverted_residual_setting should not be empty") | |
| elif not ( | |
| isinstance(inverted_residual_setting, Sequence) | |
| and all([isinstance(s, _MBConvConfig) for s in inverted_residual_setting]) | |
| ): | |
| raise TypeError( | |
| "The inverted_residual_setting should be List[MBConvConfig]" | |
| ) | |
| if "block" in kwargs: | |
| warnings.warn( | |
| "The parameter 'block' is deprecated since 0.13 and will be removed 0.15. " | |
| "Please pass this information on 'MBConvConfig.block' instead." | |
| ) | |
| if kwargs["block"] is not None: | |
| for s in inverted_residual_setting: | |
| if isinstance(s, MBConvConfig): | |
| s.block = kwargs["block"] | |
| if norm_layer is None: | |
| norm_layer = nn.BatchNorm2d | |
| layers: List[nn.Module] = [] | |
| # building first layer | |
| firstconv_output_channels = inverted_residual_setting[0].input_channels | |
| layers.append( | |
| Conv2dNormActivation( | |
| num_channels, | |
| firstconv_output_channels, | |
| kernel_size=3, | |
| stride=2, | |
| norm_layer=norm_layer, | |
| activation_layer=nn.SiLU, | |
| ) | |
| ) | |
| # building inverted residual blocks | |
| total_stage_blocks = sum(cnf.num_layers for cnf in inverted_residual_setting) | |
| stage_block_id = 0 | |
| for cnf in inverted_residual_setting: | |
| stage: List[nn.Module] = [] | |
| for _ in range(cnf.num_layers): | |
| # copy to avoid modifications. shallow copy is enough | |
| block_cnf = copy.copy(cnf) | |
| # overwrite info if not the first conv in the stage | |
| if stage: | |
| block_cnf.input_channels = block_cnf.out_channels | |
| block_cnf.stride = 1 | |
| # adjust stochastic depth probability based on the depth of the stage block | |
| sd_prob = ( | |
| stochastic_depth_prob * float(stage_block_id) / total_stage_blocks | |
| ) | |
| stage.append(block_cnf.block(block_cnf, sd_prob, norm_layer)) | |
| stage_block_id += 1 | |
| layers.append(nn.Sequential(*stage)) | |
| # building last several layers | |
| lastconv_input_channels = inverted_residual_setting[-1].out_channels | |
| lastconv_output_channels = ( | |
| last_channel if last_channel is not None else 4 * lastconv_input_channels | |
| ) | |
| layers.append( | |
| Conv2dNormActivation( | |
| lastconv_input_channels, | |
| lastconv_output_channels, | |
| kernel_size=1, | |
| norm_layer=norm_layer, | |
| activation_layer=nn.SiLU, | |
| ) | |
| ) | |
| self.features = nn.Sequential(*layers) | |
| self.avgpool = nn.AdaptiveAvgPool2d(1) | |
| self.classifier = nn.Sequential( | |
| nn.Dropout(p=dropout, inplace=True), | |
| nn.Linear(lastconv_output_channels, num_classes), | |
| ) | |
| for m in self.modules(): | |
| if isinstance(m, nn.Conv2d): | |
| nn.init.kaiming_normal_(m.weight, mode="fan_out") | |
| if m.bias is not None: | |
| nn.init.zeros_(m.bias) | |
| elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): | |
| nn.init.ones_(m.weight) | |
| nn.init.zeros_(m.bias) | |
| elif isinstance(m, nn.Linear): | |
| init_range = 1.0 / math.sqrt(m.out_features) | |
| nn.init.uniform_(m.weight, -init_range, init_range) | |
| nn.init.zeros_(m.bias) | |
| # super().__init__(**kwargs) | |
| def _forward_impl(self, x: Tensor) -> Tensor: | |
| x = self.features(x) | |
| x = self.avgpool(x) | |
| x = torch.flatten(x, 1) | |
| x = self.classifier(x) | |
| return x | |
| def forward(self, x: Tensor) -> Tensor: | |
| return self._forward_impl(x) | |
| # def _efficientnet( | |
| # inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]], | |
| # dropout: float, | |
| # last_channel: Optional[int], | |
| # weights=None, | |
| # num_channels: int = 61, | |
| # stochastic_depth_prob: float = 0.2, | |
| # progress: bool = True, | |
| # num_classes: int = 2, | |
| # **kwargs: Any, | |
| # ) -> EfficientNetCongig: | |
| # model = EfficientNetCongif( | |
| # inverted_residual_setting, | |
| # dropout, | |
| # num_classes=num_classes, | |
| # num_channels=num_channels, | |
| # stochastic_depth_prob=stochastic_depth_prob, | |
| # last_channel=last_channel, | |
| # **kwargs, | |
| # ) | |
| # return model | |
| def _efficientnet_conf( | |
| arch: str, | |
| **kwargs: Any, | |
| ) -> Tuple[Sequence[Union[MBConvConfig, FusedMBConvConfig]], Optional[int]]: | |
| inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]] | |
| if arch.startswith("efficientnet_b"): | |
| bneck_conf = partial( | |
| MBConvConfig, | |
| width_mult=kwargs.pop("width_mult"), | |
| depth_mult=kwargs.pop("depth_mult"), | |
| ) | |
| inverted_residual_setting = [ | |
| bneck_conf(1, 3, 1, 32, 16, 1), | |
| bneck_conf(6, 3, 2, 16, 24, 2), | |
| bneck_conf(6, 5, 2, 24, 40, 2), | |
| bneck_conf(6, 3, 2, 40, 80, 3), | |
| bneck_conf(6, 5, 1, 80, 112, 3), | |
| bneck_conf(6, 5, 2, 112, 192, 4), | |
| bneck_conf(6, 3, 1, 192, 320, 1), | |
| ] | |
| last_channel = None | |
| elif arch.startswith("efficientnet_v2_s"): | |
| inverted_residual_setting = [ | |
| FusedMBConvConfig(1, 3, 1, 24, 24, 2), | |
| FusedMBConvConfig(4, 3, 2, 24, 48, 4), | |
| FusedMBConvConfig(4, 3, 2, 48, 64, 4), | |
| MBConvConfig(4, 3, 2, 64, 128, 6), | |
| MBConvConfig(6, 3, 1, 128, 160, 9), | |
| MBConvConfig(6, 3, 2, 160, 256, 15), | |
| ] | |
| last_channel = 1280 | |
| elif arch.startswith("efficientnet_v2_m"): | |
| inverted_residual_setting = [ | |
| FusedMBConvConfig(1, 3, 1, 24, 24, 3), | |
| FusedMBConvConfig(4, 3, 2, 24, 48, 5), | |
| FusedMBConvConfig(4, 3, 2, 48, 80, 5), | |
| MBConvConfig(4, 3, 2, 80, 160, 7), | |
| MBConvConfig(6, 3, 1, 160, 176, 14), | |
| MBConvConfig(6, 3, 2, 176, 304, 18), | |
| MBConvConfig(6, 3, 1, 304, 512, 5), | |
| ] | |
| last_channel = 1280 | |
| elif arch.startswith("efficientnet_v2_l"): | |
| inverted_residual_setting = [ | |
| FusedMBConvConfig(1, 3, 1, 32, 32, 4), | |
| FusedMBConvConfig(4, 3, 2, 32, 64, 7), | |
| FusedMBConvConfig(4, 3, 2, 64, 96, 7), | |
| MBConvConfig(4, 3, 2, 96, 192, 10), | |
| MBConvConfig(6, 3, 1, 192, 224, 19), | |
| MBConvConfig(6, 3, 2, 224, 384, 25), | |
| MBConvConfig(6, 3, 1, 384, 640, 7), | |
| ] | |
| last_channel = 1280 | |
| else: | |
| raise ValueError(f"Unsupported model type {arch}") | |
| return inverted_residual_setting, last_channel | |
| # def create_an_efficientnet( | |
| # num_channels: int = 61, | |
| # size: str = "v2_s", | |
| # width_mult: float = 1.0, | |
| # depth_mult: float = 1.0, | |
| # dropout: float = 0.25, | |
| # stochastic_depth_prob: float = 0.2, | |
| # num_classes: int = 2, | |
| # **kwargs, | |
| # ): | |
| # """Makes an EfficientNet of a given size and set of parameters""" | |
| # inverted_residual_setting, last_channel = _efficientnet_conf( | |
| # "efficientnet_%s" % (size), width_mult=width_mult, depth_mult=depth_mult | |
| # ) | |
| # model = _efficientnet( | |
| # inverted_residual_setting, | |
| # dropout, | |
| # last_channel, | |
| # weights=None, | |
| # num_classes=num_classes, | |
| # num_channels=num_channels, | |
| # stochastic_depth_prob=stochastic_depth_prob, | |
| # progress=True, | |
| # **kwargs, | |
| # ) | |
| # return model | |