|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Optional, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import PIL |
|
|
import skimage.filters |
|
|
from monai.config.type_definitions import KeysCollection |
|
|
from monai.data.wsi_reader import WSIReader |
|
|
from monai.transforms.transform import MapTransform |
|
|
from openslide import OpenSlide |
|
|
|
|
|
from gigapath.preprocessing.data import box_utils |
|
|
|
|
|
|
|
|
def get_luminance(slide: np.ndarray) -> np.ndarray:
    """Convert an RGB slide to a grayscale luminance map by averaging the channels.

    :param slide: The RGB image array in (*, C, H, W) format.
    :return: The single-channel luminance array as (*, H, W).
    """
    # Average over the channel axis; float16 output halves memory for very large slides.
    return slide.mean(axis=-3, dtype=np.float16)
|
|
|
|
|
|
|
|
def segment_foreground(slide: np.ndarray, threshold: Optional[float] = None) \
        -> Tuple[np.ndarray, float]:
    """Segment the given slide by thresholding its luminance.

    :param slide: The RGB image array in (*, C, H, W) format.
    :param threshold: Pixels with luminance below this value will be considered foreground.
        If `None` (default), an optimal threshold will be estimated automatically using Otsu's method.
    :return: A tuple containing the boolean output array in (*, H, W) format and the threshold used.
    """
    # Grayscale via channel mean (same computation as get_luminance, inlined here).
    luminance = slide.mean(axis=-3, dtype=np.float16)
    if threshold is None:
        # No threshold supplied: estimate one from the luminance histogram.
        threshold = skimage.filters.threshold_otsu(luminance)
        logging.info(f"Otsu threshold from luminance: {threshold}")
    # Dark pixels (tissue) are foreground; bright background is masked out.
    return luminance < threshold, threshold
|
|
|
|
|
|
|
|
|
|
|
class ReadImaged(MapTransform):
    """Basic transform to read image files."""

    def __init__(self, reader: WSIReader, keys: KeysCollection,
                 allow_missing_keys: bool = False, **kwargs: Any) -> None:
        """
        :param reader: An instance of MONAI's `WSIReader` used to load each image file.
        :param keys: Keys in the data dictionary whose values are image file paths.
        :param allow_missing_keys: If `True`, keys absent from the dictionary are skipped.
        :param kwargs: Extra keyword arguments forwarded to `reader.read()`.
        """
        super().__init__(keys, allow_missing_keys=allow_missing_keys)
        self.reader = reader
        self.kwargs = kwargs

    def __call__(self, data: Dict) -> Dict:
        """Replace each file path under `self.keys` with the object read from it."""
        for key in self.keys:
            # When missing keys are disallowed, attempt the read regardless so a
            # KeyError surfaces for the absent entry.
            if not self.allow_missing_keys or key in data:
                data[key] = self.reader.read(data[key], **self.kwargs)
        return data
|
|
|
|
|
|
|
|
|
|
|
def _get_image_size(img, size=None, level=None, location=(0, 0), backend="openslide"): |
|
|
max_size = [] |
|
|
downsampling_factor = [] |
|
|
if backend == "openslide": |
|
|
downsampling_factor = img.level_downsamples[level] |
|
|
max_size = img.level_dimensions[level][::-1] |
|
|
elif backend == "cucim": |
|
|
downsampling_factor = img.resolutions["level_downsamples"][level] |
|
|
max_size = img.resolutions["level_dimensions"][level][::-1] |
|
|
elif backend == "tifffile": |
|
|
level0_size = img.pages[0].shape[:2] |
|
|
max_size = img.pages[level].shape[:2] |
|
|
downsampling_factor = np.mean([level0_size[i] / max_size[i] for i in range(len(max_size))]) |
|
|
|
|
|
|
|
|
level_location = [round(location[i] / downsampling_factor) for i in range(len(location))] |
|
|
size = [max_size[i] - level_location[i] for i in range(len(max_size))] |
|
|
|
|
|
return size |
|
|
|
|
|
|
|
|
def load_slide_at_level(reader: WSIReader, slide_obj: OpenSlide, level: int) -> np.ndarray:
    """Load full slide array at the given magnification level.

    This is a manual workaround for a MONAI bug (https://github.com/Project-MONAI/MONAI/issues/3415)
    fixed in a currently unreleased PR (https://github.com/Project-MONAI/MONAI/pull/3417).

    :param reader: A MONAI `WSIReader` using OpenSlide backend.
    :param slide_obj: The OpenSlide image object returned by `reader.read(<image_file>)`.
    :param level: Index of the desired magnification level as defined in the `slide_obj` headers.
    :return: The loaded image array in (C, H, W) format.
    """
    # Explicitly request the level's full extent to sidestep the MONAI issue above.
    full_size = _get_image_size(slide_obj, level=level)
    img_data, meta_data = reader.get_data(slide_obj, size=full_size, level=level)
    logging.info(f"img: {img_data.dtype} {img_data.shape}, metadata: {meta_data}")
    return img_data
|
|
|
|
|
def save_image(array_chw: np.ndarray, path: Path) -> PIL.Image.Image:
    """Save an image array in (C, H, W) format to disk as an RGB file.

    :param array_chw: Image array in (C, H, W) format; values are cast to uint8.
    :param path: Output file path; parent directories are created if necessary.
    :return: The `PIL.Image.Image` object built from the array (before RGB conversion).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Channels-first -> channels-last; squeeze() drops a singleton channel axis
    # so single-channel arrays become 2-D grayscale images.
    array_hwc = np.moveaxis(array_chw, 0, -1).astype(np.uint8).squeeze()
    pil_image = PIL.Image.fromarray(array_hwc)
    pil_image.convert('RGB').save(path)
    return pil_image
|
|
|
|
|
class LoadROId(MapTransform):
    """Transform that loads a pathology slide, cropped to the foreground bounding box (ROI).

    Operates on dictionaries, replacing the file paths in `image_key` with the
    respective loaded arrays, in (C, H, W) format. Also adds the following meta-data entries:
    - `'location'` (tuple): (y, x) coordinates of the bounding box origin, in level-0 reference frame
    - `'size'` (tuple): height and width of the bounding box at the chosen level
    - `'level'` (int): chosen magnification level
    - `'scale'` (float): corresponding scale, loaded from the file
    - `'origin'` (tuple): same as `'location'`
    - `'foreground_threshold'` (float): luminance threshold used to segment the foreground
    """

    def __init__(self, image_reader: WSIReader, image_key: str = "image", level: int = 0,
                 margin: int = 0, foreground_threshold: Optional[float] = None) -> None:
        """
        :param image_reader: An instance of MONAI's `WSIReader`.
        :param image_key: Image key in the input and output dictionaries.
        :param level: Magnification level to load from the raw multi-scale files.
        :param margin: Amount in pixels by which to enlarge the estimated bounding box for cropping.
        :param foreground_threshold: Pixels with luminance below this value count as foreground;
            if `None` (default), a threshold is estimated automatically using Otsu's method.
        """
        super().__init__([image_key], allow_missing_keys=False)
        self.image_reader = image_reader
        self.image_key = image_key
        self.level = level
        self.margin = margin
        self.foreground_threshold = foreground_threshold

    def _get_bounding_box(self, slide_obj: OpenSlide) -> Tuple[box_utils.Box, float]:
        """Estimate the foreground bounding box in level-0 coordinates.

        Segmentation runs on the lowest-resolution level to limit memory usage.

        :param slide_obj: The OpenSlide image object.
        :return: A tuple of the level-0 bounding box and the foreground threshold used.
        """
        highest_level = slide_obj.level_count - 1
        slide = load_slide_at_level(self.image_reader, slide_obj, level=highest_level)

        if slide_obj.level_count == 1:
            logging.warning("Only one image level found. segment_foreground will use a lot of memory.")

        foreground_mask, threshold = segment_foreground(slide, self.foreground_threshold)

        # Rescale the bounding box (found at the lowest resolution) back to level 0.
        scale = slide_obj.level_downsamples[highest_level]
        bbox = scale * box_utils.get_bounding_box(foreground_mask).add_margin(self.margin)
        return bbox, threshold

    def __call__(self, data: Dict) -> Dict:
        logging.info(f"LoadROId: read {data[self.image_key]}")
        image_obj: OpenSlide = self.image_reader.read(data[self.image_key])

        logging.info("LoadROId: get bbox")
        level0_bbox, threshold = self._get_bounding_box(image_obj)
        logging.info(f"LoadROId: level0_bbox: {level0_bbox}")

        # OpenSlide's get_data takes the location in level-0 coordinates, but the
        # region size in pixels at the requested level, hence the scaled bbox.
        scale = image_obj.level_downsamples[self.level]
        scaled_bbox = level0_bbox / scale

        origin = (level0_bbox.y, level0_bbox.x)
        get_data_kwargs = dict(location=origin,
                               size=(scaled_bbox.h, scaled_bbox.w),
                               level=self.level)

        img_data, _ = self.image_reader.get_data(image_obj, **get_data_kwargs)
        logging.info(f"img_data: {img_data.dtype} {img_data.shape}")
        data[self.image_key] = img_data
        # Record the crop parameters alongside the loaded image.
        data.update(get_data_kwargs)
        data["origin"] = origin
        data["scale"] = scale
        data["foreground_threshold"] = threshold

        image_obj.close()
        return data
|
|
|