Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| EcoMCP - E-commerce MCP Server (Track 1: Building MCP) | |
| Minimalist, fast, enterprise e-commerce assistant | |
| Integrates: OpenAI API + LlamaIndex + Modal | |
| Features: | |
| - Knowledge base integration with LlamaIndex | |
| - Semantic search across products and documentation | |
| - AI-powered product analysis and recommendations | |
| - Review intelligence with sentiment analysis | |
| - Smart pricing and competitive analysis | |
| """ | |
| import json | |
| import sys | |
| import asyncio | |
| import logging | |
| import os | |
| from typing import Any, Dict, List, Optional, AsyncGenerator | |
| from datetime import datetime | |
| import httpx | |
| from functools import lru_cache | |
# All diagnostics go to stderr: stdout carries the JSON-RPC response stream.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(stream=sys.stderr, level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
# Optional input validators. Without them, tool calls get only a minimal
# "arguments must be a dict" check, which is logged loudly as a security risk.
try:
    from src.core.validators import validate_tool_args
except ImportError:
    VALIDATORS_LOADED = False
    logger.error("CRITICAL: Input validators not available. Input validation disabled. This is a security risk.")
else:
    VALIDATORS_LOADED = True
# Optional LlamaIndex-backed knowledge base; when the import fails the
# knowledge_search / product_query tools report "not initialized".
try:
    from src.core import EcoMCPKnowledgeBase, get_knowledge_base, initialize_knowledge_base
except ImportError:
    LLAMAINDEX_AVAILABLE = False
    logger.warning("LlamaIndex not available. Knowledge base features disabled.")
else:
    LLAMAINDEX_AVAILABLE = True
# Resolve OpenAI settings: prefer the project config module, and fall back to
# environment variables when it cannot be imported.
#   OPENAI_API_KEY -> API key ("" means "not configured")
#   OPENAI_MODEL   -> model name (defaults to "gpt-4-turbo", as before)
try:
    from src.config import get_app_config
    config = get_app_config()
    # Normalize a missing key to "" so every "not configured" check sees the same value.
    OPENAI_API_KEY = config.openai_api_key or ""
    MODEL = config.openai_model
except ImportError:
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
    MODEL = os.getenv("OPENAI_MODEL", "gpt-4-turbo")
class EcoMCPServer:
    """
    E-commerce MCP Server with OpenAI integration.

    Implements the MCP 2024-11-05 specification as a JSON-RPC request handler
    (``process_message``). The five tools in ``OPENAI_TOOLS`` are answered
    through the OpenAI Chat Completions API. The tools in ``KB_TOOLS`` are
    answered from the optional LlamaIndex knowledge base built from
    ``DOCS_PATH``.
    """

    # Directory the knowledge base is built from (also checked at startup).
    DOCS_PATH = "./docs"
    # Maximum number of reviews put into one prompt, to bound token usage.
    MAX_REVIEWS = 20
    # Tools whose handlers go through ``_call_openai``.
    OPENAI_TOOLS = frozenset({
        "analyze_product",
        "analyze_reviews",
        "generate_listing",
        "price_recommendation",
        "competitor_analysis",
    })
    # Tools whose handlers need an initialized knowledge base (``self.kb``).
    KB_TOOLS = frozenset({"knowledge_search", "product_query"})

    def __init__(self):
        self.tools = self._init_tools()
        self.protocol_version = "2024-11-05"
        self.kb = None
        self.capabilities = {}
        self._validate_startup()
        self._init_knowledge_base()
        self._report_capabilities()

    def _validate_startup(self):
        """Check configuration and record the results in ``self.capabilities``.

        Sets the ``openai_available``, ``validation_available`` and
        ``docs_available`` flags. This method only logs; it never raises.
        """
        logger.info("Validating server startup configuration...")
        # Check OpenAI API key
        if not OPENAI_API_KEY:
            logger.error("CRITICAL: OpenAI API key not configured")
            logger.error(" Set OPENAI_API_KEY environment variable to enable AI features")
            self.capabilities['openai_available'] = False
        else:
            logger.info(f"✓ OpenAI API key configured for model: {MODEL}")
            self.capabilities['openai_available'] = True
        # Check validators
        if not VALIDATORS_LOADED:
            logger.error("CRITICAL: Input validators not loaded - validation disabled")
            self.capabilities['validation_available'] = False
        else:
            logger.info("✓ Input validators loaded")
            self.capabilities['validation_available'] = True
        # Check docs directory for KB
        docs_path = self.DOCS_PATH
        if os.path.isdir(docs_path):
            logger.info(f"✓ Documentation directory found: {docs_path}")
            self.capabilities['docs_available'] = True
        else:
            logger.warning(f"Documentation directory not found: {docs_path} (KB features will be unavailable)")
            self.capabilities['docs_available'] = False

    def _count_available_tools(self) -> tuple:
        """Return ``(available, total)`` tool counts for the current state.

        OpenAI tools count as available only when an API key is configured.
        Knowledge-base tools count as available only when ``self.kb`` was
        actually built, not merely when the docs directory exists.
        """
        names = {tool["name"] for tool in self.tools}
        unavailable = set()
        if not self.capabilities.get('openai_available'):
            unavailable |= self.OPENAI_TOOLS
        if self.kb is None:
            unavailable |= self.KB_TOOLS
        return len(names - unavailable), len(names)

    def _report_capabilities(self):
        """Log which feature groups are enabled and how many tools are usable."""
        logger.info("=" * 60)
        logger.info("Server Capabilities:")
        if self.capabilities.get('openai_available'):
            logger.info(" ✓ OpenAI-powered analysis tools")
        else:
            logger.warning(" ⚠️ OpenAI features DISABLED (no API key)")
        if self.kb is not None:
            logger.info(" ✓ Knowledge base search and queries")
        elif not self.capabilities.get('docs_available'):
            logger.warning(" ⚠️ Knowledge base features DISABLED (no docs)")
        else:
            # Docs exist, but LlamaIndex is missing or the build failed.
            logger.warning(" ⚠️ Knowledge base features DISABLED (knowledge base not initialized)")
        available, total = self._count_available_tools()
        logger.info(f" → {available}/{total} tools available")
        if not self.capabilities.get('validation_available'):
            logger.error(" ⚠️ SECURITY: Input validation disabled")
        logger.info("=" * 60)

    def _init_knowledge_base(self):
        """Build the LlamaIndex knowledge base from ``DOCS_PATH`` when possible.

        ``self.kb`` is assigned only after ``initialize`` succeeds. A failed
        build therefore leaves it ``None``, and the KB tools report "not
        initialized" instead of using a partially initialized object.
        """
        if not LLAMAINDEX_AVAILABLE:
            return
        docs_path = self.DOCS_PATH
        if not os.path.isdir(docs_path):
            logger.warning(f"Documentation directory not found: {docs_path}")
            return
        try:
            kb = EcoMCPKnowledgeBase()
            kb.initialize(docs_path)
        except Exception as e:
            logger.error(f"Failed to initialize knowledge base: {e}")
            return
        self.kb = kb
        logger.info("Knowledge base initialized successfully")

    def _init_tools(self) -> List[Dict[str, Any]]:
        """Define e-commerce MCP tools (name, description, JSON Schema for input)."""
        return [
            {
                "name": "analyze_product",
                "description": "Analyze e-commerce product and generate recommendations",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "product_name": {"type": "string", "description": "Product name"},
                        "category": {"type": "string", "description": "Product category"},
                        "description": {"type": "string", "description": "Product description"},
                        "current_price": {"type": "number", "description": "Current price ($)"}
                    },
                    "required": ["product_name"]
                }
            },
            {
                "name": "analyze_reviews",
                "description": "Extract sentiment, themes, and actionable insights from reviews",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "reviews": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "List of customer reviews"
                        },
                        "product_name": {"type": "string", "description": "Product name for context"}
                    },
                    "required": ["reviews"]
                }
            },
            {
                "name": "generate_listing",
                "description": "Create compelling product listing copy optimized for conversion",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "product_name": {"type": "string", "description": "Product name"},
                        "features": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Key product features"
                        },
                        "target_audience": {"type": "string", "description": "Target customer segment"},
                        "style": {"type": "string", "enum": ["luxury", "budget", "professional", "casual"], "description": "Tone style"}
                    },
                    "required": ["product_name", "features"]
                }
            },
            {
                "name": "price_recommendation",
                "description": "AI-powered pricing strategy with market analysis",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "product_name": {"type": "string", "description": "Product name"},
                        "cost": {"type": "number", "description": "Product cost ($)"},
                        "category": {"type": "string", "description": "Product category"},
                        "target_margin": {"type": "number", "description": "Target profit margin %"}
                    },
                    "required": ["product_name", "cost"]
                }
            },
            {
                "name": "competitor_analysis",
                "description": "Analyze competitive positioning and market opportunities",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "product_name": {"type": "string", "description": "Product name"},
                        "category": {"type": "string", "description": "Product category"},
                        "key_competitors": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Competitor names"
                        }
                    },
                    "required": ["product_name"]
                }
            },
            {
                "name": "knowledge_search",
                "description": "Search product knowledge base and documentation with semantic search",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Search query"},
                        "search_type": {"type": "string", "enum": ["all", "products", "documentation"], "description": "Type of search"},
                        "top_k": {"type": "integer", "description": "Number of results (default: 5)", "minimum": 1, "maximum": 20}
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "product_query",
                "description": "Get natural language answers about products and documentation",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "question": {"type": "string", "description": "Natural language question"}
                    },
                    "required": ["question"]
                }
            }
        ]

    async def handle_initialize(self, params: Dict) -> Dict:
        """Handle the MCP ``initialize`` request (``params`` is currently unused)."""
        return {
            "protocolVersion": self.protocol_version,
            "capabilities": {
                "tools": {}
            },
            "serverInfo": {
                "name": "ecomcp-server",
                "version": "1.0.0"
            }
        }

    async def handle_list_tools(self) -> Dict:
        """Handle ``tools/list``: return every tool definition."""
        return {"tools": self.tools}

    @staticmethod
    def _as_str_list(value: Any) -> List[str]:
        """Normalize a list-of-strings tool argument.

        Falsy values become ``[]``. A bare string becomes a one-item list, so
        it is not iterated character by character. Other items are converted
        with ``str``.
        """
        if not value:
            return []
        if isinstance(value, str):
            return [value]
        return [str(item) for item in value]

    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """Validate ``arguments`` and dispatch to the handler for tool ``name``.

        Returns:
            The handler's result dict. Validation failures and non-dict
            arguments return an ``{"status": "error", ...}`` dict instead.

        Raises:
            ValueError: if ``name`` is not a known tool.
        """
        # A JSON ``null`` arguments field arrives as None; treat it as "no arguments".
        if arguments is None:
            arguments = {}
        # Type-check before anything touches ``arguments`` as a dict.
        if not isinstance(arguments, dict):
            return {"status": "error", "error": "Arguments must be a dictionary", "code": "INVALID_FORMAT"}
        logger.info(f"Calling tool: {name} with arguments: {list(arguments.keys())}")
        # Validate arguments (mandatory for security)
        if VALIDATORS_LOADED:
            is_valid, validated_args, error_msg = validate_tool_args(name, arguments)
            if not is_valid:
                logger.warning(f"Validation failed for {name}: {error_msg}")
                return {"status": "error", "error": error_msg, "code": "VALIDATION_ERROR"}
            arguments = validated_args if validated_args else arguments
        else:
            logger.warning(f"Validators not loaded. Minimal validation for {name}")
        handlers = {
            "analyze_product": self._analyze_product,
            "analyze_reviews": self._analyze_reviews,
            "generate_listing": self._generate_listing,
            "price_recommendation": self._price_recommendation,
            "competitor_analysis": self._competitor_analysis,
            "knowledge_search": self._knowledge_search,
            "product_query": self._product_query,
        }
        handler = handlers.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        return await handler(arguments)

    async def _analyze_product(self, args: Dict) -> Dict:
        """Analyze a product with OpenAI and return the model's insights."""
        try:
            product_name = args.get("product_name", "")
            category = args.get("category", "general")
            description = args.get("description", "")
            current_price = args.get("current_price")
            # ``is not None`` so that a price of 0 is still included in the prompt.
            prompt = f"""Analyze this e-commerce product and provide actionable insights:
Product: {product_name}
Category: {category}
{f'Description: {description}' if description else ''}
{f'Current Price: ${current_price}' if current_price is not None else ''}
Provide:
1. Key value propositions
2. Potential customer segments
3. Market opportunities
4. Improvement recommendations
5. Competitive advantages
Be concise and specific."""
            analysis = await self._call_openai(prompt)
            return {
                "status": "success",
                "product": product_name,
                "analysis": analysis,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Product analysis error: {e}")
            return {"status": "error", "error": str(e)}

    async def _analyze_reviews(self, args: Dict) -> Dict:
        """Analyze up to ``MAX_REVIEWS`` customer reviews with OpenAI.

        ``review_count`` is the number of reviews received.
        ``reviews_analyzed`` is how many were actually sent to the model.
        """
        try:
            reviews = self._as_str_list(args.get("reviews", []))
            product_name = args.get("product_name", "Product")
            if not reviews:
                return {"status": "error", "error": "No reviews provided"}
            sample = reviews[:self.MAX_REVIEWS]  # Limit for token efficiency
            reviews_text = "\n".join(f"- {r}" for r in sample)
            prompt = f"""Analyze these customer reviews for '{product_name}':
{reviews_text}
Provide:
1. Overall sentiment (positive/negative/mixed) with percentage breakdown
2. Top 3 strengths mentioned
3. Top 3 concerns/weaknesses
4. Key themes and patterns
5. Actionable improvement recommendations
Be concise and data-driven."""
            analysis = await self._call_openai(prompt)
            return {
                "status": "success",
                "product": product_name,
                "review_count": len(reviews),
                "reviews_analyzed": len(sample),
                "analysis": analysis,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Review analysis error: {e}")
            return {"status": "error", "error": str(e)}

    async def _generate_listing(self, args: Dict) -> Dict:
        """Generate conversion-oriented product listing copy with OpenAI."""
        try:
            product_name = args.get("product_name", "")
            features = self._as_str_list(args.get("features", []))
            target_audience = args.get("target_audience", "general consumers")
            style = args.get("style", "professional")
            features_text = ", ".join(features) if features else "premium quality"
            prompt = f"""Write a compelling e-commerce product listing:
Product: {product_name}
Features: {features_text}
Target Audience: {target_audience}
Tone: {style}
Create:
1. Attention-grabbing headline (under 70 chars)
2. 2-3 sentence compelling description
3. 3-5 key benefits (bullet points)
4. Strong call-to-action
Optimize for conversion. Be persuasive but authentic."""
            listing = await self._call_openai(prompt)
            return {
                "status": "success",
                "product": product_name,
                "listing": listing,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Listing generation error: {e}")
            return {"status": "error", "error": str(e)}

    async def _price_recommendation(self, args: Dict) -> Dict:
        """Ask OpenAI for a pricing strategy given cost and target margin (%)."""
        try:
            product_name = args.get("product_name", "")
            cost = args.get("cost", 0)
            category = args.get("category", "general")
            target_margin = args.get("target_margin", 50)
            prompt = f"""Provide pricing strategy for this e-commerce product:
Product: {product_name}
Cost: ${cost}
Category: {category}
Target Margin: {target_margin}%
Analyze and provide:
1. Recommended retail price
2. Psychological pricing strategy (e.g., $99 vs $100)
3. Discount strategy recommendations
4. Bundle pricing opportunities
5. Price elasticity considerations for category
Consider market dynamics, competition, and customer psychology."""
            recommendation = await self._call_openai(prompt)
            return {
                "status": "success",
                "product": product_name,
                "cost": cost,
                "recommendation": recommendation,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Pricing error: {e}")
            return {"status": "error", "error": str(e)}

    async def _competitor_analysis(self, args: Dict) -> Dict:
        """Ask OpenAI for competitive positioning against ``key_competitors``."""
        try:
            product_name = args.get("product_name", "")
            category = args.get("category", "general")
            competitors = self._as_str_list(args.get("key_competitors", []))
            competitors_text = ", ".join(competitors) if competitors else "major market players"
            prompt = f"""Analyze competitive positioning:
Product: {product_name}
Category: {category}
Competitors: {competitors_text}
Provide:
1. Market positioning opportunities
2. Differentiation strategies
3. Competitor strengths and weaknesses
4. White space opportunities
5. Recommended go-to-market approach
Focus on actionable competitive advantages."""
            analysis = await self._call_openai(prompt)
            return {
                "status": "success",
                "product": product_name,
                "analysis": analysis,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Competitor analysis error: {e}")
            return {"status": "error", "error": str(e)}

    async def _knowledge_search(self, args: Dict) -> Dict:
        """Semantic search over the knowledge base.

        ``search_type`` selects products, documentation, or (default) all.
        ``top_k`` is clamped to the 1..20 range declared in the tool schema.
        Each result's content is truncated to 300 characters.
        """
        try:
            if not self.kb:
                return {"status": "error", "error": "Knowledge base not initialized"}
            query = args.get("query", "")
            search_type = args.get("search_type", "all")
            try:
                top_k = int(args.get("top_k", 5))
            except (TypeError, ValueError):
                return {"status": "error", "error": "top_k must be an integer"}
            # Enforce the schema bounds even when the validators are not loaded.
            top_k = max(1, min(top_k, 20))
            if not query:
                return {"status": "error", "error": "Query is required"}
            # Perform search
            if search_type == "products":
                results = self.kb.search_products(query, top_k=top_k)
            elif search_type == "documentation":
                results = self.kb.search_documentation(query, top_k=top_k)
            else:
                results = self.kb.search(query, top_k=top_k)
            formatted_results = [
                {
                    "rank": rank,
                    "score": round(result.score, 3),
                    "content": result.content[:300],  # Truncate for readability
                    "source": result.source
                }
                for rank, result in enumerate(results, 1)
            ]
            return {
                "status": "success",
                "query": query,
                "search_type": search_type,
                "result_count": len(formatted_results),
                "results": formatted_results,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Knowledge search error: {e}")
            return {"status": "error", "error": str(e)}

    async def _product_query(self, args: Dict) -> Dict:
        """Answer a natural-language question from the knowledge base."""
        try:
            if not self.kb:
                return {"status": "error", "error": "Knowledge base not initialized"}
            question = args.get("question", "")
            if not question:
                return {"status": "error", "error": "Question is required"}
            # Get answer from knowledge base
            answer = self.kb.query(question)
            return {
                "status": "success",
                "question": question,
                "answer": answer,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Product query error: {e}")
            return {"status": "error", "error": str(e)}

    async def _call_openai(self, prompt: str, stream: bool = False, max_retries: int = 3) -> str:
        """Call the OpenAI Chat Completions API with retries on transient failures.

        Args:
            prompt: The user prompt to send to OpenAI.
            stream: Accepted for backward compatibility; currently unused.
            max_retries: Total number of attempts (at least one is made).
                Rate limits (429), server errors (5xx), timeouts and network
                errors are retried with exponential backoff (1s, 2s, 4s, ...).
                No sleep happens after the final attempt.

        Returns:
            The model's response text, or a human-readable error message.
            API and network failures are reported in the returned text, not raised.
        """
        if not OPENAI_API_KEY:
            return "OpenAI API key not configured. Set OPENAI_API_KEY environment variable."

        attempts = max(1, max_retries)
        last_error = None
        for attempt in range(attempts):
            is_last_attempt = attempt == attempts - 1
            wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
            try:
                async with httpx.AsyncClient(timeout=60.0) as client:
                    response = await client.post(
                        "https://api.openai.com/v1/chat/completions",
                        headers={
                            "Authorization": f"Bearer {OPENAI_API_KEY}",
                            "Content-Type": "application/json"
                        },
                        json={
                            "model": MODEL,
                            "messages": [
                                {
                                    "role": "system",
                                    "content": "You are an expert e-commerce consultant. Provide concise, actionable, data-driven insights."
                                },
                                {"role": "user", "content": prompt}
                            ],
                            "max_tokens": 800,
                            "temperature": 0.7
                        }
                    )
            except (httpx.TimeoutException, asyncio.TimeoutError):
                # httpx reports timeouts with its own exception types,
                # never asyncio.TimeoutError, so both are caught here.
                last_error = "Request timeout"
                logger.error("OpenAI API call timed out")
                if is_last_attempt:
                    return "Request timeout. Please try again."
                logger.info(f"Retrying after timeout ({attempt + 1}/{attempts})")
                await asyncio.sleep(wait_time)
                continue
            except httpx.NetworkError as e:
                # Connect/read/write failures are usually transient: retry them.
                last_error = str(e)
                logger.warning(f"OpenAI network error: {e} ({attempt + 1}/{attempts})")
                if not is_last_attempt:
                    await asyncio.sleep(wait_time)
                continue
            except Exception as e:
                logger.error(f"OpenAI call error: {e}")
                return f"Error: {str(e)}"

            status = response.status_code
            # Handle successful response
            if status == 200:
                try:
                    data = response.json()
                    return data['choices'][0]['message']['content']
                except (ValueError, KeyError, IndexError, TypeError) as e:
                    logger.error(f"OpenAI call error: {e}")
                    return f"Error: {str(e)}"
            # Rate limiting and server errors are transient: retry with backoff.
            if status == 429 or 500 <= status < 600:
                last_error = f"HTTP {status}"
                if status == 429:
                    reason = "Rate limited by OpenAI API."
                else:
                    reason = f"OpenAI server error {status}."
                if is_last_attempt:
                    logger.warning(f"{reason} Giving up ({attempt + 1}/{attempts})")
                    break
                logger.warning(f"{reason} Retrying in {wait_time}s ({attempt + 1}/{attempts})")
                await asyncio.sleep(wait_time)
                continue
            # Handle authentication error
            if status == 401:
                logger.error("OpenAI API authentication failed - check OPENAI_API_KEY")
                return "Authentication failed. Verify your OpenAI API key is valid."
            # Handle permission / access errors
            if status == 403:
                logger.error("OpenAI API quota exceeded or permission denied")
                return "Access denied. Check your OpenAI account limits and permissions."
            # Any other status is not retried.
            error_detail = response.text[:500]
            logger.error(f"OpenAI API error {status}: {error_detail}")
            return f"API Error {status}: {error_detail}"

        return f"Failed to reach OpenAI after {attempts} attempts. Last error: {last_error}"

    @staticmethod
    def _jsonrpc_error(msg_id: Any, code: int, message: str) -> Dict:
        """Build a JSON-RPC 2.0 error response object."""
        return {"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": message}}

    @staticmethod
    def _to_call_tool_result(payload: Any) -> Dict:
        """Shape a tool handler's payload as an MCP ``tools/call`` result.

        MCP clients read ``content`` (a list of content items) and ``isError``.
        The payload is rendered as JSON text into a single text item.
        ``isError`` is true when the payload's ``status`` is ``"error"``.
        For dict payloads, the original keys are kept alongside so callers
        that read them directly keep working.
        """
        # default=str: a tool's text output must always render, even if the
        # payload holds a non-JSON object (e.g. a knowledge-base answer).
        text = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
        is_error = isinstance(payload, dict) and payload.get("status") == "error"
        result = dict(payload) if isinstance(payload, dict) else {}
        result["content"] = [{"type": "text", "text": text}]
        result["isError"] = is_error
        return result

    async def process_message(self, message: Dict) -> Dict:
        """Handle one JSON-RPC request and return its response object.

        Supports ``initialize``, ``ping``, ``tools/list`` and ``tools/call``.
        Error codes:
            -32600 (Invalid Request) for a non-object message.
            -32602 (Invalid params) for non-object ``params``.
            -32601 (Method not found) for unknown methods.
            -32603 (Internal error) for any other failure.
        """
        msg_id = message.get("id") if isinstance(message, dict) else None
        try:
            if not isinstance(message, dict):
                return self._jsonrpc_error(None, -32600, "Invalid Request")
            method = message.get("method")
            params = message.get("params")
            # A missing or null params field means "no params".
            if params is None:
                params = {}
            if not isinstance(params, dict):
                return self._jsonrpc_error(msg_id, -32602, "Invalid params: expected an object")
            logger.debug(f"Processing method: {method}")
            if method == "initialize":
                result = await self.handle_initialize(params)
            elif method == "ping":
                # MCP liveness check: the reply is an empty result.
                result = {}
            elif method == "tools/list":
                result = await self.handle_list_tools()
            elif method == "tools/call":
                tool_name = params.get("name")
                tool_args = params.get("arguments", {})
                tool_result = await self.call_tool(tool_name, tool_args)
                result = self._to_call_tool_result(tool_result)
            else:
                return self._jsonrpc_error(msg_id, -32601, f"Unknown method: {method}")
            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "result": result
            }
        except Exception as e:
            logger.error(f"Message processing error: {e}")
            return self._jsonrpc_error(msg_id, -32603, str(e))
async def main():
    """Serve MCP over stdio: one JSON-RPC message per line in, one per line out.

    Requests are dispatched to ``EcoMCPServer.process_message``, and each
    response is written to stdout as a single JSON line.
    - Notifications (messages without an ``id``) are not answered, as
      JSON-RPC 2.0 requires.
    - Lines that are not valid JSON get a -32700 "Parse error" response.
    - Non-object messages get -32600 "Invalid Request".
    The loop ends when stdin reaches EOF, i.e. when the client closes the pipe.
    """
    server = EcoMCPServer()
    logger.info("EcoMCP Server started - listening for JSON-RPC messages")
    loop = asyncio.get_running_loop()

    def send(payload):
        """Write one JSON-RPC message to stdout and flush it immediately."""
        sys.stdout.write(json.dumps(payload) + "\n")
        sys.stdout.flush()

    def error_payload(msg_id, code, text):
        """Build a JSON-RPC 2.0 error response object."""
        return {"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": text}}

    async def server_loop():
        """Read, dispatch and answer messages until stdin is closed."""
        while True:
            try:
                # readline() blocks, so run it in the default executor.
                line = await loop.run_in_executor(None, sys.stdin.readline)
            except UnicodeDecodeError as e:
                logger.warning(f"Undecodable input received: {e}")
                send(error_payload(None, -32700, f"Parse error: {e}"))
                continue
            if not line:
                # readline() returns "" only at EOF: the client is gone.
                logger.info("stdin closed")
                return
            if not line.strip():
                continue
            msg_id = None
            try:
                try:
                    message = json.loads(line)
                except json.JSONDecodeError as e:
                    logger.warning(f"Invalid JSON received: {e}")
                    send(error_payload(None, -32700, f"Parse error: {e}"))
                    continue
                if not isinstance(message, dict):
                    send(error_payload(None, -32600, "Invalid Request"))
                    continue
                if "id" not in message:
                    # Notification (e.g. notifications/initialized): never reply.
                    logger.debug(f"Ignoring notification: {message.get('method')}")
                    continue
                msg_id = message["id"]
                response = await server.process_message(message)
                send(response)
            except Exception as e:
                logger.error(f"Server error: {e}")
                send(error_payload(msg_id, -32603, str(e)))

    try:
        await server_loop()
    except KeyboardInterrupt:
        pass
    logger.info("Server shutdown")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() can let Ctrl-C escape as KeyboardInterrupt (Python 3.11+
        # cancels main() and re-raises it here); log instead of a traceback.
        logger.info("Server shutdown")