"""Feature extractor for SV-SPR confidence model. Caller-agnostic feature set (11 features): 1. svlen_abs length, universal 2. log10_svlen log-scaled length 3. svtype_DEL/INS/DUP/BND one-hot type 4. total_alt_support generic alt read support 5. gc_flank_w100 GC fraction in ±100bp window around breakpoint 6. at_flank_w100 AT fraction in flank 7. gc_inner_w100 GC fraction inside the called region (DEL/DUP) 8. n_motif_2_w100 tandem dinuc count in flank 9. n_motif_3_w100 tandem trinuc count in flank Inputs required from caller (VCF): - chrom, pos, end (or svlen for INS) - svtype: DEL / INS / DUP / BND - total_alt_support: generic, derived from any caller's AD/PR/SR field If absent, set to 0 (the model can still score on the remaining features but performance may drop ~0.014 F1 — see T01d analysis). """ from __future__ import annotations from dataclasses import dataclass from typing import Iterable, Mapping, Optional import numpy as np import pandas as pd import pysam WINDOW = 100 # ±100 bp flank FEATURE_COLS = [ 'svlen_abs', 'log10_svlen', 'svtype_DEL', 'svtype_INS', 'svtype_DUP', 'svtype_BND', 'total_alt_support', 'gc_flank_w100', 'at_flank_w100', 'gc_inner_w100', 'n_motif_2_w100', 'n_motif_3_w100', ] @dataclass class SVCall: """One structural variant call to score. Attributes ---------- chrom : str pos : int 1-based start (VCF convention) end : int 1-based end (= pos for INS/BND, or pos+svlen for DEL/DUP) svtype : str "DEL" | "INS" | "DUP" | "BND" svlen : int Absolute length of the variant total_alt_support : float Alt read support count from any caller. 0 if unavailable. """ chrom: str pos: int end: int svtype: str svlen: int total_alt_support: float = 0.0 def _seq_features(fa: pysam.FastaFile, chrom: str, pos: int, end: int, win: int = WINDOW) -> dict: """Compute the 5 sequence-context features for one breakpoint pair.""" try: left = fa.fetch(chrom, max(0, pos - win), pos).upper() right = fa.fetch(chrom, end, end + win).upper() if 0 < end - pos < 5000: inner = fa.fetch(chrom, pos, end).upper() else: inner = '' except (ValueError, KeyError): return {f'gc_flank_w{win}': np.nan, f'at_flank_w{win}': np.nan, f'gc_inner_w{win}': np.nan, f'n_motif_2_w{win}': np.nan, f'n_motif_3_w{win}': np.nan} flank = left + right n_flank = max(len(flank), 1) gc_flank = (flank.count('G') + flank.count('C')) / n_flank at_flank = (flank.count('A') + flank.count('T')) / n_flank if inner: gc_inner = (inner.count('G') + inner.count('C')) / max(len(inner), 1) else: gc_inner = gc_flank # fallback n_motif_2 = 0 n_motif_3 = 0 for i in range(len(flank) - 5): if flank[i:i+2] == flank[i+2:i+4] == flank[i+4:i+6]: n_motif_2 += 1 if i + 9 <= len(flank) and flank[i:i+3] == flank[i+3:i+6] == flank[i+6:i+9]: n_motif_3 += 1 return { f'gc_flank_w{win}': gc_flank, f'at_flank_w{win}': at_flank, f'gc_inner_w{win}': gc_inner, f'n_motif_2_w{win}': n_motif_2, f'n_motif_3_w{win}': n_motif_3, } def extract_one(call: SVCall, fa: pysam.FastaFile) -> dict: """Compute all 11 features for one SV call.""" svlen = abs(int(call.svlen)) if call.svlen else 0 svtype = call.svtype.upper() if call.svtype else 'BND' row = { 'svlen_abs': svlen, 'log10_svlen': np.log10(svlen + 1), 'svtype_DEL': int(svtype == 'DEL'), 'svtype_INS': int(svtype == 'INS'), 'svtype_DUP': int(svtype == 'DUP'), 'svtype_BND': int(svtype == 'BND'), 'total_alt_support': float(call.total_alt_support or 0.0), } seq = _seq_features(fa, call.chrom, int(call.pos), int(call.end)) row.update(seq) return row def extract_batch(calls: Iterable[SVCall], fasta_path: str) -> pd.DataFrame: """Compute features for a batch of SV calls. Returns a DataFrame in FEATURE_COLS order.""" fa = pysam.FastaFile(fasta_path) try: rows = [extract_one(c, fa) for c in calls] finally: fa.close() df = pd.DataFrame(rows) return df.reindex(columns=FEATURE_COLS).fillna(0) def from_vcf(vcf_path: str, fasta_path: str, alt_support_field: Optional[str] = None) -> pd.DataFrame: """Parse a VCF file and produce a feature matrix. Parameters ---------- vcf_path : str fasta_path : str alt_support_field : Optional[str] Format field name to derive total_alt_support. Falls back to PR/SR for Manta, AD for Delly, or 0 if not found. Common values: "PR", "SR", "AD". """ try: vcf = pysam.VariantFile(vcf_path) except Exception as e: raise IOError(f'Cannot open VCF {vcf_path}: {e}') calls = [] coords = [] for rec in vcf.fetch(): chrom = rec.chrom pos = int(rec.pos) info = rec.info svtype = (info.get('SVTYPE') or '').upper() if info else '' if not svtype: continue end = int(info.get('END') or pos) svlen = info.get('SVLEN') if isinstance(svlen, tuple): svlen = svlen[0] if svlen is None: svlen = end - pos svlen = abs(int(svlen)) alt_support = _derive_alt_support(rec, alt_support_field) calls.append(SVCall(chrom=chrom, pos=pos, end=end, svtype=svtype, svlen=svlen, total_alt_support=alt_support)) coords.append({'chrom': chrom, 'pos': pos, 'end': end, 'svtype': svtype, 'svlen': svlen}) vcf.close() feat_df = extract_batch(calls, fasta_path) coord_df = pd.DataFrame(coords) return pd.concat([coord_df.reset_index(drop=True), feat_df.reset_index(drop=True)], axis=1) def _derive_alt_support(rec, field: Optional[str]) -> float: """Best-effort: try the requested field, then PR/SR (Manta), then AD.""" if not rec.samples: return 0.0 sample = list(rec.samples.values())[0] if field: v = sample.get(field) if v is not None: return _last_int(v) pr = sample.get('PR') sr = sample.get('SR') total = 0.0 if pr is not None: total += _last_int(pr) if sr is not None: total += _last_int(sr) if total > 0: return total ad = sample.get('AD') if ad is not None: return _last_int(ad) return 0.0 def _last_int(v) -> int: """Take the alt (last) value from an int/tuple. Used for Manta's PR=(ref,alt) tuples.""" if isinstance(v, (tuple, list)): return int(v[-1]) if v else 0 try: return int(v) except (TypeError, ValueError): return 0