Spaces:
Sleeping
Sleeping
| from typing import Dict, Any, List, Optional | |
| from agents import ( | |
| Agent, | |
| Runner, | |
| RunConfig, | |
| OpenAIChatCompletionsModel, | |
| ModelSettings, | |
| set_tracing_disabled | |
| ) | |
| from agents.mcp import MCPServerStdio | |
| from config.settings import settings | |
| from models.conversation import Conversation | |
| from models.message import MessageRoleEnum | |
| import logging | |
| from openai import AsyncOpenAI | |
| import json | |
| import sys | |
| import subprocess | |
| import os | |
# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)

# Switch off Agents SDK tracing as soon as this module is imported
# (following the upstream example this file is based on).
set_tracing_disabled(disabled=True)
class TodoAgentFixed:
    """
    AI agent that interprets natural language commands for task management.

    Uses the OpenAI Agents SDK with the Google Gemini API (through its
    OpenAI-compatible endpoint) and a stdio MCP server for tool integration.

    Attributes:
        client: ``AsyncOpenAI`` client pointed at the Gemini endpoint.
        config: ``RunConfig`` passed to every ``Runner.run`` call.
        mcp_server: ``MCPServerStdio`` handle for the todo tool server.
        agent: The ``Agent`` that answers user messages.
    """

    # Gemini's OpenAI-compatible endpoint.
    GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"
    DEFAULT_MODEL_NAME = "gemini-2.0-flash"
    # Generous timeout because the MCP tools perform database operations.
    DEFAULT_MCP_TIMEOUT_SECONDS = 30.0
    # Module launched (with the current interpreter) as the MCP tool server.
    MCP_SERVER_MODULE = "ai.mcp.server"
    # Reply used when the agent run produces no final output.
    DEFAULT_RESPONSE = "I processed your request."
    # Conversation id reported when no conversation object is supplied.
    UNKNOWN_CONVERSATION_ID = "unknown"

    def __init__(
        self,
        *,
        model_name: str = DEFAULT_MODEL_NAME,
        mcp_timeout_seconds: float = DEFAULT_MCP_TIMEOUT_SECONDS,
    ) -> None:
        """
        Build the Gemini client, run configuration, MCP server handle and agent.

        Args:
            model_name: Name of the Gemini model to request.
            mcp_timeout_seconds: Client session timeout for the MCP server.
        """
        # Configure the OpenAI client with Google Gemini API
        self.client = AsyncOpenAI(
            api_key=settings.gemini_api_key,
            base_url=self.GEMINI_BASE_URL,
        )

        # Wrap the client in a chat-completions model for the Agents SDK.
        model = OpenAIChatCompletionsModel(
            model=model_name,
            openai_client=self.client,
        )

        # NOTE(review): `model_provider` receives the raw AsyncOpenAI client
        # here, mirroring the example this file was based on. Confirm against
        # the Agents SDK docs whether a ModelProvider instance is expected.
        self.config = RunConfig(
            model=model,
            model_provider=self.client,
        )

        # The MCP tool server is reached over stdio: a local subprocess is
        # started with the same Python executable that runs this code.
        self.mcp_server = MCPServerStdio(
            name="Todo Management MCP Server",
            params={
                "command": sys.executable,
                "args": ["-m", self.MCP_SERVER_MODULE],
            },
            client_session_timeout_seconds=mcp_timeout_seconds,
        )

        # Create the agent and connect it to the MCP server.
        self.agent = Agent(
            name="TodoAssistant",
            instructions="""
            You are an AI assistant for a todo management system. Your role is to help users manage their tasks using natural language.
            You can perform the following operations:
            1. Add tasks
            2. List tasks
            3. Complete tasks
            4. Delete tasks
            5. Update tasks
            Always respond in a friendly and helpful manner. When a user asks to perform an action,
            use the appropriate tool to carry out the request. If you don't understand a request,
            ask for clarification.
            Remember to respect user privacy - users can only operate on their own tasks.
            """,
            mcp_servers=[self.mcp_server],
            # Disable parallel tool calls to prevent database lock issues
            model_settings=ModelSettings(parallel_tool_calls=False),
        )

    @staticmethod
    def _build_response(response: Any, conversation: Optional["Conversation"]) -> Dict[str, Any]:
        """Assemble the response payload shared by the success and error paths."""
        if conversation:
            conversation_id = str(conversation.id)
        else:
            conversation_id = TodoAgentFixed.UNKNOWN_CONVERSATION_ID
        return {
            "response": response,
            "conversation_id": conversation_id,
            # Tool calls are carried out by the agent framework through MCP,
            # so nothing is surfaced to the caller and no action is pending.
            "tool_calls": [],
            "requires_action": False,
        }

    async def process_message(self, user_id: str, message: str, conversation: "Conversation") -> Dict[str, Any]:
        """
        Process a user message and return appropriate response and tool calls.

        Args:
            user_id: Identifier of the requesting user; it is prefixed to the
                agent input as ``[USER_ID: ...]``.
            message: The user's natural-language request.
            conversation: Conversation whose ``id`` is echoed back; a falsy
                value yields the id ``"unknown"``.

        Returns:
            A dict with the keys ``response``, ``conversation_id``,
            ``tool_calls`` (always empty) and ``requires_action`` (always
            ``False``). Failures of the agent run are logged and reported in
            ``response`` rather than raised.
        """
        try:
            # The MCP server is entered as a context manager around each run
            # so its lifecycle is tied to this request.
            # NOTE(review): every call shares the same `self.mcp_server`
            # object; confirm that concurrent calls on one instance are safe.
            async with self.mcp_server:
                # NOTE(review): `message` is user-controlled and sits next to
                # the USER_ID tag in the same string - verify the tool layer
                # does not rely on this tag alone for authorization.
                result = await Runner.run(
                    self.agent,
                    input=f"[USER_ID: {user_id}] {message}",
                    run_config=self.config,
                )
        except Exception as exc:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Error processing message for user %s: %s", user_id, exc)
            # NOTE(review): the raw exception text is returned to the caller;
            # consider a generic message if internal details may leak.
            return self._build_response(
                f"I'm sorry, I encountered an error processing your request: {exc}",
                conversation,
            )
        else:
            formatted_result = self._build_response(
                result.final_output or self.DEFAULT_RESPONSE,
                conversation,
            )
            logger.info("Processed message for user %s: %s", user_id, formatted_result)
            return formatted_result