File size: 5,136 Bytes
37f9172
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3

import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List, Sequence

# MCP SDK imports
from mcp import Tool
from mcp.server import Server
from mcp.types import (
    TextContent,
    ImageContent,
    EmbeddedResource,
    LoggingLevel
)

# HTTP client for API calls
import requests

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kali-mcp-server")

# Configuration
KALI_API_URL = os.environ.get("KALI_API_URL", "https://sauserla-kali.hf.space")

class KaliMCPServer:
    """MCP Server for Kali Linux tools"""

    def __init__(self, api_url: str = KALI_API_URL):
        self.api_url = api_url.rstrip('/')
        self.session = requests.Session()
        logger.info(f"Initialized Kali MCP Server with API: {self.api_url}")

    def _call_api(self, endpoint: str, method: str = "GET", data: Dict = None) -> Dict[str, Any]:
        """Make API call to Kali server"""
        url = f"{self.api_url}{endpoint}"

        try:
            if method.upper() == "POST":
                response = self.session.post(url, json=data, timeout=300)
            else:
                response = self.session.get(url, timeout=30)

            response.raise_for_status()
            return response.json()

        except requests.exceptions.RequestException as e:
            logger.error(f"API call failed: {str(e)}")
            return {"error": str(e)}

    def get_capabilities(self) -> Dict[str, Any]:
        """Get MCP tool capabilities"""
        return self._call_api("/mcp/capabilities")

    def execute_command(self, command: str) -> Dict[str, Any]:
        """Execute a command via the API"""
        data = {"command": command}
        return self._call_api("/api/command", "POST", data)

    def health_check(self) -> Dict[str, Any]:
        """Check server health"""
        return self._call_api("/health")

# Global server instance
kali_server = KaliMCPServer()

# MCP Server setup
server = Server("kali-mcp-server")

@server.tool()
async def execute_command(command: str) -> str:
    """Execute arbitrary commands on Kali Linux

    Args:
        command: The shell command to execute on the Kali Linux system

    Returns:
        Command execution results including stdout, stderr, and exit code
    """
    logger.info(f"Executing command: {command}")

    try:
        result = kali_server.execute_command(command)

        if result.get("error"):
            return f"Error: {result['error']}"

        output = []
        if result.get("stdout"):
            output.append(f"STDOUT:\n{result['stdout']}")
        if result.get("stderr"):
            output.append(f"STDERR:\n{result['stderr']}")

        status = "SUCCESS" if result.get("success") else "FAILED"
        output.append(f"Exit Code: {result.get('return_code', 'unknown')}")
        output.append(f"Status: {status}")

        if result.get("timed_out"):
            output.append("WARNING: Command timed out after 180 seconds")

        return "\n\n".join(output)

    except Exception as e:
        logger.error(f"Tool execution error: {str(e)}")
        return f"Execution failed: {str(e)}"

@server.tool()
async def server_health() -> str:
    """Check the health status of the Kali Linux server and available tools

    Returns:
        Server health information and tool availability status
    """
    try:
        health = kali_server.health_check()

        if health.get("error"):
            return f"Health check failed: {health['error']}"

        output = []
        output.append(f"Status: {health.get('status', 'unknown')}")
        output.append(f"Message: {health.get('message', '')}")
        output.append(f"All Essential Tools Available: {health.get('all_essential_tools_available', False)}")

        tools_status = health.get('tools_status', {})
        if tools_status:
            output.append("\nTool Status:")
            for tool, available in tools_status.items():
                status_icon = "✅" if available else "❌"
                output.append(f"  {status_icon} {tool}")

        return "\n".join(output)

    except Exception as e:
        logger.error(f"Health check error: {str(e)}")
        return f"Health check failed: {str(e)}"

async def main():
    """Main server entry point"""
    # Import here to avoid issues if MCP SDK is not available
    from mcp.server.stdio import stdio_server

    logger.info("Starting Kali Linux MCP Server")

    # Test connection on startup
    try:
        health = kali_server.health_check()
        if health.get("error"):
            logger.error(f"Failed to connect to Kali API: {health['error']}")
            sys.exit(1)
        else:
            logger.info("✅ Successfully connected to Kali API server")
    except Exception as e:
        logger.error(f"Connection test failed: {str(e)}")
        sys.exit(1)

    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())