"""
Parse VCF files into a hierarchical, pathway-aware mutation structure.

Records are read with ``vcf.Reader`` and grouped as
``sample -> pathway -> chromosome -> gene -> [MutationRecord, ...]``;
gene identifiers are resolved to pathways through :class:`PathwayMapper`.
"""
import vcf
import json
import logging
import re
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Tuple, Optional, Union, Any
from dataclasses import dataclass, asdict

import pandas as pd

from config import DataConfig, ConfigManager

# Configure logging
# NOTE: this configures the root logger as a side effect of importing the module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class MutationRecord:
    """Represents a single mutation record with all relevant information."""

    chromosome: str
    position: int
    reference: str
    alternate: str  # first ALT allele only; '.' when the record has none
    impact: str
    gene_id: str
    pathway: str
    sample_id: str
    quality: Optional[float] = None  # record-level QUAL
    depth: Optional[int] = None  # per-sample DP, if present
    allele_frequency: Optional[float] = None  # per-sample AF (first value), if present

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format."""
        return asdict(self)


class PathwayMapper:
    """Manages pathway mapping data and provides gene-to-pathway lookups.

    Each gene maps to exactly one pathway (``gene_to_pathway``);
    ``pathway_to_genes`` is the reverse index and is kept in sync with it.
    """

    def __init__(self, pathway_file: Optional[Union[str, Path]] = None):
        self.gene_to_pathway: Dict[str, str] = {}
        self.pathway_to_genes: Dict[str, List[str]] = defaultdict(list)

        if pathway_file:
            self.load_pathway_mappings(pathway_file)

    def load_pathway_mappings(self, pathway_file: Union[str, Path]) -> None:
        """
        Load pathway mappings from file, merging them into the current ones.

        Expected formats:
        - JSON: {"GENE1": "pathway1", "GENE2": "pathway2", ...}
        - TSV (.tsv/.txt): tab-separated with a header containing
          ``gene`` and ``pathway`` columns

        A gene that appears again (in the same or a later file) is re-assigned
        to the newest pathway. A missing file is logged as a warning and
        ignored; an unsupported extension is logged as an error and ignored.

        Raises:
            ValueError: if the file content is malformed.
            Exception: any other read/parse error is logged and re-raised.
        """
        pathway_file = Path(pathway_file)

        if not pathway_file.exists():
            logger.warning(f"Pathway file not found: {pathway_file}")
            return

        suffix = pathway_file.suffix.lower()
        try:
            if suffix == '.json':
                self._load_json_pathways(pathway_file)
            elif suffix in ('.tsv', '.txt'):
                self._load_tsv_pathways(pathway_file)
            else:
                logger.error(f"Unsupported pathway file format: {pathway_file.suffix}")
        except Exception as e:
            logger.error(f"Error loading pathway mappings: {e}")
            raise

    def _add_mapping(self, gene: str, pathway: str) -> None:
        """Record gene -> pathway, keeping the reverse index free of stale or duplicate links."""
        previous = self.gene_to_pathway.get(gene)
        if previous == pathway:
            return  # already indexed; avoid duplicate reverse entries
        if previous is not None:
            stale = self.pathway_to_genes.get(previous)
            if stale and gene in stale:
                stale.remove(gene)
        self.gene_to_pathway[gene] = pathway
        self.pathway_to_genes[pathway].append(gene)

    def _load_json_pathways(self, file_path: Path) -> None:
        """Load pathway mappings from a JSON object of gene -> pathway."""
        with open(file_path, 'r', encoding='utf-8') as f:
            mapping = json.load(f)

        if not isinstance(mapping, dict):
            raise ValueError("JSON pathway file must contain an object mapping gene -> pathway")

        for gene, pathway in mapping.items():
            if isinstance(pathway, (list, dict)):
                raise ValueError(
                    f"Pathway for gene {gene!r} must be a single value, "
                    f"got {type(pathway).__name__}"
                )
            self._add_mapping(str(gene), str(pathway))

    def _load_tsv_pathways(self, file_path: Path) -> None:
        """Load pathway mappings from a TSV file with ``gene`` and ``pathway`` columns."""
        # dtype=str keeps identifiers verbatim (e.g. numeric gene IDs are not
        # turned into floats like "1234.0" when the column has gaps).
        df = pd.read_csv(file_path, sep='\t', dtype=str)

        required_columns = ['gene', 'pathway']
        if not all(col in df.columns for col in required_columns):
            raise ValueError(f"TSV file must contain columns: {required_columns}")

        # Rows with a missing gene or pathway would otherwise become the string "nan".
        df = df.dropna(subset=required_columns)

        for gene, pathway in zip(df['gene'], df['pathway']):
            self._add_mapping(gene, pathway)

    def get_pathway(self, gene_id: str) -> str:
        """Get pathway for a gene, returns 'Unknown_Pathway' if not found."""
        return self.gene_to_pathway.get(gene_id, "Unknown_Pathway")

    def get_genes_in_pathway(self, pathway: str) -> List[str]:
        """Get all genes in a specific pathway (a copy; mutating it does not affect the mapper)."""
        return list(self.pathway_to_genes.get(pathway, []))


class VCFParser:
    """
    Comprehensive VCF parser with hierarchical data organization.

    Parses VCF files and organizes mutations in a hierarchical structure:
    Sample -> Pathway -> Chromosome -> Gene -> Mutations
    """

    # INFO fields searched, in order, for a gene identifier / an impact term.
    _GENE_FIELDS = ('GENE', 'SYMBOL', 'ANN', 'EFF', 'CSQ', 'GENEINFO')
    _IMPACT_FIELDS = ('IMPACT', 'ANN', 'EFF', 'CSQ')

    # Position of the gene symbol inside one pipe-delimited annotation entry:
    #   ANN (SnpEff):      Allele|Annotation|Annotation_Impact|Gene_Name|...
    #   CSQ (VEP default): Allele|Consequence|IMPACT|SYMBOL|...
    # NOTE: VEP allows reordering CSQ columns (declared by the header's
    # "Format:" description); index 3 assumes the default layout.
    _ANNOTATION_GENE_INDEX = {'ANN': 3, 'CSQ': 3}
    # EFF (legacy SnpEff): Effect(Impact|Class|Codon|AA_change|AA_length|Gene_Name|...)
    _EFF_GENE_INDEX = 5

    # Placeholder values meaning "no value" in INFO/annotation fields.
    _EMPTY_VALUES = frozenset({'', '.'})

    # Splits annotation text into whole words so impact terms are matched
    # exactly instead of as substrings of longer words.
    _TOKEN_SPLITTER = re.compile(r'[^A-Za-z0-9_]+')

    def __init__(self, config: Optional[DataConfig] = None,
                 pathway_mapper: Optional[PathwayMapper] = None,
                 *, skip_non_variant_calls: bool = True):
        """
        Args:
            config: data configuration (supported chromosomes/impacts).
            pathway_mapper: gene -> pathway lookup; an empty mapper if omitted.
            skip_non_variant_calls: when True (default), sample calls that are
                hom-ref or no-call are not reported as mutations. Set False to
                record every sample at every site.
        """
        self.config = config or DataConfig()
        self.pathway_mapper = pathway_mapper or PathwayMapper()
        self.skip_non_variant_calls = skip_non_variant_calls

        # Statistics tracking. Counters accumulate across parse_vcf_file calls
        # on the same parser; samples_processed reflects the last file parsed.
        # total_records counts VCF lines; processed/skipped count sample calls.
        self.parsing_stats = {
            'total_records': 0,
            'processed_records': 0,
            'skipped_records': 0,
            'samples_processed': 0,
            'unique_genes': set(),
            'unique_pathways': set()
        }

    def parse_vcf_file(self, vcf_file: Union[str, Path]) -> Dict[str, Any]:
        """
        Parse VCF file and return hierarchical mutation data.

        Returns:
            Plain nested dicts (safe to pickle) with structure:
            {
                'sample_id': {
                    'pathway_id': {
                        'chromosome': {
                            'gene_id': [MutationRecord, ...]
                        }
                    }
                }
            }

        Raises:
            FileNotFoundError: if ``vcf_file`` does not exist.
            Exception: reader errors are logged and re-raised.
        """
        vcf_file = Path(vcf_file)
        if not vcf_file.exists():
            raise FileNotFoundError(f"VCF file not found: {vcf_file}")

        logger.info(f"Parsing VCF file: {vcf_file}")

        # sample -> pathway -> chromosome -> gene -> list of mutations
        hierarchical_data = defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    lambda: defaultdict(list)
                )
            )
        )

        try:
            vcf_reader = vcf.Reader(filename=str(vcf_file))

            for record in vcf_reader:
                self.parsing_stats['total_records'] += 1

                # Each sample's call at this site is handled independently.
                for sample in record.samples:
                    mutation_record = self._process_vcf_record(record, sample)
                    if mutation_record is None:
                        self.parsing_stats['skipped_records'] += 1
                        continue

                    hierarchical_data[mutation_record.sample_id][mutation_record.pathway][
                        mutation_record.chromosome][mutation_record.gene_id].append(mutation_record)

                    self.parsing_stats['processed_records'] += 1
                    self.parsing_stats['unique_genes'].add(mutation_record.gene_id)
                    self.parsing_stats['unique_pathways'].add(mutation_record.pathway)

            self.parsing_stats['samples_processed'] = len(hierarchical_data)

        except Exception as e:
            logger.exception(f"Error parsing VCF file: {e}")
            raise

        logger.info(f"Parsing completed. Processed {self.parsing_stats['processed_records']} "
                    f"mutations from {self.parsing_stats['samples_processed']} samples")

        # Nested defaultdicts with lambda factories cannot be pickled and
        # silently create keys on lookup; hand callers plain dicts instead.
        return self._to_plain_dict(hierarchical_data)

    @staticmethod
    def _to_plain_dict(tree: Any) -> Any:
        """Recursively convert nested dict subclasses (e.g. defaultdict) to plain dicts."""
        if isinstance(tree, dict):
            return {key: VCFParser._to_plain_dict(value) for key, value in tree.items()}
        return tree

    @staticmethod
    def _carries_variant(sample) -> bool:
        """Return False when the sample's call is a no-call or hom-ref, else True.

        Relies on the call object's ``called`` / ``is_variant`` attributes when
        present. When they are missing or undetermined (presumably a record
        without GT), the call is kept so genotype-less files are still parsed.
        """
        if getattr(sample, 'called', None) is False:
            return False
        return getattr(sample, 'is_variant', None) is not False

    def _process_vcf_record(self, record, sample) -> Optional[MutationRecord]:
        """Build a MutationRecord for one sample's call at one VCF record.

        Returns None (counted as skipped by the caller) when the chromosome is
        not in ``config.supported_chromosomes``, when ``skip_non_variant_calls``
        is set and the sample does not carry the variant, or when any field
        fails to parse (a warning is logged).
        """
        try:
            chrom = str(record.CHROM)
            if chrom not in self.config.supported_chromosomes:
                return None

            if self.skip_non_variant_calls and not self._carries_variant(sample):
                return None

            # Only the first ALT allele is reported. An ALT of None (how the
            # reader may represent '.') is reported as '.' rather than "None".
            first_alt = record.ALT[0] if record.ALT else None
            alt = '.' if first_alt is None else str(first_alt)

            impact = self._extract_impact(record)
            if impact not in self.config.supported_impacts:
                impact = "MODERATE"  # Default impact

            gene_id = self._extract_gene_id(record) or "Unknown_Gene"

            return MutationRecord(
                chromosome=chrom,
                position=record.POS,
                reference=record.REF,
                alternate=alt,
                impact=impact,
                gene_id=gene_id,
                pathway=self.pathway_mapper.get_pathway(gene_id),
                sample_id=sample.sample,
                quality=getattr(record, 'QUAL', None),
                depth=self._extract_depth(sample),
                allele_frequency=self._extract_allele_frequency(sample),
            )

        except Exception as e:
            logger.warning(f"Error processing record at {record.CHROM}:{record.POS}: {e}")
            return None

    def _extract_gene_id(self, record) -> Optional[str]:
        """Return the first gene identifier found in the record's INFO fields.

        Fields in ``_GENE_FIELDS`` are tried in order; for list-valued fields
        each entry is tried in turn. Returns None if nothing usable is found.
        """
        for field in self._GENE_FIELDS:
            if field not in record.INFO:
                continue
            raw = record.INFO[field]
            entries = raw if isinstance(raw, list) else [raw]
            for entry in entries:
                if entry is None:
                    continue
                gene = self._gene_from_annotation(field, str(entry))
                if gene:
                    return gene
        return None

    def _gene_from_annotation(self, field: str, text: str) -> Optional[str]:
        """Pull the gene symbol out of one INFO value according to its field's layout."""
        if field in self._ANNOTATION_GENE_INDEX:
            parts = text.split('|')
            index = self._ANNOTATION_GENE_INDEX[field]
            candidate = parts[index] if len(parts) > index else ''
        elif field == 'EFF':
            # Keep only what is inside "Effect( ... )" before splitting.
            _, _, inner = text.partition('(')
            parts = inner.rstrip(')').split('|')
            candidate = parts[self._EFF_GENE_INDEX] if len(parts) > self._EFF_GENE_INDEX else ''
        elif field == 'GENEINFO':
            # SYMBOL:GENE_ID[|SYMBOL:GENE_ID...] -> first symbol
            candidate = text.split('|', 1)[0].split(':', 1)[0]
        else:
            # Plain gene fields; if several are pipe-joined, take the first real one.
            candidate = next(
                (part for part in text.split('|') if part.strip() not in self._EMPTY_VALUES),
                ''
            )

        candidate = candidate.strip()
        return None if candidate in self._EMPTY_VALUES else candidate

    def _extract_impact(self, record) -> str:
        """Return the first supported impact term found in the record's INFO fields.

        For each field in ``_IMPACT_FIELDS`` the first value is split into
        upper-cased words and compared against ``config.supported_impacts``
        (in config order) by exact word match. Falls back to "MODERATE".
        """
        for field in self._IMPACT_FIELDS:
            if field not in record.INFO:
                continue
            value = record.INFO[field]
            if isinstance(value, list):
                if not value:
                    continue
                value = value[0]
            if value is None:
                continue

            words = set(self._TOKEN_SPLITTER.split(str(value).upper()))
            for supported_impact in self.config.supported_impacts:
                if supported_impact in words:
                    return supported_impact

        return "MODERATE"  # Default impact

    def _extract_depth(self, sample) -> Optional[int]:
        """Extract read depth (DP) from the sample's data, or None if absent."""
        depth = getattr(sample.data, 'DP', None)
        return None if depth is None else int(depth)

    def _extract_allele_frequency(self, sample) -> Optional[float]:
        """Extract allele frequency (AF, first value if a list) from the sample, or None."""
        af = getattr(sample.data, 'AF', None)
        if isinstance(af, list):
            af = af[0] if af else None
        return None if af is None else float(af)

    def get_parsing_statistics(self) -> Dict[str, Any]:
        """Get detailed parsing statistics.

        ``unique_genes``/``unique_pathways`` are reported as counts.
        ``success_rate`` is the percentage of sample calls that produced a
        mutation: processed / (processed + skipped). Both counters are per
        sample call, so the rate stays within 0-100 for multi-sample files.
        """
        stats = self.parsing_stats.copy()
        stats['unique_genes'] = len(stats['unique_genes'])
        stats['unique_pathways'] = len(stats['unique_pathways'])
        attempted = stats['processed_records'] + stats['skipped_records']
        stats['success_rate'] = stats['processed_records'] / max(attempted, 1) * 100
        return stats

    def export_parsed_data(self, hierarchical_data: Dict,
                           output_file: Union[str, Path],
                           format: str = 'json') -> None:
        """
        Export parsed hierarchical data to file.

        Args:
            hierarchical_data: Parsed VCF data
            output_file: Output file path (parent directories are created)
            format: Export format ('json', 'pickle'), case-insensitive

        Raises:
            ValueError: for an unsupported format (checked before touching disk).
        """
        fmt = format.lower()
        if fmt not in ('json', 'pickle'):
            raise ValueError(f"Unsupported export format: {format}")

        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)

        if fmt == 'json':
            # Convert MutationRecord objects to dictionaries for JSON serialization
            json_data = self._convert_to_json_serializable(hierarchical_data)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, indent=2)
        else:
            import pickle
            with open(output_file, 'wb') as f:
                # defaultdicts with lambda factories are not picklable; store plain dicts.
                pickle.dump(self._to_plain_dict(hierarchical_data), f)

        logger.info(f"Exported parsed data to: {output_file}")

    def _convert_to_json_serializable(self, data: Dict) -> Dict:
        """Convert hierarchical data with MutationRecord objects to JSON-serializable format."""
        return {
            sample_id: {
                pathway_id: {
                    chrom_id: {
                        gene_id: [mutation.to_dict() for mutation in mutations]
                        for gene_id, mutations in genes.items()
                    }
                    for chrom_id, genes in chromosomes.items()
                }
                for pathway_id, chromosomes in pathways.items()
            }
            for sample_id, pathways in data.items()
        }


def create_parser_from_config(config_manager: ConfigManager) -> VCFParser:
    """Create VCF parser from configuration manager.

    A PathwayMapper is loaded only when ``pathway_mapping_path`` is set;
    otherwise the parser falls back to an empty mapper.
    """
    config = config_manager.data_config

    pathway_mapper = None
    if config.pathway_mapping_path:
        pathway_mapper = PathwayMapper(config.pathway_mapping_path)

    return VCFParser(
        config=config,
        pathway_mapper=pathway_mapper
    )


# Example usage
if __name__ == "__main__":
    # Example usage with configuration
    config_manager = ConfigManager()

    # Set up file paths
    config_manager.data_config.vcf_file_path = "example.vcf"
    config_manager.data_config.pathway_mapping_path = "pathway_mappings.json"

    try:
        # Parser creation is inside the try: a malformed pathway file raises.
        parser = create_parser_from_config(config_manager)

        hierarchical_data = parser.parse_vcf_file(config_manager.data_config.vcf_file_path)

        # Print statistics
        stats = parser.get_parsing_statistics()
        print(f"Parsing Statistics: {stats}")

        # Export results
        parser.export_parsed_data(
            hierarchical_data,
            "parsed_vcf_data.json",
            format='json'
        )

    except Exception as e:
        logger.exception(f"Error in VCF parsing: {e}")