Spaces:
Sleeping
Sleeping
File size: 3,732 Bytes
676582c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
"""
Complete Task MCP Tool
MCP tool for marking tasks as completed via natural language.
Supports task identification by ID or title.
Implements user context injection for security.
"""
import logging
from typing import Union
from datetime import datetime
from sqlmodel import Session, select
from ...models.task import Task
from ...core.database import get_session
from ..tool_registry import ToolExecutionResult
logger = logging.getLogger(__name__)
# Upper bound for retrying a numeric string as a task ID: the largest value a
# signed 64-bit integer column can hold. Longer digit strings are treated as
# titles only, so an absurdly large number is never sent to the database as an ID.
_MAX_TASK_ID = 2**63 - 1


def _task_payload(task: "Task") -> dict:
    """Build the ``data`` dict returned to the caller for *task*."""
    updated_at = task.updated_at
    return {
        "id": task.id,
        "title": task.title,
        "description": task.description,
        "completed": task.completed,
        # Guard against a missing timestamp so serialization cannot raise
        # AttributeError and mask an otherwise successful lookup.
        "updated_at": updated_at.isoformat() if updated_at is not None else None,
    }


def _find_task(db: "Session", task_identifier: Union[int, str], user_id: int):
    """Look up one task owned by *user_id*; return ``(task_or_None, identifier_type)``."""
    # Every query is scoped to user_id so a caller can never reach a task
    # belonging to another user, whichever lookup path is taken.
    if isinstance(task_identifier, int):
        by_id = select(Task).where(
            Task.id == task_identifier,
            Task.user_id == user_id
        )
        return db.exec(by_id).first(), "ID"

    # Search by title (exact match)
    by_title = select(Task).where(
        Task.title == task_identifier,
        Task.user_id == user_id
    )
    task = db.exec(by_title).first()
    if task is not None:
        return task, "title"

    # Tool callers frequently send a numeric ID as a string ("42"). An exact
    # title match always wins (handled above); only when no title matches do
    # we retry the value as an ID.
    candidate = task_identifier.strip()
    if candidate.isascii() and candidate.isdigit():
        numeric_id = int(candidate)
        if numeric_id <= _MAX_TASK_ID:
            by_numeric_id = select(Task).where(
                Task.id == numeric_id,
                Task.user_id == user_id
            )
            task = db.exec(by_numeric_id).first()
            if task is not None:
                return task, "ID"

    return None, "title"


async def complete_task(
    task_identifier: Union[int, str],
    user_id: int  # Injected by backend, never from LLM
) -> ToolExecutionResult:
    """
    Mark a task as completed.

    SECURITY: user_id is injected by the backend via MCPToolRegistry.
    The LLM cannot specify or modify the user_id.

    Args:
        task_identifier: Task ID (integer) or task title (string). A string is
            matched against titles exactly; if no title matches and the string
            consists only of digits, it is retried as a task ID.
        user_id: User ID (injected by backend for security)

    Returns:
        ToolExecutionResult with success status and updated task data.
        ``success=False`` with an ``error`` message is returned when the
        identifier is not an int/str, when no matching task exists for this
        user, or when any exception is raised while completing the task.
        Completing an already-completed task returns ``success=True`` without
        modifying the row.
    """
    # bool is a subclass of int, so True/False would otherwise be looked up as
    # task ID 1/0; any other non-str value (e.g. None) would be compared
    # against the title column. Reject both explicitly.
    if isinstance(task_identifier, bool) or not isinstance(task_identifier, (int, str)):
        logger.warning(
            "Invalid task identifier type: %s, user_id=%s",
            type(task_identifier).__name__,
            user_id,
        )
        return ToolExecutionResult(
            success=False,
            error="Task identifier must be a task ID (integer) or a task title (string)."
        )

    try:
        # Query task from database
        db: Session = next(get_session())
        try:
            task, identifier_type = _find_task(db, task_identifier, user_id)

            # Check if task exists
            if task is None:
                logger.warning(
                    "Task not found: %s=%s, user_id=%s",
                    identifier_type,
                    task_identifier,
                    user_id,
                )
                return ToolExecutionResult(
                    success=False,
                    error=f"Task not found with {identifier_type}: {task_identifier}"
                )

            # Check if already completed: report success without touching the row
            if task.completed:
                logger.info("Task already completed: id=%s, user_id=%s", task.id, user_id)
                return ToolExecutionResult(
                    success=True,
                    data=_task_payload(task),
                    message=f"Task '{task.title}' was already marked as completed."
                )

            # Mark task as completed
            task.completed = True
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12.
            # Kept here so the stored value stays a naive UTC timestamp as
            # before; confirm the column type before switching to an aware
            # datetime.
            task.updated_at = datetime.utcnow()
            db.add(task)
            db.commit()
            # Reload so the response reflects what was actually persisted.
            db.refresh(task)

            logger.info(
                "Task completed successfully: id=%s, user_id=%s, title=%s",
                task.id,
                user_id,
                task.title,
            )
            return ToolExecutionResult(
                success=True,
                data=_task_payload(task),
                message=f"Task '{task.title}' marked as completed!"
            )
        finally:
            db.close()
    except Exception as e:
        # Boundary handler: the tool must always hand back a result object
        # instead of raising. logger.exception records the traceback as well.
        logger.exception("Error completing task: %s", e)
        return ToolExecutionResult(
            success=False,
            error=f"Failed to complete task: {str(e)}"
        )
|