| import typing as t |
| from concurrent.futures import ThreadPoolExecutor |
| from pathlib import Path |
| from tracemalloc import start |
|
|
| import cv2 |
| import numpy as np |
| import rpack |
| from openslide import OpenSlide |
| from PIL import Image |
| from scipy.ndimage import binary_fill_holes |
| from skimage import filters |
| from skimage.morphology import remove_small_objects |
|
|
| if t.TYPE_CHECKING: |
| from _typeshed import StrPath |
|
|
# Prefer the top-level ``skimage.img_as_ubyte``; fall back to ``skimage.util``
# when the top-level name cannot be imported. Only ImportError is caught so
# that KeyboardInterrupt / SystemExit raised during import still propagate.
try:
    from skimage import img_as_ubyte
except ImportError:
    from skimage.util import img_as_ubyte
|
|
|
|
def find_contours(arr: np.ndarray, only_outer: bool = True, convex: bool = False):
    """Extract contours from a binary image with OpenCV.

    Parameters
    ----------
    arr : np.ndarray
        Binary image; it is cast to ``uint8`` before being handed to OpenCV.
    only_outer : bool
        When True use ``cv2.RETR_EXTERNAL``, otherwise ``cv2.RETR_LIST``.
    convex : bool
        When True every contour is replaced by its convex hull.

    Returns
    -------
    contours : list
        The contours found (or their convex hulls).
    """
    if only_outer:
        retrieval_mode = cv2.RETR_EXTERNAL
    else:
        retrieval_mode = cv2.RETR_LIST

    result = cv2.findContours(
        arr.astype(np.uint8), retrieval_mode, cv2.CHAIN_APPROX_SIMPLE
    )

    # A 3-element result carries the contours in the middle slot; otherwise
    # they are the first element.
    found = result[0] if len(result) != 3 else result[1]
    if isinstance(found, tuple):
        found = list(found)

    if not convex:
        return found
    return [cv2.convexHull(contour) for contour in found]
|
|
|
|
def merge_overlapping_bboxes(bboxes: list):
    """Repeatedly merge overlapping bounding boxes, modifying the list in place.

    Parameters
    ----------
    bboxes : list
        List of bounding boxes in format (x, y, width, height). The list is
        mutated: overlapping boxes are removed and their union is appended.
        Nothing is returned.
    """
    # Number of consecutive boxes examined without finding any overlap. Once
    # it reaches the list length, every remaining box is disjoint from the rest.
    settled = 0
    while settled < len(bboxes):
        settled += 1
        current = bboxes.pop(0)
        for position, other in enumerate(bboxes):
            did_merge, union_box = merge_if_overlapping(current, other)
            if did_merge:
                # Replace both boxes with their union and start counting again,
                # because the union may overlap boxes that were already checked.
                del bboxes[position]
                bboxes.append(union_box)
                settled = 0
                break
        else:
            # No partner found: put the box back at the end of the queue.
            bboxes.append(current)
|
|
|
|
def merge_if_overlapping(a: tuple, b: tuple):
    """Check if two bounding boxes overlap and merge them if they do

    Boxes that merely touch along an edge or corner (zero-area intersection)
    are not considered overlapping.

    Parameters
    ----------
    a : tuple
        First bounding box in format (x, y, width, height)
    b : tuple
        Second bounding box in format (x, y, width, height)

    Returns
    -------
    overlapping : bool
        True if boxes overlap
    new_bbox : tuple
        Smallest box (x, y, width, height) enclosing both inputs if they
        overlap, empty list otherwise
    """
    a_x, a_y, a_w, a_h = a[0], a[1], a[2], a[3]
    b_x, b_y, b_w, b_h = b[0], b[1], b[2], b[3]

    # Extent of the intersection along each axis. Builtin min/max are used so
    # the merged box keeps the element type of the inputs instead of being
    # promoted to NumPy scalars.
    inter_x_start = max(a_x, b_x)
    inter_x_end = min(a_x + a_w, b_x + b_w)
    inter_y_start = max(a_y, b_y)
    inter_y_end = min(a_y + a_h, b_y + b_h)

    if inter_x_start < inter_x_end and inter_y_start < inter_y_end:
        x_min = min(a_x, b_x)
        y_min = min(a_y, b_y)
        x_max = max(a_x + a_w, b_x + b_w)
        y_max = max(a_y + a_h, b_y + b_h)
        return True, (x_min, y_min, x_max - x_min, y_max - y_min)

    return False, []
|
|
|
|
|
|
def load_slide_img(
    wsi,
    level: int = 0,
) -> np.ndarray:
    """Read one pyramid level of a slide into a NumPy array.

    Parameters
    ----------
    wsi : CuImage
        Slide object; its ``read_region`` is called with ``device="gpu"`` and
        ``num_workers=32``.
    level : int
        Slide level to load

    Returns
    -------
    slide_img : np.ndarray
        Image array; when the region has four channels the last one is
        dropped so that three channels remain.
    """
    region = wsi.read_region(level=level, device="gpu", num_workers=32)
    pixels = np.asarray(region)
    has_fourth_channel = pixels.shape[2] == 4
    return pixels[:, :, :-1] if has_fourth_channel else pixels
|
|
|
|
def rgb2gray(img):
    """Collapse the last (colour) axis of an RGB image into a grey value.

    Parameters
    ----------
    img : np.ndarray
        RGB image with 3 channels on the last axis

    Returns
    -------
    gray : np.ndarray
        Weighted sum ``0.299 * R + 0.587 * G + 0.114 * B`` for every pixel
    """
    channel_weights = [0.299, 0.587, 0.114]
    gray = np.dot(img, channel_weights)
    return gray
|
|
|
|
def thresh_slide(gray, thresh_val, sigma=13):
    """Binarise a grey image after Gaussian smoothing.

    Parameters
    ----------
    gray : np.ndarray
        2D grayscale image
    thresh_val : float
        Cut-off applied to the smoothed image after it has been divided by
        its own maximum
    sigma : int
        Gaussian smoothing sigma

    Returns
    -------
    bw_img : np.ndarray
        Boolean image, True where the normalised smoothed value is strictly
        below ``thresh_val``
    """
    blurred = filters.gaussian(gray, sigma=sigma)
    normalised = blurred / np.amax(blurred)
    return normalised < thresh_val
|
|
|
|
|
|
def get_tissue_bboxes(
    mask: np.ndarray, wsi_width: int, wsi_height: int, min_tissue_size: int = 10000
):
    """Return bounding boxes of the sufficiently large regions in a mask.

    Parameters
    ----------
    mask : np.ndarray
        Binary tissue mask
    wsi_width : int
        Width used for the fallback box when no region is large enough
    wsi_height : int
        Height used for the fallback box when no region is large enough
    min_tissue_size : int
        Minimum contour area (as reported by ``cv2.contourArea`` on the mask)
        for a region to be kept

    Returns
    -------
    boxes : list
        Boxes in format (x, y, width, height) as produced by
        ``cv2.boundingRect``. If no contour passes the area filter, a single
        box ``[0, 0, wsi_width, wsi_height]`` is returned instead.
    """
    # NOTE(review): contour boxes are in mask pixel coordinates, while the
    # fallback box is built from wsi_width / wsi_height. The two agree only
    # when the mask has the same size as the slide image - confirm with callers.
    boxes = [
        cv2.boundingRect(contour)
        for contour in find_contours(mask)
        if cv2.contourArea(contour) >= min_tissue_size
    ]
    if boxes:
        return boxes
    return [[0, 0, wsi_width, wsi_height]]
|
|
|
|
def get_tissue_positions_and_packed_size(
    boxes,
    wsi_width: int,
    wsi_height: int,
    scale: float,
) -> tuple[list[tuple[int, int]], tuple[int, int]]:
    """Compute a compact rectangle packing for a set of tissue boxes.

    Parameters
    ----------
    boxes : list
        Boxes in format (x, y, width, height). When more than one box is
        given, overlapping boxes are merged **in place**, so the caller's
        list is modified.
    wsi_width : int
        Width the scaled boxes are clipped to
    wsi_height : int
        Height the scaled boxes are clipped to
    scale : float
        Factor applied to every box coordinate and size before packing

    Returns
    -------
    positions : list
        One packed position per (merged) box, in the same order as ``boxes``,
        as returned by ``rpack.pack``
    packed_size : tuple
        Size of the bounding box enclosing the packed rectangles, as returned
        by ``rpack.bbox_size``; ``(0, 0)`` when ``boxes`` is empty
    """
    # Nothing to pack; the array indexing below would fail on an empty input.
    if len(boxes) == 0:
        return [], (0, 0)

    if len(boxes) > 1:
        merge_overlapping_bboxes(boxes)

    scaled = np.array(boxes, dtype=np.float32) * scale
    if scaled.ndim == 1:
        # A single flat box was passed; promote it to one row.
        scaled = scaled[None]

    # Round origins down and sizes up so the scaled box still covers the
    # region, then clip everything to the slide extent.
    scaled[:, :2] = np.floor(scaled[:, :2])
    scaled[:, 0] = np.clip(scaled[:, 0], 0, wsi_width - 1)
    scaled[:, 1] = np.clip(scaled[:, 1], 0, wsi_height - 1)
    scaled[:, 2:] = np.ceil(scaled[:, 2:])
    scaled[:, 2] = np.clip(scaled[:, 2], 0, wsi_width - scaled[:, 0])
    scaled[:, 3] = np.clip(scaled[:, 3], 0, wsi_height - scaled[:, 1])
    scaled = scaled.astype(np.int32)

    box_sizes = [(int(box[2]), int(box[3])) for box in scaled]

    # Unconstrained packing is the baseline and the fallback result.
    positions = rpack.pack(box_sizes)
    packed_size = rpack.bbox_size(box_sizes, positions)

    # Try to squeeze the packing into a square whose side is a fraction of
    # the baseline's longest side, starting with the tightest limit; keep the
    # first limit for which a packing exists.
    longest_side = max(packed_size)
    for shrink in np.arange(0.5, 0.96, 0.05):
        side_limit = int(longest_side * shrink)
        try:
            constrained = rpack.pack(
                box_sizes, max_height=side_limit, max_width=side_limit
            )
        except rpack.PackingImpossibleError:
            continue
        positions = constrained
        packed_size = rpack.bbox_size(box_sizes, positions)
        break

    return positions, (int(packed_size[0]), int(packed_size[1]))
|
|
|
|
def pack_slide(
    wsi_arr: np.ndarray,
    mask: np.ndarray,
    min_tissue_size: int = 10000,
):
    """Crop the tissue regions of a slide image and pack them into one canvas.

    Parameters
    ----------
    wsi_arr : np.ndarray
        Slide image with height and width on the first two axes
    mask : np.ndarray
        Tissue mask used to find the regions to keep
    min_tissue_size : int
        Minimum contour area passed on to ``get_tissue_bboxes``

    Returns
    -------
    img_out : np.ndarray
        Canvas filled with 255 onto which every tissue box of ``wsi_arr`` is
        copied at its packed position
    mask_out : np.ndarray
        Boolean canvas holding the matching crops of ``mask``
    """
    H, W = wsi_arr.shape[:2]
    boxes = get_tissue_bboxes(mask, W, H, min_tissue_size=min_tissue_size)
    if len(boxes) == 0:
        return wsi_arr, mask

    # The helper merges overlapping boxes in place, so after this call
    # ``boxes`` lines up one-to-one with ``positions``.
    positions, packed_size = get_tissue_positions_and_packed_size(
        boxes, W, H, H / mask.shape[0]
    )
    packed_w, packed_h = packed_size

    img_out = np.full(
        (packed_h, packed_w) + wsi_arr.shape[2:],
        255,
        dtype=wsi_arr.dtype,
    )
    # Builtin ``bool`` is used because the ``np.bool`` alias is missing from
    # several NumPy releases.
    mask_out = np.zeros((packed_h, packed_w), dtype=bool)

    # NOTE(review): the same (unscaled) boxes index both ``wsi_arr`` and
    # ``mask``, which presumably requires both to share one resolution -
    # confirm against callers.
    for (dst_x, dst_y), (src_x, src_y, box_w, box_h) in zip(positions, boxes):
        dst_rows = slice(dst_y, dst_y + box_h)
        dst_cols = slice(dst_x, dst_x + box_w)
        src_rows = slice(src_y, src_y + box_h)
        src_cols = slice(src_x, src_x + box_w)
        img_out[dst_rows, dst_cols] = wsi_arr[src_rows, src_cols]
        mask_out[dst_rows, dst_cols] = mask[src_rows, src_cols]

    return img_out, mask_out
|
|
|
|
def get_level_downsamples(wsi: OpenSlide):
    """Return an (x, y) downsample pair for every level of the slide.

    For each level the per-axis factor is derived from the level-0 dimensions
    divided by that level's dimensions. When this pair equals the slide's
    reported downsample on both axes, the reported value is used for the pair.

    Parameters
    ----------
    wsi : OpenSlide
        Opened slide providing ``level_dimensions`` and ``level_downsamples``

    Returns
    -------
    level_downsamples : list
        One ``(downsample_x, downsample_y)`` tuple per level
    """
    base_dim = wsi.level_dimensions[0]
    per_level = []

    for reported, level_dim in zip(wsi.level_downsamples, wsi.level_dimensions):
        estimated = (
            base_dim[0] / float(level_dim[0]),
            base_dim[1] / float(level_dim[1]),
        )
        nominal = (reported, reported)
        if estimated != nominal:
            per_level.append(estimated)
        else:
            per_level.append(nominal)

    return per_level
|
|
|
|
def segment_tissue(
    wsi_path: Path,
    seg_level=-1,
    sthresh=8,
    sthresh_up=255,
    mthresh=7,
    close=4,
    filter_params={"a_t": 1, "a_h": 1, "max_n_holes": 100},
    ref_patch_size=512,
):
    """
    Segment the tissue via HSV -> Median thresholding -> Binary threshold

    Parameters
    ----------
    wsi_path : Path
        Path of the slide, opened with OpenSlide
    seg_level : int
        Pyramid level to segment on; a negative value selects
        ``wsi.get_best_level_for_downsample(64)``
    sthresh : int
        Threshold applied to the median-blurred saturation channel
    sthresh_up : int
        ``maxval`` handed to ``cv2.threshold``
    mthresh : int
        Kernel size handed to ``cv2.medianBlur``
    close : int
        Side of the square kernel for morphological closing; values <= 0
        skip the closing step
    filter_params : dict
        ``a_t`` / ``a_h``: minimum foreground / hole area, expressed as a
        multiple of the reference patch area at the segmentation level;
        ``max_n_holes``: number of largest holes considered per foreground
        contour. The dict is copied before being modified, so the shared
        default is never mutated.
    ref_patch_size : int
        Side length of the reference patch; its area is divided by the
        level's downsample factors to obtain the area unit for
        ``filter_params``

    Returns
    -------
    mask : np.ndarray
        Boolean tissue mask at the segmentation level; entirely True when no
        tissue was detected
    img_rgb : np.ndarray
        RGB image of the level after white borders were drawn and
        low-Laplacian pixels were set to white
    """

    def _filter_contours(contours, hierarchy, filter_params):
        """
        Filter contours by: area.

        ``hierarchy`` holds one row per contour with the parent index in
        column 1 (-1 for contours without a parent).
        """
        kept_indices = []
        holes_per_kept = []

        for cont_idx in np.flatnonzero(hierarchy[:, 1] == -1):
            # Holes are the contours whose parent is this contour.
            hole_indices = np.flatnonzero(hierarchy[:, 1] == cont_idx)
            # Net area = outer area minus the area of all its holes.
            net_area = cv2.contourArea(contours[cont_idx]) - sum(
                cv2.contourArea(contours[hole_idx]) for hole_idx in hole_indices
            )
            if net_area == 0:
                continue
            if net_area > filter_params["a_t"]:
                kept_indices.append(cont_idx)
                holes_per_kept.append(hole_indices)

        foreground_contours = [contours[cont_idx] for cont_idx in kept_indices]

        hole_contours = []
        for hole_indices in holes_per_kept:
            # Keep only the largest holes, then drop those that are too small.
            largest_first = sorted(
                (contours[idx] for idx in hole_indices),
                key=cv2.contourArea,
                reverse=True,
            )[: filter_params["max_n_holes"]]
            hole_contours.append(
                [
                    hole
                    for hole in largest_first
                    if cv2.contourArea(hole) > filter_params["a_h"]
                ]
            )

        return foreground_contours, hole_contours

    def draw_white_bands(img: np.ndarray, thickness: int):
        """Paint a white frame of the given thickness onto ``img`` in place."""
        height, width = img.shape[:2]
        white = [255, 255, 255]
        # top, bottom, left, right
        cv2.rectangle(img, (0, 0), (width, thickness), white, -1)
        cv2.rectangle(img, (0, height - thickness), (width, height), white, -1)
        cv2.rectangle(img, (0, 0), (thickness, height), white, -1)
        cv2.rectangle(img, (width - thickness, 0), (width, height), white, -1)

    # Everything that needs the open slide handle is done here; the image
    # processing below works on in-memory arrays only.
    with OpenSlide(str(wsi_path)) as wsi:
        if seg_level < 0:
            seg_level = wsi.get_best_level_for_downsample(64)

        img = np.asarray(
            wsi.read_region(
                location=(0, 0), level=seg_level, size=wsi.level_dimensions[seg_level]
            )
        )
        scale = get_level_downsamples(wsi)[seg_level]

    img_rgb = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
    draw_white_bands(img_rgb, thickness=20)
    img_gray = cv2.cvtColor(img, cv2.COLOR_RGBA2GRAY)

    H, W = img_rgb.shape[:2]

    # NOTE(review): a mask of dark, near-neutral pixels (all channels <= 110,
    # pairwise channel differences <= 15) used to be computed here but was
    # never applied to anything; the unused computation has been dropped.
    # Confirm it was not meant to whiten those pixels.

    # Whiten pixels with a weak Laplacian response (little local detail).
    laplacian = cv2.Laplacian(img_gray, cv2.CV_64F)
    low_detail = cv2.convertScaleAbs(laplacian) <= 15
    img_rgb[low_detail] = [255, 255, 255]

    # Threshold the median-blurred saturation channel.
    img_hsv = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2HSV)
    img_med = cv2.medianBlur(img_hsv[:, :, 1], mthresh)
    _, img_thresh = cv2.threshold(img_med, sthresh, sthresh_up, cv2.THRESH_BINARY)

    if close > 0:
        kernel = np.ones((close, close), np.uint8)
        img_thresh = cv2.morphologyEx(img_thresh, cv2.MORPH_CLOSE, kernel)

    # Express the area thresholds in pixels of the segmentation level.
    scaled_ref_patch_area = int(ref_patch_size**2 / (scale[0] * scale[1]))
    filter_params = filter_params.copy()
    filter_params["a_t"] = filter_params["a_t"] * scaled_ref_patch_area
    filter_params["a_h"] = filter_params["a_h"] * scaled_ref_patch_area

    # Taking the last two values works whether findContours returns two or
    # three values (same compatibility concern handled in find_contours).
    contours, hierarchy = cv2.findContours(
        img_thresh, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE
    )[-2:]

    if hierarchy is None:
        # No contour at all (e.g. a blank slide): there is no hierarchy to
        # squeeze, so fall through to the "whole image" fallback below.
        foreground_contours, hole_contours = [], []
    else:
        # Keep only the (first_child, parent) columns of the hierarchy.
        hierarchy = np.squeeze(hierarchy, axis=(0,))[:, 2:]
        foreground_contours, hole_contours = _filter_contours(
            contours, hierarchy, filter_params
        )

    mask = np.zeros(img_rgb.shape[:2], dtype=np.uint8)
    for i, cont in enumerate(foreground_contours):
        if cont is None or len(cont) == 0:
            print(f"Warning: Empty contour at index {i}")
            continue

        if (
            cont[:, :, 0].max() >= W
            or cont[:, :, 1].max() >= H
            or cont[:, :, 0].min() < 0
            or cont[:, :, 1].min() < 0
        ):
            print(f"Warning: Contour {i} coordinates out of bounds!")
            continue

        cv2.fillPoly(mask, [cont], 255)

        # Carve the retained holes back out of the filled region.
        if i < len(hole_contours) and hole_contours[i]:
            for hole in hole_contours[i]:
                cv2.fillPoly(mask, [hole], 0)

    # Builtin ``bool`` is used because the ``np.bool`` alias is missing from
    # several NumPy releases.
    mask = mask.astype(bool)
    if not mask.any():
        # Nothing detected: treat the whole image as tissue.
        mask[:, :] = True

    return mask, img_rgb
|
|
|
|
def get_mask_path_by_wsi_path(wsi_path: Path, wsi_dir: Path, mask_dir: Path) -> Path:
    """Locate the mask image that belongs to a slide.

    The slide's path relative to ``wsi_dir`` is mirrored under ``mask_dir``.
    Candidates are ``<stem>.<ext>`` next to the mirrored location and
    ``<stem>/<stem>.<ext>`` below it, for jpg/jpeg/png/webp in lower and
    upper case. When several candidates exist, the last one in that order
    is returned.

    Parameters
    ----------
    wsi_path : Path
        Slide path; must be located inside ``wsi_dir``
    wsi_dir : Path
        Root directory of the slides
    mask_dir : Path
        Root directory of the masks

    Returns
    -------
    mask_path : Path
        Absolute path of the mask file that was found

    Raises
    ------
    FileNotFoundError
        If none of the candidate files exists
    """
    wsi_path = wsi_path.absolute()
    wsi_dir = wsi_dir.absolute()
    mask_dir = mask_dir.absolute()

    rel_path = wsi_path.relative_to(wsi_dir)
    stem = rel_path.stem
    mirrored_prefix = (mask_dir / rel_path).parent / stem
    base_dir = mirrored_prefix.parent

    lower_case = ["jpg", "jpeg", "png", "webp"]
    extensions = lower_case + [ext.upper() for ext in lower_case]
    file_names = [stem + f".{ext}" for ext in extensions]

    candidates = [base_dir / file_name for file_name in file_names]
    candidates += [base_dir / stem / file_name for file_name in file_names]

    existing = [candidate for candidate in candidates if candidate.exists()]
    if not existing:
        listed = ", ".join(map(str, candidates))
        raise FileNotFoundError(
            f"No mask for wsi '{wsi_path}' in mask dir '{mask_dir}' (candidates: {listed})"
        )
    return existing[-1]
|
|
|
|
def read_mask(mask_path: Path) -> np.ndarray:
    """Load a mask image as a 2D boolean array.

    Parameters
    ----------
    mask_path : Path
        Path of the mask image

    Returns
    -------
    mask : np.ndarray
        Array of shape (height, width), True wherever any channel of the
        pixel is greater than zero
    """
    # The context manager closes the underlying file once the pixel data has
    # been reduced, instead of leaving the handle open until garbage collection.
    with Image.open(mask_path) as img:
        w, h = img.size
        # Reshape to (h, w, channels) so single-channel and multi-channel
        # images are handled alike, then reduce over the channel axis.
        return np.asarray(img).reshape((h, w, -1)).max(-1) > 0
|
|
|
|
def read_mask_by_wsi_path(wsi_path: Path, wsi_dir: Path, mask_dir: Path) -> np.ndarray:
    """Find the mask file belonging to a slide and load it.

    Parameters
    ----------
    wsi_path : Path
        Slide path
    wsi_dir : Path
        Root directory of the slides
    mask_dir : Path
        Root directory of the masks

    Returns
    -------
    mask : np.ndarray
        Result of ``read_mask`` for the mask file located by
        ``get_mask_path_by_wsi_path``
    """
    abs_wsi_path = wsi_path.absolute()
    abs_wsi_dir = wsi_dir.absolute()
    abs_mask_dir = mask_dir.absolute()
    located = get_mask_path_by_wsi_path(abs_wsi_path, abs_wsi_dir, abs_mask_dir)
    return read_mask(located)
|
|
|
|