Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Gmail MCP Server with OAuth Authentication and Multi-Account Support | |
| """ | |
| import gradio as gr | |
| import json | |
| import base64 | |
| from email.mime.text import MIMEText | |
| from googleapiclient.errors import HttpError | |
| import os | |
| from typing import Dict, List | |
| from datetime import datetime, timedelta | |
| from dotenv import load_dotenv | |
| # Import OAuth-enabled modules | |
| # from tools import extract_query_info, analyze_emails | |
| from gmail_api_scraper import GmailAPIScraper | |
| from oauth_manager import oauth_manager | |
| from logger import logger | |
| load_dotenv() | |
| # Initialize Gmail API scraper | |
| gmail_scraper = GmailAPIScraper() | |
| def check_authentication() -> tuple[bool, str]: | |
| """Check if user is authenticated and return status""" | |
| current_account = oauth_manager.get_current_account() | |
| if current_account and oauth_manager.is_authenticated(): | |
| return True, current_account | |
| else: | |
| return False, "Not authenticated" | |
| def simple_analyze_emails(emails) -> dict: | |
| """ | |
| Simple email analysis without OpenAI - just basic statistics and patterns | |
| """ | |
| if not emails: | |
| return {"summary": "No emails to analyze.", "insights": []} | |
| # Basic statistics | |
| total_count = len(emails) | |
| # Group by sender | |
| senders = {} | |
| subjects = [] | |
| dates = [] | |
| for email in emails: | |
| sender = email.get("from", "Unknown") | |
| # Extract just the email domain for grouping | |
| if "<" in sender and ">" in sender: | |
| email_part = sender.split("<")[1].split(">")[0] | |
| else: | |
| email_part = sender | |
| domain = email_part.split("@")[-1] if "@" in email_part else sender | |
| senders[domain] = senders.get(domain, 0) + 1 | |
| subjects.append(email.get("subject", "")) | |
| dates.append(email.get("date", "")) | |
| # Create insights | |
| insights = [] | |
| insights.append(f"Found {total_count} emails total") | |
| if senders: | |
| top_sender = max(senders.items(), key=lambda x: x[1]) | |
| insights.append(f"Most emails from: {top_sender[0]} ({top_sender[1]} emails)") | |
| if len(senders) > 1: | |
| insights.append(f"Emails from {len(senders)} different domains") | |
| # Date range | |
| if dates: | |
| unique_dates = list(set(dates)) | |
| if len(unique_dates) > 1: | |
| insights.append(f"Spanning {len(unique_dates)} different days") | |
| # Subject analysis | |
| if subjects: | |
| # Count common words in subjects (simple approach) | |
| all_words = [] | |
| for subject in subjects: | |
| words = subject.lower().split() | |
| all_words.extend([w for w in words if len(w) > 3]) # Only words longer than 3 chars | |
| if all_words: | |
| word_counts = {} | |
| for word in all_words: | |
| word_counts[word] = word_counts.get(word, 0) + 1 | |
| if word_counts: | |
| common_word = max(word_counts.items(), key=lambda x: x[1]) | |
| if common_word[1] > 1: | |
| insights.append(f"Common subject word: '{common_word[0]}' appears {common_word[1]} times") | |
| summary = f"Analysis of {total_count} emails from {len(senders)} sender(s)" | |
| return { | |
| "summary": summary, | |
| "insights": insights | |
| } | |
| def authenticate_user() -> str: | |
| """ | |
| Start OAuth authentication flow for Gmail access. | |
| Opens a browser window for user to authenticate with Google. | |
| Returns: | |
| str: JSON string containing authentication result | |
| """ | |
| try: | |
| logger.info("Starting OAuth authentication flow...") | |
| # Check if OAuth is configured | |
| if not oauth_manager.client_secrets_file.exists(): | |
| return json.dumps({ | |
| "error": "OAuth not configured", | |
| "message": "Please run 'python setup_oauth.py' first to configure OAuth credentials.", | |
| "success": False | |
| }, indent=2) | |
| # Start authentication | |
| success = oauth_manager.authenticate_interactive() | |
| if success: | |
| user_email = oauth_manager.get_current_account() | |
| result = { | |
| "success": True, | |
| "message": "Authentication successful! You can now use the email tools.", | |
| "user_email": user_email, | |
| "instructions": [ | |
| "Authentication completed successfully", | |
| "You can now search emails, get email details, and analyze patterns", | |
| f"Currently authenticated as: {user_email}" | |
| ] | |
| } | |
| else: | |
| result = { | |
| "success": False, | |
| "error": "Authentication failed", | |
| "message": "Please try again or check your internet connection.", | |
| "instructions": [ | |
| "Make sure you have internet connection", | |
| "Ensure you complete the authentication in the browser", | |
| "Try running 'python setup_oauth.py' if problems persist" | |
| ] | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error in authenticate_user: %s", e) | |
| error_result = { | |
| "success": False, | |
| "error": str(e), | |
| "message": "Authentication failed due to an error." | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def switch_account(target_email: str) -> str: | |
| """ | |
| Switch to a different authenticated Gmail account. | |
| Args: | |
| target_email (str): Email address to switch to | |
| Returns: | |
| str: JSON string containing switch result | |
| """ | |
| try: | |
| logger.info("Switching to account: %s", target_email) | |
| # Check if target account is authenticated | |
| if not oauth_manager.is_authenticated(target_email): | |
| return json.dumps({ | |
| "error": "Account not authenticated", | |
| "message": f"Account '{target_email}' is not authenticated. Please authenticate first.", | |
| "target_email": target_email, | |
| "authenticated_accounts": list(oauth_manager.list_accounts().keys()) | |
| }, indent=2) | |
| # Switch account | |
| success = oauth_manager.switch_account(target_email) | |
| if success: | |
| result = { | |
| "success": True, | |
| "message": f"Successfully switched to account: {target_email}", | |
| "current_account": oauth_manager.get_current_account(), | |
| "previous_account": None # Could track this if needed | |
| } | |
| else: | |
| result = { | |
| "success": False, | |
| "error": "Failed to switch account", | |
| "message": f"Could not switch to account: {target_email}", | |
| "current_account": oauth_manager.get_current_account() | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error switching account: %s", e) | |
| error_result = { | |
| "success": False, | |
| "error": str(e), | |
| "message": f"Failed to switch to account: {target_email}" | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def list_accounts() -> str: | |
| """ | |
| List all authenticated Gmail accounts and their status. | |
| Returns: | |
| str: JSON string containing all accounts and their authentication status | |
| """ | |
| try: | |
| logger.info("Listing all accounts") | |
| accounts = oauth_manager.list_accounts() | |
| current_account = oauth_manager.get_current_account() | |
| result = { | |
| "accounts": accounts, | |
| "current_account": current_account, | |
| "total_accounts": len(accounts), | |
| "authenticated_accounts": [email for email, is_auth in accounts.items() if is_auth], | |
| "message": f"Found {len(accounts)} stored accounts, currently using: {current_account or 'None'}" | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error listing accounts: %s", e) | |
| error_result = { | |
| "error": str(e), | |
| "message": "Failed to list accounts" | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def remove_account(email_to_remove: str) -> str: | |
| """ | |
| Remove an authenticated Gmail account and its stored credentials. | |
| Args: | |
| email_to_remove (str): Email address to remove | |
| Returns: | |
| str: JSON string containing removal result | |
| """ | |
| try: | |
| logger.info("Removing account: %s", email_to_remove) | |
| # Check if account exists | |
| accounts = oauth_manager.list_accounts() | |
| if email_to_remove not in accounts: | |
| return json.dumps({ | |
| "error": "Account not found", | |
| "message": f"Account '{email_to_remove}' not found in stored accounts.", | |
| "available_accounts": list(accounts.keys()) | |
| }, indent=2) | |
| # Remove account | |
| oauth_manager.remove_account(email_to_remove) | |
| result = { | |
| "success": True, | |
| "message": f"Successfully removed account: {email_to_remove}", | |
| "removed_account": email_to_remove, | |
| "current_account": oauth_manager.get_current_account(), | |
| "remaining_accounts": list(oauth_manager.list_accounts().keys()) | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error removing account: %s", e) | |
| error_result = { | |
| "success": False, | |
| "error": str(e), | |
| "message": f"Failed to remove account: {email_to_remove}" | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def search_emails(sender_keyword: str, start_date: str = "", end_date: str = "") -> str: | |
| """ | |
| Search for emails from a specific sender within a date range using OAuth authentication. | |
| Args: | |
| sender_keyword (str): The sender/company keyword to search for (e.g., "apple", "amazon") | |
| start_date (str): Start date in DD-MMM-YYYY format (e.g., "01-Jan-2025"). If empty, defaults to 7 days ago. | |
| end_date (str): End date in DD-MMM-YYYY format (e.g., "07-Jan-2025"). If empty, defaults to today. | |
| Returns: | |
| str: JSON string containing email search results and analysis | |
| """ | |
| try: | |
| logger.info("OAuth Email search tool called with sender: %s, dates: %s to %s", sender_keyword, start_date, end_date) | |
| # Check authentication | |
| is_auth, auth_info = check_authentication() | |
| if not is_auth: | |
| return json.dumps({ | |
| "error": "Not authenticated", | |
| "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
| "auth_status": auth_info | |
| }, indent=2) | |
| # Set default date range if not provided | |
| if not start_date or not end_date: | |
| today = datetime.today() | |
| if not end_date: | |
| end_date = today.strftime("%d-%b-%Y") | |
| if not start_date: | |
| start_date = (today - timedelta(days=7)).strftime("%d-%b-%Y") | |
| logger.info(f"Searching for emails with keyword '{sender_keyword}' between {start_date} and {end_date}") | |
| # Use Gmail API scraper with OAuth | |
| full_emails = gmail_scraper.search_emails(sender_keyword, start_date, end_date) | |
| if not full_emails: | |
| result = { | |
| "sender_keyword": sender_keyword, | |
| "date_range": f"{start_date} to {end_date}", | |
| "email_summary": [], | |
| "analysis": {"summary": f"No emails found for '{sender_keyword}' in the specified date range.", "insights": []}, | |
| "email_count": 0, | |
| "user_email": auth_info | |
| } | |
| return json.dumps(result, indent=2) | |
| # Create summary version without full content | |
| email_summary = [] | |
| for email in full_emails: | |
| summary_email = { | |
| "date": email.get("date"), | |
| "time": email.get("time"), | |
| "subject": email.get("subject"), | |
| "from": email.get("from", "Unknown Sender"), | |
| "message_id": email.get("message_id"), | |
| "gmail_id": email.get("gmail_id") | |
| } | |
| email_summary.append(summary_email) | |
| # Auto-analyze the emails for insights (no OpenAI) | |
| analysis = simple_analyze_emails(full_emails) | |
| # Return summary info with analysis | |
| result = { | |
| "sender_keyword": sender_keyword, | |
| "date_range": f"{start_date} to {end_date}", | |
| "email_summary": email_summary, | |
| "analysis": analysis, | |
| "email_count": len(full_emails), | |
| "user_email": auth_info | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error in search_emails: %s", e) | |
| error_result = { | |
| "error": str(e), | |
| "sender_keyword": sender_keyword, | |
| "message": "Failed to search emails." | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def get_email_details(message_id: str) -> str: | |
| """ | |
| Get full details of a specific email by its message ID using OAuth authentication. | |
| Args: | |
| message_id (str): The message ID of the email to retrieve | |
| Returns: | |
| str: JSON string containing the full email details | |
| """ | |
| try: | |
| logger.info("Getting email details for message_id: %s", message_id) | |
| # Check authentication | |
| is_auth, auth_info = check_authentication() | |
| if not is_auth: | |
| return json.dumps({ | |
| "error": "Not authenticated", | |
| "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
| "auth_status": auth_info | |
| }, indent=2) | |
| # Get email using Gmail API | |
| email = gmail_scraper.get_email_by_id(message_id) | |
| if email: | |
| email["user_email"] = auth_info | |
| return json.dumps(email, indent=2) | |
| else: | |
| error_result = { | |
| "error": f"No email found with message_id '{message_id}'", | |
| "message": "Email may not exist or you may not have access to it.", | |
| "user_email": auth_info | |
| } | |
| return json.dumps(error_result, indent=2) | |
| except Exception as e: | |
| logger.error("Error in get_email_details: %s", e) | |
| error_result = { | |
| "error": str(e), | |
| "message_id": message_id, | |
| "message": "Failed to retrieve email details." | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def analyze_email_patterns(sender_keyword: str, days_back: str = "30") -> str: | |
| """ | |
| Analyze email patterns from a specific sender over a given time period using OAuth authentication. | |
| Args: | |
| sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google") | |
| days_back (str): Number of days to look back (default: "30") | |
| Returns: | |
| str: JSON string containing email pattern analysis | |
| """ | |
| try: | |
| logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back) | |
| # Check authentication | |
| is_auth, auth_info = check_authentication() | |
| if not is_auth: | |
| return json.dumps({ | |
| "error": "Not authenticated", | |
| "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
| "auth_status": auth_info | |
| }, indent=2) | |
| # Calculate date range | |
| days_int = int(days_back) | |
| end_date = datetime.today() | |
| start_date = end_date - timedelta(days=days_int) | |
| start_date_str = start_date.strftime("%d-%b-%Y") | |
| end_date_str = end_date.strftime("%d-%b-%Y") | |
| # Search for emails using Gmail API | |
| full_emails = gmail_scraper.search_emails(sender_keyword, start_date_str, end_date_str) | |
| if not full_emails: | |
| result = { | |
| "sender_keyword": sender_keyword, | |
| "date_range": f"{start_date_str} to {end_date_str}", | |
| "analysis": {"summary": f"No emails found from '{sender_keyword}' in the last {days_back} days.", "insights": []}, | |
| "email_count": 0, | |
| "user_email": auth_info | |
| } | |
| return json.dumps(result, indent=2) | |
| # Analyze the emails (no OpenAI) | |
| analysis = simple_analyze_emails(full_emails) | |
| result = { | |
| "sender_keyword": sender_keyword, | |
| "date_range": f"{start_date_str} to {end_date_str}", | |
| "analysis": analysis, | |
| "email_count": len(full_emails), | |
| "user_email": auth_info | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error in analyze_email_patterns: %s", e) | |
| error_result = { | |
| "error": str(e), | |
| "sender_keyword": sender_keyword, | |
| "message": "Failed to analyze email patterns." | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def get_authentication_status() -> str: | |
| """ | |
| Get current authentication status and account information. | |
| Returns: | |
| str: JSON string containing authentication status | |
| """ | |
| try: | |
| current_account = oauth_manager.get_current_account() | |
| is_auth = oauth_manager.is_authenticated() if current_account else False | |
| all_accounts = oauth_manager.list_accounts() | |
| result = { | |
| "authenticated": is_auth, | |
| "current_account": current_account, | |
| "status": "authenticated" if is_auth else "not_authenticated", | |
| "message": f"Current account: {current_account}" if is_auth else "No account selected or not authenticated", | |
| "all_accounts": all_accounts, | |
| "total_accounts": len(all_accounts), | |
| "authenticated_accounts": [email for email, auth in all_accounts.items() if auth] | |
| } | |
| if not is_auth and not oauth_manager.client_secrets_file.exists(): | |
| result["setup_required"] = True | |
| result["message"] = "OAuth not configured. Please run 'python setup_oauth.py' first." | |
| elif not is_auth and current_account: | |
| result["message"] = f"Account {current_account} needs re-authentication" | |
| elif not current_account and all_accounts: | |
| result["message"] = "Accounts available but none selected. Use switch_account to select one." | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error checking authentication status: %s", e) | |
| return json.dumps({ | |
| "error": str(e), | |
| "message": "Failed to check authentication status" | |
| }, indent=2) | |
| def send_email(recipient: str, subject: str, body: str) -> str: | |
| """ | |
| Send a plain-text email via the authenticated Gmail account. | |
| Returns JSON with either: | |
| {"success": true, "message_id": "..."} | |
| or | |
| {"success": false, "error": "..."} | |
| """ | |
| # Use the correct method on your OAuth manager: | |
| service = oauth_manager.get_gmail_service() | |
| if service is None: | |
| return json.dumps( | |
| {"success": False, "error": "Not authenticated or failed to build service."}, | |
| indent=2, | |
| ) | |
| # Build the MIME message | |
| mime_msg = MIMEText(body, "plain", "utf-8") | |
| mime_msg["to"] = recipient | |
| mime_msg["subject"] = subject | |
| # Base64-encode and send | |
| raw_msg = base64.urlsafe_b64encode(mime_msg.as_bytes()).decode() | |
| try: | |
| sent = ( | |
| service.users() | |
| .messages() | |
| .send(userId="me", body={"raw": raw_msg}) | |
| .execute() | |
| ) | |
| return json.dumps( | |
| {"success": True, "message_id": sent.get("id")}, indent=2 | |
| ) | |
| except HttpError as err: | |
| logger.error(f"Error sending email: {err}") | |
| # err.error_details may be None; fallback to string | |
| error_detail = getattr(err, "error_details", None) or str(err) | |
| return json.dumps( | |
| {"success": False, "error": error_detail}, | |
| indent=2, | |
| ) | |
| # Create Gradio interfaces | |
| search_interface = gr.Interface( | |
| fn=search_emails, | |
| inputs=[ | |
| gr.Textbox(label="Sender Keyword", placeholder="apple, amazon, etc."), | |
| gr.Textbox(label="Start Date (Optional)", placeholder="01-Jan-2025 (leave empty for last 7 days)"), | |
| gr.Textbox(label="End Date (Optional)", placeholder="07-Jan-2025 (leave empty for today)") | |
| ], | |
| outputs=gr.Textbox(label="Search Results", lines=20), | |
| title="Email Search (OAuth)", | |
| description="Search your emails by sender keyword and date range with OAuth authentication" | |
| ) | |
| details_interface = gr.Interface( | |
| fn=get_email_details, | |
| inputs=[ | |
| gr.Textbox(label="Message ID", placeholder="Email message ID from search results") | |
| ], | |
| outputs=gr.Textbox(label="Email Details", lines=20), | |
| title="Email Details (OAuth)", | |
| description="Get full details of a specific email by message ID with OAuth authentication" | |
| ) | |
| analysis_interface = gr.Interface( | |
| fn=analyze_email_patterns, | |
| inputs=[ | |
| gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."), | |
| gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze") | |
| ], | |
| outputs=gr.Textbox(label="Analysis Results", lines=20), | |
| title="Email Pattern Analysis (OAuth)", | |
| description="Analyze email patterns from a specific sender over time with OAuth authentication" | |
| ) | |
| auth_interface = gr.Interface( | |
| fn=authenticate_user, | |
| inputs=[], | |
| outputs=gr.Textbox(label="Authentication Result", lines=10), | |
| title="Authenticate with Gmail", | |
| description="Click Submit to start OAuth authentication flow with Gmail" | |
| ) | |
| status_interface = gr.Interface( | |
| fn=get_authentication_status, | |
| inputs=[], | |
| outputs=gr.Textbox(label="Authentication Status", lines=15), | |
| title="Authentication Status", | |
| description="Check current authentication status and view all accounts" | |
| ) | |
| switch_interface = gr.Interface( | |
| fn=switch_account, | |
| inputs=[ | |
| gr.Textbox(label="Target Email", placeholder="email@gmail.com") | |
| ], | |
| outputs=gr.Textbox(label="Switch Result", lines=10), | |
| title="Switch Account", | |
| description="Switch to a different authenticated Gmail account" | |
| ) | |
| accounts_interface = gr.Interface( | |
| fn=list_accounts, | |
| inputs=[], | |
| outputs=gr.Textbox(label="Accounts List", lines=15), | |
| title="List All Accounts", | |
| description="View all authenticated Gmail accounts and their status" | |
| ) | |
| remove_interface = gr.Interface( | |
| fn=remove_account, | |
| inputs=[ | |
| gr.Textbox(label="Email to Remove", placeholder="email@gmail.com") | |
| ], | |
| outputs=gr.Textbox(label="Removal Result", lines=10), | |
| title="Remove Account", | |
| description="Remove an authenticated Gmail account and its credentials" | |
| ) | |
| send_interface = gr.Interface( | |
| fn=send_email, | |
| inputs=[ | |
| gr.Textbox(label="Recipient Email", placeholder="recipient@example.com"), | |
| gr.Textbox(label="Subject", placeholder="Email subject"), | |
| gr.Textbox(label="Body", placeholder="Email body text", lines=5) | |
| ], | |
| outputs=gr.Textbox(label="Send Result", lines=10), | |
| title="βοΈ Send Email", | |
| description="Send an email via Gmail using OAuth authenticated account" | |
| ) | |
| # Combine interfaces into a tabbed interface | |
| demo = gr.TabbedInterface( | |
| [auth_interface, status_interface, accounts_interface, switch_interface, remove_interface, search_interface, details_interface, analysis_interface, send_interface], | |
| ["π Authenticate", "π Status", "π₯ All Accounts", "π Switch Account", "ποΈ Remove Account", "π§ Email Search", "π Email Details", "π Pattern Analysis", "βοΈ Send Email"], | |
| title="π§ Gmail Assistant MCP Server (Multi-Account OAuth)" | |
| ) | |
| if __name__ == "__main__": | |
| # Set environment variable to enable MCP server | |
| import os | |
| os.environ["GRADIO_MCP_SERVER"] = "True" | |
| # Check authentication status on startup | |
| current_account = oauth_manager.get_current_account() | |
| all_accounts = oauth_manager.list_accounts() | |
| if current_account and oauth_manager.is_authenticated(): | |
| print(f"β Currently authenticated as: {current_account}") | |
| if len(all_accounts) > 1: | |
| print(f"π± {len(all_accounts)} total accounts available: {list(all_accounts.keys())}") | |
| elif all_accounts: | |
| print(f"π± {len(all_accounts)} stored accounts found: {list(all_accounts.keys())}") | |
| print("β οΈ No current account selected. Use the web interface or Claude to switch accounts.") | |
| else: | |
| print("β No authenticated accounts. Users will need to authenticate through the web interface.") | |
| print("π‘ Or run 'python setup_oauth.py' for initial setup.") | |
| # Launch the server | |
| demo.launch(share=False) | |
| print("\nπ MCP Server is running!") | |
| print("π MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse") | |
| print("π Copy this URL to your Claude Desktop MCP configuration") | |
| print("\nπ Web Interface: http://localhost:7860") | |
| print("π Use the web interface to authenticate and test the tools") |