Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Gmail MCP Server with OAuth Authentication and Multi-Account Support | |
| """ | |
| import gradio as gr | |
| import json | |
| import base64 | |
| from email.mime.text import MIMEText | |
| from googleapiclient.errors import HttpError | |
| import os | |
| from typing import Dict, List | |
| from datetime import datetime, timedelta | |
| from dotenv import load_dotenv | |
| from fastapi.responses import HTMLResponse | |
| from fastapi import FastAPI,Request | |
| from fastapi.routing import APIRoute | |
| # Import OAuth-enabled modules | |
| # from tools import extract_query_info, analyze_emails | |
| from gmail_api_scraper import GmailAPIScraper | |
| from oauth_manager import oauth_manager | |
| from logger import logger | |
| load_dotenv() | |
| if not oauth_manager.client_secrets_file.exists(): | |
| print("Setupy") | |
| oauth_manager.setup_client_secrets( | |
| os.environ["GOOGLE_CLIENT_ID"], | |
| os.environ["GOOGLE_CLIENT_SECRET"] | |
| ) | |
| # Initialize Gmail API scraper | |
| gmail_scraper = GmailAPIScraper() | |
| def check_authentication() -> tuple[bool, str]: | |
| """Check if user is authenticated and return status""" | |
| current_account = oauth_manager.get_current_account() | |
| if current_account and oauth_manager.is_authenticated(): | |
| return True, current_account | |
| else: | |
| return False, "Not authenticated" | |
| def simple_analyze_emails(emails) -> dict: | |
| """ | |
| Simple email analysis without OpenAI - just basic statistics and patterns | |
| """ | |
| if not emails: | |
| return {"summary": "No emails to analyze.", "insights": []} | |
| # Basic statistics | |
| total_count = len(emails) | |
| # Group by sender | |
| senders = {} | |
| subjects = [] | |
| dates = [] | |
| for email in emails: | |
| sender = email.get("from", "Unknown") | |
| # Extract just the email domain for grouping | |
| if "<" in sender and ">" in sender: | |
| email_part = sender.split("<")[1].split(">")[0] | |
| else: | |
| email_part = sender | |
| domain = email_part.split("@")[-1] if "@" in email_part else sender | |
| senders[domain] = senders.get(domain, 0) + 1 | |
| subjects.append(email.get("subject", "")) | |
| dates.append(email.get("date", "")) | |
| # Create insights | |
| insights = [] | |
| insights.append(f"Found {total_count} emails total") | |
| if senders: | |
| top_sender = max(senders.items(), key=lambda x: x[1]) | |
| insights.append(f"Most emails from: {top_sender[0]} ({top_sender[1]} emails)") | |
| if len(senders) > 1: | |
| insights.append(f"Emails from {len(senders)} different domains") | |
| # Date range | |
| if dates: | |
| unique_dates = list(set(dates)) | |
| if len(unique_dates) > 1: | |
| insights.append(f"Spanning {len(unique_dates)} different days") | |
| # Subject analysis | |
| if subjects: | |
| # Count common words in subjects (simple approach) | |
| all_words = [] | |
| for subject in subjects: | |
| words = subject.lower().split() | |
| all_words.extend([w for w in words if len(w) > 3]) # Only words longer than 3 chars | |
| if all_words: | |
| word_counts = {} | |
| for word in all_words: | |
| word_counts[word] = word_counts.get(word, 0) + 1 | |
| if word_counts: | |
| common_word = max(word_counts.items(), key=lambda x: x[1]) | |
| if common_word[1] > 1: | |
| insights.append(f"Common subject word: '{common_word[0]}' appears {common_word[1]} times") | |
| summary = f"Analysis of {total_count} emails from {len(senders)} sender(s)" | |
| return { | |
| "summary": summary, | |
| "insights": insights | |
| } | |
| def authenticate_user() -> str: | |
| """ | |
| Start OAuth authentication flow for Gmail access. | |
| Opens a browser window for user to authenticate with Google. | |
| Returns: | |
| str: JSON string containing authentication result | |
| """ | |
| try: | |
| logger.info("Starting OAuth authentication flow...") | |
| if oauth_manager.is_authenticated(): | |
| user_email = oauth_manager.get_current_account() | |
| return json.dumps({ | |
| "success": True, | |
| "message": "Already authenticated!", | |
| "user_email": user_email, | |
| "instructions": [ | |
| "You are already authenticated and ready to use email tools", | |
| f"Currently authenticated as: {user_email}" | |
| ] | |
| }, indent=2) | |
| # Check if OAuth is configured | |
| if not oauth_manager.client_secrets_file.exists(): | |
| return json.dumps({ | |
| "error": "OAuth not configured", | |
| "message": "Please run 'python setup_oauth.py' first to configure OAuth credentials.", | |
| "success": False | |
| }, indent=2) | |
| # Start authentication | |
| success = oauth_manager.authenticate_interactive() | |
| if success: | |
| user_email = oauth_manager.get_current_account() | |
| result = { | |
| "success": True, | |
| "message": "Authentication successful! You can now use the email tools.", | |
| "user_email": user_email, | |
| "instructions": [ | |
| "Authentication completed successfully", | |
| "You can now search emails, get email details, and analyze patterns", | |
| f"Currently authenticated as: {user_email}" | |
| ] | |
| } | |
| else: | |
| # Authentication not completed, provide manual instructions | |
| auth_url = oauth_manager.get_pending_auth_url() | |
| callback_url = oauth_manager.get_hf_redirect_uri() | |
| if auth_url: | |
| result = { | |
| "success": False, | |
| "message": "Manual authentication required", | |
| "auth_url": auth_url, | |
| "callback_url": callback_url, | |
| "instructions": [ | |
| "Authentication URL has been generated", | |
| "Please click the link below to authenticate:", | |
| "1. Open the authentication URL(auth_url) in a new browser tab", | |
| "2. Sign in with your Google account", | |
| "3. Grant Gmail access permissions", | |
| "4. You'll be redirected back automatically", | |
| "5. Try clicking 'Submit' again after completing authentication" | |
| ], | |
| "note": "After completing authentication in the popup, click Submit again to verify" | |
| } | |
| else: | |
| result = { | |
| "success": False, | |
| "error": "Failed to generate authentication URL", | |
| "message": "Could not start authentication process. Check your OAuth configuration." | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error in authenticate_user: %s", e) | |
| error_result = { | |
| "success": False, | |
| "error": str(e), | |
| "message": "Authentication failed due to an error." | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def handle_oauth_callback(auth_code: str) -> str: | |
| """Handle OAuth callback for Hugging Face Spaces | |
| Args: | |
| auth_code: Authorization code from OAuth callback | |
| Returns: | |
| HTML response string | |
| """ | |
| try: | |
| if not auth_code: | |
| return """ | |
| <html> | |
| <head><title>OAuth Error</title></head> | |
| <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;"> | |
| <h1 style="color: #d32f2f;">Authentication Error</h1> | |
| <p>No authorization code received.</p> | |
| <button onclick="window.close()" style="padding: 10px 20px; margin: 20px; background: #1976d2; color: white; border: none; border-radius: 4px; cursor: pointer;">Close Window</button> | |
| </body> | |
| </html> | |
| """ | |
| print(f"Received OAuth callback with code: {auth_code}") | |
| success = oauth_manager.complete_hf_spaces_auth(auth_code) | |
| if success: | |
| user_email = oauth_manager.get_current_account() | |
| return f""" | |
| <html> | |
| <head><title>OAuth Success</title></head> | |
| <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;"> | |
| <h1 style="color: #2e7d32;">π Authentication Successful!</h1> | |
| <p>You are now authenticated as:</p> | |
| <p style="font-weight: bold; font-size: 18px; color: #1976d2;">{user_email}</p> | |
| <p>You can now close this window and return to the main application.</p> | |
| <p style="color: #666; font-size: 14px;">This window will close automatically in 5 seconds...</p> | |
| <button onclick="window.close()" style="padding: 10px 20px; margin: 20px; background: #2e7d32; color: white; border: none; border-radius: 4px; cursor: pointer;">Close Window</button> | |
| <script> | |
| setTimeout(function() {{ | |
| window.close(); | |
| }}, 5000); | |
| </script> | |
| </body> | |
| </html> | |
| """ | |
| else: | |
| return """ | |
| <html> | |
| <head><title>OAuth Error</title></head> | |
| <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;"> | |
| <h1 style="color: #d32f2f;">Authentication Failed</h1> | |
| <p>Unable to complete authentication. Please try again.</p> | |
| <p>Make sure you granted all required permissions.</p> | |
| <button onclick="window.close()" style="padding: 10px 20px; margin: 20px; background: #d32f2f; color: white; border: none; border-radius: 4px; cursor: pointer;">Close Window</button> | |
| </body> | |
| </html> | |
| """ | |
| except Exception as e: | |
| logger.error(f"Error handling OAuth callback: {e}") | |
| return f""" | |
| <html> | |
| <head><title>OAuth Error</title></head> | |
| <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;"> | |
| <h1 style="color: #d32f2f;">Authentication Error</h1> | |
| <p>An error occurred during authentication:</p> | |
| <pre style="background: #f5f5f5; padding: 10px; border-radius: 4px; text-align: left; max-width: 500px; margin: 0 auto;">{str(e)}</pre> | |
| <button onclick="window.close()" style="padding: 10px 20px; margin: 20px; background: #d32f2f; color: white; border: none; border-radius: 4px; cursor: pointer;">Close Window</button> | |
| </body> | |
| </html> | |
| """ | |
| def switch_account(target_email: str) -> str: | |
| """ | |
| Switch to a different authenticated Gmail account. | |
| Args: | |
| target_email (str): Email address to switch to | |
| Returns: | |
| str: JSON string containing switch result | |
| """ | |
| try: | |
| logger.info("Switching to account: %s", target_email) | |
| # Check if target account is authenticated | |
| if not oauth_manager.is_authenticated(target_email): | |
| return json.dumps({ | |
| "error": "Account not authenticated", | |
| "message": f"Account '{target_email}' is not authenticated. Please authenticate first.", | |
| "target_email": target_email, | |
| "authenticated_accounts": list(oauth_manager.list_accounts().keys()) | |
| }, indent=2) | |
| # Switch account | |
| success = oauth_manager.switch_account(target_email) | |
| if success: | |
| result = { | |
| "success": True, | |
| "message": f"Successfully switched to account: {target_email}", | |
| "current_account": oauth_manager.get_current_account(), | |
| "previous_account": None # Could track this if needed | |
| } | |
| else: | |
| result = { | |
| "success": False, | |
| "error": "Failed to switch account", | |
| "message": f"Could not switch to account: {target_email}", | |
| "current_account": oauth_manager.get_current_account() | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error switching account: %s", e) | |
| error_result = { | |
| "success": False, | |
| "error": str(e), | |
| "message": f"Failed to switch to account: {target_email}" | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def list_accounts() -> str: | |
| """ | |
| List all authenticated Gmail accounts and their status. | |
| Returns: | |
| str: JSON string containing all accounts and their authentication status | |
| """ | |
| try: | |
| logger.info("Listing all accounts") | |
| accounts = oauth_manager.list_accounts() | |
| current_account = oauth_manager.get_current_account() | |
| result = { | |
| "accounts": accounts, | |
| "current_account": current_account, | |
| "total_accounts": len(accounts), | |
| "authenticated_accounts": [email for email, is_auth in accounts.items() if is_auth], | |
| "message": f"Found {len(accounts)} stored accounts, currently using: {current_account or 'None'}" | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error listing accounts: %s", e) | |
| error_result = { | |
| "error": str(e), | |
| "message": "Failed to list accounts" | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def remove_account(email_to_remove: str) -> str: | |
| """ | |
| Remove an authenticated Gmail account and its stored credentials. | |
| Args: | |
| email_to_remove (str): Email address to remove | |
| Returns: | |
| str: JSON string containing removal result | |
| """ | |
| try: | |
| logger.info("Removing account: %s", email_to_remove) | |
| # Check if account exists | |
| accounts = oauth_manager.list_accounts() | |
| if email_to_remove not in accounts: | |
| return json.dumps({ | |
| "error": "Account not found", | |
| "message": f"Account '{email_to_remove}' not found in stored accounts.", | |
| "available_accounts": list(accounts.keys()) | |
| }, indent=2) | |
| # Remove account | |
| oauth_manager.remove_account(email_to_remove) | |
| result = { | |
| "success": True, | |
| "message": f"Successfully removed account: {email_to_remove}", | |
| "removed_account": email_to_remove, | |
| "current_account": oauth_manager.get_current_account(), | |
| "remaining_accounts": list(oauth_manager.list_accounts().keys()) | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error removing account: %s", e) | |
| error_result = { | |
| "success": False, | |
| "error": str(e), | |
| "message": f"Failed to remove account: {email_to_remove}" | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def search_emails(sender_keyword: str, start_date: str = "", end_date: str = "") -> str: | |
| """ | |
| Search for emails from a specific sender within a date range using OAuth authentication. | |
| Args: | |
| sender_keyword (str): The sender/company keyword to search for (e.g., "apple", "amazon") | |
| start_date (str): Start date in DD-MMM-YYYY format (e.g., "01-Jan-2025"). If empty, defaults to 7 days ago. | |
| end_date (str): End date in DD-MMM-YYYY format (e.g., "07-Jan-2025"). If empty, defaults to today. | |
| Returns: | |
| str: JSON string containing email search results and analysis | |
| """ | |
| try: | |
| logger.info("OAuth Email search tool called with sender: %s, dates: %s to %s", sender_keyword, start_date, end_date) | |
| # Check authentication | |
| is_auth, auth_info = check_authentication() | |
| if not is_auth: | |
| return json.dumps({ | |
| "error": "Not authenticated", | |
| "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
| "auth_status": auth_info | |
| }, indent=2) | |
| # Set default date range if not provided | |
| if not start_date or not end_date: | |
| today = datetime.today() | |
| if not end_date: | |
| end_date = today.strftime("%d-%b-%Y") | |
| if not start_date: | |
| start_date = (today - timedelta(days=7)).strftime("%d-%b-%Y") | |
| logger.info(f"Searching for emails with keyword '{sender_keyword}' between {start_date} and {end_date}") | |
| # Use Gmail API scraper with OAuth | |
| full_emails = gmail_scraper.search_emails(sender_keyword, start_date, end_date) | |
| if not full_emails: | |
| result = { | |
| "sender_keyword": sender_keyword, | |
| "date_range": f"{start_date} to {end_date}", | |
| "email_summary": [], | |
| "analysis": {"summary": f"No emails found for '{sender_keyword}' in the specified date range.", "insights": []}, | |
| "email_count": 0, | |
| "user_email": auth_info | |
| } | |
| return json.dumps(result, indent=2) | |
| # Create summary version without full content | |
| email_summary = [] | |
| for email in full_emails: | |
| summary_email = { | |
| "date": email.get("date"), | |
| "time": email.get("time"), | |
| "subject": email.get("subject"), | |
| "from": email.get("from", "Unknown Sender"), | |
| "message_id": email.get("message_id"), | |
| "gmail_id": email.get("gmail_id") | |
| } | |
| email_summary.append(summary_email) | |
| # Auto-analyze the emails for insights (no OpenAI) | |
| analysis = simple_analyze_emails(full_emails) | |
| # Return summary info with analysis | |
| result = { | |
| "sender_keyword": sender_keyword, | |
| "date_range": f"{start_date} to {end_date}", | |
| "email_summary": email_summary, | |
| "analysis": analysis, | |
| "email_count": len(full_emails), | |
| "user_email": auth_info | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error in search_emails: %s", e) | |
| error_result = { | |
| "error": str(e), | |
| "sender_keyword": sender_keyword, | |
| "message": "Failed to search emails." | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def get_email_details(message_id: str) -> str: | |
| """ | |
| Get full details of a specific email by its message ID using OAuth authentication. | |
| Args: | |
| message_id (str): The message ID of the email to retrieve | |
| Returns: | |
| str: JSON string containing the full email details | |
| """ | |
| try: | |
| logger.info("Getting email details for message_id: %s", message_id) | |
| # Check authentication | |
| is_auth, auth_info = check_authentication() | |
| if not is_auth: | |
| return json.dumps({ | |
| "error": "Not authenticated", | |
| "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
| "auth_status": auth_info | |
| }, indent=2) | |
| # Get email using Gmail API | |
| email = gmail_scraper.get_email_by_id(message_id) | |
| if email: | |
| email["user_email"] = auth_info | |
| return json.dumps(email, indent=2) | |
| else: | |
| error_result = { | |
| "error": f"No email found with message_id '{message_id}'", | |
| "message": "Email may not exist or you may not have access to it.", | |
| "user_email": auth_info | |
| } | |
| return json.dumps(error_result, indent=2) | |
| except Exception as e: | |
| logger.error("Error in get_email_details: %s", e) | |
| error_result = { | |
| "error": str(e), | |
| "message_id": message_id, | |
| "message": "Failed to retrieve email details." | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def analyze_email_patterns(sender_keyword: str, days_back: str = "30") -> str: | |
| """ | |
| Analyze email patterns from a specific sender over a given time period using OAuth authentication. | |
| Args: | |
| sender_keyword (str): The sender/company keyword to analyze (e.g., "amazon", "google") | |
| days_back (str): Number of days to look back (default: "30") | |
| Returns: | |
| str: JSON string containing email pattern analysis | |
| """ | |
| try: | |
| logger.info("Analyzing email patterns for sender: %s, days_back: %s", sender_keyword, days_back) | |
| # Check authentication | |
| is_auth, auth_info = check_authentication() | |
| if not is_auth: | |
| return json.dumps({ | |
| "error": "Not authenticated", | |
| "message": "Please authenticate first using the authenticate_user tool or run 'python setup_oauth.py'", | |
| "auth_status": auth_info | |
| }, indent=2) | |
| # Calculate date range | |
| days_int = int(days_back) | |
| end_date = datetime.today() | |
| start_date = end_date - timedelta(days=days_int) | |
| start_date_str = start_date.strftime("%d-%b-%Y") | |
| end_date_str = end_date.strftime("%d-%b-%Y") | |
| # Search for emails using Gmail API | |
| full_emails = gmail_scraper.search_emails(sender_keyword, start_date_str, end_date_str) | |
| if not full_emails: | |
| result = { | |
| "sender_keyword": sender_keyword, | |
| "date_range": f"{start_date_str} to {end_date_str}", | |
| "analysis": {"summary": f"No emails found from '{sender_keyword}' in the last {days_back} days.", "insights": []}, | |
| "email_count": 0, | |
| "user_email": auth_info | |
| } | |
| return json.dumps(result, indent=2) | |
| # Analyze the emails (no OpenAI) | |
| analysis = simple_analyze_emails(full_emails) | |
| result = { | |
| "sender_keyword": sender_keyword, | |
| "date_range": f"{start_date_str} to {end_date_str}", | |
| "analysis": analysis, | |
| "email_count": len(full_emails), | |
| "user_email": auth_info | |
| } | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error in analyze_email_patterns: %s", e) | |
| error_result = { | |
| "error": str(e), | |
| "sender_keyword": sender_keyword, | |
| "message": "Failed to analyze email patterns." | |
| } | |
| return json.dumps(error_result, indent=2) | |
| def get_authentication_status() -> str: | |
| """ | |
| Get current authentication status and account information. | |
| Returns: | |
| str: JSON string containing authentication status | |
| """ | |
| try: | |
| current_account = oauth_manager.get_current_account() | |
| is_auth = oauth_manager.is_authenticated() if current_account else False | |
| all_accounts = oauth_manager.list_accounts() | |
| result = { | |
| "authenticated": is_auth, | |
| "current_account": current_account, | |
| "status": "authenticated" if is_auth else "not_authenticated", | |
| "message": f"Current account: {current_account}" if is_auth else "No account selected or not authenticated", | |
| "all_accounts": all_accounts, | |
| "total_accounts": len(all_accounts), | |
| "authenticated_accounts": [email for email, auth in all_accounts.items() if auth] | |
| } | |
| if not is_auth and not oauth_manager.client_secrets_file.exists(): | |
| result["setup_required"] = True | |
| result["message"] = "OAuth not configured. Please run 'python setup_oauth.py' first." | |
| elif not is_auth and current_account: | |
| result["message"] = f"Account {current_account} needs re-authentication" | |
| elif not current_account and all_accounts: | |
| result["message"] = "Accounts available but none selected. Use switch_account to select one." | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| logger.error("Error checking authentication status: %s", e) | |
| return json.dumps({ | |
| "error": str(e), | |
| "message": "Failed to check authentication status" | |
| }, indent=2) | |
| def send_email(recipient: str, subject: str, body: str) -> str: | |
| """ | |
| Send a plain-text email via the authenticated Gmail account. | |
| Returns JSON with either: | |
| {"success": true, "message_id": "..."} | |
| or | |
| {"success": false, "error": "..."} | |
| """ | |
| # Use the correct method on your OAuth manager: | |
| service = oauth_manager.get_gmail_service() | |
| if service is None: | |
| return json.dumps( | |
| {"success": False, "error": "Not authenticated or failed to build service."}, | |
| indent=2, | |
| ) | |
| # Build the MIME message | |
| mime_msg = MIMEText(body, "plain", "utf-8") | |
| mime_msg["to"] = recipient | |
| mime_msg["subject"] = subject | |
| # Base64-encode and send | |
| raw_msg = base64.urlsafe_b64encode(mime_msg.as_bytes()).decode() | |
| try: | |
| sent = ( | |
| service.users() | |
| .messages() | |
| .send(userId="me", body={"raw": raw_msg}) | |
| .execute() | |
| ) | |
| return json.dumps( | |
| {"success": True, "message_id": sent.get("id")}, indent=2 | |
| ) | |
| except HttpError as err: | |
| logger.error(f"Error sending email: {err}") | |
| # err.error_details may be None; fallback to string | |
| error_detail = getattr(err, "error_details", None) or str(err) | |
| return json.dumps( | |
| {"success": False, "error": error_detail}, | |
| indent=2, | |
| ) | |
| # Create Gradio interfaces | |
| search_interface = gr.Interface( | |
| fn=search_emails, | |
| inputs=[ | |
| gr.Textbox(label="Sender Keyword", placeholder="apple, amazon, etc."), | |
| gr.Textbox(label="Start Date (Optional)", placeholder="01-Jan-2025 (leave empty for last 7 days)"), | |
| gr.Textbox(label="End Date (Optional)", placeholder="07-Jan-2025 (leave empty for today)") | |
| ], | |
| outputs=gr.Textbox(label="Search Results", lines=20), | |
| title="Email Search (OAuth)", | |
| description="Search your emails by sender keyword and date range with OAuth authentication" | |
| ) | |
| details_interface = gr.Interface( | |
| fn=get_email_details, | |
| inputs=[ | |
| gr.Textbox(label="Message ID", placeholder="Email message ID from search results") | |
| ], | |
| outputs=gr.Textbox(label="Email Details", lines=20), | |
| title="Email Details (OAuth)", | |
| description="Get full details of a specific email by message ID with OAuth authentication" | |
| ) | |
| analysis_interface = gr.Interface( | |
| fn=analyze_email_patterns, | |
| inputs=[ | |
| gr.Textbox(label="Sender Keyword", placeholder="amazon, google, linkedin, etc."), | |
| gr.Textbox(label="Days Back", value="30", placeholder="Number of days to analyze") | |
| ], | |
| outputs=gr.Textbox(label="Analysis Results", lines=20), | |
| title="Email Pattern Analysis (OAuth)", | |
| description="Analyze email patterns from a specific sender over time with OAuth authentication" | |
| ) | |
| auth_interface = gr.Interface( | |
| fn=authenticate_user, | |
| inputs=[], | |
| outputs=gr.Textbox(label="Authentication Result", lines=10), | |
| title="Authenticate with Gmail", | |
| description="Click Submit to start OAuth authentication flow with Gmail" | |
| ) | |
| status_interface = gr.Interface( | |
| fn=get_authentication_status, | |
| inputs=[], | |
| outputs=gr.Textbox(label="Authentication Status", lines=15), | |
| title="Authentication Status", | |
| description="Check current authentication status and view all accounts" | |
| ) | |
| switch_interface = gr.Interface( | |
| fn=switch_account, | |
| inputs=[ | |
| gr.Textbox(label="Target Email", placeholder="email@gmail.com") | |
| ], | |
| outputs=gr.Textbox(label="Switch Result", lines=10), | |
| title="Switch Account", | |
| description="Switch to a different authenticated Gmail account" | |
| ) | |
| accounts_interface = gr.Interface( | |
| fn=list_accounts, | |
| inputs=[], | |
| outputs=gr.Textbox(label="Accounts List", lines=15), | |
| title="List All Accounts", | |
| description="View all authenticated Gmail accounts and their status" | |
| ) | |
| remove_interface = gr.Interface( | |
| fn=remove_account, | |
| inputs=[ | |
| gr.Textbox(label="Email to Remove", placeholder="email@gmail.com") | |
| ], | |
| outputs=gr.Textbox(label="Removal Result", lines=10), | |
| title="Remove Account", | |
| description="Remove an authenticated Gmail account and its credentials" | |
| ) | |
| send_interface = gr.Interface( | |
| fn=send_email, | |
| inputs=[ | |
| gr.Textbox(label="Recipient Email", placeholder="recipient@example.com"), | |
| gr.Textbox(label="Subject", placeholder="Email subject"), | |
| gr.Textbox(label="Body", placeholder="Email body text", lines=5) | |
| ], | |
| outputs=gr.Textbox(label="Send Result", lines=10), | |
| title="βοΈ Send Email", | |
| description="Send an email via Gmail using OAuth authenticated account" | |
| ) | |
| # Combine interfaces into a tabbed interface | |
| demo = gr.TabbedInterface( | |
| [auth_interface, status_interface, accounts_interface, switch_interface, remove_interface, search_interface, details_interface, analysis_interface, send_interface], | |
| ["π Authenticate", "π Status", "π₯ All Accounts", "π Switch Account", "ποΈ Remove Account", "π§ Email Search", "π Email Details", "π Pattern Analysis", "βοΈ Send Email"], | |
| title="π§ Gmail Assistant MCP Server (Multi-Account OAuth)" | |
| ) | |
| app = FastAPI() | |
| # Add your OAuth callback route | |
| async def google_oauth_cb(request: Request): | |
| code = request.query_params.get("code") | |
| print("code:", code) | |
| return HTMLResponse(handle_oauth_callback(code)) | |
| app = gr.mount_gradio_app(app, demo, path="/") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |