| |
|
|
| import argparse |
| import json |
| import logging |
| import sys |
| import time |
| from typing import Dict, Any, List |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s [%(levelname)s] %(message)s", |
| handlers=[logging.StreamHandler(sys.stdout)] |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| class KaliMCPClient: |
| """MCP Client for Kali Linux tools""" |
|
|
| def __init__(self, server_url: str, timeout: int = 300): |
| self.server_url = server_url.rstrip('/') |
| self.timeout = timeout |
| self.session = None |
| self._init_session() |
|
|
| def _init_session(self): |
| """Initialize HTTP session""" |
| try: |
| import requests |
| self.session = requests.Session() |
| self.session.timeout = self.timeout |
| except ImportError: |
| logger.error("requests library not found. Install with: pip install requests") |
| sys.exit(1) |
|
|
| def _make_request(self, method: str, endpoint: str, data: Dict = None) -> Dict[str, Any]: |
| """Make HTTP request to the MCP server""" |
| url = f"{self.server_url}{endpoint}" |
|
|
| try: |
| if method.upper() == "GET": |
| response = self.session.get(url) |
| elif method.upper() == "POST": |
| response = self.session.post(url, json=data) |
| else: |
| raise ValueError(f"Unsupported HTTP method: {method}") |
|
|
| response.raise_for_status() |
| return response.json() |
|
|
| except Exception as e: |
| logger.error(f"Request failed: {str(e)}") |
| return {"error": str(e), "success": False} |
|
|
| def health_check(self) -> Dict[str, Any]: |
| """Check server health""" |
| return self._make_request("GET", "/health") |
|
|
| def execute_command(self, command: str) -> Dict[str, Any]: |
| """Execute a command on the Kali server""" |
| data = {"command": command} |
| return self._make_request("POST", "/api/command", data) |
|
|
| def get_capabilities(self) -> Dict[str, Any]: |
| """Get MCP tool capabilities""" |
| return self._make_request("GET", "/mcp/capabilities") |
|
|
| def list_tools(self) -> List[str]: |
| """List available MCP tools""" |
| caps = self.get_capabilities() |
| if "tools" in caps: |
| return [tool["name"] for tool in caps["tools"]] |
| return [] |
|
|
| def run_tool(self, tool_name: str, **kwargs) -> Dict[str, Any]: |
| """Run an MCP tool with parameters""" |
| data = kwargs |
| return self._make_request("POST", f"/mcp/tools/{tool_name}", data) |
|
|
| def interactive_mode(client: KaliMCPClient): |
| """Run interactive command mode""" |
| print("π₯ Kali Linux MCP Client - Interactive Mode") |
| print("Type 'help' for commands, 'quit' to exit") |
| print("-" * 50) |
|
|
| while True: |
| try: |
| cmd = input("kali-mcp> ").strip() |
|
|
| if not cmd: |
| continue |
|
|
| if cmd.lower() in ['quit', 'exit', 'q']: |
| print("Goodbye! π") |
| break |
|
|
| if cmd.lower() == 'help': |
| print(""" |
| Available commands: |
| help - Show this help |
| health - Check server health |
| tools - List available tools |
| exec <cmd> - Execute shell command |
| quit - Exit interactive mode |
| |
| Examples: |
| exec nmap -sV 127.0.0.1 |
| exec sqlmap -u "http://example.com?id=1" |
| exec hydra -l admin -P /usr/share/wordlists/rockyou.txt example.com http-post-form "/login:username=^USER^&password=^PASS^:Invalid" |
| """) |
| continue |
|
|
| if cmd.lower() == 'health': |
| result = client.health_check() |
| print(json.dumps(result, indent=2)) |
| continue |
|
|
| if cmd.lower() == 'tools': |
| tools = client.list_tools() |
| print("Available MCP tools:") |
| for tool in tools: |
| print(f" - {tool}") |
| continue |
|
|
| if cmd.startswith('exec '): |
| command = cmd[5:].strip() |
| if not command: |
| print("Error: No command provided. Use: exec <command>") |
| continue |
|
|
| print(f"Executing: {command}") |
| print("-" * 50) |
|
|
| result = client.execute_command(command) |
|
|
| if result.get("success"): |
| if result.get("stdout"): |
| print("STDOUT:") |
| print(result["stdout"]) |
| if result.get("stderr"): |
| print("STDERR:") |
| print(result["stderr"]) |
| else: |
| print("Command failed!") |
| if result.get("stderr"): |
| print("Error:", result["stderr"]) |
| if result.get("timed_out"): |
| print("Note: Command timed out after 180 seconds") |
|
|
| print("-" * 50) |
| continue |
|
|
| print(f"Unknown command: {cmd}") |
| print("Type 'help' for available commands") |
|
|
| except KeyboardInterrupt: |
| print("\nGoodbye! π") |
| break |
| except Exception as e: |
| print(f"Error: {str(e)}") |
|
|
| def main(): |
| parser = argparse.ArgumentParser(description="Kali Linux MCP Client") |
| parser.add_argument("--server", default="http://localhost:5000", |
| help="Kali MCP server URL") |
| parser.add_argument("--timeout", type=int, default=300, |
| help="Request timeout in seconds") |
| parser.add_argument("--command", help="Execute single command and exit") |
| parser.add_argument("--interactive", action="store_true", |
| help="Start interactive mode") |
|
|
| args = parser.parse_args() |
|
|
| |
| client = KaliMCPClient(args.server, args.timeout) |
|
|
| |
| logger.info(f"Connecting to Kali MCP server at {args.server}") |
| health = client.health_check() |
|
|
| if "error" in health: |
| logger.error(f"Failed to connect to server: {health['error']}") |
| sys.exit(1) |
|
|
| logger.info("β
Successfully connected to Kali MCP server") |
| logger.info(f"Server status: {health.get('status', 'unknown')}") |
|
|
| |
| if args.command: |
| |
| logger.info(f"Executing command: {args.command}") |
| result = client.execute_command(args.command) |
|
|
| if result.get("success"): |
| if result.get("stdout"): |
| print(result["stdout"]) |
| if result.get("stderr"): |
| print("STDERR:", result["stderr"], file=sys.stderr) |
| else: |
| print("Command failed!", file=sys.stderr) |
| if result.get("stderr"): |
| print(result["stderr"], file=sys.stderr) |
| sys.exit(1) |
|
|
| elif args.interactive: |
| |
| interactive_mode(client) |
|
|
| else: |
| |
| print("π₯ Kali Linux MCP Client") |
| print(f"Server: {args.server}") |
| print(f"Status: {health.get('status', 'unknown')}") |
|
|
| tools = client.list_tools() |
| print(f"Available tools: {len(tools)}") |
| for tool in tools: |
| print(f" - {tool}") |
|
|
| print("\nUsage:") |
| print(" python mcp_client.py --interactive # Start interactive mode") |
| print(" python mcp_client.py --command 'nmap -sV 127.0.0.1' # Run single command") |
|
|
| if __name__ == "__main__": |
| main() |