black-yt's picture
Update service env names
a58ab5d
Raw
History Blame Contribute Delete
9.72 kB
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Any, Optional, Union
import json5
from agent_base.utils import PROJECT_ROOT, load_dotenv
WORKSPACE_ROOT_ENV = "WORKSPACE_ROOT"
SENSITIVE_FILE_NAMES = {
".env",
".env.local",
".env.production",
".env.development",
".env.test",
".git-credentials",
".netrc",
".npmrc",
".pypirc",
"id_rsa",
"id_dsa",
"id_ecdsa",
"id_ed25519",
"known_hosts",
"authorized_keys",
"credentials",
}
SENSITIVE_PATH_PARTS = {
".git",
".ssh",
".aws",
".gnupg",
".kube",
}
SENSITIVE_COMMAND_TOKENS = [
".env",
".git-credentials",
".netrc",
".npmrc",
".pypirc",
"id_rsa",
"id_dsa",
"id_ecdsa",
"id_ed25519",
"/etc/passwd",
"/etc/shadow",
"/root/.ssh",
"/root/.aws",
"~/.ssh",
"~/.aws",
]
BLOCKED_COMMAND_PATTERNS: list[tuple[re.Pattern[str], str]] = [
(re.compile(r"(^|[\s;&|])sudo(\s|$)"), "sudo escalation is blocked"),
(re.compile(r"(^|[\s;&|])su(\s|$)"), "user switching is blocked"),
(re.compile(r"(^|[\s;&|])(shutdown|reboot|poweroff|halt)(\s|$)"), "system power-control commands are blocked"),
(re.compile(r"(^|[\s;&|])mkfs(?:\.\w+)?(\s|$)"), "disk-formatting commands are blocked"),
(re.compile(r"(^|[\s;&|])(fdisk|parted)(\s|$)"), "disk-partitioning commands are blocked"),
(re.compile(r":\s*\(\)\s*\{\s*:\|:&\s*\};:"), "fork-bomb patterns are blocked"),
(re.compile(r"\brm\s+-rf\s+/(\s|$)"), "destructive root deletion is blocked"),
(re.compile(r"\brm\s+-rf\s+~(/|\s|$)"), "destructive home deletion is blocked"),
]
SENSITIVE_ENV_EXACT = {
"API_KEY",
"SERPER_KEY",
"JINA_KEY",
"MINERU_TOKEN",
"OPENAI_API_KEY",
"ANTHROPIC_API_KEY",
"GOOGLE_API_KEY",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_SESSION_TOKEN",
"AZURE_OPENAI_API_KEY",
}
SENSITIVE_ENV_MARKERS = (
"TOKEN",
"SECRET",
"PASSWORD",
"PASSWD",
"CREDENTIAL",
"COOKIE",
)
SAFE_ENV_ALWAYS = {
"PATH",
"LANG",
"TERM",
"TMPDIR",
"TEMP",
"TMP",
"TZ",
"COLORTERM",
"PWD",
"PYTHONIOENCODING",
"PYTHONUNBUFFERED",
"CONDA_PREFIX",
"CONDA_DEFAULT_ENV",
"VIRTUAL_ENV",
"LOGNAME",
"USER",
"USERNAME",
"SHELL",
"SHLVL",
"_",
}
def workspace_root() -> Path:
configured = os.environ.get(WORKSPACE_ROOT_ENV, "").strip()
root = Path(configured).expanduser() if configured else PROJECT_ROOT
return root.resolve()
def normalize_base_root(base_root: Optional[Union[str, Path]]) -> Path:
if base_root is None:
return workspace_root()
return Path(base_root).expanduser().resolve()
def normalize_workspace_root(path_value: Optional[Union[str, Path]]) -> Path:
if path_value is None or str(path_value).strip() == "":
return workspace_root()
path = Path(path_value).expanduser()
if not path.is_absolute():
path = (Path.cwd() / path).resolve()
else:
path = path.resolve()
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
if not path.is_dir():
raise ValueError(f"Workspace directory is not a directory: {path}")
return path
def _is_relative_to(path: Path, root: Path) -> bool:
try:
path.relative_to(root)
return True
except ValueError:
return False
def resolve_workspace_path(path_value: Union[str, Path], *, base_root: Optional[Path] = None) -> Path:
path = Path(path_value).expanduser()
root = normalize_base_root(base_root)
if not path.is_absolute():
path = root / path
return path.resolve(strict=False)
def is_sensitive_path(path: Path) -> bool:
lowered_parts = {part.lower() for part in path.parts}
lowered_name = path.name.lower()
if lowered_name in SENSITIVE_FILE_NAMES:
return True
return any(part in SENSITIVE_PATH_PARTS for part in lowered_parts)
def validate_tool_path(path_value: Union[str, Path], purpose: str, *, allow_sensitive: bool = False, base_root: Optional[Path] = None) -> Path:
path = resolve_workspace_path(path_value, base_root=base_root)
root = normalize_base_root(base_root)
if not _is_relative_to(path, root):
raise ValueError(f"{purpose} is limited to the workspace root: {root}")
if not allow_sensitive and is_sensitive_path(path):
raise ValueError(f"{purpose} to sensitive paths is blocked: {path}")
return path
def command_safety_issue(command: str) -> Optional[str]:
lowered = command.lower()
for pattern, reason in BLOCKED_COMMAND_PATTERNS:
if pattern.search(command):
return reason
for token in SENSITIVE_COMMAND_TOKENS:
if token.lower() in lowered:
return f"access to sensitive path/token '{token}' is blocked"
return None
def sanitized_subprocess_env(*, base_root: Optional[Path] = None) -> dict[str, str]:
env = os.environ.copy()
for key in list(env.keys()):
upper = key.upper()
if upper in SAFE_ENV_ALWAYS:
continue
if upper in SENSITIVE_ENV_EXACT or any(marker in upper for marker in SENSITIVE_ENV_MARKERS):
env.pop(key, None)
safe_home = str(normalize_base_root(base_root))
env["HOME"] = safe_home
env["PWD"] = safe_home
env.setdefault("TERM", "xterm-256color")
env.setdefault("LANG", "C.UTF-8")
env["GIT_TERMINAL_PROMPT"] = "0"
return env
def _matches_schema_type(value: Any, expected_type: str) -> bool:
if expected_type == "string":
return isinstance(value, str)
if expected_type == "integer":
return isinstance(value, int) and not isinstance(value, bool)
if expected_type == "number":
return (isinstance(value, int) and not isinstance(value, bool)) or isinstance(value, float)
if expected_type == "boolean":
return isinstance(value, bool)
if expected_type == "array":
return isinstance(value, list)
if expected_type == "object":
return isinstance(value, dict)
return True
def _schema_type_label(type_spec: Any) -> str:
if isinstance(type_spec, list):
return " or ".join(str(item) for item in type_spec)
return str(type_spec)
def _validate_schema_value(param_name: str, value: Any, schema: dict[str, Any]) -> None:
type_spec = schema.get("type")
if type_spec is not None:
allowed_types = type_spec if isinstance(type_spec, list) else [type_spec]
if not any(_matches_schema_type(value, expected_type) for expected_type in allowed_types):
raise ValueError(f"Parameter '{param_name}' must be of type {_schema_type_label(type_spec)}.")
if isinstance(value, list):
min_items = schema.get("minItems")
if isinstance(min_items, int) and len(value) < min_items:
raise ValueError(f"Parameter '{param_name}' must contain at least {min_items} item(s).")
item_schema = schema.get("items")
if isinstance(item_schema, dict):
for index, item in enumerate(value):
_validate_schema_value(f"{param_name}[{index}]", item, item_schema)
class ToolBase:
name: str = ""
description: str = ""
parameters: dict[str, Any] = {}
def __init__(self, cfg: Optional[dict] = None):
self.cfg = cfg or {}
if not self.name:
raise ValueError(f"{self.__class__.__name__}.name must be set.")
if not isinstance(self.parameters, dict):
raise ValueError(f"{self.__class__.__name__}.parameters must be a JSON-schema-like dict.")
def call(self, params: Union[str, dict], **kwargs):
raise NotImplementedError
def parse_json_args(self, params: Union[str, dict], strict_json: bool = False) -> dict:
if isinstance(params, str):
try:
if strict_json:
parsed = json.loads(params)
else:
parsed = json5.loads(params)
except (TypeError, ValueError) as exc:
raise ValueError("Parameters must be formatted as a valid JSON object.") from exc
else:
parsed = params
if not isinstance(parsed, dict):
raise ValueError("Parameters must decode to a JSON object.")
required = self.parameters.get("required", [])
for key in required:
if key not in parsed:
raise ValueError(f"Missing required parameter: {key}")
properties = self.parameters.get("properties", {})
if isinstance(properties, dict):
for key, value in parsed.items():
schema = properties.get(key)
if isinstance(schema, dict):
_validate_schema_value(key, value, schema)
return parsed
def main(argv: Optional[list[str]] = None) -> int:
parser = argparse.ArgumentParser(description="Inspect workspace and path resolution helpers.")
parser.add_argument("--workspace-root", help="Optional workspace root override for this invocation.")
parser.add_argument("--path", help="Optional path to resolve inside the workspace.")
args = parser.parse_args(argv)
load_dotenv(PROJECT_ROOT / ".env")
workspace_root = normalize_workspace_root(args.workspace_root)
payload: dict[str, str] = {
"project_root": str(PROJECT_ROOT),
"workspace_root": str(workspace_root),
}
if args.path:
payload["resolved_path"] = str(resolve_workspace_path(args.path, base_root=workspace_root))
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))