gemeo-twin-stack / src /gemeo /llm_context.py
timmers's picture
GEMEO world-model — initial release (module + NeuralSurv ckpt + RareBench v49 + KG embeddings)
089d665 verified
"""Gemeo as LLM context — bidirectional pipe between space and the LLM.
Three responsibilities:
1. **serialize_twin_for_llm(twin)** — produce a compact, token-budgeted
structured block (Markdown) summarising the twin: top dx, key risks,
phenotypes, genes, drugs in use, DDI alerts, PCDT gaps, next questions.
This block is what the LLM sees on every call for this case.
2. **inject_context(case_id, system_prompt)** — wrapper used by every
LLM call in the swarm. Looks up the cached twin (or builds it lazily),
prepends the structured block to the system prompt under the "## Gêmeo
Digital — contexto vivo do paciente" header (or the shorter "## Gêmeo
Digital (resumo)" header in adaptive mode). Token budget enforced in full mode.
3. **absorb_message(case_id, message_text)** — when an LLM agent or a
user adds new clinical content (lab, HPO, gene, treatment), this
extracts structured items and feeds them back via `evolve_gemeo`,
so the next LLM call sees the updated twin.
This is the "GraphRAG of the patient" — context is grounded in a graph
that itself updates from every conversational turn.
"""
from __future__ import annotations
import logging
import re
from typing import Optional
# Module logger; the name is spelled out explicitly rather than taken from __name__.
logger = logging.getLogger("gemeo.llm_context")
# Default size budget for the full twin serialization, expressed in characters.
# Characters stand in for tokens at roughly 4 chars per token, so 4000 chars
# is about 1000 tokens.
DEFAULT_MAX_CHARS = 4000
def _section(title: str, body: str) -> str:
    """Render one Markdown sub-section.

    Returns ``"### <title>"`` followed by the whitespace-trimmed body and a
    trailing newline, or an empty string when the body is blank so callers
    can drop empty sections.
    """
    content = body.strip()
    return f"### {title}\n{content}\n" if content else ""
def serialize_twin_for_llm(twin, *, max_chars: int = DEFAULT_MAX_CHARS) -> str:
    """Render the digital twin as a compact Markdown block for an LLM prompt.

    Sections are emitted only when the twin carries data for them, always in
    the same order: differential, snapshot counts, risk, trajectory, cohort,
    drug repositioning, DDI alerts, actionable pharmacogenomics, pedigree,
    PCDT compliance gaps, reverse phenotyping, next questions, SUS grounding.

    Args:
        twin: Twin object exposing the attributes read below. ``None`` yields
            an empty string.
        max_chars: Upper bound on the length of the returned string.

    Returns:
        The Markdown block. When the full rendering is longer than
        ``max_chars`` it is cut and a truncation notice is appended, and the
        result (notice included) never exceeds ``max_chars`` characters.
    """
    if twin is None:
        return ""

    parts: list[str] = [
        f"_Twin {twin.id} · case {twin.case_id} · embedding {twin.embedding_dim}d_"
    ]

    # Differential diagnosis (top 5).
    if twin.diagnoses:
        lines = []
        for d in twin.diagnoses[:5]:
            p = d.get("probability") or 0
            name = d.get("disease") or d.get("name") or "?"
            orpha = d.get("orpha") or "?"
            lines.append(
                f"- {name} (ORPHA:{orpha}) — p={p:.2f}, status={d.get('status', 'active')}"
            )
        parts.append(_section("Diferencial", "\n".join(lines)))

    # Snapshot counters.
    parts.append(_section(
        "Snapshot",
        f"- Fenótipos: {twin.n_phenotypes}\n- Genes: {twin.n_genes}\n- Labs: {twin.n_labs}\n- Versões snapshot: {len(twin.snapshot_versions or [])}",
    ))

    # Risk scores.
    if twin.risk:
        r = twin.risk
        parts.append(_section(
            "Risco",
            f"- Severidade: {r.overall_severity:.2f}\n- Progressão: {r.progression_risk:.2f}\n- Urgência tx: {r.treatment_urgency:.2f}",
        ))

    # Trajectory horizons (first three).
    if twin.trajectory and twin.trajectory.horizons:
        lines = [
            # `or ""` keeps a horizon without a state text from crashing the slice.
            f"- T+{h.months}m (risco {h.risk_score:.2f}): {(h.state or '')[:160]}"
            for h in twin.trajectory.horizons[:3]
        ]
        parts.append(_section("Trajetória", "\n".join(lines)))

    # Cohort centroid.
    if twin.cohort and twin.cohort.centroid_disease:
        c = twin.cohort.centroid_disease
        pct = round((c.get("fraction") or 0) * 100)
        parts.append(_section(
            "Coorte",
            f"- Centróide: {c.get('name') or '?'} ({c.get('count') or 0} pacientes, {pct}%)\n- Membros: {len(twin.cohort.members or [])}",
        ))

    # Drug repositioning candidates, tagged with SUS availability.
    if twin.drugs and twin.drugs.candidates:
        lines = []
        for d in twin.drugs.candidates[:5]:
            if d.get("sus_dispensed"):
                tag = " [SUS]"
            elif d.get("sus_in_pcdt"):
                tag = " [PCDT]"
            else:
                tag = ""
            mechanism = (d.get("mechanism") or "")[:80]
            # Separate the mechanism from the name/tag; previously they ran together.
            sep = " — " if mechanism else ""
            lines.append(f"- {d.get('name') or '?'}{tag}{sep}{mechanism}")
        parts.append(_section("Reposicionamento", "\n".join(lines)))

    # Drug-drug interaction alerts (only if any).
    if twin.ddi and twin.ddi.pairs:
        lines = [
            # " × " keeps the two drug names from being fused into one word.
            f"- [{p.severity}] {p.drug_a} × {p.drug_b}: {(p.mechanism or '')[:120]}"
            for p in twin.ddi.pairs[:5]
        ]
        parts.append(_section(f"DDI (regime: {twin.ddi.regimen_risk})", "\n".join(lines)))

    # Pharmacogenomic assessments with CPIC level A or B.
    if twin.pharmacogen and twin.pharmacogen.assessments:
        actionable = [
            a for a in twin.pharmacogen.assessments if a.cpic_level in ("A", "B")
        ][:5]
        if actionable:
            lines = []
            for a in actionable:
                variant = f" {a.variant}" if a.variant else ""
                advice = (a.recommendation or "")[:120]
                lines.append(
                    f"- {a.gene}{variant} × {a.drug} (CPIC {a.cpic_level}): {advice}"
                )
            parts.append(_section("Farmacogenômica (acionável)", "\n".join(lines)))

    # Family / pedigree: inheritance mode plus recurrence risk of up to three relatives.
    if twin.family:
        fam = twin.family
        rels = ", ".join(
            f"{r.relation} {round(r.recurrence_risk*100)}%"
            for r in (fam.relatives or [])[:3]
        )
        if rels:
            parts.append(_section(
                "Pedigree",
                f"- Modo: {fam.inheritance_mode}\n- Recorrência: {rels}",
            ))

    # PCDT compliance: the section is emitted only when there are gaps to report.
    pc = twin.protocol_compliance
    if pc and pc.gaps:
        gap_lines = [f"- [{g.priority}] {g.expected}" for g in pc.gaps[:5]]
        parts.append(_section(
            f"PCDT compliance ({round(pc.score * 100)}%)",
            "\n".join(gap_lines),
        ))

    # Reverse phenotyping suggestions.
    if twin.reverse_pheno and twin.reverse_pheno.items:
        lines = [
            f"- {it.name} ({it.hpo_id}, freq {round(it.expected_frequency*100)}%)"
            for it in twin.reverse_pheno.items[:5]
        ]
        parts.append(_section("O que ainda procurar", "\n".join(lines)))

    # Next questions ranked by information gain.
    if twin.next_questions:
        lines = [
            f"- {q.name} ({q.hpo_id}) — gain {q.information_gain:.2f} bits"
            + (" [PCDT]" if q.asks_in_pcdt else "")
            for q in twin.next_questions[:5]
        ]
        parts.append(_section("Próximas perguntas (info-gain)", "\n".join(lines)))

    # SUS grounding.
    if twin.sus_check:
        s = twin.sus_check
        sus_lines = []
        if s.has_pcdt:
            sus_lines.append(f"- PCDT: sim ({s.pcdt_url or '—'})")
        if s.therapy_pcdt_recommended:
            sus_lines.append("- Terapias: " + ", ".join(s.therapy_pcdt_recommended[:5]))
        if s.nearest_centro:
            centro = s.nearest_centro
            sus_lines.append(
                f"- Centro mais próximo: {centro.get('nome')} ({centro.get('cidade')}/{centro.get('uf')})"
            )
        if sus_lines:
            parts.append(_section("SUS", "\n".join(sus_lines)))

    out = "\n".join(p for p in parts if p)
    if len(out) <= max_chars:
        return out

    # Reserve room for the notice based on its real length so the final string
    # stays within the budget.
    notice = "\n\n…(twin truncado por orçamento de tokens)"
    room = max_chars - len(notice)
    if room <= 0:
        # Budget too small to hold even the notice: hard-cut instead of letting
        # a negative slice index trim from the end of the text.
        return out[: max(max_chars, 0)]
    return out[:room].rstrip() + notice
# ─── Inject into a system prompt ───────────────────────────────────────────
# Markdown heading that `inject_context` puts in front of the full serialized
# twin (the branch taken for any mode other than "adaptive" or "off").
GEMEO_HEADER = "## Gêmeo Digital — contexto vivo do paciente"
# Instruction paragraph placed right after GEMEO_HEADER. The text is Portuguese
# and is sent to the model verbatim; it names the `gemeo_lookup(query)` tool.
GEMEO_INSTRUCTIONS = (
"Use o gêmeo abaixo para fundamentar TODA recomendação. Quando precisar "
"de evidência específica, chame a tool `gemeo_lookup(query)` para "
"recuperar subgrafo + coorte + literatura ancorados no caso. Nunca "
"invente fatos não presentes no gêmeo ou no resultado da lookup."
)
# ─── Adaptive (Self-RAG style) brief context — ~300 tokens ────────────────
def serialize_twin_brief(twin) -> str:
    """Build the short twin summary used for adaptive context injection.

    The brief lists the top three diagnoses, the risk scores, the first drug
    candidate flagged as dispensed by SUS, a PCDT pointer, and a one-line
    index of the deeper sections that hold data (DDI, PGx, pedigree, cohort,
    next questions) so the model knows more can be fetched through
    ``gemeo_state`` / ``gemeo_lookup``.

    Args:
        twin: Twin object exposing the attributes read below. ``None`` yields
            an empty string.

    Returns:
        The summary as newline-separated lines.
    """
    if twin is None:
        return ""

    lines: list[str] = [f"_Gemeo {twin.id} · {twin.embedding_dim}d_"]

    if twin.diagnoses:
        entries = []
        for d in twin.diagnoses[:3]:
            # `or` fallbacks (rather than .get defaults) also cover keys that
            # are present but hold None, which would otherwise break `:.2f`.
            name = d.get("name") or d.get("disease") or "?"
            orpha = d.get("orpha") or "?"
            prob = d.get("probability") or 0
            entries.append(f"{name} (ORPHA:{orpha}, p={prob:.2f})")
        lines.append(f"Top-dx: {', '.join(entries)}")

    if twin.risk:
        r = twin.risk
        lines.append(
            f"Risco: severidade={r.overall_severity:.2f} · progressão={r.progression_risk:.2f} · urgência={r.treatment_urgency:.2f}"
        )

    if twin.drugs and twin.drugs.candidates:
        # Only the first SUS-dispensed candidate is surfaced in the brief.
        sus_drug = next(
            (d for d in twin.drugs.candidates if d.get("sus_dispensed")), None
        )
        if sus_drug:
            mechanism = (sus_drug.get("mechanism") or "")[:60]
            lines.append(f"SUS dispensa: {sus_drug.get('name')} ({mechanism})")

    if twin.sus_check and twin.sus_check.has_pcdt:
        lines.append("PCDT: vigente; chame `gemeo_state(\"sus_check\")` para detalhes")

    # Pointers to deeper sections that currently hold data.
    extras = []
    if twin.ddi and twin.ddi.pairs:
        extras.append(f"DDI({len(twin.ddi.pairs)})")
    if twin.pharmacogen and twin.pharmacogen.n_actionable:
        extras.append(f"PGx({twin.pharmacogen.n_actionable})")
    if twin.family:
        extras.append(f"Pedigree({twin.family.inheritance_mode})")
    if twin.cohort and twin.cohort.members:
        extras.append(f"Cohort({len(twin.cohort.members)})")
    if twin.next_questions:
        extras.append(f"Perguntas({len(twin.next_questions)})")
    if extras:
        lines.append(
            "Mais: " + " · ".join(extras)
            + ". Chame `gemeo_state(<section>)` ou `gemeo_lookup(<query>)` para acessar."
        )

    return "\n".join(lines)
# Markdown heading that `inject_context` puts in front of the brief summary
# when mode is "adaptive".
GEMEO_BRIEF_HEADER = "## Gêmeo Digital (resumo)"
# Instruction paragraph placed right after GEMEO_BRIEF_HEADER. The text is
# Portuguese and is sent to the model verbatim; it names the `gemeo_lookup`
# and `gemeo_state` tools.
GEMEO_BRIEF_INSTRUCTIONS = (
"Resumo abaixo. Tools disponíveis: `gemeo_lookup(query, mode='local'|'global')` "
"para evidência grounded em subgrafo+coorte+literatura, e "
"`gemeo_state(section)` para uma das capabilities específicas. "
"Não invente fatos fora do gêmeo / das tools."
)
async def inject_context(
    case_id: str,
    system_prompt: str,
    *,
    max_chars: int = DEFAULT_MAX_CHARS,
    refresh: bool = False,
    mode: str = "adaptive",
) -> str:
    """Return ``system_prompt`` with the Gemeo twin context prepended.

    Args:
        case_id: Case whose twin is looked up; a falsy value disables injection.
        system_prompt: Prompt to extend. It is returned untouched whenever no
            context can be produced.
        max_chars: Character budget handed to ``serialize_twin_for_llm``; only
            used when the full twin is injected.
        refresh: When true, re-query the twin even if ``get_gemeo`` returned one.
        mode: ``"adaptive"`` (default) injects the short brief plus tool
            pointers; ``"off"`` injects nothing; any other value (documented:
            ``"full"``) injects the whole serialized twin.

    Returns:
        ``header``, ``instructions``, the twin block, a ``---`` rule and the
        original prompt, in that order. Any exception raised on the way is
        logged as a warning and the original prompt is returned instead.
    """
    if mode == "off" or not case_id:
        return system_prompt

    try:
        from . import core as gcore

        twin = gcore.get_gemeo(case_id)
        # Fall back to a (re)query when nothing is cached or a refresh was asked for.
        if refresh or twin is None:
            twin = await gcore.query_gemeo(case_id)
        if twin is None:
            return system_prompt

        if mode == "adaptive":
            header, instructions = GEMEO_BRIEF_HEADER, GEMEO_BRIEF_INSTRUCTIONS
            block = serialize_twin_brief(twin)
        else:
            # Every mode other than "adaptive"/"off" gets the full serialization.
            header, instructions = GEMEO_HEADER, GEMEO_INSTRUCTIONS
            block = serialize_twin_for_llm(twin, max_chars=max_chars)

        if block:
            preamble = f"{header}\n{instructions}\n\n{block}"
            return f"{preamble}\n\n---\n\n{system_prompt}"
        return system_prompt
    except Exception as e:
        logger.warning(f"inject_context failed for {case_id[:8] if case_id else '?'}: {e}")
        return system_prompt
# ─── Absorb a message back into the twin ──────────────────────────────────
# Lightweight regex extractors. Real impl can use the existing entity
# extractors (bio_tools / phenotype agent). This is the "always-on" pass
# that runs on every message to keep the twin fresh.
# HPO term ids: the literal prefix "HP:" followed by exactly seven digits.
_HPO_RE = re.compile(r"\bHP:(\d{7})\b")
# Orphanet ids: "ORPHA:" in any letter case followed by one to seven digits.
_ORPHA_RE = re.compile(r"\bORPHA:(\d{1,7})\b", re.IGNORECASE)
# Gene symbols: an upper-case token of 2-8 characters directly followed by a
# cue word ("gene", "mutation", "variant") or an HGVS prefix ("c.", "p.").
# Case-insensitivity is scoped to the cue only: with IGNORECASE on the whole
# pattern, ordinary words were captured as symbols ("the gene" -> "THE").
# Trade-off: symbols typed in lower case are no longer picked up by this pass.
_GENE_RE = re.compile(
    r"\b([A-Z][A-Z0-9]{1,7})\b\s*(?i:gene|mutation|variant|c\.|p\.)"
)
async def absorb_message(
    case_id: str,
    message_text: str,
    *,
    source: str = "user",
) -> dict:
    """Extract HPO / ORPHA / gene mentions from free text and feed them to the twin.

    HPO ids and gene symbols are sent to ``core.evolve_gemeo`` as new
    phenotypes and new genes. ORPHA ids are reported in the result but are not
    passed to ``evolve_gemeo``.

    Args:
        case_id: Case whose twin is updated; a falsy value is a no-op.
        message_text: Text to scan; a falsy value is a no-op.
        source: Provenance label stored on every extracted item.

    Returns:
        A dict with ``hpo``, ``orpha`` and ``genes`` lists (de-duplicated, in
        order of first mention) and an ``absorbed`` flag that is true only
        when ``evolve_gemeo`` ran successfully. ``source`` is added whenever
        something was extracted and no error occurred, and ``error`` carries
        the exception text when the update failed.
    """
    if not case_id or not message_text:
        return {"hpo": [], "orpha": [], "genes": [], "absorbed": False}

    # dict.fromkeys de-duplicates while keeping first-mention order. A set
    # would give a hash-dependent order that changes between runs.
    hpo_ids = list(dict.fromkeys(
        f"HP:{m.group(1)}" for m in _HPO_RE.finditer(message_text)
    ))
    orpha_ids = list(dict.fromkeys(
        m.group(1) for m in _ORPHA_RE.finditer(message_text)
    ))
    genes_raw = list(dict.fromkeys(
        m.group(1).upper() for m in _GENE_RE.finditer(message_text)
    ))
    if not (hpo_ids or orpha_ids or genes_raw):
        return {"hpo": [], "orpha": [], "genes": [], "absorbed": False}

    new_phenotypes = [
        {"hpo_id": h, "name": h, "source": source, "status": "extracted"}
        for h in hpo_ids
    ]
    new_genes = [
        {"symbol": g, "source": source, "status": "extracted"}
        for g in genes_raw
    ]
    found = {"hpo": hpo_ids, "orpha": orpha_ids, "genes": genes_raw}

    if not (new_phenotypes or new_genes):
        # Only ORPHA ids were mentioned. Nothing is sent to the twin, so do
        # not claim that anything was absorbed.
        return {**found, "absorbed": False, "source": source}

    try:
        from . import core as gcore

        await gcore.evolve_gemeo(
            case_id,
            new_phenotypes=new_phenotypes,
            new_genes=new_genes,
        )
    except Exception as e:
        # Best-effort pass: report the failure to the caller instead of raising.
        logger.warning("absorb_message failed: %s", e)
        return {**found, "absorbed": False, "error": str(e)}
    return {**found, "absorbed": True, "source": source}