File size: 16,526 Bytes
9fb6b95
dfe2f68
9fb6b95
dfe2f68
9fb6b95
 
 
dfe2f68
 
 
 
9fb6b95
 
dfe2f68
9fb6b95
dfe2f68
9fb6b95
dfe2f68
9fb6b95
 
 
 
 
 
 
 
 
dfe2f68
 
9fb6b95
 
 
 
 
 
 
 
 
 
 
dfe2f68
9fb6b95
 
 
 
 
 
 
dfe2f68
9fb6b95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dfe2f68
9fb6b95
 
 
dfe2f68
9fb6b95
 
 
 
 
 
 
 
 
dfe2f68
9fb6b95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dfe2f68
9fb6b95
dfe2f68
9fb6b95
 
dfe2f68
9fb6b95
 
 
 
 
 
 
 
dfe2f68
9fb6b95
 
dfe2f68
9fb6b95
 
 
 
 
 
 
 
 
 
 
dfe2f68
9fb6b95
 
dfe2f68
9fb6b95
 
dfe2f68
9fb6b95
 
 
 
 
 
dfe2f68
9fb6b95
 
 
 
 
 
dfe2f68
9fb6b95
 
dfe2f68
9fb6b95
 
 
dfe2f68
9fb6b95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dfe2f68
9fb6b95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dfe2f68
9fb6b95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d67fa4
 
 
9fb6b95
 
3d67fa4
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
#!/usr/bin/env python3
"""
Modal Implementation for MCO Hackathon

This module defines the Modal app, including the MCOClient for communicating
with the MCO MCP server via WebSockets, and the Claude 3 Opus agent function.
This version is fully asynchronous and uses a clean network-based client.
"""

import os
import json
import asyncio
import websockets
import modal
from typing import Dict, Any, Optional

# --- Modal App Setup ---

# Modal application object for this module. The `@stub.function` decorator
# further down registers the agent entry point on it.
stub = modal.App(
    name="mco-autogpt-agent",
    # Presumably loads secrets from a local .env file (per the helper's name) -- confirm.
    secrets=[modal.Secret.from_dotenv()],
    # Container image: Debian slim with Python 3.10 and the `websockets`
    # package. NOTE(review): no version specifier is given for `websockets`.
    image=modal.Image.debian_slim(python_version="3.10").pip_install(
        "websockets",
    )
)

# --- MCPClient: Generic client for interacting with MCP Servers via WebSocket ---

class MCPClient:
    """A generic client for interacting with any MCP-compliant server via WebSockets.

    The client opens a single WebSocket connection, reads the server's welcome
    message to learn its client ID and tool catalogue, and then performs
    sequential request/response tool calls over that connection.
    """

    def __init__(self, server_url: str):
        """Remember the target URL; no network activity happens until connect()."""
        self.server_url = server_url
        self.websocket = None
        self.client_id = None  # Provided by the server upon connection
        self.request_counter = 0
        self.available_tools = {}  # Populated from server's welcome message

    def _is_connected(self) -> bool:
        """Return True when a connection object exists and reports itself open.

        Legacy ``websockets`` connections expose a boolean ``open`` property,
        while the newer asyncio connections only expose ``state`` (an enum
        whose open member is named ``OPEN``). Both shapes are supported so the
        check does not raise ``AttributeError`` on newer library versions.
        """
        connection = self.websocket
        if connection is None:
            return False
        legacy_open = getattr(connection, "open", None)
        if legacy_open is not None:
            return bool(legacy_open)
        state = getattr(connection, "state", None)
        return getattr(state, "name", None) == "OPEN"

    async def connect(self):
        """Establishes a WebSocket connection to the MCP server.

        Reads the first message sent by the server as a JSON welcome payload
        and stores its ``clientId`` and ``tools`` fields.

        Raises:
            Exception: any connection or parsing failure is logged and re-raised.
        """
        try:
            self.websocket = await websockets.connect(self.server_url)
            welcome_message = await self.websocket.recv()
            welcome_data = json.loads(welcome_message)
            self.client_id = welcome_data.get("clientId")
            self.available_tools = welcome_data.get("tools", {})
            print(f"Connected to MCP server with client ID: {self.client_id}")
            print(f"Available tools: {json.dumps(self.available_tools, indent=2)}")
        except Exception as e:
            print(f"Failed to connect to WebSocket server at {self.server_url}: {e}")
            raise

    async def call_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Sends a tool call to the MCP server and awaits the response.

        Args:
            tool_name: Name of the remote tool to invoke.
            params: JSON-serialisable parameters for the tool.

        Returns:
            The ``result`` field of the response when present, otherwise the
            whole decoded response. On timeout (60 s) or any send/receive/parse
            failure, a dict of the form ``{"error": "<description>"}``.

        Raises:
            ConnectionError: if no open connection is available.
        """
        if not self._is_connected():
            raise ConnectionError("WebSocket connection is not established or has been closed.")

        self.request_counter += 1
        request_id = f"{self.client_id}-{self.request_counter}"

        message = {
            "type": "tool_call",
            "tool": tool_name,
            "params": params,
            "requestId": request_id
        }

        try:
            await self.websocket.send(json.dumps(message))
            response_message = await asyncio.wait_for(self.websocket.recv(), timeout=60.0)
            response_data = json.loads(response_message)

            # A mismatch is only reported, not rejected: the response is still returned.
            if response_data.get("requestId") != request_id:
                print(f"Warning: Response ID mismatch. Expected {request_id}, got {response_data.get('requestId')}")

            # The 'result' field is standard for successful tool calls in MCP
            return response_data.get("result", response_data)  # Return full response if no 'result'
        except asyncio.TimeoutError:
            print(f"Timeout waiting for response for tool call '{tool_name}'.")
            return {"error": f"Timeout waiting for response for tool call '{tool_name}'."}
        except Exception as e:
            print(f"An error occurred during tool call '{tool_name}': {e}")
            return {"error": f"An error occurred during tool call '{tool_name}': {str(e)}"}

    async def close(self):
        """Closes the WebSocket connection, if one was ever created."""
        if self.websocket:
            await self.websocket.close()
            print("WebSocket connection closed.")

# --- Agent's Local Tools ---
def create_file_locally(filename: str, content: str) -> dict:
    """Write ``content`` to ``filename`` on the agent's (container) filesystem.

    The path is opened in text write mode, so an existing file is overwritten.
    Relative paths resolve against the current working directory of the
    running function.

    Returns:
        ``{"status": "success", "message": ...}`` when the write completes, or
        ``{"status": "error", "message": ...}`` describing the exception.
    """
    # Files written here usually live only as long as the current container
    # run. Anything that must survive across invocations, or be shared, needs
    # persistent storage instead (e.g. Modal's NetworkFileSystem or an upload
    # tool).
    try:
        with open(filename, "w") as out_file:
            out_file.write(content)
    except Exception as exc:
        return {"status": "error", "message": f"Failed to create file '{filename}': {str(exc)}"}
    else:
        return {"status": "success", "message": f"File '{filename}' created successfully in agent's environment."}

async def call_external_mcp_tool(server_url: str, target_tool_name: str, target_tool_params: Dict[str, Any]) -> Dict[str, Any]:
    """Open a short-lived connection to another MCP server and invoke one tool.

    Args:
        server_url: Address of the external server; an ``http://`` or
            ``https://`` prefix is mapped to ``ws://`` / ``wss://``.
        target_tool_name: Tool to call on that server.
        target_tool_params: Parameters forwarded to the tool.

    Returns:
        Whatever ``MCPClient.call_tool`` returns, or an error dict with the
        keys ``error``, ``tool_name``, ``external_server_url`` and
        ``status_code`` (``CONNECTION_ERROR`` or ``EXECUTION_ERROR``).
    """
    print(f"[Agent Tool] Attempting to call tool '{target_tool_name}' on external MCP server: {server_url}")

    # MCPClient speaks WebSocket, so swap an http(s) scheme for its ws(s) twin.
    if server_url.startswith("http://"):
        socket_url = "ws://" + server_url[len("http://"):]
    elif server_url.startswith("https://"):
        socket_url = "wss://" + server_url[len("https://"):]
    else:
        socket_url = server_url

    def _failure(description: str, code: str) -> Dict[str, Any]:
        # Log the problem and build the error payload handed back to the caller.
        print(f"[Agent Tool] {description}")
        return {
            "error": description,
            "tool_name": target_tool_name,
            "external_server_url": server_url,
            "status_code": code,
        }

    client = MCPClient(server_url=socket_url)
    try:
        await client.connect()
        print(f"[Agent Tool] Connected to external MCP server: {server_url}")

        outcome = await client.call_tool(target_tool_name, target_tool_params)
        print(f"[Agent Tool] Successfully called '{target_tool_name}' on {server_url}. Result: {json.dumps(outcome)}")
        return outcome
    except ConnectionError as exc:
        return _failure(
            f"Connection error with external MCP server {server_url}: {str(exc)}",
            "CONNECTION_ERROR",
        )
    except Exception as exc:
        return _failure(
            f"Error calling tool '{target_tool_name}' on external MCP server {server_url}: {str(exc)}",
            "EXECUTION_ERROR",
        )
    finally:
        # Only a client that got as far as creating a socket needs closing.
        if client.websocket:
            await client.close()
            print(f"[Agent Tool] Closed connection to external MCP server: {server_url}")

async def read_file_locally(filename: str) -> Dict[str, Any]:
    """Reads a file from the local filesystem and returns its content.

    Only paths that resolve to the current working directory or somewhere
    beneath it are allowed.

    Args:
        filename: Path of the file to read, relative or absolute.

    Returns:
        ``{"filename", "content", "status_code": "SUCCESS"}`` on success, or a
        dict with ``error``, ``filename`` and a ``status_code`` of
        ``SECURITY_ERROR``, ``FILE_NOT_FOUND`` or ``READ_ERROR``.
    """
    print(f"[Agent Tool] Attempting to read file: {filename}")
    try:
        # Security check: path traversal prevention, anchored at the current directory.
        base_dir = os.path.abspath(".")
        requested_path = os.path.abspath(filename)
        # Compare against the base directory *with a trailing separator*: a bare
        # startswith(base_dir) would also accept sibling directories that merely
        # share the prefix (e.g. "/app-secrets" when the base is "/app").
        base_prefix = os.path.join(base_dir, "")
        # NOTE(review): abspath does not resolve symlinks, so a symlink inside
        # the base directory can still point outside it.
        if requested_path != base_dir and not requested_path.startswith(base_prefix):
            err_msg = f"Security error: Path traversal attempt detected for filename '{filename}'."
            print(f"[Agent Tool] {err_msg}")
            return {"error": err_msg, "filename": filename, "status_code": "SECURITY_ERROR"}

        if not os.path.exists(filename):
            err_msg = f"File not found: {filename}"
            print(f"[Agent Tool] {err_msg}")
            return {"error": err_msg, "filename": filename, "status_code": "FILE_NOT_FOUND"}

        with open(filename, "r") as f:
            content = f.read()
        print(f"[Agent Tool] Successfully read file: {filename}")
        return {"filename": filename, "content": content, "status_code": "SUCCESS"}
    except Exception as e:
        err_msg = f"Error reading file '{filename}': {str(e)}"
        print(f"[Agent Tool] {err_msg}")
        return {"error": err_msg, "filename": filename, "status_code": "READ_ERROR"}

# Registry of tools the agent can execute itself, keyed by the tool name that
# an incoming "tool_call" directive carries. Note that the entries are a mix of
# a plain function (create_file_locally) and coroutine functions
# (call_external_mcp_tool, read_file_locally).
AGENT_LOCAL_TOOLS = {
    "create_file_locally": create_file_locally,
    "call_external_mcp_tool": call_external_mcp_tool,
    "read_file_locally": read_file_locally,
    # Add other local tools here
}

# --- Agent (Orchestrated by an MCO Server) ---

def _mco_websocket_url(url: str) -> str:
    """Return ``url`` with a leading http(s) scheme replaced by ws(s).

    Only the scheme prefix is rewritten, so a host or path that happens to
    contain the text "http" is left intact.
    """
    if url.startswith("https://"):
        return "wss://" + url[len("https://"):]
    if url.startswith("http://"):
        return "ws://" + url[len("http://"):]
    return url


def _mco_truncate_strings(mapping: Dict[str, Any], limit: int) -> Dict[str, Any]:
    """Return a copy of ``mapping`` with string values longer than ``limit`` cut and suffixed with '...'."""
    return {
        key: (value[:limit] + '...') if isinstance(value, str) and len(value) > limit else value
        for key, value in mapping.items()
    }


async def _mco_call_local_tool(tool_function, tool_params: Dict[str, Any]):
    """Invoke a local tool and await its result only when it is awaitable.

    The tool registry mixes plain functions and coroutine functions, so the
    return value cannot be awaited unconditionally.
    """
    import inspect

    outcome = tool_function(**tool_params)
    if inspect.isawaitable(outcome):
        outcome = await outcome
    return outcome


async def _mco_orchestration_events(mcp_client, user_prompt: str, max_turns: int = 20):
    """Connect, start an orchestration and yield one event dict per step.

    Exceptions are not handled here; they propagate to the caller, which owns
    error reporting and connection cleanup.
    """
    await mcp_client.connect()

    yield {"type": "status", "message": "<thinking>Starting orchestration with MCO server...</thinking>"}
    start_params = {"config": {"user_prompt": user_prompt}}
    start_result = await mcp_client.call_tool("start_orchestration", start_params)

    orchestration_id = start_result.get("orchestration_id")
    if not orchestration_id:
        yield {"type": "error", "message": "<thinking>Error: Failed to start orchestration. No orchestration_id received.</thinking>"}
        return

    yield {"type": "status", "message": f"<thinking>Orchestration started with ID: {orchestration_id}. Goal: '{user_prompt}'. MCO server will use configured SNLP.</thinking>"}
    print(f"Orchestration started. ID: {orchestration_id}, Prompt: {user_prompt}. MCO server using configured SNLP.")

    last_tool_result = None

    # max_turns bounds the loop so a misbehaving server cannot keep it running forever.
    for turn in range(max_turns):
        get_directive_params = {"orchestration_id": orchestration_id}
        if last_tool_result:
            # Report the previous tool outcome exactly once.
            get_directive_params["last_tool_result"] = last_tool_result
            print(f"Sending last_tool_result to get_next_directive: {json.dumps(last_tool_result, indent=2)}")
            last_tool_result = None

        directive = await mcp_client.call_tool("get_next_directive", get_directive_params)
        directive_type = directive.get("type")

        # Summarize directive for UI to avoid excessive length
        directive_summary = _mco_truncate_strings(directive, 100)

        yield {"type": "status", "message": f"<thinking>Turn {turn + 1}/{max_turns}: Received directive of type '{directive_type}'. Details: {json.dumps(directive_summary)}</thinking>"}
        print(f"Turn {turn + 1}: Received directive: {json.dumps(directive, indent=2)}")

        if not directive_type:
            yield {"type": "error", "message": "<thinking>Error: Received an empty or invalid directive.</thinking>"}
            break

        if directive_type == "tool_call":
            tool_name = directive.get("tool")
            # `or {}` also covers an explicit null "params" value.
            tool_params = directive.get("params") or {}
            tool_id = directive.get("tool_id", f"local_tool_{turn + 1}")  # Ensure tool_id is present

            if tool_name in AGENT_LOCAL_TOOLS:
                tool_function = AGENT_LOCAL_TOOLS[tool_name]
                tool_params_summary = _mco_truncate_strings(tool_params, 70)
                yield {"type": "status", "message": f"<thinking>Preparing to execute local tool: '{tool_name}' with parameters: {json.dumps(tool_params_summary)} as directed by MCO.</thinking>"}
                print(f"Executing local tool: {tool_name}, Params: {json.dumps(tool_params, indent=2)}")

                try:
                    tool_result_payload = await _mco_call_local_tool(tool_function, tool_params)

                    last_tool_result = {
                        "tool_id": tool_id,
                        "tool_name": tool_name,
                        "status": "success",
                        "result": tool_result_payload
                    }
                    # Yielding the raw tool_result_payload. Gradio UI can decide how to format it.
                    yield {"type": "tool_result", "tool_name": tool_name, "result": tool_result_payload, "orchestration_id": orchestration_id, "tool_id": tool_id}
                    print(f"Tool {tool_name} executed. Result: {json.dumps(tool_result_payload, indent=2)}")
                except Exception as e:
                    # A failing tool does not end the orchestration; the error
                    # is reported back to the server on the next turn.
                    error_message = f"Error executing tool {tool_name}: {str(e)}"
                    yield {"type": "error", "message": f"<thinking>Error executing tool {tool_name}: {error_message}</thinking>"}
                    print(error_message)
                    last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": error_message}
            else:
                unknown_tool_msg = f"Unknown tool requested by MCO: {tool_name}"
                yield {"type": "error", "message": f"<thinking>Error: {unknown_tool_msg}</thinking>"}
                print(unknown_tool_msg)
                last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": "Unknown tool"}

        elif directive_type == "assistant_message":
            content = directive.get("content", "")
            # Ensure MCO messages are consistently wrapped for UI display as agent thoughts/observations.
            processed_content = f"<thinking>MCO Message: {content}</thinking>"
            yield {"type": "status", "message": processed_content}
            print(f"MCO Message: {content}")
            last_tool_result = None

        elif directive_type == "final_answer":
            final_answer = directive.get("answer", "Orchestration complete.")
            yield {"type": "final_result", "result": f"<thinking>Final Answer from MCO: {final_answer}</thinking>", "orchestration_id": orchestration_id}
            print(f"Orchestration finished. Final answer: {final_answer}")
            break

        elif directive_type == "orchestration_error":
            error_details = directive.get("details", "Unknown orchestration error.")
            yield {"type": "error", "message": f"<thinking>Orchestration Error from MCO: {error_details}</thinking>"}
            print(f"Orchestration Error from MCO: {error_details}")
            break

        else:
            unknown_directive_msg = f"Unknown directive type received: '{directive_type}'"
            yield {"type": "error", "message": f"<thinking>Error: {unknown_directive_msg}. Directive: {json.dumps(directive_summary)}</thinking>"}
            print(f"{unknown_directive_msg}. Directive: {json.dumps(directive, indent=2)}")
            break
    else:
        # Reached only when the loop ran out of turns without hitting a
        # `break`, i.e. neither a final answer nor an error ended it.
        yield {"type": "status", "message": "<thinking>Max turns reached. Ending orchestration.</thinking>"}
        print("Max turns reached.")


async def arun_mco_agent(user_prompt: str):
    """Async generator that runs an agent orchestrated by an MCO server.

    This function connects to the MCO server (which is an MCP server),
    initiates an orchestration by calling the 'start_orchestration' tool,
    and then enters a loop to get and process directives. If a directive is
    a 'tool_call' for one of the agent's own local tools, it executes it.
    Otherwise, it yields status updates for the Gradio UI to stream.

    The server address is read from the ``MCO_SERVER_URL`` environment
    variable (default ``http://localhost:3000``) and converted to a WebSocket
    URL.

    Args:
        user_prompt: The goal passed to the server when starting the orchestration.

    Yields:
        Dicts with a ``type`` of ``status``, ``error``, ``tool_result`` or
        ``final_result``. Exceptions are reported as ``error`` events rather
        than raised.
    """
    print(f"Starting MCO agent with prompt: '{user_prompt}'")
    yield {"type": "status", "message": "<thinking>Initializing MCP Client and preparing for orchestration...</thinking>"}

    server_url = _mco_websocket_url(os.environ.get("MCO_SERVER_URL", "http://localhost:3000"))
    mcp_client = MCPClient(server_url=server_url)
    events = _mco_orchestration_events(mcp_client, user_prompt)

    connection_was_opened = False
    try:
        async for event in events:
            yield event
    except ConnectionError as e:
        yield {"type": "error", "message": f"<thinking>Connection error: {str(e)}</thinking>"}
    except Exception as e:
        yield {"type": "error", "message": f"<thinking>An unexpected error occurred in the agent: {str(e)}</thinking>"}
    finally:
        # No `yield` in this block: yielding while the generator is being
        # closed raises RuntimeError. The shutdown notice is emitted after the
        # try statement instead, which is only reached on normal completion.
        await events.aclose()
        connection_was_opened = bool(mcp_client.websocket)
        if connection_was_opened:
            await mcp_client.close()

    if connection_was_opened:
        yield {"type": "status", "message": "<thinking>MCP client shut down.</thinking>"}

@stub.function(
    secrets=[modal.Secret.from_dotenv()],
    timeout=600,
    scaledown_window=120,
)
def run_mco_agent(user_prompt: str):
    """Synchronous wrapper for the async generator, exposed as a Modal Function.
    This allows Gradio to call the agent and stream results.

    The async generator is driven on ONE dedicated event loop for its whole
    lifetime. Calling ``asyncio.run()`` per item would use a fresh loop each
    time and, on exit, shut down all async generators started on that loop,
    which ends the agent after its first event and leaves any open WebSocket
    bound to a closed loop.

    Args:
        user_prompt: The goal forwarded to ``arun_mco_agent``.

    Yields:
        Each event dict produced by ``arun_mco_agent``, in order.
    """
    import asyncio

    loop = asyncio.new_event_loop()
    agent_events = arun_mco_agent(user_prompt)
    try:
        while True:
            try:
                event = loop.run_until_complete(agent_events.__anext__())
            except StopAsyncIteration:
                break
            yield event
    finally:
        # Runs on normal exhaustion and when the consumer stops iterating early.
        try:
            try:
                loop.run_until_complete(agent_events.aclose())
            except RuntimeError:
                # Best-effort cleanup: aclose() raises RuntimeError if the
                # generator yields while being closed. Nothing more to do here.
                pass
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()