emial_project / app.py
rayuga2503's picture
Upload 4 files
cc05d8d verified
import gradio as gr
import os
from multi_tool_agent.gmail_agent_logic import (
get_gmail_service,
search_emails,
summarize_email_with_gemini,
generate_reply_with_gemini,
send_reply,
get_total_unread_count,
get_emails_received_today_count,
list_recent_emails,
get_auth_url,
exchange_code_for_credentials,
store_user_credentials,
create_session_id,
is_user_authenticated,
create_oauth_flow
)
import google.generativeai as genai
from dotenv import load_dotenv
import json
load_dotenv()
# --- Deployment: Write credentials from environment variables to files ---
# Each (environment variable, target file) pair below is materialised on disk
# when the variable is set; the variable's value is written verbatim.
# Underscore-prefixed loop names keep these temporaries out of `import *`.
for _env_name, _target_path in (
    ('GMAIL_CREDENTIALS_JSON', 'credentials.json'),
    ('GMAIL_TOKEN_JSON', 'token.json'),
):
    _payload = os.environ.get(_env_name)
    if _payload is None:
        # Variable not provided: leave any existing file untouched.
        continue
    with open(_target_path, 'w') as _handle:
        _handle.write(_payload)
    print(f"Created {_target_path} from environment variable.")
# --- End Deployment ---
# Configure the Gemini client once, at import time, and expose the model as the
# module-level `gemini_model` used by the chat handler. Any failure here
# (missing key, SDK error) aborts start-up with a single "FATAL" exception.
try:
    gemini_api_key = os.environ.get("GOOGLE_API_KEY")
    if not gemini_api_key:
        raise ValueError("GOOGLE_API_KEY not found in environment variables. Please set it in the .env file and restart.")
    genai.configure(api_key=gemini_api_key)
    gemini_model = genai.GenerativeModel('gemini-2.5-flash-lite')
    print("Gemini model initialized successfully for Gradio app.")
except Exception as e:
    # Chain explicitly so the traceback reports the original failure as the
    # direct cause instead of "another exception occurred during handling".
    raise Exception(f"FATAL: Error initializing Gemini model: {e}") from e
# --- LLM Prompt for Intent Recognition ---
# Template for the "controller" prompt that asks the LLM to classify a user
# message into one intent plus its parameters, answered as a JSON object.
#
# The template is filled in with str.format() using three named fields:
#   {history_string} - prior conversation turns rendered as text
#   {user_message}   - the message currently being handled
#   {context_json}   - the conversation context serialised as JSON
# Because str.format() is used, every literal brace in the JSON examples is
# doubled ({{ and }}) so that it survives formatting as a single brace.
#
# The intent names listed here must stay in sync with the intents the chat
# handler dispatches on; an intent the handler does not know is reported to
# the user as unexpected.
CONTROLLER_PROMPT_TEMPLATE = """
You are the controller for a Gmail assistant. Analyze the user's message and determine the primary intent and necessary parameters based on the conversation history.
Available intents and their required parameters:
- LIST_RECENT: requires optional 'count' (integer, default 5) to list the most recent emails in the inbox.
- SEARCH: requires 'query' (e.g., "from:a@b.com subject:hello")
- SUMMARIZE_BY_ID: requires 'email_id'
- SUMMARIZE_LAST: requires context indicating a specific email (e.g., from a previous search or mention). Check context['last_email_details']['id'].
- GENERATE_REPLY: requires 'reply_instructions' (what the user wants to say) and context from a previously summarized email (context['last_email_details'] required).
- SEND_REPLY: requires confirmation (e.g., "yes", "send it") and context from a previously generated reply draft (context['last_reply_draft'] and context['last_email_details'] required).
- GET_UNREAD_COUNT: No parameters required.
- GET_TODAY_EMAIL_COUNT: No parameters required.
- GREETING/OTHER: if the intent is unclear, a simple greeting, or doesn't match the capabilities.
Conversation History:
{history_string}
Current User message: "{user_message}"
Current Context (JSON):
{context_json}
Based ONLY on the **Current User message** and the **Current Context**, determine the single most likely intent and extract the parameters.
Output your decision STRICTLY as a JSON object with 'intent' (string) and 'parameters' (dictionary) keys. If parameters are not applicable or derivable, use an empty dictionary {{}}.
Example for "list my last 3 emails": {{"intent": "LIST_RECENT", "parameters": {{"count": 3}}}}
Example for "search for emails from test@test.com": {{"intent": "SEARCH", "parameters": {{"query": "from:test@test.com"}}}}
Example for "summarize email with id 123": {{"intent": "SUMMARIZE_BY_ID", "parameters": {{"email_id": "123"}}}}
Example for "draft a reply saying thanks": {{"intent": "GENERATE_REPLY", "parameters": {{"reply_instructions": "saying thanks"}}}}
Example for "yes send it": {{"intent": "SEND_REPLY", "parameters": {{}}}}
Example for "how many unread emails do I have": {{"intent": "GET_UNREAD_COUNT", "parameters": {{}}}}
Example for "how many emails today": {{"intent": "GET_TODAY_EMAIL_COUNT", "parameters": {{}}}}
Example for "hello there": {{"intent": "GREETING/OTHER", "parameters": {{}}}}
JSON Response:
"""
def authenticate_user(auth_code):
    """Turn a pasted OAuth authorization code into a stored user session.

    Args:
        auth_code: The authorization code the user copied from the browser.
            ``None``, empty, or whitespace-only values are rejected up front.

    Returns:
        A ``(session_id, message)`` tuple. ``session_id`` is the newly created
        session identifier on success and ``None`` on any failure; ``message``
        is a human-readable status string in both cases.

    Any exception raised while talking to the OAuth helpers is caught and
    reported through ``message`` rather than propagated.
    """
    trimmed_code = auth_code.strip() if auth_code else ""
    if not trimmed_code:
        return None, "Please enter the authorization code."

    try:
        flow = create_oauth_flow()
        # NOTE(review): the returned URL/state are not used here; the call is
        # presumably kept for its effect on `flow` -- confirm before removing.
        _unused_url, _unused_state = flow.authorization_url(prompt='consent')

        # Exchange code for credentials
        credentials = exchange_code_for_credentials(trimmed_code, flow)
        if not credentials:
            return None, "Invalid authorization code. Please try again."

        # Create session and store credentials
        session_id = create_session_id()
        if not store_user_credentials(session_id, credentials):
            return None, "Failed to store credentials. Please try again."

        return session_id, "Authentication successful! You can now use Gmail features."
    except Exception as e:
        return None, f"Authentication error: {e}"
# --- Chatbot Logic ---
# Number of emails listed when the controller gives no usable 'count'.
_DEFAULT_RECENT_COUNT = 5

# Fields of the remembered email that must be present before a reply is sent.
_REPLY_REQUIRED_FIELDS = ("sender_email", "subject", "thread_id", "original_message_id")


def _new_conversation_context():
    """Return a fresh, empty conversation context dictionary."""
    return {
        "last_email_summary": None,
        "last_email_details": {},
        "last_reply_draft": None,
    }


def _coerce_count(raw_count, default=_DEFAULT_RECENT_COUNT):
    """Convert the controller's 'count' to a positive int, else *default*."""
    try:
        count = int(raw_count)
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric strings, NaN and Infinity all land here.
        return default
    return count if count > 0 else default


def _format_email_list(emails):
    """Render email dicts as Subject/From/Date stanzas separated by rules."""
    return "\n\n---\n\n".join(
        f"Subject: {email.get('subject', 'N/A')}\n\n"  # Double newline
        f"From: {email.get('from', 'N/A')}\n\n"  # Double newline
        f"Date: {email.get('date', 'N/A')}"
        for email in emails
    )


def _remember_email(context, email_details):
    """Record *email_details* as the last email and discard any stale draft."""
    context["last_email_details"] = email_details
    context["last_reply_draft"] = None


def _summarize_and_remember(email_id, heading, context, session_id):
    """Summarize one email, store the result in *context*, return the reply text."""
    summary_result = summarize_email_with_gemini(user_id='me', email_id=email_id, session_id=session_id)
    if summary_result["status"] != "success":
        return f"Error summarizing email {email_id}: {summary_result.get('error_message', 'Unknown error')}"
    context["last_email_summary"] = summary_result['summary']
    # The whole result is kept: later intents read sender, thread and body from it.
    _remember_email(context, summary_result)
    return f"{heading}\n{summary_result['summary']}"


def _handle_list_recent(parameters, context, session_id):
    """LIST_RECENT: list the most recent inbox emails."""
    count = _coerce_count(parameters.get("count", _DEFAULT_RECENT_COUNT))
    list_result = list_recent_emails(user_id='me', max_results=count, session_id=session_id)
    if list_result["status"] != "success":
        return f"Error listing recent emails: {list_result.get('error_message', 'Unknown error')}"
    emails = list_result["emails"]
    if not emails:
        return "No emails found in your inbox."
    response_text = f"Here are your last {len(emails)} emails:\n\n" + _format_email_list(emails)
    _remember_email(context, emails[0])  # Store first found for follow-ups
    return response_text


def _handle_search(parameters, context, session_id):
    """SEARCH: run a Gmail query and list the matches."""
    query = parameters.get("query")
    if not query:
        return "My controller understood you want to search, but didn't find search criteria. Please specify (e.g., 'from:...' or 'subject:...')."
    search_result = search_emails(query=query, user_id='me', session_id=session_id)
    if search_result["status"] != "success":
        return f"Error searching emails: {search_result.get('error_message', 'Unknown error')}"
    emails = search_result["emails"]
    if not emails:
        return "No emails found matching your query."
    response_text = "Found emails:\n\n" + _format_email_list(emails)
    _remember_email(context, emails[0])  # Store first found for follow-ups
    return response_text


def _handle_summarize_by_id(parameters, context, session_id):
    """SUMMARIZE_BY_ID: summarize the email whose id the controller extracted."""
    email_id = parameters.get("email_id")
    if not email_id:
        return "My controller understood you want to summarize by ID, but didn't find an ID. Please provide it."
    return _summarize_and_remember(email_id, "Summary:", context, session_id)


def _handle_summarize_last(parameters, context, session_id):
    """SUMMARIZE_LAST: summarize the email currently remembered in context."""
    email_id = (context.get("last_email_details") or {}).get("id")
    if not email_id:
        return "I don't have a 'last email' in context to summarize. Please search for or specify an email first."
    heading = f"Summary of the last mentioned email (ID: {email_id}):"
    return _summarize_and_remember(email_id, heading, context, session_id)


def _handle_generate_reply(parameters, context, session_id):
    """GENERATE_REPLY: draft a reply to the remembered email and store the draft."""
    instructions = parameters.get("reply_instructions", "")
    details = context.get("last_email_details") or {}
    original_body = details.get("original_body")
    if not original_body:
        return "I need the context of an email (specifically its body) to generate a reply. Please summarize an email first."
    # Combine original body with user instructions for the prompt
    generation_prompt_body = f"User wants reply to address: '{instructions}'\n\nOriginal Email Body:\n{original_body}"
    reply_result = generate_reply_with_gemini(
        original_subject=details.get("subject", "No Subject"),
        original_body=generation_prompt_body,
    )
    if reply_result["status"] != "success":
        return f"Error generating reply draft: {reply_result.get('error_message', 'Unknown error')}"
    context["last_reply_draft"] = reply_result['reply_body']  # Store draft
    return f"Draft Reply:\n------\n{reply_result['reply_body']}\n------\n\nWould you like me to send this reply?"


def _handle_send_reply(parameters, context, session_id):
    """SEND_REPLY: send the stored draft as a reply to the remembered email."""
    draft = context.get("last_reply_draft")
    if not draft:
        return "There is no reply draft stored in context to send. Please generate one first."
    details = context.get("last_email_details") or {}
    if not all(details.get(field) for field in _REPLY_REQUIRED_FIELDS):
        return "I'm missing some details from the original email context (like sender, thread ID, or message ID) needed to send the reply. Please summarize the relevant email again."
    send_result = send_reply(
        user_id='me',
        to=details["sender_email"],
        sender='me',
        subject=details["subject"],
        reply_body=draft,
        thread_id=details["thread_id"],
        original_message_id=details["original_message_id"],
        references=details.get("references", ""),
        session_id=session_id,
    )
    if send_result["status"] != "success":
        return f"Error sending reply: {send_result.get('error_message', 'Unknown error')}"
    context["last_reply_draft"] = None  # Clear draft after sending
    return f"Reply sent successfully! Message ID: {send_result['message_id']}"


def _handle_unread_count(parameters, context, session_id):
    """GET_UNREAD_COUNT: report the unread total."""
    unread_result = get_total_unread_count(user_id='me', session_id=session_id)
    if unread_result["status"] != "success":
        return f"Error getting unread count: {unread_result.get('error_message', 'Unknown error')}"
    return f"You have {unread_result['unread_count']} unread emails in your inbox."


def _handle_today_count(parameters, context, session_id):
    """GET_TODAY_EMAIL_COUNT: report how many emails arrived recently."""
    today_count_result = get_emails_received_today_count(user_id='me', session_id=session_id)
    if today_count_result["status"] != "success":
        return f"Error counting today's emails: {today_count_result.get('error_message', 'Unknown error')}"
    return f"You received approximately {today_count_result['today_count']} emails in the last 24 hours."


def _handle_greeting(parameters, context, session_id):
    """GREETING/OTHER: canned greeting for unclear or unsupported requests."""
    return "Hello! How can I help you with your Gmail today?"


# Maps each intent name used in the controller prompt to its handler. Every
# handler takes (parameters, context, session_id) and returns the reply text.
_INTENT_HANDLERS = {
    "LIST_RECENT": _handle_list_recent,
    "SEARCH": _handle_search,
    "SUMMARIZE_BY_ID": _handle_summarize_by_id,
    "SUMMARIZE_LAST": _handle_summarize_last,
    "GENERATE_REPLY": _handle_generate_reply,
    "SEND_REPLY": _handle_send_reply,
    "GET_UNREAD_COUNT": _handle_unread_count,
    "GET_TODAY_EMAIL_COUNT": _handle_today_count,
    "GREETING/OTHER": _handle_greeting,
}


def handle_chat(message, history, session_state):
    """
    Processes user message using an LLM controller, interacts with Gmail/Gemini tools.

    The message, the prior turns and the stored conversation context are sent
    to the controller LLM, which answers with a JSON object naming one intent
    and its parameters. The matching handler is then run and its text returned.

    Args:
        message: The user's current chat message.
        history: Prior turns as a sequence of (user, assistant) pairs; may be
            empty or None.
        session_state: Per-session dict holding "session_id" and, optionally,
            "conversation_context". May be None or empty before authentication.

    Returns:
        The assistant's reply as a string. Failures (unauthenticated session,
        unparsable controller output, tool errors) are reported in the returned
        text rather than raised.

    Side effects:
        Mutates the conversation context and writes it back to
        ``session_state["conversation_context"]``; prints the controller
        prompt and response for debugging.
    """
    # Check authentication
    session_id = session_state.get("session_id") if session_state else None
    if not session_id or not is_user_authenticated(session_id):
        return "🔐 Please authenticate with Gmail first using the Authentication tab above."
    # Basic checks
    if not gemini_model:
        return "Error: Gemini model is not available. Check API key and configuration."

    conversation_context = session_state.get("conversation_context")
    if conversation_context is None:
        conversation_context = _new_conversation_context()

    # --- 1. Call LLM Controller ---
    history_string = "".join(
        f"User: {turn[0]}\nAssistant: {turn[1]}\n" for turn in history or []
    )
    # default=str: the context stores raw tool results whose value types are
    # not controlled here; for prompt text a string rendering is good enough
    # and must not abort the request.
    context_json = json.dumps(conversation_context, indent=2, default=str)
    prompt = CONTROLLER_PROMPT_TEMPLATE.format(
        history_string=history_string,
        user_message=message,
        context_json=context_json
    )
    raw_text = None
    try:
        print(f"--- Sending Controller Prompt ---\n{prompt}\n------------------------------")
        controller_response = gemini_model.generate_content(prompt)
        raw_text = controller_response.text
        print(f"--- Controller Response ---\n{raw_text}\n--------------------------- ")
        # The model may wrap its JSON in a Markdown code fence; strip it.
        cleaned_response_text = raw_text.strip().replace('```json', '').replace('```', '')
        decision = json.loads(cleaned_response_text)
    except json.JSONDecodeError as e:
        print(f"Error decoding controller JSON: {e}\nResponse was: {raw_text}")
        return "Sorry, I had trouble understanding that request (JSON Decode Error)."
    except Exception as e:
        print(f"Error during controller LLM call: {e}")
        return f"Sorry, an error occurred while processing your request: {e}"

    if not isinstance(decision, dict):
        # Valid JSON but not an object (e.g. a list or bare string).
        print(f"Controller returned non-object JSON: {decision!r}")
        return "Sorry, I had trouble understanding that request (JSON Decode Error)."
    intent = decision.get("intent")
    parameters = decision.get("parameters")
    if not isinstance(parameters, dict):
        # Covers a missing key as well as an explicit null or wrong type.
        parameters = {}

    # --- 2. Execute Action based on Intent ---
    # Non-string intents can be unhashable, so they never reach the dict lookup.
    handler = _INTENT_HANDLERS.get(intent) if isinstance(intent, str) else None
    try:
        if handler is None:  # Intent is missing or unrecognized by the Python code
            response_text = f"Sorry, I received an unexpected intent ('{intent}') from the controller. I don't know how to handle that."
        else:
            response_text = handler(parameters, conversation_context, session_id)
    except Exception as e:
        print(f"Error executing action for intent {intent}: {e}")  # Log unexpected errors
        response_text = f"An unexpected error occurred while executing the action: {e}"

    # Update session state (session_state is a non-empty dict past the auth check)
    session_state["conversation_context"] = conversation_context
    return response_text
if __name__ == "__main__":
    # Build the two-tab Gradio UI (Authentication + Chat) and launch it.
    # Markdown bodies are spelled out line by line so that code indentation
    # never leaks into the rendered text.
    with gr.Blocks(
        theme=gr.themes.Soft(
            font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"],
        ),
        title="Gmail AI Agent"
    ) as app:
        # Session state: gains "session_id" and "conversation_context" once
        # authentication succeeds; the chat handler reads both.
        session_state = gr.State({})
        gr.Markdown(
            '\n'
            '<div style="text-align: center;">\n'
            '<h1>📧 Gmail AI Agent</h1>\n'
            '<p>Your personal assistant for managing your Gmail. Each user can authenticate with their own Gmail account.</p>\n'
            '</div>\n'
        )
        with gr.Tabs():
            with gr.TabItem("🔐 Authentication"):
                gr.Markdown(
                    '\n'
                    '### Step 1: Get Authorization Code\n'
                    'Click the button below to get an authorization URL, then follow these steps:\n'
                    '1. **Click "Get Authorization URL"** below\n'
                    '2. **Copy the URL** that appears\n'
                    '3. **Open the URL** in your browser\n'
                    '4. **Sign in** to your Gmail account\n'
                    '5. **Allow permissions** for the app\n'
                    '6. **Copy the authorization code** from the browser\n'
                    '7. **Paste the code** in the text box below and click "Authenticate"\n'
                )
                get_url_btn = gr.Button("🔗 Get Authorization URL", variant="primary")
                auth_url_display = gr.Textbox(
                    label="Authorization URL",
                    placeholder="Click 'Get Authorization URL' to generate the URL",
                    interactive=False,
                    lines=3
                )
                auth_code_input = gr.Textbox(
                    label="Authorization Code",
                    placeholder="Paste the authorization code here",
                    lines=2
                )
                auth_btn = gr.Button("🔑 Authenticate", variant="primary")
                auth_status = gr.Textbox(
                    label="Authentication Status",
                    interactive=False,
                    lines=2
                )

                def get_url():
                    """Return the OAuth authorization URL, or an error string."""
                    try:
                        auth_url, _ = get_auth_url()
                        return auth_url
                    except Exception as e:
                        return f"Error generating URL: {e}"

                def handle_auth(auth_code, session_state):
                    """Authenticate with the pasted code and update the session.

                    Returns (status message, session state, auth-code box text).
                    On success the session gets a session id and a fresh
                    conversation context.
                    """
                    session_id, message = authenticate_user(auth_code)
                    if session_id:
                        session_state["session_id"] = session_id
                        session_state["conversation_context"] = {
                            "last_email_summary": None,
                            "last_email_details": {},
                            "last_reply_draft": None,
                        }
                        return message, session_state, ""  # Clear auth code on success
                    return message, session_state, auth_code  # Keep auth code on failure

                get_url_btn.click(
                    fn=get_url,
                    outputs=auth_url_display
                )
                auth_btn.click(
                    fn=handle_auth,
                    inputs=[auth_code_input, session_state],
                    outputs=[auth_status, session_state, auth_code_input]
                )
            with gr.TabItem("💬 Chat"):
                gr.Markdown(
                    "\n"
                    "**Here's what I can do once you're authenticated:**\n"
                    "- **List emails**: e.g., 'Show my last 5 emails'\n"
                    "- **Search emails**: e.g., 'Find emails from boss@company.com about the project report'\n"
                    "- **Summarize emails**: e.g., 'Summarize the last email we discussed?'\n"
                    "- **Draft & Send replies**: e.g., 'Draft a reply saying I will look into it', then 'Ok send it'\n"
                    "- **Get counts**: e.g., 'How many unread emails do I have?'\n"
                )
                chatbot = gr.Chatbot(
                    [],
                    elem_id="chatbot",
                    placeholder="Authenticate first, then start chatting about your Gmail!",
                    height=600,
                    type="tuples",
                )

                def chat_fn(message, history, session_state):
                    """Forward a chat turn, plus the session state, to handle_chat."""
                    response = handle_chat(message, history, session_state)
                    return response

                gr.ChatInterface(
                    fn=chat_fn,
                    chatbot=chatbot,
                    # The session state is passed as an extra input to chat_fn.
                    additional_inputs=[session_state],
                    examples=[
                        ["How many unread emails do I have?"],
                        ["Show my last 5 emails"],
                        ["Find emails from my manager"],
                        ["How many emails did I get today?"],
                    ],
                    title=None,
                    description=None,
                    type="tuples",
                )
    app.launch()