Spaces:
Running
Running
| """ | |
| CRM MCP Server - Local MCP server with mocked CRM data. | |
| Provides tools for customer, deal, and document management. | |
| """ | |
| import json | |
| import os | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, List, Dict, Any | |
| from datetime import datetime | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
# Configure logging: INFO level on the root logger, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(title="CRM MCP Server", version="1.0.0")
# Add CORS middleware: every origin, method and header is allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; FastAPI's CORS guide advises listing origins explicitly when
# credentials are allowed -- confirm this is intended for local use only.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Data directory path: a "data" folder located next to this module.
DATA_DIR = Path(__file__).parent / "data"
# JSON file that the access helpers read and rewrite.
ACCESS_FILE = DATA_DIR / "access.json"
class ToolCallRequest(BaseModel):
    """Request body naming a tool and the keyword arguments to call it with."""

    # Key looked up in the TOOLS registry.
    name: str
    # Keyword arguments forwarded to the tool function; empty when omitted.
    arguments: Dict[str, Any] = {}
class ToolResponse(BaseModel):
    """Envelope returned for every tool invocation."""

    # True when the tool function returned without raising.
    success: bool
    # Whatever the tool function returned; None when the call failed.
    result: Any = None
    # Error message for a failed call. Annotated Optional because the default
    # is None: a plain ``str`` annotation contradicts that default and makes
    # an explicit ``error=None`` fail validation under strict field typing.
    error: Optional[str] = None
| # ============================================ | |
| # Data Loading Functions | |
| # ============================================ | |
def load_customers() -> Dict:
    """Load the customer dataset from ``customers.json`` in the data directory.

    Returns:
        The parsed JSON document, or ``{"customers": []}`` when the file does
        not exist.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    customers_file = DATA_DIR / "customers.json"
    try:
        # Decode as UTF-8 explicitly (like the access-file loader) so the result
        # does not depend on the platform's locale encoding.
        with open(customers_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # Opening directly instead of checking exists() first avoids a race
        # where the file disappears between the check and the open.
        return {"customers": []}
def load_deals() -> Dict:
    """Load the deal dataset from ``deals.json`` in the data directory.

    Returns:
        The parsed JSON document, or ``{"deals": [], "pipeline_summary": {}}``
        when the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    deals_file = DATA_DIR / "deals.json"
    try:
        # Decode as UTF-8 explicitly (like the access-file loader) so the result
        # does not depend on the platform's locale encoding.
        with open(deals_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # Opening directly instead of checking exists() first avoids a race
        # where the file disappears between the check and the open.
        return {"deals": [], "pipeline_summary": {}}
def load_documents() -> List[Dict]:
    """Describe every regular file found directly inside the documents folder.

    Returns:
        One metadata dict per file holding its name, size in bytes,
        last-modified time as an ISO-8601 string (naive local time derived
        from the file's mtime) and its extension without the leading dot.
        An empty list when the documents folder does not exist.
    """
    folder = DATA_DIR / "documents"
    if not folder.exists():
        return []

    def describe(entry: Path) -> Dict:
        """Build the metadata record for a single file."""
        info = entry.stat()
        return {
            "name": entry.name,
            "size_bytes": info.st_size,
            "modified_at": datetime.fromtimestamp(info.st_mtime).isoformat(),
            "type": entry.suffix.lstrip("."),
        }

    # Sub-directories and other non-file entries are skipped.
    return [describe(entry) for entry in folder.glob("*") if entry.is_file()]
def load_access_data() -> Dict[str, Any]:
    """Read the access registry stored in ``ACCESS_FILE``.

    Returns:
        The parsed JSON document, or ``{"access": []}`` when the file is missing.
    """
    if not ACCESS_FILE.exists():
        return {"access": []}
    with ACCESS_FILE.open("r", encoding="utf-8") as handle:
        return json.load(handle)
def save_access_data(data: Dict[str, Any]) -> None:
    """Write the access registry to ``ACCESS_FILE`` as indented JSON.

    Args:
        data: JSON-serializable registry document to store.

    Raises:
        TypeError: If ``data`` contains values that cannot be serialized to
            JSON. The existing file is left untouched in that case.
    """
    # Serialize before touching the file: opening with "w" truncates at once,
    # so dumping straight into the handle would leave an empty or half-written
    # access file whenever serialization fails part-way through.
    payload = json.dumps(data, indent=2)
    ACCESS_FILE.parent.mkdir(parents=True, exist_ok=True)
    ACCESS_FILE.write_text(payload, encoding="utf-8")
| # ============================================ | |
| # Tool Implementations | |
| # ============================================ | |
def get_customers(
    status: Optional[str] = None,
    industry: Optional[str] = None,
    limit: int = 50
) -> Dict:
    """Get list of customers with optional filtering.

    Args:
        status: When given, keep customers whose ``status`` equals it
            (case-insensitive).
        industry: When given, keep customers whose ``industry`` contains it
            (case-insensitive substring).
        limit: Maximum number of customers returned. Negative values are
            treated as 0; ``None`` disables the limit.

    Returns:
        ``{"total": ..., "customers": [...]}`` where ``total`` is the number of
        customers actually returned, i.e. counted after the limit is applied.
    """
    data = load_customers()
    customers = data.get("customers", [])

    # ``c.get(key) or ""`` also covers records where the field is present but
    # null in the JSON, which would otherwise crash on ``None.lower()``.
    if status:
        wanted_status = status.lower()
        customers = [
            c for c in customers
            if (c.get("status") or "").lower() == wanted_status
        ]
    if industry:
        wanted_industry = industry.lower()
        customers = [
            c for c in customers
            if wanted_industry in (c.get("industry") or "").lower()
        ]

    # A negative slice bound would silently drop items from the end instead of
    # limiting the result, so clamp it to zero.
    if limit is not None:
        customers = customers[:max(limit, 0)]

    return {
        "total": len(customers),
        "customers": customers
    }
def get_customer(customer_id: str) -> Dict:
    """Get a specific customer by ID, together with that customer's deals.

    Args:
        customer_id: Value compared against each customer's ``id`` field.

    Returns:
        A copy of the customer record without its ``tags`` field and with a
        ``related_deals`` list holding every deal whose ``customer_id``
        matches. When no customer matches, ``{"error": "..."}``.
    """
    data = load_customers()
    for customer in data.get("customers", []):
        if customer.get("id") != customer_id:
            continue

        # Deals are only loaded once a matching customer has been found.
        deals_data = load_deals()
        related_deals = [
            d for d in deals_data.get("deals", [])
            if d.get("customer_id") == customer_id
        ]

        # Build a filtered copy instead of ``customer.pop("tags")``: the pop
        # raised KeyError for any customer record that has no "tags" key.
        details = {key: value for key, value in customer.items() if key != "tags"}
        details["related_deals"] = related_deals
        return details

    return {"error": f"Customer {customer_id} not found"}
def get_deals(
    stage: Optional[str] = None,
    customer_id: Optional[str] = None,
    owner: Optional[str] = None,
    min_value: Optional[float] = None,
    limit: int = 50
) -> Dict:
    """Get list of deals with optional filtering.

    Args:
        stage: When given, keep deals whose ``stage`` equals it
            (case-insensitive).
        customer_id: When given, keep deals whose ``customer_id`` equals it.
        owner: When given, keep deals whose ``owner`` contains it
            (case-insensitive substring).
        min_value: When given, keep deals whose ``value`` is at least this
            amount; a missing or null value counts as 0.
        limit: Maximum number of deals returned. Negative values are treated
            as 0; ``None`` disables the limit.

    Returns:
        ``{"total": ..., "deals": [...], "pipeline_summary": {...}}`` where
        ``total`` is the number of deals actually returned (counted after the
        limit) and ``pipeline_summary`` is passed through from the deals file.
    """
    data = load_deals()
    deals = data.get("deals", [])

    # ``d.get(key) or default`` also covers fields that are present but null in
    # the JSON, which would otherwise crash the comparison.
    if stage:
        wanted_stage = stage.lower()
        deals = [d for d in deals if (d.get("stage") or "").lower() == wanted_stage]
    if customer_id:
        deals = [d for d in deals if d.get("customer_id") == customer_id]
    if owner:
        wanted_owner = owner.lower()
        deals = [d for d in deals if wanted_owner in (d.get("owner") or "").lower()]
    if min_value is not None:
        deals = [d for d in deals if (d.get("value") or 0) >= min_value]

    # A negative slice bound would silently drop items from the end instead of
    # limiting the result, so clamp it to zero.
    if limit is not None:
        deals = deals[:max(limit, 0)]

    return {
        "total": len(deals),
        "deals": deals,
        "pipeline_summary": data.get("pipeline_summary", {})
    }
def get_deal(deal_id: str) -> Dict:
    """Look up a single deal by its ``id`` field.

    Returns:
        The first deal record whose ``id`` equals ``deal_id``, or
        ``{"error": "..."}`` when there is none.
    """
    all_deals = load_deals().get("deals", [])
    hit = next((entry for entry in all_deals if entry.get("id") == deal_id), None)
    if hit is None:
        return {"error": f"Deal {deal_id} not found"}
    return hit
def get_pipeline_summary() -> Dict:
    """Compute a sales pipeline summary from the current deal list.

    Deals whose stage is ``closed_won`` or ``closed_lost`` count as closed;
    all others are treated as open.

    Returns:
        A dict with the total and open deal counts, the summed value of open
        deals, their probability-weighted value (probability read as a
        percentage), count/value buckets per stage (all deals) and per owner
        (open deals only), and the open deals whose ``expected_close`` starts
        with the current ``YYYY-MM``.
    """

    def tally(records, field, fallback):
        """Group records by ``field`` into {"count", "value"} buckets."""
        buckets = {}
        for record in records:
            bucket = buckets.setdefault(
                record.get(field, fallback), {"count": 0, "value": 0}
            )
            bucket["count"] += 1
            bucket["value"] += record.get("value", 0)
        return buckets

    all_deals = load_deals().get("deals", [])
    closed_stages = ("closed_won", "closed_lost")
    active = [d for d in all_deals if d.get("stage") not in closed_stages]

    # Totals over the open pipeline.
    pipeline_value = sum(d.get("value", 0) for d in active)
    weighted_value = sum(
        d.get("value", 0) * d.get("probability", 0) / 100
        for d in active
    )

    # Stage buckets cover every deal; owner buckets cover open deals only.
    per_stage = tally(all_deals, "stage", "unknown")
    per_owner = tally(active, "owner", "Unassigned")

    # Open deals expected to close in the current calendar month.
    this_month = datetime.now().strftime("%Y-%m")
    closing_soon = [
        {
            "id": d.get("id"),
            "title": d.get("title"),
            "value": d.get("value"),
            "probability": d.get("probability"),
        }
        for d in active
        if d.get("expected_close", "").startswith(this_month)
    ]

    return {
        "total_deals": len(all_deals),
        "open_deals": len(active),
        "total_pipeline_value": pipeline_value,
        "weighted_value": weighted_value,
        "by_stage": per_stage,
        "by_owner": per_owner,
        "expected_closes_this_month": closing_soon,
    }
def get_documents() -> Dict:
    """List the available documents along with a usage hint.

    Returns:
        A dict with the number of documents, their metadata records, and an
        instruction telling the caller how to pass a name to ``read_document``.
    """
    listing = load_documents()
    return dict(
        total=len(listing),
        documents=listing,
        instructions="Utilize the exact file name as param for the read_document tool",
    )
def read_document(name: str) -> Dict:
    """Read the content of a document stored in the documents folder.

    The name is first tried as an exact path relative to the documents folder;
    if that does not yield a file, the first file (in sorted order) whose name
    contains ``name`` case-insensitively is used.

    Args:
        name: Exact or partial document file name. It arrives from tool-call
            arguments, so it is treated as untrusted input.

    Returns:
        On success a dict with the file's ``name``, its text ``content``,
        ``size_bytes`` (UTF-8 encoded size of the returned content) and
        ``type`` (extension without the dot). Otherwise ``{"error": "..."}``.
    """
    docs_dir = DATA_DIR / "documents"
    docs_root = docs_dir.resolve()
    not_found = {"error": f"Document '{name}' not found"}

    # An empty name would match every file in the partial search below.
    if not name or not name.strip():
        return not_found

    doc_path = None
    # Resolve "..", absolute paths and symlinks, then require the result to
    # live inside the documents folder so the tool cannot read other files.
    candidate = (docs_dir / name).resolve()
    if docs_root in candidate.parents and candidate.is_file():
        doc_path = candidate
    else:
        # Fall back to a partial match on file names; directories are skipped
        # and sorting makes the chosen match independent of listing order.
        needle = name.lower()
        for doc in sorted(docs_dir.glob("*")):
            if doc.is_file() and needle in doc.name.lower():
                doc_path = doc
                break

    if doc_path is None:
        return not_found

    try:
        content = doc_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        return {"error": f"Failed to read document: {str(e)}"}

    return {
        "name": doc_path.name,
        "content": content,
        # len(content) counts characters; encode to report actual bytes.
        "size_bytes": len(content.encode("utf-8")),
        "type": doc_path.suffix.lstrip(".")
    }
def search_documents(query: str) -> Dict:
    """Find documents whose text or file name contains the query.

    Matching is case-insensitive. Files that cannot be read as UTF-8 text are
    skipped silently.

    Returns:
        A dict echoing the ``query``, the number of matches, and for each
        matching document its name, extension and up to three stripped lines
        that contain the query.
    """
    folder = DATA_DIR / "documents"
    needle = query.lower()
    hits = []

    candidates = folder.glob("*") if folder.exists() else ()
    for path in candidates:
        if not path.is_file():
            continue
        try:
            text = path.read_text(encoding="utf-8")
        except Exception:
            # Best effort: unreadable or non-text files are ignored.
            continue

        if needle not in text.lower() and needle not in path.name.lower():
            continue

        # Lines containing the query; empty when only the file name matched.
        matching_lines = [
            row.strip() for row in text.split("\n")
            if needle in row.lower()
        ]
        hits.append({
            "name": path.name,
            "type": path.suffix.lstrip("."),
            "relevant_excerpts": matching_lines[:3],
        })

    return {
        "query": query,
        "total_matches": len(hits),
        "documents": hits,
    }
def get_access(customer_name: str) -> Dict[str, Any]:
    """Return the access entry stored for a customer.

    The customer name is compared case-insensitively.

    Returns:
        The stored entry for the customer, or a fresh
        ``{"customer_name": ..., "enabled": False}`` dict when none exists.
    """
    registry = load_access_data()
    wanted = customer_name.lower()
    hit = next(
        (
            record for record in registry.get("access", [])
            if record.get("customer_name", "").lower() == wanted
        ),
        None,
    )
    if hit is not None:
        return hit
    return {"customer_name": customer_name, "enabled": False}
def set_access(customer_name: str) -> Dict[str, Any]:
    """Mark access as enabled for a customer and persist the registry.

    An existing entry (matched case-insensitively on the customer name) is
    updated in place; otherwise a new entry is appended. The registry is
    written back in both cases.

    Returns:
        The entry that was updated or created, with ``enabled`` set to True.
    """
    registry = load_access_data()
    wanted = customer_name.lower()

    for record in registry.get("access", []):
        if record.get("customer_name", "").lower() == wanted:
            record["enabled"] = True
            outcome = record
            break
    else:
        # No existing entry for this customer: create one.
        outcome = {"customer_name": customer_name, "enabled": True}
        registry.setdefault("access", []).append(outcome)

    save_access_data(registry)
    return outcome
| # ============================================ | |
| # Tool Registry | |
| # ============================================ | |
# Registry of callable tools, keyed by tool name. Each entry holds:
#   "description": human-readable summary returned by list_tools,
#   "function":    the implementation that call_tool invokes with the
#                  request's arguments as keyword arguments,
#   "inputSchema": JSON-Schema-style description of those arguments,
#                  returned by list_tools (it is not enforced in this file).
TOOLS = {
    "get_customers": {
        "description": "Get list of customers. Optionally filter by status (active/inactive/prospect) or industry.",
        "function": get_customers,
        "inputSchema": {
            "type": "object",
            "properties": {
                "status": {"type": "string", "description": "Filter by status: active, inactive, or prospect"},
                "industry": {"type": "string", "description": "Filter by industry"},
                "limit": {"type": "integer", "description": "Maximum number of results", "default": 50}
            }
        }
    },
    # NOTE(review): the description says "by name" while the parameter is
    # customer_id (example value "Walnut") -- confirm which identifier the
    # customer data actually uses.
    "get_customer": {
        "description": "Get detailed information about a specific customer by name.",
        "function": get_customer,
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_id": {"type": "string", "description": "Customer ID (e.g., Walnut)"}
            },
            "required": ["customer_id"]
        }
    },
    "get_deals": {
        "description": "Get list of deals/opportunities. Optionally filter by stage, customer, owner, or minimum value.",
        "function": get_deals,
        "inputSchema": {
            "type": "object",
            "properties": {
                "stage": {"type": "string", "description": "Filter by stage: qualification, demo, proposal, negotiation, closed_won, closed_lost"},
                "customer_id": {"type": "string", "description": "Filter by customer ID"},
                "owner": {"type": "string", "description": "Filter by deal owner name"},
                "min_value": {"type": "number", "description": "Minimum deal value"},
                "limit": {"type": "integer", "description": "Maximum number of results", "default": 50}
            }
        }
    },
    "get_deal": {
        "description": "Get detailed information about a specific deal by ID.",
        "function": get_deal,
        "inputSchema": {
            "type": "object",
            "properties": {
                "deal_id": {"type": "string", "description": "Deal ID (e.g., DEAL-001)"}
            },
            "required": ["deal_id"]
        }
    },
    # The next two tools take no arguments.
    "get_pipeline_summary": {
        "description": "Get sales pipeline summary including totals by stage and owner.",
        "function": get_pipeline_summary,
        "inputSchema": {
            "type": "object",
            "properties": {}
        }
    },
    "get_documents": {
        "description": "Get list of available CRM documents related to the company at hand.",
        "function": get_documents,
        "inputSchema": {
            "type": "object",
            "properties": {}
        }
    },
    "read_document": {
        "description": "Read the content of a specific document by name.",
        "function": read_document,
        "inputSchema": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "Document name or partial name"}
            },
            "required": ["name"]
        }
    },
    "search_documents": {
        "description": "Search documents by content or title.",
        "function": search_documents,
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"}
            },
            "required": ["query"]
        }
    },
    # Access tools: get_access only reads the access file, set_access rewrites it.
    "get_access": {
        "description": "Check whether endpoint access is enabled for a given customer.",
        "function": get_access,
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_name": {
                    "type": "string",
                    "description": "Customer company name"
                }
            },
            "required": ["customer_name"]
        }
    },
    "set_access": {
        "description": "Enable endpoint access for a given customer (sets enabled=true in access.json).",
        "function": set_access,
        "inputSchema": {
            "type": "object",
            "properties": {
                "customer_name": {
                    "type": "string",
                    "description": "Customer company name"
                }
            },
            "required": ["customer_name"]
        }
    }
}
| # ============================================ | |
| # API Endpoints | |
| # ============================================ | |
async def root():
    """Report that the service is up, together with its name and version."""
    # NOTE(review): no route decorator is visible on this coroutine in this
    # view of the file -- confirm it is registered on `app`.
    return dict(status="ok", service="CRM MCP Server", version="1.0.0")
async def list_tools():
    """Describe every registered tool.

    Returns:
        ``{"tools": [...]}`` with one entry per registry item carrying its
        name, description and input schema; the implementing function is not
        exposed.
    """
    # NOTE(review): no route decorator is visible on this coroutine in this
    # view of the file -- confirm it is registered on `app`.
    exposed_fields = ("description", "inputSchema")
    catalogue = [
        {"name": tool_name, **{field: spec[field] for field in exposed_fields}}
        for tool_name, spec in TOOLS.items()
    ]
    return {"tools": catalogue}
async def call_tool(request: ToolCallRequest):
    """Execute a registered tool by name with the supplied arguments.

    Args:
        request: Tool name plus the keyword arguments to pass to it.

    Returns:
        A ``ToolResponse``: ``success=True`` with the tool's return value, or
        ``success=False`` with an error message when the tool is unknown or
        the call raised (including a ``TypeError`` from unexpected or missing
        arguments).
    """
    # NOTE(review): no route decorator is visible on this coroutine in this
    # view of the file -- confirm it is registered on `app`.
    tool_name = request.name
    tool = TOOLS.get(tool_name)
    if tool is None:
        return ToolResponse(
            success=False,
            error=f"Unknown tool: {tool_name}"
        )

    try:
        result = tool["function"](**request.arguments)
    except Exception as e:
        # Boundary handler: report the failure to the caller instead of
        # letting it propagate. logger.exception records the traceback, which
        # logger.error with a pre-formatted message discarded.
        logger.exception("Tool execution error: %s", e)
        return ToolResponse(success=False, error=str(e))

    return ToolResponse(success=True, result=result)
| # ============================================ | |
| # Main Entry Point | |
| # ============================================ | |
if __name__ == "__main__":
    import uvicorn

    # Create the data folder and its documents sub-folder before serving.
    documents_dir = DATA_DIR / "documents"
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    documents_dir.mkdir(exist_ok=True)

    logger.info("Starting CRM MCP Server...")
    logger.info("Data directory: %s", DATA_DIR)

    # Serve on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)