""" Demo script for MCP Orchestration Platform with sample servers and integration examples """ import asyncio import json import logging from datetime import datetime from typing import Dict, Any from orchestration_platform.mcp_orchestrator import MCPOrchestrator from orchestration_platform.examples.integration_examples import IntegrationOrchestrator class OrchestrationDemo: """Demo orchestrator for the MCP platform.""" def __init__(self): self.orchestrator = None self.integration_examples = None async def setup(self): """Setup the orchestrator and sample servers.""" print("๐Ÿ”ง Setting up MCP Orchestration Platform...") # Initialize orchestrator self.orchestrator = MCPOrchestrator() await self.orchestrator.initialize() # Start sample servers in the background try: from orchestration_platform.sample_servers.weather_server import WeatherMCPServer from orchestration_platform.sample_servers.crm_server import CRMMCPServer # Start weather server self.weather_server = WeatherMCPServer(port=8001) self.weather_task = asyncio.create_task(self._start_server(self.weather_server)) # Start CRM server self.crm_server = CRMMCPServer(port=8002) self.crm_task = asyncio.create_task(self._start_server(self.crm_server)) # Wait a moment for servers to start await asyncio.sleep(2) print("โœ… Sample servers started") # Register servers with orchestrator await self.orchestrator.add_server("weather-server", "http://localhost:8001/mcp") await self.orchestrator.add_server("crm-server", "http://localhost:8002/mcp") print("โœ… Servers registered with orchestrator") # Initialize integration examples self.integration_examples = IntegrationOrchestrator(self.orchestrator) except Exception as e: print(f"โŒ Error setting up servers: {e}") raise async def _start_server(self, server): """Start a server and keep it running.""" try: runner = await server.start_server() await server.stop_server(runner) except Exception as e: print(f"Server {server.__class__.__name__} error: {e}") async def run_quick_demo(self): """Run a quick demonstration of key features.""" print("\n๐Ÿš€ Running Quick Demo...") print("=" * 50) # Demo 1: Basic tool discovery print("\n1. ๐Ÿ“‹ Server Discovery and Tool Cataloging") tools = await self.orchestrator.list_all_tools() print(f" Found {len(tools)} tools across {len(self.orchestrator.servers)} servers") # Show a few sample tools for server_name, server_tools in tools.items(): print(f" {server_name}: {len(server_tools)} tools") for tool in server_tools[:2]: # Show first 2 tools print(f" - {tool['name']}: {tool['description'][:50]}...") # Demo 2: Weather integration print("\n2. ๐ŸŒค๏ธ Weather Data Integration") weather_result = await self.orchestrator.call_tool("weather-server", "get_current_weather", { "location": "New York" }) weather_data = json.loads(weather_result["content"][0]["text"]) print(f" New York: {weather_data['conditions']}, {weather_data['temperature']}ยฐC") print(f" Humidity: {weather_data['humidity']}%, Wind: {weather_data['wind_speed']} km/h") # Demo 3: CRM operations print("\n3. ๐Ÿ‘ฅ CRM Operations") # Add a customer customer_data = { "name": "Demo Customer", "email": "demo@example.com", "phone": "+1-555-0123", "status": "active", "tags": ["demo", "integration-test"], "lifetime_value": 25000.0 } customer_result = await self.orchestrator.call_tool("crm-server", "add_customer", customer_data) customer_info = json.loads(customer_result["content"][0]["text"]) print(f" โœ… Customer created: {customer_info['name']} (ID: {customer_info['id'][:8]}...)") # Get CRM metrics metrics_result = await self.orchestrator.call_tool("crm-server", "get_crm_metrics", {}) metrics = json.loads(metrics_result["content"][0]["text"]) print(f" ๐Ÿ“Š Active customers: {metrics['customers']['active']}") print(f" ๐Ÿ’ฐ Pipeline value: ${metrics['opportunities']['pipeline_value']:,.2f}") # Demo 4: Integration workflow print("\n4. ๐Ÿ”„ Integration Workflow Demo") try: # Run customer intake workflow result = await self.integration_examples.run_example("customer_intake_workflow") print(f" โœ… Workflow completed: {result['workflow']}") print(f" ๐Ÿ‘ค Customer ID: {result['customer_id'][:8]}...") print(f" ๐Ÿ’ผ Opportunity created: {result['opportunity_created']}") except Exception as e: print(f" โš ๏ธ Workflow demo failed: {e}") print("\n๐ŸŽ‰ Quick Demo Complete!") async def run_full_demo(self): """Run the complete demonstration including all integration examples.""" print("\n๐ŸŽฌ Running Full Integration Demo...") print("=" * 60) # Run all integration examples results = await self.integration_examples.run_all_examples() print(f"\n๐Ÿ“Š Execution Summary:") summary = results["execution_summary"] print(f" Total examples: {summary['total_examples']}") print(f" โœ… Successful: {summary['successful']}") print(f" โŒ Failed: {summary['failed']}") print(f"\n๐Ÿ“ Detailed Results:") for example_name, result in results["results"].items(): status_icon = "โœ…" if result["status"] == "success" else "โŒ" print(f" {status_icon} {example_name}") if result["status"] == "failed": print(f" Error: {result['error']}") async def run_interactive_mode(self): """Run in interactive mode for manual testing.""" print("\n๐ŸŽฎ Interactive Mode Started") print("Type 'help' for available commands, 'quit' to exit") print("-" * 50) while True: try: command = input("\n> ").strip().lower() if command == "quit" or command == "exit": break elif command == "help": self._show_help() elif command == "tools": await self._list_tools() elif command == "servers": await self._list_servers() elif command == "demo": await self.run_quick_demo() elif command.startswith("call "): await self._handle_tool_call(command) else: print(f"Unknown command: {command}. Type 'help' for available commands.") except KeyboardInterrupt: break except Exception as e: print(f"Error: {e}") def _show_help(self): """Show available commands.""" print("\nAvailable commands:") print(" help - Show this help") print(" tools - List all available tools") print(" servers - List connected servers") print(" demo - Run quick demo") print(" call - Call a tool (e.g., call weather-server get_current_weather {\"location\": \"New York\"})") print(" quit - Exit interactive mode") async def _list_tools(self): """List all available tools.""" tools = await self.orchestrator.list_all_tools() for server_name, server_tools in tools.items(): print(f"\n{server_name}:") for tool in server_tools: print(f" - {tool['name']}: {tool['description']}") async def _list_servers(self): """List connected servers.""" print("\nConnected servers:") for name, server in self.orchestrator.servers.items(): status = await server.health_check() print(f" - {name}: {status['status']}") async def _handle_tool_call(self, command): """Handle tool call command.""" try: # Parse the command parts = command.split(None, 2) if len(parts) < 3: print("Usage: call ") return server_name = parts[1] tool_name = parts[2] args_str = " ".join(parts[2:]) if len(parts) > 2 else "{}" # Try to parse arguments try: args = json.loads(args_str) except json.JSONDecodeError: print("Invalid JSON arguments") return # Make the tool call result = await self.orchestrator.call_tool(server_name, tool_name, args) print(json.dumps(json.loads(result["content"][0]["text"]), indent=2)) except Exception as e: print(f"Tool call failed: {e}") async def cleanup(self): """Cleanup resources.""" print("\n๐Ÿงน Cleaning up...") if hasattr(self, 'weather_task') and not self.weather_task.done(): self.weather_task.cancel() try: await self.weather_task except asyncio.CancelledError: pass if hasattr(self, 'crm_task') and not self.crm_task.done(): self.crm_task.cancel() try: await self.crm_task except asyncio.CancelledError: pass if self.orchestrator: await self.orchestrator.cleanup() print("โœ… Cleanup complete") async def main(): """Main demo entry point.""" print("๐ŸŒŸ MCP Orchestration Platform Demo") print("=" * 60) print("This demo showcases the full capabilities of the MCP orchestration platform") print("including sample servers and integration examples.") demo = OrchestrationDemo() try: # Setup await demo.setup() # Choose demo mode print("\nSelect demo mode:") print("1. Quick Demo (basic features)") print("2. Full Demo (all integration examples)") print("3. Interactive Mode (manual testing)") choice = input("\nEnter choice (1-3): ").strip() if choice == "1": await demo.run_quick_demo() elif choice == "2": await demo.run_full_demo() elif choice == "3": await demo.run_interactive_mode() else: print("Running quick demo by default...") await demo.run_quick_demo() except KeyboardInterrupt: print("\n\nโน๏ธ Demo interrupted by user") except Exception as e: print(f"\nโŒ Demo failed: {e}") import traceback traceback.print_exc() finally: await demo.cleanup() print("\n๐Ÿ‘‹ Demo complete! Thank you for trying the MCP Orchestration Platform.") if __name__ == "__main__": asyncio.run(main())