|
|
""" |
|
|
Integration Examples for MCP Orchestration Platform |
|
|
Demonstrates real-world workflows and usage patterns |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import json |
|
|
import logging |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Dict, List, Any, Optional |
|
|
import structlog |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = structlog.get_logger() |
|
|
|
|
|
|
|
|
class IntegrationExample: |
|
|
"""Base class for integration examples.""" |
|
|
|
|
|
def __init__(self, name: str, description: str): |
|
|
self.name = name |
|
|
self.description = description |
|
|
|
|
|
async def run(self, orchestrator): |
|
|
"""Run the integration example.""" |
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
|
class CustomerIntakeWorkflow(IntegrationExample): |
|
|
"""Demonstrates a complete customer intake workflow using weather and CRM servers.""" |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__( |
|
|
"Customer Intake Workflow", |
|
|
"Complete customer onboarding process using weather and CRM integration" |
|
|
) |
|
|
|
|
|
async def run(self, orchestrator): |
|
|
"""Execute the customer intake workflow.""" |
|
|
logger.info("Starting customer intake workflow") |
|
|
|
|
|
|
|
|
lead_data = { |
|
|
"name": "Alice Johnson", |
|
|
"email": "alice.johnson@company.com", |
|
|
"phone": "+1-555-0123", |
|
|
"source": "website", |
|
|
"status": "new", |
|
|
"score": 85, |
|
|
"notes": "Interested in enterprise weather services for retail chain" |
|
|
} |
|
|
|
|
|
lead_result = await orchestrator.call_tool("crm-server", "add_lead", lead_data) |
|
|
lead_id = json.loads(lead_result["content"][0]["text"])["id"] |
|
|
|
|
|
|
|
|
weather_result = await orchestrator.call_tool("weather-server", "get_current_weather", { |
|
|
"location": "New York" |
|
|
}) |
|
|
weather_data = json.loads(weather_result["content"][0]["text"]) |
|
|
|
|
|
|
|
|
assigned_rep = "john.smith" if weather_data["temperature"] > 20 else "sarah.jones" |
|
|
|
|
|
|
|
|
lead_update_data = { |
|
|
"assigned_to": assigned_rep, |
|
|
"notes": f"Assigned to {assigned_rep}. Weather in customer location: {weather_data['conditions']}, {weather_data['temperature']}°C" |
|
|
} |
|
|
|
|
|
|
|
|
logger.info("Lead assigned successfully", lead_id=lead_id, assigned_to=assigned_rep) |
|
|
|
|
|
|
|
|
customer_data = { |
|
|
"name": "Alice Johnson", |
|
|
"email": "alice.johnson@company.com", |
|
|
"phone": "+1-555-0123", |
|
|
"status": "active", |
|
|
"tags": ["enterprise", "weather-services", "retail"], |
|
|
"notes": f"Converted from lead {lead_id}. Territory: {weather_data['conditions']} region", |
|
|
"lifetime_value": 75000.0 |
|
|
} |
|
|
|
|
|
customer_result = await orchestrator.call_tool("crm-server", "add_customer", customer_data) |
|
|
customer_info = json.loads(customer_result["content"][0]["text"]) |
|
|
|
|
|
|
|
|
opportunity_data = { |
|
|
"customer_id": customer_info["id"], |
|
|
"title": "Weather Analytics Platform - Enterprise License", |
|
|
"value": 50000.0, |
|
|
"probability": 75, |
|
|
"stage": "proposal", |
|
|
"close_date": (datetime.utcnow() + timedelta(days=30)).isoformat(), |
|
|
"assigned_to": assigned_rep, |
|
|
"notes": "Multi-location retail chain weather monitoring solution" |
|
|
} |
|
|
|
|
|
opportunity_result = await orchestrator.call_tool("crm-server", "add_opportunity", opportunity_data) |
|
|
|
|
|
return { |
|
|
"workflow": "customer_intake", |
|
|
"lead_id": lead_id, |
|
|
"customer_id": customer_info["id"], |
|
|
"opportunity_created": True, |
|
|
"assigned_rep": assigned_rep, |
|
|
"weather_considered": True |
|
|
} |
|
|
|
|
|
|
|
|
class SalesTerritoryOptimization(IntegrationExample): |
|
|
"""Demonstrates territory optimization using weather patterns and sales data.""" |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__( |
|
|
"Sales Territory Optimization", |
|
|
"Optimize sales territories based on weather patterns and CRM data" |
|
|
) |
|
|
|
|
|
async def run(self, orchestrator): |
|
|
"""Execute territory optimization workflow.""" |
|
|
logger.info("Starting sales territory optimization") |
|
|
|
|
|
|
|
|
pipeline_result = await orchestrator.call_tool("crm-server", "get_sales_pipeline", {"limit": 100}) |
|
|
pipeline = json.loads(pipeline_result["content"][0]["text"])["pipeline"] |
|
|
|
|
|
|
|
|
locations = ["New York", "London", "Tokyo", "Los Angeles", "Paris"] |
|
|
weather_data = {} |
|
|
|
|
|
for location in locations: |
|
|
try: |
|
|
weather_result = await orchestrator.call_tool("weather-server", "get_current_weather", { |
|
|
"location": location |
|
|
}) |
|
|
weather_data[location] = json.loads(weather_result["content"][0]["text"]) |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not get weather for {location}: {e}") |
|
|
|
|
|
|
|
|
weather_performance = {} |
|
|
for opp in pipeline: |
|
|
location = opp.get("customer_name", "Unknown") |
|
|
if location in weather_data: |
|
|
conditions = weather_data[location]["conditions"] |
|
|
if conditions not in weather_performance: |
|
|
weather_performance[conditions] = {"count": 0, "value": 0} |
|
|
weather_performance[conditions]["count"] += 1 |
|
|
weather_performance[conditions]["value"] += opp["value"] |
|
|
|
|
|
|
|
|
recommendations = [] |
|
|
for conditions, data in weather_performance.items(): |
|
|
avg_deal_size = data["value"] / data["count"] |
|
|
recommendations.append({ |
|
|
"weather_conditions": conditions, |
|
|
"opportunity_count": data["count"], |
|
|
"total_pipeline_value": data["value"], |
|
|
"average_deal_size": avg_deal_size, |
|
|
"recommendation": "Expand focus" if avg_deal_size > 25000 else "Maintain current level" |
|
|
}) |
|
|
|
|
|
return { |
|
|
"workflow": "territory_optimization", |
|
|
"weather_performance": weather_performance, |
|
|
"recommendations": recommendations, |
|
|
"analyzed_opportunities": len(pipeline) |
|
|
} |
|
|
|
|
|
|
|
|
class MarketingCampaignAnalysis(IntegrationExample): |
|
|
"""Demonstrates marketing campaign effectiveness analysis using CRM and weather data.""" |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__( |
|
|
"Marketing Campaign Analysis", |
|
|
"Analyze marketing campaign effectiveness with weather correlation" |
|
|
) |
|
|
|
|
|
async def run(self, orchestrator): |
|
|
"""Execute marketing campaign analysis workflow.""" |
|
|
logger.info("Starting marketing campaign analysis") |
|
|
|
|
|
|
|
|
metrics_result = await orchestrator.call_tool("crm-server", "get_crm_metrics", {}) |
|
|
crm_metrics = json.loads(metrics_result["content"][0]["text"]) |
|
|
|
|
|
|
|
|
key_markets = ["New York", "London", "Tokyo"] |
|
|
forecast_data = {} |
|
|
|
|
|
for market in key_markets: |
|
|
try: |
|
|
forecast_result = await orchestrator.call_tool("weather-server", "get_weather_forecast", { |
|
|
"location": market, |
|
|
"days": 7 |
|
|
}) |
|
|
forecast_data[market] = json.loads(forecast_result["content"][0]["text"]) |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not get forecast for {market}: {e}") |
|
|
|
|
|
|
|
|
campaign_analysis = { |
|
|
"overall_metrics": crm_metrics, |
|
|
"weather_forecasts": forecast_data, |
|
|
"campaign_insights": [] |
|
|
} |
|
|
|
|
|
|
|
|
for market, forecast in forecast_data.items(): |
|
|
sunny_days = sum(1 for day in forecast["forecast"] if "sunny" in day["conditions"].lower()) |
|
|
rainy_days = sum(1 for day in forecast["forecast"] if "rain" in day["conditions"].lower()) |
|
|
|
|
|
campaign_analysis["campaign_insights"].append({ |
|
|
"market": market, |
|
|
"forecast_summary": { |
|
|
"sunny_days": sunny_days, |
|
|
"rainy_days": rainy_days, |
|
|
"avg_temperature": sum(day["high"] for day in forecast["forecast"]) / len(forecast["forecast"]) |
|
|
}, |
|
|
"recommended_campaigns": { |
|
|
"sunny": "Outdoor events and promotions" if sunny_days > 3 else "Limited outdoor focus", |
|
|
"rainy": "Indoor services and online marketing" if rainy_days > 2 else "Standard campaigns" |
|
|
} |
|
|
}) |
|
|
|
|
|
return campaign_analysis |
|
|
|
|
|
|
|
|
class InventoryPlanningWorkflow(IntegrationExample): |
|
|
"""Demonstrates inventory planning using weather forecasts and sales data.""" |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__( |
|
|
"Inventory Planning Workflow", |
|
|
"Plan inventory based on weather forecasts and sales pipeline" |
|
|
) |
|
|
|
|
|
async def run(self, orchestrator): |
|
|
"""Execute inventory planning workflow.""" |
|
|
logger.info("Starting inventory planning workflow") |
|
|
|
|
|
|
|
|
pipeline_result = await orchestrator.call_tool("crm-server", "get_sales_pipeline", {"limit": 50}) |
|
|
pipeline = json.loads(pipeline_result["content"][0]["text"])["pipeline"] |
|
|
|
|
|
|
|
|
major_markets = ["New York", "London", "Tokyo", "Los Angeles"] |
|
|
market_weather = {} |
|
|
|
|
|
for market in major_markets: |
|
|
try: |
|
|
forecast_result = await orchestrator.call_tool("weather-server", "get_weather_forecast", { |
|
|
"location": market, |
|
|
"days": 14 |
|
|
}) |
|
|
forecast = json.loads(forecast_result["content"][0]["text"]) |
|
|
|
|
|
|
|
|
temp_trend = [day["high"] for day in forecast["forecast"]] |
|
|
rainy_days = sum(1 for day in forecast["forecast"] if "rain" in day["conditions"].lower()) |
|
|
|
|
|
market_weather[market] = { |
|
|
"temp_trend": temp_trend, |
|
|
"rainy_days": rainy_days, |
|
|
"avg_temp": sum(temp_trend) / len(temp_trend), |
|
|
"forecast": forecast["forecast"] |
|
|
} |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not get forecast for {market}: {e}") |
|
|
|
|
|
|
|
|
inventory_recommendations = [] |
|
|
|
|
|
for market, weather in market_weather.items(): |
|
|
|
|
|
base_demand = 1000 |
|
|
temp_factor = 1.0 + (weather["avg_temp"] - 20) * 0.02 |
|
|
weather_factor = 1.0 + weather["rainy_days"] * 0.05 |
|
|
|
|
|
recommended_inventory = int(base_demand * temp_factor * weather_factor) |
|
|
|
|
|
inventory_recommendations.append({ |
|
|
"market": market, |
|
|
"recommended_inventory": recommended_inventory, |
|
|
"temp_factor": round(temp_factor, 2), |
|
|
"weather_factor": round(weather_factor, 2), |
|
|
"reasoning": f"Based on {weather['rainy_days']} rainy days and avg temp {weather['avg_temp']:.1f}°C" |
|
|
}) |
|
|
|
|
|
|
|
|
pipeline_insights = { |
|
|
"total_pipeline_value": sum(opp["value"] for opp in pipeline), |
|
|
"opportunities_by_stage": {}, |
|
|
"high_value_opportunities": [opp for opp in pipeline if opp["value"] > 25000] |
|
|
} |
|
|
|
|
|
for opp in pipeline: |
|
|
stage = opp["stage"] |
|
|
if stage not in pipeline_insights["opportunities_by_stage"]: |
|
|
pipeline_insights["opportunities_by_stage"][stage] = {"count": 0, "value": 0} |
|
|
pipeline_insights["opportunities_by_stage"][stage]["count"] += 1 |
|
|
pipeline_insights["opportunities_by_stage"][stage]["value"] += opp["value"] |
|
|
|
|
|
return { |
|
|
"workflow": "inventory_planning", |
|
|
"inventory_recommendations": inventory_recommendations, |
|
|
"pipeline_insights": pipeline_insights, |
|
|
"forecast_period": "14 days" |
|
|
} |
|
|
|
|
|
|
|
|
class CustomerSuccessMonitoring(IntegrationExample): |
|
|
"""Demonstrates customer success monitoring with proactive weather-based outreach.""" |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__( |
|
|
"Customer Success Monitoring", |
|
|
"Monitor customer health and provide proactive weather-based support" |
|
|
) |
|
|
|
|
|
async def run(self, orchestrator): |
|
|
"""Execute customer success monitoring workflow.""" |
|
|
logger.info("Starting customer success monitoring") |
|
|
|
|
|
|
|
|
metrics_result = await orchestrator.call_tool("crm-server", "get_crm_metrics", {}) |
|
|
crm_metrics = json.loads(metrics_result["content"][0]["text"]) |
|
|
|
|
|
|
|
|
search_result = await orchestrator.call_tool("crm-server", "search_customers", { |
|
|
"query": "priority", |
|
|
"limit": 10 |
|
|
}) |
|
|
priority_customers = json.loads(search_result["content"][0]["text"])["customers"] |
|
|
|
|
|
|
|
|
weather_alerts = [] |
|
|
|
|
|
for customer in priority_customers[:3]: |
|
|
try: |
|
|
|
|
|
weather_result = await orchestrator.call_tool("weather-server", "get_current_weather", { |
|
|
"location": customer["name"].split()[-1] if " " in customer["name"] else customer["name"] |
|
|
}) |
|
|
weather_data = json.loads(weather_result["content"][0]["text"]) |
|
|
|
|
|
|
|
|
alert_type = None |
|
|
recommendation = None |
|
|
|
|
|
if weather_data["temperature"] < 5: |
|
|
alert_type = "weather_alert" |
|
|
recommendation = "Check heating/cooling system status" |
|
|
elif weather_data["wind_speed"] > 15: |
|
|
alert_type = "weather_alert" |
|
|
recommendation = "Monitor outdoor equipment safety" |
|
|
elif weather_data["humidity"] > 80: |
|
|
alert_type = "weather_alert" |
|
|
recommendation = "Check for potential moisture issues" |
|
|
|
|
|
if alert_type: |
|
|
weather_alerts.append({ |
|
|
"customer_id": customer["id"], |
|
|
"customer_name": customer["name"], |
|
|
"location": weather_data["location"], |
|
|
"alert_type": alert_type, |
|
|
"current_conditions": f"{weather_data['conditions']}, {weather_data['temperature']}°C", |
|
|
"recommendation": recommendation, |
|
|
"priority": "high" if weather_data["wind_speed"] > 15 else "medium" |
|
|
}) |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not get weather for customer {customer['name']}: {e}") |
|
|
|
|
|
|
|
|
success_report = { |
|
|
"monitoring_date": datetime.utcnow().isoformat(), |
|
|
"customer_metrics": crm_metrics, |
|
|
"priority_customers_monitored": len(priority_customers), |
|
|
"weather_alerts_generated": len(weather_alerts), |
|
|
"alerts": weather_alerts, |
|
|
"recommended_actions": [ |
|
|
"Follow up on high-priority weather alerts", |
|
|
"Schedule proactive check-ins with priority customers", |
|
|
"Prepare weather-related support materials" |
|
|
] |
|
|
} |
|
|
|
|
|
return success_report |
|
|
|
|
|
|
|
|
class IntegrationOrchestrator: |
|
|
"""Orchestrates multiple integration examples.""" |
|
|
|
|
|
def __init__(self, orchestrator): |
|
|
self.orchestrator = orchestrator |
|
|
self.examples = [ |
|
|
CustomerIntakeWorkflow(), |
|
|
SalesTerritoryOptimization(), |
|
|
MarketingCampaignAnalysis(), |
|
|
InventoryPlanningWorkflow(), |
|
|
CustomerSuccessMonitoring() |
|
|
] |
|
|
|
|
|
async def run_example(self, example_name: str) -> Dict[str, Any]: |
|
|
"""Run a specific integration example.""" |
|
|
for example in self.examples: |
|
|
if example.name.lower().replace(" ", "_") == example_name.lower().replace(" ", "_"): |
|
|
return await example.run(self.orchestrator) |
|
|
|
|
|
raise ValueError(f"Example '{example_name}' not found") |
|
|
|
|
|
async def run_all_examples(self) -> Dict[str, Any]: |
|
|
"""Run all integration examples.""" |
|
|
results = {} |
|
|
|
|
|
for example in self.examples: |
|
|
try: |
|
|
logger.info(f"Running example: {example.name}") |
|
|
result = await example.run(self.orchestrator) |
|
|
results[example.name] = { |
|
|
"status": "success", |
|
|
"result": result |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Example {example.name} failed", error=str(e)) |
|
|
results[example.name] = { |
|
|
"status": "failed", |
|
|
"error": str(e) |
|
|
} |
|
|
|
|
|
return { |
|
|
"execution_summary": { |
|
|
"total_examples": len(self.examples), |
|
|
"successful": sum(1 for r in results.values() if r["status"] == "success"), |
|
|
"failed": sum(1 for r in results.values() if r["status"] == "failed") |
|
|
}, |
|
|
"results": results |
|
|
} |
|
|
|
|
|
def list_examples(self) -> List[Dict[str, str]]: |
|
|
"""List available integration examples.""" |
|
|
return [ |
|
|
{ |
|
|
"name": example.name, |
|
|
"description": example.description |
|
|
} |
|
|
for example in self.examples |
|
|
] |
|
|
|
|
|
|
|
|
async def main(): |
|
|
"""Main function to demonstrate integration examples.""" |
|
|
|
|
|
print("Integration Examples for MCP Orchestration Platform") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
examples = [ |
|
|
"Customer Intake Workflow", |
|
|
"Sales Territory Optimization", |
|
|
"Marketing Campaign Analysis", |
|
|
"Inventory Planning Workflow", |
|
|
"Customer Success Monitoring" |
|
|
] |
|
|
|
|
|
print("\nAvailable Integration Examples:") |
|
|
for i, example in enumerate(examples, 1): |
|
|
print(f"{i}. {example}") |
|
|
|
|
|
print("\nEach example demonstrates:") |
|
|
print("- Multi-server coordination") |
|
|
print("- Real-world business workflows") |
|
|
print("- Data correlation and analysis") |
|
|
print("- Proactive decision making") |
|
|
|
|
|
print("\nTo run these examples:") |
|
|
print("1. Start the orchestration platform") |
|
|
print("2. Register the sample servers (weather-server, crm-server)") |
|
|
print("3. Run: await integration_orchestrator.run_example('customer_intake_workflow')") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
asyncio.run(main()) |