# rajkumarrawal's picture
# Initial commit
# 2ec0d39
"""
Integration Examples for MCP Orchestration Platform
Demonstrates real-world workflows and usage patterns
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import structlog
# Configure structured logging: the stdlib root logger is set to INFO and a
# single module-level structlog logger is created for the code in this file.
logging.basicConfig(level=logging.INFO)
logger = structlog.get_logger()
class IntegrationExample:
    """Common parent for every integration example in this module.

    An example carries a human-readable ``name`` and ``description`` and
    exposes a single coroutine, ``run``, which concrete examples override.
    """

    def __init__(self, name: str, description: str):
        """Store the display name and the description of the example."""
        self.name, self.description = name, description

    async def run(self, orchestrator):
        """Execute the example against ``orchestrator``.

        The base implementation does no work.

        Raises:
            NotImplementedError: always; subclasses provide the behaviour.
        """
        raise NotImplementedError()
class CustomerIntakeWorkflow(IntegrationExample):
    """Customer onboarding example that combines the CRM and weather servers.

    The workflow records a lead, looks up the current weather, picks a sales
    rep from the reported temperature, then records a customer and an
    opportunity for that customer.
    """

    def __init__(self):
        super().__init__(
            name="Customer Intake Workflow",
            description="Complete customer onboarding process using weather and CRM integration",
        )

    @staticmethod
    def _payload(tool_result):
        """Parse the JSON text held in the first content item of a tool result."""
        return json.loads(tool_result["content"][0]["text"])

    async def run(self, orchestrator):
        """Run the intake steps in order and return a summary of what happened.

        Args:
            orchestrator: Object exposing an awaitable
                ``call_tool(server, tool, arguments)``.

        Returns:
            A dict with the workflow name, the ids of the created lead and
            customer, the chosen sales rep, and the ``opportunity_created`` /
            ``weather_considered`` flags (both hard-coded to ``True``).
        """
        logger.info("Starting customer intake workflow")

        # Contact details shared by the lead record and the customer record.
        contact = {
            "name": "Alice Johnson",
            "email": "alice.johnson@company.com",
            "phone": "+1-555-0123",
        }

        # 1. Register the website inquiry as a lead.
        lead_payload = {
            **contact,
            "source": "website",
            "status": "new",
            "score": 85,
            "notes": "Interested in enterprise weather services for retail chain",
        }
        lead_response = await orchestrator.call_tool("crm-server", "add_lead", lead_payload)
        lead_id = self._payload(lead_response)["id"]

        # 2. Fetch the current weather used for the territory decision.
        weather_response = await orchestrator.call_tool(
            "weather-server", "get_current_weather", {"location": "New York"}
        )
        weather = self._payload(weather_response)

        # 3. Pick the sales rep from the temperature (simplified logic).
        if weather["temperature"] > 20:
            assigned_rep = "john.smith"
        else:
            assigned_rep = "sarah.jones"

        # 4. Describe the lead update. It is only built, never sent: the
        #    example has no update_lead tool to call.
        pending_lead_update = {
            "assigned_to": assigned_rep,
            "notes": f"Assigned to {assigned_rep}. Weather in customer location: {weather['conditions']}, {weather['temperature']}°C",
        }
        logger.info("Lead assigned successfully", lead_id=lead_id, assigned_to=assigned_rep)

        # 5. Convert the qualified lead into a customer.
        customer_payload = {
            **contact,
            "status": "active",
            "tags": ["enterprise", "weather-services", "retail"],
            "notes": f"Converted from lead {lead_id}. Territory: {weather['conditions']} region",
            "lifetime_value": 75000.0,
        }
        customer_response = await orchestrator.call_tool("crm-server", "add_customer", customer_payload)
        customer = self._payload(customer_response)

        # 6. Open the first opportunity, closing 30 days from now.
        expected_close = datetime.utcnow() + timedelta(days=30)
        opportunity_payload = {
            "customer_id": customer["id"],
            "title": "Weather Analytics Platform - Enterprise License",
            "value": 50000.0,
            "probability": 75,
            "stage": "proposal",
            "close_date": expected_close.isoformat(),
            "assigned_to": assigned_rep,
            "notes": "Multi-location retail chain weather monitoring solution",
        }
        # The tool response is not inspected.
        await orchestrator.call_tool("crm-server", "add_opportunity", opportunity_payload)

        return {
            "workflow": "customer_intake",
            "lead_id": lead_id,
            "customer_id": customer["id"],
            "opportunity_created": True,
            "assigned_rep": assigned_rep,
            "weather_considered": True,
        }
class SalesTerritoryOptimization(IntegrationExample):
    """Territory example that groups pipeline value by current weather conditions."""

    def __init__(self):
        super().__init__(
            name="Sales Territory Optimization",
            description="Optimize sales territories based on weather patterns and CRM data",
        )

    async def run(self, orchestrator):
        """Build per-weather-condition pipeline totals and recommendations.

        Args:
            orchestrator: Object exposing an awaitable
                ``call_tool(server, tool, arguments)``.

        Returns:
            A dict with the workflow name, the per-condition totals, one
            recommendation per condition, and the number of pipeline entries.
        """
        logger.info("Starting sales territory optimization")

        # 1. Load the sales pipeline from the CRM server.
        pipeline_response = await orchestrator.call_tool(
            "crm-server", "get_sales_pipeline", {"limit": 100}
        )
        opportunities = json.loads(pipeline_response["content"][0]["text"])["pipeline"]

        # 2. Collect current weather per city; a failed lookup is logged and skipped.
        weather_by_city = {}
        for city in ("New York", "London", "Tokyo", "Los Angeles", "Paris"):
            try:
                weather_response = await orchestrator.call_tool(
                    "weather-server", "get_current_weather", {"location": city}
                )
                weather_by_city[city] = json.loads(weather_response["content"][0]["text"])
            except Exception as e:
                logger.warning(f"Could not get weather for {city}: {e}")

        # 3. Accumulate count and value per weather condition. The opportunity's
        #    customer_name is what gets matched against the city keys above.
        weather_performance = {}
        for opportunity in opportunities:
            place = opportunity.get("customer_name", "Unknown")
            if place not in weather_by_city:
                continue
            totals = weather_performance.setdefault(
                weather_by_city[place]["conditions"], {"count": 0, "value": 0}
            )
            totals["count"] += 1
            totals["value"] += opportunity["value"]

        # 4. One recommendation per observed condition.
        def recommend(conditions, totals):
            average = totals["value"] / totals["count"]
            if average > 25000:
                advice = "Expand focus"
            else:
                advice = "Maintain current level"
            return {
                "weather_conditions": conditions,
                "opportunity_count": totals["count"],
                "total_pipeline_value": totals["value"],
                "average_deal_size": average,
                "recommendation": advice,
            }

        recommendations = [
            recommend(conditions, totals)
            for conditions, totals in weather_performance.items()
        ]

        return {
            "workflow": "territory_optimization",
            "weather_performance": weather_performance,
            "recommendations": recommendations,
            "analyzed_opportunities": len(opportunities),
        }
class MarketingCampaignAnalysis(IntegrationExample):
    """Campaign example that pairs CRM metrics with weather forecasts per market."""

    def __init__(self):
        super().__init__(
            "Marketing Campaign Analysis",
            "Analyze marketing campaign effectiveness with weather correlation"
        )

    @staticmethod
    def _market_insight(market, forecast):
        """Summarise one market's forecast and derive campaign suggestions.

        Args:
            market: Market name used as the ``market`` field of the insight.
            forecast: Decoded forecast payload; its ``"forecast"`` entry is a
                list of day dicts with ``"conditions"`` and ``"high"`` keys.

        Returns:
            The insight dict for this market. ``avg_temperature`` is ``None``
            when the forecast contains no days.
        """
        days = forecast["forecast"]
        sunny_days = sum(1 for day in days if "sunny" in day["conditions"].lower())
        rainy_days = sum(1 for day in days if "rain" in day["conditions"].lower())
        # An empty forecast list used to raise ZeroDivisionError here and abort
        # the whole analysis; report "no average" instead.
        if days:
            avg_temperature = sum(day["high"] for day in days) / len(days)
        else:
            avg_temperature = None
        return {
            "market": market,
            "forecast_summary": {
                "sunny_days": sunny_days,
                "rainy_days": rainy_days,
                "avg_temperature": avg_temperature
            },
            "recommended_campaigns": {
                "sunny": "Outdoor events and promotions" if sunny_days > 3 else "Limited outdoor focus",
                "rainy": "Indoor services and online marketing" if rainy_days > 2 else "Standard campaigns"
            }
        }

    async def run(self, orchestrator):
        """Execute the marketing campaign analysis workflow.

        Args:
            orchestrator: Object exposing an awaitable
                ``call_tool(server, tool, arguments)``.

        Returns:
            A dict with ``overall_metrics`` (decoded CRM metrics),
            ``weather_forecasts`` (decoded forecast per market that could be
            fetched) and ``campaign_insights`` (one entry per fetched market).
        """
        logger.info("Starting marketing campaign analysis")

        # Step 1: Get CRM metrics
        metrics_result = await orchestrator.call_tool("crm-server", "get_crm_metrics", {})
        crm_metrics = json.loads(metrics_result["content"][0]["text"])

        # Step 2: Get weather forecasts for key markets. A market whose lookup
        # or decoding fails is logged and left out of the analysis.
        key_markets = ["New York", "London", "Tokyo"]
        forecast_data = {}
        for market in key_markets:
            try:
                forecast_result = await orchestrator.call_tool("weather-server", "get_weather_forecast", {
                    "location": market,
                    "days": 7
                })
                forecast_data[market] = json.loads(forecast_result["content"][0]["text"])
            except Exception as e:
                logger.warning(f"Could not get forecast for {market}: {e}")

        # Step 3: Analyze campaign performance correlation with weather
        # (simplified example insights).
        return {
            "overall_metrics": crm_metrics,
            "weather_forecasts": forecast_data,
            "campaign_insights": [
                self._market_insight(market, forecast)
                for market, forecast in forecast_data.items()
            ]
        }
class InventoryPlanningWorkflow(IntegrationExample):
    """Inventory example that scales a base demand by forecast temperature and rain."""

    def __init__(self):
        super().__init__(
            name="Inventory Planning Workflow",
            description="Plan inventory based on weather forecasts and sales pipeline",
        )

    async def run(self, orchestrator):
        """Produce per-market inventory recommendations and pipeline insights.

        Args:
            orchestrator: Object exposing an awaitable
                ``call_tool(server, tool, arguments)``.

        Returns:
            A dict with the workflow name, one inventory recommendation per
            market whose forecast could be processed, pipeline totals grouped
            by stage, and the forecast period label.
        """
        logger.info("Starting inventory planning workflow")

        # 1. Load the sales pipeline to understand upcoming demand.
        pipeline_response = await orchestrator.call_tool(
            "crm-server", "get_sales_pipeline", {"limit": 50}
        )
        opportunities = json.loads(pipeline_response["content"][0]["text"])["pipeline"]

        # 2. Summarise a 14-day (2-week) forecast per market. Any failure while
        #    fetching or summarising a market is logged and the market is skipped.
        market_weather = {}
        for market in ("New York", "London", "Tokyo", "Los Angeles"):
            try:
                forecast_response = await orchestrator.call_tool(
                    "weather-server",
                    "get_weather_forecast",
                    {"location": market, "days": 14},
                )
                days = json.loads(forecast_response["content"][0]["text"])["forecast"]

                highs = [day["high"] for day in days]
                wet_days = sum(1 for day in days if "rain" in day["conditions"].lower())
                market_weather[market] = {
                    "temp_trend": highs,
                    "rainy_days": wet_days,
                    "avg_temp": sum(highs) / len(highs),
                    "forecast": days,
                }
            except Exception as e:
                logger.warning(f"Could not get forecast for {market}: {e}")

        # 3. Simple demand model: base units scaled by temperature and rain.
        def plan_for(market, weather):
            base_demand = 1000  # Base units
            temp_factor = 1.0 + (weather["avg_temp"] - 20) * 0.02  # Temperature impact
            weather_factor = 1.0 + weather["rainy_days"] * 0.05  # Rain impact
            return {
                "market": market,
                "recommended_inventory": int(base_demand * temp_factor * weather_factor),
                "temp_factor": round(temp_factor, 2),
                "weather_factor": round(weather_factor, 2),
                "reasoning": f"Based on {weather['rainy_days']} rainy days and avg temp {weather['avg_temp']:.1f}°C",
            }

        inventory_recommendations = [
            plan_for(market, weather) for market, weather in market_weather.items()
        ]

        # 4. Pipeline insights for strategic planning.
        total_value = sum(entry["value"] for entry in opportunities)
        high_value = [entry for entry in opportunities if entry["value"] > 25000]
        by_stage = {}
        for entry in opportunities:
            stage_totals = by_stage.setdefault(entry["stage"], {"count": 0, "value": 0})
            stage_totals["count"] += 1
            stage_totals["value"] += entry["value"]

        return {
            "workflow": "inventory_planning",
            "inventory_recommendations": inventory_recommendations,
            "pipeline_insights": {
                "total_pipeline_value": total_value,
                "opportunities_by_stage": by_stage,
                "high_value_opportunities": high_value,
            },
            "forecast_period": "14 days",
        }
class CustomerSuccessMonitoring(IntegrationExample):
    """Customer-success example that turns current weather into outreach alerts."""

    def __init__(self):
        super().__init__(
            "Customer Success Monitoring",
            "Monitor customer health and provide proactive weather-based support"
        )

    @staticmethod
    def _recommendation_for(weather_data):
        """Return the outreach advice of the first matching alert rule, else ``None``.

        Rules are checked in this order: temperature below 5, wind speed above
        15, humidity above 80. Later keys are only read when earlier rules do
        not match.
        """
        if weather_data["temperature"] < 5:  # Cold weather alert
            return "Check heating/cooling system status"
        if weather_data["wind_speed"] > 15:  # High wind alert
            return "Monitor outdoor equipment safety"
        if weather_data["humidity"] > 80:  # High humidity alert
            return "Check for potential moisture issues"
        return None

    async def run(self, orchestrator):
        """Execute the customer success monitoring workflow.

        Args:
            orchestrator: Object exposing an awaitable
                ``call_tool(server, tool, arguments)``.

        Returns:
            A report dict with the monitoring timestamp, the decoded CRM
            metrics, the number of customers returned by the search, the
            generated alerts and their count, and a fixed list of recommended
            actions.
        """
        logger.info("Starting customer success monitoring")

        # Step 1: Get customer metrics from CRM
        metrics_result = await orchestrator.call_tool("crm-server", "get_crm_metrics", {})
        crm_metrics = json.loads(metrics_result["content"][0]["text"])

        # Step 2: Search for customers who need attention (simplified)
        search_result = await orchestrator.call_tool("crm-server", "search_customers", {
            "query": "priority",
            "limit": 10
        })
        priority_customers = json.loads(search_result["content"][0]["text"])["customers"]

        # Step 3: Get weather alerts for customer locations
        weather_alerts = []
        for customer in priority_customers[:3]:  # Limit to 3 for demo
            try:
                # Use customer name as location (simplified): the last
                # whitespace-separated word when the name contains a space.
                name = customer["name"]
                location = name.split()[-1] if " " in name else name
                weather_result = await orchestrator.call_tool("weather-server", "get_current_weather", {
                    "location": location
                })
                weather_data = json.loads(weather_result["content"][0]["text"])

                # Generate proactive outreach recommendations
                recommendation = self._recommendation_for(weather_data)
                if recommendation is not None:
                    weather_alerts.append({
                        "customer_id": customer["id"],
                        "customer_name": customer["name"],
                        "location": weather_data["location"],
                        "alert_type": "weather_alert",
                        "current_conditions": f"{weather_data['conditions']}, {weather_data['temperature']}°C",
                        "recommendation": recommendation,
                        "priority": "high" if weather_data["wind_speed"] > 15 else "medium"
                    })
            except Exception as e:
                # Build the label defensively: indexing customer['name'] here
                # would raise again for a record without a name and abort the
                # whole best-effort loop from inside the handler.
                label = customer.get("name", "<unknown>") if isinstance(customer, dict) else "<unknown>"
                logger.warning(f"Could not get weather for customer {label}: {e}")

        # Step 4: Generate customer success report
        # NOTE(review): priority_customers_monitored counts every customer the
        # search returned, although at most three are weather-checked above.
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it
        # is kept so the naive ISO timestamp format stays unchanged.
        return {
            "monitoring_date": datetime.utcnow().isoformat(),
            "customer_metrics": crm_metrics,
            "priority_customers_monitored": len(priority_customers),
            "weather_alerts_generated": len(weather_alerts),
            "alerts": weather_alerts,
            "recommended_actions": [
                "Follow up on high-priority weather alerts",
                "Schedule proactive check-ins with priority customers",
                "Prepare weather-related support materials"
            ]
        }
class IntegrationOrchestrator:
    """Registry and runner for the integration examples defined in this module."""

    def __init__(self, orchestrator):
        """Keep the orchestrator and instantiate one of each example."""
        self.orchestrator = orchestrator
        example_types = (
            CustomerIntakeWorkflow,
            SalesTerritoryOptimization,
            MarketingCampaignAnalysis,
            InventoryPlanningWorkflow,
            CustomerSuccessMonitoring,
        )
        self.examples = [example_type() for example_type in example_types]

    async def run_example(self, example_name: str) -> Dict[str, Any]:
        """Run the example whose name matches ``example_name``.

        Matching ignores case and treats spaces and underscores as equal, so
        ``"customer_intake_workflow"`` selects "Customer Intake Workflow".

        Raises:
            ValueError: if no registered example matches.
        """
        def as_key(label):
            return label.lower().replace(" ", "_")

        selected = next(
            (
                candidate
                for candidate in self.examples
                if as_key(candidate.name) == as_key(example_name)
            ),
            None,
        )
        if selected is None:
            raise ValueError(f"Example '{example_name}' not found")
        return await selected.run(self.orchestrator)

    async def run_all_examples(self) -> Dict[str, Any]:
        """Run every registered example in order, recording success or failure.

        An exception raised by one example is logged and stored as that
        example's error; the remaining examples still run.

        Returns:
            A dict with an ``execution_summary`` (total, successful, failed)
            and the per-example ``results`` keyed by example name.
        """
        results = {}
        for example in self.examples:
            try:
                logger.info(f"Running example: {example.name}")
                outcome = await example.run(self.orchestrator)
                results[example.name] = {"status": "success", "result": outcome}
            except Exception as e:
                logger.error(f"Example {example.name} failed", error=str(e))
                results[example.name] = {"status": "failed", "error": str(e)}

        statuses = [entry["status"] for entry in results.values()]
        summary = {
            "total_examples": len(self.examples),
            "successful": statuses.count("success"),
            "failed": statuses.count("failed"),
        }
        return {"execution_summary": summary, "results": results}

    def list_examples(self) -> List[Dict[str, str]]:
        """Return the name and description of each registered example."""
        return [
            dict(name=example.name, description=example.description)
            for example in self.examples
        ]
async def main():
    """Print an overview of the integration examples and how to run them.

    No orchestrator is created or called here; the function only writes a
    static description to standard output.
    """
    # This would be replaced with actual orchestrator initialization
    print("Integration Examples for MCP Orchestration Platform")
    print("=" * 60)

    # Example usage
    workflow_names = (
        "Customer Intake Workflow",
        "Sales Territory Optimization",
        "Marketing Campaign Analysis",
        "Inventory Planning Workflow",
        "Customer Success Monitoring",
    )
    print("\nAvailable Integration Examples:")
    for position, workflow_name in enumerate(workflow_names, start=1):
        print(f"{position}. {workflow_name}")

    # Each remaining section is a heading followed by its lines.
    sections = (
        (
            "\nEach example demonstrates:",
            (
                "- Multi-server coordination",
                "- Real-world business workflows",
                "- Data correlation and analysis",
                "- Proactive decision making",
            ),
        ),
        (
            "\nTo run these examples:",
            (
                "1. Start the orchestration platform",
                "2. Register the sample servers (weather-server, crm-server)",
                "3. Run: await integration_orchestrator.run_example('customer_intake_workflow')",
            ),
        ),
    )
    for heading, entries in sections:
        print(heading)
        for entry in entries:
            print(entry)
# Script entry point: when the file is executed directly (not imported),
# run the main() coroutine to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())