File size: 3,930 Bytes
b0c0df0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import asyncio
import json
from typing import List, Union

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import AudioContent, ImageContent, TextContent
from openai import OpenAI


class MCPClient:
    def __init__(self, server_path: str):
        """
        Initialize the MCPClient with the path to the MCP server.
        """
        self.server_path = server_path

    async def get_function_list(self):
        """
        Connect to the MCP server and retrieve the list of available functions.
        """
        server_params = StdioServerParameters(command="python", args=[self.server_path])
        async with stdio_client(server=server_params) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()

                tools = (await session.list_tools()).tools

                functions = []
                for tool in tools:
                    functions.append({"type": "function", "function": {"name": tool.name, "description": tool.description or "", "parameters": tool.inputSchema}})
                return functions

    async def run_tool(self, tool_name: str, tool_args: dict):
        """
        Run a specific tool with the given arguments.
        :param tool_name: Name of the tool to run.
        :param tool_args: Arguments for the tool.
        :return: Result of the tool execution.
        """
        server_params = StdioServerParameters(command="python", args=[self.server_path])
        async with stdio_client(server=server_params) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()

                result = await session.call_tool(tool_name, tool_args)
                return result

    def convert_result_to_openai_format(self, result: Union[ImageContent, TextContent, AudioContent, List[Union[ImageContent, TextContent, AudioContent]]]) -> dict:
        """
        Convert the result from the MCP tool to OpenAI compatible format.
        :param result: Result from the MCP tool.
        :return: Converted result.
        """
        if isinstance(result, list):
            results = []
            for item in result:
                results.append(self.convert_result_to_openai_format(item))
            return results
        if isinstance(result, ImageContent):
            return [{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{result.data}"}}]
        elif isinstance(result, TextContent):
            return [{"type": "text", "text": result.text}]
        elif isinstance(result, AudioContent):
            return [{"type": "audio_url", "audio_url": {"url": f"data:audio/wav;base64,{result.data}"}}]
        else:
            raise ValueError(f"Unsupported result type : {type(result)}")

    def get_function_list_sync(self):
        """
        Synchronous wrapper for get_function_list.
        Connect to the MCP server and retrieve the list of available functions.
        :return: List of available functions in OpenAI-compatible format.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.get_function_list())
        finally:
            loop.close()

    def run_tool_sync(self, tool_name: str, tool_args: dict):
        """
        Synchronous wrapper for run_tool.
        Run a specific tool with the given arguments.
        :param tool_name: Name of the tool to run.
        :param tool_args: Arguments for the tool.
        :return: Result of the tool execution.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.run_tool(tool_name, tool_args))
        finally:
            loop.close()