"""Fetch gnomAD v4 SAS + global allele frequencies for DPYD variants.

Strategy: the gnomAD GraphQL *region* query over the full DPYD gene span
(~1.3 Mb) returns HTTP 502 (the gateway times out). The *per-variant* query by
variant_id (`{chrom}-{pos}-{ref}-{alt}`, GRCh38) is reliable, so we query the
union of variant_ids derived from the ClinVar DPYD pull.

This is the full-run version of the scaffolding probe: by default it pulls
*all* ClinVar DPYD variant_ids (no cap); pass `--cap N` to limit the run to
the first N variant_ids.

Run: python -m src.fetch_gnomad --clinvar data/clinvar_dpyd.tsv --out data/gnomad_dpyd_sas.csv
"""
from __future__ import annotations

import argparse
import csv
import sys
import time

import requests
|
|
# gnomAD GraphQL endpoint; every query is POSTed here as a JSON document.
API = "https://gnomad.broadinstitute.org/api"
# Sent with every request: JSON body plus a User-Agent identifying this client.
HEADERS = {"Content-Type": "application/json", "User-Agent": "anukriti-research/1.0"}

# Per-variant lookup against the gnomAD v4 dataset (`gnomad_r4`). For both the
# genome and exome sections it requests the overall allele frequency (`af`) and
# the per-population allele count / allele number (`ac` / `an`), from which a
# population-specific frequency can be derived.
VARIANT_QUERY = """
query($vid:String!){
  variant(variantId:$vid, dataset: gnomad_r4){
    variant_id rsid
    genome{ af populations{ id ac an } }
    exome{ af populations{ id ac an } }
  }
}"""
|
|
|
|
def _pop_af(block: dict | None, pop: str) -> float | None:
    """Return the allele frequency (AC / AN) of population *pop* in *block*.

    *block* is one sequencing-type section of a gnomAD variant response (the
    ``genome`` or ``exome`` object requested by ``VARIANT_QUERY``), carrying a
    ``populations`` list of ``{"id", "ac", "an"}`` entries.

    Returns ``None`` when *block* is missing/empty, when *pop* is not listed,
    or when the matching entry has no usable counts (AN missing or zero, or
    AC missing), so the caller can fall back to another data source.
    """
    if not block:
        return None
    for entry in block.get("populations") or []:
        # Tolerate null or id-less entries instead of raising on them.
        if not entry or entry.get("id") != pop:
            continue
        ac, an = entry.get("ac"), entry.get("an")
        # AN of 0/None means nobody was genotyped here, so AF is undefined;
        # a null AC likewise cannot be turned into a frequency.
        if not an or ac is None:
            return None
        return ac / an
    return None
|
|
|
|
def query_variant(vid: str, retries: int = 3, backoff: float = 2.0):
    """Fetch one variant from the gnomAD GraphQL API.

    Args:
        vid: gnomAD variant id, ``{chrom}-{pos}-{ref}-{alt}``.
        retries: maximum number of HTTP attempts.
        backoff: seconds to wait between failed attempts.

    Returns:
        The ``variant`` dict from the response; ``None`` when the API answered
        HTTP 200 without a variant object (treated as "absent from gnomAD");
        or the string ``"ERR"`` when every attempt failed (non-200 status,
        network error, or a body that is not a JSON object).
    """
    for attempt in range(retries):
        try:
            r = requests.post(API, json={"query": VARIANT_QUERY, "variables": {"vid": vid}},
                              headers=HEADERS, timeout=40)
            if r.status_code == 200:
                payload = r.json()
                if isinstance(payload, dict):
                    # `data` may be present but null (GraphQL error responses),
                    # so `.get("data", {})` is not enough to avoid a None here.
                    return (payload.get("data") or {}).get("variant")
        except (requests.RequestException, ValueError):
            # ValueError covers a non-JSON body on requests versions whose
            # decode error is not a RequestException. Fall through and retry.
            pass
        # No point waiting after the final attempt.
        if attempt < retries - 1:
            time.sleep(backoff)
    return "ERR"
|
|
|
|
def candidate_variant_ids(clinvar_tsv: str) -> list[tuple[str, str]]:
    """Build the de-duplicated list of gnomAD variant ids from a ClinVar TSV.

    The TSV must have ``pos``, ``ref``, ``alt`` and ``rsid`` columns (a missing
    column raises ``KeyError``). ``alt`` may hold several comma-separated
    alleles; each yields its own id of the form ``1-{pos}-{ref}-{alt}``
    (chromosome 1 is hard-coded).

    Rows are skipped when ``pos`` or ``ref`` is empty, alleles are skipped
    when empty, and any ref/alt longer than 50 characters is skipped. Short
    rows (missing trailing cells) are tolerated rather than raising.

    Returns:
        ``(variant_id, rsid)`` pairs in first-seen order; for a repeated
        variant id the rsid of its first occurrence is kept.
    """
    max_allele_len = 50
    seen: set[str] = set()
    out: list[tuple[str, str]] = []
    # newline="" is what the csv module expects; utf-8-sig reads plain UTF-8
    # and also drops a leading BOM so the first header name is not corrupted.
    with open(clinvar_tsv, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f, delimiter="\t"):
            # DictReader fills cells missing from short rows with None.
            pos = (row["pos"] or "").strip()
            ref = (row["ref"] or "").strip()
            rsid = (row["rsid"] or "").strip()
            alts = row["alt"] or ""
            if not pos or not ref or len(ref) > max_allele_len:
                continue
            for alt in alts.split(","):
                alt = alt.strip()
                if not alt or len(alt) > max_allele_len:
                    continue
                vid = f"1-{pos}-{ref}-{alt}"
                if vid not in seen:
                    seen.add(vid)
                    out.append((vid, rsid))
    return out
|
|
|
|
def main(argv: list[str] | None = None) -> None:
    """Pull gnomAD frequencies for the ClinVar-derived DPYD variants into a CSV.

    Args:
        argv: command-line arguments without the program name. ``None`` (the
            default) lets argparse read them from ``sys.argv``.

    One CSV row is written per variant gnomAD answered for: ``in_gnomad`` is 1
    with whatever frequencies were available, or 0 with blank frequencies when
    gnomAD returned no variant. Genome data is preferred and exome data is the
    fallback, independently for the global AF and the SAS AF. Variants whose
    lookup failed after all retries are left out of the CSV; they are counted
    in the summary line and their ids are listed on stderr for a retry.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--clinvar", default="data/clinvar_dpyd.tsv")
    ap.add_argument("--out", default="data/gnomad_dpyd_sas.csv")
    ap.add_argument("--cap", type=int, default=0, help="0 = no cap (full pull)")
    ap.add_argument("--sleep", type=float, default=0.15)
    args = ap.parse_args(argv)

    cands = candidate_variant_ids(args.clinvar)
    # Only a positive cap truncates; a negative value would otherwise slice
    # from the end and silently drop the last variants.
    if args.cap > 0:
        cands = cands[: args.cap]
    print(f"querying {len(cands)} gnomAD variants...")

    cols = ["variant_id", "rsid", "chrom", "pos", "ref", "alt",
            "gnomad_global_af", "gnomad_sas_af", "in_gnomad", "source_dataset"]
    found = absent = 0
    failed: list[str] = []
    with open(args.out, "w", newline="", encoding="utf-8") as fh:
        w = csv.DictWriter(fh, fieldnames=cols)
        w.writeheader()
        for vid, rsid in cands:
            # maxsplit=3 keeps any "-" inside the alt allele intact.
            chrom, pos, ref, alt = vid.split("-", 3)
            v = query_variant(vid)
            if v == "ERR":
                failed.append(vid)
                continue
            row = {"variant_id": vid, "rsid": rsid, "chrom": chrom, "pos": pos,
                   "ref": ref, "alt": alt, "source_dataset": "gnomad_r4"}
            if v is None:
                absent += 1
                row.update(gnomad_global_af="", gnomad_sas_af="", in_gnomad=0)
            else:
                g, e = v.get("genome"), v.get("exome")
                gaf = (g or {}).get("af")
                if gaf is None and e:
                    gaf = e.get("af")
                sas = _pop_af(g, "sas")
                if sas is None:
                    sas = _pop_af(e, "sas")
                # Prefer gnomAD's rsid; keep the ClinVar one when it has none.
                row.update(rsid=v.get("rsid") or rsid,
                           gnomad_global_af=gaf if gaf is not None else "",
                           gnomad_sas_af=sas if sas is not None else "",
                           in_gnomad=1)
                found += 1
            w.writerow(row)
            # Throttle between successful requests to stay polite to the API.
            time.sleep(args.sleep)
    print(f"done: in_gnomad={found} not_in_gnomad={absent} errors={len(failed)} -> {args.out}")
    if failed:
        # Surface the dropped ids so a follow-up run can retry just these.
        print("failed variant ids: " + " ".join(failed), file=sys.stderr)
|
|
|
|
# Run the pull only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|