| |
| """ |
| Browser Tool Module |
| |
| This module provides browser automation tools using agent-browser CLI. It |
| supports multiple backends β **Browser Use** (cloud, default for Nous |
| subscribers), **Browserbase** (cloud, direct credentials), and **local |
| Chromium** β with identical agent-facing behaviour. The backend is |
| auto-detected from config and available credentials. |
| |
| The tool uses agent-browser's accessibility tree (ariaSnapshot) for text-based |
| page representation, making it ideal for LLM agents without vision capabilities. |
| |
| Features: |
| - **Local mode** (default): zero-cost headless Chromium via agent-browser. |
| Works on Linux servers without a display. One-time setup: |
| ``agent-browser install`` (downloads Chromium) or |
| ``agent-browser install --with-deps`` (also installs system libraries for |
| Debian/Ubuntu/Docker). |
| - **Cloud mode**: Browserbase or Browser Use cloud execution when configured. |
| - Session isolation per task ID |
| - Text-based page snapshots using accessibility tree |
| - Element interaction via ref selectors (@e1, @e2, etc.) |
| - Task-aware content extraction using LLM summarization |
| - Automatic cleanup of browser sessions |
| |
| Environment Variables: |
| - BROWSERBASE_API_KEY: API key for direct Browserbase cloud mode |
| - BROWSERBASE_PROJECT_ID: Project ID for direct Browserbase cloud mode |
| - BROWSER_USE_API_KEY: API key for direct Browser Use cloud mode |
| - BROWSERBASE_PROXIES: Enable/disable residential proxies (default: "true") |
| - BROWSERBASE_ADVANCED_STEALTH: Enable advanced stealth mode with custom Chromium, |
| requires Scale Plan (default: "false") |
| - BROWSERBASE_KEEP_ALIVE: Enable keepAlive for session reconnection after disconnects, |
| requires paid plan (default: "true") |
| - BROWSERBASE_SESSION_TIMEOUT: Custom session timeout in milliseconds. Set to extend |
| beyond project default. Common values: 600000 (10min), 1800000 (30min) (default: none) |
| |
| Usage: |
| from tools.browser_tool import browser_navigate, browser_snapshot, browser_click |
| |
| # Navigate to a page |
| result = browser_navigate("https://example.com", task_id="task_123") |
| |
| # Get page snapshot |
| snapshot = browser_snapshot(task_id="task_123") |
| |
| # Click an element |
| browser_click("@e5", task_id="task_123") |
| """ |
|
|
| import atexit |
| import functools |
| import json |
| import logging |
| import os |
| import re |
| import signal |
| import subprocess |
| import shutil |
| import sys |
| import tempfile |
| import threading |
| import time |
| import requests |
| from typing import Dict, Any, Optional, List, Tuple |
| from pathlib import Path |
| from agent.auxiliary_client import call_llm |
| from hermes_constants import get_hermes_home |
|
|
| try: |
| from tools.website_policy import check_website_access |
| except Exception: |
| check_website_access = lambda url: None |
|
|
| try: |
| from tools.url_safety import is_safe_url as _is_safe_url |
| except Exception: |
| _is_safe_url = lambda url: False |
| from tools.browser_providers.base import CloudBrowserProvider |
| from tools.browser_providers.browserbase import BrowserbaseProvider |
| from tools.browser_providers.browser_use import BrowserUseProvider |
| from tools.browser_providers.firecrawl import FirecrawlProvider |
| from tools.tool_backend_helpers import normalize_browser_cloud_provider |
|
|
| |
| |
| |
| try: |
| from tools.browser_camofox import is_camofox_mode as _is_camofox_mode |
| except ImportError: |
| _is_camofox_mode = lambda: False |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
| _SANE_PATH_DIRS = ( |
| "/data/data/com.termux/files/usr/bin", |
| "/data/data/com.termux/files/usr/sbin", |
| "/opt/homebrew/bin", |
| "/opt/homebrew/sbin", |
| "/usr/local/sbin", |
| "/usr/local/bin", |
| "/usr/sbin", |
| "/usr/bin", |
| "/sbin", |
| "/bin", |
| ) |
| _SANE_PATH = os.pathsep.join(_SANE_PATH_DIRS) |
|
|
|
|
| @functools.lru_cache(maxsize=1) |
| def _discover_homebrew_node_dirs() -> tuple[str, ...]: |
| """Find Homebrew versioned Node.js bin directories (e.g. node@20, node@24). |
| |
| When Node is installed via ``brew install node@24`` and NOT linked into |
| /opt/homebrew/bin, agent-browser isn't discoverable on the default PATH. |
| This function finds those directories so they can be prepended. |
| """ |
| dirs: list[str] = [] |
| homebrew_opt = "/opt/homebrew/opt" |
| if not os.path.isdir(homebrew_opt): |
| return tuple(dirs) |
| try: |
| for entry in os.listdir(homebrew_opt): |
| if entry.startswith("node") and entry != "node": |
| bin_dir = os.path.join(homebrew_opt, entry, "bin") |
| if os.path.isdir(bin_dir): |
| dirs.append(bin_dir) |
| except OSError: |
| pass |
| return tuple(dirs) |
|
|
|
|
| def _browser_candidate_path_dirs() -> list[str]: |
| """Return ordered browser CLI PATH candidates shared by discovery and execution.""" |
| hermes_home = get_hermes_home() |
| hermes_node_bin = str(hermes_home / "node" / "bin") |
| return [hermes_node_bin, *list(_discover_homebrew_node_dirs()), *_SANE_PATH_DIRS] |
|
|
|
|
| def _merge_browser_path(existing_path: str = "") -> str: |
| """Prepend browser-specific PATH fallbacks without reordering existing entries.""" |
| path_parts = [p for p in (existing_path or "").split(os.pathsep) if p] |
| existing_parts = set(path_parts) |
| prefix_parts: list[str] = [] |
|
|
| for part in _browser_candidate_path_dirs(): |
| if not part or part in existing_parts or part in prefix_parts: |
| continue |
| if os.path.isdir(part): |
| prefix_parts.append(part) |
|
|
| return os.pathsep.join(prefix_parts + path_parts) |
|
|
| |
| _last_screenshot_cleanup_by_dir: dict[str, float] = {} |
|
|
| |
| |
| |
|
|
| |
| DEFAULT_COMMAND_TIMEOUT = 30 |
|
|
| |
| SNAPSHOT_SUMMARIZE_THRESHOLD = 8000 |
|
|
| |
| _EMPTY_OK_COMMANDS: frozenset = frozenset({"close", "record"}) |
|
|
| _cached_command_timeout: Optional[int] = None |
| _command_timeout_resolved = False |
|
|
|
|
| def _get_command_timeout() -> int: |
| """Return the configured browser command timeout from config.yaml. |
| |
| Reads ``config["browser"]["command_timeout"]`` and falls back to |
| ``DEFAULT_COMMAND_TIMEOUT`` (30s) if unset or unreadable. Result is |
| cached after the first call and cleared by ``cleanup_all_browsers()``. |
| """ |
| global _cached_command_timeout, _command_timeout_resolved |
| if _command_timeout_resolved: |
| return _cached_command_timeout |
|
|
| _command_timeout_resolved = True |
| result = DEFAULT_COMMAND_TIMEOUT |
| try: |
| from hermes_cli.config import read_raw_config |
| cfg = read_raw_config() |
| val = cfg.get("browser", {}).get("command_timeout") |
| if val is not None: |
| result = max(int(val), 5) |
| except Exception as e: |
| logger.debug("Could not read command_timeout from config: %s", e) |
| _cached_command_timeout = result |
| return result |
|
|
|
|
| def _get_vision_model() -> Optional[str]: |
| """Model for browser_vision (screenshot analysis β multimodal).""" |
| return os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None |
|
|
|
|
| def _get_extraction_model() -> Optional[str]: |
| """Model for page snapshot text summarization β same as web_extract.""" |
| return os.getenv("AUXILIARY_WEB_EXTRACT_MODEL", "").strip() or None |
|
|
|
|
| def _resolve_cdp_override(cdp_url: str) -> str: |
| """Normalize a user-supplied CDP endpoint into a concrete connectable URL. |
| |
| Accepts: |
| - full websocket endpoints: ws://host:port/devtools/browser/... |
| - HTTP discovery endpoints: http://host:port or http://host:port/json/version |
| - bare websocket host:port values like ws://host:port |
| |
| For discovery-style endpoints we fetch /json/version and return the |
| webSocketDebuggerUrl so downstream tools always receive a concrete browser |
| websocket instead of an ambiguous host:port URL. |
| """ |
| raw = (cdp_url or "").strip() |
| if not raw: |
| return "" |
|
|
| lowered = raw.lower() |
| if "/devtools/browser/" in lowered: |
| return raw |
|
|
| discovery_url = raw |
| if lowered.startswith(("ws://", "wss://")): |
| if raw.count(":") == 2 and raw.rstrip("/").rsplit(":", 1)[-1].isdigit() and "/" not in raw.split(":", 2)[-1]: |
| discovery_url = ("http://" if lowered.startswith("ws://") else "https://") + raw.split("://", 1)[1] |
| else: |
| return raw |
|
|
| if discovery_url.lower().endswith("/json/version"): |
| version_url = discovery_url |
| else: |
| version_url = discovery_url.rstrip("/") + "/json/version" |
|
|
| try: |
| response = requests.get(version_url, timeout=10) |
| response.raise_for_status() |
| payload = response.json() |
| except Exception as exc: |
| logger.warning("Failed to resolve CDP endpoint %s via %s: %s", raw, version_url, exc) |
| return raw |
|
|
| ws_url = str(payload.get("webSocketDebuggerUrl") or "").strip() |
| if ws_url: |
| logger.info("Resolved CDP endpoint %s -> %s", raw, ws_url) |
| return ws_url |
|
|
| logger.warning("CDP discovery at %s did not return webSocketDebuggerUrl; using raw endpoint", version_url) |
| return raw |
|
|
|
|
| def _get_cdp_override() -> str: |
| """Return a normalized CDP URL override, or empty string. |
| |
| Precedence is: |
| 1. ``BROWSER_CDP_URL`` env var (live override from ``/browser connect``) |
| 2. ``browser.cdp_url`` in config.yaml (persistent config) |
| |
| When either is set, we skip both Browserbase and the local headless |
| launcher and connect directly to the supplied Chrome DevTools Protocol |
| endpoint. |
| """ |
| env_override = os.environ.get("BROWSER_CDP_URL", "").strip() |
| if env_override: |
| return _resolve_cdp_override(env_override) |
|
|
| try: |
| from hermes_cli.config import read_raw_config |
|
|
| cfg = read_raw_config() |
| browser_cfg = cfg.get("browser", {}) |
| if isinstance(browser_cfg, dict): |
| return _resolve_cdp_override(str(browser_cfg.get("cdp_url", "") or "")) |
| except Exception as e: |
| logger.debug("Could not read browser.cdp_url from config: %s", e) |
|
|
| return "" |
|
|
|
|
| def _get_dialog_policy_config() -> Tuple[str, float]: |
| """Read ``browser.dialog_policy`` + ``browser.dialog_timeout_s`` from config. |
| |
| Returns a ``(policy, timeout_s)`` tuple, falling back to the supervisor's |
| defaults when keys are absent or invalid. |
| """ |
| |
| from tools.browser_supervisor import ( |
| DEFAULT_DIALOG_POLICY, |
| DEFAULT_DIALOG_TIMEOUT_S, |
| _VALID_POLICIES, |
| ) |
|
|
| try: |
| from hermes_cli.config import read_raw_config |
|
|
| cfg = read_raw_config() |
| browser_cfg = cfg.get("browser", {}) if isinstance(cfg, dict) else {} |
| if not isinstance(browser_cfg, dict): |
| return DEFAULT_DIALOG_POLICY, DEFAULT_DIALOG_TIMEOUT_S |
| policy = str(browser_cfg.get("dialog_policy") or DEFAULT_DIALOG_POLICY) |
| if policy not in _VALID_POLICIES: |
| logger.debug("Invalid browser.dialog_policy=%r; using default", policy) |
| policy = DEFAULT_DIALOG_POLICY |
| timeout_raw = browser_cfg.get("dialog_timeout_s") |
| try: |
| timeout_s = float(timeout_raw) if timeout_raw is not None else DEFAULT_DIALOG_TIMEOUT_S |
| if timeout_s <= 0: |
| timeout_s = DEFAULT_DIALOG_TIMEOUT_S |
| except (TypeError, ValueError): |
| timeout_s = DEFAULT_DIALOG_TIMEOUT_S |
| return policy, timeout_s |
| except Exception: |
| return DEFAULT_DIALOG_POLICY, DEFAULT_DIALOG_TIMEOUT_S |
|
|
|
|
| def _ensure_cdp_supervisor(task_id: str) -> None: |
| """Start a CDP supervisor for ``task_id`` if an endpoint is reachable. |
| |
| Idempotent β delegates to ``SupervisorRegistry.get_or_start`` which skips |
| when a supervisor for this ``(task_id, cdp_url)`` already exists and |
| tears down + restarts on URL change. Safe to call on every |
| ``browser_navigate`` / ``/browser connect`` without worrying about |
| double-attach. |
| |
| Resolves the CDP URL in this order: |
| 1. ``BROWSER_CDP_URL`` / ``browser.cdp_url`` β covers ``/browser connect`` |
| and config-set overrides. |
| 2. ``_active_sessions[task_id]["cdp_url"]`` β covers Browserbase + any |
| other cloud provider whose ``create_session`` returns a raw CDP URL. |
| |
| Swallows all errors β failing to attach the supervisor must not break |
| the browser session itself. The agent simply won't see |
| ``pending_dialogs`` / ``frame_tree`` fields in snapshots. |
| """ |
| cdp_url = _get_cdp_override() |
| if not cdp_url: |
| |
| |
| with _cleanup_lock: |
| session_info = _active_sessions.get(task_id, {}) |
| maybe = str(session_info.get("cdp_url") or "") |
| if maybe: |
| cdp_url = _resolve_cdp_override(maybe) |
| if not cdp_url: |
| return |
| try: |
| from tools.browser_supervisor import SUPERVISOR_REGISTRY |
|
|
| policy, timeout_s = _get_dialog_policy_config() |
| SUPERVISOR_REGISTRY.get_or_start( |
| task_id=task_id, |
| cdp_url=cdp_url, |
| dialog_policy=policy, |
| dialog_timeout_s=timeout_s, |
| ) |
| except Exception as exc: |
| logger.debug( |
| "CDP supervisor attach for task=%s failed (non-fatal): %s", |
| task_id, |
| exc, |
| ) |
|
|
|
|
| def _stop_cdp_supervisor(task_id: str) -> None: |
| """Stop the CDP supervisor for ``task_id`` if one exists. No-op otherwise.""" |
| try: |
| from tools.browser_supervisor import SUPERVISOR_REGISTRY |
|
|
| SUPERVISOR_REGISTRY.stop(task_id) |
| except Exception as exc: |
| logger.debug("CDP supervisor stop for task=%s failed (non-fatal): %s", task_id, exc) |
|
|
|
|
| |
| |
| |
|
|
| _PROVIDER_REGISTRY: Dict[str, type] = { |
| "browserbase": BrowserbaseProvider, |
| "browser-use": BrowserUseProvider, |
| "firecrawl": FirecrawlProvider, |
| } |
|
|
| _cached_cloud_provider: Optional[CloudBrowserProvider] = None |
| _cloud_provider_resolved = False |
| _allow_private_urls_resolved = False |
| _cached_allow_private_urls: Optional[bool] = None |
| _cached_agent_browser: Optional[str] = None |
| _agent_browser_resolved = False |
|
|
|
|
| def _get_cloud_provider() -> Optional[CloudBrowserProvider]: |
| """Return the configured cloud browser provider, or None for local mode. |
| |
| Reads ``config["browser"]["cloud_provider"]`` once and caches the result |
| for the process lifetime. An explicit ``local`` provider disables cloud |
| fallback. If unset, fall back to Browserbase when direct or managed |
| Browserbase credentials are available. |
| """ |
| global _cached_cloud_provider, _cloud_provider_resolved |
| if _cloud_provider_resolved: |
| return _cached_cloud_provider |
|
|
| _cloud_provider_resolved = True |
| try: |
| from hermes_cli.config import read_raw_config |
| cfg = read_raw_config() |
| browser_cfg = cfg.get("browser", {}) |
| provider_key = None |
| if isinstance(browser_cfg, dict) and "cloud_provider" in browser_cfg: |
| provider_key = normalize_browser_cloud_provider( |
| browser_cfg.get("cloud_provider") |
| ) |
| if provider_key == "local": |
| _cached_cloud_provider = None |
| return None |
| if provider_key and provider_key in _PROVIDER_REGISTRY: |
| _cached_cloud_provider = _PROVIDER_REGISTRY[provider_key]() |
| except Exception as e: |
| logger.debug("Could not read cloud_provider from config: %s", e) |
|
|
| if _cached_cloud_provider is None: |
| |
| |
| fallback_provider = BrowserUseProvider() |
| if fallback_provider.is_configured(): |
| _cached_cloud_provider = fallback_provider |
| else: |
| fallback_provider = BrowserbaseProvider() |
| if fallback_provider.is_configured(): |
| _cached_cloud_provider = fallback_provider |
|
|
| return _cached_cloud_provider |
|
|
|
|
| from hermes_constants import is_termux as _is_termux_environment |
|
|
|
|
| def _browser_install_hint() -> str: |
| if _is_termux_environment(): |
| return "npm install -g agent-browser && agent-browser install" |
| return "npm install -g agent-browser && agent-browser install --with-deps" |
|
|
|
|
| def _requires_real_termux_browser_install(browser_cmd: str) -> bool: |
| return _is_termux_environment() and _is_local_mode() and browser_cmd.strip() == "npx agent-browser" |
|
|
|
|
| def _termux_browser_install_error() -> str: |
| return ( |
| "Local browser automation on Termux cannot rely on the bare npx fallback. " |
| f"Install agent-browser explicitly first: {_browser_install_hint()}" |
| ) |
|
|
|
|
| def _is_local_mode() -> bool: |
| """Return True when the browser tool will use a local browser backend.""" |
| if _get_cdp_override(): |
| return False |
| return _get_cloud_provider() is None |
|
|
|
|
| def _is_local_backend() -> bool: |
| """Return True when the browser runs locally (no cloud provider). |
| |
| SSRF protection is only meaningful for cloud backends (Browserbase, |
| BrowserUse) where the agent could reach internal resources on a remote |
| machine. For local backends β Camofox, or the built-in headless |
| Chromium without a cloud provider β the user already has full terminal |
| and network access on the same machine, so the check adds no security |
| value. |
| """ |
| return _is_camofox_mode() or _get_cloud_provider() is None |
|
|
|
|
| def _allow_private_urls() -> bool: |
| """Return whether the browser is allowed to navigate to private/internal addresses. |
| |
| Reads ``config["browser"]["allow_private_urls"]`` once and caches the result |
| for the process lifetime. Defaults to ``False`` (SSRF protection active). |
| """ |
| global _cached_allow_private_urls, _allow_private_urls_resolved |
| if _allow_private_urls_resolved: |
| return _cached_allow_private_urls |
|
|
| _allow_private_urls_resolved = True |
| _cached_allow_private_urls = False |
| try: |
| from hermes_cli.config import read_raw_config |
| cfg = read_raw_config() |
| _cached_allow_private_urls = bool(cfg.get("browser", {}).get("allow_private_urls")) |
| except Exception as e: |
| logger.debug("Could not read allow_private_urls from config: %s", e) |
| return _cached_allow_private_urls |
|
|
|
|
| def _socket_safe_tmpdir() -> str: |
| """Return a short temp directory path suitable for Unix domain sockets. |
| |
| macOS sets ``TMPDIR`` to ``/var/folders/xx/.../T/`` (~51 chars). When we |
| append ``agent-browser-hermes_β¦`` the resulting socket path exceeds the |
| 104-byte macOS limit for ``AF_UNIX`` addresses, causing agent-browser to |
| fail with "Failed to create socket directory" or silent screenshot failures. |
| |
| Linux ``tempfile.gettempdir()`` already returns ``/tmp``, so this is a |
| no-op there. On macOS we bypass ``TMPDIR`` and use ``/tmp`` directly |
| (symlink to ``/private/tmp``, sticky-bit protected, always available). |
| """ |
| if sys.platform == "darwin": |
| return "/tmp" |
| return tempfile.gettempdir() |
|
|
|
|
| |
| |
| _active_sessions: Dict[str, Dict[str, str]] = {} |
| _recording_sessions: set = set() |
|
|
| |
| _cleanup_done = False |
|
|
| |
| |
| |
|
|
| |
| |
| |
| BROWSER_SESSION_INACTIVITY_TIMEOUT = int(os.environ.get("BROWSER_INACTIVITY_TIMEOUT", "300")) |
|
|
| |
| _session_last_activity: Dict[str, float] = {} |
|
|
| |
| _cleanup_thread = None |
| _cleanup_running = False |
| |
| |
| _cleanup_lock = threading.Lock() |
|
|
|
|
| def _emergency_cleanup_all_sessions(): |
| """ |
| Emergency cleanup of all active browser sessions. |
| Called on process exit or interrupt to prevent orphaned sessions. |
| |
| Also runs the orphan reaper to clean up daemons left behind by previously |
| crashed hermes processes β this way every clean hermes exit sweeps |
| accumulated orphans, not just ones that actively used the browser tool. |
| """ |
| global _cleanup_done |
| if _cleanup_done: |
| return |
| _cleanup_done = True |
|
|
| |
| |
| if _active_sessions: |
| logger.info("Emergency cleanup: closing %s active session(s)...", |
| len(_active_sessions)) |
| try: |
| cleanup_all_browsers() |
| except Exception as e: |
| logger.error("Emergency cleanup error: %s", e) |
| finally: |
| with _cleanup_lock: |
| _active_sessions.clear() |
| _session_last_activity.clear() |
| _recording_sessions.clear() |
|
|
| |
| |
| |
| try: |
| _reap_orphaned_browser_sessions() |
| except Exception as e: |
| logger.debug("Orphan reap on exit failed: %s", e) |
|
|
|
|
| |
| |
| |
| |
| |
| |
| atexit.register(_emergency_cleanup_all_sessions) |
|
|
|
|
| |
| |
| |
|
|
| def _cleanup_inactive_browser_sessions(): |
| """ |
| Clean up browser sessions that have been inactive for longer than the timeout. |
| |
| This function is called periodically by the background cleanup thread to |
| automatically close sessions that haven't been used recently, preventing |
| orphaned sessions (local or Browserbase) from accumulating. |
| """ |
| current_time = time.time() |
| sessions_to_cleanup = [] |
| |
| with _cleanup_lock: |
| for task_id, last_time in list(_session_last_activity.items()): |
| if current_time - last_time > BROWSER_SESSION_INACTIVITY_TIMEOUT: |
| sessions_to_cleanup.append(task_id) |
| |
| for task_id in sessions_to_cleanup: |
| try: |
| elapsed = int(current_time - _session_last_activity.get(task_id, current_time)) |
| logger.info("Cleaning up inactive session for task: %s (inactive for %ss)", task_id, elapsed) |
| cleanup_browser(task_id) |
| with _cleanup_lock: |
| if task_id in _session_last_activity: |
| del _session_last_activity[task_id] |
| except Exception as e: |
| logger.warning("Error cleaning up inactive session %s: %s", task_id, e) |
|
|
|
|
| def _write_owner_pid(socket_dir: str, session_name: str) -> None: |
| """Record the current hermes PID as the owner of a browser socket dir. |
| |
| Written atomically to ``<socket_dir>/<session_name>.owner_pid`` so the |
| orphan reaper can distinguish daemons owned by a live hermes process |
| (don't reap) from daemons whose owner crashed (reap). Best-effort β |
| an OSError here just falls back to the legacy ``tracked_names`` |
| heuristic in the reaper. |
| """ |
| try: |
| path = os.path.join(socket_dir, f"{session_name}.owner_pid") |
| with open(path, "w") as f: |
| f.write(str(os.getpid())) |
| except OSError as exc: |
| logger.debug("Could not write owner_pid file for %s: %s", |
| session_name, exc) |
|
|
|
|
| def _reap_orphaned_browser_sessions(): |
| """Scan for orphaned agent-browser daemon processes from previous runs. |
| |
| When the Python process that created a browser session exits uncleanly |
| (SIGKILL, crash, gateway restart), the in-memory ``_active_sessions`` |
| tracking is lost but the node + Chromium processes keep running. |
| |
| This function scans the tmp directory for ``agent-browser-*`` socket dirs |
| left behind by previous runs, reads the daemon PID files, and kills any |
| daemons whose owning hermes process is no longer alive. |
| |
| Ownership detection priority: |
| 1. ``<session>.owner_pid`` file (written by current code) β if the |
| referenced hermes PID is alive, leave the daemon alone regardless |
| of whether it's in *this* process's ``_active_sessions``. This is |
| cross-process safe: two concurrent hermes instances won't reap each |
| other's daemons. |
| 2. Fallback for daemons that predate owner_pid: check |
| ``_active_sessions`` in the current process. If not tracked here, |
| treat as orphan (legacy behavior). |
| |
| Safe to call from any context β atexit, cleanup thread, or on demand. |
| """ |
| import glob |
|
|
| tmpdir = _socket_safe_tmpdir() |
| pattern = os.path.join(tmpdir, "agent-browser-h_*") |
| socket_dirs = glob.glob(pattern) |
| |
| socket_dirs += glob.glob(os.path.join(tmpdir, "agent-browser-cdp_*")) |
| |
| socket_dirs += glob.glob(os.path.join(tmpdir, "agent-browser-hermes_*")) |
|
|
| if not socket_dirs: |
| return |
|
|
| |
| with _cleanup_lock: |
| tracked_names = { |
| info.get("session_name") |
| for info in _active_sessions.values() |
| if info.get("session_name") |
| } |
|
|
| reaped = 0 |
| for socket_dir in socket_dirs: |
| dir_name = os.path.basename(socket_dir) |
| |
| session_name = dir_name.removeprefix("agent-browser-") |
| if not session_name: |
| continue |
|
|
| |
| owner_pid_file = os.path.join(socket_dir, f"{session_name}.owner_pid") |
| owner_alive: Optional[bool] = None |
| if os.path.isfile(owner_pid_file): |
| try: |
| owner_pid = int(Path(owner_pid_file).read_text().strip()) |
| try: |
| os.kill(owner_pid, 0) |
| owner_alive = True |
| except ProcessLookupError: |
| owner_alive = False |
| except PermissionError: |
| |
| |
| owner_alive = True |
| except (ValueError, OSError): |
| owner_alive = None |
|
|
| if owner_alive is True: |
| |
| continue |
|
|
| if owner_alive is None: |
| |
| |
| if session_name in tracked_names: |
| continue |
|
|
| |
| pid_file = os.path.join(socket_dir, f"{session_name}.pid") |
| if not os.path.isfile(pid_file): |
| |
| shutil.rmtree(socket_dir, ignore_errors=True) |
| continue |
|
|
| try: |
| daemon_pid = int(Path(pid_file).read_text().strip()) |
| except (ValueError, OSError): |
| shutil.rmtree(socket_dir, ignore_errors=True) |
| continue |
|
|
| |
| try: |
| os.kill(daemon_pid, 0) |
| except ProcessLookupError: |
| |
| shutil.rmtree(socket_dir, ignore_errors=True) |
| continue |
| except PermissionError: |
| |
| continue |
|
|
| |
| try: |
| os.kill(daemon_pid, signal.SIGTERM) |
| logger.info("Reaped orphaned browser daemon PID %d (session %s)", |
| daemon_pid, session_name) |
| reaped += 1 |
| except (ProcessLookupError, PermissionError, OSError): |
| pass |
|
|
| |
| shutil.rmtree(socket_dir, ignore_errors=True) |
|
|
| if reaped: |
| logger.info("Reaped %d orphaned browser session(s) from previous run(s)", reaped) |
|
|
|
|
| def _browser_cleanup_thread_worker(): |
| """ |
| Background thread that periodically cleans up inactive browser sessions. |
| |
| Runs every 30 seconds and checks for sessions that haven't been used |
| within the BROWSER_SESSION_INACTIVITY_TIMEOUT period. |
| On first run, also reaps orphaned sessions from previous process lifetimes. |
| """ |
| |
| try: |
| _reap_orphaned_browser_sessions() |
| except Exception as e: |
| logger.warning("Orphan reap error: %s", e) |
|
|
| while _cleanup_running: |
| try: |
| _cleanup_inactive_browser_sessions() |
| except Exception as e: |
| logger.warning("Cleanup thread error: %s", e) |
| |
| |
| for _ in range(30): |
| if not _cleanup_running: |
| break |
| time.sleep(1) |
|
|
|
|
| def _start_browser_cleanup_thread(): |
| """Start the background cleanup thread if not already running.""" |
| global _cleanup_thread, _cleanup_running |
| |
| with _cleanup_lock: |
| if _cleanup_thread is None or not _cleanup_thread.is_alive(): |
| _cleanup_running = True |
| _cleanup_thread = threading.Thread( |
| target=_browser_cleanup_thread_worker, |
| daemon=True, |
| name="browser-cleanup" |
| ) |
| _cleanup_thread.start() |
| logger.info("Started inactivity cleanup thread (timeout: %ss)", BROWSER_SESSION_INACTIVITY_TIMEOUT) |
|
|
|
|
| def _stop_browser_cleanup_thread(): |
| """Stop the background cleanup thread.""" |
| global _cleanup_running |
| _cleanup_running = False |
| if _cleanup_thread is not None: |
| _cleanup_thread.join(timeout=5) |
|
|
|
|
| def _update_session_activity(task_id: str): |
| """Update the last activity timestamp for a session.""" |
| with _cleanup_lock: |
| _session_last_activity[task_id] = time.time() |
|
|
|
|
| |
| atexit.register(_stop_browser_cleanup_thread) |
|
|
|
|
| |
| |
| |
|
|
| BROWSER_TOOL_SCHEMAS = [ |
| { |
| "name": "browser_navigate", |
| "description": "Navigate to a URL in the browser. Initializes the session and loads the page. Must be called before other browser tools. For simple information retrieval, prefer web_search or web_extract (faster, cheaper). Use browser tools when you need to interact with a page (click, fill forms, dynamic content). Returns a compact page snapshot with interactive elements and ref IDs β no need to call browser_snapshot separately after navigating.", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "url": { |
| "type": "string", |
| "description": "The URL to navigate to (e.g., 'https://example.com')" |
| } |
| }, |
| "required": ["url"] |
| } |
| }, |
| { |
| "name": "browser_snapshot", |
| "description": "Get a text-based snapshot of the current page's accessibility tree. Returns interactive elements with ref IDs (like @e1, @e2) for browser_click and browser_type. full=false (default): compact view with interactive elements. full=true: complete page content. Snapshots over 8000 chars are truncated or LLM-summarized. Requires browser_navigate first. Note: browser_navigate already returns a compact snapshot β use this to refresh after interactions that change the page, or with full=true for complete content.", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "full": { |
| "type": "boolean", |
| "description": "If true, returns complete page content. If false (default), returns compact view with interactive elements only.", |
| "default": False |
| } |
| }, |
| "required": [] |
| } |
| }, |
| { |
| "name": "browser_click", |
| "description": "Click on an element identified by its ref ID from the snapshot (e.g., '@e5'). The ref IDs are shown in square brackets in the snapshot output. Requires browser_navigate and browser_snapshot to be called first.", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "ref": { |
| "type": "string", |
| "description": "The element reference from the snapshot (e.g., '@e5', '@e12')" |
| } |
| }, |
| "required": ["ref"] |
| } |
| }, |
| { |
| "name": "browser_type", |
| "description": "Type text into an input field identified by its ref ID. Clears the field first, then types the new text. Requires browser_navigate and browser_snapshot to be called first.", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "ref": { |
| "type": "string", |
| "description": "The element reference from the snapshot (e.g., '@e3')" |
| }, |
| "text": { |
| "type": "string", |
| "description": "The text to type into the field" |
| } |
| }, |
| "required": ["ref", "text"] |
| } |
| }, |
| { |
| "name": "browser_scroll", |
| "description": "Scroll the page in a direction. Use this to reveal more content that may be below or above the current viewport. Requires browser_navigate to be called first.", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "direction": { |
| "type": "string", |
| "enum": ["up", "down"], |
| "description": "Direction to scroll" |
| } |
| }, |
| "required": ["direction"] |
| } |
| }, |
| { |
| "name": "browser_back", |
| "description": "Navigate back to the previous page in browser history. Requires browser_navigate to be called first.", |
| "parameters": { |
| "type": "object", |
| "properties": {}, |
| "required": [] |
| } |
| }, |
| { |
| "name": "browser_press", |
| "description": "Press a keyboard key. Useful for submitting forms (Enter), navigating (Tab), or keyboard shortcuts. Requires browser_navigate to be called first.", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "key": { |
| "type": "string", |
| "description": "Key to press (e.g., 'Enter', 'Tab', 'Escape', 'ArrowDown')" |
| } |
| }, |
| "required": ["key"] |
| } |
| }, |
| { |
| "name": "browser_get_images", |
| "description": "Get a list of all images on the current page with their URLs and alt text. Useful for finding images to analyze with the vision tool. Requires browser_navigate to be called first.", |
| "parameters": { |
| "type": "object", |
| "properties": {}, |
| "required": [] |
| } |
| }, |
| { |
| "name": "browser_vision", |
| "description": "Take a screenshot of the current page and analyze it with vision AI. Use this when you need to visually understand what's on the page - especially useful for CAPTCHAs, visual verification challenges, complex layouts, or when the text snapshot doesn't capture important visual information. Returns both the AI analysis and a screenshot_path that you can share with the user by including MEDIA:<screenshot_path> in your response. Requires browser_navigate to be called first.", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "question": { |
| "type": "string", |
| "description": "What you want to know about the page visually. Be specific about what you're looking for." |
| }, |
| "annotate": { |
| "type": "boolean", |
| "default": False, |
| "description": "If true, overlay numbered [N] labels on interactive elements. Each [N] maps to ref @eN for subsequent browser commands. Useful for QA and spatial reasoning about page layout." |
| } |
| }, |
| "required": ["question"] |
| } |
| }, |
| { |
| "name": "browser_console", |
| "description": "Get browser console output and JavaScript errors from the current page. Returns console.log/warn/error/info messages and uncaught JS exceptions. Use this to detect silent JavaScript errors, failed API calls, and application warnings. Requires browser_navigate to be called first. When 'expression' is provided, evaluates JavaScript in the page context and returns the result β use this for DOM inspection, reading page state, or extracting data programmatically.", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "clear": { |
| "type": "boolean", |
| "default": False, |
| "description": "If true, clear the message buffers after reading" |
| }, |
| "expression": { |
| "type": "string", |
| "description": "JavaScript expression to evaluate in the page context. Runs in the browser like DevTools console β full access to DOM, window, document. Return values are serialized to JSON. Example: 'document.title' or 'document.querySelectorAll(\"a\").length'" |
| } |
| }, |
| "required": [] |
| } |
| }, |
| ] |
|
|
|
|
| |
| |
| |
|
|
| def _create_local_session(task_id: str) -> Dict[str, str]: |
| import uuid |
| session_name = f"h_{uuid.uuid4().hex[:10]}" |
| logger.info("Created local browser session %s for task %s", |
| session_name, task_id) |
| return { |
| "session_name": session_name, |
| "bb_session_id": None, |
| "cdp_url": None, |
| "features": {"local": True}, |
| } |
|
|
|
|
| def _create_cdp_session(task_id: str, cdp_url: str) -> Dict[str, str]: |
| """Create a session that connects to a user-supplied CDP endpoint.""" |
| import uuid |
| session_name = f"cdp_{uuid.uuid4().hex[:10]}" |
| logger.info("Created CDP browser session %s β %s for task %s", |
| session_name, cdp_url, task_id) |
| return { |
| "session_name": session_name, |
| "bb_session_id": None, |
| "cdp_url": cdp_url, |
| "features": {"cdp_override": True}, |
| } |
|
|
|
|
| def _get_session_info(task_id: Optional[str] = None) -> Dict[str, str]: |
| """ |
| Get or create session info for the given task. |
| |
| In cloud mode, creates a Browserbase session with proxies enabled. |
| In local mode, generates a session name for agent-browser --session. |
| Also starts the inactivity cleanup thread and updates activity tracking. |
| Thread-safe: multiple subagents can call this concurrently. |
| |
| Args: |
| task_id: Unique identifier for the task |
| |
| Returns: |
| Dict with session_name (always), bb_session_id + cdp_url (cloud only) |
| """ |
| if task_id is None: |
| task_id = "default" |
| |
| |
| _start_browser_cleanup_thread() |
| |
| |
| _update_session_activity(task_id) |
| |
| with _cleanup_lock: |
| |
| if task_id in _active_sessions: |
| return _active_sessions[task_id] |
| |
| |
| cdp_override = _get_cdp_override() |
| if cdp_override: |
| session_info = _create_cdp_session(task_id, cdp_override) |
| else: |
| provider = _get_cloud_provider() |
| if provider is None: |
| session_info = _create_local_session(task_id) |
| else: |
| try: |
| session_info = provider.create_session(task_id) |
| |
| if not session_info or not isinstance(session_info, dict): |
| raise ValueError(f"Cloud provider returned invalid session: {session_info!r}") |
| if session_info.get("cdp_url"): |
| |
| |
| session_info = dict(session_info) |
| session_info["cdp_url"] = _resolve_cdp_override(str(session_info["cdp_url"])) |
| except Exception as e: |
| provider_name = type(provider).__name__ |
| logger.warning( |
| "Cloud provider %s failed (%s); attempting fallback to local " |
| "Chromium for task %s", |
| provider_name, e, task_id, |
| exc_info=True, |
| ) |
| try: |
| session_info = _create_local_session(task_id) |
| except Exception as local_error: |
| raise RuntimeError( |
| f"Cloud provider {provider_name} failed ({e}) and local " |
| f"fallback also failed ({local_error})" |
| ) from e |
| |
| if isinstance(session_info, dict): |
| session_info = dict(session_info) |
| session_info["fallback_from_cloud"] = True |
| session_info["fallback_reason"] = str(e) |
| session_info["fallback_provider"] = provider_name |
| |
| with _cleanup_lock: |
| |
| |
| |
| if task_id in _active_sessions: |
| return _active_sessions[task_id] |
| _active_sessions[task_id] = session_info |
|
|
| |
| |
| |
| _ensure_cdp_supervisor(task_id) |
|
|
| return session_info |
|
|
|
|
|
|
| def _find_agent_browser() -> str: |
| """ |
| Find the agent-browser CLI executable. |
| |
| Checks in order: current PATH, Homebrew/common bin dirs, Hermes-managed |
| node, local node_modules/.bin/, npx fallback. |
| |
| Returns: |
| Path to agent-browser executable |
| |
| Raises: |
| FileNotFoundError: If agent-browser is not installed |
| """ |
| global _cached_agent_browser, _agent_browser_resolved |
| if _agent_browser_resolved: |
| if _cached_agent_browser is None: |
| raise FileNotFoundError( |
| "agent-browser CLI not found (cached). Install it with: " |
| f"{_browser_install_hint()}\n" |
| "Or run 'npm install' in the repo root to install locally.\n" |
| "Or ensure npx is available in your PATH." |
| ) |
| return _cached_agent_browser |
|
|
| |
| |
| |
|
|
| |
| which_result = shutil.which("agent-browser") |
| if which_result: |
| _cached_agent_browser = which_result |
| _agent_browser_resolved = True |
| return which_result |
|
|
| |
| |
| extended_path = _merge_browser_path("") |
| if extended_path: |
| which_result = shutil.which("agent-browser", path=extended_path) |
| if which_result: |
| _cached_agent_browser = which_result |
| _agent_browser_resolved = True |
| return which_result |
|
|
| |
| repo_root = Path(__file__).parent.parent |
| local_bin = repo_root / "node_modules" / ".bin" / "agent-browser" |
| if local_bin.exists(): |
| _cached_agent_browser = str(local_bin) |
| _agent_browser_resolved = True |
| return _cached_agent_browser |
| |
| |
| npx_path = shutil.which("npx") |
| if not npx_path and extended_path: |
| npx_path = shutil.which("npx", path=extended_path) |
| if npx_path: |
| _cached_agent_browser = "npx agent-browser" |
| _agent_browser_resolved = True |
| return _cached_agent_browser |
| |
| |
| _agent_browser_resolved = True |
| raise FileNotFoundError( |
| "agent-browser CLI not found. Install it with: " |
| f"{_browser_install_hint()}\n" |
| "Or run 'npm install' in the repo root to install locally.\n" |
| "Or ensure npx is available in your PATH." |
| ) |
|
|
|
|
| def _extract_screenshot_path_from_text(text: str) -> Optional[str]: |
| """Extract a screenshot file path from agent-browser human-readable output.""" |
| if not text: |
| return None |
|
|
| patterns = [ |
| r"Screenshot saved to ['\"](?P<path>/[^'\"]+?\.png)['\"]", |
| r"Screenshot saved to (?P<path>/\S+?\.png)(?:\s|$)", |
| r"(?P<path>/\S+?\.png)(?:\s|$)", |
| ] |
|
|
| for pattern in patterns: |
| match = re.search(pattern, text) |
| if match: |
| path = match.group("path").strip().strip("'\"") |
| if path: |
| return path |
|
|
| return None |
|
|
|
|
| def _run_browser_command( |
| task_id: str, |
| command: str, |
| args: List[str] = None, |
| timeout: Optional[int] = None, |
| ) -> Dict[str, Any]: |
| """ |
| Run an agent-browser CLI command using our pre-created Browserbase session. |
| |
| Args: |
| task_id: Task identifier to get the right session |
| command: The command to run (e.g., "open", "click") |
| args: Additional arguments for the command |
| timeout: Command timeout in seconds. ``None`` reads |
| ``browser.command_timeout`` from config (default 30s). |
| |
| Returns: |
| Parsed JSON response from agent-browser |
| """ |
| if timeout is None: |
| timeout = _get_command_timeout() |
| args = args or [] |
| |
| |
| try: |
| browser_cmd = _find_agent_browser() |
| except FileNotFoundError as e: |
| logger.warning("agent-browser CLI not found: %s", e) |
| return {"success": False, "error": str(e)} |
|
|
| if _requires_real_termux_browser_install(browser_cmd): |
| error = _termux_browser_install_error() |
| logger.warning("browser command blocked on Termux: %s", error) |
| return {"success": False, "error": error} |
| |
| from tools.interrupt import is_interrupted |
| if is_interrupted(): |
| return {"success": False, "error": "Interrupted"} |
|
|
| |
| try: |
| session_info = _get_session_info(task_id) |
| except Exception as e: |
| logger.warning("Failed to create browser session for task=%s: %s", task_id, e) |
| return {"success": False, "error": f"Failed to create browser session: {str(e)}"} |
| |
| |
| |
| |
| |
| if session_info.get("cdp_url"): |
| |
| |
| |
| backend_args = ["--cdp", session_info["cdp_url"]] |
| else: |
| |
| backend_args = ["--session", session_info["session_name"]] |
|
|
| |
| |
| cmd_prefix = ["npx", "agent-browser"] if browser_cmd == "npx agent-browser" else [browser_cmd] |
|
|
| cmd_parts = cmd_prefix + backend_args + [ |
| "--json", |
| command |
| ] + args |
| |
| try: |
| |
| |
| |
| task_socket_dir = os.path.join( |
| _socket_safe_tmpdir(), |
| f"agent-browser-{session_info['session_name']}" |
| ) |
| os.makedirs(task_socket_dir, mode=0o700, exist_ok=True) |
| |
| |
| _write_owner_pid(task_socket_dir, session_info['session_name']) |
| logger.debug("browser cmd=%s task=%s socket_dir=%s (%d chars)", |
| command, task_id, task_socket_dir, len(task_socket_dir)) |
| |
| browser_env = {**os.environ} |
|
|
| |
| |
| browser_env["PATH"] = _merge_browser_path(browser_env.get("PATH", "")) |
| browser_env["AGENT_BROWSER_SOCKET_DIR"] = task_socket_dir |
|
|
| |
| |
| |
| |
| |
| if "AGENT_BROWSER_IDLE_TIMEOUT_MS" not in browser_env: |
| idle_ms = str(BROWSER_SESSION_INACTIVITY_TIMEOUT * 1000) |
| browser_env["AGENT_BROWSER_IDLE_TIMEOUT_MS"] = idle_ms |
| |
| |
| |
| |
| |
| |
| stdout_path = os.path.join(task_socket_dir, f"_stdout_{command}") |
| stderr_path = os.path.join(task_socket_dir, f"_stderr_{command}") |
| stdout_fd = os.open(stdout_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) |
| stderr_fd = os.open(stderr_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) |
| try: |
| proc = subprocess.Popen( |
| cmd_parts, |
| stdout=stdout_fd, |
| stderr=stderr_fd, |
| stdin=subprocess.DEVNULL, |
| env=browser_env, |
| ) |
| finally: |
| os.close(stdout_fd) |
| os.close(stderr_fd) |
|
|
| try: |
| proc.wait(timeout=timeout) |
| except subprocess.TimeoutExpired: |
| proc.kill() |
| proc.wait() |
| logger.warning("browser '%s' timed out after %ds (task=%s, socket_dir=%s)", |
| command, timeout, task_id, task_socket_dir) |
| return {"success": False, "error": f"Command timed out after {timeout} seconds"} |
|
|
| with open(stdout_path, "r") as f: |
| stdout = f.read() |
| with open(stderr_path, "r") as f: |
| stderr = f.read() |
| returncode = proc.returncode |
|
|
| |
| for p in (stdout_path, stderr_path): |
| try: |
| os.unlink(p) |
| except OSError: |
| pass |
|
|
| |
| if stderr and stderr.strip(): |
| level = logging.WARNING if returncode != 0 else logging.DEBUG |
| logger.log(level, "browser '%s' stderr: %s", command, stderr.strip()[:500]) |
| |
| stdout_text = stdout.strip() |
|
|
| |
| |
| |
| if not stdout_text and returncode == 0 and command not in _EMPTY_OK_COMMANDS: |
| logger.warning("browser '%s' returned empty output (rc=0)", command) |
| return {"success": False, "error": f"Browser command '{command}' returned no output"} |
|
|
| if stdout_text: |
| try: |
| parsed = json.loads(stdout_text) |
| |
| if command == "snapshot" and parsed.get("success"): |
| snap_data = parsed.get("data", {}) |
| if not snap_data.get("snapshot") and not snap_data.get("refs"): |
| logger.warning("snapshot returned empty content. " |
| "Possible stale daemon or CDP connection issue. " |
| "returncode=%s", returncode) |
| return parsed |
| except json.JSONDecodeError: |
| raw = stdout_text[:2000] |
| logger.warning("browser '%s' returned non-JSON output (rc=%s): %s", |
| command, returncode, raw[:500]) |
|
|
| if command == "screenshot": |
| stderr_text = (stderr or "").strip() |
| combined_text = "\n".join( |
| part for part in [stdout_text, stderr_text] if part |
| ) |
| recovered_path = _extract_screenshot_path_from_text(combined_text) |
|
|
| if recovered_path and Path(recovered_path).exists(): |
| logger.info( |
| "browser 'screenshot' recovered file from non-JSON output: %s", |
| recovered_path, |
| ) |
| return { |
| "success": True, |
| "data": { |
| "path": recovered_path, |
| "raw": raw, |
| }, |
| } |
|
|
| return { |
| "success": False, |
| "error": f"Non-JSON output from agent-browser for '{command}': {raw}" |
| } |
| |
| |
| if returncode != 0: |
| error_msg = stderr.strip() if stderr else f"Command failed with code {returncode}" |
| logger.warning("browser '%s' failed (rc=%s): %s", command, returncode, error_msg[:300]) |
| return {"success": False, "error": error_msg} |
| |
| return {"success": True, "data": {}} |
| |
| except Exception as e: |
| logger.warning("browser '%s' exception: %s", command, e, exc_info=True) |
| return {"success": False, "error": str(e)} |
|
|
|
|
| def _extract_relevant_content( |
| snapshot_text: str, |
| user_task: Optional[str] = None |
| ) -> str: |
| """Use LLM to extract relevant content from a snapshot based on the user's task. |
| |
| Falls back to simple truncation when no auxiliary text model is configured. |
| """ |
| if user_task: |
| extraction_prompt = ( |
| f"You are a content extractor for a browser automation agent.\n\n" |
| f"The user's task is: {user_task}\n\n" |
| f"Given the following page snapshot (accessibility tree representation), " |
| f"extract and summarize the most relevant information for completing this task. Focus on:\n" |
| f"1. Interactive elements (buttons, links, inputs) that might be needed\n" |
| f"2. Text content relevant to the task (prices, descriptions, headings, important info)\n" |
| f"3. Navigation structure if relevant\n\n" |
| f"Keep ref IDs (like [ref=e5]) for interactive elements so the agent can use them.\n\n" |
| f"Page Snapshot:\n{snapshot_text}\n\n" |
| f"Provide a concise summary that preserves actionable information and relevant content." |
| ) |
| else: |
| extraction_prompt = ( |
| f"Summarize this page snapshot, preserving:\n" |
| f"1. All interactive elements with their ref IDs (like [ref=e5])\n" |
| f"2. Key text content and headings\n" |
| f"3. Important information visible on the page\n\n" |
| f"Page Snapshot:\n{snapshot_text}\n\n" |
| f"Provide a concise summary focused on interactive elements and key content." |
| ) |
|
|
| |
| |
| |
| |
| from agent.redact import redact_sensitive_text |
| extraction_prompt = redact_sensitive_text(extraction_prompt) |
|
|
| try: |
| call_kwargs = { |
| "task": "web_extract", |
| "messages": [{"role": "user", "content": extraction_prompt}], |
| "max_tokens": 4000, |
| "temperature": 0.1, |
| } |
| model = _get_extraction_model() |
| if model: |
| call_kwargs["model"] = model |
| response = call_llm(**call_kwargs) |
| extracted = (response.choices[0].message.content or "").strip() or _truncate_snapshot(snapshot_text) |
| |
| return redact_sensitive_text(extracted) |
| except Exception: |
| return _truncate_snapshot(snapshot_text) |
|
|
|
|
| def _truncate_snapshot(snapshot_text: str, max_chars: int = 8000) -> str: |
| """Structure-aware truncation for snapshots. |
| |
| Cuts at line boundaries so that accessibility tree elements are never |
| split mid-line, and appends a note telling the agent how much was |
| omitted. |
| |
| Args: |
| snapshot_text: The snapshot text to truncate |
| max_chars: Maximum characters to keep |
| |
| Returns: |
| Truncated text with indicator if truncated |
| """ |
| if len(snapshot_text) <= max_chars: |
| return snapshot_text |
|
|
| lines = snapshot_text.split('\n') |
| result: list[str] = [] |
| chars = 0 |
| for line in lines: |
| if chars + len(line) + 1 > max_chars - 80: |
| break |
| result.append(line) |
| chars += len(line) + 1 |
| remaining = len(lines) - len(result) |
| if remaining > 0: |
| result.append(f'\n[... {remaining} more lines truncated, use browser_snapshot for full content]') |
| return '\n'.join(result) |
|
|
|
|
| |
| |
| |
|
|
| def browser_navigate(url: str, task_id: Optional[str] = None) -> str: |
| """ |
| Navigate to a URL in the browser. |
| |
| Args: |
| url: The URL to navigate to |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with navigation result (includes stealth features info on first nav) |
| """ |
| |
| |
| |
| |
| import urllib.parse |
| from agent.redact import _PREFIX_RE |
| url_decoded = urllib.parse.unquote(url) |
| if _PREFIX_RE.search(url) or _PREFIX_RE.search(url_decoded): |
| return json.dumps({ |
| "success": False, |
| "error": "Blocked: URL contains what appears to be an API key or token. " |
| "Secrets must not be sent in URLs.", |
| }) |
|
|
| |
| |
| |
| |
| |
| if not _is_local_backend() and not _allow_private_urls() and not _is_safe_url(url): |
| return json.dumps({ |
| "success": False, |
| "error": "Blocked: URL targets a private or internal address", |
| }) |
|
|
| |
| blocked = check_website_access(url) |
| if blocked: |
| return json.dumps({ |
| "success": False, |
| "error": blocked["message"], |
| "blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]}, |
| }) |
|
|
| |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_navigate |
| return camofox_navigate(url, task_id) |
|
|
| effective_task_id = task_id or "default" |
| |
| |
| |
| session_info = _get_session_info(effective_task_id) |
| is_first_nav = session_info.get("_first_nav", True) |
| |
| |
| if is_first_nav: |
| session_info["_first_nav"] = False |
| _maybe_start_recording(effective_task_id) |
|
|
| result = _run_browser_command(effective_task_id, "open", [url], timeout=max(_get_command_timeout(), 60)) |
| |
| if result.get("success"): |
| data = result.get("data", {}) |
| title = data.get("title", "") |
| final_url = data.get("url", url) |
|
|
| |
| |
| |
| |
| if not _is_local_backend() and not _allow_private_urls() and final_url and final_url != url and not _is_safe_url(final_url): |
| |
| _run_browser_command(effective_task_id, "open", ["about:blank"], timeout=10) |
| return json.dumps({ |
| "success": False, |
| "error": "Blocked: redirect landed on a private/internal address", |
| }) |
|
|
| response = { |
| "success": True, |
| "url": final_url, |
| "title": title |
| } |
| |
| |
| blocked_patterns = [ |
| "access denied", "access to this page has been denied", |
| "blocked", "bot detected", "verification required", |
| "please verify", "are you a robot", "captcha", |
| "cloudflare", "ddos protection", "checking your browser", |
| "just a moment", "attention required" |
| ] |
| title_lower = title.lower() |
| |
| if any(pattern in title_lower for pattern in blocked_patterns): |
| response["bot_detection_warning"] = ( |
| f"Page title '{title}' suggests bot detection. The site may have blocked this request. " |
| "Options: 1) Try adding delays between actions, 2) Access different pages first, " |
| "3) Enable advanced stealth (BROWSERBASE_ADVANCED_STEALTH=true, requires Scale plan), " |
| "4) Some sites have very aggressive bot detection that may be unavoidable." |
| ) |
| |
| |
| if is_first_nav and "features" in session_info: |
| features = session_info["features"] |
| active_features = [k for k, v in features.items() if v] |
| if not features.get("proxies"): |
| response["stealth_warning"] = ( |
| "Running WITHOUT residential proxies. Bot detection may be more aggressive. " |
| "Consider upgrading Browserbase plan for proxy support." |
| ) |
| response["stealth_features"] = active_features |
|
|
| |
| |
| try: |
| snap_result = _run_browser_command(effective_task_id, "snapshot", ["-c"]) |
| if snap_result.get("success"): |
| snap_data = snap_result.get("data", {}) |
| snapshot_text = snap_data.get("snapshot", "") |
| refs = snap_data.get("refs", {}) |
| if len(snapshot_text) > SNAPSHOT_SUMMARIZE_THRESHOLD: |
| snapshot_text = _truncate_snapshot(snapshot_text) |
| response["snapshot"] = snapshot_text |
| response["element_count"] = len(refs) if refs else 0 |
| except Exception as e: |
| logger.debug("Auto-snapshot after navigate failed: %s", e) |
|
|
| return json.dumps(response, ensure_ascii=False) |
| else: |
| return json.dumps({ |
| "success": False, |
| "error": result.get("error", "Navigation failed") |
| }, ensure_ascii=False) |
|
|
|
|
| def browser_snapshot( |
| full: bool = False, |
| task_id: Optional[str] = None, |
| user_task: Optional[str] = None |
| ) -> str: |
| """ |
| Get a text-based snapshot of the current page's accessibility tree. |
| |
| Args: |
| full: If True, return complete snapshot. If False, return compact view. |
| task_id: Task identifier for session isolation |
| user_task: The user's current task (for task-aware extraction) |
| |
| Returns: |
| JSON string with page snapshot |
| """ |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_snapshot |
| return camofox_snapshot(full, task_id, user_task) |
|
|
| effective_task_id = task_id or "default" |
| |
| |
| args = [] |
| if not full: |
| args.extend(["-c"]) |
| |
| result = _run_browser_command(effective_task_id, "snapshot", args) |
| |
| if result.get("success"): |
| data = result.get("data", {}) |
| snapshot_text = data.get("snapshot", "") |
| refs = data.get("refs", {}) |
| |
| |
| if len(snapshot_text) > SNAPSHOT_SUMMARIZE_THRESHOLD and user_task: |
| snapshot_text = _extract_relevant_content(snapshot_text, user_task) |
| elif len(snapshot_text) > SNAPSHOT_SUMMARIZE_THRESHOLD: |
| snapshot_text = _truncate_snapshot(snapshot_text) |
| |
| response = { |
| "success": True, |
| "snapshot": snapshot_text, |
| "element_count": len(refs) if refs else 0 |
| } |
|
|
| |
| |
| |
| try: |
| from tools.browser_supervisor import SUPERVISOR_REGISTRY |
| _supervisor = SUPERVISOR_REGISTRY.get(effective_task_id) |
| if _supervisor is not None: |
| _sv_snap = _supervisor.snapshot() |
| if _sv_snap.active: |
| response.update(_sv_snap.to_dict()) |
| except Exception as _sv_exc: |
| logger.debug("supervisor snapshot merge failed: %s", _sv_exc) |
|
|
| return json.dumps(response, ensure_ascii=False) |
| else: |
| return json.dumps({ |
| "success": False, |
| "error": result.get("error", "Failed to get snapshot") |
| }, ensure_ascii=False) |
|
|
|
|
| def browser_click(ref: str, task_id: Optional[str] = None) -> str: |
| """ |
| Click on an element. |
| |
| Args: |
| ref: Element reference (e.g., "@e5") |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with click result |
| """ |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_click |
| return camofox_click(ref, task_id) |
|
|
| effective_task_id = task_id or "default" |
| |
| |
| if not ref.startswith("@"): |
| ref = f"@{ref}" |
| |
| result = _run_browser_command(effective_task_id, "click", [ref]) |
| |
| if result.get("success"): |
| return json.dumps({ |
| "success": True, |
| "clicked": ref |
| }, ensure_ascii=False) |
| else: |
| return json.dumps({ |
| "success": False, |
| "error": result.get("error", f"Failed to click {ref}") |
| }, ensure_ascii=False) |
|
|
|
|
| def browser_type(ref: str, text: str, task_id: Optional[str] = None) -> str: |
| """ |
| Type text into an input field. |
| |
| Args: |
| ref: Element reference (e.g., "@e3") |
| text: Text to type |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with type result |
| """ |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_type |
| return camofox_type(ref, text, task_id) |
|
|
| effective_task_id = task_id or "default" |
| |
| |
| if not ref.startswith("@"): |
| ref = f"@{ref}" |
| |
| |
| result = _run_browser_command(effective_task_id, "fill", [ref, text]) |
| |
| if result.get("success"): |
| return json.dumps({ |
| "success": True, |
| "typed": text, |
| "element": ref |
| }, ensure_ascii=False) |
| else: |
| return json.dumps({ |
| "success": False, |
| "error": result.get("error", f"Failed to type into {ref}") |
| }, ensure_ascii=False) |
|
|
|
|
| def browser_scroll(direction: str, task_id: Optional[str] = None) -> str: |
| """ |
| Scroll the page. |
| |
| Args: |
| direction: "up" or "down" |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with scroll result |
| """ |
| |
| if direction not in ["up", "down"]: |
| return json.dumps({ |
| "success": False, |
| "error": f"Invalid direction '{direction}'. Use 'up' or 'down'." |
| }, ensure_ascii=False) |
|
|
| |
| |
| |
| _SCROLL_PIXELS = 500 |
|
|
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_scroll |
| |
| _SCROLL_REPEATS = 5 |
| result = None |
| for _ in range(_SCROLL_REPEATS): |
| result = camofox_scroll(direction, task_id) |
| return result |
|
|
| effective_task_id = task_id or "default" |
|
|
| result = _run_browser_command(effective_task_id, "scroll", [direction, str(_SCROLL_PIXELS)]) |
| if not result.get("success"): |
| return json.dumps({ |
| "success": False, |
| "error": result.get("error", f"Failed to scroll {direction}") |
| }, ensure_ascii=False) |
|
|
| return json.dumps({ |
| "success": True, |
| "scrolled": direction |
| }, ensure_ascii=False) |
|
|
|
|
| def browser_back(task_id: Optional[str] = None) -> str: |
| """ |
| Navigate back in browser history. |
| |
| Args: |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with navigation result |
| """ |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_back |
| return camofox_back(task_id) |
|
|
| effective_task_id = task_id or "default" |
| result = _run_browser_command(effective_task_id, "back", []) |
| |
| if result.get("success"): |
| data = result.get("data", {}) |
| return json.dumps({ |
| "success": True, |
| "url": data.get("url", "") |
| }, ensure_ascii=False) |
| else: |
| return json.dumps({ |
| "success": False, |
| "error": result.get("error", "Failed to go back") |
| }, ensure_ascii=False) |
|
|
|
|
| def browser_press(key: str, task_id: Optional[str] = None) -> str: |
| """ |
| Press a keyboard key. |
| |
| Args: |
| key: Key to press (e.g., "Enter", "Tab") |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with key press result |
| """ |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_press |
| return camofox_press(key, task_id) |
|
|
| effective_task_id = task_id or "default" |
| result = _run_browser_command(effective_task_id, "press", [key]) |
| |
| if result.get("success"): |
| return json.dumps({ |
| "success": True, |
| "pressed": key |
| }, ensure_ascii=False) |
| else: |
| return json.dumps({ |
| "success": False, |
| "error": result.get("error", f"Failed to press {key}") |
| }, ensure_ascii=False) |
|
|
|
|
|
|
|
|
|
|
| def browser_console(clear: bool = False, expression: Optional[str] = None, task_id: Optional[str] = None) -> str: |
| """Get browser console messages and JavaScript errors, or evaluate JS in the page. |
| |
| When ``expression`` is provided, evaluates JavaScript in the page context |
| (like the DevTools console) and returns the result. Otherwise returns |
| console output (log/warn/error/info) and uncaught exceptions. |
| |
| Args: |
| clear: If True, clear the message/error buffers after reading |
| expression: JavaScript expression to evaluate in the page context |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with console messages/errors, or eval result |
| """ |
| |
| if expression is not None: |
| return _browser_eval(expression, task_id) |
|
|
| |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_console |
| return camofox_console(clear, task_id) |
|
|
| effective_task_id = task_id or "default" |
| |
| console_args = ["--clear"] if clear else [] |
| error_args = ["--clear"] if clear else [] |
| |
| console_result = _run_browser_command(effective_task_id, "console", console_args) |
| errors_result = _run_browser_command(effective_task_id, "errors", error_args) |
| |
| messages = [] |
| if console_result.get("success"): |
| for msg in console_result.get("data", {}).get("messages", []): |
| messages.append({ |
| "type": msg.get("type", "log"), |
| "text": msg.get("text", ""), |
| "source": "console", |
| }) |
| |
| errors = [] |
| if errors_result.get("success"): |
| for err in errors_result.get("data", {}).get("errors", []): |
| errors.append({ |
| "message": err.get("message", ""), |
| "source": "exception", |
| }) |
| |
| return json.dumps({ |
| "success": True, |
| "console_messages": messages, |
| "js_errors": errors, |
| "total_messages": len(messages), |
| "total_errors": len(errors), |
| }, ensure_ascii=False) |
|
|
|
|
| def _browser_eval(expression: str, task_id: Optional[str] = None) -> str: |
| """Evaluate a JavaScript expression in the page context and return the result.""" |
| if _is_camofox_mode(): |
| return _camofox_eval(expression, task_id) |
|
|
| effective_task_id = task_id or "default" |
| result = _run_browser_command(effective_task_id, "eval", [expression]) |
|
|
| if not result.get("success"): |
| err = result.get("error", "eval failed") |
| |
| if any(hint in err.lower() for hint in ("unknown command", "not supported", "not found", "no such command")): |
| return json.dumps({ |
| "success": False, |
| "error": f"JavaScript evaluation is not supported by this browser backend. {err}", |
| }) |
| return json.dumps({ |
| "success": False, |
| "error": err, |
| }) |
|
|
| data = result.get("data", {}) |
| raw_result = data.get("result") |
|
|
| |
| |
| parsed = raw_result |
| if isinstance(raw_result, str): |
| try: |
| parsed = json.loads(raw_result) |
| except (json.JSONDecodeError, ValueError): |
| pass |
|
|
| return json.dumps({ |
| "success": True, |
| "result": parsed, |
| "result_type": type(parsed).__name__, |
| }, ensure_ascii=False, default=str) |
|
|
|
|
| def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str: |
| """Evaluate JS via Camofox's /tabs/{tab_id}/eval endpoint (if available).""" |
| from tools.browser_camofox import _ensure_tab, _post |
| try: |
| tab_info = _ensure_tab(task_id or "default") |
| tab_id = tab_info.get("tab_id") or tab_info.get("id") |
| resp = _post(f"/tabs/{tab_id}/evaluate", body={"expression": expression, "userId": tab_info["user_id"]}) |
|
|
| |
| raw_result = resp.get("result") if isinstance(resp, dict) else resp |
| parsed = raw_result |
| if isinstance(raw_result, str): |
| try: |
| parsed = json.loads(raw_result) |
| except (json.JSONDecodeError, ValueError): |
| pass |
|
|
| return json.dumps({ |
| "success": True, |
| "result": parsed, |
| "result_type": type(parsed).__name__, |
| }, ensure_ascii=False, default=str) |
| except Exception as e: |
| error_msg = str(e) |
| |
| if any(code in error_msg for code in ("404", "405", "501")): |
| return json.dumps({ |
| "success": False, |
| "error": "JavaScript evaluation is not supported by this Camofox server. " |
| "Use browser_snapshot or browser_vision to inspect page state.", |
| }) |
| return tool_error(error_msg, success=False) |
|
|
|
|
| def _maybe_start_recording(task_id: str): |
| """Start recording if browser.record_sessions is enabled in config.""" |
| with _cleanup_lock: |
| if task_id in _recording_sessions: |
| return |
| try: |
| from hermes_cli.config import read_raw_config |
| hermes_home = get_hermes_home() |
| cfg = read_raw_config() |
| record_enabled = cfg.get("browser", {}).get("record_sessions", False) |
| |
| if not record_enabled: |
| return |
| |
| recordings_dir = hermes_home / "browser_recordings" |
| recordings_dir.mkdir(parents=True, exist_ok=True) |
| _cleanup_old_recordings(max_age_hours=72) |
| |
| timestamp = time.strftime("%Y%m%d_%H%M%S") |
| recording_path = recordings_dir / f"session_{timestamp}_{task_id[:16]}.webm" |
| |
| result = _run_browser_command(task_id, "record", ["start", str(recording_path)]) |
| if result.get("success"): |
| with _cleanup_lock: |
| _recording_sessions.add(task_id) |
| logger.info("Auto-recording browser session %s to %s", task_id, recording_path) |
| else: |
| logger.debug("Could not start auto-recording: %s", result.get("error")) |
| except Exception as e: |
| logger.debug("Auto-recording setup failed: %s", e) |
|
|
|
|
| def _maybe_stop_recording(task_id: str): |
| """Stop recording if one is active for this session.""" |
| with _cleanup_lock: |
| if task_id not in _recording_sessions: |
| return |
| try: |
| result = _run_browser_command(task_id, "record", ["stop"]) |
| if result.get("success"): |
| path = result.get("data", {}).get("path", "") |
| logger.info("Saved browser recording for session %s: %s", task_id, path) |
| except Exception as e: |
| logger.debug("Could not stop recording for %s: %s", task_id, e) |
| finally: |
| with _cleanup_lock: |
| _recording_sessions.discard(task_id) |
|
|
|
|
| def browser_get_images(task_id: Optional[str] = None) -> str: |
| """ |
| Get all images on the current page. |
| |
| Args: |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with list of images (src and alt) |
| """ |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_get_images |
| return camofox_get_images(task_id) |
|
|
| effective_task_id = task_id or "default" |
| |
| |
| js_code = """JSON.stringify( |
| [...document.images].map(img => ({ |
| src: img.src, |
| alt: img.alt || '', |
| width: img.naturalWidth, |
| height: img.naturalHeight |
| })).filter(img => img.src && !img.src.startsWith('data:')) |
| )""" |
| |
| result = _run_browser_command(effective_task_id, "eval", [js_code]) |
| |
| if result.get("success"): |
| data = result.get("data", {}) |
| raw_result = data.get("result", "[]") |
| |
| try: |
| |
| if isinstance(raw_result, str): |
| images = json.loads(raw_result) |
| else: |
| images = raw_result |
| |
| return json.dumps({ |
| "success": True, |
| "images": images, |
| "count": len(images) |
| }, ensure_ascii=False) |
| except json.JSONDecodeError: |
| return json.dumps({ |
| "success": True, |
| "images": [], |
| "count": 0, |
| "warning": "Could not parse image data" |
| }, ensure_ascii=False) |
| else: |
| return json.dumps({ |
| "success": False, |
| "error": result.get("error", "Failed to get images") |
| }, ensure_ascii=False) |
|
|
|
|
| def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] = None) -> str: |
| """ |
| Take a screenshot of the current page and analyze it with vision AI. |
| |
| This tool captures what's visually displayed in the browser and sends it |
| to Gemini for analysis. Useful for understanding visual content that the |
| text-based snapshot may not capture (CAPTCHAs, verification challenges, |
| images, complex layouts, etc.). |
| |
| The screenshot is saved persistently and its file path is returned alongside |
| the analysis, so it can be shared with users via MEDIA:<path> in the response. |
| |
| Args: |
| question: What you want to know about the page visually |
| annotate: If True, overlay numbered [N] labels on interactive elements |
| task_id: Task identifier for session isolation |
| |
| Returns: |
| JSON string with vision analysis results and screenshot_path |
| """ |
| if _is_camofox_mode(): |
| from tools.browser_camofox import camofox_vision |
| return camofox_vision(question, annotate, task_id) |
|
|
| import base64 |
| import uuid as uuid_mod |
| effective_task_id = task_id or "default" |
| |
| |
| from hermes_constants import get_hermes_dir |
| screenshots_dir = get_hermes_dir("cache/screenshots", "browser_screenshots") |
| screenshot_path = screenshots_dir / f"browser_screenshot_{uuid_mod.uuid4().hex}.png" |
| |
| try: |
| screenshots_dir.mkdir(parents=True, exist_ok=True) |
| |
| |
| _cleanup_old_screenshots(screenshots_dir, max_age_hours=24) |
| |
| |
| screenshot_args = [] |
| if annotate: |
| screenshot_args.append("--annotate") |
| screenshot_args.append("--full") |
| screenshot_args.append(str(screenshot_path)) |
| result = _run_browser_command( |
| effective_task_id, |
| "screenshot", |
| screenshot_args, |
| ) |
| |
| if not result.get("success"): |
| error_detail = result.get("error", "Unknown error") |
| _cp = _get_cloud_provider() |
| mode = "local" if _cp is None else f"cloud ({_cp.provider_name()})" |
| return json.dumps({ |
| "success": False, |
| "error": f"Failed to take screenshot ({mode} mode): {error_detail}" |
| }, ensure_ascii=False) |
|
|
| actual_screenshot_path = result.get("data", {}).get("path") |
| if actual_screenshot_path: |
| screenshot_path = Path(actual_screenshot_path) |
|
|
| |
| if not screenshot_path.exists(): |
| _cp = _get_cloud_provider() |
| mode = "local" if _cp is None else f"cloud ({_cp.provider_name()})" |
| return json.dumps({ |
| "success": False, |
| "error": ( |
| f"Screenshot file was not created at {screenshot_path} ({mode} mode). " |
| f"This may indicate a socket path issue (macOS /var/folders/), " |
| f"a missing Chromium install ('agent-browser install'), " |
| f"or a stale daemon process." |
| ), |
| }, ensure_ascii=False) |
| |
| |
| _screenshot_bytes = screenshot_path.read_bytes() |
| _screenshot_b64 = base64.b64encode(_screenshot_bytes).decode("ascii") |
| data_url = f"data:image/png;base64,{_screenshot_b64}" |
| |
| vision_prompt = ( |
| f"You are analyzing a screenshot of a web browser.\n\n" |
| f"User's question: {question}\n\n" |
| f"Provide a detailed and helpful answer based on what you see in the screenshot. " |
| f"If there are interactive elements, describe them. If there are verification challenges " |
| f"or CAPTCHAs, describe what type they are and what action might be needed. " |
| f"Focus on answering the user's specific question." |
| ) |
|
|
| |
| vision_model = _get_vision_model() |
| logger.debug("browser_vision: analysing screenshot (%d bytes)", |
| len(_screenshot_bytes)) |
|
|
| |
| |
| |
| vision_timeout = 120.0 |
| vision_temperature = 0.1 |
| try: |
| from hermes_cli.config import load_config |
| _cfg = load_config() |
| _vision_cfg = _cfg.get("auxiliary", {}).get("vision", {}) |
| _vt = _vision_cfg.get("timeout") |
| if _vt is not None: |
| vision_timeout = float(_vt) |
| _vtemp = _vision_cfg.get("temperature") |
| if _vtemp is not None: |
| vision_temperature = float(_vtemp) |
| except Exception: |
| pass |
|
|
| call_kwargs = { |
| "task": "vision", |
| "messages": [ |
| { |
| "role": "user", |
| "content": [ |
| {"type": "text", "text": vision_prompt}, |
| {"type": "image_url", "image_url": {"url": data_url}}, |
| ], |
| } |
| ], |
| "max_tokens": 2000, |
| "temperature": vision_temperature, |
| "timeout": vision_timeout, |
| } |
| if vision_model: |
| call_kwargs["model"] = vision_model |
| |
| try: |
| response = call_llm(**call_kwargs) |
| except Exception as _api_err: |
| from tools.vision_tools import ( |
| _is_image_size_error, _resize_image_for_vision, _RESIZE_TARGET_BYTES, |
| ) |
| if (_is_image_size_error(_api_err) |
| and len(data_url) > _RESIZE_TARGET_BYTES): |
| logger.info( |
| "Vision API rejected screenshot (%.1f MB); " |
| "auto-resizing to ~%.0f MB and retrying...", |
| len(data_url) / (1024 * 1024), |
| _RESIZE_TARGET_BYTES / (1024 * 1024), |
| ) |
| data_url = _resize_image_for_vision( |
| screenshot_path, mime_type="image/png") |
| call_kwargs["messages"][0]["content"][1]["image_url"]["url"] = data_url |
| response = call_llm(**call_kwargs) |
| else: |
| raise |
| |
| analysis = (response.choices[0].message.content or "").strip() |
| |
| from agent.redact import redact_sensitive_text |
| analysis = redact_sensitive_text(analysis) |
| response_data = { |
| "success": True, |
| "analysis": analysis or "Vision analysis returned no content.", |
| "screenshot_path": str(screenshot_path), |
| } |
| |
| if annotate and result.get("data", {}).get("annotations"): |
| response_data["annotations"] = result["data"]["annotations"] |
| return json.dumps(response_data, ensure_ascii=False) |
| |
| except Exception as e: |
| |
| |
| |
| |
| logger.warning("browser_vision failed: %s", e, exc_info=True) |
| error_info = {"success": False, "error": f"Error during vision analysis: {str(e)}"} |
| if screenshot_path.exists(): |
| error_info["screenshot_path"] = str(screenshot_path) |
| error_info["note"] = "Screenshot was captured but vision analysis failed. You can still share it via MEDIA:<path>." |
| return json.dumps(error_info, ensure_ascii=False) |
|
|
|
|
| def _cleanup_old_screenshots(screenshots_dir, max_age_hours=24): |
| """Remove browser screenshots older than max_age_hours to prevent disk bloat. |
| |
| Throttled to run at most once per hour per directory to avoid repeated |
| scans on screenshot-heavy workflows. |
| """ |
| key = str(screenshots_dir) |
| now = time.time() |
| if now - _last_screenshot_cleanup_by_dir.get(key, 0.0) < 3600: |
| return |
| _last_screenshot_cleanup_by_dir[key] = now |
|
|
| try: |
| cutoff = time.time() - (max_age_hours * 3600) |
| for f in screenshots_dir.glob("browser_screenshot_*.png"): |
| try: |
| if f.stat().st_mtime < cutoff: |
| f.unlink() |
| except Exception as e: |
| logger.debug("Failed to clean old screenshot %s: %s", f, e) |
| except Exception as e: |
| logger.debug("Screenshot cleanup error (non-critical): %s", e) |
|
|
|
|
| def _cleanup_old_recordings(max_age_hours=72): |
| """Remove browser recordings older than max_age_hours to prevent disk bloat.""" |
| try: |
| hermes_home = get_hermes_home() |
| recordings_dir = hermes_home / "browser_recordings" |
| if not recordings_dir.exists(): |
| return |
| cutoff = time.time() - (max_age_hours * 3600) |
| for f in recordings_dir.glob("session_*.webm"): |
| try: |
| if f.stat().st_mtime < cutoff: |
| f.unlink() |
| except Exception as e: |
| logger.debug("Failed to clean old recording %s: %s", f, e) |
| except Exception as e: |
| logger.debug("Recording cleanup error (non-critical): %s", e) |
|
|
|
|
| |
| |
| |
|
|
| def cleanup_browser(task_id: Optional[str] = None) -> None: |
| """ |
| Clean up browser session for a task. |
| |
| Called automatically when a task completes or when inactivity timeout is reached. |
| Closes both the agent-browser/Browserbase session and Camofox sessions. |
| |
| Args: |
| task_id: Task identifier to clean up |
| """ |
| if task_id is None: |
| task_id = "default" |
|
|
| |
| |
| _stop_cdp_supervisor(task_id) |
|
|
| |
| |
| |
| |
| if _is_camofox_mode(): |
| try: |
| from tools.browser_camofox import camofox_close, camofox_soft_cleanup |
| if not camofox_soft_cleanup(task_id): |
| camofox_close(task_id) |
| except Exception as e: |
| logger.debug("Camofox cleanup for task %s: %s", task_id, e) |
|
|
| logger.debug("cleanup_browser called for task_id: %s", task_id) |
| logger.debug("Active sessions: %s", list(_active_sessions.keys())) |
| |
| |
| |
| with _cleanup_lock: |
| session_info = _active_sessions.get(task_id) |
| |
| if session_info: |
| bb_session_id = session_info.get("bb_session_id", "unknown") |
| logger.debug("Found session for task %s: bb_session_id=%s", task_id, bb_session_id) |
| |
| |
| _maybe_stop_recording(task_id) |
| |
| |
| try: |
| _run_browser_command(task_id, "close", [], timeout=10) |
| logger.debug("agent-browser close command completed for task %s", task_id) |
| except Exception as e: |
| logger.warning("agent-browser close failed for task %s: %s", task_id, e) |
| |
| |
| with _cleanup_lock: |
| _active_sessions.pop(task_id, None) |
| _session_last_activity.pop(task_id, None) |
| |
| |
| if bb_session_id: |
| provider = _get_cloud_provider() |
| if provider is not None: |
| try: |
| provider.close_session(bb_session_id) |
| except Exception as e: |
| logger.warning("Could not close cloud browser session: %s", e) |
| |
| |
| session_name = session_info.get("session_name", "") |
| if session_name: |
| socket_dir = os.path.join(_socket_safe_tmpdir(), f"agent-browser-{session_name}") |
| if os.path.exists(socket_dir): |
| |
| pid_file = os.path.join(socket_dir, f"{session_name}.pid") |
| if os.path.isfile(pid_file): |
| try: |
| daemon_pid = int(Path(pid_file).read_text().strip()) |
| os.kill(daemon_pid, signal.SIGTERM) |
| logger.debug("Killed daemon pid %s for %s", daemon_pid, session_name) |
| except (ProcessLookupError, ValueError, PermissionError, OSError): |
| logger.debug("Could not kill daemon pid for %s (already dead or inaccessible)", session_name) |
| shutil.rmtree(socket_dir, ignore_errors=True) |
| |
| logger.debug("Removed task %s from active sessions", task_id) |
| else: |
| logger.debug("No active session found for task_id: %s", task_id) |
|
|
|
|
| def cleanup_all_browsers() -> None: |
| """ |
| Clean up all active browser sessions. |
| |
| Useful for cleanup on shutdown. |
| """ |
| with _cleanup_lock: |
| task_ids = list(_active_sessions.keys()) |
| for task_id in task_ids: |
| cleanup_browser(task_id) |
|
|
| |
| try: |
| from tools.browser_supervisor import SUPERVISOR_REGISTRY |
| SUPERVISOR_REGISTRY.stop_all() |
| except Exception: |
| pass |
|
|
| |
| global _cached_agent_browser, _agent_browser_resolved |
| global _cached_command_timeout, _command_timeout_resolved |
| _cached_agent_browser = None |
| _agent_browser_resolved = False |
| _discover_homebrew_node_dirs.cache_clear() |
| _cached_command_timeout = None |
| _command_timeout_resolved = False |
|
|
|
|
| |
| |
| |
|
|
| def check_browser_requirements() -> bool: |
| """ |
| Check if browser tool requirements are met. |
| |
| In **local mode** (no cloud provider configured): only the |
| ``agent-browser`` CLI must be findable. |
| |
| In **cloud mode** (Browserbase, Browser Use, or Firecrawl): the CLI |
| *and* the provider's required credentials must be present. |
| |
| Returns: |
| True if all requirements are met, False otherwise |
| """ |
| |
| if _is_camofox_mode(): |
| return True |
|
|
| |
| try: |
| browser_cmd = _find_agent_browser() |
| except FileNotFoundError: |
| return False |
|
|
| |
| |
| |
| |
| if _requires_real_termux_browser_install(browser_cmd): |
| return False |
|
|
| |
| provider = _get_cloud_provider() |
| if provider is not None and not provider.is_configured(): |
| return False |
|
|
| return True |
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| """ |
| Simple test/demo when run directly |
| """ |
| print("π Browser Tool Module") |
| print("=" * 40) |
|
|
| _cp = _get_cloud_provider() |
| mode = "local" if _cp is None else f"cloud ({_cp.provider_name()})" |
| print(f" Mode: {mode}") |
| |
| |
| if check_browser_requirements(): |
| print("β
All requirements met") |
| else: |
| print("β Missing requirements:") |
| try: |
| browser_cmd = _find_agent_browser() |
| if _requires_real_termux_browser_install(browser_cmd): |
| print(" - bare npx fallback found (insufficient on Termux local mode)") |
| print(f" Install: {_browser_install_hint()}") |
| except FileNotFoundError: |
| print(" - agent-browser CLI not found") |
| print(f" Install: {_browser_install_hint()}") |
| if _cp is not None and not _cp.is_configured(): |
| print(f" - {_cp.provider_name()} credentials not configured") |
| print(" Tip: set browser.cloud_provider to 'local' to use free local mode instead") |
| |
| print("\nπ Available Browser Tools:") |
| for schema in BROWSER_TOOL_SCHEMAS: |
| print(f" πΉ {schema['name']}: {schema['description'][:60]}...") |
| |
| print("\nπ‘ Usage:") |
| print(" from tools.browser_tool import browser_navigate, browser_snapshot") |
| print(" result = browser_navigate('https://example.com', task_id='my_task')") |
| print(" snapshot = browser_snapshot(task_id='my_task')") |
|
|
|
|
| |
| |
| |
| from tools.registry import registry, tool_error |
|
|
| _BROWSER_SCHEMA_MAP = {s["name"]: s for s in BROWSER_TOOL_SCHEMAS} |
|
|
| registry.register( |
| name="browser_navigate", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_navigate"], |
| handler=lambda args, **kw: browser_navigate(url=args.get("url", ""), task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="π", |
| ) |
| registry.register( |
| name="browser_snapshot", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_snapshot"], |
| handler=lambda args, **kw: browser_snapshot( |
| full=args.get("full", False), task_id=kw.get("task_id"), user_task=kw.get("user_task")), |
| check_fn=check_browser_requirements, |
| emoji="πΈ", |
| ) |
| registry.register( |
| name="browser_click", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_click"], |
| handler=lambda args, **kw: browser_click(ref=args.get("ref", ""), task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="π", |
| ) |
| registry.register( |
| name="browser_type", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_type"], |
| handler=lambda args, **kw: browser_type(ref=args.get("ref", ""), text=args.get("text", ""), task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="β¨οΈ", |
| ) |
| registry.register( |
| name="browser_scroll", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_scroll"], |
| handler=lambda args, **kw: browser_scroll(direction=args.get("direction", "down"), task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="π", |
| ) |
| registry.register( |
| name="browser_back", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_back"], |
| handler=lambda args, **kw: browser_back(task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="βοΈ", |
| ) |
| registry.register( |
| name="browser_press", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_press"], |
| handler=lambda args, **kw: browser_press(key=args.get("key", ""), task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="β¨οΈ", |
| ) |
|
|
| registry.register( |
| name="browser_get_images", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_get_images"], |
| handler=lambda args, **kw: browser_get_images(task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="πΌοΈ", |
| ) |
| registry.register( |
| name="browser_vision", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_vision"], |
| handler=lambda args, **kw: browser_vision(question=args.get("question", ""), annotate=args.get("annotate", False), task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="ποΈ", |
| ) |
| registry.register( |
| name="browser_console", |
| toolset="browser", |
| schema=_BROWSER_SCHEMA_MAP["browser_console"], |
| handler=lambda args, **kw: browser_console(clear=args.get("clear", False), expression=args.get("expression"), task_id=kw.get("task_id")), |
| check_fn=check_browser_requirements, |
| emoji="π₯οΈ", |
| ) |
|
|