| """MCP (Model Context Protocol) client for remote tool servers. |
| |
| Supports Streamable HTTP transport (JSON-RPC 2.0 over HTTP). |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import uuid |
| from typing import Any |
|
|
| import httpx |
|
|
| from .tools import Tool |
|
|
|
|
| |
| |
| |
|
|
|
|
class MCPClient:
    """Connect to a remote MCP server via Streamable HTTP transport.

    Each call opens a short-lived ``httpx.Client``, POSTs a single JSON-RPC
    2.0 message to ``url`` and decodes either a plain JSON body or a
    ``text/event-stream`` body. The ``Mcp-Session-Id`` response header, when
    the server sends one, is remembered and echoed on later requests.
    """

    def __init__(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        timeout: float = 30.0,
    ) -> None:
        """Store connection settings; no network traffic happens here.

        Args:
            url: MCP endpoint that receives the JSON-RPC POSTs.
            headers: Extra HTTP headers merged over the defaults on every
                request (so they may override ``Content-Type``/``Accept``).
            timeout: Timeout handed to ``httpx.Client`` for each request.
        """
        self.url = url
        self.headers = headers or {}
        self.timeout = timeout
        self._session_id: str | None = None
        self._initialized = False

    # ------------------------------------------------------------------
    # Transport helpers
    # ------------------------------------------------------------------

    def _build_headers(self) -> dict[str, str]:
        """Return the HTTP headers shared by requests and notifications."""
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            **self.headers,
        }
        if self._session_id:
            headers["Mcp-Session-Id"] = self._session_id
        return headers

    def _request(self, method: str, params: dict[str, Any] | None = None) -> Any:
        """Send a JSON-RPC request and return its ``result`` member.

        Args:
            method: JSON-RPC method name.
            params: Optional parameters; omitted from the payload when None.

        Returns:
            The ``result`` value of the response, or None when absent.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx HTTP status.
            RuntimeError: When the response carries a JSON-RPC ``error``.
        """
        payload: dict[str, Any] = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": method,
        }
        if params is not None:
            payload["params"] = params

        with httpx.Client(timeout=self.timeout) as client:
            resp = client.post(self.url, json=payload, headers=self._build_headers())

        # Remember the session id before the status check so that it is kept
        # even when this particular response turns out to be an HTTP error.
        sid = resp.headers.get("mcp-session-id")
        if sid:
            self._session_id = sid

        resp.raise_for_status()
        content_type = resp.headers.get("content-type", "")

        if "text/event-stream" in content_type:
            return self._parse_sse_response(resp.text)

        data = resp.json()
        if "error" in data:
            raise RuntimeError(f"MCP error: {data['error']}")
        return data.get("result")

    @staticmethod
    def _decode_sse_event(data_lines: list[str]) -> list[Any]:
        """Decode the ``data:`` lines of one SSE event into JSON values.

        The lines are first joined with newlines and parsed as one document,
        which is how an event stream carries a payload spread over several
        ``data:`` lines. If that fails, every line is parsed on its own so
        that streams emitting one complete JSON document per ``data:`` line
        without blank-line separators keep working; unparsable lines are
        skipped.
        """
        if not data_lines:
            return []
        try:
            return [json.loads("\n".join(data_lines))]
        except json.JSONDecodeError:
            pass

        messages: list[Any] = []
        for chunk in data_lines:
            try:
                messages.append(json.loads(chunk))
            except json.JSONDecodeError:
                continue
        return messages

    def _parse_sse_response(self, text: str) -> Any:
        """Parse SSE response text, extracting the JSON-RPC result.

        Args:
            text: Complete body of a ``text/event-stream`` response.

        Returns:
            The ``result`` of the last JSON-RPC message that has one, or
            None when the stream contains no result.

        Raises:
            RuntimeError: When a message without ``result`` carries ``error``.
        """
        result = None
        pending: list[str] = []

        # The extra "" acts as a terminating blank line so that a final event
        # that is not followed by one is still dispatched.
        for raw_line in [*text.split("\n"), ""]:
            line = raw_line.strip()
            if line.startswith("data:"):
                chunk = line[len("data:") :].strip()
                if chunk:
                    pending.append(chunk)
                continue
            if line:
                # Other SSE fields (event:, id:, retry:) and comments.
                continue

            # Blank line: the buffered data lines form one complete event.
            for message in self._decode_sse_event(pending):
                if not isinstance(message, dict):
                    # Not a JSON-RPC message object; nothing to extract.
                    continue
                if "result" in message:
                    result = message["result"]
                elif "error" in message:
                    raise RuntimeError(f"MCP error: {message['error']}")
            pending = []
        return result

    # ------------------------------------------------------------------
    # Session lifecycle
    # ------------------------------------------------------------------

    def initialize(self) -> dict[str, Any]:
        """Initialize the MCP session.

        Sends ``initialize`` followed by the ``notifications/initialized``
        notification and marks the client as initialized.

        Returns:
            The server's ``initialize`` result, or an empty dict when the
            result is empty.
        """
        result = self._request(
            "initialize",
            {
                "protocolVersion": "2025-03-26",
                "capabilities": {},
                "clientInfo": {"name": "mythos-agent", "version": "0.1.0"},
            },
        )
        self._initialized = True
        self._notify("notifications/initialized", {})
        return result or {}

    def _notify(self, method: str, params: dict[str, Any]) -> None:
        """Send a JSON-RPC notification (no id, no response expected).

        The HTTP response is intentionally not inspected: its status and
        body are ignored, so only transport-level failures propagate.
        """
        payload = {"jsonrpc": "2.0", "method": method, "params": params}
        with httpx.Client(timeout=self.timeout) as client:
            client.post(self.url, json=payload, headers=self._build_headers())

    # ------------------------------------------------------------------
    # Tool discovery
    # ------------------------------------------------------------------

    def list_tools(self) -> list[dict[str, Any]]:
        """List available tools from the server.

        Initializes the session on first use and follows ``nextCursor``
        pagination until the server stops returning a cursor.

        Returns:
            The tool definitions of all pages, in server order.
        """
        if not self._initialized:
            self.initialize()

        tools: list[dict[str, Any]] = []
        params: dict[str, Any] = {}
        seen_cursors: list[Any] = []
        while True:
            result = self._request("tools/list", params) or {}
            tools.extend(result.get("tools") or [])
            cursor = result.get("nextCursor")
            # Stop on the last page; also stop when a cursor repeats so that
            # a misbehaving server cannot keep the client looping forever.
            if not cursor or cursor in seen_cursors:
                break
            seen_cursors.append(cursor)
            params = {"cursor": cursor}
        return tools

    # ------------------------------------------------------------------
    # Tool invocation
    # ------------------------------------------------------------------

    def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        """Call a tool on the server.

        Args:
            name: Tool name as reported by ``tools/list``.
            arguments: Arguments forwarded verbatim to the tool.

        Returns:
            The raw ``tools/call`` result.
        """
        if not self._initialized:
            self.initialize()
        return self._request("tools/call", {"name": name, "arguments": arguments})
|
|
|
|
| |
| |
| |
|
|
|
|
| def _json_schema_type_to_python(type_str: str) -> type: |
| """Map JSON Schema type to Python type.""" |
| mapping = {"string": str, "integer": int, "number": float, "boolean": bool, "array": list, "object": dict} |
| return mapping.get(type_str, str) |
|
|
|
|
def mcp_tool(client: MCPClient, mcp_tool_def: dict[str, Any]) -> Tool:
    """Build a local ``Tool`` that proxies one remote MCP tool.

    Args:
        client: MCPClient used to invoke the tool when the handler runs.
        mcp_tool_def: One entry of the ``tools/list`` response; ``name`` is
            required, ``description`` and ``inputSchema`` are optional.

    Returns:
        A ``Tool`` whose parameters keep only ``type`` (default ``"string"``),
        ``description`` and ``enum`` of each input property, and whose
        handler forwards its keyword arguments to ``client.call_tool``.
    """
    name = mcp_tool_def["name"]
    description = mcp_tool_def.get("description", "")
    schema = mcp_tool_def.get("inputSchema", {})

    def _slim_property(param_def: dict[str, Any]) -> dict[str, Any]:
        # Keep only the schema keys that are passed on to the local Tool.
        slim: dict[str, Any] = {"type": param_def.get("type", "string")}
        for key in ("description", "enum"):
            if key in param_def:
                slim[key] = param_def[key]
        return slim

    parameters = {
        "type": "object",
        "properties": {
            param_name: _slim_property(param_def)
            for param_name, param_def in schema.get("properties", {}).items()
        },
        "required": schema.get("required", []),
    }

    def handler(**kwargs: Any) -> str:
        response = client.call_tool(name, kwargs)
        if isinstance(response, str):
            return response
        if not (isinstance(response, dict) and "content" in response):
            return json.dumps(response)

        # Collect the text of every text content block, in order.
        texts = [
            block.get("text", "")
            for block in response["content"]
            if isinstance(block, dict) and block.get("type") == "text"
        ]
        if texts:
            return "\n".join(texts)
        # No text blocks at all: fall back to the serialized raw result.
        return json.dumps(response)

    return Tool(name=name, description=description, parameters=parameters, handler=handler)
|
|
|
|
| |
| |
| |
|
|
|
|
def load_mcp_tools(url: str, headers: dict[str, str] | None = None) -> list[Tool]:
    """Open a session with the MCP server at ``url`` and wrap each of its tools.

    Args:
        url: Endpoint of the MCP server.
        headers: Optional extra HTTP headers passed to the client.

    Returns:
        One ``Tool`` per definition reported by the server's tool listing.
    """
    session = MCPClient(url=url, headers=headers)
    session.initialize()

    wrapped: list[Tool] = []
    for definition in session.list_tools():
        wrapped.append(mcp_tool(session, definition))
    return wrapped
|
|
|
|
| |
| |
| |
|
|
# Built-in registry of remote MCP endpoints, keyed by a short server name.
# Every entry holds the endpoint "url" and a human-readable "description".
MCP_SERVERS: dict[str, dict[str, Any]] = {
    server: {"url": url, "description": description}
    for server, url, description in (
        ("exa", "https://mcp.exa.ai/mcp", "Web search via Exa"),
        ("context7", "https://mcp.context7.com/mcp", "Library documentation lookup"),
        ("grep_app", "https://mcp.grep.app", "GitHub code search via grep.app"),
        ("github", "https://gitmcp.io/docs", "GitHub code search and repository info"),
    )
}
|
|
|
|
def load_all_mcp_tools(
    servers: dict[str, dict[str, Any]] | None = None,
) -> dict[str, list[Tool]]:
    """Load the tools of every configured MCP server, one server at a time.

    Args:
        servers: Mapping of server name to a config dict with a ``url`` and
            optional ``headers``. Defaults to ``MCP_SERVERS`` when None.

    Returns:
        ``{server_name: [Tool, ...]}``; a server that fails to load is
        reported on stdout and mapped to an empty list.
    """
    registry = MCP_SERVERS if servers is None else servers

    loaded: dict[str, list[Tool]] = {}
    for server_name, config in registry.items():
        try:
            tools = load_mcp_tools(
                url=config["url"],
                headers=config.get("headers"),
            )
        except Exception as exc:
            # Best-effort: one unreachable server must not block the others.
            print(f"[MCP] Failed to load {server_name}: {exc}")
            tools = []
        loaded[server_name] = tools
    return loaded
|
|