|
|
""" |
|
|
Sample MCP Database/CRM Server |
|
|
Demonstrates database operations, data management, and business logic integration |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import json |
|
|
import logging |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Dict, List, Any, Optional |
|
|
import sqlite3 |
|
|
import uuid |
|
|
from dataclasses import dataclass, asdict |
|
|
from enum import Enum |
|
|
import structlog |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = structlog.get_logger() |
|
|
|
|
|
|
|
|
class CustomerStatus(Enum): |
|
|
"""Customer status enumeration.""" |
|
|
ACTIVE = "active" |
|
|
INACTIVE = "inactive" |
|
|
PENDING = "pending" |
|
|
SUSPENDED = "suspended" |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class Customer: |
|
|
"""Customer data model.""" |
|
|
id: str |
|
|
name: str |
|
|
email: str |
|
|
phone: Optional[str] |
|
|
status: CustomerStatus |
|
|
created_at: str |
|
|
last_contact: Optional[str] |
|
|
tags: List[str] |
|
|
notes: str |
|
|
lifetime_value: float = 0.0 |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class Lead: |
|
|
"""Lead data model.""" |
|
|
id: str |
|
|
name: str |
|
|
email: str |
|
|
phone: Optional[str] |
|
|
source: str |
|
|
status: str |
|
|
score: int |
|
|
assigned_to: Optional[str] |
|
|
created_at: str |
|
|
notes: str |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class Opportunity: |
|
|
"""Opportunity data model.""" |
|
|
id: str |
|
|
customer_id: str |
|
|
title: str |
|
|
value: float |
|
|
probability: int |
|
|
stage: str |
|
|
close_date: Optional[str] |
|
|
assigned_to: Optional[str] |
|
|
created_at: str |
|
|
notes: str |
|
|
|
|
|
|
|
|
class DatabaseManager: |
|
|
"""Database manager for CRM operations.""" |
|
|
|
|
|
def __init__(self, db_path: str = "crm_database.db"): |
|
|
self.db_path = db_path |
|
|
self._initialize_database() |
|
|
|
|
|
def _initialize_database(self): |
|
|
"""Initialize the database with required tables.""" |
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS customers ( |
|
|
id TEXT PRIMARY KEY, |
|
|
name TEXT NOT NULL, |
|
|
email TEXT UNIQUE NOT NULL, |
|
|
phone TEXT, |
|
|
status TEXT NOT NULL, |
|
|
created_at TEXT NOT NULL, |
|
|
last_contact TEXT, |
|
|
tags TEXT, |
|
|
notes TEXT DEFAULT '', |
|
|
lifetime_value REAL DEFAULT 0.0 |
|
|
) |
|
|
""") |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS leads ( |
|
|
id TEXT PRIMARY KEY, |
|
|
name TEXT NOT NULL, |
|
|
email TEXT UNIQUE NOT NULL, |
|
|
phone TEXT, |
|
|
source TEXT NOT NULL, |
|
|
status TEXT NOT NULL, |
|
|
score INTEGER DEFAULT 0, |
|
|
assigned_to TEXT, |
|
|
created_at TEXT NOT NULL, |
|
|
notes TEXT DEFAULT '' |
|
|
) |
|
|
""") |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS opportunities ( |
|
|
id TEXT PRIMARY KEY, |
|
|
customer_id TEXT NOT NULL, |
|
|
title TEXT NOT NULL, |
|
|
value REAL NOT NULL, |
|
|
probability INTEGER DEFAULT 0, |
|
|
stage TEXT NOT NULL, |
|
|
close_date TEXT, |
|
|
assigned_to TEXT, |
|
|
created_at TEXT NOT NULL, |
|
|
notes TEXT DEFAULT '', |
|
|
FOREIGN KEY (customer_id) REFERENCES customers (id) |
|
|
) |
|
|
""") |
|
|
|
|
|
conn.commit() |
|
|
logger.info("Database initialized successfully") |
|
|
|
|
|
async def add_customer(self, customer_data: Dict[str, Any]) -> Customer: |
|
|
"""Add a new customer to the database.""" |
|
|
customer_id = str(uuid.uuid4()) |
|
|
customer = Customer( |
|
|
id=customer_id, |
|
|
name=customer_data["name"], |
|
|
email=customer_data["email"], |
|
|
phone=customer_data.get("phone"), |
|
|
status=CustomerStatus(customer_data.get("status", "active")), |
|
|
created_at=datetime.utcnow().isoformat(), |
|
|
last_contact=None, |
|
|
tags=customer_data.get("tags", []), |
|
|
notes=customer_data.get("notes", ""), |
|
|
lifetime_value=customer_data.get("lifetime_value", 0.0) |
|
|
) |
|
|
|
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(""" |
|
|
INSERT INTO customers (id, name, email, phone, status, created_at, last_contact, tags, notes, lifetime_value) |
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
""", ( |
|
|
customer.id, customer.name, customer.email, customer.phone, |
|
|
customer.status.value, customer.created_at, customer.last_contact, |
|
|
json.dumps(customer.tags), customer.notes, customer.lifetime_value |
|
|
)) |
|
|
conn.commit() |
|
|
|
|
|
logger.info("Customer added", customer_id=customer_id, email=customer.email) |
|
|
return customer |
|
|
|
|
|
async def get_customer(self, customer_id: str) -> Optional[Customer]: |
|
|
"""Get customer by ID.""" |
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
cursor.execute("SELECT * FROM customers WHERE id = ?", (customer_id,)) |
|
|
row = cursor.fetchone() |
|
|
|
|
|
if row: |
|
|
return Customer( |
|
|
id=row[0], name=row[1], email=row[2], phone=row[3], |
|
|
status=CustomerStatus(row[4]), created_at=row[5], |
|
|
last_contact=row[6], tags=json.loads(row[7] or "[]"), |
|
|
notes=row[8], lifetime_value=row[9] |
|
|
) |
|
|
return None |
|
|
|
|
|
async def search_customers(self, query: str, limit: int = 10) -> List[Customer]: |
|
|
"""Search customers by name or email.""" |
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(""" |
|
|
SELECT * FROM customers |
|
|
WHERE name LIKE ? OR email LIKE ? |
|
|
LIMIT ? |
|
|
""", (f"%{query}%", f"%{query}%", limit)) |
|
|
|
|
|
customers = [] |
|
|
for row in cursor.fetchall(): |
|
|
customers.append(Customer( |
|
|
id=row[0], name=row[1], email=row[2], phone=row[3], |
|
|
status=CustomerStatus(row[4]), created_at=row[5], |
|
|
last_contact=row[6], tags=json.loads(row[7] or "[]"), |
|
|
notes=row[8], lifetime_value=row[9] |
|
|
)) |
|
|
return customers |
|
|
|
|
|
async def add_lead(self, lead_data: Dict[str, Any]) -> Lead: |
|
|
"""Add a new lead.""" |
|
|
lead_id = str(uuid.uuid4()) |
|
|
lead = Lead( |
|
|
id=lead_id, |
|
|
name=lead_data["name"], |
|
|
email=lead_data["email"], |
|
|
phone=lead_data.get("phone"), |
|
|
source=lead_data["source"], |
|
|
status=lead_data.get("status", "new"), |
|
|
score=lead_data.get("score", 0), |
|
|
assigned_to=lead_data.get("assigned_to"), |
|
|
created_at=datetime.utcnow().isoformat(), |
|
|
notes=lead_data.get("notes", "") |
|
|
) |
|
|
|
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(""" |
|
|
INSERT INTO leads (id, name, email, phone, source, status, score, assigned_to, created_at, notes) |
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
""", ( |
|
|
lead.id, lead.name, lead.email, lead.phone, lead.source, |
|
|
lead.status, lead.score, lead.assigned_to, lead.created_at, lead.notes |
|
|
)) |
|
|
conn.commit() |
|
|
|
|
|
logger.info("Lead added", lead_id=lead_id, email=lead.email) |
|
|
return lead |
|
|
|
|
|
async def get_leads_by_status(self, status: str, limit: int = 20) -> List[Lead]: |
|
|
"""Get leads by status.""" |
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
cursor.execute("SELECT * FROM leads WHERE status = ? LIMIT ?", (status, limit)) |
|
|
|
|
|
leads = [] |
|
|
for row in cursor.fetchall(): |
|
|
leads.append(Lead( |
|
|
id=row[0], name=row[1], email=row[2], phone=row[3], |
|
|
source=row[4], status=row[5], score=row[6], |
|
|
assigned_to=row[7], created_at=row[8], notes=row[9] |
|
|
)) |
|
|
return leads |
|
|
|
|
|
async def add_opportunity(self, opportunity_data: Dict[str, Any]) -> Opportunity: |
|
|
"""Add a new opportunity.""" |
|
|
opp_id = str(uuid.uuid4()) |
|
|
opportunity = Opportunity( |
|
|
id=opp_id, |
|
|
customer_id=opportunity_data["customer_id"], |
|
|
title=opportunity_data["title"], |
|
|
value=opportunity_data["value"], |
|
|
probability=opportunity_data.get("probability", 0), |
|
|
stage=opportunity_data.get("stage", "prospecting"), |
|
|
close_date=opportunity_data.get("close_date"), |
|
|
assigned_to=opportunity_data.get("assigned_to"), |
|
|
created_at=datetime.utcnow().isoformat(), |
|
|
notes=opportunity_data.get("notes", "") |
|
|
) |
|
|
|
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(""" |
|
|
INSERT INTO opportunities (id, customer_id, title, value, probability, stage, close_date, assigned_to, created_at, notes) |
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
""", ( |
|
|
opportunity.id, opportunity.customer_id, opportunity.title, |
|
|
opportunity.value, opportunity.probability, opportunity.stage, |
|
|
opportunity.close_date, opportunity.assigned_to, opportunity.created_at, opportunity.notes |
|
|
)) |
|
|
conn.commit() |
|
|
|
|
|
logger.info("Opportunity added", opportunity_id=opp_id, value=opportunity.value) |
|
|
return opportunity |
|
|
|
|
|
async def get_sales_pipeline(self, limit: int = 50) -> List[Dict[str, Any]]: |
|
|
"""Get sales pipeline with customer information.""" |
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(""" |
|
|
SELECT o.id, o.title, o.value, o.probability, o.stage, o.close_date, |
|
|
c.name as customer_name, c.email as customer_email, |
|
|
o.assigned_to, o.created_at, o.notes |
|
|
FROM opportunities o |
|
|
JOIN customers c ON o.customer_id = c.id |
|
|
ORDER BY o.created_at DESC |
|
|
LIMIT ? |
|
|
""", (limit,)) |
|
|
|
|
|
pipeline = [] |
|
|
for row in cursor.fetchall(): |
|
|
pipeline.append({ |
|
|
"opportunity_id": row[0], |
|
|
"title": row[1], |
|
|
"value": row[2], |
|
|
"probability": row[3], |
|
|
"stage": row[4], |
|
|
"close_date": row[5], |
|
|
"customer_name": row[6], |
|
|
"customer_email": row[7], |
|
|
"assigned_to": row[8], |
|
|
"created_at": row[9], |
|
|
"notes": row[10] |
|
|
}) |
|
|
return pipeline |
|
|
|
|
|
async def get_metrics(self) -> Dict[str, Any]: |
|
|
"""Get CRM metrics and analytics.""" |
|
|
with sqlite3.connect(self.db_path) as conn: |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM customers WHERE status = 'active'") |
|
|
active_customers = cursor.fetchone()[0] |
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM customers") |
|
|
total_customers = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM leads WHERE status = 'new'") |
|
|
new_leads = cursor.fetchone()[0] |
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM leads WHERE created_at >= ?", |
|
|
[(datetime.utcnow() - timedelta(days=30)).isoformat()]) |
|
|
leads_this_month = cursor.fetchone()[0] |
|
|
|
|
|
|
|
|
cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage NOT IN ('won', 'lost')") |
|
|
pipeline_value = cursor.fetchone()[0] or 0 |
|
|
|
|
|
cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage = 'won'") |
|
|
won_value = cursor.fetchone()[0] or 0 |
|
|
|
|
|
cursor.execute("SELECT SUM(value) FROM opportunities WHERE stage = 'won' AND close_date >= ?", |
|
|
[(datetime.utcnow() - timedelta(days=30)).isoformat()]) |
|
|
won_value_this_month = cursor.fetchone()[0] or 0 |
|
|
|
|
|
return { |
|
|
"customers": { |
|
|
"active": active_customers, |
|
|
"total": total_customers, |
|
|
"conversion_rate": round((active_customers / max(total_customers, 1)) * 100, 2) |
|
|
}, |
|
|
"leads": { |
|
|
"new": new_leads, |
|
|
"this_month": leads_this_month |
|
|
}, |
|
|
"opportunities": { |
|
|
"pipeline_value": pipeline_value, |
|
|
"won_value": won_value, |
|
|
"won_value_this_month": won_value_this_month |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
class CRMMCPServer: |
|
|
"""Model Context Protocol CRM Server implementation.""" |
|
|
|
|
|
def __init__(self, port: int = 8002): |
|
|
self.port = port |
|
|
self.db = DatabaseManager() |
|
|
self.tools = { |
|
|
"add_customer": { |
|
|
"name": "add_customer", |
|
|
"description": "Add a new customer to the CRM system", |
|
|
"inputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"name": { |
|
|
"type": "string", |
|
|
"description": "Customer name", |
|
|
"minLength": 1, |
|
|
"maxLength": 200 |
|
|
}, |
|
|
"email": { |
|
|
"type": "string", |
|
|
"description": "Customer email address", |
|
|
"format": "email" |
|
|
}, |
|
|
"phone": { |
|
|
"type": "string", |
|
|
"description": "Customer phone number", |
|
|
"pattern": "^[+]?[\\d\\s\\-\\(\\)]+$" |
|
|
}, |
|
|
"status": { |
|
|
"type": "string", |
|
|
"enum": ["active", "inactive", "pending", "suspended"], |
|
|
"default": "active" |
|
|
}, |
|
|
"tags": { |
|
|
"type": "array", |
|
|
"items": {"type": "string"}, |
|
|
"description": "Customer tags for categorization" |
|
|
}, |
|
|
"notes": { |
|
|
"type": "string", |
|
|
"description": "Additional notes about the customer", |
|
|
"maxLength": 1000 |
|
|
}, |
|
|
"lifetime_value": { |
|
|
"type": "number", |
|
|
"description": "Customer lifetime value in USD", |
|
|
"minimum": 0 |
|
|
} |
|
|
}, |
|
|
"required": ["name", "email"] |
|
|
}, |
|
|
"outputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"id": {"type": "string"}, |
|
|
"name": {"type": "string"}, |
|
|
"email": {"type": "string"}, |
|
|
"phone": {"type": "string"}, |
|
|
"status": {"type": "string"}, |
|
|
"created_at": {"type": "string"}, |
|
|
"tags": {"type": "array", "items": {"type": "string"}}, |
|
|
"notes": {"type": "string"}, |
|
|
"lifetime_value": {"type": "number"} |
|
|
} |
|
|
}, |
|
|
"examples": [ |
|
|
{ |
|
|
"name": "John Doe", |
|
|
"email": "john.doe@example.com", |
|
|
"phone": "+1-555-0123", |
|
|
"status": "active", |
|
|
"tags": ["enterprise", "priority"], |
|
|
"lifetime_value": 50000.0 |
|
|
} |
|
|
], |
|
|
"tags": ["crm", "customer", "add", "create"] |
|
|
}, |
|
|
"get_customer": { |
|
|
"name": "get_customer", |
|
|
"description": "Get customer information by customer ID", |
|
|
"inputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"customer_id": { |
|
|
"type": "string", |
|
|
"description": "Customer UUID", |
|
|
"format": "uuid" |
|
|
} |
|
|
}, |
|
|
"required": ["customer_id"] |
|
|
}, |
|
|
"outputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"id": {"type": "string"}, |
|
|
"name": {"type": "string"}, |
|
|
"email": {"type": "string"}, |
|
|
"phone": {"type": "string"}, |
|
|
"status": {"type": "string"}, |
|
|
"created_at": {"type": "string"}, |
|
|
"last_contact": {"type": "string"}, |
|
|
"tags": {"type": "array", "items": {"type": "string"}}, |
|
|
"notes": {"type": "string"}, |
|
|
"lifetime_value": {"type": "number"} |
|
|
} |
|
|
}, |
|
|
"examples": [ |
|
|
{"customer_id": "123e4567-e89b-12d3-a456-426614174000"} |
|
|
], |
|
|
"tags": ["crm", "customer", "get", "retrieve"] |
|
|
}, |
|
|
"search_customers": { |
|
|
"name": "search_customers", |
|
|
"description": "Search customers by name or email", |
|
|
"inputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"query": { |
|
|
"type": "string", |
|
|
"description": "Search query for name or email", |
|
|
"minLength": 1, |
|
|
"maxLength": 100 |
|
|
}, |
|
|
"limit": { |
|
|
"type": "integer", |
|
|
"description": "Maximum number of results", |
|
|
"minimum": 1, |
|
|
"maximum": 100, |
|
|
"default": 10 |
|
|
} |
|
|
}, |
|
|
"required": ["query"] |
|
|
}, |
|
|
"outputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"customers": { |
|
|
"type": "array", |
|
|
"items": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"id": {"type": "string"}, |
|
|
"name": {"type": "string"}, |
|
|
"email": {"type": "string"}, |
|
|
"phone": {"type": "string"}, |
|
|
"status": {"type": "string"}, |
|
|
"created_at": {"type": "string"}, |
|
|
"tags": {"type": "array", "items": {"type": "string"}}, |
|
|
"notes": {"type": "string"}, |
|
|
"lifetime_value": {"type": "number"} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
}, |
|
|
"examples": [ |
|
|
{"query": "john", "limit": 5} |
|
|
], |
|
|
"tags": ["crm", "customer", "search", "find"] |
|
|
}, |
|
|
"add_lead": { |
|
|
"name": "add_lead", |
|
|
"description": "Add a new sales lead", |
|
|
"inputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"name": {"type": "string", "description": "Lead name"}, |
|
|
"email": {"type": "string", "format": "email"}, |
|
|
"phone": {"type": "string"}, |
|
|
"source": { |
|
|
"type": "string", |
|
|
"description": "Lead source", |
|
|
"enum": ["website", "referral", "social", "email", "phone", "event"] |
|
|
}, |
|
|
"status": { |
|
|
"type": "string", |
|
|
"enum": ["new", "contacted", "qualified", "unqualified"], |
|
|
"default": "new" |
|
|
}, |
|
|
"score": { |
|
|
"type": "integer", |
|
|
"description": "Lead score (1-100)", |
|
|
"minimum": 1, |
|
|
"maximum": 100 |
|
|
}, |
|
|
"assigned_to": {"type": "string", "description": "Assigned sales rep"}, |
|
|
"notes": {"type": "string", "description": "Additional notes"} |
|
|
}, |
|
|
"required": ["name", "email", "source"] |
|
|
}, |
|
|
"outputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"id": {"type": "string"}, |
|
|
"name": {"type": "string"}, |
|
|
"email": {"type": "string"}, |
|
|
"phone": {"type": "string"}, |
|
|
"source": {"type": "string"}, |
|
|
"status": {"type": "string"}, |
|
|
"score": {"type": "integer"}, |
|
|
"assigned_to": {"type": "string"}, |
|
|
"created_at": {"type": "string"}, |
|
|
"notes": {"type": "string"} |
|
|
} |
|
|
}, |
|
|
"examples": [ |
|
|
{ |
|
|
"name": "Jane Smith", |
|
|
"email": "jane.smith@company.com", |
|
|
"source": "website", |
|
|
"score": 85 |
|
|
} |
|
|
], |
|
|
"tags": ["crm", "lead", "add", "sales"] |
|
|
}, |
|
|
"get_sales_pipeline": { |
|
|
"name": "get_sales_pipeline", |
|
|
"description": "Get sales pipeline with opportunity details", |
|
|
"inputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"limit": { |
|
|
"type": "integer", |
|
|
"description": "Maximum number of opportunities to return", |
|
|
"minimum": 1, |
|
|
"maximum": 200, |
|
|
"default": 50 |
|
|
} |
|
|
} |
|
|
}, |
|
|
"outputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"pipeline": { |
|
|
"type": "array", |
|
|
"items": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"opportunity_id": {"type": "string"}, |
|
|
"title": {"type": "string"}, |
|
|
"value": {"type": "number"}, |
|
|
"probability": {"type": "integer"}, |
|
|
"stage": {"type": "string"}, |
|
|
"close_date": {"type": "string"}, |
|
|
"customer_name": {"type": "string"}, |
|
|
"customer_email": {"type": "string"}, |
|
|
"assigned_to": {"type": "string"}, |
|
|
"created_at": {"type": "string"}, |
|
|
"notes": {"type": "string"} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
}, |
|
|
"examples": [ |
|
|
{"limit": 20} |
|
|
], |
|
|
"tags": ["crm", "sales", "pipeline", "opportunities"] |
|
|
}, |
|
|
"get_crm_metrics": { |
|
|
"name": "get_crm_metrics", |
|
|
"description": "Get CRM metrics and analytics dashboard", |
|
|
"inputSchema": { |
|
|
"type": "object", |
|
|
"properties": {} |
|
|
}, |
|
|
"outputSchema": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"customers": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"active": {"type": "integer"}, |
|
|
"total": {"type": "integer"}, |
|
|
"conversion_rate": {"type": "number"} |
|
|
} |
|
|
}, |
|
|
"leads": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"new": {"type": "integer"}, |
|
|
"this_month": {"type": "integer"} |
|
|
} |
|
|
}, |
|
|
"opportunities": { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"pipeline_value": {"type": "number"}, |
|
|
"won_value": {"type": "number"}, |
|
|
"won_value_this_month": {"type": "number"} |
|
|
} |
|
|
} |
|
|
} |
|
|
}, |
|
|
"examples": [{}], |
|
|
"tags": ["crm", "metrics", "analytics", "dashboard"] |
|
|
} |
|
|
} |
|
|
|
|
|
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Handle incoming MCP requests.""" |
|
|
method = request.get("method") |
|
|
params = request.get("params", {}) |
|
|
request_id = request.get("id") |
|
|
|
|
|
try: |
|
|
if method == "tools/list": |
|
|
return await self._handle_tools_list(request_id) |
|
|
elif method == "tools/call": |
|
|
return await self._handle_tools_call(request_id, params) |
|
|
elif method == "initialize": |
|
|
return await self._handle_initialize(request_id, params) |
|
|
else: |
|
|
return self._error_response(request_id, -32601, "Method not found") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error("Request handling failed", method=method, error=str(e)) |
|
|
return self._error_response(request_id, -32603, f"Internal error: {str(e)}") |
|
|
|
|
|
async def _handle_tools_list(self, request_id: str) -> Dict[str, Any]: |
|
|
"""Handle tools/list request.""" |
|
|
tools_list = [] |
|
|
for tool_name, tool_info in self.tools.items(): |
|
|
tools_list.append({ |
|
|
"name": tool_info["name"], |
|
|
"description": tool_info["description"], |
|
|
"inputSchema": tool_info["inputSchema"], |
|
|
"outputSchema": tool_info.get("outputSchema"), |
|
|
"examples": tool_info.get("examples", []), |
|
|
"tags": tool_info.get("tags", []) |
|
|
}) |
|
|
|
|
|
return { |
|
|
"jsonrpc": "2.0", |
|
|
"id": request_id, |
|
|
"result": { |
|
|
"tools": tools_list |
|
|
} |
|
|
} |
|
|
|
|
|
async def _handle_tools_call(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Handle tools/call request.""" |
|
|
tool_name = params.get("name") |
|
|
arguments = params.get("arguments", {}) |
|
|
|
|
|
if tool_name not in self.tools: |
|
|
return self._error_response(request_id, -32601, f"Tool '{tool_name}' not found") |
|
|
|
|
|
|
|
|
try: |
|
|
if tool_name == "add_customer": |
|
|
result = await self._add_customer(arguments) |
|
|
elif tool_name == "get_customer": |
|
|
result = await self._get_customer(arguments) |
|
|
elif tool_name == "search_customers": |
|
|
result = await self._search_customers(arguments) |
|
|
elif tool_name == "add_lead": |
|
|
result = await self._add_lead(arguments) |
|
|
elif tool_name == "get_sales_pipeline": |
|
|
result = await self._get_sales_pipeline(arguments) |
|
|
elif tool_name == "get_crm_metrics": |
|
|
result = await self._get_crm_metrics(arguments) |
|
|
else: |
|
|
return self._error_response(request_id, -32601, f"Tool '{tool_name}' not implemented") |
|
|
|
|
|
return { |
|
|
"jsonrpc": "2.0", |
|
|
"id": request_id, |
|
|
"result": { |
|
|
"content": [ |
|
|
{ |
|
|
"type": "text", |
|
|
"text": json.dumps(result, indent=2) |
|
|
} |
|
|
] |
|
|
} |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error("Tool execution failed", tool=tool_name, error=str(e)) |
|
|
return self._error_response(request_id, -32603, f"Tool execution failed: {str(e)}") |
|
|
|
|
|
async def _handle_initialize(self, request_id: str, params: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Handle initialize request.""" |
|
|
return { |
|
|
"jsonrpc": "2.0", |
|
|
"id": request_id, |
|
|
"result": { |
|
|
"protocolVersion": "2024-11-05", |
|
|
"capabilities": { |
|
|
"tools": {} |
|
|
}, |
|
|
"serverInfo": { |
|
|
"name": "CRM MCP Server", |
|
|
"version": "1.0.0", |
|
|
"description": "Provides customer relationship management and sales operations" |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
async def _add_customer(self, arguments: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Execute add_customer tool.""" |
|
|
return asdict(await self.db.add_customer(arguments)) |
|
|
|
|
|
async def _get_customer(self, arguments: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Execute get_customer tool.""" |
|
|
customer_id = arguments.get("customer_id") |
|
|
if not customer_id: |
|
|
raise ValueError("customer_id is required") |
|
|
|
|
|
customer = await self.db.get_customer(customer_id) |
|
|
if not customer: |
|
|
raise ValueError(f"Customer not found: {customer_id}") |
|
|
|
|
|
return asdict(customer) |
|
|
|
|
|
async def _search_customers(self, arguments: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Execute search_customers tool.""" |
|
|
query = arguments.get("query") |
|
|
limit = arguments.get("limit", 10) |
|
|
|
|
|
if not query: |
|
|
raise ValueError("query is required") |
|
|
|
|
|
customers = await self.db.search_customers(query, limit) |
|
|
return { |
|
|
"customers": [asdict(customer) for customer in customers] |
|
|
} |
|
|
|
|
|
async def _add_lead(self, arguments: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Execute add_lead tool.""" |
|
|
return asdict(await self.db.add_lead(arguments)) |
|
|
|
|
|
async def _get_sales_pipeline(self, arguments: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Execute get_sales_pipeline tool.""" |
|
|
limit = arguments.get("limit", 50) |
|
|
pipeline = await self.db.get_sales_pipeline(limit) |
|
|
return {"pipeline": pipeline} |
|
|
|
|
|
async def _get_crm_metrics(self, arguments: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Execute get_crm_metrics tool.""" |
|
|
return await self.db.get_metrics() |
|
|
|
|
|
def _error_response(self, request_id: str, code: int, message: str) -> Dict[str, Any]: |
|
|
"""Create error response.""" |
|
|
return { |
|
|
"jsonrpc": "2.0", |
|
|
"id": request_id, |
|
|
"error": { |
|
|
"code": code, |
|
|
"message": message |
|
|
} |
|
|
} |
|
|
|
|
|
async def start_server(self): |
|
|
"""Start the CRM MCP server.""" |
|
|
from aiohttp import web, ClientTimeout |
|
|
import aiohttp_cors |
|
|
|
|
|
|
|
|
app = web.Application() |
|
|
|
|
|
|
|
|
cors = aiohttp_cors.setup(app, defaults={ |
|
|
"*": aiohttp_cors.ResourceOptions( |
|
|
allow_credentials=True, |
|
|
expose_headers="*", |
|
|
allow_headers="*", |
|
|
allow_methods="*" |
|
|
) |
|
|
}) |
|
|
|
|
|
|
|
|
async def mcp_handler(request): |
|
|
try: |
|
|
data = await request.json() |
|
|
result = await self.handle_request(data) |
|
|
return web.json_response(result) |
|
|
except json.JSONDecodeError: |
|
|
return web.json_response({ |
|
|
"jsonrpc": "2.0", |
|
|
"id": None, |
|
|
"error": {"code": -32700, "message": "Parse error"} |
|
|
}, status=400) |
|
|
except Exception as e: |
|
|
logger.error("Request processing failed", error=str(e)) |
|
|
return web.json_response({ |
|
|
"jsonrpc": "2.0", |
|
|
"id": None, |
|
|
"error": {"code": -32603, "message": "Internal error"} |
|
|
}, status=500) |
|
|
|
|
|
|
|
|
app.router.add_post('/mcp', mcp_handler) |
|
|
app.router.add_get('/health', self._health_check) |
|
|
|
|
|
|
|
|
for route in list(app.router.routes()): |
|
|
cors.add(route) |
|
|
|
|
|
|
|
|
runner = web.AppRunner(app) |
|
|
await runner.setup() |
|
|
|
|
|
site = web.TCPSite(runner, 'localhost', self.port) |
|
|
await site.start() |
|
|
|
|
|
logger.info(f"CRM MCP Server started on port {self.port}") |
|
|
return runner |
|
|
|
|
|
|
|
|
async def _health_check(self, request): |
|
|
"""Health check endpoint.""" |
|
|
return web.json_response({ |
|
|
"status": "healthy", |
|
|
"service": "CRM MCP Server", |
|
|
"version": "1.0.0", |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
}) |
|
|
|
|
|
async def stop_server(self, runner): |
|
|
"""Stop the CRM MCP server.""" |
|
|
await runner.cleanup() |
|
|
|
|
|
|
|
|
async def main(): |
|
|
"""Main entry point for the CRM server.""" |
|
|
server = CRMMCPServer() |
|
|
runner = await server.start_server() |
|
|
|
|
|
try: |
|
|
|
|
|
while True: |
|
|
await asyncio.sleep(1) |
|
|
except KeyboardInterrupt: |
|
|
logger.info("Shutting down CRM server...") |
|
|
await server.stop_server(runner) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
asyncio.run(main()) |