"""
Runs all agent tasks in parallel (up to 10 at a time) using separate subprocesses.
Each task gets its own Python process, which prevents browser sessions from interfering with each other.
Exits with code 1 if no tasks pass (a 0% pass rate).
"""
import argparse
import asyncio
import glob
import json
import logging
import os
import sys
import warnings
import anyio
import yaml
from dotenv import load_dotenv
from pydantic import BaseModel
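# Load environment variables (including the API keys read via os.getenv below) from a local .env file, if present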
load_dotenv()
from browser_use import Agent, AgentHistoryList, BrowserProfile, BrowserSession, ChatBrowserUse
from browser_use.llm.google.chat import ChatGoogle
from browser_use.llm.messages import UserMessage
# --- CONFIG ---
MAX_PARALLEL = 10
TASK_DIR = (
sys.argv[1]
if len(sys.argv) > 1 and not sys.argv[1].startswith('--')
else os.path.join(os.path.dirname(__file__), '../agent_tasks')
)
TASK_FILES = glob.glob(os.path.join(TASK_DIR, '*.yaml'))
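
# Each task YAML is expected to look roughly like the sketch below (keys read in
# run_single_task; judge_context and max_steps are optional, defaults shown):
#
#   task: "Go to example.com and report the page title"   # illustrative task text
#   judge_context:
#     - "The agent must solve the task"
#   max_steps: 15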
class JudgeResponse(BaseModel):
	"""Structured verdict the judge LLM must return (used as output_format for the judge call)."""

	success: bool
	explanation: str
async def run_single_task(task_file):
"""Run a single task in the current process (called by subprocess)"""
	session = None  # assigned once the browser session is created; checked in the error path below
	try:
print(f'[DEBUG] Starting task: {os.path.basename(task_file)}', file=sys.stderr)
# Suppress all logging in subprocess to avoid interfering with JSON output
logging.getLogger().setLevel(logging.CRITICAL)
for logger_name in ['browser_use', 'telemetry', 'message_manager']:
logging.getLogger(logger_name).setLevel(logging.CRITICAL)
warnings.filterwarnings('ignore')
print('[DEBUG] Loading task file...', file=sys.stderr)
content = await anyio.Path(task_file).read_text()
task_data = yaml.safe_load(content)
task = task_data['task']
judge_context = task_data.get('judge_context', ['The agent must solve the task'])
max_steps = task_data.get('max_steps', 15)
print(f'[DEBUG] Task: {task[:100]}...', file=sys.stderr)
print(f'[DEBUG] Max steps: {max_steps}', file=sys.stderr)
api_key = os.getenv('BROWSER_USE_API_KEY')
if not api_key:
print('[SKIP] BROWSER_USE_API_KEY is not set - skipping task evaluation', file=sys.stderr)
return {
'file': os.path.basename(task_file),
'success': True, # Mark as success so it doesn't fail CI
'explanation': 'Skipped - API key not available (fork PR or missing secret)',
}
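		# The agent itself runs on the Browser Use cloud model; a separate Gemini model (below) acts as judge.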
agent_llm = ChatBrowserUse(api_key=api_key)
# Check if Google API key is available for judge LLM
google_api_key = os.getenv('GOOGLE_API_KEY')
if not google_api_key:
print('[SKIP] GOOGLE_API_KEY is not set - skipping task evaluation', file=sys.stderr)
return {
'file': os.path.basename(task_file),
'success': True, # Mark as success so it doesn't fail CI
'explanation': 'Skipped - Google API key not available (fork PR or missing secret)',
}
judge_llm = ChatGoogle(model='gemini-flash-lite-latest')
print('[DEBUG] LLMs initialized', file=sys.stderr)
# Each subprocess gets its own profile and session
print('[DEBUG] Creating browser session...', file=sys.stderr)
profile = BrowserProfile(
headless=True,
user_data_dir=None,
chromium_sandbox=False, # Disable sandbox for CI environment (GitHub Actions)
)
session = BrowserSession(browser_profile=profile)
print('[DEBUG] Browser session created', file=sys.stderr)
# Test if browser is working
try:
await session.start()
from browser_use.browser.events import NavigateToUrlEvent
event = session.event_bus.dispatch(NavigateToUrlEvent(url='https://httpbin.org/get', new_tab=True))
await event
print('[DEBUG] Browser test: navigation successful', file=sys.stderr)
title = await session.get_current_page_title()
print(f"[DEBUG] Browser test: got title '{title}'", file=sys.stderr)
except Exception as browser_error:
print(f'[DEBUG] Browser test failed: {str(browser_error)}', file=sys.stderr)
print(
f'[DEBUG] Browser error type: {type(browser_error).__name__}',
file=sys.stderr,
)
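		# A failed smoke test is not fatal: the agent run below is still attempted,
		# and the judge prompt asks for an explanation whenever the agent produced no output.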
print('[DEBUG] Starting agent execution...', file=sys.stderr)
agent = Agent(task=task, llm=agent_llm, browser_session=session)
try:
history: AgentHistoryList = await agent.run(max_steps=max_steps)
print('[DEBUG] Agent.run() returned successfully', file=sys.stderr)
except Exception as agent_error:
print(
f'[DEBUG] Agent.run() failed with error: {str(agent_error)}',
file=sys.stderr,
)
print(f'[DEBUG] Error type: {type(agent_error).__name__}', file=sys.stderr)
			# Re-raise so the outer handler records the failure
			raise
agent_output = history.final_result() or ''
print('[DEBUG] Agent execution completed', file=sys.stderr)
# Test if LLM is working by making a simple call
try:
response = await agent_llm.ainvoke([UserMessage(content="Say 'test'")])
print(
f'[DEBUG] LLM test call successful: {response.completion[:50]}',
file=sys.stderr,
)
except Exception as llm_error:
print(f'[DEBUG] LLM test call failed: {str(llm_error)}', file=sys.stderr)
# Debug: capture more details about the agent execution
total_steps = len(history.history) if hasattr(history, 'history') else 0
last_action = history.history[-1] if hasattr(history, 'history') and history.history else None
debug_info = f'Steps: {total_steps}, Final result length: {len(agent_output)}'
if last_action:
debug_info += f', Last action: {type(last_action).__name__}'
# Log to stderr so it shows up in GitHub Actions (won't interfere with JSON output to stdout)
print(f'[DEBUG] Task {os.path.basename(task_file)}: {debug_info}', file=sys.stderr)
if agent_output:
print(
f'[DEBUG] Agent output preview: {agent_output[:200]}...',
file=sys.stderr,
)
else:
print('[DEBUG] Agent produced no output!', file=sys.stderr)
criteria = '\n- '.join(judge_context)
judge_prompt = f"""
You are an evaluator of a browser-agent task inside a CI/CD pipeline. Here was the agent's task:
{task}
Here is the agent's output:
{agent_output if agent_output else '[No output provided]'}
Debug info: {debug_info}
Criteria for success:
- {criteria}
Reply in JSON with keys: success (true/false), explanation (string).
If the agent provided no output, explain what might have gone wrong.
"""
response = await judge_llm.ainvoke([UserMessage(content=judge_prompt)], output_format=JudgeResponse)
judge_response = response.completion
result = {
'file': os.path.basename(task_file),
'success': judge_response.success,
'explanation': judge_response.explanation,
}
# Clean up session before returning
await session.kill()
return result
except Exception as e:
		# Ensure session cleanup even on error (the session may not have been created yet)
		if session is not None:
			try:
				await session.kill()
			except Exception:
				pass
return {
'file': os.path.basename(task_file),
'success': False,
'explanation': f'Task failed with error: {str(e)}',
}
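

# Parent/child contract: the child (run_single_task, invoked with --task) writes debug text to
# stderr and exactly one JSON object to stdout; the parent scans stdout for the last {...} line,
# so stray stdout noise from libraries is tolerated.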
async def run_task_subprocess(task_file, semaphore):
"""Run a task in a separate subprocess"""
async with semaphore:
try:
# Set environment to reduce noise in subprocess
env = os.environ.copy()
env['PYTHONPATH'] = os.pathsep.join(sys.path)
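			# Re-invoke this same script with --task so the task runs in an isolated interpreter and browser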
proc = await asyncio.create_subprocess_exec(
sys.executable,
__file__,
'--task',
task_file,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, stderr = await proc.communicate()
if proc.returncode == 0:
try:
# Parse JSON result from subprocess
stdout_text = stdout.decode().strip()
stderr_text = stderr.decode().strip()
# Display subprocess debug logs
if stderr_text:
print(f'[SUBPROCESS {os.path.basename(task_file)}] Debug output:')
for line in stderr_text.split('\n'):
if line.strip():
print(f' {line}')
# Find the JSON line (should be the last line that starts with {)
lines = stdout_text.split('\n')
json_line = None
for line in reversed(lines):
line = line.strip()
if line.startswith('{') and line.endswith('}'):
json_line = line
break
if json_line:
result = json.loads(json_line)
print(f'[PARENT] Task {os.path.basename(task_file)} completed: {result["success"]}')
else:
raise ValueError(f'No JSON found in output: {stdout_text}')
except (json.JSONDecodeError, ValueError) as e:
result = {
'file': os.path.basename(task_file),
'success': False,
'explanation': f'Failed to parse subprocess result: {str(e)[:100]}',
}
print(f'[PARENT] Task {os.path.basename(task_file)} failed to parse: {str(e)}')
print(f'[PARENT] Full stdout was: {stdout.decode()[:500]}')
else:
stderr_text = stderr.decode().strip()
result = {
'file': os.path.basename(task_file),
'success': False,
'explanation': f'Subprocess failed (code {proc.returncode}): {stderr_text[:200]}',
}
print(f'[PARENT] Task {os.path.basename(task_file)} subprocess failed with code {proc.returncode}')
if stderr_text:
print(f'[PARENT] stderr: {stderr_text[:1000]}')
stdout_text = stdout.decode().strip()
if stdout_text:
print(f'[PARENT] stdout: {stdout_text[:1000]}')
except Exception as e:
result = {
'file': os.path.basename(task_file),
'success': False,
'explanation': f'Failed to start subprocess: {str(e)}',
}
print(f'[PARENT] Failed to start subprocess for {os.path.basename(task_file)}: {str(e)}')
return result
async def main():
"""Run all tasks in parallel using subprocesses"""
semaphore = asyncio.Semaphore(MAX_PARALLEL)
print(f'Found task files: {TASK_FILES}')
if not TASK_FILES:
print('No task files found!')
return 0, 0
# Run all tasks in parallel subprocesses
tasks = [run_task_subprocess(task_file, semaphore) for task_file in TASK_FILES]
results = await asyncio.gather(*tasks)
passed = sum(1 for r in results if r['success'])
total = len(results)
print('\n' + '=' * 60)
print(f'{"RESULTS":^60}\n')
# Prepare table data
headers = ['Task', 'Success', 'Reason']
rows = []
for r in results:
status = '✅' if r['success'] else '❌'
rows.append([r['file'], status, r['explanation']])
# Calculate column widths
col_widths = [max(len(str(row[i])) for row in ([headers] + rows)) for i in range(3)]
# Print header
header_row = ' | '.join(headers[i].ljust(col_widths[i]) for i in range(3))
print(header_row)
print('-+-'.join('-' * w for w in col_widths))
# Print rows
for row in rows:
print(' | '.join(str(row[i]).ljust(col_widths[i]) for i in range(3)))
print('\n' + '=' * 60)
print(f'\n{"SCORE":^60}')
print(f'\n{"=" * 60}\n')
print(f'\n{"*" * 10} {passed}/{total} PASSED {"*" * 10}\n')
print('=' * 60 + '\n')
# Output results for GitHub Actions
print(f'PASSED={passed}')
print(f'TOTAL={total}')
# Output detailed results as JSON for GitHub Actions
detailed_results = []
for r in results:
detailed_results.append(
{
				'task': r['file'].removesuffix('.yaml'),
'success': r['success'],
'reason': r['explanation'],
}
)
print('DETAILED_RESULTS=' + json.dumps(detailed_results))
return passed, total
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--task', type=str, help='Path to a single task YAML file (for subprocess mode)')
args = parser.parse_args()
if args.task:
# Subprocess mode: run a single task and output ONLY JSON
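		# e.g. {"file": "some_task.yaml", "success": true, "explanation": "..."}  (shape only, illustrative)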
try:
result = asyncio.run(run_single_task(args.task))
# Output ONLY the JSON result, nothing else
print(json.dumps(result))
except Exception as e:
# Even on critical failure, output valid JSON
error_result = {
'file': os.path.basename(args.task),
'success': False,
'explanation': f'Critical subprocess error: {str(e)}',
}
print(json.dumps(error_result))
else:
# Parent process mode: run all tasks in parallel subprocesses
passed, total = asyncio.run(main())
# Results already printed by main() function
# Fail if 0% pass rate (all tasks failed)
if total > 0 and passed == 0:
print('\n❌ CRITICAL: 0% pass rate - all tasks failed!')
sys.exit(1)