File size: 2,896 Bytes
891669b
 
 
 
 
 
 
 
 
 
 
adbf39e
891669b
 
 
 
 
 
 
 
adbf39e
 
 
891669b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adbf39e
 
891669b
adbf39e
 
891669b
 
 
adbf39e
891669b
 
 
 
 
 
 
 
 
 
 
 
 
adbf39e
891669b
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# ---- Changelog ----
# [2026-03-29] Chisel/TQB β€” Block C: ShellTool
# What: shell_execute extracted from RecursiveContextManager
# Why: PRD Block C β€” single-responsibility tool classes
# [2026-03-29] Razor/TQB β€” Block A: Security Hardening
# What: PolicyEngine gating replaces joke blocklist, shell=False, output size cap
# Why: PRD Block A β€” structural security, not advisory string matching
# How: Command split via shlex, subprocess shell=False, PolicyEngine allowlist gate
# -------------------

import logging
import os
import subprocess
from pathlib import Path

logger = logging.getLogger("tools.shell")

# Maximum output size returned from shell commands (chars)
MAX_OUTPUT_SIZE = 50_000

# Shell timeout β€” configurable via env
SHELL_TIMEOUT = int(os.getenv("CODEMINE_SHELL_TIMEOUT", "60"))


class ShellTool:
    """Shell command execution scoped to repo_path, gated by PolicyEngine."""

    def __init__(self, repo_path: Path, policy_engine=None):
        self.repo_path = repo_path
        self.policy_engine = policy_engine

    def shell_execute(self, command: str) -> str:
        # PolicyEngine gate β€” structural security
        if self.policy_engine:
            from policy_engine import check_tool_call
            allowed, reason = check_tool_call("shell_execute", {"command": command}, self.repo_path)
            if not allowed:
                logger.warning("shell_execute denied: %s β€” %s", command[:100], reason)
                return {"status": "error", "tool": "shell", "error": reason, "type": "PermissionError"}

        try:
            # shell=True β€” pipes and redirects work. Safety is enforced by
            # PolicyEngine's command allowlist, not by shell=False.
            result = subprocess.run(
                command,
                shell=True,
                cwd=str(self.repo_path),
                capture_output=True,
                text=True,
                timeout=SHELL_TIMEOUT,
            )
            stdout = result.stdout
            stderr = result.stderr

            # Output size cap
            if len(stdout) > MAX_OUTPUT_SIZE:
                stdout = stdout[:MAX_OUTPUT_SIZE] + "\n...[truncated]"
            if len(stderr) > MAX_OUTPUT_SIZE:
                stderr = stderr[:MAX_OUTPUT_SIZE] + "\n...[truncated]"

            return f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
        except subprocess.TimeoutExpired:
            logger.warning("shell_execute timeout: %s", command[:100])
            return {"status": "error", "tool": "shell", "error": f"Command timed out ({SHELL_TIMEOUT}s limit)", "type": "TimeoutError"}
        except FileNotFoundError as e:
            return {"status": "error", "tool": "shell", "error": f"Command not found: {e}", "type": "FileNotFoundError"}
        except OSError as e:
            return {"status": "error", "tool": "shell", "error": str(e), "type": type(e).__name__}