Todo_App / ai /agents /todo_agent_fixed.py
Abdullahcoder54's picture
push
6a3de9e
from typing import Dict, Any, List, Optional
from agents import (
Agent,
Runner,
RunConfig,
OpenAIChatCompletionsModel,
ModelSettings,
set_tracing_disabled
)
from agents.mcp import MCPServerStdio
from config.settings import settings
from models.conversation import Conversation
from models.message import MessageRoleEnum
import logging
from openai import AsyncOpenAI
import json
import sys
import subprocess
import os
# Disable tracing as shown in the example
set_tracing_disabled(disabled=True)
logger = logging.getLogger(__name__)
class TodoAgentFixed:
"""
AI agent that interprets natural language commands for task management.
Uses OpenAI Agents SDK with Google Gemini API and stdio MCP server for tool integration.
"""
def __init__(self):
# Configure the OpenAI client with Google Gemini API
self.client = AsyncOpenAI(
api_key=settings.gemini_api_key,
base_url="https://generativelanguage.googleapis.com/v1beta/openai/" # Gemini-compatible endpoint
)
# Create the model using the OpenAIChatCompletionsModel as shown in the example
model = OpenAIChatCompletionsModel(
model="gemini-2.0-flash",
openai_client=self.client
)
# Create run configuration as shown in the example
self.config = RunConfig(
model=model,
model_provider=self.client,
)
# Set up the MCP server connection for tools using stdio subprocess approach
# This spawns a local subprocess that communicates via stdio with the agent
self.mcp_server = MCPServerStdio(
name="Todo Management MCP Server",
params={
"command": sys.executable, # Use the same Python executable
"args": ["-m", "ai.mcp.server"], # Use the existing server module
},
# Increase timeout for database operations
client_session_timeout_seconds=30.0
)
# Create the agent using the OpenAI Agents SDK and connect it to the MCP server
self.agent = Agent(
name="TodoAssistant",
instructions="""
You are an AI assistant for a todo management system. Your role is to help users manage their tasks using natural language.
You can perform the following operations:
1. Add tasks
2. List tasks
3. Complete tasks
4. Delete tasks
5. Update tasks
Always respond in a friendly and helpful manner. When a user asks to perform an action,
use the appropriate tool to carry out the request. If you don't understand a request,
ask for clarification.
Remember to respect user privacy - users can only operate on their own tasks.
""",
mcp_servers=[self.mcp_server],
# Disable parallel tool calls to prevent database lock issues
model_settings=ModelSettings(parallel_tool_calls=False)
)
async def process_message(self, user_id: str, message: str, conversation: Conversation) -> Dict[str, Any]:
"""
Process a user message and return appropriate response and tool calls.
"""
try:
# Run the agent with the user message using the configuration as shown in the example
# Use the MCP server in a context manager to ensure proper lifecycle
async with self.mcp_server:
result = await Runner.run(
self.agent,
input=f"[USER_ID: {user_id}] {message}",
run_config=self.config
)
# Process the response
message_content = result.final_output if result.final_output else "I processed your request."
# Extract tool calls if any (these would be handled by the agent framework through MCP)
tool_calls = []
requires_action = False
# Format the response
conversation_id = str(conversation.id) if conversation else "unknown"
formatted_result = {
"response": message_content,
"conversation_id": conversation_id,
"tool_calls": tool_calls,
"requires_action": requires_action
}
logger.info(f"Processed message for user {user_id}: {formatted_result}")
return formatted_result
except Exception as e:
logger.error(f"Error processing message for user {user_id}: {str(e)}")
conversation_id = str(conversation.id) if conversation else "unknown"
return {
"response": f"I'm sorry, I encountered an error processing your request: {str(e)}",
"conversation_id": conversation_id,
"tool_calls": [],
"requires_action": False
}