| import numpy as np |
|
|
| from numba import jit |
| from tqdm import tqdm |
| from scipy.optimize import linear_sum_assignment |
| from skimage.measure import regionprops |
| from collections import namedtuple |
| from csbdeep.utils import _raise |
|
|
| matching_criteria = dict() |
|
|
|
|
| def label_are_sequential(y): |
| """ returns true if y has only sequential labels from 1... """ |
| labels = np.unique(y) |
| return (set(labels)-{0}) == set(range(1,1+labels.max())) |
|
|
|
|
| def is_array_of_integers(y): |
| return isinstance(y,np.ndarray) and np.issubdtype(y.dtype, np.integer) |
|
|
|
|
| def _check_label_array(y, name=None, check_sequential=False): |
| err = ValueError("{label} must be an array of {integers}.".format( |
| label = 'labels' if name is None else name, |
| integers = ('sequential ' if check_sequential else '') + 'non-negative integers', |
| )) |
| is_array_of_integers(y) or _raise(err) |
| if len(y) == 0: |
| return True |
| if check_sequential: |
| label_are_sequential(y) or _raise(err) |
| else: |
| y.min() >= 0 or _raise(err) |
| return True |
|
|
|
|
| def label_overlap(x, y, check=True): |
| if check: |
| _check_label_array(x,'x',True) |
| _check_label_array(y,'y',True) |
| x.shape == y.shape or _raise(ValueError("x and y must have the same shape")) |
| return _label_overlap(x, y) |
|
|
| @jit(nopython=True) |
| def _label_overlap(x, y): |
| x = x.ravel() |
| y = y.ravel() |
| overlap = np.zeros((1+x.max(),1+y.max()), dtype=np.uint) |
| for i in range(len(x)): |
| overlap[x[i],y[i]] += 1 |
| return overlap |
|
|
|
|
| def _safe_divide(x,y, eps=1e-10): |
| """computes a safe divide which returns 0 if y is zero""" |
| if np.isscalar(x) and np.isscalar(y): |
| return x/y if np.abs(y)>eps else 0.0 |
| else: |
| out = np.zeros(np.broadcast(x,y).shape, np.float32) |
| np.divide(x,y, out=out, where=np.abs(y)>eps) |
| return out |
|
|
|
|
| def intersection_over_union(overlap): |
| _check_label_array(overlap,'overlap') |
| if np.sum(overlap) == 0: |
| return overlap |
| n_pixels_pred = np.sum(overlap, axis=0, keepdims=True) |
| n_pixels_true = np.sum(overlap, axis=1, keepdims=True) |
| return _safe_divide(overlap, (n_pixels_pred + n_pixels_true - overlap)) |
|
|
| matching_criteria['iou'] = intersection_over_union |
|
|
|
|
| def intersection_over_true(overlap): |
| _check_label_array(overlap,'overlap') |
| if np.sum(overlap) == 0: |
| return overlap |
| n_pixels_true = np.sum(overlap, axis=1, keepdims=True) |
| return _safe_divide(overlap, n_pixels_true) |
|
|
| matching_criteria['iot'] = intersection_over_true |
|
|
|
|
| def intersection_over_pred(overlap): |
| _check_label_array(overlap,'overlap') |
| if np.sum(overlap) == 0: |
| return overlap |
| n_pixels_pred = np.sum(overlap, axis=0, keepdims=True) |
| return _safe_divide(overlap, n_pixels_pred) |
|
|
| matching_criteria['iop'] = intersection_over_pred |
|
|
|
|
| def precision(tp,fp,fn): |
| return tp/(tp+fp) if tp > 0 else 0 |
| def recall(tp,fp,fn): |
| return tp/(tp+fn) if tp > 0 else 0 |
| def accuracy(tp,fp,fn): |
| |
| |
| return tp/(tp+fp+fn) if tp > 0 else 0 |
| def f1(tp,fp,fn): |
| |
| return (2*tp)/(2*tp+fp+fn) if tp > 0 else 0 |
|
|
|
|
| def matching(y_true, y_pred, thresh=0.5, criterion='iou', report_matches=False): |
| """Calculate detection/instance segmentation metrics between ground truth and predicted label images. |
| |
| Currently, the following metrics are implemented: |
| |
| 'fp', 'tp', 'fn', 'precision', 'recall', 'accuracy', 'f1', 'criterion', 'thresh', 'n_true', 'n_pred', 'mean_true_score', 'mean_matched_score', 'panoptic_quality' |
| |
| Corresponding objects of y_true and y_pred are counted as true positives (tp), false positives (fp), and false negatives (fn) |
| whether their intersection over union (IoU) >= thresh (for criterion='iou', which can be changed) |
| |
| * mean_matched_score is the mean IoUs of matched true positives |
| |
| * mean_true_score is the mean IoUs of matched true positives but normalized by the total number of GT objects |
| |
| * panoptic_quality defined as in Eq. 1 of Kirillov et al. "Panoptic Segmentation", CVPR 2019 |
| |
| Parameters |
| ---------- |
| y_true: ndarray |
| ground truth label image (integer valued) |
| y_pred: ndarray |
| predicted label image (integer valued) |
| thresh: float |
| threshold for matching criterion (default 0.5) |
| criterion: string |
| matching criterion (default IoU) |
| report_matches: bool |
| if True, additionally calculate matched_pairs and matched_scores (note, that this returns even gt-pred pairs whose scores are below 'thresh') |
| |
| Returns |
| ------- |
| Matching object with different metrics as attributes |
| |
| Examples |
| -------- |
| >>> y_true = np.zeros((100,100), np.uint16) |
| >>> y_true[10:20,10:20] = 1 |
| >>> y_pred = np.roll(y_true,5,axis = 0) |
| |
| >>> stats = matching(y_true, y_pred) |
| >>> print(stats) |
| Matching(criterion='iou', thresh=0.5, fp=1, tp=0, fn=1, precision=0, recall=0, accuracy=0, f1=0, n_true=1, n_pred=1, mean_true_score=0.0, mean_matched_score=0.0, panoptic_quality=0.0) |
| |
| """ |
| _check_label_array(y_true,'y_true') |
| _check_label_array(y_pred,'y_pred') |
| y_true.shape == y_pred.shape or _raise(ValueError("y_true ({y_true.shape}) and y_pred ({y_pred.shape}) have different shapes".format(y_true=y_true, y_pred=y_pred))) |
| criterion in matching_criteria or _raise(ValueError("Matching criterion '%s' not supported." % criterion)) |
| if thresh is None: thresh = 0 |
| thresh = float(thresh) if np.isscalar(thresh) else map(float,thresh) |
|
|
| y_true, _, map_rev_true = relabel_sequential(y_true) |
| y_pred, _, map_rev_pred = relabel_sequential(y_pred) |
|
|
| overlap = label_overlap(y_true, y_pred, check=False) |
| scores = matching_criteria[criterion](overlap) |
| assert 0 <= np.min(scores) <= np.max(scores) <= 1 |
|
|
| |
| scores = scores[1:,1:] |
| n_true, n_pred = scores.shape |
| n_matched = min(n_true, n_pred) |
|
|
| def _single(thr): |
| |
| not_trivial = n_matched > 0 |
| if not_trivial: |
| |
| costs = -(scores >= thr).astype(float) - scores / (2*n_matched) |
| true_ind, pred_ind = linear_sum_assignment(costs) |
| assert n_matched == len(true_ind) == len(pred_ind) |
| match_ok = scores[true_ind,pred_ind] >= thr |
| tp = np.count_nonzero(match_ok) |
| else: |
| tp = 0 |
| fp = n_pred - tp |
| fn = n_true - tp |
| |
| |
|
|
| |
| sum_matched_score = np.sum(scores[true_ind,pred_ind][match_ok]) if not_trivial else 0.0 |
|
|
| |
| mean_matched_score = _safe_divide(sum_matched_score, tp) |
| |
| mean_true_score = _safe_divide(sum_matched_score, n_true) |
| panoptic_quality = _safe_divide(sum_matched_score, tp+fp/2+fn/2) |
|
|
| stats_dict = dict ( |
| criterion = criterion, |
| thresh = thr, |
| fp = fp, |
| tp = tp, |
| fn = fn, |
| precision = precision(tp,fp,fn), |
| recall = recall(tp,fp,fn), |
| accuracy = accuracy(tp,fp,fn), |
| f1 = f1(tp,fp,fn), |
| n_true = n_true, |
| n_pred = n_pred, |
| mean_true_score = mean_true_score, |
| mean_matched_score = mean_matched_score, |
| panoptic_quality = panoptic_quality, |
| ) |
| if bool(report_matches): |
| if not_trivial: |
| stats_dict.update ( |
| |
| matched_pairs = tuple((int(map_rev_true[i]),int(map_rev_pred[j])) for i,j in zip(1+true_ind,1+pred_ind)), |
| matched_scores = tuple(scores[true_ind,pred_ind]), |
| matched_tps = tuple(map(int,np.flatnonzero(match_ok))), |
| ) |
| else: |
| stats_dict.update ( |
| matched_pairs = (), |
| matched_scores = (), |
| matched_tps = (), |
| ) |
| return namedtuple('Matching',stats_dict.keys())(*stats_dict.values()) |
|
|
| return _single(thresh) if np.isscalar(thresh) else tuple(map(_single,thresh)) |
|
|
|
|
|
|
| def matching_dataset(y_true, y_pred, thresh=0.5, criterion='iou', by_image=False, show_progress=True, parallel=False): |
| """matching metrics for list of images, see `stardist.matching.matching` |
| """ |
| len(y_true) == len(y_pred) or _raise(ValueError("y_true and y_pred must have the same length.")) |
| return matching_dataset_lazy ( |
| tuple(zip(y_true,y_pred)), thresh=thresh, criterion=criterion, by_image=by_image, show_progress=show_progress, parallel=parallel, |
| ) |
|
|
|
|
|
|
| def matching_dataset_lazy(y_gen, thresh=0.5, criterion='iou', by_image=False, show_progress=True, parallel=False): |
|
|
| expected_keys = set(('fp', 'tp', 'fn', 'precision', 'recall', 'accuracy', 'f1', 'criterion', 'thresh', 'n_true', 'n_pred', 'mean_true_score', 'mean_matched_score', 'panoptic_quality')) |
|
|
| single_thresh = False |
| if np.isscalar(thresh): |
| single_thresh = True |
| thresh = (thresh,) |
|
|
| tqdm_kwargs = {} |
| tqdm_kwargs['disable'] = not bool(show_progress) |
| if int(show_progress) > 1: |
| tqdm_kwargs['total'] = int(show_progress) |
|
|
| |
| if parallel: |
| from concurrent.futures import ThreadPoolExecutor |
| fn = lambda pair: matching(*pair, thresh=thresh, criterion=criterion, report_matches=False) |
| with ThreadPoolExecutor() as pool: |
| stats_all = tuple(pool.map(fn, tqdm(y_gen,**tqdm_kwargs))) |
| else: |
| stats_all = tuple ( |
| matching(y_t, y_p, thresh=thresh, criterion=criterion, report_matches=False) |
| for y_t,y_p in tqdm(y_gen,**tqdm_kwargs) |
| ) |
|
|
| |
| n_images, n_threshs = len(stats_all), len(thresh) |
| accumulate = [{} for _ in range(n_threshs)] |
| for stats in stats_all: |
| for i,s in enumerate(stats): |
| acc = accumulate[i] |
| for k,v in s._asdict().items(): |
| if k == 'mean_true_score' and not bool(by_image): |
| |
| acc[k] = acc.setdefault(k,0) + v * s.n_true |
| else: |
| try: |
| acc[k] = acc.setdefault(k,0) + v |
| except TypeError: |
| pass |
|
|
| |
| for thr,acc in zip(thresh,accumulate): |
| set(acc.keys()) == expected_keys or _raise(ValueError("unexpected keys")) |
| acc['criterion'] = criterion |
| acc['thresh'] = thr |
| acc['by_image'] = bool(by_image) |
| if bool(by_image): |
| for k in ('precision', 'recall', 'accuracy', 'f1', 'mean_true_score', 'mean_matched_score', 'panoptic_quality'): |
| acc[k] /= n_images |
| else: |
| tp, fp, fn, n_true = acc['tp'], acc['fp'], acc['fn'], acc['n_true'] |
| sum_matched_score = acc['mean_true_score'] |
|
|
| mean_matched_score = _safe_divide(sum_matched_score, tp) |
| mean_true_score = _safe_divide(sum_matched_score, n_true) |
| panoptic_quality = _safe_divide(sum_matched_score, tp+fp/2+fn/2) |
|
|
| acc.update( |
| precision = precision(tp,fp,fn), |
| recall = recall(tp,fp,fn), |
| accuracy = accuracy(tp,fp,fn), |
| f1 = f1(tp,fp,fn), |
| mean_true_score = mean_true_score, |
| mean_matched_score = mean_matched_score, |
| panoptic_quality = panoptic_quality, |
| ) |
|
|
| accumulate = tuple(namedtuple('DatasetMatching',acc.keys())(*acc.values()) for acc in accumulate) |
| return accumulate[0] if single_thresh else accumulate |
|
|
|
|
|
|
| |
| def relabel_sequential(label_field, offset=1): |
| """Relabel arbitrary labels to {`offset`, ... `offset` + number_of_labels}. |
| |
| This function also returns the forward map (mapping the original labels to |
| the reduced labels) and the inverse map (mapping the reduced labels back |
| to the original ones). |
| |
| Parameters |
| ---------- |
| label_field : numpy array of int, arbitrary shape |
| An array of labels, which must be non-negative integers. |
| offset : int, optional |
| The return labels will start at `offset`, which should be |
| strictly positive. |
| |
| Returns |
| ------- |
| relabeled : numpy array of int, same shape as `label_field` |
| The input label field with labels mapped to |
| {offset, ..., number_of_labels + offset - 1}. |
| The data type will be the same as `label_field`, except when |
| offset + number_of_labels causes overflow of the current data type. |
| forward_map : numpy array of int, shape ``(label_field.max() + 1,)`` |
| The map from the original label space to the returned label |
| space. Can be used to re-apply the same mapping. See examples |
| for usage. The data type will be the same as `relabeled`. |
| inverse_map : 1D numpy array of int, of length offset + number of labels |
| The map from the new label space to the original space. This |
| can be used to reconstruct the original label field from the |
| relabeled one. The data type will be the same as `relabeled`. |
| |
| Notes |
| ----- |
| The label 0 is assumed to denote the background and is never remapped. |
| |
| The forward map can be extremely big for some inputs, since its |
| length is given by the maximum of the label field. However, in most |
| situations, ``label_field.max()`` is much smaller than |
| ``label_field.size``, and in these cases the forward map is |
| guaranteed to be smaller than either the input or output images. |
| |
| Examples |
| -------- |
| >>> from skimage.segmentation import relabel_sequential |
| >>> label_field = np.array([1, 1, 5, 5, 8, 99, 42]) |
| >>> relab, fw, inv = relabel_sequential(label_field) |
| >>> relab |
| array([1, 1, 2, 2, 3, 5, 4]) |
| >>> fw |
| array([0, 1, 0, 0, 0, 2, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5]) |
| >>> inv |
| array([ 0, 1, 5, 8, 42, 99]) |
| >>> (fw[label_field] == relab).all() |
| True |
| >>> (inv[relab] == label_field).all() |
| True |
| >>> relab, fw, inv = relabel_sequential(label_field, offset=5) |
| >>> relab |
| array([5, 5, 6, 6, 7, 9, 8]) |
| """ |
| offset = int(offset) |
| if offset <= 0: |
| raise ValueError("Offset must be strictly positive.") |
| if np.min(label_field) < 0: |
| raise ValueError("Cannot relabel array that contains negative values.") |
| max_label = int(label_field.max()) |
| if not np.issubdtype(label_field.dtype, np.integer): |
| new_type = np.min_scalar_type(max_label) |
| label_field = label_field.astype(new_type) |
| labels = np.unique(label_field) |
| labels0 = labels[labels != 0] |
| new_max_label = offset - 1 + len(labels0) |
| new_labels0 = np.arange(offset, new_max_label + 1) |
| output_type = label_field.dtype |
| required_type = np.min_scalar_type(new_max_label) |
| if np.dtype(required_type).itemsize > np.dtype(label_field.dtype).itemsize: |
| output_type = required_type |
| forward_map = np.zeros(max_label + 1, dtype=output_type) |
| forward_map[labels0] = new_labels0 |
| inverse_map = np.zeros(new_max_label + 1, dtype=output_type) |
| inverse_map[offset:] = labels0 |
| relabeled = forward_map[label_field] |
| return relabeled, forward_map, inverse_map |
|
|
|
|
|
|
| def group_matching_labels(ys, thresh=1e-10, criterion='iou'): |
| """ |
| Group matching objects (i.e. assign the same label id) in a |
| list of label images (e.g. consecutive frames of a time-lapse). |
| |
| Uses function `matching` (with provided `criterion` and `thresh`) to |
| iteratively/greedily match and group objects/labels in consecutive images of `ys`. |
| To that end, matching objects are grouped together by assigning the same label id, |
| whereas unmatched objects are assigned a new label id. |
| At the end of this process, each label group will have been assigned a unique id. |
| |
| Note that the label images `ys` will not be modified. Instead, they will initially |
| be duplicated and converted to data type `np.int32` before objects are grouped and the result |
| is returned. (Note that `np.int32` limits the number of label groups to at most 2147483647.) |
| |
| Example |
| ------- |
| import numpy as np |
| from stardist.data import test_image_nuclei_2d |
| from stardist.matching import group_matching_labels |
| |
| _y = test_image_nuclei_2d(return_mask=True)[1] |
| labels = np.stack([_y, 2*np.roll(_y,10)], axis=0) |
| |
| labels_new = group_matching_labels(labels) |
| |
| Parameters |
| ---------- |
| ys : np.ndarray or list/tuple of np.ndarray |
| list/array of integer labels (2D or 3D) |
| |
| """ |
| |
| len(ys) > 1 or _raise(ValueError("'ys' must have 2 or more entries")) |
| if isinstance(ys, np.ndarray): |
| _check_label_array(ys, 'ys') |
| ys.ndim > 1 or _raise(ValueError("'ys' must be at least 2-dimensional")) |
| ys_grouped = np.empty_like(ys, dtype=np.int32) |
| else: |
| all(_check_label_array(y, 'ys') for y in ys) or _raise(ValueError("'ys' must be a list of label images")) |
| all(y.shape==ys[0].shape for y in ys) or _raise(ValueError("all label images must have the same shape")) |
| ys_grouped = np.empty((len(ys),)+ys[0].shape, dtype=np.int32) |
|
|
| def _match_single(y_prev, y, next_id): |
| y = y.astype(np.int32, copy=False) |
| res = matching(y_prev, y, report_matches=True, thresh=thresh, criterion=criterion) |
| |
| relabel = dict(reversed(res.matched_pairs[i]) for i in res.matched_tps) |
| y_grouped = np.zeros_like(y) |
| for r in regionprops(y): |
| m = (y[r.slice] == r.label) |
| if r.label in relabel: |
| y_grouped[r.slice][m] = relabel[r.label] |
| else: |
| y_grouped[r.slice][m] = next_id |
| next_id += 1 |
| return y_grouped, next_id |
|
|
| ys_grouped[0] = ys[0] |
| next_id = ys_grouped[0].max() + 1 |
| for i in range(len(ys)-1): |
| ys_grouped[i+1], next_id = _match_single(ys_grouped[i], ys[i+1], next_id) |
| return ys_grouped |
|
|
|
|
|
|
| def _shuffle_labels(y): |
| _check_label_array(y, 'y') |
| y2 = np.zeros_like(y) |
| ids = tuple(set(np.unique(y)) - {0}) |
| relabel = dict(zip(ids,np.random.permutation(ids))) |
| for r in regionprops(y): |
| m = (y[r.slice] == r.label) |
| y2[r.slice][m] = relabel[r.label] |
| return y2 |
|
|