Kali / mcp_bridge.py
SAUSERLA's picture
Upload 6 files
d309ec7 verified
#!/usr/bin/env python3
import asyncio
import json
import logging
import sys
import requests
# Logging setup. basicConfig attaches a stderr handler by default, which keeps
# stdout free for the JSON-RPC responses printed by the request loop.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("kali-mcp-bridge")

# Base URL of the remote Kali HTTP API that this bridge forwards to.
KALI_API_URL = "https://sauserla-kali.hf.space"
class KaliMCPBridge:
    """HTTP client for the Kali Linux command API.

    Every public method returns a ``dict``. Failures (network errors, HTTP
    error statuses, malformed or non-object bodies) are reported as
    ``{"error": "<non-empty message>"}`` instead of being raised, so callers
    only need to check for a truthy ``error`` key.
    """

    # Request timeouts in seconds. POST requests run remote commands and may
    # legitimately take minutes; GET requests are expected to answer quickly.
    POST_TIMEOUT = 300
    GET_TIMEOUT = 30

    def __init__(self, api_url: str = KALI_API_URL):
        """Create a bridge that talks to *api_url* over a shared HTTP session."""
        # Strip trailing slashes so endpoints (which start with "/") can be
        # appended without producing "//".
        self.api_url = api_url.rstrip('/')
        self.session = requests.Session()
        logger.info("Initialized Kali MCP Bridge with API: %s", self.api_url)

    def call_api(self, endpoint: str, method: str = "GET", data: dict = None) -> dict:
        """Call *endpoint* on the Kali server and return the decoded JSON object.

        Args:
            endpoint: Path appended to the base URL, e.g. ``"/health"``.
            method: ``"POST"`` (case-insensitive) sends *data* as a JSON body;
                any other value performs a GET and ignores *data*.
            data: JSON-serialisable payload for POST requests.

        Returns:
            The response body as a dict, or ``{"error": message}`` on failure.
        """
        url = f"{self.api_url}{endpoint}"
        try:
            if method.upper() == "POST":
                response = self.session.post(url, json=data, timeout=self.POST_TIMEOUT)
            else:
                response = self.session.get(url, timeout=self.GET_TIMEOUT)
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests releases
            # whose decode error does not derive from RequestException.
            # Fall back to the exception type so the message is never empty
            # (callers treat a falsy "error" value as success).
            message = str(e) or type(e).__name__
            logger.error("API call failed: %s", message)
            return {"error": message}

        if not isinstance(payload, dict):
            # Callers use dict access on the result; a list/scalar body would
            # crash them, so report it as an error instead.
            message = (
                f"Unexpected response from {endpoint}: expected a JSON object, "
                f"got {type(payload).__name__}"
            )
            logger.error("API call failed: %s", message)
            return {"error": message}
        return payload

    def execute_command(self, command: str) -> dict:
        """POST *command* to ``/api/command`` and return the server's reply."""
        return self.call_api("/api/command", "POST", {"command": command})

    def health_check(self) -> dict:
        """GET ``/health`` and return the server's reply."""
        return self.call_api("/health")
# MCP Protocol Implementation (Simplified)
class MCPServer:
    """Minimal MCP server exposing the Kali API bridge as JSON-RPC 2.0 tools.

    Supported methods are ``initialize``, ``tools/list`` and ``tools/call``.
    :meth:`process_request` takes one serialized request and always returns
    one serialized JSON-RPC response; it never raises.
    """

    def __init__(self):
        """Create the API bridge and the static tool catalogue."""
        self.kali_bridge = KaliMCPBridge()
        # Tool descriptors, returned verbatim by tools/list.
        self.tools = {
            "execute_command": {
                "name": "execute_command",
                "description": "Execute arbitrary commands on Kali Linux",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "command": {
                            "type": "string",
                            "description": "The shell command to execute on Kali Linux"
                        }
                    },
                    "required": ["command"]
                }
            },
            "server_health": {
                "name": "server_health",
                "description": "Check the health status of the Kali Linux server",
                "inputSchema": {
                    "type": "object",
                    "properties": {}
                }
            }
        }

    @staticmethod
    def _error_response(request_id, code: int, message: str) -> dict:
        """Build a JSON-RPC 2.0 error envelope for *request_id*."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }

    @staticmethod
    def _format_command_result(result: dict) -> tuple:
        """Render an execute_command API reply as ``(text, is_error)``."""
        if result.get("error"):
            return f"Error: {result['error']}", True

        sections = []
        if result.get("stdout"):
            sections.append(f"STDOUT:\n{result['stdout']}")
        if result.get("stderr"):
            sections.append(f"STDERR:\n{result['stderr']}")
        sections.append(f"Exit Code: {result.get('return_code', 'unknown')}")
        status = "SUCCESS" if result.get("success") else "FAILED"
        sections.append(f"Status: {status}")
        if result.get("timed_out"):
            sections.append("WARNING: Command timed out after 180 seconds")
        return "\n\n".join(sections), False

    @staticmethod
    def _format_health(health: dict) -> tuple:
        """Render a health-check API reply as ``(text, is_error)``."""
        if health.get("error"):
            return f"Health check failed: {health['error']}", True

        lines = [
            f"Status: {health.get('status', 'unknown')}",
            f"Message: {health.get('message', '')}",
            f"All Essential Tools Available: {health.get('all_essential_tools_available', False)}",
        ]
        tools_status = health.get('tools_status', {})
        if tools_status:
            lines.append("\nTool Status:")
            # Distinct loop names: do not shadow the requested tool's name.
            for checked_tool, available in tools_status.items():
                status_icon = "✅" if available else "❌"
                lines.append(f" {status_icon} {checked_tool}")
        return "\n".join(lines), False

    def handle_initialize(self, request: dict) -> dict:
        """Answer ``initialize`` with protocol version, capabilities and server info."""
        return {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "kali-mcp-bridge",
                    "version": "1.0.0"
                }
            }
        }

    def handle_tools_list(self, request: dict) -> dict:
        """Answer ``tools/list`` with every registered tool descriptor."""
        return {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "result": {
                "tools": list(self.tools.values())
            }
        }

    def handle_tools_call(self, request: dict) -> dict:
        """Run the tool named in ``params.name`` and wrap its output.

        Returns a JSON-RPC result whose ``content`` holds one text item; the
        result carries ``isError: true`` when the Kali API reported a failure.
        Malformed params, an unknown tool or a missing ``command`` produce a
        ``-32602`` (invalid params) error; unexpected exceptions produce
        ``-32000``. The request id is echoed in every case.
        """
        request_id = request.get("id")

        # "params" and "arguments" may be absent or explicitly null.
        params = request.get("params") or {}
        if not isinstance(params, dict):
            return self._error_response(
                request_id, -32602, "Invalid params: expected an object")
        tool_name = params.get("name")
        tool_args = params.get("arguments") or {}
        if not isinstance(tool_args, dict):
            return self._error_response(
                request_id, -32602, "Invalid params: 'arguments' must be an object")

        try:
            if tool_name == "execute_command":
                command = tool_args.get("command")
                if not isinstance(command, str) or not command.strip():
                    return self._error_response(
                        request_id, -32602,
                        "Invalid params: Command parameter is required")
                text, is_error = self._format_command_result(
                    self.kali_bridge.execute_command(command))
            elif tool_name == "server_health":
                text, is_error = self._format_health(
                    self.kali_bridge.health_check())
            else:
                # Unknown tools are protocol errors, not successful results.
                return self._error_response(
                    request_id, -32602, f"Unknown tool: {tool_name}")
        except Exception as e:
            logger.error("Tool execution error: %s", e)
            return self._error_response(
                request_id, -32000, f"Tool execution failed: {e}")

        result = {"content": [{"type": "text", "text": text}]}
        if is_error:
            result["isError"] = True
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    def process_request(self, line: str) -> str:
        """Process one serialized JSON-RPC request and return the serialized reply.

        Error codes: ``-32700`` for unparseable JSON, ``-32600`` when the
        payload is not a JSON object, ``-32601`` for unsupported methods and
        ``-32603`` for unexpected failures (with the request id when known).
        """
        request_id = None
        try:
            request = json.loads(line.strip())
            if not isinstance(request, dict):
                # Arrays (batches) and scalars are not supported requests.
                return json.dumps(
                    self._error_response(None, -32600, "Invalid Request"))

            # Remember the id first so later failures can still echo it.
            request_id = request.get("id")
            method = request.get("method")
            handlers = {
                "initialize": self.handle_initialize,
                "tools/list": self.handle_tools_list,
                "tools/call": self.handle_tools_call,
            }
            # Guard the lookup: a non-string method may be unhashable.
            handler = handlers.get(method) if isinstance(method, str) else None
            if handler is None:
                response = self._error_response(
                    request_id, -32601, f"Method not found: {method}")
            else:
                response = handler(request)
            return json.dumps(response)
        except json.JSONDecodeError as e:
            logger.error("Invalid JSON: %s", e)
            return json.dumps(self._error_response(None, -32700, "Parse error"))
        except Exception as e:
            logger.error("Request processing error: %s", e)
            return json.dumps(
                self._error_response(request_id, -32603, f"Internal error: {e}"))
def _is_notification(line: str) -> bool:
    """Return True when *line* is a JSON-RPC notification (an object without ``id``)."""
    try:
        message = json.loads(line)
    except (ValueError, RecursionError):
        # Unparseable input is not a notification; it gets a parse-error reply.
        return False
    return isinstance(message, dict) and "id" not in message


def main():
    """Run the stdio server loop.

    Verifies that the Kali API is reachable (exiting with status 1 if not),
    then reads one JSON-RPC message per line from stdin and writes one
    response per line to stdout.
    """
    server = MCPServer()

    # Test connection on startup
    logger.info("Testing connection to Kali API...")
    health = server.kali_bridge.health_check()
    if health.get("error"):
        logger.error("Failed to connect to Kali API: %s", health['error'])
        sys.exit(1)
    logger.info("✅ Successfully connected to Kali API server")
    logger.info("Kali MCP Bridge started. Ready to accept MCP requests...")

    # Main request/response loop
    for line in sys.stdin:
        if not line.strip():
            continue
        response = server.process_request(line)
        # JSON-RPC 2.0 forbids replying to notifications (messages without an
        # "id", such as "notifications/initialized"). process_request builds a
        # reply for every input, so the reply is dropped here.
        if _is_notification(line):
            continue
        # Flush per message: the client blocks waiting for each response.
        print(response, flush=True)


if __name__ == "__main__":
    main()