MCP_todo / agents /todo_agent.py
MAWB's picture
Update agents/todo_agent.py
b471009 verified
"""
TodoAgent - OpenAI Agent orchestration with MCP tools.
Per @specs/001-chatbot-mcp/plan.md Section VIII - AI Chatbot Architecture
OpenAI Agents SDK for orchestration with MCP tool calling.
"""
from openai import OpenAI
from typing import List, Dict, Any, Optional, AsyncGenerator
from uuid import UUID
import logging
import json
from config import settings
from mcp.server import get_mcp_server
logger = logging.getLogger(__name__)
class TodoAgent:
"""
OpenAI Agent for Todo task management with MCP tool integration.
This agent:
- Uses OpenAI Chat Completions API with tool calling
- Integrates with MCP tools for task operations
- Maintains conversation context for multi-turn dialogs
- Enforces user isolation by passing user_id to all tool calls
"""
# System prompt for friendly, concise, task-oriented behavior
SYSTEM_PROMPT = """You are a helpful task management assistant for TaskFlow. Your role is to help users manage their todo tasks through natural language.
Key behaviors:
- Be friendly and concise in your responses
- Focus on helping users create, view, complete, and delete tasks
- When users ask to add tasks, extract the task title and any description
- When users ask to see tasks, list their tasks clearly with numbers for easy reference
- When users ask to complete/delete/update tasks, ALWAYS call list_tasks FIRST to identify the task
- Users may refer to tasks by number (e.g., "first task", "task 2") or by title/content
- Use the available tools to perform all task operations
- Never make up task information - always use the tools
- The user_id parameter is automatically injected - do NOT ask the user for it
Task identification workflow:
1. When user wants to update/delete/complete a task, FIRST call list_tasks
2. Show the user the tasks with numbers: "1. Task title", "2. Task title", etc.
3. If user specified a number or title, match it to get the task_id
4. Then call the appropriate tool (update_task, delete_task, complete_task) with the task_id
Available tools:
- add_task: Create a new task with title and optional description
- list_tasks: Show all tasks with their IDs (call this FIRST before update/delete/complete)
- complete_task: Mark a task as completed (requires task_id from list_tasks)
- delete_task: Remove a task permanently (requires task_id from list_tasks)
- update_task: Change a task's title or description (requires task_id from list_tasks)
Note: user_id is automatically provided for all tool calls. Never ask the user for their user ID."""
def __init__(self, user_id: UUID):
"""
Initialize the TodoAgent for a specific user.
Args:
user_id: The user's UUID for data isolation
"""
self.user_id = user_id
# Log API key info for debugging (don't log full key)
key_preview = settings.openai_api_key[:10] if settings.openai_api_key else "None"
key_length = len(settings.openai_api_key) if settings.openai_api_key else 0
logger.info(f"TodoAgent initializing with OpenAI API key: {key_preview}... (length: {key_length})")
self.client = OpenAI(api_key=settings.openai_api_key)
self.model = settings.openai_model
# Get MCP server and register tools as OpenAI functions
self.mcp_server = get_mcp_server()
self.tools = self._get_openai_tools()
logger.info(f"TodoAgent initialized for user {user_id}")
def _get_openai_tools(self) -> List[Dict[str, Any]]:
"""
Convert MCP tools to OpenAI function format.
Returns:
List of tool definitions in OpenAI format
Per @specs/001-chatbot-mcp/contracts/mcp-tools.json
"""
tools = []
# Get all registered MCP tools from SimpleMCPRegistry
mcp_tools = self.mcp_server.list_tools()
for tool in mcp_tools:
# Convert MCP tool schema to OpenAI function format
function_def = {
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters.copy() # Copy to avoid mutation
}
}
# Add user_id parameter to all tools for data isolation
# Note: user_id is auto-filled and should NOT be in required parameters
if "properties" not in function_def["function"]["parameters"]:
function_def["function"]["parameters"]["properties"] = {}
function_def["function"]["parameters"]["properties"]["user_id"] = {
"type": "string",
"description": "User ID for data isolation (auto-filled, do not ask user)",
}
# Initialize required array if not present
if "required" not in function_def["function"]["parameters"]:
function_def["function"]["parameters"]["required"] = []
# IMPORTANT: Do NOT add user_id to required - it's auto-injected
# This prevents the model from asking the user for their user_id
tools.append(function_def)
logger.info(f"Registered {len(tools)} tools with OpenAI agent")
return tools
async def process_message(
self,
user_message: str,
conversation_history: Optional[List[Dict[str, str]]] = None
) -> AsyncGenerator[str, None]:
"""
Process a user message through the agent with tool execution.
Args:
user_message: The user's message text
conversation_history: Previous messages in the conversation
Yields:
Response text chunks as they're generated
Per @specs/001-chatbot-mcp/plan.md - MCP First with OpenAI Agents
"""
# Build messages array
messages = [
{"role": "system", "content": self.SYSTEM_PROMPT}
]
# Add conversation history if provided
if conversation_history:
messages.extend(conversation_history)
# Add current user message
messages.append({"role": "user", "content": user_message})
logger.info(f"Processing message for user {self.user_id}: {user_message[:50]}...")
# Check if this is an update/delete/complete request
# If so, pre-load tasks to provide context
update_delete_keywords = ["update", "delete", "remove", "change", "modify", "complete", "finish", "mark"]
needs_task_list = any(keyword in user_message.lower() for keyword in update_delete_keywords)
if needs_task_list:
# Get tasks first and add to context
list_tool = self.mcp_server.get_tool("list_tasks")
if list_tool:
tasks_result = await list_tool.handler(user_id=str(self.user_id), include_completed=True)
if tasks_result.get("success") and tasks_result.get("tasks"):
# Add task context to system prompt
task_list = "\n".join([
f"Task {i+1}: ID={t['id']}, Title='{t['title']}', Completed={t['completed']}"
for i, t in enumerate(tasks_result["tasks"])
])
enhanced_prompt = self.SYSTEM_PROMPT + f"\n\nCURRENT USER TASKS:\n{task_list}\n\nWhen user refers to a task by number or title, use the corresponding ID from this list."
messages[0] = {"role": "system", "content": enhanced_prompt}
logger.info(f"Pre-loaded {len(tasks_result['tasks'])} tasks for context")
try:
# Make chat completion request with tools
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.tools,
tool_choice="auto", # Let model decide when to use tools
temperature=0.7, # Slightly creative but focused
max_tokens=1000, # Reasonable response length
)
# Process response
choice = response.choices[0]
message = choice.message
# Check if model wants to call tools
if message.tool_calls:
# Execute tool calls and collect results
tool_messages = []
for tool_call in message.tool_calls:
result = await self._execute_tool_call(tool_call)
# Add tool result as a tool message
tool_messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result
})
# Add assistant message with tool calls
messages.append({
"role": "assistant",
"content": message.content or "",
"tool_calls": message.tool_calls
})
# Add tool result messages
messages.extend(tool_messages)
# Get follow-up response with tool results
follow_up = self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.7,
max_tokens=1000,
)
if follow_up.choices[0].message.content:
yield follow_up.choices[0].message.content
# Direct text response (no tools needed)
elif message.content:
yield message.content
else:
yield "I understand. How can I help you with your tasks?"
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)
logger.error(f"Error processing message: {error_type}: {error_msg}", exc_info=True)
# Provide more helpful error messages
if "Connection" in error_msg or "connect" in error_msg.lower():
yield "I'm having trouble connecting to my AI service. Please check if the OpenAI API key is configured correctly in Railway environment variables."
elif "401" in error_msg or "Unauthorized" in error_msg or "authentication" in error_msg.lower():
yield "My AI service credentials are invalid. Please check the OpenAI API key in Railway environment variables."
elif "rate" in error_msg.lower() or "limit" in error_msg.lower():
yield "I've reached my rate limit. Please try again in a moment."
else:
yield f"I encountered an error ({error_type}): {error_msg}"
async def _execute_tool_call(self, tool_call) -> str:
"""
Execute a single tool call from OpenAI.
Args:
tool_call: The OpenAI tool call object
Returns:
Result message to display to user
Per @specs/001-chatbot-mcp/plan.md - MCP First architecture
"""
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
# Inject user_id for data isolation
function_args["user_id"] = str(self.user_id)
logger.info(f"Executing tool: {function_name} with args: {function_args}")
try:
# Get the tool from SimpleMCPRegistry
tool = self.mcp_server.get_tool(function_name)
if not tool:
return f"Error: Tool '{function_name}' not found"
# For update_task and delete_task, validate task_id exists first
if function_name in ["update_task", "delete_task", "complete_task"]:
task_id = function_args.get("task_id")
if task_id:
# Verify the task exists and belongs to user before proceeding
list_tool = self.mcp_server.get_tool("list_tasks")
tasks_result = await list_tool.handler(user_id=str(self.user_id), include_completed=True)
valid_task_ids = [t["id"] for t in tasks_result.get("tasks", [])]
if task_id not in valid_task_ids:
# Task doesn't exist or doesn't belong to user
# Provide helpful error with current tasks
if tasks_result.get("tasks"):
task_list = "\n".join([
f"Task {i+1}: {t['title']}"
for i, t in enumerate(tasks_result["tasks"])
])
return f"Error: Task not found. Here are your current tasks:\n{task_list}\n\nPlease specify which task you'd like to {function_name.replace('_', ' ')}."
else:
return "Error: You don't have any tasks yet. Create some tasks first!"
# Execute the tool via MCP
result = await tool.handler(**function_args)
# Parse result
if isinstance(result, dict):
if result.get("success"):
# Format success message based on tool
if function_name == "add_task":
return f"βœ“ Task '{result.get('title')}' created successfully!"
elif function_name == "complete_task":
return f"βœ“ Task '{result.get('title')}' marked as complete!"
elif function_name == "delete_task":
return f"βœ“ Task '{result.get('title')}' deleted."
elif function_name == "update_task":
return f"βœ“ Task updated successfully!"
elif function_name == "list_tasks":
tasks = result.get("tasks", [])
count = result.get("count", 0)
if count == 0:
return "You don't have any tasks yet."
# Number tasks for easy reference
task_list = "\n".join([
f"{i+1}. {t['title']}" + (" βœ“" if t['completed'] else "")
for i, t in enumerate(tasks)
])
return f"You have {count} task(s):\n{task_list}\n\nYou can refer to tasks by number (e.g., \"complete task 1\")"
else:
return "Operation completed successfully!"
else:
return f"Error: {result.get('error', 'Unknown error')}"
return str(result)
except Exception as e:
logger.error(f"Error executing tool {function_name}: {e}")
return f"Error executing {function_name}: {str(e)}"
def create_todo_agent(user_id: UUID) -> TodoAgent:
"""
Factory function to create a TodoAgent instance.
Args:
user_id: The user's UUID
Returns:
Initialized TodoAgent instance
"""
return TodoAgent(user_id)