Spaces:
Sleeping
Sleeping
File size: 6,151 Bytes
8960670 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | #!/usr/bin/env python3
"""
Malignancy Dataset — LIDC-IDRI Malignancy Classification Data Pipeline
Handles:
- Loading LIDC-IDRI malignancy annotations (1-5 scale)
- Binary conversion: 1-2 → benign (0), 4-5 → malignant (1), skip 3
- PyTorch Dataset for loading 64³ nodule patches with labels
- 3D augmentation (rotation, flip, noise)
NOT part of the research paper — demo feature only.
"""
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
def prepare_malignancy_data(csv_path):
"""Load and prepare LIDC-IDRI malignancy annotations for binary classification.
Converts the 5-point malignancy scale to binary:
- Benign (0): malignancy 1-2
- Malignant (1): malignancy 4-5
- Skipped: malignancy 3 (indeterminate)
Args:
csv_path: Path to LIDC annotations CSV with columns:
- nodule_id: unique identifier
- malignancy: 1-5 rating
Returns:
pd.DataFrame with added 'label' column (0 or 1)
"""
annotations = pd.read_csv(csv_path)
if 'malignancy' not in annotations.columns:
raise ValueError(
f"CSV must have a 'malignancy' column. Found: {list(annotations.columns)}"
)
# Convert to binary classification
annotations['label'] = annotations['malignancy'].apply(
lambda x: 0 if x <= 2 else (1 if x >= 4 else -1)
)
# Remove indeterminate cases (malignancy == 3)
annotations = annotations[annotations['label'] != -1].reset_index(drop=True)
benign_count = (annotations['label'] == 0).sum()
malignant_count = (annotations['label'] == 1).sum()
print(f"Malignancy data prepared:")
print(f" Benign nodules: {benign_count}")
print(f" Malignant nodules: {malignant_count}")
print(f" Total: {len(annotations)}")
print(f" Ratio (B:M): {benign_count / max(malignant_count, 1):.1f}:1")
return annotations
class MalignancyDataset(Dataset):
"""PyTorch Dataset for malignancy classification.
Loads 64³ nodule patches from .npy files and returns (patch, label) pairs.
Args:
annotations_df: DataFrame with 'nodule_id' and 'label' columns
patches_dir: Directory containing {nodule_id}.npy patch files
augment: Whether to apply data augmentation
"""
def __init__(self, annotations_df, patches_dir, augment=False):
self.annotations = annotations_df.reset_index(drop=True)
self.patches_dir = Path(patches_dir)
self.augment = augment
# Verify directory exists
if not self.patches_dir.exists():
raise FileNotFoundError(
f"Patches directory not found: {self.patches_dir}"
)
def __len__(self):
return len(self.annotations)
def __getitem__(self, idx):
row = self.annotations.iloc[idx]
# Load nodule patch (64×64×64)
nodule_id = row['nodule_id']
patch_path = self.patches_dir / f"{nodule_id}.npy"
if not patch_path.exists():
# Fallback: try .npz format
npz_path = self.patches_dir / f"{nodule_id}.npz"
if npz_path.exists():
patch = np.load(npz_path)['patch'].astype(np.float32)
else:
raise FileNotFoundError(
f"Patch file not found: {patch_path} or {npz_path}"
)
else:
patch = np.load(patch_path).astype(np.float32)
# Apply augmentation
if self.augment:
patch = self._augment(patch)
# Convert to tensor: add channel dim → (1, 64, 64, 64)
patch_tensor = torch.from_numpy(patch).unsqueeze(0)
label = torch.tensor(row['label'], dtype=torch.long)
return patch_tensor, label
def _augment(self, patch):
"""Apply random 3D augmentations.
Consistent with the augmentation patterns used in the main
LunaDataset for the detection task.
"""
# Random 90° rotation along a random axis pair
k = np.random.randint(0, 4)
axes = [(0, 1), (0, 2), (1, 2)]
ax = axes[np.random.randint(0, 3)]
patch = np.rot90(patch, k=k, axes=ax).copy()
# Random flip along each axis
for axis in range(3):
if np.random.rand() > 0.5:
patch = np.flip(patch, axis=axis).copy()
# Gaussian noise
if np.random.rand() > 0.5:
noise = np.random.normal(0, 0.05, patch.shape).astype(np.float32)
patch = patch + noise
# Clamp to valid range
patch = np.clip(patch, -1.0, 1.0)
return patch
def create_malignancy_loaders(config):
"""Create train and validation DataLoaders for malignancy classification.
Args:
config: Configuration dict with 'data' and 'training' sections
Returns:
(train_loader, val_loader)
"""
from sklearn.model_selection import train_test_split
data_cfg = config.get('data', {})
training_cfg = config.get('training', {})
annotations = prepare_malignancy_data(data_cfg['annotations_csv'])
# Stratified train/val split
val_ratio = data_cfg.get('val_ratio', 0.2)
train_df, val_df = train_test_split(
annotations, test_size=val_ratio,
stratify=annotations['label'], random_state=42
)
print(f" Train split: {len(train_df)} samples")
print(f" Val split: {len(val_df)} samples")
patches_dir = data_cfg['patches_dir']
train_dataset = MalignancyDataset(train_df, patches_dir, augment=True)
val_dataset = MalignancyDataset(val_df, patches_dir, augment=False)
batch_size = training_cfg.get('batch_size', 32)
num_workers = data_cfg.get('num_workers', 4)
train_loader = DataLoader(
train_dataset, batch_size=batch_size, shuffle=True,
num_workers=num_workers, pin_memory=True, drop_last=True
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False,
num_workers=num_workers, pin_memory=True
)
return train_loader, val_loader
|