File size: 14,075 Bytes
1e315b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
import json
import logging
import os

import numpy as np
import pandas as pd
import scanpy as sc
from PIL import Image



def generate_gene_df(ad, house_keeping_genes, todense=True, top_k=50):
    """
    Build a one-column DataFrame whose 'label' entry, for every observation of an
    AnnData object, is a space-separated string of its most highly expressed genes.

    Genes whose names contain '.' or '-' and genes listed in
    ``house_keeping_genes['genesymbol']`` are dropped before ranking.

    :param ad: An AnnData object containing gene expression data in ``ad.X``.
    :type ad: anndata.AnnData
    :param house_keeping_genes: Object indexable by 'genesymbol' (e.g. a DataFrame column)
                                listing the housekeeping genes to exclude.
    :type house_keeping_genes: pandas.DataFrame or pandas.Series
    :param todense: If True and ``ad.X`` offers a ``todense()`` method, densify it before
                    building the DataFrame. A matrix without ``todense()`` is used as-is.
    :type todense: bool
    :param top_k: Number of top-expressed genes to keep per observation (default 50).
    :type top_k: int
    :return: DataFrame indexed like ``ad.obs`` with a single 'label' column of
             space-separated gene names, highest expression first.
    :rtype: pandas.DataFrame
    """

    gene_names = ad.var.index

    # One combined mask so the AnnData object is sliced once instead of three times.
    # The result is the same gene set as filtering sequentially.
    drop = (
        gene_names.str.contains('.', regex=False)
        | gene_names.str.contains('-', regex=False)
        | gene_names.isin(house_keeping_genes['genesymbol'])
    )
    ad = ad[:, ~drop]

    # Densify only when asked AND when the matrix supports it; an already-dense
    # array has no `todense` and is passed through unchanged.
    matrix = ad.X
    if todense and hasattr(matrix, 'todense'):
        matrix = matrix.todense()
    expr = pd.DataFrame(matrix, index=ad.obs.index, columns=ad.var.index)

    # Rank genes per observation and join the names directly; iterating rows
    # (rather than DataFrame.apply) also behaves sensibly when there are no rows.
    labels = [
        ' '.join(map(str, row.nlargest(top_k).index))
        for _, row in expr.iterrows()
    ]

    return pd.DataFrame({'label': labels}, index=expr.index)



def segment_patches(img_array, coord, patch_dir, height=20, width=20):
    """
    Extracts small image patches centered at specified coordinates and saves them as individual PNG files.

    NOTE: the column naming is swapped relative to image axes. The value in "pixel_x" is
    used as the center along the image's first axis (rows) and "pixel_y" as the center
    along the second axis (columns). This matches how `load_data_for_annotation` in this
    file fills the two columns, so it is kept as-is; verify any other caller follows the
    same convention.

    :param img_array: A NumPy array representing the full-resolution image. Its first two
                      dimensions are read as (rows, columns).
    :type img_array: numpy.ndarray
    :param coord: A pandas DataFrame with patch centers in columns "pixel_x" and "pixel_y"
                  (see the note above). The index supplies the spot IDs used in file names.
    :type coord: pandas.DataFrame
    :param patch_dir: Directory path where the patch images will be saved (created if missing).
    :type patch_dir: str
    :param height: The patch's extent in pixels along the image's first axis.
    :type height: int
    :param width: The patch's extent in pixels along the image's second axis.
    :type width: int
    :return: None. Patches are written to `patch_dir` as "<spot_id>_hires.png"; spots whose
             patch would cross the image border are reported on stdout and skipped.
    """

    # exist_ok avoids the check-then-create race of a separate os.path.exists() test
    os.makedirs(patch_dir, exist_ok=True)

    # Image extent along rows and columns
    n_rows, n_cols = img_array.shape[:2]

    # Walk the index and both coordinate columns together instead of a per-row
    # .loc lookup (which is slower and misbehaves on duplicate index labels)
    for spot_idx, ycenter, xcenter in zip(coord.index, coord["pixel_x"], coord["pixel_y"]):
        # Top-left corner (x1, y1) and bottom-right corner (x2, y2) of the patch
        x1 = round(xcenter - width / 2)
        y1 = round(ycenter - height / 2)
        x2 = x1 + width
        y2 = y1 + height

        # Skip patches that would extend beyond the image
        if x1 < 0 or y1 < 0 or x2 > n_cols or y2 > n_rows:
            print(f"Patch {spot_idx} is out of range and will be skipped.")
            continue

        # Cut the patch out and cast to uint8 so PIL can encode it
        patch_img = Image.fromarray(img_array[y1:y2, x1:x2].astype(np.uint8))

        # File name pattern: "<spot_id>_hires.png"
        patch_path = os.path.join(patch_dir, f"{spot_idx}_hires.png")
        patch_img.save(patch_path)



def read_gct(file_path):
    """
    Load the data table of a GCT file into a pandas DataFrame.

    The first line (version tag) is skipped, the second line is parsed for the
    matrix dimensions, and the remainder is read as a tab-separated table whose
    first line is the column header.

    :param file_path: The path to the GCT file to be read.
    :return: A pandas DataFrame with one row per data row declared in the dimension line.
             All columns of the table are kept (in GCT files the first two are typically
             the gene name and description, followed by the expression columns).
    """

    with open(file_path, 'r') as handle:
        # Line 1: version tag, consumed and discarded
        handle.readline()

        # Line 2: whitespace-separated dimensions. Both leading fields must be
        # integers; only the row count is used afterwards.
        dim_tokens = handle.readline().strip().split()
        n_rows, _n_cols = int(dim_tokens[0]), int(dim_tokens[1])

        # Remaining lines: header row followed by the data; stop after n_rows records
        table = pd.read_csv(handle, sep='\t', header=0, nrows=n_rows)

    return table



def get_library_id(adata):
    """
    Retrieves the library ID from the AnnData object, assuming it contains spatial data.
    The function returns the first key of `adata.uns['spatial']`.

    :param adata: AnnData object containing spatial information in `adata.uns['spatial']`.
    :return: The first library ID found in `adata.uns['spatial']`, or None (after logging
             an error) if `adata.uns['spatial']` has no entries.
    :raises AssertionError: If 'spatial' is not present in `adata.uns`.
    """

    # Raised explicitly rather than via `assert` so the check is not stripped
    # under `python -O`; the exception type is unchanged for existing callers.
    if 'spatial' not in adata.uns:
        raise AssertionError("spatial not present in adata.uns")

    # Library IDs are the keys of the 'spatial' mapping
    library_ids = list(adata.uns['spatial'].keys())

    if not library_ids:
        logging.getLogger(__name__).error('No library_id found in adata')
        return None

    return library_ids[0]



def get_scalefactors(adata, library_id=None):
    """
    Retrieves the scalefactors from the AnnData object for a given library ID. If no library ID is provided,
    the first available library ID (as returned by `get_library_id`) is used.

    :param adata: AnnData object containing spatial data and scalefactors in `adata.uns['spatial']`.
    :param library_id: The library ID for which the scalefactors are to be retrieved. If None, it defaults
                       to the first available ID.
    :return: The object stored at `adata.uns['spatial'][library_id]['scalefactors']`, or None
             (after logging an error) if any of those keys is missing.
    """

    # Fall back to the first library ID when none is given
    if library_id is None:
        library_id = get_library_id(adata)

    try:
        return adata.uns['spatial'][library_id]['scalefactors']
    except KeyError:
        # Covers a missing 'spatial', library ID or 'scalefactors' key
        logging.getLogger(__name__).error('scalefactors not found in adata')
        return None



def get_spot_diameter_in_pixels(adata, library_id=None):
    """
    Retrieves the 'spot_diameter_fullres' entry from the AnnData object's scalefactors for a given library ID.
    If no library ID is provided, `get_scalefactors` falls back to the first available library ID.

    :param adata: AnnData object containing spatial data and scalefactors in `adata.uns['spatial']`.
    :param library_id: The library ID for which the spot diameter is to be retrieved. If not provided,
                       defaults to the first available ID.

    :return: The value of `scalefactors['spot_diameter_fullres']`, or None if the scalefactors
             could not be retrieved or do not contain that key.
    """

    # Get the scalefactors for the specified or default library ID
    scalef = get_scalefactors(adata, library_id=library_id)

    try:
        return scalef['spot_diameter_fullres']
    except TypeError:
        # `scalef` is None or otherwise not subscriptable (e.g. get_scalefactors
        # could not find it); report "not found" to the caller explicitly
        return None
    except KeyError:
        logging.getLogger(__name__).error('spot_diameter_fullres not found in adata')
        return None



def prepare_data_for_alignment(data_path, scale_type='tissue_hires_scalef'):
    """
    Prepares data for alignment by reading an AnnData object and preparing the high-resolution tissue image.

    :param data_path: The path to the AnnData (.h5ad) file to read.
    :param scale_type: Key of the scale factor to apply to the coordinates (`tissue_hires_scalef` by default).

    :return:
        - ad: AnnData object read from `data_path`, with variable names made unique.
        - ad_coor: Numpy array of `ad.obsm['spatial']` multiplied by the selected scale factor.
        - img: The 'hires' image of the first library; if its maximum is below 1.1 it is
               treated as [0, 1]-normalized and converted to 8-bit unsigned integers.

    :raises ValueError: If the scale factor, the spatial coordinates, or the 'hires' image is missing.
        An AssertionError from `get_library_id` may also propagate when `ad.uns` has no 'spatial' entry.
    """

    # Load the AnnData object from the specified file path
    ad = sc.read_h5ad(data_path)

    # Ensure the variable (gene) names are unique to avoid potential conflicts
    ad.var_names_make_unique()

    try:
        scalef = get_scalefactors(ad)[scale_type]
    except (KeyError, TypeError) as err:
        # TypeError: get_scalefactors returned None (scalefactors missing altogether);
        # KeyError: scalefactors exist but lack `scale_type`
        raise ValueError(f"Scale factor '{scale_type}' not found in ad.uns['spatial']") from err

    # Scale the spatial coordinates using the specified scale factor
    try:
        ad_coor = np.array(ad.obsm['spatial']) * scalef
    except KeyError as err:
        raise ValueError("Spatial coordinates not found in ad.obsm['spatial']") from err

    # Retrieve the high-resolution tissue image of the first library
    try:
        img = ad.uns['spatial'][get_library_id(ad)]['images']['hires']
    except KeyError as err:
        raise ValueError("High-resolution image not found in ad.uns['spatial']") from err

    # Heuristic: a maximum below 1.1 is taken to mean values normalized to [0, 1],
    # which are rescaled to 8-bit for compatibility
    if img.max() < 1.1:
        img = (img * 255).astype('uint8')

    return ad, ad_coor, img



def load_data_for_annotation(st_data_path, json_path, in_tissue=True):
    """
    Loads spatial transcriptomics (ST) data from an .h5ad file and prepares it for annotation.

    :param st_data_path: Path to the .h5ad file holding the spatial transcriptomics data.
    :param json_path: Path to the JSON file holding the region-of-interest polygon.
    :param in_tissue: Boolean flag to keep only observations whose `obs['in_tissue']` equals 1. Default is True.

    :return:
        - st_ad: AnnData object with the two columns of `obsm['spatial']` copied into
                 `obs["pixel_y"]` and `obs["pixel_x"]` (in that order).
        - library_id: The first library ID found in `st_ad.uns['spatial']`.
        - roi_polygon: The parsed content of `json_path`.
    """

    # Load the spatial transcriptomics data into an AnnData object
    st_ad = sc.read_h5ad(st_data_path)

    # Optionally keep only in-tissue spots. Subsetting yields a view, so take a
    # real copy before writing new `obs` columns below.
    if in_tissue:
        st_ad = st_ad[st_ad.obs['in_tissue'] == 1].copy()

    # Initialize pixel coordinates for spatial information
    st_ad.obs[["pixel_y", "pixel_x"]] = None  # Ensure the columns exist
    # First spatial column -> "pixel_y", second -> "pixel_x"
    st_ad.obs[["pixel_y", "pixel_x"]] = st_ad.obsm['spatial']

    # Retrieve the library ID associated with the spatial data
    library_id = get_library_id(st_ad)

    # Load the region of interest (ROI) polygon; JSON text is read as UTF-8
    # rather than the platform's locale encoding
    with open(json_path, encoding="utf-8") as f:
        roi_polygon = json.load(f)

    return st_ad, library_id, roi_polygon



def read_polygons(file_path, slide_id):
    """
    Load the polygon definitions stored for one slide in a JSON configuration file.

    :param file_path: Path to the JSON file containing polygon configurations.
    :param slide_id: Key of the slide whose polygon entries should be returned.
    :return:
        - polygons: A list of numpy arrays, one per entry, built from its 'coords' value.
        - polygon_colors: A list with each entry's 'color' value.
        - polygon_thickness: A list with each entry's 'thickness' value.
        All three are None when `slide_id` is not present in the file.
    """

    # Parse the whole configuration file
    with open(file_path, 'r') as handle:
        configs = json.load(handle)

    if slide_id in configs:
        entries = configs[slide_id]

        # One list per attribute, all in the order the entries appear in the file
        polygons = [np.array(entry['coords']) for entry in entries]
        polygon_colors = [entry['color'] for entry in entries]
        polygon_thickness = [entry['thickness'] for entry in entries]

        return polygons, polygon_colors, polygon_thickness

    # Unknown slide: signal "nothing found" for every output
    return None, None, None