Spaces:
Sleeping
Sleeping
| """ | |
| Complete Task MCP Tool | |
| MCP tool for marking tasks as completed via natural language. | |
| Supports task identification by ID or title. | |
| Implements user context injection for security. | |
| """ | |
| import logging | |
| from typing import Union | |
| from datetime import datetime | |
| from sqlmodel import Session, select | |
| from ...models.task import Task | |
| from ...core.database import get_session | |
| from ..tool_registry import ToolExecutionResult | |
| logger = logging.getLogger(__name__) | |
| async def complete_task( | |
| task_identifier: Union[int, str], | |
| user_id: int # Injected by backend, never from LLM | |
| ) -> ToolExecutionResult: | |
| """ | |
| Mark a task as completed. | |
| SECURITY: user_id is injected by the backend via MCPToolRegistry. | |
| The LLM cannot specify or modify the user_id. | |
| Args: | |
| task_identifier: Task ID (integer) or task title (string) | |
| user_id: User ID (injected by backend for security) | |
| Returns: | |
| ToolExecutionResult with success status and updated task data | |
| """ | |
| try: | |
| # Query task from database | |
| db: Session = next(get_session()) | |
| try: | |
| # Build query based on identifier type | |
| if isinstance(task_identifier, int): | |
| # Search by ID | |
| statement = select(Task).where( | |
| Task.id == task_identifier, | |
| Task.user_id == user_id | |
| ) | |
| identifier_type = "ID" | |
| else: | |
| # Search by title (exact match) | |
| statement = select(Task).where( | |
| Task.title == task_identifier, | |
| Task.user_id == user_id | |
| ) | |
| identifier_type = "title" | |
| task = db.exec(statement).first() | |
| # Check if task exists | |
| if not task: | |
| logger.warning(f"Task not found: {identifier_type}={task_identifier}, user_id={user_id}") | |
| return ToolExecutionResult( | |
| success=False, | |
| error=f"Task not found with {identifier_type}: {task_identifier}" | |
| ) | |
| # Check if already completed | |
| if task.completed: | |
| logger.info(f"Task already completed: id={task.id}, user_id={user_id}") | |
| return ToolExecutionResult( | |
| success=True, | |
| data={ | |
| "id": task.id, | |
| "title": task.title, | |
| "description": task.description, | |
| "completed": task.completed, | |
| "updated_at": task.updated_at.isoformat() | |
| }, | |
| message=f"Task '{task.title}' was already marked as completed." | |
| ) | |
| # Mark task as completed | |
| task.completed = True | |
| task.updated_at = datetime.utcnow() | |
| db.add(task) | |
| db.commit() | |
| db.refresh(task) | |
| logger.info(f"Task completed successfully: id={task.id}, user_id={user_id}, title={task.title}") | |
| return ToolExecutionResult( | |
| success=True, | |
| data={ | |
| "id": task.id, | |
| "title": task.title, | |
| "description": task.description, | |
| "completed": task.completed, | |
| "updated_at": task.updated_at.isoformat() | |
| }, | |
| message=f"Task '{task.title}' marked as completed!" | |
| ) | |
| finally: | |
| db.close() | |
| except Exception as e: | |
| logger.error(f"Error completing task: {str(e)}") | |
| return ToolExecutionResult( | |
| success=False, | |
| error=f"Failed to complete task: {str(e)}" | |
| ) | |