Spaces:
Sleeping
Sleeping
| """ | |
| Gmail 2FA Integration Example with Grant Mechanism | |
| This example demonstrates how to use the Gmail integration for handling 2FA codes | |
| during web automation with a robust credential grant and re-authentication system. | |
| Features: | |
| - Automatic credential validation and setup | |
| - Interactive OAuth grant flow when credentials are missing/invalid | |
| - Fallback re-authentication mechanisms | |
| - Clear error handling and user guidance | |
| Setup: | |
| 1. Enable Gmail API in Google Cloud Console | |
| 2. Create OAuth 2.0 credentials and download JSON | |
| 3. Save credentials as ~/.config/browseruse/gmail_credentials.json | |
| 4. Run this example - it will guide you through OAuth setup if needed | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| from dotenv import load_dotenv | |
| # Add the parent directory to the path so we can import browser_use | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) | |
| load_dotenv() | |
| from browser_use import Agent, ChatOpenAI, Tools | |
| from browser_use.config import CONFIG | |
| from browser_use.integrations.gmail import GmailService, register_gmail_actions | |
| class GmailGrantManager: | |
| """ | |
| Manages Gmail OAuth credential grants and authentication flows. | |
| Provides a robust mechanism for setting up and maintaining Gmail API access. | |
| """ | |
| def __init__(self): | |
| self.config_dir = CONFIG.BROWSER_USE_CONFIG_DIR | |
| self.credentials_file = self.config_dir / 'gmail_credentials.json' | |
| self.token_file = self.config_dir / 'gmail_token.json' | |
| print(f'GmailGrantManager initialized with config_dir: {self.config_dir}') | |
| print(f'GmailGrantManager initialized with credentials_file: {self.credentials_file}') | |
| print(f'GmailGrantManager initialized with token_file: {self.token_file}') | |
| def check_credentials_exist(self) -> bool: | |
| """Check if OAuth credentials file exists.""" | |
| return self.credentials_file.exists() | |
| def check_token_exists(self) -> bool: | |
| """Check if saved token file exists.""" | |
| return self.token_file.exists() | |
| def validate_credentials_format(self) -> tuple[bool, str]: | |
| """ | |
| Validate that the credentials file has the correct format. | |
| Returns (is_valid, error_message) | |
| """ | |
| if not self.check_credentials_exist(): | |
| return False, 'Credentials file not found' | |
| try: | |
| with open(self.credentials_file) as f: | |
| creds = json.load(f) | |
| # Accept if either 'web' or 'installed' section exists and is not empty | |
| if creds.get('web') or creds.get('installed'): | |
| return True, 'Credentials file is valid' | |
| return False, "Invalid credentials format - neither 'web' nor 'installed' sections found" | |
| except json.JSONDecodeError: | |
| return False, 'Credentials file is not valid JSON' | |
| except Exception as e: | |
| return False, f'Error reading credentials file: {e}' | |
| async def setup_oauth_credentials(self) -> bool: | |
| """ | |
| Guide user through OAuth credentials setup process. | |
| Returns True if setup is successful. | |
| """ | |
| print('\nπ Gmail OAuth Credentials Setup Required') | |
| print('=' * 50) | |
| if not self.check_credentials_exist(): | |
| print('β Gmail credentials file not found') | |
| else: | |
| is_valid, error = self.validate_credentials_format() | |
| if not is_valid: | |
| print(f'β Gmail credentials file is invalid: {error}') | |
| print('\nπ To set up Gmail API access:') | |
| print('1. Go to https://console.cloud.google.com/') | |
| print('2. Create a new project or select an existing one') | |
| print('3. Enable the Gmail API:') | |
| print(' - Go to "APIs & Services" > "Library"') | |
| print(' - Search for "Gmail API" and enable it') | |
| print('4. Create OAuth 2.0 credentials:') | |
| print(' - Go to "APIs & Services" > "Credentials"') | |
| print(' - Click "Create Credentials" > "OAuth client ID"') | |
| print(' - Choose "Desktop application"') | |
| print(' - Download the JSON file') | |
| print(f'5. Save the JSON file as: {self.credentials_file}') | |
| print(f'6. Ensure the directory exists: {self.config_dir}') | |
| # Create config directory if it doesn't exist | |
| self.config_dir.mkdir(parents=True, exist_ok=True) | |
| print(f'\nβ Created config directory: {self.config_dir}') | |
| # Wait for user to set up credentials | |
| while True: | |
| user_input = input('\nβ Have you saved the credentials file? (y/n/skip): ').lower().strip() | |
| if user_input == 'skip': | |
| print('βοΈ Skipping credential validation for now') | |
| return False | |
| elif user_input == 'y': | |
| if self.check_credentials_exist(): | |
| is_valid, error = self.validate_credentials_format() | |
| if is_valid: | |
| print('β Credentials file found and validated!') | |
| return True | |
| else: | |
| print(f'β Credentials file is invalid: {error}') | |
| print('Please check the file format and try again.') | |
| else: | |
| print(f'β Credentials file still not found at: {self.credentials_file}') | |
| elif user_input == 'n': | |
| print('β³ Please complete the setup steps above and try again.') | |
| else: | |
| print('Please enter y, n, or skip') | |
| async def test_authentication(self, gmail_service: GmailService) -> tuple[bool, str]: | |
| """ | |
| Test Gmail authentication and return status. | |
| Returns (success, message) | |
| """ | |
| try: | |
| print('π Testing Gmail authentication...') | |
| success = await gmail_service.authenticate() | |
| if success and gmail_service.is_authenticated(): | |
| print('β Gmail authentication successful!') | |
| return True, 'Authentication successful' | |
| else: | |
| return False, 'Authentication failed - invalid credentials or OAuth flow failed' | |
| except Exception as e: | |
| return False, f'Authentication error: {e}' | |
| async def handle_authentication_failure(self, gmail_service: GmailService, error_msg: str) -> bool: | |
| """ | |
| Handle authentication failures with fallback mechanisms. | |
| Returns True if recovery was successful. | |
| """ | |
| print(f'\nβ Gmail authentication failed: {error_msg}') | |
| print('\nπ§ Attempting recovery...') | |
| # Option 1: Try removing old token file | |
| if self.token_file.exists(): | |
| print('ποΈ Removing old token file to force re-authentication...') | |
| try: | |
| self.token_file.unlink() | |
| print('β Old token file removed') | |
| # Try authentication again | |
| success = await gmail_service.authenticate() | |
| if success: | |
| print('β Re-authentication successful!') | |
| return True | |
| except Exception as e: | |
| print(f'β Failed to remove token file: {e}') | |
| # Option 2: Validate and potentially re-setup credentials | |
| is_valid, cred_error = self.validate_credentials_format() | |
| if not is_valid: | |
| print(f'\nβ Credentials file issue: {cred_error}') | |
| print('π§ Initiating credential re-setup...') | |
| return await self.setup_oauth_credentials() | |
| # Option 3: Provide manual troubleshooting steps | |
| print('\nπ§ Manual troubleshooting steps:') | |
| print('1. Check that Gmail API is enabled in Google Cloud Console') | |
| print('2. Verify OAuth consent screen is configured') | |
| print('3. Ensure redirect URIs include http://localhost:8080') | |
| print('4. Check if credentials file is for the correct project') | |
| print('5. Try regenerating OAuth credentials in Google Cloud Console') | |
| retry = input('\nβ Would you like to retry authentication? (y/n): ').lower().strip() | |
| if retry == 'y': | |
| success = await gmail_service.authenticate() | |
| return success | |
| return False | |
| async def main(): | |
| print('π Gmail 2FA Integration Example with Grant Mechanism') | |
| print('=' * 60) | |
| # Initialize grant manager | |
| grant_manager = GmailGrantManager() | |
| # Step 1: Check and validate credentials | |
| print('π Step 1: Validating Gmail credentials...') | |
| if not grant_manager.check_credentials_exist(): | |
| print('β No Gmail credentials found') | |
| setup_success = await grant_manager.setup_oauth_credentials() | |
| if not setup_success: | |
| print('βΉοΈ Setup cancelled or failed. Exiting...') | |
| return | |
| else: | |
| is_valid, error = grant_manager.validate_credentials_format() | |
| if not is_valid: | |
| print(f'β Invalid credentials: {error}') | |
| setup_success = await grant_manager.setup_oauth_credentials() | |
| if not setup_success: | |
| print('βΉοΈ Setup cancelled or failed. Exiting...') | |
| return | |
| else: | |
| print('β Gmail credentials file found and validated') | |
| # Step 2: Initialize Gmail service and test authentication | |
| print('\nπ Step 2: Testing Gmail authentication...') | |
| gmail_service = GmailService() | |
| auth_success, auth_message = await grant_manager.test_authentication(gmail_service) | |
| if not auth_success: | |
| print(f'β Initial authentication failed: {auth_message}') | |
| recovery_success = await grant_manager.handle_authentication_failure(gmail_service, auth_message) | |
| if not recovery_success: | |
| print('β Failed to recover Gmail authentication. Please check your setup.') | |
| return | |
| # Step 3: Initialize tools with authenticated service | |
| print('\nπ Step 3: Registering Gmail actions...') | |
| tools = Tools() | |
| register_gmail_actions(tools, gmail_service=gmail_service) | |
| print('β Gmail actions registered with tools') | |
| print('Available Gmail actions:') | |
| print('- get_recent_emails: Get recent emails with filtering') | |
| print() | |
| # Initialize LLM | |
| llm = ChatOpenAI(model='gpt-4.1-mini') | |
| # Step 4: Test Gmail functionality | |
| print('π Step 4: Testing Gmail email retrieval...') | |
| agent = Agent(task='Get recent emails from Gmail to test the integration is working properly', llm=llm, tools=tools) | |
| try: | |
| history = await agent.run() | |
| print('β Gmail email retrieval test completed') | |
| except Exception as e: | |
| print(f'β Gmail email retrieval test failed: {e}') | |
| # Try one more recovery attempt | |
| print('π§ Attempting final recovery...') | |
| recovery_success = await grant_manager.handle_authentication_failure(gmail_service, str(e)) | |
| if recovery_success: | |
| print('β Recovery successful, re-running test...') | |
| history = await agent.run() | |
| else: | |
| print('β Final recovery failed. Please check your Gmail API setup.') | |
| return | |
| print('\n' + '=' * 60) | |
| # Step 5: Demonstrate 2FA code finding | |
| print('π Step 5: Testing 2FA code detection...') | |
| agent2 = Agent( | |
| task='Search for any 2FA verification codes or OTP codes in recent Gmail emails from the last 30 minutes', | |
| llm=llm, | |
| tools=tools, | |
| ) | |
| history2 = await agent2.run() | |
| print('β 2FA code search completed') | |
| print('\n' + '=' * 60) | |
| # Step 6: Simulate complete login flow | |
| print('π Step 6: Demonstrating complete 2FA login flow...') | |
| agent3 = Agent( | |
| task=""" | |
| Demonstrate a complete 2FA-enabled login flow: | |
| 1. Check for any existing 2FA codes in recent emails | |
| 2. Explain how the agent would handle a typical login: | |
| - Navigate to a login page | |
| - Enter credentials | |
| - Wait for 2FA prompt | |
| - Use get_recent_emails to find the verification code | |
| - Extract and enter the 2FA code | |
| 3. Show what types of emails and codes can be detected | |
| """, | |
| llm=llm, | |
| tools=tools, | |
| ) | |
| history3 = await agent3.run() | |
| print('β Complete 2FA flow demonstration completed') | |
| print('\n' + '=' * 60) | |
| print('π Gmail 2FA Integration with Grant Mechanism completed successfully!') | |
| print('\nπ‘ Key features demonstrated:') | |
| print('- β Automatic credential validation and setup') | |
| print('- β Robust error handling and recovery mechanisms') | |
| print('- β Interactive OAuth grant flow') | |
| print('- β Token refresh and re-authentication') | |
| print('- β 2FA code detection and extraction') | |
| print('\nπ§ Grant mechanism benefits:') | |
| print('- Handles missing or invalid credentials gracefully') | |
| print('- Provides clear setup instructions') | |
| print('- Automatically recovers from authentication failures') | |
| print('- Validates credential format before use') | |
| print('- Offers multiple fallback options') | |
| if __name__ == '__main__': | |
| asyncio.run(main()) | |