Spaces:
Sleeping
Sleeping
| # src/tools/gmail_mcp_client.py | |
| import base64 | |
| import os | |
| from typing import List, Optional, Dict, Any | |
| from google.auth.transport.requests import Request | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from google.oauth2.credentials import Credentials | |
| from googleapiclient.discovery import build | |
# OAuth scopes requested when building credentials and when running the
# interactive OAuth flow.
# NOTE(review): this module keeps no token file -- credentials come from the
# environment variables below -- so after changing SCOPES a new refresh token
# presumably has to be obtained (e.g. via setup_gmail_credentials()).
# NOTE(review): the functions in this module only list and read messages
# (send_email is a stub); a narrower read-only scope may be sufficient.
SCOPES = ["https://www.googleapis.com/auth/gmail.modify"]
# OAuth2 credentials from environment variables.
# These are read once, at import time; later changes to the environment are
# not picked up by this module. Each value is None when the variable is unset.
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")
GOOGLE_REFRESH_TOKEN = os.getenv("GOOGLE_REFRESH_TOKEN")
GOOGLE_ACCESS_TOKEN = os.getenv("GOOGLE_ACCESS_TOKEN")
# Allowed email addresses and domains for security.
# Each entry is used twice: to build the Gmail "from:" search query, and to
# check the From header of every fetched message before it is returned.
# NOTE(review): these are hard-coded; consider loading them from configuration.
ALLOWED_SENDERS = ["habib.adoum01@gmail.com", "news@alphasignal.ai"]
def _get_gmail_service():
    """
    Authenticate (or refresh) and return a Gmail API service instance.

    Credentials are assembled from the module-level values read from the
    environment (GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, GOOGLE_REFRESH_TOKEN,
    GOOGLE_ACCESS_TOKEN). If they are missing or cannot be refreshed, an
    interactive OAuth flow is started on localhost port 3000, unless the
    DEPLOYMENT_ENV environment variable equals "production".

    Returns:
        The client object returned by ``build("gmail", "v1", credentials=...)``.

    Raises:
        ValueError: If the client id/secret are missing, or if no valid
            credentials are available while DEPLOYMENT_ENV is "production".
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        # Check if we have the required environment variables
        if not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET:
            raise ValueError(
                "Missing required environment variables: GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET. "
                "Please set these environment variables with your OAuth 2.0 credentials."
            )

        creds = None
        # Try to create credentials from environment variables
        if GOOGLE_ACCESS_TOKEN and GOOGLE_REFRESH_TOKEN:
            print("Creating credentials from environment variables...")
            creds = Credentials(
                token=GOOGLE_ACCESS_TOKEN,
                refresh_token=GOOGLE_REFRESH_TOKEN,
                token_uri="https://oauth2.googleapis.com/token",
                client_id=GOOGLE_CLIENT_ID,
                client_secret=GOOGLE_CLIENT_SECRET,
                scopes=SCOPES,
            )
        elif GOOGLE_REFRESH_TOKEN:
            print("Creating credentials from refresh token...")
            creds = Credentials(
                token=None,
                refresh_token=GOOGLE_REFRESH_TOKEN,
                token_uri="https://oauth2.googleapis.com/token",
                client_id=GOOGLE_CLIENT_ID,
                client_secret=GOOGLE_CLIENT_SECRET,
                scopes=SCOPES,
            )

        # If no valid credentials from environment variables, try to refresh or start OAuth flow
        if not creds or not creds.valid:
            if creds and creds.refresh_token:
                print("Refreshing Gmail credentials using refresh token...")
                try:
                    creds.refresh(Request())
                    print("Gmail credentials refreshed successfully.")
                except Exception as e:
                    print(f"Failed to refresh credentials: {e}")
                    creds = None  # Force OAuth flow

            if not creds or not creds.valid:
                # For deployment environments, we can't run interactive OAuth flow
                # User must provide refresh token via environment variable
                if os.getenv("DEPLOYMENT_ENV") == "production":
                    raise ValueError(
                        "No valid credentials found in production environment. "
                        "Please set GOOGLE_REFRESH_TOKEN environment variable with a valid refresh token. "
                        "Run the setup script locally first to obtain the refresh token."
                    )

                # For local development, create OAuth client info from environment variables
                client_config = {
                    "installed": {
                        "client_id": GOOGLE_CLIENT_ID,
                        "client_secret": GOOGLE_CLIENT_SECRET,
                        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                        "token_uri": "https://oauth2.googleapis.com/token",
                        "redirect_uris": ["http://localhost:3000"],
                    }
                }
                print("Starting Gmail OAuth flow...")
                flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
                # Use port 3000 to match the redirect URI in your OAuth config
                creds = flow.run_local_server(port=3000, open_browser=True)

                # Print the refresh token for the user to set as environment variable.
                # NOTE: this writes a long-lived secret to stdout; it is only reached
                # on the interactive (non-production) path.
                print("\n" + "=" * 60)
                print("IMPORTANT: Save this refresh token as an environment variable:")
                print(f"GOOGLE_REFRESH_TOKEN={creds.refresh_token}")
                print("Set this in your .env file or deployment environment.")
                print("=" * 60 + "\n")

        # Build the Gmail API client. Report success only after build() has
        # actually returned, so a failing build is not logged as a success.
        service = build("gmail", "v1", credentials=creds)
        print("Gmail API service initialized successfully.")
        return service
    except Exception as e:
        print(f"Error initializing Gmail service: {str(e)}")
        raise
def setup_gmail_credentials():
    """
    Run the interactive OAuth flow once and print the resulting credentials.

    Intended to be run locally: it opens a browser, listens on localhost port
    3000 for the OAuth redirect, and then prints the values to store as
    environment variables.

    Returns:
        The credentials object produced by the OAuth flow, or None when
        GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET are not set.
    """
    print("Setting up Gmail credentials...")
    print("Make sure you have set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables.")

    client_configured = bool(GOOGLE_CLIENT_ID) and bool(GOOGLE_CLIENT_SECRET)
    if not client_configured:
        # Nothing can be done without the OAuth client; print instructions and stop.
        help_lines = (
            "Error: Missing GOOGLE_CLIENT_ID or GOOGLE_CLIENT_SECRET environment variables.",
            "\nTo get these values:",
            "1. Go to https://console.cloud.google.com/apis/credentials",
            "2. Create OAuth 2.0 Client IDs (Application type: Desktop application)",
            "3. Download the JSON file and extract client_id and client_secret",
            "4. Set them as environment variables:",
            " export GOOGLE_CLIENT_ID='your_client_id_here'",
            " export GOOGLE_CLIENT_SECRET='your_client_secret_here'",
        )
        for help_line in help_lines:
            print(help_line)
        return None

    installed_app = {
        "client_id": GOOGLE_CLIENT_ID,
        "client_secret": GOOGLE_CLIENT_SECRET,
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "redirect_uris": ["http://localhost:3000"],
    }
    oauth_flow = InstalledAppFlow.from_client_config({"installed": installed_app}, SCOPES)
    creds = oauth_flow.run_local_server(port=3000, open_browser=True)

    rule = "=" * 80
    # The values printed below are secrets; this helper is meant for local use.
    summary = [
        "\n" + rule,
        "SUCCESS! Gmail credentials obtained.",
        "\nAdd these to your environment variables (.env file or deployment platform):",
        f"GOOGLE_CLIENT_ID={GOOGLE_CLIENT_ID}",
        f"GOOGLE_CLIENT_SECRET={GOOGLE_CLIENT_SECRET}",
        f"GOOGLE_REFRESH_TOKEN={creds.refresh_token}",
    ]
    if creds.token:
        summary.append(f"GOOGLE_ACCESS_TOKEN={creds.token}")
    summary.extend(
        [
            "\nFor deployment (like Hugging Face Spaces), you only need:",
            "- GOOGLE_CLIENT_ID",
            "- GOOGLE_CLIENT_SECRET",
            "- GOOGLE_REFRESH_TOKEN",
            "- DEPLOYMENT_ENV=production",
            rule,
        ]
    )
    for summary_line in summary:
        print(summary_line)
    return creds
def _is_allowed_sender(email_dict: Dict[str, Any], allowed_senders: Optional[List[str]] = None) -> bool:
    """
    Check whether a Gmail message was sent by an allowed sender.

    The From header is parsed into real mailbox addresses and each address is
    compared exactly (case-insensitively) against the allow-list. A plain
    substring test is deliberately avoided: it would accept a forged display
    name such as '"news@alphasignal.ai" <attacker@evil.example>', a longer
    local part such as 'evilnews@alphasignal.ai', or a look-alike domain such
    as 'news@alphasignal.ai.evil.example'.

    Args:
        email_dict: Gmail message resource; headers are read from
            email_dict["payload"]["headers"].
        allowed_senders: Allow-list entries. An entry containing a local part
            ("user@example.com") must match the whole address; an entry
            without one ("example.com" or "@example.com") matches any address
            at exactly that domain. Defaults to the module-level
            ALLOWED_SENDERS.

    Returns:
        True only if at least one From address is present and every From
        address is allowed. False otherwise, including on any parsing error.
    """
    # Local import keeps this block self-contained; email.utils is stdlib.
    from email.utils import getaddresses

    try:
        if allowed_senders is None:
            allowed_senders = ALLOWED_SENDERS

        exact_addresses = set()
        allowed_domains = set()
        for entry in allowed_senders:
            entry = entry.strip().lower()
            if not entry:
                # An empty entry must never act as a wildcard.
                continue
            if entry.startswith("@") or "@" not in entry:
                allowed_domains.add(entry.lstrip("@"))
            else:
                exact_addresses.add(entry)

        headers = email_dict.get("payload", {}).get("headers", [])
        # Header field names are case-insensitive, so accept "From", "from", "FROM", ...
        from_values = [
            header.get("value") or ""
            for header in headers
            if (header.get("name") or "").lower() == "from"
        ]

        senders = [address.strip().lower() for _, address in getaddresses(from_values) if address]
        if not senders:
            return False

        # Require every listed sender to be allowed so that an extra From
        # header or a second mailbox cannot smuggle a message past the filter.
        return all(
            address in exact_addresses
            or ("@" in address and address.rpartition("@")[2] in allowed_domains)
            for address in senders
        )
    except Exception as e:
        print(f"Error checking sender permissions: {str(e)}")
        return False
def _extract_email_info(email_dict: Dict[str, Any]) -> Dict[str, str]:
    """
    Extract the id, subject, sender, date and snippet from a Gmail message.

    Header names are matched case-insensitively. When a header occurs more
    than once, the last occurrence wins.

    Args:
        email_dict: Gmail message resource; headers are read from
            email_dict["payload"]["headers"].

    Returns:
        Dict with keys "id", "subject", "sender", "date" and "snippet".
        Missing values are empty strings. If the message cannot be parsed,
        the subject/sender/date fields hold "Error parsing ..." placeholders
        and "snippet" holds the error message; this function does not raise.
    """
    try:
        headers = email_dict.get("payload", {}).get("headers", [])
        # Map the lower-cased header name to the result key it fills.
        wanted = {"subject": "subject", "from": "sender", "date": "date"}
        fields = {"subject": "", "sender": "", "date": ""}
        for header in headers:
            key = wanted.get((header.get("name") or "").lower())
            if key is not None:
                fields[key] = header.get("value") or ""

        result = {
            "id": email_dict.get("id", ""),
            "subject": fields["subject"],
            "sender": fields["sender"],
            "date": fields["date"],
            "snippet": email_dict.get("snippet", ""),
        }
        print(f"Extracted email info: {fields['subject']} from {fields['sender']}")
        return result
    except Exception as e:
        error_msg = f"Error parsing email: {str(e)}"
        print(error_msg)
        # The input may not even be a dict; never let the fallback itself raise.
        message_id = email_dict.get("id", "") if isinstance(email_dict, dict) else ""
        return {
            "id": message_id,
            "subject": "Error parsing subject",
            "sender": "Error parsing sender",
            "date": "Error parsing date",
            "snippet": error_msg,
        }
def get_recent_emails(max_results: int = 10) -> List[Dict[str, str]]:
    """
    Get the most recent emails from allowed senders only.

    Args:
        max_results: Maximum number of emails to return (default 10).
            Values are clamped to the range 1..50.

    Returns:
        List of email dictionaries with id, subject, sender, date, and snippet.
        Returns an empty list if no emails are found. If the request as a
        whole fails, returns a single-element list ``[{"error": message}]``.
        Individual messages that fail to load are logged and skipped.
    """
    try:
        # Cap at 50 to prevent token overflow; floor at 1 so that zero or
        # negative values are never sent to the API.
        max_results = max(1, min(max_results, 50))
        print(f"Fetching up to {max_results} recent emails from allowed senders...")
        service = _get_gmail_service()

        # Build query for allowed senders
        query = " OR ".join(f"from:{sender}" for sender in ALLOWED_SENDERS)
        print(f"Using Gmail query: {query}")

        # Get message list
        response = service.users().messages().list(userId="me", q=query, maxResults=max_results).execute()
        messages = response.get("messages", [])
        print(f"Found {len(messages)} messages from allowed senders.")
        if not messages:
            print("No emails found from allowed senders.")
            return []

        email_list = []
        for i, message in enumerate(messages):
            try:
                print(f"Processing email {i + 1}/{len(messages)}: {message['id']}")
                # Get message metadata only to save tokens
                msg = service.users().messages().get(userId="me", id=message["id"], format="metadata").execute()
                # The search query is only a pre-filter; re-check the actual
                # From header before returning anything.
                if _is_allowed_sender(msg):
                    email_list.append(_extract_email_info(msg))
                else:
                    print(f"Email {message['id']} not from allowed sender, skipping.")
            except Exception as e:
                # Best effort: one bad message must not abort the whole listing.
                print(f"Error processing message {message.get('id', 'unknown')}: {str(e)}")
                continue

        print(f"Successfully processed {len(email_list)} emails.")
        return email_list
    except Exception as e:
        error_msg = f"Failed to get recent emails: {str(e)}"
        print(error_msg)
        return [{"error": error_msg}]
def search_emails_simple(query: str, max_results: int = 10) -> List[Dict[str, str]]:
    """
    Search for emails with a simple query, automatically filtered to allowed senders.

    Args:
        query: Simple search term (e.g. "AI", "newsletter"). Complex Gmail
            operators not needed. An empty or whitespace-only query searches
            on the sender filter alone.
        max_results: Maximum number of emails to return (default 10).
            Values are clamped to the range 1..50.

    Returns:
        List of email dictionaries with basic info, or an empty list if none
        are found. If the search as a whole fails, returns a single-element
        list ``[{"error": message}]``. Individual messages that fail to load
        are logged and skipped.
    """
    try:
        # Cap at 50 to prevent token overflow; floor at 1 so that zero or
        # negative values are never sent to the API.
        max_results = max(1, min(max_results, 50))
        query = (query or "").strip()
        print(f"Searching for emails containing '{query}' from allowed senders...")
        service = _get_gmail_service()

        # Build query with sender filtering
        sender_filter = " OR ".join([f"from:{sender}" for sender in ALLOWED_SENDERS])
        if query:
            full_query = f"({sender_filter}) AND ({query})"
        else:
            # Avoid sending an empty "()" group when there is no search term.
            full_query = sender_filter
        print(f"Using Gmail search query: {full_query}")

        response = service.users().messages().list(userId="me", q=full_query, maxResults=max_results).execute()
        messages = response.get("messages", [])
        result_count = response.get("resultSizeEstimate", 0)
        print(f"Search found {result_count} total results, processing {len(messages)} messages...")
        if not messages:
            print("No emails found matching the search criteria.")
            return []

        email_list = []
        for i, message in enumerate(messages):
            try:
                print(f"Processing search result {i + 1}/{len(messages)}: {message['id']}")
                msg = service.users().messages().get(userId="me", id=message["id"], format="metadata").execute()
                # The caller-supplied query text is part of the search string,
                # so the query alone cannot be trusted to restrict senders;
                # the From header of every hit is checked again here.
                if _is_allowed_sender(msg):
                    email_list.append(_extract_email_info(msg))
            except Exception as e:
                # Best effort: one bad message must not abort the whole search.
                print(f"Error processing search result {message.get('id', 'unknown')}: {str(e)}")
                continue

        print(f"Search completed: found {len(email_list)} relevant emails.")
        return email_list
    except Exception as e:
        error_msg = f"Email search failed: {str(e)}"
        print(error_msg)
        return [{"error": error_msg}]
def _decode_gmail_body(data: str) -> str:
    """Decode one base64url-encoded message body into text."""
    # Tolerate missing '=' padding, and replace undecodable bytes rather than
    # discarding the entire body because of a single bad byte.
    padded = data + "=" * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")


def _find_plain_text_body(payload: Dict[str, Any]) -> str:
    """Return the first non-empty text/plain body found depth-first in payload's parts."""
    for part in payload.get("parts") or []:
        if part.get("mimeType") == "text/plain":
            data = part.get("body", {}).get("data", "")
            if data:
                return _decode_gmail_body(data)
        elif part.get("parts"):
            # Nested container part: look inside it for the plain-text body.
            nested = _find_plain_text_body(part)
            if nested:
                return nested
    return ""


def read_email_content(message_id: str) -> Dict[str, Any]:
    """
    Read the full content of a specific email by ID, with security filtering.

    Args:
        message_id: The Gmail message ID (from search or recent emails list).

    Returns:
        Dictionary with id, subject, sender, date, snippet and "body".
        The body is the first text/plain part (searched recursively through
        nested parts) or, for a message without parts, its single body; it is
        truncated to 2000 characters. If the body cannot be decoded, "body"
        is "Error extracting email body".
        Returns ``{"error": ..., "allowed_senders": [...]}`` if the email is
        not from an allowed sender, and ``{"error": ...}`` on any other failure.
    """
    try:
        print(f"Reading email content for message ID: {message_id}")
        service = _get_gmail_service()
        message = service.users().messages().get(userId="me", id=message_id, format="full").execute()

        # Security check: only return emails from allowed senders
        if not _is_allowed_sender(message):
            error_msg = f"Access denied: Email {message_id} not from allowed sender"
            print(error_msg)
            return {"error": error_msg, "allowed_senders": ALLOWED_SENDERS}

        # Extract basic info
        email_info = _extract_email_info(message)

        # Try to extract body content
        body_text = ""
        try:
            payload = message.get("payload", {})
            if "parts" in payload:
                # Multipart message
                body_text = _find_plain_text_body(payload)
            else:
                # Simple message
                body_data = payload.get("body", {}).get("data", "")
                if body_data:
                    body_text = _decode_gmail_body(body_data)
        except Exception as e:
            print(f"Could not extract email body: {str(e)}")
            body_text = "Error extracting email body"

        # Limit body text to prevent token overflow
        if len(body_text) > 2000:
            body_text = body_text[:2000] + "... [content truncated]"
            print("Email body truncated to prevent token overflow")

        result = {**email_info, "body": body_text}
        print(f"Successfully read email: {email_info['subject']}")
        return result
    except Exception as e:
        error_msg = f"Failed to read email {message_id}: {str(e)}"
        print(error_msg)
        return {"error": error_msg}
| # Legacy functions for backward compatibility | |
def send_email(
    to: List[str],
    subject: str,
    body: str,
    html_body: Optional[str] = None,
    cc: Optional[List[str]] = None,
    bcc: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Placeholder for sending email; sending is intentionally disabled.

    All arguments are accepted for interface compatibility and ignored.

    Returns:
        A dict with a single "error" key explaining that sending is not
        implemented.
    """
    refusal: Dict[str, Any] = {"error": "Email sending not implemented for security reasons"}
    return refusal
def search_emails(query: str, max_results: int = 10) -> Dict[str, Any]:
    """
    Legacy search function - use search_emails_simple instead.

    Args:
        query: Search term passed through to search_emails_simple.
        max_results: Maximum number of emails, passed through unchanged.

    Returns:
        Dict with "messages" (a list of ``{"id": ...}`` entries) and
        "resultSizeEstimate" (the number of entries in "messages"). When the
        underlying search reports a failure, an additional "error" key carries
        its message instead of the failure being counted as a result.
    """
    results = search_emails_simple(query, max_results)
    messages = [{"id": r["id"]} for r in results if "id" in r]
    # Count only real messages; an error entry from the search is not a result.
    response: Dict[str, Any] = {"messages": messages, "resultSizeEstimate": len(messages)}
    errors = [r["error"] for r in results if "error" in r]
    if errors:
        response["error"] = errors[0]
    return response
def read_email(message_id: str, format: str = "full") -> Dict[str, Any]:
    """
    Legacy read function - use read_email_content instead.

    Args:
        message_id: Gmail message ID, forwarded to read_email_content.
        format: Accepted for backward compatibility only; it is not used
            and is not forwarded to read_email_content.

    Returns:
        Whatever read_email_content(message_id) returns.
    """
    return read_email_content(message_id)