Spaces:
Sleeping
Sleeping
File size: 5,591 Bytes
d7b3d84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
"""
Cloud Example 1: Your First Browser Use Cloud Task
==================================================
This example demonstrates the most basic Browser Use Cloud functionality:
- Create a simple automation task
- Get the task ID
- Monitor completion
- Retrieve results
Perfect for first-time cloud users to understand the API basics.
Cost: ~$0.04 (1 task + 3 steps with GPT-4.1 mini)
"""
import os
import time
from typing import Any
import requests
from requests.exceptions import RequestException
# Configuration
API_KEY = os.getenv('BROWSER_USE_API_KEY')
if not API_KEY:
raise ValueError(
'Please set BROWSER_USE_API_KEY environment variable. You can also create an API key at https://cloud.browser-use.com/new-api-key'
)
BASE_URL = os.getenv('BROWSER_USE_BASE_URL', 'https://api.browser-use.com/api/v1')
TIMEOUT = int(os.getenv('BROWSER_USE_TIMEOUT', '30'))
HEADERS = {'Authorization': f'Bearer {API_KEY}', 'Content-Type': 'application/json'}
def _request_with_retry(method: str, url: str, **kwargs) -> requests.Response:
"""Make HTTP request with timeout and retry logic."""
kwargs.setdefault('timeout', TIMEOUT)
for attempt in range(3):
try:
response = requests.request(method, url, **kwargs)
response.raise_for_status()
return response
except RequestException as e:
if attempt == 2: # Last attempt
raise
sleep_time = 2**attempt
print(f'β οΈ Request failed (attempt {attempt + 1}/3), retrying in {sleep_time}s: {e}')
time.sleep(sleep_time)
# This line should never be reached, but satisfies type checker
raise RuntimeError('Unexpected error in retry logic')
def create_task(instructions: str) -> str:
"""
Create a new browser automation task.
Args:
instructions: Natural language description of what the agent should do
Returns:
task_id: Unique identifier for the created task
"""
print(f'π Creating task: {instructions}')
payload = {
'task': instructions,
'llm_model': 'gpt-4.1-mini', # Cost-effective model
'max_agent_steps': 10, # Prevent runaway costs
'enable_public_share': True, # Enable shareable execution URLs
}
response = _request_with_retry('post', f'{BASE_URL}/run-task', headers=HEADERS, json=payload)
task_id = response.json()['id']
print(f'β
Task created with ID: {task_id}')
return task_id
def get_task_status(task_id: str) -> dict[str, Any]:
"""Get the current status of a task."""
response = _request_with_retry('get', f'{BASE_URL}/task/{task_id}/status', headers=HEADERS)
return response.json()
def get_task_details(task_id: str) -> dict[str, Any]:
"""Get full task details including steps and output."""
response = _request_with_retry('get', f'{BASE_URL}/task/{task_id}', headers=HEADERS)
return response.json()
def wait_for_completion(task_id: str, poll_interval: int = 3) -> dict[str, Any]:
"""
Wait for task completion and show progress.
Args:
task_id: The task to monitor
poll_interval: How often to check status (seconds)
Returns:
Complete task details with output
"""
print(f'β³ Monitoring task {task_id}...')
step_count = 0
start_time = time.time()
while True:
details = get_task_details(task_id)
status = details['status']
current_steps = len(details.get('steps', []))
elapsed = time.time() - start_time
# Clear line and show current progress
if current_steps > step_count:
step_count = current_steps
# Build status message
if status == 'running':
if current_steps > 0:
status_msg = f'π Step {current_steps} | β±οΈ {elapsed:.0f}s | π€ Agent working...'
else:
status_msg = f'π€ Agent starting... | β±οΈ {elapsed:.0f}s'
else:
status_msg = f'π Step {current_steps} | β±οΈ {elapsed:.0f}s | Status: {status}'
# Clear line and print status
print(f'\r{status_msg:<80}', end='', flush=True)
# Check if finished
if status == 'finished':
print(f'\rβ
Task completed successfully! ({current_steps} steps in {elapsed:.1f}s)' + ' ' * 20)
return details
elif status in ['failed', 'stopped']:
print(f'\rβ Task {status} after {current_steps} steps' + ' ' * 30)
return details
time.sleep(poll_interval)
def main():
"""Run a basic cloud automation task."""
print('π Browser Use Cloud - Basic Task Example')
print('=' * 50)
# Define a simple search task (using DuckDuckGo to avoid captchas)
task_description = (
"Go to DuckDuckGo and search for 'browser automation tools'. Tell me the top 3 results with their titles and URLs."
)
try:
# Step 1: Create the task
task_id = create_task(task_description)
# Step 2: Wait for completion
result = wait_for_completion(task_id)
# Step 3: Display results
print('\nπ Results:')
print('-' * 30)
print(f'Status: {result["status"]}')
print(f'Steps taken: {len(result.get("steps", []))}')
if result.get('output'):
print(f'Output: {result["output"]}')
else:
print('No output available')
# Show share URLs for viewing execution
if result.get('live_url'):
print(f'\nπ Live Preview: {result["live_url"]}')
if result.get('public_share_url'):
print(f'π Share URL: {result["public_share_url"]}')
elif result.get('share_url'):
print(f'π Share URL: {result["share_url"]}')
if not result.get('live_url') and not result.get('public_share_url') and not result.get('share_url'):
print("\nπ‘ Tip: Add 'enable_public_share': True to task payload to get shareable URLs")
except requests.exceptions.RequestException as e:
print(f'β API Error: {e}')
except Exception as e:
print(f'β Error: {e}')
if __name__ == '__main__':
main()
|