Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| from multi_tool_agent.gmail_agent_logic import ( | |
| get_gmail_service, | |
| search_emails, | |
| summarize_email_with_gemini, | |
| generate_reply_with_gemini, | |
| send_reply, | |
| get_total_unread_count, | |
| get_emails_received_today_count, | |
| list_recent_emails, | |
| get_auth_url, | |
| exchange_code_for_credentials, | |
| store_user_credentials, | |
| create_session_id, | |
| is_user_authenticated, | |
| create_oauth_flow | |
| ) | |
| import google.generativeai as genai | |
| from dotenv import load_dotenv | |
| import json | |
| load_dotenv() | |
| # --- Deployment: Write credentials from environment variables to files --- | |
| if 'GMAIL_CREDENTIALS_JSON' in os.environ: | |
| with open('credentials.json', 'w') as f: | |
| f.write(os.environ.get('GMAIL_CREDENTIALS_JSON')) | |
| print("Created credentials.json from environment variable.") | |
| if 'GMAIL_TOKEN_JSON' in os.environ: | |
| with open('token.json', 'w') as f: | |
| f.write(os.environ.get('GMAIL_TOKEN_JSON')) | |
| print("Created token.json from environment variable.") | |
| # --- End Deployment --- | |
| try: | |
| gemini_api_key = os.environ.get("GOOGLE_API_KEY") | |
| if not gemini_api_key: | |
| raise ValueError("GOOGLE_API_KEY not found in environment variables. Please set it in the .env file and restart.") | |
| genai.configure(api_key=gemini_api_key) | |
| gemini_model = genai.GenerativeModel('gemini-2.5-flash-lite') | |
| print("Gemini model initialized successfully for Gradio app.") | |
| except Exception as e: | |
| raise Exception(f"FATAL: Error initializing Gemini model: {e}") | |
| # --- LLM Prompt for Intent Recognition --- | |
| CONTROLLER_PROMPT_TEMPLATE = """ | |
| You are the controller for a Gmail assistant. Analyze the user's message and determine the primary intent and necessary parameters based on the conversation history. | |
| Available intents and their required parameters: | |
| - LIST_RECENT: requires optional 'count' (integer, default 5) to list the most recent emails in the inbox. | |
| - SEARCH: requires 'query' (e.g., "from:a@b.com subject:hello") | |
| - SUMMARIZE_BY_ID: requires 'email_id' | |
| - SUMMARIZE_LAST: requires context indicating a specific email (e.g., from a previous search or mention). Check context['last_email_details']['id']. | |
| - GENERATE_REPLY: requires 'reply_instructions' (what the user wants to say) and context from a previously summarized email (context['last_email_details'] required). | |
| - SEND_REPLY: requires confirmation (e.g., "yes", "send it") and context from a previously generated reply draft (context['last_reply_draft'] and context['last_email_details'] required). | |
| - GET_UNREAD_COUNT: No parameters required. | |
| - GET_TODAY_EMAIL_COUNT: No parameters required. | |
| - GREETING/OTHER: if the intent is unclear, a simple greeting, or doesn't match the capabilities. | |
| Conversation History: | |
| {history_string} | |
| Current User message: "{user_message}" | |
| Current Context (JSON): | |
| {context_json} | |
| Based ONLY on the **Current User message** and the **Current Context**, determine the single most likely intent and extract the parameters. | |
| Output your decision STRICTLY as a JSON object with 'intent' (string) and 'parameters' (dictionary) keys. If parameters are not applicable or derivable, use an empty dictionary {{}}. | |
| Example for "list my last 3 emails": {{"intent": "LIST_RECENT", "parameters": {{"count": 3}}}} | |
| Example for "search for emails from test@test.com": {{"intent": "SEARCH", "parameters": {{"query": "from:test@test.com"}}}} | |
| Example for "summarize email with id 123": {{"intent": "SUMMARIZE_BY_ID", "parameters": {{"email_id": "123"}}}} | |
| Example for "draft a reply saying thanks": {{"intent": "GENERATE_REPLY", "parameters": {{"reply_instructions": "saying thanks"}}}} | |
| Example for "yes send it": {{"intent": "SEND_REPLY", "parameters": {{}}}} | |
| Example for "how many unread emails do I have": {{"intent": "GET_UNREAD_COUNT", "parameters": {{}}}} | |
| Example for "how many emails today": {{"intent": "GET_TODAY_EMAIL_COUNT", "parameters": {{}}}} | |
| Example for "hello there": {{"intent": "GREETING/OTHER", "parameters": {{}}}} | |
| JSON Response: | |
| """ | |
| def authenticate_user(auth_code): | |
| """Handle user authentication with OAuth code.""" | |
| if not auth_code or not auth_code.strip(): | |
| return None, "Please enter the authorization code." | |
| try: | |
| flow = create_oauth_flow() | |
| auth_url, _ = flow.authorization_url(prompt='consent') | |
| # Exchange code for credentials | |
| credentials = exchange_code_for_credentials(auth_code.strip(), flow) | |
| if credentials: | |
| # Create session and store credentials | |
| session_id = create_session_id() | |
| if store_user_credentials(session_id, credentials): | |
| return session_id, "Authentication successful! You can now use Gmail features." | |
| else: | |
| return None, "Failed to store credentials. Please try again." | |
| else: | |
| return None, "Invalid authorization code. Please try again." | |
| except Exception as e: | |
| return None, f"Authentication error: {e}" | |
| # --- Chatbot Logic --- | |
| def handle_chat(message, history, session_state): | |
| """ | |
| Processes user message using an LLM controller, interacts with Gmail/Gemini tools. | |
| """ | |
| # Get session info | |
| session_id = session_state.get("session_id") if session_state else None | |
| conversation_context = session_state.get("conversation_context", { | |
| "last_email_summary": None, | |
| "last_email_details": {}, | |
| "last_reply_draft": None, | |
| }) if session_state else { | |
| "last_email_summary": None, | |
| "last_email_details": {}, | |
| "last_reply_draft": None, | |
| } | |
| # Check authentication | |
| if not session_id or not is_user_authenticated(session_id): | |
| return "π Please authenticate with Gmail first using the Authentication tab above." | |
| # Basic checks | |
| if not gemini_model: | |
| return "Error: Gemini model is not available. Check API key and configuration." | |
| # --- 1. Call LLM Controller --- | |
| history_string = "" | |
| if history: | |
| for h in history: | |
| history_string += f"User: {h[0]}\nAssistant: {h[1]}\n" | |
| context_json = json.dumps(conversation_context, indent=2) | |
| prompt = CONTROLLER_PROMPT_TEMPLATE.format( | |
| history_string=history_string, | |
| user_message=message, | |
| context_json=context_json | |
| ) | |
| try: | |
| print(f"--- Sending Controller Prompt ---\n{prompt}\n------------------------------") | |
| controller_response = gemini_model.generate_content(prompt) | |
| print(f"--- Controller Response ---\n{controller_response.text}\n--------------------------- ") | |
| cleaned_response_text = controller_response.text.strip().replace('```json', '').replace('```', '') | |
| decision = json.loads(cleaned_response_text) | |
| intent = decision.get("intent") | |
| parameters = decision.get("parameters", {}) | |
| except json.JSONDecodeError as e: | |
| print(f"Error decoding controller JSON: {e}\nResponse was: {controller_response.text}") | |
| return "Sorry, I had trouble understanding that request (JSON Decode Error)." | |
| except Exception as e: | |
| print(f"Error during controller LLM call: {e}") | |
| return f"Sorry, an error occurred while processing your request: {e}" | |
| # --- 2. Execute Action based on Intent --- | |
| try: | |
| if intent == "LIST_RECENT": | |
| count = parameters.get("count", 5) # Default to 5 if not specified | |
| try: | |
| count = int(count) | |
| except ValueError: | |
| count = 5 | |
| list_result = list_recent_emails(user_id='me', max_results=count, session_id=session_id) | |
| if list_result["status"] == "success" and list_result["emails"]: | |
| email_strings = [] | |
| for email in list_result["emails"]: | |
| email_str = ( | |
| f"Subject: {email.get('subject', 'N/A')}\n\n" # Double newline | |
| f"From: {email.get('from', 'N/A')}\n\n" # Double newline | |
| f"Date: {email.get('date', 'N/A')}" | |
| ) | |
| email_strings.append(email_str) | |
| response_text = f"Here are your last {len(email_strings)} emails:\n\n" + "\n\n---\n\n".join(email_strings) | |
| conversation_context["last_email_details"] = list_result["emails"][0] # Store first found | |
| conversation_context["last_reply_draft"] = None # Clear any old draft | |
| elif list_result["status"] == "success": | |
| response_text = "No emails found in your inbox." | |
| else: | |
| response_text = f"Error listing recent emails: {list_result.get('error_message', 'Unknown error')}" | |
| elif intent == "SEARCH": | |
| query = parameters.get("query") | |
| if not query: | |
| response_text = "My controller understood you want to search, but didn't find search criteria. Please specify (e.g., 'from:...' or 'subject:...')." | |
| else: | |
| search_result = search_emails(query=query, user_id='me', session_id=session_id) | |
| if search_result["status"] == "success" and search_result["emails"]: | |
| # Format emails | |
| email_strings = [] | |
| for email in search_result["emails"]: | |
| email_str = ( | |
| f"Subject: {email.get('subject', 'N/A')}\n\n" # Double newline | |
| f"From: {email.get('from', 'N/A')}\n\n" # Double newline | |
| f"Date: {email.get('date', 'N/A')}" | |
| ) | |
| email_strings.append(email_str) | |
| response_text = "Found emails:\n\n" + "\n\n---\n\n".join(email_strings) | |
| # Store the first result's ID for potential follow-up | |
| conversation_context["last_email_details"] = search_result["emails"][0] # Store first found | |
| conversation_context["last_reply_draft"] = None # Clear any old draft | |
| elif search_result["status"] == "success": | |
| response_text = "No emails found matching your query." | |
| else: | |
| response_text = f"Error searching emails: {search_result.get('error_message', 'Unknown error')}" | |
| elif intent == "SUMMARIZE_BY_ID": | |
| email_id = parameters.get("email_id") | |
| if email_id: | |
| summary_result = summarize_email_with_gemini(user_id='me', email_id=email_id, session_id=session_id) | |
| if summary_result["status"] == "success": | |
| response_text = f"Summary:\n{summary_result['summary']}" | |
| conversation_context["last_email_summary"] = summary_result['summary'] | |
| conversation_context["last_email_details"] = summary_result # Store all details | |
| conversation_context["last_reply_draft"] = None # Clear any old draft | |
| else: | |
| response_text = f"Error summarizing email {email_id}: {summary_result.get('error_message', 'Unknown error')}" | |
| else: | |
| response_text = "My controller understood you want to summarize by ID, but didn't find an ID. Please provide it." | |
| elif intent == "SUMMARIZE_LAST": | |
| email_id = conversation_context["last_email_details"].get("id") | |
| if email_id: | |
| summary_result = summarize_email_with_gemini(user_id='me', email_id=email_id, session_id=session_id) | |
| if summary_result["status"] == "success": | |
| response_text = f"Summary of the last mentioned email (ID: {email_id}):\n{summary_result['summary']}" | |
| conversation_context["last_email_summary"] = summary_result['summary'] | |
| conversation_context["last_email_details"] = summary_result # Store all details | |
| conversation_context["last_reply_draft"] = None # Clear any old draft | |
| else: | |
| response_text = f"Error summarizing email {email_id}: {summary_result.get('error_message', 'Unknown error')}" | |
| else: | |
| response_text = "I don't have a 'last email' in context to summarize. Please search for or specify an email first." | |
| elif intent == "GENERATE_REPLY": | |
| instructions = parameters.get("reply_instructions", "") | |
| details = conversation_context.get("last_email_details", {}) | |
| original_body = details.get("original_body") | |
| if original_body: | |
| # Combine original body with user instructions for the prompt | |
| generation_prompt_body = f"User wants reply to address: '{instructions}'\n\nOriginal Email Body:\n{original_body}" | |
| reply_result = generate_reply_with_gemini( | |
| original_subject=details.get("subject", "No Subject"), | |
| original_body=generation_prompt_body | |
| ) | |
| if reply_result["status"] == "success": | |
| response_text = f"Draft Reply:\n------\n{reply_result['reply_body']}\n------\n\nWould you like me to send this reply?" | |
| conversation_context["last_reply_draft"] = reply_result['reply_body'] # Store draft | |
| else: | |
| response_text = f"Error generating reply draft: {reply_result.get('error_message', 'Unknown error')}" | |
| else: | |
| response_text = "I need the context of an email (specifically its body) to generate a reply. Please summarize an email first." | |
| elif intent == "SEND_REPLY": | |
| details = conversation_context.get("last_email_details", {}) | |
| draft = conversation_context.get("last_reply_draft") | |
| if (draft and details.get("sender_email") and details.get("subject") and | |
| details.get("thread_id") and details.get("original_message_id")): | |
| send_result = send_reply( | |
| user_id='me', | |
| to=details["sender_email"], | |
| sender='me', | |
| subject=details["subject"], | |
| reply_body=draft, | |
| thread_id=details["thread_id"], | |
| original_message_id=details["original_message_id"], | |
| references=details.get("references", ""), | |
| session_id=session_id | |
| ) | |
| if send_result["status"] == "success": | |
| response_text = f"Reply sent successfully! Message ID: {send_result['message_id']}" | |
| conversation_context["last_reply_draft"] = None # Clear draft after sending | |
| # Optionally clear last_email_details too? | |
| else: | |
| response_text = f"Error sending reply: {send_result.get('error_message', 'Unknown error')}" | |
| elif not draft: | |
| response_text = "There is no reply draft stored in context to send. Please generate one first." | |
| else: | |
| response_text = "I'm missing some details from the original email context (like sender, thread ID, or message ID) needed to send the reply. Please summarize the relevant email again." | |
| elif intent == "GET_UNREAD_COUNT": | |
| unread_result = get_total_unread_count(user_id='me', session_id=session_id) | |
| if unread_result["status"] == "success": | |
| response_text = f"You have {unread_result['unread_count']} unread emails in your inbox." | |
| else: | |
| response_text = f"Error getting unread count: {unread_result.get('error_message', 'Unknown error')}" | |
| elif intent == "GET_TODAY_EMAIL_COUNT": | |
| today_count_result = get_emails_received_today_count(user_id='me', session_id=session_id) | |
| if today_count_result["status"] == "success": | |
| response_text = f"You received approximately {today_count_result['today_count']} emails in the last 24 hours." | |
| else: | |
| response_text = f"Error counting today's emails: {today_count_result.get('error_message', 'Unknown error')}" | |
| elif intent == "GREETING/OTHER": | |
| response_text = "Hello! How can I help you with your Gmail today?" | |
| else: # Handles cases where intent is missing or unrecognized by the Python code | |
| response_text = f"Sorry, I received an unexpected intent ('{intent}') from the controller. I don't know how to handle that." | |
| except Exception as e: | |
| print(f"Error executing action for intent {intent}: {e}") # Log unexpected errors | |
| response_text = f"An unexpected error occurred while executing the action: {e}" | |
| # Update session state | |
| if session_state: | |
| session_state["conversation_context"] = conversation_context | |
| return response_text | |
| if __name__ == "__main__": | |
| with gr.Blocks( | |
| theme=gr.themes.Soft( | |
| font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"], | |
| ), | |
| title="Gmail AI Agent" | |
| ) as app: | |
| # Session state | |
| session_state = gr.State({}) | |
| gr.Markdown( | |
| """ | |
| <div style="text-align: center;"> | |
| <h1>π§ Gmail AI Agent</h1> | |
| <p>Your personal assistant for managing your Gmail. Each user can authenticate with their own Gmail account.</p> | |
| </div> | |
| """ | |
| ) | |
| with gr.Tabs(): | |
| with gr.TabItem("π Authentication"): | |
| gr.Markdown( | |
| """ | |
| ### Step 1: Get Authorization Code | |
| Click the button below to get an authorization URL, then follow these steps: | |
| 1. **Click "Get Authorization URL"** below | |
| 2. **Copy the URL** that appears | |
| 3. **Open the URL** in your browser | |
| 4. **Sign in** to your Gmail account | |
| 5. **Allow permissions** for the app | |
| 6. **Copy the authorization code** from the browser | |
| 7. **Paste the code** in the text box below and click "Authenticate" | |
| """ | |
| ) | |
| get_url_btn = gr.Button("π Get Authorization URL", variant="primary") | |
| auth_url_display = gr.Textbox( | |
| label="Authorization URL", | |
| placeholder="Click 'Get Authorization URL' to generate the URL", | |
| interactive=False, | |
| lines=3 | |
| ) | |
| auth_code_input = gr.Textbox( | |
| label="Authorization Code", | |
| placeholder="Paste the authorization code here", | |
| lines=2 | |
| ) | |
| auth_btn = gr.Button("π Authenticate", variant="primary") | |
| auth_status = gr.Textbox( | |
| label="Authentication Status", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| def get_url(): | |
| try: | |
| auth_url, _ = get_auth_url() | |
| return auth_url | |
| except Exception as e: | |
| return f"Error generating URL: {e}" | |
| def handle_auth(auth_code, session_state): | |
| session_id, message = authenticate_user(auth_code) | |
| if session_id: | |
| session_state["session_id"] = session_id | |
| session_state["conversation_context"] = { | |
| "last_email_summary": None, | |
| "last_email_details": {}, | |
| "last_reply_draft": None, | |
| } | |
| return message, session_state, "" # Clear auth code on success | |
| else: | |
| return message, session_state, auth_code # Keep auth code on failure | |
| get_url_btn.click( | |
| fn=get_url, | |
| outputs=auth_url_display | |
| ) | |
| auth_btn.click( | |
| fn=handle_auth, | |
| inputs=[auth_code_input, session_state], | |
| outputs=[auth_status, session_state, auth_code_input] | |
| ) | |
| with gr.TabItem("π¬ Chat"): | |
| gr.Markdown( | |
| """ | |
| **Here's what I can do once you're authenticated:** | |
| - **List emails**: e.g., 'Show my last 5 emails' | |
| - **Search emails**: e.g., 'Find emails from boss@company.com about the project report' | |
| - **Summarize emails**: e.g., 'Summarize the last email we discussed?' | |
| - **Draft & Send replies**: e.g., 'Draft a reply saying I will look into it', then 'Ok send it' | |
| - **Get counts**: e.g., 'How many unread emails do I have?' | |
| """ | |
| ) | |
| chatbot = gr.Chatbot( | |
| [], | |
| elem_id="chatbot", | |
| placeholder="Authenticate first, then start chatting about your Gmail!", | |
| height=600, | |
| type="tuples", | |
| ) | |
| def chat_fn(message, history, session_state): | |
| response = handle_chat(message, history, session_state) | |
| return response | |
| gr.ChatInterface( | |
| fn=chat_fn, | |
| chatbot=chatbot, | |
| additional_inputs=[session_state], | |
| examples=[ | |
| ["How many unread emails do I have?"], | |
| ["Show my last 5 emails"], | |
| ["Find emails from my manager"], | |
| ["How many emails did I get today?"], | |
| ], | |
| title=None, | |
| description=None, | |
| type="tuples", | |
| ) | |
| app.launch() |