File size: 5,214 Bytes
2be7364
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python
import pandas as pd
from loguru import logger
import pprint
import os

pd.set_option('display.max_columns', None)
"""
to get script working:
- fill required vars in .env
- run in terminal
export $(grep -v '^#' .env | xargs)

"""
standard_vcf_cols = ['#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT']


def record_result(vcf_out_path, vcf_df, required_vcf_cols, samples):
    new_info_header = [
        '##fileformat=VCFv4.2',
        '##FILTER=<ID=PASS,Description="All filters passed">',
        '##filedate=20230718',
        '##source="beagle.22Jul22.46e.jar"',
        '##INFO=<ID=AF,Number=A,Type=Float,Description="Estimated ALT Allele Frequencies">',
        '##INFO=<ID=DR2,Number=A,Type=Float,Description="Dosage R-Squared: estimated squared correlation between estimated REF dose [P(RA) + 2*P(RR)] and true REF dose">',
        '##INFO=<ID=IMP,Number=0,Type=Flag,Description="Imputed marker">',
        '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">',
        '##FORMAT=<ID=DS,Number=A,Type=Float,Description="estimated ALT dose [P(RA) + 2*P(AA)]">',
        '##contig=<ID=1,length=249250621>',
        '##INFO=<ID=AC,Number=A,Type=Integer,Description="Allele count in genotypes">',
        '##INFO=<ID=AN,Number=1,Type=Integer,Description="Total number of alleles in called genotypes">'
    ]
    with open(vcf_out_path, 'w') as vcf_file:
        for s in new_info_header:
            vcf_file.write(s + '\n')
        cols_header = '\t'.join(required_vcf_cols)
        vcf_file.write(f'{cols_header}\n')
        for _, row in vcf_df.iterrows():
            chrom = row['#CHROM']
            pos = row['POS']
            id = row['ID']
            ref = row['REF']
            alt = row['ALT']
            qual = row['QUAL']
            filter = row['FILTER']
            info = row['INFO']
            format = row['FORMAT']
            genotypes = list()
            for sample in samples:
                sample_genotype = row[f'{sample}']
                genotypes.append(sample_genotype)
            genotypes_str = '\t'.join(genotypes)
            vcf_record = f'{chrom}\t{pos}\t{id}\t{ref}\t{alt}\t{qual}\t{filter}\t{info}\t{format}\t{genotypes_str}\n'
            vcf_file.write(vcf_record)
        print('vcf recorded')


def convert_columns(snp_df, vcf_df, required_vcf_cols, samples):
    global standard_vcf_cols
    snp_df.rename(columns={'CHR': '#CHROM', 'bp': 'POS', 'allele1': 'REF', 'allele2': 'ALT'}, inplace=True)
    vcf_df.rename(columns={'#[1]CHROM': '#CHROM', '[2]POS': 'POS', '[3]ID': 'ID', '[4]REF': 'REF', '[5]ALT': 'ALT',
                           '[6](null)': 'INFO'}, inplace=True)
    for idx, sample in enumerate(samples):
        vcf_df.rename(columns={f'[{idx+7}]{sample}:GT': f'{sample}'}, inplace=True)
        snp_df[f'{sample}'] = None
    logger.info(f'snp_df.cols {snp_df.columns}')
    snp_df['#CHROM'] = snp_df['#CHROM'].astype(str)
    vcf_df['#CHROM'] = vcf_df['#CHROM'].astype(str)
    snp_df['POS'] = snp_df['POS'].astype(str)
    vcf_df['POS'] = vcf_df['POS'].astype(str)
    snp_df['REF'] = snp_df['REF'].astype(str)
    vcf_df['REF'] = vcf_df['REF'].astype(str)
    snp_df['ALT'] = snp_df['ALT'].astype(str)
    vcf_df['ALT'] = vcf_df['ALT'].astype(str)
    vcf_df['INFO'] = vcf_df['INFO'].astype(str)
    snp_df['ID'] = None
    snp_df['QUAL'] = '.'
    snp_df['FILTER'] = 'PASS'
    snp_df['INFO'] = '.'
    snp_df['FORMAT'] = 'GT'

    excess_cols = [col for col in snp_df.columns if col not in required_vcf_cols]
    snp_df.drop(excess_cols, axis=1, inplace=True)
    return snp_df, vcf_df


def main():
    input_snp = os.getenv('input_snp_path') # Annotations/UKBiobank/Input_SNPs_refseq.csv
    out_file = os.getenv('fitted_vcf')
    """
    target_vcf_snps - .csv file with necessary info from VCF, can be obtain by
    bcftools query -H -f '%CHROM\t%POS\t%ID\t%REF\t%ALT\t%INFO\t[%GT\t]\n' example.vcf.gz > example_snps.csv
    """
    target_vcf_snps = os.getenv('target_vcf_snps')
    samples = os.getenv('samples_to_extract').split(',')
    logger.info(f'samples: {samples}')
    vcf_df = pd.read_csv(target_vcf_snps, delimiter='\t', dtype={'#[1]CHROM': str})
    snp_df = pd.read_csv(input_snp, delimiter=',')
    required_vcf_cols = standard_vcf_cols + samples
    logger.info(f'required cols {required_vcf_cols}, snp_df.cols {snp_df.columns}')
    snp_df, vcf_df = convert_columns(snp_df, vcf_df, required_vcf_cols, samples)
    logger.info(
        f'after convert snp_df shape {snp_df.shape}, {snp_df.columns}\n, fit_vcf shape {vcf_df.shape}, {vcf_df.columns}')
    merged = snp_df.merge(vcf_df, on=['#CHROM', 'POS', 'REF', 'ALT'], how='left', suffixes=('_snp', '_vcf'))
    merged.rename(columns={'ID_vcf': 'ID', 'INFO_snp': 'INFO'}, inplace=True)
    for sample in samples:
        merged.rename(columns={f'{sample}_vcf': f'{sample}'}, inplace=True)
        merged[f'{sample}'].fillna('./.', inplace=True)
    merged = merged[required_vcf_cols]
    merged['ID'].fillna('.', inplace=True)
    pprint.pprint(merged)
    logger.info(f'final_df header {merged.columns} \n final_df shape: {merged.shape}')
    record_result(out_file, merged, required_vcf_cols, samples)


main()