|
|
""" |
|
|
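This module parses VCF files into a per-sample hierarchy (sample -> pathway -> chromosome -> gene -> mutations), using optional gene-to-pathway mappings loaded from JSON or TSV files.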
|
|
""" |
|
|
|
|
|
import vcf |
|
|
import json |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from collections import defaultdict |
|
|
from typing import Dict, List, Tuple, Optional, Union, Any |
|
|
from dataclasses import dataclass, asdict |
|
|
import pandas as pd |
|
|
|
|
|
from config import DataConfig, ConfigManager |
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): configuring the root logger at import time also affects any
# application that imports this module; consider moving it under `__main__`.
logging.basicConfig(level="INFO")

# Module-scoped logger used by every class and function in this file.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
|
|
@dataclass |
|
|
class MutationRecord: |
|
|
"""Represents a single mutation record with all relevant information.""" |
|
|
|
|
|
chromosome: str |
|
|
position: int |
|
|
reference: str |
|
|
alternate: str |
|
|
impact: str |
|
|
gene_id: str |
|
|
pathway: str |
|
|
sample_id: str |
|
|
quality: Optional[float] = None |
|
|
depth: Optional[int] = None |
|
|
allele_frequency: Optional[float] = None |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
"""Convert to dictionary format.""" |
|
|
return asdict(self) |
|
|
|
|
|
|
|
|
class PathwayMapper:
    """Manage gene-to-pathway mappings and provide lookups in both directions.

    ``gene_to_pathway`` maps each gene to a single pathway; when a gene is
    loaded more than once, the most recently loaded pathway wins.
    ``pathway_to_genes`` lists every gene ever assigned to each pathway, in
    load order and without duplicates.
    """

    def __init__(self, pathway_file: Optional[Union[str, Path]] = None):
        """Create an empty mapper, optionally loading mappings from ``pathway_file``."""
        self.gene_to_pathway: Dict[str, str] = {}
        self.pathway_to_genes: Dict[str, List[str]] = defaultdict(list)

        if pathway_file:
            self.load_pathway_mappings(pathway_file)

    def load_pathway_mappings(self, pathway_file: Union[str, Path]) -> None:
        """
        Load pathway mappings from file and merge them into the current tables.

        Expected formats:
        - JSON (``.json``): {"GENE1": "pathway1", "GENE2": "pathway2", ...}
        - TSV (``.tsv`` / ``.txt``): header row containing ``gene`` and ``pathway``

        A missing file is logged as a warning and an unsupported extension is
        logged as an error; neither raises.

        Raises:
            ValueError: If the file content does not match the expected layout.
            Any other read/parse error is logged and re-raised.
        """
        pathway_file = Path(pathway_file)

        if not pathway_file.exists():
            logger.warning(f"Pathway file not found: {pathway_file}")
            return

        suffix = pathway_file.suffix.lower()
        try:
            if suffix == '.json':
                self._load_json_pathways(pathway_file)
            elif suffix in ('.tsv', '.txt'):
                self._load_tsv_pathways(pathway_file)
            else:
                logger.error(f"Unsupported pathway file format: {pathway_file.suffix}")
        except Exception as e:
            logger.error(f"Error loading pathway mappings: {e}")
            raise

    def _add_mapping(self, gene: str, pathway: str) -> None:
        """Record ``gene -> pathway`` in both lookup tables, skipping duplicate entries."""
        self.gene_to_pathway[gene] = pathway
        genes = self.pathway_to_genes[pathway]
        if gene not in genes:
            genes.append(gene)

    def _load_json_pathways(self, file_path: Path) -> None:
        """Merge mappings from a JSON object of ``{gene: pathway}`` pairs.

        Raises:
            ValueError: If the top-level value is not an object, or a pathway
                value is a list/object rather than a single value.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            mapping = json.load(f)

        if not isinstance(mapping, dict):
            raise ValueError(
                "JSON pathway file must contain an object mapping genes to pathways, "
                f"got {type(mapping).__name__}"
            )

        for gene, pathway in mapping.items():
            if isinstance(pathway, (list, dict)):
                raise ValueError(
                    f"Pathway for gene {gene!r} must be a single value, "
                    f"got {type(pathway).__name__}"
                )
            # JSON keys are always str; coerce values so both tables hold str.
            self._add_mapping(gene, str(pathway))

    def _load_tsv_pathways(self, file_path: Path) -> None:
        """Merge mappings from a tab-separated file with ``gene``/``pathway`` columns.

        Rows whose gene or pathway cell is empty are ignored.

        Raises:
            ValueError: If either required column is missing.
        """
        # dtype=str keeps numeric identifiers such as "7157" from becoming
        # floats, and keep_default_na=False stops pandas turning tokens like
        # "NA" into NaN (which would otherwise be stored as the string "nan").
        df = pd.read_csv(file_path, sep='\t', dtype=str, keep_default_na=False)

        required_columns = ['gene', 'pathway']
        if not all(col in df.columns for col in required_columns):
            raise ValueError(f"TSV file must contain columns: {required_columns}")

        for raw_gene, raw_pathway in zip(df['gene'], df['pathway']):
            gene = raw_gene.strip()
            pathway = raw_pathway.strip()
            if gene and pathway:
                self._add_mapping(gene, pathway)

    def get_pathway(self, gene_id: str) -> str:
        """Return the pathway for ``gene_id``, or ``'Unknown_Pathway'`` if unmapped."""
        return self.gene_to_pathway.get(gene_id, "Unknown_Pathway")

    def get_genes_in_pathway(self, pathway: str) -> List[str]:
        """Return a copy of the genes mapped to ``pathway`` (empty list if unknown)."""
        # Copy so callers cannot mutate the internal reverse index by accident.
        return list(self.pathway_to_genes.get(pathway, []))
|
|
|
|
|
|
|
|
class VCFParser:
    """
    Comprehensive VCF parser with hierarchical data organization.

    Parses VCF files and organizes mutations in a hierarchical structure:
    Sample -> Pathway -> Chromosome -> Gene -> Mutations

    Gene symbols and impacts are read from the INFO fields GENE, SYMBOL,
    IMPACT, ANN (SnpEff), EFF (legacy SnpEff), CSQ (Ensembl VEP) and
    GENEINFO (dbSNP). Each field is parsed according to its own layout.
    """

    # Positions inside one pipe-delimited ANN / CSQ entry. ANN follows the
    # SnpEff annotation spec (Allele|Annotation|Annotation_Impact|Gene_Name|...).
    # For CSQ this assumes VEP's default order (Allele|Consequence|IMPACT|SYMBOL|...).
    # NOTE(review): confirm the order if VEP was run with a custom --fields list.
    _PIPE_IMPACT_INDEX = 2
    _PIPE_GENE_INDEX = 3
    # Positions inside the parentheses of a legacy SnpEff EFF entry:
    # Effect(Effect_Impact|Functional_Class|Codon_Change|Amino_Acid_Change|
    #        Amino_Acid_Length|Gene_Name|...)
    _EFF_IMPACT_INDEX = 0
    _EFF_GENE_INDEX = 5
    # Tokens that mean "no value" in annotation fields.
    _EMPTY_TOKENS = ('', '.')

    def __init__(self,
                 config: Optional[DataConfig] = None,
                 pathway_mapper: Optional[PathwayMapper] = None):
        """
        Args:
            config: Data configuration; ``DataConfig()`` is used when omitted.
            pathway_mapper: Gene-to-pathway lookup; an empty ``PathwayMapper()``
                is used when omitted.
        """
        self.config = config or DataConfig()
        self.pathway_mapper = pathway_mapper or PathwayMapper()

        # Counters accumulate across parse_vcf_file calls, except
        # 'samples_processed', which is overwritten per file. 'total_records'
        # counts VCF lines; 'processed_records' / 'skipped_records' count
        # individual (line, sample) calls.
        self.parsing_stats = {
            'total_records': 0,
            'processed_records': 0,
            'skipped_records': 0,
            'samples_processed': 0,
            'unique_genes': set(),
            'unique_pathways': set()
        }

    def parse_vcf_file(self, vcf_file: Union[str, Path]) -> Dict[str, Any]:
        """
        Parse VCF file and return hierarchical mutation data.

        Homozygous-reference calls, calls on unsupported chromosomes and calls
        that fail to process are skipped and counted in 'skipped_records'.

        Returns:
            Plain (non-defaultdict) nested dicts with structure: {
                'sample_id': {
                    'pathway_id': {
                        'chromosome': {
                            'gene_id': [MutationRecord, ...]
                        }
                    }
                }
            }

        Raises:
            FileNotFoundError: If ``vcf_file`` does not exist.
            Any error raised while reading the VCF is logged and re-raised.
        """
        vcf_file = Path(vcf_file)

        if not vcf_file.exists():
            raise FileNotFoundError(f"VCF file not found: {vcf_file}")

        logger.info(f"Parsing VCF file: {vcf_file}")

        # sample -> pathway -> chromosome -> gene -> [MutationRecord, ...]
        hierarchical_data = defaultdict(
            lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
        )

        try:
            vcf_reader = vcf.Reader(filename=str(vcf_file))

            for record in vcf_reader:
                self.parsing_stats['total_records'] += 1

                # Each record carries one call per sample in the file.
                for sample in record.samples:
                    mutation_record = self._process_vcf_record(record, sample)

                    if mutation_record is None:
                        self.parsing_stats['skipped_records'] += 1
                        continue

                    sample_id = mutation_record.sample_id
                    pathway = mutation_record.pathway
                    chromosome = mutation_record.chromosome
                    gene_id = mutation_record.gene_id

                    hierarchical_data[sample_id][pathway][chromosome][gene_id].append(
                        mutation_record
                    )

                    self.parsing_stats['processed_records'] += 1
                    self.parsing_stats['unique_genes'].add(gene_id)
                    self.parsing_stats['unique_pathways'].add(pathway)

            self.parsing_stats['samples_processed'] = len(hierarchical_data)

        except Exception as e:
            logger.error(f"Error parsing VCF file: {e}")
            raise

        logger.info(f"Parsing completed. Processed {self.parsing_stats['processed_records']} "
                    f"mutations from {self.parsing_stats['samples_processed']} samples")

        # Return plain dicts: the lambda factories above cannot be pickled, and
        # defaultdicts would silently create empty branches on missing-key lookups.
        return self._to_plain_dict(hierarchical_data)

    def _process_vcf_record(self, record, sample) -> Optional[MutationRecord]:
        """Build a MutationRecord for one sample's call at ``record``.

        Returns None when the call is homozygous-reference, lies on a
        chromosome outside ``config.supported_chromosomes``, or raises while
        being processed (the error is logged as a warning).
        """
        try:
            # PyVCF reports is_variant False only for a called hom-ref genotype;
            # uncalled or GT-less calls (None) are kept, as before.
            if getattr(sample, 'is_variant', None) is False:
                return None

            chrom = str(record.CHROM)
            if chrom not in self.config.supported_chromosomes:
                return None

            pos = record.POS
            ref = record.REF
            # NOTE(review): only the first ALT allele is recorded for multi-allelic sites.
            alt = str(record.ALT[0]) if record.ALT else '.'

            impact = self._extract_impact(record)
            if impact not in self.config.supported_impacts:
                impact = "MODERATE"

            gene_id = self._extract_gene_id(record) or "Unknown_Gene"
            pathway = self.pathway_mapper.get_pathway(gene_id)

            quality = getattr(record, 'QUAL', None)
            depth = self._extract_depth(sample)
            allele_freq = self._extract_allele_frequency(sample)

            return MutationRecord(
                chromosome=chrom,
                position=pos,
                reference=ref,
                alternate=alt,
                impact=impact,
                gene_id=gene_id,
                pathway=pathway,
                sample_id=sample.sample,
                quality=quality,
                depth=depth,
                allele_frequency=allele_freq
            )

        except Exception as e:
            logger.warning(f"Error processing record at {record.CHROM}:{record.POS}: {e}")
            return None

    @staticmethod
    def _first_info_value(value) -> Optional[str]:
        """Return the first entry of a possibly list-valued INFO field as text, or None."""
        if isinstance(value, list):
            if not value:
                return None
            value = value[0]
        if value is None:
            return None
        return str(value)

    @staticmethod
    def _pipe_token(entry: str, index: int) -> Optional[str]:
        """Return field ``index`` of a pipe-delimited entry, or None if it is too short."""
        parts = entry.split('|')
        return parts[index] if index < len(parts) else None

    @staticmethod
    def _eff_token(entry: str, index: int) -> Optional[str]:
        """Return field ``index`` from the parenthesised body of an EFF entry, or None."""
        start = entry.find('(')
        if start == -1:
            return None
        end = entry.rfind(')')
        inner = entry[start + 1:end] if end > start else entry[start + 1:]
        parts = inner.split('|')
        return parts[index] if index < len(parts) else None

    def _clean_token(self, token: Optional[str]) -> Optional[str]:
        """Strip ``token`` and map placeholder values ('' / '.') to None."""
        if token is None:
            return None
        token = token.strip()
        return None if token in self._EMPTY_TOKENS else token

    def _gene_from_entry(self, field: str, entry: str) -> Optional[str]:
        """Extract the gene symbol from one INFO entry according to ``field``'s layout."""
        if field in ('ANN', 'CSQ'):
            token = self._pipe_token(entry, self._PIPE_GENE_INDEX)
        elif field == 'EFF':
            token = self._eff_token(entry, self._EFF_GENE_INDEX)
        elif field == 'GENEINFO':
            # dbSNP layout: SYMBOL:GeneID[|SYMBOL:GeneID...]; keep the first symbol.
            token = entry.split('|', 1)[0].split(':', 1)[0]
        else:
            # Plain gene fields: first non-placeholder token if several are piped.
            token = next(
                (part for part in entry.split('|')
                 if part.strip() not in self._EMPTY_TOKENS),
                None,
            )
        return self._clean_token(token)

    def _extract_gene_id(self, record) -> Optional[str]:
        """Extract a gene symbol from the record's INFO annotations.

        Fields are tried in the order GENE, SYMBOL, ANN, EFF, CSQ, GENEINFO;
        the first one yielding a non-empty symbol wins. Returns None otherwise.
        """
        gene_fields = ['GENE', 'SYMBOL', 'ANN', 'EFF', 'CSQ', 'GENEINFO']

        for field in gene_fields:
            if field not in record.INFO:
                continue
            entry = self._first_info_value(record.INFO[field])
            if entry is None:
                continue
            gene = self._gene_from_entry(field, entry)
            if gene:
                return gene

        return None

    def _match_impact(self, text: str) -> Optional[str]:
        """Map ``text`` to a supported impact: exact match first, then substring."""
        upper = text.strip().upper()
        supported = self.config.supported_impacts
        for impact in supported:
            if upper == impact:
                return impact
        for impact in supported:
            if impact in upper:
                return impact
        return None

    def _extract_impact(self, record) -> str:
        """Extract variant impact from the INFO annotations; defaults to ``'MODERATE'``.

        Fields are tried in the order IMPACT, ANN, EFF, CSQ. For ANN/CSQ/EFF
        the impact sub-field is read by position; when the entry is too short
        to contain it, the whole entry is searched instead.
        """
        impact_fields = ['IMPACT', 'ANN', 'EFF', 'CSQ']

        for field in impact_fields:
            if field not in record.INFO:
                continue
            entry = self._first_info_value(record.INFO[field])
            if entry is None:
                continue

            if field in ('ANN', 'CSQ'):
                token = self._pipe_token(entry, self._PIPE_IMPACT_INDEX)
            elif field == 'EFF':
                token = self._eff_token(entry, self._EFF_IMPACT_INDEX)
            else:
                token = None

            impact = self._match_impact(token if token is not None else entry)
            if impact is not None:
                return impact

        return "MODERATE"

    def _extract_depth(self, sample) -> Optional[int]:
        """Return the sample's DP (read depth) as int, or None if absent."""
        depth = getattr(sample.data, 'DP', None)
        return int(depth) if depth is not None else None

    def _extract_allele_frequency(self, sample) -> Optional[float]:
        """Return the sample's AF as float (first value if list-valued), or None."""
        af = getattr(sample.data, 'AF', None)
        if isinstance(af, list):
            # Empty or missing-value lists yield None instead of aborting the call.
            af = af[0] if af else None
        return float(af) if af is not None else None

    def get_parsing_statistics(self) -> Dict[str, Any]:
        """Return a snapshot of the parsing counters.

        The gene and pathway sets are replaced by their sizes. ``success_rate``
        is the percentage of evaluated (record, sample) calls that produced a
        mutation.
        """
        stats = self.parsing_stats.copy()
        stats['unique_genes'] = len(stats['unique_genes'])
        stats['unique_pathways'] = len(stats['unique_pathways'])
        # processed/skipped are per-call counts while total_records counts VCF
        # lines; dividing by total_records exceeded 100% for multi-sample files.
        evaluated_calls = stats['processed_records'] + stats['skipped_records']
        stats['success_rate'] = (
            stats['processed_records'] / max(evaluated_calls, 1) * 100
        )
        return stats

    def export_parsed_data(self,
                           hierarchical_data: Dict,
                           output_file: Union[str, Path],
                           format: str = 'json') -> None:
        """
        Export parsed hierarchical data to file.

        Args:
            hierarchical_data: Parsed VCF data
            output_file: Output file path (parent directories are created)
            format: Export format ('json', 'pickle'), case-insensitive

        Raises:
            ValueError: If ``format`` is not supported.
        """
        export_format = format.lower()
        # Validate before touching the filesystem.
        if export_format not in ('json', 'pickle'):
            raise ValueError(f"Unsupported export format: {format}")

        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)

        if export_format == 'json':
            json_data = self._convert_to_json_serializable(hierarchical_data)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, indent=2)
        else:
            import pickle
            # Convert defensively: nested defaultdicts with lambda factories
            # cannot be pickled. Only unpickle the result from trusted sources.
            with open(output_file, 'wb') as f:
                pickle.dump(self._to_plain_dict(hierarchical_data), f)

        logger.info(f"Exported parsed data to: {output_file}")

    @classmethod
    def _to_plain_dict(cls, data: Any) -> Any:
        """Recursively convert (default)dict levels to plain dicts; leaves are returned as-is."""
        if isinstance(data, dict):
            return {key: cls._to_plain_dict(value) for key, value in data.items()}
        return data

    def _convert_to_json_serializable(self, data: Dict) -> Dict:
        """Convert hierarchical data with MutationRecord objects to JSON-serializable format."""
        result = {}

        for sample_id, pathways in data.items():
            result[sample_id] = {}

            for pathway_id, chromosomes in pathways.items():
                result[sample_id][pathway_id] = {}

                for chrom_id, genes in chromosomes.items():
                    result[sample_id][pathway_id][chrom_id] = {
                        gene_id: [mutation.to_dict() for mutation in mutations]
                        for gene_id, mutations in genes.items()
                    }

        return result
|
|
|
|
|
|
|
|
def create_parser_from_config(config_manager: ConfigManager) -> VCFParser:
    """Build a VCFParser from the ``data_config`` section of ``config_manager``.

    A PathwayMapper is loaded only when ``pathway_mapping_path`` is set;
    otherwise ``None`` is passed and the parser creates its own empty mapper.
    """
    data_config = config_manager.data_config
    mapping_path = data_config.pathway_mapping_path
    mapper = PathwayMapper(mapping_path) if mapping_path else None
    return VCFParser(config=data_config, pathway_mapper=mapper)
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Example driver: parse a VCF, print statistics, and export the hierarchy as JSON.
    config_manager = ConfigManager()

    # Hard-coded example inputs; adjust for real runs.
    config_manager.data_config.vcf_file_path = "example.vcf"
    config_manager.data_config.pathway_mapping_path = "pathway_mappings.json"

    parser = create_parser_from_config(config_manager)

    try:
        hierarchical_data = parser.parse_vcf_file(config_manager.data_config.vcf_file_path)

        stats = parser.get_parsing_statistics()
        print(f"Parsing Statistics: {stats}")

        parser.export_parsed_data(
            hierarchical_data,
            "parsed_vcf_data.json",
            format='json'
        )

    except Exception:
        # Keep the traceback and report failure to the shell instead of exiting 0.
        logger.exception("Error in VCF parsing")
        raise SystemExit(1)