""" Sample MCP Database/CRM Server Demonstrates database operations, data management, and business logic integration """ import asyncio import json import logging from datetime import datetime, timedelta from typing import Dict, List, Any, Optional import sqlite3 import uuid from dataclasses import dataclass, asdict from enum import Enum import structlog # Configure structured logging logging.basicConfig(level=logging.INFO) logger = structlog.get_logger() class CustomerStatus(Enum): """Customer status enumeration.""" ACTIVE = "active" INACTIVE = "inactive" PENDING = "pending" SUSPENDED = "suspended" @dataclass class Customer: """Customer data model.""" id: str name: str email: str phone: Optional[str] status: CustomerStatus created_at: str last_contact: Optional[str] tags: List[str] notes: str lifetime_value: float = 0.0 @dataclass class Lead: """Lead data model.""" id: str name: str email: str phone: Optional[str] source: str status: str score: int assigned_to: Optional[str] created_at: str notes: str @dataclass class Opportunity: """Opportunity data model.""" id: str customer_id: str title: str value: float probability: int stage: str close_date: Optional[str] assigned_to: Optional[str] created_at: str notes: str class DatabaseManager: """Database manager for CRM operations.""" def __init__(self, db_path: str = "crm_database.db"): self.db_path = db_path self._initialize_database() def _initialize_database(self): """Initialize the database with required tables.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Create customers table cursor.execute(""" CREATE TABLE IF NOT EXISTS customers ( id TEXT PRIMARY KEY, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, phone TEXT, status TEXT NOT NULL, created_at TEXT NOT NULL, last_contact TEXT, tags TEXT, notes TEXT DEFAULT '', lifetime_value REAL DEFAULT 0.0 ) """) # Create leads table cursor.execute(""" CREATE TABLE IF NOT EXISTS leads ( id TEXT PRIMARY KEY, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, phone TEXT, source TEXT NOT NULL, status TEXT NOT NULL, score INTEGER DEFAULT 0, assigned_to TEXT, created_at TEXT NOT NULL, notes TEXT DEFAULT '' ) """) # Create opportunities table cursor.execute(""" CREATE TABLE IF NOT EXISTS opportunities ( id TEXT PRIMARY KEY, customer_id TEXT NOT NULL, title TEXT NOT NULL, value REAL NOT NULL, probability INTEGER DEFAULT 0, stage TEXT NOT NULL, close_date TEXT, assigned_to TEXT, created_at TEXT NOT NULL, notes TEXT DEFAULT '', FOREIGN KEY (customer_id) REFERENCES customers (id) ) """) conn.commit() logger.info("Database initialized successfully") async def add_customer(self, customer_data: Dict[str, Any]) -> Customer: """Add a new customer to the database.""" customer_id = str(uuid.uuid4()) customer = Customer( id=customer_id, name=customer_data["name"], email=customer_data["email"], phone=customer_data.get("phone"), status=CustomerStatus(customer_data.get("status", "active")), created_at=datetime.utcnow().isoformat(), last_contact=None, tags=customer_data.get("tags", []), notes=customer_data.get("notes", ""), lifetime_value=customer_data.get("lifetime_value", 0.0) ) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO customers (id, name, email, phone, status, created_at, last_contact, tags, notes, lifetime_value) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( customer.id, customer.name, customer.email, customer.phone, customer.status.value, customer.created_at, customer.last_contact, json.dumps(customer.tags), customer.notes, customer.lifetime_value )) conn.commit() logger.info("Customer added", customer_id=customer_id, email=customer.email) return customer async def get_customer(self, customer_id: str) -> Optional[Customer]: """Get customer by ID.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM customers WHERE id = ?", (customer_id,)) row = cursor.fetchone() if row: return Customer( id=row[0], name=row[1], email=row[2], phone=row[3], status=CustomerStatus(row[4]), created_at=row[5], last_contact=row[6], tags=json.loads(row[7] or "[]"), notes=row[8], lifetime_value=row[9] ) return None async def search_customers(self, query: str, limit: int = 10) -> List[Customer]: """Search customers by name or email.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM customers WHERE name LIKE ? OR email LIKE ? LIMIT ? """, (f"%{query}%", f"%{query}%", limit)) customers = [] for row in cursor.fetchall(): customers.append(Customer( id=row[0], name=row[1], email=row[2], phone=row[3], status=CustomerStatus(row[4]), created_at=row[5], last_contact=row[6], tags=json.loads(row[7] or "[]"), notes=row[8], lifetime_value=row[9] )) return customers async def add_lead(self, lead_data: Dict[str, Any]) -> Lead: """Add a new lead.""" lead_id = str(uuid.uuid4()) lead = Lead( id=lead_id, name=lead_data["name"], email=lead_data["email"], phone=lead_data.get("phone"), source=lead_data["source"], status=lead_data.get("status", "new"), score=lead_data.get("score", 0), assigned_to=lead_data.get("assigned_to"), created_at=datetime.utcnow().isoformat(), notes=lead_data.get("notes", "") ) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO leads (id, name, email, phone, source, status, score, assigned_to, created_at, notes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( lead.id, lead.name, lead.email, lead.phone, lead.source, lead.status, lead.score, lead.assigned_to, lead.created_at, lead.notes )) conn.commit() logger.info("Lead added", lead_id=lead_id, email=lead.email) return lead async def get_leads_by_status(self, status: str, limit: int = 20) -> List[Lead]: """Get leads by status.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM leads WHERE status = ? LIMIT ?", (status, limit)) leads = [] for row in cursor.fetchall(): leads.append(Lead( id=row[0], name=row[1], email=row[2], phone=row[3], source=row[4], status=row[5], score=row[6], assigned_to=row[7], created_at=row[8], notes=row[9] )) return leads async def add_opportunity(self, opportunity_data: Dict[str, Any]) -> Opportunity: """Add a new opportunity.""" opp_id = str(uuid.uuid4()) opportunity = Opportunity( id=opp_id, customer_id=opportunity_data["customer_id"], title=opportunity_data["title"], value=opportunity_data["value"], probability=opportunity_data.get("probability", 0), stage=opportunity_data.get("stage", "prospecting"), close_date=opportunity_data.get("close_date"), assigned_to=opportunity_data.get("assigned_to"), created_at=datetime.utcnow().isoformat(), notes=opportunity_data.get("notes", "") ) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO opportunities (id, customer_id, title, value, probability, stage, close_date, assigned_to, created_at, notes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( opportunity.id, opportunity.customer_id, opportunity.title, opportunity.value, opportunity.probability, opportunity.stage, opportunity.close_date, opportunity.assigned_to, opportunity.created_at, opportunity.notes )) conn.commit() logger.info("Opportunity added", opportunity_id=opp_id, value=opportunity.value) return opportunity async def get_sales_pipeline(self, limit: int = 50) -> List[Dict[str, Any]]: """Get sales pipeline with customer information.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT o.id, o.title, o.value, o.probability, o.stage, o.close_date, c.name as customer_name, c.email as customer_email, o.assigned_to, o.created_at, o.notes FROM opportunities o JOIN customers c ON o.customer_id = c.id ORDER BY o.created_at DESC LIMIT ? """, (limit,)) pipeline = [] for row in cursor.fetchall(): pipeline.append({ "opportunity_id": row[0], "title": row[1], "value": row[2], "probability": row[3], "stage": row[4], "close_date": row[5], "customer_name": row[6], "customer_email": row[7], "assigned_to": row[8], "created_at": row[9], "notes": row[10] }) return pipeline async def get_metrics(self) -> Dict[str, Any]: """Get CRM metrics and analytics.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Customer metrics cursor.execute("SELECT COUNT(*) FROM customers WHERE status = 'active'") active_customers = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM customers") total_customers = cursor.fetchone()[0] # Lead metrics cursor.execute("SELECT COUNT(*) FROM leads WHERE status = 'new'") new_leads = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM leads WHERE created_at >= ?", [(datetime.utcnow() - timedelta(days=30)).isoformat()]) leads_this_month = cursor.fetchone()[0] # Opportunity metrics cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage NOT IN ('won', 'lost')") pipeline_value = cursor.fetchone()[0] or 0 cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage = 'won'") won_value = cursor.fetchone()[0] or 0 cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage = 'won' AND close_date >= ?", [(datetime.utcnow() - timedelta(days=30)).isoformat()]) won_value_this_month = cursor.fetchone()[0] or 0 return { "customers": { "active": active_customers, "total": total_customers, "conversion_rate": round((active_customers / max(total_customers, 1)) * 100, 2) }, "leads": { "new": new_leads, "this_month": leads_this_month }, "opportunities": { "pipeline_value": pipeline_value, "won_value": won_value, "won_value_this_month": won_value_this_month } } class CRMMCPServer: """Model Context Protocol CRM Server implementation.""" def __init__(self, port: int = 8002): self.port = port self.db = DatabaseManager() self.tools = { "add_customer": { "name": "add_customer", "description": "Add a new customer to the CRM system", "inputSchema": { "type": "object", "properties": { "name": { "type": "string", "description": "Customer name", "minLength": 1, "maxLength": 200 }, "email": { "type": "string", "description": "Customer email address", "format": "email" }, "phone": { "type": "string", "description": "Customer phone number", "pattern": "^[+]?[\\d\\s\\-\\(\\)]+$" }, "status": { "type": "string", "enum": ["active", "inactive", "pending", "suspended"], "default": "active" }, "tags": { "type": "array", "items": {"type": "string"}, "description": "Customer tags for categorization" }, "notes": { "type": "string", "description": "Additional notes about the customer", "maxLength": 1000 }, "lifetime_value": { "type": "number", "description": "Customer lifetime value in USD", "minimum": 0 } }, "required": ["name", "email"] }, "outputSchema": { "type": "object", "properties": { "id": {"type": "string"}, "name": {"type": "string"}, "email": {"type": "string"}, "phone": {"type": "string"}, "status": {"type": "string"}, "created_at": {"type": "string"}, "tags": {"type": "array", "items": {"type": "string"}}, "notes": {"type": "string"}, "lifetime_value": {"type": "number"} } }, "examples": [ { "name": "John Doe", "email": "john.doe@example.com", "phone": "+1-555-0123", "status": "active", "tags": ["enterprise", "priority"], "lifetime_value": 50000.0 } ], "tags": ["crm", "customer", "add", "create"] }, "get_customer": { "name": "get_customer", "description": "Get customer information by customer ID", "inputSchema": { "type": "object", "properties": { "customer_id": { "type": "string", "description": "Customer UUID", "format": "uuid" } }, "required": ["customer_id"] }, "outputSchema": { "type": "object", "properties": { "id": {"type": "string"}, "name": {"type": "string"}, "email": {"type": "string"}, "phone": {"type": "string"}, "status": {"type": "string"}, "created_at": {"type": "string"}, "last_contact": {"type": "string"}, "tags": {"type": "array", "items": {"type": "string"}}, "notes": {"type": "string"}, "lifetime_value": {"type": "number"} } }, "examples": [ {"customer_id": "123e4567-e89b-12d3-a456-426614174000"} ], "tags": ["crm", "customer", "get", "retrieve"] }, "search_customers": { "name": "search_customers", "description": "Search customers by name or email", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "Search query for name or email", "minLength": 1, "maxLength": 100 }, "limit": { "type": "integer", "description": "Maximum number of results", "minimum": 1, "maximum": 100, "default": 10 } }, "required": ["query"] }, "outputSchema": { "type": "object", "properties": { "customers": { "type": "array", "items": { "type": "object", "properties": { "id": {"type": "string"}, "name": {"type": "string"}, "email": {"type": "string"}, "phone": {"type": "string"}, "status": {"type": "string"}, "created_at": {"type": "string"}, "tags": {"type": "array", "items": {"type": "string"}}, "notes": {"type": "string"}, "lifetime_value": {"type": "number"} } } } } }, "examples": [ {"query": "john", "limit": 5} ], "tags": ["crm", "customer", "search", "find"] }, "add_lead": { "name": "add_lead", "description": "Add a new sales lead", "inputSchema": { "type": "object", "properties": { "name": {"type": "string", "description": "Lead name"}, "email": {"type": "string", "format": "email"}, "phone": {"type": "string"}, "source": { "type": "string", "description": "Lead source", "enum": ["website", "referral", "social", "email", "phone", "event"] }, "status": { "type": "string", "enum": ["new", "contacted", "qualified", "unqualified"], "default": "new" }, "score": { "type": "integer", "description": "Lead score (1-100)", "minimum": 1, "maximum": 100 }, "assigned_to": {"type": "string", "description": "Assigned sales rep"}, "notes": {"type": "string", "description": "Additional notes"} }, "required": ["name", "email", "source"] }, "outputSchema": { "type": "object", "properties": { "id": {"type": "string"}, "name": {"type": "string"}, "email": {"type": "string"}, "phone": {"type": "string"}, "source": {"type": "string"}, "status": {"type": "string"}, "score": {"type": "integer"}, "assigned_to": {"type": "string"}, "created_at": {"type": "string"}, "notes": {"type": "string"} } }, "examples": [ { "name": "Jane Smith", "email": "jane.smith@company.com", "source": "website", "score": 85 } ], "tags": ["crm", "lead", "add", "sales"] }, "get_sales_pipeline": { "name": "get_sales_pipeline", "description": "Get sales pipeline with opportunity details", "inputSchema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Maximum number of opportunities to return", "minimum": 1, "maximum": 200, "default": 50 } } }, "outputSchema": { "type": "object", "properties": { "pipeline": { "type": "array", "items": { "type": "object", "properties": { "opportunity_id": {"type": "string"}, "title": {"type": "string"}, "value": {"type": "number"}, "probability": {"type": "integer"}, "stage": {"type": "string"}, "close_date": {"type": "string"}, "customer_name": {"type": "string"}, "customer_email": {"type": "string"}, "assigned_to": {"type": "string"}, "created_at": {"type": "string"}, "notes": {"type": "string"} } } } } }, "examples": [ {"limit": 20} ], "tags": ["crm", "sales", "pipeline", "opportunities"] }, "get_crm_metrics": { "name": "get_crm_metrics", "description": "Get CRM metrics and analytics dashboard", "inputSchema": { "type": "object", "properties": {} }, "outputSchema": { "type": "object", "properties": { "customers": { "type": "object", "properties": { "active": {"type": "integer"}, "total": {"type": "integer"}, "conversion_rate": {"type": "number"} } }, "leads": { "type": "object", "properties": { "new": {"type": "integer"}, "this_month": {"type": "integer"} } }, "opportunities": { "type": "object", "properties": { "pipeline_value": {"type": "number"}, "won_value": {"type": "number"}, "won_value_this_month": {"type": "number"} } } } }, "examples": [{}], "tags": ["crm", "metrics", "analytics", "dashboard"] } } async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """Handle incoming MCP requests.""" method = request.get("method") params = request.get("params", {}) request_id = request.get("id") try: if method == "tools/list": return await self._handle_tools_list(request_id) elif method == "tools/call": return await self._handle_tools_call(request_id, params) elif method == "initialize": return await self._handle_initialize(request_id, params) else: return self._error_response(request_id, -32601, "Method not found") except Exception as e: logger.error("Request handling failed", method=method, error=str(e)) return self._error_response(request_id, -32603, f"Internal error: {str(e)}") async def _handle_tools_list(self, request_id: str) -> Dict[str, Any]: """Handle tools/list request.""" tools_list = [] for tool_name, tool_info in self.tools.items(): tools_list.append({ "name": tool_info["name"], "description": tool_info["description"], "inputSchema": tool_info["inputSchema"], "outputSchema": tool_info.get("outputSchema"), "examples": tool_info.get("examples", []), "tags": tool_info.get("tags", []) }) return { "jsonrpc": "2.0", "id": request_id, "result": { "tools": tools_list } } async def _handle_tools_call(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]: """Handle tools/call request.""" tool_name = params.get("name") arguments = params.get("arguments", {}) if tool_name not in self.tools: return self._error_response(request_id, -32601, f"Tool '{tool_name}' not found") # Execute the tool try: if tool_name == "add_customer": result = await self._add_customer(arguments) elif tool_name == "get_customer": result = await self._get_customer(arguments) elif tool_name == "search_customers": result = await self._search_customers(arguments) elif tool_name == "add_lead": result = await self._add_lead(arguments) elif tool_name == "get_sales_pipeline": result = await self._get_sales_pipeline(arguments) elif tool_name == "get_crm_metrics": result = await self._get_crm_metrics(arguments) else: return self._error_response(request_id, -32601, f"Tool '{tool_name}' not implemented") return { "jsonrpc": "2.0", "id": request_id, "result": { "content": [ { "type": "text", "text": json.dumps(result, indent=2) } ] } } except Exception as e: logger.error("Tool execution failed", tool=tool_name, error=str(e)) return self._error_response(request_id, -32603, f"Tool execution failed: {str(e)}") async def _handle_initialize(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]: """Handle initialize request.""" return { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": "CRM MCP Server", "version": "1.0.0", "description": "Provides customer relationship management and sales operations" } } } async def _add_customer(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute add_customer tool.""" return asdict(await self.db.add_customer(arguments)) async def _get_customer(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute get_customer tool.""" customer_id = arguments.get("customer_id") if not customer_id: raise ValueError("customer_id is required") customer = await self.db.get_customer(customer_id) if not customer: raise ValueError(f"Customer not found: {customer_id}") return asdict(customer) async def _search_customers(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute search_customers tool.""" query = arguments.get("query") limit = arguments.get("limit", 10) if not query: raise ValueError("query is required") customers = await self.db.search_customers(query, limit) return { "customers": [asdict(customer) for customer in customers] } async def _add_lead(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute add_lead tool.""" return asdict(await self.db.add_lead(arguments)) async def _get_sales_pipeline(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute get_sales_pipeline tool.""" limit = arguments.get("limit", 50) pipeline = await self.db.get_sales_pipeline(limit) return {"pipeline": pipeline} async def _get_crm_metrics(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute get_crm_metrics tool.""" return await self.db.get_metrics() def _error_response(self, request_id: str, code: int, message: str) -> Dict[str, Any]: """Create error response.""" return { "jsonrpc": "2.0", "id": request_id, "error": { "code": code, "message": message } } async def start_server(self): """Start the CRM MCP server.""" from aiohttp import web, ClientTimeout import aiohttp_cors # Create aiohttp application app = web.Application() # Add CORS support cors = aiohttp_cors.setup(app, defaults={ "*": aiohttp_cors.ResourceOptions( allow_credentials=True, expose_headers="*", allow_headers="*", allow_methods="*" ) }) # Add MCP endpoint async def mcp_handler(request): try: data = await request.json() result = await self.handle_request(data) return web.json_response(result) except json.JSONDecodeError: return web.json_response({ "jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"} }, status=400) except Exception as e: logger.error("Request processing failed", error=str(e)) return web.json_response({ "jsonrpc": "2.0", "id": None, "error": {"code": -32603, "message": "Internal error"} }, status=500) # Add routes app.router.add_post('/mcp', mcp_handler) app.router.add_get('/health', self._health_check) # Configure CORS for all routes for route in list(app.router.routes()): cors.add(route) # Start server runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, 'localhost', self.port) await site.start() logger.info(f"CRM MCP Server started on port {self.port}") return runner async def _health_check(self, request): """Health check endpoint.""" return web.json_response({ "status": "healthy", "service": "CRM MCP Server", "version": "1.0.0", "timestamp": datetime.utcnow().isoformat() }) async def stop_server(self, runner): """Stop the CRM MCP server.""" await runner.cleanup() async def main(): """Main entry point for the CRM server.""" server = CRMMCPServer() runner = await server.start_server() try: # Keep server running while True: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Shutting down CRM server...") await server.stop_server(runner) if __name__ == "__main__": asyncio.run(main())