import gradio as gr import os from multi_tool_agent.gmail_agent_logic import ( get_gmail_service, search_emails, summarize_email_with_gemini, generate_reply_with_gemini, send_reply, get_total_unread_count, get_emails_received_today_count, list_recent_emails, get_auth_url, exchange_code_for_credentials, store_user_credentials, create_session_id, is_user_authenticated, create_oauth_flow ) import google.generativeai as genai from dotenv import load_dotenv import json load_dotenv() # --- Deployment: Write credentials from environment variables to files --- if 'GMAIL_CREDENTIALS_JSON' in os.environ: with open('credentials.json', 'w') as f: f.write(os.environ.get('GMAIL_CREDENTIALS_JSON')) print("Created credentials.json from environment variable.") if 'GMAIL_TOKEN_JSON' in os.environ: with open('token.json', 'w') as f: f.write(os.environ.get('GMAIL_TOKEN_JSON')) print("Created token.json from environment variable.") # --- End Deployment --- try: gemini_api_key = os.environ.get("GOOGLE_API_KEY") if not gemini_api_key: raise ValueError("GOOGLE_API_KEY not found in environment variables. Please set it in the .env file and restart.") genai.configure(api_key=gemini_api_key) gemini_model = genai.GenerativeModel('gemini-2.5-flash-lite') print("Gemini model initialized successfully for Gradio app.") except Exception as e: raise Exception(f"FATAL: Error initializing Gemini model: {e}") # --- LLM Prompt for Intent Recognition --- CONTROLLER_PROMPT_TEMPLATE = """ You are the controller for a Gmail assistant. Analyze the user's message and determine the primary intent and necessary parameters based on the conversation history. Available intents and their required parameters: - LIST_RECENT: requires optional 'count' (integer, default 5) to list the most recent emails in the inbox. - SEARCH: requires 'query' (e.g., "from:a@b.com subject:hello") - SUMMARIZE_BY_ID: requires 'email_id' - SUMMARIZE_LAST: requires context indicating a specific email (e.g., from a previous search or mention). Check context['last_email_details']['id']. - GENERATE_REPLY: requires 'reply_instructions' (what the user wants to say) and context from a previously summarized email (context['last_email_details'] required). - SEND_REPLY: requires confirmation (e.g., "yes", "send it") and context from a previously generated reply draft (context['last_reply_draft'] and context['last_email_details'] required). - GET_UNREAD_COUNT: No parameters required. - GET_TODAY_EMAIL_COUNT: No parameters required. - GREETING/OTHER: if the intent is unclear, a simple greeting, or doesn't match the capabilities. Conversation History: {history_string} Current User message: "{user_message}" Current Context (JSON): {context_json} Based ONLY on the **Current User message** and the **Current Context**, determine the single most likely intent and extract the parameters. Output your decision STRICTLY as a JSON object with 'intent' (string) and 'parameters' (dictionary) keys. If parameters are not applicable or derivable, use an empty dictionary {{}}. Example for "list my last 3 emails": {{"intent": "LIST_RECENT", "parameters": {{"count": 3}}}} Example for "search for emails from test@test.com": {{"intent": "SEARCH", "parameters": {{"query": "from:test@test.com"}}}} Example for "summarize email with id 123": {{"intent": "SUMMARIZE_BY_ID", "parameters": {{"email_id": "123"}}}} Example for "draft a reply saying thanks": {{"intent": "GENERATE_REPLY", "parameters": {{"reply_instructions": "saying thanks"}}}} Example for "yes send it": {{"intent": "SEND_REPLY", "parameters": {{}}}} Example for "how many unread emails do I have": {{"intent": "GET_UNREAD_COUNT", "parameters": {{}}}} Example for "how many emails today": {{"intent": "GET_TODAY_EMAIL_COUNT", "parameters": {{}}}} Example for "hello there": {{"intent": "GREETING/OTHER", "parameters": {{}}}} JSON Response: """ def authenticate_user(auth_code): """Handle user authentication with OAuth code.""" if not auth_code or not auth_code.strip(): return None, "Please enter the authorization code." try: flow = create_oauth_flow() auth_url, _ = flow.authorization_url(prompt='consent') # Exchange code for credentials credentials = exchange_code_for_credentials(auth_code.strip(), flow) if credentials: # Create session and store credentials session_id = create_session_id() if store_user_credentials(session_id, credentials): return session_id, "Authentication successful! You can now use Gmail features." else: return None, "Failed to store credentials. Please try again." else: return None, "Invalid authorization code. Please try again." except Exception as e: return None, f"Authentication error: {e}" # --- Chatbot Logic --- def handle_chat(message, history, session_state): """ Processes user message using an LLM controller, interacts with Gmail/Gemini tools. """ # Get session info session_id = session_state.get("session_id") if session_state else None conversation_context = session_state.get("conversation_context", { "last_email_summary": None, "last_email_details": {}, "last_reply_draft": None, }) if session_state else { "last_email_summary": None, "last_email_details": {}, "last_reply_draft": None, } # Check authentication if not session_id or not is_user_authenticated(session_id): return "🔐 Please authenticate with Gmail first using the Authentication tab above." # Basic checks if not gemini_model: return "Error: Gemini model is not available. Check API key and configuration." # --- 1. Call LLM Controller --- history_string = "" if history: for h in history: history_string += f"User: {h[0]}\nAssistant: {h[1]}\n" context_json = json.dumps(conversation_context, indent=2) prompt = CONTROLLER_PROMPT_TEMPLATE.format( history_string=history_string, user_message=message, context_json=context_json ) try: print(f"--- Sending Controller Prompt ---\n{prompt}\n------------------------------") controller_response = gemini_model.generate_content(prompt) print(f"--- Controller Response ---\n{controller_response.text}\n--------------------------- ") cleaned_response_text = controller_response.text.strip().replace('```json', '').replace('```', '') decision = json.loads(cleaned_response_text) intent = decision.get("intent") parameters = decision.get("parameters", {}) except json.JSONDecodeError as e: print(f"Error decoding controller JSON: {e}\nResponse was: {controller_response.text}") return "Sorry, I had trouble understanding that request (JSON Decode Error)." except Exception as e: print(f"Error during controller LLM call: {e}") return f"Sorry, an error occurred while processing your request: {e}" # --- 2. Execute Action based on Intent --- try: if intent == "LIST_RECENT": count = parameters.get("count", 5) # Default to 5 if not specified try: count = int(count) except ValueError: count = 5 list_result = list_recent_emails(user_id='me', max_results=count, session_id=session_id) if list_result["status"] == "success" and list_result["emails"]: email_strings = [] for email in list_result["emails"]: email_str = ( f"Subject: {email.get('subject', 'N/A')}\n\n" # Double newline f"From: {email.get('from', 'N/A')}\n\n" # Double newline f"Date: {email.get('date', 'N/A')}" ) email_strings.append(email_str) response_text = f"Here are your last {len(email_strings)} emails:\n\n" + "\n\n---\n\n".join(email_strings) conversation_context["last_email_details"] = list_result["emails"][0] # Store first found conversation_context["last_reply_draft"] = None # Clear any old draft elif list_result["status"] == "success": response_text = "No emails found in your inbox." else: response_text = f"Error listing recent emails: {list_result.get('error_message', 'Unknown error')}" elif intent == "SEARCH": query = parameters.get("query") if not query: response_text = "My controller understood you want to search, but didn't find search criteria. Please specify (e.g., 'from:...' or 'subject:...')." else: search_result = search_emails(query=query, user_id='me', session_id=session_id) if search_result["status"] == "success" and search_result["emails"]: # Format emails email_strings = [] for email in search_result["emails"]: email_str = ( f"Subject: {email.get('subject', 'N/A')}\n\n" # Double newline f"From: {email.get('from', 'N/A')}\n\n" # Double newline f"Date: {email.get('date', 'N/A')}" ) email_strings.append(email_str) response_text = "Found emails:\n\n" + "\n\n---\n\n".join(email_strings) # Store the first result's ID for potential follow-up conversation_context["last_email_details"] = search_result["emails"][0] # Store first found conversation_context["last_reply_draft"] = None # Clear any old draft elif search_result["status"] == "success": response_text = "No emails found matching your query." else: response_text = f"Error searching emails: {search_result.get('error_message', 'Unknown error')}" elif intent == "SUMMARIZE_BY_ID": email_id = parameters.get("email_id") if email_id: summary_result = summarize_email_with_gemini(user_id='me', email_id=email_id, session_id=session_id) if summary_result["status"] == "success": response_text = f"Summary:\n{summary_result['summary']}" conversation_context["last_email_summary"] = summary_result['summary'] conversation_context["last_email_details"] = summary_result # Store all details conversation_context["last_reply_draft"] = None # Clear any old draft else: response_text = f"Error summarizing email {email_id}: {summary_result.get('error_message', 'Unknown error')}" else: response_text = "My controller understood you want to summarize by ID, but didn't find an ID. Please provide it." elif intent == "SUMMARIZE_LAST": email_id = conversation_context["last_email_details"].get("id") if email_id: summary_result = summarize_email_with_gemini(user_id='me', email_id=email_id, session_id=session_id) if summary_result["status"] == "success": response_text = f"Summary of the last mentioned email (ID: {email_id}):\n{summary_result['summary']}" conversation_context["last_email_summary"] = summary_result['summary'] conversation_context["last_email_details"] = summary_result # Store all details conversation_context["last_reply_draft"] = None # Clear any old draft else: response_text = f"Error summarizing email {email_id}: {summary_result.get('error_message', 'Unknown error')}" else: response_text = "I don't have a 'last email' in context to summarize. Please search for or specify an email first." elif intent == "GENERATE_REPLY": instructions = parameters.get("reply_instructions", "") details = conversation_context.get("last_email_details", {}) original_body = details.get("original_body") if original_body: # Combine original body with user instructions for the prompt generation_prompt_body = f"User wants reply to address: '{instructions}'\n\nOriginal Email Body:\n{original_body}" reply_result = generate_reply_with_gemini( original_subject=details.get("subject", "No Subject"), original_body=generation_prompt_body ) if reply_result["status"] == "success": response_text = f"Draft Reply:\n------\n{reply_result['reply_body']}\n------\n\nWould you like me to send this reply?" conversation_context["last_reply_draft"] = reply_result['reply_body'] # Store draft else: response_text = f"Error generating reply draft: {reply_result.get('error_message', 'Unknown error')}" else: response_text = "I need the context of an email (specifically its body) to generate a reply. Please summarize an email first." elif intent == "SEND_REPLY": details = conversation_context.get("last_email_details", {}) draft = conversation_context.get("last_reply_draft") if (draft and details.get("sender_email") and details.get("subject") and details.get("thread_id") and details.get("original_message_id")): send_result = send_reply( user_id='me', to=details["sender_email"], sender='me', subject=details["subject"], reply_body=draft, thread_id=details["thread_id"], original_message_id=details["original_message_id"], references=details.get("references", ""), session_id=session_id ) if send_result["status"] == "success": response_text = f"Reply sent successfully! Message ID: {send_result['message_id']}" conversation_context["last_reply_draft"] = None # Clear draft after sending # Optionally clear last_email_details too? else: response_text = f"Error sending reply: {send_result.get('error_message', 'Unknown error')}" elif not draft: response_text = "There is no reply draft stored in context to send. Please generate one first." else: response_text = "I'm missing some details from the original email context (like sender, thread ID, or message ID) needed to send the reply. Please summarize the relevant email again." elif intent == "GET_UNREAD_COUNT": unread_result = get_total_unread_count(user_id='me', session_id=session_id) if unread_result["status"] == "success": response_text = f"You have {unread_result['unread_count']} unread emails in your inbox." else: response_text = f"Error getting unread count: {unread_result.get('error_message', 'Unknown error')}" elif intent == "GET_TODAY_EMAIL_COUNT": today_count_result = get_emails_received_today_count(user_id='me', session_id=session_id) if today_count_result["status"] == "success": response_text = f"You received approximately {today_count_result['today_count']} emails in the last 24 hours." else: response_text = f"Error counting today's emails: {today_count_result.get('error_message', 'Unknown error')}" elif intent == "GREETING/OTHER": response_text = "Hello! How can I help you with your Gmail today?" else: # Handles cases where intent is missing or unrecognized by the Python code response_text = f"Sorry, I received an unexpected intent ('{intent}') from the controller. I don't know how to handle that." except Exception as e: print(f"Error executing action for intent {intent}: {e}") # Log unexpected errors response_text = f"An unexpected error occurred while executing the action: {e}" # Update session state if session_state: session_state["conversation_context"] = conversation_context return response_text if __name__ == "__main__": with gr.Blocks( theme=gr.themes.Soft( font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"], ), title="Gmail AI Agent" ) as app: # Session state session_state = gr.State({}) gr.Markdown( """
Your personal assistant for managing your Gmail. Each user can authenticate with their own Gmail account.