Spaces:
Sleeping
Sleeping
| """ | |
| vep_handler.py — PeVe v1.4 | |
| =========================== | |
| Fixes: | |
| 1. Genome build normalisation (GRCh38 enforced, "chr" prefix stripped) | |
| 2. Correct Ensembl REST endpoint and headers | |
| 3. Full HTTP debug logging to Space stdout | |
| 4. Retry with back-off | |
| 5. annotation_available flag surfaced in every return value | |
| 6. AF lookup independent of VEP success | |
| 7. Test block: python vep_handler.py | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import time | |
| import traceback | |
| import urllib.error | |
| import urllib.request | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
# ── Constants ─────────────────────────────────────────────────────────────────
# Base URL that fetch_vep() prefixes to its VEP request path.
_ENSEMBL_REST = "https://rest.ensembl.org"
# GraphQL endpoint that the allele-frequency lookups POST to.
_GNOMAD_API = "https://gnomad.broadinstitute.org/api"
# Genome build label; in this module it is only echoed in fetch_vep()'s log line.
_GENOME_BUILD = "GRCh38"  # enforced throughout
# Passed as the `timeout` argument of every urllib.request.urlopen() call.
_REQUEST_TIMEOUT = 20  # seconds
# Upper bound on the number of GET attempts made by _http_get().
_MAX_RETRIES = 3
# Initial sleep between _http_get() attempts.
_RETRY_DELAY = 2  # seconds, doubled each retry
| # ── Return types ────────────────────────────────────────────────────────────── | |
@dataclass
class VEPResult:
    """Result of one VEP annotation lookup.

    Every field has a default, so a failed lookup can be expressed by
    passing only ``annotation_available=False`` and ``error_message``.
    The ``@dataclass`` decorator is required: the class is constructed with
    keyword arguments and relies on ``field(default_factory=...)``.
    """

    # First consequence term of the selected consequence record.
    consequence: str = "unknown"
    # Impact label copied from the selected consequence record.
    impact: str = "MODIFIER"
    # Gene symbol of the selected consequence record ("" when absent).
    gene: str = ""
    # Transcript identifier of the selected consequence record ("" when absent).
    transcript: str = ""
    # First consequence term of every transcript consequence record.
    # default_factory gives each instance its own list.
    all_consequences: list = field(default_factory=lambda: ["unknown"])
    # True only when a usable annotation entry was parsed.
    annotation_available: bool = False
    # Human-readable failure description; None on success.
    error_message: Optional[str] = None
    # The parsed response entry the fields were extracted from.
    raw_response: Optional[dict] = None
@dataclass
class AFResult:
    """Result of one allele-frequency lookup.

    Every field has a default, so callers can build partial results with
    keyword arguments. The ``@dataclass`` decorator is required: the class is
    constructed with keyword arguments and relies on
    ``field(default_factory=...)``.
    """

    # One of "AF_UNKNOWN", "AF_RARE" or "AF_COMMON" as set by the fetchers.
    state: str = "AF_UNKNOWN"
    # Global allele frequency; None when no value was obtained.
    global_af: Optional[float] = None
    # Population id -> allele frequency. default_factory gives each instance
    # its own dict.
    population_afs: dict = field(default_factory=dict)
    # Whether the frequency used for classification is below the rare
    # threshold; None when no frequency was available.
    is_rare: Optional[bool] = None
    # Set by the fetcher when a founder population is strongly enriched.
    founder_variant_flag: bool = False
    # True when the lookup produced a usable response.
    annotation_available: bool = False
    # Human-readable failure description; None on success.
    error_message: Optional[str] = None
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # Helpers | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
| def _normalise_chrom(chrom: str) -> str: | |
| """Strip 'chr' prefix, uppercase. '17' and 'chr17' → '17'.""" | |
| return str(chrom).strip().upper().lstrip("CHR") | |
def _http_get(url: str, label: str) -> Optional[dict | list | str]:
    """
    GET *url* with retry + exponential back-off.

    Parameters
    ----------
    url   : fully built request URL.
    label : short tag used in the log lines for this request.

    Returns
    -------
    The parsed JSON value (dict or list) when the body is valid JSON,
    the raw body string when it is not, or None when every attempt failed.

    Never raises for transport errors: they are logged to stdout and turned
    into a None return. HTTP 4xx responses other than 408/429 are treated as
    permanent, because repeating an identical request cannot succeed, and
    end the loop immediately instead of sleeping through the back-off.
    """
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    delay = _RETRY_DELAY
    last_err = ""
    attempts_made = 0
    for attempt in range(1, _MAX_RETRIES + 1):
        attempts_made = attempt
        retriable = True
        print(f"[VEP] {label} → GET {url} (attempt {attempt}/{_MAX_RETRIES})")
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=_REQUEST_TIMEOUT) as resp:
                status = resp.status
                body = resp.read().decode("utf-8")
            print(f"[VEP] {label} ← HTTP {status} ({len(body)} bytes)")
            # Try JSON parse; fall back to raw string
            try:
                return json.loads(body)
            except json.JSONDecodeError:
                return body
        except urllib.error.HTTPError as e:
            # HTTPError must be caught before URLError (it is a subclass).
            last_err = f"HTTP {e.code}: {e.reason}"
            body_preview = ""
            try:
                # errors="replace": the preview is diagnostic only and must
                # not fail on a non-UTF-8 error page.
                body_preview = e.read().decode("utf-8", errors="replace")[:300]
            except Exception:
                pass  # best-effort preview; the status line is already logged
            print(f"[VEP] {label} attempt {attempt} HTTPError: {last_err}")
            print(f"[VEP] body: {body_preview}")
            # Only timeouts, rate limiting and server-side errors are transient.
            retriable = e.code in (408, 429) or e.code >= 500
        except urllib.error.URLError as e:
            last_err = f"URLError: {e.reason}"
            print(f"[VEP] {label} attempt {attempt} URLError: {last_err}")
        except Exception:
            last_err = traceback.format_exc()
            print(f"[VEP] {label} attempt {attempt} Exception:\n{last_err}")
        if not retriable:
            print(f"[VEP] {label} error is not retriable — giving up")
            break
        if attempt < _MAX_RETRIES:
            print(f"[VEP] {label} retrying in {delay}s …")
            time.sleep(delay)
            delay *= 2
    print(f"[VEP] {label} FAILED after {attempts_made} attempt(s): {last_err}")
    return None
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # VEP Annotation | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def _first_consequence_term(record: dict) -> str:
    """Return the first consequence term of a VEP record, or "unknown"."""
    # `or` also covers a present-but-empty list, which would break `[0]`.
    terms = record.get("consequence_terms") or ["unknown"]
    return terms[0]


def fetch_vep(chrom: str, pos: int, ref: str, alt: str) -> VEPResult:
    """
    Query Ensembl REST VEP (GRCh38) for one variant.

    Endpoint: /vep/human/region/{chrom}:{start}-{end}:1/{alt}
    The region endpoint takes the region and the *alternate* allele only;
    the reference allele is not a path segment. It is used here only to size
    the region (end = start + len(ref) - 1).

    Parameters
    ----------
    chrom : chromosome name, with or without a 'chr' prefix.
    pos   : 1-based start position of the reference allele.
    ref   : reference allele.
    alt   : alternate allele.

    Returns
    -------
    VEPResult. ``annotation_available`` is False, with ``error_message``
    set, whenever the position is invalid, the request fails, or the
    response is not a non-empty list whose first entry is an error-free dict.

    Debug info is printed to stdout on every call.
    """
    # Imported locally so this function does not depend on a module-level
    # import of urllib.parse.
    from urllib.parse import quote

    chrom_norm = _normalise_chrom(chrom)
    try:
        start = int(pos)
    except (TypeError, ValueError):
        msg = f"Invalid position: {pos!r}"
        print(f"[VEP] ✗ {msg}")
        return VEPResult(annotation_available=False, error_message=msg)

    ref_allele = str(ref).strip()
    alt_allele = str(alt).strip()
    # The region must span the whole reference allele (one base for an SNV).
    # NOTE(review): multi-base REF/ALT are passed through untrimmed — confirm
    # Ensembl handles VCF-style anchored indels as intended.
    end = start + max(len(ref_allele), 1) - 1
    region = f"{quote(chrom_norm, safe='')}:{start}-{end}:1"
    allele = quote(alt_allele, safe="")

    # Ensembl VEP REST requires no "chr" prefix for human GRCh38
    url = (
        f"{_ENSEMBL_REST}/vep/human/region/"
        f"{region}/{allele}"
        f"?content-type=application/json"
        f"&canonical=1"
        f"&pick=1"
        f"&hgvs=1"
        f"&LoF=1"
    )
    print(f"[VEP] Querying VEP | build={_GENOME_BUILD} | "
          f"coord={chrom_norm}:{pos} {ref}>{alt}")
    print(f"[VEP] URL: {url}")

    data = _http_get(url, f"VEP {chrom_norm}:{pos}")
    if data is None:
        return VEPResult(
            annotation_available=False,
            error_message="HTTP request failed — see logs above",
        )
    if not isinstance(data, list) or not data or not isinstance(data[0], dict):
        msg = f"Unexpected VEP response type={type(data).__name__}: {str(data)[:200]}"
        print(f"[VEP] ✗ {msg}")
        return VEPResult(annotation_available=False, error_message=msg)

    entry = data[0]
    # Check for Ensembl error object
    if "error" in entry:
        msg = f"Ensembl VEP error: {entry['error']}"
        print(f"[VEP] ✗ {msg}")
        return VEPResult(annotation_available=False, error_message=msg)

    tcs = entry.get("transcript_consequences") or []
    if tcs:
        tc = tcs[0]
    else:
        # Try intergenic; the [{}] placeholder keeps the lookups below safe
        # when neither consequence list is present.
        ics = entry.get("intergenic_consequences") or [{}]
        tc = ics[0]
        print("[VEP] ⚠ No transcript consequences — variant may be intergenic")

    result = VEPResult(
        consequence=_first_consequence_term(tc),
        impact=tc.get("impact", "MODIFIER"),
        gene=tc.get("gene_symbol", ""),
        transcript=tc.get("transcript_id", ""),
        all_consequences=[_first_consequence_term(t) for t in tcs] or ["unknown"],
        annotation_available=True,
        raw_response=entry,
    )
    print(f"[VEP] ✓ gene={result.gene} consequence={result.consequence} "
          f"impact={result.impact} tx={result.transcript}")
    return result
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # Allele Frequency (gnomAD v4 GraphQL) | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
# GraphQL document sent in the gnomAD request body. It takes a variant id and
# a dataset id and asks for the genome-level `af` plus each population's
# `id` and `af`.
_GNOMAD_QUERY = """
query VariantAF($variantId: String!, $dataset: DatasetId!) {
variant(variantId: $variantId, dataset: $dataset) {
variant_id
genome {
af
populations {
id
af
}
}
}
}
"""
# Population ids that fetch_af() checks when computing the founder flag.
_POP_FOUNDER = {"asj", "fin"}  # populations with founder-effect risk
# A frequency strictly below this value is classified as rare.
_RARE_THRESHOLD = 0.001  # AF < 0.1%
def fetch_af(
    chrom: str,
    pos: int,
    ref: str,
    alt: str,
    ancestry: Optional[str] = None,
) -> AFResult:
    """
    Query gnomAD v4 for global + population AF.

    variant_id format: 17-43092176-G-T (no 'chr' prefix)

    Parameters
    ----------
    chrom, pos, ref, alt : variant coordinates and alleles.
    ancestry : optional population id (e.g. "nfe"). When a matching
        population AF exists it is used instead of the global AF for the
        rare/common classification. An exact, case-insensitive id match is
        preferred; a substring match is used only as a fallback.

    Returns
    -------
    AFResult. A failed request yields state "AF_UNKNOWN" with
    ``annotation_available=False``. A response without genome data is
    delegated to ``_fetch_af_gnomad_v2``. ``global_af`` is None when the
    API reported no global value, even if an ancestry AF was found.
    """
    chrom_norm = _normalise_chrom(chrom)
    variant_id = f"{chrom_norm}-{pos}-{ref}-{alt}"
    dataset = "gnomad_r4"
    print(f"[AF] Querying gnomAD | variant_id={variant_id}")

    query_body = json.dumps({
        "query": _GNOMAD_QUERY,
        "variables": {"variantId": variant_id, "dataset": dataset},
    }).encode("utf-8")
    try:
        req = urllib.request.Request(
            _GNOMAD_API,
            data=query_body,
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=_REQUEST_TIMEOUT) as resp:
            status = resp.status
            body = resp.read().decode("utf-8")
        print(f"[AF] gnomAD ← HTTP {status} ({len(body)} bytes)")
        data = json.loads(body)
    except Exception:
        tb = traceback.format_exc()
        print(f"[AF] gnomAD request failed:\n{tb}")
        return AFResult(
            state="AF_UNKNOWN",
            annotation_available=False,
            error_message=tb,
        )

    # A JSON body that is not an object carries no variant data; treat it
    # like a missing variant instead of crashing on `.get`.
    payload = data if isinstance(data, dict) else {}
    variant_data = (payload.get("data") or {}).get("variant") or {}
    genome_data = variant_data.get("genome") or {}
    if not genome_data:
        print(f"[AF] ⚠ No genome AF data for {variant_id} in {dataset}")
        # Try v2 fallback
        return _fetch_af_gnomad_v2(chrom_norm, pos, ref, alt, ancestry)

    global_af = genome_data.get("af")
    pops_raw = genome_data.get("populations") or []
    pop_afs = {
        p["id"]: p["af"]
        for p in pops_raw
        if p.get("id") is not None and p.get("af") is not None
    }

    # Ancestry-specific AF: exact id first, so "nfe" cannot be shadowed by an
    # earlier id that merely contains it; substring match only as fallback.
    anc_af = None
    if ancestry:
        anc_key = ancestry.lower()
        by_lower_id = {str(k).lower(): v for k, v in pop_afs.items()}
        anc_af = by_lower_id.get(anc_key)
        if anc_af is None:
            anc_af = next(
                (v for k, v in by_lower_id.items() if anc_key in k), None
            )

    effective_af = anc_af if anc_af is not None else global_af
    if effective_af is None:
        print(f"[AF] ⚠ AF is null for {variant_id}")
        return AFResult(
            state="AF_UNKNOWN",
            population_afs=pop_afs,
            annotation_available=True,
            error_message="AF field is null — variant may be absent from gnomAD",
        )

    is_rare = effective_af < _RARE_THRESHOLD
    founder_flag = any(
        pop_afs.get(p, 0) > effective_af * 5
        for p in _POP_FOUNDER
        if p in pop_afs
    )
    state = "AF_RARE" if is_rare else "AF_COMMON"

    # global_af can be None here when only the ancestry AF was available, so
    # it must not be formatted or converted unconditionally.
    global_txt = "None" if global_af is None else f"{global_af:.6f}"
    print(f"[AF] ✓ global_af={global_txt} "
          f"effective={effective_af:.6f} "
          f"rare={is_rare} founder={founder_flag}")
    return AFResult(
        state=state,
        global_af=None if global_af is None else float(global_af),
        population_afs=pop_afs,
        is_rare=is_rare,
        founder_variant_flag=founder_flag,
        annotation_available=True,
    )
def _fetch_af_gnomad_v2(
    chrom: str, pos: int, ref: str, alt: str,
    ancestry: Optional[str]
) -> AFResult:
    """
    Second-chance AF lookup against the "gnomad_r2_1" dataset.

    Sends the same GraphQL query as the primary lookup, but only reads the
    genome-level ``af`` from the answer. ``ancestry`` is accepted for
    signature parity and is not used.

    Returns an AFResult classified as AF_RARE / AF_COMMON when a global AF
    is present. Otherwise (no AF in the response, or any exception while
    requesting/parsing) returns AF_UNKNOWN with
    ``annotation_available=False``.

    NOTE(review): the variant id is built from the coordinates handed in by
    the caller, unchanged. If this dataset is on a different genome build
    than those coordinates, the lookup may miss or mismatch — confirm.
    """
    dataset = "gnomad_r2_1"
    variant_id = f"{chrom}-{pos}-{ref}-{alt}"
    print(f"[AF] Trying gnomAD v2 fallback | variant_id={variant_id}")

    graphql_variables = {"variantId": variant_id, "dataset": dataset}
    payload = json.dumps(
        {"query": _GNOMAD_QUERY, "variables": graphql_variables}
    ).encode("utf-8")

    try:
        request = urllib.request.Request(
            _GNOMAD_API,
            data=payload,
            headers={"Content-Type": "application/json", "Accept": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=_REQUEST_TIMEOUT) as response:
            raw_bytes = response.read()
        parsed = json.loads(raw_bytes.decode("utf-8"))

        # Walk data -> variant -> genome, tolerating null at every level.
        variant_node = (parsed.get("data") or {}).get("variant") or {}
        genome_node = variant_node.get("genome") or {}
        frequency = genome_node.get("af")

        if frequency is not None:
            rare = float(frequency) < _RARE_THRESHOLD
            print(f"[AF] ✓ gnomAD v2 fallback: global_af={frequency:.6f}")
            label = "AF_RARE" if rare else "AF_COMMON"
            return AFResult(
                state=label,
                global_af=float(frequency),
                is_rare=rare,
                annotation_available=True,
            )
    except Exception:
        # Best-effort fallback: log the traceback and report failure below.
        print(f"[AF] gnomAD v2 fallback failed:\n{traceback.format_exc()}")

    # Reached when the request failed or the response carried no AF.
    return AFResult(
        state="AF_UNKNOWN",
        annotation_available=False,
        error_message="Both gnomAD v4 and v2 lookups failed",
    )
| # ── Compat shims for existing app.py / decision_engine calls ────────────────── | |
def format_af_display(af_result: AFResult) -> str:
    """Render the global AF of *af_result* for display.

    Returns the frequency with six decimal places, or the text
    "Not found in gnomAD" when ``global_af`` is None.
    """
    frequency = af_result.global_af
    return "Not found in gnomAD" if frequency is None else f"{frequency:.6f}"
| # ══════════════════════════════════════════════════════════════════════════════ | |
| # Test block | |
| # ══════════════════════════════════════════════════════════════════════════════ | |
def _test():
    """Live smoke test: annotate one BRCA1 variant and print both results.

    Calls fetch_vep() and fetch_af() (real network requests), prints every
    result field, then asserts that VEP returned an annotation for gene
    BRCA1 with a known consequence.
    """
    banner = "=" * 60
    print(banner)
    print("TEST: chr17:43092176 G>T (BRCA1 known pathogenic)")
    print(banner)

    vep = fetch_vep("17", 43092176, "G", "T")
    print("\nVEP result:")
    vep_rows = (
        ("annotation_available", vep.annotation_available),
        ("gene", vep.gene),
        ("consequence", vep.consequence),
        ("impact", vep.impact),
        ("transcript", vep.transcript),
        ("error_message", vep.error_message),
    )
    for field_name, value in vep_rows:
        print(f" {field_name} : {value}")
    print()

    af = fetch_af("17", 43092176, "G", "T", ancestry="nfe")
    print("AF result:")
    af_rows = (
        ("annotation_available", af.annotation_available),
        ("state", af.state),
        ("global_af", af.global_af),
        ("is_rare", af.is_rare),
        ("founder_flag", af.founder_variant_flag),
        ("error_message", af.error_message),
    )
    for field_name, value in af_rows:
        print(f" {field_name} : {value}")
    print()

    for line in (
        "EXPECTED:",
        " gene = BRCA1",
        " consequence = missense_variant (or similar)",
        " global_af = very small float (rare)",
    ):
        print(line)

    assert vep.annotation_available, "VEP annotation_available should be True"
    assert vep.gene == "BRCA1", f"Expected BRCA1, got '{vep.gene}'"
    assert vep.consequence != "unknown", "consequence should not be 'unknown'"
    print("\n✓ All assertions passed")
# Run the live test block only when this file is executed as a script
# (python vep_handler.py), not when it is imported.
if __name__ == "__main__":
    _test()