| """Gemeo as LLM context — bidirectional pipe between space and the LLM. |
| |
| Three responsibilities: |
| 1. **serialize_twin_for_llm(twin)** — produce a compact, token-budgeted |
| structured block (Markdown) summarising the twin: top dx, key risks, |
| phenotypes, genes, drugs in use, DDI alerts, PCDT gaps, next questions. |
| This block is what the LLM sees on every call for this case. |
| |
| 2. **inject_context(case_id, system_prompt)** — wrapper used by every |
| LLM call in the swarm. Looks up the cached twin (or builds it lazily) and |
| prepends it to the system prompt: by default a short brief under a |
| "## Gêmeo Digital (resumo)" header, or in "full" mode the complete block |
| under "## Gêmeo Digital — contexto vivo do paciente" (size-capped). |
| |
| 3. **absorb_message(case_id, message_text)** — when an LLM agent or a |
| user adds new clinical content, this extracts HPO ids, ORPHA ids and |
| gene mentions from the text and feeds the phenotypes and genes back |
| via `evolve_gemeo`, so the next LLM call sees the updated twin. |
| |
| This is the "GraphRAG of the patient" — context is grounded in a graph |
| that itself updates from every conversational turn. |
| """ |
| from __future__ import annotations |
| import logging |
| import re |
| from typing import Optional |
|
|
# Module-level logger, registered under the fixed name "gemeo.llm_context".
logger = logging.getLogger("gemeo.llm_context")

# Default value of the `max_chars` parameter taken by the twin serializer and
# by `inject_context`; the unit is characters.
DEFAULT_MAX_CHARS = 4_000
|
|
|
|
| def _section(title: str, body: str) -> str: |
| body = body.strip() |
| if not body: |
| return "" |
| return f"### {title}\n{body}\n" |
|
|
|
|
def _clip_text(text, limit: int) -> str:
    """Return the first ``limit`` characters of ``text``; ``None`` counts as empty."""
    return (text or "")[:limit]


def serialize_twin_for_llm(twin, *, max_chars: int = DEFAULT_MAX_CHARS) -> str:
    """Render the digital twin as a compact Markdown block for the LLM.

    Each populated capability of the twin becomes one ``###`` subsection
    (built with ``_section``); empty capabilities are skipped. List-like
    sections are capped (first 5 entries, or first 3 for trajectory horizons
    and relatives) and free-text fields are clipped to a fixed length.

    Args:
        twin: Twin object, read by attribute (``diagnoses``, ``risk``,
            ``trajectory``, ``cohort``, ``drugs``, ``ddi``, ``pharmacogen``,
            ``family``, ``protocol_compliance``, ``reverse_pheno``,
            ``next_questions``, ``sus_check`` plus id and count fields).
            ``None`` yields an empty string.
        max_chars: Upper bound, in characters, on the returned text.

    Returns:
        The Markdown block. When the rendering exceeds ``max_chars`` it is
        cut and a truncation note is appended, with the total staying within
        ``max_chars``. If the budget cannot even hold the note, the text is
        hard-cut to ``max_chars`` without it.
    """
    if twin is None:
        return ""

    parts: list[str] = [
        f"_Twin {twin.id} · case {twin.case_id} · embedding {twin.embedding_dim}d_"
    ]

    # Differential diagnosis (first 5 entries).
    if twin.diagnoses:
        lines = []
        for d in twin.diagnoses[:5]:
            p = d.get("probability") or 0
            name = d.get("disease") or d.get("name") or "?"
            orpha = d.get("orpha") or "?"
            lines.append(
                f"- {name} (ORPHA:{orpha}) — p={p:.2f}, status={d.get('status', 'active')}"
            )
        parts.append(_section("Diferencial", "\n".join(lines)))

    # Counters describing how much evidence the twin holds.
    parts.append(_section(
        "Snapshot",
        f"- Fenótipos: {twin.n_phenotypes}\n- Genes: {twin.n_genes}\n- Labs: {twin.n_labs}\n- Versões snapshot: {len(twin.snapshot_versions or [])}",
    ))

    # Risk scores.
    if twin.risk:
        r = twin.risk
        parts.append(_section(
            "Risco",
            f"- Severidade: {r.overall_severity:.2f}\n- Progressão: {r.progression_risk:.2f}\n- Urgência tx: {r.treatment_urgency:.2f}",
        ))

    # Trajectory (first 3 horizons).
    if twin.trajectory and twin.trajectory.horizons:
        lines = [
            f"- T+{h.months}m (risco {h.risk_score:.2f}): {_clip_text(h.state, 160)}"
            for h in twin.trajectory.horizons[:3]
        ]
        parts.append(_section("Trajetória", "\n".join(lines)))

    # Cohort centroid.
    if twin.cohort and twin.cohort.centroid_disease:
        c = twin.cohort.centroid_disease
        # .get() like the sibling lookups: a centroid without a name must not
        # abort the whole serialization with a KeyError.
        parts.append(_section(
            "Coorte",
            f"- Centróide: {c.get('name', '?')} ({c.get('count', 0)} pacientes, {round(c.get('fraction', 0) * 100)}%)\n- Membros: {len(twin.cohort.members or [])}",
        ))

    # Drug candidates (first 5); the SUS tag takes precedence over the PCDT tag.
    if twin.drugs and twin.drugs.candidates:
        lines = []
        for d in twin.drugs.candidates[:5]:
            if d.get("sus_dispensed"):
                tag = " [SUS]"
            elif d.get("sus_in_pcdt"):
                tag = " [PCDT]"
            else:
                tag = ""
            lines.append(
                f"- {d.get('name', '?')}{tag} — {_clip_text(d.get('mechanism'), 80)}"
            )
        parts.append(_section("Reposicionamento", "\n".join(lines)))

    # Drug-drug interactions (first 5 pairs).
    if twin.ddi and twin.ddi.pairs:
        lines = [
            f"- [{p.severity}] {p.drug_a} ↔ {p.drug_b}: {_clip_text(p.mechanism, 120)}"
            for p in twin.ddi.pairs[:5]
        ]
        parts.append(_section(f"DDI (regime: {twin.ddi.regimen_risk})", "\n".join(lines)))

    # Pharmacogenomics: only CPIC level A/B assessments, first 5.
    if twin.pharmacogen and twin.pharmacogen.assessments:
        actionable = [
            a for a in twin.pharmacogen.assessments if a.cpic_level in ("A", "B")
        ][:5]
        if actionable:
            lines = []
            for a in actionable:
                variant = f" {a.variant}" if a.variant else ""
                lines.append(
                    f"- {a.gene}{variant} × {a.drug} (CPIC {a.cpic_level}): {_clip_text(a.recommendation, 120)}"
                )
            parts.append(_section("Farmacogenômica (acionável)", "\n".join(lines)))

    # Pedigree: emitted only when at least one relative is listed.
    if twin.family:
        f = twin.family
        rels = ", ".join(
            f"{r.relation} {round(r.recurrence_risk * 100)}%"
            for r in (f.relatives or [])[:3]
        )
        if rels:
            parts.append(_section("Pedigree", f"- Modo: {f.inheritance_mode}\n- Recorrência: {rels}"))

    # Protocol compliance gaps (first 5).
    # NOTE(review): the section is only emitted when gaps exist, so the
    # "Sem gaps." fallback is effectively unreachable and a gap-free case
    # shows no compliance score — confirm that is intended.
    if twin.protocol_compliance and twin.protocol_compliance.gaps:
        pc = twin.protocol_compliance
        gap_lines = [f"- [{g.priority}] {g.expected}" for g in pc.gaps[:5]]
        parts.append(_section(
            f"PCDT compliance ({round(pc.score * 100)}%)",
            "\n".join(gap_lines) if gap_lines else "Sem gaps.",
        ))

    # Reverse phenotyping: items listed under "what to look for next" (first 5).
    if twin.reverse_pheno and twin.reverse_pheno.items:
        lines = [
            f"- {it.name} ({it.hpo_id}, freq {round(it.expected_frequency * 100)}%)"
            for it in twin.reverse_pheno.items[:5]
        ]
        parts.append(_section("O que ainda procurar", "\n".join(lines)))

    # Next questions with their information gain (first 5).
    if twin.next_questions:
        lines = [
            f"- {q.name} ({q.hpo_id}) — gain {q.information_gain:.2f} bits"
            + (" [PCDT]" if q.asks_in_pcdt else "")
            for q in twin.next_questions[:5]
        ]
        parts.append(_section("Próximas perguntas (info-gain)", "\n".join(lines)))

    # SUS check: PCDT availability, recommended therapies, nearest centre.
    if twin.sus_check:
        s = twin.sus_check
        sus_lines = []
        if s.has_pcdt:
            sus_lines.append(f"- PCDT: sim ({s.pcdt_url or '—'})")
        if s.therapy_pcdt_recommended:
            sus_lines.append("- Terapias: " + ", ".join(s.therapy_pcdt_recommended[:5]))
        if s.nearest_centro:
            c = s.nearest_centro
            sus_lines.append(f"- Centro mais próximo: {c.get('nome')} ({c.get('cidade')}/{c.get('uf')})")
        if sus_lines:
            parts.append(_section("SUS", "\n".join(sus_lines)))

    out = "\n".join(p for p in parts if p)
    if len(out) > max_chars:
        note = "\n\n…(twin truncado por orçamento de tokens)"
        # Reserve exactly the note's length so the result never exceeds
        # max_chars. A fixed offset smaller than the note overshoots the
        # budget, and a negative slice bound would keep almost all the text.
        keep = max_chars - len(note)
        if keep > 0:
            out = out[:keep].rstrip() + note
        else:
            out = out[:max(max_chars, 0)]
    return out
|
|
|
|
| |
|
|
# Markdown heading placed above the full serialized twin.
GEMEO_HEADER = "## Gêmeo Digital — contexto vivo do paciente"

# Guidance emitted right below GEMEO_HEADER: ground recommendations in the
# twin, use the `gemeo_lookup` tool for evidence, never invent facts.
GEMEO_INSTRUCTIONS = (
    "Use o gêmeo abaixo para fundamentar TODA recomendação. "
    "Quando precisar de evidência específica, chame a tool `gemeo_lookup(query)` "
    "para recuperar subgrafo + coorte + literatura ancorados no caso. "
    "Nunca invente fatos não presentes no gêmeo ou no resultado da lookup."
)
|
|
|
|
| |
|
|
def serialize_twin_brief(twin) -> str:
    """Build the short twin summary used for adaptive injection.

    The summary lists the first three diagnoses, the risk scores, the first
    SUS-dispensed drug candidate and a PCDT pointer, followed by a "Mais:"
    line that only *counts* the secondary signals (DDI, PGx, pedigree,
    cohort, next questions). That line tells the model more detail exists
    behind the `gemeo_state` / `gemeo_lookup` tools.

    Args:
        twin: Twin object, read by attribute; ``None`` yields ``""``.

    Returns:
        Newline-joined summary lines.
    """
    if twin is None:
        return ""

    lines: list[str] = [f"_Gemeo {twin.id} · {twin.embedding_dim}d_"]

    if twin.diagnoses:
        entries = []
        for d in twin.diagnoses[:3]:
            # `or` fallbacks rather than .get() defaults: a key that is
            # present but None must not print "None" or crash the float
            # format. The full serializer guards these fields the same way.
            name = d.get("name") or d.get("disease") or "?"
            orpha = d.get("orpha") or "?"
            prob = d.get("probability") or 0
            entries.append(f"{name} (ORPHA:{orpha}, p={prob:.2f})")
        lines.append(f"Top-dx: {', '.join(entries)}")

    if twin.risk:
        r = twin.risk
        lines.append(
            f"Risco: severidade={r.overall_severity:.2f} · progressão={r.progression_risk:.2f} · urgência={r.treatment_urgency:.2f}"
        )

    if twin.drugs and twin.drugs.candidates:
        # Only the first candidate flagged as dispensed by SUS is surfaced.
        sus_drug = next((d for d in twin.drugs.candidates if d.get("sus_dispensed")), None)
        if sus_drug:
            mechanism = (sus_drug.get("mechanism") or "")[:60]
            lines.append(f"SUS dispensa: {sus_drug.get('name') or '?'} ({mechanism})")

    if twin.sus_check and twin.sus_check.has_pcdt:
        lines.append("PCDT: vigente; chame `gemeo_state(\"sus_check\")` para detalhes")

    # Counts of the secondary signals that are not expanded in the brief.
    extras = []
    if twin.ddi and twin.ddi.pairs:
        extras.append(f"DDI({len(twin.ddi.pairs)})")
    if twin.pharmacogen and twin.pharmacogen.n_actionable:
        extras.append(f"PGx({twin.pharmacogen.n_actionable})")
    if twin.family:
        extras.append(f"Pedigree({twin.family.inheritance_mode})")
    if twin.cohort and twin.cohort.members:
        extras.append(f"Cohort({len(twin.cohort.members)})")
    if twin.next_questions:
        extras.append(f"Perguntas({len(twin.next_questions)})")
    if extras:
        lines.append(
            "Mais: " + " · ".join(extras) +
            ". Chame `gemeo_state(<section>)` ou `gemeo_lookup(<query>)` para acessar."
        )

    return "\n".join(lines)
|
|
|
|
# Markdown heading placed above the brief twin summary.
GEMEO_BRIEF_HEADER = "## Gêmeo Digital (resumo)"

# Guidance emitted right below GEMEO_BRIEF_HEADER: names the `gemeo_lookup`
# and `gemeo_state` tools and forbids inventing facts.
GEMEO_BRIEF_INSTRUCTIONS = (
    "Resumo abaixo. "
    "Tools disponíveis: `gemeo_lookup(query, mode='local'|'global')` para "
    "evidência grounded em subgrafo+coorte+literatura, e `gemeo_state(section)` "
    "para uma das capabilities específicas. "
    "Não invente fatos fora do gêmeo / das tools."
)
|
|
|
|
async def inject_context(
    case_id: str,
    system_prompt: str,
    *,
    max_chars: int = DEFAULT_MAX_CHARS,
    refresh: bool = False,
    mode: str = "adaptive",
) -> str:
    """Prepend the Gemeo twin context to a system prompt.

    Args:
        case_id: Case whose twin is injected; a falsy value disables injection.
        system_prompt: Prompt to extend; returned untouched when nothing is
            injected.
        max_chars: Character budget passed to ``serialize_twin_for_llm``.
            Only used in "full" mode; the brief has no configurable budget.
        refresh: When true, re-query the twin even if a cached one exists.
        mode: "adaptive" (default) injects the short brief plus tool
            pointers; "off" returns the prompt unchanged; any other value
            (conventionally "full") injects the complete serialized twin.

    Returns:
        ``header``, ``instructions``, the twin block and a ``---`` separator
        followed by ``system_prompt``. The original ``system_prompt`` is
        returned instead when the twin is missing, the block is empty, or
        any exception occurs.
    """
    if not case_id or mode == "off":
        return system_prompt
    try:
        # Imported inside the function, as in the original.
        from . import core as gcore

        twin = gcore.get_gemeo(case_id)
        if twin is None or refresh:
            twin = await gcore.query_gemeo(case_id)
        if twin is None:
            return system_prompt

        if mode == "adaptive":
            block = serialize_twin_brief(twin)
            header = GEMEO_BRIEF_HEADER
            instructions = GEMEO_BRIEF_INSTRUCTIONS
        else:
            block = serialize_twin_for_llm(twin, max_chars=max_chars)
            header = GEMEO_HEADER
            instructions = GEMEO_INSTRUCTIONS

        if not block:
            return system_prompt
        return f"{header}\n{instructions}\n\n{block}\n\n---\n\n{system_prompt}"
    except Exception as e:
        # str() before slicing: a non-string id (e.g. UUID, int) would
        # otherwise raise inside this handler and defeat the fallback.
        logger.warning("inject_context failed for %s: %s", str(case_id)[:8], e)
        return system_prompt
|
|
|
|
| |
|
|
| |
| |
| |
|
|
| _HPO_RE = re.compile(r"\bHP:(\d{7})\b") |
| _ORPHA_RE = re.compile(r"\bORPHA:(\d{1,7})\b", re.IGNORECASE) |
| _GENE_RE = re.compile(r"\b([A-Z][A-Z0-9]{1,7})\b\s*(?:gene|mutation|variant|c\.|p\.)", re.IGNORECASE) |
|
|
|
|
async def absorb_message(
    case_id: str,
    message_text: str,
    *,
    source: str = "user",
) -> dict:
    """Extract HPO / ORPHA / gene mentions from free text and feed them to the twin.

    HPO ids and gene symbols are sent to ``evolve_gemeo`` as new phenotypes
    and genes. ORPHA ids are extracted and reported in the summary but are
    not passed to ``evolve_gemeo``.

    Args:
        case_id: Case whose twin should be evolved.
        message_text: Free-text message to scan.
        source: Label stored on every extracted item (default "user").

    Returns:
        A dict with ``hpo``, ``orpha`` and ``genes`` lists (de-duplicated, in
        order of first mention) and an ``absorbed`` flag. The flag is true
        only when ``evolve_gemeo`` was called and did not raise. ``source``
        is included whenever something was extracted and no error occurred;
        ``error`` carries the exception text on failure.
    """
    if not case_id or not message_text:
        return {"hpo": [], "orpha": [], "genes": [], "absorbed": False}

    # dict.fromkeys de-duplicates while preserving first-mention order, so
    # the result is deterministic (a set's order varies between runs).
    hpo_ids = list(dict.fromkeys(
        f"HP:{m.group(1)}" for m in _HPO_RE.finditer(message_text)
    ))
    orpha_ids = list(dict.fromkeys(
        m.group(1) for m in _ORPHA_RE.finditer(message_text)
    ))
    genes_raw = list(dict.fromkeys(
        m.group(1).upper() for m in _GENE_RE.finditer(message_text)
    ))

    if not (hpo_ids or orpha_ids or genes_raw):
        return {"hpo": [], "orpha": [], "genes": [], "absorbed": False}

    # The HPO id doubles as the phenotype name; no label lookup happens here.
    new_phenotypes = [
        {"hpo_id": h, "name": h, "source": source, "status": "extracted"}
        for h in hpo_ids
    ]
    new_genes = [
        {"symbol": g, "source": source, "status": "extracted"}
        for g in genes_raw
    ]

    found = {"hpo": hpo_ids, "orpha": orpha_ids, "genes": genes_raw}

    if not (new_phenotypes or new_genes):
        # Only ORPHA ids were mentioned: nothing is written to the twin, so
        # reporting absorbed=True here would mislead the caller.
        return {**found, "absorbed": False, "source": source}

    try:
        from . import core as gcore

        await gcore.evolve_gemeo(
            case_id,
            new_phenotypes=new_phenotypes,
            new_genes=new_genes,
        )
    except Exception as e:
        # Best effort: absorption must never break the conversation flow.
        logger.warning("absorb_message failed: %s", e)
        return {**found, "absorbed": False, "error": str(e)}
    return {**found, "absorbed": True, "source": source}
|
|