"""Gemeo as LLM context — bidirectional pipe between space and the LLM. Three responsibilities: 1. **serialize_twin_for_llm(twin)** — produce a compact, token-budgeted structured block (Markdown) summarising the twin: top dx, key risks, phenotypes, genes, drugs in use, DDI alerts, PCDT gaps, next questions. This block is what the LLM sees on every call for this case. 2. **inject_context(case_id, system_prompt)** — wrapper used by every LLM call in the swarm. Looks up the cached twin (or builds it lazily), prepends the structured block to the system prompt under a "## Gemeo Digital — contexto do paciente" header. Token budget enforced. 3. **absorb_message(case_id, message_text)** — when an LLM agent or a user adds new clinical content (lab, HPO, gene, treatment), this extracts structured items and feeds them back via `evolve_gemeo`, so the next LLM call sees the updated twin. This is the "GraphRAG of the patient" — context is grounded in a graph that itself updates from every conversational turn. """ from __future__ import annotations import logging import re from typing import Optional logger = logging.getLogger("gemeo.llm_context") # Token budget (rough char count; ~4 chars per token). DEFAULT_MAX_CHARS = 4000 def _section(title: str, body: str) -> str: body = body.strip() if not body: return "" return f"### {title}\n{body}\n" def serialize_twin_for_llm(twin, *, max_chars: int = DEFAULT_MAX_CHARS) -> str: """Compact Markdown representation of the digital twin for LLM consumption.""" if twin is None: return "" parts: list[str] = [] parts.append(f"_Twin {twin.id} · case {twin.case_id} · embedding {twin.embedding_dim}d_") # diagnoses if twin.diagnoses: lines = [] for d in twin.diagnoses[:5]: p = d.get("probability") or 0 name = d.get("disease") or d.get("name") or "?" orpha = d.get("orpha") or "?" lines.append(f"- {name} (ORPHA:{orpha}) — p={p:.2f}, status={d.get('status', 'active')}") parts.append(_section("Diferencial", "\n".join(lines))) # snapshot summary (n_phenotypes etc.) parts.append(_section( "Snapshot", f"- Fenótipos: {twin.n_phenotypes}\n- Genes: {twin.n_genes}\n- Labs: {twin.n_labs}\n- Versões snapshot: {len(twin.snapshot_versions or [])}", )) # risk if twin.risk: r = twin.risk parts.append(_section( "Risco", f"- Severidade: {r.overall_severity:.2f}\n- Progressão: {r.progression_risk:.2f}\n- Urgência tx: {r.treatment_urgency:.2f}", )) # trajectory horizons (state at 6/12/24 months) if twin.trajectory and twin.trajectory.horizons: lines = [] for h in twin.trajectory.horizons[:3]: lines.append(f"- T+{h.months}m (risco {h.risk_score:.2f}): {h.state[:160]}") parts.append(_section("Trajetória", "\n".join(lines))) # cohort centroid if twin.cohort and twin.cohort.centroid_disease: c = twin.cohort.centroid_disease parts.append(_section( "Coorte", f"- Centróide: {c['name']} ({c.get('count', 0)} pacientes, {round(c.get('fraction', 0) * 100)}%)\n- Membros: {len(twin.cohort.members or [])}", )) # drug candidates with SUS if twin.drugs and twin.drugs.candidates: lines = [] for d in twin.drugs.candidates[:5]: tags = [] if d.get("sus_dispensed"): tags.append("SUS") elif d.get("sus_in_pcdt"): tags.append("PCDT") tag = f" [{', '.join(tags)}]" if tags else "" lines.append(f"- {d.get('name', '?')}{tag} — {d.get('mechanism', '')[:80]}") parts.append(_section("Reposicionamento", "\n".join(lines))) # DDI alerts (only if any) if twin.ddi and twin.ddi.pairs: lines = [] for p in twin.ddi.pairs[:5]: lines.append(f"- [{p.severity}] {p.drug_a} ↔ {p.drug_b}: {p.mechanism[:120]}") parts.append(_section(f"DDI (regime: {twin.ddi.regimen_risk})", "\n".join(lines))) # pharmacogen actionable if twin.pharmacogen and twin.pharmacogen.assessments: actionable = [a for a in twin.pharmacogen.assessments if a.cpic_level in ("A", "B")][:5] if actionable: lines = [ f"- {a.gene}{f' {a.variant}' if a.variant else ''} × {a.drug} (CPIC {a.cpic_level}): {a.recommendation[:120]}" for a in actionable ] parts.append(_section("Farmacogenômica (acionável)", "\n".join(lines))) # family / pedigree if twin.family: f = twin.family rels = ", ".join(f"{r.relation} {round(r.recurrence_risk*100)}%" for r in (f.relatives or [])[:3]) if rels: parts.append(_section("Pedigree", f"- Modo: {f.inheritance_mode}\n- Recorrência: {rels}")) # PCDT compliance if twin.protocol_compliance and twin.protocol_compliance.gaps: pc = twin.protocol_compliance gap_lines = [f"- [{g.priority}] {g.expected}" for g in pc.gaps[:5]] parts.append(_section( f"PCDT compliance ({round(pc.score * 100)}%)", "\n".join(gap_lines) if gap_lines else "Sem gaps.", )) # reverse phenotyping suggestions if twin.reverse_pheno and twin.reverse_pheno.items: lines = [f"- {it.name} ({it.hpo_id}, freq {round(it.expected_frequency*100)}%)" for it in twin.reverse_pheno.items[:5]] parts.append(_section("O que ainda procurar", "\n".join(lines))) # next questions if twin.next_questions: lines = [ f"- {q.name} ({q.hpo_id}) — gain {q.information_gain:.2f} bits" + (" [PCDT]" if q.asks_in_pcdt else "") for q in twin.next_questions[:5] ] parts.append(_section("Próximas perguntas (info-gain)", "\n".join(lines))) # SUS grounding if twin.sus_check: s = twin.sus_check sus_lines = [] if s.has_pcdt: sus_lines.append(f"- PCDT: sim ({s.pcdt_url or '—'})") if s.therapy_pcdt_recommended: sus_lines.append("- Terapias: " + ", ".join(s.therapy_pcdt_recommended[:5])) if s.nearest_centro: c = s.nearest_centro sus_lines.append(f"- Centro mais próximo: {c.get('nome')} ({c.get('cidade')}/{c.get('uf')})") if sus_lines: parts.append(_section("SUS", "\n".join(sus_lines))) out = "\n".join(p for p in parts if p) if len(out) > max_chars: out = out[: max_chars - 40].rstrip() + "\n\n…(twin truncado por orçamento de tokens)" return out # ─── Inject into a system prompt ─────────────────────────────────────────── GEMEO_HEADER = "## Gêmeo Digital — contexto vivo do paciente" GEMEO_INSTRUCTIONS = ( "Use o gêmeo abaixo para fundamentar TODA recomendação. Quando precisar " "de evidência específica, chame a tool `gemeo_lookup(query)` para " "recuperar subgrafo + coorte + literatura ancorados no caso. Nunca " "invente fatos não presentes no gêmeo ou no resultado da lookup." ) # ─── Adaptive (Self-RAG style) brief context — ~300 tokens ──────────────── def serialize_twin_brief(twin) -> str: """Compact ~300-token summary used for adaptive injection. Strategy: only top-3 dx + risk + SUS-aware drug + count of interesting secondary signals (DDI, pgx, family) so the LLM knows MORE is available. The model then calls `gemeo_lookup(query)` or `gemeo_state(section)` to dive in only when needed. """ if twin is None: return "" lines: list[str] = [f"_Gemeo {twin.id} · {twin.embedding_dim}d_"] if twin.diagnoses: top3 = ", ".join( f"{d.get('name', d.get('disease', '?'))} (ORPHA:{d.get('orpha', '?')}, p={d.get('probability', 0):.2f})" for d in twin.diagnoses[:3] ) lines.append(f"Top-dx: {top3}") if twin.risk: r = twin.risk lines.append( f"Risco: severidade={r.overall_severity:.2f} · progressão={r.progression_risk:.2f} · urgência={r.treatment_urgency:.2f}" ) if twin.drugs and twin.drugs.candidates: sus_drug = next((d for d in twin.drugs.candidates if d.get("sus_dispensed")), None) if sus_drug: lines.append(f"SUS dispensa: {sus_drug.get('name')} ({sus_drug.get('mechanism', '')[:60]})") if twin.sus_check and twin.sus_check.has_pcdt: lines.append("PCDT: vigente; chame `gemeo_state(\"sus_check\")` para detalhes") # Pointer to deeper sections extras = [] if twin.ddi and twin.ddi.pairs: extras.append(f"DDI({len(twin.ddi.pairs)})") if twin.pharmacogen and twin.pharmacogen.n_actionable: extras.append(f"PGx({twin.pharmacogen.n_actionable})") if twin.family: extras.append(f"Pedigree({twin.family.inheritance_mode})") if twin.cohort and twin.cohort.members: extras.append(f"Cohort({len(twin.cohort.members)})") if twin.next_questions: extras.append(f"Perguntas({len(twin.next_questions)})") if extras: lines.append( "Mais: " + " · ".join(extras) + ". Chame `gemeo_state(
)` ou `gemeo_lookup()` para acessar." ) return "\n".join(lines) GEMEO_BRIEF_HEADER = "## Gêmeo Digital (resumo)" GEMEO_BRIEF_INSTRUCTIONS = ( "Resumo abaixo. Tools disponíveis: `gemeo_lookup(query, mode='local'|'global')` " "para evidência grounded em subgrafo+coorte+literatura, e " "`gemeo_state(section)` para uma das capabilities específicas. " "Não invente fatos fora do gêmeo / das tools." ) async def inject_context( case_id: str, system_prompt: str, *, max_chars: int = DEFAULT_MAX_CHARS, refresh: bool = False, mode: str = "adaptive", ) -> str: """Prepend the Gemeo twin context to a system prompt. mode: "adaptive" (default) — short brief + tool pointers (~300 tokens) "full" — entire serialized twin (~max_chars tokens) "off" — no injection (returns original) Always falls back to the original prompt on any failure. """ if not case_id or mode == "off": return system_prompt try: from . import core as gcore twin = gcore.get_gemeo(case_id) if twin is None or refresh: twin = await gcore.query_gemeo(case_id) if twin is None: return system_prompt if mode == "adaptive": block = serialize_twin_brief(twin) header = GEMEO_BRIEF_HEADER instructions = GEMEO_BRIEF_INSTRUCTIONS else: # full block = serialize_twin_for_llm(twin, max_chars=max_chars) header = GEMEO_HEADER instructions = GEMEO_INSTRUCTIONS if not block: return system_prompt return f"{header}\n{instructions}\n\n{block}\n\n---\n\n{system_prompt}" except Exception as e: logger.warning(f"inject_context failed for {case_id[:8] if case_id else '?'}: {e}") return system_prompt # ─── Absorb a message back into the twin ────────────────────────────────── # Lightweight regex extractors. Real impl can use the existing entity # extractors (bio_tools / phenotype agent). This is the "always-on" pass # that runs on every message to keep the twin fresh. _HPO_RE = re.compile(r"\bHP:(\d{7})\b") _ORPHA_RE = re.compile(r"\bORPHA:(\d{1,7})\b", re.IGNORECASE) _GENE_RE = re.compile(r"\b([A-Z][A-Z0-9]{1,7})\b\s*(?:gene|mutation|variant|c\.|p\.)", re.IGNORECASE) async def absorb_message( case_id: str, message_text: str, *, source: str = "user", ) -> dict: """Extract HPO / ORPHA / gene mentions from a free-text message and feed them into the twin via `evolve_gemeo`. Returns a dict summarising what was absorbed (so the caller can show a "X items added to twin" hint in the UI). """ if not case_id or not message_text: return {"hpo": [], "orpha": [], "genes": [], "absorbed": False} hpo_ids = list({f"HP:{m.group(1)}" for m in _HPO_RE.finditer(message_text)}) orpha_ids = list({m.group(1) for m in _ORPHA_RE.finditer(message_text)}) genes_raw = list({m.group(1).upper() for m in _GENE_RE.finditer(message_text)}) if not (hpo_ids or orpha_ids or genes_raw): return {"hpo": [], "orpha": [], "genes": [], "absorbed": False} new_phenotypes = [ {"hpo_id": h, "name": h, "source": source, "status": "extracted"} for h in hpo_ids ] new_genes = [ {"symbol": g, "source": source, "status": "extracted"} for g in genes_raw ] try: from . import core as gcore if new_phenotypes or new_genes: await gcore.evolve_gemeo( case_id, new_phenotypes=new_phenotypes, new_genes=new_genes, ) return { "hpo": hpo_ids, "orpha": orpha_ids, "genes": genes_raw, "absorbed": True, "source": source, } except Exception as e: logger.warning(f"absorb_message failed: {e}") return {"hpo": hpo_ids, "orpha": orpha_ids, "genes": genes_raw, "absorbed": False, "error": str(e)}