| | |
| | import logging |
| | from typing import List, Optional |
| |
|
| | import torch.nn as nn |
| | import torch.nn.functional as F |
| | from mmengine.logging import print_log |
| | from torch import Tensor |
| |
|
| | from mmseg.registry import MODELS |
| | from mmseg.utils import ( |
| | ConfigType, |
| | OptConfigType, |
| | OptMultiConfig, |
| | OptSampleList, |
| | SampleList, |
| | add_prefix, |
| | ) |
| | from mmseg.models import BaseSegmentor |
| |
|
| |
|
| | @MODELS.register_module() |
| | class DistillEncoderDecoder(BaseSegmentor): |
| |
|
| | def __init__( |
| | self, |
| | backbone: ConfigType, |
| | teach_backbone: ConfigType, |
| | decode_head: ConfigType, |
| | neck: OptConfigType = None, |
| | auxiliary_head: OptConfigType = None, |
| | fam: OptConfigType = None, |
| | fmm: OptConfigType = None, |
| | train_cfg: OptConfigType = None, |
| | test_cfg: OptConfigType = None, |
| | data_preprocessor: OptConfigType = None, |
| | pretrained: Optional[str] = None, |
| | student_training=True, |
| | temperature=1.0, |
| | alpha=0.5, |
| | fuse=False, |
| | init_cfg: OptMultiConfig = None, |
| | ): |
| | super().__init__(data_preprocessor=data_preprocessor, init_cfg=init_cfg) |
| |
|
| | self.temperature = temperature |
| | self.alpha = alpha |
| | self.student_training = student_training |
| | self.fuse = fuse |
| |
|
| | if pretrained is not None: |
| | assert ( |
| | backbone.get("pretrained") is None |
| | ), "both backbone and segmentor set pretrained weight" |
| | assert ( |
| | teach_backbone.get("pretrained") is None |
| | ), "both teach backbone and segmentor set pretrained weight" |
| | backbone.pretrained = pretrained |
| | teach_backbone.pretrained = pretrained |
| | self.backbone = MODELS.build(backbone) |
| | self.teach_backbone = MODELS.build(teach_backbone) |
| | if neck is not None: |
| | self.neck = MODELS.build(neck) |
| |
|
| | self.fam = nn.Identity() |
| | self.fmm = nn.Identity() |
| | if fam is not None: |
| | self.fam = MODELS.build(fam) |
| | if fmm is not None: |
| | self.fmm = MODELS.build(fmm) |
| | self._init_decode_head(decode_head) |
| | self._init_auxiliary_head(auxiliary_head) |
| |
|
| | self.train_cfg = train_cfg |
| | self.test_cfg = test_cfg |
| |
|
| | assert self.with_decode_head |
| |
|
| | def _init_decode_head(self, decode_head: ConfigType) -> None: |
| | """Initialize ``decode_head``""" |
| | self.decode_head = MODELS.build(decode_head) |
| | self.align_corners = self.decode_head.align_corners |
| | self.num_classes = self.decode_head.num_classes |
| | self.out_channels = self.decode_head.out_channels |
| |
|
| | def _init_auxiliary_head(self, auxiliary_head: ConfigType) -> None: |
| | """Initialize ``auxiliary_head``""" |
| | if auxiliary_head is not None: |
| | if isinstance(auxiliary_head, list): |
| | self.auxiliary_head = nn.ModuleList() |
| | for head_cfg in auxiliary_head: |
| | self.auxiliary_head.append(MODELS.build(head_cfg)) |
| | else: |
| | self.auxiliary_head = MODELS.build(auxiliary_head) |
| |
|
| | def fuse_features(self,features): |
| | x = features[0] |
| | for index,feature in enumerate(features): |
| | if index == 0: |
| | continue |
| | x += feature |
| | x = [x] |
| | return tuple(x) |
| |
|
| | def extract_feat(self, inputs: Tensor) -> List[Tensor]: |
| | """Extract features from images.""" |
| | x = self.backbone(inputs) |
| | x = self.fam(x) |
| | if self.fuse: |
| | x = self.fuse_features(x) |
| | if self.with_neck: |
| | x = self.neck(x) |
| | x = self.fmm(x) |
| | return x |
| |
|
| | def encode_decode(self, inputs: Tensor, batch_img_metas: List[dict]) -> Tensor: |
| | """Encode images with backbone and decode into a semantic segmentation |
| | map of the same size as input.""" |
| | x = self.extract_feat(inputs) |
| | seg_logits = self.decode_head.predict(x, batch_img_metas, self.test_cfg) |
| |
|
| | return seg_logits |
| |
|
| | def _decode_head_forward_train( |
| | self, inputs: List[Tensor], data_samples: SampleList |
| | ) -> dict: |
| | """Run forward function and calculate loss for decode head in |
| | training.""" |
| | losses = dict() |
| | loss_decode = self.decode_head.loss(inputs, data_samples, self.train_cfg) |
| |
|
| | losses.update(add_prefix(loss_decode, "decode")) |
| | return losses |
| |
|
| | def _auxiliary_head_forward_train( |
| | self, inputs: List[Tensor], data_samples: SampleList |
| | ) -> dict: |
| | """Run forward function and calculate loss for auxiliary head in |
| | training.""" |
| | losses = dict() |
| | if isinstance(self.auxiliary_head, nn.ModuleList): |
| | for idx, aux_head in enumerate(self.auxiliary_head): |
| | loss_aux = aux_head.loss(inputs, data_samples, self.train_cfg) |
| | for key in loss_aux.keys(): |
| | loss_aux[key] = loss_aux[key] / len(self.auxiliary_head) |
| | losses.update(add_prefix(loss_aux, f"aux_{idx}")) |
| | else: |
| | loss_aux = self.auxiliary_head.loss(inputs, data_samples, self.train_cfg) |
| | losses.update(add_prefix(loss_aux, "aux")) |
| |
|
| | return losses |
| |
|
| | def calculate_diltill_loss(self, inputs): |
| | student_feats = self.backbone(inputs) |
| | student_feats = self.fam(student_feats) |
| | teach_feats = self.teach_backbone(inputs) |
| |
|
| | if self.fuse: |
| | student_feats = self.fuse_features(student_feats) |
| | teach_feats = self.fuse_features(teach_feats) |
| |
|
| | total_loss = 0.0 |
| | for student_feat, teach_feat in zip(student_feats, teach_feats): |
| | student_prob = F.softmax(student_feat / self.temperature, dim=-1) |
| | teach_prob = F.softmax(teach_feat / self.temperature, dim=-1) |
| | kl_loss = F.kl_div( |
| | student_prob.log(), teach_prob, reduction="batchmean" |
| | ) * (self.temperature**2) |
| | mse_loss = F.mse_loss(student_feat, teach_feat, reduction="mean") |
| | loss = self.alpha * kl_loss + (1 - self.alpha) * mse_loss |
| | total_loss += loss |
| |
|
| | avg_loss = total_loss / len(student_feats) |
| | if self.alpha == 0: |
| | avg_loss = avg_loss * 0.5 |
| | return avg_loss |
| |
|
| | def loss(self, inputs: Tensor, data_samples: SampleList) -> dict: |
| | """Calculate losses from a batch of inputs and data samples. |
| | |
| | Args: |
| | inputs (Tensor): Input images. |
| | data_samples (list[:obj:`SegDataSample`]): The seg data samples. |
| | It usually includes information such as `metainfo` and |
| | `gt_sem_seg`. |
| | |
| | Returns: |
| | dict[str, Tensor]: a dictionary of loss components |
| | """ |
| |
|
| | x = self.extract_feat(inputs) |
| |
|
| | losses = dict() |
| |
|
| | loss_decode = self._decode_head_forward_train(x, data_samples) |
| | losses.update(loss_decode) |
| | if self.student_training: |
| | losses["distill_loss"] = self.calculate_diltill_loss(inputs) |
| | if self.with_auxiliary_head: |
| | loss_aux = self._auxiliary_head_forward_train(x, data_samples) |
| | losses.update(loss_aux) |
| |
|
| | return losses |
| |
|
| | def predict(self, inputs: Tensor, data_samples: OptSampleList = None) -> SampleList: |
| | """Predict results from a batch of inputs and data samples with post- |
| | processing. |
| | |
| | Args: |
| | inputs (Tensor): Inputs with shape (N, C, H, W). |
| | data_samples (List[:obj:`SegDataSample`], optional): The seg data |
| | samples. It usually includes information such as `metainfo` |
| | and `gt_sem_seg`. |
| | |
| | Returns: |
| | list[:obj:`SegDataSample`]: Segmentation results of the |
| | input images. Each SegDataSample usually contain: |
| | |
| | - ``pred_sem_seg``(PixelData): Prediction of semantic segmentation. |
| | - ``seg_logits``(PixelData): Predicted logits of semantic |
| | segmentation before normalization. |
| | """ |
| | if data_samples is not None: |
| | batch_img_metas = [data_sample.metainfo for data_sample in data_samples] |
| | else: |
| | batch_img_metas = [ |
| | dict( |
| | ori_shape=inputs.shape[2:], |
| | img_shape=inputs.shape[2:], |
| | pad_shape=inputs.shape[2:], |
| | padding_size=[0, 0, 0, 0], |
| | ) |
| | ] * inputs.shape[0] |
| |
|
| | seg_logits = self.inference(inputs, batch_img_metas) |
| |
|
| | return self.postprocess_result(seg_logits, data_samples) |
| |
|
| | def _forward(self, inputs: Tensor, data_samples: OptSampleList = None) -> Tensor: |
| | """Network forward process. |
| | |
| | Args: |
| | inputs (Tensor): Inputs with shape (N, C, H, W). |
| | data_samples (List[:obj:`SegDataSample`]): The seg |
| | data samples. It usually includes information such |
| | as `metainfo` and `gt_sem_seg`. |
| | |
| | Returns: |
| | Tensor: Forward output of model without any post-processes. |
| | """ |
| | x = self.extract_feat(inputs) |
| | return self.decode_head.forward(x) |
| |
|
| | def slide_inference(self, inputs: Tensor, batch_img_metas: List[dict]) -> Tensor: |
| | """Inference by sliding-window with overlap. |
| | |
| | If h_crop > h_img or w_crop > w_img, the small patch will be used to |
| | decode without padding. |
| | |
| | Args: |
| | inputs (tensor): the tensor should have a shape NxCxHxW, |
| | which contains all images in the batch. |
| | batch_img_metas (List[dict]): List of image metainfo where each may |
| | also contain: 'img_shape', 'scale_factor', 'flip', 'img_path', |
| | 'ori_shape', and 'pad_shape'. |
| | For details on the values of these keys see |
| | `mmseg/datasets/pipelines/formatting.py:PackSegInputs`. |
| | |
| | Returns: |
| | Tensor: The segmentation results, seg_logits from model of each |
| | input image. |
| | """ |
| |
|
| | h_stride, w_stride = self.test_cfg.stride |
| | h_crop, w_crop = self.test_cfg.crop_size |
| | batch_size, _, h_img, w_img = inputs.size() |
| | out_channels = self.out_channels |
| | h_grids = max(h_img - h_crop + h_stride - 1, 0) // h_stride + 1 |
| | w_grids = max(w_img - w_crop + w_stride - 1, 0) // w_stride + 1 |
| | preds = inputs.new_zeros((batch_size, out_channels, h_img, w_img)) |
| | count_mat = inputs.new_zeros((batch_size, 1, h_img, w_img)) |
| | for h_idx in range(h_grids): |
| | for w_idx in range(w_grids): |
| | y1 = h_idx * h_stride |
| | x1 = w_idx * w_stride |
| | y2 = min(y1 + h_crop, h_img) |
| | x2 = min(x1 + w_crop, w_img) |
| | y1 = max(y2 - h_crop, 0) |
| | x1 = max(x2 - w_crop, 0) |
| | crop_img = inputs[:, :, y1:y2, x1:x2] |
| | |
| | batch_img_metas[0]["img_shape"] = crop_img.shape[2:] |
| | |
| | |
| | crop_seg_logit = self.encode_decode(crop_img, batch_img_metas) |
| | preds += F.pad( |
| | crop_seg_logit, |
| | ( |
| | int(x1), |
| | int(preds.shape[3] - x2), |
| | int(y1), |
| | int(preds.shape[2] - y2), |
| | ), |
| | ) |
| |
|
| | count_mat[:, :, y1:y2, x1:x2] += 1 |
| | assert (count_mat == 0).sum() == 0 |
| | seg_logits = preds / count_mat |
| |
|
| | return seg_logits |
| |
|
| | def whole_inference(self, inputs: Tensor, batch_img_metas: List[dict]) -> Tensor: |
| | """Inference with full image. |
| | |
| | Args: |
| | inputs (Tensor): The tensor should have a shape NxCxHxW, which |
| | contains all images in the batch. |
| | batch_img_metas (List[dict]): List of image metainfo where each may |
| | also contain: 'img_shape', 'scale_factor', 'flip', 'img_path', |
| | 'ori_shape', and 'pad_shape'. |
| | For details on the values of these keys see |
| | `mmseg/datasets/pipelines/formatting.py:PackSegInputs`. |
| | |
| | Returns: |
| | Tensor: The segmentation results, seg_logits from model of each |
| | input image. |
| | """ |
| |
|
| | seg_logits = self.encode_decode(inputs, batch_img_metas) |
| |
|
| | return seg_logits |
| |
|
| | def inference(self, inputs: Tensor, batch_img_metas: List[dict]) -> Tensor: |
| | """Inference with slide/whole style. |
| | |
| | Args: |
| | inputs (Tensor): The input image of shape (N, 3, H, W). |
| | batch_img_metas (List[dict]): List of image metainfo where each may |
| | also contain: 'img_shape', 'scale_factor', 'flip', 'img_path', |
| | 'ori_shape', 'pad_shape', and 'padding_size'. |
| | For details on the values of these keys see |
| | `mmseg/datasets/pipelines/formatting.py:PackSegInputs`. |
| | |
| | Returns: |
| | Tensor: The segmentation results, seg_logits from model of each |
| | input image. |
| | """ |
| | assert self.test_cfg.get("mode", "whole") in ["slide", "whole"], ( |
| | f'Only "slide" or "whole" test mode are supported, but got ' |
| | f'{self.test_cfg["mode"]}.' |
| | ) |
| | ori_shape = batch_img_metas[0]["ori_shape"] |
| | if not all(_["ori_shape"] == ori_shape for _ in batch_img_metas): |
| | print_log( |
| | "Image shapes are different in the batch.", |
| | logger="current", |
| | level=logging.WARN, |
| | ) |
| | if self.test_cfg.mode == "slide": |
| | seg_logit = self.slide_inference(inputs, batch_img_metas) |
| | else: |
| | seg_logit = self.whole_inference(inputs, batch_img_metas) |
| |
|
| | return seg_logit |
| |
|
| | def aug_test(self, inputs, batch_img_metas, rescale=True): |
| | """Test with augmentations. |
| | |
| | Only rescale=True is supported. |
| | """ |
| | |
| | assert rescale |
| | |
| | seg_logit = self.inference(inputs[0], batch_img_metas[0], rescale) |
| | for i in range(1, len(inputs)): |
| | cur_seg_logit = self.inference(inputs[i], batch_img_metas[i], rescale) |
| | seg_logit += cur_seg_logit |
| | seg_logit /= len(inputs) |
| | seg_pred = seg_logit.argmax(dim=1) |
| | |
| | seg_pred = list(seg_pred) |
| | return seg_pred |
| |
|