from torch.utils.data import Dataset, DataLoader from typing import * from dataclasses import dataclass, field from PIL import Image from utils import parse_structure from glob import glob from random import shuffle from torchvision.transforms import v2 import os import lightning.pytorch as pl import numpy as np import torch import random class BreakhisDataset(Dataset): def __init__(self, root_dir: str, image_size: Tuple[int, int], subset: str, aug: dict = None) -> None: self.root_dir = root_dir self.image_size = image_size self.classes = { 'benign' : 0, 'malignant' : 1 } self.ratio = [0.8, 0.1] self.subset = subset self.aug = aug self.benign_subclasses = ['adenosis', 'fibroadenoma', 'phyllodes_tumor', 'tubular_adenoma'] self.malignant_subclasses = ['ductal_carcinoma', 'lobular_carcinoma', 'mucinous_carcinoma', 'papillary_carcinoma'] self.cls2sublst = { 'benign' : self.benign_subclasses, 'malignant' : self.malignant_subclasses } self.factors = ['100X', '200X', '400X', '40X'] self.sample_paths = [] self.sample_labels = [] random.seed(42) for cate in ['benign', 'malignant']: for subcls in self.cls2sublst[cate]: for factor in self.factors: lst = glob(os.path.join(self.root_dir, f'{cate}/*/{subcls}/*/{factor}/*.png')) random.shuffle(lst) sublst = self.get_subset(lst) self.sample_paths += sublst self.sample_labels += [self.classes[cate]] * len(sublst) if self.aug is not None: self.transforms = [v2.Resize(self.image_size, antialias=True)] + \ [getattr(v2, x)(**self.aug[x]) for x in self.aug] + \ [ v2.ToDtype(torch.float32, scale=True), v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ] else: self.transforms = [ v2.Resize(self.image_size, antialias=True), v2.ToDtype(torch.float32, scale=True), v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ] self.transform = v2.Compose(self.transforms) def get_subset(self, x: list): if self.subset == 'train': return x[ : int(self.ratio[0] * len(x))] elif self.subset == 'valid': return x[int(self.ratio[0] * len(x)) : int((self.ratio[0] + self.ratio[1]) * len(x))] elif self.subset == 'test': return x[int((self.ratio[0] + self.ratio[1]) * len(x)) : ] else: return ValueError('Unknown subset') def __len__(self) -> int: return len(self.sample_paths) def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]: img_path = self.sample_paths[idx] label = self.sample_labels[idx] image = Image.open(img_path).convert("RGB") image = image.resize(self.image_size) image = np.array(image) image = torch.from_numpy(image).permute(2, 0, 1) image = self.transform(image) return image, label @dataclass class BaseDatasetConfig: data_source: str = '' batch_size:int = 32 shuffle:bool = True num_workers:int = 24 image_size:Tuple[int, int] = (224, 224) aug: dict = field(default_factory=dict) class BreakhisDataModule(pl.LightningDataModule): cfg: BaseDatasetConfig def __init__(self, cfg: BaseDatasetConfig) -> None: super().__init__() self.cfg:BaseDatasetConfig = parse_structure(BaseDatasetConfig, cfg) self.data_source = self.cfg.data_source self.img_size = self.cfg.image_size self.aug = self.cfg.aug def setup(self, stage=None) -> None: if stage in [None, "fit"]: self.train_dataset = BreakhisDataset(self.data_source, self.img_size, 'train', self.aug) if stage in [None, "fit", "validate"]: self.val_dataset = BreakhisDataset(self.data_source, self.img_size, 'valid', self.aug) if stage in [None, "test", "predict"]: self.test_dataset = BreakhisDataset(self.data_source, self.img_size, 'test', self.aug) def general_loader(self, dataset, batch_size) -> DataLoader: return DataLoader( dataset, num_workers=self.cfg.num_workers, batch_size=batch_size ) def train_dataloader(self) -> DataLoader: return DataLoader( self.train_dataset, num_workers=self.cfg.num_workers, batch_size=self.cfg.batch_size, shuffle=self.cfg.shuffle ) def val_dataloader(self) -> DataLoader: return DataLoader( self.val_dataset, num_workers=self.cfg.num_workers, batch_size=self.cfg.batch_size ) def test_dataloader(self) -> DataLoader: return DataLoader( self.test_dataset, num_workers=self.cfg.num_workers, batch_size=self.cfg.batch_size )