File size: 8,587 Bytes
ddd81fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import asyncio
import os
import json
from camel.toolkits.mcp_toolkit import MCPClient
from camel.toolkits import HumanToolkit, MCPToolkit 
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType
from camel.agents import ChatAgent
import urllib.parse
import base64
from mcp import ClientSession
from mcp.types import BlobResourceContents, ResourceContents, TextResourceContents
from typing import Union, Optional, List

async def get_tools_description(tools):
    """Build a newline-separated, human-readable summary of the given tools.

    Args:
        tools: Iterable of CAMEL tool wrappers, each exposing ``func``,
            ``get_openai_function_schema()`` and ``get_function_description()``.

    Returns:
        One "Tool: ..." line per tool, joined with newlines. Curly braces in
        the JSON schema are doubled so the result can be embedded safely in
        a ``str.format``-style prompt template later.
    """
    descriptions = []
    for tool in tools:
        tool_name = getattr(tool.func, '__name__', 'unknown_tool')
        schema = tool.get_openai_function_schema() or {}
        # `schema` is guaranteed to be a dict here, so the original's extra
        # `if schema else []` guard was redundant and has been dropped.
        arg_names = list(schema.get('parameters', {}).get('properties', {}).keys())
        description = tool.get_function_description() or 'No description'
        # Escape braces so prompt templating does not mistake schema JSON
        # for format placeholders.
        schema_str = json.dumps(schema, default=str).replace('{', '{{').replace('}', '}}')
        descriptions.append(
            f"Tool: {tool_name}, Args: {arg_names}, Description: {description}, Schema: {schema_str}"
        )
    return "\n".join(descriptions)

class SimpleBlob:
    """Lightweight container pairing a resource payload with its MIME type and metadata."""

    def __init__(self, data: Union[str, bytes], mime_type: Optional[str], metadata: dict):
        # Decoded text or raw binary payload of the resource.
        self.data = data
        # MIME type reported by the server; may be absent.
        self.mime_type = mime_type
        # Free-form context, e.g. the source URI.
        self.metadata = metadata

    @classmethod
    def from_data(cls, data: Union[str, bytes], mime_type: Optional[str] = None, metadata: Optional[dict] = None):
        """Alternate constructor that substitutes an empty dict when no metadata is given."""
        if metadata is None:
            metadata = {}
        return cls(data=data, mime_type=mime_type, metadata=metadata)
    
def convert_mcp_resource_to_blob(
    resource_uri: str,
    contents: ResourceContents,
) -> SimpleBlob:
    """Translate one MCP resource content part into a SimpleBlob.

    Text contents keep their string payload; blob contents are base64-decoded
    into raw bytes. Any other content type raises ValueError. The source URI
    is recorded in the blob's metadata.
    """
    if isinstance(contents, BlobResourceContents):
        payload = base64.b64decode(contents.blob)
    elif isinstance(contents, TextResourceContents):
        payload = contents.text
    else:
        raise ValueError(f"Unsupported content type for URI {resource_uri}")
    return SimpleBlob.from_data(
        data=payload,
        mime_type=contents.mimeType,
        metadata={"uri": resource_uri},
    )
    
async def get_mcp_resource(session: ClientSession, uri: str) -> List[SimpleBlob]:
    """Read the resource at *uri* via the session and convert each content part.

    Returns an empty list when the server reports no content parts.
    """
    result = await session.read_resource(uri)
    blobs: List[SimpleBlob] = []
    for part in result.contents or []:
        blobs.append(convert_mcp_resource_to_blob(uri, part))
    return blobs

async def load_mcp_resources(
    session: ClientSession,
    uris: Union[str, List[str], None] = None,
) -> List[SimpleBlob]:
    """Load one, several, or all resources from *session* as SimpleBlobs.

    A single URI string is treated as a one-element list; None means
    "everything the server lists". Failures on individual URIs are printed
    and skipped so one bad resource does not abort the whole load.
    """
    if isinstance(uris, str):
        targets = [uris]
    elif uris is not None:
        targets = uris
    else:
        listing = await session.list_resources()
        targets = [entry.uri for entry in listing.resources]

    collected: List[SimpleBlob] = []
    for target in targets:
        try:
            collected.extend(await get_mcp_resource(session, target))
        except Exception as exc:
            print(f"Error fetching resource {target}: {exc}")
    return collected

async def get_resources(
    client: MCPClient,
    uris: Union[str, List[str], None] = None
) -> List[SimpleBlob]:
    """Get resources from the MCP server.

    Args:
        client: MCPClient instance
        uris: Optional resource URI or list of URIs to load. If None, fetches all resources.

    Returns:
        A list of SimpleBlob objects

    Raises:
        RuntimeError: If the client has no active session, or if loading
            fails (the underlying exception is chained as ``__cause__``).
    """
    if client.session is None:
        raise RuntimeError("MCPClient is not connected or session is not initialized.")
    try:
        return await load_mcp_resources(client.session, uris)
    except Exception as e:
        # BUG FIX: chain the original exception so its traceback is not lost.
        raise RuntimeError(f"Error fetching resources: {e}") from e

async def main():
    """Connect to the Coral MCP server as the user-interface agent and run the
    interaction loop forever, refreshing server resources each iteration.

    The loop: fetch resources -> rebuild the system prompt -> create a
    ChatAgent -> run one workflow step -> sleep -> repeat.
    """
    base_url_1 = "http://localhost:5555/devmode/exampleApplication/privkey/session1/sse"
    params_1 = {
        "waitForAgents": 1,
        "agentId": "user_interface_agent",
        "agentDescription": "You are user_interaction_agent, responsible for engaging with users, processing instructions, and coordinating with other agents"
    }
    query_string = urllib.parse.urlencode(params_1)
    MCP_SERVER_URL_1 = f"{base_url_1}?{query_string}"

    coral_server = MCPClient(
        command_or_url=MCP_SERVER_URL_1,
        timeout=300.0
    )
    await coral_server.__aenter__()
    # BUG FIX: the original called __aexit__ after an infinite `while True`,
    # so cleanup was unreachable and the connection leaked on any error.
    # try/finally guarantees the exit hook runs on exception or cancellation.
    try:
        print(f"Connected to MCP server as user_interface_agent at {MCP_SERVER_URL_1}")

        model = ModelFactory.create(
            model_platform=ModelPlatformType.OPENAI,
            model_type=ModelType.GPT_4O_MINI,
            api_key=os.getenv("OPENAI_API_KEY"),
            model_config_dict={"temperature": 0.3, "max_tokens": 16000},
        )

        # The toolset does not change between iterations, so build it once
        # instead of re-creating the toolkit on every pass (as the original did).
        mcp_toolkit = MCPToolkit([coral_server])
        tools = mcp_toolkit.get_tools() + HumanToolkit().get_tools()
        tools_description = await get_tools_description(tools)

        while True:
            # Re-fetch resources each iteration so the agent sees the latest
            # thread history; fall back to "NA" on any failure.
            try:
                resources = await get_resources(coral_server, uris=None)
                if not resources:
                    agent_resources = "NA"
                    print("No resources found.")
                else:
                    agent_resources = "\n".join(str(blob.data) for blob in resources)
                    print("Resources fetched:")
            except Exception as e:
                print(f"Error retrieving resources: {e}")
                agent_resources = "NA"

            resource_sys_message = agent_resources

            sys_msg = (
                f"""You are an agent interacting with the tools from Coral Server and having your own Human Tool to ask have a conversation with Human.
            Your resources, provided in `resource_sys_message`, contain thread-based conversations between agents in XML format. 
            Each thread includes details such as thread ID, participant agent IDs, message content, and timestamps. 
            Use these resources to understand past agent interactions and inform your decisions when coordinating with other agents or responding to user queries.

            Follow these steps in order:
            1. Use `list_agents` to list all connected agents and get their descriptions.
            2. Use `ask_human_via_console` to ask, "How can I assist you today?" and capture expect response.
            3. Take 2 seconds to think and understand the user's intent and decide the right agent to handle the request based on list of agents. 
            4. If the user wants any information about the coral server, use the tools to get the information and pass it to the user. Do not send any message to any other agent, just give the information and go to Step 1.
            5. Once you have the right agent, use `create_thread` to create a thread with the selected agent. If no agent is available, use the `ask_human` tool to specify the agent you want to use.
            6. Use your logic to determine the task you want that agent to perform and create a message for them which instructs the agent to perform the task called "instruction". 
            7. Use `send_message` to send a message in the thread, mentioning the selected agent, with content: "instructions".
            8. Use `wait_for_mentions` with a 30 seconds timeout to wait for a response from the agent you mentioned.
            9. Show the entire conversation in the thread to the user.
            10. Wait for 3 seconds and then use `ask_human` to ask the user if they need anything else and keep waiting for their response.
            11. If the user asks for something else, repeat the process from step 1.

            Use only listed tools: {tools_description}
            Your resources are: {resource_sys_message}"""
            )

            # A fresh agent per iteration keeps the system prompt in sync with
            # the newly fetched resources.
            camel_agent = ChatAgent(
                system_message=sys_msg,
                model=model,
                tools=tools,
            )
            print("ChatAgent initialized with updated resources!")
            print("Resource System Message before agent question:")
            print(resource_sys_message)

            prompt = "As the user_interaction_agent on the Coral Server, initiate your workflow by listing all connected agents and asking the user how you can assist them."
            try:
                response = await camel_agent.astep(prompt)
                print("Agent Reply:")
                print(response.msgs[0].content)
            except Exception as e:
                print(f"Error processing agent response: {e}")

            await asyncio.sleep(3)
    finally:
        await coral_server.__aexit__(None, None, None)

if __name__ == "__main__":
    asyncio.run(main())