|
|
import pandas as pd |
|
|
import tangram as tg |
|
|
import numpy as np |
|
|
import torch |
|
|
import anndata |
|
|
from sklearn.decomposition import PCA |
|
|
from sklearn.neighbors import NearestNeighbors |
|
|
|
|
|
|
|
|
|
|
|
def generate_feature_ad(ad_expr, feature_path, sc=False):
    """
    Generates an AnnData object with OmiCLIP text or image embeddings.

    The CSV at `feature_path` is read with its first column as the index, and
    its columns are selected by (and ordered like) `ad_expr.obs.index`. The
    selected table is transposed before being wrapped in AnnData, so each
    observation of `ad_expr` becomes one row of the result.

    :param ad_expr: AnnData object containing metadata for the dataset.
    :param feature_path: Path to the CSV file containing the features to be loaded.
    :param sc: Boolean flag indicating whether to copy single-cell metadata or ST metadata. Default is False (ST).
               When True, the whole `ad_expr.obs` table is copied. When False, only
               `obs['cell_num']`, `uns['spatial']` and `obsm['spatial']` are copied.
    :return: A new AnnData object with the loaded features and relevant metadata from ad_expr.
    :raises KeyError: Propagated from the underlying lookups, e.g. when the CSV lacks a
                      column for one of the observation names, or (ST mode) when
                      `ad_expr` has no 'cell_num' / 'spatial' entries.
    """
    obs_names = ad_expr.obs.index

    # Select the columns once, in ad_expr order; a second identical selection
    # would only copy the table again.
    features = pd.read_csv(feature_path, index_col=0)[obs_names]

    # Transpose so observations are rows, as AnnData expects.
    feature_ad = anndata.AnnData(features.T)

    if sc:
        # Single-cell data: carry over the complete per-cell metadata table.
        feature_ad.obs = ad_expr.obs.copy()
    else:
        # ST data: carry over the per-spot cell count and the spatial information.
        feature_ad.obs['cell_num'] = ad_expr.obs['cell_num'].copy()
        feature_ad.uns['spatial'] = ad_expr.uns['spatial'].copy()
        feature_ad.obsm['spatial'] = ad_expr.obsm['spatial'].copy()

    return feature_ad
|
|
|
|
|
|
|
|
|
|
|
def normalize_percentile(df, cols, min_percentile=5, max_percentile=95):
    """
    Clips and normalizes the specified columns of a DataFrame based on percentile thresholds,
    transforming their values to the [0, 1] range.

    Each column is processed independently: values are clipped to the
    [min_percentile, max_percentile] percentile range of that column and then
    rescaled linearly so the lower bound maps to 0 and the upper bound to 1.
    A column whose two percentile bounds coincide (e.g. a constant column) has
    no spread to rescale and is set to 0.0 instead of dividing by zero.

    The DataFrame is modified in place; the returned object is `df` itself.

    :param df: A pandas DataFrame containing the columns to normalize.
    :type df: pandas.DataFrame
    :param cols: A list of column names in `df` that should be normalized.
    :type cols: list[str]
    :param min_percentile: The lower percentile used for clipping (defaults to 5).
    :type min_percentile: float
    :param max_percentile: The upper percentile used for clipping (defaults to 95).
    :type max_percentile: float
    :return: The same DataFrame with specified columns clipped and normalized.
    :rtype: pandas.DataFrame
    """
    for col in cols:
        lower = np.percentile(df[col], min_percentile)
        upper = np.percentile(df[col], max_percentile)
        span = upper - lower

        if span == 0:
            # Degenerate range: (x - lower) / 0 would turn the whole column into
            # NaN, so map every value to the bottom of the [0, 1] range instead.
            df[col] = 0.0
            continue

        clipped = np.clip(df[col], lower, upper)
        df[col] = (clipped - lower) / span

    return df
|
|
|
|
|
|
|
|
|
|
|
def cell_type_decompose(sc_ad, st_ad, cell_type_col='cell_type', NMS_mode=False, major_types=None, min_percentile=5, max_percentile=95):
    """
    Performs cell type decomposition on spatial data (ST or image) with single-cell data.

    The single-cell data is mapped onto the spatial data with Tangram in
    "clusters" mode (on CPU, uniform density prior, fixed random state), and the
    `cell_type_col` annotation is then projected onto `st_ad`. The projected
    scores are read from `st_ad.obsm['tangram_ct_pred']`.

    In NMS mode the `major_types` columns of that table are percentile-normalized
    and, for every spot, only the highest-scoring major type keeps its value
    while the other major types are set to 0.

    NOTE(review): in NMS mode `st_ad.obs` is *replaced* by the normalized
    prediction table, so any pre-existing `st_ad.obs` columns are no longer
    present on the returned object. `normalize_percentile` also rewrites the
    `major_types` columns of `st_ad.obsm['tangram_ct_pred']` in place. This
    behavior is kept as-is for compatibility with existing callers.

    :param sc_ad: AnnData object containing single-cell meta data.
    :param st_ad: AnnData object containing spatial data (ST or image) meta data.
    :param cell_type_col: The column name in `sc_ad.obs` that contains cell type annotations. Default is 'cell_type'.
    :param NMS_mode: Boolean flag to apply Non-Maximum Suppression (NMS) mode. Default is False.
    :param major_types: Major cell types used for NMS mode. Required when NMS_mode is True. Default is None.
    :param min_percentile: The lower percentile used for clipping (defaults to 5).
    :param max_percentile: The upper percentile used for clipping (defaults to 95).
    :return: The spatial AnnData object with projected cell type annotations.
    :raises ValueError: If NMS_mode is True but `major_types` is None.
    """
    # Fail fast: without this check the missing argument only surfaces after
    # the expensive mapping below, as an opaque "NoneType is not iterable".
    if NMS_mode and major_types is None:
        raise ValueError("major_types must be provided when NMS_mode is True")

    tg.pp_adatas(sc_ad, st_ad, genes=None)

    ad_map = tg.map_cells_to_space(
        sc_ad, st_ad,
        mode="clusters",
        cluster_label=cell_type_col,
        device='cpu',
        scale=False,
        density_prior='uniform',
        random_state=10,
        verbose=False,
    )

    tg.project_cell_annotations(ad_map, st_ad, annotation=cell_type_col)

    if NMS_mode:
        # Normalizes the major-type columns in place and installs the whole
        # prediction table as the new obs (see NOTE in the docstring).
        st_ad.obs = normalize_percentile(st_ad.obsm['tangram_ct_pred'], major_types, min_percentile, max_percentile)

        st_ad_binary = st_ad.obsm['tangram_ct_pred'][major_types].copy()

        # Keep a score only where it equals the row-wise maximum across the
        # major types; every other major-type entry becomes 0.
        st_ad.obs[major_types] = st_ad_binary.where(st_ad_binary.eq(st_ad_binary.max(axis=1), axis=0), other=0)

    return st_ad
|
|
|
|
|
|
|
|
|
|
|
def assign_cells_to_spots(cell_locs, spot_locs, patch_size=16):
    """
    Links every cell to the spots whose centre lies within half a patch size of it.

    A radius-based neighbour index is built over the spot coordinates and then
    queried with the cell coordinates, so a cell may be linked to zero, one or
    several spots.

    :param cell_locs: Numpy array of shape (n_cells, 2) with the x, y coordinates of the cells.
    :param spot_locs: Numpy array of shape (n_spots, 2) with the x, y coordinates of the spots.
    :param patch_size: The diameter of the spot patch. The search radius is half of this value.
    :return: A sparse connectivity matrix with one row per cell and one column per spot.
             An entry is 1 if the cell is assigned to that spot, 0 otherwise.
    """
    search_radius = patch_size * 0.5

    # Index the spots, then look up each cell's in-radius spots in one pass.
    spot_index = NearestNeighbors(radius=search_radius).fit(spot_locs)
    return spot_index.radius_neighbors_graph(cell_locs, mode='connectivity')
|
|
|
|
|
|
|
|
|