File size: 5,821 Bytes
2ea06dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Feature engineering for the DPYD baseline classifier.

BASELINE SCOPE (by design): AF + CPIC/ClinVar categorical features ONLY.
No SIFT / PolyPhen / CADD / conservation — those require dbNSFP and were
explicitly dropped for this baseline. Any model trained on this matrix MUST be
labeled "baseline classifier, AF+categorical features".

Inputs:
  data/clinvar_dpyd.tsv       (ClinVar DPYD variants)
  data/gnomad_dpyd_sas.csv    (gnomAD SAS + global AF, per-variant)
  data/cpic_dpyd_function.csv (CPIC v2024.01 functional labels — ground truth)

Outputs:
  data/training_data.csv  (labeled rows: normal/decreased/no function)
  data/inference_set.csv  (VUS/unknown rows excluded from training)

Labels (3-class): normal_function / decreased_function / no_function
  - Primary truth: CPIC allele function table (joined on rsID).
  - Fallback: ClinVar CLNSIG mapping for variants absent from CPIC.
  - VUS / conflicting / unknown -> excluded from training, kept for inference.
"""
from __future__ import annotations
import pandas as pd

# --- Fallback labels: normalized ClinVar CLNSIG -> 3-class label ---
# Consulted only for variants that have no CPIC entry. The mapping is kept
# conservative: pathogenic terms become no_function, benign terms become
# normal_function, and nothing here ever yields decreased_function (that
# class comes from CPIC alone).
CLNSIG_MAP = {
    **dict.fromkeys(
        ("pathogenic", "likely_pathogenic", "pathogenic/likely_pathogenic"),
        "no_function",
    ),
    **dict.fromkeys(
        ("benign", "likely_benign", "benign/likely_benign"),
        "normal_function",
    ),
}

# A normalized CLNSIG containing any of these substrings gets no fallback
# label; such rows are kept for inference only.
EXCLUDE_CLNSIG_SUBSTR = (
    "uncertain",
    "conflicting",
    "not_provided",
    "other",
    "association",
)

# Consequence terms recognised by _consequence(); the list is scanned in
# order and the first term found wins.
MC_CONSEQUENCES = [
    "missense_variant",
    "synonymous_variant",
    "frameshift_variant",
    "stop_gained",
    "splice_donor_variant",
    "splice_acceptor_variant",
    "intron_variant",
    "5_prime_UTR_variant",
    "3_prime_UTR_variant",
]


def _norm_clnsig(s: str) -> str:
    return (s or "").strip().lower().replace(" ", "_")


def _clnsig_label(clnsig: str):
    """Map a raw ClinVar CLNSIG value to a fallback class label.

    Returns the ``CLNSIG_MAP`` entry for the normalized value, or ``None``
    when the value contains an excluded substring or has no mapping (such
    rows are used for inference only).
    """
    normalized = _norm_clnsig(clnsig)
    for marker in EXCLUDE_CLNSIG_SUBSTR:
        if marker in normalized:
            # Uncertain / conflicting / unspecified significance: no label.
            return None
    return CLNSIG_MAP.get(normalized)


def _consequence(mc: str) -> str:
    mc = (mc or "").lower()
    for cq in MC_CONSEQUENCES:
        if cq in mc:
            return cq
    return "other"


def build(clinvar="data/clinvar_dpyd.tsv",
          gnomad="data/gnomad_dpyd_sas.csv",
          cpic="data/cpic_dpyd_function.csv",
          train_out="data/training_data.csv",
          infer_out="data/inference_set.csv"):
    """Build the labeled training table and the unlabeled inference table.

    Reads the ClinVar TSV, the gnomAD CSV (optional: if the file is missing,
    AF columns stay empty and ``in_gnomad`` is 0) and the CPIC CSV, derives
    AF and categorical features, assigns labels (CPIC first, ClinVar CLNSIG
    as fallback) and writes both tables as CSV.

    Args:
        clinvar: Path to the tab-separated ClinVar table.
        gnomad: Path to the gnomAD CSV; may not exist.
        cpic: Path to the CPIC function CSV.
        train_out: Output path for rows with a valid 3-class label.
        infer_out: Output path for all remaining rows.

    Returns:
        ``(train, infer)`` DataFrames, identical to what was written.
    """
    gn_cols = ["gnomad_global_af", "gnomad_sas_af", "in_gnomad"]

    # Everything is read as text; missing cells become "" so string ops work.
    cv = pd.read_csv(clinvar, sep="\t", dtype=str).fillna("")
    if _exists(gnomad):
        gn = pd.read_csv(gnomad, dtype=str).fillna("")
    else:
        gn = pd.DataFrame(columns=["rsid"] + gn_cols)
    cp = pd.read_csv(cpic, dtype=str).fillna("")

    # explode multi-allele ClinVar rows so each (pos,ref,alt) is one row
    cv = cv.assign(alt=cv["alt"].str.split(",")).explode("alt")
    # Chromosome prefix is hard-coded to "1".
    cv["variant_id"] = "1-" + cv["pos"] + "-" + cv["ref"] + "-" + cv["alt"]

    # --- join gnomAD AF on rsid (primary) then variant_id (fallback) ---
    # Blank rsIDs must be removed from the lookup side: after fillna("") a
    # missing rsID is "", and "" == "" would otherwise attach one arbitrary
    # gnomAD row to every ClinVar row that lacks an rsID.
    # When several gnomAD rows share an rsID, the first one is kept.
    gn_by_rsid = gn[["rsid"] + gn_cols]
    gn_by_rsid = gn_by_rsid[gn_by_rsid["rsid"].ne("")].drop_duplicates("rsid")
    df = cv.merge(gn_by_rsid, on="rsid", how="left")

    # Fallback for rows the rsID join left empty. Only possible when the
    # gnomAD table carries a variant_id column.
    # NOTE(review): assumes gnomAD variant_id uses the same
    # "1-pos-ref-alt" format built above -- confirm against the export.
    if "variant_id" in gn.columns:
        gn_by_vid = gn[["variant_id"] + gn_cols]
        gn_by_vid = (gn_by_vid[gn_by_vid["variant_id"].ne("")]
                     .drop_duplicates("variant_id")
                     .set_index("variant_id"))
        # Matched rows hold strings (possibly "") in every gnomAD column, so
        # all-NaN identifies rows that found no rsID match.
        unmatched = df[gn_cols].isna().all(axis=1)
        for col in gn_cols:
            df.loc[unmatched, col] = df.loc[unmatched, "variant_id"].map(gn_by_vid[col])

    for col in ("gnomad_global_af", "gnomad_sas_af"):
        df[col] = pd.to_numeric(df[col], errors="coerce")
    df["in_gnomad"] = pd.to_numeric(df["in_gnomad"], errors="coerce").fillna(0).astype(int)

    # --- categorical features ---
    df["consequence"] = df["mc"].map(_consequence)
    df["clnsig_norm"] = df["clnsig"].map(_norm_clnsig)
    df["is_indel"] = ((df["ref"].str.len() != 1) | (df["alt"].str.len() != 1)).astype(int)
    # AF-derived numeric features (log10 with pseudocount; rarity is signal)
    for col in ("gnomad_global_af", "gnomad_sas_af"):
        df[f"log10_{col}"] = _log10_af(df[col])
    # 1 when SAS AF is positive and strictly above the global AF (missing = 0).
    df["sas_enriched"] = ((df["gnomad_sas_af"].fillna(0) > df["gnomad_global_af"].fillna(0))
                          & (df["gnomad_sas_af"].fillna(0) > 0)).astype(int)

    # --- labels ---
    # Same blank-key guard as for gnomAD: a CPIC row without an rsID must not
    # label every ClinVar row that also lacks one.
    cpic_lbl = cp[["rsid", "label_class", "cpic_function"]]
    cpic_lbl = cpic_lbl[cpic_lbl["rsid"].ne("")].drop_duplicates("rsid")
    df = df.merge(cpic_lbl, on="rsid", how="left")
    df["label_source"] = ""
    df.loc[df["label_class"].notna(), "label_source"] = "CPIC"

    # fallback to ClinVar where CPIC is silent
    need = df["label_class"].isna()
    df.loc[need, "label_class"] = df.loc[need, "clnsig"].map(_clnsig_label)
    df.loc[need & df["label_class"].notna(), "label_source"] = "ClinVar"

    # Only rows carrying one of the three classes are trainable; everything
    # else (no label, or any other label text) goes to the inference set.
    valid = {"normal_function", "decreased_function", "no_function"}
    is_train = df["label_class"].isin(valid)

    feature_cols = ["chrom", "pos", "ref", "alt", "rsid", "variant_id",
                    "gnomad_global_af", "gnomad_sas_af", "log10_gnomad_global_af",
                    "log10_gnomad_sas_af", "in_gnomad", "sas_enriched",
                    "consequence", "clnsig_norm", "is_indel",
                    "label_class", "label_source", "cpic_function"]
    # Tolerate inputs that lack optional columns (e.g. "chrom").
    feature_cols = [c for c in feature_cols if c in df.columns]

    train = df[is_train][feature_cols].copy()
    infer = df[~is_train][feature_cols].copy()
    train.to_csv(train_out, index=False)
    infer.to_csv(infer_out, index=False)
    print(f"training rows: {len(train)}  (by source: "
          f"{train['label_source'].value_counts().to_dict()})")
    print(f"  class balance: {train['label_class'].value_counts().to_dict()}")
    print(f"inference rows (VUS/unknown/no-label): {len(infer)}")
    print(f"wrote {train_out}, {infer_out}")
    return train, infer


def _log10_af(s):
    import numpy as np
    v = pd.to_numeric(s, errors="coerce").fillna(0.0).astype(float)
    return (v.clip(lower=0) + 1e-7).map(lambda x: float(np.log10(x)))


def _exists(p):
    import os
    return os.path.exists(p)


# Script entry point: run the full build with the default data/ paths.
if __name__ == "__main__":
    build()