|
|
import sys |
|
|
import os |
|
|
from pathlib import Path |
|
|
import json |
|
|
import os |
|
|
import dataclasses |
|
|
from dataclasses import dataclass |
|
|
|
|
|
from typing import Any, Optional |
|
|
import math |
|
|
|
|
|
import logging |
|
|
from logging import NullHandler, StreamHandler |
|
|
|
|
|
import numpy as np |
|
|
import cv2 |
|
|
import tensorflow as tf |
|
|
|
|
|
# Declare the package this module belongs to as a pkg_resources-style
# namespace package; pkg_resources is resolved dynamically at import time.
__import__('pkg_resources').declare_namespace(__name__)


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): leading indentation was lost in the copy of the file that was
# reviewed. All three statements below are assumed to belong to the `if` body
# (configure the logger only when nothing else has attached handlers yet) --
# confirm against version control.
if not logger.hasHandlers():
    logger.addHandler(NullHandler())
    # Records are echoed to standard output.
    logger.addHandler(StreamHandler(sys.stdout))
    logger.setLevel('INFO')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _env_path(variable, fallback):
    """Return the path held by environment variable *variable*, or *fallback*."""
    return Path(os.environ.get(variable, fallback))


# Location of the raw database; overridable through $DATAPATH (kept as str).
DATA_FOLDER = os.environ.get("DATAPATH", "/data/eurova/cumulus_database/")

# Working folders; each one can be redirected through the environment
# variable of the same name and otherwise defaults to a relative folder.
AIX_DATA = _env_path("AIX_DATA", "data")
AIX_MODELS = _env_path("AIX_MODELS", "models")
AIX_EVALS = _env_path("AIX_EVALS", "eval")

# Dataset descriptions live below the data folder.
AIX_DATASETS = AIX_DATA / "datasets"

# Names of the per-stage sub-folders.
MATURE, IMMATURE = "mature", "immature"
|
|
|
|
|
def init_path(output_path: Path, stages=None):
    """Create *output_path* and one sub-folder per stage.

    Folders that already exist are left untouched, so the call is safe to
    repeat.

    Args:
        output_path: Root folder to create (a ``Path`` or a plain string).
        stages: Names of the sub-folders to create below the root.  ``None``
            selects ``(IMMATURE, MATURE)``.
    """
    # Resolved at call time so no mutable list is shared between calls.
    if stages is None:
        stages = (IMMATURE, MATURE)
    root = Path(output_path)
    root.mkdir(parents=True, exist_ok=True)
    for stage in stages:
        # parents=True also supports nested stage names such as "a/b".
        (root / stage).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
@dataclass
class Item:
    """A single image file or mask file that belongs to a dataset.

    The file lives at ``<base>/<stage>/<index><extension>`` where ``<base>``
    is the dataset's ``rooted_annotations_path`` for masks and its
    ``rooted_images_path`` otherwise.
    """

    # Owner; must expose rooted_images_path / rooted_annotations_path.
    dataset: Any
    # True selects the annotations (mask) root, False the images root.
    mask: bool
    # File stem.
    index: str
    # Optional sub-folder below the root; "" means "directly in the root".
    stage: str = ""
    # File suffix, including the dot.
    extension: str = ".png"

    def filename(self):
        """Return the full path of the file as a string."""
        if self.mask:
            base = Path(self.dataset.rooted_annotations_path)
        else:
            base = Path(self.dataset.rooted_images_path)
        if self.stage != "":
            base = base / self.stage
        return str(base / (self.index + self.extension))

    def raw_image(self, opts=cv2.IMREAD_UNCHANGED, remove_alpha=True):
        """Read the file with OpenCV and return the decoded array.

        Args:
            opts: ``cv2.imread`` flags.
            remove_alpha: When true (the default) a fourth channel is
                dropped so that at most three channels are returned.

        Raises:
            FileNotFoundError: OpenCV could not read the file.
        """
        path = self.filename()
        img = cv2.imread(path, opts)
        # cv2.imread signals failure by returning None instead of raising.
        if img is None:
            raise FileNotFoundError("Could not read image " + path)
        if remove_alpha and img.ndim == 3 and img.shape[2] == 4:
            print(path + " is in RGBA format. We remove the A")
            img = img[:, :, :3]
        return img

    def float_image(self, opts=cv2.IMREAD_UNCHANGED):
        """Return the image converted to ``float32`` (values not rescaled)."""
        return self.raw_image(opts).astype(np.float32)

    def norm_image(self, opts=cv2.IMREAD_UNCHANGED):
        """Return the ``float32`` image divided by 255."""
        return self.float_image(opts) / 255.

    def uint_norm_image(self, opts=cv2.IMREAD_UNCHANGED):
        """Return the raw image divided by 255, without the ``float32`` cast."""
        return self.raw_image(opts) / 255.

    def tensor(self, shape=None):
        """Return the image, read as grayscale, as a ``float32`` tensor.

        Args:
            shape: Target shape; only its first two entries (height, width)
                are used for resizing.  ``None`` keeps the native size.
        """
        img = self.raw_image(cv2.IMREAD_GRAYSCALE)
        if img.ndim == 2:
            # Add the channel axis without reshaping the array in place.
            img = img[:, :, np.newaxis]
        t = tf.convert_to_tensor(img)
        if shape is not None:
            t = tf.image.resize(t, shape[:2])
        return tf.cast(t, tf.float32)

    def norm_tensor(self, shape=None):
        """Return ``tensor(shape)`` divided by 255."""
        return self.tensor(shape) / 255.

    def write(self, img):
        """Write *img* (a ``uint8`` array) to this item's file.

        Raises:
            OSError: OpenCV reported that the file could not be written.
        """
        assert img.dtype == np.uint8, "write() expects a uint8 image"
        path = self.filename()
        print("Writing image ", path)
        # cv2.imwrite returns False on failure instead of raising.
        if not cv2.imwrite(path, img):
            raise OSError("Could not write image " + path)

    def copy(self):
        """Return a field-by-field copy; the dataset reference is shared."""
        return dataclasses.replace(self)
|
|
|
|
|
class Dataset:
    """Named collection of oocytes with image files and optional mask files.

    Files are addressed as ``<root>/<stage>/<oocyte><extension>`` where
    ``<root>`` is the images root or the annotations root.  Relative roots are
    resolved against ``AIX_DATA``; absolute roots are used unchanged.
    """

    def __init__(self, name, oocytes, images_path: str, annotations_path: Optional[str] = None,
                 image_extension=".png", stages=None, create_folders=False):
        """Build the dataset and validate (or create) its folder layout.

        Args:
            name: Label of the dataset.
            oocytes: List of oocyte identifiers (file stems).  The list is
                stored by reference.
            images_path: Images root, absolute or relative to ``AIX_DATA``.
            annotations_path: Masks root, absolute or relative to
                ``AIX_DATA``; ``None`` when there are no annotations.
            image_extension: File suffix, including the dot.
            stages: Names of the per-stage sub-folders; ``None`` selects
                ``[IMMATURE, MATURE]``.
            create_folders: When true the stage folders are created,
                otherwise every one of them must already exist.

        Raises:
            FileNotFoundError: A stage folder is missing and
                *create_folders* is false.
        """
        # A fresh list per instance: no default list shared between datasets.
        stages = [IMMATURE, MATURE] if stages is None else list(stages)
        self.name = name
        self.oocytes = oocytes
        self.stages = stages
        print("Number of oocytes for dataset ", name, ":", len(self.oocytes))

        rooted_images_path = self._rooted(images_path)
        if annotations_path is not None:
            rooted_annotations_path = self._rooted(annotations_path)
        else:
            rooted_annotations_path = None

        roots = [rooted_images_path]
        if rooted_annotations_path is not None:
            roots.append(rooted_annotations_path)

        if create_folders:
            for root in roots:
                init_path(root, stages)
        else:
            for subfold in stages:
                for root in roots:
                    if not (root / subfold).is_dir():
                        # Name the folder that is actually missing.
                        raise FileNotFoundError("Path " + str(root / subfold) + " not found.")

        self.images_path = images_path
        self.annotations_path = annotations_path
        self.rooted_images_path = rooted_images_path
        self.rooted_annotations_path = rooted_annotations_path
        self.extension = image_extension

    @staticmethod
    def _rooted(path):
        """Return *path* unchanged when absolute, else below ``AIX_DATA``."""
        if os.path.isabs(path):
            return Path(path)
        return AIX_DATA / path

    @staticmethod
    def _native_tensor(item):
        """Load *item* as a grayscale ``float32`` tensor at its native size."""
        img = item.raw_image(cv2.IMREAD_GRAYSCALE)
        if img.ndim == 2:
            img = img[:, :, np.newaxis]
        return tf.cast(tf.convert_to_tensor(img), tf.float32)

    def _shuffled_indices(self, seed):
        """Return the oocyte positions in a seeded random order.

        This reseeds NumPy's global random generator, as the callers always
        did.
        """
        order = np.arange(len(self.oocytes))
        np.random.seed(seed)
        np.random.shuffle(order)
        return order

    def _fold_ids(self, k, per_oocyte):
        """Return the fold number of every entry of a shuffled file list.

        The oocytes are spread over *k* folds whose sizes differ by at most
        one; each oocyte contributes *per_oocyte* consecutive entries.
        """
        n_oocytes = len(self.oocytes)
        fold_sizes = np.repeat(n_oocytes // k, k)
        # The first (n mod k) folds take one extra oocyte.
        fold_sizes[:n_oocytes % k] += 1
        return np.repeat(np.arange(k), fold_sizes * per_oocyte)

    @staticmethod
    def from_folder(name, folder_name, images_path, annotations_path, image_extension=".png"):
        """Build a dataset from the files found in *folder_name*.

        Every file whose suffix equals *image_extension* contributes its stem
        as an oocyte identifier; the identifiers are sorted.

        Raises:
            FileNotFoundError: *folder_name* is not a directory.
        """
        folder = Path(folder_name)
        if not folder.is_dir():
            # str() so that a Path argument does not break the message.
            raise FileNotFoundError("Path " + str(folder_name) + " not found.")

        oocytes = sorted(f.stem for f in folder.iterdir() if f.suffix == image_extension)

        return Dataset(name, oocytes, images_path, annotations_path, image_extension)

    @staticmethod
    def from_file(file_name: Path):
        """Load a dataset description written by :meth:`save`.

        Raises:
            FileNotFoundError: *file_name* is not a file.
        """
        if not Path(file_name).is_file():
            raise FileNotFoundError("File " + str(file_name) + " not found")
        with open(file_name) as f:
            data = json.load(f)
        annotations = data.get("annotations")
        # Earlier versions of save() stored a missing annotations path as the
        # string "None".
        if annotations == "None":
            annotations = None
        return Dataset(data["name"], data["oocytes"], data["images"], annotations,
                       data.get("image_extension", ".png"))

    @staticmethod
    def create(name, images_path: str, annotations_path: str,
               image_extension=".png", stages=None):
        """Create an empty dataset together with its stage folders."""
        return Dataset(name, [], images_path, annotations_path, image_extension,
                       stages=stages, create_folders=True)

    def num_images(self):
        """Return the number of images: one per oocyte and stage."""
        return len(self.stages) * len(self.oocytes)

    def save(self, file_name):
        """Write name, oocytes, extension and the two roots as JSON."""
        if self.annotations_path is None:
            annotations = None  # stored as JSON null, not as the text "None"
        else:
            annotations = str(self.annotations_path)
        d = {"name": self.name, "oocytes": self.oocytes,
             "image_extension": self.extension,
             "images": str(self.images_path),
             "annotations": annotations}
        with open(file_name, "w") as f:
            json.dump(d, f)

    def has_annotations(self):
        """Return whether an annotations root was given."""
        return self.annotations_path is not None

    def new_item(self, mask=False, stage="", index=""):
        """Return an :class:`Item` of this dataset with its file extension."""
        return Item(self, mask, index=index, stage=stage, extension=self.extension)

    def cv_item_iterator(self, k=10, seed=42, maturity=None):
        """Yield ``k`` cross-validation splits of image and mask items.

        All items of one oocyte always fall into the same fold.

        Args:
            k: Number of folds.
            seed: Seed of the shuffle.
            maturity: Restrict the split to this stage; ``None`` uses every
                stage of the dataset.

        Yields:
            ``(x_train, x_test, y_train, y_test)`` object arrays of items.
        """
        stages = list(self.stages) if maturity is None else [maturity]
        oocyte_items = []
        mask_items = []
        for i in self._shuffled_indices(seed):
            oocyte_index = self.oocytes[i]
            for stage in stages:
                oocyte_items.append(self.new_item(mask=False, stage=stage, index=oocyte_index))
                mask_items.append(self.new_item(mask=True, stage=stage, index=oocyte_index))

        num_fold = self._fold_ids(k, len(stages))
        oocyte_items = np.array(oocyte_items)
        mask_items = np.array(mask_items)

        for fold in range(k):
            in_test = num_fold == fold
            x_train = oocyte_items[~in_test]
            y_train = mask_items[~in_test]
            x_test = oocyte_items[in_test]
            y_test = mask_items[in_test]
            yield x_train, x_test, y_train, y_test

    @classmethod
    def tf_dataset_from_items(cls, x, y, image_shape, mask_shape):
        """Wrap paired items into a ``tf.data.Dataset``.

        Images are produced with ``tensor(image_shape)`` and masks with
        ``norm_tensor(mask_shape)``.
        """
        def f():
            for x_item, y_item in zip(x, y):
                yield x_item.tensor(image_shape), y_item.norm_tensor(mask_shape)

        return tf.data.Dataset.from_generator(
            f,
            output_signature=(tf.TensorSpec(shape=image_shape, dtype=tf.float32),
                              tf.TensorSpec(shape=mask_shape, dtype=tf.float32)))

    def cv_tf_dataset_iterator(self, image_shape, mask_shape, k=10, seed=42, maturity=None):
        """Yield, per fold, the train/test items and their TensorFlow datasets.

        Yields:
            ``((x_train, y_train), train, (x_test, y_test), test)``.
        """
        for x_train, x_test, y_train, y_test in self.cv_item_iterator(k=k, seed=seed, maturity=maturity):
            train = self.tf_dataset_from_items(x_train, y_train, image_shape, mask_shape)
            test = self.tf_dataset_from_items(x_test, y_test, image_shape, mask_shape)
            yield (x_train, y_train), train, (x_test, y_test), test

    def train_test_iterator(self, k=10, seed=42):
        """Yield ``k`` cross-validation splits of image and mask file paths.

        NOTE(review): the paths are ``<root>/<stage>/<oocyte>`` without the
        file extension, as before -- confirm that callers append it.

        Yields:
            ``(x_train, x_test, y_train, y_test)`` arrays of POSIX paths.
        """
        image_files = []
        mask_files = []
        for idx in self._shuffled_indices(seed):
            for stage in self.stages:
                image_files.append((Path(self.rooted_images_path) / stage / (self.oocytes[idx])).as_posix())
                mask_files.append((Path(self.rooted_annotations_path) / stage / (self.oocytes[idx])).as_posix())

        # Fold ids follow k and the number of stages (formerly fixed to 10 and 2).
        num_fold = self._fold_ids(k, len(self.stages))
        image_files = np.array(image_files)
        mask_files = np.array(mask_files)

        for fold in range(k):
            in_test = num_fold == fold
            x_train = image_files[~in_test]
            y_train = mask_files[~in_test]
            x_test = image_files[in_test]
            y_test = mask_files[in_test]
            yield x_train, x_test, y_train, y_test

    def train_test_split(self, percent=90, seed=42):
        """Split the oocytes at random into a train and a test dataset.

        Args:
            percent: Percentage of the oocytes assigned to the train part.
            seed: Seed of the shuffle that selects the oocytes.

        Returns:
            ``(train_ds, test_ds)``, both sharing this dataset's roots,
            extension and stages.
        """
        order = self._shuffled_indices(seed)
        first_test = math.floor(percent * len(self.oocytes) / 100.)
        # Index through the shuffled order; the shuffle was formerly unused.
        train_oocytes = [self.oocytes[i] for i in order[:first_test]]
        test_oocytes = [self.oocytes[i] for i in order[first_test:]]
        train_ds = Dataset(self.name + "train", train_oocytes, self.images_path, self.annotations_path,
                           self.extension, stages=self.stages)
        test_ds = Dataset(self.name + "test", test_oocytes, self.images_path, self.annotations_path,
                          self.extension, stages=self.stages)
        return train_ds, test_ds

    def tfDataset(self):
        """Return a ``tf.data.Dataset`` of native-size (image, mask) tensors.

        The tensor shapes are taken from the first oocyte's first stage.
        """
        idx = self.oocytes[0]
        probe_stage = self.stages[0]
        image_shape = self._native_tensor(self.new_item(mask=False, stage=probe_stage, index=idx)).shape
        mask_shape = self._native_tensor(self.new_item(mask=True, stage=probe_stage, index=idx)).shape
        return tf.data.Dataset.from_generator(
            self.iterate_pairs,
            output_signature=(tf.TensorSpec(shape=image_shape, dtype=tf.float32),
                              tf.TensorSpec(shape=mask_shape, dtype=tf.float32)))

    def tfDataset_fixed_shape(self, image_shape, mask_shape):
        """Return a ``tf.data.Dataset`` of resized (image, mask) tensors.

        Images use ``tensor(image_shape)``, masks ``norm_tensor(mask_shape)``.
        """
        def f():
            for x_item, y_item in self.iterate_pairs(tensor=False):
                yield x_item.tensor(image_shape), y_item.norm_tensor(mask_shape)

        return tf.data.Dataset.from_generator(
            f,
            output_signature=(tf.TensorSpec(shape=image_shape, dtype=tf.float32),
                              tf.TensorSpec(shape=mask_shape, dtype=tf.float32)))

    def iterate_pairs(self, tensor=True):
        """Yield an (image, mask) pair for every oocyte and stage.

        Args:
            tensor: When true the pair holds native-size ``float32`` tensors,
                otherwise the two items themselves.
        """
        for idx in self.oocytes:
            for stage in self.stages:
                x = self.new_item(mask=False, stage=stage, index=idx)
                y = self.new_item(mask=True, stage=stage, index=idx)
                if tensor:
                    x = self._native_tensor(x)
                    y = self._native_tensor(y)
                yield x, y

    def iterate_items(self):
        """Yield, for every oocyte and stage, the image item then the mask item."""
        for idx in self.oocytes:
            for stage in self.stages:
                yield self.new_item(mask=False, stage=stage, index=idx)
                yield self.new_item(mask=True, stage=stage, index=idx)

    def iterate_oocyte_items(self, tensor=True):
        """Yield the image of every oocyte and stage.

        Args:
            tensor: When true a native-size ``float32`` tensor is yielded,
                otherwise the item itself.
        """
        for idx in self.oocytes:
            for stage in self.stages:
                x = self.new_item(mask=False, stage=stage, index=idx)
                if tensor:
                    x = self._native_tensor(x)
                yield x

    def iterate_mask_items(self):
        """Yield the mask item of every oocyte and stage."""
        for idx in self.oocytes:
            for stage in self.stages:
                yield self.new_item(mask=True, stage=stage, index=idx)

    def iterate_oocyte_masks(self):
        """Yield, per oocyte, the list of its mask items (one per stage)."""
        for idx in self.oocytes:
            yield [self.new_item(mask=True, stage=stage, index=idx) for stage in self.stages]

    def __repr__(self):
        return "<Dataset: {}>".format(self.name)

    def add_oocyte(self, index):
        """Append *index* to the oocytes unless it is already present."""
        if index not in self.oocytes:
            self.oocytes.append(index)
|
|
|