gemeo-twin-stack / src /gemeo /mcp_server.py
timmers's picture
GEMEO world-model — initial release (module + NeuralSurv ckpt + RareBench v49 + KG embeddings)
089d665 verified
"""Gemeo MCP server — exposes the digital twin as an Anthropic Model
Context Protocol server (stdio transport).
Clients (Claude Desktop, Claude Code, custom agents, future tools) get:
- tools: gemeo.lookup, gemeo.state, gemeo.absorb (feeds the twin via evolve_gemeo), gemeo.simulate, gemeo.consult
- resources: gemeo://twin/{case_id}/context, gemeo://twin/{case_id}/subgraph, gemeo://twin/{case_id}/full
Run as:
python -m gemeo.mcp_server
Register in mcp_servers.json:
{
"name": "gemeo",
"transport": "stdio",
"command": "python",
"args": ["-m", "gemeo.mcp_server"],
"description": "Gemeo digital twin — graph-RAG over patient context, SUS-aware"
}
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any
# Module-level logger; the tool dispatcher uses it to record handler failures.
logger = logging.getLogger("gemeo.mcp_server")
def _serve_via_mcp_sdk():
"""Use the official MCP Python SDK if available."""
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Resource
except ImportError as e:
raise RuntimeError(f"mcp SDK not available: {e}")
server = Server("gemeo")
# ─── Tools ─────────────────────────────────────────────────────────
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="gemeo.lookup",
description=(
"GraphRAG over the patient digital twin. Returns subgraph triples + "
"KG community summaries + (global mode) cohort exemplars + PubMed literature. "
"Use whenever you need grounded clinical evidence."
),
inputSchema={
"type": "object",
"required": ["case_id", "query"],
"properties": {
"case_id": {"type": "string"},
"query": {"type": "string"},
"mode": {"type": "string", "enum": ["local", "global"], "default": "local"},
},
},
),
Tool(
name="gemeo.state",
description=(
"Return the live digital twin state for a case. "
"Pass `section` to restrict to one capability."
),
inputSchema={
"type": "object",
"required": ["case_id"],
"properties": {
"case_id": {"type": "string"},
"section": {
"type": "string",
"enum": [
"diagnoses", "risk", "trajectory", "drugs", "ddi",
"pharmacogen", "family", "reverse_pheno",
"protocol_compliance", "next_questions", "sus_check",
"cohort", "subgraph",
],
},
},
},
),
Tool(
name="gemeo.absorb",
description=(
"Extract clinical entities (HPO, gene, lab, treatment) from a free-text "
"message via LLM-based structured extraction (negation/family-history aware) "
"and feed into the twin via evolve_gemeo. Returns counts of items added."
),
inputSchema={
"type": "object",
"required": ["case_id", "message"],
"properties": {
"case_id": {"type": "string"},
"message": {"type": "string"},
"source": {"type": "string", "default": "user"},
},
},
),
Tool(
name="gemeo.simulate",
description="Monte Carlo simulation of trajectory under stochastic intervention timing/adherence.",
inputSchema={
"type": "object",
"required": ["case_id"],
"properties": {
"case_id": {"type": "string"},
"n_runs": {"type": "integer", "default": 30},
"intervention": {"type": "object"},
"horizons_months": {"type": "array", "items": {"type": "integer"}, "default": [6, 12, 24]},
},
},
),
Tool(
name="gemeo.consult",
description="Multi-specialist agent consult on the twin (geneticist+neuro+ped+imuno+cardio+farma).",
inputSchema={
"type": "object",
"required": ["case_id"],
"properties": {
"case_id": {"type": "string"},
"panel": {"type": "array", "items": {"type": "string"}},
"question": {"type": "string"},
},
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
try:
if name == "gemeo.lookup":
from . import graphrag
res = await graphrag.retrieve(
arguments["case_id"], arguments["query"],
mode=arguments.get("mode", "local"),
)
return [TextContent(type="text", text=graphrag.format_for_llm(res))]
if name == "gemeo.state":
from . import core as gcore, llm_context
case_id = arguments["case_id"]
twin = gcore.get_gemeo(case_id) or await gcore.query_gemeo(case_id)
if twin is None:
return [TextContent(type="text", text=f"_no twin for {case_id}_")]
section = arguments.get("section")
if section is None:
return [TextContent(type="text", text=llm_context.serialize_twin_for_llm(twin))]
val = getattr(twin, section, None)
if val is None:
return [TextContent(type="text", text=f"_section `{section}` empty_")]
from dataclasses import asdict
d = asdict(val) if hasattr(val, "__dataclass_fields__") else val
return [TextContent(type="text", text=json.dumps(d, default=str, indent=2))]
if name == "gemeo.absorb":
from . import extractor
out = await extractor.absorb(
arguments["case_id"], arguments["message"],
source=arguments.get("source", "user"),
)
return [TextContent(type="text", text=json.dumps(out, default=str, indent=2))]
if name == "gemeo.simulate":
from . import core as gcore
out = await gcore.simulate(
arguments["case_id"],
n_runs=arguments.get("n_runs", 30),
intervention=arguments.get("intervention"),
horizons_months=arguments.get("horizons_months", [6, 12, 24]),
)
return [TextContent(type="text", text=json.dumps(out, default=str, indent=2))]
if name == "gemeo.consult":
from . import core as gcore
out = await gcore.consult(
arguments["case_id"],
panel=arguments.get("panel"),
question=arguments.get("question"),
)
return [TextContent(type="text", text=json.dumps(out, default=str, indent=2))]
return [TextContent(type="text", text=f"unknown tool: {name}")]
except Exception as e:
logger.exception("tool failed")
return [TextContent(type="text", text=f"_error: {e}_")]
# ─── Resources ─────────────────────────────────────────────────────
@server.list_resources()
async def list_resources() -> list[Resource]:
# Dynamic resources are hard to enumerate without a session.
# Clients can request gemeo://twin/{case_id}/<section> directly.
return [
Resource(
uri="gemeo://twin/{case_id}/context",
name="Twin LLM context",
description="Markdown block with the live patient twin (auto-injected by Gemeo's Python runtime).",
mimeType="text/markdown",
),
Resource(
uri="gemeo://twin/{case_id}/subgraph",
name="Reasoning subgraph",
description="JSON: nodes + edges + narrated paths Patient→...→Disease.",
mimeType="application/json",
),
Resource(
uri="gemeo://twin/{case_id}/full",
name="Full twin snapshot",
description="JSON: every capability for the case.",
mimeType="application/json",
),
]
@server.read_resource()
async def read_resource(uri: str) -> str:
# parse `gemeo://twin/<case_id>/<section>`
if not uri.startswith("gemeo://twin/"):
return f"_unknown resource: {uri}_"
rest = uri[len("gemeo://twin/"):]
try:
case_id, section = rest.split("/", 1)
except ValueError:
return "_malformed gemeo URI_"
from . import core as gcore, llm_context
twin = gcore.get_gemeo(case_id) or await gcore.query_gemeo(case_id)
if twin is None:
return f"_no twin for {case_id}_"
if section == "context":
return llm_context.serialize_twin_for_llm(twin)
if section == "subgraph":
from dataclasses import asdict
return json.dumps(asdict(twin.subgraph) if twin.subgraph else {}, default=str, indent=2)
if section == "full":
return json.dumps(twin.to_dict(), default=str, indent=2)[:60000]
return f"_unknown section: {section}_"
return server, stdio_server
async def main_async():
    """Configure INFO logging, build the server and serve it over stdio."""
    logging.basicConfig(level=logging.INFO)
    gemeo_server, open_stdio = _serve_via_mcp_sdk()
    async with open_stdio() as streams:
        reader, writer = streams
        init_options = gemeo_server.create_initialization_options()
        await gemeo_server.run(reader, writer, init_options)
def main():
    """Run the stdio server until it exits.

    Raises:
        RuntimeError: re-raised after a JSON diagnostic is printed; this is
            how a missing MCP SDK surfaces from the server factory.
    """
    import sys

    try:
        asyncio.run(main_async())
    except RuntimeError as e:
        # MCP SDK missing — print helpful message. It goes to stderr because
        # stdout is the stdio transport's protocol channel and must carry
        # only protocol messages.
        print(json.dumps({"error": str(e), "hint": "pip install mcp"}), file=sys.stderr)
        raise
# Script entry point: start the server when the module is executed directly
# (e.g. `python -m gemeo.mcp_server`).
if __name__ == "__main__":
    main()