File size: 8,653 Bytes
d9ac8a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#!/usr/bin/env python3
"""
Test script for both MCP interfaces in the Network Forensics Environment.

This script tests both:
1. Simplified MCP interface at /mcp (OpenEnv custom protocol)
2. Standard MCP interface at /mcp-standard (full MCP protocol)

Usage:
    python test_mcp_interfaces.py
    
Requirements:
    - Network forensics server running on http://localhost:8000
    - Both MCP interfaces mounted and accessible
"""

import json
import requests
import websocket
import time
from typing import Dict, Any

# Endpoints of the server under test. Everything is derived from BASE_URL so
# the host and port are written down exactly once.
BASE_URL = "http://localhost:8000"
SIMPLIFIED_MCP_URL = BASE_URL + "/mcp"
STANDARD_MCP_URL = BASE_URL + "/mcp-standard"
# Same authority and path as the HTTP endpoint, with the "http" scheme
# swapped for "ws" and the "/ws" suffix appended.
STANDARD_MCP_WS_URL = "ws" + STANDARD_MCP_URL[len("http"):] + "/ws"

def test_simplified_mcp() -> bool:
    """Smoke-test the endpoints that accompany the simplified MCP interface.

    Issues a GET to ``/health`` and to ``/mcp-info`` under ``BASE_URL``. The
    ``/mcp`` endpoint itself is not contacted by this function.

    Returns:
        True if the health endpoint answered with HTTP 200 and no request or
        decoding error occurred; False otherwise.
    """
    print("=== Testing Simplified MCP Interface ===")
    # Seconds; without a timeout a stalled server would hang the whole suite.
    request_timeout = 10

    try:
        # Test health check
        health_resp = requests.get(f"{BASE_URL}/health", timeout=request_timeout)
        if health_resp.status_code != 200:
            print(f"✗ Health check failed: {health_resp.status_code} - {health_resp.text}")
            return False
        print(f"✓ Health check: {health_resp.status_code} - {health_resp.json()}")

        # Test MCP info endpoint. This is informational only: a non-200
        # answer is reported but does not fail the test.
        info_resp = requests.get(f"{BASE_URL}/mcp-info", timeout=request_timeout)
        if info_resp.status_code == 200:
            print(f"✓ MCP info available: {len(info_resp.json().get('mcp_interfaces', {}))} interfaces")
        else:
            print(f"  MCP info not available: {info_resp.status_code}")

        # NOTE: the simplified interface is not probed directly here; this
        # line only reports that the surrounding HTTP endpoints responded.
        print("✓ Simplified MCP interface available at /mcp")
        return True

    except Exception as e:
        # Test boundary: any failure (connection, timeout, bad JSON) is
        # reported and turned into a failed result instead of a crash.
        print(f"✗ Simplified MCP test failed: {e}")
        return False

def test_standard_mcp_http() -> bool:
    """Exercise the standard MCP interface over plain HTTP.

    Sends, in order: GET ``/health``, POST ``/initialize`` and
    POST ``/tools/list`` under ``STANDARD_MCP_URL``, printing a line per step.

    Returns:
        True if every step answered with HTTP 200; False if any step returned
        another status or raised.
    """
    print("\n=== Testing Standard MCP Interface (HTTP) ===")
    # Seconds; without a timeout a stalled server would hang the whole suite.
    request_timeout = 10

    try:
        # Test standard MCP health
        health_resp = requests.get(f"{STANDARD_MCP_URL}/health", timeout=request_timeout)
        if health_resp.status_code != 200:
            print(f"✗ Standard MCP health failed: {health_resp.status_code} - {health_resp.text}")
            return False
        print(f"✓ Standard MCP health: {health_resp.status_code} - {health_resp.json()}")

        # Test initialize
        init_payload = {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test-client", "version": "1.0.0"},
        }
        init_resp = requests.post(
            f"{STANDARD_MCP_URL}/initialize", json=init_payload, timeout=request_timeout
        )
        if init_resp.status_code != 200:
            print(f"✗ Initialize failed: {init_resp.status_code} - {init_resp.text}")
            return False
        print(f"✓ Initialize successful: {init_resp.json().get('serverInfo', {}).get('name')}")

        # Test tools/list
        tools_resp = requests.post(
            f"{STANDARD_MCP_URL}/tools/list", json={}, timeout=request_timeout
        )
        if tools_resp.status_code != 200:
            print(f"✗ Tools list failed: {tools_resp.status_code}")
            return False

        tools = tools_resp.json().get('tools', [])
        print(f"✓ Tools list: {len(tools)} tools available")
        for tool in tools[:3]:  # Show first 3 tools
            # `or ''` guards against an explicit null description, which
            # would otherwise fail on slicing.
            description = tool.get('description') or ''
            print(f"  - {tool.get('name')}: {description[:50]}...")

        return True

    except Exception as e:
        # Test boundary: report any failure and mark the test as failed.
        print(f"✗ Standard MCP HTTP test failed: {e}")
        return False

def test_standard_mcp_websocket() -> bool:
    """Exercise the standard MCP interface over WebSocket using JSON-RPC 2.0.

    Connects to ``STANDARD_MCP_WS_URL``, sends an ``initialize`` request
    followed by a ``tools/list`` request, and expects a ``result`` member in
    each reply. The connection is closed on every exit path.

    Returns:
        True if both requests produced a ``result``; False if either reply
        lacked one or any step raised.
    """
    print("\n=== Testing Standard MCP Interface (WebSocket) ===")

    try:
        # The timeout (seconds) applies to connecting and to later recv()
        # calls, so an unresponsive server cannot block the suite forever.
        ws = websocket.create_connection(STANDARD_MCP_WS_URL, timeout=10)
        try:
            print("✓ WebSocket connection established")

            # Test initialize via WebSocket
            init_request = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "test-client", "version": "1.0.0"},
                },
            }
            ws.send(json.dumps(init_request))
            init_response = json.loads(ws.recv())

            if "result" not in init_response:
                print(f"✗ WebSocket initialize failed: {init_response.get('error', 'Unknown error')}")
                return False
            print(f"✓ WebSocket initialize successful: {init_response['result'].get('serverInfo', {}).get('name')}")

            # Test tools/list via WebSocket
            tools_request = {
                "jsonrpc": "2.0",
                "id": 2,
                "method": "tools/list",
                "params": {},
            }
            ws.send(json.dumps(tools_request))
            tools_response = json.loads(ws.recv())

            if "result" not in tools_response:
                # A failed tools/list must fail the test, not just log.
                print(f"✗ WebSocket tools list failed: {tools_response.get('error', 'Unknown error')}")
                return False

            tools = tools_response["result"].get("tools", [])
            print(f"✓ WebSocket tools list: {len(tools)} tools available")
            return True
        finally:
            # Runs on success, on early return and on exceptions alike, so
            # the socket is never leaked.
            ws.close()

    except Exception as e:
        # Test boundary: report any failure and mark the test as failed.
        print(f"✗ Standard MCP WebSocket test failed: {e}")
        return False

def test_forensics_workflow() -> bool:
    """Run a short end-to-end workflow against the standard MCP HTTP endpoints.

    Steps, in order: POST ``/initialize``, POST ``/tools/list``, then two
    POSTs to ``/tools/call`` invoking ``reset_env`` (with ``task_id`` set to
    ``"easy"``) and ``get_status``.

    Returns:
        True if every step answered with HTTP 200; False if any step returned
        another status or raised.
    """
    print("\n=== Testing Complete Forensics Workflow ===")
    # Seconds; without a timeout a stalled server would hang the whole suite.
    request_timeout = 10

    try:
        # Initialize environment
        init_resp = requests.post(
            f"{STANDARD_MCP_URL}/initialize",
            json={
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "workflow-test", "version": "1.0.0"},
            },
            timeout=request_timeout,
        )
        if init_resp.status_code != 200:
            print("✗ Workflow initialization failed")
            return False

        # Get available tools; check the status before decoding so an error
        # page is reported as a failed step rather than a JSON error.
        tools_resp = requests.post(
            f"{STANDARD_MCP_URL}/tools/list", json={}, timeout=request_timeout
        )
        if tools_resp.status_code != 200:
            print(f"✗ Tools list failed: {tools_resp.status_code}")
            return False
        tools = tools_resp.json().get('tools', [])

        # Test a simple workflow
        print(f"✓ Starting forensics workflow with {len(tools)} tools")

        # Reset environment
        reset_resp = requests.post(
            f"{STANDARD_MCP_URL}/tools/call",
            json={"name": "reset_env", "arguments": {"task_id": "easy"}},
            timeout=request_timeout,
        )
        if reset_resp.status_code != 200:
            print(f"✗ Environment reset failed: {reset_resp.status_code}")
            return False
        print("✓ Environment reset for easy task")

        # Get status
        status_resp = requests.post(
            f"{STANDARD_MCP_URL}/tools/call",
            json={"name": "get_status", "arguments": {}},
            timeout=request_timeout,
        )
        if status_resp.status_code != 200:
            # A failed status call must fail the workflow, not just log.
            print(f"✗ Status retrieval failed: {status_resp.status_code}")
            return False
        print("✓ Status retrieved successfully")

        return True

    except Exception as e:
        # Test boundary: report any failure and mark the test as failed.
        print(f"✗ Workflow test failed: {e}")
        return False

def main() -> int:
    """Run all MCP interface tests and print a summary.

    First checks that ``BASE_URL/health`` answers with HTTP 200; if it does
    not, prints a hint on how to start the server and stops. Otherwise runs
    the four test functions in a fixed order and prints one PASS/FAIL line
    per test plus an overall count.

    Returns:
        0 if the server was reachable and every test passed, 1 otherwise.
    """
    print("Network Forensics MCP Interface Test Suite")
    print("=" * 50)

    # Check if server is running
    try:
        health_resp = requests.get(f"{BASE_URL}/health", timeout=5)
    except requests.exceptions.RequestException:
        print(f"❌ Cannot connect to server at {BASE_URL}")
        print("Please start the server: python -m server.app")
        return 1

    if health_resp.status_code != 200:
        print(f"❌ Server not responding properly: {health_resp.status_code}")
        print("Please ensure the server is running: python -m server.app")
        return 1

    print(f"✓ Server detected at {BASE_URL}")
    print()

    # Run tests in order; each test function returns True on success.
    suite = (
        ("Simplified MCP", test_simplified_mcp),
        ("Standard MCP (HTTP)", test_standard_mcp_http),
        ("Standard MCP (WebSocket)", test_standard_mcp_websocket),
        ("Forensics Workflow", test_forensics_workflow),
    )
    results = [(test_name, run_test()) for test_name, run_test in suite]

    # Summary
    print("\n" + "=" * 50)
    print("Test Summary:")
    print("=" * 50)

    passed = sum(1 for _, result in results if result)
    total = len(results)

    for test_name, result in results:
        status = "✅ PASS" if result else "❌ FAIL"
        print(f"{status} {test_name}")

    print(f"\nOverall: {passed}/{total} tests passed")

    if passed == total:
        print("🎉 All tests passed! Both MCP interfaces are working correctly.")
        return 0

    print("⚠️  Some tests failed. Check the server logs for details.")
    return 1


if __name__ == "__main__":
    # Use main()'s return value as the process exit status so shells and CI
    # can detect a failed run.
    raise SystemExit(main())