import argparse
import random
import numpy as np
def cut_no_overlap(length, kmer=1, max_prob=0.5):
    """Split ``length`` into consecutive, non-overlapping chunk sizes.

    No chunk is longer than ``509 + kmer``. While more than that remains,
    each chunk is the full ``509 + kmer`` with probability ``max_prob`` and
    otherwise a random size of at least 5. Whatever is left once the
    remainder fits in one window becomes the final chunk.

    Args:
        length: Total number of characters to partition.
        kmer: K-mer size; it is added to the base window of 509.
        max_prob: Probability of emitting a full-size chunk.

    Returns:
        The chunk sizes in order. The list is empty when ``length`` is 0.
    """
    window = 509 + kmer
    pieces = []
    remaining = length
    while remaining:
        if remaining <= window:
            # The tail fits in a single window: emit it and stop.
            pieces.append(remaining)
            break
        if random.random() > max_prob:
            size = max(int(random.random() * window), 5)
        else:
            size = window
        pieces.append(size)
        remaining -= size
    return pieces
def sampling(length, kmer=1, sampling_rate=1):
    """Draw random variable-length windows over a sequence.

    The number of windows is ``int(length * sampling_rate / 256)``. Each
    window starts at a random position in ``[0, length - kmer)`` and spans
    a random size of at least 5 and below ``509 + kmer``.

    Args:
        length: Length of the sequence being sampled.
        kmer: K-mer size; widens the window and narrows the start range.
        sampling_rate: Multiplier on the number of windows drawn.

    Returns:
        A ``(starts, ends)`` pair of equally long lists. End positions are
        not clamped to ``length``.
    """
    count = int(length * sampling_rate / 256)
    starts, ends = [], []
    for _ in range(count):
        span = max(int(random.random() * (509 + kmer)), 5)
        begin = np.random.randint(length - kmer)
        starts.append(begin)
        ends.append(begin + span)
    return starts, ends
def sampling_fix(length, kmer=1, sampling_rate=1, fix_length=10245):
    """Draw random fixed-length windows over a sequence.

    The number of windows is ``int(length * sampling_rate / fix_length)``.
    Each window is exactly ``fix_length`` long and starts at a random
    position in ``[0, length - 6 - fix_length)``.

    Args:
        length: Length of the sequence being sampled.
        kmer: Unused; kept so the signature matches ``sampling``.
        sampling_rate: Multiplier on the number of windows drawn.
        fix_length: Length of every sampled window.

    Returns:
        A ``(starts, ends)`` pair of equally long lists. Both are empty
        when no window is requested or when the sequence is too short to
        hold a window plus the 6-character margin.
    """
    times = int(length * sampling_rate / fix_length)
    # Exclusive upper bound for a start position. The 6-character margin is
    # inherited from the original code; its rationale is not visible here.
    max_start = length - 6 - fix_length
    # np.random.randint raises ValueError for a non-positive bound, so a
    # sequence that cannot hold a full window yields no samples instead.
    if times <= 0 or max_start <= 0:
        return [], []
    starts = [np.random.randint(max_start) for _ in range(times)]
    ends = [start + fix_length for start in starts]
    return starts, ends
def get_kmer_sentence(original_string, kmer=1, stride=1):
    """Convert a sequence into a space-separated string of k-mers.

    Newline characters are removed first. K-mers are then taken every
    ``stride`` characters, and every full-length k-mer is included, the one
    ending at the last character too. The earlier loop stopped one position
    early and dropped that final k-mer.

    Args:
        original_string: Raw sequence text; it may contain newlines.
        kmer: K-mer size. ``-1`` returns ``original_string`` unchanged.
        stride: Step between the starts of consecutive k-mers.

    Returns:
        The k-mers joined by single spaces, with double-quote characters
        stripped from both ends. Empty when the cleaned sequence is
        shorter than ``kmer``.

    Raises:
        ValueError: If ``stride`` is smaller than 1. The earlier loop
            never terminated in that case.
    """
    if kmer == -1:
        return original_string
    if stride < 1:
        raise ValueError(
            "stride must be a positive integer, got {!r}".format(stride)
        )
    sequence = original_string.replace("\n", "")
    # Last index at which a full k-mer still fits (inclusive).
    last_start = len(sequence) - kmer
    tokens = [sequence[i:i + kmer] for i in range(0, last_start + 1, stride)]
    return " ".join(tokens).strip("\"")
def get_kmer_sequence(original_string, kmer=1):
    """Convert a sequence into a list of overlapping k-mers.

    Newline characters are removed first. K-mers are taken at every
    position, and the trailing ``kmer`` characters are appended as the
    final element.

    Args:
        original_string: Raw sequence text; it may contain newlines.
        kmer: K-mer size. ``-1`` returns ``original_string`` unchanged.

    Returns:
        A list of k-mer strings. When the cleaned sequence is shorter than
        ``kmer`` the list holds a single element with whatever text there
        is.
    """
    if kmer == -1:
        return original_string
    cleaned = original_string.replace("\n", "")
    kmers = [cleaned[pos:pos + kmer] for pos in range(len(cleaned) - kmer)]
    # The loop above stops one short; the tail slice supplies the last k-mer.
    kmers.append(cleaned[-kmer:])
    return kmers
def Process(args):
    """Convert every line of the input file into k-mer sentences.

    The output is written to ``args.output_path`` (which defaults to
    ``args.file_path``) with ``"_sam<kmer>"`` appended when sampling is
    active and ``"_cut<kmer>"`` otherwise. Sampling is active whenever
    ``args.sampling_rate`` differs from 1.0: each line is then sampled
    with ``sampling_fix``. Otherwise each line is split into
    non-overlapping chunks with ``cut_no_overlap``. Every resulting piece
    becomes one output line of space-separated k-mers.

    Args:
        args: Namespace providing ``file_path``, ``output_path``,
            ``sampling_rate``, ``kmer`` and ``length``. ``output_path`` is
            filled in on the namespace when it is ``None``.
    """
    if args.output_path is None:
        args.output_path = args.file_path
    use_sampling = args.sampling_rate != 1.0
    suffix = "_sam" if use_sampling else "_cut"
    new_file_path = args.output_path + suffix + str(args.kmer)

    # Context managers guarantee both files are flushed and closed, even
    # when processing a line raises.
    with open(args.file_path, "r") as old_file, \
            open(new_file_path, "w") as new_file:
        for line in old_file:
            # NOTE: the length includes the line's trailing newline, which
            # get_kmer_sentence later strips from the piece that holds it.
            line_length = len(line)
            if use_sampling:
                starts, ends = sampling_fix(
                    length=line_length,
                    kmer=args.kmer,
                    sampling_rate=args.sampling_rate,
                    fix_length=args.length,
                )
                for start, end in zip(starts, ends):
                    sentence = get_kmer_sentence(line[start:end], kmer=args.kmer)
                    new_file.write(sentence + "\n")
            else:
                cuts = cut_no_overlap(length=line_length, kmer=args.kmer)
                start = 0
                for cut in cuts:
                    sentence = get_kmer_sentence(
                        line[start:start + cut], kmer=args.kmer
                    )
                    new_file.write(sentence + "\n")
                    start += cut
def main():
    """Parse the command-line options and run ``Process`` on them.

    ``--file_path`` is mandatory: without it there is nothing to read, and
    the run previously failed later with a ``TypeError`` from ``open``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--sampling_rate",
        default=1.0,
        type=float,
        help=(
            "Each line is sampled int(line_length*sampling_rate/length) "
            "times; the default 1.0 disables sampling and cuts each line "
            "into non-overlapping pieces instead"
        ),
    )
    parser.add_argument(
        "--kmer",
        default=1,
        type=int,
        help="K-mer",
    )
    parser.add_argument(
        "--length",
        default=10000,
        type=int,
        help="Length of the sampled sequence",
    )
    parser.add_argument(
        "--file_path",
        required=True,
        type=str,
        help="The path of the file to be processed",
    )
    parser.add_argument(
        "--output_path",
        default=None,
        type=str,
        help="The path of the processed data",
    )
    args = parser.parse_args()
    Process(args)


# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
|