network_forensics / test_mcp_interfaces.py
WHOAM-EYE's picture
Upload folder using huggingface_hub
d9ac8a7 verified
#!/usr/bin/env python3
"""
Test script for both MCP interfaces in the Network Forensics Environment.
This script tests both:
1. Simplified MCP interface at /mcp (OpenEnv custom protocol)
2. Standard MCP interface at /mcp-standard (full MCP protocol)
Usage:
python test_mcp_interfaces.py
Requirements:
- Network forensics server running on http://localhost:8000
- Both MCP interfaces mounted and accessible
"""
import json
import requests
import websocket
import time
from typing import Dict, Any
# Server configuration
# BASE_URL is the single source of truth; every other endpoint is derived from
# it so that pointing the script at another host/port only needs one edit.
BASE_URL = "http://localhost:8000"
SIMPLIFIED_MCP_URL = f"{BASE_URL}/mcp"
STANDARD_MCP_URL = f"{BASE_URL}/mcp-standard"
# Swap only the leading scheme (http -> ws, https -> wss) to obtain the
# WebSocket endpoint that lives under the standard MCP mount point.
STANDARD_MCP_WS_URL = STANDARD_MCP_URL.replace("http", "ws", 1) + "/ws"
def test_simplified_mcp():
    """Test the simplified MCP interface (OpenEnv custom protocol).

    Probes ``{BASE_URL}/health`` and ``{BASE_URL}/mcp-info`` over HTTP and
    prints a line per check. The ``/mcp`` endpoint itself is not exercised
    here; only the availability of the server that hosts it is checked.

    Returns:
        bool: True when the health endpoint answers 200 and no request
        raised; False otherwise.
    """
    print("=== Testing Simplified MCP Interface ===")
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    timeout = 10  # seconds
    try:
        # Test health check
        health_resp = requests.get(f"{BASE_URL}/health", timeout=timeout)
        if health_resp.status_code != 200:
            # Do not report success for an error status.
            print(f"✗ Health check failed: {health_resp.status_code} - {health_resp.text}")
            return False
        print(f"✓ Health check: {health_resp.status_code} - {health_resp.json()}")

        # Test MCP info endpoint (informational: a non-200 answer is not
        # treated as a failure, only a 200 answer is reported).
        info_resp = requests.get(f"{BASE_URL}/mcp-info", timeout=timeout)
        if info_resp.status_code == 200:
            interfaces = info_resp.json().get('mcp_interfaces', {})
            print(f"✓ MCP info available: {len(interfaces)} interfaces")

        # Test simplified MCP (this would normally use WebSocket, but we'll
        # test HTTP availability)
        print("✓ Simplified MCP interface available at /mcp")
        return True
    except Exception as e:
        # Top-level boundary of one test: any network or JSON-decoding error
        # is reported and turned into a failed result.
        print(f"✗ Simplified MCP test failed: {e}")
        return False
def test_standard_mcp_http():
    """Test the standard MCP interface via HTTP.

    Calls, in order, ``/health`` (GET), ``/initialize`` (POST) and
    ``/tools/list`` (POST) under ``STANDARD_MCP_URL`` and prints a line per
    step, including the names of up to three listed tools.

    Returns:
        bool: True when every step answers 200 and no request raised;
        False as soon as one step fails.
    """
    print("\n=== Testing Standard MCP Interface (HTTP) ===")
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    timeout = 10  # seconds
    try:
        # Test standard MCP health
        health_resp = requests.get(f"{STANDARD_MCP_URL}/health", timeout=timeout)
        if health_resp.status_code != 200:
            # Do not report success for an error status.
            print(f"✗ Standard MCP health failed: {health_resp.status_code} - {health_resp.text}")
            return False
        print(f"✓ Standard MCP health: {health_resp.status_code} - {health_resp.json()}")

        # Test initialize
        init_payload = {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test-client", "version": "1.0.0"},
        }
        init_resp = requests.post(
            f"{STANDARD_MCP_URL}/initialize", json=init_payload, timeout=timeout
        )
        if init_resp.status_code != 200:
            print(f"✗ Initialize failed: {init_resp.status_code} - {init_resp.text}")
            return False
        print(f"✓ Initialize successful: {init_resp.json().get('serverInfo', {}).get('name')}")

        # Test tools/list
        tools_resp = requests.post(
            f"{STANDARD_MCP_URL}/tools/list", json={}, timeout=timeout
        )
        if tools_resp.status_code != 200:
            print(f"✗ Tools list failed: {tools_resp.status_code}")
            return False
        tools = tools_resp.json().get('tools', [])
        print(f"✓ Tools list: {len(tools)} tools available")
        for tool in tools[:3]:  # Show first 3 tools
            # "or ''" also covers a description that is present but None.
            description = tool.get('description') or ''
            print(f" - {tool.get('name')}: {description[:50]}...")
        return True
    except Exception as e:
        # Top-level boundary of one test: any network or JSON-decoding error
        # is reported and turned into a failed result.
        print(f"✗ Standard MCP HTTP test failed: {e}")
        return False
def test_standard_mcp_websocket():
    """Test the standard MCP interface via WebSocket.

    Opens a connection to ``STANDARD_MCP_WS_URL``, sends a JSON-RPC 2.0
    ``initialize`` request followed by a ``tools/list`` request, and checks
    that each reply carries a ``result`` member.

    Returns:
        bool: True when both requests yield a ``result``; False when either
        reply lacks one or when connecting, sending, receiving or decoding
        raises.
    """
    print("\n=== Testing Standard MCP Interface (WebSocket) ===")
    ws = None
    try:
        # The timeout also bounds each recv(), so a silent server cannot
        # block the test run forever.
        ws = websocket.create_connection(STANDARD_MCP_WS_URL, timeout=10)
        print("✓ WebSocket connection established")

        # Test initialize via WebSocket
        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "test-client", "version": "1.0.0"},
            },
        }
        ws.send(json.dumps(init_request))
        init_response = json.loads(ws.recv())
        if "result" not in init_response:
            print(f"✗ WebSocket initialize failed: {init_response.get('error', 'Unknown error')}")
            return False
        print(f"✓ WebSocket initialize successful: {init_response['result'].get('serverInfo', {}).get('name')}")

        # Test tools/list via WebSocket
        tools_request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/list",
            "params": {},
        }
        ws.send(json.dumps(tools_request))
        tools_response = json.loads(ws.recv())
        if "result" not in tools_response:
            print(f"✗ WebSocket tools list failed: {tools_response.get('error', 'Unknown error')}")
            # A failed tools/list must fail the test, not fall through to True.
            return False
        tools = tools_response["result"].get("tools", [])
        print(f"✓ WebSocket tools list: {len(tools)} tools available")
        return True
    except Exception as e:
        # Top-level boundary of one test: connection, socket and JSON errors
        # are reported and turned into a failed result.
        print(f"✗ Standard MCP WebSocket test failed: {e}")
        return False
    finally:
        # Close on every exit path, including exceptions raised after the
        # connection was established.
        if ws is not None:
            ws.close()
def test_forensics_workflow():
    """Test a complete forensics workflow using standard MCP.

    Runs four HTTP POST steps against ``STANDARD_MCP_URL``: ``/initialize``,
    ``/tools/list``, then ``/tools/call`` for the ``reset_env`` tool (with
    ``task_id`` set to ``"easy"``) and for the ``get_status`` tool.

    Only HTTP status codes are checked; the bodies of the tool-call
    responses are not inspected.

    Returns:
        bool: True when every step answers 200 and no request raised;
        False as soon as one step fails.
    """
    print("\n=== Testing Complete Forensics Workflow ===")
    # Without a timeout, requests waits indefinitely on an unresponsive
    # server; tool calls get a generous bound since their cost is unknown.
    timeout = 30  # seconds
    tool_call_url = f"{STANDARD_MCP_URL}/tools/call"
    try:
        # Initialize environment
        init_resp = requests.post(
            f"{STANDARD_MCP_URL}/initialize",
            json={
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "workflow-test", "version": "1.0.0"},
            },
            timeout=timeout,
        )
        if init_resp.status_code != 200:
            print("✗ Workflow initialization failed")
            return False

        # Get available tools
        tools_resp = requests.post(
            f"{STANDARD_MCP_URL}/tools/list", json={}, timeout=timeout
        )
        if tools_resp.status_code != 200:
            print(f"✗ Workflow tools list failed: {tools_resp.status_code}")
            return False
        tools = tools_resp.json().get('tools', [])

        # Test a simple workflow
        print(f"✓ Starting forensics workflow with {len(tools)} tools")

        # Reset environment
        reset_resp = requests.post(
            tool_call_url,
            json={"name": "reset_env", "arguments": {"task_id": "easy"}},
            timeout=timeout,
        )
        if reset_resp.status_code != 200:
            print(f"✗ Environment reset failed: {reset_resp.status_code}")
            return False
        print("✓ Environment reset for easy task")

        # Get status
        status_resp = requests.post(
            tool_call_url,
            json={"name": "get_status", "arguments": {}},
            timeout=timeout,
        )
        if status_resp.status_code != 200:
            print(f"✗ Status retrieval failed: {status_resp.status_code}")
            # A failed status call must fail the test, not fall through to True.
            return False
        print("✓ Status retrieved successfully")
        return True
    except Exception as e:
        # Top-level boundary of one test: any network or JSON-decoding error
        # is reported and turned into a failed result.
        print(f"✗ Workflow test failed: {e}")
        return False
def main():
    """Run all MCP interface tests and print a pass/fail summary.

    First probes ``{BASE_URL}/health``; when the server is unreachable or
    answers with a non-200 status, prints start-up instructions and stops
    without running any test. Otherwise runs the four test functions in a
    fixed order and prints one summary line per test.

    Returns:
        int: 0 when every test passed; 1 when the server check failed or at
        least one test failed. Intended to be used as the process exit code.
    """
    print("Network Forensics MCP Interface Test Suite")
    print("=" * 50)

    # Check if server is running
    try:
        health_resp = requests.get(f"{BASE_URL}/health", timeout=5)
    except requests.exceptions.RequestException as e:
        print(f"❌ Cannot connect to server at {BASE_URL}: {e}")
        print("Please start the server: python -m server.app")
        return 1
    if health_resp.status_code != 200:
        print(f"❌ Server not responding properly: {health_resp.status_code}")
        print("Please ensure the server is running: python -m server.app")
        return 1

    print(f"✓ Server detected at {BASE_URL}")
    print()

    # Run tests. Each entry is called in turn, so one failing test does not
    # prevent the later ones from running.
    test_plan = [
        ("Simplified MCP", test_simplified_mcp),
        ("Standard MCP (HTTP)", test_standard_mcp_http),
        ("Standard MCP (WebSocket)", test_standard_mcp_websocket),
        ("Forensics Workflow", test_forensics_workflow),
    ]
    results = [(test_name, test_func()) for test_name, test_func in test_plan]

    # Summary
    print("\n" + "=" * 50)
    print("Test Summary:")
    print("=" * 50)
    passed = sum(1 for _, result in results if result)
    total = len(results)
    for test_name, result in results:
        status = "✅ PASS" if result else "❌ FAIL"
        print(f"{status} {test_name}")
    print(f"\nOverall: {passed}/{total} tests passed")

    if passed == total:
        print("🎉 All tests passed! Both MCP interfaces are working correctly.")
        return 0
    print("⚠️ Some tests failed. Check the server logs for details.")
    return 1


if __name__ == "__main__":
    # Propagate the result as the process exit code so shells and CI jobs
    # can detect a failed run.
    raise SystemExit(main())