#!/usr/bin/env python import pandas as pd from loguru import logger import pprint import os pd.set_option('display.max_columns', None) """ to get script working: - fill required vars in .env - run in terminal export $(grep -v '^#' .env | xargs) """ standard_vcf_cols = ['#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT'] def record_result(vcf_out_path, vcf_df, required_vcf_cols, samples): new_info_header = [ '##fileformat=VCFv4.2', '##FILTER=', '##filedate=20230718', '##source="beagle.22Jul22.46e.jar"', '##INFO=', '##INFO=', '##INFO=', '##FORMAT=', '##FORMAT=', '##contig=', '##INFO=', '##INFO=' ] with open(vcf_out_path, 'w') as vcf_file: for s in new_info_header: vcf_file.write(s + '\n') cols_header = '\t'.join(required_vcf_cols) vcf_file.write(f'{cols_header}\n') for _, row in vcf_df.iterrows(): chrom = row['#CHROM'] pos = row['POS'] id = row['ID'] ref = row['REF'] alt = row['ALT'] qual = row['QUAL'] filter = row['FILTER'] info = row['INFO'] format = row['FORMAT'] genotypes = list() for sample in samples: sample_genotype = row[f'{sample}'] genotypes.append(sample_genotype) genotypes_str = '\t'.join(genotypes) vcf_record = f'{chrom}\t{pos}\t{id}\t{ref}\t{alt}\t{qual}\t{filter}\t{info}\t{format}\t{genotypes_str}\n' vcf_file.write(vcf_record) print('vcf recorded') def convert_columns(snp_df, vcf_df, required_vcf_cols, samples): global standard_vcf_cols snp_df.rename(columns={'CHR': '#CHROM', 'bp': 'POS', 'allele1': 'REF', 'allele2': 'ALT'}, inplace=True) vcf_df.rename(columns={'#[1]CHROM': '#CHROM', '[2]POS': 'POS', '[3]ID': 'ID', '[4]REF': 'REF', '[5]ALT': 'ALT', '[6](null)': 'INFO'}, inplace=True) for idx, sample in enumerate(samples): vcf_df.rename(columns={f'[{idx+7}]{sample}:GT': f'{sample}'}, inplace=True) snp_df[f'{sample}'] = None logger.info(f'snp_df.cols {snp_df.columns}') snp_df['#CHROM'] = snp_df['#CHROM'].astype(str) vcf_df['#CHROM'] = vcf_df['#CHROM'].astype(str) snp_df['POS'] = snp_df['POS'].astype(str) vcf_df['POS'] = vcf_df['POS'].astype(str) snp_df['REF'] = snp_df['REF'].astype(str) vcf_df['REF'] = vcf_df['REF'].astype(str) snp_df['ALT'] = snp_df['ALT'].astype(str) vcf_df['ALT'] = vcf_df['ALT'].astype(str) vcf_df['INFO'] = vcf_df['INFO'].astype(str) snp_df['ID'] = None snp_df['QUAL'] = '.' snp_df['FILTER'] = 'PASS' snp_df['INFO'] = '.' snp_df['FORMAT'] = 'GT' excess_cols = [col for col in snp_df.columns if col not in required_vcf_cols] snp_df.drop(excess_cols, axis=1, inplace=True) return snp_df, vcf_df def main(): input_snp = os.getenv('input_snp_path') # Annotations/UKBiobank/Input_SNPs_refseq.csv out_file = os.getenv('fitted_vcf') """ target_vcf_snps - .csv file with necessary info from VCF, can be obtain by bcftools query -H -f '%CHROM\t%POS\t%ID\t%REF\t%ALT\t%INFO\t[%GT\t]\n' example.vcf.gz > example_snps.csv """ target_vcf_snps = os.getenv('target_vcf_snps') samples = os.getenv('samples_to_extract').split(',') logger.info(f'samples: {samples}') vcf_df = pd.read_csv(target_vcf_snps, delimiter='\t', dtype={'#[1]CHROM': str}) snp_df = pd.read_csv(input_snp, delimiter=',') required_vcf_cols = standard_vcf_cols + samples logger.info(f'required cols {required_vcf_cols}, snp_df.cols {snp_df.columns}') snp_df, vcf_df = convert_columns(snp_df, vcf_df, required_vcf_cols, samples) logger.info( f'after convert snp_df shape {snp_df.shape}, {snp_df.columns}\n, fit_vcf shape {vcf_df.shape}, {vcf_df.columns}') merged = snp_df.merge(vcf_df, on=['#CHROM', 'POS', 'REF', 'ALT'], how='left', suffixes=('_snp', '_vcf')) merged.rename(columns={'ID_vcf': 'ID', 'INFO_snp': 'INFO'}, inplace=True) for sample in samples: merged.rename(columns={f'{sample}_vcf': f'{sample}'}, inplace=True) merged[f'{sample}'].fillna('./.', inplace=True) merged = merged[required_vcf_cols] merged['ID'].fillna('.', inplace=True) pprint.pprint(merged) logger.info(f'final_df header {merged.columns} \n final_df shape: {merged.shape}') record_result(out_file, merged, required_vcf_cols, samples) main()