ManusClaw-fixes / app /tool /bash.py
The-JDdev's picture
Upload app/tool/bash.py
1b046a2 verified
Raw
History Blame Contribute Delete
8.55 kB
from __future__ import annotations
r"""
Shell — persistent async shell session, cross-platform.
Platforms:
Linux → bash --norc --noprofile
macOS → bash --norc --noprofile (or zsh if bash absent)
Windows → PowerShell -NoProfile -NonInteractive
Termux → bash (no --norc needed, profile is fine)
Runtime philosophy: TASK-COMPLETE, NOT TIME-BOXED.
No DEFAULT timeout. No MAX cap.
Pass timeout=None (or omit) → runs until natural completion.
Pass timeout=N → killed after exactly N seconds.
Blocked (hard deny — OS-destroying only):
Linux/Mac: rm -rf / fork bombs dd to block dev mkfs kill -9 -1
Windows: rd /s /q C:\ format C: del /f /s /q C:\Windows\*
"""
import asyncio
import re
import shutil
import sys
from typing import Any, Optional
from app.logger import logger
from app.schema import ToolResult
from app.tool.base import BaseTool
# ---------------------------------------------------------------------------
# Platform detection
# ---------------------------------------------------------------------------
IS_WINDOWS = sys.platform == "win32"
IS_TERMUX = not IS_WINDOWS and shutil.which("termux-info") is not None
def _shell_cmd() -> str:
if IS_WINDOWS:
return "powershell.exe -NoProfile -NonInteractive -Command -"
bash = shutil.which("bash")
if bash:
return f"{bash} --norc --noprofile"
zsh = shutil.which("zsh")
if zsh:
return zsh
return "sh"
def _sentinel_cmd(sentinel: str) -> str:
if IS_WINDOWS:
return f'Write-Host "{sentinel}"'
return f"echo {sentinel}"
def _exit_cmd() -> str:
if IS_WINDOWS:
return r'Write-Host "EXIT:$LASTEXITCODE"'
return 'echo "EXIT:$?"'
def _wrap(command: str, sentinel: str) -> str:
if IS_WINDOWS:
return (
f"{command}\n"
f"{_exit_cmd()}\n"
f"{_sentinel_cmd(sentinel)}\n"
)
return f"({command}); {_exit_cmd()}; {_sentinel_cmd(sentinel)}\n"
# ---------------------------------------------------------------------------
# Catastrophic patterns — the ONLY restriction, per platform
# ---------------------------------------------------------------------------
_CATASTROPHIC_UNIX = [
r"rm\s+-[rRf]+\s+/\s*$",
r"rm\s+-[rRf]+\s+/\*",
r"rm\s+--no-preserve-root",
r":\(\)\s*\{.*:\|:.*\}", # fork bomb
r"dd\s+if=/dev/zero\s+of=/dev/(s|h|v|xv)d\b",
r">\s*/dev/(s|h|v|xv)d[a-z]\b",
r"mkfs\.",
r"wipefs\s",
r"kill\s+-9\s+-1\b",
r"killall\s+-9\b",
r"shred\s+.*/(bin|sbin|lib|usr|boot)/",
]
_CATASTROPHIC_WINDOWS = [
r"rd\s+/[sS]\s+/[qQ]\s+[Cc]:\\?\s*$", # rd /s /q C:\
r"del\s+/[fF]\s+/[sS]\s+/[qQ]\s+[Cc]:\\[Ww]indows",
r"format\s+[Cc]:", # format C:
r"rmdir\s+/[sS]\s+[Cc]:\\?\s*$",
r"Remove-Item\s+-Recurse\s+-Force\s+[Cc]:\\?\s*$",
r"Remove-Item\s+.*-Recurse.*[Cc]:\\[Ww]indows",
]
_PATTERNS = _CATASTROPHIC_WINDOWS if IS_WINDOWS else _CATASTROPHIC_UNIX
def _is_catastrophic(command: str) -> Optional[str]:
for p in _PATTERNS:
if re.search(p, command, re.IGNORECASE | re.DOTALL):
return p
return None
# ---------------------------------------------------------------------------
# Tool
# ---------------------------------------------------------------------------
class Bash(BaseTool):
name = "bash"
description = (
"Execute shell commands in a persistent session. "
f"Platform: {'Windows (PowerShell)' if IS_WINDOWS else 'Linux/macOS/Termux (bash)'}. "
"Runs until completion — no artificial time limit. "
"Pass timeout=N (seconds) to set an explicit deadline. "
"Full output always returned — no truncation. "
"Environment and working directory persist across calls. "
"Only OS-destroying commands are blocked."
)
parameters = {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": (
"Shell command to run. "
f"{'PowerShell syntax on Windows.' if IS_WINDOWS else 'Bash syntax on Linux/macOS/Termux.'}"
),
},
"timeout": {
"type": "integer",
"description": (
"Optional. Kill after this many seconds. "
"Omit to run until natural completion — 2min tasks run 2min, "
"3h jobs run 3h. Only set if you need a hard deadline."
),
},
},
"required": ["command"],
}
def __init__(self) -> None:
self._process: Optional[asyncio.subprocess.Process] = None
self._lock = asyncio.Lock()
async def execute(
self,
command: str,
timeout: Optional[int] = None,
**_: Any,
) -> ToolResult:
bad = _is_catastrophic(command)
if bad:
return ToolResult(
error=(
f"BLOCKED — catastrophic OS-destroying pattern: `{bad}`\n"
"This is the ONLY category of blocked commands. "
"All other system operations are fully permitted."
)
)
async with self._lock:
return await self._run(command, timeout)
async def _run(self, command: str, timeout: Optional[int]) -> ToolResult:
sentinel = "__MC_DONE_9742__"
wrapped = _wrap(command, sentinel).encode()
try:
proc = await self._ensure_process()
assert proc.stdin is not None and proc.stdout is not None
proc.stdin.write(wrapped)
await proc.stdin.drain()
lines: list[str] = []
exit_code: Optional[int] = None
async def _read() -> None:
nonlocal exit_code
while True:
raw = await proc.stdout.readline()
line = raw.decode(errors="replace").rstrip("\r\n")
if line.strip() == sentinel:
break
if line.startswith("EXIT:"):
try:
exit_code = int(line[5:].strip())
except ValueError:
pass
continue
lines.append(line)
await asyncio.wait_for(_read(), timeout=timeout)
output = "\n".join(lines)
if exit_code is not None and exit_code != 0:
return ToolResult(
output=output or None,
error=(
f"Command exited with code {exit_code}.\n"
f"Output:\n{output}\n\n"
"Review the error and adjust your command."
),
)
return ToolResult(output=output or "(command completed, no output)")
except asyncio.TimeoutError:
logger.warning(f"[bash] Deadline reached after {timeout}s.")
await self._reset_process()
return ToolResult(
error=(
f"Deadline reached after {timeout}s. Session reset. "
"Omit timeout to run until done, or use background execution."
)
)
except BrokenPipeError:
await self._reset_process()
return ToolResult(error="Shell session died. Restarted — retry.")
except Exception as e:
return ToolResult(error=f"Shell error: {e}")
async def _ensure_process(self) -> asyncio.subprocess.Process:
if self._process is None or self._process.returncode is not None:
self._process = await asyncio.create_subprocess_shell(
_shell_cmd(),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
return self._process
async def _reset_process(self) -> None:
if self._process and self._process.returncode is None:
try:
self._process.terminate()
await asyncio.wait_for(self._process.wait(), timeout=3)
except Exception:
try:
self._process.kill()
except Exception:
pass
self._process = None
async def cleanup(self) -> None:
await self._reset_process()