| |
| """ |
| Code Execution Tool -- Programmatic Tool Calling (PTC) |
| |
| Lets the LLM write a Python script that calls Hermes tools via RPC, |
| collapsing multi-step tool chains into a single inference turn. |
| |
| Architecture (two transports): |
| |
| **Local backend (UDS):** |
| 1. Parent generates a `hermes_tools.py` stub module with UDS RPC functions |
| 2. Parent opens a Unix domain socket and starts an RPC listener thread |
| 3. Parent spawns a child process that runs the LLM's script |
| 4. Tool calls travel over the UDS back to the parent for dispatch |
| |
| **Remote backends (file-based RPC):** |
| 1. Parent generates `hermes_tools.py` with file-based RPC stubs |
| 2. Parent ships both files to the remote environment |
| 3. Script runs inside the terminal backend (Docker/SSH/Modal/Daytona/etc.) |
| 4. Tool calls are written as request files; a polling thread on the parent |
| reads them via env.execute(), dispatches, and writes response files |
| 5. The script polls for response files and continues |
| |
| In both cases, only the script's stdout is returned to the LLM; intermediate |
| tool results never enter the context window. |
| |
| Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Windows. |
| Remote execution additionally requires Python 3 in the terminal backend. |
| """ |
|
|
| import base64 |
| import functools |
| import json |
| import logging |
| import os |
| import platform |
| import shlex |
| import signal |
| import socket |
| import subprocess |
| import sys |
| import tempfile |
| import threading |
| import time |
| import uuid |
|
|
from typing import Any, Dict, List, Optional

# True when platform.system() reports "Windows".
_IS_WINDOWS = platform.system() == "Windows"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# execute_code is switched off on win32 (the local transport uses Unix
# domain sockets).
SANDBOX_AVAILABLE = sys.platform != "win32"
|
|
| |
| |
# Tools a sandboxed script may call over RPC. Requests for any other tool
# name are answered with an error by the RPC dispatch loops.
SANDBOX_ALLOWED_TOOLS = frozenset({
    "web_search",
    "web_extract",
    "read_file",
    "write_file",
    "search_files",
    "patch",
    "terminal",
})
|
|
| |
# Fallback limits, used when the loaded config does not override them.
DEFAULT_TIMEOUT = 5 * 60          # seconds a script may run
DEFAULT_MAX_TOOL_CALLS = 50       # RPC tool calls allowed per execution
MAX_STDOUT_BYTES = 50 * 1000      # stdout kept before head/tail truncation
MAX_STDERR_BYTES = 10 * 1000      # stderr kept (head only)
|
|
|
|
def check_sandbox_requirements() -> bool:
    """Report whether the code execution sandbox can be used here.

    Returns:
        The module-level ``SANDBOX_AVAILABLE`` flag (False on win32, where
        the Unix-domain-socket transport is unavailable).
    """
    available = SANDBOX_AVAILABLE
    return available
|
|
|
|
| |
| |
| |
|
|
| |
| |
| _TOOL_STUBS = { |
| "web_search": ( |
| "web_search", |
| "query: str, limit: int = 5", |
| '"""Search the web. Returns dict with data.web list of {url, title, description}."""', |
| '{"query": query, "limit": limit}', |
| ), |
| "web_extract": ( |
| "web_extract", |
| "urls: list", |
| '"""Extract content from URLs. Returns dict with results list of {url, title, content, error}."""', |
| '{"urls": urls}', |
| ), |
| "read_file": ( |
| "read_file", |
| "path: str, offset: int = 1, limit: int = 500", |
| '"""Read a file (1-indexed lines). Returns dict with "content" and "total_lines"."""', |
| '{"path": path, "offset": offset, "limit": limit}', |
| ), |
| "write_file": ( |
| "write_file", |
| "path: str, content: str", |
| '"""Write content to a file (always overwrites). Returns dict with status."""', |
| '{"path": path, "content": content}', |
| ), |
| "search_files": ( |
| "search_files", |
| 'pattern: str, target: str = "content", path: str = ".", file_glob: str = None, limit: int = 50, offset: int = 0, output_mode: str = "content", context: int = 0', |
| '"""Search file contents (target="content") or find files by name (target="files"). Returns dict with "matches"."""', |
| '{"pattern": pattern, "target": target, "path": path, "file_glob": file_glob, "limit": limit, "offset": offset, "output_mode": output_mode, "context": context}', |
| ), |
| "patch": ( |
| "patch", |
| 'path: str = None, old_string: str = None, new_string: str = None, replace_all: bool = False, mode: str = "replace", patch: str = None', |
| '"""Targeted find-and-replace (mode="replace") or V4A multi-file patches (mode="patch"). Returns dict with status."""', |
| '{"path": path, "old_string": old_string, "new_string": new_string, "replace_all": replace_all, "mode": mode, "patch": patch}', |
| ), |
| "terminal": ( |
| "terminal", |
| "command: str, timeout: int = None, workdir: str = None", |
| '"""Run a shell command (foreground only). Returns dict with "output" and "exit_code"."""', |
| '{"command": command, "timeout": timeout, "workdir": workdir}', |
| ), |
| } |
|
|
|
|
def generate_hermes_tools_module(enabled_tools: List[str],
                                 transport: str = "uds") -> str:
    """
    Build the source code for the hermes_tools.py stub module.

    Only tools in both SANDBOX_ALLOWED_TOOLS and enabled_tools get stubs,
    and only if a template exists for them in _TOOL_STUBS. Stubs are emitted
    in sorted tool-name order after the transport-specific header.

    Args:
        enabled_tools: Tool names enabled in the current session. ``None``
            or an empty list yields a module with the header only.
        transport: ``"file"`` selects the file-based RPC header (remote
            backends); any other value selects the Unix domain socket
            header (local backend).

    Returns:
        The generated module source as a single string.
    """
    tools_to_generate = sorted(SANDBOX_ALLOWED_TOOLS & set(enabled_tools or ()))

    stub_functions = []
    for tool_name in tools_to_generate:
        stub = _TOOL_STUBS.get(tool_name)
        if stub is None:
            # Allowed tool without a stub template: nothing to generate.
            continue
        func_name, sig, doc, args_expr = stub
        stub_functions.append(
            f"def {func_name}({sig}):\n"
            f"    {doc}\n"
            f"    return _call({func_name!r}, {args_expr})\n"
        )

    header = _FILE_TRANSPORT_HEADER if transport == "file" else _UDS_TRANSPORT_HEADER

    # Every stub already ends in a newline, so joining with "\n" leaves one
    # blank line between consecutive functions.
    return header + "\n".join(stub_functions)
|
|
|
|
| |
|
|
| _COMMON_HELPERS = '''\ |
| |
| # --------------------------------------------------------------------------- |
| # Convenience helpers (avoid common scripting pitfalls) |
| # --------------------------------------------------------------------------- |
| |
| def json_parse(text: str): |
| """Parse JSON tolerant of control characters (strict=False). |
| Use this instead of json.loads() when parsing output from terminal() |
| or web_extract() that may contain raw tabs/newlines in strings.""" |
| return json.loads(text, strict=False) |
| |
| |
| def shell_quote(s: str) -> str: |
| """Shell-escape a string for safe interpolation into commands. |
| Use this when inserting dynamic content into terminal() commands: |
| terminal(f"echo {shell_quote(user_input)}") |
| """ |
| return shlex.quote(s) |
| |
| |
| def retry(fn, max_attempts=3, delay=2): |
| """Retry a function up to max_attempts times with exponential backoff. |
| Use for transient failures (network errors, API rate limits): |
| result = retry(lambda: terminal("gh issue list ...")) |
| """ |
| last_err = None |
| for attempt in range(max_attempts): |
| try: |
| return fn() |
| except Exception as e: |
| last_err = e |
| if attempt < max_attempts - 1: |
| time.sleep(delay * (2 ** attempt)) |
| raise last_err |
| |
| ''' |
|
|
| |
|
|
# Header of the generated hermes_tools.py for the local backend. The text is
# source code for the *generated* module: it opens one Unix domain socket to
# the path in $HERMES_RPC_SOCKET on first use and exchanges
# newline-terminated JSON messages with the parent process.
_UDS_TRANSPORT_HEADER = '''\
"""Auto-generated Hermes tools RPC stubs."""
import json, os, socket, shlex, time

_sock = None
''' + _COMMON_HELPERS + '''\

def _connect():
    global _sock
    if _sock is None:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(os.environ["HERMES_RPC_SOCKET"])
        except Exception:
            # Do not cache a socket that never connected; a later call
            # (e.g. through retry()) must be able to connect afresh.
            sock.close()
            raise
        sock.settimeout(300)
        _sock = sock
    return _sock

def _call(tool_name, args):
    """Send a tool call to the parent process and return the parsed result."""
    conn = _connect()
    request = json.dumps({"tool": tool_name, "args": args}) + "\\n"
    conn.sendall(request.encode())
    buf = b""
    while True:
        chunk = conn.recv(65536)
        if not chunk:
            raise RuntimeError("Agent process disconnected")
        buf += chunk
        if buf.endswith(b"\\n"):
            break
    raw = buf.decode().strip()
    result = json.loads(raw)
    if isinstance(result, str):
        try:
            return json.loads(result)
        except (json.JSONDecodeError, TypeError):
            return result
    return result

'''
|
|
| |
|
|
# Header of the generated hermes_tools.py for remote backends. The text is
# source code for the *generated* module: each call writes ``req_<seq>`` into
# the RPC directory and polls for the matching ``res_<seq>`` file.
_FILE_TRANSPORT_HEADER = '''\
"""Auto-generated Hermes tools RPC stubs (file-based transport)."""
import json, os, shlex, tempfile, time

_RPC_DIR = os.environ.get("HERMES_RPC_DIR") or os.path.join(tempfile.gettempdir(), "hermes_rpc")
_seq = 0
''' + _COMMON_HELPERS + '''\

def _call(tool_name, args):
    """Send a tool call request via file-based RPC and wait for response."""
    global _seq
    _seq += 1
    seq_str = f"{_seq:06d}"
    req_file = os.path.join(_RPC_DIR, f"req_{seq_str}")
    res_file = os.path.join(_RPC_DIR, f"res_{seq_str}")

    # Write request atomically (write to .tmp, then rename)
    tmp = req_file + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"tool": tool_name, "args": args, "seq": _seq}, f)
    os.rename(tmp, req_file)

    # Wait for response with adaptive polling
    deadline = time.monotonic() + 300  # 5-minute timeout per tool call
    poll_interval = 0.05  # Start at 50ms
    while not os.path.exists(res_file):
        if time.monotonic() > deadline:
            raise RuntimeError(f"RPC timeout: no response for {tool_name} after 300s")
        time.sleep(poll_interval)
        poll_interval = min(poll_interval * 1.2, 0.25)  # Back off to 250ms

    # The parent writes the response as UTF-8 bytes; decode explicitly so a
    # non-UTF-8 locale in the remote environment cannot corrupt it.
    with open(res_file, encoding="utf-8") as f:
        raw = f.read()

    # Clean up response file
    try:
        os.unlink(res_file)
    except OSError:
        pass

    result = json.loads(raw)
    if isinstance(result, str):
        try:
            return json.loads(result)
        except (json.JSONDecodeError, TypeError):
            return result
    return result

'''
|
|
|
|
| |
| |
| |
|
|
| |
# Argument names that both RPC dispatch loops pop from a ``terminal`` tool
# call's args before dispatching it, so sandboxed scripts cannot pass them.
_TERMINAL_BLOCKED_PARAMS = {"background", "pty", "notify_on_complete", "watch_patterns"}
|
|
|
|
def _rpc_server_loop(
    server_sock: socket.socket,
    task_id: str,
    tool_call_log: list,
    tool_call_counter: list,
    max_tool_calls: int,
    allowed_tools: frozenset,
):
    """
    Accept one client connection and dispatch tool-call requests until
    the client disconnects or the connection goes idle.

    Requests are newline-delimited JSON objects ``{"tool": ..., "args": ...}``.
    Every request gets exactly one newline-terminated reply. Once the call
    limit is reached the loop keeps running but answers further requests
    with an error instead of dispatching them.

    Args:
        server_sock: Listening socket to accept the single client on.
        task_id: Passed through to ``handle_function_call``.
        tool_call_log: List that receives one summary dict per dispatched
            call (tool name, 80-char args preview, duration in seconds).
        tool_call_counter: One-element list; index 0 is incremented per
            dispatched call so the caller can read the count afterwards.
        max_tool_calls: Maximum number of dispatched calls.
        allowed_tools: Tool names the client is allowed to invoke.
    """
    from model_tools import handle_function_call

    conn = None
    try:
        # NOTE(review): accept() gives up after 5s, so a script whose first
        # tool call comes later than that may never be served -- confirm
        # against the caller before changing.
        server_sock.settimeout(5)
        conn, _ = server_sock.accept()
        conn.settimeout(300)

        buf = b""
        while True:
            try:
                chunk = conn.recv(65536)
            except socket.timeout:
                break
            if not chunk:
                break
            buf += chunk

            # Process every complete (newline-terminated) message received.
            while b"\n" in buf:
                line, buf = buf.split(b"\n", 1)
                line = line.strip()
                if not line:
                    continue

                call_start = time.monotonic()
                try:
                    request = json.loads(line.decode())
                except (json.JSONDecodeError, UnicodeDecodeError) as exc:
                    resp = tool_error(f"Invalid RPC request: {exc}")
                    conn.sendall((resp + "\n").encode())
                    continue

                # Valid JSON that is not an object (list, number, ...) has no
                # .get(); reject it instead of letting AttributeError kill
                # this thread and leave the client waiting for a reply.
                if not isinstance(request, dict):
                    resp = tool_error(
                        "Invalid RPC request: expected a JSON object"
                    )
                    conn.sendall((resp + "\n").encode())
                    continue

                tool_name = request.get("tool", "")
                tool_args = request.get("args", {})

                # Enforce the allow-list.
                if tool_name not in allowed_tools:
                    available = ", ".join(sorted(allowed_tools))
                    resp = json.dumps({
                        "error": (
                            f"Tool '{tool_name}' is not available in execute_code. "
                            f"Available: {available}"
                        )
                    })
                    conn.sendall((resp + "\n").encode())
                    continue

                # Enforce the call limit.
                if tool_call_counter[0] >= max_tool_calls:
                    resp = json.dumps({
                        "error": (
                            f"Tool call limit reached ({max_tool_calls}). "
                            "No more tool calls allowed in this execution."
                        )
                    })
                    conn.sendall((resp + "\n").encode())
                    continue

                # Drop terminal parameters that are blocked in the sandbox.
                if tool_name == "terminal" and isinstance(tool_args, dict):
                    for param in _TERMINAL_BLOCKED_PARAMS:
                        tool_args.pop(param, None)

                # Dispatch with this process's stdout/stderr pointed at
                # devnull for the duration of the call.
                try:
                    _real_stdout, _real_stderr = sys.stdout, sys.stderr
                    devnull = open(os.devnull, "w")
                    try:
                        sys.stdout = devnull
                        sys.stderr = devnull
                        result = handle_function_call(
                            tool_name, tool_args, task_id=task_id
                        )
                    finally:
                        sys.stdout, sys.stderr = _real_stdout, _real_stderr
                        devnull.close()
                except Exception as exc:
                    logger.error("Tool call failed in sandbox: %s", exc, exc_info=True)
                    result = tool_error(str(exc))

                tool_call_counter[0] += 1
                call_duration = time.monotonic() - call_start

                # Record a short summary of the call for the caller.
                args_preview = str(tool_args)[:80]
                tool_call_log.append({
                    "tool": tool_name,
                    "args_preview": args_preview,
                    "duration": round(call_duration, 2),
                })

                conn.sendall((result + "\n").encode())

    except socket.timeout:
        logger.debug("RPC listener socket timeout")
    except OSError as e:
        logger.debug("RPC listener socket error: %s", e, exc_info=True)
    finally:
        if conn:
            try:
                conn.close()
            except OSError as e:
                logger.debug("RPC conn close error: %s", e)
|
|
|
|
| |
| |
| |
|
|
def _get_or_create_env(task_id: str):
    """Return the terminal environment for *task_id*, creating it if needed.

    Environments are looked up in, and registered into, the registries
    imported from ``tools.terminal_tool``. A falsy *task_id* maps to the key
    ``"default"``. Creation is serialized per task through a dedicated lock,
    with a second lookup after acquiring it so that only one environment is
    built per key.

    Returns:
        ``(env, env_type)`` tuple.
    """
    from tools.terminal_tool import (
        _active_environments, _env_lock, _create_environment,
        _get_env_config, _last_activity, _start_cleanup_thread,
        _creation_locks, _creation_locks_lock, _task_env_overrides,
    )

    env_key = task_id or "default"

    def _touch_existing():
        """Return (env, env_type) for a registered env, refreshing its activity time."""
        with _env_lock:
            if env_key not in _active_environments:
                return None
            _last_activity[env_key] = time.time()
            return _active_environments[env_key], _get_env_config()["env_type"]

    # Fast path: the environment already exists.
    cached = _touch_existing()
    if cached is not None:
        return cached

    # Fetch (or lazily create) the lock that guards creation for this task.
    with _creation_locks_lock:
        if env_key not in _creation_locks:
            _creation_locks[env_key] = threading.Lock()
        creation_lock = _creation_locks[env_key]

    with creation_lock:
        # Another thread may have created it while this one waited.
        cached = _touch_existing()
        if cached is not None:
            return cached

        config = _get_env_config()
        env_type = config["env_type"]
        overrides = _task_env_overrides.get(env_key, {})

        # Container backends take an image, preferring a per-task override.
        is_container = env_type in ("docker", "singularity", "modal", "daytona")
        if is_container:
            image_key = f"{env_type}_image"
            image = overrides.get(image_key) or config[image_key]
        else:
            image = ""

        cwd = overrides.get("cwd") or config["cwd"]

        container_config = {
            "container_cpu": config.get("container_cpu", 1),
            "container_memory": config.get("container_memory", 5120),
            "container_disk": config.get("container_disk", 51200),
            "container_persistent": config.get("container_persistent", True),
            "docker_volumes": config.get("docker_volumes", []),
        } if is_container else None

        ssh_config = {
            "host": config.get("ssh_host", ""),
            "user": config.get("ssh_user", ""),
            "port": config.get("ssh_port", 22),
            "key": config.get("ssh_key", ""),
            "persistent": config.get("ssh_persistent", False),
        } if env_type == "ssh" else None

        local_config = {
            "persistent": config.get("local_persistent", False),
        } if env_type == "local" else None

        logger.info("Creating new %s environment for execute_code task %s...",
                    env_type, env_key[:8])
        env = _create_environment(
            env_type=env_type,
            image=image,
            cwd=cwd,
            timeout=config["timeout"],
            ssh_config=ssh_config,
            container_config=container_config,
            local_config=local_config,
            task_id=env_key,
            host_cwd=config.get("host_cwd"),
        )

        # Publish the new environment and stamp its activity time.
        with _env_lock:
            _active_environments[env_key] = env
            _last_activity[env_key] = time.time()

        _start_cleanup_thread()
        logger.info("%s environment ready for execute_code task %s",
                    env_type, env_key[:8])
        return env, env_type
|
|
|
|
| def _ship_file_to_remote(env, remote_path: str, content: str) -> None: |
| """Write *content* to *remote_path* on the remote environment. |
| |
| Uses ``echo โฆ | base64 -d`` rather than stdin piping because some |
| backends (Modal) don't reliably deliver stdin_data to chained |
| commands. Base64 output is shell-safe ([A-Za-z0-9+/=]) so single |
| quotes are fine. |
| """ |
| encoded = base64.b64encode(content.encode("utf-8")).decode("ascii") |
| quoted_remote_path = shlex.quote(remote_path) |
| env.execute( |
| f"echo '{encoded}' | base64 -d > {quoted_remote_path}", |
| cwd="/", |
| timeout=30, |
| ) |
|
|
|
|
| def _env_temp_dir(env: Any) -> str: |
| """Return a writable temp dir for env-backed execute_code sandboxes.""" |
| get_temp_dir = getattr(env, "get_temp_dir", None) |
| if callable(get_temp_dir): |
| try: |
| temp_dir = get_temp_dir() |
| if isinstance(temp_dir, str) and temp_dir.startswith("/"): |
| return temp_dir.rstrip("/") or "/" |
| except Exception as exc: |
| logger.debug("Could not resolve execute_code env temp dir: %s", exc) |
| candidate = tempfile.gettempdir() |
| if isinstance(candidate, str) and candidate.startswith("/"): |
| return candidate.rstrip("/") or "/" |
| return "/tmp" |
|
|
|
|
def _rpc_poll_loop(
    env,
    rpc_dir: str,
    task_id: str,
    tool_call_log: list,
    tool_call_counter: list,
    max_tool_calls: int,
    allowed_tools: frozenset,
    stop_event: threading.Event,
):
    """Poll the remote filesystem for tool call requests and dispatch them.

    Intended to run in a background thread until *stop_event* is set. Each
    cycle lists ``req_*`` files in *rpc_dir* through ``env.execute()``, reads
    each request, dispatches it (or builds an error reply), writes the reply
    to ``res_<seq>`` via a temp file plus ``mv``, and finally deletes the
    request file.

    Requests that are not a JSON object with an integer ``seq`` are deleted
    without a reply, since the response file name cannot be derived.

    Args:
        env: Environment object exposing ``execute(command, cwd=, timeout=)``
            that returns a dict with an ``"output"`` entry.
        rpc_dir: Remote directory holding request/response files.
        task_id: Passed through to ``handle_function_call``.
        tool_call_log: List that receives one summary dict per dispatched call.
        tool_call_counter: One-element list; index 0 counts dispatched calls.
        max_tool_calls: Maximum number of dispatched calls.
        allowed_tools: Tool names the script is allowed to invoke.
        stop_event: Set by the caller to end the loop.
    """
    from model_tools import handle_function_call

    poll_interval = 0.1

    quoted_rpc_dir = shlex.quote(rpc_dir)
    while not stop_event.is_set():
        try:
            # List pending request files; "|| true" keeps an empty
            # directory from being reported as a failure.
            ls_result = env.execute(
                f"ls -1 {quoted_rpc_dir}/req_* 2>/dev/null || true",
                cwd="/",
                timeout=10,
            )
            output = ls_result.get("output", "").strip()
            if not output:
                stop_event.wait(poll_interval)
                continue

            # Skip in-progress ".tmp" files and any stray output lines.
            req_files = sorted(
                name
                for name in (raw.strip() for raw in output.split("\n"))
                if name and not name.endswith(".tmp") and "/req_" in name
            )

            for req_file in req_files:
                if stop_event.is_set():
                    break

                call_start = time.monotonic()

                quoted_req_file = shlex.quote(req_file)

                read_result = env.execute(
                    f"cat {quoted_req_file}",
                    cwd="/",
                    timeout=10,
                )
                try:
                    request = json.loads(read_result.get("output", ""))
                except (json.JSONDecodeError, ValueError, TypeError):
                    request = None

                # Without a JSON object and an integer seq there is no
                # response file to write. Drop the request so it is not
                # re-read (and re-failed) on every subsequent poll.
                seq = request.get("seq", 0) if isinstance(request, dict) else None
                if not isinstance(seq, int):
                    logger.debug("Malformed RPC request in %s", req_file)
                    env.execute(f"rm -f {quoted_req_file}", cwd="/", timeout=5)
                    continue

                tool_name = request.get("tool", "")
                tool_args = request.get("args", {})
                seq_str = f"{seq:06d}"
                res_file = f"{rpc_dir}/res_{seq_str}"
                quoted_res_file = shlex.quote(res_file)

                # Enforce the allow-list.
                if tool_name not in allowed_tools:
                    available = ", ".join(sorted(allowed_tools))
                    tool_result = json.dumps({
                        "error": (
                            f"Tool '{tool_name}' is not available in execute_code. "
                            f"Available: {available}"
                        )
                    })
                # Enforce the call limit.
                elif tool_call_counter[0] >= max_tool_calls:
                    tool_result = json.dumps({
                        "error": (
                            f"Tool call limit reached ({max_tool_calls}). "
                            "No more tool calls allowed in this execution."
                        )
                    })
                else:
                    # Drop terminal parameters that are blocked in the sandbox.
                    if tool_name == "terminal" and isinstance(tool_args, dict):
                        for param in _TERMINAL_BLOCKED_PARAMS:
                            tool_args.pop(param, None)

                    # Dispatch with this process's stdout/stderr pointed at
                    # devnull for the duration of the call.
                    try:
                        _real_stdout, _real_stderr = sys.stdout, sys.stderr
                        devnull = open(os.devnull, "w")
                        try:
                            sys.stdout = devnull
                            sys.stderr = devnull
                            tool_result = handle_function_call(
                                tool_name, tool_args, task_id=task_id
                            )
                        finally:
                            sys.stdout, sys.stderr = _real_stdout, _real_stderr
                            devnull.close()
                    except Exception as exc:
                        logger.error("Tool call failed in remote sandbox: %s",
                                     exc, exc_info=True)
                        tool_result = tool_error(str(exc))

                    tool_call_counter[0] += 1
                    call_duration = time.monotonic() - call_start
                    tool_call_log.append({
                        "tool": tool_name,
                        "args_preview": str(tool_args)[:80],
                        "duration": round(call_duration, 2),
                    })

                # Write the response through a temp file and "mv" so the
                # script never observes a partially written res_ file.
                # Base64 keeps the payload safe inside single quotes.
                encoded_result = base64.b64encode(
                    tool_result.encode("utf-8")
                ).decode("ascii")
                env.execute(
                    f"echo '{encoded_result}' | base64 -d > {quoted_res_file}.tmp"
                    f" && mv {quoted_res_file}.tmp {quoted_res_file}",
                    cwd="/",
                    timeout=60,
                )

                # The request has been answered; remove it.
                env.execute(f"rm -f {quoted_req_file}", cwd="/", timeout=5)

        except Exception as e:
            if not stop_event.is_set():
                logger.debug("RPC poll error: %s", e, exc_info=True)

        if not stop_event.is_set():
            stop_event.wait(poll_interval)
|
|
|
|
def _execute_remote(
    code: str,
    task_id: Optional[str],
    enabled_tools: Optional[List[str]],
) -> str:
    """Run a script on the remote terminal backend via file-based RPC.

    The script and the generated hermes_tools.py module are shipped to
    the remote environment, and tool calls are proxied through a polling
    thread that communicates via request/response files.

    Args:
        code: Python source to execute remotely.
        task_id: Session task ID; a falsy value maps to ``"default"``.
        enabled_tools: Tool names enabled in the session. The sandbox gets
            their intersection with SANDBOX_ALLOWED_TOOLS, or the full
            allow-list when that intersection is empty.

    Returns:
        JSON string with ``status``, ``tool_calls_made`` and
        ``duration_seconds``, plus ``output`` and/or ``error``.
    """
    _cfg = _load_config()
    timeout = _cfg.get("timeout", DEFAULT_TIMEOUT)
    max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS)

    session_tools = set(enabled_tools) if enabled_tools else set()
    sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools)
    if not sandbox_tools:
        sandbox_tools = SANDBOX_ALLOWED_TOOLS

    effective_task_id = task_id or "default"
    env, env_type = _get_or_create_env(effective_task_id)

    # Each run gets its own directory under the environment's temp dir.
    sandbox_id = uuid.uuid4().hex[:12]
    temp_dir = _env_temp_dir(env)
    sandbox_dir = f"{temp_dir}/hermes_exec_{sandbox_id}"
    rpc_dir = f"{sandbox_dir}/rpc"
    quoted_sandbox_dir = shlex.quote(sandbox_dir)
    quoted_rpc_dir = shlex.quote(rpc_dir)

    tool_call_log: list = []
    tool_call_counter = [0]
    exec_start = time.monotonic()
    stop_event = threading.Event()
    rpc_thread = None

    try:
        # The script runs under python3 inside the environment.
        py_check = env.execute(
            "command -v python3 >/dev/null 2>&1 && echo OK",
            cwd="/", timeout=15,
        )
        if "OK" not in py_check.get("output", ""):
            return json.dumps({
                "status": "error",
                "error": (
                    f"Python 3 is not available in the {env_type} terminal "
                    "environment. Install Python to use execute_code with "
                    "remote backends."
                ),
                "tool_calls_made": 0,
                "duration_seconds": 0,
            })

        # mkdir -p on the rpc dir also creates the sandbox dir itself.
        env.execute(
            f"mkdir -p {quoted_rpc_dir}", cwd="/", timeout=10,
        )

        # Ship the generated stub module and the user script.
        tools_src = generate_hermes_tools_module(
            list(sandbox_tools), transport="file",
        )
        _ship_file_to_remote(env, f"{sandbox_dir}/hermes_tools.py", tools_src)
        _ship_file_to_remote(env, f"{sandbox_dir}/script.py", code)

        # Start serving RPC requests before the script starts.
        rpc_thread = threading.Thread(
            target=_rpc_poll_loop,
            args=(
                env, rpc_dir, effective_task_id,
                tool_call_log, tool_call_counter, max_tool_calls,
                sandbox_tools, stop_event,
            ),
            daemon=True,
        )
        rpc_thread.start()

        # Environment variables are passed as a shell prefix, so every
        # value must be shell-quoted.
        env_prefix = (
            f"HERMES_RPC_DIR={quoted_rpc_dir} "
            f"PYTHONDONTWRITEBYTECODE=1"
        )
        tz = os.getenv("HERMES_TIMEZONE", "").strip()
        if tz:
            env_prefix += f" TZ={shlex.quote(tz)}"

        logger.info("Executing code on %s backend (task %s)...",
                    env_type, effective_task_id[:8])
        script_result = env.execute(
            f"cd {quoted_sandbox_dir} && {env_prefix} python3 script.py",
            timeout=timeout,
        )

        stdout_text = script_result.get("output", "")
        exit_code = script_result.get("returncode", -1)
        status = "success"

        # Exit code 124 is treated as a timeout and 130 as an interrupt.
        if exit_code == 124:
            status = "timeout"
        elif exit_code == 130:
            status = "interrupted"

    except Exception as exc:
        duration = round(time.monotonic() - exec_start, 2)
        logger.error(
            "execute_code remote failed after %ss with %d tool calls: %s: %s",
            duration, tool_call_counter[0], type(exc).__name__, exc,
            exc_info=True,
        )
        return json.dumps({
            "status": "error",
            "error": str(exc),
            "tool_calls_made": tool_call_counter[0],
            "duration_seconds": duration,
        }, ensure_ascii=False)

    finally:
        # Stop the poller before removing the directory it reads from.
        stop_event.set()
        if rpc_thread is not None:
            rpc_thread.join(timeout=5)

        # Best-effort cleanup of the remote sandbox directory.
        try:
            env.execute(
                f"rm -rf {quoted_sandbox_dir}", cwd="/", timeout=15,
            )
        except Exception:
            logger.debug("Failed to clean up remote sandbox %s", sandbox_dir)

    duration = round(time.monotonic() - exec_start, 2)

    # Keep the first 40% and the last 60% of an over-long output.
    if len(stdout_text) > MAX_STDOUT_BYTES:
        head_bytes = int(MAX_STDOUT_BYTES * 0.4)
        tail_bytes = MAX_STDOUT_BYTES - head_bytes
        head = stdout_text[:head_bytes]
        tail = stdout_text[-tail_bytes:]
        omitted = len(stdout_text) - len(head) - len(tail)
        stdout_text = (
            head
            + f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted "
            f"out of {len(stdout_text):,} total] ...\n\n"
            + tail
        )

    # Strip ANSI escape sequences from the output.
    from tools.ansi_strip import strip_ansi
    stdout_text = strip_ansi(stdout_text)

    # Redact sensitive text before the output is returned.
    from agent.redact import redact_sensitive_text
    stdout_text = redact_sensitive_text(stdout_text)

    result: Dict[str, Any] = {
        "status": status,
        "output": stdout_text,
        "tool_calls_made": tool_call_counter[0],
        "duration_seconds": duration,
    }

    if status == "timeout":
        timeout_msg = f"Script timed out after {timeout}s and was killed."
        result["error"] = timeout_msg
        # Repeat the message in the output so it is visible there as well.
        if stdout_text:
            result["output"] = stdout_text + f"\n\n⏰ {timeout_msg}"
        else:
            result["output"] = f"⏰ {timeout_msg}"
        logger.warning(
            "execute_code (remote) timed out after %ss (limit %ss) with %d tool calls",
            duration, timeout, tool_call_counter[0],
        )
    elif status == "interrupted":
        result["output"] = (
            stdout_text + "\n[execution interrupted — user sent a new message]"
        )
    elif exit_code != 0:
        result["status"] = "error"
        result["error"] = f"Script exited with code {exit_code}"

    return json.dumps(result, ensure_ascii=False)
|
|
|
|
| |
| |
| |
|
|
| def execute_code( |
| code: str, |
| task_id: Optional[str] = None, |
| enabled_tools: Optional[List[str]] = None, |
| ) -> str: |
| """ |
| Run a Python script in a sandboxed child process with RPC access |
| to a subset of Hermes tools. |
| |
| Dispatches to the local (UDS) or remote (file-based RPC) path |
| depending on the configured terminal backend. |
| |
| Args: |
| code: Python source code to execute. |
| task_id: Session task ID for tool isolation (terminal env, etc.). |
| enabled_tools: Tool names enabled in the current session. The sandbox |
| gets the intersection with SANDBOX_ALLOWED_TOOLS. |
| |
| Returns: |
| JSON string with execution results. |
| """ |
| if not SANDBOX_AVAILABLE: |
| return json.dumps({ |
| "error": "execute_code is not available on Windows. Use normal tool calls instead." |
| }) |
|
|
| if not code or not code.strip(): |
| return tool_error("No code provided.") |
|
|
| |
| from tools.terminal_tool import _get_env_config |
| env_type = _get_env_config()["env_type"] |
| if env_type != "local": |
| return _execute_remote(code, task_id, enabled_tools) |
|
|
| |
|
|
| |
| from tools.interrupt import is_interrupted as _is_interrupted |
|
|
| |
| _cfg = _load_config() |
| timeout = _cfg.get("timeout", DEFAULT_TIMEOUT) |
| max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS) |
|
|
| |
| session_tools = set(enabled_tools) if enabled_tools else set() |
| sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools) |
|
|
| if not sandbox_tools: |
| sandbox_tools = SANDBOX_ALLOWED_TOOLS |
|
|
| |
| tmpdir = tempfile.mkdtemp(prefix="hermes_sandbox_") |
| |
| |
| |
| _sock_tmpdir = "/tmp" if sys.platform == "darwin" else tempfile.gettempdir() |
| sock_path = os.path.join(_sock_tmpdir, f"hermes_rpc_{uuid.uuid4().hex}.sock") |
|
|
| tool_call_log: list = [] |
| tool_call_counter = [0] |
| exec_start = time.monotonic() |
| server_sock = None |
|
|
| try: |
| |
| |
| |
| tools_src = generate_hermes_tools_module(list(sandbox_tools)) |
| with open(os.path.join(tmpdir, "hermes_tools.py"), "w") as f: |
| f.write(tools_src) |
|
|
| |
| with open(os.path.join(tmpdir, "script.py"), "w") as f: |
| f.write(code) |
|
|
| |
| server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| server_sock.bind(sock_path) |
| os.chmod(sock_path, 0o600) |
| server_sock.listen(1) |
|
|
| rpc_thread = threading.Thread( |
| target=_rpc_server_loop, |
| args=( |
| server_sock, task_id, tool_call_log, |
| tool_call_counter, max_tool_calls, sandbox_tools, |
| ), |
| daemon=True, |
| ) |
| rpc_thread.start() |
|
|
| |
| |
| |
| |
| |
| |
| |
| _SAFE_ENV_PREFIXES = ("PATH", "HOME", "USER", "LANG", "LC_", "TERM", |
| "TMPDIR", "TMP", "TEMP", "SHELL", "LOGNAME", |
| "XDG_", "PYTHONPATH", "VIRTUAL_ENV", "CONDA", |
| "HERMES_") |
| _SECRET_SUBSTRINGS = ("KEY", "TOKEN", "SECRET", "PASSWORD", "CREDENTIAL", |
| "PASSWD", "AUTH") |
| try: |
| from tools.env_passthrough import is_env_passthrough as _is_passthrough |
| except Exception: |
| _is_passthrough = lambda _: False |
| child_env = {} |
| for k, v in os.environ.items(): |
| |
| if _is_passthrough(k): |
| child_env[k] = v |
| continue |
| |
| if any(s in k.upper() for s in _SECRET_SUBSTRINGS): |
| continue |
| |
| if any(k.startswith(p) for p in _SAFE_ENV_PREFIXES): |
| child_env[k] = v |
| child_env["HERMES_RPC_SOCKET"] = sock_path |
| child_env["PYTHONDONTWRITEBYTECODE"] = "1" |
| |
| |
| |
| |
| _hermes_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| _existing_pp = child_env.get("PYTHONPATH", "") |
| _pp_parts = [tmpdir, _hermes_root] |
| if _existing_pp: |
| _pp_parts.append(_existing_pp) |
| child_env["PYTHONPATH"] = os.pathsep.join(_pp_parts) |
| |
| |
| |
| |
| _tz_name = os.getenv("HERMES_TIMEZONE", "").strip() |
| if _tz_name: |
| child_env["TZ"] = _tz_name |
| child_env.pop("HERMES_TIMEZONE", None) |
|
|
| |
| |
| from hermes_constants import get_subprocess_home |
| _profile_home = get_subprocess_home() |
| if _profile_home: |
| child_env["HOME"] = _profile_home |
|
|
| |
| |
| |
| |
| |
| _mode = _get_execution_mode() |
| _child_python = _resolve_child_python(_mode) |
| _child_cwd = _resolve_child_cwd(_mode, tmpdir) |
| _script_path = os.path.join(tmpdir, "script.py") |
|
|
| proc = subprocess.Popen( |
| [_child_python, _script_path], |
| cwd=_child_cwd, |
| env=child_env, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| stdin=subprocess.DEVNULL, |
| preexec_fn=None if _IS_WINDOWS else os.setsid, |
| ) |
|
|
| |
| deadline = time.monotonic() + timeout |
| stderr_chunks: list = [] |
|
|
| |
| |
| |
| |
| _STDOUT_HEAD_BYTES = int(MAX_STDOUT_BYTES * 0.4) |
| _STDOUT_TAIL_BYTES = MAX_STDOUT_BYTES - _STDOUT_HEAD_BYTES |
|
|
| def _drain(pipe, chunks, max_bytes): |
| """Simple head-only drain (used for stderr).""" |
| total = 0 |
| try: |
| while True: |
| data = pipe.read(4096) |
| if not data: |
| break |
| if total < max_bytes: |
| keep = max_bytes - total |
| chunks.append(data[:keep]) |
| total += len(data) |
| except (ValueError, OSError) as e: |
| logger.debug("Error reading process output: %s", e, exc_info=True) |
|
|
| stdout_total_bytes = [0] |
|
|
def _drain_head_tail(pipe, head_chunks, tail_chunks, head_bytes, tail_bytes, total_ref):
    """Drain ``pipe`` to EOF, keeping the first ``head_bytes`` and last ``tail_bytes``.

    Args:
        pipe: Readable object whose ``read(4096)`` returns a falsy value at EOF.
        head_chunks: List that receives the leading data, up to ``head_bytes``.
        tail_chunks: List that receives the trailing data once the pipe is
            exhausted, holding at most ``tail_bytes``.
        head_bytes: Budget for the head portion.
        tail_bytes: Budget for the tail portion.
        total_ref: One-element list; ``total_ref[0]`` is incremented by the
            size of every piece read, kept or not.

    The tail window is trimmed from its oldest end a partial piece at a time,
    so once more than ``head_bytes + tail_bytes`` have been read the tail
    holds exactly ``tail_bytes``.  ``ValueError`` and ``OSError`` raised by
    the read are logged at debug level and end the drain; whatever was
    collected so far is still handed back.
    """
    from collections import deque

    head_collected = 0
    tail_buf = deque()
    tail_collected = 0
    try:
        while True:
            data = pipe.read(4096)
            if not data:
                break
            total_ref[0] += len(data)

            # Fill the head budget first; any remainder flows into the tail.
            if head_collected < head_bytes:
                keep = min(len(data), head_bytes - head_collected)
                head_chunks.append(data[:keep])
                head_collected += keep
                data = data[keep:]
                if not data:
                    continue

            tail_buf.append(data)
            tail_collected += len(data)

            # Evict from the oldest end until the window fits.  Trim the
            # oldest piece instead of dropping it whole, otherwise the tail
            # can fall short of its budget by up to one full read.
            while tail_collected > tail_bytes and tail_buf:
                excess = tail_collected - tail_bytes
                oldest = tail_buf[0]
                if len(oldest) <= excess:
                    tail_buf.popleft()
                    tail_collected -= len(oldest)
                else:
                    tail_buf[0] = oldest[excess:]
                    tail_collected = tail_bytes
    except (ValueError, OSError) as e:
        logger.debug("Error reading process output: %s", e, exc_info=True)

    tail_chunks.extend(tail_buf)
|
|
| stdout_head_chunks: list = [] |
| stdout_tail_chunks: list = [] |
|
|
| stdout_reader = threading.Thread( |
| target=_drain_head_tail, |
| args=(proc.stdout, stdout_head_chunks, stdout_tail_chunks, |
| _STDOUT_HEAD_BYTES, _STDOUT_TAIL_BYTES, stdout_total_bytes), |
| daemon=True |
| ) |
| stderr_reader = threading.Thread( |
| target=_drain, args=(proc.stderr, stderr_chunks, MAX_STDERR_BYTES), daemon=True |
| ) |
| stdout_reader.start() |
| stderr_reader.start() |
|
|
| status = "success" |
| _activity_state = { |
| "last_touch": time.monotonic(), |
| "start": exec_start, |
| } |
| while proc.poll() is None: |
| if _is_interrupted(): |
| _kill_process_group(proc) |
| status = "interrupted" |
| break |
| if time.monotonic() > deadline: |
| _kill_process_group(proc, escalate=True) |
| status = "timeout" |
| break |
| |
| |
| try: |
| from tools.environments.base import touch_activity_if_due |
| touch_activity_if_due(_activity_state, "execute_code running") |
| except Exception: |
| pass |
| time.sleep(0.2) |
|
|
| |
| stdout_reader.join(timeout=3) |
| stderr_reader.join(timeout=3) |
|
|
| stdout_head = b"".join(stdout_head_chunks).decode("utf-8", errors="replace") |
| stdout_tail = b"".join(stdout_tail_chunks).decode("utf-8", errors="replace") |
| stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") |
|
|
| |
| total_stdout = stdout_total_bytes[0] |
| if total_stdout > MAX_STDOUT_BYTES and stdout_tail: |
| omitted = total_stdout - len(stdout_head) - len(stdout_tail) |
| truncated_notice = ( |
| f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted " |
| f"out of {total_stdout:,} total] ...\n\n" |
| ) |
| stdout_text = stdout_head + truncated_notice + stdout_tail |
| else: |
| stdout_text = stdout_head + stdout_tail |
|
|
| exit_code = proc.returncode if proc.returncode is not None else -1 |
| duration = round(time.monotonic() - exec_start, 2) |
|
|
| |
| server_sock.close() |
| server_sock = None |
| rpc_thread.join(timeout=3) |
|
|
| |
| |
| from tools.ansi_strip import strip_ansi |
| stdout_text = strip_ansi(stdout_text) |
| stderr_text = strip_ansi(stderr_text) |
|
|
| |
| |
| |
| |
| from agent.redact import redact_sensitive_text |
| stdout_text = redact_sensitive_text(stdout_text) |
| stderr_text = redact_sensitive_text(stderr_text) |
|
|
| |
| result: Dict[str, Any] = { |
| "status": status, |
| "output": stdout_text, |
| "tool_calls_made": tool_call_counter[0], |
| "duration_seconds": duration, |
| } |
|
|
| if status == "timeout": |
| timeout_msg = f"Script timed out after {timeout}s and was killed." |
| result["error"] = timeout_msg |
| |
| |
| |
| |
| if stdout_text: |
| result["output"] = stdout_text + f"\n\nโฐ {timeout_msg}" |
| else: |
| result["output"] = f"โฐ {timeout_msg}" |
| logger.warning( |
| "execute_code timed out after %ss (limit %ss) with %d tool calls", |
| duration, timeout, tool_call_counter[0], |
| ) |
| elif status == "interrupted": |
| result["output"] = stdout_text + "\n[execution interrupted โ user sent a new message]" |
| elif exit_code != 0: |
| result["status"] = "error" |
| result["error"] = stderr_text or f"Script exited with code {exit_code}" |
| |
| if stderr_text: |
| result["output"] = stdout_text + "\n--- stderr ---\n" + stderr_text |
|
|
| return json.dumps(result, ensure_ascii=False) |
|
|
| except Exception as exc: |
| duration = round(time.monotonic() - exec_start, 2) |
| logger.error( |
| "execute_code failed after %ss with %d tool calls: %s: %s", |
| duration, |
| tool_call_counter[0], |
| type(exc).__name__, |
| exc, |
| exc_info=True, |
| ) |
| return json.dumps({ |
| "status": "error", |
| "error": str(exc), |
| "tool_calls_made": tool_call_counter[0], |
| "duration_seconds": duration, |
| }, ensure_ascii=False) |
|
|
| finally: |
| |
| if server_sock is not None: |
| try: |
| server_sock.close() |
| except OSError as e: |
| logger.debug("Server socket close error: %s", e) |
| import shutil |
| shutil.rmtree(tmpdir, ignore_errors=True) |
| try: |
| os.unlink(sock_path) |
| except OSError: |
| pass |
|
|
|
|
def _kill_process_group(proc, escalate: bool = False):
    """Terminate ``proc`` together with its process group.

    A SIGTERM is sent to the child's process group (``proc.terminate()`` on
    Windows).  When ``escalate`` is true the child is then given five seconds
    to exit; if it is still running, SIGKILL is sent to the group
    (``proc.kill()`` on Windows).  If signalling the group fails with
    ``ProcessLookupError`` or ``PermissionError`` the failure is logged at
    debug level and ``proc.kill()`` is attempted as a fallback.
    """

    def _signal_group(sig, force, failure_msg):
        # One delivery attempt, shared by the SIGTERM and SIGKILL phases.
        try:
            if not _IS_WINDOWS:
                os.killpg(os.getpgid(proc.pid), sig)
            elif force:
                proc.kill()
            else:
                proc.terminate()
        except (ProcessLookupError, PermissionError) as e:
            logger.debug(failure_msg, e, exc_info=True)
            try:
                proc.kill()
            except Exception as e2:
                logger.debug("Could not kill process: %s", e2, exc_info=True)

    _signal_group(signal.SIGTERM, False, "Could not kill process group: %s")

    if not escalate:
        return

    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        _signal_group(
            signal.SIGKILL, True, "Could not kill process group with SIGKILL: %s"
        )
|
|
|
|
def _load_config() -> dict:
    """Return the ``code_execution`` section of ``cli.CLI_CONFIG``.

    Best-effort: if ``cli`` cannot be imported or the lookup fails for any
    reason, an empty dict is returned.  An empty dict is also returned when
    the section exists but is not a mapping (for example a bare
    ``code_execution:`` key that loads as ``None``), so callers can always
    call ``.get()`` on the result.
    """
    from collections.abc import Mapping

    try:
        from cli import CLI_CONFIG
        section = CLI_CONFIG.get("code_execution", {})
    except Exception:
        return {}
    return section if isinstance(section, Mapping) else {}
|
|
|
|
| |
| |
| |
|
|
| |
| |
# Values accepted for ``code_execution.mode``.
EXECUTION_MODES = ("project", "strict")
# Mode used when the configured value is missing or not in EXECUTION_MODES.
DEFAULT_EXECUTION_MODE = "project"
|
|
|
|
def _get_execution_mode() -> str:
    """Return the active execute_code mode — 'project' or 'strict'.

    Reads ``mode`` from the ``code_execution`` config section.  A missing or
    ``None`` value yields ``DEFAULT_EXECUTION_MODE`` silently; any other value
    is stringified, stripped and lower-cased, and if it is not one of
    ``EXECUTION_MODES`` a warning is logged and the default is returned.

    Mode semantics:
    - ``project`` (default): scripts run in the session's working directory
      with the active virtual environment's python, so project dependencies
      (pandas, torch, project packages) and files resolve naturally.
    - ``strict``: scripts run in an isolated temp directory with
      ``sys.executable`` (hermes-agent's python). Reproducible and the
      interpreter is guaranteed to work, but project deps and relative paths
      won't resolve.

    Env scrubbing and tool whitelist apply identically in both modes.
    """
    try:
        raw_mode = _load_config().get("mode")
    except AttributeError:
        # The config section was not a mapping (e.g. it loaded as None).
        raw_mode = None
    if raw_mode is None:
        return DEFAULT_EXECUTION_MODE

    cfg_value = str(raw_mode).strip().lower()
    if cfg_value in EXECUTION_MODES:
        return cfg_value
    logger.warning(
        "Ignoring code_execution.mode=%r (expected one of %s), falling back to %r",
        cfg_value, EXECUTION_MODES, DEFAULT_EXECUTION_MODE,
    )
    return DEFAULT_EXECUTION_MODE
|
|
|
|
@functools.lru_cache(maxsize=32)
def _is_usable_python(python_path: str) -> bool:
    """Report whether ``python_path`` launches and is Python 3.8 or newer.

    The interpreter is probed with a one-line script that exits 0 only when
    ``sys.version_info >= (3, 8)``.  Failure to launch, a timeout after five
    seconds, or a non-zero exit status all count as unusable.  Results are
    memoised per path (up to 32 entries) so repeated execute_code calls do
    not spawn the probe again.
    """
    version_probe = "import sys; sys.exit(0 if sys.version_info >= (3, 8) else 1)"
    try:
        completed = subprocess.run(
            [python_path, "-c", version_probe],
            capture_output=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        # subprocess.TimeoutExpired derives from SubprocessError, so a hung
        # interpreter lands here as well.
        return False
    return completed.returncode == 0
|
|
|
|
def _resolve_child_python(mode: str) -> str:
    """Choose the interpreter that will run the execute_code script.

    Any mode other than ``project`` (i.e. ``strict``) gets ``sys.executable``.

    In ``project`` mode the environments named by ``VIRTUAL_ENV`` and then
    ``CONDA_PREFIX`` are inspected, looking for ``python``/``python3`` under
    ``bin`` (``python.exe``/``python3.exe`` under ``Scripts`` on Windows).
    The first candidate that exists and is executable decides the outcome:
    it is returned if it passes the Python 3.8+ check, otherwise the skip is
    logged and ``sys.executable`` is returned.  ``sys.executable`` is also
    the result when neither variable yields an executable candidate.
    """
    fallback = sys.executable
    if mode != "project":
        return fallback

    if _IS_WINDOWS:
        bin_dir, exe_names = "Scripts", ("python.exe", "python3.exe")
    else:
        bin_dir, exe_names = "bin", ("python", "python3")

    for var in ("VIRTUAL_ENV", "CONDA_PREFIX"):
        root = os.environ.get(var, "").strip()
        if not root:
            continue
        for exe in exe_names:
            candidate = os.path.join(root, bin_dir, exe)
            runnable = os.path.isfile(candidate) and os.access(candidate, os.X_OK)
            if not runnable:
                continue
            if not _is_usable_python(candidate):
                # The interpreter exists but failed the version probe: stop
                # searching and use the agent's own python.
                logger.info(
                    "execute_code: skipping %s=%s (Python version < 3.8 or broken). "
                    "Using sys.executable instead.", var, candidate,
                )
                return fallback
            return candidate

    return fallback
|
|
|
|
def _resolve_child_cwd(mode: str, staging_dir: str) -> str:
    """Resolve the working directory for the execute_code subprocess.

    Args:
        mode: Execution mode; anything other than ``"project"`` selects the
            staging directory.
        staging_dir: Temp directory holding the staged script.

    Returns:
        - non-``project`` modes: ``staging_dir``.
        - ``project``: ``TERMINAL_CWD`` (with ``~`` expanded) when it names an
          existing directory, else ``os.getcwd()`` when that is an existing
          directory, else ``staging_dir``.

    The final fallback also covers a current directory that has been deleted
    or is otherwise unreadable, so Popen is never handed a nonexistent cwd.
    """
    if mode != "project":
        return staging_dir

    raw = os.environ.get("TERMINAL_CWD", "").strip()
    if raw:
        expanded = os.path.expanduser(raw)
        if os.path.isdir(expanded):
            return expanded

    try:
        here = os.getcwd()
    except OSError:
        # os.getcwd() raises (FileNotFoundError on Linux/macOS) when the
        # process's working directory has been removed from under it.
        return staging_dir
    return here if os.path.isdir(here) else staging_dir
|
|
|
|
| |
| |
| |
|
|
| |
| |
# (tool name, two-line doc snippet) pairs, in the order they are listed in the
# execute_code schema description.  Each snippet is a signature line followed
# by a one-line note about the call.
_TOOL_DOC_LINES = [
    (
        "web_search",
        ' web_search(query: str, limit: int = 5) -> dict\n'
        ' Returns {"data": {"web": [{"url", "title", "description"}, ...]}}',
    ),
    (
        "web_extract",
        ' web_extract(urls: list[str]) -> dict\n'
        ' Returns {"results": [{"url", "title", "content", "error"}, ...]} where content is markdown',
    ),
    (
        "read_file",
        ' read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n'
        ' Lines are 1-indexed. Returns {"content": "...", "total_lines": N}',
    ),
    (
        "write_file",
        ' write_file(path: str, content: str) -> dict\n'
        ' Always overwrites the entire file.',
    ),
    (
        "search_files",
        ' search_files(pattern: str, target="content", path=".", file_glob=None, limit=50) -> dict\n'
        ' target: "content" (search inside files) or "files" (find files by name). Returns {"matches": [...]}',
    ),
    (
        "patch",
        ' patch(path: str, old_string: str, new_string: str, replace_all: bool = False) -> dict\n'
        ' Replaces old_string with new_string in the file.',
    ),
    (
        "terminal",
        ' terminal(command: str, timeout=None, workdir=None) -> dict\n'
        ' Foreground only (no background/pty). Returns {"output": "...", "exit_code": N}',
    ),
]
|
|
|
|
def build_execute_code_schema(enabled_sandbox_tools: Optional[set] = None,
                              mode: Optional[str] = None) -> dict:
    """Build the execute_code schema with description listing only enabled tools.

    When tools are disabled via ``hermes tools`` (e.g. web is turned off),
    the schema description should NOT mention web_search / web_extract —
    otherwise the model thinks they are available and keeps trying to use them.

    Args:
        enabled_sandbox_tools: Names of the tools to document.  ``None``
            means every tool in ``SANDBOX_ALLOWED_TOOLS``.
        mode: Controls the working-directory sentence in the description:
            ``'strict'`` says scripts run in a temp dir (not the session's
            CWD); any other value uses the ``'project'`` wording (session CWD
            with the active venv's python).  ``None`` reads the current
            ``code_execution.mode`` config.

    Returns:
        A dict with ``name``, ``description`` and ``parameters`` keys, where
        ``parameters`` declares the single required string property ``code``.
    """
    if enabled_sandbox_tools is None:
        enabled_sandbox_tools = SANDBOX_ALLOWED_TOOLS
    if mode is None:
        mode = _get_execution_mode()

    # Only document tools that are actually enabled, in table order.
    tool_lines = "\n".join(
        doc for name, doc in _TOOL_DOC_LINES if name in enabled_sandbox_tools
    )

    # Import example: prefer web_search/terminal when enabled, otherwise the
    # first two enabled names alphabetically.
    import_examples = [n for n in ("web_search", "terminal") if n in enabled_sandbox_tools]
    if not import_examples:
        import_examples = sorted(enabled_sandbox_tools)[:2]
    if import_examples:
        import_str = ", ".join(import_examples) + ", ..."
    else:
        import_str = "..."

    # Tell the model where its script runs so it picks sensible paths.
    if mode == "strict":
        cwd_note = (
            "Scripts run in their own temp dir, not the session's CWD — use absolute paths "
            "(os.path.expanduser('~/.hermes/.env')) or terminal()/read_file() for user files."
        )
    else:
        cwd_note = (
            "Scripts run in the session's working directory with the active venv's python, "
            "so project deps (pandas, etc.) and relative paths work like in terminal()."
        )

    description = (
        "Run a Python script that can call Hermes tools programmatically. "
        "Use this when you need 3+ tool calls with processing logic between them, "
        "need to filter/reduce large tool outputs before they enter your context, "
        "need conditional branching (if X then Y else Z), or need to loop "
        "(fetch N pages, process N files, retry on failure).\n\n"
        "Use normal tool calls instead when: single tool call with no processing, "
        "you need to see the full result and apply complex reasoning, "
        "or the task requires interactive user input.\n\n"
        "Available via `from hermes_tools import ...`:\n\n"
        f"{tool_lines}\n\n"
        "Limits: 5-minute timeout, 50KB stdout cap, max 50 tool calls per script. "
        "terminal() is foreground-only (no background or pty).\n\n"
        f"{cwd_note}\n\n"
        "Print your final result to stdout. Use Python stdlib (json, re, math, csv, "
        "datetime, collections, etc.) for processing between tool calls.\n\n"
        "Also available (no import needed — built into hermes_tools):\n"
        " json_parse(text: str) — json.loads with strict=False; use for terminal() output with control chars\n"
        " shell_quote(s: str) — shlex.quote(); use when interpolating dynamic strings into shell commands\n"
        " retry(fn, max_attempts=3, delay=2) — retry with exponential backoff for transient failures"
    )

    return {
        "name": "execute_code",
        "description": description,
        "parameters": {
            "type": "object",
            "properties": {
                "code": {
                    "type": "string",
                    "description": (
                        "Python code to execute. Import tools with "
                        f"`from hermes_tools import {import_str}` "
                        "and print your final result to stdout."
                    ),
                },
            },
            "required": ["code"],
        },
    }
|
|
|
|
| |
| |
# Schema built once at import time with the defaults: every tool in
# SANDBOX_ALLOWED_TOOLS documented and the currently configured mode.
EXECUTE_CODE_SCHEMA = build_execute_code_schema()

from tools.registry import registry, tool_error

# Register execute_code under the "code_execution" toolset.  The handler pulls
# the script from the tool arguments and forwards the task id and enabled-tool
# list supplied by the dispatcher as keyword arguments.
registry.register(
    name="execute_code",
    toolset="code_execution",
    schema=EXECUTE_CODE_SCHEMA,
    handler=lambda args, **kw: execute_code(
        code=args.get("code", ""),
        task_id=kw.get("task_id"),
        enabled_tools=kw.get("enabled_tools")),
    check_fn=check_sandbox_requirements,
    # NOTE(review): the previous literal was a mis-decoded 4-byte UTF-8
    # emoji; restored as the snake (Python) emoji — confirm against upstream.
    emoji="🐍",
    max_result_size_chars=100_000,
)
|
|