Spaces:
Sleeping
Sleeping
File size: 10,608 Bytes
d7b3d84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 |
"""
OAuth2 Device Authorization Grant flow client for browser-use.
"""
import asyncio
import json
import os
import shutil
import time
from datetime import datetime
import httpx
from pydantic import BaseModel
from uuid_extensions import uuid7str
from browser_use.config import CONFIG
# Temporary user ID for pre-auth events (matches cloud backend)
TEMP_USER_ID = '99999999-9999-9999-9999-999999999999'
def get_or_create_device_id() -> str:
"""Get or create a persistent device ID for this installation."""
device_id_path = CONFIG.BROWSER_USE_CONFIG_DIR / 'device_id'
# Try to read existing device ID
if device_id_path.exists():
try:
device_id = device_id_path.read_text().strip()
if device_id: # Make sure it's not empty
return device_id
except Exception:
# If we can't read it, we'll create a new one
pass
# Create new device ID
device_id = uuid7str()
# Ensure config directory exists
CONFIG.BROWSER_USE_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
# Write device ID to file
device_id_path.write_text(device_id)
return device_id
class CloudAuthConfig(BaseModel):
"""Configuration for cloud authentication"""
api_token: str | None = None
user_id: str | None = None
authorized_at: datetime | None = None
@classmethod
def load_from_file(cls) -> 'CloudAuthConfig':
"""Load auth config from local file"""
config_path = CONFIG.BROWSER_USE_CONFIG_DIR / 'cloud_auth.json'
if config_path.exists():
try:
with open(config_path) as f:
data = json.load(f)
return cls.model_validate(data)
except Exception:
# Return empty config if file is corrupted
pass
return cls()
def save_to_file(self) -> None:
"""Save auth config to local file"""
CONFIG.BROWSER_USE_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
config_path = CONFIG.BROWSER_USE_CONFIG_DIR / 'cloud_auth.json'
with open(config_path, 'w') as f:
json.dump(self.model_dump(mode='json'), f, indent=2, default=str)
# Set restrictive permissions (owner read/write only) for security
try:
os.chmod(config_path, 0o600)
except Exception:
# Some systems may not support chmod, continue anyway
pass
class DeviceAuthClient:
"""Client for OAuth2 device authorization flow"""
def __init__(self, base_url: str | None = None, http_client: httpx.AsyncClient | None = None):
# Backend API URL for OAuth requests - can be passed directly or defaults to env var
self.base_url = base_url or CONFIG.BROWSER_USE_CLOUD_API_URL
self.client_id = 'library'
self.scope = 'read write'
# If no client provided, we'll create one per request
self.http_client = http_client
# Temporary user ID for pre-auth events
self.temp_user_id = TEMP_USER_ID
# Get or create persistent device ID
self.device_id = get_or_create_device_id()
# Load existing auth if available
self.auth_config = CloudAuthConfig.load_from_file()
@property
def is_authenticated(self) -> bool:
"""Check if we have valid authentication"""
return bool(self.auth_config.api_token and self.auth_config.user_id)
@property
def api_token(self) -> str | None:
"""Get the current API token"""
return self.auth_config.api_token
@property
def user_id(self) -> str:
"""Get the current user ID (temporary or real)"""
return self.auth_config.user_id or self.temp_user_id
async def start_device_authorization(
self,
agent_session_id: str | None = None,
) -> dict:
"""
Start the device authorization flow.
Returns device authorization details including user code and verification URL.
"""
if self.http_client:
response = await self.http_client.post(
f'{self.base_url.rstrip("/")}/api/v1/oauth/device/authorize',
data={
'client_id': self.client_id,
'scope': self.scope,
'agent_session_id': agent_session_id or '',
'device_id': self.device_id,
},
)
response.raise_for_status()
return response.json()
else:
async with httpx.AsyncClient() as client:
response = await client.post(
f'{self.base_url.rstrip("/")}/api/v1/oauth/device/authorize',
data={
'client_id': self.client_id,
'scope': self.scope,
'agent_session_id': agent_session_id or '',
'device_id': self.device_id,
},
)
response.raise_for_status()
return response.json()
async def poll_for_token(
self,
device_code: str,
interval: float = 3.0,
timeout: float = 1800.0,
) -> dict | None:
"""
Poll for the access token.
Returns token info when authorized, None if timeout.
"""
start_time = time.time()
if self.http_client:
# Use injected client for all requests
while time.time() - start_time < timeout:
try:
response = await self.http_client.post(
f'{self.base_url.rstrip("/")}/api/v1/oauth/device/token',
data={
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'device_code': device_code,
'client_id': self.client_id,
},
)
if response.status_code == 200:
data = response.json()
# Check for pending authorization
if data.get('error') == 'authorization_pending':
await asyncio.sleep(interval)
continue
# Check for slow down
if data.get('error') == 'slow_down':
interval = data.get('interval', interval * 2)
await asyncio.sleep(interval)
continue
# Check for other errors
if 'error' in data:
print(f'Error: {data.get("error_description", data["error"])}')
return None
# Success! We have a token
if 'access_token' in data:
return data
elif response.status_code == 400:
# Error response
data = response.json()
if data.get('error') not in ['authorization_pending', 'slow_down']:
print(f'Error: {data.get("error_description", "Unknown error")}')
return None
else:
print(f'Unexpected status code: {response.status_code}')
return None
except Exception as e:
print(f'Error polling for token: {e}')
await asyncio.sleep(interval)
else:
# Create a new client for polling
async with httpx.AsyncClient() as client:
while time.time() - start_time < timeout:
try:
response = await client.post(
f'{self.base_url.rstrip("/")}/api/v1/oauth/device/token',
data={
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'device_code': device_code,
'client_id': self.client_id,
},
)
if response.status_code == 200:
data = response.json()
# Check for pending authorization
if data.get('error') == 'authorization_pending':
await asyncio.sleep(interval)
continue
# Check for slow down
if data.get('error') == 'slow_down':
interval = data.get('interval', interval * 2)
await asyncio.sleep(interval)
continue
# Check for other errors
if 'error' in data:
print(f'Error: {data.get("error_description", data["error"])}')
return None
# Success! We have a token
if 'access_token' in data:
return data
elif response.status_code == 400:
# Error response
data = response.json()
if data.get('error') not in ['authorization_pending', 'slow_down']:
print(f'Error: {data.get("error_description", "Unknown error")}')
return None
else:
print(f'Unexpected status code: {response.status_code}')
return None
except Exception as e:
print(f'Error polling for token: {e}')
await asyncio.sleep(interval)
return None
async def authenticate(
self,
agent_session_id: str | None = None,
show_instructions: bool = True,
) -> bool:
"""
Run the full authentication flow.
Returns True if authentication successful.
"""
import logging
logger = logging.getLogger(__name__)
try:
# Start device authorization
device_auth = await self.start_device_authorization(agent_session_id)
# Use frontend URL for user-facing links
frontend_url = CONFIG.BROWSER_USE_CLOUD_UI_URL or self.base_url.replace('//api.', '//cloud.')
# Replace backend URL with frontend URL in verification URIs
verification_uri = device_auth['verification_uri'].replace(self.base_url, frontend_url)
verification_uri_complete = device_auth['verification_uri_complete'].replace(self.base_url, frontend_url)
terminal_width, _terminal_height = shutil.get_terminal_size((80, 20))
if show_instructions and CONFIG.BROWSER_USE_CLOUD_SYNC:
logger.info('β' * max(terminal_width - 40, 20))
logger.info('π View the details of this run in Browser Use Cloud:')
logger.info(f' π {verification_uri_complete}')
logger.info('β' * max(terminal_width - 40, 20) + '\n')
# Poll for token
token_data = await self.poll_for_token(
device_code=device_auth['device_code'],
interval=device_auth.get('interval', 5),
)
if token_data and token_data.get('access_token'):
# Save authentication
self.auth_config.api_token = token_data['access_token']
self.auth_config.user_id = token_data.get('user_id', self.temp_user_id)
self.auth_config.authorized_at = datetime.now()
self.auth_config.save_to_file()
if show_instructions:
logger.debug('β
Authentication successful! Cloud sync is now enabled with your browser-use account.')
return True
except httpx.HTTPStatusError as e:
# HTTP error with response
if e.response.status_code == 404:
logger.warning(
'Cloud sync authentication endpoint not found (404). Check your BROWSER_USE_CLOUD_API_URL setting.'
)
else:
logger.warning(f'Failed to authenticate with cloud service: HTTP {e.response.status_code} - {e.response.text}')
except httpx.RequestError as e:
# Connection/network errors
# logger.warning(f'Failed to connect to cloud service: {type(e).__name__}: {e}')
pass
except Exception as e:
# Other unexpected errors
logger.warning(f'β Unexpected error during cloud sync authentication: {type(e).__name__}: {e}')
if show_instructions:
logger.debug(f'β Sync authentication failed or timed out with {CONFIG.BROWSER_USE_CLOUD_API_URL}')
return False
def get_headers(self) -> dict:
"""Get headers for API requests"""
if self.api_token:
return {'Authorization': f'Bearer {self.api_token}'}
return {}
def clear_auth(self) -> None:
"""Clear stored authentication"""
self.auth_config = CloudAuthConfig()
# Remove the config file entirely instead of saving empty values
config_path = CONFIG.BROWSER_USE_CONFIG_DIR / 'cloud_auth.json'
config_path.unlink(missing_ok=True)
|