File size: 11,549 Bytes
d7b3d84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
"""
Gmail 2FA Integration Example with Grant Mechanism
This example demonstrates how to use the Gmail integration for handling 2FA codes
during web automation with a robust credential grant and re-authentication system.

Features:
- Automatic credential validation and setup
- Interactive OAuth grant flow when credentials are missing/invalid
- Fallback re-authentication mechanisms
- Clear error handling and user guidance

Setup:
1. Enable Gmail API in Google Cloud Console
2. Create OAuth 2.0 credentials and download JSON
3. Save credentials as ~/.config/browseruse/gmail_credentials.json
4. Run this example - it will guide you through OAuth setup if needed
"""

import asyncio
import json
import os
import sys

from dotenv import load_dotenv

# Add the parent directory to the path so we can import browser_use
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

load_dotenv()

from browser_use import Agent, ChatOpenAI, Tools
from browser_use.config import CONFIG
from browser_use.integrations.gmail import GmailService, register_gmail_actions


class GmailGrantManager:
	"""
	Manages Gmail OAuth credential grants and authentication flows.
	Provides a robust mechanism for setting up and maintaining Gmail API access.
	"""

	def __init__(self):
		self.config_dir = CONFIG.BROWSER_USE_CONFIG_DIR
		self.credentials_file = self.config_dir / 'gmail_credentials.json'
		self.token_file = self.config_dir / 'gmail_token.json'
		print(f'GmailGrantManager initialized with config_dir: {self.config_dir}')
		print(f'GmailGrantManager initialized with credentials_file: {self.credentials_file}')
		print(f'GmailGrantManager initialized with token_file: {self.token_file}')

	def check_credentials_exist(self) -> bool:
		"""Check if OAuth credentials file exists."""
		return self.credentials_file.exists()

	def check_token_exists(self) -> bool:
		"""Check if saved token file exists."""
		return self.token_file.exists()

	def validate_credentials_format(self) -> tuple[bool, str]:
		"""
		Validate that the credentials file has the correct format.
		Returns (is_valid, error_message)
		"""
		if not self.check_credentials_exist():
			return False, 'Credentials file not found'

		try:
			with open(self.credentials_file) as f:
				creds = json.load(f)

			# Accept if either 'web' or 'installed' section exists and is not empty
			if creds.get('web') or creds.get('installed'):
				return True, 'Credentials file is valid'
			return False, "Invalid credentials format - neither 'web' nor 'installed' sections found"

		except json.JSONDecodeError:
			return False, 'Credentials file is not valid JSON'
		except Exception as e:
			return False, f'Error reading credentials file: {e}'

	async def setup_oauth_credentials(self) -> bool:
		"""
		Guide user through OAuth credentials setup process.
		Returns True if setup is successful.
		"""
		print('\nπŸ” Gmail OAuth Credentials Setup Required')
		print('=' * 50)

		if not self.check_credentials_exist():
			print('❌ Gmail credentials file not found')
		else:
			is_valid, error = self.validate_credentials_format()
			if not is_valid:
				print(f'❌ Gmail credentials file is invalid: {error}')

		print('\nπŸ“‹ To set up Gmail API access:')
		print('1. Go to https://console.cloud.google.com/')
		print('2. Create a new project or select an existing one')
		print('3. Enable the Gmail API:')
		print('   - Go to "APIs & Services" > "Library"')
		print('   - Search for "Gmail API" and enable it')
		print('4. Create OAuth 2.0 credentials:')
		print('   - Go to "APIs & Services" > "Credentials"')
		print('   - Click "Create Credentials" > "OAuth client ID"')
		print('   - Choose "Desktop application"')
		print('   - Download the JSON file')
		print(f'5. Save the JSON file as: {self.credentials_file}')
		print(f'6. Ensure the directory exists: {self.config_dir}')

		# Create config directory if it doesn't exist
		self.config_dir.mkdir(parents=True, exist_ok=True)
		print(f'\nβœ… Created config directory: {self.config_dir}')

		# Wait for user to set up credentials
		while True:
			user_input = input('\n❓ Have you saved the credentials file? (y/n/skip): ').lower().strip()

			if user_input == 'skip':
				print('⏭️  Skipping credential validation for now')
				return False
			elif user_input == 'y':
				if self.check_credentials_exist():
					is_valid, error = self.validate_credentials_format()
					if is_valid:
						print('βœ… Credentials file found and validated!')
						return True
					else:
						print(f'❌ Credentials file is invalid: {error}')
						print('Please check the file format and try again.')
				else:
					print(f'❌ Credentials file still not found at: {self.credentials_file}')
			elif user_input == 'n':
				print('⏳ Please complete the setup steps above and try again.')
			else:
				print('Please enter y, n, or skip')

	async def test_authentication(self, gmail_service: GmailService) -> tuple[bool, str]:
		"""
		Test Gmail authentication and return status.
		Returns (success, message)
		"""
		try:
			print('πŸ” Testing Gmail authentication...')
			success = await gmail_service.authenticate()

			if success and gmail_service.is_authenticated():
				print('βœ… Gmail authentication successful!')
				return True, 'Authentication successful'
			else:
				return False, 'Authentication failed - invalid credentials or OAuth flow failed'

		except Exception as e:
			return False, f'Authentication error: {e}'

	async def handle_authentication_failure(self, gmail_service: GmailService, error_msg: str) -> bool:
		"""
		Handle authentication failures with fallback mechanisms.
		Returns True if recovery was successful.
		"""
		print(f'\n❌ Gmail authentication failed: {error_msg}')
		print('\nπŸ”§ Attempting recovery...')

		# Option 1: Try removing old token file
		if self.token_file.exists():
			print('πŸ—‘οΈ  Removing old token file to force re-authentication...')
			try:
				self.token_file.unlink()
				print('βœ… Old token file removed')

				# Try authentication again
				success = await gmail_service.authenticate()
				if success:
					print('βœ… Re-authentication successful!')
					return True
			except Exception as e:
				print(f'❌ Failed to remove token file: {e}')

		# Option 2: Validate and potentially re-setup credentials
		is_valid, cred_error = self.validate_credentials_format()
		if not is_valid:
			print(f'\n❌ Credentials file issue: {cred_error}')
			print('πŸ”§ Initiating credential re-setup...')

			return await self.setup_oauth_credentials()

		# Option 3: Provide manual troubleshooting steps
		print('\nπŸ”§ Manual troubleshooting steps:')
		print('1. Check that Gmail API is enabled in Google Cloud Console')
		print('2. Verify OAuth consent screen is configured')
		print('3. Ensure redirect URIs include http://localhost:8080')
		print('4. Check if credentials file is for the correct project')
		print('5. Try regenerating OAuth credentials in Google Cloud Console')

		retry = input('\n❓ Would you like to retry authentication? (y/n): ').lower().strip()
		if retry == 'y':
			success = await gmail_service.authenticate()
			return success

		return False


async def main():
	print('πŸš€ Gmail 2FA Integration Example with Grant Mechanism')
	print('=' * 60)

	# Initialize grant manager
	grant_manager = GmailGrantManager()

	# Step 1: Check and validate credentials
	print('πŸ” Step 1: Validating Gmail credentials...')

	if not grant_manager.check_credentials_exist():
		print('❌ No Gmail credentials found')
		setup_success = await grant_manager.setup_oauth_credentials()
		if not setup_success:
			print('⏹️  Setup cancelled or failed. Exiting...')
			return
	else:
		is_valid, error = grant_manager.validate_credentials_format()
		if not is_valid:
			print(f'❌ Invalid credentials: {error}')
			setup_success = await grant_manager.setup_oauth_credentials()
			if not setup_success:
				print('⏹️  Setup cancelled or failed. Exiting...')
				return
		else:
			print('βœ… Gmail credentials file found and validated')

	# Step 2: Initialize Gmail service and test authentication
	print('\nπŸ” Step 2: Testing Gmail authentication...')

	gmail_service = GmailService()
	auth_success, auth_message = await grant_manager.test_authentication(gmail_service)

	if not auth_success:
		print(f'❌ Initial authentication failed: {auth_message}')
		recovery_success = await grant_manager.handle_authentication_failure(gmail_service, auth_message)

		if not recovery_success:
			print('❌ Failed to recover Gmail authentication. Please check your setup.')
			return

	# Step 3: Initialize tools with authenticated service
	print('\nπŸ” Step 3: Registering Gmail actions...')

	tools = Tools()
	register_gmail_actions(tools, gmail_service=gmail_service)

	print('βœ… Gmail actions registered with tools')
	print('Available Gmail actions:')
	print('- get_recent_emails: Get recent emails with filtering')
	print()

	# Initialize LLM
	llm = ChatOpenAI(model='gpt-4.1-mini')

	# Step 4: Test Gmail functionality
	print('πŸ” Step 4: Testing Gmail email retrieval...')

	agent = Agent(task='Get recent emails from Gmail to test the integration is working properly', llm=llm, tools=tools)

	try:
		history = await agent.run()
		print('βœ… Gmail email retrieval test completed')
	except Exception as e:
		print(f'❌ Gmail email retrieval test failed: {e}')
		# Try one more recovery attempt
		print('πŸ”§ Attempting final recovery...')
		recovery_success = await grant_manager.handle_authentication_failure(gmail_service, str(e))
		if recovery_success:
			print('βœ… Recovery successful, re-running test...')
			history = await agent.run()
		else:
			print('❌ Final recovery failed. Please check your Gmail API setup.')
			return

	print('\n' + '=' * 60)

	# Step 5: Demonstrate 2FA code finding
	print('πŸ” Step 5: Testing 2FA code detection...')

	agent2 = Agent(
		task='Search for any 2FA verification codes or OTP codes in recent Gmail emails from the last 30 minutes',
		llm=llm,
		tools=tools,
	)

	history2 = await agent2.run()
	print('βœ… 2FA code search completed')

	print('\n' + '=' * 60)

	# Step 6: Simulate complete login flow
	print('πŸ” Step 6: Demonstrating complete 2FA login flow...')

	agent3 = Agent(
		task="""
		Demonstrate a complete 2FA-enabled login flow:
		1. Check for any existing 2FA codes in recent emails
		2. Explain how the agent would handle a typical login:
		   - Navigate to a login page
		   - Enter credentials
		   - Wait for 2FA prompt
		   - Use get_recent_emails to find the verification code
		   - Extract and enter the 2FA code
		3. Show what types of emails and codes can be detected
		""",
		llm=llm,
		tools=tools,
	)

	history3 = await agent3.run()
	print('βœ… Complete 2FA flow demonstration completed')

	print('\n' + '=' * 60)
	print('πŸŽ‰ Gmail 2FA Integration with Grant Mechanism completed successfully!')
	print('\nπŸ’‘ Key features demonstrated:')
	print('- βœ… Automatic credential validation and setup')
	print('- βœ… Robust error handling and recovery mechanisms')
	print('- βœ… Interactive OAuth grant flow')
	print('- βœ… Token refresh and re-authentication')
	print('- βœ… 2FA code detection and extraction')
	print('\nπŸ”§ Grant mechanism benefits:')
	print('- Handles missing or invalid credentials gracefully')
	print('- Provides clear setup instructions')
	print('- Automatically recovers from authentication failures')
	print('- Validates credential format before use')
	print('- Offers multiple fallback options')


if __name__ == '__main__':
	asyncio.run(main())