abd-ur committed on
Commit
475dbbd
·
verified ·
1 Parent(s): 052501e

Update VCFparser.py

Browse files
Files changed (1) hide show
  1. VCFparser.py +51 -102
VCFparser.py CHANGED
@@ -1,5 +1,5 @@
1
  """
2
- VCF data parsing with support for gene annotations, pathway mappings, and hierarchical data organization.
3
  """
4
 
5
  import vcf
@@ -21,6 +21,7 @@ logger = logging.getLogger(__name__)
21
 
22
  @dataclass
23
  class MutationRecord:
 
24
 
25
  chromosome: str
26
  position: int
@@ -35,96 +36,12 @@ class MutationRecord:
35
  allele_frequency: Optional[float] = None
36
 
37
  def to_dict(self) -> Dict[str, Any]:
 
38
  return asdict(self)
39
 
40
 
41
- class GeneAnnotationManager:
42
- """Optional"""
43
-
44
- def __init__(self, annotation_file: Optional[Union[str, Path]] = None):
45
- self.annotations: Dict[Tuple[str, int], Dict[str, str]] = {}
46
- self.gene_to_coords: Dict[str, List[Tuple[str, int]]] = defaultdict(list)
47
-
48
- if annotation_file:
49
- self.load_annotations(annotation_file)
50
-
51
- def load_annotations(self, annotation_file: Union[str, Path]) -> None:
52
- """
53
- Expected formats:
54
- - JSON: {(chrom, pos): {"gene": "GENE1", "feature": "exon", ...}}
55
- - TSV: chrom\tpos\tgene\tfeature\t...
56
- """
57
- annotation_file = Path(annotation_file)
58
-
59
- if not annotation_file.exists():
60
- logger.warning(f"Annotation file not found: {annotation_file}")
61
- return
62
-
63
- try:
64
- if annotation_file.suffix.lower() == '.json':
65
- self._load_json_annotations(annotation_file)
66
- elif annotation_file.suffix.lower() in ['.tsv', '.txt']:
67
- self._load_tsv_annotations(annotation_file)
68
- else:
69
- logger.error(f"Unsupported annotation file format: {annotation_file.suffix}")
70
-
71
- except Exception as e:
72
- logger.error(f"Error loading annotations: {e}")
73
- raise
74
-
75
- def _load_json_annotations(self, file_path: Path) -> None:
76
- """Load annotations from JSON file."""
77
- with open(file_path, 'r') as f:
78
- data = json.load(f)
79
-
80
- for key, annotation in data.items():
81
- if isinstance(key, str):
82
- # Parse string key like "chr1:12345"
83
- chrom, pos = key.split(':')
84
- pos = int(pos)
85
- else:
86
- chrom, pos = key
87
-
88
- self.annotations[(chrom, pos)] = annotation
89
- gene_id = annotation.get('gene')
90
- if gene_id:
91
- self.gene_to_coords[gene_id].append((chrom, pos))
92
-
93
- def _load_tsv_annotations(self, file_path: Path) -> None:
94
- """Load annotations from TSV file."""
95
- df = pd.read_csv(file_path, sep='\t')
96
-
97
- required_columns = ['chrom', 'pos', 'gene']
98
- if not all(col in df.columns for col in required_columns):
99
- raise ValueError(f"TSV file must contain columns: {required_columns}")
100
-
101
- for _, row in df.iterrows():
102
- chrom = str(row['chrom'])
103
- pos = int(row['pos'])
104
- annotation = row.to_dict()
105
-
106
- self.annotations[(chrom, pos)] = annotation
107
- gene_id = annotation.get('gene')
108
- if gene_id:
109
- self.gene_to_coords[gene_id].append((chrom, pos))
110
-
111
- def get_gene_annotation(self, chrom: str, pos: int) -> Optional[Dict[str, str]]:
112
- """Get gene annotation for a specific genomic position."""
113
- return self.annotations.get((chrom, pos))
114
-
115
- def get_genes_in_region(self, chrom: str, start: int, end: int) -> List[str]:
116
- """Get all genes in a genomic region."""
117
- genes = []
118
- for (annot_chrom, annot_pos), annotation in self.annotations.items():
119
- if annot_chrom == chrom and start <= annot_pos <= end:
120
- gene_id = annotation.get('gene')
121
- if gene_id and gene_id not in genes:
122
- genes.append(gene_id)
123
- return genes
124
-
125
-
126
  class PathwayMapper:
127
- """Optional"""
128
 
129
  def __init__(self, pathway_file: Optional[Union[str, Path]] = None):
130
  self.gene_to_pathway: Dict[str, str] = {}
@@ -135,6 +52,8 @@ class PathwayMapper:
135
 
136
  def load_pathway_mappings(self, pathway_file: Union[str, Path]) -> None:
137
  """
 
 
138
  Expected formats:
139
  - JSON: {"GENE1": "pathway1", "GENE2": "pathway2", ...}
140
  - TSV: gene\tpathway
@@ -192,17 +111,17 @@ class PathwayMapper:
192
 
193
  class VCFParser:
194
  """
 
 
195
  Parses VCF files and organizes mutations in a hierarchical structure:
196
  Sample -> Pathway -> Chromosome -> Gene -> Mutations
197
  """
198
 
199
  def __init__(self,
200
  config: Optional[DataConfig] = None,
201
- gene_annotator: Optional[GeneAnnotationManager] = None,
202
  pathway_mapper: Optional[PathwayMapper] = None):
203
 
204
  self.config = config or DataConfig()
205
- self.gene_annotator = gene_annotator or GeneAnnotationManager()
206
  self.pathway_mapper = pathway_mapper or PathwayMapper()
207
 
208
  # Statistics tracking
@@ -217,7 +136,9 @@ class VCFParser:
217
 
218
  def parse_vcf_file(self, vcf_file: Union[str, Path]) -> Dict[str, Any]:
219
  """
220
- Returns:
 
 
221
  Dict with structure: {
222
  'sample_id': {
223
  'pathway_id': {
@@ -303,12 +224,10 @@ class VCFParser:
303
  if impact not in self.config.supported_impacts:
304
  impact = "MODERATE" # Default impact
305
 
306
- # Get gene annotation
307
- gene_annotation = self.gene_annotator.get_gene_annotation(chrom, pos)
308
- if not gene_annotation:
309
- return None # Skip variants without gene annotation
310
-
311
- gene_id = gene_annotation.get('gene', 'Unknown_Gene')
312
 
313
  # Get pathway information
314
  pathway = self.pathway_mapper.get_pathway(gene_id)
@@ -336,6 +255,33 @@ class VCFParser:
336
  logger.warning(f"Error processing record at {record.CHROM}:{record.POS}: {e}")
337
  return None
338
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
339
  def _extract_impact(self, record) -> str:
340
  """Extract variant impact from VCF record."""
341
  # Try different fields where impact might be stored
@@ -435,35 +381,38 @@ def create_parser_from_config(config_manager: ConfigManager) -> VCFParser:
435
  """Create VCF parser from configuration manager."""
436
  config = config_manager.data_config
437
 
438
- gene_annotator = None
439
- if config.gene_annotation_path:
440
- gene_annotator = GeneAnnotationManager(config.gene_annotation_path)
441
-
442
  pathway_mapper = None
443
  if config.pathway_mapping_path:
444
  pathway_mapper = PathwayMapper(config.pathway_mapping_path)
445
 
446
  return VCFParser(
447
  config=config,
448
- gene_annotator=gene_annotator,
449
  pathway_mapper=pathway_mapper
450
  )
451
 
 
 
452
  if __name__ == "__main__":
 
453
  config_manager = ConfigManager()
454
 
 
455
  config_manager.data_config.vcf_file_path = "example.vcf"
456
- config_manager.data_config.gene_annotation_path = "gene_annotations.json"
457
  config_manager.data_config.pathway_mapping_path = "pathway_mappings.json"
458
 
 
459
  parser = create_parser_from_config(config_manager)
460
 
 
461
  try:
462
  hierarchical_data = parser.parse_vcf_file(config_manager.data_config.vcf_file_path)
463
 
 
464
  stats = parser.get_parsing_statistics()
465
  print(f"Parsing Statistics: {stats}")
466
 
 
467
  parser.export_parsed_data(
468
  hierarchical_data,
469
  "parsed_vcf_data.json",
 
1
  """
2
+ This module provides comprehensive VCF file parsing
3
  """
4
 
5
  import vcf
 
21
 
22
  @dataclass
23
  class MutationRecord:
24
+ """Represents a single mutation record with all relevant information."""
25
 
26
  chromosome: str
27
  position: int
 
36
  allele_frequency: Optional[float] = None
37
 
38
  def to_dict(self) -> Dict[str, Any]:
39
+ """Convert to dictionary format."""
40
  return asdict(self)
41
 
42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  class PathwayMapper:
44
+ """Manages pathway mapping data and provides gene-to-pathway lookups."""
45
 
46
  def __init__(self, pathway_file: Optional[Union[str, Path]] = None):
47
  self.gene_to_pathway: Dict[str, str] = {}
 
52
 
53
  def load_pathway_mappings(self, pathway_file: Union[str, Path]) -> None:
54
  """
55
+ Load pathway mappings from file.
56
+
57
  Expected formats:
58
  - JSON: {"GENE1": "pathway1", "GENE2": "pathway2", ...}
59
  - TSV: gene\tpathway
 
111
 
112
  class VCFParser:
113
  """
114
+ Comprehensive VCF parser with hierarchical data organization.
115
+
116
  Parses VCF files and organizes mutations in a hierarchical structure:
117
  Sample -> Pathway -> Chromosome -> Gene -> Mutations
118
  """
119
 
120
  def __init__(self,
121
  config: Optional[DataConfig] = None,
 
122
  pathway_mapper: Optional[PathwayMapper] = None):
123
 
124
  self.config = config or DataConfig()
 
125
  self.pathway_mapper = pathway_mapper or PathwayMapper()
126
 
127
  # Statistics tracking
 
136
 
137
  def parse_vcf_file(self, vcf_file: Union[str, Path]) -> Dict[str, Any]:
138
  """
139
+ Parse VCF file and return hierarchical mutation data.
140
+
141
+ Returns:
142
  Dict with structure: {
143
  'sample_id': {
144
  'pathway_id': {
 
224
  if impact not in self.config.supported_impacts:
225
  impact = "MODERATE" # Default impact
226
 
227
+ # Extract gene information directly from VCF record
228
+ gene_id = self._extract_gene_id(record)
229
+ if not gene_id:
230
+ gene_id = "Unknown_Gene"
 
 
231
 
232
  # Get pathway information
233
  pathway = self.pathway_mapper.get_pathway(gene_id)
 
255
  logger.warning(f"Error processing record at {record.CHROM}:{record.POS}: {e}")
256
  return None
257
 
258
+ def _extract_gene_id(self, record) -> Optional[str]:
259
+ """Extract gene ID directly from VCF record INFO fields."""
260
+ # Try different fields where gene information might be stored
261
+ gene_fields = ['GENE', 'SYMBOL', 'ANN', 'EFF', 'CSQ', 'GENEINFO']
262
+
263
+ for field in gene_fields:
264
+ if field in record.INFO:
265
+ gene_value = record.INFO[field]
266
+ if isinstance(gene_value, list):
267
+ gene_value = gene_value[0]
268
+
269
+ gene_str = str(gene_value)
270
+
271
+ # Parse gene from annotation strings (e.g., ANN, CSQ)
272
+ if '|' in gene_str:
273
+ # Common annotation format: ALLELE|Annotation|...
274
+ parts = gene_str.split('|')
275
+ for part in parts:
276
+ if part and part not in ['', '.', 'ALLELE', 'Annotation']:
277
+ return part
278
+ else:
279
+ # Direct gene value
280
+ if gene_str and gene_str not in ['', '.']:
281
+ return gene_str
282
+
283
+ return None
284
+
285
  def _extract_impact(self, record) -> str:
286
  """Extract variant impact from VCF record."""
287
  # Try different fields where impact might be stored
 
381
  """Create VCF parser from configuration manager."""
382
  config = config_manager.data_config
383
 
384
+ # Initialize pathway mapper
 
 
 
385
  pathway_mapper = None
386
  if config.pathway_mapping_path:
387
  pathway_mapper = PathwayMapper(config.pathway_mapping_path)
388
 
389
  return VCFParser(
390
  config=config,
 
391
  pathway_mapper=pathway_mapper
392
  )
393
 
394
+
395
+ # Example usage
396
  if __name__ == "__main__":
397
+ # Example usage with configuration
398
  config_manager = ConfigManager()
399
 
400
+ # Set up file paths
401
  config_manager.data_config.vcf_file_path = "example.vcf"
 
402
  config_manager.data_config.pathway_mapping_path = "pathway_mappings.json"
403
 
404
+ # Create parser
405
  parser = create_parser_from_config(config_manager)
406
 
407
+ # Parse VCF file
408
  try:
409
  hierarchical_data = parser.parse_vcf_file(config_manager.data_config.vcf_file_path)
410
 
411
+ # Print statistics
412
  stats = parser.get_parsing_statistics()
413
  print(f"Parsing Statistics: {stats}")
414
 
415
+ # Export results
416
  parser.export_parsed_data(
417
  hierarchical_data,
418
  "parsed_vcf_data.json",