|
|
""" |
|
|
Demo script for MCP Orchestration Platform with sample servers and integration examples |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import json |
|
|
import logging |
|
|
from datetime import datetime |
|
|
from typing import Dict, Any |
|
|
|
|
|
from orchestration_platform.mcp_orchestrator import MCPOrchestrator |
|
|
from orchestration_platform.examples.integration_examples import IntegrationOrchestrator |
|
|
|
|
|
|
|
|
class OrchestrationDemo:
    """Interactive demo driver for the MCP orchestration platform.

    ``setup()`` creates an ``MCPOrchestrator``, launches the weather and CRM
    sample servers as background asyncio tasks and registers them with the
    orchestrator.  The ``run_*`` methods then exercise the platform, and
    ``cleanup()`` tears everything down again.
    """

    def __init__(self):
        """Create an idle demo; nothing is started until ``setup()`` runs."""
        self.orchestrator = None
        self.integration_examples = None
        # Populated by setup().  Declared here so cleanup() is safe even when
        # setup() never ran or failed part-way through.
        self.weather_server = None
        self.crm_server = None
        self.weather_task = None
        self.crm_task = None

    async def setup(self):
        """Set up the orchestrator and the sample servers.

        Raises:
            Exception: any error raised while starting or registering the
                sample servers is reported on stdout and re-raised.
        """
        print("🔧 Setting up MCP Orchestration Platform...")

        self.orchestrator = MCPOrchestrator()
        await self.orchestrator.initialize()

        try:
            # Imported lazily so a missing sample-server module is reported
            # through the same error path as any other setup failure.
            from orchestration_platform.sample_servers.weather_server import WeatherMCPServer
            from orchestration_platform.sample_servers.crm_server import CRMMCPServer

            self.weather_server = WeatherMCPServer(port=8001)
            self.weather_task = asyncio.create_task(self._start_server(self.weather_server))

            self.crm_server = CRMMCPServer(port=8002)
            self.crm_task = asyncio.create_task(self._start_server(self.crm_server))

            # Fixed grace period before registering the servers; presumably
            # this gives them time to start listening -- TODO confirm whether
            # a readiness check is available instead.
            await asyncio.sleep(2)

            print("✅ Sample servers started")

            await self.orchestrator.add_server("weather-server", "http://localhost:8001/mcp")
            await self.orchestrator.add_server("crm-server", "http://localhost:8002/mcp")

            print("✅ Servers registered with orchestrator")

            self.integration_examples = IntegrationOrchestrator(self.orchestrator)

        except Exception as e:
            print(f"❌ Error setting up servers: {e}")
            raise

    async def _start_server(self, server):
        """Start ``server`` and keep it running until this task is cancelled.

        Args:
            server: object exposing ``start_server()`` (returning a runner
                handle) and ``stop_server(runner)`` coroutines.

        The server is stopped in a ``finally`` clause, so cancelling the task
        (as ``cleanup()`` does) shuts it down.  Errors raised by the server
        are printed rather than propagated.
        """
        try:
            runner = await server.start_server()
            try:
                # Park forever: this event is never set, so the only way out
                # is cancellation of the surrounding task.
                await asyncio.Event().wait()
            finally:
                await server.stop_server(runner)
        except Exception as e:
            print(f"Server {server.__class__.__name__} error: {e}")

    @staticmethod
    def _decode_tool_result(result):
        """Return the JSON payload carried in the first content item of ``result``.

        Tool results are read as ``result["content"][0]["text"]`` holding a
        JSON document.
        """
        return json.loads(result["content"][0]["text"])

    async def run_quick_demo(self):
        """Run a quick demonstration of key features."""
        print("\n🚀 Running Quick Demo...")
        print("=" * 50)

        print("\n1. 📋 Server Discovery and Tool Cataloging")
        tools = await self.orchestrator.list_all_tools()
        # `tools` maps server name -> list of tools, so the tool total is the
        # sum of the per-server list lengths, not the length of the mapping.
        total_tools = sum(len(server_tools) for server_tools in tools.values())
        print(f" Found {total_tools} tools across {len(self.orchestrator.servers)} servers")

        for server_name, server_tools in tools.items():
            print(f" {server_name}: {len(server_tools)} tools")
            # Only preview the first two tools of each server.
            for tool in server_tools[:2]:
                print(f" - {tool['name']}: {tool['description'][:50]}...")

        print("\n2. 🌤️ Weather Data Integration")
        weather_result = await self.orchestrator.call_tool(
            "weather-server", "get_current_weather", {"location": "New York"}
        )
        weather_data = self._decode_tool_result(weather_result)
        print(f" New York: {weather_data['conditions']}, {weather_data['temperature']}°C")
        print(f" Humidity: {weather_data['humidity']}%, Wind: {weather_data['wind_speed']} km/h")

        print("\n3. 👥 CRM Operations")

        customer_data = {
            "name": "Demo Customer",
            "email": "demo@example.com",
            "phone": "+1-555-0123",
            "status": "active",
            "tags": ["demo", "integration-test"],
            "lifetime_value": 25000.0,
        }

        customer_result = await self.orchestrator.call_tool("crm-server", "add_customer", customer_data)
        customer_info = self._decode_tool_result(customer_result)
        print(f" ✅ Customer created: {customer_info['name']} (ID: {customer_info['id'][:8]}...)")

        metrics_result = await self.orchestrator.call_tool("crm-server", "get_crm_metrics", {})
        metrics = self._decode_tool_result(metrics_result)
        print(f" 📊 Active customers: {metrics['customers']['active']}")
        print(f" 💰 Pipeline value: ${metrics['opportunities']['pipeline_value']:,.2f}")

        print("\n4. 🔄 Integration Workflow Demo")
        try:
            result = await self.integration_examples.run_example("customer_intake_workflow")
            print(f" ✅ Workflow completed: {result['workflow']}")
            print(f" 👤 Customer ID: {result['customer_id'][:8]}...")
            print(f" 💼 Opportunity created: {result['opportunity_created']}")
        except Exception as e:
            # The workflow step is best-effort: a failure is reported but does
            # not abort the rest of the quick demo.
            print(f" ⚠️ Workflow demo failed: {e}")

        print("\n🎉 Quick Demo Complete!")

    async def run_full_demo(self):
        """Run the complete demonstration including all integration examples."""
        print("\n🎬 Running Full Integration Demo...")
        print("=" * 60)

        results = await self.integration_examples.run_all_examples()

        print("\n📊 Execution Summary:")
        summary = results["execution_summary"]
        print(f" Total examples: {summary['total_examples']}")
        print(f" ✅ Successful: {summary['successful']}")
        print(f" ❌ Failed: {summary['failed']}")

        print("\n📝 Detailed Results:")
        for example_name, result in results["results"].items():
            status_icon = "✅" if result["status"] == "success" else "❌"
            print(f" {status_icon} {example_name}")
            if result["status"] == "failed":
                print(f" Error: {result['error']}")

    async def run_interactive_mode(self):
        """Run a small command prompt for manual testing.

        Loops until the user enters ``quit``/``exit``, presses Ctrl-C, or
        stdin reaches end-of-file.  Errors raised by a command are printed
        and the prompt continues.
        """
        print("\n🎮 Interactive Mode Started")
        print("Type 'help' for available commands, 'quit' to exit")
        print("-" * 50)

        while True:
            try:
                # NOTE: input() blocks the event loop while waiting, so the
                # background server tasks only make progress during awaits.
                raw_command = input("\n> ").strip()
                if not raw_command:
                    continue

                # Only the dispatch keyword is case-insensitive; the raw text
                # is kept intact because tool names and JSON argument values
                # are case-sensitive.
                command = raw_command.lower()

                if command in ("quit", "exit"):
                    break
                elif command == "help":
                    self._show_help()
                elif command == "tools":
                    await self._list_tools()
                elif command == "servers":
                    await self._list_servers()
                elif command == "demo":
                    await self.run_quick_demo()
                elif command.startswith("call "):
                    await self._handle_tool_call(raw_command)
                else:
                    print(f"Unknown command: {raw_command}. Type 'help' for available commands.")

            except (KeyboardInterrupt, EOFError):
                # EOF means stdin is closed; retrying would loop forever.
                break
            except Exception as e:
                print(f"Error: {e}")

    def _show_help(self):
        """Show available commands."""
        print("\nAvailable commands:")
        print(" help - Show this help")
        print(" tools - List all available tools")
        print(" servers - List connected servers")
        print(" demo - Run quick demo")
        print(" call - Call a tool (e.g., call weather-server get_current_weather {\"location\": \"New York\"})")
        print(" quit - Exit interactive mode")

    async def _list_tools(self):
        """Print every tool of every server known to the orchestrator."""
        tools = await self.orchestrator.list_all_tools()
        for server_name, server_tools in tools.items():
            print(f"\n{server_name}:")
            for tool in server_tools:
                print(f" - {tool['name']}: {tool['description']}")

    async def _list_servers(self):
        """Print each registered server together with its health status."""
        print("\nConnected servers:")
        for name, server in self.orchestrator.servers.items():
            status = await server.health_check()
            print(f" - {name}: {status['status']}")

    async def _handle_tool_call(self, command):
        """Parse and execute a ``call <server> <tool> [json-arguments]`` command.

        Args:
            command: the full command line as typed, including the leading
                ``call`` keyword.  The JSON arguments are optional and
                default to ``{}``.

        Problems (bad usage, invalid JSON, tool failure) are reported on
        stdout; nothing is raised to the caller.
        """
        try:
            # maxsplit=3 keeps the JSON argument text, which may itself
            # contain whitespace, together as one final piece.
            parts = command.split(None, 3)
            if len(parts) < 3:
                print("Usage: call <server> <tool> <arguments>")
                return

            server_name = parts[1]
            tool_name = parts[2]
            args_str = parts[3] if len(parts) > 3 else "{}"

            try:
                args = json.loads(args_str)
            except json.JSONDecodeError:
                print("Invalid JSON arguments")
                return

            result = await self.orchestrator.call_tool(server_name, tool_name, args)
            print(json.dumps(self._decode_tool_result(result), indent=2))

        except Exception as e:
            print(f"Tool call failed: {e}")

    @staticmethod
    async def _cancel_task(task):
        """Cancel ``task`` if it is still running and wait for it to finish."""
        if task is None or task.done():
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def cleanup(self):
        """Cancel the sample-server tasks and release the orchestrator."""
        print("\n🧹 Cleaning up...")

        # getattr() keeps this safe for instances created without __init__.
        await self._cancel_task(getattr(self, "weather_task", None))
        await self._cancel_task(getattr(self, "crm_task", None))

        if self.orchestrator:
            await self.orchestrator.cleanup()

        print("✅ Cleanup complete")
|
|
|
|
|
|
|
|
async def main():
    """Entry point: set up the demo, run the mode the user picks, then clean up."""
    banner = (
        "🌟 MCP Orchestration Platform Demo",
        "=" * 60,
        "This demo showcases the full capabilities of the MCP orchestration platform",
        "including sample servers and integration examples.",
    )
    for banner_line in banner:
        print(banner_line)

    demo = OrchestrationDemo()

    try:
        await demo.setup()

        menu = (
            "\nSelect demo mode:",
            "1. Quick Demo (basic features)",
            "2. Full Demo (all integration examples)",
            "3. Interactive Mode (manual testing)",
        )
        for menu_line in menu:
            print(menu_line)

        selection = input("\nEnter choice (1-3): ").strip()

        # Map each menu entry to the coroutine function that implements it.
        modes = {
            "1": demo.run_quick_demo,
            "2": demo.run_full_demo,
            "3": demo.run_interactive_mode,
        }
        run_mode = modes.get(selection)
        if run_mode is None:
            # Anything other than 1-3 falls back to the quick demo.
            print("Running quick demo by default...")
            run_mode = demo.run_quick_demo
        await run_mode()

    except KeyboardInterrupt:
        print("\n\n⏹️ Demo interrupted by user")
    except Exception as e:
        print(f"\n❌ Demo failed: {e}")
        import traceback

        traceback.print_exc()
    finally:
        # Always release servers and the orchestrator, whatever happened above.
        await demo.cleanup()

    print("\n👋 Demo complete! Thank you for trying the MCP Orchestration Platform.")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main())