Spaces:
Sleeping
Sleeping
| import sys | |
| import os | |
| from pathlib import Path | |
| import json | |
| import os | |
| import dataclasses | |
| from dataclasses import dataclass | |
| from typing import Any, Optional | |
| import math | |
| import logging | |
| from logging import NullHandler, StreamHandler | |
| import numpy as np | |
| import cv2 | |
| import tensorflow as tf | |
# Register this module as a pkg_resources-style namespace package.
__import__('pkg_resources').declare_namespace(__name__)
# Set default logging handler to avoid "No handler found" warnings.
logger = logging.getLogger(__name__)
# NOTE(review): indentation was lost in this copy of the file, so the extent of
# the `if` body below is an assumption -- confirm whether the StreamHandler and
# the level are meant to be configured only when no handler exists yet.
if not logger.hasHandlers():
    logger.addHandler(NullHandler())
    # Also send this module's log records to standard output.
    logger.addHandler(StreamHandler(sys.stdout))
    logger.setLevel('INFO')
# Locations can be overridden through environment variables; each one falls
# back to the default shown here when the variable is not set:
#   DATAPATH   -> DATA_FOLDER (PATH of the data files)
#   AIX_DATA   -> AIX_DATA
#   AIX_MODELS -> AIX_MODELS
#   AIX_EVALS  -> AIX_EVALS
DATA_FOLDER = os.environ.get("DATAPATH", "/data/eurova/cumulus_database/")
AIX_DATA = Path(os.environ.get("AIX_DATA", "data"))
AIX_MODELS = Path(os.environ.get("AIX_MODELS", "models"))
AIX_EVALS = Path(os.environ.get("AIX_EVALS", "eval"))
# Dataset description files live in a fixed sub-folder of the data root.
AIX_DATASETS = AIX_DATA / "datasets"

# Stage names; they are also used as sub-folder names below a dataset root.
IMMATURE = "immature"
MATURE = "mature"
def init_path(output_path:Path, stages=[IMMATURE, MATURE]):
    """Create ``output_path`` and one sub-folder per stage.

    Missing parent folders of ``output_path`` are created as well; folders
    that already exist are left as they are.

    Args:
        output_path: Root folder to create.
        stages: Names of the sub-folders to create directly below the root.
    """
    output_path.mkdir(parents=True, exist_ok=True)
    stage_dirs = [output_path / stage_name for stage_name in stages]
    for stage_dir in stage_dirs:
        stage_dir.mkdir(exist_ok=True)
| # An item is a generalization which includes as particular cases an oocyte image, an oocyte mask, and patches of those. | |
@dataclass
class Item:
    """A single image file of a dataset: an oocyte image or a mask.

    The file location is derived from the owning dataset's rooted image or
    annotation folder, the optional stage sub-folder, the index and the
    extension (see ``filename``).

    The class is a dataclass so that it can be built positionally/by keyword
    (as ``Dataset.new_item`` does) and duplicated with ``dataclasses.replace``.
    """

    # Owning dataset; must expose rooted_images_path / rooted_annotations_path.
    dataset: Any
    # True when the item is an annotation mask, False when it is an image.
    mask: bool
    # File name without extension.
    index: str
    # Optional stage sub-folder; "" means the file sits directly in the root.
    stage: str = ""
    # File extension including the leading dot.
    extension: str = ".png"

    def filename(self):
        """Return the full path of the item's file as a string."""
        if self.mask:
            root = self.dataset.rooted_annotations_path
        else:
            root = self.dataset.rooted_images_path
        base = Path(root)
        if self.stage != "":
            base = base / self.stage
        return str(base / (self.index + self.extension))

    def raw_image(self, opts=cv2.IMREAD_UNCHANGED, remove_alpha=True):
        """Read the file with OpenCV and return the array as loaded.

        Args:
            opts: Flags forwarded to ``cv2.imread``.
            remove_alpha: When true (the default), a fourth channel is dropped
                from 4-channel images and only the first three are kept.

        Raises:
            OSError: If ``cv2.imread`` could not read the file.
        """
        path = self.filename()
        img = cv2.imread(path, opts)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError("Could not read image " + path)
        if remove_alpha and len(img.shape) == 3 and img.shape[2] == 4:
            print(path + " is in RGBA format. We remove the A")
            img = img[:, :, :3]
        return img

    def float_image(self, opts=cv2.IMREAD_UNCHANGED):
        """Return the image converted to ``float32``."""
        return self.raw_image(opts).astype(np.float32)

    def norm_image(self, opts=cv2.IMREAD_UNCHANGED):
        """Return the ``float32`` image divided by 255."""
        return self.float_image(opts) / 255.

    def uint_norm_image(self, opts=cv2.IMREAD_UNCHANGED):
        """Return the raw image divided by 255 (no prior cast to ``float32``)."""
        return self.raw_image(opts) / 255.

    def tensor(self, shape=None):
        """Load the file as grayscale and return it as a ``float32`` tensor.

        Args:
            shape: Target shape; only its first two entries are used, as the
                size passed to ``tf.image.resize``. When omitted or ``None``
                the image keeps its original size.
        """
        img = self.raw_image(cv2.IMREAD_GRAYSCALE)
        if len(img.shape) == 2:
            # Add an explicit channel axis so the tensor has three dimensions.
            img = img[:, :, np.newaxis]
        t = tf.convert_to_tensor(img)
        if shape is not None:
            t = tf.image.resize(t, shape[:2])
        return tf.cast(t, tf.float32)

    def norm_tensor(self, shape=None):
        """Return ``tensor(shape)`` divided by 255."""
        return self.tensor(shape) / 255.

    def write(self, img):
        """Write ``img`` (which must be a ``uint8`` array) to the item's file."""
        assert img.dtype == np.uint8
        print("Writing image ", self.filename())
        cv2.imwrite(self.filename(), img)

    def copy(self):
        """Return a new Item with the same field values."""
        return dataclasses.replace(self)
class Dataset:
    """A named list of oocyte identifiers plus the folders holding their files.

    Image files are addressed as ``<images root>/<stage>/<oocyte><extension>``
    and mask files likewise below the annotations root. Relative roots are
    resolved against ``AIX_DATA``; absolute ones are used as given.
    """

    def __init__(self, name, oocytes, images_path:str, annotations_path:Optional[str]=None, image_extension=".png",
                 stages=[IMMATURE, MATURE], create_folders=False):
        """Build the dataset and create or verify its folder layout.

        Args:
            name: Dataset name.
            oocytes: List of oocyte identifiers (file names without extension).
            images_path: Root folder of the images, with one sub-folder per stage.
            annotations_path: Root folder of the masks, or ``None`` if there are none.
            image_extension: File extension including the leading dot.
            stages: Names of the stage sub-folders.
            create_folders: When true the folders are created; otherwise they
                must already exist.

        Raises:
            FileNotFoundError: If ``create_folders`` is false and a stage
                folder is missing.
        """
        self.name = name
        self.oocytes = oocytes
        # Copy so that instances never share (and mutate) the default list.
        self.stages = list(stages)
        print("Number of oocytes for dataset ", name, ":", len(self.oocytes))

        # root path with subfolders immature / mature
        rooted_images_path = self._rooted(images_path)
        if annotations_path is not None:
            rooted_annotations_path = self._rooted(annotations_path)
        else:
            rooted_annotations_path = None

        if create_folders:
            init_path(rooted_images_path, self.stages)
            if rooted_annotations_path is not None:
                init_path(rooted_annotations_path, self.stages)
        else:
            for subfold in self.stages:
                if not (rooted_images_path / subfold).is_dir():
                    raise FileNotFoundError("Path " + str(rooted_images_path / subfold) + " not found.")
                if rooted_annotations_path is not None and not (rooted_annotations_path / subfold).is_dir():
                    raise FileNotFoundError("Path " + str(rooted_annotations_path / subfold) + " not found.")

        self.images_path = images_path
        self.annotations_path = annotations_path
        self.rooted_images_path = rooted_images_path
        self.rooted_annotations_path = rooted_annotations_path
        self.extension = image_extension

    @staticmethod
    def _rooted(path):
        """Return ``path`` as a Path, resolving relative paths against AIX_DATA."""
        if os.path.isabs(path):
            return Path(path)
        return AIX_DATA / path

    def _shuffled_indices(self, seed):
        """Return the oocyte positions in a seeded random order."""
        indices = np.arange(len(self.oocytes))
        # This reseeds NumPy's global generator; kept because code running
        # afterwards may depend on that state.
        np.random.seed(seed)
        np.random.shuffle(indices)
        return indices

    @staticmethod
    def _fold_labels(n_oocytes, k, items_per_oocyte):
        """Return, for every item position, the number of the fold it belongs to.

        Oocytes are spread over ``k`` folds whose sizes differ by at most one;
        each oocyte contributes ``items_per_oocyte`` consecutive items.
        """
        fold_sizes = np.repeat(n_oocytes // k, k)
        # Adjust sizes when the number of oocytes is not a multiple of k.
        fold_sizes[:n_oocytes % k] += 1
        return np.repeat(np.arange(k), fold_sizes * items_per_oocyte)

    @staticmethod
    def _tf_dataset(generator, image_shape, mask_shape):
        """Wrap a generator of (image, mask) float32 tensors in a tf.data.Dataset."""
        return tf.data.Dataset.from_generator(
            generator,
            output_signature=(tf.TensorSpec(shape=image_shape, dtype=tf.float32),
                              tf.TensorSpec(shape=mask_shape, dtype=tf.float32)))

    @staticmethod
    def from_folder(name, folder_name, images_path, annotations_path, image_extension=".png"):
        """Build a dataset whose oocytes are the files of ``folder_name``.

        Every file in ``folder_name`` whose suffix equals ``image_extension``
        contributes its stem; the resulting list is sorted.

        Raises:
            FileNotFoundError: If ``folder_name`` is not a directory.
        """
        folder = Path(folder_name)
        if not folder.is_dir():
            raise FileNotFoundError("Path " + str(folder_name) + " not found.")
        oocytes = sorted(f.stem for f in folder.iterdir() if f.suffix == image_extension)
        return Dataset(name, oocytes, images_path, annotations_path, image_extension)

    @staticmethod
    def from_file(file_name: Path):
        """Load a dataset from a JSON file such as the one written by ``save``.

        Raises:
            FileNotFoundError: If ``file_name`` is not a file.
        """
        file_name = Path(file_name)
        if not file_name.is_file():
            raise FileNotFoundError("File " + str(file_name) + " not found")
        with open(file_name) as f:
            data = json.load(f)
        annotations = data.get("annotations")
        if annotations == "None":
            # Earlier versions of save() stored a missing annotations path as
            # the string "None"; read it back as "no annotations".
            annotations = None
        return Dataset(data["name"], data["oocytes"], data["images"], annotations,
                       data.get("image_extension", ".png"))

    @staticmethod
    def create(name, images_path:str, annotations_path:str,
               image_extension=".png", stages=[IMMATURE, MATURE]):
        """Create an empty dataset, creating its folders on disk."""
        return Dataset(name, [], images_path, annotations_path, image_extension,
                       stages=stages, create_folders=True)

    def num_images(self):
        """Return the number of images: one per oocyte and stage."""
        return len(self.stages) * len(self.oocytes)

    def save(self, file_name):
        """Write the dataset description to ``file_name`` as JSON."""
        if self.annotations_path is None:
            # Stored as JSON null so that from_file() restores None.
            annotations = None
        else:
            annotations = str(self.annotations_path)
        d = {"name": self.name, "oocytes": self.oocytes,
             "image_extension": self.extension,
             "images": str(self.images_path),
             "annotations": annotations}
        with open(file_name, "w") as f:
            f.write(json.dumps(d))

    def has_annotations(self):
        """Return whether an annotations path was given."""
        return self.annotations_path is not None

    def new_item(self, mask=False, stage="", index=""):
        """Return an Item of this dataset using the dataset's file extension."""
        return Item(self, mask, index=index, stage=stage, extension=self.extension)

    def cv_item_iterator(self, k=10, seed=42, maturity=None):
        """Yield ``k`` cross-validation splits of image and mask Items.

        Oocytes are shuffled with ``seed`` and spread over ``k`` folds, so all
        items of one oocyte land in the same fold.

        Args:
            k: Number of folds.
            seed: Seed of the shuffle.
            maturity: ``IMMATURE`` or ``MATURE`` to use a single stage, or
                ``None`` to use both.

        Yields:
            ``x_train, x_test, y_train, y_test`` arrays of Items, where ``x``
            are images and ``y`` the corresponding masks.

        Raises:
            ValueError: If ``maturity`` is not one of the accepted values.
        """
        if maturity not in (None, IMMATURE, MATURE):
            raise ValueError("Unknown maturity: " + str(maturity))
        selected_stages = [s for s in (IMMATURE, MATURE) if maturity is None or maturity == s]
        oocyte_items = []
        mask_items = []
        for i in self._shuffled_indices(seed):
            oocyte_index = self.oocytes[i]
            for stage in selected_stages:
                oocyte_items.append(self.new_item(mask=False, stage=stage, index=oocyte_index))
                mask_items.append(self.new_item(mask=True, stage=stage, index=oocyte_index))
        num_fold = self._fold_labels(len(self.oocytes), k, len(selected_stages))
        oocyte_items = np.array(oocyte_items)
        mask_items = np.array(mask_items)
        for fold in range(k):
            x_train = oocyte_items[num_fold != fold]
            y_train = mask_items[num_fold != fold]
            x_test = oocyte_items[num_fold == fold]
            y_test = mask_items[num_fold == fold]
            yield x_train, x_test, y_train, y_test

    @classmethod
    def tf_dataset_from_items(cls, x, y, image_shape, mask_shape):
        """Build a tf.data.Dataset from parallel sequences of image and mask Items.

        Images are resized to ``image_shape``; masks are resized to
        ``mask_shape`` and divided by 255.
        """
        def f():
            for x_item, y_item in zip(x, y):
                yield x_item.tensor(image_shape), y_item.norm_tensor(mask_shape)
        return cls._tf_dataset(f, image_shape, mask_shape)

    def cv_tf_dataset_iterator(self, image_shape, mask_shape, k=10, seed=42, maturity=None):
        """Yield, per fold, the train/test Items together with their tf datasets.

        Yields:
            ``(x_train, y_train), train, (x_test, y_test), test``.
        """
        for x_train, x_test, y_train, y_test in self.cv_item_iterator(k=k, seed=seed, maturity=maturity):
            train = self.tf_dataset_from_items(x_train, y_train, image_shape, mask_shape)
            test = self.tf_dataset_from_items(x_test, y_test, image_shape, mask_shape)
            yield (x_train, y_train), train, (x_test, y_test), test

    def train_test_iterator(self, k=10, seed=42):
        """Yield ``k`` cross-validation splits of image and mask file paths.

        Yields:
            ``x_train, x_test, y_train, y_test`` arrays of POSIX path strings.
        """
        image_files = []
        mask_files = []
        for idx in self._shuffled_indices(seed):
            for stage in self.stages:
                # NOTE(review): unlike Item.filename, no extension is appended
                # to the oocyte identifier here -- confirm this is intended.
                image_files.append((Path(self.rooted_images_path) / stage / (self.oocytes[idx])).as_posix())
                mask_files.append((Path(self.rooted_annotations_path) / stage / (self.oocytes[idx])).as_posix())
        # Use k and the real number of stages rather than fixed 10 and 2, so
        # the labels always line up with the file lists built above.
        num_fold = self._fold_labels(len(self.oocytes), k, len(self.stages))
        image_files = np.array(image_files)
        mask_files = np.array(mask_files)
        for fold in range(k):
            x_train = image_files[num_fold != fold]
            y_train = mask_files[num_fold != fold]
            x_test = image_files[num_fold == fold]
            y_test = mask_files[num_fold == fold]
            yield x_train, x_test, y_train, y_test

    def train_test_split(self, percent=90, seed=42):
        """Split the oocytes at random into a train and a test dataset.

        Args:
            percent: Percentage of the oocytes that goes to the train dataset.
            seed: Seed of the shuffle.

        Returns:
            ``(train_ds, test_ds)``, two Datasets over the same folders,
            extension and stages as this one.
        """
        # Apply the shuffled order before cutting, so the split is random.
        shuffled = [self.oocytes[i] for i in self._shuffled_indices(seed)]
        first_test = math.floor(percent * len(self.oocytes) / 100.)
        train_oocytes = shuffled[:first_test]
        test_oocytes = shuffled[first_test:]
        train_ds = Dataset(self.name + "train", train_oocytes, self.images_path, self.annotations_path,
                           image_extension=self.extension, stages=self.stages)
        test_ds = Dataset(self.name + "test", test_oocytes, self.images_path, self.annotations_path,
                          image_extension=self.extension, stages=self.stages)
        return train_ds, test_ds

    def tfDataset(self):
        """Build a tf.data.Dataset of (image, mask) tensors at their stored size.

        The shapes of the output signature are taken from the IMMATURE image
        and mask of the first oocyte.
        """
        # NOTE(review): Item.tensor is called without a shape here and in
        # iterate_pairs; this requires Item.tensor to accept that call.
        idx = self.oocytes[0]
        image_shape = self.new_item(mask=False, stage=IMMATURE, index=idx).tensor().shape
        mask_shape = self.new_item(mask=True, stage=IMMATURE, index=idx).tensor().shape
        return self._tf_dataset(self.iterate_pairs, image_shape, mask_shape)

    def tfDataset_fixed_shape(self, image_shape, mask_shape):
        """Build a tf.data.Dataset of resized images and resized, /255 masks."""
        def f():
            for x_item, y_item in self.iterate_pairs(tensor=False):
                yield x_item.tensor(image_shape), y_item.norm_tensor(mask_shape)
        return self._tf_dataset(f, image_shape, mask_shape)

    def iterate_pairs(self, tensor=True):
        """Yield an (image, mask) pair for every oocyte and stage.

        Args:
            tensor: When true the pair holds the result of ``Item.tensor()``
                for each item; otherwise it holds the Items themselves.
        """
        for idx in self.oocytes:
            for stage in self.stages:
                x = self.new_item(mask=False, stage=stage, index=idx)
                y = self.new_item(mask=True, stage=stage, index=idx)
                if tensor:
                    x = x.tensor()
                    y = y.tensor()
                yield x, y

    def iterate_items(self):
        """Yield, for every oocyte and stage, the image Item and then the mask Item."""
        for idx in self.oocytes:
            for stage in self.stages:
                yield self.new_item(mask=False, stage=stage, index=idx)
                yield self.new_item(mask=True, stage=stage, index=idx)

    def iterate_oocyte_items(self, tensor=True):
        """Yield the image of every oocyte and stage, as tensor or as Item."""
        for idx in self.oocytes:
            for stage in self.stages:
                x = self.new_item(mask=False, stage=stage, index=idx)
                if tensor:
                    x = x.tensor()
                yield x

    def iterate_mask_items(self):
        """Yield the mask Item of every oocyte and stage."""
        for idx in self.oocytes:
            for stage in self.stages:
                yield self.new_item(mask=True, stage=stage, index=idx)

    def iterate_oocyte_masks(self):
        """Yield, per oocyte, the list of its mask Items (one per stage)."""
        for idx in self.oocytes:
            yield [self.new_item(mask=True, stage=stage, index=idx) for stage in self.stages]

    def __repr__(self):
        return "<Dataset: {}>".format(self.name)

    def add_oocyte(self, index):
        """Append ``index`` to the oocyte list unless it is already present."""
        if index not in self.oocytes:
            self.oocytes.append(index)