File size: 11,690 Bytes
a67367b
 
 
 
 
 
 
 
 
 
 
 
 
baa9f3c
a67367b
 
 
908f079
 
a67367b
 
f80b076
a67367b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""

HTTP-only MCP Server for the separated AI service.
Communicates with the business logic service via HTTP API calls.
"""
import re
from typing import Literal, Optional, Any, Dict
import httpx
from mcp.server.fastmcp import FastMCP
from config.config import settings
import logging
import os
import sys
from dotenv import load_dotenv
# Set up logging
logger = logging.getLogger(__name__)

load_dotenv()

logger.info("--- MCP SERVER SCRIPT STARTING ---")
# Load secure service secret from environment variable
SERVICE_SECRET = "abfe95adc6a3d85f1d8533a0fbf151b18240d817b471dda39a925555d886549c32c667dbeb184b9e9c73da3227c0dae5f83a"
logger.info(f"SERVICE_SECRET loaded: '{bool(SERVICE_SECRET)}'") # Log boolean status, not the secret itself.

if not SERVICE_SECRET:
    logger.error("SERVICE_SECRET is NOT SET. Authentication will fail.")

# Create MCP server instance
mcp = FastMCP("separated-task-management-server")
logger.info("FastMCP instance created.")

# Get business service base URL from environment
BUSINESS_SERVICE_URL = settings.BUSINESS_SERVICE_URL

def _get_auth_headers() -> Dict[str, str]:
    if not SERVICE_SECRET:
        logger.error("Attempted to get auth headers but SERVICE_SECRET is not set.")
        return {}
    return {"Authorization": f"Bearer {SERVICE_SECRET}"}

def detect_priority_from_text(text: str) -> str:
    text_lower = text.lower()
    high_priority_patterns = [r'\bhigh\s*priority\b', r'\burgent\b', r'\bcritical\b', r'\bimportant\b', r'\basap\b', r'\bhigh\b']
    low_priority_patterns = [r'\blow\s*priority\b', r'\bminor\b', r'\boptional\b', r'\bwhen\s*you\s*have\s*time\b', r'low']
    if any(re.search(p, text_lower) for p in high_priority_patterns):
        return "high"
    if any(re.search(p, text_lower) for p in low_priority_patterns):
        return "low"
    if re.search(r'\bmedium\b|\bnormal\b', text_lower):
        return "medium"
    return "medium"

@mcp.tool()
async def add_task(
    user_id: str,
    title: str,
    description: Optional[str] = None,
    priority: Optional[str] = None,
    due_date: Optional[str] = None,
) -> dict:
    """
    Create a new task. This tool is idempotent.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}

    try:
        existing_tasks_response = await list_tasks(user_id=user_id)
        if existing_tasks_response.get("success"):
            for task in existing_tasks_response.get("tasks", []):
                if task.get("title", "").lower() == title.lower():
                    return {"success": False, "done": True, "message": "Task already exists."}
    except Exception as e:
        logger.error(f"Failed to check for existing tasks during add_task: {e}")
        return {"success": False, "done": True, "error": "Failed to verify if task exists.", "details": str(e)}

    if priority is None:
        combined_text = f"{title} {description or ''}"
        priority = detect_priority_from_text(combined_text)
    else:
        priority = priority.lower()
        if priority not in ["low", "medium", "high"]:
            priority = "medium"

    payload = {"title": title, "description": description, "priority": priority, "completed": False, "due_date": due_date}
    headers = {"Content-Type": "application/json", **_get_auth_headers()}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", json=payload, headers=headers)
            if response.status_code in [200, 201]:
                return {"success": True, "done": True, "message": "Task added successfully."}
            else:
                return {"success": False, "done": True, "error": f"Failed to create task: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to create task.", "details": str(e)}

@mcp.tool()
async def list_tasks(user_id: str, status: Literal["all", "pending", "completed"] = "all") -> dict:
    """
    Retrieve tasks.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    params = {"status": status} if status != "all" else {}
    headers = _get_auth_headers()
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", params=params, headers=headers)
            if response.status_code == 200:
                result = response.json()
                tasks = result if isinstance(result, list) else result.get("tasks", [])
                task_list = [{"id": t.get("id"), "title": t.get("title"), "description": t.get("description"), "completed": t.get("completed", False), "priority": t.get("priority"), "created_at": t.get("created_at")} for t in tasks]
                return {"success": True, "done": True, "tasks": task_list, "count": len(task_list)}
            else:
                return {"success": False, "done": True, "error": f"Failed to list tasks: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to list tasks.", "details": str(e)}

@mcp.tool()
async def complete_task(user_id: str, task_id: int) -> dict:
    """
    Mark a task as complete.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.patch(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}/complete", json=payload, headers=headers)
            if response.status_code in [200, 204]:
                return {"success": True, "done": True, "message": f"Task {task_id} marked as complete."}
            else:
                return {"success": False, "done": True, "error": f"Failed to complete task: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to complete task.", "details": str(e)}

@mcp.tool()
async def delete_task(user_id: str, task_id: int) -> dict:
    """
    Remove a task.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    headers = _get_auth_headers()
    async with httpx.AsyncClient() as client:
        try:
            response = await client.delete(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", headers=headers)
            if response.status_code in [200, 204]:
                return {"success": True, "done": True, "message": "Task deleted successfully."}
            else:
                return {"success": False, "done": True, "error": f"Failed to delete task: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to delete task.", "details": str(e)}

@mcp.tool()
async def update_task(user_id: str, task_id: int, title: Optional[str] = None, description: Optional[str] = None, priority: Optional[str] = None) -> dict:
    """
    Modify task details.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    if title is None and description is None and priority is None:
        return {"success": False, "done": True, "error": "At least one of 'title', 'description', or 'priority' must be provided."}
    payload = {k: v for k, v in {"title": title, "description": description, "priority": priority}.items() if v is not None}
    headers = {"Content-Type": "application/json", **_get_auth_headers()}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.put(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", json=payload, headers=headers)
            if response.status_code in [200, 204]:
                return {"success": True, "done": True, "message": f"Task {task_id} updated successfully."}
            else:
                return {"success": False, "done": True, "error": f"Failed to update task: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to update task.", "details": str(e)}

@mcp.tool()
async def set_priority(user_id: str, task_id: int, priority: str) -> dict:
    """
    Set or update a task's priority.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    priority = priority.lower()
    if priority not in ["low", "medium", "high"]:
        return {"success": False, "done": True, "error": "Priority must be one of: 'low', 'medium', 'high'."}
    payload = {"priority": priority}
    headers = {"Content-Type": "application/json", **_get_auth_headers()}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.put(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks/{task_id}", json=payload, headers=headers)
            if response.status_code in [200, 204]:
                return {"success": True, "done": True, "message": f"Priority for task {task_id} updated successfully."}
            else:
                return {"success": False, "done": True, "error": f"Failed to update priority: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to update priority.", "details": str(e)}

@mcp.tool()
async def list_tasks_by_priority(user_id: str, priority: str, status: Literal["all", "pending", "completed"] = "all") -> dict:
    """
    Retrieve tasks filtered by priority.
    """
    if not SERVICE_SECRET:
        return {"success": False, "done": True, "error": "Internal authentication error: Service secret missing."}
    priority = priority.lower()
    if priority not in ["low", "medium", "high"]:
        return {"success": False, "done": True, "error": "Priority must be one of: 'low', 'medium', 'high'."}
    params = {"priority": priority}
    if status != "all":
        params["status"] = status
    headers = _get_auth_headers()
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{BUSINESS_SERVICE_URL}/api/{user_id}/tasks", params=params, headers=headers)
            if response.status_code == 200:
                result = response.json()
                tasks = result.get("tasks", [])
                task_list = [{"id": t.get("id"), "title": t.get("title"), "priority": t.get("priority"), "completed": t.get("completed", False), "description": t.get("description"), "created_at": t.get("created_at")} for t in tasks]
                return {"success": True, "done": True, "tasks": task_list, "count": len(task_list), "priority": priority, "status": status}
            else:
                return {"success": False, "done": True, "error": f"Failed to list tasks by priority: {response.status_code}", "details": response.text}
        except Exception as e:
            return {"success": False, "done": True, "error": "Failed to list tasks by priority.", "details": str(e)}

if __name__ == "__main__":
    try:
        logger.info("MCP script running in __main__")
        mcp.run()
    except Exception as e:
        logger.error("An unhandled exception occurred in ai_mcp_server.py", exc_info=True)
        raise