|
|
import os
|
|
|
|
|
|
import random
|
|
|
import torch
|
|
|
import imageio.v3 as imageio
|
|
|
import numpy as np
|
|
|
import skimage.morphology as morph
|
|
|
import torchvision.transforms.v2.functional as T_F
|
|
|
|
|
|
from skimage.filters import sato
|
|
|
from pathlib import Path
|
|
|
from scipy.ndimage import zoom
|
|
|
from torchvision.datasets.folder import has_file_allowed_extension
|
|
|
|
|
|
|
|
|
def make_dataset_t(image_dir, extensions=(".tif", ".tiff")):
|
|
|
image_dir = Path(image_dir)
|
|
|
images = [
|
|
|
(path, image_dir / f'Ridge_{path.name}', image_dir / f'Basins_{path.name}')
|
|
|
for path in sorted(image_dir.iterdir())
|
|
|
if (has_file_allowed_extension(path.name, extensions)
|
|
|
and (not path.name.startswith('Ridge_')) and (not path.name.startswith('Basins_')))
|
|
|
]
|
|
|
return images
|
|
|
|
|
|
def make_dataset_t_v(image_dir, extensions=(".tif", ".tiff")):
|
|
|
image_dir = Path(image_dir)
|
|
|
|
|
|
images = [
|
|
|
(path, image_dir / f'Ridge_{path.name}', image_dir / f'Basins_{path.name}')
|
|
|
for path in sorted(image_dir.iterdir())
|
|
|
if (has_file_allowed_extension(path.name, extensions)
|
|
|
and (not path.name.startswith('Ridge_')) and (not path.name.startswith('Basins_')))
|
|
|
]
|
|
|
|
|
|
|
|
|
random.shuffle(images)
|
|
|
|
|
|
|
|
|
split_idx = int(0.95 * len(images))
|
|
|
return images[:split_idx], images[split_idx:]
|
|
|
|
|
|
def augmentations(image, label1, label2):
|
|
|
if random.random() < 0.5:
|
|
|
image, label1, label2 = T_F.vflip(image), T_F.vflip(label1), T_F.vflip(label2)
|
|
|
if random.random() < 0.5:
|
|
|
image, label1, label2 = T_F.hflip(image), T_F.hflip(label1), T_F.vflip(label2)
|
|
|
angles = [90, 180, 270]
|
|
|
angle = random.choice(angles)
|
|
|
if random.random() < 0.75:
|
|
|
image, label1, label2 = T_F.rotate(image, angle), T_F.rotate(label1, angle), T_F.rotate(label2, angle)
|
|
|
return image, label1, label2
|
|
|
|
|
|
mean, std = (149.95293407563648, 330.8314960521203)
|
|
|
target_water_level_range = [-100, 300]
|
|
|
|
|
|
class TrainDataset(torch.utils.data.Dataset):
|
|
|
def __init__(self, train_split):
|
|
|
self.train_split = train_split
|
|
|
|
|
|
def __len__(self):
|
|
|
return len(self.train_split)
|
|
|
|
|
|
def __getitem__(self, index):
|
|
|
pair = self.train_split[index]
|
|
|
img = torch.from_numpy(imageio.imread(str(pair[0])))[None, :]
|
|
|
img = (img - mean) / std
|
|
|
ridge = torch.from_numpy(imageio.imread(str(pair[1])))[None, :].to(torch.float16)
|
|
|
basins = torch.from_numpy(imageio.imread(str(pair[2])))[None, :]
|
|
|
water_level = random.randint(*target_water_level_range)
|
|
|
basins = (basins >= water_level).to(torch.float16)
|
|
|
img, ridge, basins = augmentations(img, ridge, basins)
|
|
|
return img, ridge, basins, torch.tensor(water_level, dtype=torch.float16)
|
|
|
|
|
|
class ValDataset(torch.utils.data.Dataset):
|
|
|
def __init__(self, val_split):
|
|
|
self.val_split = val_split
|
|
|
|
|
|
def __len__(self):
|
|
|
return len(self.val_split)
|
|
|
|
|
|
def __getitem__(self, index):
|
|
|
pair = self.val_split[index]
|
|
|
img = torch.from_numpy(imageio.imread(str(pair[0])))[None, :]
|
|
|
img = (img - mean) / std
|
|
|
ridge = torch.from_numpy(imageio.imread(str(pair[1])))[None, :].to(torch.float16)
|
|
|
basins = torch.from_numpy(imageio.imread(str(pair[2])))[None, :]
|
|
|
target_level = random.randint(*target_water_level_range)
|
|
|
basins = (basins >= target_level).to(torch.float16)
|
|
|
return img, ridge, basins, torch.tensor(target_level, dtype=torch.float16)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
train_split, val_split = make_dataset_t_v('dataset')
|
|
|
|
|
|
train_dataset = TrainDataset(train_split)
|
|
|
val_dataset = ValDataset(val_split)
|
|
|
|
|
|
print(train_dataset.__getitem__(0))
|
|
|
print(val_dataset.__getitem__(0)) |