|
|
from torch.utils.data import Dataset, DataLoader |
|
|
from typing import * |
|
|
from dataclasses import dataclass, field |
|
|
from PIL import Image |
|
|
from utils import parse_structure |
|
|
from glob import glob |
|
|
from random import shuffle |
|
|
from torchvision.transforms import v2 |
|
|
|
|
|
import os |
|
|
import lightning.pytorch as pl |
|
|
import numpy as np |
|
|
import torch |
|
|
import random |
|
|
|
|
|
|
|
|
class BreakhisDataset(Dataset):
    """Binary (benign vs. malignant) image dataset built from a directory tree.

    PNG files are discovered with the glob pattern
    ``<root_dir>/<class>/*/<subclass>/*/<factor>/*.png`` for every
    class / subclass / factor (e.g. ``40X``) combination. The files of each
    combination are sorted, shuffled with a fixed seed and cut into
    train / valid / test parts according to ``self.ratio`` (80 % / 10 % /
    remainder), so every subset receives a share of every combination.
    """

    # Subset names understood by ``get_subset``.
    _SUBSETS = ('train', 'valid', 'test')

    def __init__(self, root_dir: str, image_size: Tuple[int, int], subset: str, aug: Optional[dict] = None) -> None:
        """Collect the sample paths of one subset and build the transform pipeline.

        Args:
            root_dir: Directory the glob pattern is rooted at.
            image_size: Target size, interpreted as ``(height, width)`` because
                it is forwarded unchanged to ``v2.Resize``.
            subset: One of ``'train'``, ``'valid'`` or ``'test'``.
            aug: Optional mapping from a ``torchvision.transforms.v2`` class
                name to the keyword arguments used to construct it. The
                resulting transforms run after the resize and before dtype
                conversion / normalisation, for whichever subset is requested.
                ``None`` or an empty mapping adds nothing.

        Raises:
            ValueError: If ``subset`` is not a known subset name.
        """
        # Fail early with a clear error instead of after globbing the tree.
        if subset not in self._SUBSETS:
            raise ValueError(f'Unknown subset: {subset!r}')

        self.root_dir = root_dir
        self.image_size = image_size
        self.classes = {
            'benign': 0,
            'malignant': 1,
        }
        # Fractions for train and valid; whatever is left becomes test.
        self.ratio = [0.8, 0.1]
        self.subset = subset
        self.aug = aug

        self.benign_subclasses = ['adenosis', 'fibroadenoma', 'phyllodes_tumor', 'tubular_adenoma']
        self.malignant_subclasses = ['ductal_carcinoma', 'lobular_carcinoma', 'mucinous_carcinoma', 'papillary_carcinoma']
        self.cls2sublst = {
            'benign': self.benign_subclasses,
            'malignant': self.malignant_subclasses,
        }
        self.factors = ['100X', '200X', '400X', '40X']

        self.sample_paths = []
        self.sample_labels = []

        # A private generator gives the same shuffle sequence as seeding the
        # global one with 42, without resetting random state for other code.
        rng = random.Random(42)
        for cate, label in self.classes.items():
            for subcls in self.cls2sublst[cate]:
                for factor in self.factors:
                    pattern = os.path.join(self.root_dir, f'{cate}/*/{subcls}/*/{factor}/*.png')
                    # glob order depends on the filesystem; sort first so the
                    # seeded shuffle yields the same split on every machine
                    # and the three subsets never overlap.
                    lst = sorted(glob(pattern))
                    rng.shuffle(lst)

                    sublst = self.get_subset(lst)
                    self.sample_paths += sublst
                    self.sample_labels += [label] * len(sublst)

        # Optional user-configured transforms, instantiated by class name.
        extra = []
        if self.aug is not None:
            # ``or {}`` tolerates an entry whose kwargs were left empty (None).
            extra = [getattr(v2, name)(**(self.aug[name] or {})) for name in self.aug]

        self.transforms = [
            v2.Resize(self.image_size, antialias=True),
            *extra,
            v2.ToDtype(torch.float32, scale=True),
            # These values match the commonly used ImageNet channel statistics.
            v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
        self.transform = v2.Compose(self.transforms)

    def get_subset(self, x: list) -> list:
        """Return the slice of ``x`` that belongs to ``self.subset``.

        Args:
            x: Already shuffled list of items to split.

        Returns:
            The leading ``ratio[0]`` share for ``'train'``, the following
            ``ratio[1]`` share for ``'valid'`` and the remainder for ``'test'``.

        Raises:
            ValueError: If ``self.subset`` is not a known subset name.
        """
        train_end = int(self.ratio[0] * len(x))
        valid_end = int((self.ratio[0] + self.ratio[1]) * len(x))

        if self.subset == 'train':
            return x[:train_end]
        if self.subset == 'valid':
            return x[train_end:valid_end]
        if self.subset == 'test':
            return x[valid_end:]
        # Raise (not return) the error so a bad subset cannot go unnoticed.
        raise ValueError(f'Unknown subset: {self.subset!r}')

    def __len__(self) -> int:
        """Return the number of samples in this subset."""
        return len(self.sample_paths)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
        """Load, resize and transform the sample at ``idx``.

        Returns:
            A ``(image, label)`` pair where ``image`` is the channel-first
            tensor produced by ``self.transform`` and ``label`` is the integer
            class id from ``self.classes``.
        """
        img_path = self.sample_paths[idx]
        label = self.sample_labels[idx]

        # convert() returns a new, fully loaded image, so the file handle can
        # be released as soon as the block exits.
        with Image.open(img_path) as raw:
            image = raw.convert("RGB")

        # PIL expects (width, height) whereas ``image_size`` is (height, width)
        # as consumed by v2.Resize; swap so both resizes agree and non-square
        # targets are not resampled twice.
        height, width = self.image_size
        image = image.resize((width, height))

        # HWC uint8 array -> CHW tensor, the layout the v2 transforms work on.
        image = torch.from_numpy(np.array(image)).permute(2, 0, 1)
        image = self.transform(image)

        return image, label
|
|
|
|
|
|
|
|
@dataclass
class BaseDatasetConfig:
    """Settings read by ``BreakhisDataModule`` when building datasets and loaders."""

    # Passed to the dataset as its root directory.
    data_source: str = ''
    # Batch size used by the train, validation and test loaders.
    batch_size: int = 32
    # Whether the training loader shuffles its samples.
    shuffle: bool = True
    # Number of worker processes given to every DataLoader.
    num_workers: int = 24
    # Target image size handed to the dataset.
    image_size: Tuple[int, int] = (224, 224)
    # Augmentation mapping handed to the dataset; a fresh dict per instance.
    aug: dict = field(default_factory=dict)
|
|
|
|
|
class BreakhisDataModule(pl.LightningDataModule):
    """Lightning data module that builds ``BreakhisDataset`` splits and their loaders."""

    cfg: BaseDatasetConfig

    def __init__(self, cfg: BaseDatasetConfig) -> None:
        """Parse ``cfg`` with ``parse_structure`` and cache the fields ``setup`` needs."""
        super().__init__()
        self.cfg: BaseDatasetConfig = parse_structure(BaseDatasetConfig, cfg)

        parsed = self.cfg
        self.data_source = parsed.data_source
        self.img_size = parsed.image_size
        self.aug = parsed.aug

    def setup(self, stage=None) -> None:
        """Create the datasets required for ``stage``.

        ``None`` builds all three datasets; ``"fit"`` builds train and valid;
        ``"validate"`` builds valid only; ``"test"`` and ``"predict"`` build
        the test dataset.
        """
        # NOTE(review): the same augmentation config is handed to the valid
        # and test datasets as well — confirm that this is intended.
        source, size, aug = self.data_source, self.img_size, self.aug

        if stage in (None, "fit"):
            self.train_dataset = BreakhisDataset(source, size, 'train', aug)

        if stage in (None, "fit", "validate"):
            self.val_dataset = BreakhisDataset(source, size, 'valid', aug)

        if stage in (None, "test", "predict"):
            self.test_dataset = BreakhisDataset(source, size, 'test', aug)

    def general_loader(self, dataset, batch_size) -> DataLoader:
        """Wrap ``dataset`` in a non-shuffling loader using the configured worker count."""
        workers = self.cfg.num_workers
        return DataLoader(dataset, num_workers=workers, batch_size=batch_size)

    def train_dataloader(self) -> DataLoader:
        """Return the training loader; shuffling follows ``cfg.shuffle``."""
        options = {
            'num_workers': self.cfg.num_workers,
            'batch_size': self.cfg.batch_size,
            'shuffle': self.cfg.shuffle,
        }
        return DataLoader(self.train_dataset, **options)

    def val_dataloader(self) -> DataLoader:
        """Return the validation loader (no shuffling)."""
        return self.general_loader(self.val_dataset, self.cfg.batch_size)

    def test_dataloader(self) -> DataLoader:
        """Return the test loader (no shuffling)."""
        return self.general_loader(self.test_dataset, self.cfg.batch_size)