Spaces:
Build error
Build error
| """Normality model of DFKDE.""" | |
| # Copyright (C) 2020 Intel Corporation | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, | |
| # software distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions | |
| # and limitations under the License. | |
| import logging | |
| import random | |
| from typing import List, Optional, Tuple | |
| import torch | |
| import torchvision | |
| from torch import Tensor, nn | |
| from anomalib.models.components import PCA, FeatureExtractor, GaussianKDE | |
# Module-level logger; used by DfkdeModel.fit to report when there are too few
# features to build a model.
logger = logging.getLogger(__name__)
class DfkdeModel(nn.Module):
    """Normality Model for the DFKDE algorithm.

    Extracts CNN features from a pretrained backbone, projects them with PCA,
    and fits a Gaussian KDE on the (normalized or scaled) projections. At
    inference time, KDE densities are mapped to anomaly probabilities with a
    sigmoid.

    Args:
        backbone (str): Pre-trained model backbone.
        n_comps (int, optional): Number of PCA components. Defaults to 16.
        pre_processing (str, optional): Preprocess features before passing to KDE.
            Options are between `norm` and `scale`. Defaults to "scale".
        filter_count (int, optional): Number of training points to fit the KDE model. Defaults to 40000.
        threshold_steepness (float, optional): Controls how quickly the value saturates around zero. Defaults to 0.05.
        threshold_offset (float, optional): Offset of the density function from 0. Defaults to 12.0.
    """

    def __init__(
        self,
        backbone: str,
        n_comps: int = 16,
        pre_processing: str = "scale",
        filter_count: int = 40000,
        threshold_steepness: float = 0.05,
        threshold_offset: float = 12.0,
    ):
        super().__init__()
        self.n_components = n_comps
        self.pre_processing = pre_processing
        self.filter_count = filter_count
        self.threshold_steepness = threshold_steepness
        self.threshold_offset = threshold_offset

        _backbone = getattr(torchvision.models, backbone)
        self.feature_extractor = FeatureExtractor(backbone=_backbone(pretrained=True), layers=["avgpool"]).eval()

        self.pca_model = PCA(n_components=self.n_components)
        self.kde_model = GaussianKDE()

        # Registered as a buffer so the fitted normalization constant is saved and
        # restored with the state dict. The placeholder is uninitialized; ``fit``
        # assigns the real value. The redundant assignment below declares the
        # attribute so static type checkers see it.
        self.register_buffer("max_length", Tensor(torch.Size([])))
        self.max_length = Tensor(torch.Size([]))

    def get_features(self, batch: Tensor) -> Tensor:
        """Extract features from the pretrained network.

        Args:
            batch (Tensor): Image batch.

        Returns:
            Tensor: Tensor containing extracted features.
        """
        # Force eval mode so layers such as batch-norm behave deterministically.
        self.feature_extractor.eval()
        layer_outputs = self.feature_extractor(batch)
        # Concatenate all requested layer outputs and cut the autograd graph.
        layer_outputs = torch.cat(list(layer_outputs.values())).detach()
        return layer_outputs

    def fit(self, embeddings: List[Tensor]) -> bool:
        """Fit a KDE model to the stacked embeddings.

        Args:
            embeddings (List[Tensor]): Per-batch embeddings to fit the model on.

        Returns:
            bool: Confirms whether the training is successful.
        """
        _embeddings = torch.vstack(embeddings)

        # PCA needs at least n_components samples to produce a valid projection.
        if _embeddings.shape[0] < self.n_components:
            logger.info("Not enough features to commit. Not making a model.")
            return False

        # If max training points is non-zero and smaller than the number of staged
        # features, select a random subset to bound the KDE fitting cost.
        if self.filter_count and _embeddings.shape[0] > self.filter_count:
            # pylint: disable=not-callable
            selected_idx = torch.tensor(random.sample(range(_embeddings.shape[0]), self.filter_count))
            selected_features = _embeddings[selected_idx]
        else:
            selected_features = _embeddings

        feature_stack = self.pca_model.fit_transform(selected_features)
        feature_stack, max_length = self.preprocess(feature_stack)
        # Persist the training-time normalization constant for inference.
        self.max_length = max_length
        self.kde_model.fit(feature_stack)

        return True

    def preprocess(self, feature_stack: Tensor, max_length: Optional[Tensor] = None) -> Tuple[Tensor, Tensor]:
        """Pre-process the CNN features.

        The division is performed out-of-place, so the caller's tensor is never
        mutated (the original in-place ``/=`` silently modified the input).

        Args:
            feature_stack (Tensor): Features extracted from CNN.
            max_length (Optional[Tensor]): Used to unit normalize the feature_stack vector. If ``max_len`` is not
                provided, the length is calculated from the ``feature_stack``. Defaults to None.

        Returns:
            (Tuple): Stacked features and length

        Raises:
            RuntimeError: If ``self.pre_processing`` is neither ``"norm"`` nor ``"scale"``.
        """
        if max_length is None:
            max_length = torch.max(torch.linalg.norm(feature_stack, ord=2, dim=1))

        if self.pre_processing == "norm":
            # Unit-normalize each feature vector individually.
            feature_stack = feature_stack / torch.linalg.norm(feature_stack, ord=2, dim=1)[:, None]
        elif self.pre_processing == "scale":
            # Scale every vector by the (training-time) maximum norm.
            feature_stack = feature_stack / max_length
        else:
            # Message names the actual accepted option strings.
            raise RuntimeError("Unknown pre-processing mode. Available modes are: 'norm' and 'scale'.")
        return feature_stack, max_length

    def evaluate(self, features: Tensor, as_log_likelihood: Optional[bool] = False) -> Tensor:
        """Compute the KDE scores.

        The scores calculated from the KDE model are converted to densities. If `as_log_likelihood` is set to true then
        the log of the scores are calculated.

        Args:
            features (Tensor): Features to which the PCA model is fit.
            as_log_likelihood (Optional[bool], optional): If true, gets log likelihood scores. Defaults to False.

        Returns:
            (Tensor): Score
        """
        features = self.pca_model.transform(features)
        features, _ = self.preprocess(features, self.max_length)
        # Scores are always assumed to be passed as a density
        kde_scores = self.kde_model(features)
        # add small constant to avoid zero division in log computation
        kde_scores += 1e-300
        if as_log_likelihood:
            kde_scores = torch.log(kde_scores)
        return kde_scores

    def predict(self, features: Tensor) -> Tensor:
        """Predicts the probability that the features belong to the anomalous class.

        Args:
            features (Tensor): Feature from which the output probabilities are detected.

        Returns:
            Detection probabilities
        """
        densities = self.evaluate(features, as_log_likelihood=True)
        probabilities = self.to_probability(densities)
        return probabilities

    def to_probability(self, densities: Tensor) -> Tensor:
        """Converts density scores to anomaly probabilities (see https://www.desmos.com/calculator/ifju7eesg7).

        Args:
            densities (Tensor): density of an image.

        Returns:
            probability that image with {density} is anomalous
        """
        return 1 / (1 + torch.exp(self.threshold_steepness * (densities - self.threshold_offset)))

    def forward(self, batch: Tensor) -> Tensor:
        """Prediction by normality model.

        Args:
            batch (Tensor): Input images.

        Returns:
            Tensor: Predictions
        """
        feature_vector = self.get_features(batch)
        # Flatten trailing spatial dims (avgpool output) to (batch, channels).
        return self.predict(feature_vector.view(feature_vector.shape[:2]))