import argparse import json import os import re import sys from pathlib import Path from typing import Any, Optional, Union import json5 from agent_base.utils import PROJECT_ROOT, load_dotenv WORKSPACE_ROOT_ENV = "WORKSPACE_ROOT" SENSITIVE_FILE_NAMES = { ".env", ".env.local", ".env.production", ".env.development", ".env.test", ".git-credentials", ".netrc", ".npmrc", ".pypirc", "id_rsa", "id_dsa", "id_ecdsa", "id_ed25519", "known_hosts", "authorized_keys", "credentials", } SENSITIVE_PATH_PARTS = { ".git", ".ssh", ".aws", ".gnupg", ".kube", } SENSITIVE_COMMAND_TOKENS = [ ".env", ".git-credentials", ".netrc", ".npmrc", ".pypirc", "id_rsa", "id_dsa", "id_ecdsa", "id_ed25519", "/etc/passwd", "/etc/shadow", "/root/.ssh", "/root/.aws", "~/.ssh", "~/.aws", ] BLOCKED_COMMAND_PATTERNS: list[tuple[re.Pattern[str], str]] = [ (re.compile(r"(^|[\s;&|])sudo(\s|$)"), "sudo escalation is blocked"), (re.compile(r"(^|[\s;&|])su(\s|$)"), "user switching is blocked"), (re.compile(r"(^|[\s;&|])(shutdown|reboot|poweroff|halt)(\s|$)"), "system power-control commands are blocked"), (re.compile(r"(^|[\s;&|])mkfs(?:\.\w+)?(\s|$)"), "disk-formatting commands are blocked"), (re.compile(r"(^|[\s;&|])(fdisk|parted)(\s|$)"), "disk-partitioning commands are blocked"), (re.compile(r":\s*\(\)\s*\{\s*:\|:&\s*\};:"), "fork-bomb patterns are blocked"), (re.compile(r"\brm\s+-rf\s+/(\s|$)"), "destructive root deletion is blocked"), (re.compile(r"\brm\s+-rf\s+~(/|\s|$)"), "destructive home deletion is blocked"), ] SENSITIVE_ENV_EXACT = { "API_KEY", "SERPER_KEY", "JINA_KEY", "MINERU_TOKEN", "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GOOGLE_API_KEY", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN", "AZURE_OPENAI_API_KEY", } SENSITIVE_ENV_MARKERS = ( "TOKEN", "SECRET", "PASSWORD", "PASSWD", "CREDENTIAL", "COOKIE", ) SAFE_ENV_ALWAYS = { "PATH", "LANG", "TERM", "TMPDIR", "TEMP", "TMP", "TZ", "COLORTERM", "PWD", "PYTHONIOENCODING", "PYTHONUNBUFFERED", "CONDA_PREFIX", "CONDA_DEFAULT_ENV", "VIRTUAL_ENV", "LOGNAME", "USER", "USERNAME", "SHELL", "SHLVL", "_", } def workspace_root() -> Path: configured = os.environ.get(WORKSPACE_ROOT_ENV, "").strip() root = Path(configured).expanduser() if configured else PROJECT_ROOT return root.resolve() def normalize_base_root(base_root: Optional[Union[str, Path]]) -> Path: if base_root is None: return workspace_root() return Path(base_root).expanduser().resolve() def normalize_workspace_root(path_value: Optional[Union[str, Path]]) -> Path: if path_value is None or str(path_value).strip() == "": return workspace_root() path = Path(path_value).expanduser() if not path.is_absolute(): path = (Path.cwd() / path).resolve() else: path = path.resolve() if not path.exists(): path.mkdir(parents=True, exist_ok=True) if not path.is_dir(): raise ValueError(f"Workspace directory is not a directory: {path}") return path def _is_relative_to(path: Path, root: Path) -> bool: try: path.relative_to(root) return True except ValueError: return False def resolve_workspace_path(path_value: Union[str, Path], *, base_root: Optional[Path] = None) -> Path: path = Path(path_value).expanduser() root = normalize_base_root(base_root) if not path.is_absolute(): path = root / path return path.resolve(strict=False) def is_sensitive_path(path: Path) -> bool: lowered_parts = {part.lower() for part in path.parts} lowered_name = path.name.lower() if lowered_name in SENSITIVE_FILE_NAMES: return True return any(part in SENSITIVE_PATH_PARTS for part in lowered_parts) def validate_tool_path(path_value: Union[str, Path], purpose: str, *, allow_sensitive: bool = False, base_root: Optional[Path] = None) -> Path: path = resolve_workspace_path(path_value, base_root=base_root) root = normalize_base_root(base_root) if not _is_relative_to(path, root): raise ValueError(f"{purpose} is limited to the workspace root: {root}") if not allow_sensitive and is_sensitive_path(path): raise ValueError(f"{purpose} to sensitive paths is blocked: {path}") return path def command_safety_issue(command: str) -> Optional[str]: lowered = command.lower() for pattern, reason in BLOCKED_COMMAND_PATTERNS: if pattern.search(command): return reason for token in SENSITIVE_COMMAND_TOKENS: if token.lower() in lowered: return f"access to sensitive path/token '{token}' is blocked" return None def sanitized_subprocess_env(*, base_root: Optional[Path] = None) -> dict[str, str]: env = os.environ.copy() for key in list(env.keys()): upper = key.upper() if upper in SAFE_ENV_ALWAYS: continue if upper in SENSITIVE_ENV_EXACT or any(marker in upper for marker in SENSITIVE_ENV_MARKERS): env.pop(key, None) safe_home = str(normalize_base_root(base_root)) env["HOME"] = safe_home env["PWD"] = safe_home env.setdefault("TERM", "xterm-256color") env.setdefault("LANG", "C.UTF-8") env["GIT_TERMINAL_PROMPT"] = "0" return env def _matches_schema_type(value: Any, expected_type: str) -> bool: if expected_type == "string": return isinstance(value, str) if expected_type == "integer": return isinstance(value, int) and not isinstance(value, bool) if expected_type == "number": return (isinstance(value, int) and not isinstance(value, bool)) or isinstance(value, float) if expected_type == "boolean": return isinstance(value, bool) if expected_type == "array": return isinstance(value, list) if expected_type == "object": return isinstance(value, dict) return True def _schema_type_label(type_spec: Any) -> str: if isinstance(type_spec, list): return " or ".join(str(item) for item in type_spec) return str(type_spec) def _validate_schema_value(param_name: str, value: Any, schema: dict[str, Any]) -> None: type_spec = schema.get("type") if type_spec is not None: allowed_types = type_spec if isinstance(type_spec, list) else [type_spec] if not any(_matches_schema_type(value, expected_type) for expected_type in allowed_types): raise ValueError(f"Parameter '{param_name}' must be of type {_schema_type_label(type_spec)}.") if isinstance(value, list): min_items = schema.get("minItems") if isinstance(min_items, int) and len(value) < min_items: raise ValueError(f"Parameter '{param_name}' must contain at least {min_items} item(s).") item_schema = schema.get("items") if isinstance(item_schema, dict): for index, item in enumerate(value): _validate_schema_value(f"{param_name}[{index}]", item, item_schema) class ToolBase: name: str = "" description: str = "" parameters: dict[str, Any] = {} def __init__(self, cfg: Optional[dict] = None): self.cfg = cfg or {} if not self.name: raise ValueError(f"{self.__class__.__name__}.name must be set.") if not isinstance(self.parameters, dict): raise ValueError(f"{self.__class__.__name__}.parameters must be a JSON-schema-like dict.") def call(self, params: Union[str, dict], **kwargs): raise NotImplementedError def parse_json_args(self, params: Union[str, dict], strict_json: bool = False) -> dict: if isinstance(params, str): try: if strict_json: parsed = json.loads(params) else: parsed = json5.loads(params) except (TypeError, ValueError) as exc: raise ValueError("Parameters must be formatted as a valid JSON object.") from exc else: parsed = params if not isinstance(parsed, dict): raise ValueError("Parameters must decode to a JSON object.") required = self.parameters.get("required", []) for key in required: if key not in parsed: raise ValueError(f"Missing required parameter: {key}") properties = self.parameters.get("properties", {}) if isinstance(properties, dict): for key, value in parsed.items(): schema = properties.get(key) if isinstance(schema, dict): _validate_schema_value(key, value, schema) return parsed def main(argv: Optional[list[str]] = None) -> int: parser = argparse.ArgumentParser(description="Inspect workspace and path resolution helpers.") parser.add_argument("--workspace-root", help="Optional workspace root override for this invocation.") parser.add_argument("--path", help="Optional path to resolve inside the workspace.") args = parser.parse_args(argv) load_dotenv(PROJECT_ROOT / ".env") workspace_root = normalize_workspace_root(args.workspace_root) payload: dict[str, str] = { "project_root": str(PROJECT_ROOT), "workspace_root": str(workspace_root), } if args.path: payload["resolved_path"] = str(resolve_workspace_path(args.path, base_root=workspace_root)) print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))