#!/usr/bin/env python3 """ Test script for both MCP interfaces in the Network Forensics Environment. This script tests both: 1. Simplified MCP interface at /mcp (OpenEnv custom protocol) 2. Standard MCP interface at /mcp-standard (full MCP protocol) Usage: python test_mcp_interfaces.py Requirements: - Network forensics server running on http://localhost:8000 - Both MCP interfaces mounted and accessible """ import json import requests import websocket import time from typing import Dict, Any # Server configuration BASE_URL = "http://localhost:8000" SIMPLIFIED_MCP_URL = f"{BASE_URL}/mcp" STANDARD_MCP_URL = f"{BASE_URL}/mcp-standard" STANDARD_MCP_WS_URL = "ws://localhost:8000/mcp-standard/ws" def test_simplified_mcp(): """Test the simplified MCP interface (OpenEnv custom protocol).""" print("=== Testing Simplified MCP Interface ===") try: # Test health check health_resp = requests.get(f"{BASE_URL}/health") print(f"✓ Health check: {health_resp.status_code} - {health_resp.json()}") # Test MCP info endpoint info_resp = requests.get(f"{BASE_URL}/mcp-info") if info_resp.status_code == 200: print(f"✓ MCP info available: {len(info_resp.json().get('mcp_interfaces', {}))} interfaces") # Test simplified MCP (this would normally use WebSocket, but we'll test HTTP availability) print("✓ Simplified MCP interface available at /mcp") return True except Exception as e: print(f"✗ Simplified MCP test failed: {e}") return False def test_standard_mcp_http(): """Test the standard MCP interface via HTTP.""" print("\n=== Testing Standard MCP Interface (HTTP) ===") try: # Test standard MCP health health_resp = requests.get(f"{STANDARD_MCP_URL}/health") print(f"✓ Standard MCP health: {health_resp.status_code} - {health_resp.json()}") # Test initialize init_payload = { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "test-client", "version": "1.0.0"} } init_resp = requests.post(f"{STANDARD_MCP_URL}/initialize", json=init_payload) if init_resp.status_code == 200: print(f"✓ Initialize successful: {init_resp.json().get('serverInfo', {}).get('name')}") else: print(f"✗ Initialize failed: {init_resp.status_code} - {init_resp.text}") return False # Test tools/list tools_resp = requests.post(f"{STANDARD_MCP_URL}/tools/list", json={}) if tools_resp.status_code == 200: tools = tools_resp.json().get('tools', []) print(f"✓ Tools list: {len(tools)} tools available") for tool in tools[:3]: # Show first 3 tools print(f" - {tool.get('name')}: {tool.get('description', '')[:50]}...") else: print(f"✗ Tools list failed: {tools_resp.status_code}") return False return True except Exception as e: print(f"✗ Standard MCP HTTP test failed: {e}") return False def test_standard_mcp_websocket(): """Test the standard MCP interface via WebSocket.""" print("\n=== Testing Standard MCP Interface (WebSocket) ===") try: ws = websocket.create_connection(STANDARD_MCP_WS_URL) print("✓ WebSocket connection established") # Test initialize via WebSocket init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "test-client", "version": "1.0.0"} } } ws.send(json.dumps(init_request)) init_response = json.loads(ws.recv()) if "result" in init_response: print(f"✓ WebSocket initialize successful: {init_response['result'].get('serverInfo', {}).get('name')}") else: print(f"✗ WebSocket initialize failed: {init_response.get('error', 'Unknown error')}") ws.close() return False # Test tools/list via WebSocket tools_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {} } ws.send(json.dumps(tools_request)) tools_response = json.loads(ws.recv()) if "result" in tools_response: tools = tools_response["result"].get("tools", []) print(f"✓ WebSocket tools list: {len(tools)} tools available") else: print(f"✗ WebSocket tools list failed: {tools_response.get('error', 'Unknown error')}") ws.close() return True except Exception as e: print(f"✗ Standard MCP WebSocket test failed: {e}") return False def test_forensics_workflow(): """Test a complete forensics workflow using standard MCP.""" print("\n=== Testing Complete Forensics Workflow ===") try: # Initialize environment init_resp = requests.post(f"{STANDARD_MCP_URL}/initialize", json={ "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "workflow-test", "version": "1.0.0"} }) if init_resp.status_code != 200: print(f"✗ Workflow initialization failed") return False # Get available tools tools_resp = requests.post(f"{STANDARD_MCP_URL}/tools/list", json={}) tools = tools_resp.json().get('tools', []) # Test a simple workflow print(f"✓ Starting forensics workflow with {len(tools)} tools") # Reset environment reset_resp = requests.post(f"{STANDARD_MCP_URL}/tools/call", json={ "name": "reset_env", "arguments": {"task_id": "easy"} }) if reset_resp.status_code == 200: print("✓ Environment reset for easy task") else: print(f"✗ Environment reset failed: {reset_resp.status_code}") return False # Get status status_resp = requests.post(f"{STANDARD_MCP_URL}/tools/call", json={ "name": "get_status", "arguments": {} }) if status_resp.status_code == 200: print("✓ Status retrieved successfully") else: print(f"✗ Status retrieval failed: {status_resp.status_code}") return True except Exception as e: print(f"✗ Workflow test failed: {e}") return False def main(): """Run all MCP interface tests.""" print("Network Forensics MCP Interface Test Suite") print("=" * 50) # Check if server is running try: health_resp = requests.get(f"{BASE_URL}/health", timeout=5) if health_resp.status_code != 200: print(f"❌ Server not responding properly: {health_resp.status_code}") print("Please ensure the server is running: python -m server.app") return except requests.exceptions.RequestException as e: print(f"❌ Cannot connect to server at {BASE_URL}") print("Please start the server: python -m server.app") return print(f"✓ Server detected at {BASE_URL}") print() # Run tests results = [] # Test simplified MCP results.append(("Simplified MCP", test_simplified_mcp())) # Test standard MCP HTTP results.append(("Standard MCP (HTTP)", test_standard_mcp_http())) # Test standard MCP WebSocket results.append(("Standard MCP (WebSocket)", test_standard_mcp_websocket())) # Test complete workflow results.append(("Forensics Workflow", test_forensics_workflow())) # Summary print("\n" + "=" * 50) print("Test Summary:") print("=" * 50) passed = sum(1 for _, result in results if result) total = len(results) for test_name, result in results: status = "✅ PASS" if result else "❌ FAIL" print(f"{status} {test_name}") print(f"\nOverall: {passed}/{total} tests passed") if passed == total: print("🎉 All tests passed! Both MCP interfaces are working correctly.") else: print("⚠️ Some tests failed. Check the server logs for details.") if __name__ == "__main__": main()