import logging
import sys
from pathlib import Path
from typing import Optional, Tuple, Union

import numpy as np
import pandas as pd
import tifffile
from tqdm import tqdm

# from .data import CTCData

logger = logging.getLogger(__name__)
def load_tiff_timeseries(
dir: Path,
dtype: Optional[Union[str, type]] = None,
downscale: Optional[Tuple[int, ...]] = None,
start_frame: int = 0,
end_frame: Optional[int] = None,
) -> np.ndarray:
"""Loads a folder of `.tif` or `.tiff` files into a numpy array.
Each file is interpreted as a frame of a time series.
Args:
folder:
dtype:
downscale: One int for each dimension of the data. Avoids memory overhead.
start_frame: The first frame to load.
end_frame: The last frame to load.
Returns:
np.ndarray: The loaded data.
"""
# TODO make safe for label arrays
logger.debug(f"Loading tiffs from {dir} as {dtype}")
files = sorted(list(dir.glob("*.tif")) + list(dir.glob("*.tiff")))[
start_frame:end_frame
]
    if not files:
        raise FileNotFoundError(f"No tiff files found in {dir}")
    shape = tifffile.imread(files[0]).shape
    if downscale:
        # One entry for the time axis plus one per spatial dimension of a frame
        assert len(downscale) == len(shape) + 1
    else:
        downscale = (1,) * (len(shape) + 1)
    files = files[:: downscale[0]]
x = []
for f in tqdm(
files,
leave=False,
desc=f"Loading [{start_frame}:{end_frame}:{downscale[0]}]",
):
_x = tifffile.imread(f)
if dtype:
_x = _x.astype(dtype)
assert _x.shape == shape
slices = tuple(slice(None, None, d) for d in downscale[1:])
_x = _x[slices]
x.append(_x)
x = np.stack(x)
logger.debug(f"Loaded array of shape {x.shape} from {dir}")
return x
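# Example usage (a minimal sketch; the directory path and downscale factors are
# illustrative and assume a 2D time series stored as one tiff file per frame):
#
#   frames = load_tiff_timeseries(
#       Path("data/01"), dtype=np.float32, downscale=(1, 2, 2)
#   )
#   frames.shape  # -> roughly (n_frames, height / 2, width / 2)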
def load_tracklet_links(folder: Path) -> pd.DataFrame:
    """Load CTC-style tracklet links (`man_track.txt` or `res_track.txt`) from
    `folder` into a DataFrame with columns `label`, `t1`, `t2`, `parent`."""
candidates = [
folder / "man_track.txt",
folder / "res_track.txt",
]
for c in candidates:
if c.exists():
path = c
break
else:
raise FileNotFoundError(f"Could not find tracklet links in {folder}")
df = pd.read_csv(
path,
delimiter=" ",
names=["label", "t1", "t2", "parent"],
dtype=int,
)
    # Remove invalid tracks with t1 > t2
    df = df[df.t1 <= df.t2]
n_dets = (df.t2 - df.t1 + 1).sum()
logger.debug(f"{folder} has {n_dets} detections")
n_divs = (df[df.parent != 0]["parent"].value_counts() == 2).sum()
logger.debug(f"{folder} has {n_divs} divisions")
return df
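# Example (illustrative; the folder path is made up): a CTC man_track.txt or
# res_track.txt is a whitespace-separated file with one tracklet per line in
# the format "label t_start t_end parent", where parent 0 means no parent, e.g.
#
#   1 0 9 0
#   2 10 19 1
#   3 10 19 1
#
# load_tracklet_links(Path("data/01_GT/TRA")) then returns a DataFrame with
# columns label, t1, t2, parent.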
def filter_track_df(
df: pd.DataFrame,
start_frame: int = 0,
end_frame: int = sys.maxsize,
downscale: int = 1,
) -> pd.DataFrame:
"""Only keep tracklets that are present in the given time interval."""
df.columns = ["label", "t1", "t2", "parent"]
# only retain cells in interval
    df = df[(df.t2 >= start_frame) & (df.t1 < end_frame)].copy()
# shift start and end of each cell
df.t1 = df.t1 - start_frame
df.t2 = df.t2 - start_frame
# set start/end to min/max
df.t1 = df.t1.clip(0, end_frame - start_frame - 1)
df.t2 = df.t2.clip(0, end_frame - start_frame - 1)
# set all parents to 0 that are not in the interval
df.loc[~df.parent.isin(df.label), "parent"] = 0
if downscale > 1:
if start_frame % downscale != 0:
raise ValueError("start_frame must be a multiple of downscale")
logger.debug(f"Temporal downscaling of tracklet links by {downscale}")
# remove tracklets that have been fully deleted by temporal downsampling
mask = (
# (df["t2"] - df["t1"] < downscale - 1)
(df["t1"] % downscale != 0)
& (df["t2"] % downscale != 0)
& (df["t1"] // downscale == df["t2"] // downscale)
)
        logger.debug(
            f"Removing {mask.sum()} tracklets that are fully deleted by downsampling"
        )
        logger.debug(f"Tracklets to remove:\n{df[mask]}")
df = df[~mask]
# set parent to 0 if it has been deleted
df.loc[~df.parent.isin(df.label), "parent"] = 0
df["t2"] = (df["t2"] / float(downscale)).apply(np.floor).astype(int)
df["t1"] = (df["t1"] / float(downscale)).apply(np.ceil).astype(int)
        # Sanity check: downsampling must not produce tracklets that end before they start
        assert np.all(df["t1"] <= df["t2"])
return df
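# Worked example (made-up tracklets, default downscale=1): given a DataFrame
#
#   label  t1  t2  parent
#   1       0   9       0
#   2      10  19       1
#
# filter_track_df(df, start_frame=5, end_frame=15) keeps both tracklets,
# shifts them by start_frame and clips them to the interval, yielding
#   label 1: t1=0, t2=4, parent=0
#   label 2: t1=5, t2=9, parent=1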
# TODO fix
# def dataset_to_ctc(dataset: CTCData, path, start: int = 0, stop: int | None = None):
# """save dataset to ctc format for debugging purposes"""
# out = Path(path)
# print(f"Saving dataset to {out}")
# out_img = out / "img"
# out_img.mkdir(exist_ok=True, parents=True)
# out_mask = out / "TRA"
# out_mask.mkdir(exist_ok=True, parents=True)
# if stop is None:
#         stop = len(dataset)
# lines = []
# masks, imgs = [], []
# t_offset = 0
# max_mask = 0
# n_lines = 0
# all_coords = []
# for i in tqdm(range(start, stop)):
# d = dataset.__getitem__(i, return_dense=True)
# mask = d["mask"].numpy()
# mask[mask > 0] += max_mask
# max_mask = max(max_mask, mask.max())
# masks.extend(mask)
# imgs.extend(d["img"].numpy())
# # add vertices
# coords = d["coords0"].numpy()
# ts, coords = coords[:, 0].astype(int), coords[:, 1:]
# A = d["assoc_matrix"].numpy()
# t_unique = sorted(np.unique(ts))
# for t1, t2 in zip(t_unique[:-1], t_unique[1:]):
# A_sub = A[ts == t1][:, ts == t2]
# for i, a in enumerate(A_sub):
# v1 = coords[ts == t1][i]
# for j in np.where(a > 0)[0]:
# v2 = coords[ts == t2][j]
# # lines.append(
# # {
# # "index": n_lines,
# # "shape-type": "line",
# # "vertex-index": 0,
# # "axis-0": t2 + t_offset,
# # "axis-1": v1[0],
# # "axis-2": v1[1],
# # }
# # )
# # lines.append(
# # {
# # "index": n_lines,
# # "shape-type": "line",
# # "vertex-index": 1,
# # "axis-0": t2 + t_offset,
# # "axis-1": v2[0],
# # "axis-2": v2[1],
# # }
# # )
# lines.append([n_lines, "line", 0, t2 + t_offset] + v1.tolist())
# lines.append([n_lines, "line", 1, t2 + t_offset] + v2.tolist())
# n_lines += 1
# c = d["coords0"].numpy()
# c[:, 0] += t_offset
# all_coords.extend(c)
# t_offset += len(mask)
# ax_cols = [f"axis-{i}" for i in range(dataset.ndim + 1)]
# df = pd.DataFrame(lines, columns=["index", "shape-type", "vertex-index"] + ax_cols)
# df.to_csv(out / "lines.csv", index=False)
# df_c = pd.DataFrame(all_coords, columns=ax_cols)
# df_c.to_csv(out / "coords.csv", index=False)
# for i, m in enumerate(imgs):
# # tifffile.imwrite(out_img/f'img_{i:04d}.tif', m)
# if dataset.ndim == 2:
# imageio.imwrite(
# out_img / f"img_{i:04d}.jpg",
# np.clip(20 + 100 * m, 0, 255).astype(np.uint8),
# )
# for i, m in enumerate(masks):
# tifffile.imwrite(out_mask / f"mask_{i:04d}.tif", m, compression="zstd")
# return d