rajkumarrawal's picture
Initial commit
2ec0d39
"""
Sample MCP Database/CRM Server
Demonstrates database operations, data management, and business logic integration
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import sqlite3
import uuid
from dataclasses import dataclass, asdict
from enum import Enum
import structlog
# Configure structured logging
logging.basicConfig(level=logging.INFO)
logger = structlog.get_logger()
class CustomerStatus(Enum):
"""Customer status enumeration."""
ACTIVE = "active"
INACTIVE = "inactive"
PENDING = "pending"
SUSPENDED = "suspended"
@dataclass
class Customer:
"""Customer data model."""
id: str
name: str
email: str
phone: Optional[str]
status: CustomerStatus
created_at: str
last_contact: Optional[str]
tags: List[str]
notes: str
lifetime_value: float = 0.0
@dataclass
class Lead:
"""Lead data model."""
id: str
name: str
email: str
phone: Optional[str]
source: str
status: str
score: int
assigned_to: Optional[str]
created_at: str
notes: str
@dataclass
class Opportunity:
"""Opportunity data model."""
id: str
customer_id: str
title: str
value: float
probability: int
stage: str
close_date: Optional[str]
assigned_to: Optional[str]
created_at: str
notes: str
class DatabaseManager:
"""Database manager for CRM operations."""
def __init__(self, db_path: str = "crm_database.db"):
self.db_path = db_path
self._initialize_database()
def _initialize_database(self):
"""Initialize the database with required tables."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Create customers table
cursor.execute("""
CREATE TABLE IF NOT EXISTS customers (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
phone TEXT,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
last_contact TEXT,
tags TEXT,
notes TEXT DEFAULT '',
lifetime_value REAL DEFAULT 0.0
)
""")
# Create leads table
cursor.execute("""
CREATE TABLE IF NOT EXISTS leads (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
phone TEXT,
source TEXT NOT NULL,
status TEXT NOT NULL,
score INTEGER DEFAULT 0,
assigned_to TEXT,
created_at TEXT NOT NULL,
notes TEXT DEFAULT ''
)
""")
# Create opportunities table
cursor.execute("""
CREATE TABLE IF NOT EXISTS opportunities (
id TEXT PRIMARY KEY,
customer_id TEXT NOT NULL,
title TEXT NOT NULL,
value REAL NOT NULL,
probability INTEGER DEFAULT 0,
stage TEXT NOT NULL,
close_date TEXT,
assigned_to TEXT,
created_at TEXT NOT NULL,
notes TEXT DEFAULT '',
FOREIGN KEY (customer_id) REFERENCES customers (id)
)
""")
conn.commit()
logger.info("Database initialized successfully")
async def add_customer(self, customer_data: Dict[str, Any]) -> Customer:
"""Add a new customer to the database."""
customer_id = str(uuid.uuid4())
customer = Customer(
id=customer_id,
name=customer_data["name"],
email=customer_data["email"],
phone=customer_data.get("phone"),
status=CustomerStatus(customer_data.get("status", "active")),
created_at=datetime.utcnow().isoformat(),
last_contact=None,
tags=customer_data.get("tags", []),
notes=customer_data.get("notes", ""),
lifetime_value=customer_data.get("lifetime_value", 0.0)
)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO customers (id, name, email, phone, status, created_at, last_contact, tags, notes, lifetime_value)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
customer.id, customer.name, customer.email, customer.phone,
customer.status.value, customer.created_at, customer.last_contact,
json.dumps(customer.tags), customer.notes, customer.lifetime_value
))
conn.commit()
logger.info("Customer added", customer_id=customer_id, email=customer.email)
return customer
async def get_customer(self, customer_id: str) -> Optional[Customer]:
"""Get customer by ID."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM customers WHERE id = ?", (customer_id,))
row = cursor.fetchone()
if row:
return Customer(
id=row[0], name=row[1], email=row[2], phone=row[3],
status=CustomerStatus(row[4]), created_at=row[5],
last_contact=row[6], tags=json.loads(row[7] or "[]"),
notes=row[8], lifetime_value=row[9]
)
return None
async def search_customers(self, query: str, limit: int = 10) -> List[Customer]:
"""Search customers by name or email."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM customers
WHERE name LIKE ? OR email LIKE ?
LIMIT ?
""", (f"%{query}%", f"%{query}%", limit))
customers = []
for row in cursor.fetchall():
customers.append(Customer(
id=row[0], name=row[1], email=row[2], phone=row[3],
status=CustomerStatus(row[4]), created_at=row[5],
last_contact=row[6], tags=json.loads(row[7] or "[]"),
notes=row[8], lifetime_value=row[9]
))
return customers
async def add_lead(self, lead_data: Dict[str, Any]) -> Lead:
"""Add a new lead."""
lead_id = str(uuid.uuid4())
lead = Lead(
id=lead_id,
name=lead_data["name"],
email=lead_data["email"],
phone=lead_data.get("phone"),
source=lead_data["source"],
status=lead_data.get("status", "new"),
score=lead_data.get("score", 0),
assigned_to=lead_data.get("assigned_to"),
created_at=datetime.utcnow().isoformat(),
notes=lead_data.get("notes", "")
)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO leads (id, name, email, phone, source, status, score, assigned_to, created_at, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
lead.id, lead.name, lead.email, lead.phone, lead.source,
lead.status, lead.score, lead.assigned_to, lead.created_at, lead.notes
))
conn.commit()
logger.info("Lead added", lead_id=lead_id, email=lead.email)
return lead
async def get_leads_by_status(self, status: str, limit: int = 20) -> List[Lead]:
"""Get leads by status."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM leads WHERE status = ? LIMIT ?", (status, limit))
leads = []
for row in cursor.fetchall():
leads.append(Lead(
id=row[0], name=row[1], email=row[2], phone=row[3],
source=row[4], status=row[5], score=row[6],
assigned_to=row[7], created_at=row[8], notes=row[9]
))
return leads
async def add_opportunity(self, opportunity_data: Dict[str, Any]) -> Opportunity:
"""Add a new opportunity."""
opp_id = str(uuid.uuid4())
opportunity = Opportunity(
id=opp_id,
customer_id=opportunity_data["customer_id"],
title=opportunity_data["title"],
value=opportunity_data["value"],
probability=opportunity_data.get("probability", 0),
stage=opportunity_data.get("stage", "prospecting"),
close_date=opportunity_data.get("close_date"),
assigned_to=opportunity_data.get("assigned_to"),
created_at=datetime.utcnow().isoformat(),
notes=opportunity_data.get("notes", "")
)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO opportunities (id, customer_id, title, value, probability, stage, close_date, assigned_to, created_at, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
opportunity.id, opportunity.customer_id, opportunity.title,
opportunity.value, opportunity.probability, opportunity.stage,
opportunity.close_date, opportunity.assigned_to, opportunity.created_at, opportunity.notes
))
conn.commit()
logger.info("Opportunity added", opportunity_id=opp_id, value=opportunity.value)
return opportunity
async def get_sales_pipeline(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Get sales pipeline with customer information."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT o.id, o.title, o.value, o.probability, o.stage, o.close_date,
c.name as customer_name, c.email as customer_email,
o.assigned_to, o.created_at, o.notes
FROM opportunities o
JOIN customers c ON o.customer_id = c.id
ORDER BY o.created_at DESC
LIMIT ?
""", (limit,))
pipeline = []
for row in cursor.fetchall():
pipeline.append({
"opportunity_id": row[0],
"title": row[1],
"value": row[2],
"probability": row[3],
"stage": row[4],
"close_date": row[5],
"customer_name": row[6],
"customer_email": row[7],
"assigned_to": row[8],
"created_at": row[9],
"notes": row[10]
})
return pipeline
async def get_metrics(self) -> Dict[str, Any]:
"""Get CRM metrics and analytics."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Customer metrics
cursor.execute("SELECT COUNT(*) FROM customers WHERE status = 'active'")
active_customers = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM customers")
total_customers = cursor.fetchone()[0]
# Lead metrics
cursor.execute("SELECT COUNT(*) FROM leads WHERE status = 'new'")
new_leads = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM leads WHERE created_at >= ?",
[(datetime.utcnow() - timedelta(days=30)).isoformat()])
leads_this_month = cursor.fetchone()[0]
# Opportunity metrics
cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage NOT IN ('won', 'lost')")
pipeline_value = cursor.fetchone()[0] or 0
cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage = 'won'")
won_value = cursor.fetchone()[0] or 0
cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage = 'won' AND close_date >= ?",
[(datetime.utcnow() - timedelta(days=30)).isoformat()])
won_value_this_month = cursor.fetchone()[0] or 0
return {
"customers": {
"active": active_customers,
"total": total_customers,
"conversion_rate": round((active_customers / max(total_customers, 1)) * 100, 2)
},
"leads": {
"new": new_leads,
"this_month": leads_this_month
},
"opportunities": {
"pipeline_value": pipeline_value,
"won_value": won_value,
"won_value_this_month": won_value_this_month
}
}
class CRMMCPServer:
"""Model Context Protocol CRM Server implementation."""
def __init__(self, port: int = 8002):
self.port = port
self.db = DatabaseManager()
self.tools = {
"add_customer": {
"name": "add_customer",
"description": "Add a new customer to the CRM system",
"inputSchema": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Customer name",
"minLength": 1,
"maxLength": 200
},
"email": {
"type": "string",
"description": "Customer email address",
"format": "email"
},
"phone": {
"type": "string",
"description": "Customer phone number",
"pattern": "^[+]?[\\d\\s\\-\\(\\)]+$"
},
"status": {
"type": "string",
"enum": ["active", "inactive", "pending", "suspended"],
"default": "active"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Customer tags for categorization"
},
"notes": {
"type": "string",
"description": "Additional notes about the customer",
"maxLength": 1000
},
"lifetime_value": {
"type": "number",
"description": "Customer lifetime value in USD",
"minimum": 0
}
},
"required": ["name", "email"]
},
"outputSchema": {
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"email": {"type": "string"},
"phone": {"type": "string"},
"status": {"type": "string"},
"created_at": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"notes": {"type": "string"},
"lifetime_value": {"type": "number"}
}
},
"examples": [
{
"name": "John Doe",
"email": "john.doe@example.com",
"phone": "+1-555-0123",
"status": "active",
"tags": ["enterprise", "priority"],
"lifetime_value": 50000.0
}
],
"tags": ["crm", "customer", "add", "create"]
},
"get_customer": {
"name": "get_customer",
"description": "Get customer information by customer ID",
"inputSchema": {
"type": "object",
"properties": {
"customer_id": {
"type": "string",
"description": "Customer UUID",
"format": "uuid"
}
},
"required": ["customer_id"]
},
"outputSchema": {
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"email": {"type": "string"},
"phone": {"type": "string"},
"status": {"type": "string"},
"created_at": {"type": "string"},
"last_contact": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"notes": {"type": "string"},
"lifetime_value": {"type": "number"}
}
},
"examples": [
{"customer_id": "123e4567-e89b-12d3-a456-426614174000"}
],
"tags": ["crm", "customer", "get", "retrieve"]
},
"search_customers": {
"name": "search_customers",
"description": "Search customers by name or email",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for name or email",
"minLength": 1,
"maxLength": 100
},
"limit": {
"type": "integer",
"description": "Maximum number of results",
"minimum": 1,
"maximum": 100,
"default": 10
}
},
"required": ["query"]
},
"outputSchema": {
"type": "object",
"properties": {
"customers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"email": {"type": "string"},
"phone": {"type": "string"},
"status": {"type": "string"},
"created_at": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"notes": {"type": "string"},
"lifetime_value": {"type": "number"}
}
}
}
}
},
"examples": [
{"query": "john", "limit": 5}
],
"tags": ["crm", "customer", "search", "find"]
},
"add_lead": {
"name": "add_lead",
"description": "Add a new sales lead",
"inputSchema": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Lead name"},
"email": {"type": "string", "format": "email"},
"phone": {"type": "string"},
"source": {
"type": "string",
"description": "Lead source",
"enum": ["website", "referral", "social", "email", "phone", "event"]
},
"status": {
"type": "string",
"enum": ["new", "contacted", "qualified", "unqualified"],
"default": "new"
},
"score": {
"type": "integer",
"description": "Lead score (1-100)",
"minimum": 1,
"maximum": 100
},
"assigned_to": {"type": "string", "description": "Assigned sales rep"},
"notes": {"type": "string", "description": "Additional notes"}
},
"required": ["name", "email", "source"]
},
"outputSchema": {
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"email": {"type": "string"},
"phone": {"type": "string"},
"source": {"type": "string"},
"status": {"type": "string"},
"score": {"type": "integer"},
"assigned_to": {"type": "string"},
"created_at": {"type": "string"},
"notes": {"type": "string"}
}
},
"examples": [
{
"name": "Jane Smith",
"email": "jane.smith@company.com",
"source": "website",
"score": 85
}
],
"tags": ["crm", "lead", "add", "sales"]
},
"get_sales_pipeline": {
"name": "get_sales_pipeline",
"description": "Get sales pipeline with opportunity details",
"inputSchema": {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Maximum number of opportunities to return",
"minimum": 1,
"maximum": 200,
"default": 50
}
}
},
"outputSchema": {
"type": "object",
"properties": {
"pipeline": {
"type": "array",
"items": {
"type": "object",
"properties": {
"opportunity_id": {"type": "string"},
"title": {"type": "string"},
"value": {"type": "number"},
"probability": {"type": "integer"},
"stage": {"type": "string"},
"close_date": {"type": "string"},
"customer_name": {"type": "string"},
"customer_email": {"type": "string"},
"assigned_to": {"type": "string"},
"created_at": {"type": "string"},
"notes": {"type": "string"}
}
}
}
}
},
"examples": [
{"limit": 20}
],
"tags": ["crm", "sales", "pipeline", "opportunities"]
},
"get_crm_metrics": {
"name": "get_crm_metrics",
"description": "Get CRM metrics and analytics dashboard",
"inputSchema": {
"type": "object",
"properties": {}
},
"outputSchema": {
"type": "object",
"properties": {
"customers": {
"type": "object",
"properties": {
"active": {"type": "integer"},
"total": {"type": "integer"},
"conversion_rate": {"type": "number"}
}
},
"leads": {
"type": "object",
"properties": {
"new": {"type": "integer"},
"this_month": {"type": "integer"}
}
},
"opportunities": {
"type": "object",
"properties": {
"pipeline_value": {"type": "number"},
"won_value": {"type": "number"},
"won_value_this_month": {"type": "number"}
}
}
}
},
"examples": [{}],
"tags": ["crm", "metrics", "analytics", "dashboard"]
}
}
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle incoming MCP requests."""
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
try:
if method == "tools/list":
return await self._handle_tools_list(request_id)
elif method == "tools/call":
return await self._handle_tools_call(request_id, params)
elif method == "initialize":
return await self._handle_initialize(request_id, params)
else:
return self._error_response(request_id, -32601, "Method not found")
except Exception as e:
logger.error("Request handling failed", method=method, error=str(e))
return self._error_response(request_id, -32603, f"Internal error: {str(e)}")
async def _handle_tools_list(self, request_id: str) -> Dict[str, Any]:
"""Handle tools/list request."""
tools_list = []
for tool_name, tool_info in self.tools.items():
tools_list.append({
"name": tool_info["name"],
"description": tool_info["description"],
"inputSchema": tool_info["inputSchema"],
"outputSchema": tool_info.get("outputSchema"),
"examples": tool_info.get("examples", []),
"tags": tool_info.get("tags", [])
})
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"tools": tools_list
}
}
async def _handle_tools_call(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tools/call request."""
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name not in self.tools:
return self._error_response(request_id, -32601, f"Tool '{tool_name}' not found")
# Execute the tool
try:
if tool_name == "add_customer":
result = await self._add_customer(arguments)
elif tool_name == "get_customer":
result = await self._get_customer(arguments)
elif tool_name == "search_customers":
result = await self._search_customers(arguments)
elif tool_name == "add_lead":
result = await self._add_lead(arguments)
elif tool_name == "get_sales_pipeline":
result = await self._get_sales_pipeline(arguments)
elif tool_name == "get_crm_metrics":
result = await self._get_crm_metrics(arguments)
else:
return self._error_response(request_id, -32601, f"Tool '{tool_name}' not implemented")
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, indent=2)
}
]
}
}
except Exception as e:
logger.error("Tool execution failed", tool=tool_name, error=str(e))
return self._error_response(request_id, -32603, f"Tool execution failed: {str(e)}")
async def _handle_initialize(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle initialize request."""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "CRM MCP Server",
"version": "1.0.0",
"description": "Provides customer relationship management and sales operations"
}
}
}
async def _add_customer(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute add_customer tool."""
return asdict(await self.db.add_customer(arguments))
async def _get_customer(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute get_customer tool."""
customer_id = arguments.get("customer_id")
if not customer_id:
raise ValueError("customer_id is required")
customer = await self.db.get_customer(customer_id)
if not customer:
raise ValueError(f"Customer not found: {customer_id}")
return asdict(customer)
async def _search_customers(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute search_customers tool."""
query = arguments.get("query")
limit = arguments.get("limit", 10)
if not query:
raise ValueError("query is required")
customers = await self.db.search_customers(query, limit)
return {
"customers": [asdict(customer) for customer in customers]
}
async def _add_lead(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute add_lead tool."""
return asdict(await self.db.add_lead(arguments))
async def _get_sales_pipeline(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute get_sales_pipeline tool."""
limit = arguments.get("limit", 50)
pipeline = await self.db.get_sales_pipeline(limit)
return {"pipeline": pipeline}
async def _get_crm_metrics(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute get_crm_metrics tool."""
return await self.db.get_metrics()
def _error_response(self, request_id: str, code: int, message: str) -> Dict[str, Any]:
"""Create error response."""
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": code,
"message": message
}
}
async def start_server(self):
"""Start the CRM MCP server."""
from aiohttp import web, ClientTimeout
import aiohttp_cors
# Create aiohttp application
app = web.Application()
# Add CORS support
cors = aiohttp_cors.setup(app, defaults={
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
allow_methods="*"
)
})
# Add MCP endpoint
async def mcp_handler(request):
try:
data = await request.json()
result = await self.handle_request(data)
return web.json_response(result)
except json.JSONDecodeError:
return web.json_response({
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32700, "message": "Parse error"}
}, status=400)
except Exception as e:
logger.error("Request processing failed", error=str(e))
return web.json_response({
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32603, "message": "Internal error"}
}, status=500)
# Add routes
app.router.add_post('/mcp', mcp_handler)
app.router.add_get('/health', self._health_check)
# Configure CORS for all routes
for route in list(app.router.routes()):
cors.add(route)
# Start server
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', self.port)
await site.start()
logger.info(f"CRM MCP Server started on port {self.port}")
return runner
async def _health_check(self, request):
"""Health check endpoint."""
return web.json_response({
"status": "healthy",
"service": "CRM MCP Server",
"version": "1.0.0",
"timestamp": datetime.utcnow().isoformat()
})
async def stop_server(self, runner):
"""Stop the CRM MCP server."""
await runner.cleanup()
async def main():
"""Main entry point for the CRM server."""
server = CRMMCPServer()
runner = await server.start_server()
try:
# Keep server running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down CRM server...")
await server.stop_server(runner)
if __name__ == "__main__":
asyncio.run(main())