File size: 7,002 Bytes
90d0b4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
"""Feature extractor for SV-SPR confidence model.

Caller-agnostic feature set (12 columns; the svtype one-hot contributes 4):
  1. svlen_abs            length, universal
  2. log10_svlen          log-scaled length
  3. svtype_DEL/INS/DUP/BND   one-hot type
  4. total_alt_support    generic alt read support
  5. gc_flank_w100        GC fraction in ±100bp window around breakpoint
  6. at_flank_w100        AT fraction in flank
  7. gc_inner_w100        GC fraction inside the called region (DEL/DUP)
  8. n_motif_2_w100       tandem dinuc count in flank
  9. n_motif_3_w100       tandem trinuc count in flank

Inputs required from caller (VCF):
  - chrom, pos, end (or svlen for INS)
  - svtype: DEL / INS / DUP / BND
  - total_alt_support: generic, derived from any caller's AD/PR/SR field
    If absent, set to 0 (the model can still score on the remaining features
    but performance may drop ~0.014 F1 — see T01d analysis).
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, Mapping, Optional

import numpy as np
import pandas as pd
import pysam


# Number of reference bases fetched on each side of the call (left of `pos`,
# right of `end`). It is the default `win` of _seq_features, which embeds the
# value in its output keys as a `_w{win}` suffix.
WINDOW = 100  # ±100 bp flank
# Column order of the feature matrix: extract_batch reindexes its DataFrame to
# exactly this list. The `_w100` names line up with the keys produced by
# _seq_features only while it is called with win == 100.
FEATURE_COLS = [
    'svlen_abs', 'log10_svlen',
    'svtype_DEL', 'svtype_INS', 'svtype_DUP', 'svtype_BND',
    'total_alt_support',
    'gc_flank_w100', 'at_flank_w100', 'gc_inner_w100',
    'n_motif_2_w100', 'n_motif_3_w100',
]


@dataclass
class SVCall:
    """A single structural-variant call, as consumed by the feature extractor.

    Attributes
    ----------
    chrom : str
        Contig name, used as-is for reference lookups.
    pos : int
        1-based start coordinate (VCF convention).
    end : int
        1-based end coordinate; equal to ``pos`` for INS/BND, and
        ``pos + svlen`` for DEL/DUP.
    svtype : str
        One of "DEL", "INS", "DUP", "BND".
    svlen : int
        Absolute length of the variant.
    total_alt_support : float
        Alt-supporting read count from whichever caller produced the call;
        leave at the default 0 when the caller does not report it.
    """
    chrom: str
    pos: int
    end: int
    svtype: str
    svlen: int
    total_alt_support: float = 0.0


def _seq_features(fa: pysam.FastaFile, chrom: str, pos: int, end: int,
                  win: int = WINDOW) -> dict:
    """Derive the five sequence-context features around one call.

    ``pos`` and ``end`` are handed to ``fa.fetch`` unchanged: the upstream
    flank is ``[max(0, pos - win), pos)``, the downstream flank is
    ``[end, end + win)`` and the called region is ``[pos, end)``.

    Returns a dict keyed ``gc_flank_w{win}``, ``at_flank_w{win}``,
    ``gc_inner_w{win}``, ``n_motif_2_w{win}`` and ``n_motif_3_w{win}``.
    All five values are NaN when a fetch raises ValueError or KeyError.
    """
    keys = (f'gc_flank_w{win}', f'at_flank_w{win}', f'gc_inner_w{win}',
            f'n_motif_2_w{win}', f'n_motif_3_w{win}')
    try:
        upstream = fa.fetch(chrom, max(0, pos - win), pos).upper()
        downstream = fa.fetch(chrom, end, end + win).upper()
        # The called region is only read for spans of 1..4999 bp.
        span = end - pos
        body = fa.fetch(chrom, pos, end).upper() if 0 < span < 5000 else ''
    except (ValueError, KeyError):
        return dict.fromkeys(keys, np.nan)

    # Both flanks are analysed as one concatenated string.
    context = upstream + downstream
    size = len(context)
    denom = max(size, 1)  # avoid dividing by zero on an empty flank
    gc_flank = (context.count('G') + context.count('C')) / denom
    at_flank = (context.count('A') + context.count('T')) / denom

    # Without an inner sequence, the flank GC stands in for the inner GC.
    if body:
        gc_inner = (body.count('G') + body.count('C')) / max(len(body), 1)
    else:
        gc_inner = gc_flank

    # Count start offsets where a 2-mer (resp. 3-mer) repeats three times in a
    # row; the ranges keep every compared slice fully inside `context`.
    dinuc_hits = sum(
        context[k:k + 2] == context[k + 2:k + 4] == context[k + 4:k + 6]
        for k in range(size - 5)
    )
    trinuc_hits = sum(
        context[k:k + 3] == context[k + 3:k + 6] == context[k + 6:k + 9]
        for k in range(size - 8)
    )

    values = (gc_flank, at_flank, gc_inner, dinuc_hits, trinuc_hits)
    return dict(zip(keys, values))


def extract_one(call: SVCall, fa: pysam.FastaFile) -> dict:
    """Build the feature dict (12 columns) for a single SV call.

    Length and type features come from ``call`` itself; the sequence-context
    features come from ``_seq_features`` using the open reference ``fa``.
    A falsy ``svlen`` counts as 0 and a falsy ``svtype`` is treated as "BND".
    """
    length = abs(int(call.svlen)) if call.svlen else 0
    kind = call.svtype.upper() if call.svtype else 'BND'

    features = {
        'svlen_abs': length,
        'log10_svlen': np.log10(length + 1),  # +1 keeps length 0 finite
    }
    # One-hot encoding of the SV type; an unrecognised type leaves all four at 0.
    one_hot = (
        ('svtype_DEL', 'DEL'),
        ('svtype_INS', 'INS'),
        ('svtype_DUP', 'DUP'),
        ('svtype_BND', 'BND'),
    )
    for column, label in one_hot:
        features[column] = int(kind == label)
    features['total_alt_support'] = float(call.total_alt_support or 0.0)

    context = _seq_features(fa, call.chrom, int(call.pos), int(call.end))
    features.update(context)
    return features


def extract_batch(calls: Iterable[SVCall], fasta_path: str) -> pd.DataFrame:
    """Featurize many SV calls against one reference FASTA.

    The FASTA at ``fasta_path`` is opened once and closed again even if a call
    fails. The result has one row per call, columns in FEATURE_COLS order, and
    every missing or NaN value replaced by 0.
    """
    with pysam.FastaFile(fasta_path) as reference:
        records = [extract_one(sv, reference) for sv in calls]
    table = pd.DataFrame(records).reindex(columns=FEATURE_COLS)
    return table.fillna(0)


def from_vcf(vcf_path: str, fasta_path: str,
             alt_support_field: Optional[str] = None) -> pd.DataFrame:
    """Parse a VCF file and produce a feature matrix.

    Records without an INFO/SVTYPE value are skipped. Each remaining record
    becomes one row holding its coordinates (``chrom``, ``pos``, ``end``,
    ``svtype``, ``svlen``) followed by the FEATURE_COLS columns computed by
    ``extract_batch``.

    Parameters
    ----------
    vcf_path : str
        Path of the VCF to read.
    fasta_path : str
        Path of the reference FASTA used for the sequence-context features.
    alt_support_field : Optional[str]
        Format field name to derive total_alt_support. Falls back to PR/SR for
        Manta, AD for Delly, or 0 if not found. Common values: "PR", "SR", "AD".

    Returns
    -------
    pd.DataFrame
        Coordinate columns concatenated with the feature columns, one row per
        SV record.

    Raises
    ------
    IOError
        If the VCF cannot be opened; the underlying error is chained as the
        cause.
    """
    try:
        vcf = pysam.VariantFile(vcf_path)
    except Exception as e:
        raise IOError(f'Cannot open VCF {vcf_path}: {e}') from e

    calls = []
    coords = []
    # try/finally so the VCF handle is released even when a record fails to
    # parse part-way through the file.
    try:
        for rec in vcf.fetch():
            info = rec.info
            svtype = (info.get('SVTYPE') or '').upper() if info else ''
            if not svtype:
                continue
            chrom = rec.chrom
            pos = int(rec.pos)
            # pysam reserves INFO/END: rec.info.get('END') never returns the
            # value, so the end coordinate has to be read from rec.stop.
            stop = rec.stop
            end = int(stop) if stop is not None else pos
            svlen = info.get('SVLEN')
            if isinstance(svlen, tuple):
                svlen = svlen[0] if svlen else None
            if svlen is None:
                svlen = end - pos
            svlen = abs(int(svlen))

            alt_support = _derive_alt_support(rec, alt_support_field)
            calls.append(SVCall(chrom=chrom, pos=pos, end=end, svtype=svtype,
                                svlen=svlen, total_alt_support=alt_support))
            coords.append({'chrom': chrom, 'pos': pos, 'end': end,
                           'svtype': svtype, 'svlen': svlen})
    finally:
        vcf.close()

    feat_df = extract_batch(calls, fasta_path)
    # Explicit columns keep the output schema stable when no record qualified.
    coord_df = pd.DataFrame(
        coords, columns=['chrom', 'pos', 'end', 'svtype', 'svlen'])
    return pd.concat([coord_df.reset_index(drop=True),
                      feat_df.reset_index(drop=True)], axis=1)


def _derive_alt_support(rec, field: Optional[str]) -> float:
    """Pick an alt-support count from the record's first sample.

    Lookup order: the explicitly requested ``field`` (if given and present),
    then the sum of PR and SR when that sum is positive, then AD. Returns 0.0
    when the record has no samples or none of the fields yields a value.
    """
    if not rec.samples:
        return 0.0
    first_sample = next(iter(rec.samples.values()))

    if field:
        requested = first_sample.get(field)
        if requested is not None:
            return _last_int(requested)

    # Manta-style evidence: paired-read plus split-read alt counts.
    evidence = 0.0
    for tag in ('PR', 'SR'):
        value = first_sample.get(tag)
        if value is not None:
            evidence += _last_int(value)
    if evidence > 0:
        return evidence

    depth = first_sample.get('AD')
    return 0.0 if depth is None else _last_int(depth)


def _last_int(v) -> int:
    """Take the alt (last) value from an int/tuple. Used for Manta's PR=(ref,alt) tuples."""
    if isinstance(v, (tuple, list)):
        return int(v[-1]) if v else 0
    try:
        return int(v)
    except (TypeError, ValueError):
        return 0