codeMcp / app /mcp_handler.py
sarveshpatel's picture
Create app/mcp_handler.py
ba6de18 verified
"""MCP protocol handler - maps MCP tool calls to actual operations."""
from __future__ import annotations
import logging
from typing import Any
from app.environment import configure_environment, get_environment_config
from app.executor import get_executor
from app.file_manager import FileManager
from app.models import MCPToolResult
from app.package_manager import get_package_manager
logger = logging.getLogger(__name__)
# Tool definitions for MCP list_tools
TOOL_DEFINITIONS = [
{
"name": "execute_code",
"description": "Executes Python code in the configured environment. Best for short code snippets.",
"inputSchema": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "Python code to execute",
},
"filename": {
"type": "string",
"description": "Optional filename (without extension)",
},
},
"required": ["code"],
},
},
{
"name": "install_dependencies",
"description": "Installs Python packages in the environment.",
"inputSchema": {
"type": "object",
"properties": {
"packages": {
"type": "array",
"items": {"type": "string"},
"description": "List of package names to install",
},
},
"required": ["packages"],
},
},
{
"name": "check_installed_packages",
"description": "Checks if packages are already installed in the environment.",
"inputSchema": {
"type": "object",
"properties": {
"packages": {
"type": "array",
"items": {"type": "string"},
"description": "List of package names to check",
},
},
"required": ["packages"],
},
},
{
"name": "configure_environment",
"description": "Dynamically changes the environment configuration.",
"inputSchema": {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["conda", "venv", "venv-uv"],
"description": "Environment type",
},
"conda_name": {
"type": "string",
"description": "Conda environment name (required for conda type)",
},
"venv_path": {
"type": "string",
"description": "Virtualenv path (required for venv type)",
},
"uv_venv_path": {
"type": "string",
"description": "UV virtualenv path (required for venv-uv type)",
},
},
"required": ["type"],
},
},
{
"name": "get_environment_config",
"description": "Gets the current environment configuration.",
"inputSchema": {
"type": "object",
"properties": {},
},
},
{
"name": "initialize_code_file",
"description": "Creates a new Python file with initial content. Use this as the first step for longer code that may exceed token limits.",
"inputSchema": {
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "Initial file content",
},
"filename": {
"type": "string",
"description": "Optional filename (without extension)",
},
},
"required": ["content"],
},
},
{
"name": "append_to_code_file",
"description": "Appends content to an existing Python code file.",
"inputSchema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the existing code file",
},
"content": {
"type": "string",
"description": "Content to append",
},
},
"required": ["file_path", "content"],
},
},
{
"name": "execute_code_file",
"description": "Executes an existing Python file.",
"inputSchema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the code file to execute",
},
},
"required": ["file_path"],
},
},
{
"name": "read_code_file",
"description": "Reads the content of an existing Python code file.",
"inputSchema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the code file to read",
},
},
"required": ["file_path"],
},
},
]
async def handle_tool_call(name: str, arguments: dict[str, Any]) -> MCPToolResult:
"""Route an MCP tool call to the appropriate handler."""
try:
if name == "execute_code":
return await _handle_execute_code(arguments)
elif name == "install_dependencies":
return await _handle_install_dependencies(arguments)
elif name == "check_installed_packages":
return await _handle_check_packages(arguments)
elif name == "configure_environment":
return _handle_configure_environment(arguments)
elif name == "get_environment_config":
return _handle_get_environment_config()
elif name == "initialize_code_file":
return _handle_initialize_code_file(arguments)
elif name == "append_to_code_file":
return _handle_append_to_code_file(arguments)
elif name == "execute_code_file":
return await _handle_execute_code_file(arguments)
elif name == "read_code_file":
return _handle_read_code_file(arguments)
else:
return MCPToolResult(
content=[{"type": "text", "text": f"Unknown tool: {name}"}],
isError=True,
)
except Exception as e:
logger.exception("Error handling tool call: %s", name)
return MCPToolResult(
content=[{"type": "text", "text": f"Error: {str(e)}"}],
isError=True,
)
async def _handle_execute_code(args: dict) -> MCPToolResult:
executor = get_executor()
result = await executor.execute_code(args["code"], args.get("filename"))
output_parts = []
if result.stdout:
output_parts.append(f"STDOUT:\n{result.stdout}")
if result.stderr:
output_parts.append(f"STDERR:\n{result.stderr}")
output_parts.append(f"Return code: {result.return_code}")
output_parts.append(f"Execution time: {result.execution_time:.2f}s")
output_parts.append(f"File: {result.file_path}")
return MCPToolResult(
content=[{"type": "text", "text": "\n".join(output_parts)}],
isError=not result.success,
)
async def _handle_install_dependencies(args: dict) -> MCPToolResult:
pm = get_package_manager()
result = await pm.install_packages(args["packages"])
text = f"Installation {'succeeded' if result['success'] else 'failed'}\n"
if result.get("stdout"):
text += f"Output:\n{result['stdout']}\n"
if result.get("stderr"):
text += f"Errors:\n{result['stderr']}\n"
return MCPToolResult(
content=[{"type": "text", "text": text}],
isError=not result["success"],
)
async def _handle_check_packages(args: dict) -> MCPToolResult:
pm = get_package_manager()
results = await pm.check_packages(args["packages"])
lines = []
for r in results:
status = "✓ installed" if r.installed else "✗ not installed"
version = f" (v{r.version})" if r.version and r.version != "unknown" else ""
lines.append(f" {r.package}: {status}{version}")
return MCPToolResult(
content=[{"type": "text", "text": "Package status:\n" + "\n".join(lines)}],
isError=False,
)
def _handle_configure_environment(args: dict) -> MCPToolResult:
try:
config = configure_environment(
env_type=args["type"],
conda_name=args.get("conda_name"),
venv_path=args.get("venv_path"),
uv_venv_path=args.get("uv_venv_path"),
)
return MCPToolResult(
content=[
{
"type": "text",
"text": f"Environment configured:\n"
f" Type: {config.env_type}\n"
f" Python: {config.python_executable}\n"
f" Storage: {config.code_storage_dir}",
}
],
isError=False,
)
except ValueError as e:
return MCPToolResult(
content=[{"type": "text", "text": f"Configuration error: {str(e)}"}],
isError=True,
)
def _handle_get_environment_config() -> MCPToolResult:
config = get_environment_config()
return MCPToolResult(
content=[
{
"type": "text",
"text": f"Current environment:\n"
f" Type: {config.env_type}\n"
f" Python: {config.python_executable}\n"
f" Storage: {config.code_storage_dir}\n"
f" Conda env: {config.conda_env_name or 'N/A'}\n"
f" Venv path: {config.venv_path or 'N/A'}\n"
f" UV venv path: {config.uv_venv_path or 'N/A'}",
}
],
isError=False,
)
def _handle_initialize_code_file(args: dict) -> MCPToolResult:
fm = FileManager()
result = fm.create_file(args["content"], args.get("filename"))
if result.success:
return MCPToolResult(
content=[
{
"type": "text",
"text": f"File created: {result.file_path}\n{result.message}",
}
],
isError=False,
)
return MCPToolResult(
content=[{"type": "text", "text": f"Error: {result.message}"}],
isError=True,
)
def _handle_append_to_code_file(args: dict) -> MCPToolResult:
fm = FileManager()
result = fm.append_to_file(args["file_path"], args["content"])
return MCPToolResult(
content=[
{
"type": "text",
"text": result.message + (f"\nFile: {result.file_path}" if result.success else ""),
}
],
isError=not result.success,
)
async def _handle_execute_code_file(args: dict) -> MCPToolResult:
executor = get_executor()
result = await executor.execute_file(args["file_path"])
output_parts = []
if result.stdout:
output_parts.append(f"STDOUT:\n{result.stdout}")
if result.stderr:
output_parts.append(f"STDERR:\n{result.stderr}")
output_parts.append(f"Return code: {result.return_code}")
output_parts.append(f"Execution time: {result.execution_time:.2f}s")
return MCPToolResult(
content=[{"type": "text", "text": "\n".join(output_parts)}],
isError=not result.success,
)
def _handle_read_code_file(args: dict) -> MCPToolResult:
fm = FileManager()
result = fm.read_file(args["file_path"])
if result.success:
return MCPToolResult(
content=[
{
"type": "text",
"text": f"File: {result.file_path}\n\n{result.content}",
}
],
isError=False,
)
return MCPToolResult(
content=[{"type": "text", "text": f"Error: {result.message}"}],
isError=True,
)