#!/usr/bin/env python3
"""
MCP Server Stress Test CLI

Usage:
    python scripts/mcp_stress_test.py --mode smoke
    python scripts/mcp_stress_test.py --mode standard --output reports/
    python scripts/mcp_stress_test.py --mode stress --batch-size 100
"""
import asyncio
import argparse
import json
import sys
import time
from pathlib import Path
from datetime import datetime, timezone
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tests.mcp_reliability.company_sampler import CompanySampler, SamplingStrategy
from tests.mcp_reliability.rate_limiter import get_rate_limiter_registry
from tests.mcp_reliability.circuit_breaker import get_circuit_breaker_registry
from tests.mcp_reliability.test_stress import MCPTestRunner, TestConfig
# Test mode configurations
TEST_MODES = {
    "smoke": {
        "batch_size": 5,
        "sampling_strategy": "uniform",
        "max_concurrent": 3,
        "description": "Quick smoke test for basic functionality"
    },
    "standard": {
        "batch_size": 50,
        "sampling_strategy": "mixed",
        "max_concurrent": 5,
        "description": "Standard reliability test"
    },
    "stress": {
        "batch_size": 100,
        "sampling_strategy": "uniform",
        "max_concurrent": 10,
        "request_interval_ms": 50,
        "description": "High-load stress test"
    },
    "soak": {
        "batch_size": 200,
        "sampling_strategy": "stratified",
        "max_concurrent": 5,
        "description": "Long-running soak test"
    }
}
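
# Note: modes that omit "request_interval_ms" fall back to the 200ms default
# applied when the TestConfig is built in main(); only "stress" tightens it
# to 50ms.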

def print_banner():
    """Print CLI banner."""
    print("""
╔═══════════════════════════════════════════════════════════╗
║             MCP Server Stress Test Framework              ║
║    High-frequency reliability testing for MCP servers     ║
╚═══════════════════════════════════════════════════════════╝
""")

def print_summary(summary: dict):
    """Print formatted test summary."""
    print("\n" + "=" * 60)
    print(" " * 24 + "TEST SUMMARY")
    print("=" * 60)
    print(f"\nTotal Requests: {summary['total']}")
    print(f"Success Rate: {summary['success_rate']*100:.1f}%")
    print(f"Fallback Rate: {summary['fallback_rate']*100:.1f}%")
    print(f"Failure Rate: {summary['failure_rate']*100:.1f}%")
    print("\nLatency Percentiles:")
    print(f"  P50: {summary['latency_p50']:.0f}ms")
    print(f"  P95: {summary['latency_p95']:.0f}ms")
    print(f"  P99: {summary['latency_p99']:.0f}ms")
    print("\nResults by Category:")
    for category, count in summary['by_category'].items():
        if count > 0:
            pct = count / summary['total'] * 100
            print(f"  {category:15s}: {count:4d} ({pct:5.1f}%)")
    print("\nResults by Server:")
    for server, cats in summary['by_server'].items():
        total = sum(cats.values())
        # Partial and fallback responses still returned data, so they count as successes
        success = cats.get('success', 0) + cats.get('partial', 0) + cats.get('fallback', 0)
        rate = success / total * 100 if total > 0 else 0
        print(f"  {server:20s}: {rate:5.1f}% success ({total} requests)")
    # Flag any circuit breakers still open at the end of the run
    cb_status = summary.get('circuit_breaker_status', {})
    open_breakers = [name for name, s in cb_status.items() if s.get('state') == 'open']
    if open_breakers:
        print(f"\n⚠️ Open Circuit Breakers: {', '.join(open_breakers)}")
    print("\n" + "=" * 60)

def check_exit_criteria(summary: dict, mode: str, quiet: bool = False) -> bool:
    """Check if test results meet exit criteria; suppress printing when quiet."""
    criteria = {
        "smoke": {"success_rate": 0.95, "p99_latency": 5000, "failure_rate": 0.0},
        "standard": {"success_rate": 0.90, "p99_latency": 10000, "failure_rate": 0.05},
        "stress": {"success_rate": 0.85, "p99_latency": 15000, "failure_rate": 0.10},
        "soak": {"success_rate": 0.85, "p99_latency": 15000, "failure_rate": 0.10}
    }
    c = criteria.get(mode, criteria["standard"])
    passed = True
    lines = []
    # Fallback responses still served data, so they count toward effective success
    effective_success = summary['success_rate'] + summary['fallback_rate']
    if effective_success >= c["success_rate"]:
        lines.append(f"  ✓ Success rate {effective_success*100:.1f}% >= {c['success_rate']*100:.0f}%")
    else:
        lines.append(f"  ✗ Success rate {effective_success*100:.1f}% < {c['success_rate']*100:.0f}%")
        passed = False
    if summary['latency_p99'] <= c["p99_latency"]:
        lines.append(f"  ✓ P99 latency {summary['latency_p99']:.0f}ms <= {c['p99_latency']}ms")
    else:
        lines.append(f"  ✗ P99 latency {summary['latency_p99']:.0f}ms > {c['p99_latency']}ms")
        passed = False
    if summary['failure_rate'] <= c["failure_rate"]:
        lines.append(f"  ✓ Failure rate {summary['failure_rate']*100:.1f}% <= {c['failure_rate']*100:.0f}%")
    else:
        lines.append(f"  ✗ Failure rate {summary['failure_rate']*100:.1f}% > {c['failure_rate']*100:.0f}%")
        passed = False
    if not quiet:
        print("\nExit Criteria Check:")
        print("\n".join(lines))
    return passed
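
# Example: success_rate=0.88 with fallback_rate=0.04 gives an effective success
# rate of 0.92, which passes "standard" (>= 0.90) but would fail "smoke" (>= 0.95).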

async def run_test(config: TestConfig) -> dict:
    """Run the stress test with the given configuration."""
    runner = MCPTestRunner(config)
    return await runner.run()
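
# MCPTestRunner is assumed to consult the shared rate limiter and circuit
# breaker registries imported at the top of this file; they are not referenced
# directly in this script.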

def main():
    parser = argparse.ArgumentParser(
        description="MCP Server Stress Test CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python scripts/mcp_stress_test.py --mode smoke
    python scripts/mcp_stress_test.py --mode standard --output reports/
    python scripts/mcp_stress_test.py --batch-size 100 --strategy stratified
"""
    )
    parser.add_argument(
        "--mode", "-m",
        choices=list(TEST_MODES.keys()),
        default="standard",
        help="Test mode (default: standard)"
    )
    parser.add_argument(
        "--batch-size", "-b",
        type=int,
        help="Override batch size"
    )
    parser.add_argument(
        "--strategy", "-s",
        choices=["uniform", "stratified", "edge_case", "mixed"],
        help="Override sampling strategy"
    )
    parser.add_argument(
        "--max-concurrent", "-c",
        type=int,
        help="Override max concurrent requests"
    )
    parser.add_argument(
        "--seed",
        type=int,
        help="Random seed for reproducibility"
    )
    parser.add_argument(
        "--output", "-o",
        type=Path,
        help="Output directory for results"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON only"
    )
    parser.add_argument(
        "--servers",
        nargs="+",
        help="Specific servers to test"
    )
    args = parser.parse_args()
    if not args.json:
        print_banner()

    # Build configuration, letting explicit CLI flags override the mode defaults
    mode_config = TEST_MODES[args.mode]
    config = TestConfig(
        batch_size=args.batch_size or mode_config["batch_size"],
        sampling_strategy=args.strategy or mode_config["sampling_strategy"],
        max_concurrent=args.max_concurrent or mode_config.get("max_concurrent", 5),
        request_interval_ms=mode_config.get("request_interval_ms", 200),
        # "is not None" so an explicit --seed 0 is honored instead of being
        # silently replaced by the current time
        seed=args.seed if args.seed is not None else int(time.time()),
        servers=args.servers
    )
    if not args.json:
        print(f"Mode: {args.mode} - {mode_config['description']}")
        print(f"Batch size: {config.batch_size}")
        print(f"Strategy: {config.sampling_strategy}")
        print(f"Max concurrent: {config.max_concurrent}")
        print(f"Seed: {config.seed}")
        print("-" * 60)

    # Run test
    summary = asyncio.run(run_test(config))
    # Output results; evaluate exit criteria exactly once, with quiet=True in
    # JSON mode so stdout stays valid JSON
    if args.json:
        print(json.dumps(summary, indent=2))
        passed = check_exit_criteria(summary, args.mode, quiet=True)
    else:
        print_summary(summary)
        passed = check_exit_criteria(summary, args.mode)
        print("\n✓ TEST PASSED" if passed else "\n✗ TEST FAILED")

    # Save results if output specified
    if args.output:
        args.output.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        summary_path = args.output / f"mcp_test_{args.mode}_{timestamp}.json"
        with open(summary_path, "w") as f:
            json.dump(summary, f, indent=2)
        if not args.json:
            print(f"\nResults saved to: {summary_path}")

    # Exit with an appropriate code so callers can gate on the result
    sys.exit(0 if passed else 1)

if __name__ == "__main__":
    main()
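
# Example (hypothetical CI usage), relying on the exit code set in main():
#
#   python scripts/mcp_stress_test.py --mode smoke --seed 42 --output reports/
#   python scripts/mcp_stress_test.py --mode standard --json > summary.json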