# gemeo-twin-stack/src/gemeo/protocol_compliance.py
# timmers's picture
# GEMEO world-model — initial release (module + NeuralSurv ckpt + RareBench v49 + KG embeddings)
# 089d665 verified
"""Protocol compliance — current management vs PCDT.

Compares the patient's current treatments and investigations against
the PCDT (Protocolo Clínico e Diretrizes Terapêuticas, the Brazilian
clinical protocol) for the confirmed-or-suspected diagnosis. Flags:

- PCDT-required exams not performed
- PCDT-recommended therapies not initiated
- Therapies in use that are NOT in the PCDT (off-protocol)
- Specialist referrals that the PCDT requires (always listed, because
  referral status is not part of the input)

Returns a `ProtocolComplianceSpec` with a 0..1 compliance score and
itemised gaps.
"""
from __future__ import annotations
import logging
import re
from typing import Optional
from .types import ProtocolComplianceSpec, ComplianceGap
logger = logging.getLogger("gemeo.protocol_compliance")
def _extract_protocol_items(pcdt: dict, key_options: list) -> list[str]:
"""Pull a flat list of strings from a PCDT field that may be list or text."""
if not pcdt:
return []
out = []
for k in key_options:
v = pcdt.get(k)
if isinstance(v, list):
out.extend(str(x) for x in v if x)
elif isinstance(v, str) and v.strip():
for line in re.split(r"[\n;]", v):
line = line.strip(" -•·\t")
if line:
out.append(line)
return [x for x in out if x]
def _matches_any(target: str, candidates: list[str]) -> bool:
t = (target or "").lower()
if not t:
return False
return any(t in c.lower() or c.lower() in t for c in candidates)
def _record_names(records: Optional[list], fields: tuple) -> list[str]:
    """Turn patient records (dicts or plain values) into non-blank name strings."""
    names = []
    for record in records or []:
        if record is None:
            continue
        if isinstance(record, dict):
            # Coerce each field to text so non-string values cannot break
            # the join or the later case-insensitive matching.
            text = " ".join(str(record.get(field) or "") for field in fields)
        else:
            text = str(record)
        text = text.strip()
        if text:
            names.append(text)
    return names


def assess(
    *,
    orpha: Optional[str],
    current_treatments: Optional[list] = None,
    current_labs: Optional[list] = None,
    current_imaging: Optional[list] = None,
) -> ProtocolComplianceSpec:
    """Compute compliance with the PCDT for `orpha`.

    The PCDT is fetched with ``brazilian_context.get_pcdt`` and its therapy,
    exam and referral items are compared with what the patient currently
    has, using loose substring matching.

    Args:
        orpha: Disease identifier passed to ``get_pcdt``; falsy means no
            diagnosis is available.
        current_treatments: Treatments in use; each entry is a dict read via
            its ``"name"`` key, or any other value used via ``str()``.
        current_labs: Labs performed; dict entries are read via ``"test"``.
        current_imaging: Imaging performed; dict entries are read via
            ``"modality"`` and ``"finding"``.

    Returns:
        A ``ProtocolComplianceSpec``. ``score`` is the fraction of PCDT
        therapies and exams that were matched, rounded to 3 decimals, and is
        0.0 when there is no diagnosis, no PCDT, or nothing to score.
        Referrals are always reported as gaps because referral status is not
        an input, and for that reason they are left out of the score.
        Therapies in use that match no PCDT therapy are reported as
        ``off_protocol`` gaps and do not affect the score.
    """
    if not orpha:
        return ProtocolComplianceSpec(disease_orpha=None, score=0.0, gaps=[], notes="No diagnosis provided.")
    try:
        from brazilian_context import get_pcdt
    except ImportError:
        return ProtocolComplianceSpec(disease_orpha=orpha, score=0.0, gaps=[], notes="brazilian_context unavailable.")

    try:
        pcdt = get_pcdt(orpha)
    except Exception:
        # Best-effort lookup: a failure is treated as "no PCDT", but it is
        # logged visibly so that it is not mistaken for a missing protocol.
        logger.warning("get_pcdt failed for ORPHA:%s", orpha, exc_info=True)
        pcdt = None
    if not pcdt:
        return ProtocolComplianceSpec(disease_orpha=orpha, score=0.0, gaps=[], notes=f"No PCDT for ORPHA:{orpha}.")

    pcdt_therapies = _extract_protocol_items(pcdt, ["therapies", "medicamentos", "tratamento"])
    pcdt_exams = _extract_protocol_items(pcdt, ["exames", "investigacao", "diagnostico", "criterios"])
    pcdt_referrals = _extract_protocol_items(pcdt, ["especialidades", "especialistas", "encaminhamento"])

    current_tx_names = _record_names(current_treatments, ("name",))
    # Blank names are dropped: a blank investigation entry would otherwise be
    # a substring of many exam names and mark them as performed.
    performed_investigations = (
        _record_names(current_labs, ("test",))
        + _record_names(current_imaging, ("modality", "finding"))
    )

    gaps: list[ComplianceGap] = []
    n_satisfied = 0

    for tx in pcdt_therapies:
        if _matches_any(tx, current_tx_names):
            n_satisfied += 1
            continue
        gaps.append(ComplianceGap(
            category="therapy",
            expected=tx,
            priority="high",
            rationale=f"PCDT recommends {tx}; not present in current treatments.",
        ))

    for ex in pcdt_exams:
        if _matches_any(ex, performed_investigations):
            n_satisfied += 1
            continue
        gaps.append(ComplianceGap(
            category="exam",
            expected=ex,
            priority="medium",
            rationale=f"PCDT requires/expects {ex}; no matching lab/imaging recorded.",
        ))

    # Referral state is not available, so every referral is surfaced as a
    # soft gap and none of them can count for or against the score.
    for referral in pcdt_referrals:
        gaps.append(ComplianceGap(
            category="referral",
            expected=referral,
            priority="medium",
            rationale=f"PCDT requires specialist referral: {referral}.",
        ))
    n_protocol_gaps = len(gaps)

    # Off-protocol therapies (in use but not in the PCDT list). Skipped when
    # the PCDT lists no therapies, since nothing can be judged off-protocol.
    if pcdt_therapies:
        for name in current_tx_names:
            if not _matches_any(name, pcdt_therapies):
                gaps.append(ComplianceGap(
                    category="off_protocol",
                    expected="(remove or justify)",
                    got=name,
                    priority="low",
                    rationale=f"Therapy `{name}` is not part of the PCDT for this disease.",
                ))
    n_off_protocol = len(gaps) - n_protocol_gaps

    n_scored = len(pcdt_therapies) + len(pcdt_exams)
    n_required = n_scored + len(pcdt_referrals)
    score = (n_satisfied / n_scored) if n_scored else 0.0

    notes = (
        f"PCDT items: {n_required} required, {n_satisfied} satisfied. "
        f"{n_protocol_gaps} gaps + "
        f"{n_off_protocol} off-protocol items."
    )
    if pcdt_referrals:
        notes += f" {len(pcdt_referrals)} referral item(s) listed but not scored (no referral data)."

    return ProtocolComplianceSpec(
        disease_orpha=orpha,
        score=round(score, 3),
        gaps=gaps,
        notes=notes,
    )