File size: 13,512 Bytes
089d665
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
"""Gemeo as LLM context — bidirectional pipe between space and the LLM.

Three responsibilities:
  1. **serialize_twin_for_llm(twin)** — produce a compact, token-budgeted
     structured block (Markdown) summarising the twin: top dx, key risks,
     phenotypes, genes, drugs in use, DDI alerts, PCDT gaps, next questions.
     This block is what the LLM sees on every call for this case.

  2. **inject_context(case_id, system_prompt)** — wrapper used by every
     LLM call in the swarm. Looks up the cached twin (or queries it lazily)
     and prepends either a short brief or the full structured block to the
     system prompt under a "## Gêmeo Digital …" header. The full block is
     capped by a character budget (a rough proxy for tokens).

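  3. **absorb_message(case_id, message_text)** — when an LLM agent or a
     user adds new clinical content (lab, HPO, gene, treatment), this
     extracts structured mentions (currently HPO IDs, ORPHA codes and gene
     symbols; labs and treatments are not extracted by this pass) and feeds
     the phenotypes and genes back via `evolve_gemeo`, so the next LLM call
     sees the updated twin.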

This is the "GraphRAG of the patient" — context is grounded in a graph
that itself updates from every conversational turn.
"""
from __future__ import annotations
import logging
import re
from typing import Optional

# Module logger; the name is a fixed string rather than derived from __name__.
logger = logging.getLogger("gemeo.llm_context")

# Default budget for the serialized twin, expressed in characters. Characters
# stand in for tokens here (rough estimate: ~4 chars per token).
DEFAULT_MAX_CHARS = 4000


def _section(title: str, body: str) -> str:
    """Format one Markdown subsection.

    Returns a level-3 heading followed by the whitespace-trimmed body and a
    trailing newline, or an empty string when the body is blank so callers
    can drop the section entirely.
    """
    content = body.strip()
    return f"### {title}\n{content}\n" if content else ""


def serialize_twin_for_llm(twin, *, max_chars: int = DEFAULT_MAX_CHARS) -> str:
    """Render the digital twin as a compact Markdown block for an LLM prompt.

    Each populated facet of the twin (differential, snapshot counts, risk,
    trajectory, cohort, drug candidates, DDI, pharmacogenomics, pedigree,
    PCDT compliance, reverse phenotyping, next questions, SUS grounding)
    becomes one ``###`` subsection; empty facets are skipped. Lists are
    capped (5 entries, 3 for trajectory horizons and relatives) to keep the
    block small.

    Args:
        twin: The twin object, or ``None``. Accessed by attribute; diagnoses,
            drug candidates, the cohort centroid and the nearest centre are
            read as dicts.
        max_chars: Upper bound on the length of the returned string. When the
            rendered block is longer it is cut and a truncation marker is
            appended, with the marker counted inside the budget.

    Returns:
        The Markdown block, or ``""`` when ``twin`` is ``None``.
    """
    if twin is None:
        return ""

    parts: list[str] = [
        f"_Twin {twin.id} · case {twin.case_id} · embedding {twin.embedding_dim}d_"
    ]

    # Differential diagnosis.
    if twin.diagnoses:
        dx_lines = []
        for dx in twin.diagnoses[:5]:
            prob = dx.get("probability") or 0
            name = dx.get("disease") or dx.get("name") or "?"
            orpha = dx.get("orpha") or "?"
            dx_lines.append(
                f"- {name} (ORPHA:{orpha}) — p={prob:.2f}, status={dx.get('status', 'active')}"
            )
        parts.append(_section("Diferencial", "\n".join(dx_lines)))

    # Snapshot summary (counts only).
    parts.append(_section(
        "Snapshot",
        f"- Fenótipos: {twin.n_phenotypes}\n- Genes: {twin.n_genes}\n- Labs: {twin.n_labs}\n- Versões snapshot: {len(twin.snapshot_versions or [])}",
    ))

    # Risk scores.
    if twin.risk:
        risk = twin.risk
        parts.append(_section(
            "Risco",
            f"- Severidade: {risk.overall_severity:.2f}\n- Progressão: {risk.progression_risk:.2f}\n- Urgência tx: {risk.treatment_urgency:.2f}",
        ))

    # Trajectory horizons.
    if twin.trajectory and twin.trajectory.horizons:
        horizon_lines = [
            f"- T+{h.months}m (risco {h.risk_score:.2f}): {(h.state or '')[:160]}"
            for h in twin.trajectory.horizons[:3]
        ]
        parts.append(_section("Trajetória", "\n".join(horizon_lines)))

    # Cohort centroid. Every field goes through .get so a sparse centroid
    # dict degrades to placeholders instead of raising KeyError.
    if twin.cohort and twin.cohort.centroid_disease:
        centroid = twin.cohort.centroid_disease
        share = round((centroid.get("fraction") or 0) * 100)
        parts.append(_section(
            "Coorte",
            f"- Centróide: {centroid.get('name', '?')} ({centroid.get('count', 0)} pacientes, {share}%)\n- Membros: {len(twin.cohort.members or [])}",
        ))

    # Drug repositioning candidates, tagged with SUS availability.
    if twin.drugs and twin.drugs.candidates:
        drug_lines = []
        for drug in twin.drugs.candidates[:5]:
            if drug.get("sus_dispensed"):
                tag = " [SUS]"
            elif drug.get("sus_in_pcdt"):
                tag = " [PCDT]"
            else:
                tag = ""
            mechanism = (drug.get("mechanism") or "")[:80]
            # Separate the mechanism from the name/tag; without a separator
            # the two run together into one unreadable token.
            detail = f" — {mechanism}" if mechanism else ""
            drug_lines.append(f"- {drug.get('name', '?')}{tag}{detail}")
        parts.append(_section("Reposicionamento", "\n".join(drug_lines)))

    # Drug-drug interaction alerts (only if any).
    if twin.ddi and twin.ddi.pairs:
        ddi_lines = [
            # " × " keeps the two drug names distinguishable.
            f"- [{pair.severity}] {pair.drug_a} × {pair.drug_b}: {(pair.mechanism or '')[:120]}"
            for pair in twin.ddi.pairs[:5]
        ]
        parts.append(_section(f"DDI (regime: {twin.ddi.regimen_risk})", "\n".join(ddi_lines)))

    # Pharmacogenomics: only CPIC level A/B assessments are listed.
    if twin.pharmacogen and twin.pharmacogen.assessments:
        actionable = [
            a for a in twin.pharmacogen.assessments if a.cpic_level in ("A", "B")
        ][:5]
        if actionable:
            pgx_lines = []
            for a in actionable:
                variant = f" {a.variant}" if a.variant else ""
                pgx_lines.append(
                    f"- {a.gene}{variant} × {a.drug} (CPIC {a.cpic_level}): {(a.recommendation or '')[:120]}"
                )
            parts.append(_section("Farmacogenômica (acionável)", "\n".join(pgx_lines)))

    # Family / pedigree; emitted only when at least one relative is listed.
    if twin.family:
        family = twin.family
        rels = ", ".join(
            f"{rel.relation} {round(rel.recurrence_risk * 100)}%"
            for rel in (family.relatives or [])[:3]
        )
        if rels:
            parts.append(_section(
                "Pedigree",
                f"- Modo: {family.inheritance_mode}\n- Recorrência: {rels}",
            ))

    # PCDT compliance. The guard guarantees at least one gap, so no
    # "no gaps" fallback text is needed here.
    if twin.protocol_compliance and twin.protocol_compliance.gaps:
        compliance = twin.protocol_compliance
        gap_lines = [f"- [{g.priority}] {g.expected}" for g in compliance.gaps[:5]]
        parts.append(_section(
            f"PCDT compliance ({round(compliance.score * 100)}%)",
            "\n".join(gap_lines),
        ))

    # Reverse phenotyping suggestions.
    if twin.reverse_pheno and twin.reverse_pheno.items:
        pheno_lines = [
            f"- {it.name} ({it.hpo_id}, freq {round(it.expected_frequency * 100)}%)"
            for it in twin.reverse_pheno.items[:5]
        ]
        parts.append(_section("O que ainda procurar", "\n".join(pheno_lines)))

    # Next questions ranked by information gain.
    if twin.next_questions:
        question_lines = [
            f"- {q.name} ({q.hpo_id}) — gain {q.information_gain:.2f} bits"
            + (" [PCDT]" if q.asks_in_pcdt else "")
            for q in twin.next_questions[:5]
        ]
        parts.append(_section("Próximas perguntas (info-gain)", "\n".join(question_lines)))

    # SUS grounding.
    if twin.sus_check:
        sus = twin.sus_check
        sus_lines = []
        if sus.has_pcdt:
            sus_lines.append(f"- PCDT: sim ({sus.pcdt_url or '—'})")
            if sus.therapy_pcdt_recommended:
                sus_lines.append("- Terapias: " + ", ".join(sus.therapy_pcdt_recommended[:5]))
        if sus.nearest_centro:
            centro = sus.nearest_centro
            sus_lines.append(
                f"- Centro mais próximo: {centro.get('nome')} ({centro.get('cidade')}/{centro.get('uf')})"
            )
        if sus_lines:
            parts.append(_section("SUS", "\n".join(sus_lines)))

    out = "\n".join(part for part in parts if part)
    if len(out) <= max_chars:
        return out

    # Reserve room for the marker using its real length so the result never
    # exceeds max_chars (a fixed 40-char reserve is shorter than the marker).
    suffix = "\n\n…(twin truncado por orçamento de tokens)"
    keep = max_chars - len(suffix)
    if keep <= 0:
        # Budget too small to fit the marker at all: hard cut.
        return out[: max(0, max_chars)]
    return out[:keep].rstrip() + suffix


# ─── Inject into a system prompt ───────────────────────────────────────────

# Markdown heading placed above the full twin block in the system prompt.
GEMEO_HEADER = "## Gêmeo Digital — contexto vivo do paciente"

# Instruction paragraph (Portuguese, sent verbatim to the model) that follows
# the heading in "full" mode: ground every recommendation in the twin, call
# the `gemeo_lookup(query)` tool for specific evidence, and never invent facts.
GEMEO_INSTRUCTIONS = (
    "Use o gêmeo abaixo para fundamentar TODA recomendação. Quando precisar "
    "de evidência específica, chame a tool `gemeo_lookup(query)` para "
    "recuperar subgrafo + coorte + literatura ancorados no caso. Nunca "
    "invente fatos não presentes no gêmeo ou no resultado da lookup."
)


# ─── Adaptive (Self-RAG style) brief context — ~300 tokens ────────────────

def serialize_twin_brief(twin) -> str:
    """Return a short plain-text summary of the twin for adaptive injection.

    The brief contains the top-3 diagnoses, the risk scores, the first
    SUS-dispensed drug candidate, a PCDT pointer, and a "Mais:" line that
    lists which deeper sections exist (DDI, PGx, pedigree, cohort, next
    questions) with their sizes, so the model knows more is available via
    `gemeo_lookup(query)` / `gemeo_state(section)`.

    Diagnosis and drug dict fields that are missing *or* ``None`` fall back
    to placeholders (``?``, probability 0, empty mechanism) instead of
    raising while formatting.

    Args:
        twin: The twin object (attribute access), or ``None``.

    Returns:
        Newline-joined summary lines, or ``""`` when ``twin`` is ``None``.
    """
    if twin is None:
        return ""

    lines: list[str] = [f"_Gemeo {twin.id} · {twin.embedding_dim}d_"]

    if twin.diagnoses:
        dx_items = []
        for dx in twin.diagnoses[:3]:
            name = dx.get("name") or dx.get("disease") or "?"
            orpha = dx.get("orpha") or "?"
            # `or 0` guards an explicit None, which the :.2f spec rejects.
            prob = dx.get("probability") or 0
            dx_items.append(f"{name} (ORPHA:{orpha}, p={prob:.2f})")
        lines.append(f"Top-dx: {', '.join(dx_items)}")

    if twin.risk:
        r = twin.risk
        lines.append(
            f"Risco: severidade={r.overall_severity:.2f} · progressão={r.progression_risk:.2f} · urgência={r.treatment_urgency:.2f}"
        )

    if twin.drugs and twin.drugs.candidates:
        sus_drug = next((d for d in twin.drugs.candidates if d.get("sus_dispensed")), None)
        if sus_drug:
            mechanism = (sus_drug.get("mechanism") or "")[:60]
            lines.append(f"SUS dispensa: {sus_drug.get('name')} ({mechanism})")

    if twin.sus_check and twin.sus_check.has_pcdt:
        lines.append("PCDT: vigente; chame `gemeo_state(\"sus_check\")` para detalhes")

    # Pointers to deeper sections the model can fetch on demand.
    extras = []
    if twin.ddi and twin.ddi.pairs:
        extras.append(f"DDI({len(twin.ddi.pairs)})")
    if twin.pharmacogen and twin.pharmacogen.n_actionable:
        extras.append(f"PGx({twin.pharmacogen.n_actionable})")
    if twin.family:
        extras.append(f"Pedigree({twin.family.inheritance_mode})")
    if twin.cohort and twin.cohort.members:
        extras.append(f"Cohort({len(twin.cohort.members)})")
    if twin.next_questions:
        extras.append(f"Perguntas({len(twin.next_questions)})")
    if extras:
        lines.append(
            "Mais: " + " · ".join(extras) +
            ". Chame `gemeo_state(<section>)` ou `gemeo_lookup(<query>)` para acessar."
        )
    return "\n".join(lines)


# Markdown heading placed above the short twin summary in the system prompt.
GEMEO_BRIEF_HEADER = "## Gêmeo Digital (resumo)"
# Instruction paragraph (Portuguese, sent verbatim to the model) that follows
# the heading in "adaptive" mode: names the `gemeo_lookup` and `gemeo_state`
# tools and tells the model not to invent facts outside the twin / tools.
GEMEO_BRIEF_INSTRUCTIONS = (
    "Resumo abaixo. Tools disponíveis: `gemeo_lookup(query, mode='local'|'global')` "
    "para evidência grounded em subgrafo+coorte+literatura, e "
    "`gemeo_state(section)` para uma das capabilities específicas. "
    "Não invente fatos fora do gêmeo / das tools."
)


async def inject_context(
    case_id: str,
    system_prompt: str,
    *,
    max_chars: int = DEFAULT_MAX_CHARS,
    refresh: bool = False,
    mode: str = "adaptive",
) -> str:
    """Return ``system_prompt`` with the twin context for ``case_id`` prepended.

    Modes:
        "adaptive" (default): short brief plus pointers to the lookup tools.
        "off": no injection; the prompt is returned unchanged.
        any other value (e.g. "full"): the full serialized twin, limited by
            ``max_chars`` (a character budget).

    The cached twin is used when present; it is queried when there is no
    cached twin or when ``refresh`` is true. This function is best-effort:
    a missing case id, a missing twin, an empty block, or any exception all
    yield the original prompt (exceptions are logged as warnings).
    """
    if not case_id or mode == "off":
        return system_prompt

    try:
        from . import core as gcore

        twin = gcore.get_gemeo(case_id)
        if refresh or twin is None:
            twin = await gcore.query_gemeo(case_id)
        if twin is None:
            return system_prompt

        use_brief = mode == "adaptive"
        if use_brief:
            block = serialize_twin_brief(twin)
        else:
            block = serialize_twin_for_llm(twin, max_chars=max_chars)
        if not block:
            return system_prompt

        header = GEMEO_BRIEF_HEADER if use_brief else GEMEO_HEADER
        instructions = GEMEO_BRIEF_INSTRUCTIONS if use_brief else GEMEO_INSTRUCTIONS
        return f"{header}\n{instructions}\n\n{block}\n\n---\n\n{system_prompt}"
    except Exception as exc:
        # Deliberate catch-all: context injection must never break an LLM call.
        logger.warning(f"inject_context failed for {case_id[:8] if case_id else '?'}: {exc}")
        return system_prompt


# ─── Absorb a message back into the twin ──────────────────────────────────

# Lightweight regex extractors. Real impl can use the existing entity
# extractors (bio_tools / phenotype agent). This is the "always-on" pass
# meant to run on every message to keep the twin fresh.
#
# _HPO_RE:   "HP:" followed by exactly seven digits (case-sensitive); the
#            group captures the digits only.
# _ORPHA_RE: "ORPHA:" followed by one to seven digits, case-insensitive; the
#            group captures the digits only.
# _GENE_RE:  a 2-8 character alphanumeric token starting with a letter,
#            followed by optional whitespace and one of the cue words
#            gene / mutation / variant / c. / p.; the group captures the token.
#            NOTE(review): IGNORECASE also applies to the token itself, so an
#            ordinary word before a cue (e.g. "this variant") is captured as
#            a gene symbol — confirm whether that is acceptable.

_HPO_RE   = re.compile(r"\bHP:(\d{7})\b")
_ORPHA_RE = re.compile(r"\bORPHA:(\d{1,7})\b", re.IGNORECASE)
_GENE_RE  = re.compile(r"\b([A-Z][A-Z0-9]{1,7})\b\s*(?:gene|mutation|variant|c\.|p\.)", re.IGNORECASE)


async def absorb_message(
    case_id: str,
    message_text: str,
    *,
    source: str = "user",
) -> dict:
    """Extract HPO / ORPHA / gene mentions from free text and feed them to the twin.

    HPO IDs and gene symbols are sent to `evolve_gemeo` as new phenotypes and
    new genes. ORPHA codes are extracted and reported in the result but are
    not passed to `evolve_gemeo`.

    Args:
        case_id: Case whose twin should be updated.
        message_text: Free-text message to scan.
        source: Label recorded on each extracted item (default ``"user"``).

    Returns:
        A dict with ``hpo``, ``orpha`` and ``genes`` lists (de-duplicated, in
        order of first mention) and an ``absorbed`` flag that is true only
        when phenotypes or genes were actually handed to `evolve_gemeo`.
        When extraction found something, ``source`` is included; when the
        update raised, ``absorbed`` is false and ``error`` holds the message.
        This function does not raise on update failures.
    """
    if not case_id or not message_text:
        return {"hpo": [], "orpha": [], "genes": [], "absorbed": False}

    # dict.fromkeys de-duplicates while keeping first-mention order, so the
    # returned lists are deterministic (iterating a set is not).
    hpo_ids = list(dict.fromkeys(
        f"HP:{m.group(1)}" for m in _HPO_RE.finditer(message_text)
    ))
    orpha_ids = list(dict.fromkeys(
        m.group(1) for m in _ORPHA_RE.finditer(message_text)
    ))
    genes_raw = list(dict.fromkeys(
        m.group(1).upper() for m in _GENE_RE.finditer(message_text)
    ))

    if not (hpo_ids or orpha_ids or genes_raw):
        return {"hpo": [], "orpha": [], "genes": [], "absorbed": False}

    new_phenotypes = [
        {"hpo_id": h, "name": h, "source": source, "status": "extracted"}
        for h in hpo_ids
    ]
    new_genes = [
        {"symbol": g, "source": source, "status": "extracted"}
        for g in genes_raw
    ]
    # Only phenotypes and genes reach the twin; an ORPHA-only message changes
    # nothing, so it must not be reported as absorbed.
    has_updates = bool(new_phenotypes or new_genes)

    try:
        from . import core as gcore
        if has_updates:
            await gcore.evolve_gemeo(
                case_id,
                new_phenotypes=new_phenotypes,
                new_genes=new_genes,
            )
        return {
            "hpo": hpo_ids,
            "orpha": orpha_ids,
            "genes": genes_raw,
            "absorbed": has_updates,
            "source": source,
        }
    except Exception as exc:
        # Best-effort: a failed twin update is reported to the caller in the
        # result dict rather than raised.
        logger.warning("absorb_message failed: %s", exc)
        return {"hpo": hpo_ids, "orpha": orpha_ids, "genes": genes_raw, "absorbed": False, "error": str(exc)}