Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel # Used to define expected request data | |
| from dotenv import load_dotenv # To load our secret API keys from .env file | |
| # --- LlamaIndex and Composio Imports --- | |
| from composio_llamaindex import ComposioToolSet, Action # Composio tools for LlamaIndex | |
| from llama_index.core.agent import FunctionCallingAgentWorker # The agent framework | |
| from llama_index.core.llms import ChatMessage # Standard message format | |
| from llama_index.llms.gemini import Gemini # The Gemini LLM integration | |
# --- Load API Keys Safely ---
# Load variables from the .env file before any key is read.
load_dotenv()

# Each key is validated immediately after it is read, so a missing secret
# stops the import with a message that names the variable.
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
    raise ValueError("GEMINI_API_KEY not found. Make sure it's set in your .env file.")

composio_api_key = os.getenv("COMPOSIO_API_KEY")
if not composio_api_key:
    raise ValueError("COMPOSIO_API_KEY not found. Make sure it's set in your .env file.")

# --- Global Agent Variable ---
# Placeholder for the agent; setup_gmail_agent() assigns it, and it remains
# None if that setup fails.
agent = None
# --- Function to Setup the Agent ---
def setup_gmail_agent():
    """Build the Gmail assistant and store it in the module-level ``agent``.

    Creates the Gemini LLM and the Composio toolset from the API keys read at
    import time, fetches the selected Gmail actions, and combines them with a
    system prompt into a function-calling agent.

    Any exception raised during these steps is caught and printed, and
    ``agent`` is set to ``None`` so callers can detect the failure.
    """
    global agent  # This function rebinds the module-level variable.
    try:
        print("Initializing LLM (Gemini)...")
        # No model name is passed here; pass one explicitly (e.g.
        # "models/gemini-pro") if the integration's default is not sufficient.
        gemini_llm = Gemini(api_key=gemini_api_key)

        print("Initializing Composio ToolSet...")
        toolset = ComposioToolSet(api_key=composio_api_key)

        print("Fetching Gmail tools from Composio...")
        # Only the uncommented actions are handed to the agent; the commented
        # ones are kept as a menu of actions that can be switched on.
        enabled_actions = [
            # Action.GMAIL_SEND_EMAIL,
            Action.GMAIL_CREATE_EMAIL_DRAFT,
            # Action.GMAIL_REPLY_TO_THREAD,
            # Action.GMAIL_FETCH_MESSAGE_BY_THREAD_ID,
            # Action.GMAIL_LIST_THREADS,  # Good for getting recent conversations
            # Action.GMAIL_FETCH_EMAILS,  # More generic fetch if needed
        ]
        tools = toolset.get_tools(actions=enabled_actions)
        print(f"Loaded {len(tools)} Gmail tools.")

        # --- Agent instructions ---
        # NOTE(review): the prompt describes listing, reading, replying and
        # sending, while only the draft-creation action is enabled above.
        instructions = """You are a helpful Gmail assistant.
Your tasks are to:
1. Read and list recent email threads when asked.
2. Read the content of a specific email/thread when given a thread ID.
3. Reply to email threads when requested, using the provided thread ID and message body.
4. Send new emails when requested, using the recipient, subject, and body provided.
Use the available tools precisely based on the user's request.
Be concise in your confirmation messages.
If you need a 'thread_id' to reply or fetch a specific message, and the user hasn't provided one, ask them to list threads first to get the ID.
"""
        system_messages = [ChatMessage(role="system", content=instructions)]

        print("Creating the Function Calling Agent Worker...")
        # The worker ties together the LLM, the tools and the system prompt.
        worker = FunctionCallingAgentWorker(
            tools=tools,
            llm=gemini_llm,
            prefix_messages=system_messages,
            max_function_calls=10,  # Upper bound on tool uses per query
            allow_parallel_tool_calls=False,  # Tools are processed one at a time
            verbose=True,  # Print the agent's intermediate steps for debugging
        )
        agent = worker.as_agent()
        print("--- Gmail Agent Initialized Successfully! ---")
    except Exception as exc:
        # Best-effort setup: report the problem and leave the agent unset.
        print(f"---!!! ERROR Initializing Agent: {exc} !!!---")
        agent = None
# --- FastAPI Application Setup ---
print("Setting up FastAPI app...")

# The application object that the route and event handlers below attach to.
# title/description/version are descriptive metadata for the API.
app = FastAPI(
    version="1.0.0",
    title="Gmail AI Agent API (Gemini + Composio)",
    description="Backend API for a Gmail assistant powered by Gemini, LlamaIndex, and Composio.",
)
# --- Define Expected Input Data Structure ---
# Request body model for the query endpoint.
class QueryRequest(BaseModel):
    # The only field: the user's natural-language request, as a string.
    query: str
# --- Run Agent Setup When FastAPI Starts ---
# The decorator registers this coroutine as a startup handler. Without it
# nothing calls the function, the agent is never built, and every query is
# rejected with a 503.
@app.on_event("startup")
async def startup_event():
    """Initialize the Gmail agent when the FastAPI server starts.

    Calls ``setup_gmail_agent()`` and prints a warning if the module-level
    ``agent`` is still ``None`` afterwards. Setup failures are not raised
    here, so the server keeps starting even when the agent is unavailable.
    """
    print("Application startup: Initializing Gmail agent...")
    setup_gmail_agent()
    if agent is None:
        print("WARNING: Agent did not initialize successfully on startup.")
# --- Health Check Endpoint ---
# NOTE(review): the handler had no route registration, so it was unreachable.
# "/health" is a conventional path chosen here; confirm it matches whatever
# monitors or clients expect.
@app.get("/health")
async def health_check():
    """Report whether the server is up and the agent has been initialized.

    Returns:
        A dict with ``status`` and ``agent_initialized``; when the agent is
        missing it also carries a ``message`` explaining why.
    """
    # Explicit None check, matching how the other handlers test the agent.
    if agent is not None:
        return {"status": "healthy", "agent_initialized": True}
    # The agent is unset: report unhealthy in the response body.
    return {
        "status": "unhealthy",
        "agent_initialized": False,
        "message": "Agent initialization failed during startup.",
    }
# --- Main Interaction Endpoint ---
# POST is used because the client sends data (the query) in the request body.
# The decorator registers the handler at /interact, the path the root
# endpoint's message tells clients to use.
@app.post("/interact")
async def process_query(request: QueryRequest):
    """Run a user query through the agent and return its reply.

    Args:
        request: Parsed request body carrying the ``query`` string.

    Returns:
        A dict with ``status`` set to ``"success"`` and ``response`` holding
        the agent's reply converted to a string.

    Raises:
        HTTPException: 503 when the agent is not initialized; 500 when the
            agent raises while handling the query.
    """
    print(f"Received query: {request.query}")
    if agent is None:
        # The agent is unavailable, so refuse rather than fail later.
        raise HTTPException(status_code=503, detail="Agent not initialized or initialization failed. Check server logs.")
    try:
        # achat is awaited so the event loop stays free during the agent call.
        agent_response = await agent.achat(request.query)
        print(f"Agent response: {agent_response}")
        return {"status": "success", "response": str(agent_response)}
    except Exception as e:
        print(f"---!!! ERROR processing query '{request.query}': {e} !!!---")
        # NOTE(review): the detail exposes the raw exception text to clients.
        # `from e` keeps the original traceback attached for server-side logs.
        raise HTTPException(status_code=500, detail=f"Error processing your request: {e}") from e
# --- Root endpoint (optional, just to show the app is running) ---
# Registered at "/" so the handler is actually reachable.
@app.get("/")
async def root():
    """Return a short message confirming the API is running.

    The message points clients to the /interact endpoint for sending queries.
    """
    return {"message": "Gmail Agent API is running. Use the /interact endpoint to send queries."}
# Printed at import time, after every definition above has been evaluated.
print("FastAPI app setup complete. Waiting for server to start...")