Spaces:
Sleeping
Sleeping
| """decision_engine.py — PeVe v1.1 Deterministic Synthesis Engine""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| import numpy as np | |
| from config import ( | |
| PEVE_VERSION, THRESHOLD_VERSION, | |
| SPLICE_PROB_HIGH, SPLICE_PROB_MODERATE, SPLICE_PROB_WEAK, | |
| SPLICE_SIGNAL_MIN, SPLICE_DOMINANT_MIN, | |
| ACTIVATION_NORM_HIGH, ACTIVATION_NORM_MODERATE, ACTIVATION_NORM_WEAK, | |
| CONTEXT_ACTIVE_MIN, BIOCHEMICAL_RISK_ACTIVE, | |
| AF_RARITY_THRESHOLD, AF_HIGH_CONFLICT, | |
| BOUNDARY_TOLERANCE, WINDOW_BP, PEAK_OFF_CENTER_FRAC, | |
| ) | |
| from prefilter import VariantClass | |
| from af_handler import AFResult, AF_NUMERIC, AF_ZERO, AF_UNKNOWN, AF_UNCERTAIN | |
| # ── Raw layer outputs ───────────────────────────────────── | |
| class SpliceLayerOutput: | |
| splice_prob: float | |
| splice_signal_strength: float | |
| counterfactual_delta: float | |
| saliency_map: Optional[object] | |
| model_available: bool = True | |
| class ContextLayerOutput: | |
| context_pathogenic_prob: float | |
| activation_norm: float | |
| activation_peak_position: int | |
| importance_score: float | |
| model_available: bool = True | |
| class ProteinLayerOutput: | |
| biochemical_risk_score: float | |
| feature_pathogenic_prob: float | |
| shap_feature_contributions: dict | |
| l3_substitution_valid: bool | |
| model_available: bool = True | |
| # ── Band classifiers ────────────────────────────────────── | |
| def _splice_band(p): | |
| if p >= SPLICE_PROB_HIGH: return "High" | |
| if p >= SPLICE_PROB_MODERATE: return "Moderate" | |
| if p >= SPLICE_PROB_WEAK: return "Weak" | |
| return "Inactive" | |
| def _context_band(n): | |
| if n >= ACTIVATION_NORM_HIGH: return "High" | |
| if n >= ACTIVATION_NORM_MODERATE: return "Moderate" | |
| if n >= ACTIVATION_NORM_WEAK: return "Weak" | |
| return "Inactive" | |
| def _near(val, thresh): return abs(val - thresh) <= BOUNDARY_TOLERANCE | |
| def _off_center(pos): return abs(pos - WINDOW_BP//2) > int(WINDOW_BP * PEAK_OFF_CENTER_FRAC) | |
| # ── Activation levels ───────────────────────────────────── | |
| class ActivationLevels: | |
| splice_band: str; rna_active: bool; rna_dominant: bool | |
| context_band: str; context_active: bool | |
| protein_active: bool; l3_valid: bool | |
| rna_boundary: bool; context_boundary: bool; protein_boundary: bool | |
| def compute_activation_levels(splice, context, protein, af_result): | |
| s_band = _splice_band(splice.splice_prob) | |
| rna_active = splice.splice_prob >= SPLICE_PROB_MODERATE and splice.splice_signal_strength >= SPLICE_SIGNAL_MIN | |
| rna_dominant = splice.splice_prob >= SPLICE_DOMINANT_MIN | |
| c_band = _context_band(context.activation_norm) | |
| ctx_active = context.activation_norm >= CONTEXT_ACTIVE_MIN | |
| prot_active = (protein.l3_substitution_valid and | |
| protein.biochemical_risk_score >= BIOCHEMICAL_RISK_ACTIVE and | |
| af_result.satisfies_rarity()) | |
| rna_b = _near(splice.splice_prob, SPLICE_PROB_MODERATE) or _near(splice.splice_prob, SPLICE_DOMINANT_MIN) or _near(splice.splice_signal_strength, SPLICE_SIGNAL_MIN) | |
| ctx_b = _near(context.activation_norm, CONTEXT_ACTIVE_MIN) or _near(context.activation_norm, ACTIVATION_NORM_HIGH) | |
| pro_b = _near(protein.biochemical_risk_score, BIOCHEMICAL_RISK_ACTIVE) | |
| return ActivationLevels(s_band, rna_active, rna_dominant, c_band, ctx_active, prot_active, | |
| protein.l3_substitution_valid, rna_b, ctx_b, pro_b) | |
| # ── Conflict detection ──────────────────────────────────── | |
| class ConflictReport: | |
| major_conflicts: list = field(default_factory=list) | |
| minor_conflicts: list = field(default_factory=list) | |
| requires_manual_review: bool = False | |
| conflict_score_major: int = 0 | |
| conflict_score_minor: int = 0 | |
| def compute_review_flag(self): | |
| self.conflict_score_major = len(self.major_conflicts) | |
| self.conflict_score_minor = len(self.minor_conflicts) | |
| self.requires_manual_review = self.conflict_score_major >= 1 or self.conflict_score_minor >= 2 | |
| def detect_conflicts(splice, context, protein, af_result, activation, variant_class): | |
| r = ConflictReport() | |
| if splice.splice_prob >= SPLICE_PROB_HIGH and af_result.triggers_high_af_conflict(): | |
| r.major_conflicts.append( | |
| f"MAJOR: High splice_prob ({splice.splice_prob:.3f}) + common variant (AF={af_result.global_af:.5f}). " | |
| "Splice-disrupting variant unlikely at this population frequency.") | |
| if (protein.l3_substitution_valid and protein.biochemical_risk_score >= BIOCHEMICAL_RISK_ACTIVE | |
| and af_result.triggers_high_af_conflict()): | |
| r.major_conflicts.append( | |
| f"MAJOR: High biochemical risk ({protein.biochemical_risk_score:.3f}) + common variant " | |
| f"(AF={af_result.global_af:.5f}). Common biochemically disruptive variants are typically tolerated.") | |
| if variant_class.variant_class == "canonical_splice" and not activation.rna_active: | |
| r.major_conflicts.append( | |
| f"MAJOR: Canonical splice site ({variant_class.raw_consequence}) but RNA model inactive " | |
| f"(splice_prob={splice.splice_prob:.3f}). Model/annotation disagreement.") | |
| bnd = [] | |
| if activation.rna_boundary: bnd.append(f"splice_prob({splice.splice_prob:.3f})/signal({splice.splice_signal_strength:.3f})") | |
| if activation.context_boundary: bnd.append(f"activation_norm({context.activation_norm:.3f})") | |
| if activation.protein_boundary: bnd.append(f"biochemical_risk({protein.biochemical_risk_score:.3f})") | |
| if bnd: r.minor_conflicts.append(f"MINOR: Boundary proximity — {'; '.join(bnd)} within ±{BOUNDARY_TOLERANCE}.") | |
| if _off_center(context.activation_peak_position): | |
| offset = abs(context.activation_peak_position - WINDOW_BP//2) | |
| r.minor_conflicts.append(f"MINOR: Activation peak {offset}bp from mutation centre (pos={context.activation_peak_position}).") | |
| if activation.context_active and variant_class.raw_consequence in { | |
| "synonymous_variant","intron_variant","upstream_gene_variant","downstream_gene_variant"}: | |
| r.minor_conflicts.append( | |
| f"MINOR: Context active (norm={context.activation_norm:.3f}) but VEP='{variant_class.raw_consequence}' (low impact).") | |
| if af_result.state in {AF_UNKNOWN, AF_UNCERTAIN}: | |
| r.minor_conflicts.append(f"MINOR: AF state={af_result.state} — rarity unconfirmed.") | |
| r.compute_review_flag() | |
| return r | |
| # ── Mechanism constants ─────────────────────────────────── | |
| DOMINANT_RNA = "RNA_Splicing" | |
| DOMINANT_PROTEIN = "Protein_Biochemical" | |
| DOMINANT_CONTEXT = "Sequence_Context" | |
| DOMINANT_AMBIGUITY = "Mechanism_Ambiguity" | |
| DOMINANT_TRUNCATION = "Protein_Truncation" | |
| DOMINANT_INSUFFICIENT = "Insufficient_Evidence" | |
| DOMINANT_OOS = "Out_Of_Scope" | |
| DOMINANT_CONFLICT_REVIEW = "Conflict_Manual_Review" | |
| # ── Synthesis result ────────────────────────────────────── | |
| class SynthesisResult: | |
| dominant_mechanism: str | |
| final_classification: str | |
| supporting_mechanisms: list | |
| activation_levels: ActivationLevels | |
| conflict_report: ConflictReport | |
| reasoning_steps: list | |
| transcript_ambiguity: bool | |
| af_uncertainty: bool | |
| version: str = PEVE_VERSION | |
| threshold_version: str = THRESHOLD_VERSION | |
| def _mkr(dom, cls, sup, act, conf, steps, vc, af): | |
| return SynthesisResult(dom, cls, sup, act, conf, steps, | |
| vc.transcript_conflict, af.state in {AF_UNKNOWN, AF_UNCERTAIN}) | |
| # ── Main synthesis ──────────────────────────────────────── | |
| def synthesize(splice, context, protein, af_result, variant_class): | |
| act = compute_activation_levels(splice, context, protein, af_result) | |
| conf = detect_conflicts(splice, context, protein, af_result, act, variant_class) | |
| steps = [] | |
| sup = [] | |
| # Conflict override | |
| if conf.requires_manual_review and conf.conflict_score_major >= 1: | |
| steps.append(f"CONFLICT OVERRIDE: {conf.conflict_score_major} major conflict(s). Classification suppressed.") | |
| return _mkr(DOMINANT_CONFLICT_REVIEW, "Conflict — Manual Review Required", [], act, conf, steps, variant_class, af_result) | |
| # Out of scope | |
| if variant_class.out_of_scope: | |
| steps.append(f"Variant class '{variant_class.variant_class}' is outside PeVe v1.1 scope.") | |
| return _mkr(DOMINANT_OOS, "Out of Scope — See Flags", [], act, conf, steps, variant_class, af_result) | |
| # Truncation gate | |
| if variant_class.variant_class in {"frameshift","stop_gained","start_lost"}: | |
| steps.append(f"Variant class '{variant_class.variant_class}' — protein truncation. L3 substitution metrics excluded.") | |
| if act.rna_active: | |
| steps.append(f"RNA also active (splice_prob={splice.splice_prob:.3f}) — possible NMD-relevant splice signal.") | |
| sup.append(DOMINANT_RNA) | |
| return _mkr(DOMINANT_TRUNCATION, "Protein Truncation", sup, act, conf, steps, variant_class, af_result) | |
| if variant_class.transcript_conflict: | |
| steps.append("Transcript conflict: consequence differs across transcripts. Both mechanisms elevated.") | |
| # Rule 1: RNA High → dominant | |
| if act.rna_dominant: | |
| steps.append(f"RULE 1: RNA HIGH (splice_prob={splice.splice_prob:.3f}≥{SPLICE_DOMINANT_MIN}, signal={splice.splice_signal_strength:.3f}). RNA dominant.") | |
| if act.protein_active: sup.append(DOMINANT_PROTEIN); steps.append(f" Supporting: Protein active (risk={protein.biochemical_risk_score:.3f}).") | |
| if act.context_active: sup.append(DOMINANT_CONTEXT); steps.append(f" Supporting: Context active (norm={context.activation_norm:.3f}).") | |
| return _mkr(DOMINANT_RNA, "Pathogenic — RNA Splice Mechanism", sup, act, conf, steps, variant_class, af_result) | |
| # Rule 1b: RNA Moderate + Protein Active → ambiguity | |
| if act.rna_active and act.protein_active: | |
| steps.append(f"RULE 1b: RNA MODERATE (splice_prob={splice.splice_prob:.3f}) + Protein ACTIVE (risk={protein.biochemical_risk_score:.3f}). Mechanism Ambiguity.") | |
| return _mkr(DOMINANT_AMBIGUITY, "Mechanism Ambiguity — Manual Review Recommended", | |
| [DOMINANT_RNA, DOMINANT_PROTEIN], act, conf, steps, variant_class, af_result) | |
| # Rule 2: Protein dominant | |
| if act.protein_active: | |
| steps.append(f"RULE 2: RNA inactive. Protein ACTIVE (risk={protein.biochemical_risk_score:.3f}, AF={af_result.global_af}).") | |
| if act.context_active: sup.append(DOMINANT_CONTEXT); steps.append(f" Supporting: Context active (norm={context.activation_norm:.3f}).") | |
| if act.rna_active: | |
| sup.append(DOMINANT_RNA) | |
| steps.append(f" Note: Moderate RNA signal present (splice_prob={splice.splice_prob:.3f}). mechanism_ambiguity_flag added.") | |
| conf.minor_conflicts.append("MINOR: Moderate RNA signal alongside Protein-dominant call.") | |
| conf.compute_review_flag() | |
| return _mkr(DOMINANT_PROTEIN, "Pathogenic — Protein Biochemical Mechanism", sup, act, conf, steps, variant_class, af_result) | |
| # Rule 3: Context dominant | |
| if act.context_active: | |
| if variant_class.variant_class == "substitution_synonymous": | |
| steps.append(f"RULE 3 BLOCKED: Context active but synonymous variant — context alone cannot classify pathogenic.") | |
| else: | |
| steps.append(f"RULE 3: RNA+Protein inactive. Context ACTIVE (norm={context.activation_norm:.3f}).") | |
| return _mkr(DOMINANT_CONTEXT, "Uncertain — Sequence Context Signal Only", [], act, conf, steps, variant_class, af_result) | |
| # Rule 4: Insufficient evidence | |
| steps.append( | |
| f"RULE 4: No mechanism active. RNA={act.splice_band} Context={act.context_band} " | |
| f"Protein active={act.protein_active} (L3 valid={act.l3_valid}, rare={af_result.satisfies_rarity()})." | |
| ) | |
| if conf.requires_manual_review: | |
| steps.append(f"Minor conflict threshold reached ({conf.conflict_score_minor} minor). Upgrading to Review.") | |
| return _mkr(DOMINANT_CONFLICT_REVIEW, "Conflict — Manual Review Required", [], act, conf, steps, variant_class, af_result) | |
| return _mkr(DOMINANT_INSUFFICIENT, "Likely Benign or Insufficient Evidence", [], act, conf, steps, variant_class, af_result) | |
| # ── Narrative builder ───────────────────────────────────── | |
| def build_narrative(result, splice, context, protein, af_result, variant_class): | |
| lines = [f"PeVe v{PEVE_VERSION} Structured Reasoning Narrative", "="*60] | |
| lines.append(f"Variant class: {variant_class.variant_class.replace('_',' ').title()}") | |
| lines.append(f"RNA: splice_prob={splice.splice_prob:.3f} (band={result.activation_levels.splice_band}), " | |
| f"signal={splice.splice_signal_strength:.3f}. " | |
| + ("ACTIVE." if result.activation_levels.rna_active else "INACTIVE.")) | |
| lines.append(f"Context: activation_norm={context.activation_norm:.3f} (band={result.activation_levels.context_band}). " | |
| + ("ACTIVE." if result.activation_levels.context_active else "INACTIVE.")) | |
| if result.activation_levels.l3_valid: | |
| af_str = f"AF={af_result.global_af:.6f}" if af_result.global_af is not None else f"AF_state={af_result.state}" | |
| lines.append(f"Protein: biochemical_risk={protein.biochemical_risk_score:.3f}, {af_str}. " | |
| + ("ACTIVE." if result.activation_levels.protein_active else "INACTIVE.")) | |
| else: | |
| lines.append("Protein substitution metrics: NOT APPLICABLE for this variant class.") | |
| lines.append("") | |
| lines.append(f"Dominant mechanism: {result.dominant_mechanism.replace('_',' ')}") | |
| lines.append(f"Final classification: {result.final_classification}") | |
| if result.supporting_mechanisms: | |
| lines.append(f"Supporting: {', '.join(m.replace('_',' ') for m in result.supporting_mechanisms)}") | |
| if result.conflict_report.major_conflicts: | |
| lines.append("\nMAJOR CONFLICTS:") | |
| lines.extend(f" • {c}" for c in result.conflict_report.major_conflicts) | |
| if result.conflict_report.minor_conflicts: | |
| lines.append("MINOR CONFLICTS / BOUNDARY FLAGS:") | |
| lines.extend(f" • {c}" for c in result.conflict_report.minor_conflicts) | |
| if result.transcript_ambiguity: | |
| lines.append("⚠ Transcript conflict: consequence differs across transcripts.") | |
| if variant_class.flags: | |
| lines.append("\nPre-filter flags:") | |
| lines.extend(f" • {f}" for f in variant_class.flags) | |
| if result.conflict_report.requires_manual_review: | |
| lines.append("\n⛔ MANUAL REVIEW REQUIRED.") | |
| lines.append("="*60) | |
| lines.append(f"PeVe v{PEVE_VERSION} | Thresholds {THRESHOLD_VERSION} | No probability averaging.") | |
| return "\n".join(lines) | |