| """Protocol compliance — current management vs PCDT. |
| |
| Compares the patient's current treatments and investigations against |
| the PCDT for the confirmed-or-suspected diagnosis. Flags: |
| - PCDT-required exams not performed |
| - PCDT-recommended therapies not initiated |
| - Therapies in use that are NOT in the PCDT (off-protocol) |
| - Specialist referrals that the PCDT requires |
| |
| Returns a `ProtocolComplianceSpec` with a 0..1 compliance score and |
| itemised gaps. |
| """ |
| from __future__ import annotations |
| import logging |
| import re |
| from typing import Optional |
|
|
| from .types import ProtocolComplianceSpec, ComplianceGap |
|
|
| logger = logging.getLogger("gemeo.protocol_compliance") |
|
|
|
|
| def _extract_protocol_items(pcdt: dict, key_options: list) -> list[str]: |
| """Pull a flat list of strings from a PCDT field that may be list or text.""" |
| if not pcdt: |
| return [] |
| out = [] |
| for k in key_options: |
| v = pcdt.get(k) |
| if isinstance(v, list): |
| out.extend(str(x) for x in v if x) |
| elif isinstance(v, str) and v.strip(): |
| for line in re.split(r"[\n;]", v): |
| line = line.strip(" -•·\t") |
| if line: |
| out.append(line) |
| return [x for x in out if x] |
|
|
|
|
| def _matches_any(target: str, candidates: list[str]) -> bool: |
| t = (target or "").lower() |
| if not t: |
| return False |
| return any(t in c.lower() or c.lower() in t for c in candidates) |
|
|
|
|
| def assess( |
| *, |
| orpha: Optional[str], |
| current_treatments: list = None, |
| current_labs: list = None, |
| current_imaging: list = None, |
| ) -> ProtocolComplianceSpec: |
| """Compute compliance with the PCDT for `orpha`.""" |
| if not orpha: |
| return ProtocolComplianceSpec(disease_orpha=None, score=0.0, gaps=[], notes="No diagnosis provided.") |
|
|
| try: |
| from brazilian_context import get_pcdt |
| except ImportError: |
| return ProtocolComplianceSpec(disease_orpha=orpha, score=0.0, gaps=[], notes="brazilian_context unavailable.") |
|
|
| pcdt = None |
| try: |
| pcdt = get_pcdt(orpha) |
| except Exception as e: |
| logger.debug(f"get_pcdt failed: {e}") |
|
|
| if not pcdt: |
| return ProtocolComplianceSpec(disease_orpha=orpha, score=0.0, gaps=[], notes=f"No PCDT for ORPHA:{orpha}.") |
|
|
| pcdt_therapies = _extract_protocol_items(pcdt, ["therapies", "medicamentos", "tratamento"]) |
| pcdt_exams = _extract_protocol_items(pcdt, ["exames", "investigacao", "diagnostico", "criterios"]) |
| pcdt_referrals = _extract_protocol_items(pcdt, ["especialidades", "especialistas", "encaminhamento"]) |
|
|
| current_tx_names = [ |
| (t.get("name") if isinstance(t, dict) else str(t)) for t in (current_treatments or []) |
| ] |
| current_tx_names = [n for n in current_tx_names if n] |
|
|
| current_lab_names = [ |
| (l.get("test") if isinstance(l, dict) else str(l)) for l in (current_labs or []) |
| ] |
| current_lab_names = [n for n in current_lab_names if n] |
|
|
| current_imaging_names = [ |
| ((i.get("modality") or "") + " " + (i.get("finding") or "") if isinstance(i, dict) else str(i)) |
| for i in (current_imaging or []) |
| ] |
|
|
| gaps: list[ComplianceGap] = [] |
| n_required = len(pcdt_therapies) + len(pcdt_exams) + len(pcdt_referrals) |
| n_satisfied = 0 |
|
|
| for tx in pcdt_therapies: |
| if _matches_any(tx, current_tx_names): |
| n_satisfied += 1 |
| else: |
| gaps.append(ComplianceGap( |
| category="therapy", |
| expected=tx, |
| priority="high", |
| rationale=f"PCDT recommends {tx}; not present in current treatments.", |
| )) |
|
|
| for ex in pcdt_exams: |
| if _matches_any(ex, current_lab_names + current_imaging_names): |
| n_satisfied += 1 |
| else: |
| gaps.append(ComplianceGap( |
| category="exam", |
| expected=ex, |
| priority="medium", |
| rationale=f"PCDT requires/expects {ex}; no matching lab/imaging recorded.", |
| )) |
|
|
| for r in pcdt_referrals: |
| |
| gaps.append(ComplianceGap( |
| category="referral", |
| expected=r, |
| priority="medium", |
| rationale=f"PCDT requires specialist referral: {r}.", |
| )) |
|
|
| |
| for t in current_tx_names: |
| if pcdt_therapies and not _matches_any(t, pcdt_therapies): |
| gaps.append(ComplianceGap( |
| category="off_protocol", |
| expected="(remove or justify)", |
| got=t, |
| priority="low", |
| rationale=f"Therapy `{t}` is not part of the PCDT for this disease.", |
| )) |
|
|
| score = (n_satisfied / max(1, n_required)) if n_required else 0.0 |
|
|
| return ProtocolComplianceSpec( |
| disease_orpha=orpha, |
| score=round(score, 3), |
| gaps=gaps, |
| notes=( |
| f"PCDT items: {n_required} required, {n_satisfied} satisfied. " |
| f"{len([g for g in gaps if g.category != 'off_protocol'])} gaps + " |
| f"{len([g for g in gaps if g.category == 'off_protocol'])} off-protocol items." |
| ), |
| ) |
|
|