Spaces:
Running
Running
File size: 5,950 Bytes
4db9215 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | """
Multi-stain dataset for training a single model on all MIST IHC stains.
Combines HER2, Ki67, ER, PR into one dataset, returning a stain label (0-3)
instead of a class label. Reuses the same crop + UNI sub-crop pipeline
from CropPairedDataset.
Stain label mapping:
0 = HER2, 1 = Ki67, 2 = ER, 3 = PR, 4 = null (CFG dropout)
"""
import os
from pathlib import Path
from typing import List, Optional, Tuple
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pytorch_lightning as pl
from src.data.bci_dataset import CropPairedDataset
STAIN_TO_LABEL = {'HER2': 0, 'Ki67': 1, 'ER': 2, 'PR': 3}
LABEL_TO_STAIN = {v: k for k, v in STAIN_TO_LABEL.items()}
class MISTMultiStainCropDataset(CropPairedDataset):
"""Multi-stain MIST dataset with random 512 crops from native 1024x1024.
Loads all 4 MIST stains into a single dataset. Each sample returns a
stain label (0-3) as the conditioning signal, reusing the class embedding
slot in the generator.
"""
def __init__(
self,
base_dir: str,
stains: List[str],
split: str = 'train',
image_size: Tuple[int, int] = (512, 512),
crop_size: int = 512,
augment: bool = False,
null_class: int = 4,
):
super().__init__(
he_dir='.', # placeholder, we override __getitem__
ihc_dir='.',
image_size=image_size,
crop_size=crop_size,
augment=augment,
null_class=null_class,
)
self.base_dir = Path(base_dir)
self.samples = [] # (he_path, ihc_path, stain_label)
split_he = 'trainA' if split == 'train' else 'valA'
split_ihc = 'trainB' if split == 'train' else 'valB'
valid_exts = ('.jpg', '.jpeg', '.png')
for stain in stains:
if stain not in STAIN_TO_LABEL:
raise ValueError(f"Unknown stain: {stain}. Must be one of {list(STAIN_TO_LABEL.keys())}")
stain_label = STAIN_TO_LABEL[stain]
he_dir = self.base_dir / stain / 'TrainValAB' / split_he
ihc_dir = self.base_dir / stain / 'TrainValAB' / split_ihc
if not he_dir.exists():
raise FileNotFoundError(f"H&E directory not found: {he_dir}")
if not ihc_dir.exists():
raise FileNotFoundError(f"IHC directory not found: {ihc_dir}")
he_files = sorted([f for f in os.listdir(he_dir)
if f.lower().endswith(valid_exts)])
ihc_files = sorted([f for f in os.listdir(ihc_dir)
if f.lower().endswith(valid_exts)])
# Match by stem (H&E may be .jpg, IHC may be .png)
he_stems = {Path(f).stem: f for f in he_files}
ihc_stems = {Path(f).stem: f for f in ihc_files}
common = sorted(set(he_stems.keys()) & set(ihc_stems.keys()))
for stem in common:
self.samples.append((
he_dir / he_stems[stem],
ihc_dir / ihc_stems[stem],
stain_label,
))
print(f" {stain} ({split}): {len(common)} pairs")
# Per-stain counts for logging
from collections import Counter
dist = Counter(s[2] for s in self.samples)
stain_counts = {LABEL_TO_STAIN[k]: v for k, v in sorted(dist.items())}
print(f"Multi-Stain Crop Dataset ({split}): {len(self.samples)} total | {stain_counts}")
def __len__(self):
return len(self.samples)
def __getitem__(self, idx):
he_path, ihc_path, stain_label = self.samples[idx]
he_img = Image.open(he_path).convert('RGB')
ihc_img = Image.open(ihc_path).convert('RGB')
return self._process_pair(he_img, ihc_img, stain_label, he_path.name)
class MISTMultiStainCropDataModule(pl.LightningDataModule):
"""Lightning DataModule for multi-stain MIST training."""
def __init__(
self,
base_dir: str,
stains: Optional[List[str]] = None,
batch_size: int = 4,
num_workers: int = 4,
image_size: Tuple[int, int] = (512, 512),
crop_size: int = 512,
null_class: int = 4,
):
super().__init__()
self.base_dir = base_dir
self.stains = stains or ['HER2', 'Ki67', 'ER', 'PR']
self.batch_size = batch_size
self.num_workers = num_workers
self.image_size = image_size
self.crop_size = crop_size
self.null_class = null_class
def setup(self, stage=None):
if stage == 'fit' or stage is None:
self.train_dataset = MISTMultiStainCropDataset(
base_dir=self.base_dir,
stains=self.stains,
split='train',
image_size=self.image_size,
crop_size=self.crop_size,
augment=True,
null_class=self.null_class,
)
if stage in ('fit', 'validate', 'test') or stage is None:
self.val_dataset = MISTMultiStainCropDataset(
base_dir=self.base_dir,
stains=self.stains,
split='val',
image_size=self.image_size,
crop_size=self.crop_size,
augment=False,
null_class=self.null_class,
)
def train_dataloader(self):
return DataLoader(
self.train_dataset, batch_size=self.batch_size, shuffle=True,
num_workers=self.num_workers, pin_memory=True,
persistent_workers=self.num_workers > 0,
)
def val_dataloader(self):
return DataLoader(
self.val_dataset, batch_size=self.batch_size, shuffle=False,
num_workers=self.num_workers, pin_memory=True,
persistent_workers=self.num_workers > 0,
)
def test_dataloader(self):
return self.val_dataloader()
|