# Kali / mcp_client.py
# SAUSERLA's picture
# Upload 4 files
# 33b7766 verified
#!/usr/bin/env python3
import argparse
import json
import logging
import sys
import time
from typing import Dict, Any, List
# Logging setup: INFO and above are written to stdout as
# "<timestamp> [<LEVEL>] <message>".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler(stream=sys.stdout)],
)

# Module-scoped logger used by the client class and the CLI entry point.
logger = logging.getLogger(__name__)
class KaliMCPClient:
    """HTTP client for a Kali Linux MCP server.

    Every public method issues one HTTP request through a shared
    ``requests.Session`` and returns the decoded JSON body. If the request
    fails for any reason, the methods return
    ``{"error": <message>, "success": False}`` instead of raising.
    """

    def __init__(self, server_url: str, timeout: int = 300):
        """Create a client.

        Args:
            server_url: Base URL of the server; trailing slashes are removed.
            timeout: Per-request timeout in seconds.
        """
        self.server_url = server_url.rstrip('/')
        self.timeout = timeout
        self.session = None
        self._init_session()

    def _init_session(self):
        """Create the HTTP session, exiting the process if requests is missing."""
        try:
            # Imported lazily so a missing dependency produces a readable
            # message rather than a traceback at module import time.
            import requests
        except ImportError:
            logger.error("requests library not found. Install with: pip install requests")
            sys.exit(1)
        # NOTE: requests does not read a timeout from the Session object, so
        # the timeout is supplied on every call in _make_request instead.
        self.session = requests.Session()

    def _make_request(self, method: str, endpoint: str, data: Dict = None) -> Dict[str, Any]:
        """Send a GET or POST request to the server.

        Args:
            method: ``"GET"`` or ``"POST"`` (case-insensitive).
            endpoint: Path appended to the server base URL.
            data: JSON-serialisable body, used for POST requests only.

        Returns:
            The decoded JSON response, or ``{"error": ..., "success": False}``
            when the method is unsupported, the request fails, the server
            answers with an HTTP error status, or the body is not valid JSON.
        """
        url = f"{self.server_url}{endpoint}"
        verb = method.upper()
        try:
            if verb == "GET":
                response = self.session.get(url, timeout=self.timeout)
            elif verb == "POST":
                response = self.session.post(url, json=data, timeout=self.timeout)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Deliberately broad: this is the client's single error boundary,
            # and callers rely on receiving an error dict rather than an
            # exception.
            logger.error("Request failed: %s", e)
            return {"error": str(e), "success": False}

    def health_check(self) -> Dict[str, Any]:
        """Return the response of ``GET /health``."""
        return self._make_request("GET", "/health")

    def execute_command(self, command: str) -> Dict[str, Any]:
        """POST ``{"command": command}`` to ``/api/command`` and return the reply."""
        return self._make_request("POST", "/api/command", {"command": command})

    def get_capabilities(self) -> Dict[str, Any]:
        """Return the response of ``GET /mcp/capabilities``."""
        return self._make_request("GET", "/mcp/capabilities")

    def list_tools(self) -> List[str]:
        """Return the names listed under ``"tools"`` in the capabilities reply.

        Returns an empty list when the request failed or the reply has no
        usable ``"tools"`` list; entries without a ``"name"`` are skipped.
        """
        caps = self.get_capabilities()
        tools = caps.get("tools") if isinstance(caps, dict) else None
        if not isinstance(tools, list):
            return []
        return [
            tool["name"]
            for tool in tools
            if isinstance(tool, dict) and "name" in tool
        ]

    def run_tool(self, tool_name: str, **kwargs) -> Dict[str, Any]:
        """POST the keyword arguments as JSON to ``/mcp/tools/<tool_name>``."""
        return self._make_request("POST", f"/mcp/tools/{tool_name}", kwargs)
# Help text shown by the interactive ``help`` command. Built from a tuple of
# lines so the literal can be indented with the code without changing output.
_INTERACTIVE_HELP = "\n".join((
    "",
    "Available commands:",
    "help - Show this help",
    "health - Check server health",
    "tools - List available tools",
    "exec <cmd> - Execute shell command",
    "quit - Exit interactive mode",
    "Examples:",
    "exec nmap -sV 127.0.0.1",
    'exec sqlmap -u "http://example.com?id=1"',
    'exec hydra -l admin -P /usr/share/wordlists/rockyou.txt example.com '
    'http-post-form "/login:username=^USER^&password=^PASS^:Invalid"',
    "",
))


def _print_exec_result(result: Dict[str, Any]) -> None:
    """Print the outcome of one ``exec`` request in interactive mode."""
    if result.get("success"):
        if result.get("stdout"):
            print("STDOUT:")
            print(result["stdout"])
        if result.get("stderr"):
            print("STDERR:")
            print(result["stderr"])
        return
    print("Command failed!")
    if result.get("stderr"):
        print("Error:", result["stderr"])
    # Transport-level failures carry their message under "error", not
    # "stderr"; show it so the failure is not silent.
    if result.get("error"):
        print("Error:", result["error"])
    if result.get("timed_out"):
        print("Note: Command timed out after 180 seconds")


def interactive_mode(client: KaliMCPClient):
    """Run a read-eval-print loop against the server.

    Recognised input: ``help``, ``health``, ``tools``, ``exec <cmd>`` and
    ``quit``/``exit``/``q``. The loop ends on a quit command, on Ctrl-C, or
    when standard input reaches end-of-file.

    Args:
        client: Connected client used to issue the requests.
    """
    print("🔥 Kali Linux MCP Client - Interactive Mode")
    print("Type 'help' for commands, 'quit' to exit")
    print("-" * 50)
    while True:
        try:
            cmd = input("kali-mcp> ").strip()
            if not cmd:
                continue
            lowered = cmd.lower()
            if lowered in ('quit', 'exit', 'q'):
                print("Goodbye! 👋")
                break
            if lowered == 'help':
                print(_INTERACTIVE_HELP)
                continue
            if lowered == 'health':
                print(json.dumps(client.health_check(), indent=2))
                continue
            if lowered == 'tools':
                print("Available MCP tools:")
                for tool in client.list_tools():
                    print(f" - {tool}")
                continue
            # Split off the verb so a bare "exec" (the prompt input is already
            # stripped) reaches the usage message instead of "Unknown command".
            verb, _, remainder = cmd.partition(' ')
            if verb.lower() == 'exec':
                command = remainder.strip()
                if not command:
                    print("Error: No command provided. Use: exec <command>")
                    continue
                print(f"Executing: {command}")
                print("-" * 50)
                _print_exec_result(client.execute_command(command))
                print("-" * 50)
                continue
            print(f"Unknown command: {cmd}")
            print("Type 'help' for available commands")
        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: once stdin is closed every further
            # input() call raises again, which would otherwise spin forever.
            print("\nGoodbye! 👋")
            break
        except Exception as e:
            # Keep the session alive on unexpected errors from one command.
            print(f"Error: {str(e)}")
def main():
    """Command-line entry point.

    Parses the arguments, verifies the server answers the health check
    (exiting with status 1 if it does not), then either runs a single
    command (``--command``), starts the interactive loop (``--interactive``),
    or prints the server status, tool list and usage hints.
    """
    parser = argparse.ArgumentParser(description="Kali Linux MCP Client")
    parser.add_argument("--server", default="http://localhost:5000",
                        help="Kali MCP server URL")
    parser.add_argument("--timeout", type=int, default=300,
                        help="Request timeout in seconds")
    parser.add_argument("--command", help="Execute single command and exit")
    parser.add_argument("--interactive", action="store_true",
                        help="Start interactive mode")
    args = parser.parse_args()

    # Create client
    client = KaliMCPClient(args.server, args.timeout)

    # Test connection
    logger.info("Connecting to Kali MCP server at %s", args.server)
    health = client.health_check()
    if "error" in health:
        logger.error("Failed to connect to server: %s", health['error'])
        sys.exit(1)
    logger.info("✅ Successfully connected to Kali MCP server")
    logger.info("Server status: %s", health.get('status', 'unknown'))

    if args.command:
        # Single command mode
        logger.info("Executing command: %s", args.command)
        result = client.execute_command(args.command)
        if result.get("success"):
            if result.get("stdout"):
                print(result["stdout"])
            if result.get("stderr"):
                print("STDERR:", result["stderr"], file=sys.stderr)
        else:
            print("Command failed!", file=sys.stderr)
            if result.get("stderr"):
                print(result["stderr"], file=sys.stderr)
            # Transport-level failures report their cause under "error".
            if result.get("error"):
                print(result["error"], file=sys.stderr)
            sys.exit(1)
    elif args.interactive:
        # Interactive mode
        interactive_mode(client)
    else:
        # Default: show server info
        print("🔥 Kali Linux MCP Client")
        print(f"Server: {args.server}")
        print(f"Status: {health.get('status', 'unknown')}")
        tools = client.list_tools()
        print(f"Available tools: {len(tools)}")
        for tool in tools:
            print(f" - {tool}")
        print("\nUsage:")
        print(" python mcp_client.py --interactive # Start interactive mode")
        print(" python mcp_client.py --command 'nmap -sV 127.0.0.1' # Run single command")


if __name__ == "__main__":
    main()