File size: 3,905 Bytes
ab6c03c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import argparse
import random
import numpy as np
     

def cut_no_overlap(length, kmer=1, max_prob=0.5):
    """Split ``length`` into consecutive, non-overlapping piece sizes.

    Pieces are produced until what is left is no larger than ``509 + kmer``;
    that remainder is emitted as the final piece.

    Args:
        length: Total number of characters to divide up.
        kmer: Added to 509 to obtain the largest allowed piece size.
        max_prob: Threshold compared against a uniform draw in [0, 1). A draw
            above it yields a random piece size (never below 5); otherwise
            the piece gets the largest allowed size.

    Returns:
        The piece sizes in order. An empty list when ``length`` is 0.
    """
    longest = 509 + kmer
    pieces = []
    remaining = length
    while remaining:
        # Whatever is left fits in a single piece: emit it and finish.
        if remaining <= longest:
            pieces.append(remaining)
            break

        if random.random() > max_prob:
            # Random size, floored at 5 so pieces never get degenerate.
            piece = max(int(random.random() * longest), 5)
        else:
            piece = longest
        pieces.append(piece)
        remaining -= piece

    return pieces


def sampling(length, kmer=1, sampling_rate=1):
    """Draw random variable-size windows over a sequence of ``length`` items.

    The number of windows is ``int(length * sampling_rate / 256)``.

    Args:
        length: Length of the sequence being sampled.
        kmer: Added to 509 to bound the window size, and subtracted from
            ``length`` to bound the window start.
        sampling_rate: Multiplier applied to the number of windows drawn.

    Returns:
        A ``(starts, ends)`` pair of equally long lists. Ends are simply
        ``start + size`` and are not clamped to ``length``.
    """
    n_windows = int(length * sampling_rate / 256)
    size_bound = 509 + kmer

    starts, ends = [], []
    for _ in range(n_windows):
        # Size is drawn before the start, each from its own generator.
        size = max(int(random.random() * size_bound), 5)
        begin = np.random.randint(length - kmer)
        starts.append(begin)
        ends.append(begin + size)

    return starts, ends


def sampling_fix(length, kmer=1, sampling_rate=1, fix_length=10245):
    """Draw random fixed-size windows over a sequence of ``length`` items.

    The number of windows is ``int(length * sampling_rate / fix_length)``.
    Every window spans exactly ``fix_length`` items and starts at a uniformly
    drawn offset in ``[0, length - 6 - fix_length)``.

    Args:
        length: Length of the sequence being sampled.
        kmer: Unused by this function; kept so the signature stays unchanged.
        sampling_rate: Multiplier applied to the number of windows drawn.
        fix_length: Size of every window.

    Returns:
        A ``(starts, ends)`` pair of equally long lists. Both are empty when
        no window is requested or when the sequence is too short to host a
        full window.
    """
    times = int(length * sampling_rate / fix_length)
    # Exclusive upper bound for window starts.
    start_bound = length - 6 - fix_length
    if times <= 0 or start_bound <= 0:
        # np.random.randint raises ValueError for a non-positive bound, which
        # used to crash on sequences barely longer than fix_length (or on
        # short sequences with sampling_rate > 1). Skip them instead.
        return [], []

    starts = [np.random.randint(start_bound) for _ in range(times)]
    ends = [start + fix_length for start in starts]
    return starts, ends


def get_kmer_sentence(original_string, kmer=1, stride=1):
    """Turn a sequence string into a space-separated sentence of k-mers.

    Newlines are removed first, then a window of ``kmer`` characters is moved
    across the string in steps of ``stride``. Every full window is emitted,
    including the last one that ends exactly at the end of the string.

    Args:
        original_string: The raw sequence text.
        kmer: Window size. The special value -1 returns ``original_string``
            untouched.
        stride: Step between consecutive window starts; must be at least 1.

    Returns:
        The k-mers joined by single spaces, with any leading or trailing
        double-quote characters stripped. An empty string when the input is
        shorter than ``kmer``.

    Raises:
        ValueError: If ``stride`` is smaller than 1 (the window would never
            advance).
    """
    if kmer == -1:
        return original_string
    if stride < 1:
        raise ValueError("stride must be a positive integer")

    bases = original_string.replace("\n", "")
    # Last valid start is len - kmer (inclusive); stopping one short of it
    # would silently drop the final k-mer.
    last_start = len(bases) - kmer
    kmers = [bases[pos:pos + kmer] for pos in range(0, last_start + 1, stride)]

    return " ".join(kmers).strip("\"")



def get_kmer_sequence(original_string, kmer=1):
    """Turn a sequence string into a list of overlapping k-mers.

    Newlines are removed, then a window of ``kmer`` characters slides over
    the string one position at a time. The trailing ``kmer`` characters are
    always appended as the final element.

    Args:
        original_string: The raw sequence text.
        kmer: Window size. The special value -1 returns ``original_string``
            untouched.

    Returns:
        The list of k-mers, or the input itself when ``kmer`` is -1.
    """
    if kmer == -1:
        return original_string

    cleaned = original_string.replace("\n", "")
    kmers = [cleaned[pos:pos + kmer] for pos in range(len(cleaned) - kmer)]
    # The sliding range stops one window short; add the tail explicitly.
    kmers.append(cleaned[-kmer:])
    return kmers

def Process(args):
    """Convert every line of ``args.file_path`` into k-mer sentences on disk.

    When ``args.sampling_rate`` differs from 1.0, fixed-size windows are
    sampled from each line via ``sampling_fix`` and the output file name gets
    a ``_sam<kmer>`` suffix. Otherwise each line is split into consecutive
    pieces via ``cut_no_overlap`` and the suffix is ``_cut<kmer>``. Each
    window or piece is written as one line produced by ``get_kmer_sentence``.

    Args:
        args: Namespace with ``file_path``, ``output_path``, ``sampling_rate``,
            ``kmer`` and ``length`` attributes. ``output_path`` is set to
            ``file_path`` in place when it is ``None``.

    Returns:
        None. The result is written to ``output_path`` plus the suffix.
    """
    if args.output_path is None:
        args.output_path = args.file_path

    sampled = args.sampling_rate != 1.0
    suffix = "_sam" if sampled else "_cut"
    new_file_path = args.output_path + suffix + str(args.kmer)

    # Context managers guarantee both files are flushed and closed, even if
    # processing fails part-way through.
    with open(args.file_path, "r") as old_file, open(new_file_path, "w") as new_file:
        for line in old_file:
            # Length is measured on the raw line, trailing newline included.
            line_length = len(line)
            if sampled:
                starts, ends = sampling_fix(
                    length=line_length,
                    kmer=args.kmer,
                    sampling_rate=args.sampling_rate,
                    fix_length=args.length,
                )
                for start, end in zip(starts, ends):
                    sentence = get_kmer_sentence(line[start:end], kmer=args.kmer)
                    new_file.write(sentence + "\n")
            else:
                start = 0
                for cut in cut_no_overlap(length=line_length, kmer=args.kmer):
                    sentence = get_kmer_sentence(line[start:start + cut], kmer=args.kmer)
                    new_file.write(sentence + "\n")
                    start += cut


def main():
    """Parse the command-line options and hand them to ``Process``."""
    # (flag, default, type, help) for every supported option, in help order.
    option_specs = (
        (
            "--sampling_rate",
            1.0,
            float,
            "We will sample sampling_rate*total_length*2/512 times",
        ),
        ("--kmer", 1, int, "K-mer"),
        ("--length", 10000, int, "Length of the sampled sequence"),
        ("--file_path", None, str, "The path of the file to be processed"),
        ("--output_path", None, str, "The path of the processed data"),
    )

    cli = argparse.ArgumentParser()
    for flag, fallback, caster, description in option_specs:
        cli.add_argument(flag, default=fallback, type=caster, help=description)

    Process(cli.parse_args())

    


# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()