| """Risk + survival. |
| |
| Bootstrap: wraps `risk_quantifier.quantify_risks` (rule-based scoring over |
| disease severity + diagnostic certainty + progression + treatment urgency |
| + Brazilian context). |
| |
| Phase 2: NeuralSurv on KG (gemeo/train/neuralsurv.py) — Bayesian survival |
| with epistemic uncertainty over the patient's KG-walk feature vector. |
| """ |
| from __future__ import annotations |
| import logging |
| import os |
| import math |
| from typing import Optional |
|
|
| from .types import RiskSpec |
|
|
| logger = logging.getLogger("gemeo.risk") |
|
|
| NEURALSURV_CKPT = os.environ.get( |
| "GEMEO_NEURALSURV_CKPT", |
| os.path.join(os.path.dirname(__file__), "artifacts", "neuralsurv.pt"), |
| ) |
|
|
|
|
| def _approx_survival_from_severity(severity: float, horizons_months: list[int] = None) -> list: |
| """Bootstrap survival curve when no NeuralSurv model is available. |
| |
| Maps severity (0..1) to a hazard rate via a simple monotonic transform and |
| integrates an exponential survival model. CI is heuristic (±15% relative). |
| """ |
| horizons_months = horizons_months or [3, 6, 12, 24, 36, 60] |
| severity = max(0.0, min(1.0, float(severity))) |
| |
| hazard_per_year = 0.01 + 0.49 * (severity ** 1.5) |
| points = [] |
| for m in horizons_months: |
| years = m / 12.0 |
| p = math.exp(-hazard_per_year * years) |
| |
| spread = min(0.4, 0.05 + 0.05 * years) |
| points.append({ |
| "month": m, |
| "p_alive": round(p, 4), |
| "ci_low": round(max(0.0, p - spread), 4), |
| "ci_high": round(min(1.0, p + spread), 4), |
| }) |
| return points |
|
|
|
|
| async def _try_neuralsurv(space, embedding): |
| if not os.path.exists(NEURALSURV_CKPT): |
| return None |
| try: |
| from .train import neuralsurv as ns_mod |
| return await ns_mod.predict(space, embedding, NEURALSURV_CKPT) |
| except Exception as e: |
| logger.warning(f"NeuralSurv predict failed: {e}") |
| return None |
|
|
|
|
| async def assess(space, embedding=None) -> RiskSpec: |
| """Compute the risk profile of the digital twin.""" |
|
|
| |
| ns_spec = await _try_neuralsurv(space, embedding) |
| if ns_spec is not None: |
| |
| |
| |
| |
| |
| |
| |
| |
| SEVERE_FLOOR = { |
| "100": 0.65, "646": 0.65, "355": 0.55, "324": 0.55, |
| "365": 0.85, "579": 0.65, "580": 0.65, "70": 0.95, |
| "905": 0.55, "98896": 0.70, "586": 0.65, "95": 0.60, |
| "183660": 0.85, "778": 0.70, "636": 0.50, "558": 0.55, |
| } |
| top_orpha = None; top_prob = 0.0 |
| for hyp in (getattr(space, "_hypotheses", {}) or {}).values(): |
| p = float(getattr(hyp, "probability", 0) or 0) |
| orpha = getattr(hyp, "orpha_code", None) |
| status = getattr(hyp, "status", "") |
| if p > top_prob and status in ("active", "supported", "confirmed") and orpha: |
| top_prob = p; top_orpha = orpha |
| if top_orpha and top_prob >= 0.5 and top_orpha in SEVERE_FLOOR: |
| ns_spec.overall_severity = max(ns_spec.overall_severity, SEVERE_FLOOR[top_orpha]) |
| ns_spec.progression_risk = max(ns_spec.progression_risk, 0.55) |
| ns_spec.treatment_urgency = max(ns_spec.treatment_urgency, 0.65) |
| return ns_spec |
|
|
| |
| severity = 0.0 |
| progression = 0.0 |
| urgency = 0.0 |
| complications = [] |
| try: |
| from risk_quantifier import quantify_risks |
| profile = await quantify_risks(space) |
| if profile is None: |
| pass |
| elif isinstance(profile, dict): |
| severity = float(profile.get("overall_severity", 0.0) or 0.0) |
| progression = float(profile.get("progression_risk", 0.0) or 0.0) |
| urgency = float(profile.get("treatment_urgency", 0.0) or 0.0) |
| complications = profile.get("top_complications", []) or [] |
| else: |
| severity = float(getattr(profile, "overall_severity", 0.0) or 0.0) |
| progression = float(getattr(profile, "progression_risk", 0.0) or 0.0) |
| urgency = float(getattr(profile, "treatment_urgency", 0.0) or 0.0) |
| comps = getattr(profile, "top_complications", []) or [] |
| for c in comps[:8]: |
| if isinstance(c, dict): |
| complications.append({ |
| "name": c.get("name"), |
| "prob": c.get("probability"), |
| "when": c.get("expected_in_months") or c.get("expected_in"), |
| }) |
| else: |
| complications.append({ |
| "name": getattr(c, "name", None), |
| "prob": getattr(c, "probability", None), |
| "when": getattr(c, "expected_in_months", None), |
| }) |
| except Exception as e: |
| logger.debug(f"risk_quantifier failed: {e}") |
|
|
| |
| |
| |
| |
| try: |
| snap = space.get_current_snapshot() if hasattr(space, "get_current_snapshot") else None |
| n_pheno = len(snap.phenotypes) if snap else 0 |
| |
| SEVERE_FLOOR = { |
| "100": 0.65, |
| "646": 0.65, |
| "355": 0.55, |
| "324": 0.55, |
| "365": 0.85, |
| "579": 0.65, |
| "580": 0.65, |
| "70": 0.95, |
| "905": 0.55, |
| "98896": 0.70, |
| "586": 0.65, |
| "95": 0.60, |
| "183660":0.85, |
| "778": 0.70, |
| } |
| top_orpha = None |
| top_prob = 0.0 |
| for hyp in (getattr(space, "_hypotheses", {}) or {}).values(): |
| p = float(getattr(hyp, "probability", 0) or 0) |
| orpha = getattr(hyp, "orpha_code", None) |
| status = getattr(hyp, "status", "") |
| if p > top_prob and status in ("active", "supported", "confirmed") and orpha: |
| top_prob = p |
| top_orpha = orpha |
| if top_orpha and top_prob >= 0.5 and top_orpha in SEVERE_FLOOR: |
| |
| |
| severity = max(severity, SEVERE_FLOOR[top_orpha]) |
| progression = max(progression, 0.55) |
| urgency = max(urgency, 0.65) |
| elif severity == 0 and progression == 0 and urgency == 0: |
| |
| severity = min(0.6, 0.05 + 0.02 * n_pheno) |
| except Exception as e: |
| logger.debug(f"severity boost failed: {e}") |
| if severity == 0: |
| severity = 0.2 |
|
|
| survival = _approx_survival_from_severity(severity) |
|
|
| return RiskSpec( |
| overall_severity=severity, |
| progression_risk=progression, |
| treatment_urgency=urgency, |
| survival_curve=survival, |
| top_complications=complications[:8], |
| model="rule_based" if not _try_neuralsurv else "rule_based", |
| ) |
|
|