| import asyncio | |
| import json | |
| from typing import List, Union | |
| from mcp import ClientSession, StdioServerParameters | |
| from mcp.client.stdio import stdio_client | |
| from mcp.types import AudioContent, ImageContent, TextContent | |
| from openai import OpenAI | |
| class MCPClient: | |
| def __init__(self, server_path: str): | |
| """ | |
| Initialize the MCPClient with the path to the MCP server. | |
| """ | |
| self.server_path = server_path | |
| async def get_function_list(self): | |
| """ | |
| Connect to the MCP server and retrieve the list of available functions. | |
| """ | |
| server_params = StdioServerParameters(command="python", args=[self.server_path]) | |
| async with stdio_client(server=server_params) as (read_stream, write_stream): | |
| async with ClientSession(read_stream, write_stream) as session: | |
| await session.initialize() | |
| tools = (await session.list_tools()).tools | |
| functions = [] | |
| for tool in tools: | |
| functions.append({"type": "function", "function": {"name": tool.name, "description": tool.description or "", "parameters": tool.inputSchema}}) | |
| return functions | |
| async def run_tool(self, tool_name: str, tool_args: dict): | |
| """ | |
| Run a specific tool with the given arguments. | |
| :param tool_name: Name of the tool to run. | |
| :param tool_args: Arguments for the tool. | |
| :return: Result of the tool execution. | |
| """ | |
| server_params = StdioServerParameters(command="python", args=[self.server_path]) | |
| async with stdio_client(server=server_params) as (read_stream, write_stream): | |
| async with ClientSession(read_stream, write_stream) as session: | |
| await session.initialize() | |
| result = await session.call_tool(tool_name, tool_args) | |
| return result | |
| def convert_result_to_openai_format(self, result: Union[ImageContent, TextContent, AudioContent, List[Union[ImageContent, TextContent, AudioContent]]]) -> dict: | |
| """ | |
| Convert the result from the MCP tool to OpenAI compatible format. | |
| :param result: Result from the MCP tool. | |
| :return: Converted result. | |
| """ | |
| if isinstance(result, list): | |
| results = [] | |
| for item in result: | |
| results.append(self.convert_result_to_openai_format(item)) | |
| return results | |
| if isinstance(result, ImageContent): | |
| return [{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{result.data}"}}] | |
| elif isinstance(result, TextContent): | |
| return [{"type": "text", "text": result.text}] | |
| elif isinstance(result, AudioContent): | |
| return [{"type": "audio_url", "audio_url": {"url": f"data:audio/wav;base64,{result.data}"}}] | |
| else: | |
| raise ValueError(f"Unsupported result type : {type(result)}") | |
| def get_function_list_sync(self): | |
| """ | |
| Synchronous wrapper for get_function_list. | |
| Connect to the MCP server and retrieve the list of available functions. | |
| :return: List of available functions in OpenAI-compatible format. | |
| """ | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| try: | |
| return loop.run_until_complete(self.get_function_list()) | |
| finally: | |
| loop.close() | |
| def run_tool_sync(self, tool_name: str, tool_args: dict): | |
| """ | |
| Synchronous wrapper for run_tool. | |
| Run a specific tool with the given arguments. | |
| :param tool_name: Name of the tool to run. | |
| :param tool_args: Arguments for the tool. | |
| :return: Result of the tool execution. | |
| """ | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| try: | |
| return loop.run_until_complete(self.run_tool(tool_name, tool_args)) | |
| finally: | |
| loop.close() | |