"""MCP protocol handler - maps MCP tool calls to actual operations.""" from __future__ import annotations import logging from typing import Any from app.environment import configure_environment, get_environment_config from app.executor import get_executor from app.file_manager import FileManager from app.models import MCPToolResult from app.package_manager import get_package_manager logger = logging.getLogger(__name__) # Tool definitions for MCP list_tools TOOL_DEFINITIONS = [ { "name": "execute_code", "description": "Executes Python code in the configured environment. Best for short code snippets.", "inputSchema": { "type": "object", "properties": { "code": { "type": "string", "description": "Python code to execute", }, "filename": { "type": "string", "description": "Optional filename (without extension)", }, }, "required": ["code"], }, }, { "name": "install_dependencies", "description": "Installs Python packages in the environment.", "inputSchema": { "type": "object", "properties": { "packages": { "type": "array", "items": {"type": "string"}, "description": "List of package names to install", }, }, "required": ["packages"], }, }, { "name": "check_installed_packages", "description": "Checks if packages are already installed in the environment.", "inputSchema": { "type": "object", "properties": { "packages": { "type": "array", "items": {"type": "string"}, "description": "List of package names to check", }, }, "required": ["packages"], }, }, { "name": "configure_environment", "description": "Dynamically changes the environment configuration.", "inputSchema": { "type": "object", "properties": { "type": { "type": "string", "enum": ["conda", "venv", "venv-uv"], "description": "Environment type", }, "conda_name": { "type": "string", "description": "Conda environment name (required for conda type)", }, "venv_path": { "type": "string", "description": "Virtualenv path (required for venv type)", }, "uv_venv_path": { "type": "string", "description": "UV virtualenv path (required for venv-uv type)", }, }, "required": ["type"], }, }, { "name": "get_environment_config", "description": "Gets the current environment configuration.", "inputSchema": { "type": "object", "properties": {}, }, }, { "name": "initialize_code_file", "description": "Creates a new Python file with initial content. Use this as the first step for longer code that may exceed token limits.", "inputSchema": { "type": "object", "properties": { "content": { "type": "string", "description": "Initial file content", }, "filename": { "type": "string", "description": "Optional filename (without extension)", }, }, "required": ["content"], }, }, { "name": "append_to_code_file", "description": "Appends content to an existing Python code file.", "inputSchema": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the existing code file", }, "content": { "type": "string", "description": "Content to append", }, }, "required": ["file_path", "content"], }, }, { "name": "execute_code_file", "description": "Executes an existing Python file.", "inputSchema": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the code file to execute", }, }, "required": ["file_path"], }, }, { "name": "read_code_file", "description": "Reads the content of an existing Python code file.", "inputSchema": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the code file to read", }, }, "required": ["file_path"], }, }, ] async def handle_tool_call(name: str, arguments: dict[str, Any]) -> MCPToolResult: """Route an MCP tool call to the appropriate handler.""" try: if name == "execute_code": return await _handle_execute_code(arguments) elif name == "install_dependencies": return await _handle_install_dependencies(arguments) elif name == "check_installed_packages": return await _handle_check_packages(arguments) elif name == "configure_environment": return _handle_configure_environment(arguments) elif name == "get_environment_config": return _handle_get_environment_config() elif name == "initialize_code_file": return _handle_initialize_code_file(arguments) elif name == "append_to_code_file": return _handle_append_to_code_file(arguments) elif name == "execute_code_file": return await _handle_execute_code_file(arguments) elif name == "read_code_file": return _handle_read_code_file(arguments) else: return MCPToolResult( content=[{"type": "text", "text": f"Unknown tool: {name}"}], isError=True, ) except Exception as e: logger.exception("Error handling tool call: %s", name) return MCPToolResult( content=[{"type": "text", "text": f"Error: {str(e)}"}], isError=True, ) async def _handle_execute_code(args: dict) -> MCPToolResult: executor = get_executor() result = await executor.execute_code(args["code"], args.get("filename")) output_parts = [] if result.stdout: output_parts.append(f"STDOUT:\n{result.stdout}") if result.stderr: output_parts.append(f"STDERR:\n{result.stderr}") output_parts.append(f"Return code: {result.return_code}") output_parts.append(f"Execution time: {result.execution_time:.2f}s") output_parts.append(f"File: {result.file_path}") return MCPToolResult( content=[{"type": "text", "text": "\n".join(output_parts)}], isError=not result.success, ) async def _handle_install_dependencies(args: dict) -> MCPToolResult: pm = get_package_manager() result = await pm.install_packages(args["packages"]) text = f"Installation {'succeeded' if result['success'] else 'failed'}\n" if result.get("stdout"): text += f"Output:\n{result['stdout']}\n" if result.get("stderr"): text += f"Errors:\n{result['stderr']}\n" return MCPToolResult( content=[{"type": "text", "text": text}], isError=not result["success"], ) async def _handle_check_packages(args: dict) -> MCPToolResult: pm = get_package_manager() results = await pm.check_packages(args["packages"]) lines = [] for r in results: status = "✓ installed" if r.installed else "✗ not installed" version = f" (v{r.version})" if r.version and r.version != "unknown" else "" lines.append(f" {r.package}: {status}{version}") return MCPToolResult( content=[{"type": "text", "text": "Package status:\n" + "\n".join(lines)}], isError=False, ) def _handle_configure_environment(args: dict) -> MCPToolResult: try: config = configure_environment( env_type=args["type"], conda_name=args.get("conda_name"), venv_path=args.get("venv_path"), uv_venv_path=args.get("uv_venv_path"), ) return MCPToolResult( content=[ { "type": "text", "text": f"Environment configured:\n" f" Type: {config.env_type}\n" f" Python: {config.python_executable}\n" f" Storage: {config.code_storage_dir}", } ], isError=False, ) except ValueError as e: return MCPToolResult( content=[{"type": "text", "text": f"Configuration error: {str(e)}"}], isError=True, ) def _handle_get_environment_config() -> MCPToolResult: config = get_environment_config() return MCPToolResult( content=[ { "type": "text", "text": f"Current environment:\n" f" Type: {config.env_type}\n" f" Python: {config.python_executable}\n" f" Storage: {config.code_storage_dir}\n" f" Conda env: {config.conda_env_name or 'N/A'}\n" f" Venv path: {config.venv_path or 'N/A'}\n" f" UV venv path: {config.uv_venv_path or 'N/A'}", } ], isError=False, ) def _handle_initialize_code_file(args: dict) -> MCPToolResult: fm = FileManager() result = fm.create_file(args["content"], args.get("filename")) if result.success: return MCPToolResult( content=[ { "type": "text", "text": f"File created: {result.file_path}\n{result.message}", } ], isError=False, ) return MCPToolResult( content=[{"type": "text", "text": f"Error: {result.message}"}], isError=True, ) def _handle_append_to_code_file(args: dict) -> MCPToolResult: fm = FileManager() result = fm.append_to_file(args["file_path"], args["content"]) return MCPToolResult( content=[ { "type": "text", "text": result.message + (f"\nFile: {result.file_path}" if result.success else ""), } ], isError=not result.success, ) async def _handle_execute_code_file(args: dict) -> MCPToolResult: executor = get_executor() result = await executor.execute_file(args["file_path"]) output_parts = [] if result.stdout: output_parts.append(f"STDOUT:\n{result.stdout}") if result.stderr: output_parts.append(f"STDERR:\n{result.stderr}") output_parts.append(f"Return code: {result.return_code}") output_parts.append(f"Execution time: {result.execution_time:.2f}s") return MCPToolResult( content=[{"type": "text", "text": "\n".join(output_parts)}], isError=not result.success, ) def _handle_read_code_file(args: dict) -> MCPToolResult: fm = FileManager() result = fm.read_file(args["file_path"]) if result.success: return MCPToolResult( content=[ { "type": "text", "text": f"File: {result.file_path}\n\n{result.content}", } ], isError=False, ) return MCPToolResult( content=[{"type": "text", "text": f"Error: {result.message}"}], isError=True, )