Spaces:
Runtime error
Runtime error
| from keras import Model | |
| from keras.layers import Input | |
| from keras.layers import Multiply | |
| from keras.layers import Dense, Dropout, Activation, Flatten | |
| from keras.layers import Convolution1D, AveragePooling1D | |
| import pandas as pd | |
| import numpy as np | |
| import keras | |
| import requests | |
| from functools import reduce | |
| from operator import add | |
| from Bio.SeqRecord import SeqRecord | |
| from Bio.SeqFeature import SeqFeature, FeatureLocation | |
| from Bio.Seq import Seq | |
| from Bio import SeqIO | |
| ntmap = {'A': (1, 0, 0, 0), | |
| 'C': (0, 1, 0, 0), | |
| 'G': (0, 0, 1, 0), | |
| 'T': (0, 0, 0, 1) | |
| } | |
| def get_seqcode(seq): | |
| return np.array(reduce(add, map(lambda c: ntmap[c], seq.upper()))).reshape((1, len(seq), -1)) | |
| def Seq_DeepCpf1_model(input_shape): | |
| Seq_deepCpf1_Input_SEQ = Input(shape=input_shape) | |
| Seq_deepCpf1_C1 = Convolution1D(80, 5, activation='relu')(Seq_deepCpf1_Input_SEQ) | |
| Seq_deepCpf1_P1 = AveragePooling1D(2)(Seq_deepCpf1_C1) | |
| Seq_deepCpf1_F = Flatten()(Seq_deepCpf1_P1) | |
| Seq_deepCpf1_DO1 = Dropout(0.3)(Seq_deepCpf1_F) | |
| Seq_deepCpf1_D1 = Dense(80, activation='relu')(Seq_deepCpf1_DO1) | |
| Seq_deepCpf1_DO2 = Dropout(0.3)(Seq_deepCpf1_D1) | |
| Seq_deepCpf1_D2 = Dense(40, activation='relu')(Seq_deepCpf1_DO2) | |
| Seq_deepCpf1_DO3 = Dropout(0.3)(Seq_deepCpf1_D2) | |
| Seq_deepCpf1_D3 = Dense(40, activation='relu')(Seq_deepCpf1_DO3) | |
| Seq_deepCpf1_DO4 = Dropout(0.3)(Seq_deepCpf1_D3) | |
| Seq_deepCpf1_Output = Dense(1, activation='linear')(Seq_deepCpf1_DO4) | |
| Seq_deepCpf1 = Model(inputs=[Seq_deepCpf1_Input_SEQ], outputs=[Seq_deepCpf1_Output]) | |
| return Seq_deepCpf1 | |
| # seq-ca model (DeepCpf1) | |
| def DeepCpf1_model(input_shape): | |
| DeepCpf1_Input_SEQ = Input(shape=input_shape) | |
| DeepCpf1_C1 = Convolution1D(80, 5, activation='relu')(DeepCpf1_Input_SEQ) | |
| DeepCpf1_P1 = AveragePooling1D(2)(DeepCpf1_C1) | |
| DeepCpf1_F = Flatten()(DeepCpf1_P1) | |
| DeepCpf1_DO1 = Dropout(0.3)(DeepCpf1_F) | |
| DeepCpf1_D1 = Dense(80, activation='relu')(DeepCpf1_DO1) | |
| DeepCpf1_DO2 = Dropout(0.3)(DeepCpf1_D1) | |
| DeepCpf1_D2 = Dense(40, activation='relu')(DeepCpf1_DO2) | |
| DeepCpf1_DO3 = Dropout(0.3)(DeepCpf1_D2) | |
| DeepCpf1_D3_SEQ = Dense(40, activation='relu')(DeepCpf1_DO3) | |
| DeepCpf1_Input_CA = Input(shape=(1,)) | |
| DeepCpf1_D3_CA = Dense(40, activation='relu')(DeepCpf1_Input_CA) | |
| DeepCpf1_M = Multiply()([DeepCpf1_D3_SEQ, DeepCpf1_D3_CA]) | |
| DeepCpf1_DO4 = Dropout(0.3)(DeepCpf1_M) | |
| DeepCpf1_Output = Dense(1, activation='linear')(DeepCpf1_DO4) | |
| DeepCpf1 = Model(inputs=[DeepCpf1_Input_SEQ, DeepCpf1_Input_CA], outputs=[DeepCpf1_Output]) | |
| return DeepCpf1 | |
| def fetch_ensembl_transcripts(gene_symbol): | |
| url = f"https://rest.ensembl.org/lookup/symbol/homo_sapiens/{gene_symbol}?expand=1;content-type=application/json" | |
| response = requests.get(url) | |
| if response.status_code == 200: | |
| gene_data = response.json() | |
| if 'Transcript' in gene_data: | |
| return gene_data['Transcript'] | |
| else: | |
| print("No transcripts found for gene:", gene_symbol) | |
| return None | |
| else: | |
| print(f"Error fetching gene data from Ensembl: {response.text}") | |
| return None | |
| def fetch_ensembl_sequence(transcript_id): | |
| url = f"https://rest.ensembl.org/sequence/id/{transcript_id}?content-type=application/json" | |
| response = requests.get(url) | |
| if response.status_code == 200: | |
| sequence_data = response.json() | |
| if 'seq' in sequence_data: | |
| return sequence_data['seq'] | |
| else: | |
| print("No sequence found for transcript:", transcript_id) | |
| return None | |
| else: | |
| print(f"Error fetching sequence data from Ensembl: {response.text}") | |
| return None | |
| def find_crispr_targets(sequence, chr, start, strand, transcript_id, exon_id, pam="TTTN", target_length=34): | |
| targets = [] | |
| len_sequence = len(sequence) | |
| complement = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'} | |
| dnatorna = {'A': 'A', 'T': 'U', 'C': 'C', 'G': 'G'} | |
| if strand == -1: | |
| sequence = ''.join([complement[base] for base in sequence]) | |
| for i in range(len_sequence - target_length + 1): | |
| target_seq = sequence[i:i + target_length] | |
| if target_seq[4:7] == 'TTT': | |
| tar_start = start + i | |
| tar_end = start + i + target_length | |
| gRNA = ''.join([dnatorna[base] for base in target_seq[8:28]]) | |
| targets.append([target_seq, gRNA, chr, str(tar_start), str(tar_end), str(strand), transcript_id, exon_id]) | |
| return targets | |
| def format_prediction_output(targets, model_path): | |
| # Loading weights for the model | |
| Seq_deepCpf1 = Seq_DeepCpf1_model(input_shape=(34, 4)) | |
| Seq_deepCpf1.load_weights(model_path) | |
| formatted_data = [] | |
| for target in targets: | |
| # Predict | |
| encoded_seq = get_seqcode(target[0]) | |
| prediction = float(list(Seq_deepCpf1.predict(encoded_seq)[0])[0]) | |
| if prediction > 100: | |
| prediction = 100 | |
| # Format output | |
| gRNA = target[1] | |
| chr = target[2] | |
| start = target[3] | |
| end = target[4] | |
| strand = target[5] | |
| transcript_id = target[6] | |
| exon_id = target[7] | |
| formatted_data.append([chr, start, end, strand, transcript_id, exon_id, target[0], gRNA, prediction]) | |
| return formatted_data | |
| def process_gene(gene_symbol, model_path): | |
| transcripts = fetch_ensembl_transcripts(gene_symbol) | |
| results = [] | |
| if transcripts: | |
| for i in range(len(transcripts)): | |
| Exons = transcripts[i]['Exon'] | |
| transcript_id = transcripts[i]['id'] | |
| for j in range(len(Exons)): | |
| exon_id = Exons[j]['id'] | |
| gene_sequence = fetch_ensembl_sequence(exon_id) | |
| if gene_sequence: | |
| start = Exons[j]['start'] | |
| strand = Exons[j]['strand'] | |
| chr = Exons[j]['seq_region_name'] | |
| targets = find_crispr_targets(gene_sequence, chr, start, strand, transcript_id, exon_id) | |
| if targets: | |
| formatted_data = format_prediction_output(targets, model_path) | |
| results.append(formatted_data) | |
| # for data in formatted_data: | |
| # print(f"Chr: {data[0]}, Start: {data[1]}, End: {data[2]}, Strand: {data[3]}, target: {data[4]}, gRNA: {data[5]}, pred_Score: {data[6]}") | |
| else: | |
| print("Failed to retrieve gene sequence.") | |
| else: | |
| print("Failed to retrieve transcripts.") | |
| return results, gene_sequence, Exons | |
| # def create_genbank_features(formatted_data): | |
| # features = [] | |
| # for data in formatted_data: | |
| # try: | |
| # # Attempt to convert start and end positions to integers | |
| # start = int(data[1]) | |
| # end = int(data[2]) | |
| # except ValueError as e: | |
| # # Log the error and skip this iteration if conversion fails | |
| # print(f"Error converting start/end to int: {data[1]}, {data[2]} - {e}") | |
| # continue # Skip this iteration | |
| # | |
| # # Proceed as normal if conversion is successful | |
| # strand = 1 if data[3] == '+' else -1 | |
| # location = FeatureLocation(start=start, end=end, strand=strand) | |
| # feature = SeqFeature(location=location, type="misc_feature", qualifiers={ | |
| # 'label': data[5], # gRNA as label | |
| # 'note': f"Prediction: {data[6]}" # Prediction score in note | |
| # }) | |
| # features.append(feature) | |
| # return features | |
| # | |
| # def generate_genbank_file_from_data(formatted_data, gene_sequence, gene_symbol, output_path): | |
| # features = create_genbank_features(formatted_data) | |
| # record = SeqRecord(Seq(gene_sequence), id=gene_symbol, name=gene_symbol, | |
| # description='CRISPR Cas12 predicted targets', features=features) | |
| # record.annotations["molecule_type"] = "DNA" | |
| # SeqIO.write(record, output_path, "genbank") | |
| # | |
| # def create_csv_from_df(df, output_path): | |
| # df.to_csv(output_path, index=False) | |
| # | |
| # def generate_bed_file_from_data(formatted_data, output_path): | |
| # with open(output_path, 'w') as bed_file: | |
| # for data in formatted_data: | |
| # try: | |
| # # Ensure data has the expected number of elements | |
| # if len(data) < 7: | |
| # raise ValueError("Incomplete data item") | |
| # | |
| # chrom = data[0] | |
| # start = data[1] | |
| # end = data[2] | |
| # strand = '+' if data[3] == '+' else '-' | |
| # gRNA = data[5] | |
| # score = data[6] # Ensure this index exists | |
| # | |
| # bed_file.write(f"{chrom}\t{start}\t{end}\t{gRNA}\t{score}\t{strand}\n") | |
| # except ValueError as e: | |
| # print(f"Skipping an item due to error: {e}") | |
| # continue |