"""Replay the saved validation results through the updated rule engine. This is a deterministic, no-external-calls re-validation: it takes the criteria already recorded in docs/clinical_validation_results.json, applies the new gene-mechanism gating (suppress PVS1 for GOF genes, cap BP4 strength for genes where in-silico predictors are unreliable), then re-runs the Bayesian combiner. Useful as a fast sanity check before committing to a full pipeline re-run. """ from __future__ import annotations import json import re from collections import Counter from pathlib import Path from typing import Any from backend.app.services.acmg.combiner import ( _bayesian_score, _bayesian_significance, combine_criteria, ) from backend.app.services.acmg.gene_mechanisms import cap_strength, lookup from backend.app.services.acmg.rules import RuleEngine from backend.app.schemas.evidence import ACMGCriterion, InSilicoResult _engine = RuleEngine() def _guess_consequence(hgvs: str) -> str | None: """Heuristic consequence inference from HGVS — only used to drive BP1/ BP7/PP2 in the replay since the saved JSON doesn't carry consequence. Live runs use the real VEP-derived field. Codon position 3 (the wobble) is the redundant position in most codons — transitions there (G↔A, C↔T) are synonymous ~70% of the time per the standard genetic code. Without translation we treat wobble-position transitions as synonymous, which avoids spurious PP2 fires on benign silent variants. This is a replay-only heuristic; the live pipeline uses VEP and is not affected. """ h = hgvs.lower() if "del" in h and "_" in h: return "inframe_deletion" if "dup" in h or ("ins" in h and "_" in h): return "inframe_insertion" if h.endswith("=") or "%3d" in h: return "synonymous_variant" m = re.search(r"c\.(\d+)([acgt])>([acgt])", h) if m: pos, ref, alt = int(m.group(1)), m.group(2), m.group(3) codon_pos = ((pos - 1) % 3) + 1 transitions = {("g","a"), ("a","g"), ("c","t"), ("t","c")} if codon_pos == 3 and (ref, alt) in transitions: return "synonymous_variant" return "missense_variant" return None def _guess_protein_from_hgvs(hgvs: str) -> str | None: """Approximate codon = ceil(coding_pos / 3). For replay-only PM1 triggering when hgvs_protein wasn't recorded. Returns a synthetic `p.X{codon}X` string that parse_residue can match. """ m = re.search(r"c\.(\d+)[ACGTacgt]>[ACGTacgt]", hgvs) if not m: return None pos = int(m.group(1)) codon = (pos + 2) // 3 # 1-indexed; positions 1-3 → codon 1 return f"p.X{codon}X" import sys # Default to the 1000-variant fixture; override via CLI arg. _DEFAULT_RESULTS = Path("docs/clinical_validation_results_1000.json") RESULTS = Path(sys.argv[1]) if len(sys.argv) > 1 else _DEFAULT_RESULTS PARTITION = { "Pathogenic": {"Pathogenic", "Likely Pathogenic"}, "Likely Pathogenic": {"Pathogenic", "Likely Pathogenic"}, "Uncertain Significance": {"Uncertain Significance"}, "Likely Benign": {"Benign", "Likely Benign"}, "Benign": {"Benign", "Likely Benign"}, } def apply_gating( gene: str, hgvs: str, raw_criteria: list[dict[str, Any]], ) -> list[ACMGCriterion]: gm = lookup(gene) out: list[ACMGCriterion] = [] saved_codes: set[str] = set() # Reconstruct the InSilicoResult that was used originally (best-effort — # only the fields we need for BP7 are recoverable from evidence_text). bp4_entry = next((c for c in raw_criteria if c.get("code") == "BP4"), None) spliceai = None if bp4_entry: m = re.search(r"SpliceAI=([^,\s]+)", bp4_entry.get("evidence_text", "")) if m and m.group(1) not in ("None", "null"): try: spliceai = float(m.group(1)) except ValueError: spliceai = None ins_proxy = InSilicoResult(spliceai_max=spliceai) for c in raw_criteria: if not c.get("triggered"): continue code = c["code"] strength = c["strength"] saved_codes.add(code) # Suppress PVS1 for genes whose mechanism is not LoF if code == "PVS1" and gm and gm.suppress_pvs1: continue # The production score_pvs1 now consequence-gates PVS1 (live VEP # consequence). Don't replicate that in replay — the c.X>Y heuristic # misclassifies start-codon (c.1A>G) and stop-gained-at-wobble # variants as missense, leading to massive false PVS1 suppression. # Re-derive BP4 / PP3 strength from raw REVEL+AM in evidence_text, # so updates to _bp4_strength / _pp3_strength flow through the # replay. (Saved strengths are frozen at the version they were # written against.) if code in ("BP4", "PP3"): ev = c.get("evidence_text", "") or "" m_revel = re.search(r"REVEL=([\d.]+|None)", ev) m_am = re.search(r"AM=([\d.]+|None)", ev) m_spl = re.search(r"SpliceAI=([\d.]+|None)", ev) def _val(m): if not m or m.group(1) == "None": return None try: return float(m.group(1)) except ValueError: return None ins_proxy_local = InSilicoResult( revel=_val(m_revel), alphamissense=_val(m_am), spliceai_max=_val(m_spl), ) if code == "BP4": strength = _engine._bp4_strength(ins_proxy_local) else: strength = _engine._pp3_strength(ins_proxy_local) # Cap BP4 for GOF / altered-function genes if code == "BP4" and gm and gm.bp4_max_strength: strength = cap_strength(strength, gm.bp4_max_strength) if code == "PP3" and gm and gm.pp3_max_strength: strength = cap_strength(strength, gm.pp3_max_strength) out.append( ACMGCriterion( code=code, triggered=True, strength=strength, source=c.get("source") or "", evidence_text=c.get("evidence_text") or "", confidence=c.get("confidence") or "medium", pmid=c.get("pmid"), caveat=c.get("caveat"), ) ) # NEW: also fire PM1 / BP1 / BP7 from the rule engine using the inferred # consequence + protein change. The original validation run was done # before these criteria existed, so we recompute them here. PM1 needs # an HGVS protein string — we extract from the c. coordinates by # converting codon position (best-effort). consequence = _guess_consequence(hgvs) protein = _guess_protein_from_hgvs(hgvs) if "PM1" not in saved_codes: pm1 = _engine.score_pm1( gene_symbol=gene, hgvs_protein=protein, consequence=consequence, ) if pm1: out.append(pm1) if "PS1" not in saved_codes and "PM5" not in saved_codes: # PS1/PM5 need real ref+alt amino acids; the synthetic protein # string from _guess_protein_from_hgvs only encodes residue # number, so it can't drive PS1/PM5 in replay. Live runs will # have the real hgvs_protein from VEP and the criterion will # fire correctly. Skip in replay rather than fire incorrectly. pass if "BP1" not in saved_codes: bp1 = _engine.score_bp1(consequence=consequence, gene_symbol=gene) if bp1: out.append(bp1) if "PP2" not in saved_codes: # PP2 needs the VCEP context to respect pp2_disallowed for # ENIGMA / InSiGHT / TP53. Re-lookup here in the replay path. from backend.app.services.acmg.vcep import lookup_vcep pp2 = _engine.score_pp2( consequence=consequence, gene_symbol=gene, vcep=lookup_vcep(gene), ) if pp2: out.append(pp2) if "BP7" not in saved_codes: bp7 = _engine.score_bp7(consequence=consequence, ins=ins_proxy) if bp7: out.append(bp7) return out def main() -> int: data = json.loads(RESULTS.read_text()) print(f"Source: {RESULTS} (skip_rag={data.get('skip_rag')})") print(f"Original: {data['correct']}/{data['total_scored']} = {data['concordance']:.1%}") print() new_correct = 0 confusion: Counter[str] = Counter() flips: list[dict[str, Any]] = [] new_criterion_fires: Counter[str] = Counter() for r in data["results"]: gene = r.get("gene") or "" expected = r["expected"] original_got = r["got"] original_codes = { c["code"] for c in (r.get("criteria") or []) if c.get("triggered") } criteria = apply_gating(gene, r["hgvs"], r.get("criteria") or []) new_codes = {c.code for c in criteria} for added in new_codes - original_codes: new_criterion_fires[added] += 1 # Use the production combiner so conflict detection + strategy # logic match a live run, not just raw Bayesian summation. classification = combine_criteria(criteria) new_got = classification.significance score = _bayesian_score(criteria) match = new_got in PARTITION.get(expected, set()) if match: new_correct += 1 confusion[f"{expected} -> {new_got}"] += 1 if new_got != original_got: flips.append( { "gene": gene, "hgvs": r["hgvs"], "expected": expected, "before": original_got, "after": new_got, "now_correct": match, } ) total = data["total_scored"] print(f"Replayed: {new_correct}/{total} = {new_correct / total:.1%}") print(f"Delta: {new_correct - data['correct']:+d}") print() print(f"Classification flips: {len(flips)}") for f in flips: marker = "✓" if f["now_correct"] else "✗" print( f" {marker} {f['gene']:8s} {f['hgvs']:30s} " f"expected={f['expected']:25s} {f['before']:22s} → {f['after']}" ) print() print("New confusion matrix:") for k, v in sorted(confusion.items(), key=lambda kv: -kv[1]): print(f" {v:3d} {k}") if new_criterion_fires: print() print("Newly-firing criteria (count of variants):") for code, count in sorted(new_criterion_fires.items()): print(f" {code}: {count} variants") return 0 if __name__ == "__main__": raise SystemExit(main())