| import argparse |
| import random |
| import numpy as np |
| |
|
|
| def cut_no_overlap(length, kmer=1, max_prob=0.5): |
| cuts = [] |
| while length: |
| if length <= 509+kmer: |
| cuts.append(length) |
| break |
| else: |
| if random.random() > max_prob: |
| cut = max(int(random.random()*(509+kmer)), 5) |
| else: |
| cut = 509+kmer |
| cuts.append(cut) |
| length -= cut |
|
|
| return cuts |
|
|
|
|
| def sampling(length, kmer=1, sampling_rate=1): |
| times = int(length*sampling_rate/256) |
| starts = [] |
| ends = [] |
| for i in range(times): |
| cut = max(int(random.random()*(509+kmer)), 5) |
| start = np.random.randint(length-kmer) |
| starts.append(start) |
| ends.append(start+cut) |
| |
| return starts, ends |
|
|
|
|
| def sampling_fix(length, kmer=1, sampling_rate=1, fix_length=10245): |
| times = int(length*sampling_rate/fix_length) |
| starts = [] |
| ends = [] |
| for i in range(times): |
| cut = fix_length |
| start = np.random.randint(length-6-fix_length) |
| starts.append(start) |
| ends.append(start+cut) |
| |
| return starts, ends |
|
|
|
|
| def get_kmer_sentence(original_string, kmer=1, stride=1): |
| if kmer == -1: |
| return original_string |
|
|
| sentence = "" |
| original_string = original_string.replace("\n", "") |
| i = 0 |
| while i < len(original_string)-kmer: |
| sentence += original_string[i:i+kmer] + " " |
| i += stride |
| |
| return sentence[:-1].strip("\"") |
|
|
|
|
|
|
| def get_kmer_sequence(original_string, kmer=1): |
| if kmer == -1: |
| return original_string |
|
|
| sequence = [] |
| original_string = original_string.replace("\n", "") |
| for i in range(len(original_string)-kmer): |
| sequence.append(original_string[i:i+kmer]) |
| |
| sequence.append(original_string[-kmer:]) |
| return sequence |
|
|
| def Process(args): |
| old_file = open(args.file_path, "r") |
| if args.output_path == None: |
| args.output_path = args.file_path |
|
|
| if args.sampling_rate!=1.0: |
| new_file_path = args.output_path + "_sam" + str(args.kmer) |
| else: |
| new_file_path = args.output_path + "_cut" + str(args.kmer) |
| new_file = open(new_file_path, "w") |
| line = old_file.readline() |
| while line: |
| line_length = len(line) |
| if args.sampling_rate != 1.0: |
| starts, ends = sampling_fix(length=line_length, kmer=args.kmer, sampling_rate=args.sampling_rate, fix_length=args.length) |
| for i in range(len(starts)): |
| new_line = line[starts[i]:ends[i]] |
| sentence = get_kmer_sentence(new_line, kmer=args.kmer) |
| new_file.write(sentence + "\n") |
| |
| else: |
| cuts = cut_no_overlap(length=line_length, kmer=args.kmer) |
| start = 0 |
| for cut in cuts: |
| new_line = line[start:start+cut] |
| sentence = get_kmer_sentence(new_line, kmer=args.kmer) |
| start += cut |
| new_file.write(sentence + "\n") |
| |
| line = old_file.readline() |
|
|
|
|
| def main(): |
| parser = argparse.ArgumentParser() |
| parser.add_argument( |
| "--sampling_rate", |
| default=1.0, |
| type=float, |
| help="We will sample sampling_rate*total_length*2/512 times", |
| ) |
| parser.add_argument( |
| "--kmer", |
| default=1, |
| type=int, |
| help="K-mer", |
| ) |
| parser.add_argument( |
| "--length", |
| default=10000, |
| type=int, |
| help="Length of the sampled sequence", |
| ) |
| parser.add_argument( |
| "--file_path", |
| default=None, |
| type=str, |
| help="The path of the file to be processed", |
| ) |
| parser.add_argument( |
| "--output_path", |
| default=None, |
| type=str, |
| help="The path of the processed data", |
| ) |
| args = parser.parse_args() |
|
|
| Process(args) |
|
|
| |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|