Spaces:
Sleeping
Sleeping
| """Code-use agent service - Jupyter notebook-like code execution for browser automation.""" | |
| import asyncio | |
| import datetime | |
| import logging | |
| import re | |
| import traceback | |
| from pathlib import Path | |
| from typing import Any | |
| from uuid_extensions import uuid7str | |
| from browser_use.browser import BrowserSession | |
| from browser_use.browser.profile import BrowserProfile | |
| from browser_use.dom.service import DomService | |
| from browser_use.filesystem.file_system import FileSystem | |
| from browser_use.llm.base import BaseChatModel | |
| from browser_use.llm.messages import ( | |
| AssistantMessage, | |
| BaseMessage, | |
| ContentPartImageParam, | |
| ContentPartTextParam, | |
| ImageURL, | |
| UserMessage, | |
| ) | |
| from browser_use.screenshots.service import ScreenshotService | |
| from browser_use.telemetry.service import ProductTelemetry | |
| from browser_use.telemetry.views import AgentTelemetryEvent | |
| from browser_use.tokens.service import TokenCost | |
| from browser_use.tokens.views import UsageSummary | |
| from browser_use.tools.service import CodeAgentTools, Tools | |
| from browser_use.utils import get_browser_use_version | |
| from .formatting import format_browser_state_for_llm | |
| from .namespace import EvaluateError, create_namespace | |
| from .utils import detect_token_limit_issue, extract_code_blocks, extract_url_from_task, truncate_message_content | |
| from .views import ( | |
| CodeAgentHistory, | |
| CodeAgentModelOutput, | |
| CodeAgentResult, | |
| CodeAgentState, | |
| CodeAgentStepMetadata, | |
| ExecutionStatus, | |
| NotebookSession, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class CodeAgent: | |
| """ | |
| Agent that executes Python code in a notebook-like environment for browser automation. | |
| This agent provides a Jupyter notebook-like interface where the LLM writes Python code | |
| that gets executed in a persistent namespace with browser control functions available. | |
| """ | |
def __init__(
    self,
    task: str,
    # Optional parameters
    llm: BaseChatModel | None = None,
    browser_session: BrowserSession | None = None,
    browser: BrowserSession | None = None,  # Alias for browser_session
    tools: Tools | None = None,
    controller: Tools | None = None,  # Alias for tools
    # Agent settings
    page_extraction_llm: BaseChatModel | None = None,
    file_system: FileSystem | None = None,
    available_file_paths: list[str] | None = None,
    sensitive_data: dict[str, str | dict[str, str]] | None = None,
    max_steps: int = 100,
    max_failures: int = 8,
    max_validations: int = 0,
    use_vision: bool = True,
    calculate_cost: bool = False,
    **kwargs,
):
    """
    Initialize the code-use agent.

    Args:
        task: The task description for the agent
        llm: Optional ChatBrowserUse LLM instance (a default ChatBrowserUse() is created if not provided)
        browser_session: Optional browser session (will be created in run() if not provided) [DEPRECATED: use browser]
        browser: Optional browser session (cleaner API); mutually exclusive with browser_session
        tools: Optional Tools instance (CodeAgentTools() is used if not provided)
        controller: Optional Tools instance; mutually exclusive with tools
        page_extraction_llm: Optional LLM for page extraction
        file_system: Optional file system for file operations (defaults to FileSystem(base_dir='./'))
        available_file_paths: Optional list of available file paths
        sensitive_data: Optional sensitive data dictionary
        max_steps: Maximum number of execution steps
        max_failures: Maximum consecutive errors before termination (default: 8)
        max_validations: Maximum number of times to run the validator agent (default: 0)
        use_vision: Whether to include screenshots in LLM messages (default: True)
        calculate_cost: Whether to calculate token costs (default: False)
        **kwargs: Additional keyword arguments for compatibility (ignored)

    Raises:
        RuntimeError: If no llm was given and the default ChatBrowserUse could not be created.
        ValueError: If the llm's class name does not contain 'ChatBrowserUse', or if both
            members of an alias pair (browser/browser_session, controller/tools) are given.
    """
    # Log and ignore unknown kwargs for compatibility
    if kwargs:
        logger.debug(f'Ignoring additional kwargs for CodeAgent compatibility: {list(kwargs.keys())}')

    if llm is None:
        try:
            # Imported lazily so the default model is only loaded when it is actually needed
            from browser_use import ChatBrowserUse

            llm = ChatBrowserUse()
            logger.debug('CodeAgent using ChatBrowserUse')
        except Exception as e:
            # Chain explicitly so the original failure stays attached as __cause__
            raise RuntimeError(f'Failed to initialize CodeAgent LLM: {e}') from e

    # The check is name-based, so subclasses/wrappers whose class name contains 'ChatBrowserUse' pass
    if 'ChatBrowserUse' not in llm.__class__.__name__:
        raise ValueError('This agent works only with ChatBrowserUse.')

    # browser and browser_session are aliases: passing both is an error, otherwise use whichever was given
    if browser and browser_session:
        raise ValueError('Cannot specify both "browser" and "browser_session" parameters. Use "browser" for the cleaner API.')
    browser_session = browser or browser_session

    # controller and tools are aliases: passing both is an error, otherwise use whichever was given
    if controller and tools:
        raise ValueError('Cannot specify both "controller" and "tools" parameters. Use "controller" for the cleaner API.')
    tools = controller or tools

    # Store browser_profile for creating browser session if needed (run() builds the session from it)
    self._browser_profile_for_init = BrowserProfile() if browser_session is None else None

    self.task = task
    self.llm = llm
    self.browser_session = browser_session
    self.tools = tools or CodeAgentTools()
    self.page_extraction_llm = page_extraction_llm
    self.file_system = file_system if file_system is not None else FileSystem(base_dir='./')
    self.available_file_paths = available_file_paths or []
    self.sensitive_data = sensitive_data
    self.max_steps = max_steps
    self.max_failures = max_failures
    self.max_validations = max_validations
    self.use_vision = use_vision

    self.session = NotebookSession()
    self.namespace: dict[str, Any] = {}
    self._llm_messages: list[BaseMessage] = []  # Internal LLM conversation history
    self.complete_history: list[CodeAgentHistory] = []  # Type-safe history with model_output and result
    self.dom_service: DomService | None = None
    self._last_browser_state_text: str | None = None  # Track last browser state text
    self._last_screenshot: str | None = None  # Track last screenshot (base64)
    self._consecutive_errors = 0  # Track consecutive errors for auto-termination
    self._validation_count = 0  # Track number of validator runs
    self._last_llm_usage: Any | None = None  # Track last LLM call usage stats
    self._step_start_time = 0.0  # Track step start time for duration calculation
    self.usage_summary: UsageSummary | None = None  # Track usage summary across run for history property

    # Initialize screenshot service for eval tracking
    self.id = uuid7str()
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    # NOTE(review): '/tmp' is hard-coded; kept as-is so the directory location does not change
    base_tmp = Path('/tmp')
    self.agent_directory = base_tmp / f'browser_use_code_agent_{self.id}_{timestamp}'
    self.screenshot_service = ScreenshotService(agent_directory=self.agent_directory)

    # Initialize token cost service for usage tracking
    self.token_cost_service = TokenCost(include_cost=calculate_cost)
    self.token_cost_service.register_llm(llm)
    if page_extraction_llm:
        self.token_cost_service.register_llm(page_extraction_llm)

    # Set version and source for telemetry
    self.version = get_browser_use_version()
    try:
        # Treat the install as a git checkout only if all repo marker entries exist three levels up
        package_root = Path(__file__).parent.parent.parent
        repo_files = ['.git', 'README.md', 'docs', 'examples']
        if all((package_root / file).exists() for file in repo_files):
            self.source = 'git'
        else:
            self.source = 'pip'
    except Exception:
        self.source = 'unknown'

    # Telemetry
    self.telemetry = ProductTelemetry()
async def run(self, max_steps: int | None = None) -> NotebookSession:
    """
    Run the agent to complete the task.

    Sets up the browser session (when none was supplied), the DOM service and the
    execution namespace, optionally navigates to a URL extracted from the task text,
    and then loops: request code from the LLM, execute it, record the step. The loop
    ends when done() has been called (and accepted), when a step or consecutive-failure
    limit is reached, or when an unexpected exception occurs inside a step. Afterwards
    the agent is closed, the token usage summary is stored and a telemetry event is logged.

    Args:
        max_steps: Optional override for maximum number of steps (uses __init__ value if not provided).
            When given, it is also stored on self.max_steps.

    Returns:
        The notebook session with all executed cells
    """
    # Use override if provided, otherwise use value from __init__
    if max_steps is not None:
        self.max_steps = max_steps

    # Start browser if not provided
    # NOTE(review): start() is placed inside this branch to match the comment above; confirm
    # that caller-supplied sessions are expected to be started already.
    if self.browser_session is None:
        assert self._browser_profile_for_init is not None
        self.browser_session = BrowserSession(browser_profile=self._browser_profile_for_init)
        await self.browser_session.start()

    # Initialize DOM service with cross-origin iframe support enabled
    self.dom_service = DomService(
        browser_session=self.browser_session,
        cross_origin_iframes=True,  # Enable for code-use agent to access forms in iframes
    )

    # Create namespace with all tools
    self.namespace = create_namespace(
        browser_session=self.browser_session,
        tools=self.tools,
        page_extraction_llm=self.page_extraction_llm,
        file_system=self.file_system,
        available_file_paths=self.available_file_paths,
        sensitive_data=self.sensitive_data,
    )

    # Initialize conversation with task
    self._llm_messages.append(UserMessage(content=f'Task: {self.task}'))

    # Track agent run error for telemetry (set when a step raises unexpectedly)
    agent_run_error: str | None = None

    # Extract URL from task and navigate if found
    initial_url = extract_url_from_task(self.task)
    if initial_url:
        try:
            logger.info(f'Extracted URL from task, navigating to: {initial_url}')
            # Use the navigate action from namespace
            await self.namespace['navigate'](initial_url)
            # Wait for page load
            await asyncio.sleep(2)

            # Record this navigation as a cell in the notebook
            nav_code = f"await navigate('{initial_url}')"
            cell = self.session.add_cell(source=nav_code)
            cell.status = ExecutionStatus.SUCCESS
            cell.execution_count = self.session.increment_execution_count()
            cell.output = f'Navigated to {initial_url}'

            # Get browser state after navigation for the cell
            if self.dom_service:
                try:
                    browser_state_text, _ = await self._get_browser_state()
                    cell.browser_state = browser_state_text
                except Exception as state_error:
                    logger.debug(f'Failed to capture browser state for initial navigation cell: {state_error}')
        except Exception as e:
            logger.warning(f'Failed to navigate to extracted URL {initial_url}: {e}')
            # Record failed navigation as error cell
            nav_code = f"await navigate('{initial_url}')"
            cell = self.session.add_cell(source=nav_code)
            cell.status = ExecutionStatus.ERROR
            cell.execution_count = self.session.increment_execution_count()
            cell.error = str(e)

    # Get initial browser state before first LLM call
    if self.browser_session and self.dom_service:
        try:
            browser_state_text, screenshot = await self._get_browser_state()
            self._last_browser_state_text = browser_state_text
            self._last_screenshot = screenshot
        except Exception as e:
            logger.warning(f'Failed to get initial browser state: {e}')

    # Main execution loop
    for step in range(self.max_steps):
        logger.info(f'\n\n\n\n\n\n\nStep {step + 1}/{self.max_steps}')

        # Start timing this step
        self._step_start_time = datetime.datetime.now().timestamp()

        # Check if we're approaching the step limit or error limit and inject warning
        steps_remaining = self.max_steps - step - 1
        errors_remaining = self.max_failures - self._consecutive_errors
        should_warn = (
            steps_remaining <= 1  # Last step or next to last
            or errors_remaining <= 1  # One more error will terminate
            or (steps_remaining <= 2 and self._consecutive_errors >= 2)  # Close to both limits
        )
        if should_warn:
            warning_message = (
                f'\n\n⚠️ CRITICAL WARNING: You are approaching execution limits!\n'
                f'- Steps remaining: {steps_remaining + 1}\n'
                f'- Consecutive errors: {self._consecutive_errors}/{self.max_failures}\n\n'
                f'YOU MUST call done() in your NEXT response, even if the task is incomplete:\n'
                f"- Set success=False if you couldn't complete the task\n"
                f'- Return EVERYTHING you found so far (partial data is better than nothing)\n'
                f"- Include any variables you've stored (products, all_data, etc.)\n"
                f"- Explain what worked and what didn't\n\n"
                f'Without done(), the user will receive NOTHING.'
            )
            self._llm_messages.append(UserMessage(content=warning_message))

        try:
            # Fetch fresh browser state right before LLM call (only if not already set)
            if not self._last_browser_state_text and self.browser_session and self.dom_service:
                try:
                    logger.debug('🔍 Fetching browser state before LLM call...')
                    browser_state_text, screenshot = await self._get_browser_state()
                    self._last_browser_state_text = browser_state_text
                    self._last_screenshot = screenshot
                except Exception as e:
                    logger.warning(f'Failed to get browser state before LLM call: {e}')

            # Get code from LLM (this also adds to self._llm_messages)
            try:
                code, full_llm_response = await self._get_code_from_llm()
            except Exception as llm_error:
                # LLM call failed - count as consecutive error and retry
                self._consecutive_errors += 1
                logger.warning(
                    f'LLM call failed (consecutive errors: {self._consecutive_errors}/{self.max_failures}), retrying: {llm_error}'
                )
                # Check if we've hit the consecutive error limit
                if self._consecutive_errors >= self.max_failures:
                    logger.error(f'Terminating: {self.max_failures} consecutive LLM failures')
                    break
                await asyncio.sleep(1)  # Brief pause before retry
                continue

            if not code or code.strip() == '':
                # If task is already done, empty code is fine (LLM explaining completion)
                if self._is_task_done():
                    logger.info('Task already marked as done, LLM provided explanation without code')
                    # Add the text response to history as a non-code step
                    await self._add_step_to_complete_history(
                        model_output_code='',
                        full_llm_response=full_llm_response,
                        output=full_llm_response,  # Treat the explanation as output
                        error=None,
                        screenshot_path=await self._capture_screenshot(step + 1),
                    )
                    break  # Exit the loop since task is done

                logger.warning('LLM returned empty code')
                self._consecutive_errors += 1
                # Enforce max_failures here too, consistent with the LLM-failure and
                # execution-error paths; otherwise empty responses could exceed the limit.
                if self._consecutive_errors >= self.max_failures:
                    logger.error(
                        f'Terminating: {self.max_failures} consecutive errors reached. The agent is unable to make progress.'
                    )
                    break

                # Refresh the browser state so the next LLM request sees the current page
                if self.browser_session and self.dom_service:
                    try:
                        browser_state_text, screenshot = await self._get_browser_state()
                        self._last_browser_state_text = browser_state_text
                        self._last_screenshot = screenshot
                    except Exception as e:
                        logger.warning(f'Failed to get new browser state: {e}')
                continue

            # Execute code blocks sequentially if multiple python blocks exist
            # This allows JS/bash blocks to be injected into namespace before Python code uses them
            all_blocks = self.namespace.get('_all_code_blocks', {})
            python_blocks = [k for k in sorted(all_blocks.keys()) if k.startswith('python_')]

            if len(python_blocks) > 1:
                # Multiple Python blocks - execute each sequentially
                output = None
                error = None
                for i, block_key in enumerate(python_blocks):
                    logger.info(f'Executing Python block {i + 1}/{len(python_blocks)}')
                    block_code = all_blocks[block_key]
                    block_output, block_error, _ = await self._execute_code(block_code)

                    # Accumulate outputs
                    if block_output:
                        output = (output or '') + block_output
                    if block_error:
                        error = block_error
                        # Stop on first error
                        break
            else:
                # Single Python block - execute normally
                output, error, _ = await self._execute_code(code)

            # Track consecutive errors
            if error:
                self._consecutive_errors += 1
                logger.warning(f'Consecutive errors: {self._consecutive_errors}/{self.max_failures}')

                # Check if we've hit the consecutive error limit
                if self._consecutive_errors >= self.max_failures:
                    logger.error(
                        f'Terminating: {self.max_failures} consecutive errors reached. The agent is unable to make progress.'
                    )
                    # Add termination message to complete history before breaking
                    await self._add_step_to_complete_history(
                        model_output_code=code,
                        full_llm_response=f'[Terminated after {self.max_failures} consecutive errors]',
                        output=None,
                        error=f'Auto-terminated: {self.max_failures} consecutive errors without progress',
                        screenshot_path=None,
                    )
                    break
            else:
                # Reset consecutive error counter on success
                self._consecutive_errors = 0

            # Check if task is done - validate completion first if not at limits
            if self._is_task_done():
                # Get the final result from namespace (from done() call)
                final_result: str | None = self.namespace.get('_task_result')  # type: ignore[assignment]

                # Check if we should validate (not at step/error limits and under max validations)
                steps_remaining = self.max_steps - step - 1
                should_validate = (
                    self._validation_count < self.max_validations  # Haven't exceeded max validations
                    and steps_remaining >= 4  # At least 4 steps away from limit
                    and self._consecutive_errors < 3  # Not close to error limit (8 consecutive)
                )

                if should_validate:
                    self._validation_count += 1
                    logger.info('Validating task completion with LLM...')
                    from .namespace import validate_task_completion

                    is_complete, reasoning = await validate_task_completion(
                        task=self.task,
                        output=final_result,
                        llm=self.llm,
                    )

                    if not is_complete:
                        # Task not truly complete - inject feedback and continue
                        logger.warning('Validator: Task not complete, continuing...')
                        validation_feedback = (
                            f'\n\n⚠️ VALIDATOR FEEDBACK:\n'
                            f'Your done() call was rejected. The task is NOT complete yet.\n\n'
                            f'Validation reasoning:\n{reasoning}\n\n'
                            f'You must continue working on the task. Analyze what is missing and complete it.\n'
                            f'Do NOT call done() again until the task is truly finished.'
                        )

                        # Clear the done flag so execution continues
                        self.namespace['_task_done'] = False
                        self.namespace.pop('_task_result', None)
                        self.namespace.pop('_task_success', None)

                        # Add validation feedback to LLM messages
                        self._llm_messages.append(UserMessage(content=validation_feedback))
                        # Don't override output - let execution continue normally
                    else:
                        logger.info('Validator: Task complete')
                        # Override output with done message for final step
                        if final_result:
                            output = final_result
                else:
                    # At limits - skip validation and accept done()
                    if self._validation_count >= self.max_validations:
                        logger.info(
                            f'Reached max validations ({self.max_validations}) - skipping validation and accepting done()'
                        )
                    else:
                        logger.info('At step/error limits - skipping validation')
                    if final_result:
                        output = final_result

            if output:
                # Check if this is the final done() output
                if self._is_task_done():
                    # Show done() output more prominently (first 300 characters only)
                    logger.info(f'✓ Task completed - Final output from done():\n{output[:300]}')
                    # Also show files_to_display if they exist in namespace
                    attachments: list[str] | None = self.namespace.get('_task_attachments')  # type: ignore[assignment]
                    if attachments:
                        logger.info(f'Files displayed: {", ".join(attachments)}')
                else:
                    logger.info(f'Code output:\n{output}')

            # Browser state is now only logged when fetched before LLM call (not after execution)

            # Take screenshot for eval tracking
            screenshot_path = await self._capture_screenshot(step + 1)

            # Add step to complete_history for eval system
            await self._add_step_to_complete_history(
                model_output_code=code,
                full_llm_response=full_llm_response,
                output=output,
                error=error,
                screenshot_path=screenshot_path,
            )

            # Check if task is done (after validation)
            if self._is_task_done():
                # Get the final result from namespace
                final_result = self.namespace.get('_task_result', output)  # type: ignore[assignment]
                logger.info('Task completed successfully')
                if final_result:
                    logger.info(f'Final result: {final_result}')
                break

            # If validation rejected done(), continue to next iteration
            # The feedback message has already been added to _llm_messages

            # Add result to LLM messages for next iteration (without browser state)
            result_message = self._format_execution_result(code, output, error, current_step=step + 1)
            truncated_result = truncate_message_content(result_message)
            self._llm_messages.append(UserMessage(content=truncated_result))

        except Exception as e:
            logger.error(f'Error in step {step + 1}: {e}')
            traceback.print_exc()
            # Remember the failure so the telemetry event below reports it
            agent_run_error = str(e)
            break
    else:
        # Loop completed without break - max_steps reached
        logger.warning(f'Maximum steps ({self.max_steps}) reached without task completion')

        # If task is not done, capture the last step's output as partial result
        if not self._is_task_done() and self.complete_history:
            # Get the last step's output/error and use it as final extracted_content
            last_step = self.complete_history[-1]
            last_result = last_step.result[0] if last_step.result else None
            last_output = last_result.extracted_content if last_result else None
            last_error = last_result.error if last_result else None

            # Build a partial result message from the last step
            partial_result_parts = [
                f'Task incomplete - reached step limit ({self.max_steps} steps).',
                'Last step output:',
            ]
            if last_output:
                partial_result_parts.append(f'\nOutput: {last_output}')
            if last_error:
                partial_result_parts.append(f'\nError: {last_error}')

            # Add any accumulated variables that might contain useful data
            data_vars = []
            for var_name in sorted(self.namespace.keys()):
                if not var_name.startswith('_') and var_name not in {'json', 'asyncio', 'csv', 're', 'datetime', 'Path'}:
                    var_value = self.namespace[var_name]
                    # Check if it's a list or dict that might contain collected data
                    if isinstance(var_value, (list, dict)) and var_value:
                        data_vars.append(f' - {var_name}: {type(var_value).__name__} with {len(var_value)} items')
            if data_vars:
                partial_result_parts.append('\nVariables in namespace that may contain partial data:')
                partial_result_parts.extend(data_vars)

            partial_result = '\n'.join(partial_result_parts)

            # Update the last step's extracted_content with this partial result
            if last_result:
                last_result.extracted_content = partial_result
                last_result.is_done = False
                last_result.success = False

            logger.info(f'\nPartial result captured from last step:\n{partial_result}')

    # Log final summary if task was completed
    if self._is_task_done():
        logger.info('\n' + '=' * 60)
        logger.info('TASK COMPLETED SUCCESSFULLY')
        logger.info('=' * 60)
        final_result = self.namespace.get('_task_result')  # type: ignore[assignment]
        if final_result:
            logger.info(f'\nFinal Output:\n{final_result}')
        attachments = self.namespace.get('_task_attachments')  # type: ignore[assignment]
        if attachments:
            logger.info(f'\nFiles Attached:\n{chr(10).join(attachments)}')
        logger.info('=' * 60 + '\n')

    # Auto-close browser if keep_alive is False
    await self.close()

    # Store usage summary for history property
    self.usage_summary = await self.token_cost_service.get_usage_summary()

    # Log token usage summary
    await self.token_cost_service.log_usage_summary()

    # Log telemetry event (best effort: a telemetry failure must not fail the run)
    try:
        self._log_agent_event(max_steps=self.max_steps, agent_run_error=agent_run_error)
    except Exception as log_e:
        logger.error(f'Failed to log telemetry event: {log_e}', exc_info=True)

    return self.session
async def _get_code_from_llm(self) -> tuple[str, str]:
    """Ask the LLM for the next piece of code.

    The stored conversation is sent together with a one-off browser-state message
    (text, plus a screenshot when vision is enabled and one is available). The
    browser-state message is never added to the stored history, and the cached
    state is cleared once it has been attached.

    Returns:
        Tuple of (extracted_code, full_llm_response). When a token-limit or
        repetition problem is detected, the code is '' and the second item is a
        bracketed error description instead of the raw completion.
    """
    # Work on a copy so the transient browser-state message does not accumulate in history
    outgoing = list(self._llm_messages)

    state_text = self._last_browser_state_text
    if state_text:
        shot = self._last_screenshot
        if self.use_vision and shot:
            # Text part first, screenshot second
            parts: list[ContentPartTextParam | ContentPartImageParam] = [
                ContentPartTextParam(text=state_text),
                ContentPartImageParam(
                    image_url=ImageURL(
                        url=f'data:image/jpeg;base64,{shot}',
                        media_type='image/jpeg',
                        detail='auto',
                    ),
                ),
            ]
            state_message = UserMessage(content=parts)
        else:
            # Text only
            state_message = UserMessage(content=state_text)
        outgoing.append(state_message)

        # The state has been consumed by this request; drop it so it is not resent
        self._last_browser_state_text = None
        self._last_screenshot = None

    # Call LLM with message history (including temporary browser state message)
    response = await self.llm.ainvoke(outgoing)

    # Keep the usage stats of this call
    self._last_llm_usage = response.usage

    # Log the raw completion for debugging
    logger.info(f'LLM Response:\n{response.completion}')

    # Look for token-limit or repetition problems
    max_tokens = getattr(self.llm, 'max_tokens', None)
    completion_tokens = response.usage.completion_tokens if response.usage else None
    is_problematic, issue_message = detect_token_limit_issue(
        completion=response.completion,
        completion_tokens=completion_tokens,
        max_tokens=max_tokens,
        stop_reason=response.stop_reason,
    )
    if is_problematic:
        logger.warning(f'Token limit issue detected: {issue_message}')
        # The bad completion is not stored; a user message asks the model to recover instead
        recovery_prompt = (
            f'Your previous response hit a token limit or became repetitive: {issue_message}\n\n'
            'Please write a SHORT plan (2 sentences) for what to do next, then execute ONE simple action.'
        )
        self._llm_messages.append(UserMessage(content=recovery_prompt))
        # Hand back a controlled error marker rather than possibly corrupted code
        return '', f'[Token limit error: {issue_message}]'

    full_response = response.completion

    # Pull fenced code blocks out of the completion (python, js, bash, markdown, ...)
    code_blocks = extract_code_blocks(response.completion)

    # Names of namespace variables that hold injected code blocks (used for state display)
    self.namespace.setdefault('_code_block_vars', set())

    for block_type, block_content in code_blocks.items():
        if block_type.startswith('python'):
            continue
        # Expose js, bash, markdown blocks (and named variants) as string variables
        self.namespace[block_type] = block_content
        self.namespace['_code_block_vars'].add(block_type)
        print(f'→ Code block variable: {block_type} (str, {len(block_content)} chars)')
        logger.debug(f'Injected {block_type} block into namespace ({len(block_content)} chars)')

    # Store all code blocks for sequential execution
    self.namespace['_all_code_blocks'] = code_blocks

    # Prefer the 'python' block; without one, the whole completion is returned as code.
    # NOTE(review): an earlier comment here said plain-text answers should yield an empty
    # string to skip execution, which this fallback does not do - confirm intended behavior.
    code = code_blocks.get('python', response.completion)

    # Record the assistant turn (truncated for history to save context)
    history_text = truncate_message_content(response.completion)
    self._llm_messages.append(AssistantMessage(content=history_text))

    return code, full_response
def _print_variable_info(self, var_name: str, value: Any) -> None:
    """Print compact info about a variable assignment.

    Nothing is printed for well-known module/helper names or for names listed in
    self.namespace['_code_block_vars'] (if that entry exists).

    Args:
        var_name: Name of the variable.
        value: The value bound to that name.
    """
    # Skip built-in modules and known imports
    skip_names = {
        'json',
        'asyncio',
        'csv',
        're',
        'datetime',
        'Path',
        'pd',
        'np',
        'plt',
        'requests',
        'BeautifulSoup',
        'PdfReader',
        'browser',
        'file_system',
    }
    if var_name in skip_names:
        return

    # Skip code block variables (already printed)
    if var_name in self.namespace.get('_code_block_vars', ()):
        return

    # Print compact variable info
    if isinstance(value, (list, dict)):
        rendered = str(value)
        preview = rendered[:100]
        # Only mark the preview as cut off when something was actually dropped
        if len(rendered) > 100:
            preview += '...'
        print(f'→ Variable: {var_name} ({type(value).__name__}, len={len(value)}, preview={preview})')
    elif isinstance(value, str) and len(value) > 50:
        print(f'→ Variable: {var_name} (str, {len(value)} chars, preview={value[:50]}...)')
    elif callable(value):
        print(f'→ Variable: {var_name} (function)')
    else:
        print(f'→ Variable: {var_name} ({type(value).__name__}, value={repr(value)[:50]})')
async def _execute_code(self, code: str) -> tuple[str | None, str | None, str | None]:
    """
    Execute Python code in the namespace as a new notebook cell.

    A cell is added to ``self.session``, the code runs with ``sys.stdout`` redirected
    into a buffer, and the cell is updated with the outcome. Code that contains
    ``await`` / ``async with`` / ``async for`` (or an async comprehension) is wrapped
    in an ``async def`` so that top-level await works; the wrapper's locals are merged
    back into ``self.namespace`` so assignments persist between cells.

    SECURITY NOTE: the code is run with ``exec`` and has the full privileges of this
    process. It must only ever come from a trusted source.

    Args:
        code: The Python code to execute

    Returns:
        Tuple of (output, error, browser_state). ``output`` is the captured stdout
        (None when nothing was printed or when execution failed), ``error`` is a
        formatted error message (None on success) and ``browser_state`` is always
        None here.
    """
    import ast
    import io
    import sys

    # Create new cell
    cell = self.session.add_cell(source=code)
    cell.status = ExecutionStatus.RUNNING
    cell.execution_count = self.session.increment_execution_count()

    output = None
    error = None

    try:
        # Capture everything the cell prints while it runs
        old_stdout = sys.stdout
        sys.stdout = io.StringIO()
        try:
            # Add asyncio to namespace if not already there
            if 'asyncio' not in self.namespace:
                self.namespace['asyncio'] = asyncio

            # Expose the current code and error count through the namespace
            # (the original comments say done() validation reads these)
            self.namespace['_current_cell_code'] = code
            self.namespace['_consecutive_errors'] = self._consecutive_errors

            # Check if code needs an async context - if so, wrap in async function.
            # This mimics how Jupyter/IPython handles top-level await.
            try:
                tree = ast.parse(code, mode='exec')
                has_await = any(
                    isinstance(node, (ast.Await, ast.AsyncWith, ast.AsyncFor))
                    or (isinstance(node, ast.comprehension) and bool(node.is_async))
                    for node in ast.walk(tree)
                )
            except SyntaxError:
                # If parse fails, let compile() below raise the error
                has_await = False

            if has_await:
                # The code must run inside an async function. To make variables persist
                # naturally (like Jupyter, without the user writing 'global'):
                # 1. Collect the names the code assigns to
                # 2. Declare as 'global' those that already exist in the namespace
                # 3. Pre-define names the user declared 'global' themselves
                # 4. Return locals() so new variables can be copied into the namespace
                try:
                    assigned_names: set[str] = set()
                    annotated_names: set[str] = set()
                    user_global_names: set[str] = set()
                    for node in ast.walk(tree):
                        if isinstance(node, ast.Assign):
                            for target in node.targets:
                                if isinstance(target, ast.Name):
                                    assigned_names.add(target.id)
                        elif isinstance(node, (ast.AugAssign, ast.NamedExpr)):
                            if isinstance(node.target, ast.Name):
                                assigned_names.add(node.target.id)
                        elif isinstance(node, ast.AnnAssign):
                            if isinstance(node.target, ast.Name):
                                annotated_names.add(node.target.id)
                        elif isinstance(node, ast.Global):
                            # Track user's explicit global declarations
                            user_global_names.update(node.names)

                    # Pre-define any user-declared globals that don't exist yet.
                    # This prevents NameError when user writes "global foo" before "foo = ..."
                    for name in user_global_names:
                        if name not in self.namespace:
                            self.namespace[name] = None

                    # Only names that already exist in the namespace are declared global.
                    # Annotated names are left out: Python rejects "global x" combined
                    # with "x: T = ..." in one function ("annotated name can't be global").
                    # They still persist through the returned locals().
                    existing_vars = {
                        name for name in (assigned_names | user_global_names) if name in self.namespace
                    } - annotated_names
                except Exception:
                    existing_vars = set()

                # Every line of the wrapper body shares this indent so the generated
                # function is syntactically valid.
                indent = '    '
                has_global_decl = bool(existing_vars)
                global_decl = ''
                if has_global_decl:
                    vars_str = ', '.join(sorted(existing_vars))
                    global_decl = f'{indent}global {vars_str}\n'

                indented_code = '\n'.join(indent + line if line.strip() else line for line in code.split('\n'))
                wrapped_code = (
                    'async def __code_exec__():\n'
                    f'{global_decl}{indented_code}\n'
                    f'{indent}# Return locals so we can update the namespace\n'
                    f'{indent}return locals()\n'
                    '\n'
                    '__code_exec_coro__ = __code_exec__()\n'
                )

                # Record whether a global declaration line was injected; the original
                # comment says this is needed for error line mapping.
                self.namespace['_has_global_decl'] = has_global_decl

                # Compile and execute wrapper at module level
                compiled_code = compile(wrapped_code, '<code>', 'exec')
                try:
                    exec(compiled_code, self.namespace, self.namespace)

                    # Await the coroutine, then copy new/modified variables into the namespace
                    coro = self.namespace.get('__code_exec_coro__')
                    if coro:
                        result_locals = await coro
                        if result_locals:
                            for key, value in result_locals.items():
                                if not key.startswith('_'):
                                    self.namespace[key] = value
                finally:
                    # Clean up temporary variables even when the cell raised
                    self.namespace.pop('__code_exec_coro__', None)
                    self.namespace.pop('__code_exec__', None)
            else:
                # No await - execute directly at module level for natural variable scoping.
                # This means x = x + 10 will work without needing 'global x'.
                compiled_code = compile(code, '<code>', 'exec')
                exec(compiled_code, self.namespace, self.namespace)

            # Get output (empty output is reported as None)
            output = sys.stdout.getvalue() or None
        finally:
            sys.stdout = old_stdout

        # Wait 0.5 seconds for page to stabilize after code execution
        await asyncio.sleep(0.5)

        # Note: Browser state is fetched right before the LLM call instead of after each execution
        cell.status = ExecutionStatus.SUCCESS
        cell.output = output
        cell.browser_state = None  # Will be captured in next iteration before LLM call

    except Exception as e:
        # Handle EvaluateError specially - JavaScript execution failed
        if isinstance(e, EvaluateError):
            error = str(e)
            cell.status = ExecutionStatus.ERROR
            cell.error = error
            logger.error(f'Code execution error: {error}')
            await asyncio.sleep(1)
            # Return immediately - browser state will be fetched before next LLM call
            return output, error, None

        # Handle NameError specially: shorter pause, and the message must actually
        # be reported (previously `error` was never set in this branch).
        if isinstance(e, NameError):
            error = f'{type(e).__name__}: {e}'
            cell.status = ExecutionStatus.ERROR
            cell.error = error
            logger.error(f'Code execution error: {error}')
            await asyncio.sleep(0.5)
            return output, error, None

        if isinstance(e, SyntaxError):
            # For syntax errors show just the message (no traceback) to keep output clean
            error_msg = e.msg if e.msg else str(e)
            error = f'{type(e).__name__}: {error_msg}'
            msg_lower = error_msg.lower()
            unterminated_string = 'unterminated' in msg_lower and 'string' in msg_lower

            # Detect common f-string issues with JSON/JavaScript code
            if unterminated_string and code:
                has_fstring = bool(re.search(r'\bf["\']', code))
                has_json_pattern = bool(re.search(r'json\.dumps|"[^"]*\{[^"]*\}[^"]*"|\'[^\']*\{[^\']*\}[^\']*\'', code))
                has_js_pattern = bool(re.search(r'evaluate\(|await evaluate', code))
                if has_fstring and (has_json_pattern or has_js_pattern):
                    error += (
                        '\n\n💡 TIP: Detected f-string with JSON/JavaScript code containing {}.\n'
                        ' Use separate ```js or ```markdown blocks instead of f-strings to avoid escaping issues.\n'
                        ' If your code block needs ``` inside it, wrap with 4+ backticks: ````markdown code`\n'
                    )

            # Provide helpful hints for common string literal errors
            if unterminated_string:
                is_triple = 'triple-quoted' in msg_lower

                # Detect prefix type from error message
                if 'f-string' in msg_lower and 'raw' in msg_lower:
                    prefix = 'rf or fr'
                    desc = 'raw f-string'
                elif 'f-string' in msg_lower:
                    prefix = 'f'
                    desc = 'f-string'
                elif 'raw' in msg_lower and 'bytes' in msg_lower:
                    prefix = 'rb or br'
                    desc = 'raw bytes'
                elif 'raw' in msg_lower:
                    prefix = 'r'
                    desc = 'raw string'
                elif 'bytes' in msg_lower:
                    prefix = 'b'
                    desc = 'bytes'
                else:
                    prefix = ''
                    desc = 'string'

                # Build hint based on triple-quoted vs single/double quoted
                if is_triple:
                    if prefix:
                        hint = f"Hint: Unterminated {prefix}'''...''' or {prefix}\"\"\"...\"\"\" ({desc}). Check for missing closing quotes or unescaped quotes inside."
                    else:
                        hint = "Hint: Unterminated '''...''' or \"\"\"...\"\"\" detected. Check for missing closing quotes or unescaped quotes inside."
                    hint += '\n If you need ``` inside your string, use a ````markdown varname` code block with 4+ backticks instead.'
                else:
                    if prefix:
                        hint = f'Hint: Unterminated {prefix}\'...\' or {prefix}"..." ({desc}). Check for missing closing quote or unescaped quotes inside.'
                    else:
                        hint = 'Hint: Unterminated \'...\' or "..." detected. Check for missing closing quote or unescaped quotes inside the string.'
                error += f'\n{hint}'

            # Show the problematic line from the code
            if e.text:
                error += f'\n{e.text}'
            elif e.lineno and code:
                # If e.text is empty, extract the line from the code
                lines = code.split('\n')
                if 0 < e.lineno <= len(lines):
                    error += f'\n{lines[e.lineno - 1]}'
        else:
            # For other errors, report the exception type and message
            error_str = str(e)
            error = f'{type(e).__name__}: {error_str}' if error_str else f'{type(e).__name__} occurred'

        cell.status = ExecutionStatus.ERROR
        cell.error = error
        logger.error(f'Code execution error: {error}')
        await asyncio.sleep(1)

    # Browser state will be fetched before next LLM call
    return output, error, None
| async def _get_browser_state(self) -> tuple[str, str | None]: | |
| """Get the current browser state as text with ultra-minimal DOM structure for code agents. | |
| Returns: | |
| Tuple of (browser_state_text, screenshot_base64) | |
| """ | |
| if not self.browser_session or not self.dom_service: | |
| return 'Browser state not available', None | |
| try: | |
| # Get full browser state including screenshot if use_vision is enabled | |
| include_screenshot = True | |
| state = await self.browser_session.get_browser_state_summary(include_screenshot=include_screenshot) | |
| # Format browser state with namespace context | |
| browser_state_text = await format_browser_state_for_llm( | |
| state=state, namespace=self.namespace, browser_session=self.browser_session | |
| ) | |
| screenshot = state.screenshot if include_screenshot else None | |
| return browser_state_text, screenshot | |
| except Exception as e: | |
| logger.error(f'Failed to get browser state: {e}') | |
| return f'Error getting browser state: {e}', None | |
| def _format_execution_result(self, code: str, output: str | None, error: str | None, current_step: int | None = None) -> str: | |
| """Format the execution result for the LLM (without browser state).""" | |
| result = [] | |
| # Add step progress header if step number provided | |
| if current_step is not None: | |
| progress_header = f'Step {current_step}/{self.max_steps} executed' | |
| # Add consecutive failure tracking if there are errors | |
| if error and self._consecutive_errors > 0: | |
| progress_header += f' | Consecutive failures: {self._consecutive_errors}/{self.max_failures}' | |
| result.append(progress_header) | |
| if error: | |
| result.append(f'Error: {error}') | |
| if output: | |
| # Truncate output if too long | |
| if len(output) > 10000: | |
| output = output[:9950] + '\n[Truncated after 10000 characters]' | |
| result.append(f'Output: {output}') | |
| if len(result) == 0: | |
| result.append('Executed') | |
| return '\n'.join(result) | |
def _is_task_done(self) -> bool:
    """Report whether the namespace carries the '_task_done' completion marker."""
    # The marker is looked up in the shared namespace; a missing marker means not done
    done_marker = self.namespace.get('_task_done', False)
    return done_marker
| async def _capture_screenshot(self, step_number: int) -> str | None: | |
| """Capture and store screenshot for eval tracking.""" | |
| if not self.browser_session: | |
| return None | |
| try: | |
| # Get browser state summary which includes screenshot | |
| state = await self.browser_session.get_browser_state_summary(include_screenshot=True) | |
| if state and state.screenshot: | |
| # Store screenshot using screenshot service | |
| screenshot_path = await self.screenshot_service.store_screenshot(state.screenshot, step_number) | |
| return str(screenshot_path) if screenshot_path else None | |
| except Exception as e: | |
| logger.warning(f'Failed to capture screenshot for step {step_number}: {e}') | |
| return None | |
async def _add_step_to_complete_history(
    self,
    model_output_code: str,
    full_llm_response: str,
    output: str | None,
    error: str | None,
    screenshot_path: str | None,
) -> None:
    """Append one step to ``self.complete_history`` using the typed history models.

    Args:
        model_output_code: Code extracted from the LLM response for this step
        full_llm_response: The full LLM response text
        output: Captured output of the execution, if any
        error: Error message of the execution, if any
        screenshot_path: Path of the screenshot taken for this step, if any
    """
    # Look up the page URL and title; failures are logged and leave both as None
    # (or the URL alone, if only the title lookup failed)
    page_url: str | None = None
    page_title: str | None = None
    if self.browser_session:
        try:
            page_url = await self.browser_session.get_current_page_url()
            # The title is read by evaluating document.title over CDP
            cdp_session = await self.browser_session.get_or_create_cdp_session()
            evaluation = await cdp_session.cdp_client.send.Runtime.evaluate(
                params={'expression': 'document.title', 'returnByValue': True},
                session_id=cdp_session.session_id,
            )
            page_title = evaluation.get('result', {}).get('value')
        except Exception as exc:
            logger.debug(f'Failed to get browser URL/title for history: {exc}')

    # Completion flag and, when done, the self-reported success (booleans only)
    is_done = self._is_task_done()
    self_reported_success: bool | None = None
    if is_done:
        reported = self.namespace.get('_task_success')
        if isinstance(reported, bool):
            self_reported_success = reported

    # Result entry (empty strings are stored as None)
    result_entry = CodeAgentResult(
        extracted_content=output or None,
        error=error or None,
        is_done=is_done,
        success=self_reported_success,
    )

    # State entry
    state_entry = CodeAgentState(url=page_url, title=page_title, screenshot_path=screenshot_path)

    # Metadata entry: token usage of the last LLM call plus step timing
    step_end_time = datetime.datetime.now().timestamp()
    usage = self._last_llm_usage
    metadata_entry = CodeAgentStepMetadata(
        input_tokens=usage.prompt_tokens if usage else None,
        output_tokens=usage.completion_tokens if usage else None,
        step_start_time=self._step_start_time,
        step_end_time=step_end_time,
    )

    # Model output entry, only when there is something to record
    model_output_entry: CodeAgentModelOutput | None = None
    if model_output_code or full_llm_response:
        model_output_entry = CodeAgentModelOutput(
            model_output=model_output_code or '',
            full_response=full_llm_response or '',
        )

    self.complete_history.append(
        CodeAgentHistory(
            model_output=model_output_entry,
            result=[result_entry],
            state=state_entry,
            metadata=metadata_entry,
            screenshot_path=screenshot_path,  # Keep for backward compatibility
        )
    )
def _log_agent_event(self, max_steps: int, agent_run_error: str | None = None) -> None:
    """Send the agent event for this run to telemetry.

    Args:
        max_steps: The step limit of the run, reported as-is
        agent_run_error: Optional error message describing a failed run
    """
    from urllib.parse import urlparse

    token_summary = self.token_cost_service.get_usage_tokens_for_model(self.llm.model)
    steps = self.complete_history

    # CodeAgent has no action history like Agent does; each step's full LLM
    # response is reported as a single action entry (None when unavailable)
    action_history_data: list[list[dict[str, Any]] | None] = [
        [{'llm_response': step.model_output.full_response}]
        if step.model_output and step.model_output.full_response
        else None
        for step in steps
    ]

    # Final result from the namespace, reported only when it is a string
    final_result: Any = self.namespace.get('_task_result')
    final_result_str: str | None = final_result if isinstance(final_result, str) else None

    # Distinct visited URLs in first-seen order
    urls_visited: list[str] = list(dict.fromkeys(step.state.url for step in steps if step.state.url))

    # All non-empty errors across steps
    errors: list[str] = [result.error for step in steps for result in step.result if result.error]

    # Success: the boolean reported through the namespace; otherwise False when
    # the task is done and None when it is not
    is_done = self._is_task_done()
    task_success: Any = self.namespace.get('_task_success')
    self_reported_success: bool | None
    if isinstance(task_success, bool):
        self_reported_success = task_success
    elif is_done:
        self_reported_success = False
    else:
        self_reported_success = None

    # Only the hostname of the CDP URL is reported
    if self.browser_session and self.browser_session.cdp_url:
        cdp_host = urlparse(self.browser_session.cdp_url).hostname
    else:
        cdp_host = None

    event = AgentTelemetryEvent(
        task=self.task,
        model=self.llm.model,
        model_provider=self.llm.provider,
        max_steps=max_steps,
        max_actions_per_step=1,  # CodeAgent executes one code cell per step
        use_vision=self.use_vision,
        version=self.version,
        source=self.source,
        cdp_url=cdp_host,
        agent_type='code',  # CodeAgent identifier
        action_errors=errors,
        action_history=action_history_data,
        urls_visited=urls_visited,
        steps=len(self.complete_history),
        total_input_tokens=token_summary.prompt_tokens,
        total_output_tokens=token_summary.completion_tokens,
        prompt_cached_tokens=token_summary.prompt_cached_tokens,
        total_tokens=token_summary.total_tokens,
        total_duration_seconds=sum(step.metadata.duration_seconds for step in self.complete_history if step.metadata),
        success=self_reported_success,
        final_result_response=final_result_str,
        error_message=agent_run_error,
    )
    self.telemetry.capture(event)
| def screenshot_paths(self, n_last: int | None = None) -> list[str | None]: | |
| """ | |
| Get screenshot paths from complete_history for eval system. | |
| Args: | |
| n_last: Optional number of last screenshots to return | |
| Returns: | |
| List of screenshot file paths (or None for missing screenshots) | |
| """ | |
| paths = [step.screenshot_path for step in self.complete_history] | |
| if n_last is not None: | |
| return paths[-n_last:] if len(paths) > n_last else paths | |
| return paths | |
def message_manager(self) -> Any:
    """
    Compatibility accessor for eval system.

    Returns a mock object whose ``last_input_messages`` attribute is this agent's
    ``_llm_messages`` list.

    NOTE(review): described as a property, but no ``@property`` decorator is visible
    on this definition - confirm how callers access it.
    """

    class MockMessageManager:
        """Minimal stand-in exposing only ``last_input_messages``."""

        def __init__(self, llm_messages: list[BaseMessage]) -> None:
            # The messages are handed over as-is (same list object)
            self.last_input_messages = llm_messages

    manager = MockMessageManager(self._llm_messages)
    return manager
def history(self) -> Any:
    """
    Compatibility accessor for eval system.

    Returns a mock AgentHistoryList-like object: its ``history`` attribute holds one
    attribute-access wrapper per entry of ``complete_history`` and its ``usage``
    attribute holds ``self.usage_summary``.

    NOTE(review): described as a property (``agent.history``), but no ``@property``
    decorator is visible on this definition - confirm how callers access it.
    """

    class DictToObject:
        """Convert dict to object with attribute access for eval compatibility."""

        def __init__(self, data: dict[str, Any]) -> None:
            # Nested dicts (also inside lists) are wrapped recursively
            for key, value in data.items():
                if isinstance(value, dict):
                    converted: Any = DictToObject(value)
                elif isinstance(value, list):
                    converted = [DictToObject(item) if isinstance(item, dict) else item for item in value]
                else:
                    converted = value
                setattr(self, key, converted)

        def __getattr__(self, name: str) -> None:
            """Return None for any attribute that was not set, instead of raising."""
            # Only reached when normal lookup fails, so real attributes are unaffected
            return None

        def model_dump(self) -> dict[str, Any]:
            """Support model_dump() calls from eval system by rebuilding plain dicts."""

            def unwrap(value: Any) -> Any:
                if isinstance(value, DictToObject):
                    return value.model_dump()
                if isinstance(value, list):
                    return [item.model_dump() if isinstance(item, DictToObject) else item for item in value]
                return value

            return {key: unwrap(value) for key, value in self.__dict__.items()}

        def get_screenshot(self) -> str | None:
            """Return the file at ``screenshot_path`` as a base64 string, or None."""
            import base64
            from pathlib import Path

            stored_path = getattr(self, 'screenshot_path', None)
            if not stored_path:
                return None

            path_obj = Path(stored_path)
            if not path_obj.exists():
                return None

            try:
                return base64.b64encode(path_obj.read_bytes()).decode('utf-8')
            except Exception:
                return None

    class MockAgentHistoryList:
        """Holder exposing ``history`` and ``usage``."""

        def __init__(self, complete_history: list[CodeAgentHistory], usage_summary: UsageSummary | None) -> None:
            # Each CodeAgentHistory is dumped to a dict, then wrapped for attribute access
            self.history = [DictToObject(item.model_dump()) for item in complete_history]
            self.usage = usage_summary

    return MockAgentHistoryList(self.complete_history, self.usage_summary)
async def close(self) -> None:
    """Close the browser session unless its profile asks to keep it alive."""
    session = self.browser_session
    if not session:
        return

    # keep_alive on the browser profile decides whether the browser survives
    if session.browser_profile.keep_alive:
        logger.debug('Browser keep_alive is True, not closing browser session')
        return

    await session.kill()
async def __aenter__(self) -> 'CodeAgent':
    """Enter the async context manager; the agent itself is the context value."""
    agent = self
    return agent
| async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None: | |
| """Async context manager exit.""" | |
| await self.close() | |