"""Gemeo MCP server — exposes the digital twin as an Anthropic Model Context
Protocol server (stdio transport).

Clients (Claude Desktop, Claude Code, custom agents, future tools) get:
  - tools: gemeo.lookup, gemeo.state, gemeo.absorb, gemeo.simulate, gemeo.consult
  - resources: gemeo://twin/{case_id}/context, gemeo://twin/{case_id}/subgraph,
    gemeo://twin/{case_id}/full

Run as:
    python -m gemeo.mcp_server

Register in mcp_servers.json:
    {
      "name": "gemeo",
      "transport": "stdio",
      "command": "python",
      "args": ["-m", "gemeo.mcp_server"],
      "description": "Gemeo digital twin — graph-RAG over patient context, SUS-aware"
    }
"""

from __future__ import annotations

import asyncio
import dataclasses
import json
import logging
import sys
from typing import Any

logger = logging.getLogger("gemeo.mcp_server")

# Common prefix of every twin resource URI: gemeo://twin/<case_id>/<section>.
_TWIN_URI_PREFIX = "gemeo://twin/"

# Twin attributes that `gemeo.state` may expose. Used both for the advertised
# JSON-schema enum and for server-side validation, so a client cannot read
# arbitrary attributes of the twin object through `getattr`.
_STATE_SECTIONS = (
    "diagnoses",
    "risk",
    "trajectory",
    "drugs",
    "ddi",
    "pharmacogen",
    "family",
    "reverse_pheno",
    "protocol_compliance",
    "next_questions",
    "sus_check",
    "cohort",
    "subgraph",
)

# Defaults advertised in the `gemeo.simulate` schema and applied when omitted.
_DEFAULT_N_RUNS = 30
_DEFAULT_HORIZONS_MONTHS = (6, 12, 24)

# Character cap applied to the `full` resource payload.
# NOTE(review): slicing serialized JSON can produce an invalid document when
# the snapshot exceeds the cap — confirm whether clients rely on parseable output.
_FULL_SNAPSHOT_MAX_CHARS = 60000


class _McpSdkUnavailableError(RuntimeError):
    """Raised when the `mcp` package cannot be imported.

    Subclasses RuntimeError so callers that catch RuntimeError keep working.
    """


def _json_default(obj: Any) -> Any:
    """`json.dumps` fallback: expand dataclass instances, stringify the rest."""
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return dataclasses.asdict(obj)
    return str(obj)


def _to_json(value: Any) -> str:
    """Serialize *value* as indented JSON.

    Dataclass instances are converted with `dataclasses.asdict` wherever they
    appear (top level or nested inside lists/dicts); any other object that
    `json` cannot encode is replaced by its `str()`.
    """
    return json.dumps(value, default=_json_default, indent=2)


def _parse_twin_uri(uri: Any) -> tuple[str, str]:
    """Split a `gemeo://twin/<case_id>/<section>` URI into its two parts.

    Args:
        uri: The resource URI. Accepts a `str` or any object whose `str()` is
            the URI (the MCP SDK hands resource handlers a URL object).

    Returns:
        `(case_id, section)`; `section` keeps any further `/` it contains.

    Raises:
        ValueError: The URI does not start with the twin prefix, or has no
            case id / no `/` after it. The exception message is the
            user-facing text to return to the client.
    """
    text = str(uri)
    if not text.startswith(_TWIN_URI_PREFIX):
        raise ValueError(f"_unknown resource: {text}_")
    case_id, sep, section = text[len(_TWIN_URI_PREFIX):].partition("/")
    if not sep or not case_id:
        raise ValueError("_malformed gemeo URI_")
    return case_id, section


async def _load_twin(case_id: str) -> Any:
    """Return the twin for *case_id*, or None.

    Tries `core.get_gemeo` first and awaits `core.query_gemeo` only when the
    former returns a falsy value.
    """
    from . import core as gcore

    return gcore.get_gemeo(case_id) or await gcore.query_gemeo(case_id)


async def _tool_lookup(arguments: dict[str, Any]) -> str:
    """Handle `gemeo.lookup`: run `graphrag.retrieve` and format the result."""
    from . import graphrag

    result = await graphrag.retrieve(
        arguments["case_id"],
        arguments["query"],
        mode=arguments.get("mode", "local"),
    )
    return graphrag.format_for_llm(result)


async def _tool_state(arguments: dict[str, Any]) -> str:
    """Handle `gemeo.state`: whole-twin context, or one section as JSON."""
    case_id = arguments["case_id"]
    section = arguments.get("section")
    # Reject unknown sections before any I/O; the schema enum is advisory only
    # and an unchecked name would be passed straight to getattr().
    if section is not None and section not in _STATE_SECTIONS:
        return f"_unknown section: {section}_"

    from . import llm_context

    twin = await _load_twin(case_id)
    if twin is None:
        return f"_no twin for {case_id}_"
    if section is None:
        return llm_context.serialize_twin_for_llm(twin)
    value = getattr(twin, section, None)
    if value is None:
        return f"_section `{section}` empty_"
    return _to_json(value)


async def _tool_absorb(arguments: dict[str, Any]) -> str:
    """Handle `gemeo.absorb`: delegate to `extractor.absorb`, return JSON."""
    from . import extractor

    out = await extractor.absorb(
        arguments["case_id"],
        arguments["message"],
        source=arguments.get("source", "user"),
    )
    return _to_json(out)


async def _tool_simulate(arguments: dict[str, Any]) -> str:
    """Handle `gemeo.simulate`: delegate to `core.simulate`, return JSON."""
    from . import core as gcore

    out = await gcore.simulate(
        arguments["case_id"],
        n_runs=arguments.get("n_runs", _DEFAULT_N_RUNS),
        intervention=arguments.get("intervention"),
        # Fresh list per call so the callee can never mutate the shared default.
        horizons_months=arguments.get(
            "horizons_months", list(_DEFAULT_HORIZONS_MONTHS)
        ),
    )
    return _to_json(out)


async def _tool_consult(arguments: dict[str, Any]) -> str:
    """Handle `gemeo.consult`: delegate to `core.consult`, return JSON."""
    from . import core as gcore

    out = await gcore.consult(
        arguments["case_id"],
        panel=arguments.get("panel"),
        question=arguments.get("question"),
    )
    return _to_json(out)


# Tool name -> coroutine taking the arguments dict and returning the reply text.
_TOOL_HANDLERS = {
    "gemeo.lookup": _tool_lookup,
    "gemeo.state": _tool_state,
    "gemeo.absorb": _tool_absorb,
    "gemeo.simulate": _tool_simulate,
    "gemeo.consult": _tool_consult,
}


async def _read_twin_resource(uri: Any) -> str:
    """Return the text body for a `gemeo://twin/<case_id>/<section>` URI.

    Problems (bad URI, missing twin, unknown section) are reported as short
    `_..._` marker strings rather than raised.
    """
    try:
        case_id, section = _parse_twin_uri(uri)
    except ValueError as e:
        return str(e)

    from . import llm_context

    twin = await _load_twin(case_id)
    if twin is None:
        return f"_no twin for {case_id}_"
    if section == "context":
        return llm_context.serialize_twin_for_llm(twin)
    if section == "subgraph":
        return _to_json(twin.subgraph if twin.subgraph else {})
    if section == "full":
        return _to_json(twin.to_dict())[:_FULL_SNAPSHOT_MAX_CHARS]
    return f"_unknown section: {section}_"


def _serve_via_mcp_sdk():
    """Build the Gemeo server using the official MCP Python SDK.

    Returns:
        `(server, stdio_server)`: the configured `mcp.server.Server` and the
        SDK's stdio transport context-manager factory.

    Raises:
        RuntimeError: The `mcp` package is not importable.
    """
    try:
        from mcp.server import Server
        from mcp.server.stdio import stdio_server
        from mcp.types import Tool, TextContent, Resource
    except ImportError as e:
        raise _McpSdkUnavailableError(f"mcp SDK not available: {e}") from e

    server = Server("gemeo")

    # ─── Tools ─────────────────────────────────────────────────────────
    @server.list_tools()
    async def list_tools() -> list[Tool]:
        return [
            Tool(
                name="gemeo.lookup",
                description=(
                    "GraphRAG over the patient digital twin. Returns subgraph triples + "
                    "KG community summaries + (global mode) cohort exemplars + PubMed literature. "
                    "Use whenever you need grounded clinical evidence."
                ),
                inputSchema={
                    "type": "object",
                    "required": ["case_id", "query"],
                    "properties": {
                        "case_id": {"type": "string"},
                        "query": {"type": "string"},
                        "mode": {"type": "string", "enum": ["local", "global"], "default": "local"},
                    },
                },
            ),
            Tool(
                name="gemeo.state",
                description=(
                    "Return the live digital twin state for a case. "
                    "Pass `section` to restrict to one capability."
                ),
                inputSchema={
                    "type": "object",
                    "required": ["case_id"],
                    "properties": {
                        "case_id": {"type": "string"},
                        "section": {
                            "type": "string",
                            "enum": list(_STATE_SECTIONS),
                        },
                    },
                },
            ),
            Tool(
                name="gemeo.absorb",
                description=(
                    "Extract clinical entities (HPO, gene, lab, treatment) from a free-text "
                    "message via LLM-based structured extraction (negation/family-history aware) "
                    "and feed into the twin via evolve_gemeo. Returns counts of items added."
                ),
                inputSchema={
                    "type": "object",
                    "required": ["case_id", "message"],
                    "properties": {
                        "case_id": {"type": "string"},
                        "message": {"type": "string"},
                        "source": {"type": "string", "default": "user"},
                    },
                },
            ),
            Tool(
                name="gemeo.simulate",
                description="Monte Carlo simulation of trajectory under stochastic intervention timing/adherence.",
                inputSchema={
                    "type": "object",
                    "required": ["case_id"],
                    "properties": {
                        "case_id": {"type": "string"},
                        "n_runs": {"type": "integer", "default": _DEFAULT_N_RUNS},
                        "intervention": {"type": "object"},
                        "horizons_months": {
                            "type": "array",
                            "items": {"type": "integer"},
                            "default": list(_DEFAULT_HORIZONS_MONTHS),
                        },
                    },
                },
            ),
            Tool(
                name="gemeo.consult",
                description="Multi-specialist agent consult on the twin (geneticist+neuro+ped+imuno+cardio+farma).",
                inputSchema={
                    "type": "object",
                    "required": ["case_id"],
                    "properties": {
                        "case_id": {"type": "string"},
                        "panel": {"type": "array", "items": {"type": "string"}},
                        "question": {"type": "string"},
                    },
                },
            ),
        ]

    @server.call_tool()
    async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
        handler = _TOOL_HANDLERS.get(name)
        if handler is None:
            return [TextContent(type="text", text=f"unknown tool: {name}")]
        try:
            text = await handler(arguments)
        except Exception as e:
            # Protocol boundary: report the failure to the client as text
            # instead of letting it propagate into the SDK's request loop.
            logger.exception("tool failed")
            text = f"_error: {e}_"
        return [TextContent(type="text", text=text)]

    # ─── Resources ─────────────────────────────────────────────────────
    @server.list_resources()
    async def list_resources() -> list[Resource]:
        # Dynamic resources are hard to enumerate without a session.
        # Clients can request gemeo://twin/{case_id}/<section> directly.
        # NOTE(review): these URIs are templates, not concrete resources; the
        # protocol has a separate resource-template listing that may be a
        # better fit — confirm against the SDK before changing.
        return [
            Resource(
                uri="gemeo://twin/{case_id}/context",
                name="Twin LLM context",
                description="Markdown block with the live patient twin (auto-injected by Gemeo's Python runtime).",
                mimeType="text/markdown",
            ),
            Resource(
                uri="gemeo://twin/{case_id}/subgraph",
                name="Reasoning subgraph",
                description="JSON: nodes + edges + narrated paths Patient→...→Disease.",
                mimeType="application/json",
            ),
            Resource(
                uri="gemeo://twin/{case_id}/full",
                name="Full twin snapshot",
                description="JSON: every capability for the case.",
                mimeType="application/json",
            ),
        ]

    @server.read_resource()
    async def read_resource(uri: Any) -> str:
        # parse `gemeo://twin/<case_id>/<section>`; the SDK may pass a URL
        # object rather than a str, so the helper normalizes it with str().
        return await _read_twin_resource(uri)

    return server, stdio_server


async def main_async():
    """Configure INFO logging, build the server and run it over stdio."""
    # basicConfig logs to stderr by default, keeping stdout for the protocol.
    logging.basicConfig(level=logging.INFO)
    server, stdio_server = _serve_via_mcp_sdk()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


def main():
    """Console entry point: run the server until the transport closes.

    Raises:
        RuntimeError: The MCP SDK is missing (re-raised after a JSON hint is
            written to stderr).
    """
    try:
        asyncio.run(main_async())
    except _McpSdkUnavailableError as e:
        # MCP SDK missing — print helpful message. It goes to stderr because
        # stdout is the JSON-RPC channel for a stdio-transport server.
        print(json.dumps({"error": str(e), "hint": "pip install mcp"}), file=sys.stderr)
        raise


if __name__ == "__main__":
    main()