File size: 2,896 Bytes
891669b adbf39e 891669b adbf39e 891669b adbf39e 891669b adbf39e 891669b adbf39e 891669b adbf39e 891669b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | # ---- Changelog ----
# [2026-03-29] Chisel/TQB — Block C: ShellTool
# What: shell_execute extracted from RecursiveContextManager
# Why: PRD Block C — single-responsibility tool classes
# [2026-03-29] Razor/TQB β Block A: Security Hardening
# What: PolicyEngine gating replaces joke blocklist, shell=False, output size cap
# Why: PRD Block A — structural security, not advisory string matching
# How: Command split via shlex, subprocess shell=False, PolicyEngine allowlist gate
# -------------------
import logging
import os
import subprocess
from pathlib import Path
logger = logging.getLogger("tools.shell")

# Maximum size, in characters, of each captured stream (stdout and stderr are
# capped independently) returned from a shell command.
MAX_OUTPUT_SIZE = 50_000

# Shell timeout in seconds — configurable via the CODEMINE_SHELL_TIMEOUT
# environment variable (read once, at import time).
SHELL_TIMEOUT = int(os.getenv("CODEMINE_SHELL_TIMEOUT", "60"))


class ShellTool:
    """Shell command execution scoped to repo_path, gated by PolicyEngine.

    Commands run with ``repo_path`` as the working directory. When a
    ``policy_engine`` is supplied, every command is first submitted to
    ``policy_engine.check_tool_call`` and is only executed if allowed.
    """

    def __init__(self, repo_path: Path, policy_engine=None):
        """Store the working directory and the optional policy gate.

        Args:
            repo_path: Directory used as ``cwd`` for every command.
            policy_engine: Any truthy value enables the policy check in
                ``shell_execute``; ``None`` (the default) disables it.
        """
        self.repo_path = repo_path
        self.policy_engine = policy_engine

    @staticmethod
    def _cap(text: str) -> str:
        """Truncate *text* to MAX_OUTPUT_SIZE characters, marking the cut."""
        if len(text) > MAX_OUTPUT_SIZE:
            return text[:MAX_OUTPUT_SIZE] + "\n...[truncated]"
        return text

    def shell_execute(self, command: str) -> "str | dict":
        """Run *command* through the system shell inside ``repo_path``.

        Args:
            command: The full shell command line (pipes and redirects work).

        Returns:
            On completion (regardless of the command's exit status, which is
            not reported): the string ``"STDOUT:\\n<out>\\nSTDERR:\\n<err>"``
            with each stream capped at ``MAX_OUTPUT_SIZE`` characters.

            On policy denial, timeout, or an OS-level failure to start the
            process: a dict with the keys ``status`` (always ``"error"``),
            ``tool`` (always ``"shell"``), ``error`` and ``type``.
        """
        # PolicyEngine gate — structural security.
        # NOTE(review): the instance attribute only switches the gate on; the
        # decision itself comes from the module-level check_tool_call.
        if self.policy_engine:
            from policy_engine import check_tool_call

            allowed, reason = check_tool_call(
                "shell_execute", {"command": command}, self.repo_path
            )
            if not allowed:
                logger.warning(
                    "shell_execute denied: %s — %s", command[:100], reason
                )
                return {
                    "status": "error",
                    "tool": "shell",
                    "error": reason,
                    "type": "PermissionError",
                }

        try:
            # shell=True — pipes and redirects work. Safety is enforced by
            # PolicyEngine's command allowlist, not by shell=False.
            result = subprocess.run(
                command,
                shell=True,
                cwd=str(self.repo_path),
                capture_output=True,
                text=True,
                # Commands may emit arbitrary bytes (binary files, other
                # encodings); substitute U+FFFD instead of letting a
                # UnicodeDecodeError escape this method.
                errors="replace",
                timeout=SHELL_TIMEOUT,
            )
        except subprocess.TimeoutExpired:
            logger.warning("shell_execute timeout: %s", command[:100])
            return {
                "status": "error",
                "tool": "shell",
                "error": f"Command timed out ({SHELL_TIMEOUT}s limit)",
                "type": "TimeoutError",
            }
        except FileNotFoundError as e:
            # With shell=True an unknown program is normally reported by the
            # shell itself (non-zero exit, message on stderr). This branch is
            # reached when the process cannot be started at all, e.g. the
            # working directory does not exist.
            return {
                "status": "error",
                "tool": "shell",
                "error": f"Command not found: {e}",
                "type": "FileNotFoundError",
            }
        except OSError as e:
            return {
                "status": "error",
                "tool": "shell",
                "error": str(e),
                "type": type(e).__name__,
            }

        # Output size cap, applied to each stream separately.
        stdout = self._cap(result.stdout)
        stderr = self._cap(result.stderr)
        return f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
|