SVSTR-Score / seqonly /src /features.py
khyeom's picture
Add sequence-only headline model (svspr_v14_seq, 11-feature) + inference package
90d0b4b verified
Raw
History Blame Contribute Delete
7 kB
"""Feature extractor for SV-SPR confidence model.
Caller-agnostic feature set (11 features):
1. svlen_abs length, universal
2. log10_svlen log-scaled length
3. svtype_DEL/INS/DUP/BND one-hot type
4. total_alt_support generic alt read support
5. gc_flank_w100 GC fraction in ±100bp window around breakpoint
6. at_flank_w100 AT fraction in flank
7. gc_inner_w100 GC fraction inside the called region (DEL/DUP)
8. n_motif_2_w100 tandem dinuc count in flank
9. n_motif_3_w100 tandem trinuc count in flank
Inputs required from caller (VCF):
- chrom, pos, end (or svlen for INS)
- svtype: DEL / INS / DUP / BND
- total_alt_support: generic, derived from any caller's AD/PR/SR field
If absent, set to 0 (the model can still score on the remaining features
but performance may drop ~0.014 F1 — see T01d analysis).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Mapping, Optional
import numpy as np
import pandas as pd
import pysam
# Default half-width, in bp, of the flanking window fetched on each side of a
# call; used as the default `win` of `_seq_features`.
WINDOW = 100 # ±100 bp flank
# Column order of the feature matrix returned by `extract_batch`.
# The `_w100` suffixes are hard-coded here, while `_seq_features` builds its
# keys from its `win` argument, so the names only line up when win == 100.
FEATURE_COLS = [
'svlen_abs', 'log10_svlen',
'svtype_DEL', 'svtype_INS', 'svtype_DUP', 'svtype_BND',
'total_alt_support',
'gc_flank_w100', 'at_flank_w100', 'gc_inner_w100',
'n_motif_2_w100', 'n_motif_3_w100',
]
@dataclass
class SVCall:
    """A single structural-variant call, as consumed by `extract_one`.

    Attributes
    ----------
    chrom : str
        Contig name, passed unchanged to the reference FASTA lookup.
    pos : int
        1-based start (VCF convention).
    end : int
        1-based end (= pos for INS/BND, or pos+svlen for DEL/DUP).
    svtype : str
        "DEL" | "INS" | "DUP" | "BND".
    svlen : int
        Absolute length of the variant.
    total_alt_support : float
        Alt read support count from any caller. 0 if unavailable.
    """
    chrom: str
    pos: int
    end: int
    svtype: str
    svlen: int
    # Optional: defaults to 0.0 when the caller has no read-support figure.
    total_alt_support: float = 0.0
def _seq_features(fa: pysam.FastaFile, chrom: str, pos: int, end: int,
                  win: int = WINDOW) -> dict:
    """Derive the five reference-sequence features around one call.

    Parameters
    ----------
    fa : pysam.FastaFile
        Open reference; only its ``fetch(chrom, start, end)`` method is used.
    chrom : str
        Contig to read from.
    pos, end : int
        Start and end of the call, handed to ``fa.fetch`` as given.
    win : int
        Number of bases fetched on each side (left of ``pos``, right of
        ``end``). It is also embedded in the returned key names.

    Returns
    -------
    dict
        Keys ``gc_flank_w{win}``, ``at_flank_w{win}``, ``gc_inner_w{win}``,
        ``n_motif_2_w{win}`` and ``n_motif_3_w{win}``. Every value is NaN when
        the reference lookup raises ``ValueError`` or ``KeyError``.
    """
    keys = [
        f'gc_flank_w{win}',
        f'at_flank_w{win}',
        f'gc_inner_w{win}',
        f'n_motif_2_w{win}',
        f'n_motif_3_w{win}',
    ]

    try:
        upstream = fa.fetch(chrom, max(0, pos - win), pos).upper()
        downstream = fa.fetch(chrom, end, end + win).upper()
        # The interior is only read for non-empty spans shorter than 5 kb.
        body = fa.fetch(chrom, pos, end).upper() if 0 < end - pos < 5000 else ''
    except (ValueError, KeyError):
        return dict.fromkeys(keys, np.nan)

    # Both flanks are pooled into one string before any statistic is taken.
    context = upstream + downstream
    size = len(context)
    denom = max(size, 1)  # guards the division when nothing was fetched
    gc_frac = (context.count('G') + context.count('C')) / denom
    at_frac = (context.count('A') + context.count('T')) / denom

    if body:
        inner_gc = (body.count('G') + body.count('C')) / max(len(body), 1)
    else:
        # No interior sequence available: reuse the flank GC fraction.
        inner_gc = gc_frac

    # Count start offsets at which a 2-mer (resp. 3-mer) is immediately
    # repeated three times in a row.
    dinuc_hits = sum(
        1 for i in range(size - 5)
        if context[i:i + 2] == context[i + 2:i + 4] == context[i + 4:i + 6]
    )
    trinuc_hits = sum(
        1 for i in range(size - 8)
        if context[i:i + 3] == context[i + 3:i + 6] == context[i + 6:i + 9]
    )

    return dict(zip(keys, (gc_frac, at_frac, inner_gc, dinuc_hits, trinuc_hits)))
def extract_one(call: SVCall, fa: pysam.FastaFile) -> dict:
    """Build the feature row for a single SV call.

    The row holds the size features, the four one-hot type flags and the alt
    read support, followed by the sequence-context features returned by
    `_seq_features` for ``call.chrom`` / ``call.pos`` / ``call.end``.

    A falsy ``svlen`` is treated as 0, a falsy ``svtype`` as ``'BND'``, and a
    falsy ``total_alt_support`` as 0.0.
    """
    length = abs(int(call.svlen)) if call.svlen else 0
    kind = call.svtype.upper() if call.svtype else 'BND'

    features = {
        'svlen_abs': length,
        # +1 keeps log10 finite for zero-length calls.
        'log10_svlen': np.log10(length + 1),
    }
    # One-hot encoding; a type outside these four leaves every flag at 0.
    for label in ('DEL', 'INS', 'DUP', 'BND'):
        features[f'svtype_{label}'] = int(kind == label)
    features['total_alt_support'] = float(call.total_alt_support or 0.0)

    features.update(
        _seq_features(fa, call.chrom, int(call.pos), int(call.end))
    )
    return features
def extract_batch(calls: Iterable[SVCall], fasta_path: str) -> pd.DataFrame:
    """Turn an iterable of SV calls into a feature matrix.

    The reference FASTA at ``fasta_path`` is opened once, used for every call
    and closed again even if feature extraction raises. The result has exactly
    the columns of ``FEATURE_COLS``, in that order, with missing values
    replaced by 0.
    """
    with pysam.FastaFile(fasta_path) as reference:
        records = [extract_one(call, reference) for call in calls]

    table = pd.DataFrame(records)
    ordered = table.reindex(columns=FEATURE_COLS)
    return ordered.fillna(0)
def from_vcf(vcf_path: str, fasta_path: str,
             alt_support_field: Optional[str] = None) -> pd.DataFrame:
    """Parse a VCF file and produce a feature matrix.

    Records without an INFO/SVTYPE value are skipped.

    Parameters
    ----------
    vcf_path : str
        Path to the VCF/BCF file to read.
    fasta_path : str
        Path to the reference FASTA used for the sequence features.
    alt_support_field : Optional[str]
        FORMAT field name to derive total_alt_support from. When it is not
        given or not present, `_derive_alt_support` applies its own fallbacks.
        Common values: "PR", "SR", "AD".

    Returns
    -------
    pd.DataFrame
        One row per retained record: the coordinate columns ``chrom``,
        ``pos``, ``end``, ``svtype``, ``svlen`` followed by the feature
        columns from `extract_batch`. The coordinate columns are present even
        when no record is retained.

    Raises
    ------
    IOError
        If the VCF cannot be opened; the underlying error is chained.
    """
    try:
        vcf = pysam.VariantFile(vcf_path)
    except Exception as e:
        raise IOError(f'Cannot open VCF {vcf_path}: {e}') from e

    coord_cols = ['chrom', 'pos', 'end', 'svtype', 'svlen']
    calls = []
    coords = []
    try:
        for rec in vcf.fetch():
            info = rec.info
            svtype = (info.get('SVTYPE') or '').upper() if info else ''
            if not svtype:
                continue
            chrom = rec.chrom
            pos = int(rec.pos)
            # pysam reserves INFO/END and exposes it as rec.stop; asking the
            # info mapping for 'END' yields the default instead of the value,
            # which made `end` collapse to `pos`. rec.stop is the 0-based
            # exclusive end, i.e. numerically the 1-based inclusive END.
            end = int(rec.stop)
            svlen = info.get('SVLEN')
            if isinstance(svlen, (tuple, list)):
                # Number=A / Number=. fields arrive as tuples; use the first
                # allele's value and tolerate an empty tuple.
                svlen = svlen[0] if svlen else None
            if svlen is None:
                svlen = end - pos
            svlen = abs(int(svlen))
            alt_support = _derive_alt_support(rec, alt_support_field)
            calls.append(SVCall(chrom=chrom, pos=pos, end=end, svtype=svtype,
                                svlen=svlen, total_alt_support=alt_support))
            coords.append({'chrom': chrom, 'pos': pos, 'end': end,
                           'svtype': svtype, 'svlen': svlen})
    finally:
        # Release the file handle even if a record fails to parse.
        vcf.close()

    feat_df = extract_batch(calls, fasta_path)
    # Explicit columns keep the schema stable when `coords` is empty.
    coord_df = pd.DataFrame(coords, columns=coord_cols)
    return pd.concat([coord_df.reset_index(drop=True),
                      feat_df.reset_index(drop=True)], axis=1)
def _derive_alt_support(rec, field: Optional[str]) -> float:
    """Pick an alt-read-support figure from the record's first sample.

    Lookup order:

    1. ``field``, when given and present on the sample;
    2. the sum of ``PR`` and ``SR``, when that sum is positive;
    3. ``AD``;
    4. otherwise 0.0.

    Each FORMAT value is reduced to a single number by `_last_int`. A record
    with no samples yields 0.0.
    """
    if not rec.samples:
        return 0.0

    # Only the first sample of the record is consulted.
    first = next(iter(rec.samples.values()))

    if field:
        requested = first.get(field)
        if requested is not None:
            return _last_int(requested)

    # Read both values up front, then add whichever ones are present.
    pair_and_split = [first.get(tag) for tag in ('PR', 'SR')]
    combined = 0.0
    for value in pair_and_split:
        if value is not None:
            combined += _last_int(value)
    if combined > 0:
        return combined

    depth = first.get('AD')
    return _last_int(depth) if depth is not None else 0.0
def _last_int(v) -> int:
"""Take the alt (last) value from an int/tuple. Used for Manta's PR=(ref,alt) tuples."""
if isinstance(v, (tuple, list)):
return int(v[-1]) if v else 0
try:
return int(v)
except (TypeError, ValueError):
return 0