"""Code-use agent service - Jupyter notebook-like code execution for browser automation.""" import asyncio import datetime import logging import re import traceback from pathlib import Path from typing import Any from uuid_extensions import uuid7str from browser_use.browser import BrowserSession from browser_use.browser.profile import BrowserProfile from browser_use.dom.service import DomService from browser_use.filesystem.file_system import FileSystem from browser_use.llm.base import BaseChatModel from browser_use.llm.messages import ( AssistantMessage, BaseMessage, ContentPartImageParam, ContentPartTextParam, ImageURL, UserMessage, ) from browser_use.screenshots.service import ScreenshotService from browser_use.telemetry.service import ProductTelemetry from browser_use.telemetry.views import AgentTelemetryEvent from browser_use.tokens.service import TokenCost from browser_use.tokens.views import UsageSummary from browser_use.tools.service import CodeAgentTools, Tools from browser_use.utils import get_browser_use_version from .formatting import format_browser_state_for_llm from .namespace import EvaluateError, create_namespace from .utils import detect_token_limit_issue, extract_code_blocks, extract_url_from_task, truncate_message_content from .views import ( CodeAgentHistory, CodeAgentModelOutput, CodeAgentResult, CodeAgentState, CodeAgentStepMetadata, ExecutionStatus, NotebookSession, ) logger = logging.getLogger(__name__) class CodeAgent: """ Agent that executes Python code in a notebook-like environment for browser automation. This agent provides a Jupyter notebook-like interface where the LLM writes Python code that gets executed in a persistent namespace with browser control functions available. """ def __init__( self, task: str, # Optional parameters llm: BaseChatModel | None = None, browser_session: BrowserSession | None = None, browser: BrowserSession | None = None, # Alias for browser_session tools: Tools | None = None, controller: Tools | None = None, # Alias for tools # Agent settings page_extraction_llm: BaseChatModel | None = None, file_system: FileSystem | None = None, available_file_paths: list[str] | None = None, sensitive_data: dict[str, str | dict[str, str]] | None = None, max_steps: int = 100, max_failures: int = 8, max_validations: int = 0, use_vision: bool = True, calculate_cost: bool = False, **kwargs, ): """ Initialize the code-use agent. Args: task: The task description for the agent browser_session: Optional browser session (will be created if not provided) [DEPRECATED: use browser] browser: Optional browser session (cleaner API) tools: Optional Tools instance (will create default if not provided) controller: Optional Tools instance page_extraction_llm: Optional LLM for page extraction file_system: Optional file system for file operations available_file_paths: Optional list of available file paths sensitive_data: Optional sensitive data dictionary max_steps: Maximum number of execution steps max_failures: Maximum consecutive errors before termination (default: 8) max_validations: Maximum number of times to run the validator agent (default: 0) use_vision: Whether to include screenshots in LLM messages (default: True) calculate_cost: Whether to calculate token costs (default: False) llm: Optional ChatBrowserUse LLM instance (will create default if not provided) **kwargs: Additional keyword arguments for compatibility (ignored) """ # Log and ignore unknown kwargs for compatibility if kwargs: logger.debug(f'Ignoring additional kwargs for CodeAgent compatibility: {list(kwargs.keys())}') if llm is None: try: from browser_use import ChatBrowserUse llm = ChatBrowserUse() logger.debug('CodeAgent using ChatBrowserUse') except Exception as e: raise RuntimeError(f'Failed to initialize CodeAgent LLM: {e}') if 'ChatBrowserUse' not in llm.__class__.__name__: raise ValueError('This agent works only with ChatBrowserUse.') # Handle browser vs browser_session parameter (browser takes precedence) if browser and browser_session: raise ValueError('Cannot specify both "browser" and "browser_session" parameters. Use "browser" for the cleaner API.') browser_session = browser or browser_session # Handle controller vs tools parameter (controller takes precedence) if controller and tools: raise ValueError('Cannot specify both "controller" and "tools" parameters. Use "controller" for the cleaner API.') tools = controller or tools # Store browser_profile for creating browser session if needed self._browser_profile_for_init = BrowserProfile() if browser_session is None else None self.task = task self.llm = llm self.browser_session = browser_session self.tools = tools or CodeAgentTools() self.page_extraction_llm = page_extraction_llm self.file_system = file_system if file_system is not None else FileSystem(base_dir='./') self.available_file_paths = available_file_paths or [] self.sensitive_data = sensitive_data self.max_steps = max_steps self.max_failures = max_failures self.max_validations = max_validations self.use_vision = use_vision self.session = NotebookSession() self.namespace: dict[str, Any] = {} self._llm_messages: list[BaseMessage] = [] # Internal LLM conversation history self.complete_history: list[CodeAgentHistory] = [] # Type-safe history with model_output and result self.dom_service: DomService | None = None self._last_browser_state_text: str | None = None # Track last browser state text self._last_screenshot: str | None = None # Track last screenshot (base64) self._consecutive_errors = 0 # Track consecutive errors for auto-termination self._validation_count = 0 # Track number of validator runs self._last_llm_usage: Any | None = None # Track last LLM call usage stats self._step_start_time = 0.0 # Track step start time for duration calculation self.usage_summary: UsageSummary | None = None # Track usage summary across run for history property # Initialize screenshot service for eval tracking self.id = uuid7str() timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') base_tmp = Path('/tmp') self.agent_directory = base_tmp / f'browser_use_code_agent_{self.id}_{timestamp}' self.screenshot_service = ScreenshotService(agent_directory=self.agent_directory) # Initialize token cost service for usage tracking self.token_cost_service = TokenCost(include_cost=calculate_cost) self.token_cost_service.register_llm(llm) if page_extraction_llm: self.token_cost_service.register_llm(page_extraction_llm) # Set version and source for telemetry self.version = get_browser_use_version() try: package_root = Path(__file__).parent.parent.parent repo_files = ['.git', 'README.md', 'docs', 'examples'] if all(Path(package_root / file).exists() for file in repo_files): self.source = 'git' else: self.source = 'pip' except Exception: self.source = 'unknown' # Telemetry self.telemetry = ProductTelemetry() async def run(self, max_steps: int | None = None) -> NotebookSession: """ Run the agent to complete the task. Args: max_steps: Optional override for maximum number of steps (uses __init__ value if not provided) Returns: The notebook session with all executed cells """ # Use override if provided, otherwise use value from __init__ steps_to_run = max_steps if max_steps is not None else self.max_steps self.max_steps = steps_to_run # Start browser if not provided if self.browser_session is None: assert self._browser_profile_for_init is not None self.browser_session = BrowserSession(browser_profile=self._browser_profile_for_init) await self.browser_session.start() # Initialize DOM service with cross-origin iframe support enabled self.dom_service = DomService( browser_session=self.browser_session, cross_origin_iframes=True, # Enable for code-use agent to access forms in iframes ) # Create namespace with all tools self.namespace = create_namespace( browser_session=self.browser_session, tools=self.tools, page_extraction_llm=self.page_extraction_llm, file_system=self.file_system, available_file_paths=self.available_file_paths, sensitive_data=self.sensitive_data, ) # Initialize conversation with task self._llm_messages.append(UserMessage(content=f'Task: {self.task}')) # Track agent run error for telemetry agent_run_error: str | None = None # Extract URL from task and navigate if found initial_url = extract_url_from_task(self.task) if initial_url: try: logger.info(f'Extracted URL from task, navigating to: {initial_url}') # Use the navigate action from namespace await self.namespace['navigate'](initial_url) # Wait for page load await asyncio.sleep(2) # Record this navigation as a cell in the notebook nav_code = f"await navigate('{initial_url}')" cell = self.session.add_cell(source=nav_code) cell.status = ExecutionStatus.SUCCESS cell.execution_count = self.session.increment_execution_count() cell.output = f'Navigated to {initial_url}' # Get browser state after navigation for the cell if self.dom_service: try: browser_state_text, _ = await self._get_browser_state() cell.browser_state = browser_state_text except Exception as state_error: logger.debug(f'Failed to capture browser state for initial navigation cell: {state_error}') except Exception as e: logger.warning(f'Failed to navigate to extracted URL {initial_url}: {e}') # Record failed navigation as error cell nav_code = f"await navigate('{initial_url}')" cell = self.session.add_cell(source=nav_code) cell.status = ExecutionStatus.ERROR cell.execution_count = self.session.increment_execution_count() cell.error = str(e) # Get initial browser state before first LLM call if self.browser_session and self.dom_service: try: browser_state_text, screenshot = await self._get_browser_state() self._last_browser_state_text = browser_state_text self._last_screenshot = screenshot except Exception as e: logger.warning(f'Failed to get initial browser state: {e}') # Main execution loop for step in range(self.max_steps): logger.info(f'\n\n\n\n\n\n\nStep {step + 1}/{self.max_steps}') # Start timing this step self._step_start_time = datetime.datetime.now().timestamp() # Check if we're approaching the step limit or error limit and inject warning steps_remaining = self.max_steps - step - 1 errors_remaining = self.max_failures - self._consecutive_errors should_warn = ( steps_remaining <= 1 # Last step or next to last or errors_remaining <= 1 # One more error will terminate or (steps_remaining <= 2 and self._consecutive_errors >= 2) # Close to both limits ) if should_warn: warning_message = ( f'\n\nāš ļø CRITICAL WARNING: You are approaching execution limits!\n' f'- Steps remaining: {steps_remaining + 1}\n' f'- Consecutive errors: {self._consecutive_errors}/{self.max_failures}\n\n' f'YOU MUST call done() in your NEXT response, even if the task is incomplete:\n' f"- Set success=False if you couldn't complete the task\n" f'- Return EVERYTHING you found so far (partial data is better than nothing)\n' f"- Include any variables you've stored (products, all_data, etc.)\n" f"- Explain what worked and what didn't\n\n" f'Without done(), the user will receive NOTHING.' ) self._llm_messages.append(UserMessage(content=warning_message)) try: # Fetch fresh browser state right before LLM call (only if not already set) if not self._last_browser_state_text and self.browser_session and self.dom_service: try: logger.debug('šŸ” Fetching browser state before LLM call...') browser_state_text, screenshot = await self._get_browser_state() self._last_browser_state_text = browser_state_text self._last_screenshot = screenshot # # Log browser state # if len(browser_state_text) > 2000: # logger.info( # f'Browser state (before LLM):\n{browser_state_text[:2000]}...\n[Truncated, full state {len(browser_state_text)} chars sent to LLM]' # ) # else: # logger.info(f'Browser state (before LLM):\n{browser_state_text}') except Exception as e: logger.warning(f'Failed to get browser state before LLM call: {e}') # Get code from LLM (this also adds to self._llm_messages) try: code, full_llm_response = await self._get_code_from_llm() except Exception as llm_error: # LLM call failed - count as consecutive error and retry self._consecutive_errors += 1 logger.warning( f'LLM call failed (consecutive errors: {self._consecutive_errors}/{self.max_failures}), retrying: {llm_error}' ) # Check if we've hit the consecutive error limit if self._consecutive_errors >= self.max_failures: logger.error(f'Terminating: {self.max_failures} consecutive LLM failures') break await asyncio.sleep(1) # Brief pause before retry continue if not code or code.strip() == '': # If task is already done, empty code is fine (LLM explaining completion) if self._is_task_done(): logger.info('Task already marked as done, LLM provided explanation without code') # Add the text response to history as a non-code step await self._add_step_to_complete_history( model_output_code='', full_llm_response=full_llm_response, output=full_llm_response, # Treat the explanation as output error=None, screenshot_path=await self._capture_screenshot(step + 1), ) break # Exit the loop since task is done logger.warning('LLM returned empty code') self._consecutive_errors += 1 # new state if self.browser_session and self.dom_service: try: browser_state_text, screenshot = await self._get_browser_state() self._last_browser_state_text = browser_state_text self._last_screenshot = screenshot except Exception as e: logger.warning(f'Failed to get new browser state: {e}') continue # Execute code blocks sequentially if multiple python blocks exist # This allows JS/bash blocks to be injected into namespace before Python code uses them all_blocks = self.namespace.get('_all_code_blocks', {}) python_blocks = [k for k in sorted(all_blocks.keys()) if k.startswith('python_')] if len(python_blocks) > 1: # Multiple Python blocks - execute each sequentially output = None error = None for i, block_key in enumerate(python_blocks): logger.info(f'Executing Python block {i + 1}/{len(python_blocks)}') block_code = all_blocks[block_key] block_output, block_error, _ = await self._execute_code(block_code) # Accumulate outputs if block_output: output = (output or '') + block_output if block_error: error = block_error # Stop on first error break else: # Single Python block - execute normally output, error, _ = await self._execute_code(code) # Track consecutive errors if error: self._consecutive_errors += 1 logger.warning(f'Consecutive errors: {self._consecutive_errors}/{self.max_failures}') # Check if we've hit the consecutive error limit if self._consecutive_errors >= self.max_failures: logger.error( f'Terminating: {self.max_failures} consecutive errors reached. The agent is unable to make progress.' ) # Add termination message to complete history before breaking await self._add_step_to_complete_history( model_output_code=code, full_llm_response=f'[Terminated after {self.max_failures} consecutive errors]', output=None, error=f'Auto-terminated: {self.max_failures} consecutive errors without progress', screenshot_path=None, ) break else: # Reset consecutive error counter on success self._consecutive_errors = 0 # Check if task is done - validate completion first if not at limits if self._is_task_done(): # Get the final result from namespace (from done() call) final_result: str | None = self.namespace.get('_task_result') # type: ignore[assignment] # Check if we should validate (not at step/error limits and under max validations) steps_remaining = self.max_steps - step - 1 should_validate = ( self._validation_count < self.max_validations # Haven't exceeded max validations and steps_remaining >= 4 # At least 4 steps away from limit and self._consecutive_errors < 3 # Not close to error limit (8 consecutive) ) if should_validate: self._validation_count += 1 logger.info('Validating task completion with LLM...') from .namespace import validate_task_completion is_complete, reasoning = await validate_task_completion( task=self.task, output=final_result, llm=self.llm, ) if not is_complete: # Task not truly complete - inject feedback and continue logger.warning('Validator: Task not complete, continuing...') validation_feedback = ( f'\n\nāš ļø VALIDATOR FEEDBACK:\n' f'Your done() call was rejected. The task is NOT complete yet.\n\n' f'Validation reasoning:\n{reasoning}\n\n' f'You must continue working on the task. Analyze what is missing and complete it.\n' f'Do NOT call done() again until the task is truly finished.' ) # Clear the done flag so execution continues self.namespace['_task_done'] = False self.namespace.pop('_task_result', None) self.namespace.pop('_task_success', None) # Add validation feedback to LLM messages self._llm_messages.append(UserMessage(content=validation_feedback)) # Don't override output - let execution continue normally else: logger.info('Validator: Task complete') # Override output with done message for final step if final_result: output = final_result else: # At limits - skip validation and accept done() if self._validation_count >= self.max_validations: logger.info( f'Reached max validations ({self.max_validations}) - skipping validation and accepting done()' ) else: logger.info('At step/error limits - skipping validation') if final_result: output = final_result if output: # Check if this is the final done() output if self._is_task_done(): # Show done() output more prominently logger.info( f'āœ“ Task completed - Final output from done():\n{output[:300] if len(output) > 300 else output}' ) # Also show files_to_display if they exist in namespace attachments: list[str] | None = self.namespace.get('_task_attachments') # type: ignore[assignment] if attachments: logger.info(f'Files displayed: {", ".join(attachments)}') else: logger.info(f'Code output:\n{output}') # Browser state is now only logged when fetched before LLM call (not after execution) # Take screenshot for eval tracking screenshot_path = await self._capture_screenshot(step + 1) # Add step to complete_history for eval system await self._add_step_to_complete_history( model_output_code=code, full_llm_response=full_llm_response, output=output, error=error, screenshot_path=screenshot_path, ) # Check if task is done (after validation) if self._is_task_done(): # Get the final result from namespace final_result: str | None = self.namespace.get('_task_result', output) # type: ignore[assignment] logger.info('Task completed successfully') if final_result: logger.info(f'Final result: {final_result}') break # If validation rejected done(), continue to next iteration # The feedback message has already been added to _llm_messages # Add result to LLM messages for next iteration (without browser state) result_message = self._format_execution_result(code, output, error, current_step=step + 1) truncated_result = truncate_message_content(result_message) self._llm_messages.append(UserMessage(content=truncated_result)) except Exception as e: logger.error(f'Error in step {step + 1}: {e}') traceback.print_exc() break else: # Loop completed without break - max_steps reached logger.warning(f'Maximum steps ({self.max_steps}) reached without task completion') # If task is not done, capture the last step's output as partial result if not self._is_task_done() and self.complete_history: # Get the last step's output/error and use it as final extracted_content last_step = self.complete_history[-1] last_result = last_step.result[0] if last_step.result else None last_output = last_result.extracted_content if last_result else None last_error = last_result.error if last_result else None # Build a partial result message from the last step partial_result_parts = [] partial_result_parts.append(f'Task incomplete - reached step limit ({self.max_steps} steps).') partial_result_parts.append('Last step output:') if last_output: partial_result_parts.append(f'\nOutput: {last_output}') if last_error: partial_result_parts.append(f'\nError: {last_error}') # Add any accumulated variables that might contain useful data data_vars = [] for var_name in sorted(self.namespace.keys()): if not var_name.startswith('_') and var_name not in {'json', 'asyncio', 'csv', 're', 'datetime', 'Path'}: var_value = self.namespace[var_name] # Check if it's a list or dict that might contain collected data if isinstance(var_value, (list, dict)) and var_value: data_vars.append(f' - {var_name}: {type(var_value).__name__} with {len(var_value)} items') if data_vars: partial_result_parts.append('\nVariables in namespace that may contain partial data:') partial_result_parts.extend(data_vars) partial_result = '\n'.join(partial_result_parts) # Update the last step's extracted_content with this partial result if last_result: last_result.extracted_content = partial_result last_result.is_done = False last_result.success = False logger.info(f'\nPartial result captured from last step:\n{partial_result}') # Log final summary if task was completed if self._is_task_done(): logger.info('\n' + '=' * 60) logger.info('TASK COMPLETED SUCCESSFULLY') logger.info('=' * 60) final_result: str | None = self.namespace.get('_task_result') # type: ignore[assignment] if final_result: logger.info(f'\nFinal Output:\n{final_result}') attachments: list[str] | None = self.namespace.get('_task_attachments') # type: ignore[assignment] if attachments: logger.info(f'\nFiles Attached:\n{chr(10).join(attachments)}') logger.info('=' * 60 + '\n') # Auto-close browser if keep_alive is False await self.close() # Store usage summary for history property self.usage_summary = await self.token_cost_service.get_usage_summary() # Log token usage summary await self.token_cost_service.log_usage_summary() # Log telemetry event try: self._log_agent_event(max_steps=self.max_steps, agent_run_error=agent_run_error) except Exception as log_e: logger.error(f'Failed to log telemetry event: {log_e}', exc_info=True) return self.session async def _get_code_from_llm(self) -> tuple[str, str]: """Get Python code from the LLM. Returns: Tuple of (extracted_code, full_llm_response) """ # Prepare messages for this request # Include browser state as separate message if available (not accumulated in history) messages_to_send = self._llm_messages.copy() if self._last_browser_state_text: # Create message with optional screenshot if self.use_vision and self._last_screenshot: # Build content with text + screenshot content_parts: list[ContentPartTextParam | ContentPartImageParam] = [ ContentPartTextParam(text=self._last_browser_state_text) ] # Add screenshot content_parts.append( ContentPartImageParam( image_url=ImageURL( url=f'data:image/jpeg;base64,{self._last_screenshot}', media_type='image/jpeg', detail='auto', ), ) ) messages_to_send.append(UserMessage(content=content_parts)) else: # Text only messages_to_send.append(UserMessage(content=self._last_browser_state_text)) # Clear browser state after including it so it's only in this request self._last_browser_state_text = None self._last_screenshot = None # Call LLM with message history (including temporary browser state message) response = await self.llm.ainvoke(messages_to_send) # Store usage stats from this LLM call self._last_llm_usage = response.usage # Log the LLM's raw output for debugging logger.info(f'LLM Response:\n{response.completion}') # Check for token limit or repetition issues max_tokens = getattr(self.llm, 'max_tokens', None) completion_tokens = response.usage.completion_tokens if response.usage else None is_problematic, issue_message = detect_token_limit_issue( completion=response.completion, completion_tokens=completion_tokens, max_tokens=max_tokens, stop_reason=response.stop_reason, ) if is_problematic: logger.warning(f'Token limit issue detected: {issue_message}') # Don't add the bad response to history # Instead, inject a system message prompting recovery recovery_prompt = ( f'Your previous response hit a token limit or became repetitive: {issue_message}\n\n' 'Please write a SHORT plan (2 sentences) for what to do next, then execute ONE simple action.' ) self._llm_messages.append(UserMessage(content=recovery_prompt)) # Return a controlled error message instead of corrupted code return '', f'[Token limit error: {issue_message}]' # Store the full response full_response = response.completion # Extract code blocks from response # Support multiple code block types: python, js, bash, markdown code_blocks = extract_code_blocks(response.completion) # Inject non-python blocks into namespace as variables # Track which variables are code blocks for browser state display if '_code_block_vars' not in self.namespace: self.namespace['_code_block_vars'] = set() for block_type, block_content in code_blocks.items(): if not block_type.startswith('python'): # Store js, bash, markdown blocks (and named variants) as variables in namespace self.namespace[block_type] = block_content self.namespace['_code_block_vars'].add(block_type) print(f'→ Code block variable: {block_type} (str, {len(block_content)} chars)') logger.debug(f'Injected {block_type} block into namespace ({len(block_content)} chars)') # Store all code blocks for sequential execution self.namespace['_all_code_blocks'] = code_blocks # Get Python code if it exists # If no python block exists and no other code blocks exist, return empty string to skip execution # This prevents treating plain text explanations as code code = code_blocks.get('python', response.completion) # Add to LLM messages (truncate for history to save context) truncated_completion = truncate_message_content(response.completion) self._llm_messages.append(AssistantMessage(content=truncated_completion)) return code, full_response def _print_variable_info(self, var_name: str, value: Any) -> None: """Print compact info about a variable assignment.""" # Skip built-in modules and known imports skip_names = { 'json', 'asyncio', 'csv', 're', 'datetime', 'Path', 'pd', 'np', 'plt', 'requests', 'BeautifulSoup', 'PdfReader', 'browser', 'file_system', } if var_name in skip_names: return # Skip code block variables (already printed) if '_code_block_vars' in self.namespace and var_name in self.namespace.get('_code_block_vars', set()): return # Print compact variable info if isinstance(value, (list, dict)): preview = str(value)[:100] print(f'→ Variable: {var_name} ({type(value).__name__}, len={len(value)}, preview={preview}...)') elif isinstance(value, str) and len(value) > 50: print(f'→ Variable: {var_name} (str, {len(value)} chars, preview={value[:50]}...)') elif callable(value): print(f'→ Variable: {var_name} (function)') else: print(f'→ Variable: {var_name} ({type(value).__name__}, value={repr(value)[:50]})') async def _execute_code(self, code: str) -> tuple[str | None, str | None, str | None]: """ Execute Python code in the namespace. Args: code: The Python code to execute Returns: Tuple of (output, error, browser_state) """ # Create new cell cell = self.session.add_cell(source=code) cell.status = ExecutionStatus.RUNNING cell.execution_count = self.session.increment_execution_count() output = None error = None browser_state = None try: # Capture output import ast import io import sys old_stdout = sys.stdout sys.stdout = io.StringIO() try: # Add asyncio to namespace if not already there if 'asyncio' not in self.namespace: self.namespace['asyncio'] = asyncio # Store the current code in namespace for done() validation self.namespace['_current_cell_code'] = code # Store consecutive errors count for done() validation self.namespace['_consecutive_errors'] = self._consecutive_errors # Check if code contains await expressions - if so, wrap in async function # This mimics how Jupyter/IPython handles top-level await try: tree = ast.parse(code, mode='exec') has_await = any(isinstance(node, (ast.Await, ast.AsyncWith, ast.AsyncFor)) for node in ast.walk(tree)) except SyntaxError: # If parse fails, let exec handle the error has_await = False if has_await: # When code has await, we must wrap in async function # To make variables persist naturally (like Jupyter without needing 'global'): # 1. Extract all assigned variable names from the code # 2. Inject 'global' declarations for variables that already exist in namespace # 3. Extract user's explicit global declarations and pre-define those vars # 4. Return locals() so we can update namespace with new variables # Find all variable names being assigned + user's explicit globals try: assigned_names = set() user_global_names = set() for node in ast.walk(tree): if isinstance(node, ast.Assign): for target in node.targets: if isinstance(target, ast.Name): assigned_names.add(target.id) elif isinstance(node, ast.AugAssign) and isinstance(node.target, ast.Name): assigned_names.add(node.target.id) elif isinstance(node, (ast.AnnAssign, ast.NamedExpr)): if hasattr(node, 'target') and isinstance(node.target, ast.Name): assigned_names.add(node.target.id) elif isinstance(node, ast.Global): # Track user's explicit global declarations user_global_names.update(node.names) # Pre-define any user-declared globals that don't exist yet # This prevents NameError when user writes "global foo" before "foo = ..." for name in user_global_names: if name not in self.namespace: self.namespace[name] = None # Filter to only existing namespace vars (like Jupyter does) # Include both: assigned vars that exist + user's explicit globals existing_vars = {name for name in (assigned_names | user_global_names) if name in self.namespace} except Exception as e: existing_vars = set() # Build global declaration if needed global_decl = '' has_global_decl = False if existing_vars: vars_str = ', '.join(sorted(existing_vars)) global_decl = f' global {vars_str}\n' has_global_decl = True indented_code = '\n'.join(' ' + line if line.strip() else line for line in code.split('\n')) wrapped_code = f"""async def __code_exec__(): {global_decl}{indented_code} # Return locals so we can update the namespace return locals() __code_exec_coro__ = __code_exec__() """ # Store whether we added a global declaration (needed for error line mapping) self.namespace['_has_global_decl'] = has_global_decl # Compile and execute wrapper at module level compiled_code = compile(wrapped_code, '', 'exec') exec(compiled_code, self.namespace, self.namespace) # Get and await the coroutine, then update namespace with new/modified variables coro = self.namespace.get('__code_exec_coro__') if coro: result_locals = await coro # Update namespace with all variables from the function's locals # This makes variable assignments persist across cells if result_locals: for key, value in result_locals.items(): if not key.startswith('_'): self.namespace[key] = value # Variable info is tracked in "Available" section, no need for verbose inline output # Clean up temporary variables self.namespace.pop('__code_exec_coro__', None) self.namespace.pop('__code_exec__', None) else: # No await - execute directly at module level for natural variable scoping # This means x = x + 10 will work without needing 'global x' # Track variables before execution vars_before = set(self.namespace.keys()) compiled_code = compile(code, '', 'exec') exec(compiled_code, self.namespace, self.namespace) # Track newly created/modified variables (info shown in "Available" section) vars_after = set(self.namespace.keys()) new_vars = vars_after - vars_before # Get output output_value = sys.stdout.getvalue() if output_value: output = output_value finally: sys.stdout = old_stdout # Wait 2 seconds for page to stabilize after code execution await asyncio.sleep(0.5) # Note: Browser state is now fetched right before LLM call instead of after each execution # This reduces unnecessary state fetches for operations that don't affect the browser cell.status = ExecutionStatus.SUCCESS cell.output = output cell.browser_state = None # Will be captured in next iteration before LLM call except Exception as e: # Handle EvaluateError specially - JavaScript execution failed if isinstance(e, EvaluateError): error = str(e) cell.status = ExecutionStatus.ERROR cell.error = error logger.error(f'Code execution error: {error}') await asyncio.sleep(1) # Browser state will be fetched before next LLM call # Return immediately - do not continue executing code return output, error, None # Handle NameError specially - check for code block variable confusion if isinstance(e, NameError): error_msg = str(e) cell.status = ExecutionStatus.ERROR cell.error = error # Browser state will be fetched before next LLM call await asyncio.sleep(0.5) return output, error, None # For syntax errors and common parsing errors, show just the error message # without the full traceback to keep output clean if isinstance(e, SyntaxError): error_msg = e.msg if e.msg else str(e) error = f'{type(e).__name__}: {error_msg}' # Detect common f-string issues with JSON/JavaScript code if 'unterminated' in error_msg.lower() and 'string' in error_msg.lower() and code: # Check if code contains f-strings with potential JSON/JS content has_fstring = bool(re.search(r'\bf["\']', code)) has_json_pattern = bool(re.search(r'json\.dumps|"[^"]*\{[^"]*\}[^"]*"|\'[^\']*\{[^\']*\}[^\']*\'', code)) has_js_pattern = bool(re.search(r'evaluate\(|await evaluate', code)) if has_fstring and (has_json_pattern or has_js_pattern): error += ( '\n\nšŸ’” TIP: Detected f-string with JSON/JavaScript code containing {}.\n' ' Use separate ```js or ```markdown blocks instead of f-strings to avoid escaping issues.\n' ' If your code block needs ``` inside it, wrap with 4+ backticks: ````markdown code`\n' ) # Detect and provide helpful hints for common string literal errors if 'unterminated' in error_msg.lower() and 'string' in error_msg.lower(): # Detect what type of string literal is unterminated is_triple = 'triple-quoted' in error_msg.lower() msg_lower = error_msg.lower() # Detect prefix type from error message if 'f-string' in msg_lower and 'raw' in msg_lower: prefix = 'rf or fr' desc = 'raw f-string' elif 'f-string' in msg_lower: prefix = 'f' desc = 'f-string' elif 'raw' in msg_lower and 'bytes' in msg_lower: prefix = 'rb or br' desc = 'raw bytes' elif 'raw' in msg_lower: prefix = 'r' desc = 'raw string' elif 'bytes' in msg_lower: prefix = 'b' desc = 'bytes' else: prefix = '' desc = 'string' # Build hint based on triple-quoted vs single/double quoted if is_triple: if prefix: hint = f"Hint: Unterminated {prefix}'''...''' or {prefix}\"\"\"...\"\" ({desc}). Check for missing closing quotes or unescaped quotes inside." else: hint = "Hint: Unterminated '''...''' or \"\"\"...\"\" detected. Check for missing closing quotes or unescaped quotes inside." hint += '\n If you need ``` inside your string, use a ````markdown varname` code block with 4+ backticks instead.' else: if prefix: hint = f'Hint: Unterminated {prefix}\'...\' or {prefix}"..." ({desc}). Check for missing closing quote or unescaped quotes inside.' else: hint = 'Hint: Unterminated \'...\' or "..." detected. Check for missing closing quote or unescaped quotes inside the string.' error += f'\n{hint}' # Show the problematic line from the code if e.text: error += f'\n{e.text}' elif e.lineno and code: # If e.text is empty, extract the line from the code lines = code.split('\n') if 0 < e.lineno <= len(lines): error += f'\n{lines[e.lineno - 1]}' else: # For other errors, try to extract useful information error_str = str(e) error = f'{type(e).__name__}: {error_str}' if error_str else f'{type(e).__name__} occurred' # For RuntimeError or other exceptions, try to extract traceback info # to show which line in the user's code actually failed if hasattr(e, '__traceback__'): # Walk the traceback to find the frame with '' filename tb = e.__traceback__ user_code_lineno = None while tb is not None: frame = tb.tb_frame if frame.f_code.co_filename == '': # Found the frame executing user code # Get the line number from the traceback user_code_lineno = tb.tb_lineno break tb = tb.tb_next cell.status = ExecutionStatus.ERROR cell.error = error logger.error(f'Code execution error: {error}') await asyncio.sleep(1) # Browser state will be fetched before next LLM call return output, error, None async def _get_browser_state(self) -> tuple[str, str | None]: """Get the current browser state as text with ultra-minimal DOM structure for code agents. Returns: Tuple of (browser_state_text, screenshot_base64) """ if not self.browser_session or not self.dom_service: return 'Browser state not available', None try: # Get full browser state including screenshot if use_vision is enabled include_screenshot = True state = await self.browser_session.get_browser_state_summary(include_screenshot=include_screenshot) # Format browser state with namespace context browser_state_text = await format_browser_state_for_llm( state=state, namespace=self.namespace, browser_session=self.browser_session ) screenshot = state.screenshot if include_screenshot else None return browser_state_text, screenshot except Exception as e: logger.error(f'Failed to get browser state: {e}') return f'Error getting browser state: {e}', None def _format_execution_result(self, code: str, output: str | None, error: str | None, current_step: int | None = None) -> str: """Format the execution result for the LLM (without browser state).""" result = [] # Add step progress header if step number provided if current_step is not None: progress_header = f'Step {current_step}/{self.max_steps} executed' # Add consecutive failure tracking if there are errors if error and self._consecutive_errors > 0: progress_header += f' | Consecutive failures: {self._consecutive_errors}/{self.max_failures}' result.append(progress_header) if error: result.append(f'Error: {error}') if output: # Truncate output if too long if len(output) > 10000: output = output[:9950] + '\n[Truncated after 10000 characters]' result.append(f'Output: {output}') if len(result) == 0: result.append('Executed') return '\n'.join(result) def _is_task_done(self) -> bool: """Check if the task is marked as done in the namespace.""" # Check if 'done' was called by looking for a special marker in namespace return self.namespace.get('_task_done', False) async def _capture_screenshot(self, step_number: int) -> str | None: """Capture and store screenshot for eval tracking.""" if not self.browser_session: return None try: # Get browser state summary which includes screenshot state = await self.browser_session.get_browser_state_summary(include_screenshot=True) if state and state.screenshot: # Store screenshot using screenshot service screenshot_path = await self.screenshot_service.store_screenshot(state.screenshot, step_number) return str(screenshot_path) if screenshot_path else None except Exception as e: logger.warning(f'Failed to capture screenshot for step {step_number}: {e}') return None async def _add_step_to_complete_history( self, model_output_code: str, full_llm_response: str, output: str | None, error: str | None, screenshot_path: str | None, ) -> None: """Add a step to complete_history using type-safe models.""" # Get current browser URL and title for state url: str | None = None title: str | None = None if self.browser_session: try: url = await self.browser_session.get_current_page_url() # Get title from browser cdp_session = await self.browser_session.get_or_create_cdp_session() result = await cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': 'document.title', 'returnByValue': True}, session_id=cdp_session.session_id, ) title = result.get('result', {}).get('value') except Exception as e: logger.debug(f'Failed to get browser URL/title for history: {e}') # Check if this is a done result is_done = self._is_task_done() # Get self-reported success from done() call if task is done self_reported_success: bool | None = None if is_done: task_success = self.namespace.get('_task_success') self_reported_success = task_success if isinstance(task_success, bool) else None # Create result entry using typed model result_entry = CodeAgentResult( extracted_content=output if output else None, error=error if error else None, is_done=is_done, success=self_reported_success, ) # Create state entry using typed model state_entry = CodeAgentState(url=url, title=title, screenshot_path=screenshot_path) # Create metadata entry using typed model step_end_time = datetime.datetime.now().timestamp() metadata_entry = CodeAgentStepMetadata( input_tokens=self._last_llm_usage.prompt_tokens if self._last_llm_usage else None, output_tokens=self._last_llm_usage.completion_tokens if self._last_llm_usage else None, step_start_time=self._step_start_time, step_end_time=step_end_time, ) # Create model output entry using typed model (if there's code to track) model_output_entry: CodeAgentModelOutput | None = None if model_output_code or full_llm_response: model_output_entry = CodeAgentModelOutput( model_output=model_output_code if model_output_code else '', full_response=full_llm_response if full_llm_response else '', ) # Create history entry using typed model history_entry = CodeAgentHistory( model_output=model_output_entry, result=[result_entry], state=state_entry, metadata=metadata_entry, screenshot_path=screenshot_path, # Keep for backward compatibility ) self.complete_history.append(history_entry) def _log_agent_event(self, max_steps: int, agent_run_error: str | None = None) -> None: """Send the agent event for this run to telemetry.""" from urllib.parse import urlparse token_summary = self.token_cost_service.get_usage_tokens_for_model(self.llm.model) # For CodeAgent, we don't have action history like Agent does # Instead we track the code execution cells action_history_data: list[list[dict[str, Any]] | None] = [] for step in self.complete_history: # Extract code from model_output if available (type-safe access) if step.model_output and step.model_output.full_response: code = step.model_output.full_response # Represent each code cell as a simple action entry action_history_data.append([{'llm_response': code}]) else: action_history_data.append(None) # Get final result from the last step or namespace (type-safe) final_result: Any = self.namespace.get('_task_result') final_result_str: str | None = final_result if isinstance(final_result, str) else None # Get URLs visited from complete_history (type-safe access) urls_visited: list[str] = [] for step in self.complete_history: if step.state.url and step.state.url not in urls_visited: urls_visited.append(step.state.url) # Get errors from complete_history (type-safe access) errors: list[str] = [] for step in self.complete_history: for result in step.result: if result.error: errors.append(result.error) # Determine success from task completion status (type-safe) is_done = self._is_task_done() task_success: Any = self.namespace.get('_task_success') self_reported_success: bool | None = task_success if isinstance(task_success, bool) else (False if is_done else None) self.telemetry.capture( AgentTelemetryEvent( task=self.task, model=self.llm.model, model_provider=self.llm.provider, max_steps=max_steps, max_actions_per_step=1, # CodeAgent executes one code cell per step use_vision=self.use_vision, version=self.version, source=self.source, cdp_url=urlparse(self.browser_session.cdp_url).hostname if self.browser_session and self.browser_session.cdp_url else None, agent_type='code', # CodeAgent identifier action_errors=errors, action_history=action_history_data, urls_visited=urls_visited, steps=len(self.complete_history), total_input_tokens=token_summary.prompt_tokens, total_output_tokens=token_summary.completion_tokens, prompt_cached_tokens=token_summary.prompt_cached_tokens, total_tokens=token_summary.total_tokens, total_duration_seconds=sum(step.metadata.duration_seconds for step in self.complete_history if step.metadata), success=self_reported_success, final_result_response=final_result_str, error_message=agent_run_error, ) ) def screenshot_paths(self, n_last: int | None = None) -> list[str | None]: """ Get screenshot paths from complete_history for eval system. Args: n_last: Optional number of last screenshots to return Returns: List of screenshot file paths (or None for missing screenshots) """ paths = [step.screenshot_path for step in self.complete_history] if n_last is not None: return paths[-n_last:] if len(paths) > n_last else paths return paths @property def message_manager(self) -> Any: """ Compatibility property for eval system. Returns a mock object with last_input_messages attribute. """ class MockMessageManager: def __init__(self, llm_messages: list[BaseMessage]) -> None: # Convert code-use LLM messages to format expected by eval system self.last_input_messages = llm_messages return MockMessageManager(self._llm_messages) @property def history(self) -> Any: """ Compatibility property for eval system. Returns a mock AgentHistoryList object with history attribute containing complete_history. This is what the eval system expects when it does: agent_history = agent.history """ class DictToObject: """Convert dict to object with attribute access for eval compatibility.""" def __init__(self, data: dict[str, Any]) -> None: for key, value in data.items(): if isinstance(value, dict): setattr(self, key, DictToObject(value)) elif isinstance(value, list): setattr(self, key, [DictToObject(item) if isinstance(item, dict) else item for item in value]) else: setattr(self, key, value) def __getattr__(self, name: str) -> None: """Provide safe attribute access with defaults for missing attributes.""" # Return None for missing attributes instead of raising AttributeError # This handles cases where eval system checks attributes that CodeAgent doesn't set return None def model_dump(self) -> dict[str, Any]: """Support model_dump() calls from eval system.""" result = {} for key, value in self.__dict__.items(): if isinstance(value, DictToObject): result[key] = value.model_dump() elif isinstance(value, list): result[key] = [item.model_dump() if isinstance(item, DictToObject) else item for item in value] else: result[key] = value return result def get_screenshot(self) -> str | None: """Support get_screenshot() calls for state objects.""" # Load screenshot from disk and return as base64 string (matching BrowserStateHistory implementation) if not hasattr(self, 'screenshot_path') or not self.screenshot_path: return None import base64 from pathlib import Path path_obj = Path(self.screenshot_path) if not path_obj.exists(): return None try: with open(path_obj, 'rb') as f: screenshot_data = f.read() return base64.b64encode(screenshot_data).decode('utf-8') except Exception: return None class MockAgentHistoryList: def __init__(self, complete_history: list[CodeAgentHistory], usage_summary: UsageSummary | None) -> None: # Convert each CodeAgentHistory to dict, then to object with attribute access self.history = [DictToObject(item.model_dump()) for item in complete_history] # Use the provided usage summary self.usage = usage_summary return MockAgentHistoryList(self.complete_history, self.usage_summary) async def close(self) -> None: """Close the browser session.""" if self.browser_session: # Check if we should close the browser based on keep_alive setting if not self.browser_session.browser_profile.keep_alive: await self.browser_session.kill() else: logger.debug('Browser keep_alive is True, not closing browser session') async def __aenter__(self) -> 'CodeAgent': """Async context manager entry.""" return self async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None: """Async context manager exit.""" await self.close()