varientlens / scripts /replay_validation.py
Codex
Error-analysis writeup + replay improvements
767f344
"""Replay the saved validation results through the updated rule engine.
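This is a deterministic, no-external-calls re-validation: it takes the
criteria already recorded in a saved results file (by default
docs/clinical_validation_results_1000.json; pass another path as the first
CLI argument), applies the new gene-mechanism gating (suppress PVS1 for GOF
genes, cap BP4 / PP3 strength for genes where in-silico predictors are
unreliable), re-derives BP4 / PP3 strength from the recorded in-silico
scores, scores PM1 / BP1 / PP2 / BP7 where they were not already saved, then
re-runs the Bayesian combiner via the production `combine_criteria` entry
point.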
Useful as a fast sanity check before committing to a full pipeline re-run.
"""
from __future__ import annotations
import json
import re
from collections import Counter
from pathlib import Path
from typing import Any
from backend.app.services.acmg.combiner import (
_bayesian_score,
_bayesian_significance,
combine_criteria,
)
from backend.app.services.acmg.gene_mechanisms import cap_strength, lookup
from backend.app.services.acmg.rules import RuleEngine
from backend.app.schemas.evidence import ACMGCriterion, InSilicoResult
# Single module-level rule engine; apply_gating uses it to re-derive BP4 / PP3
# strength and to score PM1 / BP1 / PP2 / BP7 during the replay.
_engine = RuleEngine()
def _guess_consequence(hgvs: str) -> str | None:
    """Guess a coarse consequence term from an HGVS string (replay only).

    The saved JSON carries no consequence field, so the replay derives one
    from the HGVS text to drive the consequence-dependent criteria. Live
    runs use the real VEP-derived field and never call this.

    Rules, applied case-insensitively and in this order:

    1. contains ``del`` and ``_``            -> ``inframe_deletion``
    2. contains ``dup``, or ``ins`` and ``_`` -> ``inframe_insertion``
    3. ends with ``=`` or contains ``%3d``    -> ``synonymous_variant``
    4. ``c.<pos><ref>><alt>`` substitution    -> ``synonymous_variant`` when
       ``pos`` is the third base of its codon and the change is a transition
       (G<->A, C<->T); ``missense_variant`` otherwise
    5. anything else                          -> ``None``

    Rule 4 is a no-translation approximation: third-position transitions are
    assumed silent, to avoid spurious PP2 fires on benign silent variants.

    NOTE(review): the ``_`` test runs on the whole string, so an accession
    prefix containing an underscore also satisfies it — confirm against the
    HGVS format stored in the results file.
    """
    text = hgvs.lower()
    has_underscore = "_" in text

    if has_underscore and "del" in text:
        return "inframe_deletion"
    if "dup" in text or (has_underscore and "ins" in text):
        return "inframe_insertion"
    if text.endswith("=") or "%3d" in text:
        return "synonymous_variant"

    snv = re.search(r"c\.(\d+)([acgt])>([acgt])", text)
    if snv is None:
        return None

    coding_pos = int(snv.group(1))
    substitution = snv.group(2) + snv.group(3)
    # Third codon position <=> the 1-based coding position is a multiple of 3.
    at_wobble = coding_pos % 3 == 0
    if at_wobble and substitution in ("ga", "ag", "ct", "tc"):
        return "synonymous_variant"
    return "missense_variant"
def _guess_protein_from_hgvs(hgvs: str) -> str | None:
    """Build a placeholder protein change from a coding substitution.

    Replay-only stand-in for a missing ``hgvs_protein``: the codon number is
    approximated as ``ceil(coding_pos / 3)`` and wrapped in a synthetic
    ``p.X{codon}X`` string, which carries the residue number only (no real
    amino acids). Returns ``None`` when *hgvs* holds no
    ``c.<pos><ref>><alt>`` substitution.
    """
    found = re.search(r"c\.(\d+)[ACGTacgt]>[ACGTacgt]", hgvs)
    if found is None:
        return None

    # Ceil division: coding positions 1-3 -> codon 1, 4-6 -> codon 2, ...
    codon, leftover = divmod(int(found.group(1)), 3)
    if leftover:
        codon += 1
    return f"p.X{codon}X"
import sys

# Results file to replay: the first CLI argument when one is given, otherwise
# the 1000-variant fixture.
_DEFAULT_RESULTS = Path("docs/clinical_validation_results_1000.json")
RESULTS = _DEFAULT_RESULTS if len(sys.argv) <= 1 else Path(sys.argv[1])

# Labels that count as concordant with each other. PARTITION maps an expected
# label to the set of replayed labels accepted as a match for it; every key
# gets its own set object.
_CONCORDANCE_GROUPS = (
    ("Pathogenic", "Likely Pathogenic"),
    ("Uncertain Significance",),
    ("Likely Benign", "Benign"),
)
PARTITION = {
    label: set(group) for group in _CONCORDANCE_GROUPS for label in group
}
def _parse_optional_float(match: re.Match[str] | None) -> float | None:
    """Return group 1 of *match* as a float, or None if absent or unparsable."""
    if match is None:
        return None
    token = match.group(1)
    if token in ("None", "null"):
        return None
    try:
        return float(token)
    except ValueError:
        return None


def apply_gating(
    gene: str,
    hgvs: str,
    raw_criteria: list[dict[str, Any]],
) -> list[ACMGCriterion]:
    """Rebuild the criteria list for one saved variant under the current rules.

    Triggered saved criteria are carried over with these adjustments:

    * PVS1 is dropped when the gene-mechanism entry sets ``suppress_pvs1``.
    * BP4 / PP3 strength is recomputed by the rule engine from the REVEL /
      AM / SpliceAI values parsed out of the saved ``evidence_text``, then
      capped by the entry's ``bp4_max_strength`` / ``pp3_max_strength``.

    PM1, BP1, PP2 and BP7 are then scored by the rule engine from a heuristic
    consequence / protein guess, each only when its code was not among the
    triggered saved criteria.

    Args:
        gene: Gene symbol, passed to the gene-mechanism and VCEP lookups.
        hgvs: Saved HGVS string; used only for the heuristic guesses.
        raw_criteria: Criterion dicts as stored in the results JSON. Triggered
            entries must carry ``code`` and ``strength``.

    Returns:
        The gated saved criteria followed by any newly scored ones.
    """
    gm = lookup(gene)
    out: list[ACMGCriterion] = []
    saved_codes: set[str] = set()

    # Reconstruct the InSilicoResult that was used originally (best-effort —
    # only the fields we need for BP7 are recoverable from evidence_text).
    bp4_entry = next((c for c in raw_criteria if c.get("code") == "BP4"), None)
    spliceai = None
    if bp4_entry:
        # `or ""` also covers an explicit null evidence_text, which would
        # otherwise make re.search raise TypeError.
        bp4_text = bp4_entry.get("evidence_text") or ""
        spliceai = _parse_optional_float(
            re.search(r"SpliceAI=([^,\s]+)", bp4_text)
        )
    ins_proxy = InSilicoResult(spliceai_max=spliceai)

    for c in raw_criteria:
        if not c.get("triggered"):
            continue
        code = c["code"]
        strength = c["strength"]
        # Recorded before any suppression, so a suppressed PVS1 still counts
        # as "saved".
        saved_codes.add(code)

        # Suppress PVS1 for genes whose mechanism is not LoF
        if code == "PVS1" and gm and gm.suppress_pvs1:
            continue
        # The production score_pvs1 now consequence-gates PVS1 (live VEP
        # consequence). Don't replicate that in replay — the c.X>Y heuristic
        # misclassifies start-codon (c.1A>G) and stop-gained-at-wobble
        # variants as missense, leading to massive false PVS1 suppression.

        # Re-derive BP4 / PP3 strength from raw REVEL+AM in evidence_text,
        # so updates to _bp4_strength / _pp3_strength flow through the
        # replay. (Saved strengths are frozen at the version they were
        # written against.)
        if code in ("BP4", "PP3"):
            ev = c.get("evidence_text") or ""
            ins_proxy_local = InSilicoResult(
                revel=_parse_optional_float(
                    re.search(r"REVEL=([\d.]+|None)", ev)
                ),
                alphamissense=_parse_optional_float(
                    re.search(r"AM=([\d.]+|None)", ev)
                ),
                spliceai_max=_parse_optional_float(
                    re.search(r"SpliceAI=([\d.]+|None)", ev)
                ),
            )
            if code == "BP4":
                strength = _engine._bp4_strength(ins_proxy_local)
            else:
                strength = _engine._pp3_strength(ins_proxy_local)

        # Cap BP4 / PP3 for GOF / altered-function genes
        if code == "BP4" and gm and gm.bp4_max_strength:
            strength = cap_strength(strength, gm.bp4_max_strength)
        if code == "PP3" and gm and gm.pp3_max_strength:
            strength = cap_strength(strength, gm.pp3_max_strength)

        out.append(
            ACMGCriterion(
                code=code,
                triggered=True,
                strength=strength,
                source=c.get("source") or "",
                evidence_text=c.get("evidence_text") or "",
                confidence=c.get("confidence") or "medium",
                pmid=c.get("pmid"),
                caveat=c.get("caveat"),
            )
        )

    # NEW: also fire PM1 / BP1 / PP2 / BP7 from the rule engine using the
    # inferred consequence + protein change. The original validation run was
    # done before these criteria existed, so we recompute them here. PM1 needs
    # an HGVS protein string — we extract from the c. coordinates by
    # converting codon position (best-effort).
    consequence = _guess_consequence(hgvs)
    protein = _guess_protein_from_hgvs(hgvs)

    if "PM1" not in saved_codes:
        pm1 = _engine.score_pm1(
            gene_symbol=gene, hgvs_protein=protein, consequence=consequence,
        )
        if pm1:
            out.append(pm1)

    # PS1 / PM5 are intentionally not scored in replay: they need real ref+alt
    # amino acids, and the synthetic protein string from
    # _guess_protein_from_hgvs only encodes the residue number. Live runs have
    # the real hgvs_protein from VEP. Skip here rather than fire incorrectly.

    if "BP1" not in saved_codes:
        bp1 = _engine.score_bp1(consequence=consequence, gene_symbol=gene)
        if bp1:
            out.append(bp1)

    if "PP2" not in saved_codes:
        # PP2 needs the VCEP context to respect pp2_disallowed for
        # ENIGMA / InSiGHT / TP53. Re-lookup here in the replay path.
        from backend.app.services.acmg.vcep import lookup_vcep

        pp2 = _engine.score_pp2(
            consequence=consequence,
            gene_symbol=gene,
            vcep=lookup_vcep(gene),
        )
        if pp2:
            out.append(pp2)

    if "BP7" not in saved_codes:
        bp7 = _engine.score_bp7(consequence=consequence, ins=ins_proxy)
        if bp7:
            out.append(bp7)

    return out
def main() -> int:
    """Replay every saved result and print a before/after concordance report.

    Loads the results JSON at ``RESULTS``, rebuilds each variant's criteria
    with ``apply_gating``, classifies them with the production
    ``combine_criteria``, and prints: the original and replayed concordance,
    every variant whose classification changed, the new confusion counts,
    and how many variants gained each newly firing criterion.

    A replayed label counts as correct when it is in ``PARTITION[expected]``.

    NOTE(review): the replayed ratio divides by the saved ``total_scored``
    rather than by the number of entries in ``results`` — confirm the two
    always agree.

    Returns:
        Always ``0``, for use as the process exit status.
    """
    # JSON is UTF-8 by specification; don't depend on the platform default.
    data = json.loads(RESULTS.read_text(encoding="utf-8"))
    print(f"Source: {RESULTS} (skip_rag={data.get('skip_rag')})")
    print(f"Original: {data['correct']}/{data['total_scored']} = {data['concordance']:.1%}")
    print()

    new_correct = 0
    confusion: Counter[str] = Counter()
    flips: list[dict[str, Any]] = []
    new_criterion_fires: Counter[str] = Counter()

    for r in data["results"]:
        gene = r.get("gene") or ""
        expected = r["expected"]
        original_got = r["got"]
        saved = r.get("criteria") or []
        original_codes = {c["code"] for c in saved if c.get("triggered")}

        criteria = apply_gating(gene, r["hgvs"], saved)
        new_codes = {c.code for c in criteria}
        # One count per variant for each code that was not triggered before.
        new_criterion_fires.update(new_codes - original_codes)

        # Use the production combiner so conflict detection + strategy
        # logic match a live run, not just raw Bayesian summation.
        new_got = combine_criteria(criteria).significance

        match = new_got in PARTITION.get(expected, set())
        if match:
            new_correct += 1
        confusion[f"{expected} -> {new_got}"] += 1

        if new_got != original_got:
            flips.append(
                {
                    "gene": gene,
                    "hgvs": r["hgvs"],
                    "expected": expected,
                    "before": original_got,
                    "after": new_got,
                    "now_correct": match,
                }
            )

    total = data["total_scored"]
    # Guard the ratio so a file with nothing scored reports 0.0% instead of
    # raising ZeroDivisionError.
    replayed_rate = new_correct / total if total else 0.0
    print(f"Replayed: {new_correct}/{total} = {replayed_rate:.1%}")
    print(f"Delta: {new_correct - data['correct']:+d}")
    print()

    print(f"Classification flips: {len(flips)}")
    for flip in flips:
        marker = "✓" if flip["now_correct"] else "✗"
        # `!s` keeps a null saved label printable. The explicit " -> " keeps
        # the before/after labels apart even when `before` fills all 22
        # columns ("Uncertain Significance" is exactly 22 characters).
        print(
            f" {marker} {flip['gene']:8s} {flip['hgvs']:30s} "
            f"expected={flip['expected']:25s} "
            f"{flip['before']!s:22s} -> {flip['after']}"
        )
    print()

    print("New confusion matrix:")
    # most_common() sorts by descending count and keeps insertion order
    # among ties.
    for key, count in confusion.most_common():
        print(f" {count:3d} {key}")

    if new_criterion_fires:
        print()
        print("Newly-firing criteria (count of variants):")
        for code, count in sorted(new_criterion_fires.items()):
            print(f" {code}: {count} variants")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())