Spaces:
Running
Running
| """Simple AI agent implementation using OpenAI SDK with function calling. | |
| [From]: specs/004-ai-chatbot/plan.md - AI Agent Layer | |
| This is a simplified implementation that uses OpenAI's function calling | |
| capabilities directly through the AsyncOpenAI client with Gemini. | |
| """ | |
| import uuid | |
| import logging | |
| from typing import Optional, List, Dict, Any | |
| from openai import AsyncOpenAI | |
| from core.config import get_settings | |
| from mcp_server.tools import ( | |
| add_task, list_tasks, update_task, complete_task, delete_task, | |
| complete_all_tasks, delete_all_tasks | |
| ) | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings, loaded once at import time via core.config.get_settings().
settings = get_settings()
# Global client instance: starts as None, is populated by the first
# get_gemini_client() call, and is reused by every later call.
_gemini_client: Optional[AsyncOpenAI] = None
def get_gemini_client() -> AsyncOpenAI:
    """Return the shared AsyncOpenAI client bound to the Gemini API.

    [From]: specs/004-ai-chatbot/plan.md - Gemini Integration Pattern

    The client is built on first use and cached in the module-level
    ``_gemini_client``; later calls return that same instance. It talks to
    Gemini's OpenAI-compatible endpoint:
    https://generativelanguage.googleapis.com/v1beta/openai/

    Returns:
        AsyncOpenAI: Configured client for Gemini API

    Raises:
        ValueError: If GEMINI_API_KEY is not configured
    """
    global _gemini_client

    # Fast path: a client was already built by an earlier call.
    if _gemini_client is not None:
        return _gemini_client

    api_key = settings.gemini_api_key
    if not api_key:
        raise ValueError(
            "GEMINI_API_KEY is not configured. "
            "Please set GEMINI_API_KEY in your environment variables."
        )

    _gemini_client = AsyncOpenAI(
        base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
        api_key=api_key,
    )
    logger.info("✅ Gemini AI client initialized via AsyncOpenAI adapter")
    return _gemini_client
# Tool schemas advertised to the model for function calling.
def _param(json_type: str, description: str, enum: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build one JSON-schema property (key order: type, enum, description)."""
    schema: Dict[str, Any] = {"type": json_type}
    if enum is not None:
        schema["enum"] = list(enum)
    schema["description"] = description
    return schema


def _function_tool(
    name: str,
    description: str,
    properties: Dict[str, Any],
    required: List[str],
) -> Dict[str, Any]:
    """Wrap a parameter schema in the ``{"type": "function", ...}`` tool envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": list(required),
            },
        },
    }


TOOLS_DEFINITION = [
    _function_tool(
        "add_task",
        "Create a new task in the user's todo list. Use this when the user wants to create, add, or remind themselves about a task.",
        {
            "user_id": _param("string", "User ID (UUID) who owns this task"),
            "title": _param("string", "Task title (brief description)"),
            "description": _param("string", "Detailed task description"),
            "due_date": _param(
                "string",
                "Due date in ISO 8601 format or relative terms like 'tomorrow', 'next week'",
            ),
            "priority": _param("string", "Task priority level", enum=["low", "medium", "high"]),
        },
        ["user_id", "title"],
    ),
    _function_tool(
        "list_tasks",
        "List and filter tasks from the user's todo list. Use this when the user wants to see their tasks, ask what they have to do, or request a filtered view of their tasks.",
        {
            "user_id": _param("string", "User ID (UUID) who owns these tasks"),
            "status": _param(
                "string",
                "Filter by completion status",
                enum=["all", "pending", "completed"],
            ),
            "due_within_days": _param("number", "Only show tasks due within X days"),
            "limit": _param("number", "Maximum tasks to return (1-100)"),
        },
        ["user_id"],
    ),
    _function_tool(
        "update_task",
        "Update an existing task in the user's todo list. Use this when the user wants to modify, change, or edit an existing task. You need the task_id to update.",
        {
            "user_id": _param("string", "User ID (UUID) who owns this task"),
            "task_id": _param("string", "Task ID (UUID) of the task to update"),
            "title": _param("string", "New task title"),
            "description": _param("string", "New task description"),
            "due_date": _param("string", "New due date in ISO 8601 format or relative terms"),
            "priority": _param("string", "New task priority level", enum=["low", "medium", "high"]),
            "completed": _param("boolean", "Mark task as completed or not completed"),
        },
        ["user_id", "task_id"],
    ),
    _function_tool(
        "complete_task",
        "Mark a task as completed or not completed (toggle completion status). Use this when the user wants to mark a task as done, finished, complete, or conversely as pending, not done, incomplete.",
        {
            "user_id": _param("string", "User ID (UUID) who owns this task"),
            "task_id": _param("string", "Task ID (UUID) of the task to mark complete/incomplete"),
            "completed": _param("boolean", "True to mark complete, False to mark incomplete/pending"),
        },
        ["user_id", "task_id", "completed"],
    ),
    _function_tool(
        "delete_task",
        "Delete a task from the user's todo list permanently. Use this when the user wants to remove, delete, or get rid of a task.",
        {
            "user_id": _param("string", "User ID (UUID) who owns this task"),
            "task_id": _param("string", "Task ID (UUID) of the task to delete"),
        },
        ["user_id", "task_id"],
    ),
    _function_tool(
        "complete_all_tasks",
        "Mark all tasks as completed or not completed. Use this when the user wants to mark all tasks as done, complete, finished, or conversely mark all as pending or incomplete.",
        {
            "user_id": _param("string", "User ID (UUID) who owns these tasks"),
            "completed": _param("boolean", "True to mark all tasks complete, False to mark all incomplete"),
            "status_filter": _param(
                "string",
                "Optional: Only affect tasks with this status (e.g., only mark pending tasks as complete)",
                enum=["pending", "completed"],
            ),
        },
        ["user_id", "completed"],
    ),
    _function_tool(
        "delete_all_tasks",
        "Delete all tasks from the user's todo list permanently. This is a destructive operation - always inform the user how many tasks will be deleted and ask for confirmation before calling with confirmed=true.",
        {
            "user_id": _param("string", "User ID (UUID) who owns these tasks"),
            "confirmed": _param(
                "boolean",
                "Must be true to actually delete. First call with confirmed=false to show count, then call again with confirmed=true after user confirms.",
            ),
            "status_filter": _param(
                "string",
                "Optional: Only delete tasks with this status (e.g., only delete completed tasks)",
                enum=["pending", "completed"],
            ),
        },
        ["user_id", "confirmed"],
    ),
]
def _build_system_prompt(user_id: str) -> str:
    """Render the system prompt, embedding ``user_id`` so the model uses it in tool calls."""
    # Continuation lines are flush-left so no source indentation leaks into the prompt text.
    return f"""You are a helpful task management assistant.
Users can create, list, update, complete, and delete tasks through natural language.
IMPORTANT: You are currently assisting user with ID: {user_id}
When calling tools, ALWAYS include this user_id parameter. Do not ask the user for their user ID.
Your capabilities:
- Create tasks with title, description, due date, and priority
- List and filter tasks (e.g., "show me high priority tasks due this week")
- Update existing tasks (title, description, due date, priority)
- Mark tasks as complete or incomplete (individual or all tasks)
- Delete tasks (individual or all tasks)
- Handle multi-action requests in a single response (e.g., "add a task and list my tasks")
Guidelines for task references:
- Users may refer to tasks by position (e.g., "task 1", "the first task", "my last task")
- When user references a task by position, ALWAYS first list tasks to identify the correct task_id
- Then confirm with the user before proceeding (e.g., "I found 'Buy groceries' as your first task. Is that the one you want to mark complete?")
- Example flow: User says "mark task 1 as done" → You list_tasks → Find first task → Confirm → complete_task with correct task_id
Guidelines for bulk operations:
- "Mark all tasks as complete" → Use complete_all_tasks with completed=True
- "Mark all pending tasks as complete" → Use complete_all_tasks with completed=True, status_filter="pending"
- "Delete all tasks" → First call delete_all_tasks with confirmed=false to show count → Wait for user confirmation → Call again with confirmed=True
Safety confirmations:
- For delete_all_tasks: ALWAYS call with confirmed=false first, inform user of count, and ask for explicit confirmation
- Example: "This will delete 5 tasks. Please confirm by saying 'yes' or 'confirm'."
Empty task list handling:
- When users have no tasks, respond warmly and offer to help create one
- Examples: "You don't have any tasks yet. Would you like me to help you create one?"
- For filtered queries with no results: "No tasks match that criteria. Would you like to see all your tasks instead?"
Task presentation:
- When listing tasks, organize them logically (e.g., pending first, then completed)
- Include key details: title, due date, priority, completion status
- Use clear formatting (bullet points or numbered lists)
- For long lists, offer to filter or show specific categories
Response formatting:
- When completing tasks: Include the task title and confirmation (e.g., "✅ 'Buy groceries' marked as complete")
- When completing multiple tasks: Include count (e.g., "✅ 3 tasks marked as complete")
- When updating tasks: Describe what changed (e.g., "✅ Task updated: title changed to 'Buy groceries and milk'")
- When deleting tasks: Include title and confirmation (e.g., "✅ 'Buy groceries' deleted")
When you need to create a task, use the add_task function with user_id="{user_id}".
When you need to list tasks, use the list_tasks function with user_id="{user_id}".
When you need to update a task, use the update_task function with user_id="{user_id}" and task_id.
When you need to mark a task complete/incomplete, use the complete_task function with user_id="{user_id}", task_id, and completed=True/False.
When you need to mark all tasks complete/incomplete, use the complete_all_tasks function with user_id="{user_id}" and completed=True/False.
When you need to delete a task, use the delete_task function with user_id="{user_id}" and task_id.
When you need to delete all tasks, use the delete_all_tasks function with user_id="{user_id}" and confirmed=false first, then confirmed=true after user confirms.
"""


async def _broadcast_tool_event(event: str, user_id: str, function_name: str, payload: Any) -> None:
    """Send a best-effort WebSocket progress event for one tool call.

    [From]: specs/004-ai-chatbot/research.md - Section 6

    ``event`` is one of ``tool_starting``, ``tool_complete`` or ``tool_error``
    and selects ``ws_manager.events.broadcast_<event>``. Any failure (including
    a failed import) is logged as a warning and never interrupts the agent.
    """
    try:
        # Imported lazily inside the try so an import failure is only a warning.
        from ws_manager import events

        # Format tool name for display, e.g. "add_task" -> "Add Task".
        display_name = function_name.replace("_", " ").title()
        await getattr(events, f"broadcast_{event}")(user_id, display_name, payload)
    except Exception as ws_e:
        logger.warning(f"Failed to broadcast {event} for {function_name}: {ws_e}")


async def _execute_tool_call(tool_call: Any, user_id: str) -> Dict[str, str]:
    """Run one model-requested tool call and return its ``role="tool"`` message.

    Unknown tool names and unparseable arguments are reported back to the model
    as an ``{"error": ...}`` result. An exception raised by the tool itself is
    broadcast as a tool error and then re-raised.
    """
    import json

    function_name = tool_call.function.name
    await _broadcast_tool_event("tool_starting", user_id, function_name, {})

    def as_tool_message(payload: Any) -> Dict[str, str]:
        return {
            "tool_call_id": tool_call.id,
            "role": "tool",
            "name": function_name,
            # default=str keeps one non-JSON-native value in a tool result from
            # failing the turn after the tool has already run.
            # NOTE(review): tool return shapes are not visible here - confirm.
            "content": json.dumps(payload, default=str),
        }

    # Parse the model-produced arguments defensively. json.JSONDecodeError is a
    # ValueError subclass, so letting it escape would be logged by run_agent as
    # a configuration error.
    raw_args = tool_call.function.arguments
    problem = None
    args: Any = {}
    try:
        args = json.loads(raw_args) if raw_args else {}
    except ValueError as parse_error:
        problem = f"Invalid JSON arguments for {function_name}: {parse_error}"
    else:
        if not isinstance(args, dict):
            problem = f"Arguments for {function_name} must be a JSON object"

    if problem is not None:
        logger.warning(problem)
        await _broadcast_tool_event("tool_error", user_id, function_name, problem)
        return as_tool_message({"error": problem})

    # SECURITY: always use the caller's user_id. A model-supplied value
    # (hallucinated or prompt-injected) must never choose whose tasks are touched.
    args["user_id"] = user_id

    # Each tool module exposes a coroutine with the same name as the module.
    tool_modules = {
        "add_task": add_task,
        "list_tasks": list_tasks,
        "update_task": update_task,
        "complete_task": complete_task,
        "delete_task": delete_task,
        "complete_all_tasks": complete_all_tasks,
        "delete_all_tasks": delete_all_tasks,
    }

    try:
        module = tool_modules.get(function_name)
        if module is None:
            result = {"error": f"Unknown function: {function_name}"}
        else:
            result = await getattr(module, function_name)(**args)
        tool_message = as_tool_message(result)
    except Exception as e:
        await _broadcast_tool_event("tool_error", user_id, function_name, str(e))
        raise

    await _broadcast_tool_event("tool_complete", user_id, function_name, result)
    return tool_message


async def run_agent(
    messages: List[Dict[str, str]],
    user_id: str,
    context: Optional[Dict] = None
) -> str:
    """Run the task agent with conversation history.

    [From]: specs/004-ai-chatbot/plan.md - Agent Execution Pattern

    Sends the system prompt plus ``messages`` to the model with the tool
    schemas attached. If the model requests tool calls, each is executed
    (always scoped to ``user_id``) and the results are sent back in a second
    request, made without tools, whose text becomes the reply.

    Args:
        messages: Conversation history in OpenAI format
        user_id: User ID for context; forced onto every tool call
        context: Optional additional context (accepted but currently unused)

    Returns:
        str: Agent's response message (never empty; a fallback is substituted)

    Raises:
        ValueError: If the Gemini client is not configured, or a ValueError
            escapes from a tool
        ConnectionError: If Gemini API is unreachable
        TimeoutError: If the request timed out
        Exception: If agent execution fails for any other reason
    """
    try:
        client = get_gemini_client()

        # Prepare messages with system prompt
        api_messages = [{"role": "system", "content": _build_system_prompt(user_id)}]
        api_messages.extend(messages)

        response = await client.chat.completions.create(
            model=settings.gemini_model,
            messages=api_messages,
            tools=TOOLS_DEFINITION,
            tool_choice="auto"
        )
        assistant_message = response.choices[0].message

        if not assistant_message.tool_calls:
            # No tool calls: return the content directly, never an empty string.
            return assistant_message.content or "I understand. How can I help you with your tasks?"

        # Run the calls sequentially, in the order the model requested them.
        tool_results = []
        for tool_call in assistant_message.tool_calls:
            tool_results.append(await _execute_tool_call(tool_call, user_id))

        # The assistant turn must precede its tool results in the follow-up request.
        api_messages.append(assistant_message)
        api_messages.extend(tool_results)
        final_response = await client.chat.completions.create(
            model=settings.gemini_model,
            messages=api_messages
        )

        content = final_response.choices[0].message.content
        return content or "I've processed your request. Is there anything else you'd like help with?"
    except ValueError as e:
        # Re-raise configuration errors
        logger.error(f"❌ Agent configuration error: {e}")
        raise
    except Exception as e:
        # Classify by message text; chain the original so the root cause is kept.
        error_msg = str(e).lower()
        if "connection" in error_msg or "network" in error_msg:
            logger.error(f"❌ Gemini API connection error: {e}")
            raise ConnectionError(
                "Unable to reach AI service. Please check your internet connection "
                "and try again later."
            ) from e
        elif "timeout" in error_msg or "timed out" in error_msg:
            # "timed out" covers messages such as "Request timed out."
            logger.error(f"❌ Gemini API timeout error: {e}")
            raise TimeoutError(
                "AI service request timed out. Please try again."
            ) from e
        elif "rate limit" in error_msg or "quota" in error_msg:
            logger.error(f"❌ Gemini API rate limit error: {e}")
            raise Exception(
                "AI service rate limit exceeded. Please wait a moment and try again."
            ) from e
        elif "authentication" in error_msg or "unauthorized" in error_msg or "401" in error_msg:
            logger.error(f"❌ Gemini API authentication error: {e}")
            raise Exception(
                "AI service authentication failed. Please check your API key configuration."
            ) from e
        else:
            # Unknown error
            logger.error(f"❌ Agent execution error: {e}")
            raise Exception(
                f"AI service temporarily unavailable: {str(e)}"
            ) from e
def is_gemini_configured() -> bool:
    """Report whether a Gemini API key is present in the loaded settings.

    Returns:
        bool: True if ``settings.gemini_api_key`` is truthy, False otherwise
    """
    api_key = settings.gemini_api_key
    return bool(api_key)
# Names exported by `from <this module> import *`.
__all__ = [
    "get_gemini_client",
    "run_agent",
    "is_gemini_configured"
]