| """Custom Folder Dataset. | |
| This script creates a custom dataset from a folder. | |
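
A typical folder layout this module can consume looks roughly like the sketch below
(the folder names are illustrative; any names can be passed via the arguments)::

    custom_dataset/
        normal/          # normal (defect-free) images
        abnormal/        # anomalous images
        mask/            # optional ground-truth masks, one per abnormal image,
                         # using the same filenames as the abnormal images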
| """ | |
| # Copyright (C) 2020 Intel Corporation | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, | |
| # software distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions | |
| # and limitations under the License. | |
import logging
import warnings
from pathlib import Path
from typing import Dict, Optional, Tuple, Union

import albumentations as A
import cv2
import numpy as np
from pandas.core.frame import DataFrame
from pytorch_lightning.core.datamodule import LightningDataModule
from pytorch_lightning.utilities.types import EVAL_DATALOADERS, TRAIN_DATALOADERS
from torch import Tensor
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets.folder import IMG_EXTENSIONS

from anomalib.data.inference import InferenceDataset
from anomalib.data.utils import read_image
from anomalib.data.utils.split import (
    create_validation_set_from_test_set,
    split_normal_images_in_train_set,
)
from anomalib.pre_processing import PreProcessor

logger = logging.getLogger(__name__)


def _check_and_convert_path(path: Union[str, Path]) -> Path:
    """Check an input path, and convert to Pathlib object.

    Args:
        path (Union[str, Path]): Input path.

    Returns:
        Path: Output path converted to pathlib object.
    """
    if not isinstance(path, Path):
        path = Path(path)
    return path


def _prepare_files_labels(
    path: Union[str, Path], path_type: str, extensions: Optional[Tuple[str, ...]] = None
) -> Tuple[list, list]:
    """Return a list of filenames and a list of corresponding labels.

    Args:
        path (Union[str, Path]): Path to the directory containing images.
        path_type (str): Type of images in the provided path ("normal", "abnormal", "normal_test").
        extensions (Optional[Tuple[str, ...]], optional): Image extensions to read from the directory.

    Returns:
        Tuple[list, list]: Filenames of the images in the path, and the corresponding labels.
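
    Examples:
        A minimal sketch; the folder and its contents are hypothetical (``./good`` is assumed
        to contain only ``000.png`` and ``001.png``):

        >>> filenames, labels = _prepare_files_labels("./good", "normal", (".png",))
        >>> labels
        ['normal', 'normal']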
| """ | |
| path = _check_and_convert_path(path) | |
| if extensions is None: | |
| extensions = IMG_EXTENSIONS | |
| if isinstance(extensions, str): | |
| extensions = (extensions,) | |
| filenames = [f for f in path.glob(r"**/*") if f.suffix in extensions and not f.is_dir()] | |
| if len(filenames) == 0: | |
| raise RuntimeError(f"Found 0 {path_type} images in {path}") | |
| labels = [path_type] * len(filenames) | |
| return filenames, labels | |
def make_dataset(
    normal_dir: Union[str, Path],
    abnormal_dir: Union[str, Path],
    normal_test_dir: Optional[Union[str, Path]] = None,
    mask_dir: Optional[Union[str, Path]] = None,
    split: Optional[str] = None,
    split_ratio: float = 0.2,
    seed: int = 0,
    create_validation_set: bool = True,
    extensions: Optional[Tuple[str, ...]] = None,
):
    """Make Folder Dataset.

    Args:
        normal_dir (Union[str, Path]): Path to the directory containing normal images.
        abnormal_dir (Union[str, Path]): Path to the directory containing abnormal images.
        normal_test_dir (Optional[Union[str, Path]], optional): Path to the directory containing
            normal images for the test dataset. Normal test images will be a split of `normal_dir`
            if `None`. Defaults to None.
        mask_dir (Optional[Union[str, Path]], optional): Path to the directory containing
            the mask annotations. Defaults to None.
        split (Optional[str], optional): Dataset split (i.e., either train or test). Defaults to None.
        split_ratio (float, optional): Ratio of normal training images to split off and add to the
            test set in case the test set doesn't contain any normal images.
            Defaults to 0.2.
        seed (int, optional): Random seed to ensure reproducibility when splitting. Defaults to 0.
        create_validation_set (bool, optional): Boolean to create a validation set from the test set.
            Those wanting to create a validation set could set this flag to ``True``.
        extensions (Optional[Tuple[str, ...]], optional): Image extensions to read from the
            directory.

    Returns:
        DataFrame: An output dataframe containing samples for the requested split (i.e., train or test).
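
    Examples:
        A minimal sketch; the two sub-folders of normal and abnormal images are illustrative:

        >>> samples = make_dataset(normal_dir="./good", abnormal_dir="./broken", split="train")
        >>> samples.columns.tolist()
        ['image_path', 'label', 'label_index', 'split']
        >>> samples.label_index.unique().tolist()  # the train split holds only normal (0) images
        [0]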
| """ | |
| filenames = [] | |
| labels = [] | |
| dirs = {"normal": normal_dir, "abnormal": abnormal_dir} | |
| if normal_test_dir: | |
| dirs = {**dirs, **{"normal_test": normal_test_dir}} | |
| for dir_type, path in dirs.items(): | |
| filename, label = _prepare_files_labels(path, dir_type, extensions) | |
| filenames += filename | |
| labels += label | |
| samples = DataFrame({"image_path": filenames, "label": labels}) | |
| # Create label index for normal (0) and abnormal (1) images. | |
| samples.loc[(samples.label == "normal") | (samples.label == "normal_test"), "label_index"] = 0 | |
| samples.loc[(samples.label == "abnormal"), "label_index"] = 1 | |
| samples.label_index = samples.label_index.astype(int) | |
| # If a path to mask is provided, add it to the sample dataframe. | |
| if mask_dir is not None: | |
| mask_dir = _check_and_convert_path(mask_dir) | |
| samples["mask_path"] = "" | |
| for index, row in samples.iterrows(): | |
| if row.label_index == 1: | |
| samples.loc[index, "mask_path"] = str(mask_dir / row.image_path.name) | |
| # Ensure the pathlib objects are converted to str. | |
| # This is because torch dataloader doesn't like pathlib. | |
| samples = samples.astype({"image_path": "str"}) | |
| # Create train/test split. | |
| # By default, all the normal samples are assigned as train. | |
| # and all the abnormal samples are test. | |
| samples.loc[(samples.label == "normal"), "split"] = "train" | |
| samples.loc[(samples.label == "abnormal") | (samples.label == "normal_test"), "split"] = "test" | |
| if not normal_test_dir: | |
| samples = split_normal_images_in_train_set( | |
| samples=samples, split_ratio=split_ratio, seed=seed, normal_label="normal" | |
| ) | |
| # If `create_validation_set` is set to True, the test set is split into half. | |
| if create_validation_set: | |
| samples = create_validation_set_from_test_set(samples, seed=seed, normal_label="normal") | |
| # Get the data frame for the split. | |
| if split is not None and split in ["train", "val", "test"]: | |
| samples = samples[samples.split == split] | |
| samples = samples.reset_index(drop=True) | |
| return samples | |
class FolderDataset(Dataset):
    """Folder Dataset."""

    def __init__(
        self,
        normal_dir: Union[Path, str],
        abnormal_dir: Union[Path, str],
        split: str,
        pre_process: PreProcessor,
        normal_test_dir: Optional[Union[Path, str]] = None,
        split_ratio: float = 0.2,
        mask_dir: Optional[Union[Path, str]] = None,
        extensions: Optional[Tuple[str, ...]] = None,
        task: Optional[str] = None,
        seed: int = 0,
        create_validation_set: bool = False,
    ) -> None:
| """Create Folder Folder Dataset. | |
| Args: | |
| normal_dir (Union[str, Path]): Path to the directory containing normal images. | |
| abnormal_dir (Union[str, Path]): Path to the directory containing abnormal images. | |
| split (Optional[str], optional): Dataset split (ie., either train or test). Defaults to None. | |
| pre_process (Optional[PreProcessor], optional): Image Pro-processor to apply transform. | |
| Defaults to None. | |
| normal_test_dir (Optional[Union[str, Path]], optional): Path to the directory containing | |
| normal images for the test dataset. Defaults to None. | |
| split_ratio (float, optional): Ratio to split normal training images and add to the | |
| test set in case test set doesn't contain any normal images. | |
| Defaults to 0.2. | |
| mask_dir (Optional[Union[str, Path]], optional): Path to the directory containing | |
| the mask annotations. Defaults to None. | |
| extensions (Optional[Tuple[str, ...]], optional): Type of the image extensions to read from the | |
| directory. | |
| task (Optional[str], optional): Task type. (classification or segmentation) Defaults to None. | |
| seed (int, optional): Random seed to ensure reproducibility when splitting. Defaults to 0. | |
| create_validation_set (bool, optional):Boolean to create a validation set from the test set. | |
| Those wanting to create a validation set could set this flag to ``True``. | |
| Raises: | |
| ValueError: When task is set to classification and `mask_dir` is provided. When `mask_dir` is | |
| provided, `task` should be set to `segmentation`. | |
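
        Examples:
            A minimal sketch with illustrative paths; the default ``PreProcessor`` resizes the image
            and converts it to a CHW tensor:

            >>> from anomalib.pre_processing import PreProcessor
            >>> dataset = FolderDataset(
            ...     normal_dir="./good",
            ...     abnormal_dir="./broken",
            ...     split="train",
            ...     pre_process=PreProcessor(image_size=256),
            ... )
            >>> item = dataset[0]
            >>> item["image"].shape
            torch.Size([3, 256, 256])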
| """ | |
| self.split = split | |
| if task == "segmentation" and mask_dir is None: | |
| warnings.warn( | |
| "Segmentation task is requested, but mask directory is not provided. " | |
| "Classification is to be chosen if mask directory is not provided." | |
| ) | |
| self.task = "classification" | |
| if task == "classification" and mask_dir: | |
| warnings.warn( | |
| "Classification task is requested, but mask directory is provided. " | |
| "Segmentation task is to be chosen if mask directory is provided." | |
| ) | |
| self.task = "segmentation" | |
| if task is None or mask_dir is None: | |
| self.task = "classification" | |
| else: | |
| self.task = task | |
        self.pre_process = pre_process
        self.samples = make_dataset(
            normal_dir=normal_dir,
            abnormal_dir=abnormal_dir,
            normal_test_dir=normal_test_dir,
            mask_dir=mask_dir,
            split=split,
            split_ratio=split_ratio,
            seed=seed,
            create_validation_set=create_validation_set,
            extensions=extensions,
        )

    def __len__(self) -> int:
        """Get length of the dataset."""
        return len(self.samples)

    def __getitem__(self, index: int) -> Dict[str, Union[str, Tensor]]:
        """Get dataset item for the index ``index``.

        Args:
            index (int): Index to get the item.

        Returns:
            Union[Dict[str, Tensor], Dict[str, Union[str, Tensor]]]: Dict containing the image tensor during
                training. Otherwise, a Dict containing the image path, label, image tensor and, for the
                segmentation task, the mask path and mask tensor.
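
        Examples:
            Illustrative keys for hypothetical ``train_dataset`` (``split="train"``) and
            ``test_dataset`` (``split="test"``, classification task) instances:

            >>> sorted(train_dataset[0].keys())
            ['image']
            >>> sorted(test_dataset[0].keys())
            ['image', 'image_path', 'label']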
| """ | |
| item: Dict[str, Union[str, Tensor]] = {} | |
| image_path = self.samples.image_path[index] | |
| image = read_image(image_path) | |
| pre_processed = self.pre_process(image=image) | |
| item = {"image": pre_processed["image"]} | |
| if self.split in ["val", "test"]: | |
| label_index = self.samples.label_index[index] | |
| item["image_path"] = image_path | |
| item["label"] = label_index | |
| if self.task == "segmentation": | |
| mask_path = self.samples.mask_path[index] | |
| # Only Anomalous (1) images has masks in MVTec AD dataset. | |
| # Therefore, create empty mask for Normal (0) images. | |
| if label_index == 0: | |
| mask = np.zeros(shape=image.shape[:2]) | |
| else: | |
| mask = cv2.imread(mask_path, flags=0) / 255.0 | |
| pre_processed = self.pre_process(image=image, mask=mask) | |
| item["mask_path"] = mask_path | |
| item["image"] = pre_processed["image"] | |
| item["mask"] = pre_processed["mask"] | |
| return item | |

class FolderDataModule(LightningDataModule):
    """Folder Lightning Data Module."""

    def __init__(
        self,
        root: Union[str, Path],
        normal_dir: str = "normal",
        abnormal_dir: str = "abnormal",
        task: str = "classification",
        normal_test_dir: Optional[Union[Path, str]] = None,
        mask_dir: Optional[Union[Path, str]] = None,
        extensions: Optional[Tuple[str, ...]] = None,
        split_ratio: float = 0.2,
        seed: int = 0,
        image_size: Optional[Union[int, Tuple[int, int]]] = None,
        train_batch_size: int = 32,
        test_batch_size: int = 32,
        num_workers: int = 8,
        transform_config_train: Optional[Union[str, A.Compose]] = None,
        transform_config_val: Optional[Union[str, A.Compose]] = None,
        create_validation_set: bool = False,
    ) -> None:
| """Folder Dataset PL Datamodule. | |
| Args: | |
| root (Union[str, Path]): Path to the root folder containing normal and abnormal dirs. | |
| normal_dir (str, optional): Name of the directory containing normal images. | |
| Defaults to "normal". | |
| abnormal_dir (str, optional): Name of the directory containing abnormal images. | |
| Defaults to "abnormal". | |
| task (str, optional): Task type. Could be either classification or segmentation. | |
| Defaults to "classification". | |
| normal_test_dir (Optional[Union[str, Path]], optional): Path to the directory containing | |
| normal images for the test dataset. Defaults to None. | |
| mask_dir (Optional[Union[str, Path]], optional): Path to the directory containing | |
| the mask annotations. Defaults to None. | |
| extensions (Optional[Tuple[str, ...]], optional): Type of the image extensions to read from the | |
| directory. Defaults to None. | |
| split_ratio (float, optional): Ratio to split normal training images and add to the | |
| test set in case test set doesn't contain any normal images. | |
| Defaults to 0.2. | |
| seed (int, optional): Random seed to ensure reproducibility when splitting. Defaults to 0. | |
| image_size (Optional[Union[int, Tuple[int, int]]], optional): Size of the input image. | |
| Defaults to None. | |
| train_batch_size (int, optional): Training batch size. Defaults to 32. | |
| test_batch_size (int, optional): Test batch size. Defaults to 32. | |
| num_workers (int, optional): Number of workers. Defaults to 8. | |
| transform_config_train (Optional[Union[str, A.Compose]], optional): Config for pre-processing | |
| during training. | |
| Defaults to None. | |
| transform_config_val (Optional[Union[str, A.Compose]], optional): Config for pre-processing | |
| during validation. | |
| Defaults to None. | |
| create_validation_set (bool, optional):Boolean to create a validation set from the test set. | |
| Those wanting to create a validation set could set this flag to ``True``. | |

        Examples:
            Assume that we use the Folder Dataset for the MVTec bottle/broken_large category. We would do:

            >>> from anomalib.data import FolderDataModule
            >>> datamodule = FolderDataModule(
            ...     root="./datasets/MVTec/bottle/test",
            ...     normal_dir="good",
            ...     abnormal_dir="broken_large",
            ...     image_size=256
            ... )
            >>> datamodule.setup()
            >>> i, data = next(enumerate(datamodule.train_dataloader()))
            >>> data["image"].shape
            torch.Size([16, 3, 256, 256])
            >>> i, test_data = next(enumerate(datamodule.test_dataloader()))
            >>> test_data.keys()
            dict_keys(['image', 'image_path', 'label'])

            We could also create a Folder DataModule for datasets containing mask annotations.
            The dataset expects the mask annotation filenames to be the same as the original image filenames.
            To this end, we modified the mask filenames in the MVTec AD bottle category.
            Now we could try the Folder data module using the MVTec bottle broken_large category:

            >>> datamodule = FolderDataModule(
            ...     root="./datasets/bottle/test",
            ...     normal_dir="good",
            ...     abnormal_dir="broken_large",
            ...     mask_dir="./datasets/bottle/ground_truth/broken_large",
            ...     task="segmentation",
            ...     image_size=256
            ... )
            >>> i, train_data = next(enumerate(datamodule.train_dataloader()))
            >>> train_data.keys()
            dict_keys(['image'])
            >>> train_data["image"].shape
            torch.Size([16, 3, 256, 256])
            >>> i, test_data = next(enumerate(datamodule.test_dataloader()))
            >>> test_data.keys()
            dict_keys(['image_path', 'label', 'mask_path', 'image', 'mask'])
            >>> print(test_data["image"].shape, test_data["mask"].shape)
            torch.Size([24, 3, 256, 256]) torch.Size([24, 256, 256])

            By default, the Folder Data Module does not create a validation set. If a validation set
            is needed, it could be set as follows:

            >>> datamodule = FolderDataModule(
            ...     root="./datasets/bottle/test",
            ...     normal_dir="good",
            ...     abnormal_dir="broken_large",
            ...     mask_dir="./datasets/bottle/ground_truth/broken_large",
            ...     task="segmentation",
            ...     image_size=256,
            ...     create_validation_set=True,
            ... )
            >>> i, val_data = next(enumerate(datamodule.val_dataloader()))
            >>> val_data.keys()
            dict_keys(['image_path', 'label', 'mask_path', 'image', 'mask'])
            >>> print(val_data["image"].shape, val_data["mask"].shape)
            torch.Size([12, 3, 256, 256]) torch.Size([12, 256, 256])
            >>> i, test_data = next(enumerate(datamodule.test_dataloader()))
            >>> print(test_data["image"].shape, test_data["mask"].shape)
            torch.Size([12, 3, 256, 256]) torch.Size([12, 256, 256])
        """
        super().__init__()

        self.root = _check_and_convert_path(root)
        self.normal_dir = self.root / normal_dir
        self.abnormal_dir = self.root / abnormal_dir
        self.normal_test = normal_test_dir
        if normal_test_dir:
            self.normal_test = self.root / normal_test_dir
        self.mask_dir = mask_dir
        self.extensions = extensions
        self.split_ratio = split_ratio

        if task == "classification" and mask_dir is not None:
            raise ValueError(
                "Classification type is set but mask_dir provided. "
                "If mask_dir is provided task type must be segmentation. "
                "Check your configuration."
            )
        self.task = task
        self.transform_config_train = transform_config_train
        self.transform_config_val = transform_config_val
        self.image_size = image_size

        if self.transform_config_train is not None and self.transform_config_val is None:
            self.transform_config_val = self.transform_config_train

        self.pre_process_train = PreProcessor(config=self.transform_config_train, image_size=self.image_size)
        self.pre_process_val = PreProcessor(config=self.transform_config_val, image_size=self.image_size)

        self.train_batch_size = train_batch_size
        self.test_batch_size = test_batch_size
        self.num_workers = num_workers

        self.create_validation_set = create_validation_set
        self.seed = seed

        self.train_data: Dataset
        self.test_data: Dataset
        if create_validation_set:
            self.val_data: Dataset
        self.inference_data: Dataset

    def setup(self, stage: Optional[str] = None) -> None:
        """Setup train, validation and test data.

        Args:
            stage: Optional[str]: Train/Val/Test stages. (Default value = None)
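
        Examples:
            A rough sketch of the stage behaviour (paths are illustrative): the test dataset is always
            created, while the train dataset is only created for the ``fit`` stage.

            >>> datamodule = FolderDataModule(root="./datasets/custom", normal_dir="good", abnormal_dir="broken")
            >>> datamodule.setup(stage="fit")      # creates train_data (and val_data if requested) plus test_data
            >>> datamodule.setup(stage="test")     # creates test_data only
            >>> datamodule.setup(stage="predict")  # additionally creates inference_data from `root`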
| """ | |
| logger.info("Setting up train, validation, test and prediction datasets.") | |
| if stage in (None, "fit"): | |
| self.train_data = FolderDataset( | |
| normal_dir=self.normal_dir, | |
| abnormal_dir=self.abnormal_dir, | |
| normal_test_dir=self.normal_test, | |
| split="train", | |
| split_ratio=self.split_ratio, | |
| mask_dir=self.mask_dir, | |
| pre_process=self.pre_process_train, | |
| extensions=self.extensions, | |
| task=self.task, | |
| seed=self.seed, | |
| create_validation_set=self.create_validation_set, | |
| ) | |
| if self.create_validation_set: | |
| self.val_data = FolderDataset( | |
| normal_dir=self.normal_dir, | |
| abnormal_dir=self.abnormal_dir, | |
| normal_test_dir=self.normal_test, | |
| split="val", | |
| split_ratio=self.split_ratio, | |
| mask_dir=self.mask_dir, | |
| pre_process=self.pre_process_val, | |
| extensions=self.extensions, | |
| task=self.task, | |
| seed=self.seed, | |
| create_validation_set=self.create_validation_set, | |
| ) | |
| self.test_data = FolderDataset( | |
| normal_dir=self.normal_dir, | |
| abnormal_dir=self.abnormal_dir, | |
| split="test", | |
| normal_test_dir=self.normal_test, | |
| split_ratio=self.split_ratio, | |
| mask_dir=self.mask_dir, | |
| pre_process=self.pre_process_val, | |
| extensions=self.extensions, | |
| task=self.task, | |
| seed=self.seed, | |
| create_validation_set=self.create_validation_set, | |
| ) | |
| if stage == "predict": | |
| self.inference_data = InferenceDataset( | |
| path=self.root, image_size=self.image_size, transform_config=self.transform_config_val | |
| ) | |
    def train_dataloader(self) -> TRAIN_DATALOADERS:
        """Get train dataloader."""
        return DataLoader(self.train_data, shuffle=True, batch_size=self.train_batch_size, num_workers=self.num_workers)

    def val_dataloader(self) -> EVAL_DATALOADERS:
        """Get validation dataloader."""
        dataset = self.val_data if self.create_validation_set else self.test_data
        return DataLoader(dataset=dataset, shuffle=False, batch_size=self.test_batch_size, num_workers=self.num_workers)

    def test_dataloader(self) -> EVAL_DATALOADERS:
        """Get test dataloader."""
        return DataLoader(self.test_data, shuffle=False, batch_size=self.test_batch_size, num_workers=self.num_workers)

    def predict_dataloader(self) -> EVAL_DATALOADERS:
        """Get predict dataloader."""
        return DataLoader(
            self.inference_data, shuffle=False, batch_size=self.test_batch_size, num_workers=self.num_workers
        )