|
|
from typing import Dict, List, Optional |
|
|
|
|
|
from pydantic import Field, model_validator |
|
|
|
|
|
from app.agent.browser import BrowserContextHelper |
|
|
from app.agent.toolcall import ToolCallAgent |
|
|
from app.config import config |
|
|
from app.logger import logger |
|
|
from app.prompt.manus import NEXT_STEP_PROMPT, SYSTEM_PROMPT |
|
|
from app.tool import Terminate, ToolCollection |
|
|
from app.tool.ask_human import AskHuman |
|
|
from app.tool.browser_use_tool import BrowserUseTool |
|
|
from app.tool.mcp import MCPClients, MCPClientTool |
|
|
from app.tool.python_execute import PythonExecute |
|
|
from app.tool.str_replace_editor import StrReplaceEditor |
|
|
|
|
|
|
|
|
class Manus(ToolCallAgent):
    """A versatile general-purpose agent with support for both local and MCP tools.

    The agent starts with a fixed set of local tools and extends
    ``available_tools`` with whatever tools the configured MCP servers
    expose. MCP connections are opened either eagerly through
    :meth:`create` or lazily on the first :meth:`think` call, and are
    closed again by :meth:`cleanup`.
    """

    name: str = "Manus"
    description: str = "A versatile agent that can solve various tasks using multiple tools including MCP-based tools"

    system_prompt: str = SYSTEM_PROMPT.format(directory=config.workspace_root)
    next_step_prompt: str = NEXT_STEP_PROMPT

    max_observe: int = 10000
    max_steps: int = 20

    # Client registry used to open/close MCP server connections; the tools it
    # exposes are merged into ``available_tools`` on connect.
    mcp_clients: MCPClients = Field(default_factory=MCPClients)

    # Local tools available before any MCP server is connected.
    available_tools: ToolCollection = Field(
        default_factory=lambda: ToolCollection(
            PythonExecute(),
            BrowserUseTool(),
            StrReplaceEditor(),
            AskHuman(),
            Terminate(),
        )
    )

    special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name])
    browser_context_helper: Optional[BrowserContextHelper] = None

    # Maps ``server_id`` (or the URL/command when no id was given) to the URL
    # (SSE) or command (stdio) the server was connected with.
    connected_servers: Dict[str, str] = Field(
        default_factory=dict
    )
    # True once the configured MCP servers have been initialized; reset by
    # ``cleanup``. Underscore-prefixed, so pydantic keeps it out of the model
    # fields.
    _initialized: bool = False

    @model_validator(mode="after")
    def initialize_helper(self) -> "Manus":
        """Initialize basic components synchronously.

        Runs after model validation and attaches a ``BrowserContextHelper``
        bound to this agent.
        """
        self.browser_context_helper = BrowserContextHelper(self)
        return self

    @classmethod
    async def create(cls, **kwargs) -> "Manus":
        """Factory method to create and properly initialize a Manus instance.

        Args:
            **kwargs: Forwarded unchanged to the class constructor.

        Returns:
            An instance whose configured MCP servers have already been
            initialized (connection failures are logged, not raised).
        """
        instance = cls(**kwargs)
        await instance.initialize_mcp_servers()
        instance._initialized = True
        return instance

    async def initialize_mcp_servers(self) -> None:
        """Initialize connections to configured MCP servers.

        Iterates ``config.mcp_config.servers``. ``"sse"`` entries are
        connected by URL and ``"stdio"`` entries by command plus args;
        entries lacking the required URL/command, or with another type, are
        skipped. A failure on one server is logged and does not prevent the
        remaining servers from being tried.
        """
        for server_id, server_config in config.mcp_config.servers.items():
            try:
                if server_config.type == "sse":
                    if server_config.url:
                        await self.connect_mcp_server(server_config.url, server_id)
                        logger.info(
                            f"Connected to MCP server {server_id} at {server_config.url}"
                        )
                elif server_config.type == "stdio":
                    if server_config.command:
                        await self.connect_mcp_server(
                            server_config.command,
                            server_id,
                            use_stdio=True,
                            stdio_args=server_config.args,
                        )
                        logger.info(
                            f"Connected to MCP server {server_id} using command {server_config.command}"
                        )
            except Exception as e:
                # Best effort: one unreachable server must not block the rest.
                logger.error(f"Failed to connect to MCP server {server_id}: {e}")

    async def connect_mcp_server(
        self,
        server_url: str,
        server_id: str = "",
        use_stdio: bool = False,
        stdio_args: Optional[List[str]] = None,
    ) -> None:
        """Connect to an MCP server and add its tools.

        Args:
            server_url: URL of the server for SSE connections, or the command
                to launch when ``use_stdio`` is true.
            server_id: Identifier for the server. When empty, ``server_url``
                is used as the key in ``connected_servers``.
            use_stdio: Connect over stdio instead of SSE.
            stdio_args: Arguments for the stdio command; ``None`` is treated
                as an empty list.
        """
        if use_stdio:
            await self.mcp_clients.connect_stdio(
                server_url, stdio_args or [], server_id
            )
        else:
            await self.mcp_clients.connect_sse(server_url, server_id)
        # Recorded only after the connection call returned without raising.
        self.connected_servers[server_id or server_url] = server_url

        # Register only the tools that belong to the server just connected.
        # NOTE(review): this compares against the raw ``server_id``; if the
        # client tags tools differently when ``server_id`` is empty, no tools
        # would match here - confirm against MCPClients.
        new_tools = [
            tool for tool in self.mcp_clients.tools if tool.server_id == server_id
        ]
        self.available_tools.add_tools(*new_tools)

    async def disconnect_mcp_server(self, server_id: str = "") -> None:
        """Disconnect from an MCP server and remove its tools.

        Args:
            server_id: Server to disconnect. When empty, every entry is
                removed from ``connected_servers``.
        """
        await self.mcp_clients.disconnect(server_id)
        if server_id:
            self.connected_servers.pop(server_id, None)
        else:
            self.connected_servers.clear()

        # Rebuild the collection: keep every non-MCP tool, then re-add
        # whatever MCP tools the client registry still exposes.
        base_tools = [
            tool
            for tool in self.available_tools.tools
            if not isinstance(tool, MCPClientTool)
        ]
        self.available_tools = ToolCollection(*base_tools)
        self.available_tools.add_tools(*self.mcp_clients.tools)

    async def cleanup(self):
        """Clean up Manus agent resources.

        Cleans up the browser helper (if any) and, when MCP servers were
        initialized, disconnects from them. The MCP disconnect is attempted
        even if browser cleanup raises, so connections are not left open; the
        original exception still propagates.
        """
        try:
            if self.browser_context_helper:
                await self.browser_context_helper.cleanup_browser()
        finally:
            if self._initialized:
                await self.disconnect_mcp_server()
                self._initialized = False

    async def think(self) -> bool:
        """Process current state and decide next actions with appropriate context.

        Lazily initializes MCP servers on first use. If any of the last three
        messages in memory contains a call to the browser tool,
        ``next_step_prompt`` is temporarily replaced by the prompt produced
        by the browser context helper for the duration of the parent
        ``think`` call.

        Returns:
            The result of ``ToolCallAgent.think``.
        """
        if not self._initialized:
            await self.initialize_mcp_servers()
            self._initialized = True

        original_prompt = self.next_step_prompt
        recent_messages = self.memory.messages[-3:] if self.memory.messages else []
        # Resolve the tool name once instead of building a tool per tool call.
        browser_tool_name = BrowserUseTool().name
        browser_in_use = any(
            tc.function.name == browser_tool_name
            for msg in recent_messages
            if msg.tool_calls
            for tc in msg.tool_calls
        )

        try:
            if browser_in_use:
                self.next_step_prompt = (
                    await self.browser_context_helper.format_next_step_prompt()
                )
            return await super().think()
        finally:
            # Always restore the prompt, even when the parent call raises, so
            # the browser-specific prompt never leaks into later steps.
            self.next_step_prompt = original_prompt
|
|
|