Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| Code Execution Tool -- Programmatic Tool Calling (PTC) | |
| Lets the LLM write a Python script that calls Hermes tools via RPC, | |
| collapsing multi-step tool chains into a single inference turn. | |
| Architecture (two transports): | |
| **Local backend (UDS):** | |
| 1. Parent generates a `hermes_tools.py` stub module with UDS RPC functions | |
| 2. Parent opens a Unix domain socket and starts an RPC listener thread | |
| 3. Parent spawns a child process that runs the LLM's script | |
| 4. Tool calls travel over the UDS back to the parent for dispatch | |
| **Remote backends (file-based RPC):** | |
| 1. Parent generates `hermes_tools.py` with file-based RPC stubs | |
| 2. Parent ships both files to the remote environment | |
| 3. Script runs inside the terminal backend (Docker/SSH/Modal/Daytona/etc.) | |
| 4. Tool calls are written as request files; a polling thread on the parent | |
| reads them via env.execute(), dispatches, and writes response files | |
| 5. The script polls for response files and continues | |
| In both cases, only the script's stdout is returned to the LLM; intermediate | |
| tool results never enter the context window. | |
| Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Windows. | |
| Remote execution additionally requires Python 3 in the terminal backend. | |
| """ | |
| import base64 | |
| import json | |
| import logging | |
| import os | |
| import platform | |
| import shlex | |
| import signal | |
| import socket | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import threading | |
| import time | |
| import uuid | |
| _IS_WINDOWS = platform.system() == "Windows" | |
| from typing import Any, Dict, List, Optional | |
| # Availability gate: UDS requires a POSIX OS | |
| logger = logging.getLogger(__name__) | |
| SANDBOX_AVAILABLE = sys.platform != "win32" | |
| # The 7 tools allowed inside the sandbox. The intersection of this list | |
| # and the session's enabled tools determines which stubs are generated. | |
| SANDBOX_ALLOWED_TOOLS = frozenset([ | |
| "web_search", | |
| "web_extract", | |
| "read_file", | |
| "write_file", | |
| "search_files", | |
| "patch", | |
| "terminal", | |
| ]) | |
| # Resource limit defaults (overridable via config.yaml → code_execution.*) | |
| DEFAULT_TIMEOUT = 300 # 5 minutes | |
| DEFAULT_MAX_TOOL_CALLS = 50 | |
| MAX_STDOUT_BYTES = 50_000 # 50 KB | |
| MAX_STDERR_BYTES = 10_000 # 10 KB | |
| def check_sandbox_requirements() -> bool: | |
| """Code execution sandbox requires a POSIX OS for Unix domain sockets.""" | |
| return SANDBOX_AVAILABLE | |
| # --------------------------------------------------------------------------- | |
| # hermes_tools.py code generator | |
| # --------------------------------------------------------------------------- | |
| # Per-tool stub templates: (function_name, signature, docstring, args_dict_expr) | |
| # The args_dict_expr builds the JSON payload sent over the RPC socket. | |
| _TOOL_STUBS = { | |
| "web_search": ( | |
| "web_search", | |
| "query: str, limit: int = 5", | |
| '"""Search the web. Returns dict with data.web list of {url, title, description}."""', | |
| '{"query": query, "limit": limit}', | |
| ), | |
| "web_extract": ( | |
| "web_extract", | |
| "urls: list", | |
| '"""Extract content from URLs. Returns dict with results list of {url, title, content, error}."""', | |
| '{"urls": urls}', | |
| ), | |
| "read_file": ( | |
| "read_file", | |
| "path: str, offset: int = 1, limit: int = 500", | |
| '"""Read a file (1-indexed lines). Returns dict with "content" and "total_lines"."""', | |
| '{"path": path, "offset": offset, "limit": limit}', | |
| ), | |
| "write_file": ( | |
| "write_file", | |
| "path: str, content: str", | |
| '"""Write content to a file (always overwrites). Returns dict with status."""', | |
| '{"path": path, "content": content}', | |
| ), | |
| "search_files": ( | |
| "search_files", | |
| 'pattern: str, target: str = "content", path: str = ".", file_glob: str = None, limit: int = 50, offset: int = 0, output_mode: str = "content", context: int = 0', | |
| '"""Search file contents (target="content") or find files by name (target="files"). Returns dict with "matches"."""', | |
| '{"pattern": pattern, "target": target, "path": path, "file_glob": file_glob, "limit": limit, "offset": offset, "output_mode": output_mode, "context": context}', | |
| ), | |
| "patch": ( | |
| "patch", | |
| 'path: str = None, old_string: str = None, new_string: str = None, replace_all: bool = False, mode: str = "replace", patch: str = None', | |
| '"""Targeted find-and-replace (mode="replace") or V4A multi-file patches (mode="patch"). Returns dict with status."""', | |
| '{"path": path, "old_string": old_string, "new_string": new_string, "replace_all": replace_all, "mode": mode, "patch": patch}', | |
| ), | |
| "terminal": ( | |
| "terminal", | |
| "command: str, timeout: int = None, workdir: str = None", | |
| '"""Run a shell command (foreground only). Returns dict with "output" and "exit_code"."""', | |
| '{"command": command, "timeout": timeout, "workdir": workdir}', | |
| ), | |
| } | |
| def generate_hermes_tools_module(enabled_tools: List[str], | |
| transport: str = "uds") -> str: | |
| """ | |
| Build the source code for the hermes_tools.py stub module. | |
| Only tools in both SANDBOX_ALLOWED_TOOLS and enabled_tools get stubs. | |
| Args: | |
| enabled_tools: Tool names enabled in the current session. | |
| transport: ``"uds"`` for Unix domain socket (local backend) or | |
| ``"file"`` for file-based RPC (remote backends). | |
| """ | |
| tools_to_generate = sorted(SANDBOX_ALLOWED_TOOLS & set(enabled_tools)) | |
| stub_functions = [] | |
| export_names = [] | |
| for tool_name in tools_to_generate: | |
| if tool_name not in _TOOL_STUBS: | |
| continue | |
| func_name, sig, doc, args_expr = _TOOL_STUBS[tool_name] | |
| stub_functions.append( | |
| f"def {func_name}({sig}):\n" | |
| f" {doc}\n" | |
| f" return _call({func_name!r}, {args_expr})\n" | |
| ) | |
| export_names.append(func_name) | |
| if transport == "file": | |
| header = _FILE_TRANSPORT_HEADER | |
| else: | |
| header = _UDS_TRANSPORT_HEADER | |
| return header + "\n".join(stub_functions) | |
| # ---- Shared helpers section (embedded in both transport headers) ---------- | |
| _COMMON_HELPERS = '''\ | |
| # --------------------------------------------------------------------------- | |
| # Convenience helpers (avoid common scripting pitfalls) | |
| # --------------------------------------------------------------------------- | |
| def json_parse(text: str): | |
| """Parse JSON tolerant of control characters (strict=False). | |
| Use this instead of json.loads() when parsing output from terminal() | |
| or web_extract() that may contain raw tabs/newlines in strings.""" | |
| return json.loads(text, strict=False) | |
| def shell_quote(s: str) -> str: | |
| """Shell-escape a string for safe interpolation into commands. | |
| Use this when inserting dynamic content into terminal() commands: | |
| terminal(f"echo {shell_quote(user_input)}") | |
| """ | |
| return shlex.quote(s) | |
| def retry(fn, max_attempts=3, delay=2): | |
| """Retry a function up to max_attempts times with exponential backoff. | |
| Use for transient failures (network errors, API rate limits): | |
| result = retry(lambda: terminal("gh issue list ...")) | |
| """ | |
| last_err = None | |
| for attempt in range(max_attempts): | |
| try: | |
| return fn() | |
| except Exception as e: | |
| last_err = e | |
| if attempt < max_attempts - 1: | |
| time.sleep(delay * (2 ** attempt)) | |
| raise last_err | |
| ''' | |
| # ---- UDS transport (local backend) --------------------------------------- | |
| _UDS_TRANSPORT_HEADER = '''\ | |
| """Auto-generated Hermes tools RPC stubs.""" | |
| import json, os, socket, shlex, time | |
| _sock = None | |
| ''' + _COMMON_HELPERS + '''\ | |
| def _connect(): | |
| global _sock | |
| if _sock is None: | |
| _sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
| _sock.connect(os.environ["HERMES_RPC_SOCKET"]) | |
| _sock.settimeout(300) | |
| return _sock | |
| def _call(tool_name, args): | |
| """Send a tool call to the parent process and return the parsed result.""" | |
| conn = _connect() | |
| request = json.dumps({"tool": tool_name, "args": args}) + "\\n" | |
| conn.sendall(request.encode()) | |
| buf = b"" | |
| while True: | |
| chunk = conn.recv(65536) | |
| if not chunk: | |
| raise RuntimeError("Agent process disconnected") | |
| buf += chunk | |
| if buf.endswith(b"\\n"): | |
| break | |
| raw = buf.decode().strip() | |
| result = json.loads(raw) | |
| if isinstance(result, str): | |
| try: | |
| return json.loads(result) | |
| except (json.JSONDecodeError, TypeError): | |
| return result | |
| return result | |
| ''' | |
| # ---- File-based transport (remote backends) ------------------------------- | |
| _FILE_TRANSPORT_HEADER = '''\ | |
| """Auto-generated Hermes tools RPC stubs (file-based transport).""" | |
| import json, os, shlex, tempfile, time | |
| _RPC_DIR = os.environ.get("HERMES_RPC_DIR") or os.path.join(tempfile.gettempdir(), "hermes_rpc") | |
| _seq = 0 | |
| ''' + _COMMON_HELPERS + '''\ | |
| def _call(tool_name, args): | |
| """Send a tool call request via file-based RPC and wait for response.""" | |
| global _seq | |
| _seq += 1 | |
| seq_str = f"{_seq:06d}" | |
| req_file = os.path.join(_RPC_DIR, f"req_{seq_str}") | |
| res_file = os.path.join(_RPC_DIR, f"res_{seq_str}") | |
| # Write request atomically (write to .tmp, then rename) | |
| tmp = req_file + ".tmp" | |
| with open(tmp, "w") as f: | |
| json.dump({"tool": tool_name, "args": args, "seq": _seq}, f) | |
| os.rename(tmp, req_file) | |
| # Wait for response with adaptive polling | |
| deadline = time.monotonic() + 300 # 5-minute timeout per tool call | |
| poll_interval = 0.05 # Start at 50ms | |
| while not os.path.exists(res_file): | |
| if time.monotonic() > deadline: | |
| raise RuntimeError(f"RPC timeout: no response for {tool_name} after 300s") | |
| time.sleep(poll_interval) | |
| poll_interval = min(poll_interval * 1.2, 0.25) # Back off to 250ms | |
| with open(res_file) as f: | |
| raw = f.read() | |
| # Clean up response file | |
| try: | |
| os.unlink(res_file) | |
| except OSError: | |
| pass | |
| result = json.loads(raw) | |
| if isinstance(result, str): | |
| try: | |
| return json.loads(result) | |
| except (json.JSONDecodeError, TypeError): | |
| return result | |
| return result | |
| ''' | |
| # --------------------------------------------------------------------------- | |
| # RPC server (runs in a thread inside the parent process) | |
| # --------------------------------------------------------------------------- | |
| # Terminal parameters that must not be used from ephemeral sandbox scripts | |
| _TERMINAL_BLOCKED_PARAMS = {"background", "pty", "notify_on_complete", "watch_patterns"} | |
| def _rpc_server_loop( | |
| server_sock: socket.socket, | |
| task_id: str, | |
| tool_call_log: list, | |
| tool_call_counter: list, # mutable [int] so the thread can increment | |
| max_tool_calls: int, | |
| allowed_tools: frozenset, | |
| ): | |
| """ | |
| Accept one client connection and dispatch tool-call requests until | |
| the client disconnects or the call limit is reached. | |
| """ | |
| from model_tools import handle_function_call | |
| conn = None | |
| try: | |
| server_sock.settimeout(5) | |
| conn, _ = server_sock.accept() | |
| conn.settimeout(300) | |
| buf = b"" | |
| while True: | |
| try: | |
| chunk = conn.recv(65536) | |
| except socket.timeout: | |
| break | |
| if not chunk: | |
| break | |
| buf += chunk | |
| # Process all complete newline-delimited messages in the buffer | |
| while b"\n" in buf: | |
| line, buf = buf.split(b"\n", 1) | |
| line = line.strip() | |
| if not line: | |
| continue | |
| call_start = time.monotonic() | |
| try: | |
| request = json.loads(line.decode()) | |
| except (json.JSONDecodeError, UnicodeDecodeError) as exc: | |
| resp = tool_error(f"Invalid RPC request: {exc}") | |
| conn.sendall((resp + "\n").encode()) | |
| continue | |
| tool_name = request.get("tool", "") | |
| tool_args = request.get("args", {}) | |
| # Enforce the allow-list | |
| if tool_name not in allowed_tools: | |
| available = ", ".join(sorted(allowed_tools)) | |
| resp = json.dumps({ | |
| "error": ( | |
| f"Tool '{tool_name}' is not available in execute_code. " | |
| f"Available: {available}" | |
| ) | |
| }) | |
| conn.sendall((resp + "\n").encode()) | |
| continue | |
| # Enforce tool call limit | |
| if tool_call_counter[0] >= max_tool_calls: | |
| resp = json.dumps({ | |
| "error": ( | |
| f"Tool call limit reached ({max_tool_calls}). " | |
| "No more tool calls allowed in this execution." | |
| ) | |
| }) | |
| conn.sendall((resp + "\n").encode()) | |
| continue | |
| # Strip forbidden terminal parameters | |
| if tool_name == "terminal" and isinstance(tool_args, dict): | |
| for param in _TERMINAL_BLOCKED_PARAMS: | |
| tool_args.pop(param, None) | |
| # Dispatch through the standard tool handler. | |
| # Suppress stdout/stderr from internal tool handlers so | |
| # their status prints don't leak into the CLI spinner. | |
| try: | |
| _real_stdout, _real_stderr = sys.stdout, sys.stderr | |
| devnull = open(os.devnull, "w") | |
| try: | |
| sys.stdout = devnull | |
| sys.stderr = devnull | |
| result = handle_function_call( | |
| tool_name, tool_args, task_id=task_id | |
| ) | |
| finally: | |
| sys.stdout, sys.stderr = _real_stdout, _real_stderr | |
| devnull.close() | |
| except Exception as exc: | |
| logger.error("Tool call failed in sandbox: %s", exc, exc_info=True) | |
| result = tool_error(str(exc)) | |
| tool_call_counter[0] += 1 | |
| call_duration = time.monotonic() - call_start | |
| # Log for observability | |
| args_preview = str(tool_args)[:80] | |
| tool_call_log.append({ | |
| "tool": tool_name, | |
| "args_preview": args_preview, | |
| "duration": round(call_duration, 2), | |
| }) | |
| conn.sendall((result + "\n").encode()) | |
| except socket.timeout: | |
| logger.debug("RPC listener socket timeout") | |
| except OSError as e: | |
| logger.debug("RPC listener socket error: %s", e, exc_info=True) | |
| finally: | |
| if conn: | |
| try: | |
| conn.close() | |
| except OSError as e: | |
| logger.debug("RPC conn close error: %s", e) | |
| # --------------------------------------------------------------------------- | |
| # Remote execution support (file-based RPC via terminal backend) | |
| # --------------------------------------------------------------------------- | |
| def _get_or_create_env(task_id: str): | |
| """Get or create the terminal environment for *task_id*. | |
| Reuses the same environment (container/sandbox/SSH session) that the | |
| terminal and file tools use, creating one if it doesn't exist yet. | |
| Returns ``(env, env_type)`` tuple. | |
| """ | |
| from tools.terminal_tool import ( | |
| _active_environments, _env_lock, _create_environment, | |
| _get_env_config, _last_activity, _start_cleanup_thread, | |
| _creation_locks, _creation_locks_lock, _task_env_overrides, | |
| ) | |
| effective_task_id = task_id or "default" | |
| # Fast path: environment already exists | |
| with _env_lock: | |
| if effective_task_id in _active_environments: | |
| _last_activity[effective_task_id] = time.time() | |
| return _active_environments[effective_task_id], _get_env_config()["env_type"] | |
| # Slow path: create environment (same pattern as file_tools._get_file_ops) | |
| with _creation_locks_lock: | |
| if effective_task_id not in _creation_locks: | |
| _creation_locks[effective_task_id] = threading.Lock() | |
| task_lock = _creation_locks[effective_task_id] | |
| with task_lock: | |
| with _env_lock: | |
| if effective_task_id in _active_environments: | |
| _last_activity[effective_task_id] = time.time() | |
| return _active_environments[effective_task_id], _get_env_config()["env_type"] | |
| config = _get_env_config() | |
| env_type = config["env_type"] | |
| overrides = _task_env_overrides.get(effective_task_id, {}) | |
| if env_type == "docker": | |
| image = overrides.get("docker_image") or config["docker_image"] | |
| elif env_type == "singularity": | |
| image = overrides.get("singularity_image") or config["singularity_image"] | |
| elif env_type == "modal": | |
| image = overrides.get("modal_image") or config["modal_image"] | |
| elif env_type == "daytona": | |
| image = overrides.get("daytona_image") or config["daytona_image"] | |
| else: | |
| image = "" | |
| cwd = overrides.get("cwd") or config["cwd"] | |
| container_config = None | |
| if env_type in ("docker", "singularity", "modal", "daytona"): | |
| container_config = { | |
| "container_cpu": config.get("container_cpu", 1), | |
| "container_memory": config.get("container_memory", 5120), | |
| "container_disk": config.get("container_disk", 51200), | |
| "container_persistent": config.get("container_persistent", True), | |
| "docker_volumes": config.get("docker_volumes", []), | |
| } | |
| ssh_config = None | |
| if env_type == "ssh": | |
| ssh_config = { | |
| "host": config.get("ssh_host", ""), | |
| "user": config.get("ssh_user", ""), | |
| "port": config.get("ssh_port", 22), | |
| "key": config.get("ssh_key", ""), | |
| "persistent": config.get("ssh_persistent", False), | |
| } | |
| local_config = None | |
| if env_type == "local": | |
| local_config = { | |
| "persistent": config.get("local_persistent", False), | |
| } | |
| logger.info("Creating new %s environment for execute_code task %s...", | |
| env_type, effective_task_id[:8]) | |
| env = _create_environment( | |
| env_type=env_type, | |
| image=image, | |
| cwd=cwd, | |
| timeout=config["timeout"], | |
| ssh_config=ssh_config, | |
| container_config=container_config, | |
| local_config=local_config, | |
| task_id=effective_task_id, | |
| host_cwd=config.get("host_cwd"), | |
| ) | |
| with _env_lock: | |
| _active_environments[effective_task_id] = env | |
| _last_activity[effective_task_id] = time.time() | |
| _start_cleanup_thread() | |
| logger.info("%s environment ready for execute_code task %s", | |
| env_type, effective_task_id[:8]) | |
| return env, env_type | |
| def _ship_file_to_remote(env, remote_path: str, content: str) -> None: | |
| """Write *content* to *remote_path* on the remote environment. | |
| Uses ``echo … | base64 -d`` rather than stdin piping because some | |
| backends (Modal) don't reliably deliver stdin_data to chained | |
| commands. Base64 output is shell-safe ([A-Za-z0-9+/=]) so single | |
| quotes are fine. | |
| """ | |
| encoded = base64.b64encode(content.encode("utf-8")).decode("ascii") | |
| quoted_remote_path = shlex.quote(remote_path) | |
| env.execute( | |
| f"echo '{encoded}' | base64 -d > {quoted_remote_path}", | |
| cwd="/", | |
| timeout=30, | |
| ) | |
| def _env_temp_dir(env: Any) -> str: | |
| """Return a writable temp dir for env-backed execute_code sandboxes.""" | |
| get_temp_dir = getattr(env, "get_temp_dir", None) | |
| if callable(get_temp_dir): | |
| try: | |
| temp_dir = get_temp_dir() | |
| if isinstance(temp_dir, str) and temp_dir.startswith("/"): | |
| return temp_dir.rstrip("/") or "/" | |
| except Exception as exc: | |
| logger.debug("Could not resolve execute_code env temp dir: %s", exc) | |
| candidate = tempfile.gettempdir() | |
| if isinstance(candidate, str) and candidate.startswith("/"): | |
| return candidate.rstrip("/") or "/" | |
| return "/tmp" | |
| def _rpc_poll_loop( | |
| env, | |
| rpc_dir: str, | |
| task_id: str, | |
| tool_call_log: list, | |
| tool_call_counter: list, | |
| max_tool_calls: int, | |
| allowed_tools: frozenset, | |
| stop_event: threading.Event, | |
| ): | |
| """Poll the remote filesystem for tool call requests and dispatch them. | |
| Runs in a background thread. Each ``env.execute()`` spawns an | |
| independent process, so these calls run safely concurrent with the | |
| script-execution thread. | |
| """ | |
| from model_tools import handle_function_call | |
| poll_interval = 0.1 # 100 ms | |
| quoted_rpc_dir = shlex.quote(rpc_dir) | |
| while not stop_event.is_set(): | |
| try: | |
| # List pending request files (skip .tmp partials) | |
| ls_result = env.execute( | |
| f"ls -1 {quoted_rpc_dir}/req_* 2>/dev/null || true", | |
| cwd="/", | |
| timeout=10, | |
| ) | |
| output = ls_result.get("output", "").strip() | |
| if not output: | |
| stop_event.wait(poll_interval) | |
| continue | |
| req_files = sorted([ | |
| f.strip() for f in output.split("\n") | |
| if f.strip() | |
| and not f.strip().endswith(".tmp") | |
| and "/req_" in f.strip() | |
| ]) | |
| for req_file in req_files: | |
| if stop_event.is_set(): | |
| break | |
| call_start = time.monotonic() | |
| quoted_req_file = shlex.quote(req_file) | |
| # Read request | |
| read_result = env.execute( | |
| f"cat {quoted_req_file}", | |
| cwd="/", | |
| timeout=10, | |
| ) | |
| try: | |
| request = json.loads(read_result.get("output", "")) | |
| except (json.JSONDecodeError, ValueError): | |
| logger.debug("Malformed RPC request in %s", req_file) | |
| # Remove bad request to avoid infinite retry | |
| env.execute(f"rm -f {quoted_req_file}", cwd="/", timeout=5) | |
| continue | |
| tool_name = request.get("tool", "") | |
| tool_args = request.get("args", {}) | |
| seq = request.get("seq", 0) | |
| seq_str = f"{seq:06d}" | |
| res_file = f"{rpc_dir}/res_{seq_str}" | |
| quoted_res_file = shlex.quote(res_file) | |
| # Enforce allow-list | |
| if tool_name not in allowed_tools: | |
| available = ", ".join(sorted(allowed_tools)) | |
| tool_result = json.dumps({ | |
| "error": ( | |
| f"Tool '{tool_name}' is not available in execute_code. " | |
| f"Available: {available}" | |
| ) | |
| }) | |
| # Enforce tool call limit | |
| elif tool_call_counter[0] >= max_tool_calls: | |
| tool_result = json.dumps({ | |
| "error": ( | |
| f"Tool call limit reached ({max_tool_calls}). " | |
| "No more tool calls allowed in this execution." | |
| ) | |
| }) | |
| else: | |
| # Strip forbidden terminal parameters | |
| if tool_name == "terminal" and isinstance(tool_args, dict): | |
| for param in _TERMINAL_BLOCKED_PARAMS: | |
| tool_args.pop(param, None) | |
| # Dispatch through the standard tool handler | |
| try: | |
| _real_stdout, _real_stderr = sys.stdout, sys.stderr | |
| devnull = open(os.devnull, "w") | |
| try: | |
| sys.stdout = devnull | |
| sys.stderr = devnull | |
| tool_result = handle_function_call( | |
| tool_name, tool_args, task_id=task_id | |
| ) | |
| finally: | |
| sys.stdout, sys.stderr = _real_stdout, _real_stderr | |
| devnull.close() | |
| except Exception as exc: | |
| logger.error("Tool call failed in remote sandbox: %s", | |
| exc, exc_info=True) | |
| tool_result = tool_error(str(exc)) | |
| tool_call_counter[0] += 1 | |
| call_duration = time.monotonic() - call_start | |
| tool_call_log.append({ | |
| "tool": tool_name, | |
| "args_preview": str(tool_args)[:80], | |
| "duration": round(call_duration, 2), | |
| }) | |
| # Write response atomically (tmp + rename). | |
| # Use echo piping (not stdin_data) because Modal doesn't | |
| # reliably deliver stdin to chained commands. | |
| encoded_result = base64.b64encode( | |
| tool_result.encode("utf-8") | |
| ).decode("ascii") | |
| env.execute( | |
| f"echo '{encoded_result}' | base64 -d > {quoted_res_file}.tmp" | |
| f" && mv {quoted_res_file}.tmp {quoted_res_file}", | |
| cwd="/", | |
| timeout=60, | |
| ) | |
| # Remove the request file | |
| env.execute(f"rm -f {quoted_req_file}", cwd="/", timeout=5) | |
| except Exception as e: | |
| if not stop_event.is_set(): | |
| logger.debug("RPC poll error: %s", e, exc_info=True) | |
| if not stop_event.is_set(): | |
| stop_event.wait(poll_interval) | |
| def _execute_remote( | |
| code: str, | |
| task_id: Optional[str], | |
| enabled_tools: Optional[List[str]], | |
| ) -> str: | |
| """Run a script on the remote terminal backend via file-based RPC. | |
| The script and the generated hermes_tools.py module are shipped to | |
| the remote environment, and tool calls are proxied through a polling | |
| thread that communicates via request/response files. | |
| """ | |
| _cfg = _load_config() | |
| timeout = _cfg.get("timeout", DEFAULT_TIMEOUT) | |
| max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS) | |
| session_tools = set(enabled_tools) if enabled_tools else set() | |
| sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools) | |
| if not sandbox_tools: | |
| sandbox_tools = SANDBOX_ALLOWED_TOOLS | |
| effective_task_id = task_id or "default" | |
| env, env_type = _get_or_create_env(effective_task_id) | |
| sandbox_id = uuid.uuid4().hex[:12] | |
| temp_dir = _env_temp_dir(env) | |
| sandbox_dir = f"{temp_dir}/hermes_exec_{sandbox_id}" | |
| quoted_sandbox_dir = shlex.quote(sandbox_dir) | |
| quoted_rpc_dir = shlex.quote(f"{sandbox_dir}/rpc") | |
| tool_call_log: list = [] | |
| tool_call_counter = [0] | |
| exec_start = time.monotonic() | |
| stop_event = threading.Event() | |
| rpc_thread = None | |
| try: | |
| # Verify Python is available on the remote | |
| py_check = env.execute( | |
| "command -v python3 >/dev/null 2>&1 && echo OK", | |
| cwd="/", timeout=15, | |
| ) | |
| if "OK" not in py_check.get("output", ""): | |
| return json.dumps({ | |
| "status": "error", | |
| "error": ( | |
| f"Python 3 is not available in the {env_type} terminal " | |
| "environment. Install Python to use execute_code with " | |
| "remote backends." | |
| ), | |
| "tool_calls_made": 0, | |
| "duration_seconds": 0, | |
| }) | |
| # Create sandbox directory on remote | |
| env.execute( | |
| f"mkdir -p {quoted_rpc_dir}", cwd="/", timeout=10, | |
| ) | |
| # Generate and ship files | |
| tools_src = generate_hermes_tools_module( | |
| list(sandbox_tools), transport="file", | |
| ) | |
| _ship_file_to_remote(env, f"{sandbox_dir}/hermes_tools.py", tools_src) | |
| _ship_file_to_remote(env, f"{sandbox_dir}/script.py", code) | |
| # Start RPC polling thread | |
| rpc_thread = threading.Thread( | |
| target=_rpc_poll_loop, | |
| args=( | |
| env, f"{sandbox_dir}/rpc", effective_task_id, | |
| tool_call_log, tool_call_counter, max_tool_calls, | |
| sandbox_tools, stop_event, | |
| ), | |
| daemon=True, | |
| ) | |
| rpc_thread.start() | |
| # Build environment variable prefix for the script | |
| env_prefix = ( | |
| f"HERMES_RPC_DIR={shlex.quote(f'{sandbox_dir}/rpc')} " | |
| f"PYTHONDONTWRITEBYTECODE=1" | |
| ) | |
| tz = os.getenv("HERMES_TIMEZONE", "").strip() | |
| if tz: | |
| env_prefix += f" TZ={tz}" | |
| # Execute the script on the remote backend | |
| logger.info("Executing code on %s backend (task %s)...", | |
| env_type, effective_task_id[:8]) | |
| script_result = env.execute( | |
| f"cd {quoted_sandbox_dir} && {env_prefix} python3 script.py", | |
| timeout=timeout, | |
| ) | |
| stdout_text = script_result.get("output", "") | |
| exit_code = script_result.get("returncode", -1) | |
| status = "success" | |
| # Check for timeout/interrupt from the backend | |
| if exit_code == 124: | |
| status = "timeout" | |
| elif exit_code == 130: | |
| status = "interrupted" | |
| except Exception as exc: | |
| duration = round(time.monotonic() - exec_start, 2) | |
| logger.error( | |
| "execute_code remote failed after %ss with %d tool calls: %s: %s", | |
| duration, tool_call_counter[0], type(exc).__name__, exc, | |
| exc_info=True, | |
| ) | |
| return json.dumps({ | |
| "status": "error", | |
| "error": str(exc), | |
| "tool_calls_made": tool_call_counter[0], | |
| "duration_seconds": duration, | |
| }, ensure_ascii=False) | |
| finally: | |
| # Stop the polling thread | |
| stop_event.set() | |
| if rpc_thread is not None: | |
| rpc_thread.join(timeout=5) | |
| # Clean up remote sandbox dir | |
| try: | |
| env.execute( | |
| f"rm -rf {quoted_sandbox_dir}", cwd="/", timeout=15, | |
| ) | |
| except Exception: | |
| logger.debug("Failed to clean up remote sandbox %s", sandbox_dir) | |
| duration = round(time.monotonic() - exec_start, 2) | |
| # --- Post-process output (same as local path) --- | |
| # Truncate stdout to cap | |
| if len(stdout_text) > MAX_STDOUT_BYTES: | |
| head_bytes = int(MAX_STDOUT_BYTES * 0.4) | |
| tail_bytes = MAX_STDOUT_BYTES - head_bytes | |
| head = stdout_text[:head_bytes] | |
| tail = stdout_text[-tail_bytes:] | |
| omitted = len(stdout_text) - len(head) - len(tail) | |
| stdout_text = ( | |
| head | |
| + f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted " | |
| f"out of {len(stdout_text):,} total] ...\n\n" | |
| + tail | |
| ) | |
| # Strip ANSI escape sequences | |
| from tools.ansi_strip import strip_ansi | |
| stdout_text = strip_ansi(stdout_text) | |
| # Redact secrets | |
| from agent.redact import redact_sensitive_text | |
| stdout_text = redact_sensitive_text(stdout_text) | |
| # Build response | |
| result: Dict[str, Any] = { | |
| "status": status, | |
| "output": stdout_text, | |
| "tool_calls_made": tool_call_counter[0], | |
| "duration_seconds": duration, | |
| } | |
| if status == "timeout": | |
| result["error"] = f"Script timed out after {timeout}s and was killed." | |
| elif status == "interrupted": | |
| result["output"] = ( | |
| stdout_text + "\n[execution interrupted — user sent a new message]" | |
| ) | |
| elif exit_code != 0: | |
| result["status"] = "error" | |
| result["error"] = f"Script exited with code {exit_code}" | |
| return json.dumps(result, ensure_ascii=False) | |
| # --------------------------------------------------------------------------- | |
| # Main entry point | |
| # --------------------------------------------------------------------------- | |
| def execute_code( | |
| code: str, | |
| task_id: Optional[str] = None, | |
| enabled_tools: Optional[List[str]] = None, | |
| ) -> str: | |
| """ | |
| Run a Python script in a sandboxed child process with RPC access | |
| to a subset of Hermes tools. | |
| Dispatches to the local (UDS) or remote (file-based RPC) path | |
| depending on the configured terminal backend. | |
| Args: | |
| code: Python source code to execute. | |
| task_id: Session task ID for tool isolation (terminal env, etc.). | |
| enabled_tools: Tool names enabled in the current session. The sandbox | |
| gets the intersection with SANDBOX_ALLOWED_TOOLS. | |
| Returns: | |
| JSON string with execution results. | |
| """ | |
| if not SANDBOX_AVAILABLE: | |
| return json.dumps({ | |
| "error": "execute_code is not available on Windows. Use normal tool calls instead." | |
| }) | |
| if not code or not code.strip(): | |
| return tool_error("No code provided.") | |
| # Dispatch: remote backends use file-based RPC, local uses UDS | |
| from tools.terminal_tool import _get_env_config | |
| env_type = _get_env_config()["env_type"] | |
| if env_type != "local": | |
| return _execute_remote(code, task_id, enabled_tools) | |
| # --- Local execution path (UDS) --- below this line is unchanged --- | |
| # Import per-thread interrupt check (cooperative cancellation) | |
| from tools.interrupt import is_interrupted as _is_interrupted | |
| # Resolve config | |
| _cfg = _load_config() | |
| timeout = _cfg.get("timeout", DEFAULT_TIMEOUT) | |
| max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS) | |
| # Determine which tools the sandbox can call | |
| session_tools = set(enabled_tools) if enabled_tools else set() | |
| sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools) | |
| if not sandbox_tools: | |
| sandbox_tools = SANDBOX_ALLOWED_TOOLS | |
| # --- Set up temp directory with hermes_tools.py and script.py --- | |
| tmpdir = tempfile.mkdtemp(prefix="hermes_sandbox_") | |
| # Use /tmp on macOS to avoid the long /var/folders/... path that pushes | |
| # Unix domain socket paths past the 104-byte macOS AF_UNIX limit. | |
| # On Linux, tempfile.gettempdir() already returns /tmp. | |
| _sock_tmpdir = "/tmp" if sys.platform == "darwin" else tempfile.gettempdir() | |
| sock_path = os.path.join(_sock_tmpdir, f"hermes_rpc_{uuid.uuid4().hex}.sock") | |
| tool_call_log: list = [] | |
| tool_call_counter = [0] # mutable so the RPC thread can increment | |
| exec_start = time.monotonic() | |
| server_sock = None | |
| try: | |
| # Write the auto-generated hermes_tools module | |
| # sandbox_tools is already the correct set (intersection with session | |
| # tools, or SANDBOX_ALLOWED_TOOLS as fallback — see lines above). | |
| tools_src = generate_hermes_tools_module(list(sandbox_tools)) | |
| with open(os.path.join(tmpdir, "hermes_tools.py"), "w") as f: | |
| f.write(tools_src) | |
| # Write the user's script | |
| with open(os.path.join(tmpdir, "script.py"), "w") as f: | |
| f.write(code) | |
| # --- Start UDS server --- | |
| server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
| server_sock.bind(sock_path) | |
| server_sock.listen(1) | |
| rpc_thread = threading.Thread( | |
| target=_rpc_server_loop, | |
| args=( | |
| server_sock, task_id, tool_call_log, | |
| tool_call_counter, max_tool_calls, sandbox_tools, | |
| ), | |
| daemon=True, | |
| ) | |
| rpc_thread.start() | |
| # --- Spawn child process --- | |
| # Build a minimal environment for the child. We intentionally exclude | |
| # API keys and tokens to prevent credential exfiltration from LLM- | |
| # generated scripts. The child accesses tools via RPC, not direct API. | |
| # Exception: env vars declared by loaded skills (via env_passthrough | |
| # registry) or explicitly allowed by the user in config.yaml | |
| # (terminal.env_passthrough) are passed through. | |
| _SAFE_ENV_PREFIXES = ("PATH", "HOME", "USER", "LANG", "LC_", "TERM", | |
| "TMPDIR", "TMP", "TEMP", "SHELL", "LOGNAME", | |
| "XDG_", "PYTHONPATH", "VIRTUAL_ENV", "CONDA") | |
| _SECRET_SUBSTRINGS = ("KEY", "TOKEN", "SECRET", "PASSWORD", "CREDENTIAL", | |
| "PASSWD", "AUTH") | |
| try: | |
| from tools.env_passthrough import is_env_passthrough as _is_passthrough | |
| except Exception: | |
| _is_passthrough = lambda _: False # noqa: E731 | |
| child_env = {} | |
| for k, v in os.environ.items(): | |
| # Passthrough vars (skill-declared or user-configured) always pass. | |
| if _is_passthrough(k): | |
| child_env[k] = v | |
| continue | |
| # Block vars with secret-like names. | |
| if any(s in k.upper() for s in _SECRET_SUBSTRINGS): | |
| continue | |
| # Allow vars with known safe prefixes. | |
| if any(k.startswith(p) for p in _SAFE_ENV_PREFIXES): | |
| child_env[k] = v | |
| child_env["HERMES_RPC_SOCKET"] = sock_path | |
| child_env["PYTHONDONTWRITEBYTECODE"] = "1" | |
| # Ensure the hermes-agent root is importable in the sandbox so | |
| # repo-root modules are available to child scripts. | |
| _hermes_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| _existing_pp = child_env.get("PYTHONPATH", "") | |
| child_env["PYTHONPATH"] = _hermes_root + (os.pathsep + _existing_pp if _existing_pp else "") | |
| # Inject user's configured timezone so datetime.now() in sandboxed | |
| # code reflects the correct wall-clock time. | |
| _tz_name = os.getenv("HERMES_TIMEZONE", "").strip() | |
| if _tz_name: | |
| child_env["TZ"] = _tz_name | |
| # Per-profile HOME isolation: redirect system tool configs into | |
| # {HERMES_HOME}/home/ when that directory exists. | |
| from hermes_constants import get_subprocess_home | |
| _profile_home = get_subprocess_home() | |
| if _profile_home: | |
| child_env["HOME"] = _profile_home | |
| proc = subprocess.Popen( | |
| [sys.executable, "script.py"], | |
| cwd=tmpdir, | |
| env=child_env, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| stdin=subprocess.DEVNULL, | |
| preexec_fn=None if _IS_WINDOWS else os.setsid, | |
| ) | |
| # --- Poll loop: watch for exit, timeout, and interrupt --- | |
| deadline = time.monotonic() + timeout | |
| stderr_chunks: list = [] | |
| # Background readers to avoid pipe buffer deadlocks. | |
| # For stdout we use a head+tail strategy: keep the first HEAD_BYTES | |
| # and a rolling window of the last TAIL_BYTES so the final print() | |
| # output is never lost. Stderr keeps head-only (errors appear early). | |
| _STDOUT_HEAD_BYTES = int(MAX_STDOUT_BYTES * 0.4) # 40% head | |
| _STDOUT_TAIL_BYTES = MAX_STDOUT_BYTES - _STDOUT_HEAD_BYTES # 60% tail | |
| def _drain(pipe, chunks, max_bytes): | |
| """Simple head-only drain (used for stderr).""" | |
| total = 0 | |
| try: | |
| while True: | |
| data = pipe.read(4096) | |
| if not data: | |
| break | |
| if total < max_bytes: | |
| keep = max_bytes - total | |
| chunks.append(data[:keep]) | |
| total += len(data) | |
| except (ValueError, OSError) as e: | |
| logger.debug("Error reading process output: %s", e, exc_info=True) | |
| stdout_total_bytes = [0] # mutable ref for total bytes seen | |
| def _drain_head_tail(pipe, head_chunks, tail_chunks, head_bytes, tail_bytes, total_ref): | |
| """Drain stdout keeping both head and tail data.""" | |
| head_collected = 0 | |
| from collections import deque | |
| tail_buf = deque() | |
| tail_collected = 0 | |
| try: | |
| while True: | |
| data = pipe.read(4096) | |
| if not data: | |
| break | |
| total_ref[0] += len(data) | |
| # Fill head buffer first | |
| if head_collected < head_bytes: | |
| keep = min(len(data), head_bytes - head_collected) | |
| head_chunks.append(data[:keep]) | |
| head_collected += keep | |
| data = data[keep:] # remaining goes to tail | |
| if not data: | |
| continue | |
| # Everything past head goes into rolling tail buffer | |
| tail_buf.append(data) | |
| tail_collected += len(data) | |
| # Evict old tail data to stay within tail_bytes budget | |
| while tail_collected > tail_bytes and tail_buf: | |
| oldest = tail_buf.popleft() | |
| tail_collected -= len(oldest) | |
| except (ValueError, OSError): | |
| pass | |
| # Transfer final tail to output list | |
| tail_chunks.extend(tail_buf) | |
| stdout_head_chunks: list = [] | |
| stdout_tail_chunks: list = [] | |
| stdout_reader = threading.Thread( | |
| target=_drain_head_tail, | |
| args=(proc.stdout, stdout_head_chunks, stdout_tail_chunks, | |
| _STDOUT_HEAD_BYTES, _STDOUT_TAIL_BYTES, stdout_total_bytes), | |
| daemon=True | |
| ) | |
| stderr_reader = threading.Thread( | |
| target=_drain, args=(proc.stderr, stderr_chunks, MAX_STDERR_BYTES), daemon=True | |
| ) | |
| stdout_reader.start() | |
| stderr_reader.start() | |
| status = "success" | |
| while proc.poll() is None: | |
| if _is_interrupted(): | |
| _kill_process_group(proc) | |
| status = "interrupted" | |
| break | |
| if time.monotonic() > deadline: | |
| _kill_process_group(proc, escalate=True) | |
| status = "timeout" | |
| break | |
| time.sleep(0.2) | |
| # Wait for readers to finish draining | |
| stdout_reader.join(timeout=3) | |
| stderr_reader.join(timeout=3) | |
| stdout_head = b"".join(stdout_head_chunks).decode("utf-8", errors="replace") | |
| stdout_tail = b"".join(stdout_tail_chunks).decode("utf-8", errors="replace") | |
| stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") | |
| # Assemble stdout with head+tail truncation | |
| total_stdout = stdout_total_bytes[0] | |
| if total_stdout > MAX_STDOUT_BYTES and stdout_tail: | |
| omitted = total_stdout - len(stdout_head) - len(stdout_tail) | |
| truncated_notice = ( | |
| f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted " | |
| f"out of {total_stdout:,} total] ...\n\n" | |
| ) | |
| stdout_text = stdout_head + truncated_notice + stdout_tail | |
| else: | |
| stdout_text = stdout_head + stdout_tail | |
| exit_code = proc.returncode if proc.returncode is not None else -1 | |
| duration = round(time.monotonic() - exec_start, 2) | |
| # Wait for RPC thread to finish | |
| server_sock.close() # break accept() so thread exits promptly | |
| server_sock = None # prevent double close in finally | |
| rpc_thread.join(timeout=3) | |
| # Strip ANSI escape sequences so the model never sees terminal | |
| # formatting — prevents it from copying escapes into file writes. | |
| from tools.ansi_strip import strip_ansi | |
| stdout_text = strip_ansi(stdout_text) | |
| stderr_text = strip_ansi(stderr_text) | |
| # Redact secrets (API keys, tokens, etc.) from sandbox output. | |
| # The sandbox env-var filter (lines 434-454) blocks os.environ access, | |
| # but scripts can still read secrets from disk (e.g. open('~/.hermes/.env')). | |
| # This ensures leaked secrets never enter the model context. | |
| from agent.redact import redact_sensitive_text | |
| stdout_text = redact_sensitive_text(stdout_text) | |
| stderr_text = redact_sensitive_text(stderr_text) | |
| # Build response | |
| result: Dict[str, Any] = { | |
| "status": status, | |
| "output": stdout_text, | |
| "tool_calls_made": tool_call_counter[0], | |
| "duration_seconds": duration, | |
| } | |
| if status == "timeout": | |
| result["error"] = f"Script timed out after {timeout}s and was killed." | |
| elif status == "interrupted": | |
| result["output"] = stdout_text + "\n[execution interrupted — user sent a new message]" | |
| elif exit_code != 0: | |
| result["status"] = "error" | |
| result["error"] = stderr_text or f"Script exited with code {exit_code}" | |
| # Include stderr in output so the LLM sees the traceback | |
| if stderr_text: | |
| result["output"] = stdout_text + "\n--- stderr ---\n" + stderr_text | |
| return json.dumps(result, ensure_ascii=False) | |
| except Exception as exc: | |
| duration = round(time.monotonic() - exec_start, 2) | |
| logger.error( | |
| "execute_code failed after %ss with %d tool calls: %s: %s", | |
| duration, | |
| tool_call_counter[0], | |
| type(exc).__name__, | |
| exc, | |
| exc_info=True, | |
| ) | |
| return json.dumps({ | |
| "status": "error", | |
| "error": str(exc), | |
| "tool_calls_made": tool_call_counter[0], | |
| "duration_seconds": duration, | |
| }, ensure_ascii=False) | |
| finally: | |
| # Cleanup temp dir and socket | |
| if server_sock is not None: | |
| try: | |
| server_sock.close() | |
| except OSError as e: | |
| logger.debug("Server socket close error: %s", e) | |
| import shutil | |
| shutil.rmtree(tmpdir, ignore_errors=True) | |
| try: | |
| os.unlink(sock_path) | |
| except OSError: | |
| pass # already cleaned up or never created | |
| def _kill_process_group(proc, escalate: bool = False): | |
| """Kill the child and its entire process group.""" | |
| try: | |
| if _IS_WINDOWS: | |
| proc.terminate() | |
| else: | |
| os.killpg(os.getpgid(proc.pid), signal.SIGTERM) | |
| except (ProcessLookupError, PermissionError) as e: | |
| logger.debug("Could not kill process group: %s", e, exc_info=True) | |
| try: | |
| proc.kill() | |
| except Exception as e2: | |
| logger.debug("Could not kill process: %s", e2, exc_info=True) | |
| if escalate: | |
| # Give the process 5s to exit after SIGTERM, then SIGKILL | |
| try: | |
| proc.wait(timeout=5) | |
| except subprocess.TimeoutExpired: | |
| try: | |
| if _IS_WINDOWS: | |
| proc.kill() | |
| else: | |
| os.killpg(os.getpgid(proc.pid), signal.SIGKILL) | |
| except (ProcessLookupError, PermissionError) as e: | |
| logger.debug("Could not kill process group with SIGKILL: %s", e, exc_info=True) | |
| try: | |
| proc.kill() | |
| except Exception as e2: | |
| logger.debug("Could not kill process: %s", e2, exc_info=True) | |
| def _load_config() -> dict: | |
| """Load code_execution config from CLI_CONFIG if available.""" | |
| try: | |
| from cli import CLI_CONFIG | |
| return CLI_CONFIG.get("code_execution", {}) | |
| except Exception: | |
| return {} | |
| # --------------------------------------------------------------------------- | |
| # OpenAI Function-Calling Schema | |
| # --------------------------------------------------------------------------- | |
| # Per-tool documentation lines for the execute_code description. | |
| # Ordered to match the canonical display order. | |
| _TOOL_DOC_LINES = [ | |
| ("web_search", | |
| " web_search(query: str, limit: int = 5) -> dict\n" | |
| " Returns {\"data\": {\"web\": [{\"url\", \"title\", \"description\"}, ...]}}"), | |
| ("web_extract", | |
| " web_extract(urls: list[str]) -> dict\n" | |
| " Returns {\"results\": [{\"url\", \"title\", \"content\", \"error\"}, ...]} where content is markdown"), | |
| ("read_file", | |
| " read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n" | |
| " Lines are 1-indexed. Returns {\"content\": \"...\", \"total_lines\": N}"), | |
| ("write_file", | |
| " write_file(path: str, content: str) -> dict\n" | |
| " Always overwrites the entire file."), | |
| ("search_files", | |
| " search_files(pattern: str, target=\"content\", path=\".\", file_glob=None, limit=50) -> dict\n" | |
| " target: \"content\" (search inside files) or \"files\" (find files by name). Returns {\"matches\": [...]}"), | |
| ("patch", | |
| " patch(path: str, old_string: str, new_string: str, replace_all: bool = False) -> dict\n" | |
| " Replaces old_string with new_string in the file."), | |
| ("terminal", | |
| " terminal(command: str, timeout=None, workdir=None) -> dict\n" | |
| " Foreground only (no background/pty). Returns {\"output\": \"...\", \"exit_code\": N}"), | |
| ] | |
| def build_execute_code_schema(enabled_sandbox_tools: set = None) -> dict: | |
| """Build the execute_code schema with description listing only enabled tools. | |
| When tools are disabled via ``hermes tools`` (e.g. web is turned off), | |
| the schema description should NOT mention web_search / web_extract — | |
| otherwise the model thinks they are available and keeps trying to use them. | |
| """ | |
| if enabled_sandbox_tools is None: | |
| enabled_sandbox_tools = SANDBOX_ALLOWED_TOOLS | |
| # Build tool documentation lines for only the enabled tools | |
| tool_lines = "\n".join( | |
| doc for name, doc in _TOOL_DOC_LINES if name in enabled_sandbox_tools | |
| ) | |
| # Build example import list from enabled tools | |
| import_examples = [n for n in ("web_search", "terminal") if n in enabled_sandbox_tools] | |
| if not import_examples: | |
| import_examples = sorted(enabled_sandbox_tools)[:2] | |
| if import_examples: | |
| import_str = ", ".join(import_examples) + ", ..." | |
| else: | |
| import_str = "..." | |
| description = ( | |
| "Run a Python script that can call Hermes tools programmatically. " | |
| "Use this when you need 3+ tool calls with processing logic between them, " | |
| "need to filter/reduce large tool outputs before they enter your context, " | |
| "need conditional branching (if X then Y else Z), or need to loop " | |
| "(fetch N pages, process N files, retry on failure).\n\n" | |
| "Use normal tool calls instead when: single tool call with no processing, " | |
| "you need to see the full result and apply complex reasoning, " | |
| "or the task requires interactive user input.\n\n" | |
| f"Available via `from hermes_tools import ...`:\n\n" | |
| f"{tool_lines}\n\n" | |
| "Limits: 5-minute timeout, 50KB stdout cap, max 50 tool calls per script. " | |
| "terminal() is foreground-only (no background or pty).\n\n" | |
| "Print your final result to stdout. Use Python stdlib (json, re, math, csv, " | |
| "datetime, collections, etc.) for processing between tool calls.\n\n" | |
| "Also available (no import needed — built into hermes_tools):\n" | |
| " json_parse(text: str) — json.loads with strict=False; use for terminal() output with control chars\n" | |
| " shell_quote(s: str) — shlex.quote(); use when interpolating dynamic strings into shell commands\n" | |
| " retry(fn, max_attempts=3, delay=2) — retry with exponential backoff for transient failures" | |
| ) | |
| return { | |
| "name": "execute_code", | |
| "description": description, | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "code": { | |
| "type": "string", | |
| "description": ( | |
| "Python code to execute. Import tools with " | |
| f"`from hermes_tools import {import_str}` " | |
| "and print your final result to stdout." | |
| ), | |
| }, | |
| }, | |
| "required": ["code"], | |
| }, | |
| } | |
| # Default schema used at registration time (all sandbox tools listed) | |
| EXECUTE_CODE_SCHEMA = build_execute_code_schema() | |
| # --- Registry --- | |
| from tools.registry import registry, tool_error | |
| registry.register( | |
| name="execute_code", | |
| toolset="code_execution", | |
| schema=EXECUTE_CODE_SCHEMA, | |
| handler=lambda args, **kw: execute_code( | |
| code=args.get("code", ""), | |
| task_id=kw.get("task_id"), | |
| enabled_tools=kw.get("enabled_tools")), | |
| check_fn=check_sandbox_requirements, | |
| emoji="🐍", | |
| max_result_size_chars=100_000, | |
| ) | |