""" Integration Examples for MCP Orchestration Platform Demonstrates real-world workflows and usage patterns """ import asyncio import json import logging from datetime import datetime, timedelta from typing import Dict, List, Any, Optional import structlog # Configure structured logging logging.basicConfig(level=logging.INFO) logger = structlog.get_logger() class IntegrationExample: """Base class for integration examples.""" def __init__(self, name: str, description: str): self.name = name self.description = description async def run(self, orchestrator): """Run the integration example.""" raise NotImplementedError class CustomerIntakeWorkflow(IntegrationExample): """Demonstrates a complete customer intake workflow using weather and CRM servers.""" def __init__(self): super().__init__( "Customer Intake Workflow", "Complete customer onboarding process using weather and CRM integration" ) async def run(self, orchestrator): """Execute the customer intake workflow.""" logger.info("Starting customer intake workflow") # Step 1: Create lead from website inquiry lead_data = { "name": "Alice Johnson", "email": "alice.johnson@company.com", "phone": "+1-555-0123", "source": "website", "status": "new", "score": 85, "notes": "Interested in enterprise weather services for retail chain" } lead_result = await orchestrator.call_tool("crm-server", "add_lead", lead_data) lead_id = json.loads(lead_result["content"][0]["text"])["id"] # Step 2: Get customer location weather for territory assignment weather_result = await orchestrator.call_tool("weather-server", "get_current_weather", { "location": "New York" }) weather_data = json.loads(weather_result["content"][0]["text"]) # Step 3: Assign lead to sales rep based on weather conditions (simplified logic) assigned_rep = "john.smith" if weather_data["temperature"] > 20 else "sarah.jones" # Step 4: Update lead with assignment lead_update_data = { "assigned_to": assigned_rep, "notes": f"Assigned to {assigned_rep}. Weather in customer location: {weather_data['conditions']}, {weather_data['temperature']}°C" } # Note: In a real implementation, you'd have an update_lead tool logger.info("Lead assigned successfully", lead_id=lead_id, assigned_to=assigned_rep) # Step 5: Add customer after lead qualification customer_data = { "name": "Alice Johnson", "email": "alice.johnson@company.com", "phone": "+1-555-0123", "status": "active", "tags": ["enterprise", "weather-services", "retail"], "notes": f"Converted from lead {lead_id}. Territory: {weather_data['conditions']} region", "lifetime_value": 75000.0 } customer_result = await orchestrator.call_tool("crm-server", "add_customer", customer_data) customer_info = json.loads(customer_result["content"][0]["text"]) # Step 6: Create initial opportunity opportunity_data = { "customer_id": customer_info["id"], "title": "Weather Analytics Platform - Enterprise License", "value": 50000.0, "probability": 75, "stage": "proposal", "close_date": (datetime.utcnow() + timedelta(days=30)).isoformat(), "assigned_to": assigned_rep, "notes": "Multi-location retail chain weather monitoring solution" } opportunity_result = await orchestrator.call_tool("crm-server", "add_opportunity", opportunity_data) return { "workflow": "customer_intake", "lead_id": lead_id, "customer_id": customer_info["id"], "opportunity_created": True, "assigned_rep": assigned_rep, "weather_considered": True } class SalesTerritoryOptimization(IntegrationExample): """Demonstrates territory optimization using weather patterns and sales data.""" def __init__(self): super().__init__( "Sales Territory Optimization", "Optimize sales territories based on weather patterns and CRM data" ) async def run(self, orchestrator): """Execute territory optimization workflow.""" logger.info("Starting sales territory optimization") # Step 1: Get sales pipeline pipeline_result = await orchestrator.call_tool("crm-server", "get_sales_pipeline", {"limit": 100}) pipeline = json.loads(pipeline_result["content"][0]["text"])["pipeline"] # Step 2: Get weather for key customer locations locations = ["New York", "London", "Tokyo", "Los Angeles", "Paris"] weather_data = {} for location in locations: try: weather_result = await orchestrator.call_tool("weather-server", "get_current_weather", { "location": location }) weather_data[location] = json.loads(weather_result["content"][0]["text"]) except Exception as e: logger.warning(f"Could not get weather for {location}: {e}") # Step 3: Analyze pipeline performance by weather conditions weather_performance = {} for opp in pipeline: location = opp.get("customer_name", "Unknown") if location in weather_data: conditions = weather_data[location]["conditions"] if conditions not in weather_performance: weather_performance[conditions] = {"count": 0, "value": 0} weather_performance[conditions]["count"] += 1 weather_performance[conditions]["value"] += opp["value"] # Step 4: Generate territory recommendations recommendations = [] for conditions, data in weather_performance.items(): avg_deal_size = data["value"] / data["count"] recommendations.append({ "weather_conditions": conditions, "opportunity_count": data["count"], "total_pipeline_value": data["value"], "average_deal_size": avg_deal_size, "recommendation": "Expand focus" if avg_deal_size > 25000 else "Maintain current level" }) return { "workflow": "territory_optimization", "weather_performance": weather_performance, "recommendations": recommendations, "analyzed_opportunities": len(pipeline) } class MarketingCampaignAnalysis(IntegrationExample): """Demonstrates marketing campaign effectiveness analysis using CRM and weather data.""" def __init__(self): super().__init__( "Marketing Campaign Analysis", "Analyze marketing campaign effectiveness with weather correlation" ) async def run(self, orchestrator): """Execute marketing campaign analysis workflow.""" logger.info("Starting marketing campaign analysis") # Step 1: Get CRM metrics metrics_result = await orchestrator.call_tool("crm-server", "get_crm_metrics", {}) crm_metrics = json.loads(metrics_result["content"][0]["text"]) # Step 2: Get weather forecasts for key markets key_markets = ["New York", "London", "Tokyo"] forecast_data = {} for market in key_markets: try: forecast_result = await orchestrator.call_tool("weather-server", "get_weather_forecast", { "location": market, "days": 7 }) forecast_data[market] = json.loads(forecast_result["content"][0]["text"]) except Exception as e: logger.warning(f"Could not get forecast for {market}: {e}") # Step 3: Analyze campaign performance correlation with weather campaign_analysis = { "overall_metrics": crm_metrics, "weather_forecasts": forecast_data, "campaign_insights": [] } # Example insights (simplified) for market, forecast in forecast_data.items(): sunny_days = sum(1 for day in forecast["forecast"] if "sunny" in day["conditions"].lower()) rainy_days = sum(1 for day in forecast["forecast"] if "rain" in day["conditions"].lower()) campaign_analysis["campaign_insights"].append({ "market": market, "forecast_summary": { "sunny_days": sunny_days, "rainy_days": rainy_days, "avg_temperature": sum(day["high"] for day in forecast["forecast"]) / len(forecast["forecast"]) }, "recommended_campaigns": { "sunny": "Outdoor events and promotions" if sunny_days > 3 else "Limited outdoor focus", "rainy": "Indoor services and online marketing" if rainy_days > 2 else "Standard campaigns" } }) return campaign_analysis class InventoryPlanningWorkflow(IntegrationExample): """Demonstrates inventory planning using weather forecasts and sales data.""" def __init__(self): super().__init__( "Inventory Planning Workflow", "Plan inventory based on weather forecasts and sales pipeline" ) async def run(self, orchestrator): """Execute inventory planning workflow.""" logger.info("Starting inventory planning workflow") # Step 1: Get sales pipeline to understand upcoming demand pipeline_result = await orchestrator.call_tool("crm-server", "get_sales_pipeline", {"limit": 50}) pipeline = json.loads(pipeline_result["content"][0]["text"])["pipeline"] # Step 2: Get weather forecasts for major markets major_markets = ["New York", "London", "Tokyo", "Los Angeles"] market_weather = {} for market in major_markets: try: forecast_result = await orchestrator.call_tool("weather-server", "get_weather_forecast", { "location": market, "days": 14 # 2-week forecast }) forecast = json.loads(forecast_result["content"][0]["text"]) # Analyze weather patterns temp_trend = [day["high"] for day in forecast["forecast"]] rainy_days = sum(1 for day in forecast["forecast"] if "rain" in day["conditions"].lower()) market_weather[market] = { "temp_trend": temp_trend, "rainy_days": rainy_days, "avg_temp": sum(temp_trend) / len(temp_trend), "forecast": forecast["forecast"] } except Exception as e: logger.warning(f"Could not get forecast for {market}: {e}") # Step 3: Calculate inventory recommendations inventory_recommendations = [] for market, weather in market_weather.items(): # Simple demand forecasting based on weather base_demand = 1000 # Base units temp_factor = 1.0 + (weather["avg_temp"] - 20) * 0.02 # Temperature impact weather_factor = 1.0 + weather["rainy_days"] * 0.05 # Rain impact recommended_inventory = int(base_demand * temp_factor * weather_factor) inventory_recommendations.append({ "market": market, "recommended_inventory": recommended_inventory, "temp_factor": round(temp_factor, 2), "weather_factor": round(weather_factor, 2), "reasoning": f"Based on {weather['rainy_days']} rainy days and avg temp {weather['avg_temp']:.1f}°C" }) # Step 4: Get pipeline insights for strategic planning pipeline_insights = { "total_pipeline_value": sum(opp["value"] for opp in pipeline), "opportunities_by_stage": {}, "high_value_opportunities": [opp for opp in pipeline if opp["value"] > 25000] } for opp in pipeline: stage = opp["stage"] if stage not in pipeline_insights["opportunities_by_stage"]: pipeline_insights["opportunities_by_stage"][stage] = {"count": 0, "value": 0} pipeline_insights["opportunities_by_stage"][stage]["count"] += 1 pipeline_insights["opportunities_by_stage"][stage]["value"] += opp["value"] return { "workflow": "inventory_planning", "inventory_recommendations": inventory_recommendations, "pipeline_insights": pipeline_insights, "forecast_period": "14 days" } class CustomerSuccessMonitoring(IntegrationExample): """Demonstrates customer success monitoring with proactive weather-based outreach.""" def __init__(self): super().__init__( "Customer Success Monitoring", "Monitor customer health and provide proactive weather-based support" ) async def run(self, orchestrator): """Execute customer success monitoring workflow.""" logger.info("Starting customer success monitoring") # Step 1: Get customer metrics from CRM metrics_result = await orchestrator.call_tool("crm-server", "get_crm_metrics", {}) crm_metrics = json.loads(metrics_result["content"][0]["text"]) # Step 2: Search for customers who need attention (simplified) search_result = await orchestrator.call_tool("crm-server", "search_customers", { "query": "priority", "limit": 10 }) priority_customers = json.loads(search_result["content"][0]["text"])["customers"] # Step 3: Get weather alerts for customer locations weather_alerts = [] for customer in priority_customers[:3]: # Limit to 3 for demo try: # Use customer name as location (simplified) weather_result = await orchestrator.call_tool("weather-server", "get_current_weather", { "location": customer["name"].split()[-1] if " " in customer["name"] else customer["name"] }) weather_data = json.loads(weather_result["content"][0]["text"]) # Generate proactive outreach recommendations alert_type = None recommendation = None if weather_data["temperature"] < 5: # Cold weather alert alert_type = "weather_alert" recommendation = "Check heating/cooling system status" elif weather_data["wind_speed"] > 15: # High wind alert alert_type = "weather_alert" recommendation = "Monitor outdoor equipment safety" elif weather_data["humidity"] > 80: # High humidity alert alert_type = "weather_alert" recommendation = "Check for potential moisture issues" if alert_type: weather_alerts.append({ "customer_id": customer["id"], "customer_name": customer["name"], "location": weather_data["location"], "alert_type": alert_type, "current_conditions": f"{weather_data['conditions']}, {weather_data['temperature']}°C", "recommendation": recommendation, "priority": "high" if weather_data["wind_speed"] > 15 else "medium" }) except Exception as e: logger.warning(f"Could not get weather for customer {customer['name']}: {e}") # Step 4: Generate customer success report success_report = { "monitoring_date": datetime.utcnow().isoformat(), "customer_metrics": crm_metrics, "priority_customers_monitored": len(priority_customers), "weather_alerts_generated": len(weather_alerts), "alerts": weather_alerts, "recommended_actions": [ "Follow up on high-priority weather alerts", "Schedule proactive check-ins with priority customers", "Prepare weather-related support materials" ] } return success_report class IntegrationOrchestrator: """Orchestrates multiple integration examples.""" def __init__(self, orchestrator): self.orchestrator = orchestrator self.examples = [ CustomerIntakeWorkflow(), SalesTerritoryOptimization(), MarketingCampaignAnalysis(), InventoryPlanningWorkflow(), CustomerSuccessMonitoring() ] async def run_example(self, example_name: str) -> Dict[str, Any]: """Run a specific integration example.""" for example in self.examples: if example.name.lower().replace(" ", "_") == example_name.lower().replace(" ", "_"): return await example.run(self.orchestrator) raise ValueError(f"Example '{example_name}' not found") async def run_all_examples(self) -> Dict[str, Any]: """Run all integration examples.""" results = {} for example in self.examples: try: logger.info(f"Running example: {example.name}") result = await example.run(self.orchestrator) results[example.name] = { "status": "success", "result": result } except Exception as e: logger.error(f"Example {example.name} failed", error=str(e)) results[example.name] = { "status": "failed", "error": str(e) } return { "execution_summary": { "total_examples": len(self.examples), "successful": sum(1 for r in results.values() if r["status"] == "success"), "failed": sum(1 for r in results.values() if r["status"] == "failed") }, "results": results } def list_examples(self) -> List[Dict[str, str]]: """List available integration examples.""" return [ { "name": example.name, "description": example.description } for example in self.examples ] async def main(): """Main function to demonstrate integration examples.""" # This would be replaced with actual orchestrator initialization print("Integration Examples for MCP Orchestration Platform") print("=" * 60) # Example usage examples = [ "Customer Intake Workflow", "Sales Territory Optimization", "Marketing Campaign Analysis", "Inventory Planning Workflow", "Customer Success Monitoring" ] print("\nAvailable Integration Examples:") for i, example in enumerate(examples, 1): print(f"{i}. {example}") print("\nEach example demonstrates:") print("- Multi-server coordination") print("- Real-world business workflows") print("- Data correlation and analysis") print("- Proactive decision making") print("\nTo run these examples:") print("1. Start the orchestration platform") print("2. Register the sample servers (weather-server, crm-server)") print("3. Run: await integration_orchestrator.run_example('customer_intake_workflow')") if __name__ == "__main__": asyncio.run(main())