Spaces:
Runtime error
Runtime error
| import tensorflow as tf | |
| import numpy as np | |
| import pandas as pd | |
| import os | |
| import argparse | |
# Configure GPUs: enable memory growth on every physical GPU TensorFlow
# reports, then restrict the visible devices to the first GPU (if any).
_physical_gpus = tf.config.list_physical_devices('GPU')
for gpu in _physical_gpus:
    tf.config.experimental.set_memory_growth(gpu, enable=True)
if _physical_gpus:
    tf.config.experimental.set_visible_devices(_physical_gpus[0], 'GPU')
class Encoder:
    """Encode an on-target/off-target sequence pair as a 7-channel matrix.

    Sequences shorter than 24 characters are left-padded with "-"; longer
    sequences are kept as they are. For every position the encoding holds:

    * five channels: the bitwise OR of the one-hot codes of the two bases
      (order A, T, G, C, "_"; the padding symbol "-" is all zeros);
    * two direction channels: the first is set when the on-target base has
      the higher rank in ``direction_dict``, the second when it has the
      lower rank; both stay zero when the bases match or either is "-".

    An "N" in the on-target sequence is treated as the off-target base at
    the same position.

    Attributes set by the constructor:
        on_seq, off_seq: the padded sequences.
        sgRNA_code, off_code: per-sequence one-hot matrices.
        on_off_code: the combined (sequence length, 7) matrix.
        label / value: only set when ``with_category`` / ``with_reg_val``
            are true.
    """

    def __init__(self, on_seq, off_seq, with_category = False, label = None, with_reg_val = False, value = None):
        target_len = 24
        self.on_seq = self._left_pad(on_seq, target_len)
        self.off_seq = self._left_pad(off_seq, target_len)
        self.encoded_dict_indel = {
            'A': [1, 0, 0, 0, 0],
            'T': [0, 1, 0, 0, 0],
            'G': [0, 0, 1, 0, 0],
            'C': [0, 0, 0, 1, 0],
            '_': [0, 0, 0, 0, 1],
            '-': [0, 0, 0, 0, 0],
        }
        self.direction_dict = {'A': 5, 'G': 4, 'C': 3, 'T': 2, '_': 1}
        if with_category:
            self.label = label
        if with_reg_val:
            self.value = value
        self.encode_on_off_dim7()

    @staticmethod
    def _left_pad(seq, width):
        """Return *seq* prefixed with enough "-" characters to reach *width*."""
        return "-" * (width - len(seq)) + seq

    def encode_sgRNA(self):
        """Store the one-hot matrix of the on-target sequence in ``sgRNA_code``.

        An "N" is encoded as the off-target base found at the same index.
        """
        one_hot = self.encoded_dict_indel
        off_bases = list(self.off_seq)
        rows = []
        for pos, base in enumerate(self.on_seq):
            if base == "N":
                base = off_bases[pos]
            rows.append(one_hot[base])
        self.sgRNA_code = np.array(rows)

    def encode_off(self):
        """Store the one-hot matrix of the off-target sequence in ``off_code``."""
        one_hot = self.encoded_dict_indel
        self.off_code = np.array([one_hot[base] for base in self.off_seq])

    def encode_on_off_dim7(self):
        """Build the combined 7-channel matrix and store it in ``on_off_code``."""
        self.encode_sgRNA()
        self.encode_off()
        rank = self.direction_dict
        off_bases = list(self.off_seq)
        combined = []
        for pos, on_base in enumerate(self.on_seq):
            merged = np.bitwise_or(self.sgRNA_code[pos], self.off_code[pos])
            off_base = off_bases[pos]
            if on_base == "N":
                on_base = off_base
            direction = np.zeros(2)
            # Padding positions carry no direction information.
            if on_base != "-" and off_base != "-":
                on_rank = rank[on_base]
                off_rank = rank[off_base]
                if on_rank > off_rank:
                    direction[0] = 1
                elif on_rank < off_rank:
                    direction[1] = 1
            combined.append(np.concatenate((merged, direction)))
        self.on_off_code = np.array(combined)
def encode_on_off_seq_pairs(input_file):
    """Score every sequence pair in a CSV file and save the results.

    The file is read without a header row; its two comma-separated columns
    are taken as the on-target and off-target sequences. Each pair is
    encoded with ``Encoder``, the batch is scored by ``CRISPR_net_predict``,
    and the table, extended with a ``CRISPR_Net_score`` column, is written
    to ``CRISPR_net_results.csv`` in the current working directory.

    Args:
        input_file: Path (or anything ``pandas.read_csv`` accepts) of the
            input CSV.
    """
    pairs = pd.read_csv(input_file, delimiter=",", header=None, names=['on_seq', 'off_seq'])
    encoded = [
        Encoder(on_seq=row['on_seq'], off_seq=row['off_seq']).on_off_code
        for _, row in pairs.iterrows()
    ]
    # The model input is laid out as (batch, 1, 24, 7).
    model_input = np.array(encoded).reshape((len(encoded), 1, 24, 7))
    pairs['CRISPR_Net_score'] = CRISPR_net_predict(model_input)
    pairs.to_csv("CRISPR_net_results.csv", index=False)
def CRISPR_net_predict(X_test):
    """Load the pretrained CRISPR-Net model and score an encoded batch.

    The architecture is read from
    ``cas9_model/CRISPR_Net_CIRCLE_elevation_SITE_structure.json`` and the
    weights from ``cas9_model/CRISPR_Net_CIRCLE_elevation_SITE_weights.h5``.
    Both paths are relative to the current working directory, and the model
    is rebuilt on every call.

    Args:
        X_test: Encoded input batch, passed unchanged to ``model.predict``.
            The callers in this file supply an array reshaped to
            (n, 1, 24, 7).

    Returns:
        The model predictions flattened to one dimension.
    """
    # A context manager closes the file even if reading raises.
    with open("cas9_model/CRISPR_Net_CIRCLE_elevation_SITE_structure.json", 'r') as json_file:
        loaded_model_json = json_file.read()
    loaded_model = tf.keras.models.model_from_json(loaded_model_json)  # Updated for TensorFlow 2
    loaded_model.load_weights("cas9_model/CRISPR_Net_CIRCLE_elevation_SITE_weights.h5")
    y_pred = loaded_model.predict(X_test).flatten()
    return y_pred
def process_input_and_predict(input_data, input_type='manual'):
    """Score sequence pairs given as text or as a CSV file.

    Args:
        input_data: For ``input_type='manual'``, a string with one
            ``on_seq,off_seq`` pair per line. For ``input_type='file'``, a
            path (or anything ``pandas.read_csv`` accepts) to a headerless
            two-column CSV.
        input_type: ``'manual'`` (default) or ``'file'``.

    Returns:
        A DataFrame with columns ``on_seq``, ``off_seq`` and
        ``CRISPR_Net_score`` holding one row per usable pair. Rows with a
        missing or empty sequence are skipped. When no usable pair is
        found the DataFrame is empty and the model is not invoked.

    Raises:
        ValueError: If ``input_type`` is not recognised, or a manual input
            line contains more than two comma-separated fields.
    """
    columns = ['on_seq', 'off_seq']
    if input_type == 'manual':
        rows = []
        # splitlines() also copes with "\r\n" line endings and a trailing
        # newline, which are common in pasted text.
        for line in input_data.splitlines():
            fields = [field.strip() for field in line.split(',')]
            if len(fields) > 2:
                raise ValueError(
                    "Expected 'on_seq,off_seq' per line, got %d fields: %r"
                    % (len(fields), line))
            # Pad incomplete lines so they form a regular row; the missing
            # value makes the loop below skip them.
            fields.extend([None] * (2 - len(fields)))
            rows.append(fields)
        inputs = pd.DataFrame(rows, columns=columns)
    elif input_type == 'file':
        inputs = pd.read_csv(input_data, delimiter=",", header=None, names=columns)
    else:
        raise ValueError(
            "input_type must be 'manual' or 'file', got %r" % (input_type,))

    valid_inputs = []
    input_codes = []
    for on_seq, off_seq in zip(inputs['on_seq'], inputs['off_seq']):
        # Missing CSV cells arrive as NaN, which is truthy, so a plain
        # truthiness test would let them through to the encoder.
        if not isinstance(on_seq, str) or not isinstance(off_seq, str):
            continue
        on_seq = on_seq.strip()
        off_seq = off_seq.strip()
        if not on_seq or not off_seq:
            continue
        en = Encoder(on_seq=on_seq, off_seq=off_seq)
        input_codes.append(en.on_off_code)
        valid_inputs.append((on_seq, off_seq))

    # Create a new DataFrame from valid inputs and predictions
    result_df = pd.DataFrame(valid_inputs, columns=columns)
    if not input_codes:
        # Nothing to score: skip loading the model and predicting on an
        # empty batch.
        result_df['CRISPR_Net_score'] = pd.Series(dtype=float)
        return result_df

    model_input = np.array(input_codes).reshape((len(input_codes), 1, 24, 7))
    result_df['CRISPR_Net_score'] = CRISPR_net_predict(model_input)
    return result_df
if __name__ == '__main__':
    # Command-line entry point: score the sequence pairs in the given CSV
    # file and write the results via encode_on_off_seq_pairs().
    parser = argparse.ArgumentParser(description="CRISPR-Net v1.0 (Aug 10 2019)")
    parser.add_argument(
        "input_file",
        help="input_file example (on-target seq, off-target seq):\n GAGT_CCGAGCAGAAGAAGAATGG,GAGTACCAAGTAGAAGAAAAATTT\n"
             "GTTGCCCCACAGGGCAGTAAAGG,GTGGACACCCCGGGCAGGAAAGG\n"
             "GGGTGGGGGGAGTTTGCTCCAGG,AGGTGGGGTGA_TTTGCTCCAGG",
    )
    args = parser.parse_args()
    input_path = args.input_file
    if os.path.exists(input_path):
        encode_on_off_seq_pairs(input_path)
    else:
        print("File doesn't exist!")
    # NOTE(review): the original indentation was lost, so the nesting of
    # this call is an assumption (end of the __main__ block, run on both
    # branches) - confirm against the upstream script.
    tf.keras.backend.clear_session()