Spaces:
Sleeping
Sleeping
| import logging | |
| import sys | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| # from .data import CTCData | |
| import tifffile | |
| from tqdm import tqdm | |
| from typing import Optional, Union, Tuple | |
| logger = logging.getLogger(__name__) | |
def load_tiff_timeseries(
    dir: Path,
    dtype: Optional[Union[str, type]] = None,
    downscale: Optional[Tuple[int, ...]] = None,
    start_frame: int = 0,
    end_frame: Optional[int] = None,
) -> np.ndarray:
    """Load a folder of `.tif` / `.tiff` files into a single numpy array.

    Each file is interpreted as one frame of a time series. Files are sorted
    by path, sliced with ``[start_frame:end_frame]`` and then subsampled in
    time with a stride of ``downscale[0]``.

    Args:
        dir: Folder containing the `.tif` / `.tiff` frames.
        dtype: If given, every frame is cast to this dtype after subsampling.
        downscale: Subsampling strides, one int for each dimension of the
            data: the first entry is the stride over frames, the remaining
            entries are strides over the axes of a single frame. Striding is
            applied per frame while loading, which avoids holding the
            full-resolution series in memory. For backward compatibility a
            tuple that is one entry shorter is also accepted; the last frame
            axis is then kept at full resolution.
        start_frame: Index of the first file to load.
        end_frame: Index one past the last file to load (exclusive slice
            bound). ``None`` loads up to the last file.

    Returns:
        np.ndarray: The stacked frames, with the frame index as first axis.

    Raises:
        FileNotFoundError: If no tiff file is selected in ``dir``.
        ValueError: If ``downscale`` has the wrong number of entries, or a
            frame does not have the same shape as the first frame.
    """
    # TODO make safe for label arrays
    logger.debug("Loading tiffs from %s as %s", dir, dtype)

    files = sorted([*dir.glob("*.tif"), *dir.glob("*.tiff")])[start_frame:end_frame]
    if not files:
        raise FileNotFoundError(
            f"No .tif/.tiff files found in {dir} for frames [{start_frame}:{end_frame}]"
        )

    shape = tifffile.imread(files[0]).shape
    n_dims = len(shape) + 1  # frame axes plus the time axis

    if downscale:
        # `n_dims - 1` entries is the legacy form: time + leading frame axes.
        if len(downscale) not in (n_dims - 1, n_dims):
            raise ValueError(
                f"downscale must have {n_dims} entries (time + frame axes), "
                f"got {len(downscale)}"
            )
    else:
        downscale = (1,) * n_dims

    # Loop invariants: per-frame strides and whether they actually subsample.
    frame_slices = tuple(slice(None, None, step) for step in downscale[1:])
    subsampled = any(step != 1 for step in downscale[1:])

    frames = []
    for f in tqdm(
        files[:: downscale[0]],
        leave=False,
        desc=f"Loading [{start_frame}:{end_frame}:{downscale[0]}]",
    ):
        frame = tifffile.imread(f)
        if frame.shape != shape:
            raise ValueError(
                f"Frame {f} has shape {frame.shape}, expected {shape}"
            )
        # Subsample first so that the cast only touches the data that is kept.
        frame = frame[frame_slices]
        if dtype is not None:
            # `is not None`: numpy dtype instances can be falsy.
            frame = frame.astype(dtype)
        elif subsampled:
            # Copy the strided view so the full-size frame can be released.
            frame = frame.copy()
        frames.append(frame)

    x = np.stack(frames)
    logger.debug("Loaded array of shape %s from %s", x.shape, dir)
    return x
def load_tracklet_links(folder: Path) -> pd.DataFrame:
    """Load the tracklet link table stored in ``folder``.

    Looks for ``man_track.txt`` first and ``res_track.txt`` second, and reads
    the first one that exists as a whitespace-separated table of integers
    with the columns ``label``, ``t1``, ``t2`` and ``parent``.

    Rows with ``t1 > t2`` are dropped. The number of detections and the
    number of divisions (non-zero parents referenced by exactly two rows) are
    logged at debug level.

    Args:
        folder: Directory containing the tracklet link file.

    Returns:
        pd.DataFrame: One row per tracklet, all columns of integer dtype.

    Raises:
        FileNotFoundError: If neither candidate file exists in ``folder``.
    """
    candidates = (folder / "man_track.txt", folder / "res_track.txt")
    path = next((c for c in candidates if c.exists()), None)
    if path is None:
        raise FileNotFoundError(f"Could not find tracklet links in {folder}")

    df = pd.read_csv(
        path,
        # Any run of whitespace separates fields, so trailing or repeated
        # spaces no longer produce spurious empty columns.
        sep=r"\s+",
        header=None,
        names=["label", "t1", "t2", "parent"],
        dtype=int,
    )

    # Remove invalid tracks that start after they end (t1 > t2)
    df = df[df["t1"] <= df["t2"]]

    n_dets = (df["t2"] - df["t1"] + 1).sum()
    logger.debug("%s has %s detections", folder, n_dets)

    # A division is a non-zero parent label that is referenced by exactly two rows.
    parent_counts = df.loc[df["parent"] != 0, "parent"].value_counts()
    n_divs = (parent_counts == 2).sum()
    logger.debug("%s has %s divisions", folder, n_divs)

    return df
def filter_track_df(
    df: pd.DataFrame,
    start_frame: int = 0,
    end_frame: int = sys.maxsize,
    downscale: int = 1,
) -> pd.DataFrame:
    """Only keep tracklets that are present in the given time interval.

    The input frame is not modified; a new frame is returned.

    Args:
        df: Tracklet table with four columns, interpreted in order as
            ``label``, ``t1``, ``t2``, ``parent``.
        start_frame: First frame of the interval. Returned times are shifted
            so that this frame becomes 0.
        end_frame: End of the interval (exclusive).
        downscale: Temporal downsampling factor. If larger than 1, tracklets
            that do not cover any multiple of ``downscale`` are dropped and
            the remaining times are divided by it (``t1`` rounded up, ``t2``
            rounded down).

    Returns:
        pd.DataFrame: The filtered table with columns ``label``, ``t1``,
        ``t2``, ``parent``. Parents that are no longer in the table are set
        to 0.

    Raises:
        ValueError: If ``downscale > 1`` and ``start_frame`` is not a
            multiple of ``downscale``.
    """
    if downscale > 1 and start_frame % downscale != 0:
        raise ValueError("start_frame must be a multiple of downscale")

    # Work on a copy so that the caller's frame is neither renamed nor edited.
    df = df.copy()
    df.columns = ["label", "t1", "t2", "parent"]

    # only retain cells in interval
    df = df[(df["t2"] >= start_frame) & (df["t1"] < end_frame)].copy()

    # shift start and end of each cell, then clamp them to the interval
    last_frame = end_frame - start_frame - 1
    df["t1"] = (df["t1"] - start_frame).clip(0, last_frame)
    df["t2"] = (df["t2"] - start_frame).clip(0, last_frame)

    # set all parents to 0 that are not in the interval
    df.loc[~df["parent"].isin(df["label"]), "parent"] = 0

    if downscale > 1:
        logger.debug("Temporal downscaling of tracklet links by %s", downscale)

        # A tracklet is fully deleted by temporal downsampling when neither
        # end is a kept frame and both ends lie between the same two kept frames.
        fully_deleted = (
            (df["t1"] % downscale != 0)
            & (df["t2"] % downscale != 0)
            & (df["t1"] // downscale == df["t2"] // downscale)
        )
        logger.debug(
            "Remove %s tracklets that are fully deleted by downsampling",
            fully_deleted.sum(),
        )
        logger.debug("Remove %s", df[fully_deleted])
        df = df[~fully_deleted].copy()

        # set parent to 0 if it has been deleted
        df.loc[~df["parent"].isin(df["label"]), "parent"] = 0

        # Exact integer floor (t2) and ceil (t1) division.
        df["t2"] = (df["t2"] // downscale).astype(int)
        df["t1"] = (-(-df["t1"] // downscale)).astype(int)

    # Sanity check: every remaining tracklet starts no later than it ends.
    assert np.all(df["t1"] == np.minimum(df["t1"], df["t2"]))

    return df
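| # TODO fix: `dataset_to_ctc` below is disabled. As written it references `self` outside of a class and uses `CTCData` and `imageio`, neither of which is imported in this module. | |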
| # def dataset_to_ctc(dataset: CTCData, path, start: int = 0, stop: int | None = None): | |
| # """save dataset to ctc format for debugging purposes""" | |
| # out = Path(path) | |
| # print(f"Saving dataset to {out}") | |
| # out_img = out / "img" | |
| # out_img.mkdir(exist_ok=True, parents=True) | |
| # out_mask = out / "TRA" | |
| # out_mask.mkdir(exist_ok=True, parents=True) | |
| # if stop is None: | |
| # stop = len(self) | |
| # lines = [] | |
| # masks, imgs = [], [] | |
| # t_offset = 0 | |
| # max_mask = 0 | |
| # n_lines = 0 | |
| # all_coords = [] | |
| # for i in tqdm(range(start, stop)): | |
| # d = dataset.__getitem__(i, return_dense=True) | |
| # mask = d["mask"].numpy() | |
| # mask[mask > 0] += max_mask | |
| # max_mask = max(max_mask, mask.max()) | |
| # masks.extend(mask) | |
| # imgs.extend(d["img"].numpy()) | |
| # # add vertices | |
| # coords = d["coords0"].numpy() | |
| # ts, coords = coords[:, 0].astype(int), coords[:, 1:] | |
| # A = d["assoc_matrix"].numpy() | |
| # t_unique = sorted(np.unique(ts)) | |
| # for t1, t2 in zip(t_unique[:-1], t_unique[1:]): | |
| # A_sub = A[ts == t1][:, ts == t2] | |
| # for i, a in enumerate(A_sub): | |
| # v1 = coords[ts == t1][i] | |
| # for j in np.where(a > 0)[0]: | |
| # v2 = coords[ts == t2][j] | |
| # # lines.append( | |
| # # { | |
| # # "index": n_lines, | |
| # # "shape-type": "line", | |
| # # "vertex-index": 0, | |
| # # "axis-0": t2 + t_offset, | |
| # # "axis-1": v1[0], | |
| # # "axis-2": v1[1], | |
| # # } | |
| # # ) | |
| # # lines.append( | |
| # # { | |
| # # "index": n_lines, | |
| # # "shape-type": "line", | |
| # # "vertex-index": 1, | |
| # # "axis-0": t2 + t_offset, | |
| # # "axis-1": v2[0], | |
| # # "axis-2": v2[1], | |
| # # } | |
| # # ) | |
| # lines.append([n_lines, "line", 0, t2 + t_offset] + v1.tolist()) | |
| # lines.append([n_lines, "line", 1, t2 + t_offset] + v2.tolist()) | |
| # n_lines += 1 | |
| # c = d["coords0"].numpy() | |
| # c[:, 0] += t_offset | |
| # all_coords.extend(c) | |
| # t_offset += len(mask) | |
| # ax_cols = [f"axis-{i}" for i in range(dataset.ndim + 1)] | |
| # df = pd.DataFrame(lines, columns=["index", "shape-type", "vertex-index"] + ax_cols) | |
| # df.to_csv(out / "lines.csv", index=False) | |
| # df_c = pd.DataFrame(all_coords, columns=ax_cols) | |
| # df_c.to_csv(out / "coords.csv", index=False) | |
| # for i, m in enumerate(imgs): | |
| # # tifffile.imwrite(out_img/f'img_{i:04d}.tif', m) | |
| # if dataset.ndim == 2: | |
| # imageio.imwrite( | |
| # out_img / f"img_{i:04d}.jpg", | |
| # np.clip(20 + 100 * m, 0, 255).astype(np.uint8), | |
| # ) | |
| # for i, m in enumerate(masks): | |
| # tifffile.imwrite(out_mask / f"mask_{i:04d}.tif", m, compression="zstd") | |
| # return d | |