Spaces:
Sleeping
Sleeping
File size: 11,690 Bytes
a67367b baa9f3c a67367b 908f079 a67367b f80b076 a67367b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
"""
HTTP-only MCP Server for the separated AI service.
Communicates with the business logic service via HTTP API calls.
"""
import re
from typing import Literal, Optional, Any, Dict
import httpx
from mcp.server.fastmcp import FastMCP
from config.config import settings
import logging
import os
import sys
from dotenv import load_dotenv
# Set up logging
logger = logging.getLogger(__name__)
# Populate os.environ from a local .env file (if any) before reading configuration.
load_dotenv()
logger.info("--- MCP SERVER SCRIPT STARTING ---")
# Load the service secret from the environment. It must never be written into
# this file: anything committed to source control is readable by everyone with
# access to the repository. Any value that was previously committed here should
# be treated as compromised and rotated.
SERVICE_SECRET = os.environ.get("SERVICE_SECRET", "")
logger.info(f"SERVICE_SECRET loaded: '{bool(SERVICE_SECRET)}'")  # Log boolean status, not the secret itself.
if not SERVICE_SECRET:
    logger.error("SERVICE_SECRET is NOT SET. Authentication will fail.")
# Create MCP server instance
mcp = FastMCP("separated-task-management-server")
logger.info("FastMCP instance created.")
# Base URL of the business service, taken from the project settings object
BUSINESS_SERVICE_URL = settings.BUSINESS_SERVICE_URL
def _get_auth_headers() -> Dict[str, str]:
    """
    Build the Authorization header sent with requests to the business service.

    Returns a single-entry dict carrying the service secret as a Bearer token,
    or an empty dict (after logging an error) when no secret is configured.
    """
    if SERVICE_SECRET:
        return {"Authorization": f"Bearer {SERVICE_SECRET}"}
    logger.error("Attempted to get auth headers but SERVICE_SECRET is not set.")
    return {}
# Keyword patterns are compiled once at import time. Every alternative is
# anchored on word boundaries so that a keyword embedded in a longer word
# ("low" inside "follow" or "below") does not trigger a match.
_HIGH_PRIORITY_RE = re.compile(
    r'\bhigh\s*priority\b|\burgent\b|\bcritical\b|\bimportant\b|\basap\b|\bhigh\b'
)
_LOW_PRIORITY_RE = re.compile(
    r'\blow\s*priority\b|\bminor\b|\boptional\b|\bwhen\s*you\s*have\s*time\b|\blow\b'
)


def detect_priority_from_text(text: str) -> str:
    """
    Infer a task priority from free-form text.

    The text is lower-cased and scanned for priority keywords. High-priority
    keywords are checked first, so text containing both kinds is reported as
    "high".

    Args:
        text: Text to inspect (for example a task title plus description).
            An empty or missing value is treated as containing no keywords.

    Returns:
        "high", "low", or "medium". "medium" is the default when no keyword
        is found.
    """
    lowered = (text or "").lower()
    if _HIGH_PRIORITY_RE.search(lowered):
        return "high"
    if _LOW_PRIORITY_RE.search(lowered):
        return "low"
    # Explicit "medium"/"normal" wording and the no-keyword case share this result.
    return "medium"
@mcp.tool()
async def add_task(
    user_id: str,
    title: str,
    description: Optional[str] = None,
    priority: Optional[str] = None,
    due_date: Optional[str] = None,
) -> dict:
    """
    Create a new task. This tool is idempotent.

    Before creating, the user's existing tasks are fetched and compared by
    title (case-insensitively); if a match is found nothing is created.

    Args:
        user_id: Identifier of the user, interpolated into the request path.
        title: Title of the new task.
        description: Optional free-form description.
        priority: Optional priority. When omitted it is inferred from the
            title and description; any value other than "low", "medium" or
            "high" (case-insensitive) falls back to "medium".
        due_date: Optional due date, forwarded unchanged to the business service.

    Returns:
        A dict with "success" and "done" flags plus either a "message" or an
        "error" (with "details" where available).
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    try:
        existing_tasks_response = await list_tasks(user_id=user_id)
        # If the lookup itself reports failure the duplicate check is skipped
        # and creation is still attempted.
        if existing_tasks_response.get("success"):
            wanted_title = title.lower()
            for task in existing_tasks_response.get("tasks", []):
                # list_tasks reports a missing title as None (the key is always
                # present), so coalesce to "" before lower-casing.
                if (task.get("title") or "").lower() == wanted_title:
                    return {"success": False, "done": True, "message": "Task already exists."}
    except Exception as e:
        logger.error(f"Failed to check for existing tasks during add_task: {e}")
        return {"success": False, "done": True, "error": "Failed to verify if task exists.", "details": str(e)}
    if priority is None:
        # No explicit priority: infer one from the task text.
        combined_text = f"{title} {description or ''}"
        priority = detect_priority_from_text(combined_text)
    else:
        priority = priority.lower()
        if priority not in ["low", "medium", "high"]:
            priority = "medium"
    payload = {"title": title, "description": description, "priority": priority, "completed": False, "due_date": due_date}
    headers = {"Content-Type": "application/json", **_get_auth_headers()}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", json=payload, headers=headers)
            if response.status_code in [200, 201]:
                return {"success": True, "done": True, "message": "Task added successfully."}
            else:
                return {"success": False, "done": True, "error": f"Failed to create task: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to create task.", "details": str(e)}
@mcp.tool()
async def list_tasks(user_id: str, status: Literal["all", "pending", "completed"] = "all") -> dict:
    """
    Retrieve tasks.

    Fetches the user's tasks from the business service. The "status" query
    parameter is only sent when a filter other than "all" is requested.

    Returns:
        On success, a dict with "tasks" (a list of task summaries) and
        "count"; otherwise a dict with "error" and "details". Both shapes
        carry "success" and "done" flags.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    query = {} if status == "all" else {"status": status}
    auth_headers = _get_auth_headers()
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.get(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", params=query, headers=auth_headers)
            if resp.status_code != 200:
                return {"success": False, "done": True, "error": f"Failed to list tasks: {resp.status_code}", "details": resp.text}
            body = resp.json()
            # The response is either a bare list of tasks or an object holding them under "tasks".
            if isinstance(body, list):
                raw_tasks = body
            else:
                raw_tasks = body.get("tasks", [])
            summaries = []
            for item in raw_tasks:
                summaries.append({
                    "id": item.get("id"),
                    "title": item.get("title"),
                    "description": item.get("description"),
                    "completed": item.get("completed", False),
                    "priority": item.get("priority"),
                    "created_at": item.get("created_at"),
                })
            return {"success": True, "done": True, "tasks": summaries, "count": len(summaries)}
        except Exception as exc:
            return {"success": False, "done": True, "error": "Failed to list tasks.", "details": str(exc)}
@mcp.tool()
async def complete_task(user_id: str, task_id: int) -> dict:
    """
    Mark a task as complete.

    Sends a PATCH request to the task's "complete" endpoint on the business
    service.

    Args:
        user_id: Identifier of the user, interpolated into the request path.
        task_id: Identifier of the task to mark as complete.

    Returns:
        A dict with "success" and "done" flags plus either a "message" or an
        "error" with "details".
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    # These two names were previously used without being defined, so every call
    # failed with a NameError before any request was sent.
    # NOTE(review): the body mirrors the "completed" field used when creating a
    # task; confirm the exact schema expected by the /complete endpoint.
    payload = {"completed": True}
    headers = {"Content-Type": "application/json", **_get_auth_headers()}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.patch(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}/complete", json=payload, headers=headers)
            if response.status_code in [200, 204]:
                return {"success": True, "done": True, "message": f"Task {task_id} marked as complete."}
            else:
                return {"success": False, "done": True, "error": f"Failed to complete task: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to complete task.", "details": str(e)}
@mcp.tool()
async def delete_task(user_id: str, task_id: int) -> dict:
    """
    Remove a task.

    Issues a DELETE request for the given task to the business service.

    Returns:
        A dict with "success" and "done" flags plus either a "message" or an
        "error" with "details".
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    auth_headers = _get_auth_headers()
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.delete(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", headers=auth_headers)
            # Anything other than 200/204 is reported back with the raw response text.
            if resp.status_code not in (200, 204):
                return {"success": False, "done": True, "error": f"Failed to delete task: {resp.status_code}", "details": resp.text}
            return {"success": True, "done": True, "message": "Task deleted successfully."}
        except Exception as exc:
            return {"success": False, "done": True, "error": "Failed to delete task.", "details": str(exc)}
@mcp.tool()
async def update_task(user_id: str, task_id: int, title: Optional[str] = None, description: Optional[str] = None, priority: Optional[str] = None) -> dict:
    """
    Modify task details.

    Only the fields that are provided (not None) are sent to the business
    service.

    Args:
        user_id: Identifier of the user, interpolated into the request path.
        task_id: Identifier of the task to update.
        title: New title, if it should change.
        description: New description, if it should change.
        priority: New priority, if it should change. Compared
            case-insensitively; must be "low", "medium" or "high".

    Returns:
        A dict with "success" and "done" flags plus either a "message" or an
        "error" (with "details" where available).
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    if title is None and description is None and priority is None:
        return {"success": False, "done": True, "error": "At least one of 'title', 'description', or 'priority' must be provided."}
    if priority is not None:
        # Apply the same normalisation and validation as set_priority, which
        # writes to the same endpoint.
        priority = priority.lower()
        if priority not in ["low", "medium", "high"]:
            return {"success": False, "done": True, "error": "Priority must be one of: 'low', 'medium', 'high'."}
    payload = {k: v for k, v in {"title": title, "description": description, "priority": priority}.items() if v is not None}
    headers = {"Content-Type": "application/json", **_get_auth_headers()}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.put(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", json=payload, headers=headers)
            if response.status_code in [200, 204]:
                return {"success": True, "done": True, "message": f"Task {task_id} updated successfully."}
            else:
                return {"success": False, "done": True, "error": f"Failed to update task: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to update task.", "details": str(e)}
@mcp.tool()
async def set_priority(user_id: str, task_id: int, priority: str) -> dict:
    """
    Set or update a task's priority.

    The priority is lower-cased and must be "low", "medium" or "high";
    anything else is rejected without contacting the business service.

    Returns:
        A dict with "success" and "done" flags plus either a "message" or an
        "error" (with "details" where available).
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    level = priority.lower()
    if level not in ("low", "medium", "high"):
        return {"success": False, "done": True, "error": "Priority must be one of: 'low', 'medium', 'high'."}
    body = {"priority": level}
    request_headers = {"Content-Type": "application/json", **_get_auth_headers()}
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.put(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", json=body, headers=request_headers)
            # Anything other than 200/204 is reported back with the raw response text.
            if resp.status_code not in (200, 204):
                return {"success": False, "done": True, "error": f"Failed to update priority: {resp.status_code}", "details": resp.text}
            return {"success": True, "done": True, "message": f"Priority for task {task_id} updated successfully."}
        except Exception as exc:
            return {"success": False, "done": True, "error": "Failed to update priority.", "details": str(exc)}
@mcp.tool()
async def list_tasks_by_priority(user_id: str, priority: str, status: Literal["all", "pending", "completed"] = "all") -> dict:
    """
    Retrieve tasks filtered by priority.

    Args:
        user_id: Identifier of the user, interpolated into the request path.
        priority: Priority to filter on. Compared case-insensitively; must be
            "low", "medium" or "high".
        status: Optional completion filter. Only sent as a query parameter
            when it is not "all".

    Returns:
        On success, a dict with "tasks", "count", and the "priority" and
        "status" filters that were applied; otherwise a dict with "error"
        (and "details" where available). Both shapes carry "success" and
        "done" flags.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    priority = priority.lower()
    if priority not in ["low", "medium", "high"]:
        return {"success": False, "done": True, "error": "Priority must be one of: 'low', 'medium', 'high'."}
    params = {"priority": priority}
    if status != "all":
        params["status"] = status
    headers = _get_auth_headers()
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", params=params, headers=headers)
            if response.status_code == 200:
                result = response.json()
                # Accept both response shapes that list_tasks accepts from this
                # same endpoint: a bare list, or an object with a "tasks" key.
                tasks = result if isinstance(result, list) else result.get("tasks", [])
                task_list = [{"id": t.get("id"), "title": t.get("title"), "priority": t.get("priority"), "completed": t.get("completed", False), "description": t.get("description"), "created_at": t.get("created_at")} for t in tasks]
                return {"success": True, "done": True, "tasks": task_list, "count": len(task_list), "priority": priority, "status": status}
            else:
                return {"success": False, "done": True, "error": f"Failed to list tasks by priority: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to list tasks by priority.", "details": str(e)}
if __name__ == "__main__":
    # Start the MCP server only when this file is executed as a script.
    try:
        logger.info("MCP script running in __main__")
        mcp.run()
    except Exception:
        # Record the full traceback, then re-raise so the failure still
        # propagates to the caller.
        logger.error("An unhandled exception occurred in ai_mcp_server.py", exc_info=True)
        raise