Spaces:
Sleeping
Sleeping
| """ | |
| HTTP-only MCP Server for the separated AI service. | |
| Communicates with the business logic service via HTTP API calls. | |
| """ | |
| import re | |
| from typing import Literal, Optional, Any, Dict | |
| import httpx | |
| from mcp.server.fastmcp import FastMCP | |
| from config.config import settings | |
| import logging | |
| import os | |
| import sys | |
| from dotenv import load_dotenv | |
| # Set up logging | |
| logger = logging.getLogger(__name__) | |
| load_dotenv() | |
| logger.info("--- MCP SERVER SCRIPT STARTING ---") | |
| # Load secure service secret from environment variable | |
| SERVICE_SECRET = "abfe95adc6a3d85f1d8533a0fbf151b18240d817b471dda39a925555d886549c32c667dbeb184b9e9c73da3227c0dae5f83a" | |
| logger.info(f"SERVICE_SECRET loaded: '{bool(SERVICE_SECRET)}'") # Log boolean status, not the secret itself. | |
| if not SERVICE_SECRET: | |
| logger.error("SERVICE_SECRET is NOT SET. Authentication will fail.") | |
| # Create MCP server instance | |
| mcp = FastMCP("separated-task-management-server") | |
| logger.info("FastMCP instance created.") | |
| # Get business service base URL from environment | |
| BUSINESS_SERVICE_URL = settings.BUSINESS_SERVICE_URL | |
| def _get_auth_headers() -> Dict[str, str]: | |
| if not SERVICE_SECRET: | |
| logger.error("Attempted to get auth headers but SERVICE_SECRET is not set.") | |
| return {} | |
| return {"Authorization": f"Bearer {SERVICE_SECRET}"} | |
| def detect_priority_from_text(text: str) -> str: | |
| text_lower = text.lower() | |
| high_priority_patterns = [r'\bhigh\s*priority\b', r'\burgent\b', r'\bcritical\b', r'\bimportant\b', r'\basap\b', r'\bhigh\b'] | |
| low_priority_patterns = [r'\blow\s*priority\b', r'\bminor\b', r'\boptional\b', r'\bwhen\s*you\s*have\s*time\b', r'low'] | |
| if any(re.search(p, text_lower) for p in high_priority_patterns): | |
| return "high" | |
| if any(re.search(p, text_lower) for p in low_priority_patterns): | |
| return "low" | |
| if re.search(r'\bmedium\b|\bnormal\b', text_lower): | |
| return "medium" | |
| return "medium" | |
| async def add_task( | |
| user_id: str, | |
| title: str, | |
| description: Optional[str] = None, | |
| priority: Optional[str] = None, | |
| due_date: Optional[str] = None, | |
| ) -> dict: | |
| """ | |
| Create a new task. This tool is idempotent. | |
| """ | |
| if not SERVICE_SECRET: | |
| return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."} | |
| try: | |
| existing_tasks_response = await list_tasks(user_id=user_id) | |
| if existing_tasks_response.get("success"): | |
| for task in existing_tasks_response.get("tasks", []): | |
| if task.get("title", "").lower() == title.lower(): | |
| return {"success": False, "done": True, "message": "Task already exists."} | |
| except Exception as e: | |
| logger.error(f"Failed to check for existing tasks during add_task: {e}") | |
| return {"success": False, "done": True, "error": "Failed to verify if task exists.", "details": str(e)} | |
| if priority is None: | |
| combined_text = f"{title} {description or ''}" | |
| priority = detect_priority_from_text(combined_text) | |
| else: | |
| priority = priority.lower() | |
| if priority not in ["low", "medium", "high"]: | |
| priority = "medium" | |
| payload = {"title": title, "description": description, "priority": priority, "completed": False, "due_date": due_date} | |
| headers = {"Content-Type": "application/json", **_get_auth_headers()} | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.post(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", json=payload, headers=headers) | |
| if response.status_code in [200, 201]: | |
| return {"success": True, "done": True, "message": "Task added successfully."} | |
| else: | |
| return {"success": False, "done": True, "error": f"Failed to create task: {response.status_code}", "details": response.text} | |
| except Exception as e: | |
| return {"success": False, "done": True, "error": "Failed to create task.", "details": str(e)} | |
| async def list_tasks(user_id: str, status: Literal["all", "pending", "completed"] = "all") -> dict: | |
| """ | |
| Retrieve tasks. | |
| """ | |
| if not SERVICE_SECRET: | |
| return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."} | |
| params = {"status": status} if status != "all" else {} | |
| headers = _get_auth_headers() | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.get(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", params=params, headers=headers) | |
| if response.status_code == 200: | |
| result = response.json() | |
| tasks = result if isinstance(result, list) else result.get("tasks", []) | |
| task_list = [{"id": t.get("id"), "title": t.get("title"), "description": t.get("description"), "completed": t.get("completed", False), "priority": t.get("priority"), "created_at": t.get("created_at")} for t in tasks] | |
| return {"success": True, "done": True, "tasks": task_list, "count": len(task_list)} | |
| else: | |
| return {"success": False, "done": True, "error": f"Failed to list tasks: {response.status_code}", "details": response.text} | |
| except Exception as e: | |
| return {"success": False, "done": True, "error": "Failed to list tasks.", "details": str(e)} | |
| async def complete_task(user_id: str, task_id: int) -> dict: | |
| """ | |
| Mark a task as complete. | |
| """ | |
| if not SERVICE_SECRET: | |
| return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."} | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.patch(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}/complete", json=payload, headers=headers) | |
| if response.status_code in [200, 204]: | |
| return {"success": True, "done": True, "message": f"Task {task_id} marked as complete."} | |
| else: | |
| return {"success": False, "done": True, "error": f"Failed to complete task: {response.status_code}", "details": response.text} | |
| except Exception as e: | |
| return {"success": False, "done": True, "error": "Failed to complete task.", "details": str(e)} | |
| async def delete_task(user_id: str, task_id: int) -> dict: | |
| """ | |
| Remove a task. | |
| """ | |
| if not SERVICE_SECRET: | |
| return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."} | |
| headers = _get_auth_headers() | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.delete(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", headers=headers) | |
| if response.status_code in [200, 204]: | |
| return {"success": True, "done": True, "message": "Task deleted successfully."} | |
| else: | |
| return {"success": False, "done": True, "error": f"Failed to delete task: {response.status_code}", "details": response.text} | |
| except Exception as e: | |
| return {"success": False, "done": True, "error": "Failed to delete task.", "details": str(e)} | |
| async def update_task(user_id: str, task_id: int, title: Optional[str] = None, description: Optional[str] = None, priority: Optional[str] = None) -> dict: | |
| """ | |
| Modify task details. | |
| """ | |
| if not SERVICE_SECRET: | |
| return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."} | |
| if title is None and description is None and priority is None: | |
| return {"success": False, "done": True, "error": "At least one of 'title', 'description', or 'priority' must be provided."} | |
| payload = {k: v for k, v in {"title": title, "description": description, "priority": priority}.items() if v is not None} | |
| headers = {"Content-Type": "application/json", **_get_auth_headers()} | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.put(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", json=payload, headers=headers) | |
| if response.status_code in [200, 204]: | |
| return {"success": True, "done": True, "message": f"Task {task_id} updated successfully."} | |
| else: | |
| return {"success": False, "done": True, "error": f"Failed to update task: {response.status_code}", "details": response.text} | |
| except Exception as e: | |
| return {"success": False, "done": True, "error": "Failed to update task.", "details": str(e)} | |
| async def set_priority(user_id: str, task_id: int, priority: str) -> dict: | |
| """ | |
| Set or update a task's priority. | |
| """ | |
| if not SERVICE_SECRET: | |
| return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."} | |
| priority = priority.lower() | |
| if priority not in ["low", "medium", "high"]: | |
| return {"success": False, "done": True, "error": "Priority must be one of: 'low', 'medium', 'high'."} | |
| payload = {"priority": priority} | |
| headers = {"Content-Type": "application/json", **_get_auth_headers()} | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.put(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", json=payload, headers=headers) | |
| if response.status_code in [200, 204]: | |
| return {"success": True, "done": True, "message": f"Priority for task {task_id} updated successfully."} | |
| else: | |
| return {"success": False, "done": True, "error": f"Failed to update priority: {response.status_code}", "details": response.text} | |
| except Exception as e: | |
| return {"success": False, "done": True, "error": "Failed to update priority.", "details": str(e)} | |
| async def list_tasks_by_priority(user_id: str, priority: str, status: Literal["all", "pending", "completed"] = "all") -> dict: | |
| """ | |
| Retrieve tasks filtered by priority. | |
| """ | |
| if not SERVICE_SECRET: | |
| return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."} | |
| priority = priority.lower() | |
| if priority not in ["low", "medium", "high"]: | |
| return {"success": False, "done": True, "error": "Priority must be one of: 'low', 'medium', 'high'."} | |
| params = {"priority": priority} | |
| if status != "all": | |
| params["status"] = status | |
| headers = _get_auth_headers() | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.get(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", params=params, headers=headers) | |
| if response.status_code == 200: | |
| result = response.json() | |
| tasks = result.get("tasks", []) | |
| task_list = [{"id": t.get("id"), "title": t.get("title"), "priority": t.get("priority"), "completed": t.get("completed", False), "description": t.get("description"), "created_at": t.get("created_at")} for t in tasks] | |
| return {"success": True, "done": True, "tasks": task_list, "count": len(task_list), "priority": priority, "status": status} | |
| else: | |
| return {"success": False, "done": True, "error": f"Failed to list tasks by priority: {response.status_code}", "details": response.text} | |
| except Exception as e: | |
| return {"success": False, "done": True, "error": "Failed to list tasks by priority.", "details": str(e)} | |
| if __name__ == "__main__": | |
| try: | |
| logger.info("MCP script running in __main__") | |
| mcp.run() | |
| except Exception as e: | |
| logger.error("An unhandled exception occurred in ai_mcp_server.py", exc_info=True) | |
| raise |