"""Gradio front end for a multi-user Gmail assistant.

A Gemini model acts as an intent "controller": it turns each chat message into
a JSON decision (intent + parameters), which is then dispatched to the Gmail /
Gemini helper functions imported from ``multi_tool_agent.gmail_agent_logic``.
"""

import json
import os

import google.generativeai as genai
import gradio as gr
from dotenv import load_dotenv

from multi_tool_agent.gmail_agent_logic import (
    get_gmail_service,
    search_emails,
    summarize_email_with_gemini,
    generate_reply_with_gemini,
    send_reply,
    get_total_unread_count,
    get_emails_received_today_count,
    list_recent_emails,
    get_auth_url,
    exchange_code_for_credentials,
    store_user_credentials,
    create_session_id,
    is_user_authenticated,
    create_oauth_flow,
)

load_dotenv()

# --- Deployment: Write credentials from environment variables to files ---
for _env_var, _target_path in (
    ("GMAIL_CREDENTIALS_JSON", "credentials.json"),
    ("GMAIL_TOKEN_JSON", "token.json"),
):
    if _env_var in os.environ:
        # Explicit encoding so the JSON is written identically on every platform.
        with open(_target_path, "w", encoding="utf-8") as f:
            f.write(os.environ[_env_var])
        print(f"Created {_target_path} from environment variable.")
# --- End Deployment ---

try:
    gemini_api_key = os.environ.get("GOOGLE_API_KEY")
    if not gemini_api_key:
        raise ValueError("GOOGLE_API_KEY not found in environment variables. Please set it in the .env file and restart.")
    genai.configure(api_key=gemini_api_key)
    gemini_model = genai.GenerativeModel('gemini-2.5-flash-lite')
    print("Gemini model initialized successfully for Gradio app.")
except Exception as e:
    # Chain the cause so the original traceback is not lost at startup.
    raise Exception(f"FATAL: Error initializing Gemini model: {e}") from e

# --- LLM Prompt for Intent Recognition ---
CONTROLLER_PROMPT_TEMPLATE = """
You are the controller for a Gmail assistant. Analyze the user's message and determine the primary intent and necessary parameters based on the conversation history.

Available intents and their required parameters:
- LIST_RECENT: requires optional 'count' (integer, default 5) to list the most recent emails in the inbox.
- SEARCH: requires 'query' (e.g., "from:a@b.com subject:hello")
- SUMMARIZE_BY_ID: requires 'email_id'
- SUMMARIZE_LAST: requires context indicating a specific email (e.g., from a previous search or mention). Check context['last_email_details']['id'].
- GENERATE_REPLY: requires 'reply_instructions' (what the user wants to say) and context from a previously summarized email (context['last_email_details'] required).
- SEND_REPLY: requires confirmation (e.g., "yes", "send it") and context from a previously generated reply draft (context['last_reply_draft'] and context['last_email_details'] required).
- GET_UNREAD_COUNT: No parameters required.
- GET_TODAY_EMAIL_COUNT: No parameters required.
- GREETING/OTHER: if the intent is unclear, a simple greeting, or doesn't match the capabilities.

Conversation History:
{history_string}

Current User message: "{user_message}"

Current Context (JSON):
{context_json}

Based ONLY on the **Current User message** and the **Current Context**, determine the single most likely intent and extract the parameters.
Output your decision STRICTLY as a JSON object with 'intent' (string) and 'parameters' (dictionary) keys.
If parameters are not applicable or derivable, use an empty dictionary {{}}.

Example for "list my last 3 emails": {{"intent": "LIST_RECENT", "parameters": {{"count": 3}}}}
Example for "search for emails from test@test.com": {{"intent": "SEARCH", "parameters": {{"query": "from:test@test.com"}}}}
Example for "summarize email with id 123": {{"intent": "SUMMARIZE_BY_ID", "parameters": {{"email_id": "123"}}}}
Example for "draft a reply saying thanks": {{"intent": "GENERATE_REPLY", "parameters": {{"reply_instructions": "saying thanks"}}}}
Example for "yes send it": {{"intent": "SEND_REPLY", "parameters": {{}}}}
Example for "how many unread emails do I have": {{"intent": "GET_UNREAD_COUNT", "parameters": {{}}}}
Example for "how many emails today": {{"intent": "GET_TODAY_EMAIL_COUNT", "parameters": {{}}}}
Example for "hello there": {{"intent": "GREETING/OTHER", "parameters": {{}}}}

JSON Response:
"""

# --- UI text (kept at column 0 so Markdown never sees it as an indented code block) ---
_HEADER_MD = """

📧 Gmail AI Agent

Your personal assistant for managing your Gmail. Each user can authenticate with their own Gmail account.

"""

_AUTH_INSTRUCTIONS_MD = """
### Step 1: Get Authorization Code
Click the button below to get an authorization URL, then follow these steps:

1. **Click "Get Authorization URL"** below
2. **Copy the URL** that appears
3. **Open the URL** in your browser
4. **Sign in** to your Gmail account
5. **Allow permissions** for the app
6. **Copy the authorization code** from the browser
7. **Paste the code** in the text box below and click "Authenticate"
"""

_CHAT_HELP_MD = """
**Here's what I can do once you're authenticated:**
- **List emails**: e.g., 'Show my last 5 emails'
- **Search emails**: e.g., 'Find emails from boss@company.com about the project report'
- **Summarize emails**: e.g., 'Summarize the last email we discussed?'
- **Draft & Send replies**: e.g., 'Draft a reply saying I will look into it', then 'Ok send it'
- **Get counts**: e.g., 'How many unread emails do I have?'
"""

_DEFAULT_LIST_COUNT = 5


def _new_conversation_context():
    """Return a fresh, empty per-session conversation context."""
    return {
        "last_email_summary": None,
        "last_email_details": {},
        "last_reply_draft": None,
    }


def _coerce_count(raw_count, default=_DEFAULT_LIST_COUNT):
    """Turn the controller's 'count' parameter into a positive int, else ``default``."""
    try:
        count = int(raw_count)
    except (TypeError, ValueError):
        # TypeError covers JSON null / lists; ValueError covers strings like "five".
        return default
    return count if count > 0 else default


def _format_email_list(emails):
    """Render email header dicts (subject / from / date) as one display string."""
    return "\n\n---\n\n".join(
        f"Subject: {email.get('subject', 'N/A')}\n\n"  # Double newline
        f"From: {email.get('from', 'N/A')}\n\n"  # Double newline
        f"Date: {email.get('date', 'N/A')}"
        for email in emails
    )


def authenticate_user(auth_code):
    """Handle user authentication with OAuth code.

    Args:
        auth_code: The authorization code the user pasted from the browser.

    Returns:
        A ``(session_id, message)`` tuple. ``session_id`` is ``None`` when
        authentication failed; ``message`` is always a user-facing status string.
    """
    if not auth_code or not auth_code.strip():
        return None, "Please enter the authorization code."

    try:
        flow = create_oauth_flow()
        # The returned URL is not needed here; the call is kept because the
        # original code made it before exchanging the code.
        # NOTE(review): presumably it primes state on the flow object; confirm
        # against create_oauth_flow / exchange_code_for_credentials.
        flow.authorization_url(prompt='consent')

        # Exchange code for credentials
        credentials = exchange_code_for_credentials(auth_code.strip(), flow)
        if not credentials:
            return None, "Invalid authorization code. Please try again."

        # Create session and store credentials
        session_id = create_session_id()
        if store_user_credentials(session_id, credentials):
            return session_id, "Authentication successful! You can now use Gmail features."
        return None, "Failed to store credentials. Please try again."
    except Exception as e:
        # UI boundary: report the failure to the user instead of crashing the app.
        return None, f"Authentication error: {e}"


# --- Intent handlers ---
# Each handler takes (parameters, context, session_id), may update ``context``
# in place, and returns the text shown to the user.

def _remember_first_email(context, emails):
    """Store the first listed email for follow-ups and drop any stale draft."""
    context["last_email_details"] = emails[0]  # Store first found
    context["last_reply_draft"] = None  # Clear any old draft


def _handle_list_recent(parameters, context, session_id):
    """List the most recent inbox emails."""
    count = _coerce_count(parameters.get("count", _DEFAULT_LIST_COUNT))
    list_result = list_recent_emails(user_id='me', max_results=count, session_id=session_id)
    if list_result["status"] != "success":
        return f"Error listing recent emails: {list_result.get('error_message', 'Unknown error')}"
    emails = list_result["emails"]
    if not emails:
        return "No emails found in your inbox."
    _remember_first_email(context, emails)
    return f"Here are your last {len(emails)} emails:\n\n" + _format_email_list(emails)


def _handle_search(parameters, context, session_id):
    """Search the mailbox with the controller-supplied query."""
    query = parameters.get("query")
    if not query:
        return "My controller understood you want to search, but didn't find search criteria. Please specify (e.g., 'from:...' or 'subject:...')."
    search_result = search_emails(query=query, user_id='me', session_id=session_id)
    if search_result["status"] != "success":
        return f"Error searching emails: {search_result.get('error_message', 'Unknown error')}"
    emails = search_result["emails"]
    if not emails:
        return "No emails found matching your query."
    _remember_first_email(context, emails)
    return "Found emails:\n\n" + _format_email_list(emails)


def _summarize_into_context(email_id, heading, context, session_id):
    """Summarize ``email_id``, record the result in ``context``, return display text."""
    summary_result = summarize_email_with_gemini(user_id='me', email_id=email_id, session_id=session_id)
    if summary_result["status"] != "success":
        return f"Error summarizing email {email_id}: {summary_result.get('error_message', 'Unknown error')}"
    context["last_email_summary"] = summary_result['summary']
    context["last_email_details"] = summary_result  # Store all details
    context["last_reply_draft"] = None  # Clear any old draft
    return f"{heading}\n{summary_result['summary']}"


def _handle_summarize_by_id(parameters, context, session_id):
    """Summarize the email whose ID the controller extracted."""
    email_id = parameters.get("email_id")
    if not email_id:
        return "My controller understood you want to summarize by ID, but didn't find an ID. Please provide it."
    return _summarize_into_context(email_id, "Summary:", context, session_id)


def _handle_summarize_last(parameters, context, session_id):
    """Summarize the email most recently stored in the conversation context."""
    email_id = (context.get("last_email_details") or {}).get("id")
    if not email_id:
        return "I don't have a 'last email' in context to summarize. Please search for or specify an email first."
    heading = f"Summary of the last mentioned email (ID: {email_id}):"
    return _summarize_into_context(email_id, heading, context, session_id)


def _handle_generate_reply(parameters, context, session_id):
    """Draft a reply to the email in context, following the user's instructions."""
    instructions = parameters.get("reply_instructions", "")
    details = context.get("last_email_details") or {}
    original_body = details.get("original_body")
    if not original_body:
        return "I need the context of an email (specifically its body) to generate a reply. Please summarize an email first."

    # Combine original body with user instructions for the prompt
    generation_prompt_body = f"User wants reply to address: '{instructions}'\n\nOriginal Email Body:\n{original_body}"
    reply_result = generate_reply_with_gemini(
        original_subject=details.get("subject", "No Subject"),
        original_body=generation_prompt_body,
    )
    if reply_result["status"] != "success":
        return f"Error generating reply draft: {reply_result.get('error_message', 'Unknown error')}"
    context["last_reply_draft"] = reply_result['reply_body']  # Store draft
    return f"Draft Reply:\n------\n{reply_result['reply_body']}\n------\n\nWould you like me to send this reply?"


_REQUIRED_REPLY_DETAILS = ("sender_email", "subject", "thread_id", "original_message_id")


def _handle_send_reply(parameters, context, session_id):
    """Send the stored reply draft on the thread of the email in context."""
    details = context.get("last_email_details") or {}
    draft = context.get("last_reply_draft")
    if not draft:
        return "There is no reply draft stored in context to send. Please generate one first."
    if not all(details.get(key) for key in _REQUIRED_REPLY_DETAILS):
        return "I'm missing some details from the original email context (like sender, thread ID, or message ID) needed to send the reply. Please summarize the relevant email again."

    send_result = send_reply(
        user_id='me',
        to=details["sender_email"],
        sender='me',
        subject=details["subject"],
        reply_body=draft,
        thread_id=details["thread_id"],
        original_message_id=details["original_message_id"],
        references=details.get("references", ""),
        session_id=session_id,
    )
    if send_result["status"] != "success":
        return f"Error sending reply: {send_result.get('error_message', 'Unknown error')}"
    context["last_reply_draft"] = None  # Clear draft after sending
    return f"Reply sent successfully! Message ID: {send_result['message_id']}"


def _handle_get_unread_count(parameters, context, session_id):
    """Report the number of unread emails."""
    unread_result = get_total_unread_count(user_id='me', session_id=session_id)
    if unread_result["status"] != "success":
        return f"Error getting unread count: {unread_result.get('error_message', 'Unknown error')}"
    return f"You have {unread_result['unread_count']} unread emails in your inbox."


def _handle_get_today_email_count(parameters, context, session_id):
    """Report how many emails were received recently."""
    today_count_result = get_emails_received_today_count(user_id='me', session_id=session_id)
    if today_count_result["status"] != "success":
        return f"Error counting today's emails: {today_count_result.get('error_message', 'Unknown error')}"
    return f"You received approximately {today_count_result['today_count']} emails in the last 24 hours."


def _handle_greeting(parameters, context, session_id):
    """Answer greetings and anything the controller could not classify."""
    return "Hello! How can I help you with your Gmail today?"


_INTENT_HANDLERS = {
    "LIST_RECENT": _handle_list_recent,
    "SEARCH": _handle_search,
    "SUMMARIZE_BY_ID": _handle_summarize_by_id,
    "SUMMARIZE_LAST": _handle_summarize_last,
    "GENERATE_REPLY": _handle_generate_reply,
    "SEND_REPLY": _handle_send_reply,
    "GET_UNREAD_COUNT": _handle_get_unread_count,
    "GET_TODAY_EMAIL_COUNT": _handle_get_today_email_count,
    "GREETING/OTHER": _handle_greeting,
}


# --- Chatbot Logic ---
def handle_chat(message, history, session_state):
    """Process a user message with the LLM controller and the Gmail/Gemini tools.

    Args:
        message: The current user message.
        history: Prior turns as ``(user, assistant)`` pairs (Gradio "tuples"
            format); may be empty or ``None``.
        session_state: Per-user dict holding ``session_id`` and
            ``conversation_context``. It is updated in place with the new
            context when it is non-empty.

    Returns:
        The assistant's reply text. Failures are reported as text rather than
        raised, because this function sits directly behind the chat UI.
    """
    # Get session info
    session_id = session_state.get("session_id") if session_state else None
    conversation_context = (
        session_state.get("conversation_context") if session_state else None
    ) or _new_conversation_context()

    # Check authentication
    if not session_id or not is_user_authenticated(session_id):
        return "🔐 Please authenticate with Gmail first using the Authentication tab above."

    # Basic checks
    if not gemini_model:
        return "Error: Gemini model is not available. Check API key and configuration."

    # --- 1. Call LLM Controller ---
    history_string = "".join(
        f"User: {turn[0]}\nAssistant: {turn[1]}\n" for turn in (history or [])
    )
    # default=str: the context is only shown to the LLM, so a value that is not
    # JSON-serializable should be stringified rather than crash the chat turn.
    context_json = json.dumps(conversation_context, indent=2, default=str)
    prompt = CONTROLLER_PROMPT_TEMPLATE.format(
        history_string=history_string,
        user_message=message,
        context_json=context_json,
    )

    try:
        print(f"--- Sending Controller Prompt ---\n{prompt}\n------------------------------")
        controller_response = gemini_model.generate_content(prompt)
        print(f"--- Controller Response ---\n{controller_response.text}\n--------------------------- ")
        # The model sometimes wraps its JSON in a Markdown code fence; strip it.
        cleaned_response_text = controller_response.text.strip().replace('```json', '').replace('```', '')
        decision = json.loads(cleaned_response_text)
    except json.JSONDecodeError as e:
        print(f"Error decoding controller JSON: {e}\nResponse was: {controller_response.text}")
        return "Sorry, I had trouble understanding that request (JSON Decode Error)."
    except Exception as e:
        print(f"Error during controller LLM call: {e}")
        return f"Sorry, an error occurred while processing your request: {e}"

    if not isinstance(decision, dict):
        # Valid JSON but not the {"intent": ..., "parameters": ...} object we asked for.
        print(f"Controller returned non-object JSON: {decision!r}")
        return "Sorry, I had trouble understanding that request (JSON Decode Error)."

    intent = decision.get("intent")
    parameters = decision.get("parameters")
    if not isinstance(parameters, dict):
        # Covers a missing key as well as an explicit JSON null.
        parameters = {}

    # --- 2. Execute Action based on Intent ---
    # isinstance guard: an unhashable intent (e.g. a list) must not break the lookup.
    handler = _INTENT_HANDLERS.get(intent) if isinstance(intent, str) else None
    try:
        if handler is None:
            # Handles cases where intent is missing or unrecognized by the Python code
            response_text = f"Sorry, I received an unexpected intent ('{intent}') from the controller. I don't know how to handle that."
        else:
            response_text = handler(parameters, conversation_context, session_id)
    except Exception as e:
        print(f"Error executing action for intent {intent}: {e}")  # Log unexpected errors
        response_text = f"An unexpected error occurred while executing the action: {e}"

    # Update session state
    if session_state:
        session_state["conversation_context"] = conversation_context

    return response_text


if __name__ == "__main__":
    with gr.Blocks(
        theme=gr.themes.Soft(
            font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"],
        ),
        title="Gmail AI Agent",
    ) as app:
        # Session state
        session_state = gr.State({})

        gr.Markdown(_HEADER_MD)

        with gr.Tabs():
            with gr.TabItem("🔐 Authentication"):
                gr.Markdown(_AUTH_INSTRUCTIONS_MD)

                get_url_btn = gr.Button("🔗 Get Authorization URL", variant="primary")
                auth_url_display = gr.Textbox(
                    label="Authorization URL",
                    placeholder="Click 'Get Authorization URL' to generate the URL",
                    interactive=False,
                    lines=3,
                )
                auth_code_input = gr.Textbox(
                    label="Authorization Code",
                    placeholder="Paste the authorization code here",
                    lines=2,
                )
                auth_btn = gr.Button("🔑 Authenticate", variant="primary")
                auth_status = gr.Textbox(
                    label="Authentication Status",
                    interactive=False,
                    lines=2,
                )

                def get_url():
                    """Return the OAuth authorization URL, or an error string."""
                    try:
                        auth_url, _ = get_auth_url()
                        return auth_url
                    except Exception as e:
                        return f"Error generating URL: {e}"

                def handle_auth(auth_code, session_state):
                    """Authenticate and, on success, start a fresh session context."""
                    session_id, message = authenticate_user(auth_code)
                    if session_id:
                        session_state["session_id"] = session_id
                        session_state["conversation_context"] = _new_conversation_context()
                        return message, session_state, ""  # Clear auth code on success
                    return message, session_state, auth_code  # Keep auth code on failure

                get_url_btn.click(
                    fn=get_url,
                    outputs=auth_url_display,
                )
                auth_btn.click(
                    fn=handle_auth,
                    inputs=[auth_code_input, session_state],
                    outputs=[auth_status, session_state, auth_code_input],
                )

            with gr.TabItem("💬 Chat"):
                gr.Markdown(_CHAT_HELP_MD)

                chatbot = gr.Chatbot(
                    [],
                    elem_id="chatbot",
                    placeholder="Authenticate first, then start chatting about your Gmail!",
                    height=600,
                    type="tuples",
                )

                def chat_fn(message, history, session_state):
                    """Adapter between gr.ChatInterface and handle_chat."""
                    response = handle_chat(message, history, session_state)
                    return response

                gr.ChatInterface(
                    fn=chat_fn,
                    chatbot=chatbot,
                    additional_inputs=[session_state],
                    examples=[
                        ["How many unread emails do I have?"],
                        ["Show my last 5 emails"],
                        ["Find emails from my manager"],
                        ["How many emails did I get today?"],
                    ],
                    title=None,
                    description=None,
                    type="tuples",
                )

    app.launch()