Oded Regev
first commit
6b844e3
import os
from tqdm.auto import tqdm
import RNAutils
import numpy as np
def human_format(num):
    """Format a number compactly with a K/M/B/T magnitude suffix.

    The value is first rounded to three significant digits, then divided by
    1000 until its absolute value drops below 1000 or the largest available
    suffix ("T") is reached. Trailing zeros and a dangling decimal point are
    removed from the printed mantissa.

    Args:
        num: The number to format (anything accepted by ``"{:.3g}".format``).

    Returns:
        A string such as ``"999"``, ``"1.23K"``, ``"1.5M"`` or ``"-2.5K"``.
        Values at or beyond 1e15 are expressed in "T" (e.g. ``"1000T"``)
        instead of running past the end of the suffix table.
    """
    suffixes = ["", "K", "M", "B", "T"]
    num = float("{:.3g}".format(num))
    magnitude = 0
    # Stop at the last suffix so very large inputs cannot index out of range.
    while abs(num) >= 1000 and magnitude < len(suffixes) - 1:
        magnitude += 1
        num /= 1000.0
    mantissa = "{:f}".format(num).rstrip("0").rstrip(".")
    return "{}{}".format(mantissa, suffixes[magnitude])
def hamming(s1, s2):
    """Return the number of positions at which two equal-length strings differ.

    An ``assert`` enforces that both inputs have the same length.
    """
    assert len(s1) == len(s2)
    # Shortcut: identical inputs need no per-position comparison.
    if s1 == s2:
        return 0
    mismatches = 0
    for left, right in zip(s1, s2):
        if left != right:
            mismatches += 1
    return mismatches
def revcomp(str):
    """Return the reverse complement of a DNA sequence.

    A/C/G/T (upper or lower case) are swapped with their complements while
    preserving case; any other character is carried over unchanged.
    """
    pairing = dict(zip("ACGTacgt", "TGCAtgca"))
    flipped = []
    for base in reversed(str):
        # Characters without a defined complement (e.g. N) pass through.
        flipped.append(pairing.get(base, base))
    return "".join(flipped)
def get_qualities(str):
    """Convert a quality string into a list of integer scores.

    Each score is the character's code point minus 33.
    """
    offset = 33
    return [ord(symbol) - offset for symbol in str]
def contains_Esp3I_site(str):
    """Tell whether the sequence contains CGTCTC or its reverse complement GAGACG.

    The match is case-sensitive (upper-case motifs only).
    """
    for motif in ("CGTCTC", "GAGACG"):
        if motif in str:
            return True
    return False
## Reads one line from file and advances the progress bar accordingly
def tqdm_readline(file, pbar):
    """Read the next line from ``file`` and advance ``pbar`` by its length.

    The line is returned exactly as read (including any trailing newline);
    an empty string is returned at end of file, advancing the bar by 0.
    """
    text = file.readline()
    consumed = len(text)
    pbar.update(consumed)
    return text
## Reads both FASTQ files in lockstep and applies callback to each read pair
## Returns number of reads
def process_paired_fastq_file(filename1, filename2, callback):
    """Iterate over two paired FASTQ files and call ``callback`` per read pair.

    Records are consumed four lines at a time from each file (header,
    sequence, separator line, quality string). Iteration stops when the
    header line of the next record in ``filename1`` is empty after
    stripping. ``filename2`` is not checked for length: if it is shorter,
    its sequence/quality values are passed as empty strings.

    The progress bar total is the size of ``filename1`` and is advanced by
    the length of each line read from that file.

    Args:
        filename1: Path of the first FASTQ file (drives the loop and the
            progress bar).
        filename2: Path of the second (mate) FASTQ file.
        callback: Callable invoked as
            ``callback(read_1, read_2, read_1_q, read_2_q)`` with
            whitespace-stripped sequence and quality strings.

    Returns:
        The number of read pairs passed to ``callback``.
    """
    file_size = os.path.getsize(filename1)
    total_reads = 0
    with tqdm(total=file_size) as pbar:
        # Context managers ensure both files are closed, including when the
        # callback raises part-way through.
        with open(filename1, "r") as file1, open(filename2, "r") as file2:
            while True:
                header = tqdm_readline(file1, pbar).strip()  # header
                if header == "":
                    break  # end of file
                read_1 = tqdm_readline(file1, pbar).strip()
                tqdm_readline(file1, pbar)  # separator line
                read_1_q = tqdm_readline(file1, pbar).strip()
                file2.readline()  # header
                read_2 = file2.readline().strip()
                file2.readline()  # separator line
                read_2_q = file2.readline().strip()
                callback(read_1, read_2, read_1_q, read_2_q)
                total_reads += 1
    return total_reads
PRE_SEQUENCE = "TCTGCCTATGTCTTTCTCTGCCATCCAGGTT"
POST_SEQUENCE = "CAGGTCTGACTATGGGACCCTTGATGTTTT"


def add_flanking(nts, flanking_len):
    """Surround ``nts`` with ``flanking_len`` characters of fixed context.

    The prefix is the last ``flanking_len`` characters of ``PRE_SEQUENCE``
    and the suffix is the first ``flanking_len`` characters of
    ``POST_SEQUENCE``.

    Args:
        nts: The sequence to be flanked.
        flanking_len: Number of flanking characters to add on each side.
            A value of 0 returns ``nts`` unchanged.

    Returns:
        The flanked sequence string.
    """
    # A slice of [-0:] would select the whole string, so 0 needs its own case.
    upstream = PRE_SEQUENCE[-flanking_len:] if flanking_len else ""
    return upstream + nts + POST_SEQUENCE[:flanking_len]
BARCODE_PRE_SEQUENCE = "CACAAGTATCACTAAGCTCGCTCTAGA"
BARCODE_POST_SEQUENCE = "ATAGGGCCCGTTTAAACCCGCTGAT"


def add_barcode_flanking(nts, flanking_len):
    """Surround ``nts`` with ``flanking_len`` characters of barcode context.

    The prefix is the last ``flanking_len`` characters of
    ``BARCODE_PRE_SEQUENCE`` and the suffix is the first ``flanking_len``
    characters of ``BARCODE_POST_SEQUENCE``.

    Args:
        nts: The sequence to be flanked.
        flanking_len: Number of flanking characters to add on each side.
            A value of 0 returns ``nts`` unchanged.

    Returns:
        The flanked sequence string.
    """
    # A slice of [-0:] would select the whole string, so 0 needs its own case.
    upstream = BARCODE_PRE_SEQUENCE[-flanking_len:] if flanking_len else ""
    return upstream + nts + BARCODE_POST_SEQUENCE[:flanking_len]
def rna_fold_structs(
    seq_nts,
    maxBPspan=0,
    RNAfold_bin="RNAfold",
):
    """Run ``RNAutils.RNAfold`` on the sequences and split its result.

    Each entry returned by ``RNAutils.RNAfold`` is indexed as
    ``entry[0]`` (collected into ``structs``) and ``entry[1]`` (collected
    into ``mfes``); presumably these are the predicted structure and its
    minimum free energy -- confirm against ``RNAutils``.

    Args:
        seq_nts: Sequences forwarded unchanged to ``RNAutils.RNAfold``.
        maxBPspan: Forwarded to ``RNAutils.RNAfold``; per the original note,
            0 means the maxBPspan option is not passed on.
        RNAfold_bin: Forwarded to ``RNAutils.RNAfold`` (default "RNAfold").

    Returns:
        A ``(structs, mfes)`` tuple: a list of first elements and a NumPy
        array of second elements.
    """
    folded = RNAutils.RNAfold(
        seq_nts, maxBPspan=maxBPspan, RNAfold_bin=RNAfold_bin
    )
    structs = [entry[0] for entry in folded]
    energies = [entry[1] for entry in folded]
    return structs, np.array(energies)
def compute_structure(
    seq_nts,
    RNAfold_bin="RNAfold",
):
    """Fold the sequences and one-hot encode the resulting structures.

    Args:
        seq_nts: Sequences passed on to ``rna_fold_structs``.
        RNAfold_bin: Passed on to ``rna_fold_structs``.

    Returns:
        A ``(struct_oh, structs, mfes)`` tuple where ``struct_oh`` is a
        NumPy array built from ``folding_to_vector`` applied to every
        structure, and ``structs``/``mfes`` are returned as produced by
        ``rna_fold_structs``.
    """
    structs, mfes = rna_fold_structs(seq_nts, RNAfold_bin=RNAfold_bin)
    # One-hot encode each structure string.
    encoded = [folding_to_vector(struct) for struct in structs]
    return np.array(encoded), structs, mfes
def compute_seq_oh(seq_nts):
    """One-hot encode sequences after replacing every "U" with "T".

    Returns a NumPy array built from ``nts_to_vector`` applied to each
    converted sequence.
    """
    dna_seqs = [seq.replace("U", "T") for seq in seq_nts]
    encoded = [nts_to_vector(dna) for dna in dna_seqs]
    return np.array(encoded)
def compute_wobbles(seq_nts, structs):
    """Build wobble-pair indicators for paired sequences and structures.

    For each (sequence, structure) pair, "U" is replaced by "T" in the
    sequence, ``compute_wobble_indicator`` is applied, and a trailing axis
    of size one is appended to the result. All results are gathered into a
    single NumPy array.
    """
    indicators = []
    for seq, struct in zip(seq_nts, structs):
        flags = compute_wobble_indicator(seq.replace("U", "T"), struct)
        indicators.append(np.expand_dims(flags, axis=-1))
    return np.array(indicators)
def create_input_data(seq_nts, RNAfold_bin="RNAfold"):
    """Assemble sequence, structure and wobble encodings for the sequences.

    Args:
        seq_nts: The input sequences.
        RNAfold_bin: Passed on to ``compute_structure``.

    Returns:
        A ``(seq_oh, struct_oh, wobbles)`` tuple from ``compute_seq_oh``,
        ``compute_structure`` and ``compute_wobbles`` respectively. The
        third value returned by ``compute_structure`` is discarded.
    """
    # Sequence one-hot encodings.
    seq_oh = compute_seq_oh(seq_nts)
    # Structure one-hot encodings; the energies are not needed here.
    folded = compute_structure(seq_nts, RNAfold_bin=RNAfold_bin)
    struct_oh, structs = folded[0], folded[1]
    # Wobble-pair indicators derived from the folded structures.
    wobbles = compute_wobbles(seq_nts, structs)
    return seq_oh, struct_oh, wobbles
def ei_vec(i, len):  # give a one-hot encoding
    """Return a list of ``len`` zeros with a single 1 at index ``i``.

    Raises IndexError when ``i`` is outside the list.
    """
    unit = [0] * len
    unit[i] = 1
    return unit
def str_to_vector(str, template):
    """One-hot encode ``str`` over the alphabet given by ``template``.

    Each character is mapped to its position in ``template`` and the
    matching row of an identity matrix is selected, giving one row per
    character and one column per template symbol. A character missing
    from ``template`` raises KeyError.
    """
    index_of = {symbol: pos for pos, symbol in enumerate(template)}
    rows = [index_of[symbol] for symbol in str]
    identity = np.eye(len(template))
    return identity[rows]
def nts_to_vector(nts, rna=False):
    """One-hot encode a nucleotide string via ``str_to_vector``.

    The alphabet is "ACGU" when ``rna`` is truthy and "ACGT" otherwise.
    """
    alphabet = "ACGU" if rna else "ACGT"
    return str_to_vector(nts, alphabet)
def folding_to_vector(nts):
    """One-hot encode a structure string over the three symbols ".()".

    Only ".", "(" and ")" are in the alphabet; a previously used wider
    alphabet (".,|{}()") is no longer active.
    """
    structure_symbols = ".()"
    return str_to_vector(nts, structure_symbols)
def find_parentheses(s):
    """Map each opening parenthesis in ``s`` to its matching closing one.

    Returns a dictionary of ``open_index: close_index`` pairs. Characters
    other than "(" and ")" are ignored.

    Raises:
        IndexError: If a ")" has no pending "(" to match, or if any "("
            is still unmatched at the end of the string.
    """
    # Indexes of still-unmatched "(" characters, most recent last.
    pending = []
    pairs = {}
    for pos, char in enumerate(s):
        if char == "(":
            pending.append(pos)
            continue
        if char != ")":
            continue
        if not pending:
            raise IndexError("Too many close parentheses at index {}".format(pos))
        pairs[pending.pop()] = pos
    if pending:
        raise IndexError(
            "No matching close parenthesis to open parenthesis "
            "at index {}".format(pending.pop())
        )
    return pairs
def compute_bijection(s):
    """Return, for every position in ``s``, the index of its paired position.

    Positions joined by matching parentheses point at each other; every
    other position points at itself.

    Example:
        compute_bijection("(((....)))....(...)")
        array([ 9,  8,  7,  3,  4,  5,  6,  2,  1,  0, 10, 11, 12, 13, 18,
               15, 16, 17, 14])
    """
    partner = np.arange(len(s))
    for open_idx, close_idx in find_parentheses(s).items():
        partner[open_idx] = close_idx
        partner[close_idx] = open_idx
    return partner
def compute_wobble_indicator(sequence, structure):
    """Flag every position that takes part in a G-T base pair.

    ``sequence`` must use only A/C/G/T (so G-U wobble pairs appear as G-T)
    and have the same length as ``structure``; both conditions are checked
    with ``assert``. Pairing is taken from ``compute_bijection(structure)``.

    Returns:
        A list with one entry per position: 1 if the position and its
        partner are {"G", "T"}, otherwise 0. Unpaired positions are their
        own partner and therefore always get 0.
    """
    assert len(sequence) == len(structure)
    assert set(sequence) <= {
        "A",
        "C",
        "G",
        "T",
    }, "Unknown character found in sequence"
    partner = compute_bijection(structure)
    wobble_pair = {"G", "T"}
    flags = []
    for pos, base in enumerate(sequence):
        is_wobble = {base, sequence[partner[pos]]} == wobble_pair
        flags.append(1 if is_wobble else 0)
    return flags