Spaces:
Sleeping
Sleeping
| """ | |
| OAuth2 Device Authorization Grant flow client for browser-use. | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import shutil | |
| import time | |
| from datetime import datetime | |
| import httpx | |
| from pydantic import BaseModel | |
| from uuid_extensions import uuid7str | |
| from browser_use.config import CONFIG | |
| # Temporary user ID for pre-auth events (matches cloud backend) | |
| TEMP_USER_ID = '99999999-9999-9999-9999-999999999999' | |
| def get_or_create_device_id() -> str: | |
| """Get or create a persistent device ID for this installation.""" | |
| device_id_path = CONFIG.BROWSER_USE_CONFIG_DIR / 'device_id' | |
| # Try to read existing device ID | |
| if device_id_path.exists(): | |
| try: | |
| device_id = device_id_path.read_text().strip() | |
| if device_id: # Make sure it's not empty | |
| return device_id | |
| except Exception: | |
| # If we can't read it, we'll create a new one | |
| pass | |
| # Create new device ID | |
| device_id = uuid7str() | |
| # Ensure config directory exists | |
| CONFIG.BROWSER_USE_CONFIG_DIR.mkdir(parents=True, exist_ok=True) | |
| # Write device ID to file | |
| device_id_path.write_text(device_id) | |
| return device_id | |
| class CloudAuthConfig(BaseModel): | |
| """Configuration for cloud authentication""" | |
| api_token: str | None = None | |
| user_id: str | None = None | |
| authorized_at: datetime | None = None | |
| def load_from_file(cls) -> 'CloudAuthConfig': | |
| """Load auth config from local file""" | |
| config_path = CONFIG.BROWSER_USE_CONFIG_DIR / 'cloud_auth.json' | |
| if config_path.exists(): | |
| try: | |
| with open(config_path) as f: | |
| data = json.load(f) | |
| return cls.model_validate(data) | |
| except Exception: | |
| # Return empty config if file is corrupted | |
| pass | |
| return cls() | |
| def save_to_file(self) -> None: | |
| """Save auth config to local file""" | |
| CONFIG.BROWSER_USE_CONFIG_DIR.mkdir(parents=True, exist_ok=True) | |
| config_path = CONFIG.BROWSER_USE_CONFIG_DIR / 'cloud_auth.json' | |
| with open(config_path, 'w') as f: | |
| json.dump(self.model_dump(mode='json'), f, indent=2, default=str) | |
| # Set restrictive permissions (owner read/write only) for security | |
| try: | |
| os.chmod(config_path, 0o600) | |
| except Exception: | |
| # Some systems may not support chmod, continue anyway | |
| pass | |
| class DeviceAuthClient: | |
| """Client for OAuth2 device authorization flow""" | |
| def __init__(self, base_url: str | None = None, http_client: httpx.AsyncClient | None = None): | |
| # Backend API URL for OAuth requests - can be passed directly or defaults to env var | |
| self.base_url = base_url or CONFIG.BROWSER_USE_CLOUD_API_URL | |
| self.client_id = 'library' | |
| self.scope = 'read write' | |
| # If no client provided, we'll create one per request | |
| self.http_client = http_client | |
| # Temporary user ID for pre-auth events | |
| self.temp_user_id = TEMP_USER_ID | |
| # Get or create persistent device ID | |
| self.device_id = get_or_create_device_id() | |
| # Load existing auth if available | |
| self.auth_config = CloudAuthConfig.load_from_file() | |
| def is_authenticated(self) -> bool: | |
| """Check if we have valid authentication""" | |
| return bool(self.auth_config.api_token and self.auth_config.user_id) | |
| def api_token(self) -> str | None: | |
| """Get the current API token""" | |
| return self.auth_config.api_token | |
| def user_id(self) -> str: | |
| """Get the current user ID (temporary or real)""" | |
| return self.auth_config.user_id or self.temp_user_id | |
| async def start_device_authorization( | |
| self, | |
| agent_session_id: str | None = None, | |
| ) -> dict: | |
| """ | |
| Start the device authorization flow. | |
| Returns device authorization details including user code and verification URL. | |
| """ | |
| if self.http_client: | |
| response = await self.http_client.post( | |
| f'{self.base_url.rstrip("/")}/api/v1/oauth/device/authorize', | |
| data={ | |
| 'client_id': self.client_id, | |
| 'scope': self.scope, | |
| 'agent_session_id': agent_session_id or '', | |
| 'device_id': self.device_id, | |
| }, | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| else: | |
| async with httpx.AsyncClient() as client: | |
| response = await client.post( | |
| f'{self.base_url.rstrip("/")}/api/v1/oauth/device/authorize', | |
| data={ | |
| 'client_id': self.client_id, | |
| 'scope': self.scope, | |
| 'agent_session_id': agent_session_id or '', | |
| 'device_id': self.device_id, | |
| }, | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| async def poll_for_token( | |
| self, | |
| device_code: str, | |
| interval: float = 3.0, | |
| timeout: float = 1800.0, | |
| ) -> dict | None: | |
| """ | |
| Poll for the access token. | |
| Returns token info when authorized, None if timeout. | |
| """ | |
| start_time = time.time() | |
| if self.http_client: | |
| # Use injected client for all requests | |
| while time.time() - start_time < timeout: | |
| try: | |
| response = await self.http_client.post( | |
| f'{self.base_url.rstrip("/")}/api/v1/oauth/device/token', | |
| data={ | |
| 'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', | |
| 'device_code': device_code, | |
| 'client_id': self.client_id, | |
| }, | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| # Check for pending authorization | |
| if data.get('error') == 'authorization_pending': | |
| await asyncio.sleep(interval) | |
| continue | |
| # Check for slow down | |
| if data.get('error') == 'slow_down': | |
| interval = data.get('interval', interval * 2) | |
| await asyncio.sleep(interval) | |
| continue | |
| # Check for other errors | |
| if 'error' in data: | |
| print(f'Error: {data.get("error_description", data["error"])}') | |
| return None | |
| # Success! We have a token | |
| if 'access_token' in data: | |
| return data | |
| elif response.status_code == 400: | |
| # Error response | |
| data = response.json() | |
| if data.get('error') not in ['authorization_pending', 'slow_down']: | |
| print(f'Error: {data.get("error_description", "Unknown error")}') | |
| return None | |
| else: | |
| print(f'Unexpected status code: {response.status_code}') | |
| return None | |
| except Exception as e: | |
| print(f'Error polling for token: {e}') | |
| await asyncio.sleep(interval) | |
| else: | |
| # Create a new client for polling | |
| async with httpx.AsyncClient() as client: | |
| while time.time() - start_time < timeout: | |
| try: | |
| response = await client.post( | |
| f'{self.base_url.rstrip("/")}/api/v1/oauth/device/token', | |
| data={ | |
| 'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', | |
| 'device_code': device_code, | |
| 'client_id': self.client_id, | |
| }, | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| # Check for pending authorization | |
| if data.get('error') == 'authorization_pending': | |
| await asyncio.sleep(interval) | |
| continue | |
| # Check for slow down | |
| if data.get('error') == 'slow_down': | |
| interval = data.get('interval', interval * 2) | |
| await asyncio.sleep(interval) | |
| continue | |
| # Check for other errors | |
| if 'error' in data: | |
| print(f'Error: {data.get("error_description", data["error"])}') | |
| return None | |
| # Success! We have a token | |
| if 'access_token' in data: | |
| return data | |
| elif response.status_code == 400: | |
| # Error response | |
| data = response.json() | |
| if data.get('error') not in ['authorization_pending', 'slow_down']: | |
| print(f'Error: {data.get("error_description", "Unknown error")}') | |
| return None | |
| else: | |
| print(f'Unexpected status code: {response.status_code}') | |
| return None | |
| except Exception as e: | |
| print(f'Error polling for token: {e}') | |
| await asyncio.sleep(interval) | |
| return None | |
| async def authenticate( | |
| self, | |
| agent_session_id: str | None = None, | |
| show_instructions: bool = True, | |
| ) -> bool: | |
| """ | |
| Run the full authentication flow. | |
| Returns True if authentication successful. | |
| """ | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| try: | |
| # Start device authorization | |
| device_auth = await self.start_device_authorization(agent_session_id) | |
| # Use frontend URL for user-facing links | |
| frontend_url = CONFIG.BROWSER_USE_CLOUD_UI_URL or self.base_url.replace('//api.', '//cloud.') | |
| # Replace backend URL with frontend URL in verification URIs | |
| verification_uri = device_auth['verification_uri'].replace(self.base_url, frontend_url) | |
| verification_uri_complete = device_auth['verification_uri_complete'].replace(self.base_url, frontend_url) | |
| terminal_width, _terminal_height = shutil.get_terminal_size((80, 20)) | |
| if show_instructions and CONFIG.BROWSER_USE_CLOUD_SYNC: | |
| logger.info('β' * max(terminal_width - 40, 20)) | |
| logger.info('π View the details of this run in Browser Use Cloud:') | |
| logger.info(f' π {verification_uri_complete}') | |
| logger.info('β' * max(terminal_width - 40, 20) + '\n') | |
| # Poll for token | |
| token_data = await self.poll_for_token( | |
| device_code=device_auth['device_code'], | |
| interval=device_auth.get('interval', 5), | |
| ) | |
| if token_data and token_data.get('access_token'): | |
| # Save authentication | |
| self.auth_config.api_token = token_data['access_token'] | |
| self.auth_config.user_id = token_data.get('user_id', self.temp_user_id) | |
| self.auth_config.authorized_at = datetime.now() | |
| self.auth_config.save_to_file() | |
| if show_instructions: | |
| logger.debug('β Authentication successful! Cloud sync is now enabled with your browser-use account.') | |
| return True | |
| except httpx.HTTPStatusError as e: | |
| # HTTP error with response | |
| if e.response.status_code == 404: | |
| logger.warning( | |
| 'Cloud sync authentication endpoint not found (404). Check your BROWSER_USE_CLOUD_API_URL setting.' | |
| ) | |
| else: | |
| logger.warning(f'Failed to authenticate with cloud service: HTTP {e.response.status_code} - {e.response.text}') | |
| except httpx.RequestError as e: | |
| # Connection/network errors | |
| # logger.warning(f'Failed to connect to cloud service: {type(e).__name__}: {e}') | |
| pass | |
| except Exception as e: | |
| # Other unexpected errors | |
| logger.warning(f'β Unexpected error during cloud sync authentication: {type(e).__name__}: {e}') | |
| if show_instructions: | |
| logger.debug(f'β Sync authentication failed or timed out with {CONFIG.BROWSER_USE_CLOUD_API_URL}') | |
| return False | |
| def get_headers(self) -> dict: | |
| """Get headers for API requests""" | |
| if self.api_token: | |
| return {'Authorization': f'Bearer {self.api_token}'} | |
| return {} | |
| def clear_auth(self) -> None: | |
| """Clear stored authentication""" | |
| self.auth_config = CloudAuthConfig() | |
| # Remove the config file entirely instead of saving empty values | |
| config_path = CONFIG.BROWSER_USE_CONFIG_DIR / 'cloud_auth.json' | |
| config_path.unlink(missing_ok=True) | |