# Source: OpenMythos / agent/mcp.py
# Last commit: "BetterTools" (72cc67e) by KingNish - file size 9.19 kB
"""MCP (Model Context Protocol) client for remote tool servers.
Supports Streamable HTTP transport (JSON-RPC 2.0 over HTTP).
"""
from __future__ import annotations
import json
import uuid
from typing import Any
import httpx
from .tools import Tool
# ---------------------------------------------------------------------------
# MCP Client
# ---------------------------------------------------------------------------
class MCPClient:
    """Connect to a remote MCP server via Streamable HTTP transport.

    Each call POSTs one JSON-RPC 2.0 message to ``url`` using a fresh
    ``httpx.Client``. The reply may be a plain JSON body or a
    ``text/event-stream`` (SSE) body; both are handled. If the server
    returns an ``mcp-session-id`` response header, it is remembered and
    sent back as ``Mcp-Session-Id`` on subsequent calls.
    """

    def __init__(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: float = 30.0,
    ) -> None:
        """Store connection settings; no network traffic happens here.

        Args:
            url: Endpoint every JSON-RPC message is POSTed to.
            headers: Extra HTTP headers merged into every request. They
                take precedence over the default ``Content-Type`` and
                ``Accept`` headers.
            timeout: Timeout passed to ``httpx.Client``.
        """
        self.url = url
        self.headers = headers or {}
        self.timeout = timeout
        self._session_id: str | None = None
        self._initialized = False

    # ------------------------------------------------------------------
    # Low-level JSON-RPC
    # ------------------------------------------------------------------
    def _build_headers(self) -> dict[str, str]:
        """Return the HTTP headers shared by requests and notifications."""
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            **self.headers,
        }
        if self._session_id:
            headers["Mcp-Session-Id"] = self._session_id
        return headers

    def _request(self, method: str, params: dict[str, Any] | None = None) -> Any:
        """Send a JSON-RPC request and return its ``result`` member.

        Args:
            method: JSON-RPC method name.
            params: Optional parameters; omitted from the payload when
                ``None``.

        Returns:
            The ``result`` of the response, or ``None`` if the response
            carried none.

        Raises:
            RuntimeError: If the response carries a JSON-RPC ``error`` or
                its JSON body is not an object.
            httpx.HTTPStatusError: Raised by ``raise_for_status`` on a
                4xx/5xx response.
        """
        payload: dict[str, Any] = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": method,
        }
        if params is not None:
            payload["params"] = params

        with httpx.Client(timeout=self.timeout) as client:
            resp = client.post(self.url, json=payload, headers=self._build_headers())

        # Capture the session ID before the status check so that it is
        # remembered even when the response is an error.
        sid = resp.headers.get("mcp-session-id")
        if sid:
            self._session_id = sid
        resp.raise_for_status()

        content_type = resp.headers.get("content-type", "")
        if "text/event-stream" in content_type:
            return self._parse_sse_response(resp.text)

        data = resp.json()
        if not isinstance(data, dict):
            # A single request must be answered by a single JSON object.
            raise RuntimeError(f"MCP error: unexpected response body {data!r}")
        if "error" in data:
            raise RuntimeError(f"MCP error: {data['error']}")
        return data.get("result")

    @staticmethod
    def _decode_sse_event(data_lines: list[str]) -> list[Any]:
        """Decode the ``data:`` lines of one SSE event into JSON values.

        The lines are first joined with newlines (one event may spread its
        payload over several ``data:`` lines). If that does not parse,
        each line is decoded on its own, so streams that emit one complete
        JSON document per line without blank-line separators still work.
        Undecodable fragments are skipped.
        """
        joined = "\n".join(data_lines)
        if not joined.strip():
            return []
        try:
            return [json.loads(joined)]
        except json.JSONDecodeError:
            pass

        messages: list[Any] = []
        for fragment in data_lines:
            if not fragment:
                continue
            try:
                messages.append(json.loads(fragment))
            except json.JSONDecodeError:
                continue
        return messages

    def _parse_sse_response(self, text: str) -> Any:
        """Parse SSE response text, extracting the JSON-RPC result.

        Args:
            text: Full body of a ``text/event-stream`` response.

        Returns:
            The ``result`` of the last message that has one, or ``None``
            if no message carried a result.

        Raises:
            RuntimeError: If a message carries ``error`` and no ``result``.
        """
        result = None
        pending: list[str] = []

        # Normalise CRLF / bare CR to LF by hand: ``str.splitlines`` would
        # also split on characters (e.g. U+2028) that may legally appear
        # inside JSON strings.
        normalized = text.replace("\r\n", "\n").replace("\r", "\n")

        # The trailing "" acts as a blank line so that a final event that
        # is not followed by one is still dispatched.
        for raw_line in [*normalized.split("\n"), ""]:
            line = raw_line.strip()
            if line.startswith("data:"):
                pending.append(line[len("data:"):].strip())
                continue
            if line:
                # ``event:``, ``id:``, ``retry:`` fields and ``:`` comments
                # carry no JSON-RPC payload.
                continue

            # Blank line: end of the current event.
            messages = self._decode_sse_event(pending)
            pending = []
            for message in messages:
                if not isinstance(message, dict):
                    # Not a JSON-RPC message object; nothing to extract.
                    continue
                if "result" in message:
                    result = message["result"]
                elif "error" in message:
                    raise RuntimeError(f"MCP error: {message['error']}")
        return result

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def initialize(self) -> dict[str, Any]:
        """Initialize the MCP session.

        Sends ``initialize`` (protocol version ``2025-03-26``), marks the
        client as initialized, then sends the
        ``notifications/initialized`` notification.

        Returns:
            The server's ``initialize`` result, or ``{}`` if it was empty.
        """
        result = self._request(
            "initialize",
            {
                "protocolVersion": "2025-03-26",
                "capabilities": {},
                "clientInfo": {"name": "mythos-agent", "version": "0.1.0"},
            },
        )
        self._initialized = True
        # Send initialized notification (no response expected)
        self._notify("notifications/initialized", {})
        return result or {}

    def _notify(self, method: str, params: dict[str, Any]) -> None:
        """Send a JSON-RPC notification (no id, no response expected).

        The HTTP response is intentionally ignored, including its status
        code; only transport-level failures raised by ``httpx`` propagate.
        """
        payload = {"jsonrpc": "2.0", "method": method, "params": params}
        with httpx.Client(timeout=self.timeout) as client:
            client.post(self.url, json=payload, headers=self._build_headers())

    # ------------------------------------------------------------------
    # Tool discovery
    # ------------------------------------------------------------------
    def list_tools(self) -> list[dict[str, Any]]:
        """List available tools from the server.

        Initializes the session first if that has not happened yet.

        Returns:
            The ``tools`` list of the ``tools/list`` result, or ``[]``.
        """
        if not self._initialized:
            self.initialize()
        result = self._request("tools/list", {})
        return (result or {}).get("tools", [])

    # ------------------------------------------------------------------
    # Tool execution
    # ------------------------------------------------------------------
    def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        """Call a tool on the server.

        Initializes the session first if that has not happened yet.

        Args:
            name: Tool name as reported by ``tools/list``.
            arguments: Arguments forwarded verbatim to the tool.

        Returns:
            The raw ``tools/call`` result.
        """
        if not self._initialized:
            self.initialize()
        return self._request("tools/call", {"name": name, "arguments": arguments})
# ---------------------------------------------------------------------------
# MCP Tool wrapper
# ---------------------------------------------------------------------------
def _json_schema_type_to_python(type_str: str) -> type:
"""Map JSON Schema type to Python type."""
mapping = {"string": str, "integer": int, "number": float, "boolean": bool, "array": list, "object": dict}
return mapping.get(type_str, str)
def mcp_tool(client: MCPClient, mcp_tool_def: dict[str, Any]) -> Tool:
    """Wrap an MCP tool definition as a local Tool instance.

    The tool's ``inputSchema`` is reduced to an OpenAI-style ``parameters``
    object, and the returned Tool's handler forwards its keyword arguments
    to ``client.call_tool``.

    Args:
        client: Connected MCPClient.
        mcp_tool_def: Tool dict from ``tools/list`` response. Must contain
            ``name``; ``description`` and ``inputSchema`` are optional and
            may be ``null``.

    Returns:
        A ``Tool`` whose handler returns the text blocks of the call
        result joined by newlines, or the JSON-encoded result when there
        are no text blocks.

    Raises:
        KeyError: If ``mcp_tool_def`` has no ``name``.
    """
    name = mcp_tool_def["name"]
    # ``or`` (rather than a ``get`` default) also covers an explicit null.
    description = mcp_tool_def.get("description") or ""
    input_schema = mcp_tool_def.get("inputSchema") or {}

    # Convert inputSchema to OpenAI-style parameters
    properties: dict[str, dict] = {}
    for param_name, param_def in (input_schema.get("properties") or {}).items():
        if not isinstance(param_def, dict):
            # JSON Schema permits boolean schemas (true/false); they carry
            # no details, so fall back to the defaults.
            param_def = {}
        prop: dict[str, Any] = {"type": param_def.get("type", "string")}
        # ``items`` is kept so that array parameters retain their element
        # schema; an array schema without it is incomplete.
        for key in ("description", "enum", "items"):
            if key in param_def:
                prop[key] = param_def[key]
        properties[param_name] = prop

    parameters = {
        "type": "object",
        "properties": properties,
        "required": input_schema.get("required") or [],
    }

    def handler(**kwargs: Any) -> str:
        """Call the remote tool and flatten its result to a string."""
        result = client.call_tool(name, kwargs)
        # Extract text content from MCP result
        if isinstance(result, dict) and "content" in result:
            parts = [
                block.get("text") or ""
                for block in (result["content"] or [])
                if isinstance(block, dict) and block.get("type") == "text"
            ]
            return "\n".join(parts) if parts else json.dumps(result)
        return json.dumps(result) if not isinstance(result, str) else result

    return Tool(name=name, description=description, parameters=parameters, handler=handler)
# ---------------------------------------------------------------------------
# Convenience: connect and load all tools
# ---------------------------------------------------------------------------
def load_mcp_tools(url: str, headers: dict[str, str] | None = None) -> list[Tool]:
    """Connect to an MCP server and return all its tools as Tool instances.

    Args:
        url: Endpoint of the MCP server.
        headers: Optional extra HTTP headers handed to the client.

    Returns:
        One ``Tool`` per definition reported by the server, in the order
        the server listed them.
    """
    session = MCPClient(url=url, headers=headers)
    session.initialize()

    wrapped = []
    for definition in session.list_tools():
        wrapped.append(mcp_tool(session, definition))
    return wrapped
# ---------------------------------------------------------------------------
# Pre-configured MCP servers
# ---------------------------------------------------------------------------
# Built-in server registry, keyed by short server name. Each entry holds
# the endpoint "url" and a human-readable "description".
MCP_SERVERS: dict[str, dict[str, Any]] = {
    server: {"url": endpoint, "description": summary}
    for server, endpoint, summary in (
        ("exa", "https://mcp.exa.ai/mcp", "Web search via Exa"),
        ("context7", "https://mcp.context7.com/mcp", "Library documentation lookup"),
        ("grep_app", "https://mcp.grep.app", "GitHub code search via grep.app"),
        ("github", "https://gitmcp.io/docs", "GitHub code search and repository info"),
    )
}
def load_all_mcp_tools(
    servers: dict[str, dict[str, Any]] | None = None,
) -> dict[str, list[Tool]]:
    """Load tools from all configured MCP servers.

    Loading is best-effort: a server that fails for any reason is reported
    on stdout and mapped to an empty list instead of aborting the run.

    Args:
        servers: Mapping of server name to config. Each config needs a
            ``url`` and may provide ``headers``. Defaults to
            ``MCP_SERVERS`` when ``None``.

    Returns:
        ``{server_name: [Tool, ...]}``
    """
    registry = MCP_SERVERS if servers is None else servers

    result: dict[str, list[Tool]] = {}
    for name in registry:
        cfg = registry[name]
        try:
            loaded = load_mcp_tools(url=cfg["url"], headers=cfg.get("headers"))
        except Exception as e:
            # Keep going: one unreachable server must not block the rest.
            print(f"[MCP] Failed to load {name}: {e}")
            loaded = []
        result[name] = loaded
    return result