Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
Modal implementation for the MCO hackathon.

This module defines the Modal app, the MCPClient used to communicate with MCP
servers (including the MCO orchestration server) over WebSockets, a few
agent-local tools, and the agent loop that executes directives issued by the
MCO server. The agent loop is asynchronous; run_mco_agent wraps it in a
synchronous generator so results can be streamed.
| """ | |
| import os | |
| import json | |
| import asyncio | |
| import websockets | |
| import modal | |
| from typing import Dict, Any, Optional | |
# --- Modal App Setup ---
# Secrets come from Modal's dotenv helper; the container image is a slim Debian
# base running Python 3.10 with the `websockets` package installed.
_agent_secrets = [modal.Secret.from_dotenv()]
_agent_image = modal.Image.debian_slim(python_version="3.10").pip_install(
    "websockets",
)
stub = modal.App(
    name="mco-autogpt-agent",
    secrets=_agent_secrets,
    image=_agent_image,
)
# --- MCPClient: Generic client for interacting with MCP Servers via WebSocket ---
class MCPClient:
    """A generic client for interacting with any MCP-compliant server via WebSockets.

    One instance wraps one WebSocket connection:

    - ``connect()`` opens it and reads the server's welcome message
      (``clientId`` and ``tools``).
    - ``call_tool()`` sends ``tool_call`` messages and waits for the reply.
    - ``close()`` shuts the connection down.

    Args:
        server_url: WebSocket URL (``ws://`` / ``wss://``) of the server.
        response_timeout: Seconds to wait for the welcome message and for each
            tool-call reply. Defaults to 60.0.
    """

    def __init__(self, server_url: str, response_timeout: float = 60.0):
        self.server_url = server_url
        self.response_timeout = response_timeout
        self.websocket = None  # Set by connect(); reset to None by close()
        self.client_id: Optional[str] = None  # Provided by the server upon connection
        self.request_counter = 0
        self.available_tools = {}  # Populated from server's welcome message

    def _is_open(self) -> bool:
        """Return True when a connection exists and reports itself open.

        The legacy ``websockets`` protocol exposes a boolean ``open`` attribute.
        The newer asyncio ``ClientConnection`` dropped it and exposes only
        ``state``; it is the default returned by ``websockets.connect`` since
        websockets 14. The image installs an unpinned ``websockets``, so both
        forms must be supported.
        """
        ws = self.websocket
        if ws is None:
            return False
        legacy_flag = getattr(ws, "open", None)
        if isinstance(legacy_flag, bool):
            return legacy_flag
        # websockets.protocol.State.OPEN, compared by name to avoid importing the enum.
        return getattr(getattr(ws, "state", None), "name", None) == "OPEN"

    async def connect(self):
        """Open the WebSocket and read the server's welcome message.

        Raises:
            Exception: Any connection, timeout or decoding error. It is printed
                and re-raised after any partially opened connection is closed.
        """
        try:
            self.websocket = await websockets.connect(self.server_url)
            # Bounded wait so a server that never sends a welcome cannot hang the agent.
            welcome_message = await asyncio.wait_for(self.websocket.recv(), timeout=self.response_timeout)
            welcome_data = json.loads(welcome_message)
            self.client_id = welcome_data.get("clientId")
            self.available_tools = welcome_data.get("tools", {})
        except Exception as e:
            print(f"Failed to connect to WebSocket server at {self.server_url}: {e}")
            await self.close()
            raise
        print(f"Connected to MCP server with client ID: {self.client_id}")
        print(f"Available tools: {json.dumps(self.available_tools, indent=2)}")

    async def call_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send a ``tool_call`` message to the server and await the reply.

        Args:
            tool_name: Name of the server-side tool to invoke.
            params: JSON-serialisable parameters for the tool.

        Returns:
            The reply's ``result`` field if present, otherwise the whole reply.
            Timeouts and send/receive/decode errors are returned as an
            ``{"error": ...}`` dict rather than raised.

        Raises:
            ConnectionError: If there is no open connection.
        """
        if not self._is_open():
            raise ConnectionError("WebSocket connection is not established or has been closed.")
        self.request_counter += 1
        request_id = f"{self.client_id}-{self.request_counter}"
        message = {
            "type": "tool_call",
            "tool": tool_name,
            "params": params,
            "requestId": request_id,
        }
        try:
            await self.websocket.send(json.dumps(message))
            response_message = await asyncio.wait_for(self.websocket.recv(), timeout=self.response_timeout)
            response_data = json.loads(response_message)
            if not isinstance(response_data, dict):
                return {"error": f"Unexpected non-object response for tool call '{tool_name}': {response_data!r}"}
            if response_data.get("requestId") != request_id:
                print(f"Warning: Response ID mismatch. Expected {request_id}, got {response_data.get('requestId')}")
            # The 'result' field is standard for successful tool calls in MCP
            return response_data.get("result", response_data)  # Return full response if no 'result'
        except asyncio.TimeoutError:
            print(f"Timeout waiting for response for tool call '{tool_name}'.")
            return {"error": f"Timeout waiting for response for tool call '{tool_name}'."}
        except Exception as e:
            print(f"An error occurred during tool call '{tool_name}': {e}")
            return {"error": f"An error occurred during tool call '{tool_name}': {str(e)}"}

    async def close(self):
        """Close the WebSocket connection if one exists. Safe to call repeatedly.

        Errors raised while closing are printed, not propagated, so cleanup in
        callers' ``finally`` blocks cannot mask the original exception.
        """
        websocket, self.websocket = self.websocket, None
        if websocket is None:
            return
        try:
            await websocket.close()
        except Exception as e:
            print(f"Error while closing WebSocket connection: {e}")
        else:
            print("WebSocket connection closed.")
# --- Agent's Local Tools ---
def create_file_locally(filename: str, content: str) -> dict:
    """Create (or overwrite) a UTF-8 text file inside the agent's working directory.

    When run by the agent loop, the filename comes from a directive sent by the
    remote MCO server. The resolved path must therefore stay inside the current
    working directory; anything else is refused rather than written.

    Note: In Modal, files written here are typically ephemeral to the
    container's current run. For persistent storage across invocations or for
    sharing, Modal's NetworkFileSystem or other storage solutions (e.g. an S3
    upload tool) would be needed.

    Args:
        filename: Path relative to (or absolute within) the working directory.
        content: Text to write.

    Returns:
        ``{"status": "success" | "error", "message": str}``.
    """
    try:
        base_dir = os.path.realpath(os.getcwd())
        target_path = os.path.realpath(filename)
        # commonpath compares whole path components, so both "../x" and sibling
        # directories that merely share a name prefix are rejected.
        if os.path.commonpath([base_dir, target_path]) != base_dir:
            return {"status": "error", "message": f"Security error: Path traversal attempt detected for filename '{filename}'."}
        with open(target_path, "w", encoding="utf-8") as f:
            f.write(content)
        return {"status": "success", "message": f"File '{filename}' created successfully in agent's environment."}
    except Exception as e:
        return {"status": "error", "message": f"Failed to create file '{filename}': {str(e)}"}
async def call_external_mcp_tool(server_url: str, target_tool_name: str, target_tool_params: Dict[str, Any]) -> Dict[str, Any]:
    """Invoke one tool on an arbitrary external MCP server and hand back its result.

    A dedicated MCPClient is opened for this single call and closed afterwards.
    ``http://`` and ``https://`` URLs are rewritten to ``ws://`` and ``wss://``;
    any other URL is used unchanged.

    Returns:
        Whatever ``MCPClient.call_tool`` returned. On failure, a dict with
        ``error``, ``tool_name``, ``external_server_url`` and ``status_code``
        (``CONNECTION_ERROR`` or ``EXECUTION_ERROR``).
    """
    print(f"[Agent Tool] Attempting to call tool '{target_tool_name}' on external MCP server: {server_url}")

    ws_server_url = server_url
    for http_prefix, ws_prefix in (("http://", "ws://"), ("https://", "wss://")):
        if server_url.startswith(http_prefix):
            ws_server_url = ws_prefix + server_url[len(http_prefix):]
            break

    def failure(status_code: str, err_msg: str) -> Dict[str, Any]:
        # Log the problem, then report it in the shape shared by every error path.
        print(f"[Agent Tool] {err_msg}")
        return {
            "error": err_msg,
            "tool_name": target_tool_name,
            "external_server_url": server_url,
            "status_code": status_code,
        }

    client = MCPClient(server_url=ws_server_url)
    try:
        await client.connect()
        print(f"[Agent Tool] Connected to external MCP server: {server_url}")
        outcome = await client.call_tool(target_tool_name, target_tool_params)
        print(f"[Agent Tool] Successfully called '{target_tool_name}' on {server_url}. Result: {json.dumps(outcome)}")
        return outcome
    except ConnectionError as exc:
        return failure("CONNECTION_ERROR", f"Connection error with external MCP server {server_url}: {str(exc)}")
    except Exception as exc:
        return failure("EXECUTION_ERROR", f"Error calling tool '{target_tool_name}' on external MCP server {server_url}: {str(exc)}")
    finally:
        if client.websocket:
            await client.close()
            print(f"[Agent Tool] Closed connection to external MCP server: {server_url}")
async def read_file_locally(filename: str) -> Dict[str, Any]:
    """Read a UTF-8 text file from inside the agent's working directory.

    Args:
        filename: Path relative to (or absolute within) the current working directory.

    Returns:
        On success, ``{"filename", "content", "status_code": "SUCCESS"}``.
        Otherwise ``{"error", "filename", "status_code"}``, where status_code is
        one of:

        - ``SECURITY_ERROR``: the path resolves outside the working directory.
        - ``FILE_NOT_FOUND``.
        - ``READ_ERROR``.
    """
    print(f"[Agent Tool] Attempting to read file: {filename}")
    try:
        # Security check: path traversal prevention. realpath also resolves
        # symlinks. commonpath compares whole components, so "/app_evil" is not
        # treated as inside "/app"; a plain startswith() check would accept it.
        base_dir = os.path.realpath(os.getcwd())
        requested_path = os.path.realpath(filename)
        if os.path.commonpath([base_dir, requested_path]) != base_dir:
            err_msg = f"Security error: Path traversal attempt detected for filename '{filename}'."
            print(f"[Agent Tool] {err_msg}")
            return {"error": err_msg, "filename": filename, "status_code": "SECURITY_ERROR"}
        if not os.path.exists(requested_path):
            err_msg = f"File not found: {filename}"
            print(f"[Agent Tool] {err_msg}")
            return {"error": err_msg, "filename": filename, "status_code": "FILE_NOT_FOUND"}
        # Open the path that was validated, not the raw input.
        with open(requested_path, "r", encoding="utf-8") as f:
            content = f.read()
        print(f"[Agent Tool] Successfully read file: {filename}")
        return {"filename": filename, "content": content, "status_code": "SUCCESS"}
    except Exception as e:
        err_msg = f"Error reading file '{filename}': {str(e)}"
        print(f"[Agent Tool] {err_msg}")
        return {"error": err_msg, "filename": filename, "status_code": "READ_ERROR"}
# Registry of tools the agent executes itself: a "tool_call" directive whose
# tool name is one of these keys is dispatched to the mapped function.
# Add other local tools here.
AGENT_LOCAL_TOOLS = dict(
    create_file_locally=create_file_locally,
    call_external_mcp_tool=call_external_mcp_tool,
    read_file_locally=read_file_locally,
)
# --- Agent (Orchestrated by an MCO Server) ---
def _to_websocket_url(url: str) -> str:
    """Rewrite a leading http:// or https:// scheme to ws:// or wss://; return other URLs unchanged."""
    for http_prefix, ws_prefix in (("http://", "ws://"), ("https://", "wss://")):
        if url.startswith(http_prefix):
            return ws_prefix + url[len(http_prefix):]
    return url


def _truncate_str_values(mapping: Dict[str, Any], limit: int) -> Dict[str, Any]:
    """Return a copy of *mapping* where string values longer than *limit* are cut to *limit* chars plus '...'."""
    return {k: (v[:limit] + '...' if isinstance(v, str) and len(v) > limit else v) for k, v in mapping.items()}


async def _orchestrate(mcp_client: "MCPClient", user_prompt: str):
    """Run one MCO orchestration over *mcp_client*, yielding UI event dicts.

    The generator runs these steps:

    1. Connect and call ``start_orchestration``.
    2. Loop on ``get_next_directive`` for at most 20 turns.
    3. Execute ``tool_call`` directives that name a tool in
       ``AGENT_LOCAL_TOOLS``, sending each tool's outcome back with the next
       request.

    Connection and protocol exceptions propagate to the caller, which reports
    them and closes the client.
    """
    import inspect  # used to tell coroutine-function tools from plain-function tools

    await mcp_client.connect()
    yield {"type": "status", "message": "<thinking>Starting orchestration with MCO server...</thinking>"}
    start_params = {"config": {"user_prompt": user_prompt}}
    start_result = await mcp_client.call_tool("start_orchestration", start_params)
    # call_tool may return a non-object 'result'; treat that like a missing ID.
    orchestration_id = start_result.get("orchestration_id") if isinstance(start_result, dict) else None
    if not orchestration_id:
        yield {"type": "error", "message": "<thinking>Error: Failed to start orchestration. No orchestration_id received.</thinking>"}
        return
    yield {"type": "status", "message": f"<thinking>Orchestration started with ID: {orchestration_id}. Goal: '{user_prompt}'. MCO server will use configured SNLP.</thinking>"}
    print(f"Orchestration started. ID: {orchestration_id}, Prompt: {user_prompt}. MCO server using configured SNLP.")

    last_tool_result = None
    max_turns = 20  # Max turns to prevent infinite loops
    for turn in range(max_turns):
        get_directive_params = {"orchestration_id": orchestration_id}
        if last_tool_result:
            get_directive_params["last_tool_result"] = last_tool_result
            print(f"Sending last_tool_result to get_next_directive: {json.dumps(last_tool_result, indent=2)}")
            last_tool_result = None
        directive = await mcp_client.call_tool("get_next_directive", get_directive_params)
        if not isinstance(directive, dict):
            print(f"Turn {turn + 1}: Received non-object directive: {directive!r}")
            yield {"type": "error", "message": "<thinking>Error: Received an empty or invalid directive.</thinking>"}
            break
        directive_type = directive.get("type")
        # Summarize directive for UI to avoid excessive length
        directive_summary = _truncate_str_values(directive, 100)
        yield {"type": "status", "message": f"<thinking>Turn {turn + 1}/{max_turns}: Received directive of type '{directive_type}'. Details: {json.dumps(directive_summary)}</thinking>"}
        print(f"Turn {turn + 1}: Received directive: {json.dumps(directive, indent=2)}")
        if not directive_type:
            yield {"type": "error", "message": "<thinking>Error: Received an empty or invalid directive.</thinking>"}
            break

        if directive_type == "tool_call":
            tool_name = directive.get("tool")
            tool_params = directive.get("params") or {}
            tool_id = directive.get("tool_id", f"local_tool_{turn + 1}")  # Ensure tool_id is present
            if not (isinstance(tool_name, str) and tool_name in AGENT_LOCAL_TOOLS):
                unknown_tool_msg = f"Unknown tool requested by MCO: {tool_name}"
                yield {"type": "error", "message": f"<thinking>Error: {unknown_tool_msg}</thinking>"}
                print(unknown_tool_msg)
                last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": "Unknown tool"}
            elif not isinstance(tool_params, dict):
                bad_params_msg = f"Invalid params for tool {tool_name}: expected an object, got {type(tool_params).__name__}"
                yield {"type": "error", "message": f"<thinking>Error: {bad_params_msg}</thinking>"}
                print(bad_params_msg)
                last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": bad_params_msg}
            else:
                tool_function = AGENT_LOCAL_TOOLS[tool_name]
                tool_params_summary = _truncate_str_values(tool_params, 70)
                yield {"type": "status", "message": f"<thinking>Preparing to execute local tool: '{tool_name}' with parameters: {json.dumps(tool_params_summary)} as directed by MCO.</thinking>"}
                print(f"Executing local tool: {tool_name}, Params: {json.dumps(tool_params, indent=2)}")
                try:
                    tool_result_payload = tool_function(**tool_params)
                    # Registered tools may be plain functions (create_file_locally)
                    # or coroutine functions; awaiting a plain dict would raise TypeError.
                    if inspect.isawaitable(tool_result_payload):
                        tool_result_payload = await tool_result_payload
                except Exception as e:
                    error_message = f"Error executing tool {tool_name}: {str(e)}"
                    yield {"type": "error", "message": f"<thinking>{error_message}</thinking>"}
                    print(error_message)
                    last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": error_message}
                else:
                    last_tool_result = {
                        "tool_id": tool_id,
                        "tool_name": tool_name,
                        "status": "success",
                        "result": tool_result_payload,
                    }
                    # Yielding the raw tool_result_payload. Gradio UI can decide how to format it.
                    yield {"type": "tool_result", "tool_name": tool_name, "result": tool_result_payload, "orchestration_id": orchestration_id, "tool_id": tool_id}
                    print(f"Tool {tool_name} executed. Result: {json.dumps(tool_result_payload, indent=2, default=str)}")
        elif directive_type == "assistant_message":
            content = directive.get("content", "")
            # Ensure MCO messages are consistently wrapped for UI display as agent thoughts/observations.
            processed_content = f"<thinking>MCO Message: {content}</thinking>"
            yield {"type": "status", "message": processed_content}
            print(f"MCO Message: {content}")
            last_tool_result = None
        elif directive_type == "final_answer":
            final_answer = directive.get("answer", "Orchestration complete.")
            yield {"type": "final_result", "result": f"<thinking>Final Answer from MCO: {final_answer}</thinking>", "orchestration_id": orchestration_id}
            print(f"Orchestration finished. Final answer: {final_answer}")
            break
        elif directive_type == "orchestration_error":
            error_details = directive.get("details", "Unknown orchestration error.")
            yield {"type": "error", "message": f"<thinking>Orchestration Error from MCO: {error_details}</thinking>"}
            print(f"Orchestration Error from MCO: {error_details}")
            break
        else:
            unknown_directive_msg = f"Unknown directive type received: '{directive_type}'"
            yield {"type": "error", "message": f"<thinking>Error: {unknown_directive_msg}. Directive: {json.dumps(directive_summary)}</thinking>"}
            print(f"{unknown_directive_msg}. Directive: {json.dumps(directive, indent=2)}")
            break
    else:
        # Only reached when every turn was used without a terminating directive.
        yield {"type": "status", "message": "<thinking>Max turns reached. Ending orchestration.</thinking>"}
        print("Max turns reached.")


async def arun_mco_agent(user_prompt: str):
    """Async generator that runs an agent orchestrated by an MCO server.

    This function connects to the MCO server (which is an MCP server),
    initiates an orchestration by calling the 'start_orchestration' tool,
    and then enters a loop to get and process directives. If a directive is
    a 'tool_call' for one of the agent's own local tools, it executes it.
    Otherwise, it yields status updates for the Gradio UI to stream.

    Yields:
        Event dicts whose ``type`` is ``status``, ``error``, ``tool_result`` or
        ``final_result``. The last event is the "MCP client shut down" status,
        unless the generator is closed or cancelled early.
    """
    print(f"Starting MCO agent with prompt: '{user_prompt}'")
    yield {"type": "status", "message": "<thinking>Initializing MCP Client and preparing for orchestration...</thinking>"}
    # Rewrite only the URL scheme. Replacing every "http" substring would also
    # corrupt host names or paths that happen to contain "http".
    server_url = _to_websocket_url(os.environ.get("MCO_SERVER_URL", "http://localhost:3000"))
    mcp_client = MCPClient(server_url=server_url)
    events = _orchestrate(mcp_client, user_prompt)
    try:
        async for event in events:
            yield event
    except ConnectionError as e:
        yield {"type": "error", "message": f"<thinking>Connection error: {str(e)}</thinking>"}
    except Exception as e:
        yield {"type": "error", "message": f"<thinking>An unexpected error occurred in the agent: {str(e)}</thinking>"}
    finally:
        # Runs on every exit path, including early close by the consumer and cancellation.
        await events.aclose()
        if mcp_client.websocket:
            await mcp_client.close()
    # Deliberately outside `finally`: yielding while the generator is being
    # closed (GeneratorExit) raises RuntimeError. This line is therefore only
    # reached on normal completion, or after an error event was reported above.
    yield {"type": "status", "message": "<thinking>MCP client shut down.</thinking>"}
def run_mco_agent(user_prompt: str):
    """Synchronous wrapper for the async generator, exposed as a Modal Function.

    This allows Gradio to call the agent and stream results.

    Every step of ``arun_mco_agent`` runs on one private event loop.
    ``asyncio.run`` must not be called per step:

    - It creates and closes a new loop on every call.
    - It finalizes async generators first iterated on that loop, so the agent
      would be shut down after its first event.
    - The agent's WebSocket would end up bound to a loop that no longer exists.

    Yields:
        The event dicts produced by ``arun_mco_agent``, in order.
    """
    import asyncio

    loop = asyncio.new_event_loop()
    agent_events = arun_mco_agent(user_prompt)
    exhausted = object()  # sentinel: the agent generator has finished

    async def _next_event():
        # A real coroutine wrapper, so run_until_complete accepts it on every Python version.
        try:
            return await agent_events.__anext__()
        except StopAsyncIteration:
            return exhausted

    try:
        while True:
            event = loop.run_until_complete(_next_event())
            if event is exhausted:
                break
            yield event
    finally:
        # Reached on exhaustion, on errors, and when the consumer stops early
        # (GeneratorExit at `yield`). The agent's own cleanup, such as closing
        # the MCP connection, must run on the same loop that opened it.
        try:
            loop.run_until_complete(agent_events.aclose())
        except Exception as exc:  # best-effort cleanup must not mask the caller's exit
            print(f"Error while shutting down agent: {exc}")
        finally:
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()