File size: 5,204 Bytes
089d665
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""Protocol compliance — current management vs PCDT.

Compares the patient's current treatments and investigations against
the PCDT for the confirmed-or-suspected diagnosis. Flags:
  - PCDT-required exams not performed
  - PCDT-recommended therapies not initiated
  - Therapies in use that are NOT in the PCDT (off-protocol)
  - Specialist referrals that the PCDT requires

Returns a `ProtocolComplianceSpec` with a 0..1 compliance score and
itemised gaps.
"""
from __future__ import annotations
import logging
import re
from typing import Optional

from .types import ProtocolComplianceSpec, ComplianceGap

logger = logging.getLogger("gemeo.protocol_compliance")


def _extract_protocol_items(pcdt: dict, key_options: list) -> list[str]:
    """Pull a flat list of strings from a PCDT field that may be list or text."""
    if not pcdt:
        return []
    out = []
    for k in key_options:
        v = pcdt.get(k)
        if isinstance(v, list):
            out.extend(str(x) for x in v if x)
        elif isinstance(v, str) and v.strip():
            for line in re.split(r"[\n;]", v):
                line = line.strip(" -•·\t")
                if line:
                    out.append(line)
    return [x for x in out if x]


def _matches_any(target: str, candidates: list[str]) -> bool:
    t = (target or "").lower()
    if not t:
        return False
    return any(t in c.lower() or c.lower() in t for c in candidates)


def _record_names(records: Optional[list], *fields: str) -> list[str]:
    """Turn raw patient records into trimmed, non-blank names.

    A dict record contributes the space-joined values of ``fields`` that are
    present and truthy; any other record is converted with ``str``. ``None``
    records and records that produce a blank name are dropped, so they can
    neither match a protocol item nor be reported as a therapy in use.
    """
    names: list[str] = []
    for record in records or []:
        if record is None:
            continue
        if isinstance(record, dict):
            text = " ".join(str(record[f]) for f in fields if record.get(f))
        else:
            text = str(record)
        text = text.strip()
        if text:
            names.append(text)
    return names


def assess(
    *,
    orpha: Optional[str],
    current_treatments: Optional[list] = None,
    current_labs: Optional[list] = None,
    current_imaging: Optional[list] = None,
) -> ProtocolComplianceSpec:
    """Compute compliance with the PCDT for `orpha`.

    Args:
        orpha: Disease identifier passed to ``get_pcdt``. A falsy value
            returns a zero-score spec with no gaps.
        current_treatments: Treatments in use; each entry is a dict with a
            ``"name"`` key or any other value, which is stringified.
        current_labs: Labs on record; each entry is a dict with a ``"test"``
            key or any other value, which is stringified.
        current_imaging: Imaging on record; each entry is a dict with
            ``"modality"`` and/or ``"finding"`` keys or any other value,
            which is stringified.

    Returns:
        A ``ProtocolComplianceSpec``. ``score`` is the fraction of PCDT
        therapies and exams matched by the current records, rounded to three
        decimals (``0.0`` when the PCDT lists none). ``gaps`` holds one entry
        per unmatched therapy (high priority), unmatched exam (medium), PCDT
        referral (medium) and off-protocol therapy (low). When
        ``brazilian_context`` cannot be imported, the lookup fails, or no
        PCDT is found, a zero-score spec with an explanatory note is
        returned instead.
    """
    if not orpha:
        return ProtocolComplianceSpec(disease_orpha=None, score=0.0, gaps=[], notes="No diagnosis provided.")

    # Imported lazily so that a missing optional dependency degrades to an
    # explanatory result instead of breaking module import.
    try:
        from brazilian_context import get_pcdt
    except ImportError:
        return ProtocolComplianceSpec(disease_orpha=orpha, score=0.0, gaps=[], notes="brazilian_context unavailable.")

    pcdt = None
    try:
        pcdt = get_pcdt(orpha)
    except Exception as exc:
        # Best effort: a failed lookup is reported below as "no PCDT".
        logger.debug("get_pcdt failed: %s", exc)

    if not pcdt:
        return ProtocolComplianceSpec(disease_orpha=orpha, score=0.0, gaps=[], notes=f"No PCDT for ORPHA:{orpha}.")

    pcdt_therapies = _extract_protocol_items(pcdt, ["therapies", "medicamentos", "tratamento"])
    pcdt_exams = _extract_protocol_items(pcdt, ["exames", "investigacao", "diagnostico", "criterios"])
    pcdt_referrals = _extract_protocol_items(pcdt, ["especialidades", "especialistas", "encaminhamento"])

    treatment_names = _record_names(current_treatments, "name")
    # Labs and imaging are pooled: an exam counts as done if either matches.
    investigation_names = (
        _record_names(current_labs, "test")
        + _record_names(current_imaging, "modality", "finding")
    )

    gaps: list[ComplianceGap] = []
    # Referrals are left out of the denominator: there is no referral state
    # to check them against, so counting them would cap the score below 1.0
    # for every PCDT that lists one, even with full therapy/exam compliance.
    n_required = len(pcdt_therapies) + len(pcdt_exams)
    n_satisfied = 0

    for therapy in pcdt_therapies:
        if _matches_any(therapy, treatment_names):
            n_satisfied += 1
            continue
        gaps.append(ComplianceGap(
            category="therapy",
            expected=therapy,
            priority="high",
            rationale=f"PCDT recommends {therapy}; not present in current treatments.",
        ))

    for exam in pcdt_exams:
        if _matches_any(exam, investigation_names):
            n_satisfied += 1
            continue
        gaps.append(ComplianceGap(
            category="exam",
            expected=exam,
            priority="medium",
            rationale=f"PCDT requires/expects {exam}; no matching lab/imaging recorded.",
        ))

    # We don't have specialist referral state; surface each as a soft gap
    # that is listed for the clinician but does not affect the score.
    for referral in pcdt_referrals:
        gaps.append(ComplianceGap(
            category="referral",
            expected=referral,
            priority="medium",
            rationale=f"PCDT requires specialist referral: {referral}.",
        ))

    # Off-protocol therapies (in use but not in PCDT list). Skipped entirely
    # when the PCDT names no therapies, otherwise everything would be flagged.
    if pcdt_therapies:
        for name in treatment_names:
            if not _matches_any(name, pcdt_therapies):
                gaps.append(ComplianceGap(
                    category="off_protocol",
                    expected="(remove or justify)",
                    got=name,
                    priority="low",
                    rationale=f"Therapy `{name}` is not part of the PCDT for this disease.",
                ))

    score = n_satisfied / n_required if n_required else 0.0
    n_off_protocol = sum(1 for gap in gaps if gap.category == "off_protocol")
    n_protocol_gaps = len(gaps) - n_off_protocol

    return ProtocolComplianceSpec(
        disease_orpha=orpha,
        score=round(score, 3),
        gaps=gaps,
        notes=(
            f"PCDT items: {n_required} required, {n_satisfied} satisfied. "
            f"{n_protocol_gaps} gaps + "
            f"{n_off_protocol} off-protocol items."
        ),
    )