Spaces:
Sleeping
Sleeping
| """Replay the saved validation results through the updated rule engine. | |
| This is a deterministic, no-external-calls re-validation: it takes the | |
| criteria already recorded in docs/clinical_validation_results.json, | |
| applies the new gene-mechanism gating (suppress PVS1 for GOF genes, cap | |
| BP4 strength for genes where in-silico predictors are unreliable), then | |
| re-runs the Bayesian combiner. | |
| Useful as a fast sanity check before committing to a full pipeline re-run. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| from collections import Counter | |
| from pathlib import Path | |
| from typing import Any | |
| from backend.app.services.acmg.combiner import ( | |
| _bayesian_score, | |
| _bayesian_significance, | |
| combine_criteria, | |
| ) | |
| from backend.app.services.acmg.gene_mechanisms import cap_strength, lookup | |
| from backend.app.services.acmg.rules import RuleEngine | |
| from backend.app.schemas.evidence import ACMGCriterion, InSilicoResult | |
| _engine = RuleEngine() | |
| def _guess_consequence(hgvs: str) -> str | None: | |
| """Heuristic consequence inference from HGVS — only used to drive BP1/ | |
| BP7/PP2 in the replay since the saved JSON doesn't carry consequence. | |
| Live runs use the real VEP-derived field. | |
| Codon position 3 (the wobble) is the redundant position in most codons | |
| — transitions there (G↔A, C↔T) are synonymous ~70% of the time per | |
| the standard genetic code. Without translation we treat wobble-position | |
| transitions as synonymous, which avoids spurious PP2 fires on benign | |
| silent variants. This is a replay-only heuristic; the live pipeline | |
| uses VEP and is not affected. | |
| """ | |
| h = hgvs.lower() | |
| if "del" in h and "_" in h: | |
| return "inframe_deletion" | |
| if "dup" in h or ("ins" in h and "_" in h): | |
| return "inframe_insertion" | |
| if h.endswith("=") or "%3d" in h: | |
| return "synonymous_variant" | |
| m = re.search(r"c\.(\d+)([acgt])>([acgt])", h) | |
| if m: | |
| pos, ref, alt = int(m.group(1)), m.group(2), m.group(3) | |
| codon_pos = ((pos - 1) % 3) + 1 | |
| transitions = {("g","a"), ("a","g"), ("c","t"), ("t","c")} | |
| if codon_pos == 3 and (ref, alt) in transitions: | |
| return "synonymous_variant" | |
| return "missense_variant" | |
| return None | |
| def _guess_protein_from_hgvs(hgvs: str) -> str | None: | |
| """Approximate codon = ceil(coding_pos / 3). For replay-only PM1 | |
| triggering when hgvs_protein wasn't recorded. Returns a synthetic | |
| `p.X{codon}X` string that parse_residue can match. | |
| """ | |
| m = re.search(r"c\.(\d+)[ACGTacgt]>[ACGTacgt]", hgvs) | |
| if not m: | |
| return None | |
| pos = int(m.group(1)) | |
| codon = (pos + 2) // 3 # 1-indexed; positions 1-3 → codon 1 | |
| return f"p.X{codon}X" | |
| import sys | |
| # Default to the 1000-variant fixture; override via CLI arg. | |
| _DEFAULT_RESULTS = Path("docs/clinical_validation_results_1000.json") | |
| RESULTS = Path(sys.argv[1]) if len(sys.argv) > 1 else _DEFAULT_RESULTS | |
| PARTITION = { | |
| "Pathogenic": {"Pathogenic", "Likely Pathogenic"}, | |
| "Likely Pathogenic": {"Pathogenic", "Likely Pathogenic"}, | |
| "Uncertain Significance": {"Uncertain Significance"}, | |
| "Likely Benign": {"Benign", "Likely Benign"}, | |
| "Benign": {"Benign", "Likely Benign"}, | |
| } | |
| def apply_gating( | |
| gene: str, | |
| hgvs: str, | |
| raw_criteria: list[dict[str, Any]], | |
| ) -> list[ACMGCriterion]: | |
| gm = lookup(gene) | |
| out: list[ACMGCriterion] = [] | |
| saved_codes: set[str] = set() | |
| # Reconstruct the InSilicoResult that was used originally (best-effort — | |
| # only the fields we need for BP7 are recoverable from evidence_text). | |
| bp4_entry = next((c for c in raw_criteria if c.get("code") == "BP4"), None) | |
| spliceai = None | |
| if bp4_entry: | |
| m = re.search(r"SpliceAI=([^,\s]+)", bp4_entry.get("evidence_text", "")) | |
| if m and m.group(1) not in ("None", "null"): | |
| try: | |
| spliceai = float(m.group(1)) | |
| except ValueError: | |
| spliceai = None | |
| ins_proxy = InSilicoResult(spliceai_max=spliceai) | |
| for c in raw_criteria: | |
| if not c.get("triggered"): | |
| continue | |
| code = c["code"] | |
| strength = c["strength"] | |
| saved_codes.add(code) | |
| # Suppress PVS1 for genes whose mechanism is not LoF | |
| if code == "PVS1" and gm and gm.suppress_pvs1: | |
| continue | |
| # The production score_pvs1 now consequence-gates PVS1 (live VEP | |
| # consequence). Don't replicate that in replay — the c.X>Y heuristic | |
| # misclassifies start-codon (c.1A>G) and stop-gained-at-wobble | |
| # variants as missense, leading to massive false PVS1 suppression. | |
| # Re-derive BP4 / PP3 strength from raw REVEL+AM in evidence_text, | |
| # so updates to _bp4_strength / _pp3_strength flow through the | |
| # replay. (Saved strengths are frozen at the version they were | |
| # written against.) | |
| if code in ("BP4", "PP3"): | |
| ev = c.get("evidence_text", "") or "" | |
| m_revel = re.search(r"REVEL=([\d.]+|None)", ev) | |
| m_am = re.search(r"AM=([\d.]+|None)", ev) | |
| m_spl = re.search(r"SpliceAI=([\d.]+|None)", ev) | |
| def _val(m): | |
| if not m or m.group(1) == "None": | |
| return None | |
| try: return float(m.group(1)) | |
| except ValueError: return None | |
| ins_proxy_local = InSilicoResult( | |
| revel=_val(m_revel), | |
| alphamissense=_val(m_am), | |
| spliceai_max=_val(m_spl), | |
| ) | |
| if code == "BP4": | |
| strength = _engine._bp4_strength(ins_proxy_local) | |
| else: | |
| strength = _engine._pp3_strength(ins_proxy_local) | |
| # Cap BP4 for GOF / altered-function genes | |
| if code == "BP4" and gm and gm.bp4_max_strength: | |
| strength = cap_strength(strength, gm.bp4_max_strength) | |
| if code == "PP3" and gm and gm.pp3_max_strength: | |
| strength = cap_strength(strength, gm.pp3_max_strength) | |
| out.append( | |
| ACMGCriterion( | |
| code=code, | |
| triggered=True, | |
| strength=strength, | |
| source=c.get("source") or "", | |
| evidence_text=c.get("evidence_text") or "", | |
| confidence=c.get("confidence") or "medium", | |
| pmid=c.get("pmid"), | |
| caveat=c.get("caveat"), | |
| ) | |
| ) | |
| # NEW: also fire PM1 / BP1 / BP7 from the rule engine using the inferred | |
| # consequence + protein change. The original validation run was done | |
| # before these criteria existed, so we recompute them here. PM1 needs | |
| # an HGVS protein string — we extract from the c. coordinates by | |
| # converting codon position (best-effort). | |
| consequence = _guess_consequence(hgvs) | |
| protein = _guess_protein_from_hgvs(hgvs) | |
| if "PM1" not in saved_codes: | |
| pm1 = _engine.score_pm1( | |
| gene_symbol=gene, hgvs_protein=protein, consequence=consequence, | |
| ) | |
| if pm1: | |
| out.append(pm1) | |
| if "PS1" not in saved_codes and "PM5" not in saved_codes: | |
| # PS1/PM5 need real ref+alt amino acids; the synthetic protein | |
| # string from _guess_protein_from_hgvs only encodes residue | |
| # number, so it can't drive PS1/PM5 in replay. Live runs will | |
| # have the real hgvs_protein from VEP and the criterion will | |
| # fire correctly. Skip in replay rather than fire incorrectly. | |
| pass | |
| if "BP1" not in saved_codes: | |
| bp1 = _engine.score_bp1(consequence=consequence, gene_symbol=gene) | |
| if bp1: | |
| out.append(bp1) | |
| if "PP2" not in saved_codes: | |
| # PP2 needs the VCEP context to respect pp2_disallowed for | |
| # ENIGMA / InSiGHT / TP53. Re-lookup here in the replay path. | |
| from backend.app.services.acmg.vcep import lookup_vcep | |
| pp2 = _engine.score_pp2( | |
| consequence=consequence, | |
| gene_symbol=gene, | |
| vcep=lookup_vcep(gene), | |
| ) | |
| if pp2: | |
| out.append(pp2) | |
| if "BP7" not in saved_codes: | |
| bp7 = _engine.score_bp7(consequence=consequence, ins=ins_proxy) | |
| if bp7: | |
| out.append(bp7) | |
| return out | |
| def main() -> int: | |
| data = json.loads(RESULTS.read_text()) | |
| print(f"Source: {RESULTS} (skip_rag={data.get('skip_rag')})") | |
| print(f"Original: {data['correct']}/{data['total_scored']} = {data['concordance']:.1%}") | |
| print() | |
| new_correct = 0 | |
| confusion: Counter[str] = Counter() | |
| flips: list[dict[str, Any]] = [] | |
| new_criterion_fires: Counter[str] = Counter() | |
| for r in data["results"]: | |
| gene = r.get("gene") or "" | |
| expected = r["expected"] | |
| original_got = r["got"] | |
| original_codes = { | |
| c["code"] for c in (r.get("criteria") or []) if c.get("triggered") | |
| } | |
| criteria = apply_gating(gene, r["hgvs"], r.get("criteria") or []) | |
| new_codes = {c.code for c in criteria} | |
| for added in new_codes - original_codes: | |
| new_criterion_fires[added] += 1 | |
| # Use the production combiner so conflict detection + strategy | |
| # logic match a live run, not just raw Bayesian summation. | |
| classification = combine_criteria(criteria) | |
| new_got = classification.significance | |
| score = _bayesian_score(criteria) | |
| match = new_got in PARTITION.get(expected, set()) | |
| if match: | |
| new_correct += 1 | |
| confusion[f"{expected} -> {new_got}"] += 1 | |
| if new_got != original_got: | |
| flips.append( | |
| { | |
| "gene": gene, | |
| "hgvs": r["hgvs"], | |
| "expected": expected, | |
| "before": original_got, | |
| "after": new_got, | |
| "now_correct": match, | |
| } | |
| ) | |
| total = data["total_scored"] | |
| print(f"Replayed: {new_correct}/{total} = {new_correct / total:.1%}") | |
| print(f"Delta: {new_correct - data['correct']:+d}") | |
| print() | |
| print(f"Classification flips: {len(flips)}") | |
| for f in flips: | |
| marker = "✓" if f["now_correct"] else "✗" | |
| print( | |
| f" {marker} {f['gene']:8s} {f['hgvs']:30s} " | |
| f"expected={f['expected']:25s} {f['before']:22s} → {f['after']}" | |
| ) | |
| print() | |
| print("New confusion matrix:") | |
| for k, v in sorted(confusion.items(), key=lambda kv: -kv[1]): | |
| print(f" {v:3d} {k}") | |
| if new_criterion_fires: | |
| print() | |
| print("Newly-firing criteria (count of variants):") | |
| for code, count in sorted(new_criterion_fires.items()): | |
| print(f" {code}: {count} variants") | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |