"""Feature engineering for the DPYD baseline classifier.

BASELINE SCOPE (by design): AF + CPIC/ClinVar categorical features ONLY.
No SIFT / PolyPhen / CADD / conservation — those require dbNSFP and were
explicitly dropped for this baseline. Any model trained on this matrix MUST be
labeled "baseline classifier, AF+categorical features".

Inputs:
    data/clinvar_dpyd.tsv        (ClinVar DPYD variants)
    data/gnomad_dpyd_sas.csv     (gnomAD SAS + global AF, per-variant)
    data/cpic_dpyd_function.csv  (CPIC v2024.01 functional labels — ground truth)

Outputs:
    data/training_data.csv  (labeled rows: normal/decreased/no function)
    data/inference_set.csv  (VUS/unknown rows excluded from training)

Labels (3-class): normal_function / decreased_function / no_function
    - Primary truth: CPIC allele function table (joined on rsID).
    - Fallback: ClinVar CLNSIG mapping for variants absent from CPIC.
    - VUS / conflicting / unknown -> excluded from training, kept for inference.
"""
| from __future__ import annotations |
| import pandas as pd |
|
|
| |
| |
| |
# Normalized ClinVar significance string -> function class, used as the
# fallback label source. Keys are matched exactly against the normalized
# (stripped, lower-cased, underscore-joined) CLNSIG value.
CLNSIG_MAP = {
    **dict.fromkeys(
        ("pathogenic", "likely_pathogenic", "pathogenic/likely_pathogenic"),
        "no_function",
    ),
    **dict.fromkeys(
        ("benign", "likely_benign", "benign/likely_benign"),
        "normal_function",
    ),
}

# Any normalized CLNSIG value containing one of these substrings yields no label.
EXCLUDE_CLNSIG_SUBSTR = (
    "uncertain",
    "conflicting",
    "not_provided",
    "other",
    "association",
)
|
|
# Consequence terms searched for in the ClinVar `mc` field. Order matters:
# `_consequence` returns the first entry of this list found in the field.
MC_CONSEQUENCES = [
    "missense_variant",
    "synonymous_variant",
    "frameshift_variant",
    "stop_gained",
    "splice_donor_variant",
    "splice_acceptor_variant",
    "intron_variant",
    "5_prime_UTR_variant",
    "3_prime_UTR_variant",
]
|
|
|
|
| def _norm_clnsig(s: str) -> str: |
| return (s or "").strip().lower().replace(" ", "_") |
|
|
|
|
def _clnsig_label(clnsig: str):
    """Map a raw ClinVar significance value to a function class, or None.

    The value is normalized with `_norm_clnsig`. If it contains any of the
    `EXCLUDE_CLNSIG_SUBSTR` markers the result is None; otherwise the result
    is the exact-match entry in `CLNSIG_MAP`, or None when there is no entry.
    """
    normalized = _norm_clnsig(clnsig)
    for marker in EXCLUDE_CLNSIG_SUBSTR:
        if marker in normalized:
            return None
    return CLNSIG_MAP.get(normalized)
|
|
|
|
def _consequence(mc: str) -> str:
    """Reduce a ClinVar `mc` field to a single consequence category.

    Returns the first entry of `MC_CONSEQUENCES` (in list order) that occurs
    as a substring of `mc`, or ``"other"`` when none does or `mc` is
    empty/None. The returned value is the entry exactly as spelled in
    `MC_CONSEQUENCES`.

    Matching is case-insensitive on both sides. The field is lower-cased
    before searching, so each term must be lower-cased too; otherwise
    mixed-case terms such as ``"5_prime_UTR_variant"`` can never match and
    would always fall through to ``"other"``.
    """
    haystack = (mc or "").lower()
    for cq in MC_CONSEQUENCES:
        if cq.lower() in haystack:
            return cq
    return "other"
|
|
|
|
def _rsid_lookup(table, columns):
    """Return `columns` of `table` with one row per non-blank rsID.

    Rows whose `rsid` is empty or whitespace-only are dropped. After
    ``fillna("")`` every variant without an rsID carries the same key
    ``""``, and a merge on that key would attach one arbitrary row's
    annotations to every rsID-less variant on the other side.
    """
    has_rsid = table["rsid"].astype(str).str.strip().ne("")
    return table.loc[has_rsid, columns].drop_duplicates("rsid")


def build(clinvar="data/clinvar_dpyd.tsv",
          gnomad="data/gnomad_dpyd_sas.csv",
          cpic="data/cpic_dpyd_function.csv",
          train_out="data/training_data.csv",
          infer_out="data/inference_set.csv"):
    """Build the training and inference feature tables and write them as CSV.

    Args:
        clinvar: Tab-separated ClinVar table. Columns read here: `rsid`,
            `pos`, `ref`, `alt` (comma-separated alleles), `mc`, `clnsig`.
        gnomad: CSV with `rsid`, `gnomad_global_af`, `gnomad_sas_af`,
            `in_gnomad`. If the path does not exist, an empty table is used
            and every variant gets missing AF and ``in_gnomad == 0``.
        cpic: CSV with `rsid`, `label_class`, `cpic_function`.
        train_out: Output path for rows whose label is one of
            normal_function / decreased_function / no_function.
        infer_out: Output path for all remaining rows.

    Returns:
        ``(train, infer)`` — the two DataFrames that were written.

    Labels come from CPIC when the rsID is present there; otherwise from the
    ClinVar significance via `_clnsig_label`. A summary is printed to stdout.
    """
    cv = pd.read_csv(clinvar, sep="\t", dtype=str).fillna("")
    if _exists(gnomad):
        gn = pd.read_csv(gnomad, dtype=str).fillna("")
    else:
        gn = pd.DataFrame(
            columns=["rsid", "gnomad_global_af", "gnomad_sas_af", "in_gnomad"])
    cp = pd.read_csv(cpic, dtype=str).fillna("")

    # One row per ALT allele; reset the index so it is unique after explode.
    cv = cv.assign(alt=cv["alt"].str.split(",")).explode("alt").reset_index(drop=True)
    cv["variant_id"] = "1-" + cv["pos"] + "-" + cv["ref"] + "-" + cv["alt"]

    # Attach allele frequencies by rsID. Blank rsIDs are excluded from the
    # lookup so rsID-less ClinVar rows stay unannotated instead of all
    # inheriting one unrelated gnomAD row.
    gn_af = _rsid_lookup(
        gn, ["rsid", "gnomad_global_af", "gnomad_sas_af", "in_gnomad"])
    df = cv.merge(gn_af, on="rsid", how="left")

    af_cols = ("gnomad_global_af", "gnomad_sas_af")
    for col in af_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    # NOTE(review): non-numeric encodings of in_gnomad (e.g. "True"/"False")
    # coerce to NaN and therefore to 0 — confirm the gnomAD export uses 0/1.
    df["in_gnomad"] = pd.to_numeric(df["in_gnomad"], errors="coerce").fillna(0).astype(int)

    # Categorical / structural features.
    df["consequence"] = df["mc"].map(_consequence)
    df["clnsig_norm"] = df["clnsig"].map(_norm_clnsig)
    df["is_indel"] = ((df["ref"].str.len() != 1) | (df["alt"].str.len() != 1)).astype(int)

    # Frequency-derived features; missing AF is treated as 0 for both.
    for col in af_cols:
        df[f"log10_{col}"] = _log10_af(df[col])
    sas_af = df["gnomad_sas_af"].fillna(0)
    global_af = df["gnomad_global_af"].fillna(0)
    df["sas_enriched"] = ((sas_af > global_af) & (sas_af > 0)).astype(int)

    # Primary labels: CPIC, joined on rsID (blank rsIDs excluded, as above,
    # so a CPIC row without an rsID cannot label unrelated variants).
    cpic_lbl = _rsid_lookup(cp, ["rsid", "label_class", "cpic_function"])
    df = df.merge(cpic_lbl, on="rsid", how="left")
    df["label_source"] = ""
    df.loc[df["label_class"].notna(), "label_source"] = "CPIC"

    # Fallback labels: ClinVar significance, only for rows CPIC did not match.
    need = df["label_class"].isna()
    df.loc[need, "label_class"] = df.loc[need, "clnsig"].map(_clnsig_label)
    df.loc[need & df["label_class"].notna(), "label_source"] = "ClinVar"

    valid = {"normal_function", "decreased_function", "no_function"}
    is_train = df["label_class"].isin(valid)

    feature_cols = ["chrom", "pos", "ref", "alt", "rsid", "variant_id",
                    "gnomad_global_af", "gnomad_sas_af", "log10_gnomad_global_af",
                    "log10_gnomad_sas_af", "in_gnomad", "sas_enriched",
                    "consequence", "clnsig_norm", "is_indel",
                    "label_class", "label_source", "cpic_function"]
    # Keep only columns that exist (e.g. `chrom` is taken from the ClinVar
    # table and is skipped when that table has no such column).
    feature_cols = [c for c in feature_cols if c in df.columns]

    train = df[is_train][feature_cols].copy()
    infer = df[~is_train][feature_cols].copy()
    train.to_csv(train_out, index=False)
    infer.to_csv(infer_out, index=False)
    print(f"training rows: {len(train)} (by source: "
          f"{train['label_source'].value_counts().to_dict()})")
    print(f" class balance: {train['label_class'].value_counts().to_dict()}")
    print(f"inference rows (VUS/unknown/no-label): {len(infer)}")
    print(f"wrote {train_out}, {infer_out}")
    return train, infer
|
|
|
|
| def _log10_af(s): |
| import numpy as np |
| v = pd.to_numeric(s, errors="coerce").fillna(0.0).astype(float) |
| return (v.clip(lower=0) + 1e-7).map(lambda x: float(np.log10(x))) |
|
|
|
|
| def _exists(p): |
| import os |
| return os.path.exists(p) |
|
|
|
|
# Script entry point: run the full build with the default input/output paths.
if __name__ == "__main__":
    build()
|
|