Spaces:
Running
Running
| import os | |
| from tqdm.auto import tqdm | |
| import RNAutils | |
| import numpy as np | |
def human_format(num):
    """Format a number compactly with a K/M/B/T magnitude suffix.

    The value is first rounded to three significant digits, then divided by
    1000 until it is below 1000 or the largest suffix ("T") is reached.
    Trailing zeros and a dangling decimal point are removed from the result.

    Args:
        num: The number to format (anything accepted by ``"{:.3g}".format``).

    Returns:
        A string such as ``"999"``, ``"1.5K"``, ``"-2.5K"`` or ``"1.23M"``.
        Values of 1e15 and above keep the "T" suffix (e.g. ``"1000T"``).
    """
    suffixes = ["", "K", "M", "B", "T"]
    num = float("{:.3g}".format(num))
    magnitude = 0
    # Stop at the last available suffix: without this bound, values >= 1e15
    # index past the end of the suffix list (and inf never leaves the loop).
    while abs(num) >= 1000 and magnitude < len(suffixes) - 1:
        magnitude += 1
        num /= 1000.0
    return "{}{}".format(
        "{:f}".format(num).rstrip("0").rstrip("."), suffixes[magnitude]
    )
def hamming(s1, s2):
    """Return the number of positions at which s1 and s2 differ.

    Both inputs must have the same length (checked with an assertion).
    """
    assert len(s1) == len(s2)
    # Fast path: identical inputs need no per-position comparison.
    if s1 == s2:
        return 0
    mismatches = 0
    for left, right in zip(s1, s2):
        if left != right:
            mismatches += 1
    return mismatches
def revcomp(str):
    """Return the reverse complement of a DNA string.

    A/C/G/T are complemented in either case (case is preserved); any other
    character is passed through unchanged. The result is in reversed order.
    """
    pairs = {"A": "T", "C": "G", "G": "C", "T": "A"}
    # Add the lower-case counterparts of every upper-case pairing.
    pairs.update({src.lower(): dst.lower() for src, dst in list(pairs.items())})
    out = []
    for base in reversed(str):
        out.append(pairs.get(base, base))
    return "".join(out)
def get_qualities(str):
    """Convert each character to an integer score: its code point minus 33."""
    return [ord(symbol) - 33 for symbol in str]
def contains_Esp3I_site(str):
    """Return True if the string contains CGTCTC or its reverse complement GAGACG.

    The match is case-sensitive.
    """
    sites = ("CGTCTC", "GAGACG")
    return any(site in str for site in sites)
## Reads one line from file and advances the progress bar accordingly
def tqdm_readline(file, pbar):
    """Read a single line and advance pbar by the length of that line.

    Args:
        file: Object providing ``readline()``.
        pbar: Object providing ``update(n)`` (e.g. a tqdm progress bar).

    Returns:
        The line exactly as returned by ``file.readline()``.
    """
    text = file.readline()
    consumed = len(text)
    pbar.update(consumed)
    return text
## Reads both FASTQ files in lockstep, and applies callback on each read pair
## Returns number of reads
def process_paired_fastq_file(filename1, filename2, callback):
    """Iterate two FASTQ files in parallel and call callback per record pair.

    Each record is read as four consecutive lines (header, sequence, header,
    quality). Progress is reported against the size of ``filename1`` only;
    ``filename2`` is read in lockstep without progress tracking. Reading stops
    at the first empty (after stripping) header line of ``filename1``.

    Args:
        filename1: Path of the first FASTQ file.
        filename2: Path of the second FASTQ file.
        callback: Called as ``callback(read_1, read_2, read_1_q, read_2_q)``
            with the stripped sequence and quality lines of each record.

    Returns:
        The number of records processed.
    """
    file_size = os.path.getsize(filename1)
    with tqdm(total=file_size) as pbar:
        # Context managers guarantee both handles are closed, including when
        # the callback raises part-way through the files.
        with open(filename1, "r") as file1, open(filename2, "r") as file2:
            total_reads = 0
            while True:
                header = tqdm_readline(file1, pbar).strip()  # header
                if header == "":
                    break  # end of file
                read_1 = tqdm_readline(file1, pbar).strip()
                tqdm_readline(file1, pbar)  # separator/header line
                read_1_q = tqdm_readline(file1, pbar).strip()
                file2.readline()  # header
                read_2 = file2.readline().strip()
                file2.readline()  # separator/header line
                read_2_q = file2.readline().strip()
                callback(read_1, read_2, read_1_q, read_2_q)
                total_reads += 1
    return total_reads
# Constant sequences that add_flanking places before / after an insert.
PRE_SEQUENCE = "TCTGCCTATGTCTTTCTCTGCCATCCAGGTT"
POST_SEQUENCE = "CAGGTCTGACTATGGGACCCTTGATGTTTT"


def add_flanking(nts, flanking_len):
    """Surround nts with flanking_len bases of constant context on each side.

    The prefix is the last ``flanking_len`` characters of PRE_SEQUENCE and the
    suffix is the first ``flanking_len`` characters of POST_SEQUENCE. A length
    larger than a constant simply yields that whole constant.

    Args:
        nts: The sequence to wrap.
        flanking_len: Number of flanking characters per side; 0 adds nothing.
            Negative values are not validated.

    Returns:
        The concatenated string prefix + nts + suffix.
    """
    # PRE_SEQUENCE[-0:] is the whole string, so zero must be special-cased to
    # produce an empty prefix.
    prefix = PRE_SEQUENCE[-flanking_len:] if flanking_len else ""
    return prefix + nts + POST_SEQUENCE[:flanking_len]
# Constant sequences that add_barcode_flanking places before / after a barcode.
BARCODE_PRE_SEQUENCE = "CACAAGTATCACTAAGCTCGCTCTAGA"
BARCODE_POST_SEQUENCE = "ATAGGGCCCGTTTAAACCCGCTGAT"


def add_barcode_flanking(nts, flanking_len):
    """Surround nts with flanking_len bases of barcode context on each side.

    The prefix is the last ``flanking_len`` characters of BARCODE_PRE_SEQUENCE
    and the suffix is the first ``flanking_len`` characters of
    BARCODE_POST_SEQUENCE. A length larger than a constant simply yields that
    whole constant.

    Args:
        nts: The sequence to wrap.
        flanking_len: Number of flanking characters per side; 0 adds nothing.
            Negative values are not validated.

    Returns:
        The concatenated string prefix + nts + suffix.
    """
    # BARCODE_PRE_SEQUENCE[-0:] is the whole string, so zero must be
    # special-cased to produce an empty prefix.
    prefix = BARCODE_PRE_SEQUENCE[-flanking_len:] if flanking_len else ""
    return prefix + nts + BARCODE_POST_SEQUENCE[:flanking_len]
def rna_fold_structs(
    seq_nts,
    maxBPspan=0,
    RNAfold_bin="RNAfold",
):
    """Run RNAutils.RNAfold on the sequences and split its results.

    Args:
        seq_nts: Sequences forwarded unchanged to ``RNAutils.RNAfold``.
        maxBPspan: Forwarded to ``RNAutils.RNAfold``; per the original note,
            0 means the span limit is not passed on.
        RNAfold_bin: Forwarded to ``RNAutils.RNAfold`` (presumably the name or
            path of the RNAfold executable — confirm against RNAutils).

    Returns:
        A tuple ``(structs, mfes)``: a list holding element 0 of every result
        entry and a NumPy array holding element 1 of every result entry.
    """
    fold_results = RNAutils.RNAfold(
        seq_nts, maxBPspan=maxBPspan, RNAfold_bin=RNAfold_bin
    )
    structs = [entry[0] for entry in fold_results]
    energies = [entry[1] for entry in fold_results]
    return structs, np.array(energies)
def compute_structure(
    seq_nts,
    RNAfold_bin="RNAfold",
):
    """Fold the sequences and one-hot encode the resulting structures.

    Returns:
        A tuple ``(struct_oh, structs, mfes)`` where ``structs`` and ``mfes``
        come from ``rna_fold_structs`` and ``struct_oh`` is a NumPy array built
        from ``folding_to_vector`` applied to every structure.
    """
    structs, mfes = rna_fold_structs(seq_nts, RNAfold_bin=RNAfold_bin)
    # One encoded matrix per predicted structure.
    encoded = [folding_to_vector(structure) for structure in structs]
    return np.array(encoded), structs, mfes
def compute_seq_oh(seq_nts):
    """One-hot encode every sequence after replacing "U" with "T".

    Returns:
        A NumPy array built from ``nts_to_vector`` applied to each sequence.
    """
    encoded = []
    for seq in seq_nts:
        as_dna = seq.replace("U", "T")
        encoded.append(nts_to_vector(as_dna))
    return np.array(encoded)
def compute_wobbles(seq_nts, structs):
    """Compute a wobble-pair indicator for each (sequence, structure) pair.

    Each sequence has "U" replaced by "T" before being passed to
    ``compute_wobble_indicator``; each indicator gets a trailing axis of
    size 1 and the results are stacked into one NumPy array.
    """
    indicators = []
    for seq, structure in zip(seq_nts, structs):
        flags = compute_wobble_indicator(seq.replace("U", "T"), structure)
        indicators.append(np.expand_dims(flags, axis=-1))
    return np.array(indicators)
def create_input_data(
    seq_nts, RNAfold_bin="RNAfold"
):
    """Build the three feature arrays derived from a list of sequences.

    Returns:
        A tuple ``(seq_oh, struct_oh, wobbles)``: the sequence one-hot
        encodings, the structure one-hot encodings, and the wobble-pair
        indicators. The folding energies are computed but not returned.
    """
    # Sequence encoding comes first, so it runs before any folding is done.
    seq_oh = compute_seq_oh(seq_nts)
    struct_oh, structs, _unused_mfes = compute_structure(
        seq_nts, RNAfold_bin=RNAfold_bin
    )
    wobbles = compute_wobbles(seq_nts, structs)
    return seq_oh, struct_oh, wobbles
def ei_vec(i, len):
    """Return a list of ``len`` zeros with a single 1 at index ``i``."""
    basis = [0] * len
    basis[i] = 1
    return basis
def str_to_vector(str, template):
    """One-hot encode each character of a string against a template alphabet.

    Args:
        str: The string to encode; every character must occur in template
            (a KeyError is raised otherwise).
        template: The alphabet; a character's position is its one-hot column.

    Returns:
        A NumPy array with one row per input character and one column per
        template character.
    """
    column_of = {symbol: pos for pos, symbol in enumerate(template)}
    columns = [column_of[char] for char in str]
    identity = np.eye(len(template))
    # Selecting identity rows by column index yields the one-hot rows.
    return identity[columns]
def nts_to_vector(nts, rna=False):
    """One-hot encode a nucleotide string.

    The alphabet is "ACGU" when ``rna`` is truthy and "ACGT" otherwise.
    """
    alphabet = "ACGU" if rna else "ACGT"
    return str_to_vector(nts, alphabet)
def folding_to_vector(nts):
    """One-hot encode a structure string over the three symbols ".", "(", ")"."""
    # Only the plain dot-bracket symbols are supported here; the wider
    # alphabet ".,|{}()" is not used.
    structure_alphabet = ".()"
    return str_to_vector(nts, structure_alphabet)
def find_parentheses(s):
    """Map each opening parenthesis index in s to its matching closing index.

    Characters other than "(" and ")" are ignored.

    Returns:
        A dict of ``open_index: close_index`` pairs.

    Raises:
        IndexError: If a ")" has no matching "(", or a "(" is never closed.
    """
    # Indexes of still-unmatched "(" characters, most recent last.
    open_positions = []
    pairs = {}
    for pos, char in enumerate(s):
        if char == "(":
            open_positions.append(pos)
            continue
        if char != ")":
            continue
        if not open_positions:
            raise IndexError("Too many close parentheses at index {}".format(pos))
        pairs[open_positions.pop()] = pos
    if open_positions:
        raise IndexError(
            "No matching close parenthesis to open parenthesis "
            "at index {}".format(open_positions.pop())
        )
    return pairs
# Example:
# compute_bijection("(((....)))....(...)")
# array([ 9, 8, 7, 3, 4, 5, 6, 2, 1, 0, 10, 11, 12, 13, 18, 15, 16,
# 17, 14])
def compute_bijection(s):
    """Return an index array mapping every position to its paired position.

    Positions holding matched parentheses map to each other; every other
    position maps to itself.
    """
    pairs = find_parentheses(s)
    partner = np.arange(len(s))
    for open_idx, close_idx in pairs.items():
        partner[open_idx] = close_idx
        partner[close_idx] = open_idx
    return partner
def compute_wobble_indicator(sequence, structure):
    """Flag every position whose base and paired base form a G/T pair.

    The sequence must contain only A, C, G and T (G-U wobble pairs therefore
    appear as G-T) and must be as long as the structure; both conditions are
    checked with assertions.

    Returns:
        A list with 1 at positions paired as {G, T} and 0 everywhere else.
    """
    assert len(sequence) == len(structure)
    assert set(sequence) <= {
        "A", "C", "G", "T"
    }, "Unknown character found in sequence"
    partner = compute_bijection(structure)
    wobble_pair = {"G", "T"}
    flags = []
    for idx, base in enumerate(sequence):
        # Unpaired positions map to themselves, giving a one-element set that
        # never equals the wobble pair.
        is_wobble = {base, sequence[partner[idx]]} == wobble_pair
        flags.append(1 if is_wobble else 0)
    return flags