"""Gemeo as LLM context — bidirectional pipe between space and the LLM.
Three responsibilities:
1. **serialize_twin_for_llm(twin)** — produce a compact, token-budgeted
structured block (Markdown) summarising the twin: top dx, key risks,
phenotypes, genes, drugs in use, DDI alerts, PCDT gaps, next questions.
This block is what the LLM sees on every call for this case.
2. **inject_context(case_id, system_prompt)** — wrapper used by every
LLM call in the swarm. Looks up the cached twin (or builds it lazily) and
prepends either a brief summary (default "adaptive" mode) or the full
structured block to the system prompt under a "## Gêmeo Digital" header.
The `max_chars` budget is applied to the full block.
3. **absorb_message(case_id, message_text)** — when an LLM agent or a
user adds new clinical content (lab, HPO, gene, treatment), this
extracts structured items and feeds them back via `evolve_gemeo`,
so the next LLM call sees the updated twin.
This is the "GraphRAG of the patient" — context is grounded in a graph
that itself updates from every conversational turn.
"""
from __future__ import annotations
import logging
import re
from typing import Optional
# Module logger; the name is hard-coded rather than derived from __name__.
logger = logging.getLogger("gemeo.llm_context")
# Token budget (rough char count; ~4 chars per token).
# Used as the default `max_chars` for serialize_twin_for_llm and inject_context.
DEFAULT_MAX_CHARS = 4000
def _section(title: str, body: str) -> str:
    """Render one Markdown subsection, or nothing when the body is blank.

    The body is stripped of surrounding whitespace. A non-empty body is
    returned as a level-3 heading followed by the body and a trailing
    newline; an empty body yields ``""`` so callers can filter it out.
    """
    content = body.strip()
    return f"### {title}\n{content}\n" if content else ""
def serialize_twin_for_llm(twin, *, max_chars: int = DEFAULT_MAX_CHARS) -> str:
    """Build a compact Markdown summary of the digital twin for an LLM prompt.

    The twin is accessed by attribute only (duck-typed). Sections are
    emitted in a fixed order and each one is skipped when its data is
    missing or empty: differential, snapshot counts, risk, trajectory,
    cohort, drug candidates, DDI alerts, actionable pharmacogenomics,
    pedigree, PCDT compliance gaps, reverse-phenotyping suggestions, next
    questions, and SUS grounding.

    Args:
        twin: The twin object, or ``None``.
        max_chars: Upper bound on the length of the returned string. When the
            summary is longer it is cut and a truncation marker is appended;
            the marker is counted inside the budget.

    Returns:
        The Markdown block, or ``""`` when ``twin`` is ``None``.
    """
    if twin is None:
        return ""

    parts: list[str] = [
        f"_Twin {twin.id} · case {twin.case_id} · embedding {twin.embedding_dim}d_"
    ]

    # Differential diagnoses (top 5).
    if twin.diagnoses:
        lines = []
        for d in twin.diagnoses[:5]:
            p = d.get("probability") or 0
            name = d.get("disease") or d.get("name") or "?"
            orpha = d.get("orpha") or "?"
            lines.append(
                f"- {name} (ORPHA:{orpha}) — p={p:.2f}, status={d.get('status', 'active')}"
            )
        parts.append(_section("Diferencial", "\n".join(lines)))

    # Snapshot summary (counts of phenotypes, genes, labs, versions).
    parts.append(_section(
        "Snapshot",
        f"- Fenótipos: {twin.n_phenotypes}\n- Genes: {twin.n_genes}\n- Labs: {twin.n_labs}\n- Versões snapshot: {len(twin.snapshot_versions or [])}",
    ))

    # Risk scores.
    if twin.risk:
        r = twin.risk
        parts.append(_section(
            "Risco",
            f"- Severidade: {r.overall_severity:.2f}\n- Progressão: {r.progression_risk:.2f}\n- Urgência tx: {r.treatment_urgency:.2f}",
        ))

    # Trajectory horizons (first three).
    if twin.trajectory and twin.trajectory.horizons:
        lines = [
            f"- T+{h.months}m (risco {h.risk_score:.2f}): {h.state[:160]}"
            for h in twin.trajectory.horizons[:3]
        ]
        parts.append(_section("Trajetória", "\n".join(lines)))

    # Cohort centroid. Keys may be missing or None, so nothing is indexed directly.
    if twin.cohort and twin.cohort.centroid_disease:
        c = twin.cohort.centroid_disease
        pct = round((c.get("fraction") or 0) * 100)
        parts.append(_section(
            "Coorte",
            f"- Centróide: {c.get('name', '?')} ({c.get('count', 0)} pacientes, {pct}%)\n- Membros: {len(twin.cohort.members or [])}",
        ))

    # Drug repositioning candidates, tagged with SUS / PCDT availability.
    if twin.drugs and twin.drugs.candidates:
        lines = []
        for d in twin.drugs.candidates[:5]:
            tags = []
            if d.get("sus_dispensed"):
                tags.append("SUS")
            elif d.get("sus_in_pcdt"):
                tags.append("PCDT")
            tag = f" [{', '.join(tags)}]" if tags else ""
            # `or ""` covers a mechanism key that is present but None.
            mechanism = (d.get("mechanism") or "")[:80]
            lines.append(f"- {d.get('name', '?')}{tag} — {mechanism}")
        parts.append(_section("Reposicionamento", "\n".join(lines)))

    # DDI alerts (only if any).
    if twin.ddi and twin.ddi.pairs:
        lines = [
            f"- [{p.severity}] {p.drug_a} ↔ {p.drug_b}: {p.mechanism[:120]}"
            for p in twin.ddi.pairs[:5]
        ]
        parts.append(_section(f"DDI (regime: {twin.ddi.regimen_risk})", "\n".join(lines)))

    # Pharmacogenomics: only assessments with CPIC level A or B.
    if twin.pharmacogen and twin.pharmacogen.assessments:
        actionable = [
            a for a in twin.pharmacogen.assessments if a.cpic_level in ("A", "B")
        ][:5]
        if actionable:
            lines = []
            for a in actionable:
                variant = f" {a.variant}" if a.variant else ""
                lines.append(
                    f"- {a.gene}{variant} × {a.drug} (CPIC {a.cpic_level}): {a.recommendation[:120]}"
                )
            parts.append(_section("Farmacogenômica (acionável)", "\n".join(lines)))

    # Family / pedigree: emitted only when at least one relative is listed.
    if twin.family:
        f = twin.family
        rels = ", ".join(
            f"{r.relation} {round(r.recurrence_risk * 100)}%"
            for r in (f.relatives or [])[:3]
        )
        if rels:
            parts.append(_section(
                "Pedigree",
                f"- Modo: {f.inheritance_mode}\n- Recorrência: {rels}",
            ))

    # PCDT compliance gaps (section is skipped entirely when there are none).
    if twin.protocol_compliance and twin.protocol_compliance.gaps:
        pc = twin.protocol_compliance
        gap_lines = [f"- [{g.priority}] {g.expected}" for g in pc.gaps[:5]]
        parts.append(_section(
            f"PCDT compliance ({round(pc.score * 100)}%)",
            "\n".join(gap_lines),
        ))

    # Reverse phenotyping suggestions.
    if twin.reverse_pheno and twin.reverse_pheno.items:
        lines = [
            f"- {it.name} ({it.hpo_id}, freq {round(it.expected_frequency * 100)}%)"
            for it in twin.reverse_pheno.items[:5]
        ]
        parts.append(_section("O que ainda procurar", "\n".join(lines)))

    # Next questions ranked by information gain.
    if twin.next_questions:
        lines = [
            f"- {q.name} ({q.hpo_id}) — gain {q.information_gain:.2f} bits"
            + (" [PCDT]" if q.asks_in_pcdt else "")
            for q in twin.next_questions[:5]
        ]
        parts.append(_section("Próximas perguntas (info-gain)", "\n".join(lines)))

    # SUS grounding.
    if twin.sus_check:
        s = twin.sus_check
        sus_lines = []
        if s.has_pcdt:
            sus_lines.append(f"- PCDT: sim ({s.pcdt_url or '—'})")
        if s.therapy_pcdt_recommended:
            sus_lines.append("- Terapias: " + ", ".join(s.therapy_pcdt_recommended[:5]))
        if s.nearest_centro:
            c = s.nearest_centro
            sus_lines.append(
                f"- Centro mais próximo: {c.get('nome')} ({c.get('cidade')}/{c.get('uf')})"
            )
        if sus_lines:
            parts.append(_section("SUS", "\n".join(sus_lines)))

    out = "\n".join(p for p in parts if p)
    if len(out) > max_chars:
        suffix = "\n\n…(twin truncado por orçamento de tokens)"
        # Reserve room for the marker itself so the result never exceeds
        # max_chars, and never let the slice bound go negative (a negative
        # bound would keep almost the whole string instead of cutting it).
        keep = max_chars - len(suffix)
        if keep > 0:
            out = out[:keep].rstrip() + suffix
        else:
            out = out[: max(max_chars, 0)]
    return out
# ─── Inject into a system prompt ───────────────────────────────────────────
# Markdown heading placed above the full twin block by inject_context
# (used for every mode other than "adaptive" and "off").
GEMEO_HEADER = "## Gêmeo Digital — contexto vivo do paciente"
# Portuguese instructions sent to the model together with the full block:
# ground every recommendation in the twin, call the `gemeo_lookup(query)`
# tool when specific evidence is needed, and never invent facts that are
# absent from the twin or from the lookup result.
GEMEO_INSTRUCTIONS = (
    "Use o gêmeo abaixo para fundamentar TODA recomendação. Quando precisar "
    "de evidência específica, chame a tool `gemeo_lookup(query)` para "
    "recuperar subgrafo + coorte + literatura ancorados no caso. Nunca "
    "invente fatos não presentes no gêmeo ou no resultado da lookup."
)
# ─── Adaptive (Self-RAG style) brief context — ~300 tokens ────────────────
def serialize_twin_brief(twin) -> str:
    """Build the short twin summary used for adaptive injection.

    Emits, one per line and only when the data is present: an identity line,
    the top three diagnoses, the risk scores, the first SUS-dispensed drug
    candidate, a PCDT pointer, and a "Mais:" line listing which deeper
    sections exist (DDI, PGx, pedigree, cohort, next questions) so the model
    knows it can fetch them through `gemeo_state` / `gemeo_lookup`.

    Missing or ``None`` values inside the diagnosis and drug dicts are
    rendered with placeholders instead of raising.

    Args:
        twin: The twin object (accessed by attribute), or ``None``.

    Returns:
        The newline-joined summary, or ``""`` when ``twin`` is ``None``.
    """
    if twin is None:
        return ""

    lines: list[str] = [f"_Gemeo {twin.id} · {twin.embedding_dim}d_"]

    if twin.diagnoses:
        entries = []
        for d in twin.diagnoses[:3]:
            # `or` fallbacks also cover keys that exist but hold None; a None
            # probability would otherwise break the `.2f` format spec.
            name = d.get("name") or d.get("disease") or "?"
            orpha = d.get("orpha") or "?"
            prob = d.get("probability") or 0
            entries.append(f"{name} (ORPHA:{orpha}, p={prob:.2f})")
        lines.append("Top-dx: " + ", ".join(entries))

    if twin.risk:
        r = twin.risk
        lines.append(
            f"Risco: severidade={r.overall_severity:.2f} · progressão={r.progression_risk:.2f} · urgência={r.treatment_urgency:.2f}"
        )

    if twin.drugs and twin.drugs.candidates:
        # Only the first candidate flagged as dispensed by SUS is mentioned.
        sus_drug = next(
            (d for d in twin.drugs.candidates if d.get("sus_dispensed")), None
        )
        if sus_drug:
            mechanism = (sus_drug.get("mechanism") or "")[:60]
            lines.append(f"SUS dispensa: {sus_drug.get('name')} ({mechanism})")

    if twin.sus_check and twin.sus_check.has_pcdt:
        lines.append('PCDT: vigente; chame `gemeo_state("sus_check")` para detalhes')

    # Pointers to deeper sections the model can request on demand.
    extras = []
    if twin.ddi and twin.ddi.pairs:
        extras.append(f"DDI({len(twin.ddi.pairs)})")
    if twin.pharmacogen and twin.pharmacogen.n_actionable:
        extras.append(f"PGx({twin.pharmacogen.n_actionable})")
    if twin.family:
        extras.append(f"Pedigree({twin.family.inheritance_mode})")
    if twin.cohort and twin.cohort.members:
        extras.append(f"Cohort({len(twin.cohort.members)})")
    if twin.next_questions:
        extras.append(f"Perguntas({len(twin.next_questions)})")
    if extras:
        lines.append(
            "Mais: " + " · ".join(extras)
            + ". Chame `gemeo_state(<section>)` ou `gemeo_lookup(<query>)` para acessar."
        )

    return "\n".join(lines)
# Markdown heading placed above the brief summary by inject_context in
# "adaptive" mode.
GEMEO_BRIEF_HEADER = "## Gêmeo Digital (resumo)"
# Portuguese instructions sent with the brief summary: they name the
# `gemeo_lookup(query, mode=...)` and `gemeo_state(section)` tools and tell
# the model not to invent facts outside the twin or the tool results.
GEMEO_BRIEF_INSTRUCTIONS = (
    "Resumo abaixo. Tools disponíveis: `gemeo_lookup(query, mode='local'|'global')` "
    "para evidência grounded em subgrafo+coorte+literatura, e "
    "`gemeo_state(section)` para uma das capabilities específicas. "
    "Não invente fatos fora do gêmeo / das tools."
)
async def inject_context(
    case_id: str,
    system_prompt: str,
    *,
    max_chars: int = DEFAULT_MAX_CHARS,
    refresh: bool = False,
    mode: str = "adaptive",
) -> str:
    """Return ``system_prompt`` with the twin context for ``case_id`` in front.

    Modes:
        "adaptive" (default): brief summary from ``serialize_twin_brief``
            plus the brief header and instructions.
        "off": no injection; the prompt is returned unchanged.
        anything else (e.g. "full"): the full block from
            ``serialize_twin_for_llm``, built with ``max_chars``.

    The twin is read via ``core.get_gemeo``; when that yields ``None`` or
    ``refresh`` is true it is fetched with ``await core.query_gemeo``.

    This is best-effort: an empty ``case_id``, a missing twin, an empty
    serialized block, or any exception all result in the original prompt
    being returned (exceptions are logged as warnings).
    """
    if not case_id or mode == "off":
        return system_prompt

    try:
        from . import core as gcore

        twin = gcore.get_gemeo(case_id)
        needs_query = twin is None or refresh
        if needs_query:
            twin = await gcore.query_gemeo(case_id)
        if twin is None:
            return system_prompt

        if mode != "adaptive":
            # Every non-adaptive mode gets the full serialization.
            header, instructions = GEMEO_HEADER, GEMEO_INSTRUCTIONS
            block = serialize_twin_for_llm(twin, max_chars=max_chars)
        else:
            header, instructions = GEMEO_BRIEF_HEADER, GEMEO_BRIEF_INSTRUCTIONS
            block = serialize_twin_brief(twin)

        if block:
            return f"{header}\n{instructions}\n\n{block}\n\n---\n\n{system_prompt}"
        return system_prompt
    except Exception as e:
        logger.warning(f"inject_context failed for {case_id[:8] if case_id else '?'}: {e}")
        return system_prompt
# ─── Absorb a message back into the twin ──────────────────────────────────
# Lightweight regex extractors. Real impl can use the existing entity
# extractors (bio_tools / phenotype agent). This is the "always-on" pass
# that runs on every message to keep the twin fresh.
# HPO term id: the literal prefix "HP:" followed by exactly seven digits.
# Group 1 captures the digits only.
_HPO_RE = re.compile(r"\bHP:(\d{7})\b")
# Orphanet code: "ORPHA:" (any letter case) followed by one to seven digits.
# Group 1 captures the digits only.
_ORPHA_RE = re.compile(r"\bORPHA:(\d{1,7})\b", re.IGNORECASE)
# Gene symbol directly followed by a cue word ("gene", "mutation",
# "variant") or an HGVS-style "c." / "p." prefix. Group 1 captures the symbol.
# Only the cue is case-insensitive: the symbol itself must be upper-case
# letters/digits (2-8 characters), otherwise ordinary words in front of a cue
# ("the gene", "missense variant") would be captured as gene symbols.
_GENE_RE = re.compile(r"\b([A-Z][A-Z0-9]{1,7})\b\s*(?i:gene|mutation|variant|c\.|p\.)")
async def absorb_message(
    case_id: str,
    message_text: str,
    *,
    source: str = "user",
) -> dict:
    """Extract HPO / ORPHA / gene mentions from free text and update the twin.

    HPO ids and gene symbols are passed to ``core.evolve_gemeo`` as
    ``new_phenotypes`` / ``new_genes``, each tagged with ``source`` and status
    ``"extracted"``. ORPHA codes are extracted and reported back but are not
    sent to ``evolve_gemeo``.

    Args:
        case_id: Case whose twin should be updated.
        message_text: Free-text message to scan.
        source: Label stored on every extracted item.

    Returns:
        A dict with ``"hpo"``, ``"orpha"`` and ``"genes"`` lists (duplicates
        removed, in order of first mention) and an ``"absorbed"`` flag. The
        flag is True only when ``evolve_gemeo`` was actually called and did
        not raise. ``"source"`` is included whenever something was extracted
        and no error occurred; ``"error"`` carries the exception text when
        the update failed. Exceptions from the update are logged, not raised.
    """
    if not case_id or not message_text:
        return {"hpo": [], "orpha": [], "genes": [], "absorbed": False}

    # dict.fromkeys removes duplicates while keeping first-mention order, so
    # the returned lists are deterministic (a set would not be).
    hpo_ids = list(dict.fromkeys(
        f"HP:{m.group(1)}" for m in _HPO_RE.finditer(message_text)
    ))
    orpha_ids = list(dict.fromkeys(
        m.group(1) for m in _ORPHA_RE.finditer(message_text)
    ))
    genes_raw = list(dict.fromkeys(
        m.group(1).upper() for m in _GENE_RE.finditer(message_text)
    ))

    if not (hpo_ids or orpha_ids or genes_raw):
        return {"hpo": [], "orpha": [], "genes": [], "absorbed": False}

    new_phenotypes = [
        {"hpo_id": h, "name": h, "source": source, "status": "extracted"}
        for h in hpo_ids
    ]
    new_genes = [
        {"symbol": g, "source": source, "status": "extracted"}
        for g in genes_raw
    ]

    if not (new_phenotypes or new_genes):
        # Only ORPHA codes were found. They are not forwarded to the twin,
        # so report them without claiming that anything was absorbed.
        return {
            "hpo": hpo_ids,
            "orpha": orpha_ids,
            "genes": genes_raw,
            "absorbed": False,
            "source": source,
        }

    try:
        from . import core as gcore

        await gcore.evolve_gemeo(
            case_id,
            new_phenotypes=new_phenotypes,
            new_genes=new_genes,
        )
    except Exception as e:
        # Best-effort pass: a failed twin update must not break the caller.
        logger.warning("absorb_message failed: %s", e)
        return {
            "hpo": hpo_ids,
            "orpha": orpha_ids,
            "genes": genes_raw,
            "absorbed": False,
            "error": str(e),
        }

    return {
        "hpo": hpo_ids,
        "orpha": orpha_ids,
        "genes": genes_raw,
        "absorbed": True,
        "source": source,
    }
|