File size: 6,814 Bytes
51c503d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
from __future__ import annotations
from pydantic_ai import Agent, RunContext, format_as_xml
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.mcp import MCPServerStreamableHTTP, MCPServerSSE, MCPServerStdio
from dataclasses import dataclass
from datetime import datetime
from pydantic import Field
import json
from pydantic_ai.messages import (
ModelMessage,
FinalResultEvent,
FunctionToolCallEvent,
FunctionToolResultEvent,
PartDeltaEvent,
PartStartEvent,
TextPartDelta,
ToolCallPartDelta,
)
@dataclass
class Api_keys:
    """Container for the API keys handed to the agent.

    The generated dataclass ``__init__`` and ``__eq__`` are kept. Only
    ``__repr__`` is customised so that secret values never end up in logs,
    tracebacks or debugger output.
    """

    # Maps a key name (e.g. 'openai_api_key') to its secret value.
    api_keys: dict

    def __repr__(self):
        """Return a repr listing the key names with every value redacted."""
        try:
            names = sorted(str(name) for name in self.api_keys)
        except TypeError:
            # Not iterable (e.g. None): a repr must never raise.
            names = []
        masked = ', '.join(f"{name!r}: '***'" for name in names)
        return f"{type(self).__name__}(api_keys={{{masked}}})"
@dataclass
class Message_state:
    """Mutable holder for a conversation's message history."""

    # Messages accumulated so far, in the order the owner stores them.
    messages: list[ModelMessage]
class MCP_Agent:
    """Chat agent that can call tools exposed by MCP servers.

    The instance keeps the conversation history in ``self.memory.messages``
    and holds one connection to its MCP servers, opened with ``connect()``
    (or lazily by ``chat()``) and closed with ``disconnect()``. It can also
    be used as an async context manager::

        async with MCP_Agent(api_keys) as agent:
            answer = await agent.chat("hello")
    """

    # System instructions passed to the underlying Agent.
    # Built from adjacent literals rather than a backslash continuation so
    # that no source indentation leaks into the prompt.
    # NOTE(review): the prompt says the model "has the current time", but
    # nothing visible in this class injects it -- confirm.
    _INSTRUCTIONS = (
        "you are a helpful assistant that can help with a wide range of tasks, "
        "you have the current time and the user query, "
        "you can use the tools provided to you if necessary to help the user with their queries, "
        "ask how you can help the user, "
        "sometimes the user will ask you not to use the tools, "
        "in this case you should not use the tools"
    )

    def __init__(self, api_keys: dict, mpc_server_urls: list | None = None,
                 mpc_stdio_commands: list | None = None):
        """Build the model, the MCP server clients and the agent.

        Args:
            api_keys (dict): The API keys to use; must contain
                ``'openai_api_key'``.
            mpc_server_urls (list | None): Dicts describing remote MCP
                servers. ``'url'`` and ``'type'`` are required; ``'type'`` is
                ``'http'`` (streamable HTTP) or ``'SSE'``, matched
                case-insensitively. ``'headers'`` is optional (missing or
                ``None`` means no extra headers). ``'name'`` is accepted but
                not currently used. Example::

                    [
                        {
                            'url': 'http://localhost:8000',
                            'name': 'mcp_server_1',
                            'type': 'http',  # or 'SSE'
                            'headers': {'Authorization': 'Bearer 1234567890'},
                        }
                    ]

            mpc_stdio_commands (list | None): Dicts describing MCP servers
                to launch over stdio. ``'command'`` is required; ``'args'``
                defaults to an empty list. Example::

                    [
                        {
                            'name': 'memory',
                            'command': 'npx',  # or 'docker', 'npm', 'python', ...
                            'args': ['-y', '@modelcontextprotocol/server-memory'],
                        }
                    ]

        Raises:
            KeyError: If ``'openai_api_key'`` or a required server field is
                missing.
        """
        self.api_keys = Api_keys(api_keys=api_keys)
        # None sentinels instead of `[]` defaults: a mutable default would be
        # shared by every instance created without these arguments.
        self.mpc_server_urls = mpc_server_urls if mpc_server_urls is not None else []
        self.mpc_stdio_commands = mpc_stdio_commands if mpc_stdio_commands is not None else []
        # tools
        self.llms = {
            'mcp_llm': OpenAIModel(
                'gpt-4.1-mini',
                provider=OpenAIProvider(api_key=self.api_keys.api_keys['openai_api_key']),
            )
        }
        # mcp servers
        self.mpc_servers = self._build_mcp_servers(self.mpc_server_urls, self.mpc_stdio_commands)
        self._mcp_context_manager = None
        self._is_connected = False
        # agent
        self.agent = Agent(
            self.llms['mcp_llm'],
            tools=[],
            mcp_servers=self.mpc_servers,
            instructions=self._INSTRUCTIONS,
        )
        self.memory = Message_state(messages=[])

    @staticmethod
    def _build_mcp_servers(server_specs, stdio_specs):
        """Turn the config dicts into MCP server clients (remote first, then stdio)."""
        import warnings

        servers = []
        for spec in server_specs:
            kind = str(spec['type']).lower()
            if kind == 'http':
                server_cls = MCPServerStreamableHTTP
            elif kind == 'sse':
                server_cls = MCPServerSSE
            else:
                # Unknown types used to be dropped silently; still skip them
                # (no new exception for callers) but make the problem visible.
                warnings.warn(
                    f"Ignoring MCP server {spec.get('url')!r}: unsupported type {spec['type']!r}",
                    stacklevel=3,
                )
                continue
            # 'headers' is documented as optional, so do not index it directly.
            headers = spec.get('headers')
            if headers is not None:
                servers.append(server_cls(url=spec['url'], headers=headers))
            else:
                servers.append(server_cls(url=spec['url']))
        for spec in stdio_specs:
            servers.append(MCPServerStdio(command=spec['command'], args=spec.get('args', [])))
        return servers

    async def connect(self):
        """Establish a persistent connection to the MCP servers.

        Does nothing if already connected.

        Returns:
            str: A confirmation message.
        """
        if not self._is_connected:
            manager = self.agent.run_mcp_servers()
            await manager.__aenter__()
            # Only record the manager once it has been entered successfully,
            # so a failed connect leaves no half-open state behind.
            self._mcp_context_manager = manager
            self._is_connected = True
        return "Connected to MCP server"

    async def disconnect(self):
        """Close the MCP server connection, if one is open.

        The connection state is cleared even when closing raises, so the
        agent can reconnect afterwards; the exception is still propagated.

        Returns:
            str: A confirmation message.
        """
        if self._is_connected and self._mcp_context_manager is not None:
            manager = self._mcp_context_manager
            try:
                await manager.__aexit__(None, None, None)
            finally:
                self._is_connected = False
                self._mcp_context_manager = None
        return "Disconnected from MCP server"

    async def chat(self, query: str | list):
        """Send a query to the agent and return its answer.

        Connects to the MCP servers first if needed. The stored history is
        passed to the run and then replaced by the run's full message list.

        Args:
            query: What to pass to the agent. Either a plain string, or a
                list mixing an optional string with ``BinaryContent`` items
                for media::

                    # Audio
                    await agent.chat([
                        'optional string message',
                        BinaryContent(data=audio, media_type='audio/wav'),
                    ])

                    # PDF files
                    await agent.chat([
                        'optional string message',
                        BinaryContent(data=pdf_path.read_bytes(), media_type='application/pdf'),
                    ])

                    # Images
                    await agent.chat([
                        'optional string message',
                        BinaryContent(data=image_response.content, media_type='image/png'),
                    ])

        Returns:
            The ``output`` of the agent run. No output type is configured on
            the agent, so this is presumably plain text -- confirm against
            the pydantic_ai version in use.

        Notes:
            The message history is available as ``agent.memory.messages``.
        """
        if not self._is_connected:
            await self.connect()
        result = await self.agent.run(query, message_history=self.memory.messages)
        self.memory.messages = result.all_messages()
        return result.output

    def reset(self):
        """Clear the conversation history.

        The MCP connection is left untouched.

        Returns:
            str: A confirmation message indicating that the agent has been reset.
        """
        self.memory.messages = []
        return 'Agent has been reset'

    async def __aenter__(self):
        """Async context manager entry: connect and return the agent."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: disconnect; exceptions are not suppressed."""
        await self.disconnect()
|