#!/usr/bin/env python3
"""
MCP Server Stress Test CLI

Usage:
    python scripts/mcp_stress_test.py --mode smoke
    python scripts/mcp_stress_test.py --mode standard --output reports/
    python scripts/mcp_stress_test.py --mode stress --batch-size 100
"""
import argparse
import asyncio
import json
import sys
import time
from datetime import datetime
from pathlib import Path

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from tests.mcp_reliability.company_sampler import CompanySampler, SamplingStrategy
from tests.mcp_reliability.rate_limiter import get_rate_limiter_registry
from tests.mcp_reliability.circuit_breaker import get_circuit_breaker_registry
from tests.mcp_reliability.test_stress import MCPTestRunner, TestConfig

# Test mode configurations
TEST_MODES = {
    "smoke": {
        "batch_size": 5,
        "sampling_strategy": "uniform",
        "max_concurrent": 3,
        "description": "Quick smoke test for basic functionality"
    },
    "standard": {
        "batch_size": 50,
        "sampling_strategy": "mixed",
        "max_concurrent": 5,
        "description": "Standard reliability test"
    },
    "stress": {
        "batch_size": 100,
        "sampling_strategy": "uniform",
        "max_concurrent": 10,
        "request_interval_ms": 50,
        "description": "High-load stress test"
    },
    "soak": {
        "batch_size": 200,
        "sampling_strategy": "stratified",
        "max_concurrent": 5,
        "description": "Long-running soak test"
    }
}
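
# Each mode's defaults above can be overridden on the command line via
# --batch-size, --strategy, and --max-concurrent (see main() below).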

def print_banner():
    """Print CLI banner."""
    print("""
╔═════════════════════════════════════════════════════════════╗
║               MCP Server Stress Test Framework               ║
║      High-frequency reliability testing for MCP servers      ║
╚═════════════════════════════════════════════════════════════╝
""")

def print_summary(summary: dict):
    """Print formatted test summary."""
    print("\n" + "="*60)
    print(" TEST SUMMARY")
    print("="*60)
    print(f"\nTotal Requests: {summary['total']}")
    print(f"Success Rate: {summary['success_rate']*100:.1f}%")
    print(f"Fallback Rate: {summary['fallback_rate']*100:.1f}%")
    print(f"Failure Rate: {summary['failure_rate']*100:.1f}%")
    print("\nLatency Percentiles:")
    print(f" P50: {summary['latency_p50']:.0f}ms")
    print(f" P95: {summary['latency_p95']:.0f}ms")
    print(f" P99: {summary['latency_p99']:.0f}ms")
    print("\nResults by Category:")
    for category, count in summary['by_category'].items():
        if count > 0:
            pct = count / summary['total'] * 100
            print(f" {category:15s}: {count:4d} ({pct:5.1f}%)")
    print("\nResults by Server:")
    for server, cats in summary['by_server'].items():
        total = sum(cats.values())
        success = cats.get('success', 0) + cats.get('partial', 0) + cats.get('fallback', 0)
        rate = success / total * 100 if total > 0 else 0
        print(f" {server:20s}: {rate:5.1f}% success ({total} requests)")

    # Circuit breaker status
    cb_status = summary.get('circuit_breaker_status', {})
    open_breakers = [name for name, s in cb_status.items() if s.get('state') == 'open']
    if open_breakers:
        print(f"\n⚠️ Open Circuit Breakers: {', '.join(open_breakers)}")
    print("\n" + "="*60)

def check_exit_criteria(summary: dict, mode: str, verbose: bool = True) -> bool:
    """Check whether test results meet the exit criteria for the given mode."""
    criteria = {
        "smoke": {"success_rate": 0.95, "p99_latency": 5000, "failure_rate": 0.0},
        "standard": {"success_rate": 0.90, "p99_latency": 10000, "failure_rate": 0.05},
        "stress": {"success_rate": 0.85, "p99_latency": 15000, "failure_rate": 0.10},
        "soak": {"success_rate": 0.85, "p99_latency": 15000, "failure_rate": 0.10}
    }
    c = criteria.get(mode, criteria["standard"])
    passed = True
    lines = ["\nExit Criteria Check:"]

    # Fallback responses still serve data, so they count toward effective success
    effective_success = summary['success_rate'] + summary['fallback_rate']
    if effective_success >= c["success_rate"]:
        lines.append(f" ✓ Success rate {effective_success*100:.1f}% >= {c['success_rate']*100:.0f}%")
    else:
        lines.append(f" ✗ Success rate {effective_success*100:.1f}% < {c['success_rate']*100:.0f}%")
        passed = False

    if summary['latency_p99'] <= c["p99_latency"]:
        lines.append(f" ✓ P99 latency {summary['latency_p99']:.0f}ms <= {c['p99_latency']}ms")
    else:
        lines.append(f" ✗ P99 latency {summary['latency_p99']:.0f}ms > {c['p99_latency']}ms")
        passed = False

    if summary['failure_rate'] <= c["failure_rate"]:
        lines.append(f" ✓ Failure rate {summary['failure_rate']*100:.1f}% <= {c['failure_rate']*100:.0f}%")
    else:
        lines.append(f" ✗ Failure rate {summary['failure_rate']*100:.1f}% > {c['failure_rate']*100:.0f}%")
        passed = False

    if verbose:
        print("\n".join(lines))
    return passed
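
# Worked example (illustrative numbers, not real results): with a measured
# success_rate of 0.82 and fallback_rate of 0.05, effective success is 0.87,
# which fails the "standard" threshold (0.90) but passes "stress"/"soak" (0.85).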

async def run_test(config: TestConfig) -> dict:
    """Run the stress test with given configuration."""
    runner = MCPTestRunner(config)
    return await runner.run()
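
# The summary dict returned by MCPTestRunner.run() is assumed to carry the keys
# consumed above: total, success_rate, fallback_rate, failure_rate,
# latency_p50/p95/p99, by_category, by_server, and optionally
# circuit_breaker_status.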

def main():
    parser = argparse.ArgumentParser(
        description="MCP Server Stress Test CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scripts/mcp_stress_test.py --mode smoke
  python scripts/mcp_stress_test.py --mode standard --output reports/
  python scripts/mcp_stress_test.py --batch-size 100 --strategy stratified
"""
    )
    parser.add_argument(
        "--mode", "-m",
        choices=list(TEST_MODES.keys()),
        default="standard",
        help="Test mode (default: standard)"
    )
    parser.add_argument(
        "--batch-size", "-b",
        type=int,
        help="Override batch size"
    )
    parser.add_argument(
        "--strategy", "-s",
        choices=["uniform", "stratified", "edge_case", "mixed"],
        help="Override sampling strategy"
    )
    parser.add_argument(
        "--max-concurrent", "-c",
        type=int,
        help="Override max concurrent requests"
    )
    parser.add_argument(
        "--seed",
        type=int,
        help="Random seed for reproducibility"
    )
    parser.add_argument(
        "--output", "-o",
        type=Path,
        help="Output directory for results"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON only"
    )
    parser.add_argument(
        "--servers",
        nargs="+",
        help="Specific servers to test"
    )
    args = parser.parse_args()

    if not args.json:
        print_banner()

    # Build configuration
    mode_config = TEST_MODES[args.mode]
    config = TestConfig(
        batch_size=args.batch_size or mode_config["batch_size"],
        sampling_strategy=args.strategy or mode_config["sampling_strategy"],
        max_concurrent=args.max_concurrent or mode_config.get("max_concurrent", 5),
        request_interval_ms=mode_config.get("request_interval_ms", 200),
        # Honor an explicit --seed 0; only fall back to the clock when unset
        seed=args.seed if args.seed is not None else int(time.time()),
        servers=args.servers
    )

    if not args.json:
        print(f"Mode: {args.mode} - {mode_config['description']}")
        print(f"Batch size: {config.batch_size}")
        print(f"Strategy: {config.sampling_strategy}")
        print(f"Max concurrent: {config.max_concurrent}")
        print(f"Seed: {config.seed}")
        print("-"*60)

    # Run test
    summary = asyncio.run(run_test(config))

    # Output results
    if args.json:
        print(json.dumps(summary, indent=2))
    else:
        print_summary(summary)

    # Evaluate exit criteria once; suppress the detailed report in JSON-only mode
    passed = check_exit_criteria(summary, args.mode, verbose=not args.json)
    if not args.json:
        print("\n✓ TEST PASSED" if passed else "\n✗ TEST FAILED")

    # Save results if output specified
    if args.output:
        args.output.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        summary_path = args.output / f"mcp_test_{args.mode}_{timestamp}.json"
        with open(summary_path, "w") as f:
            json.dump(summary, f, indent=2)
        if not args.json:
            print(f"\nResults saved to: {summary_path}")

    # Exit with appropriate code
    sys.exit(0 if passed else 1)

if __name__ == "__main__":
    main()