"""Gradio demo that denoises one Hi-C ratio tile with pre-trained Keras models.

At import time the script downloads a chr22 anchor-to-anchor file from the
``dylanplummer/hicorr`` dataset, converts it to a sparse ratio matrix and loads
a model from ``DeepLoop_models/CPGZ_trained``.  The UI lets the user pick the
model depth with a slider (and optionally upload another anchor-to-anchor file)
and shows the central ``tile_size`` x ``tile_size`` tile before and after the
model is applied.
"""
import gradio as gr
import os
import scipy
from scipy.sparse import tril, triu
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from pathlib import Path
from tensorflow.keras.models import model_from_json
from huggingface_hub import hf_hub_download

#input_file = hf_hub_download(repo_id="dylanplummer/hicorr", filename="arima_beta.chr22", repo_type="dataset", token=os.environ['DATASET_SECRET'])
input_file = hf_hub_download(repo_id="dylanplummer/hicorr", filename="ORC2.chr22", repo_type="dataset", token=os.environ['DATASET_SECRET'])

data_dir = 'data/'
sparse_data_dir = 'data/sparse_data/'


def get_chromosome_from_filename(filename):
    """
    Extract the chromosome string from any of the file name formats we use

    Args:
        filename (:obj:`str`) : name of anchor to anchor file

    Returns:
        Chromosome string of form chr<>
    """
    chr_index = filename.find('chr')  # index of chromosome name
    if chr_index == 0:  # chromosome name is the file prefix, e.g. "chr22.bed"
        return filename[:filename.find('.')]
    file_ending_index = filename.rfind('.')  # index of file ending
    if chr_index > file_ending_index:  # chromosome name is the file ending, e.g. "ORC2.chr22"
        return filename[chr_index:]
    return filename[chr_index: file_ending_index]


def draw_heatmap(matrix, color_scale, ax=None, return_image=False):
    """
    Display ratio heatmap containing only strong signals (values > 1 or 0.95th quantile)

    Args:
        matrix (:obj:`numpy.array`) : ratio matrix to be displayed
        color_scale (:obj:`int`) : max ratio value to be considered strongest by color mapping.
            Pass 0 to derive the colour breaks from the data instead
        ax (:obj:`matplotlib.axes.Axes`) : axes which will contain the heatmap.  If None, new axes are created
        return_image (:obj:`bool`) : set to True to return the image obtained from drawing the heatmap with the generated color map

    Returns:
        ``numpy.array`` : if ``return_image`` is set to True, return the heatmap as an array
        (otherwise ``None``)
    """
    max_val = np.max(matrix)
    if color_scale != 0:
        breaks = np.append(np.arange(1.001, color_scale, (color_scale - 1.001) / 18), max_val)
    elif max_val <= 1.001:
        # Nothing exceeds the lowest break, so the data-driven step below would
        # be zero or negative (crashing or giving descending boundaries).
        # Use the fixed 1.001..2 ladder so the tile is simply drawn blank.
        breaks = np.arange(1.001, 2, 0.999 / 18)
    elif max_val < 2:
        breaks = np.arange(1.001, max_val, (max_val - 1.001) / 19)
    else:
        q95 = np.quantile(matrix, q=0.95)
        step = (q95 - 1) / 18
        up = q95 + 0.011
        if up < 2:
            up = 2
            step = 0.999 / 18
        breaks = np.append(np.arange(1.001, up, step), max_val)

    n_bin = 20  # Discretizes the interpolation into bins
    colors = ["#FFFFFF", "#FFE4E4", "#FFD7D7", "#FFC9C9", "#FFBCBC", "#FFAEAE", "#FFA1A1", "#FF9494", "#FF8686",
              "#FF7979", "#FF6B6B", "#FF5E5E", "#FF5151", "#FF4343", "#FF3636", "#FF2828", "#FF1B1B", "#FF0D0D",
              "#FF0000"]
    cmap_name = 'my_list'
    # Create the colormap
    cm = matplotlib.colors.LinearSegmentedColormap.from_list(
        cmap_name, colors, N=n_bin)
    norm = matplotlib.colors.BoundaryNorm(breaks, 20)
    # Fewer bins will result in "coarser" colomap interpolation
    if ax is None:
        _, ax = plt.subplots()
    img = ax.imshow(matrix, cmap=cm, norm=norm, interpolation='nearest')
    if return_image:
        # Close the figure we actually drew on rather than whichever is current.
        plt.close(ax.figure)
        return img.get_array()


def anchor_list_to_dict(anchors):
    """
    Converts the array of anchor names to a dictionary mapping each anchor to its chromosomal index

    Args:
        anchors (:obj:`numpy.array`) : array of anchor name values

    Returns:
        `dict` : dictionary mapping each anchor to its index from the array
    """
    # For duplicated names the last index wins, as with repeated assignment.
    return {anchor: i for i, anchor in enumerate(anchors)}


def anchor_to_locus(anchor_dict):
    """
    Function to convert an anchor name to its genomic locus which can be easily vectorized

    Args:
        anchor_dict (:obj:`dict`) : dictionary mapping each anchor to its chromosomal index

    Returns:
        `function` : function which returns the locus of an anchor name
    """
    def f(anchor):
        return anchor_dict[anchor]
    return f


def _anchor_file_to_sparse(path, anchor_dict, matrix_size, dummy):
    """Parse an anchor to anchor file at ``path`` into a CSR matrix of ratio values."""
    to_index = np.vectorize(anchor_to_locus(anchor_dict))  # anchor name --> matrix index
    try:  # first try reading anchor to anchor file as <anchor1> <anchor2> <obs> <exp>
        pairs = pd.read_csv(
            path,
            delimiter='\t',
            names=['anchor1', 'anchor2', 'obs', 'exp'],
            usecols=['anchor1', 'anchor2', 'obs', 'exp'])
        rows = to_index(pairs['anchor1'].values)
        cols = to_index(pairs['anchor2'].values)
        ratio = (pairs['obs'] + dummy) / (pairs['exp'] + dummy)  # compute matrix ratio value
        return scipy.sparse.csr_matrix((ratio, (rows, cols)), shape=(matrix_size, matrix_size))
    except Exception:
        # Any failure above triggers the three-column fallback, as before, but
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        pairs = pd.read_csv(
            path,
            delimiter='\t',
            names=['anchor1', 'anchor2', 'ratio'],
            usecols=['anchor1', 'anchor2', 'ratio'])
        rows = to_index(pairs['anchor1'].values)
        cols = to_index(pairs['anchor2'].values)
        # This layout has a single value column, so it is used whether or not
        # raw values were requested (looking up an 'obs' column here could
        # only raise KeyError).
        return scipy.sparse.csr_matrix((pairs['ratio'], (rows, cols)), shape=(matrix_size, matrix_size))


def load_chr_ratio_matrix_from_sparse(dir_name, file_name, anchor_dir, sparse_dir=None, anchor_list=None, chr_name=None, dummy=5, ignore_sparse=False, force_symmetry=True, use_raw=False):
    """
    Loads data as a sparse matrix by either reading a precompiled sparse matrix or an anchor to anchor file which is converted to sparse CSR format.
    Ratio values are computed using the observed (obs) and expected (exp) values:

    .. math::
       ratio = \\frac{obs + dummy}{exp + dummy}

    Args:
        dir_name (:obj:`str`) : directory containing the anchor to anchor or precompiled (.npz) sparse matrix file
        file_name (:obj:`str`) : name of anchor to anchor or precompiled (.npz) sparse matrix file
        anchor_dir (:obj:`str`) : directory containing the reference anchor ``.bed`` files
        sparse_dir (:obj:`str`) : if given, replaces the module-level ``sparse_data_dir`` as the cache root
        anchor_list (:obj:`pandas.DataFrame`) : anchor reference with an ``anchor`` column; read from
            ``anchor_dir/<chr_name>.bed`` when omitted
        chr_name (:obj:`str`) : chromosome name; derived from ``file_name`` when omitted
        dummy (:obj:`int`) : dummy value to used when computing ratio values
        ignore_sparse (:obj:`bool`) : set to True to ignore precompiled sparse matrices even if they exist
            (the generated matrix is then not cached either)
        force_symmetry (:obj:`bool`) : mirror the upper triangle onto the lower one after parsing
        use_raw (:obj:`bool`) : accepted for backward compatibility; a three-column file's value column
            is always used as-is and a four-column file always yields ratios

    Returns:
        ``scipy.sparse.csr_matrix``: sparse matrix of ratio values

    Raises:
        ValueError: if neither ``anchor_list`` nor ``anchor_dir`` is supplied when a file must be parsed
    """
    global sparse_data_dir
    if chr_name is None:
        chr_name = get_chromosome_from_filename(file_name)
    # last path component of dir_name: sub-directory where the pre-compiled sparse matrices are saved
    sparse_rep_dir = dir_name[dir_name[: -1].rfind('/') + 1:]
    if sparse_dir is not None:
        sparse_data_dir = sparse_dir
    cache_dir = os.path.join(sparse_data_dir, sparse_rep_dir)
    os.makedirs(cache_dir, exist_ok=True)

    if file_name.endswith('.npz'):  # loading pre-combined and pre-compiled sparse data
        # os.path.join so a dir_name without a trailing separator still works
        return scipy.sparse.load_npz(os.path.join(dir_name, file_name))

    cached_name = file_name + '.npz'
    if cached_name in os.listdir(cache_dir) and not ignore_sparse:  # pre-compiled data already exists
        return scipy.sparse.load_npz(os.path.join(cache_dir, cached_name))

    # otherwise generate sparse matrix from anchor2anchor file and save pre-compiled data
    if anchor_list is None:
        if anchor_dir is None:
            raise ValueError('You must supply either an anchor reference list or the directory containing one')
        anchor_list = pd.read_csv(os.path.join(anchor_dir, '%s.bed' % chr_name), sep='\t',
                                  names=['chr', 'start', 'end', 'anchor'])  # read anchor list file
    matrix_size = len(anchor_list)  # matrix size is needed to construct sparse CSR matrix
    anchor_dict = anchor_list_to_dict(anchor_list['anchor'].values)  # convert to anchor --> index dictionary
    sparse_matrix = _anchor_file_to_sparse(os.path.join(dir_name, file_name), anchor_dict, matrix_size, dummy)

    if force_symmetry:
        upper_sum = triu(sparse_matrix, k=1).sum()
        lower_sum = tril(sparse_matrix, k=-1).sum()
        if upper_sum == 0 or lower_sum == 0:  # only one triangle populated: fill the other
            sparse_matrix = sparse_matrix + sparse_matrix.transpose()
        # NOTE(review): triu() keeps the main diagonal, so the sums here count
        # diagonal entries more than once -- confirm this is intended.
        sparse_triu = scipy.sparse.triu(sparse_matrix)
        sparse_matrix = sparse_triu + sparse_triu.transpose()
    if not ignore_sparse:
        scipy.sparse.save_npz(os.path.join(cache_dir, file_name), sparse_matrix)  # save precompiled data
    return sparse_matrix


model_depths = ['1.5M', '2M', '2.4M', '4.88M', '5M', '6.29M', '8.5M', '12.5M', '16.5M', '25M', '32M', '50M', '100M', '150M']

# Load the model
model_weights = 'DeepLoop_models/CPGZ_trained/12.5M.h5'  # Replace with your model weights file
model_architecture = 'DeepLoop_models/CPGZ_trained/12.5M.json'  # Replace with your model architecture file
with open(model_architecture, 'r') as f:
    model = model_from_json(f.read())
model.load_weights(model_weights)

# Models already read from disk, keyed by depth label, so moving the slider
# back to a depth that was used before does not reload it.
_model_cache = {'12.5M': model}


def _get_model(depth):
    """Return the model for ``depth`` (an entry of ``model_depths``), loading it on first use."""
    if depth not in _model_cache:
        with open(f'DeepLoop_models/CPGZ_trained/{depth}.json', 'r') as arch_file:
            loaded = model_from_json(arch_file.read())
        loaded.load_weights(f'DeepLoop_models/CPGZ_trained/{depth}.h5')
        _model_cache[depth] = loaded
    return _model_cache[depth]


# Define the anchor file path
anchor_file = 'ref/hg19_DPNII_anchor_bed/chr22.bed'
#anchor_file = 'ref/hg19_Arima_anchor_bed/chr22.bed'

# Define the tile size
tile_size = 128

# Load the input matrix
# input_file = '../anchor_2_anchor.loop.chr22'
input_matrix = load_chr_ratio_matrix_from_sparse(os.path.dirname(input_file), os.path.basename(input_file), os.path.dirname(anchor_file), force_symmetry=True)
# input_file = None
# input_matrix = None

# Load the anchor list
anchor_list = pd.read_csv(anchor_file, sep='\t', names=['chr', 'start', 'end', 'anchor'])


def predict(depth_idx):
    """Run the selected model on the central tile and render input vs. output.

    Args:
        depth_idx (:obj:`int`) : index into ``model_depths``; a float from the slider is accepted

    Returns:
        ``numpy.array`` : ``uint8`` RGB image (height, width, 3) of the two heatmaps side by side
    """
    selected_depth = model_depths[int(depth_idx)]  # the slider may deliver a float
    model = _get_model(selected_depth)

    # Get the tile centred on the middle anchor
    center_anchor = int(len(anchor_list) / 2)
    i = max(0, center_anchor - int(tile_size / 2))
    j = i + tile_size
    tile = input_matrix[i:j, i:j].toarray()
    tile = np.expand_dims(tile, -1)  # trailing axis of size 1
    tile = np.expand_dims(tile, 0)  # leading axis of size 1

    # Predict the output
    denoised_tile = model.predict(tile).reshape((tile_size, tile_size))
    denoised_tile[denoised_tile < 0] = 0

    # Drop the extra axes again and symmetrise the prediction
    tile = tile[0, ..., 0]
    denoised_tile = (denoised_tile + denoised_tile.T) / 2

    # Visualize the tiles
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 4))
    draw_heatmap(tile, 0, ax=ax1)
    draw_heatmap(denoised_tile, 0, ax=ax2)
    ax1.set_title('Input Tile')
    ax2.set_title(f'{selected_depth} model')
    plt.tight_layout()

    # return as a numpy array; buffer_rgba() replaces the removed
    # tostring_rgb() and is already shaped (height, width, 4).  Copy before
    # closing the figure because the buffer is a view into the canvas.
    fig.canvas.draw()
    data = np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()
    plt.close(fig)
    return data


def upload_file(file):
    """Replace the global input matrix with one parsed from an uploaded file.

    Args:
        file : path string, or an object whose ``name`` attribute is the path
            (Gradio passes one or the other depending on version/configuration)
    """
    global input_file, input_matrix
    print(file)
    input_file = getattr(file, 'name', file)
    input_matrix = load_chr_ratio_matrix_from_sparse(os.path.dirname(input_file), os.path.basename(input_file), os.path.dirname(anchor_file), force_symmetry=True)


with gr.Blocks() as demo:
    with gr.Row():
        upload = gr.UploadButton("Upload a file", file_count="single")
    with gr.Row():
        slider = gr.Slider(minimum=0, maximum=len(model_depths) - 1, step=1, label='Model Depth', interactive=True)
    heatmap = gr.Image(label='Visualization')
    upload.upload(upload_file, upload)
    slider.change(predict, [slider], heatmap)

if __name__ == "__main__":
    demo.queue().launch()